
Msg tracker - parsing #24

Merged · 3 commits · May 13, 2024
Changes from 2 commits
54 changes: 54 additions & 0 deletions src/mesh_analysis/README.md
@@ -0,0 +1,54 @@
# Structure (WIP)

This module is composed of two parts, `Reader` and `Tracer`.
With this structure, we intend to abstract the parsing of the logs without having to make too many
changes in the code. As long as we have the implementation ready for the specific node we will run
(whether Waku, Nomos, Codex...), handing the data over to the visualization part should be plug and play.

## Reader
`Reader` is an abstraction of how the node logs will be parsed.
Ideally, we only care about **reading** them, but the logs can come from various sources:

1. Log files
2. Kubernetes API
3. Grafana-Loki
4. Other sources

Having an abstract `Reader` class helps hide all unnecessary complexity from the upper layer,
where each reader handles its own behaviour in the `read` method.

Apart from this, the `Reader` will use a specific `Tracer`.
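
As a minimal sketch, another source from the list above could plug into the same abstraction like this (the `KubernetesReader` name and its internals are hypothetical, not part of this module):

```python
# Python Imports
import pandas as pd

# Project Imports
from src.mesh_analysis.readers.reader import Reader
from src.mesh_analysis.tracers.message_tracer import MessageTracer


class KubernetesReader(Reader):
    """Hypothetical reader that pulls pod logs instead of local files."""

    def __init__(self, namespace: str, tracer: MessageTracer):
        self._namespace = namespace
        self._tracer = tracer

    def read(self) -> pd.DataFrame:
        # Gather and parse the logs of every pod, then hand them to the tracer
        parsed_logs = [self._parse_pod_log(pod) for pod in self._get_pods()]
        return self._tracer.trace(parsed_logs)

    def _get_pods(self):
        # Would query the Kubernetes API; out of scope for this sketch
        raise NotImplementedError

    def _parse_pod_log(self, pod):
        # Would match each log line against self._tracer.patterns,
        # mirroring what the file-based reader does
        raise NotImplementedError
```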


## Tracer

This part is used by the `Reader` to specify which data from the logs is of interest, and to
retrieve it in the desired format.

As we might be interested in different patterns or different information depending on the node we are
running, we need to implement a specific `Tracer` for each one.
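
For instance, a hypothetical tracer for another node type might look like the following sketch (the `NomosTracer` name and its pattern are purely illustrative):

```python
# Python Imports
import re
import pandas as pd
from typing import List

# Project Imports
from src.mesh_analysis.tracers.message_tracer import MessageTracer


class NomosTracer(MessageTracer):
    """Hypothetical tracer: each node type defines its own log patterns."""

    def __init__(self):
        # Illustrative pattern; a real Nomos log format would differ
        self._patterns = [re.compile(r'^(\S+) .* block_id=(\w+)$')]

    def trace(self, parsed_logs: List) -> pd.DataFrame:
        # Flatten the matches of the first pattern into a dataframe
        rows = [match for log in parsed_logs for match in log[0]]
        return pd.DataFrame(rows, columns=['timestamp', 'block_id'])
```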

### Waku Tracer

#### Message Tracking

In order to reconstruct the trace of a message, we need to check, for every node X, when that
specific node received the message. Also, to represent X, we need to know its node id.

Currently, Waku writes a log line when it receives a message, including the `timestamp`, `msg_hash` and `sender_peer_id`.
We also need node X's own `peer_id`, which we obtain by checking the beginning of its log.
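
For illustration, abbreviated and entirely hypothetical log lines carrying these fields could look like:

```
2024-04-22 14:06:58.001+00:00 TRC received relay message ... msg_hash=0x1 ... sender_peer_id=16Uiu2HAmA
INF ... Announcing addresses ... [/ip4/10.0.0.2/tcp/60000/p2p/16Uiu2HAmB]
```

The first kind of line yields `timestamp`, `msg_hash` and `sender_peer_id`; the second yields the node's announced address, whose last `/`-separated segment is its own `peer_id` (used as `receiver_peer_id`).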

With this information, we can create a dataframe with the following columns:

| **timestamp** | **msg_hash** | **sender_peer_id** | **receiver_peer_id** |
|:-----------------------------:|:------------:|:------------------:|:--------------------:|
| 2024-04-22 14:06:58.001+00:00 | 0x1 | A | B |
| 2024-04-22 14:06:58.002+00:00 | 0x1 | B | D |
| 2024-04-22 14:06:58.003+00:00 | 0x1 | A | C |
| ... | ... | ... | ... |

This DataFrame uses `timestamp` and `msg_hash` as indexes, which lets us quickly and easily query
information within time ranges, or by a specific `msg_hash`. With this in place, we can easily
represent the information in the `visualizer` module.
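
A minimal sketch of querying such an index (hashes, peer ids and timestamps are made up):

```python
import pandas as pd

df = pd.DataFrame(
    [
        ['2024-04-22 14:06:58.001+00:00', '0x1', 'A', 'B'],
        ['2024-04-22 14:06:58.002+00:00', '0x1', 'B', 'D'],
        ['2024-04-22 14:06:58.003+00:00', '0x1', 'A', 'C'],
    ],
    columns=['timestamp', 'msg_hash', 'sender_peer_id', 'receiver_peer_id'],
)
df['timestamp'] = pd.to_datetime(df['timestamp'])
df.set_index(['timestamp', 'msg_hash'], inplace=True)
df.sort_index(inplace=True)

# Every hop of one message across the mesh
hops = df.xs('0x1', level='msg_hash')

# Everything received within a time window
window = df.loc['2024-04-22 14:06:58.001+00:00':'2024-04-22 14:06:58.002+00:00']
```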

- [ ] TODO: Add link to `visualizer` README.md when it is done.
Empty file added src/mesh_analysis/__init__.py
Empty file.
Empty file added src/mesh_analysis/readers/__init__.py
Empty file.
56 changes: 56 additions & 0 deletions src/mesh_analysis/readers/file_reader.py
@@ -0,0 +1,56 @@
# Python Imports
import logging
import multiprocessing
import sys
import pandas as pd
from typing import List
from pathlib import Path

# Project Imports
from src.utils import file_utils
from src.mesh_analysis.readers.reader import Reader
from src.mesh_analysis.tracers.message_tracer import MessageTracer


logger = logging.getLogger(__name__)


class FileReader(Reader):

    def __init__(self, folder: str, tracer: MessageTracer):
        self._folder = folder
        self._tracer = tracer

    def read(self) -> pd.DataFrame:
        logger.info(f'Reading {self._folder}')
        files_result = file_utils.get_files_from_folder_path(Path(self._folder))

        if files_result.is_ok():
            parsed_logs = self._read_files(files_result.ok_value)
            logger.info(f'Tracing {self._folder}')
            df = self._tracer.trace(parsed_logs)
        else:
            logger.error(f'Could not read {self._folder}')
            sys.exit(1)

        return df

    def _read_files(self, files: List) -> List:
        # TODO: set this as a parameter?
        num_processes = multiprocessing.cpu_count()
        with multiprocessing.Pool(processes=num_processes) as pool:
            parsed_logs = pool.map(self._read_file_patterns, files)

        return parsed_logs

    def _read_file_patterns(self, file: str) -> List:
        # One list of matches per pattern exposed by the tracer
        results = [[] for _ in self._tracer.patterns]

        with open(Path(self._folder) / file) as log_file:
            for line in log_file:
                # A line is attributed to the first pattern it matches
                for i, pattern in enumerate(self._tracer.patterns):
                    match = pattern.match(line)
                    if match:
                        results[i].append(list(match.groups()))
                        break

        return results
10 changes: 10 additions & 0 deletions src/mesh_analysis/readers/reader.py
@@ -0,0 +1,10 @@
# Python Imports
import pandas as pd
from abc import ABC, abstractmethod


class Reader(ABC):

    @abstractmethod
    def read(self) -> pd.DataFrame:
        pass
Empty file added src/mesh_analysis/tracers/__init__.py
Empty file.
23 changes: 23 additions & 0 deletions src/mesh_analysis/tracers/message_tracer.py
@@ -0,0 +1,23 @@
# Python Imports
import pandas as pd
from abc import ABC, abstractmethod
from typing import List


# Project Imports


class MessageTracer(ABC):

    @abstractmethod
    def __init__(self):
        self._patterns = None

    @property
    def patterns(self) -> List:
        return self._patterns

    @abstractmethod
    def trace(self, parsed_logs: List) -> pd.DataFrame:
        pass
37 changes: 37 additions & 0 deletions src/mesh_analysis/tracers/waku_tracer.py
@@ -0,0 +1,37 @@
# Python Imports
import re
import pandas as pd
from typing import List


# Project Imports
from src.mesh_analysis.tracers.message_tracer import MessageTracer


class WakuTracer(MessageTracer):

    def __init__(self):
        # TODO: Improve patterns as:
        # - Different patterns (received, sent, dropped)
        # - Once one pattern search is completed, stop searching for it in the logs (ie: Announce Address)
        self._patterns = [re.compile(
            r'^(\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}\.\d+\+\d{2}:\d{2}) .* msg_hash=([a-fA-F0-9x]+) .* sender_peer_id=([A-Za-z0-9]+)$'),
            re.compile(r'.* Announcing addresses .*\[([^]]+)\]$')]

    def trace(self, parsed_logs: List) -> pd.DataFrame:
        return self._trace_message_in_logs(parsed_logs)

    def _trace_message_in_logs(self, parsed_logs: List) -> pd.DataFrame:
        # Keep only the nodes that logged at least one received message
        parsed_logs = [log for log in parsed_logs if len(log[0]) > 0]

        # Merge received message info + own ID (the node's announced address)
        res = [message + node[1][0] for node in parsed_logs for message in node[0]]

        df = pd.DataFrame(res, columns=['timestamp', 'msg_hash', 'sender_peer_id', 'receiver_peer_id'])
        # The announced address ends in '/p2p/<peer_id>'; keep only the peer id
        df['receiver_peer_id'] = df['receiver_peer_id'].apply(lambda x: x.split('/')[-1])
        df.set_index(['timestamp', 'msg_hash'], inplace=True)
        df.sort_index(inplace=True)

        return df
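
Taken together, a hypothetical end-to-end use of these pieces (the folder path is illustrative) could look like:

```python
from src.mesh_analysis.readers.file_reader import FileReader
from src.mesh_analysis.tracers.waku_tracer import WakuTracer

# Hypothetical folder containing one log file per Waku node
reader = FileReader('logs/waku/', WakuTracer())
df = reader.read()

# df is indexed by (timestamp, msg_hash), ready for the visualizer module
print(df.head())
```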