SLAMA (Spark version of LAMA)
This is a distributed version of the LAMA library built on the Apache Spark framework. SLAMA brings LAMA functionality to Spark, including:
Automatic hyperparameter tuning, data processing.
Automatic typing, feature selection.
Automatic time utilization.
Automatic report creation.
Easy-to-use modular scheme to create your own pipelines.
Support of Spark ML pipelines, including saving/loading.
Caching and checkpointing of intermediate results.
Known limitations: only the tabular preset is currently supported.
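A minimal usage sketch is shown below. It assumes the tabular preset mirrors LAMA's TabularAutoML API (SparkTabularAutoML, SparkTask, fit_predict with a roles dict); the exact import paths, constructor arguments and dataset path are illustrative and may differ between versions.

from pyspark.sql import SparkSession

# Assumed import paths; they follow the module layout described below.
from sparklightautoml.automl.presets.tabular_presets import SparkTabularAutoML
from sparklightautoml.tasks.base import SparkTask

spark = SparkSession.builder.master("local[4]").getOrCreate()

# Hypothetical training data with a binary column named "target".
train_df = spark.read.csv("train.csv", header=True, inferSchema=True)

automl = SparkTabularAutoML(
    spark=spark,               # assumption: the preset receives the session explicitly
    task=SparkTask("binary"),
    timeout=3600,
)
oof_preds = automl.fit_predict(train_df, roles={"target": "target"})
oof_preds.data.show(5)         # the result is assumed to be a SparkDataset wrapping a dataframe

spark.stop()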
sparklightautoml.automl
The main module, which includes the SparkAutoML class, blenders and ready-made presets.
Class for compiling the full pipeline of an AutoML task.
Presets
Presets for end-to-end model training for special tasks.
Basic class for an AutoML preset.
Spark version of …
Blenders
Basic class for blending.
Selects the best single model from a level.
Simple averaging of level predictions.
Weighted blender based on coordinate descent that optimizes the task metric directly.
sparklightautoml.dataset
Provides base entities for working with data.
Dataset Interfaces
Implements a dataset that uses a …
Used for signaling the types of persistence points encountered during the AutoML process.
Base interface of an entity responsible for caching and storing intermediate results somewhere.
Interface that allows external entities to unpersist dataframes and files stored by the implementing entity.
Roles
A role contains information about a column, which determines how it is processed.
Role that describes a numeric vector or numeric array.
Persistence
Persistence managers are responsible for caching and storing intermediate results at various steps of the AutoML process. Storing intermediate results is required for several reasons.
Depending on the manager, it can serve the following goals:
* supporting iterative data processing and preventing repetition of calculations;
* pruning long plans that slow down the Catalyst optimizer;
* pruning long lineages that increase task serialization overhead (and may lead to large broadcasts);
* creating reliable checkpoints that prevent long recalculations in case of failures;
* optimizing joins by converting them into shuffle-less merge joins instead of SortMerge joins (optimization of split-merge patterns when multiple models are trained or multiple feature sets are generated).
For instance, PlainCachePersistenceManager supports iterative data processing and stores data quickly by leveraging Spark's caching mechanism, which may employ RAM, but it cannot serve the remaining goals. On the other hand, BucketedPersistenceManager can serve all of the goals, but requires more time to store data because it writes to external storage such as HDFS. LocalCheckpointPersistenceManager is in the middle: it can serve only the first three goals, but stores data quickly by leveraging RAM and disk if necessary.
Different persistence managers may be more or less useful depending on the current step of the AutoML process. Several explicit storing levels are defined in the PersistenceLevel entity:
* READER marks the beginning of the AutoML process, the root of all pipelines, executed only once;
* REGULAR means storing data somewhere in the middle of an ML pipeline, mainly during feature processing or model training;
* CHECKPOINT denotes storing data at the very end of an ML pipeline. These data consist only of predictions made by one or several ML models, which makes the stored dataframe relatively small.
All persistence managers can be divided into two main types, depending on how they handle the levels supplied when .persist() is called:
* simple managers, which use the same approach to store intermediate results on all levels;
* composite managers (their names start with the ‘Composite’ prefix), which can employ different approaches to store data for different levels.
CompositeBucketedPersistenceManager should be used in most cases. It creates a bucketed dataset on the READER level, which is an expensive operation executed only once. In exchange, all joins (on the direct descendants of the main dataframe) in the downstream process become either broadcast joins or merge joins; in both cases no shuffle is required. On the REGULAR level, mainly for the sake of fast iterative data processing, it employs PlainCachePersistenceManager. On the CHECKPOINT level, having a relatively small dataframe after the heavy data processing and computations, this manager opts for reliable storage using BucketedPersistenceManager. This choice is also motivated by pruning the long plan and lineage that have grown large by this point.
CompositePlainCachePersistenceManager uses PlainCachePersistenceManager for the READER and REGULAR levels, avoiding the expensive initial creation of a bucketed dataset. On the CHECKPOINT level, it relies on BucketedPersistenceManager, with the same motivation as in the previous case. However, despite its advantages, it should be used with caution. It is appropriate only for specific Spark session and AutoML configurations with the following traits:
* AutoML has only one level of ML pipelines, or two levels with skip_conn=False;
* autoBroadcastJoinThreshold is set high enough to handle some minor joins.
The alternative case:
* AutoML has two levels with skip_conn=True;
* autoBroadcastJoinThreshold is set sufficiently high to make joining the main dataframe with the resulting dataframes from the first level (which contain predictions) shuffle-less.
These conditions may change in the future.
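As an illustration, a persistence manager could be instantiated directly and handed to the AutoML preset. The sketch below is hypothetical: the import path sparklightautoml.dataset.persistence and the idea of passing the manager as a persistence_manager argument are assumptions and may differ from the actual API (in the spark-submit example further below, the manager is selected via the PERSISTENCE_MANAGER environment variable instead).

from pyspark.sql import SparkSession

# Assumed import path -- verify against the sparklightautoml.dataset module.
from sparklightautoml.dataset.persistence import PlainCachePersistenceManager

spark = (
    SparkSession.builder
    # A sufficiently high broadcast threshold matters for the CompositePlainCache setup described above.
    .config("spark.sql.autoBroadcastJoinThreshold", "100MB")
    .getOrCreate()
)

# Fast, RAM-backed caching: good for iterative processing, but it neither prunes
# plans/lineage nor creates reliable checkpoints.
persistence_manager = PlainCachePersistenceManager()

# Hypothetical wiring: the preset is assumed to accept the manager as an argument,
# e.g. SparkTabularAutoML(..., persistence_manager=persistence_manager).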
Abstract implementation of base persistence functionality, including registering and de-registering what has been requested to persist/unpersist.
Manager that uses Spark .cache() / .persist() methods.
Manager that uses the Spark .localCheckpoint() method.
Manager that uses the Spark warehouse folder to store bucketed datasets (.bucketBy …).
Universal composite manager that can combine other managers to apply different storing strategies on different levels.
Combines bucketing on READER and CHECKPOINT levels with PlainCache on the REGULAR level.
Universal composite manager that can combine other managers to apply different storing strategies on different levels.
sparklightautoml.ml_algo
Models used for machine learning pipelines.
Base Classes
Machine learning algorithm that accepts numpy arrays as input.
Transformer that takes one or more columns and produces a column with average values.
Available Models
LBFGS L2 regression based on Spark MLlib.
Gradient boosting on decision trees from the LightGBM library.
Utilities
Simple wrapper for synapse.ml.lightgbm.[LightGBMRegressionModel|LightGBMClassificationModel] to fix an issue with loading the model from a saved composite pipeline.
Simple wrapper for ONNXModel to fix an issue with loading the model from a saved composite pipeline.
sparklightautoml.pipelines
Pipelines for solving different tasks.
Utility traits
sparklightautoml.pipelines.selection
Feature selection module for ML pipelines.
Base Classes
Importance Based Selectors
Permutation importance based estimator.
sparklightautoml.pipelines.features
Pipelines for feature generation.
Base Classes
Abstract class.
Helper class that contains basic feature transformations for tabular data.
This class creates a pipeline with …
This transformer does nothing; it just returns the input dataframe unchanged.
Feature Pipelines for Boosting Models
Creates a simple pipeline for tree-based models.
Creates an advanced pipeline for tree-based models.
Feature Pipelines for Linear Models
Creates a pipeline for linear models and neural nets.
Utility Functions
Fills a dict that represents a graph of estimators and transformers.
sparklightautoml.pipelines.ml
Pipelines that merge together single model training steps.
Base Classes
Spark version of …
Pipeline for Nested Cross-Validation
Same as LAMA's NestedTabularMLPipeline, but redefines a couple of methods via SparkMLPipelineMixin.
sparklightautoml.reader
Utils for reading, training and analysing data.
Readers
Reader to convert …
Transformer of SparkToSparkReader.
Helper class that provides some methods for …
Utility functions for advanced roles guessing
Searches for the optimal processing of categorical values.
Returns a generator that takes an iterator over pandas dataframes and yields dataframes with calculated Gini values.
Gets null scores.
Calculates statistics about the performance of different encodings.
Gets the normalized Gini index from a pipeline.
sparklightautoml.report
Report generators and templates.
sparklightautoml.tasks
Task Class
Specifies the task (binary classification, multiclass classification, regression), metrics and losses.
Spark version of a metric function that assesses prediction error.
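For instance, a task can be declared roughly as follows (a sketch assuming SparkTask mirrors the signature of LAMA's Task; the import path and metric name are illustrative):

from sparklightautoml.tasks.base import SparkTask  # assumed import path

# Binary classification with the default loss and metric.
binary_task = SparkTask("binary")

# Regression with an explicitly chosen metric.
reg_task = SparkTask("reg", metric="mse")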
sparklightautoml.transformers
Basic feature generation steps and helper utils.
Base Classes
Base class for estimators from sparklightautoml.transformers.
Base class for transformers from sparklightautoml.transformers.
Transformer that changes roles for input columns.
Entity that represents a sequence of transformers in a preprocessing pipeline.
Entity that represents parallel layers (transformers) in a preprocessing pipeline.
Helper and base class for …
Mixin for the param inputCols: input column names.
Mixin for the param inputCols: input column names.
Transformer that drops columns from the input dataframe.
Converts prediction column values from the ONNX model format to the LGBMCBooster format.
Converts probability column values from the ONNX model format to the LGBMCBooster format.
Numeric
Fillna with median.
Estimator that calculates the NaN rate for input columns and builds …
Discretization of numeric features by quantiles.
Classic StandardScaler.
Transformer that replaces inf values with np.nan in input columns.
Fillna with median.
Converts probabilities to log-odds.
Adds columns with NaN flags (0 or 1) for input columns.
Adds a column with the quantile bin number of input columns.
Classic StandardScaler.
Categorical
Spark label encoder estimator.
Spark ordinal encoder estimator.
Calculates frequency in train data and produces …
Combines categorical features and fits …
Spark target encoder estimator.
Spark multiclass target encoder estimator.
Simple OneHotEncoder over label-encoded categories.
Simple Spark version of LabelEncoder.
Spark version of …
Labels are encoded with their frequency in the train data.
Combines category columns and encodes them with a label encoder.
Spark multiclass target encoder transformer.
Helper class for …
Categorical (Scala)
Custom implementation of a PySpark StringIndexer wrapper.
Model fitted by …
Datetime
Transforms datetime column values to numeric values.
Basic conversion strategy, used in the selection of one-to-one transformers.
Extracts units of time from datetime values and marks holiday dates.
Helper class for …
sparklightautoml.validation
The module provides classes and functions for model validation.
Iterators
Implements applying the selection pipeline and the feature pipeline to a SparkDataset.
Simple one-step iterator over one fold of a SparkDataset.
Classic CV iterator.
Simple one-step iterator over the train part of a SparkDataset.
Running a Spark LAMA app on Spark YARN with spark-submit
Prerequisites
1. Create a folder and put the following files from the repository into it:
* <project_root>/examples/spark/* -> examples-spark/*
* <project_root>/sparklightautoml/automl/presets/tabular_config.yml -> tabular_config.yml
2. Install sparklightautoml in the Python env on your cluster:
<python env on your cluster>/bin/pip install sparklightautoml
Launching
To launch the example ‘tabular-preset-automl.py’ (the most comprehensive one), run the following command:
PYSPARK_PYTHON_PATH=<python env on your cluster>
WAREHOUSE_DIR=<hdfs folder>
DRIVER_CORES=1
DRIVER_MEMORY="4g"
DRIVER_MAX_RESULT_SIZE="1g"
EXECUTOR_INSTANCES=4
EXECUTOR_CORES=4
EXECUTOR_MEMORY="10g"
CORES_MAX=$(($EXECUTOR_CORES * $EXECUTOR_INSTANCES))
# PARTITION_NUM and BUCKET_NUMS should be equal
PARTITION_NUM=$CORES_MAX
BUCKET_NUMS=$PARTITION_NUM
SCRIPT="examples-spark/tabular-preset-automl.py"
# Notes:
# "spark.kryoserializer.buffer.max=512m"
# is required when there are a lot of categorical variables with very high cardinality
# "spark.sql.autoBroadcastJoinThreshold=100MB" depends on your dataset
# if you run on jdk11
#--conf "spark.driver.extraJavaOptions=-Dio.netty.tryReflectionSetAccessible=true" \
#--conf "spark.executor.extraJavaOptions=-Dio.netty.tryReflectionSetAccessible=true"
spark-submit \
--master yarn \
--deploy-mode cluster \
--conf "spark.yarn.appMasterEnv.SCRIPT_ENV=cluster" \
--conf "spark.yarn.appMasterEnv.PYSPARK_PYTHON=${PYSPARK_PYTHON_PATH}" \
--conf "spark.yarn.appMasterEnv.PERSISTENCE_MANAGER=CompositeBucketedPersistenceManager" \
--conf "spark.yarn.appMasterEnv.BUCKET_NUMS=${BUCKET_NUMS}" \
--conf "spark.kryoserializer.buffer.max=512m" \
--conf "spark.driver.cores=${DRIVER_CORES}" \
--conf "spark.driver.memory=${DRIVER_MEMORY}" \
--conf "spark.driver.maxResultSize=${DRIVER_MAX_RESULT_SIZE}" \
--conf "spark.executor.instances=${EXECUTOR_INSTANCES}" \
--conf "spark.executor.cores=${EXECUTOR_CORES}" \
--conf "spark.executor.memory=${EXECUTOR_MEMORY}" \
--conf "spark.cores.max=${CORES_MAX}" \
--conf "spark.memory.fraction=0.8" \
--conf "spark.sql.shuffle.partitions=${PARTITION_NUM}" \
--conf "spark.default.parallelism=${PARTITION_NUM}" \
--conf "spark.rpc.message.maxSize=1024" \
--conf "spark.sql.autoBroadcastJoinThreshold=100MB" \
--conf "spark.sql.execution.arrow.pyspark.enabled=true" \
--conf "spark.driver.extraJavaOptions=-Dio.netty.tryReflectionSetAccessible=true" \
--conf "spark.executor.extraJavaOptions=-Dio.netty.tryReflectionSetAccessible=true" \
--conf "spark.jars.repositories=https://mmlspark.azureedge.net/maven"
--conf "spark.jars.packages=com.microsoft.azure:synapseml_2.12:0.9.5,io.github.fonhorst:spark-lightautoml_2.12:0.1.1"
--conf "spark.sql.warehouse.dir=${WAREHOUSE_DIR}" \
--py-files "examples-spark/*,tabular_config.yml" \
--num-executors "${EXECUTOR_INSTANCES}" \
--jars "spark-lightautoml_2.12-0.1.jar" \
"${SCRIPT}"
Running a Spark LAMA app on Spark YARN
This section shows how to run the examples/spark/tabular-preset-automl.py script on a local Hadoop YARN deployment.
Local deployment of Hadoop YARN is done using the docker-hadoop project from the https://github.com/big-data-europe/docker-hadoop repository. It consists of the following services: datanode, historyserver, namenode, nodemanager and resourcemanager. The files docker-hadoop/nodemanager/Dockerfile and docker-hadoop/docker-compose.yml have been modified, and a description of the new service docker-hadoop/spark-submit has been added. The tools required to work with the docker-hadoop project are Docker, Docker Compose and GNU Make.
1. First, let’s go to the LightAutoML project directory

Make sure that the dist directory contains a wheel assembly and the jars directory contains a jar file.
If the dist directory does not exist, or if there are no files in it, build the LAMA dist files:
./bin/slamactl.sh build-lama-dist
If there are no jar file(s) in the jars directory, build the LAMA jar file(s):
./bin/slamactl.sh build-jars
2. Distribute the LAMA wheel to the nodemanager
Copy the LAMA wheel file from dist/LightAutoML-0.3.0-py3-none-any.whl to docker-hadoop/nodemanager/LightAutoML-0.3.0-py3-none-any.whl.
We copy the LAMA wheel assembly into the nodemanager Docker build context because the nodemanager service will later need it to execute the pipelines we submit to Spark.
cp dist/LightAutoML-0.3.0-py3-none-any.whl docker-hadoop/nodemanager/LightAutoML-0.3.0-py3-none-any.whl
3. Go to the docker-hadoop directory
cd docker-hadoop
4. Open the docker-compose.yml file and configure the services.
nano docker-compose.yml
Edit the volumes setting to mount the directory with datasets into the nodemanager service.

5. Open the hadoop.env file and configure the Hadoop settings.
Pay attention to the highlighted settings. They need to be set in accordance with the resources of your machines.

6. Build the image for the nodemanager service.
The following command builds the nodemanager image according to docker-hadoop/nodemanager/Dockerfile. Python 3.9 and the installation of the LAMA wheel package have been added to this Dockerfile.
make build-nodemanager-with-python
7. Build the image for the spark-submit service.
The spark-submit container will be used to submit our applications for execution.
make build-image-to-spark-submit
8. Start Hadoop YARN services
docker-compose up
or the same in detached mode:
docker-compose up -d
Check that all services have started:
docker-compose ps

Here datanode, historyserver, namenode, nodemanager and resourcemanager are Hadoop services. namenode and datanode are parts of HDFS; resourcemanager, nodemanager and historyserver are parts of YARN. For more information see the documentation at https://hadoop.apache.org/docs/r1.2.1/hdfs_design.html and https://hadoop.apache.org/docs/stable/hadoop-yarn/hadoop-yarn-site/YARN.html.
spark-submit is the service used to submit our applications to Hadoop YARN for execution (see step 9).
If one of the services did not start, look at its logs. For example, the resourcemanager logs:
docker-compose logs -f resourcemanager
9. Send the job to the cluster via the spark-submit container
docker exec -ti spark-submit bash -c "./bin/slamactl.sh submit-job-yarn dist/LightAutoML-0.3.0.tar.gz,examples/spark/examples_utils.py examples/spark/tabular-preset-automl.py"
10. Monitoring application execution
To monitor application execution, you can use the Hadoop web interface (http://localhost:8088), which displays the status of the application, its resources and its logs.

Let’s see the information about the application and its logs.



11. Spark WebUI
While the application is running, you can go to the Hadoop web interface and get a link to the Spark WebUI.


12. HDFS Web UI
The HDFS Web UI is available at http://localhost:9870. Here you can browse your files in HDFS at http://localhost:9870/explorer.html. HDFS stores the trained pipelines and Spark application files.

Running a Spark LAMA app on a standalone cluster
This section shows how to run the examples/spark/tabular-preset-automl.py script on a standalone Spark cluster.
1. First, let’s go to the LightAutoML project directory

Make sure that the dist directory contains a wheel assembly and the jars directory contains a jar file.
If the dist directory does not exist, or if there are no files in it, build the LAMA dist files:
./bin/slamactl.sh build-lama-dist
If there are no jar file(s) in the jars directory, build the LAMA jar file(s):
./bin/slamactl.sh build-jars
2. Set the Spark master URL via an environment variable
export SPARK_MASTER_URL=spark://HOST:PORT
For example:
export SPARK_MASTER_URL=spark://node21.bdcl:7077
3. Set the Hadoop namenode address (fs.defaultFS) via an environment variable
export HADOOP_DEFAULT_FS=hdfs://HOST:PORT
For example:
export HADOOP_DEFAULT_FS=hdfs://node21.bdcl:9000
4. Submit the job via the slamactl.sh script
./bin/slamactl.sh submit-job-spark examples/spark/tabular-preset-automl.py
Deploy on Minikube
On a host with a Linux operating system and amd64 architecture, run the following commands:
curl -LO https://storage.googleapis.com/minikube/releases/latest/minikube-linux-amd64
sudo install minikube-linux-amd64 /usr/local/bin/minikube
For other operating systems and architectures, one should follow the official guide.
Start minikube:
minikube start --cpus 12 --memory 20000 --driver=docker
The cpus and memory settings are up to you.
Check minikube status:
minikube status
All services should be in running state:

Environment setup
Clone the LightAutoML repository.
Check that Python is installed on the host.
Install the libraries required on the host:
pip install pyspark
pip install poetry
Create several folders to be used for data storage, particularly for the pv (PersistentVolume) and pvc (PersistentVolumeClaim). One may choose different paths; everything described below is just a suggestion.
/opt/data-slama - dataset folder. All required datasets one is planning to work with should be copied into this folder.
/opt/result-slama - service folder for intermediate data
/tmp/slama - temporary files folder
Mount the folders mentioned earlier into minikube:
minikube mount <source>:<dest>
Note: this does not work on operating systems that do not support the 9p file system.
Create a namespace and a service account in K8s to run the SLAMA application and give it the ability to create executor pods. Take a look at the example in ~/LightAutoML/dev-tools/config/spark-lama-ns.yaml.
kubectl apply -f ./dev-tools/config/spark-lama-ns.yaml
The results of the command execution should look like this:

Instead of the ‘unchanged’ state there may be a ‘created’ state if nothing existed before this command was executed.
Create the pv and pvc to be used by the Spark application with SLAMA. It is assumed that the previously created folders will be used for this purpose. One may take a look at the example ~/LightAutoML/dev-tools/config/spark-lama-data-pv-pvc.yaml to create the pv and pvc.
kubectl apply -f ./dev-tools/config/spark-lama-data-pv-pvc.yaml
Set up the environment variables to be used with the slamactl.sh utility:
export KUBE_NAMESPACE=spark-lama-exps
export REPO=node2.bdcl:5000
Build the required images:
./bin/slamactl.sh build-dist
./bin/slamactl.sh build-lama-image
One can check the resulting images with the command:
docker images

Upload the required datasets into the folder of the spark-lama-data pv.
Run examples in minikube
Ensure that the REPO and KUBE_NAMESPACE variables are set, and that all required Docker images and Kubernetes objects have been created.
Go to the LightAutoML folder.
Run an example with slamactl.sh:
./bin/slamactl.sh submit-job ./examples/spark/tabular-preset-automl.py
Check the state of the SLAMA application's pods with the command:
kubectl get pods -n spark-lama-exps

When the driver pod has completed, check the result of the execution with:
kubectl logs --tail 5 tabular-preset-automl-py-8d95207fdfd06fbd-driver -n spark-lama-exps
An example of the result:

One can open the Spark Web UI of the SLAMA application on localhost. This requires executing a port-forwarding command to one of the localhost ports:
kubectl -n spark-lama-exps port-forward svc/$(kubectl -n spark-lama-exps get svc -o jsonpath='{.items[0].metadata.name}') 9040:4040 --address='0.0.0.0'
To open the Spark WebUI, follow the link http://localhost:9040.

Note: the SLAMA application should be in the running state.
Running a Spark LAMA app on a Kubernetes cluster
Examples for Spark-LAMA can be found in examples/spark/.
These examples can be run both locally and remotely on a cluster.
To run the examples locally, one just needs to ensure that the data files are in the appropriate locations, typically the /opt/spark_data directory. (Data for the examples can be found in examples/data.)
To run the examples remotely on a cluster under Kubernetes control, one needs to have the kubectl utility installed and configured.
1. Establish NFS / S3
This step is necessary to upload the script file (e.g. the executable of Spark LAMA) to a location that is accessible from anywhere on the cluster. This file will be used by the Spark driver, which is also submitted to the cluster.
Upon configuring, set an appropriate value for spark.kubernetes.file.upload.path in ./bin/slamactl.sh or mount it to /mnt/nfs on the localhost.
2. Create persistent volumes (PV) and claims (PVC)
- The examples require 2 PVCs to function (defined in slamactl.sh, spark-submit arguments):
spark-lama-data - provides the driver and executors with access to the data
mnt-nfs - provides the driver and executors with access to the upload dir mentioned above
3. Define required env variables
Define the required environment variables to use the appropriate Kubernetes namespace and a remote Docker repository accessible from anywhere in the cluster.
export KUBE_NAMESPACE=spark-lama-exps
export REPO=node2.bdcl:5000
4. Build Spark LAMA dependencies and Docker images.
In this step, use the slamactl.sh utility to build the dependencies and Docker images:
./bin/slamactl.sh build-dist
It will:
* compile jars containing Scala-based components (currently only LAMLStringIndexer, required for LE-family transformers);
* download the Spark distro and use the dockerfiles from there to build base PySpark images (and push these images to the remote Docker repo);
* compile the LAMA wheel (including the spark subpackage) and build a Docker image based upon the PySpark images mentioned above (this image will be pushed to the remote repository too).
5. Run an example on the remote cluster
To do that, use the following command:
./bin/slamactl.sh submit-job ./examples/spark/tabular-preset-automl.py
The command submits a driver pod (using spark-submit) to the cluster, which then creates the executor pods.
6. Forward port 4040 to make the Spark Web UI accessible.
The utility provides a command to set up port forwarding for the running example.
./bin/slamactl.sh port-forward ./examples/spark/tabular-preset-automl.py
The driver’s 4040 port will be forwarded to http://localhost:9040.