-
-
Notifications
You must be signed in to change notification settings - Fork 720
/
Copy pathdemo_rabbitmq_exchange.py
52 lines (38 loc) · 1.57 KB
/
demo_rabbitmq_exchange.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
'''
Copyright (C) 2017-2025 Bryant Moscon - [email protected]
Please see the LICENSE file for the terms and conditions
associated with this software.
'''
from multiprocessing import Process
from cryptofeed import FeedHandler
from cryptofeed.backends.rabbitmq import BookRabbit
from cryptofeed.defines import L2_BOOK
from cryptofeed.exchanges import Kraken
def callback(ch, method, properties, body):
print(" [x] Received %r" % body.decode())
def receiver(port):
import pika
connection = pika.BlockingConnection(
pika.ConnectionParameters(host='localhost', port=port))
channel = connection.channel()
exchange_name = 'amq.topic'
exchange_type = 'topic'
channel.exchange_declare(exchange=exchange_name, exchange_type=exchange_type, durable=True)
queue_name = 'cryptofeed'
channel.queue_declare(queue=queue_name)
channel.queue_bind(exchange=exchange_name, queue=queue_name)
channel.basic_consume(queue=queue_name, on_message_callback=callback, auto_ack=True)
print(' [*] Waiting for messages. To exit press CTRL+C')
channel.start_consuming()
def main():
try:
p = Process(target=receiver, args=(5672,))
p.start()
f = FeedHandler()
rabbitargs = {'exchange_mode': True, 'exchange_name': 'amq.topic', 'exchange_type': 'topic', 'routing_key': 'cryptofeed', 'passive': False}
f.add_feed(Kraken(max_depth=2, channels=[L2_BOOK], symbols=['BTC-USD', 'ETH-USD'], callbacks={L2_BOOK: BookRabbit(**rabbitargs)}))
f.run()
finally:
p.terminate()
if __name__ == '__main__':
main()