Newer
Older
import os
import logging
import dotenv
import pprint
from influxdb import InfluxDBClient
from dataclasses import dataclass
# Module-level logger used by the upload code below.
# NOTE(review): the logger is named after __file__ (the module's path) rather
# than the conventional __name__ -- confirm this is intended before changing
# it, since the logger name shows up in log records and logging config.
logger = logging.getLogger(__file__)
# Message text for the case that the InfluxDB write_user password is not set.
# Not referenced in the visible part of this file; presumably used where the
# configuration is loaded -- TODO confirm.
MISSING_DB_PW = """
Password for the InfluxDB write_user was not set.
See https://docs.gitlab.com/ee/ci/variables/#secret-variables
"""
def load_config_from_env(env_path: str = ".env"):
if os.path.exists(env_path):
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
return DBConfig(
host=os.environ["INFLUXDB_HOST"],
port=os.environ["INFLUXDB_PORT"],
user_name=os.environ["INFLUXDB_USER_NAME"],
database=os.environ["INFLUXDB_DATABASE"],
write_user_pw=os.environ["INFLUXDB_WRITE_USER_PASSWORD"]
)
@dataclass
class DBConfig:
    """
    Configclass that stores the information for accessing the Database.

    Attributes:
        host: Host of the InfluxDB server.
        port: Port of the InfluxDB server. Numeric strings are accepted and
            converted to ``int`` on construction.
        user_name: User name used to authenticate against InfluxDB.
        database: Name of the database that is written to.
        write_user_pw: Password belonging to ``user_name``.

    Raises:
        ValueError: If ``port`` cannot be interpreted as an integer.
    """
    host: str
    port: int
    user_name: str
    database: str
    write_user_pw: str

    def __post_init__(self):
        # Values read from os.environ are always strings, so a config built
        # from the environment would otherwise carry a str in a field that
        # is declared as int. Normalising here keeps the annotation truthful.
        self.port = int(self.port)
class Uploader:
    """
    Small wrapper around ``InfluxDBClient`` that writes points to the
    database described by a ``DBConfig``.
    """

    def __init__(self, config: "DBConfig | None" = None):
        """
        Create the InfluxDB client for ``config``.

        Args:
            config: Connection settings. When omitted, the settings are
                obtained from ``load_config_from_env()`` at the time the
                uploader is constructed.
        """
        # The default is resolved here instead of in the signature: a default
        # of ``load_config_from_env()`` is evaluated once when the class is
        # defined, so merely importing the module would read the environment
        # (and fail if it is incomplete), and every instance would share that
        # one import-time result.
        if config is None:
            config = load_config_from_env()
        self.config = config
        self.client = InfluxDBClient(
            host=config.host,
            port=config.port,
            username=config.user_name,
            password=config.write_user_pw,
            database=config.database,
        )

    def upload(self, points, dry_run=False, *args, **kwargs):
        """
        Log ``points`` and, unless ``dry_run`` is set, write them to InfluxDB.

        Args:
            points: The points handed to the client's ``write_points``.
                Must support ``len()`` for the success log message.
            dry_run: When true, the points are only logged, nothing is
                written.
            *args: Forwarded unchanged to ``write_points``.
            **kwargs: Forwarded unchanged to ``write_points``.

        Raises:
            ValueError: If ``write_points`` returns a falsy result.
        """
        # Lazy %-style arguments: the final message is only assembled when
        # the record is actually emitted.
        logger.info("Uploading: %s", pprint.pformat(points))
        if dry_run:
            return
        if not self.client.write_points(points, *args, **kwargs):
            raise ValueError("Uploading to influxdb went wrong!")
        logger.info("Uploaded %d items", len(points))