Использование JAVA для подключения Azure Web Pub Sub Service для создания простой демонстрации MQ


Обзор

Разрабатывайте веб-приложения с обменом сообщениями в реальном времени с помощью Azure Web PubSub — полностью управляемой службы, поддерживающей нативные и бессерверные WebSockets. Используйте модель обмена сообщениями публикация-подписка для создания слабосвязанных масштабируемых приложений, включая чаты, прямые трансляции и информационные панели IoT. Разработчики могут сосредоточиться на функциональности, в то время как Web PubSub управляет потоком данных и контента на ваших веб-страницах и в мобильных приложениях.

концепция

соединение

Соединение, также известное как клиент или клиентское соединение, представляет собой одно WebSocket-соединение с сервисом Web PubSub. После успешного соединения служба Web PubSub присваивает соединению уникальный идентификатор соединения.

концентратор

Концентратор — это логическая концепция для набора клиентских соединений. Обычно концентратор используется для одной цели, например, концентратор чата или концентратор уведомлений. Когда клиент подключается, он подключается к хабу, и на протяжении всей своей жизни он принадлежит этому хабу. Как только клиент подключается к хабу, хаб существует. Различные приложения могут совместно использовать службу Azure Web PubSub, используя разные имена концентраторов.

Группа

Группа — это подмножество подключений к концентратору. Вы можете добавлять клиентские соединения в группу или удалять их из группы в любое время. Например, когда клиент присоединяется к чату или когда клиент покидает чат, чат можно считать группой. Клиент может присоединиться к нескольким группам, а группа может содержать несколько клиентов. Группа подобна групповому «разговору», групповой разговор создается, когда кто-то присоединяется к группе, и разговор исчезает, когда в группе никого нет.

пользователь

Соединение с Web PubSub может принадлежать пользователю. У пользователя может быть несколько подключений, например, когда пользователь подключается к нескольким устройствам или нескольким вкладкам браузера.

сообщение

Когда клиент подключается, он может отправлять сообщения в вышестоящие приложения или получать сообщения от них через соединение WebSocket.

рабочий процесс

  1. Клиент подключается к службе/конечной точке клиента с помощью транспорта WebSocket. Сервис пересылает каждый кадр WebSocket настроенному восходящему потоку (серверу). WebSocket-соединения могут соединяться с любым пользовательским подпротоколом для обработки сервером или с подпротоколом json.webpubsub.azure.v1, поддерживаемым сервисом, который позволяет клиентам напрямую передавать pub/sub. Подробности описаны в разделе Протокол клиента.
  2. Служба вызывает сервер с помощью HTTP-протокола CloudEvents при различных событиях клиента. CloudEvents — это стандартизированное, не зависящее от протокола определение структуры и описания метаданных событий, размещенное Cloud Native Computing Foundation (CNCF). Детали описаны в протоколе сервера.
  3. Сервер может использовать REST API для вызова служб для отправки сообщений клиентам или управления подключенными клиентами. Подробности описаны в протоколе сервера

Создайте субсервис web pub

Подключитесь к webPubSubServiceClient

Импортировать maven

<dependency>
    <groupId>com.azure</groupId>
    <artifactId>azure-messaging-webpubsub</artifactId>
    <version>1.1.4</version>
</dependency>
Войти в полноэкранный режим Выйти из полноэкранного режима

Получите строку подключения из Azure и определите концентратор

WebPubSubServiceClient webPubSubServiceClient = new WebPubSubServiceClientBuilder()
        .connectionString(" ")
        .hub(" ")
        .buildClient();
Войти в полноэкранный режим Выйти из полноэкранного режима

Подключение к webSocket

Создать токен

GetClientAccessTokenOptions getClientAccessTokenOptions = new GetClientAccessTokenOptions();
getClientAccessTokenOptions.addRole("webpubsub.sendToGroup");
getClientAccessTokenOptions.addRole("webpubsub.joinLeaveGroup");
WebPubSubClientAccessToken token = webPubSubServiceClient.getClientAccessToken(getClientAccessTokenOptions);
Войдите в полноэкранный режим Выйти из полноэкранного режима

Создайте WebSocket и определите используемый протокол передачи данных

String url = token.getUrl();
ws = HttpClient.newHttpClient().newWebSocketBuilder().subprotocols("json.webpubsub.azure.v1")
        .buildAsync(URI.create(url), new WebSocketClient()).join();
