diff --git a/.gitignore b/.gitignore index 3afb01e..a557db8 100644 --- a/.gitignore +++ b/.gitignore @@ -1,3 +1,7 @@ +*/.#* +*egg-info *.pyc *~ -example/db.sqlite \ No newline at end of file +build/* +dist/* +example/db.sqlite diff --git a/README.rst b/README.rst index e2ac7dc..7562871 100644 --- a/README.rst +++ b/README.rst @@ -8,16 +8,16 @@ to run Django management commands from the admin. Dependencies ============ - - django-async - - django-sneak + - django-async (https://pypi.python.org/pypi/django-async) + - django-sneak (https://github.com/liberation/django-sneak) Settings ======== -You need to activate the Django admin in the settings and ``urls.py`` +You need to activate the Django admin in the settings and ``urls.py`` depending on your needs the configuration may vary, refer -to the Django documentation related to the +to the Django documentation related to the `admin application `_. Don't forget to add the application where you defined management @@ -46,8 +46,8 @@ Then you will have to create a configuration class for the command:: # ./music/admincommands.py - from admincommands.models import AdminCommand - + from django import forms + from admincommand.models import AdminCommand class Lyrics(AdminCommand): @@ -57,17 +57,17 @@ Then you will have to create a configuration class for the command:: def get_command_arguments(self, forms_data): return [forms_data['title']], {} -And all is well, the new admin command will be available under the +If all is well, the new admin command will be available under the «Admin Command» area of the administration of the default admin site. -If you use custom admin site, don't forget to register +If you use a customized admin site (e.g. no autodiscover), then don't forget to register ``admincommand.models.AdminCommand`` to the admin site object. Asynchronous tasks ================== -If you want to execute commands asynchronously you have to -specify it in the AdminCommand configuration class with the +If you want to execute commands asynchronously you have to +specify it in the AdminCommand configuration class with the ``asynchronous`` property set to ``True``:: # ./music/admincommands.py @@ -91,7 +91,7 @@ You also need to run periodically ``flush_queue`` from ``django-async`` applicat Permissions =========== -You MUST add to every user or groups that should have access to the list of commands -«Can change admincommand» permission. Every admin command gets it's own permission +You MUST add to every user or groups that should have access to the list of commands +«Can change admincommand» permission. Every admin command gets it's own permission «Can Run AnAdminCommand», so you can add it to proper users or group. Users will only see and be able to execute admin commands for which they have the permission. diff --git a/admincommand/admin.py b/admincommand/admin.py index 5d124a7..30095b2 100644 --- a/admincommand/admin.py +++ b/admincommand/admin.py @@ -4,12 +4,17 @@ from django.contrib.admin.options import csrf_protect_m from django.http import HttpResponse, HttpResponseRedirect from django.shortcuts import redirect -from django.conf.urls.defaults import url +try: + from django.conf.urls.defaults import url, patterns +except ImportError: + from django.conf.urls import url, patterns from django.core.urlresolvers import reverse from django.http import HttpResponseBadRequest -from django.conf.urls.defaults import patterns from django.utils.encoding import force_unicode -from django.utils.functional import update_wrapper +try: + from django.utils.functional import update_wrapper +except: + from functools import update_wrapper from django.http import HttpResponseForbidden from django.utils.safestring import mark_safe from django.contrib import messages @@ -27,7 +32,7 @@ class AdminCommandAdmin(SneakAdmin): def queryset(self, request): # user current user to construct the queryset - # so that only commands the user can execute + # so that only commands the user can execute # will be visible return CommandQuerySet(request.user) diff --git a/admincommand/async/__init__.py b/admincommand/async/__init__.py new file mode 100644 index 0000000..7c10a92 --- /dev/null +++ b/admincommand/async/__init__.py @@ -0,0 +1,13 @@ +""" + Django Async implementation. +""" + + +def schedule(*a, **kw): # pragma: no cover + """Wrapper for async.schedule.schedule that allow coverage. + """ + # Redefining name 'schedule' from outer scope + # pylint: disable=W0621 + from async.api import schedule + return schedule(*a, **kw) + diff --git a/admincommand/async/admin.py b/admincommand/async/admin.py new file mode 100644 index 0000000..05e2159 --- /dev/null +++ b/admincommand/async/admin.py @@ -0,0 +1,48 @@ +""" + Django admin. +""" +from django.contrib import admin +from async.models import Error, Job, Group + + +class ErrorInline(admin.TabularInline): + """Display the errors as part of the Job screen. + """ + model = Error + fields = ['traceback'] + readonly_fields = ['traceback'] + max_num = 0 + + +def display_group(obj): + return obj.group.reference if obj.group else None + +display_group.short_description = 'Group' + + +class JobAdmin(admin.ModelAdmin): + """Allow us to manipulate jobs. + """ + + list_display = ['__unicode__', 'scheduled', 'executed', display_group] + inlines = [ErrorInline] + + +class GroupAdmin(admin.ModelAdmin): + """Allow us to see groups. + """ + + def executed_jobs(obj): + return obj.jobs.filter(executed__isnull=False).count() + executed_jobs.short_description = 'Executed' + + def unexecuted_jobs(obj): + return obj.jobs.filter(executed__isnull=True).count() + unexecuted_jobs.short_description = 'Unexecuted' + + list_display = ['__unicode__', 'description', executed_jobs, unexecuted_jobs] + +admin.site.register(Error) +admin.site.register(Job, JobAdmin) +admin.site.register(Group, GroupAdmin) + diff --git a/admincommand/async/api.py b/admincommand/async/api.py new file mode 100644 index 0000000..2ac71ee --- /dev/null +++ b/admincommand/async/api.py @@ -0,0 +1,107 @@ +""" + Schedule the execution of an async task. +""" +from datetime import timedelta +# No name 'sha1' in module 'hashlib' +# pylint: disable=E0611 +from hashlib import sha1 +from simplejson import dumps + +from django.db.models import Q +try: + # No name 'timezone' in module 'django.utils' + # pylint: disable=E0611 + from django.utils import timezone +except ImportError: # pragma: no cover + from datetime import datetime as timezone + +from async.models import Error, Job, Group +from async.utils import full_name + + +def _get_now(): + """Get today datetime, testing purpose. + """ + return timezone.now() + + +def schedule(function, args=None, kwargs=None, + priority=5, run_after=None, group=None, meta=None): + """Schedule a tast for execution. + """ + # Too many arguments + # pylint: disable=R0913 + if group: + if type(group) == Group: + expected_group = group + else: + expected_group = Group.latest_group_by_reference(group) + else: + expected_group = None + job = Job( + name=full_name(function), + args=dumps(args or []), kwargs=dumps(kwargs or {}), + meta=dumps(meta or {}), scheduled=run_after, + priority=priority, + group=expected_group) + job.save() + return job + + +def deschedule(function, args=None, kwargs=None): + """Remove any instances of the job from the queue. + """ + job = Job( + name=full_name(function), + args=dumps(args or []), kwargs=dumps(kwargs or {})) + mark_cancelled = Job.objects.filter(executed=None, + identity=sha1(unicode(job)).hexdigest()) + mark_cancelled.update(cancelled=_get_now()) + + +def health(): + """Return information about the health of the queue in a format that + can be turned into JSON. + """ + output = {'queue': {}, 'errors': {}} + output['queue']['all-jobs'] = Job.objects.all().count() + output['queue']['not-executed'] = Job.objects.filter(executed=None).count() + output['queue']['executed'] = Job.objects.exclude(executed=None).count() + output['errors']['number'] = Error.objects.all().count() + return output + + +def remove_old_jobs(remove_jobs_before_days=30, resched_hours=8): + """Remove old jobs start from these conditions + + Removal date for jobs is `remove_jobs_before_days` days earlier + than when this is executed. + + It will delete jobs and groups that meet the following: + - Jobs execute before the removal date and which are not in any group. + - Groups (and their jobs) where all jobs have executed before the removal + date. + """ + start_remove_jobs_before_dt = _get_now() - timedelta( + days=remove_jobs_before_days) + + # Jobs not in a group that are old enough to delete + rm_job = (Q(executed__isnull=False) & + Q(executed__lt=start_remove_jobs_before_dt)) | \ + (Q(cancelled__isnull=False) & + Q(cancelled__lt=start_remove_jobs_before_dt)) + Job.objects.filter(Q(group__isnull=True), rm_job).delete() + + # Groups with all executed jobs -- look for groups that qualify + groups = Group.objects.filter(Q(jobs__executed__isnull=False) | + Q(jobs__cancelled__isnull=False)) + for group in groups.iterator(): + if group.jobs.filter(rm_job).count() == group.jobs.all().count(): + group.jobs.filter(rm_job).delete() + group.delete() + + next_exec = _get_now() + timedelta(hours=resched_hours) + + schedule(remove_old_jobs, + args=[remove_jobs_before_days, resched_hours], + run_after=next_exec) diff --git a/admincommand/async/logger.py b/admincommand/async/logger.py new file mode 100644 index 0000000..8cc0e75 --- /dev/null +++ b/admincommand/async/logger.py @@ -0,0 +1,6 @@ +""" + Django Async logger. +""" +#pylint: disable=unused-import +import logging as _logger + diff --git a/admincommand/async/management/__init__.py b/admincommand/async/management/__init__.py new file mode 100644 index 0000000..c51c289 --- /dev/null +++ b/admincommand/async/management/__init__.py @@ -0,0 +1,3 @@ +""" + Django Async. +""" diff --git a/admincommand/async/management/commands/__init__.py b/admincommand/async/management/commands/__init__.py new file mode 100644 index 0000000..2d1c799 --- /dev/null +++ b/admincommand/async/management/commands/__init__.py @@ -0,0 +1,3 @@ +""" + Django Async management commands. +""" diff --git a/admincommand/async/management/commands/flush_queue.py b/admincommand/async/management/commands/flush_queue.py new file mode 100644 index 0000000..be834a4 --- /dev/null +++ b/admincommand/async/management/commands/flush_queue.py @@ -0,0 +1,112 @@ +""" + Django Async management commands. +""" +from django.core.management.base import BaseCommand +try: + # No name 'timezone' in module 'django.utils' + # pylint: disable=E0611 + from django.utils import timezone +except ImportError: # pragma: no cover + from datetime import datetime as timezone +from optparse import make_option +from lockfile import FileLock, AlreadyLocked + +from async.models import Job + + +def acquire_lock(lockname): + """Return a decorator for the given lock name + """ + def decorator(handler): + """Decorator for lock acquisition + """ + def handle(*args): + """Acquire the lock before running the method. + """ + lock = FileLock(lockname) + try: + lock.acquire(timeout=-1) + except AlreadyLocked: # pragma: no cover + print 'Lock is already set, aborting.' + return + try: + handler(*args) + finally: + lock.release() + return handle + return decorator + + +def run_queue(which, outof, limit): + """ + Code that actually executes the jobs in the queue. + + This implementation is pretty ugly, but does behave in the + right way. + """ + for _ in xrange(limit): + now = timezone.now() + def run(jobs): + """Run the jobs handed to it + """ + for job in jobs.iterator(): + if job.id % outof == which % outof: + if (job.group and job.group.final and + job.group.final.pk == job.pk): + if not job.group.has_completed(job): + continue + print "%s: %s" % (job.id, unicode(job)) + job.execute() + return False + return True + by_priority = by_priority_filter = (Job.objects + .filter(executed=None, cancelled=None) + .exclude(scheduled__gt=now) + .order_by('-priority')) + while True: + try: + priority = by_priority[0].priority + except IndexError: + print "No jobs to execute" + return + if run(Job.objects + .filter(executed=None, cancelled=None, + scheduled__lte=now, priority=priority) + .order_by('scheduled', 'id')): + if run(Job.objects + .filter(executed=None, cancelled=None, + scheduled=None, priority=priority) + .order_by('id')): + by_priority = by_priority_filter.filter( + priority__lt=priority) + else: + break + else: + break + + +class Command(BaseCommand): + """ + Invoke using: + python manage.py flush_queue + """ + option_list = BaseCommand.option_list + ( + make_option('--jobs', '-j', dest='jobs', + help='The maximum number of jobs to run'), + make_option('--which', '-w', dest='which', + help='The worker ID number'), + make_option('--outof', '-o', dest='outof', + help='How many workers there are'), + ) + help = 'Does a single pass over the asynchronous queue' + + def handle(self, **options): + """ + Command implementation. + """ + jobs_limit = int(options.get('jobs') or 300) + which = int(options.get('which') or 0) + outof = int(options.get('outof') or 1) + + acquire_lock('async_flush_queue%s' % which)( + run_queue)(which, outof, jobs_limit) diff --git a/admincommand/async/management/commands/queue_health.py b/admincommand/async/management/commands/queue_health.py new file mode 100644 index 0000000..e1d0a26 --- /dev/null +++ b/admincommand/async/management/commands/queue_health.py @@ -0,0 +1,21 @@ +""" + Django manage.py command to show the queue health +""" +from django.core.management.base import BaseCommand +from simplejson import dumps + +from async.api import health + + +class Command(BaseCommand): + """ + Invoke using: + python manage.py queue_health + """ + help = 'Prints information about the queue in JSON format.' + + def handle(self, **options): + """Command implementation. + """ + print dumps(health()) + diff --git a/admincommand/async/migrations/0001_initial.py b/admincommand/async/migrations/0001_initial.py new file mode 100644 index 0000000..7ad60e8 --- /dev/null +++ b/admincommand/async/migrations/0001_initial.py @@ -0,0 +1,74 @@ +# -*- coding: utf-8 -*- +from south.utils import datetime_utils as datetime +from south.db import db +from south.v2 import SchemaMigration +from django.db import models + + +class Migration(SchemaMigration): + + def forwards(self, orm): + # Adding model 'Job' + db.create_table('async_job', ( + ('id', self.gf('django.db.models.fields.AutoField')(primary_key=True)), + ('name', self.gf('django.db.models.fields.CharField')(max_length=100)), + ('args', self.gf('django.db.models.fields.TextField')()), + ('kwargs', self.gf('django.db.models.fields.TextField')()), + ('meta', self.gf('django.db.models.fields.TextField')()), + ('result', self.gf('django.db.models.fields.TextField')(blank=True)), + ('priority', self.gf('django.db.models.fields.IntegerField')()), + ('identity', self.gf('django.db.models.fields.CharField')(max_length=100, db_index=True)), + ('added', self.gf('django.db.models.fields.DateTimeField')(auto_now_add=True, blank=True)), + ('scheduled', self.gf('django.db.models.fields.DateTimeField')(null=True, blank=True)), + ('started', self.gf('django.db.models.fields.DateTimeField')(null=True, blank=True)), + ('executed', self.gf('django.db.models.fields.DateTimeField')(null=True, blank=True)), + )) + db.send_create_signal('async', ['Job']) + + # Adding model 'Error' + db.create_table('async_error', ( + ('id', self.gf('django.db.models.fields.AutoField')(primary_key=True)), + ('job', self.gf('django.db.models.fields.related.ForeignKey')(related_name='errors', to=orm['async.Job'])), + ('executed', self.gf('django.db.models.fields.DateTimeField')(auto_now_add=True, blank=True)), + ('exception', self.gf('django.db.models.fields.TextField')()), + ('traceback', self.gf('django.db.models.fields.TextField')()), + )) + db.send_create_signal('async', ['Error']) + + + def backwards(self, orm): + # Deleting model 'Job' + db.delete_table('async_job') + + # Deleting model 'Error' + db.delete_table('async_error') + + + models = { + 'async.error': { + 'Meta': {'object_name': 'Error'}, + 'exception': ('django.db.models.fields.TextField', [], {}), + 'executed': ('django.db.models.fields.DateTimeField', [], {'auto_now_add': 'True', 'blank': 'True'}), + 'id': ('django.db.models.fields.AutoField', [], {'primary_key': 'True'}), + 'job': ('django.db.models.fields.related.ForeignKey', [], {'related_name': "'errors'", 'to': "orm['async.Job']"}), + 'traceback': ('django.db.models.fields.TextField', [], {}) + }, + 'async.job': { + 'Meta': {'object_name': 'Job'}, + 'added': ('django.db.models.fields.DateTimeField', [], {'auto_now_add': 'True', 'blank': 'True'}), + 'args': ('django.db.models.fields.TextField', [], {}), + 'executed': ('django.db.models.fields.DateTimeField', [], {'null': 'True', 'blank': 'True'}), + 'id': ('django.db.models.fields.AutoField', [], {'primary_key': 'True'}), + 'identity': ('django.db.models.fields.CharField', [], {'max_length': '100', 'db_index': 'True'}), + 'kwargs': ('django.db.models.fields.TextField', [], {}), + 'meta': ('django.db.models.fields.TextField', [], {}), + 'name': ('django.db.models.fields.CharField', [], {'max_length': '100'}), + 'priority': ('django.db.models.fields.IntegerField', [], {}), + 'result': ('django.db.models.fields.TextField', [], {'blank': 'True'}), + 'scheduled': ('django.db.models.fields.DateTimeField', [], {'null': 'True', 'blank': 'True'}), + 'started': ('django.db.models.fields.DateTimeField', [], {'null': 'True', 'blank': 'True'}) + } + } + + complete_apps = ['async'] + diff --git a/admincommand/async/migrations/0002_auto__add_group__add_field_job_group.py b/admincommand/async/migrations/0002_auto__add_group__add_field_job_group.py new file mode 100644 index 0000000..e26dcdd --- /dev/null +++ b/admincommand/async/migrations/0002_auto__add_group__add_field_job_group.py @@ -0,0 +1,68 @@ +# -*- coding: utf-8 -*- +from south.utils import datetime_utils as datetime +from south.db import db +from south.v2 import SchemaMigration +from django.db import models + + +class Migration(SchemaMigration): + + def forwards(self, orm): + # Adding model 'Group' + db.create_table('async_group', ( + ('id', self.gf('django.db.models.fields.AutoField')(primary_key=True)), + ('reference', self.gf('django.db.models.fields.CharField')(max_length=100)), + ('description', self.gf('django.db.models.fields.TextField')(null=True, blank=True)), + ('created', self.gf('django.db.models.fields.DateTimeField')(auto_now_add=True, blank=True)), + )) + db.send_create_signal('async', ['Group']) + + # Adding field 'Job.group' + db.add_column('async_job', 'group', + self.gf('django.db.models.fields.related.ForeignKey')(blank=True, related_name='jobs', null=True, to=orm['async.Group']), + keep_default=False) + + + def backwards(self, orm): + # Deleting model 'Group' + db.delete_table('async_group') + + # Deleting field 'Job.group' + db.delete_column('async_job', 'group_id') + + + models = { + 'async.error': { + 'Meta': {'object_name': 'Error'}, + 'exception': ('django.db.models.fields.TextField', [], {}), + 'executed': ('django.db.models.fields.DateTimeField', [], {'auto_now_add': 'True', 'blank': 'True'}), + 'id': ('django.db.models.fields.AutoField', [], {'primary_key': 'True'}), + 'job': ('django.db.models.fields.related.ForeignKey', [], {'related_name': "'errors'", 'to': "orm['async.Job']"}), + 'traceback': ('django.db.models.fields.TextField', [], {}) + }, + 'async.group': { + 'Meta': {'object_name': 'Group'}, + 'created': ('django.db.models.fields.DateTimeField', [], {'auto_now_add': 'True', 'blank': 'True'}), + 'description': ('django.db.models.fields.TextField', [], {'null': 'True', 'blank': 'True'}), + 'id': ('django.db.models.fields.AutoField', [], {'primary_key': 'True'}), + 'reference': ('django.db.models.fields.CharField', [], {'max_length': '100'}) + }, + 'async.job': { + 'Meta': {'object_name': 'Job'}, + 'added': ('django.db.models.fields.DateTimeField', [], {'auto_now_add': 'True', 'blank': 'True'}), + 'args': ('django.db.models.fields.TextField', [], {}), + 'executed': ('django.db.models.fields.DateTimeField', [], {'null': 'True', 'blank': 'True'}), + 'group': ('django.db.models.fields.related.ForeignKey', [], {'blank': 'True', 'related_name': "'jobs'", 'null': 'True', 'to': "orm['async.Group']"}), + 'id': ('django.db.models.fields.AutoField', [], {'primary_key': 'True'}), + 'identity': ('django.db.models.fields.CharField', [], {'max_length': '100', 'db_index': 'True'}), + 'kwargs': ('django.db.models.fields.TextField', [], {}), + 'meta': ('django.db.models.fields.TextField', [], {}), + 'name': ('django.db.models.fields.CharField', [], {'max_length': '100'}), + 'priority': ('django.db.models.fields.IntegerField', [], {}), + 'result': ('django.db.models.fields.TextField', [], {'blank': 'True'}), + 'scheduled': ('django.db.models.fields.DateTimeField', [], {'null': 'True', 'blank': 'True'}), + 'started': ('django.db.models.fields.DateTimeField', [], {'null': 'True', 'blank': 'True'}) + } + } + + complete_apps = ['async'] diff --git a/admincommand/async/migrations/0003_auto__add_field_job_cancelled.py b/admincommand/async/migrations/0003_auto__add_field_job_cancelled.py new file mode 100644 index 0000000..2f7825b --- /dev/null +++ b/admincommand/async/migrations/0003_auto__add_field_job_cancelled.py @@ -0,0 +1,57 @@ +# -*- coding: utf-8 -*- +from south.utils import datetime_utils as datetime +from south.db import db +from south.v2 import SchemaMigration +from django.db import models + + +class Migration(SchemaMigration): + + def forwards(self, orm): + # Adding field 'Job.cancelled' + db.add_column('async_job', 'cancelled', + self.gf('django.db.models.fields.DateTimeField')(null=True, blank=True), + keep_default=False) + + + def backwards(self, orm): + # Deleting field 'Job.cancelled' + db.delete_column('async_job', 'cancelled') + + + models = { + 'async.error': { + 'Meta': {'object_name': 'Error'}, + 'exception': ('django.db.models.fields.TextField', [], {}), + 'executed': ('django.db.models.fields.DateTimeField', [], {'auto_now_add': 'True', 'blank': 'True'}), + 'id': ('django.db.models.fields.AutoField', [], {'primary_key': 'True'}), + 'job': ('django.db.models.fields.related.ForeignKey', [], {'related_name': "'errors'", 'to': "orm['async.Job']"}), + 'traceback': ('django.db.models.fields.TextField', [], {}) + }, + 'async.group': { + 'Meta': {'object_name': 'Group'}, + 'created': ('django.db.models.fields.DateTimeField', [], {'auto_now_add': 'True', 'blank': 'True'}), + 'description': ('django.db.models.fields.TextField', [], {'null': 'True', 'blank': 'True'}), + 'id': ('django.db.models.fields.AutoField', [], {'primary_key': 'True'}), + 'reference': ('django.db.models.fields.CharField', [], {'max_length': '100'}) + }, + 'async.job': { + 'Meta': {'object_name': 'Job'}, + 'added': ('django.db.models.fields.DateTimeField', [], {'auto_now_add': 'True', 'blank': 'True'}), + 'args': ('django.db.models.fields.TextField', [], {}), + 'cancelled': ('django.db.models.fields.DateTimeField', [], {'null': 'True', 'blank': 'True'}), + 'executed': ('django.db.models.fields.DateTimeField', [], {'null': 'True', 'blank': 'True'}), + 'group': ('django.db.models.fields.related.ForeignKey', [], {'blank': 'True', 'related_name': "'jobs'", 'null': 'True', 'to': "orm['async.Group']"}), + 'id': ('django.db.models.fields.AutoField', [], {'primary_key': 'True'}), + 'identity': ('django.db.models.fields.CharField', [], {'max_length': '100', 'db_index': 'True'}), + 'kwargs': ('django.db.models.fields.TextField', [], {}), + 'meta': ('django.db.models.fields.TextField', [], {}), + 'name': ('django.db.models.fields.CharField', [], {'max_length': '100'}), + 'priority': ('django.db.models.fields.IntegerField', [], {}), + 'result': ('django.db.models.fields.TextField', [], {'blank': 'True'}), + 'scheduled': ('django.db.models.fields.DateTimeField', [], {'null': 'True', 'blank': 'True'}), + 'started': ('django.db.models.fields.DateTimeField', [], {'null': 'True', 'blank': 'True'}) + } + } + + complete_apps = ['async'] \ No newline at end of file diff --git a/admincommand/async/migrations/0004_auto__add_field_group_final.py b/admincommand/async/migrations/0004_auto__add_field_group_final.py new file mode 100644 index 0000000..ad64cb9 --- /dev/null +++ b/admincommand/async/migrations/0004_auto__add_field_group_final.py @@ -0,0 +1,58 @@ +# -*- coding: utf-8 -*- +from south.utils import datetime_utils as datetime +from south.db import db +from south.v2 import SchemaMigration +from django.db import models + + +class Migration(SchemaMigration): + + def forwards(self, orm): + # Adding field 'Group.final' + db.add_column('async_group', 'final', + self.gf('django.db.models.fields.related.ForeignKey')(blank=True, related_name='ends', null=True, to=orm['async.Job']), + keep_default=False) + + + def backwards(self, orm): + # Deleting field 'Group.final' + db.delete_column('async_group', 'final_id') + + + models = { + 'async.error': { + 'Meta': {'object_name': 'Error'}, + 'exception': ('django.db.models.fields.TextField', [], {}), + 'executed': ('django.db.models.fields.DateTimeField', [], {'auto_now_add': 'True', 'blank': 'True'}), + 'id': ('django.db.models.fields.AutoField', [], {'primary_key': 'True'}), + 'job': ('django.db.models.fields.related.ForeignKey', [], {'related_name': "'errors'", 'to': "orm['async.Job']"}), + 'traceback': ('django.db.models.fields.TextField', [], {}) + }, + 'async.group': { + 'Meta': {'object_name': 'Group'}, + 'created': ('django.db.models.fields.DateTimeField', [], {'auto_now_add': 'True', 'blank': 'True'}), + 'description': ('django.db.models.fields.TextField', [], {'null': 'True', 'blank': 'True'}), + 'final': ('django.db.models.fields.related.ForeignKey', [], {'blank': 'True', 'related_name': "'ends'", 'null': 'True', 'to': "orm['async.Job']"}), + 'id': ('django.db.models.fields.AutoField', [], {'primary_key': 'True'}), + 'reference': ('django.db.models.fields.CharField', [], {'max_length': '100'}) + }, + 'async.job': { + 'Meta': {'object_name': 'Job'}, + 'added': ('django.db.models.fields.DateTimeField', [], {'auto_now_add': 'True', 'blank': 'True'}), + 'args': ('django.db.models.fields.TextField', [], {}), + 'cancelled': ('django.db.models.fields.DateTimeField', [], {'null': 'True', 'blank': 'True'}), + 'executed': ('django.db.models.fields.DateTimeField', [], {'null': 'True', 'blank': 'True'}), + 'group': ('django.db.models.fields.related.ForeignKey', [], {'blank': 'True', 'related_name': "'jobs'", 'null': 'True', 'to': "orm['async.Group']"}), + 'id': ('django.db.models.fields.AutoField', [], {'primary_key': 'True'}), + 'identity': ('django.db.models.fields.CharField', [], {'max_length': '100', 'db_index': 'True'}), + 'kwargs': ('django.db.models.fields.TextField', [], {}), + 'meta': ('django.db.models.fields.TextField', [], {}), + 'name': ('django.db.models.fields.CharField', [], {'max_length': '100'}), + 'priority': ('django.db.models.fields.IntegerField', [], {}), + 'result': ('django.db.models.fields.TextField', [], {'blank': 'True'}), + 'scheduled': ('django.db.models.fields.DateTimeField', [], {'null': 'True', 'blank': 'True'}), + 'started': ('django.db.models.fields.DateTimeField', [], {'null': 'True', 'blank': 'True'}) + } + } + + complete_apps = ['async'] \ No newline at end of file diff --git a/admincommand/async/migrations/0005_indexes.py b/admincommand/async/migrations/0005_indexes.py new file mode 100644 index 0000000..5b0ff06 --- /dev/null +++ b/admincommand/async/migrations/0005_indexes.py @@ -0,0 +1,54 @@ +# -*- coding: utf-8 -*- +from south.utils import datetime_utils as datetime +from south.db import db +from south.v2 import SchemaMigration +from django.db import models + + +class Migration(SchemaMigration): + + def forwards(self, orm): + db.create_index('async_job', ['priority', 'id']) + db.create_index('async_job', ['scheduled', 'executed', 'cancelled']) + db.create_index('async_job', ['group_id', 'executed', 'cancelled']) + + def backwards(self, orm): + pass + + models = { + 'async.error': { + 'Meta': {'object_name': 'Error'}, + 'exception': ('django.db.models.fields.TextField', [], {}), + 'executed': ('django.db.models.fields.DateTimeField', [], {'auto_now_add': 'True', 'blank': 'True'}), + 'id': ('django.db.models.fields.AutoField', [], {'primary_key': 'True'}), + 'job': ('django.db.models.fields.related.ForeignKey', [], {'related_name': "'errors'", 'to': "orm['async.Job']"}), + 'traceback': ('django.db.models.fields.TextField', [], {}) + }, + 'async.group': { + 'Meta': {'object_name': 'Group'}, + 'created': ('django.db.models.fields.DateTimeField', [], {'auto_now_add': 'True', 'blank': 'True'}), + 'description': ('django.db.models.fields.TextField', [], {'null': 'True', 'blank': 'True'}), + 'final': ('django.db.models.fields.related.ForeignKey', [], {'blank': 'True', 'related_name': "'ends'", 'null': 'True', 'to': "orm['async.Job']"}), + 'id': ('django.db.models.fields.AutoField', [], {'primary_key': 'True'}), + 'reference': ('django.db.models.fields.CharField', [], {'max_length': '100'}) + }, + 'async.job': { + 'Meta': {'object_name': 'Job'}, + 'added': ('django.db.models.fields.DateTimeField', [], {'auto_now_add': 'True', 'blank': 'True'}), + 'args': ('django.db.models.fields.TextField', [], {}), + 'cancelled': ('django.db.models.fields.DateTimeField', [], {'null': 'True', 'blank': 'True'}), + 'executed': ('django.db.models.fields.DateTimeField', [], {'null': 'True', 'blank': 'True'}), + 'group': ('django.db.models.fields.related.ForeignKey', [], {'blank': 'True', 'related_name': "'jobs'", 'null': 'True', 'to': "orm['async.Group']"}), + 'id': ('django.db.models.fields.AutoField', [], {'primary_key': 'True'}), + 'identity': ('django.db.models.fields.CharField', [], {'max_length': '100', 'db_index': 'True'}), + 'kwargs': ('django.db.models.fields.TextField', [], {}), + 'meta': ('django.db.models.fields.TextField', [], {}), + 'name': ('django.db.models.fields.CharField', [], {'max_length': '100'}), + 'priority': ('django.db.models.fields.IntegerField', [], {}), + 'result': ('django.db.models.fields.TextField', [], {'blank': 'True'}), + 'scheduled': ('django.db.models.fields.DateTimeField', [], {'null': 'True', 'blank': 'True'}), + 'started': ('django.db.models.fields.DateTimeField', [], {'null': 'True', 'blank': 'True'}) + } + } + + complete_apps = ['async'] \ No newline at end of file diff --git a/admincommand/async/migrations/__init__.py b/admincommand/async/migrations/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/admincommand/async/models.py b/admincommand/async/models.py new file mode 100644 index 0000000..7886b23 --- /dev/null +++ b/admincommand/async/models.py @@ -0,0 +1,226 @@ +""" + Django Async models. +""" +from datetime import timedelta +from django.db import models, transaction +from django.db.models import Count, Q +try: + # No name 'timezone' in module 'django.utils' + # pylint: disable=E0611 + from django.utils import timezone +except ImportError: # pragma: no cover + from datetime import datetime as timezone +# No name 'sha1' in module 'hashlib' +# pylint: disable=E0611 +from hashlib import sha1 +from simplejson import dumps, loads +from traceback import format_exc + +from async.logger import _logger +from async.utils import object_at_end_of_path, non_unicode_kwarg_keys + +from django.core.exceptions import ValidationError + + +class Group(models.Model): + """ + A group for jobs that need to be executed. + """ + reference = models.CharField(max_length=100) + description = models.TextField(blank=True, null=True) + created = models.DateTimeField(auto_now_add=True) + final = models.ForeignKey('Job', blank=True, null=True, + related_name='ends') + + def __unicode__(self): + return u'%s' % self.reference + + def save(self, *args, **kwargs): + # We can't create a new group with that reference + # if the old group still has jobs that haven't executed. + if Job.objects.filter(group__reference=self.reference).filter( + Q(executed__isnull=True) & Q(cancelled__isnull=True) + ).exclude(group__id=self.id).count() > 0: + raise ValidationError( + "Group reference [%s] still has unexecuted jobs." % + self.reference) + result = super(Group, self).save(*args, **kwargs) + if self.final and self.final.group != self: + self.final.group = self + self.final.save() + return result + + def on_completion(self, job): + """Set a job to be the one that executes when the other jobs + in the group have completed. + """ + self.final = job + self.save() + + def estimate_execution_duration(self): + """Estimate of the total amount of time (in seconds) that the group + will take to execute. + """ + result = self.jobs.aggregate( + job_count=Count('id'), executed_job_count=Count('executed'), + cancelled_job_count=Count('cancelled')) + total_jobs = result['job_count'] + total_executed_jobs = result['executed_job_count'] + total_cancelled_jobs = result['cancelled_job_count'] + total_done = total_executed_jobs + total_cancelled_jobs + if total_jobs > 0: + # Don't allow to calculate if executed jobs are not valid. + if total_done == 0: + return None, None, None + elif not self.has_completed(): + # Some jobs are unexecuted. + time_consumed = timezone.now() - self.created + estimated_time = timedelta(seconds=( + time_consumed.seconds/float(total_done)) + * total_jobs) + remaining = estimated_time - time_consumed + else: + # All jobs in group are executed. + estimated_time = ( + self.latest_executed_job().executed - self.created) + time_consumed = estimated_time + remaining = timedelta(seconds=0) + return estimated_time, remaining, time_consumed + else: + return None, None, None + + def latest_executed_job(self): + """When the last executed job in the group was completed. + """ + if self.jobs.filter(executed__isnull=False).count(): + return self.jobs.filter(executed__isnull=False).latest('executed') + + def has_completed(self, exclude=None): + """Return True if all jobs are either executed or cancelled. + """ + job_query = self.jobs.all() + if exclude: + job_query = job_query.exclude(pk=exclude.pk) + return (self.jobs.all().count() > 0 and + job_query.filter( + Q(executed__isnull=True) & Q(cancelled__isnull=True) + ).count() == 0) + + @staticmethod + def latest_group_by_reference(reference): + """ + Fetch the latest group with the requested reference. It will + create a new group if necessary. + """ + try: + group = Group.objects.filter( + reference=reference).latest('created') + if group.has_completed(): + # The found group is either fully executed or cancelled + # so make a new one + group = Group.objects.create( + reference=reference, description=group.description) + except Group.DoesNotExist: + group = Group.objects.create(reference=reference) + return group + + +class Job(models.Model): + """ + An asynchronous task that is to be executed. + """ + name = models.CharField(max_length=100, blank=False) + args = models.TextField() + kwargs = models.TextField() + meta = models.TextField() + result = models.TextField(blank=True) + + priority = models.IntegerField() + identity = models.CharField(max_length=100, blank=False, db_index=True) + + added = models.DateTimeField(auto_now_add=True) + scheduled = models.DateTimeField(null=True, blank=True, + help_text="If not set, will be executed ASAP") + started = models.DateTimeField(null=True, blank=True) + executed = models.DateTimeField(null=True, blank=True) + cancelled = models.DateTimeField(null=True, blank=True) + + group = models.ForeignKey(Group, related_name='jobs', + null=True, blank=True) + + def __unicode__(self): + # __unicode__: Instance of 'bool' has no 'items' member + # pylint: disable=E1103 + args = ', '.join([repr(s) for s in loads(self.args)] + + ['%s=%s' % (k, repr(v)) for k, v in loads(self.kwargs).items()]) + return u'%s(%s)' % (self.name, args) + + def save(self, *a, **kw): + # Stop us from cheating by adding the new jobs to the old group. + # Checking if group obj got passed and current job is not in that group + if self.group and self not in self.group.jobs.all(): + # Cannot add current job to latest group that have an executed job. + if self.group.jobs.filter( + Q(executed__isnull=False) | Q(cancelled__isnull=False) + ).count() > 0: + raise ValidationError( + "Cannot add job [%s] to group [%s] because this group " + "has executed jobs." % + (self.name, self.group.reference)) + self.identity = sha1(unicode(self)).hexdigest() + return super(Job, self).save(*a, **kw) + + def execute(self, **_meta): + """ + Run the job using the specified meta values to control the + execution. + """ + try: + _logger.info("%s %s", self.id, unicode(self)) + args = loads(self.args) + kwargs = non_unicode_kwarg_keys(loads(self.kwargs)) + function = object_at_end_of_path(self.name) + _logger.debug(u"%s resolved to %s" % (self.name, function)) + def execute(): + """Execute the database updates in one transaction. + """ + self.started = timezone.now() + result = function(*args, **kwargs) + self.executed = timezone.now() + self.result = dumps(result) + self.save() + return result + return transaction.commit_on_success(execute)() + except Exception, exception: + self.started = None + errors = 1 + self.errors.count() + self.scheduled = (timezone.now() + + timedelta(seconds=60 * pow(errors, 1.6))) + self.priority = self.priority - 1 + _logger.error( + "Job failed. Rescheduled for %s after %s error(s). " + "New priority is %s", + self.scheduled, errors, self.priority) + def record(): + """Local function allows us to wrap these updates into a + transaction. + """ + Error.objects.create(job=self, exception=repr(exception), + traceback=format_exc()) + self.save() + transaction.commit_on_success(record)() + raise + + +class Error(models.Model): + """ + Recorded when an error happens during execution of a job. + """ + job = models.ForeignKey(Job, related_name='errors') + executed = models.DateTimeField(auto_now_add=True) + exception = models.TextField() + traceback = models.TextField() + + def __unicode__(self): + return u'%s : %s' % (self.executed, self.exception) + diff --git a/admincommand/async/slumber_operations.py b/admincommand/async/slumber_operations.py new file mode 100644 index 0000000..ccb5a9c --- /dev/null +++ b/admincommand/async/slumber_operations.py @@ -0,0 +1,97 @@ +""" + Implementation of the Slumber operations. +""" +from django.shortcuts import Http404 +from django.db.models import Count +from slumber.operations import ModelOperation, InstanceOperation +from slumber.server.http import require_user, require_permission +from urllib import quote + +from async import schedule +from async.models import Group + + +# Method could be a function +# pylint: disable=R0201 + + +class Schedule(ModelOperation): + """Schedule a job via Slumber + """ + def get(self, request, response, app, model): + """Eventually this will return the parameters that are needed. + """ + pass + + @require_permission('async.add_job') + def post(self, request, response, _app, _model): + """Wrap the API. + """ + if hasattr(request.POST, 'getlist'): + args = request.POST.getlist('args') + else: + args = request.POST.get('args', []) + job = schedule(request.POST['name'], + args=args, + kwargs=request.POST.get('kwargs', {}), + meta=request.POST.get('meta', {}), + run_after=request.POST.get('run_after', None), + priority=request.POST.get('priority', 5), + group=request.POST.get('group', None)) + response['job'] = dict(id=job.id) + + +class Progress(InstanceOperation): + """Return information about the progress of a job. + """ + def __call__(self, group): + """Generate the operation URL using the reference instead + of the primary key. + """ + # pylint: disable=arguments-differ + part = quote(group.reference.encode('utf-8'), '') + return super(Progress, self).__call__() + part + '/' + + @require_user + def get(self, _request, response, _app, _models, group_reference_name): + """The current progress and estimated completion time of the job. + """ + groups = Group.objects.filter(reference=group_reference_name) + if groups: + latest_group = groups.latest('created') + result = latest_group.jobs.aggregate( + job_count=Count('id'), executed_job_count=Count('executed')) + total_jobs = result['job_count'] + total_executed_jobs = result['executed_job_count'] + if total_jobs > 0: + total_unexecuted_jobs = total_jobs - total_executed_jobs + + _, remaining, consumed = \ + latest_group.estimate_execution_duration() + latest = latest_group.latest_executed_job() + response['progress'] = { + 'id': latest_group.id, + 'reference': latest_group.reference, + 'created': latest_group.created.isoformat(), + 'latest_job_completed': + latest.executed.isoformat() if latest else None, + 'total_jobs': total_jobs, + 'total_executed_jobs': total_executed_jobs, + 'total_unexecuted_jobs': total_unexecuted_jobs, + 'total_error_jobs': + latest_group.jobs.filter( + errors__isnull=False).distinct().count(), + 'current_errors': + latest_group.jobs.filter( + errors__isnull=False, executed__isnull=True, + cancelled__isnull=True + ).distinct().count(), + 'consumed_seconds': + consumed.seconds if consumed else None, + 'remaining_seconds': + remaining.seconds if remaining else 0 + } + else: + raise Http404("Cannot find group with reference [%s]." % + group_reference_name) + diff --git a/admincommand/async/slumber_server.py b/admincommand/async/slumber_server.py new file mode 100644 index 0000000..c4269f7 --- /dev/null +++ b/admincommand/async/slumber_server.py @@ -0,0 +1,17 @@ +""" + Async Slumber server configuration +""" +from slumber import configure + +from async.models import Job, Group +from async.slumber_operations import Schedule, Progress + + +configure(Job, operations_extra=[ + (Schedule, 'schedule') +]) + +configure(Group, operations_extra=[ + (Progress, 'progress'), +]) + diff --git a/admincommand/async/tests/__init__.py b/admincommand/async/tests/__init__.py new file mode 100644 index 0000000..7d2e1fd --- /dev/null +++ b/admincommand/async/tests/__init__.py @@ -0,0 +1,3 @@ +""" + All tests for Django Async. +""" diff --git a/admincommand/async/tests/test_admin.py b/admincommand/async/tests/test_admin.py new file mode 100644 index 0000000..d85101a --- /dev/null +++ b/admincommand/async/tests/test_admin.py @@ -0,0 +1,24 @@ +# -*- coding: utf-8 -*- + +from django.test import TestCase +import async +from async.admin import display_group +from async.models import Group + + +class TestJobAdmin(TestCase): + def do_nothing(self): + return True + + def test_display_group__have_group(self): + group = Group.latest_group_by_reference('nothing') + job = async.schedule('async.tests.test_admin.TestJobAdmin.do_nothing', group=group) + + group_to_be_displayed = display_group(job) + self.assertEqual(group_to_be_displayed, group.reference) + + def test_display_group__do_not_have_group(self): + job = async.schedule('async.tests.test_admin.TestJobAdmin.do_nothing') + + group = display_group(job) + self.assertEqual(group, None) diff --git a/admincommand/async/tests/test_api.py b/admincommand/async/tests/test_api.py new file mode 100644 index 0000000..990a99b --- /dev/null +++ b/admincommand/async/tests/test_api.py @@ -0,0 +1,350 @@ +""" + Test for apis.. +""" +from django.test import TestCase +from django.core import management +try: + # No name 'timezone' in module 'django.utils' + # pylint: disable=E0611 + from django.utils import timezone +except ImportError: + from datetime import datetime as timezone + +import datetime + +from async import api +from async.models import Job, Group, Error +from mock import patch, Mock + + +def get_now(): + return timezone.now() + + +def get_d_before_dt_by_days(base_dt, d): + return base_dt - datetime.timedelta(days=d) + + +class TestRemoveOldJobs(TestCase): + """Tests removing old jobs. + """ + @staticmethod + def create_job(jid, group=None): + job = api.schedule('job-%s' % jid, group=group) + Error.objects.create(job=job, exception='Error', traceback='code stack') + return job + + @patch('async.api._get_now') + def test_job_reschedule_duration(self, mock_get_today_dt): + """Test if next schedule with run in 8 hrs + """ + + today_dt = timezone.now() + mock_get_today_dt.return_value = today_dt + job_name = 'async.api.remove_old_jobs' + + # Run commands in func remove_old_jobs + # when it's done will create new job with same name + # and new schedule equal current + 8 + api.remove_old_jobs(2) + + self.assertEqual(Job.objects.filter(name=job_name).count(), 1) + expected_job = Job.objects.get(name=job_name) + self.assertIsNotNone(expected_job.scheduled) + + # Check scheduled time of new job instance (which same name) + # is more than current time = 8.0. + diff = (expected_job.scheduled - today_dt).total_seconds()/3600. + self.assertTrue(diff == 8.0) + + @patch('async.api._get_now') + def test_job_reschedule(self, mock_get_today_dt): + """Test deal with only remove_old_jobs job + - it should remove old remove_old_jobs job + - it should get new reschedule remove_old_jobs job + """ + + today_dt = timezone.now() + mock_get_today_dt.return_value = today_dt + job_name = 'async.api.remove_old_jobs' + + # Run commands in func remove_old_jobs + # when it's done will create new job with same name + # and new schedule equal current + 8 + api.remove_old_jobs(2) + + self.assertEqual(Job.objects.filter(name=job_name).count(), 1) + + # Force first job to execute (no scheduled set it's run immediately). + expected_job = Job.objects.get(name=job_name) + expected_job.scheduled = None + expected_job.save() + management.call_command('flush_queue') + + self.assertEqual(Job.objects.all().count(), 2) + self.assertEqual(Job.objects.filter(name=job_name).count(), 2) + + # Force first scheduled job to executed + # (no scheduled set it's run immediately). + latest_job = Job.objects.filter(name=job_name).latest('id') + latest_job.scheduled = None + latest_job.save() + management.call_command('flush_queue') + + # Force latest job to execute (no scheduled set it's run immediately). + latest_job = Job.objects.filter(name=job_name).latest('id') + latest_job.scheduled = None + latest_job.save() + + # Get ids from job that gonna be remove + # these ids will be check after latest job was executed. + jobs_must_gone_ids = [] + for j in Job.objects.filter(name=job_name): + if j.executed is not None: + j.executed = j.executed - datetime.timedelta(days=5) + j.save() + jobs_must_gone_ids.append(j.id) + management.call_command('flush_queue') + + # Current jobs should be valid only + # - latest remove_old_jobs that does not execute yet -> 1 + # - latest remove_old_jobs that was executed -> 1 + not_expected_result = filter( + lambda x: x in jobs_must_gone_ids, + Job.objects.filter(name=job_name).values_list('id', flat=True) + ) + self.assertEqual(len(not_expected_result), 0) + self.assertEqual(Job.objects.filter(name=job_name).count(), 2) + self.assertEqual( + Job.objects.filter(name=job_name, executed__isnull=False).count(), + 1) + self.assertEqual( + Job.objects.filter(name=job_name, executed__isnull=True).count(), 1) + + @patch('async.api._get_now') + def test_remove_jobs(self, mock_get_today_dt): + """ job name job-0 must be removed after flush_queue run. + """ + mock_get_today_dt.return_value = ( + get_now() - datetime.timedelta(days=60)) + + j1, j2, j3 = map(self.create_job, range(3)) + + j1.executed = get_d_before_dt_by_days( + mock_get_today_dt.return_value, 30) + j1.save() + j2.executed = get_d_before_dt_by_days( + mock_get_today_dt.return_value, 15) + j2.save() + j3.executed = get_d_before_dt_by_days( + mock_get_today_dt.return_value, 15) + j3.save() + + api.remove_old_jobs(20) + + self.assertEqual( + Job.objects.filter(name__in=['job-1', 'job-2']).count(), 2) + + mock_get_today_dt.return_value = get_now() + management.call_command('flush_queue') + + self.assertEqual( + Job.objects.filter(name__in=['job-1', 'job-2']).count(), 0) + + @patch('async.api._get_now') + def test_remove_old_job_with_no_param_sent(self, mock_get_today_dt): + """Test in case of no parameter sent to remove_old_jobs + """ + test_base_dt = get_now() - datetime.timedelta(days=60) + mock_get_today_dt.return_value = test_base_dt + + j1, j2 = map(self.create_job, range(2)) + j1.executed = test_base_dt - datetime.timedelta(days=31) + j1.save() + + j2.executed = test_base_dt - datetime.timedelta(days=20) + j2.save() + + self.assertEqual(Error.objects.all().count(), 2) + + api.remove_old_jobs() + + # Should get remove_old_jobs for next round and j2 + # j1 should gone now + + self.assertEqual(Job.objects.all().count(), 2) + self.assertIsNotNone(Job.objects.filter(name=j2.name)) + self.assertEqual(Error.objects.all().count(), 1, Error.objects.all()) + + def test_get_now(self): + result = api._get_now() + self.assertIsNotNone(result) + self.assertTrue(isinstance(result, datetime.datetime)) + + def test_groups__with_unexecuted_are_not_removed(self): + test_base_dt = get_now() + group = Group.objects.create(reference='no-rm-group') + job = self.create_job('no-rm-group', group) + + api.remove_old_jobs() + + self.assertEqual(Job.objects.all().count(), 2, Job.objects.all()) + self.assertEqual(Group.objects.all().count(), 1, Group.objects.all()) + self.assertEqual(Error.objects.all().count(), 1, Error.objects.all()) + + def test_groups_with_executed_job_are_removed(self): + test_base_dt = get_now() + group = Group.objects.create(reference='rm-group') + job = self.create_job('rm-group', group) + job.executed = test_base_dt - datetime.timedelta(days=31) + job.save() + + api.remove_old_jobs() + + self.assertEqual(Job.objects.all().count(), 1, Job.objects.all()) + self.assertEqual(Job.objects.all()[0].name, 'async.api.remove_old_jobs') + self.assertEqual(Group.objects.all().count(), 0, Group.objects.all()) + self.assertEqual(Error.objects.all().count(), 0, Error.objects.all()) + + def test_groups__with_young_jobs_are_not_removed(self): + test_base_dt = get_now() + group = Group.objects.create(reference='not_rm-young-group') + job = self.create_job('not_rm-young-group', group) + job.executed = test_base_dt - datetime.timedelta(days=16) + job.save() + + api.remove_old_jobs() + + self.assertEqual(Job.objects.all().count(), 2, Job.objects.all()) + self.assertEqual(Group.objects.all().count(), 1, Group.objects.all()) + self.assertEqual(Error.objects.all().count(), 1, Error.objects.all()) + + def test_groups__with_young_and_old_jobs_are_not_removed(self): + test_base_dt = get_now() + group = Group.objects.create(reference='not_rm-mixed-group') + job1 = self.create_job('not_rm-mixed-group', group) + job2 = self.create_job('not_rm-mixed-group', group) + + job1.executed = test_base_dt - datetime.timedelta(days=45) + job1.save() + job2.executed = test_base_dt - datetime.timedelta(days=16) + job2.save() + + api.remove_old_jobs() + + self.assertEqual(Job.objects.all().count(), 3, Job.objects.all()) + self.assertEqual(Group.objects.all().count(), 1, Group.objects.all()) + self.assertEqual(Error.objects.all().count(), 2, Error.objects.all()) + + @patch('async.api._get_now') + def test_remove__cancelled_jobs(self, mock_get_today_dt): + """ job name job-0 must be removed after flush_queue run. + """ + mock_get_today_dt.return_value = ( + get_now() - datetime.timedelta(days=60)) + + j1, j2, j3 = map(self.create_job, range(3)) + + j1.cancelled = get_d_before_dt_by_days( + mock_get_today_dt.return_value, 30) + j1.save() + j2.cancelled = get_d_before_dt_by_days( + mock_get_today_dt.return_value, 15) + j2.save() + j3.cancelled = get_d_before_dt_by_days( + mock_get_today_dt.return_value, 15) + j3.save() + + api.remove_old_jobs(20) + + self.assertEqual( + Job.objects.filter(name__in=['job-1', 'job-2']).count(), 2) + + mock_get_today_dt.return_value = get_now() + management.call_command('flush_queue') + + self.assertEqual( + Job.objects.filter(name__in=['job-1', 'job-2']).count(), 0) + + @patch('async.api._get_now') + def test_remove__mixing_jobs(self, mock_get_today_dt): + """ job name job-0 must be removed after flush_queue run. + """ + mock_get_today_dt.return_value = ( + get_now() - datetime.timedelta(days=60)) + + j1, j2, j3, j4, j5, j6 = map(self.create_job, range(6)) + + j1.executed = get_d_before_dt_by_days( + mock_get_today_dt.return_value, 30) + j1.save() + j2.executed = get_d_before_dt_by_days( + mock_get_today_dt.return_value, 15) + j2.save() + j3.executed = get_d_before_dt_by_days( + mock_get_today_dt.return_value, 15) + j3.save() + j4.cancelled = get_d_before_dt_by_days( + mock_get_today_dt.return_value, 30) + j4.save() + j5.cancelled = get_d_before_dt_by_days( + mock_get_today_dt.return_value, 15) + j5.save() + j6.cancelled = get_d_before_dt_by_days( + mock_get_today_dt.return_value, 15) + j6.save() + + api.remove_old_jobs() + + self.assertEqual( + Job.objects.filter(name__in=['job-1', 'job-2', 'job-3', 'job-4', 'job-5', 'job-6']).count(), 5) + + mock_get_today_dt.return_value = get_now() + management.call_command('flush_queue') + + self.assertEqual( + Job.objects.filter(name__in=['job-1', 'job-2', 'job-3', 'job-4', 'job-5', 'job-6']).count(), 0) + + def test_groups__with_young_cancelled_jobs_are_not_removed(self): + test_base_dt = get_now() + group = Group.objects.create(reference='not_rm-young-group') + job = self.create_job('not_rm-young-group', group) + job.cancelled = test_base_dt - datetime.timedelta(days=16) + job.save() + + api.remove_old_jobs() + + self.assertEqual(Job.objects.all().count(), 2, Job.objects.all()) + self.assertEqual(Group.objects.all().count(), 1, Group.objects.all()) + self.assertEqual(Error.objects.all().count(), 1, Error.objects.all()) + + def test_groups_with_cancelled_job_are_removed(self): + test_base_dt = get_now() + group = Group.objects.create(reference='rm-group') + job = self.create_job('rm-group', group) + job.cancelled = test_base_dt - datetime.timedelta(days=31) + job.save() + + api.remove_old_jobs() + + self.assertEqual(Job.objects.all().count(), 1, Job.objects.all()) + self.assertEqual(Job.objects.all()[0].name, 'async.api.remove_old_jobs') + self.assertEqual(Group.objects.all().count(), 0, Group.objects.all()) + self.assertEqual(Error.objects.all().count(), 0, Error.objects.all()) + + def test_groups__with_young_and_old_cancelled_jobs_are_not_removed(self): + test_base_dt = get_now() + group = Group.objects.create(reference='not_rm-mixed-group') + job1 = self.create_job('not_rm-mixed-group', group) + job2 = self.create_job('not_rm-mixed-group', group) + + job1.cancelled = test_base_dt - datetime.timedelta(days=45) + job1.save() + job2.cancelled = test_base_dt - datetime.timedelta(days=16) + job2.save() + + api.remove_old_jobs() + + self.assertEqual(Job.objects.all().count(), 3, Job.objects.all()) + self.assertEqual(Group.objects.all().count(), 1, Group.objects.all()) + self.assertEqual(Error.objects.all().count(), 2, Error.objects.all()) diff --git a/admincommand/async/tests/test_commands.py b/admincommand/async/tests/test_commands.py new file mode 100644 index 0000000..58f2c92 --- /dev/null +++ b/admincommand/async/tests/test_commands.py @@ -0,0 +1,132 @@ +""" + Tests for the Django management commands. +""" +from django.core import management +from django.test import TestCase +try: + # No name 'timezone' in module 'django.utils' + # pylint: disable=E0611 + from django.utils import timezone as datetime +except ImportError: + from datetime import datetime +from mock import patch + +from async import schedule +from async.api import health, deschedule +from async.models import Job + + +# Using the global statement +# pylint: disable = W0603 + + +ORDER = None +def _dummy(order=None, error=None): + """Basic dummy function we can use to test the queue execution. + """ + if order: + ORDER.append(order) + if error: + raise Exception(error) + + +class TestFlushQueue(TestCase): + """Test the flush_queue management command. + """ + def test_empty_queue(self): + """Make sure we don't get any errors if the queue is empty. + """ + management.call_command('flush_queue') + self.assertEqual(Job.objects.all().count(), 0) + + def test_asap_tasks(self): + """Make sure that tasks scheduled for immediate execution + are run. + """ + schedule(_dummy) + self.assertEqual(Job.objects.filter(executed=None).count(), 1) + management.call_command('flush_queue') + self.assertEqual(Job.objects.filter(executed=None).count(), 0) + + def test_queue_fails_on_error(self): + """Make sure that the queue flushing stops on the first error. + """ + schedule(_dummy, kwargs={'error': "Error"}) + schedule(_dummy) + self.assertEqual(Job.objects.filter(executed=None).count(), 2) + with self.assertRaises(Exception): + management.call_command('flush_queue') + self.assertEqual(Job.objects.filter(executed=None).count(), 2) + management.call_command('flush_queue') + self.assertEqual(Job.objects.filter(executed=None).count(), 1) + + def test_scheduled_runs_first_when_added_first(self): + """Make sure that the scheduled job is always run first. + """ + global ORDER + ORDER = [] + schedule(_dummy, args=[1], run_after=datetime.now()) + schedule(_dummy, args=[2]) + management.call_command('flush_queue') + self.assertEqual(ORDER, [1, 2]) + + def test_scheduled_runs_first_when_added_last(self): + """Make sure that the scheduled job is always run first. + """ + global ORDER + ORDER = [] + schedule(_dummy, args=[2]) + schedule(_dummy, args=[1], run_after=datetime.now()) + management.call_command('flush_queue') + self.assertEqual(ORDER, [1, 2]) + + def test_scheduled_runs_last_when_has_higher_priority(self): + """The lowest priority scheduled job runs before the highest + priority non-scheduled job. + """ + global ORDER + ORDER = [] + schedule(_dummy, args=[1], priority=5) + schedule(_dummy, args=[2], priority=1, run_after=datetime.now()) + management.call_command('flush_queue') + self.assertEqual(ORDER, [1, 2]) + + def test_flush_queue_with_jobs_limit(self): + """Make sure that the number of job run is the same + as the input jobs limit. + """ + for _ in xrange(5): + schedule(_dummy) + management.call_command('flush_queue', jobs=2) + self.assertEqual(Job.objects.filter(executed=None).count(), 3) + + def test_flush_queue_without_jobs_limit_limit_at_300_by_default(self): + """Make sure that the number of job run by default is 300. + """ + for _ in xrange(305): + schedule(_dummy) + management.call_command('flush_queue') + self.assertEqual(Job.objects.filter(executed=None).count(), 5) + + def test_flush_queue_with_cancelled_jobs__should_not_be_executed(self): + """Make sure that the number of job run by default is 300. + """ + for _ in xrange(5): + job = schedule(_dummy) + deschedule(job.name) + management.call_command('flush_queue') + self.assertEqual(Job.objects.filter(executed=None).count(), 5) + self.assertEqual(Job.objects.filter(cancelled=None).count(), 0) + + +class TestHealth(TestCase): + """Make sure the health command runs without any errors. + """ + def test_health(self): + """Excecute command. + """ + with patch( + 'async.management.commands.queue_health.dumps', + lambda x: self.assertEqual(x, health())): + management.call_command('queue_health') + diff --git a/admincommand/async/tests/test_deschedule.py b/admincommand/async/tests/test_deschedule.py new file mode 100644 index 0000000..9b1fe2f --- /dev/null +++ b/admincommand/async/tests/test_deschedule.py @@ -0,0 +1,50 @@ +""" + Test the deschedule API. +""" +from django.test import TestCase +# No name 'sha1' in module 'hashlib' +# pylint: disable=E0611 +from hashlib import sha1 + +from async import schedule +from async.api import deschedule +from async.models import Job + + +def _example(*_a, **_kw): + """An example function that can be used for testing purposes. + """ + pass + + +class TestDeschedule(TestCase): + """Make sure that descheduling works correctly. + """ + def test_unicode_generated(self): + """Make sure that the unicode generated by a real scheduled job + and one used to create an identity match. + """ + job1 = schedule('async.tests.test_deschedule._example') + job2 = Job(name='async.tests.test_deschedule._example', + args="[]", kwargs="{}") + self.assertEqual(job1.identity, sha1(unicode(job2)).hexdigest()) + + def test_deschedule_by_name(self): + """We must be able to deschedule a job by giving its name. + """ + job = schedule('async.tests.test_deschedule._example') + self.assertEqual(job.name, 'async.tests.test_deschedule._example') + deschedule('async.tests.test_deschedule._example') + job = Job.objects.get(pk=job.pk) + self.assertIsNotNone(job.cancelled) + + def test_deschedule_by_function(self): + """We must be able to schedule a job by giving a function. + """ + job = schedule(_example) + # Different versions of Django will import this file differently + self.assertTrue(job.name.endswith( + 'async.tests.test_deschedule._example')) + deschedule(_example) + job = Job.objects.get(pk=job.pk) + self.assertIsNotNone(job.cancelled) diff --git a/admincommand/async/tests/test_execute.py b/admincommand/async/tests/test_execute.py new file mode 100644 index 0000000..0906bb1 --- /dev/null +++ b/admincommand/async/tests/test_execute.py @@ -0,0 +1,89 @@ +""" + Tests for task execution. +""" +from django.contrib.auth.models import User +from django.test import TransactionTestCase + +from async import schedule +from async.models import Job + + +# Redefining name '_EXECUTED' from outer scope +# pylint: disable = W0621 +_EXECUTED = None + +def _function(*a, **kw): + """A simple test function. + """ + # Using the global statement + # pylint: disable = W0603 + global _EXECUTED + _EXECUTED = (a, kw) + for name in a: + User(username=name).save() + assert kw.get('assert', True) + return kw.get('result', None) + + +class _class(object): + """Test class holder. + """ + @classmethod + def class_method(cls, *a, **kw): + """Class method so we can be sure these work. + """ + # Using the global statement + # pylint: disable = W0603 + global _EXECUTED + _EXECUTED = (a, kw) + + +class TestExecution(TransactionTestCase): + """Test that execution of a job works correctly in all circumstances. + """ + def setUp(self): + # Using the global statement + # pylint: disable = W0603 + global _EXECUTED + _EXECUTED = None + + def test_simple(self): + """Execute a basic function. + """ + job = schedule(_function, + args=['async-test-user'], kwargs={'result': 'something'}) + self.assertEqual(job.execute(), "something") + self.assertEqual(_EXECUTED, + (('async-test-user',), {'result': 'something'})) + self.assertEqual('"something"', job.result) + self.assertIsNotNone(job.executed) + self.assertLess(job.started, job.executed) + self.assertEqual( + User.objects.filter(username='async-test-user').count(), 1) + + def test_error_recording(self): + """Make sure that if there is an error in the function it is dealt + with properly. + """ + job = Job.objects.get( + pk=schedule(_function, + args=['async-test-user'], kwargs={'assert': False}).pk) + self.assertEqual(job.errors.count(), 0) + with self.assertRaises(AssertionError): + job.execute() + self.assertEqual(_EXECUTED, (('async-test-user',), {'assert': False})) + job = Job.objects.get(pk=job.pk) + self.assertEqual(job.errors.count(), 1) + error = job.errors.all()[0] + self.assertIn('AssertionError', error.exception) + self.assertIn('async/tests/test_execute.py', error.traceback) + self.assertIsNotNone(job.scheduled) + self.assertEqual( + User.objects.filter(username='async-test-user').count(), 0) + + def test_class_method_works(self): + """Make sure that we can execute a class method. + """ + job = schedule(_class.class_method) + job.execute() + self.assertIsNotNone(job.executed) diff --git a/admincommand/async/tests/test_flush_queue.py b/admincommand/async/tests/test_flush_queue.py new file mode 100644 index 0000000..68e6d73 --- /dev/null +++ b/admincommand/async/tests/test_flush_queue.py @@ -0,0 +1,86 @@ +""" + Tests for the flush queue management command. +""" +from django.test import TestCase +from django.core import management + +from async.api import schedule +from async.models import Job, Group + + +def do_job(): + pass + + +class TestFlushQueue(TestCase): + def setUp(self): + self.group = Group.objects.create(reference='1of2') + self.j1 = schedule(do_job, group=self.group) + self.j2 = schedule(do_job, group=self.group) + + def test_0of2(self): + management.call_command('flush_queue', which=0, outof=2) + j1 = Job.objects.get(pk=self.j1.pk) + j2 = Job.objects.get(pk=self.j2.pk) + if ( j1.pk %2 ): + self.assertIsNone(j1.executed) + self.assertIsNotNone(j2.executed) + else: + self.assertIsNotNone(j1.executed) + self.assertIsNone(j2.executed) + + def test_1of2(self): + management.call_command('flush_queue', which=1, outof=2) + j1 = Job.objects.get(pk=self.j1.pk) + j2 = Job.objects.get(pk=self.j2.pk) + if ( j1.pk %2 ): + self.assertIsNotNone(j1.executed) + self.assertIsNone(j2.executed) + else: + self.assertIsNone(j1.executed) + self.assertIsNotNone(j2.executed) + + def test_2of2(self): + management.call_command('flush_queue', which=2, outof=2) + j1 = Job.objects.get(pk=self.j1.pk) + j2 = Job.objects.get(pk=self.j2.pk) + if ( j1.pk %2 ): + self.assertIsNone(j1.executed) + self.assertIsNotNone(j2.executed) + else: + self.assertIsNotNone(j1.executed) + self.assertIsNone(j2.executed) + +class TestFinalJob(TestCase): + def test_final_when_added_last(self): + self.group = Group.objects.create(reference='final-job') + self.j1 = schedule(do_job, group=self.group) + self.j2 = schedule(do_job, group=self.group) + self.j3 = schedule(do_job) + self.group.on_completion(self.j3) + management.call_command('flush_queue') + j1 = Job.objects.get(pk=self.j1.pk) + j2 = Job.objects.get(pk=self.j2.pk) + j3 = Job.objects.get(pk=self.j3.pk) + self.assertLess(j1.executed, j2.executed) + self.assertLess(j2.executed, j3.executed) + + def test_final_when_added_first(self): + self.j1 = schedule(do_job) + self.group = Group.objects.create(reference='final-job', final=self.j1) + self.j2 = schedule(do_job, group=self.group) + self.j3 = schedule(do_job, group=self.group) + management.call_command('flush_queue') + j1 = Job.objects.get(pk=self.j1.pk) + j2 = Job.objects.get(pk=self.j2.pk) + j3 = Job.objects.get(pk=self.j3.pk) + self.assertLess(j2.executed, j3.executed) + self.assertLess(j3.executed, j1.executed) + + def test_only_has_final_job(self): + self.j1 = schedule(do_job) + self.group = Group.objects.create(reference='final-job', final=self.j1) + management.call_command('flush_queue') + j1 = Job.objects.get(pk=self.j1.pk) + self.assertIsNotNone(j1.executed) + diff --git a/admincommand/async/tests/test_models.py b/admincommand/async/tests/test_models.py new file mode 100644 index 0000000..3126701 --- /dev/null +++ b/admincommand/async/tests/test_models.py @@ -0,0 +1,266 @@ +""" + Testing that models work properly. +""" +import datetime + +from django.test import TestCase, TransactionTestCase +from django.core.exceptions import ValidationError +try: + # No name 'timezone' in module 'django.utils' + # pylint: disable=E0611 + from django.utils import timezone +except ImportError: + from datetime import datetime as timezone + +from async import schedule +from async.models import Error, Job, Group + +def _fn(*_a, **_kw): + """Test function. + """ + pass + + +class TestJob(TransactionTestCase): + """Make sure the basic model features work properly. + """ + def test_model_creation(self): + """Make sure schedule API works. + """ + job = schedule('async.tests.test_models._fn') + self.assertEqual(Job.objects.all().count(), 1) + self.assertEqual(unicode(job), "async.tests.test_models._fn()") + self.assertEqual(job.identity, + '289dbff9c1bd746fc444a20d396986857a6e8f04') + + def test_model_creation_with_no_group(self): + """Make sure schedule API works with no group. + """ + job = schedule('async.tests.test_models._fn') + self.assertEqual(Job.objects.all().count(), 1) + self.assertEqual(job.group, None) + + def test_model_creattion_with_group(self): + """make sure schedule API works with group. + """ + group = Group.objects.create( + reference='test-group', + description='for testing') + job = schedule('async.tests.test_models._fn') + job.group = group + job.save() + + self.assertEqual(Job.objects.all().count(), 1) + self.assertEqual(job.group, group) + + def test_identity(self): + """Make sure that the identity we get is the same as in another + test when given the same arguments. + """ + job = schedule('async.tests.test_models._fn') + self.assertEqual(job.identity, + '289dbff9c1bd746fc444a20d396986857a6e8f04') + + def test_unicode_with_args(self): + """Make sure unicode handling deals with args properly. + """ + self.assertEqual(unicode(schedule( + 'async.tests.test_models._fn', args=['argument'])), + "async.tests.test_models._fn('argument')") + self.assertEqual(unicode(schedule( + 'async.tests.test_models._fn', args=['a1', 'a2'])), + "async.tests.test_models._fn('a1', 'a2')") + self.assertEqual(unicode(schedule( + 'async.tests.test_models._fn', args=[1, 2])), + 'async.tests.test_models._fn(1, 2)') + self.assertEqual(unicode(schedule( + 'async.tests.test_models._fn', args=[dict(k='v', x=None)])), + "async.tests.test_models._fn({'x': None, 'k': 'v'})") + + def test_unicode_with_kwargs(self): + """Make sure unicode handling deals with kwargs properly. + """ + job = schedule('async.tests.test_models._fn', + kwargs=dict(k='v', x=None)) + self.assertEqual(unicode(job), + "async.tests.test_models._fn(x=None, k='v')") + self.assertEqual(job.identity, + '60941ebcc096c0223ba1db02b3d256f19ba553a3') + + def test_unicode_with_args_and_kwargs(self): + """Make sure unicode handling deals with kwargs properly. + """ + job = schedule('async.tests.test_models._fn', + args=['argument'], kwargs=dict(k='v', x=None)) + self.assertEqual(unicode(job), + "async.tests.test_models._fn('argument', x=None, k='v')") + self.assertEqual(job.identity, + '2ce2bb7935439a6ab3f111882f359a06b36bf995') + + +class TestError(TestCase): + """Test the Error model. + """ + + def test_unicode(self): + """Make sure the that the Unicode form of the Error works. + """ + job = schedule('async.tests.test_models._fn') + error = Error.objects.create(job=job, exception="Exception text") + self.assertTrue( + unicode(error).endswith(u' : Exception text'), unicode(error)) + + +class TestGroup(TestCase): + """Test the Group model. + """ + def setUp(self): + self.g1 = Group.objects.create( + reference='group1', + description='test group1' + ) + self.j1 = Job.objects.create( + name='job1', + args='[]', + kwargs='{}', + meta='{}', + priority=3, + ) + + self.j2 = Job.objects.create( + name='job2', + args='[]', + kwargs='{}', + meta='{}', + priority=3, + ) + + self.j3 = Job.objects.create( + name='job3', + args='[]', + kwargs='{}', + meta='{}', + priority=3, + ) + + def test_model_creation(self): + """ Test if can create model. Get new instance. + """ + group = Group.objects.create( + reference='test-group', + description='for testing' + ) + + self.assertTrue(Group.objects.all().count(), 1) + self.assertEqual(unicode(group), u'test-group') + self.assertEqual(group.description, 'for testing') + + def test_creating_group_with_duplicate_reference_and_executed_job(self): + """ Create new group with reference same as old group which has + one job and already executed. Creating should success. + """ + self.j1.group = self.g1 + self.j1.save() + self.j2.group = self.g1 + self.j2.save() + + self.j1.executed = timezone.now() + self.j1.save() + self.j2.cancelled = timezone.now() + self.j2.save() + + g2 = Group.objects.create(reference=self.g1.reference) + self.assertEqual( + Group.objects.filter(reference=self.g1.reference).count(), 2) + + def test_cancelled_jobs_allow_new_group(self): + """ Make sure that a cancelled job allows us to make a new + group with the same reference. + """ + self.j1.group = self.g1 + self.j1.cancelled = timezone.now() + self.j1.save() + + g2 = Group.objects.create(reference=self.g1.reference) + self.assertEqual( + Group.objects.filter(reference=self.g1.reference).count(), 2) + + def test_creating_group_with_duplicate_reference_and_has_one_unexecuted_job(self): + """ Create new group with reference same as old group which has + unexecuted job. Creating should not success. + """ + + # Assiging j1, j2, j3 to group1 + self.j1.group = self.g1 + self.j1.save() + self.j2.group = self.g1 + self.j2.save() + self.j3.group = self.g1 + self.j3.save() + + # Mark executed for j1, j2 + self.j1.executed = timezone.now() + self.j1.save() + self.j2.executed = timezone.now() + self.j2.save() + + with self.assertRaises(ValidationError) as e: + Group.objects.create(reference=self.g1.reference) + self.assertTrue(isinstance(e.exception, ValidationError)) + self.assertEqual(Group.objects.filter(reference=self.g1.reference).count(), 1) + + def test_adding_job_to_group_that_has_executed_job(self): + """ Add job to group which have one executed job. + """ + self.j1.group = self.g1 + self.j1.executed = timezone.now() + self.j1.save() + + with self.assertRaises(ValidationError) as e: + self.j2.group = self.g1 + self.j2.save() + self.assertTrue(isinstance(e.exception, ValidationError)) + self.assertEqual(Job.objects.filter(group=self.g1).count(), 1) + + def test_adding_job_to_group_that_has_cancelled_job(self): + """ Add job to group which have one cancelled job. + """ + self.j1.group = self.g1 + self.j1.cancelled = timezone.now() + self.j1.save() + + with self.assertRaises(ValidationError) as e: + self.j2.group = self.g1 + self.j2.save() + self.assertTrue(isinstance(e.exception, ValidationError)) + self.assertEqual(Job.objects.filter(group=self.g1).count(), 1) + + def test_adding_job_to_group_that_has_unexecuted_job(self): + """ Add jobs to group which has unexecuted job. + """ + self.j1.group = self.g1 + self.j1.save() + self.j2.group = self.g1 + self.j2.save() + + self.assertEqual(Group.objects.get(reference=self.g1.reference).jobs.count(), 2) + + def test_adding_job_to_executed_group(self): + """ Adding a new job to a fully executed group creates a new group. + """ + self.j1.group = self.g1 + self.j1.executed = timezone.now() + self.j1.save() + + job = schedule('async.tests.test_models._fn', + group=self.g1.reference) + self.assertEqual(job.group.reference, self.g1.reference) + self.assertNotEqual(job.group.pk, self.g1.pk) + + def test_schedule_passing_group_instance(self): + """ Scheduling a job passing in a group object works. + """ + job = schedule('async.tests.test_models._fn', + group=self.g1) + self.assertEqual(job.group.pk, self.g1.pk) + diff --git a/admincommand/async/tests/test_schedule.py b/admincommand/async/tests/test_schedule.py new file mode 100644 index 0000000..2baa45f --- /dev/null +++ b/admincommand/async/tests/test_schedule.py @@ -0,0 +1,37 @@ +""" + Test the schedule API. +""" +from django.test import TestCase + +from async import schedule + + +def _example(): + """An example function that can be used for testing purposes. + """ + pass + + +class TestSchedule(TestCase): + """Make sure that scheduling works correctly. + """ + def test_schedule_by_name(self): + """We must be able to schedule a job by giving its name. + """ + job = schedule('async.tests.test_schedule._example') + self.assertEqual(job.name, 'async.tests.test_schedule._example') + + def test_schedule_by_function(self): + """We must be able to schedule a job by giving a function. + """ + job = schedule(_example) + # Different versions of Django will import this file differently + self.assertTrue(job.name.endswith( + 'async.tests.test_schedule._example')) + + def test_schedule_in_group(self): + """Make sure that a group is created if it doesn't already exist. + """ + job = schedule(_example, group='test_schedule_in_group') + self.assertEqual(job.group.reference, 'test_schedule_in_group') + diff --git a/admincommand/async/tests/test_slumber_server.py b/admincommand/async/tests/test_slumber_server.py new file mode 100644 index 0000000..9f923cd --- /dev/null +++ b/admincommand/async/tests/test_slumber_server.py @@ -0,0 +1,415 @@ +""" + Tests for the Slumber operations. +""" +from datetime import timedelta +from simplejson import dumps, loads +from mock import patch + +from django.contrib.auth.models import User, Permission +from django.test import TestCase +from django.core.exceptions import ValidationError +try: + # No name 'timezone' in module 'django.utils' + # pylint: disable=E0611 + from django.utils import timezone +except ImportError: + from datetime import datetime as timezone + +from async.models import Job, Group, Error +from async.slumber_operations import Progress + + +# Instance of 'WSGIRequest' has no 'status_code' member +# (but some types could not be inferred) +# pylint: disable=E1103 + + +class TestSlumber(TestCase): + """Make sure that Slumber is wired in properly. + """ + maxDiff = None + + def test_slumber_root(self): + """Make sure Slumber is properly wired in. + """ + response = self.client.get('/slumber/') + self.assertEqual(response.status_code, 200) + json = loads(response.content) + self.assertEqual(json, dict( + services=None, + configuration={}, + apps={ + 'async': '/slumber/async/', + 'django.contrib.sites': '/slumber/django/contrib/sites/', + 'django.contrib.admin': '/slumber/django/contrib/admin/', + 'django.contrib.auth': '/slumber/django/contrib/auth/', + 'django.contrib.contenttypes': + '/slumber/django/contrib/contenttypes/', + 'django.contrib.messages': '/slumber/django/contrib/messages/', + 'django.contrib.sessions': '/slumber/django/contrib/sessions/', + 'django.contrib.staticfiles': + '/slumber/django/contrib/staticfiles/', + 'django_nose': '/slumber/django_nose/', + 'south': '/slumber/south/'}, + _meta={'message': 'OK', 'status': 200}), json) + + +class WithUser(object): + def setUp(self): + super(WithUser, self).setUp() + self.user = User.objects.create(username='test') + self.user.set_password('password') + self.user.save() + self.client.login(username='test', password='password') + self.permission = Permission.objects.get(codename='add_job') + self.user.user_permissions.add(self.permission) + + +class TestSchedule(WithUser, TestCase): + """Make sure the schedule API wrapper works. + """ + URL = '/slumber/async/Job/schedule/' + + def test_get_works(self): + """Make sure the operation is wired in. Don't expect any output yet. + """ + response = self.client.get(self.URL) + self.assertEqual(response.status_code, 200) + json = loads(response.content) + self.assertEqual(json, dict( + _meta={'message': 'OK', 'status': 200, 'username': 'test'})) + + def test_simple_post_works(self): + """Make sure the basic post functionality works. + """ + response = self.client.post(self.URL, dict( + name='test-job-1')) + self.assertEqual(response.status_code, 200) + job = Job.objects.get(name='test-job-1') + self.assertEqual(job.args, '[]') + self.assertIsNone(job.scheduled) + json = loads(response.content) + self.assertEqual(json, dict( + job=dict(id=job.id), + _meta={'message': 'OK', 'status': 200, 'username': 'test'})) + + def test_args_multipart_content(self): + """Make sure that arguments are properly processed when using a + normal POST. + """ + response = self.client.post(self.URL, dict( + name='test-job-1', args=[1, True, "Hello"])) + self.assertEqual(response.status_code, 200) + job = Job.objects.get(name='test-job-1') + self.assertEqual(job.args, dumps(["1", "True", "Hello"])) + self.assertIsNone(job.scheduled) + json = loads(response.content) + self.assertEqual(json, dict( + job=dict(id=job.id), + _meta={'message': 'OK', 'status': 200, 'username': 'test'})) + + def test_args_json(self): + """Make sure that arguments are properly processed when using a + normal POST. + """ + response = self.client.post(self.URL, dumps(dict( + name='test-job-1', args=[1, True, "Hello"])), + content_type='application/json') + self.assertEqual(response.status_code, 200) + job = Job.objects.get(name='test-job-1') + self.assertEqual(job.args, dumps([1, True, "Hello"])) + self.assertIsNone(job.scheduled) + json = loads(response.content) + self.assertEqual(json, dict( + job=dict(id=job.id), + _meta={'message': 'OK', 'status': 200, 'username': 'test'})) + + def test_run_at(self): + """Make sure that the run at time is properly handled. + """ + scheduled = timezone.now() + timedelta(days=1) + response = self.client.post(self.URL, dict( + name='test-job-1', run_after=scheduled)) + self.assertEqual(response.status_code, 200) + job = Job.objects.get(name='test-job-1') + self.assertEqual(job.args, '[]') + self.assertEqual(job.scheduled, scheduled) + json = loads(response.content) + self.assertEqual(json, dict( + job=dict(id=job.id), + _meta={'message': 'OK', 'status': 200, 'username': 'test'})) + + def test_create_job_with_group(self): + """Create job with group reference. + """ + scheduled = timezone.now() + timedelta(days=1) + group = Group.objects.create( + reference='test-group-1', + description='info') + response = self.client.post(self.URL, dict( + name='test-job-1', + run_after=scheduled, + group=group.reference)) + self.assertEqual(response.status_code, 200) + job = Job.objects.get(name='test-job-1') + self.assertEqual(job.group.reference, group.reference) + + def test_create_job_with_non_exist_group(self): + """Create job with non exist group creates one + """ + scheduled = timezone.now() + timedelta(days=1) + response = self.client.post(self.URL, dict( + name='test-job-1', + run_after=scheduled, + group='non-exist-group')) + self.assertEqual(response.status_code, 200) + json = loads(response.content) + self.assertEqual(json['_meta'], { + 'status': 200, + 'username': 'test', + 'message': 'OK'}) + job = Job.objects.get(pk=json['job']['id']) + self.assertEqual(job.group.reference, 'non-exist-group') + self.assertEqual( + Group.objects.filter(reference='non-exist-group').count(), 1) + + def test_create_job_with_multiple_group_same_reference(self): + """Create job by assiging multiple group + current job should has been assign by latest group + """ + scheduled = timezone.now() + timedelta(days=1) + g1 = Group.objects.create(reference='multiple-group') + g2 = Group.objects.create(reference='multiple-group') + g3 = Group.objects.create(reference='multiple-group') + response = self.client.post(self.URL, dict( + name='test-job-1', + run_after=scheduled, + group='multiple-group')) + self.assertEqual(response.status_code, 200) + j1 = Job.objects.get(name='test-job-1') + self.assertEqual(Group.objects.filter( + reference='multiple-group').count(), 3 + ) + self.assertEqual(j1.group.reference, g3.reference) + self.assertNotEqual(j1.group.created, g1.created) + self.assertNotEqual(j1.group.created, g2.created) + self.assertEqual(j1.group.created, g3.created) + + def test_create_job_with_group_which_has_executed_job(self): + """Create job by assigning group which already has executed job. + So it should get ValidationError. + """ + scheduled = timezone.now() + timedelta(days=1) + g1 = Group.objects.create(reference='test-group') + + j1 = Job.objects.create( + name='test-job-1', group=g1, + args='[]', kwargs='{}', meta='{}', priority=5) + j2 = Job.objects.create( + name='test-job-2', group=g1, + args='[]', kwargs='{}', meta='{}', priority=5) + j1.executed = timezone.now() - timedelta(days=30) + j1.save() + with self.assertRaises(ValidationError) as e: + response = self.client.post(self.URL, dict( + name='test-job-3', + run_after=scheduled, + group='test-group')) + + def test_group_progres_url(self): + """Make sure that references with odd characters still generate + correct progress URLs. + """ + g1 = Group.objects.create(reference="space test") + response = self.client.get('/slumber/async/Group/data/%s/' % g1.pk) + self.assertEqual(response.status_code, 200) + json = loads(response.content) + self.assertEqual(json['operations']['progress'], + '/slumber/async/Group/progress/space%20test/') + + +class TestProgress(WithUser, TestCase): + URL = '/slumber/async/Group/progress/' + + def test_get_work(self): + """Test normal get request work. + """ + group_name = 'test-ddrun' + group1 = Group.objects.create(reference=group_name) + Job.objects.create( + name='test-job1', + args='[]', + kwargs='{}', + meta='{}', + priority=3, + group=group1 + ) + Job.objects.create( + name='test-job2', + args='[]', + kwargs='{}', + meta='{}', + priority=3, + group=group1 + ) + test_url = self.URL + group_name + '/' + + response = self.client.get(test_url) + self.assertEqual(response.status_code, 200) + + json = loads(response.content) + self.assertEqual(json['_meta'], + {'message': 'OK', 'status': 200, 'username': 'test'} + ) + + json_progress = json.get('progress') + self.assertTrue(json_progress) + self.assertEqual(json_progress.get('id'), group1.id) + self.assertEqual(json_progress.get('created'), group1.created.isoformat()) + self.assertIsNone(json_progress.get('last_job_completed')) + self.assertEqual(json_progress.get('total_jobs'), group1.jobs.count()) + self.assertEqual(json_progress.get('total_executed_jobs'), 0) + self.assertEqual(json_progress.get('total_unexecuted_jobs'), 2) + self.assertEqual(json_progress.get('total_error_jobs'), 0) + self.assertIsNone(json_progress.get('estimated_total_time')) + self.assertEqual(json_progress.get('remaining_seconds'), 0) + + def test_no_any_job_in_group(self): + """Create group but no job create for that group. + """ + group_name = 'test-ddrun' + Group.objects.create(reference=group_name) + test_url = self.URL + group_name + '/' + + response = self.client.get(test_url) + self.assertEqual(response.status_code, 200) + + json = loads(response.content) + self.assertEqual(json['_meta'], + {'message': 'OK', 'status': 200, 'username': 'test'} + ) + self.assertTrue(json.get('progress') is None) + + def test_no_group_valid_for_get_request(self): + """ Do get request to non exist group. + """ + #TODO need to catch more specific exception + # now it just thrown TemplateDoesNotExist + with self.assertRaises(Exception) as e: + response = self.client.get(self.URL + 'fake-group/') + + def test_all_jobs_executed(self): + """Test get detail from group with all executed jobs. + """ + group1 = Group.objects.create(reference='drun1') + for i in range(5): + Job.objects.create(name='j-%s' % i, args='[]', kwargs='{}', + meta='{}', priority=3, group=group1) + for job in group1.jobs.all(): + job.executed = timezone.now() + job.save() + + test_url = self.URL + 'drun1/' + response = self.client.get(test_url) + self.assertEqual(response.status_code, 200) + + json = loads(response.content) + self.assertEqual(json['_meta'], + {'message': 'OK', 'status': 200, 'username': 'test'}) + + json_progress = json.get('progress') + self.assertTrue(json_progress) + self.assertEqual(json_progress.get('id'), group1.id) + self.assertEqual(json_progress.get('created'), group1.created.isoformat()) + self.assertEqual(json_progress.get('latest_job_completed'), + group1.latest_executed_job().executed.isoformat()) + self.assertEqual(json_progress.get('total_jobs'), group1.jobs.count()) + self.assertEqual(json_progress.get('total_executed_jobs'), 5) + self.assertEqual(json_progress.get('total_unexecuted_jobs'), 0) + self.assertEqual(json_progress.get('total_error_jobs'), 0) + + def test_all_jobs_executed_with_error(self): + """Test get detail from group with job errors. + """ + group1 = Group.objects.create(reference='drun1') + for i in range(5): + Job.objects.create(name='j-%s' % i, args='[]', kwargs='{}', + meta='{}', priority=3, group=group1) + for job in group1.jobs.all(): + job.executed = timezone.now() - timedelta(days=60) + job.save() + + j1 = Job.objects.all()[0] + j2 = Job.objects.all()[1] + e1 = Error.objects.create(job=j1) + e2 = Error.objects.create(job=j1) + e3 = Error.objects.create(job=j2) + + test_url = self.URL + 'drun1/' + response = self.client.get(test_url) + self.assertEqual(response.status_code, 200) + + json = loads(response.content) + self.assertEqual(json['_meta'], + {'message': 'OK', 'status': 200, 'username': 'test'}) + + json_progress = json.get('progress') + self.assertTrue(json_progress) + self.assertEqual(json_progress.get('id'), group1.id) + self.assertEqual(json_progress.get('created'), group1.created.isoformat()) + self.assertEqual(json_progress.get('latest_job_completed'), + group1.latest_executed_job().executed.isoformat()) + self.assertEqual(json_progress.get('total_jobs'), group1.jobs.count()) + self.assertEqual(json_progress.get('total_executed_jobs'), 5) + self.assertEqual(json_progress.get('total_unexecuted_jobs'), 0) + self.assertEqual(json_progress.get('total_error_jobs'), 2) + + def test_estimate_execution_duration_can_produce_result(self): + """Just to test if estimate function produce result, + not checking the result. + """ + + g1 = Group.objects.create(reference='test-group') + g1.created = timezone.now() - timedelta(days=5000) + g1.save() + + def create_job_series(id): + j = Job.objects.create( + name='job-%s' % id, + args='[]', + kwargs='{}', + meta='{}', + priority=5, + group=g1 + ) + j.save() + j.added = timezone.now() - timedelta(days=5000) + j.save() + return j + + self.assertIsNone(g1.latest_executed_job()) + + j1, j2, j3, j4, j5, j6 = map(create_job_series, range(0, 6)) + j1.executed = j1.added + timedelta(days=10) + j1.save() + j2.executed = j2.added + timedelta(days=10) + j2.save() + + total, remaining, consumed = g1.estimate_execution_duration() + self.assertTrue(isinstance(total, timedelta)) + self.assertTrue(isinstance(remaining, timedelta)) + self.assertTrue(isinstance(consumed, timedelta)) + + + def test_estimate_execution_duration_with_no_job_valid(self): + """Calculate function return None if no data for process. + """ + g1 = Group.objects.create(reference='test-group') + g1.created = timezone.now() - timedelta(days=5000) + g1.save() + + total, remaining, consumed = g1.estimate_execution_duration() + self.assertIsNone(total) + self.assertIsNone(remaining) + self.assertIsNone(consumed) + diff --git a/admincommand/async/tests/test_utils.py b/admincommand/async/tests/test_utils.py new file mode 100644 index 0000000..e243fbd --- /dev/null +++ b/admincommand/async/tests/test_utils.py @@ -0,0 +1,17 @@ +""" + Extra tests that might be needed for utilities. +""" +from django.test import TestCase + +from async.utils import object_at_end_of_path + + +class TestFetchingMethod(TestCase): + """Tests for object_at_end_of_path + """ + def test_with_global(self): + """Make sure we can access a builtin + """ + found = object_at_end_of_path('list') + self.assertEqual(found, list) + diff --git a/admincommand/async/utils.py b/admincommand/async/utils.py new file mode 100644 index 0000000..5d1f8be --- /dev/null +++ b/admincommand/async/utils.py @@ -0,0 +1,50 @@ +""" + Various Python utilities. +""" +from inspect import getmembers, getmodule, isfunction, ismethod + + +def full_name(item): + """Return the full name of a something passed in so it can be retrieved + later on. + """ + if isinstance(item, basestring): + return item + if ismethod(item): + module_name = full_name(dict(getmembers(item))['im_self']) + else: + module_name = getmodule(item).__name__ + if isfunction(item): + name = item.func_name + else: + name = item.__name__ + return '.'.join([module_name, name]) + + +def object_at_end_of_path(path): + """Attempt to return the Python object at the end of the dotted + path by repeated imports and attribute access. + """ + access_path = path.split('.') + module = None + for index in xrange(1, len(access_path)): + try: + # import top level module + module_name = '.'.join(access_path[:-index]) + module = __import__(module_name) + except ImportError: + continue + else: + for step in access_path[1:-1]: # walk down it + module = getattr(module, step) + break + if module: + return getattr(module, access_path[-1]) + else: + return globals()['__builtins__'][path] + + +def non_unicode_kwarg_keys(kwargs): + """Convert all the keys to strings as Python won't accept unicode. + """ + return dict([(str(k), v) for k, v in kwargs.items()]) if kwargs else {} diff --git a/admincommand/core.py b/admincommand/core.py index 568d993..2a26185 100644 --- a/admincommand/core.py +++ b/admincommand/core.py @@ -1,3 +1,4 @@ +import threading from StringIO import StringIO from django.conf import settings @@ -7,12 +8,12 @@ from django.utils.importlib import import_module from django.core.management.base import BaseCommand from django.contrib.auth.models import User -from async import schedule +from admincommand.async import schedule from admincommand.models import AdminCommand -# Cache variable to store runnable commands configuration +# Cache variable to store runnable commands configuration _command_configs = {} @@ -70,5 +71,16 @@ def run_command(command_config, cleaned_data, user): # display it to the user output = StringIO() kwargs['stdout'] = output - management.call_command(command_config.command_name(), *args, **kwargs) - return output.getvalue() + call_command = lambda: management.call_command(command_config.command_name(), *args, **kwargs) + if getattr(command_config, 'thread', False): + # AdminCommand.thread is (undocumented) support for calling management commands + # from a threaded context. This is usually a bad idea, but due to post-save signals + # etc it may be the only option in certain circumstances (for example, see this + # old ticket here https://code.djangoproject.com/ticket/8399) + thread = threading.Thread(target=call_command) + thread.start() + thread.join() + return "(executed in a thread)\n" + output.getvalue() + else: + call_command() + return output.getvalue() diff --git a/setup.py b/setup.py old mode 100644 new mode 100755 index 7b8c72b..da3e34c --- a/setup.py +++ b/setup.py @@ -1,6 +1,6 @@ #!/usr/bin/env python import os -from distutils.core import setup +from setuptools import setup def read(fname): @@ -15,7 +15,7 @@ def read(fname): author='Djaz Team', author_email='devweb@liberation.fr', url='https://github.com/liberation/django-admincommand', - packages=['admincommand'], + packages=['admincommand', 'admincommand.async'], data_files=[('admincommand/templates/admincommand', [ 'admincommand/templates/admincommand/output.html', 'admincommand/templates/admincommand/run.html'])]