Skip to content
Snippets Groups Projects
Commit f484cb31 authored by Christoph Alt's avatar Christoph Alt
Browse files

added a function to build data points

parent 9f0b36ea
No related merge requests found
Pipeline #41898 passed with stage
in 36 seconds
......@@ -2,4 +2,4 @@ from .upload import DBConfig, Uploader
from .processing_functions import mesa_pd_text
from .postprocessing import *
from .util import read_file_line_wise
from .data_points import DataPoint
from .data_points import DataPoint, data_point_factory
......@@ -11,3 +11,31 @@ class DataPoint:
def asdict(self):
return asdict(self)
def data_point_factory(run, *,
time_key,
measurement_name,
field_keys: set(),
tag_keys: set() = None,
no_tag_keys: set() = None,
) -> DataPoint:
"""
Returns a data point from a dict.
Parameters:
run(dict): dict from that the data are extracted
time_key(str): key to extract timestamp
tag_keys(set): explicitly define tags
no_tag_key(set): define what are not tags, the tag_keys are inferred automatically
"""
if tag_keys is None and no_tag_keys is None:
raise ValueError("You need either specify tag_keys or no_tag_keys")
fields = {key: run[key] for key in field_keys}
if tag_keys is None:
tag_keys = run.keys() - no_tag_keys - field_keys - {time_key}
tags = {key: run[key]for key in tag_keys}
time = run.get(time_key)
return DataPoint(measurement_name, time, fields=fields, tags=tags)
from .plain_text import process_linewise
from .plain_text import process_linewise, iterate_csv
from .sqlite import sqlite_context, query_complete_table, build_iterate_query, iterate_all_tables
from .sqlite_helper import query_builder
from cbutil.data_points import data_point_factory
import time
test_dict = {
"timestamp": int(time.time()),
"tag_1key": "tag_1value",
"tag_2key": "tag_2value",
"field_1key": "field_1value",
"field_2key": "field_2value",
"neither_1key": "neither_1value",
}
def test_data_point_factory():
no_tag_keys = {"neither_1key"}
field_keys = {"field_1key", "field_2key"}
dp = data_point_factory(test_dict,
time_key="timestamp",
measurement_name="name",
field_keys=field_keys,
no_tag_keys=no_tag_keys)
assert dp.time == test_dict["timestamp"]
assert dp.measurement == "name"
assert dp.tags == {k: v for k, v in test_dict.items() if k.startswith("tag_")}
assert dp.fields == {k: v for k, v in test_dict.items() if k.startswith("field_")}
def test_data_point_factory_explicit():
tag_keys = {"tag_1key", "tag_2key"}
field_keys = {"field_1key", "field_2key"}
dp = data_point_factory(test_dict,
time_key="timestamp",
measurement_name="name",
field_keys=field_keys,
tag_keys=tag_keys)
assert dp.time == test_dict["timestamp"]
assert dp.measurement == "name"
assert dp.tags == {k: v for k, v in test_dict.items() if k.startswith("tag_")}
assert dp.fields == {k: v for k, v in test_dict.items() if k.startswith("field_")}
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment