# get_job_info.py
# Fetch job information for a GitLab CI pipeline via the GitLab API and
# convert finished jobs into data points for ingestion.
import logging
import os

import dotenv
import requests

from cbutil.data_points import DataPoint, data_point_factory

# Module-level logger. NOTE(review): was getLogger(__file__), which names the
# logger after a filesystem path; __name__ is the conventional choice and
# plays well with hierarchical logging config. Confirm no logging config
# elsewhere keys on the old path-based name.
logger = logging.getLogger(__name__)


def load_from_env(env_path=".env"):
    """Return the GitLab CI connection settings from the environment.

    If *env_path* exists it is first loaded into ``os.environ`` via
    python-dotenv, letting a local ``.env`` file supply the values outside
    of a CI run.

    Returns:
        tuple[str, str, str]: ``(CI_API_V4_URL, CI_PROJECT_ID,
        CI_PIPELINE_ID)``.

    Raises:
        KeyError: if any of the three variables is not set.
    """
    if os.path.exists(env_path):
        dotenv.load_dotenv(env_path)
    required = ("CI_API_V4_URL", "CI_PROJECT_ID", "CI_PIPELINE_ID")
    return tuple(os.environ[key] for key in required)


def get_url_from_env() -> str:
    """Build the pipeline-jobs API URL from CI environment variables.

    Reads ``CI_API_V4_URL``, ``CI_PROJECT_ID`` and ``CI_PIPELINE_ID`` via
    :func:`load_from_env` and formats them with
    :func:`get_api_url_pipelines`.
    """
    base_url, project_id, pipeline_id = load_from_env()
    # Fix: was logging.info (root logger); use the module logger so output
    # is attributed and filtered consistently with the rest of this file.
    logger.info(f'Loaded from env {base_url} {project_id} {pipeline_id}')
    return get_api_url_pipelines(base_url, project_id, pipeline_id)


def get_api_url_pipelines(base_url: str, project_id: int, pipeline_id: int):
    """Return the GitLab API v4 URL listing the jobs of one pipeline."""
    path = f"projects/{project_id}/pipelines/{pipeline_id}/jobs"
    return f"{base_url}/{path}"


def get_job_info(url: str):
    """Yield every job dict from a paginated GitLab jobs endpoint.

    Follows GitLab's ``x-next-page`` response header until it is empty or
    absent.

    Args:
        url: Jobs listing URL, e.g. from :func:`get_api_url_pipelines`.

    Yields:
        dict: one job object per API result.

    Raises:
        requests.HTTPError: on any non-2xx response.
    """
    next_url = url
    while True:
        logger.info(f"requesting {next_url}")  # fix: typo "reqeuesting"
        response = requests.get(next_url)
        # raise_for_status() is a no-op on success; the explicit 200 check
        # was redundant (and would have skipped raising on other 2xx codes
        # anyway, which raise_for_status already handles correctly).
        response.raise_for_status()
        yield from response.json()
        # Fix: indexing headers['x-next-page'] raised KeyError when the
        # header was missing (e.g. single-page or keyset pagination);
        # .get() treats a missing header the same as an empty one.
        if (next_page := response.headers.get('x-next-page')):
            next_url = f"{url}?page={next_page}"
        else:
            break


def create_job_datapoint(job: dict) -> DataPoint:
    """Convert one GitLab job dict into a ``JOB_INFOS`` DataPoint.

    Uses the job's ``finished_at`` as the timestamp, ``duration`` and
    ``queued_duration`` as fields, and ``id``/``name`` as tags.
    """
    options = {
        "time_key": "finished_at",
        "measurement_name": "JOB_INFOS",
        "field_keys": ["duration", "queued_duration"],
        "tag_keys": ["id", "name"],
    }
    return data_point_factory(job, **options)


def get_job_datapoints(url):
    """Yield a DataPoint for every successful job of the pipeline at *url*.

    Jobs with any status other than ``'success'`` are skipped.
    """
    # NOTE: scrape residue ("... avatar / committed" lines) removed from
    # the original body; it was not code.
    for job in get_job_info(url):
        if job['status'] == 'success':
            yield create_job_datapoint(job)