Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Added csv file sink feature #46

Open
wants to merge 3 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -204,6 +204,12 @@ type = logger
# name of the logger
name = DataLogger

# [sink2]
# # type of the sink
# type = csv
# # folder where files are saved to
# directory = path_to_csv_files

[logging]
# log level configuration: DEBUG / INFO / WARNING / ERROR
default = WARNING
Expand Down
3 changes: 3 additions & 0 deletions smartmeter_datacollector/factory.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
from .config import InvalidConfigError
from .sinks.data_sink import DataSink
from .sinks.logger_sink import LoggerSink
from .sinks.csv_sink import CsvSink
from .sinks.mqtt_sink import MqttConfig, MqttDataSink
from .smartmeter.iskraam550 import IskraAM550
from .smartmeter.lge450 import LGE450
Expand Down Expand Up @@ -55,6 +56,8 @@ def build_sinks(config: ConfigParser) -> List[DataSink]:
sinks.append(LoggerSink(
logger_name=sink_config.get('name', "DataLogger")
))
elif sink_type == "csv":
sinks.append(CsvSink(config, section_name))
elif sink_type == "mqtt":
mqtt_config = MqttConfig.from_sink_config(sink_config)
sinks.append(MqttDataSink(mqtt_config))
Expand Down
138 changes: 138 additions & 0 deletions smartmeter_datacollector/sinks/csv_sink.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,138 @@
# SPDX-License-Identifier: GPL-2.0-only
# See LICENSES/README.md for more information.
#
import csv
import datetime
import os
from pathlib import Path
import time
from collections import deque
import asyncio
import logging

from ..smartmeter.meter_data import MeterDataPoint
from ..smartmeter.lge450 import LGE450_COSEM_REGISTERS
from ..smartmeter.iskraam550 import ISKRA_AM550_COSEM_REGISTERS
from ..smartmeter.cosem import RegisterCosem
from .data_sink import DataSink

LOGGER = logging.getLogger("sink")

class CsvSink(DataSink):
    """Sink implementation that appends aggregated data points to a daily CSV file."""
    # TODO: make CsvConfig dataclass and validation
    # TODO: implement weekly, monthly, yearly rotation
    # TODO: implement daily, weekly, monthly deletion
    # TODO: implement appender with rolling FIFO retention
    # TODO: config for enable OBIS code header and/or name headers

    def __init__(self, config, section_name) -> None:
        """Create the sink from the parsed application configuration.

        Args:
            config: full ConfigParser instance (reader sections are inspected
                to derive the CSV columns).
            section_name: name of the [sinkN] section configuring this sink.
        """
        # Directory where the CSV files will be stored.
        self.directory = config[section_name].get("directory")
        self.fieldnames, self.field_ids = self.get_fields(config)
        # FIFO buffer of incoming data points, drained by line_loop().
        self.line_queue = deque()
        # Background asyncio task running line_loop(); created in start().
        self.loop = None

    @staticmethod
    def get_fields(config):
        """Derive CSV column names and OBIS codes from the configured readers.

        Returns:
            Tuple ``(field_names, field_ids)`` with one entry per COSEM
            register of every configured meter type. Commas in identifiers
            are replaced with underscores so they cannot break the CSV format.
        """
        field_names = []
        field_ids = []
        try:
            # Collect the distinct meter types from all [readerN] sections.
            meter_types = set()
            for section_name in filter(lambda sec: sec.startswith("reader"), config.sections()):
                meter_types.add(config[section_name].get('type'))
            if "lge450" in meter_types:
                for register in LGE450_COSEM_REGISTERS:
                    field_names.append(register.data_point_type.identifier.replace(",", "_"))
                    field_ids.append(register.obis)
            if "iskraam550" in meter_types:
                for register in ISKRA_AM550_COSEM_REGISTERS:
                    field_names.append(register.data_point_type.identifier.replace(",", "_"))
                    field_ids.append(register.obis)
        except Exception:
            LOGGER.exception("Unable to read fields during sink start")
            raise

        return field_names, field_ids

    async def start(self) -> None:
        """Prepare the output directory, purge old files and start the writer task."""
        # Create the directory if it doesn't exist yet.
        Path(self.directory).mkdir(parents=True, exist_ok=True)

        await self.file_cleanup()
        self.loop = asyncio.create_task(self.line_loop())

    async def stop(self) -> None:
        """Cancel the background writer task (no-op if start() was never called)."""
        if self.loop is not None:
            self.loop.cancel()

    async def send(self, data_point: MeterDataPoint) -> None:
        """Queue a data point; it is written asynchronously by line_loop()."""
        self.line_queue.append(data_point)

    async def line_loop(self):
        """Periodically drain the queue and append one aggregated CSV line."""
        # TODO: find smarter time criteria to aggregate
        while True:
            await asyncio.sleep(1)
            filename = await self.check_file_exists()

            aggregated_points = {}
            data_point = None

            # TODO: aggregation per source
            # TODO: aggregation per time frame
            # Drain in FIFO order (popleft) so that when the same data point
            # type arrives more than once per interval the NEWEST value wins
            # and the line carries the timestamp/source of the latest point.
            while self.line_queue:
                try:
                    data_point = self.line_queue.popleft()
                    aggregated_points[data_point.type.identifier] = str(data_point.value).replace(",", "_")
                except Exception:
                    LOGGER.exception("Cannot pop line_queue")

            if aggregated_points:
                try:
                    # Timestamp/source of the last drained point represent the line.
                    line_data = [data_point.timestamp.isoformat(), data_point.source]
                    for key in self.fieldnames:
                        # Empty cell for registers with no value this interval.
                        line_data.append(aggregated_points.get(key, ""))

                    with open(os.path.join(self.directory, filename), 'a', newline='', encoding='utf-8') as csvfile:
                        csvfile.write(",".join(line_data) + "\n")
                except Exception:
                    LOGGER.exception("Failed to write line to csv file")
                else:
                    LOGGER.debug("Success writing line to csv file")

    async def check_file_exists(self):
        """Return today's file name, creating the file with a header if missing.

        The header matches the data rows written by line_loop(): timestamp and
        source columns followed by one column per configured register.
        """
        # get current date and format it as a string
        today = datetime.datetime.now().strftime('%Y-%m-%d')

        # create filename using the current date
        filename = f'smartmeter_data_{today}.csv'
        filepath = os.path.join(self.directory, filename)

        # check if file already exists
        if not os.path.exists(filepath):
            try:
                LOGGER.debug("Try to create csv file")
                # Create file and write the header row. It must include the
                # timestamp/source columns so it lines up with the data rows.
                with open(filepath, 'w', newline='', encoding='utf-8') as csvfile:
                    writer = csv.DictWriter(csvfile, fieldnames=["timestamp", "source"] + self.fieldnames)
                    writer.writeheader()
            except Exception:
                LOGGER.exception("Failed to create csv file")
        return filename

    async def file_cleanup(self):
        """Delete CSV files in the output directory that are older than one year."""
        # one file is about 2.7 MB and in a year almost 1 GB
        cutoff = time.time() - 365 * 24 * 60 * 60
        try:
            for file in os.listdir(self.directory):
                if file.endswith(".csv"):
                    filepath = os.path.join(self.directory, file)
                    if os.path.getmtime(filepath) < cutoff:
                        os.remove(filepath)
        except Exception:
            LOGGER.exception("Failed to cleanup csv file")

