WebSocket Events Architecture: Real-Time Updates at Scale
Building a comprehensive WebSocket event system for real-time workflow updates - from pattern-based subscriptions to organization isolation.
When you're building a workflow orchestration platform, users want to see their processes update in real-time. They want to watch clusters spin up, see steps complete, and get immediate feedback when something goes wrong. Traditional polling feels sluggish and wastes resources.
We needed a WebSocket event system that could handle thousands of concurrent connections, route events efficiently, and maintain strict organization isolation. Here's how we built it.
The Polling Problem
Our first implementation used simple HTTP polling. Every 5 seconds, the frontend would ask: "Any updates for workflow X?" This approach had obvious problems:
- High server load from constant polling
- Delayed updates (up to 5 seconds lag)
- Wasted bandwidth on empty responses
- Poor user experience with stale data
- Memory leaks from abandoned connections
- Zero organization isolation
We needed something much more sophisticated.
Event-Driven Architecture Foundation
We started with a proper event architecture. Every significant action in the system publishes an event:
Every event carries the organization ID for security isolation and implements a consistent interface for routing.
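The event interface itself isn't shown here, so the following is a minimal sketch of its shape: the `Event` interface with `Type`, `OrganizationID`, and `Payload` methods, plus one concrete event type. The name `StepCompletedEvent` and its fields are illustrative, not our exact production types. The `Payload` method returns a generic map so different event types can include relevant data without changing the interface.

```go
package main

import "fmt"

// Event is the interface every published event implements.
// OrganizationID is mandatory: every event belongs to exactly one
// organization, and that field drives multi-tenant isolation.
type Event interface {
	Type() string                    // dotted event name, e.g. "workflow.step.completed"
	OrganizationID() string          // owning organization, used for routing isolation
	Payload() map[string]interface{} // generic payload; new event types need no interface change
}

// StepCompletedEvent is one concrete event type (illustrative name).
type StepCompletedEvent struct {
	OrgID      string
	WorkflowID string
	StepID     string
}

func (e StepCompletedEvent) Type() string           { return "workflow.step.completed" }
func (e StepCompletedEvent) OrganizationID() string { return e.OrgID }
func (e StepCompletedEvent) Payload() map[string]interface{} {
	return map[string]interface{}{
		"workflow_id": e.WorkflowID,
		"step_id":     e.StepID,
	}
}

func main() {
	var ev Event = StepCompletedEvent{OrgID: "org-1", WorkflowID: "wf-123", StepID: "step-456"}
	fmt.Println(ev.Type(), ev.OrganizationID())
}
```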
WebSocket Connection Management
We built a connection hub that manages WebSocket lifecycles and subscriptions:
```go
type Connection struct {
	ID             string
	OrganizationID string
	UserID         string
	Conn           *websocket.Conn
	Send           chan []byte
	Subscriptions  map[string]*Subscription
	hub            *Hub
}

type Subscription struct {
	Pattern        string
	OrganizationID string
	Connection     *Connection
}

type Hub struct {
	connections   map[string]*Connection
	subscriptions map[string][]*Subscription // pattern -> subscriptions
	register      chan *Connection
	unregister    chan *Connection
	broadcast     chan Event
	mu            sync.RWMutex
}
```
The hub runs in its own goroutine, handling connection lifecycle and event distribution.
The Hub Event Loop
The heart of our system is a simple event loop that processes connections and events:
```go
func (h *Hub) Run() {
	for {
		select {
		case conn := <-h.register:
			h.registerConnection(conn)

		case conn := <-h.unregister:
			h.unregisterConnection(conn)

		case event := <-h.broadcast:
			h.broadcastEvent(event)
		}
	}
}

func (h *Hub) broadcastEvent(event Event) {
	eventType := event.Type()
	orgID := event.OrganizationID()

	h.mu.RLock()
	defer h.mu.RUnlock()

	for pattern, subscriptions := range h.subscriptions {
		if !h.matchesPattern(eventType, pattern) {
			continue
		}

		for _, sub := range subscriptions {
			// Organization isolation
			if sub.OrganizationID != orgID {
				continue
			}

			// Send to connection
			select {
			case sub.Connection.Send <- h.encodeEvent(event):
			default:
				// Connection is slow/blocked, drop the event
				log.Printf("Dropping event for slow connection %s", sub.Connection.ID)
			}
		}
	}
}
```
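`Run` calls `registerConnection` and `unregisterConnection`, which aren't shown above. The sketch below is a simplified, self-contained version of what they do: registration adds the connection to the hub's map; unregistration removes the connection, strips its subscriptions from every pattern bucket, and closes the `Send` channel so the write pump exits. The types are trimmed to the minimum needed to run; our production versions also update metrics.

```go
package main

import (
	"fmt"
	"sync"
)

type Connection struct {
	ID            string
	Send          chan []byte
	Subscriptions map[string]*Subscription
}

type Subscription struct {
	Pattern    string
	Connection *Connection
}

type Hub struct {
	mu            sync.RWMutex
	connections   map[string]*Connection
	subscriptions map[string][]*Subscription
}

// registerConnection adds the connection to the hub's registry.
func (h *Hub) registerConnection(c *Connection) {
	h.mu.Lock()
	defer h.mu.Unlock()
	h.connections[c.ID] = c
}

// unregisterConnection removes the connection and every subscription it
// holds, then closes its Send channel so the write pump terminates.
func (h *Hub) unregisterConnection(c *Connection) {
	h.mu.Lock()
	defer h.mu.Unlock()
	if _, ok := h.connections[c.ID]; !ok {
		return // already unregistered; avoid double-closing Send
	}
	delete(h.connections, c.ID)
	for pattern := range c.Subscriptions {
		subs := h.subscriptions[pattern]
		filtered := subs[:0] // filter in place
		for _, s := range subs {
			if s.Connection != c {
				filtered = append(filtered, s)
			}
		}
		if len(filtered) == 0 {
			delete(h.subscriptions, pattern)
		} else {
			h.subscriptions[pattern] = filtered
		}
	}
	close(c.Send)
}

func main() {
	h := &Hub{connections: map[string]*Connection{}, subscriptions: map[string][]*Subscription{}}
	c := &Connection{ID: "c1", Send: make(chan []byte, 1), Subscriptions: map[string]*Subscription{}}
	h.registerConnection(c)
	sub := &Subscription{Pattern: "workflow.*", Connection: c}
	c.Subscriptions["workflow.*"] = sub
	h.subscriptions["workflow.*"] = []*Subscription{sub}
	h.unregisterConnection(c)
	fmt.Println(len(h.connections), len(h.subscriptions)) // 0 0
}
```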
Pattern Matching
We implemented a simple but effective pattern matching system:
```go
func (h *Hub) matchesPattern(eventType, pattern string) bool {
	eventParts := strings.Split(eventType, ".")
	patternParts := strings.Split(pattern, ".")

	if len(eventParts) != len(patternParts) {
		return false
	}

	for i, patternPart := range patternParts {
		if patternPart != "*" && patternPart != eventParts[i] {
			return false
		}
	}

	return true
}

// Examples:
// matchesPattern("workflow.step.started", "workflow.*.*")    -> true
// matchesPattern("workflow.step.started", "workflow.step.*") -> true
// matchesPattern("user.login", "workflow.*")                 -> false
```
Clients can subscribe to specific events or use wildcards for broader categories.
Client Subscription Protocol
Clients communicate with the hub through a simple JSON protocol:
```jsonc
// Subscribe to events
{
  "type": "subscribe",
  "pattern": "workflow.${workflowId}.*" // All events for a specific workflow
}

// Unsubscribe from a pattern
{
  "type": "unsubscribe",
  "pattern": "workflow.*.*"
}

// Server sends events as:
{
  "type": "event",
  "event_type": "workflow.step.completed",
  "timestamp": "2025-07-09T10:30:00Z",
  "payload": {
    "workflow_id": "wf-123",
    "step_id": "step-456"
  }
}
```
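On the server side, `readPump` (shown in the next section) hands each decoded message to `handleClientMessage`, which isn't shown in the post. A simplified sketch of that dispatch, with hub registration and error handling trimmed:

```go
package main

import (
	"encoding/json"
	"fmt"
)

// ClientMessage mirrors the JSON protocol above.
type ClientMessage struct {
	Type    string `json:"type"`
	Pattern string `json:"pattern"`
}

// Connection holds the patterns this client is subscribed to
// (simplified to a set; the real version creates Subscription objects
// and registers them with the hub).
type Connection struct {
	Subscriptions map[string]bool
}

func (c *Connection) handleClientMessage(msg ClientMessage) {
	switch msg.Type {
	case "subscribe":
		c.Subscriptions[msg.Pattern] = true
	case "unsubscribe":
		delete(c.Subscriptions, msg.Pattern)
	default:
		// Unknown message types are ignored rather than closing the connection.
	}
}

func main() {
	c := &Connection{Subscriptions: map[string]bool{}}
	var msg ClientMessage
	json.Unmarshal([]byte(`{"type":"subscribe","pattern":"workflow.wf-123.*"}`), &msg)
	c.handleClientMessage(msg)
	fmt.Println(c.Subscriptions["workflow.wf-123.*"]) // true
}
```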
Connection Lifecycle
Each WebSocket connection runs in its own goroutine with proper cleanup:
```go
func (c *Connection) handleConnection() {
	defer func() {
		c.hub.unregister <- c
		c.Conn.Close()
	}()

	// Start writer goroutine
	go c.writePump()

	// Read messages from client
	c.readPump()
}

func (c *Connection) readPump() {
	defer c.Conn.Close()

	// Set read deadline
	c.Conn.SetReadDeadline(time.Now().Add(60 * time.Second))

	// Handle pong messages to reset deadline
	c.Conn.SetPongHandler(func(string) error {
		c.Conn.SetReadDeadline(time.Now().Add(60 * time.Second))
		return nil
	})

	for {
		var msg ClientMessage
		err := c.Conn.ReadJSON(&msg)
		if err != nil {
			if websocket.IsUnexpectedCloseError(err, websocket.CloseGoingAway, websocket.CloseAbnormalClosure) {
				log.Printf("WebSocket error: %v", err)
			}
			break
		}

		c.handleClientMessage(msg)
	}
}

func (c *Connection) writePump() {
	// Ping slightly more often than the 60s read deadline, so a healthy
	// peer's pong always arrives in time to reset it.
	ticker := time.NewTicker(54 * time.Second)
	defer ticker.Stop()

	for {
		select {
		case message, ok := <-c.Send:
			if !ok {
				// Hub closed the channel; tell the peer we're done.
				c.Conn.WriteMessage(websocket.CloseMessage, []byte{})
				return
			}

			c.Conn.SetWriteDeadline(time.Now().Add(10 * time.Second))
			if err := c.Conn.WriteMessage(websocket.TextMessage, message); err != nil {
				return
			}

		case <-ticker.C:
			// Send ping to keep connection alive
			c.Conn.SetWriteDeadline(time.Now().Add(10 * time.Second))
			if err := c.Conn.WriteMessage(websocket.PingMessage, nil); err != nil {
				return
			}
		}
	}
}
```
Performance Optimizations
With thousands of concurrent connections, performance matters:
Connection Pooling: We reuse goroutines instead of creating new ones for each message.
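The pooling code isn't shown in the post; one common shape for it in Go is a fixed set of worker goroutines draining a shared job channel, rather than one goroutine per message. A standalone sketch under that assumption:

```go
package main

import (
	"fmt"
	"sync"
)

// dispatch fans jobs out to a fixed pool of workers instead of
// spawning a fresh goroutine for every message.
func dispatch(jobs []int, workers int, handle func(int) int) []int {
	in := make(chan int)
	out := make(chan int)
	var wg sync.WaitGroup

	// Fixed worker pool: these goroutines are reused for every job.
	for i := 0; i < workers; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for j := range in {
				out <- handle(j)
			}
		}()
	}

	// Feed jobs, then close the input so workers exit.
	go func() {
		for _, j := range jobs {
			in <- j
		}
		close(in)
	}()

	// Close the output once all workers are done.
	go func() {
		wg.Wait()
		close(out)
	}()

	var results []int
	for r := range out {
		results = append(results, r)
	}
	return results
}

func main() {
	results := dispatch([]int{1, 2, 3, 4}, 2, func(n int) int { return n * n })
	sum := 0
	for _, r := range results {
		sum += r
	}
	fmt.Println(sum) // 30
}
```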
Event Buffering: Each connection's send channel is buffered, so slow consumers absorb bursts without blocking the hub:
```go
type Connection struct {
	Send chan []byte // Buffered channel
	// ...
}

// When creating connections
conn := &Connection{
	Send: make(chan []byte, 256), // Buffer up to 256 events
	// ...
}
```
Pattern Indexing: We index subscriptions by pattern prefix for faster lookup:
```go
type Hub struct {
	// Index subscriptions by first pattern segment for fast lookup
	patternIndex map[string][]*Subscription // "workflow" -> [all workflow subscriptions]

	// Additional optimizations
	eventBuffer   []Event
	lastEventTime map[string]time.Time
}
```
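The lookup side of that index isn't shown; a sketch of one way it could work: candidates are fetched by the event type's first segment, plus any patterns whose first segment is `*` (indexed under the key `"*"`), so subscriptions in unrelated namespaces are never scanned at all. Full pattern matching then runs only on the candidates.

```go
package main

import (
	"fmt"
	"strings"
)

type Subscription struct{ Pattern string }

// candidatesFor narrows the subscriptions worth full pattern-matching to
// those indexed under the event's first segment, plus wildcard-first
// patterns indexed under "*".
func candidatesFor(index map[string][]*Subscription, eventType string) []*Subscription {
	first := strings.SplitN(eventType, ".", 2)[0]
	return append(index[first], index["*"]...)
}

func main() {
	index := map[string][]*Subscription{
		"workflow": {{Pattern: "workflow.*.*"}},
		"user":     {{Pattern: "user.*"}},
	}
	c := candidatesFor(index, "workflow.step.started")
	fmt.Println(len(c), c[0].Pattern) // 1 workflow.*.*
}
```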
Organization Isolation
Security is critical in a multi-tenant system. Every connection is scoped to an organization:
- Authentication: JWT tokens include organization ID
- Connection scoping: Each connection is bound to one organization
- Event filtering: Events are only sent to connections in the same organization
- Pattern validation: Clients can only subscribe to patterns within their org
This ensures that organizations never see each other's events, even if there's a bug in the pattern matching logic.
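The post doesn't show the pattern-validation step, so the following is only a guess at its shape: subscribe requests are checked against the set of event namespaces the caller may watch before they reach the hub, with the per-event org check in `broadcastEvent` remaining the actual isolation guarantee. The function name and the allowlist approach are illustrative assumptions.

```go
package main

import (
	"fmt"
	"strings"
)

// validatePattern rejects subscribe requests whose root segment isn't in
// the caller's allowlist. This is defense in depth: even if it passed a
// bad pattern, the org check in broadcastEvent still filters every event.
// (Sketch; the name and allowlist shape are assumptions, not our exact code.)
func validatePattern(pattern string, allowedRoots map[string]bool) bool {
	parts := strings.Split(pattern, ".")
	if parts[0] == "" {
		return false // empty or leading-dot pattern
	}
	if parts[0] == "*" {
		return false // a wildcard root would match every namespace
	}
	return allowedRoots[parts[0]]
}

func main() {
	allowed := map[string]bool{"workflow": true, "cluster": true}
	fmt.Println(validatePattern("workflow.wf-123.*", allowed)) // true
	fmt.Println(validatePattern("billing.*", allowed))         // false
}
```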
Client Integration
On the frontend, we built a small event client (which our React hooks wrap) for real-time updates:
```javascript
class EventClient {
  constructor(token) {
    this.token = token;
    this.subscriptions = new Map();
    this.connect();
  }

  connect() {
    this.ws = new WebSocket(`wss://api.example.com/ws?token=${this.token}`);

    this.ws.onmessage = (event) => {
      const data = JSON.parse(event.data);
      if (data.type === 'event') {
        this.handleEvent(data);
      }
    };
  }

  subscribe(pattern, callback) {
    this.subscriptions.set(pattern, callback);
    this.ws.send(JSON.stringify({
      type: 'subscribe',
      pattern: pattern
    }));
  }

  handleEvent(eventData) {
    for (const [pattern, callback] of this.subscriptions) {
      if (this.matchesPattern(eventData.event_type, pattern)) {
        callback(eventData);
      }
    }
  }

  // Mirrors the server-side matching so each callback only fires for
  // the pattern it registered.
  matchesPattern(eventType, pattern) {
    const eventParts = eventType.split('.');
    const patternParts = pattern.split('.');
    if (eventParts.length !== patternParts.length) return false;
    return patternParts.every((part, i) => part === '*' || part === eventParts[i]);
  }
}
```
Monitoring and Metrics
We instrument the system heavily to understand performance and troubleshoot issues:
- Connection count: Active WebSocket connections by organization
- Event throughput: Events per second, by type and organization
- Subscription patterns: Most common subscription patterns
- Message lag: Time from event generation to client delivery
- Dropped events: Events dropped due to slow connections
Lessons Learned
Building this WebSocket event system taught us several valuable lessons:
- Buffered channels prevent blocking: Always buffer event channels to handle slow consumers
- Organization isolation is non-negotiable: Build security into the event routing logic
- Pattern matching enables flexibility: Clients can subscribe to exactly what they need
- Connection lifecycle management is critical: Proper cleanup prevents memory leaks
- Monitoring reveals surprises: You'll discover usage patterns you never expected
The Result
Our WebSocket event system now handles thousands of concurrent connections with sub-100ms latency. Users see their workflows update in real-time, and the system scales horizontally as we add more hub instances.
Most importantly, the pattern-based subscription model means we can add new event types without changing client code. When we added cluster health monitoring, existing dashboards automatically started showing the new data.
Real-time updates transformed our user experience from "refresh to see changes" to "watch your infrastructure come alive."