Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Modify Message (optional parse_headers) and UnicastPrefixes (optional validation and required_fields) #10

Open
wants to merge 6 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions examples/log_consumer.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,11 @@ def process_message(msg):

elif t == "openbmp.parsed.unicast_prefix":
unicast_prefix = UnicastPrefix(m)

# Optional fixture writer
# with open('unicast_prefixes_message', "w+") as file:
# file.write(msg.value + '\n')

print '\n' + 'Received Message (' + t_stamp + ') : ' + m_tag + '(V: ' + str(m.version) + ')'
print unicast_prefix.to_json_pretty()

Expand Down
62 changes: 47 additions & 15 deletions src/openbmp/api/parsed/message/Base.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@

from abc import ABCMeta, abstractmethod
import json
import re


class Base(object):
Expand All @@ -24,6 +25,24 @@ class Base(object):

__metaclass__ = ABCMeta

@staticmethod
def isplit(string, delimiter=None):
    """
    Lazily yield the non-empty tokens of ``string`` separated by ``delimiter``.

    This is a lazy alternative to ``str.split``. Note one difference: tokens
    are matched as maximal runs of non-delimiter characters, so consecutive,
    leading or trailing delimiters never produce empty tokens (with an
    explicit delimiter ``str.split`` would return empty strings there).

    :param string: Text to tokenize.
    :param delimiter: Single character to split on, or None to split on any
                      whitespace character.

    :return: Generator producing the tokens in order of appearance.
    :raises ValueError: If ``delimiter`` is not exactly one character long.
    """
    if delimiter is None:
        # Handle whitespace by default.
        separator = r"\s"

    elif len(delimiter) != 1:
        # Format the offending value into the message instead of passing it as
        # a second exception argument (which renders as a tuple).
        raise ValueError(
            "Can only handle single character delimiters, got {!r}".format(delimiter))

    else:
        # Escape, in case it is a regex metacharacter such as "\", "^" or "]".
        separator = re.escape(delimiter)

    # The negated character class matches every run of non-delimiter characters.
    token_pattern = r"[^{}]+".format(separator)
    return (match.group(0) for match in re.finditer(token_pattern, string))

def __init__(self):
"""Initializes the class variables."""
# Default message bus specification version (max) supported
Expand Down Expand Up @@ -55,13 +74,18 @@ def get_row_map(self):
"""
return self.row_map

def parse(self, version, data):
def parse(self, version, data, validate=True, required_fields=None):
"""
Parse TSV rows of data from message

:param version: Float representation of maximum message bus specification version supported.
See http://openbmp.org/#!docs/MESSAGE_BUS_API.md for more details.
:param data: TSV data (MUST not include the headers)
:param validate: If required to validate every field with its corresponding processor
:param required_fields: If given, parse only a few fields to speed up parsing.
Example: {10: 'prefix', 11: "prefix_len"} where:
"10" and "11" - positions of fields in MESSAGE_BUS_API,
"prefix" and "prefix_len" - name of parsed fields in resulting dictionary.

:return: True if error, False if no errors
"""
Expand All @@ -76,20 +100,28 @@ def parse(self, version, data):
if len(self.header_names) == 0:
raise Exception("header_names should be overriden.")

records = data.splitlines()

# # Splits each record into fields.
for r in records:
fields = r.split('\t') # Fields of a record as array.

# # # Process and validate each field with its corresponding processor.
if len(fields) >= len(self.processors):
for i,processor in enumerate(self.processors):
fields[i] = processor.process_value(fields[i])

fields_dict = dict(zip(self.header_names, fields))

self.row_map.append(fields_dict)
# Splits each record into fields.
for record in Base.isplit(data, "\n"):
fields = record.split('\t') # Fields of a record as array.

fields_map = {}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks like your conflict merge reverted changes that should be here. Please check your merge conflict changes and correct here.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@TimEvens isn't it what I moved to lines 117-122? I don't see anything missing.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

And it's tested in "tests/test_unicast_prefix.py"


if required_fields:
for key in required_fields:
if validate:
processor_class = self.get_processors()[key]
fields_map[required_fields[key]] = processor_class.process_value(fields[key])
else:
fields_map[required_fields[key]] = fields[key]
else:
if len(fields) >= len(self.processors):
fields_map = dict(zip(self.header_names, fields))
if validate:
# Process and validate each field with its corresponding processor.
for (f, p, h) in zip(fields, self.get_processors(), self.header_names):
fields_map[h] = p.process_value(f)

self.row_map.append(fields_map)

def to_json(self):
"""
Expand Down
59 changes: 33 additions & 26 deletions src/openbmp/api/parsed/message/Message.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,12 @@ class Message(object):
TYPE_PEER = "PEER"
TYPE_ROUTER = "ROUTER"

