En este tema, exploraremos el flujo de trabajo de un job MapReduce, desde la entrada de datos hasta la salida final. MapReduce es un modelo de programación que permite el procesamiento de grandes volúmenes de datos de manera distribuida. Comprender el flujo de trabajo es crucial para escribir programas eficientes y optimizados.
- Introducción al Flujo de Trabajo de MapReduce
El flujo de trabajo de un job MapReduce se puede dividir en varias etapas clave:
- Input Splits: División de los datos de entrada en fragmentos manejables.
- Map Phase: Procesamiento de los datos de entrada y generación de pares clave-valor intermedios.
- Shuffle and Sort: Agrupación y ordenación de los pares clave-valor intermedios.
- Reduce Phase: Procesamiento de los pares clave-valor agrupados para generar la salida final.
- Output: Escritura de los resultados en el sistema de archivos.
- Desglose del Flujo de Trabajo
2.1 Input Splits
Antes de que comience el procesamiento, los datos de entrada se dividen en fragmentos llamados "input splits". Cada split es procesado por una tarea de mapeo (mapper).
Input Data |----------------|----------------|----------------| | Split 1 | Split 2 | Split 3 | |----------------|----------------|----------------|
2.2 Map Phase
Cada mapper procesa un split de datos y genera pares clave-valor intermedios. La función map
toma una entrada y produce una lista de pares clave-valor.
public class TokenizerMapper extends Mapper<Object, Text, Text, IntWritable> { private final static IntWritable one = new IntWritable(1); private Text word = new Text(); public void map(Object key, Text value, Context context) throws IOException, InterruptedException { StringTokenizer itr = new StringTokenizer(value.toString()); while (itr.hasMoreTokens()) { word.set(itr.nextToken()); context.write(word, one); } } }
2.3 Shuffle and Sort
Después de la fase de mapeo, los pares clave-valor intermedios se agrupan y ordenan por clave. Este proceso se conoce como "shuffle and sort". Es una etapa crucial que asegura que todos los valores asociados con una clave específica se envíen al mismo reducer.
Intermediate Key-Value Pairs |----------------|----------------|----------------| | (key1, val1) | (key2, val2) | (key1, val3) | |----------------|----------------|----------------| | | | | | | v v v |----------------|----------------|----------------| | (key1, [val1, val3]) | (key2, [val2]) | |----------------|----------------|----------------|
2.4 Reduce Phase
En la fase de reducción, cada reducer procesa los pares clave-valor agrupados y genera la salida final. La función reduce
toma una clave y una lista de valores asociados y produce una lista de pares clave-valor de salida.
public class IntSumReducer extends Reducer<Text, IntWritable, Text, IntWritable> { private IntWritable result = new IntWritable(); public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException { int sum = 0; for (IntWritable val : values) { sum += val.get(); } result.set(sum); context.write(key, result); } }
2.5 Output
Finalmente, los resultados de la fase de reducción se escriben en el sistema de archivos. Este es el resultado final del job MapReduce.
Output Data |----------------|----------------| | (key1, result1) | (key2, result2) | |----------------|----------------|
- Ejemplo Práctico
Vamos a ver un ejemplo práctico de un job MapReduce que cuenta la frecuencia de palabras en un conjunto de documentos.
3.1 Código del Mapper
public class TokenizerMapper extends Mapper<Object, Text, Text, IntWritable> { private final static IntWritable one = new IntWritable(1); private Text word = new Text(); public void map(Object key, Text value, Context context) throws IOException, InterruptedException { StringTokenizer itr = new StringTokenizer(value.toString()); while (itr.hasMoreTokens()) { word.set(itr.nextToken()); context.write(word, one); } } }
3.2 Código del Reducer
public class IntSumReducer extends Reducer<Text, IntWritable, Text, IntWritable> { private IntWritable result = new IntWritable(); public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException { int sum = 0; for (IntWritable val : values) { sum += val.get(); } result.set(sum); context.write(key, result); } }
3.3 Configuración del Job
public class WordCount { public static void main(String[] args) throws Exception { Configuration conf = new Configuration(); Job job = Job.getInstance(conf, "word count"); job.setJarByClass(WordCount.class); job.setMapperClass(TokenizerMapper.class); job.setCombinerClass(IntSumReducer.class); job.setReducerClass(IntSumReducer.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(IntWritable.class); FileInputFormat.addInputPath(job, new Path(args[0])); FileOutputFormat.setOutputPath(job, new Path(args[1])); System.exit(job.waitForCompletion(true) ? 0 : 1); } }
- Ejercicio Práctico
Ejercicio: Escribe un programa MapReduce que cuente la frecuencia de caracteres en un conjunto de documentos.
Solución
Mapper
public class CharCountMapper extends Mapper<Object, Text, Text, IntWritable> { private final static IntWritable one = new IntWritable(1); private Text character = new Text(); public void map(Object key, Text value, Context context) throws IOException, InterruptedException { for (char c : value.toString().toCharArray()) { character.set(Character.toString(c)); context.write(character, one); } } }
Reducer
public class CharCountReducer extends Reducer<Text, IntWritable, Text, IntWritable> { private IntWritable result = new IntWritable(); public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException { int sum = 0; for (IntWritable val : values) { sum += val.get(); } result.set(sum); context.write(key, result); } }
Configuración del Job
public class CharCount { public static void main(String[] args) throws Exception { Configuration conf = new Configuration(); Job job = Job.getInstance(conf, "char count"); job.setJarByClass(CharCount.class); job.setMapperClass(CharCountMapper.class); job.setCombinerClass(CharCountReducer.class); job.setReducerClass(CharCountReducer.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(IntWritable.class); FileInputFormat.addInputPath(job, new Path(args[0])); FileOutputFormat.setOutputPath(job, new Path(args[1])); System.exit(job.waitForCompletion(true) ? 0 : 1); } }
- Conclusión
En esta sección, hemos desglosado el flujo de trabajo de un job MapReduce, desde la entrada de datos hasta la salida final. Hemos visto cómo se dividen los datos, cómo se procesan en la fase de mapeo, cómo se agrupan y ordenan, y finalmente, cómo se reducen para generar la salida final. También hemos proporcionado un ejemplo práctico y un ejercicio para reforzar los conceptos aprendidos.
En el siguiente tema, profundizaremos en cómo escribir un programa MapReduce, abordando aspectos más detallados y avanzados de la programación MapReduce.
Curso de Hadoop
Módulo 1: Introducción a Hadoop
- ¿Qué es Hadoop?
- Visión General del Ecosistema Hadoop
- Hadoop vs Bases de Datos Tradicionales
- Configuración del Entorno Hadoop
Módulo 2: Arquitectura de Hadoop
- Componentes Principales de Hadoop
- HDFS (Sistema de Archivos Distribuido de Hadoop)
- Marco de Trabajo MapReduce
- YARN (Yet Another Resource Negotiator)
Módulo 3: HDFS (Sistema de Archivos Distribuido de Hadoop)
Módulo 4: Programación MapReduce
- Introducción a MapReduce
- Flujo de Trabajo de un Job MapReduce
- Escribiendo un Programa MapReduce
- Técnicas de Optimización de MapReduce
Módulo 5: Herramientas del Ecosistema Hadoop
Módulo 6: Conceptos Avanzados de Hadoop
- Seguridad en Hadoop
- Gestión de Clústeres Hadoop
- Ajuste de Rendimiento de Hadoop
- Serialización de Datos en Hadoop
Módulo 7: Aplicaciones del Mundo Real y Estudios de Caso
- Hadoop en Almacenamiento de Datos
- Hadoop en Aprendizaje Automático
- Hadoop en Procesamiento de Datos en Tiempo Real
- Estudios de Caso de Implementaciones de Hadoop