En este tema, exploraremos los conceptos fundamentales de los mensajes y los desplazamientos (offsets) en Apache Kafka. Estos conceptos son cruciales para entender cómo Kafka maneja la transmisión de datos y garantiza la entrega ordenada y confiable de mensajes.
¿Qué es un Mensaje en Kafka?
Un mensaje en Kafka es la unidad básica de datos que se transmite entre productores y consumidores. Cada mensaje contiene:
- Clave (Key): Opcional, utilizada para determinar la partición a la que se enviará el mensaje.
- Valor (Value): El contenido del mensaje.
- Encabezados (Headers): Opcionales, metadatos adicionales.
- Timestamp: Marca de tiempo que indica cuándo se produjo el mensaje.
Ejemplo de un Mensaje
{ "key": "user123", "value": "User login event", "headers": { "eventType": "login" }, "timestamp": 1633024800000 }
¿Qué es un Desplazamiento (Offset)?
El desplazamiento es un número secuencial único que Kafka asigna a cada mensaje dentro de una partición. Los desplazamientos permiten a los consumidores rastrear su posición en el flujo de mensajes y garantizar que no se pierdan ni se dupliquen mensajes.
Características de los Desplazamientos
- Secuenciales: Cada mensaje en una partición tiene un desplazamiento único y secuencial.
- Inmutables: Una vez asignado, el desplazamiento de un mensaje no cambia.
- Persistentes: Los desplazamientos se almacenan en el log de Kafka y se pueden recuperar en caso de fallos.
Cómo Funcionan los Desplazamientos
Cuando un productor envía un mensaje a un tema, Kafka asigna un desplazamiento a ese mensaje. Los consumidores utilizan estos desplazamientos para leer mensajes de manera ordenada y para mantener su posición en el log.
Ejemplo de Desplazamientos
Supongamos que tenemos un tema con una partición y los siguientes mensajes:
Desplazamiento (Offset) | Clave (Key) | Valor (Value) |
---|---|---|
0 | user123 | User login event |
1 | user456 | User logout event |
2 | user789 | User purchase event |
Un consumidor que ha leído hasta el desplazamiento 1 sabe que su próximo mensaje a leer tiene el desplazamiento 2.
Ejemplo Práctico: Producción y Consumo de Mensajes
Producción de Mensajes
import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerRecord; import java.util.Properties; public class SimpleProducer { public static void main(String[] args) { Properties props = new Properties(); props.put("bootstrap.servers", "localhost:9092"); props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); KafkaProducer<String, String> producer = new KafkaProducer<>(props); ProducerRecord<String, String> record = new ProducerRecord<>("my-topic", "user123", "User login event"); producer.send(record); producer.close(); } }
Consumo de Mensajes
import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer; import java.util.Collections; import java.util.Properties; public class SimpleConsumer { public static void main(String[] args) { Properties props = new Properties(); props.put("bootstrap.servers", "localhost:9092"); props.put("group.id", "test-group"); props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props); consumer.subscribe(Collections.singletonList("my-topic")); while (true) { ConsumerRecords<String, String> records = consumer.poll(100); for (ConsumerRecord<String, String> record : records) { System.out.printf("Offset: %d, Key: %s, Value: %s%n", record.offset(), record.key(), record.value()); } } } }
Ejercicio Práctico
Ejercicio 1: Producción y Consumo de Mensajes
- Objetivo: Crear un productor y un consumidor en Java que envíen y reciban mensajes en un tema de Kafka.
- Instrucciones:
- Configura un clúster de Kafka localmente.
- Escribe un programa en Java para producir mensajes con claves y valores.
- Escribe un programa en Java para consumir esos mensajes y mostrar los desplazamientos.
Solución
Productor
import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerRecord; import java.util.Properties; public class MyProducer { public static void main(String[] args) { Properties props = new Properties(); props.put("bootstrap.servers", "localhost:9092"); props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); KafkaProducer<String, String> producer = new KafkaProducer<>(props); for (int i = 0; i < 10; i++) { ProducerRecord<String, String> record = new ProducerRecord<>("my-topic", "key" + i, "value" + i); producer.send(record); } producer.close(); } }
Consumidor
import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer; import java.util.Collections; import java.util.Properties; public class MyConsumer { public static void main(String[] args) { Properties props = new Properties(); props.put("bootstrap.servers", "localhost:9092"); props.put("group.id", "my-group"); props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props); consumer.subscribe(Collections.singletonList("my-topic")); while (true) { ConsumerRecords<String, String> records = consumer.poll(100); for (ConsumerRecord<String, String> record : records) { System.out.printf("Offset: %d, Key: %s, Value: %s%n", record.offset(), record.key(), record.value()); } } } }
Resumen
En esta sección, hemos aprendido sobre los mensajes y los desplazamientos en Kafka. Los mensajes son la unidad básica de datos en Kafka, y los desplazamientos permiten a los consumidores rastrear su posición en el flujo de mensajes. También hemos visto ejemplos prácticos de cómo producir y consumir mensajes en Kafka utilizando Java. Estos conceptos son fundamentales para trabajar eficazmente con Kafka y garantizar la entrega ordenada y confiable de mensajes.
En el próximo tema, exploraremos los Productores y Consumidores en mayor detalle, incluyendo cómo configurar y optimizar estos componentes para diferentes casos de uso.
Curso de Kafka
Módulo 1: Introducción a Kafka
- ¿Qué es Kafka?
- Casos de Uso de Kafka
- Visión General de la Arquitectura de Kafka
- Configuración de Kafka
Módulo 2: Conceptos Básicos de Kafka
Módulo 3: Operaciones de Kafka
Módulo 4: Configuración y Gestión de Kafka
Módulo 5: Temas Avanzados de Kafka
- Ajuste de Rendimiento de Kafka
- Kafka en una Configuración Multi-Centro de Datos
- Kafka con Registro de Esquemas
- Kafka Streams Avanzado