import csv
import json
from collections import abc
from typing import Callable, Iterable, Iterator

from cbutil.util import file_time_to_sec


def process_linewise(func: Callable, lines: Iterable) -> Iterator:
    """Lazily apply ``func`` to every item of ``lines``.

    Items for which ``func`` raises ``ValueError`` are skipped silently;
    any other exception propagates to the caller.

    Args:
        func: Callable invoked with a single item.
        lines: Items to process.

    Yields:
        The result of ``func(line)`` for each item that did not raise
        ``ValueError``.
    """
    for line in lines:
        try:
            yield func(line)
        except ValueError:
            # Best-effort processing: drop items that func rejects.
            pass


def iterate_csv(path, time_key="timestamp") -> Iterator[dict]:
    """Yield the rows of a CSV file, making sure each one has ``time_key``.

    Args:
        path: Location of the CSV file.
        time_key: Column that must be present in every yielded row.

    Yields:
        One dict per CSV row. Rows lacking ``time_key`` get the value
        returned by ``file_time_to_sec(path)``; values already present
        (including empty ones) are kept as they are.
    """
    # NOTE(review): presumably a per-file timestamp in seconds -- confirm
    # against cbutil.util.file_time_to_sec.
    file_time = file_time_to_sec(path)
    for row in CSVIterator(path):
        row[time_key] = row.get(time_key, file_time)
        yield row


class CSVIterator(abc.Iterable):
    """Re-iterable view over the rows of a CSV file.

    Every call to ``iter()`` opens the file again and yields one dict per
    row, keyed by the column names taken from the header line.
    """

    def __init__(self, file_name: str):
        """Remember the path; the file is only opened on iteration."""
        self.file_name = file_name

    def __iter__(self) -> Iterator[dict]:
        """Open the file and yield each row as produced by ``csv.DictReader``."""
        # newline="" is required by the csv module: it lets the reader do its
        # own line-ending handling, so "\r\n" embedded in quoted fields is not
        # rewritten by universal-newline translation.
        with open(self.file_name, "r", newline="") as in_file:
            yield from csv.DictReader(in_file)


def json2dict(path: str) -> dict:
    """Load the JSON document stored at ``path`` and return the parsed value.

    Raises:
        OSError: If the file cannot be opened.
        json.JSONDecodeError: If the content is not valid JSON.
    """
    with open(path, "r") as json_file:
        return json.load(json_file)