# upload.py
# Author (per repository history): Christoph Alt
import os
import logging
import dotenv
import pprint
from influxdb import InfluxDBClient
from dataclasses import dataclass

# Module-level logger; it is named after this file's path (``__file__``).
logger = logging.getLogger(__file__)

# Hint text for a missing InfluxDB write-user password, pointing at the
# GitLab CI documentation on secret variables.
# NOTE(review): nothing in the visible code references this constant --
# presumably meant for the case where INFLUXDB_WRITE_USER_PASSWORD is unset;
# confirm against callers.
MISSING_DB_PW = """
Password for the InfluxDB write_user was not set.
See https://docs.gitlab.com/ee/ci/variables/#secret-variables
"""


def load_config_from_env(env_path: str = ".env") -> "DBConfig":
    """
    Build a DBConfig from the ``INFLUXDB_*`` environment variables.

    If a file exists at ``env_path`` it is loaded with ``dotenv.load_dotenv``
    first, so the variables may come either from that file or from the
    process environment.

    Args:
        env_path: Path of an optional dotenv file; skipped when it does not
            exist.

    Returns:
        A DBConfig populated from INFLUXDB_HOST, INFLUXDB_PORT,
        INFLUXDB_USER_NAME, INFLUXDB_DATABASE and
        INFLUXDB_WRITE_USER_PASSWORD.

    Raises:
        KeyError: If one of the required variables is not set.
        ValueError: If INFLUXDB_PORT is not a valid integer.
    """
    if os.path.exists(env_path):
        dotenv.load_dotenv(env_path)

    env = os.environ
    return DBConfig(
        host=env["INFLUXDB_HOST"],
        # Environment values are always strings, while DBConfig.port is
        # declared as int, so convert here.
        port=int(env["INFLUXDB_PORT"]),
        user_name=env["INFLUXDB_USER_NAME"],
        database=env["INFLUXDB_DATABASE"],
        write_user_pw=env["INFLUXDB_WRITE_USER_PASSWORD"],
    )


@dataclass
class DBConfig:
    """
    Configuration class that stores the information for accessing the database.

    The annotations are not enforced at runtime: ``dataclass`` stores the
    values it is given without validating or converting them.
    """
    # Host of the InfluxDB server (passed as ``host`` to InfluxDBClient).
    host: str
    # Port of the InfluxDB server (passed as ``port`` to InfluxDBClient).
    port: int
    # User name used to authenticate (passed as ``username``).
    user_name: str
    # Name of the database the client is bound to (passed as ``database``).
    database: str
    # Password of the user above (passed as ``password``).
    write_user_pw: str


class Uploader:
    """
    Small wrapper around ``InfluxDBClient`` that writes points to the
    database described by a DBConfig.
    """

    def __init__(self, config: "DBConfig | None" = None):
        """
        Create the InfluxDB client for the given configuration.

        Args:
            config: Connection settings. When omitted (``None``), the
                settings are read via ``load_config_from_env()`` at
                construction time.

        Raises:
            KeyError: Propagated from ``load_config_from_env()`` when no
                config is given and a required environment variable is
                missing.
        """
        # A call in the default value would be evaluated once, when the class
        # is defined (i.e. on import): importing the module would then fail
        # without a complete environment, and every instance would share the
        # config captured at import. Resolve the default lazily instead.
        if config is None:
            config = load_config_from_env()
        self.config = config

        self.client = InfluxDBClient(
            host=config.host,
            port=config.port,
            username=config.user_name,
            password=config.write_user_pw,
            database=config.database,
        )

    def upload(self, points, dry_run=False, *args, **kwargs):
        """
        Log ``points`` and, unless ``dry_run`` is set, write them to InfluxDB.

        Args:
            points: The points to write; forwarded unchanged to
                ``InfluxDBClient.write_points``. Must support ``len()``.
            dry_run: When true, only log the points and skip the write.
            *args: Extra positional arguments for ``write_points``.
            **kwargs: Extra keyword arguments for ``write_points``.

        Raises:
            ValueError: If ``write_points`` returns a falsy result.
        """
        logger.info(f"Uploading: {pprint.pformat(points)}")
        if dry_run:
            return

        success = self.client.write_points(points, *args, **kwargs)
        if not success:
            raise ValueError("Uploading to influxdb went wrong!")
        logger.info(f"Uploaded {len(points)} items")