Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feature/3689 kafka fallback #2306

Closed
wants to merge 10 commits into from
24 changes: 24 additions & 0 deletions .github/workflows/sync_branch.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
name: Sync Feature Branch with Develop

on:
pull_request:
types:
- opened
- synchronize

jobs:
sync-with-develop:
runs-on: ubuntu-latest

steps:
- name: Checkout code
uses: actions/checkout@v2

- name: Fetch and rebase develop
run: |
git fetch origin
git rebase origin/develop

- name: Push changes back to feature branch
run: |
git push origin HEAD
98 changes: 98 additions & 0 deletions doajtest/unit/test_kafka_producer.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
from redis import StrictRedis
from unittest.mock import patch, Mock, MagicMock
from doajtest.helpers import DoajTestCase
from portality.events.system_status_check import KafkaStatusCheck
import portality.events.kafka_producer as kafka_events
class TestKafkaStatusCheck(DoajTestCase):

def setUp(self):
super(TestKafkaStatusCheck, self).setUp()
self.kafka_status = KafkaStatusCheck()

@patch.object(StrictRedis, "get", return_value=b'True')
@patch.object(KafkaStatusCheck, "can_check_redis", return_value=True)
def test_active_status(self, mock_kafka_status_check, mock_strict_redis):
status = self.kafka_status.is_active()
self.assertTrue(status)
mock_kafka_status_check.assert_called_once()
mock_strict_redis.assert_called_once()

@patch.object(StrictRedis, "get", return_value=b'False')
@patch.object(KafkaStatusCheck, "can_check_redis", return_value=True)
def test_inactive_status(self, mock_kafka_status_check, mock_strict_redis):
status = self.kafka_status.is_active()
self.assertFalse(status)
mock_kafka_status_check.assert_called_once()
mock_strict_redis.assert_called_once()

class TestKafkaProducer(DoajTestCase):

# Test for handle_exception
@patch('portality.core.app.logger.exception')
@patch('portality.app_email.send_mail')
def test_handle_exception(self, mock_send_mail, mock_logger_exception):
error_msg = "Sample error"
exception = Exception("Sample exception")
kafka_events.handle_exception(error_msg, exception)

mock_logger_exception.assert_called_once_with(error_msg + str(exception))
mock_send_mail.assert_called_once()

# Test for kafka_producer when producer is None and no exception raised
@patch('kafka.KafkaProducer')
def test_kafka_producer_new(self, mock_kafka_producer):
kafka_events.producer = None
result = kafka_events.kafka_producer()

self.assertIsNotNone(result)
mock_kafka_producer.assert_called_once()

# Test for kafka_producer when producer is already set
def test_kafka_producer_existing(self):
kafka_events.producer = Mock()
result = kafka_events.kafka_producer()

self.assertEqual(result, kafka_events.producer)

# Test for kafka_producer when exception raised
@patch('kafka.KafkaProducer', side_effect=Exception("Kafka error"))
@patch('portality.events.kafka_producer.handle_exception')
def test_kafka_producer_exception(self, mock_handle_exception, _):
kafka_events.producer = None
result = kafka_events.kafka_producer()

self.assertIsNone(result)
mock_handle_exception.assert_called_once()

# Test for send_event when kafka_status is None
@patch('portality.events.kafka_producer.shortcircuit_send_event')
def test_send_event_status_none(self, mock_shortcircuit_send_event):
kafka_events.kafka_status = None

kafka_events.send_event(Mock())
mock_shortcircuit_send_event.assert_called_once()

# Test for send_event when everything is operational
@patch.object(KafkaStatusCheck, 'is_active', return_value=True)
@patch('portality.events.kafka_producer.kafka_producer', return_value=Mock())
@patch('portality.events.shortcircuit')
def test_send_event_operational(self, mock_shortcircuit, _, __):
kafka_events.kafka_status = KafkaStatusCheck()

kafka_events.send_event(Mock())
mock_shortcircuit.assert_not_called()

