En este tema, exploraremos los conceptos fundamentales de los mensajes y los desplazamientos (offsets) en Apache Kafka. Estos conceptos son cruciales para entender cómo Kafka maneja la transmisión de datos y garantiza la entrega ordenada y confiable de mensajes.
¿Qué es un Mensaje en Kafka?
Un mensaje en Kafka es la unidad básica de datos que se transmite entre productores y consumidores. Cada mensaje contiene:
- Clave (Key): Opcional, utilizada para determinar la partición a la que se enviará el mensaje.
- Valor (Value): El contenido del mensaje.
- Encabezados (Headers): Opcionales, metadatos adicionales.
- Timestamp: Marca de tiempo que indica cuándo se produjo el mensaje.
Ejemplo de un Mensaje
{
"key": "user123",
"value": "User login event",
"headers": {
"eventType": "login"
},
"timestamp": 1633024800000
}¿Qué es un Desplazamiento (Offset)?
El desplazamiento es un número secuencial único que Kafka asigna a cada mensaje dentro de una partición. Los desplazamientos permiten a los consumidores rastrear su posición en el flujo de mensajes y garantizar que no se pierdan ni se dupliquen mensajes.
Características de los Desplazamientos
- Secuenciales: Cada mensaje en una partición tiene un desplazamiento único y secuencial.
- Inmutables: Una vez asignado, el desplazamiento de un mensaje no cambia.
- Persistentes: Los desplazamientos se almacenan en el log de Kafka y se pueden recuperar en caso de fallos.
Cómo Funcionan los Desplazamientos
Cuando un productor envía un mensaje a un tema, Kafka asigna un desplazamiento a ese mensaje. Los consumidores utilizan estos desplazamientos para leer mensajes de manera ordenada y para mantener su posición en el log.
Ejemplo de Desplazamientos
Supongamos que tenemos un tema con una partición y los siguientes mensajes:
| Desplazamiento (Offset) | Clave (Key) | Valor (Value) |
|---|---|---|
| 0 | user123 | User login event |
| 1 | user456 | User logout event |
| 2 | user789 | User purchase event |
Un consumidor que ha leído hasta el desplazamiento 1 sabe que su próximo mensaje a leer tiene el desplazamiento 2.
Ejemplo Práctico: Producción y Consumo de Mensajes
Producción de Mensajes
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import java.util.Properties;
public class SimpleProducer {
public static void main(String[] args) {
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
KafkaProducer<String, String> producer = new KafkaProducer<>(props);
ProducerRecord<String, String> record = new ProducerRecord<>("my-topic", "user123", "User login event");
producer.send(record);
producer.close();
}
}Consumo de Mensajes
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import java.util.Collections;
import java.util.Properties;
public class SimpleConsumer {
public static void main(String[] args) {
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "test-group");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Collections.singletonList("my-topic"));
while (true) {
ConsumerRecords<String, String> records = consumer.poll(100);
for (ConsumerRecord<String, String> record : records) {
System.out.printf("Offset: %d, Key: %s, Value: %s%n", record.offset(), record.key(), record.value());
}
}
}
}Ejercicio Práctico
Ejercicio 1: Producción y Consumo de Mensajes
- Objetivo: Crear un productor y un consumidor en Java que envíen y reciban mensajes en un tema de Kafka.
- Instrucciones:
- Configura un clúster de Kafka localmente.
- Escribe un programa en Java para producir mensajes con claves y valores.
- Escribe un programa en Java para consumir esos mensajes y mostrar los desplazamientos.
Solución
Productor
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import java.util.Properties;
public class MyProducer {
public static void main(String[] args) {
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
KafkaProducer<String, String> producer = new KafkaProducer<>(props);
for (int i = 0; i < 10; i++) {
ProducerRecord<String, String> record = new ProducerRecord<>("my-topic", "key" + i, "value" + i);
producer.send(record);
}
producer.close();
}
}Consumidor
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import java.util.Collections;
import java.util.Properties;
public class MyConsumer {
public static void main(String[] args) {
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "my-group");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Collections.singletonList("my-topic"));
while (true) {
ConsumerRecords<String, String> records = consumer.poll(100);
for (ConsumerRecord<String, String> record : records) {
System.out.printf("Offset: %d, Key: %s, Value: %s%n", record.offset(), record.key(), record.value());
}
}
}
}Resumen
En esta sección, hemos aprendido sobre los mensajes y los desplazamientos en Kafka. Los mensajes son la unidad básica de datos en Kafka, y los desplazamientos permiten a los consumidores rastrear su posición en el flujo de mensajes. También hemos visto ejemplos prácticos de cómo producir y consumir mensajes en Kafka utilizando Java. Estos conceptos son fundamentales para trabajar eficazmente con Kafka y garantizar la entrega ordenada y confiable de mensajes.
En el próximo tema, exploraremos los Productores y Consumidores en mayor detalle, incluyendo cómo configurar y optimizar estos componentes para diferentes casos de uso.
Curso de Kafka
Módulo 1: Introducción a Kafka
- ¿Qué es Kafka?
- Casos de Uso de Kafka
- Visión General de la Arquitectura de Kafka
- Configuración de Kafka
Módulo 2: Conceptos Básicos de Kafka
Módulo 3: Operaciones de Kafka
Módulo 4: Configuración y Gestión de Kafka
Módulo 5: Temas Avanzados de Kafka
- Ajuste de Rendimiento de Kafka
- Kafka en una Configuración Multi-Centro de Datos
- Kafka con Registro de Esquemas
- Kafka Streams Avanzado
