@@ -80,6 +80,26 @@ that expose basic message attributes: topic, partition, offset, key, and value:
80
80
for msg in consumer:
81
81
assert isinstance (msg.value, dict )
82
82
83
+ .. code-block :: python
84
+
85
+ # Access record headers. The returned value is a list of tuples
86
+ # with str, bytes for key and value
87
+ for msg in consumer:
88
+ print (msg.headers)
89
+
90
+ .. code-block :: python
91
+
92
+ # Read only committed messages from transactional topic
93
+ consumer = KafkaConsumer(isolation_level = ' read_committed' )
94
+ consumer.subscribe([' txn_topic' ])
95
+ for msg in consumer:
96
+ print (msg)
97
+
98
+ .. code-block :: python
99
+
100
+ # Get consumer metrics
101
+ metrics = consumer.metrics()
102
+
83
103
84
104
KafkaProducer
85
105
*************
@@ -133,6 +153,32 @@ client. See `KafkaProducer <apidoc/KafkaProducer.html>`_ for more details.
133
153
for i in range (1000 ):
134
154
producer.send(' foobar' , b ' msg %d ' % i)
135
155
156
+ .. code-block :: python
157
+
158
+ # Use transactions
159
+ producer = KafkaProducer(transactional_id = ' fizzbuzz' )
160
+ producer.init_transactions()
161
+ producer.begin_transaction()
162
+ future = producer.send(' txn_topic' , value = b ' yes' )
163
+ future.get() # wait for successful produce
164
+ producer.commit_transaction() # commit the transaction
165
+
166
+ producer.begin_transaction()
167
+ future = producer.send(' txn_topic' , value = b ' no' )
168
+ future.get() # wait for successful produce
169
+ producer.abort_transaction() # abort the transaction
170
+
171
+ .. code-block :: python
172
+
173
+ # Include record headers. The format is list of tuples with string key
174
+ # and bytes value.
175
+ producer.send(' foobar' , value = b ' c29tZSB2YWx1ZQ==' , headers = [(' content-encoding' , b ' base64' )])
176
+
177
+ .. code-block :: python
178
+
179
+ # Get producer performance metrics
180
+ metrics = producer.metrics()
181
+
136
182
137
183
Thread safety
138
184
*************
0 commit comments