Benthos

Разрабатывая интерактивные веб-сервисы, однажды вы неизбежно столкнётесь с задачей двух генералов, не имеющей полного решения. Генералов нужно обучить подотчётности и отслеживанию доставки своих сообщений: необходима стратегия подавления рассогласованности планов, полезен счётчик всех входящих и исходящих посыльных. Но вы не первый кто оказывается в данной ситуации. В этой статье я расскажу про совместимое со множеством протоколов решение, берущее на себя эту рутину.

Benthos — универсальный инструмент для работы с поточными данными. Это конфигурируемое консольное приложение, написанное на Go. Он позволяет эффективно обрабатывать большой поток данных и предлагает множество инструментов, чтобы убедиться что каждое сообщение было корректно обработано. Но здесь я хочу пойти немного дальше и буквально проиллюстрировать решение проблемы генералов с его помощью.

В качестве “города” в нашей демонстрации выступит http-сервер, логирующий в консоль одновременные запросы, который мы опишем в нашем city.yaml:

http:
  enabled: true
  address: 0.0.0.0:3333
input:
  batched:
    child:
      http_server:
        path: /post
        allowed_verbs:
          - POST
    policy:
      count: 2
      period: 1s
pipeline:
  processors:
    - group_by_value:
        value: ${! json("time") }
    - archive:
        format: json_array
    - log:
        message: ${! [json().length().string(), "sources,", "delay", (timestamp_unix() - json().0.time).string(), "seconds"].join(" ") }
output:
  stdout: {}

При получении двух запросов за одну секунду, сообщения будут объединены в “пачку”, и если параметр “time” в них совпадает, полученная пачка объединяется в одно сообщение, которое выведется в консоль через обработчик log и в выход stdout.

Первый генерал из general1.yaml раз в минуту принимает решение о времени нападения и передаёт его как unix timestamp:

http:
  enabled: false
input:
  generate:
    mapping: |
      root.time = timestamp_unix() + random_int(min:10, max:60)
    interval: 1m
pipeline:
  processors:
    - retry:
        processors:
          - http:
              url: "http://127.0.0.1:2222/post"
              verb: POST
              timeout: 5s
          - log:
              message: '${! [json("time").string(), if errored() || json("cancel") == true { "no" } else { "yes" }].join(" ") }'
    - mapping: |
        root = if this.cancel { deleted() }
    - sleep:
        duration: '${! json("time") - timestamp_unix() }s'
    - mapping: |
        root = {"time": this.time, "source": 1}
output:
  broker:
    outputs:
      - stdout: {}
      - retry:
          output:
            http_client:
              url: "http://127.0.0.1:3333/post"
              verb: POST

По порту 2222 его сообщения готов принять второй генерал из general2.yaml:

http:
  enabled: true
  address: 0.0.0.0:2222
input:
  http_server:
    path: /post
    allowed_verbs:
      - POST
pipeline:
  processors:
    - mapping: |
        root = {"time": this.time, "cancel": this.time < timestamp_unix()}
    - sync_response: {}
    - dedupe:
        cache: keycache
        key: ${! json("time") }
    - log:
        message: '${! [json("time").string(), if errored() || json("cancel") == true { "no" } else { "yes" }].join(" ") }'
    - mapping: |
        root = if this.cancel == true { deleted() }
    - sleep:
        duration: '${! json("time").number() - timestamp_unix() }s'
    - mapping: |
        root = {"time": this.time, "source": 2}
cache_resources:
  - label: keycache
    memory:
      default_ttl: 1m
output:
  broker:
    outputs:
      - stdout: {}
      - retry:
          output:
            http_client:
              url: "http://127.0.0.1:3333/post"
              verb: POST

Он пропускает одинаковые сообщения чтобы не напасть дважды в случае ошибки доставки первого ответа, и если параметр time содержит время в будущем, отвечает на запрос первого генерала подтверждением плана сообщением с параметром "cancel": false, возвращающимся в теле HTTP-ответа на шаге sync_response. Пока такое подтверждение не получено, первый генерал не нападёт. В случае если передан "cancel": true, нападение не состоится и генералы примут новое решение. После согласования нападения оба генерала выполняют sleep с количеством секунд до выбранного времени в качестве параметра.

Так выглядит запуск “города” (city.yaml, слева) а затем первого (general1.yaml, справа сверху) и второго (general2.yaml) генерала по очереди:

В нашем идеальном сценарии все сообщения быстро доходят по локальной сети и трудности наблюдаются только у первого генерала в ожидании второго. По адресу http://127.0.0.1:2222/metrics без дополнительных настроек мы видим полезные метрики второго генерала:

http_request_code_2xx{label="",path="root.output.broker.outputs.1.retry.output"} 3
input_received{label="",path="root.input"} 6
output_batch_sent{label="",path="root.output.broker.outputs.1.retry.output"} 3
output_error{label="",path="root.output.broker.outputs.1.retry.output"} 37
output_sent{label="",path="root.output.broker.outputs.1.retry.output"} 3

У Benthos достаточно широкая область применения: от решения повседневных задач и простых интеграций до обеспечения отказоустойчивой передачи больших данных между сервисами. Он позволяет гибко настраивать мониторинг, есть возможность добавления пользовательских метрик.

При необходимости Benthos достаточно просто дорабатывается собственными компонентами на Go. Давно слежу за его разработкой и применяю на практике. В следующий раз когда в вашу сторону полетит массив из JSON’ов, против него можно взять silver bullet.

Llama3 неплохо составляет подробное описание для представленных файлов, а вот с написанием базовой конфигурации лучше справилась модель C4AI, однако быстро ломается с повышением сложности задачи.

Если по каким-то причинам вам хочется писать меньше кода, у Benthos есть визуальный редактор.