# upload.py
# Author per commit history: Christoph Alt
import logging
import os
import pprint
from dataclasses import dataclass, field

import dotenv
from influxdb import InfluxDBClient

# Module-level logger shared by the functions and classes in this file.
# NOTE(review): the logger is named after __file__ (a filesystem path) rather
# than the conventional __name__; left unchanged here because renaming it
# would alter the logger name seen by existing logging configuration.
logger = logging.getLogger(__file__)

# Message text for the case that the InfluxDB write_user password is not
# configured; it points to the GitLab CI secret-variables documentation.
MISSING_DB_PW = """
Password for the InfluxDB write_user was not set.
See https://docs.gitlab.com/ee/ci/variables/#secret-variables
"""


def load_config_from_env(env_path: str = ".env") -> "DBConfig":
    """
    Build a DBConfig from the ``INFLUXDB_*`` environment variables.

    If a file exists at ``env_path`` it is loaded with
    ``dotenv.load_dotenv`` before the environment is read.

    Args:
        env_path: path of an optional dotenv file.

    Returns:
        A DBConfig filled from INFLUXDB_HOST, INFLUXDB_PORT,
        INFLUXDB_USER_NAME, INFLUXDB_DATABASE and
        INFLUXDB_WRITE_USER_PASSWORD.

    Raises:
        KeyError: if one of the variables above is not set. When the
            missing one is the write-user password, MISSING_DB_PW is
            logged as an error before the exception propagates.
        ValueError: if INFLUXDB_PORT is not a valid integer.
    """
    if os.path.exists(env_path):
        dotenv.load_dotenv(env_path)

    env = os.environ
    host = env["INFLUXDB_HOST"]
    # Environment values are always strings, but DBConfig.port is declared
    # as int, so convert here instead of handing a str downstream.
    port = int(env["INFLUXDB_PORT"])
    user_name = env["INFLUXDB_USER_NAME"]
    database = env["INFLUXDB_DATABASE"]
    try:
        write_user_pw = env["INFLUXDB_WRITE_USER_PASSWORD"]
    except KeyError:
        # Keep the KeyError for callers, but tell the operator how to fix it.
        logger.error(MISSING_DB_PW)
        raise

    return DBConfig(
        host=host,
        port=port,
        user_name=user_name,
        database=database,
        write_user_pw=write_user_pw,
    )


@dataclass
class DBConfig:
    """
    Connection settings used to access the database.

    Plain data holder: ``host`` and ``port`` locate the server,
    ``user_name`` and ``write_user_pw`` are the credentials and
    ``database`` is the database name.
    """
    host: str
    port: int
    user_name: str
    database: str
    # Excluded from the generated __repr__ so that printing or logging a
    # config object does not expose the password. Still a required
    # constructor argument and still part of equality comparison.
    write_user_pw: str = field(repr=False)


class Uploader:
    """
    Small wrapper around ``InfluxDBClient`` that writes points and logs
    what is being uploaded.
    """

    # The annotation is quoted so it is never evaluated at definition time
    # and stays valid on Python versions without the ``X | None`` syntax.
    def __init__(self, config: "DBConfig | None" = None):
        """
        Create the InfluxDB client.

        Args:
            config: connection settings. When None, the settings are
                obtained by calling load_config_from_env().
        """
        if config is None:
            config = load_config_from_env()

        self.config = config
        self.client = InfluxDBClient(
            host=config.host,
            port=config.port,
            username=config.user_name,
            password=config.write_user_pw,
            database=config.database,
        )

    def upload(self, points, dry_run=False, *, time_precision='s', **kwargs):
        """
        Log ``points`` and, unless ``dry_run`` is set, write them.

        Args:
            points: the points handed to ``self.client.write_points``;
                must support ``len()`` for the success log message.
            dry_run: when true, only log and do not write anything.
            time_precision: forwarded to ``write_points``.
            **kwargs: forwarded unchanged to ``write_points``; a ``tags``
                entry, if present and truthy, is additionally logged.

        Raises:
            ValueError: if ``write_points`` returns a falsy value.
        """
        # Pretty-printing a large payload is wasted work when INFO records
        # are discarded anyway, so only format when the level is enabled.
        if logger.isEnabledFor(logging.INFO):
            logger.info(f"Uploading: {pprint.pformat(points)}")
            if (common_tags := kwargs.get("tags")):
                logger.info(f"with common tags: {pprint.pformat(common_tags)}")

        if dry_run:
            return

        success = self.client.write_points(
            points,
            time_precision=time_precision,
            **kwargs,
        )
        if not success:
            raise ValueError("Uploading to influxdb went wrong!")
        logger.info(f"Uploaded {len(points)} items")