Source code for petl.util.timing

from __future__ import absolute_import, print_function, division


import abc
import logging
import sys
import time


from petl.compat import PY3
from petl.util.base import Table
from petl.util.statistics import onlinestats


def progress(table, batchsize=1000, prefix="", out=None):
    """
    Wrap `table` so that progress messages are written to a file or
    file-like object while rows pass through. E.g.::

        >>> import petl as etl
        >>> table = etl.dummytable(100000)
        >>> table.progress(10000).tocsv('example.csv')  # doctest: +SKIP
        10000 rows in 0.13s (78363 row/s); batch in 0.13s (78363 row/s)
        20000 rows in 0.22s (91679 row/s); batch in 0.09s (110448 row/s)
        30000 rows in 0.31s (96573 row/s); batch in 0.09s (108114 row/s)
        40000 rows in 0.40s (99535 row/s); batch in 0.09s (109625 row/s)
        50000 rows in 0.49s (101396 row/s); batch in 0.09s (109591 row/s)
        60000 rows in 0.59s (102245 row/s); batch in 0.09s (106709 row/s)
        70000 rows in 0.68s (103221 row/s); batch in 0.09s (109498 row/s)
        80000 rows in 0.77s (103810 row/s); batch in 0.09s (108126 row/s)
        90000 rows in 0.90s (99465 row/s); batch in 0.13s (74516 row/s)
        100000 rows in 1.02s (98409 row/s); batch in 0.11s (89821 row/s)
        100000 rows in 1.02s (98402 row/s); batches in 0.10 +/- 0.02s [0.09-0.13] (100481 +/- 13340 rows/s [74516-110448])

    `batchsize` is the number of rows between two progress messages,
    `prefix` is prepended to every message, and `out` is the object the
    messages are printed to (``sys.stderr`` when `out` is None).

    See also :func:`petl.util.timing.clock`.

    """

    return ProgressView(inner=table, batchsize=batchsize, prefix=prefix,
                        out=out)
def log_progress(table, batchsize=1000, prefix="", logger=None,
                 level=logging.INFO):
    """
    Wrap `table` so that progress messages are sent to a python logger
    while rows pass through. E.g.::

        >>> import petl as etl
        >>> table = etl.dummytable(100000)
        >>> table.log_progress(10000).tocsv('example.csv')  # doctest: +SKIP
        10000 rows in 0.13s (78363 row/s); batch in 0.13s (78363 row/s)
        20000 rows in 0.22s (91679 row/s); batch in 0.09s (110448 row/s)
        30000 rows in 0.31s (96573 row/s); batch in 0.09s (108114 row/s)
        40000 rows in 0.40s (99535 row/s); batch in 0.09s (109625 row/s)
        50000 rows in 0.49s (101396 row/s); batch in 0.09s (109591 row/s)
        60000 rows in 0.59s (102245 row/s); batch in 0.09s (106709 row/s)
        70000 rows in 0.68s (103221 row/s); batch in 0.09s (109498 row/s)
        80000 rows in 0.77s (103810 row/s); batch in 0.09s (108126 row/s)
        90000 rows in 0.90s (99465 row/s); batch in 0.13s (74516 row/s)
        100000 rows in 1.02s (98409 row/s); batch in 0.11s (89821 row/s)
        100000 rows in 1.02s (98402 row/s); batches in 0.10 +/- 0.02s [0.09-0.13] (100481 +/- 13340 rows/s [74516-110448])

    `batchsize` is the number of rows between two progress messages,
    `prefix` is prepended to every message, and `level` is the logging
    level each message is emitted at.

    If `logger` is None, the logger named after this module
    (``logging.getLogger(__name__)``) is used and its level is set to
    `level`. No handler is attached by this module, so where (and
    whether) the messages appear depends on the logging configuration of
    the application.

    See also :func:`petl.util.timing.clock`.

    """

    return LoggingProgressView(table, batchsize, prefix, logger=logger,
                               level=level)
Table.progress = progress
Table.log_progress = log_progress


