Newer
Older
from dataclasses import asdict, dataclass, field
@dataclass
class DataPoint:
    """Represents a single data point from the InfluxDB perspective.

    Attributes:
        measurement: name of the measurement the point belongs to.
        time: timestamp of the point.
        fields: mapping of field keys to values; empty dict when omitted.
        tags: mapping of tag keys to values; empty dict when omitted.
    """

    measurement: str
    time: int
    # ``data_point_factory`` constructs instances with ``fields=`` and
    # ``tags=``, so both attributes must exist. They default to a fresh
    # empty dict per instance, which keeps ``DataPoint(measurement, time)``
    # working and avoids sharing one mutable default between instances.
    fields: dict = field(default_factory=dict)
    tags: dict = field(default_factory=dict)
def data_point_factory(run, *,
                       time_key,
                       measurement_name,
                       field_keys: set,
                       tag_keys: set = None,
                       no_tag_keys: set = None,
                       ) -> DataPoint:
    """
    Returns a data point built from a dict.

    Parameters:
        run(dict): dict the data are extracted from
        time_key(str): key used to look up the timestamp in ``run``;
            a missing key yields ``None`` as the time
        measurement_name: value passed on as the measurement of the data point
        field_keys(set): keys of ``run`` whose values become the fields;
            any re-iterable collection of keys is accepted
        tag_keys(set): explicitly define tags; takes precedence over
            ``no_tag_keys`` when both are given
        no_tag_keys(set): define what are not tags; used only when
            ``tag_keys`` is None, in which case the tags are every key of
            ``run`` that is not in ``no_tag_keys``, not in ``field_keys``
            and not ``time_key``

    Returns:
        DataPoint: the data point holding the measurement name, time,
        fields and tags.

    Raises:
        ValueError: if neither ``tag_keys`` nor ``no_tag_keys`` is given.
        KeyError: if a field key or an explicitly given tag key is missing
            from ``run``.
    """
    if tag_keys is None and no_tag_keys is None:
        raise ValueError("You need either specify tag_keys or no_tag_keys")

    fields = {key: run[key] for key in field_keys}

    if tag_keys is None:
        # Collect the exclusions into one real set first: chained ``-`` only
        # works when every operand is a set, whereas unpacking accepts
        # lists and tuples as well.
        excluded = {time_key, *no_tag_keys, *field_keys}
        tag_keys = run.keys() - excluded

    tags = {key: run[key] for key in tag_keys}
    # ``.get`` is intentional: a missing time key results in ``None``.
    time = run.get(time_key)
    return DataPoint(measurement_name, time, fields=fields, tags=tags)