Проталкивание stream сообщений от асинхронного grpc сервиса (c++)

GRPC содержит пример асинхронного сервиса с потоком сообщений от сервиса клиенту. В нем сервис отправляет сообщения клиенту по одному методом Write респондера из массива, когда все элементы массива отправлены, сервис вызывает метод Finish респондера. После этого объект респондера уничтожается (в примере респондер- член объекта, живущего до тех пор, пока RPC не отработает до конца).

Мне нужно было реализовать RPC с потоком сообщений от сервиса клиенту, которые сначала отправлялись сразу, как в примере, а потом- по мере возникновения событий на сервере. Эти события могут произойти в любое время. Я не нашел примеров, как это делать, поэтому запишу тут, как можно это реализовать.

Отдельный поток (thread) ждет наступления события, и, когда это происходит, нужно вызвать метод Write респондера. Тогда клиент получит сообщение. Отдельный поток нужен, в потоке, где выполняется асинхронный grpc сервис, это сделать нельзя.

В коде ниже показано, что в методе, отвечающим клиенту на RPC запрос, когда все накопленные ранее сообщения отправлены, вместо вызова Finish респондера указатель на респондер записывается в массив указателей на респондеры (addResponderToListener()), ждущие события.

    if (counter >= result.size()) {
        status = LISTEN_ORDERS;
        addResponderToListener();
    } else {
      responder.Write(result[counter], (void*) this);
      counter++;
    }

Обработчик события перебирает в цикле респондеры и вызывает их метод Write().

Когда уничтожается респондер (объект, его содержащий), нужно удалить из массива ссылку на респондер (или его содержащий объект), это можно сделать в деструкторе, или в Proceed(successful), когда параметр successful = false указывает на то, что соединение с клиентом прервано (можно проверить контекст, но прежде нужно не зыбыть указать grpc слать событие при прерывании, иначе grpc падает).

Все у меня на работе заработало. Решил проверить дома, и все упало. Оказывается, если на компьютере больше одного ядра, произойдет assert в методе Write() grpc, когда два потока одновременно его вызовут.

Поэтому лучше разместить в очереди новый тег(объект) на чтение события, который будет ждать ответа. Ответ должен поступить из ждущего событий потока, считан, и тогда вызовы grpc Write() будут выполняться из одного потока, а не из двух.

Напишу позже, что получится.

Comments

comments

Powered by Facebook Comments

Добавить комментарий

Ваш e-mail не будет опубликован. Обязательные поля помечены *