Striim 3.10.3 dokumentáció

Kafka Writer

ír egy témát az Apache Kafka.

a kafkawriter öt változata létezik: 0.8.0, 0.9.0, 0.10.0, 0.11.0 és 2.1.0. Használja azt, amelyik megfelel a cél Kafka brókernek. Például a 0.9.0 használatához a szintaxis CREATE TARGET <name> USING KafkaWriter VERSION '0.9.0'. Ha a belső Kafka példányra ír, használja a 0.11.0parancsot.

ismert probléma DEV-13039: a kafkawriter 0.9 vagy 0.10 alkalmazás összeomlik, ha a Kafka broker offline állapotba kerül.

tulajdonság

típus

alapértelmezett érték

megjegyzések

bróker címe

string

Kafka config

string

opcionálisan adja meg a Kafka termelő tulajdonságait pontosvesszővel elválasztva. A részletekért lásd az alábbi táblázatot.

Kafka Config tulajdonság elválasztó

String

adjon meg egy másik elválasztót, ha a kafkaconfig-ban megadott termelőtulajdonság-értékek egyike pontosvesszőt tartalmaz.

Kafka konfigurációs érték elválasztó

String

=

adjon meg egy másik elválasztót, ha a kafkaconfig-ban megadott termelőtulajdonság-értékek egyike egyenlő szimbólumot tartalmaz.

üzenet fejléc

karakterlánc

opcionálisan, ha Kafka 0-t használ.11 vagy újabb aszinkron módban, vagy szinkron módban kafkaconfig batch.size=-1, adjon meg egy vagy több egyéni fejlécet, amelyet kulcs-érték párként kell hozzáadni az üzenetekhez. Az értékek lehetnek:

  • egy mező neve egy felhasználó által definiált típusú adatfolyamból: például MerchantID=merchantID

  • statikus karakterlánc: például Company="My Company"

  • a függvény: például, ha a forrástáblázat nevét egy WAEVENT bemeneti adatfolyamból szeretné megkapni, amely egy CDC olvasó kimenete, Table Name=@metadata(TableName)

több egyéni fejléc megadásához pontosvesszővel válassza el őket.

Üzenetkulcs

String

adott esetben, ha a Kafka 0.11-es vagy újabb verzióját aszinkron módban használja, vagy szinkron módban a kafkaconfigbatch.size=-1, adjon meg egy vagy több billentyűt az üzenetekhez kulcs-érték párként kell hozzáadni. A tulajdonság értéke lehet statikus karakterlánc, egy vagy több mező a bemeneti adatfolyamból, vagy mindkettő kombinációja. Példák:

MessageKey : CName=”Striim”MessageKey : Table=@metadata(TableName); Operation=@metadata(OperationName);key1=@userdata(key1)MessageKey : CityName=City; Zipcode=zipMessageKey : CName=”Striim”;Table=@metadata(TableName); Operation=@metadata(OperationName)

többek között használhatja ezt a tulajdonságot a napló tömörítésének támogatására vagy a downstream alkalmazások számára az üzenet hasznos terhelése alapján történő lekérdezések használatára..

mód

String

Sync

lásd a KafkaWriter mód tulajdonságának beállítását: sync versus async

párhuzamos szálak

egész

lásd: több írópéldány létrehozása.

partíciós kulcs

String

a bemeneti adatfolyam azon mezőjének neve, amelynek értékei meghatározzák, hogy az események hogyan oszlanak meg több partíció között. Az azonos partíciós kulcs mezőértékkel rendelkező események ugyanarra a partícióra lesznek írva.

Ha a bemeneti adatfolyam a WAEvent kivételével bármilyen típusú, adja meg az egyik mező nevét.

Ha a bemeneti adatfolyam WAEvent típusú, adjon meg egy mezőt a METAADATTÉRKÉPEN (lásd HP NonStop reader WAEvent mezők, MySQLReader WAEvent mezők, OracleReader WAEvent mezők vagy MS SQL Reader waevent mezők) a @METADATA(<field name>) szintaxissal, vagy a USERDATA térkép egyik mezőjével (lásd: felhasználó által definiált adatok hozzáadása a waevent stream-ekhez), a @USERDATA(<field name>).

téma

karakterlánc

