Keyboard shortcuts

Press or to navigate between chapters

Press S or / to search in the book

Press ? to show this help

Press Esc to hide this help

Live Queries

Live queries provide real-time data updates to clients using the @live directive. When underlying data changes, connected clients automatically receive updated results.

Overview

Unlike traditional GraphQL subscriptions that push specific events, live queries automatically re-execute the query when relevant data mutations occur, sending the complete updated result to the client.

Key Features

  • @live Directive: Add to any query to make it “live”
  • WebSocket Delivery: Real-time updates via /graphql/live endpoint
  • Invalidation-Based: Mutations trigger query re-execution
  • Configurable Strategies: Invalidation, polling, or hash-diff modes
  • Throttling: Prevent flooding clients with too many updates

Quick Start

1. Client: Send a Live Query

Connect to the WebSocket endpoint and subscribe with the @live directive:

const ws = new WebSocket('ws://localhost:9000/graphql/live', 'graphql-transport-ws');

ws.onopen = () => {
  // Initialize connection
  ws.send(JSON.stringify({ type: 'connection_init' }));
};

ws.onmessage = (event) => {
  const msg = JSON.parse(event.data);
  
  if (msg.type === 'connection_ack') {
    // Subscribe with @live query
    ws.send(JSON.stringify({
      id: 'users-live',
      type: 'subscribe',
      payload: {
        query: `query @live {
          users {
            id
            name
            status
          }
        }`
      }
    }));
  }
  
  if (msg.type === 'next') {
    console.log('Received update:', msg.payload.data);
  }
};

2. Proto: Configure Live Query Support

Mark RPC methods as live query compatible:

service UserService {
  rpc ListUsers(Empty) returns (UserList) {
    option (graphql.schema) = { 
      type: QUERY 
      name: "users" 
    };
    option (graphql.live_query) = {
      enabled: true
      strategy: INVALIDATION
      triggers: ["User.create", "User.update", "User.delete"]
      throttle_ms: 100
    };
  }
  
  rpc CreateUser(CreateUserRequest) returns (User) {
    option (graphql.schema) = { 
      type: MUTATION 
      name: "createUser" 
    };
    // Mutations don't need live_query config - they trigger invalidation
  }
}

3. Server: Trigger Invalidation

After mutations, trigger invalidation to notify live queries:

use grpc_graphql_gateway::{InvalidationEvent, LiveQueryStore};

// In your mutation handler
async fn create_user(&self, req: CreateUserRequest) -> Result<User, Status> {
    // ... create user logic ...
    
    // Notify live queries that User data changed
    if let Some(store) = &self.live_query_store {
        store.invalidate(InvalidationEvent::new("User", "create"));
    }
    
    Ok(user)
}

Live Query Strategies

Re-execute query only when relevant mutations occur:

option (graphql.live_query) = {
  enabled: true
  strategy: INVALIDATION
  triggers: ["User.update", "User.delete"]
};

Polling

Periodically re-execute query at fixed intervals:

option (graphql.live_query) = {
  enabled: true
  strategy: POLLING
  poll_interval_ms: 5000  // Every 5 seconds
};

Hash Diff

Only send updates if result actually changed:

option (graphql.live_query) = {
  enabled: true
  strategy: HASH_DIFF
  poll_interval_ms: 1000
};

Configuration Options

OptionTypeDescription
enabledboolEnable live query for this operation
strategyenumINVALIDATION, POLLING, or HASH_DIFF
triggersstring[]Invalidation event patterns (e.g., “User.update”)
throttle_msuint32Minimum time between updates (default: 100ms)
poll_interval_msuint32Polling interval for POLLING/HASH_DIFF strategies
ttl_secondsuint32Auto-expire subscription after N seconds

API Reference

Public Functions

// Check if query contains @live directive
pub fn has_live_directive(query: &str) -> bool;

// Strip @live directive for execution
pub fn strip_live_directive(query: &str) -> String;

// Create a shared live query store
pub fn create_live_query_store() -> SharedLiveQueryStore;

// Create with custom config
pub fn create_live_query_store_with_config(config: LiveQueryConfig) -> SharedLiveQueryStore;

LiveQueryStore Methods

impl LiveQueryStore {
    // Register a new live query subscription
    pub fn register(&self, query: ActiveLiveQuery, sender: Sender<LiveQueryUpdate>) -> Result<(), LiveQueryError>;
    
    // Unregister a subscription
    pub fn unregister(&self, subscription_id: &str) -> Option<ActiveLiveQuery>;
    
    // Trigger invalidation for matching subscriptions
    pub fn invalidate(&self, event: InvalidationEvent) -> usize;
    
    // Get current statistics
    pub fn stats(&self) -> LiveQueryStats;
}

InvalidationEvent

// Create an invalidation event
let event = InvalidationEvent::new("User", "update");

// With specific entity ID
let event = InvalidationEvent::with_id("User", "update", "user-123");

WebSocket Protocol

The /graphql/live endpoint uses the graphql-transport-ws protocol:

Client → Server