53 changes: 53 additions & 0 deletions tests/test_csv_sink.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
#
# Copyright (C) 2022 Supercomputing Systems AG
# This file is part of smartmeter-datacollector.
#
# SPDX-License-Identifier: GPL-2.0-only
# See LICENSES/README.md for more information.
#
import configparser
import json
import sys
from datetime import datetime
import time
import asyncio

import pytest
from pytest_mock.plugin import MockerFixture

from smartmeter_datacollector.sinks.csv_sink import CsvSink
from smartmeter_datacollector.smartmeter.meter_data import MeterDataPoint, MeterDataPointType
from smartmeter_datacollector.smartmeter.lge450 import LGE450_COSEM_REGISTERS

TEST_TYPE = MeterDataPointType("TEST_TYPE", "test type", "unit")

@pytest.mark.skipif(sys.version_info < (3, 8), reason="Python3.7 does not support AsyncMock.")
@pytest.mark.asyncio
async def test_csv_sink_send_point():
    """Start the sink, send data points and verify a CSV file with data appears."""
    import os
    import tempfile

    # Use a throwaway directory so the test never pollutes the working tree.
    with tempfile.TemporaryDirectory() as tmp_dir:
        cfg_parser = configparser.ConfigParser()
        cfg_parser.read_dict({
            "sink0": {
                'type': "csv",
                'directory': tmp_dir,
            },
            "reader0": {
                "type": "lge450"
            }
        })

        sink = CsvSink(cfg_parser, "sink0")

        # Points sent before start() must be buffered and written later.
        await sink.send(MeterDataPoint(LGE450_COSEM_REGISTERS[0].data_point_type, 0.000, "bef test_source", datetime.utcnow()))
        await sink.send(MeterDataPoint(LGE450_COSEM_REGISTERS[1].data_point_type, 1.001, "bef test_source", datetime.utcnow()))

        await sink.start()

        await sink.send(MeterDataPoint(LGE450_COSEM_REGISTERS[2].data_point_type, 12.002, "test_source", datetime.utcnow()))
        await sink.send(MeterDataPoint(LGE450_COSEM_REGISTERS[3].data_point_type, 13.003, "test_source", datetime.utcnow()))
        await sink.send(MeterDataPoint(LGE450_COSEM_REGISTERS[10].data_point_type, 14.000, "test_source", datetime.utcnow()))

        # Give the 1-second aggregation loop time to write at least one line.
        await asyncio.sleep(1.5)
        await sink.stop()

        csv_files = [name for name in os.listdir(tmp_dir) if name.endswith(".csv")]
        assert csv_files, "sink did not create a csv file"
        with open(os.path.join(tmp_dir, csv_files[0]), encoding='utf-8') as csv_file:
            lines = csv_file.read().splitlines()
        # Expect at least the header row plus one aggregated data row.
        assert len(lines) >= 2
32 changes: 32 additions & 0 deletions tests/test_logger_sink.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
#
# Copyright (C) 2022 Supercomputing Systems AG
# This file is part of smartmeter-datacollector.
#
# SPDX-License-Identifier: GPL-2.0-only
# See LICENSES/README.md for more information.
#
import configparser
import json
import sys
from datetime import datetime

import pytest
from pytest_mock.plugin import MockerFixture

from smartmeter_datacollector.sinks.logger_sink import LoggerSink
from smartmeter_datacollector.smartmeter.meter_data import MeterDataPoint, MeterDataPointType

TEST_TYPE = MeterDataPointType("TEST_TYPE", "test type", "unit")

@pytest.mark.skipif(sys.version_info < (3, 8), reason="Python3.7 does not support AsyncMock.")
@pytest.mark.asyncio
async def test_logger_sink_send_point(mocker: MockerFixture):
    """A started logger sink must log each sent data point exactly once."""
    sink = LoggerSink("DataLogger")
    logger_mock = mocker.patch.object(sink, "_logger", autospec=True)
    point = MeterDataPoint(TEST_TYPE, 1.0, "test_source", datetime.utcnow())

    await sink.start()
    await sink.send(point)

    logger_mock.info.assert_called_once_with(str(point))