Striim 3.10.3 dokumentation

Kafka Writer

skriver till ett ämne i Apache Kafka.

det finns fem versioner av KafkaWriter, 0.8.0, 0.9.0, 0.10.0, 0.11.0 och 2.1.0. Använd den som motsvarar målkafka-mäklaren. Till exempel, för att använda 0.9.0, är syntaxen CREATE TARGET <name> USING KafkaWriter VERSION '0.9.0'. Om du skriver till den interna Kafka-instansen, använd 0.11.0.

känt problem DEV-13039: ansökan med KafkaWriter 0.9 eller 0.10 kraschar om Kafka broker går offline.

egenskap

typ

standardvärde

String

sträng

String

sträng

anteckningar

mäklaradress

sträng

Kafka config

string

ange eventuellt Kafka producer-egenskaper, åtskilda av semikolon. Se tabellen nedan för mer information.

Egenskapsavskiljare Kafka Config

Ange en annan avgränsare om ett av producentegenskapsvärdena som anges i KafkaConfig innehåller ett semikolon.

Kafka Konfigurationsvärdeseparator

=

Ange en annan separator om ett av producentegenskapsvärdena som anges i KafkaConfig innehåller en lika symbol.

meddelandehuvud

sträng

valfritt, om du använder Kafka 0.11 eller senare i async-läge, eller i synkroniseringsläge med KafkaConfig batch.size=-1, ange en eller flera anpassade rubriker som ska läggas till meddelanden som nyckelvärdespar. Värden kan vara:

  • ett fältnamn från en in put-ström av en användardefinierad typ: till exempel MerchantID=merchantID

  • en statisk sträng: till exempel Company="My Company"

  • en funktion: till exempel, för att få källtabellnamnet från en WAEvent-Ingångsström som är utgången från en CDC-läsare, Table Name=@metadata(TableName)

för att ange flera anpassade rubriker, separera dem med semikolon.

meddelandeknapp

sträng

Om du använder Kafka 0.11 eller senare i async-läge eller i synkroniseringsläge med KafkaConfigbatch.size=-1, ange en eller flera tangenter för att läggas till meddelanden som nyckelvärdespar. Egenskapsvärdet kan vara en statisk sträng, ett eller flera fält från inmatningsströmmen eller en kombination av båda. Exempel:

MessageKey : CName=”Striim”MessageKey : Table=@metadata(TableName); Operation=@metadata(OperationName);key1=@userdata(key1)MessageKey : CityName=City; Zipcode=zipMessageKey : CName=”Striim”;Table=@metadata(TableName); Operation=@metadata(OperationName)

bland andra möjligheter kan du använda den här egenskapen för att stödja loggkomprimering eller för att tillåta nedströms applikationer Att Använda frågor baserat på meddelandet nyttolast..

läge

sträng

synkronisera

se Inställning av egenskapen Kafkawriters läge: synkronisera mot async

parallella trådar

heltal

Se skapa flera skrivarinstanser.

Partitionstangent

namnet på ett fält i inmatningsströmmen vars värden bestämmer hur händelser fördelas mellan flera partitioner. Händelser med samma partitionsnyckelfältvärde kommer att skrivas till samma partition.

om ingångsströmmen är av någon typ utom WAEvent, ange namnet på ett av dess fält.

om ingångsströmmen är av typen WAEvent anger du ett fält i METADATAKARTAN (se HP NonStop reader waevent fields, MySQLReader WAEvent fields, OracleReader WAEvent fields eller MS SQL Reader WAEvent fields) med syntaxen @METADATA(<field name>) eller ett fält i USERDATA-kartan (Se Lägga till användardefinierade data i WAEvent-strömmar), med syntaxen @USERDATA(<field name>).

ämne

