From adb9165534687bd8535462402c32758d9bb756cf Mon Sep 17 00:00:00 2001 From: Hamish Rickerby Date: Fri, 1 Jun 2018 12:24:11 +1000 Subject: [PATCH 1/2] Support for firestore --- .travis.yml | 2 + README.md | 52 +++++++ lib/databases/firestore.js | 286 ++++++++++++++++++++++++++++++++++++ package.json | 1 + test/repositoryReadTest.js | 6 +- test/repositoryWriteTest.js | 12 +- 6 files changed, 350 insertions(+), 9 deletions(-) create mode 100644 lib/databases/firestore.js diff --git a/.travis.yml b/.travis.yml index 08895c6..30d4cbc 100644 --- a/.travis.yml +++ b/.travis.yml @@ -13,6 +13,7 @@ before_install: - tar -xzf /tmp/dynamodb_local_latest.tar.gz -C /tmp - java -Djava.library.path=/tmp/DynamoDBLocal_lib -jar /tmp/DynamoDBLocal.jar -inMemory & - sleep 2 + - echo $BASE64_GOOGLE_KEY | base64 --decode > /tmp/firestore-node-viewmodel-key.json services: - mongodb @@ -40,3 +41,4 @@ env: - AWS_SECRET_ACCESS_KEY=SECRET - AWS_REGION=us-east-1 - AWS_DYNAMODB_ENDPOINT=http://localhost:8000 + - GOOGLE_APPLICATION_CREDENTIALS=/tmp/firestore-node-viewmodel-key.json diff --git a/README.md b/README.md index bd86806..1e9cda8 100644 --- a/README.md +++ b/README.md @@ -294,6 +294,57 @@ Additionaly for elasticsearch6 the number of shards, number of replicas, the ref }); ``` +## Firestore + +### Setup + +#### Library installation + +`yarn add @google-cloud/firestore` or `npm install --save @google-cloud/firestore` + +#### Options + +Use the `firestore` type to support Google's Firestore database, and store your KeyFile in a location accessible by your application. + +```javascript + const options = { + repository: { + type: 'firestore', + projectId: 'YOUR_PROJECT_ID', + keyFilename: '/path/to/keyfile.json' + }, + }; +``` + +### Find Queries + +Simple (equality comparison) find queries are supported by passing javascript objects as the query parameter, or more complex queries can be executed via nested arrays. In the case of multiple key/value pairs or nested arrays, the composite predicates form logical ANDs. + +``` javascript + // Simple Object format + aRepo.find({'aProp': 'aValue', 'secondProp': 'secondValue'}, function (err, vm) { + if (err) { + console.log('Repo find error', err); + return; + } + console.log('Found', vm); + }); + + // Nested array syntax, allows for more complex predicates + aRepo.find([['aProp', '==', 'aValue'], ['secondProp', '<', 10000]], function (err, vm) { + if (err) { + console.log('Repo find error', err); + return; + } + console.log('Found', vm); + }); +``` + +The queryOptions parameter supports limit, skip, and sort, in a mongoDb-like syntax. + +### Testing Setup + +To provide the authentication file to tests, the `GOOGLE_APPLICATION_CREDENTIALS` environment setting should point to the file so it can be loaded by firestore. To inject the file in TravisCI, create a new environment variable called `BASE64_GOOGLE_KEY` in the Travis GUI, and set the value of this to be the Base64 encoded content of the file. The .travis.yml file contains configuration to decode this setting and write it out to a known location for the CI settings to pickup. # [Release notes](https://github.com/adrai/node-viewmodel/blob/master/releasenotes.md) @@ -310,6 +361,7 @@ Currently these databases are supported: 8. elasticsearch ([elasticsearch] (https://github.com/elastic/elasticsearch-js)) 9. elasticsearch6 ([elasticsearch] (https://github.com/elastic/elasticsearch-js)) - for Elasticsearch 5.x and 6.x 10. dynamodb ([aws-sdk] (https://github.com/aws/aws-sdk-js)) +11. firestore ([@google-cloud/firestore] (https://github.com/googleapis/nodejs-firestore)) ## own db implementation You can use your own db implementation by extending this... diff --git a/lib/databases/firestore.js b/lib/databases/firestore.js new file mode 100644 index 0000000..e6a3323 --- /dev/null +++ b/lib/databases/firestore.js @@ -0,0 +1,286 @@ +var util = require('util'), + _ = require('lodash'), + async = require('async'), + ConcurrencyError = require('../concurrencyError'), + gcFirestore = require('@google-cloud/firestore'), + Repository = require('../base'), + uuid = require('uuid').v4, + ViewModel = Repository.ViewModel; + +var collections = []; + +function Firestore(options) { + Repository.call(this); + this.options = _.merge({ timestampsInSnapshots: true }, options); +} + +util.inherits(Firestore, Repository); + +function implementError (callback) { + var err = new Error('Storage method add is not implemented'); + if (callback) callback(err); + throw err; +} + +function parseFirestoreQuery(query) { + if (_.isArray(query)) { + return query; + } else if (_.isPlainObject(query)) { + return _.map(query, function(value, key) { + return [key, '==', value]; + }); + } + throw new Error('Unknown query type'); +}; + +function firestoreQueryParser(collectionRef, queryParams) { + var params = parseFirestoreQuery(queryParams); + return _.reduce(params, function(acc, q) { + return acc.where.apply(acc, q); + }, collectionRef); +}; + +function emptyCollection(db, collection, callback) { + var collectionRef = db.collection(collection); + var query = collectionRef.get().then(function (querySnapshot) { + var writeBatch = db.batch(); + querySnapshot.forEach(function (documentSnapshot) { + var documentPath = collection + '/' + documentSnapshot.id; + var documentRef = db.doc(documentPath); + writeBatch.delete(documentRef); + }); + writeBatch.commit().then(function () { + if (callback) callback(null); + }); + }); +}; + +function getPrecondition(vm) { + var precondition = {}; + if (!_.isUndefined(vm.get('_updateTime'))) { + const time = vm.get('_updateTime'); + if (_.isDate(time)) { + precondition['lastUpdateTime'] = time.toISOString(); + } else if (_.isString(time)) { + precondition['lastUpdateTime'] = time; + } + } + return precondition; +} + +function enrichVMWithTimestamps(vm, documentSnapshot) { + _.isUndefined(documentSnapshot.readTime) ? false : vm.set('_readTime', documentSnapshot.readTime); + _.isUndefined(documentSnapshot.createTime) ? false : vm.set('_createTime', documentSnapshot.createTime); + _.isUndefined(documentSnapshot.updateTime) ? false : vm.set('_updateTime', documentSnapshot.updateTime); + return vm; +}; + +function applyQueryOptions(query, options) { + if (!_.isUndefined(options)) { + // Apply supported queryOptions + if (_.has(options, 'limit')) { + query = query.limit(options.limit); + } + if (_.has(options, 'skip')) { + query = query.offset(options.skip); + } + if (_.has(options, 'sort')) { + var sortKey = options.sort.keys[0]; + var direction = options.sort.keys[sortKey] == 1 ? 'asc' : 'desc'; + query = query.orderBy(sortKey, direction); + } + } + return query; +} + +_.extend(Firestore.prototype, { + + connect: function (callback) { + var self = this; + var options = this.options; + self.db = new gcFirestore(options); + self.emit('connect'); + if (callback) callback(null, self); + }, + + disconnect: function (callback) { + var self = this; + delete self.db; + self.emit('disconnect'); + if (callback) callback(null, self); + }, + + getNewId: function (callback) { + this.checkConnection(); + + var id = uuid().toString(); + if (callback) callback(null, id); + }, + + get: function (id, callback) { + this.checkConnection(); + + if(_.isFunction(id)) { + callback = id; + id = null; + } + + if (!id) { + id = uuid().toString(); + } + + var self = this; + + var documentPath = this.collection + '/' + id; + var documentRef = this.db.doc(documentPath); + + documentRef.get().then(function (documentSnapshot) { + var vm = new ViewModel(documentSnapshot.data() || { id }, self); + vm = enrichVMWithTimestamps(vm, documentSnapshot); + if (documentSnapshot.exists) { + vm.actionOnCommit = 'update'; + } else { + vm.actionOnCommit = 'create'; + } + callback(null, vm); + }); + }, + + find: function (queryParams, queryOptions, callback) { + this.checkConnection(); + + var self = this; + var collectionRef = this.db.collection(this.collection); + + var query = firestoreQueryParser(collectionRef, queryParams); + query = applyQueryOptions(query, queryOptions); + + query.get().then(function (querySnapshot) { + var vms = _.map(querySnapshot.docs, function(documentSnapshot) { + var vm = new ViewModel(documentSnapshot.data(), self); + vm = enrichVMWithTimestamps(vm, documentSnapshot); + vm.actionOnCommit = 'update'; + return vm; + }); + callback(null, vms); + }); + }, + + findOne: function (queryParams, queryOptions, callback) { + // NOTE: queryOptions is ignored + this.checkConnection(); + + var self = this; + var collectionRef = this.db.collection(this.collection); + + var query = firestoreQueryParser(collectionRef, queryParams); + _.unset(queryOptions, 'limit'); + query = applyQueryOptions(query, queryOptions); + query.limit(1).get().then(function (querySnapshot) { + if (querySnapshot.size == 0) { + callback(null, null); + } + querySnapshot.forEach(function (documentSnapshot) { + var vm = new ViewModel(documentSnapshot.data(), self); + vm = enrichVMWithTimestamps(vm, documentSnapshot); + vm.actionOnCommit = 'update'; + callback(null, vm); + }); + }); + }, + + commit: function (vm, callback) { + this.checkConnection(); + + if (!vm.actionOnCommit) return callback(new Error('actionOnCommit is not defined!')); + + var self = this; + + switch(vm.actionOnCommit) { + case 'delete': + var documentPath = this.collection + '/' + vm.id; + var documentRef = this.db.doc(documentPath); + var precondition = getPrecondition(vm); + documentRef.delete(precondition).then(function () { + callback(null); + }).catch(function (err) { + return callback(new ConcurrencyError()); + }); + break; + case 'create': + var documentPath = this.collection + '/' + vm.id; + var documentRef = this.db.doc(documentPath); + documentRef.get().then(function (documentSnapshot) { + if (documentSnapshot.exists) { + return callback(new ConcurrencyError()); + } + documentRef.set(vm.attributes).then(function () { + vm.actionOnCommit = 'update'; + callback(null, vm); + }); + }); + break; + case 'update': + var documentPath = this.collection + '/' + vm.id; + var documentRef = this.db.doc(documentPath); + documentRef.get().then(function (documentSnapshot) { + if (!documentSnapshot.exists) { + documentRef.set(vm.attributes).then(function () { + vm.actionOnCommit = 'update'; + callback(null, vm); + }); + } else { + if (!_.isUndefined(documentSnapshot.updateTime) && + _.isUndefined(vm.get('_updateTime'))) { + return callback(new ConcurrencyError()); + } + + var precondition = getPrecondition(vm); + documentRef.update(vm.attributes, precondition).then(function () { + self.get(vm.id, callback); + }, function (err) { + return callback(new ConcurrencyError()); + }); + } + }); + break; + default: + return callback(new Error('Unknown actionOnCommit: ' + vm.actionOnCommit)); + }; + }, + + checkConnection: function (callback) { + if (this.collection) { + return; + } + + if (collections.indexOf(this.collectionName) < 0) { + collections.push(this.collectionName) + } + + this.collection = this.collectionName; + if (callback) callback(null); + }, + + clear: function (callback) { + this.checkConnection(); + + var self = this; + if (!this.collection) { + if (callback) callback(null); + return; + } + + emptyCollection(this.db, this.collection, callback); + }, + + clearAll: function (callback) { + var self = this; + async.each(collections, function (col, callback) { + emptyCollection(self.db, col, callback); + }, callback); + }, + +}); + +module.exports = Firestore; diff --git a/package.json b/package.json index a0e60b6..148a917 100644 --- a/package.json +++ b/package.json @@ -21,6 +21,7 @@ "uuid": "3.1.0" }, "devDependencies": { + "@google-cloud/firestore": "^0.16.1", "aws-sdk": ">=2.123.0", "azure-storage": ">=0.3.0", "cradle": ">=0.6.7", diff --git a/test/repositoryReadTest.js b/test/repositoryReadTest.js index 2ec6a64..e10128a 100644 --- a/test/repositoryReadTest.js +++ b/test/repositoryReadTest.js @@ -77,7 +77,7 @@ describe('Repository read', function() { describe('with options containing a type property with the value of', function() { - var types = ['inmemory', 'mongodb', 'tingodb', 'couchdb', 'redis', 'elasticsearch6', 'dynamodb'/*, 'elasticsearch', 'documentdb', 'azuretable'*/]; + var types = ['inmemory', 'mongodb', 'tingodb', 'couchdb', 'redis', 'elasticsearch6', 'dynamodb', 'firestore' /*, 'elasticsearch', 'documentdb', 'azuretable' */]; types.forEach(function(type) { @@ -490,7 +490,7 @@ describe('Repository read', function() { }); - var noQueryArray = ['azuretable', 'documentdb', 'dynamodb']; + var noQueryArray = ['azuretable', 'documentdb', 'dynamodb', 'firestore']; if (!_.includes(noQueryArray, type)) { @@ -781,7 +781,7 @@ describe('Repository read', function() { }); - var noQueryArray = ['azuretable', 'documentdb', 'dynamodb']; + var noQueryArray = ['azuretable', 'documentdb', 'dynamodb', 'firestore']; if (!_.includes(noQueryArray, type)) { diff --git a/test/repositoryWriteTest.js b/test/repositoryWriteTest.js index e35b572..192d1b4 100644 --- a/test/repositoryWriteTest.js +++ b/test/repositoryWriteTest.js @@ -76,7 +76,7 @@ describe.only('Repository write', function() { describe('with options containing a type property with the value of', function() { - var types = ['inmemory', 'mongodb', 'tingodb', 'couchdb', 'redis', 'elasticsearch6', 'dynamodb'/*, 'elasticsearch', 'documentdb', 'azuretable'*/]; + var types = ['inmemory', 'mongodb', 'tingodb', 'couchdb', 'redis', 'elasticsearch6', 'dynamodb', 'firestore' /*, 'elasticsearch', 'documentdb', 'azuretable' */]; types.forEach(function(type) { @@ -482,7 +482,7 @@ describe.only('Repository write', function() { }); - var noQueryArray = ['azuretable', 'documentdb', 'dynamodb']; + var noQueryArray = ['azuretable', 'documentdb', 'dynamodb', 'firestore']; if (!_.includes(noQueryArray, type)) { @@ -1247,7 +1247,7 @@ describe.only('Repository write', function() { }); - var noQueryArray = ['azuretable', 'documentdb', 'dynamodb']; + var noQueryArray = ['azuretable', 'documentdb', 'dynamodb', 'firestore']; if (!_.includes(noQueryArray, type)) { @@ -1510,7 +1510,7 @@ describe.only('Repository write', function() { }); - describe('but beeing updated by someone else in the meantime', function() { + describe('but being updated by someone else in the meantime', function() { it('it should callback with a concurrency error', function(done) { @@ -1536,7 +1536,7 @@ describe.only('Repository write', function() { }); - describe('but beeing updated by someone else in the meantime and creating with the same id', function() { + describe('but being updated by someone else in the meantime and creating with the same id', function() { it('it should callback with a concurrency error', function(done) { @@ -1605,7 +1605,7 @@ describe.only('Repository write', function() { }); - describe('but beeing updated by someone else in the meantime', function() { + describe('but being updated by someone else in the meantime', function() { it('it should callback with a concurrency error', function(done) { From 40ccc0f6850f3d1a98818452e5ddbf50def426c8 Mon Sep 17 00:00:00 2001 From: Hamish Rickerby Date: Tue, 22 Jan 2019 10:50:46 +1100 Subject: [PATCH 2/2] Change clear collections to be a batch function - Firebase only supports 500 changes as part of a batch, so this function needs to loop through larger collections. --- lib/databases/firestore.js | 38 +++++++++++++++++++++++++++----------- 1 file changed, 27 insertions(+), 11 deletions(-) diff --git a/lib/databases/firestore.js b/lib/databases/firestore.js index e6a3323..e56e10e 100644 --- a/lib/databases/firestore.js +++ b/lib/databases/firestore.js @@ -41,18 +41,34 @@ function firestoreQueryParser(collectionRef, queryParams) { }; function emptyCollection(db, collection, callback) { + var batchSize = 300; var collectionRef = db.collection(collection); - var query = collectionRef.get().then(function (querySnapshot) { - var writeBatch = db.batch(); - querySnapshot.forEach(function (documentSnapshot) { - var documentPath = collection + '/' + documentSnapshot.id; - var documentRef = db.doc(documentPath); - writeBatch.delete(documentRef); - }); - writeBatch.commit().then(function () { - if (callback) callback(null); - }); - }); + var query = collectionRef.limit(batchSize); + + return deleteQueryBatch(db, query, batchSize, callback); +} + +function deleteQueryBatch(db, query, batchSize, callback) { + query.get() + .then((snapshot) => { + if (snapshot.size == 0) { + return 0; + } + + var batch = db.batch(); + snapshot.docs.forEach((doc) => batch.delete(doc.ref)); + return batch.commit().then(() => { + return snapshot.size; + }); + }).then((numDeleted) => { + if (numDeleted == 0) { + return callback(); + } + + process.nextTick(() => { + deleteQueryBatch(db, query, batchSize, callback); + }); + }).catch(callback); }; function getPrecondition(vm) {