Source code for teilab.utils.download_utils

# coding: utf-8
import os
import re
import urllib
import zipfile
from typing import Dict, List, Optional, Tuple

import requests
from tqdm import tqdm

from .generic_utils import now_str, progress_reporthook_create, readable_bytes, verbose2print

CONTENT_ENCODING2EXT: Dict[str, str] = {
    "gzip": ".gz",
    "x-gzip": ".gz",
    "image/jpeg": ".jpg",
    "image/jpx": ".jpx",
    "image/png": ".png",
    "image/gif": ".gif",
    "image/webp": ".webp",
    "image/x-canon-cr2": ".cr2",
    "image/tiff": ".tif",
    "image/bmp": ".bmp",
    "image/vnd.ms-photo": ".jxr",
    "image/vnd.adobe.photoshop": ".psd",
    "image/x-icon": ".ico",
    "image/heic": ".heic",
}

CONTENT_TYPE2EXT: Dict[str, str] = {
    "application/epub+zip": ".epub",
    "application/zip": ".zip",
    "application/x-tar": ".tar",
    "application/x-rar-compressed": ".rar",
    "application/gzip": ".gz",
    "application/x-bzip2": ".bz2",
    "application/x-7z-compressed": ".7z",
    "application/x-xz": ".xz",
    "application/pdf": ".pdf",
    "application/x-msdownload": ".exe",
    "application/x-shockwave-flash": ".swf",
    "application/rtf": ".rtf",
    "application/octet-stream": ".eot",
    "application/postscript": ".ps",
    "application/x-sqlite3": ".sqlite",
    "application/x-nintendo-nes-rom": ".nes",
    "application/x-google-chrome-extension": ".crx",
    "application/vnd.ms-cab-compressed": ".cab",
    "application/x-deb": ".deb",
    "application/x-unix-archive": ".ar",
    "application/x-compress": ".Z",
    "application/x-lzip": ".lz",
    "text/html": ".txt",
}


