Striim 3.10.3 dokumentation
Kafka Writer
skriver till ett ämne i Apache Kafka.
det finns fem versioner av KafkaWriter, 0.8.0, 0.9.0, 0.10.0, 0.11.0 och 2.1.0. Använd den som motsvarar målkafka-mäklaren. Till exempel, för att använda 0.9.0, är syntaxen CREATE TARGET <name> USING KafkaWriter VERSION '0.9.0'
. Om du skriver till den interna Kafka-instansen, använd 0.11.0
.
känt problem DEV-13039: ansökan med KafkaWriter 0.9 eller 0.10 kraschar om Kafka broker går offline.
anteckningar |
||||
---|---|---|---|---|
mäklaradress |
sträng |
|||
Kafka config |
string |
ange eventuellt Kafka producer-egenskaper, åtskilda av semikolon. Se tabellen nedan för mer information. |
||
Egenskapsavskiljare Kafka Config |
Ange en annan avgränsare om ett av producentegenskapsvärdena som anges i KafkaConfig innehåller ett semikolon. |
|||
Kafka Konfigurationsvärdeseparator |
= |
Ange en annan separator om ett av producentegenskapsvärdena som anges i KafkaConfig innehåller en lika symbol. |
||
meddelandehuvud |
sträng |
valfritt, om du använder Kafka 0.11 eller senare i async-läge, eller i synkroniseringsläge med KafkaConfig
för att ange flera anpassade rubriker, separera dem med semikolon. |
||
meddelandeknapp |
sträng |
Om du använder Kafka 0.11 eller senare i async-läge eller i synkroniseringsläge med KafkaConfig MessageKey : CName=”Striim”MessageKey : Table=@metadata(TableName); Operation=@metadata(OperationName);key1=@userdata(key1)MessageKey : CityName=City; Zipcode=zipMessageKey : CName=”Striim”;Table=@metadata(TableName); Operation=@metadata(OperationName) bland andra möjligheter kan du använda den här egenskapen för att stödja loggkomprimering eller för att tillåta nedströms applikationer Att Använda frågor baserat på meddelandet nyttolast.. |
||
läge |
sträng |
synkronisera |
se Inställning av egenskapen Kafkawriters läge: synkronisera mot async |
|
parallella trådar |
heltal |
Se skapa flera skrivarinstanser. |
||
Partitionstangent |
namnet på ett fält i inmatningsströmmen vars värden bestämmer hur händelser fördelas mellan flera partitioner. Händelser med samma partitionsnyckelfältvärde kommer att skrivas till samma partition. om ingångsströmmen är av någon typ utom WAEvent, ange namnet på ett av dess fält. om ingångsströmmen är av typen WAEvent anger du ett fält i METADATAKARTAN (se HP NonStop reader waevent fields, MySQLReader WAEvent fields, OracleReader WAEvent fields eller MS SQL Reader WAEvent fields) med syntaxen |
|||
ämne |
det befintliga Kafka-ämnet att skriva till (skapas inte om det inte finns). Om mer än en Kafka-skribent skriver till samma ämne stöds inte återställning (se återställa program. (Återställning stöds vid användning av parallella trådar.) |
denna adapter har ett urval av format. Mer information finns i writer-formatterkombinationer som stöds.
Anmärkningar om egenskapen KafkaConfig
med de undantag som anges i följande tabell kan du ange vilken Kafka producer-egenskap som helst i KafkaConfig.
Kafka producer property |
notes |
---|---|
acks |
|
batch.size linger.ms försök igen |
|
Aktivera.idempotence |
När du använder version 2.1.0 och async-läge, Ställ in true för att skriva händelser i ordning (Se |
nyckel.deserializer |
värdet är alltid org.Apache.kafka.gemensam.serialiserad.ByteArrayDeserializer, kan inte åsidosättas av KafkaConfig |
Internt, kafkawriter åberopar KafkaConsumer för olika ändamål, och varningen från konsumenten API på grund av passerar kafkaconfig-egenskaper kan ignoreras säkert. Se Konfigurera Kafka för mer information om Kafka producer-egenskaper.
kafkawriter sample application
följande exempelkod skriver data från PosDataPreview.csv
till Kafka-ämnet KafkaWriterSample
. Detta ämne finns redan i Striims interna Kafka-instans. Om du använder en extern Kafka-instans måste du skapa ämnet innan du kör programmet.
CREATE SOURCE PosSource USING FileReader ( directory:'Samples/PosApp/AppData', wildcard:'PosDataPreview.csv', positionByEOF:false)PARSE USING DSVParser ( header:yes)OUTPUT TO RawStream;CREATE CQ CsvToPosDataINSERT INTO PosDataStreamSELECT TO_STRING(data) as merchantId, TO_DATEF(data,'yyyyMMddHHmmss') as dateTime, TO_DOUBLE(data) as amount, TO_STRING(data) as zipFROM RawStream;CREATE TARGET KW11Sample USING KafkaWriter VERSION '0.11.0'( brokeraddress:'localhost:9092', topic:'KafkaWriterSample')FORMAT USING DSVFormatter ()INPUT FROM PosDataStream;
Du kan verifiera att data har skrivits till Kafka genom att köra exempelprogrammet Kafka Reader.
det första fältet i utmatningen (position
) lagrar information som krävs för att undvika förlorade eller dubbla händelser efter återställning (se Återställa applikationer). Om återställning inte är aktiverad är dess värde NULL.
mon
utgång (se använda kommandot Mon) för mål som använder kafkawriter inkluderar:
-
endast i async-läge, skickade Byte Rate: hur många megabyte per sekund skickades till mäklare
-
i både sync och async läge, skriv byte rate: hur många megabyte per sekund skrevs av mäklare och bekräftelse mottagits av striim
aktivera komprimering
När du aktiverar komprimering i kafkawriter, mäklaren och konsumenten bör hantera de komprimerade satser automatiskt. No additional configuration should be required in Kafka.
To enable batch compression for version 0.8.0, include the compression.codec
property in KafkaConfig. Supported values are gzip
and snappy
. For example:
KafkaConfg:'compression.codec=snappy'
To enable compression for version 0.9, 0.10, or 0.11, include the compression.type
property in KafkaConfig. Supported values are gzip
lz4
snappy
. For example:
KafkaConfig:'compression.type=snappy'