Skip to content

Commit b79f117

Browse files
committed
Add julia RabbitMQ examples
1 parent 22829e9 commit b79f117

13 files changed

+594
-0
lines changed

julia/README.md

+63
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,63 @@
1+
# Julia code for RabbitMQ tutorials
2+
3+
Here you can find Julia code examples from [RabbitMQ
4+
tutorials](https://www.rabbitmq.com/getstarted.html).
5+
6+
To successfully use the examples you will need a running RabbitMQ server.
7+
8+
## Requirements
9+
10+
To run this code you need to install the `AMQPClient` and `JSON`. To install it, run
11+
12+
``` julia
13+
julia> using Pkg
14+
15+
julia> Pkg.add("AMQPClient")
16+
Updating registry at `~/.julia/registries/General`
17+
Resolving package versions...
18+
No Changes to `~/.julia/environments/v1.6/Project.toml`
19+
No Changes to `~/.julia/environments/v1.6/Manifest.toml`
20+
21+
julia> Pkg.add("JSON")
22+
Resolving package versions...
23+
No Changes to `~/.julia/environments/v1.6/Project.toml`
24+
No Changes to `~/.julia/environments/v1.6/Manifest.toml`
25+
26+
```
27+
28+
## Code
29+
30+
Tutorial one: "Hello World!"
31+
32+
julia send.jl
33+
julia receive.jl
34+
35+
36+
Tutorial two: Work Queues:
37+
38+
julia new_task.jl "A very hard task which takes two seconds.."
39+
julia worker.jl
40+
41+
42+
Tutorial three: Publish/Subscribe
43+
44+
julia receive_logs.jl
45+
julia emit_log.jl "info: This is the log message"
46+
47+
48+
Tutorial four: Routing
49+
50+
julia receive_logs_direct.jl info
51+
julia emit_log_direct.jl info "The message"
52+
53+
54+
Tutorial five: Topics
55+
56+
julia receive_logs_topic.jl "*.rabbit"
57+
julia emit_log_topic.jl red.rabbit Hello
58+
59+
60+
Tutorial six: RPC
61+
62+
julia rpc_server.jl
63+
julia rpc_client.jl

julia/emit_log.jl

+29
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,29 @@
1+
using AMQPClient
2+
const VIRTUALHOST = "/"
3+
const HOST = "127.0.0.1"
4+
5+
function send()
6+
# 1. Create a connection to the localhost or 127.0.0.1 of virtualhost '/'
7+
connection(; virtualhost=VIRTUALHOST, host=HOST) do conn
8+
# 2. Create a channel to send messages
9+
channel(conn, AMQPClient.UNUSED_CHANNEL, true) do chan
10+
exchange = "logs"
11+
# 3. Declare the exchange
12+
exchange_declare(chan, exchange, EXCHANGE_TYPE_FANOUT)
13+
if length(Base.ARGS) >= 1
14+
received = join(Base.ARGS, ' ')
15+
else
16+
received = "info: Hello World"
17+
end
18+
19+
data = convert(Vector{UInt8}, codeunits(received))
20+
msg = Message(data, content_type="text/plain", delivery_mode=PERSISTENT)
21+
22+
# 4. Publish message
23+
basic_publish(chan, msg; exchange=exchange, routing_key="")
24+
println("Message sent: $received")
25+
end
26+
end
27+
end
28+
29+
send()

julia/emit_log_direct.jl

+31
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,31 @@
1+
using AMQPClient
2+
const VIRTUALHOST = "/"
3+
const HOST = "127.0.0.1"
4+
5+
6+
function send()
7+
# 1. Create a connection to the localhost or 127.0.0.1 of virtualhost '/'
8+
connection(; virtualhost=VIRTUALHOST, host=HOST) do conn
9+
# 2. Create a channel to send messages
10+
channel(conn, AMQPClient.UNUSED_CHANNEL, true) do chan
11+
# 3. Declare exchange
12+
exchange = "direct_logs"
13+
exchange_declare(chan, exchange, EXCHANGE_TYPE_DIRECT)
14+
# 4. Get severity and message
15+
if length(Base.ARGS) >= 3
16+
severity = Base.ARGS[1]
17+
received = join(Base.ARGS[2:end], ' ')
18+
else
19+
severity = "info"
20+
received = "Hello World"
21+
end
22+
data = convert(Vector{UInt8}, codeunits(received))
23+
msg = Message(data, content_type="text/plain", delivery_mode=PERSISTENT)
24+
# 5. Publish the message
25+
basic_publish(chan, msg; exchange=exchange, routing_key=severity)
26+
println("Message sent: $received, Severity: $severity")
27+
end
28+
end
29+
end
30+
31+
send()

julia/emit_log_topic.jl

+31
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,31 @@
1+
using AMQPClient
2+
const VIRTUALHOST = "/"
3+
const HOST = "127.0.0.1"
4+
5+
function send()
6+
# 1. Create a connection to the localhost or 127.0.0.1 of virtualhost '/'
7+
connection(; virtualhost=VIRTUALHOST, host=HOST) do conn
8+
# 2. Create a channel to send messages
9+
channel(conn, AMQPClient.UNUSED_CHANNEL, true) do chan
10+
# 3. Declare Exchange
11+
exchange = "topic_logs"
12+
exchange_topic = "topic"
13+
exchange_declare(chan, exchange, EXCHANGE_TYPE_TOPIC)
14+
# 4. Get input data
15+
if length(Base.ARGS) >= 2
16+
routing_key = Base.ARGS[1]
17+
received = join(Base.ARGS[2:end], ' ')
18+
else
19+
routing_key = "info"
20+
received = "Hello World"
21+
end
22+
# 5. Prepare and send data
23+
data = convert(Vector{UInt8}, codeunits(received))
24+
msg = Message(data, content_type="text/plain", delivery_mode=PERSISTENT)
25+
basic_publish(chan, msg; exchange=exchange, routing_key=routing_key)
26+
println("Message sent: $received, routing key: $routing_key")
27+
end
28+
end
29+
end
30+
31+
send()

julia/new_task.jl

+32
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,32 @@
1+
using AMQPClient
2+
const VIRTUALHOST = "/"
3+
const HOST = "127.0.0.1"
4+
5+
function send()
6+
# 1. Create a connection to the localhost or 127.0.0.1 of virtualhost '/'
7+
connection(; virtualhost=VIRTUALHOST, host=HOST) do conn
8+
# 2. Create a channel to send messages
9+
channel(conn, AMQPClient.UNUSED_CHANNEL, true) do chan
10+
queue = "task_queue"
11+
# 3. Configure the queue
12+
success, queue_name, message_count, consumer_count = queue_declare(chan, queue, durable=true)
13+
14+
# 4. Prepare the message text
15+
if length(Base.ARGS) >= 1
16+
received = Base.ARGS[1]
17+
else
18+
received = "Hello World"
19+
end
20+
21+
# 5. Prepare the payload
22+
data = convert(Vector{UInt8}, codeunits(received))
23+
msg = Message(data, content_type="text/plain", delivery_mode=PERSISTENT)
24+
25+
# 6. Send the payload
26+
basic_publish(chan, msg; exchange="", routing_key=queue)
27+
println("Message sent: $received")
28+
end
29+
end
30+
end
31+
32+
send()

julia/receive.jl

+48
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,48 @@
1+
using AMQPClient
2+
const VIRTUALHOST = "/"
3+
const HOST = "127.0.0.1"
4+
5+
6+
function receive()
7+
# 1. Create a connection to the localhost or 127.0.0.1 of virtualhost '/'
8+
connection(; virtualhost=VIRTUALHOST, host=HOST) do conn
9+
println("Connection established")
10+
# 2. Create a channel to send messages
11+
channel(conn, AMQPClient.UNUSED_CHANNEL, true) do chan
12+
# 3. Declare a queue
13+
println("Channel created")
14+
queue = "hello"
15+
success, queue_name, message_count, consumer_count = queue_declare(chan, queue)
16+
17+
# 4. Setup to receive
18+
on_receive = (msg) -> begin
19+
# 4.1 Receive message is Vector{UInt8}
20+
data = String(msg.data)
21+
println("Received the message: $data")
22+
# 4.2 Acknowledge the message
23+
basic_ack(chan, msg.delivery_tag)
24+
end
25+
26+
success, consumer_tag = basic_consume(chan, queue, on_receive)
27+
@assert success == true
28+
29+
# 5. Run for-ever
30+
# listen to new messages
31+
while true
32+
sleep(1)
33+
end
34+
end
35+
end
36+
end
37+
38+
# Don't exit on Ctrl-C
39+
Base.exit_on_sigint(false)
40+
try
41+
receive()
42+
catch ex
43+
if ex isa InterruptException
44+
println("Interrupted")
45+
else
46+
println("Exception: $ex")
47+
end
48+
end

julia/receive_logs.jl

+44
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,44 @@
1+
using AMQPClient
2+
const VIRTUALHOST = "/"
3+
const HOST = "127.0.0.1"
4+
5+
6+
function receive()
7+
# 1. Create a connection to the localhost or 127.0.0.1 of virtualhost '/'
8+
connection(; virtualhost=VIRTUALHOST, host=HOST) do conn
9+
# 2. Create a channel to send messages
10+
channel(conn, AMQPClient.UNUSED_CHANNEL, true) do chan
11+
# 3. Declare a exchange
12+
exchange = "logs"
13+
exchange_declare(chan, exchange, EXCHANGE_TYPE_FANOUT)
14+
result, queue, _, _ = queue_declare(chan, "", durable=true)
15+
# 4. Bind the queue
16+
queue_bind(chan, queue, exchange, EXCHANGE_TYPE_FANOUT)
17+
18+
println(" [*] Waiting for messages. To exit press CTRL+C")
19+
on_receive = (msg) -> begin
20+
data = String(msg.data)
21+
println("Received the message: $data")
22+
basic_ack(chan, msg.delivery_tag)
23+
end
24+
25+
success, consumer_tag = basic_consume(chan, queue, on_receive)
26+
27+
while true
28+
sleep(1)
29+
end
30+
end
31+
end
32+
end
33+
34+
# Don't exit on Ctrl-C
35+
Base.exit_on_sigint(false)
36+
try
37+
receive()
38+
catch ex
39+
if ex isa InterruptException
40+
println("Interrupted")
41+
else
42+
println("Exception: $ex")
43+
end
44+
end

julia/receive_logs_direct.jl

+55
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,55 @@
1+
using AMQPClient
2+
const VIRTUALHOST = "/"
3+
const HOST = "127.0.0.1"
4+
5+
6+
function receive()
7+
# 1. Create a connection to the localhost or 127.0.0.1 of virtualhost '/'
8+
connection(; virtualhost=VIRTUALHOST, host=HOST) do conn
9+
# 2. Create a channel to send messages
10+
channel(conn, AMQPClient.UNUSED_CHANNEL, true) do chan
11+
# 3. Declare a exchange
12+
exchange = "direct_logs"
13+
exchange_declare(chan, exchange, EXCHANGE_TYPE_DIRECT)
14+
15+
result, queue_name, _, _ = queue_declare(chan, "", exclusive=true)
16+
17+
# 4. Receive queues to bind
18+
if length(Base.ARGS) <= 0
19+
println(Base.stdout, "Usage: [info] [warning] [error]\n")
20+
Base.exit(1)
21+
end
22+
23+
# 4.1 Bind queues
24+
for severity in Base.ARGS[1:end]
25+
queue_bind(chan, queue_name, exchange,
26+
severity)
27+
end
28+
29+
println(" [*] Waiting for messages. To exit press CTRL+C")
30+
on_receive = (msg) -> begin
31+
data = String(msg.data)
32+
println("Received the message: $data")
33+
basic_ack(chan, msg.delivery_tag)
34+
end
35+
36+
success, consumer_tag = basic_consume(chan, queue_name, on_receive)
37+
38+
while true
39+
sleep(1)
40+
end
41+
end
42+
end
43+
end
44+
45+
# Don't exit on Ctrl-C
46+
Base.exit_on_sigint(false)
47+
try
48+
receive()
49+
catch ex
50+
if ex isa InterruptException
51+
println("Interrupted")
52+
else
53+
println("Exception: $ex")
54+
end
55+
end

julia/receive_logs_topic.jl

+56
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,56 @@
1+
using AMQPClient
2+
const VIRTUALHOST = "/"
3+
const HOST = "127.0.0.1"
4+
5+
6+
function receive()
7+
# 1. Create a connection to the localhost or 127.0.0.1 of virtualhost '/'
8+
connection(; virtualhost=VIRTUALHOST, host=HOST) do conn
9+
# 2. Create a channel to send messages
10+
channel(conn, AMQPClient.UNUSED_CHANNEL, true) do chan
11+
# 3. Declare a exchange and queue
12+
exchange = "topic_logs"
13+
exchange_declare(chan, exchange, EXCHANGE_TYPE_TOPIC)
14+
result, queue_name, _, _ = queue_declare(chan, "", exclusive=true)
15+
16+
if length(Base.ARGS) <= 0
17+
println(Base.stdout, "Usage: [binding_key] \n")
18+
Base.exit(1)
19+
end
20+
21+
# 3.1 Bind all queues
22+
for binding_key in Base.ARGS[1:end]
23+
queue_bind(chan, queue_name, exchange,
24+
binding_key)
25+
end
26+
27+
println(" [*] Waiting for messages. To exit press CTRL+C")
28+
29+
# 4. Receive messages
30+
on_receive = (msg) -> begin
31+
data = String(msg.data)
32+
println("Received the message: $data")
33+
basic_ack(chan, msg.delivery_tag)
34+
end
35+
36+
success, consumer_tag = basic_consume(chan, queue_name, on_receive)
37+
38+
while true
39+
sleep(1)
40+
end
41+
end
42+
end
43+
end
44+
45+
46+
# Don't exit on Ctrl-C
47+
Base.exit_on_sigint(false)
48+
try
49+
receive()
50+
catch ex
51+
if ex isa InterruptException
52+
println("Interrupted")
53+
else
54+
println("Exception: $ex")
55+
end
56+
end

0 commit comments

Comments
 (0)