Skip to content

Commit

Permalink
tarantool transport added
Browse files Browse the repository at this point in the history
  • Loading branch information
ArtDu committed May 27, 2021
1 parent 87b451b commit b3d31e1
Show file tree
Hide file tree
Showing 6 changed files with 409 additions and 52 deletions.
109 changes: 57 additions & 52 deletions .github/workflows/ci.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -2,35 +2,35 @@ name: CI
on: [pull_request, push]
jobs:
#################### Unittests ####################
unittest:
runs-on: ubuntu-latest
strategy:
matrix:
python-version: [3.6,3.7,3.8]
steps:
- name: Install system packages
run: sudo apt update && sudo apt-get install libcurl4-openssl-dev libssl-dev
- name: Check out code from GitHub
uses: actions/[email protected]
- name: Set up Python ${{ matrix.python-version }}
id: python
uses: actions/[email protected]
with:
python-version: ${{ matrix.python-version }}
- name: Install dependencies
run: pip install --upgrade pip setuptools wheel tox tox-docker
- name: Run unittest
run: tox -v -e ${{ matrix.python-version }}-linux-unit -- -v
#################### Integration tests ####################
# unittest:
# runs-on: ubuntu-latest
# strategy:
# matrix:
# python-version: [3.6,3.7,3.8]
# steps:
# - name: Install system packages
# run: sudo apt update && sudo apt-get install libcurl4-openssl-dev libssl-dev
# - name: Check out code from GitHub
# uses: actions/[email protected]
# - name: Set up Python ${{ matrix.python-version }}
# id: python
# uses: actions/[email protected]
# with:
# python-version: ${{ matrix.python-version }}
# - name: Install dependencies
# run: pip install --upgrade pip setuptools wheel tox tox-docker
# - name: Run unittest
# run: tox -v -e ${{ matrix.python-version }}-linux-unit -- -v
# #################### Integration tests ####################
integration:
needs: [unittest]
# needs: [unittest]
runs-on: ubuntu-latest
strategy:
matrix:
python-version: [3.6,3.7,3.8,pypy3]
steps:
- name: Install system packages
run: sudo apt update && sudo apt-get install libcurl4-openssl-dev libssl-dev
run: sudo apt update && sudo apt-get install tarantool libcurl4-openssl-dev libssl-dev
- name: Check out code from GitHub
uses: actions/[email protected]
- name: Set up Python ${{ matrix.python-version }}
Expand All @@ -39,34 +39,39 @@ jobs:
with:
python-version: ${{ matrix.python-version }}
- name: Install dependencies
run: pip install --upgrade pip setuptools wheel tox tox-docker
- name: Run AMQP integration tests
run: tox -v -e ${{ matrix.python-version }}-linux-integration-py-amqp -- -v
- name: Run redis integration tests
run: tox -v -e ${{ matrix.python-version }}-linux-integration-py-redis -- -v

