KoP — это сокращение от Kafka on Pulsar, и, как следует из названия, это способ чтения и записи данных Kafka на Pulsar. KoP привносит плагин обработки протокола Kafka в брокер Pulsar, чтобы сделать Apache Pulsar совместимым с протоколом Apache Kafka. Добавив плагин обработки протокола KoP к существующему кластеру Pulsar, пользователи могут перенести существующие приложения и сервисы Kafka на Pulsar без изменения кода.
Ключевые особенности Apache Pulsar следующие:
-
Оптимизация операций с помощью функций многопользовательской аренды корпоративного класса.
-
Избегайте перемещения данных и упрощайте операции.
-
Постоянное хранение потоков событий с помощью Apache BookKeeper и многоуровневого хранилища.
-
Использование функций Pulsar Functions для бессерверной обработки событий.
Архитектура KoP показана на следующей схеме, из которой видно, что KoP представляет новый плагин для обработки протоколов, который использует существующие компоненты Pulsar (например, Topic discovery, распределенный репозиторий логов-ManagedLedger, курсор и т.д.) для реализации транспортного протокола Kafka.
Routine Load Подписка на данные Pulsar
Apache Doris Routine Load поддерживает доступ к данным Kafka в Apache Doris и гарантирует транзакционные операции при доступе к данным. Apache Pulsar позиционируется как облачная система публикации и подписки корпоративных сообщений, которая уже используется многими онлайн-сервисами. Как же пользователи Apache Pulsar получают доступ к данным в Apache Doris? Ответ — через KoP.
Поскольку Kop обеспечивает совместимость с Kafka непосредственно в Pulsar, поэтому Plusar можно использовать как Kafka для Apache Doris, и весь процесс может быть выполнен без изменения задачи для Apache Doris, чтобы подключить данные Pulsar к Apache Doris и получить транзакционные гарантии Routine Load.
Практическая работа
Подготовка среды установки Pulsar:
- Скачайте бинарный пакет Pulsar и разархивируйте:
#Download
wget https://archive.apache.org/dist/pulsar/pulsar-2.10.0/apache-pulsar-2.10.0-bin.tar.gz
#Unzip and enter the installation directory
tar xvfz apache-pulsar-2.10.0-bin.tar.gz
cd apache-pulsar-2.10.0
Компиляция и установка KoP:
- Скачайте исходный код KoP
git clone https://github.com/streamnative/kop.git
cd kop
- Компиляция KoP:
mvn clean install -DskipTests
- конфигурация протоколов: Создайте папку protocols в распакованном каталоге apache-pulsar и скопируйте скомпилированный пакет nar в папку protocols.
mkdir apache-pulsar-2.10.0/protocols
# mv kop/kafka-impl/target/pulsar-protocol-handler-kafka-{{protocol:version}}.nar apache-pulsar-2.10.0/protocols
cp kop/kafka-impl/target/pulsar-protocol-handler-kafka-2.11.0-SNAPSHOT.nar apache-pulsar-2.10.0/protocols
- Просмотр результатов после добавления:
[root@17a5da45700b apache-pulsar-2.10.0]# ls protocols/
pulsar-protocol-handler-kafka-2.11.0-SNAPSHOT.nar
Добавьте конфигурацию KoP:
- Добавьте следующую конфигурацию в standalone.conf или broker.conf
#Protocols to which KoP is adapted
messagingProtocols=kafka
#KoP's NAR file path
protocolHandlerDirectory=./protocols
#Whether to allow automatic topic creation
allowAutoTopicCreationType=partitioned
- Добавьте следующую конфигурацию прослушивателя служб
# Use `kafkaListeners` here for KoP 2.8.0 because `listeners` is marked as deprecated from KoP 2.8.0
kafkaListeners=PLAINTEXT://127.0.0.1:9092# This config is not required unless you want to expose another address to the Kafka client.
# If it’s not configured, it will be the same with `kafkaListeners` config by default
kafkaAdvertisedListeners=PLAINTEXT://127.0.0.1:9092
brokerEntryMetadataInterceptors=org.apache.pulsar.common.intercept.AppendIndexMetadataInterceptor
brokerDeleteInactiveTopicsEnabled=false
При возникновении следующей ошибки:
java.lang.IllegalArgumentException: Broker has disabled transaction coordinator, please enable it before using transaction.
Добавьте следующую конфигурацию для включения transactionCoordinatorEnabled
kafkaTransactionCoordinatorEnabled=true
transactionCoordinatorEnabled=true
Эта ошибка должна быть исправлена, иначе вы увидите, что данные производятся и потребляются на Pulsar с помощью инструментов, поставляемых с kafka: bin/kafka-console-producer.sh и bin/kafka-console-consumer.sh работают нормально, но в Apache Doris данные не могут быть синхронизированы.
Запустите Pulsar
#bin/pulsar standalone
pulsar-daemon start standalone
Создайте базу данных Doris и постройте таблицы
mysql -u root -h 127.0.0.1 -P 9030
create database pulsar_doris;
#Switching databases
use pulsar_doris;
#Create clicklog table
CREATE TABLE IF NOT EXISTS pulsar_doris.clicklog
(
`clickTime` DATETIME NOT NULL COMMENT "clickTime",
`type` String NOT NULL COMMENT "clickType",
`id` VARCHAR(100) COMMENT "id",
`user` VARCHAR(100) COMMENT "user",
`city` VARCHAR(50) COMMENT "city"
)
DUPLICATE KEY(`clickTime`, `type`)
DISTRIBUTED BY HASH(`type`) BUCKETS 1
PROPERTIES (
"replication_allocation" = "tag.location.default: 1"
);
Создание рутинных задач загрузки
CREATE ROUTINE LOAD pulsar_doris.load_from_pulsar_test ON clicklog
COLUMNS(clickTime,id,type,user)
PROPERTIES
(
"desired_concurrent_number"="3",
"max_batch_interval" = "20",
"max_batch_rows" = "300000",
"max_batch_size" = "209715200",
"strict_mode" = "false",
"format" = "json"
)
FROM KAFKA
(
"kafka_broker_list" = "127.0.0.1:9092",
"kafka_topic" = "test",
"property.group.id" = "doris"
);
Параметры в приведенной выше команде объясняются следующим образом:
-
pulsar_doris: База данных, в которой находится задание Routine Load
-
load_from_pulsar_test: Имя задачи Routine Load
-
clicklog:Целевая таблица для задачи Routine Load
-
strict_mode: Выполняется ли импорт в строгом режиме, здесь установлено значение false
-
format: Тип данных для импорта, здесь настроен как json
-
kafka_broker_list: Адрес службы брокера kafka
-
kafka_broker_list: Имя темы kafka, т.е. на какую тему синхронизировать данные
-
property.group.id: Идентификатор группы потребителей
Импорт данных и тестирование
- Импорт данных
Создайте структуру данных ClickLog и вызовите Kafka Producer для отправки 50 миллионов фрагментов данных в Pulsar.
Структура данных ClickLog выглядит следующим образом
public class ClickLog {
private String id;
private String user;
private String city;
private String clickTime;
private String type;
... //Omit getter and setter
}
Основная логика кода для создания и доставки сообщений выглядит следующим образом.
String strDateFormat = "yyyy-MM-dd HH:mm:ss";
@Autowired
private Producer producer;
try {
for(int j =0 ; j<50000;j++){
int batchSize = 1000;
for(int i = 0 ; i<batchSize ;i++){
ClickLog clickLog = new ClickLog();
clickLog.setId(UUID.randomUUID().toString());
SimpleDateFormat simpleDateFormat = new SimpleDateFormat(strDateFormat);
clickLog.setClickTime(simpleDateFormat.format(new Date()));
clickLog.setType("webset");
clickLog.setUser("user"+ new Random().nextInt(1000) +i);
producer.sendMessage(Constant.topicName, JSONObject.toJSONString(clickLog));
}
}
} catch (Exception e) {
e.printStackTrace();
}
- Представление задачи ROUTINE LOAD
Выполните команду SHOW ALL ROUTINE LOAD FOR load_from_pulsar_test G;, чтобы просмотреть состояние задачи импорта.
mysql> SHOW ALL ROUTINE LOAD FOR load_from_pulsar_test G;
*************************** 1. row ***************************
Id: 87873
Name: load_from_pulsar_test
CreateTime: 2022-05-31 12:03:34
PauseTime: NULL
EndTime: NULL
DbName: default_cluster:pulsar_doris
TableName: clicklog1
State: RUNNING
DataSourceType: KAFKA
CurrentTaskNum: 1
JobProperties: {"partitions":"*","columnToColumnExpr":"clickTime,id,type,user","maxBatchIntervalS":"20","whereExpr":"*","dataFormat":"json","timezone":"Europe/London","send_batch_parallelism":"1","precedingFilter":"*","mergeType":"APPEND","format":"json","json_root":"","maxBatchSizeBytes":"209715200","exec_mem_limit":"2147483648","strict_mode":"false","jsonpaths":"","deleteCondition":"*","desireTaskConcurrentNum":"3","maxErrorNum":"0","strip_outer_array":"false","currentTaskConcurrentNum":"1","execMemLimit":"2147483648","num_as_string":"false","fuzzy_parse":"false","maxBatchRows":"300000"}
DataSourceProperties: {"topic":"test","currentKafkaPartitions":"0","brokerList":"127.0.0.1:9092"}
CustomProperties: {"group.id":"doris","kafka_default_offsets":"OFFSET_END","client.id":"doris.client"}
Statistic: {"receivedBytes":5739001913,"runningTxns":[],"errorRows":0,"committedTaskNum":168,"loadedRows":50000000,"loadRowsRate":23000,"abortedTaskNum":1,"errorRowsAfterResumed":0,"totalRows":50000000,"unselectedRows":0,"receivedBytesRate":2675000,"taskExecuteTimeMs":2144799}
Progress: {"0":"51139566"}
Lag: {"0":0}
ReasonOfStateChanged:
ErrorLogUrls:
OtherMsg:
1 row in set (0.00 sec)
ERROR:
No query specified
Из приведенных выше результатов видно, что totalRows равно 50000000 и errorRows равно 0. Это означает, что данные импортированы в Apache Doris без каких-либо потерь или избыточности.
- Проверка данных Выполните следующую команду для подсчета данных в таблице и обнаружите, что результат также равен 50000000, как и ожидалось.
mysql> select count(*) from clicklog;
+----------+
| count(*) |
+----------+
| 50000000 |
+----------+
1 row in set (3.73 sec)
mysql>
Заключение
С помощью KoP мы смогли легко интегрировать данные Apache Pulsar в Apache Doris без каких-либо изменений в задаче Routine Load и гарантировать транзакционный характер процесса импорта данных. Тем временем, сообщество Apache Doris инициировало разработку встроенной поддержки импорта для Apache Pulsar, и предполагается, что вскоре можно будет напрямую подписываться на данные сообщений в Pulsar и гарантировать семантику Exactly-Once в процессе импорта данных.
Ссылки
Веб-сайт Apache Doris: http://doris.apache.org
Apache Doris GitHub:https://github.com/apache/doris
Пожалуйста, свяжитесь с нами через: dev@doris.apache.org