import grpc
from google.protobuf import json_format
from flask import Flask, request, jsonify
import common.errors as errors
from teos.inspector import Inspector, InspectionFailed
from teos.protobuf.user_pb2 import RegisterRequest
from teos.protobuf.tower_services_pb2_grpc import TowerServicesStub
from teos.protobuf.appointment_pb2 import Appointment, AddAppointmentRequest, GetAppointmentRequest
from common.exceptions import InvalidParameter
from teos.logger import setup_logging, get_logger
from common.constants import HTTP_OK, HTTP_BAD_REQUEST, HTTP_SERVICE_UNAVAILABLE, HTTP_NOT_FOUND
# NOTCOVERED: not sure how to monkey patch this one. May be related to #77
def get_remote_addr():
    """
    Returns the ip address of the client that issued the current request.

    ``HTTP_X_REAL_IP`` is looked up first in the request environment, so the real client address is reported when the
    server sits behind a reverse proxy. If that entry is missing or empty, ``REMOTE_ADDR`` is used instead.

    Returns:
        :obj:`str`: The IP address of the client.
    """

    environ = request.environ

    # Prefer the proxy-provided address; fall back to the socket peer address otherwise.
    return environ.get("HTTP_X_REAL_IP") or environ.get("REMOTE_ADDR")
# NOTCOVERED: not sure how to monkey patch this one. May be related to #77
def get_request_data_json(request):
    """
    Extracts the body of a json ``POST`` request, making sure it decodes to a dictionary.

    Args:
        request (:obj:`Request`): the request sent by the user.

    Returns:
        :obj:`dict`: The dictionary parsed from the json request.

    Raises:
        :obj:`InvalidParameter`: if the request is not json encoded or it does not decode to a dictionary.
    """

    # Reject anything that is not flagged as json before trying to parse it.
    if not request.is_json:
        raise InvalidParameter("Request is not json encoded")

    payload = request.get_json()

    # Valid json that is not an object (e.g. a list or a bare string) is not accepted either.
    if not isinstance(payload, dict):
        raise InvalidParameter("Invalid request content")

    return payload
def serve(internal_api_endpoint, endpoint, min_to_self_delay, auto_run=False):
    """
    Starts the API.

    This method can be handled either from an external WSGI (like gunicorn) or by the Flask development server.

    Args:
        internal_api_endpoint (:obj:`str`): endpoint where the internal api is running (``host:port``).
        endpoint (:obj:`str`): endpoint where the http api will be running (``host:port``).
        min_to_self_delay (:obj:`str`): the minimum to_self_delay accepted by the :obj:`Inspector`. It is converted
            to :obj:`int` before being handed to the :obj:`Inspector`.
        auto_run (:obj:`bool`): whether the server should be started by this process. False if run with an external
            WSGI. True if run by Flask.

    Returns:
        The application object needed by the WSGI server to run if ``auto_run`` is False, :obj:`None` otherwise.
    """

    setup_logging()

    api = API(Inspector(int(min_to_self_delay)), internal_api_endpoint)
    api.logger.info(f"Initialized. Serving at {endpoint}")

    # An external WSGI server only needs the application object; it takes care of running it.
    if not auto_run:
        return api.app

    # Otherwise run the Flask server from this process, binding to the given ``host:port``.
    host, port = endpoint.split(":")
    api.app.run(host=host, port=port)
