Skip to content

Commit 381ddf8

Browse files
Merge branch 'master' of https://github.com/Matrixbirds/rabbitmq-tutorials into Matrixbirds-master
2 parents 589e5e9 + 48e2c0a commit 381ddf8

16 files changed

+491
-0
lines changed

.gitignore

+1
Original file line numberDiff line numberDiff line change
@@ -31,3 +31,4 @@ target/
3131
.vs/
3232

3333
*.log
34+
.packages

dart/README.md

+53
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,53 @@
1+
# Dart code for RabbitMQ tutorials
2+
3+
Here you can find an [Dart](https://www.dartlang.org/) port of
4+
[RabbitMQ tutorials](http://www.rabbitmq.com/getstarted.html).
5+
6+
7+
## Requirements
8+
9+
To run this code you need a [Dart 2 server platform installed](https://www.dartlang.org/tools/sdk#install)
10+
11+
### Dart 2.0+
12+
13+
These tutorials use [dart_amqp](https://github.com/achilleasa/dart_amqp).
14+
15+
To install dependencies with pub, run:
16+
17+
pub get
18+
19+
## Code
20+
21+
To run the examples, use `dart source_file.dart`.
22+
23+
Tutorial one: "Hello World!":
24+
25+
dart receive.dart
26+
dart send.dart
27+
28+
Tutorial two: Work Queues
29+
30+
dart worker.dart
31+
dart new_task.dart
32+
33+
Tutorial three: Publish/Subscribe
34+
35+
dart receive_logs.dart
36+
dart emit_log.dart
37+
38+
Tutorial four: Routing
39+
40+
dart receive_logs_direct.dart info warning
41+
dart emit_log_direct.dart info "A message"
42+
dart emit_log_direct.dart warning "A message"
43+
44+
Tutorial five: Topics
45+
46+
dart receive_logs_topic.dart "info.*" "warn.*"
47+
dart emit_log_topic.dart "info.connections" "Connected"
48+
dart emit_log_topic.dart "info.connections" "Connected"
49+
50+
Tutorial six: RPC (Request/Response)
51+
52+
dart rpc_server.dart
53+
dart rpc_client.dart

dart/emit_log.dart

+21
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
import "package:dart_amqp/dart_amqp.dart";
2+
3+
void main (List<String> arguments) {
4+
ConnectionSettings settings = new ConnectionSettings(
5+
host: "localhost"
6+
);
7+
8+
Client client = new Client(settings: settings);
9+
10+
String msg = arguments.isEmpty ? "Hello World!" : arguments[0];
11+
12+
client
13+
.channel()
14+
.then((Channel channel) =>
15+
channel.exchange("logs", ExchangeType.FANOUT, durable: false))
16+
.then((Exchange exchange) {
17+
exchange.publish(msg, null);
18+
print(" [x] Sent ${msg}");
19+
return client.close();
20+
});
21+
}

dart/emit_log_direct.dart

+23
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,23 @@
1+
import "package:dart_amqp/dart_amqp.dart";
2+
3+
void main(List<String> arguments) {
4+
ConnectionSettings settings = new ConnectionSettings(
5+
host: "localhost"
6+
);
7+
8+
Client client = new Client(settings: settings);
9+
10+
String routingKey = arguments.length < 1 ? "info" : arguments[0];
11+
String msg = arguments.length < 2 ? "Hello World!" : arguments[1];
12+
13+
client
14+
.channel()
15+
.then((Channel channel) =>
16+
channel.exchange("direct_logs", ExchangeType.DIRECT,
17+
durable: false))
18+
.then((Exchange exchange) {
19+
exchange.publish(msg, routingKey);
20+
print(" [x] Sent ${routingKey}: ${msg}");
21+
return client.close();
22+
});
23+
}

dart/emit_log_topic.dart

+23
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,23 @@
1+
import "package:dart_amqp/dart_amqp.dart";
2+
3+
void main(List<String> arguments) {
4+
ConnectionSettings settings = new ConnectionSettings(
5+
host: "localhost"
6+
);
7+
8+
Client client = new Client(settings: settings);
9+
10+
String routingKey = arguments.length < 1 ? "anonymous.info" : arguments[0];
11+
String msg = arguments.length < 2 ? "Hello World!" : arguments[1];
12+
13+
client
14+
.channel()
15+
.then((Channel channel) =>
16+
channel.exchange("topic_logs", ExchangeType.TOPIC,
17+
durable: false))
18+
.then((Exchange exchange) {
19+
exchange.publish(msg, routingKey);
20+
print(" [x] Sent ${routingKey}: ${msg}");
21+
return client.close();
22+
});
23+
}

dart/new_task.dart

+24
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,24 @@
1+
import "package:dart_amqp/dart_amqp.dart";
2+
3+
void main(List<String> arguments) {
4+
ConnectionSettings settings = new ConnectionSettings(
5+
host: "localhost"
6+
);
7+
8+
Client client = new Client(settings: settings);
9+
10+
String consumeTag = "task_queue";
11+
String msg = arguments.isEmpty ? "Hello World!" : arguments[0];
12+
13+
MessageProperties properties = new MessageProperties.persistentMessage();
14+
15+
client
16+
.channel()
17+
.then((Channel channel) =>
18+
channel.queue(consumeTag, durable: true))
19+
.then((Queue queue) {
20+
queue.publish(msg, properties: properties);
21+
print(" [x] Sent ${msg}");
22+
return client.close();
23+
});
24+
}

dart/pubspec.lock

+19
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,19 @@
1+
# Generated by pub
2+
# See https://www.dartlang.org/tools/pub/glossary#lockfile
3+
packages:
4+
dart_amqp:
5+
dependency: "direct main"
6+
description:
7+
name: dart_amqp
8+
url: "https://pub.dartlang.org"
9+
source: hosted
10+
version: "0.1.1"
11+
logging:
12+
dependency: transitive
13+
description:
14+
name: logging
15+
url: "https://pub.dartlang.org"
16+
source: hosted
17+
version: "0.11.3+2"
18+
sdks:
19+
dart: ">=2.0.0-dev <3.0.0"

dart/pubspec.yaml

+3
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
name: dart_rabbitmq_example
2+
dependencies:
3+
dart_amqp: 0.1.1

dart/receive.dart

+34
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,34 @@
1+
import "dart:io";
2+
import "package:dart_amqp/dart_amqp.dart";
3+
4+
void main (List<String> arguments) {
5+
ConnectionSettings settings = new ConnectionSettings(
6+
host: "localhost"
7+
);
8+
9+
Client client = new Client(settings: settings);
10+
11+
ProcessSignal.sigint.watch().listen((_) {
12+
client.close().then((_) {
13+
print("close client");
14+
exit(0);
15+
});
16+
});
17+
18+
String msg = arguments.isEmpty ? "Hello World!": arguments[0];
19+
20+
String queueTag = "hello";
21+
22+
client
23+
.channel()
24+
.then((Channel channel) => channel.queue(queueTag, durable: false))
25+
.then((Queue queue) {
26+
print(" [*] Waiting for messages in ${queueTag}. To Exit press CTRL+C");
27+
return queue.consume(consumerTag: queueTag, noAck: true);
28+
})
29+
.then((Consumer consumer) {
30+
consumer.listen((AmqpMessage event) {
31+
print(" [x] Received ${event.payloadAsString}");
32+
});
33+
});
34+
}

dart/receive_logs.dart

+34
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,34 @@
1+
import "dart:io";
2+
import "package:dart_amqp/dart_amqp.dart";
3+
4+
void main (List<String> arguments) {
5+
ConnectionSettings settings = new ConnectionSettings(
6+
host: "localhost"
7+
);
8+
9+
Client client = new Client(settings: settings);
10+
11+
ProcessSignal.sigint.watch().listen((_) {
12+
client.close().then((_) {
13+
print("close client");
14+
exit(0);
15+
});
16+
});
17+
18+
String msg = arguments.isEmpty ? "Hello World!": arguments[0];
19+
20+
client
21+
.channel()
22+
.then((Channel channel) {
23+
return channel.exchange("logs", ExchangeType.FANOUT, durable: false);
24+
})
25+
.then((Exchange exchange) {
26+
print(" [*] Waiting for messages in logs. To Exit press CTRL+C");
27+
return exchange.bindPrivateQueueConsumer(null);
28+
})
29+
.then((Consumer consumer) {
30+
consumer.listen((AmqpMessage event) {
31+
print(" [x] Received ${event.payloadAsString}");
32+
});
33+
});
34+
}

dart/receive_logs_direct.dart

+40
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,40 @@
1+
import "dart:io";
2+
import "package:dart_amqp/dart_amqp.dart";
3+
4+
void main (List<String> arguments) {
5+
if (arguments.isEmpty) {
6+
print("Usage: receive_logs_direct.dart [info] [warning] [error]");
7+
return;
8+
}
9+
10+
ConnectionSettings settings = new ConnectionSettings(
11+
host: "localhost"
12+
);
13+
14+
Client client = new Client(settings: settings);
15+
16+
ProcessSignal.sigint.watch().listen((_) {
17+
client.close().then((_) {
18+
print("close client");
19+
exit(0);
20+
});
21+
});
22+
23+
List<String> routingKeys = arguments.sublist(0, 2);
24+
client
25+
.channel()
26+
.then((Channel channel) {
27+
return channel.exchange("direct_logs", ExchangeType.DIRECT, durable: false);
28+
})
29+
.then((Exchange exchange) {
30+
print(" [*] Waiting for messages in logs. To Exit press CTRL+C");
31+
return exchange.bindPrivateQueueConsumer(routingKeys,
32+
consumerTag: "direct_logs", noAck: true
33+
);
34+
})
35+
.then((Consumer consumer) {
36+
consumer.listen((AmqpMessage event) {
37+
print(" [x] ${event.routingKey}:'${event.payloadAsString}'");
38+
});
39+
});
40+
}

dart/receive_logs_topic.dart

+40
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,40 @@
1+
import "dart:io";
2+
import "package:dart_amqp/dart_amqp.dart";
3+
4+
void main (List<String> arguments) {
5+
if (arguments.isEmpty) {
6+
print("Usage: receive_logs_direct.dart <facility>.<routingKey>");
7+
return;
8+
}
9+
10+
ConnectionSettings settings = new ConnectionSettings(
11+
host: "localhost"
12+
);
13+
14+
Client client = new Client(settings: settings);
15+
16+
ProcessSignal.sigint.watch().listen((_) {
17+
client.close().then((_) {
18+
print("close client");
19+
exit(0);
20+
});
21+
});
22+
23+
List<String> routingKeys = arguments.sublist(0, 2);
24+
client
25+
.channel()
26+
.then((Channel channel) {
27+
return channel.exchange("topic_logs", ExchangeType.TOPIC, durable: false);
28+
})
29+
.then((Exchange exchange) {
30+
print(" [*] Waiting for messages in logs. To Exit press CTRL+C");
31+
return exchange.bindPrivateQueueConsumer(routingKeys,
32+
consumerTag: "topic_logs", noAck: true
33+
);
34+
})
35+
.then((Consumer consumer) {
36+
consumer.listen((AmqpMessage event) {
37+
print(" [x] ${event.routingKey}:'${event.payloadAsString}'");
38+
});
39+
});
40+
}

0 commit comments

Comments
 (0)