diff --git a/asyncmongo/connection.py b/asyncmongo/connection.py
index 5d47f3b..e4ed942 100644
--- a/asyncmongo/connection.py
+++ b/asyncmongo/connection.py
@@ -159,8 +159,6 @@ def _parse_header(self, header):
     def _parse_response(self, response):
         # logging.info('got data %r' % response)
         callback = self.__callback
-        request_id = self.__request_id
-        self.__request_id = None
         self.__callback = None
         if not self.__deferred_message:
             # skip adding to the cache because there is something else
@@ -169,7 +167,8 @@ def _parse_response(self, response):
             self.__pool.cache(self)
 
         try:
-            response = helpers._unpack_response(response, request_id) # TODO: pass tz_awar
+            # TODO: pass tz_aware and as_class
+            response = helpers._unpack_response(response)
         except Exception, e:
             logging.error('error %s' % e)
             callback(None, e)
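
Note on this hunk: _parse_response() no longer pairs a reply with the request id it stored at send time; cursor-id bookkeeping moves into Cursor._handle_response() below. For orientation, a minimal sketch of the standard 16-byte MongoDB wire-protocol header that _parse_header() deals with -- illustrative only, not asyncmongo's actual code:

    import struct

    def parse_mongo_header(header):
        # Four little-endian int32s: total message length, requestID,
        # responseTo (the requestID this reply answers), and opCode.
        length, request_id, response_to, op_code = struct.unpack('<iiii', header[:16])
        return length, request_id, response_to, op_code
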
diff --git a/asyncmongo/cursor.py b/asyncmongo/cursor.py
index 45cbce6..043c4ba 100644
--- a/asyncmongo/cursor.py
+++ b/asyncmongo/cursor.py
@@ -26,7 +26,10 @@
     "tailable_cursor": 2,
     "slave_okay": 4,
     "oplog_replay": 8,
-    "no_timeout": 16}
+    "no_timeout": 16,
+    "await_data": 32,
+    "exhaust": 64,
+    "partial": 128}
 
 class Cursor(object):
     """ Cursor is a class used to call operations on a given db/collection using a specific connection pool.
@@ -36,12 +39,15 @@ def __init__(self, dbname, collection, pool):
         assert isinstance(dbname, (str, unicode))
         assert isinstance(collection, (str, unicode))
         assert isinstance(pool, object)
-        
+
+        self.__id = None
         self.__dbname = dbname
         self.__collection = collection
         self.__pool = pool
         self.__slave_okay = False
-        
+        self.__retrieved = 0
+        self.__killed = False
+
     @property
     def full_collection_name(self):
         return u'%s.%s' % (self.__dbname, self.__collection)
@@ -269,8 +275,9 @@ def find_one(self, spec_or_id, **kwargs):
     def find(self, spec=None, fields=None, skip=0, limit=0,
              timeout=True, snapshot=False, tailable=False, sort=None,
              max_scan=None, slave_okay=False,
+             await_data=False,
              _must_use_master=False, _is_command=False,
-             callback=None):
+             callback=None, batch_size=0):
         """Query the database.
 
         The `spec` argument is a prototype document that all results
@@ -287,6 +294,25 @@ def find(self, spec=None, fields=None, skip=0, limit=0,
 
         Raises :class:`TypeError` if any of the arguments are of
         improper type.
+
+        Returns a cursor. find() calls your callback either with an error,
+        or with the first batch of documents. You must call get_more()
+        repeatedly on the cursor until it is exhausted:
+
+            class Handler(tornado.web.RequestHandler):
+                @tornado.web.asynchronous
+                def get(self):
+                    self.cursor = self.db.collection.find({}, batch_size=300,
+                        callback=self._on_response
+                    )
+
+                def _on_response(self, response, error):
+                    assert not error
+                    self.write(str(response))
+                    if self.cursor.alive:
+                        self.cursor.get_more(self._on_response)
+                    else:
+                        self.finish()
 
         :Parameters:
           - `spec` (optional): a SON object specifying elements which
@@ -318,6 +344,7 @@ def find(self, spec=None, fields=None, skip=0, limit=0,
             continue from the last document received. For details, see
             the `tailable cursor documentation
             <http://www.mongodb.org/display/DOCS/Tailable+Cursors>`_.
+            .. versionadded:: 1.2
           - `sort` (optional): a list of (key, direction) pairs
             specifying the sort order for this query. See
             :meth:`~pymongo.cursor.Cursor.sort` for details.
@@ -325,7 +352,15 @@ def find(self, spec=None, fields=None, skip=0, limit=0,
             examined when performing the query
          - `slave_okay` (optional): is it okay to connect directly
            to and perform queries on a slave instance
-
+          - `await_data` (optional): if True, the server will block for
+            some extra time before returning, waiting for more data to
+            return. Ignored if `tailable` is False.
+            .. versionadded:: 1.2
+          - `callback` (optional): a function that takes arguments (result,
+            error), where result is a list of documents and error is None
+            or an Exception
+          - `batch_size`: The size of each batch of results requested.
+            .. versionadded:: 1.2
 
         .. mongodoc:: find
         """
@@ -344,6 +379,8 @@ def find(self, spec=None, fields=None, skip=0, limit=0,
             raise TypeError("snapshot must be an instance of bool")
         if not isinstance(tailable, bool):
             raise TypeError("tailable must be an instance of bool")
+        if not isinstance(await_data, bool):
+            raise TypeError("await_data must be an instance of bool")
         if not callable(callback):
             raise TypeError("callback must be callable")
@@ -357,10 +394,11 @@ def find(self, spec=None, fields=None, skip=0, limit=0,
         self.__fields = fields
         self.__skip = skip
         self.__limit = limit
-        self.__batch_size = 0
+        self.__batch_size = batch_size
 
         self.__timeout = timeout
         self.__tailable = tailable
+        self.__await_data = tailable and await_data
         self.__snapshot = snapshot
         self.__ordering = sort and helpers._index_document(sort) or None
         self.__max_scan = max_scan
@@ -371,14 +409,21 @@ def find(self, spec=None, fields=None, skip=0, limit=0,
         self.__tz_aware = False #collection.database.connection.tz_aware
         self.__must_use_master = _must_use_master
         self.__is_command = _is_command
-        
+
+        ntoreturn = self.__batch_size
+        if self.__limit:
+            if self.__batch_size:
+                ntoreturn = min(self.__limit, self.__batch_size)
+            else:
+                ntoreturn = self.__limit
+
         connection = self.__pool.connection()
         try:
             connection.send_message(
                 message.query(self.__query_options(),
                               self.full_collection_name,
                               self.__skip,
-                              self.__limit,
+                              ntoreturn,
                               self.__query_spec(),
                               self.__fields),
                 callback=functools.partial(self._handle_response, orig_callback=callback))
@@ -386,19 +431,55 @@ def find(self, spec=None, fields=None, skip=0, limit=0,
             logging.error('Error sending query %s' % e)
             connection.close()
             raise
+
+        return self
+
+    def get_more(self, callback, batch_size=None):
+        """
+        Calls the given callback when more data is available from a find()
+        command.
+
+        :Parameters:
+          - `callback`: a function that takes arguments (result, error),
+            where result is a list of documents and error is None or an
+            Exception
+          - `batch_size`: The size of each batch of results requested.
+        """
+        if batch_size is None:
+            batch_size = self.__batch_size
+        if self.__limit:
+            limit = self.__limit - self.__retrieved
+            if batch_size:
+                limit = min(limit, batch_size)
+        else:
+            limit = batch_size
+
+        connection = self.__pool.connection()
+        try:
+            connection.send_message(
+                message.get_more(
+                    self.full_collection_name,
+                    limit,
+                    self.__id),
+                callback=functools.partial(self._handle_response, orig_callback=callback))
+        except Exception, e:
+            logging.error('Error in get_more %s' % e)
+            connection.close()
+            raise
 
     def _handle_response(self, result, error=None, orig_callback=None):
-        if result and result.get('cursor_id'):
-            connection = self.__pool.connection()
-            try:
-                connection.send_message(
-                    message.kill_cursors([result['cursor_id']]),
-                    callback=None)
-            except Exception, e:
-                logging.error('Error killing cursor %s: %s' % (result['cursor_id'], e))
-                connection.close()
-                raise
-
+        if result:
+            self.__retrieved += result.get('number_returned', 0)
+            if self.__id and result.get('cursor_id'):
+                assert self.__id == result.get('cursor_id')
+            else:
+                self.__id = result.get('cursor_id')
+
+            if self.__retrieved >= self.__limit > 0:
+                self.kill()
+
+        if result and 0 == result.get('cursor_id'):
+            self.__killed = True
+
         if error:
             logging.error('%s %s' % (self.full_collection_name , error))
             orig_callback(None, error=error)
@@ -409,12 +490,13 @@ def _handle_response(self, result, error=None, orig_callback=None):
         else:
             orig_callback(result['data'], error=None)
 
-
     def __query_options(self):
         """Get the query options string to use for this query."""
         options = 0
         if self.__tailable:
             options |= _QUERY_OPTIONS["tailable_cursor"]
+        if self.__await_data:
+            options |= _QUERY_OPTIONS["await_data"]
         if self.__slave_okay or self.__pool._slave_okay:
             options |= _QUERY_OPTIONS["slave_okay"]
         if not self.__timeout:
@@ -437,5 +519,33 @@ def __query_spec(self):
         if self.__max_scan:
             spec["$maxScan"] = self.__max_scan
         return spec
-
-
+
+    @property
+    def alive(self):
+        """Does this cursor have the potential to return more data?
+
+        This is mostly useful with `tailable cursors
+        <http://www.mongodb.org/display/DOCS/Tailable+Cursors>`_
+        since they will stop iterating even though they *may* return more
+        results in the future.
+
+        .. versionadded:: 1.2
+        """
+        return not self.__killed
+
+    def kill(self):
+        if self.alive and self.__id:
+            self.__killed = True
+            logging.debug('killing cursor %s', self.__id)
+            connection = self.__pool.connection()
+            try:
+                connection.send_message(
+                    message.kill_cursors([self.__id]),
+                    callback=None)
+            except Exception, e:
+                logging.error('Error killing cursor %s: %s' % (self.__id, e))
+                connection.close()
+                raise
+
+    def __del__(self):
+        self.kill()
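
Note on the batching arithmetic above: the first OP_QUERY's "number to return" is now derived from both `limit` and `batch_size`, and get_more() recomputes it from what has already been retrieved. A minimal sketch that mirrors the patch (illustrative, not part of it):

    def first_ntoreturn(limit, batch_size):
        # batch_size alone: request that many; 0 lets the server choose
        # its default (101 documents for a first batch).
        if limit:
            return min(limit, batch_size) if batch_size else limit
        return batch_size

    def getmore_ntoreturn(limit, batch_size, retrieved):
        # On a getmore, the remaining limit caps the requested batch size.
        if limit:
            remaining = limit - retrieved
            return min(remaining, batch_size) if batch_size else remaining
        return batch_size

The new "await_data" option is bit 5 of the OP_QUERY flags, so a tailable query that awaits data sends 2 | 32 == 34; test_await_data below asserts exactly that value.
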
+ """ + if batch_size is None: + batch_size = self.__batch_size + if self.__limit: + limit = self.__limit - self.__retrieved + if batch_size: + limit = min(limit, batch_size) + else: + limit = batch_size + + connection = self.__pool.connection() + try: + connection.send_message( + message.get_more( + self.full_collection_name, + limit, + self.__id), + callback=functools.partial(self._handle_response, orig_callback=callback)) + except Exception, e: + logging.error('Error in get_more %s' % e) + connection.close() + raise def _handle_response(self, result, error=None, orig_callback=None): - if result and result.get('cursor_id'): - connection = self.__pool.connection() - try: - connection.send_message( - message.kill_cursors([result['cursor_id']]), - callback=None) - except Exception, e: - logging.error('Error killing cursor %s: %s' % (result['cursor_id'], e)) - connection.close() - raise - + if result: + self.__retrieved += result.get('number_returned', 0) + if self.__id and result.get('cursor_id'): + assert self.__id == result.get('cursor_id') + else: + self.__id = result.get('cursor_id') + + if self.__retrieved >= self.__limit > 0: + self.kill() + + if result and 0 == result.get('cursor_id'): + self.__killed = True + if error: logging.error('%s %s' % (self.full_collection_name , error)) orig_callback(None, error=error) @@ -409,12 +490,13 @@ def _handle_response(self, result, error=None, orig_callback=None): else: orig_callback(result['data'], error=None) - def __query_options(self): """Get the query options string to use for this query.""" options = 0 if self.__tailable: options |= _QUERY_OPTIONS["tailable_cursor"] + if self.__await_data: + options |= _QUERY_OPTIONS["await_data"] if self.__slave_okay or self.__pool._slave_okay: options |= _QUERY_OPTIONS["slave_okay"] if not self.__timeout: @@ -437,5 +519,33 @@ def __query_spec(self): if self.__max_scan: spec["$maxScan"] = self.__max_scan return spec - - + + @property + def alive(self): + """Does this cursor have the potential to return more data? + + This is mostly useful with `tailable cursors + `_ + since they will stop iterating even though they *may* return more + results in the future. + + .. versionadded:: 1.2 + """ + return not self.__killed + + def kill(self): + if self.alive and self.__id: + self.__killed = True + logging.debug('killing cursor %s', self.__id) + connection = self.__pool.connection() + try: + connection.send_message( + message.kill_cursors([self.__id]), + callback=None) + except Exception, e: + logging.error('Error killing cursor %s: %s' % (self.__id, e)) + connection.close() + raise + + def __del__(self): + self.kill() diff --git a/asyncmongo/helpers.py b/asyncmongo/helpers.py index 4fc834c..3b16a65 100644 --- a/asyncmongo/helpers.py +++ b/asyncmongo/helpers.py @@ -22,11 +22,7 @@ def _unpack_response(response, cursor_id=None, as_class=dict, tz_aware=False): """ response_flag = struct.unpack("> sys.stderr, ( + 'test.capped_coll exists and is not a capped collection,\n' + 'please drop the collection and start this example app again.' 
[... the beginning of the new file test/sample_tailing_app/app.py was lost in extraction, and the named group in the capped_coll route below was stripped; its surviving tail follows ...]
+        print >> sys.stderr, (
+            'test.capped_coll exists and is not a capped collection,\n'
+            'please drop the collection and start this example app again.'
+        )
+        sys.exit(1)
+
+    tornado.options.parse_command_line()
+    application = tornado.web.Application([
+        # jQuery.ajax() with cache=false adds a random int to the end of the
+        # URL like ?_=1234; ignore it
+        (r'/capped_coll/(?P[^?]+)', TailingHandler),
+        (r'/document', DocumentHandler),
+        (r'/clear', ClearCollectionHandler),
+        (r'/(.*)', tornado.web.StaticFileHandler, {'path': 'index.html'})
+    ])
+
+    async_db = asyncmongo.Client(pool_id='test',
+        host='127.0.0.1',
+        port=27017,
+        mincached=5,
+        maxcached=15,
+        maxconnections=30,
+        dbname='test'
+    )
+
+    application.listen(8888)
+    print 'Listening on port 8888'
+    tornado.ioloop.IOLoop.instance().start()
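
Note: the handler classes named in the routes above were lost with the top of app.py. As a purely hypothetical reconstruction -- the class name TailingHandler comes from the routes, but this body is a guess at how such a handler would use the new cursor API:

    import tornado.web

    class TailingHandler(tornado.web.RequestHandler):
        @tornado.web.asynchronous
        def get(self, channel):  # 'channel' is assumed; the route's group name was stripped
            # Tail the capped collection; await_data makes the server hold
            # the getmore open briefly instead of returning immediately.
            self.cursor = async_db.capped_coll.find(
                {}, tailable=True, await_data=True, callback=self.on_response)

        def on_response(self, response, error):
            if error:
                raise tornado.web.HTTPError(500)
            self.write(str(response))
            self.finish()
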
diff --git a/test/sample_tailing_app/index.html b/test/sample_tailing_app/index.html
new file mode 100644
index 0000000..16a55e5
--- /dev/null
+++ b/test/sample_tailing_app/index.html
@@ -0,0 +1,127 @@
[... the 127-line HTML/jQuery page was lost when its markup was stripped; what survives is the page title, "AsyncMongo Tailable Cursor Example", and the status line "Tailing test.capped_coll:" ...]
\ No newline at end of file
diff --git a/test/test_getmore.py b/test/test_getmore.py
new file mode 100644
index 0000000..3d02fa2
--- /dev/null
+++ b/test/test_getmore.py
@@ -0,0 +1,368 @@
+import time
+
+import tornado.ioloop
+
+import test_shunt
+import eventually
+import asyncmongo
+
+class Batches:
+    """
+    Bookkeeping - record the batches of data we receive from the server so we
+    can assert things about them.
+    """
+    def __init__(self, sizes):
+        # Batch sizes to request from server
+        self.sizes = sizes
+
+        # Current batch
+        self.index = 0
+
+        # Batches of data received from server
+        self.batches = []
+        self.batch_timestamps = []
+
+    def next_batch_size(self):
+        if self.index < len(self.sizes):
+            return self.sizes[self.index]
+        else:
+            return 0
+
+    def append(self, batch):
+        self.batches.append(batch)
+        self.batch_timestamps.append(time.time())
+
+    def sizes_received(self):
+        return [len(batch) for batch in self.batches]
+
+    def total_received(self):
+        return sum(self.sizes_received())
+
+class GetmoreTest(
+    test_shunt.MongoTest,
+    test_shunt.SynchronousMongoTest,
+    eventually.AssertEventuallyTest
+):
+    """
+    Test the GETMORE message:
+    http://www.mongodb.org/display/DOCS/Mongo+Wire+Protocol#MongoWireProtocol-OPGETMORE
+    """
+    n_documents = 300
+
+    def setUp(self):
+        super(GetmoreTest, self).setUp()
+        self.pymongo_conn.test.drop_collection('foo')
+
+        # Fun fact: if there's no index, and we call find() with a sort option,
+        # then Mongo will have to do a full sort in memory, and since it's
+        # loaded all the results into memory anyway, it will ignore the default
+        # batch size of 101 results and just send us everything in one batch.
+        self.pymongo_conn.test.foo.ensure_index([('i', asyncmongo.ASCENDING)])
+        self.pymongo_conn.test.foo.insert([{'i': i} for i in xrange(self.n_documents)])
+
+        self.db = asyncmongo.Client(
+            pool_id='test_query', host='127.0.0.1',
+            port=int(self.mongod_options[0][1]), dbname='test', mincached=3
+        )
+
+    def _test_cursor(self, sizes, expected_sizes, limit):
+        """
+        Test that we can do a cursor.get_more() with various batch sizes.
+        @param sizes: List of ints, the batch sizes to request in order
+        @param expected_sizes: List of ints, the batch sizes we expect to receive
+        @param limit: Int, limit for initial query
+        """
+        batches = Batches(sizes)
+
+        def callback(result, error):
+            self.assertEqual(None, error, str(error))
+            batches.append(result)
+
+            if cursor.alive:
+                # GETMORE
+                cursor.get_more(callback, batch_size=batches.next_batch_size())
+                batches.index += 1
+
+        # Initial QUERY
+        kwargs = dict(
+            spec={},
+            sort=[('i', asyncmongo.ASCENDING)],
+            fields={'_id': False},
+            callback=callback,
+            batch_size=batches.next_batch_size(),
+        )
+
+        if limit is not None:
+            kwargs['limit'] = limit
+
+        cursor = self.db.foo.find(**kwargs)
+
+        batches.index += 1
+
+        self.assertNotEqual(None, cursor,
+            "find() should return a Cursor instance"
+        )
+
+        self.assertEventuallyEqual(
+            expected_sizes,
+            batches.sizes_received, # A method that's called periodically
+        )
+
+        before = self.get_open_cursors()
+
+        # This will complete once all the assertEventually calls complete.
+        tornado.ioloop.IOLoop.instance().start()
+
+        # check cursors
+        after = self.get_open_cursors()
+        self.assertEqual(
+            before, after,
+            "%d cursors left open (should be 0)" % (after - before),
+        )
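
Note: Batches is plain bookkeeping, and _test_cursor() drives one initial find() plus repeated get_more() calls, comparing the batch sizes actually received against expectations. A quick usage sketch of the Batches contract (not part of the suite):

    batches = Batches(sizes=[20, 17])
    batches.append([{} for _ in range(20)])
    batches.index += 1
    batches.append([{} for _ in range(17)])
    batches.index += 1
    assert batches.sizes_received() == [20, 17]
    assert batches.total_received() == 37
    assert batches.next_batch_size() == 0  # past the end of sizes: 0 means "send the rest"
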
+    def test_no_batch_size(self):
+        # With no batch_size specified, the server chooses the default first
+        # batch of 101 documents; the get_more() that follows requests
+        # batch_size 0, meaning "send me the rest".
+        self._test_cursor(
+            sizes=[],
+            expected_sizes=[101, self.n_documents - 101],
+            limit=None,
+        )
+
+    def test_batch_sizes(self):
+        # Arrange the query into a series of batches of peculiar sizes. The
+        # final batch will have batch_size 0, meaning "send me the rest".
+        sizes = [20, 17, 102, 1, 2, 4]
+        self._test_cursor(
+            sizes=sizes,
+            expected_sizes=sizes + [self.n_documents - sum(sizes)],
+            limit=None,
+        )
+
+    def test_big_limit(self):
+        # Don't set a batch size, only a limit
+        self._test_cursor(
+            sizes=[],
+            limit=1000,
+            expected_sizes=[self.n_documents],
+        )
+
+    def test_medium_limit(self):
+        self._test_cursor(
+            sizes=[],
+            limit=150,
+            expected_sizes=[150],
+        )
+
+    def test_limit_and_batch_sizes(self):
+        # Send a series of batch sizes and *also* a limit of 75 total records.
+        self._test_cursor(
+            sizes=[50, 1, 80, 7, 150],
+            limit=75,
+            expected_sizes=[50, 1, 24],
+        )
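
Note on test_limit_and_batch_sizes: after the first two batches arrive (50 + 1 = 51 documents), the remaining limit is 75 - 51 = 24, which caps the requested batch size of 80. The cursor is killed once retrieved >= limit, so the trailing sizes 7 and 150 are never requested and the expected batches are [50, 1, 24].
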
+    def test_tailable(self):
+        self.pymongo_conn.test.drop_collection('capped_coll')
+        self.pymongo_conn.test.create_collection(
+            'capped_coll',
+            size=10*1024**2, # 10 MB
+            capped=True
+        )
+
+        self.pymongo_conn.test.capped_coll.insert(
+            [{} for i in range(self.n_documents)],
+            safe=True
+        )
+
+        before = self.get_open_cursors()
+
+        def callback(result, error):
+            tornado.ioloop.IOLoop.instance().stop()
+#            print >> sys.stderr, 'got RESULTS:', len(result)
+            self.assertEqual(None, error, str(error))
+            self.assertEqual(expected_size, len(result))
+
+        # Initial QUERY
+        cursor = self.db.capped_coll.find(
+            spec={},
+            fields={'_id': False},
+            tailable=True,
+            callback=callback
+        )
+
+        # Default first batch size: 101 docs
+        expected_size = 101
+
+        self.assertNotEqual(None, cursor,
+            "find() should return a Cursor instance"
+        )
+
+        # Get first batch
+        tornado.ioloop.IOLoop.instance().start()
+
+        # One open cursor
+        self.assert_(cursor.alive)
+        self.assertEqual(1, self.get_open_cursors() - before)
+
+        # Get second batch -- everything remaining right now
+        expected_size = self.n_documents - 101
+        cursor.get_more(callback=callback)
+        tornado.ioloop.IOLoop.instance().start()
+
+        # Still one open cursor
+        self.assert_(cursor.alive)
+        self.assertEqual(1, self.get_open_cursors() - before)
+
+        # Add to the capped collection
+        expected_size = n_new_docs = 50
+        self.pymongo_conn.test.capped_coll.insert(
+            [{} for i in range(n_new_docs)], safe=True
+        )
+
+        # Let the cursor tail the next set of docs
+        cursor.get_more(callback=callback)
+        tornado.ioloop.IOLoop.instance().start()
+
+        # Once more, add to the capped collection
+        expected_size = n_new_docs = 250
+        self.pymongo_conn.test.capped_coll.insert(
+            [{} for i in range(n_new_docs)], safe=True
+        )
+
+        # Let the cursor tail the next set of docs
+        cursor.get_more(callback=callback)
+        tornado.ioloop.IOLoop.instance().start()
+
+        # check cursors -- dereferencing the cursor here should delete it
+        # server-side
+        self.assertEqual(1, self.get_open_cursors() - before,
+            "Should have 1 open cursor before deleting our reference to it"
+        )
+
+        del cursor
+        after = self.get_open_cursors()
+        self.assertEqual(
+            before, after,
+            "%d cursors left open (should be 0)" % (after - before),
+        )
+
+    def test_empty_capped_collection(self):
+        # This might not be the behavior you expect from Mongo, but a tailable
+        # cursor on an empty capped collection dies immediately. Verify it acts
+        # this way in asyncmongo.
+        self.pymongo_conn.test.drop_collection('capped_coll')
+        self.pymongo_conn.test.create_collection(
+            'capped_coll',
+            size=10*1024**2, # 10 MB
+            capped=True
+        )
+
+        before = self.get_open_cursors()
+
+        def callback(result, error):
+            tornado.ioloop.IOLoop.instance().stop()
+            self.assertEqual(None, error, str(error))
+            # Empty result
+            self.assertEqual([], result)
+
+        # Initial QUERY
+        cursor = self.db.capped_coll.find(
+            spec={},
+            fields={'_id': False},
+            tailable=True,
+            await_data=True, # THIS IS WHAT WE'RE TESTING
+            callback=callback
+        )
+
+        tornado.ioloop.IOLoop.instance().start()
+        self.assertFalse(cursor.alive,
+            "Tailable cursor on empty capped collection should die"
+        )
+
+        after = self.get_open_cursors()
+        self.assertEqual(
+            before, after,
+            "%d cursors left open (should be 0)" % (after - before),
+        )
+
+    def test_await_data(self):
+        self.pymongo_conn.test.drop_collection('capped_coll')
+        self.pymongo_conn.test.create_collection(
+            'capped_coll',
+            size=10*1024**2, # 10 MB
+            capped=True
+        )
+
+        # 10 documents
+        self.pymongo_conn.test.capped_coll.insert(
+            [{} for i in range(10)],
+            safe=True
+        )
+
+        before = self.get_open_cursors()
+
+        batches = Batches([])
+
+        def callback(result, error):
+            tornado.ioloop.IOLoop.instance().stop()
+#            print >> sys.stderr, 'got RESULTS:', len(result)
+            self.assertEqual(None, error, str(error))
+            batches.append(result)
+
+        # Initial QUERY
+        cursor = self.db.capped_coll.find(
+            spec={},
+            fields={'_id': False},
+            tailable=True,
+            await_data=True, # THIS IS WHAT WE'RE TESTING
+            callback=callback
+        )
+
+        self.assertEqual(
+            (1 << 1) | (1 << 5), # tailable & await_data
+            cursor._Cursor__query_options()
+        )
+
+        self.assertNotEqual(None, cursor,
+            "find() should return a Cursor instance"
+        )
+
+        # Get first batch
+        tornado.ioloop.IOLoop.instance().start()
+
+        self.assertEqual(1, len(batches.batches))
+        self.assertEqual(10, len(batches.batches[0]))
+
+        # One open cursor
+        self.assert_(cursor.alive, "Cursor should still be alive")
+        self.assertEqual(1, self.get_open_cursors() - before)
+
+        # Get more again -- there's nothing to get, but await_data should make
+        # us pause a second or two before returning nothing
+        cursor.get_more(callback=callback)
+        start_time = time.time()
+        tornado.ioloop.IOLoop.instance().start()
+
+        self.assertEqual(2, len(batches.batches))
+        self.assertEqual(0, len(batches.batches[1]))
+        self.assertGreater(
+            time.time() - start_time,
+            1,
+            "With await_data = True, cursor should wait at least a second"
+            " before returning nothing"
+        )
+
+        # Still one open cursor
+        self.assert_(cursor.alive)
+        self.assertEqual(1, self.get_open_cursors() - before)
+
+        del cursor
+        after = self.get_open_cursors()
+        self.assertEqual(
+            before, after,
+            "%d cursors left open (should be 0)" % (after - before),
+        )
+
+if __name__ == '__main__':
+    import unittest
+    unittest.main()
diff --git a/test/test_shunt.py b/test/test_shunt.py
index 4584474..09b10df 100644
--- a/test/test_shunt.py
+++ b/test/test_shunt.py
@@ -42,6 +42,8 @@ class MongoTest(unittest.TestCase):
     mongod_options = [('--port', str(27017))]
     def setUp(self):
         """setup method that starts up mongod instances using `self.mongo_options`"""
+        super(MongoTest, self).setUp()
+
         # So any function that calls IOLoop.instance() gets the
         # PuritanicalIOLoop instead of a default loop.
         if not tornado.ioloop.IOLoop.initialized():
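
Note: get_open_cursors() is provided by test_shunt.SynchronousMongoTest. As an assumption about how such a helper can be written (the real one may differ), a hedged sketch using the blocking pymongo connection:

    def get_open_cursors(self):
        # serverStatus reports how many cursors the server currently holds open.
        status = self.pymongo_conn.admin.command('serverStatus')
        return status['cursors']['totalOpen']
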