Striim 3.10.3 dokumentation

Kafka forfatter

skriver til et emne i Apache Kafka.

Der er fem versioner af Kafkavirer, 0.8.0, 0.9.0, 0.10.0, 0.11.0 og 2.1.0. Brug den, der svarer til målet Kafka mægler. For eksempel at bruge 0.9.0 er syntaksen CREATE TARGET <name> USING KafkaWriter VERSION '0.9.0'. Hvis du skriver til den interne Kafka-forekomst, skal du bruge 0.11.0.

kendt problem DEV-13039: ansøgning med Kafkaskriter 0.9 eller 0.10 går ned, hvis Kafka broker går offline.

type

standardværdi

noter

string

String

streng

=

String

String

String

Sync

heltal

streng

ejendom

mægler adresse

Kafka config

streng

Angiv eventuelt Kafka producer properties, adskilt af semikoloner. Se nedenstående tabel for detaljer.

Kafka Config Property Separator

Angiv en anden separator, hvis en af de producentegenskabsværdier, der er angivet i KafkaConfig, indeholder et semikolon.

Kafka Config Value Separator

Angiv en anden separator, hvis en af de producentegenskabsværdier, der er angivet i KafkaConfig, indeholder et lige symbol.

Message Header

valgfrit, hvis du bruger Kafka 0.11 eller nyere I async-tilstand eller i synkroniseringstilstand med KafkaConfig batch.size=-1, angiv en eller flere brugerdefinerede overskrifter, der skal føjes til meddelelser som nøgleværdipar. Værdier kan være:

  • et feltnavn fra en in put-strøm af en brugerdefineret type: for eksempel MerchantID=merchantID

  • en statisk streng: for eksempel Company="My Company"

  • en funktion: hvis du f.eks. vil hente kildetabelnavnet fra en indtastningsstrøm, der er output fra en CDC-læser, skal du Table Name=@metadata(TableName)

for at angive flere brugerdefinerede overskrifter skal du adskille dem med semikoloner.

Message Key

valgfrit, hvis du bruger Kafka 0.11 eller nyere I async-tilstand eller i synkroniseringstilstand med KafkaConfig batch.size=-1, angiv en eller flere flere nøgler, der skal føjes til meddelelser som nøgleværdipar. Egenskabsværdien kan være en statisk streng, et eller flere felter fra inputstrømmen eller en kombination af begge. Eksempler:

MessageKey : CName=”Striim”MessageKey : Table=@metadata(TableName); Operation=@metadata(OperationName);key1=@userdata(key1)MessageKey : CityName=City; Zipcode=zipMessageKey : CName=”Striim”;Table=@metadata(TableName); Operation=@metadata(OperationName)

blandt andre muligheder kan du bruge denne egenskab til at understøtte logkomprimering eller til at tillade nedstrøms applikationer at bruge forespørgsler baseret på meddelelsens nyttelast..

tilstand

se Indstilling af kafkaforfatterens tilstandsegenskab: Synkroniser versus Async

parallelle tråde

se Oprettelse af flere forfatterinstanser.

Partitionstast

navnet på et felt i inputstrømmen, hvis værdier bestemmer, hvordan begivenheder fordeles mellem flere partitioner. Begivenheder med den samme partitionsnøglefeltværdi skrives til den samme partition.

Hvis inputstrømmen er af en hvilken som helst type undtagen vevent, skal du angive navnet på et af dens felter.

Hvis inputstrømmen er af typen vevent, skal du angive et felt i METADATAKORTET (se HP NonStop reader vevent-felter, vevent-felter, oraclereader vevent-felter) ved hjælp af syntaksen @METADATA(<field name>) eller et felt i USERDATA-kortet (se Tilføjelse af brugerdefinerede data til vevent-strømme) ved hjælp af syntaksen @METADATA(<field name>)eller et felt i USERDATA-kortet (se Tilføjelse af brugerdefinerede data til vevent-strømme) ved hjælp af syntaksen @USERDATA(<field name>).

emne

String

