Skip to content

Commit 4cfb362

Browse files
authored
Add amqp091-go example (#96)
* Add amqp091-go example * Add oauth producer and consumer * Refactor project * Update README * Update README * Update readme file * Update readme * Update readme * Update readme file, add vhost * Add plain example and fix example
1 parent 30a1820 commit 4cfb362

File tree

12 files changed

+607
-0
lines changed

12 files changed

+607
-0
lines changed

aop/amqp091-go/README.md

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,36 @@
1+
# Overview
2+
3+
The `amqp091-go` is a AMQP client written by RabbitMQ.
4+
5+
# Prerequisites
6+
7+
- Go 1.18 or higher version
8+
9+
# Running steps
10+
11+
**Please first create the namespace, which will be configured to vhost**
12+
13+
1. Clone example project
14+
15+
```
16+
git clone https://github.com/streamnative/examples.git
17+
```
18+
19+
2. Download dependencies
20+
21+
```
22+
cd examples/aop/amqp091-go
23+
go mod tidy
24+
```
25+
26+
3. Configure endpoint, audience, keyfile and vhost on producer/oauth-producer.go and consumer/oauth-consumer.go
27+
28+
29+
4. Build and run example
30+
31+
```
32+
go build producer/oauth-producer.go
33+
go build consumer/oauth-consumer.go
34+
./oauth-consumer
35+
./oauth-producer
36+
```
Lines changed: 89 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,89 @@
1+
package main
2+
3+
import (
4+
"fmt"
5+
"log"
6+
7+
"github.com/apache/pulsar-client-go/oauth2"
8+
amqp "github.com/rabbitmq/amqp091-go"
9+
amqpauth "github.com/streamnative/aop-amqp091-auth-go"
10+
)
11+
12+
func main() {
13+
endpoint := "amqp://your-host-cluster:5671"
14+
15+
oauth2Authentication, err := amqpauth.NewOAuth2Authentication(
16+
amqpauth.ClientCredentialsFlowOptions{
17+
ClientCredentialsFlowOptions: oauth2.ClientCredentialsFlowOptions{
18+
// your key file from here
19+
// https://docs.streamnative.io/cloud/stable/managed-access/service-account#work-with-a-service-account-through-streamnative-cloud-manager
20+
KeyFile: "/your-key-file-path.json",
21+
},
22+
// your audience from streamnative cloud console ui
23+
Audience: "your-audience",
24+
})
25+
saslConfigs := []amqp.Authentication{oauth2Authentication}
26+
conn, err := amqp.DialConfig(endpoint, amqp.Config{
27+
SASL: saslConfigs,
28+
Vhost: "vhost2",
29+
})
30+
if err != nil {
31+
log.Fatalf("failed to open connection: %v", err)
32+
}
33+
34+
channel, err := conn.Channel()
35+
if err != nil {
36+
log.Fatalf("failed to open channel: %v", err)
37+
}
38+
39+
exchange := "exchange-1"
40+
if err = channel.ExchangeDeclare(
41+
exchange,
42+
"fanout",
43+
true,
44+
false,
45+
false,
46+
false,
47+
nil,
48+
); err != nil {
49+
log.Fatalf("ExchangeDeclare: %v", err)
50+
}
51+
52+
queueName := "topic-1"
53+
queue, err := channel.QueueDeclare(
54+
queueName,
55+
true,
56+
false,
57+
false,
58+
false,
59+
nil,
60+
)
61+
if err != nil {
62+
log.Fatalf("QueueDeclare: %v", err)
63+
}
64+
65+
if err = channel.QueueBind(
66+
queue.Name,
67+
"",
68+
exchange,
69+
false,
70+
nil,
71+
); err != nil {
72+
log.Fatalf("QueueBind: %v", err)
73+
}
74+
75+
deliveries, err := channel.Consume(
76+
queue.Name,
77+
"",
78+
false,
79+
false,
80+
false,
81+
false,
82+
nil,
83+
)
84+
85+
for delivery := range deliveries {
86+
fmt.Printf("received a message: %s", delivery.Body)
87+
_ = delivery.Ack(false)
88+
}
89+
}
Lines changed: 73 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,73 @@
1+
package main
2+
3+
import (
4+
"fmt"
5+
"log"
6+
7+
amqp "github.com/rabbitmq/amqp091-go"
8+
)
9+
10+
func main() {
11+
endpoint := "amqp://user:password@your-host-cluster:5671/vhost2"
12+
13+
conn, err := amqp.Dial(endpoint)
14+
if err != nil {
15+
log.Fatalf("failed to open connection: %v", err)
16+
}
17+
18+
channel, err := conn.Channel()
19+
if err != nil {
20+
log.Fatalf("failed to open channel: %v", err)
21+
}
22+
23+
exchange := "exchange-1"
24+
if err = channel.ExchangeDeclare(
25+
exchange,
26+
"fanout",
27+
true,
28+
false,
29+
false,
30+
false,
31+
nil,
32+
); err != nil {
33+
log.Fatalf("ExchangeDeclare: %v", err)
34+
}
35+
36+
queueName := "topic-1"
37+
queue, err := channel.QueueDeclare(
38+
queueName,
39+
true,
40+
false,
41+
false,
42+
false,
43+
nil,
44+
)
45+
if err != nil {
46+
log.Fatalf("QueueDeclare: %v", err)
47+
}
48+
49+
if err = channel.QueueBind(
50+
queue.Name,
51+
"",
52+
exchange,
53+
false,
54+
nil,
55+
); err != nil {
56+
log.Fatalf("QueueBind: %v", err)
57+
}
58+
59+
deliveries, err := channel.Consume(
60+
queue.Name,
61+
"",
62+
false,
63+
false,
64+
false,
65+
false,
66+
nil,
67+
)
68+
69+
for delivery := range deliveries {
70+
fmt.Printf("received a message: %s", delivery.Body)
71+
_ = delivery.Ack(false)
72+
}
73+
}
Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,34 @@
1+
package main
2+
3+
import (
4+
"log"
5+
6+
"github.com/apache/pulsar-client-go/oauth2"
7+
amqp "github.com/rabbitmq/amqp091-go"
8+
amqpauth "github.com/streamnative/aop-amqp091-auth-go"
9+
)
10+
11+
func main() {
12+
endpoint := "amqp://localhost:15002"
13+
14+
oauth2Authentication, err := amqpauth.NewOAuth2Authentication(amqpauth.ClientCredentialsFlowOptions{
15+
ClientCredentialsFlowOptions: oauth2.ClientCredentialsFlowOptions{
16+
KeyFile: "./oauth2/client-credentials.json",
17+
},
18+
Audience: "your-audience",
19+
})
20+
if err != nil {
21+
log.Fatalf("NewOAuth2Authentication: %v", err)
22+
}
23+
24+
saslConfigs := []amqp.Authentication{oauth2Authentication}
25+
connection, err := amqp.DialConfig(endpoint,
26+
amqp.Config{
27+
SASL: saslConfigs,
28+
Vhost: "vhost1",
29+
})
30+
if err != nil {
31+
log.Fatalf("failed to open connection: %v", err)
32+
}
33+
defer connection.Close()
34+
}
Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,15 @@
1+
package main
2+
3+
import (
4+
amqp "github.com/rabbitmq/amqp091-go"
5+
)
6+
7+
func main() {
8+
endpoint := "amqp://user:password@your-host-cluster:5671/vhost2"
9+
10+
connection, err := amqp.Dial(endpoint)
11+
if err != nil {
12+
panic(err)
13+
}
14+
defer connection.Close()
15+
}
Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,29 @@
1+
package main
2+
3+
import (
4+
"log"
5+
6+
amqp "github.com/rabbitmq/amqp091-go"
7+
amqpauth "github.com/streamnative/aop-amqp091-auth-go"
8+
)
9+
10+
func main() {
11+
endpoint := "amqp://localhost:15002"
12+
13+
token := "your-token"
14+
tokenAuthentication, err := amqpauth.NewTokenAuthentication(token)
15+
if err != nil {
16+
log.Fatalf("NewTokenAuthentication: %v", err)
17+
}
18+
19+
saslConfigs := []amqp.Authentication{tokenAuthentication}
20+
connection, err := amqp.DialConfig(endpoint,
21+
amqp.Config{
22+
SASL: saslConfigs,
23+
Vhost: "vhost1",
24+
})
25+
if err != nil {
26+
panic(err)
27+
}
28+
defer connection.Close()
29+
}
Lines changed: 75 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,75 @@
1+
package main
2+
3+
import (
4+
"fmt"
5+
"log"
6+
7+
amqp "github.com/rabbitmq/amqp091-go"
8+
)
9+
10+
func main() {
11+
endpoint := "amqp://localhost:15002"
12+
conn, err := amqp.DialConfig(endpoint, amqp.Config{
13+
SASL: nil,
14+
Vhost: "vhost1",
15+
})
16+
if err != nil {
17+
log.Fatalf("failed to open connection: %v", err)
18+
}
19+
20+
channel, err := conn.Channel()
21+
if err != nil {
22+
log.Fatalf("failed to open channel: %v", err)
23+
}
24+
25+
exchange := "exchange-1"
26+
if err = channel.ExchangeDeclare(
27+
exchange,
28+
"direct",
29+
true,
30+
false,
31+
false,
32+
false,
33+
nil,
34+
); err != nil {
35+
log.Fatalf("ExchangeDeclare: %v", err)
36+
}
37+
38+
queueName := "topic-1"
39+
queue, err := channel.QueueDeclare(
40+
queueName,
41+
true,
42+
false,
43+
false,
44+
false,
45+
nil,
46+
)
47+
if err != nil {
48+
log.Fatalf("QueueDeclare: %v", err)
49+
}
50+
51+
if err = channel.QueueBind(
52+
queue.Name,
53+
"",
54+
exchange,
55+
false,
56+
nil,
57+
); err != nil {
58+
log.Fatalf("QueueBind: %v", err)
59+
}
60+
61+
deliveries, err := channel.Consume(
62+
queue.Name,
63+
"",
64+
false,
65+
false,
66+
false,
67+
false,
68+
nil,
69+
)
70+
71+
for delivery := range deliveries {
72+
fmt.Printf("received a message: %s", delivery.Body)
73+
_ = delivery.Ack(false)
74+
}
75+
}

aop/amqp091-go/go.mod

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,19 @@
1+
module github.com/streamnative/examples/aop/amqp091-go
2+
3+
go 1.18
4+
5+
require (
6+
github.com/apache/pulsar-client-go/oauth2 v0.0.0-20220707015503-ea6eccf7ddad
7+
github.com/rabbitmq/amqp091-go v1.3.4
8+
github.com/streamnative/aop-amqp091-auth-go v0.0.0-20220706135027-3352980e769b
9+
)
10+
11+
require (
12+
github.com/golang-jwt/jwt v3.2.2+incompatible // indirect
13+
github.com/golang/protobuf v1.5.2 // indirect
14+
github.com/pkg/errors v0.9.1 // indirect
15+
golang.org/x/net v0.0.0-20220706163947-c90051bbdb60 // indirect
16+
golang.org/x/oauth2 v0.0.0-20220630143837-2104d58473e0 // indirect
17+
google.golang.org/appengine v1.6.7 // indirect
18+
google.golang.org/protobuf v1.28.0 // indirect
19+
)

0 commit comments

Comments
 (0)