Skip to content

Commit 6721db8

Browse files
Merge pull request #202 from rabbitmq/pika-1.0.0-pt-163872880
Update for Pika 1.0.0, yapf formatting
2 parents d96a1b9 + 8d392f4 commit 6721db8

14 files changed

+96
-97
lines changed

.gitignore

+1
Original file line numberDiff line numberDiff line change
@@ -32,3 +32,4 @@ target/
3232

3333
*.log
3434
.packages
35+
.python-version

python/README.md

+2-2
Original file line numberDiff line numberDiff line change
@@ -7,9 +7,9 @@ To successfully use the examples you will need a running RabbitMQ server.
77

88
## Requirements
99

10-
To run this code you need to install the `pika` package version 0.10.0 or later. To install it, run
10+
To run this code you need to install the `pika` package version `1.0.0` or later. To install it, run
1111

12-
pip install pika==0.11.0
12+
pip install pika
1313

1414
You may first need to run
1515

python/emit_log.py

+4-7
Original file line numberDiff line numberDiff line change
@@ -2,16 +2,13 @@
22
import pika
33
import sys
44

5-
connection = pika.BlockingConnection(pika.ConnectionParameters(
6-
host='localhost'))
5+
connection = pika.BlockingConnection(
6+
pika.ConnectionParameters(host='localhost'))
77
channel = connection.channel()
88

9-
channel.exchange_declare(exchange='logs',
10-
exchange_type='fanout')
9+
channel.exchange_declare(exchange='logs', exchange_type='fanout')
1110

1211
message = ' '.join(sys.argv[1:]) or "info: Hello World!"
13-
channel.basic_publish(exchange='logs',
14-
routing_key='',
15-
body=message)
12+
channel.basic_publish(exchange='logs', routing_key='', body=message)
1613
print(" [x] Sent %r" % message)
1714
connection.close()

python/emit_log_direct.py

+5-7
Original file line numberDiff line numberDiff line change
@@ -2,17 +2,15 @@
22
import pika
33
import sys
44

5-
connection = pika.BlockingConnection(pika.ConnectionParameters(
6-
host='localhost'))
5+
connection = pika.BlockingConnection(
6+
pika.ConnectionParameters(host='localhost'))
77
channel = connection.channel()
88

9-
channel.exchange_declare(exchange='direct_logs',
10-
exchange_type='direct')
9+
channel.exchange_declare(exchange='direct_logs', exchange_type='direct')
1110

1211
severity = sys.argv[1] if len(sys.argv) > 1 else 'info'
1312
message = ' '.join(sys.argv[2:]) or 'Hello World!'
14-
channel.basic_publish(exchange='direct_logs',
15-
routing_key=severity,
16-
body=message)
13+
channel.basic_publish(
14+
exchange='direct_logs', routing_key=severity, body=message)
1715
print(" [x] Sent %r:%r" % (severity, message))
1816
connection.close()

python/emit_log_topic.py

+5-7
Original file line numberDiff line numberDiff line change
@@ -2,17 +2,15 @@
22
import pika
33
import sys
44

5-
connection = pika.BlockingConnection(pika.ConnectionParameters(
6-
host='localhost'))
5+
connection = pika.BlockingConnection(
6+
pika.ConnectionParameters(host='localhost'))
77
channel = connection.channel()
88

9-
channel.exchange_declare(exchange='topic_logs',
10-
exchange_type='topic')
9+
channel.exchange_declare(exchange='topic_logs', exchange_type='topic')
1110

1211
routing_key = sys.argv[1] if len(sys.argv) > 2 else 'anonymous.info'
1312
message = ' '.join(sys.argv[2:]) or 'Hello World!'
14-
channel.basic_publish(exchange='topic_logs',
15-
routing_key=routing_key,
16-
body=message)
13+
channel.basic_publish(
14+
exchange='topic_logs', routing_key=routing_key, body=message)
1715
print(" [x] Sent %r:%r" % (routing_key, message))
1816
connection.close()

python/new_task.py

+9-8
Original file line numberDiff line numberDiff line change
@@ -2,18 +2,19 @@
22
import pika
33
import sys
44

5-
connection = pika.BlockingConnection(pika.ConnectionParameters(
6-
host='localhost'))
5+
connection = pika.BlockingConnection(
6+
pika.ConnectionParameters(host='localhost'))
77
channel = connection.channel()
88

99
channel.queue_declare(queue='task_queue', durable=True)
1010

1111
message = ' '.join(sys.argv[1:]) or "Hello World!"
12-
channel.basic_publish(exchange='',
13-
routing_key='task_queue',
14-
body=message,
15-
properties=pika.BasicProperties(
16-
delivery_mode = 2, # make message persistent
17-
))
12+
channel.basic_publish(
13+
exchange='',
14+
routing_key='task_queue',
15+
body=message,
16+
properties=pika.BasicProperties(
17+
delivery_mode=2, # make message persistent
18+
))
1819
print(" [x] Sent %r" % message)
1920
connection.close()

