# coding: utf-8
import os
import re
import urllib
import zipfile
from typing import Dict, List, Optional, Tuple
import requests
from tqdm import tqdm
from .generic_utils import now_str, progress_reporthook_create, readable_bytes, verbose2print
CONTENT_ENCODING2EXT: Dict[str, str] = {
    "gzip": ".gz",
    "x-gzip": ".gz",
    "image/jpeg": ".jpg",
    "image/jpx": ".jpx",
    "image/png": ".png",
    "image/gif": ".gif",
    "image/webp": ".webp",
    "image/x-canon-cr2": ".cr2",
    "image/tiff": ".tif",
    "image/bmp": ".bmp",
    "image/vnd.ms-photo": ".jxr",
    "image/vnd.adobe.photoshop": ".psd",
    "image/x-icon": ".ico",
    "image/heic": ".heic",
}
CONTENT_TYPE2EXT: Dict[str, str] = {
    "application/epub+zip": ".epub",
    "application/zip": ".zip",
    "application/x-tar": ".tar",
    "application/x-rar-compressed": ".rar",
    "application/gzip": ".gz",
    "application/x-bzip2": ".bz2",
    "application/x-7z-compressed": ".7z",
    "application/x-xz": ".xz",
    "application/pdf": ".pdf",
    "application/x-msdownload": ".exe",
    "application/x-shockwave-flash": ".swf",
    "application/rtf": ".rtf",
    "application/octet-stream": ".eot",
    "application/postscript": ".ps",
    "application/x-sqlite3": ".sqlite",
    "application/x-nintendo-nes-rom": ".nes",
    "application/x-google-chrome-extension": ".crx",
    "application/vnd.ms-cab-compressed": ".cab",
    "application/x-deb": ".deb",
    "application/x-unix-archive": ".ar",
    "application/x-compress": ".Z",
    "application/x-lzip": ".lz",
    "text/html": ".txt",
}
[docs]def unzip(path: str, verbose: bool = True) -> Tuple[str, List[str]]:
    """Unzip a zipped file ( Only support the file with ``.zip`` extension. )
    Args:
        path (str)               : The path to zipped file.
        verbose (bool, optional) : Whether to print verbose or not. Defaults to ``True``.
    Returns:
        Tuple[str,List[str]]: The directory where the expanded data is stored and the List of their respective file paths.
    Examples:
        >>> from teilab.utils import unzip
        >>> unzip("target.zip")
    """
    extracted_file_paths = []
    print = verbose2print(verbose=verbose)
    root, ext = os.path.splitext(path)
    if ext not in [".zip", ".gz"]:
        print(f"Do not support to extract files with the '{ext}' extension.")
    else:
        if not os.path.exists(root):
            os.mkdir(root)
        print("[Unzip] Show file contents:")
        with zipfile.ZipFile(path) as z:
            for info in z.infolist():
                info.filename = info.orig_filename.encode("cp437").decode("utf-8")
                if (os.sep != "/") and (os.sep in info.filename):
                    info.filename = info.filename.replace(os.sep, "/")
                z.extract(member=info, path=root)
                extracted_file_path = os.path.join(root, info.filename)
                extracted_file_paths.append(extracted_file_path)
                print(f"\t* {info.filename}")
    return root, extracted_file_paths 
[docs]def decide_extension(
    content_encoding: Optional[str] = None, content_type: Optional[str] = None, basename: Optional[str] = None
):
    """Decide File Extension based on ``content_encoding`` and ``content_type``
    Args:
        content_encoding (Optional[str], optional) : The MIME type of the resource or the data.
        content_type (Optional[str], optional)     : The Content-Encoding entity header is used to compress the media-type.
        basename (Optional[str], optional)         : The basename.
    Returns:
        str: The file extension which starts with "."
    Examples:
        >>> from teilab.utils import decide_extension
        >>> decide_extension(content_encoding="image/png")
        '.png'
        >>> decide_extension(content_type="application/pdf")
        '.pdf'
        >>> decide_extension(content_encoding="image/webp", content_type="application/pdf")
        '.webp'
        >>> decide_extension(basename="hoge.zip")
        '.zip'
    """
    ext = CONTENT_ENCODING2EXT.get(
        content_encoding, CONTENT_TYPE2EXT.get(content_type, os.path.splitext(str(basename))[-1])
    )
    return ext 
[docs]class Downloader:
    """General Downloader"""
