Striim 3.10.3 documentation

Kafka Writer

Skriver Til Et emne I Apache Kafka.Det finnes fem Versjoner Av KafkaWriter, 0.8.0, 0.9.0, 0.10.0, 0.11.0 og 2.1.0. Bruk den som tilsvarer Målet Kafka megler. Hvis du for eksempel vil bruke 0.9.0, er syntaksen CREATE TARGET <name> USING KafkaWriter VERSION '0.9.0'. Hvis du skriver til Den interne Kafka-forekomsten, bruk 0.11.0.

Kjent problem DEV-13039: søknad Med KafkaWriter 0.9 eller 0.10 krasjer hvis Kafka megler går offline.

Streng

Streng

=

String

megler adresse

kafka config

streng

angi eventuelt kafka produsentegenskaper, atskilt med semikolon. Se tabellen nedenfor for detaljer.

Kafka Config Property Separator

Angi en annen separator hvis en av produsentegenskapsverdiene angitt i KafkaConfig inneholder et semikolon.

Kafka Config Value Separator

Angi et annet skilletegn hvis en av produsentegenskapsverdiene som er angitt i KafkaConfig, inneholder et likt symbol.

Meldingshode

Eventuelt, hvis Du bruker Kafka 0.11 eller nyere i asynkron-modus eller i synkroniseringsmodus med KafkaConfig batch.size=-1 angir du ett eller flere tilpassede overskrifter som skal legges til meldinger som nøkkelverdipar. Verdier kan være:

  • et feltnavn fra en i sette strøm av en brukerdefinert type: for eksempel MerchantID=merchantID

  • en statisk streng: for eksempel Company="My Company"

  • en funksjon: hvis du for eksempel vil hente kildetabellnavnet fra En WAEvent – inndatastrøm som er resultatet AV EN CDC-leser, Table Name=@metadata(TableName)

hvis du vil angi flere egendefinerte overskrifter, skiller du dem med semikolon.Hvis Du bruker Kafka 0.11 eller senere i asynkron-modus, eller i synkroniseringsmodus med KafkaConfig batch.size=-1, kan du angi en eller flere taster for Å legges til meldinger som nøkkelverdipar. Egenskapsverdien kan være en statisk streng, ett eller flere felt fra inndatastrømmen, eller en kombinasjon av begge. Eksempler:

MessageKey : CName=”Striim”MessageKey : Table=@metadata(TableName); Operation=@metadata(OperationName);key1=@userdata(key1)MessageKey : CityName=City; Zipcode=zipMessageKey : CName=”Striim”;Table=@metadata(TableName); Operation=@metadata(OperationName)

du kan blant annet bruke denne egenskapen til å støtte loggkomprimering eller til å tillate nedstrømsprogrammer å bruke spørringer basert på meldings nyttelast..

se Innstilling KafkaWriter modus egenskap: synkroniser mot async

Parallelle Tråder

Se Opprette flere forfatterforekomster.

Partisjonsnøkkel

navnet på et felt i inndatastrømmen hvis verdier bestemmer hvordan hendelser fordeles mellom flere partisjoner. Hendelser med samme partition key-feltverdi vil bli skrevet til samme partisjon.

hvis inndatastrømmen er av en hvilken som helst type unntatt WAEvent, angir du navnet på ett av feltene.

hvis inndatastrømmen er Av Typen WAEvent, angir Du et felt I METADATAKARTET (SE HP NonStop reader WAEvent-felt, MySQLReader WAEvent-felt, OracleReader WAEvent-felt eller MS SQL Reader WAEvent-felt) ved hjelp av syntaksen @METADATA(<field name>), eller et felt I userdata-kartet (Se Legge til brukerdefinerte data I waevent-strømmer), ved hjelp av syntaksen @USERDATA(<field name>).

Emne