def _rate(count, seconds):
    """Return ``int(count / seconds)``, or 0 if `seconds` is zero."""
    try:
        return int(count / seconds)
    except ZeroDivisionError:
        return 0


class ProgressViewBase(Table):
    """
    Abstract base class for reporting on processing status.

    Iterating over an instance yields the rows of `inner` unchanged. Every
    `batchsize` rows a progress message is handed to :meth:`print_message`,
    and a summary message is emitted once `inner` is exhausted. Each message
    starts with `prefix`.

    """

    def __init__(self, inner, batchsize, prefix):
        self.inner = inner
        self.batchsize = batchsize
        self.prefix = prefix

    # NOTE(review): abstractmethod is only enforced at instantiation time if
    # the metaclass of Table derives from abc.ABCMeta - confirm.
    @abc.abstractmethod
    def print_message(self, message):
        """Emit one progress message; subclasses decide where it goes."""
        pass

    def __iter__(self):
        start = time.time()
        batchstart = start
        batchn = 0
        batchtimemin, batchtimemax = None, None
        # running mean and variance of batch time and batch rate
        batchtimemean, batchtimevar = 0, 0
        batchratemean, batchratevar = 0, 0
        # n is otherwise bound only by the loop below; without this default
        # the summary code would fail when inner yields nothing at all
        n = 0

        for n, r in enumerate(self.inner):
            if n % self.batchsize == 0 and n > 0:
                batchn += 1
                batchend = time.time()
                batchtime = batchend - batchstart
                if batchtimemin is None or batchtime < batchtimemin:
                    batchtimemin = batchtime
                if batchtimemax is None or batchtime > batchtimemax:
                    batchtimemax = batchtime
                elapsedtime = batchend - start
                rate = _rate(n, elapsedtime)
                batchrate = _rate(self.batchsize, batchtime)
                v = (n, elapsedtime, rate, batchtime, batchrate)
                # % binds tighter than +, so only the template is formatted
                # and the prefix is prepended verbatim
                message = self.prefix + \
                    '%s rows in %.2fs (%s row/s); ' \
                    'batch in %.2fs (%s row/s)' % v
                self.print_message(message)
                batchstart = batchend
                batchtimemean, batchtimevar = \
                    onlinestats(batchtime, batchn, mean=batchtimemean,
                                variance=batchtimevar)
                batchratemean, batchratevar = \
                    onlinestats(batchrate, batchn, mean=batchratemean,
                                variance=batchratevar)

            yield r

        # compute total elapsed time and rate
        end = time.time()
        elapsedtime = end - start
        rate = _rate(n, elapsedtime)

        # construct the final message
        if batchn > 1:
            # at least two batches completed, so the min/max batch times
            # were necessarily set inside the loop; the slowest batch gives
            # the minimum rate and the fastest batch the maximum rate
            batchratemin = _rate(self.batchsize, batchtimemax)
            batchratemax = _rate(self.batchsize, batchtimemin)
            v = (n, elapsedtime, rate, batchtimemean, batchtimevar**.5,
                 batchtimemin, batchtimemax, int(batchratemean),
                 int(batchratevar**.5), int(batchratemin),
                 int(batchratemax))
            message = self.prefix + '%s rows in %.2fs (%s row/s); batches in ' \
                                    '%.2f +/- %.2fs [%.2f-%.2f] ' \
                                    '(%s +/- %s rows/s [%s-%s])' % v
        else:
            v = (n, elapsedtime, rate)
            message = self.prefix + '%s rows in %.2fs (%s row/s)' % v
        self.print_message(message)


class ProgressView(ProgressViewBase):
    """
    Reports progress to a file_object like sys.stdout or a file handler.

    When `out` is None the messages are printed to ``sys.stderr``.

    """

    def __init__(self, inner, batchsize, prefix, out):
        if out is None:
            self.file_object = sys.stderr
        else:
            self.file_object = out
        super(ProgressView, self).__init__(inner, batchsize, prefix)

    def print_message(self, message):
        """Print `message` to the target object, flushing it if possible."""
        print(message, file=self.file_object)
        if hasattr(self.file_object, 'flush'):
            self.file_object.flush()


