Source code for pipeasy_spark.core

# -*- coding: utf-8 -*-
from pyspark.ml import Pipeline
from .transformers import (
    set_transformer_in_out,
    ColumnDropper,
    ColumnRenamer,
)


[docs]def build_pipeline(column_transformers): """Create a dataframe transformation pipeline. The created pipeline can be used to apply successive transformations on a spark dataframe. The transformations are intended to be applied per column. Example ------- >>> df = titanic.select('Survived', 'Sex', 'Age').dropna() >>> df.show(2) +--------+------+----+ |Survived| Sex| Age| +--------+------+----+ | 0| male|22.0| | 1|female|38.0| +--------+------+----+ >>> pipeline = build_pipeline({ # 'Survived' : this variable is not modified, it can also be omitted from the dict 'Survived': [], 'Sex': [StringIndexer(), OneHotEncoderEstimator(dropLast=False)], # 'Age': a VectorAssembler must be applied before the StandardScaler # as the latter only accepts vectors as input. 'Age': [VectorAssembler(), StandardScaler()] }) >>> trained_pipeline = pipeline.fit(df) >>> trained_pipeline.transform(df).show(2) +--------+-------------+--------------------+ |Survived| Sex| Age| +--------+-------------+--------------------+ | 0|(2,[0],[1.0])|[1.5054181442954726]| | 1|(2,[1],[1.0])| [2.600267703783089]| +--------+-------------+--------------------+ Parameters ---------- column_transformers: dict(str -> list) key (str): column name; value (list): transformer instances (typically instances of pyspark.ml.feature transformers) Returns ------- pipeline: a pyspark.ml.Pipeline instance """ stages = [] for column, transformers in column_transformers.items(): temp_column_names = [column] for transformer in transformers: previous_column = temp_column_names[-1] next_column = previous_column + '_' + transformer.__class__.__name__ temp_column_names.append(next_column) transformer = set_transformer_in_out(transformer, previous_column, next_column) stages.append(transformer) if len(temp_column_names) > 1: # all the temporary columns should be dropped except the last one stages.append(ColumnDropper(inputCols=temp_column_names[:-1])) # the last temporary column should be renamed to the original name # (this needs to append AFTER dropping the original column) stages.append(ColumnRenamer( temp_column_names[-1], temp_column_names[0] )) # Create a Pipeline pipeline = Pipeline(stages=stages) return pipeline