diff --git a/README.md b/README.md index cf8de75..28e7661 100644 --- a/README.md +++ b/README.md @@ -45,10 +45,10 @@ actual queue name would be `prod-subscriptions-random-queue-stag` - [x] Configurable encoding (json/base64) - [ ] SQS Spooling (Don't know if spool-consumer supports aws local) #### Beanstalk -- [ ] Send Message -- [ ] Receive Message -- [ ] Delete Message -- [ ] Create queue if not exists +- [x] Send Message +- [x] Receive Message +- [x] Delete Message +- [x] Create queue if not exists ## Queue URI Examples: ```python diff --git a/py_queue_factory/__init__.py b/py_queue_factory/__init__.py index 091acfe..ccf16a6 100644 --- a/py_queue_factory/__init__.py +++ b/py_queue_factory/__init__.py @@ -2,4 +2,5 @@ from .abstract_queue import * from .sqs_queue import * from .sqs_local import * +from .beanstalk_queue import * from .queue_factory import * diff --git a/py_queue_factory/abstract_queue.py b/py_queue_factory/abstract_queue.py index 449337f..7cda1bd 100644 --- a/py_queue_factory/abstract_queue.py +++ b/py_queue_factory/abstract_queue.py @@ -1,4 +1,6 @@ import copy +import json +import base64 from abc import ABC, abstractmethod import urllib.parse as url_parse @@ -18,11 +20,12 @@ class AbstractQueue(ABC): DEFAULT_ENCODING = 'base64' VALID_ENCODING = ['json', 'base64'] - def send_message(self, message, delay=0): + def send_message(self, message, delay=0, debug=False): if not isinstance(message, QueueMessage): - # message = self.handle_cid(message) message = QueueMessage(message) self.do_send_message(message, delay) + if debug: + return message @abstractmethod def do_send_message(message, delay): @@ -87,3 +90,23 @@ def get_queue_name(self): self.subdomain, self.queue_name) return self.queue_prefix + queue_name_with_suffix + + @staticmethod + def encode_mesage(message_body, encoding): + if encoding == 'json': + message_body = json.dumps(message_body) + elif encoding == 'base64': + json_message = json.dumps(message_body).encode('utf-8') + message_body = base64.b64encode(json_message).decode('utf-8') + + return message_body + + @staticmethod + def decode_message(message_body, encoding): + if encoding == 'json': + message_body = json.loads(message_body) + elif encoding == 'base64': + message_body = base64.b64decode(message_body.encode('utf-8')) + message_body = json.loads(message_body.decode('utf-8')) + + return message_body diff --git a/py_queue_factory/beanstalk_queue.py b/py_queue_factory/beanstalk_queue.py new file mode 100644 index 0000000..b8725ae --- /dev/null +++ b/py_queue_factory/beanstalk_queue.py @@ -0,0 +1,61 @@ +import os +import urllib.parse as url_parse + +import beanstalkc3 +from beanstalkc3 import Job + +from . import AbstractQueue, QueueMessage + + +class Beanstalk(AbstractQueue): + + BEANSTALK_MAX_VISIBILITY_TIMEOUT = 60 * 60 * 12 # 12 hours + BEANSTALK_RECEIVE_MESSAGE_WAIT_TIME = 30 # 30 seconds + + def __init__(self, uri, host_url, subdomain, default_port=11300): + parts = url_parse.urlparse(uri) + host = parts.hostname + self.scheme = parts.scheme + port = parts.port if parts.port else default_port + self.set_host_url(host_url).set_subdomain(subdomain) + path_parts = list(filter(None, parts.path.split('/'))) + self.queue_prefix = "/".join(path_parts) if path_parts else '' + self.beanstalk_client = beanstalkc3.Connection(host=host, port=port) + + def do_send_message(self, message, delay, attempt=1): + if delay > 900: + delay = 900 + try: + message_body = AbstractQueue.encode_mesage(message.get_body(), self.encoding) + self.beanstalk_client.use(self.get_queue_name()) + respone = self.beanstalk_client.put(message_body, delay=delay, ttr=self.visibility_timeout) + message.set_id(respone) + except: + if attempt < 3: + attempt += 1 + self.do_send_message(message, delay, attempt) + else: + raise Exception('Could not send message') + + def delete_message(self, message): + job = Job(self.beanstalk_client, message.get_id(), message.get_body()) + job.delete() + + def receive_message(self): + self.beanstalk_client.watch(self.get_queue_name()) + result = self.beanstalk_client.reserve(self.BEANSTALK_RECEIVE_MESSAGE_WAIT_TIME) + message = QueueMessage(result.jid, AbstractQueue.decode_message(result.body, self.encoding)) + + return message + + def get_queue_url(self): + return os.path.join(self.scheme + '://', 'beanstalkd', self.get_queue_name()) + + def change_message_visibility(self, message, visibility_timeout): + pass + + def validate_visibility_timeout(self): + if self.visibility_timeout > self.BEANSTALK_MAX_VISIBILITY_TIMEOUT: + raise Exception(f'visibility_timeout range 0 to ' + f'{self.BEANSTALK_MAX_VISIBILITY_TIMEOUT}, but received' + f' {self.visibility_timeout}') diff --git a/py_queue_factory/queue_factory.py b/py_queue_factory/queue_factory.py index fd0737e..d23dc7a 100644 --- a/py_queue_factory/queue_factory.py +++ b/py_queue_factory/queue_factory.py @@ -1,19 +1,26 @@ import urllib.parse as url_parse -from . import Sqs, SqsLocal +from . import Sqs, SqsLocal, Beanstalk + class QueueFactory: @staticmethod - def get_queue(queue_uri, host_url, subdomain): + def get_queue(queue_uri, host_url, subdomain, port=None): parts = url_parse.urlparse(queue_uri) + host = parts.hostname + host_parts = host.split('.', 2) if parts.scheme == 'https': - host = parts.hostname - host_parts = host.split('.', 2) if host_parts[0] == 'sqs' and host_parts[2] == 'amazonaws.com': return Sqs(queue_uri, host_url, subdomain) elif host_parts[0] == 'sqs' and host_parts[2] == 'awslocal': return SqsLocal(queue_uri, host_url, subdomain) else: raise Exception('Invalid Sqs URI') + elif parts.scheme == 'beanstalk': + if host_parts[0] == 'beanstalkd': + return Beanstalk(queue_uri, host_url, subdomain, port) \ + if port else Beanstalk(queue_uri, host_url, subdomain) + else: + raise Exception('Invalid Beanstalk URI') else: raise Exception('Unsupported URI scheme') diff --git a/py_queue_factory/sqs_queue.py b/py_queue_factory/sqs_queue.py index 87581e4..576854f 100644 --- a/py_queue_factory/sqs_queue.py +++ b/py_queue_factory/sqs_queue.py @@ -1,6 +1,4 @@ import boto3 -import json -import base64 import urllib.parse as url_parse from . import AbstractQueue, QueueMessage @@ -43,7 +41,7 @@ def do_send_message(self, message, delay, attempt=1): if delay > 900: delay = 900 try: - message_body = self.encode_mesage(message.get_body()) + message_body = AbstractQueue.encode_mesage(message.get_body(), self.encoding) respone = self.sqs_client.send_message( QueueUrl=self.get_queue_url(), MessageBody=message_body, @@ -91,7 +89,7 @@ def receive_message(self, attribute_names=[]): ) if 'Messages' in result: data = result['Messages'][0] - message_body = self.decode_message(data['Body']) + message_body = AbstractQueue.decode_message(data['Body'], self.encoding) message = QueueMessage(message_body, data['MessageId']) message.set_receipt_handle(data['ReceiptHandle']) message.set_attributes(data.get('Attributes', {})) @@ -100,24 +98,6 @@ def receive_message(self, attribute_names=[]): self.create_queue(self.get_queue_name()) return message - def encode_mesage(self, message_body): - if self.encoding == 'json': - message_body = json.dumps(message_body) - elif self.encoding == 'base64': - json_message = json.dumps(message_body).encode('utf-8') - message_body = base64.b64encode(json_message).decode('utf-8') - - return message_body - - def decode_message(self, message_body): - if self.encoding == 'json': - message_body = json.loads(message_body) - elif self.encoding == 'base64': - message_body = base64.b64decode(message_body.encode('utf-8')) - message_body = json.loads(message_body.decode('utf-8')) - - return message_body - def change_message_visibility(self, message, visibility_timeout): if visibility_timeout > self.SQS_MAX_VISIBILITY_TIMEOUT: visibility_timeout = self.SQS_MAX_VISIBILITY_TIMEOUT diff --git a/setup.py b/setup.py index d559dc9..ada9772 100644 --- a/setup.py +++ b/setup.py @@ -16,8 +16,9 @@ url='https://github.com/practo/py-queue-factory', packages=setuptools.find_packages(), install_requires=[ - 'boto3>=1.7.*', - ], + 'boto3>=1.7.*', + 'beanstalkc3 @git+https://github.com/practo/beanstalkc.git@v2.0.0', + ], classifiers=[ 'Development Status :: 3 - Alpha', 'Intended Audience :: Developers',