#!/usr/bin/env python3
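"""Set up a network namespace with artificial latency and run a single-node
Apache Kafka 3.8.0 broker (KRaft mode) inside it.

Example invocation (the data directory path is just an illustration):

    sudo ./setup_kafka.py --data-directory /var/lib/kafka-data --latency 5ms

The script re-executes itself under sudo if needed, downloads Kafka when the
kafka_2.13-3.8.0 directory is missing, and tears everything down on Ctrl+C.
"""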
import os
import re
import subprocess
import argparse
import signal
import sys
import urllib.request
import tarfile
import time
# Global Kafka process handle
kafka_process = None
# Function to execute shell commands
def run_command(command):
    try:
        subprocess.run(command, shell=True, check=True)
    except subprocess.CalledProcessError as e:
        print(f"Error running command: {command}\n{e}")
        sys.exit(1)
# Cleanup function to delete veth interfaces, namespace, and stop Kafka
def cleanup():
    print("Cleaning up...")
    if kafka_process:
        print("Stopping Kafka broker...")
        kafka_process.terminate()
        kafka_process.wait()
        print("Kafka broker stopped.")
    time.sleep(2)
    # Best-effort teardown: run each command with check=False so a missing
    # interface or namespace does not abort the remaining cleanup steps
    # (run_command would sys.exit(1) on the first failure).
    for command in (
        f"ip link set {args.veth0} down",
        f"ip link delete {args.veth0}",
        f"ip netns del {args.namespace}",
    ):
        subprocess.run(command, shell=True, check=False,
                       stderr=subprocess.DEVNULL)
    sys.exit(0)
# Check for sudo permissions
def check_sudo():
    if os.geteuid() != 0:
        print("This script must be run with sudo. Re-running with sudo...")
        try:
            os.execvp("sudo", ["sudo"] + sys.argv)
        except Exception as e:
            print(f"Failed to re-run the script with sudo: {e}")
            sys.exit(1)
# Handle exit signals to clean up on script termination
def signal_handler(sig, frame):
    cleanup()

signal.signal(signal.SIGINT, signal_handler)
signal.signal(signal.SIGTERM, signal_handler)
# Check for sudo permissions
check_sudo()
# Argument parsing for command line input
parser = argparse.ArgumentParser(description="Setup network namespace, latency, and start Apache Kafka.")
parser.add_argument('--namespace', type=str, default='myns', help='Name of the network namespace to create')
parser.add_argument('--veth0', type=str, default='veth0', help='Name of the veth interface on the host')
parser.add_argument('--veth1', type=str, default='veth1', help='Name of the peer veth interface')
parser.add_argument('--host_ip', type=str, default='10.10.1.1/24', help='IP address for the host side of the veth pair')
parser.add_argument('--ns_ip', type=str, default='10.10.1.2/24', help='IP address for the network namespace side of the veth pair')
parser.add_argument('--latency', type=str, default='5ms', help='Latency to apply on the veth interface')
parser.add_argument("--data-directory", required=True, help="Path to the Kafka data directory.")
parser.add_argument("--KAFKA_HEAP_OPTS", type=str, default='-Xms2G -Xmx2G', help="Heap size settings for Kafka")
parser.add_argument("--MaxGCPauseMillis", type=str, default='20', help="Maximum GC pause time.")
parser.add_argument("--G1ConcRefinementThreads", type=str, default='8', help="Number of G1 concurrent refinement threads.")
parser.add_argument("--G1ParallelGCThreads", type=str, default='8', help="Number of G1 Parallel GC threads.")
parser.add_argument("--KAFKA_NUM_NETWORK_THREADS", type=str, default='4', help="Number of network threads for Kafka.")
parser.add_argument("--KAFKA_NUM_IO_THREADS", type=str, default='6', help="Number of I/O threads for Kafka.")
parser.add_argument("--KAFKA_NUM_PARTITIONS", type=str, default='10', help="Number of partitions for Kafka topics.")
parser.add_argument("--kafka_port", type=str, default='9092', help="Kafka port (default: 9092)")
parser.add_argument("--socket-send-buffer-bytes", type=int, default=102400, help="Kafka socket send buffer size (default: 102400)")
parser.add_argument("--socket-receive-buffer-bytes", type=int, default=102400, help="Kafka socket receive buffer size (default: 102400)")
parser.add_argument("--socket-request-max-bytes", type=int, default=104857600, help="Kafka socket request max bytes (default: 104857600)")
args = parser.parse_args()
# Function to check if the namespace exists
def namespace_exists(namespace):
    result = subprocess.run("ip netns list", shell=True, check=False,
                            stdout=subprocess.PIPE)
    # `ip netns list` prints one namespace per line, optionally followed by an
    # "(id: N)" suffix, so compare the first token exactly instead of grepping
    # (a plain grep would also match e.g. "myns2" when looking for "myns").
    return any(line.split()[0] == namespace
               for line in result.stdout.decode().splitlines() if line.strip())
# Function to ensure Kafka storage formatting completes successfully
def initialize_kafka_cluster(kafka_dir, data_dir):
    print("Initializing Kafka cluster with KRaft...")
    original_cwd = os.getcwd()
    os.chdir(kafka_dir)
    try:
        kafka_storage = os.path.join("bin", "kafka-storage.sh")
        cluster_id_cmd = [kafka_storage, "random-uuid"]
        cluster_id = subprocess.check_output(cluster_id_cmd).decode().strip()
        print(f"Generated KAFKA_CLUSTER_ID: {cluster_id}")
        server_properties = os.path.join("config", "kraft", "server.properties")
        with open(server_properties, 'r') as file:
            properties = file.readlines()
        with open(server_properties, 'w') as file:
            for line in properties:
                if line.startswith("log.dirs"):
                    file.write(f"log.dirs={data_dir}\n")
                else:
                    file.write(line)
        format_cmd = [kafka_storage, "format", "-t", cluster_id, "-c", server_properties]
        result = subprocess.run(format_cmd, capture_output=True, text=True)
        if result.returncode != 0:
            print(f"Error during Kafka storage formatting: {result.stderr}")
            sys.exit(1)
        else:
            print("Kafka storage formatted successfully.")
            print(result.stdout)
    finally:
        os.chdir(original_cwd)
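# Note: `kafka-storage.sh format` stamps each log.dirs directory with the
# cluster ID (meta.properties); a KRaft broker refuses to start on
# unformatted storage, so this step must succeed before start_kafka().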
# Function to download Kafka if not present
def download_kafka(kafka_url, kafka_dir):
    print("Downloading Apache Kafka...")
    file_name = kafka_url.split('/')[-1]
    urllib.request.urlretrieve(kafka_url, file_name)
    with tarfile.open(file_name, "r:gz") as tar:
        tar.extractall()
    os.remove(file_name)
    if file_name.replace('.tgz', '') != kafka_dir:
        os.rename(file_name.replace('.tgz', ''), kafka_dir)
# Function to update Kafka configuration with the network namespace IP and disable auto topic creation
def update_kafka_properties(kafka_dir, ns_ip, kafka_port):
    server_properties = os.path.join(kafka_dir, "config", "kraft", "server.properties")
    with open(server_properties, 'r') as file:
        properties = file.readlines()
    with open(server_properties, 'w') as file:
        auto_create_set = False
        for line in properties:
            if line.startswith("advertised.listeners"):
                file.write(f"advertised.listeners=PLAINTEXT://{ns_ip}:{kafka_port}\n")
            elif line.startswith("auto.create.topics.enable"):
                file.write("auto.create.topics.enable=false\n")
                auto_create_set = True
            elif line.startswith("socket.send.buffer.bytes"):
                file.write(f"socket.send.buffer.bytes={args.socket_send_buffer_bytes}\n")
            elif line.startswith("socket.receive.buffer.bytes"):
                file.write(f"socket.receive.buffer.bytes={args.socket_receive_buffer_bytes}\n")
            elif line.startswith("socket.request.max.bytes"):
                file.write(f"socket.request.max.bytes={args.socket_request_max_bytes}\n")
            else:
                file.write(line)
        if not auto_create_set:
            file.write("auto.create.topics.enable=false\n")
    print(f"Updated advertised.listeners to PLAINTEXT://{ns_ip}:{kafka_port}")
    print(f"Set socket.send.buffer.bytes to {args.socket_send_buffer_bytes}")
    print(f"Set socket.receive.buffer.bytes to {args.socket_receive_buffer_bytes}")
    print(f"Set socket.request.max.bytes to {args.socket_request_max_bytes}")
    print("Ensured auto.create.topics.enable is set to false.")
# Function to update kafka-run-class.sh to expose JMX on port 9999, advertised at the namespace IP
def update_kafka_run_class(kafka_dir):
    run_class_path = os.path.join(kafka_dir, "bin", "kafka-run-class.sh")
    try:
        with open(run_class_path, "r") as f:
            lines = f.readlines()
    except Exception as e:
        print(f"Error reading {run_class_path}: {e}")
        sys.exit(1)
    # Advertise the RMI endpoint at the namespace IP so JMX clients on the
    # host connect through the veth pair (10.10.1.2 with the default --ns_ip).
    jmx_hostname = args.ns_ip.split('/')[0]
    new_lines = []
    updated = False
    jmx_options = ("-Dcom.sun.management.jmxremote "
                   "-Dcom.sun.management.jmxremote.port=9999 "
                   "-Dcom.sun.management.jmxremote.rmi.port=9999 "
                   "-Dcom.sun.management.jmxremote.authenticate=false "
                   "-Dcom.sun.management.jmxremote.ssl=false "
                   f"-Djava.rmi.server.hostname={jmx_hostname} ")
    for line in lines:
        # Look for the line that starts with exec and calls "$JAVA"; skip it if
        # the JMX flags are already present (keeps repeated runs idempotent).
        if (not updated
                and "-Dcom.sun.management.jmxremote" not in line
                and re.search(r'^\s*exec\s+"\$JAVA"', line)):
            line = re.sub(r'(^\s*exec\s+"\$JAVA")', r'\1 ' + jmx_options, line)
            updated = True
        new_lines.append(line)
    try:
        with open(run_class_path, "w") as f:
            f.writelines(new_lines)
        if updated:
            print(f"Updated kafka-run-class.sh with JMX options (port 9999, hostname {jmx_hostname}).")
        else:
            print("JMX options already present or exec line not found; kafka-run-class.sh left unchanged.")
    except Exception as e:
        print(f"Error writing to {run_class_path}: {e}")
        sys.exit(1)
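# With the JMX flags in place, metrics can be inspected from the host, e.g.:
#   jconsole 10.10.1.2:9999
# (assumes the default --ns_ip; substitute the namespace IP actually in use)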
# Function to start Kafka in the network namespace
def start_kafka(kafka_dir):
    print("Starting Kafka broker in namespace...")
    kafka_server = os.path.join(kafka_dir, "bin", "kafka-server-start.sh")
    server_properties = os.path.join(kafka_dir, "config", "kraft", "server.properties")
    env_vars = {
        "KAFKA_HEAP_OPTS": args.KAFKA_HEAP_OPTS,
        "KAFKA_JVM_PERFORMANCE_OPTS": (
            f"-XX:MaxGCPauseMillis={args.MaxGCPauseMillis} "
            f"-XX:G1ConcRefinementThreads={args.G1ConcRefinementThreads} "
            f"-XX:ParallelGCThreads={args.G1ParallelGCThreads}"
        ),
    }
    command = (
        f"sudo ip netns exec {args.namespace} bash -c 'export KAFKA_HEAP_OPTS=\"{env_vars['KAFKA_HEAP_OPTS']}\" && "
        f"export KAFKA_JVM_PERFORMANCE_OPTS=\"{env_vars['KAFKA_JVM_PERFORMANCE_OPTS']}\" && "
        f"bash {kafka_server} {server_properties}'"
    )
    return subprocess.Popen(command, shell=True)
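# Once the broker is up, a quick check from the host (paths and addresses
# assume the defaults used by this script):
#   kafka_2.13-3.8.0/bin/kafka-topics.sh --bootstrap-server 10.10.1.2:9092 --list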
# --------------------- LATENCY SETUP -----------------------------
if not namespace_exists(args.namespace):
    print(f"Creating network namespace {args.namespace}...")
    run_command(f"ip netns add {args.namespace}")
else:
    print(f"Namespace {args.namespace} already exists, skipping creation.")
print(f"Creating veth pair: {args.veth0} <-> {args.veth1}...")
run_command(f"ip link add {args.veth0} type veth peer name {args.veth1}")
run_command(f"ip link set {args.veth1} netns {args.namespace}")
run_command(f"ip addr add {args.host_ip} dev {args.veth0}")
run_command(f"ip link set {args.veth0} up")
run_command(f"ip netns exec {args.namespace} ip addr add {args.ns_ip} dev {args.veth1}")
run_command(f"ip netns exec {args.namespace} ip link set {args.veth1} up")
run_command(f"ip netns exec {args.namespace} ip link set lo up")
print(f"Applying latency of {args.latency} to {args.veth0}...")
run_command(f"tc qdisc add dev {args.veth0} root netem delay {args.latency}")
run_command("sysctl -w net.ipv4.ip_forward=1")
run_command(f"ip route add {args.ns_ip.split('/')[0]}/32 dev {args.veth0}")
# --------------------- KAFKA SETUP -----------------------------
kafka_dir = "kafka_2.13-3.8.0"
kafka_url = "https://downloads.apache.org/kafka/3.8.0/kafka_2.13-3.8.0.tgz"
if not os.path.isdir(kafka_dir):
    download_kafka(kafka_url, kafka_dir)
update_kafka_properties(kafka_dir, args.ns_ip.split('/')[0], args.kafka_port)
initialize_kafka_cluster(kafka_dir, args.data_directory)
# Update kafka-run-class.sh with JMX options
update_kafka_run_class(kafka_dir)
# Start Kafka inside the network namespace
kafka_process = start_kafka(kafka_dir)
try:
    while True:
        time.sleep(1)
except KeyboardInterrupt:
    cleanup()