Аутентификация Camel K в Kafka с помощью SASL

Apache Camel K — это легкий интеграционный фреймворк, созданный на основе Apache Camel, который работает на Kubernetes и специально разработан для бессерверных и микросервисных архитектур. Его часто называют швейцарским армейским ножом для решения проблем интеграции, с его помощью можно легко интегрировать множество разнородных систем и приложений, позволяя им беспрепятственно обмениваться данными. Пользователи Camel K могут мгновенно запускать код интеграции, написанный на Camel DSL, в выбранном ими облаке (Kubernetes или OpenShift). Чтобы узнать больше о Camel K и Camel, посетите официальный сайт Camel.

Kafka может быть легко интегрирован с остальными приложениями с помощью Camel K. Брокеры Kafka поддерживают аутентификацию клиентов с помощью SASL (Simple Authentication and Security Layer). SASL отделяет механизмы аутентификации от прикладных протоколов, что позволяет нам использовать любой из механизмов аутентификации, поддерживаемых SASL. Мы будем использовать механизм OAUTHBEARER, поддерживаемый SASL, для аутентификации, а SSL будет выполнять шифрование данных.

Начало работы

Для данной демонстрации требуется доступ к кластеру kubernetes/openshift.

Установка Camel K и необходимых инструментов CLI

Настройка экземпляра Kafka

Используя Openshift Streams for Apache Kafka, мы можем легко создать экземпляр Kafka, учетную запись службы и тему Kafka Topic. Необходимо выбрать метод аутентификации SASL OAUTHBEARER. Для данной демонстрации мы назовем тему Kafka Topic «test». Сохраните URL-адрес брокера Kafka, идентификатор учетной записи службы, секрет учетной записи службы и URL-адрес конечной точки токена. Мы будем использовать их в конфигурационном файле.

Использование Kafka в интеграции Camel K

Мы создадим следующие файлы:

Kafka использует Java Authentication and Authorization Service (JAAS) для конфигурации SASL, поэтому мы укажем конфигурацию JAAS в конфигурационном файле application.properties. Мы также должны указать обработчик обратного вызова логина, который будет получать токен oauthbearer.

файл application.properties:

# Kafka config
camel.component.kafka.brokers = <YOUR KAFKA BROKER URL>
camel.component.kafka.security-protocol = SASL_SSL

camel.component.kafka.sasl-mechanism = OAUTHBEARER
camel.component.kafka.sasl-jaas-config = org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required 
        oauth.client.id='<YOUR SERVICE ACCOUNT ID>' 
        oauth.client.secret='<YOUR SERVICE ACCOUNT SECRET>' 
        oauth.token.endpoint.uri="<TOKEN ENDPOINT URL>" ;
camel.component.kafka.additional-properties[sasl.login.callback.handler.class] = io.strimzi.kafka.oauth.client.JaasClientOauthLoginCallbackHandler

consumer.topic=test
producer.topic=test
Вход в полноэкранный режим Выход из полноэкранного режима

В файлах интеграции мы укажем kafka-oauth-client в качестве maven-зависимости, поскольку он предоставляет класс обработчика входа, указанный в конфигурационном файле.

SaslSSLKafkaProducer.java:

// camel-k: language=java dependency=mvn:org.apache.camel.quarkus:camel-quarkus-kafka dependency=mvn:io.strimzi:kafka-oauth-client:0.10.0

import org.apache.camel.builder.RouteBuilder;

public class SaslSSLKafkaProducer extends RouteBuilder {
  @Override
  public void configure() throws Exception {
    log.info("About to start route: Timer -> Kafka ");
    from("timer:foo")
        .routeId("FromTimer2Kafka")
        .setBody()
            .simple("Message #${exchangeProperty.CamelTimerCounter}")
        .to("kafka:{{producer.topic}}")
        .log("Message correctly sent to the topic!");
  }
}
Вход в полноэкранный режим Выход из полноэкранного режима

SaslSSLKafkaConsumer.java:

// camel-k: language=java dependency=mvn:org.apache.camel.quarkus:camel-quarkus-kafka dependency=mvn:io.strimzi:kafka-oauth-client:0.10.0

import org.apache.camel.builder.RouteBuilder;

public class SaslSSLKafkaConsumer extends RouteBuilder {
  @Override
  public void configure() throws Exception {
    log.info("About to start route: Kafka -> Log ");
    from("kafka:{{consumer.topic}}")
        .routeId("FromKafka2Log")
        .log("${body}");
  }
}
Войти в полноэкранный режим Выход из полноэкранного режима

Соберите свойства конфигурации в секрет:

kubectl create secret generic kafka-props --from-file application.properties
Войти в полноэкранный режим Выход из полноэкранного режима

Создание темы Kafka

kamel run --config secret:kafka-props SaslSSLKafkaProducer.java --dev
...
[2] 2021-05-06 08:48:11,854 INFO  [FromTimer2Kafka] (Camel (camel-1) thread #1 - KafkaProducer[test]) Message correctly sent to the topic!
[2] 2021-05-06 08:48:11,854 INFO  [FromTimer2Kafka] (Camel (camel-1) thread #3 - KafkaProducer[test]) Message correctly sent to the topic!
[2] 2021-05-06 08:48:11,973 INFO  [FromTimer2Kafka] (Camel (camel-1) thread #5 - KafkaProducer[test]) Message correctly sent to the topic!
Войти в полноэкранный режим Выйти из полноэкранного режима

Потребление из темы Kafka

kamel run --config secret:kafka-props SaslSSLKafkaConsumer.java --dev
...
[1] 2021-05-06 08:51:08,991 INFO  [FromKafka2Log] (Camel (camel-1) thread #0 - KafkaConsumer[test]) Message #8
[1] 2021-05-06 08:51:10,065 INFO  [FromKafka2Log] (Camel (camel-1) thread #0 - KafkaConsumer[test]) Message #9
[1] 2021-05-06 08:51:10,991 INFO  [FromKafka2Log] (Camel (camel-1) thread #0 - KafkaConsumer[test]) Message #10
Войти в полноэкранный режим Выйти из полноэкранного режима

Вот и все! Надеюсь, кому-то это пригодится, а если у вас есть вопросы или комментарии, не стесняйтесь писать в разделе комментариев. Счастливого кодинга!

Оцените статью
devanswers.ru
Добавить комментарий