Message TypeDescription
connection_initInitialize connection
subscribeStart a live query subscription
completeEnd a subscription
pingKeep-alive ping

Server → Client

Message TypeDescription
connection_ackConnection accepted
nextQuery result (initial or update)
errorError occurred
completeSubscription ended
pongKeep-alive response

Example: Full CRUD with Live Updates

See the complete example at examples/live_query/:

# Run the example
cargo run --example live_query

# In another terminal, run the WebSocket test
node examples/live_query/test_ws.js

The test demonstrates:

  1. Initial live query returning 3 users
  2. Delete mutation removing a user
  3. Re-query showing 2 users
  4. Create mutation adding a new user
  5. Final query showing updated user list

Best Practices

  1. Use Specific Triggers: Only subscribe to relevant entity types
  2. Set Appropriate Throttle: Prevent overwhelming clients (100-500ms)
  3. Use TTL for Temporary Subscriptions: Auto-cleanup inactive queries
  4. Prefer Invalidation over Polling: More efficient for most use cases
  5. Handle Reconnection: Clients should re-subscribe after disconnect

Advanced Features

The live query system includes 4 advanced features for optimizing bandwidth, performance, and user experience.

1. Filtered Live Queries

Apply server-side filtering to live queries to receive only relevant updates.

Usage

# Only receive updates for online users
query @live {
  users(status: ONLINE) {
    users { id name }
    total_count
  }
}

Implementation

use grpc_graphql_gateway::{parse_query_arguments, matches_filter};

// Parse filter from query
let args = parse_query_arguments("users(status: ONLINE) @live");
// → { "status": "ONLINE" }

// Check if entity matches filter
let user = json!({"id": "1", "status": "ONLINE", "name": "Alice"});
if matches_filter(&args, &user) {
    // Include in live query results
}

Benefits

  • 50-90% bandwidth reduction for filtered datasets
  • Natural GraphQL query syntax
  • No client-side filtering needed

2. Field-Level Invalidation

Track which specific fields changed and communicate this to clients for surgical updates.

Response Format

{
  id: "sub-123",
  data: { user: { id: "1", name: "Alice Smith", age: 31 } },
  changed_fields: ["user.name", "user.age"],  // ← Only these changed!
  is_initial: false,
  revision: 5
}

Implementation

use grpc_graphql_gateway::detect_field_changes;

let old_data = json!({"user": {"name": "Alice", "age": 30}});
let new_data = json!({"user": {"name": "Alice Smith", "age": 31}});

let changes = detect_field_changes(&old_data, &new_data, "", 0, 10);

// changes = [
//   FieldChange { field_path: "user.name", old_value: "Alice", new_value: "Alice Smith" },
//   FieldChange { field_path: "user.age", old_value: 30, new_value: 31 }
// ]

Client-Side Usage

ws.onmessage = (event) => {
  const msg = JSON.parse(event.data);
  
  if (msg.type === 'next' && msg.payload.changed_fields) {
    // Only update changed fields in UI
    msg.payload.changed_fields.forEach(field => {
      updateFieldInDOM(field, msg.payload.data);
    });
  }
};

Benefits

  • 30-70% bandwidth reduction when few fields change
  • Surgical UI updates - only re-render changed components
  • Reduced client-side processing overhead

3. Batch Invalidation

Merge multiple rapid invalidation events into a single update to reduce network traffic.

Configuration

use grpc_graphql_gateway::BatchInvalidationConfig;

let config = BatchInvalidationConfig {
    enabled: true,
    debounce_ms: 50,        // Wait 50ms before flushing
    max_batch_size: 100,     // Auto-flush at 100 events
    max_wait_ms: 500,        // Force flush after 500ms max
};

How It Works

Without batching:
━━━━━━━━━━━━━━━━━━━━━━━
Event 1 (0ms)   → Update 1
Event 2 (10ms)  → Update 2
Event 3 (20ms)  → Update 3
Event 4 (30ms)  → Update 4
Event 5 (40ms)  → Update 5
━━━━━━━━━━━━━━━━━━━━━━━
Result: 5 updates sent

With batching (100ms throttle):
━━━━━━━━━━━━━━━━━━━━━━━
Events 1-5 (0-40ms)
  ↓ (wait 100ms)
Single merged update
━━━━━━━━━━━━━━━━━━━━━━━
Result: 1 update sent

Proto Configuration

option (graphql.live_query) = {
  enabled: true
  strategy: INVALIDATION
  throttle_ms: 100  // ← Enables batching
  triggers: ["User.create", "User.update"]
};

Benefits

  • 70-95% fewer network requests during high-frequency updates
  • Lower client processing overhead
  • Better performance for rapidly changing data

4. Client-Side Caching Hints

Send cache control directives to help clients optimize caching based on data volatility.

Response Format

{
  id: "sub-123",
  data: { user: { name: "Alice" } },
  cache_control: {
    max_age: 300,          // Cache for 5 minutes
    must_revalidate: true,
    etag: "abc123def456"   // For efficient revalidation
  }
}

Implementation

use grpc_graphql_gateway::{generate_cache_control, DataVolatility};

