"""Base AutoML class."""
import functools
import logging
import os
from copy import copy
from typing import Any, Callable, Tuple, cast, Union
from typing import Dict
from typing import Iterable
from typing import List
from typing import Optional
from typing import Sequence
from lightautoml.dataset.base import RolesDict
from lightautoml.reader.base import RolesDict
from lightautoml.utils.logging import set_stdout_level, verbosity_to_loglevel
from lightautoml.utils.timer import PipelineTimer
from pyspark.ml import PipelineModel, Transformer
from pyspark.sql.session import SparkSession
from .blend import SparkBlender, SparkBestModelSelector
from ..computations.builder import build_computations_manager
from ..computations.base import ComputationsManager, ComputationsSettings
from ..dataset.base import SparkDataset, PersistenceLevel, PersistenceManager
from ..dataset.persistence import PlainCachePersistenceManager
from ..pipelines.base import TransformerInputOutputRoles
from ..pipelines.features.base import SparkPipelineModel
from ..pipelines.ml.base import SparkMLPipeline
from ..reader.base import SparkToSparkReader
from ..utils import ColumnsSelectorTransformer, SparkDataFrame, get_current_session
from ..validation.base import SparkBaseTrainValidIterator, mark_as_train, mark_as_val
from ..validation.iterators import SparkFoldsIterator, SparkHoldoutIterator, SparkDummyIterator
logger = logging.getLogger(__name__)
# Either path/full url, or pyspark.sql.DataFrame
ReadableIntoSparkDf = Union[str, SparkDataFrame]
[docs]class SparkAutoML(TransformerInputOutputRoles):
"""Class for compile full pipeline of AutoML task.
AutoML steps:
- Read, analyze data and get inner
:class:`~lightautoml.dataset.base.LAMLDataset` from input
dataset: performed by reader.
- Create validation scheme.
- Compute passed ml pipelines from levels.
Each element of levels is list
of :class:`~lightautoml.pipelines.ml.base.MLPipelines`
prediction from current level are passed to next level
pipelines as features.
- Time monitoring - check if we have enough time to calc new pipeline.
- Blend last level models and prune useless pipelines
to speedup inference: performed by blender.
- Returns prediction on validation data.
If crossvalidation scheme is used,
out-of-fold prediction will returned.
If validation data is passed
it will return prediction on validation dataset.
In case of cv scheme when some point of train data
never was used as validation (ex. timeout exceeded
or custom cv iterator like
:class:`~lightautoml.validation.np_iterators.TimeSeriesIterator`
was used) NaN for this point will be returned.
Example:
Common usecase - create custom pipelines or presets.
>>> reader = SparkToSparkReader()
>>> pipe = SparkMLPipeline([SparkMLAlgo()])
>>> levels = [[pipe]]
>>> automl = SparkAutoML(reader, levels, )
>>> automl.fit_predict(data, roles={'target': 'TARGET'})
"""
[docs] def __init__(
self,
reader: Optional[SparkToSparkReader] = None,
levels: Optional[Sequence[Sequence[SparkMLPipeline]]] = None,
timer: Optional[PipelineTimer] = None,
blender: Optional[SparkBlender] = None,
skip_conn: bool = False,
return_all_predictions: bool = False,
computation_settings: Optional[ComputationsSettings] = ("no_parallelism", -1)
):
"""
Args:
reader: Instance of Reader class object that
creates :class:`~lightautoml.dataset.base.LAMLDataset`
from input data.
levels: List of list
of :class:`~lightautoml.pipelines.ml..base.MLPipelines`.
timer: Timer instance of
:class:`~lightautoml.utils.timer.PipelineTimer`.
Default - unlimited timer.
blender: Instance of Blender.
Default - :class:`~lightautoml.automl.blend.BestModelSelector`.
skip_conn: True if we should pass first level
input features to next levels.
Note:
There are several verbosity levels:
- `0`: No messages.
- `1`: Warnings.
- `2`: Info.
- `3`: Debug.
"""
super().__init__()
self.levels: Optional[Sequence[Sequence[SparkMLPipeline]]] = None
self._transformer = None
self._input_roles: Optional[RolesDict] = None
self._output_roles: Optional[RolesDict] = None
self._service_columns: Optional[List[str]] = None
self._persistence_manager: Optional[PersistenceManager] = None
if reader and levels:
self._initialize(reader, levels, timer, blender, skip_conn, return_all_predictions)
self._computations_manager: Optional[ComputationsManager] = \
build_computations_manager(computation_settings)
@property
def input_roles(self) -> Optional[RolesDict]:
return self._input_roles
@property
def output_roles(self) -> Optional[RolesDict]:
return self._output_roles
@property
def persistence_manager(self) -> Optional[PersistenceManager]:
return self._persistence_manager
def transformer(self,
return_all_predictions: bool = False,
add_array_attrs: bool = True,
leave_only_predict_cols: bool = False,
**reader_args) \
-> SparkPipelineModel:
if not return_all_predictions:
blender = [self.blender.transformer()]
output_roles = self.blender.output_roles
else:
blender = []
output_roles = {**(ml_pipe.output_roles for ml_pipe in self.levels[-1])}
sel_tr = ColumnsSelectorTransformer(
name="SparkAutoML",
input_cols=[SparkDataset.ID_COLUMN] + list(output_roles.keys()),
optional_cols=[self.reader.target_col] if self.reader.target_col else [],
)
stages = [
self.reader.transformer(add_array_attrs=add_array_attrs, **reader_args),
*(ml_pipe.transformer() for level in self.levels for ml_pipe in level),
*blender,
*([sel_tr] if leave_only_predict_cols else [])
]
return SparkPipelineModel(stages, input_roles=self.input_roles, output_roles=output_roles)
def _initialize(
self,
reader: SparkToSparkReader,
levels: Sequence[Sequence[SparkMLPipeline]],
timer: Optional[PipelineTimer] = None,
blender: Optional[SparkBlender] = None,
skip_conn: bool = False,
return_all_predictions: bool = False,
):
"""Same as __init__. Exists for delayed initialization in presets.
Args:
reader: Instance of Reader class object that
creates :class:`~lightautoml.dataset.base.LAMLDataset`
from input data.
levels: List of list
of :class:`~lightautoml.pipelines.ml..base.MLPipelines`.
timer: Timer instance of
:class:`~lightautoml.utils.timer.PipelineTimer`.
Default - unlimited timer.
blender: Instance of Blender.
Default - :class:`~lightautoml.automl.blend.BestModelSelector`.
skip_conn: True if we should pass first level
input features to next levels.
return_all_predictions: True if we should return all predictions from last
level models.
"""
assert len(levels) > 0, "At least 1 level should be defined"
self.timer = timer
if timer is None:
self.timer = PipelineTimer()
self.reader = reader
self._levels = levels
# default blender is - select best model and prune other pipes
self.blender = blender
if blender is None:
self.blender = SparkBestModelSelector()
# update model names
for i, lvl in enumerate(self._levels):
for j, pipe in enumerate(lvl):
pipe.upd_model_names("Lvl_{0}_Pipe_{1}".format(i, j))
self.skip_conn = skip_conn
self.return_all_predictions = return_all_predictions
[docs] def fit_predict(
self,
train_data: Any,
roles: dict,
train_features: Optional[Sequence[str]] = None,
cv_iter: Optional[Iterable] = None,
valid_data: Optional[Any] = None,
valid_features: Optional[Sequence[str]] = None,
verbose: int = 0,
persistence_manager: Optional[PersistenceManager] = None
) -> SparkDataset:
"""Fit on input data and make prediction on validation part.
Args:
train_data: Dataset to train.
roles: Roles dict.
train_features: Optional features names,
if cannot be inferred from train_data.
cv_iter: Custom cv iterator. For example,
:class:`~lightautoml.validation.np_iterators.TimeSeriesIterator`.
valid_data: Optional validation dataset.
valid_features: Optional validation dataset
features if can't be inferred from `valid_data`.
verbose: controls verbosity
Returns:
Predicted values.
"""
set_stdout_level(verbosity_to_loglevel(verbose))
self.timer.start()
self._persistence_manager = persistence_manager or PlainCachePersistenceManager()
train_dataset = self.reader.fit_read(train_data, train_features, roles,
persistence_manager=self._persistence_manager)
train_dataset = train_dataset.persist(level=PersistenceLevel.REGULAR)
assert (
len(self._levels) <= 1 or train_dataset.folds_column is not None
), "Not possible to fit more than 1 level without cv folds"
assert (
len(self._levels) <= 1 or valid_data is None
), "Not possible to fit more than 1 level with holdout validation"
if valid_data:
valid_dataset = self.reader.read(valid_data, valid_features, add_array_attrs=True)\
.persist(PersistenceLevel.READER)
else:
valid_dataset = None
train_valid = self._create_validation_iterator(train_dataset, valid_dataset, None, cv_iter=cv_iter)
pipes: List[SparkMLPipeline] = []
self.levels = []
level_ds: Optional[SparkDataset] = None
for leven_number, level in enumerate(self._levels, 1):
pipes = []
flg_last_level = leven_number == len(self._levels)
logger.info(
f"Layer \x1b[1m{leven_number}\x1b[0m train process start. Time left {self.timer.time_left:.2f} secs"
)
with train_valid.frozen() as frozen_train_valid:
pipes, all_pipes_predictions, flg_last_level = self._parallel_level(
level=level,
train_valid_iterator=frozen_train_valid
)
flg_last_level = flg_last_level or leven_number == len(self._levels)
logger.info("\x1b[1mLayer {} training completed.\x1b[0m\n".format(leven_number))
level_ds_name = f"all_piped_predictions_level_{leven_number}"
if flg_last_level:
level_ds = SparkDataset.concatenate(all_pipes_predictions, name=level_ds_name)
train_valid.unpersist()
break
self.levels.append(pipes)
level = [train_valid.get_validation_data(), *all_pipes_predictions] \
if self.skip_conn else all_pipes_predictions
name = f"{level_ds_name}_skip_conn" if self.skip_conn else level_ds_name
level_ds = SparkDataset.concatenate(level, name=name)
train_valid.unpersist(skip_val=self.skip_conn)
train_valid = self._create_validation_iterator(level_ds, None, n_folds=None, cv_iter=None)
blended_prediction, last_pipes = self.blender.fit_predict(level_ds, pipes)
self.levels.append(last_pipes)
del self._levels
oof_pred = level_ds if self.return_all_predictions else blended_prediction
self._input_roles = copy(train_dataset.roles)
self._output_roles = copy(oof_pred.roles)
self._service_columns = train_dataset.service_columns
return oof_pred
[docs] def predict(
self,
data: ReadableIntoSparkDf,
return_all_predictions: Optional[bool] = None,
add_reader_attrs: bool = False,
persistence_manager: Optional[PersistenceManager] = None
) -> SparkDataset:
"""Get dataset with predictions.
Almost same as :meth:`lightautoml.automl.base.AutoML.predict`
on new dataset, with additional features.
Additional features - working with different data formats.
Supported now:
- Path to ``.csv``, ``.parquet``, ``.feather`` files.
- :class:`~numpy.ndarray`, or dict of :class:`~numpy.ndarray`. For example,
``{'data': X...}``. In this case roles are optional,
but `train_features` and `valid_features` required.
- :class:`pandas.DataFrame`.
Parallel inference - you can pass ``n_jobs`` to speedup
prediction (requires more RAM).
Batch_inference - you can pass ``batch_size``
to decrease RAM usage (may be longer).
Args:
data: Dataset to perform inference.
features_names: Optional features names,
if cannot be inferred from `train_data`.
return_all_predictions: if True,
returns all model predictions from last level
add_reader_attrs: if True,
the reader's attributes will be added to the SparkDataset
Returns:
Dataset with predictions.
"""
persistence_manager = persistence_manager or PlainCachePersistenceManager()
automl_model = self.transformer(
return_all_predictions=return_all_predictions,
add_array_attrs=add_reader_attrs,
leave_only_predict_cols=True
)
data = self._read_data(data)
predictions = automl_model.transform(data)
sds = SparkDataset(
data=predictions,
roles=copy(automl_model.get_output_roles()),
task=self.reader.task,
persistence_manager=persistence_manager,
target=self.reader.target_col
)
return sds
[docs] def collect_used_feats(self) -> List[str]:
"""Get feats that automl uses on inference.
Returns:
Features names list.
"""
used_feats = set()
for lvl in self.levels:
for pipe in lvl:
used_feats.update(pipe.used_features)
used_feats = list(used_feats)
return used_feats
[docs] def collect_model_stats(self) -> Dict[str, int]:
"""Collect info about models in automl.
Returns:
Dict with models and its runtime numbers.
"""
model_stats = {}
for lvl in self.levels:
for pipe in lvl:
for ml_algo in pipe.ml_algos:
model_stats[ml_algo.name] = len(ml_algo.models)
return model_stats
def _create_validation_iterator(
self, train: SparkDataset, valid: Optional[SparkDataset], n_folds: Optional[int], cv_iter: Optional[Callable]
) -> SparkBaseTrainValidIterator:
if valid:
sdf = mark_as_train(train.data, SparkBaseTrainValidIterator.TRAIN_VAL_COLUMN)\
.unionByName(mark_as_val(valid.data, SparkBaseTrainValidIterator.TRAIN_VAL_COLUMN))
new_train = train.empty()
new_train.set_data(sdf, train.features, train.roles, dependencies=[train, valid])
new_train = new_train.persist(level=PersistenceLevel.REGULAR)
iterator = SparkHoldoutIterator(new_train)
elif cv_iter:
raise NotImplementedError("Not supported now")
elif train.folds_column:
train = train.persist(level=PersistenceLevel.REGULAR)
iterator = SparkFoldsIterator(train, n_folds)
else:
train = train.persist(level=PersistenceLevel.REGULAR)
iterator = SparkDummyIterator(train)
logger.info(f"Using train valid iterator of type: {type(iterator)}")
return iterator
def _get_service_columns(self) -> List[str]:
return self._service_columns
def _build_transformer(
self, no_reader: bool = False, return_all_predictions: bool = False
) -> Tuple[Transformer, RolesDict]:
stages = []
if not no_reader:
stages.append(self.reader.transformer(add_array_attrs=True))
ml_pipes = [ml_pipe.transformer() for level in self.levels for ml_pipe in level]
stages.extend(ml_pipes)
if not return_all_predictions:
stages.append(self.blender.transformer())
output_roles = self.blender.output_roles
else:
output_roles = dict()
for ml_pipe in self.levels[-1]:
output_roles.update(ml_pipe.output_roles)
sel_tr = ColumnsSelectorTransformer(
name="SparkAutoML",
input_cols=[SparkDataset.ID_COLUMN] + list(output_roles.keys()),
optional_cols=[self.reader.target_col] if self.reader.target_col else [],
)
stages.append(sel_tr)
automl_transformer = PipelineModel(stages=stages)
return automl_transformer, output_roles
def _read_data(self, data: ReadableIntoSparkDf, features: Optional[List[str]] = None) -> SparkDataFrame:
"""Get :class:`~pyspark.sql.DataFrame` from different data formats.
Note:
Supported now data formats:
- Path to ``.csv``, ``.parquet``, ``.feather`` files.
- :class:`~numpy.ndarray`, or dict of :class:`~numpy.ndarray`.
For example, ``{'data': X...}``. In this case,
roles are optional, but `train_features`
and `valid_features` required.
- :class:`pandas.DataFrame`.
Args:
data: Readable to DataFrame data.
features_names: Optional features names if ``numpy.ndarray``.
n_jobs: Number of processes to read file.
read_csv_params: Params to read csv file.
Returns:
Tuple with read data and new roles mapping.
"""
spark = get_current_session()
if isinstance(data, SparkDataFrame):
out_sdf = data
elif isinstance(data, str):
path = cast(str, data)
if path.endswith(".parquet"):
out_sdf = spark.read.parquet(path)
# return pd.read_parquet(data, columns=read_csv_params["usecols"]), None
elif path.endswith(".csv"):
csv_params = self._get_read_csv_params()
out_sdf = spark.read.csv(path, **csv_params)
else:
raise ValueError(f"Unsupported data format: {os.path.splitext(path)[1]}")
else:
raise ValueError("Input data format is not supported")
out_sdf = out_sdf.select(*features) if features is not None else out_sdf
return out_sdf
def _get_read_csv_params(self) -> Dict:
return {}
def _parallel_level(self,
level: Sequence[SparkMLPipeline],
train_valid_iterator: SparkBaseTrainValidIterator) \
-> Tuple[List[SparkMLPipeline], List[SparkDataset], bool]:
fit_tasks = [
functools.partial(_do_fit, ml_pipe, copy(train_valid_iterator))
for k, ml_pipe in enumerate(level)
]
results = self._computations_manager.compute(fit_tasks)
ml_pipes, ml_pipes_preds = [list(el) for el in zip(*results)]
if len(results) < len(level):
flg_last_level = True
elif self.timer.time_limit_exceeded() or self.timer.child_out_of_time:
# if all pipes have been fitted, but generally we don't have time anymore
flg_last_level = True
else:
flg_last_level = False
return ml_pipes, ml_pipes_preds, flg_last_level
def _do_fit(ml_pipe: SparkMLPipeline, iterator: SparkBaseTrainValidIterator) \
-> Optional[Tuple[SparkMLPipeline, SparkDataset]]:
pipe_predictions = cast(SparkDataset, ml_pipe.fit_predict(iterator)) \
.persist(level=PersistenceLevel.CHECKPOINT, force=True)
return ml_pipe, pipe_predictions