Newer
Older
from collections import abc
from typing import Callable, Iterable
def process_linewise(func: Callable, lines: Iterable):
""" Iterate over lines and apply func on each """
for line in lines:
try:
yield func(line)
except ValueError:
pass
def iterate_csv(path, time_key="timestamp"):
file_time = file_time_to_sec(path)
for row in CSVIterator(path):
row[time_key] = row.get(time_key, file_time)
yield row
class CSVIterator(abc.Iterable):
def __init__(self, file_name: str):
self.file_name = file_name
def __iter__(self):
with open(self.file_name, "r") as in_file:
for row in csv.DictReader(in_file):
yield row
def json2dict(path: str) -> dict:
with open(path, "r") as json_file:
return json.load(json_file)