Source code for schrodinger.application.matsci.mlearn.sklearn_json.regression

"""
# Third-party code. No Schrodinger Copyright.
"""

import numpy as np
import scipy as sp
from sklearn import dummy
from sklearn.ensemble import GradientBoostingRegressor
from sklearn.ensemble import RandomForestRegressor
from sklearn.ensemble import _gb_losses
from sklearn.linear_model import Lasso
from sklearn.linear_model import LinearRegression
from sklearn.linear_model import Ridge
from sklearn.neural_network import MLPRegressor
from sklearn.svm import SVR
from sklearn.tree import DecisionTreeRegressor
from sklearn.tree._tree import Tree

from . import common
from . import csr


def serialize_linear_regressor(model):
    """
    Serialize a fitted linear regression model into a plain dictionary.

    :param model: fitted estimator exposing `coef_`, `intercept_` and
        `get_params()`
    :return: dict with the keys 'meta', 'coef_', 'intercept_' and 'params'
    """
    return {
        'meta': 'linear-regression',
        'coef_': model.coef_.tolist(),
        # intercept_ is not guaranteed to be a numpy object; going through
        # np.asarray also accepts a plain Python float, which has no
        # .tolist() method of its own.
        'intercept_': np.asarray(model.intercept_).tolist(),
        'params': model.get_params(),
    }
def deserialize_linear_regressor(model_dict):
    """
    Rebuild a `LinearRegression` estimator from its serialized dictionary.

    :param model_dict: dict holding the keys 'params', 'coef_' and
        'intercept_'
    :return: `LinearRegression` instance with `coef_` and `intercept_`
        restored as numpy arrays
    """
    regressor = LinearRegression(**model_dict['params'])
    for attr_name in ('coef_', 'intercept_'):
        setattr(regressor, attr_name, np.array(model_dict[attr_name]))
    return regressor
def serialize_lasso_regressor(model):
    """
    Serialize a fitted Lasso model into a plain dictionary.

    `n_iter_` and `intercept_` may each be a scalar or an array; both are
    normalized through `np.asarray(...).tolist()`, which yields a plain
    Python scalar for scalar input and a (nested) list for array input.

    :param model: fitted estimator exposing `coef_`, `n_iter_`,
        `intercept_` and `get_params()`
    :return: dict with the keys 'meta', 'coef_', 'params', 'n_iter_' and
        'intercept_'
    """
    serialized_model = {
        'meta': 'lasso-regression',
        'coef_': model.coef_.tolist(),
        'params': model.get_params(),
    }
    serialized_model['n_iter_'] = np.asarray(model.n_iter_).tolist()
    # The previous implementation inspected n_iter_ to decide how to store
    # intercept_, so a plain float intercept ended up on the .tolist() path
    # and raised AttributeError.
    serialized_model['intercept_'] = np.asarray(model.intercept_).tolist()
    return serialized_model
def deserialize_lasso_regressor(model_dict):
    """
    Rebuild a `Lasso` estimator from its serialized dictionary.

    List values for 'n_iter_' and 'intercept_' are restored as numpy
    arrays; anything else is coerced to `int` / `float` respectively.

    :param model_dict: dict holding the keys 'params', 'coef_', 'n_iter_'
        and 'intercept_'
    :return: `Lasso` instance with the fitted attributes restored
    """
    regressor = Lasso(**model_dict['params'])
    regressor.coef_ = np.array(model_dict['coef_'])

    stored_n_iter = model_dict['n_iter_']
    regressor.n_iter_ = (np.array(stored_n_iter) if isinstance(
        stored_n_iter, list) else int(stored_n_iter))

    stored_intercept = model_dict['intercept_']
    regressor.intercept_ = (np.array(stored_intercept) if isinstance(
        stored_intercept, list) else float(stored_intercept))

    return regressor
