Striim 3.10.3 dokumentáció
Kafka Writer
ír egy témát az Apache Kafka.
a kafkawriter öt változata létezik: 0.8.0, 0.9.0, 0.10.0, 0.11.0 és 2.1.0. Használja azt, amelyik megfelel a cél Kafka brókernek. Például a 0.9.0 használatához a szintaxis CREATE TARGET <name> USING KafkaWriter VERSION '0.9.0'
. Ha a belső Kafka példányra ír, használja a 0.11.0
parancsot.
ismert probléma DEV-13039: a kafkawriter 0.9 vagy 0.10 alkalmazás összeomlik, ha a Kafka broker offline állapotba kerül.
tulajdonság |
típus |
alapértelmezett érték |
megjegyzések |
---|---|---|---|
bróker címe |
string |
||
Kafka config |
string |
opcionálisan adja meg a Kafka termelő tulajdonságait pontosvesszővel elválasztva. A részletekért lásd az alábbi táblázatot. |
|
Kafka Config tulajdonság elválasztó |
String |
adjon meg egy másik elválasztót, ha a kafkaconfig-ban megadott termelőtulajdonság-értékek egyike pontosvesszőt tartalmaz. |
|
Kafka konfigurációs érték elválasztó |
String |
= |
adjon meg egy másik elválasztót, ha a kafkaconfig-ban megadott termelőtulajdonság-értékek egyike egyenlő szimbólumot tartalmaz. |
üzenet fejléc |
karakterlánc |
opcionálisan, ha Kafka 0-t használ.11 vagy újabb aszinkron módban, vagy szinkron módban kafkaconfig
több egyéni fejléc megadásához pontosvesszővel válassza el őket. |
|
Üzenetkulcs |
String |
adott esetben, ha a Kafka 0.11-es vagy újabb verzióját aszinkron módban használja, vagy szinkron módban a kafkaconfig MessageKey : CName=”Striim”MessageKey : Table=@metadata(TableName); Operation=@metadata(OperationName);key1=@userdata(key1)MessageKey : CityName=City; Zipcode=zipMessageKey : CName=”Striim”;Table=@metadata(TableName); Operation=@metadata(OperationName) többek között használhatja ezt a tulajdonságot a napló tömörítésének támogatására vagy a downstream alkalmazások számára az üzenet hasznos terhelése alapján történő lekérdezések használatára.. |
|
mód |
String |
Sync |
lásd a KafkaWriter mód tulajdonságának beállítását: sync versus async |
párhuzamos szálak |
egész |
lásd: több írópéldány létrehozása. |
|
partíciós kulcs |
String |
a bemeneti adatfolyam azon mezőjének neve, amelynek értékei meghatározzák, hogy az események hogyan oszlanak meg több partíció között. Az azonos partíciós kulcs mezőértékkel rendelkező események ugyanarra a partícióra lesznek írva. Ha a bemeneti adatfolyam a WAEvent kivételével bármilyen típusú, adja meg az egyik mező nevét. Ha a bemeneti adatfolyam WAEvent típusú, adjon meg egy mezőt a METAADATTÉRKÉPEN (lásd HP NonStop reader WAEvent mezők, MySQLReader WAEvent mezők, OracleReader WAEvent mezők vagy MS SQL Reader waevent mezők) a |
|
téma |
karakterlánc |
a meglévő Kafka téma, amelyre írni kell (nem jön létre, ha nem létezik). Ha egynél több Kafka író ír ugyanarra a témára, a helyreállítás nem támogatott (lásd: Alkalmazások helyreállítása. (A helyreállítás párhuzamos szálak használata esetén támogatott.) |
Ez az adapter választható formázókkal rendelkezik. További információkért lásd: Támogatott író-formázó kombinációk.
Megjegyzések a KafkaConfig tulajdonsághoz
az alábbi táblázatban felsorolt kivételekkel a KafkaConfig-ban megadhat bármely Kafka termelő tulajdonságot.
Kafka producer property |
notes |
---|---|
acks |
|
batch.size linger.ms újra próbálkozik |
|
engedélyezze.idempotence |
a 2.1.0 verzió és az aszinkron mód használatakor állítsa true értékre az események sorrendbe írásához (lásd |
gombot.deserializer |
érték mindig org.apacs.kafka.gyakori.sorosítás.ByteArrayDeserializer, nem lehet felülírni kafkaconfig |
belsőleg, Kafkawriter meghívja KafkaConsumer különböző célokra, és a figyelmeztetés a fogyasztói API miatt elhaladó a kafkaconfig tulajdonságai biztonságosan figyelmen kívül hagyhatók. A Kafka gyártó tulajdonságairól további információt a Kafka konfigurálása című témakörben talál.
KafkaWriter minta alkalmazás
A következő mintakód a PosDataPreview.csv
adatokat írja a Kafka témához KafkaWriterSample
. Ez a téma már létezik Striim belső Kafka példányában. Ha külső Kafka példányt használ, akkor az alkalmazás futtatása előtt létre kell hoznia a témát.
CREATE SOURCE PosSource USING FileReader ( directory:'Samples/PosApp/AppData', wildcard:'PosDataPreview.csv', positionByEOF:false)PARSE USING DSVParser ( header:yes)OUTPUT TO RawStream;CREATE CQ CsvToPosDataINSERT INTO PosDataStreamSELECT TO_STRING(data) as merchantId, TO_DATEF(data,'yyyyMMddHHmmss') as dateTime, TO_DOUBLE(data) as amount, TO_STRING(data) as zipFROM RawStream;CREATE TARGET KW11Sample USING KafkaWriter VERSION '0.11.0'( brokeraddress:'localhost:9092', topic:'KafkaWriterSample')FORMAT USING DSVFormatter ()INPUT FROM PosDataStream;
a Kafka Reader minta alkalmazás futtatásával ellenőrizheti, hogy az adatokat a Kafka-Ba írták-e.
a kimenet első mezője (position
) a helyreállítás utáni elveszett vagy ismétlődő események elkerüléséhez szükséges információkat tárolja (lásd: Alkalmazások helyreállítása). Ha a helyreállítás nincs engedélyezve, annak értéke NULL.
mon
kimenet (lásd a MON parancs használata) a kafkawriter-t használó célokhoz a következőket tartalmazza:
-
csak aszinkron módban, küldött bájtok aránya: hány megabájt/másodperc küldtek a brókereknek
-
szinkronban és szinkronban egyaránt async mód, írási bájtok aránya: hány megabájt/másodpercet írtak a brókerek és a striim által kapott visszaigazolás
a tömörítés engedélyezése
Ha engedélyezi a tömörítést a kafkawriterben, a brókernek és a fogyasztónak automatikusan kezelnie kell a tömörített tételeket. No additional configuration should be required in Kafka.
To enable batch compression for version 0.8.0, include the compression.codec
property in KafkaConfig. Supported values are gzip
and snappy
. For example:
KafkaConfg:'compression.codec=snappy'
To enable compression for version 0.9, 0.10, or 0.11, include the compression.type
property in KafkaConfig. Supported values are gzip
lz4
snappy
. For example:
KafkaConfig:'compression.type=snappy'