Striim 3.10.3 documentație

Kafka Writer

scrie la un subiect în Apache Kafka.

există cinci versiuni ale KafkaWriter, 0.8.0, 0.9.0, 0.10.0, 0.11.0 și 2.1.0. Utilizați cel care corespunde brokerului țintă Kafka. De exemplu, pentru a utiliza 0.9.0, sintaxa este CREATE TARGET <name> USING KafkaWriter VERSION '0.9.0'. Dacă scrieți la instanța internă Kafka, utilizați 0.11.0.

problemă cunoscută DEV-13039: aplicarea cu kafkawriter 0.9 sau 0.10 accidente în cazul în care Kafka broker merge offline.

proprietate

tip

valoare implicită

note

adresa brokerului

string

Kafka config

șir

opțional, specificați proprietățile producătorului Kafka, separate prin punct și virgulă. Consultați tabelul de mai jos pentru detalii.

Kafka Config Property Separator

String

specificați un separator diferit dacă una dintre valorile proprietății producătorului specificate în KafkaConfig conține punct și virgulă.

Separator de valori de configurare Kafka

String

=

specificați un separator diferit dacă una dintre valorile proprietății producătorului specificate în KafkaConfig conține un simbol egal.

antetul mesajului

șir

opțional, dacă se utilizează Kafka 0.11 sau mai târziu în modul asincron, sau în modul de sincronizare cu KafkaConfigbatch.size=-1, specificați unul sau mai multe anteturi personalizate pentru a fi adăugate la mesaje ca perechi cheie-valoare. Valorile pot fi:

  • un nume de câmp dintr-un flux in put de tip definit de utilizator: de exemplu, MerchantID=merchantID

  • un șir static: de exemplu, Company="My Company"

  • o funcție: de exemplu, pentru a obține numele tabelului sursă dintr-un flux de intrare WAEvent care este ieșirea unui cititor CDC, Table Name=@metadata(TableName)

pentru a specifica mai multe anteturi personalizate, separați-le cu punct și virgulă.

cheie de mesaj

șir

opțional, dacă utilizați Kafka 0.11 sau o versiune ulterioară în modul asincron sau în modul sincronizare cu KafkaConfigbatch.size=-1, specificați una sau mai multe taste pentru a fi adăugate la mesaje ca perechi cheie-valoare. Valoarea proprietății poate fi un șir static, unul sau mai multe câmpuri din fluxul de intrare sau o combinație a ambelor. Exemple:

MessageKey : CName=”Striim”MessageKey : Table=@metadata(TableName); Operation=@metadata(OperationName);key1=@userdata(key1)MessageKey : CityName=City; Zipcode=zipMessageKey : CName=”Striim”;Table=@metadata(TableName); Operation=@metadata(OperationName)

printre alte posibilități, puteți utiliza această proprietate pentru a sprijini compactarea jurnalului sau pentru a permite aplicațiilor din aval să utilizeze interogări bazate pe sarcina utilă a mesajului..

mod

String

sincronizare

vezi setarea proprietății modului KafkaWriter: sincronizare versus asincronizare

fire paralele

întreg

vezi Crearea mai multor instanțe de scriere.

cheie de partiție

șir

numele unui câmp din fluxul de intrare ale cărui valori determină modul în care evenimentele sunt distribuite între mai multe partiții. Evenimentele cu aceeași valoare a câmpului cheie de partiție vor fi scrise în aceeași partiție.

dacă fluxul de intrare este de orice tip, cu excepția WAEvent, specificați numele unuia dintre câmpurile sale.

dacă fluxul de intrare este de tip WAEvent, specificați un câmp în harta metadatelor (consultați câmpurile hp nonstop reader WAEvent, câmpurile MySQLReader WAEvent, câmpurile OracleReader WAEvent sau câmpurile MS SQL Reader WAEvent) folosind sintaxa @METADATA(<field name>) sau un câmp din harta USERDATA (consultați Adăugarea datelor definite de utilizator la fluxurile WAEvent), folosind sintaxa @USERDATA(<field name>).

subiect

șir

