Iterators and Generators

Start in JupyterHub

Iterators

# From Standard Library I

# From Standard Library I

from collections import namedtuple

# Internal heap entry: compares by priority, carries the payload value.
_Entry = namedtuple('_Entry', ['priority', 'value'])

# do not inherit from MinHeap -> liskov substitution principle
class MinHeapWithSeparatePriority:
    """Min-heap keyed by an explicit priority with O(log n) updates.

    Values must be hashable and unique; adding an already-present value
    updates its priority instead of inserting a duplicate entry.
    """

    def __init__(self, iterable=None):
        # _buffer holds _Entry objects in heap order;
        # _index maps each *value* to its current position in _buffer.
        self._buffer = []
        self._index = {}
        if iterable is None:
            return
        for value, priority in iterable:
            self.add(priority, value)

    def add(self, priority, value):
        """Insert value with the given priority, or update its priority."""
        try:
            existing_index = self._index[value]
        except KeyError:
            # new value: append at the end and restore the heap invariant
            self._buffer.append(_Entry(priority, value))
            last_index = len(self._buffer) - 1
            self._index[value] = last_index
            self._sift_up(last_index)
        else:
            # existing value: replace the entry, then sift in whichever
            # direction the priority moved
            old_prio = self._buffer[existing_index].priority
            self._buffer[existing_index] = _Entry(priority, value)
            if old_prio < priority:
                self._sift_down(existing_index)
            else:
                self._sift_up(existing_index)

    def extract_min(self):
        """Remove and return the value with the smallest priority."""
        if not self._buffer:
            raise IndexError("Extract from empty heap")
        minimum = self._buffer[0].value
        self._remove(0)
        return minimum

    def remove(self, value):
        """Remove value from the heap; silently ignore unknown values."""
        try:
            existing_index = self._index[value]
        except KeyError:
            pass
        else:
            self._remove(existing_index)

    def _remove(self, index):
        # Swap the doomed entry with the last one, pop it, then repair the
        # heap at the vacated slot.
        # BUG FIX: use the explicit last index instead of -1 so _swap does
        # not record a negative position in _index.
        last_index = len(self._buffer) - 1
        value_at_index = self._buffer[index].value
        self._swap(index, last_index)
        self._buffer.pop()
        del self._index[value_at_index]
        if index < len(self._buffer):
            # BUG FIX: the entry moved into the hole can violate the heap
            # property in either direction, so try both sifts (one of them
            # is always a no-op). Skip entirely if the hole was the tail.
            self._sift_down(index)
            self._sift_up(index)

    def _sift_up(self, idx):
        # Bubble the entry at idx towards the root while it is smaller
        # than its parent.
        while idx > 0:
            parent = (idx - 1) // 2
            if not (self._buffer[idx].priority < self._buffer[parent].priority):
                break
            self._swap(idx, parent)
            idx = parent

    def _sift_down(self, idx):
        # Sink the entry at idx towards the leaves while it is larger
        # than its smaller child.
        while True:
            left_child = idx * 2 + 1
            if left_child >= len(self._buffer):
                break
            smaller_child = left_child
            right_child = left_child + 1
            if right_child < len(self._buffer):
                if self._buffer[right_child].priority < self._buffer[left_child].priority:
                    smaller_child = right_child

            if self._buffer[smaller_child].priority < self._buffer[idx].priority:
                self._swap(idx, smaller_child)
                # BUG FIX: follow the entry down; without this the loop
                # re-examined the old position and stopped after one level
                idx = smaller_child
            else:
                break

    def _swap(self, i, j):
        self._buffer[i], self._buffer[j] = self._buffer[j], self._buffer[i]
        # BUG FIX: _index maps values to positions; the original keyed the
        # dict by the whole _Entry tuple, corrupting the map on every swap
        self._index[self._buffer[i].value] = i
        self._index[self._buffer[j].value] = j

    # see iterators chapter
    def __iter__(self):
        # yields values in internal buffer (heap) order, not sorted order
        for entry in self._buffer:
            yield entry.value

    
    # len, bool, str, etc.
