Как быстро и без проблем импортировать данные из Apache Pulsar в Apache Doris

KoP — это сокращение от Kafka on Pulsar, и, как следует из названия, это способ чтения и записи данных Kafka на Pulsar. KoP привносит плагин обработки протокола Kafka в брокер Pulsar, чтобы сделать Apache Pulsar совместимым с протоколом Apache Kafka. Добавив плагин обработки протокола KoP к существующему кластеру Pulsar, пользователи могут перенести существующие приложения и сервисы Kafka на Pulsar без изменения кода.

Ключевые особенности Apache Pulsar следующие:

  • Оптимизация операций с помощью функций многопользовательской аренды корпоративного класса.

  • Избегайте перемещения данных и упрощайте операции.

  • Постоянное хранение потоков событий с помощью Apache BookKeeper и многоуровневого хранилища.

  • Использование функций Pulsar Functions для бессерверной обработки событий.

Архитектура KoP показана на следующей схеме, из которой видно, что KoP представляет новый плагин для обработки протоколов, который использует существующие компоненты Pulsar (например, Topic discovery, распределенный репозиторий логов-ManagedLedger, курсор и т.д.) для реализации транспортного протокола Kafka.

Routine Load Подписка на данные Pulsar

Apache Doris Routine Load поддерживает доступ к данным Kafka в Apache Doris и гарантирует транзакционные операции при доступе к данным. Apache Pulsar позиционируется как облачная система публикации и подписки корпоративных сообщений, которая уже используется многими онлайн-сервисами. Как же пользователи Apache Pulsar получают доступ к данным в Apache Doris? Ответ — через KoP.

Поскольку Kop обеспечивает совместимость с Kafka непосредственно в Pulsar, поэтому Plusar можно использовать как Kafka для Apache Doris, и весь процесс может быть выполнен без изменения задачи для Apache Doris, чтобы подключить данные Pulsar к Apache Doris и получить транзакционные гарантии Routine Load.

Практическая работа

Подготовка среды установки Pulsar:

  • Скачайте бинарный пакет Pulsar и разархивируйте:
#Download
wget https://archive.apache.org/dist/pulsar/pulsar-2.10.0/apache-pulsar-2.10.0-bin.tar.gz
#Unzip and enter the installation directory
tar xvfz apache-pulsar-2.10.0-bin.tar.gz
cd apache-pulsar-2.10.0
Вход в полноэкранный режим Выход из полноэкранного режима

Компиляция и установка KoP:

  • Скачайте исходный код KoP
git clone https://github.com/streamnative/kop.git
cd kop
Войти в полноэкранный режим Выход из полноэкранного режима
  • Компиляция KoP:
mvn clean install -DskipTests
Войти в полноэкранный режим Выход из полноэкранного режима
  • конфигурация протоколов: Создайте папку protocols в распакованном каталоге apache-pulsar и скопируйте скомпилированный пакет nar в папку protocols.
mkdir apache-pulsar-2.10.0/protocols
# mv kop/kafka-impl/target/pulsar-protocol-handler-kafka-{{protocol:version}}.nar apache-pulsar-2.10.0/protocols
cp kop/kafka-impl/target/pulsar-protocol-handler-kafka-2.11.0-SNAPSHOT.nar apache-pulsar-2.10.0/protocols
Вход в полноэкранный режим Выход из полноэкранного режима
  • Просмотр результатов после добавления:
[root@17a5da45700b apache-pulsar-2.10.0]# ls protocols/
pulsar-protocol-handler-kafka-2.11.0-SNAPSHOT.nar
Войти в полноэкранный режим Выйти из полноэкранного режима

Добавьте конфигурацию KoP:

  • Добавьте следующую конфигурацию в standalone.conf или broker.conf
#Protocols to which KoP is adapted
messagingProtocols=kafka
#KoP's NAR file path
protocolHandlerDirectory=./protocols
#Whether to allow automatic topic creation
allowAutoTopicCreationType=partitioned
Войти в полноэкранный режим Выйти из полноэкранного режима
  • Добавьте следующую конфигурацию прослушивателя служб
