Striim 3.10.3 documentație
Kafka Writer
scrie la un subiect în Apache Kafka.
există cinci versiuni ale KafkaWriter, 0.8.0, 0.9.0, 0.10.0, 0.11.0 și 2.1.0. Utilizați cel care corespunde brokerului țintă Kafka. De exemplu, pentru a utiliza 0.9.0, sintaxa este CREATE TARGET <name> USING KafkaWriter VERSION '0.9.0'
. Dacă scrieți la instanța internă Kafka, utilizați 0.11.0
.
problemă cunoscută DEV-13039: aplicarea cu kafkawriter 0.9 sau 0.10 accidente în cazul în care Kafka broker merge offline.
proprietate |
tip |
valoare implicită |
note |
---|---|---|---|
adresa brokerului |
string |
||
Kafka config |
șir |
opțional, specificați proprietățile producătorului Kafka, separate prin punct și virgulă. Consultați tabelul de mai jos pentru detalii. |
|
Kafka Config Property Separator |
String |
specificați un separator diferit dacă una dintre valorile proprietății producătorului specificate în KafkaConfig conține punct și virgulă. |
|
Separator de valori de configurare Kafka |
String |
= |
specificați un separator diferit dacă una dintre valorile proprietății producătorului specificate în KafkaConfig conține un simbol egal. |
antetul mesajului |
șir |
opțional, dacă se utilizează Kafka 0.11 sau mai târziu în modul asincron, sau în modul de sincronizare cu KafkaConfig
pentru a specifica mai multe anteturi personalizate, separați-le cu punct și virgulă. |
|
cheie de mesaj |
șir |
opțional, dacă utilizați Kafka 0.11 sau o versiune ulterioară în modul asincron sau în modul sincronizare cu KafkaConfig MessageKey : CName=”Striim”MessageKey : Table=@metadata(TableName); Operation=@metadata(OperationName);key1=@userdata(key1)MessageKey : CityName=City; Zipcode=zipMessageKey : CName=”Striim”;Table=@metadata(TableName); Operation=@metadata(OperationName) printre alte posibilități, puteți utiliza această proprietate pentru a sprijini compactarea jurnalului sau pentru a permite aplicațiilor din aval să utilizeze interogări bazate pe sarcina utilă a mesajului.. |
|
mod |
String |
sincronizare |
vezi setarea proprietății modului KafkaWriter: sincronizare versus asincronizare |
fire paralele |
întreg |
vezi Crearea mai multor instanțe de scriere. |
|
cheie de partiție |
șir |
numele unui câmp din fluxul de intrare ale cărui valori determină modul în care evenimentele sunt distribuite între mai multe partiții. Evenimentele cu aceeași valoare a câmpului cheie de partiție vor fi scrise în aceeași partiție. dacă fluxul de intrare este de orice tip, cu excepția WAEvent, specificați numele unuia dintre câmpurile sale. dacă fluxul de intrare este de tip WAEvent, specificați un câmp în harta metadatelor (consultați câmpurile hp nonstop reader WAEvent, câmpurile MySQLReader WAEvent, câmpurile OracleReader WAEvent sau câmpurile MS SQL Reader WAEvent) folosind sintaxa |
|
subiect |
șir |
subiectul Kafka existent pentru a scrie (nu va fi creat dacă nu există). Dacă mai mult de un scriitor Kafka scrie la același subiect, recuperarea nu este acceptată (consultați recuperarea aplicațiilor. (Recuperarea este acceptată atunci când se utilizează fire paralele.) |
Acest adaptor are o alegere de formatori. Consultați combinațiile writer-formatter acceptate pentru mai multe informații.
Note privind proprietatea KafkaConfig
cu excepțiile menționate în tabelul următor, puteți specifica orice proprietate a producătorului Kafka în KafkaConfig.
Kafka producer property |
notes |
---|---|
acks |
|
batch.size linger.ms încearcă din nou |
|
activare.idempotence |
când se utilizează versiunea 2.1.0 și modul asincron, setați la true pentru a scrie evenimente în ordine (a se vedea |
cheie.deserializer |
valoarea este întotdeauna org.apache.kafka.comun.serializare.ByteArrayDeserializer, nu poate fi înlocuită de KafkaConfig |
intern, KafkaWriter invocă KafkaConsumer pentru diverse scopuri, și avertismentul de API de consum din cauza trecerii proprietățile kafkaconfig pot fi ignorate în siguranță. Consultați Configurarea Kafka pentru mai multe informații despre proprietățile producătorului Kafka.
kafkawriter exemplu de aplicare
următorul cod eșantion scrie date de laPosDataPreview.csv
la subiectul KafkaKafkaWriterSample
. Acest subiect există deja în instanța internă Kafka a lui Striim. Dacă utilizați o instanță externă Kafka, trebuie să creați subiectul înainte de a rula aplicația.
CREATE SOURCE PosSource USING FileReader ( directory:'Samples/PosApp/AppData', wildcard:'PosDataPreview.csv', positionByEOF:false)PARSE USING DSVParser ( header:yes)OUTPUT TO RawStream;CREATE CQ CsvToPosDataINSERT INTO PosDataStreamSELECT TO_STRING(data) as merchantId, TO_DATEF(data,'yyyyMMddHHmmss') as dateTime, TO_DOUBLE(data) as amount, TO_STRING(data) as zipFROM RawStream;CREATE TARGET KW11Sample USING KafkaWriter VERSION '0.11.0'( brokeraddress:'localhost:9092', topic:'KafkaWriterSample')FORMAT USING DSVFormatter ()INPUT FROM PosDataStream;
puteți verifica dacă datele au fost scrise în Kafka rulând aplicația Kafka Reader sample.
primul câmp din ieșire (position
) stochează informațiile necesare pentru a evita evenimentele pierdute sau duplicate după recuperare (consultați aplicațiile de recuperare). Dacă recuperarea nu este activată, valoarea sa este nulă.
mon
ieșire (a se vedea folosind comanda MON) pentru obiective folosind KafkaWriter include:
-
în modul asincron numai, Rata de octeți trimise: cât de multe megaocteți pe secundă au fost trimise la brokerii
-
în ambele sincronizare și modul asincron, rata de scriere a octeților: câte megaocteți pe secundă au fost scrise de brokeri și confirmarea primită de striim
activarea compresiei
când activați compresia în kafkawriter, brokerul și consumatorul ar trebui să gestioneze automat loturile comprimate. No additional configuration should be required in Kafka.
To enable batch compression for version 0.8.0, include the compression.codec
property in KafkaConfig. Supported values are gzip
and snappy
. For example:
KafkaConfg:'compression.codec=snappy'
To enable compression for version 0.9, 0.10, or 0.11, include the compression.type
property in KafkaConfig. Supported values are gzip
lz4
snappy
. For example:
KafkaConfig:'compression.type=snappy'