Source code for sparklightautoml.automl.presets.tabular_presets

import logging
import os
from copy import deepcopy, copy
from typing import Optional, Sequence, Iterable, Tuple, List

import numpy as np
from lightautoml.automl.presets.base import upd_params
from lightautoml.automl.presets.utils import plot_pdp_with_distribution
from lightautoml.pipelines.selection.base import ComposedSelector
from lightautoml.pipelines.selection.importance_based import ModelBasedImportanceEstimator, ImportanceCutoffSelector
from lightautoml.pipelines.selection.permutation_importance_based import NpIterativeFeatureSelector
from lightautoml.reader.tabular_batch_generator import ReadableToDf
from lightautoml.utils.timer import TaskTimer
from pyspark.ml import PipelineModel, Transformer
from pyspark.sql import SparkSession
from pyspark.sql import functions as sf, Window
from pyspark.sql.types import DateType, StringType
from tqdm import tqdm

from sparklightautoml.automl.base import ReadableIntoSparkDf
from sparklightautoml.automl.blend import SparkWeightedBlender
from sparklightautoml.automl.presets.base import SparkAutoMLPreset
from sparklightautoml.automl.presets.utils import (
    calc_feats_permutation_imps,
    replace_dayofweek_in_date,
    replace_month_in_date,
    replace_year_in_date,
)
from sparklightautoml.computations.builder import AutoMLComputationsSettings, ComputationsManagerFactory
from sparklightautoml.dataset.base import SparkDataset, PersistenceManager
from sparklightautoml.dataset.persistence import PlainCachePersistenceManager
from sparklightautoml.ml_algo.boost_lgbm import SparkBoostLGBM
from sparklightautoml.ml_algo.linear_pyspark import SparkLinearLBFGS
from sparklightautoml.ml_algo.tuning.parallel_optuna import ParallelOptunaTuner
from sparklightautoml.pipelines.features.lgb_pipeline import SparkLGBSimpleFeatures, SparkLGBAdvancedPipeline
from sparklightautoml.pipelines.features.linear_pipeline import SparkLinearFeatures
from sparklightautoml.pipelines.ml.nested_ml_pipe import SparkNestedTabularMLPipeline
from sparklightautoml.pipelines.selection.base import BugFixSelectionPipelineWrapper
from sparklightautoml.pipelines.selection.base import SparkSelectionPipelineWrapper
from sparklightautoml.pipelines.selection.permutation_importance_based import SparkNpPermutationImportanceEstimator
from sparklightautoml.reader.base import SparkToSparkReader
from sparklightautoml.tasks.base import SparkTask
from sparklightautoml.utils import SparkDataFrame

logger = logging.getLogger(__name__)
base_dir = os.path.dirname(__file__)


class SparkTabularAutoML(SparkAutoMLPreset):
    """
    Spark version of :class:`TabularAutoML <lightautoml.automl.presets.tabular_presets.TabularAutoML>`.
    Represents the high-level entity of Spark LightAutoML. Use this class to create an automl instance.

    Example:

        >>> automl = SparkTabularAutoML(
        >>>     spark=spark,
        >>>     task=SparkTask('binary'),
        >>>     general_params={"use_algos": [["lgb"]]},
        >>>     lgb_params={'use_single_dataset_mode': True},
        >>>     reader_params={"cv": cv, "advanced_roles": False}
        >>> )
        >>> oof_predictions = automl.fit_predict(
        >>>     train_data,
        >>>     roles=roles
        >>> )
    """

    _default_config_path = "tabular_config.yml"

    # set initial runtime rate guess for first level models
    _time_scores = {
        "lgb": 1,
        "lgb_tuned": 3,
        "linear_l2": 0.7,
        "cb": 2,
        "cb_tuned": 6,
    }

    def __init__(
        self,
        spark: SparkSession,
        task: SparkTask,
        timeout: int = 3600,
        memory_limit: int = 16,
        cpu_limit: int = 4,
        gpu_ids: Optional[str] = "all",
        timing_params: Optional[dict] = None,
        config_path: Optional[str] = None,
        general_params: Optional[dict] = None,
        reader_params: Optional[dict] = None,
        read_csv_params: Optional[dict] = None,
        nested_cv_params: Optional[dict] = None,
        tuning_params: Optional[dict] = None,
        selection_params: Optional[dict] = None,
        lgb_params: Optional[dict] = None,
        cb_params: Optional[dict] = None,
        linear_l2_params: Optional[dict] = None,
        gbm_pipeline_params: Optional[dict] = None,
        linear_pipeline_params: Optional[dict] = None,
        persistence_manager: Optional[PersistenceManager] = None,
        computation_settings: AutoMLComputationsSettings = ("no_parallelism", -1),
    ):
        if config_path is None:
            config_path = os.path.join(base_dir, self._default_config_path)

        self._computation_managers_factory = (
            computation_settings
            if isinstance(computation_settings, ComputationsManagerFactory)
            else ComputationsManagerFactory(computation_settings)
        )

        super().__init__(
            task,
            timeout,
            memory_limit,
            cpu_limit,
            gpu_ids,
            timing_params,
            config_path,
            self._computation_managers_factory.get_ml_pipelines_manager(),
        )

        self._persistence_manager = persistence_manager or PlainCachePersistenceManager()
        self._spark = spark

        # upd manual params
        for name, param in zip(
            [
                "general_params",
                "reader_params",
                "read_csv_params",
                "nested_cv_params",
                "tuning_params",
                "selection_params",
                "lgb_params",
                "cb_params",
                "linear_l2_params",
                "gbm_pipeline_params",
                "linear_pipeline_params",
            ],
            [
                general_params,
                reader_params,
                read_csv_params,
                nested_cv_params,
                tuning_params,
                selection_params,
                lgb_params,
                cb_params,
                linear_l2_params,
                gbm_pipeline_params,
                linear_pipeline_params,
            ],
        ):
            if param is None:
                param = {}
            self.__dict__[name] = upd_params(self.__dict__[name], param)

    def infer_auto_params(self, train_data: SparkDataFrame, multilevel_avail: bool = False):
        # infer optuna tuning iteration based on dataframe len
        if self.tuning_params["max_tuning_iter"] == "auto":
            if not train_data.is_cached:
                self.tuning_params["max_tuning_iter"] = 5

            length = train_data.count()
            if length < 10000:
                self.tuning_params["max_tuning_iter"] = 100
            elif length < 30000:
                self.tuning_params["max_tuning_iter"] = 50
            elif length < 100000:
                self.tuning_params["max_tuning_iter"] = 10
            else:
                self.tuning_params["max_tuning_iter"] = 5

        if self.general_params["use_algos"] == "auto":
            self.general_params["use_algos"] = [["lgb", "lgb_tuned", "linear_l2", "cb", "cb_tuned"]]
            if self.task.name == "multiclass" and multilevel_avail:
                self.general_params["use_algos"].append(["linear_l2", "lgb"])

        if not self.general_params["nested_cv"]:
            self.nested_cv_params["cv"] = 1

    def get_time_score(self, n_level: int, model_type: str, nested: Optional[bool] = None):
        if nested is None:
            nested = self.general_params["nested_cv"]

        score = self._time_scores[model_type]

        mult = 1
        if nested:
            if self.nested_cv_params["n_folds"] is not None:
                mult = self.nested_cv_params["n_folds"]
            else:
                mult = self.nested_cv_params["cv"]

        if n_level > 1:
            mult *= 0.8 if self.general_params["skip_conn"] else 0.1

        score = score * mult

        # lower score for catboost on gpu
        if model_type in ["cb", "cb_tuned"] and self.cb_params["default_params"]["task_type"] == "GPU":
            score *= 0.5

        return score

    def get_selector(self, n_level: Optional[int] = 1) -> SparkSelectionPipelineWrapper:
        selection_params = self.selection_params
        # lgb_params
        lgb_params = deepcopy(self.lgb_params)
        lgb_params["default_params"] = {
            **lgb_params["default_params"],
            **{"featureFraction": 1},
        }

        mode = selection_params["mode"]

        # create pre selection based on mode
        pre_selector = None
        if mode > 0:
            # if we need selector - define model
            # timer will be useful to estimate time for next gbm runs
            time_score = self.get_time_score(n_level, "lgb", False)
            sel_timer_0 = self.timer.get_task_timer("lgb", time_score)
            selection_feats = SparkLGBSimpleFeatures()
            selection_gbm = SparkBoostLGBM(timer=sel_timer_0, **lgb_params)
            selection_gbm.set_prefix("Selector")

            if selection_params["importance_type"] == "permutation":
                importance = SparkNpPermutationImportanceEstimator()
            else:
                importance = ModelBasedImportanceEstimator()

            pre_selector = ImportanceCutoffSelector(
                selection_feats,
                selection_gbm,
                importance,
                cutoff=selection_params["cutoff"],
                fit_on_holdout=selection_params["fit_on_holdout"],
            )

            if mode == 2:
                time_score = self.get_time_score(n_level, "lgb", False)
                sel_timer_1 = self.timer.get_task_timer("lgb", time_score)
                selection_feats = SparkLGBSimpleFeatures()
                selection_gbm = SparkBoostLGBM(timer=sel_timer_1, **lgb_params)
                selection_gbm.set_prefix("Selector")
                importance = SparkNpPermutationImportanceEstimator(
                    computations_settings=self._computation_managers_factory.get_selector_manager()
                )

                extra_selector = NpIterativeFeatureSelector(
                    selection_feats,
                    selection_gbm,
                    importance,
                    feature_group_size=selection_params["feature_group_size"],
                    max_features_cnt_in_result=selection_params["max_features_cnt_in_result"],
                )

                pre_selector = ComposedSelector([pre_selector, extra_selector])

        pre_selector = BugFixSelectionPipelineWrapper(pre_selector)

        return SparkSelectionPipelineWrapper(pre_selector)

    def get_linear(
        self, n_level: int = 1, pre_selector: Optional[SparkSelectionPipelineWrapper] = None
    ) -> SparkNestedTabularMLPipeline:
        # linear model with l2
        time_score = self.get_time_score(n_level, "linear_l2")
        linear_l2_timer = self.timer.get_task_timer("reg_l2", time_score)
        linear_l2_params = {**self.linear_l2_params}
        linear_l2_model = SparkLinearLBFGS(
            timer=linear_l2_timer,
            computations_settings=self._computation_managers_factory.get_linear_manager(),
            **linear_l2_params,
        )
        linear_l2_feats = SparkLinearFeatures(output_categories=True, **self.linear_pipeline_params)

        linear_l2_pipe = SparkNestedTabularMLPipeline(
            [linear_l2_model],
            force_calc=True,
            pre_selection=pre_selector,
            features_pipeline=linear_l2_feats,
            computations_settings=self._computation_managers_factory.get_ml_algo_manager(),
            **self.nested_cv_params,
        )
        return linear_l2_pipe

    def get_gbms(
        self,
        keys: Sequence[str],
        n_level: int = 1,
        pre_selector: Optional[SparkSelectionPipelineWrapper] = None,
    ):
        gbm_feats = SparkLGBAdvancedPipeline(**self.gbm_pipeline_params)

        ml_algos = []
        force_calc = []
        for key, force in zip(keys, [True, False, False, False]):
            tuned = "_tuned" in key
            algo_key = key.split("_")[0]
            time_score = self.get_time_score(n_level, key)
            gbm_timer = self.timer.get_task_timer(algo_key, time_score)
            gbm_model, lgb_params = self._get_boosting_model(algo_key, gbm_timer)

            if tuned:
                gbm_model.set_prefix("Tuned")
                tuner_comp_manager = self._computation_managers_factory.get_tuning_manager()
                gbm_tuner = ParallelOptunaTuner(
                    n_trials=self.tuning_params["max_tuning_iter"],
                    timeout=self.tuning_params["max_tuning_time"],
                    fit_on_holdout=self.tuning_params["fit_on_holdout"],
                    parallelism=tuner_comp_manager.parallelism,
                    computations_manager=tuner_comp_manager,
                )
                gbm_model = (gbm_model, gbm_tuner)

            ml_algos.append(gbm_model)
            force_calc.append(force)

        gbm_pipe = SparkNestedTabularMLPipeline(
            ml_algos,
            force_calc,
            pre_selection=pre_selector,
            features_pipeline=gbm_feats,
            computations_settings=self._computation_managers_factory.get_ml_algo_manager(),
            **self.nested_cv_params,
        )

        return gbm_pipe

    def _get_boosting_model(self, algo_key: str, gbm_timer: Optional[TaskTimer] = None):
        if algo_key == "lgb":
            lgb_params = {**self.lgb_params}
            gbm_model = SparkBoostLGBM(
                timer=gbm_timer,
                computations_settings=self._computation_managers_factory.get_lgb_manager(),
                **lgb_params,
            )
            return gbm_model, lgb_params
        elif algo_key == "cb":
            raise NotImplementedError("Not supported yet")
        else:
            raise ValueError("Wrong algo key")
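
    # A rough worked example of the time-score bookkeeping above (illustrative values, not defaults):
    # with nested_cv enabled and nested_cv_params["n_folds"] = 5, get_time_score(1, "lgb_tuned")
    # returns 3 * 5 = 15, and at n_level=2 with skip_conn=True the multiplier is further scaled by 0.8,
    # giving 3 * 5 * 0.8 = 12. These values are relative runtime-rate guesses fed into the TaskTimer,
    # not wall-clock seconds.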

    def create_automl(self, **fit_args):
        """Create basic automl instance.

        Args:
            **fit_args: Contains all information needed to create the automl.

        """
        train_data = fit_args["train_data"]
        multilevel_avail = fit_args["valid_data"] is None and fit_args["cv_iter"] is None

        self.infer_auto_params(train_data, multilevel_avail)
        reader = SparkToSparkReader(task=self.task, **self.reader_params)

        pre_selector = self.get_selector()

        levels = []

        for n, names in enumerate(self.general_params["use_algos"]):
            lvl = []
            # regs
            if "linear_l2" in names:
                selector = None
                if "linear_l2" in self.selection_params["select_algos"] and (
                    self.general_params["skip_conn"] or n == 0
                ):
                    selector = pre_selector
                lvl.append(self.get_linear(n + 1, selector))

            gbm_models = [
                x
                for x in ["lgb", "lgb_tuned", "cb", "cb_tuned"]
                if x in names and x.split("_")[0] in self.task.losses
            ]

            if len(gbm_models) > 0:
                selector = None
                if "gbm" in self.selection_params["select_algos"] and (self.general_params["skip_conn"] or n == 0):
                    selector = pre_selector
                lvl.append(self.get_gbms(gbm_models, n + 1, selector))

            levels.append(lvl)

        # blend everything
        blender = SparkWeightedBlender(max_nonzero_coef=self.general_params["weighted_blender_max_nonzero_coef"])

        # initialize
        self._initialize(
            reader,
            levels,
            skip_conn=self.general_params["skip_conn"],
            blender=blender,
            return_all_predictions=self.general_params["return_all_predictions"],
            timer=self.timer,
        )
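
    # Worked example (hypothetical config): with general_params["use_algos"] = [["linear_l2", "lgb"], ["lgb"]]
    # the loop above builds two levels: level 1 gets a linear pipeline plus a GBM pipeline, level 2 gets a
    # GBM pipeline only (assuming "lgb" is available in task.losses). The pre-selector is attached to a
    # pipeline only when its algo group is listed in selection_params["select_algos"] and either skip_conn
    # is enabled or the level is the first one.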

    def _get_read_csv_params(self):
        try:
            cols_to_read = self.reader.used_features
            numeric_dtypes = {
                x: self.reader.roles[x].dtype for x in self.reader.roles if self.reader.roles[x].name == "Numeric"
            }
        except AttributeError:
            cols_to_read = []
            numeric_dtypes = {}
        # cols_to_read is empty if reader is not fitted
        if len(cols_to_read) == 0:
            cols_to_read = None

        read_csv_params = copy(self.read_csv_params)
        read_csv_params = {
            **read_csv_params,
            **{"usecols": cols_to_read, "dtype": numeric_dtypes},
        }

        return read_csv_params

    def fit_predict(
        self,
        train_data: ReadableIntoSparkDf,
        roles: Optional[dict] = None,
        train_features: Optional[Sequence[str]] = None,
        cv_iter: Optional[Iterable] = None,
        valid_data: Optional[ReadableIntoSparkDf] = None,
        valid_features: Optional[Sequence[str]] = None,
        log_file: Optional[str] = None,
        verbose: int = 0,
        persistence_manager: Optional[PersistenceManager] = None,
    ) -> SparkDataset:
        """Fit and get prediction on validation dataset.

        Almost the same as :meth:`lightautoml.automl.base.AutoML.fit_predict`.

        Additional features - working with different data formats.
        Supported now:

            - Path to ``.csv``, ``.parquet``, ``.feather`` files.
            - :class:`~numpy.ndarray`, or dict of :class:`~numpy.ndarray`. For example,
              ``{'data': X...}``. In this case, roles are optional,
              but `train_features` and `valid_features` are required.
            - :class:`pandas.DataFrame`.

        Args:
            train_data: Dataset to train.
            roles: Roles dict.
            train_features: Optional feature names, if they can't be inferred from `train_data`.
            cv_iter: Custom cv-iterator. For example,
                :class:`~lightautoml.validation.np_iterators.TimeSeriesIterator`.
            valid_data: Optional validation dataset.
            valid_features: Optional validation dataset features if they cannot be inferred from `valid_data`.
            verbose: Controls the verbosity: the higher, the more messages.
                <1  : messages are not displayed;
                >=1 : the computation process for layers is displayed;
                >=2 : the information about folds processing is also displayed;
                >=3 : the hyperparameters optimization process is also displayed;
                >=4 : the training process for every algorithm is displayed.
            log_file: Filename for writing logging messages. If log_file is specified,
                the messages will be saved in the file. If the file exists, it will be overwritten.
            persistence_manager: Optional persistence manager that controls caching of intermediate results.

        Returns:
            Dataset with predictions. Call ``.data`` to get predictions array.

        """
        # roles may be none in case of train data is set {'data': np.ndarray, 'target': np.ndarray ...}
        self.set_logfile(log_file)

        if roles is None:
            roles = {}

        train = self._read_data(train_data, train_features)
        # if upd_roles:
        #     roles = {**roles, **upd_roles}

        if valid_data is not None:
            valid_data = self._read_data(valid_data, valid_features)

        oof_pred = super().fit_predict(
            train,
            roles=roles,
            cv_iter=cv_iter,
            valid_data=valid_data,
            verbose=verbose,
            persistence_manager=persistence_manager,
        )

        return oof_pred
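
    # Usage sketch for fit_predict (hedged: `spark`, the parquet path, the target column name "label" and
    # the already constructed instance `automl` are placeholders, not part of this module):
    #
    #     train_df = spark.read.parquet("/tmp/train.parquet")
    #     oof = automl.fit_predict(train_df, roles={"target": "label"}, verbose=1)
    #     oof_sdf = oof.data  # Spark DataFrame with out-of-fold predictions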

    def get_feature_scores(
        self,
        calc_method: str = "fast",
        data: Optional[ReadableIntoSparkDf] = None,
        features_names: Optional[Sequence[str]] = None,
        silent: bool = True,
    ):
        if calc_method == "fast":
            for level in self.levels:
                for pipe in level:
                    fi = pipe.pre_selection.get_features_score()
                    if fi is not None:
                        used_feats = set(self.collect_used_feats())
                        fi = fi.reset_index()
                        fi.columns = ["Feature", "Importance"]
                        return fi[fi["Feature"].map(lambda x: x in used_feats)]
            else:
                if not silent:
                    logger.info2("No feature importances to show. Please use another calculation method")
                return None

        if calc_method != "accurate":
            if not silent:
                logger.info2(
                    "Unknown calc_method. "
                    + "Currently supported methods for feature importances calculation are 'fast' and 'accurate'."
                )
            return None

        if data is None:
            if not silent:
                logger.info2("Data parameter is not set up for the accurate calculation method. Aborting...")
            return None

        read_csv_params = self._get_read_csv_params()
        data = self._read_data(data, features_names, read_csv_params)
        used_feats = self.collect_used_feats()
        fi = calc_feats_permutation_imps(
            self,
            used_feats,
            data,
            self.task.get_dataset_metric(),
            silent=silent,
        )

        return fi

    @staticmethod
    def get_histogram(data: SparkDataFrame, column: str, n_bins: int) -> Tuple[List, np.ndarray]:
        assert n_bins >= 2, "n_bins must be 2 or more"

        bin_edges, counts = (
            data.select(sf.col(column).cast("double"))
            .where(sf.col(column).isNotNull())
            .rdd.map(lambda x: x[0])
            .histogram(n_bins)
        )
        bin_edges = np.array(bin_edges)

        return counts, bin_edges
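
    # Note on get_histogram: RDD.histogram(n_bins) returns n_bins + 1 evenly spaced bucket edges and
    # n_bins counts, e.g. for n_bins=3 over values spanning [0, 9] roughly bin_edges=[0.0, 3.0, 6.0, 9.0]
    # and three counts; the midpoints of consecutive edges become the PDP grid used in
    # get_pdp_data_numeric_feature below.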

    @staticmethod
    def get_pdp_data_numeric_feature(
        df: SparkDataFrame,
        feature_name: str,
        model: Transformer,
        prediction_col: str,
        n_bins: int,
        ice_fraction: float = 1.0,
        ice_fraction_seed: int = 42,
    ) -> Tuple[List, List, List]:
        """Returns `grid`, `ys` and `counts` calculated on an input numeric column to plot PDP.

        Args:
            df (SparkDataFrame): Spark DataFrame with a `feature_name` column
            feature_name (str): feature column name
            model (Transformer): Spark pipeline model used for inference
            prediction_col (str): prediction column to be created by the `model`
            n_bins (int): The number of bins to produce. Raises an exception if n_bins < 2.
            ice_fraction (float, optional): What fraction of the input dataframe will be used
                to make predictions. Useful for very large dataframes. Defaults to 1.0.
            ice_fraction_seed (int, optional): Seed for `ice_fraction`. Defaults to 42.

        Returns:
            Tuple[List, List, List]: `grid` is a list of bin midpoints, `ys` is a list of prediction arrays
            per grid value, `counts` is the number of values per bin
        """
        counts, bin_edges = SparkTabularAutoML.get_histogram(df, feature_name, n_bins)
        grid = (bin_edges[:-1] + bin_edges[1:]) / 2
        ys = []

        sample_df = (
            df.select(*[c for c in df.columns if c != feature_name])
            .sample(fraction=ice_fraction, seed=ice_fraction_seed)
            .cache()
        )

        for i in tqdm(grid):
            # replace feature column values with constant
            sdf = sample_df.select("*", sf.lit(i).alias(feature_name))
            # infer via transformer
            preds = model.transform(sdf)
            # TODO: SPARK-LAMA remove this line after passing the "prediction_col" parameter
            prediction_col = next(c for c in preds.columns if c.startswith("prediction"))
            preds = np.array(preds.select(prediction_col).collect())
            # when preds.shape is (n, 1, k) we change it to (n, k),
            # where n is the number of rows and k is the number of classes
            if len(preds.shape) == 3:
                preds = np.squeeze(preds, axis=1)
            ys.append(preds)

        sample_df.unpersist()

        return grid, ys, counts
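
    # Each element of `ys` returned above is the array of per-row predictions for a single grid value,
    # i.e. one slice of the ICE curves; averaging ys[i] across rows gives the partial dependence estimate
    # at grid[i] (per class when the model outputs multiclass probabilities).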

    @staticmethod
    def get_pdp_data_categorical_feature(
        df: SparkDataFrame,
        feature_name: str,
        model: Transformer,
        prediction_col: str,
        n_top_cats: int,
        ice_fraction: float = 1.0,
        ice_fraction_seed: int = 42,
    ) -> Tuple[List, List, List]:
        """Returns `grid`, `ys` and `counts` calculated on an input categorical column to plot PDP.

        Args:
            df (SparkDataFrame): Spark DataFrame with a `feature_name` column
            feature_name (str): feature column name
            model (Transformer): Spark pipeline model used for inference
            prediction_col (str): prediction column to be created by the `model`
            n_top_cats (int): number of top categories to keep; the remaining categories are grouped
                into an ``<OTHER>`` bucket
            ice_fraction (float, optional): What fraction of the input dataframe will be used
                to make predictions. Useful for very large dataframes. Defaults to 1.0.
            ice_fraction_seed (int, optional): Seed for `ice_fraction`. Defaults to 42.

        Returns:
            Tuple[List, List, List]: `grid` is a list of categories, `ys` is a list of prediction arrays
            per category, `counts` is the number of values per category
        """
        feature_cnt = (
            df.where(sf.col(feature_name).isNotNull()).groupBy(feature_name).count().orderBy(sf.desc("count")).collect()
        )
        grid = [row[feature_name] for row in feature_cnt[:n_top_cats]]
        counts = [row["count"] for row in feature_cnt[:n_top_cats]]
        ys = []

        sample_df = (
            df.select(*[c for c in df.columns if c != feature_name])
            .sample(fraction=ice_fraction, seed=ice_fraction_seed)
            .cache()
        )

        for i in tqdm(grid):
            sdf = sample_df.select("*", sf.lit(i).alias(feature_name))
            preds = model.transform(sdf)
            # TODO: SPARK-LAMA remove this line after passing the "prediction_col" parameter
            prediction_col = next(c for c in preds.columns if c.startswith("prediction"))
            preds = np.array(preds.select(prediction_col).collect())
            # when preds.shape is (n, 1, k) we change it to (n, k),
            # where n is the number of rows and k is the number of classes
            if len(preds.shape) == 3:
                preds = np.squeeze(preds, axis=1)
            ys.append(preds)

        if len(feature_cnt) > n_top_cats:
            # unique other categories
            unique_other_categories = [row[feature_name] for row in feature_cnt[n_top_cats:]]

            # get non-top categories, the natural distribution is important here
            w = Window().orderBy(sf.lit("A"))  # window without sorting
            other_categories_collection = (
                df.select(feature_name)
                .filter(sf.col(feature_name).isin(unique_other_categories))
                .select(sf.row_number().over(w).alias("row_num"), feature_name)
                .collect()
            )
            # dict with key=%row number% and value=%category%
            other_categories_dict = {x["row_num"]: x[feature_name] for x in other_categories_collection}
            max_row_num = len(other_categories_collection)

            def get_category_by_row_num(row_num):
                remainder = row_num % max_row_num
                if remainder == 0:
                    key = max_row_num
                else:
                    key = remainder
                return other_categories_dict[key]

            get_category_udf = sf.udf(get_category_by_row_num, StringType())

            # add row number to main dataframe and exclude feature_name column
            sdf = sample_df.select("*", sf.row_number().over(w).alias("row_num"))
            all_columns_except_row_num = [f for f in sdf.columns if f != "row_num"]
            feature_col = get_category_udf(sf.col("row_num")).alias(feature_name)
            # exclude row number from dataframe
            # and add back the feature_name column filled with other categories following the same distribution
            sdf = sdf.select(*all_columns_except_row_num, feature_col)

            preds = model.transform(sdf)
            preds = np.array(preds.select(prediction_col).collect())
            # when preds.shape is (n, 1, k) we change it to (n, k),
            # where n is the number of rows and k is the number of classes
            if len(preds.shape) == 3:
                preds = np.squeeze(preds, axis=1)

            grid.append("<OTHER>")
            ys.append(preds)
            counts.append(sum([row["count"] for row in feature_cnt[n_top_cats:]]))

        sample_df.unpersist()

        return grid, ys, counts

    @staticmethod
    def get_pdp_data_datetime_feature(
        df: SparkDataFrame,
        feature_name: str,
        model: Transformer,
        prediction_col: str,
        datetime_level: str,
        reader,
        ice_fraction: float = 1.0,
        ice_fraction_seed: int = 42,
    ) -> Tuple[List, List, List]:
        """Returns `grid`, `ys` and `counts` calculated on an input datetime column to plot PDP.

        Args:
            df (SparkDataFrame): Spark DataFrame with a `feature_name` column
            feature_name (str): feature column name
            model (Transformer): Spark pipeline model used for inference
            prediction_col (str): prediction column to be created by the `model`
            datetime_level (str): Unit of time that will be modified to calculate dependence:
                "year", "month" or "dayofweek"
            reader: Automl reader used to transform the input dataframe before `model` inference.
            ice_fraction (float, optional): What fraction of the input dataframe will be used
                to make predictions. Useful for very large dataframes. Defaults to 1.0.
            ice_fraction_seed (int, optional): Seed for `ice_fraction`. Defaults to 42.

        Returns:
            Tuple[List, List, List]: `grid` is a list of time-unit values (years, months or days of week),
            `ys` is a list of prediction arrays per grid value, `counts` is the number of rows per unit
        """
        df = reader.read(df).data

        if datetime_level == "year":
            feature_cnt = df.groupBy(sf.year(feature_name).alias("year")).count().orderBy(sf.asc("year")).collect()
            grid = [x["year"] for x in feature_cnt]
            counts = [row["count"] for row in feature_cnt]
            replace_date_element_udf = sf.udf(replace_year_in_date, DateType())
        elif datetime_level == "month":
            feature_cnt = df.groupBy(sf.month(feature_name).alias("month")).count().orderBy(sf.asc("month")).collect()
            grid = np.arange(1, 13).tolist()
            counts = [0] * 12
            for row in feature_cnt:
                counts[row["month"] - 1] = row["count"]
            replace_date_element_udf = sf.udf(replace_month_in_date, DateType())
        else:
            feature_cnt = (
                df.groupBy(sf.dayofweek(feature_name).alias("dayofweek")).count().orderBy(sf.asc("dayofweek")).collect()
            )
            grid = np.arange(7).tolist()
            counts = [0] * 7
            for row in feature_cnt:
                counts[row["dayofweek"] - 1] = row["count"]
            replace_date_element_udf = sf.udf(replace_dayofweek_in_date, DateType())

        ys = []

        sample_df = df.sample(fraction=ice_fraction, seed=ice_fraction_seed).cache()

        for i in tqdm(grid):
            feature_col = replace_date_element_udf(sf.col(feature_name), sf.lit(i)).alias(feature_name)
            sdf = sample_df.select(*[c for c in sample_df.columns if c != feature_name], feature_col)
            preds = model.transform(sdf)
            # TODO: SPARK-LAMA remove this line after passing the "prediction_col" parameter
            prediction_col = next(c for c in preds.columns if c.startswith("prediction"))
            preds = np.array(preds.select(prediction_col).collect())
            # when preds.shape is (n, 1, k) we change it to (n, k),
            # where n is the number of rows and k is the number of classes
            if len(preds.shape) == 3:
                preds = np.squeeze(preds, axis=1)
            ys.append(preds)

        return grid, ys, counts

    def get_individual_pdp(
        self,
        test_data: SparkDataFrame,
        feature_name: str,
        n_bins: Optional[int] = 30,
        top_n_categories: Optional[int] = 10,
        datetime_level: Optional[str] = "year",
        ice_fraction: float = 1.0,
        ice_fraction_seed: int = 42,
    ):
        assert feature_name in self.reader._roles
        assert datetime_level in ["year", "month", "dayofweek"]
        assert 0 < ice_fraction <= 1.0

        pipeline_model = self.transformer(leave_only_predict_cols=True)

        # Numeric features
        if self.reader._roles[feature_name].name == "Numeric":
            return self.get_pdp_data_numeric_feature(
                test_data, feature_name, pipeline_model, "prediction", n_bins, ice_fraction, ice_fraction_seed
            )
        # Categorical features
        elif self.reader._roles[feature_name].name == "Category":
            return self.get_pdp_data_categorical_feature(
                test_data, feature_name, pipeline_model, "prediction", top_n_categories, ice_fraction, ice_fraction_seed
            )
        # Datetime features
        elif self.reader._roles[feature_name].name == "Datetime":
            return self.get_pdp_data_datetime_feature(
                test_data,
                feature_name,
                pipeline_model,
                "prediction",
                datetime_level,
                self.reader,
                ice_fraction,
                ice_fraction_seed,
            )
        else:
            raise ValueError("Only Numeric, Category or Datetime features are supported")

    def plot_pdp(
        self,
        test_data: ReadableToDf,
        feature_name: str,
        individual: Optional[bool] = False,
        n_bins: Optional[int] = 30,
        top_n_categories: Optional[int] = 10,
        top_n_classes: Optional[int] = 10,
        datetime_level: Optional[str] = "year",
        ice_fraction: float = 1.0,
        ice_fraction_seed: int = 42,
    ):
        grid, ys, counts = self.get_individual_pdp(
            test_data=test_data,
            feature_name=feature_name,
            n_bins=n_bins,
            top_n_categories=top_n_categories,
            datetime_level=datetime_level,
            ice_fraction=ice_fraction,
            ice_fraction_seed=ice_fraction_seed,
        )

        histogram_data_rows_limit = 2000
        rows_count = test_data.count()
        if rows_count > histogram_data_rows_limit:
            fraction = histogram_data_rows_limit / rows_count
            test_data = test_data.sample(fraction=fraction)

        if self.reader._roles[feature_name].name == "Numeric":
            test_data = test_data.select(sf.col(feature_name).cast("double")).toPandas()
        else:
            test_data = test_data.select(feature_name).toPandas()

        plot_pdp_with_distribution(
            test_data,
            grid,
            ys,
            counts,
            self.reader,
            feature_name,
            individual,
            top_n_classes,
            datetime_level,
        )
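

# Usage sketch (illustrative only; `spark`, `train_df`, `test_df`, `roles` and the "Age" column are placeholders):
#
#     automl = SparkTabularAutoML(spark=spark, task=SparkTask("binary"))
#     automl.fit_predict(train_df, roles=roles)
#     grid, ys, counts = automl.get_individual_pdp(test_df, feature_name="Age", n_bins=20)
#     automl.plot_pdp(test_df, feature_name="Age")  # draws the PDP together with the feature distribution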