Striim 3.10.3 dokumentáció
Kafka Writer
ír egy témát az Apache Kafka.
a kafkawriter öt változata létezik: 0.8.0, 0.9.0, 0.10.0, 0.11.0 és 2.1.0. Használja azt, amelyik megfelel a cél Kafka brókernek. Például a 0.9.0 használatához a szintaxis CREATE TARGET <name> USING KafkaWriter VERSION '0.9.0'. Ha a belső Kafka példányra ír, használja a 0.11.0parancsot.
ismert probléma DEV-13039: a kafkawriter 0.9 vagy 0.10 alkalmazás összeomlik, ha a Kafka broker offline állapotba kerül.
|
tulajdonság |
típus |
alapértelmezett érték |
megjegyzések |
|---|---|---|---|
|
bróker címe |
string |
||
|
Kafka config |
string |
opcionálisan adja meg a Kafka termelő tulajdonságait pontosvesszővel elválasztva. A részletekért lásd az alábbi táblázatot. |
|
|
Kafka Config tulajdonság elválasztó |
String |
adjon meg egy másik elválasztót, ha a kafkaconfig-ban megadott termelőtulajdonság-értékek egyike pontosvesszőt tartalmaz. |
|
|
Kafka konfigurációs érték elválasztó |
String |
= |
adjon meg egy másik elválasztót, ha a kafkaconfig-ban megadott termelőtulajdonság-értékek egyike egyenlő szimbólumot tartalmaz. |
|
üzenet fejléc |
karakterlánc |
opcionálisan, ha Kafka 0-t használ.11 vagy újabb aszinkron módban, vagy szinkron módban kafkaconfig
több egyéni fejléc megadásához pontosvesszővel válassza el őket. |
|
|
Üzenetkulcs |
String |
adott esetben, ha a Kafka 0.11-es vagy újabb verzióját aszinkron módban használja, vagy szinkron módban a kafkaconfig MessageKey : CName=”Striim”MessageKey : Table=@metadata(TableName); Operation=@metadata(OperationName);key1=@userdata(key1)MessageKey : CityName=City; Zipcode=zipMessageKey : CName=”Striim”;Table=@metadata(TableName); Operation=@metadata(OperationName) többek között használhatja ezt a tulajdonságot a napló tömörítésének támogatására vagy a downstream alkalmazások számára az üzenet hasznos terhelése alapján történő lekérdezések használatára.. |
|
|
mód |
String |
Sync |
lásd a KafkaWriter mód tulajdonságának beállítását: sync versus async |
|
párhuzamos szálak |
egész |
lásd: több írópéldány létrehozása. |
|
|
partíciós kulcs |
String |
a bemeneti adatfolyam azon mezőjének neve, amelynek értékei meghatározzák, hogy az események hogyan oszlanak meg több partíció között. Az azonos partíciós kulcs mezőértékkel rendelkező események ugyanarra a partícióra lesznek írva. Ha a bemeneti adatfolyam a WAEvent kivételével bármilyen típusú, adja meg az egyik mező nevét. Ha a bemeneti adatfolyam WAEvent típusú, adjon meg egy mezőt a METAADATTÉRKÉPEN (lásd HP NonStop reader WAEvent mezők, MySQLReader WAEvent mezők, OracleReader WAEvent mezők vagy MS SQL Reader waevent mezők) a |
|
|
téma |
karakterlánc |
a meglévő Kafka téma, amelyre írni kell (nem jön létre, ha nem létezik). Ha egynél több Kafka író ír ugyanarra a témára, a helyreállítás nem támogatott (lásd: Alkalmazások helyreállítása. (A helyreállítás párhuzamos szálak használata esetén támogatott.) |
Ez az adapter választható formázókkal rendelkezik. További információkért lásd: Támogatott író-formázó kombinációk.
Megjegyzések a KafkaConfig tulajdonsághoz
az alábbi táblázatban felsorolt kivételekkel a KafkaConfig-ban megadhat bármely Kafka termelő tulajdonságot.
|
Kafka producer property |
notes |
|---|---|
|
acks |
|
|
batch.size linger.ms újra próbálkozik |
|
|
engedélyezze.idempotence |
a 2.1.0 verzió és az aszinkron mód használatakor állítsa true értékre az események sorrendbe írásához (lásd |
|
gombot.deserializer |
érték mindig org.apacs.kafka.gyakori.sorosítás.ByteArrayDeserializer, nem lehet felülírni kafkaconfig |
belsőleg, Kafkawriter meghívja KafkaConsumer különböző célokra, és a figyelmeztetés a fogyasztói API miatt elhaladó a kafkaconfig tulajdonságai biztonságosan figyelmen kívül hagyhatók. A Kafka gyártó tulajdonságairól további információt a Kafka konfigurálása című témakörben talál.
KafkaWriter minta alkalmazás
A következő mintakód a PosDataPreview.csvadatokat írja a Kafka témához KafkaWriterSample. Ez a téma már létezik Striim belső Kafka példányában. Ha külső Kafka példányt használ, akkor az alkalmazás futtatása előtt létre kell hoznia a témát.
CREATE SOURCE PosSource USING FileReader ( directory:'Samples/PosApp/AppData', wildcard:'PosDataPreview.csv', positionByEOF:false)PARSE USING DSVParser ( header:yes)OUTPUT TO RawStream;CREATE CQ CsvToPosDataINSERT INTO PosDataStreamSELECT TO_STRING(data) as merchantId, TO_DATEF(data,'yyyyMMddHHmmss') as dateTime, TO_DOUBLE(data) as amount, TO_STRING(data) as zipFROM RawStream;CREATE TARGET KW11Sample USING KafkaWriter VERSION '0.11.0'( brokeraddress:'localhost:9092', topic:'KafkaWriterSample')FORMAT USING DSVFormatter ()INPUT FROM PosDataStream;
a Kafka Reader minta alkalmazás futtatásával ellenőrizheti, hogy az adatokat a Kafka-Ba írták-e.
a kimenet első mezője (position) a helyreállítás utáni elveszett vagy ismétlődő események elkerüléséhez szükséges információkat tárolja (lásd: Alkalmazások helyreállítása). Ha a helyreállítás nincs engedélyezve, annak értéke NULL.
mon kimenet (lásd a MON parancs használata) a kafkawriter-t használó célokhoz a következőket tartalmazza:
-
csak aszinkron módban, küldött bájtok aránya: hány megabájt/másodperc küldtek a brókereknek
-
szinkronban és szinkronban egyaránt async mód, írási bájtok aránya: hány megabájt/másodpercet írtak a brókerek és a striim által kapott visszaigazolás
a tömörítés engedélyezése
Ha engedélyezi a tömörítést a kafkawriterben, a brókernek és a fogyasztónak automatikusan kezelnie kell a tömörített tételeket. No additional configuration should be required in Kafka.
To enable batch compression for version 0.8.0, include the compression.codec property in KafkaConfig. Supported values are gzip and snappy. For example:
KafkaConfg:'compression.codec=snappy'
To enable compression for version 0.9, 0.10, or 0.11, include the compression.type property in KafkaConfig. Supported values are gziplz4snappy. For example:
KafkaConfig:'compression.type=snappy'