En este módulo, aprenderemos cómo consumir mensajes de un clúster de Kafka. Entenderemos los conceptos clave, veremos ejemplos prácticos y realizaremos ejercicios para reforzar lo aprendido.
Conceptos Clave
Consumidores
- Consumidor: Una aplicación que suscribe a uno o más temas y procesa los mensajes publicados en ellos.
- Grupo de Consumidores: Un conjunto de consumidores que trabajan juntos para consumir mensajes de uno o más temas. Cada mensaje se entrega a un solo consumidor dentro del grupo.
Desplazamientos (Offsets)
- Desplazamiento: Un número que identifica de manera única cada mensaje dentro de una partición. Los consumidores utilizan los desplazamientos para rastrear qué mensajes han sido consumidos.
Rebalanceo
- Rebalanceo: Proceso mediante el cual Kafka redistribuye las particiones entre los consumidores de un grupo cuando hay cambios en el grupo (por ejemplo, cuando un consumidor se une o se va).
Ejemplo Práctico: Consumidor en Java
Configuración del Consumidor
Primero, necesitamos configurar nuestro consumidor. Aquí hay un ejemplo de configuración básica en Java:
import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.common.serialization.StringDeserializer; import java.util.Properties; public class ConsumerExample { public static void main(String[] args) { Properties properties = new Properties(); properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); properties.put(ConsumerConfig.GROUP_ID_CONFIG, "my-group"); properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties); // Resto del código } }
Suscripción a Temas
Después de configurar el consumidor, debemos suscribirnos a uno o más temas:
Consumo de Mensajes
Finalmente, podemos consumir mensajes en un bucle:
import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.ConsumerRecord; while (true) { ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100)); for (ConsumerRecord<String, String> record : records) { System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value()); } }
Cierre del Consumidor
Es importante cerrar el consumidor para liberar los recursos:
Código Completo
import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.common.serialization.StringDeserializer; import java.time.Duration; import java.util.Arrays; import java.util.Properties; public class ConsumerExample { public static void main(String[] args) { Properties properties = new Properties(); properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); properties.put(ConsumerConfig.GROUP_ID_CONFIG, "my-group"); properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties); consumer.subscribe(Arrays.asList("my-topic")); try { while (true) { ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100)); for (ConsumerRecord<String, String> record : records) { System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value()); } } } finally { consumer.close(); } } }
Ejercicio Práctico
Ejercicio 1: Consumir Mensajes de un Tema
- Configura un consumidor en Java utilizando las propiedades adecuadas.
- Suscríbete a un tema llamado "test-topic".
- Consume y muestra los mensajes en la consola.
Solución
import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.common.serialization.StringDeserializer; import java.time.Duration; import java.util.Arrays; import java.util.Properties; public class ConsumerExercise { public static void main(String[] args) { Properties properties = new Properties(); properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); properties.put(ConsumerConfig.GROUP_ID_CONFIG, "exercise-group"); properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties); consumer.subscribe(Arrays.asList("test-topic")); try { while (true) { ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100)); for (ConsumerRecord<String, String> record : records) { System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value()); } } } finally { consumer.close(); } } }
Errores Comunes y Consejos
Errores Comunes
- No cerrar el consumidor: Puede llevar a fugas de recursos.
- No manejar el rebalanceo: Puede causar que los mensajes se procesen más de una vez o se pierdan.
Consejos
- Usar un grupo de consumidores: Para escalar la aplicación y distribuir la carga.
- Configurar el tiempo de espera de la encuesta (poll timeout): Para equilibrar entre la latencia y el uso de CPU.
Conclusión
En esta sección, hemos aprendido cómo configurar y utilizar un consumidor de Kafka en Java. Hemos cubierto los conceptos clave, visto un ejemplo práctico y realizado un ejercicio para reforzar lo aprendido. En el próximo módulo, exploraremos Kafka Connect y cómo integrarlo con otros sistemas.
Curso de Kafka
Módulo 1: Introducción a Kafka
- ¿Qué es Kafka?
- Casos de Uso de Kafka
- Visión General de la Arquitectura de Kafka
- Configuración de Kafka
Módulo 2: Conceptos Básicos de Kafka
Módulo 3: Operaciones de Kafka
Módulo 4: Configuración y Gestión de Kafka
Módulo 5: Temas Avanzados de Kafka
- Ajuste de Rendimiento de Kafka
- Kafka en una Configuración Multi-Centro de Datos
- Kafka con Registro de Esquemas
- Kafka Streams Avanzado