Webhooks & Event Triggers

React to database changes and external events with Hasura Event Triggers and webhook integrations for real-time data processing and external service notifications.

Overview

Webhooks in nself enable real-time reactions to database changes through Hasura Event Triggers, scheduled events, and external webhook integrations. This allows for automatic processing, notifications, and data synchronization.

Event Triggers

Database Event Triggers

Automatically trigger webhooks when database rows are inserted, updated, or deleted:

# Event triggers fire on:
# - INSERT operations (new records)
# - UPDATE operations (modified records)  
# - DELETE operations (removed records)
# - Manual invocations (triggered from the Hasura Console or the metadata API when manual triggering is enabled)

# Common use cases:
# - Send welcome email when user registers
# - Update search index when content changes
# - Sync data with external systems
# - Generate notifications
# - Audit logging
# - Cache invalidation

Setting Up Event Triggers

Configure event triggers in Hasura Console:

  1. Go to "Events" tab in Hasura Console
  2. Click "Create Event Trigger"
  3. Select table and operations (INSERT/UPDATE/DELETE)
  4. Configure webhook URL
  5. Set retry logic and headers

Webhook Implementation

NestJS Webhook Handler

// functions/nestjs/src/webhooks/user.controller.ts
import { Controller, Post, Body, Headers, Logger, UnauthorizedException } from '@nestjs/common';
import { EmailService } from '../services/email.service';
import { AnalyticsService } from '../services/analytics.service';

/**
 * Shape of the JSON body delivered to an event-trigger webhook.
 *
 * Row data is left untyped because one payload type is shared by
 * handlers for different tables.
 */
interface EventTriggerPayload {
  /** Identifier of this event delivery. */
  id: string;
  /** Timestamp string recorded for the event. */
  created_at: string;
  /** The trigger that produced the event. */
  trigger: {
    name: string;
  };
  /** The table the change happened on. */
  table: {
    schema: string;
    name: string;
  };
  /** The change itself. */
  event: {
    /** Session variables attached to the operation that caused the event. */
    session_variables: Record<string, any>;
    /** Kind of row operation. */
    op: 'INSERT' | 'UPDATE' | 'DELETE';
    /** Row state before and after the operation; `old` is null for an INSERT. */
    data: {
      old: any;
      new: any;
    };
  };
  /** Retry bookkeeping for this delivery attempt. */
  delivery_info: {
    max_retries: number;
    current_retry: number;
  };
}

/**
 * Receives event-trigger deliveries for user rows.
 *
 * Every route authenticates the delivery by comparing the
 * `x-hasura-admin-secret` header with `HASURA_GRAPHQL_ADMIN_SECRET`.
 * Rejected and failed deliveries are answered with an exception (non-2xx)
 * rather than a success body, so the sender never records an unprocessed
 * event as delivered.
 */
@Controller('webhooks')
export class UserWebhookController {
  private readonly logger = new Logger(UserWebhookController.name);

  constructor(
    private emailService: EmailService,
    private analyticsService: AnalyticsService,
  ) {}

  /**
   * Handles the "user created" event: sends the welcome email, records the
   * registration in analytics and creates the default profile, in that order.
   *
   * NOTE(review): the steps are not idempotent as written. If a later step
   * fails, the whole delivery is retried and the welcome email is sent again.
   *
   * @param payload Event-trigger body; `event.data.new` is the inserted row.
   * @param adminSecret Value of the `x-hasura-admin-secret` request header.
   * @returns A success envelope once all three steps complete.
   * @throws UnauthorizedException when the shared secret is missing or wrong.
   * @throws Error when any processing step fails (signals the sender to retry).
   */
  @Post('user-created')
  async handleUserCreated(
    @Body() payload: EventTriggerPayload,
    @Headers('x-hasura-admin-secret') adminSecret: string,
  ) {
    // Verify webhook authenticity
    this.assertAuthorized(adminSecret);

    try {
      const { event } = payload;
      const newUser = event.data.new;

      this.logger.log(`Processing user created event for: ${newUser.email}`);

      // Send welcome email
      await this.emailService.sendWelcomeEmail({
        email: newUser.email,
        name: newUser.name,
        userId: newUser.id,
      });

      // Track user registration in analytics
      await this.analyticsService.trackEvent({
        event: 'user_registered',
        userId: newUser.id,
        properties: {
          email: newUser.email,
          name: newUser.name,
          registration_method: newUser.registration_method || 'email',
        },
      });

      // Create user profile
      await this.createDefaultUserProfile(newUser);

      this.logger.log(`Successfully processed user created event for: ${newUser.email}`);

      return {
        status: 'success',
        message: 'User created event processed successfully',
      };
    } catch (error) {
      this.logger.error('Error processing user created event:', error);

      // The caught value is not guaranteed to be an Error instance.
      const reason = error instanceof Error ? error.message : String(error);

      // Return error to trigger retry
      throw new Error(`Failed to process user created event: ${reason}`);
    }
  }