class API:
    """
    The :class:`API` is in charge of the interface between the user and the tower. It handles and serves user requests.
    The API is connected with the :class:`InternalAPI <teos.internal_api.InternalAPI>` via gRPC.

    Args:
        inspector (:obj:`Inspector <teos.inspector.Inspector>`): an :obj:`Inspector` instance to check the correctness
            of the received appointment data.
        internal_api_endpoint (:obj:`str`): the endpoint where the internal api is served.

    Attributes:
        logger (:obj:`Logger <teos.logger.Logger>`): The logger for this component.
        app: The Flask app of the API server.
        inspector (:obj:`Inspector <teos.inspector.Inspector>`): the :obj:`Inspector` received on init.
        internal_api_endpoint (:obj:`str`): the endpoint used to open a gRPC channel on every request.
    """

    def __init__(self, inspector, internal_api_endpoint):
        self.logger = get_logger(component=API.__name__)
        self.app = Flask(__name__)
        self.inspector = inspector
        self.internal_api_endpoint = internal_api_endpoint

        # Maps every url to the method (defined below) that serves it, along with the accepted HTTP methods.
        routes = {
            "/register": (self.register, ["POST"]),
            "/add_appointment": (self.add_appointment, ["POST"]),
            "/get_appointment": (self.get_appointment, ["POST"]),
        }

        for url, (view_func, methods) in routes.items():
            self.app.add_url_rule(url, view_func=view_func, methods=methods)

    def register(self):
        """
        Registers a user by creating a subscription.

        Registration is pretty straightforward for now, since it does not require payments.
        The amount of slots and expiry of the subscription cannot be requested by the user yet either. This is linked to
        the previous point.
        Users register by sending a public key to the proper endpoint. This is exploitable atm, but will be solved when
        payments are introduced.

        Returns:
            :obj:`tuple`: A tuple containing the response (json) and response code (:obj:`int`). For accepted
            requests, the ``rcode`` is ``HTTP_OK`` and the response contains the data returned by the internal api plus
            the ``public_key`` of the user. For rejected requests, the ``rcode`` is ``HTTP_BAD_REQUEST`` and the
            response contains an error message (``error``) and an application error code (``error_code``). Error
            codes can be found at ``common.errors``.
        """

        remote_addr = get_remote_addr()
        self.logger.info("Received register request", from_addr="{}".format(remote_addr))

        # Check that data type and content are correct. Abort otherwise.
        try:
            request_data = get_request_data_json(request)

        except InvalidParameter as e:
            self.logger.info("Received invalid register request", from_addr="{}".format(remote_addr))
            return jsonify({"error": str(e), "error_code": errors.INVALID_REQUEST_FORMAT}), HTTP_BAD_REQUEST

        user_id = request_data.get("public_key")

        if user_id:
            try:
                with grpc.insecure_channel(self.internal_api_endpoint) as channel:
                    stub = TowerServicesStub(channel)
                    r = stub.register(RegisterRequest(user_id=user_id))

                rcode = HTTP_OK
                response = json_format.MessageToDict(
                    r, including_default_value_fields=True, preserving_proto_field_name=True
                )
                response["public_key"] = user_id

            except grpc.RpcError as e:
                # A public key was provided but the gRPC call failed: the field is there, so this is reported as a
                # wrongly formatted field (not as a missing one). The message comes from the gRPC error details.
                rcode = HTTP_BAD_REQUEST
                response = {"error": e.details(), "error_code": errors.REGISTRATION_WRONG_FIELD_FORMAT}

        else:
            # No (or empty) public_key in the request: this is the missing field case.
            rcode = HTTP_BAD_REQUEST
            response = {
                "error": "public_key not found in register message",
                "error_code": errors.REGISTRATION_MISSING_FIELD,
            }

        self.logger.info("Sending response and disconnecting", from_addr="{}".format(remote_addr), response=response)

        return jsonify(response), rcode

    def add_appointment(self):
        """
        Main endpoint of the Watchtower.

        The client sends requests (appointments) to this endpoint to request a job to the Watchtower. Requests must be
        json encoded and contain an ``appointment`` and ``signature`` fields.

        Returns:
            :obj:`tuple`: A tuple containing the response (json) and response code (:obj:`int`). For accepted
            appointments, the ``rcode`` is ``HTTP_OK`` and the response contains the data returned by the internal
            api. For rejected appointments, the response contains an error message and, when available, an
            application error code. The ``rcode`` is ``HTTP_BAD_REQUEST`` if the request is not properly formatted, the
            inspection fails, or the internal api answers with ``UNAUTHENTICATED`` or ``ALREADY_EXISTS``, and
            ``HTTP_SERVICE_UNAVAILABLE`` for any other gRPC error. Error codes can be found at ``common.errors``.
        """

        # Getting the real IP if the server is behind a reverse proxy
        remote_addr = get_remote_addr()
        self.logger.info("Received add_appointment request", from_addr="{}".format(remote_addr))

        # Check that data type and content are correct. Abort otherwise.
        try:
            request_data = get_request_data_json(request)

        except InvalidParameter as e:
            # Logged for consistency with the rest of the endpoints.
            self.logger.info("Received invalid add_appointment request", from_addr="{}".format(remote_addr))
            return jsonify({"error": str(e), "error_code": errors.INVALID_REQUEST_FORMAT}), HTTP_BAD_REQUEST

        try:
            # The appointment is inspected locally before being forwarded to the internal api.
            appointment = self.inspector.inspect(request_data.get("appointment"))

            with grpc.insecure_channel(self.internal_api_endpoint) as channel:
                stub = TowerServicesStub(channel)
                r = stub.add_appointment(
                    AddAppointmentRequest(
                        appointment=Appointment(
                            locator=appointment.locator,
                            encrypted_blob=appointment.encrypted_blob,
                            to_self_delay=appointment.to_self_delay,
                        ),
                        signature=request_data.get("signature"),
                    )
                )

            rcode = HTTP_OK
            response = json_format.MessageToDict(
                r, including_default_value_fields=True, preserving_proto_field_name=True
            )

        except InspectionFailed as e:
            rcode = HTTP_BAD_REQUEST
            response = {"error": "appointment rejected. {}".format(e.reason), "error_code": e.erno}

        except grpc.RpcError as e:
            if e.code() == grpc.StatusCode.UNAUTHENTICATED:
                rcode = HTTP_BAD_REQUEST
                response = {
                    "error": f"appointment rejected. {e.details()}",
                    "error_code": errors.APPOINTMENT_INVALID_SIGNATURE_OR_INSUFFICIENT_SLOTS,
                }

            elif e.code() == grpc.StatusCode.ALREADY_EXISTS:
                rcode = HTTP_BAD_REQUEST
                response = {
                    "error": f"appointment rejected. {e.details()}",
                    "error_code": errors.APPOINTMENT_ALREADY_TRIGGERED,
                }

            else:
                # This covers grpc.StatusCode.RESOURCE_EXHAUSTED (and any other return).
                rcode = HTTP_SERVICE_UNAVAILABLE
                response = {"error": "appointment rejected"}

        self.logger.info("Sending response and disconnecting", from_addr="{}".format(remote_addr), response=response)

        return jsonify(response), rcode

    def get_appointment(self):
        """
        Gives information about a given appointment state in the Watchtower.

        The information is requested by ``locator``.

        Returns:
            :obj:`tuple`: A tuple containing the response (json) and response code (:obj:`int`). If the appointment
            is found, the ``rcode`` is ``HTTP_OK`` and the response contains the ``locator``, the ``status`` and the
            ``appointment`` data. If the locator does not pass the inspection or the gRPC call fails, the ``rcode`` is
            ``HTTP_NOT_FOUND`` and the ``status`` is ``not_found``. If the request is not properly formatted, the
            ``rcode`` is ``HTTP_BAD_REQUEST`` and the response contains an error message and error code.

            A ``status`` flag is added to the data provided by either the :obj:`Watcher <teos.watcher.Watcher>` or the
            :obj:`Responder <teos.responder.Responder>` that signals the status of the appointment.

            - Appointments held by the :obj:`Watcher <teos.watcher.Watcher>` are flagged as ``being_watched``.
            - Appointments held by the :obj:`Responder <teos.responder.Responder>` are flagged as
              ``dispute_triggered``.
            - Unknown appointments are flagged as ``not_found``.
        """

        # Getting the real IP if the server is behind a reverse proxy
        remote_addr = get_remote_addr()

        # Check that data type and content are correct. Abort otherwise.
        try:
            request_data = get_request_data_json(request)

        except InvalidParameter as e:
            self.logger.info("Received invalid get_appointment request", from_addr="{}".format(remote_addr))
            return jsonify({"error": str(e), "error_code": errors.INVALID_REQUEST_FORMAT}), HTTP_BAD_REQUEST

        locator = request_data.get("locator")

        try:
            # Only well-formed locators are logged and forwarded to the internal api.
            self.inspector.check_locator(locator)
            self.logger.info("Received get_appointment request", from_addr="{}".format(remote_addr), locator=locator)

            with grpc.insecure_channel(self.internal_api_endpoint) as channel:
                stub = TowerServicesStub(channel)
                r = stub.get_appointment(
                    GetAppointmentRequest(locator=locator, signature=request_data.get("signature"))
                )

            # ``appointment_data`` is a oneof: it holds either an appointment or a tracker.
            data = (
                r.appointment_data.appointment
                if r.appointment_data.WhichOneof("appointment_data") == "appointment"
                else r.appointment_data.tracker
            )

            rcode = HTTP_OK
            response = {
                "locator": locator,
                "status": r.status,
                "appointment": json_format.MessageToDict(
                    data, including_default_value_fields=True, preserving_proto_field_name=True
                ),
            }

        except (InspectionFailed, grpc.RpcError):
            # Invalid locators and any gRPC failure are both reported as not found.
            rcode = HTTP_NOT_FOUND
            response = {"locator": locator, "status": "not_found"}

        return jsonify(response), rcode