# plain_text.py
from typing import Callable, Iterable
import csv
from cbutil.util import file_time_to_sec

def process_linewise(func: Callable, lines: Iterable):
    """Lazily apply ``func`` to every item of ``lines``.

    Items for which ``func`` raises ``ValueError`` are skipped silently, so
    the generator can be run over input that contains unparsable entries.

    Args:
        func: Callable invoked with one item at a time.
        lines: Iterable of items to process.

    Yields:
        The return value of ``func`` for each item that did not raise
        ``ValueError``.
    """
    for line in lines:
        try:
            result = func(line)
        except ValueError:
            # Deliberate best effort: skip items that ``func`` rejects.
            continue
        # Yield outside the ``try`` so that a ValueError thrown into the
        # generator by its consumer is not swallowed as if ``func`` raised it.
        yield result


def iterate_csv(path, time_key="timestamp"):
    """Yield each data row of the CSV file at ``path`` as a dict.

    Rows are produced by ``csv.DictReader``, so the first line of the file
    is used as the header. If a row has no ``time_key`` entry, the value
    returned by ``file_time_to_sec(path)`` is stored under that key.

    Args:
        path: Location of the CSV file to read.
        time_key: Key that every yielded row is guaranteed to contain.

    Yields:
        One dict per data row, always containing ``time_key``.
    """
    file_time = file_time_to_sec(path)

    # The csv module expects files opened with newline="" so that newlines
    # embedded in quoted fields are parsed correctly on every platform.
    with open(path, "r", newline="") as in_file:
        for row in csv.DictReader(in_file):
            # Only fill in the fallback; an existing value is left untouched.
            row.setdefault(time_key, file_time)
            yield row