Protocol

The broker uses a line-based TCP protocol with a versioned envelope.

Request Format

V1|<correlation_id>|<command>|<args>

Example:

V1|42|PRODUCE|orders user1:created

With explicit ack mode:

V1|43|PRODUCE|orders user1:created acks=all

The parser always expects four |-separated fields. Commands with no args must still include a trailing |, for example:

V1|99|PING|
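
As an illustration of the envelope above, a client could assemble request lines with a small helper. This is a minimal sketch, not the broker's own client code: `build_request` is a hypothetical name, the integer correlation ID simply follows the examples, and rejecting `|` inside args is an inference from the four-field rule rather than documented behavior.

```python
def build_request(correlation_id: int, command: str, args: str = "") -> str:
    """Return one V1 request line, without a line terminator."""
    if not command:
        raise ValueError("command must be non-empty")
    # Assumption: a '|' or newline inside a field would change the field count
    # or split the line, so this sketch refuses to emit one.
    for field in (command, args):
        if "|" in field or "\n" in field:
            raise ValueError("fields must not contain '|' or newlines")
    # Always emit four fields, so empty args still produce the trailing '|'.
    return f"V1|{correlation_id}|{command}|{args}"


if __name__ == "__main__":
    print(build_request(42, "PRODUCE", "orders user1:created"))
    print(build_request(99, "PING"))
```

Passing empty args yields the trailing `|` that the parser requires.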

Response Format

Success:

V1|<correlation_id>|OK
V1|<correlation_id>|OK|<payload>

Error:

V1|<correlation_id>|ERR|<code>|<message>

If a request cannot be parsed, the broker responds with correlation ID 0:

V1|0|ERR|BAD_REQUEST|...
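
The three response shapes above can be told apart by splitting on the first few `|` characters. The following is a rough client-side sketch; the `Response` type and `parse_response` function are hypothetical names, and it assumes that error messages and payloads may themselves contain `|`, so only the leading separators are treated as field boundaries.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class Response:
    correlation_id: str
    ok: bool
    payload: Optional[str] = None   # set only for OK responses that carry one
    code: Optional[str] = None      # set only for ERR responses
    message: Optional[str] = None   # set only for ERR responses


def parse_response(line: str) -> Response:
    """Parse one V1 response line into a Response."""
    parts = line.rstrip("\r\n").split("|", 3)
    if len(parts) < 3 or parts[0] != "V1":
        raise ValueError(f"not a V1 response: {line!r}")
    correlation_id, status = parts[1], parts[2]
    rest = parts[3] if len(parts) == 4 else None
    if status == "OK":
        return Response(correlation_id, True, payload=rest)
    if status == "ERR" and rest is not None:
        # The remainder is <code>|<message>; split once so the message stays whole.
        code, _, message = rest.partition("|")
        return Response(correlation_id, False, code=code, message=message)
    raise ValueError(f"unexpected response shape: {line!r}")
```

A client would typically match `correlation_id` against the request it sent, and treat an ID of `0` as a sign that the broker could not parse the request at all.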

Supported Commands

Command             Arguments                                                        Purpose
PRODUCE             <topic> <key>:<value> [acks=0|1|all]                             append a message to a topic
CONSUME             <topic> <partition> <offset>                                     read messages from offset
JOIN                <group> <topic> <consumer_id> [assignor=round_robin|range]       join a consumer group and receive a generation and assignment
SYNC                <group> <topic> <consumer_id> <generation>                       validate generation and fetch latest assignment
HEARTBEAT           <group> <topic> <consumer_id> <generation>                       keep membership alive for session timeout
LEAVE               <group> <topic> <consumer_id> <generation>                       explicitly leave group and trigger rebalance
COMMIT              <group> <topic> <consumer_id> <generation> <partition> <offset>  persist processed progress with generation validation
OFFSET              <group> <topic> <partition>                                      read committed progress
REPLICA_FETCH       <topic> <partition> <replica_id> <offset>                        report follower replication progress
SET_PARTITION_ROLE  <topic> <partition> <leader|follower>                            set the local role of a partition

Error Codes

Code                 Meaning
BAD_REQUEST          malformed envelope or invalid command args
UNKNOWN_COMMAND      unsupported command
REPLICATION_TIMEOUT  acks=all could not be satisfied before timeout
NOT_LEADER           produce attempted on a partition marked follower on this broker
GENERATION_MISMATCH  stale member generation, unknown member, or unassigned commit partition

Validation Rules

The parser enforces:

  • exactly four |-separated sections
  • protocol version must be V1
  • non-empty correlation ID
  • non-empty command
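
The four parser rules above can be sketched as a single function. This is an illustrative approximation, not the broker's actual parser: `parse_request` and `reject_or_none` are hypothetical names, and the error message texts are made up, since the document elides the real ones.

```python
from typing import Optional, Tuple


def parse_request(line: str) -> Tuple[str, str, str]:
    """Validate the envelope and return (correlation_id, command, args)."""
    fields = line.rstrip("\r\n").split("|")
    if len(fields) != 4:
        raise ValueError("expected exactly four |-separated sections")
    version, correlation_id, command, args = fields
    if version != "V1":
        raise ValueError("protocol version must be V1")
    if not correlation_id:
        raise ValueError("correlation ID must be non-empty")
    if not command:
        raise ValueError("command must be non-empty")
    return correlation_id, command, args


def reject_or_none(line: str) -> Optional[str]:
    """Return a BAD_REQUEST response line for an unparseable request, else None."""
    try:
        parse_request(line)
    except ValueError as exc:
        # Unparseable requests are answered with correlation ID 0.
        # The message text here is hypothetical.
        return f"V1|0|ERR|BAD_REQUEST|{exc}"
    return None
```

Note that args may be empty: `V1|99|PING|` passes all four checks, while `V1|99|PING` fails the field-count check.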

Handlers then enforce command-specific argument rules:

  • PRODUCE requires a key:value message payload
  • PRODUCE ack mode, when given, must be exactly acks=0, acks=1, or acks=all
  • Numeric fields of CONSUME, SYNC, HEARTBEAT, LEAVE, COMMIT, OFFSET, and REPLICA_FETCH must be valid integers
  • REPLICA_FETCH replica_id must be in range 1..(replication_factor-1) (0 is the leader)
  • SET_PARTITION_ROLE role must be exactly leader or follower
  • JOIN assignor, when given, must be assignor=round_robin or assignor=range
  • COMMIT must match the member's active generation and target a partition assigned to that member
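
To make the handler-level rules concrete, here is a minimal sketch of the PRODUCE checks only. `parse_produce_args` is a hypothetical helper, not the broker's handler. It assumes that args are whitespace-separated tokens (so values contain no spaces) and that the key ends at the first `:`; the document does not say which ack mode applies when the option is omitted, so the sketch returns `None` in that case.

```python
from typing import Optional, Tuple

VALID_ACKS = ("acks=0", "acks=1", "acks=all")


def parse_produce_args(args: str) -> Tuple[str, str, str, Optional[str]]:
    """Return (topic, key, value, acks) or raise ValueError for invalid args."""
    tokens = args.split()
    if len(tokens) not in (2, 3):
        raise ValueError("expected <topic> <key>:<value> [acks=0|1|all]")
    topic, message = tokens[0], tokens[1]
    # Assumption: the first ':' separates key from value.
    key, sep, value = message.partition(":")
    if not sep:
        raise ValueError("PRODUCE requires a key:value message payload")
    acks = None
    if len(tokens) == 3:
        if tokens[2] not in VALID_ACKS:
            raise ValueError("ack mode must be acks=0, acks=1, or acks=all")
        acks = tokens[2].split("=", 1)[1]
    return topic, key, value, acks
```

In a real handler, a `ValueError` raised here would map to a `BAD_REQUEST` error response carrying the request's own correlation ID.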

Replication Semantics (Current)

  • acks=0: accepted by the protocol; currently behaves like an immediate success response from the leader.
  • acks=1: acknowledged once the leader has appended the record.
  • acks=all: waits until the partition high watermark reaches the produced offset and min ISR is satisfied; otherwise returns REPLICATION_TIMEOUT.
  • CONSUME returns only committed records (up to the high watermark).
  • PRODUCE is rejected with NOT_LEADER when the target partition is in the follower role on this broker.
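
The acks=all wait described above might look roughly like the loop below. This is a sketch under stated assumptions, not the broker's implementation: `wait_for_acks_all` and its parameters are hypothetical, "reaches" is read as `high watermark >= produced offset` (the document does not say whether the bound is inclusive), and polling stands in for whatever notification mechanism the broker really uses.

```python
import time
from typing import Callable


def wait_for_acks_all(
    produced_offset: int,
    get_high_watermark: Callable[[], int],
    get_isr_size: Callable[[], int],
    min_isr: int,
    timeout_s: float,
    poll_s: float = 0.01,
) -> bool:
    """Return True once the acks=all condition holds, False on timeout."""
    deadline = time.monotonic() + timeout_s
    while True:
        # Assumption: the condition is hw >= produced offset AND ISR size >= min ISR.
        if get_high_watermark() >= produced_offset and get_isr_size() >= min_isr:
            return True
        if time.monotonic() >= deadline:
            # The caller would answer with an ERR response, code REPLICATION_TIMEOUT.
            return False
        time.sleep(poll_s)
```

The condition is checked before the deadline, so a produce that is already replicated succeeds even with a zero timeout.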

Command Payloads (Current)

Response payloads are plain key-value text and may evolve additively.

  • PRODUCE: partition=<n> offset=<n> hw=<n>
  • CONSUME: messages=<offset:value,...> hw=<n> (messages= is empty when nothing is committed at the requested offset)
  • JOIN: generation=<n> assigned=[<partition ids>]
  • SYNC: generation=<n> assigned=[<partition ids>]
  • HEARTBEAT: heartbeat=ok
  • LEAVE: left=true
  • COMMIT: committed=true
  • OFFSET: offset=<n>
  • REPLICA_FETCH: replica=<id> acked_offset=<n> hw=<n> isr=[...] under_replicated=<bool>
  • SET_PARTITION_ROLE: topic=<name> partition=<n> role=<leader|follower> hw=<n>
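
Because payloads may gain new keys over time, a client is safer reading fields by name than by position. The sketch below shows one way to do that; `parse_payload` is a hypothetical helper, and it assumes values contain no whitespace. That holds for simple payloads such as PRODUCE and OFFSET, but it is not guaranteed by this document for CONSUME message values or bracketed lists.

```python
from typing import Dict


def parse_payload(payload: str) -> Dict[str, str]:
    """Split a 'key=value key=value' payload into a dict of raw strings."""
    fields: Dict[str, str] = {}
    for token in payload.split():
        key, sep, value = token.partition("=")
        if sep:  # ignore tokens without '=' rather than failing
            fields[key] = value
    return fields


if __name__ == "__main__":
    produced = parse_payload("partition=0 offset=5 hw=5")
    # Read only the keys you need; unknown keys are simply carried along.
    print(int(produced["offset"]), int(produced["hw"]))
```

Looking fields up by key means an additive change, such as a new key appended to the PRODUCE payload, does not break existing clients.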

Compatibility Policy (Current)

  • V1 is the active protocol version.
  • New additive commands can be introduced without breaking existing V1 commands.
  • Breaking changes to the envelope or to command semantics require a new major version token.

Consumer Group Durability (Current)

  • Group metadata is persisted to groups.json under the broker data directory.
  • Persisted state includes the group's topic, generation, assignor, members, and assignments.
  • Offsets are persisted separately in offsets.json.