some_list = ["some", "strings", "for", "testing"]
some_set = {"some", "strings", "for", "testing"}
# list iteration
for entry in some_list:
    print(entry)
some
strings
for
testing
# set iteration
for entry in some_set:
    print(entry)
testing
for
strings
some
it = iter(some_set)
it
<set_iterator at 0x7fea304619c0>
next(it)
'testing'
# spot the similarities to "for" iteration
it = iter(some_set)
try:
    while True:
        entry = next(it)
        print(entry)
except StopIteration:
    pass
testing
for
strings
some
# this is an example of a heap iterator (see above)
class _MinHeapWithSeparatePriorityIterator:
    """Position-based iterator over the values in a heap's buffer."""

    def __init__(self, heap):
        self.heap = heap
        # walk the underlying buffer by position instead of wrapping
        # another iterator
        self.index = 0

    def __next__(self):
        buffer = self.heap._buffer
        if self.index >= len(buffer):
            raise StopIteration
        current = buffer[self.index].value
        self.index += 1
        return current
heap = MinHeapWithSeparatePriority()
heap.add(3, "b")
heap.add(5, "c")
heap.add(1, "a")
heap.add(0, "c")

for entry in heap:
    print(entry)
c
a
b

Generators

Generators use the iterator protocol to iterate over non-existent collections. Generators create values on the fly.

# difficult to write as list comprehension because of internal state
# start value can be set by the caller to allow for duck typing
def naive_inclusive_prefix_sum(iterable, start_value=0):
    """Return the running (inclusive) prefix sums of iterable as a list."""
    sums = []
    running = start_value
    for item in iterable:
        # deliberately no +=: every prefix must be a fresh object so that
        # mutable start values (e.g. lists) are not shared between entries
        running = running + item
        sums.append(running)
    return sums
# list comprehension version (discouraged)
# assignment expressions are not covered in this course
# see https://www.python.org/dev/peps/pep-0572/
def naive_inclusive_prefix_sum(iterable, start_value=0):
    """Return the inclusive prefix sums via an assignment expression."""
    running = start_value
    # this does not work as easily for exclusive prefix sums;
    # no += so that mutable start values are copied, not shared
    return [(running := running + item) for item in iterable]

The naive solution forces us to store a separate list for the prefix sum, even when not all entries are needed at once!

# the iterator itself
class _InclusivePrefixSumIterator:
    """Iterator yielding the running sum of an underlying iterator."""

    def __init__(self, base_iter, start_value):
        self.base_iter = base_iter
        self.accum = start_value

    # FIX: iterator objects should be iterable themselves so they also
    # work in contexts that call iter() on them again
    def __iter__(self):
        return self

    def __next__(self):
        # FIX: removed the redundant try/except that only re-raised
        # StopIteration — the exception from the exhausted base iterator
        # propagates on its own
        entry = next(self.base_iter)
        # no += on purpose: keep each yielded prefix a distinct object
        self.accum = self.accum + entry
        return self.accum

# object representing the prefix sum
# "range" works the same way
class inclusive_prefix_sum:
    """Re-iterable view of the inclusive prefix sums of an iterable."""

    def __init__(self, iterable, start_value=0):
        self.iterable = iterable
        self.start_value = start_value

    def __iter__(self):
        # a fresh iterator per call keeps the object re-iterable
        base = iter(self.iterable)
        return _InclusivePrefixSumIterator(base, self.start_value)
for i in inclusive_prefix_sum([1, 2, 3, 4]):
    print(i)
1
3
6
10
# we can still obtain a list
list(inclusive_prefix_sum([1, 2, 3, 4]))
[1, 3, 6, 10]
# works with any iterable
list(inclusive_prefix_sum(range(1, 5)))
[1, 3, 6, 10]
# works even on itself
list(inclusive_prefix_sum(inclusive_prefix_sum(range(1, 5))))
[1, 4, 10, 20]
# duck typing is your friend
lists = [
    [1, 2],
    [3, 4],
    [5]
]
list(inclusive_prefix_sum(lists, []))
[[1, 2], [1, 2, 3, 4], [1, 2, 3, 4, 5]]
import math
import functools


