Newer
Older
import dotenv
from influxdb import InfluxDBClient
logger = logging.getLogger(__file__)
MISSING_DB_PW = """
Password for the InfluxDB write_user was not set.
See https://docs.gitlab.com/ee/ci/variables/#secret-variables
"""
def load_config_from_env(env_path: str = ".env"):
if os.path.exists(env_path):
return DBConfig(
host=os.environ["INFLUXDB_HOST"],
port=os.environ["INFLUXDB_PORT"],
user_name=os.environ["INFLUXDB_USER_NAME"],
database=os.environ["INFLUXDB_DATABASE"],
write_user_pw=os.environ["INFLUXDB_WRITE_USER_PASSWORD"]
)
@dataclass
class DBConfig:
"""
Configclass that stores the information for accessing the Database.
"""
host: str
port: int
user_name: str
database: str
write_user_pw: str
class Uploader:
def __init__(self, config: DBConfig = None):
if config is None:
config = load_config_from_env()
self.config = config
self.client = InfluxDBClient(
host=config.host,
port=config.port,
username=config.user_name,
password=config.write_user_pw,
database=config.database,
)
def upload(self, points, dry_run=False, *, time_precision='s', **kwargs):
if (common_tags := kwargs.get("tags")):
logger.info(f"with common tags: {pprint.pformat(common_tags)}")
if not dry_run:
success = self.client.write_points(points,
time_precision=time_precision,
**kwargs)
if success:
logger.info(f"Uploaded {len(points)} items")
else:
raise ValueError("Uploading to influxdb went wrong!")