# Source code for teos.internal_api

import grpc
from concurrent import futures
from readerwriterlock import rwlock
from google.protobuf.struct_pb2 import Struct

from teos.logger import get_logger
from common.appointment import Appointment
from common.exceptions import InvalidParameter

from teos.protobuf.appointment_pb2 import (
    Appointment as AppointmentProto,
    Tracker as TrackerProto,
    AppointmentData,
    AddAppointmentResponse,
    GetAppointmentResponse,
    GetAllAppointmentsResponse,
)
from teos.protobuf.user_pb2 import RegisterResponse, GetUserResponse, GetUsersResponse
from teos.protobuf.tower_services_pb2 import GetTowerInfoResponse
from teos.protobuf.tower_services_pb2_grpc import TowerServicesServicer, add_TowerServicesServicer_to_server
from teos.gatekeeper import NotEnoughSlots, AuthenticationFailure
from teos.watcher import AppointmentLimitReached, AppointmentAlreadyTriggered, AppointmentNotFound
from google.protobuf.empty_pb2 import Empty


class InternalAPI:
    """
    The :obj:`InternalAPI` is the interface used to interact with the tower backend. It offers methods that can be
    accessed by the CLI or the client via the :class:`API <teos.api.API>` (HTTP proxy) or the
    :class:`RPC <teos.rpc.RPC>` (gRPC proxy).

    Args:
        watcher (:obj:`Watcher <teos.watcher.Watcher>`): a :obj:`Watcher` instance to pass the requests to. The
            Watcher is the main backend class of the tower and can interact with the rest.
        internal_api_endpoint (:obj:`str`): the endpoint where the internal api will be served (gRPC server).
        stop_command_event (:obj:`multiprocessing.Event`): an event to be set when a ``stop`` command is issued.

    Attributes:
        logger (:obj:`Logger <teos.logger.Logger>`): The logger for this component.
        watcher (:obj:`Watcher <teos.watcher.Watcher>`): The watcher the requests are passed to.
        endpoint (:obj:`str`): The endpoint where the internal api will be served (gRPC server).
        stop_command_event (:obj:`multiprocessing.Event`): The event handed to the servicer for the ``stop`` command.
        rpc_server (:obj:`grpc.Server <grpc.Server>`): The non-started gRPC server instance.
    """

    def __init__(self, watcher, internal_api_endpoint, stop_command_event):
        self.logger = get_logger(component=InternalAPI.__name__)
        self.watcher = watcher
        self.endpoint = internal_api_endpoint
        self.stop_command_event = stop_command_event

        # The server is only built and configured here; it is not started by the constructor.
        worker_pool = futures.ThreadPoolExecutor(max_workers=10)
        self.rpc_server = grpc.server(worker_pool)
        self.rpc_server.add_insecure_port(self.endpoint)

        # The servicer shares this component's logger.
        servicer = _InternalAPI(watcher, stop_command_event, self.logger)
        add_TowerServicesServicer_to_server(servicer, self.rpc_server)
