Source code for dstk.data._sql

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Classes et fonctions liées à la gestion de bases SQL.

Created on Mon Nov 23 08:42:43 2020

@author: Cyrile Delestre
"""

import sqlite3
from typing import Union

import pandas as pd
import numpy as np
from tqdm import tqdm
from joblib import Parallel, delayed

[docs]def csv2sql(path_csv_or_df: Union[str, pd.DataFrame], path_sql: str, table_name: str, n_jobs: int=1, if_exists: str='append', **csv_opt): r""" Fonction permettant de convertir un fichier CSV en base SQLite3. La méthode permet de ne pas charger le CSV en RAM si ce dernier est trop gros. Parameters ---------- path_csv : Union[str, pd.DataFrame] chemin du csv ou DataFrame path_sql : str chemin de la base sql table_name : str nom de la table sql n_jobs : int nombre de jobs (gain si gros chunck), par défaut 1 if_exists : str comportement si l'indice existe déjà dans la table sql. Peut être : 'fail', 'replace', 'append', par défaut 'append' **csv_opt : arguments de pandas.read_csv Examples -------- Transfert d'un CSV trop gros pour être mis en RAM vers une base SQLite3. >>> from dstk.data import csv2sql >>> >>> csv2sql( >>> path_csv_or_df="data/mon_gros_csv.csv", >>> path_sql="data/ma_table_sql.db", >>> table_name="nom_de_ma_table", >>> n_jobs=4, # Nombre de jobs en parallèle >>> chunksize=1000 # taille du chunker (1000 par 1000 ligne) >>> ); Attention, dans notre cas ce sera 4 lignes qui seront stockées en RAM : n_jobs*chunksize """ if isinstance(path_csv_or_df, str): data = pd.read_csv(path_csv_or_df, **csv_opt) elif isinstance(path_csv_or_df, pd.DataFrame): data = path_csv_or_df else: raise AttributeError( "path_csv_or_df doit être soit un str ou une DataFrame, mais pas " f"{type(path_csv_or_df)}." ) if "chunksize" in csv_opt.keys() and csv_opt["chunksize"] is not None: if isinstance(path_csv_or_df, str): with open(path_csv_or_df) as file: total = sum(1 for line in file) else: total = data.shape[0] @delayed def inser_sql(dd:pd.DataFrame): with sqlite3.connect(path_sql) as conn: dd.to_sql( table_name, conn, if_exists = 'append', index = False ) with Parallel(n_jobs=n_jobs, backend='loky') as par: par( inser_sql(dd) for dd in tqdm( data, desc = "csv2sql", total = int(np.ceil(total/csv_opt["chunksize"])), miniters = 0.5 ) ) else: with sqlite3.connect(path_sql) as conn: data.to_sql( table_name, conn, if_exists = if_exists, index = False )
[docs]def index_sql(path_sql: str, table_name: str, list_keys: list, index_name: str='idx', unique: bool=False): r""" Fonction créant un index pour une base SQLite. Créer un index peremet d'accélérer grandement les filtres sur une base, car l'index est mise en RAM. Parameters ---------- path_sql : str chemin de la base sql table_name : str nom de la table sql list_keys: list list contenant les colonnes pour la génération de l'index index_name : str nom de l'index, par défaut 'idx' unique : bool si l'index est unique ou non, par défaut false Examples -------- Je possède une base base.db et je sais que je vais de manière très fréquente un filtrage sur la colonne A et B du type : >>> from dstk.data import PandasSQLite >>> >>> arg1, arg2 = '...', '...' >>> with PandasSQLite("data/base.db") as db: >>> data = db( >>> "SELECT * FROM toto " >>> f"WHERE A='{arg1}' AND B='{arg2}';" >>> ) Cette commande ira beaucoup plus vite si au préalable en aillant fait : >>> from dstk.data import index_sql >>> >>> index_sql( >>> path_sql="data/base.db", >>> table_name="toto", >>> list_keys=['A', 'B'] >>> ); Notes ----- Commande à n'exécuter qu'une seule fois. Si plusieurs index sont créés les noms 'index_name' douvent être uniques. """ with sqlite3.connect(path_sql) as conn: cmd = (f"CREATE {'UNIQUE INDEX' if unique else 'INDEX'} {index_name} " f"ON {table_name}({', '.join(list_keys)});") cursor = conn.cursor() cursor.execute(cmd) conn.commit()