Source code for petl.transform.sorts

from __future__ import absolute_import, print_function, division


import os
import heapq
from tempfile import NamedTemporaryFile
import itertools
import logging
from collections import namedtuple
import operator
from petl.compat import pickle, next, text_type


import petl.config as config
from petl.comparison import comparable_itemgetter
from petl.util.base import Table, asindices


logger = logging.getLogger(__name__)
warning = logger.warning
info = logger.info
debug = logger.debug


def sort(table, key=None, reverse=False, buffersize=None, tempdir=None,
         cache=True):
    """
    Sort the table. Field names or indices (from zero) can be used to
    specify the key. E.g.::

        >>> import petl as etl
        >>> table1 = [['foo', 'bar'],
        ...           ['C', 2],
        ...           ['A', 9],
        ...           ['A', 6],
        ...           ['F', 1],
        ...           ['D', 10]]
        >>> table2 = etl.sort(table1, 'foo')
        >>> table2
        +-----+-----+
        | foo | bar |
        +=====+=====+
        | 'A' |   9 |
        +-----+-----+
        | 'A' |   6 |
        +-----+-----+
        | 'C' |   2 |
        +-----+-----+
        | 'D' |  10 |
        +-----+-----+
        | 'F' |   1 |
        +-----+-----+

        >>> # sorting by compound key is supported
        ... table3 = etl.sort(table1, key=['foo', 'bar'])
        >>> table3
        +-----+-----+
        | foo | bar |
        +=====+=====+
        | 'A' |   6 |
        +-----+-----+
        | 'A' |   9 |
        +-----+-----+
        | 'C' |   2 |
        +-----+-----+
        | 'D' |  10 |
        +-----+-----+
        | 'F' |   1 |
        +-----+-----+

        >>> # if no key is specified, the default is a lexical sort
        ... table4 = etl.sort(table1)
        >>> table4
        +-----+-----+
        | foo | bar |
        +=====+=====+
        | 'A' |   6 |
        +-----+-----+
        | 'A' |   9 |
        +-----+-----+
        | 'C' |   2 |
        +-----+-----+
        | 'D' |  10 |
        +-----+-----+
        | 'F' |   1 |
        +-----+-----+

    The `buffersize` argument should be an `int` or `None`. If the number of
    rows in the table is less than `buffersize`, the table will be sorted in
    memory. Otherwise, the table is sorted in chunks of no more than
    `buffersize` rows, each chunk is written to a temporary file, and then a
    merge sort is performed on the temporary files.

    If `buffersize` is `None`, the value of `petl.config.sort_buffersize`
    will be used. By default this is set to 100000 rows, but can be changed,
    e.g.::

        >>> import petl.config
        >>> petl.config.sort_buffersize = 500000

    If `petl.config.sort_buffersize` is set to `None`, this forces all
    sorting to be done entirely in memory.

    By default the results of the sort will be cached, and so a second pass
    over the sorted table will yield rows from the cache and will not repeat
    the sort operation. To turn off caching, set the `cache` argument to
    `False`.

    """

    return SortView(table, key=key, reverse=reverse, buffersize=buffersize,
                    tempdir=tempdir, cache=cache)
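
# Illustrative sketch, not part of the library source: passing a small
# `buffersize` forces the external (file-backed) code path even on toy
# data, which is a convenient way to exercise the chunked merge sort. The
# table below is hypothetical.
#
#     >>> import petl as etl
#     >>> tbl = [['foo', 'bar'], ['C', 2], ['A', 9], ['A', 6], ['F', 1]]
#     >>> list(etl.sort(tbl, 'foo', buffersize=2))
#     [('foo', 'bar'), ('A', 9), ('A', 6), ('C', 2), ('F', 1)]
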
Table.sort = sort


def _iterchunk(fn):
    # reopen so iterators from file cache are independent
    debug('iterchunk, opening %s' % fn)
    with open(fn, 'rb') as f:
        try:
            while True:
                yield pickle.load(f)
        except EOFError:
            pass
    debug('end of iterchunk, closed %s' % fn)


class _Keyed(namedtuple('Keyed', ['key', 'obj'])):
    # Override the default namedtuple comparisons; only keys need to be
    # compared for the heap merge.

    def __eq__(self, other):
        return self.key == other.key

    def __lt__(self, other):
        return self.key < other.key

    def __le__(self, other):
        return self.key <= other.key

    def __ne__(self, other):
        return self.key != other.key

    def __gt__(self, other):
        return self.key > other.key

    def __ge__(self, other):
        return self.key >= other.key


def _heapqmergesorted(key=None, *iterables):
    """Return a single iterator over the given iterables, sorted by the
    given `key` function, assuming the input iterables are already sorted by
    the same function. (I.e., the merge part of a general merge sort.) Uses
    :func:`heapq.merge` for the underlying implementation."""

    if key is None:
        keyed_iterables = iterables
        for element in heapq.merge(*keyed_iterables):
            yield element
    else:
        keyed_iterables = [(_Keyed(key(obj), obj) for obj in iterable)
                           for iterable in iterables]
        for element in heapq.merge(*keyed_iterables):
            yield element.obj


def _shortlistmergesorted(key=None, reverse=False, *iterables):
    """Return a single iterator over the given iterables, sorted by the
    given `key` function, assuming the input iterables are already sorted by
    the same function. (I.e., the merge part of a general merge sort.) Uses
    :func:`min` (or :func:`max` if ``reverse=True``) for the underlying
    implementation."""

    if reverse:
        op = max
    else:
        op = min
    if key is not None:
        opkwargs = {'key': key}
    else:
        opkwargs = dict()
    # populate initial shortlist
    # (remember some iterables might be empty)
    iterators = list()
    shortlist = list()
    for iterable in iterables:
        it = iter(iterable)
        try:
            first = next(it)
            iterators.append(it)
            shortlist.append(first)
        except StopIteration:
            pass
    # do the mergesort
    while iterators:
        nxt = op(shortlist, **opkwargs)
        yield nxt
        nextidx = shortlist.index(nxt)
        try:
            shortlist[nextidx] = next(iterators[nextidx])
        except StopIteration:
            del shortlist[nextidx]
            del iterators[nextidx]


def _mergesorted(key=None, reverse=False, *iterables):
    # N.B., heapq is used for the normal merge sort, and the shortlist merge
    # sort is used for reverse merge sorts, on the assumption that
    # heapq.merge is faster and so preferable; however, it does not support
    # reverse sorting. Some casual profiling suggests there isn't much
    # between the two in terms of speed, but it might be worth profiling
    # more carefully.
    if reverse:
        return _shortlistmergesorted(key, True, *iterables)
    else:
        return _heapqmergesorted(key, *iterables)
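
# Illustrative sketch, not part of the library source: both merge helpers
# assume their inputs are already sorted (by the same key where one is
# given). The lists below are hypothetical.
#
#     >>> list(_heapqmergesorted(None, [1, 3, 5], [2, 4, 6]))
#     [1, 2, 3, 4, 5, 6]
#     >>> list(_shortlistmergesorted(None, True, [5, 3, 1], [6, 4, 2]))
#     [6, 5, 4, 3, 2, 1]
#     >>> list(_mergesorted(abs, False, [1, -3, 5], [-2, 4]))
#     [1, -2, -3, 4, 5]
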
class SortView(Table):

    def __init__(self, source, key=None, reverse=False, buffersize=None,
                 tempdir=None, cache=True):
        self.source = source
        self.key = key
        self.reverse = reverse
        if buffersize is None:
            self.buffersize = config.sort_buffersize
        else:
            self.buffersize = buffersize
        self.tempdir = tempdir
        self.cache = cache
        self._hdrcache = None
        self._memcache = None
        self._filecache = None
        self._getkey = None

    def clearcache(self):
        debug('clear cache')
        self._hdrcache = None
        self._memcache = None
        self._filecache = None
        self._getkey = None

    def __iter__(self):
        source = self.source
        key = self.key
        reverse = self.reverse
        if self.cache and self._memcache is not None:
            return self._iterfrommemcache()
        elif self.cache and self._filecache is not None:
            return self._iterfromfilecache()
        else:
            return self._iternocache(source, key, reverse)

    def _iterfrommemcache(self):
        debug('iterate from memory cache')
        yield tuple(self._hdrcache)
        for row in self._memcache:
            yield tuple(row)

    def _iterfromfilecache(self):
        # create a reference to the filecache here, so cleanup happens in
        # the correct order
        filecache = self._filecache
        filenames = list(map(operator.attrgetter('name'), filecache))
        debug('iterate from file cache: %r', filenames)
        yield tuple(self._hdrcache)
        chunkiters = [_iterchunk(fn) for fn in filenames]
        rows = _mergesorted(self._getkey, self.reverse, *chunkiters)
        try:
            for row in rows:
                yield tuple(row)
        finally:
            debug('attempt cleanup from generator')
            # N.B., need to ensure that any open files are closed **before**
            # temporary files are deleted, as deletion will fail on Windows
            # if the file is in use (i.e., still open)
            del chunkiters
            del rows
            del filecache
            debug('exiting generator')

    def _iternocache(self, source, key, reverse):
        debug('iterate without cache')
        self.clearcache()
        it = iter(source)

        try:
            hdr = next(it)
        except StopIteration:
            if key is None:
                return  # nothing to do on a table without headers
            hdr = []
        yield tuple(hdr)

        if key is not None:
            # convert field selection into field indices
            indices = asindices(hdr, key)
        else:
            indices = range(len(hdr))

        # now use field indices to construct a _getkey function
        # TODO support native comparison
        getkey = comparable_itemgetter(*indices)

        # initialise the first chunk
        rows = list(itertools.islice(it, 0, self.buffersize))
        rows.sort(key=getkey, reverse=reverse)

        # have we exhausted the source iterator?
        if self.buffersize is None or len(rows) < self.buffersize:
            # yes, table fits within sort buffer

            if self.cache:
                debug('caching mem')
                self._hdrcache = hdr
                self._memcache = rows
                # actually not needed to iterate from memcache
                self._getkey = getkey

            for row in rows:
                yield tuple(row)

        else:
            # no, table is too big, need to sort in chunks

            chunkfiles = []

            while rows:

                # dump the chunk
                with NamedTemporaryFile(dir=self.tempdir, delete=False,
                                        mode='wb') as f:
                    # N.B., we **don't** want the file to be deleted on
                    # close, but we **do** want the file to be deleted when
                    # self is garbage collected, or when the program exits.
                    # When all references to the wrapper are gone, the file
                    # should get deleted.
                    wrapper = _NamedTempFileDeleteOnGC(f.name)
                    debug('created temporary chunk file %s' % f.name)
                    for row in rows:
                        pickle.dump(row, f, protocol=-1)
                    f.flush()
                    chunkfiles.append(wrapper)

                # grab the next chunk
                rows = list(itertools.islice(it, 0, self.buffersize))
                rows.sort(key=getkey, reverse=reverse)

            if self.cache:
                debug('caching files')
                self._hdrcache = hdr
                self._filecache = chunkfiles
                self._getkey = getkey

            chunkiters = [_iterchunk(f.name) for f in chunkfiles]
            for row in _mergesorted(getkey, reverse, *chunkiters):
                yield tuple(row)


class _NamedTempFileDeleteOnGC(object):

    def __init__(self, name):
        self.name = name

    def delete(self, unlink=os.unlink, log=logger.debug):
        name = self.name
        try:
            log('deleting %s' % name)
            unlink(name)
        except Exception as e:
            log('exception deleting %s: %s' % (name, e))
            raise
        else:
            log('deleted %s' % name)

    def __del__(self):
        self.delete()

    def __str__(self):
        return self.name

    def __repr__(self):
        return self.name
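
# Illustrative sketch, not part of the library source: the wrapper ties the
# lifetime of a chunk file to the lifetime of the wrapper object, so the
# file disappears when the last reference is dropped (immediately under
# CPython's reference counting; timing may differ on other interpreters).
#
#     >>> import os, tempfile
#     >>> f = tempfile.NamedTemporaryFile(delete=False)
#     >>> f.close()
#     >>> w = _NamedTempFileDeleteOnGC(f.name)
#     >>> os.path.exists(w.name)
#     True
#     >>> del w  # __del__ calls delete(), which unlinks the file
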
def mergesort(*tables, **kwargs):
    """
    Combine multiple input tables into one sorted output table. E.g.::

        >>> import petl as etl
        >>> table1 = [['foo', 'bar'],
        ...           ['A', 9],
        ...           ['C', 2],
        ...           ['D', 10],
        ...           ['A', 6],
        ...           ['F', 1]]
        >>> table2 = [['foo', 'bar'],
        ...           ['B', 3],
        ...           ['D', 10],
        ...           ['A', 10],
        ...           ['F', 4]]
        >>> table3 = etl.mergesort(table1, table2, key='foo')
        >>> table3.lookall()
        +-----+-----+
        | foo | bar |
        +=====+=====+
        | 'A' |   9 |
        +-----+-----+
        | 'A' |   6 |
        +-----+-----+
        | 'A' |  10 |
        +-----+-----+
        | 'B' |   3 |
        +-----+-----+
        | 'C' |   2 |
        +-----+-----+
        | 'D' |  10 |
        +-----+-----+
        | 'D' |  10 |
        +-----+-----+
        | 'F' |   1 |
        +-----+-----+
        | 'F' |   4 |
        +-----+-----+

    If the input tables are already sorted by the given key, give
    ``presorted=True`` as a keyword argument.

    This function is equivalent to concatenating the input tables using
    :func:`cat` then sorting; however, this function will typically be more
    efficient, especially if the input tables are presorted.

    Keyword arguments:

    key : string or tuple of strings, optional
        Field name or tuple of fields to sort by (defaults to `None`,
        meaning a lexical sort)
    reverse : bool, optional
        `True` if sort in reverse (descending) order (defaults to `False`)
    presorted : bool, optional
        `True` if inputs are already sorted by the given key (defaults to
        `False`)
    missing : object
        Value to fill with when input tables have different fields (defaults
        to `None`)
    header : sequence of strings, optional
        Specify a fixed header for the output table
    buffersize : int, optional
        Limit the number of rows in memory per input table when inputs are
        not presorted

    """

    return MergeSortView(tables, **kwargs)
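
# Illustrative sketch, not part of the library source: with presorted=True
# the per-table sort is skipped and the inputs go straight to the merge.
# The tables below are hypothetical.
#
#     >>> import petl as etl
#     >>> t1 = [['foo'], ['a'], ['c']]
#     >>> t2 = [['foo'], ['b'], ['d']]
#     >>> list(etl.mergesort(t1, t2, key='foo', presorted=True))
#     [('foo',), ('a',), ('b',), ('c',), ('d',)]
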
Table.mergesort = mergesort


class MergeSortView(Table):

    def __init__(self, tables, key=None, reverse=False, presorted=False,
                 missing=None, header=None, buffersize=None, tempdir=None,
                 cache=True):
        self.key = key
        if presorted:
            self.tables = tables
        else:
            self.tables = [sort(t, key=key, reverse=reverse,
                                buffersize=buffersize, tempdir=tempdir,
                                cache=cache)
                           for t in tables]
        self.missing = missing
        self.header = header
        self.reverse = reverse

    def __iter__(self):
        return itermergesort(self.tables, self.key, self.header,
                             self.missing, self.reverse)


def itermergesort(sources, key, header, missing, reverse):

    # first need to standardise headers of all input tables
    # borrow this from itercat - TODO remove code smells

    its = [iter(t) for t in sources]
    src_hdrs = []
    for it in its:
        try:
            src_hdrs.append(next(it))
        except StopIteration:
            src_hdrs.append([])

    if header is None:
        # determine output fields by gathering all fields found in the
        # sources
        outhdr = list()
        for hdr in src_hdrs:
            for f in list(map(text_type, hdr)):
                if f not in outhdr:
                    # add any new fields as we find them
                    outhdr.append(f)
    else:
        # predetermined output fields
        outhdr = header
    yield tuple(outhdr)

    def _standardisedata(it, hdr, ofs):
        flds = list(map(text_type, hdr))
        # now construct and yield the data rows
        for _row in it:
            try:
                # should be quickest to do this way
                yield tuple(_row[flds.index(fo)] if fo in flds else missing
                            for fo in ofs)
            except IndexError:
                # handle short rows
                outrow = [missing] * len(ofs)
                for i, fi in enumerate(flds):
                    try:
                        outrow[ofs.index(fi)] = _row[i]
                    except IndexError:
                        pass  # be relaxed about short rows
                yield tuple(outrow)

    # wrap all iterators to standardise fields
    sits = [_standardisedata(it, hdr, outhdr)
            for hdr, it in zip(src_hdrs, its)]

    # now determine key function
    getkey = None
    if key is not None:
        # convert field selection into field indices
        indices = asindices(outhdr, key)
        # now use field indices to construct a _getkey function
        # N.B., this will probably raise an exception on short rows
        getkey = comparable_itemgetter(*indices)

    # OK, do the merge sort
    for row in _shortlistmergesorted(getkey, reverse, *sits):
        yield row
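
# Illustrative sketch, not part of the library source: itermergesort unifies
# differing headers before merging, filling absent fields with `missing`.
# The tables below are hypothetical.
#
#     >>> import petl as etl
#     >>> t1 = [['foo', 'bar'], ['A', 1]]
#     >>> t2 = [['foo', 'baz'], ['B', 2]]
#     >>> list(etl.mergesort(t1, t2, key='foo'))
#     [('foo', 'bar', 'baz'), ('A', 1, None), ('B', None, 2)]
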
def issorted(table, key=None, reverse=False, strict=False):
    """
    Return True if the table is ordered (i.e., sorted) by the given key.
    E.g.::

        >>> import petl as etl
        >>> table1 = [['foo', 'bar', 'baz'],
        ...           ['a', 1, True],
        ...           ['b', 3, True],
        ...           ['b', 2]]
        >>> etl.issorted(table1, key='foo')
        True
        >>> etl.issorted(table1, key='bar')
        False
        >>> etl.issorted(table1, key='foo', strict=True)
        False
        >>> etl.issorted(table1, key='foo', reverse=True)
        False

    """

    # determine the operator to use when comparing rows
    if reverse and strict:
        op = operator.lt
    elif reverse and not strict:
        op = operator.le
    elif strict:
        op = operator.gt
    else:
        op = operator.ge

    it = iter(table)
    try:
        flds = [text_type(f) for f in next(it)]
    except StopIteration:
        flds = []
    try:
        prev = next(it)
    except StopIteration:
        # a table with no data rows is trivially sorted
        return True
    if key is None:
        for curr in it:
            if not op(curr, prev):
                return False
            prev = curr
    else:
        getkey = comparable_itemgetter(*asindices(flds, key))
        prevkey = getkey(prev)
        for curr in it:
            currkey = getkey(curr)
            if not op(currkey, prevkey):
                return False
            prevkey = currkey
    return True
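
# Illustrative sketch, not part of the library source: with key=None, whole
# rows are compared, i.e., a lexical ordering check. The data below is
# hypothetical.
#
#     >>> import petl as etl
#     >>> etl.issorted([['foo', 'bar'], ['a', 1], ['a', 2], ['b', 1]])
#     True
#     >>> etl.issorted([['foo', 'bar'], ['b', 1], ['a', 2]])
#     False
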
Table.issorted = issorted