Striim 3.10.3 dokumentace

Kafka Spisovatel

Píše k tématu, v Apache Kafka.

existuje pět verzí KafkaWriter, 0.8.0, 0.9.0, 0.10.0, 0.11.0 a 2.1.0. Použijte ten, který odpovídá cílovému zprostředkovateli Kafka. Například pro použití 0.9.0 je syntaxe CREATE TARGET <name> USING KafkaWriter VERSION '0.9.0'. Při zápisu do interní instance Kafka použijte 0.11.0.

známý problém DEV-13039: aplikace s KafkaWriter 0.9 nebo 0.10 havaruje, pokud Kafka broker přejde do režimu offline.

ubytování

typ

výchozí hodnota

poznámky

Broker Adresa

String

Kafka Config

Řetězec

Volitelně zadejte Kafka výrobce vlastnosti, které jsou odděleny středníky. Podrobnosti naleznete v tabulce níže.

Kafka Config Majetku Oddělovač

String

Určit jiný oddělovač, pokud jeden z producentů majetkové hodnoty uvedené v KafkaConfig obsahuje středník.

Kafka Config Hodnota Oddělovač a

String

=

Určit jiný oddělovač, pokud jeden z producentů majetkové hodnoty uvedené v KafkaConfig obsahuje stejný symbol.

záhlaví zprávy

řetězec

volitelně, pokud používáte Kafka 0.11 nebo novější v asynchronním režimu, nebo v režimu synchronizace s KafkaConfig batch.size=-1, zadejte jeden nebo více vlastní záhlaví, které mají být přidány do zprávy jako páry klíč-hodnota. Hodnoty mohou být:

  • název pole z v dal proud typ definovaný uživatelem: například MerchantID=merchantID

  • statický řetězec: například, Company="My Company"

  • funkce: například, získat zdrojový název tabulky z WAEvent vstupní proud, který je výstupem z CDC reader, Table Name=@metadata(TableName)

Chcete-li zadat více vlastní záhlaví, oddělte je středníky.

Message Key

String

Případně, pokud používáte Kafka 0.11 nebo později v asynchronním režimu, nebo v režimu synchronizace s KafkaConfig batch.size=-1, zadat jeden nebo více klíčů, které mají být přidány do zprávy jako páry klíč-hodnota. Hodnota vlastnosti může být statický řetězec, jedno nebo více polí ze vstupního proudu nebo kombinace obou. Příklady:

MessageKey : CName=”Striim”MessageKey : Table=@metadata(TableName); Operation=@metadata(OperationName);key1=@userdata(key1)MessageKey : CityName=City; Zipcode=zipMessageKey : CName=”Striim”;Table=@metadata(TableName); Operation=@metadata(OperationName)

Mezi další možnosti, můžete použít tuto vlastnost podporovat protokol zhutnění nebo umožnit následné aplikace používat dotazy na základě zprávy užitečného zatížení..

Režim

String

Synchronizace

viz Nastavení KafkaWriter je režim majetku: překlad versus asynchronní

Paralelní Vlákna

Integer

Viz Vytvoření více spisovatel případech.

Klíč Oddílu

String

jméno pole ve vstupním proudu, jehož hodnoty určují, jak se události jsou rozděleny mezi více oddílů. Události se stejnou hodnotou pole klíče oddílu budou zapsány do stejného oddílu.

Pokud je vstupní proud jakéhokoli typu kromě Waeventu, zadejte název jednoho z jeho polí.

Pokud vstupní proud je WAEvent typ, určit pole v METADATECH mapě (viz HP NonStop čtenář WAEvent pole, MySQLReader WAEvent pole, OracleReader WAEvent pole, nebo MS SQL Čtenář WAEvent pole) pomocí syntaxe @METADATA(<field name>), nebo pole v USERDATA mapě (viz Přidání uživatelem definované údaje WAEvent proudy), pomocí syntaxe @USERDATA(<field name>).

Téma

String