  /**
   * Handles the "user updated" event by reacting to the columns that changed:
   * a new email triggers a verification email, a new name is synced to
   * external services.
   *
   * @param payload Event-trigger body carrying the old and new row.
   * @param adminSecret Value of the `x-hasura-admin-secret` request header.
   * @returns A success envelope.
   * @throws UnauthorizedException when the shared secret is missing or wrong.
   */
  @Post('user-updated')
  async handleUserUpdated(
    @Body() payload: EventTriggerPayload,
    @Headers('x-hasura-admin-secret') adminSecret?: string,
  ) {
    // Same authentication as the user-created route.
    this.assertAuthorized(adminSecret);

    const { event } = payload;
    const oldUser = event.data.old;
    const newUser = event.data.new;

    // Check what fields changed
    const changedFields = this.getChangedFields(oldUser, newUser);

    if (changedFields.includes('email')) {
      // Email changed - send verification email
      await this.emailService.sendEmailVerification(newUser.email, newUser.id);
    }

    if (changedFields.includes('name')) {
      // Update external services with new name
      await this.syncUserNameToExternalServices(newUser);
    }

    return { status: 'success' };
  }

  /**
   * Rejects the request unless the supplied secret matches the configured one.
   *
   * An unset `HASURA_GRAPHQL_ADMIN_SECRET` is treated as "reject everything":
   * otherwise a request without the header would compare
   * `undefined !== undefined` and be let through.
   */
  private assertAuthorized(adminSecret: string | undefined): void {
    const expectedSecret = process.env.HASURA_GRAPHQL_ADMIN_SECRET;

    if (!expectedSecret || adminSecret !== expectedSecret) {
      this.logger.warn('Invalid admin secret in webhook request');
      throw new UnauthorizedException('Invalid admin secret');
    }
  }

  private async createDefaultUserProfile(user: any) {
    // Create default user profile
    // This could be a GraphQL mutation or direct database insert
  }

  private async syncUserNameToExternalServices(user: any) {
    // Push the updated name to external services
    // Placeholder: implement per integration
  }

  /**
   * Lists the keys of `newData` whose value differs from `oldData`.
   *
   * Comparison is strict (`!==`), so object- or array-valued columns are
   * always reported as changed because the two rows are separately parsed.
   * A null or undefined row is treated as an empty object.
   */
  private getChangedFields(oldData: any, newData: any): string[] {
    const before = oldData ?? {};
    const after = newData ?? {};

    return Object.keys(after).filter((key) => before[key] !== after[key]);
  }
}

Python Webhook Handler

# functions/python/webhooks/order_processing.py
from flask import Flask, request, jsonify
import os
import logging
from typing import Dict, Any
import requests

# Flask application object; the @app.route decorators below register on it.
app = Flask(__name__)
# Configure logging at INFO level when this module is loaded.
logging.basicConfig(level=logging.INFO)
# Module-scoped logger used by the handlers in this file.
logger = logging.getLogger(__name__)

@app.route('/webhooks/order-completed', methods=['POST'])
def handle_order_completed():
    """Handle the "order completed" event-trigger delivery.

    Authenticates the request with the shared admin secret, extracts the new
    order row from the payload and runs ``process_order_completion`` on it.

    Returns:
        A JSON response: a success body when processing succeeds, 401 when
        the secret is missing or wrong, and 500 when processing fails or the
        payload cannot be read (a 500 signals the sender to retry).
    """
    # Local import keeps this handler self-contained.
    import hmac

    try:
        # Verify webhook authenticity
        expected_secret = os.getenv('HASURA_GRAPHQL_ADMIN_SECRET')
        admin_secret = request.headers.get('x-hasura-admin-secret')

        # An unset secret must reject everything: comparing None to None
        # would otherwise authenticate a request that sent no header at all.
        if (
            not expected_secret
            or admin_secret is None
            or not hmac.compare_digest(
                admin_secret.encode('utf-8'),
                expected_secret.encode('utf-8'),
            )
        ):
            return jsonify({'status': 'unauthorized'}), 401

        payload = request.get_json()
        event = payload['event']
        new_order = event['data']['new']

        logger.info(f"Processing order completed: {new_order['id']}")

        # Process the order
        result = process_order_completion(new_order)

        if result['success']:
            return jsonify({
                'status': 'success',
                'message': 'Order processed successfully'
            })
        else:
            # Return error to trigger retry
            return jsonify({
                'status': 'error',
                'message': result['error']
            }), 500

    except Exception as e:
        logger.error(f"Error processing order webhook: {str(e)}")
        return jsonify({
            'status': 'error',
            'message': 'Failed to process order'
        }), 500

