Striim 3.10.3 documentation

Kafka Writer

Écrit sur un sujet dans Apache Kafka.

Il existe cinq versions de KafkaWriter, 0.8.0, 0.9.0, 0.10.0, 0.11.0 et 2.1.0. Utilisez celui qui correspond au courtier Kafka cible. Par exemple, pour utiliser 0.9.0, la syntaxe est CREATE TARGET <name> USING KafkaWriter VERSION '0.9.0'. Si vous écrivez dans l’instance interne de Kafka, utilisez 0.11.0.

Problème connu DEV-13039 : l’application avec KafkaWriter 0.9 ou 0.10 se bloque si Kafka broker se déconnecte.

propriété

type

valeur par défaut

notes

Adresse du courtier

Chaîne

Configuration Kafka

Chaîne

Spécifiez éventuellement les propriétés du producteur Kafka, séparées par des points-virgules. Voir le tableau ci-dessous pour plus de détails.

Séparateur de propriétés de configuration Kafka

Chaîne

Spécifiez un séparateur différent si l’une des valeurs de propriété de producteur spécifiées dans KafkaConfig contient un point-virgule.

Séparateur de valeur de configuration Kafka

Chaîne

=

Spécifiez un séparateur différent si l’une des valeurs de propriété de producteur spécifiées dans KafkaConfig contient un symbole égal.

En-tête de message

Chaîne

Éventuellement, si vous utilisez Kafka 0.11 ou une version ultérieure en mode asynchrone, ou en mode synchronisation avec KafkaConfig batch.size=-1, spécifiez un ou plusieurs en-têtes personnalisés à ajouter aux messages en tant que paires clé-valeur. Les valeurs peuvent être :

  • un nom de champ d’un flux in put d’un type défini par l’utilisateur : par exemple, MerchantID=merchantID

  • une chaîne statique : par exemple, Company="My Company"

  • une fonction: par exemple, pour obtenir le nom de la table source à partir d’un flux d’entrée WAEvent qui est la sortie d’un lecteur CDC, Table Name=@metadata(TableName)

Pour spécifier plusieurs en-têtes personnalisés, séparez-les par des points-virgules.

Clé de message

Chaîne

Si vous utilisez Kafka 0.11 ou une version ultérieure en mode asynchrone, ou en mode de synchronisation avec KafkaConfig batch.size=-1, spécifiez une ou plusieurs clés à ajouter aux messages en tant que paires clé-valeur . La valeur de la propriété peut être une chaîne statique, un ou plusieurs champs du flux d’entrée, ou une combinaison des deux. Exemples :

MessageKey : CName=”Striim”MessageKey : Table=@metadata(TableName); Operation=@metadata(OperationName);key1=@userdata(key1)MessageKey : CityName=City; Zipcode=zipMessageKey : CName=”Striim”;Table=@metadata(TableName); Operation=@metadata(OperationName)

Entre autres possibilités, vous pouvez utiliser cette propriété pour prendre en charge le compactage des journaux ou pour permettre aux applications en aval d’utiliser des requêtes basées sur la charge utile du message..

Mode

Chaîne

Sync

voir Définition de la propriété mode de KafkaWriter: sync versus asynchrone

Threads parallèles

Entier

Voir Création de plusieurs instances d’écriture.

Clé de partition

Chaîne

Nom d’un champ du flux d’entrée dont les valeurs déterminent la répartition des événements entre plusieurs partitions. Les événements avec la même valeur de champ de clé de partition seront écrits sur la même partition.

Si le flux d’entrée est de tout type sauf WAEvent, spécifiez le nom de l’un de ses champs.

Si le flux d’entrée est de type WAEvent, spécifiez un champ dans la carte de MÉTADONNÉES (voir Champs WAEvent HP NonStop reader, champs WAEVENT MySQLReader, champs WAEvent OracleReader ou champs WAEvent MS SQL Reader) en utilisant la syntaxe @METADATA(<field name>), ou un champ dans la carte de données UTILISATEUR (voir Ajout de données définies par l’utilisateur aux flux WAEvent), en utilisant la syntaxe @USERDATA(<field name>).

Topic

String

