Skip to content

Commit

Permalink
Make plugin transport ZMQ multiplex
Browse files Browse the repository at this point in the history
This feature will enable the bidirectional plugin transport and multiplexing.

Calling `send_action` within the plugin will route back to the same worker
from now on. It is implicit, developer does not need to specific worker reuse
or not.
This merge also add dynamic threading at ZMQTransport.
It solved the problem of workers exhaust problem.

refs SkygearIO/skygear-server#295
  • Loading branch information
rickmak committed May 5, 2017
2 parents c7b6405 + 4f6531b commit d5f3e4e
Show file tree
Hide file tree
Showing 8 changed files with 412 additions and 100 deletions.
4 changes: 3 additions & 1 deletion skygear/bin.py
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,9 @@ def run_plugin(options):
log.info(
"Connecting to address %s" % options.skygear_address)
transport = ZmqTransport(options.skygear_address,
threading=options.zmq_thread_pool)
threading=options.zmq_thread_pool,
limit=options.zmq_thread_limit)
SkygearContainer.set_default_transport(transport)
transport.run()


Expand Down
28 changes: 19 additions & 9 deletions skygear/container.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,13 +11,16 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import datetime
import json
import logging

import requests
import strict_rfc3339

from . import error

log = logging.getLogger(__name__)


class PayloadEncoder(json.JSONEncoder):
Expand All @@ -28,9 +31,10 @@ def default(self, obj):


def send_action(url, payload, timeout=60):
log.error("skygear.container.send_action is deprecated.\n"
"Please use SkygearContainer().send_action instead.")
headers = {'Content-type': 'application/json',
'Accept': 'application/json'}

_data = json.dumps(payload, cls=PayloadEncoder)
return requests.post(url, data=_data, headers=headers, timeout=timeout) \
.json()
Expand All @@ -42,16 +46,21 @@ class SkygearContainer(object):
access_token = None
user_id = None
app_name = ''
transport = None

def __init__(self, endpoint=None, api_key=None, access_token=None,
user_id=None):
user_id=None, transport=None):
if endpoint:
self.endpoint = endpoint
if api_key:
self.api_key = api_key
if user_id:
self.user_id = user_id
self.access_token = access_token
if transport is None:
self.transport = SkygearContainer.transport
else:
self.transport = transport

def _request_url(self, action_name):
endpoint = self.endpoint
Expand Down Expand Up @@ -87,12 +96,13 @@ def set_default_endpoint(cls, endpoint):
def set_default_apikey(cls, api_key):
cls.api_key = api_key

@classmethod
def set_default_transport(cls, transport):
cls.transport = transport

def send_action(self, action_name, params, plugin_request=False,
timeout=60):
resp = send_action(self._request_url(action_name),
self._payload(action_name, params, plugin_request),
timeout=timeout)
if 'error' in resp:
raise error.SkygearException.from_dict(resp['error'])

url = self._request_url(action_name)
payload = self._payload(action_name, params, plugin_request)
resp = self.transport.send_action(action_name, payload, url, timeout)
return resp
5 changes: 5 additions & 0 deletions skygear/options.py
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,11 @@ def add_plugin_arguments(ap: argparse.ArgumentParser):
default=4, type=int,
help='Number of thread in ZMQTransport thread pool',
env_var='ZMQ_THREAD_POOL')
ap.add_argument('--zmq-thread-limit', metavar='ZMQ_THREAD_LIMIT',
action='store',
default=10, type=int,
help='Max number of thread in ZMQTransport thread pool',
env_var='ZMQ_THREAD_LIMIT')
ap.add_argument('modules', nargs='*', default=[]) # env_var: LOAD_MODULES


Expand Down
7 changes: 4 additions & 3 deletions skygear/tests/test_container.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,12 @@
import unittest
from unittest.mock import patch

from ..container import SkygearContainer, send_action
from ..container import SkygearContainer
from ..transmitter.http import send_action


