enable asynchronous Arkouda client requests #2008

Draft: wants to merge 7 commits into base: master
26 changes: 25 additions & 1 deletion README.md
@@ -112,7 +112,8 @@ This yielded a >20TB dataframe in Arkouda.
8. [Versioning](#versioning-ak)
9. [External Systems Integration](#external-integration)
10. [Metrics](#metrics)
11. [Contributing](#contrib-ak)
11. [Asynchronous Client](#async_client)
12. [Contributing](#contrib-ak)

<a id="prereqs"></a>
## Prerequisites <sup><sup><sub><a href="#toc">toc</a></sub></sup></sup>
@@ -402,6 +403,29 @@ Integrating Arkouda with cloud environments enables users to access Arkouda from

Arkouda provides a separate, dedicated zmq socket to enable generation and export of a variety of system, locale, user, and request metrics. Arkouda generates metrics in a format compatible with Prometheus, Grafana, and TimescaleDB. An Arkouda Prometheus exporter that serves as a Prometheus scrape target will be made available soon in the [arkouda-contrib](https://github.com/Bears-R-Us/arkouda-contrib) repository. A detailed discussion of Arkouda metrics is located in [METRICS.md](METRICS.md).

<a id="async_client"></a>
## Asynchronous Client

### Background

Arkouda has an alpha capability for asynchronous client-server communication that gives users feedback that a request has been submitted and is being processed by the Arkouda server. The initial asynchronous request capability targets multiuser Arkouda use cases, where a user may experience delays while the Arkouda server is processing requests from one or more other users.

### Configuration

To enable asynchronous client communications, set the ARKOUDA_REQUEST_MODE environment variable as follows:

```
export ARKOUDA_REQUEST_MODE=ASYNC
```

### Exiting the Python shell

As of January 2, 2023, exiting the Python shell in ASYNC request mode requires the following command:

```
ak.exit()
```

Comment on lines +406 to +428
Contributor

Is this information something we would want in GitHub pages? I know that the updates to our documentation are still in progress, but I think this may be something beneficial to broadcast to a larger audience as it's deployed.

Not 100% sure what the best place to put it would be, but I am thinking the top level for now.

Contributor Author

@Ethan-DeBandi99 yeah, good point, should be at top-level
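
For readers of the README section discussed above, here is a minimal sketch of how the `ARKOUDA_REQUEST_MODE` setting could be resolved into the `RequestMode` enum added in this PR. The `read_request_mode` helper and its `env` parameter are hypothetical, introduced only so the lookup can be tried without touching the real environment; the PR itself calls `os.getenv` once at module import time.

```python
import os
from enum import Enum
from typing import Mapping, Optional


class RequestMode(Enum):
    """Mirrors the enum in the PR: ASYNC or SYNC request flow."""

    ASYNC = "ASYNC"
    SYNC = "SYNC"


def read_request_mode(env: Optional[Mapping[str, str]] = None) -> RequestMode:
    """Hypothetical helper: resolve the request mode from an environment mapping.

    Falls back to SYNC when ARKOUDA_REQUEST_MODE is unset, and upper-cases
    the value so that `async` and `ASYNC` are treated the same.
    """
    env = os.environ if env is None else env
    return RequestMode(env.get("ARKOUDA_REQUEST_MODE", "SYNC").upper())
```

Under this sketch, an unrecognized value such as `ARKOUDA_REQUEST_MODE=FAST` raises `ValueError` from the enum constructor instead of silently falling back to SYNC.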

<a id="contrib-ak"></a>
## Contributing to Arkouda <sup><sup><sub><a href="#toc">toc</a></sub></sup></sup>

234 changes: 226 additions & 8 deletions arkouda/client.py
@@ -1,6 +1,9 @@
import asyncio
import json
import os
import threading
import warnings
from asyncio.exceptions import CancelledError
from enum import Enum
from typing import Dict, List, Mapping, Optional, Tuple, Union, cast

@@ -27,6 +30,8 @@
"get_server_commands",
"print_server_commands",
"ruok",
"exit",
"async_connect",
]

# stuff for zmq connection
@@ -84,6 +89,29 @@ def _mem_get_factor(unit: str) -> int:
clientLogger = getArkoudaLogger(name="Arkouda User Logger", logFormat="%(message)s")


class RequestMode(Enum):
    """
    The RequestMode enum provides controlled vocabulary indicating whether the
    Arkouda client is submitting a request via asyncio (ASYNC) or via standard,
    synchronous (SYNC) flow
    """

    ASYNC = "ASYNC"
    SYNC = "SYNC"

    def __str__(self) -> str:
        """
        Overridden method returns value.
        """
        return self.value

    def __repr__(self) -> str:
        """
        Overridden method returns value.
        """
        return self.value


class ClientMode(Enum):
"""
The ClientMode enum provides controlled vocabulary indicating whether the
@@ -107,11 +135,13 @@ def __repr__(self) -> str:
return self.value


requestMode = RequestMode(os.getenv("ARKOUDA_REQUEST_MODE", 'SYNC').upper())

# Get ClientMode, defaulting to UI
mode = ClientMode(os.getenv("ARKOUDA_CLIENT_MODE", "UI").upper())
clientMode = ClientMode(os.getenv("ARKOUDA_CLIENT_MODE", "UI").upper())

# Print splash message if in UI mode
if mode == ClientMode.UI:
# Print splash message if in UI clientMode
if clientMode == ClientMode.UI:
print("{}".format(pyfiglet.figlet_format("Arkouda")))
print(f"Client Version: {__version__}") # type: ignore

@@ -132,8 +162,146 @@ def set_defaults() -> None:
maxTransferBytes = maxTransferBytesDefVal


# create context, request end of socket, and connect to it
def get_event_loop() -> asyncio.AbstractEventLoop:
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)
    return loop


def async_connect(
    server: str = "localhost",
    port: int = 5555,
    timeout: int = 0,
    access_token: str = None,
    connect_url: str = None,
    loop: Optional[asyncio.AbstractEventLoop] = None
) -> None:
    if not loop:
        loop = get_event_loop()
    loop.run_in_executor(None,
                         _connect,
                         server,
                         port,
                         timeout,
                         access_token,
                         connect_url)


async def _run_async_connect(
    loop: asyncio.AbstractEventLoop,
    server: str = "localhost",
    port: int = 5555,
    timeout: int = 0,
    access_token: str = None,
    connect_url: str = None
) -> asyncio.Future:
    try:
        if not loop:
            loop = get_event_loop()
        future = loop.run_in_executor(None,
                                      _connect,
                                      server,
                                      port,
                                      timeout,
                                      access_token,
                                      connect_url)
        while not future.done() and not connected:
            clientLogger.info("connecting...")
            await asyncio.sleep(5)
    except CancelledError:
        future = asyncio.Future()
        future.set_result('cancelled')  # type: ignore
        future.cancel()
        import threading
        for t in threading.enumerate():
            if 'asyncio' in t.name:
                t.join(0)
    except KeyboardInterrupt:
        future = asyncio.Future()
        future.set_result('interrupted')  # type: ignore
        future.cancel()
    finally:
        return future


def exit():
    os._exit(0)


def connect(
    server: str = "localhost",
    port: int = 5555,
    timeout: int = 0,
    access_token: str = None,
    connect_url: str = None,
) -> None:
    """
    Connect to a running arkouda server.

    Parameters
    ----------
    server : str, optional
        The hostname of the server (must be visible to the current
        machine). Defaults to `localhost`.
    port : int, optional
        The port of the server. Defaults to 5555.
    timeout : int, optional
        The timeout in seconds for client send and receive operations.
        Defaults to 0 seconds, which is interpreted as no timeout.
    access_token : str, optional
        The token used to connect to an existing socket to enable access to
        an Arkouda server where authentication is enabled. Defaults to None.
    connect_url : str, optional
        The complete url in the format of tcp://server:port?token=<token_value>
        where the token is optional

    Returns
    -------
    None

    Raises
    ------
    ConnectionError
        Raised if there's an error in connecting to the Arkouda server
    ValueError
        Raised if there's an error in parsing the connect_url parameter
    RuntimeError
        Raised if there is a server-side error

    Notes
    -----
    On success, prints the connected address, as seen by the server. If called
    with an existing connection, the socket will be re-initialized.
    """
    if requestMode == RequestMode.ASYNC:
        try:
            loop = get_event_loop()

            task = loop.create_task(_run_async_connect(loop,
                                                       server,
                                                       port,
                                                       timeout,
                                                       access_token,
                                                       connect_url))
            loop.run_until_complete(task)
        except KeyboardInterrupt:
            task.cancel()
            loop.run_until_complete(loop.create_task(cancel_task()))
    else:
        _connect(server,
                 port,
                 timeout,
                 access_token,
                 connect_url)


async def cancel_task() -> None:
    await asyncio.sleep(0)
    logger.debug('task cancelled')


# create context, request end of socket, and connect to it
def _connect(
server: str = "localhost",
port: int = 5555,
timeout: int = 0,
@@ -394,9 +562,50 @@ def _start_tunnel(addr: str, tunnel_server: str) -> Tuple[str, object]:
raise ConnectionError(e)


def _send_string_message(
cmd: str, recv_binary: bool = False, args: str = None, size: int = -1
) -> Union[str, memoryview]:
async def _async_send_string_message(message: RequestMessage, loop=None) -> Union[str, memoryview]:
    try:
        if not loop:
            loop = get_event_loop()
        future = loop.run_in_executor(None, socket.send_string, json.dumps(message.asdict()))
        while not future.done():
            clientLogger.info(f"{message.cmd} request sent...")
            await asyncio.sleep(5)
        logger.debug(f'future result {future.result()}')
    except CancelledError:
        future.set_result('cancelled')
        future.cancel()
        for t in threading.enumerate():
            if 'asyncio' in t.name:
                t.join(0)
    except KeyboardInterrupt:
        future.set_result('interrupted')
        future.cancel()
    finally:
        return future


def _execute_async_send(message: RequestMessage, loop=None):
    try:
        if not loop:
            loop = get_event_loop()

        task = loop.create_task(_async_send_string_message(message, loop))
        if not task:
            logger.error('no task')

        loop.run_until_complete(task)
        logger.debug(f'task done? {task.done()}')
        logger.debug(f'task result {task.result()}')
    except KeyboardInterrupt:
        logger.debug('interrupted')
        task.cancel()
        loop.run_until_complete(loop.create_task(cancel_task()))
        logger.debug(f'is task done {task.done()}')


def _send_string_message(cmd: str,
recv_binary: bool = False,
args: str = None, size: int = -1) -> Union[str, memoryview]:
"""
Generates a RequestMessage encapsulating command and requesting
user information, sends it to the Arkouda server, and returns
@@ -438,7 +647,16 @@ def _send_string_message(

logger.debug(f"sending message {message}")

socket.send_string(json.dumps(message.asdict()))
loop = get_event_loop()

def asyncEligible(cmd: str) -> bool:
return cmd not in {'delete', 'connect', 'getconfig'}
Contributor

Moving forward, I am assuming we will have more commands, if not all, supported. Would we need to list every command here, or would this be removed once we support all commands?

Member

@stress-tess stress-tess Jan 4, 2023

I might be wrong but I think this is saying every command except delete, connect, and getconfig is async eligible (i.e. will return True)

This is supported by the fact that one of the mp4s has asynchronous ak.ones which is not in this set of commands

Contributor Author

@Ethan-DeBandi99 ah, sorry, as @pierce314159 said, this checks whether a command can be run in async mode. The reasons for excluding these three are as follows:

  1. delete: the delete command is sent from `__del__` on pdarray, which is also invoked on Arkouda shell exit(); at that point asyncio fails because modules have already been deleted, so delete cannot be run async
  2. connect: the Python client connect method is itself async-enabled
  3. getconfig: this is invoked within the async Python client connect method, which, again, is itself async-enabled
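
To make the eligibility check and its effect on dispatch concrete, here is a minimal, self-contained sketch. It is not the PR's code: `ASYNC_INELIGIBLE`, `send_with_feedback`, `dispatch`, and the `poll_seconds` parameter are hypothetical names, and `send` stands in for `socket.send_string`. The PR polls every 5 seconds and manages its own event loop, whereas this sketch uses `asyncio.run` for brevity.

```python
import asyncio
from typing import Any, Callable

# Commands that always take the synchronous path (the three discussed above).
ASYNC_INELIGIBLE = frozenset({"delete", "connect", "getconfig"})


def async_eligible(cmd: str) -> bool:
    """Every command except the three listed above is async eligible."""
    return cmd not in ASYNC_INELIGIBLE


async def send_with_feedback(
    send: Callable[[str], Any], payload: str, cmd: str, poll_seconds: float = 0.05
) -> Any:
    """Run a blocking send in a worker thread and report progress while waiting."""
    loop = asyncio.get_running_loop()
    future = loop.run_in_executor(None, send, payload)
    while not future.done():
        print(f"{cmd} request sent...")
        await asyncio.sleep(poll_seconds)
    return future.result()


def dispatch(send: Callable[[str], Any], payload: str, cmd: str, async_mode: bool) -> Any:
    """Choose the async path only when async mode is on and the command is eligible."""
    if async_mode and async_eligible(cmd):
        return asyncio.run(send_with_feedback(send, payload, cmd))
    return send(payload)
```

With this shape, `dispatch(send, payload, "ones", True)` goes through the executor and prints progress messages, while `dispatch(send, payload, "delete", True)` calls `send` directly, which matches the behavior described for `ak.ones` and delete above.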


if requestMode == RequestMode.ASYNC and asyncEligible(cmd):
_execute_async_send(message, loop)

else:
socket.send_string(json.dumps(message.asdict()))

if recv_binary:
frame = socket.recv(copy=False)
4 changes: 3 additions & 1 deletion arkouda/pdarrayclass.py
@@ -1,6 +1,7 @@
from __future__ import annotations

import builtins
import contextlib
from typing import List, Sequence, Union, cast

import numpy as np # type: ignore
@@ -184,7 +185,8 @@ def __init__(
self.shape = shape
self.itemsize = itemsize

def __del__(self):
@contextlib.contextmanager
def __del__(self, generic_msg=generic_msg):
try:
logger.debug(f"deleting pdarray with name {self.name}")
generic_msg(cmd="delete", args={"name": self.name})