utils.py 8.04 KB
Newer Older
1
import os
2
import itertools
Martin Bauer's avatar
Martin Bauer committed
3
from collections import Counter
4
from contextlib import contextmanager
Martin Bauer's avatar
Martin Bauer committed
5
from tempfile import NamedTemporaryFile
Martin Bauer's avatar
Martin Bauer committed
6
7
from typing import Mapping

Martin Bauer's avatar
Martin Bauer committed
8
import numpy as np
Martin Bauer's avatar
Martin Bauer committed
9
import sympy as sp
Martin Bauer's avatar
Martin Bauer committed
10

Martin Bauer's avatar
Martin Bauer committed
11
12
13
14
15
16

class DotDict(dict):
    """Normal dict with additional dot access for all keys"""
    __getattr__ = dict.get
    __setattr__ = dict.__setitem__
    __delattr__ = dict.__delitem__
17

18
19
20
21
22
23
24
    # Recursively make DotDict: https://stackoverflow.com/questions/13520421/recursive-dotdict
    def __init__(self, dct={}):
        for key, value in dct.items():
            if isinstance(value, dict):
                value = DotDict(value)
            self[key] = value

25

Martin Bauer's avatar
Martin Bauer committed
26
def all_equal(iterator):
27
28
29
30
31
32
    iterator = iter(iterator)
    try:
        first = next(iterator)
    except StopIteration:
        return True
    return all(first == rest for rest in iterator)
Martin Bauer's avatar
Martin Bauer committed
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51


def recursive_dict_update(d, u):
    """Updates the first dict argument, using second dictionary recursively.

    Examples:
        >>> d = {'sub_dict': {'a': 1, 'b': 2}, 'outer': 42}
        >>> u = {'sub_dict': {'a': 5, 'c': 10}, 'outer': 41, 'outer2': 43}
        >>> recursive_dict_update(d, u)
        {'sub_dict': {'a': 5, 'b': 2, 'c': 10}, 'outer': 41, 'outer2': 43}
    """
    d = d.copy()
    for k, v in u.items():
        if isinstance(v, Mapping):
            r = recursive_dict_update(d.get(k, {}), v)
            d[k] = r
        else:
            d[k] = u[k]
    return d
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80


@contextmanager
def file_handle_for_atomic_write(file_path):
    """Open temporary file object that atomically moves to destination upon exiting.

    Allows reading and writing to and from the same filename.
    The file will not be moved to destination in case of an exception.

    Args:
        file_path: path to file to be opened
    """
    target_folder = os.path.dirname(os.path.abspath(file_path))
    with NamedTemporaryFile(delete=False, dir=target_folder, mode='w') as f:
        try:
            yield f
        finally:
            f.flush()
            os.fsync(f.fileno())
    os.rename(f.name, file_path)


@contextmanager
def atomic_file_write(file_path):
    target_folder = os.path.dirname(os.path.abspath(file_path))
    with NamedTemporaryFile(delete=False, dir=target_folder) as f:
        f.file.close()
        yield f.name
    os.rename(f.name, file_path)
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96


def fully_contains(l1, l2):
    """Tests if elements of sequence 1 are in sequence 2 in same or higher number.

    >>> fully_contains([1, 1, 2], [1, 2])  # 1 is only present once in second list
    False
    >>> fully_contains([1, 1, 2], [1, 1, 4, 2])
    True
    """
    l1_counter = Counter(l1)
    l2_counter = Counter(l2)
    for element, count in l1_counter.items():
        if l2_counter[element] < count:
            return False
    return True
Martin Bauer's avatar
Martin Bauer committed
97
98


Martin Bauer's avatar
Martin Bauer committed
99
def boolean_array_bounding_box(boolean_array):
100
101
102
103
104
105
106
107
108
109
    """Returns bounding box around "true" area of boolean array

    >>> a = np.zeros((4, 4), dtype=bool)
    >>> a[1:-1, 1:-1] = True
    >>> boolean_array_bounding_box(a)
    [(1, 3), (1, 3)]
    """
    dim = boolean_array.ndim
    shape = boolean_array.shape
    assert 0 not in shape, "Shape must not contain zero"
Martin Bauer's avatar
Martin Bauer committed
110
    bounds = []
111
112
113
114
    for ax in itertools.combinations(reversed(range(dim)), dim - 1):
        nonzero = np.any(boolean_array, axis=ax)
        t = np.where(nonzero)[0][[0, -1]]
        bounds.append((t[0], t[1] + 1))
Martin Bauer's avatar
Martin Bauer committed
115
116
117
    return bounds


Martin Bauer's avatar
Martin Bauer committed
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
class LinearEquationSystem:
    """Symbolic linear system of equations - consisting of matrix and right hand side.

    Equations can be added incrementally. System is held in reduced row echelon form to quickly determine if
    system has a single, multiple, or no solution.

    Example:
        >>> x, y= sp.symbols("x, y")
        >>> les = LinearEquationSystem([x, y])
        >>> les.add_equation(x - y - 3)
        >>> les.solution_structure()
        'multiple'
        >>> les.add_equation(x + y - 4)
        >>> les.solution_structure()
        'single'
        >>> les.solution()
        {x: 7/2, y: 1/2}

    """
    def __init__(self, unknowns):
        size = len(unknowns)
        self._matrix = sp.zeros(size, size + 1)
        self.unknowns = unknowns
        self.next_zero_row = 0
