Отслеживание самолетов в реальном времени с помощью открытого исходного кода


Отслеживание воздушных судов в реальном времени с помощью открытого исходного кода

Пример использования: Автоматическое зависимое наблюдение-трансляция (ADS-B Analytics)

Автоматическое зависимое наблюдение-трансляция — Википедия

Automatic Dependent Surveillance-Broadcast (ADS-B) — технология наблюдения, при которой самолёт определяет свою…

en.wikipedia.org

Программный стек:

  • Python 3.10
  • Apache Pulsar 2.10.1
  • Библиотека Apache Pulsar Python
  • Apache Spark
  • Apache Flink
  • Java JDK 17
  • Apache Maven
  • SDKMan
  • Raspian Linux

Система воздушного транспорта следующего поколения (NextGen)

Официальный сайт правительства Соединенных Штатов Америки

www.faa.gov

Аппаратный стек:

  • Flightaware Pro Plus Stick (синий)
  • Антенна и кабель 1090 МГц
  • Raspberry Pi 4 с 2 ГБ оперативной памяти
  • Блок питания USB-C

Ссылки:

  • Что такое ADS-B? https://www.faa.gov/nextgen/programs/adsb/
  • Функции Источник: https://github.com/tspannhw/pulsar-adsb-function
  • Аналитика Источник: https://github.com/tspannhw/FLiP-Py-ADS-B

Используя FLiP-стек с открытым исходным кодом, мы можем с легкостью отслеживать пролетающие самолеты! Для этого потребуется немного оборудования и немного Python. Вы можете создать свою собственную систему дома, используя несколько единиц оборудования и простое программное обеспечение с открытым исходным кодом.

Мы не только собираем, передаем, обогащаем и храним эти данные, но и вносим свой вклад в развитие мира. Вы можете посмотреть мой канал на сайте FlightAware.

Тимоти Спанн Статистика ADS-B фидера — FlightAware

Вы можете помочь нам сохранить FlightAware бесплатным, разрешив рекламу с FlightAware.com. Мы прилагаем все усилия, чтобы сохранить нашу рекламу…

flightaware.com

Если вы думаете, что это похоже на данные, которые вы хотите получить для своего Lakehouse, давайте начнем получать их прямо сейчас. Как только вы настроите свое оборудование, ознакомьтесь с инструкциями от Piaware. Как только вы перезагрузитесь и все запустится, вы сможете увидеть состояние всего работающего оборудования.

Если вы хотите перейти к видео с демонстрацией того, что я сделал, вы можете сделать это.

Шаг 1

Давайте создадим темы, которые нам понадобятся для всех этих необработанных и обработанных данных полета.bin/pulsar-admin topics create persistent://public/default/adsbraw.

bin/pulsar-admin topics create persistent://public/default/aircraftbin/pulsar-admin topics create persistent://public/default/adsblogbin/pulsar-admin topics create persistent://public/default/adsbdead

Первая тема — это наши необработанные JSON данные ADS-B. Возможно, мы захотим использовать их позже, поэтому мы можем позволить этой теме хранить данные вечно, возможно, в какой-то момент мы включим многоуровневое хранение и будем автоматически сохранять их в объектном хранилище на AWS, Google Cloud или Azure. У нас также есть тема для чистых данных, самолет. Наконец, у нас есть тема для журналов, выводимых при обработке, и тема для сообщений, которые можно обрабатывать. Они не мертвы, поскольку мы можем поднять их, как зомби, и обработать снова. Никогда не отказывайтесь от своих данных!

Первое, что я сделал, это изучил данные, которые отображались на этой симпатичной локальной карте. Запустив Chrome в режиме разработчика, я увидел все сделанные REST-вызовы. Я вижу данные в формате JSON, полученные в результате простых вызовов REST. Именно здесь я взял то, что мне было нужно, и поместил это в удобный сценарий Python.

Поскольку мы собираемся передавать, хранить, анализировать, запрашивать и вообще делиться этими данными с различными разработчиками, аналитиками, специалистами по исследованию данных и другими людьми, мы должны убедиться, что знаем, что это за данные.

 hex (String optional)
