-
Notifications
You must be signed in to change notification settings - Fork 17
/
Copy pathDatabaseIsolation.js
196 lines (171 loc) · 6.44 KB
/
DatabaseIsolation.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
const cds = require('@sap/cds')
function getIsolate() {
const { options = {} } = cds
const fts = cds.requires.toggles && cds.resolve(cds.env.features.folders)
const src = [options.from || '*', ...(fts || [])]
const isolation = process.env.TRAVIS_JOB_ID || process.env.GITHUB_RUN_ID || require('os').userInfo().username || 'test_db'
const srchash = hash([cds.root, ...src.flat()].join('/'))
return {
src,
// Create one database for each overall test execution
database: 'D' + hash(isolation),
// Create one tenant for each model source definition
tenant: 'T' + srchash,
// Track source definition hash
source: srchash,
}
}
// Returns the hex encoded SHA-1 digest of the given string
const hash = str => require('crypto')
  .createHash('sha1')
  .update(str)
  .digest('hex')
/**
 * Registers a `before` handler for all events of the tenant service (`dbs.ten`).
 * The handler records the names of written entities in `isolate.modified` and,
 * on the first write request, commits the running transaction, calls
 * `getWriteTenant` and begins the transaction again.
 *
 * @param {{ ten: object }} dbs - services; only `ten` is used directly here
 * @param {object} isolate - isolation descriptor, receives a fresh `modified` map
 */
async function beforeWrite(dbs, isolate) {
  const tenantDb = dbs.ten
  const touched = {}
  isolate.modified = touched

  const READ_ONLY_STATEMENT = /^(BEGIN|COMMIT|ROLLBACK|SELECT)/i

  // True when the query cannot modify data: missing, a SELECT query object or a reading plain SQL string
  const isReadOnly = query => {
    if (!query) return true
    if (query.SELECT) return true
    return typeof query === 'string' && READ_ONLY_STATEMENT.test(query)
  }

  // Add modification tracking for deep-queries internal calls
  const trackInternalWrites = () => {
    for (const name of ['onSIMPLE', 'onUPDATE', 'onINSERT']) {
      const original = tenantDb[name]
      tenantDb[name] = function (req) {
        const target = req.query?.target
        if (target) touched[target.name] = true
        return original.apply(this, arguments)
      }
    }
  }

  tenantDb.before(['*'], async req => {
    if (isReadOnly(req.query)) return // Ignore reading requests
    if (req.target) touched[req.target.name] = true

    // The switch was already started for this transaction: hand out the same promise
    if (req.tx._isolating) return req.tx._isolating
    // The switch was already started by an earlier request
    if (tenantDb._writeable) return

    trackInternalWrites()
    tenantDb._writeable = true

    // Commit, switch the tenant, then begin the transaction again
    const switching = req.tx.commit()
      .then(() => getWriteTenant(dbs, isolate))
      .then(() => req.tx.begin())
    req.tx._isolating = switching
    return switching
  })
}
/**
 * Loads the model sources of the isolate (`isolate.src`), minifies the model
 * and deploys it to the tenant service (`dbs.ten`) inside a transaction.
 * An error with code `MODEL_NOT_FOUND` is ignored; every other error is rethrown.
 *
 * @param {{ ten: object }} dbs - services; only `ten` is used here
 * @param {{ tenant: string, src: Array }} isolate - isolation descriptor
 */
async function deploy(dbs, isolate) {
  console.log('DEPLOYING:', isolate.tenant)
  const tenantDb = dbs.ten
  await tenantDb.tx(async () => {
    try {
      const { options = {} } = cds
      const model = await cds.load(isolate.src, options).then(cds.minify)
      // options.schema_evolution = 'auto'
      await cds.deploy(model).to(tenantDb, options)
    } catch (err) {
      // Sources without any model are not an error
      if (err.code !== 'MODEL_NOT_FOUND') throw err
    }
  })
}
/**
 * Makes sure the shared tenant for the current model sources is registered in the
 * `schemas` table of the `dat` service and deployed, then switches the tenant
 * service (`dbs.ten`) to it.
 *
 * Exactly one client is expected to succeed in inserting the `schemas` row; that
 * client deploys the model and flags the row as available. Every other client
 * polls the row until it is available or no deployment is in progress anymore.
 *
 * @param {{ dat: object, ten: object }} dbs - bookkeeping (`dat`) and tenant (`ten`) services
 * @param {{ tenant: string, source: string }} isolate - isolation descriptor
 * @throws rethrows the deployment error after the registration row has been removed
 */