Le sujet Kafka existant sur lequel écrire (ne sera pas créé s’il n’existe pas). Si plusieurs auteurs Kafka écrivent sur le même sujet, la récupération n’est pas prise en charge (voir Récupération d’applications. (La récupération est prise en charge lors de l’utilisation de Threads parallèles.)

Cet adaptateur a un choix de formateurs. Pour plus d’informations, reportez-vous à la section Combinaisons écrivain-formateur prises en charge.

Remarques sur la propriété KafkaConfig

Avec les exceptions indiquées dans le tableau suivant, vous pouvez spécifier n’importe quelle propriété de producteur Kafka dans KafkaConfig.

Kafka producer property

notes

acks

  • in sync mode, may be set to 1 or all

  • in async mode, may be set to 0, 1, or all

batch.size

linger.ms

tente

  • En mode de synchronisation, pour éviter les événements hors service, les propriétés du producteur définies dans Kafka with seront inchangées et ignorées, et Striim les gérera en interne.

  • En mode asynchrone, Striim mettra à jour les propriétés du producteur Kafka et celles-ci seront gérées par Kafka.

  • En mode de synchronisation, vous pouvez définir batch.size=-1 pour écrire un événement par message Kafka. Cela dégradera sérieusement les performances et n’est donc pas recommandé dans un environnement de production. Avec ce paramètre, les messages seront similaires à ceux en mode asynchrone.

activer.idempotence

Lorsque vous utilisez la version 2.1.0 et le mode asynchrone, définissez la valeur true pour écrire les événements dans l’ordre (voir la touche

.la valeur du désérialiseur

est toujours org.Apache.Kafka.commun.sérialisation.ByteArrayDeserializer, ne peut pas être remplacé par KafkaConfig

En interne, KafkaWriter appelle KafkaConsumer à diverses fins, et l’AVERTISSEMENT de l’API consumer dû au passage des propriétés KafkaConfig peut être ignoré en toute sécurité. Voir Configuration de Kafka pour plus d’informations sur les propriétés du producteur Kafka.

Exemple d’application KafkaWriter

L’exemple de code suivant écrit les données de PosDataPreview.csv dans la rubrique Kafka KafkaWriterSample. Ce sujet existe déjà dans l’instance interne de Kafka de Striim. Si vous utilisez une instance Kafka externe, vous devez créer la rubrique avant d’exécuter l’application.

CREATE SOURCE PosSource USING FileReader ( directory:'Samples/PosApp/AppData', wildcard:'PosDataPreview.csv', positionByEOF:false)PARSE USING DSVParser ( header:yes)OUTPUT TO RawStream;CREATE CQ CsvToPosDataINSERT INTO PosDataStreamSELECT TO_STRING(data) as merchantId, TO_DATEF(data,'yyyyMMddHHmmss') as dateTime, TO_DOUBLE(data) as amount, TO_STRING(data) as zipFROM RawStream;CREATE TARGET KW11Sample USING KafkaWriter VERSION '0.11.0'( brokeraddress:'localhost:9092', topic:'KafkaWriterSample')FORMAT USING DSVFormatter ()INPUT FROM PosDataStream;

Vous pouvez vérifier que les données ont été écrites dans Kafka en exécutant l’exemple d’application Kafka Reader.

Le premier champ de la sortie (position) stocke les informations nécessaires pour éviter les événements perdus ou dupliqués après la récupération (voir Récupération des applications). Si la récupération n’est pas activée, sa valeur est NULL.

monla sortie (voir Utilisation de la commande MON) pour les cibles utilisant KafkaWriter comprend:

  • en mode asynchrone uniquement, Taux d’octets envoyés: combien de mégaoctets par seconde ont été envoyés aux courtiers

  • à la fois dans la synchronisation et mode asynchrone, Taux d’octets d’écriture : combien de mégaoctets par seconde ont été écrits par les courtiers et accusé de réception reçu par Striim

Activation de la compression

Lorsque vous activez la compression dans KafkaWriter, le courtier et le consommateur doivent gérer automatiquement les lots compressés. No additional configuration should be required in Kafka.

To enable batch compression for version 0.8.0, include the compression.codec property in KafkaConfig. Supported values are gzip and snappy. For example:

KafkaConfg:'compression.codec=snappy'

To enable compression for version 0.9, 0.10, or 0.11, include the compression.type property in KafkaConfig. Supported values are gziplz4snappy. For example:

KafkaConfig:'compression.type=snappy'



Laisser un commentaire

Votre adresse e-mail ne sera pas publiée.