Source code for petl.io.sources

from __future__ import absolute_import, print_function, division


import io
import gzip
import sys
import bz2
import zipfile
from contextlib import contextmanager
import subprocess
import logging


from petl.errors import ArgumentError
from petl.compat import urlopen, StringIO, BytesIO, string_types, PY2


# Module-level logger plus short aliases for its most used methods.
logger = logging.getLogger(__name__)
warning, info, debug = logger.warning, logger.info, logger.debug


class FileSource(object):
    """Source backed by a plain file, opened through ``io.open``.

    :param filename: path handed to ``io.open``.
    :param kwargs: extra keyword arguments forwarded to ``io.open`` on
        every call to :meth:`open`.
    """

    def __init__(self, filename, **kwargs):
        self.filename, self.kwargs = filename, kwargs

    def open(self, mode='r'):
        """Return the file object produced by ``io.open`` for *mode*."""
        handle = io.open(self.filename, mode, **self.kwargs)
        return handle
class GzipSource(object):
    """Source backed by a gzip file, opened through ``gzip.open``.

    :param filename: path handed to ``gzip.open``.
    :param kwargs: extra keyword arguments forwarded to ``gzip.open``.
    """

    def __init__(self, filename, **kwargs):
        self.filename, self.kwargs = filename, kwargs

    @contextmanager
    def open(self, mode='r'):
        """Context manager yielding the gzip file object for *mode*.

        The file object is closed when the context exits, whether or not
        an exception was raised inside it.
        """
        gz = gzip.open(self.filename, mode, **self.kwargs)
        try:
            yield gz
        finally:
            gz.close()
class BZ2Source(object):
    """Source backed by a bzip2 file, opened through ``bz2.BZ2File``.

    :param filename: path handed to ``bz2.BZ2File``.
    :param kwargs: extra keyword arguments forwarded to ``bz2.BZ2File``.
    """

    def __init__(self, filename, **kwargs):
        self.filename, self.kwargs = filename, kwargs

    def open(self, mode='r'):
        """Return a new ``bz2.BZ2File`` for this file in *mode*."""
        compressed = bz2.BZ2File(self.filename, mode, **self.kwargs)
        return compressed
class ZipSource(object):
    """Source backed by a single member of a zip archive.

    :param filename: path of the zip archive, handed to ``zipfile.ZipFile``.
    :param membername: name of the archive member to open.
    :param pwd: optional password passed to ``ZipFile.open``; omitted from
        the call when ``None``.
    :param kwargs: extra keyword arguments forwarded to ``zipfile.ZipFile``.
    """

    def __init__(self, filename, membername, pwd=None, **kwargs):
        self.filename = filename
        self.membername = membername
        self.pwd = pwd
        self.kwargs = kwargs

    @contextmanager
    def open(self, mode='r'):
        """Context manager yielding a file object for the archive member.

        Any ``'b'`` or ``'U'`` characters are removed from *mode* before it
        is passed to ``zipfile.ZipFile`` and ``ZipFile.open``.  On exit the
        member handle is closed first and the archive second.
        """
        # str.replace works for both byte and unicode mode strings, so no
        # interpreter-specific translate() call is needed.
        mode = mode.replace('b', '').replace('U', '')
        zf = zipfile.ZipFile(self.filename, mode, **self.kwargs)
        try:
            if self.pwd is not None:
                member = zf.open(self.membername, mode, self.pwd)
            else:
                member = zf.open(self.membername, mode)
            try:
                yield member
            finally:
                # The member must be closed before the archive: ZipFile.close()
                # refuses to run while a writing handle is still open.
                member.close()
        finally:
            zf.close()
class StdinSource(object):
    """Read-only source that hands out ``sys.stdin``."""

    @contextmanager
    def open(self, mode='r'):
        """Context manager yielding ``sys.stdin``.

        :raises ArgumentError: if *mode* does not start with ``'r'``.

        The stream is not closed when the context exits.
        """
        wants_read = mode.startswith('r')
        if not wants_read:
            raise ArgumentError('source is read-only')
        yield sys.stdin
class StdoutSource(object):
    """Write-only source that hands out ``sys.stdout``."""

    @contextmanager
    def open(self, mode):
        """Context manager yielding ``sys.stdout``.

        :raises ArgumentError: if *mode* starts with ``'r'``.

        The stream is not closed when the context exits.
        """
        wants_read = mode.startswith('r')
        if wants_read:
            raise ArgumentError('source is write-only')
        yield sys.stdout
class URLSource(object):
    """Read-only source whose data is obtained by calling ``urlopen``.

    All positional and keyword arguments given to the constructor are
    stored and forwarded unchanged to ``urlopen`` each time the source is
    opened.
    """

    def __init__(self, *args, **kwargs):
        self.args, self.kwargs = args, kwargs

    @contextmanager
    def open(self, mode='r'):
        """Context manager yielding the object returned by ``urlopen``.

        :raises ArgumentError: if *mode* does not start with ``'r'``.

        The yielded object is closed when the context exits.
        """
        wants_read = mode.startswith('r')
        if not wants_read:
            raise ArgumentError('source is read-only')
        response = urlopen(*self.args, **self.kwargs)
        try:
            yield response
        finally:
            response.close()