det befintliga Kafka-ämnet att skriva till (skapas inte om det inte finns). Om mer än en Kafka-skribent skriver till samma ämne stöds inte återställning (se återställa program. (Återställning stöds vid användning av parallella trådar.)

denna adapter har ett urval av format. Mer information finns i writer-formatterkombinationer som stöds.

Anmärkningar om egenskapen KafkaConfig

med de undantag som anges i följande tabell kan du ange vilken Kafka producer-egenskap som helst i KafkaConfig.

Kafka producer property

notes

acks

  • in sync mode, may be set to 1 or all

  • in async mode, may be set to 0, 1, or all

batch.size

linger.ms

försök igen

  • i synkroniseringsläge, för att förhindra out-of-order-händelser, kommer producentegenskaperna i Kafka med att vara oförändrade och ignoreras, och Striim hanterar dessa internt.

  • i async-läge kommer Striim att uppdatera Kafka producer-egenskaperna och dessa kommer att hanteras av Kafka.

  • i synkroniseringsläge kan du ställa in batch.size=-1 för att skriva en händelse per Kafka-meddelande. Detta kommer allvarligt försämra prestanda så rekommenderas inte i en produktionsmiljö. Med den här inställningen kommer meddelanden att likna dem i async-läge.

Aktivera.idempotence

När du använder version 2.1.0 och async-läge, Ställ in true för att skriva händelser i ordning (Se

nyckel.deserializer

värdet är alltid org.Apache.kafka.gemensam.serialiserad.ByteArrayDeserializer, kan inte åsidosättas av KafkaConfig

Internt, kafkawriter åberopar KafkaConsumer för olika ändamål, och varningen från konsumenten API på grund av passerar kafkaconfig-egenskaper kan ignoreras säkert. Se Konfigurera Kafka för mer information om Kafka producer-egenskaper.

kafkawriter sample application

följande exempelkod skriver data från PosDataPreview.csv till Kafka-ämnet KafkaWriterSample. Detta ämne finns redan i Striims interna Kafka-instans. Om du använder en extern Kafka-instans måste du skapa ämnet innan du kör programmet.

CREATE SOURCE PosSource USING FileReader ( directory:'Samples/PosApp/AppData', wildcard:'PosDataPreview.csv', positionByEOF:false)PARSE USING DSVParser ( header:yes)OUTPUT TO RawStream;CREATE CQ CsvToPosDataINSERT INTO PosDataStreamSELECT TO_STRING(data) as merchantId, TO_DATEF(data,'yyyyMMddHHmmss') as dateTime, TO_DOUBLE(data) as amount, TO_STRING(data) as zipFROM RawStream;CREATE TARGET KW11Sample USING KafkaWriter VERSION '0.11.0'( brokeraddress:'localhost:9092', topic:'KafkaWriterSample')FORMAT USING DSVFormatter ()INPUT FROM PosDataStream;

Du kan verifiera att data har skrivits till Kafka genom att köra exempelprogrammet Kafka Reader.

det första fältet i utmatningen (position) lagrar information som krävs för att undvika förlorade eller dubbla händelser efter återställning (se Återställa applikationer). Om återställning inte är aktiverad är dess värde NULL.

mon utgång (se använda kommandot Mon) för mål som använder kafkawriter inkluderar:

  • endast i async-läge, skickade Byte Rate: hur många megabyte per sekund skickades till mäklare

  • i både sync och async läge, skriv byte rate: hur många megabyte per sekund skrevs av mäklare och bekräftelse mottagits av striim

aktivera komprimering

När du aktiverar komprimering i kafkawriter, mäklaren och konsumenten bör hantera de komprimerade satser automatiskt. No additional configuration should be required in Kafka.

To enable batch compression for version 0.8.0, include the compression.codec property in KafkaConfig. Supported values are gzip and snappy. For example:

KafkaConfg:'compression.codec=snappy'

To enable compression for version 0.9, 0.10, or 0.11, include the compression.type property in KafkaConfig. Supported values are gziplz4snappy. For example:

KafkaConfig:'compression.type=snappy'



Lämna ett svar

Din e-postadress kommer inte publiceras.