import functools
import logging
import random
import time
import warnings
from copy import copy
from typing import Dict, Optional, Tuple, Union, cast, List, Any
import lightgbm as lgb
import pandas as pd
import pyspark.sql.functions as sf
from lightautoml.ml_algo.tuning.base import Distribution, SearchSpace
from lightautoml.pipelines.selection.base import ImportanceEstimator
from lightautoml.utils.timer import TaskTimer
from lightautoml.validation.base import TrainValidIterator
from lightgbm import Booster
from pandas import Series
from pyspark.ml import Transformer, PipelineModel
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.util import MLWritable, MLReadable, MLWriter
from synapse.ml.lightgbm import (
LightGBMClassifier,
LightGBMRegressor,
LightGBMRegressionModel,
LightGBMClassificationModel,
)
from synapse.ml.onnx import ONNXModel
from sparklightautoml.dataset.base import SparkDataset
from sparklightautoml.dataset.roles import NumericVectorOrArrayRole
from sparklightautoml.ml_algo.base import SparkTabularMLAlgo, SparkMLModel, AveragingTransformer, \
ComputationalParameters
from sparklightautoml.mlwriters import (
LightGBMModelWrapperMLReader,
LightGBMModelWrapperMLWriter,
ONNXModelWrapperMLReader,
ONNXModelWrapperMLWriter,
)
from sparklightautoml.transformers.base import (
DropColumnsTransformer,
PredictionColsTransformer,
ProbabilityColsTransformer,
)
from sparklightautoml.utils import SparkDataFrame
from sparklightautoml.validation.base import SparkBaseTrainValidIterator, split_out_val
logger = logging.getLogger(__name__)
[docs]class LightGBMModelWrapper(Transformer, MLWritable, MLReadable):
"""Simple wrapper for `synapse.ml.lightgbm.[LightGBMRegressionModel|LightGBMClassificationModel]`
to fix issue with loading model from saved composite pipeline.
For more details see: https://github.com/microsoft/SynapseML/issues/614.
"""
def __init__(self, model: Union[LightGBMRegressionModel, LightGBMClassificationModel] = None) -> None:
super().__init__()
self.model = model
def write(self) -> MLWriter:
return LightGBMModelWrapperMLWriter(self)
[docs] @classmethod
def read(cls):
"""Returns an MLReader instance for this class."""
return LightGBMModelWrapperMLReader()
def _transform(self, dataset: SparkDataFrame) -> SparkDataFrame:
return self.model.transform(dataset)
[docs]class ONNXModelWrapper(Transformer, MLWritable, MLReadable):
"""Simple wrapper for `ONNXModel` to fix issue with loading model from saved composite pipeline.
For more details see: https://github.com/microsoft/SynapseML/issues/614.
"""
def __init__(self, model: ONNXModel = None) -> None:
super().__init__()
self.model = model
def write(self) -> MLWriter:
return ONNXModelWrapperMLWriter(self)
[docs] @classmethod
def read(cls):
"""Returns an MLReader instance for this class."""
return ONNXModelWrapperMLReader()
def _transform(self, dataset: SparkDataFrame) -> SparkDataFrame:
return self.model.transform(dataset)
[docs]class SparkBoostLGBM(SparkTabularMLAlgo, ImportanceEstimator):
"""Gradient boosting on decision trees from LightGBM library.
default_params: All available parameters listed in synapse.ml documentation:
- https://mmlspark.blob.core.windows.net/docs/0.9.5/pyspark/synapse.ml.lightgbm.html#module-synapse.ml.lightgbm.LightGBMClassifier
- https://mmlspark.blob.core.windows.net/docs/0.9.5/pyspark/synapse.ml.lightgbm.html#module-synapse.ml.lightgbm.LightGBMRegressor
freeze_defaults:
- ``True`` : params may be rewritten depending on dataset.
- ``False``: params may be changed only manually or with tuning.
timer: :class:`~lightautoml.utils.timer.Timer` instance or ``None``.
"""
_name: str = "LightGBM"
_default_params = {
"learningRate": 0.05,
"numLeaves": 128,
"featureFraction": 0.7,
"baggingFraction": 0.7,
"baggingFreq": 1,
"maxDepth": -1,
"minGainToSplit": 0.0,
"maxBin": 255,
"minDataInLeaf": 5,
# e.g. num trees
"numIterations": 3000,
"earlyStoppingRound": 50,
# for regression
"alpha": 1.0,
"lambdaL1": 0.0,
"lambdaL2": 0.0,
}
# mapping between metric name defined via SparkTask
# and metric names supported by LightGBM
_metric2lgbm = {
"binary": {"auc": "auc", "aupr": "areaUnderPR"},
"reg": {
"r2": "rmse",
"mse": "mse",
"mae": "mae",
},
"multiclass": {"crossentropy": "cross_entropy"},
}
def __init__(
self,
default_params: Optional[dict] = None,
freeze_defaults: bool = True,
timer: Optional[TaskTimer] = None,
optimization_search_space: Optional[dict] = None,
use_single_dataset_mode: bool = True,
max_validation_size: int = 10_000,
chunk_size: int = 4_000_000,
convert_to_onnx: bool = False,
mini_batch_size: int = 5000,
seed: int = 42,
parallelism: int = 1,
use_barrier_execution_mode: bool = False,
experimental_parallel_mode: bool = False,
persist_output_dataset: bool = True,
computations_settings: Optional[ComputationalParameters] = None
):
optimization_search_space = optimization_search_space if optimization_search_space else dict()
SparkTabularMLAlgo.__init__(self, default_params, freeze_defaults,
timer, optimization_search_space, persist_output_dataset, computations_settings)
self._probability_col_name = "probability"
self._prediction_col_name = "prediction"
self._raw_prediction_col_name = "raw_prediction"
self._assembler = None
self._drop_cols_transformer = None
self._use_single_dataset_mode = use_single_dataset_mode
self._max_validation_size = max_validation_size
self._seed = seed
self._models_feature_importances = []
self._chunk_size = chunk_size
self._convert_to_onnx = convert_to_onnx
self._mini_batch_size = mini_batch_size
self._parallelism = parallelism
self._use_barrier_execution_mode = use_barrier_execution_mode
self._experimental_parallel_mode = experimental_parallel_mode
def _infer_params(self, runtime_settings: Optional[Dict[str, Any]] = None) -> Tuple[dict, int]:
"""Infer all parameters in lightgbm format.
Returns:
Tuple (params, num_trees, early_stopping_rounds, verbose_eval, fobj, feval).
About parameters: https://lightgbm.readthedocs.io/en/latest/_modules/lightgbm/engine.html
"""
assert self.task is not None
# TODO: PARALLEL - validate runtime settings
task = self.task.name
params = copy(self.params)
if "isUnbalance" in params:
params["isUnbalance"] = True if params["isUnbalance"] == 1 else False
verbose_eval = 1
if task == "reg":
params["objective"] = "regression"
params["metric"] = self._metric2lgbm[task].get(self.task.metric_name, None)
elif task == "binary":
params["objective"] = "binary"
params["metric"] = self._metric2lgbm[task].get(self.task.metric_name, None)
elif task == "multiclass":
params["objective"] = "multiclass"
params["metric"] = "multiclass"
else:
raise ValueError(f"Unsupported task type: {task}")
if task != "reg":
if "alpha" in params:
del params["alpha"]
if "lambdaL1" in params:
del params["lambdaL1"]
if "lambdaL2" in params:
del params["lambdaL2"]
runtime_settings = runtime_settings or dict()
if 'num_tasks' in runtime_settings:
params["numTasks"] = runtime_settings['num_tasks']
if 'num_threads' in runtime_settings:
params["numThreads"] = runtime_settings['num_threads']
return params, verbose_eval
def init_params_on_input(self, train_valid_iterator: TrainValidIterator) -> dict:
self.task = train_valid_iterator.train.task
sds = cast(SparkDataset, train_valid_iterator.train)
rows_num = sds.data.count()
task = train_valid_iterator.train.task.name
suggested_params = copy(self.default_params)
if self.freeze_defaults:
# if user change defaults manually - keep it
return suggested_params
if task == "reg":
suggested_params = {
"learningRate": 0.05,
"numLeaves": 32,
"featureFraction": 0.9,
"baggingFraction": 0.9,
}
if rows_num <= 10000:
init_lr = 0.01
ntrees = 3000
es = 200
elif rows_num <= 20000:
init_lr = 0.02
ntrees = 3000
es = 200
elif rows_num <= 100000:
init_lr = 0.03
ntrees = 2000
es = 200
elif rows_num <= 300000:
init_lr = 0.04
ntrees = 2000
es = 100
else:
init_lr = 0.05
ntrees = 2000
es = 100
if rows_num > 300000:
suggested_params["numLeaves"] = 128 if task == "reg" else 244
elif rows_num > 100000:
suggested_params["numLeaves"] = 64 if task == "reg" else 128
elif rows_num > 50000:
suggested_params["numLeaves"] = 32 if task == "reg" else 64
# params['reg_alpha'] = 1 if task == 'reg' else 0.5
elif rows_num > 20000:
suggested_params["numLeaves"] = 32 if task == "reg" else 32
suggested_params["alpha"] = 0.5 if task == "reg" else 0.0
elif rows_num > 10000:
suggested_params["numLeaves"] = 32 if task == "reg" else 64
suggested_params["alpha"] = 0.5 if task == "reg" else 0.2
elif rows_num > 5000:
suggested_params["numLeaves"] = 24 if task == "reg" else 32
suggested_params["alpha"] = 0.5 if task == "reg" else 0.5
else:
suggested_params["numLeaves"] = 16 if task == "reg" else 16
suggested_params["alpha"] = 1 if task == "reg" else 1
suggested_params["learningRate"] = init_lr
suggested_params["numIterations"] = ntrees
suggested_params["earlyStoppingRound"] = es
if task != "reg":
if "alpha" in suggested_params:
del suggested_params["alpha"]
return suggested_params
def _get_default_search_spaces(self, suggested_params: Dict, estimated_n_trials: int) -> Dict:
"""Train on train dataset and predict on holdout dataset.
Args:.
suggested_params: suggested params
estimated_n_trials: Number of trials.
Returns:
Target predictions for valid dataset.
"""
assert self.task is not None
optimization_search_space = dict()
optimization_search_space["featureFraction"] = SearchSpace(
Distribution.UNIFORM,
low=0.5,
high=1.0,
)
optimization_search_space["numLeaves"] = SearchSpace(
Distribution.INTUNIFORM,
low=4,
high=255,
)
if self.task.name == "binary" or self.task.name == "multiclass":
optimization_search_space["isUnbalance"] = SearchSpace(Distribution.DISCRETEUNIFORM, low=0, high=1, q=1)
if estimated_n_trials > 30:
optimization_search_space["baggingFraction"] = SearchSpace(
Distribution.UNIFORM,
low=0.5,
high=1.0,
)
optimization_search_space["minSumHessianInLeaf"] = SearchSpace(
Distribution.LOGUNIFORM,
low=1e-3,
high=10.0,
)
if estimated_n_trials > 100:
if self.task.name == "reg":
optimization_search_space["alpha"] = SearchSpace(
Distribution.LOGUNIFORM,
low=1e-8,
high=10.0,
)
optimization_search_space["lambdaL1"] = SearchSpace(
Distribution.LOGUNIFORM,
low=1e-8,
high=10.0,
)
return optimization_search_space
def predict_single_fold(self, dataset: SparkDataset, model: PipelineModel) -> SparkDataFrame:
return model.transform(dataset.data)
def _do_convert_to_onnx(self, train: SparkDataset, ml_model):
logger.info("Model convert is started")
booster_model_str = ml_model.getLightGBMBooster().modelStr().get()
booster = lgb.Booster(model_str=booster_model_str)
model_payload_ml = self._convert_model(booster, len(train.features))
onnx_ml = ONNXModel().setModelPayload(model_payload_ml)
if train.task.name == "reg":
onnx_ml = (
onnx_ml.setDeviceType("CPU")
.setFeedDict({"input": f"{self._name}_vassembler_features"})
.setFetchDict({ml_model.getPredictionCol(): "variable"})
.setMiniBatchSize(self._mini_batch_size)
)
else:
onnx_ml = (
onnx_ml.setDeviceType("CPU")
.setFeedDict({"input": f"{self._name}_vassembler_features"})
.setFetchDict({ml_model.getProbabilityCol(): "probabilities", ml_model.getPredictionCol(): "label"})
.setMiniBatchSize(self._mini_batch_size)
)
logger.info("Model convert is ended")
return onnx_ml
def fit_predict_single_fold(self,
fold_prediction_column: str,
validation_column: str,
train: SparkDataset,
runtime_settings: Optional[Dict[str, Any]] = None) \
-> Tuple[SparkMLModel, SparkDataFrame, str]:
if self.task is None:
self.task = train.task
(params, verbose_eval) = self._infer_params(runtime_settings)
logger.info(f"Input cols for the vector assembler: {train.features}")
logger.info(f"Running lgb with the following params: {params}")
if train.task.name in ["binary", "multiclass"]:
params["rawPredictionCol"] = self._raw_prediction_col_name
params["probabilityCol"] = fold_prediction_column
params["predictionCol"] = self._prediction_col_name
params["isUnbalance"] = True
else:
params["predictionCol"] = fold_prediction_column
assert validation_column in train.data.columns, \
f"Validation column {validation_column} should be present in the data"
full_data = self._ensure_validation_size(train.data, validation_column)
# prepare assembler
if self._assembler is None:
self._assembler = VectorAssembler(
inputCols=train.features,
outputCol=f"{self._name}_vassembler_features",
handleInvalid="keep"
)
# assign a random port to decrease chances of allocating the same port from multiple instances
rand = random.Random(time.time_ns())
random_port = rand.randint(10_000, 50_000)
run_params = {
'featuresCol': self._assembler.getOutputCol(),
'labelCol': train.target_column,
'validationIndicatorCol': validation_column,
'verbosity': verbose_eval,
'useSingleDatasetMode': self._use_single_dataset_mode,
'useBarrierExecutionMode': self._use_barrier_execution_mode,
'isProvideTrainingMetric': True,
'chunkSize': self._chunk_size,
'defaultListenPort': random_port,
**params,
**({'alpha': 0.5, 'lambdaL1': 0.0, 'lambdaL2': 0.0} if train.task.name == "reg" else dict())
}
# build the booster
lgbm_booster = LightGBMRegressor if train.task.name == "reg" else LightGBMClassifier
lgbm = lgbm_booster(**run_params)
logger.info(f"Use single dataset mode: {lgbm.getUseSingleDatasetMode()}. NumThreads: {lgbm.getNumThreads()}")
logger.info(f"All lgbm booster params: {run_params}")
# fitting the model
ml_model = lgbm.fit(self._assembler.transform(full_data))
# handle the model
ml_model = self._do_convert_to_onnx(train, ml_model) if self._convert_to_onnx else ml_model
self._models_feature_importances.append(ml_model.getFeatureImportances(importance_type="gain"))
valid_data = split_out_val(full_data, validation_column)
# predict validation
val_pred = ml_model.transform(self._assembler.transform(valid_data))
val_pred = DropColumnsTransformer(
remove_cols=[],
optional_remove_cols=[self._prediction_col_name, self._probability_col_name, self._raw_prediction_col_name],
).transform(val_pred)
return ml_model, val_pred, fold_prediction_column
def fit(self, train_valid: SparkBaseTrainValidIterator):
logger.info("Starting LGBM fit")
self.fit_predict(train_valid)
logger.info("Finished LGBM fit")
def get_features_score(self) -> Series:
imp = functools.reduce(lambda acc, x: acc + pd.Series(x), self._models_feature_importances, 0)
# imp = 0
# for model_feature_impotances in self._models_feature_importances:
# imp += pd.Series(model_feature_impotances)
imp /= len(self._models_feature_importances)
def flatten_features(feat: str):
role = self.input_roles[feat]
if isinstance(role, NumericVectorOrArrayRole):
return [f"{feat}_pos_{i}" for i in range(role.size)]
return [feat]
index = [
ff
for feat in self._assembler.getInputCols()
for ff in flatten_features(feat)
]
result = Series(list(imp), index=index).sort_values(ascending=False)
return result
@staticmethod
def _convert_model(lgbm_model: Booster, input_size: int) -> bytes:
from onnxmltools.convert import convert_lightgbm
from onnxconverter_common.data_types import FloatTensorType
initial_types = [("input", FloatTensorType([-1, input_size]))]
onnx_model = convert_lightgbm(lgbm_model, initial_types=initial_types, target_opset=9)
return onnx_model.SerializeToString()
def _ensure_validation_size(self, full_data: SparkDataFrame, validation_column: str) -> SparkDataFrame:
# reduce validation size if it is too big
val_data_size = full_data.where(sf.col(validation_column).astype('int') == 1).count()
if val_data_size > self._max_validation_size:
logger.warning(f"Too big validation fold: {val_data_size}. "
f"Reducing its size down according to max_validation_size setting:"
f" {self._max_validation_size}")
full_data = full_data.where(
(sf.col(validation_column) != sf.lit(1)) |
(sf.rand(seed=self._seed) < sf.lit(self._max_validation_size / val_data_size))
)
# checking if there are no empty partitions that may lead to hanging
rows = (
full_data
.withColumn("__partition_id__", sf.spark_partition_id())
.groupby("__partition_id__").agg(
sf.sum(validation_column).alias("val_values"),
sf.count("*").alias("all_values")
)
.collect()
)
for row in rows:
if row["val_values"] == row["all_values"] or row["all_values"] == 0:
warnings.warn(f"Empty partition encountered: partition id - {row['__partition_id_']},"
f"validation values count in the partition - {row['val_values']}, "
f"all values count in the partition - {row['all_values']}")
raise ValueError(f"Empty partition encountered: partition id - {row['__partition_id_']},"
f"validation values count in the partition - {row['val_values']}, "
f"all values count in the partition - {row['all_values']}")
return full_data
def _build_transformer(self) -> Transformer:
avr = self._build_averaging_transformer()
if self._convert_to_onnx:
wrapped_models = [ONNXModelWrapper(m) for m in self.models]
else:
wrapped_models = [LightGBMModelWrapper(m) for m in self.models]
models: List[Transformer] = [
el
for m in wrapped_models
for el in [
m,
DropColumnsTransformer(
remove_cols=[],
optional_remove_cols=[
self._prediction_col_name,
self._probability_col_name,
self._raw_prediction_col_name,
],
),
]
]
if self._convert_to_onnx:
if self.task.name in ["binary", "multiclass"]:
models.append(
ProbabilityColsTransformer(
probability_cols=self._models_prediction_columns, num_classes=self.n_classes
)
)
else:
models.append(PredictionColsTransformer(prediction_cols=self._models_prediction_columns))
averaging_model = PipelineModel(stages=[
self._assembler,
*models,
avr,
self._build_vector_size_hint(self.prediction_feature, self.prediction_role)
])
return averaging_model
def _build_averaging_transformer(self) -> Transformer:
avr = AveragingTransformer(
self.task.name,
input_cols=self._models_prediction_columns,
output_col=self.prediction_feature,
remove_cols=[self._assembler.getOutputCol(), *self._models_prediction_columns],
convert_to_array_first=not (self.task.name == "reg"),
dim_num=self.n_classes,
)
return avr
[docs] def fit_predict(self, train_valid_iterator: SparkBaseTrainValidIterator) -> SparkDataset:
"""Fit and then predict accordig the strategy that uses train_valid_iterator.
If item uses more then one time it will
predict mean value of predictions.
If the element is not used in training then
the prediction will be ``numpy.nan`` for this item
Args:
train_valid_iterator: Classic cv-iterator.
Returns:
Dataset with predicted values.
"""
logger.info("Starting LGBM fit")
self.timer.start()
res = super().fit_predict(train_valid_iterator)
logger.info("Finished LGBM fit")
return res