En este módulo, aprenderemos cómo construir y utilizar pipelines de aprendizaje automático en Apache Spark utilizando la biblioteca MLlib. Los pipelines son una herramienta poderosa para estructurar y simplificar el flujo de trabajo de aprendizaje automático, permitiendo la creación de modelos reproducibles y escalables.

Contenido

Introducción a los Pipelines de ML

Un pipeline de aprendizaje automático en Spark MLlib es una secuencia de etapas, donde cada etapa es un transformador o un estimador. Los pipelines permiten estructurar el flujo de trabajo de aprendizaje automático de manera modular y reproducible.

Conceptos Clave

  • Transformador (Transformer): Un algoritmo que transforma un DataFrame en otro DataFrame. Ejemplos incluyen VectorAssembler y StandardScaler.
  • Estimador (Estimator): Un algoritmo que puede ser entrenado en un DataFrame para producir un transformador. Ejemplos incluyen LogisticRegression y RandomForestClassifier.
  • Pipeline: Una secuencia de transformadores y estimadores que se ejecutan en orden.

Componentes de un Pipeline

Transformadores

Los transformadores toman un DataFrame como entrada y producen un nuevo DataFrame como salida. Algunos ejemplos comunes son:

  • VectorAssembler: Combina múltiples columnas en una sola columna de vectores.
  • StandardScaler: Escala las características para tener media cero y varianza unitaria.

Estimadores

Los estimadores se entrenan en un DataFrame para producir un transformador. Algunos ejemplos comunes son:

  • LogisticRegression: Un modelo de regresión logística.
  • RandomForestClassifier: Un clasificador basado en bosques aleatorios.

Pipeline

Un pipeline es una secuencia de etapas, donde cada etapa es un transformador o un estimador. La salida de una etapa se convierte en la entrada de la siguiente etapa.

Construcción de un Pipeline

Paso 1: Importar las Bibliotecas Necesarias

from pyspark.sql import SparkSession
from pyspark.ml import Pipeline
from pyspark.ml.feature import VectorAssembler, StandardScaler
from pyspark.ml.classification import LogisticRegression

Paso 2: Crear el SparkSession

spark = SparkSession.builder.appName("ML Pipeline Example").getOrCreate()

Paso 3: Cargar y Preparar los Datos

data = spark.read.csv("data.csv", header=True, inferSchema=True)
data = data.select("feature1", "feature2", "label")

Paso 4: Definir las Etapas del Pipeline

# VectorAssembler para combinar características
assembler = VectorAssembler(inputCols=["feature1", "feature2"], outputCol="features")

# StandardScaler para escalar las características
scaler = StandardScaler(inputCol="features", outputCol="scaledFeatures")

# Modelo de Regresión Logística
lr = LogisticRegression(featuresCol="scaledFeatures", labelCol="label")

Paso 5: Crear el Pipeline

pipeline = Pipeline(stages=[assembler, scaler, lr])

Paso 6: Entrenar el Pipeline

model = pipeline.fit(data)

Paso 7: Hacer Predicciones

predictions = model.transform(data)
predictions.select("features", "scaledFeatures", "prediction").show()

Ejemplo Práctico

Vamos a construir un pipeline completo utilizando un conjunto de datos de ejemplo. Este pipeline incluirá las siguientes etapas:

  1. VectorAssembler: Para combinar las características en una sola columna.
  2. StandardScaler: Para escalar las características.
  3. LogisticRegression: Para entrenar un modelo de regresión logística.

Código Completo

from pyspark.sql import SparkSession
from pyspark.ml import Pipeline
from pyspark.ml.feature import VectorAssembler, StandardScaler
from pyspark.ml.classification import LogisticRegression

# Crear SparkSession
spark = SparkSession.builder.appName("ML Pipeline Example").getOrCreate()

# Cargar datos
data = spark.read.csv("data.csv", header=True, inferSchema=True)
data = data.select("feature1", "feature2", "label")

# Definir las etapas del pipeline
assembler = VectorAssembler(inputCols=["feature1", "feature2"], outputCol="features")
scaler = StandardScaler(inputCol="features", outputCol="scaledFeatures")
lr = LogisticRegression(featuresCol="scaledFeatures", labelCol="label")

