-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathreceiver.py
79 lines (67 loc) · 2.39 KB
/
receiver.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
import logging
import uuid
from typing import Optional, Dict
import msgpack
from kafka import KafkaConsumer
from sqlalchemy.orm import sessionmaker, Session
from config import get_settings
from consts import Actions
from db.main import engine
from db.models import Message
# Project settings object; this module reads its kafka_topic,
# kafka_consumer_group and kafka attributes when building the consumer.
settings = get_settings()
# Module-level logger named after this module.
logger = logging.getLogger(__name__)
def msgpack_deserializer(msg) -> Optional[Dict]:
    """Decode a msgpack-encoded record value.

    Passed as ``value_deserializer`` to the ``KafkaConsumer`` built in
    ``consume``.

    Args:
        msg: Raw record value to unpack.

    Returns:
        The unpacked object, or ``None`` when ``msg`` cannot be unpacked.
        Failures are logged instead of raised so that a single bad record
        does not terminate the consuming loop.
    """
    try:
        return msgpack.unpackb(msg)
    except (
        msgpack.UnpackException,
        msgpack.ExtraData,
        # unpackb reports truncated/invalid input as ValueError, and a
        # missing (None) or non-bytes value as TypeError.
        ValueError,
        TypeError,
    ) as exc:
        logger.error('Unpack failed. Input: %s', msg, exc_info=exc)
        return None
def create_or_update(session: Session, action: Actions, data: Dict
                     ) -> Optional[Message]:
    """Build a new ``Message`` or apply ``data`` to an existing one.

    Args:
        session: Session used to look up the existing row on UPDATE.
        action: Requested operation; ``Actions.CREATE`` and
            ``Actions.UPDATE`` are handled here.
        data: Field values for the message. On UPDATE it must contain a
            non-None ``'id'``.

    Returns:
        For CREATE, a new ``Message`` with a freshly generated UUID id.
        For UPDATE, the stored message with every non-None value from
        ``data`` set on it. ``None`` when the id is missing, the row is
        not found, or the action is not recognised (each case is logged).
    """
    if action == Actions.CREATE:
        # Spread ``data`` first so the generated id always wins. Passing
        # ``id=...`` next to ``**data`` raises TypeError as soon as the
        # payload carries an 'id' key, even one set to None.
        fields = {**data, 'id': str(uuid.uuid4())}
        return Message(**fields)

    if action == Actions.UPDATE:
        msg_id = data.get('id')
        if msg_id is None:
            logger.warning(
                'Updating message without message id. Data: %s', str(data))
            return None

        msg = session.query(Message).get(msg_id)
        if msg is None:
            logger.warning('Message with id %s not found in database', msg_id)
            return None

        # None values are treated as "field not supplied" and left as is.
        for key, value in data.items():
            if value is not None:
                setattr(msg, key, value)
        return msg

    logger.warning('Unknown action %s', action)
    return None
def consume(session: Session):
    """Consume records from Kafka and apply them to the database.

    Builds a ``KafkaConsumer`` from ``settings`` and iterates over it.
    Each record value is expected to be a dict holding an ``'action'``
    key plus the message fields. DELETE with an id removes the matching
    ``Message`` row; every other action is delegated to
    ``create_or_update`` and the result, if any, is merged and committed.

    Args:
        session: Session used for all queries and commits.
    """
    consumer = KafkaConsumer(
        settings.kafka_topic,
        group_id=settings.kafka_consumer_group,
        value_deserializer=msgpack_deserializer,
        **settings.kafka
    )

    for msg in consumer:
        payload = msg.value
        # The deserializer yields None for records it cannot unpack, and
        # a valid msgpack value need not be a map; calling .pop() on
        # either would raise and stop the whole loop.
        if not isinstance(payload, dict):
            logger.warning(
                'Skipping message with unusable payload: %s', payload)
            continue

        action = payload.pop('action', None)
        if action is None:
            logger.warning('Action is None, don\'t know what to do...')
            # Nothing below can handle a missing action.
            continue

        msg_id = payload.get('id')

        # perform DELETE
        if action == Actions.DELETE and msg_id is not None:
            session.query(Message).filter(Message.id == msg_id).delete()
            session.commit()
            continue

        # perform CREATE or UPDATE
        db_msg = create_or_update(
            session=session,
            action=action,
            data=payload
        )
        if db_msg is not None:
            session.merge(db_msg, load=True)
            session.commit()
if __name__ == "__main__":
    # Script entry point: create a session factory bound to the project
    # engine and run the consumer loop with one session from it.
    session_factory = sessionmaker(
        bind=engine,
        autoflush=False,
        autocommit=False,
    )
    db_session = session_factory()
    consume(session=db_session)