Source code for teos.rpc

import grpc
import functools
from concurrent import futures
from signal import signal, SIGINT, SIGQUIT, SIGTERM

from teos.logger import setup_logging, get_logger

from teos.constants import SHUTDOWN_GRACE_TIME
from teos.protobuf.tower_services_pb2_grpc import (
    TowerServicesStub,
    TowerServicesServicer,
    add_TowerServicesServicer_to_server,
)


class RPC:
    """
    External RPC server that the tower exposes to receive requests from the CLI. It sits between the CLI and the
    internal api, acting as a proxy.

    Args:
        rpc_bind (:obj:`str`): the IP or host where the RPC server will be hosted.
        rpc_port (:obj:`int`): the port where the RPC server will be hosted.
        internal_api_endpoint (:obj:`str`): the endpoint where to reach the internal (gRPC) api.

    Attributes:
        logger (:obj:`Logger <teos.logger.Logger>`): The logger for this component.
        endpoint (:obj:`str`): The endpoint where the RPC api will be served (external gRPC server), built as
            ``rpc_bind:rpc_port``.
        rpc_server (:obj:`grpc.Server <grpc.Server>`): The non-started gRPC server instance.
    """

    def __init__(self, rpc_bind, rpc_port, internal_api_endpoint):
        self.logger = get_logger(component=RPC.__name__)
        self.endpoint = f"{rpc_bind}:{rpc_port}"

        # The server is only created and configured here; starting it is left to the caller.
        worker_pool = futures.ThreadPoolExecutor(max_workers=10)
        self.rpc_server = grpc.server(worker_pool)
        self.rpc_server.add_insecure_port(self.endpoint)

        # Every request is answered by a servicer that relays it to the internal api.
        servicer = _RPC(internal_api_endpoint, self.logger)
        add_TowerServicesServicer_to_server(servicer, self.rpc_server)

    def handle_signals(self, signum, frame):
        """
        Signal handler that deliberately does nothing, so the signal is ignored here and the main process is left
        to manage the teardown.

        Args:
            signum: the number of the received signal (unused).
            frame: the stack frame that was active when the signal arrived (unused).
        """
        return None

    def teardown(self):
        """
        Asks the gRPC server to stop, giving it ``SHUTDOWN_GRACE_TIME`` as grace period, and waits on the event
        returned by the server before reporting that it has stopped.
        """
        self.logger.info("Stopping")
        self.rpc_server.stop(SHUTDOWN_GRACE_TIME).wait()
        self.logger.info("Stopped")
def forward_errors(func):
    """
    Decorates a servicer method so that any ``grpc.RpcError`` raised while running it is reported as the outcome of
    the current grpc call instead of propagating.

    Args:
        func: the method to wrap. It is called as ``func(self, request, context, *args, **kwargs)``.

    Returns:
        The wrapped method. It returns whatever ``func`` returns, or :obj:`None` if a ``grpc.RpcError`` was caught,
        in which case the error details and status code are copied into ``context``.
    """

    def _forwarding(self, request, context, *args, **kwargs):
        try:
            response = func(self, request, context, *args, **kwargs)
        except grpc.RpcError as upstream_error:
            # Mirror the upstream failure on this call's context (details first, then code).
            context.set_details(upstream_error.details())
            context.set_code(upstream_error.code())
            return None

        return response

    # Keep the name, docstring and other metadata of the wrapped method.
    return functools.wraps(func)(_forwarding)
class _RPC(TowerServicesServicer):
    """
    Provider of the RPC server. It implements every method that can be reached from the CLI by relaying the request
    to the internal api through a gRPC stub.

    Args:
        internal_api_endpoint (:obj:`str`): the endpoint where to reach the internal (gRPC) api.
        logger (:obj:`Logger <teos.logger.Logger>`): the logger for this component.
    """

    def __init__(self, internal_api_endpoint, logger):
        self.logger = logger
        self.internal_api_endpoint = internal_api_endpoint

        # Insecure channel to the internal api; all requests go through the stub built on top of it.
        self.channel = grpc.insecure_channel(internal_api_endpoint)
        self.stub = TowerServicesStub(self.channel)

    @forward_errors
    def get_all_appointments(self, request, context):
        """Relays ``get_all_appointments`` to the internal api and returns its response."""
        response = self.stub.get_all_appointments(request)
        return response

    @forward_errors
    def get_tower_info(self, request, context):
        """Relays ``get_tower_info`` to the internal api and returns its response."""
        response = self.stub.get_tower_info(request)
        return response

    @forward_errors
    def get_users(self, request, context):
        """Relays ``get_users`` to the internal api and returns its response."""
        response = self.stub.get_users(request)
        return response

    @forward_errors
    def get_user(self, request, context):
        """Relays ``get_user`` to the internal api and returns its response."""
        response = self.stub.get_user(request)
        return response

    @forward_errors
    def stop(self, request, context):
        """Relays ``stop`` to the internal api and returns its response."""
        response = self.stub.stop(request)
        return response
def serve(rpc_bind, rpc_port, internal_api_endpoint, stop_event, log_file):
    """
    Serves the external RPC API at the given endpoint and connects it to the internal api.

    This function starts the server and then blocks until ``stop_event`` is set, at which point the server is shut
    down gracefully. ``SIGINT``, ``SIGTERM`` and ``SIGQUIT`` are routed to a no-op handler, so they do not stop the
    server by themselves.

    Args:
        rpc_bind (:obj:`str`): the IP or host where the RPC server will be hosted.
        rpc_port (:obj:`int`): the port where the RPC server will be hosted.
        internal_api_endpoint (:obj:`str`): the endpoint where to reach the internal (gRPC) api.
        stop_event (:obj:`multiprocessing.Event`): the Event that this service will monitor. The rpc server will
            initiate a graceful shutdown once this event is set.
        log_file: accepted as part of the signature but not referenced in the body of this function.
    """

    setup_logging()
    rpc = RPC(rpc_bind, rpc_port, internal_api_endpoint)

    # Stop signals are swallowed here; shutting down is driven by stop_event only.
    for signum in (SIGINT, SIGTERM, SIGQUIT):
        signal(signum, rpc.handle_signals)

    rpc.rpc_server.start()
    rpc.logger.info(f"Initialized. Serving at {rpc.endpoint}")

    # Hold until the main process asks this service to stop, then shut the server down.
    stop_event.wait()
    rpc.teardown()