En este tema, exploraremos los conceptos fundamentales de los mensajes y los desplazamientos (offsets) en Apache Kafka. Estos conceptos son cruciales para entender cómo Kafka maneja la transmisión de datos y garantiza la entrega ordenada y confiable de mensajes.

¿Qué es un Mensaje en Kafka?

Un mensaje en Kafka es la unidad básica de datos que se transmite entre productores y consumidores. Cada mensaje contiene:

  • Clave (Key): Opcional, utilizada para determinar la partición a la que se enviará el mensaje.
  • Valor (Value): El contenido del mensaje.
  • Encabezados (Headers): Opcionales, metadatos adicionales.
  • Timestamp: Marca de tiempo que indica cuándo se produjo el mensaje.

Ejemplo de un Mensaje

{
  "key": "user123",
  "value": "User login event",
  "headers": {
    "eventType": "login"
  },
  "timestamp": 1633024800000
}

¿Qué es un Desplazamiento (Offset)?

El desplazamiento es un número secuencial único que Kafka asigna a cada mensaje dentro de una partición. Los desplazamientos permiten a los consumidores rastrear su posición en el flujo de mensajes y garantizar que no se pierdan ni se dupliquen mensajes.

Características de los Desplazamientos

  • Secuenciales: Cada mensaje en una partición tiene un desplazamiento único y secuencial.
  • Inmutables: Una vez asignado, el desplazamiento de un mensaje no cambia.
  • Persistentes: Los desplazamientos se almacenan en el log de Kafka y se pueden recuperar en caso de fallos.

Cómo Funcionan los Desplazamientos

Cuando un productor envía un mensaje a un tema, Kafka asigna un desplazamiento a ese mensaje. Los consumidores utilizan estos desplazamientos para leer mensajes de manera ordenada y para mantener su posición en el log.

Ejemplo de Desplazamientos

Supongamos que tenemos un tema con una partición y los siguientes mensajes:

Desplazamiento (Offset) Clave (Key) Valor (Value)
0 user123 User login event
1 user456 User logout event
2 user789 User purchase event

Un consumidor que ha leído hasta el desplazamiento 1 sabe que su próximo mensaje a leer tiene el desplazamiento 2.

Ejemplo Práctico: Producción y Consumo de Mensajes

Producción de Mensajes

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import java.util.Properties;

public class SimpleProducer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        KafkaProducer<String, String> producer = new KafkaProducer<>(props);
        ProducerRecord<String, String> record = new ProducerRecord<>("my-topic", "user123", "User login event");

        producer.send(record);
        producer.close();
    }
}

Consumo de Mensajes

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import java.util.Collections;
import java.util.Properties;

public class SimpleConsumer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("group.id", "test-group");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Collections.singletonList("my-topic"));

        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(100);
            for (ConsumerRecord<String, String> record : records) {
                System.out.printf("Offset: %d, Key: %s, Value: %s%n", record.offset(), record.key(), record.value());
            }
        }
    }
}

Ejercicio Práctico

Ejercicio 1: Producción y Consumo de Mensajes

  1. Objetivo: Crear un productor y un consumidor en Java que envíen y reciban mensajes en un tema de Kafka.
  2. Instrucciones:
    • Configura un clúster de Kafka localmente.
    • Escribe un programa en Java para producir mensajes con claves y valores.
    • Escribe un programa en Java para consumir esos mensajes y mostrar los desplazamientos.

Solución

Productor

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import java.util.Properties;

public class MyProducer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        KafkaProducer<String, String> producer = new KafkaProducer<>(props);
        for (int i = 0; i < 10; i++) {
            ProducerRecord<String, String> record = new ProducerRecord<>("my-topic", "key" + i, "value" + i);
            producer.send(record);
        }
        producer.close();
    }
}

Consumidor

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import java.util.Collections;
import java.util.Properties;

public class MyConsumer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("group.id", "my-group");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Collections.singletonList("my-topic"));

        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(100);
            for (ConsumerRecord<String, String> record : records) {
                System.out.printf("Offset: %d, Key: %s, Value: %s%n", record.offset(), record.key(), record.value());
            }
        }
    }
}

Resumen

En esta sección, hemos aprendido sobre los mensajes y los desplazamientos en Kafka. Los mensajes son la unidad básica de datos en Kafka, y los desplazamientos permiten a los consumidores rastrear su posición en el flujo de mensajes. También hemos visto ejemplos prácticos de cómo producir y consumir mensajes en Kafka utilizando Java. Estos conceptos son fundamentales para trabajar eficazmente con Kafka y garantizar la entrega ordenada y confiable de mensajes.

En el próximo tema, exploraremos los Productores y Consumidores en mayor detalle, incluyendo cómo configurar y optimizar estos componentes para diferentes casos de uso.

© Copyright 2024. Todos los derechos reservados