Войти в полноэкранный режим Выйти из полноэкранного режима

Реализовать приемник сообщений WebSocket (в зависимости от используемого WebSocket, метод реализации также отличается)

private static final class WebSocketClient implements WebSocket.Listener {
    private WebSocketClient() {
    }

    @Override
    public void onOpen(WebSocket webSocket) {
        log.info("subscriber open");
        WebSocket.Listener.super.onOpen(webSocket);
    }

    @Override
    public CompletionStage<?> onText(WebSocket webSocket, CharSequence data, boolean last) {
        log.info("Message received:{}", data);
        return WebSocket.Listener.super.onText(webSocket, data, last);
    }

    @Override
    public void onError(WebSocket webSocket, Throwable error) {
        System.out.println("Bad day! " + webSocket.toString());
        WebSocket.Listener.super.onError(webSocket, error);
    }
}

Войти в полноэкранный режим Выйти из полноэкранного режима

Информация

AckId

При использовании ackId вы можете получить ответное сообщение с подтверждением, когда ваш запрос будет обработан. Вы можете не указывать ackId в сценариях fire-and-forget.
Служба Web PubSub отправит ответ ack для каждого запроса с ackId.

public class AckResponseMessage {
    private String type;
    private String ackId;
    private boolean success;
    private Error error;

    public static class Error{
        private String name;
        private String message;

        public String getName() {
            return name;
        }

        public void setName(String name) {
            this.name = name;
        }

        public String getMessage() {
            return message;
        }

        public void setMessage(String message) {
            this.message = message;
        }
    }
}

Вход в полноэкранный режим Выход из полноэкранного режима

Отправить в группу

public class SendGroupMessage {
    public final String type = "sendToGroup";
    public String data;
    public int ackId;
    public String group;
    public boolean noEcho;
    public DataType dataType;

    public SendGroupMessage(String data, int ackId, String group, boolean noEcho, DataType dataType) {
        this.data = data;
        this.ackId = ackId;
        this.group = group;
        this.noEcho = noEcho;
        this.dataType = dataType;
    }
}

Войти в полноэкранный режим Выйти из полноэкранного режима

Присоединиться к группе

public class JoinGroupMessage {
    public int ackId;
    public final String type = "joinGroup";
    public String group;

    public JoinGroupMessage(int ackId, String group){
        this.ackId = ackId;
        this.group = group;
    }
}
Войти в полноэкранный режим Выйти из полноэкранного режима

Покинуть группу

public class LeaveGroupMessage {
    public int ackId;
    public final String type = "leaveGroup";
    public String group;

    public LeaveGroupMessage(int ackId, String group){
        this.ackId = ackId;
        this.group = group;
    }
}
Войти в полноэкранный режим Выйти из полноэкранного режима

Получение сообщений от групп

public class ReceivedGroupMessage {
    private String type;
    private String from;
    private String fromUserId;
    private String group;
    private DataType dataType;
    private String data;
}
Войти в полноэкранный режим Выйти из полноэкранного режима

Получение сообщения с сервера

public class ReceivedServerMessage {
    private String type;
    private String from;
    private DataType dataType;
    private String data;
}
Войти в полноэкранный режим Выход из полноэкранного режима

Ответ системы

public class ConnectedSystemMessage {
    private String type;
    private String event;
    private String userId;
    private String connectionId;
}

public class DisConnectedSystemMessage {
     private String type;
     private String event;
     private String message;
}
Вход в полноэкранный режим Выход из полноэкранного режима

Публикация и подписка

Опубликовать

public void sendToGroup(String data,String group) {
     ++ackId;
     GroupMessage groupMessage = new GroupMessage(data, ackId, group);
     String string = null;
     try {
         string = objectMapper.writeValueAsString(groupMessage);
     } catch (JsonProcessingException e) {
         e.printStackTrace();
     }
     ws.sendText(string,true);
}
Войти в полноэкранный режим Выйти из полноэкранного режима

Подписаться

@Override
public CompletionStage<?> onText(WebSocket webSocket, CharSequence data, boolean last) {
     try {
         String message = String.valueOf(data);
         handleData(message);
     } catch (Exception e) {
         log.warn("e:{}", e.getMessage());
     }
     return WebSocket.Listener.super.onText(webSocket, data, last);
}
Войти в полноэкранный режим Выйти из полноэкранного режима

Оцените статью
devanswers.ru
Добавить комментарий