Iterators and Generators¶
Iterators¶
# From Standard Library I
from collections import namedtuple
_Entry = namedtuple('_Entry', ['priority', 'value'])
# do not inherit from MinHeap -> Liskov substitution principle
class MinHeapWithSeparatePriority:
def __init__(self, iterable=None):
self._buffer = []
self._index = {}
if iterable is None:
return
for value, priority in iterable:
self.add(priority, value)
def add(self, priority, value):
try:
existing_index = self._index[value]
except KeyError:
self._buffer.append(_Entry(priority, value))
last_index = len(self._buffer) - 1
self._index[value] = last_index
self._sift_up(last_index)
else:
old_prio = self._buffer[existing_index].priority
self._buffer[existing_index] = _Entry(priority, value)
if old_prio < priority:
self._sift_down(existing_index)
else:
self._sift_up(existing_index)
def extract_min(self):
if not self._buffer:
raise IndexError("Extract from empty heap")
minimum = self._buffer[0].value
self._remove(0)
return minimum
def remove(self, value):
try:
existing_index = self._index[value]
except KeyError:
pass
else:
self._remove(existing_index)
    def _remove(self, index):
        value_at_index = self._buffer[index].value
        # swap with the explicit last index so the index map stays correct
        self._swap(index, len(self._buffer) - 1)
        self._buffer.pop()
        del self._index[value_at_index]
        if index < len(self._buffer):
            # the entry swapped into this slot may have to move in either
            # direction to restore the heap invariant
            self._sift_up(index)
            self._sift_down(index)
def _sift_up(self, idx):
while idx > 0:
parent = (idx - 1) // 2
if not (self._buffer[idx].priority < self._buffer[parent].priority):
break
self._swap(idx, parent)
idx = parent
def _sift_down(self, idx):
while True:
left_child = idx * 2 + 1
if left_child >= len(self._buffer):
break
smaller_child = left_child
right_child = left_child + 1
if right_child < len(self._buffer):
if self._buffer[right_child].priority < self._buffer[left_child].priority:
smaller_child = right_child
if self._buffer[smaller_child].priority < self._buffer[idx].priority:
self._swap(idx, smaller_child)
else:
break
    def _swap(self, i, j):
        self._buffer[i], self._buffer[j] = self._buffer[j], self._buffer[i]
        # the index map is keyed by value, not by entry
        self._index[self._buffer[i].value] = i
        self._index[self._buffer[j].value] = j
# see iterators chapter
def __iter__(self):
# return _MinHeapWithSeparatePriorityIterator(self)
for entry in self._buffer:
yield entry.value
# len, bool, str, etc.
some_list = ["some", "strings", "for", "testing"]
some_set = {"some", "strings", "for", "testing"}
# list iteration
for entry in some_list:
print(entry)
some
strings
for
testing
# set iteration
for entry in some_set:
print(entry)
testing
for
strings
some
it = iter(some_set)
it
<set_iterator at 0x7fea304619c0>
next(it)
'testing'
# spot the similarities to "for" iteration
it = iter(some_set)
try:
while True:
entry = next(it)
print(entry)
except StopIteration:
pass
testing
for
strings
some
# this is an example of a heap iterator (see above)
class _MinHeapWithSeparatePriorityIterator:
def __init__(self, heap):
self.heap = heap
# use index instead of iterators this time
self.index = 0
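    # iterators conventionally also implement __iter__ returning self,
    # so that they can be used directly in for loops
    def __iter__(self):
        return self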
def __next__(self):
if self.index >= len(self.heap._buffer):
raise StopIteration
value = self.heap._buffer[self.index].value
self.index += 1
return value
heap = MinHeapWithSeparatePriority()
heap.add(3, "b")
heap.add(5, "c")
heap.add(1, "a")
heap.add(0, "c")
for entry in heap:
print(entry)
c
a
b
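The hand-written iterator class can also be driven manually; a short sketch, where the expected values assume the heap state from the cell above:
it = _MinHeapWithSeparatePriorityIterator(heap)
next(it)  # 'c', the value at the root of the buffer
next(it)  # 'a'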
Generators¶
Generators use the iterator protocol to iterate over collections that never exist in memory: they create values on the fly.
# difficult to write as list comprehension because of internal state
# start value can be set by the caller to allow for duck typing
def naive_inclusive_prefix_sum(iterable, start_value=0):
accum = start_value
result = []
for entry in iterable:
# do not use += since we need a copy each time
accum = accum + entry
result.append(accum)
return result
# list comprehension version (discouraged)
# assignment expressions are not covered in this course
# see https://www.python.org/dev/peps/pep-0572/
def naive_inclusive_prefix_sum(iterable, start_value=0):
accum = start_value
# this does not work as easily for exclusive prefix sums
return [(accum := accum + entry) for entry in iterable]
The naive solution forces us to store a separate list for the prefix sums, even when not all entries are needed at once!
# the iterator itself
class _InclusivePrefixSumIterator:
def __init__(self, base_iter, start_value):
self.base_iter = base_iter
self.accum = start_value
def __next__(self):
try:
entry = next(self.base_iter)
self.accum = self.accum + entry
return self.accum
except StopIteration:
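            # the base iterator is exhausted; re-raising the StopIteration
            # ends this iterator as well (strictly redundant, but explicit)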
raise
# object representing the prefix sum
# "range" works the same way
class inclusive_prefix_sum:
def __init__(self, iterable, start_value=0):
self.iterable = iterable
self.start_value = start_value
def __iter__(self):
it = iter(self.iterable)
return _InclusivePrefixSumIterator(it, self.start_value)
for i in inclusive_prefix_sum([1, 2, 3, 4]):
print(i)
1
3
6
10
# we can still obtain a list
list(inclusive_prefix_sum([1, 2, 3, 4]))
[1, 3, 6, 10]
# works with any iterable
list(inclusive_prefix_sum(range(1, 5)))
[1, 3, 6, 10]
# works even on itself
list(inclusive_prefix_sum(inclusive_prefix_sum(range(1, 5))))
[1, 4, 10, 20]
# duck typing is your friend
lists = [
[1, 2],
[3, 4],
[5]
]
list(inclusive_prefix_sum(lists, []))
[[1, 2], [1, 2, 3, 4], [1, 2, 3, 4, 5]]
import math
import functools
# representational constants
# see context managers chapter for details
FRACTIONAL, DECIMAL = range(2)
# this is a module-level global variable
# see context managers chapter for details
_STRING_REPRESENTATION = FRACTIONAL
# see context managers chapter for details
class string_style:
def __init__(self, style):
self._new_style = style
self._old_style = None
def __enter__(self):
global _STRING_REPRESENTATION
self._old_style = _STRING_REPRESENTATION
_STRING_REPRESENTATION = self._new_style
return self
def __exit__(self, tp, value, traceback):
global _STRING_REPRESENTATION
_STRING_REPRESENTATION = self._old_style
# re-raise exceptions
return False
# see decorators chapter for details on total_ordering
@functools.total_ordering
class MyFraction:
# default parameters help with conversion from integers
def __init__(self, num=0, denom=1):
# prevent illegal instances from being created
# use assert denom != 0 only if you trust the user
if denom == 0:
# do not print error messages
# print("ERROR")
# always raise exceptions
raise ValueError("Denominator cannot be zero.")
self._num = num
self._denom = denom
# if you forget this, equality test will not work consistently
self._cancel()
# we will improve this using properties later on
def get_num(self):
return self._num
def get_denom(self):
return self._denom
# __eq__ is required for equality checks
# for simplicity, this requires "other" to be a MyFraction object
# Python's built-in Fraction supports comparisons with ints and floats, too
def __eq__(self, other):
# delegate to tuple equality instead of bothering with
# return self._num == other._num and ...
return (self._num, self._denom) == (other._num, other._denom)
# delegate to __eq__ for consistency
def __ne__(self, other):
return not self.__eq__(other)
# without aggressive cancelling,
# MyFraction(1, 2) is not equal to MyFraction(2, 4)
# cancelling can only be guaranteed through encapsulation
def _cancel(self):
gcd = math.gcd(self._num, self._denom)
self._num //= gcd
self._denom //= gcd
def __lt__(self, other):
return self._num * other._denom < other._num * self._denom
# <= > >= generated by functools
# do not modify self or other, always create a new object!
def __add__(self, other):
new_num = self._num * other._denom + other._num * self._denom
new_denom = self._denom * other._denom
return MyFraction(new_num, new_denom)
# subtract and unary minus -> assignment
def __neg__(self):
return MyFraction(-self._num, self._denom)
def __sub__(self, other):
return self + (-other)
def __mul__(self, other):
return MyFraction(
self._num * other._num,
self._denom * other._denom
)
# divide, invert -> assignment
# also implement __radd__, ... if your addition supports types other than MyFraction
# without this, MyFraction cannot be used in sets and dicts
# this only makes sense for immutable values (see example below)
def __hash__(self):
return hash((self._num, self._denom))
# the built-in print function uses "str" internally
def __str__(self):
# alternative implementations
#return str(self._num) + "/" + str(self._denom)
#return "{}/{}".format(self._num, self._denom)
#return f"{self._num}/{self._denom}"
# allow switching between fractional and decimal representation
return f"{self._num}/{self._denom}" if _STRING_REPRESENTATION == FRACTIONAL else f"{float(self)}"
# jupyter notebook uses "repr" for printing values
def __repr__(self):
# remember the repr contract
return f"MyFraction({self._num!r}, {self._denom!r})"
# this is required for
# if MyFraction(): ...
def __bool__(self):
# you can use implicit conversion
# return bool(self._num)
# but prefer explicit when possible
return self._num != 0
def __float__(self):
# remember: floats can overflow
try:
return self._num / self._denom
except OverflowError as oe:
# provide a nice error message on top of the stack trace
raise ValueError("This fraction cannot be represented as a float") from oe
@staticmethod
def from_float(flt):
num, denom = flt.as_integer_ratio()
return MyFraction(num, denom)
@staticmethod
def from_str(s):
split = s.split("/")
if len(split) == 1:
return MyFraction(int(split[0]))
elif len(split) == 2:
return MyFraction(int(split[0]), int(split[1]))
else:
raise ValueError("Illegal fraction format")
def with_num(self, new_num):
return MyFraction(new_num, self._denom)
def with_denom(self, new_denom):
return MyFraction(self._num, new_denom)
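A brief sketch of why __hash__ matters here: because _cancel normalizes the representation, equal fractions hash equally and duplicates collapse in sets:
{MyFraction(1, 2), MyFraction(2, 4)}  # a single element: MyFraction(1, 2)
MyFraction(1, 2) in {MyFraction(2, 4): "half"}  # True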
# duck typing is your friend
fractions = [
MyFraction(3, 4), MyFraction(1, 2), MyFraction(5, 8)
]
list(inclusive_prefix_sum(fractions, MyFraction()))
[MyFraction(3, 4), MyFraction(5, 4), MyFraction(15, 8)]
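Now that MyFraction exists, a quick sketch of the string_style context manager from above:
print(MyFraction(1, 2))          # 1/2 (FRACTIONAL is the default)
with string_style(DECIMAL):
    print(MyFraction(1, 2))      # 0.5
print(MyFraction(1, 2))          # 1/2 again, __exit__ restored the style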
# this works but is not particularly useful
nums = [1, 2, 3, 4]
for n in naive_inclusive_prefix_sum(nums):
print(n)
nums.append(n)
1
3
6
10
# mutation is dangerous
nums = [1, 2, 3, 4]
for n in inclusive_prefix_sum(nums):
print(n)
# uncomment this line to see the problem
# nums.append(n)
1
3
6
10
As a rule of thumb, never mutate a list while you are reading from it through an iterator; this prevents bugs like the one above.
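To see what the commented-out append would do, here is a minimal sketch, capped with itertools.islice so that it terminates at all:
from itertools import islice
nums = [1, 2, 3, 4]
out = []
for n in islice(inclusive_prefix_sum(nums), 8):  # without the cap, this loop never ends
    out.append(n)
    nums.append(n)  # every append feeds the iterator one more entry
out
[1, 3, 6, 10, 11, 14, 20, 30]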
# iterator + iterable as one
class SinglePassInclusivePrefixSum:
def __init__(self, iterable, start_value=0):
self.base_iter = iter(iterable)
self.accum = start_value
def __iter__(self):
return self
def __next__(self):
try:
entry = next(self.base_iter)
self.accum = self.accum + entry
return self.accum
except StopIteration:
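            # as before, re-raising simply propagates the end of iteration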
raise
fail_sum = SinglePassInclusivePrefixSum([1, 2, 3, 4])
# this works once
for i in fail_sum:
print(i)
# the second iteration does not do anything
for i in fail_sum:
print(i)
1
3
6
10
# recommended usage
for i in SinglePassInclusivePrefixSum([1, 2, 3, 4]):
print(i)
1
3
6
10
def inclusive_prefix_sum_gen(iterable, start_value=0):
accum = start_value
for entry in iterable:
accum = accum + entry
# this is where .append would happen otherwise
yield accum
for i in inclusive_prefix_sum_gen([1, 2, 3, 4]):
print(i)
1
3
6
10
list(inclusive_prefix_sum_gen([1, 2, 3, 4]))
[1, 3, 6, 10]
We can now use the yield syntax to write the heap iterator in a concise manner. Nice.
Assignment IG1¶
MyRange iterator: Implement __iter__ for MyRange. Using yield is recommended, but you may also write a separate iterator class.
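A possible solution sketch; the attribute names start, stop and step are assumptions here, since MyRange itself is defined in an earlier assignment:
class MyRange:
    def __init__(self, start, stop, step=1):
        self.start, self.stop, self.step = start, stop, step
    def __iter__(self):
        # yield values until the stop bound is crossed, in either direction
        current = self.start
        while (self.step > 0 and current < self.stop) or \
              (self.step < 0 and current > self.stop):
            yield current
            current += self.step
list(MyRange(0, 10, 3))
[0, 3, 6, 9]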
# see BAS2
# this version stores all batches at once, which is usually unnecessary
def batches(elements, batch_size):
return [elements[ofst:ofst+batch_size] for ofst in range(0, len(elements), batch_size)]
# use generators to create batches when they are needed
def batches(elements, batch_size):
for ofst in range(0, len(elements), batch_size):
yield elements[ofst:ofst+batch_size]
def batches(elements, batch_size):
# generator comprehension
return (elements[ofst:ofst+batch_size] for ofst in range(0, len(elements), batch_size))
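All three variants can be consumed in the same way:
list(batches(list(range(7)), 3))
[[0, 1, 2], [3, 4, 5], [6]]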
import math
def _det(p1, p2):
return p1.x * p2.y - p2.x * p1.y
def _dist(p1, p2):
return math.hypot(p1.x - p2.x, p1.y - p2.y)
# no sanity checks or immutability guarantees this time for simplicity
class Polygon2D:
# points is not an optional parameter!
# defaulting to an empty polygon makes little sense
def __init__(self, points):
self.points = points
def __repr__(self):
return f"Polygon2D({self.points!r})"
def circumference(self):
return sum(_dist(p, q) for p, q in self.segments())
def area(self):
dets = sum(_det(p, q) for p, q in self.segments())
return 0.5 * abs(dets)
def stringify_sides(self):
return ", ".join(f"{p} - {q}" for p, q in self.segments())
# easy way to iterate over adjacent points
def segments(self):
pts = self.points
n = len(pts)
return ((pts[i], pts[(i + 1) % n]) for i in range(n))
# From Standard Library
from collections import namedtuple
Point2D = namedtuple('Point2D', ['x', 'y'])
square = Polygon2D([
Point2D(0, 0),
Point2D(0, 1),
Point2D(1, 1),
Point2D(1, 0),
])
square.area()
1.0
square.circumference()
4.0
fractions = [
[MyFraction(3, 4)],
[MyFraction(1, 2), MyFraction(5, 8)]
]
def flatten(stacked_lists):
out = []
for l in stacked_lists:
out.extend(l)
return out
    # alternative implementation (beware: repeated concatenation is quadratic)
    # return sum(stacked_lists, [])
Concatenating the stacked lists is not necessary if we only need one item at a time - we can use generators for that!
def flatten_iter(stacked_lists):
for l in stacked_lists:
for entry in l:
yield entry
def flatten_iter(stacked_lists):
for l in stacked_lists:
# replaces inner for-loop
yield from l
def flatten_iter(stacked_lists):
return (entry for l in stacked_lists for entry in l)
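All three flatten_iter variants behave identically; for example, with the nested fractions list from above:
list(flatten_iter(fractions))
[MyFraction(3, 4), MyFraction(1, 2), MyFraction(5, 8)]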
Assignment IG2¶
Concatenating files implicitly: Write a generator that takes a list of file names and yields the contents of the files line by line, in order.
def concat_lines(paths):
for path in paths:
with open(path) as file:
yield from file
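A hypothetical usage sketch; the file names are made up and assumed to exist:
for line in concat_lines(["a.txt", "b.txt"]):
    print(line, end="")  # lines keep their trailing newline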
Streaming Interfaces¶
https://docs.python.org/3/library/functions.html#built-in-funcs
map, filter, functools.reduce, zip, enumerate, min, max, sum, any, all, reversed and any kind of comprehension use iterators internally, either as input or output or both. Most stateless operations can be composed from these alone.
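A small sketch composing these streaming built-ins: the sum of the squares of the even numbers below 10, computed without any intermediate list:
evens = filter(lambda n: n % 2 == 0, range(10))
squares = map(lambda n: n * n, evens)
sum(squares)
120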
# see BAS2
CONVERSION_FACTORS = {
"m": 1000,
"cm": 10,
"mm": 1
}
# aos version
def remove_units(values):
return [CONVERSION_FACTORS[unit] * value for value, unit in values]
# soa version
def remove_units_soa(values, units):
assert len(values) == len(units)
converted = []
for i in range(len(values)):
# units and values need to be indexable
converted.append(CONVERSION_FACTORS[units[i]] * values[i])
return converted
def remove_units_soa(values, units):
assert len(values) == len(units)
# units and values still need to be indexable
return [CONVERSION_FACTORS[units[i]] * values[i] for i in range(len(values))]
def remove_units_soa(values, units):
# units and values do not need to be indexable
return [CONVERSION_FACTORS[unit] * value for value, unit in zip(values, units)]
remove_units_soa([5, 9], ["mm", "m"])
[5, 9000]
# see SL6
# sum(r.probability for r in rows)
Assignment IG3¶
Better than in EiP I: Write a function primes_up_to(n) that returns a list of all primes less than or equal to n.
def primes_up_to(n):
return [cand for cand in range(2, n + 1)
if all(cand % div != 0 for div in range(2, cand))]
primes_up_to(100)
[2,
3,
5,
7,
11,
13,
17,
19,
23,
29,
31,
37,
41,
43,
47,
53,
59,
61,
67,
71,
73,
79,
83,
89,
97]
Better than in EiP II: Rewrite primes_up_to(n) so that it returns a generator instead.
def gen_primes_up_to(n):
return (cand for cand in range(2, n + 1)
if all(cand % div != 0 for div in range(2, cand)))
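Because the generator is lazy, we only pay for the primes we actually consume:
from itertools import islice
list(islice(gen_primes_up_to(100), 5))
[2, 3, 5, 7, 11]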
# iterators are the only thing required for sequence unpacking
first, *_ = heap
first
'c'
Generators are lazy: they only compute values when those values are actually requested. This makes never-ending iterators possible.
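A minimal sketch of such a never-ending iterator, cut short with itertools.islice:
from itertools import islice
def squares():
    n = 0
    while True:  # no stopping condition; the caller decides when to stop
        yield n * n
        n += 1
list(islice(squares(), 5))
[0, 1, 4, 9, 16]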
https://www.youtube.com/watch?v=5jwV3zxXc8E
In this example, a generator is used to produce Sudoku solutions. The caller can ignore the generator entirely if no solution is required, generate a single solution, generate all of them, or anything in between.
itertools¶
https://docs.python.org/3/library/itertools.html
Implements common iterator functions. The most relevant ones are combinations, permutations and product, since these are tedious to implement manually. Some of the functions in itertools produce endless iterators.
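A short sketch of the itertools functions mentioned above:
from itertools import combinations, permutations, product, count, islice
list(combinations("abc", 2))     # [('a', 'b'), ('a', 'c'), ('b', 'c')]
list(permutations("ab"))         # [('a', 'b'), ('b', 'a')]
list(product([0, 1], repeat=2))  # [(0, 0), (0, 1), (1, 0), (1, 1)]
list(islice(count(10), 3))       # count is one of the endless iterators: [10, 11, 12]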