# Newer
# Older
import requests
from cbutil.data_points import DataPoint, data_point_factory
# Module-level logger. It is named after the module (``__name__``) rather than
# the file path (``__file__``): a path contains separators and a ".py" suffix,
# which the logging hierarchy would split on the dot and which cannot be
# addressed sensibly from a logging configuration.
logger = logging.getLogger(__name__)
def load_from_env(env_path=".env"):
    """Read the CI coordinates from the process environment.

    If a file exists at ``env_path`` it is first passed to
    ``dotenv.load_dotenv``; afterwards the three variables are looked up in
    ``os.environ``.

    Args:
        env_path: Path of an optional dotenv file. It is skipped when no
            file exists at that path.

    Returns:
        A 3-tuple with the values of ``CI_API_V4_URL``, ``CI_PROJECT_ID``
        and ``CI_PIPELINE_ID``, in that order.

    Raises:
        KeyError: If one of the three variables is not set.
    """
    dotenv_present = os.path.exists(env_path)
    if dotenv_present:
        dotenv.load_dotenv(env_path)

    wanted = ("CI_API_V4_URL", "CI_PROJECT_ID", "CI_PIPELINE_ID")
    return tuple(os.environ[variable] for variable in wanted)
def get_url_from_env() -> str:
    """Build the pipeline-jobs URL from the CI environment.

    The base URL, project id and pipeline id are obtained from
    ``load_from_env`` (called with its default dotenv path) and handed to
    ``get_api_url_pipelines``.

    Returns:
        The URL produced by ``get_api_url_pipelines``.

    Raises:
        KeyError: Propagated from ``load_from_env`` when one of the required
            environment variables is missing.
    """
    base_url, project_id, pipeline_id = load_from_env()
    # Log through the module logger instead of the root-level
    # ``logging.info``: this keeps the message consistent with the other log
    # lines in this file and avoids the implicit ``logging.basicConfig()``
    # that the root-level helper performs when no handler is configured.
    logger.info('Loaded from env %s %s %s', base_url, project_id, pipeline_id)
    return get_api_url_pipelines(base_url, project_id, pipeline_id)
def get_api_url_pipelines(base_url: str, project_id: int, pipeline_id: int):
    """Return the jobs endpoint of one pipeline.

    The result has the shape
    ``<base_url>/projects/<project_id>/pipelines/<pipeline_id>/jobs``.

    Args:
        base_url: Prefix of the URL; used verbatim.
        project_id: Identifier interpolated after ``projects/``.
        pipeline_id: Identifier interpolated after ``pipelines/``.

    Returns:
        The assembled URL string.
    """
    project_part = f"projects/{project_id}"
    pipeline_part = f"pipelines/{pipeline_id}"
    return f"{base_url}/{project_part}/{pipeline_part}/jobs"
def get_job_info(url: str, *, timeout: float = 30.0):
    """Yield every job returned by a paginated jobs endpoint.

    The endpoint is requested page by page. After each page the
    ``x-next-page`` response header (the offset-pagination header used by
    the GitLab REST API) decides whether another page is fetched.

    Args:
        url: Endpoint to query, e.g. the value returned by
            ``get_api_url_pipelines``. It may already carry a query string.
        timeout: Seconds passed to ``requests.get`` as its ``timeout``, so a
            stalled connection cannot block the caller forever.

    Yields:
        Each element of the decoded JSON body of every page, in order.

    Raises:
        requests.HTTPError: If a page is answered with a 4xx/5xx status.
        requests.Timeout: If the server does not respond within ``timeout``.
    """
    # Pick the right separator so that a ``url`` that already has a query
    # string (e.g. "...?per_page=100") still yields a well-formed URL.
    separator = "&" if "?" in url else "?"
    next_url = url
    while True:
        logger.info("reqeuesting %s", next_url)
        response = requests.get(next_url, timeout=timeout)
        # raise_for_status() is a no-op for non-error statuses, so no
        # explicit status-code comparison is needed before calling it.
        response.raise_for_status()
        yield from response.json()
        # The header is empty on the last page and may be missing entirely
        # (e.g. stripped by a proxy); both cases end the pagination instead
        # of raising KeyError.
        next_page = response.headers.get('x-next-page')
        if not next_page:
            break
        next_url = f"{url}{separator}page={next_page}"
def create_job_datapoint(job: dict) -> DataPoint:
    """Turn one job dict into a ``JOB_INFOS`` data point.

    Delegates to ``data_point_factory`` with ``'finished_at'`` as the time
    key, ``'duration'`` and ``'queued_duration'`` as field keys, and ``'id'``
    and ``'name'`` as tag keys.

    Args:
        job: A single job mapping, e.g. one item yielded by ``get_job_info``.

    Returns:
        Whatever ``data_point_factory`` builds for this job.
    """
    field_names = ['duration', 'queued_duration']
    tag_names = ['id', 'name']
    return data_point_factory(
        job,
        time_key='finished_at',
        measurement_name="JOB_INFOS",
        field_keys=field_names,
        tag_keys=tag_names,
    )
def get_job_datapoints(url):
for job in get_job_info(url):