Striim 3.10.3 documentation
Kafka Writer
escreve para um tópico no Apache Kafka.
Existem cinco versões de kafkawriter, 0.8.0, 0.9.0, 0.10.0, 0.11.0 e 2.1.0. Use o que corresponde ao corretor Kafka alvo. Por exemplo, para usar 0.9.0, a sintaxe é CREATE TARGET <name> USING KafkaWriter VERSION '0.9.0'. Se escrever para a instância Kafka interna, use 0.11.0.
problema conhecido DEV-13039: aplicação com kafkawriter 0.9 ou 0.10 falhas se o corretor de Kafka ficar offline.
|
propriedade |
tipo |
valor padrão |
notas |
|---|---|---|---|
|
Corretor Endereço |
String |
||
|
Kafka Config |
String |
Opcionalmente, especificar Kafka produtor propriedades, separados por ponto-e-vírgula. Veja a tabela abaixo para mais detalhes. |
|
|
Kafka Config Propriedade Separador |
String |
Especificar um separador diferente se o produtor de valores de propriedades especificadas no KafkaConfig contém um ponto-e-vírgula. |
|
|
Kafka Config Valor Separador |
String |
= |
Especificar um separador diferente se o produtor de valores de propriedades especificadas no KafkaConfig contém um símbolo de igual. |
|
Cabeçalho da Mensagem |
String |
Opcionalmente, se utilizando de Kafka 0.11 ou mais tarde no modo async, ou no modo de sincronização com KafkaConfig
para especificar vários cabeçalhos personalizados, separe-os com ponto-e-vírgula. |
|
|
Mensagem-Chave |
String |
Opcionalmente, se utilizando de Kafka 0.11 ou posterior no modo assíncrono, ou no modo de sincronização com KafkaConfig MessageKey : CName=”Striim”MessageKey : Table=@metadata(TableName); Operation=@metadata(OperationName);key1=@userdata(key1)MessageKey : CityName=City; Zipcode=zipMessageKey : CName=”Striim”;Table=@metadata(TableName); Operation=@metadata(OperationName) entre outras possibilidades, você pode usar esta propriedade para suportar a compactação de log ou para permitir que aplicações a jusante usem consultas baseadas na carga de mensagem.. |
|
|
Modo |
String |
Sincronização |
veja a Definição de KafkaWriter modo propriedade: sincronização versus assíncrono |
|
Threads Paralelas |
Integer |
Veja a Criação de vários escritor instâncias. |
|
|
Chave de Partição |
String |
O nome de um campo no fluxo de entrada, cujos valores determinam a forma como os eventos são distribuídos entre várias partições. Os Eventos com o mesmo valor do campo chave da partição serão escritos na mesma partição. Se o fluxo de entrada for de qualquer tipo excepto WAEvent, indique o nome de um dos seus campos. Se o fluxo de entrada é de WAEvent tipo, especifique um campo de METADADOS mapa (veja o HP NonStop leitor WAEvent campos, MySQLReader WAEvent campos, OracleReader WAEvent campos, ou MS SQL Leitor WAEvent campos) usando a sintaxe |
|
|
Tópico |
String |
O existente Kafka tópico para escrever (não será criado se ele não existir). Se mais de um escritor Kafka escrever para o mesmo tópico, a recuperação não é suportada (veja aplicações de recuperação. (Recuperação é suportada ao usar fios paralelos.) |
Esta placa tem uma escolha de formatadores. Veja combinações de writer-formatter suportadas para mais informações.
Notas sobre a propriedade kafkaconfig
com as exceções indicadas na tabela seguinte, você pode especificar qualquer propriedade do produtor Kafka em KafkaConfig.
|
Kafka producer property |
notes |
|---|---|
|
acks |
|
|
batch.size linger.ms repetições |
|
|
ativar.idempotence |
ao usar a versão 2.1.0 e modo async, configurado como verdadeiro para escrever eventos em ordem (ver |
|
chave.o valor do deserializador |
é sempre org.Apache.kafka.comum.seriacao.ByteArrayDeserializer, não pode ser substituído por KafkaConfig |
Internamente, KafkaWriter invoca KafkaConsumer para vários fins, e que o AVISO de que o consumidor API devido à passagem KafkaConfig propriedades podem ser ignorados com segurança. Veja a configuração do Kafka para mais informações sobre as propriedades do produtor do Kafka.
kafkawriter sample application
The following sample code writes data from PosDataPreview.csv to the Kafka topicKafkaWriterSample. Este tópico já existe na instância interna do Striim Kafka. Se você estiver usando uma instância Kafka externa, você deve criar o tópico antes de executar a aplicação.
CREATE SOURCE PosSource USING FileReader ( directory:'Samples/PosApp/AppData', wildcard:'PosDataPreview.csv', positionByEOF:false)PARSE USING DSVParser ( header:yes)OUTPUT TO RawStream;CREATE CQ CsvToPosDataINSERT INTO PosDataStreamSELECT TO_STRING(data) as merchantId, TO_DATEF(data,'yyyyMMddHHmmss') as dateTime, TO_DOUBLE(data) as amount, TO_STRING(data) as zipFROM RawStream;CREATE TARGET KW11Sample USING KafkaWriter VERSION '0.11.0'( brokeraddress:'localhost:9092', topic:'KafkaWriterSample')FORMAT USING DSVFormatter ()INPUT FROM PosDataStream;
pode verificar se os dados foram escritos ao Kafka, executando a aplicação de amostras do leitor do Kafka.
o primeiro campo na saída () armazena a informação necessária para evitar eventos perdidos ou duplicados após a recuperação (ver aplicações em recuperação). Se a recuperação não estiver ativada, seu valor é nulo.
mon saída (consulte Usando o MON de comando) para alvos usando KafkaWriter inclui:
-
no modo assíncrono só, Bytes Enviados Taxa: quantos megabytes por segundo foram enviados para os corretores
-
sincronização e modo assíncrono, Bytes de gravação de Taxa: quantos megabytes por segundo foram escritos pelos corretores e confirmação recebida pelo Striim
Activar compressão
Quando você ativar a compactação em KafkaWriter, o corretor e o consumidor deve lidar com o comprimido lotes automaticamente. No additional configuration should be required in Kafka.
To enable batch compression for version 0.8.0, include the compression.codec property in KafkaConfig. Supported values are gzip and snappy. For example:
KafkaConfg:'compression.codec=snappy'
To enable compression for version 0.9, 0.10, or 0.11, include the compression.type property in KafkaConfig. Supported values are gziplz4snappy. For example:
KafkaConfig:'compression.type=snappy'