Striim 3.10.3 documentation
Kafka Writer
Skriver Til Et emne I Apache Kafka.Det finnes fem Versjoner Av KafkaWriter, 0.8.0, 0.9.0, 0.10.0, 0.11.0 og 2.1.0. Bruk den som tilsvarer Målet Kafka megler. Hvis du for eksempel vil bruke 0.9.0, er syntaksen CREATE TARGET <name> USING KafkaWriter VERSION '0.9.0'
. Hvis du skriver til Den interne Kafka-forekomsten, bruk 0.11.0
.
Kjent problem DEV-13039: søknad Med KafkaWriter 0.9 eller 0.10 krasjer hvis Kafka megler går offline.
megler adresse |
||||
kafka config |
streng |
angi eventuelt kafka produsentegenskaper, atskilt med semikolon. Se tabellen nedenfor for detaljer. |
||
Kafka Config Property Separator |
Angi en annen separator hvis en av produsentegenskapsverdiene angitt i KafkaConfig inneholder et semikolon. |
|||
Kafka Config Value Separator |
Angi et annet skilletegn hvis en av produsentegenskapsverdiene som er angitt i KafkaConfig, inneholder et likt symbol. |
|||
Meldingshode |
Eventuelt, hvis Du bruker Kafka 0.11 eller nyere i asynkron-modus eller i synkroniseringsmodus med KafkaConfig
hvis du vil angi flere egendefinerte overskrifter, skiller du dem med semikolon.Hvis Du bruker Kafka 0.11 eller senere i asynkron-modus, eller i synkroniseringsmodus med KafkaConfig MessageKey : CName=”Striim”MessageKey : Table=@metadata(TableName); Operation=@metadata(OperationName);key1=@userdata(key1)MessageKey : CityName=City; Zipcode=zipMessageKey : CName=”Striim”;Table=@metadata(TableName); Operation=@metadata(OperationName) du kan blant annet bruke denne egenskapen til å støtte loggkomprimering eller til å tillate nedstrømsprogrammer å bruke spørringer basert på meldings nyttelast.. |
se Innstilling KafkaWriter modus egenskap: synkroniser mot async |
||
Parallelle Tråder |
Se Opprette flere forfatterforekomster. |
|||
Partisjonsnøkkel |
navnet på et felt i inndatastrømmen hvis verdier bestemmer hvordan hendelser fordeles mellom flere partisjoner. Hendelser med samme partition key-feltverdi vil bli skrevet til samme partisjon. hvis inndatastrømmen er av en hvilken som helst type unntatt WAEvent, angir du navnet på ett av feltene. hvis inndatastrømmen er Av Typen WAEvent, angir Du et felt I METADATAKARTET (SE HP NonStop reader WAEvent-felt, MySQLReader WAEvent-felt, OracleReader WAEvent-felt eller MS SQL Reader WAEvent-felt) ved hjelp av syntaksen |
|||
Emne |
Det Eksisterende Kafka-emnet som skal skrives til (vil ikke bli opprettet hvis det ikke finnes). Hvis Mer enn En Kafka Writer skriver til samme emne, er recovery støttes ikke (se Gjenopprette programmer. (Recovery støttes ved Bruk Av Parallelle Tråder.) |
denne adapteren har et utvalg av formatere. Se Støttede writer-formatter kombinasjoner for mer informasjon.
Merknader om KafkaConfig-egenskapen
med unntakene som er angitt i tabellen nedenfor, kan Du angi hvilken Som Helst Kafka-produsenteiendom i KafkaConfig.
Kafka producer property |
notes |
---|---|
acks |
|
batch.size linger.ms prøver på nytt |
|
aktiver.idempotence |
når du bruker versjon 2.1.0 og async-modus, sett til true for å skrive hendelser i rekkefølge (se |
– tasten.deserializer |
verdien er alltid org.apache.kafka.vanlig.Serialization.ByteArrayDeserializer, kan ikke overstyres av KafkaConfig |
kafkaconfig egenskaper kan trygt ignoreres. Se Konfigurere Kafka for mer informasjon om Kafka producer properties.
KafkaWriter sample application
følgende eksempelkode skriver data fra PosDataPreview.csv
Til Kafka emnet KafkaWriterSample
. Dette emnet finnes allerede i Striims interne Kafka-forekomst. Hvis du bruker en ekstern Kafka-forekomst, må du opprette emnet før du kjører programmet.
CREATE SOURCE PosSource USING FileReader ( directory:'Samples/PosApp/AppData', wildcard:'PosDataPreview.csv', positionByEOF:false)PARSE USING DSVParser ( header:yes)OUTPUT TO RawStream;CREATE CQ CsvToPosDataINSERT INTO PosDataStreamSELECT TO_STRING(data) as merchantId, TO_DATEF(data,'yyyyMMddHHmmss') as dateTime, TO_DOUBLE(data) as amount, TO_STRING(data) as zipFROM RawStream;CREATE TARGET KW11Sample USING KafkaWriter VERSION '0.11.0'( brokeraddress:'localhost:9092', topic:'KafkaWriterSample')FORMAT USING DSVFormatter ()INPUT FROM PosDataStream;
du kan bekrefte at data ble skrevet Til Kafka ved å kjøre Kafka Reader sample-programmet.
det første feltet i utdataene (position
) lagrer informasjon som kreves for å unngå tapte eller dupliserte hendelser etter gjenoppretting (se Gjenopprette programmer). Hvis gjenoppretting ikke er aktivert, ER VERDIEN NULL.
mon
output (Se Bruke MON kommandoen) for mål som bruker KafkaWriter inkluderer:
-
I Async-modus, Sendte Byte Rate: Hvor Mange megabyte per sekund ble sendt til meglere
-
i både sync og async antall megabyte per sekund ble skrevet av meglere og bekreftelse mottatt av striim
Aktivere komprimering
når du aktiverer komprimering i kafkawriter, megler og forbruker skal håndtere de komprimerte grupper automatisk. No additional configuration should be required in Kafka.
To enable batch compression for version 0.8.0, include the compression.codec
property in KafkaConfig. Supported values are gzip
and snappy
. For example:
KafkaConfg:'compression.codec=snappy'
To enable compression for version 0.9, 0.10, or 0.11, include the compression.type
property in KafkaConfig. Supported values are gzip
lz4
snappy
. For example:
KafkaConfig:'compression.type=snappy'