Newer
Older
from cbutil.data_points import data_point_factory
import time
test_dict = {
"timestamp": int(time.time()),
"tag_1key": "tag_1value",
"tag_2key": "tag_2value",
"neither_1key": "neither_1value",
}
def test_data_point_factory():
no_tag_keys = {"neither_1key"}
field_keys = {"field_1key", "field_2key"}
dp = data_point_factory(test_dict,
time_key="timestamp",
measurement_name="name",
field_keys=field_keys,
no_tag_keys=no_tag_keys)
assert dp.time == test_dict["timestamp"]
assert dp.measurement == "name"
assert dp.tags == {k: v for k, v in test_dict.items() if k.startswith("tag_")}
assert dp.fields == {k: v for k, v in test_dict.items() if k.startswith("field_")}
def test_data_point_factory_explicit():
tag_keys = {"tag_1key", "tag_2key"}
field_keys = {"field_1key", "field_2key"}
dp = data_point_factory(test_dict,
time_key="timestamp",
measurement_name="name",
field_keys=field_keys,
tag_keys=tag_keys)
assert dp.time == test_dict["timestamp"]
assert dp.measurement == "name"
assert dp.tags == {k: v for k, v in test_dict.items() if k.startswith("tag_")}
assert dp.fields == {k: v for k, v in test_dict.items() if k.startswith("field_")}