1
1
package com .spaceuptech .dbevents .database
2
2
3
- import java .nio .ByteBuffer
4
- import java .util .{Calendar , Properties }
5
- import java .util .concurrent .{Callable , ExecutorService }
6
- import java .util .function .Consumer
7
-
8
3
import akka .actor .typed .ActorRef
9
4
import com .mongodb .client .model .changestream .{ChangeStreamDocument , FullDocument , OperationType }
10
5
import com .mongodb .{MongoClient , MongoClientURI }
11
6
import com .spaceuptech .dbevents .database .Database .ChangeRecord
12
7
import com .spaceuptech .dbevents .{DatabaseSource , Global }
13
8
import io .debezium .engine .format .Json
14
9
import io .debezium .engine .{ChangeEvent , DebeziumEngine }
15
- import org .bson .{BsonDocument , Document }
10
+ import org .bson .{BsonDocument , BsonType , BsonValue , Document }
16
11
import org .json4s ._
17
12
import org .json4s .jackson .JsonMethods ._
18
13
14
+ import java .nio .ByteBuffer
15
+ import java .time .{Instant , OffsetDateTime , ZoneId }
16
+ import java .util .concurrent .{Callable , ExecutorService }
17
+ import java .util .function .Consumer
18
+ import java .util .{Calendar , Properties }
19
+
19
20
object Utils {
20
21
def startMongoWatcher (projectId : String , dbAlias : String , conn : String , dbName : String , executorService : ExecutorService , actor : ActorRef [Database .Command ]): MongoStatus = {
21
22
// Make a mongo client
@@ -58,7 +59,7 @@ object Utils {
58
59
payload = ChangeRecordPayload (
59
60
op = " c" ,
60
61
before = None ,
61
- after = Some (mongoDocumentToMap (doc.getFullDocument) ),
62
+ after = Some (mongoBsonValueToValue (doc.getFullDocument.toBsonDocument( classOf [ BsonValue ], MongoClient .getDefaultCodecRegistry)). asInstanceOf [ Map [ String , Any ]] ),
62
63
source = getMongoSource(projectId, dbAlias, doc)
63
64
),
64
65
project = projectId,
@@ -72,7 +73,7 @@ object Utils {
72
73
payload = ChangeRecordPayload (
73
74
op = " u" ,
74
75
before = Option (mongoDocumentKeyToMap(doc.getDocumentKey)),
75
- after = Some (mongoDocumentToMap (doc.getFullDocument) ),
76
+ after = Some (mongoBsonValueToValue (doc.getFullDocument.toBsonDocument( classOf [ BsonValue ], MongoClient .getDefaultCodecRegistry)). asInstanceOf [ Map [ String , Any ]] ),
76
77
source = getMongoSource(projectId, dbAlias, doc)
77
78
),
78
79
project = projectId,
@@ -130,7 +131,7 @@ object Utils {
130
131
BsonDocument .parse(new String (data.array(), " UTF-8" ))
131
132
}
132
133
133
- def mongoDocumentKeyToMap (find : BsonDocument ): Map [String , Any ] = {
134
+ def mongoDocumentKeyToMap (find : BsonDocument ): Map [String , Any ] = {
134
135
var id : String = " "
135
136
val field = find.get(" _id" )
136
137
@@ -145,21 +146,50 @@ object Utils {
145
146
Map (" _id" -> id)
146
147
}
147
148
148
- def mongoDocumentToMap (doc : Document ): Map [String , Any ] = {
149
- implicit val formats : DefaultFormats .type = org.json4s.DefaultFormats
150
-
151
- // Convert to json object
152
- val jsonString = doc.toJson
153
- var m = parse(jsonString).extract[Map [String , Any ]]
154
-
155
- // See _id is an object id
156
- try {
157
- m += " _id" -> doc.getObjectId(" _id" ).toHexString
158
- } catch {
159
- case _ : Throwable =>
149
+ def mongoBsonValueToValue (value : BsonValue ): Any = {
150
+ // Skip if value is null
151
+ if (value == null ) return null
152
+
153
+ value.getBsonType match {
154
+ case BsonType .INT32 =>
155
+ value.asInt32().getValue
156
+ case BsonType .INT64 =>
157
+ value.asInt64().getValue
158
+ case BsonType .OBJECT_ID =>
159
+ value.asObjectId().getValue.toHexString
160
+ case BsonType .DATE_TIME =>
161
+ OffsetDateTime .ofInstant(Instant .ofEpochMilli(value.asDateTime().getValue), ZoneId .systemDefault()).toString
162
+ case BsonType .TIMESTAMP =>
163
+ OffsetDateTime .ofInstant(Instant .ofEpochMilli(value.asTimestamp().getValue), ZoneId .systemDefault()).toString
164
+ case BsonType .DOCUMENT =>
165
+ val doc = value.asDocument().toBsonDocument(classOf [BsonDocument ], MongoClient .getDefaultCodecRegistry)
166
+ var obj : Map [String , Any ] = Map .empty
167
+ doc.keySet().forEach(key => {
168
+ obj += (key -> mongoBsonValueToValue(doc.get(key)))
169
+ })
170
+ obj
171
+ case BsonType .ARRAY =>
172
+ val arr = value.asArray().getValues
173
+ var op : Array [Any ] = Array .empty
174
+
175
+ arr.forEach(value => {
176
+ op = op :+ mongoBsonValueToValue(value)
177
+ })
178
+
179
+ op
180
+ case BsonType .BOOLEAN =>
181
+ value.asBoolean().getValue
182
+ case BsonType .STRING =>
183
+ value.asString().getValue
184
+ case BsonType .DOUBLE =>
185
+ value.asDouble().getValue
186
+ case BsonType .DECIMAL128 =>
187
+ value.asDecimal128().doubleValue()
188
+ case BsonType .BINARY =>
189
+ value.asBinary().getData
190
+ case _ =>
191
+ value.toString
160
192
}
161
-
162
- m
163
193
}
164
194
165
195
def getMongoSource (projectId : String , dbAlias : String , doc : ChangeStreamDocument [Document ]): ChangeRecordPayloadSource = {
@@ -232,6 +262,24 @@ object Utils {
232
262
props
233
263
}
234
264
265
+ def getOffsetStorageClass : String = {
266
+ Global .storageType match {
267
+ case " k8s" => " com.spaceuptech.dbevents.database.KubeOffsetBackingStore"
268
+ case _ => " org.apache.kafka.connect.storage.FileOffsetBackingStore"
269
+ }
270
+ }
271
+
272
+ def getDatabaseHistoryStorageClass : String = {
273
+ Global .storageType match {
274
+ case " k8s" => " com.spaceuptech.dbevents.database.KubeDatabaseHistory"
275
+ case _ => " io.debezium.relational.history.FileDatabaseHistory"
276
+ }
277
+ }
278
+
279
+ def generateConnectorName (source : DatabaseSource ): String = {
280
+ s " ${source.project}_ ${source.dbAlias}"
281
+ }
282
+
235
283
def prepareSQLServerConfig (source : DatabaseSource ): Properties = {
236
284
val name = generateConnectorName(source)
237
285
@@ -247,16 +295,14 @@ object Utils {
247
295
props.setProperty(" database.hostname" , source.config.getOrElse(" host" , " localhost" ))
248
296
props.setProperty(" database.port" , source.config.getOrElse(" port" , " 1433" ))
249
297
props.setProperty(" database.user" , source.config.getOrElse(" user" , " sa" ))
250
- props.setProperty(" database.password" , source.config.getOrElse(" password" , " mypassword " ))
298
+ props.setProperty(" database.password" , source.config.getOrElse(" password" , " password " ))
251
299
props.setProperty(" database.dbname" , source.config.getOrElse(" db" , " test" ))
252
300
props.setProperty(" database.server.name" , s " ${generateConnectorName(source)}_connector " )
253
301
props.setProperty(" database.history" , getDatabaseHistoryStorageClass)
254
302
props.setProperty(" database.history.file.filename" , s " ./dbevents-dbhistory- $name.dat " )
255
303
props
256
304
}
257
305
258
-
259
-
260
306
def preparePostgresConfig (source : DatabaseSource ): Properties = {
261
307
val name = generateConnectorName(source)
262
308
@@ -284,22 +330,4 @@ object Utils {
284
330
285
331
props
286
332
}
287
-
288
- def getOffsetStorageClass : String = {
289
- Global .storageType match {
290
- case " k8s" => " com.spaceuptech.dbevents.database.KubeOffsetBackingStore"
291
- case _ => " org.apache.kafka.connect.storage.FileOffsetBackingStore"
292
- }
293
- }
294
-
295
- def getDatabaseHistoryStorageClass : String = {
296
- Global .storageType match {
297
- case " k8s" => " com.spaceuptech.dbevents.database.KubeDatabaseHistory"
298
- case _ => " io.debezium.relational.history.FileDatabaseHistory"
299
- }
300
- }
301
-
302
- def generateConnectorName (source : DatabaseSource ): String = {
303
- s " ${source.project}_ ${source.dbAlias}"
304
- }
305
333
}
0 commit comments