Quine позволяет легко преобразовывать данные о событиях в реальном времени в графические структуры данных для поиска сложных закономерностей. Узнайте, как использовать Cypher для создания запросов приема (ETL).

Quine Ingest Streams

Quine оптимизирован для обработки больших объемов данных в движении, а затем для потоковой передачи высококачественной информации в режиме реального времени. Поток загрузки — это место, где начинается граф потоковой передачи. Он подключается к производителям данных, преобразует данные, а затем заполняет потоковый граф для анализа с помощью постоянных запросов.

Граф потоковой передачи Quine объединяет несколько источников для обнаружения шаблонов с высокой ценностью.

Давайте заглянем под капот, чтобы понять, как работают потоки загрузки.

По своей сути Quine — это потоково-ориентированный процессор данных, использующий графовую модель данных. Это обеспечивает оптимальную интеграцию с производителями и потребителями потоковых данных, такими как Kafka и Kinesis. Quine строится на основе этой потоковой передачи, чтобы обеспечить пакетные возможности путем преобразования данных, хранящихся в файлах, в потоковые данные для загрузки в граф.

Концепции Ingest Stream

Что такое Ingest Stream?

Принимаемый поток подключает источник данных к Quine и подготавливает переданные данные для графа потоковой передачи. В потоке загрузки запрос загрузки, написанный на Cypher, обновляет узлы и ребра потокового графа по мере получения данных.

Обратное давление на входящие потоки

Неизбежно, когда производители потоковых данных опережают потребителей, потребитель будет перегружен. В Quine, когда поток загрузки начинает получать больше данных, чем он может обработать, он управляет потоком данных, чтобы избежать перегрузки с помощью «противодавления».

Система с противодавлением не буферизируется, это заставляет восходящих производителей *не* отправлять данные со скоростью, превышающей скорость их обработки. Проблема с буферизацией заключается в том, что в буфере в конечном итоге заканчивается место. А что потом? Система должна решить, что делать, когда буфер заполнен: удалить новые результаты, удалить старые результаты, сбой системы или противодавление.

Противодавление — это протокол, определяющий способ отправки логического сигнала ВВЕРХ по потоку с информацией о готовности нижестоящих потребителей принять дополнительные данные. Этот сигнал обратного давления следует тому же пути, что и данные, перемещающиеся вниз по течению, но в обратном направлении. Если нисходящий поток не готов к потреблению, восходящий не отправляет.

Quine использует реактивную реализацию потока обратного давления, Akka Streams, построенную поверх модели акторов, чтобы обеспечить отказоустойчивость приема и обработки потоков.

ПРИМЕЧАНИЕ.Вас интересуют операционные проблемы, связанные с реактивными потоками? Прочтите Reactive Manifesto, чтобы понять проблемы, с которыми сталкивается каждый процессор потоковой передачи в конвейере данных большого объема.

Включение асинхронного неблокирующего обратного давления — единственный метод, гарантирующий, что все данные из потока большого объема обрабатываются без потери данных или задержек обработки.

Интересуетесь операционными проблемами, связанными с реактивными потоками? Прочтите Reactive Manifesto, чтобы понять проблемы, с которыми сталкивается каждый потоковый процессор в конвейере данных большого объема.

Включение асинхронного неблокирующего обратного давления — единственный метод, гарантирующий, что все данные из потока большого объема обрабатываются без потери данных или задержек обработки.

Все узлы существуют

В графовой модели данных узлы являются основной единицей данных — так же, как «строка» является основной единицей данных в реляционной базе данных. Однако, в отличие от традиционных систем графовых данных, пользователю Quine никогда не приходится создавать узел напрямую. Вместо этого система работает так, как будто все узлы существуют.

Quine представляет каждый возможный узел как существующий «пустой узел» без какой-либо интересной истории. По мере поступления данных в систему узел становится интересным, и Куайн создает для узла историю.

Мы добавили в Cypher функцию idFrom, которая принимает любое количество аргументов и детерминировано создает идентификатор узла из этих данных. Это похоже на стратегию согласованного хеширования, за исключением того, что идентификатор, созданный этой функцией, всегда является идентификатором, который соответствует типу, выбранному для поставщика идентификаторов.

Вы будете использовать idFrom в части запроса загрузки каждого потока загрузки, который вы создаете. Например, абсолютный минимальный запрос загрузки для загрузки входящих данных в граф — это просто оболочка вокруг функции idFrom.

MATCH (n) WHERE id(n) = idFrom($that) SET n.line = $that

Историческое управление версиями

Каждый узел на графике записывает все свои исторические изменения с течением времени. Когда свойства или ребра узла изменяются, событие изменения и отметка времени сохраняются в журнале только для добавления для этого конкретного узла. Этот исторический журнал можно воспроизвести до любого желаемого момента времени, что позволяет системе быстро отвечать на вопросы, используя состояние графика, как это было в прошлом. Это метод, известный как Источник событий, применяемый индивидуально к каждому узлу.

Синтаксис и структура

Первым шагом при определении потока загрузки является понимание общей формы ваших данных. Это включает в себя определение элементов данных, необходимых для постоянных запросов для использования в MATCH.

Запрос на прием определяется установкой type, описанной в Документации по API. Quine поддерживает восемь типов потоков загрузки. Каждый тип имеет уникальную форму и требует определенной структуры для правильной настройки.

Например, построение потока загрузки через конечную точку /api/v1/ingest/{name} API для чтения данных из стандарта и сохранения каждой строки в виде узла выглядит аналогично приведенному ниже примеру.

