Striim 3.10.3 documentazione
Kafka Writer
Scrive su un argomento in Apache Kafka.
Esistono cinque versioni di KafkaWriter, 0.8.0, 0.9.0, 0.10.0, 0.11.0 e 2.1.0. Usa quello che corrisponde al broker Kafka target. Ad esempio, per usare 0.9.0, la sintassi è CREATE TARGET <name> USING KafkaWriter VERSION '0.9.0'
. Se si scrive sull’istanza interna di Kafka, utilizzare 0.11.0
.
Problema noto DEV-13039: applicazione con KafkaWriter 0.9 o 0.10 si blocca se Kafka broker va offline.
struttura |
tipo |
valore predefinito |
note |
---|---|---|---|
Broker Indirizzo |
String |
||
Kafka Config |
String |
se si desidera, specificare Kafka produttore proprietà, separati da punto e virgola. Vedere la tabella qui sotto per i dettagli. |
|
Kafka Config Property Separator |
String |
Specificare un separatore diverso se uno dei valori delle proprietà producer specificati in KafkaConfig contiene un punto e virgola. |
|
Kafka Config Value Separator |
String |
= |
Specificare un separatore diverso se uno dei valori della proprietà producer specificati in KafkaConfig contiene un simbolo uguale. |
Intestazione del messaggio |
Stringa |
Facoltativamente, se si utilizza Kafka 0.11 o versioni successive in modalità asincrona o in modalità di sincronizzazione con KafkaConfig
Per specificare più intestazioni personalizzate, separarle con punto e virgola. |
|
Messaggio Chiave |
String |
se si utilizza Kafka 0.11 o poi in modalità asincrona, o in modalità di sincronizzazione con KafkaConfig MessageKey : CName=”Striim”MessageKey : Table=@metadata(TableName); Operation=@metadata(OperationName);key1=@userdata(key1)MessageKey : CityName=City; Zipcode=zipMessageKey : CName=”Striim”;Table=@metadata(TableName); Operation=@metadata(OperationName) Tra le altre possibilità, è possibile utilizzare questa proprietà per supportare la compattazione dei log o per consentire alle applicazioni downstream di utilizzare query basate sul payload dei messaggi.. |
|
Modalità |
Stringa |
Sincronizzazione |
vedere Impostazione della proprietà mode di KafkaWriter: sync versus async |
Thread paralleli |
Integer |
Vedere Creazione di più istanze di writer. |
|
Chiave di partizione |
Stringa |
Il nome di un campo nel flusso di input i cui valori determinano come gli eventi sono distribuiti tra più partizioni. Gli eventi con lo stesso valore del campo chiave di partizione verranno scritti nella stessa partizione. Se il flusso di input è di qualsiasi tipo tranne WAEvent, specificare il nome di uno dei suoi campi. Se il flusso in ingresso è di WAEvent tipo, specificare un campo di METADATI mappa (vedi HP NonStop lettore WAEvent campi, MySQLReader WAEvent campi, OracleReader WAEvent campi, o MS SQL Lettore WAEvent campi) utilizzando la seguente sintassi: |
|
Argomento |
Stringa |
L’argomento Kafka esistente su cui scrivere (non verrà creato se non esiste). Se più di un writer Kafka scrive sullo stesso argomento, il ripristino non è supportato (vedere Ripristino delle applicazioni. (Il ripristino è supportato quando si utilizzano thread paralleli.) |
Questo adattatore ha una scelta di formattatori. Vedere Combinazioni writer-formattatore supportate per ulteriori informazioni.
Note sulla proprietà KafkaConfig
Con le eccezioni indicate nella tabella seguente, è possibile specificare qualsiasi proprietà Kafka producer in KafkaConfig.
Kafka producer property |
notes |
---|---|
acks |
|
batch.size linger.ms ritenta |
|
attiva.idempotenza |
Quando si utilizza la versione 2.1.0 e la modalità asincrona, impostare su true per scrivere eventi in ordine (vedere |
chiave.deserializzatore |
il valore è sempre org.Apache.Kafka.comune.serializzazione.ByteArrayDeserializer, non può essere sovrascritto da KafkaConfig |
Internamente, KafkaWriter invoca KafkaConsumer per vari scopi e l’AVVISO dall’API consumer dovuto al passaggio delle proprietà KafkaConfig può essere ignorato in modo sicuro. Vedere Configurazione di Kafka per ulteriori informazioni sulle proprietà di Kafka producer.
Applicazione di esempio KafkaWriter
Il seguente codice di esempio scrive i dati daPosDataPreview.csv
all’argomento KafkaKafkaWriterSample
. Questo argomento esiste già nell’istanza Kafka interna di Striim. Se si utilizza un’istanza Kafka esterna, è necessario creare l’argomento prima di eseguire l’applicazione.
CREATE SOURCE PosSource USING FileReader ( directory:'Samples/PosApp/AppData', wildcard:'PosDataPreview.csv', positionByEOF:false)PARSE USING DSVParser ( header:yes)OUTPUT TO RawStream;CREATE CQ CsvToPosDataINSERT INTO PosDataStreamSELECT TO_STRING(data) as merchantId, TO_DATEF(data,'yyyyMMddHHmmss') as dateTime, TO_DOUBLE(data) as amount, TO_STRING(data) as zipFROM RawStream;CREATE TARGET KW11Sample USING KafkaWriter VERSION '0.11.0'( brokeraddress:'localhost:9092', topic:'KafkaWriterSample')FORMAT USING DSVFormatter ()INPUT FROM PosDataStream;
È possibile verificare che i dati siano stati scritti su Kafka eseguendo l’applicazione di esempio Kafka Reader.
Il primo campo nell’output (position
) memorizza le informazioni necessarie per evitare eventi persi o duplicati dopo il ripristino (vedere Ripristino delle applicazioni). Se il ripristino non è abilitato, il suo valore è NULL.
mon
output (vedere Utilizzo del MON comando) per obiettivi KafkaWriter include:
-
in modalità asincrona solo, Byte Inviati Rate: quanti megabyte al secondo, sono stati inviati agli intermediari
-
in entrambi i sync e modalità asincrona, Scrittura di Byte Rate: quanti megabyte al secondo, sono stati scritti dai broker e riconoscimento per Striim
compressione
Quando si attiva la compressione in KafkaWriter, il broker e il consumatore deve gestire la compressa lotti automaticamente. No additional configuration should be required in Kafka.
To enable batch compression for version 0.8.0, include the compression.codec
property in KafkaConfig. Supported values are gzip
and snappy
. For example:
KafkaConfg:'compression.codec=snappy'
To enable compression for version 0.9, 0.10, or 0.11, include the compression.type
property in KafkaConfig. Supported values are gzip
lz4
snappy
. For example:
KafkaConfig:'compression.type=snappy'