Source code for dstk.ssh._ssh

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Classe pour client SSH.

Created on Sat Apr 11 06:41:28 2020

@author: Cyrile Delestre
"""

import logging
from os.path import dirname
from typing import List, Dict, Optional
from functools import reduce
from subprocess import run as sp_run

[docs]class ClientSSH: r""" Classe de connextion SSH permettant d'envoyer, exécuter et récupérer des données sur un serveur distant (typiquement les bastions). La classe a pour optique de faciliter le lancement de scipts Pig directement d'une machine locale. Parameters ---------- hostname : Optional[str] nom du hostname du serveur distant. Si cette valeur n'est pas renseigné alors un promp pour demander le serveur souvrira dans le terminal. username : Optional[str] nom de l'utilisateur pour identification. Si cette valeur n'est pas renseigné alors uin prompt pour demander l'identifiant de l'utilisateur souvrira dans le terminal. ssh_key : Optional[str] chemin vers la clef SSH privé. Si non reseigné un prompt s'ouvrira pour demande le mot de passe associé au username. timeout : int timeout des commande SSH. ATTENTION : Une commande Pig peut être long. Si pendant l'exécution le timeout est atteint et qu'il y a d'autre instruction dans le script il y aura une erreur Python. Mais la procédure Pig continura en fond sur la cellule Hadoop, sauf si le terminal avec la session Python ce ferme. verbose : bool affiche le retour de la commande SSH. Notes ----- La procédure standars pour lancer un scipt Pig est : 1 : Envoyer le Pig sur un des bastions 2 : Envoyer les données nécéssaire pour l'exécution du Pig sur Bastion 3 : Envoyer les données sur HDFS 4 : Exécuter le scipt Pig 5 : Récupérer les résultat sous Bastion 6 : Les renvoyer en local 7 : Supprimer les données de Bastion ou une alternative à l'étape 6 et 7 : 5 bis : Envoyer sur staging 6 bis : Récupérer les données directement depuis HDFS via le staging Il existe dans la classe des procédures aggregant certaines de ces étapes. Si aucun ssh_key n'est renseigné alors un mot de passe sera demandé. Dans un terminal bash ce mot de passe sera masqué, mais ce ne sera pas le cas dans un terminal iPython (incompatible avec le masquage des inputs). Examples -------- L'exemple suiviant consiste à mettre l'ensemble des fichiers jar et script sur un serveur distant afin de permettre l'exécusion d'un script pig. La commande pig serait équivalente à : :: pig -useHCatalog -f script.pig -p param_1="param_1" -p param_2=42 -p param_3="param_3" >>> from dstk.ssh import ClientSSH >>> >>> # conf_ssh dictionnaire contenant les paramètres de connexions SSH >>> ssh = ClientSSH(**conf_ssh, verbose=False, timeout=10800) >>> >>> # SCP d'un fichier vers un serveur distant >>> ssh.cp(local_path_file, server_path_file) >>> >>> # SCP de plusieur script Pig vers un serveur distant >>> for script in all_script_path: >>> ssh.cp(f"{local_path_script}{script}", server_path_file) >>> >>> # SCP d'un repertoir local ver un serveur distant >>> ssh.cp(local_path_folder, server_path_folder, recursive=True) >>> >>> # Upload sur la cellule Hadoop d'un fichier nécessaire à l'exécution >>> # d'un script Pig >>> ssh.hadoop_put(f"{server_path}/data_input.csv") >>> >>> # Exécution d'un script Pig qui utilise HCatalog pour une table Hive >>> ssh.pig( >>> f"{server_path}/script.pig", >>> params=dict( >>> param_1='param_1', >>> param_2=42, >>> param_3='param_3' >>> ), >>> opt=['useHCatalog'] >>> ) >>> >>> # Déconnexion au serveur >>> ssh.logout() See also -------- download_file """ def __init__(self, hostname: Optional[str]=None, username: Optional[str]=None, ssh_key: Optional[str]=None, timeout: int=30, verbose: bool=False): try: global getpass import getpass except ModuleNotFoundError: raise ModuleNotFoundError( "Le module getpass n'est pas installé : l'instruction " "import getpass renvoit une erreur " "ModuleNotFoundError. Rajouter getpass dans les " "packages à installer dans le fichier environment.yml du " "projet." ) try: global pxssh from pexpect import pxssh except ModuleNotFoundError: raise ModuleNotFoundError( "Le module pexpect n'est pas installé : l'instruction " "from pexpect import pxssh renvoit une erreur " "ModuleNotFoundError. Rajouter pexpect dans les " "packages à installer dans le fichier environment.yml du " "projet." ) self.hostname = input('hostname: ') if hostname is None else hostname self.username = input('username: ') if username is None else username self.ssh_key = ssh_key self.timeout = timeout self.verbose = verbose self._connexion() def _connexion(self): try: self.conn = pxssh.pxssh(timeout=self.timeout) if self.ssh_key is None: password = getpass.getpass('password: ') self.conn.login( server = self.hostname, username = self.username, password = password ) else: self.conn.login( server = self.hostname, username = self.username, ssh_key = self.ssh_key ) except pxssh.ExceptionPxssh as e: print("La connexion pxssh a échouer.") print(e)
[docs] def test_connexion(self): r""" Méthode permettant de tester la connexion SSH. Retourne la commande 'uptime'. Si la connexion est correctement établie retourne sur le terminal le uptime du serveur, sinon retourne une erreur. """ self.exe_cmd('uptime', True) return self
[docs] def exe_cmd(self, cmd: str, verbose: Optional[bool]=None): r""" Méthode d'éxécution d'une commande sur le serveur distant. Parameters ---------- cmd : str cammande (shell, bach, etc.) à exécuter. verbose : Optional[bool] affiche le retour de la commande. Si None utilise le paramétrage par défaut de la classe ClientSSH. Examples -------- La commande suivant retourne les fichiers contenue dans le repertoire courant : >>> from dstk.ssh import ClientSSH >>> >>> # conf_ssh dictionnaire contenant les paramètres de connexions SSH >>> ssh = ClientSSH(**conf_ssh, verbose=False, timeout=10800) >>> ssh.exe_cmd('ls', verbose=True) """ verbose = verbose if verbose is not None else self.verbose self.conn.sendline(cmd) self.conn.prompt() if verbose: print(self.before()) return self
[docs] def before(self): r""" Permet de rendre interprétable dans Python les retours des prompts ce produisant avec le timeout. """ return self.conn.before.decode('utf-8').replace('\r\n', '\n')
[docs] def after(self): r""" Permet de rendre interprétable dans Python les retours des prompts ce produisant après le timeout. """ return self.conn.after.decode('utf-8').replace('\r\n', '\n')
[docs] def check_cmd(self): r""" Permet de récupérer le sortie d'une commande. """ self.conn.sendline("echo $?") self.conn.prompt() return self.before().split('\n')[1]
[docs] def pig(self, path_script: str, params: Dict[str, str]=dict(), opt: List[str]=[]): r""" Méthode d'exécution d'un script Pig. Parameters ---------- path_scipt : str chemin sur le serveur distant du scipt Pig à exécuter params : Dict[str, str] dictionnaire de l'ensemble des paramètres à envoyer au srcipt Pig. opt : List[str] list d'option Pig. Examples -------- L'exemple suivant est equivalent à lancer manuellement sur le serveur distant la commande : :: pig -useHCatalog -f script.pig -p param_1="param_1" -p param_2=42 -p param_3="param_3" >>> from dstk.ssh import ClientSSH >>> >>> # conf_ssh dictionnaire contenant les paramètres de connexions SSH >>> ssh = ClientSSH(**conf_ssh, verbose=False, timeout=10800) >>> ssh.pig( >>> "script.pig", >>> params=dict( >>> param_1='param_1', >>> param_2=42, >>> param_3='param_3', >>> ), >>> opt=['useHCatalog'] >>> ) """ query = "pig" if len(opt) > 0: for op in opt: query += f" -{op}" query += f" -f {path_script}" if len(params) > 0: for kk, vv in params.items(): query += f' -p {kk}=\"{vv}\"' self.exe_cmd(query) return self
[docs] def hadoop_put(self, path_file_server: str, path_file_hadoop: Optional[str]=None, verbose: Optional[bool]=None, if_exist: bool=True): r""" Permet de mettre un fichier du serveur distant vers la cellule Hadoop. Parameters ---------- path_file_server : str chemin du fichier sur le serveur distant. path_file_hadoop : Optional[str] chemin de destination sur la cellule Hadoop. verbose : Optional[bool] affiche le retour de commandes. if_exist : bool créé le répertoire sur la cellule Hadoop sur ce dernier n'éxiste pas. """ if path_file_hadoop is None: path_file_hadoop = "./" if if_exist: # Check si le réperoire existe path_hadoop = '/'.join(path_file_hadoop.split('/')[:-1])+'/' exist = ( self.exe_cmd(f"hadoop fs -ls {path_hadoop}", False) .check_cmd() ) if exist != '0': logging.warning( f"Le répertoire {path_hadoop} sur la cellule Hadoop " "n'existe pas. Création de cette dernière." ) self.exe_cmd(f"hadoop fs -mkdir {path_hadoop}", False) # Check si le fichier existe file_name = path_file_hadoop.split('/')[-1] if file_name == '': file_name = path_file_server.split('/')[-1] exist = ( self.exe_cmd(f"hadoop fs -ls {path_hadoop}{file_name}", False) .check_cmd() ) if exist == '0': logging.warning( f"Le fichier {file_name} existe déjà. " "Suppression de ce dernier." ) self.exe_cmd(f"hadoop fs -rmr {path_hadoop}{file_name}", False) query = f"hadoop fs -put {path_file_server} {path_file_hadoop}" self.exe_cmd(query, verbose) return self
[docs] def hadoop_getmerge(self, path_file_hadoop: str, path_file_server: Optional[str]=None, check_is_directory: bool=False, verbose: Optional[bool]=None): r""" Permet de faire un getmerge de la cellule Hadoop vers un répertoire du serveur distant. Parameters ---------- path_file_hadoop : str chemin vers le répertoire Hadoop où sont stockés tous les segments d'un même fichier. Fonctionne également si le chemin pointe vers un simple fichier. path_file_server : Optional[str] chemin vers le fichier du serveur distant vers le quel seront fusionné tous les sergments. check_is_directory : bool check si le chemin entré est un répertoire ou un fichier (par défaut False). Pour une exécution plus rapide sans passer par ce test finir le path_file_or_folder_hadoop par "/*" si on pointe vers un répertoire évitant ainsi ce test. verbose : Optional[bool] affiche le retour de commandes. Notes ----- Il est déconseillé généralement de passer par cette méthode de récupération de donnée si le fichier en trop volumineux pouvant saturer très rapidement l'espace disque du serveur distant. Pour les gros fichier il est conseiller de stocker sur le répertoire staging de la cellule et de passer par get_url_staging qui permet de créer le chemin URL du fichier et d'utiliser :func:`~dstk.ssh._utils.download_file`. Celà permet de passer de Hadoop vers un machine local sans passer par le serveur distant. See also -------- get_url_staging, :class:`~dstk.ssh._utils.download_file` """ siz_file = 0 if check_is_directory: query = f"hadoop fs -ls {path_file_hadoop}/" self.exe_cmd(query, False) siz_file = int( list( filter( lambda x: len(x) > 0, self.before().split('\n')[1].split(' ') ) )[4] ) query = f"hadoop fs -getmerge {path_file_hadoop}" if siz_file > 0 and check_is_directory: query += "/*" query = ( query if path_file_server is None else query + f" {path_file_server}" ) self.exe_cmd(query, verbose) return self
[docs] def get_url_staging(self, path_staging: str, file_name: Optional[str]=None, content_length: bool=True): r""" Méthode permettant de récuper l'URL d'un fichier (ou repertoire si le fichier est segmenté) sur le staging de la cellule Hadoop. Parameters ---------- path_staging : str chemin vers le répertoire du staging. file_name : Optional[str] nom du fichier que l'on souhaite récupérer (attention si None descend l'ensemble de répertoire). content_length : bool Permet d'intégrer la taille du fichier dans l'URL permettant d'avoir une bare de progression du téléchargement de la donnée via :class:`~dstk.ssh._utils.download_file`. Notes ----- Cette méthode est à utiliser avec le fonction :func:`~dstk.ssh._utils.download_file` qui, a partir de l'URL généré, télécharger la donnée depuis la cellule Hadoop directement vers l'ordinateur local. See also -------- :class:`~dstk.ssh._utils.download_file` """ if not path_staging.startswith("/hdfs/staging/out/"): path = f"/hdfs/staging/out/{path_staging}" else: path = path_staging info = self.exe_cmd(f"hadoop fs -ls {path}").before() if self.check_cmd() != '0': raise IOError(f"Fichier {path} inexistant.") url = ( "http://hadoop-manny-staging.s.arkea.com:8080/hadoop-staging/get" + path ) if file_name is not None or content_length: url += "?" if file_name is not None: url += f"file-name={file_name}" if content_length: info = list( map( lambda x: list( filter( lambda y: len(y) > 0, x.split(' ') ) ), filter( lambda x: len(x) > 0, info.split("\n") ) ) ) length = reduce(lambda x,y: int(y[4])+x, info[1:], 0) if file_name is not None: url += "&" url += f"content-length={length}" return url
[docs] def cp(self, file_path_local: str, file_path_server: Optional[str]=None, recursive: bool=False, if_exist: bool=True, verbose: Optional[bool]=None): r""" Méthode SCP (Secure CoPy) d'une machine local vers un serveur distant. Parameters ---------- file_path_local : str chemin vers un fichier ou un répertoire local. file_path_server : Optional[str] chemin vers un répertoire sur le serveur distant. recursive : bool si file_path_local pointe vers un répertoire, permet de copié récursivement les fichiers et répertoires contenu dans le répertoire vers le serveur distant. if_exist : bool permet de tester si le répertoire sur le serveur distant existe, s'il n'éxiste pas le crée. verbose : Optional[bool] affiche le retour de commandes. """ verbose = verbose if verbose is not None else self.verbose option = "rp" if recursive else "p" if file_path_server is None: path_server = "./" else: path_server = file_path_server if if_exist: dir_server = dirname(file_path_server) exist = self.exe_cmd(f"ls {dir_server}").check_cmd() if exist == '2': logging.warning( f"Le répertoire {dir_server} n'existe pas. " "Création de ce dernier." ) self.exe_cmd(f"mkdir {dir_server}") if isinstance(file_path_local, list): dquote = '"' file_path_local = f"{dquote}{' '.join(['a', 'b'])}{dquote}" path_server = dirname(file_path_server) query = [ "scp", f"-{option}", file_path_local, f"{self.username}@{self.hostname}:{path_server}" ] output = sp_run(query) if verbose: print(output.stdout) return self
[docs] def logout(self): r""" Déconnexion du client SSH. """ self.conn.logout()