Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add support for a deadline on clients #69

Draft
wants to merge 3 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 21 additions & 2 deletions client.cc
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,17 @@
namespace lisp {
namespace lisp_grpc {

// Creates a gpr_timespec equal to the 'time_s' in seconds.
// This gpr_timespec can then be used as a deadline for a grpc call.
//
// Initial implmentation pulled from grpc tests:
// https://github.com/grpc/grpc/blob/9a606ec6180812fbb2582646b1aeab1dfeb13475/test/core/test_util/test_config.cc#L78
gpr_timespec grpc_timeout_seconds_to_deadline(size_t time_s) {
return gpr_time_add(
gpr_now(GPR_CLOCK_MONOTONIC),
gpr_time_from_millis(static_cast<size_t>(1e3) * time_s, GPR_TIMESPAN));
}

extern "C" {

// Creates a grpc_call* given a 'channel', which manages the
Expand All @@ -30,11 +41,19 @@ extern "C" {
// grpc_completion_queue_pluck or grpc_completion_queue_next is called.
grpc_call* lisp_grpc_channel_create_call(grpc_channel* channel,
const char* call_name,
grpc_completion_queue* cq) {
grpc_completion_queue* cq,
size_t* deadline_seconds) {
gpr_timespec send_deadline;
if (deadline_seconds != nullptr) {
send_deadline = grpc_timeout_seconds_to_deadline(*deadline_seconds);
} else {
send_deadline = gpr_inf_future(GPR_CLOCK_MONOTONIC);
}

return grpc_channel_create_call(
channel, nullptr, GRPC_PROPAGATE_DEFAULTS, cq,
grpc_slice_from_copied_string(call_name), nullptr,
gpr_inf_future(GPR_CLOCK_MONOTONIC), nullptr);
send_deadline, nullptr);
}

// Prepares ops for completion queue pluck/next
Expand Down
17 changes: 12 additions & 5 deletions client.lisp
Original file line number Diff line number Diff line change
Expand Up @@ -45,12 +45,19 @@ additional args ARGS for client information. If CREDS is passed then a secure
channel will be created using CREDS else an insecure channel will be used."
(c-grpc-client-new-channel creds target args))

(defun service-method-call (channel call-name cq)
(defun service-method-call (channel call-name cq &optional (deadline-seconds *call-deadline*))
"A wrapper to create a grpc_call pointer that will be used to call CALL-NAME
on the CHANNEL provided and store the result in the completion queue CQ."
(cffi:foreign-funcall "lisp_grpc_channel_create_call"
:pointer channel :string call-name :pointer cq
:pointer))
on the CHANNEL provided and store the result in the completion queue CQ. A
deadline can be set with DEADLINE-SECONDS."
(let ((deadline-pointer (if deadline-seconds
(cffi:foreign-alloc :size :initial-element deadline-seconds)
(cffi:null-pointer))))
(prog1
(cffi:foreign-funcall "lisp_grpc_channel_create_call"
:pointer channel :string call-name :pointer cq :pointer deadline-pointer
:pointer)
(unless (cffi:null-pointer-p deadline-pointer)
(cffi:foreign-free deadline-pointer)))))


;; Auxiliary Functions
Expand Down
3 changes: 2 additions & 1 deletion grpc.lisp
Original file line number Diff line number Diff line change
Expand Up @@ -18,4 +18,5 @@
#:with-insecure-channel
#:with-ssl-channel
#:grpc-call
#:check-server-status))
#:check-server-status
#:*call-deadline*))
3 changes: 3 additions & 0 deletions shared.lisp
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,9 @@
(defvar *completion-queue* nil "The global completion queue used to
manage grpc calls.")

(defvar *call-deadline* nil
"The deadline for a grpc call. The default, nil, specifies an infinite deadline")
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think this is something that should be a defvar, more something you specify, and the user can make a defvar and set in their code...


;; gRPC Enums
(cffi:defcenum grpc-security-level
"Security levels of grpc transport security. It represents an inherent
Expand Down
58 changes: 56 additions & 2 deletions tests/integration-test.lisp
Original file line number Diff line number Diff line change
Expand Up @@ -40,8 +40,8 @@ Parameters
:utf-8))
:action
(lambda (message call)
(declare (ignore call))
(format t "~% response: ~A ~%" message)
(format t "~% call: ~A ~%" call)
(format t " response: ~A ~%" message)
(concatenate 'string
message
" Back"))))
Expand Down Expand Up @@ -75,3 +75,57 @@ Parameters
(assert-true (string= actual-client-response expected-client-response))
(bordeaux-threads:join-thread thread)))))
(grpc:shutdown-grpc))

(deftest test-client-server-deadline-can-set (server-suite)
"TODO(michaeldelago): The server should receive the deadline and be able to confirm that it's set"
(grpc:init-grpc)
(unwind-protect
(let* ((expected-client-response "Hello World Back")
(hostname "localhost")
(method-name "xyz")
(port-number 8000)
(sem (bordeaux-threads:make-semaphore))
(thread (bordeaux-threads:make-thread
(lambda () (run-server sem hostname method-name
port-number)))))
(bordeaux-threads:wait-on-semaphore sem)
(grpc:with-insecure-channel
(channel
(concatenate 'string hostname ":" (write-to-string port-number)))
(let* ((grpc::*call-deadline* 3)
(message "Hello World")
(response (grpc:grpc-call channel method-name
(flexi-streams:string-to-octets message)
nil nil))
(actual-client-response (flexi-streams:octets-to-string
(car response))))
(assert-true (string= actual-client-response expected-client-response))
(bordeaux-threads:join-thread thread)))))
(grpc:shutdown-grpc))

(deftest test-client-server-deadline-can-fail (server-suite)
"TODO(michaeldelago): The server should return an error if the deadline is too high"
(grpc:init-grpc)
(unwind-protect
(let* ((expected-client-response "Hello World Back")
(hostname "localhost")
(method-name "xyz")
(port-number 8000)
(sem (bordeaux-threads:make-semaphore))
(thread (bordeaux-threads:make-thread
(lambda () (run-server sem hostname method-name
port-number)))))
(bordeaux-threads:wait-on-semaphore sem)
(grpc:with-insecure-channel
(channel
(concatenate 'string hostname ":" (write-to-string port-number)))
(let* ((grpc::*call-deadline* 3)
(message "Hello World")
(response (grpc:grpc-call channel method-name
(flexi-streams:string-to-octets message)
nil nil))
(actual-client-response (flexi-streams:octets-to-string
(car response))))
(assert-true (string= actual-client-response expected-client-response))
(bordeaux-threads:join-thread thread)))))
(grpc:shutdown-grpc))
Loading