class LoggingProgressView(ProgressViewBase):
    """
    Reports progress to a logger, log handler, or log adapter.

    When `logger` is None the logger named after this module is used, and
    its level is set to `level`; note this changes the level of that shared
    logger object. A logger supplied by the caller is used as it is.

    """

    def __init__(self, inner, batchsize, prefix, logger, level=logging.INFO):
        if logger is None:
            self.logger = logging.getLogger(__name__)
            self.logger.setLevel(level)
        else:
            self.logger = logger
        self.level = level
        super(LoggingProgressView, self).__init__(inner, batchsize, prefix)

    def print_message(self, message):
        """Log `message` at the level given at construction time."""
        self.logger.log(self.level, message)
def clock(table):
    """
    Wrap `table` in a view that measures the time spent retrieving rows
    from it. Placing such views between the steps of a pipeline helps to
    find out which steps take the most time. E.g.::

        >>> import petl as etl
        >>> t1 = etl.dummytable(100000)
        >>> c1 = etl.clock(t1)
        >>> t2 = etl.convert(c1, 'foo', lambda v: v**2)
        >>> c2 = etl.clock(t2)
        >>> p = etl.progress(c2, 10000)
        >>> etl.tocsv(p, 'example.csv')  # doctest: +SKIP
        10000 rows in 0.23s (44036 row/s); batch in 0.23s (44036 row/s)
        20000 rows in 0.38s (52167 row/s); batch in 0.16s (63979 row/s)
        30000 rows in 0.54s (55749 row/s); batch in 0.15s (64624 row/s)
        40000 rows in 0.69s (57765 row/s); batch in 0.15s (64793 row/s)
        50000 rows in 0.85s (59031 row/s); batch in 0.15s (64707 row/s)
        60000 rows in 1.00s (59927 row/s); batch in 0.15s (64847 row/s)
        70000 rows in 1.16s (60483 row/s); batch in 0.16s (64051 row/s)
        80000 rows in 1.31s (61008 row/s); batch in 0.15s (64953 row/s)
        90000 rows in 1.47s (61356 row/s); batch in 0.16s (64285 row/s)
        100000 rows in 1.62s (61703 row/s); batch in 0.15s (65012 row/s)
        100000 rows in 1.62s (61700 row/s); batches in 0.16 +/- 0.02s [0.15-0.23] (62528 +/- 6173 rows/s [44036-65012])
        >>> # time consumed retrieving rows from t1
        ... c1.time  # doctest: +SKIP
        0.7243089999999492
        >>> # time consumed retrieving rows from t2
        ... c2.time  # doctest: +SKIP
        1.1704209999999766
        >>> # actual time consumed by the convert step
        ... c2.time - c1.time  # doctest: +SKIP
        0.4461120000000274

    The accumulated time is available as the `time` attribute of the
    returned view.

    See also :func:`petl.util.timing.progress`.

    """

    return ClockView(wrapped=table)
Table.clock = clock


class ClockView(Table):
    """
    Table view that accumulates, in the `time` attribute, the time spent
    retrieving rows from the wrapped container.

    `time` starts at 0 and is reset to 0 whenever a new iteration begins.

    """

    def __init__(self, wrapped):
        self.wrapped = wrapped
        # defined up front so that reading .time before the first iteration
        # gives 0 instead of raising AttributeError
        self.time = 0

    def __iter__(self):
        self.time = 0
        it = iter(self.wrapped)
        # pick the timer once; the conditional expression never touches
        # time.clock when PY3 is true
        timer = time.perf_counter if PY3 else time.clock
        while True:
            before = timer()
            try:
                row = next(it)
            except StopIteration:
                return
            after = timer()
            # only the time spent inside next() is counted, not the time
            # the consumer takes between two rows
            self.time += (after - before)
            yield row