def process_order_completion(order: Dict[str, Any]) -> Dict[str, Any]:
    """Run the post-completion steps for an order, in sequence.

    Steps: confirmation email, inventory update, shipment creation,
    fulfillment notification, analytics tracking. The first step that raises
    aborts the remaining ones.

    Args:
        order: The order row; ``order['items']`` is passed to the inventory
            update.

    Returns:
        ``{'success': True}`` when every step completes, otherwise
        ``{'success': False, 'error': <message>}``.
    """
    try:
        # Send order confirmation email
        send_order_confirmation_email(order)

        # Update inventory
        update_inventory(order['items'])

        # Create shipment
        shipment = create_shipment(order)

        # Send to fulfillment service
        notify_fulfillment_service(order, shipment)

        # Update analytics
        track_order_completion(order)

        return {'success': True}

    except Exception as e:
        logger.error(f"Order processing failed: {str(e)}")
        return {'success': False, 'error': str(e)}

def notify_fulfillment_service(order: Dict[str, Any], shipment: Dict[str, Any]):
    # Hand the order and its shipment to the fulfillment service
    pass

def track_order_completion(order: Dict[str, Any]):
    # Record the completed order in analytics
    pass

def send_order_confirmation_email(order: Dict[str, Any]):
    """Placeholder: send the confirmation email using your email service."""
    return None

def update_inventory(items: list):
    """Placeholder: update product inventory for the given items."""
    return None

def create_shipment(order: Dict[str, Any]):
    """Placeholder: create a shipment record and return it.

    Currently returns a fixed sample record regardless of the order.
    """
    shipment = {}
    shipment['id'] = 'ship_123'
    shipment['tracking_number'] = 'TRK123456'
    return shipment

# When executed directly, serve the app on all interfaces, port 3002.
if __name__ == '__main__':
    app.run(host='0.0.0.0', port=3002)

Scheduled Events

Cron-based Webhooks

Schedule webhooks to run at specific intervals:

# Create a cron trigger in Hasura Console (Events tab):
# Name: daily-report-generation
# Webhook: http://nestjs:3001/webhooks/scheduled/daily-report
# Schedule: 0 9 * * * (9 AM daily)
# Payload: {"report_type": "daily_summary"}

/**
 * Scheduled webhook: generates the daily report, emails it to stakeholders
 * and stores it.
 *
 * @param payload Body of the scheduled delivery. It is not consulted: the
 *   report type is implied by the route. Hasura cron triggers nest the
 *   configured payload under `payload`, so a configured `report_type` would
 *   be read as `payload.payload.report_type`, not `payload.report_type`.
 * @returns A success envelope carrying the id of the generated report.
 * @throws Error when any step fails (signals the sender to retry).
 */
@Post('scheduled/daily-report')
async handleDailyReport(@Body() payload: any) {
  try {
    // Generate daily report
    const report = await this.reportService.generateDailyReport();

    // Send report to stakeholders
    await this.emailService.sendDailyReport(report);

    // Store report in database
    await this.reportService.saveReport(report);

    return {
      status: 'success',
      message: 'Daily report generated and sent',
      report_id: report.id
    };
  } catch (error) {
    // The caught value is not guaranteed to be an Error instance.
    const reason = error instanceof Error ? error.message : String(error);
    throw new Error(`Failed to generate daily report: ${reason}`);
  }
}

Common Scheduled Events

# Examples of scheduled webhooks:

# Daily cleanup (2 AM)
0 2 * * * -> /webhooks/scheduled/cleanup
- Delete expired sessions
- Archive old logs  
- Clean temporary files

# Weekly reports (Monday 9 AM)
0 9 * * 1 -> /webhooks/scheduled/weekly-report
- Generate usage reports
- Send to administrators
- Update dashboards

# Monthly billing (1st of month, midnight)
0 0 1 * * -> /webhooks/scheduled/billing
- Calculate usage charges
- Generate invoices
- Process payments

