import logging
import os
import pprint
from dataclasses import dataclass
from typing import Optional

import dotenv
from influxdb import InfluxDBClient

# NOTE: the logger is named after this file's path rather than the more usual
# ``__name__``; it is left unchanged so the logger name stays stable.
logger = logging.getLogger(__file__)

MISSING_DB_PW = """
Password for the InfluxDB write_user was not set.
See https://docs.gitlab.com/ee/ci/variables/#secret-variables
"""


def load_config_from_env(env_path: str = ".env"):
    """Build a :class:`DBConfig` from ``INFLUXDB_*`` environment variables.

    If ``env_path`` exists it is first loaded with ``dotenv.load_dotenv`` so
    that values stored in that file become available through ``os.environ``.

    Args:
        env_path: Path of an optional dotenv file to load before reading the
            environment.

    Returns:
        A ``DBConfig`` populated from ``INFLUXDB_HOST``, ``INFLUXDB_PORT``,
        ``INFLUXDB_USER_NAME``, ``INFLUXDB_DATABASE`` and
        ``INFLUXDB_WRITE_USER_PASSWORD``.

    Raises:
        KeyError: If any of the required environment variables is missing.
        ValueError: If ``INFLUXDB_PORT`` is not a valid integer.
    """
    if os.path.exists(env_path):
        dotenv.load_dotenv(env_path)

    try:
        write_user_pw = os.environ["INFLUXDB_WRITE_USER_PASSWORD"]
    except KeyError:
        # Point the operator at where the secret is expected to be defined,
        # then let the original KeyError propagate unchanged.
        logger.error(MISSING_DB_PW)
        raise

    return DBConfig(
        host=os.environ["INFLUXDB_HOST"],
        # Environment values are always strings; DBConfig.port is an int.
        port=int(os.environ["INFLUXDB_PORT"]),
        user_name=os.environ["INFLUXDB_USER_NAME"],
        database=os.environ["INFLUXDB_DATABASE"],
        write_user_pw=write_user_pw,
    )


@dataclass
class DBConfig:
    """Connection settings used to access the InfluxDB database."""

    host: str  # passed to InfluxDBClient as ``host``
    port: int  # passed to InfluxDBClient as ``port``
    user_name: str  # passed to InfluxDBClient as ``username``
    database: str  # passed to InfluxDBClient as ``database``
    write_user_pw: str  # passed to InfluxDBClient as ``password``


class Uploader:
    """Thin wrapper around ``InfluxDBClient`` for writing points."""

    def __init__(self, config: Optional[DBConfig] = None):
        """Create the InfluxDB client described by ``config``.

        Args:
            config: Connection settings. When omitted (or ``None``), the
                settings are read from the environment via
                ``load_config_from_env()`` at construction time.

        Raises:
            KeyError: If ``config`` is omitted and a required environment
                variable is missing.
        """
        # The environment is read here, per call, instead of in the signature:
        # a default of ``load_config_from_env()`` would be evaluated once at
        # import time and make merely importing this module fail whenever the
        # environment is not configured.
        if config is None:
            config = load_config_from_env()

        self.config = config
        self.client = InfluxDBClient(
            host=config.host,
            port=config.port,
            username=config.user_name,
            password=config.write_user_pw,
            database=config.database,
        )

    def upload(self, points, dry_run=False, *args, **kwargs):
        """Log ``points`` and, unless ``dry_run`` is set, write them.

        Args:
            points: The points to upload; forwarded unchanged to
                ``self.client.write_points``.
            dry_run: When true, only log the points and skip the write.
            *args: Extra positional arguments for ``write_points``.
            **kwargs: Extra keyword arguments for ``write_points``.

        Raises:
            ValueError: If ``write_points`` returns a falsy result.
        """
        # Pretty-printing can be costly for large payloads; skip it entirely
        # when INFO records would be discarded anyway.
        if logger.isEnabledFor(logging.INFO):
            logger.info("Uploading: %s", pprint.pformat(points))

        if dry_run:
            return

        success = self.client.write_points(points, *args, **kwargs)
        if not success:
            raise ValueError("Uploading to influxdb went wrong!")
        logger.info("Uploaded %d items", len(points))