Striim 3.10.3 Dokumentation

Kafka Writer

Schreibt in ein Thema in Apache Kafka.

Es gibt fünf Versionen von KafkaWriter, 0.8.0, 0.9.0, 0.10.0, 0.11.0 und 2.1.0. Verwenden Sie den, der dem Ziel-Kafka-Broker entspricht. Um beispielsweise 0.9.0 zu verwenden, lautet die Syntax CREATE TARGET <name> USING KafkaWriter VERSION '0.9.0' . Wenn Sie in die interne Kafka-Instanz schreiben, verwenden Sie 0.11.0 .

Bekanntes Problem DEV-13039: Anwendung mit KafkaWriter 0.9 oder 0.10 stürzt ab, wenn Kafka Broker offline geht.

Eigenschaft

Typ

Standardwert

Anmerkungen

Makleradresse

Zeichenfolge

Kafka-Konfiguration

Zeichenfolge

Geben Sie optional Kafka producer-Eigenschaften an, die durch Semikolons getrennt sind. Einzelheiten finden Sie in der Tabelle unten.

Kafka Config Eigenschaftstrennzeichen

String

Geben Sie ein anderes Trennzeichen an, wenn einer der in KafkaConfig angegebenen producer Eigenschaftswerte ein Semikolon enthält.

Kafka-Konfigurationswerttrennzeichen

String

=

Geben Sie ein anderes Trennzeichen an, wenn einer der in KafkaConfig angegebenen Producer-Eigenschaftswerte ein Gleichheitszeichen enthält.

Nachrichtenkopf

String

Optional, wenn Kafka 0 verwendet wird.11 oder höher Geben Sie im asynchronen Modus oder im Synchronisierungsmodus mit KafkaConfig batch.size=-1 einen oder mehrere benutzerdefinierte Header an, die Nachrichten als Schlüssel-Wert-Paare hinzugefügt werden sollen. Werte können sein:

  • ein Feldname aus einem In-Put-Stream eines benutzerdefinierten Typs: zum Beispiel MerchantID=merchantID

  • eine statische Zeichenfolge: zum Beispiel Company="My Company"

  • eine Funktion: um beispielsweise den Namen der Quelltabelle aus einem WAEvent-Eingabestream abzurufen, der die Ausgabe eines CDC-Lesegeräts ist, Table Name=@metadata(TableName)

Trennen Sie diese durch Semikolons, um mehrere benutzerdefinierte Header anzugeben.

Nachrichtenschlüssel

String

Wenn Sie Kafka 0.11 oder höher im asynchronen Modus oder im Synchronisierungsmodus mit KafkaConfig batch.size=-1verwenden, geben Sie optional einen oder mehrere Schlüssel an, die Nachrichten als Schlüssel-Wert-Paare hinzugefügt werden sollen. Der Eigenschaftswert kann eine statische Zeichenfolge, ein oder mehrere Felder aus dem Eingabestream oder eine Kombination aus beidem sein. Beispiele:

MessageKey : CName=”Striim”MessageKey : Table=@metadata(TableName); Operation=@metadata(OperationName);key1=@userdata(key1)MessageKey : CityName=City; Zipcode=zipMessageKey : CName=”Striim”;Table=@metadata(TableName); Operation=@metadata(OperationName)

Sie können diese Eigenschaft unter anderem verwenden, um die Protokollkomprimierung zu unterstützen oder um nachgelagerten Anwendungen die Verwendung von Abfragen basierend auf der Nachrichtennutzlast zu ermöglichen..

Mode

String

Sync

siehe Festlegen der mode-Eigenschaft von KafkaWriter: sync versus async

Parallele Threads

Integer

Siehe Erstellen mehrerer Writer-Instanzen.

Partitionsschlüssel

String

Der Name eines Feldes im Eingabestream, dessen Werte bestimmen, wie Ereignisse auf mehrere Partitionen verteilt werden. Ereignisse mit demselben Partitionsschlüsselfeldwert werden in dieselbe Partition geschrieben.

Wenn der Eingabestream einen beliebigen Typ außer WAEvent hat, geben Sie den Namen eines seiner Felder an.

Wenn der Eingabestream vom Typ WAEvent ist, geben Sie ein Feld in der Metadatenzuordnung (siehe HP NonStop Reader-WAEvent-Felder, MySQLReader-WAEvent-Felder, OracleReader-WAEvent-Felder oder MS SQL Reader-WAEvent-Felder) mit der Syntax @METADATA(<field name>) oder ein Feld in der USERDATA-Zuordnung (siehe Hinzufügen benutzerdefinierter Daten zu WAEvent-Streams) mit der Syntax @USERDATA(<field name>).

Topic

String