def __init__(self, data=None):
def __init__(self, data=None, parse_headers=True):
"""
Handle the message by parsing header of it.

:param data: Raw Kafka message as string.
:param parse_headers: If headers parsing is required. May be disabled to speed up.
"""

if data and not data.strip(): # If "data" is not string, throws error.
Expand All @@ -35,49 +36,55 @@ def __init__(self, data=None):
self.content_pos = int()
self.router_ip = str()

self.__parse(data, parse_headers)

if data:
self.__parse(data)
self.__parse(data, parse_headers)

def __parse(self, data):
def __parse(self, data, parse_headers=True):
"""
Parses header of raw Kafka messages and set the version, length, number of records and router hash id.

:param data: Raw Kafka message as string.
:param parse_headers: If headers parsing is required. May be disabled to speed up.
"""

data_end_pos = data.find("\n\n")
header_data = data[:data_end_pos]
if parse_headers:
data_end_pos = data.find("\n\n")
header_data = data[:data_end_pos]

self.content_pos = data_end_pos + 2
self.content = data[self.content_pos:]
self.content_pos = data_end_pos + 2
self.content = data[self.content_pos:]

headers = header_data.split("\n")
headers = header_data.split("\n")

for header in headers:
value = header.split(":")[1].strip()
attr = header.split(":")[0].strip()
for header in headers:
value = header.split(":")[1].strip()
attr = header.split(":")[0].strip()

# Attribute names are from http://openbmp.org/#!docs/MESSAGE_BUS_API.md headers
if attr == "V":
self.version = float(value)
# Attribute names are from http://openbmp.org/#!docs/MESSAGE_BUS_API.md headers
if attr == "V":
self.version = float(value)

elif attr == "C_HASH_ID":
self.collector_hash_id = value
elif attr == "C_HASH_ID":
self.collector_hash_id = value

elif attr == "T":
self.type = value
elif attr == "T":
self.type = value

elif attr == "L":
self.length = long(value)
elif attr == "L":
self.length = long(value)

elif attr == "R":
self.records = long(value)
elif attr == "R":
self.records = long(value)

elif attr == "R_HASH_ID":
self.router_hash_id = value
elif attr == "R_HASH_ID":
self.router_hash_id = value

elif attr == "R_IP":
self.router_ip = value
elif attr == "R_IP":
self.router_ip = value
else:
self.content = data

def get_version(self):
return self.version
Expand Down
10 changes: 8 additions & 2 deletions src/openbmp/api/parsed/message/UnicastPrefix.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,11 +48,17 @@ class UnicastPrefix(Base):
MsgBusFields.ORIGINATOR_ID.get_name()
]

def __init__(self, message):
def __init__(self, message, validate=True, required_fields=None):
"""
Handle the message by parsing it and storing the data in memory.

:param message: 'Message' object.
:param validate: If required to validate every field with its corresponding processor
:param required_fields: If given, parse only a few fields to speed up parsing.
Example: {10: 'prefix', 11: "prefix_len"} where:
"10" and "11" - positions of fields in MESSAGE_BUS_API,
"prefix" and "prefix_len" - name of parsed fields in resulting dictionary.
"""
if not isinstance(message, Message):
raise TypeError("Expected Message object instead of type " + type(message))
Expand Down Expand Up @@ -84,7 +90,7 @@ def __init__(self, message):
self.processors = self.get_processors()

if data:
self.parse(version, data)
self.parse(version, data, validate=validate, required_fields=required_fields)

