Striim 3.10.3 dokumentacja

Kafka Writer

pisze do tematu w Apache Kafka.

Istnieje pięć wersji KafkaWriter, 0.8.0, 0.9.0, 0.10.0, 0.11.0 i 2.1.0. Użyj tego, który odpowiada docelowemu brokerowi Kafka. Na przykład, aby użyć 0.9.0, składnia jest CREATE TARGET <name> USING KafkaWriter VERSION '0.9.0'. Podczas zapisu do wewnętrznej instancji Kafki należy użyć 0.11.0.

znany problem DEV-13039: aplikacja z KafkaWriter 0.9 lub 0.10 ulega awarii, jeśli Kafka broker zostanie wyłączony.

właściwość

typ

wartość domyślna

notatki

adres brokera

string

konfiguracja Kafki

string

opcjonalnie należy określić właściwości producenta Kafki, oddzielone średnikami. Szczegóły w poniższej tabeli.

Separator właściwości konfiguracyjnych Kafki

String

określa inny separator, jeśli jedna z wartości właściwości producer podanych w KafkaConfig zawiera średnik.

Separator wartości konfiguracyjnej Kafki

String

=

określa inny separator, jeśli jedna z wartości właściwości producenta określonych w KafkaConfig zawiera symbol równości.

nagłówek wiadomości

String