# representational constants
# see context managers chapter for details
FRACTIONAL, DECIMAL = range(2)


# this is a module-level global variable
# see context managers chapter for details
_STRING_REPRESENTATION = FRACTIONAL


# see context managers chapter for details
class string_style:
    """Context manager that temporarily switches the module-wide string
    representation of fractions (FRACTIONAL or DECIMAL)."""

    def __init__(self, style):
        self._new_style = style
        self._old_style = None

    def __enter__(self):
        global _STRING_REPRESENTATION
        # remember the previous style so __exit__ can restore it
        self._old_style, _STRING_REPRESENTATION = (
            _STRING_REPRESENTATION,
            self._new_style,
        )
        return self

    def __exit__(self, tp, value, traceback):
        global _STRING_REPRESENTATION
        _STRING_REPRESENTATION = self._old_style
        # returning False lets any exception from the with-block propagate
        return False


# see decorators chapter for details on total_ordering
@functools.total_ordering
class MyFraction:
    """Immutable rational number that is always kept fully cancelled.

    Invariant: after construction the fraction is cancelled and the
    denominator is strictly positive, which keeps __eq__, __lt__ and
    __hash__ consistent with the mathematical value.
    """

    # default parameters help with conversion from integers
    def __init__(self, num=0, denom=1):
        # prevent illegal instances from being created
        # use assert denom != 0 only if you trust the user
        if denom == 0:
            # do not print error messages
            # print("ERROR")
            # always raise exceptions
            raise ValueError("Denominator cannot be zero.")
        self._num = num
        self._denom = denom
        # if you forget this, equality test will not work consistently
        self._cancel()

    # we will improve this using properties later on
    def get_num(self):
        return self._num

    def get_denom(self):
        return self._denom

    # __eq__ is required for equality checks
    # for simplicity, this requires "other" to be a MyFraction object
    # Python's built-in Fraction supports comparisons with ints and floats, too
    def __eq__(self, other):
        # delegate to tuple equality instead of bothering with
        # return self._num == other._num and ...
        return (self._num, self._denom) == (other._num, other._denom)

    # delegate to __eq__ for consistency
    def __ne__(self, other):
        return not self.__eq__(other)

    # without aggressive cancelling,
    # MyFraction(1, 2) is not equal to MyFraction(2, 4)
    # cancelling can only be guaranteed through encapsulation
    def _cancel(self):
        # BUG FIX: normalize the sign so the denominator is always
        # positive; without this, MyFraction(1, -2) != MyFraction(-1, 2)
        # and __lt__ (which cross-multiplies by denominators) returns
        # inverted results for negative denominators
        if self._denom < 0:
            self._num = -self._num
            self._denom = -self._denom
        gcd = math.gcd(self._num, self._denom)
        self._num //= gcd
        self._denom //= gcd

    def __lt__(self, other):
        # cross-multiplication is only order-preserving because the
        # denominators are guaranteed positive by _cancel
        return self._num * other._denom < other._num * self._denom

    # <= > >= generated by functools

    # do not modify self or other, always create a new object!
    def __add__(self, other):
        new_num = self._num * other._denom + other._num * self._denom
        new_denom = self._denom * other._denom
        return MyFraction(new_num, new_denom)

    # subtract and unary minus -> assignment

    def __neg__(self):
        return MyFraction(-self._num, self._denom)

    def __sub__(self, other):
        return self + (-other)

    def __mul__(self, other):
        return MyFraction(
            self._num * other._num,
            self._denom * other._denom
        )

    # divide, invert -> assignment

    # also implement __radd__, ... if your addition supports types other than MyFraction

    # without this, MyFraction cannot be used in sets and dicts
    # this only makes sense for immutable values (see example below)
    def __hash__(self):
        return hash((self._num, self._denom))

    # the built-in print function uses "str" internally
    def __str__(self):
        # alternative implementations
        #return str(self._num) + "/" + str(self._denom)
        #return "{}/{}".format(self._num, self._denom)
        #return f"{self._num}/{self._denom}"

        # allow switching between fractional and decimal representation
        return f"{self._num}/{self._denom}" if _STRING_REPRESENTATION == FRACTIONAL else f"{float(self)}"

    # jupyter notebook uses "repr" for printing values
    def __repr__(self):
        # remember the repr contract
        return f"MyFraction({self._num!r}, {self._denom!r})"

    # this is required for
    # if MyFraction(): ...
    def __bool__(self):
        # you can use implicit conversion
        # return bool(self._num)
        # but prefer explicit when possible
        return self._num != 0

    def __float__(self):
        # remember: floats can overflow
        try:
            return self._num / self._denom
        except OverflowError as oe:
            # provide a nice error message on top of the stack trace
            raise ValueError("This fraction cannot be represented as a float") from oe

    # alternate constructor from a float's exact binary ratio
    @staticmethod
    def from_float(flt):
        num, denom = flt.as_integer_ratio()
        return MyFraction(num, denom)

    # alternate constructor from "a/b" or plain "a" strings
    @staticmethod
    def from_str(s):
        split = s.split("/")
        if len(split) == 1:
            return MyFraction(int(split[0]))
        elif len(split) == 2:
            return MyFraction(int(split[0]), int(split[1]))
        else:
            raise ValueError("Illegal fraction format")

    def with_num(self, new_num):
        """Return a copy with the numerator replaced."""
        return MyFraction(new_num, self._denom)

    def with_denom(self, new_denom):
        """Return a copy with the denominator replaced."""
        return MyFraction(self._num, new_denom)
