Striim 3.10.3 documentation

Kafka Writer

kirjoittaa aiheeseen Apache Kafka.

Kafkawriterista on viisi versiota, 0.8.0, 0.9.0, 0.10.0, 0.11.0 ja 2.1.0. Käytä yksi, joka vastaa kohde Kafka välittäjä. Esimerkiksi 0.9.0: n käyttämiseksi syntaksi on CREATE TARGET <name> USING KafkaWriter VERSION '0.9.0'. Jos kirjoitat sisäiseen Kafka-instanssiin, käytä 0.11.0.

Known issue DEV-13039: application with KafkaWriter 0.9 or 0.10 kaatuu, jos Kafka broker sammuu.

ominaisuus

tyyppi

oletusarvo

muistiinpanot

td>

merkkijono

Kafka Configin Ominaisuuserotin

String

Kafka Config-arvon erotin

merkkijono

merkkijono

String

String

Sync

kokonaisluku

merkkijono

välittäjän osoite

Kafka config

valinnaisesti määritä Kafkan tuottajan ominaisuudet puolipisteillä erotettuna. Katso tarkemmat tiedot alla olevasta taulukosta.

Määritä toinen erotin, jos jokin kafkaconfigissä määritellyistä tuottajan ominaisuusarvoista sisältää puolipisteen.

=

Määritä eri erotin, jos jokin kafkaconfigissä määritellyistä tuottajan ominaisarvoista sisältää yhtä suuren symbolin.

viestin otsikko

valinnaisesti, jos käytetään Kafka 0: ta.11 tai myöhemmin Async-tilassa tai synkronointitilassa kafkaconfig batch.size=-1, määritä yksi tai useampi mukautettu otsake, joka lisätään viesteihin avainarvopareina. Arvot voivat olla:

  • kentän nimi käyttäjän määrittelemästä in put-virrasta: esimerkiksi MerchantID=merchantID

  • staattinen merkkijono: esimerkiksi Company="My Company"

  • funktio: jos haluat esimerkiksi saada lähdetaulukon nimen CDC-lukijan ulostulosta peräisin olevasta WAEvent-tulovirrasta, Table Name=@metadata(TableName)

, voit määrittää useita mukautettuja otsikoita erottelemalla ne puolipisteillä.

Viestinäppäin

valinnaisesti, jos käytetään kafkaa 0.11 tai uudempaa async-tilassa tai sync-tilassa kafkaconfig batch.size=-1, määritä yksi tai useampi näppäin lisätään viesteihin avainarvopareina. Ominaisuuden arvo voi olla staattinen merkkijono, yksi tai useampi kenttä tulovirrasta tai näiden yhdistelmä. Esimerkkejä:

MessageKey : CName=”Striim”MessageKey : Table=@metadata(TableName); Operation=@metadata(OperationName);key1=@userdata(key1)MessageKey : CityName=City; Zipcode=zipMessageKey : CName=”Striim”;Table=@metadata(TableName); Operation=@metadata(OperationName)

muun muassa voit käyttää tätä ominaisuutta tukeaksesi lokin tiivistämistä tai salliaksesi loppupään sovellusten käyttää viestin hyötykuormaan perustuvia kyselyjä..

Mode

katso Kafkawriterin tilan ominaisuus: sync versus async

yhdensuuntaiset viestiketjut

Katso useiden kirjoittajien esiintymien luominen.

Osioavain

tulovirran kentän nimi, jonka arvot määrittävät, miten tapahtumat jakautuvat useiden osioiden kesken. Tapahtumat, joilla on sama osioavainkentän arvo, kirjoitetaan samaan osioon.

Jos tulovirta on mitä tahansa tyyppiä paitsi WAEvent, ilmoitetaan yhden sen kenttien nimi.

Jos tulovirta on WAEvent-tyyppiä, määritä kenttä METATIETOKARTASSA (katso HP NonStop reader WAEvent-kentät, MySQLReader WAEvent-kentät, ORACLEREADER WAEvent-kentät tai MS SQL Reader WAEvent-kentät) käyttäen syntaksia @METADATA(<field name>) tai kenttää USERDATA-kartassa (katso käyttäjän määrittelemien tietojen lisääminen WAEvent-puroihin) käyttäen syntaksia @USERDATA(<field name>).

aihe

merkkijono