Det Eksisterende Kafka-emnet som skal skrives til (vil ikke bli opprettet hvis det ikke finnes). Hvis Mer enn En Kafka Writer skriver til samme emne, er recovery støttes ikke (se Gjenopprette programmer. (Recovery støttes ved Bruk Av Parallelle Tråder.)

denne adapteren har et utvalg av formatere. Se Støttede writer-formatter kombinasjoner for mer informasjon.

Merknader om KafkaConfig-egenskapen

med unntakene som er angitt i tabellen nedenfor, kan Du angi hvilken Som Helst Kafka-produsenteiendom i KafkaConfig.

  • i synkroniseringsmodus, for å forhindre hendelser som ikke er i orden, vil produsentegenskapene satt i Kafka med være uendret og ignorert, Og Striim vil håndtere disse internt.

  • I async-modus vil Striim oppdatere Kafkas produsentegenskaper, og disse vil bli håndtert av Kafka.

  • i synkroniseringsmodus kan du angi batch.size=-1 for å skrive en hendelse per Kafka-melding. Dette vil alvorlig forringe ytelsen, så det anbefales ikke i et produksjonsmiljø. Med denne innstillingen vil meldinger være lik de i async-modus.

Kafka producer property

notes

acks

  • in sync mode, may be set to 1 or all

  • in async mode, may be set to 0, 1, or all

batch.size

linger.ms

prøver på nytt

aktiver.idempotence

når du bruker versjon 2.1.0 og async-modus, sett til true for å skrive hendelser i rekkefølge (se

– tasten.deserializer

verdien er alltid org.apache.kafka.vanlig.Serialization.ByteArrayDeserializer, kan ikke overstyres av KafkaConfig

kafkaconfig egenskaper kan trygt ignoreres. Se Konfigurere Kafka for mer informasjon om Kafka producer properties.

KafkaWriter sample application

følgende eksempelkode skriver data fra PosDataPreview.csvTil Kafka emnet KafkaWriterSample. Dette emnet finnes allerede i Striims interne Kafka-forekomst. Hvis du bruker en ekstern Kafka-forekomst, må du opprette emnet før du kjører programmet.

CREATE SOURCE PosSource USING FileReader ( directory:'Samples/PosApp/AppData', wildcard:'PosDataPreview.csv', positionByEOF:false)PARSE USING DSVParser ( header:yes)OUTPUT TO RawStream;CREATE CQ CsvToPosDataINSERT INTO PosDataStreamSELECT TO_STRING(data) as merchantId, TO_DATEF(data,'yyyyMMddHHmmss') as dateTime, TO_DOUBLE(data) as amount, TO_STRING(data) as zipFROM RawStream;CREATE TARGET KW11Sample USING KafkaWriter VERSION '0.11.0'( brokeraddress:'localhost:9092', topic:'KafkaWriterSample')FORMAT USING DSVFormatter ()INPUT FROM PosDataStream;

du kan bekrefte at data ble skrevet Til Kafka ved å kjøre Kafka Reader sample-programmet.

det første feltet i utdataene (position) lagrer informasjon som kreves for å unngå tapte eller dupliserte hendelser etter gjenoppretting (se Gjenopprette programmer). Hvis gjenoppretting ikke er aktivert, ER VERDIEN NULL.

monoutput (Se Bruke MON kommandoen) for mål som bruker KafkaWriter inkluderer:

  • I Async-modus, Sendte Byte Rate: Hvor Mange megabyte per sekund ble sendt til meglere

  • i både sync og async antall megabyte per sekund ble skrevet av meglere og bekreftelse mottatt av striim

Aktivere komprimering

når du aktiverer komprimering i kafkawriter, megler og forbruker skal håndtere de komprimerte grupper automatisk. No additional configuration should be required in Kafka.

To enable batch compression for version 0.8.0, include the compression.codec property in KafkaConfig. Supported values are gzip and snappy. For example:

KafkaConfg:'compression.codec=snappy'

To enable compression for version 0.9, 0.10, or 0.11, include the compression.type property in KafkaConfig. Supported values are gziplz4snappy. For example:

KafkaConfig:'compression.type=snappy'



Legg igjen en kommentar

Din e-postadresse vil ikke bli publisert.