[docs]    @classmethod
    def download_file(
        cls,
        url: str,
        dirname: str = ".",
        basename: str = "",
        path: Optional[str] = None,
        verbose: bool = True,
        expand: bool = True,
        **kwargs,
    ) -> str:
        """Download a file and expand it if you want.
        Args:
            url (str)                      : The URL of the file you want to download.
            dirname (str, optional)        : The directory where downloaded data will be saved. Defaults to ``"."``.
            basename (str, optional)       : The basename of the target file. Defaults to ``""``.
            path (Optional[str], optional) : Where and what name to save the downloaded file. Defaults to ``None``.
            verbose (bool, optional)       : Whether to print verbose or not. Defaults to ``True``.
            expand (bool, optional)        : Whether to expand the downloaded file. Defaults to ``True``
        Returns:
            path (str) : The path to the downloaded file.
        Examples:
            >>> import os
            >>> from teilab.utils import Downloader
            >>> path = Downloader.download_file(url="http://ui-tei.rnai.jp/")
            [Download] URL: http://ui-tei.rnai.jp/
            * Content-Encoding : None
            * Content-Length   : 32.1 [KB]
            * Content-Type     : text/html
            * Save Destination : ./2021-06-01@21.30.html
            ===== Progress =====
            2021-06-01@21.30.04	100.0%[####################] 0.0[s] 1.3[MB/s] eta -0.0[s]
            Do not support to extract files with the '.html' extension.
            >>> os.path.exists(path)
            True
        """
        path = cls.download_target_file(url=url, dirname=dirname, basename=basename, path=path, verbose=True, **kwargs)
        if expand:
            path, extracted_file_paths = unzip(path=path, verbose=verbose)
        return path 
[docs]    @staticmethod
    def download_target_file(
        url: str,
        dirname: str = ".",
        basename: str = ".",
        path: Optional[str] = None,
        bar_width: int = 20,
        verbose: bool = True,
        **kwargs,
    ) -> str:
        """Download the target file.
        Args:
            url (str)                      : The URL of the file you want to download.
            dirname (str, optional)        : The directory where downloaded data will be saved. Defaults to ``"."``.
            basename (str, optional)       : The basename of the target file. Defaults to ``""``.
            path (Optional[str], optional) : Where and what name to save the downloaded file. Defaults to ``None``.
            bar_width (int, optional)      : The width of progress bar. Defaults to ``20``.
            verbose (bool, optional)       : Whether to print verbose or not. Defaults to ``True``.
        Returns:
            path (str) : The path to the downloaded file.
        Examples:
            >>> import os
            >>> from teilab.utils import Downloader
            >>> path = Downloader.download_target_file(url="http://ui-tei.rnai.jp/")
            [Download] URL: http://ui-tei.rnai.jp/
            * Content-Encoding : None
            * Content-Length   : 31.8 [KB]
            * Content-Type     : text/html
            * Save Destination : ./2021-06-01@11.26.html
            ===== Progress =====
            2021-06-01@11.26.48	100.0%[####################] 0.0[s] 1.0[MB/s]	eta -0.0[s]
            >>> os.path.exists(path)
            True
        """
        try:
            with urllib.request.urlopen(url) as web_file:
                headers = dict(web_file.headers._headers)
            filename, path = Downloader.prepare_for_download(
                url=url, basename=os.path.basename(url), dirname=dirname, path=path, headers=headers, verbose=verbose
            )
            if verbose:
                print("===== Progress =====")
            _, res = urllib.request.urlretrieve(
                url=url,
                filename=path,
                reporthook=progress_reporthook_create(filename=filename, bar_width=bar_width, verbose=verbose),
            )
        except urllib.error.URLError:
            print(f"[URLError] Please check if the URL is correct, given {url}")
        except Exception as err:
            print(f"[{err.__class__.__name__}] {err}")
        return path 
[docs]    @staticmethod
    def prepare_for_download(
        url: str = "",
        dirname: str = ".",
        basename: str = "",
        path: Optional[str] = None,
        headers: Optional[Dict[str, str]] = None,
        verbose: bool = True,
    ) -> Tuple[str, str]:
        """Get Information from webfile header and prepare for downloading.
        Args:
            url (str, optional)                         : The URL of the file you want to download. Defaults to ``""``.
            dirname (str, optional)                     : The directory where downloaded data will be saved. Defaults to ``"."``.
            basename (str, optional)                    : The basename of the target file. Defaults to ``""``.
            path (Optional[str], optional)              : Where and what name to save the downloaded file. Defaults to ``None``.
            headers (Optional[Dict[str,str]], optional) : The header information of the target file. Defaults to ``{}``.
            verbose (bool, optional)                    : Whether to print verbose or not. Defaults to ``True``.
        Returns:
            Tuple[str,str]: ``filename`` and ``path`` of the file that will be downloaded.
        Examples:
            >>> from teilab.utils import Downloader
            >>> filename, path = Downloader.prepare_for_download(
            ...     url="http://ui-tei.rnai.jp/",
            ...     basename="index.html",
            ...     dirname=".",
            ...     path=None,
            >>> )
            [Download] URL: http://ui-tei.rnai.jp/
            * Content-Encoding : None
            * Content-Length   : 32.1 [KB]
            * Content-Type     : text/html
            * Save Destination : ./index.html
            >>> filename, path
            ('index.html', './index.html')
        """
        # Get the information of the file you want to download from the header.
        if headers is None:
            with urllib.request.urlopen(url) as web_file:
                headers = dict(web_file.headers._headers)
        content_encoding: str = headers.get("Content-Encoding")
        content_length: str = "{0:.1f} [{1}]".format(*readable_bytes(int(headers.get("Content-Length", 0))))
        content_type: str = headers.get("Content-Type").split(";")[0]
        # Decide the download destination
        if basename == "":
            basename = now_str()
        if path is None:
            root, _ = os.path.splitext(basename)
            guessed_ext = decide_extension(
                content_encoding=content_encoding, content_type=content_type, basename=basename
            )
            filename = root + guessed_ext
            path = os.path.join(dirname, filename)
        else:
            filename = os.path.split(path)[-1]
        # Show the results.
        if verbose:
            print(
                f"[Download] URL: {url}",
                f"* Content-Encoding : {content_encoding}",
                f"* Content-Length   : {content_length}",
                f"* Content-Type     : {content_type}",
                f"* Save Destination : {path}",
                sep="\n",
            )
        return (filename, path)  
