# Source code for tafra.base

"""
Tafra: a minimalist dataframe

Copyright (c) 2020 Derrick W. Turk and David S. Fulford

Author
------
Derrick W. Turk
David S. Fulford

Notes
-----
Created on April 25, 2020
"""
__all__ = ['Tafra']

from pathlib import Path
import re
import warnings
import csv
import pprint as pprint
from datetime import date, datetime
from itertools import chain, islice
from collections import namedtuple
import dataclasses as dc

import numpy as np
from .protocol import Series, DataFrame, Cursor  # just for mypy...

from typing import (Any, Callable, Dict, Mapping, List, Tuple, Optional, Union as _Union, Sequence,
                    Sized, Iterable, Iterator, Type, KeysView, ValuesView, ItemsView,
                    IO)
from typing_extensions import Concatenate, ParamSpec
from typing import cast
from io import TextIOWrapper

from .formatter import ObjectFormatter
from .csvreader import CSVReader


P = ParamSpec('P')


# default object formats
object_formatter = ObjectFormatter()
object_formatter['Decimal'] = lambda x: x.astype(float)


NAMEDTUPLE_TYPE: Dict[str, Type[Any]] = {
    'int': int,
    'float': float,
    'bool': bool,
    'str': str,
    'date': date,
    'datetime': datetime,
    'object': str,
}

RECORD_TYPE: Dict[str, Callable[[Any], Any]] = {
    'int': int,
    'float': float,
    'bool': bool,
    'str': str,
    'date': lambda x: x.isoformat(),
    'datetime': lambda x: x.isoformat(),
    'object': str,
}


Scalar = _Union[str, int, float, bool]
_Mapping = _Union[
    Mapping[str, Any],
    Mapping[int, Any],
    Mapping[float, Any],
    Mapping[bool, Any],
]
_Element = _Union[Tuple[_Union[str, int, float, np.ndarray], Any], List[Any], _Mapping]
InitVar = _Union[
    Tuple[str, Any],
    _Mapping,
    Sequence[_Element],
    Iterable[_Element],
    Iterator[_Element],
    enumerate
]


@dc.dataclass(repr=False, eq=False)
class Tafra:
    """
    A minimalist dataframe.

    Constructs a :class:`Tafra` from a :class:`dict` of data and (optionally)
    dtypes. Types on parameters are the types of the constructed
    :class:`Tafra`, but attempts are made to parse anything that "looks" like
    the correct data structure, including :class:`Iterable`,
    :class:`Iterator`, :class:`Sequence`, and :class:`Mapping`, and various
    combinations.

    Parameters are given as an ``InitVar``, defined as:

    ``InitVar = Union[Tuple[str, Any], _Mapping, Sequence[_Element],``
    ``Iterable[_Element], Iterator[_Element], enumerate]``

    ``_Mapping = Union[Mapping[str, Any], Mapping[int, Any],``
    ``Mapping[float, Any], Mapping[bool, Any]]``

    ``_Element = Union[Tuple[Union[str, int, float, np.ndarray], Any],``
    ``List[Any], Mapping]``

    Parameters
    ----------
    data: InitVar
        The data of the Tafra.

    dtypes: InitVar
        The dtypes of the columns.

    validate: bool = True
        Run validation checks of the data. False will improve performance,
        but `data` and `dtypes` will not be validated for conformance to
        expected data structures.

    check_rows: bool = True
        Run row count checks. False will allow columns of differing lengths,
        which may break several methods.

    Returns
    -------
    tafra: Tafra
        The constructed :class:`Tafra`.
    """
    data: dc.InitVar[InitVar]
    dtypes: dc.InitVar[Optional[InitVar]] = None
    validate: dc.InitVar[bool] = True
    check_rows: bool = True

    _data: Dict[str, np.ndarray] = dc.field(init=False)
    _dtypes: Dict[str, str] = dc.field(init=False)

    def __post_init__(self, data: InitVar, dtypes: Optional[InitVar],
                      validate: bool) -> None:
        # TODO: enable this?
        # if isinstance(self._data, DataFrame):
        #     tf = self.from_dataframe(df=self._data)
        #     self._data = tf._data
        #     self._dtypes = tf._dtypes
        #     self._rows = tf._rows
        #     return

        rows: Optional[int] = None

        if validate:
            # check that the structure is actually a dict
            self._data = self._check_initvar(data)
            if dtypes is None or isinstance(dtypes, property):
                self._dtypes = {}
            else:
                self._dtypes = cast(Dict[str, str], self._check_initvar(dtypes))

            # check that the values are properly formed np.ndarray
            for column, value in self._data.items():
                self._ensure_valid(column, value, check_rows=False)
                n_rows = len(self._data[column])
                if rows is None:
                    rows = n_rows
                if self.check_rows and rows != n_rows:
                    raise ValueError('`Tafra` must have consistent row counts.')
                elif rows < n_rows:  # pragma: no cover
                    rows = n_rows

            if rows is None:
                raise ValueError('No data provided in constructor statement.')

            self.update_dtypes_inplace(self._dtypes)
            # must coalesce all dtypes immediately, other functions assume a
            # proper structure of the Tafra
            self._coalesce_dtypes()

        else:
            self._data = cast(Dict[str, np.ndarray], data)
            if dtypes is None or isinstance(dtypes, property):
                self._dtypes = {}
                self._coalesce_dtypes()
            else:
                self._dtypes = cast(Dict[str, str], dtypes)

        self._update_rows()

    def _check_initvar(self, values: InitVar) -> Dict[str, Any]:
        """
        Pre-process an :class:`InitVar` into a :class:`Dict`.
        """
        _values: Dict[Any, Any]

        if isinstance(values, (Mapping, dict)):
            _values = cast(Dict[str, Any], values)

        elif isinstance(values, Sequence):
            _values = self._parse_sequence(values)

        elif isinstance(values, (Iterator, enumerate)):
            _values = self._parse_iterator(cast(Iterator[_Element], values))

        elif isinstance(values, Iterable):
            _values = self._parse_iterable(cast(Iterable[_Element], values))

        else:
            # last-ditch attempt
            _values = cast(Dict[Any, Any], values)

        if not isinstance(_values, Dict):
            raise TypeError(
                'Must contain `Dict`, `Mapping`, `Sequence`, `Iterable`, or `Iterator`, '
                f'got `{type(_values)}`')

        # cast all keys to strings if they are not
        # must copy first, as mutating the dict changes next(iterator)
        columns = [c for c in _values.keys() if not isinstance(c, str)]
        for column in columns:
            _values[str(column)] = _values.pop(column)

        return _values

    def _parse_sequence(self, values: Sequence[_Element]) -> Dict[Any, Any]:
        """
        Pre-process a :class:`Sequence` :class:`InitVar` into a :class:`Dict`.
        """
        head = values[0]

        if isinstance(head, Dict):
            for _dict in values:
                head.update(cast(Dict[Any, Any], _dict))
            _values = head

        # maybe a Sequence of 2-tuples or 2-lists? Cast and try it.
        elif isinstance(head, Sequence) and len(head) == 2:
            # is the key an ndarray? turn it into a scalar
            if isinstance(head[0], np.ndarray) and len(np.atleast_1d(head[0])) == 1:
                # mypy doesn't get that we've checked the head of values as an ndarray
                _values = {key.item(): value for key, value
                           in cast(Iterable[Tuple[np.ndarray, Any]], values)}
            else:
                _values = dict(cast(Iterable[Tuple[Any, Any]], values))

        else:
            raise TypeError('Sequence must contain `Dict`, `Mapping`, or `Sequence`, '
                            f'got `{type(head)}`')

        return _values

    def _parse_iterable(self, values: Iterable[_Element]) -> Dict[Any, Any]:
        """
        Pre-process an :class:`Iterable` :class:`InitVar` into a :class:`Dict`.
        """
        iter_values = iter(values)
        head = next(iter_values)

        if isinstance(head, Dict):
            for _dict in iter_values:
                head.update(cast(Dict[Any, Any], _dict))
            _values = head

        # maybe an Iterable of 2-tuples or 2-lists? Cast and try it.
        elif isinstance(head, Sequence) and len(head) == 2:
            # is the key an ndarray? turn it into a scalar
            if isinstance(head[0], np.ndarray) and len(np.atleast_1d(head[0])) == 1:
                # mypy doesn't get that we've checked the head of values as an ndarray
                _values = {key.item(): value for key, value in chain(
                    cast(Iterable[Tuple[np.ndarray, Any]], [head]),
                    cast(Iterator[Tuple[np.ndarray, Any]], iter_values))}
            else:
                _values = dict(chain(
                    cast(Iterable[Tuple[Any, Any]], [head]),
                    cast(Iterator[Tuple[Any, Any]], iter_values)))

        else:
            raise TypeError('Iterable must contain `Dict`, `Mapping`, or `Sequence`, '
                            f'got `{type(head)}`')

        return _values

    def _parse_iterator(self, values: Iterator[_Element]) -> Dict[Any, Any]:
        """
        Pre-process an :class:`Iterator` :class:`InitVar` into a :class:`Dict`.
        """
        head = next(values)

        if isinstance(head, Dict):
            # consume the iterator if it's a dict
            for _dict in values:
                head.update(cast(Dict[Any, Any], _dict))
            _values = head

        # maybe an Iterator of 2-tuples or 2-lists? Cast and try it.
        elif isinstance(head, Sequence) and len(head) == 2:
            # is the key an ndarray? turn it into a scalar
            if isinstance(head[0], np.ndarray) and len(np.atleast_1d(head[0])) == 1:
                # mypy doesn't get that we've checked the head of values as an ndarray
                _values = {key.item(): value for key, value in chain(
                    cast(Iterable[Tuple[np.ndarray, Any]], [head]),
                    cast(Iterator[Tuple[np.ndarray, Any]], values))}
            else:
                _values = dict(chain(
                    cast(Iterable[Tuple[Any, Any]], [head]),
                    cast(Iterator[Tuple[Any, Any]], values)))

        else:
            raise TypeError('Iterator must contain `Dict`, `Mapping`, or `Sequence`, '
                            f'got `{type(head)}`')

        return _values

    def __getitem__(
            self,
            item: _Union[str, int, slice, Sequence[_Union[str, int, bool]], np.ndarray]) -> Any:
        # return type is actually Union[np.ndarray, 'Tafra'] but mypy requires
        # the user to type check; in either case, what we return is a "slice"
        # of the :class:`Tafra`

        if isinstance(item, str):
            return self._data[item]

        elif isinstance(item, int):
            return self._iindex(item)

        elif isinstance(item, slice):
            return self._slice(item)

        elif isinstance(item, np.ndarray):
            return self._ndindex(item)

        elif isinstance(item, Sequence):
            if isinstance(item[0], str):
                return self.select(cast(Sequence[str], item))
            else:
                return self._aindex(cast(Sequence[_Union[int, bool]], item))

        else:
            raise TypeError(f'Type {type(item)} not supported.')

    def __setitem__(self, item: str, value: _Union[np.ndarray, Sequence[Any], Any]) -> None:
        self._ensure_valid(item, value, set_item=True)

    def __repr__(self) -> str:
        if not hasattr(self, '_rows'):
            return f'Tafra(data={self._data}, dtypes={self._dtypes}, rows=n/a)'
        return f'Tafra(data={self._data}, dtypes={self._dtypes}, rows={self._rows})'

    def __str__(self) -> str:
        return self.__repr__()

    def __len__(self) -> int:
        assert self._data is not None, \
            'Internal error: Cannot construct a Tafra with no data.'
        return self._rows

    def __iter__(self) -> Iterator['Tafra']:
        return (self._iindex(i) for i in range(self._rows))
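    # Illustrative usage sketch (not part of the library source): construct a
    # Tafra and index it. The column names and values below are invented for
    # the example.
    #
    # >>> tf = Tafra({'x': np.array([1, 2, 3]), 'y': np.array(['a', 'b', 'c'])})
    # >>> tf['x']            # a column name returns the backing np.ndarray
    # >>> tf[0]              # an int returns a single-row Tafra
    # >>> tf[1:]             # a slice returns a Tafra
    # >>> tf[tf['x'] > 1]    # a boolean np.ndarray filters rows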
    def __rshift__(self, other: Callable[['Tafra'], 'Tafra']) -> 'Tafra':
        return self.pipe(other)
    def iterrows(self) -> Iterator['Tafra']:
        """
        Yield rows as :class:`Tafra`. Use :meth:`itertuples` for better
        performance.

        Returns
        -------
        tafras: Iterator[Tafra]
            An iterator of :class:`Tafra`.
        """
        yield from self.__iter__()
    def itertuples(self, name: Optional[str] = 'Tafra') -> Iterator[Tuple[Any, ...]]:
        """
        Yield rows as :class:`NamedTuple`, or if ``name`` is ``None``, yield
        rows as :class:`tuple`.

        Parameters
        ----------
        name: Optional[str] = 'Tafra'
            The name for the :class:`NamedTuple`. If ``None``, construct a
            :class:`Tuple` instead.

        Returns
        -------
        tuples: Iterator[NamedTuple[Any, ...]]
            An iterator of :class:`NamedTuple`.
        """
        if name is None:
            return (tuple(values) for values in zip(*self._data.values()))

        TafraNT = namedtuple(name, self._data.keys())  # type: ignore
        return map(TafraNT._make, zip(*self._data.values()))
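    # Hedged sketch of row iteration (columns 'x' and 'y' assumed from the
    # example above):
    #
    # >>> for row in tf.itertuples():
    # ...     row.x, row.y          # attribute access by column name
    # >>> for row in tf.itertuples(name=None):
    # ...     row[0], row[1]        # plain tuples when name is None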
    def itercols(self) -> Iterator[Tuple[str, np.ndarray]]:
        """
        Yield columns as :class:`Tuple[str, np.ndarray]`, where the ``str``
        is the column name.

        Returns
        -------
        tuples: Iterator[Tuple[str, np.ndarray]]
            An iterator of column name and column value pairs.
        """
        return map(tuple, self.data.items())  # type: ignore
    def _update_rows(self) -> None:
        """
        Update :attr:`_rows`. The user should call this if they have directly
        assigned to :attr:`_data` and need to validate the :class:`Tafra`.
        """
        iter_values = iter(self._data.values())
        self._rows = len(next(iter_values))
        if self.check_rows and not all(len(v) == self._rows for v in iter_values):
            raise TypeError('Uneven length of data.')
    def _slice(self, _slice: slice) -> 'Tafra':
        """
        Use a :class:`slice` to slice the :class:`Tafra`.

        Parameters
        ----------
        _slice: slice
            The ``slice`` object.

        Returns
        -------
        tafra: Tafra
            The sliced :class:`Tafra`.
        """
        return Tafra(
            {column: np.atleast_1d(value[_slice])
             for column, value in self._data.items()},
            self._dtypes,
            validate=False
        )
    def _iindex(self, index: int) -> 'Tafra':
        """
        Use an :class:`int` to slice the :class:`Tafra`.

        Parameters
        ----------
        index: int
            The row index.

        Returns
        -------
        tafra: Tafra
            The sliced :class:`Tafra`.
        """
        return Tafra(
            {column: value[[index]] for column, value in self._data.items()},
            self._dtypes,
            validate=False
        )
    def _aindex(self, index: Sequence[_Union[int, bool]]) -> 'Tafra':
        """
        Use numpy advanced indexing to slice the :class:`Tafra`.

        Parameters
        ----------
        index: Sequence[Union[int, bool]]
            The indices or mask to slice with.

        Returns
        -------
        tafra: Tafra
            The sliced :class:`Tafra`.
        """
        return Tafra(
            {column: value[index] for column, value in self._data.items()},
            self._dtypes,
            validate=False
        )
    def _ndindex(self, index: np.ndarray) -> 'Tafra':
        """
        Use :class:`numpy.ndarray` indexing to slice the :class:`Tafra`.

        Parameters
        ----------
        index: np.ndarray
            The indices or mask to slice with.

        Returns
        -------
        tafra: Tafra
            The sliced :class:`Tafra`.
        """
        if index.ndim != 1:
            raise IndexError(f'Indexing np.ndarray must have ndim == 1, got ndim == {index.ndim}')

        return Tafra(
            {column: value[index] for column, value in self._data.items()},
            self._dtypes,
            validate=False
        )
    def _repr_pretty_(self, p: 'IPython.lib.pretty.RepresentationPrinter',  # type: ignore # noqa
                      cycle: bool) -> None:
        """
        A dunder method for IPython to pretty print.

        Parameters
        ----------
        p: IPython.lib.pretty.RepresentationPrinter
            IPython provides this class to handle the object representation.

        cycle: bool
            IPython has detected an infinite loop. Print an alternative
            representation and return.

        Returns
        -------
        None
            Calls p.text and returns.
        """
        if cycle:
            p.text('Tafra(...)')
        else:
            p.text(self._pretty_format(
                lambda s: ' ' + pprint.pformat(s, indent=1)[1:].strip()))

    def _repr_html_(self) -> str:
        """
        A dunder method for Jupyter Notebook to print HTML.
        """
        return self.to_html()

    def _pretty_format(self, formatter: Callable[[object], str]) -> str:
        """
        Format :attr:`_data` and :attr:`_dtypes` for pretty printing.

        Parameters
        ----------
        formatter: Callable[[object], str]
            A formatter that operates on the :attr:`_data` and
            :attr:`_dtypes` :class:`dict`.

        Returns
        -------
        string: str
            The formatted string for printing.
        """
        PATTERN = r'(, dtype=[a-z]+)(?=\))'

        return '\n'.join([
            'Tafra(data = {',
            f'{re.sub(PATTERN, "", formatter(self._data))},',
            'dtypes = {',
            f'{re.sub(PATTERN, "", formatter(self._dtypes))},',
            f'rows = {self._rows})'
        ])
    def pformat(self, indent: int = 1, width: int = 80, depth: Optional[int] = None,
                compact: bool = False) -> str:
        """
        Format for pretty printing. Parameters are passed to
        :class:`pprint.PrettyPrinter`.

        Parameters
        ----------
        indent: int
            Number of spaces to indent for each level of nesting.

        width: int
            Attempted maximum number of columns in the output.

        depth: Optional[int]
            The maximum depth to print out nested structures.

        compact: bool
            If true, several items will be combined in one line.

        Returns
        -------
        formatted string: str
            A formatted string for pretty printing.
        """
        return self._pretty_format(
            lambda s: indent * ' ' + pprint.pformat(
                s, indent, width, depth, compact=compact)[1:].strip())
    def pprint(self, indent: int = 1, width: int = 80, depth: Optional[int] = None,
               compact: bool = False) -> None:
        """
        Pretty print. Parameters are passed to :class:`pprint.PrettyPrinter`.

        Parameters
        ----------
        indent: int
            Number of spaces to indent for each level of nesting.

        width: int
            Attempted maximum number of columns in the output.

        depth: Optional[int]
            The maximum depth to print out nested structures.

        compact: bool
            If true, several items will be combined in one line.

        Returns
        -------
        None: None
        """
        print(self.pformat(indent, width, depth, compact=compact))
    @staticmethod
    def _html_thead(columns: Iterable[Any]) -> str:
        """
        Construct the table head of the HTML representation.

        Parameters
        ----------
        columns: Iterable[Any]
            An iterable of items with defined :func:`__repr__` methods.

        Returns
        -------
        HTML: str
            The HTML table head.
        """
        return '<thead>\n<tr>\n{th}\n</tr>\n</thead>' \
            .format(th='\n'.join(f'<th>{c}</th>' for c in columns))

    @staticmethod
    def _html_tr(row: Iterable[Any]) -> str:
        """
        Construct each table row of the HTML representation.

        Parameters
        ----------
        row: Iterable[Any]
            An iterable of items with defined :func:`__repr__` methods.

        Returns
        -------
        HTML: str
            The HTML table row.
        """
        return '<tr>\n{td}\n</tr>' \
            .format(td='\n'.join(f'<td>{td}</td>' for td in row))

    @staticmethod
    def _html_tbody(tr: Iterable[str]) -> str:
        """
        Construct the table body of the HTML representation.

        Parameters
        ----------
        tr: Iterable[str]
            An iterable of HTML table rows.

        Returns
        -------
        HTML: str
            The HTML table body.
        """
        return '<tbody>\n{tr}\n</tbody>' \
            .format(tr='\n'.join(tr))

    @staticmethod
    def _html_table(thead: str, tbody: str) -> str:
        """
        Construct the final table of the HTML representation.

        Parameters
        ----------
        thead: str
            An HTML representation of the table head.

        tbody: str
            An HTML representation of the table body.

        Returns
        -------
        HTML: str
            The HTML table.
        """
        return f'<table>\n{thead}\n{tbody}\n</table>'
    def to_html(self, n: int = 20) -> str:
        """
        Construct an HTML table representation of the :class:`Tafra` data.

        Parameters
        ----------
        n: int = 20
            Number of items to print.

        Returns
        -------
        HTML: str
            The HTML table representation.
        """
        thead = self._html_thead(chain([''], self._data.keys()))
        tr = chain(
            [self._html_tr(chain(
                ['dtype'],
                (self._dtypes[column] for column in self._data.keys())
            ))],
            (self._html_tr(chain(
                [i],
                (v[i] for v in self._data.values())
            )) for i in range(min(n, self._rows)))
        )
        tbody = self._html_tbody(tr)
        return self._html_table(thead, tbody)
    def _ensure_valid(self, column: str, value: _Union[np.ndarray, Sequence[Any], Any],
                      check_rows: bool = True, set_item: bool = False) -> None:
        """
        Validate values as an :class:`np.ndarray` of equal length to
        :attr:`rows` before assignment. Will attempt to create a
        :class:`np.ndarray` if ``value`` is not one already, and will check
        that :attr:`np.ndarray.ndim` ``== 1``. If :attr:`np.ndarray.ndim`
        ``> 1`` it will attempt :meth:`np.squeeze` on ``value``.

        Parameters
        ----------
        column: str
            The column to assign to.

        value: Union[np.ndarray, Sequence[Any], Any]
            The value to be assigned.

        Returns
        -------
        None: None
        """
        _type = type(value).__name__
        id_value = id(value)
        rows = self._rows if check_rows else 1

        if value is None:
            value = np.full(rows, value)

        elif isinstance(value, np.ndarray):
            if value.ndim == 0:
                value = np.full(rows, value.item())
            elif value.ndim == 1 and value.shape[0] == 1 and rows > 1:
                value = np.full(rows, value)

        elif isinstance(value, str):
            value = np.full(rows, value)

        elif isinstance(value, Iterator):
            value = np.asarray(tuple(value))

        elif isinstance(value, Iterable):
            value = np.asarray(value)

        elif not isinstance(value, Sized):
            value = np.full(rows, value)

        assert isinstance(value, np.ndarray), \
            'Internal error: `Tafra` only supports assigning `ndarray`.'

        if value.ndim > 1:
            sq_value = value.squeeze()
            if sq_value.ndim > 1:
                raise ValueError('`ndarray` or `np.squeeze(ndarray)` must have ndim == 1.')
            elif sq_value.ndim == 1:
                # if value was a single item, squeeze returns a zero-length item
                warnings.warn('`np.squeeze(ndarray)` applied to set ndim == 1.')
                warnings.resetwarnings()
                value = sq_value

        assert value.ndim >= 1, \
            'Internal error: `Tafra` only supports assigning ndim == 1.'

        if check_rows and len(value) != rows:
            raise ValueError(
                '`Tafra` must have consistent row counts.\n'
                f'This `Tafra` has {rows} rows. Assigned {_type} has {len(value)} rows.')

        # special parsing of various object types
        parsed_value = object_formatter.parse_dtype(value)
        if parsed_value is not None:
            value = parsed_value

        # have we modified value?
        if set_item or id(value) != id_value:
            self._data[column] = value
            self._dtypes[column] = self._format_dtype(value.dtype)
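    # Hedged sketch of assignment semantics (column names invented):
    # __setitem__ routes through _ensure_valid, so scalars broadcast and
    # iterables are converted to np.ndarray.
    #
    # >>> tf['z'] = 0.0                              # scalar broadcast to every row
    # >>> tf['w'] = range(tf.rows)                   # iterables become np.ndarray
    # >>> tf['v'] = (i * 2 for i in range(tf.rows))  # iterators are consumed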
    def parse_object_dtypes(self) -> 'Tafra':
        """
        Parse the object dtypes using the :class:`ObjectFormatter` instance.
        """
        tafra = self.copy()
        tafra.parse_object_dtypes_inplace()
        return tafra
    def parse_object_dtypes_inplace(self) -> None:
        """
        In-place version.

        Parse the object dtypes using the :class:`ObjectFormatter` instance.
        """
        for column, value in self._data.items():
            parsed_value = object_formatter.parse_dtype(value)
            if parsed_value is not None:
                self._data[column] = parsed_value
                self._dtypes[column] = self._format_dtype(parsed_value.dtype)
    def _validate_columns(self, columns: Iterable[str]) -> None:
        """
        Validate that the column name(s) exist in :attr:`_data`.

        Parameters
        ----------
        columns: Iterable[str]
            The column names to validate.

        Returns
        -------
        None: None
        """
        for column in columns:
            if column not in self._data.keys():
                raise ValueError(f'Column {column} does not exist in `tafra`.')

    def _validate_dtypes(self, dtypes: Dict[str, Any]) -> Dict[str, str]:
        """
        Validate that the dtypes are the internally used names and that the
        columns exist in :attr:`_data`.

        Parameters
        ----------
        dtypes: Dict[str, Any]
            The dtypes to validate.

        Returns
        -------
        dtypes: Dict[str, str]
            The validated types.
        """
        self._validate_columns(dtypes.keys())
        return {column: self._format_dtype(dtype)
                for column, dtype in dtypes.items()}

    @staticmethod
    def _format_dtype(dtype: Any) -> str:
        """
        Parse a dtype into the internally used string representation, if
        defined. Otherwise, pass through and let numpy raise an error if it
        is not a valid dtype.

        Parameters
        ----------
        dtype: Any
            The dtype to parse.

        Returns
        -------
        dtype: str
            The parsed dtype.
        """
        _dtype = np.dtype(dtype)
        name = _dtype.type.__name__
        if 'str' in name:
            return 'str'
        return name.replace('_', '')

    @staticmethod
    def _reduce_dtype(dtype: Any) -> str:
        """
        Parse a dtype to the base type.

        Parameters
        ----------
        dtype: Any
            The dtype to parse.

        Returns
        -------
        dtype: str
            The parsed dtype.
        """
        name = np.dtype(dtype).type.__name__
        m = re.search(r'([a-z]+)', name)
        if m:
            return m.group(1)
        # are there any dtypes without text names?
        return name  # pragma: no cover
    @classmethod
    def from_records(cls, records: Iterable[Iterable[Any]], columns: Iterable[str],
                     dtypes: Optional[Iterable[Any]] = None, **kwargs: Any) -> 'Tafra':
        """
        Construct a :class:`Tafra` from an Iterable of records, e.g. from a
        SQL query. The records should be a nested Iterable, but can also be
        fed a cursor method such as ``cur.fetchmany()`` or ``cur.fetchall()``.

        Parameters
        ----------
        records: Iterable[Iterable[Any]]
            The records to turn into a :class:`Tafra`.

        columns: Iterable[str]
            The column names to use.

        dtypes: Optional[Iterable[Any]] = None
            The dtypes of the columns.

        Returns
        -------
        tafra: Tafra
            The constructed :class:`Tafra`.
        """
        if dtypes is None:
            return Tafra({column: value for column, value in zip(columns, zip(*records))},
                         **kwargs)

        return Tafra(
            {column: value for column, value in zip(columns, zip(*records))},
            {column: value for column, value in zip(columns, dtypes)},
            **kwargs
        )
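    # Hedged sketch (records and column names invented): build a Tafra from
    # row-oriented data.
    #
    # >>> records = [(1, 'a'), (2, 'b'), (3, 'c')]
    # >>> tf = Tafra.from_records(records, columns=['id', 'label'])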
    @classmethod
    def from_series(cls, s: Series, dtype: Optional[str] = None, **kwargs: Any) -> 'Tafra':
        """
        Construct a :class:`Tafra` from a :class:`pandas.Series`. If ``dtype``
        is not given, take from :attr:`pandas.Series.dtype`.

        Parameters
        ----------
        s: pandas.Series
            The series used to build the :class:`Tafra`.

        dtype: Optional[str] = None
            The dtype of the column.

        Returns
        -------
        tafra: Tafra
            The constructed :class:`Tafra`.
        """
        if dtype is None:
            dtype = s.dtype
        dtypes = {s.name: cls._format_dtype(dtype)}

        return cls(
            {s.name: s.values.astype(dtypes[s.name])},
            dtypes,
            **kwargs
        )
    @classmethod
    def from_dataframe(cls, df: DataFrame, dtypes: Optional[Dict[str, Any]] = None,
                       **kwargs: Any) -> 'Tafra':
        """
        Construct a :class:`Tafra` from a :class:`pandas.DataFrame`. If
        ``dtypes`` are not given, take from :attr:`pandas.DataFrame.dtypes`.

        Parameters
        ----------
        df: pandas.DataFrame
            The dataframe used to build the :class:`Tafra`.

        dtypes: Optional[Dict[str, Any]] = None
            The dtypes of the columns.

        Returns
        -------
        tafra: Tafra
            The constructed :class:`Tafra`.
        """
        if dtypes is None:
            dtypes = {c: t for c, t in zip(df.columns, df.dtypes)}
        dtypes = {c: cls._format_dtype(t) for c, t in dtypes.items()}

        return cls(
            {c: df[c].values.astype(dtypes[c]) for c in df.columns},
            {c: dtypes[c] for c in df.columns},
            **kwargs
        )
    @classmethod
    def read_sql(cls, query: str, cur: Cursor) -> 'Tafra':
        """
        Execute a SQL SELECT statement using a :class:`pyodbc.Cursor` and
        construct a :class:`Tafra` from the returned records.

        Parameters
        ----------
        query: str
            The SQL query.

        cur: pyodbc.Cursor
            The ``pyodbc`` cursor.

        Returns
        -------
        tafra: Tafra
            The constructed :class:`Tafra`.
        """
        cur.execute(query)
        columns, dtypes = zip(*((d[0], d[1]) for d in cur.description))
        head = cur.fetchone()

        if head is None:
            return Tafra({column: () for column in columns})

        return Tafra.from_records(chain([head], cur.fetchall()), columns, dtypes)
    @classmethod
    def read_sql_chunks(cls, query: str, cur: Cursor, chunksize: int = 100) -> Iterator['Tafra']:
        """
        Execute a SQL SELECT statement using a :class:`pyodbc.Cursor` and
        yield a :class:`Tafra` for each chunk of returned records.

        Parameters
        ----------
        query: str
            The SQL query.

        cur: pyodbc.Cursor
            The ``pyodbc`` cursor.

        chunksize: int = 100
            The number of records in each chunk.

        Returns
        -------
        tafras: Iterator[Tafra]
            An iterator of the constructed :class:`Tafra`.
        """
        cur.execute(query)
        columns, dtypes = zip(*((d[0], d[1]) for d in cur.description))
        head = cur.fetchone()

        if head is None:
            yield Tafra({column: () for column in columns})
            return

        def chunks(iterable: Iterable[Any], chunksize: int = 1000) -> Iterator[Iterable[Any]]:
            for f in iterable:
                yield list(chain([f], islice(iterable, chunksize - 1)))

        for chunk in chunks(chain([head], cur), chunksize):
            yield Tafra.from_records(chunk, columns, dtypes)
    @classmethod
    def read_csv(cls, csv_file: _Union[str, Path, TextIOWrapper, IO[str]],
                 guess_rows: int = 5, missing: Optional[str] = '',
                 dtypes: Optional[Dict[str, Any]] = None,
                 **csvkw: Dict[str, Any]) -> 'Tafra':
        """
        Read a CSV file with a header row, infer the types of each column,
        and return a :class:`Tafra` containing the file's contents.

        Parameters
        ----------
        csv_file: Union[str, Path, TextIOWrapper, IO[str]]
            The path to the CSV file, or an open file-like object.

        guess_rows: int
            The number of rows to use when guessing column types.

        missing: Optional[str] = ''
            The value to interpret as missing, if any.

        dtypes: Optional[Dict[str, Any]]
            dtypes by column name; by default, all dtypes will be inferred
            from the file contents.

        **csvkw: Dict[str, Any]
            Additional keyword arguments passed to :class:`csv.reader`.

        Returns
        -------
        tafra: Tafra
            The constructed :class:`Tafra`.
        """
        reader = CSVReader(cast(_Union[str, Path, TextIOWrapper], csv_file),
                           guess_rows, missing, **csvkw)
        return Tafra(reader.read(), dtypes=dtypes)
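    # Hedged sketch ('data.csv' and its columns are hypothetical): read a CSV,
    # optionally overriding an inferred dtype.
    #
    # >>> tf = Tafra.read_csv('data.csv')
    # >>> tf = Tafra.read_csv('data.csv', dtypes={'value': 'float'})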
    @classmethod
    def as_tafra(cls, maybe_tafra: _Union['Tafra', DataFrame, Series, Dict[str, Any], Any]
                 ) -> Optional['Tafra']:
        """
        Return the unmodified ``tafra`` if already a :class:`Tafra`, else
        construct a :class:`Tafra` from known types or subtypes of
        :class:`DataFrame` or :class:`dict`. Structural subtypes of
        :class:`DataFrame` or :class:`Series` are also valid, as are classes
        that have ``cls.__name__ == 'DataFrame'`` or
        ``cls.__name__ == 'Series'``.

        Parameters
        ----------
        maybe_tafra: Union['Tafra', DataFrame, Series, Dict[str, Any], Any]
            The object to ensure is a :class:`Tafra`.

        Returns
        -------
        tafra: Optional[Tafra]
            The :class:`Tafra`. Raises :class:`TypeError` if ``maybe_tafra``
            is an unknown type.
        """
        if isinstance(maybe_tafra, Tafra):
            return maybe_tafra

        elif isinstance(maybe_tafra, Series):  # pragma: no cover
            return cls.from_series(maybe_tafra)

        elif type(maybe_tafra).__name__ == 'Series':  # pragma: no cover
            return cls.from_series(cast(Series, maybe_tafra))

        elif isinstance(maybe_tafra, DataFrame):  # pragma: no cover
            return cls.from_dataframe(maybe_tafra)

        elif type(maybe_tafra).__name__ == 'DataFrame':  # pragma: no cover
            return cls.from_dataframe(cast(DataFrame, maybe_tafra))

        elif isinstance(maybe_tafra, dict):
            return cls(maybe_tafra)

        raise TypeError(f'Unknown type `{type(maybe_tafra)}` for conversion to `Tafra`')
    @property
    def columns(self) -> Tuple[str, ...]:
        """
        The names of the columns. Equivalent to :meth:`keys`.

        Returns
        -------
        columns: Tuple[str, ...]
            The column names.
        """
        return tuple(self._data.keys())

    @columns.setter
    def columns(self, value: Any) -> None:
        raise ValueError('Assignment to `columns` is forbidden.')

    @property
    def rows(self) -> int:
        """
        The number of rows of the first item in :attr:`data`. The
        :func:`len` of all items has been previously validated.

        Returns
        -------
        rows: int
            The number of rows of the :class:`Tafra`.
        """
        return self.__len__()

    @rows.setter
    def rows(self, value: Any) -> None:
        raise ValueError('Assignment to `rows` is forbidden.')

    @property  # type: ignore
    def data(self) -> Dict[str, np.ndarray]:
        """
        The :class:`Tafra` data.

        Returns
        -------
        data: Dict[str, np.ndarray]
            The data.
        """
        return self._data

    @data.setter
    def data(self, value: Any) -> None:
        raise ValueError('Assignment to `data` is forbidden.')

    @property  # type: ignore
    def dtypes(self) -> Dict[str, str]:
        """
        The :class:`Tafra` dtypes.

        Returns
        -------
        dtypes: Dict[str, str]
            The dtypes.
        """
        return self._dtypes

    @dtypes.setter
    def dtypes(self, value: Any) -> None:
        raise ValueError('Assignment to `dtypes` is forbidden.')

    @property
    def size(self) -> int:
        """
        The :class:`Tafra` size.

        Returns
        -------
        size: int
            The size.
        """
        return self.rows * len(self.columns)

    @size.setter
    def size(self, value: Any) -> None:
        raise ValueError('Assignment to `size` is forbidden.')

    @property
    def ndim(self) -> int:
        """
        The :class:`Tafra` number of dimensions.

        Returns
        -------
        ndim: int
            The number of dimensions.
        """
        return max(2, len(self.columns))

    @ndim.setter
    def ndim(self, value: Any) -> None:
        raise ValueError('Assignment to `ndim` is forbidden.')

    @property
    def shape(self) -> Tuple[int, int]:
        """
        The :class:`Tafra` shape.

        Returns
        -------
        shape: Tuple[int, int]
            The shape.
        """
        return self.rows, len(self.columns)

    @shape.setter
    def shape(self, value: Any) -> None:
        raise ValueError('Assignment to `shape` is forbidden.')
    def row_map(self, fn: Callable[..., Any], *args: Any, **kwargs: Any) -> Iterator[Any]:
        """
        Map a function over rows. To apply to specific columns, use
        :meth:`select` first. The function must operate on :class:`Tafra`.

        Parameters
        ----------
        fn: Callable[..., Any]
            The function to map.

        *args: Any
            Additional positional arguments to ``fn``.

        **kwargs: Any
            Additional keyword arguments to ``fn``.

        Returns
        -------
        iter_tf: Iterator[Any]
            An iterator to map the function.
        """
        return (fn(tf, *args, **kwargs) for tf in self.__iter__())
    def tuple_map(self, fn: Callable[..., Any], *args: Any, **kwargs: Any) -> Iterator[Any]:
        """
        Map a function over rows. This is faster than :meth:`row_map`. To
        apply to specific columns, use :meth:`select` first. The function
        must operate on the :class:`NamedTuple` from :meth:`itertuples`.

        Parameters
        ----------
        fn: Callable[..., Any]
            The function to map.

        name: Optional[str] = 'Tafra'
            The name for the :class:`NamedTuple`. If ``None``, construct a
            :class:`Tuple` instead. Must be given as a keyword argument.

        *args: Any
            Additional positional arguments to ``fn``.

        **kwargs: Any
            Additional keyword arguments to ``fn``.

        Returns
        -------
        iter_tf: Iterator[Any]
            An iterator to map the function.
        """
        name = kwargs.pop('name', 'Tafra')
        return (fn(tf, *args, **kwargs) for tf in self.itertuples(name))
    def col_map(self, fn: Callable[..., Any], *args: Any, **kwargs: Any) -> Iterator[Any]:
        """
        Map a function over columns. To apply to specific columns, use
        :meth:`select` first. The function must operate on
        :class:`np.ndarray`.

        Parameters
        ----------
        fn: Callable[..., Any]
            The function to map.

        *args: Any
            Additional positional arguments to ``fn``.

        **kwargs: Any
            Additional keyword arguments to ``fn``.

        Returns
        -------
        iter_tf: Iterator[Any]
            An iterator to map the function.
        """
        return (fn(value, *args, **kwargs) for column, value in self.itercols())
    def key_map(self, fn: Callable[..., Any], *args: Any,
                **kwargs: Any) -> Iterator[Tuple[str, Any]]:
        """
        Map a function over columns like :meth:`col_map`, but return a
        :class:`Tuple` of the key with the function result. To apply to
        specific columns, use :meth:`select` first. The function must
        operate on :class:`np.ndarray`.

        Parameters
        ----------
        fn: Callable[..., Any]
            The function to map.

        *args: Any
            Additional positional arguments to ``fn``.

        **kwargs: Any
            Additional keyword arguments to ``fn``.

        Returns
        -------
        iter_tf: Iterator[Tuple[str, Any]]
            An iterator to map the function.
        """
        return ((column, fn(value, *args, **kwargs)) for column, value in self.itercols())
    def pipe(self, fn: Callable[Concatenate['Tafra', P], 'Tafra'],
             *args: Any, **kwargs: Any) -> 'Tafra':
        """
        Apply a function to the :class:`Tafra` and return the resulting
        :class:`Tafra`. Primarily used to build a transformer pipeline.

        Parameters
        ----------
        fn: Callable[..., 'Tafra']
            The function to apply.

        *args: Any
            Additional positional arguments to ``fn``.

        **kwargs: Any
            Additional keyword arguments to ``fn``.

        Returns
        -------
        tafra: Tafra
            A new :class:`Tafra` result of the function.
        """
        return fn(self, *args, **kwargs)
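    # Hedged sketch of a pipeline (the transformer below is hypothetical and
    # assumes a column 'x' exists):
    #
    # >>> def add_doubled(tf: 'Tafra') -> 'Tafra':
    # ...     out = tf.copy()
    # ...     out['doubled'] = out['x'] * 2
    # ...     return out
    # >>> result = tf.pipe(add_doubled)
    # >>> result = tf >> add_doubled    # equivalent, via __rshift__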
    def select(self, columns: Iterable[str]) -> 'Tafra':
        """
        Use column names to slice the :class:`Tafra` columns analogous to
        SQL SELECT. This does not copy the data. Call :meth:`copy` to obtain
        a copy of the sliced data.

        Parameters
        ----------
        columns: Iterable[str]
            The column names to slice from the :class:`Tafra`.

        Returns
        -------
        tafra: Tafra
            The :class:`Tafra` with the sliced columns.
        """
        if isinstance(columns, str):
            columns = [columns]
        self._validate_columns(columns)

        return Tafra(
            {column: self._data[column] for column in columns},
            {column: self._dtypes[column] for column in columns},
            validate=False
        )
    def head(self, n: int = 5) -> 'Tafra':
        """
        Display the head of the :class:`Tafra`.

        Parameters
        ----------
        n: int = 5
            The number of rows to display.

        Returns
        -------
        tafra: Tafra
            The first ``n`` rows of the :class:`Tafra`.
        """
        return self._slice(slice(n))
    def keys(self) -> KeysView[str]:
        """
        Return the keys of :attr:`data`, i.e. like :meth:`dict.keys`.

        Returns
        -------
        data keys: KeysView[str]
            The keys of the data property.
        """
        return self._data.keys()
    def values(self) -> ValuesView[np.ndarray]:
        """
        Return the values of :attr:`data`, i.e. like :meth:`dict.values`.

        Returns
        -------
        data values: ValuesView[np.ndarray]
            The values of the data property.
        """
        return self._data.values()
    def items(self) -> ItemsView[str, np.ndarray]:
        """
        Return the items of :attr:`data`, i.e. like :meth:`dict.items`.

        Returns
        -------
        items: ItemsView[str, np.ndarray]
            The data items.
        """
        return self._data.items()
    def get(self, key: str, default: Any = None) -> Any:
        """
        Return from the :meth:`get` function of :attr:`data`, i.e. like
        :meth:`dict.get`.

        Parameters
        ----------
        key: str
            The key value in the data property.

        default: Any
            The default to return if the key does not exist.

        Returns
        -------
        value: Any
            The value for the key, or the default if the key does not exist.
        """
        return self._data.get(key, default)
    def update(self, other: 'Tafra') -> 'Tafra':
        """
        Update the data and dtypes of this :class:`Tafra` with another
        :class:`Tafra`. Length of rows must match, while data of different
        ``dtype`` will overwrite.

        Parameters
        ----------
        other: Tafra
            The other :class:`Tafra` from which to update.

        Returns
        -------
        tafra: Tafra
            The updated :class:`Tafra`.
        """
        tafra = self.copy()
        tafra.update_inplace(other)
        return tafra
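    # Hedged sketch (column name invented; the new column's row count must
    # match this Tafra's):
    #
    # >>> tf2 = tf.update(Tafra({'y2': np.array([10., 20., 30.])}))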
    def update_inplace(self, other: 'Tafra') -> None:
        """
        In-place version.

        Update the data and dtypes of this :class:`Tafra` with another
        :class:`Tafra`. Length of rows must match, while data of different
        ``dtype`` will overwrite.

        Parameters
        ----------
        other: Tafra
            The other :class:`Tafra` from which to update.

        Returns
        -------
        None: None
        """
        if not isinstance(other, Tafra):
            # should be a Tafra, but if not let's construct one
            other = Tafra(other)  # type: ignore

        rows = self._rows

        for column, value in other._data.items():
            if len(value) != rows:
                raise ValueError(
                    'Other `Tafra` must have consistent row count. '
                    f'This `Tafra` has {rows} rows, other `Tafra` has {len(value)} rows.')
            self._data[column] = value

        self.update_dtypes_inplace(other._dtypes)
    def _coalesce_dtypes(self) -> None:
        """
        Update :attr:`dtypes` with missing keys that exist in :attr:`data`.
        **Must be called if :attr:`data` or :attr:`dtypes` is directly
        modified!**

        Returns
        -------
        None: None
        """
        for column in self._data.keys():
            if column not in self._dtypes:
                self._dtypes[column] = self._format_dtype(self._data[column].dtype)
    def update_dtypes(self, dtypes: Dict[str, Any]) -> 'Tafra':
        """
        Apply new dtypes.

        Parameters
        ----------
        dtypes: Dict[str, Any]
            The dtypes to update. If ``None``, create from entries in
            :attr:`data`.

        Returns
        -------
        tafra: Tafra
            The updated :class:`Tafra`.
        """
        tafra = self.copy()
        tafra.update_dtypes_inplace(dtypes)
        return tafra
    def update_dtypes_inplace(self, dtypes: Dict[str, Any]) -> None:
        """
        In-place version.

        Apply new dtypes.

        Parameters
        ----------
        dtypes: Dict[str, Any]
            The dtypes to update. If ``None``, create from entries in
            :attr:`data`.

        Returns
        -------
        None: None
        """
        dtypes = self._validate_dtypes(dtypes)
        self._dtypes.update(dtypes)

        for column in dtypes.keys():
            if self._format_dtype(self._data[column].dtype) != self._dtypes[column]:
                try:
                    self._data[column] = self._data[column].astype(self._dtypes[column])
                except ValueError:
                    REPL_VALS = ['', ]
                    for repl_val in REPL_VALS:
                        where_repl = np.equal(self._data[column], repl_val)
                        self._data[column][where_repl] = None
                    self._data[column] = self._data[column].astype(self._dtypes[column])
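    # Hedged sketch (column 'x' assumed from earlier examples): cast a column
    # to a new dtype, non-destructively or in place.
    #
    # >>> tf2 = tf.update_dtypes({'x': 'float'})
    # >>> tf.update_dtypes_inplace({'x': 'float'})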
    def rename(self, renames: Dict[str, str]) -> 'Tafra':
        """
        Rename columns in the :class:`Tafra` from a :class:`dict`.

        Parameters
        ----------
        renames: Dict[str, str]
            The map from current names to new names.

        Returns
        -------
        tafra: Tafra
            The :class:`Tafra` with updated names.
        """
        tafra = self.copy()
        tafra.rename_inplace(renames)
        return tafra
    def rename_inplace(self, renames: Dict[str, str]) -> None:
        """
        In-place version.

        Rename columns in the :class:`Tafra` from a :class:`dict`.

        Parameters
        ----------
        renames: Dict[str, str]
            The map from current names to new names.

        Returns
        -------
        None: None
        """
        self._validate_columns(renames.keys())

        for cur, new in renames.items():
            self._data[new] = self._data.pop(cur)
            self._dtypes[new] = self._dtypes.pop(cur)
    def delete(self, columns: Iterable[str]) -> 'Tafra':
        """
        Remove columns from :attr:`data` and :attr:`dtypes`.

        Parameters
        ----------
        columns: Iterable[str]
            The column(s) to remove.

        Returns
        -------
        tafra: Tafra
            The :class:`Tafra` with the deleted columns.
        """
        if isinstance(columns, str):
            columns = [columns]
        self._validate_columns(columns)

        return Tafra(
            {column: value.copy() for column, value in self._data.items()
             if column not in columns},
            {column: value for column, value in self._dtypes.items()
             if column not in columns},
            validate=False
        )
    def delete_inplace(self, columns: Iterable[str]) -> None:
        """
        In-place version.

        Remove columns from :attr:`data` and :attr:`dtypes`.

        Parameters
        ----------
        columns: Iterable[str]
            The column(s) to remove.

        Returns
        -------
        None: None
        """
        if isinstance(columns, str):
            columns = [columns]
        self._validate_columns(columns)

        for column in columns:
            _ = self._data.pop(column, None)
            _ = self._dtypes.pop(column, None)
    def copy(self, order: str = 'C') -> 'Tafra':
        """
        Create a copy of a :class:`Tafra`.

        Parameters
        ----------
        order: str = 'C' {'C', 'F', 'A', 'K'}
            Controls the memory layout of the copy. 'C' means C-order, 'F'
            means F-order, 'A' means 'F' if the array is Fortran contiguous,
            'C' otherwise. 'K' means match the layout of the array as closely
            as possible.

        Returns
        -------
        tafra: Tafra
            A copied :class:`Tafra`.
        """
        return Tafra(
            {column: value.copy(order=order)
             for column, value in self._data.items()},
            self._dtypes.copy(),
            validate=False
        )
    def coalesce(self, column: str,
                 fills: Iterable[Iterable[_Union[None, str, int, float, bool, np.ndarray]]]
                 ) -> np.ndarray:
        """
        Fill ``None`` values from ``fills``. Analogous to ``SQL COALESCE`` or
        :meth:`pandas.fillna`.

        Parameters
        ----------
        column: str
            The column to coalesce.

        fills: Iterable[Iterable[Union[None, str, int, float, bool, np.ndarray]]]
            The fill values, applied in order.

        Returns
        -------
        data: np.ndarray
            The coalesced data.
        """
        # TODO: handle dtype?
        iter_fills = iter(fills)
        head = next(iter_fills)

        if column in self._data.keys():
            value = self._data[column].copy()
        else:
            value = np.empty(self._rows, np.asarray(head).dtype)

        for _fill in chain([head], iter_fills):
            fill = np.atleast_1d(_fill)
            where_na = np.full(self._rows, False)
            where_na |= value == np.array([None])

            try:
                where_na |= np.isnan(value)
            except (TypeError, ValueError):
                pass

            if len(fill) == 1:
                value[where_na] = fill
            else:
                value[where_na] = fill[where_na]

        return value
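    # Hedged sketch (column 'x' and fill values invented): fill missing values
    # in order of precedence; a length-1 fill broadcasts, a full-length fill
    # is applied elementwise.
    #
    # >>> tf = Tafra({'x': np.array([1.0, np.nan, 3.0])})
    # >>> filled = tf.coalesce('x', [[0.0]])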
    def coalesce_inplace(self, column: str,
                         fills: Iterable[Iterable[_Union[None, str, int, float, bool,
                                                         np.ndarray]]]) -> None:
        """
        In-place version.

        Fill ``None`` values from ``fills``. Analogous to ``SQL COALESCE`` or
        :meth:`pandas.fillna`.

        Parameters
        ----------
        column: str
            The column to coalesce.

        fills: Iterable[Iterable[Union[None, str, int, float, bool, np.ndarray]]]
            The fill values, applied in order.

        Returns
        -------
        None: None
        """
        self._data[column] = self.coalesce(column, fills)
        self.update_dtypes_inplace({column: self._data[column].dtype})
    def _cast_record(self, dtype: str, data: np.ndarray, cast_null: bool) -> Optional[float]:
        """
        Cast values as needed to generate records for a database insert. Will
        cast ``np.nan`` to ``None``, which requires changing ``dtype`` to
        ``object``.

        Parameters
        ----------
        dtype: str
            The dtype of the data value.

        data: np.ndarray
            The data to have its values cast.

        cast_null: bool
            Perform the cast for ``np.nan``.

        Returns
        -------
        value: Any
            The cast value.
        """
        _dtype = self._reduce_dtype(dtype)
        value: Any = RECORD_TYPE[_dtype](data.item())
        if cast_null and _dtype == 'float' and np.isnan(data.item()):
            return None
        return value
    def to_records(self, columns: Optional[Iterable[str]] = None,
                   cast_null: bool = True) -> Iterator[Tuple[Any, ...]]:
        """
        Return an :class:`Iterator` of :class:`Tuple`, each being a record
        (i.e. row) and allowing heterogeneous typing. Useful for e.g. sending
        records back to a database.

        Parameters
        ----------
        columns: Optional[Iterable[str]] = None
            The columns to extract. If ``None``, extract all columns.

        cast_null: bool
            Cast ``np.nan`` to ``None``. Necessary for :mod:`pyodbc`.

        Returns
        -------
        records: Iterator[Tuple[Any, ...]]
        """
        if columns is None:
            columns = self.columns
        else:
            if isinstance(columns, str):
                columns = [columns]
            self._validate_columns(columns)

        return (tuple(
            None if len(self._data[c]) <= row
            else self._cast_record(self._dtypes[c], self._data[c][[row]], cast_null)
            for c in columns) for row in range(self._rows))
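    # Hedged sketch (`cur` and `insert_sql` are hypothetical pyodbc objects):
    # stream records back to a database.
    #
    # >>> for record in tf.to_records():
    # ...     cur.execute(insert_sql, record)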
    def to_list(self, columns: Optional[Iterable[str]] = None,
                inner: bool = False) -> _Union[List[np.ndarray], List[List[Any]]]:
        """
        Return a list of homogeneously typed columns (as
        :class:`numpy.ndarray`). If a generator is needed, use
        :meth:`to_records`. If ``inner == True``, each column will be cast
        from :class:`numpy.ndarray` to a :class:`List`.

        Parameters
        ----------
        columns: Optional[Iterable[str]] = None
            The columns to extract. If ``None``, extract all columns.

        inner: bool = False
            Cast all :class:`np.ndarray` to :class:`List`.

        Returns
        -------
        list: Union[List[np.ndarray], List[List[Any]]]
        """
        if columns is None:
            columns = self.columns
        else:
            if isinstance(columns, str):
                columns = [columns]
            self._validate_columns(columns)

        if inner:
            return [list(self._data[c]) for c in columns]
        return [self._data[c] for c in columns]
    def to_tuple(self, columns: Optional[Iterable[str]] = None, name: Optional[str] = 'Tafra',
                 inner: bool = False) -> _Union[Tuple[np.ndarray], Tuple[Tuple[Any, ...]]]:
        """
        Return a :class:`NamedTuple` or :class:`Tuple`. If a generator is
        needed, use :meth:`to_records`. If ``inner == True``, each column
        will be cast from :class:`np.ndarray` to a :class:`Tuple`. If
        ``name`` is ``None``, return a :class:`Tuple` instead.

        Parameters
        ----------
        columns: Optional[Iterable[str]] = None
            The columns to extract. If ``None``, extract all columns.

        name: Optional[str] = 'Tafra'
            The name for the :class:`NamedTuple`. If ``None``, construct a
            :class:`Tuple` instead.

        inner: bool = False
            Cast all :class:`np.ndarray` to :class:`Tuple`.

        Returns
        -------
        tuple: Union[Tuple[np.ndarray], Tuple[Tuple[Any, ...]]]
        """
        if columns is None:
            columns = self.columns
        else:
            if isinstance(columns, str):
                columns = [columns]
            self._validate_columns(columns)

        if name is None:
            if inner:
                return tuple(tuple(self._data[c]) for c in columns)  # type: ignore
            return tuple(self._data[c] for c in columns)  # type: ignore

        TafraNT = namedtuple(name, columns, rename=True)  # type: ignore

        if inner:
            return TafraNT._make((tuple(self._data[c]) for c in columns))  # type: ignore
        return TafraNT._make((self._data[c] for c in columns))  # type: ignore
    def to_array(self, columns: Optional[Iterable[str]] = None) -> np.ndarray:
        """
        Return an object array.

        Parameters
        ----------
        columns: Optional[Iterable[str]] = None
            The columns to extract. If ``None``, extract all columns.

        Returns
        -------
        array: np.ndarray
        """
        if columns is None:
            columns = self.columns
        else:
            if isinstance(columns, str):
                columns = [columns]
            self._validate_columns(columns)

        return np.array([self._data[c] for c in columns], dtype=object).T
    def to_pandas(self, columns: Optional[Iterable[str]] = None) -> DataFrame:
        """
        Construct a :class:`pandas.DataFrame`.

        Parameters
        ----------
        columns: Optional[Iterable[str]] = None
            The columns to write. If ``None``, write all columns.

        Returns
        -------
        dataframe: :class:`pandas.DataFrame`
        """
        try:
            import pandas as pd  # type: ignore
        except ImportError as e:  # pragma: no cover
            raise ImportError('`pandas` does not appear to be installed.') from e

        if columns is None:
            columns = self.columns
        else:
            if isinstance(columns, str):
                columns = [columns]
            self._validate_columns(columns)

        return pd.DataFrame({
            column: pd.Series(value)
            for column, value in self._data.items()
            if column in columns
        })
    def to_csv(self, filename: _Union[str, Path, TextIOWrapper, IO[str]],
               columns: Optional[Iterable[str]] = None) -> None:
        """
        Write the :class:`Tafra` to a CSV.

        Parameters
        ----------
        filename: Union[str, Path, TextIOWrapper, IO[str]]
            The path of the file to write, or an open file-like object.

        columns: Iterable[str]
            The columns to write. If ``None``, write all columns.
        """
        if columns is None:
            columns = self.columns
        else:
            if isinstance(columns, str):
                columns = [columns]
            self._validate_columns(columns)

        if isinstance(filename, (str, Path)):
            f = open(filename, 'w', newline='')
            should_close = True
        elif isinstance(filename, TextIOWrapper):
            if 'w' not in filename.mode:
                raise ValueError(f'file must be opened in write mode, got {filename.mode}')
            f = filename
            should_close = False
            f.reconfigure(newline='')
        else:
            # assume an open, writable file-like object
            f = cast(TextIOWrapper, filename)
            should_close = False

        writer = csv.writer(f, delimiter=',', quotechar='"')
        writer.writerow((column for column in self._data.keys() if column in columns))
        writer.writerows(self.to_records(columns))

        if should_close:
            f.close()
    def union(self, other: 'Tafra') -> 'Tafra':
        """
        Helper function to implement :meth:`tafra.group.Union.apply`.

        Union two :class:`Tafra` together. Analogous to SQL UNION or
        `pandas.append`. All column names and dtypes must match.

        Parameters
        ----------
        other: Tafra
            The other :class:`Tafra` to union.

        Returns
        -------
        tafra: Tafra
            A new :class:`Tafra` with the unioned data.
        """
        return Union().apply(self, other)
    def union_inplace(self, other: 'Tafra') -> None:
        """
        In-place version.

        Helper function to implement :meth:`tafra.group.Union.apply_inplace`.

        Union two :class:`Tafra` together. Analogous to SQL UNION or
        `pandas.append`. All column names and dtypes must match.

        Parameters
        ----------
        other: Tafra
            The other :class:`Tafra` to union.

        Returns
        -------
        None: None
        """
        Union().apply_inplace(self, other)
    def group_by(self, columns: Iterable[str], aggregation: 'InitAggregation' = {},
                 iter_fn: Mapping[str, Callable[[np.ndarray], Any]] = dict()) -> 'Tafra':
        """
        Helper function to implement :meth:`tafra.group.GroupBy.apply`.

        Aggregation by a set of unique values. Analogous to SQL ``GROUP BY``,
        not :meth:`pandas.DataFrame.groupby`.

        Parameters
        ----------
        columns: Iterable[str]
            The column names to group by.

        aggregation: Mapping[str, Union[Callable[[np.ndarray], Any], \
        Tuple[Callable[[np.ndarray], Any], str]]]
            Optional. A mapping for columns and aggregation functions. Should
            be given as {'column': fn} or {'new_column': (fn, 'column')}.

        iter_fn: Mapping[str, Callable[[np.ndarray], Any]]
            Optional. A mapping of new column names to the function to apply
            to the enumeration. Should be given as {'new_column': fn}.

        Returns
        -------
        tafra: Tafra
            The aggregated :class:`Tafra`.
        """
        return GroupBy(columns, aggregation, iter_fn).apply(self)
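    # Hedged sketch (columns invented): sum 'value' within each 'region', per
    # the aggregation mapping documented above.
    #
    # >>> tf = Tafra({'region': np.array(['a', 'a', 'b']),
    # ...             'value': np.array([1., 2., 3.])})
    # >>> grouped = tf.group_by(['region'], {'value': np.sum})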
    def transform(self, columns: Iterable[str], aggregation: 'InitAggregation' = {},
                  iter_fn: Dict[str, Callable[[np.ndarray], Any]] = dict()) -> 'Tafra':
        """
        Helper function to implement :meth:`tafra.group.Transform.apply`.

        Apply a function to each unique set of values and join to the
        original table. Analogous to
        :meth:`pandas.DataFrame.groupby().transform()`, i.e. a SQL
        ``GROUP BY`` and ``LEFT JOIN`` back to the original table.

        Parameters
        ----------
        columns: Iterable[str]
            The column names to group by.

        aggregation: Mapping[str, Union[Callable[[np.ndarray], Any], \
        Tuple[Callable[[np.ndarray], Any], str]]]
            Optional. A mapping for columns and aggregation functions. Should
            be given as {'column': fn} or {'new_column': (fn, 'column')}.

        iter_fn: Mapping[str, Callable[[np.ndarray], Any]]
            Optional. A mapping of new column names to the function to apply
            to the enumeration. Should be given as {'new_column': fn}.

        Returns
        -------
        tafra: Tafra
            The transformed :class:`Tafra`.
        """
        return Transform(columns, aggregation, iter_fn).apply(self)
    def iterate_by(self, columns: Iterable[str]) -> Iterator['GroupDescription']:
        """
        Helper function to implement :meth:`tafra.group.IterateBy.apply`.

        A generator that yields a :class:`Tafra` for each set of unique
        values. Analogous to :meth:`pandas.DataFrame.groupby`, i.e. an
        :class:`Iterator` of :class:`Tafra`.

        Yields tuples of ((unique grouping values, ...), row indices array,
        subset tafra).

        Parameters
        ----------
        columns: Iterable[str]
            The column names to group by.

        Returns
        -------
        tafras: Iterator[GroupDescription]
            An iterator over the grouped :class:`Tafra`.
        """
        yield from IterateBy(columns).apply(self)
    def inner_join(self, right: 'Tafra', on: Iterable[Tuple[str, str, str]],
                   select: Iterable[str] = list()) -> 'Tafra':
        """
        Helper function to implement :meth:`tafra.group.InnerJoin.apply`.

        An inner join. Analogous to SQL INNER JOIN, or
        `pandas.merge(..., how='inner')`.

        Parameters
        ----------
        right: Tafra
            The right-side :class:`Tafra` to join.

        on: Iterable[Tuple[str, str, str]]
            The columns and operator to join on. Should be given as
            ('left column', 'right column', 'op'). Valid ops are:

            '==' : equal to
            '!=' : not equal to
            '<'  : less than
            '<=' : less than or equal to
            '>'  : greater than
            '>=' : greater than or equal to

        select: Iterable[str] = []
            The columns to return. If not given, all unique column names are
            returned. If the column exists in both :class:`Tafra`, prefers
            the left over the right.

        Returns
        -------
        tafra: Tafra
            The joined :class:`Tafra`.
        """
        return InnerJoin(on, select).apply(self, right)
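    # Hedged sketch (tables invented): join on equal 'id' values.
    #
    # >>> left = Tafra({'id': np.array([1, 2]), 'x': np.array([10., 20.])})
    # >>> right = Tafra({'id': np.array([2, 3]), 'y': np.array(['b', 'c'])})
    # >>> joined = left.inner_join(right, on=[('id', 'id', '==')])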
    def left_join(self, right: 'Tafra', on: Iterable[Tuple[str, str, str]],
                  select: Iterable[str] = list()) -> 'Tafra':
        """
        Helper function to implement :meth:`tafra.group.LeftJoin.apply`.

        A left join. Analogous to SQL LEFT JOIN, or
        `pandas.merge(..., how='left')`.

        Parameters
        ----------
        right: Tafra
            The right-side :class:`Tafra` to join.

        on: Iterable[Tuple[str, str, str]]
            The columns and operator to join on. Should be given as
            ('left column', 'right column', 'op'). Valid ops are:

            '==' : equal to
            '!=' : not equal to
            '<'  : less than
            '<=' : less than or equal to
            '>'  : greater than
            '>=' : greater than or equal to

        select: Iterable[str] = []
            The columns to return. If not given, all unique column names are
            returned. If the column exists in both :class:`Tafra`, prefers
            the left over the right.

        Returns
        -------
        tafra: Tafra
            The joined :class:`Tafra`.
        """
        return LeftJoin(on, select).apply(self, right)
    def cross_join(self, right: 'Tafra', select: Iterable[str] = list()) -> 'Tafra':
        """
        Helper function to implement :meth:`tafra.group.CrossJoin.apply`.

        A cross join. Analogous to SQL CROSS JOIN, or
        `pandas.merge(..., how='outer')` using temporary columns of static
        value to intersect all rows.

        Parameters
        ----------
        right: Tafra
            The right-side :class:`Tafra` to join.

        select: Iterable[str] = []
            The columns to return. If not given, all unique column names are
            returned. If the column exists in both :class:`Tafra`, prefers
            the left over the right.

        Returns
        -------
        tafra: Tafra
            The joined :class:`Tafra`.
        """
        return CrossJoin([], select).apply(self, right)
def to_field_name(maybe_text: _Union[str, int, float]) -> str:  # pragma: no cover
    text = str(maybe_text)
    # Remove invalid characters
    mid_text = re.sub('[^0-9a-zA-Z]', '', text)
    # Remove leading characters until we find a letter
    final_text = re.sub('^[^a-zA-Z]+', '', mid_text)
    if final_text == '':
        final_text = 'field_' + mid_text
    return final_text


def _in_notebook() -> bool:  # pragma: no cover
    """
    Check if running in a Jupyter Notebook.

    Returns
    -------
    in_notebook: bool
    """
    try:
        from IPython import get_ipython  # type: ignore
        if 'IPKernelApp' in get_ipython().config:
            return True
    except Exception:
        pass
    return False


# Import here to resolve circular dependency
from .group import (GroupBy, Transform, IterateBy, InnerJoin, LeftJoin, CrossJoin,
                    Union, InitAggregation, GroupDescription)