En este módulo, aprenderemos cómo producir mensajes en Apache Kafka. Este es un paso fundamental para cualquier aplicación que desee enviar datos a un clúster de Kafka. Cubriremos los conceptos básicos, proporcionaremos ejemplos prácticos y ejercicios para reforzar el aprendizaje.

Conceptos Clave

  1. Productor (Producer): Es una aplicación que envía mensajes a un tema en Kafka.
  2. Tema (Topic): Es una categoría o nombre a la cual los registros son enviados por los productores.
  3. Partición (Partition): Cada tema se divide en particiones, que son unidades de paralelismo.
  4. Mensaje (Message): Es el dato que se envía a Kafka, compuesto por una clave, un valor y un timestamp.
  5. Desplazamiento (Offset): Es un identificador único de cada mensaje dentro de una partición.

Configuración del Productor

Para producir mensajes en Kafka, primero necesitamos configurar un productor. Aquí hay un ejemplo de configuración básica en Java:

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;

public class SimpleProducer {
    public static void main(String[] args) {
        // Configuración del productor
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

        // Crear el productor
        KafkaProducer<String, String> producer = new KafkaProducer<>(props);

        // Enviar un mensaje
        ProducerRecord<String, String> record = new ProducerRecord<>("my-topic", "key", "value");
        producer.send(record);

        // Cerrar el productor
        producer.close();
    }
}

Explicación del Código

  1. Configuración del Productor:

    • BOOTSTRAP_SERVERS_CONFIG: Dirección del clúster de Kafka.
    • KEY_SERIALIZER_CLASS_CONFIG y VALUE_SERIALIZER_CLASS_CONFIG: Serializadores para la clave y el valor del mensaje.
  2. Creación del Productor:

    • KafkaProducer<String, String>: Crea una instancia del productor con las propiedades configuradas.
  3. Envío de un Mensaje:

    • ProducerRecord<String, String>: Crea un registro (mensaje) con el tema, clave y valor.
    • producer.send(record): Envía el mensaje al tema especificado.
  4. Cierre del Productor:

    • producer.close(): Cierra el productor para liberar recursos.

Ejercicio Práctico

Ejercicio 1: Enviar Múltiples Mensajes

Modifica el código anterior para enviar 10 mensajes al tema "my-topic". Cada mensaje debe tener una clave y un valor únicos.

Solución

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;

public class MultiMessageProducer {
    public static void main(String[] args) {
        // Configuración del productor
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

        // Crear el productor
        KafkaProducer<String, String> producer = new KafkaProducer<>(props);

        // Enviar 10 mensajes
        for (int i = 0; i < 10; i++) {
            String key = "key" + i;
            String value = "value" + i;
            ProducerRecord<String, String> record = new ProducerRecord<>("my-topic", key, value);
            producer.send(record);
        }

        // Cerrar el productor
        producer.close();
    }
}

Ejercicio 2: Manejo de Errores

Modifica el código para manejar posibles errores durante el envío de mensajes. Utiliza un callback para imprimir un mensaje de éxito o error.

Solución

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;

public class ErrorHandlingProducer {
    public static void main(String[] args) {
        // Configuración del productor
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

        // Crear el productor
        KafkaProducer<String, String> producer = new KafkaProducer<>(props);

        // Enviar 10 mensajes con manejo de errores
        for (int i = 0; i < 10; i++) {
            String key = "key" + i;
            String value = "value" + i;
            ProducerRecord<String, String> record = new ProducerRecord<>("my-topic", key, value);
            producer.send(record, (RecordMetadata metadata, Exception exception) -> {
                if (exception == null) {
                    System.out.printf("Mensaje enviado con éxito a la partición %d con desplazamiento %d%n", metadata.partition(), metadata.offset());
                } else {
                    System.err.printf("Error al enviar mensaje: %s%n", exception.getMessage());
                }
            });
        }

        // Cerrar el productor
        producer.close();
    }
}

Resumen

En esta sección, hemos aprendido cómo producir mensajes en Kafka utilizando un productor en Java. Hemos cubierto la configuración básica del productor, el envío de mensajes y el manejo de errores. Estos conceptos son fundamentales para cualquier aplicación que desee interactuar con un clúster de Kafka.

En el próximo módulo, exploraremos cómo consumir mensajes de Kafka, lo que nos permitirá leer los datos que hemos enviado a nuestros temas.

© Copyright 2024. Todos los derechos reservados