stávající Kafka téma psát (nebude vytvořen, pokud již neexistuje). Pokud více než jeden spisovatel Kafka píše na stejné téma, obnovení není podporováno (viz obnovení aplikací. (Obnova je podporována při použití paralelních vláken.)

Tento adaptér má na výběr z formátovače. Viz Podporované writer-formatter kombinace pro více informací.

Poznámky na KafkaConfig majetku

S výjimkami uvedenými v následující tabulce můžete zadat libovolné Kafka výrobce majetku v KafkaConfig.

Kafka producer property

notes

acks

  • in sync mode, may be set to 1 or all

  • in async mode, may be set to 0, 1, or all

batch.size

linger.ms

opakování

  • V režimu synchronizace, aby se zabránilo out-of-order události, producent vlastnosti nastavit v Kafky se bude beze změny a ignorovány, a Striim bude zpracovávat tyto vnitřně.

  • v asynchronním režimu bude Striim aktualizovat vlastnosti výrobce Kafka a ty budou zpracovány Kafka.

  • v režimu synchronizace můžete nastavit batch.size=-1 pro zápis jedné události na Kafkovu zprávu. To vážně zhorší výkon, takže se nedoporučuje ve výrobním prostředí. Při tomto nastavení budou zprávy podobné zprávám v asynchronním režimu.

povolit.idempotence

při použití verze 2.1.0 a asynchronního režimu nastavte hodnotu true pro zápis událostí v pořadí (viz

klíč.deserializátor

hodnota je vždy org.Apač.Kafka.společný.seriál.ByteArrayDeserializer, nemůže být přepsána KafkaConfig

Interně, KafkaWriter vyvolá KafkaConsumer pro různé účely, a UPOZORNĚNÍ od spotřebitelů API vzhledem k absolvování KafkaConfig vlastnosti mohou být bezpečně ignorována. Viz Konfigurace Kafka pro více informací o vlastnostech Kafka producer.

KafkaWriter ukázkové aplikace

následující ukázkový kód zapíše data z PosDataPreview.csv Kafka téma KafkaWriterSample. Toto téma již existuje ve Striimově interní Kafkově instanci. Pokud používáte externí instanci Kafka, musíte téma vytvořit před spuštěním aplikace.

CREATE SOURCE PosSource USING FileReader ( directory:'Samples/PosApp/AppData', wildcard:'PosDataPreview.csv', positionByEOF:false)PARSE USING DSVParser ( header:yes)OUTPUT TO RawStream;CREATE CQ CsvToPosDataINSERT INTO PosDataStreamSELECT TO_STRING(data) as merchantId, TO_DATEF(data,'yyyyMMddHHmmss') as dateTime, TO_DOUBLE(data) as amount, TO_STRING(data) as zipFROM RawStream;CREATE TARGET KW11Sample USING KafkaWriter VERSION '0.11.0'( brokeraddress:'localhost:9092', topic:'KafkaWriterSample')FORMAT USING DSVFormatter ()INPUT FROM PosDataStream;

můžete ověřit, že data byla zapsána do Kafky spuštěním ukázkové aplikace Kafka Reader.

první pole ve výstupu (position) ukládá informace potřebné, aby se zabránilo ztrátě nebo duplicitní události po zotavení (viz Obnovení aplikací). Pokud obnovení není povoleno, jeho hodnota je NULL.

mon výstup (viz Použití MON příkaz) pro cíle pomocí KafkaWriter zahrnuje:

  • v asynchronním režimu pouze, Poslal Bajtů Sazba: kolik megabajtů za sekundu byly zaslány makléři,

  • v obou sync a async mode, Napište Bajtů Sazba: kolik megabajtů za sekundu napsal makléři a potvrzení přijaté Striim

Povolení komprese

Při povolení komprese v KafkaWriter, makléř a spotřebitel by měl zvládnout komprimovaný dávky automaticky. No additional configuration should be required in Kafka.

To enable batch compression for version 0.8.0, include the compression.codec property in KafkaConfig. Supported values are gzip and snappy. For example:

KafkaConfg:'compression.codec=snappy'

To enable compression for version 0.9, 0.10, or 0.11, include the compression.type property in KafkaConfig. Supported values are gziplz4snappy. For example:

KafkaConfig:'compression.type=snappy'



Napsat komentář

Vaše e-mailová adresa nebude zveřejněna.