def get_processors(self):
"""
Expand Down
8 changes: 8 additions & 0 deletions tests/FixturesBasedTestCase.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
import unittest


class FixturesBasedTestCase(unittest.TestCase):
    """Base test case that loads the raw message fixture used by the parser tests."""

    def setUp(self):
        """Read the unicast prefix fixture into ``self.unicast_prefix_message``."""
        import os

        # Resolve the fixture relative to this module rather than the current
        # working directory, so the tests do not depend on where they are run from.
        fixtures_dir = os.path.join(os.path.dirname(os.path.abspath(__file__)), 'fixtures')
        fixture_path = os.path.join(fixtures_dir, 'unicast_prefixes_message')

        with open(fixture_path, 'r') as unicast_prefix_message_file:
            self.unicast_prefix_message = unicast_prefix_message_file.read()
Empty file added tests/__init__.py
Empty file.
7 changes: 7 additions & 0 deletions tests/fixtures/unicast_prefixes_message
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
V: 1.5
C_HASH_ID: 9ae8148974c9ca01ec9271753426d214
L: 290976
R: 2

add 465863 2783130a95f3ee3c238f38eb1b898b81 eee2f394c09f96c2453bf83989eaa1a2 172.20.0.1 d651b1d601e34826247374cb1430c91c 7aefefa6df92a1fccf2fd8a50d8900de 12.12.12.12 1403 2017-06-22 23:33:04.101567 12.12.12.0 20 1 igp 1403 999 777 3 9829 14.14.14.14 0 0 14032:1299 0 1 0 1 1
add 465864 b1c8a35dcec465d155e6d31f190153fa eee2f394c09f96c2453bf83989eaa1a2 172.20.0.1 d651b1d601e34826247374cb1430c91c 7aefefa6df92a1fccf2fd8a50d8900de 12.12.123.123 1403 2017-06-22 23:33:04.101567 12.12.123.0 22 1 igp 1403 999 777 3 9829 14.14.14.144 0 0 14032:1299 0 1 0 1 1
28 changes: 28 additions & 0 deletions tests/test_message.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
from FixturesBasedTestCase import FixturesBasedTestCase

from openbmp.api.parsed.message import Message


class MessageTest(FixturesBasedTestCase):
    """Checks which header attributes Message exposes for the fixture message."""

    def test_headers_parsing(self):
        """
        By default the fixture headers are parsed into the message attributes
        """
        parsed = Message(self.unicast_prefix_message)

        expectations = (
            (1.5, parsed.version),
            ("9ae8148974c9ca01ec9271753426d214", parsed.collector_hash_id),
            (290976, parsed.length),
            (2, parsed.records),
        )
        for expected, actual in expectations:
            self.assertEqual(expected, actual)

    def test_disabled_headers_parsing(self):
        """
        With parse_headers=False the header attributes stay at their empty values
        """
        unparsed = Message(self.unicast_prefix_message, parse_headers=False)

        expectations = (
            (0.0, unparsed.version),
            ("", unparsed.collector_hash_id),
            (0, unparsed.length),
            (0, unparsed.records),
        )
        for expected, actual in expectations:
            self.assertEqual(expected, actual)
51 changes: 51 additions & 0 deletions tests/test_unicast_prefix.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
from FixturesBasedTestCase import FixturesBasedTestCase

from openbmp.api.parsed.message import Message, UnicastPrefix


class UnicastPrefixTest(FixturesBasedTestCase):
    """Checks UnicastPrefix parsing of the fixture message in its supported modes."""

    def test_default_parsing(self):
        """
        Default parsing yields one row per record with processed field values
        """
        rows = UnicastPrefix(Message(self.unicast_prefix_message)).get_row_map()

        self.assertEqual(2, len(rows))

        first, second = rows[0], rows[1]

        self.assertEqual("12.12.12.0", first['prefix'])
        self.assertEqual(20, first['prefix_len'])

        self.assertEqual("12.12.123.0", second['prefix'])
        self.assertEqual(22, second['prefix_len'])

    def test_parsing_without_validation(self):
        """
        With validation disabled the same fields are produced, but numerical
        fields are left as strings
        """
        parsed_message = Message(self.unicast_prefix_message)
        rows = UnicastPrefix(parsed_message, validate=False).get_row_map()

        self.assertEqual("20", rows[0]['prefix_len'])

    def test_parsing_with_required_fields(self):
        """
        A custom required_fields mapping selects a field by position and
        stores it under the caller-chosen name
        """
        parsed_message = Message(self.unicast_prefix_message)
        wanted = {11: "my_custom_prefix_name"}

        # Without validation the raw string is kept
        raw_rows = UnicastPrefix(
            parsed_message, validate=False, required_fields=wanted
        ).get_row_map()

        self.assertEqual('20', raw_rows[0]['my_custom_prefix_name'])

        # With validation the value is run through its processor
        validated_rows = UnicastPrefix(
            parsed_message, validate=True, required_fields=wanted
        ).get_row_map()

        self.assertEqual(20, validated_rows[0]['my_custom_prefix_name'])