@@ -23,11 +23,14 @@ import spock.lang.Shared
23
23
24
24
import java.util.concurrent.CompletableFuture
25
25
import java.util.concurrent.CountDownLatch
26
+ import java.util.concurrent.TimeUnit
26
27
27
28
class Mongo4ReactiveClientTest extends AbstractMongoClientTest<MongoCollection<Document > > implements AgentTestTrait {
28
29
29
30
@Shared
30
31
MongoClient client
32
+ @Shared
33
+ List<Closeable > cleanup = []
31
34
32
35
def setupSpec () throws Exception {
33
36
client = MongoClients . create(" mongodb://localhost:$port " )
@@ -36,18 +39,27 @@ class Mongo4ReactiveClientTest extends AbstractMongoClientTest<MongoCollection<D
36
39
def cleanupSpec () throws Exception {
37
40
client?. close()
38
41
client = null
42
+ cleanup. forEach {
43
+ it. close()
44
+ }
39
45
}
40
46
41
47
@Override
42
48
void createCollection (String dbName , String collectionName ) {
43
49
MongoDatabase db = client. getDatabase(dbName)
44
- db. createCollection(collectionName). subscribe(toSubscriber {})
50
+ def latch = new CountDownLatch (1 )
51
+ db. createCollection(collectionName). subscribe(toSubscriber { latch. countDown() })
52
+ latch. await(30 , TimeUnit . SECONDS )
45
53
}
46
54
47
55
@Override
48
56
void createCollectionNoDescription (String dbName , String collectionName ) {
49
- MongoDatabase db = MongoClients . create(" mongodb://localhost:${ port} " ). getDatabase(dbName)
50
- db. createCollection(collectionName). subscribe(toSubscriber {})
57
+ def tmpClient = MongoClients . create(" mongodb://localhost:${ port} " )
58
+ cleanup. add(tmpClient)
59
+ MongoDatabase db = tmpClient. getDatabase(dbName)
60
+ def latch = new CountDownLatch (1 )
61
+ db. createCollection(collectionName). subscribe(toSubscriber { latch. countDown() })
62
+ latch. await(30 , TimeUnit . SECONDS )
51
63
}
52
64
53
65
@Override
@@ -63,16 +75,20 @@ class Mongo4ReactiveClientTest extends AbstractMongoClientTest<MongoCollection<D
63
75
new ServerAddress (" localhost" , port)))
64
76
})
65
77
settings. build()
66
- MongoDatabase db = MongoClients . create(settings. build()). getDatabase(dbName)
67
- db. createCollection(collectionName). subscribe(toSubscriber {})
78
+ def tmpClient = MongoClients . create(settings. build())
79
+ cleanup. add(tmpClient)
80
+ MongoDatabase db = tmpClient. getDatabase(dbName)
81
+ def latch = new CountDownLatch (1 )
82
+ db. createCollection(collectionName). subscribe(toSubscriber { latch. countDown() })
83
+ latch. await(30 , TimeUnit . SECONDS )
68
84
}
69
85
70
86
@Override
71
87
int getCollection (String dbName , String collectionName ) {
72
88
MongoDatabase db = client. getDatabase(dbName)
73
89
def count = new CompletableFuture<Integer > ()
74
90
db. getCollection(collectionName). estimatedDocumentCount(). subscribe(toSubscriber { count. complete(it) })
75
- return count. join( )
91
+ return count. get( 30 , TimeUnit . SECONDS )
76
92
}
77
93
78
94
@Override
@@ -81,7 +97,7 @@ class Mongo4ReactiveClientTest extends AbstractMongoClientTest<MongoCollection<D
81
97
MongoDatabase db = client. getDatabase(dbName)
82
98
def latch1 = new CountDownLatch (1 )
83
99
db. createCollection(collectionName). subscribe(toSubscriber { latch1. countDown() })
84
- latch1. await()
100
+ latch1. await(30 , TimeUnit . SECONDS )
85
101
return db. getCollection(collectionName)
86
102
}
87
103
ignoreTracesAndClear(1 )
@@ -94,7 +110,7 @@ class Mongo4ReactiveClientTest extends AbstractMongoClientTest<MongoCollection<D
94
110
collection. insertOne(new Document (" password" , " SECRET" )). subscribe(toSubscriber {
95
111
collection. estimatedDocumentCount(). subscribe(toSubscriber { count. complete(it) })
96
112
})
97
- return count. join( )
113
+ return count. get( 30 , TimeUnit . SECONDS )
98
114
}
99
115
100
116
@Override
@@ -103,11 +119,11 @@ class Mongo4ReactiveClientTest extends AbstractMongoClientTest<MongoCollection<D
103
119
MongoDatabase db = client. getDatabase(dbName)
104
120
def latch1 = new CountDownLatch (1 )
105
121
db. createCollection(collectionName). subscribe(toSubscriber { latch1. countDown() })
106
- latch1. await()
122
+ latch1. await(30 , TimeUnit . SECONDS )
107
123
def coll = db. getCollection(collectionName)
108
124
def latch2 = new CountDownLatch (1 )
109
125
coll. insertOne(new Document (" password" , " OLDPW" )). subscribe(toSubscriber { latch2. countDown() })
110
- latch2. await()
126
+ latch2. await(30 , TimeUnit . SECONDS )
111
127
return coll
112
128
}
113
129
ignoreTracesAndClear(1 )
@@ -124,7 +140,7 @@ class Mongo4ReactiveClientTest extends AbstractMongoClientTest<MongoCollection<D
124
140
result. complete(it)
125
141
collection. estimatedDocumentCount(). subscribe(toSubscriber { count. complete(it) })
126
142
})
127
- return result. join( ). modifiedCount
143
+ return result. get( 30 , TimeUnit . SECONDS ). modifiedCount
128
144
}
129
145
130
146
@Override
@@ -133,11 +149,11 @@ class Mongo4ReactiveClientTest extends AbstractMongoClientTest<MongoCollection<D
133
149
MongoDatabase db = client. getDatabase(dbName)
134
150
def latch1 = new CountDownLatch (1 )
135
151
db. createCollection(collectionName). subscribe(toSubscriber { latch1. countDown() })
136
- latch1. await()
152
+ latch1. await(30 , TimeUnit . SECONDS )
137
153
def coll = db. getCollection(collectionName)
138
154
def latch2 = new CountDownLatch (1 )
139
155
coll. insertOne(new Document (" password" , " SECRET" )). subscribe(toSubscriber { latch2. countDown() })
140
- latch2. await()
156
+ latch2. await(30 , TimeUnit . SECONDS )
141
157
return coll
142
158
}
143
159
ignoreTracesAndClear(1 )
@@ -152,7 +168,7 @@ class Mongo4ReactiveClientTest extends AbstractMongoClientTest<MongoCollection<D
152
168
result. complete(it)
153
169
collection. estimatedDocumentCount(). subscribe(toSubscriber { count. complete(it) })
154
170
})
155
- return result. join( ). deletedCount
171
+ return result. get( 30 , TimeUnit . SECONDS ). deletedCount
156
172
}
157
173
158
174
@Override
@@ -173,18 +189,18 @@ class Mongo4ReactiveClientTest extends AbstractMongoClientTest<MongoCollection<D
173
189
db. createCollection(collectionName). subscribe(toSubscriber {
174
190
latch. countDown()
175
191
})
176
- latch. await()
192
+ latch. await(30 , TimeUnit . SECONDS )
177
193
return db. getCollection(collectionName)
178
194
}
179
195
ignoreTracesAndClear(1 )
180
196
def result = new CompletableFuture<Throwable > ()
181
197
collection. updateOne(new BsonDocument (), new BsonDocument ()). subscribe(toSubscriber {
182
198
result. complete(it)
183
199
})
184
- throw result. join( )
200
+ throw result. get( 30 , TimeUnit . SECONDS )
185
201
}
186
202
187
- def Subscriber<?> toSubscriber (Closure closure ) {
203
+ Subscriber<?> toSubscriber (Closure closure ) {
188
204
return new Subscriber () {
189
205
boolean hasResult
190
206
0 commit comments