Introducción
Kafka Connect es una herramienta integral para la integración de datos en Apache Kafka. Permite la conexión de Kafka con sistemas externos, como bases de datos, sistemas de almacenamiento, y otros sistemas de mensajería, de una manera sencilla y escalable. En este módulo, aprenderemos sobre los conceptos básicos de Kafka Connect, cómo configurarlo y cómo utilizarlo para mover datos hacia y desde Kafka.
Conceptos Clave
- Conectores
- Fuente (Source) Conectores: Permiten la importación de datos desde sistemas externos a Kafka.
- Sumidero (Sink) Conectores: Permiten la exportación de datos desde Kafka a sistemas externos.
- Tareas
- Tareas (Tasks): Son las unidades de trabajo que ejecutan los conectores. Un conector puede tener múltiples tareas para paralelizar el trabajo.
- Trabajadores
- Trabajadores (Workers): Son las instancias de Kafka Connect que ejecutan las tareas. Pueden ser distribuidos o independientes.
- Configuración
- Configuración de Conectores: Define cómo se comportan los conectores, incluyendo las propiedades de conexión y las transformaciones de datos.
Configuración de Kafka Connect
Paso 1: Instalación de Kafka Connect
Kafka Connect viene incluido con la distribución de Apache Kafka. No se requiere una instalación separada.
Paso 2: Configuración de un Conector de Fuente
Ejemplo: Conector de Fuente JDBC
{ "name": "jdbc-source-connector", "config": { "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector", "tasks.max": "1", "connection.url": "jdbc:mysql://localhost:3306/mydatabase", "connection.user": "myuser", "connection.password": "mypassword", "table.whitelist": "mytable", "mode": "incrementing", "incrementing.column.name": "id", "topic.prefix": "jdbc-" } }
- name: Nombre del conector.
- connector.class: Clase del conector.
- tasks.max: Número máximo de tareas.
- connection.url: URL de conexión a la base de datos.
- connection.user: Usuario de la base de datos.
- connection.password: Contraseña de la base de datos.
- table.whitelist: Lista de tablas a importar.
- mode: Modo de importación (incremental en este caso).
- incrementing.column.name: Columna utilizada para el seguimiento incremental.
- topic.prefix: Prefijo para los temas de Kafka.
Paso 3: Configuración de un Conector de Sumidero
Ejemplo: Conector de Sumidero Elasticsearch
{ "name": "elasticsearch-sink-connector", "config": { "connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector", "tasks.max": "1", "topics": "jdbc-mytable", "connection.url": "http://localhost:9200", "type.name": "kafka-connect", "key.ignore": "true", "schema.ignore": "true" } }
- name: Nombre del conector.
- connector.class: Clase del conector.
- tasks.max: Número máximo de tareas.
- topics: Temas de Kafka a exportar.
- connection.url: URL de conexión a Elasticsearch.
- type.name: Tipo de documento en Elasticsearch.
- key.ignore: Ignorar la clave del mensaje.
- schema.ignore: Ignorar el esquema del mensaje.
Ejercicio Práctico
Ejercicio 1: Configurar un Conector de Fuente JDBC
- Objetivo: Configurar un conector de fuente JDBC para importar datos desde una base de datos MySQL a Kafka.
- Pasos:
- Configura una base de datos MySQL con una tabla llamada
employees
. - Inserta algunos datos de ejemplo en la tabla
employees
. - Configura un conector de fuente JDBC para importar los datos de
employees
a un tema de Kafka llamadojdbc-employees
.
- Configura una base de datos MySQL con una tabla llamada
Solución:
{ "name": "jdbc-source-connector", "config": { "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector", "tasks.max": "1", "connection.url": "jdbc:mysql://localhost:3306/mydatabase", "connection.user": "myuser", "connection.password": "mypassword", "table.whitelist": "employees", "mode": "incrementing", "incrementing.column.name": "id", "topic.prefix": "jdbc-" } }
Ejercicio 2: Configurar un Conector de Sumidero Elasticsearch
- Objetivo: Configurar un conector de sumidero Elasticsearch para exportar datos desde un tema de Kafka a Elasticsearch.
- Pasos:
- Configura un clúster de Elasticsearch.
- Configura un conector de sumidero Elasticsearch para exportar los datos del tema
jdbc-employees
a un índice de Elasticsearch llamadoemployees
.
Solución:
{ "name": "elasticsearch-sink-connector", "config": { "connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector", "tasks.max": "1", "topics": "jdbc-employees", "connection.url": "http://localhost:9200", "type.name": "kafka-connect", "key.ignore": "true", "schema.ignore": "true" } }
Resumen
En este módulo, hemos aprendido sobre Kafka Connect, una herramienta poderosa para la integración de datos en Kafka. Hemos cubierto los conceptos clave, cómo configurar conectores de fuente y sumidero, y hemos realizado ejercicios prácticos para reforzar el aprendizaje. Con Kafka Connect, puedes mover datos de manera eficiente y escalable entre Kafka y otros sistemas, facilitando la construcción de pipelines de datos robustos.
Curso de Kafka
Módulo 1: Introducción a Kafka
- ¿Qué es Kafka?
- Casos de Uso de Kafka
- Visión General de la Arquitectura de Kafka
- Configuración de Kafka
Módulo 2: Conceptos Básicos de Kafka
Módulo 3: Operaciones de Kafka
Módulo 4: Configuración y Gestión de Kafka
Módulo 5: Temas Avanzados de Kafka
- Ajuste de Rendimiento de Kafka
- Kafka en una Configuración Multi-Centro de Datos
- Kafka con Registro de Esquemas
- Kafka Streams Avanzado