# Use `kafkaListeners` here for KoP 2.8.0 because `listeners` is marked as deprecated from KoP 2.8.0 
kafkaListeners=PLAINTEXT://127.0.0.1:9092# This config is not required unless you want to expose another address to the Kafka client.
# If it’s not configured, it will be the same with `kafkaListeners` config by default
kafkaAdvertisedListeners=PLAINTEXT://127.0.0.1:9092
brokerEntryMetadataInterceptors=org.apache.pulsar.common.intercept.AppendIndexMetadataInterceptor
brokerDeleteInactiveTopicsEnabled=false
Войти в полноэкранный режим Выйти из полноэкранного режима

При возникновении следующей ошибки:

java.lang.IllegalArgumentException: Broker has disabled transaction coordinator, please enable it before using transaction.
Войти в полноэкранный режим Выйти из полноэкранного режима

Добавьте следующую конфигурацию для включения transactionCoordinatorEnabled

kafkaTransactionCoordinatorEnabled=true
transactionCoordinatorEnabled=true
Войти в полноэкранный режим Выйти из полноэкранного режима

Эта ошибка должна быть исправлена, иначе вы увидите, что данные производятся и потребляются на Pulsar с помощью инструментов, поставляемых с kafka: bin/kafka-console-producer.sh и bin/kafka-console-consumer.sh работают нормально, но в Apache Doris данные не могут быть синхронизированы.

Запустите Pulsar
#bin/pulsar standalone
pulsar-daemon start standalone
Войдите в полноэкранный режим Выйдите из полноэкранного режима

Создайте базу данных Doris и постройте таблицы

mysql -u root  -h 127.0.0.1 -P 9030
create database pulsar_doris;
#Switching databases
use pulsar_doris;
#Create clicklog table
CREATE TABLE IF NOT EXISTS pulsar_doris.clicklog
(
    `clickTime` DATETIME NOT NULL COMMENT "clickTime",
    `type` String NOT NULL COMMENT "clickType",
    `id`  VARCHAR(100) COMMENT "id",
    `user` VARCHAR(100) COMMENT "user",
    `city` VARCHAR(50) COMMENT "city"
)
DUPLICATE KEY(`clickTime`, `type`)
DISTRIBUTED BY HASH(`type`) BUCKETS 1
PROPERTIES (
"replication_allocation" = "tag.location.default: 1"
);
Войти в полноэкранный режим Выход из полноэкранного режима

Создание рутинных задач загрузки

CREATE ROUTINE LOAD pulsar_doris.load_from_pulsar_test ON clicklog
COLUMNS(clickTime,id,type,user)
PROPERTIES
(
    "desired_concurrent_number"="3",
    "max_batch_interval" = "20",
    "max_batch_rows" = "300000",
    "max_batch_size" = "209715200",
    "strict_mode" = "false",
    "format" = "json"
)
FROM KAFKA
(
    "kafka_broker_list" = "127.0.0.1:9092",
    "kafka_topic" = "test",
    "property.group.id" = "doris"
 );
Войти в полноэкранный режим Выйти из полноэкранного режима

Параметры в приведенной выше команде объясняются следующим образом:

  • pulsar_doris: База данных, в которой находится задание Routine Load

  • load_from_pulsar_test: Имя задачи Routine Load

  • clicklog:Целевая таблица для задачи Routine Load

  • strict_mode: Выполняется ли импорт в строгом режиме, здесь установлено значение false

  • format: Тип данных для импорта, здесь настроен как json

  • kafka_broker_list: Адрес службы брокера kafka

  • kafka_broker_list: Имя темы kafka, т.е. на какую тему синхронизировать данные

  • property.group.id: Идентификатор группы потребителей

Импорт данных и тестирование

  • Импорт данных

Создайте структуру данных ClickLog и вызовите Kafka Producer для отправки 50 миллионов фрагментов данных в Pulsar.

Структура данных ClickLog выглядит следующим образом

public class ClickLog {
    private String id;
    private String user;
    private String city;
    private String clickTime;
    private String type;
    ... //Omit getter and setter
   }
Вход в полноэкранный режим Выход из полноэкранного режима