olemassa oleva Kafkan aihe, jolle kirjoittaa (ei luoda, jos sitä ei ole). Jos useampi kuin yksi Kafka-kirjoittaja kirjoittaa samaan aiheeseen, palautumista ei tueta (KS.palautettavat Sovellukset. (Palautumista tuetaan käytettäessä rinnakkaisia säikeitä.)

tällä adapterilla on valittavana formaatteja. Katso Tuetut writer-formatter-yhdistelmät lisätietoja.

Notes on the KafkaConfig property

with the exceptions noted in oheinen table, you may specified any Kafka producer property in KafkaConfig.

Kafka producer property

notes

acks

  • in sync mode, may be set to 1 or all

  • in async mode, may be set to 0, 1, or all

batch.size

linger.ms

retries

  • sync-tilassa, jotta voidaan estää epäkuntoiset tapahtumat, Kafkan kanssa asetetut tuottajan ominaisuudet säilyvät muuttumattomina ja huomiotta, ja Striim hoitaa nämä sisäisesti.

  • async-tilassa Striim päivittää Kafkan tuottajan ominaisuudet ja nämä hoitaa Kafka.

  • sync-tilassa voi asettaabatch.size=-1kirjoittamaan yhden tapahtuman per Kafka-viesti. Tämä heikentää merkittävästi suorituskykyä, joten sitä ei suositella tuotantoympäristössä. Tämän asetuksen, viestit ovat samanlaisia kuin Async-tilassa.

mahdollistavat.idempotenssi

käytettäessä versiota 2.1.0 ja async-tilaa, asetetaan todeksi, jotta tapahtumat voidaan kirjoittaa järjestyksessä (katso

avain.deserializer

arvo on aina org.apassit.kafka.yhteinen.sarjallistaminen.ByteArrayDeserializer, ei voi ohittaa kafkaconfig

sisäisesti Kafkawriter vetoaa Kafkaconsumeriin eri tarkoituksiin, ja kuluttajarajapinnan varoituksesta johtuen ohimennen kafkaconfig ominaisuudet voidaan turvallisesti sivuuttaa. Lisätietoja Kafka-tuottajan ominaisuuksista on ohjeaiheessa Kafkan määrittäminen.

Kafkawriterin esimerkkisovellus

seuraava näytekoodi kirjoittaa tiedot PosDataPreview.csv Kafkan aiheelle KafkaWriterSample. Tämä aihe on jo olemassa striimin sisäisessä Kafka-instanssissa. Jos käytät ulkoista Kafka-instanssia, sinun on luotava aihe ennen sovelluksen käynnistämistä.

CREATE SOURCE PosSource USING FileReader ( directory:'Samples/PosApp/AppData', wildcard:'PosDataPreview.csv', positionByEOF:false)PARSE USING DSVParser ( header:yes)OUTPUT TO RawStream;CREATE CQ CsvToPosDataINSERT INTO PosDataStreamSELECT TO_STRING(data) as merchantId, TO_DATEF(data,'yyyyMMddHHmmss') as dateTime, TO_DOUBLE(data) as amount, TO_STRING(data) as zipFROM RawStream;CREATE TARGET KW11Sample USING KafkaWriter VERSION '0.11.0'( brokeraddress:'localhost:9092', topic:'KafkaWriterSample')FORMAT USING DSVFormatter ()INPUT FROM PosDataStream;

voit tarkistaa, että tiedot on kirjoitettu Kafkalle ajamalla Kafka-Lukijanäytesovelluksen.

tuotoksen ensimmäinen kenttä (position) tallentaa tiedot, joita tarvitaan, jotta vältyttäisiin katoamisilta tai toistuvilta tapahtumilta palautumisen jälkeen (ks.palautettavat Sovellukset). Jos palautusta ei ole käytössä, sen arvo on nolla.

mon ulostulo (katso mon-komennon käyttäminen) kafkawriteria käyttäville kohteille sisältää:

    vain async-tilassa lähetettyjen tavujen määrä: kuinka monta megatavua sekunnissa välittäjille lähetettiin

    sekä synkronoituna että async-tila, kirjoitustaajuus: kuinka monta megatavua sekunnissa välittäjät kirjoittivat ja striimin

mahdollistava pakkaus

kun otat pakkauksen käyttöön kafkawriterissa, välittäjän ja kuluttajan tulee käsitellä pakatut erät automaattisesti. No additional configuration should be required in Kafka.

To enable batch compression for version 0.8.0, include the compression.codec property in KafkaConfig. Supported values are gzip and snappy. For example:

KafkaConfg:'compression.codec=snappy'

To enable compression for version 0.9, 0.10, or 0.11, include the compression.type property in KafkaConfig. Supported values are gziplz4snappy. For example:

KafkaConfig:'compression.type=snappy'



Vastaa

Sähköpostiosoitettasi ei julkaista.