class TestSendAction(unittest.TestCase):
@patch('skygear.container.requests', autospec=True)
@patch('skygear.transmitter.http.requests', autospec=True)
def test_send_str(self, mock_requests):
send_action('http://skygear.dev/', {
'key': 'string'
Expand All @@ -30,7 +31,7 @@ def test_send_str(self, mock_requests):
self.assertEqual(call[1][0], 'http://skygear.dev/')
self.assertEqual(call[2]['data'], '{"key": "string"}')

@patch('skygear.container.requests', autospec=True)
@patch('skygear.transmitter.http.requests', autospec=True)
def test_send_date(self, mock_requests):
dt = datetime.datetime(2014, 9, 27, 17, 40, 0,
tzinfo=datetime.timezone.utc)
Expand Down
3 changes: 2 additions & 1 deletion skygear/transmitter/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,8 @@ class ZmqTransport(CommonTransport):
"""A dummy ZmqTransport class to raise a proper exception
"""

def __init__(self, addr, context=None, registry=None, threading=0):
def __init__(self, addr,
context=None, registry=None, threading=0, limit=10):
raise ImportError(
'zmq transport is not installed. '
'Please install via `pip install --upgrade skygear[zmq]`')
Expand Down
28 changes: 28 additions & 0 deletions skygear/transmitter/http.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,19 +11,39 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import datetime
import json
import logging

import requests
import strict_rfc3339
from werkzeug.routing import Map, Rule
from werkzeug.serving import run_simple
from werkzeug.wrappers import Request, Response

from .. import error
from .common import CommonTransport
from .encoding import _serialize_exc

log = logging.getLogger(__name__)


class PayloadEncoder(json.JSONEncoder):
def default(self, obj):
if isinstance(obj, datetime.datetime):
ts = obj.timestamp()
return strict_rfc3339.timestamp_to_rfc3339_utcoffset(ts)


def send_action(url, payload, timeout=60):
headers = {'Content-type': 'application/json',
'Accept': 'application/json'}

_data = json.dumps(payload, cls=PayloadEncoder)
return requests.post(url, data=_data, headers=headers, timeout=timeout) \
.json()


class HttpTransport(CommonTransport):
"""
HttpTransport implements a transport protocol between skygear and
Expand Down Expand Up @@ -109,3 +129,11 @@ def run(self):
run_simple(self.hostname, self.port, self.dispatch,
threaded=True,
use_reloader=self.debug)

def send_action(self, action_name, payload, url, timeout):
resp = send_action(url,
payload,
timeout=timeout)
if 'error' in resp:
raise error.SkygearException.from_dict(resp['error'])
return resp
106 changes: 106 additions & 0 deletions skygear/transmitter/tests/test_zmq.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,13 +11,15 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import json
import threading
import time
import unittest

import zmq

from ...transmitter.zmq import (HEARTBEAT_INTERVAL, HEARTBEAT_LIVENESS,
PPP_HEARTBEAT, PPP_REQUEST, PPP_RESPONSE,
ZmqTransport)


Expand Down Expand Up @@ -77,6 +79,60 @@ def test_maintain_worker_count(self):
t.join()
context.destroy()

@unittest.mock.patch('skygear.transmitter.zmq.Worker.handle_message')
def test_spawn_new_worker(self, handle_message_mock):
def slow_response(*args, **kwargs):
time.sleep(HEARTBEAT_INTERVAL * HEARTBEAT_LIVENESS * 2)
return b'{}'
handle_message_mock.side_effect = slow_response

context = zmq.Context()
t = threading.Thread(target=request_router,
args=(context,
'tcp://0.0.0.0:23456',
{'name': 'foo'}))
t.start()
transport = ZmqTransport('tcp://0.0.0.0:23456',
context=context,
threading=1)
transport.start()
transport_t = threading.Thread(
target=transport.maintain_workers_count)
transport_t.start()
time.sleep(HEARTBEAT_INTERVAL * HEARTBEAT_LIVENESS * 2)
self.assertEqual(len(transport.threads), 2)
self.assertEqual(transport.threads_opened, 2)
handle_message_mock.assert_called_with(b'{"name": "foo"}', {
'bounce_count': 0,
'request_id': 'REQ-ID',
})
t.join()
transport.stop()
transport_t.join()
context.destroy()

def test_one_off_worker(self):
context = zmq.Context()
expected_body = {'method': 'POST', 'payload': {'key': 'value'}}
return_value = '{"result": {"response_key": "response_value"}}'
t = threading.Thread(target=response_router,
args=(context,
'tcp://0.0.0.0:23456',
expected_body,
return_value))
t.start()
transport = ZmqTransport('tcp://0.0.0.0:23456',
context=context,
threading=1)
transport.start()
result = transport.send_action('action_name', {'key': 'value'})
self.assertEqual(
result,
{'result': {'response_key': 'response_value'}}
)
t.join()
context.destroy()


def dead_router(context, addr, count):
"""
Expand All @@ -94,3 +150,53 @@ def dead_router(context, addr, count):
router.send_multipart(frames)
i = i + 1
router.close()


def response_router(context, addr, expected_body, response):
"""
This router will send predefined response body to the worker
"""
router = context.socket(zmq.ROUTER)
router.bind(addr)
while True:
router.poll()
frames = router.recv_multipart()
if len(frames) == 2:
router.send(PPP_HEARTBEAT)
continue
body = frames[7].decode('utf8')
assert expected_body == json.loads(body)

frames[3] = PPP_RESPONSE
frames[7] = response.encode('utf8')
router.send_multipart(frames)
router.close()
break


def request_router(context, addr, body):
"""
This router will send predefined request body to the worker
"""
router = context.socket(zmq.ROUTER)
router.bind(addr)
router.poll()
frames = router.recv_multipart()
router.send(PPP_HEARTBEAT)

address = frames[0]
frames = [
address,
address,
b'',
PPP_REQUEST,
b'0',
b'REQ-ID',
b'',
json.dumps(body).encode('utf8')
]
router.send_multipart(frames)

router.poll()
router.recv_multipart()
router.close()
Loading

0 comments on commit d5f3e4e

Please sign in to comment.