// Generate cache control based on data type
let cache = generate_cache_control(
    DataVolatility::Low,  // User profiles change infrequently
    Some("etag-user-123".to_string())
);

// Result:
// CacheControl {
//     max_age: 300,  // 5 minutes
//     must_revalidate: true,
//     etag: Some("etag-user-123")
// }

Data Volatility Levels

VolatilityCache DurationUse Case
VeryHigh0s (no cache)Stock prices, real-time metrics
High5sUser online status, live counts
Medium30sNotification counts, activity feeds
Low5 minutesUser profiles, post content
VeryLow1 hourSettings, configuration data

Client-Side Implementation

const cache = new Map();

ws.onmessage = (event) => {
  const msg = JSON.parse(event.data);
  
  if (msg.type === 'next' && msg.payload.cache_control) {
    const { max_age, etag } = msg.payload.cache_control;
    
    // Store in cache with expiration
    cache.set(msg.id, {
      data: msg.payload.data,
      etag: etag,
      expires: Date.now() + (max_age * 1000)
    });
  }
};

Benefits

  • 40-80% reduced server load through client caching
  • Faster perceived performance
  • Automatic cache invalidation on updates

Advanced Features API Reference

Functions

// Filter Support - Feature #1
pub fn parse_query_arguments(query: &str) -> HashMap<String, String>;
pub fn matches_filter(filter: &HashMap<String, String>, data: &Value) -> bool;

// Field-Level Changes - Feature #2
pub fn detect_field_changes(
    old: &Value,
    new: &Value,
    path: &str,
    depth: usize,
    max_depth: usize
) -> Vec<FieldChange>;

// Cache Control - Feature #4
pub fn generate_cache_control(
    volatility: DataVolatility,
    etag: Option<String>
) -> CacheControl;

Types

// Cache Control
pub struct CacheControl {
    pub max_age: u32,
    pub public: bool,
    pub must_revalidate: bool,
    pub etag: Option<String>,
}

// Field Change
pub struct FieldChange {
    pub field_path: String,
    pub old_value: Option<Value>,
    pub new_value: Value,
}

// Batch Configuration
pub struct BatchInvalidationConfig {
    pub enabled: bool,
    pub debounce_ms: u64,
    pub max_batch_size: usize,
    pub max_wait_ms: u64,
}

// Data Volatility
pub enum DataVolatility {
    VeryHigh,  // Changes multiple times per second
    High,      // Changes every few seconds
    Medium,    // Changes every minute
    Low,       // Changes hourly
    VeryLow,   // Changes daily or less
}

Enhanced LiveQueryUpdate

pub struct LiveQueryUpdate {
    pub id: String,
    pub data: serde_json::Value,
    pub is_initial: bool,
    pub revision: u64,
    
    // Advanced features (all optional)
    pub cache_control: Option<CacheControl>,
    pub changed_fields: Option<Vec<String>>,
    pub batched: Option<bool>,
    pub timestamp: Option<u64>,
}

Performance Comparison

Real-World Scenario

Setup: Live dashboard with 1000 users, 10 fields each, 60 updates/minute

MetricWithout FeaturesWith FeaturesImprovement
Users sent1000100 (filtered)90% reduction
Fields/user102 (changed only)80% reduction
Updates/min6010 (batched)83% reduction
Cache hits0%50%50% less load
Total data/min~2.3 MB~23 KB99% reduction

Complete Example

A comprehensive example demonstrating all 4 features is available:

# Run the server
cargo run --example live_query

# Test all advanced features
cd examples/live_query
node test_advanced_features.js

For detailed documentation and examples, see:


Migration Guide

Adding Filtered Queries

Before:

query @live {
  users {
    users { id name status }
  }
}

After:

query @live {
  users(status: ONLINE) {  // ← Add filter
    users { id name status }
  }
}

Using Field-Level Updates

Before:

if (msg.type === 'next') {
  // Update entire component
  updateUserComponent(msg.payload.data.user);
}

After:

if (msg.type === 'next') {
  if (msg.payload.changed_fields) {
    // Update only changed fields
    msg.payload.changed_fields.forEach(field => {
      updateField(field, msg.payload.data);
    });
  } else {
    // Initial load
    updateUserComponent(msg.payload.data.user);
  }
}

Enabling Batching

Simply increase the throttle in your proto config:

option (graphql.live_query) = {
  throttle_ms: 100  // ← Increase from 0 to enable batching
};

Troubleshooting

Filtered queries not working?

  • Verify filter syntax: key: value format (e.g., status: ONLINE)
  • Filters are case-sensitive
  • Check that entity data contains the filter fields

Too many updates still?

  • Increase throttle_ms for more aggressive batching
  • Add more specific filters to reduce result set
  • Review your invalidation triggers

Cache not working?

  • Ensure client respects max_age header
  • Check that cache_control is present in response
  • Verify ETag handling on client side

Changed fields not showing?

  • Feature requires throttle_ms > 0
  • Check that data actually changes between updates
  • Ensure client is checking changed_fields property