From 54617aa0a88c982e550d4987a3e989e03502b8e1 Mon Sep 17 00:00:00 2001 From: "A. Jesse Jiryu Davis" Date: Fri, 23 Dec 2011 17:29:20 -0600 Subject: [PATCH 1/2] Add support for getmore() and tailable cursors --- asyncmongo/connection.py | 5 +- asyncmongo/cursor.py | 154 +++++++++++++--- asyncmongo/helpers.py | 6 +- test/eventually.py | 44 +++++ test/test_getmore.py | 368 +++++++++++++++++++++++++++++++++++++++ test/test_shunt.py | 2 + 6 files changed, 549 insertions(+), 30 deletions(-) create mode 100644 test/eventually.py create mode 100644 test/test_getmore.py diff --git a/asyncmongo/connection.py b/asyncmongo/connection.py index 5d47f3b..e4ed942 100644 --- a/asyncmongo/connection.py +++ b/asyncmongo/connection.py @@ -159,8 +159,6 @@ def _parse_header(self, header): def _parse_response(self, response): # logging.info('got data %r' % response) callback = self.__callback - request_id = self.__request_id - self.__request_id = None self.__callback = None if not self.__deferred_message: # skip adding to the cache because there is something else @@ -169,7 +167,8 @@ def _parse_response(self, response): self.__pool.cache(self) try: - response = helpers._unpack_response(response, request_id) # TODO: pass tz_awar + # TODO: pass tz_aware and as_class + response = helpers._unpack_response(response) except Exception, e: logging.error('error %s' % e) callback(None, e) diff --git a/asyncmongo/cursor.py b/asyncmongo/cursor.py index 45cbce6..043c4ba 100644 --- a/asyncmongo/cursor.py +++ b/asyncmongo/cursor.py @@ -26,7 +26,10 @@ "tailable_cursor": 2, "slave_okay": 4, "oplog_replay": 8, - "no_timeout": 16} + "no_timeout": 16, + "await_data": 32, + "exhaust": 64, + "partial": 128} class Cursor(object): """ Cursor is a class used to call oeprations on a given db/collection using a specific connection pool. @@ -36,12 +39,15 @@ def __init__(self, dbname, collection, pool): assert isinstance(dbname, (str, unicode)) assert isinstance(collection, (str, unicode)) assert isinstance(pool, object) - + + self.__id = None self.__dbname = dbname self.__collection = collection self.__pool = pool self.__slave_okay = False - + self.__retrieved = 0 + self.__killed = False + @property def full_collection_name(self): return u'%s.%s' % (self.__dbname, self.__collection) @@ -269,8 +275,9 @@ def find_one(self, spec_or_id, **kwargs): def find(self, spec=None, fields=None, skip=0, limit=0, timeout=True, snapshot=False, tailable=False, sort=None, max_scan=None, slave_okay=False, + await_data=False, _must_use_master=False, _is_command=False, - callback=None): + callback=None, batch_size=0): """Query the database. The `spec` argument is a prototype document that all results @@ -287,6 +294,25 @@ def find(self, spec=None, fields=None, skip=0, limit=0, Raises :class:`TypeError` if any of the arguments are of improper type. + + Returns a cursor. find() calls your callback either with an error, + or with the first 100 documents. You must call get_more() repeatedly + on the cursor until it is exhausted: + + class Handler(tornado.web.RequestHandler): + @tornado.web.asynchronous + def get(self): + self.cursor = self.db.collection.find({}, batch_size=300, + callback=self._on_response + ) + + def _on_response(self, response, error): + assert not error + self.write(str(response)) + if self.cursor.alive: + self.cursor.get_more(self._on_response) + else: + self.finish() :Parameters: - `spec` (optional): a SON object specifying elements which @@ -318,6 +344,7 @@ def find(self, spec=None, fields=None, skip=0, limit=0, continue from the last document received. For details, see the `tailable cursor documentation `_. + .. versionadded:: 1.2 - `sort` (optional): a list of (key, direction) pairs specifying the sort order for this query. See :meth:`~pymongo.cursor.Cursor.sort` for details. @@ -325,7 +352,15 @@ def find(self, spec=None, fields=None, skip=0, limit=0, examined when performing the query - `slave_okay` (optional): is it okay to connect directly to and perform queries on a slave instance - + - `await_data` (optional): if True, the server will block for + some extra time before returning, waiting for more data to + return. Ignored if `tailable` is False. + .. versionadded:: 1.2 + - `callback` (optional): a function that takes arguments (result, + error): a list of result documents, or an Exception + - `batch_size`: The size of each batch of results requested. + .. versionadded:: 1.2 + .. mongodoc:: find """ @@ -344,6 +379,8 @@ def find(self, spec=None, fields=None, skip=0, limit=0, raise TypeError("snapshot must be an instance of bool") if not isinstance(tailable, bool): raise TypeError("tailable must be an instance of bool") + if not isinstance(await_data, bool): + raise TypeError("await_data must be an instance of bool") if not callable(callback): raise TypeError("callback must be callable") @@ -357,10 +394,11 @@ def find(self, spec=None, fields=None, skip=0, limit=0, self.__fields = fields self.__skip = skip self.__limit = limit - self.__batch_size = 0 + self.__batch_size = batch_size self.__timeout = timeout self.__tailable = tailable + self.__await_data = tailable and await_data self.__snapshot = snapshot self.__ordering = sort and helpers._index_document(sort) or None self.__max_scan = max_scan @@ -371,14 +409,21 @@ def find(self, spec=None, fields=None, skip=0, limit=0, self.__tz_aware = False #collection.database.connection.tz_aware self.__must_use_master = _must_use_master self.__is_command = _is_command - + + ntoreturn = self.__batch_size + if self.__limit: + if self.__batch_size: + ntoreturn = min(self.__limit, self.__batch_size) + else: + ntoreturn = self.__limit + connection = self.__pool.connection() try: connection.send_message( message.query(self.__query_options(), self.full_collection_name, self.__skip, - self.__limit, + ntoreturn, self.__query_spec(), self.__fields), callback=functools.partial(self._handle_response, orig_callback=callback)) @@ -386,19 +431,55 @@ def find(self, spec=None, fields=None, skip=0, limit=0, logging.error('Error sending query %s' % e) connection.close() raise + + return self + + def get_more(self, callback, batch_size=None): + """ + Calls the given callback when more data is available from a find() + command. + + :Parameters: + - `callback` (optional): a function that takes arguments (result, + error): a list of result documents, or an Exception + - `batch_size`: The size of each batch of results requested. + """ + if batch_size is None: + batch_size = self.__batch_size + if self.__limit: + limit = self.__limit - self.__retrieved + if batch_size: + limit = min(limit, batch_size) + else: + limit = batch_size + + connection = self.__pool.connection() + try: + connection.send_message( + message.get_more( + self.full_collection_name, + limit, + self.__id), + callback=functools.partial(self._handle_response, orig_callback=callback)) + except Exception, e: + logging.error('Error in get_more %s' % e) + connection.close() + raise def _handle_response(self, result, error=None, orig_callback=None): - if result and result.get('cursor_id'): - connection = self.__pool.connection() - try: - connection.send_message( - message.kill_cursors([result['cursor_id']]), - callback=None) - except Exception, e: - logging.error('Error killing cursor %s: %s' % (result['cursor_id'], e)) - connection.close() - raise - + if result: + self.__retrieved += result.get('number_returned', 0) + if self.__id and result.get('cursor_id'): + assert self.__id == result.get('cursor_id') + else: + self.__id = result.get('cursor_id') + + if self.__retrieved >= self.__limit > 0: + self.kill() + + if result and 0 == result.get('cursor_id'): + self.__killed = True + if error: logging.error('%s %s' % (self.full_collection_name , error)) orig_callback(None, error=error) @@ -409,12 +490,13 @@ def _handle_response(self, result, error=None, orig_callback=None): else: orig_callback(result['data'], error=None) - def __query_options(self): """Get the query options string to use for this query.""" options = 0 if self.__tailable: options |= _QUERY_OPTIONS["tailable_cursor"] + if self.__await_data: + options |= _QUERY_OPTIONS["await_data"] if self.__slave_okay or self.__pool._slave_okay: options |= _QUERY_OPTIONS["slave_okay"] if not self.__timeout: @@ -437,5 +519,33 @@ def __query_spec(self): if self.__max_scan: spec["$maxScan"] = self.__max_scan return spec - - + + @property + def alive(self): + """Does this cursor have the potential to return more data? + + This is mostly useful with `tailable cursors + `_ + since they will stop iterating even though they *may* return more + results in the future. + + .. versionadded:: 1.2 + """ + return not self.__killed + + def kill(self): + if self.alive and self.__id: + self.__killed = True + logging.debug('killing cursor %s', self.__id) + connection = self.__pool.connection() + try: + connection.send_message( + message.kill_cursors([self.__id]), + callback=None) + except Exception, e: + logging.error('Error killing cursor %s: %s' % (self.__id, e)) + connection.close() + raise + + def __del__(self): + self.kill() diff --git a/asyncmongo/helpers.py b/asyncmongo/helpers.py index 4fc834c..3b16a65 100644 --- a/asyncmongo/helpers.py +++ b/asyncmongo/helpers.py @@ -22,11 +22,7 @@ def _unpack_response(response, cursor_id=None, as_class=dict, tz_aware=False): """ response_flag = struct.unpack("> sys.stderr, 'got RESULTS:', len(result) + self.assertEqual(None, error, str(error)) + self.assertEqual(expected_size, len(result)) + + # Initial QUERY + cursor = self.db.capped_coll.find( + spec={}, + fields={'_id': False}, + tailable=True, + callback=callback + ) + + # Default first batch size: 101 docs + expected_size = 101 + + self.assertNotEqual(None, cursor, + "find() should return a Cursor instance" + ) + + # Get first batch + tornado.ioloop.IOLoop.instance().start() + + # One open cursor + self.assert_(cursor.alive) + self.assertEqual(1, self.get_open_cursors() - before) + + # Get second batch -- everything remaining right now + expected_size = self.n_documents - 101 + cursor.get_more(callback=callback) + tornado.ioloop.IOLoop.instance().start() + + # Still one open cursor + self.assert_(cursor.alive) + self.assertEqual(1, self.get_open_cursors() - before) + + # Add to the capped collection + expected_size = n_new_docs = 50 + self.pymongo_conn.test.capped_coll.insert( + [{} for i in range(n_new_docs)], safe=True + ) + + # Let the cursor tail the next set of docs + cursor.get_more(callback=callback) + tornado.ioloop.IOLoop.instance().start() + + # Once more, add to the capped collection + expected_size = n_new_docs = 250 + self.pymongo_conn.test.capped_coll.insert( + [{} for i in range(n_new_docs)], safe=True + ) + + # Let the cursor tail the next set of docs + cursor.get_more(callback=callback) + tornado.ioloop.IOLoop.instance().start() + + # check cursors -- dereferencing the cursor here should delete it + # server-side + self.assertEqual(1, self.get_open_cursors() - before, + "Should have 1 open cursor before deleting our reference to it" + ) + + del cursor + after = self.get_open_cursors() + self.assertEqual( + before, after, + "%d cursors left open (should be 0)" % (after - before), + ) + + def test_empty_capped_collection(self): + # This might not be the behavior you expect from Mongo, but a tailable + # cursor on an empty capped collection dies immediately. Verify it acts + # this way in asyncmongo. + self.pymongo_conn.test.drop_collection('capped_coll') + self.pymongo_conn.test.create_collection( + 'capped_coll', + size=10*1024**2, # 10 MB + capped=True + ) + + before = self.get_open_cursors() + + def callback(result, error): + tornado.ioloop.IOLoop.instance().stop() + self.assertEqual(None, error, str(error)) + # Empty result + self.assertEqual([], result) + + # Initial QUERY + cursor = self.db.capped_coll.find( + spec={}, + fields={'_id': False}, + tailable=True, + await_data=True, # THIS IS WHAT WE'RE TESTING + callback=callback + ) + + tornado.ioloop.IOLoop.instance().start() + self.assertFalse(cursor.alive, + "Tailable cursor on empty capped collection should die" + ) + + after = self.get_open_cursors() + self.assertEqual( + before, after, + "%d cursors left open (should be 0)" % (after - before), + ) + + def test_await_data(self): + self.pymongo_conn.test.drop_collection('capped_coll') + self.pymongo_conn.test.create_collection( + 'capped_coll', + size=10*1024**2, # 10 MB + capped=True + ) + + # 10 documents + self.pymongo_conn.test.capped_coll.insert( + [{} for i in range(10)], + safe=True + ) + + before = self.get_open_cursors() + + batches = Batches([]) + + def callback(result, error): + tornado.ioloop.IOLoop.instance().stop() +# print >> sys.stderr, 'got RESULTS:', len(result) + self.assertEqual(None, error, str(error)) + batches.append(result) + + # Initial QUERY + cursor = self.db.capped_coll.find( + spec={}, + fields={'_id': False}, + tailable=True, + await_data=True, # THIS IS WHAT WE'RE TESTING + callback=callback + ) + + self.assertEqual( + (1 << 1) | (1 << 5), # tailable & await_data + cursor._Cursor__query_options() + ) + + self.assertNotEqual(None, cursor, + "find() should return a Cursor instance" + ) + + # Get first batch + tornado.ioloop.IOLoop.instance().start() + + self.assertEqual(1, len(batches.batches)) + self.assertEqual(10, len(batches.batches[0])) + + # One open cursor + self.assert_(cursor.alive, "Cursor should still be alive") + self.assertEqual(1, self.get_open_cursors() - before) + + # Get more again -- there's nothing to get, but await_data should make + # us pause a second or two before returning nothing + cursor.get_more(callback=callback) + start_time = time.time() + tornado.ioloop.IOLoop.instance().start() + + self.assertEqual(2, len(batches.batches)) + self.assertEqual(0, len(batches.batches[1])) + self.assertGreater( + time.time() - start_time, + 1, + "With await_data = True, cursor should wait at least a second" + " before returning nothing" + ) + + # Still one open cursor + self.assert_(cursor.alive) + self.assertEqual(1, self.get_open_cursors() - before) + + del cursor + after = self.get_open_cursors() + self.assertEqual( + before, after, + "%d cursors left open (should be 0)" % (after - before), + ) + +if __name__ == '__main__': + import unittest + unittest.main() diff --git a/test/test_shunt.py b/test/test_shunt.py index 4584474..09b10df 100644 --- a/test/test_shunt.py +++ b/test/test_shunt.py @@ -42,6 +42,8 @@ class MongoTest(unittest.TestCase): mongod_options = [('--port', str(27017))] def setUp(self): """setup method that starts up mongod instances using `self.mongo_options`""" + super(MongoTest, self).setUp() + # So any function that calls IOLoop.instance() gets the # PuritanicalIOLoop instead of a default loop. if not tornado.ioloop.IOLoop.initialized(): From 59d2160d1e4451f69dc383f813b76db6fbb2465a Mon Sep 17 00:00:00 2001 From: "A. Jesse Jiryu Davis" Date: Fri, 23 Dec 2011 17:29:58 -0600 Subject: [PATCH 2/2] Sample app demonstrating tailable cursors in asyncmongo --- test/sample_tailing_app/README | 8 ++ test/sample_tailing_app/example.py | 196 +++++++++++++++++++++++++++++ test/sample_tailing_app/index.html | 127 +++++++++++++++++++ 3 files changed, 331 insertions(+) create mode 100644 test/sample_tailing_app/README create mode 100644 test/sample_tailing_app/example.py create mode 100644 test/sample_tailing_app/index.html diff --git a/test/sample_tailing_app/README b/test/sample_tailing_app/README new file mode 100644 index 0000000..01ab717 --- /dev/null +++ b/test/sample_tailing_app/README @@ -0,0 +1,8 @@ +This Tornado test application demonstrates tailable cursors. It creates a +capped collection in the 'test' database and displays a web page at +http://localhost:8888 that streams new documents in the capped collection to the +page. + +Try opening several copies of the page and using the app to add documents to the +collection, or clear the collection. You can also use the mongo shell to insert +a document and see it appear in the page. diff --git a/test/sample_tailing_app/example.py b/test/sample_tailing_app/example.py new file mode 100644 index 0000000..76669de --- /dev/null +++ b/test/sample_tailing_app/example.py @@ -0,0 +1,196 @@ +#!/usr/bin/env python +import functools + +import json +import sys +import time + +import tornado.ioloop +import tornado.web +import tornado.options + +import pymongo +import pymongo.objectid +import asyncmongo + +sync_db = None +async_db = None + + +def create_collection(): + sync_db.create_collection('capped_coll', size=10000, capped=True) + print 'Created capped collection "capped_coll" in database "test"' + + +def json_default(obj): + """ + Convert non-JSON-serializable obj to something serializable + """ + if isinstance(obj, pymongo.objectid.ObjectId): + return str(obj) + else: + raise TypeError("%s is not JSON-serializable" % repr(obj)) + + +# Map client tokens (random floats) to MongoDB cursors +token2cursor = {} + + +class TailingHandler(tornado.web.RequestHandler): + def _remove_dead_cursor(self): + print 'Caught dead cursor' + del token2cursor[self.token] + + # Make the client wait 1 second before trying again + tornado.ioloop.IOLoop.instance().add_timeout( + time.time() + 1, + functools.partial(self._on_response, True, [], None) + ) + + def _find(self): + # Start tailing capped_coll. The cursor dies immediately if the + # collection is empty. Otherwise, it allows us to tail the cursor with + # getMore messages. + cursor = async_db.capped_coll.find( + {}, tailable=True, await_data=True, + callback=functools.partial(self._on_response, True) + ) + + token2cursor[self.token] = cursor + + def _get_more(self, cursor): + # Continue tailing. The await_data=True option we passed in when we + # created the cursor means that if there's no more data, the cursor + # will wait about a second for more data to come in before returning + cursor.get_more( + callback=functools.partial(self._on_response, False) + ) + + @tornado.web.asynchronous + def get(self, token): + """ + Process a GET request for data from capped_coll. Returns a JSONified + list of new documents as soon as they're available, or waits a few + seconds to return if there aren't any new documents. + """ + self.token = token + + cursor = token2cursor.get(token) + if cursor and not cursor.alive: + self._remove_dead_cursor() + elif not cursor: + self._find() + else: + self._get_more(cursor) + + def _on_response(self, new, response, error): + """ + Asynchronous callback when find() or get_more() completes. Sends result + to the client. + @param new: Whether this is the response to a new cursor (find) or an + existing cursor (get_more) + """ + if not self.request.connection.stream.socket: + # Client went away + if self.token in token2cursor: + del token2cursor[self.token] + + elif error: + # Something's wrong with this cursor, remove it + if self.token in token2cursor: + del token2cursor[self.token] + + # Ignore errors from dropped collections + if error.message != 'cursor not valid at server': + self.set_status(500) + self.write(str(error)) + + self.finish() + + elif not new and not response: + # Response is empty list if no data came in during the seconds while + # we waited for more data. get_more() does *not* block indefinitely + # for more data, it only blocks for a few seconds. Let's not call + # self.finish(), and just start waiting for data again. + cursor = token2cursor.get(self.token) + if cursor: + if cursor.alive: + self._get_more(cursor) + else: + self._remove_dead_cursor() + else: + self._find() + else: + # We have new data for the client. + self.set_header('Content-Type', 'application/json; charset=UTF-8') + final_response = json.dumps({ + 'is_new': new, + 'response': response or [] + }, default=json_default) + + self.write(final_response) + self.finish() + + +class DocumentHandler(tornado.web.RequestHandler): + @tornado.web.asynchronous + def post(self): + """ + Insert a new document to the capped collection + """ + try: + doc = json.loads(self.request.body) + async_db.capped_coll.insert(doc, callback=self._on_response) + except Exception, e: + self.set_status(500) + self.write(str(e)) + self.finish() + + def _on_response(self, response, error): + if error: + raise error + self.finish() + +class ClearCollectionHandler(tornado.web.RequestHandler): + def post(self): + """ + Delete everything in the collection + """ + sync_db.capped_coll.drop() + create_collection() + + +if __name__ == '__main__': + sync_db = pymongo.Connection().test + try: + create_collection() + except pymongo.errors.CollectionInvalid: + if 'capped' not in sync_db.capped_coll.options(): + print >> sys.stderr, ( + 'test.capped_coll exists and is not a capped collection,\n' + 'please drop the collection and start this example app again.' + ) + sys.exit(1) + + tornado.options.parse_command_line() + application = tornado.web.Application([ + # jQuery.ajax() with cache=false adds a random int to the end of the + # URL like ?_=1234; ignore it + (r'/capped_coll/(?P[^?]+)', TailingHandler), + (r'/document', DocumentHandler), + (r'/clear', ClearCollectionHandler), + (r'/(.*)', tornado.web.StaticFileHandler, {'path': 'index.html'}) + ]) + + async_db = asyncmongo.Client(pool_id='test', + host='127.0.0.1', + port=27017, + mincached=5, + maxcached=15, + maxconnections=30, + dbname='test' + ) + + application.listen(8888) + print 'Listening on port 8888' + tornado.ioloop.IOLoop.instance().start() diff --git a/test/sample_tailing_app/index.html b/test/sample_tailing_app/index.html new file mode 100644 index 0000000..16a55e5 --- /dev/null +++ b/test/sample_tailing_app/index.html @@ -0,0 +1,127 @@ + + + + + + + AsyncMongo Tailable Cursor Example + + +
+ + + + +
+ +
+

Tailing test.capped_coll:

+
+ + \ No newline at end of file