Striim 3.10.3 documentación

Kafka, Escritor

Escribe a un tema en Apache Kafka.

Hay cinco versiones de KafkaWriter, 0.8.0, 0.9.0, 0.10.0, 0.11.0, y 2.1.0. Utilice el que corresponda al broker Kafka objetivo. Por ejemplo, para usar 0.9.0, la sintaxis es CREATE TARGET <name> USING KafkaWriter VERSION '0.9.0'. Si escribe en la instancia interna de Kafka, use 0.11.0.

Problema conocido DEV-13039: la aplicación con KafkaWriter 0.9 o 0.10 se bloquea si el agente Kafka se desconecta.

propiedad

tipo

valor predeterminado

notas

Corredor de Dirección

String

Kafka Config

String

Opcionalmente, especifique Kafka productor de propiedades, separados por punto y coma. Consulte la tabla a continuación para obtener más detalles.

Kafka Config Propiedad Separador

String

Especificar un separador diferente si uno de los productores de la propiedad de los valores especificados en KafkaConfig contiene un punto y coma.

Kafka Config Valor de Separador

String

=

Especificar un separador diferente si uno de los productores de la propiedad de los valores especificados en KafkaConfig contiene un símbolo de igualdad.

Encabezado de Mensaje

String

Opcionalmente, si el uso de Kafka 0.11 o posterior en modo asíncrono, o en modo de sincronización con KafkaConfig batch.size=-1, especifique uno o más encabezados personalizados que se agregarán a los mensajes como pares clave-valor. Los valores pueden ser:

  • un nombre de campo de un puesto de secuencia de un tipo definido por el usuario, por ejemplo: MerchantID=merchantID

  • una cadena estática: por ejemplo, Company="My Company"

  • una función: por ejemplo, para obtener el nombre de la tabla de origen de un flujo de entrada WAEvent que es la salida de un lector CDC, Table Name=@metadata(TableName)

  • Para especificar varios encabezados personalizados, sepárelos con punto y coma.

    Clave de mensaje

    Cadena

    Opcionalmente, si utiliza Kafka 0.11 o posterior en modo asíncrono, o en modo de sincronización con KafkaConfig batch.size=-1, especifique una o más claves que se agregarán a los mensajes como pares clave-valor. El valor de la propiedad puede ser una cadena estática, uno o más campos del flujo de entrada, o una combinación de ambos. Ejemplos:

    MessageKey : CName=”Striim”MessageKey : Table=@metadata(TableName); Operation=@metadata(OperationName);key1=@userdata(key1)MessageKey : CityName=City; Zipcode=zipMessageKey : CName=”Striim”;Table=@metadata(TableName); Operation=@metadata(OperationName)

    Entre otras posibilidades, puede usar esta propiedad para admitir la compactación de registros o para permitir que las aplicaciones posteriores usen consultas basadas en la carga útil del mensaje..

    Modo

    String

    Sync

    consulte Configuración de KafkaWriter de modo de la propiedad: sincronización frente a async

    Hilos en Paralelo

    Integer

    Consulte Creación de varias escritor instancias.

    Clave de Partición

    String

    El nombre de un campo en el flujo de entrada cuyos valores determinan cómo los eventos se distribuyen entre varias particiones. Los eventos con el mismo valor de campo de clave de partición se escribirán en la misma partición.

    Si el flujo de entrada es de cualquier tipo excepto WAEvent, especifique el nombre de uno de sus campos.

    Si el flujo de entrada es del tipo WAEvent, especifique un campo en el mapa de METADATOS (consulte campos HP NonStop reader WAEvent, campos MySQLReader WAEvent, campos OracleReader WAEvent o campos MS SQL Reader WAEvent) utilizando la sintaxis @METADATA(<field name>), o un campo en el mapa de DATOS de usuario (consulte Agregar datos definidos por el usuario a flujos WAEvent), utilizando la sintaxis @USERDATA(<field name>).

    Tema

    String

    La existente Kafka tema de escritura (no se crea si no existe). Si más de un escritor de Kafka escribe en el mismo tema, no se admite la recuperación (consulte Recuperación de aplicaciones. (Se admite la recuperación cuando se utilizan Subprocesos paralelos.)

    Este adaptador tiene una opción de formatters. Consulte Combinaciones de escritor y formateador compatibles para obtener más información.

    Notas sobre la propiedad KafkaConfig

    Con las excepciones indicadas en la siguiente tabla, puede especificar cualquier propiedad de Kafka producer en KafkaConfig.

    Kafka producer property

    notes

    acks

    • in sync mode, may be set to 1 or all

    • in async mode, may be set to 0, 1, or all

    batch.size

    linger.ms

    reintentos

    • En modo de sincronización, para evitar eventos fuera de orden, las propiedades del productor establecidas en Kafka con se mantendrán sin cambios e ignoradas, y Striim las manejará internamente.

    • En modo asíncrono, Striim actualizará las propiedades del productor de Kafka y Kafka las gestionará.

    • En el modo de sincronización, puede configurar batch.size=-1 para escribir un evento por mensaje Kafka. Esto degradará seriamente el rendimiento, por lo que no se recomienda en un entorno de producción. Con esta configuración, los mensajes serán similares a los del modo asíncrono.

    activar.idempotencia

    Al usar la versión 2.1.0 y el modo asíncrono, establezca el valor true para escribir eventos en orden (consulte tecla

    .el valor del deserializador

    siempre es org.apache.kafka.común.serialización.ByteArrayDeserializer, no puede ser anulado por KafkaConfig

    Internamente, KafkaWriter invoca KafkaConsumer para varios fines, y la ADVERTENCIA de la API de consumo debido a la aprobación de propiedades KafkaConfig se puede ignorar de forma segura. Consulte Configuración de Kafka para obtener más información sobre las propiedades del productor de Kafka.

    Aplicación de ejemplo de KafkaWriter

    El siguiente código de ejemplo escribe datos de PosDataPreview.csval tema de Kafka KafkaWriterSample. Este tema ya existe en la instancia interna de Kafka de Striim. Si está utilizando una instancia de Kafka externa, debe crear el tema antes de ejecutar la aplicación.

    CREATE SOURCE PosSource USING FileReader ( directory:'Samples/PosApp/AppData', wildcard:'PosDataPreview.csv', positionByEOF:false)PARSE USING DSVParser ( header:yes)OUTPUT TO RawStream;CREATE CQ CsvToPosDataINSERT INTO PosDataStreamSELECT TO_STRING(data) as merchantId, TO_DATEF(data,'yyyyMMddHHmmss') as dateTime, TO_DOUBLE(data) as amount, TO_STRING(data) as zipFROM RawStream;CREATE TARGET KW11Sample USING KafkaWriter VERSION '0.11.0'( brokeraddress:'localhost:9092', topic:'KafkaWriterSample')FORMAT USING DSVFormatter ()INPUT FROM PosDataStream;

    Puede verificar que los datos se escribieron en Kafka ejecutando la aplicación de ejemplo de lector Kafka.

    El primer campo de la salida (position) almacena la información necesaria para evitar eventos perdidos o duplicados después de la recuperación (consulte Recuperación de aplicaciones). Si la recuperación no está habilitada, su valor es NULL.

    mon la salida (consulte Uso del comando MON) para destinos que utilizan KafkaWriter incluye:

    • solo en modo asíncrono, Tasa de bytes enviados: cuántos megabytes por segundo se enviaron a los corredores

    • tanto en sincronización como en asíncrono modo, Tasa de bytes de escritura: cuántos megabytes por segundo escribieron los corredores y el acuse de recibo recibido por Striim

    Habilitar la compresión

    Cuando habilita la compresión en KafkaWriter, el corredor y el consumidor deben manejar los lotes comprimidos automáticamente. No additional configuration should be required in Kafka.

    To enable batch compression for version 0.8.0, include the compression.codec property in KafkaConfig. Supported values are gzip and snappy. For example:

    KafkaConfg:'compression.codec=snappy'

    To enable compression for version 0.9, 0.10, or 0.11, include the compression.type property in KafkaConfig. Supported values are gziplz4snappy. For example:

    KafkaConfig:'compression.type=snappy'



    Deja una respuesta

    Tu dirección de correo electrónico no será publicada.