Striim 3.10.3 dokumentace
Kafka Spisovatel
Píše k tématu, v Apache Kafka.
existuje pět verzí KafkaWriter, 0.8.0, 0.9.0, 0.10.0, 0.11.0 a 2.1.0. Použijte ten, který odpovídá cílovému zprostředkovateli Kafka. Například pro použití 0.9.0 je syntaxe CREATE TARGET <name> USING KafkaWriter VERSION '0.9.0'
. Při zápisu do interní instance Kafka použijte 0.11.0
.
známý problém DEV-13039: aplikace s KafkaWriter 0.9 nebo 0.10 havaruje, pokud Kafka broker přejde do režimu offline.
ubytování |
typ |
výchozí hodnota |
poznámky |
||
---|---|---|---|---|---|
Broker Adresa |
String |
||||
Kafka Config |
Řetězec |
Volitelně zadejte Kafka výrobce vlastnosti, které jsou odděleny středníky. Podrobnosti naleznete v tabulce níže. |
|||
Kafka Config Majetku Oddělovač |
String |
Určit jiný oddělovač, pokud jeden z producentů majetkové hodnoty uvedené v KafkaConfig obsahuje středník. |
|||
Kafka Config Hodnota Oddělovač a |
String |
= |
Určit jiný oddělovač, pokud jeden z producentů majetkové hodnoty uvedené v KafkaConfig obsahuje stejný symbol. |
||
záhlaví zprávy |
řetězec |
volitelně, pokud používáte Kafka 0.11 nebo novější v asynchronním režimu, nebo v režimu synchronizace s KafkaConfig
Chcete-li zadat více vlastní záhlaví, oddělte je středníky. |
|||
Message Key |
String |
Případně, pokud používáte Kafka 0.11 nebo později v asynchronním režimu, nebo v režimu synchronizace s KafkaConfig MessageKey : CName=”Striim”MessageKey : Table=@metadata(TableName); Operation=@metadata(OperationName);key1=@userdata(key1)MessageKey : CityName=City; Zipcode=zipMessageKey : CName=”Striim”;Table=@metadata(TableName); Operation=@metadata(OperationName) Mezi další možnosti, můžete použít tuto vlastnost podporovat protokol zhutnění nebo umožnit následné aplikace používat dotazy na základě zprávy užitečného zatížení.. |
|||
Režim |
String |
Synchronizace |
viz Nastavení KafkaWriter je režim majetku: překlad versus asynchronní |
||
Paralelní Vlákna |
Integer |
Viz Vytvoření více spisovatel případech. |
|||
Klíč Oddílu |
String |
jméno pole ve vstupním proudu, jehož hodnoty určují, jak se události jsou rozděleny mezi více oddílů. Události se stejnou hodnotou pole klíče oddílu budou zapsány do stejného oddílu. Pokud je vstupní proud jakéhokoli typu kromě Waeventu, zadejte název jednoho z jeho polí. Pokud vstupní proud je WAEvent typ, určit pole v METADATECH mapě (viz HP NonStop čtenář WAEvent pole, MySQLReader WAEvent pole, OracleReader WAEvent pole, nebo MS SQL Čtenář WAEvent pole) pomocí syntaxe |
|||
Téma |
String |
stávající Kafka téma psát (nebude vytvořen, pokud již neexistuje). Pokud více než jeden spisovatel Kafka píše na stejné téma, obnovení není podporováno (viz obnovení aplikací. (Obnova je podporována při použití paralelních vláken.) |
Tento adaptér má na výběr z formátovače. Viz Podporované writer-formatter kombinace pro více informací.
Poznámky na KafkaConfig majetku
S výjimkami uvedenými v následující tabulce můžete zadat libovolné Kafka výrobce majetku v KafkaConfig.
Kafka producer property |
notes |
---|---|
acks |
|
batch.size linger.ms opakování |
|
povolit.idempotence |
při použití verze 2.1.0 a asynchronního režimu nastavte hodnotu true pro zápis událostí v pořadí (viz |
klíč.deserializátor |
hodnota je vždy org.Apač.Kafka.společný.seriál.ByteArrayDeserializer, nemůže být přepsána KafkaConfig |
Interně, KafkaWriter vyvolá KafkaConsumer pro různé účely, a UPOZORNĚNÍ od spotřebitelů API vzhledem k absolvování KafkaConfig vlastnosti mohou být bezpečně ignorována. Viz Konfigurace Kafka pro více informací o vlastnostech Kafka producer.
KafkaWriter ukázkové aplikace
následující ukázkový kód zapíše data z PosDataPreview.csv
Kafka téma KafkaWriterSample
. Toto téma již existuje ve Striimově interní Kafkově instanci. Pokud používáte externí instanci Kafka, musíte téma vytvořit před spuštěním aplikace.
CREATE SOURCE PosSource USING FileReader ( directory:'Samples/PosApp/AppData', wildcard:'PosDataPreview.csv', positionByEOF:false)PARSE USING DSVParser ( header:yes)OUTPUT TO RawStream;CREATE CQ CsvToPosDataINSERT INTO PosDataStreamSELECT TO_STRING(data) as merchantId, TO_DATEF(data,'yyyyMMddHHmmss') as dateTime, TO_DOUBLE(data) as amount, TO_STRING(data) as zipFROM RawStream;CREATE TARGET KW11Sample USING KafkaWriter VERSION '0.11.0'( brokeraddress:'localhost:9092', topic:'KafkaWriterSample')FORMAT USING DSVFormatter ()INPUT FROM PosDataStream;
můžete ověřit, že data byla zapsána do Kafky spuštěním ukázkové aplikace Kafka Reader.
první pole ve výstupu (position
) ukládá informace potřebné, aby se zabránilo ztrátě nebo duplicitní události po zotavení (viz Obnovení aplikací). Pokud obnovení není povoleno, jeho hodnota je NULL.
mon
výstup (viz Použití MON příkaz) pro cíle pomocí KafkaWriter zahrnuje:
-
v asynchronním režimu pouze, Poslal Bajtů Sazba: kolik megabajtů za sekundu byly zaslány makléři,
-
v obou sync a async mode, Napište Bajtů Sazba: kolik megabajtů za sekundu napsal makléři a potvrzení přijaté Striim
Povolení komprese
Při povolení komprese v KafkaWriter, makléř a spotřebitel by měl zvládnout komprimovaný dávky automaticky. No additional configuration should be required in Kafka.
To enable batch compression for version 0.8.0, include the compression.codec
property in KafkaConfig. Supported values are gzip
and snappy
. For example:
KafkaConfg:'compression.codec=snappy'
To enable compression for version 0.9, 0.10, or 0.11, include the compression.type
property in KafkaConfig. Supported values are gzip
lz4
snappy
. For example:
KafkaConfig:'compression.type=snappy'