Проталкивание stream сообщений от асинхронного grpc сервиса (c++)

Код GRPC содержит пример асинхронного сервиса с потоком сообщений от сервиса клиенту. В нем сервис отправляет сообщения клиенту по одному методом респондера Write() из массива, а когда все элементы массива оказываются отправлены, сервис вызывает метод респондера Finish().

В асинхронном сервисе GRPC использует очередь респондеров. Респондер- это объект, который может взаимодействовать с очередью- а именно, самостоятельно размещаться в очереди после завершения операций чтения/записи Read()/Write(), которые асинхронны. Поскольку RPC вызов включает в себя, как правило, и Read(), и Write(), то есть он выполняется за несколько проходов в очереди, объект из очереди должен хранить свое состояние. Когда все взаимодействие с клиентом завершается, объект нало уничтожить.

Мне нужно было реализовать RPC с потоком сообщений от сервиса клиенту, которые сначала отправлялись сразу, как в примере, а после этого, по мере возникновения событий на сервере отсылались клиенту бесконечно (пока клиент не разорвет связь). Эти события могут произойти в любое время, или вовсе не произойти. Так как события нечастые, реализовывать синхронные сервис показалось накладным. Я не нашел примеров, как это делать, поэтому пишу тут, как можно это реализовал.

Отдельный поток (thread) ждет наступления события, и, когда это происходит, нужно вызвать метод Write() респондеров. В сервисе при вызове такого «долгоиграющего» метода респондеры записываются в массив. Массив, таким образом, хранит «коннекты» клиентов.

Первая, неудачная реализация непосредственно вызывала методы Write() респондеров из отдельного потока. Это работало, но тогда, когда клиент разрывал соединение, из-за гонки при иногда объект с респондеров успевал уничтожиться между временем, когда обхект с респондером был размещен в очереди, но очередь до него не дошла, а раньше, сработало событие отключения, которое уничтожило объект респондера.

Кроме того, может произойти assert в методе респондера Write(), когда два потока одновременно его вызовут.

Вторая реализация- это использование мьютексов для защиты.

Третья реализация выглядит лучше- она не использует мьютексы, а использует саму очередь. Идея в том, что очередь выполняется последовательно, что и обеспечивает защиту. Это также предотвратит неожиданный assert в методе респондера Write(), когда два потока одновременно его вызовут.

Для этого в момент события в очередь размещается объект-«планировщик очереди». Этот объект, когда до него дойдет очередь, выполняет работу по размещению в очередь респондеров клиентов для извещения . Теперь, если какой-то клиент отвалится, очередь на уничтожение связанного респондера будет выполнена позже респондеров для извещения, и сервис не упадет.

В коде ниже показано, что в методе, отвечающим клиенту на RPC запрос, когда все накопленные ранее сообщения отправлены, вместо вызова Finish() респондера указатель на респондер записывается в массив указателей на респондеры, ждущие события вызовом preorderService->clientsListenOrders.put(this);.

void ListenOrderData::Proceed(bool successfulEvent) {
  switch (status) {
  case CREATE:
    status = LISTEN_ORDERS;
    service->Requestlistenorder(&ctx, &rq, &responder, cq, cq, this);
    break;
  case LISTEN_ORDERS:
    if (!new_responder_created) {
      new ListenOrderData(preorderService);
      new_responder_created = true;
      preorderService->clientsListenOrders.put(this);
      break;
    }

    if (!hasData) {
      status = FINISH;
      preorderService->clientsListenOrders.rm(this);
      responder.Finish(grpc::Status(), (void*) this);
      break;
    }
    responder.Write(result, (void*) this);
    hasData = false;
    status = LISTEN_ORDERS_SENT;
    break;
  case LISTEN_ORDERS_SENT:
    status = LISTEN_ORDERS;
    break;
  case FINISH:
    delete this;
    break;
  }
}

Член hasData нужен, чтобы различать «нормальное» событие от события отсоединения клиента.

Чтобы ловить событие отсоединения, нужно указать GRPC (например, в конструкторе объекта с респондером ListenOrderData), что нужно эти события отлавливать:

ctx.AsyncNotifyWhenDone(this);

Когда уничтожается респондер (объект, его содержащий), нужно удалить из массива ссылку на респондер (или его содержащий объект), это можно сделать в деструкторе, или в Proceed(successful)

Замечание 1: Параметр successful = false указывает на то, что не было чтения или записи, или они завершились ошибкой.

Замечание 2: метод ctx.IsCancelled() указывает, было ли разорвано соединение. Вызвать его можно только после ctx.AsyncNotifyWhenDone(), иначе сервис упадет.

Все хорошо, но, оказывается нельзя просто так взять и добавить в очередь объект.

К счастью есть способ обойти это. используя grpc::Alarm:

alarm.Set(cq, gpr_now(gpr_clock_type::GPR_CLOCK_REALTIME), this);

С помощью этого трюка можно выполнить вставку в очередь объекта, который вставит другие объекты в очередь.

Comments

comments

Powered by Facebook Comments

Добавить комментарий

Ваш адрес email не будет опубликован. Обязательные поля помечены *