Striim 3.10.3 documentation

Kafka Writer

escreve para um tópico no Apache Kafka.

Existem cinco versões de kafkawriter, 0.8.0, 0.9.0, 0.10.0, 0.11.0 e 2.1.0. Use o que corresponde ao corretor Kafka alvo. Por exemplo, para usar 0.9.0, a sintaxe é CREATE TARGET <name> USING KafkaWriter VERSION '0.9.0'. Se escrever para a instância Kafka interna, use 0.11.0.

problema conhecido DEV-13039: aplicação com kafkawriter 0.9 ou 0.10 falhas se o corretor de Kafka ficar offline.

propriedade

tipo

valor padrão

notas

Corretor Endereço

String

Kafka Config

String

Opcionalmente, especificar Kafka produtor propriedades, separados por ponto-e-vírgula. Veja a tabela abaixo para mais detalhes.

Kafka Config Propriedade Separador

String

Especificar um separador diferente se o produtor de valores de propriedades especificadas no KafkaConfig contém um ponto-e-vírgula.

Kafka Config Valor Separador

String

=

Especificar um separador diferente se o produtor de valores de propriedades especificadas no KafkaConfig contém um símbolo de igual.

Cabeçalho da Mensagem

String

Opcionalmente, se utilizando de Kafka 0.11 ou mais tarde no modo async, ou no modo de sincronização com KafkaConfig batch.size=-1, especifique um ou mais cabeçalhos personalizados a serem adicionados às mensagens como pares chave-valor. Os valores podem ser:

  • um nome de campo a partir de um posto no fluxo de um tipo definido pelo usuário: por exemplo, MerchantID=merchantID

  • uma seqüência de caracteres estática: por exemplo, Company="My Company"

  • uma função: por exemplo, para obter o nome da tabela de origem de um fluxo de entrada WAEvent que é o resultado de um leitor CDC, Table Name=@metadata(TableName)

para especificar vários cabeçalhos personalizados, separe-os com ponto-e-vírgula.

Mensagem-Chave

String

Opcionalmente, se utilizando de Kafka 0.11 ou posterior no modo assíncrono, ou no modo de sincronização com KafkaConfig batch.size=-1 especifique uma ou mais chaves para ser adicionado às mensagens como pares chave-valor. O valor da propriedade pode ser uma cadeia estática, um ou mais campos do fluxo de entrada, ou uma combinação de ambos. Exemplos:

MessageKey : CName=”Striim”MessageKey : Table=@metadata(TableName); Operation=@metadata(OperationName);key1=@userdata(key1)MessageKey : CityName=City; Zipcode=zipMessageKey : CName=”Striim”;Table=@metadata(TableName); Operation=@metadata(OperationName)

entre outras possibilidades, você pode usar esta propriedade para suportar a compactação de log ou para permitir que aplicações a jusante usem consultas baseadas na carga de mensagem..

Modo

String

Sincronização

veja a Definição de KafkaWriter modo propriedade: sincronização versus assíncrono

Threads Paralelas

Integer

Veja a Criação de vários escritor instâncias.

Chave de Partição

String

O nome de um campo no fluxo de entrada, cujos valores determinam a forma como os eventos são distribuídos entre várias partições. Os Eventos com o mesmo valor do campo chave da partição serão escritos na mesma partição.

Se o fluxo de entrada for de qualquer tipo excepto WAEvent, indique o nome de um dos seus campos.

Se o fluxo de entrada é de WAEvent tipo, especifique um campo de METADADOS mapa (veja o HP NonStop leitor WAEvent campos, MySQLReader WAEvent campos, OracleReader WAEvent campos, ou MS SQL Leitor WAEvent campos) usando a sintaxe @METADATA(<field name>), ou um campo no objeto USERDATA mapa (consulte Adicionar dados definidos pelo usuário para WAEvent córregos), usando a sintaxe @USERDATA(<field name>).

Tópico

String