def serialize_ridge_regressor(model):
    """
    Serialize a fitted Ridge model into a plain dictionary.

    'n_iter_' is only written when the model's `n_iter_` is not None.
    `intercept_` may be a scalar or an array and is normalized through
    `np.asarray(...).tolist()`.

    :param model: fitted estimator exposing `coef_`, `n_iter_`,
        `intercept_` and `get_params()`
    :return: dict with the keys 'meta', 'coef_', 'params', 'intercept_'
        and, when available, 'n_iter_'
    """
    serialized_model = {
        'meta': 'ridge-regression',
        'coef_': model.coef_.tolist(),
        'params': model.get_params(),
    }
    # Compare against None explicitly: truth-testing a multi-element numpy
    # array raises ValueError.
    if model.n_iter_ is not None:
        serialized_model['n_iter_'] = np.asarray(model.n_iter_).tolist()
    # The previous implementation inspected n_iter_ to decide how to store
    # intercept_, so a plain float intercept raised AttributeError.
    serialized_model['intercept_'] = np.asarray(model.intercept_).tolist()
    return serialized_model
def deserialize_ridge_regressor(model_dict):
    """
    Rebuild a `Ridge` estimator from its serialized dictionary.

    'n_iter_' is optional in the payload. A list 'intercept_' is restored
    as a numpy array, anything else is coerced to `float`.

    :param model_dict: dict holding the keys 'params', 'coef_',
        'intercept_' and optionally 'n_iter_'
    :return: `Ridge` instance with the fitted attributes restored
    """
    regressor = Ridge(**model_dict['params'])
    regressor.coef_ = np.array(model_dict['coef_'])

    if 'n_iter_' in model_dict:
        regressor.n_iter_ = np.array(model_dict['n_iter_'])

    stored_intercept = model_dict['intercept_']
    if isinstance(stored_intercept, list):
        regressor.intercept_ = np.array(stored_intercept)
    else:
        regressor.intercept_ = float(stored_intercept)

    return regressor
def serialize_svr(model):
    """
    Serialize a fitted support vector regressor into a plain dictionary.

    `support_vectors_`, `dual_coef_` and `_dual_coef_` are each stored
    through `csr.serialize_csr_matrix` when they are scipy CSR matrices and
    as nested lists when they are numpy arrays; any other type is skipped.

    :param model: fitted estimator exposing the attributes read below and
        `get_params()`
    :return: dict describing the model
    """
    serialized_model = {'meta': 'svr'}

    # Attributes that are always converted with .tolist()
    for array_attr in ('class_weight_', 'support_', 'n_support_',
                       'intercept_', 'probA_', 'probB_', '_intercept_'):
        serialized_model[array_attr] = getattr(model, array_attr).tolist()

    serialized_model['shape_fit_'] = model.shape_fit_
    serialized_model['_gamma'] = model._gamma
    serialized_model['params'] = model.get_params()

    # Attributes that can be either sparse (CSR) or dense
    for matrix_attr in ('support_vectors_', 'dual_coef_', '_dual_coef_'):
        matrix = getattr(model, matrix_attr)
        if isinstance(matrix, sp.sparse.csr_matrix):
            serialized_model[matrix_attr] = csr.serialize_csr_matrix(matrix)
        elif isinstance(matrix, np.ndarray):
            serialized_model[matrix_attr] = matrix.tolist()

    return serialized_model