class _InternalAPI(TowerServicesServicer):
    """
    Service provider of the internal api. It implements all the gRPC methods offered by the API.

    Args:
        watcher (:obj:`Watcher <teos.watcher.Watcher>`): a ``Watcher`` instance to pass the requests to. The Watcher
            is the main backend class of the tower and can interact with the rest.
        stop_command_event (:obj:`multiprocessing.Event`): an Event to be set when a `stop` command is issued.
        logger (:obj:`Logger <teos.logger.Logger>`): the logger for this component.

    Attributes:
        rw_lock (:obj:`RWLockWrite <rwlock.RWLockWrite>`): a reader-writer lock to manage concurrent access to the
            backend.
    """

    def __init__(self, watcher, stop_command_event, logger):
        self.watcher = watcher
        self.stop_command_event = stop_command_event
        self.logger = logger
        # Lock to be acquired before interacting with the watchtower's state
        self.rw_lock = rwlock.RWLockWrite()

    def register(self, request, context):
        """
        Registers a user to the tower.

        Runs under the write lock. If the watcher raises ``InvalidParameter``, the error message is set as the
        context details, the status code is set to ``INVALID_ARGUMENT`` and an empty response is returned.
        """

        with self.rw_lock.gen_wlock():
            try:
                slots, expiry, signature = self.watcher.register(request.user_id)
                response = RegisterResponse(
                    user_id=request.user_id,
                    available_slots=slots,
                    subscription_expiry=expiry,
                    subscription_signature=signature,
                )
            except InvalidParameter as e:
                context.set_details(e.msg)
                context.set_code(grpc.StatusCode.INVALID_ARGUMENT)
                response = RegisterResponse()

            return response

    def add_appointment(self, request, context):
        """
        Processes the request to add an appointment from a user.

        Runs under the write lock. On success the mapping returned by the watcher is expanded into the response.
        On failure the context gets a status code and a message, and an empty response is returned.
        """

        with self.rw_lock.gen_wlock():
            try:
                received = request.appointment
                appointment = Appointment(received.locator, received.encrypted_blob, received.to_self_delay)
                response_fields = self.watcher.add_appointment(appointment, request.signature)
                return AddAppointmentResponse(**response_fields)

            except (AuthenticationFailure, NotEnoughSlots):
                # Both failures are reported with the same message and status code
                failure = (
                    grpc.StatusCode.UNAUTHENTICATED,
                    "Invalid signature or user does not have enough slots available",
                )
            except AppointmentLimitReached:
                failure = (grpc.StatusCode.RESOURCE_EXHAUSTED, "Appointment limit reached")
            except AppointmentAlreadyTriggered:
                failure = (grpc.StatusCode.ALREADY_EXISTS, "The provided appointment has already been triggered")

            # Only reachable when one of the handlers above has run
            status_code, details = failure
            context.set_details(details)
            context.set_code(status_code)
            return AddAppointmentResponse()

    def get_appointment(self, request, context):
        """
        Returns an appointment stored in the tower, if it exists.

        Runs under the read lock. The data is wrapped as an appointment when the status is ``being_watched`` and as
        a tracker otherwise. ``AuthenticationFailure`` and ``AppointmentNotFound`` are both answered with
        ``NOT_FOUND`` and an empty response.
        """

        appointment_fields = ("locator", "encrypted_blob", "to_self_delay")
        tracker_fields = ("locator", "dispute_txid", "penalty_txid", "penalty_rawtx")

        with self.rw_lock.gen_rlock():
            try:
                fetched, status = self.watcher.get_appointment(request.locator, request.signature)

                if status != "being_watched":
                    tracker = TrackerProto(**{field: fetched.get(field) for field in tracker_fields})
                    payload = AppointmentData(tracker=tracker)
                else:
                    appointment = AppointmentProto(**{field: fetched.get(field) for field in appointment_fields})
                    payload = AppointmentData(appointment=appointment)

                return GetAppointmentResponse(appointment_data=payload, status=status)

            except (AuthenticationFailure, AppointmentNotFound):
                context.set_details("Appointment not found")
                context.set_code(grpc.StatusCode.NOT_FOUND)
                return GetAppointmentResponse()

    def get_all_appointments(self, request, context):
        """
        Returns all the appointments in the tower.

        Runs under the read lock. The watcher appointments and the responder trackers are packed into one
        ``Struct`` under the ``watcher_appointments`` and ``responder_trackers`` keys.
        """

        with self.rw_lock.gen_rlock():
            in_watcher = self.watcher.get_all_watcher_appointments()
            in_responder = self.watcher.get_all_responder_trackers()

            packed = Struct()
            packed.update({"watcher_appointments": in_watcher, "responder_trackers": in_responder})

            return GetAllAppointmentsResponse(appointments=packed)

    def get_tower_info(self, request, context):
        """Returns generic information about the tower, read from the watcher under the read lock."""

        with self.rw_lock.gen_rlock():
            tower = self.watcher
            return GetTowerInfoResponse(
                tower_id=tower.tower_id,
                n_registered_users=tower.n_registered_users,
                n_watcher_appointments=tower.n_watcher_appointments,
                n_responder_trackers=tower.n_responder_trackers,
            )

    def get_users(self, request, context):
        """Returns the list of all registered user ids, read from the watcher under the read lock."""

        with self.rw_lock.gen_rlock():
            registered_ids = self.watcher.get_registered_user_ids()
            return GetUsersResponse(user_ids=registered_ids)

    def get_user(self, request, context):
        """
        Returns information about a user, given its user id.

        Runs under the read lock. If the watcher returns no (or falsy) info for the user, the context is set to
        ``NOT_FOUND`` and an empty response is returned.
        """

        with self.rw_lock.gen_rlock():
            user_info = self.watcher.get_user_info(request.user_id)

            if user_info:
                user_struct = Struct()
                user_struct.update(
                    {
                        "subscription_expiry": user_info.subscription_expiry,
                        "available_slots": user_info.available_slots,
                        "appointments": list(user_info.appointments.keys()),
                    }
                )
                return GetUserResponse(user=user_struct)

            context.set_details("User not found")
            context.set_code(grpc.StatusCode.NOT_FOUND)
            return GetUserResponse()

    def stop(self, request, context):
        """Initiates a graceful shutdown of the tower by setting the stop command event."""

        self.stop_command_event.set()
        return Empty()