En este módulo, aprenderemos cómo consumir mensajes de un clúster de Kafka. Entenderemos los conceptos clave, veremos ejemplos prácticos y realizaremos ejercicios para reforzar lo aprendido.
Conceptos Clave
Consumidores
- Consumidor: Una aplicación que suscribe a uno o más temas y procesa los mensajes publicados en ellos.
- Grupo de Consumidores: Un conjunto de consumidores que trabajan juntos para consumir mensajes de uno o más temas. Cada mensaje se entrega a un solo consumidor dentro del grupo.
Desplazamientos (Offsets)
- Desplazamiento: Un número que identifica de manera única cada mensaje dentro de una partición. Los consumidores utilizan los desplazamientos para rastrear qué mensajes han sido consumidos.
Rebalanceo
- Rebalanceo: Proceso mediante el cual Kafka redistribuye las particiones entre los consumidores de un grupo cuando hay cambios en el grupo (por ejemplo, cuando un consumidor se une o se va).
Ejemplo Práctico: Consumidor en Java
Configuración del Consumidor
Primero, necesitamos configurar nuestro consumidor. Aquí hay un ejemplo de configuración básica en Java:
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;
import java.util.Properties;
public class ConsumerExample {
public static void main(String[] args) {
Properties properties = new Properties();
properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
properties.put(ConsumerConfig.GROUP_ID_CONFIG, "my-group");
properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);
// Resto del código
}
}Suscripción a Temas
Después de configurar el consumidor, debemos suscribirnos a uno o más temas:
Consumo de Mensajes
Finalmente, podemos consumir mensajes en un bucle:
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.ConsumerRecord;
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> record : records) {
System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
}
}Cierre del Consumidor
Es importante cerrar el consumidor para liberar los recursos:
Código Completo
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.serialization.StringDeserializer;
import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;
public class ConsumerExample {
public static void main(String[] args) {
Properties properties = new Properties();
properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
properties.put(ConsumerConfig.GROUP_ID_CONFIG, "my-group");
properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);
consumer.subscribe(Arrays.asList("my-topic"));
try {
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> record : records) {
System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
}
}
} finally {
consumer.close();
}
}
}Ejercicio Práctico
Ejercicio 1: Consumir Mensajes de un Tema
- Configura un consumidor en Java utilizando las propiedades adecuadas.
- Suscríbete a un tema llamado "test-topic".
- Consume y muestra los mensajes en la consola.
Solución
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.serialization.StringDeserializer;
import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;
public class ConsumerExercise {
public static void main(String[] args) {
Properties properties = new Properties();
properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
properties.put(ConsumerConfig.GROUP_ID_CONFIG, "exercise-group");
properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);
consumer.subscribe(Arrays.asList("test-topic"));
try {
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> record : records) {
System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
}
}
} finally {
consumer.close();
}
}
}Errores Comunes y Consejos
Errores Comunes
- No cerrar el consumidor: Puede llevar a fugas de recursos.
- No manejar el rebalanceo: Puede causar que los mensajes se procesen más de una vez o se pierdan.
Consejos
- Usar un grupo de consumidores: Para escalar la aplicación y distribuir la carga.
- Configurar el tiempo de espera de la encuesta (poll timeout): Para equilibrar entre la latencia y el uso de CPU.
Conclusión
En esta sección, hemos aprendido cómo configurar y utilizar un consumidor de Kafka en Java. Hemos cubierto los conceptos clave, visto un ejemplo práctico y realizado un ejercicio para reforzar lo aprendido. En el próximo módulo, exploraremos Kafka Connect y cómo integrarlo con otros sistemas.
Curso de Kafka
Módulo 1: Introducción a Kafka
- ¿Qué es Kafka?
- Casos de Uso de Kafka
- Visión General de la Arquitectura de Kafka
- Configuración de Kafka
Módulo 2: Conceptos Básicos de Kafka
Módulo 3: Operaciones de Kafka
Módulo 4: Configuración y Gestión de Kafka
Módulo 5: Temas Avanzados de Kafka
- Ajuste de Rendimiento de Kafka
- Kafka en una Configuración Multi-Centro de Datos
- Kafka con Registro de Esquemas
- Kafka Streams Avanzado
