1
1
import { decryptPersonalInfo } from '@proca/crypto'
2
- import amqplib from 'amqplib '
2
+ import Connection from 'rabbitmq-client '
3
3
import LineByLine from 'line-by-line'
4
4
export { ActionMessage , ActionMessageV2 , ProcessStage } from './actionMessage'
5
5
6
- import {
7
- ActionMessage ,
8
- actionMessageV1to2
9
- } from './actionMessage'
10
-
11
6
import { QueueOpts , SyncCallback } from './types'
12
7
13
8
@@ -20,19 +15,83 @@ const pause = (time = 1) => { //by default, wait 1 sec
20
15
}
21
16
22
17
export async function testQueue ( queueUrl : string , queueName : string ) {
23
- const conn = await connect ( queueUrl )
24
- const ch = await conn . createChannel ( )
25
- try {
26
- const status = await ch . checkQueue ( queueName )
27
- return status
28
- } finally {
29
- ch . close ( )
30
- conn . close ( )
31
- }
18
+ const rabbit = new Connection ( {
19
+ url : queueUrl ,
20
+ // wait 1 to 30 seconds between connection retries
21
+ retryLow : 1000 ,
22
+ retryHigh : 30000 ,
23
+ } )
24
+ const ch = await rabbit . acquire ( )
25
+ const status = ch . queueDeclare ( { queue : queueName , passive :true } ) ;
26
+ console . log ( status ) ;
27
+ await ch . close ( ) ;
28
+ await rabbit . close ( ) ;
29
+
30
+ process . exit ( 1 )
32
31
}
33
32
34
33
35
- export async function syncQueue (
34
+ const export async syncQueue = (
35
+ queueUrl : string ,
36
+ queueName : string ,
37
+ syncer : SyncCallback ,
38
+ opts ? : QueueOpts
39
+ ) => {
40
+ let errorCount = 0 ; //number of continuous errors
41
+
42
+ const rabbit = new Connection ( {
43
+ url : queueUrl ,
44
+ // wait 1 to 30 seconds between connection retries
45
+ retryLow : 1000 ,
46
+ retryHigh : 30000 ,
47
+ } )
48
+
49
+ rabbit . on ( 'error' , ( err ) => {
50
+ // connection refused, etc
51
+ console . error ( err )
52
+ } )
53
+
54
+ rabbit . on ( 'connection' , ( ) => {
55
+ console . log ( 'The connection is successfully (re)established' )
56
+ } )
57
+
58
+ const consumer = rabbit . createConsumer ( {
59
+ queue : queueName ,
60
+ queueOptions : { exclusive : true } , //one consumer only?
61
+ // handle 2 messages at a time,
62
+ concurrency : 1 ,
63
+ qos : { prefetchCount : 2 } ,
64
+ } , async ( msg ) => {
65
+ console . log ( msg )
66
+ let action : ActionMessage = JSON . parse ( msg . content . toString ( ) )
67
+
68
+ // upgrade old v1 message format to v2
69
+ if ( action . schema === "proca:action:1" ) {
70
+ action = actionMessageV1to2 ( action )
71
+ }
72
+
73
+ // optional decrypt
74
+ if ( action . personalInfo && opts ?. keyStore ) {
75
+ const plainPII = decryptPersonalInfo ( action . personalInfo , opts . keyStore )
76
+ action . contact = { ...action . contact , ...plainPII }
77
+ }
78
+
79
+ const processed = await syncer ( action , msg , ch ) ;
80
+ if ( ! processed ) {
81
+ throw new Error ( "aaa" ) ;
82
+ }
83
+ throw new Error ( "bb" ) ;
84
+
85
+ // msg is automatically acknowledged when this function resolves or msg is
86
+ // rejected (and maybe requeued, or sent to a dead-letter-exchange) if this
87
+ // function throws an error
88
+ } )
89
+
90
+
91
+ }
92
+
93
+ /*
94
+ export async function NOKsyncQueue(
36
95
queueUrl : string,
37
96
queueName : string,
38
97
syncer : SyncCallback,
@@ -68,7 +127,6 @@ export async function syncQueue(
68
127
}
69
128
console.error(`⏳ waiting for actions from ${qn}`)
70
129
71
- return new Promise ( async ( _ , _fail ) => {
72
130
if (typeof opts?.prefetch !== 'undefined') {
73
131
await ch.prefetch(opts.prefetch)
74
132
}
@@ -132,9 +190,8 @@ export async function syncQueue(
132
190
}
133
191
})
134
192
status.tag = ret.consumerTag
135
- } )
136
193
}
137
-
194
+ */
138
195
export async function syncFile ( filePath : string , syncer : SyncCallback , opts ? : QueueOpts ) {
139
196
const lines = new LineByLine ( filePath )
140
197
0 commit comments