Source code for sparklightautoml.transformers.numeric

import logging
from typing import Optional, Dict, List

import numpy as np
from lightautoml.dataset.base import RolesDict
from lightautoml.dataset.roles import ColumnRole, NumericRole, CategoryRole
from lightautoml.transformers.numeric import numeric_check
from pyspark.ml import Transformer
from pyspark.ml.feature import QuantileDiscretizer
from pyspark.sql import functions as sf
from pyspark.sql.types import FloatType, IntegerType

from sparklightautoml.mlwriters import CommonPickleMLReadable, CommonPickleMLWritable
from sparklightautoml.transformers.base import SparkBaseEstimator, SparkBaseTransformer
from sparklightautoml.utils import SparkDataFrame

logger = logging.getLogger(__name__)


class SparkNaNFlagsEstimator(SparkBaseEstimator):
    """Estimator that calculates the NaN rate of input columns and builds
    a :class:`~sparklightautoml.transformers.numeric.SparkNaNFlagsTransformer`.
    """

    _fit_checks = (numeric_check,)
    _transform_checks = ()
    _fname_prefix = "nanflg"

    def __init__(
        self,
        input_cols: List[str],
        input_roles: Dict[str, ColumnRole],
        do_replace_columns: bool = False,
        nan_rate: float = 0.005,
    ):
        """
        Args:
            nan_rate: NaN rate cutoff; a flag column is produced only for input
                columns whose NaN rate exceeds this value.
        """
        super().__init__(
            input_cols, input_roles, do_replace_columns=do_replace_columns, output_role=NumericRole(np.float32)
        )
        self._nan_rate = nan_rate
        self._nan_cols: Optional[List[str]] = None
        self.set(self.outputRoles, dict())
        self.set(self.outputCols, [])
        # self._features: Optional[List[str]] = None

    def _fit(self, sdf: SparkDataFrame) -> "Transformer":
        # Share of NaN values per input column.
        row = sdf.select([sf.mean(sf.isnan(c).astype(FloatType())).alias(c) for c in self.getInputCols()]).first()

        # Keep flag columns only for inputs whose NaN rate exceeds the cutoff.
        self._nan_cols = [
            f"{self._fname_prefix}__{col}"
            for col, col_nan_rate in row.asDict(True).items()
            if col_nan_rate > self._nan_rate
        ]

        self.set(self.outputCols, self._nan_cols)
        self.set(self.outputRoles, {c: self._output_role for c in self._nan_cols})

        return SparkNaNFlagsTransformer(
            input_cols=self.getInputCols(),
            input_roles=self.get_input_roles(),
            output_cols=self.getOutputCols(),
            output_roles=self.get_output_roles(),
            do_replace_columns=self.get_do_replace_columns(),
        )

class SparkNaNFlagsTransformer(SparkBaseTransformer, CommonPickleMLWritable, CommonPickleMLReadable):
    """Adds columns with NaN flags (0 or 1) for input columns."""

    _fit_checks = (numeric_check,)
    _transform_checks = ()
    # TODO: the value is copied from the corresponding LAMA transformer.
    # TODO: it would be better to take it from a shared module as a string constant.
    _fname_prefix = "nanflg"

    # def __init__(self,
    #              input_cols: List[str],
    #              input_roles: RolesDict,
    #              output_cols: List[str],
    #              output_roles: RolesDict,):
    #     super().__init__(
    #         input_cols=input_cols,
    #         output_cols=output_cols,
    #         input_roles=input_roles,
    #         output_roles=output_roles,
    #         do_replace_columns=False)
    #     self._nan_cols = nan_cols

    def _transform(self, sdf: SparkDataFrame) -> SparkDataFrame:
        # 1.0 where the input value is NaN, 0.0 otherwise.
        new_cols = [
            sf.isnan(in_c).astype(FloatType()).alias(out_c)
            for in_c, out_c in zip(self.getInputCols(), self.getOutputCols())
        ]

        out_sdf = self._make_output_df(sdf, new_cols)

        return out_sdf

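# Usage sketch (illustrative, not part of the original module): fit the NaN-flag
# estimator on a toy DataFrame and apply the resulting transformer. It assumes an
# existing SparkSession `spark`; the column names and roles below are hypothetical.
#
#   df = spark.createDataFrame([(1.0, float("nan")), (2.0, 3.0), (float("nan"), 4.0)], ["a", "b"])
#   roles = {"a": NumericRole(np.float32), "b": NumericRole(np.float32)}
#   nan_flags = SparkNaNFlagsEstimator(input_cols=["a", "b"], input_roles=roles).fit(df)
#   df_with_flags = nan_flags.transform(df)  # adds nanflg__a and nanflg__b columns
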
class SparkFillInfTransformer(SparkBaseTransformer, CommonPickleMLWritable, CommonPickleMLReadable):
    """Transformer that replaces inf values with np.nan in input columns."""

    _fit_checks = (numeric_check,)
    _transform_checks = ()
    _fname_prefix = "fillinf"

    def __init__(self, input_cols: List[str], input_roles: RolesDict, do_replace_columns=False):
        output_cols = [f"{self._fname_prefix}__{feat}" for feat in input_cols]
        super().__init__(
            input_cols=input_cols,
            output_cols=output_cols,
            input_roles=input_roles,
            output_roles={f: NumericRole(np.float32) for f in output_cols},
            do_replace_columns=do_replace_columns,
        )

    def _transform(self, df: SparkDataFrame) -> SparkDataFrame:
        def is_inf(col: str):
            # True if the column value is +inf or -inf.
            return sf.col(col).isin([sf.lit("+Infinity").cast("double"), sf.lit("-Infinity").cast("double")])

        new_cols = [
            sf.when(is_inf(i), np.nan).otherwise(sf.col(i)).alias(f"{self._fname_prefix}__{i}")
            for i in self.getInputCols()
        ]

        out_df = self._make_output_df(df, cols_to_add=new_cols)

        return out_df

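# Usage sketch (illustrative, not part of the original module): SparkFillInfTransformer
# needs no fitting, so it can be applied directly. A SparkSession `spark` and the
# column name are assumptions.
#
#   df = spark.createDataFrame([(1.0,), (float("inf"),)], ["a"])
#   fill_inf = SparkFillInfTransformer(input_cols=["a"], input_roles={"a": NumericRole(np.float32)})
#   df_no_inf = fill_inf.transform(df)  # fillinf__a has inf replaced with NaN
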
class SparkFillnaMedianEstimator(SparkBaseEstimator):
    """Fillna with median."""

    _fit_checks = (numeric_check,)
    _transform_checks = ()
    _fname_prefix = "fillnamed"

    def __init__(
        self,
        input_cols: List[str],
        input_roles: Dict[str, ColumnRole],
        do_replace_columns: bool = False,
        subsample: int = 1_000_000,
        seed: int = 42,
    ):
        super().__init__(
            input_cols, input_roles, do_replace_columns=do_replace_columns, output_role=NumericRole(np.float32)
        )
        self._meds: Optional[Dict[str, float]] = None
        self._subsample = subsample
        self._seed = seed

    def _fit(self, sdf: SparkDataFrame) -> Transformer:
        """Approximately estimates medians.

        Args:
            sdf: SparkDataFrame with numerical features.

        Returns:
            Spark MLlib Transformer.

        """
        logger.debug(f"Starting to fit estimator {self}")

        total_number = sdf.count()
        if self._subsample > total_number:
            fraction = 1.0
        else:
            fraction = self._subsample / total_number
        sdf = sdf.sample(fraction=fraction, seed=self._seed)
        logger.debug(f"Sample size: {sdf.count()}")

        # Approximate median per column; columns that are entirely NaN fall back to 0.
        row = (
            sdf.select([sf.percentile_approx(c, 0.5).alias(c) for c in self.getInputCols()])
            .select([sf.when(sf.isnan(c), 0).otherwise(sf.col(c)).alias(c) for c in self.getInputCols()])
            .first()
        )

        self._meds = row.asDict()

        logger.debug(f"Finished fitting estimator {self}")

        return SparkFillnaMedianTransformer(
            input_cols=self.getInputCols(),
            output_cols=self.getOutputCols(),
            input_roles=self.get_input_roles(),
            output_roles=self.get_output_roles(),
            meds=self._meds,
            do_replace_columns=self.get_do_replace_columns(),
        )

class SparkFillnaMedianTransformer(SparkBaseTransformer, CommonPickleMLWritable, CommonPickleMLReadable):
    """Fillna with median."""

    _fit_checks = (numeric_check,)
    _transform_checks = ()
    _fname_prefix = "fillnamed"

    def __init__(
        self,
        input_cols: List[str],
        output_cols: List[str],
        input_roles: RolesDict,
        output_roles: RolesDict,
        meds: Dict,
        do_replace_columns: bool = False,
    ):
        super().__init__(
            input_cols=input_cols,
            output_cols=output_cols,
            input_roles=input_roles,
            output_roles=output_roles,
            do_replace_columns=do_replace_columns,
        )
        self._meds = meds

    def _transform(self, sdf: SparkDataFrame) -> SparkDataFrame:
        """Transform - fillna with medians.

        Args:
            sdf: SparkDataFrame of numerical features.

        Returns:
            SparkDataFrame with NaNs replaced by medians.

        """
        new_cols = [
            sf.when(sf.isnan(c), self._meds[c]).otherwise(sf.col(c)).alias(f"{self._fname_prefix}__{c}")
            for c in self.getInputCols()
        ]

        out_sdf = self._make_output_df(sdf, new_cols)

        return out_sdf

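# Usage sketch (illustrative, not part of the original module): median imputation on a
# toy DataFrame. A SparkSession `spark` and the column name are assumptions.
#
#   df = spark.createDataFrame([(1.0,), (2.0,), (100.0,), (float("nan"),)], ["a"])
#   roles = {"a": NumericRole(np.float32)}
#   fillna = SparkFillnaMedianEstimator(input_cols=["a"], input_roles=roles).fit(df)
#   df_filled = fillna.transform(df)  # fillnamed__a has NaN replaced with the approximate median
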
class SparkLogOddsTransformer(SparkBaseTransformer, CommonPickleMLWritable, CommonPickleMLReadable):
    """Convert probs to logodds."""

    _fit_checks = (numeric_check,)
    _transform_checks = ()
    _fname_prefix = "logodds"

    _can_unwind_parents = False

    def __init__(self, input_cols: List[str], input_roles: RolesDict, do_replace_columns=False):
        out_cols = [f"{self._fname_prefix}__{feat}" for feat in input_cols]
        super().__init__(
            input_cols=input_cols,
            output_cols=out_cols,
            input_roles=input_roles,
            output_roles={f: NumericRole(np.float32) for f in out_cols},
            do_replace_columns=do_replace_columns,
        )

    def _transform(self, sdf: SparkDataFrame) -> SparkDataFrame:
        """Transform - convert numeric values (probabilities) to logodds.

        Args:
            sdf: SparkDataFrame of numeric features.

        Returns:
            SparkDataFrame with logodds values.

        """
        new_cols = []
        for i in self.getInputCols():
            # Clip probabilities to [1e-7, 1 - 1e-7] to avoid infinite logodds.
            col = sf.when(sf.col(i) < 1e-7, 1e-7).when(sf.col(i) > 1 - 1e-7, 1 - 1e-7).otherwise(sf.col(i))
            col = sf.log(col / (sf.lit(1) - col))
            new_cols.append(col.alias(f"{self._fname_prefix}__{i}"))

        out_sdf = self._make_output_df(sdf, new_cols)

        return out_sdf

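# For reference, the transformation above is logodds(p) = ln(p / (1 - p)) with p clipped
# to [1e-7, 1 - 1e-7]; e.g. p = 0.9 maps to ln(0.9 / 0.1) ≈ 2.197, and p = 0.0 is first
# clipped to 1e-7, giving ln(1e-7 / (1 - 1e-7)) ≈ -16.12.
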
class SparkStandardScalerEstimator(SparkBaseEstimator):
    """Classic StandardScaler."""

    _fit_checks = (numeric_check,)
    _transform_checks = ()
    _fname_prefix = "scaler"

    def __init__(self, input_cols: List[str], input_roles: Dict[str, ColumnRole], do_replace_columns: bool = False):
        super().__init__(
            input_cols, input_roles, do_replace_columns=do_replace_columns, output_role=NumericRole(np.float32)
        )
        self._means_and_stds: Optional[Dict[str, float]] = None

    def _fit(self, sdf: SparkDataFrame) -> Transformer:
        """Estimate means and stds.

        Args:
            sdf: SparkDataFrame of numeric features.

        Returns:
            SparkStandardScalerTransformer instance.

        """
        means = [sf.mean(c).alias(f"mean_{c}") for c in self.getInputCols()]
        # Replace zero or NaN standard deviations with 1 to avoid division by zero.
        stds = [
            sf.when(sf.stddev(c) == 0, 1).when(sf.isnan(sf.stddev(c)), 1).otherwise(sf.stddev(c)).alias(f"std_{c}")
            for c in self.getInputCols()
        ]

        self._means_and_stds = sdf.select(means + stds).first().asDict()

        return SparkStandardScalerTransformer(
            input_cols=self.getInputCols(),
            output_cols=self.getOutputCols(),
            input_roles=self.get_input_roles(),
            output_roles=self.get_output_roles(),
            means_and_stds=self._means_and_stds,
            do_replace_columns=self.get_do_replace_columns(),
        )

class SparkStandardScalerTransformer(SparkBaseTransformer, CommonPickleMLWritable, CommonPickleMLReadable):
    """Classic StandardScaler."""

    _fit_checks = (numeric_check,)
    _transform_checks = ()
    _fname_prefix = "scaler"

    def __init__(
        self,
        input_cols: List[str],
        output_cols: List[str],
        input_roles: RolesDict,
        output_roles: RolesDict,
        means_and_stds: Dict,
        do_replace_columns: bool = False,
    ):
        super().__init__(
            input_cols=input_cols,
            output_cols=output_cols,
            input_roles=input_roles,
            output_roles=output_roles,
            do_replace_columns=do_replace_columns,
        )
        self._means_and_stds = means_and_stds

    def _transform(self, sdf: SparkDataFrame) -> SparkDataFrame:
        """Scale test data.

        Args:
            sdf: SparkDataFrame of numeric features.

        Returns:
            SparkDataFrame with scaled features.

        """
        new_cols = []
        for c in self.getInputCols():
            # Standardize: (x - mean) / std.
            col = (sf.col(c) - self._means_and_stds[f"mean_{c}"]) / sf.lit(self._means_and_stds[f"std_{c}"])
            new_cols.append(col.alias(f"{self._fname_prefix}__{c}"))

        out_sdf = self._make_output_df(sdf, new_cols)

        return out_sdf

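# Usage sketch (illustrative, not part of the original module): standard scaling,
# assuming an existing SparkSession `spark` and a hypothetical column name.
#
#   df = spark.createDataFrame([(1.0,), (2.0,), (3.0,)], ["a"])
#   roles = {"a": NumericRole(np.float32)}
#   scaler = SparkStandardScalerEstimator(input_cols=["a"], input_roles=roles).fit(df)
#   df_scaled = scaler.transform(df)  # scaler__a = (a - mean(a)) / stddev(a)
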
class SparkQuantileBinningEstimator(SparkBaseEstimator):
    """Discretization of numeric features by quantiles."""

    _fit_checks = (numeric_check,)
    _transform_checks = ()
    _fname_prefix = "qntl"

    def __init__(
        self,
        input_cols: List[str],
        input_roles: Dict[str, ColumnRole],
        do_replace_columns: bool = False,
        nbins: int = 10,
    ):
        super().__init__(
            input_cols,
            input_roles,
            do_replace_columns=do_replace_columns,
            output_role=CategoryRole(np.int32, label_encoded=True),
        )
        self._nbins = nbins
        self._bucketizer = None

    def _fit(self, sdf: SparkDataFrame) -> Transformer:
        qdisc = QuantileDiscretizer(
            numBucketsArray=[self._nbins for _ in self.getInputCols()],
            handleInvalid="keep",
            inputCols=self.getInputCols(),
            outputCols=self.getOutputCols(),
        )

        self._bucketizer = qdisc.fit(sdf)

        return SparkQuantileBinningTransformer(
            self._nbins,
            self._bucketizer,
            input_cols=self.getInputCols(),
            input_roles=self.get_input_roles(),
            output_cols=self.getOutputCols(),
            output_roles=self.get_output_roles(),
            do_replace_columns=self.get_do_replace_columns(),
        )

class SparkQuantileBinningTransformer(SparkBaseTransformer, CommonPickleMLWritable, CommonPickleMLReadable):
    """Adds columns with the quantile bin number of the input columns.

    The bin number of a column value is calculated by the `QuantileDiscretizer`
    fitted in :class:`SparkQuantileBinningEstimator`.
    """

    _fit_checks = (numeric_check,)
    _transform_checks = ()
    _fname_prefix = "qntl"

    def __init__(
        self,
        bins,
        bucketizer,
        input_cols: List[str],
        output_cols: List[str],
        input_roles: RolesDict,
        output_roles: RolesDict,
        do_replace_columns: bool = False,
    ):
        super().__init__(
            input_cols=input_cols,
            output_cols=output_cols,
            input_roles=input_roles,
            output_roles=output_roles,
            do_replace_columns=do_replace_columns,
        )
        self._bins = bins
        self._bucketizer = bucketizer

    def _transform(self, sdf: SparkDataFrame) -> SparkDataFrame:
        # With handleInvalid="keep" the bucketizer assigns invalid values (NaN) to the
        # extra bucket `self._bins`; remap that bucket to 0 and shift regular bins by +1.
        new_cols = [
            sf.when(sf.col(c).astype(IntegerType()) == sf.lit(self._bins), 0)
            .otherwise(sf.col(c).astype(IntegerType()) + 1)
            .alias(c)
            for c in self._bucketizer.getOutputCols()
        ]

        rest_cols = [c for c in sdf.columns if c not in self._bucketizer.getOutputCols()]

        intermediate_sdf = self._bucketizer.transform(sdf).select(*rest_cols, *new_cols)

        output_sdf = self._make_output_df(intermediate_sdf, cols_to_add=[])

        return output_sdf

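# Usage sketch (illustrative, not part of the original module): quantile binning into
# 4 bins, assuming an existing SparkSession `spark` and a hypothetical column name.
#
#   df = spark.createDataFrame([(float(x),) for x in range(100)], ["a"])
#   roles = {"a": NumericRole(np.float32)}
#   binning = SparkQuantileBinningEstimator(input_cols=["a"], input_roles=roles, nbins=4).fit(df)
#   df_binned = binning.transform(df)  # qntl__a holds bin numbers 1..4 (0 is reserved for NaN)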