# duck typing is your friend
fractions = [
    MyFraction(3, 4), MyFraction(1, 2), MyFraction(5, 8)
]
list(inclusive_prefix_sum(fractions, MyFraction()))
[MyFraction(3, 4), MyFraction(5, 4), MyFraction(15, 8)]
# this works but is not particularly useful
nums = [1, 2, 3, 4]
for n in naive_inclusive_prefix_sum(nums):
    print(n)
    nums.append(n)
1
3
6
10
# mutation is dangerous
nums = [1, 2, 3, 4]
for n in inclusive_prefix_sum(nums):
    print(n)
   # uncomment this line to see the problem
   # nums.append(n)
1
3
6
10

As a rule of thumb, never read from and write to the same list within a single function to prevent bugs like this.

# iterator + iterable as one
class SinglePassInclusivePrefixSum:
    """Single-pass iterable/iterator over inclusive prefix sums.

    The object is its own iterator, so it can only be consumed once;
    a second for-loop over the same object yields nothing.
    """

    def __init__(self, iterable, start_value=0):
        self.base_iter = iter(iterable)
        self.accum = start_value

    def __iter__(self):
        # the object is its own iterator -> single pass only
        return self

    def __next__(self):
        # FIX: removed the redundant try/except that only re-raised
        # StopIteration; the base iterator's exception propagates as-is
        entry = next(self.base_iter)
        # no += so that mutable start values yield distinct prefixes
        self.accum = self.accum + entry
        return self.accum
fail_sum = SinglePassInclusivePrefixSum([1, 2, 3, 4])
# this works once
for i in fail_sum:
    print(i)
# the second iteration does not do anything
for i in fail_sum:
    print(i)
1
3
6
10
# recommended usage
for i in SinglePassInclusivePrefixSum([1, 2, 3, 4]):
    print(i)
1
3
6
10
def inclusive_prefix_sum_gen(iterable, start_value=0):
    """Lazily generate the inclusive prefix sums of iterable."""
    total = start_value
    for item in iterable:
        # no +=: each yielded prefix must be a fresh object
        total = total + item
        # yielding here replaces the .append of the naive list version
        yield total