# Test for send_event when exception occurs
@patch('kafka.KafkaProducer', return_value=Mock(send=MagicMock(side_effect=Exception("Send error"))))
@patch.object(KafkaStatusCheck, 'set_kafka_inactive_redis')
@patch.object(KafkaStatusCheck, 'is_active', return_value=True)
@patch('portality.events.kafka_producer.shortcircuit_send_event')
@patch('portality.events.kafka_producer.handle_exception')
@patch('portality.events.kafka_producer.producer', new=None)
def test_send_event_exception(self, __, mock_handle_exception, mock_shortcircuit, _, mock_kafka_producer):
kafka_events.kafka_status = KafkaStatusCheck()

kafka_events.send_event(Mock())
mock_handle_exception.assert_called()
mock_shortcircuit.assert_called_once()
mock_kafka_producer.assert_called_once()
3 changes: 3 additions & 0 deletions portality/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,9 @@
BG_STATUS_STABLE = 'stable'
BG_STATUS_UNSTABLE = 'unstable'

# Redis Keys Constants
KAFKA_ACTIVE_STATUS = 'KAFKA_ACTIVE_STATUS'


class ConstantList:
@classmethod
Expand Down
3 changes: 3 additions & 0 deletions portality/events/kafka_consumer.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@

from portality.app import app as doajapp
from portality.bll import DOAJ
from portality import util
from portality.models import Event

broker = doajapp.config.get("KAFKA_BROKER")
Expand All @@ -12,6 +13,7 @@
topic = app.topic(topic_name)

event_counter = 0
event_logger = util.custom_timed_rotating_logger('consumer_log.log')


@app.agent(topic)
Expand All @@ -20,6 +22,7 @@ async def handle_event(stream):
with doajapp.test_request_context("/"):
svc = DOAJ.eventsService()
async for event in stream:
event_logger.info(event)
event_counter += 1
doajapp.logger.info(f"Kafka event count {event_counter}")
# TODO uncomment the following line once the Event model is fixed to Kafka
Expand Down
50 changes: 46 additions & 4 deletions portality/events/kafka_producer.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,54 @@
import json
from kafka import KafkaProducer
import kafka
from portality.core import app
from portality import app_email, util
from portality.events.shortcircuit import send_event as shortcircuit_send_event
from portality.events.system_status_check import KafkaStatusCheck

from portality.core import app as doajapp
bootstrap_server = doajapp.config.get("KAFKA_BOOTSTRAP_SERVER")

producer = KafkaProducer(bootstrap_servers=bootstrap_server, value_serializer=lambda v: json.dumps(v).encode('utf-8'))
def handle_exception(error_msg, exception):
app.logger.exception(error_msg + str(exception))
app_email.send_mail(
to=[app.config.get('ADMIN_EMAIL', '[email protected]')],
fro=app.config.get('SYSTEM_EMAIL_FROM', '[email protected]'),
subject='Alert: DOAJ Kafka send event error',
msg_body=error_msg + ": \n" + str(exception)
)

producer = None
event_logger = util.custom_timed_rotating_logger('producer_log.log')

def kafka_producer():
global producer
try:
if producer is None:
producer = kafka.KafkaProducer(bootstrap_servers=bootstrap_server, value_serializer=lambda v: json.dumps(v).encode('utf-8'))
except Exception as exp:
producer = None
handle_exception("Error in setting up kafka connection", exp)
return producer

try:
kafka_status = KafkaStatusCheck()
except Exception as exp:
kafka_status = None
handle_exception("Error in setting up Redis for kafka events", exp)


def send_event(event):
future = producer.send('events', value=event.serialise())
future.get(timeout=60)
try:
event_logger.info(event.data)
if kafka_status and kafka_status.is_active() and kafka_producer():
future = producer.send('events', value=event.serialise())
future.get(timeout=60)
else:
shortcircuit_send_event(event)
except Exception as e:
try:
kafka_status.set_kafka_inactive_redis()
except Exception as exp:
handle_exception("Failed to set kafka inactive status in Redis", exp)
shortcircuit_send_event(event)
handle_exception("Failed to send event to Kafka.", e)
3 changes: 3 additions & 0 deletions portality/events/shortcircuit.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
from portality.bll import DOAJ
from portality import util