# Hourly health check
0 * * * * -> /webhooks/scheduled/health-check
- Check external services
- Verify database connections
- Send alerts if needed

External Webhooks

Receiving External Webhooks

Handle webhooks from external services such as payment providers and email services:

/**
 * Stripe webhook endpoint: checks the signature, then routes the event to
 * the handler for its type. Event types without a handler are logged and
 * still acknowledged.
 *
 * NOTE(review): the parsed body is handed to `verifyWebhook`; confirm that
 * helper does not need the raw request bytes to check the signature.
 *
 * @param payload Parsed event body; `type` selects the handler and
 *   `data.object` is passed to it.
 * @param signature Value of the `stripe-signature` request header.
 * @returns A success envelope.
 * @throws BadRequestException when the signature check fails.
 */
@Post('external/stripe')
async handleStripeWebhook(
  @Body() payload: any,
  @Headers('stripe-signature') signature: string,
) {
  // Refuse the delivery outright when the signature does not verify.
  if (!this.stripeService.verifyWebhook(payload, signature)) {
    throw new BadRequestException('Invalid webhook signature');
  }

  const eventType = payload.type;

  if (eventType === 'payment_intent.succeeded') {
    await this.handlePaymentSuccess(payload.data.object);
  } else if (eventType === 'payment_intent.payment_failed') {
    await this.handlePaymentFailure(payload.data.object);
  } else if (eventType === 'customer.subscription.created') {
    await this.handleSubscriptionCreated(payload.data.object);
  } else if (eventType === 'customer.subscription.deleted') {
    await this.handleSubscriptionCancelled(payload.data.object);
  } else {
    this.logger.log(`Unhandled Stripe event: ${eventType}`);
  }

  return { status: 'success' };
}

Webhook Security

// Verify webhook signatures
import crypto from 'crypto';

/**
 * Checks a hex-encoded HMAC-SHA256 signature against a payload.
 *
 * @param payload The exact string that was signed.
 * @param signature Hex digest supplied by the sender (without any prefix).
 * @param secret Shared secret used as the HMAC key.
 * @returns `true` only when `signature` equals the digest computed here;
 *   a signature of the wrong length yields `false` instead of throwing.
 */
function verifyWebhookSignature(
  payload: string,
  signature: string,
  secret: string
): boolean {
  const expectedSignature = crypto
    .createHmac('sha256', secret)
    .update(payload, 'utf8')
    .digest('hex');

  const provided = Buffer.from(signature ?? '', 'utf8');
  const expected = Buffer.from(expectedSignature, 'utf8');

  // timingSafeEqual throws a RangeError on unequal lengths, so reject
  // those up front. The digest length is public, so this leaks nothing.
  if (provided.length !== expected.length) {
    return false;
  }

  return crypto.timingSafeEqual(provided, expected);
}

/**
 * GitHub webhook endpoint: verifies the `x-hub-signature-256` header and
 * hands the event to `processGitHubEvent`.
 *
 * NOTE(review): the signature is checked against `JSON.stringify(payload)`,
 * i.e. a re-serialization of the parsed body. That matches the sender's
 * signature only if it is byte-identical to what was sent; verify against
 * the raw request body if deliveries are rejected unexpectedly.
 *
 * @param payload Parsed webhook body.
 * @param signature Header value in the form `sha256=<hex digest>`.
 * @throws UnauthorizedException when the secret is not configured, the
 *   header is missing or malformed, or the signature does not match.
 */
@Post('external/github')
async handleGitHubWebhook(
  @Body() payload: any,
  @Headers('x-hub-signature-256') signature: string,
) {
  const prefix = 'sha256=';
  const sha256HexLength = 64;
  const secret = process.env.GITHUB_WEBHOOK_SECRET;

  // Reject early instead of crashing on a missing header or secret.
  // The length check keeps a malformed digest from reaching the
  // constant-time comparison, which requires equal lengths.
  if (
    !secret ||
    typeof signature !== 'string' ||
    !signature.startsWith(prefix) ||
    signature.length !== prefix.length + sha256HexLength
  ) {
    throw new UnauthorizedException('Invalid webhook signature');
  }

  const isValid = verifyWebhookSignature(
    JSON.stringify(payload),
    signature.slice(prefix.length),
    secret
  );

  if (!isValid) {
    throw new UnauthorizedException('Invalid webhook signature');
  }

  // Process GitHub webhook
  await this.processGitHubEvent(payload);
}

Webhook Configuration

