Striim 3.10.3 documentation
Kafka Writer
kirjoittaa aiheeseen Apache Kafka.
Kafkawriterista on viisi versiota, 0.8.0, 0.9.0, 0.10.0, 0.11.0 ja 2.1.0. Käytä yksi, joka vastaa kohde Kafka välittäjä. Esimerkiksi 0.9.0: n käyttämiseksi syntaksi on CREATE TARGET <name> USING KafkaWriter VERSION '0.9.0'
. Jos kirjoitat sisäiseen Kafka-instanssiin, käytä 0.11.0
.
Known issue DEV-13039: application with KafkaWriter 0.9 or 0.10 kaatuu, jos Kafka broker sammuu.
välittäjän osoite |
|||||||
Kafka config |
valinnaisesti määritä Kafkan tuottajan ominaisuudet puolipisteillä erotettuna. Katso tarkemmat tiedot alla olevasta taulukosta. |
||||||
Määritä toinen erotin, jos jokin kafkaconfigissä määritellyistä tuottajan ominaisuusarvoista sisältää puolipisteen. |
|||||||
= |
Määritä eri erotin, jos jokin kafkaconfigissä määritellyistä tuottajan ominaisarvoista sisältää yhtä suuren symbolin. |
||||||
viestin otsikko |
valinnaisesti, jos käytetään Kafka 0: ta.11 tai myöhemmin Async-tilassa tai synkronointitilassa kafkaconfig
staattinen merkkijono: esimerkiksi , voit määrittää useita mukautettuja otsikoita erottelemalla ne puolipisteillä. |
||||||
Viestinäppäin |
valinnaisesti, jos käytetään kafkaa 0.11 tai uudempaa async-tilassa tai sync-tilassa kafkaconfig MessageKey : CName=”Striim”MessageKey : Table=@metadata(TableName); Operation=@metadata(OperationName);key1=@userdata(key1)MessageKey : CityName=City; Zipcode=zipMessageKey : CName=”Striim”;Table=@metadata(TableName); Operation=@metadata(OperationName) muun muassa voit käyttää tätä ominaisuutta tukeaksesi lokin tiivistämistä tai salliaksesi loppupään sovellusten käyttää viestin hyötykuormaan perustuvia kyselyjä.. |
||||||
Mode |
katso Kafkawriterin tilan ominaisuus: sync versus async |
||||||
yhdensuuntaiset viestiketjut |
Katso useiden kirjoittajien esiintymien luominen. |
||||||
Osioavain |
tulovirran kentän nimi, jonka arvot määrittävät, miten tapahtumat jakautuvat useiden osioiden kesken. Tapahtumat, joilla on sama osioavainkentän arvo, kirjoitetaan samaan osioon. Jos tulovirta on mitä tahansa tyyppiä paitsi WAEvent, ilmoitetaan yhden sen kenttien nimi. Jos tulovirta on WAEvent-tyyppiä, määritä kenttä METATIETOKARTASSA (katso HP NonStop reader WAEvent-kentät, MySQLReader WAEvent-kentät, ORACLEREADER WAEvent-kentät tai MS SQL Reader WAEvent-kentät) käyttäen syntaksia |
||||||
aihe |
merkkijono |
olemassa oleva Kafkan aihe, jolle kirjoittaa (ei luoda, jos sitä ei ole). Jos useampi kuin yksi Kafka-kirjoittaja kirjoittaa samaan aiheeseen, palautumista ei tueta (KS.palautettavat Sovellukset. (Palautumista tuetaan käytettäessä rinnakkaisia säikeitä.) |
tällä adapterilla on valittavana formaatteja. Katso Tuetut writer-formatter-yhdistelmät lisätietoja.
Notes on the KafkaConfig property
with the exceptions noted in oheinen table, you may specified any Kafka producer property in KafkaConfig.
Kafka producer property |
notes |
---|---|
acks |
|
batch.size linger.ms retries |
sync-tilassa voi asettaa |
mahdollistavat.idempotenssi |
käytettäessä versiota 2.1.0 ja async-tilaa, asetetaan todeksi, jotta tapahtumat voidaan kirjoittaa järjestyksessä (katso |
avain.deserializer |
arvo on aina org.apassit.kafka.yhteinen.sarjallistaminen.ByteArrayDeserializer, ei voi ohittaa kafkaconfig |
sisäisesti Kafkawriter vetoaa Kafkaconsumeriin eri tarkoituksiin, ja kuluttajarajapinnan varoituksesta johtuen ohimennen kafkaconfig ominaisuudet voidaan turvallisesti sivuuttaa. Lisätietoja Kafka-tuottajan ominaisuuksista on ohjeaiheessa Kafkan määrittäminen.
Kafkawriterin esimerkkisovellus
seuraava näytekoodi kirjoittaa tiedot PosDataPreview.csv
Kafkan aiheelle KafkaWriterSample
. Tämä aihe on jo olemassa striimin sisäisessä Kafka-instanssissa. Jos käytät ulkoista Kafka-instanssia, sinun on luotava aihe ennen sovelluksen käynnistämistä.
CREATE SOURCE PosSource USING FileReader ( directory:'Samples/PosApp/AppData', wildcard:'PosDataPreview.csv', positionByEOF:false)PARSE USING DSVParser ( header:yes)OUTPUT TO RawStream;CREATE CQ CsvToPosDataINSERT INTO PosDataStreamSELECT TO_STRING(data) as merchantId, TO_DATEF(data,'yyyyMMddHHmmss') as dateTime, TO_DOUBLE(data) as amount, TO_STRING(data) as zipFROM RawStream;CREATE TARGET KW11Sample USING KafkaWriter VERSION '0.11.0'( brokeraddress:'localhost:9092', topic:'KafkaWriterSample')FORMAT USING DSVFormatter ()INPUT FROM PosDataStream;
voit tarkistaa, että tiedot on kirjoitettu Kafkalle ajamalla Kafka-Lukijanäytesovelluksen.
tuotoksen ensimmäinen kenttä (position
) tallentaa tiedot, joita tarvitaan, jotta vältyttäisiin katoamisilta tai toistuvilta tapahtumilta palautumisen jälkeen (ks.palautettavat Sovellukset). Jos palautusta ei ole käytössä, sen arvo on nolla.
mon
ulostulo (katso mon-komennon käyttäminen) kafkawriteria käyttäville kohteille sisältää:
vain async-tilassa lähetettyjen tavujen määrä: kuinka monta megatavua sekunnissa välittäjille lähetettiin
sekä synkronoituna että async-tila, kirjoitustaajuus: kuinka monta megatavua sekunnissa välittäjät kirjoittivat ja striimin
mahdollistava pakkaus
kun otat pakkauksen käyttöön kafkawriterissa, välittäjän ja kuluttajan tulee käsitellä pakatut erät automaattisesti. No additional configuration should be required in Kafka.
To enable batch compression for version 0.8.0, include the compression.codec
property in KafkaConfig. Supported values are gzip
and snappy
. For example:
KafkaConfg:'compression.codec=snappy'
To enable compression for version 0.9, 0.10, or 0.11, include the compression.type
property in KafkaConfig. Supported values are gzip
lz4
snappy
. For example:
KafkaConfig:'compression.type=snappy'