subiectul Kafka existent pentru a scrie (nu va fi creat dacă nu există). Dacă mai mult de un scriitor Kafka scrie la același subiect, recuperarea nu este acceptată (consultați recuperarea aplicațiilor. (Recuperarea este acceptată atunci când se utilizează fire paralele.)

Acest adaptor are o alegere de formatori. Consultați combinațiile writer-formatter acceptate pentru mai multe informații.

Note privind proprietatea KafkaConfig

cu excepțiile menționate în tabelul următor, puteți specifica orice proprietate a producătorului Kafka în KafkaConfig.

Kafka producer property

notes

acks

  • in sync mode, may be set to 1 or all

  • in async mode, may be set to 0, 1, or all

batch.size

linger.ms

încearcă din nou

  • în modul de sincronizare, pentru a preveni evenimentele în afara ordinii, proprietățile producătorului setate în Kafka cu vor fi neschimbate și ignorate, iar Striim le va gestiona intern.

  • în modul asincron, Striim va actualiza proprietățile producătorului Kafka și acestea vor fi gestionate de Kafka.

  • în modul de sincronizare, puteți setabatch.size=-1 pentru a scrie un eveniment per mesaj Kafka. Acest lucru va degrada serios performanța, deci nu este recomandat într-un mediu de producție. Cu această setare, mesajele vor fi similare cu cele din modul asincron.

activare.idempotence

când se utilizează versiunea 2.1.0 și modul asincron, setați la true pentru a scrie evenimente în ordine (a se vedea

cheie.deserializer

valoarea este întotdeauna org.apache.kafka.comun.serializare.ByteArrayDeserializer, nu poate fi înlocuită de KafkaConfig

intern, KafkaWriter invocă KafkaConsumer pentru diverse scopuri, și avertismentul de API de consum din cauza trecerii proprietățile kafkaconfig pot fi ignorate în siguranță. Consultați Configurarea Kafka pentru mai multe informații despre proprietățile producătorului Kafka.

kafkawriter exemplu de aplicare

următorul cod eșantion scrie date de laPosDataPreview.csv la subiectul KafkaKafkaWriterSample. Acest subiect există deja în instanța internă Kafka a lui Striim. Dacă utilizați o instanță externă Kafka, trebuie să creați subiectul înainte de a rula aplicația.

CREATE SOURCE PosSource USING FileReader ( directory:'Samples/PosApp/AppData', wildcard:'PosDataPreview.csv', positionByEOF:false)PARSE USING DSVParser ( header:yes)OUTPUT TO RawStream;CREATE CQ CsvToPosDataINSERT INTO PosDataStreamSELECT TO_STRING(data) as merchantId, TO_DATEF(data,'yyyyMMddHHmmss') as dateTime, TO_DOUBLE(data) as amount, TO_STRING(data) as zipFROM RawStream;CREATE TARGET KW11Sample USING KafkaWriter VERSION '0.11.0'( brokeraddress:'localhost:9092', topic:'KafkaWriterSample')FORMAT USING DSVFormatter ()INPUT FROM PosDataStream;

puteți verifica dacă datele au fost scrise în Kafka rulând aplicația Kafka Reader sample.

primul câmp din ieșire (position) stochează informațiile necesare pentru a evita evenimentele pierdute sau duplicate după recuperare (consultați aplicațiile de recuperare). Dacă recuperarea nu este activată, valoarea sa este nulă.

mon ieșire (a se vedea folosind comanda MON) pentru obiective folosind KafkaWriter include:

  • în modul asincron numai, Rata de octeți trimise: cât de multe megaocteți pe secundă au fost trimise la brokerii

  • în ambele sincronizare și modul asincron, rata de scriere a octeților: câte megaocteți pe secundă au fost scrise de brokeri și confirmarea primită de striim

activarea compresiei

când activați compresia în kafkawriter, brokerul și consumatorul ar trebui să gestioneze automat loturile comprimate. No additional configuration should be required in Kafka.

To enable batch compression for version 0.8.0, include the compression.codec property in KafkaConfig. Supported values are gzip and snappy. For example:

KafkaConfg:'compression.codec=snappy'

To enable compression for version 0.9, 0.10, or 0.11, include the compression.type property in KafkaConfig. Supported values are gziplz4snappy. For example:

KafkaConfig:'compression.type=snappy'



Lasă un răspuns

Adresa ta de email nu va fi publicată.