Msg tracker - parsing (#24)
* WIP: Parsing and creating df from log files done

* First tracking version structure

* Add PR suggestions
AlbertoSoutullo authored May 13, 2024
1 parent 8fe6320 commit c5ce9db
Showing 8 changed files with 176 additions and 0 deletions.
54 changes: 54 additions & 0 deletions src/mesh_analysis/README.md
@@ -0,0 +1,54 @@
# Structure (WIP)

This module is composed of two parts, `Reader` and `Tracer`.
With this structure, we intend to abstract the parsing of the logs without having to make many
changes in the code. As long as we have the implementation ready for the specific node we will run
(be it Waku, Nomos, Codex...), handing the data over to the visualization part should be plug and play.

## Reader
`Reader` is an abstraction of how the node logs will be parsed.
Ideally, we only care about **reading** them, but they can come from various sources:

1. Log files
2. Kubernetes API
3. Grafana-Loki
4. Other sources

Having an abstract `Reader` class helps hide all unnecessary complexity from the upper layer,
where each reader handles its own behaviour in its `read` method.

Apart from this, each `Reader` uses a specific `Tracer`.
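
As a minimal sketch of what plugging in a new source would look like (the `KubernetesReader` below is illustrative, not part of this module):

```python
# Illustrative sketch: same Reader/Tracer contract, different log source.
import pandas as pd

from src.mesh_analysis.readers.reader import Reader
from src.mesh_analysis.tracers.message_tracer import MessageTracer


class KubernetesReader(Reader):

    def __init__(self, namespace: str, tracer: MessageTracer):
        self._namespace = namespace  # hypothetical: where the pods live
        self._tracer = tracer

    def read(self) -> pd.DataFrame:
        parsed_logs = self._fetch_and_parse_logs()
        return self._tracer.trace(parsed_logs)

    def _fetch_and_parse_logs(self) -> list:
        # Placeholder: a real reader would pull log lines from the Kubernetes API
        # and match them against self._tracer.patterns, as FileReader does.
        raise NotImplementedError
```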


## Tracer

This part is used by the `Reader` to specify which data is interesting in the logs, and to
retrieve it in the desired format.

As we might be interested in different patterns or different information depending on the node we are
running, we need to implement a specific `Tracer` for each one.

### Waku Tracer

#### Message Tracking

In order to reconstruct the trace of a message, we need to check, for every node X, when that
specific node received the message. Also, to identify X, we need to know its node id.

Currently, Waku logs when it receives a message, together with the `timestamp`, `msg_hash` and `sender_peer_id`.
We also need node X's own `peer_id`, which is announced at the beginning of its log.
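
For illustration, the log lines we look for have roughly this shape (inferred from the regexes in `waku_tracer.py` further below; the values here are made up):

```
2024-04-22 14:06:58.001+00:00 ... msg_hash=0x1 ... sender_peer_id=A
... Announcing addresses ... [/ip4/10.0.0.2/tcp/60000/p2p/B]
```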

With this information, we are able to create a dataframe with the following structure:

| **timestamp** | **msg_hash** | **sender_peer_id** | **receiver_peer_id** |
|:-----------------------------:|:------------:|:------------------:|:--------------------:|
| 2024-04-22 14:06:58.001+00:00 | 0x1 | A | B |
| 2024-04-22 14:06:58.002+00:00 | 0x1 | B | D |
| 2024-04-22 14:06:58.003+00:00 | 0x1 | A | C |
| ... | ... | ... | ... |

This dataframe has `timestamp` and `msg_hash` as indexes. This allows us to
query information within time ranges, or by a certain `msg_hash`, quickly and easily. Then, with
this in place, we can easily represent this information in the `visualizer` module.
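
As a quick sketch of the kind of queries this index enables (rebuilding the made-up rows from the table above):

```python
import pandas as pd

# Stand-in for the dataframe built by the Waku tracer (values from the table above).
df = pd.DataFrame(
    [['2024-04-22 14:06:58.001+00:00', '0x1', 'A', 'B'],
     ['2024-04-22 14:06:58.002+00:00', '0x1', 'B', 'D'],
     ['2024-04-22 14:06:58.003+00:00', '0x1', 'A', 'C']],
    columns=['timestamp', 'msg_hash', 'sender_peer_id', 'receiver_peer_id'],
).set_index(['timestamp', 'msg_hash']).sort_index()

# All hops of a given message:
hops = df.xs('0x1', level='msg_hash')

# All rows within a time range (slicing the first index level works because it is sorted):
window = df.loc['2024-04-22 14:06:58.001+00:00':'2024-04-22 14:06:58.002+00:00']
```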

- [ ] TODO: Add link to `visualizer` README.md when it is done.
Empty file added src/mesh_analysis/__init__.py
56 changes: 56 additions & 0 deletions src/mesh_analysis/readers/file_reader.py
@@ -0,0 +1,56 @@
# Python Imports
import logging
import multiprocessing
import pandas as pd
from typing import List
from pathlib import Path

# Project Imports
from src.utils import file_utils
from src.mesh_analysis.readers.reader import Reader
from src.mesh_analysis.tracers.message_tracer import MessageTracer


logger = logging.getLogger(__name__)


class FileReader(Reader):

    def __init__(self, folder: str, tracer: MessageTracer):
        self._folder = folder
        self._tracer = tracer

    def read(self) -> pd.DataFrame:
        logger.info(f'Reading {self._folder}')
        files_result = file_utils.get_files_from_folder_path(Path(self._folder))

        if files_result.is_err():
            logger.error(f'Could not read {self._folder}')
            exit(1)

        parsed_logs = self._read_files(files_result.ok_value)
        logger.info(f'Tracing {self._folder}')
        df = self._tracer.trace(parsed_logs)

        return df

    def _read_files(self, files: List) -> List:
        # TODO: set this as a parameter?
        num_processes = multiprocessing.cpu_count()
        with multiprocessing.Pool(processes=num_processes) as pool:
            parsed_logs = pool.map(self._read_file_patterns, files)

        return parsed_logs

    def _read_file_patterns(self, file: str) -> List:
        # One result bucket per tracer pattern, in the same order as the patterns.
        results = [[] for _ in self._tracer.patterns]

        with open(Path(self._folder) / file) as log_file:
            for line in log_file:
                for i, pattern in enumerate(self._tracer.patterns):
                    match = pattern.match(line)
                    if match:
                        results[i].append(list(match.groups()))
                        break

        return results
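
For reference, a minimal usage sketch of this reader together with the Waku tracer below (the `logs/` folder name is illustrative):

```python
from src.mesh_analysis.readers.file_reader import FileReader
from src.mesh_analysis.tracers.waku_tracer import WakuTracer

# Parse every log file in the folder and build the message dataframe.
reader = FileReader('logs/', WakuTracer())
df = reader.read()
```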
10 changes: 10 additions & 0 deletions src/mesh_analysis/readers/reader.py
@@ -0,0 +1,10 @@
# Python Imports
import pandas as pd
from abc import ABC, abstractmethod


class Reader(ABC):

    @abstractmethod
    def read(self) -> pd.DataFrame:
        pass
20 changes: 20 additions & 0 deletions src/mesh_analysis/tracers/message_tracer.py
@@ -0,0 +1,20 @@
# Python Imports
import pandas as pd
from abc import ABC, abstractmethod
from typing import List, Optional

# Project Imports


class MessageTracer(ABC):

    def __init__(self):
        self._patterns: Optional[List] = None

    @property
    def patterns(self) -> List:
        return self._patterns

    @abstractmethod
    def trace(self, parsed_logs: List) -> pd.DataFrame:
        pass
36 changes: 36 additions & 0 deletions src/mesh_analysis/tracers/waku_tracer.py
@@ -0,0 +1,36 @@
# Python Imports
import re
import pandas as pd
from typing import List

# Project Imports
from src.mesh_analysis.tracers.message_tracer import MessageTracer


class WakuTracer(MessageTracer):

    def __init__(self):
        # TODO: Improve patterns as:
        # - Different patterns (received, sent, dropped)
        # - Once one pattern search is completed, stop searching for it in the logs (ie: Announce Address)
        self._patterns = [re.compile(
            r'^(\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}\.\d+\+\d{2}:\d{2}) .* msg_hash=([a-fA-F0-9x]+) .* sender_peer_id=([A-Za-z0-9]+)$'),
            re.compile(r'.* Announcing addresses .*\[([^]]+)\]$')]

    def trace(self, parsed_logs: List) -> pd.DataFrame:
        df = self._trace_message_in_logs(parsed_logs)

        return df

    def _trace_message_in_logs(self, parsed_logs: List) -> pd.DataFrame:
        # Skip log files that contain no received-message matches
        parsed_logs = (log for log in parsed_logs if len(log[0]) > 0)

        # Merge received message info + own ID (the address each node announced)
        res = (message + node[1][0] for node in parsed_logs for message in node[0])

        df = pd.DataFrame(res, columns=['timestamp', 'msg_hash', 'sender_peer_id', 'receiver_peer_id'])
        # The announced address is a multiaddress; keep only its trailing peer id
        df['receiver_peer_id'] = df['receiver_peer_id'].apply(lambda x: x.split('/')[-1])
        df.set_index(['timestamp', 'msg_hash'], inplace=True)
        df.sort_index(inplace=True)

        return df
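
To make the nested structure concrete, here is roughly what `trace` receives and what it returns (one entry per log file; the values are made up):

```python
from src.mesh_analysis.tracers.waku_tracer import WakuTracer

# One entry per log file; each entry has one list per pattern:
# [received-message matches, announced-address matches].
parsed_logs = [
    [
        [['2024-04-22 14:06:58.001+00:00', '0x1', 'A']],  # groups of pattern 0
        [['/ip4/10.0.0.2/tcp/60000/p2p/B']],              # groups of pattern 1
    ],
]

df = WakuTracer().trace(parsed_logs)
# -> one row, indexed by (timestamp, msg_hash), with sender A and receiver B
```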