flight (String optional - name of plane)
alt_baro (int optional - altitude)
alt_geom (int optional)
track (int optional)
baro_rate (int optional)
category (string optional)
nav_qnh (float optional)
nav_altitude_mcp (int optional)
nav_heading (float optional)
nic (int optional)
rc (int optional)
seen_pos (float optional)
version (int optional)
nic_baro (int optional)
nac_p (int optional)
nac_v (int optional)
sil (int optional)
sil_type (string optional)
mlat (array optional)
tisb (array optional)
messages (int optional)
seen (float optional)
rssi (float optional)
squawk (optional) - look at # conversion 7600, 7700, 4000, 5000, 7777, 6100, 5400, 4399, 4478, ...)
speed (optional)
mach (optional speed, mac to mph *767)
emergency (optional string)
lat (long optional)
lon (long optional)

Нам необходимо определить схему с именами, типами и опциональностью. Как только мы это сделаем, мы сможем создать для нее схему на JSON или Python и использовать ее в Apache Pulsar, Pulsar SQL (Presto/Trino), Apache Spark SQL, Apache Flink SQL и в любом другом потребителе, который может читать схему из Pulsar Schema Registry. Данные без контракта — это просто байты.

  • поле: hex — идентификатор ИКАО
  • поле: flight — идентификатор IDENT
  • поле: altBaro — высота в футах (барометрическая)
  • поле: lat, lon — широта и долгота
  • поле: gs — наземная скорость в узлах
  • поле: altGeom — высота над уровнем моря (геометрическая).

Я посмотрел, что это за поля, и сделал несколько заметок. Интересны значения Squawk, которые могут быть интересны людям, работающим с SQL позже.

Как только вы получите данные в классе Java и начнете посылать сообщения в тему самолета, вы сможете вытащить автогенерированную схему.

bin/pulsar-admin schemas get persistent://public/default/aircraft

Шаг 2

Давайте создадим наше Python-приложение для получения данных и публикации их в Pulsar! Мы могли бы использовать множество различных библиотек, поскольку Pulsar поддерживает множество протоколов, таких как Websockets, Kafka, MQTT, AMQP и RocketMQ. Чтобы сохранить простоту и ванильность, я собираюсь использовать проверенный и верный протокол Pulsar и стандартную библиотеку Python Pulsar. Я установил последнюю версию с помощью pip3.10 install pulsar-client[all]. Я сделал все это, поскольку мне нужна была библиотека FastAvr, GRPC, схемы и другие причудливые вещи. Вы можете установить то, что вам нужно.

Полный код на Python находится здесь. Я покажу вам важные фрагменты.

import pulsar
from pulsar import Client, AuthenticationOauth2client = pulsar.Client(service_url, authentication = AuthenticationOauth2(auth_params))producer = client.create_producer(topic=topic ,properties={"producer-name": "adsb-rest","producer-id": "adsb-py" })uniqueid = 'thrml_{0}_{1}'.format(randomword(3),strftime("%Y%m%d%H%M%S",gmtime()))uuid2 = '{0}_{1}'.format(strftime("%Y%m%d%H%M%S",gmtime()),uuid.uuid4())url_data = "http://localhost:8080/data/aircraft.json?_=" + str(uuid.uuid4())response = json.dumps(requests.get(url_data).json())producer.send(response.encode('utf-8'),partition_key=uniqueid)

Затем мы можем запустить его и начать выдавать необработанные данные ADS-B JSON в нашу тему Pulsar.

Шаг 3

Давайте сделаем простую проверку, чтобы убедиться, что данные поступают.

bin/pulsar-client consume "persistent://public/default/adsbraw" -s adsbrawreader -n 0

Как выглядят эти необработанные данные?