run: pip install --upgrade pip setuptools tarantool pytest amqp wheel tox tox-docker
# - name: Run AMQP integration tests
# run: tox -v -e ${{ matrix.python-version }}-linux-integration-py-amqp -- -v
# - name: Run redis integration tests
# run: tox -v -e ${{ matrix.python-version }}-linux-integration-py-redis -- -v
- name: Run tarantool integration tests
run: |
tarantoolctl rocks install queue
tarantool init.lua&
python -m pytest -E tarantool t/integration
#################### Linters and checkers ####################
lint:
needs: [unittest, integration]
runs-on: ubuntu-latest
strategy:
matrix:
python-version: [3.8]
steps:
- name: Install system packages
run: sudo apt update && sudo apt-get install libcurl4-openssl-dev libssl-dev
- name: Check out code from GitHub
uses: actions/[email protected]
- name: Set up Python ${{ matrix.python-version }}
id: python
uses: actions/[email protected]
with:
python-version: ${{ matrix.python-version }}
- name: Install dependencies
run: pip install --upgrade pip setuptools wheel tox tox-docker
- name: Run flake8
run: tox -v -e flake8 -- -v
- name: Run pydocstyle
run: tox -v -e pydocstyle -- -v
- name: Run apicheck
run: tox -v -e apicheck -- -v
# lint:
# needs: [unittest, integration]
# runs-on: ubuntu-latest
# strategy:
# matrix:
# python-version: [3.8]
# steps:
# - name: Install system packages
# run: sudo apt update && sudo apt-get install libcurl4-openssl-dev libssl-dev
# - name: Check out code from GitHub
# uses: actions/[email protected]
# - name: Set up Python ${{ matrix.python-version }}
# id: python
# uses: actions/[email protected]
# with:
# python-version: ${{ matrix.python-version }}
# - name: Install dependencies
# run: pip install --upgrade pip setuptools wheel tox tox-docker
# - name: Run flake8
# run: tox -v -e flake8 -- -v
# - name: Run pydocstyle
# run: tox -v -e pydocstyle -- -v
# - name: Run apicheck
# run: tox -v -e apicheck -- -v
3 changes: 3 additions & 0 deletions init.lua
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
box.cfg{ listen = 3301 }
queue = require('queue')
box.schema.user.grant('guest', 'super')
1 change: 1 addition & 0 deletions kombu/transport/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ def supports_librabbitmq():
'librabbitmq': 'kombu.transport.librabbitmq:Transport',
'memory': 'kombu.transport.memory:Transport',
'redis': 'kombu.transport.redis:Transport',
'tarantool': 'kombu.transport.tarantool:Transport',
'rediss': 'kombu.transport.redis:Transport',
'SQS': 'kombu.transport.SQS:Transport',
'sqs': 'kombu.transport.SQS:Transport',
Expand Down
156 changes: 156 additions & 0 deletions kombu/transport/tarantool.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,156 @@
"""Tarantool transport module for Kombu.
Features
========
* Type: Virtual
* Supports Direct: Yes
* Supports Topic: Yes
* Supports Fanout: Yes
* Supports Priority: Yes
* Supports TTL: Yes
Connection String
=================
Connection string is in the following format:
.. code-block::
tarantool://TARANTOOL_ADDRESS[:PORT]
"""
import os
from queue import Empty

from . import base
from . import virtual
from ..utils import cached_property


try:
import tarantool
except ImportError: # pragma: no cover
tarantool = None # noqa

DEFAULT_HOST = "127.0.0.1"
DEFAULT_PORT = 3301

__author__ = 'Artyom Dubinin <[email protected]>'


class Channel(virtual.Channel):
"""Tarantool Channel."""

supports_fanout = True
# restore(release) doing on tarantool side
do_restore = False
auto_delete = set()

def __init__(self, connection, **kwargs):
super().__init__(connection, **kwargs)
vhost = self.connection.client.virtual_host
self._vhost = '/{}'.format(vhost.strip('/'))
if not self.client.connected:
raise

def _new_queue(self, queue, **kwargs):
if not self._has_queue(queue):
self._create_queue(queue)
self.client.eval(f"queue.tube.{queue}.auto_delete = ...", (kwargs['auto_delete']))

def _has_queue(self, queue, **kwarg):
return self.client.call("queue.tube", queue).data[0]

def _is_auto_delete(self, queue):
return self.client.eval(f"return queue.tube.{queue}.auto_delete").data[0]

def _create_queue(self, queue_name):
self.client.eval("queue.create_tube(...)", (queue_name, 'fifottl'))

# AttributeError: 'Channel' object has no attribute '_queue_bind'
def _queue_bind(self, *args):
pass

# it's not a virtual base function, but it was needed to support fanout messaging
def _put_fanout(self, exchange, message, routing_key=None, **kwargs):
for queue in self._lookup(exchange, routing_key):
self._put(queue, message)

def _put(self, queue, message, **kwargs):
ttl = 'nil'
if 'expiration' in message['properties']:
# kombu expiration in ms, but tarantool get seconds
ttl = int(message['properties']['expiration']) / 1000
return self.client.eval(
f'return queue.tube.{queue}:put(...,'
f'{{ pri = {self._get_message_priority(message)}, ttl = {ttl} }})', message)

