Striim 3.10.3 documentatie

Kafka Writer

schrijft naar een onderwerp in Apache Kafka.

Er zijn vijf versies van KafkaWriter, 0.8.0, 0.9.0, 0.10.0, 0.11.0 en 2.1.0. Gebruik degene die overeenkomt met het doel Kafka broker. Om bijvoorbeeld 0.9.0 te gebruiken, is de syntaxis CREATE TARGET <name> USING KafkaWriter VERSION '0.9.0'. Als u naar de interne Kafka instantie schrijft, gebruik dan 0.11.0.

bekend probleem DEV-13039: toepassing met KafkaWriter 0.9 of 0.10 crasht als Kafka broker offline gaat.

String

Geef een ander scheidingsteken op als een van de in KafkaConfig gespecificeerde eigenschappenwaarden een puntkomma bevat.

String

=

berichtkop

String

bij gebruik van Kafka 0.11 of hoger in async-modus, of in synchronisatiemodus met KafkaConfig batch.size=-1, geef een of meer sleutels op die als sleutel-waardeparen aan berichten moeten worden toegevoegd. De eigenschapswaarde kan een statische tekenreeks zijn, een of meer velden uit de invoerstroom, of een combinatie van beide. Voorbeelden:

MessageKey : CName=”Striim”MessageKey : Table=@metadata(TableName); Operation=@metadata(OperationName);key1=@userdata(key1)MessageKey : CityName=City; Zipcode=zipMessageKey : CName=”Striim”;Table=@metadata(TableName); Operation=@metadata(OperationName)

naast andere mogelijkheden kunt u deze eigenschap gebruiken om logcompactie te ondersteunen of om downstreamtoepassingen toe te staan query ‘ s te gebruiken op basis van de payload van het bericht..

Mode

parallelle Threads

Integer

zie meerdere writer-instanties maken.

partitiesleutel

String

String

goederen

type

standaard waarde

opmerkingen

Adres Makelaar

String

Kafka-Config

String

Optioneel, opgeven Kafka producent eigenschappen, gescheiden door puntkomma ‘ s. Zie onderstaande tabel voor details.

Kafka Config Eigenschappenscheidingsteken

Kafka Config Value Separator

Geef een ander scheidingsteken op als een van de in KafkaConfig gespecificeerde eigenschapswaarden van de producent een gelijk symbool bevat.

tekenreeks

optioneel, bij gebruik van Kafka 0.11 of hoger in async-modus, of in synchronisatie-modus met KafkaConfig batch.size=-1, Specificeer een of meer aangepaste headers die als sleutel-waardeparen aan berichten moeten worden toegevoegd. De waarden kunnen zijn:

  • een veldnaam uit een in put-stroom van een door de gebruiker gedefinieerd type: bijvoorbeeld MerchantID=merchantID

  • een statische tekenreeks: bijvoorbeeld Company="My Company"

  • a functie: om bijvoorbeeld de naam van de brontabel te verkrijgen van een WAEvent-invoerstroom die de uitvoer is van een CDC-lezer, moet Table Name=@metadata(TableName)

om meerdere aangepaste headers op te geven, worden gescheiden met puntkomma ‘ s.

Message Key

String

Sync

zie de eigenschap modus van KafkaWriter instellen: synchroniseren versus async

de naam van een veld in de invoerstroom waarvan de waarden bepalen hoe gebeurtenissen over meerdere partities worden verdeeld. Gebeurtenissen met dezelfde veldwaarde van de partitiesleutel zullen naar dezelfde partitie worden geschreven.

als de invoerstroom van een ander type is dan WAEvent, geef dan de naam van een van de velden op.

Als de input stream is van de WAEvent type, een veld opgeven in de METADATA-kaart (zie HP NonStop reader WAEvent velden, MySQLReader WAEvent velden, OracleReader WAEvent velden, of MS SQL-Reader WAEvent velden) met de syntax @METADATA(<field name>) of een veld in de USERDATA map (zie het Toevoegen van user-defined data te WAEvent streams) met de syntax @USERDATA(<field name>).

onderwerp