Hasura Event Trigger Configuration

# Event trigger configuration example:
{
  "name": "user_created_trigger",
  "definition": {
    "enable_manual": false,
    "insert": {
      "columns": "*"
    }
  },
  "retry_conf": {
    "num_retries": 3,
    "interval_sec": 10,
    "timeout_sec": 60
  },
  "webhook": "http://nestjs:3001/webhooks/user-created",
  "headers": [
    {
      "name": "Content-Type",
      "value": "application/json"
    },
    {
      "name": "X-Hasura-Admin-Secret",
      "value_from_env": "HASURA_GRAPHQL_ADMIN_SECRET"
    }
  ]
}

Webhook Headers and Authentication

# Configure webhook headers:
# Content-Type: application/json
# X-Hasura-Admin-Secret: {{ADMIN_SECRET}}
# X-Request-ID: {{REQUEST_ID}}
# Authorization: Bearer {{API_TOKEN}}

# Custom headers for external services:
{
  "name": "X-API-Key",
  "value_from_env": "EXTERNAL_SERVICE_API_KEY"
}

# Session variables in headers:
{
  "name": "X-User-ID", 
  "value": "{{SESSION_VARIABLES['x-hasura-user-id']}}"
}

Error Handling and Retries

Retry Configuration

# Hasura retry configuration:
{
  "retry_conf": {
    "num_retries": 3,      # Number of retry attempts
    "interval_sec": 10,    # Delay between retries (seconds)
    "timeout_sec": 60      # Timeout per request (seconds)
  }
}

# Retry schedule (fixed interval, no exponential backoff):
# Attempt 1: Immediate
# Retry 1: 10 seconds after attempt 1 fails
# Retry 2: 10 seconds after retry 1 fails
# Retry 3: 10 seconds after retry 2 fails

Error Response Handling

/**
 * Shows how the response decides whether the sender retries.
 *
 * - Success: normal return (2xx), no retry.
 * - `ValidationError`: logged and answered with a normal return (2xx),
 *   because retrying the same invalid input cannot succeed.
 * - `TemporaryError` or anything else: logged and rethrown as
 *   `InternalServerErrorException` (500), which asks for a retry.
 *
 * @param payload Event-trigger body, forwarded to `processUserCreation`.
 */
@Post('webhooks/user-created')
async handleUserCreated(@Body() payload: any) {
  try {
    await this.processUserCreation(payload);
  } catch (error) {
    // Permanent failure: acknowledge so the delivery is not repeated.
    if (error instanceof ValidationError) {
      this.logger.warn('Validation error:', error.message);
      return {
        status: 'error',
        message: 'Validation failed',
        retry: false
      };
    }

    // Known transient failure: surface its message with the 500.
    if (error instanceof TemporaryError) {
      this.logger.error('Temporary error:', error.message);
      throw new InternalServerErrorException(error.message);
    }

    // Anything unrecognised: log it in full and ask for a retry.
    this.logger.error('Unknown error:', error);
    throw new InternalServerErrorException('Processing failed');
  }

  // Reached only when processing completed without throwing.
  return { status: 'success' };
}

Webhook Testing and Debugging

Local Testing

# Test webhook locally
# Posts a sample INSERT event to the user-created handler.
# Replace "your-admin-secret" with the value of HASURA_GRAPHQL_ADMIN_SECRET;
# the handler compares the X-Hasura-Admin-Secret header against it.
# "old" is null because the sample describes a newly inserted row.
curl -X POST http://localhost:3001/webhooks/user-created \
  -H "Content-Type: application/json" \
  -H "X-Hasura-Admin-Secret: your-admin-secret" \
  -d '{
    "event": {
      "session_variables": {},
      "op": "INSERT",
      "data": {
        "old": null,
        "new": {
          "id": "123e4567-e89b-12d3-a456-426614174000",
          "email": "test@example.com",
          "name": "Test User",
          "created_at": "2024-01-01T00:00:00Z"
        }
      }
    },
    "created_at": "2024-01-01T00:00:00Z",
    "id": "webhook-event-id",
    "trigger": {"name": "user_created_trigger"},
    "table": {"schema": "public", "name": "users"}
  }'

Webhook Monitoring

# Monitor webhook performance
/**
 * Wraps user-creation processing with timing metrics.
 *
 * Records the elapsed milliseconds under the `user_created` metric on both
 * the success and the failure path. A failure is rethrown unchanged after
 * being recorded.
 *
 * @param payload Event-trigger body; `payload.id` is used in the log line.
 * @returns A success envelope when processing completes.
 */
