#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Exemple d'utilisation de la classe de wrapping Scikit-Learn pour PyTorch.
Created on Mon Oct 21 14:42:57 2019
@author: Cyrile Delestre
"""
import numpy as np
import matplotlib.pyplot as plt
import torch
from torch.nn import Module, LSTM
from torch.nn.functional import l1_loss
from torch.optim import SGD
from torch.utils.data import Dataset
from dstk.pytorch.networks import MLP
from dstk.pytorch.metrics import (mean_absolute_error_online,
mean_squared_error_online, reverse_score)
from dstk.pytorch import (BaseRegressorOnline, RandomizedSearchOnline,
check_tensor)
class dataset(Dataset):
r"""
Classe mettant les datas au format adéquat PyTorch.
Attention ! La sortie est un dictionnaire et les clefs du dictionnaire
doivent correspondre aux noms des features utilisés dans le forward de la
classe PyTorch. Idem si une fonction collate_fn est mise en sortie du
DataLeader, il faut que les noms soient cohérents avec le forward de la
classe PyTorch.
"""
def __init__(self, data, target, win_size=1):
self.data = data
self.target = target
self.win_size = win_size
def __len__(self):
return data.shape[0]
def __getitem__(self, idx):
if idx > self.__len__()-1:
raise IndexError()
return {
'data': check_tensor(
self.data[max(0, idx-self.win_size+1):idx+1,:]
.astype(np.float32)
#[np.newaxis,:]
),
'target': check_tensor(
self.target[idx].astype(np.float32)
)
}
class MonRegressorOnline(Module, BaseRegressorOnline):
r"""
Ma classe MonRegressorOnline hérite de Module de PyTorch et de
BaseRegressorOnline qui permet d'avoir un environnement PyTorch compatible
Scikit-Learn. Il faut que la fonction d'initialisation soit compatible
avec le standard Scikit-Learn, donc l'implémentation du réseau ne se fait
pas dans "__init__" mais dans une fonction "build" séparée. Il est
important d'ajouter dans "__init__" l'appel vers la méthode de
construction build permettant d'instancier le réseau. Attention à l'ordre,
l'héritage doit être Module puis BaseRegressorOnline.
Notes
-----
Les fonctions obligatoires à minima :
__init__ :
fonction d'initialisation de la classe. Elle doit être au format
Scikit-Learn, c'est-à-dire que toutes les entrées qui sont
susceptibles d'être modifiées via "set_params()" doivent avoir leur
homonyme en attributs. Penser également à instancier le réseau avec
l'appel à la méthode build. Ce n'est pas obligatoire mais alors il
faudra appelé la méthode soit même pour construire le réseau. Ne pas
oublier de commencer par initialiser la classe mère Module de PyTorch
avec super().__init__().
build :
méthode d'implémentation des éléments du réseau dans PyTorch.
L'attribut "built" doit être mis à True à la fin de cette méthode. Il
y a un attribut qui doit apparaitre ici :
- optimizer : Optionel
Il s'agit de l'optimizer du modèle. Il doit être placé en
attribut et doit être initialisé dans le build si l'une de ses
caractéristiques est susceptible d'être impactée par
"set_params()" (comme le learning rate ou le type d'optimizer,
etc.). Si l'optimizer et ses paramètres sont fixes, alors il
est possible d'initialiser l'attribut "optimizer" dans la
fonction d'initialisation __init__.
forward :
méthode indispensable à la classe mère Module de PyTorch, il s'agit de
l'application forward du réseau. Il y a 2 contraintes à cette méthode :
- args
Les noms des arguments utiles pour le calcul du forward du
modèle (les entrées) doivent posséder les mêmes noms que ceux
présents dans le dictionnaire en sortie de DataLoader, donc en
sortie de générateur de données Dataset et éventuellement de
collate_fn.
- **kargs
Autres arguments éventuellement envoyés par la méthode fit ou
autre qui ne sont pas utiles au calcul du forward. A l'inverse
tout argument indispensable au forward doit être mentionné en
argument de manière explicite.
"""
def __init__(self,
dim_in,
num_layers=1,
hidden_size=16,
dropout_rnn=0,
n_layers=2,
dim_first_lay=16,
embed_topo='linear',
inter_units=10,
alpha=0.3,
dropout_mlp=0,
lr=1e-3,
momentum=0.1,
weight_decay=0):
super().__init__()
self.dim_in = dim_in
self.num_layers = num_layers
self.hidden_size = hidden_size
self.dropout_rnn = dropout_rnn
self.n_layers = n_layers
self.dim_first_lay = dim_first_lay
self.embed_topo = embed_topo
self.inter_units = inter_units
self.alpha = alpha
self.dropout_mlp = dropout_mlp
self.lr = lr
self.momentum = momentum
self.weight_decay = weight_decay
self.build()
def build(self):
self.rnn = LSTM(
input_size=self.dim_in,
hidden_size=self.hidden_size,
num_layers=self.num_layers,
batch_first=True,
dropout=self.dropout_rnn
)
self.mlp = MLP(
dim_in=self.hidden_size,
dim_out=1,
dim_first_lay=self.dim_first_lay,
n_layers=self.n_layers,
embed_topo=self.embed_topo,
inter_units=self.inter_units,
alpha=self.alpha,
dropout_prob=self.dropout_mlp,
batchnorm=False,
activation_last_layer=None,
batchnorm_last_layer=False,
dropout_last_layer=False
)
self.optimizer = SGD(
self.parameters(),
lr=self.lr,
momentum=self.momentum,
dampening=0,
weight_decay=self.weight_decay,
nesterov=True
)
self.hidden_lay = tuple()
def forward(self, data, **kargs):
# Initialisation des hiddens layers suivant une loi Noramle centrée
# réduite.
if len(self.hidden_lay) == 0:
self.hidden_lay = (
torch.randn(self.num_layers, data.shape[0], self.hidden_size),
torch.randn(self.num_layers, data.shape[0], self.hidden_size)
)
lay1, hidden_lay = self.rnn(data, self.hidden_lay)
# Périnisation des hiddens layres d'une itération à l'autre
self.hidden_lay = tuple(ii.detach().clone() for ii in hidden_lay)
return self.mlp(lay1[:,-1,:])
# Définition de l'univers des hyper-paramètres à tester
UNIV_PARAM = dict(
num_layers=[2, 3],
hidden_size=[8, 16, 32],
dropout_rnn=[0.2, 0.5, 0.7, 0.8, 0.9],
n_layers=[2, 3],
dim_first_lay=[2, 4, 16],
embed_topo=["linear", "bottleneck"],
inter_units=[8, 16, 32],
alpha=[0.1, 0.2, 0.3, 0.4, 0.5, 0.6],
dropout_mlp=[0.2, 0.5],
lr=[5e-2, 1e-2, 5e-3, 1e-3],
momentum=[0.01, 0.1, 0.25],
weight_decay=[0, 0.25, 0.5]
)
# Série non stationnaire -> offset + trend + Random walk + saisonality
def non_stationary_ts(size,
horizon=50,
alpha=5,
beta=0.005,
pow_noise=0.1,
pow_sais=1,
freq_sais=0.05):
r"""
Fonction génératrice d'une série temporelle non stationnaire ayant une
composante de offset (alpha), de trend (beta), de marche aléatoire
(pow_noise) et d'une saisonalité sinusoidale (pow_sais et freq_sais) :
y_t = α + β*t + y_{t-1} + pow_sais*sin(freq_sais*t) + pow_noise*ε
avec ε qui suit une loi normale centrée réduite.
"""
rand_walk = np.cumsum(np.random.randn(size)*pow_noise)
trend = np.arange(size)*beta
sais = pow_sais * np.sin(freq_sais*np.arange(size))
ts = alpha+trend+rand_walk+sais
return ts[:-horizon], ts[horizon:]
if __name__=="__main__":
# Génération d'un dataset de regression non stationnaire avec un horizon
# de prédiction de 100 idx
horizon = 100
data, target = non_stationary_ts(1000, horizon)
# Création d'un dataset PyTorch
data_ds = dataset(data.reshape(-1,1), target.reshape(-1,1), win_size=1)
# Chargement du modèle
model_online = MonRegressorOnline(dim_in=1)
# Initialisation et éxécution du RandomizedSearchOnline du module
# :class:`~dstk.pytorch._random_search.RandomizedSearchOnline` avec
# utilisation des métriques spécialisé dans les modèles online
# :mod:`~dstk.pytorch.metrics`.
dict_scor = dict(
MAE=reverse_score(mean_absolute_error_online),
MSE=reverse_score(mean_squared_error_online)
)
model_search = RandomizedSearchOnline(
model_online,
UNIV_PARAM,
n_iter=50,
monte_carlo=5,
scoring=dict_scor,
n_jobs=4,
refit="MAE",
verbose=1
)
# L'entraînement brut s'arrêtre jusqu'à l'itération 100
model_search.fit(
X=data_ds,
y=target,
idx_stop_train=100,
loss_fn=l1_loss,
target_field='target'
)
# Création d'un modèle online vierge avec les meilleurs paramètres du
# random search. (Attention, bien penser à rebuilder)
model_online.set_params(**model_search.best_params_).build()
# Fit online de l'approche avec les bons paramètres
c_torch = model_online.fit_online(
X=data_ds,
predict=True,
loss_fn=l1_loss,
target_field='target'
)
# Visualisation du résultat du modèle
plt.figure()
plt.plot(target)
plt.plot(c_torch)
plt.xlabel('timestamp')
plt.ylabel('magnitude')
plt.legend(['Réalité terrain', 'Estimation'])
plt.title('Hirozon = {}'.format(horizon))
plt.grid()
# Sauvegarde du modèle avec l'état des hidden layers
model_online.save_model('reg_online.pt', attr_list=['hidden_lay']);
# Reload du modèle
model = MonRegressorOnline.load_model('reg_online.pt')