Introducción

Kafka Connect es una herramienta integral para la integración de datos en Apache Kafka. Permite la conexión de Kafka con sistemas externos, como bases de datos, sistemas de almacenamiento, y otros sistemas de mensajería, de una manera sencilla y escalable. En este módulo, aprenderemos sobre los conceptos básicos de Kafka Connect, cómo configurarlo y cómo utilizarlo para mover datos hacia y desde Kafka.

Conceptos Clave

  1. Conectores

  • Fuente (Source) Conectores: Permiten la importación de datos desde sistemas externos a Kafka.
  • Sumidero (Sink) Conectores: Permiten la exportación de datos desde Kafka a sistemas externos.

  1. Tareas

  • Tareas (Tasks): Son las unidades de trabajo que ejecutan los conectores. Un conector puede tener múltiples tareas para paralelizar el trabajo.

  1. Trabajadores

  • Trabajadores (Workers): Son las instancias de Kafka Connect que ejecutan las tareas. Pueden ser distribuidos o independientes.

  1. Configuración

  • Configuración de Conectores: Define cómo se comportan los conectores, incluyendo las propiedades de conexión y las transformaciones de datos.

Configuración de Kafka Connect

Paso 1: Instalación de Kafka Connect

Kafka Connect viene incluido con la distribución de Apache Kafka. No se requiere una instalación separada.

Paso 2: Configuración de un Conector de Fuente

Ejemplo: Conector de Fuente JDBC

{
  "name": "jdbc-source-connector",
  "config": {
    "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
    "tasks.max": "1",
    "connection.url": "jdbc:mysql://localhost:3306/mydatabase",
    "connection.user": "myuser",
    "connection.password": "mypassword",
    "table.whitelist": "mytable",
    "mode": "incrementing",
    "incrementing.column.name": "id",
    "topic.prefix": "jdbc-"
  }
}
  • name: Nombre del conector.
  • connector.class: Clase del conector.
  • tasks.max: Número máximo de tareas.
  • connection.url: URL de conexión a la base de datos.
  • connection.user: Usuario de la base de datos.
  • connection.password: Contraseña de la base de datos.
  • table.whitelist: Lista de tablas a importar.
  • mode: Modo de importación (incremental en este caso).
  • incrementing.column.name: Columna utilizada para el seguimiento incremental.
  • topic.prefix: Prefijo para los temas de Kafka.

Paso 3: Configuración de un Conector de Sumidero

Ejemplo: Conector de Sumidero Elasticsearch

{
  "name": "elasticsearch-sink-connector",
  "config": {
    "connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
    "tasks.max": "1",
    "topics": "jdbc-mytable",
    "connection.url": "http://localhost:9200",
    "type.name": "kafka-connect",
    "key.ignore": "true",
    "schema.ignore": "true"
  }
}
  • name: Nombre del conector.
  • connector.class: Clase del conector.
  • tasks.max: Número máximo de tareas.
  • topics: Temas de Kafka a exportar.
  • connection.url: URL de conexión a Elasticsearch.
  • type.name: Tipo de documento en Elasticsearch.
  • key.ignore: Ignorar la clave del mensaje.
  • schema.ignore: Ignorar el esquema del mensaje.

Ejercicio Práctico

Ejercicio 1: Configurar un Conector de Fuente JDBC

  1. Objetivo: Configurar un conector de fuente JDBC para importar datos desde una base de datos MySQL a Kafka.
  2. Pasos:
    • Configura una base de datos MySQL con una tabla llamada employees.
    • Inserta algunos datos de ejemplo en la tabla employees.
    • Configura un conector de fuente JDBC para importar los datos de employees a un tema de Kafka llamado jdbc-employees.

Solución:

{
  "name": "jdbc-source-connector",
  "config": {
    "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
    "tasks.max": "1",
    "connection.url": "jdbc:mysql://localhost:3306/mydatabase",
    "connection.user": "myuser",
    "connection.password": "mypassword",
    "table.whitelist": "employees",
    "mode": "incrementing",
    "incrementing.column.name": "id",
    "topic.prefix": "jdbc-"
  }
}

Ejercicio 2: Configurar un Conector de Sumidero Elasticsearch

  1. Objetivo: Configurar un conector de sumidero Elasticsearch para exportar datos desde un tema de Kafka a Elasticsearch.
  2. Pasos:
    • Configura un clúster de Elasticsearch.
    • Configura un conector de sumidero Elasticsearch para exportar los datos del tema jdbc-employees a un índice de Elasticsearch llamado employees.

Solución:

{
  "name": "elasticsearch-sink-connector",
  "config": {
    "connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
    "tasks.max": "1",
    "topics": "jdbc-employees",
    "connection.url": "http://localhost:9200",
    "type.name": "kafka-connect",
    "key.ignore": "true",
    "schema.ignore": "true"
  }
}

Resumen

En este módulo, hemos aprendido sobre Kafka Connect, una herramienta poderosa para la integración de datos en Kafka. Hemos cubierto los conceptos clave, cómo configurar conectores de fuente y sumidero, y hemos realizado ejercicios prácticos para reforzar el aprendizaje. Con Kafka Connect, puedes mover datos de manera eficiente y escalable entre Kafka y otros sistemas, facilitando la construcción de pipelines de datos robustos.

© Copyright 2024. Todos los derechos reservados