loke.dev

The WAL as a Message Bus: A Deep Dive into Postgres Logical Decoding

Stop polling your database and start streaming it: a technical look at turning the Write-Ahead Log into a high-performance event stream.


Every time you write a cron job that polls a database table for "new" rows, a little piece of your system's performance dies. We’ve all done it—querying updated_at columns every five seconds, hoping to catch a state change before the user notices a delay—but this approach scales like a lead balloon and puts unnecessary pressure on your primary data store.

If you are using Postgres, you are sitting on top of a world-class message bus that you probably didn't even realize was there. It’s called the Write-Ahead Log (WAL), and through a feature called Logical Decoding, you can transform your database into a real-time event streamer without adding a single trigger to your tables.

The Source of Truth: What is the WAL?

Before we stream data, we have to understand where it comes from. In Postgres, every single change (INSERT, UPDATE, DELETE) is written to the Write-Ahead Log before it touches the actual data files. This is the database's "undo/redo" insurance policy. If the power cuts out, Postgres replays the WAL to ensure the data is consistent.

Historically, this log was just a stream of opaque binary data, used primarily for physical replication (streaming the exact binary state to a standby follower). But with the introduction of Logical Decoding, Postgres gained the ability to decode that binary stream into discrete, row-level changes.

Essentially, the WAL becomes a transactionally consistent audit log. If a transaction commits, the changes appear in the stream. If it rolls back, they don't. You get at-least-once delivery with strict commit ordering for free, backed by the ACID guarantees of the database itself (exactly-once is your consumer's job, as we'll see when we talk about idempotency).

Setting the Stage: Configuring Postgres

You can't just start streaming; you have to tell Postgres to keep enough information in the WAL to make it decodable. This requires a restart if you haven't changed these settings before.

In your postgresql.conf, you need these three lines:

# The log level must be 'logical' to support decoding
wal_level = logical

# Increase these if you plan on having multiple consumers
max_replication_slots = 10
max_wal_senders = 10
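
After the restart, you can confirm the settings took effect straight from psql:

```sql
-- Verify the replication-related settings after the restart
SELECT name, setting
FROM pg_settings
WHERE name IN ('wal_level', 'max_replication_slots', 'max_wal_senders');
```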

Once you've restarted, you need a Publication. This defines *what* data you want to share. You can share the whole database or just specific tables.

-- Create a publication for all changes in the 'users' and 'orders' tables
CREATE PUBLICATION my_app_changes FOR TABLE users, orders;

The Magic of the Replication Slot

A "slot" is how Postgres keeps track of where a specific consumer is in the stream. This is the most critical concept to master.

Unlike a standard message queue like RabbitMQ, where messages are deleted once consumed, Postgres holds onto the WAL files until the replication slot acknowledges that it has processed them. This is both a superpower and a loaded gun.

- The Superpower: If your consumer goes down for an hour, Postgres will buffer the changes. When the consumer reconnects, it picks up exactly where it left off.
- The Loaded Gun: If your consumer goes down and *never* comes back, Postgres will keep saving WAL files until your disk is 100% full. Monitoring your replication slot lag is not optional.
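Checking that lag is a single query against pg_replication_slots (assuming Postgres 10+, where pg_current_wal_lsn goes by that name). Alert if this number grows without bound:

```sql
-- How much WAL is each slot forcing Postgres to retain?
SELECT slot_name,
       active,
       pg_size_pretty(
           pg_wal_lsn_diff(pg_current_wal_lsn(), confirmed_flush_lsn)
       ) AS retained_wal
FROM pg_replication_slots;
```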

You create a slot like this:

SELECT * FROM pg_create_logical_replication_slot('my_subscriber_slot', 'pgoutput');

The pgoutput plugin is the standard output plugin built into Postgres 10+. It's fast, efficient, and speaks the same binary protocol that native logical replication uses, so it handles the heavy lifting of serialization for you.
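Before wiring up a real consumer, you can peek at what's queued in the slot straight from psql. pg_logical_slot_peek_binary_changes (the binary variant is required because pgoutput emits binary) returns pending changes without consuming them:

```sql
-- Peek at (but don't consume) everything waiting in the slot
SELECT lsn, xid, octet_length(data) AS bytes
FROM pg_logical_slot_peek_binary_changes(
    'my_subscriber_slot', NULL, NULL,
    'proto_version', '1',
    'publication_names', 'my_app_changes'
);
```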

Consuming the Stream with Python

While there are enterprise tools like Debezium (which is fantastic if you're already in the Kafka ecosystem), sometimes you just want a lightweight worker to update a Redis cache or trigger a localized event.

Using psycopg2, whose extras module ships first-class replication support (at the time of writing, the newer psycopg 3 has not yet implemented the replication protocol), we can write a simple consumer that listens to these changes in real time.

import psycopg2
import psycopg2.extras

# Connection string to your Postgres instance
DSN = "dbname=mydb user=postgres password=secret host=localhost"

def handle_change(msg):
    # This is where you'd parse the 'pgoutput' binary format.
    # For simplicity, we just report what arrived.
    print(f"Received {len(msg.payload)} bytes at LSN {msg.data_start}")

    # Acknowledge the message so Postgres can recycle the WAL.
    # In production, you might flush every N messages or every N
    # seconds instead, to reduce feedback traffic.
    msg.cursor.send_feedback(flush_lsn=msg.data_start)

def stream_changes():
    # Replication requires a dedicated connection type
    conn = psycopg2.connect(
        DSN, connection_factory=psycopg2.extras.LogicalReplicationConnection
    )
    cur = conn.cursor()

    # 'proto_version' 1 is the baseline for pgoutput;
    # 'publication_names' must match the publication we created in SQL
    cur.start_replication(
        slot_name="my_subscriber_slot",
        decode=False,  # pgoutput is binary; don't decode it as text
        options={
            "proto_version": "1",
            "publication_names": "my_app_changes",
        },
    )

    print("Streaming started. Waiting for changes...")
    try:
        # consume_stream blocks, invoking the callback per message
        # and sending periodic keepalives on our behalf
        cur.consume_stream(handle_change)
    except KeyboardInterrupt:
        print("Streaming stopped.")

if __name__ == "__main__":
    stream_changes()

Why "pgoutput" is Better Than Triggers

I’ve seen many teams try to implement this using LISTEN/NOTIFY and triggers. While NOTIFY is great for simple pings, it has several fatal flaws for data synchronization:
1. Payload Size: Notifications are limited to 8000 bytes. Try fitting a large JSON blob in there; you'll hit a wall fast.
2. Reliability: If your worker isn't listening when the NOTIFY is sent, that message is gone forever. There is no "buffer" or "retry."
3. Overhead: Triggers add latency to your INSERT and UPDATE queries because they run inside the same transaction.

Logical decoding is asynchronous. The data is written to the WAL anyway, so extracting it for replication adds almost zero overhead to the primary transaction. The heavy lifting happens in a separate process that reads the log.

Navigating the Complexity: The Gotchas

Nothing this powerful comes without trade-offs. If you’re going to move to a WAL-based architecture, you need to prepare for three specific headaches.

1. The Schema Change Problem

When you change your table structure (e.g., ALTER TABLE users ADD COLUMN phone_number), the logical stream reflects those changes. However, depending on the plugin you use, your consumer might break if it’s expecting the old schema.

If you’re using pgoutput, the stream includes a "Relation" message before the data changes that describes the table's current structure. Your consumer code needs to be smart enough to cache these relation mappings and update them on the fly.
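That bookkeeping is simpler than it sounds. Here's a minimal sketch of the caching logic, with plain Python values standing in for decoded pgoutput messages (pgoutput always sends the Relation message before any data message that uses the new layout):

```python
# Maps relation OID -> ordered column names, refreshed whenever a
# Relation message arrives in the stream.
relations: dict[int, list[str]] = {}

def on_relation(oid: int, columns: list[str]) -> None:
    # A Relation message redefines the layout for this table.
    relations[oid] = columns

def on_insert(oid: int, values: list) -> dict:
    # Zip the raw tuple against the *current* cached schema.
    return dict(zip(relations[oid], values))

on_relation(16384, ["id", "email"])
before = on_insert(16384, [5, "a@b.com"])

# ALTER TABLE users ADD COLUMN phone_number
# -> a fresh Relation message precedes the next data message
on_relation(16384, ["id", "email", "phone_number"])
after = on_insert(16384, [6, "c@d.com", None])
```

If the cache ever misses (e.g., your consumer restarted mid-stream), treat it as a fatal error and restart the stream rather than guessing at column order.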

2. The REPLICA IDENTITY

By default, an UPDATE or DELETE message in the stream only contains the values of the Primary Key. If you change a user’s email from a@b.com to c@d.com, the WAL will tell you: *"User ID 5 was updated."* It won't necessarily tell you what the old email was.

If you need the "before" state of the row, you have to change the table's replica identity:

ALTER TABLE users REPLICA IDENTITY FULL;

Warning: REPLICA IDENTITY FULL makes your WAL logs significantly larger because every update now records the entire old row. Only do this if you absolutely need the previous state for your business logic.

3. Handling LSNs (Log Sequence Numbers)

An LSN is a pointer to a location in the WAL. In the Python example above, we acknowledged the LSN immediately. In a production system, you should handle this more carefully.

If your consumer processes a message, crashes *before* it can acknowledge the LSN, and then restarts, Postgres will send that same message again. Your downstream logic (e.g., updating an Elasticsearch index) must be idempotent. You will receive duplicates. It’s not a matter of "if," but "when."
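One lightweight pattern for this (a sketch, not the only option): store the last applied LSN durably alongside the data you sync, and treat anything at or below it as a replay. Here an in-memory dict stands in for that durable checkpoint:

```python
def apply_change(store: dict, state: dict, lsn: int, key: str, value: str) -> bool:
    """Apply a change only if its LSN is newer than the last applied one.

    `state["last_lsn"]` stands in for an LSN checkpoint persisted
    transactionally next to the data itself, so that messages replayed
    after a crash become harmless no-ops.
    """
    if lsn <= state.get("last_lsn", 0):
        return False  # duplicate delivery: already applied, skip it
    store[key] = value
    state["last_lsn"] = lsn
    return True

store, state = {}, {}
assert apply_change(store, state, 100, "user:5", "c@d.com") is True
# Crash-and-replay: Postgres resends the unacknowledged message...
assert apply_change(store, state, 100, "user:5", "c@d.com") is False
```

The key design point is that the checkpoint must be written in the same transaction (or atomic operation) as the data itself; a checkpoint stored elsewhere just moves the crash window around.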

When to Use This (And When Not To)

Using the WAL as a message bus is a tier-one architectural choice. It's perfect for:
- CQRS Patterns: Keeping your read-side (Redis, Mongo, Elasticsearch) in sync with your write-side (Postgres).
- Audit Logs: Creating an immutable record of every change for compliance.
- Microservice Communication: Notifying Service B that Service A has changed something without tightly coupling them.

However, don't reach for this if:
- You are on a managed DB with limited disk: If you can't monitor disk space effectively, a stuck replication slot will take your site down.
- You only need low-frequency updates: If "eventually consistent within 60 seconds" is fine, a simple cron query is much easier to maintain.
- You have massive bulk writes: If you regularly do UPDATE million_rows SET status = 'active', your logical stream is going to explode with a million individual change messages.

Final Thoughts

Turning Postgres into a message bus shifts the paradigm of how you think about your data. Instead of a static collection of rows, your database becomes a living, breathing stream of events.

It takes effort to get the infrastructure right—especially around monitoring and idempotency—but the result is a system that is faster, more reactive, and significantly more robust than any polling-based solution could ever hope to be. Stop asking your database if there's anything new. Let it tell you.