Код GRPC содержит пример асинхронного сервиса с потоком сообщений от сервиса клиенту. В нем сервис отправляет сообщения клиенту по одному методом респондера Write() из массива, а когда все элементы массива оказываются отправлены, сервис вызывает метод респондера Finish().
В асинхронном сервисе GRPC использует очередь респондеров. Респондер- это объект, который может взаимодействовать с очередью- а именно, самостоятельно размещаться в очереди после завершения операций чтения/записи Read()/Write(), которые асинхронны. Поскольку RPC вызов включает в себя, как правило, и Read(), и Write(), то есть он выполняется за несколько проходов в очереди, объект из очереди должен хранить свое состояние. Когда все взаимодействие с клиентом завершается, объект нало уничтожить.
Мне нужно было реализовать RPC с потоком сообщений от сервиса клиенту, которые сначала отправлялись сразу, как в примере, а после этого, по мере возникновения событий на сервере отсылались клиенту бесконечно (пока клиент не разорвет связь). Эти события могут произойти в любое время, или вовсе не произойти. Так как события нечастые, реализовывать синхронные сервис показалось накладным. Я не нашел примеров, как это делать, поэтому пишу тут, как можно это реализовал.
Отдельный поток (thread) ждет наступления события, и, когда это происходит, нужно вызвать метод Write() респондеров. В сервисе при вызове такого «долгоиграющего» метода респондеры записываются в массив. Массив, таким образом, хранит «коннекты» клиентов.
Первая, неудачная реализация непосредственно вызывала методы Write() респондеров из отдельного потока. Это работало, но тогда, когда клиент разрывал соединение, из-за гонки при иногда объект с респондеров успевал уничтожиться между временем, когда обхект с респондером был размещен в очереди, но очередь до него не дошла, а раньше, сработало событие отключения, которое уничтожило объект респондера.
Кроме того, может произойти assert в методе респондера Write(), когда два потока одновременно его вызовут.
Вторая реализация- это использование мьютексов для защиты.
Третья реализация выглядит лучше- она не использует мьютексы, а использует саму очередь. Идея в том, что очередь выполняется последовательно, что и обеспечивает защиту. Это также предотвратит неожиданный assert в методе респондера Write(), когда два потока одновременно его вызовут.
Для этого в момент события в очередь размещается объект-«планировщик очереди». Этот объект, когда до него дойдет очередь, выполняет работу по размещению в очередь респондеров клиентов для извещения . Теперь, если какой-то клиент отвалится, очередь на уничтожение связанного респондера будет выполнена позже респондеров для извещения, и сервис не упадет.
В коде ниже показано, что в методе, отвечающим клиенту на RPC запрос, когда все накопленные ранее сообщения отправлены, вместо вызова Finish() респондера указатель на респондер записывается в массив указателей на респондеры, ждущие события вызовом preorderService->clientsListenOrders.put(this);.
void ListenOrderData::Proceed(bool successfulEvent) { switch (status) { case CREATE: status = LISTEN_ORDERS; service->Requestlistenorder(&ctx, &rq, &responder, cq, cq, this); break; case LISTEN_ORDERS: if (!new_responder_created) { new ListenOrderData(preorderService); new_responder_created = true; preorderService->clientsListenOrders.put(this); break; } if (!hasData) { status = FINISH; preorderService->clientsListenOrders.rm(this); responder.Finish(grpc::Status(), (void*) this); break; } responder.Write(result, (void*) this); hasData = false; status = LISTEN_ORDERS_SENT; break; case LISTEN_ORDERS_SENT: status = LISTEN_ORDERS; break; case FINISH: delete this; break; } }
Член hasData нужен, чтобы различать «нормальное» событие от события отсоединения клиента.
Чтобы ловить событие отсоединения, нужно указать GRPC (например, в конструкторе объекта с респондером ListenOrderData), что нужно эти события отлавливать:
ctx.AsyncNotifyWhenDone(this);
Когда уничтожается респондер (объект, его содержащий), нужно удалить из массива ссылку на респондер (или его содержащий объект), это можно сделать в деструкторе, или в Proceed(successful)
Замечание 1: Параметр successful = false указывает на то, что не было чтения или записи, или они завершились ошибкой.
Замечание 2: метод ctx.IsCancelled() указывает, было ли разорвано соединение. Вызвать его можно только после ctx.AsyncNotifyWhenDone(), иначе сервис упадет.
Все хорошо, но, оказывается нельзя просто так взять и добавить в очередь объект.
К счастью есть способ обойти это. используя grpc::Alarm:
alarm.Set(cq, gpr_now(gpr_clock_type::GPR_CLOCK_REALTIME), this);
С помощью этого трюка можно выполнить вставку в очередь объекта, который вставит другие объекты в очередь.
Comments
Powered by Facebook Comments