{    
	"type": "StandardInputIngest",    
	"format": {	    
		"type": "CypherLine",	   
		"query": "MATCH (n) WHERE id(n) = idFrom($that) SET n.line = $that"    
    }
 }

Quine изначально читает из стандартного ввода, передавая каждую строку в запрос Cypher как: $that. Уникальный идентификатор узла создается с использованием idFrom($that). Затем каждая строка сохраняется как параметр line, связанный с новым узлом графа потоковой передачи.

ПРИМЕЧАНИЕ.При создании потока загрузки с помощью API вам предоставляется возможность назвать поток осмысленным именем. Например, вы можете назвать указанный выше поток загрузкиstandardIn, чтобы на него было проще ссылаться в приложении.

Кроме того, при создании потока загрузки с помощью рецепта Quine автоматически присваивает имя каждому потоку, используя форматINGEST-1, где первый поток загрузки, определенный в рецепте,INGEST-1 а последующие потоки загрузки именуются в порядке#считая up.

При создании входящего потока через API вам предоставляется возможность назвать поток осмысленным именем. Например, вы можете назвать указанный выше поток загрузки standardIn, чтобы на него было проще ссылаться в приложении.

В качестве альтернативы, создавая поток загрузки с помощью рецепта, Quine автоматически присваивает имя каждому потоку, используя формат INGEST-1, где первый поток загрузки, определенный в рецепте, имеет имя INGEST-1, а последующие потоки загрузки именуются в порядке возрастания #.

Вот тот же поток загрузки, определенный в Рецепте Quine.

ingestStreams:
  - type: StandardInputIngest
    format:
      type: CypherLine
      query: |-
        MATCH (n)
        WHERE id(n) = idFrom($that)
        SET n.line = $that

Получение отчетов о потоке

Проверка потоков загрузки с помощью API

Quine предоставляет ряд конечных точек API, которые позволяют отслеживать потоки загрузки и управлять ими во время работы. Полные определения конечных точек доступны в документации по API.

Давайте посмотрим на информацию, доступную из потока загрузки INGEST-1 из рецепта Распространение тегов Ethereum.

Запустите рецепт.

❯ java -jar quine-x.x.x.jar -r ethereum

Перечислите потоки загрузки, запущенные рецептом Ethereum, используя конечную точку /api/v1/ingest.

❯ curl -s "http://localhost:8080/api/v1/ingest" | jq '. | keys'
[
  "INGEST-1",
  "INGEST-2"
]

Рецепт Ethereum создает два потока загрузки; INGEST-1 и INGEST-2.

Теперь просмотрите статистику входящего потока, используя конечную точку /api/v1/ingest/INGEST-1.

❯ curl -s "http://localhost:8080/api/v1/ingest/INGEST-1" | jq
{
  "name": "INGEST-1",
  "status": "Running",
  "settings": {
    "format": {
      "query": "MATCH (BA), (minerAcc), (blk), (parentBlk)\nWHERE\n  
    id(blk) = idFrom('block', $that.hash)\n  AND id(parentBlk) = 
    idFrom('block', $that.parentHash)\n  AND id(BA) = 
    idFrom('block_assoc', $that.hash)\n  AND id(minerAcc) = 
    idFrom('account', $that.miner)\nCREATE\n  
    (minerAcc)<-[:mined_by]-(blk)-[:header_for]->(BA),\n  
    (blk)-[:preceded_by]->(parentBlk)\nSET\n  BA:block_assoc,\n  
     BA.number = $that.number,\n  BA.hash = $that.hash,\n  
     blk:block,\n  blk = $that,\n  minerAcc:account,\n  
     minerAcc.address = $that.miner",
     	"parameter": "that",
      "type": "CypherJson"
    },
    "url": "https://ethereum.demo.thatdot.com/blocks_head",
    "parallelism": 16,
    "type": "ServerSentEventsIngest"
  },
  "stats": {
    "ingestedCount": 57,
    "rates": {
      "count": 57,
      "oneMinute": 0.045556443551085735,
      "fiveMinute": 0.06175571100053622,
      "fifteenMinute": 0.04159128290271318,
      "overall": 0.07659077758191643
    },
    "byteRates": {
      "count": 78451,
      "oneMinute": 62.49789862393008,
      "fiveMinute": 84.92629746711795,
      "fifteenMinute": 57.22987512826503,
      "overall": 105.41446006900763
    },
    "startTime": "2022-05-17T18:56:08.161500Z",
    "totalRuntime": 744041
  }
}

Отчеты о ходе загрузки потока с помощью запроса статуса

При создании запроса загрузки с помощью рецепта вы можете добавить запрос состояния, который выполняется непрерывно. Например, приведенный ниже запрос состояния выводит информацию для каждого узла графа и ссылку на визуализацию в веб-интерфейсе.

statusQuery:  
		cypherQuery: MATCH (n) RETURN count(n)

Серия блогов Ingest Stream

Это только начало. Есть еще много чего. В течение следующих нескольких недель мы будем освещать наиболее распространенные потоки загрузки в отдельных сообщениях блога.

Попробуйте загрузить данные в Quine самостоятельно

А если вы хотите попробовать Quine сами, то можете скачать его здесь. И в дополнение к рецепту Ethereum взгляните на рецепты Wikipedia Ingest и Apache Log Analytics для различных примеров потоков загрузки.

Если у вас есть вопросы или вы хотите ознакомиться с сообществом, присоединяйтесь к Quine slack или посетите нашу страницу Github.