from copy import deepcopy
from typing import Optional, Union
import numpy as np
from lightautoml.dataset.roles import CategoryRole, NumericRole
from lightautoml.pipelines.selection.base import ImportanceEstimator
from sparklightautoml.dataset.base import SparkDataset, PersistenceManager
from sparklightautoml.pipelines.features.base import SparkFeaturesPipeline, SparkTabularDataFeatures
from sparklightautoml.transformers.base import (
SparkChangeRolesTransformer,
SparkUnionTransformer,
SparkSequentialTransformer,
SparkEstOrTrans,
)
from sparklightautoml.transformers.categorical import SparkOrdinalEncoderEstimator
from sparklightautoml.transformers.datetime import SparkTimeToNumTransformer
class SparkLGBSimpleFeatures(SparkFeaturesPipeline, SparkTabularDataFeatures):
    """Minimal feature pipeline intended for tree based models.

    Good enough for feature selection purposes:

        - Category columns go through an ordinal encoder.
        - Datetime columns are converted to numbers.
        - Numeric columns (and numeric vectors) are taken from the
          ``get_numeric_data`` / ``get_numeric_vectors_data`` helpers.

    Each input feature maps to exactly one output feature.
    """

    def __init__(self):
        super().__init__()

    def create_pipeline(self, train: SparkDataset) -> Union[SparkUnionTransformer, SparkSequentialTransformer]:
        """Build the union of per-role transformers for ``train``.

        Args:
            train: Dataset with train features.

        Returns:
            Union transformer over the categorical, datetime and numeric
            stages that are applicable to ``train``.
        """
        stages = []

        # Categorical columns -> ordinal encoder.
        cat_cols = self._cols_by_role(train, "Category")
        if len(cat_cols) > 0:
            stages.append(
                SparkOrdinalEncoderEstimator(
                    input_cols=cat_cols,
                    input_roles={col: train.roles[col] for col in cat_cols},
                    subs=None,
                    random_state=42,
                )
            )

        # Datetime columns -> numeric representation.
        dt_cols = self._cols_by_role(train, "Datetime")
        if len(dt_cols) > 0:
            stages.append(
                SparkTimeToNumTransformer(
                    input_cols=dt_cols,
                    input_roles={col: train.roles[col] for col in dt_cols},
                )
            )

        # Numeric helpers may yield None when nothing matches; such entries
        # are filtered out below before the union is assembled.
        stages.extend([self.get_numeric_data(train), self.get_numeric_vectors_data(train)])

        return SparkUnionTransformer([stage for stage in stages if stage is not None])
