-
Notifications
You must be signed in to change notification settings - Fork 1.1k
/
Copy pathoptimisticLock.js
453 lines (379 loc) · 17.5 KB
/
optimisticLock.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
/* Copyright (c) 2023, Oracle and/or its affiliates. */
/******************************************************************************
*
* This software is dual-licensed to you under the Universal Permissive License
* (UPL) 1.0 as shown at https://oss.oracle.com/licenses/upl and Apache License
* 2.0 as shown at https://www.apache.org/licenses/LICENSE-2.0. You may choose
* either license.
*
* If you elect to accept the software under the Apache License, Version 2.0,
* the following applies:
*
* Licensed under the Apache License, Version 2.0 (the `License`);
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an `AS IS` BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
* NAME
* 281. optimisticLock.js
*
* DESCRIPTION
* Testing optimistic locking to handle concurrent updates to a database
*
*****************************************************************************/
'use strict';
const oracledb = require('oracledb');
const assert = require('assert');
const dbConfig = require('./dbconfig.js');
const testsUtil = require('./testsUtil.js');
describe('281. optimisticLock.js', function() {
// Outcome of the prerequisite check; assigned by the hook below.
let isRunnable = false;
// Name of the table that the 281.1 tests create, truncate and drop.
const TABLE = 'my_table';

// Skip every test in this file unless testsUtil.checkPrerequisites accepts
// the two version thresholds.
// NOTE(review): presumably minimum client and server versions, in that
// order - confirm against testsUtil.js.
before(async function() {
  isRunnable = await testsUtil.checkPrerequisites(2100000000, 2300000000);
  if (isRunnable) {
    return;
  }
  this.skip();
});
describe('281.1 locking in tables', function() {
let connection = null;
/**
 * Return a promise that settles once a timer of `ms` milliseconds fires.
 * The tests in this file call it with 1000 to pause a worker for one second.
 *
 * @param {number} ms - delay handed to setTimeout, in milliseconds
 * @returns {Promise<undefined>} resolves with no value when the timer fires
 */
function sleep(ms) {
  return new Promise((wake) => {
    setTimeout(wake, ms);
  });
}
before(async function() {
  // The table carries a version column so that an UPDATE can be guarded by
  // the version value its writer expects to find.
  const createTableSql = `CREATE TABLE ${TABLE} (
id NUMBER(10) PRIMARY KEY,
value VARCHAR2(100),
version NUMBER(10)
)`;
  connection = await oracledb.getConnection(dbConfig);
  await testsUtil.createTable(connection, TABLE, createTableSql);
});
// Tear down the 281.1 fixture: drop the test table, then close the shared
// connection.
after(async function() {
// dropTable is handed the connection, so it has to run before close().
await testsUtil.dropTable(connection, TABLE);
await connection.close();
});
afterEach(async function() {
  // Empty the table after every test so the next one starts without rows.
  const truncateSql = `TRUNCATE TABLE ${TABLE}`;
  await connection.execute(truncateSql);
});
it('281.1.1 multiple workers that update the same row in the database at the same time', async function() {
  const totalWorkers = 10;
  const rowId = 1;

  // Seed the single row at version 1.
  await connection.execute(
    `INSERT INTO ${TABLE} (id, value, version) VALUES (:id, :value, :version)`,
    { id: rowId, value: 'foo', version: 1 }
  );

  // The UPDATE matches only while the stored version equals the bound one,
  // and bumps the version by one when it does.
  const guardedUpdateSql = `
UPDATE ${TABLE}
SET value = :value, version = version + 1
WHERE id = :id AND version = :version
`;

  // Worker N binds version N, so every update can match only if the updates
  // are applied in worker order (1, 2, ..., totalWorkers).
  // NOTE(review): all workers share `connection`; the rowsAffected checks
  // presumably rely on the driver running these calls one after another in
  // submission order - confirm.
  const updateRow = async (workerId) => {
    const outcome = await connection.execute(guardedUpdateSql, {
      id: rowId,
      value: `worker ${workerId}`,
      version: workerId,
    });
    assert.strictEqual(outcome.rowsAffected, 1);
  };

  // Start every worker before awaiting any of them.
  const pendingUpdates = Array.from(
    { length: totalWorkers },
    (_, index) => updateRow(index + 1)
  );
  await Promise.all(pendingUpdates);

  // The last worker's value must be the one left in the row.
  const lookup = await connection.execute(
    `SELECT value FROM ${TABLE} WHERE id = :id`,
    { id: rowId }
  );
  const storedValue = lookup.rows[0][0];
  assert.strictEqual(storedValue, `worker ${totalWorkers}`);
});
it('281.1.2 two workers update and insert data into the same table at the same time', async function() {
  /*
    Worker 1 inserts row 2 with autoCommit disabled, while worker 2 updates
    row 1 (guarded by version 1) and then commits. Afterwards row 1 must hold
    worker 2's value and row 2 must hold worker 1's value.
    NOTE(review): both workers run on the same `connection`, so presumably
    neither blocks the other and worker 2's commit also covers worker 1's
    insert - confirm whether separate connections were intended.
  */
  // Seed the row that worker 2 will update.
  await connection.execute(
    `INSERT INTO ${TABLE} (id, value, version) VALUES (:id, :value, :version)`,
    { id: 1, value: 'foo', version: 1 }
  );

  // Worker that inserts row 2 and leaves committing to someone else.
  const insertRow = async (workerId) => {
    const insertQuery = `
INSERT INTO ${TABLE} (id, value, version)
VALUES (:id, :value, :version)
`;
    await connection.execute(
      insertQuery,
      { id: 2, value: `worker ${workerId}`, version: 1 },
      { autoCommit: false }
    );
  };

  // Worker that updates row 1 if it is still at version 1, then commits.
  const updateRow = async (workerId) => {
    const updateQuery = `
UPDATE ${TABLE}
SET value = :value, version = version + 1
WHERE id = :id AND version = :version
`;
    await connection.execute(
      updateQuery,
      { id: 1, value: `worker ${workerId}`, version: 1 }
    );
    await connection.commit();
  };

  // Start both workers, insert first, and wait for both to finish.
  await Promise.all([insertRow(1), updateRow(2)]);

  // Read both rows back in id order and check who wrote each one.
  const selectResult = await connection.execute(
    `SELECT value FROM ${TABLE} WHERE id IN (:id1, :id2) ORDER BY id`,
    { id1: 1, id2: 2 }
  );
  const [valueOfRow1, valueOfRow2] = selectResult.rows.map((row) => row[0]);
  assert.strictEqual(valueOfRow1, "worker 2");
  assert.strictEqual(valueOfRow2, "worker 1");

  // Discard anything that is still uncommitted on the connection.
  await connection.rollback();
});
it('281.1.3 two workers suspends the current session to update and insert', async function() {
  /*
    Two workers race to update row 1 using a version check:
      1. each worker pauses, then reads the row with SELECT ... FOR UPDATE;
      2. each pauses again, then issues an UPDATE that matches only while the
         stored version still equals the version it read;
      3. a worker whose UPDATE touched exactly one row commits and reports
         'success'; otherwise it reports the error message.
    Both workers are expected to read version 1 before either of them writes,
    so one UPDATE matches and the other finds no row to update.
    NOTE(review): both workers share `connection`, so the FOR UPDATE lock is
    presumably taken by the same session both times and does not block the
    second worker - confirm whether real lock contention was intended.
  */
  await connection.execute(`
INSERT INTO ${TABLE} (id, value, version)
VALUES (1, 'initial', 1)
`);
  const numworkers = 2;

  /*
    Run one worker. Returns 'success' when its update was applied and
    committed; otherwise returns the message of whatever error stopped it
    (the version-check failure is reported as 'Failed to update row').
  */
  async function runworkerTask(i) {
    try {
      // Delay execution for 1 second so both workers start their reads together
      await sleep(1000);

      // Select the row for update.
      // `autoCommit` is the execute() option name used elsewhere in this
      // file; the former `isAutoCommit` key is not a recognised option, so
      // the setting never reached the driver.
      const result = await connection.execute(`
SELECT id, value, version
FROM ${TABLE}
WHERE id = 1
FOR UPDATE
`, [], { autoCommit: false });
      const row = result.rows[0];

      // Delay execution for 1 second so the other worker can read the same version
      await sleep(1000);

      // Write the incremented version, but only if the version read above is
      // still the one stored in the row
      const updateResult = await connection.execute(`
UPDATE ${TABLE}
SET value = :value, version = :newVersion
WHERE id = :id AND version = :oldVersion
`, {
        value: `worker ${i}`,
        newVersion: row[2] + 1,
        id: row[0],
        oldVersion: row[2]
      }, { autoCommit: false });

      // Anything other than exactly one updated row means the version check failed
      if (updateResult.rowsAffected !== 1) {
        throw new Error('Failed to update row');
      }

      await connection.commit();
      return 'success';
    } catch (err) {
      // Report the failure as data so the caller can assert on it
      return err.message;
    }
  }

  // Start every worker before awaiting any of them
  const workerTasks = [];
  for (let i = 0; i < numworkers; i++) {
    workerTasks.push(runworkerTask(i));
  }

  // Promise.all already yields the resolved values; no second await is needed
  const results = await Promise.all(workerTasks);

  // Verify that one worker succeeded and one worker failed
  assert.strictEqual(results.includes('success'), true);
  assert.strictEqual(results.includes('Failed to update row'), true);
});
});
describe('281.2 optimistic Locking in JSON Relational Duality View', function() {
let connection = null;
// DDL for the employees base table: employee_id is the primary key, and the
// department_id column is what the emp_dept_fk constraint is later added on.
const createTableEmp = [
  'CREATE TABLE employees (employee_id NUMBER(6)',
  'primary key, first_name varchar2(4000),',
  'last_name varchar2(4000), version NUMBER(10), department_id NUMBER(4))',
].join('\n');
// DDL for the departments base table, keyed by department_id.
const createTableDept = [
  'CREATE TABLE departments (department_id NUMBER(4)',
  'primary key, department_name VARCHAR2(30),',
  'manager_id NUMBER(6))',
].join('\n');
// DDL that adds the emp_dept_fk foreign key from employees.department_id to
// the departments table.
const alterTableEmp = [
  'ALTER TABLE employees ADD',
  '(CONSTRAINT emp_dept_fk FOREIGN KEY (department_id)',
  'REFERENCES departments)',
].join('\n');
// DDL for the EMP_OV JSON relational duality view. Each document exposes the
// employees columns as EMPLOYEE_ID, FIRST_NAME, LAST_NAME and VERSION, plus a
// nested 'department_info' object read from the departments row that has the
// same department_id. employees is declared WITH(INSERT,UPDATE,DELETE);
// departments is declared WITH(UPDATE,CHECK ETAG) and its department_name is
// exposed as 'departmentname' WITH(UPDATE).
const createEmpView = `CREATE or replace JSON relational duality VIEW EMP_OV
AS
select JSON {
'EMPLOYEE_ID' is emp.EMPLOYEE_ID,
'FIRST_NAME' is emp.FIRST_NAME,
'LAST_NAME' is emp.last_name,
'VERSION' is emp.version,
'department_info' is
(
select JSON
{
'DEPARTMENT_ID' is dept.department_id,
'departmentname' is dept.department_name WITH(UPDATE)
}
from departments dept WITH(UPDATE,CHECK ETAG)
where dept.department_id = emp.department_id
)
returning JSON}
from employees emp WITH(INSERT,UPDATE,DELETE)`;
// DDL for the dept_ov JSON relational duality view. Each document exposes
// department_id and department_name from departments, plus an 'EMP_INFO'
// array (json_arrayagg) holding employee_id and FIRST_NAME for every
// employees row with the same department_id. employees is declared
// WITH (INSERT,UPDATE,DELETE); departments is declared
// WITH (INSERT,UPDATE,DELETE,CHECK ETAG).
const createDeptView = `CREATE OR REPLACE JSON relational duality VIEW dept_ov
AS
select JSON{
'department_id' is dept.DEPARTMENT_ID,
'department_name' is dept.DEPARTMENT_NAME,
'EMP_INFO' is
( select
json_arrayagg
(
JSON
{
'employee_id' is emp.employee_id,
'FIRST_NAME' is emp.FIRST_NAME
}
)
from employees emp WITH (INSERT,UPDATE,DELETE)
where emp.department_id = dept.department_id
)
returning json
}
from departments dept WITH (INSERT,UPDATE,DELETE,CHECK ETAG)`;
before(async function() {
  connection = await oracledb.getConnection(dbConfig);

  // Base tables first, in the same order as their DDL constants.
  const baseTables = [
    ['employees', createTableEmp],
    ['departments', createTableDept],
  ];
  for (const [tableName, ddl] of baseTables) {
    await testsUtil.createTable(connection, tableName, ddl);
  }

  // The foreign key and the two duality views need both tables to exist.
  for (const ddl of [alterTableEmp, createEmpView, createDeptView]) {
    await connection.execute(ddl);
  }
});
after(async function() {
  // employees (which carries the foreign key to departments) is dropped
  // before departments.
  for (const tableName of ['employees', 'departments']) {
    await testsUtil.dropTable(connection, tableName);
  }

  // Then remove both duality views, tolerating views that no longer exist.
  for (const viewName of ['emp_ov', 'dept_ov']) {
    await connection.execute(`DROP VIEW IF EXISTS ${viewName}`);
  }

  await connection.close();
});
afterEach(async function() {
  // Clear both base tables after every test; employees is emptied first
  // because its rows reference departments.
  for (const tableName of ['employees', 'departments']) {
    await connection.execute(`DELETE from ${tableName}`);
  }
});
it('281.2.1 multiple workers that update the view at the same time', async function() {
  const totalWorkers = 10;

  // Seed department 10 and employee 100 (version 1) directly in the tables.
  const seedDepartmentSql = `INSERT INTO departments VALUES
( 10
,'Administration'
, 100
)`;
  const seedEmployeeSql = `INSERT INTO employees VALUES
( 100
, 'Steven'
, 'King'
, 1
, 10
)`;
  await connection.execute(seedDepartmentSql);
  await connection.execute(seedEmployeeSql);

  // Each worker replaces the whole emp_ov document of employee 100 and
  // writes its own id into VERSION.
  const updateRow = async (workerId) => {
    const queryUpdate = `update emp_ov set data = '{"EMPLOYEE_ID":100,"FIRST_NAME":"Lex","LAST_NAME":"De Haan",
"VERSION": ${workerId},"department_info":{"DEPARTMENT_ID":10,"departmentname":"newdept"}}'
where json_value(data,'$.EMPLOYEE_ID') = 100`;
    const outcome = await connection.execute(queryUpdate);
    assert.strictEqual(outcome.rowsAffected, 1);
  };

  // Start every worker before awaiting any of them.
  const pendingUpdates = Array.from(
    { length: totalWorkers },
    (_, index) => updateRow(index + 1)
  );
  await Promise.all(pendingUpdates);

  // The document must carry the VERSION written by the last worker.
  const result = await connection.execute(`select * from emp_ov order by 1`);
  const firstDocument = result.rows[0][0];
  assert.strictEqual(firstDocument.VERSION, totalWorkers);
});
it('281.2.2 two workers update and insert data into the same table at the same time', async function() {
  /*
    Worker 1 inserts a new employee document through emp_ov with autoCommit
    disabled, while worker 2 replaces the document of employee 100 through
    the same view and then commits. Afterwards the view must return two
    documents, with VERSION 2 first and VERSION 1 second.
    NOTE(review): both workers run on the same `connection`, so presumably
    neither blocks the other and worker 2's commit also covers worker 1's
    insert - confirm whether separate connections were intended.
  */
  // Seed department 10 and employee 100 (version 0) directly in the tables.
  const seedDepartmentSql = `INSERT INTO departments VALUES
( 10
,'Administration'
, 100
)`;
  const seedEmployeeSql = `INSERT INTO employees VALUES
( 100
, 'Steven'
, 'King'
, 0
, 10
)`;
  await connection.execute(seedDepartmentSql);
  await connection.execute(seedEmployeeSql);

  // Worker that inserts through the view without committing. The new
  // EMPLOYEE_ID is the digits 100 followed by the worker id (1001 for
  // worker 1), not the sum of the two.
  const insertRow = async (workerId) => {
    const queryInsert = `insert into emp_ov values ('{"EMPLOYEE_ID":100${workerId},"FIRST_NAME":"Lex",
"LAST_NAME":"De Haan","VERSION": ${workerId},"department_info":{"DEPARTMENT_ID":10,"departmentname":"Administration"}}')`;
    const outcome = await connection.execute(queryInsert, {}, { autoCommit: false });
    assert.strictEqual(outcome.rowsAffected, 1);
  };

  // Worker that replaces employee 100's document and commits.
  const updateRow = async (workerId) => {
    const queryUpdate = `update emp_ov set data = '{"EMPLOYEE_ID":100,"FIRST_NAME":"Harry","LAST_NAME":"Potter",
"VERSION": ${workerId},"department_info":{"DEPARTMENT_ID":10,"departmentname":"Wizardry"}}'
where json_value(data,'$.EMPLOYEE_ID') = 100`;
    const outcome = await connection.execute(queryUpdate);
    assert.strictEqual(outcome.rowsAffected, 1);
    await connection.commit();
  };

  // Start both workers, insert first, and wait for both to finish.
  await Promise.all([insertRow(1), updateRow(2)]);

  // Both documents must be visible, each with the VERSION its worker wrote.
  const result = await connection.execute(`select * from emp_ov order by 1`);
  assert.strictEqual(result.rows.length, 2);
  const [firstDocument, secondDocument] = result.rows.map((row) => row[0]);
  assert.strictEqual(firstDocument.VERSION, 2);
  assert.strictEqual(secondDocument.VERSION, 1);

  // Discard anything that is still uncommitted on the connection.
  await connection.rollback();
});
});
});