def deserialize_svr(model_dict):
    """
    Rebuild an `SVR` estimator from its serialized dictionary.

    Values tagged with ``'meta' == 'csr'`` are restored through
    `csr.deserialize_csr_matrix`; all other values are restored as float64
    numpy arrays. `_sparse` is set according to how 'support_vectors_' was
    stored.

    :param model_dict: dict holding the keys read below
    :return: `SVR` instance with the fitted attributes restored
    """

    def _stored_as_csr(payload):
        # True when the payload carries the CSR tag
        return 'meta' in payload and payload['meta'] == 'csr'

    svr_model = SVR(**model_dict['params'])
    svr_model.shape_fit_ = model_dict['shape_fit_']
    svr_model._gamma = model_dict['_gamma']

    # Plain array attributes and the dtype each is cast to
    typed_attrs = (
        ('class_weight_', np.float64),
        ('support_', np.int32),
        ('n_support_', np.int32),
        ('intercept_', np.float64),
        ('probA_', np.float64),
        ('probB_', np.float64),
        ('_intercept_', np.float64),
    )
    for attr_name, attr_dtype in typed_attrs:
        restored = np.array(model_dict[attr_name]).astype(attr_dtype)
        setattr(svr_model, attr_name, restored)

    stored_vectors = model_dict['support_vectors_']
    if _stored_as_csr(stored_vectors):
        svr_model.support_vectors_ = csr.deserialize_csr_matrix(stored_vectors)
        svr_model._sparse = True
    else:
        svr_model.support_vectors_ = np.array(stored_vectors).astype(
            np.float64)
        svr_model._sparse = False

    for coef_name in ('dual_coef_', '_dual_coef_'):
        stored_coef = model_dict[coef_name]
        if _stored_as_csr(stored_coef):
            coef_value = csr.deserialize_csr_matrix(stored_coef)
        else:
            coef_value = np.array(stored_coef).astype(np.float64)
        setattr(svr_model, coef_name, coef_value)

    return svr_model
def serialize_tree(tree):
    """
    Convert the state of a tree object into list-based data.

    The dict returned by `tree.__getstate__()` is updated in place: its
    'nodes' and 'values' arrays are replaced by their `.tolist()` form.

    :param tree: object whose `__getstate__()` returns a dict with 'nodes'
        and 'values' numpy arrays
    :return: tuple of (state dict, dtype of the original 'nodes' array)
    """
    state = tree.__getstate__()
    # Capture the dtype before 'nodes' is turned into a list
    node_dtype = state['nodes'].dtype
    for array_key in ('nodes', 'values'):
        state[array_key] = state[array_key].tolist()
    return state, node_dtype
def deserialize_tree(tree_dict, n_features, n_classes, n_outputs):
    """
    Rebuild a `Tree` from its serialized state.

    `tree_dict` is modified in place: 'nodes' and 'values' are replaced by
    numpy arrays before the dict is handed to `Tree.__setstate__`.

    :param tree_dict: serialized tree state; must hold the keys 'nodes',
        'values' and 'nodes_dtype' (the per-field formats of 'nodes')
    :param n_features: passed to the `Tree` constructor
    :param n_classes: wrapped in a one-element intp array and passed to
        the `Tree` constructor
    :param n_outputs: passed to the `Tree` constructor
    :return: the restored `Tree`
    """
    # Structured arrays need one tuple per record
    tree_dict['nodes'] = list(map(tuple, tree_dict['nodes']))

    field_names = [
        'left_child', 'right_child', 'feature', 'threshold', 'impurity',
        'n_node_samples', 'weighted_n_node_samples'
    ]
    node_dtype = np.dtype({
        'names': field_names,
        'formats': tree_dict['nodes_dtype']
    })
    tree_dict['nodes'] = np.array(tree_dict['nodes'], dtype=node_dtype)
    tree_dict['values'] = np.array(tree_dict['values'])

    class_counts = np.array([n_classes], dtype=np.intp)
    restored_tree = Tree(n_features, class_counts, n_outputs)
    restored_tree.__setstate__(tree_dict)
    return restored_tree
def serialize_decision_tree_regressor(model):
    """
    Serialize a fitted decision tree regressor into a plain dictionary.

    The per-field dtype strings of the tree's node array are stored under
    ``['tree_']['nodes_dtype']``.

    :param model: fitted estimator exposing `tree_`,
        `feature_importances_`, `max_features_`, `n_features_` and
        `n_outputs_`
    :return: dict describing the model
    """
    tree_state, node_dtype = serialize_tree(model.tree_)
    serialized_model = {
        'meta': 'decision-tree-regression',
        'feature_importances_': model.feature_importances_.tolist(),
        'max_features_': model.max_features_,
        'n_features_': model.n_features_,
        'n_outputs_': model.n_outputs_,
        'tree_': tree_state
    }
    # One dtype string per field of the structured node dtype
    field_formats = [
        node_dtype[field_idx].str for field_idx in range(len(node_dtype))
    ]
    serialized_model['tree_']['nodes_dtype'] = field_formats
    return serialized_model
