Striim 3.10.3 dokumentation
Kafka forfatter
skriver til et emne i Apache Kafka.
Der er fem versioner af Kafkavirer, 0.8.0, 0.9.0, 0.10.0, 0.11.0 og 2.1.0. Brug den, der svarer til målet Kafka mægler. For eksempel at bruge 0.9.0 er syntaksen CREATE TARGET <name> USING KafkaWriter VERSION '0.9.0'
. Hvis du skriver til den interne Kafka-forekomst, skal du bruge 0.11.0
.
kendt problem DEV-13039: ansøgning med Kafkaskriter 0.9 eller 0.10 går ned, hvis Kafka broker går offline.
ejendom |
|||
---|---|---|---|
mægler adresse |
|||
Kafka config |
streng |
Angiv eventuelt Kafka producer properties, adskilt af semikoloner. Se nedenstående tabel for detaljer. |
|
Kafka Config Property Separator |
Angiv en anden separator, hvis en af de producentegenskabsværdier, der er angivet i KafkaConfig, indeholder et semikolon. |
||
Kafka Config Value Separator |
Angiv en anden separator, hvis en af de producentegenskabsværdier, der er angivet i KafkaConfig, indeholder et lige symbol. |
||
Message Header |
valgfrit, hvis du bruger Kafka 0.11 eller nyere I async-tilstand eller i synkroniseringstilstand med KafkaConfig
for at angive flere brugerdefinerede overskrifter skal du adskille dem med semikoloner. |
||
Message Key |
valgfrit, hvis du bruger Kafka 0.11 eller nyere I async-tilstand eller i synkroniseringstilstand med KafkaConfig MessageKey : CName=”Striim”MessageKey : Table=@metadata(TableName); Operation=@metadata(OperationName);key1=@userdata(key1)MessageKey : CityName=City; Zipcode=zipMessageKey : CName=”Striim”;Table=@metadata(TableName); Operation=@metadata(OperationName) blandt andre muligheder kan du bruge denne egenskab til at understøtte logkomprimering eller til at tillade nedstrøms applikationer at bruge forespørgsler baseret på meddelelsens nyttelast.. |
||
tilstand |
se Indstilling af kafkaforfatterens tilstandsegenskab: Synkroniser versus Async |
||
parallelle tråde |
se Oprettelse af flere forfatterinstanser. |
||
Partitionstast |
navnet på et felt i inputstrømmen, hvis værdier bestemmer, hvordan begivenheder fordeles mellem flere partitioner. Begivenheder med den samme partitionsnøglefeltværdi skrives til den samme partition. Hvis inputstrømmen er af en hvilken som helst type undtagen vevent, skal du angive navnet på et af dens felter. Hvis inputstrømmen er af typen vevent, skal du angive et felt i METADATAKORTET (se HP NonStop reader vevent-felter, vevent-felter, oraclereader vevent-felter) ved hjælp af syntaksen |
||
emne |
String |
det eksisterende Kafka-emne, der skal skrives til (oprettes ikke, hvis det ikke findes). Hvis mere end en Kafka-forfatter skriver til det samme emne, understøttes gendannelse ikke (se Gendannelse af applikationer. (Gendannelse understøttes ved brug af parallelle tråde.) |
denne adapter har et valg af formattere. Se understøttede forfatter-formatterkombinationer for at få flere oplysninger.
bemærkninger til kafkaconfig-ejendommen
med de undtagelser, der er angivet i nedenstående tabel, kan du angive enhver Kafka-producentejendom i KafkaConfig.
Kafka producer property |
notes |
---|---|
acks |
|
batch.size linger.ms forsøg |
|
aktiver.idempotens |
når du bruger version 2.1.0 og async-tilstand, skal du indstille til sand for at skrive begivenheder i rækkefølge (Se |
nøgle.deserialisator |
værdi er altid org.apache.Kafka.fælles.serialisering.Bytearraydeserialisator, kan ikke tilsidesættes af KafkaConfig |
internt påberåber Kafkaskriter KafkaConsumer til forskellige formål, og advarslen fra forbrugerens API på grund af passering af kafkaconfig egenskaber kan sikkert ignoreres. Se Konfiguration af Kafka for at få flere oplysninger om Kafka producer properties.
Kafkaskriterprøveapplikation
følgende prøvekode skriver data fraPosDataPreview.csv
til Kafka-emnetKafkaWriterSample
. Dette emne findes allerede i Striims interne Kafka-instans. Hvis du bruger en ekstern Kafka-forekomst, skal du oprette emnet, før du kører applikationen.
CREATE SOURCE PosSource USING FileReader ( directory:'Samples/PosApp/AppData', wildcard:'PosDataPreview.csv', positionByEOF:false)PARSE USING DSVParser ( header:yes)OUTPUT TO RawStream;CREATE CQ CsvToPosDataINSERT INTO PosDataStreamSELECT TO_STRING(data) as merchantId, TO_DATEF(data,'yyyyMMddHHmmss') as dateTime, TO_DOUBLE(data) as amount, TO_STRING(data) as zipFROM RawStream;CREATE TARGET KW11Sample USING KafkaWriter VERSION '0.11.0'( brokeraddress:'localhost:9092', topic:'KafkaWriterSample')FORMAT USING DSVFormatter ()INPUT FROM PosDataStream;
Du kan kontrollere, at data blev skrevet til Kafka ved at køre Kafka Reader sample-applikationen.
det første felt i output (position
) gemmer oplysninger, der kræves for at undgå mistede eller duplikerede begivenheder efter gendannelse (se Gendannelse af applikationer). Hvis gendannelse ikke er aktiveret, er dens værdi NULL.
mon
output (Se brug af Mon-kommandoen) for mål, der bruger kafkavirer, inkluderer:
-
kun i Async-tilstand, sendt Bytes Rate: hvor mange megabyte per sekund blev sendt til mæglerne
-
i både synkronisering og sekund blev skrevet af mæglerne og bekræftelse modtaget af Striim
aktivering af komprimering
når du aktiverer komprimering i kafkavirer, bør mægleren og forbrugeren håndtere de komprimerede batches automatisk. No additional configuration should be required in Kafka.
To enable batch compression for version 0.8.0, include the compression.codec
property in KafkaConfig. Supported values are gzip
and snappy
. For example:
KafkaConfg:'compression.codec=snappy'
To enable compression for version 0.9, 0.10, or 0.11, include the compression.type
property in KafkaConfig. Supported values are gzip
lz4
snappy
. For example:
KafkaConfig:'compression.type=snappy'