Source code for petl.util.lookups

from __future__ import absolute_import, print_function, division


import operator
from petl.compat import text_type


from petl.errors import DuplicateKeyError
from petl.util.base import Table, asindices, asdict, Record, rowgetter


def _setup_lookup(table, key, value):

    # obtain iterator and header row
    it = iter(table)
    hdr = next(it)

    # prepare key getter
    keyindices = asindices(hdr, key)
    assert len(keyindices) > 0, 'no key selected'
    getkey = operator.itemgetter(*keyindices)

    # prepare value getter
    if value is None:
        # default value is complete row
        getvalue = rowgetter(*range(len(hdr)))
    else:
        valueindices = asindices(hdr, value)
        assert len(valueindices) > 0, 'no value selected'
        getvalue = operator.itemgetter(*valueindices)

    return it, getkey, getvalue


[docs]def lookup(table, key, value=None, dictionary=None): """ Load a dictionary with data from the given table. E.g.:: >>> import petl as etl >>> table1 = [['foo', 'bar'], ... ['a', 1], ... ['b', 2], ... ['b', 3]] >>> lkp = etl.lookup(table1, 'foo', 'bar') >>> lkp['a'] [1] >>> lkp['b'] [2, 3] >>> # if no value argument is given, defaults to the whole ... # row (as a tuple) ... lkp = etl.lookup(table1, 'foo') >>> lkp['a'] [('a', 1)] >>> lkp['b'] [('b', 2), ('b', 3)] >>> # compound keys are supported ... table2 = [['foo', 'bar', 'baz'], ... ['a', 1, True], ... ['b', 2, False], ... ['b', 3, True], ... ['b', 3, False]] >>> lkp = etl.lookup(table2, ('foo', 'bar'), 'baz') >>> lkp[('a', 1)] [True] >>> lkp[('b', 2)] [False] >>> lkp[('b', 3)] [True, False] >>> # data can be loaded into an existing dictionary-like ... # object, including persistent dictionaries created via the ... # shelve module ... import shelve >>> lkp = shelve.open('example.dat', flag='n') >>> lkp = etl.lookup(table1, 'foo', 'bar', lkp) >>> lkp.close() >>> lkp = shelve.open('example.dat', flag='r') >>> lkp['a'] [1] >>> lkp['b'] [2, 3] """ if dictionary is None: dictionary = dict() # setup it, getkey, getvalue = _setup_lookup(table, key, value) # build lookup dictionary for row in it: k = getkey(row) v = getvalue(row) if k in dictionary: # work properly with shelve l = dictionary[k] l.append(v) dictionary[k] = l else: dictionary[k] = [v] return dictionary
Table.lookup = lookup
[docs]def lookupone(table, key, value=None, dictionary=None, strict=False): """ Load a dictionary with data from the given table, assuming there is at most one value for each key. E.g.:: >>> import petl as etl >>> table1 = [['foo', 'bar'], ... ['a', 1], ... ['b', 2], ... ['b', 3]] >>> # if the specified key is not unique and strict=False (default), ... # the first value wins ... lkp = etl.lookupone(table1, 'foo', 'bar') >>> lkp['a'] 1 >>> lkp['b'] 2 >>> # if the specified key is not unique and strict=True, will raise ... # DuplicateKeyError ... try: ... lkp = etl.lookupone(table1, 'foo', strict=True) ... except etl.errors.DuplicateKeyError as e: ... print(e) ... duplicate key: 'b' >>> # compound keys are supported ... table2 = [['foo', 'bar', 'baz'], ... ['a', 1, True], ... ['b', 2, False], ... ['b', 3, True], ... ['b', 3, False]] >>> lkp = etl.lookupone(table2, ('foo', 'bar'), 'baz') >>> lkp[('a', 1)] True >>> lkp[('b', 2)] False >>> lkp[('b', 3)] True >>> # data can be loaded into an existing dictionary-like ... # object, including persistent dictionaries created via the ... # shelve module ... import shelve >>> lkp = shelve.open('example.dat', flag='n') >>> lkp = etl.lookupone(table1, 'foo', 'bar', lkp) >>> lkp.close() >>> lkp = shelve.open('example.dat', flag='r') >>> lkp['a'] 1 >>> lkp['b'] 2 """ if dictionary is None: dictionary = dict() # setup it, getkey, getvalue = _setup_lookup(table, key, value) # build lookup dictionary for row in it: k = getkey(row) if strict and k in dictionary: raise DuplicateKeyError(k) elif k not in dictionary: v = getvalue(row) dictionary[k] = v return dictionary
Table.lookupone = lookupone
[docs]def dictlookup(table, key, dictionary=None): """ Load a dictionary with data from the given table, mapping to dicts. E.g.:: >>> import petl as etl >>> table1 = [['foo', 'bar'], ... ['a', 1], ... ['b', 2], ... ['b', 3]] >>> lkp = etl.dictlookup(table1, 'foo') >>> lkp['a'] [{'foo': 'a', 'bar': 1}] >>> lkp['b'] [{'foo': 'b', 'bar': 2}, {'foo': 'b', 'bar': 3}] >>> # compound keys are supported ... table2 = [['foo', 'bar', 'baz'], ... ['a', 1, True], ... ['b', 2, False], ... ['b', 3, True], ... ['b', 3, False]] >>> lkp = etl.dictlookup(table2, ('foo', 'bar')) >>> lkp[('a', 1)] [{'foo': 'a', 'bar': 1, 'baz': True}] >>> lkp[('b', 2)] [{'foo': 'b', 'bar': 2, 'baz': False}] >>> lkp[('b', 3)] [{'foo': 'b', 'bar': 3, 'baz': True}, {'foo': 'b', 'bar': 3, 'baz': False}] >>> # data can be loaded into an existing dictionary-like ... # object, including persistent dictionaries created via the ... # shelve module ... import shelve >>> lkp = shelve.open('example.dat', flag='n') >>> lkp = etl.dictlookup(table1, 'foo', lkp) >>> lkp.close() >>> lkp = shelve.open('example.dat', flag='r') >>> lkp['a'] [{'foo': 'a', 'bar': 1}] >>> lkp['b'] [{'foo': 'b', 'bar': 2}, {'foo': 'b', 'bar': 3}] """ if dictionary is None: dictionary = dict() it = iter(table) hdr = next(it) flds = list(map(text_type, hdr)) keyindices = asindices(hdr, key) assert len(keyindices) > 0, 'no key selected' getkey = operator.itemgetter(*keyindices) for row in it: k = getkey(row) rec = asdict(flds, row) if k in dictionary: # work properly with shelve l = dictionary[k] l.append(rec) dictionary[k] = l else: dictionary[k] = [rec] return dictionary
Table.dictlookup = dictlookup
[docs]def dictlookupone(table, key, dictionary=None, strict=False): """ Load a dictionary with data from the given table, mapping to dicts, assuming there is at most one row for each key. E.g.:: >>> import petl as etl >>> table1 = [['foo', 'bar'], ... ['a', 1], ... ['b', 2], ... ['b', 3]] >>> # if the specified key is not unique and strict=False (default), ... # the first value wins ... lkp = etl.dictlookupone(table1, 'foo') >>> lkp['a'] {'foo': 'a', 'bar': 1} >>> lkp['b'] {'foo': 'b', 'bar': 2} >>> # if the specified key is not unique and strict=True, will raise ... # DuplicateKeyError ... try: ... lkp = etl.dictlookupone(table1, 'foo', strict=True) ... except etl.errors.DuplicateKeyError as e: ... print(e) ... duplicate key: 'b' >>> # compound keys are supported ... table2 = [['foo', 'bar', 'baz'], ... ['a', 1, True], ... ['b', 2, False], ... ['b', 3, True], ... ['b', 3, False]] >>> lkp = etl.dictlookupone(table2, ('foo', 'bar')) >>> lkp[('a', 1)] {'foo': 'a', 'bar': 1, 'baz': True} >>> lkp[('b', 2)] {'foo': 'b', 'bar': 2, 'baz': False} >>> lkp[('b', 3)] {'foo': 'b', 'bar': 3, 'baz': True} >>> # data can be loaded into an existing dictionary-like ... # object, including persistent dictionaries created via the ... # shelve module ... import shelve >>> lkp = shelve.open('example.dat', flag='n') >>> lkp = etl.dictlookupone(table1, 'foo', lkp) >>> lkp.close() >>> lkp = shelve.open('example.dat', flag='r') >>> lkp['a'] {'foo': 'a', 'bar': 1} >>> lkp['b'] {'foo': 'b', 'bar': 2} """ if dictionary is None: dictionary = dict() it = iter(table) hdr = next(it) flds = list(map(text_type, hdr)) keyindices = asindices(hdr, key) assert len(keyindices) > 0, 'no key selected' getkey = operator.itemgetter(*keyindices) for row in it: k = getkey(row) if strict and k in dictionary: raise DuplicateKeyError(k) elif k not in dictionary: d = asdict(flds, row) dictionary[k] = d return dictionary
Table.dictlookupone = dictlookupone
[docs]def recordlookup(table, key, dictionary=None): """ Load a dictionary with data from the given table, mapping to record objects. """ if dictionary is None: dictionary = dict() it = iter(table) hdr = next(it) flds = list(map(text_type, hdr)) keyindices = asindices(hdr, key) assert len(keyindices) > 0, 'no key selected' getkey = operator.itemgetter(*keyindices) for row in it: k = getkey(row) rec = Record(row, flds) if k in dictionary: # work properly with shelve l = dictionary[k] l.append(rec) dictionary[k] = l else: dictionary[k] = [rec] return dictionary
Table.recordlookup = recordlookup
[docs]def recordlookupone(table, key, dictionary=None, strict=False): """ Load a dictionary with data from the given table, mapping to record objects, assuming there is at most one row for each key. """ if dictionary is None: dictionary = dict() it = iter(table) hdr = next(it) flds = list(map(text_type, hdr)) keyindices = asindices(hdr, key) assert len(keyindices) > 0, 'no key selected' getkey = operator.itemgetter(*keyindices) for row in it: k = getkey(row) if strict and k in dictionary: raise DuplicateKeyError(k) elif k not in dictionary: d = Record(row, flds) dictionary[k] = d return dictionary
Table.recordlookupone = recordlookupone