Das vorhandene Kafka-Thema, in das geschrieben werden soll (wird nicht erstellt, wenn es nicht vorhanden ist). Wenn mehr als ein Kafka Writer in dasselbe Thema schreibt, wird die Wiederherstellung nicht unterstützt (siehe Wiederherstellen von Anwendungen. (Die Wiederherstellung wird unterstützt, wenn parallele Threads verwendet werden.)

Dieser Adapter hat eine Auswahl an Formatierern. Weitere Informationen finden Sie unter Unterstützte Writer-Formatter-Kombinationen.

Hinweise zur KafkaConfig-Eigenschaft

Mit den in der folgenden Tabelle aufgeführten Ausnahmen können Sie jede Kafka producer-Eigenschaft in KafkaConfig angeben.

Kafka producer property

notes

acks

  • in sync mode, may be set to 1 or all

  • in async mode, may be set to 0, 1, or all

batch.size

linger.ms

wiederholt

  • Um Ereignisse außerhalb der Reihenfolge zu verhindern, werden im Synchronisierungsmodus die in Kafka with festgelegten producer-Eigenschaften unverändert und ignoriert, und Striim behandelt diese intern.

  • Im asynchronen Modus aktualisiert Striim die Kafka Producer-Eigenschaften und diese werden von Kafka verarbeitet.

  • Im Sync-Modus können Sie batch.size=-1 , um ein Ereignis pro Kafka-Nachricht zu schreiben. Dies beeinträchtigt die Leistung erheblich und wird in einer Produktionsumgebung nicht empfohlen. Mit dieser Einstellung ähneln Nachrichten denen im asynchronen Modus.

aktivieren.idempotenz

Wenn Sie Version 2.1.0 und den asynchronen Modus verwenden, setzen Sie auf true, um Ereignisse der Reihe nach zu schreiben (siehe

).deserializer

Wert ist immer org.Apache.kafka.gemeinsam.Serialisierung.ByteArrayDeserializer, kann nicht von KafkaConfig überschrieben werden

Intern ruft KafkaWriter KafkaConsumer für verschiedene Zwecke auf, und die WARNUNG der Consumer-API aufgrund der Übergabe von KafkaConfig-Eigenschaften kann sicher ignoriert werden. Weitere Informationen zu Kafka Producer-Eigenschaften finden Sie unter Konfigurieren von Kafka.

KafkaWriter-Beispielanwendung

Der folgende Beispielcode schreibt Daten von PosDataPreview.csv in das Kafka-Thema KafkaWriterSample. Dieses Thema ist bereits in der internen Kafka-Instanz von Striim vorhanden. Wenn Sie eine externe Kafka-Instanz verwenden, müssen Sie das Thema erstellen, bevor Sie die Anwendung ausführen.

CREATE SOURCE PosSource USING FileReader ( directory:'Samples/PosApp/AppData', wildcard:'PosDataPreview.csv', positionByEOF:false)PARSE USING DSVParser ( header:yes)OUTPUT TO RawStream;CREATE CQ CsvToPosDataINSERT INTO PosDataStreamSELECT TO_STRING(data) as merchantId, TO_DATEF(data,'yyyyMMddHHmmss') as dateTime, TO_DOUBLE(data) as amount, TO_STRING(data) as zipFROM RawStream;CREATE TARGET KW11Sample USING KafkaWriter VERSION '0.11.0'( brokeraddress:'localhost:9092', topic:'KafkaWriterSample')FORMAT USING DSVFormatter ()INPUT FROM PosDataStream;

Sie können überprüfen, ob Daten in Kafka geschrieben wurden, indem Sie die Beispielanwendung Kafka Reader ausführen.

Das erste Feld in der Ausgabe (position) speichert Informationen, die erforderlich sind, um verlorene oder doppelte Ereignisse nach der Wiederherstellung zu vermeiden (siehe Wiederherstellen von Anwendungen). Wenn recovery nicht aktiviert ist, ist der Wert NULL.

mon Die Ausgabe (siehe Verwenden des Befehls MON) für Ziele, die KafkaWriter verwenden, umfasst:

  • Nur im Async-Modus Gesendete Bytes Rate: Wie viele Megabyte pro Sekunde wurden an die Broker gesendet?

  • Sowohl im Sync- als auch im Async-Modus Schreiben Bytes Rate: wie viele Megabyte pro Sekunde wurden von den Brokern geschrieben und die Bestätigung von Striim

Aktivieren der Komprimierung

Wenn Sie die Komprimierung in KafkaWriter aktivieren, sollten der Broker und der Consumer die komprimierten Stapel automatisch verarbeiten. No additional configuration should be required in Kafka.

To enable batch compression for version 0.8.0, include the compression.codec property in KafkaConfig. Supported values are gzip and snappy. For example:

KafkaConfg:'compression.codec=snappy'

To enable compression for version 0.9, 0.10, or 0.11, include the compression.type property in KafkaConfig. Supported values are gziplz4snappy. For example:

KafkaConfig:'compression.type=snappy'



Schreibe einen Kommentar

Deine E-Mail-Adresse wird nicht veröffentlicht.