det eksisterende Kafka-emne, der skal skrives til (oprettes ikke, hvis det ikke findes). Hvis mere end en Kafka-forfatter skriver til det samme emne, understøttes gendannelse ikke (se Gendannelse af applikationer. (Gendannelse understøttes ved brug af parallelle tråde.)

denne adapter har et valg af formattere. Se understøttede forfatter-formatterkombinationer for at få flere oplysninger.

bemærkninger til kafkaconfig-ejendommen

med de undtagelser, der er angivet i nedenstående tabel, kan du angive enhver Kafka-producentejendom i KafkaConfig.

Kafka producer property

notes

acks

  • in sync mode, may be set to 1 or all

  • in async mode, may be set to 0, 1, or all

batch.size

linger.ms

forsøg

  • i synkroniseringstilstand for at forhindre begivenheder, der ikke er i orden, vil producentegenskaberne, der er angivet i Kafka med, være uændrede og ignoreres, og Striim håndterer disse internt.

  • i async-tilstand opdaterer Striim Kafka-producentegenskaberne, og disse håndteres af Kafka.

  • i synkroniseringstilstand kan du indstillebatch.size=-1 for at skrive en begivenhed pr. Dette vil alvorligt forringe ydeevnen, så det anbefales ikke i et produktionsmiljø. Med denne indstilling svarer meddelelser til dem i async-tilstand.

aktiver.idempotens

når du bruger version 2.1.0 og async-tilstand, skal du indstille til sand for at skrive begivenheder i rækkefølge (Se

nøgle.deserialisator

værdi er altid org.apache.Kafka.fælles.serialisering.Bytearraydeserialisator, kan ikke tilsidesættes af KafkaConfig

internt påberåber Kafkaskriter KafkaConsumer til forskellige formål, og advarslen fra forbrugerens API på grund af passering af kafkaconfig egenskaber kan sikkert ignoreres. Se Konfiguration af Kafka for at få flere oplysninger om Kafka producer properties.

Kafkaskriterprøveapplikation

følgende prøvekode skriver data fraPosDataPreview.csvtil Kafka-emnetKafkaWriterSample. Dette emne findes allerede i Striims interne Kafka-instans. Hvis du bruger en ekstern Kafka-forekomst, skal du oprette emnet, før du kører applikationen.

CREATE SOURCE PosSource USING FileReader ( directory:'Samples/PosApp/AppData', wildcard:'PosDataPreview.csv', positionByEOF:false)PARSE USING DSVParser ( header:yes)OUTPUT TO RawStream;CREATE CQ CsvToPosDataINSERT INTO PosDataStreamSELECT TO_STRING(data) as merchantId, TO_DATEF(data,'yyyyMMddHHmmss') as dateTime, TO_DOUBLE(data) as amount, TO_STRING(data) as zipFROM RawStream;CREATE TARGET KW11Sample USING KafkaWriter VERSION '0.11.0'( brokeraddress:'localhost:9092', topic:'KafkaWriterSample')FORMAT USING DSVFormatter ()INPUT FROM PosDataStream;

Du kan kontrollere, at data blev skrevet til Kafka ved at køre Kafka Reader sample-applikationen.

det første felt i output (position) gemmer oplysninger, der kræves for at undgå mistede eller duplikerede begivenheder efter gendannelse (se Gendannelse af applikationer). Hvis gendannelse ikke er aktiveret, er dens værdi NULL.

mon output (Se brug af Mon-kommandoen) for mål, der bruger kafkavirer, inkluderer:

  • kun i Async-tilstand, sendt Bytes Rate: hvor mange megabyte per sekund blev sendt til mæglerne

  • i både synkronisering og sekund blev skrevet af mæglerne og bekræftelse modtaget af Striim

aktivering af komprimering

når du aktiverer komprimering i kafkavirer, bør mægleren og forbrugeren håndtere de komprimerede batches automatisk. No additional configuration should be required in Kafka.

To enable batch compression for version 0.8.0, include the compression.codec property in KafkaConfig. Supported values are gzip and snappy. For example:

KafkaConfg:'compression.codec=snappy'

To enable compression for version 0.9, 0.10, or 0.11, include the compression.type property in KafkaConfig. Supported values are gziplz4snappy. For example:

KafkaConfig:'compression.type=snappy'



Skriv et svar

Din e-mailadresse vil ikke blive publiceret.