En este tema, exploraremos el flujo de trabajo de un job MapReduce, desde la entrada de datos hasta la salida final. MapReduce es un modelo de programación que permite el procesamiento de grandes volúmenes de datos de manera distribuida. Comprender el flujo de trabajo es crucial para escribir programas eficientes y optimizados.

  1. Introducción al Flujo de Trabajo de MapReduce

El flujo de trabajo de un job MapReduce se puede dividir en varias etapas clave:

  1. Input Splits: División de los datos de entrada en fragmentos manejables.
  2. Map Phase: Procesamiento de los datos de entrada y generación de pares clave-valor intermedios.
  3. Shuffle and Sort: Agrupación y ordenación de los pares clave-valor intermedios.
  4. Reduce Phase: Procesamiento de los pares clave-valor agrupados para generar la salida final.
  5. Output: Escritura de los resultados en el sistema de archivos.

  1. Desglose del Flujo de Trabajo

2.1 Input Splits

Antes de que comience el procesamiento, los datos de entrada se dividen en fragmentos llamados "input splits". Cada split es procesado por una tarea de mapeo (mapper).

Input Data
|----------------|----------------|----------------|
|     Split 1    |     Split 2    |     Split 3    |
|----------------|----------------|----------------|

2.2 Map Phase

Cada mapper procesa un split de datos y genera pares clave-valor intermedios. La función map toma una entrada y produce una lista de pares clave-valor.

public class TokenizerMapper extends Mapper<Object, Text, Text, IntWritable> {
    private final static IntWritable one = new IntWritable(1);
    private Text word = new Text();

    public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
        StringTokenizer itr = new StringTokenizer(value.toString());
        while (itr.hasMoreTokens()) {
            word.set(itr.nextToken());
            context.write(word, one);
        }
    }
}

2.3 Shuffle and Sort

Después de la fase de mapeo, los pares clave-valor intermedios se agrupan y ordenan por clave. Este proceso se conoce como "shuffle and sort". Es una etapa crucial que asegura que todos los valores asociados con una clave específica se envíen al mismo reducer.

Intermediate Key-Value Pairs
|----------------|----------------|----------------|
|  (key1, val1)  |  (key2, val2)  |  (key1, val3)  |
|----------------|----------------|----------------|
       |                  |                 |
       |                  |                 |
       v                  v                 v
|----------------|----------------|----------------|
|  (key1, [val1, val3])  |  (key2, [val2])  |
|----------------|----------------|----------------|

2.4 Reduce Phase

En la fase de reducción, cada reducer procesa los pares clave-valor agrupados y genera la salida final. La función reduce toma una clave y una lista de valores asociados y produce una lista de pares clave-valor de salida.

public class IntSumReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
    private IntWritable result = new IntWritable();

    public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
        int sum = 0;
        for (IntWritable val : values) {
            sum += val.get();
        }
        result.set(sum);
        context.write(key, result);
    }
}

2.5 Output

Finalmente, los resultados de la fase de reducción se escriben en el sistema de archivos. Este es el resultado final del job MapReduce.

Output Data
|----------------|----------------|
|  (key1, result1)  |  (key2, result2)  |
|----------------|----------------|

  1. Ejemplo Práctico

Vamos a ver un ejemplo práctico de un job MapReduce que cuenta la frecuencia de palabras en un conjunto de documentos.

3.1 Código del Mapper

public class TokenizerMapper extends Mapper<Object, Text, Text, IntWritable> {
    private final static IntWritable one = new IntWritable(1);
    private Text word = new Text();

    public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
        StringTokenizer itr = new StringTokenizer(value.toString());
        while (itr.hasMoreTokens()) {
            word.set(itr.nextToken());
            context.write(word, one);
        }
    }
}

3.2 Código del Reducer

public class IntSumReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
    private IntWritable result = new IntWritable();

    public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
        int sum = 0;
        for (IntWritable val : values) {
            sum += val.get();
        }
        result.set(sum);
        context.write(key, result);
    }
}

3.3 Configuración del Job

public class WordCount {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf, "word count");
        job.setJarByClass(WordCount.class);
        job.setMapperClass(TokenizerMapper.class);
        job.setCombinerClass(IntSumReducer.class);
        job.setReducerClass(IntSumReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}

  1. Ejercicio Práctico

Ejercicio: Escribe un programa MapReduce que cuente la frecuencia de caracteres en un conjunto de documentos.

Solución

Mapper

public class CharCountMapper extends Mapper<Object, Text, Text, IntWritable> {
    private final static IntWritable one = new IntWritable(1);
    private Text character = new Text();

    public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
        for (char c : value.toString().toCharArray()) {
            character.set(Character.toString(c));
            context.write(character, one);
        }
    }
}

Reducer

public class CharCountReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
    private IntWritable result = new IntWritable();

    public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
        int sum = 0;
        for (IntWritable val : values) {
            sum += val.get();
        }
        result.set(sum);
        context.write(key, result);
    }
}

Configuración del Job

public class CharCount {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf, "char count");
        job.setJarByClass(CharCount.class);
        job.setMapperClass(CharCountMapper.class);
        job.setCombinerClass(CharCountReducer.class);
        job.setReducerClass(CharCountReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}

  1. Conclusión

En esta sección, hemos desglosado el flujo de trabajo de un job MapReduce, desde la entrada de datos hasta la salida final. Hemos visto cómo se dividen los datos, cómo se procesan en la fase de mapeo, cómo se agrupan y ordenan, y finalmente, cómo se reducen para generar la salida final. También hemos proporcionado un ejemplo práctico y un ejercicio para reforzar los conceptos aprendidos.

En el siguiente tema, profundizaremos en cómo escribir un programa MapReduce, abordando aspectos más detallados y avanzados de la programación MapReduce.

© Copyright 2024. Todos los derechos reservados