Striim 3.10.3 documentatie
Kafka Writer
schrijft naar een onderwerp in Apache Kafka.
Er zijn vijf versies van KafkaWriter, 0.8.0, 0.9.0, 0.10.0, 0.11.0 en 2.1.0. Gebruik degene die overeenkomt met het doel Kafka broker. Om bijvoorbeeld 0.9.0 te gebruiken, is de syntaxis CREATE TARGET <name> USING KafkaWriter VERSION '0.9.0'
. Als u naar de interne Kafka instantie schrijft, gebruik dan 0.11.0
.
bekend probleem DEV-13039: toepassing met KafkaWriter 0.9 of 0.10 crasht als Kafka broker offline gaat.
goederen |
type |
standaard waarde |
opmerkingen |
---|---|---|---|
Adres Makelaar |
String |
||
Kafka-Config |
String |
Optioneel, opgeven Kafka producent eigenschappen, gescheiden door puntkomma ‘ s. Zie onderstaande tabel voor details. |
|
Kafka Config Eigenschappenscheidingsteken |
|||
Kafka Config Value Separator |
Geef een ander scheidingsteken op als een van de in KafkaConfig gespecificeerde eigenschapswaarden van de producent een gelijk symbool bevat. |
||
tekenreeks |
optioneel, bij gebruik van Kafka 0.11 of hoger in async-modus, of in synchronisatie-modus met KafkaConfig
om meerdere aangepaste headers op te geven, worden gescheiden met puntkomma ‘ s. |
||
Message Key |
|||
String |
Sync |
zie de eigenschap modus van KafkaWriter instellen: synchroniseren versus async |
|
de naam van een veld in de invoerstroom waarvan de waarden bepalen hoe gebeurtenissen over meerdere partities worden verdeeld. Gebeurtenissen met dezelfde veldwaarde van de partitiesleutel zullen naar dezelfde partitie worden geschreven. als de invoerstroom van een ander type is dan WAEvent, geef dan de naam van een van de velden op. Als de input stream is van de WAEvent type, een veld opgeven in de METADATA-kaart (zie HP NonStop reader WAEvent velden, MySQLReader WAEvent velden, OracleReader WAEvent velden, of MS SQL-Reader WAEvent velden) met de syntax |
|||
onderwerp |
het bestaande Kafka-onderwerp om naar te schrijven (wordt niet aangemaakt als het niet bestaat). Als meer dan één Kafka Writer naar hetzelfde onderwerp schrijft, wordt herstel niet ondersteund (zie toepassingen herstellen. (Herstel wordt ondersteund bij het gebruik van parallelle Threads.) |
Deze adapter heeft een keuze uit opmaak. Zie ondersteunde writer-formattercombinaties voor meer informatie.
opmerkingen over de kafkaconfig-eigenschap
met de uitzonderingen in de volgende tabel kunt u elke Kafka-producent-eigenschap opgeven in KafkaConfig.
Kafka producer property |
notes |
---|---|
acks |
|
batch.size linger.ms probeert |
|
inschakelen.idempotence |
bij gebruik van versie 2.1.0 en async-modus, ingesteld op true om gebeurtenissen in volgorde te schrijven (zie |
sleutel.deserializer |
waarde is altijd org.Apache.Kafka.gemeenschappelijk.serialisatie.ByteArrayDeserializer, kan niet worden overschreven door KafkaConfig |
intern roept KafkaWriter KafkaConsumer aan voor verschillende doeleinden, en de waarschuwing van de consumenten API vanwege het doorgeven van KafkaConfig-eigenschappen kan wees veilig genegeerd. Zie Kafka configureren voor meer informatie over de eigenschappen van Kafka-producenten.
kafkawriter voorbeeldtoepassing
de volgende voorbeeldcode schrijft gegevens van PosDataPreview.csv
naar het Kafka-onderwerp KafkaWriterSample
. Dit onderwerp bestaat al in het interne Kafka-exemplaar van Striim. Als u een externe Kafka-instantie gebruikt, moet u het onderwerp maken voordat u de toepassing uitvoert.
CREATE SOURCE PosSource USING FileReader ( directory:'Samples/PosApp/AppData', wildcard:'PosDataPreview.csv', positionByEOF:false)PARSE USING DSVParser ( header:yes)OUTPUT TO RawStream;CREATE CQ CsvToPosDataINSERT INTO PosDataStreamSELECT TO_STRING(data) as merchantId, TO_DATEF(data,'yyyyMMddHHmmss') as dateTime, TO_DOUBLE(data) as amount, TO_STRING(data) as zipFROM RawStream;CREATE TARGET KW11Sample USING KafkaWriter VERSION '0.11.0'( brokeraddress:'localhost:9092', topic:'KafkaWriterSample')FORMAT USING DSVFormatter ()INPUT FROM PosDataStream;
u kunt controleren of gegevens naar Kafka zijn geschreven door de Kafka Reader sample applicatie uit te voeren.
het eerste veld in de uitvoer (position
) slaat informatie op die nodig is om verloren of dubbele gebeurtenissen na herstel te voorkomen (zie toepassingen herstellen). Als herstel niet is ingeschakeld, is de waarde NULL.
mon
output (zie Behulp van de MON-opdracht) voor doelen met KafkaWriter is inclusief:
-
in de asynchrone modus, Verzonden Bytes Tarief: hoeveel megabyte per seconde werden verzonden naar de makelaars
-
in zowel synchrone en asynchrone modus, Schrijf Bytes Tarief: hoeveel megabyte per seconde werden geschreven door de makelaars en erkenning ontvangen door Striim
het Inschakelen van compressie
Wanneer u de compressie inschakelen in KafkaWriter, de makelaar en de consument moet omgaan met de gecomprimeerde batches automatisch. No additional configuration should be required in Kafka.
To enable batch compression for version 0.8.0, include the compression.codec
property in KafkaConfig. Supported values are gzip
and snappy
. For example:
KafkaConfg:'compression.codec=snappy'
To enable compression for version 0.9, 0.10, or 0.11, include the compression.type
property in KafkaConfig. Supported values are gzip
lz4
snappy
. For example:
KafkaConfig:'compression.type=snappy'