Striim 3.10.3 documentación
Kafka, Escritor
Escribe a un tema en Apache Kafka.
Hay cinco versiones de KafkaWriter, 0.8.0, 0.9.0, 0.10.0, 0.11.0, y 2.1.0. Utilice el que corresponda al broker Kafka objetivo. Por ejemplo, para usar 0.9.0, la sintaxis es CREATE TARGET <name> USING KafkaWriter VERSION '0.9.0'
. Si escribe en la instancia interna de Kafka, use 0.11.0
.
Problema conocido DEV-13039: la aplicación con KafkaWriter 0.9 o 0.10 se bloquea si el agente Kafka se desconecta.
propiedad |
tipo |
valor predeterminado |
notas |
|
---|---|---|---|---|
Corredor de Dirección |
String |
|||
Kafka Config |
String |
Opcionalmente, especifique Kafka productor de propiedades, separados por punto y coma. Consulte la tabla a continuación para obtener más detalles. |
||
Kafka Config Propiedad Separador |
String |
Especificar un separador diferente si uno de los productores de la propiedad de los valores especificados en KafkaConfig contiene un punto y coma. |
||
Kafka Config Valor de Separador |
String |
= |
Especificar un separador diferente si uno de los productores de la propiedad de los valores especificados en KafkaConfig contiene un símbolo de igualdad. |
|
Encabezado de Mensaje |
String |
Opcionalmente, si el uso de Kafka 0.11 o posterior en modo asíncrono, o en modo de sincronización con KafkaConfig
una cadena estática: por ejemplo, una función: por ejemplo, para obtener el nombre de la tabla de origen de un flujo de entrada WAEvent que es la salida de un lector CDC, Para especificar varios encabezados personalizados, sepárelos con punto y coma. |
||
Clave de mensaje |
Cadena |
Opcionalmente, si utiliza Kafka 0.11 o posterior en modo asíncrono, o en modo de sincronización con KafkaConfig MessageKey : CName=”Striim”MessageKey : Table=@metadata(TableName); Operation=@metadata(OperationName);key1=@userdata(key1)MessageKey : CityName=City; Zipcode=zipMessageKey : CName=”Striim”;Table=@metadata(TableName); Operation=@metadata(OperationName) Entre otras posibilidades, puede usar esta propiedad para admitir la compactación de registros o para permitir que las aplicaciones posteriores usen consultas basadas en la carga útil del mensaje.. |
||
Modo |
String |
Sync |
consulte Configuración de KafkaWriter de modo de la propiedad: sincronización frente a async |
|
Hilos en Paralelo |
Integer |
Consulte Creación de varias escritor instancias. |
||
Clave de Partición |
String |
El nombre de un campo en el flujo de entrada cuyos valores determinan cómo los eventos se distribuyen entre varias particiones. Los eventos con el mismo valor de campo de clave de partición se escribirán en la misma partición. Si el flujo de entrada es de cualquier tipo excepto WAEvent, especifique el nombre de uno de sus campos. Si el flujo de entrada es del tipo WAEvent, especifique un campo en el mapa de METADATOS (consulte campos HP NonStop reader WAEvent, campos MySQLReader WAEvent, campos OracleReader WAEvent o campos MS SQL Reader WAEvent) utilizando la sintaxis |
||
Tema |
String |
La existente Kafka tema de escritura (no se crea si no existe). Si más de un escritor de Kafka escribe en el mismo tema, no se admite la recuperación (consulte Recuperación de aplicaciones. (Se admite la recuperación cuando se utilizan Subprocesos paralelos.) |
Este adaptador tiene una opción de formatters. Consulte Combinaciones de escritor y formateador compatibles para obtener más información.
Notas sobre la propiedad KafkaConfig
Con las excepciones indicadas en la siguiente tabla, puede especificar cualquier propiedad de Kafka producer en KafkaConfig.
Kafka producer property |
notes |
---|---|
acks |
|
batch.size linger.ms reintentos |
|
activar.idempotencia |
Al usar la versión 2.1.0 y el modo asíncrono, establezca el valor true para escribir eventos en orden (consulte tecla |
.el valor del deserializador |
siempre es org.apache.kafka.común.serialización.ByteArrayDeserializer, no puede ser anulado por KafkaConfig |
Internamente, KafkaWriter invoca KafkaConsumer para varios fines, y la ADVERTENCIA de la API de consumo debido a la aprobación de propiedades KafkaConfig se puede ignorar de forma segura. Consulte Configuración de Kafka para obtener más información sobre las propiedades del productor de Kafka.
Aplicación de ejemplo de KafkaWriter
El siguiente código de ejemplo escribe datos de PosDataPreview.csv
al tema de Kafka KafkaWriterSample
. Este tema ya existe en la instancia interna de Kafka de Striim. Si está utilizando una instancia de Kafka externa, debe crear el tema antes de ejecutar la aplicación.
CREATE SOURCE PosSource USING FileReader ( directory:'Samples/PosApp/AppData', wildcard:'PosDataPreview.csv', positionByEOF:false)PARSE USING DSVParser ( header:yes)OUTPUT TO RawStream;CREATE CQ CsvToPosDataINSERT INTO PosDataStreamSELECT TO_STRING(data) as merchantId, TO_DATEF(data,'yyyyMMddHHmmss') as dateTime, TO_DOUBLE(data) as amount, TO_STRING(data) as zipFROM RawStream;CREATE TARGET KW11Sample USING KafkaWriter VERSION '0.11.0'( brokeraddress:'localhost:9092', topic:'KafkaWriterSample')FORMAT USING DSVFormatter ()INPUT FROM PosDataStream;
Puede verificar que los datos se escribieron en Kafka ejecutando la aplicación de ejemplo de lector Kafka.
El primer campo de la salida (position
) almacena la información necesaria para evitar eventos perdidos o duplicados después de la recuperación (consulte Recuperación de aplicaciones). Si la recuperación no está habilitada, su valor es NULL.
mon
la salida (consulte Uso del comando MON) para destinos que utilizan KafkaWriter incluye:
-
solo en modo asíncrono, Tasa de bytes enviados: cuántos megabytes por segundo se enviaron a los corredores
-
tanto en sincronización como en asíncrono modo, Tasa de bytes de escritura: cuántos megabytes por segundo escribieron los corredores y el acuse de recibo recibido por Striim
Habilitar la compresión
Cuando habilita la compresión en KafkaWriter, el corredor y el consumidor deben manejar los lotes comprimidos automáticamente. No additional configuration should be required in Kafka.
To enable batch compression for version 0.8.0, include the compression.codec
property in KafkaConfig. Supported values are gzip
and snappy
. For example:
KafkaConfg:'compression.codec=snappy'
To enable compression for version 0.9, 0.10, or 0.11, include the compression.type
property in KafkaConfig. Supported values are gzip
lz4
snappy
. For example:
KafkaConfig:'compression.type=snappy'