def deserialize_decision_tree_regressor(model_dict):
    """
    Rebuild a `DecisionTreeRegressor` from its serialized dictionary.

    The estimator is created with default constructor arguments; only
    `max_features_`, `n_features_`, `n_outputs_` and `tree_` are restored.

    :param model_dict: dict holding the keys 'max_features_',
        'n_features_', 'n_outputs_' and 'tree_'
    :return: the restored `DecisionTreeRegressor`
    """
    regressor = DecisionTreeRegressor()
    for attr_name in ('max_features_', 'n_features_', 'n_outputs_'):
        setattr(regressor, attr_name, model_dict[attr_name])

    # The tree is always rebuilt with a class count of 1
    regressor.tree_ = deserialize_tree(model_dict['tree_'],
                                       model_dict['n_features_'], 1,
                                       model_dict['n_outputs_'])
    return regressor
def serialize_dummy_regressor(model):
    """
    Return the attribute dictionary of a dummy regressor.

    Side effects: `model.constant` is overwritten with the list form of
    `model.constant_`, and the returned dict is the model's own `__dict__`
    (not a copy), so later changes to it are visible on the model.

    NOTE(review): `constant_` itself is left untouched in the returned
    dict - confirm that downstream consumers can handle that value.

    :param model: estimator exposing a `constant_` array
    :return: the model's live attribute dictionary
    """
    setattr(model, 'constant', model.constant_.tolist())
    return vars(model)
def serialize_gradient_boosting_regressor(model):
    """
    Serialize a fitted gradient boosting regressor into a plain dictionary.

    The init estimator is stored under 'init_' either as a tagged attribute
    dictionary (for a `DummyRegressor`) or verbatim (for a string). The
    loss object is stored under 'loss_' as one of 'ls', 'lad', 'huber' or
    'quantile'; the key is omitted for any other loss type.

    :param model: fitted estimator exposing `max_features_`, `n_features_`,
        `train_score_`, `estimators_`, `init_`, `loss_` and `get_params()`
    :return: dict describing the model
    """
    serialized_model = {
        'meta': 'gb-regression',
        'max_features_': model.max_features_,
        'n_features_': model.n_features_,
        'train_score_': model.train_score_.tolist(),
        'params': model.get_params(),
        'estimators_shape': list(model.estimators_.shape),
        'estimators_': []
    }

    init_estimator = model.init_
    if isinstance(init_estimator, dummy.DummyRegressor):
        # serialize_dummy_regressor returns the estimator's own __dict__;
        # copy it so that the 'meta' tag is not written onto the live
        # estimator.
        init_state = dict(serialize_dummy_regressor(init_estimator))
        init_state['meta'] = 'dummy'
        serialized_model['init_'] = init_state
    elif isinstance(init_estimator, str):
        serialized_model['init_'] = init_estimator

    # Checked in order; the first matching loss class wins
    loss_tags = (
        (_gb_losses.LeastSquaresError, 'ls'),
        (_gb_losses.LeastAbsoluteError, 'lad'),
        (_gb_losses.HuberLossFunction, 'huber'),
        (_gb_losses.QuantileLossFunction, 'quantile'),
    )
    for loss_class, loss_tag in loss_tags:
        if isinstance(model.loss_, loss_class):
            serialized_model['loss_'] = loss_tag
            break

    # A string init_ (supported above) has no instance __dict__, so fall
    # back to an empty mapping instead of raising AttributeError.
    if 'priors' in getattr(init_estimator, '__dict__', {}):
        serialized_model['priors'] = init_estimator.priors.tolist()

    for tree in model.estimators_.reshape((-1,)):
        serialized_model['estimators_'].append(
            serialize_decision_tree_regressor(tree))

    return serialized_model
