from typing import Optional, Union
import numpy as np
from lightautoml.dataset.roles import CategoryRole
from lightautoml.pipelines.selection.base import ImportanceEstimator
from sparklightautoml.dataset.base import SparkDataset
from sparklightautoml.pipelines.features.base import SparkFeaturesPipeline, SparkTabularDataFeatures
from sparklightautoml.transformers.base import (
SparkChangeRolesTransformer,
SparkUnionTransformer,
SparkSequentialTransformer,
SparkEstOrTrans,
)
# See the corresponding comments in sparklightautoml.pipelines.features.base.
from sparklightautoml.transformers.categorical import SparkLabelEncoderEstimator
from sparklightautoml.transformers.numeric import (
SparkFillInfTransformer,
SparkFillnaMedianEstimator,
SparkLogOddsTransformer,
SparkNaNFlagsEstimator,
SparkStandardScalerEstimator,
)
class SparkLinearFeatures(SparkFeaturesPipeline, SparkTabularDataFeatures):
    """Creates pipeline for linear models and nnets.

    Includes:

        - Create categorical intersections.
        - OHE or embed idx encoding for categories.
        - Other cats to numbers ways if defined in role params.
        - Standardization and nan handling for numbers.
        - Numbers discretization if needed.
        - Dates handling.
        - Handling probs (output of lower level models).

    """

    def __init__(
        self,
        feats_imp: Optional[ImportanceEstimator] = None,
        top_intersections: int = 5,
        max_bin_count: int = 10,
        max_intersection_depth: int = 3,
        subsample: Optional[Union[int, float]] = None,
        sparse_ohe: Union[str, bool] = "auto",
        auto_unique_co: int = 50,
        output_categories: bool = True,
        multiclass_te_co: int = 3,
        **_
    ):
        """
        Args:
            feats_imp: Features importances mapping.
            top_intersections: Max number of categories
              to generate intersections.
            max_bin_count: Max number of bins to discretize numbers.
            max_intersection_depth: Max depth of cat intersection.
            subsample: Subsample to calc data statistics.
            sparse_ohe: Should we output sparse if ohe encoding
              was used during cat handling.
            auto_unique_co: Switch to target encoding if high cardinality.
            output_categories: Output encoded categories or embed idxs.
            multiclass_te_co: Cutoff if use target encoding in cat handling
              on multiclass task if number of classes is high.

        """
        super().__init__(
            multiclass_te_co=multiclass_te_co,
            top_intersections=top_intersections,
            max_intersection_depth=max_intersection_depth,
            subsample=subsample,
            feats_imp=feats_imp,
            auto_unique_co=auto_unique_co,
            output_categories=output_categories,
            ascending_by_cardinality=True,
        )
        # These documented parameters were previously accepted but silently
        # discarded. Store them so that user-supplied values are honoured:
        # get_binned_data() reads self.max_bin_count, and the (currently
        # disabled) OHE branch in create_pipeline() reads self.sparse_ohe.
        self.max_bin_count = max_bin_count
        self.sparse_ohe = sparse_ohe

    def create_pipeline(self, train: SparkDataset) -> SparkEstOrTrans:
        """Create linear pipeline.

        Args:
            train: Dataset with train features.

        Returns:
            Transformer.

        """
        transformers_list = []
        dense_list = []
        sparse_list = []
        probs_list = []

        target_encoder = self.get_target_encoder(train)
        # For regression, target-encoded features go straight into the dense
        # branch; otherwise they are treated as probabilities and get the
        # log-odds transform applied further below.
        te_list = dense_list if train.task.name == "reg" else probs_list

        # handle categorical feats
        # split categories by handling type. This pipe uses 5 encodings - freq/label/target/ohe/ordinal

        # 1 - separate freqs. It does not need label encoding
        dense_list.append(self.get_freq_encoding(train))

        # 2 - check 'auto' type ('int' is the same - no label encoded numbers in linear models)
        auto = self._cols_by_role(train, "Category", encoding_type="auto") + self._cols_by_role(
            train, "Category", encoding_type="int"
        )

        if target_encoder is None:
            # no target encoder available - everything falls back to label encoding
            le = (
                auto
                + self._cols_by_role(train, "Category", encoding_type="oof")
                + self._cols_by_role(train, "Category", encoding_type="ohe")
            )
            te = []
        else:
            te = self._cols_by_role(train, "Category", encoding_type="oof")
            le = self._cols_by_role(train, "Category", encoding_type="ohe")

            # split 'auto' categories by unique values cnt:
            # high-cardinality ones go to target encoding, the rest to label encoding
            un_values = self.get_uniques_cnt(train, auto)
            te = te + [x for x in un_values.index if un_values[x] > self.auto_unique_co]
            le = le + list(set(auto) - set(te))

        # get label encoded categories
        sparse_list.append(self.get_categorical_raw(train, le))

        # get target encoded categories (label-encode first, then target-encode)
        te_part = self.get_categorical_raw(train, te)
        if te_part is not None:
            target_encoder_stage = target_encoder(
                input_cols=te_part.getOutputCols(),
                input_roles=te_part.get_output_roles(),
                task_name=train.task.name,
                folds_column=train.folds_column,
                target_column=train.target_column,
                do_replace_columns=True,
            )
            te_part = SparkSequentialTransformer([te_part, target_encoder_stage])
            te_list.append(te_part)

        # get intersection of top categories
        intersections = self.get_categorical_intersections(train)
        if intersections is not None:
            if target_encoder is not None:
                target_encoder_stage = target_encoder(
                    input_cols=intersections.getOutputCols(),
                    input_roles=intersections.get_output_roles(),
                    task_name=train.task.name,
                    folds_column=train.folds_column,
                    target_column=train.target_column,
                    do_replace_columns=True,
                )
                ints_part = SparkSequentialTransformer([intersections, target_encoder_stage])
                te_list.append(ints_part)
            else:
                sparse_list.append(intersections)

        # add datetime seasonality (season columns are categorical, so label-encode them)
        seas_cats = self.get_datetime_seasons(train, CategoryRole(np.int32))
        if seas_cats is not None:
            label_encoder_stage = SparkLabelEncoderEstimator(
                input_cols=seas_cats.getOutputCols(), input_roles=seas_cats.get_output_roles(), do_replace_columns=True
            )
            sparse_list.append(SparkSequentialTransformer([seas_cats, label_encoder_stage]))

        # get quantile binning
        sparse_list.append(self.get_binned_data(train))
        # add numeric pipeline wo probs
        dense_list.append(self.get_numeric_data(train, prob=False))
        dense_list.append(self.get_numeric_vectors_data(train, prob=False))
        # add ordinal categories
        dense_list.append(self.get_ordinal_encoding(train))
        # add probs
        probs_list.append(self.get_numeric_data(train, prob=True))
        probs_list.append(self.get_numeric_vectors_data(train, prob=True))
        # add difference with base date
        dense_list.append(self.get_datetime_diffs(train))

        # combine it all together
        # handle probs if exists: map probabilities to log-odds scale for linear models
        probs_list = [x for x in probs_list if x is not None]
        if len(probs_list) > 0:
            probs_pipe = SparkUnionTransformer(probs_list)
            probs_pipe = SparkSequentialTransformer(
                [
                    probs_pipe,
                    SparkLogOddsTransformer(
                        input_cols=probs_pipe.get_output_cols(),
                        input_roles=probs_pipe.get_output_roles(),
                        do_replace_columns=True,
                    ),
                ]
            )
            dense_list.append(probs_pipe)

        # handle dense
        dense_list = [x for x in dense_list if x is not None]
        if len(dense_list) > 0:
            # standardize, fillna, add null flags
            dense_pipe1 = SparkUnionTransformer(dense_list)
            fill_inf_stage = SparkFillInfTransformer(
                input_cols=dense_pipe1.get_output_cols(),
                input_roles=dense_pipe1.get_output_roles(),
                do_replace_columns=True,
            )
            fill_na_median_stage = SparkFillnaMedianEstimator(
                input_cols=fill_inf_stage.getOutputCols(),
                input_roles=fill_inf_stage.get_output_roles(),
                do_replace_columns=True,
            )
            standard_scaler_stage = SparkStandardScalerEstimator(
                input_cols=fill_na_median_stage.getOutputCols(),
                input_roles=fill_na_median_stage.get_output_roles(),
                do_replace_columns=True,
            )
            # NaN flags are computed from the raw dense features (before imputation),
            # in parallel with the fill-inf -> fill-na -> scale chain
            dense_pipe = SparkSequentialTransformer(
                [
                    dense_pipe1,
                    SparkUnionTransformer(
                        [
                            SparkNaNFlagsEstimator(
                                input_cols=dense_pipe1.get_output_cols(),
                                input_roles=dense_pipe1.get_output_roles(),
                                do_replace_columns=True,
                            ),
                            SparkSequentialTransformer([fill_inf_stage, fill_na_median_stage, standard_scaler_stage]),
                        ]
                    ),
                ]
            )
            transformers_list.append(dense_pipe)

        # handle categories - cast to float32 if categories are inputs or make target encoding
        sparse_list = [x for x in sparse_list if x is not None]
        if len(sparse_list) > 0:
            sparse_pipe = SparkUnionTransformer(sparse_list)
            if self.output_categories:
                final = SparkChangeRolesTransformer(
                    input_cols=sparse_pipe.get_output_cols(),
                    input_roles=sparse_pipe.get_output_roles(),
                    role=CategoryRole(np.float32),
                )
            else:
                assert target_encoder is not None, "Cannot process sparse features cause target encoder is None"
                final = target_encoder(
                    input_cols=sparse_pipe.get_output_cols(),
                    input_roles=sparse_pipe.get_output_roles(),
                    task_name=train.task.name,
                    folds_column=train.folds_column,
                    target_column=train.target_column,
                    do_replace_columns=True,
                )
                # NOTE(review): an OHE alternative (SparkOHEEncoderEstimator driven by
                # self.sparse_ohe) is currently disabled here; target encoding is
                # always used instead.

            sparse_pipe = SparkSequentialTransformer([sparse_pipe, final])
            transformers_list.append(sparse_pipe)

        # final pipeline
        union_all = SparkUnionTransformer(transformers_list)

        return union_all