Striim 3.10.3 Dokumentation
Kafka Writer
Schreibt in ein Thema in Apache Kafka.
Es gibt fünf Versionen von KafkaWriter, 0.8.0, 0.9.0, 0.10.0, 0.11.0 und 2.1.0. Verwenden Sie den, der dem Ziel-Kafka-Broker entspricht. Um beispielsweise 0.9.0 zu verwenden, lautet die Syntax CREATE TARGET <name> USING KafkaWriter VERSION '0.9.0'
. Wenn Sie in die interne Kafka-Instanz schreiben, verwenden Sie 0.11.0
.
Bekanntes Problem DEV-13039: Anwendung mit KafkaWriter 0.9 oder 0.10 stürzt ab, wenn Kafka Broker offline geht.
Eigenschaft |
Typ |
Standardwert |
Anmerkungen |
---|---|---|---|
Makleradresse |
Zeichenfolge |
||
Kafka-Konfiguration |
Zeichenfolge |
Geben Sie optional Kafka producer-Eigenschaften an, die durch Semikolons getrennt sind. Einzelheiten finden Sie in der Tabelle unten. |
|
Kafka Config Eigenschaftstrennzeichen |
String |
Geben Sie ein anderes Trennzeichen an, wenn einer der in KafkaConfig angegebenen producer Eigenschaftswerte ein Semikolon enthält. |
|
Kafka-Konfigurationswerttrennzeichen |
String |
= |
Geben Sie ein anderes Trennzeichen an, wenn einer der in KafkaConfig angegebenen Producer-Eigenschaftswerte ein Gleichheitszeichen enthält. |
Nachrichtenkopf |
String |
Optional, wenn Kafka 0 verwendet wird.11 oder höher Geben Sie im asynchronen Modus oder im Synchronisierungsmodus mit KafkaConfig
Trennen Sie diese durch Semikolons, um mehrere benutzerdefinierte Header anzugeben. |
|
Nachrichtenschlüssel |
String |
Wenn Sie Kafka 0.11 oder höher im asynchronen Modus oder im Synchronisierungsmodus mit KafkaConfig MessageKey : CName=”Striim”MessageKey : Table=@metadata(TableName); Operation=@metadata(OperationName);key1=@userdata(key1)MessageKey : CityName=City; Zipcode=zipMessageKey : CName=”Striim”;Table=@metadata(TableName); Operation=@metadata(OperationName) Sie können diese Eigenschaft unter anderem verwenden, um die Protokollkomprimierung zu unterstützen oder um nachgelagerten Anwendungen die Verwendung von Abfragen basierend auf der Nachrichtennutzlast zu ermöglichen.. |
|
Mode |
String |
Sync |
siehe Festlegen der mode-Eigenschaft von KafkaWriter: sync versus async |
Parallele Threads |
Integer |
Siehe Erstellen mehrerer Writer-Instanzen. |
|
Partitionsschlüssel |
String |
Der Name eines Feldes im Eingabestream, dessen Werte bestimmen, wie Ereignisse auf mehrere Partitionen verteilt werden. Ereignisse mit demselben Partitionsschlüsselfeldwert werden in dieselbe Partition geschrieben. Wenn der Eingabestream einen beliebigen Typ außer WAEvent hat, geben Sie den Namen eines seiner Felder an. Wenn der Eingabestream vom Typ WAEvent ist, geben Sie ein Feld in der Metadatenzuordnung (siehe HP NonStop Reader-WAEvent-Felder, MySQLReader-WAEvent-Felder, OracleReader-WAEvent-Felder oder MS SQL Reader-WAEvent-Felder) mit der Syntax |
|
Topic |
String |
Das vorhandene Kafka-Thema, in das geschrieben werden soll (wird nicht erstellt, wenn es nicht vorhanden ist). Wenn mehr als ein Kafka Writer in dasselbe Thema schreibt, wird die Wiederherstellung nicht unterstützt (siehe Wiederherstellen von Anwendungen. (Die Wiederherstellung wird unterstützt, wenn parallele Threads verwendet werden.) |
Dieser Adapter hat eine Auswahl an Formatierern. Weitere Informationen finden Sie unter Unterstützte Writer-Formatter-Kombinationen.
Hinweise zur KafkaConfig-Eigenschaft
Mit den in der folgenden Tabelle aufgeführten Ausnahmen können Sie jede Kafka producer-Eigenschaft in KafkaConfig angeben.
Kafka producer property |
notes |
---|---|
acks |
|
batch.size linger.ms wiederholt |
|
aktivieren.idempotenz |
Wenn Sie Version 2.1.0 und den asynchronen Modus verwenden, setzen Sie auf true, um Ereignisse der Reihe nach zu schreiben (siehe |
).deserializer |
Wert ist immer org.Apache.kafka.gemeinsam.Serialisierung.ByteArrayDeserializer, kann nicht von KafkaConfig überschrieben werden |
Intern ruft KafkaWriter KafkaConsumer für verschiedene Zwecke auf, und die WARNUNG der Consumer-API aufgrund der Übergabe von KafkaConfig-Eigenschaften kann sicher ignoriert werden. Weitere Informationen zu Kafka Producer-Eigenschaften finden Sie unter Konfigurieren von Kafka.
KafkaWriter-Beispielanwendung
Der folgende Beispielcode schreibt Daten von PosDataPreview.csv
in das Kafka-Thema KafkaWriterSample
. Dieses Thema ist bereits in der internen Kafka-Instanz von Striim vorhanden. Wenn Sie eine externe Kafka-Instanz verwenden, müssen Sie das Thema erstellen, bevor Sie die Anwendung ausführen.
CREATE SOURCE PosSource USING FileReader ( directory:'Samples/PosApp/AppData', wildcard:'PosDataPreview.csv', positionByEOF:false)PARSE USING DSVParser ( header:yes)OUTPUT TO RawStream;CREATE CQ CsvToPosDataINSERT INTO PosDataStreamSELECT TO_STRING(data) as merchantId, TO_DATEF(data,'yyyyMMddHHmmss') as dateTime, TO_DOUBLE(data) as amount, TO_STRING(data) as zipFROM RawStream;CREATE TARGET KW11Sample USING KafkaWriter VERSION '0.11.0'( brokeraddress:'localhost:9092', topic:'KafkaWriterSample')FORMAT USING DSVFormatter ()INPUT FROM PosDataStream;
Sie können überprüfen, ob Daten in Kafka geschrieben wurden, indem Sie die Beispielanwendung Kafka Reader ausführen.
Das erste Feld in der Ausgabe (position
) speichert Informationen, die erforderlich sind, um verlorene oder doppelte Ereignisse nach der Wiederherstellung zu vermeiden (siehe Wiederherstellen von Anwendungen). Wenn recovery nicht aktiviert ist, ist der Wert NULL.
mon
Die Ausgabe (siehe Verwenden des Befehls MON) für Ziele, die KafkaWriter verwenden, umfasst:
-
Nur im Async-Modus Gesendete Bytes Rate: Wie viele Megabyte pro Sekunde wurden an die Broker gesendet?
-
Sowohl im Sync- als auch im Async-Modus Schreiben Bytes Rate: wie viele Megabyte pro Sekunde wurden von den Brokern geschrieben und die Bestätigung von Striim
Aktivieren der Komprimierung
Wenn Sie die Komprimierung in KafkaWriter aktivieren, sollten der Broker und der Consumer die komprimierten Stapel automatisch verarbeiten. No additional configuration should be required in Kafka.
To enable batch compression for version 0.8.0, include the compression.codec
property in KafkaConfig. Supported values are gzip
and snappy
. For example:
KafkaConfg:'compression.codec=snappy'
To enable compression for version 0.9, 0.10, or 0.11, include the compression.type
property in KafkaConfig. Supported values are gzip
lz4
snappy
. For example:
KafkaConfig:'compression.type=snappy'