def deserialize_gradient_boosting_regressor(model_dict):
    """
    Rebuild a `GradientBoostingRegressor` from its serialized dictionary.

    'init_' may be absent, a string (restored verbatim) or a dictionary
    tagged with ``'meta' == 'dummy'`` (restored as a `DummyRegressor`).
    The 'init_' entry of `model_dict` is not modified, so the same payload
    can be deserialized more than once.

    NOTE(review): the loss objects are re-created with only the positional
    argument ``1``; any loss-specific settings held in 'params' are not
    forwarded to them - confirm whether that matters for callers.

    :param model_dict: dict holding the keys 'params', 'estimators_',
        'estimators_shape', 'train_score_', 'max_features_', 'n_features_',
        'loss_' and optionally 'init_' and 'priors'
    :return: the restored `GradientBoostingRegressor`
    """
    model = GradientBoostingRegressor(**model_dict['params'])

    trees = [
        deserialize_decision_tree_regressor(tree)
        for tree in model_dict['estimators_']
    ]
    model.estimators_ = np.array(trees).reshape(model_dict['estimators_shape'])

    init_state = model_dict.get('init_')
    if isinstance(init_state, str):
        # String init_ values are stored verbatim by the serializer
        model.init_ = init_state
    elif isinstance(init_state, dict) and init_state.get('meta') == 'dummy':
        model.init_ = dummy.DummyRegressor()
        # Build a fresh attribute dict without the 'meta' tag rather than
        # popping it from (and aliasing) the caller's dictionary.
        model.init_.__dict__ = {
            key: value for key, value in init_state.items() if key != 'meta'
        }

    model.train_score_ = np.array(model_dict['train_score_'])
    model.max_features_ = model_dict['max_features_']
    model.n_features_ = model_dict['n_features_']

    # Loss classes are looked up lazily by name so that only the one that
    # is actually needed has to exist in _gb_losses.
    loss_class_names = {
        'ls': 'LeastSquaresError',
        'lad': 'LeastAbsoluteError',
        'huber': 'HuberLossFunction',
        'quantile': 'QuantileLossFunction',
    }
    loss_class_name = loss_class_names.get(model_dict['loss_'])
    if loss_class_name is not None:
        model.loss_ = getattr(_gb_losses, loss_class_name)(1)

    if 'priors' in model_dict:
        model.init_.priors = np.array(model_dict['priors'])

    return model
def serialize_random_forest_regressor(model):
    """
    Serialize a fitted random forest regressor into a plain dictionary.

    'oob_score_' and 'oob_prediction_' are only written when the
    corresponding attribute is present in the model's `__dict__`.

    :param model: fitted estimator exposing the hyperparameters read below,
        `n_features_`, `n_outputs_`, `estimators_` and `get_params()`
    :return: dict describing the model
    """
    serialized_model = {
        'meta': 'rf-regression',
        'max_depth': model.max_depth,
        'min_samples_split': model.min_samples_split,
        'min_samples_leaf': model.min_samples_leaf,
        'min_weight_fraction_leaf': model.min_weight_fraction_leaf,
        'max_features': model.max_features,
        'max_leaf_nodes': model.max_leaf_nodes,
        'min_impurity_decrease': model.min_impurity_decrease,
        'min_impurity_split': model.min_impurity_split,
        'n_features_': model.n_features_,
        'n_outputs_': model.n_outputs_,
        'estimators_': [
            serialize_decision_tree_regressor(decision_tree)
            for decision_tree in model.estimators_
        ],
        'params': model.get_params()
    }
    common.serialize_params_random_state(serialized_model['params'])

    fitted_attrs = vars(model)
    if 'oob_score_' in fitted_attrs:
        serialized_model['oob_score_'] = model.oob_score_
    # Guard on the attribute that is actually read. The previous check
    # looked for 'oob_decision_function_', which did not match the
    # attribute being serialized.
    if 'oob_prediction_' in fitted_attrs:
        serialized_model['oob_prediction_'] = model.oob_prediction_.tolist()

    return serialized_model
