Source code for teilab.utils.generic_utils

# coding: utf-8
import datetime
import os
import sys
import time
from numbers import Number
from typing import Any, Callable, Dict, List, Optional, Tuple

from ._exceptions import KeyError


[docs]def now_str(tz: Optional[datetime.timezone] = None, fmt: str = "%Y-%m-%d@%H.%M.%S") -> str: """Returns new datetime string representing current time local to tz under the control of an explicit format string. Args: tz (Optional[datetime.timezone], optional) : Timezone object. If no ``tz`` is specified, uses local timezone. Defaults to ``None``. fmt (str, optional) : format string. See `Python Documentation <https://docs.python.org/3/library/datetime.html#strftime-and-strptime-format-codes>`_ Defaults to ``"%Y-%m-%d@%H.%M.%S"``. Returns: str: Formatted current time. Example: >>> from teilab.utils import now_str >>> now_str() '2021-06-01@11.22.14' >>> now_str(tz=datetime.timezone.utc) '2021-06-01@02.22.16' >>> now_str(fmt="%A, %d. %B %Y %I:%M%p") 'Tuesday, 01. June 2021 11:22AM' """ return datetime.datetime.now(tz=tz).strftime(fmt)
[docs]def readable_bytes(byte: Number) -> Tuple[Number, str]: """Unit conversion for readability. Args: byte (Number): File byte [B]. Examples: >>> from teilab.utils import readable_bytes >>> for i in range(1,30,3): ... byte = pow(10,i) ... size, unit = readable_bytes(pow(10,i)) ... print(f"{byte:.1g}[B] = {size:.2f}[{unit}]") 1e+01[B] = 10.00[B] 1e+04[B] = 9.77[KB] 1e+07[B] = 9.54[MB] 1e+10[B] = 9.31[GB] 1e+13[B] = 9.09[TB] 1e+16[B] = 8.88[PB] 1e+19[B] = 8.67[EB] 1e+22[B] = 8.47[ZB] 1e+25[B] = 8.27[YB] 1e+28[B] = 8271.81[YB] """ units = ["", "K", "M", "G", "T", "P", "E", "Z", "Y"] for unit in units: if (abs(byte) < 1024.0) or (unit == units[-1]): break byte /= 1024.0 # size >> 10 return (byte, unit + "B")
[docs]def progress_reporthook_create( filename: str = "", bar_width: int = 20, verbose: bool = True ) -> Callable[[int, int, int], None]: """Create a progress reporthook for ``urllib.request.urlretrieve`` Args: filename (str, optional) : Downloading filename.. Defaults to ``""``. bar_width (int, optional) : The width of progress bar. Defaults to ``20``. verbose (bool, optional) : Whether to output the status. Defaults to ``True``. Returns: Callable[[int,int,int], None]: The ``reporthook`` which is a callable that accepts a ``block number``, a ``read size``, and the ``total file size`` of the URL target. Examples: >>> import urllib >>> from teilab.utils import progress_reporthook_create >>> urllib.request.urlretrieve(url="hoge.zip", filename="hoge.zip", reporthook=progress_reporthook_create(filename="hoge.zip")) hoge.zip 1.5%[--------------------] 21.5[s] 8.0[GB/s] eta 1415.1[s] """ def progress_reporthook_verbose(block_count: int, block_size: int, total_size: int) -> None: """``reporthook`` to report the current status. Args: block_count (int) : The number of block. block_size (int) : The size of the data block. total_size (int) : The size of the total data. """ global _reporthook_start_time if block_count == 0: _reporthook_start_time = time.time() return progress_size = block_count * block_size percentage = min(1.0, progress_size / total_size) progress_bar = ("#" * int(percentage * bar_width)).ljust(bar_width, "-") duration = time.time() - _reporthook_start_time speed = progress_size / duration eta = (total_size - progress_size) / speed speed, speed_unit = readable_bytes(speed) sys.stdout.write( f"\r{filename}\t{percentage:.1%}[{progress_bar}] {duration:.1f}[s] {speed:.1f}[{speed_unit}/s] eta {eta:.1f}[s]" ) if progress_size >= total_size: print() def progress_reporthook_non_verbose(block_count: int, block_size: int, total_size: int) -> None: """``reporthook`` not to report the current status.""" pass return progress_reporthook_verbose if verbose else progress_reporthook_non_verbose
[docs]def verbose2print(verbose: bool = True) -> callable: """Create a simple print function based on verbose Args: verbose (bool, optional) : Whether to print or not. Defaults to ``True``. Returns: callable: Print function Examples: >>> from teilab.utils import verbose2print >>> print_verbose = verbose2print(verbose=True) >>> print_non_verbose = verbose2print(verbose=False) >>> print_verbose("Hello, world.") Hello, world. >>> print_non_verbose("Hello, world.") """ if verbose: return print else: return lambda *args, **kwargs: None
[docs]def dict2str(d: Dict[Any, Any], item_separator: str = ", ", key_separator: str = "=") -> str: """Convert a dictionary to string. Args: d (Dict[Any,Any]) : A dictionary. item_separator (str, optional) : A separator between items. Defaults to ``", "``. key_separator (str, optional) : A separator between keys. Defaults to ``"="``. Returns: str: A textual summary of the contents of the dictionary. Examples: >>> from teilab.utils import dict2str >>> dict2str({"key1":"val", "key2":1}) 'key1=val, key2=1' >>> dict2str({"key1":"val", "key2":1}, key_separator=":") 'key1:val, key2:1' >>> dict2str({"key1":"val", "key2":1}, item_separator="🤔") 'key1=val🤔key2=1' """ return str(item_separator).join([f"{k}{key_separator}{v}" for k, v in d.items()])
[docs]def check_supported(lst: List[Any], **kwargs): """Check whether all ``kwargs.values()`` in the ``lst``. Args: lst (List[Any]) : candidates for each value in ``kwargs.values()``. kwargs : ``key`` is the varname that is easy to understand when an error occurs Examples: >>> from teilab.utils import check_supported >>> check_supported(lst=range(3), val=1) >>> check_supported(lst=range(3), val=100) KeyError: Please choose the argment 'val' from ['0', '1', '2'], but you chose '100' >>> check_supported(lst=range(3), val1=1, val2=2) >>> check_supported(lst=range(3), val1=1, val2=100) KeyError: Please choose the argment 'val2' from ['0', '1', '2'], but you chose '100' Raise: KeyError: If ``kwargs.values()`` not in the ``lst`` """ for k, v in kwargs.items(): if v not in lst: lst = ", ".join([f"'{e}'" for e in lst]) raise KeyError(f"Please choose the argment '{k}' from [{lst}], but you chose '{v}'")