python/receive.py

+6-6
Original file line numberDiff line numberDiff line change
@@ -1,19 +1,19 @@
11
#!/usr/bin/env python
22
import pika
33

4-
connection = pika.BlockingConnection(pika.ConnectionParameters(
5-
host='localhost'))
4+
connection = pika.BlockingConnection(
5+
pika.ConnectionParameters(host='localhost'))
66
channel = connection.channel()
77

8-
98
channel.queue_declare(queue='hello')
109

10+
1111
def callback(ch, method, properties, body):
1212
print(" [x] Received %r" % body)
1313

14-
channel.basic_consume(callback,
15-
queue='hello',
16-
no_ack=True)
14+
15+
channel.basic_consume(
16+
queue='hello', on_message_callback=callback, auto_ack=True)
1717

1818
print(' [*] Waiting for messages. To exit press CTRL+C')
1919
channel.start_consuming()

python/receive_logs.py

+9-10
Original file line numberDiff line numberDiff line change
@@ -1,26 +1,25 @@
11
#!/usr/bin/env python
22
import pika
33

4-
connection = pika.BlockingConnection(pika.ConnectionParameters(
5-
host='localhost'))
4+
connection = pika.BlockingConnection(
5+
pika.ConnectionParameters(host='localhost'))
66
channel = connection.channel()
77

8-
channel.exchange_declare(exchange='logs',
9-
exchange_type='fanout')
8+
channel.exchange_declare(exchange='logs', exchange_type='fanout')
109

11-
result = channel.queue_declare(exclusive=True)
10+
result = channel.queue_declare('', exclusive=True)
1211
queue_name = result.method.queue
1312

14-
channel.queue_bind(exchange='logs',
15-
queue=queue_name)
13+
channel.queue_bind(exchange='logs', queue=queue_name)
1614

1715
print(' [*] Waiting for logs. To exit press CTRL+C')
1816

17+
1918
def callback(ch, method, properties, body):
2019
print(" [x] %r" % body)
2120

22-
channel.basic_consume(callback,
23-
queue=queue_name,
24-
no_ack=True)
21+
22+
channel.basic_consume(
23+
queue=queue_name, on_message_callback=callback, auto_ack=True)
2524

2625
channel.start_consuming()

python/receive_logs_direct.py

+10-11
Original file line numberDiff line numberDiff line change
@@ -2,14 +2,13 @@
22
import pika
33
import sys
44

5-
connection = pika.BlockingConnection(pika.ConnectionParameters(
6-
host='localhost'))
5+
connection = pika.BlockingConnection(
6+
pika.ConnectionParameters(host='localhost'))
77
channel = connection.channel()
88

9-
channel.exchange_declare(exchange='direct_logs',
10-
exchange_type='direct')
9+
channel.exchange_declare(exchange='direct_logs', exchange_type='direct')
1110

12-
result = channel.queue_declare(exclusive=True)
11+
result = channel.queue_declare('', exclusive=True)
1312
queue_name = result.method.queue
1413

1514
severities = sys.argv[1:]
@@ -18,17 +17,17 @@
1817
sys.exit(1)
1918

2019
for severity in severities:
21-
channel.queue_bind(exchange='direct_logs',
22-
queue=queue_name,
23-
routing_key=severity)
20+
channel.queue_bind(
21+
exchange='direct_logs', queue=queue_name, routing_key=severity)
2422

2523
print(' [*] Waiting for logs. To exit press CTRL+C')
2624

25+
2726
def callback(ch, method, properties, body):
2827
print(" [x] %r:%r" % (method.routing_key, body))
2928

30-
channel.basic_consume(callback,
31-
queue=queue_name,
32-
no_ack=True)
29+
30+
channel.basic_consume(
31+
queue=queue_name, on_message_callback=callback, auto_ack=True)
3332

3433
channel.start_consuming()

python/receive_logs_topic.py

+10-11
Original file line numberDiff line numberDiff line change
@@ -2,14 +2,13 @@
22
import pika
33
import sys
44

5-
connection = pika.BlockingConnection(pika.ConnectionParameters(
6-
host='localhost'))
5+
connection = pika.BlockingConnection(
6+
pika.ConnectionParameters(host='localhost'))
77
channel = connection.channel()
88

9-
channel.exchange_declare(exchange='topic_logs',
10-
exchange_type='topic')
9+
channel.exchange_declare(exchange='topic_logs', exchange_type='topic')
1110

12-
result = channel.queue_declare(exclusive=True)
11+
result = channel.queue_declare('', exclusive=True)
1312
queue_name = result.method.queue
1413

1514
binding_keys = sys.argv[1:]
@@ -18,17 +17,17 @@
1817
sys.exit(1)
1918