O existente Kafka tópico para escrever (não será criado se ele não existir). Se mais de um escritor Kafka escrever para o mesmo tópico, a recuperação não é suportada (veja aplicações de recuperação. (Recuperação é suportada ao usar fios paralelos.)

Esta placa tem uma escolha de formatadores. Veja combinações de writer-formatter suportadas para mais informações.

Notas sobre a propriedade kafkaconfig

com as exceções indicadas na tabela seguinte, você pode especificar qualquer propriedade do produtor Kafka em KafkaConfig.

Kafka producer property

notes

acks

  • in sync mode, may be set to 1 or all

  • in async mode, may be set to 0, 1, or all

batch.size

linger.ms

repetições

  • no modo de sincronização, para evitar eventos fora de ordem, as propriedades do produtor definidas em Kafka com serão inalteradas e ignoradas, e Striim irá lidar com estas internamente.

  • no modo async, o Striim irá actualizar as propriedades do produtor Kafka e estas serão tratadas pelo Kafka.

  • no modo de sincronização, poderá definir batch.size=-1 para escrever um evento por mensagem de Kafka. Isto irá degradar seriamente o desempenho, por isso não é recomendado em um ambiente de produção. Com esta configuração, as mensagens serão semelhantes às do modo async.

ativar.idempotence

ao usar a versão 2.1.0 e modo async, configurado como verdadeiro para escrever eventos em ordem (ver

chave.o valor do deserializador

é sempre org.Apache.kafka.comum.seriacao.ByteArrayDeserializer, não pode ser substituído por KafkaConfig

Internamente, KafkaWriter invoca KafkaConsumer para vários fins, e que o AVISO de que o consumidor API devido à passagem KafkaConfig propriedades podem ser ignorados com segurança. Veja a configuração do Kafka para mais informações sobre as propriedades do produtor do Kafka.

kafkawriter sample application

The following sample code writes data from PosDataPreview.csv to the Kafka topicKafkaWriterSample. Este tópico já existe na instância interna do Striim Kafka. Se você estiver usando uma instância Kafka externa, você deve criar o tópico antes de executar a aplicação.

CREATE SOURCE PosSource USING FileReader ( directory:'Samples/PosApp/AppData', wildcard:'PosDataPreview.csv', positionByEOF:false)PARSE USING DSVParser ( header:yes)OUTPUT TO RawStream;CREATE CQ CsvToPosDataINSERT INTO PosDataStreamSELECT TO_STRING(data) as merchantId, TO_DATEF(data,'yyyyMMddHHmmss') as dateTime, TO_DOUBLE(data) as amount, TO_STRING(data) as zipFROM RawStream;CREATE TARGET KW11Sample USING KafkaWriter VERSION '0.11.0'( brokeraddress:'localhost:9092', topic:'KafkaWriterSample')FORMAT USING DSVFormatter ()INPUT FROM PosDataStream;

pode verificar se os dados foram escritos ao Kafka, executando a aplicação de amostras do leitor do Kafka.

o primeiro campo na saída () armazena a informação necessária para evitar eventos perdidos ou duplicados após a recuperação (ver aplicações em recuperação). Se a recuperação não estiver ativada, seu valor é nulo.

mon saída (consulte Usando o MON de comando) para alvos usando KafkaWriter inclui:

  • no modo assíncrono só, Bytes Enviados Taxa: quantos megabytes por segundo foram enviados para os corretores

  • sincronização e modo assíncrono, Bytes de gravação de Taxa: quantos megabytes por segundo foram escritos pelos corretores e confirmação recebida pelo Striim

Activar compressão

Quando você ativar a compactação em KafkaWriter, o corretor e o consumidor deve lidar com o comprimido lotes automaticamente. No additional configuration should be required in Kafka.

To enable batch compression for version 0.8.0, include the compression.codec property in KafkaConfig. Supported values are gzip and snappy. For example:

KafkaConfg:'compression.codec=snappy'

To enable compression for version 0.9, 0.10, or 0.11, include the compression.type property in KafkaConfig. Supported values are gziplz4snappy. For example:

KafkaConfig:'compression.type=snappy'



Deixe uma resposta

O seu endereço de email não será publicado.