async function getReadTenant(dbs, isolate) {
  const { dat, ten } = dbs
  const { schemas } = dat.entities()
  const deployTimeout = 120 // seconds
  let isnew = false
  try {
    // Creating the bookkeeping table is best effort, failures are ignored
    await dat.run(cds.ql.CREATE('schemas')).catch(() => { })
    await dat.tx(async tx => {
      // Remove a registration whose deployment did not finish within the timeout
      await tx.run(DELETE.from(schemas).where`tenant=${isolate.tenant} and available=${false} and seconds_between(started, $now) > ${deployTimeout}`)
      // If insert works the schema does not yet exist and this client has won the race and can deploy the contents
      await tx.run(INSERT({ tenant: isolate.tenant, source: isolate.source, available: false, started: new Date() }).into(schemas))
      isnew = true
    })
  } catch {
    // NOTE(review): every failure of the block above is treated as "registration already exists" — confirm
    const query = cds.ql`SELECT FROM ${schemas} {
(SELECT count(1) FROM ${schemas} WHERE tenant=${isolate.tenant} and available=${false} and seconds_between(started, $now) < ${deployTimeout}) as progress,
(SELECT count(1) FROM ${schemas} WHERE tenant=${isolate.tenant} and available=${true}) as available,
}`
    // If the schema already exists wait for the row to be updated with available=true
    await dat.tx(async tx => {
      let status
      // Poll at least once; stop when the tenant is available, no deployment is
      // in progress anymore, or the query returns no row at all
      do {
        [status] = await tx.run(query)
      } while (status && status.progress && !status.available)
      // NOTE(review): when polling ends without `available` the tenant is used as is — confirm this is intended
    })
  }
  await ten.database(isolate)
  await ten.tenant(isolate)
  if (isnew) {
    let err
    await deploy(dbs, isolate).catch(e => { err = e })
    // presumably the second argument requests removal of the broken tenant — verify against the service implementation
    if (err) await ten.tenant(isolate, true)
    await dat.tx(async tx => {
      if (err) {
        // Drop the registration so that another client can retry the deployment
        await tx.run(DELETE(schemas).where`tenant=${isolate.tenant}`)
      } else {
        // Signal waiting clients that the tenant is ready
        await tx.run(UPDATE(schemas).where`tenant=${isolate.tenant}`.with({ available: true }))
      }
    })
    if (err) throw err
  }
}
/**
 * Moves the tenant service (`dbs.ten`) to a tenant reserved for this process.
 * A released tenant with the same model sources is reused when one is registered
 * in the `schemas` table; otherwise a new tenant is registered and deployed.
 * A `shutdown` handler is registered that resets the tenant and releases it.
 *
 * @param {{ ten: object, dat: object }} dbs - tenant (`ten`) and bookkeeping (`dat`) services
 * @param {object} isolate - isolation descriptor; `tenant` is replaced by the reserved tenant
 */
async function getWriteTenant(dbs, isolate) {
  const { ten, dat } = dbs
  const { schemas } = dat.entities()
  // await this.database(isolate)

  let isnew = false
  await dat.tx(async tx => {
    const candidates = await tx.run(
      SELECT.from(schemas).where`tenant!=${isolate.tenant} and source=${isolate.source} and available=${true}`.forUpdate().limit(1)
    )
    if (!candidates.length) {
      // Nothing to reuse: register a new tenant that still has to be deployed
      isolate.tenant = 'T' + cds.utils.uuid()
      await tx.run(INSERT({ tenant: isolate.tenant, source: isolate.source, available: false, started: new Date() }).into(schemas))
      isnew = true
      return
    }
    // Reserve the released tenant by flagging it as unavailable
    const reserved = candidates[0].tenant
    isolate.tenant = reserved
    await tx.run(UPDATE(schemas).where`tenant=${reserved}`.with({ available: false, started: new Date() }))
  })

  console.log('USING:', isolate.tenant)
  await ten.database(isolate)
  await ten.tenant(isolate)
  if (isnew) await deploy(dbs, isolate)

  // Resets the tenant contents and flags the tenant as available again
  const release = async () => {
    // Clean tenant entities
    await ten.tx(async tx => {
      await tx.begin()
      for (const entity in isolate.modified) {
        const query = DELETE(entity).where`true=true`
        if (!query.target._unresolved) await tx.onSIMPLE({ query }) // Skip deep delete
      }
      // UPSERT all data sources again
      await cds.deploy.data(tx, tx.model, { schema_evolution: 'auto' })
    })
    await dat.run(UPDATE(schemas).where`tenant=${isolate.tenant}`.with({ available: true }))
  }

  // Fallback when the reset failed
  const discard = async () => {
    // Try to cleanup broken tenant isolation
    await ten.tenant(isolate, true)
    // Remove cleaned up schema
    await dat.run(DELETE(schemas).where`tenant=${isolate.tenant}`)
  }

  // Release schema for follow up test runs
  cds.on('shutdown', async () => {
    try {
      await release().catch(discard)
    } catch (err) {
      // If a shutdown handler throws an error it goes into an infinite loop
      console.error(err)
    }
  })
}
module.exports = async function (db) {
const isolate = getIsolate()
// Just deploy when the database doesn't have isolation implementations available
if (typeof db.database !== 'function' || typeof db.tenant !== 'function') return deploy({ ten: db }, isolate)
const dbs = {
ten: db,
sys: await cds.connect.to('db_sys', { ...cds.requires.db, isolate: false }),
dat: await cds.connect.to('db_dat', {
...cds.requires.db, isolate: false,
model: await cds.load(cds.utils.path.join(__dirname, 'database'))
}),
}
await dbs.dat.database(isolate)
await getReadTenant(dbs, isolate)
await db.database(isolate)
await db.tenant(isolate)
beforeWrite(dbs, isolate)
}