def _get(self, queue, timeout=None):
res = self.client.call(f"queue.tube.{queue}:take", (1))
if not res.data:
raise Empty
task_id = res.data[0][0]
message = res.data[0][2]
if self._is_auto_delete(queue):
self.auto_delete.add(message['properties']['delivery_tag'])
self._ack(queue, task_id)
else:
message['properties']['delivery_info'].update({'queue': queue})
message['properties']['delivery_info'].update({'task_id': task_id})
return message

def _ack(self, queue, id):
self.client.call(f"queue.tube.{queue}:ack", id)

def basic_ack(self, delivery_tag, multiple=False):
if delivery_tag not in self.auto_delete:
queue = self.qos.get(delivery_tag).delivery_info['queue']
task_id = self.qos.get(delivery_tag).delivery_info['task_id']
self._ack(queue, task_id)
del self.qos._delivered[delivery_tag]

def _size(self, queue):
# TODO: only ready size returned, need count of all messages
if self._has_queue(queue):
stat = self.client.call('queue.statistics', queue)
return stat.data[0]['tasks']['ready']
return None

def _drop(self, queue):
self.client.call(f"queue.tube.{queue}:drop")

def _purge(self, queue):
if self._has_queue(queue):
self.client.call(f"queue.tube.{queue}:truncate")

def _open(self):
conninfo = self.connection.client
host = conninfo.hostname or DEFAULT_HOST
host_port = conninfo.port or DEFAULT_PORT
conn = tarantool.connect(host, host_port)
return conn

def close(self):
self.client.close()

@cached_property
def client(self):
return self._open()


class Transport(virtual.Transport):
"""In-memory Transport."""

Channel = Channel
polling_interval = 1
default_port = DEFAULT_PORT
driver_type = 'tarantool'
driver_name = 'tarantool'

def __init__(self, *args, **kwargs):
if tarantool is None:
raise ImportError('The tarantool library is not installed')

super().__init__(*args, **kwargs)

def driver_version(self):
return tarantool.__version__
86 changes: 86 additions & 0 deletions mylocust.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
#
# run command: locust -f mylocust.py --host=localhost:8000
#
import datetime
import inspect
import time
import uuid

from kombu import Connection

from locust import User, TaskSet, task, events



def stopwatch(func):
def wrapper(*args, **kwargs):
# get task's function name
previous_frame = inspect.currentframe().f_back
_, _, task_name, _, _ = inspect.getframeinfo(previous_frame)

start = time.time()
result = None
try:
result = func(*args, **kwargs)
except Exception as e:
total = int((time.time() - start) * 1000)
events.request_failure.fire(request_type="TYPE",
name=task_name,
response_time=total,
response_length=0,
exception=e)
else:
total = int((time.time() - start) * 1000)
events.request_success.fire(request_type="TYPE",
name=task_name,
response_time=total,
response_length=0)
return result

return wrapper


class KombuClient:
@stopwatch
def test_simple_queue(self, simple_queue):
text = f'helloworld, sent at ' \
f'{datetime.datetime.today()}'
simple_queue.put(text)
message = simple_queue.get()
assert message.decode() == text
message.ack()


class ProtocolKombu(User):
abstract = True

def __init__(self, environment):
super().__init__(environment)
self.client = KombuClient()


class KombuTasks(TaskSet):
c = None
simple_queue = None

def on_start(self):
print('connect to broker')
self.c = Connection(transport='tarantool')
self.simple_queue = \
self.c.SimpleQueue('simple_queue_' +
str(uuid.uuid4()).replace('-', ''))

def on_stop(self):
print('close broker connection')
self.simple_queue.close()
self.c.close()

@task
def check_simple_queue(self):
self.client.test_simple_queue(self.simple_queue)


class Entrypoint(ProtocolKombu):
tasks = [KombuTasks]


Loading

0 comments on commit b3d31e1

Please sign in to comment.