# Kafka/Produce API extension (#45)
In this stage, you'll add an entry for the `Produce` API to the APIVersions response.

## The Produce API

The [Produce API](https://kafka.apache.org/protocol#The_Messages_Produce) (API key `0`) is used to produce messages to a Kafka topic.

We've created an interactive protocol inspector for the request & response structures for `Produce`:

- 🔎 [Produce Request (v12)](https://binspec.org/kafka-produce-request-v12)
- 🔎 [Produce Response (v12)](https://binspec.org/kafka-produce-response-v12)

In this stage, you'll only need to add an entry for the `Produce` API to the APIVersions response you implemented in earlier stages. This lets clients know that your broker supports the `Produce` API. We'll get to responding to `Produce` requests in later stages.

## Tests

The tester will execute your program like this:

```bash
./your_program.sh /tmp/server.properties
```

It'll then connect to your server on port 9092 and send a valid `APIVersions` (v4) request.

The tester will validate that:

- The first 4 bytes of your response (the "message length") are valid.
- The correlation ID in the response header matches the correlation ID in the request header.
- The error code in the response body is `0` (NO_ERROR).
- The response body contains at least one entry for the API key `0` (`Produce`).
- The `MaxVersion` for the `Produce` API is at least 12.

## Notes

- You don't have to implement support for handling `Produce` requests in this stage. We'll get to this in later stages.
- You'll still need to include an entry for `APIVersions` in your response to pass earlier stages.
- The `MaxVersion` values for `Produce` and `APIVersions` are different: it is 4 for `APIVersions` and 12 for `Produce`.
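For illustration, here's one way the `api_keys` portion of the APIVersions response body could be encoded. This is a sketch in Python (the challenge is language-agnostic); the entry layout (INT16 key, INT16 min, INT16 max, empty tagged-field buffer) follows the Kafka protocol's flexible-version encoding, and the min versions here are arbitrary placeholders since the tester only checks `MaxVersion`:

```python
import struct

# Each entry in the APIVersions `api_keys` array: api_key (INT16),
# min_version (INT16), max_version (INT16), then an empty tagged-field
# buffer (a single 0x00 byte in flexible protocol versions).
SUPPORTED_APIS = [
    (18, 0, 4),   # APIVersions: MaxVersion 4
    (0, 0, 12),   # Produce: MaxVersion 12 (the tester requires at least 12)
]

def encode_api_keys() -> bytes:
    # A COMPACT_ARRAY's length is encoded as N + 1 (an unsigned varint;
    # it fits in a single byte here).
    out = bytes([len(SUPPORTED_APIS) + 1])
    for api_key, min_version, max_version in SUPPORTED_APIS:
        out += struct.pack(">hhh", api_key, min_version, max_version)
        out += b"\x00"  # empty TAG_BUFFER
    return out

encoded = encode_api_keys()
```

The rest of the response body (error code, throttle time, tagged fields) wraps around this array as in earlier stages.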
---
In this stage, you'll add support for handling `Produce` requests for invalid topics or partitions.

## Produce API Response for Invalid Topics or Partitions

When a Kafka broker receives a `Produce` request, it needs to validate that both the topic and partition exist. If either the topic or partition doesn't exist, it returns an appropriate error code and response.

For this stage, you can hardcode the error response: assume that all `Produce` requests are for invalid topics or partitions, and return the error code `3` (UNKNOWN_TOPIC_OR_PARTITION). In the next stage, you'll implement success responses.

We've created an interactive protocol inspector for the request & response structures for `Produce`:

- 🔎 [Produce Request (v12)](https://binspec.org/kafka-produce-request-v12)
- 🔎 [Produce Response (v12) - Invalid Topic](https://binspec.org/kafka-produce-error-response-v12-invalid-topic)

## Tests

The tester will execute your program like this:

```bash
./your_program.sh /tmp/server.properties
```

It'll then connect to your server on port 9092 and send a `Produce` (v12) request with either an invalid topic name, or a valid topic but an invalid partition.

The tester will validate that:

- The first 4 bytes of your response (the "message length") are valid.
- The correlation ID in the response header matches the correlation ID in the request header.
- The `throttle_time_ms` field in the response is `0`.
- The `topics` field has 1 element, and in that element:
  - The `name` field matches the topic name in the request.
  - The `partitions` field has 1 element, and in that element:
    - The `error_code` is `3` (UNKNOWN_TOPIC_OR_PARTITION).
    - The `index` field matches the partition in the request.
    - The `base_offset` field is `-1`.
    - The `log_append_time_ms` field is `-1`.
    - The `log_start_offset` field is `-1`.

## Notes

- You'll need to parse the `Produce` request in this stage to get the topic name and partition to send in the response.
- You can hardcode the error response in this stage. We'll get to actually checking for valid topics and partitions in later stages.
- The official docs for the `Produce` API can be found [here](https://kafka.apache.org/protocol.html#The_Messages_Produce). Make sure to scroll down to the "(Version: 12)" section.
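As a sketch of the partition-level error entry described above, here is one possible Python encoding. The field order (index, error_code, base_offset, log_append_time_ms, log_start_offset, record_errors, error_message, tagged fields) is taken from the published Produce response schema for earlier flexible versions and is assumed unchanged in v12 — verify it against the binspec inspector:

```python
import struct

UNKNOWN_TOPIC_OR_PARTITION = 3

def encode_error_partition(index: int) -> bytes:
    """Encode one element of the `partitions` array for the error case."""
    # index (INT32), error_code (INT16), base_offset (INT64),
    # log_append_time_ms (INT64), log_start_offset (INT64)
    body = struct.pack(">ihqqq", index, UNKNOWN_TOPIC_OR_PARTITION, -1, -1, -1)
    body += b"\x01"  # record_errors: empty COMPACT_ARRAY (length 0 encoded as 1)
    body += b"\x00"  # error_message: null COMPACT_NULLABLE_STRING
    body += b"\x00"  # empty TAG_BUFFER
    return body

part = encode_error_partition(0)
```

The topic-level element wraps one of these per partition, alongside the topic name.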
---
In this stage, you'll add support for responding to `Produce` requests with a valid topic.

## Produce API Response for Valid Topics

When a Kafka broker receives a `Produce` request, it needs to validate that both the topic and partition exist. If either the topic or partition doesn't exist, it returns an appropriate error code and response.

The broker performs validation in this order:

1. **Topic validation**: Check if the topic exists by reading the `__cluster_metadata` topic's log file.
2. **Partition validation**: If the topic exists, check if the partition exists within that topic.

### Topic Validation

To validate that a topic exists, the broker reads the `__cluster_metadata` topic's log file, located at `/tmp/kraft-combined-logs/__cluster_metadata-0/00000000000000000000.log`. Inside the log file, the broker finds the topic's metadata, which is a `record` (inside a `RecordBatch`) with a payload of type `TOPIC_RECORD`. If there exists a `TOPIC_RECORD` with the given topic name and the topic ID, the topic exists.

### Partition Validation

To validate that a partition exists, the broker reads the same `__cluster_metadata` topic's log file and finds the partition's metadata, which is a `record` (inside a `RecordBatch`) with a payload of type `PARTITION_RECORD`. If there exists a `PARTITION_RECORD` with the given partition index, the UUID of the topic it is associated with, and the UUID of the directory it is associated with, the partition exists.
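Both validations start by splitting the log file into its `RecordBatch`es. A minimal Python sketch of that framing step, based on Kafka's documented batch layout (base offset, then batch length, then the batch payload); decoding the varint-encoded records inside each batch to find `TOPIC_RECORD` / `PARTITION_RECORD` payloads is a further step not shown here:

```python
import struct

def iter_record_batches(data: bytes):
    """Yield (base_offset, batch_bytes) for each RecordBatch in a log file's bytes."""
    pos = 0
    while pos + 12 <= len(data):
        # Each batch starts with base_offset (INT64) and batch_length (INT32);
        # batch_length counts the bytes that follow those two fields.
        base_offset, batch_length = struct.unpack_from(">qi", data, pos)
        yield base_offset, data[pos + 12 : pos + 12 + batch_length]
        pos += 12 + batch_length

# Usage (path from this stage's description):
# with open("/tmp/kraft-combined-logs/__cluster_metadata-0/00000000000000000000.log", "rb") as f:
#     for base_offset, batch in iter_record_batches(f.read()):
#         ...  # decode the records in `batch`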

We've created an interactive protocol inspector for the request & response structures for `Produce`:

- 🔎 [Produce Request (v12)](https://binspec.org/kafka-produce-request-v12)
- 🔎 [Produce Response (v12)](https://binspec.org/kafka-produce-response-v12)

We've also created an interactive protocol inspector for the `__cluster_metadata` topic's log file:

- 🔎 [Cluster Metadata Log File](https://binspec.org/kafka-cluster-metadata)

In this stage, you'll need to implement the response for a `Produce` request with a valid topic. In later stages, you'll handle successfully producing messages to valid topics and partitions, and persist messages to disk using Kafka's RecordBatch format.

## Tests

The tester will execute your program like this:

```bash
./your_program.sh /tmp/server.properties
```

It'll then connect to your server on port 9092 and send a `Produce` (v12) request with a valid topic and partition.

The tester will validate that:

- The first 4 bytes of your response (the "message length") are valid.
- The correlation ID in the response header matches the correlation ID in the request header.
- The `throttle_time_ms` field in the response is `0`.
- The `topics` field has 1 element, and in that element:
  - The `name` field matches the topic name in the request.
  - The `partitions` field has 1 element, and in that element:
    - The `error_code` is `0` (NO_ERROR).
    - The `index` field matches the partition in the request.
    - The `base_offset` field is `0` (signifying that this is the first record in the partition).
    - The `log_append_time_ms` field is `-1` (signifying that the timestamp is the latest).
    - The `log_start_offset` field is `0`.

## Notes

- The official docs for the `Produce` API can be found [here](https://kafka.apache.org/protocol.html#The_Messages_Produce). Make sure to scroll down to the "(Version: 12)" section.
- The official Kafka docs don't cover the structure of records inside the `__cluster_metadata` topic, but you can find the definitions in the Kafka source code [here](https://github.com/apache/kafka/tree/5b3027dfcbcb62d169d4b4421260226e620459af/metadata/src/main/resources/common/metadata).
---
In this stage, you'll add support for successfully producing a single record.

## Producing records

When a Kafka broker receives a `Produce` request, it needs to validate that the topic and partition exist (using the `__cluster_metadata` topic's log file), store the record in the appropriate log file using Kafka's on-disk format, and return a successful response.

The record must be persisted to the topic's log file at `<log-dir>/<topic-name>-<partition-index>/00000000000000000000.log` using Kafka's RecordBatch format.

You can refer to the official Kafka docs for the [RecordBatch format](https://kafka.apache.org/documentation/#recordbatch).

Kafka's on-disk log format uses the same [RecordBatch](https://binspec.org/kafka-record-batches) format that is used in `Produce` and `Fetch` requests. The `RecordBatch` you receive in the `Produce` request can be written to the log file as-is.

You can refer to the following interactive protocol inspector for Kafka's log file format:

- 🔎 [A sample topic's log file](https://binspec.org/kafka-topic-log)

## Tests

The tester will execute your program like this:

```bash
./your_program.sh /tmp/server.properties
```

It'll then connect to your server on port 9092 and send a `Produce` (v12) request with a single `RecordBatch` containing a single record.

The tester will validate that:

- The first 4 bytes of your response (the "message length") are valid.
- The correlation ID in the response header matches the correlation ID in the request header.
- The `throttle_time_ms` field in the response is `0`.
- The `topics` field has 1 element, and in that element:
  - The `name` field matches the topic name in the request.
  - The `partitions` field has 1 element, and in that element:
    - The `error_code` is `0` (NO_ERROR).
    - The `index` field matches the partition in the request.
    - The `base_offset` field contains `0` (signifying that this is the first record in the partition).
    - The `log_append_time_ms` field is `-1` (signifying that the timestamp is the latest).
    - The `log_start_offset` field is `0`.
- The record is persisted to the appropriate log file on disk at `<log-dir>/<topic-name>-<partition-index>/00000000000000000000.log`.

## Notes

- The on-disk log files must be stored in [RecordBatch](https://kafka.apache.org/documentation/#recordbatch) format. You should write the `RecordBatch` from the request directly to the file.
- The official docs for the `Produce` API can be found [here](https://kafka.apache.org/protocol.html#The_Messages_Produce). Make sure to scroll down to the "(Version: 12)" section.
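Since the batch can be written as-is, the persistence step reduces to appending bytes at the documented path. A minimal Python sketch (the helper name is hypothetical, not prescribed):

```python
import os

def append_record_batch(log_dir: str, topic: str, partition: int, batch: bytes) -> None:
    # Log file path: <log-dir>/<topic-name>-<partition-index>/00000000000000000000.log
    dir_path = os.path.join(log_dir, f"{topic}-{partition}")
    os.makedirs(dir_path, exist_ok=True)
    with open(os.path.join(dir_path, "00000000000000000000.log"), "ab") as f:
        f.write(batch)  # the RecordBatch from the request, written unchanged
```

Opening in append mode (`"ab"`) keeps earlier batches intact if the file already exists.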
---
In this stage, you'll add support for producing multiple records in a single `Produce` request.

## Producing multiple records

When a Kafka broker receives a `Produce` request containing a `RecordBatch` with multiple records, it needs to validate that the topic and partition exist, assign sequential offsets to each record within the batch, and store the entire batch in the log file. The `RecordBatch` containing multiple records must be stored as a single unit in the topic's log file at `<log-dir>/<topic-name>-<partition-index>/00000000000000000000.log`.

We've created an interactive protocol inspector for the request & response structures for `Produce`:

- 🔎 [Produce Request (v12)](https://binspec.org/kafka-produce-request-v12)
- 🔎 [Produce Response (v12)](https://binspec.org/kafka-produce-response-v12)

You can refer to the following interactive protocol inspector for Kafka's log file format:

- 🔎 [A sample topic's log file](https://binspec.org/kafka-topic-log)

## Tests

The tester will execute your program like this:

```bash
./your_program.sh /tmp/server.properties
```

It'll then connect to your server on port 9092 and send a `Produce` (v12) request containing a `RecordBatch` with multiple records.

The tester will validate that:

- The first 4 bytes of your response (the "message length") are valid.
- The correlation ID in the response header matches the correlation ID in the request header.
- The `throttle_time_ms` field in the response is `0`.
- The `topics` field has 1 element, and in that element:
  - The `name` field matches the topic name in the request.
  - The `partitions` field has 1 element, and in that element:
    - The `error_code` is `0` (NO_ERROR).
    - The `index` field matches the partition in the request.
    - The `base_offset` field is `0` (the base offset for the batch).
    - The `log_append_time_ms` field is `-1` (signifying that the timestamp is the latest).
    - The `log_start_offset` field is `0`.
- The records are persisted to the appropriate log file on disk at `<log-dir>/<topic-name>-<partition-index>/00000000000000000000.log` with sequential offsets.

## Notes

- Records within a record batch must be assigned sequential offsets (e.g., if the base offset is 5, the records get offsets 5, 6, and 7).
- The official docs for the `Produce` API can be found [here](https://kafka.apache.org/protocol.html#The_Messages_Produce). Make sure to scroll down to the "(Version: 12)" section.
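In the RecordBatch format, each record stores an offset delta relative to the batch's base offset, so assigning sequential offsets amounts to setting the batch's `base_offset` field before writing it out. A sketch in Python; note that, per the RecordBatch documentation, the CRC covers the bytes from the attributes field onward, so rewriting the first 8 bytes should not invalidate it (worth verifying against the spec):

```python
import struct

def set_base_offset(batch: bytes, base_offset: int) -> bytes:
    # The first 8 bytes of a RecordBatch are its base_offset (INT64).
    # Each record's absolute offset is base_offset + its offset_delta,
    # so rewriting this one field re-assigns offsets for every record.
    return struct.pack(">q", base_offset) + batch[8:]
```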
---
In this stage, you'll add support for producing to multiple partitions of the same topic in a single request.

## Producing to multiple partitions

When a Kafka broker receives a `Produce` request targeting multiple partitions of the same topic, it needs to validate that the topic and all partitions exist, write records to each partition's log file independently, and return a response containing results for all partitions.

We've created an interactive protocol inspector for the request & response structures for `Produce`:

- 🔎 [Produce Request (v12)](https://binspec.org/kafka-produce-request-v12)
- 🔎 [Produce Response (v12)](https://binspec.org/kafka-produce-response-v12)

You can refer to the following interactive protocol inspector for Kafka's log file format:

- 🔎 [A sample topic's log file](https://binspec.org/kafka-topic-log)

## Tests

The tester will execute your program like this:

```bash
./your_program.sh /tmp/server.properties
```

It'll then connect to your server on port 9092 and send a `Produce` (v12) request targeting multiple partitions of the same topic. The request will contain multiple `RecordBatch`es, one for each partition. Each `RecordBatch` will contain a single record.

The tester will validate that:

- The first 4 bytes of your response (the "message length") are valid.
- The correlation ID in the response header matches the correlation ID in the request header.
- The `throttle_time_ms` field in the response is `0`.
- The `topics` field has 1 element, and in that element:
  - The `name` field matches the topic name in the request.
  - The `partitions` field has `N` elements, one for each of the `N` partitions in the request. In each element:
    - The `error_code` is `0` (NO_ERROR).
    - The `index` field matches the partition in the request.
    - The `base_offset` field contains the assigned offset for that partition.
    - The `log_append_time_ms` field is `-1` (signifying that the timestamp is the latest).
    - The `log_start_offset` field is `0`.
- Records are persisted to the correct partition log files on disk.

## Notes

- The response must include entries for all requested partitions.
- On-disk log files must be stored in `RecordBatch` format.
- The official docs for the `Produce` API can be found [here](https://kafka.apache.org/protocol.html#The_Messages_Produce). Make sure to scroll down to the "(Version: 12)" section.
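One way to structure the per-partition handling is a loop that writes each partition independently and collects one result per partition. This is a hypothetical Python sketch with an in-memory stand-in for the log files; names like `append_batch` and `PartitionResult` are illustrative, not prescribed:

```python
from dataclasses import dataclass

# In-memory stand-in for per-partition logs; a real broker writes to
# <log-dir>/<topic>-<index>/00000000000000000000.log instead.
logs: dict[tuple[str, int], list[bytes]] = {}

def append_batch(topic: str, index: int, batch: bytes) -> int:
    log = logs.setdefault((topic, index), [])
    base_offset = len(log)  # next offset in this partition (one record per batch here)
    log.append(batch)
    return base_offset

@dataclass
class PartitionResult:
    index: int
    error_code: int
    base_offset: int

def handle_topic(name: str, partitions: list[tuple[int, bytes]]) -> list[PartitionResult]:
    # The response must contain one element per requested partition,
    # even though each partition's log is written independently.
    return [
        PartitionResult(index, error_code=0, base_offset=append_batch(name, index, batch))
        for index, batch in partitions
    ]
```

Because the partitions are independent, an error in one (e.g., an unknown partition) would set that element's `error_code` without affecting the others.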
---
In this stage, you'll add support for producing to multiple topics in a single request.

## Producing to multiple topics

When a Kafka broker receives a `Produce` request targeting multiple topics with their respective partitions, it needs to validate that all topics and partitions exist, write records to each partition's log file independently, and return a nested response structure containing results for all topics and their partitions.

We've created an interactive protocol inspector for the request & response structures for `Produce`:

- 🔎 [Produce Request (v12)](https://binspec.org/kafka-produce-request-v12)
- 🔎 [Produce Response (v12)](https://binspec.org/kafka-produce-response-v12)

You can refer to the following interactive protocol inspector for Kafka's log file format:

- 🔎 [A sample topic's log file](https://binspec.org/kafka-topic-log)

## Tests

The tester will execute your program like this:

```bash
./your_program.sh /tmp/server.properties
```

It'll then connect to your server on port 9092 and send a `Produce` (v12) request targeting multiple topics with their respective partitions. The request will contain data for multiple topics, spanning multiple partitions for each topic.

The tester will validate that:

- The first 4 bytes of your response (the "message length") are valid.
- The correlation ID in the response header matches the correlation ID in the request header.
- The `throttle_time_ms` field in the response is `0`.
- The `topics` field has `N` elements, one for each of the `N` topics in the request. In each element:
  - The `name` field matches the topic name in the request.
  - The `partitions` field has `M` elements, one for each of the `M` partitions in the request. In each element:
    - The `error_code` is `0` (NO_ERROR).
    - The `index` field matches the partition in the request.
    - The `base_offset` field contains the assigned offset for that topic-partition.
    - The `log_append_time_ms` field is `-1` (signifying that the timestamp is the latest).
    - The `log_start_offset` field is `0`.
- Records are persisted to the correct topic-partition log files on disk.

## Notes

- The official docs for the `Produce` API can be found [here](https://kafka.apache.org/protocol.html#The_Messages_Produce). Make sure to scroll down to the "(Version: 12)" section.
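With `N` topics and `M` partitions per topic, the response's nested arrays are COMPACT_ARRAYs, whose lengths are encoded as the element count plus one, in Kafka's unsigned-varint format. A small Python sketch of that encoding:

```python
def encode_unsigned_varint(n: int) -> bytes:
    # Kafka's UNSIGNED_VARINT: 7 bits per byte, low bits first,
    # with the high bit set on every byte except the last.
    out = bytearray()
    while True:
        byte = n & 0x7F
        n >>= 7
        if n:
            out.append(byte | 0x80)
        else:
            out.append(byte)
            return bytes(out)

def compact_array_length(num_elements: int) -> bytes:
    # COMPACT_ARRAY encodes its length as N + 1, so an empty array is 0x01
    # (0x00 is reserved for a null array).
    return encode_unsigned_varint(num_elements + 1)
```

Small counts fit in one byte, so it's easy to forget the varint continuation logic; it only matters once an array (or a compact string/bytes length elsewhere in the protocol) exceeds 127.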