Striim 3.10.3 documentation
Kafka Writer
escreve para um tópico no Apache Kafka.
Existem cinco versões de kafkawriter, 0.8.0, 0.9.0, 0.10.0, 0.11.0 e 2.1.0. Use o que corresponde ao corretor Kafka alvo. Por exemplo, para usar 0.9.0, a sintaxe é CREATE TARGET <name> USING KafkaWriter VERSION '0.9.0'
. Se escrever para a instância Kafka interna, use 0.11.0
.
problema conhecido DEV-13039: aplicação com kafkawriter 0.9 ou 0.10 falhas se o corretor de Kafka ficar offline.
propriedade |
tipo |
valor padrão |
notas |
---|---|---|---|
Corretor Endereço |
String |
||
Kafka Config |
String |
Opcionalmente, especificar Kafka produtor propriedades, separados por ponto-e-vírgula. Veja a tabela abaixo para mais detalhes. |
|
Kafka Config Propriedade Separador |
String |
Especificar um separador diferente se o produtor de valores de propriedades especificadas no KafkaConfig contém um ponto-e-vírgula. |
|
Kafka Config Valor Separador |
String |
= |
Especificar um separador diferente se o produtor de valores de propriedades especificadas no KafkaConfig contém um símbolo de igual. |
Cabeçalho da Mensagem |
String |
Opcionalmente, se utilizando de Kafka 0.11 ou mais tarde no modo async, ou no modo de sincronização com KafkaConfig
para especificar vários cabeçalhos personalizados, separe-os com ponto-e-vírgula. |
|
Mensagem-Chave |
String |
Opcionalmente, se utilizando de Kafka 0.11 ou posterior no modo assíncrono, ou no modo de sincronização com KafkaConfig MessageKey : CName=”Striim”MessageKey : Table=@metadata(TableName); Operation=@metadata(OperationName);key1=@userdata(key1)MessageKey : CityName=City; Zipcode=zipMessageKey : CName=”Striim”;Table=@metadata(TableName); Operation=@metadata(OperationName) entre outras possibilidades, você pode usar esta propriedade para suportar a compactação de log ou para permitir que aplicações a jusante usem consultas baseadas na carga de mensagem.. |
|
Modo |
String |
Sincronização |
veja a Definição de KafkaWriter modo propriedade: sincronização versus assíncrono |
Threads Paralelas |
Integer |
Veja a Criação de vários escritor instâncias. |
|
Chave de Partição |
String |
O nome de um campo no fluxo de entrada, cujos valores determinam a forma como os eventos são distribuídos entre várias partições. Os Eventos com o mesmo valor do campo chave da partição serão escritos na mesma partição. Se o fluxo de entrada for de qualquer tipo excepto WAEvent, indique o nome de um dos seus campos. Se o fluxo de entrada é de WAEvent tipo, especifique um campo de METADADOS mapa (veja o HP NonStop leitor WAEvent campos, MySQLReader WAEvent campos, OracleReader WAEvent campos, ou MS SQL Leitor WAEvent campos) usando a sintaxe |
|
Tópico |
String |
O existente Kafka tópico para escrever (não será criado se ele não existir). Se mais de um escritor Kafka escrever para o mesmo tópico, a recuperação não é suportada (veja aplicações de recuperação. (Recuperação é suportada ao usar fios paralelos.) |
Esta placa tem uma escolha de formatadores. Veja combinações de writer-formatter suportadas para mais informações.
Notas sobre a propriedade kafkaconfig
com as exceções indicadas na tabela seguinte, você pode especificar qualquer propriedade do produtor Kafka em KafkaConfig.
Kafka producer property |
notes |
---|---|
acks |
|
batch.size linger.ms repetições |
|
ativar.idempotence |
ao usar a versão 2.1.0 e modo async, configurado como verdadeiro para escrever eventos em ordem (ver |
chave.o valor do deserializador |
é sempre org.Apache.kafka.comum.seriacao.ByteArrayDeserializer, não pode ser substituído por KafkaConfig |
Internamente, KafkaWriter invoca KafkaConsumer para vários fins, e que o AVISO de que o consumidor API devido à passagem KafkaConfig propriedades podem ser ignorados com segurança. Veja a configuração do Kafka para mais informações sobre as propriedades do produtor do Kafka.
kafkawriter sample application
The following sample code writes data from PosDataPreview.csv
to the Kafka topicKafkaWriterSample
. Este tópico já existe na instância interna do Striim Kafka. Se você estiver usando uma instância Kafka externa, você deve criar o tópico antes de executar a aplicação.
CREATE SOURCE PosSource USING FileReader ( directory:'Samples/PosApp/AppData', wildcard:'PosDataPreview.csv', positionByEOF:false)PARSE USING DSVParser ( header:yes)OUTPUT TO RawStream;CREATE CQ CsvToPosDataINSERT INTO PosDataStreamSELECT TO_STRING(data) as merchantId, TO_DATEF(data,'yyyyMMddHHmmss') as dateTime, TO_DOUBLE(data) as amount, TO_STRING(data) as zipFROM RawStream;CREATE TARGET KW11Sample USING KafkaWriter VERSION '0.11.0'( brokeraddress:'localhost:9092', topic:'KafkaWriterSample')FORMAT USING DSVFormatter ()INPUT FROM PosDataStream;
pode verificar se os dados foram escritos ao Kafka, executando a aplicação de amostras do leitor do Kafka.
o primeiro campo na saída () armazena a informação necessária para evitar eventos perdidos ou duplicados após a recuperação (ver aplicações em recuperação). Se a recuperação não estiver ativada, seu valor é nulo.
mon
saída (consulte Usando o MON de comando) para alvos usando KafkaWriter inclui:
-
no modo assíncrono só, Bytes Enviados Taxa: quantos megabytes por segundo foram enviados para os corretores
-
sincronização e modo assíncrono, Bytes de gravação de Taxa: quantos megabytes por segundo foram escritos pelos corretores e confirmação recebida pelo Striim
Activar compressão
Quando você ativar a compactação em KafkaWriter, o corretor e o consumidor deve lidar com o comprimido lotes automaticamente. No additional configuration should be required in Kafka.
To enable batch compression for version 0.8.0, include the compression.codec
property in KafkaConfig. Supported values are gzip
and snappy
. For example:
KafkaConfg:'compression.codec=snappy'
To enable compression for version 0.9, 0.10, or 0.11, include the compression.type
property in KafkaConfig. Supported values are gzip
lz4
snappy
. For example:
KafkaConfig:'compression.type=snappy'