from dataclasses import dataclass, asdict from .util import time_conversion import logging logger = logging.getLogger(__file__) @dataclass class DataPoint: """Represents a single Datapoint from the InfluxDB perspective.""" measurement: str time: int fields: dict tags: dict def asdict(self): return asdict(self) def data_point_factory(run, *, time_key, measurement_name, field_keys: set(), tag_keys: set() = None, no_tag_keys: set() = None, ) -> DataPoint: """ Returns a data point from a dict. Parameters: run(dict): dict from that the data are extracted time_key(str): key to extract timestamp tag_keys(set): explicitly define tags no_tag_key(set): define what are not tags, the tag_keys are inferred automatically """ if tag_keys is None and no_tag_keys is None: raise ValueError("You need either specify tag_keys or no_tag_keys") fields = {key: run[key] for key in field_keys} if tag_keys is None: tag_keys = run.keys() - no_tag_keys - field_keys - {time_key} tags = {key: run[key]for key in tag_keys} time = run[time_key] try: time = time_conversion(time) except ValueError: logger.warn(f"{time} could not be transformed with the current pattern") pass return DataPoint(measurement_name, time, fields=fields, tags=tags)