
Commit 4a2f4f7

Merge pull request #1 from 116davinder/v2
create multithreaded kafka consumer ( Single Topic + Multi Partition )
2 parents 41b23a5 + 0b413fa commit 4a2f4f7

File tree

7 files changed (+154, -85)


README.md (+51, -20)
````diff
@@ -4,7 +4,12 @@
 * It will auto resume from same point from where it died if given consumer group name is same before and after crash.
 * it will upload `current.bin` file to s3 which contains messages upto `NUMBER_OF_MESSAGE_PER_BACKUP_FILE`
 but will only upload with other backup files.
-* upload to s3 is background process and it depends on `RETRY_UPLOAD_SECONDS`.
+* `RETRY_UPLOAD_SECONDS` controls upload to s3 or other cloud storage.
+* `NUMBER_OF_KAFKA_THREADS` is used to parallelise reading from the kafka topic.
+It should not be more than the number of partitions.
+* `LOG_LEVEL` values can be found at https://docs.python.org/3/library/logging.html#logging-levels
+* `NUMBER_OF_MESSAGE_PER_BACKUP_FILE` will try to keep this number of messages consistent per file,
+but if the application is restarted it may vary for the first backup file.
 
 **Restore Application**
 * it will restore from backup dir into given topic.
@@ -25,50 +30,76 @@ python3 backup.py backup.json
 **Local Filesytem Backup.json**
 ```
 {
-    "BOOTSTRAP_SERVERS": "localhost:9092",
+    "BOOTSTRAP_SERVERS": "kafka01:9092,kafka02:9092,kafka03:9092",
     "TOPIC_NAMES": ["davinder.test"],
     "GROUP_ID": "Kafka-BackUp-Consumer-Group",
     "FILESYSTEM_TYPE": "LINUX",
     "FILESYSTEM_BACKUP_DIR": "/tmp/",
-    "NUMBER_OF_MESSAGE_PER_BACKUP_FILE": 50
+    "NUMBER_OF_MESSAGE_PER_BACKUP_FILE": 1000,
+    "RETRY_UPLOAD_SECONDS": 100,
+    "NUMBER_OF_KAFKA_THREADS": 3,
+    "LOG_LEVEL": 20
 }
 ```
 
 **S3 backup.json**
 ```
 {
-    "BOOTSTRAP_SERVERS": "localhost:9092",
+    "BOOTSTRAP_SERVERS": "kafka01:9092,kafka02:9092,kafka03:9092",
     "TOPIC_NAMES": ["davinder.test"],
     "GROUP_ID": "Kafka-BackUp-Consumer-Group",
     "FILESYSTEM_TYPE": "S3",
     "FILESYSTEM_BACKUP_DIR": "/tmp/",
-    "NUMBER_OF_MESSAGE_PER_BACKUP_FILE": 50,
-    "RETRY_UPLOAD_SECONDS": 100
+    "NUMBER_OF_MESSAGE_PER_BACKUP_FILE": 1000,
+    "RETRY_UPLOAD_SECONDS": 100,
+    "NUMBER_OF_KAFKA_THREADS": 3,
+    "LOG_LEVEL": 20
 }
 ```
-
+**Example Local Backup Run Output**
+```
+{ "@timestamp": "2020-06-08 10:56:34,557","level": "INFO","thread": "Kafka Consumer 0","name": "root","message": "started polling on davinder.test" }
+{ "@timestamp": "2020-06-08 10:56:34,557","level": "INFO","thread": "Kafka Consumer 1","name": "root","message": "started polling on davinder.test" }
+{ "@timestamp": "2020-06-08 10:56:34,557","level": "INFO","thread": "Kafka Consumer 2","name": "root","message": "started polling on davinder.test" }
+{ "@timestamp": "2020-06-08 10:56:51,590","level": "INFO","thread": "Kafka Consumer 1","name": "root","message": "Created Successful Backupfile /tmp/davinder.test/1/20200608-105651.tar.gz" }
+{ "@timestamp": "2020-06-08 10:56:51,593","level": "INFO","thread": "Kafka Consumer 1","name": "root","message": "Created Successful Backup sha256 file of /tmp/davinder.test/1/20200608-105651.tar.gz.sha256" }
+{ "@timestamp": "2020-06-08 10:57:17,270","level": "INFO","thread": "Kafka Consumer 0","name": "root","message": "Created Successful Backupfile /tmp/davinder.test/0/20200608-105717.tar.gz" }
+{ "@timestamp": "2020-06-08 10:57:17,277","level": "INFO","thread": "Kafka Consumer 0","name": "root","message": "Created Successful Backup sha256 file of /tmp/davinder.test/0/20200608-105717.tar.gz.sha256" }
+{ "@timestamp": "2020-06-08 10:57:17,399","level": "INFO","thread": "Kafka Consumer 2","name": "root","message": "Created Successful Backupfile /tmp/davinder.test/2/20200608-105717.tar.gz" }
+{ "@timestamp": "2020-06-08 10:57:17,406","level": "INFO","thread": "Kafka Consumer 2","name": "root","message": "Created Successful Backup sha256 file of /tmp/davinder.test/2/20200608-105717.tar.gz.sha256" }
+...
+```
 **Example S3 Backup Run Output**
 ```
 $ python3 backup.py backup.json
-{ "@timestamp": "2020-06-01 10:37:00,168","level": "INFO","thread": "MainThread","name": "root","message": "Successful loading of config.json file" }
-{ "@timestamp": "2020-06-01 10:37:00,169","level": "INFO","thread": "MainThread","name": "root","message": "all required variables are successfully" }
-{ "@timestamp": "2020-06-01 10:37:00,187","level": "INFO","thread": "Kafka Consumer","name": "root","message": "starting polling on davinder.test" }
-{ "@timestamp": "2020-06-01 10:38:17,291","level": "INFO","thread": "Kafka Consumer","name": "root","message": "Created Successful Backupfile /tmp/davinder.test/20200601-103817.tar.gz" }
-{ "@timestamp": "2020-06-01 10:39:00,631","level": "INFO","thread": "S3-Upload","name": "root","message": "upload successful at s3://davinder-test-kafka-backup/davinder.test/20200601-103817.tar.gz" }
+{ "@timestamp": "2020-06-10 12:49:43,871","level": "INFO","thread": "S3 Upload","name": "botocore.credentials","message": "Found credentials in environment variables." }
+{ "@timestamp": "2020-06-10 12:49:43,912","level": "INFO","thread": "Kafka Consumer 1","name": "root","message": "started polling on davinder.test" }
+{ "@timestamp": "2020-06-10 12:49:43,915","level": "INFO","thread": "Kafka Consumer 0","name": "root","message": "started polling on davinder.test" }
+{ "@timestamp": "2020-06-10 12:49:43,916","level": "INFO","thread": "Kafka Consumer 2","name": "root","message": "started polling on davinder.test" }
+{ "@timestamp": "2020-06-10 12:49:44,307","level": "INFO","thread": "S3 Upload","name": "root","message": "upload successful at s3://davinder-test-kafka-backup/davinder.test/0/20200608-102909.tar.gz" }
+{ "@timestamp": "2020-06-10 12:49:45,996","level": "INFO","thread": "S3 Upload","name": "root","message": "waiting for new files to be generated" }
+{ "@timestamp": "2020-06-10 12:52:33,130","level": "INFO","thread": "Kafka Consumer 0","name": "root","message": "Created Successful Backupfile /tmp/davinder.test/0/20200610-125233.tar.gz" }
+{ "@timestamp": "2020-06-10 12:52:33,155","level": "INFO","thread": "Kafka Consumer 0","name": "root","message": "Created Successful Backup sha256 file of /tmp/davinder.test/0/20200610-125233.tar.gz.sha256" }
 ....
 ```
 
 # Backup Directory Structure
 ```
-$ tree davinder.test/
-davinder.test/
-├── 20204025-154046.tar.gz
-├── 20204025-154046.tar.gz.sha256
-├── 20204325-154344.tar.gz
-├── 20204325-154344.tar.gz.sha256
-└── current.bin
+/tmp/davinder.test/
+├── 0
+│   ├── 20200608-102909.tar.gz
+│   ├── 20200608-102909.tar.gz.sha256
+│   └── current.bin
+├── 1
+│   ├── 20200608-102909.tar.gz
+│   ├── 20200608-102909.tar.gz.sha256
+│   └── current.bin
+└── 2
+    ├── 20200608-102909.tar.gz
    ├── 20200608-102909.tar.gz.sha256
+    └── current.bin
 
-0 directories, 5 files
+3 directories, 9 files
 ```
 
 # How to Run Kafka Restore Application
````
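A note on the new `NUMBER_OF_KAFKA_THREADS` setting: since every consumer thread joins the same consumer group, threads beyond the partition count would receive no assignment. A minimal sketch (broker address, group id, and topic here are placeholders, not repository values) for checking a topic's partition count with confluent-kafka before choosing a value:

```python
# Sketch: look up a topic's partition count before setting NUMBER_OF_KAFKA_THREADS.
# Assumes confluent-kafka is installed; broker/topic/group values are placeholders.
import confluent_kafka

consumer = confluent_kafka.Consumer({
    "bootstrap.servers": "localhost:9092",
    "group.id": "partition-count-check",
})
metadata = consumer.list_topics("davinder.test", timeout=10)
partition_count = len(metadata.topics["davinder.test"].partitions)
print(f"davinder.test has {partition_count} partitions")
# NUMBER_OF_KAFKA_THREADS should be <= partition_count; extra threads would sit idle.
consumer.close()
```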

backup.json (+3, -1)
```diff
@@ -5,5 +5,7 @@
     "FILESYSTEM_TYPE": "S3",
     "FILESYSTEM_BACKUP_DIR": "/tmp/",
     "NUMBER_OF_MESSAGE_PER_BACKUP_FILE": 1000,
-    "RETRY_UPLOAD_SECONDS": 100
+    "RETRY_UPLOAD_SECONDS": 100,
+    "NUMBER_OF_KAFKA_THREADS": 2,
+    "LOG_LEVEL": 20
 }
```
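The new keys are optional: as the `backup.py` diff below shows, missing values fall back to defaults. A hypothetical standalone snippet (not part of the repository) illustrating the effective defaults:

```python
# Sketch: the fallbacks backup.py applies when optional keys are absent.
import json
import logging

with open("backup.json") as cf:
    config = json.load(cf)

threads = config.get("NUMBER_OF_KAFKA_THREADS", 1)   # backup.py falls back to 1
log_level = config.get("LOG_LEVEL", logging.INFO)    # falls back to INFO (20)
retry = config.get("RETRY_UPLOAD_SECONDS", 60)       # main() falls back to 60
print(threads, log_level, retry)
```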

backup.py (+38, -25)
```diff
@@ -3,7 +3,7 @@
 from common import Common
 import logging
 import threading
-from upload import Upload
+from cloud import Upload
 
 class KBackup:
     def __init__(self,config):
@@ -12,30 +12,38 @@ def __init__(self,config):
         self.GROUP_ID = config['GROUP_ID']
         self.TOPIC_NAME_LIST = config['TOPIC_NAMES']
         self.BACKUP_DIR = os.path.join(config['FILESYSTEM_BACKUP_DIR'], self.TOPIC_NAME_LIST[0])
-        self.BACKUP_TMP_FILE = os.path.join(self.BACKUP_DIR, "current.bin")
         try:
             self.NUMBER_OF_MESSAGE_PER_BACKUP_FILE = int(config['NUMBER_OF_MESSAGE_PER_BACKUP_FILE'])
         except:
-            logging.info(
+            logging.error(
                 f"NUMBER_OF_MESSAGE_PER_BACKUP_FILE {str(config['NUMBER_OF_MESSAGE_PER_BACKUP_FILE'])} is not integer value"
             )
             self.NUMBER_OF_MESSAGE_PER_BACKUP_FILE = 50
-            logging.info(f"NUMBER_OF_MESSAGE_PER_BACKUP_FILE is set to default value 50")
-
+            logging.debug(f"NUMBER_OF_MESSAGE_PER_BACKUP_FILE is set to default value 50")
+        try:
+            self.NUMBER_OF_KAFKA_THREADS = config['NUMBER_OF_KAFKA_THREADS']
+        except:
+            self.NUMBER_OF_KAFKA_THREADS = 1
         self.CONSUMERCONFIG = {
             'bootstrap.servers': self.BOOTSTRAP_SERVERS,
             'group.id': self.GROUP_ID,
             'auto.offset.reset': 'earliest'
         }
-        logging.info(f"successful loading of all variables")
+        try:
+            self.LOG_LEVEL = config['LOG_LEVEL']
+        except:
+            self.LOG_LEVEL = logging.INFO
+
+        logging.debug(f"successful loading of all variables")
 
     def backup(self):
         _bt = confluent_kafka.Consumer(self.CONSUMERCONFIG)
         _bt.subscribe(self.TOPIC_NAME_LIST)
 
-        Common.createBackupTopicDir(self.BACKUP_DIR)
+        for p in Common.findNumberOfPartitionsInTopic(_bt.list_topics().topics[self.TOPIC_NAME_LIST[0]].partitions):
+            Common.createDir(os.path.join(self.BACKUP_DIR, str(p)))
 
-        count = Common.currentMessageCountInBinFile(self.BACKUP_TMP_FILE)
+        count = 0
         logging.info(f"started polling on {self.TOPIC_NAME_LIST[0]}")
         while True:
             msg = _bt.poll(timeout=1.0)
@@ -45,18 +53,22 @@ def backup(self):
             if msg.error():
                 logging.error(f"{msg.error()}")
                 continue
-            else:
+            if msg.partition() is not None:
+                _tmp_file = os.path.join(self.BACKUP_DIR, str(msg.partition()) ,"current.bin")
+                _tar_location = os.path.join(self.BACKUP_DIR, str(msg.partition()))
                 _msg = Common.decodeMsgToUtf8(msg)
                 if _msg is not None:
                     if count == 0:
-                        Common.writeDataToKafkaBinFile(self.BACKUP_TMP_FILE, _msg, "a+")
+                        Common.writeDataToKafkaBinFile(_tmp_file, _msg, "a+")
                     if count > 0:
                         if count % self.NUMBER_OF_MESSAGE_PER_BACKUP_FILE == 0:
-                            Common.createTarGz(self.BACKUP_DIR, self.BACKUP_TMP_FILE)
-                            Common.writeDataToKafkaBinFile(self.BACKUP_TMP_FILE, _msg, "w")
+                            Common.createTarGz(_tar_location, _tmp_file)
+                            Common.writeDataToKafkaBinFile(_tmp_file, _msg, "w")
                         else:
-                            Common.writeDataToKafkaBinFile(self.BACKUP_TMP_FILE, _msg, "a+")
-
+                            Common.writeDataToKafkaBinFile(_tmp_file, _msg, "a+")
+            else:
+                logging.error(f"no partition found for message")
+
             count += 1
 
         _bt.close()
@@ -72,11 +84,15 @@ def main():
         exit(1)
 
     b = KBackup(config)
-    _r_thread = threading.Thread(
-        target=b.backup,
-        name="Kafka Consumer"
-    )
-    _r_thread.start()
+    Common.setLoggingFormat(b.LOG_LEVEL)
+
+    for _r_thread in range(b.NUMBER_OF_KAFKA_THREADS):
+        _r_thread = threading.Thread(
+            target=b.backup,
+            name="Kafka Consumer " + str(_r_thread)
+        )
+        _r_thread.start()
+
 
     if config['FILESYSTEM_TYPE'] == "S3":
         try:
@@ -89,12 +105,9 @@ def main():
         except:
             logging.debug(f"setting RETRY_UPLOAD_SECONDS to default 60 ")
             retry_upload_seconds = 60
-        _s3_upload_thread = threading.Thread(
-            target=Upload.s3_upload_files,
-            args=[bucket, tmp_dir, topic_name,retry_upload_seconds],
-            name="S3-Upload"
-        )
-        _s3_upload_thread.start()
+
+        Upload.s3_upload(bucket,tmp_dir,topic_name,retry_upload_seconds,b.NUMBER_OF_KAFKA_THREADS + 1)
+
     except KeyError as e:
         logging.error(f"unable to set s3 required variables {e}")
 
```
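The core change here is the thread-per-consumer pattern: each thread runs `b.backup()`, which builds its own `confluent_kafka.Consumer` in the shared group, so the broker spreads the topic's partitions across the threads. A self-contained sketch of that pattern, assuming a reachable broker (broker, topic, and group names are placeholders):

```python
# Minimal sketch of the thread-per-consumer pattern this commit introduces.
# Each thread owns a separate Consumer instance, because a confluent_kafka
# Consumer must not be polled concurrently from multiple threads.
import threading
import confluent_kafka

def consume(thread_id):
    consumer = confluent_kafka.Consumer({
        "bootstrap.servers": "localhost:9092",
        "group.id": "Kafka-BackUp-Consumer-Group",
        "auto.offset.reset": "earliest",
    })
    consumer.subscribe(["davinder.test"])
    try:
        while True:
            msg = consumer.poll(timeout=1.0)
            if msg is None:
                continue
            if msg.error():
                continue
            # Each thread sees only the partitions the group assigned to it.
            print(f"thread {thread_id}: partition {msg.partition()} offset {msg.offset()}")
    finally:
        consumer.close()

for i in range(3):  # one thread per partition, mirroring NUMBER_OF_KAFKA_THREADS
    threading.Thread(target=consume, args=[i], name=f"Kafka Consumer {i}").start()
```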

cloud.py (new file, +48)
```diff
@@ -0,0 +1,48 @@
+import logging
+import boto3
+from botocore.exceptions import ClientError
+import os
+import time
+from common import Common
+import pathlib
+import threading
+
+class Upload:
+
+    def findFiles(dir,pattern="*"):
+        _list = []
+        result = list(pathlib.Path(dir).rglob(pattern))
+        for path in result:
+            if os.path.isfile(path):
+                _list.append(path)
+        return _list
+
+    def s3_upload_file(s3_client,bucket,file_name,object_name):
+        try:
+            response = s3_client.upload_file(file_name,bucket,object_name)
+            logging.info(f"upload successful at s3://{bucket}/{object_name}")
+            if not file_name.endswith(".bin"):
+                logging.debug(f"deleting uploaded file {file_name}")
+                os.remove(file_name)
+        except ClientError as e:
+            logging.error(f"{file_name} upload failed error {e}")
+
+    def s3_upload(bucket,dir,topic_name,retry_upload_seconds,thread_count):
+        s3_client = boto3.client('s3')
+        while True:
+            _topic_dir = os.path.join(dir, topic_name)
+            _count_partition_dirs = len(Common.listDirs(_topic_dir))
+            _list = Upload.findFiles(_topic_dir)
+            if len(_list) > _count_partition_dirs and threading.active_count() <= thread_count:
+                for file_name in _list:
+                    file_name = str(file_name)
+                    if os.path.getsize(file_name) > 0:
+                        object_name = file_name.split(dir)[1]
+                        t = threading.Thread(
+                            target=Upload.s3_upload_file,
+                            args=[s3_client,bucket,file_name,object_name],
+                            name="S3 Upload Threads"
+                        ).start()
+            else:
+                logging.info(f"s3 upload retry for new files in {retry_upload_seconds} seconds")
+                time.sleep(retry_upload_seconds)
```
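`Upload.s3_upload` loops forever in the calling thread: it rescans the topic directory, only proceeds when there are more files than partition directories (so the per-partition `current.bin` files alone don't trigger uploads), and spawns one upload thread per eligible file while `threading.active_count()` stays within `thread_count`. A sketch of the call `backup.py`'s `main()` makes (all values are placeholders; boto3 reads AWS credentials from the environment):

```python
# Sketch: how backup.py invokes the uploader. This call blocks forever,
# which is why main() runs it after starting the consumer threads.
from cloud import Upload

Upload.s3_upload(
    "davinder-test-kafka-backup",  # bucket
    "/tmp/",                       # dir (FILESYSTEM_BACKUP_DIR)
    "davinder.test",               # topic_name
    100,                           # retry_upload_seconds
    4,                             # thread_count: NUMBER_OF_KAFKA_THREADS + 1
)
```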

common.py (+13, -7)
```diff
@@ -7,13 +7,13 @@
 
 class Common:
 
-    def setLoggingFormat():
+    def setLoggingFormat(level=20):
         logging.basicConfig(
             format='{ "@timestamp": "%(asctime)s","level": "%(levelname)s","thread": "%(threadName)s","name": "%(name)s","message": "%(message)s" }'
         )
-        logging.getLogger().setLevel(logging.INFO)
+        logging.getLogger().setLevel(level)
 
-    def listFiles(dir):
+    def listDirs(dir):
         try:
             return sorted(os.listdir(dir))
         except FileNotFoundError as e:
@@ -24,7 +24,7 @@ def listFiles(dir):
     def readJsonConfig(file):
         try:
             with open(file) as cf:
-                logging.info(f'loading {file} file')
+                logging.debug(f'loading {file} file')
                 return json.load(cf)
         except json.decoder.JSONDecodeError as e:
             logging.error(f'{e}')
@@ -45,11 +45,11 @@ def createSha256OfBackupFile(file,hash):
         except:
             logging.error(f'unable to write to {file}.sha256')
 
-    def createBackupTopicDir(dir):
+    def createDir(dir):
         try:
             os.mkdir(dir)
         except FileExistsError as e:
-            logging.info(e)
+            logging.debug(e)
         except:
             logging.error(f'unable to create folder {dir}')
             exit(1)
@@ -122,4 +122,10 @@ def extractBinFile(file,hashfile,extractDir):
             return os.path.join(extractDir,_sname)
         except FileNotFoundError as e:
             logging.error(e)
-            return None
+            return None
+
+    def findNumberOfPartitionsInTopic(list):
+        _lp = []
+        for i in list:
+            _lp.append(i)
+        return _lp
```
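`findNumberOfPartitionsInTopic` simply materialises whatever iterable it is handed; with confluent-kafka metadata that is a dict keyed by partition id, so iterating it yields the ids that become the per-partition backup subdirectories. A tiny illustration with a stand-in dict:

```python
# Sketch: TopicMetadata.partitions is a dict keyed by partition id, so
# iterating it (as findNumberOfPartitionsInTopic does) yields the ids.
partitions = {0: object(), 1: object(), 2: object()}  # stand-in for real metadata

_lp = []
for i in partitions:
    _lp.append(i)
print(_lp)  # [0, 1, 2] -> one backup subdirectory per partition; same as list(partitions)
```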

restore.py (+1, -1)
```diff
@@ -33,7 +33,7 @@ def delivery_report(err, msg):
     def restore(self):
         _rt = confluent_kafka.Producer(self.PRODUCERCONFIG)
         while True:
-            _files_in_backup_dir = Common.listFiles(self.BACKUP_DIR)
+            _files_in_backup_dir = Common.listDirs(self.BACKUP_DIR)
             for file in _files_in_backup_dir:
                 file = os.path.join(self.BACKUP_DIR,file)
                 if file.endswith("tar.gz"):
```

upload.py (deleted, -31)

This file was deleted; its S3 upload logic now lives in cloud.py.
