Striim 3.10.3 documentation
Kafka Writer
Écrit sur un sujet dans Apache Kafka.
Il existe cinq versions de KafkaWriter, 0.8.0, 0.9.0, 0.10.0, 0.11.0 et 2.1.0. Utilisez celui qui correspond au courtier Kafka cible. Par exemple, pour utiliser 0.9.0, la syntaxe est CREATE TARGET <name> USING KafkaWriter VERSION '0.9.0'
. Si vous écrivez dans l’instance interne de Kafka, utilisez 0.11.0
.
Problème connu DEV-13039 : l’application avec KafkaWriter 0.9 ou 0.10 se bloque si Kafka broker se déconnecte.
propriété |
type |
valeur par défaut |
notes |
---|---|---|---|
Adresse du courtier |
Chaîne |
||
Configuration Kafka |
Chaîne |
Spécifiez éventuellement les propriétés du producteur Kafka, séparées par des points-virgules. Voir le tableau ci-dessous pour plus de détails. |
|
Séparateur de propriétés de configuration Kafka |
Chaîne |
Spécifiez un séparateur différent si l’une des valeurs de propriété de producteur spécifiées dans KafkaConfig contient un point-virgule. |
|
Séparateur de valeur de configuration Kafka |
Chaîne |
= |
Spécifiez un séparateur différent si l’une des valeurs de propriété de producteur spécifiées dans KafkaConfig contient un symbole égal. |
En-tête de message |
Chaîne |
Éventuellement, si vous utilisez Kafka 0.11 ou une version ultérieure en mode asynchrone, ou en mode synchronisation avec KafkaConfig
Pour spécifier plusieurs en-têtes personnalisés, séparez-les par des points-virgules. |
|
Clé de message |
Chaîne |
Si vous utilisez Kafka 0.11 ou une version ultérieure en mode asynchrone, ou en mode de synchronisation avec KafkaConfig MessageKey : CName=”Striim”MessageKey : Table=@metadata(TableName); Operation=@metadata(OperationName);key1=@userdata(key1)MessageKey : CityName=City; Zipcode=zipMessageKey : CName=”Striim”;Table=@metadata(TableName); Operation=@metadata(OperationName) Entre autres possibilités, vous pouvez utiliser cette propriété pour prendre en charge le compactage des journaux ou pour permettre aux applications en aval d’utiliser des requêtes basées sur la charge utile du message.. |
|
Mode |
Chaîne |
Sync |
voir Définition de la propriété mode de KafkaWriter: sync versus asynchrone |
Threads parallèles |
Entier |
Voir Création de plusieurs instances d’écriture. |
|
Clé de partition |
Chaîne |
Nom d’un champ du flux d’entrée dont les valeurs déterminent la répartition des événements entre plusieurs partitions. Les événements avec la même valeur de champ de clé de partition seront écrits sur la même partition. Si le flux d’entrée est de tout type sauf WAEvent, spécifiez le nom de l’un de ses champs. Si le flux d’entrée est de type WAEvent, spécifiez un champ dans la carte de MÉTADONNÉES (voir Champs WAEvent HP NonStop reader, champs WAEVENT MySQLReader, champs WAEvent OracleReader ou champs WAEvent MS SQL Reader) en utilisant la syntaxe |
|
Topic |
String |
Le sujet Kafka existant sur lequel écrire (ne sera pas créé s’il n’existe pas). Si plusieurs auteurs Kafka écrivent sur le même sujet, la récupération n’est pas prise en charge (voir Récupération d’applications. (La récupération est prise en charge lors de l’utilisation de Threads parallèles.) |
Cet adaptateur a un choix de formateurs. Pour plus d’informations, reportez-vous à la section Combinaisons écrivain-formateur prises en charge.
Remarques sur la propriété KafkaConfig
Avec les exceptions indiquées dans le tableau suivant, vous pouvez spécifier n’importe quelle propriété de producteur Kafka dans KafkaConfig.
Kafka producer property |
notes |
---|---|
acks |
|
batch.size linger.ms tente |
|
activer.idempotence |
Lorsque vous utilisez la version 2.1.0 et le mode asynchrone, définissez la valeur true pour écrire les événements dans l’ordre (voir la touche |
.la valeur du désérialiseur |
est toujours org.Apache.Kafka.commun.sérialisation.ByteArrayDeserializer, ne peut pas être remplacé par KafkaConfig |
En interne, KafkaWriter appelle KafkaConsumer à diverses fins, et l’AVERTISSEMENT de l’API consumer dû au passage des propriétés KafkaConfig peut être ignoré en toute sécurité. Voir Configuration de Kafka pour plus d’informations sur les propriétés du producteur Kafka.
Exemple d’application KafkaWriter
L’exemple de code suivant écrit les données de PosDataPreview.csv
dans la rubrique Kafka KafkaWriterSample
. Ce sujet existe déjà dans l’instance interne de Kafka de Striim. Si vous utilisez une instance Kafka externe, vous devez créer la rubrique avant d’exécuter l’application.
CREATE SOURCE PosSource USING FileReader ( directory:'Samples/PosApp/AppData', wildcard:'PosDataPreview.csv', positionByEOF:false)PARSE USING DSVParser ( header:yes)OUTPUT TO RawStream;CREATE CQ CsvToPosDataINSERT INTO PosDataStreamSELECT TO_STRING(data) as merchantId, TO_DATEF(data,'yyyyMMddHHmmss') as dateTime, TO_DOUBLE(data) as amount, TO_STRING(data) as zipFROM RawStream;CREATE TARGET KW11Sample USING KafkaWriter VERSION '0.11.0'( brokeraddress:'localhost:9092', topic:'KafkaWriterSample')FORMAT USING DSVFormatter ()INPUT FROM PosDataStream;
Vous pouvez vérifier que les données ont été écrites dans Kafka en exécutant l’exemple d’application Kafka Reader.
Le premier champ de la sortie (position
) stocke les informations nécessaires pour éviter les événements perdus ou dupliqués après la récupération (voir Récupération des applications). Si la récupération n’est pas activée, sa valeur est NULL.
mon
la sortie (voir Utilisation de la commande MON) pour les cibles utilisant KafkaWriter comprend:
-
en mode asynchrone uniquement, Taux d’octets envoyés: combien de mégaoctets par seconde ont été envoyés aux courtiers
-
à la fois dans la synchronisation et mode asynchrone, Taux d’octets d’écriture : combien de mégaoctets par seconde ont été écrits par les courtiers et accusé de réception reçu par Striim
Activation de la compression
Lorsque vous activez la compression dans KafkaWriter, le courtier et le consommateur doivent gérer automatiquement les lots compressés. No additional configuration should be required in Kafka.
To enable batch compression for version 0.8.0, include the compression.codec
property in KafkaConfig. Supported values are gzip
and snappy
. For example:
KafkaConfg:'compression.codec=snappy'
To enable compression for version 0.9, 0.10, or 0.11, include the compression.type
property in KafkaConfig. Supported values are gzip
lz4
snappy
. For example:
KafkaConfig:'compression.type=snappy'