WebSocket Events Architecture: Real-Time Updates at Scale

Building a comprehensive WebSocket event system for real-time workflow updates - from pattern-based subscriptions to organization isolation.

When you're building a workflow orchestration platform, users want to see their processes update in real-time. They want to watch clusters spin up, see steps complete, and get immediate feedback when something goes wrong. Traditional polling feels sluggish and wastes resources.

We needed a WebSocket event system that could handle thousands of concurrent connections, route events efficiently, and maintain strict organization isolation. Here's how we built it.

The Polling Problem

Our first implementation used simple HTTP polling. Every 5 seconds, the frontend would ask: "Any updates for workflow X?" This approach had obvious problems:

  • High server load from constant polling
  • Delayed updates (up to 5 seconds lag)
  • Wasted bandwidth on empty responses
  • Poor user experience with stale data
  • Memory leaks from abandoned connections
  • Zero organization isolation

We needed something much more sophisticated.

Event-Driven Architecture Foundation

We started with a proper event architecture. Every significant action in the system publishes an event:

type Event interface {
    Type() string
    // OrganizationID is crucial for multi-tenant isolation: every event
    // must belong to exactly one organization.
    OrganizationID() string
    Timestamp() time.Time
    Payload() map[string]interface{}
}

// Event structs implement the Event interface. Each event type captures
// specific business logic while maintaining a consistent structure.
type WorkflowStepStarted struct {
    WorkflowID string
    StepID     string
    StepType   string
    StartedAt  time.Time
    OrgID      string
}

func (w WorkflowStepStarted) Type() string           { return "workflow.step.started" }
func (w WorkflowStepStarted) OrganizationID() string { return w.OrgID }
func (w WorkflowStepStarted) Timestamp() time.Time   { return w.StartedAt }

// Payload returns a generic map for flexibility, so different event types
// can include relevant data without changing the interface.
func (w WorkflowStepStarted) Payload() map[string]interface{} {
    return map[string]interface{}{
        "workflow_id": w.WorkflowID,
        "step_id":     w.StepID,
        "step_type":   w.StepType,
    }
}

Every event includes the organization ID for security isolation and a consistent interface for routing.
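
To see how the interface keeps routing consistent, here is what a second event type might look like. This is a sketch: the struct fields are our assumption, though the type string and payload keys match the "workflow.step.completed" message shown later in the client protocol.

// Hypothetical companion event; fields are illustrative.
type WorkflowStepCompleted struct {
    WorkflowID  string
    StepID      string
    CompletedAt time.Time
    OrgID       string
}

func (w WorkflowStepCompleted) Type() string           { return "workflow.step.completed" }
func (w WorkflowStepCompleted) OrganizationID() string { return w.OrgID }
func (w WorkflowStepCompleted) Timestamp() time.Time   { return w.CompletedAt }
func (w WorkflowStepCompleted) Payload() map[string]interface{} {
    return map[string]interface{}{
        "workflow_id": w.WorkflowID,
        "step_id":     w.StepID,
    }
}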

WebSocket Connection Management

We built a connection hub that manages WebSocket lifecycles and subscriptions:

hub/types.go
type Connection struct {
    ID             string
    OrganizationID string
    UserID         string
    Conn           *websocket.Conn
    Send           chan []byte
    Subscriptions  map[string]*Subscription
    hub            *Hub
}

type Subscription struct {
    Pattern        string
    OrganizationID string
    Connection     *Connection
}

type Hub struct {
    connections   map[string]*Connection
    subscriptions map[string][]*Subscription // pattern -> subscriptions
    register      chan *Connection
    unregister    chan *Connection
    broadcast     chan Event
    mu            sync.RWMutex
}

The hub runs in its own goroutine, handling connection lifecycle and event distribution.
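
As a minimal sketch of that wiring (the NewHub constructor and Publish helper are our names for this example, not necessarily the production API):

// Sketch only: constructor and Publish helper are assumed names.
func NewHub() *Hub {
    return &Hub{
        connections:   make(map[string]*Connection),
        subscriptions: make(map[string][]*Subscription),
        register:      make(chan *Connection),
        unregister:    make(chan *Connection),
        broadcast:     make(chan Event, 1024), // buffered so publishers rarely block
    }
}

// Publish hands an event to the hub's event loop.
func (h *Hub) Publish(e Event) {
    h.broadcast <- e
}

func main() {
    hub := NewHub()
    go hub.Run() // the event loop runs for the life of the process

    // ... register the WebSocket endpoint and start the HTTP server ...
}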

The Hub Event Loop

The heart of our system is a simple event loop that processes connections and events:

hub/run.go
func (h *Hub) Run() {
    for {
        select {
        case conn := <-h.register:
            h.registerConnection(conn)

        case conn := <-h.unregister:
            h.unregisterConnection(conn)

        case event := <-h.broadcast:
            h.broadcastEvent(event)
        }
    }
}

func (h *Hub) broadcastEvent(event Event) {
    eventType := event.Type()
    orgID := event.OrganizationID()

    h.mu.RLock()
    defer h.mu.RUnlock()

    for pattern, subscriptions := range h.subscriptions {
        if !h.matchesPattern(eventType, pattern) {
            continue
        }

        for _, sub := range subscriptions {
            // Organization isolation
            if sub.OrganizationID != orgID {
                continue
            }

            // Send to connection
            select {
            case sub.Connection.Send <- h.encodeEvent(event):
            default:
                // Connection is slow/blocked, drop the event
                log.Printf("Dropping event for slow connection %s", sub.Connection.ID)
            }
        }
    }
}
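
Run and broadcastEvent call a few helpers that aren't shown above. A minimal version of them, under the same assumptions as the structs in hub/types.go, could look like this; encodeEvent matches the JSON envelope described in the client protocol below.

func (h *Hub) registerConnection(conn *Connection) {
    h.mu.Lock()
    defer h.mu.Unlock()
    h.connections[conn.ID] = conn
}

func (h *Hub) unregisterConnection(conn *Connection) {
    h.mu.Lock()
    defer h.mu.Unlock()

    if _, ok := h.connections[conn.ID]; ok {
        delete(h.connections, conn.ID)
        close(conn.Send) // signals writePump to shut down

        // Drop this connection's subscriptions from the pattern map.
        for pattern := range conn.Subscriptions {
            subs := h.subscriptions[pattern]
            filtered := subs[:0]
            for _, s := range subs {
                if s.Connection != conn {
                    filtered = append(filtered, s)
                }
            }
            h.subscriptions[pattern] = filtered
        }
    }
}

func (h *Hub) encodeEvent(event Event) []byte {
    // Uses encoding/json; errors are ignored here for brevity.
    data, _ := json.Marshal(map[string]interface{}{
        "type":       "event",
        "event_type": event.Type(),
        "timestamp":  event.Timestamp().Format(time.RFC3339),
        "payload":    event.Payload(),
    })
    return data
}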

Pattern Matching

We implemented a simple but effective pattern matching system:

hub/patterns.go
func (h *Hub) matchesPattern(eventType, pattern string) bool {
    eventParts := strings.Split(eventType, ".")
    patternParts := strings.Split(pattern, ".")

    if len(eventParts) != len(patternParts) {
        return false
    }

    for i, patternPart := range patternParts {
        if patternPart != "*" && patternPart != eventParts[i] {
            return false
        }
    }

    return true
}

// Examples:
// matchesPattern("workflow.step.started", "workflow.*.*")    -> true
// matchesPattern("workflow.step.started", "workflow.step.*") -> true
// matchesPattern("user.login", "workflow.*")                 -> false

Clients can subscribe to specific events or use wildcards for broader categories.
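
One subtlety is that the number of segments must match, so "workflow.*" does not cover "workflow.step.started". A small table-driven test (illustrative, not from our codebase) makes the behavior explicit:

func TestMatchesPattern(t *testing.T) {
    h := &Hub{}
    cases := []struct {
        eventType, pattern string
        want               bool
    }{
        {"workflow.step.started", "workflow.*.*", true},
        {"workflow.step.started", "workflow.step.*", true},
        {"workflow.step.started", "workflow.step.started", true},
        {"user.login", "workflow.*", false},
        {"workflow.step.started", "workflow.*", false}, // segment counts must match
    }
    for _, c := range cases {
        if got := h.matchesPattern(c.eventType, c.pattern); got != c.want {
            t.Errorf("matchesPattern(%q, %q) = %v, want %v", c.eventType, c.pattern, got, c.want)
        }
    }
}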

Client Subscription Protocol

Clients communicate with the hub through a simple JSON protocol:

// Subscribe to events
{
    "type": "subscribe",
    "pattern": "workflow.*.*"   // All workflow events
}

// Unsubscribe from a pattern
{
    "type": "unsubscribe",
    "pattern": "workflow.*.*"
}

// Server sends events as:
{
    "type": "event",
    "event_type": "workflow.step.completed",
    "timestamp": "2025-07-09T10:30:00Z",
    "payload": {
        "workflow_id": "wf-123",
        "step_id": "step-456"
    }
}
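
On the server side, each incoming message is decoded into a small struct and dispatched. ClientMessage and handleClientMessage are referenced in the connection handling further down; their exact shapes in this sketch are our assumption:

type ClientMessage struct {
    Type    string `json:"type"`    // "subscribe" or "unsubscribe"
    Pattern string `json:"pattern"`
}

func (c *Connection) handleClientMessage(msg ClientMessage) {
    switch msg.Type {
    case "subscribe":
        sub := &Subscription{
            Pattern:        msg.Pattern,
            OrganizationID: c.OrganizationID, // subscriptions inherit the connection's org
            Connection:     c,
        }
        c.Subscriptions[msg.Pattern] = sub

        c.hub.mu.Lock()
        c.hub.subscriptions[msg.Pattern] = append(c.hub.subscriptions[msg.Pattern], sub)
        c.hub.mu.Unlock()

    case "unsubscribe":
        delete(c.Subscriptions, msg.Pattern)
        // The hub-side entry is cleaned up on unregister (or could be removed here).
    }
}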

Connection Lifecycle

Each WebSocket connection runs in its own goroutine with proper cleanup:

client/connection.go
func (c *Connection) handleConnection() {
    defer func() {
        c.hub.unregister <- c
        c.Conn.Close()
    }()

    // Start writer goroutine
    go c.writePump()

    // Read messages from client
    c.readPump()
}

func (c *Connection) readPump() {
    defer c.Conn.Close()

    // Set read deadline
    c.Conn.SetReadDeadline(time.Now().Add(60 * time.Second))

    // Handle pong messages to reset the deadline
    c.Conn.SetPongHandler(func(string) error {
        c.Conn.SetReadDeadline(time.Now().Add(60 * time.Second))
        return nil
    })

    for {
        var msg ClientMessage
        err := c.Conn.ReadJSON(&msg)
        if err != nil {
            if websocket.IsUnexpectedCloseError(err, websocket.CloseGoingAway, websocket.CloseAbnormalClosure) {
                log.Printf("WebSocket error: %v", err)
            }
            break
        }

        c.handleClientMessage(msg)
    }
}

func (c *Connection) writePump() {
    ticker := time.NewTicker(54 * time.Second)
    defer ticker.Stop()

    for {
        select {
        case message, ok := <-c.Send:
            if !ok {
                c.Conn.WriteMessage(websocket.CloseMessage, []byte{})
                return
            }

            c.Conn.SetWriteDeadline(time.Now().Add(10 * time.Second))
            if err := c.Conn.WriteMessage(websocket.TextMessage, message); err != nil {
                return
            }

        case <-ticker.C:
            // Send ping to keep the connection alive
            c.Conn.SetWriteDeadline(time.Now().Add(10 * time.Second))
            if err := c.Conn.WriteMessage(websocket.PingMessage, nil); err != nil {
                return
            }
        }
    }
}
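
For completeness, here is roughly how a connection could be accepted and handed to the hub using gorilla/websocket's Upgrader. The authenticate helper, its claims fields, and the uuid usage are illustrative assumptions, not the actual implementation:

var upgrader = websocket.Upgrader{
    ReadBufferSize:  1024,
    WriteBufferSize: 1024,
}

func (h *Hub) ServeWS(w http.ResponseWriter, r *http.Request) {
    claims, err := authenticate(r.URL.Query().Get("token")) // assumed JWT helper
    if err != nil {
        http.Error(w, "unauthorized", http.StatusUnauthorized)
        return
    }

    ws, err := upgrader.Upgrade(w, r, nil)
    if err != nil {
        log.Printf("upgrade failed: %v", err)
        return
    }

    conn := &Connection{
        ID:             uuid.NewString(),
        OrganizationID: claims.OrgID,
        UserID:         claims.UserID,
        Conn:           ws,
        Send:           make(chan []byte, 256),
        Subscriptions:  make(map[string]*Subscription),
        hub:            h,
    }

    h.register <- conn
    go conn.handleConnection()
}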

Performance Optimizations

With thousands of concurrent connections, performance matters:

Goroutine Reuse: Each connection gets one long-lived reader and one writer goroutine; we don't spawn a new goroutine per message.

Event Buffering: Each connection's Send channel is buffered so slow consumers don't stall the hub:

client/buffer.go
type Connection struct {
    Send chan []byte // Buffered channel
    // ...
}

// When creating connections
conn := &Connection{
    Send: make(chan []byte, 256), // Buffer up to 256 events
    // ...
}

Pattern Indexing: We index subscriptions by pattern prefix for faster lookup:

hub/index.go
type Hub struct {
    // Index subscriptions by first pattern segment for fast lookup
    patternIndex map[string][]*Subscription // "workflow" -> [all workflow subscriptions]

    // Additional optimizations
    eventBuffer   []Event
    lastEventTime map[string]time.Time
}
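
With that index in place, the broadcast path only has to walk subscriptions under the event's first segment. A sketch of how that lookup could work, assuming the index is maintained as clients subscribe and unsubscribe (patterns that begin with "*" would need their own bucket):

// Sketch: indexed broadcast. Only subscriptions whose pattern starts with
// the event's first segment (e.g. "workflow") are considered.
func (h *Hub) broadcastEventIndexed(event Event) {
    eventType := event.Type()
    firstSegment := strings.SplitN(eventType, ".", 2)[0]

    h.mu.RLock()
    defer h.mu.RUnlock()

    for _, sub := range h.patternIndex[firstSegment] {
        if sub.OrganizationID != event.OrganizationID() {
            continue
        }
        if !h.matchesPattern(eventType, sub.Pattern) {
            continue
        }
        select {
        case sub.Connection.Send <- h.encodeEvent(event):
        default:
            log.Printf("Dropping event for slow connection %s", sub.Connection.ID)
        }
    }
}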

Organization Isolation

Security is critical in a multi-tenant system. Every connection is scoped to an organization:

  1. Authentication: JWT tokens include organization ID
  2. Connection scoping: Each connection is bound to one organization
  3. Event filtering: Events are only sent to connections in the same organization
  4. Pattern validation: Clients can only subscribe to patterns within their org

This ensures that organizations never see each other's events, even if there's a bug in the pattern matching logic.
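
Because broadcastEvent enforces the organization check itself, isolation is easy to pin down with a regression test. The sketch below reuses the NewHub constructor assumed earlier and wires a subscription in directly:

// A connection in org-b must never receive org-a's events, even when its
// pattern matches the event type.
func TestBroadcastRespectsOrganization(t *testing.T) {
    hub := NewHub() // assumed constructor from the earlier sketch
    orgB := &Connection{ID: "c1", OrganizationID: "org-b", Send: make(chan []byte, 1)}
    hub.subscriptions["workflow.*.*"] = []*Subscription{
        {Pattern: "workflow.*.*", OrganizationID: "org-b", Connection: orgB},
    }

    hub.broadcastEvent(WorkflowStepStarted{
        WorkflowID: "wf-1", StepID: "s-1", StepType: "apply",
        StartedAt: time.Now(), OrgID: "org-a",
    })

    select {
    case msg := <-orgB.Send:
        t.Fatalf("org-b received another org's event: %s", msg)
    default:
        // expected: nothing delivered
    }
}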

Client Integration

On the frontend, we built a simple React hook on top of an EventClient class for real-time updates:

EventClient.ts
class EventClient {
  private token: string;
  private ws!: WebSocket;
  private subscriptions = new Map<string, (event: any) => void>();

  constructor(token: string) {
    this.token = token;
    this.connect();
  }

  connect() {
    this.ws = new WebSocket(`wss://api.example.com/ws?token=${this.token}`);

    this.ws.onmessage = (event) => {
      const data = JSON.parse(event.data);
      if (data.type === 'event') {
        this.handleEvent(data);
      }
    };
  }

  subscribe(pattern: string, callback: (event: any) => void) {
    this.subscriptions.set(pattern, callback);
    this.ws.send(JSON.stringify({
      type: 'subscribe',
      pattern: pattern
    }));
  }

  handleEvent(eventData: any) {
    for (const [pattern, callback] of this.subscriptions) {
      if (this.matchesPattern(eventData.event_type, pattern)) {
        callback(eventData);
      }
    }
  }

  // Client-side mirror of the server's wildcard matcher, so callbacks only
  // fire for the patterns they registered.
  private matchesPattern(eventType: string, pattern: string): boolean {
    const eventParts = eventType.split('.');
    const patternParts = pattern.split('.');
    if (eventParts.length !== patternParts.length) return false;
    return patternParts.every((part, i) => part === '*' || part === eventParts[i]);
  }
}

Monitoring and Metrics

We instrument the system heavily to understand performance and troubleshoot issues:

  • Connection count: Active WebSocket connections by organization
  • Event throughput: Events per second, by type and organization
  • Subscription patterns: Most common subscription patterns
  • Message lag: Time from event generation to client delivery
  • Dropped events: Events dropped due to slow connections
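
Tracking these doesn't require heavyweight machinery to start with. A minimal sketch of in-process counters follows; the names and struct are ours, and in practice you'd want per-organization and per-type labels exported to a proper metrics backend.

// Illustrative counters only; field and method names are assumptions.
type HubMetrics struct {
    activeConnections int64
    eventsDelivered   int64
    eventsDropped     int64
}

func (m *HubMetrics) ConnOpened() { atomic.AddInt64(&m.activeConnections, 1) }
func (m *HubMetrics) ConnClosed() { atomic.AddInt64(&m.activeConnections, -1) }
func (m *HubMetrics) Delivered()  { atomic.AddInt64(&m.eventsDelivered, 1) }
func (m *HubMetrics) Dropped()    { atomic.AddInt64(&m.eventsDropped, 1) }

func (m *HubMetrics) Snapshot() (conns, delivered, dropped int64) {
    return atomic.LoadInt64(&m.activeConnections),
        atomic.LoadInt64(&m.eventsDelivered),
        atomic.LoadInt64(&m.eventsDropped)
}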

Lessons Learned

Building this WebSocket event system taught us several valuable lessons:

  1. Buffered channels prevent blocking: Always buffer event channels to handle slow consumers
  2. Organization isolation is non-negotiable: Build security into the event routing logic
  3. Pattern matching enables flexibility: Clients can subscribe to exactly what they need
  4. Connection lifecycle management is critical: Proper cleanup prevents memory leaks
  5. Monitoring reveals surprises: You'll discover usage patterns you never expected

The Result

Our WebSocket event system now handles thousands of concurrent connections with sub-100ms latency. Users see their workflows update in real-time, and the system scales horizontally as we add more hub instances.

Most importantly, the pattern-based subscription model means we can add new event types without changing client code. When we added cluster health monitoring, existing dashboards automatically started showing the new data.

Real-time updates transformed our user experience from "refresh to see changes" to "watch your infrastructure come alive."