[docs]def unzip(path: str, verbose: bool = True) -> Tuple[str, List[str]]: """Unzip a zipped file ( Only support the file with ``.zip`` extension. ) Args: path (str) : The path to zipped file. verbose (bool, optional) : Whether to print verbose or not. Defaults to ``True``. Returns: Tuple[str,List[str]]: The directory where the expanded data is stored and the List of their respective file paths. Examples: >>> from teilab.utils import unzip >>> unzip("target.zip") """ extracted_file_paths = [] print = verbose2print(verbose=verbose) root, ext = os.path.splitext(path) if ext not in [".zip", ".gz"]: print(f"Do not support to extract files with the '{ext}' extension.") else: if not os.path.exists(root): os.mkdir(root) print("[Unzip] Show file contents:") with zipfile.ZipFile(path) as z: for info in z.infolist(): info.filename = info.orig_filename.encode("cp437").decode("utf-8") if (os.sep != "/") and (os.sep in info.filename): info.filename = info.filename.replace(os.sep, "/") z.extract(member=info, path=root) extracted_file_path = os.path.join(root, info.filename) extracted_file_paths.append(extracted_file_path) print(f"\t* {info.filename}") return root, extracted_file_paths
[docs]def decide_extension( content_encoding: Optional[str] = None, content_type: Optional[str] = None, basename: Optional[str] = None ): """Decide File Extension based on ``content_encoding`` and ``content_type`` Args: content_encoding (Optional[str], optional) : The MIME type of the resource or the data. content_type (Optional[str], optional) : The Content-Encoding entity header is used to compress the media-type. basename (Optional[str], optional) : The basename. Returns: str: The file extension which starts with "." Examples: >>> from teilab.utils import decide_extension >>> decide_extension(content_encoding="image/png") '.png' >>> decide_extension(content_type="application/pdf") '.pdf' >>> decide_extension(content_encoding="image/webp", content_type="application/pdf") '.webp' >>> decide_extension(basename="hoge.zip") '.zip' """ ext = CONTENT_ENCODING2EXT.get( content_encoding, CONTENT_TYPE2EXT.get(content_type, os.path.splitext(str(basename))[-1]) ) return ext
[docs]class Downloader: """General Downloader"""
[docs] @classmethod def download_file( cls, url: str, dirname: str = ".", basename: str = "", path: Optional[str] = None, verbose: bool = True, expand: bool = True, **kwargs, ) -> str: """Download a file and expand it if you want. Args: url (str) : The URL of the file you want to download. dirname (str, optional) : The directory where downloaded data will be saved. Defaults to ``"."``. basename (str, optional) : The basename of the target file. Defaults to ``""``. path (Optional[str], optional) : Where and what name to save the downloaded file. Defaults to ``None``. verbose (bool, optional) : Whether to print verbose or not. Defaults to ``True``. expand (bool, optional) : Whether to expand the downloaded file. Defaults to ``True`` Returns: path (str) : The path to the downloaded file. Examples: >>> import os >>> from teilab.utils import Downloader >>> path = Downloader.download_file(url="http://ui-tei.rnai.jp/") [Download] URL: http://ui-tei.rnai.jp/ * Content-Encoding : None * Content-Length : 32.1 [KB] * Content-Type : text/html * Save Destination : ./2021-06-01@21.30.html ===== Progress ===== 2021-06-01@21.30.04 100.0%[####################] 0.0[s] 1.3[MB/s] eta -0.0[s] Do not support to extract files with the '.html' extension. >>> os.path.exists(path) True """ path = cls.download_target_file(url=url, dirname=dirname, basename=basename, path=path, verbose=True, **kwargs) if expand: path, extracted_file_paths = unzip(path=path, verbose=verbose) return path
[docs] @staticmethod def download_target_file( url: str, dirname: str = ".", basename: str = ".", path: Optional[str] = None, bar_width: int = 20, verbose: bool = True, **kwargs, ) -> str: """Download the target file. Args: url (str) : The URL of the file you want to download. dirname (str, optional) : The directory where downloaded data will be saved. Defaults to ``"."``. basename (str, optional) : The basename of the target file. Defaults to ``""``. path (Optional[str], optional) : Where and what name to save the downloaded file. Defaults to ``None``. bar_width (int, optional) : The width of progress bar. Defaults to ``20``. verbose (bool, optional) : Whether to print verbose or not. Defaults to ``True``. Returns: path (str) : The path to the downloaded file. Examples: >>> import os >>> from teilab.utils import Downloader >>> path = Downloader.download_target_file(url="http://ui-tei.rnai.jp/") [Download] URL: http://ui-tei.rnai.jp/ * Content-Encoding : None * Content-Length : 31.8 [KB] * Content-Type : text/html * Save Destination : ./2021-06-01@11.26.html ===== Progress ===== 2021-06-01@11.26.48 100.0%[####################] 0.0[s] 1.0[MB/s] eta -0.0[s] >>> os.path.exists(path) True """ try: with urllib.request.urlopen(url) as web_file: headers = dict(web_file.headers._headers) filename, path = Downloader.prepare_for_download( url=url, basename=os.path.basename(url), dirname=dirname, path=path, headers=headers, verbose=verbose ) if verbose: print("===== Progress =====") _, res = urllib.request.urlretrieve( url=url, filename=path, reporthook=progress_reporthook_create(filename=filename, bar_width=bar_width, verbose=verbose), ) except urllib.error.URLError: print(f"[URLError] Please check if the URL is correct, given {url}") except Exception as err: print(f"[{err.__class__.__name__}] {err}") return path
[docs] @staticmethod def prepare_for_download( url: str = "", dirname: str = ".", basename: str = "", path: Optional[str] = None, headers: Optional[Dict[str, str]] = None, verbose: bool = True, ) -> Tuple[str, str]: """Get Information from webfile header and prepare for downloading. Args: url (str, optional) : The URL of the file you want to download. Defaults to ``""``. dirname (str, optional) : The directory where downloaded data will be saved. Defaults to ``"."``. basename (str, optional) : The basename of the target file. Defaults to ``""``. path (Optional[str], optional) : Where and what name to save the downloaded file. Defaults to ``None``. headers (Optional[Dict[str,str]], optional) : The header information of the target file. Defaults to ``{}``. verbose (bool, optional) : Whether to print verbose or not. Defaults to ``True``. Returns: Tuple[str,str]: ``filename`` and ``path`` of the file that will be downloaded. Examples: >>> from teilab.utils import Downloader >>> filename, path = Downloader.prepare_for_download( ... url="http://ui-tei.rnai.jp/", ... basename="index.html", ... dirname=".", ... path=None, >>> ) [Download] URL: http://ui-tei.rnai.jp/ * Content-Encoding : None * Content-Length : 32.1 [KB] * Content-Type : text/html * Save Destination : ./index.html >>> filename, path ('index.html', './index.html') """ # Get the information of the file you want to download from the header. if headers is None: with urllib.request.urlopen(url) as web_file: headers = dict(web_file.headers._headers) content_encoding: str = headers.get("Content-Encoding") content_length: str = "{0:.1f} [{1}]".format(*readable_bytes(int(headers.get("Content-Length", 0)))) content_type: str = headers.get("Content-Type").split(";")[0] # Decide the download destination if basename == "": basename = now_str() if path is None: root, _ = os.path.splitext(basename) guessed_ext = decide_extension( content_encoding=content_encoding, content_type=content_type, basename=basename ) filename = root + guessed_ext path = os.path.join(dirname, filename) else: filename = os.path.split(path)[-1] # Show the results. if verbose: print( f"[Download] URL: {url}", f"* Content-Encoding : {content_encoding}", f"* Content-Length : {content_length}", f"* Content-Type : {content_type}", f"* Save Destination : {path}", sep="\n", ) return (filename, path)
[docs]class GoogleDriveDownloader(Downloader): """Specific Downloader for files in GoogleDrive""" CHUNK_SIZE = 32768 DRIVE_URL = "https://docs.google.com/uc?export=download"
[docs] @staticmethod def prepare_for_download( url: str = "", dirname: str = ".", basename: str = "", path: Optional[str] = None, headers: Optional[Dict[str, str]] = None, verbose: bool = True, driveId: Optional[str] = None, ) -> Tuple[str, str]: if driveId is None: q = urllib.parse.parse_qs(urllib.parse.urlparse(url).query).get("id") if len(q) == 0: raise TypeError("Please specify the target Google Drive Id using ``url`` or ``driveId`` arguments.") else: driveId = q[0] if basename == "": basename = driveId # Start a Session params = {"id": driveId} session = requests.Session() response = session.get(url=GoogleDriveDownloader.DRIVE_URL, params=params, stream=True) for key, val in response.cookies.items(): if key.startswith("download_warning"): params.update({"confirm": val}) break # Get Information from headers headers = session.head(url=GoogleDriveDownloader.DRIVE_URL, params=params).headers return [ *Downloader.prepare_for_download( url=url, dirname=dirname, basename=basename, path=path, headers=headers, verbose=verbose, ), session, params, ]
[docs] @staticmethod def download_target_file( url: str, dirname: str = ".", basename: str = "", path: Optional[str] = None, driveId: Optional[str] = None, verbose: bool = True, **kwargs, ) -> str: """Download the target Google Drive file. Args: url (str) : The URL of the file you want to download. dirname (str, optional) : The directory where downloaded data will be saved. Defaults to ``"."``. basename (str, optional) : The basename of the target file. Defaults to ``""``. path (Optional[str], optional) : Where and what name to save the downloaded file. Defaults to ``None``. driveId (Optional[str], optional) : The GoogleDrive's file ID. Defaults to ``None``. verbose (bool, optional) : Whether to print verbose or not. Defaults to ``True``. Raises: TypeError: When Google Drive File ID is not detected from ``driveId`` and ``url`` . Returns: str: The path to the downloaded file. """ filename, path, session, params = GoogleDriveDownloader.prepare_for_download( url=url, basename=basename, dirname=dirname, path=path, verbose=verbose ) # Get contents response = session.get(GoogleDriveDownloader.DRIVE_URL, params=params, stream=True) with open(path, "wb") as f: with tqdm(response.iter_content(GoogleDriveDownloader.CHUNK_SIZE), desc=driveId) as pbar: for i, chunk in enumerate(pbar, start=1): if chunk: f.write(chunk) pbar.set_postfix( { "Downloaded": "{0:.1f} [{1}]".format( *readable_bytes(i * GoogleDriveDownloader.CHUNK_SIZE) ) } ) return path
[docs]def decide_downloader(url: str) -> Downloader: """Decide ``Downloader`` from ``url`` Args: url (str): The URL of the file you want to download. Returns: Downloader: File Downloader for target ``url``. Examples: >>> from teilab.utils import decide_downloader >>> decide_downloader("https://www.dropbox.com/sh/ID").__name__ 'Downloader' >>> decide_downloader("https://drive.google.com/u/0/uc?export=download&id=ID").__name__ 'GoogleDriveDownloader' """ url_domain = re.match(pattern=r"^https?:\/\/(.+?)\/", string=url).group(1) return { # "drive.google.com": GoogleDriveDownloader, }.get(url_domain, Downloader)