2019
for binding_key in binding_keys:
21-
channel.queue_bind(exchange='topic_logs',
22-
queue=queue_name,
23-
routing_key=binding_key)
20+
channel.queue_bind(
21+
exchange='topic_logs', queue=queue_name, routing_key=binding_key)
2422

2523
print(' [*] Waiting for logs. To exit press CTRL+C')
2624

25+
2726
def callback(ch, method, properties, body):
2827
print(" [x] %r:%r" % (method.routing_key, body))
2928

30-
channel.basic_consume(callback,
31-
queue=queue_name,
32-
no_ack=True)
29+
30+
channel.basic_consume(
31+
queue=queue_name, on_message_callback=callback, auto_ack=True)
3332

3433
channel.start_consuming()

python/rpc_client.py

+18-12
Original file line numberDiff line numberDiff line change
@@ -2,18 +2,22 @@
22
import pika
33
import uuid
44

5+
56
class FibonacciRpcClient(object):
7+
68
def __init__(self):
7-
self.connection = pika.BlockingConnection(pika.ConnectionParameters(
8-
host='localhost'))
9+
self.connection = pika.BlockingConnection(
10+
pika.ConnectionParameters(host='localhost'))
911

1012
self.channel = self.connection.channel()
1113

12-
result = self.channel.queue_declare(exclusive=True)
14+
result = self.channel.queue_declare('', exclusive=True)
1315
self.callback_queue = result.method.queue
1416

15-
self.channel.basic_consume(self.on_response, no_ack=True,
16-
queue=self.callback_queue)
17+
self.channel.basic_consume(
18+
queue=self.callback_queue,
19+
on_message_callback=self.on_response,
20+
auto_ack=True)
1721

1822
def on_response(self, ch, method, props, body):
1923
if self.corr_id == props.correlation_id:
@@ -22,17 +26,19 @@ def on_response(self, ch, method, props, body):
2226
def call(self, n):
2327
self.response = None
2428
self.corr_id = str(uuid.uuid4())
25-
self.channel.basic_publish(exchange='',
26-
routing_key='rpc_queue',
27-
properties=pika.BasicProperties(
28-
reply_to = self.callback_queue,
29-
correlation_id = self.corr_id,
30-
),
31-
body=str(n))
29+
self.channel.basic_publish(
30+
exchange='',
31+
routing_key='rpc_queue',
32+
properties=pika.BasicProperties(
33+
reply_to=self.callback_queue,
34+
correlation_id=self.corr_id,
35+
),
36+
body=str(n))
3237
while self.response is None:
3338
self.connection.process_data_events()
3439
return int(self.response)
3540

41+
3642
fibonacci_rpc = FibonacciRpcClient()
3743

3844
print(" [x] Requesting fib(30)")

python/rpc_server.py

+8-5
Original file line numberDiff line numberDiff line change
@@ -1,20 +1,22 @@
11
#!/usr/bin/env python
22
import pika
33

4-
connection = pika.BlockingConnection(pika.ConnectionParameters(
5-
host='localhost'))
4+
connection = pika.BlockingConnection(
5+
pika.ConnectionParameters(host='localhost'))
66

77
channel = connection.channel()
88

99
channel.queue_declare(queue='rpc_queue')
1010

11+
1112
def fib(n):
1213
if n == 0:
1314
return 0
1415
elif n == 1:
1516
return 1
1617
else:
17-
return fib(n-1) + fib(n-2)
18+
return fib(n - 1) + fib(n - 2)
19+
1820

1921
def on_request(ch, method, props, body):
2022
n = int(body)
@@ -27,10 +29,11 @@ def on_request(ch, method, props, body):
2729
properties=pika.BasicProperties(correlation_id = \
2830
props.correlation_id),
2931
body=str(response))
30-
ch.basic_ack(delivery_tag = method.delivery_tag)
32+
ch.basic_ack(delivery_tag=method.delivery_tag)
33+
3134

3235
channel.basic_qos(prefetch_count=1)
33-
channel.basic_consume(on_request, queue='rpc_queue')
36+
channel.basic_consume(queue='rpc_queue', on_message_callback=on_request)
3437

3538
print(" [x] Awaiting RPC requests")
3639
channel.start_consuming()

python/send.py

+3-6
Original file line numberDiff line numberDiff line change
@@ -1,15 +1,12 @@
11
#!/usr/bin/env python
22
import pika
33

4-
connection = pika.BlockingConnection(pika.ConnectionParameters(
5-
host='localhost'))
4+
connection = pika.BlockingConnection(
5+
pika.ConnectionParameters(host='localhost'))
66
channel = connection.channel()
77

8-
98
channel.queue_declare(queue='hello')
109

11-
channel.basic_publish(exchange='',
12-
routing_key='hello',
13-
body='Hello World!')
10+
channel.basic_publish(exchange='', routing_key='hello', body='Hello World!')
1411
print(" [x] Sent 'Hello World!'")
1512
connection.close()

0 commit comments

Comments
 (0)