Skip to content

Commit 37576e8

Browse files
authored
Move benchmark scripts to kafka.benchmarks module (#2584)
1 parent 6b9076b commit 37576e8

11 files changed

+255
-262
lines changed

Diff for: benchmarks/load_example.py

-66
This file was deleted.
File renamed without changes.

Diff for: kafka/benchmarks/__init__.py

Whitespace-only changes.

Diff for: benchmarks/consumer_performance.py renamed to kafka/benchmarks/consumer_performance.py

+20-59
Original file line numberDiff line numberDiff line change
@@ -4,43 +4,16 @@
44
from __future__ import absolute_import, print_function
55

66
import argparse
7-
import logging
87
import pprint
98
import sys
109
import threading
10+
import time
1111
import traceback
1212

13-
from kafka.vendor.six.moves import range
14-
15-
from kafka import KafkaConsumer, KafkaProducer
16-
from test.fixtures import KafkaFixture, ZookeeperFixture
17-
18-
logging.basicConfig(level=logging.ERROR)
19-
20-
21-
def start_brokers(n):
22-
print('Starting {0} {1}-node cluster...'.format(KafkaFixture.kafka_version, n))
23-
print('-> 1 Zookeeper')
24-
zk = ZookeeperFixture.instance()
25-
print('---> {0}:{1}'.format(zk.host, zk.port))
26-
print()
27-
28-
partitions = min(n, 3)
29-
replicas = min(n, 3)
30-
print('-> {0} Brokers [{1} partitions / {2} replicas]'.format(n, partitions, replicas))
31-
brokers = [
32-
KafkaFixture.instance(i, zk, zk_chroot='',
33-
partitions=partitions, replicas=replicas)
34-
for i in range(n)
35-
]
36-
for broker in brokers:
37-
print('---> {0}:{1}'.format(broker.host, broker.port))
38-
print()
39-
return brokers
13+
from kafka import KafkaConsumer
4014

4115

4216
class ConsumerPerformance(object):
43-
4417
@staticmethod
4518
def run(args):
4619
try:
@@ -53,28 +26,17 @@ def run(args):
5326
pass
5427
if v == 'None':
5528
v = None
29+
elif v == 'False':
30+
v = False
31+
elif v == 'True':
32+
v = True
5633
props[k] = v
5734

58-
if args.brokers:
59-
brokers = start_brokers(args.brokers)
60-
props['bootstrap_servers'] = ['{0}:{1}'.format(broker.host, broker.port)
61-
for broker in brokers]
62-
print('---> bootstrap_servers={0}'.format(props['bootstrap_servers']))
63-
print()
64-
65-
print('-> Producing records')
66-
record = bytes(bytearray(args.record_size))
67-
producer = KafkaProducer(compression_type=args.fixture_compression,
68-
**props)
69-
for i in range(args.num_records):
70-
producer.send(topic=args.topic, value=record)
71-
producer.flush()
72-
producer.close()
73-
print('-> OK!')
74-
print()
75-
7635
print('Initializing Consumer...')
36+
props['bootstrap_servers'] = args.bootstrap_servers
7737
props['auto_offset_reset'] = 'earliest'
38+
if 'group_id' not in props:
39+
props['group_id'] = 'kafka-consumer-benchmark'
7840
if 'consumer_timeout_ms' not in props:
7941
props['consumer_timeout_ms'] = 10000
8042
props['metrics_sample_window_ms'] = args.stats_interval * 1000
@@ -92,14 +54,18 @@ def run(args):
9254
print('-> OK!')
9355
print()
9456

57+
start_time = time.time()
9558
records = 0
9659
for msg in consumer:
9760
records += 1
9861
if records >= args.num_records:
9962
break
100-
print('Consumed {0} records'.format(records))
10163

64+
end_time = time.time()
10265
timer_stop.set()
66+
timer.join()
67+
print('Consumed {0} records'.format(records))
68+
print('Execution time:', end_time - start_time, 'secs')
10369

10470
except Exception:
10571
exc_info = sys.exc_info()
@@ -143,32 +109,27 @@ def get_args_parser():
143109
parser = argparse.ArgumentParser(
144110
description='This tool is used to verify the consumer performance.')
145111

112+
parser.add_argument(
113+
'--bootstrap-servers', type=str, nargs='+', default=(),
114+
help='host:port for cluster bootstrap servers')
146115
parser.add_argument(
147116
'--topic', type=str,
148-
help='Topic for consumer test',
117+
help='Topic for consumer test (default: kafka-python-benchmark-test)',
149118
default='kafka-python-benchmark-test')
150119
parser.add_argument(
151120
'--num-records', type=int,
152-
help='number of messages to consume',
121+
help='number of messages to consume (default: 1000000)',
153122
default=1000000)
154-
parser.add_argument(
155-
'--record-size', type=int,
156-
help='message size in bytes',
157-
default=100)
158123
parser.add_argument(
159124
'--consumer-config', type=str, nargs='+', default=(),
160125
help='kafka consumer related configuration properties like '
161126
'bootstrap_servers,client_id etc..')
162127
parser.add_argument(
163128
'--fixture-compression', type=str,
164129
help='specify a compression type for use with broker fixtures / producer')
165-
parser.add_argument(
166-
'--brokers', type=int,
167-
help='Number of kafka brokers to start',
168-
default=0)
169130
parser.add_argument(
170131
'--stats-interval', type=int,
171-
help='Interval in seconds for stats reporting to console',
132+
help='Interval in seconds for stats reporting to console (default: 5)',
172133
default=5)
173134
parser.add_argument(
174135
'--raw-metrics', action='store_true',

Diff for: kafka/benchmarks/load_example.py

+110
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,110 @@
1+
#!/usr/bin/env python
2+
from __future__ import print_function
3+
4+
import argparse
5+
import logging
6+
import threading
7+
import time
8+
9+
from kafka import KafkaConsumer, KafkaProducer
10+
11+
12+
class Producer(threading.Thread):
13+
14+
def __init__(self, bootstrap_servers, topic, stop_event, msg_size):
15+
super(Producer, self).__init__()
16+
self.bootstrap_servers = bootstrap_servers
17+
self.topic = topic
18+
self.stop_event = stop_event
19+
self.big_msg = b'1' * msg_size
20+
21+
def run(self):
22+
producer = KafkaProducer(bootstrap_servers=self.bootstrap_servers)
23+
self.sent = 0
24+
25+
while not self.stop_event.is_set():
26+
producer.send(self.topic, self.big_msg)
27+
self.sent += 1
28+
producer.flush()
29+
producer.close()
30+
31+
32+
class Consumer(threading.Thread):
33+
def __init__(self, bootstrap_servers, topic, stop_event, msg_size):
34+
super(Consumer, self).__init__()
35+
self.bootstrap_servers = bootstrap_servers
36+
self.topic = topic
37+
self.stop_event = stop_event
38+
self.msg_size = msg_size
39+
40+
def run(self):
41+
consumer = KafkaConsumer(bootstrap_servers=self.bootstrap_servers,
42+
auto_offset_reset='earliest')
43+
consumer.subscribe([self.topic])
44+
self.valid = 0
45+
self.invalid = 0
46+
47+
for message in consumer:
48+
if len(message.value) == self.msg_size:
49+
self.valid += 1
50+
else:
51+
print('Invalid message:', len(message.value), self.msg_size)
52+
self.invalid += 1
53+
54+
if self.stop_event.is_set():
55+
break
56+
consumer.close()
57+
58+
59+
def get_args_parser():
60+
parser = argparse.ArgumentParser(
61+
description='This tool is used to demonstrate consumer and producer load.')
62+
63+
parser.add_argument(
64+
'--bootstrap-servers', type=str, nargs='+', default=('localhost:9092'),
65+
help='host:port for cluster bootstrap servers (default: localhost:9092)')
66+
parser.add_argument(
67+
'--topic', type=str,
68+
help='Topic for load test (default: kafka-python-benchmark-load-example)',
69+
default='kafka-python-benchmark-load-example')
70+
parser.add_argument(
71+
'--msg-size', type=int,
72+
help='Message size, in bytes, for load test (default: 524288)',
73+
default=524288)
74+
parser.add_argument(
75+
'--load-time', type=int,
76+
help='number of seconds to run load test (default: 10)',
77+
default=10)
78+
parser.add_argument(
79+
'--log-level', type=str,
80+
help='Optional logging level for load test: ERROR|INFO|DEBUG etc',
81+
default=None)
82+
return parser
83+
84+
85+
def main(args):
86+
if args.log_level:
87+
logging.basicConfig(
88+
format='%(asctime)s.%(msecs)s:%(name)s:%(thread)d:%(levelname)s:%(process)d:%(message)s',
89+
level=getattr(logging, args.log_level))
90+
producer_stop = threading.Event()
91+
consumer_stop = threading.Event()
92+
threads = [
93+
Producer(args.bootstrap_servers, args.topic, producer_stop, args.msg_size),
94+
Consumer(args.bootstrap_servers, args.topic, consumer_stop, args.msg_size)
95+
]
96+
97+
for t in threads:
98+
t.start()
99+
100+
time.sleep(args.load_time)
101+
producer_stop.set()
102+
consumer_stop.set()
103+
print('Messages sent: %d' % threads[0].sent)
104+
print('Messages recvd: %d' % threads[1].valid)
105+
print('Messages invalid: %d' % threads[1].invalid)
106+
107+
108+
if __name__ == "__main__":
109+
args = get_args_parser().parse_args()
110+
main(args)

0 commit comments

Comments
 (0)