Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add flush method that waits for in-flight requests #377

Merged
merged 1 commit into from
Mar 28, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 30 additions & 0 deletions bugsnag/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -336,6 +336,36 @@ def _leave_breadcrumb_for_event(self, event: Event) -> None:
BreadcrumbType.ERROR
)

def flush(self, timeout_ms: int) -> None:
# trigger session delivery as there may be outstanding sessions that
# haven't been sent yet
self.session_tracker.send_sessions()

stop_event = threading.Event()

def block_until_no_requests():
while (
self._request_tracker.has_in_flight_requests() or
self.session_tracker._request_tracker.has_in_flight_requests()
):
# wait 10ms before checking for in-flight requests again
was_stopped = stop_event.wait(0.01)

# stop checking and exit if the timeout has been exceeded
if was_stopped:
break

thread = threading.Thread(target=block_until_no_requests)
thread.start()
thread.join(timeout_ms / 1000)

if thread.is_alive():
# tell the thread to stop checking for in-flight requests as the
# timeout has been exceeded
stop_event.set()

raise Exception("flush timed out after %dms" % timeout_ms)


class ClientContext:
def __init__(self, client,
Expand Down
73 changes: 73 additions & 0 deletions tests/test_client.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
import os
import re
import sys
import time
import pytest
import inspect
import logging
import threading
from datetime import datetime, timedelta, timezone
from unittest.mock import Mock, ANY
from tests import fixtures
Expand Down Expand Up @@ -1749,6 +1751,77 @@ def test_in_flight_event_request_tracking_in_notify_exc_info_failure(self):

assert not client._request_tracker.has_in_flight_requests()

def test_flush_returns_immediately_when_no_requests_are_outstanding(self):
start_s = time.time()
self.client.flush(10)
end_s = time.time()

assert end_s - start_s < 0.01

def test_flush_raises_if_timeout_is_exceeded(self):
delivery = QueueingDelivery()
configuration = Configuration()
configuration.configure(delivery=delivery, api_key='abc')

client = Client(configuration)
client.notify(Exception('oh dear'))

with pytest.raises(Exception) as exception:
client.flush(10)

assert str(exception) == 'Exception: flush timed out after 10ms'

def test_flush_waits_for_outstanding_events_before_returning(self):
delivery = QueueingDelivery()
configuration = Configuration()
configuration.configure(delivery=delivery, api_key='abc')

client = Client(configuration)
client.notify(Exception('oh dear'))
client.notify(Exception('oh no'))
client.notify(Exception('oh my'))

def flush_request_queue():
time.sleep(0.05)
assert client._request_tracker.has_in_flight_requests()

delivery.flush_request_queue()
assert not client._request_tracker.has_in_flight_requests()

thread = threading.Thread(target=flush_request_queue)
thread.start()

client.flush(100)

# the thread should have stopped before flush could exit
assert not thread.is_alive()

def test_flush_waits_for_outstanding_sessions_before_returning(self):
delivery = QueueingDelivery()
configuration = Configuration()
configuration.configure(delivery=delivery, api_key='abc')

client = Client(configuration)
client.session_tracker.start_session()
client.session_tracker.start_session()

def flush_request_queue():
request_tracker = client.session_tracker._request_tracker

time.sleep(0.05)
assert request_tracker.has_in_flight_requests()

delivery.flush_request_queue()
assert not request_tracker.has_in_flight_requests()

thread = threading.Thread(target=flush_request_queue)
thread.start()

client.flush(100)

# the thread should have stopped before flush could exit
assert not thread.is_alive()


@pytest.mark.parametrize("metadata,type", [
(1234, 'int'),
Expand Down
Loading