@Post('webhooks/user-created')
async handleUserCreated(@Body() payload: any) {
  const startedAt = Date.now();
  const elapsedMs = () => Date.now() - startedAt;
  const webhookId = payload.id;

  try {
    this.logger.log(`Processing webhook ${webhookId}`);

    await this.processUserCreation(payload);

    this.metricsService.recordWebhookSuccess('user_created', elapsedMs());

    return { status: 'success' };
  } catch (error) {
    this.metricsService.recordWebhookError('user_created', elapsedMs(), error);

    throw error;
  }
}

Advanced Webhook Patterns

Webhook Aggregation

/**
 * Collects webhook events in memory and processes them in batches.
 *
 * A batch is flushed when the queue reaches `batchSize`, and additionally
 * by the cron job below. The queue is not persisted.
 */
@Injectable()
export class WebhookBatchProcessor {
  // Declared here because processBatch logs through it on failure.
  private readonly logger = new Logger(WebhookBatchProcessor.name);
  private eventQueue: any[] = [];
  private batchSize = 10;
  // Not read by the code below: the flush cadence comes from the @Cron expression.
  private batchTimeout = 5000; // 5 seconds

  /**
   * Queues one event and flushes immediately once a full batch is waiting.
   *
   * @param event The event to queue.
   */
  async addEvent(event: any) {
    this.eventQueue.push(event);

    if (this.eventQueue.length >= this.batchSize) {
      await this.processBatch();
    }
  }

  /**
   * Takes up to `batchSize` events off the front of the queue and processes
   * them together. Does nothing when the queue is empty.
   *
   * Events are removed before processing starts, so a failed batch is logged
   * and dropped unless it is explicitly re-queued.
   */
  @Cron('*/5 * * * * *') // Every 5 seconds
  async processBatch() {
    if (this.eventQueue.length === 0) return;

    const batch = this.eventQueue.splice(0, this.batchSize);

    try {
      await this.processBulkEvents(batch);
    } catch (error) {
      // Handle batch processing error
      this.logger.error('Batch processing failed:', error);
      // Optionally re-queue events
    }
  }

  private async processBulkEvents(events: any[]): Promise<void> {
    // Process the whole batch in one operation
    // Placeholder: implement per use case
  }
}

Webhook Transformation

/**
 * Fans an order-status change out to shipping, analytics and customer
 * notification.
 *
 * The event is first normalised, then each downstream call receives its own
 * reshaped copy. The three calls are started one after another and awaited
 * together; if any of them rejects, the handler rejects.
 *
 * @param payload Event-trigger body for the order row.
 */
@Post('webhooks/order-status-changed')
async handleOrderStatusChange(@Body() payload: any) {
  const normalized = this.transformHasuraEvent(payload);

  // Each line reshapes the event for one consumer and starts its call.
  const shippingCall = this.notifyShippingService(this.transformForShipping(normalized));
  const analyticsCall = this.updateAnalytics(this.transformForAnalytics(normalized));
  const emailCall = this.sendCustomerNotification(this.transformForEmail(normalized));

  await Promise.all([shippingCall, analyticsCall, emailCall]);
}

/**
 * Flattens an event-trigger payload into the internal order-status event.
 *
 * Reads the order id, status and customer id from the new row and the
 * previous status from the old row. `oldStatus` is `undefined` when there
 * is no old row.
 *
 * @param hasuraPayload Event-trigger body for the order row.
 * @returns The normalised `order.status.changed` event.
 */
private transformHasuraEvent(hasuraPayload: any) {
  const { old: previousRow, new: currentRow } = hasuraPayload.event.data;

  return {
    eventType: 'order.status.changed',
    timestamp: hasuraPayload.created_at,
    orderId: currentRow.id,
    oldStatus: previousRow?.status,
    newStatus: currentRow.status,
    customerId: currentRow.customer_id
  };
}

Best Practices

  • Idempotency: Handle duplicate webhook deliveries gracefully
  • Verify signatures: Always verify webhook authenticity
  • Fast responses: Process quickly, queue heavy work
  • Proper status codes: Use appropriate HTTP status codes for retries
  • Comprehensive logging: Log all webhook events for debugging
  • Monitor performance: Track webhook processing times and errors
  • Handle failures: Implement proper error handling and retries
  • Test thoroughly: Test webhooks with various payload scenarios
  • Document webhooks: Keep webhook schemas and behaviors documented
  • Security first: Implement proper authentication and rate limiting

Next Steps