for i in inclusive_prefix_sum_gen([1, 2, 3, 4]):
    print(i)
1
3
6
10
list(inclusive_prefix_sum_gen([1, 2, 3, 4]))
[1, 3, 6, 10]

We can now use the yield syntax to write the heap iterator in a concise manner. Nice.

Assignment IG1

  1. MyRange-Iterator Implementieren Sie __iter__ für MyRange. Die Verwendung von yield wird empfohlen, Sie dürfen aber auch eine separate Iteratorklasse schreiben.

# see BAS2
# no need to store all batches at once
def batches(elements, batch_size):
    """Split an indexable sequence into consecutive slices of batch_size;
    the final batch may be shorter."""
    result = []
    for start in range(0, len(elements), batch_size):
        result.append(elements[start:start + batch_size])
    return result
# use generators to create batches when they are needed
def batches(elements, batch_size):
    """Lazily yield consecutive slices of batch_size from elements."""
    for start in range(0, len(elements), batch_size):
        stop = start + batch_size
        yield elements[start:stop]
def batches(elements, batch_size):
    """Return a lazy generator over consecutive batch_size slices."""
    starts = range(0, len(elements), batch_size)
    # generator comprehension keeps this lazy
    return (elements[start:start + batch_size] for start in starts)
import math

def _det(p1, p2):
    # 2D cross product ("determinant") of the two position vectors
    return p1.x * p2.y - p1.y * p2.x

def _dist(p1, p2):
    # Euclidean distance between the two points
    dx = p1.x - p2.x
    dy = p1.y - p2.y
    return math.hypot(dx, dy)

# no sanity checks or immutability guarantees this time for simplicity
class Polygon2D:
    """Simple 2D polygon defined by an ordered list of points."""

    # points is not an optional parameter!
    # defaulting to an empty polygon makes little sense
    def __init__(self, points):
        self.points = points

    def __repr__(self):
        return f"Polygon2D({self.points!r})"

    def circumference(self):
        """Sum of the lengths of all sides."""
        return sum(_dist(a, b) for a, b in self.segments())

    def area(self):
        """Absolute area of the polygon via the shoelace formula."""
        shoelace = sum(_det(a, b) for a, b in self.segments())
        return 0.5 * abs(shoelace)

    def stringify_sides(self):
        """Human-readable listing of all sides."""
        return ", ".join(f"{a} - {b}" for a, b in self.segments())

    # easy way to iterate over adjacent points
    def segments(self):
        """Yield each side as a (point, next_point) pair, wrapping around."""
        corners = self.points
        count = len(corners)
        return ((corners[i], corners[(i + 1) % count]) for i in range(count))
# From Standard Library

from collections import namedtuple

Point2D = namedtuple('Point2D', ['x', 'y'])
square = Polygon2D([
    Point2D(0, 0),
    Point2D(0, 1),
    Point2D(1, 1),
    Point2D(1, 0),
])
square.area()
1.0
square.circumference()
4.0
fractions = [
    [MyFraction(3, 4)],
    [MyFraction(1, 2), MyFraction(5, 8)]
]
def flatten(stacked_lists):
    """Concatenate a list of lists into one flat list."""
    # nested comprehension instead of extend-in-a-loop
    return [entry for inner in stacked_lists for entry in inner]
    # alternative implementation
    # return sum(stacked_lists, [])

Concatenating the stacked lists is not necessary if we only need one item at a time - we can use generators for that!

def flatten_iter(stacked_lists):
    """Yield the entries of all nested lists one at a time."""
    for inner in stacked_lists:
        for item in inner:
            yield item
def flatten_iter(stacked_lists):
    """Lazily flatten one nesting level."""
    for inner in stacked_lists:
        # yield from replaces the explicit inner for-loop
        yield from inner
def flatten_iter(stacked_lists):
    """Return a generator over all entries of the nested lists."""
    return (item for inner in stacked_lists for item in inner)