# Crear el pipeline
pipeline = Pipeline(stages=[assembler, scaler, lr])

# Entrenar el pipeline
model = pipeline.fit(data)

# Hacer predicciones
predictions = model.transform(data)
predictions.select("features", "scaledFeatures", "prediction").show()

Ejercicios Prácticos

Ejercicio 1: Construir un Pipeline con un Clasificador Diferente

Objetivo: Construir un pipeline utilizando RandomForestClassifier en lugar de LogisticRegression.

Instrucciones:

  1. Cargar los datos.
  2. Definir las etapas del pipeline.
  3. Crear y entrenar el pipeline.
  4. Hacer predicciones y mostrar los resultados.

Código de Inicio:

from pyspark.sql import SparkSession
from pyspark.ml import Pipeline
from pyspark.ml.feature import VectorAssembler, StandardScaler
from pyspark.ml.classification import RandomForestClassifier

# Crear SparkSession
spark = SparkSession.builder.appName("ML Pipeline Exercise").getOrCreate()

# Cargar datos
data = spark.read.csv("data.csv", header=True, inferSchema=True)
data = data.select("feature1", "feature2", "label")

# Definir las etapas del pipeline
assembler = VectorAssembler(inputCols=["feature1", "feature2"], outputCol="features")
scaler = StandardScaler(inputCol="features", outputCol="scaledFeatures")
rf = RandomForestClassifier(featuresCol="scaledFeatures", labelCol="label")

# Crear el pipeline
pipeline = Pipeline(stages=[assembler, scaler, rf])

# Entrenar el pipeline
model = pipeline.fit(data)

# Hacer predicciones
predictions = model.transform(data)
predictions.select("features", "scaledFeatures", "prediction").show()

Ejercicio 2: Añadir una Etapa de Selección de Características

Objetivo: Añadir una etapa de selección de características utilizando ChiSqSelector antes del clasificador.

Instrucciones:

  1. Cargar los datos.
  2. Definir las etapas del pipeline, incluyendo ChiSqSelector.
  3. Crear y entrenar el pipeline.
  4. Hacer predicciones y mostrar los resultados.

Código de Inicio:

from pyspark.sql import SparkSession
from pyspark.ml import Pipeline
from pyspark.ml.feature import VectorAssembler, StandardScaler, ChiSqSelector
from pyspark.ml.classification import LogisticRegression

# Crear SparkSession
spark = SparkSession.builder.appName("ML Pipeline Exercise").getOrCreate()

# Cargar datos
data = spark.read.csv("data.csv", header=True, inferSchema=True)
data = data.select("feature1", "feature2", "label")

# Definir las etapas del pipeline
assembler = VectorAssembler(inputCols=["feature1", "feature2"], outputCol="features")
scaler = StandardScaler(inputCol="features", outputCol="scaledFeatures")
selector = ChiSqSelector(numTopFeatures=1, featuresCol="scaledFeatures", outputCol="selectedFeatures", labelCol="label")
lr = LogisticRegression(featuresCol="selectedFeatures", labelCol="label")

# Crear el pipeline
pipeline = Pipeline(stages=[assembler, scaler, selector, lr])

# Entrenar el pipeline
model = pipeline.fit(data)

# Hacer predicciones
predictions = model.transform(data)
predictions.select("features", "scaledFeatures", "selectedFeatures", "prediction").show()

Conclusión

En este módulo, hemos aprendido cómo construir y utilizar pipelines de aprendizaje automático en Apache Spark. Los pipelines nos permiten estructurar el flujo de trabajo de aprendizaje automático de manera modular y reproducible, facilitando la creación de modelos escalables y mantenibles. Hemos visto cómo definir transformadores y estimadores, y cómo combinarlos en un pipeline completo. Además, hemos practicado con ejercicios para reforzar los conceptos aprendidos.

En el siguiente módulo, exploraremos cómo ajustar y optimizar el rendimiento de nuestras aplicaciones Spark.

© Copyright 2024. Todos los derechos reservados