class MemorySource(object):
    """In-memory source backed by a ``BytesIO`` or ``StringIO`` buffer.

    :param s: optional initial data, required for reading.  It is wrapped
        in ``BytesIO`` when the mode contains ``'b'`` and in ``StringIO``
        otherwise.
    """

    def __init__(self, s=None):
        self.s = s
        self.buffer = None

    @contextmanager
    def open(self, mode='rb'):
        """Context manager yielding the in-memory buffer for *mode*.

        * read (``'r'`` in mode): a fresh buffer is created over ``s``.
        * write (``'w'`` in mode): any existing buffer is closed and
          replaced with an empty one.
        * append (``'a'`` in mode): the existing buffer is reused, or an
          empty one is created if there is none yet.

        The buffer is deliberately left open on exit so that
        :meth:`getvalue` can still be called afterwards.

        :raises ArgumentError: when opened for reading with no data.
        """
        binary = 'b' in mode
        if 'r' in mode:
            if self.s is None:
                raise ArgumentError('no string data supplied')
            self.buffer = BytesIO(self.s) if binary else StringIO(self.s)
        elif 'w' in mode:
            if self.buffer is not None:
                self.buffer.close()
            self.buffer = BytesIO() if binary else StringIO()
        elif 'a' in mode:
            if self.buffer is None:
                # The buffer type is decided by the requested mode; there is
                # no buffer to inspect yet.
                self.buffer = BytesIO() if binary else StringIO()
        yield self.buffer  # don't close the buffer

    def getvalue(self):
        """Return the buffer contents, or ``None`` if no buffer exists."""
        if self.buffer is not None:
            return self.buffer.getvalue()
        return None


# backwards compatibility
StringSource = MemorySource
class PopenSource(object):
    """Read-only source exposing the standard output of a subprocess.

    All positional and keyword arguments given to the constructor are
    stored and forwarded to ``subprocess.Popen`` each time the source is
    opened; ``stdout`` is always overridden with ``subprocess.PIPE``.
    """

    def __init__(self, *args, **kwargs):
        self.args = args
        self.kwargs = kwargs

    @contextmanager
    def open(self, mode='r'):
        """Context manager yielding the child process's stdout pipe.

        :raises ArgumentError: if *mode* does not start with ``'r'``.

        On exit every pipe opened for the child is closed and the process
        is waited for, so neither file descriptors nor a zombie process
        are left behind.  This is the same cleanup ``Popen`` performs when
        used as a context manager on Python 3.
        """
        if not mode.startswith('r'):
            raise ArgumentError('source is read-only')
        # Force a pipe so the child's output can be read by the caller.
        self.kwargs['stdout'] = subprocess.PIPE
        proc = subprocess.Popen(*self.args, **self.kwargs)
        try:
            yield proc.stdout
        finally:
            try:
                # stdin/stderr are only set when the caller requested pipes.
                for stream in (proc.stdout, proc.stderr, proc.stdin):
                    if stream is not None:
                        stream.close()
            finally:
                proc.wait()
_invalid_source_msg = 'invalid source argument, expected None or a string or ' \
                      'an object implementing open(), found %r'

# Prefix/suffix tables used to pick a source class from a string argument.
_url_prefixes = ('http://', 'https://', 'ftp://')
_gzip_suffixes = ('.gz', '.bgz')


def _check_source_object(source):
    """Return *source* if it has a callable ``open`` attribute.

    :raises AssertionError: otherwise.  The exception is raised explicitly
        rather than through an ``assert`` statement so the check still runs
        when Python is started with ``-O``.
    """
    if not callable(getattr(source, 'open', None)):
        # Wrap in a 1-tuple so that a tuple-valued source is formatted as a
        # single %r argument instead of being unpacked by the % operator.
        raise AssertionError(_invalid_source_msg % (source,))
    return source


def read_source_from_arg(source):
    """Resolve *source* into an object to read from.

    * ``None`` gives a ``StdinSource``.
    * A string starting with ``http://``, ``https://`` or ``ftp://`` gives
      a ``URLSource``; one ending in ``.gz``/``.bgz`` a ``GzipSource``; one
      ending in ``.bz2`` a ``BZ2Source``; any other string a ``FileSource``.
    * Anything else is returned unchanged, provided it has a callable
      ``open`` attribute.

    :raises AssertionError: if *source* is none of the above.
    """
    if source is None:
        return StdinSource()
    if isinstance(source, string_types):
        if source.startswith(_url_prefixes):
            return URLSource(source)
        if source.endswith(_gzip_suffixes):
            return GzipSource(source)
        if source.endswith('.bz2'):
            return BZ2Source(source)
        return FileSource(source)
    return _check_source_object(source)


def write_source_from_arg(source):
    """Resolve *source* into an object to write to.

    * ``None`` gives a ``StdoutSource``.
    * A string ending in ``.gz``/``.bgz`` gives a ``GzipSource``; one
      ending in ``.bz2`` a ``BZ2Source``; any other string a ``FileSource``.
    * Anything else is returned unchanged, provided it has a callable
      ``open`` attribute.

    :raises AssertionError: if *source* is none of the above.
    """
    if source is None:
        return StdoutSource()
    if isinstance(source, string_types):
        if source.endswith(_gzip_suffixes):
            return GzipSource(source)
        if source.endswith('.bz2'):
            return BZ2Source(source)
        return FileSource(source)
    return _check_source_object(source)