Skip to content

Commit

Permalink
feat: flush rows in DAQJobStoreCSV in batches
Browse files Browse the repository at this point in the history
  • Loading branch information
furkan-bilgin committed Oct 12, 2024
1 parent 0908554 commit e740070
Show file tree
Hide file tree
Showing 2 changed files with 45 additions and 11 deletions.
4 changes: 4 additions & 0 deletions src/daq/store/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,12 @@ class DAQJobStore(DAQJob):
def start(self):
while True:
self.consume()
self.store_loop()
time.sleep(0.5)

def store_loop(self):
raise NotImplementedError

def handle_message(self, message: DAQJobMessage) -> bool:
if not self.can_store(message):
raise Exception(
Expand Down
52 changes: 41 additions & 11 deletions src/daq/store/csv.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
from datetime import datetime
from io import TextIOWrapper
from pathlib import Path
from queue import Empty, Queue
from typing import Any, cast

from daq.models import DAQJobConfig
Expand All @@ -12,6 +13,7 @@
from utils.file import add_date_to_file_name

DAQ_JOB_STORE_CSV_FLUSH_INTERVAL_SECONDS = 5 * 60
DAQ_JOB_STORE_CSV_WRITE_BATCH_SIZE = 1000


@dataclass
Expand All @@ -29,6 +31,7 @@ class DAQJobStoreCSVConfig(DAQJobConfig):
class CSVFile:
file: TextIOWrapper
last_flush_date: datetime
write_queue: Queue[list[Any]]


class DAQJobStoreCSV(DAQJobStore):
Expand All @@ -53,28 +56,55 @@ def handle_message(self, message: DAQJobMessageStore) -> bool:
Path(file_path).touch()

# Open file and write csv headers
file = CSVFile(open(file_path, "a"), datetime.now())
file = CSVFile(open(file_path, "a"), datetime.now(), Queue())
self._open_files[file_path] = file
writer = csv.writer(file.file)

# Write headers if file haven't existed before
if not file_exists:
writer.writerow(message.keys)
file.write_queue.put(message.keys)
else:
file = self._open_files[file_path]
writer = csv.writer(file.file)

# Write rows and flush
writer.writerows(message.data)
if (
datetime.now() - file.last_flush_date
).total_seconds() > DAQ_JOB_STORE_CSV_FLUSH_INTERVAL_SECONDS:
file.file.flush()
file.last_flush_date = datetime.now()
# Append rows to write_queue
for row in message.data:
file.write_queue.put(row)

return True

def store_loop(self):
for file in self._open_files.values():
if file.file.closed:
continue
writer = csv.writer(file.file)
total_rows_to_write = 0
rows_to_write = []

# Write rows in batches
while True:
rows_to_write.clear()
for _ in range(DAQ_JOB_STORE_CSV_WRITE_BATCH_SIZE):
try:
rows_to_write.append(file.write_queue.get_nowait())
except Empty:
break
if len(rows_to_write) == 0:
break
total_rows_to_write += len(rows_to_write)
writer.writerows(rows_to_write)

# Flush if the time is up
if (
datetime.now() - file.last_flush_date
).total_seconds() > DAQ_JOB_STORE_CSV_FLUSH_INTERVAL_SECONDS:
file.file.flush()
file.last_flush_date = datetime.now()
self._logger.debug(
f"Flushed {total_rows_to_write} rows to '{file.file.name}'"
)

def __del__(self):
self.store_loop()

# Close all open files
for file in self._open_files.values():
if file.file.closed:
Expand Down

0 comments on commit e740070

Please sign in to comment.