[docs]class GoogleDriveDownloader(Downloader):
    """Specific Downloader for files in GoogleDrive"""
    CHUNK_SIZE = 32768
    DRIVE_URL = "https://docs.google.com/uc?export=download"
[docs]    @staticmethod
    def prepare_for_download(
        url: str = "",
        dirname: str = ".",
        basename: str = "",
        path: Optional[str] = None,
        headers: Optional[Dict[str, str]] = None,
        verbose: bool = True,
        driveId: Optional[str] = None,
    ) -> Tuple[str, str]:
        if driveId is None:
            q = urllib.parse.parse_qs(urllib.parse.urlparse(url).query).get("id")
            if len(q) == 0:
                raise TypeError("Please specify the target Google Drive Id using ``url`` or ``driveId`` arguments.")
            else:
                driveId = q[0]
        if basename == "":
            basename = driveId
        # Start a Session
        params = {"id": driveId}
        session = requests.Session()
        response = session.get(url=GoogleDriveDownloader.DRIVE_URL, params=params, stream=True)
        for key, val in response.cookies.items():
            if key.startswith("download_warning"):
                params.update({"confirm": val})
                break
        # Get Information from headers
        headers = session.head(url=GoogleDriveDownloader.DRIVE_URL, params=params).headers
        return [
            *Downloader.prepare_for_download(
                url=url,
                dirname=dirname,
                basename=basename,
                path=path,
                headers=headers,
                verbose=verbose,
            ),
            session,
            params,
        ] 
[docs]    @staticmethod
    def download_target_file(
        url: str,
        dirname: str = ".",
        basename: str = "",
        path: Optional[str] = None,
        driveId: Optional[str] = None,
        verbose: bool = True,
        **kwargs,
    ) -> str:
        """Download the target Google Drive file.
        Args:
            url (str)                         : The URL of the file you want to download.
            dirname (str, optional)           : The directory where downloaded data will be saved. Defaults to ``"."``.
            basename (str, optional)          : The basename of the target file. Defaults to ``""``.
            path (Optional[str], optional)    : Where and what name to save the downloaded file. Defaults to ``None``.
            driveId (Optional[str], optional) : The GoogleDrive's file ID. Defaults to ``None``.
            verbose (bool, optional)          : Whether to print verbose or not. Defaults to ``True``.
        Raises:
            TypeError: When Google Drive File ID is not detected from ``driveId`` and ``url`` .
        Returns:
            str: The path to the downloaded file.
        """
        filename, path, session, params = GoogleDriveDownloader.prepare_for_download(
            url=url, basename=basename, dirname=dirname, path=path, verbose=verbose
        )
        # Get contents
        response = session.get(GoogleDriveDownloader.DRIVE_URL, params=params, stream=True)
        with open(path, "wb") as f:
            with tqdm(response.iter_content(GoogleDriveDownloader.CHUNK_SIZE), desc=driveId) as pbar:
                for i, chunk in enumerate(pbar, start=1):
                    if chunk:
                        f.write(chunk)
                        pbar.set_postfix(
                            {
                                "Downloaded": "{0:.1f} [{1}]".format(
                                    *readable_bytes(i * GoogleDriveDownloader.CHUNK_SIZE)
                                )
                            }
                        )
        return path  
[docs]def decide_downloader(url: str) -> Downloader:
    """Decide ``Downloader`` from ``url``
    Args:
        url (str): The URL of the file you want to download.
    Returns:
        Downloader: File Downloader for target ``url``.
    Examples:
        >>> from teilab.utils import decide_downloader
        >>> decide_downloader("https://www.dropbox.com/sh/ID").__name__
        'Downloader'
        >>> decide_downloader("https://drive.google.com/u/0/uc?export=download&id=ID").__name__
        'GoogleDriveDownloader'
    """
    url_domain = re.match(pattern=r"^https?:\/\/(.+?)\/", string=url).group(1)
    return {
        # "drive.google.com": GoogleDriveDownloader,
    }.get(url_domain, Downloader)