class SparkLGBAdvancedPipeline(SparkFeaturesPipeline, SparkTabularDataFeatures):
    """Create advanced pipeline for trees based models.

    Includes:

        - Different cats and numbers handling according to role params.
        - Dates handling - extracting seasons and create datediffs.
        - Create categorical intersections.

    """

    def __init__(
        self,
        feats_imp: Optional[ImportanceEstimator] = None,
        top_intersections: int = 5,
        max_intersection_depth: int = 3,
        subsample: Optional[Union[int, float]] = None,
        multiclass_te_co: int = 3,
        auto_unique_co: int = 10,
        output_categories: bool = False,
        **kwargs
    ):
        """

        Args:
            feats_imp: Features importances mapping.
            top_intersections: Max number of categories
              to generate intersections.
            max_intersection_depth: Max depth of cat intersection.
            subsample: Subsample to calc data statistics.
            multiclass_te_co: Cutoff if use target encoding in cat
              handling on multiclass task if number of classes is high.
            auto_unique_co: Switch to target encoding if high cardinality.
            output_categories: If ``True``, "auto"/"ohe"/"oof"/"int"
              categories are all routed to the label-encoded part and are
              tagged with a label-encoded ``CategoryRole`` instead of
              ``NumericRole``; no per-column target encoding is applied.
            **kwargs: Accepted for signature compatibility; not used and
              not forwarded to the base initializers.

        """
        # NOTE: ``ascending_by_cardinality`` is fixed to False for this pipeline.
        super().__init__(
            multiclass_te_co=multiclass_te_co,
            top_intersections=top_intersections,
            max_intersection_depth=max_intersection_depth,
            subsample=subsample,
            feats_imp=feats_imp,
            auto_unique_co=auto_unique_co,
            output_categories=output_categories,
            ascending_by_cardinality=False,
        )

    @staticmethod
    def _build_target_encoder_stage(target_encoder, part, train: SparkDataset):
        """Instantiate ``target_encoder`` on top of the outputs of ``part``."""
        return target_encoder(
            input_cols=part.getOutputCols(),
            input_roles=part.get_output_roles(),
            task_name=train.task.name,
            folds_column=train.folds_column,
            target_column=train.target_column,
            do_replace_columns=True,
        )

    def create_pipeline(self, train: SparkDataset) -> SparkEstOrTrans:
        """Create tree pipeline.

        Args:
            train: Dataset with train features.

        Returns:
            Union transformer combining the frequency, label, target,
            intersection, numeric, ordinal and datetime stages that are
            applicable to ``train`` (stages that resolve to ``None`` are
            skipped).

        """
        transformer_list = []
        target_encoder = self.get_target_encoder(train)

        # Role assigned to label-encoded outputs: kept categorical when the
        # caller asked for categories on output, numeric otherwise.
        output_category_role = (
            CategoryRole(np.float32, label_encoded=True) if self.output_categories else NumericRole(np.float32)
        )

        # handle categorical feats
        # split categories by handling type. This pipe uses 4 encodings - freq/label/target/ordinal
        # 1 - separate freqs. It does not need label encoding
        transformer_list.append(self.get_freq_encoding(train))

        # 2 - check different target encoding parts and split (ohe is the same as auto - no ohe in gbm)
        auto = self._cols_by_role(train, "Category", encoding_type="auto") + self._cols_by_role(
            train, "Category", encoding_type="ohe"
        )

        if self.output_categories:
            # Everything goes to the label-encoded part; nothing is target encoded.
            le = (
                auto
                + self._cols_by_role(train, "Category", encoding_type="oof")
                + self._cols_by_role(train, "Category", encoding_type="int")
            )
            te = []
            # None is forwarded as-is to get_ordinal_encoding below
            # (presumably "use the helper's own default selection" - confirm against the helper).
            ordinal = None
        else:
            le = self._cols_by_role(train, "Category", encoding_type="int")
            ordinal = self._cols_by_role(train, "Category", ordinal=True)

            if target_encoder is not None:
                te = self._cols_by_role(train, "Category", encoding_type="oof")
                # split auto categories by unique values cnt:
                # above the cutoff -> target encoding, the rest -> ordinal encoding
                un_values = self.get_uniques_cnt(train, auto)
                te = te + [x for x in un_values.index if un_values[x] > self.auto_unique_co]
                ordinal = ordinal + list(set(auto) - set(te))
            else:
                # No target encoder available: fall back to ordinal encoding
                # for both "auto" and "oof" categories.
                te = []
                ordinal = ordinal + auto + self._cols_by_role(train, "Category", encoding_type="oof")

            # Deduplicate and fix the order so the result is deterministic.
            ordinal = sorted(set(ordinal))

        # get label encoded categories
        le_part = self.get_categorical_raw(train, le)
        if le_part is not None:
            change_roles_stage = SparkChangeRolesTransformer(
                input_cols=le_part.getOutputCols(), input_roles=le_part.get_output_roles(), role=output_category_role
            )
            le_part = SparkSequentialTransformer([le_part, change_roles_stage])
            transformer_list.append(le_part)

        # get target encoded part
        # (``te`` is only non-empty when ``target_encoder`` is not None, see above)
        te_part = self.get_categorical_raw(train, te)
        if te_part is not None:
            target_encoder_stage = self._build_target_encoder_stage(target_encoder, te_part, train)
            te_part = SparkSequentialTransformer([te_part, target_encoder_stage])
            transformer_list.append(te_part)

        # get intersection of top categories
        intersections = self.get_categorical_intersections(train)
        if intersections is not None:
            if target_encoder is not None:
                second_stage = self._build_target_encoder_stage(target_encoder, intersections, train)
            else:
                # Without a target encoder the intersections only get their role changed.
                second_stage = SparkChangeRolesTransformer(
                    input_cols=intersections.getOutputCols(),
                    input_roles=intersections.get_output_roles(),
                    role=output_category_role,
                )
            transformer_list.append(SparkSequentialTransformer([intersections, second_stage]))

        transformer_list.append(self.get_numeric_data(train))
        transformer_list.append(self.get_numeric_vectors_data(train))
        transformer_list.append(self.get_ordinal_encoding(train, ordinal))
        transformer_list.append(self.get_datetime_diffs(train))
        transformer_list.append(self.get_datetime_seasons(train, NumericRole(np.float32)))

        # final pipeline: drop the stages that turned out to be not applicable
        union_all = SparkUnionTransformer([x for x in transformer_list if x is not None])

        return union_all