Source code for sparklightautoml.dataset.base

import functools
import importlib
import json
import logging
import os
import uuid
import warnings
from abc import ABC, abstractmethod
from collections import Counter
from copy import copy, deepcopy
from dataclasses import dataclass
from datetime import datetime
from enum import Enum
from json import JSONEncoder, JSONDecoder
from typing import Sequence, Any, Tuple, Union, Optional, List, cast, Dict, Set, Callable

import numpy as np
import pandas as pd
from lightautoml.dataset.base import (
    LAMLDataset,
    IntIdx,
    RowSlice,
    ColSlice,
    LAMLColumn,
    RolesDict,
    valid_array_attributes,
    array_attr_roles,
)
from lightautoml.dataset.np_pd_dataset import PandasDataset, NumpyDataset, NpRoles
from lightautoml.dataset.roles import (
    ColumnRole,
    NumericRole,
    DropRole,
    DatetimeRole,
    GroupRole,
    WeightsRole,
    FoldsRole,
    PathRole,
    TreatmentRole,
    DateRole,
)
from lightautoml.tasks import Task
from pyspark.ml.functions import vector_to_array
from pyspark.sql import functions as sf, Column
from pyspark.sql.session import SparkSession

from sparklightautoml import VALIDATION_COLUMN
from sparklightautoml.dataset.roles import NumericVectorOrArrayRole
from sparklightautoml.utils import SparkDataFrame, create_directory, get_current_session

logger = logging.getLogger(__name__)

Dependency = Union[str, 'SparkDataset', 'Unpersistable', Callable]
DepIdentifable = Union[str, 'SparkDataset']


class PersistenceLevel(Enum):
    """
    Used for signaling types of persistence points encountered during the AutoML process.
    """

    READER = 0
    REGULAR = 1
    CHECKPOINT = 2

class Unpersistable(ABC):
    """
    Interface that allows external entities to unpersist dataframes and files
    stored by the entity implementing this interface.
    """

    def unpersist(self):
        ...

class SparkDatasetMetadataJsonEncoder(JSONEncoder):
    def default(self, o: Any) -> Any:
        if isinstance(o, ColumnRole):
            dct = deepcopy(o.__dict__)
            dct["__type__"] = ".".join([type(o).__module__, type(o).__name__])
            dct['dtype'] = o.dtype.__name__

            if isinstance(o, DatetimeRole):
                dct['date_format'] = dct['format']
                del dct['format']
                dct['origin'] = o.origin.timestamp() if isinstance(o.origin, datetime) else o.origin

            return dct

        from sparklightautoml.tasks.base import SparkTask

        if isinstance(o, SparkTask):
            dct = {
                "__type__": type(o).__name__,
                "name": o.name,
                "loss": o.loss_name,
                "metric": o.metric_name,
                # convert to python bool from numpy.bool_
                "greater_is_better": True if o.greater_is_better else False,
            }
            return dct

        raise ValueError(f"Cannot handle an object that is neither a ColumnRole nor a SparkTask. Object type: {type(o)}")


class SparkDatasetMetadataJsonDecoder(JSONDecoder):
    @staticmethod
    def _column_roles_object_hook(json_object):
        from sparklightautoml.tasks.base import SparkTask

        if json_object.get("__type__", None) == "SparkTask":
            del json_object["__type__"]
            return SparkTask(**json_object)

        if "__type__" in json_object:
            components = json_object["__type__"].split(".")
            module_name = ".".join(components[:-1])
            class_name = components[-1]
            module = importlib.import_module(module_name)
            clazz = getattr(module, class_name)
            del json_object["__type__"]

            if clazz in [DatetimeRole, DateRole]:
                json_object["seasonality"] = tuple(json_object["seasonality"])
                if isinstance(json_object["origin"], float):
                    json_object["origin"] = datetime.fromtimestamp(json_object["origin"])

            json_object["dtype"] = getattr(np, json_object["dtype"])

            if clazz in [GroupRole, DropRole, WeightsRole, FoldsRole, PathRole, TreatmentRole]:
                del json_object["dtype"]

            instance = clazz(**json_object)

            return instance

        return json_object

    def __init__(self, **kwargs):
        kwargs.setdefault("object_hook", self._column_roles_object_hook)
        super().__init__(**kwargs)

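# The two metadata JSON classes above are used by ``SparkDataset.save`` / ``SparkDataset.load``
# to serialize and restore dataset metadata (column roles and the task). A minimal illustrative
# sketch of the round trip, using only classes already imported in this module:
#
#     roles = {"price": NumericRole(np.float32)}
#     serialized = json.dumps(roles, cls=SparkDatasetMetadataJsonEncoder)
#     restored = json.loads(serialized, cls=SparkDatasetMetadataJsonDecoder)
#     assert isinstance(restored["price"], NumericRole)  # the dtype is restored as np.float32
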
class SparkDataset(LAMLDataset, Unpersistable):
    """
    Implements a dataset that uses a ``pyspark.sql.DataFrame`` internally,
    stores some internal state (features, roles, ...) and provides methods to work with the dataset.
    """

    @staticmethod
    def _get_rows(data, k: IntIdx) -> Any:
        raise NotImplementedError("This method is not supported")

    @staticmethod
    def _set_col(data: Any, k: int, val: Any):
        raise NotImplementedError("This method is not supported")

    _init_checks = ()
    _data_checks = ()
    _concat_checks = ()
    _dataset_type = "SparkDataset"

    ID_COLUMN = "_id"

    @classmethod
    def load(
        cls,
        path: str,
        file_format: str = 'parquet',
        file_format_options: Optional[Dict[str, Any]] = None,
        persistence_manager: Optional['PersistenceManager'] = None,
        partitions_num: Optional[int] = None,
    ) -> 'SparkDataset':
        metadata_file_path = os.path.join(path, f"metadata.{file_format}")
        file_path = os.path.join(path, f"data.{file_format}")
        file_format_options = file_format_options or dict()
        spark = get_current_session()

        # reading metadata
        metadata_df = spark.read.format(file_format).options(**file_format_options).load(metadata_file_path)
        metadata = json.loads(
            metadata_df.select('metadata').first().asDict()['metadata'],
            cls=SparkDatasetMetadataJsonDecoder,
        )

        # reading data
        data_df = spark.read.format(file_format).options(**file_format_options).load(file_path)
        name_fixed_cols = (sf.col(c).alias(c.replace('[', '(').replace(']', ')')) for c in data_df.columns)
        data_df = data_df.select(*name_fixed_cols)
        if partitions_num:
            data_df = data_df.repartition(partitions_num)

        # TODO: SLAMA - implement filling dependencies
        return SparkDataset(data=data_df, persistence_manager=persistence_manager, **metadata)

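    # Illustrative usage sketch for ``load`` (the path, the partition count, and the
    # ``my_persistence_manager`` variable are hypothetical; requires an active Spark session):
    #
    #     dataset = SparkDataset.load(
    #         path="/tmp/slama_dataset",
    #         persistence_manager=my_persistence_manager,  # any PersistenceManager implementation
    #         partitions_num=16,
    #     )
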
    @classmethod
    def concatenate(
        cls,
        datasets: Sequence["SparkDataset"],
        name: Optional[str] = None,
        extra_dependencies: Optional[List[Dependency]] = None,
    ) -> "SparkDataset":
        """
        Concatenates multiple datasets by joining their internal ``pyspark.sql.DataFrame``s
        on the special hidden '_id' column.

        Args:
            datasets: spark datasets to be joined

        Returns:
            a joined dataset containing the features (and columns) of all input datasets
            and a single '_id' column
        """
        assert len(datasets) > 0, "Cannot join an empty list of datasets"

        if len(datasets) == 1:
            return datasets[0]

        if any(not d.bucketized for d in datasets):
            warnings.warn(
                "NOT bucketized datasets are requested to be joined. It may severely affect performance",
                RuntimeWarning,
            )

        # we should join datasets only with unique features
        features = [feat for ds in datasets for feat in ds.features]
        feat_counter = Counter(features)

        assert all(count == 1 for el, count in feat_counter.items()), \
            f"Different datasets being joined contain columns with the same names: {feat_counter}"

        roles = {col: role for ds in datasets for col, role in ds.roles.items()}

        except_cols = [c for c in datasets[0].service_columns if c != SparkDataset.ID_COLUMN]
        concatenated_sdf = functools.reduce(
            lambda acc, sdf: acc.join(sdf.drop(*except_cols), on=cls.ID_COLUMN, how='left'),
            (d.data for d in datasets),
        )

        output = datasets[0].empty()
        output.set_data(
            concatenated_sdf,
            features,
            roles,
            dependencies=[*datasets, *(extra_dependencies or [])],
            name=name,
        )

        return output

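    # Illustrative sketch: merging two datasets with disjoint feature sets that share the same
    # '_id' column (``num_ds`` and ``cat_ds`` are hypothetical SparkDataset instances):
    #
    #     merged = SparkDataset.concatenate([num_ds, cat_ds], name="merged_features")
    #     # merged.features == num_ds.features + cat_ds.features
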
    def __init__(
        self,
        data: SparkDataFrame,
        roles: Optional[RolesDict],
        persistence_manager: Optional['PersistenceManager'] = None,
        task: Optional[Task] = None,
        bucketized: bool = False,
        dependencies: Optional[List[Dependency]] = None,
        name: Optional[str] = None,
        target: Optional[str] = None,
        folds: Optional[str] = None,
        **kwargs: Any,
    ):
        self._validate_dataframe(data)

        roles = roles if roles else dict()

        # currently only target is supported
        # adapted from PandasDataset
        for f in roles:
            for k, r in zip(valid_array_attributes, array_attr_roles):
                if roles[f].name == r:
                    roles[f] = DropRole()

        self._data = None
        self._bucketized = bucketized
        self._roles = None
        self._uid = str(uuid.uuid4())
        self._persistence_manager = persistence_manager
        self._dependencies = dependencies
        self._frozen = False
        self._name = name
        self._is_persisted = False
        self._target_column = target
        self._folds_column = folds

        # columns that can be transferred intact across all transformations in the pipeline
        self._service_columns: Set[str] = {self.ID_COLUMN, target, folds, VALIDATION_COLUMN}

        super().__init__(data, list(roles.keys()), roles, task, **kwargs)

    @property
    def uid(self) -> str:
        return self._uid

    @property
    def name(self) -> Optional[str]:
        return self._name

    @property
    def spark_session(self) -> SparkSession:
        return self.data.sql_ctx.sparkSession

    @property
    def data(self) -> SparkDataFrame:
        return self._data

    @data.setter
    def data(self, val: SparkDataFrame) -> None:
        self._data = val

    @property
    def dependencies(self) -> Optional[List[Dependency]]:
        return self._dependencies

    @property
    def features(self) -> List[str]:
        """Get list of features.

        Returns:
            list of features.
        """
        return self._features

    @features.setter
    def features(self, val: List[str]):
        """
        Set features available in this dataset.
        Columns won't be deleted from the dataframe, but they won't appear through the features property either.

        Args:
            val: list of feature names.
        """
        diff = set(val).difference(self.data.columns)
        assert len(diff) == 0, f"Not all roles have features in the dataset. Absent features: {diff}."
        self._features = copy(val)

    @property
    def roles(self) -> RolesDict:
        """Roles dict."""
        return copy(self._roles)

    @roles.setter
    def roles(self, val: NpRoles):
        """Define how to set roles.

        Args:
            val: Roles.

        Note:
            There is different behavior for different types of the val parameter:

                - `List` - should be same len as ``data.shape[1]``.
                - `None` - automatic set ``NumericRole(np.float32)``.
                - ``ColumnRole`` - single role for all.
                - ``dict``.
        """
        if type(val) is dict:
            self._roles = dict(((x, val[x]) for x in self.features))
        elif type(val) is list:
            self._roles = dict(zip(self.features, val))
            diff = set(self._roles.keys()).difference(self.data.columns)
            assert len(diff) == 0, f"Not all roles have features in the dataset. Absent features: {diff}."
        elif val:
            role = cast(ColumnRole, val)
            self._roles = dict(((x, role) for x in self.features))
        else:
            raise ValueError()

        diff = set(self._roles.keys()).difference(self.data.columns)
        assert len(diff) == 0, f"Not all roles have features in the dataset. Absent features: {diff}."

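    # Illustrative sketch of constructing a dataset manually (the dataframe, the column names and
    # the ``SparkTask("binary")`` argument are hypothetical; the dataframe must contain the special
    # '_id' column):
    #
    #     sdf = spark.createDataFrame([(0, 1.0, 0), (1, 2.5, 1)], schema="_id long, f1 double, target long")
    #     ds = SparkDataset(
    #         data=sdf,
    #         roles={"f1": NumericRole(np.float32)},
    #         task=SparkTask("binary"),
    #         target="target",
    #     )
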
    @property
    def bucketized(self) -> bool:
        return self._bucketized

    @property
    def shape(self) -> Tuple[Optional[int], Optional[int]]:
        return -1, len(self.features)
        # with JobGroup("sparkdataset.shape", f"Finding count for dataset (uid={self.uid}, name={self.name})"):
        #     warn_if_not_cached(self.data)
        #     return self.data.count(), len(self.features)

    @property
    def target_column(self) -> Optional[str]:
        return self._target_column

    @property
    def folds_column(self) -> Optional[str]:
        return self._folds_column

    @property
    def service_columns(self) -> List[str]:
        return [sc for sc in self._service_columns if sc in self.data.columns]

    @property
    def num_folds(self) -> int:
        return self.data.select(self.folds_column).distinct().count()

    @property
    def persistence_manager(self) -> 'PersistenceManager':
        return self._persistence_manager

    def __repr__(self):
        return f"SparkDataset ({self.data})"

    def __getitem__(self, k: Tuple[RowSlice, ColSlice]) -> Union["LAMLDataset", LAMLColumn]:
        rslice, clice = k

        if isinstance(clice, str):
            clice = [clice]

        assert all(c in self.features for c in clice), (
            f"Not all columns presented in the dataset.\n"
            f"Presented: {self.features}\n"
            f"Asked for: {clice}"
        )

        present_svc_cols = [c for c in self.service_columns]

        sdf = cast(SparkDataFrame, self.data.select(*present_svc_cols, *clice))
        roles = {c: self.roles[c] for c in clice}

        output = self.empty()
        output.set_data(sdf, clice, roles, name=self.name)

        return output
        # raise NotImplementedError(f"The method is not supported by {self._dataset_type}")

    def __setitem__(self, k: str, val: Any):
        raise NotImplementedError(f"The method is not supported by {self._dataset_type}")

    def __eq__(self, o: object) -> bool:
        return isinstance(o, SparkDataset) and o.uid == self.uid

    def __hash__(self) -> int:
        return hash(self.uid)

    def _validate_dataframe(self, sdf: SparkDataFrame) -> None:
        assert (
            self.ID_COLUMN in sdf.columns
        ), f"No special unique row id column (the column name: {self.ID_COLUMN}) in the spark dataframe"
        # assert kwargs["target"] in data.columns, \
        #     f"No target column (the column name: {kwargs['target']}) in the spark dataframe"

    def _materialize_to_pandas(
        self,
    ) -> Tuple[pd.DataFrame, Optional[pd.Series], Optional[pd.Series], Dict[str, ColumnRole]]:
        sdf = self.data

        def expand_if_vec_or_arr(col, role) -> Tuple[List[Column], ColumnRole]:
            if not isinstance(role, NumericVectorOrArrayRole):
                return [col], role
            vrole = cast(NumericVectorOrArrayRole, role)

            def to_array(column):
                if vrole.is_vector:
                    return vector_to_array(column)
                return column

            arr = [to_array(sf.col(col))[i].alias(vrole.feature_name_at(i)) for i in range(vrole.size)]

            return arr, NumericRole(dtype=vrole.dtype)

        arr_cols = (expand_if_vec_or_arr(c, self.roles[c]) for c in self.features)
        all_cols_and_roles = {c: role for c_arr, role in arr_cols for c in c_arr}
        all_cols = [scol for scol, _ in all_cols_and_roles.items()]

        if self.target_column is not None:
            all_cols.append(self.target_column)

        if self.folds_column is not None:
            all_cols.append(self.folds_column)

        sdf = sdf.orderBy(SparkDataset.ID_COLUMN).select(*all_cols)

        all_roles = {c: all_cols_and_roles[c] for c in sdf.columns if c not in self.service_columns}

        data = sdf.toPandas()

        df = pd.DataFrame(data=data.to_dict())

        if self.target_column is not None:
            target_series = df[self.target_column]
            df = df.drop(self.target_column, axis=1)
        else:
            target_series = None

        if self.folds_column is not None:
            folds_series = df[self.folds_column]
            df = df.drop(self.folds_column, axis=1)
        else:
            folds_series = None

        return df, target_series, folds_series, all_roles

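    # Illustrative sketch of column slicing via ``__getitem__`` above (``ds`` is a hypothetical
    # dataset; only column selection is honoured, the row slice is ignored, and service columns
    # are carried over automatically):
    #
    #     subset = ds[:, ["f1", "f2"]]
    #     single = ds[:, "f1"]   # a single column name is wrapped into a list
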
    def _initialize(self, task: Optional[Task], **kwargs: Any):
        super()._initialize(task, **kwargs)
        self._dependencies = None

    def empty(self) -> "SparkDataset":
        dataset = cast(SparkDataset, super().empty())
        dataset._dependencies = [self]
        dataset._uid = str(uuid.uuid4())
        dataset._frozen = False
        return dataset

    def set_data(
        self,
        data: SparkDataFrame,
        features: List[str],
        roles: NpRoles = None,
        persistence_manager: Optional['PersistenceManager'] = None,
        dependencies: Optional[List[Dependency]] = None,
        uid: Optional[str] = None,
        name: Optional[str] = None,
        frozen: bool = False,
    ):
        """Inplace set data, features, roles for an empty dataset.

        Args:
            data: Table with features.
            features: list of feature names.
            roles: Dict with roles.
        """
        self._validate_dataframe(data)
        super().set_data(data, features, roles)
        self._persistence_manager = persistence_manager or self._persistence_manager
        self._dependencies = dependencies if dependencies is not None else self._dependencies
        self._uid = uid or self._uid
        self._name = name or self._name
        self._frozen = frozen

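    # Illustrative sketch of the ``empty()`` + ``set_data()`` pattern used throughout the codebase
    # to derive a new dataset from an existing one (``ds``, ``new_sdf``, ``new_features`` and
    # ``new_roles`` are hypothetical):
    #
    #     new_ds = ds.empty()  # copies metadata and records ``ds`` as a dependency
    #     new_ds.set_data(new_sdf, new_features, new_roles, name="transformed")
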
    def persist(self, level: Optional[PersistenceLevel] = None, force: bool = False) -> 'SparkDataset':
        """
        Materializes the current Spark DataFrame and unpersists all of its dependencies.

        Args:
            level: type of the persistence point being created; defaults to ``PersistenceLevel.REGULAR``.
            force: if True, persist a copy of this dataset (under a new uid) instead of the dataset itself.

        Returns:
            a new SparkDataset that is persisted and materialized
        """
        # if self._is_persisted:
        #     return self
        assert self.persistence_manager, "Cannot persist when persistence_manager is None"

        logger.debug(f"Persisting SparkDataset (uid={self.uid}, name={self.name})")

        level = level if level is not None else PersistenceLevel.REGULAR

        if force:
            ds = self.empty()
            ds.set_data(self.data, self.features, self.roles)
            persisted_dataset = self.persistence_manager.persist(ds, level).to_dataset()
        else:
            persisted_dataset = self.persistence_manager.persist(self, level).to_dataset()

        self._unpersist_dependencies()
        self._is_persisted = True

        return persisted_dataset

    def unpersist(self):
        """
        Unpersists the current dataframe (if it is persisted) along with all of its dependencies.
        Does nothing if the dataset is frozen.
        """
        assert self.persistence_manager, "Cannot unpersist when persistence_manager is None"

        if self.frozen:
            logger.debug(f"Cannot unpersist frozen SparkDataset (uid={self.uid}, name={self.name})")
            return

        logger.debug(f"Unpersisting SparkDataset (uid={self.uid}, name={self.name})")

        self.persistence_manager.unpersist(self.uid)
        self._unpersist_dependencies()

        # if self._is_persisted:
        #     self.persistence_manager.unpersist(self.uid)
        # else:
        #     self._unpersist_dependencies()

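    # Illustrative sketch of the persistence lifecycle (``ds`` is a hypothetical dataset created
    # with a non-None persistence manager):
    #
    #     persisted = ds.persist(level=PersistenceLevel.CHECKPOINT)  # materializes and unpersists dependencies
    #     ...                                                        # use ``persisted`` downstream
    #     persisted.unpersist()                                      # frees the cached data unless the dataset is frozen
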
    def _unpersist_dependencies(self):
        for dep in (self.dependencies or []):
            if isinstance(dep, str):
                self.persistence_manager.unpersist(dep)
            elif isinstance(dep, Unpersistable):
                dep.unpersist()
            else:
                dep()

    @property
    def frozen(self) -> bool:
        return self._frozen

    @frozen.setter
    def frozen(self, val: bool):
        self._frozen = val

    def freeze(self) -> 'SparkDataset':
        ds = self.empty()
        ds.set_data(self.data, self.features, self.roles, frozen=True)
        return ds

    def save(
        self,
        path: str,
        save_mode: str = 'error',
        file_format: str = 'parquet',
        file_format_options: Optional[Dict[str, Any]] = None,
    ):
        metadata_file_path = os.path.join(path, f"metadata.{file_format}")
        file_path = os.path.join(path, f"data.{file_format}")
        file_format_options = file_format_options or dict()

        # prepare metadata of the dataset
        metadata = {
            "name": self.name,
            "roles": self.roles,
            "task": self.task,
            "target": self.target_column,
            "folds": self.folds_column,
        }
        metadata_str = json.dumps(metadata, cls=SparkDatasetMetadataJsonEncoder)
        metadata_df = self.spark_session.createDataFrame([{"metadata": metadata_str}])

        # create directory that will store data and metadata as separate files of dataframes
        create_directory(path, spark=self.spark_session, exists_ok=(save_mode in ['overwrite', 'append']))

        # writing dataframes
        metadata_df.write.format(file_format).mode(save_mode).options(**file_format_options).save(metadata_file_path)

        # fix name of columns: parquet cannot have columns with '(' or ')' in the name
        name_fixed_cols = (sf.col(c).alias(c.replace('(', '[').replace(')', ']')) for c in self.data.columns)

        # write dataframe with fixed column names
        (
            self.data
            .select(*name_fixed_cols)
            .write
            .format(file_format)
            .mode(save_mode)
            .options(**file_format_options)
            .save(file_path)
        )

    def to_pandas(self) -> PandasDataset:
        data, target_data, folds_data, roles = self._materialize_to_pandas()

        task = Task(self.task.name) if self.task else None

        kwargs = dict()
        if target_data is not None:
            kwargs["target"] = target_data
        if folds_data is not None:
            kwargs["folds"] = folds_data

        pds = PandasDataset(data=data, roles=roles, task=task, **kwargs)

        return pds

    def to_numpy(self) -> NumpyDataset:
        data, target_data, folds_data, roles = self._materialize_to_pandas()

        try:
            target = self.target
            if isinstance(target, pd.Series):
                target = target.to_numpy()
            elif isinstance(target, SparkDataFrame):
                target = target.toPandas().to_numpy()
        except AttributeError:
            target = None

        try:
            folds = self.folds
            if isinstance(folds, pd.Series):
                folds = folds.to_numpy()
            elif isinstance(folds, SparkDataFrame):
                folds = folds.toPandas().to_numpy()
        except AttributeError:
            folds = None

        return NumpyDataset(
            data=data.to_numpy(),
            features=list(data.columns),
            roles=roles,
            task=self.task,
            target=target,
            folds=folds,
        )

    @staticmethod
    def _hstack(datasets: Sequence[Any]) -> Any:
        raise NotImplementedError("Unsupported operation for this dataset type")

    @staticmethod
    def _get_cols(data, k: IntIdx) -> Any:
        raise NotImplementedError("Unsupported operation for this dataset type")

    @staticmethod
    def from_dataset(dataset: "LAMLDataset") -> "LAMLDataset":
        assert isinstance(dataset, SparkDataset), "Can only convert from SparkDataset"
        return dataset


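# Illustrative sketch of saving a dataset and materializing it on the driver
# (``ds`` and the output path are hypothetical):
#
#     ds.save("/tmp/slama_dataset", save_mode="overwrite")  # writes data.parquet and metadata.parquet under the path
#     pandas_ds = ds.to_pandas()                            # converts the dataset into an in-memory PandasDataset

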
@dataclass(frozen=True)
class PersistableDataFrame:
    sdf: SparkDataFrame
    uid: str
    callback: Optional[Callable] = None
    base_dataset: Optional[SparkDataset] = None
    custom_name: Optional[str] = None

    def to_dataset(self) -> SparkDataset:
        assert self.base_dataset is not None
        ds = self.base_dataset.empty()
        ds.set_data(
            self.sdf,
            self.base_dataset.features,
            self.base_dataset.roles,
            dependencies=list(self.base_dataset.dependencies or []),
            uid=self.uid,
            name=self.base_dataset.name,
        )
        return ds

    @property
    def name(self) -> Optional[str]:
        ds_name = self.base_dataset.name if self.base_dataset is not None else None
        return self.custom_name or ds_name


class PersistenceManager(ABC):
    """
    Base interface of an entity responsible for caching and storing intermediate results somewhere.
    """

    @staticmethod
    def to_persistable_dataframe(dataset: SparkDataset) -> PersistableDataFrame:
        # we intentionally create a new uid to distinguish a persisted dataset from an unpersisted one
        return PersistableDataFrame(dataset.data, uid=dataset.uid, base_dataset=dataset)

    @property
    @abstractmethod
    def uid(self) -> str:
        ...

    @property
    @abstractmethod
    def children(self) -> List['PersistenceManager']:
        ...

    @property
    @abstractmethod
    def datasets(self) -> List[SparkDataset]:
        ...

    @property
    @abstractmethod
    def all_datasets(self) -> List[SparkDataset]:
        """
        Returns:
            all persisted datasets, including those persisted by child contexts
        """
        ...

    @abstractmethod
    def persist(
        self,
        dataset: Union[SparkDataset, PersistableDataFrame],
        level: PersistenceLevel = PersistenceLevel.REGULAR,
    ) -> PersistableDataFrame:
        ...

    @abstractmethod
    def unpersist(self, uid: str):
        ...

    @abstractmethod
    def unpersist_all(self):
        ...

    @abstractmethod
    def unpersist_children(self):
        ...

    @abstractmethod
    def child(self) -> 'PersistenceManager':
        ...

    @abstractmethod
    def remove_child(self, child: Union['PersistenceManager', str]):
        ...

    @abstractmethod
    def is_persisted(self, pdf: PersistableDataFrame) -> bool:
        ...

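# Illustrative sketch of how a concrete ``PersistenceManager`` implementation is used together
# with the classes above (``manager`` stands for any implementation of this interface, ``ds`` is
# a hypothetical SparkDataset):
#
#     pdf = manager.persist(ds, PersistenceLevel.READER)  # returns a PersistableDataFrame
#     persisted_ds = pdf.to_dataset()                     # wrap it back into a SparkDataset
#     manager.unpersist(persisted_ds.uid)                 # release it when it is no longer needed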