diff --git a/.github/workflows/docker-image.yml b/.github/workflows/docker-image.yml
index 24f386e..1e4c217 100644
--- a/.github/workflows/docker-image.yml
+++ b/.github/workflows/docker-image.yml
@@ -5,12 +5,11 @@ on:
     branches: [ "main", "**" ]
 
 jobs:
-
   build:
-
+    name: Build Docker image
     runs-on: ubuntu-latest
-
     steps:
-    - uses: actions/checkout@v3
-    - name: Build the Docker image
-      run: docker build . --file Dockerfile --tag sceptre-bennu:$(date +%s)
+      - name: Check out the repo
+        uses: actions/checkout@v4
+      - name: Build the Docker image
+        run: docker build --pull --file Dockerfile --tag sceptre-bennu:$(date +%s) .
diff --git a/.github/workflows/publish-image.yml b/.github/workflows/publish-image.yml
index 2e10537..9403b85 100644
--- a/.github/workflows/publish-image.yml
+++ b/.github/workflows/publish-image.yml
@@ -13,18 +13,18 @@ env:
 # There is a single job in this workflow. It's configured to run on the latest available version of Ubuntu.
 jobs:
   build-and-push-image:
+    name: Build and push Docker image
     runs-on: ubuntu-latest
     # Sets the permissions granted to the `GITHUB_TOKEN` for the actions in this job.
     permissions:
       contents: read
       packages: write
-    #
     steps:
       - name: Checkout repository
-        uses: actions/checkout@v3
+        uses: actions/checkout@v4
       # Uses the `docker/login-action` action to log in to the Container registry using the account and password that will publish the packages. Once published, the packages are scoped to the account defined here.
       - name: Log in to the Container registry
-        uses: docker/login-action@v2
+        uses: docker/login-action@v3
        with:
          registry: ${{ env.REGISTRY }}
          username: ${{ github.actor }}
@@ -32,17 +32,16 @@ jobs:
       # This step uses [docker/metadata-action](https://github.com/docker/metadata-action#about) to extract tags and labels that will be applied to the specified image. The `id` "meta" allows the output of this step to be referenced in a subsequent step. The `images` value provides the base name for the tags and labels.
       - name: Extract metadata (tags, labels) for Docker
         id: meta
-        uses: docker/metadata-action@v4
+        uses: docker/metadata-action@v5
        with:
          images: ${{ env.REGISTRY }}/${{ env.IMAGE_NAME }}
       # This step uses the `docker/build-push-action` action to build the image, based on your repository's `Dockerfile`. If the build succeeds, it pushes the image to GitHub Packages.
       # It uses the `context` parameter to define the build's context as the set of files located in the specified path. For more information, see "[Usage](https://github.com/docker/build-push-action#usage)" in the README of the `docker/build-push-action` repository.
       # It uses the `tags` and `labels` parameters to tag and label the image with the output from the "meta" step.
       - name: Build and push Docker image
-        uses: docker/build-push-action@v4
+        uses: docker/build-push-action@v5
        with:
          context: .
          push: true
          tags: ${{ steps.meta.outputs.tags }}
          labels: ${{ steps.meta.outputs.labels }}
-
diff --git a/Dockerfile b/Dockerfile
index 5a16001..489a2d2 100755
--- a/Dockerfile
+++ b/Dockerfile
@@ -1,12 +1,18 @@
+ARG REGISTRY_IMAGE="ubuntu:20.04"
+ARG PIP_INDEX="https://pypi.org/"
+ARG PIP_INDEX_URL="https://pypi.org/simple"
+
 # create python build image
-FROM ubuntu:20.04 AS pybuilder
+FROM ${REGISTRY_IMAGE} AS pybuilder
 
-ENV DEBIAN_FRONTEND noninteractive
+ENV DEBIAN_FRONTEND="noninteractive" \
+    PIP_DISABLE_PIP_VERSION_CHECK=1
 
-RUN apt update \
-    && apt install -y \
+RUN apt-get update \
+    && apt-get install -y \
         build-essential cmake git wget python3-dev python3-pip \
         libfreetype6-dev liblapack-dev libboost-dev \
+    && apt-get clean \
     && rm -rf /var/lib/apt/lists/*
 
 # setup ZMQ
@@ -26,10 +32,10 @@ RUN wget -O helics.tgz https://github.com/GMLC-TDC/HELICS/releases/download/v${H
     && tar -C /tmp/helics -xzf helics.tgz \
     && rm helics.tgz \
     && mkdir -p /tmp/helics/build && cd /tmp/helics/build \
     && cmake -D HELICS_USE_SYSTEM_ZEROMQ_ONLY=ON .. \
     && make -j$(nproc) install
 
-RUN python3 -m pip install --trusted-host pypi.org --trusted-host files.pythonhosted.org pyzmq~=20.0.0 --install-option=--enable-drafts
+RUN python3 -m pip install --no-cache-dir pyzmq~=20.0.0 --install-option=--enable-drafts
 
 RUN wget -O pyhelics.tgz https://github.com/GMLC-TDC/pyhelics/releases/download/v${HELICS_VERSION}/helics-${HELICS_VERSION}.tar.gz \
     && mkdir -p /tmp/pyhelics \
@@ -37,25 +43,26 @@ RUN wget -O pyhelics.tgz https://github.com/GMLC-TDC/pyhelics/releases/download/
     && rm pyhelics.tgz \
     && cd /tmp/pyhelics/helics-${HELICS_VERSION} \
     && sed -i 's/helics-apps/helics-apps~=2.7.1/' setup.py \
-    && python3 -m pip install --trusted-host pypi.org --trusted-host files.pythonhosted.org .
+    && python3 -m pip install --no-cache-dir .
 
 #DEBUG build
 #ADD docker/vendor /tmp/bennu/vendor
 #WORKDIR /tmp/bennu/vendor/helics-helper
-#RUN python3 -m pip install --trusted-host pypi.org --trusted-host files.pythonhosted.org .
+#RUN python3 -m pip install .
 
 # install Python bennu package
 ADD src/pybennu /tmp/bennu/src/pybennu
 WORKDIR /tmp/bennu/src/pybennu
-RUN python3 -m pip install --trusted-host pypi.org --trusted-host files.pythonhosted.org .
+RUN python3 -m pip install --no-cache-dir .
 
 # create final image
-FROM ubuntu:20.04
+FROM ${REGISTRY_IMAGE}
 
-ENV DEBIAN_FRONTEND noninteractive
+ENV DEBIAN_FRONTEND="noninteractive" \
+    PIP_DISABLE_PIP_VERSION_CHECK=1
 
-RUN apt update \
-    && apt install -y \
+RUN apt-get update \
+    && apt-get install -y \
         # bennu
         build-essential ca-certificates cmake cmake-curses-gui \
         git wget pkg-config libasio-dev libsodium-dev \
@@ -68,6 +75,7 @@ RUN apt update \
         libffi-dev ruby-dev ruby-ffi \
         # python
         python3-dev python3-pip python3-setuptools python3-wheel \
+    && apt-get clean \
     && rm -rf /var/lib/apt/lists/*
 
 # setup Go
@@ -97,11 +105,15 @@ ADD test /tmp/bennu/test
 
 # install C++ and Golang bennu package
 WORKDIR /tmp/bennu/build
-RUN cmake -D BUILD_GOBENNU=OFF ../ && make -j$(nproc) install \
+RUN cmake -D BUILD_GOBENNU=OFF ../ \
+    && make -j$(nproc) install \
     && rm -rf /tmp/*
 
-RUN gem install fpm
-RUN pip3 install --trusted-host pypy.org --trusted-host files.pythonhosted.org -U aptly-ctl pip setuptools twine wheel
+# Gem install failures: https://github.com/jordansissel/fpm/issues/2048
+# For now, manually install dotenv 2.8.1
+RUN gem install dotenv -v 2.8.1 && gem install fpm -v 1.15.1
+
+RUN python3 -m pip install --no-cache-dir --upgrade aptly-ctl pip setuptools twine wheel
 
 WORKDIR /root
 CMD /bin/bash
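For reference, the pyzmq wheel built in the pybuilder stage above is compiled with --install-option=--enable-drafts so draft socket types are available. A quick sanity check of the resulting image, assuming pyzmq exposes the zmq.DRAFT_API flag (True when draft support is compiled in):

    # Sketch: verify the pyzmq build inside the image has ZMQ draft support.
    import zmq

    print(f"libzmq {zmq.zmq_version()}, pyzmq {zmq.pyzmq_version()}")
    if not getattr(zmq, "DRAFT_API", False):
        raise SystemExit("pyzmq was built without draft socket support")
    print("draft sockets available")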
diff --git a/src/pybennu/config.ini b/src/pybennu/config.ini
index 1ca39e2..854f86b 100644
--- a/src/pybennu/config.ini
+++ b/src/pybennu/config.ini
@@ -39,8 +39,12 @@ input-mappings = [/system/branch-1_8163-8164/active]
 ; ; Configurations for PMUs configured in RTDS, comma-separated
 ; ; NOTE: the length of all "rtds-pmu-*" options must match!
+; ; TODO: UDP isn't implemented yet in pyPMU, therefore "udp" is not currently working
+; ; Allowed values for "rtds-pmu-protocols": tcp, udp
+; ; This selects which transport protocol will be used for C37.118
 ; rtds-pmu-ips = 172.24.9.51, 172.24.9.51, 172.24.9.51, 172.24.9.51, 172.24.9.51, 172.24.9.51, 172.24.9.51, 172.24.9.51
 ; rtds-pmu-ports = 4714, 4716, 4719, 4718, 4717, 4715, 4772, 4782
+; rtds-pmu-protocols = tcp, tcp, tcp, tcp, tcp, tcp, tcp, tcp
 ; rtds-pmu-names = PMU1, PMU2, PMU3, PMU4, PMU5, PMU6, PMU7, PMU8
 ; rtds-pmu-labels = BUS7, BUS8, BUS9, BUS5, BUS5-1, BUS4, BUS6-1, BUS6
 ; rtds-pdc-ids = 41, 6, 91, 92, 71, 72, 5, 6
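For reference, a minimal sketch of how these comma-separated options can be parsed and validated into equal-length lists (the section name and helper below are illustrative, not the provider's actual code):

    # Sketch: parse comma-separated PMU options from config.ini (names illustrative).
    from configparser import ConfigParser

    def parse_list(raw: str) -> list:
        return [x.strip() for x in raw.split(",") if x.strip()]

    config = ConfigParser()
    config.read("config.ini")
    section = config["power-solver-service"]  # assumed section name

    ips = parse_list(section.get("rtds-pmu-ips", ""))
    default = ", ".join(["tcp"] * len(ips))
    protocols = parse_list(section.get("rtds-pmu-protocols", default))

    if len(protocols) != len(ips):
        raise ValueError("lengths of all 'rtds-pmu-*' options must match")
    if any(p not in ("tcp", "udp") for p in protocols):
        raise ValueError("rtds-pmu-protocols values must be 'tcp' or 'udp'")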
diff --git a/src/pybennu/pybennu/providers/power/solvers/rtds.py b/src/pybennu/pybennu/providers/power/solvers/rtds.py
index fe58290..57e3c0b 100644
--- a/src/pybennu/pybennu/providers/power/solvers/rtds.py
+++ b/src/pybennu/pybennu/providers/power/solvers/rtds.py
@@ -85,17 +85,18 @@
 | field                    | type          | example                   | description |
 | ------------------------ | ------------- | ------------------------- | ----------- |
-| @timestamp               | date          | 2022-04-20:11:22:33.000   | Timestamp from RTDS. |
+| @timestamp               | date          | 2022-04-20:11:22:33.000   | Timestamp from SCEPTRE. This should match the value of sceptre_time. |
 | rtds_time                | date          | 2022-04-20:11:22:33.000   | Timestamp from RTDS. |
 | sceptre_time             | date          | 2022-04-20:11:22:33.000   | Timestamp from SCEPTRE provider (the power-provider VM in the emulation). |
+| time_drift               | double        | 433.3                     | Difference in milliseconds between the RTDS and SCEPTRE timestamps (the "drift" between the two systems). This value is always positive, even if the RTDS is ahead of SCEPTRE. It is calculated as abs(sceptre_time - rtds_time) * 1000. |
 | event.ingested           | date          | 2022-04-20:11:22:33.000   | Timestamp of when the data was ingested into Elasticsearch. |
-| ecs.version              | keyword       | 8.8.0                     | [Elastic Common Schema (ECS)](https://www.elastic.co/guide/en/ecs/current/ecs-field-reference.html) version this schema adheres to. |
-| agent.type               | keyword       | rtds-sceptre-provider     | Type of system providing the data |
-| agent.version            | keyword       | 4.0.0                     | Version of the provider |
+| ecs.version              | keyword       | 8.11.0                    | [Elastic Common Schema (ECS)](https://www.elastic.co/guide/en/ecs/current/ecs-field-reference.html) version this schema adheres to. |
+| agent.type               | keyword       | rtds-sceptre-provider     | Type of system providing the data. |
+| agent.version            | keyword       | unknown                   | Version of the provider. |
 | observer.hostname        | keyword       | power-provider            | Hostname of the system providing the data. |
 | observer.geo.timezone    | keyword       | America/Denver            | Timezone of the system providing the data. |
 | network.protocol         | keyword       | dnp3                      | Network protocol used to retrieve the data. Currently, this will be either dnp3 or c37.118. |
-| network.transport        | keyword       | tcp                       | Transport layer (Layer 4 of OSI model) protocol used to retrieve the data. Currently, this is usually tcp, but it could be udp if UDP is used for C37.118 or GTNET-SKT. |
+| network.transport        | keyword       | tcp                       | Transport layer (Layer 4 of OSI model) protocol used to retrieve the data. Currently, this is usually `tcp`, but it could be `udp` if UDP is used for C37.118 or GTNET-SKT. |
 | pmu.name                 | keyword       | PMU1                      | Name of the PMU. |
 | pmu.label                | keyword       | BUS4-1                    | Label for the PMU. |
 | pmu.ip                   | ip            | 172.24.9.51               | IP address of the PMU. |
@@ -103,7 +104,7 @@
 | pmu.id                   | long          | 41                        | PDC ID of the PMU. |
 | measurement.stream       | byte          | 1                         | Stream ID of this measurement from the PMU. |
 | measurement.status       | keyword       | ok                        | Status of this measurement from the PMU. |
-| measurement.time         | double        | 1686089097.13333          | Absolute time of when the measurement occurred. This timestamp can be used as a sequence number, and will match across all PMUs from the same RTDS case. |
+| measurement.time         | double        | 1686089097.13333          | Absolute time of when the measurement occurred. This timestamp can be used as a sequence number, and will match across all PMUs from the same RTDS case. It should match `rtds_time`, after being converted to a date. |
 | measurement.frequency    | double        | 60.06                     | Nominal system frequency. |
 | measurement.dfreq        | double        | 8.835189510136843e-05     | Rate of change of frequency (ROCOF). |
 | measurement.channel      | keyword       | PHASOR CH 1:VA            | Channel name of this measurement from the PMU. |
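The new time_drift field is plain arithmetic over the two timestamps in the table above; a worked sketch of the calculation as described there:

    # Sketch: time_drift = abs(sceptre_time - rtds_time) * 1000, in milliseconds.
    from datetime import datetime, timezone

    rtds_time = datetime.fromtimestamp(1686089097.13333, timezone.utc)  # example value
    sceptre_time = datetime.now(timezone.utc)

    time_drift = abs((sceptre_time - rtds_time).total_seconds()) * 1000.0
    print(f"time_drift = {time_drift:.1f} ms")  # always positive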
@@ -137,16 +138,15 @@
 from configparser import ConfigParser
 from datetime import datetime, timezone
 from io import TextIOWrapper
+from logging.handlers import QueueHandler, QueueListener
 from pathlib import Path
 from pprint import pformat
+from subprocess import check_output
 from time import sleep
-from typing import Any, Dict, List, Optional
+from typing import Any, Dict, List, Optional, Literal
 
 from elasticsearch import Elasticsearch, helpers
 
-# TODO: figure out how to get bennu version after it's fixed
-# Use importlib.metadata?
-# from pybennu._version import __version__
 from pybennu.distributed.provider import Provider
 from pybennu.pypmu.synchrophasor.frame import CommonFrame, DataFrame, HeaderFrame
 from pybennu.pypmu.synchrophasor.pdc import Pdc
@@ -155,7 +155,9 @@
 # TODO: support PMU "analog" fields, current handling is a hack for HARMONIE LDRD
 # TODO: push state of GTNET-SKT values to Elasticsearch
 # TODO: log most messages to a file with Rotating handler to avoid filling up log (since bennu isn't very smart and won't rotate the file)
+#       Save CSV log messages to their own file? Save PMU logger messages to their own file?
 # TODO: add field to Elastic data differentiating which SCEPTRE experiment data is associated with
+# TODO: split out utility functions into a "utils" file: str_to_bool, utc_now, utc_now_formatted, RotatingCSVWriter
 
 
 def str_to_bool(val: str) -> bool:
     """
@@ -183,6 +185,18 @@ def str_to_bool(val: str) -> bool:
     raise ValueError(f"invalid bool string {val}")
 
 
+# datetime.utcnow() returns naive datetime objects (no timezone info).
+# When .timestamp() is called on these objects, they get interpreted as
+# local time, not UTC time, which leads to incorrect timestamps.
+# Further reading: https://blog.miguelgrinberg.com/post/it-s-time-for-a-change-datetime-utcnow-is-now-deprecated
+def utc_now() -> datetime:
+    return datetime.now(timezone.utc)
+
+
+def utc_now_formatted() -> str:
+    return utc_now().strftime("%d-%m-%Y_%H-%M-%S")
+
+
 class RotatingCSVWriter:
     """
     Writes data to CSV files, creating a new file when a limit is reached.
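The pitfall that utc_now() avoids is easy to demonstrate: on a host whose local timezone is not UTC, converting a naive datetime.utcnow() value to an epoch timestamp silently applies the local UTC offset. A minimal demonstration:

    # Sketch: naive vs. timezone-aware UTC timestamps.
    from datetime import datetime, timezone

    naive = datetime.utcnow()            # naive: no tzinfo attached
    aware = datetime.now(timezone.utc)   # aware: explicit UTC

    # .timestamp() assumes naive datetimes are local time, so the two can
    # differ by the local UTC offset (zero only when the host runs in UTC).
    skew = abs(naive.timestamp() - aware.timestamp())
    print(f"epoch skew from using utcnow(): ~{skew:.0f} seconds")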
@@ -196,17 +210,17 @@ def __init__(
         filename_base: str = "rtds_pmu_data",
         rows_per_file: int = 1000000,
         max_files: int = 0
-    ):
-        self.name = name
-        self.csv_dir = csv_dir # type: Path
-        self.header = header # type: List[str]
-        self.filename_base = filename_base
-        self.max_rows = rows_per_file
-        self.max_files = max_files
-        self.files_written = [] # type: List[Path]
-        self.rows_written = 0
-        self.current_path = None # type: Optional[Path]
-        self.fp = None # type: Optional[TextIOWrapper]
+    ) -> None:
+        self.name: str = name
+        self.csv_dir: Path = csv_dir
+        self.header: List[str] = header
+        self.filename_base: str = filename_base
+        self.max_rows: int = rows_per_file
+        self.max_files: int = max_files
+        self.files_written: List[Path] = []
+        self.rows_written: int = 0
+        self.current_path: Optional[Path] = None
+        self.fp: Optional[TextIOWrapper] = None
 
         self.log = logging.getLogger(f"{self.__class__.__name__} [{self.name}]")
         self.log.setLevel(logging.DEBUG)
@@ -223,19 +237,19 @@ def __init__(
         # initial file rotation
         self.rotate()
 
-    def _close_file(self):
+    def _close_file(self) -> None:
         if self.fp and not self.fp.closed:
             self.fp.flush()
             os.fsync(self.fp.fileno()) # ensure data is written to disk
             self.fp.close()
 
-    def rotate(self):
+    def rotate(self) -> None:
         self._close_file() # close current CSV before starting new one
         if self.current_path:
             self.log.debug(f"Wrote {self.rows_written} rows and {self.current_path.stat().st_size} bytes to {self.current_path}")
             self.files_written.append(self.current_path)
 
-        timestamp = datetime.utcnow().strftime("%d-%m-%Y_%H-%M-%S")
+        timestamp = utc_now_formatted()
         filename = f"{self.filename_base}_{timestamp}.csv"
         self.current_path = Path(self.csv_dir, filename)
         self.log.info(f"Rotating CSV file to {self.current_path}")
@@ -253,7 +267,7 @@ def rotate(self):
                 self.log.info(f"Removing CSV file {oldest}")
                 oldest.unlink() # delete the file
 
-    def _emit(self, data: list):
+    def _emit(self, data: list) -> None:
         """Write comma-separated list of values."""
         for i, column in enumerate(data):
             self.fp.write(str(column))
@@ -261,7 +275,7 @@ def _emit(self, data: list):
             if i < len(data) - 1:
                 self.fp.write(",")
         self.fp.write("\n")
 
-    def write(self, data: list):
+    def write(self, data: list) -> None:
         """Write data to CSV file."""
         if self.rows_written == self.max_rows:
             self.rotate()
@@ -280,23 +294,42 @@ class PMU:
     Polls for data using C37.118 protocol, utilizing the pypmu library under-the-hood.
     """
 
-    def __init__(self, ip: str, port: int, pdc_id: int, name: str = "", label: str = ""):
-        self.ip = ip
-        self.port = port
-        self.pdc_id = pdc_id
-        self.name = name
-        self.label = label
+    def __init__(
+        self,
+        ip: str,
+        port: int,
+        pdc_id: int,
+        name: str = "",
+        label: str = "",
+        protocol: Literal["tcp", "udp"] = "tcp"
+    ) -> None:
+        self.ip: str = ip
+        self.port: int = port
+        self.pdc_id: int = pdc_id
+        self.name: str = name
+        self.label: str = label
+        self.protocol: Literal["tcp", "udp"] = protocol
+
+        # TODO: UDP isn't implemented yet in pyPMU
+        if self.protocol not in ["tcp", "udp"]:
+            raise ValueError(f"PMU protocol must be 'tcp' or 'udp', got {self.protocol}")
 
         # Configure PDC instance (pypmu.synchrophasor.pdc.Pdc)
-        self.pdc = Pdc(self.pdc_id, self.ip, self.port)
-        self.pdc_header = None # type: Optional[HeaderFrame]
-        self.pdc_config = None # type: Optional[CommonFrame]
-        self.channel_names = [] # type: List[str]
-        self.csv_writer = None # type: Optional[RotatingCSVWriter]
-        self.station_name = "" # type: str
+        self.pdc = Pdc(
+            pdc_id=self.pdc_id,
+            pmu_ip=self.ip,
+            pmu_port=self.port,
+            method=self.protocol
+        )
+
+        self.pdc_header: Optional[HeaderFrame] = None
+        self.pdc_config: Optional[CommonFrame] = None
+        self.channel_names: List[str] = []
+        self.csv_writer: Optional[RotatingCSVWriter] = None
+        self.station_name: str = ""
 
         # Configure logging
-        self.log = logging.getLogger(f"PMU [{str(self)}]")
+        self.log: logging.Logger = logging.getLogger(f"PMU [{str(self)}]")
         self.log.setLevel(logging.DEBUG)
         self.pdc.logger = logging.getLogger(f"Pdc [{str(self)}]")
         # NOTE: pypmu logs a LOT of stuff at DEBUG, leave at INFO
@@ -305,7 +338,7 @@ def __init__(self, ip: str, port: int, pdc_id: int, name: str = "", label: str =
         self.log.info(f"Initialized {repr(self)}")
 
     def __repr__(self) -> str:
-        return f"PMU(ip={self.ip}, port={self.port}, pdc_id={self.pdc_id}, name={self.name}, label={self.label})"
+        return f"PMU(ip={self.ip}, port={self.port}, pdc_id={self.pdc_id}, name={self.name}, label={self.label}, protocol={self.protocol})"
 
     def __str__(self) -> str:
         if self.name and self.label:
@@ -315,7 +348,7 @@ def __str__(self) -> str:
         else:
             return f"{self.ip}:{self.port}_{self.pdc_id}"
 
-    def run(self):
+    def run(self) -> None:
         """Connect to PMU."""
         self.pdc.run()
 
@@ -350,7 +383,7 @@ def _process_name(cn: str) -> str:
             self.channel_names.append(_process_name(channel))
         self.log.debug(f"Channel names: {self.channel_names}")
 
-    def start(self):
+    def start(self) -> None:
         self.pdc.start()
 
     def get_data_frame(self) -> Optional[Dict[str, Any]]:
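The PMU class above wraps pypmu's PDC client; reduced to its essentials, the polling sequence looks roughly like this (connection details are placeholders taken from the example config):

    # Sketch: the bare pypmu C37.118 polling flow that the PMU class wraps.
    from pybennu.pypmu.synchrophasor.pdc import Pdc

    pdc = Pdc(pdc_id=41, pmu_ip="172.24.9.51", pmu_port=4714, method="tcp")
    pdc.run()                  # open the connection to the PMU
    header = pdc.get_header()  # optional header frame
    config = pdc.get_config()  # station/channel configuration
    pdc.start()                # ask the PMU to start streaming data frames
    frame = pdc.get()          # read one measurement frame
    pdc.quit()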
@@ -391,7 +424,7 @@ def __init__(
         publish_endpoint,
         config: ConfigParser,
         debug: bool = False
-    ):
+    ) -> None:
         Provider.__init__(self, server_endpoint, publish_endpoint)
 
         # Thread locks
@@ -399,7 +432,7 @@ def __init__(
         self.__gtnet_lock = threading.Lock()
 
         # Load configuration values
-        self.config = config # type: ConfigParser
+        self.config: ConfigParser = config
         self.debug = debug # type: bool
 
         self.publish_rate = float(self._conf("publish-rate")) # type: float
@@ -437,7 +470,7 @@ def __init__(
         else:
             self.gtnet_skt_udp_write_rate = 30
 
-        # Elastic-config
+        # Elastic config
         self.elastic_enabled = str_to_bool(self._conf("elastic-enabled")) # type: bool
         self.elastic_host = self._conf("elastic-host") # type: str
         if self._has_conf("elastic-index-basename"):
@@ -445,6 +478,14 @@ def __init__(
         else:
             self.elastic_index_basename = "rtds-default"
 
+        # rtds-pmu-protocols
+        if self._has_conf("rtds-pmu-protocols"):
+            self.pmu_protocols = self._conf("rtds-pmu-protocols", is_list=True) # type: List[str]
+        else:
+            self.pmu_protocols = ["tcp" for _ in range(len(self.pmu_ips))]
+        if not all(p in ["tcp", "udp"] for p in self.pmu_protocols):
+            raise ValueError("rtds-pmu-protocols must be either 'tcp' or 'udp'")
+
         # Validate configuration values
         if self.retry_delay <= 0.0:
             raise ValueError(f"'rtds-retry-delay' value must be a positive float, not {self.retry_delay}")
@@ -454,7 +495,7 @@ def __init__(
             raise ValueError(f"'csv-rows-per-file' must be a positive integer, not {self.csv_rows_per_file}")
         if not self.pmu_ips or any(x.count(".") != 3 for x in self.pmu_ips):
             raise ValueError(f"invalid value(s) for 'rtds-pmu-ips': {self.pmu_ips}")
-        if not (len(self.pmu_ips) == len(self.pmu_names) == len(self.pmu_ports) == len(self.pmu_labels) == len(self.pdc_ids)):
+        if not (len(self.pmu_ips) == len(self.pmu_names) == len(self.pmu_ports) == len(self.pmu_labels) == len(self.pdc_ids) == len(self.pmu_protocols)):
             raise ValueError("lengths of pmu configuration options don't match, are you missing a pmu in one of the options?")
 
         # Validate GTNET-SKT configuration values only if there are values configured
@@ -476,12 +517,30 @@ def __init__(
             if len(self.gtnet_skt_tag_names) > 30: # max of 30 data points per channel, 10 channels total for GTNET card
                 raise ValueError(f"maximum of 30 points allowed per GTNET-SKT channel, {len(self.gtnet_skt_tag_names)} are defined in config")
 
+        # Asynchronous logging. Log messages are placed into a queue,
+        # which is serviced by a thread that runs each handler with
+        # messages from the queue. This isn't needed as much for stdout,
+        # but if we enable logging to file or Elastic then it will be.
+        self.log_queue = queue.Queue()
+        queue_handler = QueueHandler(self.log_queue)
+
+        formatter = logging.Formatter(
+            fmt="%(asctime)s.%(msecs)03d [%(levelname)s] (%(name)s) %(message)s",
+            datefmt="%Y-%m-%d %H:%M:%S"
+        )
+        console_handler = logging.StreamHandler(sys.stdout)
+        console_handler.setFormatter(formatter)
+        console_handler.setLevel(logging.DEBUG if self.debug else logging.INFO)
+        queue_listener = QueueListener(self.log_queue, console_handler, respect_handler_level=True)
+        queue_listener.start()
+
         # Configure logging to stdout (includes debug messages from pypmu)
         logging.basicConfig(
             level=logging.DEBUG if self.debug else logging.INFO,
-            format="%(asctime)s.%(msecs)03d [%(levelname)s] (%(name)s) %(message)s",
-            datefmt="%Y-%m-%d %H:%M:%S",
-            stream=sys.stdout
+            handlers=[queue_handler],
+            # format="%(asctime)s.%(msecs)03d [%(levelname)s] (%(name)s) %(message)s",
+            # datefmt="%Y-%m-%d %H:%M:%S",
+            # stream=sys.stdout
         )
         self.log = logging.getLogger(self.__class__.__name__)
         self.log.setLevel(logging.DEBUG if self.debug else logging.INFO)
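The queue-based logging configured above, reduced to a generic standard-library sketch (not provider-specific code):

    # Sketch: non-blocking logging via QueueHandler/QueueListener.
    import logging
    import queue
    import sys
    from logging.handlers import QueueHandler, QueueListener

    log_queue = queue.Queue()
    console = logging.StreamHandler(sys.stdout)
    console.setLevel(logging.INFO)

    listener = QueueListener(log_queue, console, respect_handler_level=True)
    listener.start()

    logging.basicConfig(level=logging.INFO, handlers=[QueueHandler(log_queue)])
    logging.getLogger("demo").info("record is handled off the calling thread")
    listener.stop()  # flushes any queued records on shutdown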
@@ -492,6 +551,14 @@ def __init__(
         elif not self.csv_max_files:
             self.log.warning("No limit set in 'csv-max-files', CSV files won't be cleaned up and could fill all available disk space")
 
+        # Get bennu version
+        self.bennu_version = "unknown"
+        try:
+            apt_result = check_output("apt-cache show bennu", shell=True).decode()
+            self.bennu_version = re.search(r"Version: ([\w\.]+)\s", apt_result).groups()[0]
+        except Exception as ex:
+            self.log.warning(f"Failed to get bennu version: {ex}")
+
         # Elasticsearch setup
         self._es_index_cache = set()
         self.es_queue = queue.Queue() # queue for data to push to elastic
@@ -504,9 +571,9 @@ def __init__(
 
         # Create PMU instances: IP, Port, Name, Label, PDC ID
         self.pmus = []
-        pmu_info = zip(self.pmu_ips, self.pmu_names, self.pmu_ports, self.pmu_labels, self.pdc_ids)
-        for ip, name, port, label, pdc_id in pmu_info:
-            pmu = PMU(ip=ip, port=port, pdc_id=pdc_id, name=name, label=label)
+        pmu_info = zip(self.pmu_ips, self.pmu_names, self.pmu_ports, self.pmu_labels, self.pdc_ids, self.pmu_protocols)
+        for ip, name, port, label, pdc_id, protocol in pmu_info:
+            pmu = PMU(ip=ip, port=port, pdc_id=pdc_id, name=name, label=label, protocol=protocol)
             self._rebuild_pmu_connection(pmu) # build PMU connection, with automatic retry
             self.pmus.append(pmu)
         self.log.info(f"Instantiated and started {len(self.pmus)} PMUs")
@@ -540,6 +607,10 @@ def __init__(
         # Allow gtnet-skt fields to be read
         self.current_values.update(self.gtnet_skt_state)
 
+        # Hack
+        # TODO: rotate out values to keep this from eating up memory if running for a long time
+        self.time_map = {}
+
         # Start GTNET-SKT writer thread if using UDP protocol
        if self.gtnet_skt_protocol == "udp":
             self.__gtnet_thread = threading.Thread(target=self._gtnet_continuous_write)
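One possible answer to the "rotate out values" TODO above is to bound the map and evict its oldest entries; a hedged sketch of that idea (not what the provider currently does):

    # Sketch: bound the frame-time -> wall-clock map so long runs don't grow
    # memory without limit. dicts preserve insertion order, so the first key
    # is always the oldest entry.
    MAX_ENTRIES = 100_000

    def remember(time_map: dict, key: float, value) -> None:
        time_map[key] = value
        while len(time_map) > MAX_ENTRIES:
            time_map.pop(next(iter(time_map)))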
""" @@ -608,7 +679,7 @@ def _start_poll_pmus(self): if pmu.pdc_header: metadata[str(pmu)]["header"] = pmu.pdc_header.__dict__ - timestamp = datetime.utcnow().strftime("%d-%m-%Y_%H-%M-%S") + timestamp = utc_now_formatted() # Save PMU metadata to JSON file meta_path = Path(self.csv_path, f"pmu_metadata_{timestamp}.json") @@ -631,7 +702,7 @@ def _start_poll_pmus(self): for thread in threads: thread.join() - def _rebuild_pmu_connection(self, pmu: PMU): + def _rebuild_pmu_connection(self, pmu: PMU) -> None: """ Rebuild TCP connection to PMU if connection fails. @@ -664,7 +735,7 @@ def _rebuild_pmu_connection(self, pmu: PMU): self.log.debug(f"(re)initialized PMU {str(pmu)}") - def _poll_pmu(self, pmu: PMU): + def _poll_pmu(self, pmu: PMU) -> None: """ Continually polls for data from a PMU and updates self.current_values. @@ -697,18 +768,27 @@ def _poll_pmu(self, pmu: PMU): self.frame_queue.put((pmu, data_frame)) - def _data_writer(self): + def _data_writer(self) -> None: """ Handles processing of PMU data to avoid blocking the PMU polling thread. """ self.log.info("Starting data writer thread") while True: - # print(f"frame queue: {self.frame_queue.qsize()}") - # This will block until there is an item to process pmu, frame = self.frame_queue.get() - sceptre_ts = datetime.utcnow() + + # This is an epic hack to workaround a nasty time syncronization issue. + # Extensive documentation about this is on the wiki. + # tl;dr: due the 8 PMU connections being handled in independant threads, + # the sceptre_time will differ between the 8 PMUs for the same rtds_time. + with self.__lock: + if frame["time"] not in self.time_map: + self.time_map[frame["time"]] = utc_now() + sceptre_ts = self.time_map[frame["time"]] + + rtds_datetime = datetime.fromtimestamp(frame["time"], timezone.utc) + drift = abs((sceptre_ts - rtds_datetime).total_seconds()) * 1000.0 # float for mm in frame["measurements"]: if mm["stat"] != "ok": @@ -731,18 +811,18 @@ def _data_writer(self): # Save data to Elasticsearch if self.elastic_enabled: - rtds_datetime = datetime.utcfromtimestamp(frame["time"]) - rtds_ts = rtds_datetime.timestamp() - if rtds_ts != frame["time"]: - self.log.error(f"Timestamp {rtds_ts} (from {rtds_datetime}) != frame['time'] of {frame['time']}") - sys.exit(1) - es_bodies = [] + for ph_id, phasor in line["phasors"].items(): es_body = { - "@timestamp": rtds_datetime, + "@timestamp": sceptre_ts, "rtds_time": rtds_datetime, "sceptre_time": sceptre_ts, + "time_drift": drift, + "network": { + "protocol": "c37.118", + "transport": pmu.protocol, + }, "pmu": { "name": pmu.name, "label": pmu.label, @@ -767,7 +847,8 @@ def _data_writer(self): es_bodies.append(es_body) self.es_queue.put(es_bodies) - # Update global data structure with measurements + # Update global data structure with measurements. + # These are the values that get published to SCEPTRE. # # NOTE: since there are usually multiple threads querying from multiple PMUs # simultaneously, a lock mutex is used to ensure self.current_values doesn't @@ -812,7 +893,7 @@ def _data_writer(self): csv_row.append(v) pmu.csv_writer.write(csv_row) - def _elastic_pusher(self): + def _elastic_pusher(self) -> None: """ Thread that sends PMU data to Elasticsearch using the Elasticsearch Bulk API. 
""" @@ -821,23 +902,17 @@ def _elastic_pusher(self): # Only need to create these fields once, cache and copy into messages es_additions = { "ecs": { - "version": "8.8.0" + "version": "8.11.0", }, "agent": { "type": "rtds-sceptre-provider", - # "version": __version__ - "version": "0.0.0", + "version": self.bennu_version, }, "observer": { "hostname": platform.node(), "geo": { "timezone": str(datetime.now(timezone.utc).astimezone().tzinfo) - } - }, - "network": { - # TODO: if pushing gtnet-skt points to elastic, change these two fields - "protocol": "c37.118", - "transport": "tcp", + }, }, } @@ -849,6 +924,7 @@ def _elastic_pusher(self): "@timestamp": {"type": "date"}, "rtds_time": {"type": "date"}, "sceptre_time": {"type": "date"}, + "time_drift": {"type": "double"}, "event": {"properties": {"ingested": {"type": "date"}}}, "ecs": {"properties": {"version": {"type": "keyword"}}}, "agent": { @@ -903,34 +979,34 @@ def _elastic_pusher(self): raise RuntimeError("failed to connect to Elasticsearch") while True: - # print(f"elastic queue: {self.es_queue.qsize()}") # Batch up messages before sending them all in a single bulk request messages = [] while len(messages) < 48: # 8 PMUs * 6 channels # This blocks until there are messages messages.extend(self.es_queue.get()) - # TODO: there is potential for docs to be pushed to the wrong - # date index at the start or end of a day. - ts_now = datetime.utcnow() - index = f"{self.elastic_index_basename}-{ts_now.strftime('%Y.%m.%d')}" - - if index not in self._es_index_cache: - # check with elastic, could exist and not be in the cache - # if the provider was restarted part way through a day. - if not es.indices.exists(index=index): - self.log.info(f"Creating Elasticsearch index '{index}'...") - es.indices.create(index=index, body={ - "mappings": { - "properties": es_type_mapping - } - }) - self.log.info(f"Created Elasticsearch index '{index}'") - self._es_index_cache.add(index) - # Create list of docs to send using Elasticsearch's bulk API - actions = [ - { + ts_now = utc_now() + actions = [] + for message in messages: + # Use the message's timestamp to generate the index name. + # This ensures docs at start/end of a day get into the correct index. + index = f"{self.elastic_index_basename}-{message['@timestamp'].strftime('%Y.%m.%d')}" + + if index not in self._es_index_cache: + # check with elastic, could exist and not be in the cache + # if the provider was restarted part way through a day. 
@@ -982,7 +1058,8 @@ def query(self) -> str:
             BUS6_VA.real
             BUS6_VA.angle
         """
-        self.log.debug("Processing query request")
+        if self.debug:
+            self.log.debug("Processing query request")
 
         if not self.current_values:
             msg = "ERR=No data points have been read yet from the RTDS"
@@ -997,7 +1074,8 @@ def query(self) -> str:
         return msg
 
     def read(self, tag: str) -> str:
-        self.log.debug(f"Processing read request for tag '{tag}'")
+        if self.debug:
+            self.log.debug(f"Processing read request for tag '{tag}'")
 
         if not self.current_values:
             msg = "ERR=Data points have not been initialized yet from the RTDS"
@@ -1015,7 +1093,8 @@ def read(self, tag: str) -> str:
         return msg
 
     def write(self, tags: dict) -> str:
-        self.log.debug(f"Processing write request for tags: {tags}")
+        if self.debug:
+            self.log.debug(f"Processing write request for tags: {tags}")
 
         if not tags:
             msg = "ERR=No tags provided for write to RTDS"
@@ -1041,7 +1120,8 @@ def write(self, tags: dict) -> str:
                 self.log.error(f"{msg} (tags being written: {tags})")
                 return msg
 
-        # Update the state tracker with the values from the tags being written
+        # Typecast values (e.g. bool to 0/1)
+        typecasted = {}
         for tag, val in tags.items():
             # Validate types match what's in config, if they don't, then warn and typecast
             # NOTE: these will be strings if coming from pybennu-probe
@@ -1058,15 +1138,18 @@ def write(self, tags: dict) -> str:
                     val = "1"
 
             if self.gtnet_skt_tags[tag] == "int":
-                typecasted_value = int(val)
+                typecasted[tag] = int(val)
             else:
-                typecasted_value = float(val)
+                typecasted[tag] = float(val)
+
+        if self.debug:
+            self.log.debug(f"Updating GTNET-SKT tags to: {typecasted}")
 
-            # NOTE: don't need to sort incoming values, since they're updating the
-            # dict (gtnet_skt_state), which is already in the proper order.
-            self.log.info(f"Updating GTNET-SKT tag '{tag}' to {typecasted_value} (previous value: {self.gtnet_skt_state[tag]})")
-            with self.__gtnet_lock:
-                self.gtnet_skt_state[tag] = typecasted_value
+        # Update the state tracker with the values from the tags being written.
+        # NOTE: don't need to sort incoming values, since they're updating the
+        # gtnet_skt_state dict, which is already in the proper order.
+        with self.__gtnet_lock:
+            self.gtnet_skt_state.update(typecasted)
 
         # If TCP, build and send, and retry connection if it fails
         # For UDP, the writer thread will handle sending the updated values
@@ -1094,7 +1177,9 @@ def write(self, tags: dict) -> str:
             self.current_values.update(self.gtnet_skt_state)
 
         msg = f"ACK=Wrote {len(tags)} tags to RTDS via GTNET-SKT"
-        self.log.debug(msg)
+
+        if self.debug:
+            self.log.debug(msg)
 
         return msg
 
@@ -1109,7 +1194,7 @@ def _gtnet_build_payload(self) -> bytes:
 
         return struct.pack(self.struct_format_string, *values)
 
-    def _gtnet_continuous_write(self):
+    def _gtnet_continuous_write(self) -> None:
         """
         Continuously write values to RTDS via GTNET-SKT using UDP protocol.
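_gtnet_build_payload packs the ordered GTNET-SKT state with Python's struct module; a worked sketch of that packing (the format string below is an assumption: GTNET-SKT values are 4-byte ints and floats, and '!' selects network/big-endian byte order):

    # Sketch: pack one int tag and one float tag the way a GTNET-SKT
    # format string like "!if" would (tag names are hypothetical).
    import struct

    state = {"breaker1": 1, "setpoint1": 1.02}
    payload = struct.pack("!if", *state.values())
    print(payload.hex())  # 8 bytes: 4 for the int, 4 for the float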
@@ -1126,7 +1211,7 @@ def _gtnet_continuous_write(self):
                 self.__gtnet_socket.sendto(payload, target)
             sleep(sleep_for)
 
-    def _gtnet_init_socket(self):
+    def _gtnet_init_socket(self) -> None:
         """
         Initialize TCP or UDP socket, and if TCP connection fails, retry until it's successful.
         """
@@ -1153,7 +1238,7 @@ def _gtnet_init_socket(self):
         else:
             self.__gtnet_socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
 
-    def _gtnet_reset_socket(self):
+    def _gtnet_reset_socket(self) -> None:
         if self.__gtnet_socket:
             try:
                 self.__gtnet_socket.close()
@@ -1161,7 +1246,7 @@ def _gtnet_reset_socket(self):
                 pass
         self.__gtnet_socket = None
 
-    def periodic_publish(self):
+    def periodic_publish(self) -> None:
         """
         Publish all tags periodically.
diff --git a/src/pybennu/setup.py b/src/pybennu/setup.py
index 7d74667..b03137f 100644
--- a/src/pybennu/setup.py
+++ b/src/pybennu/setup.py
@@ -102,7 +102,7 @@ def run(self):
         'numpy>=1.11.2', # >=1.11.2 ~=1.21.2
         'opendssdirect.py~=0.6.1',
         'py-expression-eval~=0.3.14',
-        'PYPOWER>=5.0.1', # ==5.1.15
+        'PYPOWER>=5.1.16', # ==5.1.15
         'pyserial>=3.4', # >=3.4
         'PyYAML>=3.12', # pyyaml>=3.12 ==5.4.1
         'requests>=2.20', # ~=2.26.0