Protocol
The broker uses a line-based TCP protocol with a versioned envelope.
Request Format
V1|<correlation_id>|<command>|<args>
Example:
V1|42|PRODUCE|orders user1:created
With explicit ack mode:
V1|43|PRODUCE|orders user1:created acks=all
The parser always expects four |-separated fields. Commands with no args must still include a trailing |, for example:
V1|99|PING|
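A client-side encoder for this envelope could look like the minimal sketch below. The function name `format_request` is hypothetical, and refusing `|` inside a field is an assumption, since the protocol text does not describe any escaping.

```python
def format_request(correlation_id: int, command: str, args: str = "") -> str:
    """Build one request line: V1|<correlation_id>|<command>|<args>."""
    for field in (command, args):
        # Assumption: no escaping is defined, so a literal '|' or a newline
        # inside a field would corrupt the line-based, four-field envelope.
        if "|" in field or "\n" in field:
            raise ValueError("fields must not contain '|' or newlines")
    # The trailing '|' is emitted even when args is empty, as in V1|99|PING|.
    return f"V1|{correlation_id}|{command}|{args}"


if __name__ == "__main__":
    print(format_request(42, "PRODUCE", "orders user1:created"))
    print(format_request(99, "PING"))
```

Always emitting all four fields keeps the no-args case from needing a special path on the client.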
Response Format
Success:
V1|<correlation_id>|OK
V1|<correlation_id>|OK|<payload>
Error:
V1|<correlation_id>|ERR|<code>|<message>
If a request cannot be parsed, the broker responds with correlation ID 0:
V1|0|ERR|BAD_REQUEST|...
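The response shapes above can be decoded with a small helper. This is a sketch rather than the broker's client code: the `Response` type and `parse_response` function are hypothetical names, and the correlation ID is kept as a string because the text does not fix its type.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class Response:
    correlation_id: str
    ok: bool
    payload: Optional[str] = None   # present on OK responses that carry data
    code: Optional[str] = None      # present on ERR responses
    message: Optional[str] = None   # present on ERR responses


def parse_response(line: str) -> Response:
    """Parse one V1 response line into a Response."""
    # maxsplit=3 keeps everything after the status in a single trailing field.
    parts = line.rstrip("\r\n").split("|", 3)
    if len(parts) < 3 or parts[0] != "V1":
        raise ValueError(f"not a V1 response: {line!r}")
    correlation_id, status = parts[1], parts[2]
    rest = parts[3] if len(parts) == 4 else None
    if status == "OK":
        return Response(correlation_id, True, payload=rest)
    if status == "ERR" and rest is not None:
        # The first '|' separates the error code from the message.
        code, _, message = rest.partition("|")
        return Response(correlation_id, False, code=code, message=message)
    raise ValueError(f"unrecognized response status: {line!r}")
```

A caller would typically match `correlation_id` against its pending requests and treat `"0"` as a reply to a line the broker could not parse.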
Supported Commands
| Command | Arguments | Purpose |
|---|---|---|
| `PRODUCE` | `<topic> <key>:<value> [acks=0\|1\|all]` | append a message to a topic |
| `CONSUME` | `<topic> <partition> <offset>` | read messages from offset |
| `JOIN` | `<group> <topic> <consumer_id> [assignor=round_robin\|range]` | join a consumer group and receive a generation and assignment |
| `SYNC` | `<group> <topic> <consumer_id> <generation>` | validate generation and fetch latest assignment |
| `HEARTBEAT` | `<group> <topic> <consumer_id> <generation>` | keep membership alive for session timeout |
| `LEAVE` | `<group> <topic> <consumer_id> <generation>` | explicitly leave group and trigger rebalance |
| `COMMIT` | `<group> <topic> <consumer_id> <generation> <partition> <offset>` | persist processed progress with generation validation |
| `OFFSET` | `<group> <topic> <partition>` | read committed progress |
| `REPLICA_FETCH` | `<topic> <partition> <replica_id> <offset>` | report follower replication progress |
| `SET_PARTITION_ROLE` | `<topic> <partition> <leader\|follower>` | mark a partition as leader or follower on this broker |
Error Codes
| Code | Meaning |
|---|---|
| `BAD_REQUEST` | malformed envelope or invalid command args |
| `UNKNOWN_COMMAND` | unsupported command |
| `REPLICATION_TIMEOUT` | `acks=all` could not be satisfied before timeout |
| `NOT_LEADER` | produce attempted on a partition marked follower on this broker |
| `GENERATION_MISMATCH` | stale member generation, unknown member, or unassigned commit partition |
Validation Rules
The parser enforces:
- exactly four `|`-separated sections
- protocol version must be `V1`
- non-empty correlation ID
- non-empty command
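The four envelope checks can be sketched as a single function. `BadRequest` and `parse_request` are hypothetical names used for illustration; the real parser may differ in structure and in the error messages it produces.

```python
from typing import Tuple


class BadRequest(Exception):
    """Envelope validation failure; would map to the BAD_REQUEST error code."""


def parse_request(line: str) -> Tuple[str, str, str]:
    """Return (correlation_id, command, args) or raise BadRequest."""
    fields = line.rstrip("\r\n").split("|")
    if len(fields) != 4:
        raise BadRequest("expected exactly four |-separated sections")
    version, correlation_id, command, args = fields
    if version != "V1":
        raise BadRequest("protocol version must be V1")
    if not correlation_id:
        raise BadRequest("correlation ID must be non-empty")
    if not command:
        raise BadRequest("command must be non-empty")
    # args may be empty (for example V1|99|PING|); handlers validate it later.
    return correlation_id, command, args
```

Note that `V1|99|PING` (no trailing `|`) splits into three fields and is rejected by the first check.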
Handlers then enforce command-specific argument rules.
Additional handler-level validation:
- `PRODUCE` requires `key:value` message payload
- `PRODUCE` ack mode must be exactly `acks=0`, `acks=1`, or `acks=all`
- `CONSUME`, `SYNC`, `HEARTBEAT`, `LEAVE`, `COMMIT`, `OFFSET`, `REPLICA_FETCH` numeric fields must be valid integers
- `REPLICA_FETCH` `replica_id` must be in range `1..(replication_factor-1)` (`0` is the leader)
- `SET_PARTITION_ROLE` role must be exactly `leader` or `follower`
- `JOIN` optional assignor must be `assignor=round_robin` or `assignor=range`
- `COMMIT` must match active member generation and owned partition assignment
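As an illustration of handler-level validation, the two `PRODUCE` rules could be checked as in the sketch below. The function name `parse_produce_args` is hypothetical. It assumes arguments are whitespace-separated tokens, so a value cannot contain spaces, and it returns `None` for the ack mode when none is given, because the default is not stated here.

```python
from typing import Optional, Tuple

VALID_ACKS = {"acks=0", "acks=1", "acks=all"}


def parse_produce_args(args: str) -> Tuple[str, str, str, Optional[str]]:
    """Return (topic, key, value, acks) or raise ValueError."""
    tokens = args.split()
    if len(tokens) not in (2, 3):
        raise ValueError("expected <topic> <key>:<value> [acks=0|1|all]")
    topic, message = tokens[0], tokens[1]
    # Split on the first ':' only, so the value may itself contain ':'.
    key, sep, value = message.partition(":")
    if not sep:
        raise ValueError("message payload must be key:value")
    acks = tokens[2] if len(tokens) == 3 else None
    if acks is not None and acks not in VALID_ACKS:
        raise ValueError("ack mode must be exactly acks=0, acks=1, or acks=all")
    return topic, key, value, acks
```

A handler would translate the `ValueError` into a `BAD_REQUEST` response carrying the request's correlation ID.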
Replication Semantics (Current)
- `acks=0`: accepted by protocol, currently behaves like immediate leader success response.
- `acks=1`: leader append acknowledgement.
- `acks=all`: waits until partition high watermark reaches produced offset and min ISR is satisfied, otherwise returns `REPLICATION_TIMEOUT`.
- `CONSUME` returns only committed records (up to high watermark).
- `PRODUCE` is rejected with `NOT_LEADER` when the target partition is in local follower role.
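The ack modes can be read as a small decision function evaluated after the leader has appended a record. This is only a sketch under stated assumptions: `produce_result` and all of its parameters are hypothetical, "reaches" is taken to mean `high_watermark >= produced_offset`, and the in-sync replica count is compared against a configured minimum.

```python
def produce_result(acks: str, produced_offset: int, high_watermark: int,
                   isr_size: int, min_isr: int, timed_out: bool) -> str:
    """Return "OK", "WAIT", or "REPLICATION_TIMEOUT" after a leader append."""
    if acks in ("acks=0", "acks=1"):
        # Both modes currently answer as soon as the leader has appended.
        return "OK"
    if acks != "acks=all":
        raise ValueError(f"unsupported ack mode: {acks!r}")
    # Assumption: the record counts as committed once the high watermark
    # is at or beyond its offset.
    committed = high_watermark >= produced_offset
    if committed and isr_size >= min_isr:
        return "OK"
    # Not yet satisfied: keep waiting until the timeout expires.
    return "REPLICATION_TIMEOUT" if timed_out else "WAIT"
```

In a real broker the `"WAIT"` branch would correspond to blocking on replication progress rather than returning a value.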
Command Payloads (Current)
Response payloads are plain key-value text and may evolve additively.
- `PRODUCE`: `partition=<n> offset=<n> hw=<n>`
- `CONSUME`: `messages=<offset:value,...> hw=<n>` (empty `messages=` when nothing committed at requested offset)
- `JOIN`: `generation=<n> assigned=[<partition ids>]`
- `SYNC`: `generation=<n> assigned=[<partition ids>]`
- `HEARTBEAT`: `heartbeat=ok`
- `LEAVE`: `left=true`
- `COMMIT`: `committed=true`
- `OFFSET`: `offset=<n>`
- `REPLICA_FETCH`: `replica=<id> acked_offset=<n> hw=<n> isr=[...] under_replicated=<bool>`
- `SET_PARTITION_ROLE`: `topic=<name> partition=<n> role=<leader|follower> hw=<n>`
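Because payloads are plain key-value text, a client can decode them generically. The sketch below uses a hypothetical `parse_payload` helper and assumes that pairs are separated by whitespace and that values contain no spaces, which the payload shapes above suggest but do not guarantee. Unknown keys are kept, so additive changes do not break the caller.

```python
from typing import Dict


def parse_payload(payload: str) -> Dict[str, str]:
    """Split 'k1=v1 k2=v2' into a dict, leaving all values as strings."""
    fields: Dict[str, str] = {}
    for token in payload.split():
        # Split on the first '=' only; an empty value (messages=) is allowed.
        key, sep, value = token.partition("=")
        if not sep:
            raise ValueError(f"token without '=': {token!r}")
        fields[key] = value
    return fields
```

For example, a `PRODUCE` payload such as `partition=0 offset=5 hw=5` decodes to a dict whose `offset` entry can then be converted with `int(...)`.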
Compatibility Policy (Current)
- `V1` is the active protocol version.
- New additive commands can be introduced without breaking existing `V1` commands.
- Breaking envelope or command semantics requires a new major version token.
Consumer Group Durability (Current)
- Group metadata is persisted to `groups.json` under broker data directory.
- Persisted state includes group topic, generation, assignor, members, and assignments.
- Offsets remain persisted separately in `offsets.json`.