Striim 3.10.3 documentazione

Kafka Writer

Scrive su un argomento in Apache Kafka.

Esistono cinque versioni di KafkaWriter, 0.8.0, 0.9.0, 0.10.0, 0.11.0 e 2.1.0. Usa quello che corrisponde al broker Kafka target. Ad esempio, per usare 0.9.0, la sintassi è CREATE TARGET <name> USING KafkaWriter VERSION '0.9.0'. Se si scrive sull’istanza interna di Kafka, utilizzare 0.11.0.

Problema noto DEV-13039: applicazione con KafkaWriter 0.9 o 0.10 si blocca se Kafka broker va offline.

struttura

tipo

valore predefinito

note

Broker Indirizzo

String

Kafka Config

String

se si desidera, specificare Kafka produttore proprietà, separati da punto e virgola. Vedere la tabella qui sotto per i dettagli.

Kafka Config Property Separator

String

Specificare un separatore diverso se uno dei valori delle proprietà producer specificati in KafkaConfig contiene un punto e virgola.

Kafka Config Value Separator

String

=

Specificare un separatore diverso se uno dei valori della proprietà producer specificati in KafkaConfig contiene un simbolo uguale.

Intestazione del messaggio

Stringa

Facoltativamente, se si utilizza Kafka 0.11 o versioni successive in modalità asincrona o in modalità di sincronizzazione con KafkaConfig batch.size=-1, specificare una o più intestazioni personalizzate da aggiungere ai messaggi come coppie chiave-valore. I valori possono essere:

  • il nome di un campo da mettere flusso di un tipo definito dall’utente: ad esempio, MerchantID=merchantID

  • una stringa statica: per esempio, Company="My Company"

  • una funzione: ad esempio, per ottenere il nome della tabella di origine da un flusso di input WAEvent che è l’output di un lettore CDC, Table Name=@metadata(TableName)

Per specificare più intestazioni personalizzate, separarle con punto e virgola.

Messaggio Chiave

String

se si utilizza Kafka 0.11 o poi in modalità asincrona, o in modalità di sincronizzazione con KafkaConfig batch.size=-1 specificare uno o più tasti da aggiungere ai messaggi come coppie chiave-valore. Il valore della proprietà può essere una stringa statica, uno o più campi del flusso di input o una combinazione di entrambi. Esempi:

MessageKey : CName=”Striim”MessageKey : Table=@metadata(TableName); Operation=@metadata(OperationName);key1=@userdata(key1)MessageKey : CityName=City; Zipcode=zipMessageKey : CName=”Striim”;Table=@metadata(TableName); Operation=@metadata(OperationName)

Tra le altre possibilità, è possibile utilizzare questa proprietà per supportare la compattazione dei log o per consentire alle applicazioni downstream di utilizzare query basate sul payload dei messaggi..

Modalità

Stringa

Sincronizzazione

vedere Impostazione della proprietà mode di KafkaWriter: sync versus async

Thread paralleli

Integer

Vedere Creazione di più istanze di writer.

Chiave di partizione

Stringa

Il nome di un campo nel flusso di input i cui valori determinano come gli eventi sono distribuiti tra più partizioni. Gli eventi con lo stesso valore del campo chiave di partizione verranno scritti nella stessa partizione.

Se il flusso di input è di qualsiasi tipo tranne WAEvent, specificare il nome di uno dei suoi campi.

Se il flusso in ingresso è di WAEvent tipo, specificare un campo di METADATI mappa (vedi HP NonStop lettore WAEvent campi, MySQLReader WAEvent campi, OracleReader WAEvent campi, o MS SQL Lettore WAEvent campi) utilizzando la seguente sintassi: @METADATA(<field name>), o un campo in USERDATA mappa (vedere Aggiunta di dati definito dall’utente per WAEvent corsi d’acqua), utilizzando la seguente sintassi: @USERDATA(<field name>).

Argomento

Stringa