a meglévő Kafka téma, amelyre írni kell (nem jön létre, ha nem létezik). Ha egynél több Kafka író ír ugyanarra a témára, a helyreállítás nem támogatott (lásd: Alkalmazások helyreállítása. (A helyreállítás párhuzamos szálak használata esetén támogatott.)

Ez az adapter választható formázókkal rendelkezik. További információkért lásd: Támogatott író-formázó kombinációk.

Megjegyzések a KafkaConfig tulajdonsághoz

az alábbi táblázatban felsorolt kivételekkel a KafkaConfig-ban megadhat bármely Kafka termelő tulajdonságot.

Kafka producer property

notes

acks

  • in sync mode, may be set to 1 or all

  • in async mode, may be set to 0, 1, or all

batch.size

linger.ms

újra próbálkozik

  • szinkron módban a rendelésen kívüli események elkerülése érdekében a Kafka with-Ben beállított termelői tulajdonságok változatlanok és figyelmen kívül maradnak, és a Striim ezeket belsőleg kezeli.

  • aszinkron módban a Striim frissíti a Kafka termelő tulajdonságait, és ezeket a Kafka kezeli.

  • szinkronizálási módban beállíthatja, hogy a batch.size=-1 Kafka-üzenetenként egy eseményt írjon. Ez súlyosan rontja a teljesítményt, ezért nem ajánlott termelési környezetben. Ezzel a beállítással az üzenetek hasonlóak lesznek az aszinkron módhoz.

engedélyezze.idempotence

a 2.1.0 verzió és az aszinkron mód használatakor állítsa true értékre az események sorrendbe írásához (lásd

gombot.deserializer

érték mindig org.apacs.kafka.gyakori.sorosítás.ByteArrayDeserializer, nem lehet felülírni kafkaconfig

belsőleg, Kafkawriter meghívja KafkaConsumer különböző célokra, és a figyelmeztetés a fogyasztói API miatt elhaladó a kafkaconfig tulajdonságai biztonságosan figyelmen kívül hagyhatók. A Kafka gyártó tulajdonságairól további információt a Kafka konfigurálása című témakörben talál.

KafkaWriter minta alkalmazás

A következő mintakód a PosDataPreview.csvadatokat írja a Kafka témához KafkaWriterSample. Ez a téma már létezik Striim belső Kafka példányában. Ha külső Kafka példányt használ, akkor az alkalmazás futtatása előtt létre kell hoznia a témát.

CREATE SOURCE PosSource USING FileReader ( directory:'Samples/PosApp/AppData', wildcard:'PosDataPreview.csv', positionByEOF:false)PARSE USING DSVParser ( header:yes)OUTPUT TO RawStream;CREATE CQ CsvToPosDataINSERT INTO PosDataStreamSELECT TO_STRING(data) as merchantId, TO_DATEF(data,'yyyyMMddHHmmss') as dateTime, TO_DOUBLE(data) as amount, TO_STRING(data) as zipFROM RawStream;CREATE TARGET KW11Sample USING KafkaWriter VERSION '0.11.0'( brokeraddress:'localhost:9092', topic:'KafkaWriterSample')FORMAT USING DSVFormatter ()INPUT FROM PosDataStream;

a Kafka Reader minta alkalmazás futtatásával ellenőrizheti, hogy az adatokat a Kafka-Ba írták-e.

a kimenet első mezője (position) a helyreállítás utáni elveszett vagy ismétlődő események elkerüléséhez szükséges információkat tárolja (lásd: Alkalmazások helyreállítása). Ha a helyreállítás nincs engedélyezve, annak értéke NULL.

mon kimenet (lásd a MON parancs használata) a kafkawriter-t használó célokhoz a következőket tartalmazza:

  • csak aszinkron módban, küldött bájtok aránya: hány megabájt/másodperc küldtek a brókereknek

  • szinkronban és szinkronban egyaránt async mód, írási bájtok aránya: hány megabájt/másodpercet írtak a brókerek és a striim által kapott visszaigazolás

a tömörítés engedélyezése

Ha engedélyezi a tömörítést a kafkawriterben, a brókernek és a fogyasztónak automatikusan kezelnie kell a tömörített tételeket. No additional configuration should be required in Kafka.

To enable batch compression for version 0.8.0, include the compression.codec property in KafkaConfig. Supported values are gzip and snappy. For example:

KafkaConfg:'compression.codec=snappy'

To enable compression for version 0.9, 0.10, or 0.11, include the compression.type property in KafkaConfig. Supported values are gziplz4snappy. For example:

KafkaConfig:'compression.type=snappy'



Vélemény, hozzászólás?

Az e-mail-címet nem tesszük közzé.