"README.md" did not exist on "cd8a9329b2fb99432fe1b3e69c2af16f4dfc51fb"
Newer
Older
import logging
from dataclasses import dataclass, field
from typing import Optional

from .util import time_conversion
# Module-level logger. Named after the module (``__name__``) rather than the
# file path so that it takes part in the package's dotted logger hierarchy and
# can be configured through its parent loggers.
logger = logging.getLogger(__name__)
@dataclass
class DataPoint:
    """Represents a single data point from the InfluxDB perspective.

    ``fields`` and ``tags`` default to empty dicts, so a point can be
    created from just a measurement name and a timestamp and filled in
    afterwards.
    """

    # Name of the measurement this point belongs to.
    measurement: str
    # Timestamp of the point as an integer (unit is decided by the producer
    # of the value -- TODO confirm against time_conversion).
    time: int
    # Field values keyed by field name. default_factory gives every instance
    # its own dict instead of one shared mutable default.
    fields: dict = field(default_factory=dict)
    # Tag values keyed by tag name.
    tags: dict = field(default_factory=dict)
def data_point_factory(run, *,
                       time_key,
                       measurement_name,
                       field_keys: set,
                       tag_keys: Optional[set] = None,
                       no_tag_keys: Optional[set] = None,
                       ) -> DataPoint:
    """
    Returns a data point built from a dict.

    Parameters:
        run(dict): dict from which the data are extracted
        time_key(str): key in ``run`` holding the timestamp; its value is
            passed through ``time_conversion``
        measurement_name(str): measurement name stored on the data point
        field_keys(set): keys of ``run`` whose values become fields; every
            value is converted with ``float``
        tag_keys(set): explicitly define the tags; when given, it takes
            precedence and ``no_tag_keys`` is not consulted
        no_tag_keys(set): define what are not tags; the tag keys are then
            inferred as all keys of ``run`` except ``no_tag_keys``,
            ``field_keys`` and ``time_key``

    Returns:
        DataPoint: the point holding the measurement name, the converted
        time, the fields and the tags

    Raises:
        ValueError: if neither ``tag_keys`` nor ``no_tag_keys`` is given
            (``float`` may also raise it for a non-numeric field value)
        KeyError: if ``run`` is a dict and a requested key is missing
    """
    if tag_keys is None and no_tag_keys is None:
        raise ValueError("You need either specify tag_keys or no_tag_keys")

    fields = {key: float(run[key]) for key in field_keys}

    if tag_keys is None:
        # set.difference accepts arbitrary iterables, so lists or tuples of
        # keys work here as well as sets (chained ``-`` would raise TypeError).
        tag_keys = set(run.keys()).difference(
            no_tag_keys, field_keys, (time_key,)
        )

    tags = {key: run[key] for key in tag_keys}
    timestamp = time_conversion(run[time_key])
    return DataPoint(measurement_name, timestamp, fields=fields, tags=tags)