def deserialize_random_forest_regressor(model_dict):
    """
    Rebuild a `RandomForestRegressor` from its serialized dictionary.

    ``model_dict['params']`` is first passed through
    `common.deserialize_params_random_state` and then used as the
    constructor arguments.

    :param model_dict: dict holding the keys 'params', 'estimators_',
        'n_features_', 'n_outputs_', the hyperparameter keys copied below
        and optionally 'oob_score_' and 'oob_prediction_'
    :return: the restored `RandomForestRegressor`
    """
    common.deserialize_params_random_state(model_dict['params'])
    forest = RandomForestRegressor(**model_dict['params'])

    forest.estimators_ = np.array([
        deserialize_decision_tree_regressor(tree_dict)
        for tree_dict in model_dict['estimators_']
    ])

    # Values copied one-to-one from the payload onto the estimator
    copied_attrs = ('n_features_', 'n_outputs_', 'max_depth',
                    'min_samples_split', 'min_samples_leaf',
                    'min_weight_fraction_leaf', 'max_features',
                    'max_leaf_nodes', 'min_impurity_decrease',
                    'min_impurity_split')
    for attr_name in copied_attrs:
        setattr(forest, attr_name, model_dict[attr_name])

    if 'oob_score_' in model_dict:
        forest.oob_score_ = model_dict['oob_score_']
    if 'oob_prediction_' in model_dict:
        forest.oob_prediction_ = np.array(model_dict['oob_prediction_'])

    return forest
def serialize_mlp_regressor(model):
    """
    Serialize a fitted MLP regressor into a plain dictionary.

    The per-layer entries of `coefs_` and `intercepts_` are converted to
    nested lists, in line with the other serializers in this module, so
    that the result contains no numpy arrays for those keys.

    :param model: fitted estimator exposing `coefs_`, `loss_`,
        `intercepts_`, `n_iter_`, `n_layers_`, `n_outputs_`,
        `out_activation_` and `get_params()`
    :return: dict describing the model
    """
    return {
        'meta': 'mlp-regression',
        'coefs_': [np.asarray(layer).tolist() for layer in model.coefs_],
        'loss_': model.loss_,
        'intercepts_': [
            np.asarray(layer).tolist() for layer in model.intercepts_
        ],
        'n_iter_': model.n_iter_,
        'n_layers_': model.n_layers_,
        'n_outputs_': model.n_outputs_,
        'out_activation_': model.out_activation_,
        'params': model.get_params(),
    }
def deserialize_mlp_regressor(model_dict):
    """
    Rebuild an `MLPRegressor` from its serialized dictionary.

    Each layer entry of 'coefs_' and 'intercepts_' is converted with
    `np.asarray`, so payloads holding nested lists are restored as numpy
    arrays while entries that already are arrays are kept as they are.

    :param model_dict: dict holding the keys 'params', 'coefs_', 'loss_',
        'intercepts_', 'n_iter_', 'n_layers_', 'n_outputs_' and
        'out_activation_'
    :return: the restored `MLPRegressor`
    """
    model = MLPRegressor(**model_dict['params'])
    model.coefs_ = [np.asarray(layer) for layer in model_dict['coefs_']]
    model.loss_ = model_dict['loss_']
    model.intercepts_ = [
        np.asarray(layer) for layer in model_dict['intercepts_']
    ]
    model.n_iter_ = model_dict['n_iter_']
    model.n_layers_ = model_dict['n_layers_']
    model.n_outputs_ = model_dict['n_outputs_']
    model.out_activation_ = model_dict['out_activation_']
    return model