opcjonalnie, jeśli używasz Kafki 0.11 lub nowszy w trybie asynchronicznym lub w trybie synchronizacji z KafkaConfig batch.size=-1, określ jeden lub więcej niestandardowych nagłówków, które mają być dodane do wiadomości jako pary klucz-wartość. Wartości mogą być następujące:

  • Nazwa pola z wrzuconego strumienia typu zdefiniowanego przez użytkownika: na przykład MerchantID=merchantID

  • statyczny ciąg znaków: na przykład Company="My Company"

  • funkcja: na przykład, aby uzyskać nazwę tabeli źródłowej ze strumienia wejściowego WAEvent, który jest wyjściem czytnika CDC,Table Name=@metadata(TableName)

  • aby określić wiele niestandardowych nagłówków, oddziel je średnikami.

    klucz wiadomości

    String

    opcjonalnie, jeśli używasz Kafki 0.11 lub nowszego w trybie asynchronicznym lub w trybie synchronizacji z KafkaConfigbatch.size=-1, Podaj jeden lub więcej klawiszy dodawane do wiadomości jako pary klucz-wartość. Wartość właściwości może być statycznym ciągiem znaków, jednym lub kilkoma polami ze strumienia wejściowego lub kombinacją obu. Przykłady:

    MessageKey : CName=”Striim”MessageKey : Table=@metadata(TableName); Operation=@metadata(OperationName);key1=@userdata(key1)MessageKey : CityName=City; Zipcode=zipMessageKey : CName=”Striim”;Table=@metadata(TableName); Operation=@metadata(OperationName)

    oprócz innych możliwości, możesz użyć tej właściwości do obsługi zagęszczania dziennika lub do umożliwienia aplikacjom niższego szczebla korzystania z zapytań opartych na wiadomości payload..

    Mode

    String

    Sync

    zobacz ustawienie właściwości mode Kafkawritera: sync versus async

    wątki równoległe

    liczba całkowita

    zobacz Tworzenie wielu instancji programu writer.

    Partition Key

    String

    Nazwa pola w strumieniu wejściowym, którego wartości określają sposób dystrybucji zdarzeń między wieloma partycjami. Zdarzenia z tą samą wartością pola partition key będą zapisywane na tej samej partycji.

    jeśli strumień wejściowy jest dowolnego typu z wyjątkiem WAEvent, podaj nazwę jednego z jego pól.

    jeśli strumień wejściowy jest typu WAEvent, określ pole na mapie metadanych (zobacz pola HP Nonstop Reader WAEvent, pola MySQLReader WAEvent, pola Oraclereader WAEvent lub pola MS SQL Reader WAEvent) używając składni @METADATA(<field name>) lub pole na mapie danych użytkownika (zobacz Dodawanie danych zdefiniowanych przez użytkownika do strumieni WAEvent), używając składni @USERDATA(<field name>).

    temat

    String

    istniejący temat Kafki do zapisu (nie zostanie utworzony, jeśli nie istnieje). Jeśli więcej niż jeden Kafka Writer pisze do tego samego tematu, odzyskiwanie nie jest obsługiwane (zobacz Odzyskiwanie aplikacji. (Odzyskiwanie jest obsługiwane przy użyciu wątków równoległych.)

    Ten adapter ma wybór formaterów. Zobacz Obsługiwane kombinacje writer-formatter, aby uzyskać więcej informacji.

    uwagi dotyczące właściwości KafkaConfig

    z wyjątkami podanymi w poniższej tabeli, możesz określić dowolną właściwość producenta Kafki w KafkaConfig.

    Kafka producer property

    notes

    acks

    • in sync mode, may be set to 1 or all

    • in async mode, may be set to 0, 1, or all

    batch.size

    linger.ms

    powtarza

    • w trybie synchronizacji, aby zapobiec zdarzeniom out-of-order, właściwości producenta ustawione w Kafka with będą niezmienione i ignorowane, a Striim będzie obsługiwał je wewnętrznie.

    • w trybie asynchronicznym Striim zaktualizuje właściwości producenta Kafki, które będą obsługiwane przez Kafkę.

    • w trybie synchronizacji możesz ustawićbatch.size=-1, aby zapisać jedno zdarzenie dla każdej wiadomości Kafki. Spowoduje to poważne pogorszenie wydajności, więc nie jest zalecane w środowisku produkcyjnym. Przy tym ustawieniu wiadomości będą podobne do tych w trybie asynchronicznym.

    włącz.idempotencja

    podczas korzystania z wersji 2.1.0 i trybu asynchronicznego, Ustaw na true, aby zapisywać zdarzenia w kolejności (patrz

    klucz.deserializer

    wartość jest zawsze org.Apacz.kafka.pospolite.serializacja.ByteArrayDeserializer, nie może być nadpisany przez KafkaConfig

    wewnętrznie KafkaWriter wywołuje KafkaConsumer do różnych celów, a ostrzeżenie z API konsumenta z powodu przekazania właściwości KafkaConfig można bezpiecznie zignorować. Zobacz Konfigurowanie Kafki, aby uzyskać więcej informacji na temat właściwości producenta Kafki.

    Przykładowa aplikacja KafkaWriter

    poniższy przykładowy kod zapisuje dane zPosDataPreview.csv do tematu KafkiKafkaWriterSample. Temat ten istnieje już w wewnętrznej instancji Kafki Striima. Jeśli używasz zewnętrznej instancji Kafki, musisz utworzyć temat przed uruchomieniem aplikacji.

    CREATE SOURCE PosSource USING FileReader ( directory:'Samples/PosApp/AppData', wildcard:'PosDataPreview.csv', positionByEOF:false)PARSE USING DSVParser ( header:yes)OUTPUT TO RawStream;CREATE CQ CsvToPosDataINSERT INTO PosDataStreamSELECT TO_STRING(data) as merchantId, TO_DATEF(data,'yyyyMMddHHmmss') as dateTime, TO_DOUBLE(data) as amount, TO_STRING(data) as zipFROM RawStream;CREATE TARGET KW11Sample USING KafkaWriter VERSION '0.11.0'( brokeraddress:'localhost:9092', topic:'KafkaWriterSample')FORMAT USING DSVFormatter ()INPUT FROM PosDataStream;

    Możesz sprawdzić, czy dane zostały zapisane do Kafki, uruchamiając przykładową aplikację Kafka Reader.

    pierwsze pole na wyjściu (position) przechowuje informacje wymagane do uniknięcia utraconych lub zduplikowanych zdarzeń po odzyskaniu (zobacz Odzyskiwanie aplikacji). Jeśli odzyskiwanie nie jest włączone, jego wartość to NULL.

    mon wyjście (zobacz użycie polecenia MON) dla celów używających Kafkawritera zawiera:

    • tylko w trybie asynchronicznym, szybkość wysyłania bajtów: ile megabajtów na sekundę zostało wysłanych do brokerów

    • w obu synchronizacjach i tryb asynchroniczny, napisz szybkość bajtów: ile megabajtów na sekundę zostało zapisanych przez brokerów i potwierdzenie otrzymane przez striim

    włączanie kompresji

    gdy włączysz kompresję w kafkawriter, broker i konsument powinni automatycznie obsługiwać skompresowane partie. No additional configuration should be required in Kafka.

    To enable batch compression for version 0.8.0, include the compression.codec property in KafkaConfig. Supported values are gzip and snappy. For example:

    KafkaConfg:'compression.codec=snappy'

    To enable compression for version 0.9, 0.10, or 0.11, include the compression.type property in KafkaConfig. Supported values are gziplz4snappy. For example:

    KafkaConfig:'compression.type=snappy'



    Dodaj komentarz

    Twój adres e-mail nie zostanie opublikowany.