{'now': 1659471117.0, 'messages': 7381380, 'aircraft': [{'hex': 'ae6d7a', 'alt_baro': 25000, 'mlat': [], 'tisb': [], 'messages': 177, 'seen': 0.1, 'rssi': -22.7}, {'hex': 'a66174', 'alt_baro': 23000, 'mlat': [], 'tisb': [], 'messages': 5, 'seen': 23.6, 'rssi': -27.8}, .. }

Ну, это очень много. Я сократил его для удобства прокрутки.

Спасибо, что оставались с нами до сих пор, вот кот.

Необработанные данные — это хорошо и все такое, я, конечно, могу поручить Apache Spark, Flink, Python или другим программам очистить их. Я рекомендую вам настроить Delta Lake, Apache Hudi или Apache Iceberg для хранения этих необработанных данных в вашем доме у озера, если вы хотите.

Примечание архитектора: Вы можете просто хранить их в Apache Pulsar вечно или в контролируемом Apache Pulsar многоуровневом хранилище.

Шаг 4

Я хотел сделать это быстро и автоматически в среде Pulsar, поэтому я написал быструю функцию Java Pulsar Function для разделения, разбора, обогащения, очистки и маршрутизации данных в новую тему. Это будет тема очищенных данных. Мы могли бы создать эту функцию и на Python или Golang. В этот раз я выбрал Java.

Да, нам пришлось создать собственную функцию, прежде чем мы смогли развернуть ее в Шаге 4. Давайте бросим быстрый взгляд на нашу функцию Java:

public class ADSBFunction implements Function<byte[], Void> {

Это означает, что наша функция принимает необработанные байты, но ничего не возвращает. Мы не указываем здесь выход, поскольку я буду динамически решать, куда отправлять выход. Мы можем отправлять в любое количество тем на лету.

context.newOutputMessage(PERSISTENT_PUBLIC_DEFAULT, JSONSchema.of(Aircraft.class)) .key(UUID.randomUUID().toString()) .property(LANGUAGE, JAVA) .value(aircraft).send();

В конце моей функции я собираюсь отправить данные в тему (можно создавать их на лету, если мы хотим отправлять их в разные темы). Возможно, мы захотим отправить все полеты Элона Маска в специальную тему. Мы можем сделать эти поиски с помощью чего-то вроде Scylla, я сделал это для своего приложения Air Quality. Я добавляю ключ, добавляю свойство, устанавливаю схему и отправляю данные. Что здесь хорошо, так это то, что мне не нужно использовать формальный язык для определения схемы. Я могу просто создать обычный старый Java Bean. Это простое и старое решение, которое мне подходит.

Между этим кодом у меня есть вспомогательный сервис, который разбирает эту мешанину JSON и извлекает хорошие биты по одному событию в самолете за раз. Это довольно просто, но приятно держать этот код внутри простой функции, которая запускается на каждое событие или сообщение, входящее в тему ADSBRAW. Вам понадобится Java JDK (11 или 17) и Maven. Я рекомендую вам использовать SDKMan, чтобы вы могли запускать несколько JVMS и инструментов.

Для создания функции нам нужно просто набрать текст:

mvn package

Шаг 5

Pulsar позволяет легко добавить обработку событий в каждую тему, поэтому я создал простую на Java. Их очень легко развертывать, отслеживать, запускать, останавливать и удалять.

Давайте развернем нашу функцию:

bin/pulsar-admin functions create --auto-ack true --jar /opt/demo/java/pulsar-adsb-function/target/adsb-1.0.jar --classname "dev.pulsarfunction.adsb.ADSBFunction" --dead-letter-topic "persistent://public/default/adsbdead" --inputs "persistent://public/default/adsbraw" --log-topic "persistent://public/default/adsblog" --name ADSB --namespace default --tenant public --max-message-retries 5

Для новичков в Apache Pulsar, пожалуйста, обратите внимание, что мы должны указать пространство имен и арендатора для того, где это будет жить. Это необходимо для удобства обнаружения, многопользовательскости и просто чистоты. Мы можем иметь столько входных тем, сколько захотим. Темы Log и Dead letter предназначены для специальных выходов. В данном случае, для Java, наше приложение хранится в JAR-файле.

После развертывания давайте проверим его состояние:

bin/pulsar-admin functions status --name ADSB

Результаты в формате JSON, что позволяет автоматизировать DevOps. Если бы мы хотели, мы могли бы управлять этим с помощью REST или инструмента DevOps.

{
"numInstances" : 1,
"numRunning" : 1,
"instances" : [{
"instanceId" : 0,
"status" : {
"running" : true,
"error" : "",
"numRestarts" : 0,
"numReceived" : 28,
"numSuccessfullyProcessed" : 28,
"numUserExceptions" : 0,
"latestUserExceptions" : [],
"numSystemExceptions" : 0,
"latestSystemExceptions" : [],
"averageLatency" : 144.23374035714286,
"lastInvocationTime" : 1659725881406,
"workerId" : "c-standalone-fw-127.0.0.1-8080"
}
} ]
}

Если бы мы хотели, мы могли бы остановить его:

bin/pulsar-admin functions stop --name ADSB --namespace default --tenant public

Если бы нам нужно было удалить его:

bin/pulsar-admin functions delete --name ADSB --namespace default --tenant public

Шаг 6

Теперь, когда функция обработала данные, давайте проведем быструю проверку этих чистых данных.

bin/pulsar-client consume "persistent://public/default/aircraft" -s "aircraftconsumer" -n 0

Пример возвращаемой строки JSON:

----- got message -----
key:[c480cd8e-a803-47fe-81b4-aafdec0f6b68], properties:[language=Java], content:{"flight":"N86HZ","category":"A7","emergency":"none","squawk":1200,"hex":"abcd45","gs":52.2,"track":106.7,"lat":40.219757,"lon":-74.580566,"nic":9,"rc":75,"version":2,"sil":3,"gva":2,"sda":2,"mlat":[],"tisb":[],"messages":2259,"seen":1.1,"rssi":-19.9}

Примечание архитектора: Всегда задавайте ключ при создании сообщений.

Шаг 7

Теперь у нас есть чистые данные!

Давайте проверим этот поток данных с помощью Apache Spark Structured Streaming!

val dfPulsar = spark.readStream.format("pulsar").option("service.url", "pulsar://pulsar1:6650").option("admin.url", "http://pulsar1:8080").option("topic", "persistent://public/default/aircraft").load()

dfPulsar.printSchema()
root
|-- altBaro: integer (nullable = true)
|-- altGeom: integer (nullable = true)
|-- baroRate: integer (nullable = true)
|-- category: string (nullable = true)
|-- emergency: string (nullable = true)
|-- flight: string (nullable = true)
|-- gs: double (nullable = true)
|-- gva: integer (nullable = true)
|-- hex: string (nullable = true)
|-- lat: double (nullable = true)
|-- lon: double (nullable = true)
|-- mach: double (nullable = true)
|-- messages: integer (nullable = true)
|-- mlat: array (nullable = true)
| |-- element: struct (containsNull = false)
|-- nacP: integer (nullable = true)
|-- nacV: integer (nullable = true)
|-- navAltitudeMcp: integer (nullable = true)
|-- navHeading: double (nullable = true)
|-- navQnh: double (nullable = true)
|-- nic: integer (nullable = true)
|-- nicBaro: integer (nullable = true)
|-- rc: integer (nullable = true)
|-- rssi: double (nullable = true)
|-- sda: integer (nullable = true)
|-- seen: double (nullable = true)
|-- seenPos: double (nullable = true)
|-- sil: integer (nullable = true)
|-- silType: string (nullable = true)
|-- speed: double (nullable = true)
|-- squawk: integer (nullable = true)
|-- tisb: array (nullable = true)
| |-- element: struct (containsNull = false)
|-- track: double (nullable = true)
|-- version: integer (nullable = true)
|-- __key: binary (nullable = true)
|--__topic: string (nullable = true)
|-- __messageId: binary (nullable = true)
|--__publishTime: timestamp (nullable = true)
|-- __eventTime: timestamp (nullable = true)
|--__messageProperties: map (nullable = true)
| |-- key: string
| |-- value: string (valueContainsNull = true)


val pQuery = dfPulsar.selectExpr("*").writeStream.format("console")
.option("truncate", false).start()

Приведенный выше код Spark, подключенный к кластеру Pulsar, захватил данные из нашей темы Pulsar и создал таблицу. Как вы видите, задать схему — отличная идея. Затем мы можем легко запросить ее как микропакет и в данном случае вывести ее на консоль для отладки. Мы также могли бы отправить этот поток куда-нибудь еще, например в S3.

Шаг 8

Давайте выполним непрерывный SQL-запрос с помощью Apache Flink.

CREATE CATALOG pulsar WITH (
'type' = 'pulsar',
'service-url' = 'pulsar://pulsar1:6650',
'admin-url' = 'http://pulsar1:8080',
'format' = 'json'
);

USE CATALOG pulsar;

Мы создадим каталог для подключения из Flink к Pulsar.

Посмотрим на нашу таблицу.

describe aircraft;
+------------------+-----------------------+------+-----+--------+-----------+
| name | type | null | key | extras | watermark |
+------------------+-----------------------+------+-----+--------+-----------+
| alt_baro | INT | true | | | |
| alt_geom | INT | true | | | |
| baro_rate | INT | true | | | |
| category | STRING | true | | | |
| emergency | STRING | true | | | |
| flight | STRING | true | | | |
| gs | DOUBLE | true | | | |
| gva | INT | true | | | |
| hex | STRING | true | | | |
| lat | DOUBLE | true | | | |
| lon | DOUBLE | true | | | |
| mach | DOUBLE | true | | | |
| messages | INT | true | | | |
| mlat | ARRAY<ROW<> NOT NULL> | true | | | |
| nac_p | INT | true | | | |
| nac_v | INT | true | | | |
| nav_altitude_mcp | INT | true | | | |
| nav_heading | DOUBLE | true | | | |
| nav_qnh | DOUBLE | true | | | |
| nic | INT | true | | | |
| nic_baro | INT | true | | | |
| rc | INT | true | | | |
| rssi | DOUBLE | true | | | |
| sda | INT | true | | | |
| seen | DOUBLE | true | | | |
| seen_post | DOUBLE | true | | | |
| sil | INT | true | | | |
| sil_type | STRING | true | | | |
| speed | DOUBLE | true | | | |
| squawk | INT | true | | | |
| tisb | ARRAY<ROW<> NOT NULL> | true | | | |
| track | DOUBLE | true | | | |
| version | INT | true | | | |
+------------------+-----------------------+------+-----+--------+-----------+
33 rows in set

Выполним несколько простых запросов.

select alt_baro,
gs,
alt_geom,
baro_rate,
mach,
hex, flight, lat, lon
from aircraft;

select max(alt_baro) as MaxAltitudeFeet, min(alt_baro) as MinAltitudeFeet, avg(alt_baro) as AvgAltitudeFeet,
max(alt_geom) as MaxGAltitudeFeet, min(alt_geom) as MinGAltitudeFeet, avg(alt_geom) as AvgGAltitudeFeet,
max(gs) as MaxGroundSpeed, min(gs) as MinGroundSpeed, avg(gs) as AvgGroundSpeed,
count(alt_baro) as RowCount,
hex as ICAO, flight as IDENT
from aircraft
group by flight, hex;

Мы можем сделать базовые запросы, которые будут возвращать каждую строку по мере ее поступления или агрегировать их.

Шаг 9

Мы сделали это. Давайте начнем транслировать наш маленький поток мечты.

Спасибо, что остались на весь процесс создания приложения. Надеюсь скоро увидеть вас на встрече или мероприятии. Свяжитесь со мной, если у вас есть вопросы или вы ищете другие приложения, созданные на базе FLiPN+.

Дополнительные ресурсы для создания более продвинутых приложений:

  • https://flightaware.com/adsb/piaware/build/
  • https://github.com/adsbxchange/dump1090-fa
  • https://flightaware.com/adsb/piaware/build
  • https://flightaware.com/adsb/piaware/build/optional#piawarecommands
  • https://dzone.com/articles/ingesting-flight-data-ads-b-usb-receiver-with-apac
  • https://globe.adsbexchange.com/
  • https://community.cloudera.com/t5/Community-Articles/Ingesting-Flight-Data-ADS-B-USB-Receiver-with-Apache-NiFi-1/ta-p/247940
  • https://elmwoodelectronics.ca/blogs/news/tracking-and-logging-flights-with-ads-b-flight-aware-and-raspberry-pi
  • https://github.com/flightaware/piaware
  • https://github.com/erikbeebe/flink_adsb_stream
  • https://github.com/erikbeebe/flink_adsb_querystate
  • https://github.com/openskynetwork/java-adsb
  • https://openskynetwork.github.io/opensky-api/rest.html
  • https://github.com/openskynetwork/opensky-api
  • https://opensky-network.org/my-opensky
  • https://dev.to/tspannhw/tracking-air-quality-with-apache-nifi-cloudera-data-science-workbench-pyspark-and-parquet-28c
  • https://openskynetwork.github.io/opensky-api/rest.html
  • https://github.com/tspannhw/airquality
  • https://www.faa.gov/air_traffic/publications/atpubs/atc_html/chap5_section_2.html
  • https://aviation.stackexchange.com/questions/60747/what-are-all-the-squawk-codes
  • https://en.wikipedia.org/wiki/List_of_transponder_codes
  • https://github.com/briantwalter/spark1090
  • https://youtu.be/0LMOglFN8ZA

Оцените статью
devanswers.ru
Добавить комментарий