het bestaande Kafka-onderwerp om naar te schrijven (wordt niet aangemaakt als het niet bestaat). Als meer dan één Kafka Writer naar hetzelfde onderwerp schrijft, wordt herstel niet ondersteund (zie toepassingen herstellen. (Herstel wordt ondersteund bij het gebruik van parallelle Threads.)

Deze adapter heeft een keuze uit opmaak. Zie ondersteunde writer-formattercombinaties voor meer informatie.

opmerkingen over de kafkaconfig-eigenschap

met de uitzonderingen in de volgende tabel kunt u elke Kafka-producent-eigenschap opgeven in KafkaConfig.

Kafka producer property

notes

acks

  • in sync mode, may be set to 1 or all

  • in async mode, may be set to 0, 1, or all

batch.size

linger.ms

probeert

  • in synchronisatiemodus worden de producenteneigenschappen die in Kafka zijn ingesteld ongewijzigd en genegeerd, en Striim zal deze intern afhandelen.

  • in async-modus zal Striim de eigenschappen van de Kafka-producent bijwerken en deze zullen door Kafka worden afgehandeld.

  • in synchronisatiemodus kunt u batch.size=-1 Instellen om één gebeurtenis per Kafka-bericht te schrijven. Dit zal de prestaties ernstig verminderen, dus wordt niet aanbevolen in een productieomgeving. Met deze instelling zullen berichten vergelijkbaar zijn met die in async-modus.

inschakelen.idempotence

bij gebruik van versie 2.1.0 en async-modus, ingesteld op true om gebeurtenissen in volgorde te schrijven (zie

sleutel.deserializer

waarde is altijd org.Apache.Kafka.gemeenschappelijk.serialisatie.ByteArrayDeserializer, kan niet worden overschreven door KafkaConfig

intern roept KafkaWriter KafkaConsumer aan voor verschillende doeleinden, en de waarschuwing van de consumenten API vanwege het doorgeven van KafkaConfig-eigenschappen kan wees veilig genegeerd. Zie Kafka configureren voor meer informatie over de eigenschappen van Kafka-producenten.

kafkawriter voorbeeldtoepassing

de volgende voorbeeldcode schrijft gegevens van PosDataPreview.csv naar het Kafka-onderwerp KafkaWriterSample. Dit onderwerp bestaat al in het interne Kafka-exemplaar van Striim. Als u een externe Kafka-instantie gebruikt, moet u het onderwerp maken voordat u de toepassing uitvoert.

CREATE SOURCE PosSource USING FileReader ( directory:'Samples/PosApp/AppData', wildcard:'PosDataPreview.csv', positionByEOF:false)PARSE USING DSVParser ( header:yes)OUTPUT TO RawStream;CREATE CQ CsvToPosDataINSERT INTO PosDataStreamSELECT TO_STRING(data) as merchantId, TO_DATEF(data,'yyyyMMddHHmmss') as dateTime, TO_DOUBLE(data) as amount, TO_STRING(data) as zipFROM RawStream;CREATE TARGET KW11Sample USING KafkaWriter VERSION '0.11.0'( brokeraddress:'localhost:9092', topic:'KafkaWriterSample')FORMAT USING DSVFormatter ()INPUT FROM PosDataStream;

u kunt controleren of gegevens naar Kafka zijn geschreven door de Kafka Reader sample applicatie uit te voeren.

het eerste veld in de uitvoer (position) slaat informatie op die nodig is om verloren of dubbele gebeurtenissen na herstel te voorkomen (zie toepassingen herstellen). Als herstel niet is ingeschakeld, is de waarde NULL.

mon output (zie Behulp van de MON-opdracht) voor doelen met KafkaWriter is inclusief:

  • in de asynchrone modus, Verzonden Bytes Tarief: hoeveel megabyte per seconde werden verzonden naar de makelaars

  • in zowel synchrone en asynchrone modus, Schrijf Bytes Tarief: hoeveel megabyte per seconde werden geschreven door de makelaars en erkenning ontvangen door Striim

het Inschakelen van compressie

Wanneer u de compressie inschakelen in KafkaWriter, de makelaar en de consument moet omgaan met de gecomprimeerde batches automatisch. No additional configuration should be required in Kafka.

To enable batch compression for version 0.8.0, include the compression.codec property in KafkaConfig. Supported values are gzip and snappy. For example:

KafkaConfg:'compression.codec=snappy'

To enable compression for version 0.9, 0.10, or 0.11, include the compression.type property in KafkaConfig. Supported values are gziplz4snappy. For example:

KafkaConfig:'compression.type=snappy'



Geef een antwoord

Het e-mailadres wordt niet gepubliceerd.