Skip to content

Система противодействующая обработки дубликатов, работает исключительно на Kafka

License

teta42/Anti_Duplicate_System

Folders and files

NameName
Last commit message
Last commit date

Latest commit

 

History

17 Commits
 
 
 
 
 
 
 
 
 
 

Repository files navigation

Anti-Duplicate System for Kafka

Проект нужный для создания системы которая гарантирует НЕ обработку копий сообщения.

Примеры использования:

Так как моя библиотека это надстройка для библиотеки Kafka-python, то функционал одинаков.

  1. Производитель(Producer):
from Aduplicate.Producer import Producer
import json
import random
from time import sleep

p = Producer(
    bootstrap_servers='localhost:9092',  # Адрес вашего Kafka брокера
    value_serializer=lambda v: json.dumps(v).encode('utf-8'), # Сериализация сообщений в JSON)
    acks=1,
    retries=10,)

a = 0
while a < 100:
    mess = {'b': int(random.randint(-10, 0)), "a": int(random.randint(10, 20))}
    p.send('test', mess)
    print(f'{mess}')
    a = a + 1
    sleep(0.5)

# Закрываем продюсер
p.close()

Пример кода который отправляет 100 сообщений.

  1. Потребитель(Consumer):
from Aduplicate.Consumer import Consumer
import json

c = Consumer(bootstrap_servers='localhost:9092',  # Адрес вашего Kafka брокера
    auto_offset_reset='latest', 
    enable_auto_commit=True,
    group_id='my-group',
    value_deserializer=lambda x: json.loads(x.decode('utf-8')) # Сериализация сообщений в JSON)
    )

c.subscribe(['test'])
a = 1
try:
    for message in c:
        print(f"Получено сообщение: {message.value} из партиции: {message.partition}, смещение: {message.offset}\n")
        a = a + 1
except KeyboardInterrupt:
    print('ok, im stoped')
finally:
    c.close()  # Закрываем потребителя
    
print(a)

Пример кода который получает сообщения и после остановки ctrl + c выводит количество полученных сообщений.

Как работает библиотека:

Потребитель

  1. Сообщения в спец. теме - это сообщения которые говорят о том какие сообщения были проверенны.
  2. Во втором пункте начала мы разделяем большой список на несколько мелких, каждый такой мелкий список хранит сообщения лишь из определённой партиции.
  3. Сообщения из спец. темы выглядит так-
    {"hash": "хэш проверенного сообщения", 
    "topic": "тема где это сообщение лежало", 
    "partition": "партиция где сообщение находилось"}

В начале

  1. Забираем все сообщения из спец. темы, Ложим их в общий список.
  2. Проходимся по списку разделяя сообщения по партициям где они лежали.

Цикл

  1. Получаем проверяемое сообщение.

  2. Перед проверкой обновляем списки.

    1. Получаем все новые сообщения из спец. темы.
    2. Разделяем их по вышеописанному принципу.
  3. Смотрим из какой партиции пришло проверяемое сообщение.

  4. Ищим хэш проверяемого сообщения в соответствующим списке.

  5. Если нашли: Выкидываем это сообщение.

    Не нашли: Передаём сообщение на обработку.

  6. Добавляем новый хэш в соответствующий список.

  7. Отправляем сообщение в спец. тему.

Производитель

Я считаю что код довольно понятный :)

About

Система противодействующая обработки дубликатов, работает исключительно на Kafka

Topics

Resources

License

Stars

Watchers

Forks

Releases

No releases published

Languages