L’argomento Kafka esistente su cui scrivere (non verrà creato se non esiste). Se più di un writer Kafka scrive sullo stesso argomento, il ripristino non è supportato (vedere Ripristino delle applicazioni. (Il ripristino è supportato quando si utilizzano thread paralleli.)

Questo adattatore ha una scelta di formattatori. Vedere Combinazioni writer-formattatore supportate per ulteriori informazioni.

Note sulla proprietà KafkaConfig

Con le eccezioni indicate nella tabella seguente, è possibile specificare qualsiasi proprietà Kafka producer in KafkaConfig.

Kafka producer property

notes

acks

  • in sync mode, may be set to 1 or all

  • in async mode, may be set to 0, 1, or all

batch.size

linger.ms

ritenta

  • In modalità sync, per evitare eventi fuori ordine, le proprietà del produttore impostate in Kafka con saranno invariate e ignorate e Striim le gestirà internamente.

  • In modalità asincrona, Striim aggiornerà le proprietà del produttore di Kafka e queste saranno gestite da Kafka.

  • In modalità di sincronizzazione, è possibile impostare batch.size=-1 per scrivere un evento per messaggio Kafka. Ciò degraderà seriamente le prestazioni, quindi non è raccomandato in un ambiente di produzione. Con questa impostazione, i messaggi saranno simili a quelli in modalità asincrona.

attiva.idempotenza

Quando si utilizza la versione 2.1.0 e la modalità asincrona, impostare su true per scrivere eventi in ordine (vedere

chiave.deserializzatore

il valore è sempre org.Apache.Kafka.comune.serializzazione.ByteArrayDeserializer, non può essere sovrascritto da KafkaConfig

Internamente, KafkaWriter invoca KafkaConsumer per vari scopi e l’AVVISO dall’API consumer dovuto al passaggio delle proprietà KafkaConfig può essere ignorato in modo sicuro. Vedere Configurazione di Kafka per ulteriori informazioni sulle proprietà di Kafka producer.

Applicazione di esempio KafkaWriter

Il seguente codice di esempio scrive i dati daPosDataPreview.csv all’argomento KafkaKafkaWriterSample. Questo argomento esiste già nell’istanza Kafka interna di Striim. Se si utilizza un’istanza Kafka esterna, è necessario creare l’argomento prima di eseguire l’applicazione.

CREATE SOURCE PosSource USING FileReader ( directory:'Samples/PosApp/AppData', wildcard:'PosDataPreview.csv', positionByEOF:false)PARSE USING DSVParser ( header:yes)OUTPUT TO RawStream;CREATE CQ CsvToPosDataINSERT INTO PosDataStreamSELECT TO_STRING(data) as merchantId, TO_DATEF(data,'yyyyMMddHHmmss') as dateTime, TO_DOUBLE(data) as amount, TO_STRING(data) as zipFROM RawStream;CREATE TARGET KW11Sample USING KafkaWriter VERSION '0.11.0'( brokeraddress:'localhost:9092', topic:'KafkaWriterSample')FORMAT USING DSVFormatter ()INPUT FROM PosDataStream;

È possibile verificare che i dati siano stati scritti su Kafka eseguendo l’applicazione di esempio Kafka Reader.

Il primo campo nell’output (position) memorizza le informazioni necessarie per evitare eventi persi o duplicati dopo il ripristino (vedere Ripristino delle applicazioni). Se il ripristino non è abilitato, il suo valore è NULL.

mon output (vedere Utilizzo del MON comando) per obiettivi KafkaWriter include:

  • in modalità asincrona solo, Byte Inviati Rate: quanti megabyte al secondo, sono stati inviati agli intermediari

  • in entrambi i sync e modalità asincrona, Scrittura di Byte Rate: quanti megabyte al secondo, sono stati scritti dai broker e riconoscimento per Striim

compressione

Quando si attiva la compressione in KafkaWriter, il broker e il consumatore deve gestire la compressa lotti automaticamente. No additional configuration should be required in Kafka.

To enable batch compression for version 0.8.0, include the compression.codec property in KafkaConfig. Supported values are gzip and snappy. For example:

KafkaConfg:'compression.codec=snappy'

To enable compression for version 0.9, 0.10, or 0.11, include the compression.type property in KafkaConfig. Supported values are gziplz4snappy. For example:

KafkaConfig:'compression.type=snappy'



Lascia un commento

Il tuo indirizzo email non sarà pubblicato.