Source code for pycharmers.sdk.base
"""Base class for Python-Charmers' SDK """
#coding: utf-8
import os
import pandas as pd
from abc import ABC, abstractmethod
from ..utils._path import DOTENV_PATH
from ..utils.environ_utils import name2envname, load_environ, check_environ
from ..utils.generic_utils import verbose2print, handleKeyError
from ..utils._colorings import toBLUE, toGREEN, toRED
from typing import Any,Optional,Callable,Union,List,Tuple,Dict
[docs]class PycharmersSDK(ABC):
"""Abstract Class for Python-Charmers API
Args:
env_path (str) : path/to/.env (default= ``DOTENV_PATH``)
service_name (str) : The service name.
verbose (bool) : Whether to print message or not. (default= ``False``)
\*\*kwargs (dict) : Required Keywords.
Attributes:
print (function) : Print function based on ``verbose``
service_name (str) : The service name
required_env_varnames (list) : The required env varnames.
kwargs (dict) : Keyword arguments.
"""
def __init__(self, env_path:str=DOTENV_PATH, api_name:str="", verbose:bool=True, **kwargs):
self.api_name = api_name
required_keynames = list(kwargs.keys())
required_env_varnames = [self.keyname2envname(k) for k in required_keynames]
load_environ(
dotenv_path=env_path,
env_varnames=required_env_varnames,
verbose=verbose,
)
self.print = verbose2print(verbose)
self.kwargs = kwargs
self.required_keynames = required_keynames
self.required_env_varnames = required_env_varnames
[docs] def keyname2envname(self, keyname):
"""Convert keyname to environment varname.
Args
keyname (str) : Keyname for method.
Examples:
>>> from pycharmers.sdk._base import PycharmersSDK
>>> sdk = PycharmersSDK()
>>> sdk.keyname2envname("id")
'TRANSLATION_GUMMY_GATEWAY_USELESS_NAME'
>>> sdk.keyname2envname("hoge")
'TRANSLATION_GUMMY_GATEWAY_USELESS_HOGE'
"""
return name2envname(name=keyname, prefix=self.api_name, service="sdk")
[docs] def get_val(self, keyname, **kwargs):
"""Get the value from ``gatewaykwargs`` or an environment variable.
Args:
keyname (str) : Keyname for each method.
\*\*kwargs (dict) : Given ``kwargs``.
Examples:
>>> from pycharmers.sdk._base import PycharmersSDK
>>> sdk = PycharmersSDK()
>>> print(sdk.get_val("hoge"))
None
>>> print(sdk.get_val("username"))
USERNAME_IN_ENVFILE
>>> print(gateway.get_val("username", username=":)"))
:)
"""
return kwargs.get(keyname) or os.getenv(self.keyname2envname(keyname))
[docs]class PycharmersSQL(PycharmersSDK):
"""Wrapper class for Sqlite.
Args:
database (Optional[str], optional) : database to use. Defaults to ``None``.
verbose (bool, optional) : Whether to print message or not Defaults to ``False``.
"""
def __init__(self, api_name:str="SQL", verbose:bool=False, **kwargs):
super().__init__(
api_name=api_name,
verbose=verbose,
**kwargs
)
[docs] @staticmethod
def format_data(data:Any, type:str) -> str:
"""Format python data to match sql data format.
Args:
data (Any): Python any data.
type (str): SQL data type. (ex. ``"int(11)"``, ``"varchar(100)"``)
Returns:
str: Correctly formatted python data.
Examples:
>>> import datetime
>>> from pycharmers.sdk import PycharmersSQL
>>> PycharmersSQL.format_data(data=1, type="int(8)")
'1'
>>> PycharmersSQL.format_data(data=1, type="varchar(10)")
'"1"'
>>> PycharmersSQL.format_data(data=datetime.datetime(1998,7,3), type="datetime")
'"1998-07-03 00:00:00"'
"""
if type.startswith("datetime") and (not isinstance(data, str)):
data = data.strftime("%Y-%m-%d %H:%M:%S")
if type.startswith("int") or ("now()" in data):
data = str(data)
else:
data = f'"{data}"'
return data
[docs] @abstractmethod
def connect(self, func:Callable[[Any], Union[pd.DataFrame, Tuple[tuple], None]], **kwargs) -> Union[pd.DataFrame, Tuple[tuple], None]:
"""Create a connection to the database, and close it after excuting ``func`` .
Args:
func (Callable[[Any], Union[pd.DataFrame, Tuple[tuple], None]) : The function you want to execute. Receive ``cursor`` as the first argument.
Returns:
Union[pd.DataFrame, Tuple[tuple], None]: Return value from ``func``
"""
check_environ(
required_keynames=self.required_keynames,
required_env_varnames=self.required_env_varnames,
# database=database,
)
connection = "Create a connection to the database." # XXX.connect(
# database=self.get_val("database", database=database),
# )
cursor = connection.cursor()
ret = func# (cursor=XXX_cursor, **kwargs)
cursor.close()
connection.commit()
connection.close()
return ret
[docs] def execute(self, query:str, columns:List[str]=[], verbose:bool=False) -> Union[pd.DataFrame, Tuple[tuple], None]:
"""Execute a SQL query.
Args:
query (str) : Query to execute on server.
columns (List[str], optional) : If this value is fiven, Return value will be as a ``pd.DataFrame`` format. Defaults to ``[]``.
verbose (bool, optional) : Whether to display the query or not. Defaults to ``False``.
Returns:
Union[pd.DataFrame, Tuple[tuple], None]: Return value of query.
Examples:
>>> from pycharmers.sdk import PycharmersSQL
>>> sql = PycharmersSQL()
>>> sql.execute("show databases;")
(('information_schema',),
('mysql',),
('performance_schema',),
('remody',),
('sys',))
>>> # Receive as a DataFrame.
>>> df = sql.execute("show databases;", columns=["name"])
>>> print(df.to_markdown())
| | name |
|---:|:-------------------|
| 0 | information_schema |
| 1 | mysql |
| 2 | performance_schema |
| 3 | remody |
| 4 | sys |
"""
if verbose: print(query)
def execute_query(cursor:Any) -> Tuple[tuple]:
"""Execute a query and
Args:
cursor (Any) : Database cursor to use.
Returns:
Tuple[tuple]: Return value of ``query``.
"""
cursor.execute(query)
return cursor.fetchall()
ret = self.connect(func=execute_query)
if len(columns)>0:
ret = pd.DataFrame(data=ret, columns=columns)
return ret
[docs] def create(self, table:str, verbose:bool=False, column_info={}) -> None:
"""Create a Table named ``table`` .
Args:
table (str) : The name of table.
verbose (bool, optional) : Whether to display the query or not. Defaults to ``False``.
columninfo (dict) : Key value style column information.
Examples:
>>> from pycharmers.sdk import PycharmersSQL
>>> sql = PycharmersSQL()
>>> sql.create(table="pycharmers_user", column_info={
... "id" : 'int NOT NULL AUTO_INCREMENT PRIMARY KEY COMMENT "ID"',
... "username": 'VARCHAR(100) NOT NULL COMMENT "ユーザー名"',
... "created" : 'datetime DEFAULT NULL COMMENT "登録日"' ,
>>> })
"""
return self.execute(f"CREATE TABLE {table} ({','.join([f'`{k}` {v}' for k,v in column_info.items()])});", verbose=verbose)
[docs] def drop(self, table:str, verbose:bool=False) -> None:
"""Drop the ``table`` .
Args:
table (str) : The name of table.
verbose (bool, optional) : Whether to display the query or not. Defaults to ``False``.
Examples:
>>> from pycharmers.sdk import PycharmersSQL
>>> sql = PycharmersSQL()
>>> sql.drop(table="pycharmers_user")
"""
return self.execute(f"DROP TABLE {table};", verbose=verbose)
[docs] def describe(self, table:str, verbose:bool=False) -> pd.DataFrame:
"""Get table structure.
Args:
table (str) : The name of table.
verbose (bool, optional) : Whether to display the query or not. Defaults to ``False``.
Returns:
pd.DataFrame: Table structure.
Examples:
>>> from pycharmers.sdk import PycharmersSQL
>>> sql = PycharmersSQL()
>>> df = sql.describe(table="pycharmers_user")
>>> print(df.to_markdown())
| | Field | Type | Null | Key | Default | Extra |
|---:|:---------|:-------------|:-------|:------|:----------|:---------------|
| 0 | id | int(11) | NO | PRI | | auto_increment |
| 1 | username | varchar(100) | NO | | | |
| 2 | created | datetime | YES | | | |
"""
return self.execute(f"DESCRIBE {table};", columns=["Field","Type","Null","Key","Default","Extra"], verbose=verbose)
[docs] def insert(self, table:str, data:List[list], columns:list=[], col_type="input_field", verbose:bool=False) -> int:
"""Insert a record to specified ``table``
Args:
table (str) : The name of the table.
data (List[list]) : Data to be inserted.
columns (list, optional) : Column names to be inserted values. Defaults to ``[]``.
col_type (str, optional) : If ``columns`` has no data, use :meth:`get_colnames <pycharmers.sdk.mysql.PycharmersSQL.get_colnames>` to decide which columns to be inseted. Defaults to ``"input_field"``.
verbose (bool, optional) : Whether to display the query or not. Defaults to ``False``.
Returns:
int: The number of rows in ``table``
Examples:
>>> import datetime
>>> from pycharmers.sdk import PycharmersSQL
>>> sql = PycharmersSQL()
>>> sql.insert(table="pycharmers_user", data=[
... ["iwasaki", "now()"],
... ["shuto", datetime.datetime(1998,7,3)]
>>> ], columns=["username", "created"])
"""
if len(columns)==0:
columns = self.get_colnames(table=table, col_type=col_type)
if not isinstance(data[0], list):
data = [data]
df_field = self.describe(table=table)
coltypes = df_field.set_index("Field").filter(items=columns, axis=0).Type
values = ", ".join([f"({', '.join([self.format_data(e,t) for e,t in zip(d,coltypes)])})" for d in data])
return self.execute(f"INSERT INTO {table} ({', '.join(columns)}) VALUES {values}", verbose=verbose)
[docs] def update(self, table:str, old_column:str, old_value:Any, new_columns:List[str], new_values:List[Any], verbose:bool=False) -> None:
"""Update records.
Args:
table (str) : The name of table.
old_column (str) : [description]
old_value (Any) :
new_columns (List[str]) : [description]
new_values (List[Any]) :
verbose (bool, optional) : Whether to display the query or not. Defaults to ``False``.
Examples:
>>> from pycharmers.sdk import PycharmersSQL
>>> sql = PycharmersSQL()
>>> print(sql.selectAll(table="pycharmers_user").to_markdown())
| | id | username | created |
|---:|-----:|:-----------|:--------------------|
| 0 | 1 | iwasaki | 2021-05-22 07:23:10 |
| 1 | 2 | shuto | 1998-07-03 00:00:00 |
>>> sql.update(table="pycharmers_user", old_column="username", old_value="iwasaki", new_column="created", new_value="now()")
>>> print(sql.selectAll(table="pycharmers_user").to_markdown())
| | id | username | created |
|---:|-----:|:-----------|:--------------------|
| 0 | 1 | iwasaki | 2021-05-22 07:28:09 |
| 1 | 2 | shuto | 1998-07-03 00:00:00 |
"""
df_field = self.describe(table=table)
old_type = df_field[df_field.Field==old_column].Type.values[0]
new_coltypes = df_field.set_index("Field").filter(items=new_columns, axis=0).Type
update_values = ", ".join([f"{col} = {self.format_data(val, type)}" for col,val,type in zip(new_columns, new_values, new_coltypes)])
return self.execute(f"UPDATE {table} SET {update_values} WHERE {old_column} = {self.format_data(old_value, old_type)}", verbose=verbose)
[docs] def delete(self, table:str, column:str, value:Any, verbose:bool=False) -> None:
"""Delete the records whose ``column`` is ``value`` from the table named ``table`` .
Args:
table (str) : The name of table.
column (str) :
value (Any) :
verbose (bool, optional) : Whether to display the query or not. Defaults to ``False``.
Examples:
>>> from pycharmers.sdk import PycharmersSQL
>>> sql = PycharmersSQL()
>>> print(sql.select(table="pycharmers_user").to_markdown())
| | id | username | created |
|---:|-----:|:-----------|:--------------------|
| 0 | 1 | iwasaki | 2021-05-22 07:23:10 |
| 1 | 2 | shuto | 1998-07-03 00:00:00 |
>>> sql.delete(table="pycharmers_user", column="username", value="shuto")
>>> print(sql.select(table="pycharmers_user").to_markdown())
| | id | username | created |
|---:|-----:|:-----------|:--------------------|
| 0 | 1 | iwasaki | 2021-05-22 07:23:10 |
"""
df_field = self.describe(table=table)
type = df_field[df_field.Field==column].Type.values[0]
return self.execute(f"DELETE FROM {table} WHERE {column} = {self.format_data(value, type)}", verbose=verbose)
[docs] def merge(self, table1info:Dict[str,List[str]], table2info:Dict[str,List[str]], method:str="inner", verbose:bool=False) -> pd.DataFrame:
"""Select values from table1 and table2 using ``method`` JOIN
Args:
table1info (Dict[str,List[str]]) : Table1 information ( ``key`` : table name, ``value`` : selected columns (0th column is used for merging.))
table2info (Dict[str,List[str]]) : Table2 information ( ``key`` : table name, ``value`` : selected columns (0th column is used for merging.))
method (str, optional) : How to merge two tables. Defaults to ``"inner"``.
verbose (bool, optional) : Whether to display the query or not. Defaults to ``False``.
Returns:
pd.DataFrame: Merged table records.
Examples:
>>> from pycharmers.sdk import PycharmersSQL
>>> sql.merge(
... table1info={"user": ["id", "name", "job_id"]},
... table2info={"jobs": ["id", "job_name"]},
... method="inner", verbose=True
>>> )
>>> SELECT user.name, user.job_id, jobs.job_name FROM user INNER JOIN jobs ON user.id = jobs.id;
"""
table1,columns1 = table1info.popitem()
table2,columns2 = table2info.popitem()
col_merge1 = columns1.pop(0)
col_merge2 = columns2.pop(0)
return self.execute(f"SELECT {', '.join([f'{table1}.{col}' for col in columns1] + [f'{table2}.{col}' for col in columns2])} FROM {table1} {method.upper()} JOIN {table2} ON {table1}.{col_merge1} = {table2}.{col_merge2};", columns=columns1+columns2, verbose=verbose)
[docs] def get_colnames(self, table:str, col_type:str="all") -> list:
"""Get column names in the specified ``table`` .
Args:
table (str) : The name of table.
col_type (str, optional) : What types columns to extract. Defaults to ``"all"``.
Returns:
list: Extracted Columns.
Examples:
>>> from pycharmers.sdk import PycharmersSQL
>>> sql = PycharmersSQL()
>>> sql.get_colnames(table="pycharmers_user")
>>> ['id', 'username', 'created']
"""
handleKeyError(lst=["all", "minimum", "input_field"], col_type=col_type)
df = self.describe(table=table)
if col_type=="all":
pass
elif col_type=="minimum":
df = df[(df.Extra!="auto_increment") & (df.Null=="NO")]
elif col_type=="input_field":
df = df[df.Extra!="auto_increment"]
return df.Field.to_list()
[docs] def select(self, table:str, columns:List[str]=[], where:Dict[str,Any]={}, verbose:bool=False) -> pd.DataFrame:
"""Get selected records.
Args:
table (str) : The name of table.
columns (List[str], optional) : Selected Columns. If you don't specify any columns, extract all columns. Defaults to ``[]``.
where (Dict[str,Any], optional) : Specify the condition of the column to be extracted. { ``colname`` : ``value`` }
verbose (bool, optional) : Whether to display the query or not. Defaults to ``False``.
Returns:
pd.DataFrame: Selected Records.
Examples:
>>> from pycharmers.sdk import PycharmersSQL
>>> sql = PycharmersSQL()
>>> df = sql.select(table="pycharmers_user")
>>> print(df.to_markdown())
| | id | username | created |
|---:|-----:|:-----------|:--------------------|
| 0 | 1 | iwasaki | 2021-05-22 07:23:10 |
| 1 | 2 | shuto | 1998-07-03 00:00:00 |
>>> df = sql.select(table="pycharmers_user", columns=["id", "username"])
>>> print(df.to_markdown())
| | id | username |
|---:|-----:|:-----------|
| 0 | 1 | iwasaki |
| 1 | 2 | shuto |
"""
if len(columns)==0:
columns = self.get_colnames(table=table, col_type="all")
if len(where)>0:
df_field = self.describe(table=table)
coltypes = df_field.set_index("Field").filter(items=list(where.keys()), axis=0).Type
WHERE = f'WHERE {" AND ".join([f"{key} = {self.format_data(val,type)}" for type,(key,val) in zip(coltypes,where.items())])}'
else:
WHERE = ""
return self.execute(f"SELECT {', '.join(columns)} FROM {table} {WHERE};", columns=columns, verbose=verbose)
[docs] def count_rows(self, table:str, verbose:bool=False) -> int:
"""Get the number of rows in the ``table`` .
Args:
table (str) : The name of table.
verbose (bool, optional) : Whether to display the query or not. Defaults to ``False``.
Returns:
int: The number of rows in the specified ``table`` .
Examples:
>>> from pycharmers.sdk import PycharmersSQL
>>> sql = PycharmersSQL()
>>> sql.count_rows(table="pycharmers_user")
2
"""
return self.execute(f"SELECT count(*) FROM {table};")[0][0]
[docs] def show_tables(self, verbose:bool=False) -> pd.DataFrame:
"""Show all tables.
Args:
verbose (bool, optional) : Whether to display the query or not. Defaults to ``False``.
Returns:
pd.DataFrame: Information for all Tables.
>>> from pycharmers.sdk import PycharmersSQL
>>> sql = PycharmersSQL()
>>> df = sql.show_tables()
>>> print(df.to_markdown())
| | Name | Engine | Version | Row_format | Rows | Avg_row_length | Data_length | Max_data_length | Index_length | Data_free | Auto_increment | Create_time | Update_time | Check_time | Collation | Checksum | Create_options | Comment |
|---:|:----------------|:---------|----------:|:-------------|-------:|-----------------:|--------------:|------------------:|---------------:|------------:|-----------------:|:--------------------|:--------------------|:-------------|:----------------|:-----------|:-----------------|:----------|
| 0 | pycharmers_user | InnoDB | 10 | Dynamic | 0 | 0 | 16384 | 0 | 0 | 0 | 1 | 2021-05-22 07:06:17 | NaT | | utf8_general_ci | | | |
"""
return self.execute("show table status;", columns=['Name', 'Engine', 'Version', 'Row_format', 'Rows', 'Avg_row_length', 'Data_length', 'Max_data_length', 'Index_length', 'Data_free', 'Auto_increment', 'Create_time', 'Update_time', 'Check_time', 'Collation', 'Checksum', 'Create_options', 'Comment'], verbose=verbose)
[docs] def explain(self, table:str, verbose:bool=False) -> pd.DataFrame:
"""Explain records
Args:
table (str) : The name of table.
verbose (bool, optional) : Whether to display the query or not. Defaults to ``False``.
Returns:
pd.DataFrame: Explanation for records.
Examples:
>>> from pycharmers.sdk import PycharmersSQL
>>> sql = PycharmersSQL()
>>> df = sql.explain(table="pycharmers_user")
>>> print(df.to_markdown())
| | id | select_type | table | partition | type | possible_keys | key | key_len | ref | rows | filtered | Extra |
|---:|-----:|:--------------|:----------------|:------------|:-------|:----------------|:------|:----------|:------|-------:|-----------:|:--------|
| 0 | 1 | SIMPLE | pycharmers_user | | ALL | | | | | 2 | 100 | |
"""
return self.execute(f"EXPLAIN SELECT * FROM {table};", columns=["id","select_type","table","partition","type","possible_keys","key","key_len","ref","rows","filtered","Extra"], verbose=verbose)