Skip to content
Snippets Groups Projects
Commit d404c71c authored by Christoph Alt's avatar Christoph Alt
Browse files

added util to read the job infos

parent 69a74514
No related merge requests found
Pipeline #42911 passed with stages
in 25 seconds
......@@ -3,3 +3,4 @@ from .processing_functions import mesa_pd_text
from .postprocessing import *
from .util import read_file_line_wise, time_conversion
from .data_points import DataPoint, data_point_factory
from .get_job_info import get_url_from_env, get_job_datapoints
from dataclasses import dataclass, asdict
from .util import time_conversion
import logging
from dataclasses import asdict, dataclass
from .util import time_conversion
logger = logging.getLogger(__file__)
......
import logging
import os
import dotenv
import requests
from cbutil.data_points import DataPoint, data_point_factory
logger = logging.getLogger(__file__)
def load_from_env(env_path=".env"):
if os.path.exists(env_path):
dotenv.load_dotenv(env_path)
return os.environ["CI_API_V4_URL"], os.environ["CI_PROJECT_ID"], os.environ["CI_PIPELINE_ID"]
def get_url_from_env() -> str:
base_url, project_id, pipeline_id = load_from_env()
logging.info(f'Loaded from env {base_url} {project_id}{pipeline_id}')
return get_api_url_pipelines(base_url, project_id, pipeline_id)
def get_api_url_pipelines(base_url: str, project_id: int, pipeline_id: int):
return f"{base_url}/projects/{project_id}/pipelines/{pipeline_id}/jobs"
def get_job_info(url: str):
jobs = requests.get(url)
for job in jobs.json():
yield job
def create_job_datapoint(job: dict) -> DataPoint:
return data_point_factory(job,
time_key='finished_at',
measurement_name="JOB_INFOS",
field_keys=['duration', 'queued_duration'],
tag_keys=['id', 'name'])
def get_job_datapoints(url):
for job in get_job_info(url):
yield create_job_datapoint(job)
......@@ -56,10 +56,15 @@ def file_time_to_sec(file_path) -> int:
return int(os.path.getmtime(file_path))
def time_conversion(time_stamp, *, pattern="%Y-%m-%d %H:%M:%S"):
def time_conversion(time_stamp, *, pattern="%Y-%m-%d %H:%M:%S%z"):
try:
return int(time_stamp)
except ValueError as e:
logger.exception(e)
try:
dt = datetime.strptime(time_stamp, pattern)
except ValueError as e:
logger.exception(e)
dt = datetime.fromisoformat(time_stamp)
return int(datetime.strptime(time_stamp, pattern).timestamp())
return int(dt.timestamp())
......@@ -14,6 +14,7 @@ setup(name="cb-util",
"influxdb",
"gitpython",
"grafanalib",
"requests",
],
setup_requires=['pytest-runner'],
tests_require=['pytest']
......
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment