Live Queries
Live queries provide real-time data updates to clients using the @live directive. When underlying data changes, connected clients automatically receive updated results.
Overview
Unlike traditional GraphQL subscriptions that push specific events, live queries automatically re-execute the query when relevant data mutations occur, sending the complete updated result to the client.
Key Features
@liveDirective: Add to any query to make it “live”- WebSocket Delivery: Real-time updates via
/graphql/liveendpoint - Invalidation-Based: Mutations trigger query re-execution
- Configurable Strategies: Invalidation, polling, or hash-diff modes
- Throttling: Prevent flooding clients with too many updates
Quick Start
1. Client: Send a Live Query
Connect to the WebSocket endpoint and subscribe with the @live directive:
const ws = new WebSocket('ws://localhost:9000/graphql/live', 'graphql-transport-ws');
ws.onopen = () => {
// Initialize connection
ws.send(JSON.stringify({ type: 'connection_init' }));
};
ws.onmessage = (event) => {
const msg = JSON.parse(event.data);
if (msg.type === 'connection_ack') {
// Subscribe with @live query
ws.send(JSON.stringify({
id: 'users-live',
type: 'subscribe',
payload: {
query: `query @live {
users {
id
name
status
}
}`
}
}));
}
if (msg.type === 'next') {
console.log('Received update:', msg.payload.data);
}
};
2. Proto: Configure Live Query Support
Mark RPC methods as live query compatible:
service UserService {
rpc ListUsers(Empty) returns (UserList) {
option (graphql.schema) = {
type: QUERY
name: "users"
};
option (graphql.live_query) = {
enabled: true
strategy: INVALIDATION
triggers: ["User.create", "User.update", "User.delete"]
throttle_ms: 100
};
}
rpc CreateUser(CreateUserRequest) returns (User) {
option (graphql.schema) = {
type: MUTATION
name: "createUser"
};
// Mutations don't need live_query config - they trigger invalidation
}
}
3. Server: Trigger Invalidation
After mutations, trigger invalidation to notify live queries:
use grpc_graphql_gateway::{InvalidationEvent, LiveQueryStore};
// In your mutation handler
async fn create_user(&self, req: CreateUserRequest) -> Result<User, Status> {
// ... create user logic ...
// Notify live queries that User data changed
if let Some(store) = &self.live_query_store {
store.invalidate(InvalidationEvent::new("User", "create"));
}
Ok(user)
}
Live Query Strategies
Invalidation (Recommended)
Re-execute query only when relevant mutations occur:
option (graphql.live_query) = {
enabled: true
strategy: INVALIDATION
triggers: ["User.update", "User.delete"]
};
Polling
Periodically re-execute query at fixed intervals:
option (graphql.live_query) = {
enabled: true
strategy: POLLING
poll_interval_ms: 5000 // Every 5 seconds
};
Hash Diff
Only send updates if result actually changed:
option (graphql.live_query) = {
enabled: true
strategy: HASH_DIFF
poll_interval_ms: 1000
};
Configuration Options
| Option | Type | Description |
|---|---|---|
enabled | bool | Enable live query for this operation |
strategy | enum | INVALIDATION, POLLING, or HASH_DIFF |
triggers | string[] | Invalidation event patterns (e.g., “User.update”) |
throttle_ms | uint32 | Minimum time between updates (default: 100ms) |
poll_interval_ms | uint32 | Polling interval for POLLING/HASH_DIFF strategies |
ttl_seconds | uint32 | Auto-expire subscription after N seconds |
API Reference
Public Functions
// Check if query contains @live directive
pub fn has_live_directive(query: &str) -> bool;
// Strip @live directive for execution
pub fn strip_live_directive(query: &str) -> String;
// Create a shared live query store
pub fn create_live_query_store() -> SharedLiveQueryStore;
// Create with custom config
pub fn create_live_query_store_with_config(config: LiveQueryConfig) -> SharedLiveQueryStore;
LiveQueryStore Methods
impl LiveQueryStore {
// Register a new live query subscription
pub fn register(&self, query: ActiveLiveQuery, sender: Sender<LiveQueryUpdate>) -> Result<(), LiveQueryError>;
// Unregister a subscription
pub fn unregister(&self, subscription_id: &str) -> Option<ActiveLiveQuery>;
// Trigger invalidation for matching subscriptions
pub fn invalidate(&self, event: InvalidationEvent) -> usize;
// Get current statistics
pub fn stats(&self) -> LiveQueryStats;
}
InvalidationEvent
// Create an invalidation event
let event = InvalidationEvent::new("User", "update");
// With specific entity ID
let event = InvalidationEvent::with_id("User", "update", "user-123");
WebSocket Protocol
The /graphql/live endpoint uses the graphql-transport-ws protocol:
Client → Server
| Message Type | Description |
|---|---|
connection_init | Initialize connection |
subscribe | Start a live query subscription |
complete | End a subscription |
ping | Keep-alive ping |
Server → Client
| Message Type | Description |
|---|---|
connection_ack | Connection accepted |
next | Query result (initial or update) |
error | Error occurred |
complete | Subscription ended |
pong | Keep-alive response |
Example: Full CRUD with Live Updates
See the complete example at examples/live_query/:
# Run the example
cargo run --example live_query
# In another terminal, run the WebSocket test
node examples/live_query/test_ws.js
The test demonstrates:
- Initial live query returning 3 users
- Delete mutation removing a user
- Re-query showing 2 users
- Create mutation adding a new user
- Final query showing updated user list
Best Practices
- Use Specific Triggers: Only subscribe to relevant entity types
- Set Appropriate Throttle: Prevent overwhelming clients (100-500ms)
- Use TTL for Temporary Subscriptions: Auto-cleanup inactive queries
- Prefer Invalidation over Polling: More efficient for most use cases
- Handle Reconnection: Clients should re-subscribe after disconnect
Advanced Features
The live query system includes 4 advanced features for optimizing bandwidth, performance, and user experience.
1. Filtered Live Queries
Apply server-side filtering to live queries to receive only relevant updates.
Usage
# Only receive updates for online users
query @live {
users(status: ONLINE) {
users { id name }
total_count
}
}
Implementation
use grpc_graphql_gateway::{parse_query_arguments, matches_filter};
// Parse filter from query
let args = parse_query_arguments("users(status: ONLINE) @live");
// → { "status": "ONLINE" }
// Check if entity matches filter
let user = json!({"id": "1", "status": "ONLINE", "name": "Alice"});
if matches_filter(&args, &user) {
// Include in live query results
}
Benefits
- 50-90% bandwidth reduction for filtered datasets
- Natural GraphQL query syntax
- No client-side filtering needed
2. Field-Level Invalidation
Track which specific fields changed and communicate this to clients for surgical updates.
Response Format
{
id: "sub-123",
data: { user: { id: "1", name: "Alice Smith", age: 31 } },
changed_fields: ["user.name", "user.age"], // ← Only these changed!
is_initial: false,
revision: 5
}
Implementation
use grpc_graphql_gateway::detect_field_changes;
let old_data = json!({"user": {"name": "Alice", "age": 30}});
let new_data = json!({"user": {"name": "Alice Smith", "age": 31}});
let changes = detect_field_changes(&old_data, &new_data, "", 0, 10);
// changes = [
// FieldChange { field_path: "user.name", old_value: "Alice", new_value: "Alice Smith" },
// FieldChange { field_path: "user.age", old_value: 30, new_value: 31 }
// ]
Client-Side Usage
ws.onmessage = (event) => {
const msg = JSON.parse(event.data);
if (msg.type === 'next' && msg.payload.changed_fields) {
// Only update changed fields in UI
msg.payload.changed_fields.forEach(field => {
updateFieldInDOM(field, msg.payload.data);
});
}
};
Benefits
- 30-70% bandwidth reduction when few fields change
- Surgical UI updates - only re-render changed components
- Reduced client-side processing overhead
3. Batch Invalidation
Merge multiple rapid invalidation events into a single update to reduce network traffic.
Configuration
use grpc_graphql_gateway::BatchInvalidationConfig;
let config = BatchInvalidationConfig {
enabled: true,
debounce_ms: 50, // Wait 50ms before flushing
max_batch_size: 100, // Auto-flush at 100 events
max_wait_ms: 500, // Force flush after 500ms max
};
How It Works
Without batching:
━━━━━━━━━━━━━━━━━━━━━━━
Event 1 (0ms) → Update 1
Event 2 (10ms) → Update 2
Event 3 (20ms) → Update 3
Event 4 (30ms) → Update 4
Event 5 (40ms) → Update 5
━━━━━━━━━━━━━━━━━━━━━━━
Result: 5 updates sent
With batching (100ms throttle):
━━━━━━━━━━━━━━━━━━━━━━━
Events 1-5 (0-40ms)
↓ (wait 100ms)
Single merged update
━━━━━━━━━━━━━━━━━━━━━━━
Result: 1 update sent
Proto Configuration
option (graphql.live_query) = {
enabled: true
strategy: INVALIDATION
throttle_ms: 100 // ← Enables batching
triggers: ["User.create", "User.update"]
};
Benefits
- 70-95% fewer network requests during high-frequency updates
- Lower client processing overhead
- Better performance for rapidly changing data
4. Client-Side Caching Hints
Send cache control directives to help clients optimize caching based on data volatility.
Response Format
{
id: "sub-123",
data: { user: { name: "Alice" } },
cache_control: {
max_age: 300, // Cache for 5 minutes
must_revalidate: true,
etag: "abc123def456" // For efficient revalidation
}
}
Implementation
use grpc_graphql_gateway::{generate_cache_control, DataVolatility};
// Generate cache control based on data type
let cache = generate_cache_control(
DataVolatility::Low, // User profiles change infrequently
Some("etag-user-123".to_string())
);
// Result:
// CacheControl {
// max_age: 300, // 5 minutes
// must_revalidate: true,
// etag: Some("etag-user-123")
// }
Data Volatility Levels
| Volatility | Cache Duration | Use Case |
|---|---|---|
VeryHigh | 0s (no cache) | Stock prices, real-time metrics |
High | 5s | User online status, live counts |
Medium | 30s | Notification counts, activity feeds |
Low | 5 minutes | User profiles, post content |
VeryLow | 1 hour | Settings, configuration data |
Client-Side Implementation
const cache = new Map();
ws.onmessage = (event) => {
const msg = JSON.parse(event.data);
if (msg.type === 'next' && msg.payload.cache_control) {
const { max_age, etag } = msg.payload.cache_control;
// Store in cache with expiration
cache.set(msg.id, {
data: msg.payload.data,
etag: etag,
expires: Date.now() + (max_age * 1000)
});
}
};
Benefits
- 40-80% reduced server load through client caching
- Faster perceived performance
- Automatic cache invalidation on updates
Advanced Features API Reference
Functions
// Filter Support - Feature #1
pub fn parse_query_arguments(query: &str) -> HashMap<String, String>;
pub fn matches_filter(filter: &HashMap<String, String>, data: &Value) -> bool;
// Field-Level Changes - Feature #2
pub fn detect_field_changes(
old: &Value,
new: &Value,
path: &str,
depth: usize,
max_depth: usize
) -> Vec<FieldChange>;
// Cache Control - Feature #4
pub fn generate_cache_control(
volatility: DataVolatility,
etag: Option<String>
) -> CacheControl;
Types
// Cache Control
pub struct CacheControl {
pub max_age: u32,
pub public: bool,
pub must_revalidate: bool,
pub etag: Option<String>,
}
// Field Change
pub struct FieldChange {
pub field_path: String,
pub old_value: Option<Value>,
pub new_value: Value,
}
// Batch Configuration
pub struct BatchInvalidationConfig {
pub enabled: bool,
pub debounce_ms: u64,
pub max_batch_size: usize,
pub max_wait_ms: u64,
}
// Data Volatility
pub enum DataVolatility {
VeryHigh, // Changes multiple times per second
High, // Changes every few seconds
Medium, // Changes every minute
Low, // Changes hourly
VeryLow, // Changes daily or less
}
Enhanced LiveQueryUpdate
pub struct LiveQueryUpdate {
pub id: String,
pub data: serde_json::Value,
pub is_initial: bool,
pub revision: u64,
// Advanced features (all optional)
pub cache_control: Option<CacheControl>,
pub changed_fields: Option<Vec<String>>,
pub batched: Option<bool>,
pub timestamp: Option<u64>,
}
Performance Comparison
Real-World Scenario
Setup: Live dashboard with 1000 users, 10 fields each, 60 updates/minute
| Metric | Without Features | With Features | Improvement |
|---|---|---|---|
| Users sent | 1000 | 100 (filtered) | 90% reduction |
| Fields/user | 10 | 2 (changed only) | 80% reduction |
| Updates/min | 60 | 10 (batched) | 83% reduction |
| Cache hits | 0% | 50% | 50% less load |
| Total data/min | ~2.3 MB | ~23 KB | 99% reduction |
Complete Example
A comprehensive example demonstrating all 4 features is available:
# Run the server
cargo run --example live_query
# Test all advanced features
cd examples/live_query
node test_advanced_features.js
For detailed documentation and examples, see:
Migration Guide
Adding Filtered Queries
Before:
query @live {
users {
users { id name status }
}
}
After:
query @live {
users(status: ONLINE) { // ← Add filter
users { id name status }
}
}
Using Field-Level Updates
Before:
if (msg.type === 'next') {
// Update entire component
updateUserComponent(msg.payload.data.user);
}
After:
if (msg.type === 'next') {
if (msg.payload.changed_fields) {
// Update only changed fields
msg.payload.changed_fields.forEach(field => {
updateField(field, msg.payload.data);
});
} else {
// Initial load
updateUserComponent(msg.payload.data.user);
}
}
Enabling Batching
Simply increase the throttle in your proto config:
option (graphql.live_query) = {
throttle_ms: 100 // ← Increase from 0 to enable batching
};
Troubleshooting
Filtered queries not working?
- Verify filter syntax:
key: valueformat (e.g.,status: ONLINE) - Filters are case-sensitive
- Check that entity data contains the filter fields
Too many updates still?
- Increase
throttle_msfor more aggressive batching - Add more specific filters to reduce result set
- Review your invalidation triggers
Cache not working?
- Ensure client respects
max_ageheader - Check that
cache_controlis present in response - Verify ETag handling on client side
Changed fields not showing?
- Feature requires
throttle_ms > 0 - Check that data actually changes between updates
- Ensure client is checking
changed_fieldsproperty