Martin Bauer's avatar
Martin Bauer committed
142
        self._reduced = True
Martin Bauer's avatar
Martin Bauer committed
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166

    def copy(self):
        """Returns a copy of the equation system."""
        new = LinearEquationSystem(self.unknowns)
        new._matrix = self._matrix.copy()
        new.next_zero_row = self.next_zero_row
        return new

    def add_equation(self, linear_equation):
        """Add a linear equation as sympy expression. Implicit "-0" is assumed. Equation has to be linear and contain
        only unknowns passed to the constructor otherwise a ValueError is raised. """
        self._resize_if_necessary()
        linear_equation = linear_equation.expand()
        zero_row_idx = self.next_zero_row
        self.next_zero_row += 1

        control = 0
        for i, unknown in enumerate(self.unknowns):
            self._matrix[zero_row_idx, i] = linear_equation.coeff(unknown)
            control += unknown * self._matrix[zero_row_idx, i]
        rest = linear_equation - control
        if rest.atoms(sp.Symbol):
            raise ValueError("Not a linear equation in the unknowns")
        self._matrix[zero_row_idx, -1] = -rest
Martin Bauer's avatar
Martin Bauer committed
167
        self._reduced = False
Martin Bauer's avatar
Martin Bauer committed
168
169
170
171
172
173
174
175
176
177
178
179
180

    def add_equations(self, linear_equations):
        """Add a sequence of equations. For details see `add_equation`. """
        self._resize_if_necessary(len(linear_equations))
        for eq in linear_equations:
            self.add_equation(eq)

    def set_unknown_zero(self, unknown_idx):
        """Sets an unknown to zero - pass the index not the variable itself!"""
        assert unknown_idx < len(self.unknowns)
        self._resize_if_necessary()
        self._matrix[self.next_zero_row, unknown_idx] = 1
        self.next_zero_row += 1
Martin Bauer's avatar
Martin Bauer committed
181
        self._reduced = False
Martin Bauer's avatar
Martin Bauer committed
182
183
184

    def reduce(self):
        """Brings the system in reduced row echelon form."""
Martin Bauer's avatar
Martin Bauer committed
185
186
        if self._reduced:
            return
Martin Bauer's avatar
Martin Bauer committed
187
188
        self._matrix = self._matrix.rref()[0]
        self._update_next_zero_row()
Martin Bauer's avatar
Martin Bauer committed
189
        self._reduced = True
Martin Bauer's avatar
Martin Bauer committed
190
191
192
193
194

    @property
    def matrix(self):
        """Return a matrix that represents the equation system.
        Has one column more than unknowns for the affine part."""
Martin Bauer's avatar
Martin Bauer committed
195
        self.reduce()
Martin Bauer's avatar
Martin Bauer committed
196
197
        return self._matrix

Martin Bauer's avatar
Martin Bauer committed
198
199
200
201
202
    @property
    def rank(self):
        self.reduce()
        return self.next_zero_row

Martin Bauer's avatar
Martin Bauer committed
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
    def solution_structure(self):
        """Returns either 'multiple', 'none' or 'single' to indicate how many solutions the system has."""
        self.reduce()
        non_zero_rows = self.next_zero_row
        num_unknowns = len(self.unknowns)
        if non_zero_rows == 0:
            return 'multiple'

        *row_begin, left, right = self._matrix.row(non_zero_rows - 1)
        if non_zero_rows > num_unknowns:
            return 'none'
        elif non_zero_rows == num_unknowns:
            if left == 0 and right != 0:
                return 'none'
            else:
                return 'single'
        elif non_zero_rows < num_unknowns:
            if right != 0 and left == 0 and all(e == 0 for e in row_begin):
                return 'none'
            else:
                return 'multiple'

    def solution(self):
        """Solves the system if it has a single solution. Returns a dictionary mapping symbol to solution value."""
        return sp.solve_linear_system(self._matrix, *self.unknowns)

    def _resize_if_necessary(self, new_rows=1):
        if self.next_zero_row + new_rows > self._matrix.shape[0]:
            self._matrix = self._matrix.row_insert(self._matrix.shape[0] + 1,
                                                   sp.zeros(new_rows, self._matrix.shape[1]))

    def _update_next_zero_row(self):
        result = self._matrix.shape[0]
        while result >= 0:
            row_to_check = result - 1
            if any(e != 0 for e in self._matrix.row(row_to_check)):
                break
            result -= 1
Martin Bauer's avatar
Martin Bauer committed
241
242
243
244
245
246
        self.next_zero_row = result


def find_unique_solutions_with_zeros(system: LinearEquationSystem):
    if not system.solution_structure() != 'multiple':
        raise ValueError("Function works only for underdetermined systems")