Основная логика кода для создания и доставки сообщений выглядит следующим образом.

       String strDateFormat = "yyyy-MM-dd HH:mm:ss";
       @Autowired
       private Producer producer;
        try {
            for(int j =0 ; j<50000;j++){
              int batchSize = 1000;
                for(int i = 0 ; i<batchSize ;i++){
                    ClickLog clickLog  = new ClickLog();
                    clickLog.setId(UUID.randomUUID().toString());
                    SimpleDateFormat simpleDateFormat = new SimpleDateFormat(strDateFormat);
                    clickLog.setClickTime(simpleDateFormat.format(new Date()));
                    clickLog.setType("webset");
                    clickLog.setUser("user"+ new Random().nextInt(1000) +i);
                    producer.sendMessage(Constant.topicName, JSONObject.toJSONString(clickLog));
                }
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
Вход в полноэкранный режим Выход из полноэкранного режима
  • Представление задачи ROUTINE LOAD

Выполните команду SHOW ALL ROUTINE LOAD FOR load_from_pulsar_test G;, чтобы просмотреть состояние задачи импорта.

mysql>  SHOW ALL ROUTINE LOAD FOR load_from_pulsar_test G;
*************************** 1. row ***************************
                  Id: 87873
                Name: load_from_pulsar_test
          CreateTime: 2022-05-31 12:03:34
           PauseTime: NULL
             EndTime: NULL
              DbName: default_cluster:pulsar_doris
           TableName: clicklog1
               State: RUNNING
      DataSourceType: KAFKA
      CurrentTaskNum: 1
       JobProperties: {"partitions":"*","columnToColumnExpr":"clickTime,id,type,user","maxBatchIntervalS":"20","whereExpr":"*","dataFormat":"json","timezone":"Europe/London","send_batch_parallelism":"1","precedingFilter":"*","mergeType":"APPEND","format":"json","json_root":"","maxBatchSizeBytes":"209715200","exec_mem_limit":"2147483648","strict_mode":"false","jsonpaths":"","deleteCondition":"*","desireTaskConcurrentNum":"3","maxErrorNum":"0","strip_outer_array":"false","currentTaskConcurrentNum":"1","execMemLimit":"2147483648","num_as_string":"false","fuzzy_parse":"false","maxBatchRows":"300000"}
DataSourceProperties: {"topic":"test","currentKafkaPartitions":"0","brokerList":"127.0.0.1:9092"}
    CustomProperties: {"group.id":"doris","kafka_default_offsets":"OFFSET_END","client.id":"doris.client"}
           Statistic: {"receivedBytes":5739001913,"runningTxns":[],"errorRows":0,"committedTaskNum":168,"loadedRows":50000000,"loadRowsRate":23000,"abortedTaskNum":1,"errorRowsAfterResumed":0,"totalRows":50000000,"unselectedRows":0,"receivedBytesRate":2675000,"taskExecuteTimeMs":2144799}
            Progress: {"0":"51139566"}
                 Lag: {"0":0}
ReasonOfStateChanged: 
        ErrorLogUrls: 
            OtherMsg: 
1 row in set (0.00 sec)
ERROR: 
No query specified
Вход в полноэкранный режим Выход из полноэкранного режима

Из приведенных выше результатов видно, что totalRows равно 50000000 и errorRows равно 0. Это означает, что данные импортированы в Apache Doris без каких-либо потерь или избыточности.

  • Проверка данных Выполните следующую команду для подсчета данных в таблице и обнаружите, что результат также равен 50000000, как и ожидалось.
mysql> select count(*) from clicklog;
+----------+
| count(*) |
+----------+
| 50000000 |
+----------+
1 row in set (3.73 sec)
mysql> 
Войдите в полноэкранный режим Выход из полноэкранного режима

Заключение

С помощью KoP мы смогли легко интегрировать данные Apache Pulsar в Apache Doris без каких-либо изменений в задаче Routine Load и гарантировать транзакционный характер процесса импорта данных. Тем временем, сообщество Apache Doris инициировало разработку встроенной поддержки импорта для Apache Pulsar, и предполагается, что вскоре можно будет напрямую подписываться на данные сообщений в Pulsar и гарантировать семантику Exactly-Once в процессе импорта данных.

Ссылки

Веб-сайт Apache Doris: http://doris.apache.org

Apache Doris GitHub:https://github.com/apache/doris

Пожалуйста, свяжитесь с нами через: dev@doris.apache.org

Оцените статью
devanswers.ru
Добавить комментарий