event_logger = util.custom_timed_rotating_logger('shortcircuit_log.log')

def send_event(event):
event_logger.info(event.data)
svc = DOAJ.eventsService()
svc.consume(event)
45 changes: 45 additions & 0 deletions portality/events/system_status_check.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
import redis
from portality import constants
from portality.core import app
from portality.lib import dates

class KafkaStatusCheck:
"""
useful to set Kafka status to false when there is any issue connecting to Kafka.
If kafka status set to false in Redis, it has to be set to true manually in Radis after fixing the issue.
"""

def __init__(self):
self.is_kafka_active = True
self.time_gap = app.config['TIME_GAP_REDIS_KAFKA']
self.last_time = dates.now()
redis_host = app.config['HUEY_REDIS_HOST']
redis_port = app.config['HUEY_REDIS_PORT']
self.redis_conn = redis.StrictRedis(host=redis_host, port=redis_port, db=0)

def is_active(self):
if self.can_check_redis():
self.is_kafka_active = self.is_kafka_active_redis()
return self.is_kafka_active

def can_check_redis(self):
time_diff = dates.now() - self.last_time
if time_diff.seconds > self.time_gap:
self.last_time = dates.now()
return True

return False

def is_kafka_active_redis(self):
value = self.redis_conn.get(constants.KAFKA_ACTIVE_STATUS)
# set the key value if not set
if value is None:
self.set_default_kafka_status()
return True
return value.decode('utf-8').lower() == "true"

def set_default_kafka_status(self):
self.redis_conn.set(constants.KAFKA_ACTIVE_STATUS, "true")

def set_kafka_inactive_redis(self):
self.redis_conn.set(constants.KAFKA_ACTIVE_STATUS, "false")
2 changes: 2 additions & 0 deletions portality/settings.py
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,8 @@
KAFKA_BROKER = "kafka://localhost:9092"
KAFKA_EVENTS_TOPIC = "events"
KAFKA_BOOTSTRAP_SERVER = "localhost:9092"
# Time gap interval in seconds to query redis server
TIME_GAP_REDIS_KAFKA = 60 * 60

###########################################
# Read Only Mode
Expand Down
31 changes: 30 additions & 1 deletion portality/util.py
Original file line number Diff line number Diff line change
Expand Up @@ -187,6 +187,35 @@ def get_full_url_safe(endpoint):
app.logger.warning(f'endpoint not found -- [{endpoint}]')
return None


def no_op(*args, **kwargs):
""" noop (no operation) function """
pass
pass


def custom_timed_rotating_logger(file_name):
"""Custom Logger to log to specified file name"""
import os
import logging
from logging.handlers import TimedRotatingFileHandler
# Create a logger
logger = logging.getLogger(os.path.splitext(file_name)[0])
logger.setLevel(logging.DEBUG) # Set the logging level

# Get the user's home directory
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is probably best suited to the appdata folder - currently in ~/appdata/doaj/

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

updated log dir to ~/appdata/ doaj at log_dir = os.path.join(user_home, 'appdata', 'doaj')

user_home = os.path.expanduser("~")
log_dir = os.path.join(user_home, 'appdata', 'doaj')
if not os.path.exists(log_dir):
os.makedirs(log_dir)
log_filename = os.path.join(log_dir, file_name)
# Rotate every day. Keep 30 days worth of backup.
handler = TimedRotatingFileHandler(log_filename, when="D", interval=1, backupCount=30)

# Create a formatter and add it to the handler
formatter = logging.Formatter('%(asctime)s - %(levelname)s - %(message)s')
handler.setFormatter(formatter)

# Add the handler to the logger
logger.addHandler(handler)

return logger
Loading