React to database changes and external events with Hasura Event Triggers and webhook integrations for real-time data processing and external service notifications.
Webhooks in nself enable real-time reactions to database changes through Hasura Event Triggers, scheduled events, and external webhook integrations. This allows for automatic processing, notifications, and data synchronization.
Automatically trigger webhooks when database rows are inserted, updated, or deleted:
# Event triggers fire on:
# - INSERT operations (new records)
# - UPDATE operations (modified records)
# - DELETE operations (removed records)
# - Manual events (triggered via GraphQL)
# Common use cases:
# - Send welcome email when user registers
# - Update search index when content changes
# - Sync data with external systems
# - Generate notifications
# - Audit logging
# - Cache invalidation
Configure event triggers in the Hasura Console, then handle the delivered events in a webhook endpoint such as the NestJS controller below:
// functions/nestjs/src/webhooks/user.controller.ts
import { Controller, Post, Body, Headers, Logger } from '@nestjs/common';
import { EmailService } from '../services/email.service';
import { AnalyticsService } from '../services/analytics.service';
/**
 * JSON body Hasura POSTs to an event-trigger webhook.
 */
interface EventTriggerPayload {
  /** The change that fired the trigger. */
  event: {
    /** Session variables of the request that caused the change. */
    session_variables: Record<string, any>;
    /**
     * Operation that produced the event. 'MANUAL' marks an event that was
     * invoked by hand instead of by a row change.
     */
    op: 'INSERT' | 'UPDATE' | 'DELETE' | 'MANUAL';
    data: {
      /** Row before the change; null when there is no previous row (e.g. INSERT). */
      old: any;
      /** Row after the change; null when the row no longer exists (DELETE). */
      new: any;
    };
  };
  /** Timestamp at which the event was created. */
  created_at: string;
  /** Unique id of this event. */
  id: string;
  /** Retry bookkeeping for this delivery attempt. */
  delivery_info: {
    max_retries: number;
    current_retry: number;
  };
  /** The trigger that produced the event. */
  trigger: {
    name: string;
  };
  /** The table the trigger is attached to. */
  table: {
    schema: string;
    name: string;
  };
}
/**
 * Receives Hasura event-trigger deliveries for user rows.
 *
 * Both endpoints require the shared admin secret in the
 * `x-hasura-admin-secret` header and do no work for requests without it.
 */
@Controller('webhooks')
export class UserWebhookController {
  private readonly logger = new Logger(UserWebhookController.name);

  constructor(
    private readonly emailService: EmailService,
    private readonly analyticsService: AnalyticsService,
  ) {}

  /**
   * Handles the event fired when a user row is inserted: sends the welcome
   * email, records the registration in analytics and creates the default
   * profile.
   *
   * @param payload Event body delivered by Hasura.
   * @param adminSecret Value of the `x-hasura-admin-secret` header.
   * @returns A status object; `unauthorized` when the secret does not match.
   * @throws Error when any processing step fails, so the delivery is retried.
   */
  @Post('user-created')
  async handleUserCreated(
    @Body() payload: EventTriggerPayload,
    @Headers('x-hasura-admin-secret') adminSecret: string,
  ) {
    // Verify webhook authenticity
    // NOTE(review): this still answers with a success status code; consider
    // throwing UnauthorizedException so the caller sees a 401.
    if (!this.isAuthorized(adminSecret)) {
      this.logger.warn('Invalid admin secret in webhook request');
      return { status: 'unauthorized' };
    }

    try {
      const { event } = payload;
      const newUser = event.data.new;
      this.logger.log(`Processing user created event for: ${newUser.email}`);

      // Send welcome email
      await this.emailService.sendWelcomeEmail({
        email: newUser.email,
        name: newUser.name,
        userId: newUser.id,
      });

      // Track user registration in analytics
      await this.analyticsService.trackEvent({
        event: 'user_registered',
        userId: newUser.id,
        properties: {
          email: newUser.email,
          name: newUser.name,
          registration_method: newUser.registration_method || 'email',
        },
      });

      // Create user profile
      await this.createDefaultUserProfile(newUser);

      this.logger.log(`Successfully processed user created event for: ${newUser.email}`);
      return {
        status: 'success',
        message: 'User created event processed successfully',
      };
    } catch (error) {
      this.logger.error('Error processing user created event:', error);
      // A thrown value is not guaranteed to be an Error instance.
      const reason = error instanceof Error ? error.message : String(error);
      // Return error to trigger retry
      throw new Error(`Failed to process user created event: ${reason}`);
    }
  }

  /**
   * Handles the event fired when a user row is updated and reacts to the
   * columns that changed (email -> verification mail, name -> external sync).
   *
   * @param payload Event body delivered by Hasura.
   * @param adminSecret Value of the `x-hasura-admin-secret` header.
   * @returns A status object; `unauthorized` when the secret does not match.
   */
  @Post('user-updated')
  async handleUserUpdated(
    @Body() payload: EventTriggerPayload,
    @Headers('x-hasura-admin-secret') adminSecret?: string,
  ) {
    // Same authenticity check as the user-created endpoint.
    if (!this.isAuthorized(adminSecret)) {
      this.logger.warn('Invalid admin secret in webhook request');
      return { status: 'unauthorized' };
    }

    const { old: oldUser, new: newUser } = payload.event.data;

    // Check what fields changed
    const changedFields = this.getChangedFields(oldUser, newUser);

    if (changedFields.includes('email')) {
      // Email changed - send verification email
      await this.emailService.sendEmailVerification(newUser.email, newUser.id);
    }

    if (changedFields.includes('name')) {
      // Update external services with new name
      await this.syncUserNameToExternalServices(newUser);
    }

    return { status: 'success' };
  }

  /**
   * Compares the received header with HASURA_GRAPHQL_ADMIN_SECRET.
   * Returns false when either value is missing: without that guard an unset
   * environment variable would "match" a request that sends no header.
   */
  private isAuthorized(adminSecret: string | undefined): boolean {
    const expected = process.env.HASURA_GRAPHQL_ADMIN_SECRET;
    if (!expected || !adminSecret) {
      return false;
    }
    return adminSecret === expected;
  }

  private async createDefaultUserProfile(user: any) {
    // Create default user profile
    // This could be a GraphQL mutation or direct database insert
  }

  private async syncUserNameToExternalServices(user: any) {
    // Push the new name to whichever external systems keep a copy of it
  }

  /**
   * Lists the keys of `newData` whose value differs from `oldData`.
   * A null/undefined side is treated as an empty row, so every key of the
   * other side counts as changed instead of raising a TypeError.
   * Values are compared with `!==`, so object-valued columns always count
   * as changed.
   */
  private getChangedFields(oldData: any, newData: any): string[] {
    const before = oldData ?? {};
    const after = newData ?? {};
    return Object.keys(after).filter((key) => before[key] !== after[key]);
  }
}
# functions/python/webhooks/order_processing.py
from flask import Flask, request, jsonify
import os
import logging
from typing import Dict, Any
import requests
# Flask application object; the webhook routes in this module register on it
app = Flask(__name__)
# Configure root logging at INFO level
logging.basicConfig(level=logging.INFO)
# Module-level logger used by the handlers below
logger = logging.getLogger(__name__)
@app.route('/webhooks/order-completed', methods=['POST'])
def handle_order_completed():
    """Handle the event-trigger delivery for a completed order.

    Returns a Flask JSON response:
      * 200 when the order was processed,
      * 401 when the admin-secret header is missing or does not match,
      * 400 when the body is not a usable event payload,
      * 500 when processing fails (a non-2xx answer makes the sender retry).
    """
    import hmac  # local import: only this handler needs it

    try:
        # Verify webhook authenticity
        expected_secret = os.getenv('HASURA_GRAPHQL_ADMIN_SECRET')
        admin_secret = request.headers.get('x-hasura-admin-secret')
        # Reject when either side is missing: None != None is False, so a
        # plain comparison would accept header-less requests whenever the
        # environment variable is unset.
        if (
            not expected_secret
            or not admin_secret
            or not hmac.compare_digest(
                admin_secret.encode('utf-8'), expected_secret.encode('utf-8')
            )
        ):
            return jsonify({'status': 'unauthorized'}), 401

        # silent=True yields None instead of raising on a non-JSON body
        payload = request.get_json(silent=True)
        try:
            new_order = payload['event']['data']['new']
            order_id = new_order['id']
        except (KeyError, TypeError):
            return jsonify({
                'status': 'error',
                'message': 'Malformed event payload'
            }), 400

        logger.info(f"Processing order completed: {order_id}")

        # Process the order
        result = process_order_completion(new_order)

        if result['success']:
            return jsonify({
                'status': 'success',
                'message': 'Order processed successfully'
            })

        # Return error to trigger retry
        return jsonify({
            'status': 'error',
            'message': result['error']
        }), 500
    except Exception as e:
        # logger.exception keeps the traceback alongside the message
        logger.exception(f"Error processing order webhook: {str(e)}")
        return jsonify({
            'status': 'error',
            'message': 'Failed to process order'
        }), 500
def process_order_completion(order: Dict[str, Any]) -> Dict[str, Any]:
    """Run the post-completion steps for an order, in sequence.

    Args:
        order: The order row; ``order['items']`` is passed to the inventory
            update.

    Returns:
        ``{'success': True}`` when every step finished, otherwise
        ``{'success': False, 'error': <message>}``. Exceptions are not
        propagated.

    NOTE(review): ``notify_fulfillment_service`` and ``track_order_completion``
    are not defined in the visible part of this module - confirm they exist.
    """
    try:
        # Send order confirmation email
        send_order_confirmation_email(order)

        # Update inventory
        update_inventory(order['items'])

        # Create shipment
        shipment = create_shipment(order)

        # Send to fulfillment service
        notify_fulfillment_service(order, shipment)

        # Update analytics
        track_order_completion(order)

        return {'success': True}
    except Exception as e:
        # logger.exception records the traceback as well as the message
        logger.exception(f"Order processing failed: {str(e)}")
        return {'success': False, 'error': str(e)}
def send_order_confirmation_email(order: Dict[str, Any]):
    """Placeholder: send the confirmation email for ``order``.

    Currently does nothing and returns None.
    """
    # Send email using your email service
    pass
def update_inventory(items: list):
    """Placeholder: adjust product inventory for the given order items.

    Currently does nothing and returns None.
    """
    # Update product inventory
    pass
def create_shipment(order: Dict[str, Any]):
    """Placeholder: create a shipment record for ``order``.

    Returns a fixed sample shipment with ``id`` and ``tracking_number`` keys;
    the argument is not read yet.
    """
    # Create shipment record
    return {'id': 'ship_123', 'tracking_number': 'TRK123456'}
# Start the development server only when this file is executed directly.
if __name__ == '__main__':
    # 0.0.0.0 listens on every interface; the port is fixed at 3002
    app.run(host='0.0.0.0', port=3002)
Schedule webhooks to run at specific intervals:
# Create scheduled event in Hasura Console:
# Name: daily-report-generation
# Webhook: http://nestjs:3001/webhooks/scheduled/daily-report
# Schedule: 0 9 * * * (9 AM daily; Hasura evaluates cron schedules in UTC)
# Payload: {"report_type": "daily_summary"}
/**
 * Scheduled endpoint: generates the daily report, emails it and stores it.
 *
 * @param payload Body POSTed by the scheduler.
 * @returns Status object carrying the id of the generated report.
 * @throws Error when any step fails, so the scheduler sees a failed run.
 */
@Post('scheduled/daily-report')
async handleDailyReport(@Body() payload: any) {
  try {
    // NOTE(review): Hasura cron triggers wrap the configured payload, so the
    // report type would arrive as payload.payload.report_type rather than
    // payload.report_type. Nothing below consumes it, so the unused read of
    // the wrong field was removed.

    // Generate daily report
    const report = await this.reportService.generateDailyReport();

    // Send report to stakeholders
    await this.emailService.sendDailyReport(report);

    // Store report in database
    await this.reportService.saveReport(report);

    return {
      status: 'success',
      message: 'Daily report generated and sent',
      report_id: report.id
    };
  } catch (error) {
    // A thrown value is not guaranteed to be an Error instance.
    const reason = error instanceof Error ? error.message : String(error);
    throw new Error(`Failed to generate daily report: ${reason}`);
  }
}
# Examples of scheduled webhooks:
# Daily cleanup (2 AM)
0 2 * * * -> /webhooks/scheduled/cleanup
- Delete expired sessions
- Archive old logs
- Clean temporary files
# Weekly reports (Monday 9 AM)
0 9 * * 1 -> /webhooks/scheduled/weekly-report
- Generate usage reports
- Send to administrators
- Update dashboards
# Monthly billing (1st of month, midnight)
0 0 1 * * -> /webhooks/scheduled/billing
- Calculate usage charges
- Generate invoices
- Process payments
# Hourly health check
0 * * * * -> /webhooks/scheduled/health-check
- Check external services
- Verify database connections
- Send alerts if needed
Handle webhooks from external services such as payment providers and email services:
// Stripe webhook endpoint: verify the signature, then route by event type
@Post('external/stripe')
async handleStripeWebhook(
  @Body() payload: any,
  @Headers('stripe-signature') signature: string,
) {
  // Refuse anything the Stripe service cannot verify.
  // NOTE(review): signature schemes are normally computed over the raw request
  // body; confirm verifyWebhook works with the parsed body it receives here.
  if (!this.stripeService.verifyWebhook(payload, signature)) {
    throw new BadRequestException('Invalid webhook signature');
  }

  const stripeEvent = payload;

  // Event type -> handler. A Map (not a plain object) so that only the keys
  // listed here can ever match.
  const routes = new Map<string, (subject: any) => unknown>([
    ['payment_intent.succeeded', (subject) => this.handlePaymentSuccess(subject)],
    ['payment_intent.payment_failed', (subject) => this.handlePaymentFailure(subject)],
    ['customer.subscription.created', (subject) => this.handleSubscriptionCreated(subject)],
    ['customer.subscription.deleted', (subject) => this.handleSubscriptionCancelled(subject)],
  ]);

  const route = routes.get(stripeEvent.type);
  if (route) {
    await route(stripeEvent.data.object);
  } else {
    // Unknown types are logged and still acknowledged.
    this.logger.log(`Unhandled Stripe event: ${stripeEvent.type}`);
  }

  return { status: 'success' };
}
// Verify webhook signatures
import crypto from 'crypto';
/**
 * Checks a hex-encoded HMAC-SHA256 signature against a payload.
 *
 * @param payload Exact text that was signed.
 * @param signature Hex digest received from the sender.
 * @param secret Shared secret used as the HMAC key.
 * @returns true only when `signature` equals the digest computed locally;
 *   false for a missing, empty, wrong-length or non-matching signature.
 */
function verifyWebhookSignature(
  payload: string,
  signature: string,
  secret: string
): boolean {
  // A missing header reaches this function as undefined at runtime.
  if (typeof signature !== 'string' || signature.length === 0) {
    return false;
  }

  const expectedSignature = crypto
    .createHmac('sha256', secret)
    .update(payload, 'utf8')
    .digest('hex');

  const received = Buffer.from(signature);
  const expected = Buffer.from(expectedSignature);

  // timingSafeEqual throws a RangeError when the lengths differ, which would
  // turn a bad signature into an unhandled error instead of a rejection.
  if (received.length !== expected.length) {
    return false;
  }

  return crypto.timingSafeEqual(received, expected);
}
// GitHub webhook verification
/**
 * Verifies the `x-hub-signature-256` header and processes the event.
 *
 * @param payload Parsed request body.
 * @param signature Header value, expected in the form `sha256=<hex digest>`.
 * @throws UnauthorizedException when the header is missing or malformed, the
 *   secret is not configured, or the signature does not match.
 */
@Post('external/github')
async handleGitHubWebhook(
  @Body() payload: any,
  @Headers('x-hub-signature-256') signature: string,
) {
  const signaturePrefix = 'sha256=';
  const secret = process.env.GITHUB_WEBHOOK_SECRET;

  // A missing header, a different scheme, or an unset secret can never
  // verify; reject up front instead of failing inside replace()/createHmac().
  if (
    !secret ||
    typeof signature !== 'string' ||
    !signature.startsWith(signaturePrefix)
  ) {
    throw new UnauthorizedException('Invalid webhook signature');
  }

  // NOTE(review): the digest is computed over a re-serialized body, which
  // only matches if it is byte-identical to what the sender signed - confirm,
  // or verify against the raw request body instead.
  const isValid = verifyWebhookSignature(
    JSON.stringify(payload),
    signature.slice(signaturePrefix.length),
    secret
  );

  if (!isValid) {
    throw new UnauthorizedException('Invalid webhook signature');
  }

  // Process GitHub webhook
  await this.processGitHubEvent(payload);
}
# Event trigger configuration example:
{
"name": "user_created_trigger",
"definition": {
"enable_manual": false,
"insert": {
"columns": "*"
},
"retry_conf": {
"num_retries": 3,
"interval_sec": 10,
"timeout_sec": 60
},
"webhook": "http://nestjs:3001/webhooks/user-created",
"headers": [
{
"name": "Content-Type",
"value": "application/json"
},
{
"name": "X-Hasura-Admin-Secret",
"value_from_env": "HASURA_GRAPHQL_ADMIN_SECRET"
}
]
}
}
# Configure webhook headers:
# Content-Type: application/json
# X-Hasura-Admin-Secret: {{ADMIN_SECRET}}
# X-Request-ID: {{REQUEST_ID}}
# Authorization: Bearer {{API_TOKEN}}
# Custom headers for external services:
{
"name": "X-API-Key",
"value_from_env": "EXTERNAL_SERVICE_API_KEY"
}
# Session variables in headers:
{
"name": "X-User-ID",
"value": "{{SESSION_VARIABLES['x-hasura-user-id']}}"
}
# Hasura retry configuration:
{
"retry_conf": {
"num_retries": 3, # Number of retry attempts
"interval_sec": 10, # Delay between retries (seconds)
"timeout_sec": 60 # Timeout per request (seconds)
}
}
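# Retry schedule (fixed interval: Hasura waits interval_sec between attempts, with no exponential backoff):
# Attempt 1: Immediate
# Attempt 2 (retry 1): 10 seconds after attempt 1 fails
# Attempt 3 (retry 2): 10 seconds after attempt 2 fails
# Attempt 4 (retry 3): 10 seconds after attempt 3 fails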
// Choosing the response so that only retryable failures are retried
@Post('webhooks/user-created')
async handleUserCreated(@Body() payload: any) {
  try {
    await this.processUserCreation(payload);
  } catch (failure) {
    if (failure instanceof ValidationError) {
      // Retrying cannot fix invalid data: acknowledge the delivery and
      // report the problem in the response body instead.
      this.logger.warn('Validation error:', failure.message);
      return {
        status: 'error',
        message: 'Validation failed',
        retry: false
      };
    }

    // Everything else answers with a server error so the delivery is retried.
    let reason = 'Processing failed';
    if (failure instanceof TemporaryError) {
      this.logger.error('Temporary error:', failure.message);
      reason = failure.message;
    } else {
      // Unknown error - log the whole value
      this.logger.error('Unknown error:', failure);
    }
    throw new InternalServerErrorException(reason);
  }

  // Reached only when processing finished without throwing
  return { status: 'success' };
}
# Test webhook locally
# Posts a sample INSERT event for the users table to the user-created endpoint.
# Replace your-admin-secret with the value of HASURA_GRAPHQL_ADMIN_SECRET that
# the handler compares the X-Hasura-Admin-Secret header against.
curl -X POST http://localhost:3001/webhooks/user-created \
-H "Content-Type: application/json" \
-H "X-Hasura-Admin-Secret: your-admin-secret" \
-d '{
"event": {
"session_variables": {},
"op": "INSERT",
"data": {
"old": null,
"new": {
"id": "123e4567-e89b-12d3-a456-426614174000",
"email": "test@example.com",
"name": "Test User",
"created_at": "2024-01-01T00:00:00Z"
}
}
},
"created_at": "2024-01-01T00:00:00Z",
"id": "webhook-event-id",
"trigger": {"name": "user_created_trigger"},
"table": {"schema": "public", "name": "users"}
}'
# Monitor webhook performance
/**
 * Processes a user-created delivery and records how long it took, for both
 * the success and the failure outcome. Failures are re-thrown unchanged.
 */
@Post('webhooks/user-created')
async handleUserCreated(@Body() payload: any) {
  const startedAt = Date.now();
  const webhookId = payload.id;
  // Milliseconds elapsed since the handler started, measured when called
  const elapsedMs = () => Date.now() - startedAt;

  try {
    this.logger.log(`Processing webhook ${webhookId}`);
    await this.processUserCreation(payload);

    this.metricsService.recordWebhookSuccess('user_created', elapsedMs());
    return { status: 'success' };
  } catch (failure) {
    this.metricsService.recordWebhookError('user_created', elapsedMs(), failure);
    throw failure;
  }
}
// Batch process multiple events
/**
 * Buffers webhook events in memory and hands them to `processBulkEvents` in
 * groups of at most `batchSize`.
 *
 * A batch is flushed as soon as the queue reaches `batchSize`, and also on
 * every cron tick. The queue lives only in this instance, and a batch whose
 * processing fails is logged and not put back.
 */
@Injectable()
export class WebhookBatchProcessor {
  // Minimal logger so the catch block below cannot itself throw on an
  // undefined `this.logger`; swap in the framework logger if preferred.
  private readonly logger: Pick<Console, 'error'> = console;
  private eventQueue: any[] = [];
  private batchSize = 10;
  // Not read by this class; the flush interval comes from the cron expression
  private batchTimeout = 5000; // 5 seconds

  /** Queues one event and flushes immediately once a full batch is waiting. */
  async addEvent(event: any) {
    this.eventQueue.push(event);

    if (this.eventQueue.length >= this.batchSize) {
      await this.processBatch();
    }
  }

  /** Takes up to `batchSize` events off the front of the queue and processes them. */
  @Cron('*/5 * * * * *') // Every 5 seconds
  async processBatch() {
    if (this.eventQueue.length === 0) return;

    // splice removes the events synchronously, so overlapping runs never
    // pick up the same event twice.
    const batch = this.eventQueue.splice(0, this.batchSize);

    try {
      await this.processBulkEvents(batch);
    } catch (error) {
      // Handle batch processing error
      this.logger.error('Batch processing failed:', error);
      // Optionally re-queue events
    }
  }

  /** Placeholder for the bulk handling of one batch. */
  private async processBulkEvents(batch: any[]): Promise<void> {
    // Handle the whole batch in one operation (bulk insert, single API call, ...)
  }
}
// Reshape the incoming event once, then fan it out to each consumer
@Post('webhooks/order-status-changed')
async handleOrderStatusChange(@Body() payload: any) {
  // Hasura event -> internal standard format
  const orderEvent = this.transformHasuraEvent(payload);

  // Start each notification with the shape its target expects. They are
  // started in this order and run concurrently.
  const shippingNotice = this.notifyShippingService(this.transformForShipping(orderEvent));
  const analyticsUpdate = this.updateAnalytics(this.transformForAnalytics(orderEvent));
  const customerEmail = this.sendCustomerNotification(this.transformForEmail(orderEvent));

  // Wait for all three; the first rejection rejects the handler.
  await Promise.all([shippingNotice, analyticsUpdate, customerEmail]);
}
/**
 * Flattens a Hasura event payload into the internal order-status event.
 * `oldStatus` is undefined when the payload carries no previous row.
 */
private transformHasuraEvent(hasuraPayload: any) {
  const { old: previousRow, new: currentRow } = hasuraPayload.event.data;

  return {
    eventType: 'order.status.changed',
    timestamp: hasuraPayload.created_at,
    orderId: currentRow.id,
    oldStatus: previousRow?.status,
    newStatus: currentRow.status,
    customerId: currentRow.customer_id
  };
}