Assignment IG2

  1. Dateien implizit konkatenieren Schreiben Sie einen Generator, der eine Liste von Dateinamen nimmt und der Reihe nach die Dateien zeilenweise ausgibt.

def concat_lines(paths):
    """Yield the lines of each file in paths, in order, as one stream."""
    for filename in paths:
        # the with-statement closes each file before the next one opens
        with open(filename) as handle:
            for line in handle:
                yield line

Streaming Interfaces

https://docs.python.org/3/library/functions.html#built-in-funcs

map, filter, functools.reduce, zip, enumerate, min, max, sum, any, all, reversed and any kind of comprehension use iterators internally, either as input or output or both. You can compose most stateless functions from these alone.

# see BAS2
# factors converting each unit to the base unit (mm has factor 1)
CONVERSION_FACTORS = {
    "m":  1000,
    "cm":   10,
    "mm":    1
}

# aos version
def remove_units(values):
    """Strip units from (value, unit) pairs via CONVERSION_FACTORS."""
    converted = []
    for value, unit in values:
        # keep factor * value order so duck-typed values behave the same
        converted.append(CONVERSION_FACTORS[unit] * value)
    return converted
# soa version
def remove_units_soa(values, units):
    """Convert parallel values/units sequences to plain numbers."""
    assert len(values) == len(units)
    converted = []
    # units and values need to be indexable
    for position, value in enumerate(values):
        converted.append(CONVERSION_FACTORS[units[position]] * value)
    return converted
def remove_units_soa(values, units):
    """Comprehension variant; values and units must still be indexable."""
    assert len(values) == len(units)
    indices = range(len(values))
    return [CONVERSION_FACTORS[units[i]] * values[i] for i in indices]
def remove_units_soa(values, units):
    """zip-based variant: values and units only need to be iterable."""
    pairs = zip(values, units)
    return [CONVERSION_FACTORS[unit] * value for value, unit in pairs]
remove_units_soa([5, 9], ["mm", "m"])
[5, 9000]
# see SL6
#sum(r.probability for r in rows)

Assignment IG3

  1. Besser als in EiP I Schreiben Sie eine Funktion primes_up_to(n), die eine Liste aller Primzahlen kleiner oder gleich n zurückgibt.

def primes_up_to(n):
    """Return the list of all primes p with 2 <= p <= n.

    PERF FIX: trial division only needs divisors up to sqrt(candidate);
    the original tested every divisor below the candidate itself.
    """
    return [cand for cand in range(2, n + 1)
            if all(cand % div != 0 for div in range(2, math.isqrt(cand) + 1))]
primes_up_to(100)
[2,
 3,
 5,
 7,
 11,
 13,
 17,
 19,
 23,
 29,
 31,
 37,
 41,
 43,
 47,
 53,
 59,
 61,
 67,
 71,
 73,
 79,
 83,
 89,
 97]
  1. Besser als in EiP II Schreiben Sie primes_up_to(n) so um, dass ein Generator zurückgegeben wird.

def gen_primes_up_to(n):
    """Return a lazy generator over all primes p with 2 <= p <= n.

    PERF FIX: like primes_up_to, trial division stops at sqrt(candidate)
    instead of scanning every smaller number.
    """
    return (cand for cand in range(2, n + 1)
            if all(cand % div != 0 for div in range(2, math.isqrt(cand) + 1)))
# iterators are the only thing required for sequence unpacking
first, *_ = heap
first
'c'

Generators are lazy - they only compute values when they really need to. This allows for never-ending iterators.

https://www.youtube.com/watch?v=5jwV3zxXc8E

In this example, a generator is used to generate Sudoku solutions. The caller can choose to ignore the generator if no solution is required, or to generate one solution, all solutions, and everything in between.

itertools

https://docs.python.org/3/library/itertools.html

Implements common iterator functions. The most relevant functions are combinations, permutations and product since these are tedious to implement manually. Some of the functions in itertools produce endless iterators.