Source code for teos.teosd

import os
import daemon
import subprocess
from sys import argv, exit
import multiprocessing
import threading

from getopt import getopt, GetoptError
from signal import signal, SIGINT, SIGQUIT, SIGTERM

from common.config_loader import ConfigLoader
from common.cryptographer import Cryptographer
from common.tools import setup_data_folder

import teos.api as api
import teos.rpc as rpc
from teos.logger import setup_logging, get_logger, serve as serve_logging
from teos.help import show_usage
from teos.watcher import Watcher
from teos.builder import Builder
from teos.carrier import Carrier
from teos.users_dbm import UsersDBM
from teos.responder import Responder
from teos.gatekeeper import Gatekeeper
from teos.internal_api import InternalAPI
from teos.chain_monitor import ChainMonitor
from teos.block_processor import BlockProcessor
from teos.appointments_dbm import AppointmentsDBM
from teos import DATA_DIR, DEFAULT_CONF, CONF_FILE_NAME
from teos.tools import can_connect_to_bitcoind, in_correct_network, get_default_rpc_port
from teos.constants import INTERNAL_API_ENDPOINT, SHUTDOWN_GRACE_TIME

parent_pid = os.getpid()


[docs]def get_config(command_line_conf, data_dir): """ Combines the command line config with the config loaded from the file and the default config in order to construct the final config object. Args: command_line_conf (:obj:`dict`): a collection of the command line parameters. Returns: :obj:`dict`: A dictionary containing all the system's configuration parameters. """ config_loader = ConfigLoader(data_dir, CONF_FILE_NAME, DEFAULT_CONF, command_line_conf) config = config_loader.build_config() # Set default RPC port if not overwritten by the user. if "BTC_RPC_PORT" not in config_loader.overwritten_fields: config["BTC_RPC_PORT"] = get_default_rpc_port(config.get("BTC_NETWORK")) return config
[docs]class TeosDaemon: """ The :class:`TeosDaemon` organizes the code to initialize all the components of teos, start the service, stop and teardown. Args: config (:obj:`dict`): the configuration object. sk (:obj:`PrivateKey`): the :obj:`PrivateKey` of the tower. logger (:obj:`Logger <teos.logger.Logger>`): the logger instance. Attributes: stop_command_event (:obj:`threading.Event`): The event that will be set to initiate a graceful shutdown. stop_event (:obj:`multiprocessing.Event`): The event that services running on different processes will monitor in order to be informed that they should shutdown. block_processor (:obj:`teos.block_processor.BlockProcessor`): The block processor instance. db_manager (:obj:`teos.appointments_dbm.AppointmentsDBM`): The db manager for appointments. watcher (:obj:`teos.watcher.Watcher`): The watcher instance. watcher_thread (:obj:`multithreading.Thread`): After ``bootstrap_components``, the thread that runs the Watcher monitoring (set to :obj:`None` beforehand). responder_thread (:obj:`multithreading.Thread`): After ``bootstrap_components``, the thread that runs the Responder monitoring (set to :obj:`None` beforehand). chain_monitor (:obj:`teos.chain_monitor.ChainMonitor`): The ``ChainMonitor`` instance. self.api_proc (:obj:`subprocess.Popen` or :obj:`multiprocessing.Process`): Once the rpc process is created, the instance of either ``Popen`` or ``Process`` that is serving the public API (set to :obj:`None` beforehand). self.rpc_process (:obj:`multiprocessing.Process`): The instance of the internal RPC server; only set if running. self.internal_api (:obj:`teos.internal_api.InternalAPI`): The InternalAPI instance. """ def __init__(self, config, sk, logger): self.config = config self.logger = logger # event triggered when a ``stop`` command is issued # Using multiprocessing.Event seems to cause a deadlock if event.set() is called in a signal handler that # interrupted event.wait(). This does not happen with threading.Event. # See https://bugs.python.org/issue41606 self.stop_command_event = threading.Event() # event triggered when the public API is halted, hence teosd is ready to stop self.stop_event = multiprocessing.Event() bitcoind_connect_params = {k: v for k, v in config.items() if k.startswith("BTC_RPC")} bitcoind_feed_params = {k: v for k, v in config.items() if k.startswith("BTC_FEED")} if not can_connect_to_bitcoind(bitcoind_connect_params): raise RuntimeError("Cannot connect to bitcoind") elif not in_correct_network(bitcoind_connect_params, config.get("BTC_NETWORK")): raise RuntimeError("bitcoind is running on a different network, check teos.conf and bitcoin.conf") self.logger.info("tower_id = {}".format(Cryptographer.get_compressed_pk(sk.public_key))) self.block_processor = BlockProcessor(bitcoind_connect_params) carrier = Carrier(bitcoind_connect_params) gatekeeper = Gatekeeper( UsersDBM(self.config.get("USERS_DB_PATH")), self.block_processor, self.config.get("SUBSCRIPTION_SLOTS"), self.config.get("SUBSCRIPTION_DURATION"), self.config.get("EXPIRY_DELTA"), ) self.db_manager = AppointmentsDBM(self.config.get("APPOINTMENTS_DB_PATH")) responder = Responder(self.db_manager, gatekeeper, carrier, self.block_processor) self.watcher = Watcher( self.db_manager, gatekeeper, self.block_processor, responder, sk, self.config.get("MAX_APPOINTMENTS"), self.config.get("LOCATOR_CACHE_SIZE"), ) self.watcher_thread = None self.responder_thread = None # Create the chain monitor self.chain_monitor = ChainMonitor( [self.watcher.block_queue, self.watcher.responder.block_queue], self.block_processor, bitcoind_feed_params ) # Set up the internal API self.internal_api = InternalAPI(self.watcher, INTERNAL_API_ENDPOINT, self.stop_command_event) # Create the rpc, without starting it self.rpc_process = multiprocessing.Process( target=rpc.serve, args=( self.config.get("RPC_BIND"), self.config.get("RPC_PORT"), INTERNAL_API_ENDPOINT, self.stop_event, self.config.get("LOG_FILE"), ), daemon=True, ) # This variables will contain the handle of the process running the API, when the service is started. # It will be an instance of either Popen or Process, depending on the WSGI config setting. self.api_proc = None
[docs] def bootstrap_components(self): """ Performs the initial setup of the components. It loads the appointments and tracker for the watcher and the responder (if any), and awakes the components. It also populates the block queues with any missing data, in case the tower has been offline for some time. Finally, it starts the chain monitor. """ # Make sure that the ChainMonitor starts listening to new blocks while we bootstrap self.chain_monitor.monitor_chain() watcher_appointments_data = self.db_manager.load_watcher_appointments() responder_trackers_data = self.db_manager.load_responder_trackers() if len(watcher_appointments_data) == 0 and len(responder_trackers_data) == 0: self.logger.info("Fresh bootstrap") self.watcher_thread = self.watcher.awake() self.responder_thread = self.watcher.responder.awake() else: self.logger.info("Bootstrapping from backed up data") # Update the Watcher backed up data if found. if len(watcher_appointments_data) != 0: self.watcher.appointments, self.watcher.locator_uuid_map = Builder.build_appointments( watcher_appointments_data ) # Update the Responder with backed up data if found. if len(responder_trackers_data) != 0: self.watcher.responder.trackers, self.watcher.responder.tx_tracker_map = Builder.build_trackers( responder_trackers_data ) # Awaking components so the states can be updated. self.watcher_thread = self.watcher.awake() self.responder_thread = self.watcher.responder.awake() last_block_watcher = self.db_manager.load_last_block_hash_watcher() last_block_responder = self.db_manager.load_last_block_hash_responder() # Populate the block queues with data if they've missed some while offline. If the blocks of both match # we don't perform the search twice. # FIXME: 32-reorgs-offline dropped txs are not used at this point. last_common_ancestor_watcher, dropped_txs_watcher = self.block_processor.find_last_common_ancestor( last_block_watcher ) missed_blocks_watcher = self.block_processor.get_missed_blocks(last_common_ancestor_watcher) if last_block_watcher == last_block_responder: dropped_txs_responder = dropped_txs_watcher missed_blocks_responder = missed_blocks_watcher else: last_common_ancestor_responder, dropped_txs_responder = self.block_processor.find_last_common_ancestor( last_block_responder ) missed_blocks_responder = self.block_processor.get_missed_blocks(last_common_ancestor_responder) # If only one of the instances needs to be updated, it can be done separately. if len(missed_blocks_watcher) == 0 and len(missed_blocks_responder) != 0: Builder.populate_block_queue(self.watcher.responder.block_queue, missed_blocks_responder) self.watcher.responder.block_queue.join() elif len(missed_blocks_responder) == 0 and len(missed_blocks_watcher) != 0: Builder.populate_block_queue(self.watcher.block_queue, missed_blocks_watcher) self.watcher.block_queue.join() # Otherwise they need to be updated at the same time, block by block elif len(missed_blocks_responder) != 0 and len(missed_blocks_watcher) != 0: Builder.update_states(self.watcher, missed_blocks_watcher, missed_blocks_responder) # Activate ChainMonitor self.chain_monitor.activate()
[docs] def start_services(self): """Readies the tower by setting up signal handling, and starting all the services.""" signal(SIGINT, self.handle_signals) signal(SIGTERM, self.handle_signals) signal(SIGQUIT, self.handle_signals) # Start the rpc process self.rpc_process.start() # Start the internal API # This MUST be done after rpc_process.start to avoid the issue that was solved in # https://github.com/talaia-labs/python-teos/pull/198 self.internal_api.rpc_server.start() self.logger.info(f"Internal API initialized. Serving at {INTERNAL_API_ENDPOINT}") # Start the public API server api_endpoint = f"{self.config.get('API_BIND')}:{self.config.get('API_PORT')}" if self.config.get("WSGI") == "gunicorn": # FIXME: We may like to add workers depending on a config value self.api_proc = subprocess.Popen( [ "gunicorn", f"--bind={api_endpoint}", f"teos.api:serve(internal_api_endpoint='{INTERNAL_API_ENDPOINT}', " f"endpoint='{api_endpoint}', min_to_self_delay='{self.config.get('MIN_TO_SELF_DELAY')}')", ] ) else: self.api_proc = multiprocessing.Process( target=api.serve, kwargs={ "internal_api_endpoint": INTERNAL_API_ENDPOINT, "endpoint": api_endpoint, "min_to_self_delay": self.config.get("MIN_TO_SELF_DELAY"), "auto_run": True, }, ) self.api_proc.start()
[docs] def handle_signals(self, signum, frame): """Handles signals by initiating a graceful shutdown.""" self.logger.debug(f"Signal {signum} received. Stopping") self.stop_command_event.set()
[docs] def teardown(self): """Shuts down all services and closes the DB, then exits. This method does not return.""" self.logger.info("Terminating public API") # Stop the public API first if isinstance(self.api_proc, subprocess.Popen): self.api_proc.terminate() self.api_proc.wait() elif isinstance(self.api_proc, multiprocessing.Process): # FIXME: when the public API process is ran with flask, there is no SIGTERM handler attempting # a graceful shutdown (rejecting new requests, trying to complete ongoing ones); therefore, we send # a SIGKILL instead. self.api_proc.kill() self.api_proc.join() self.logger.info("Public API terminated") # Signals readiness to shutdown to the other processes self.stop_event.set() # wait for RPC process to shutdown self.rpc_process.join() # Stops the internal API, after waiting for some grace time self.logger.info("Stopping internal API") self.internal_api.rpc_server.stop(SHUTDOWN_GRACE_TIME).wait() self.logger.info("Internal API stopped") # terminate the ChainMonitor self.chain_monitor.terminate() # wait for watcher and responder to finish processing their queues self.watcher_thread.join() self.responder_thread.join() self.logger.info("Closing connection with appointments db") self.db_manager.db.close() self.logger.info("Shutting down TEOS") exit(0)
[docs] def start(self): """This method implements the whole lifetime cycle of the the TEOS tower. This method does not return.""" self.logger.info("Starting TEOS") self.bootstrap_components() self.start_services() self.stop_command_event.wait() self.teardown()
[docs]def main(config): setup_data_folder(config.get("DATA_DIR")) silent = config.get("DAEMON") logging_server_ready = multiprocessing.Event() logging_process = multiprocessing.Process( target=serve_logging, daemon=True, args=(config.get("LOG_FILE"), silent, logging_server_ready) ) logging_process.start() logging_server_ready.wait() setup_logging() logger = get_logger(component="Daemon") if not os.path.exists(config.get("TEOS_SECRET_KEY")) or config.get("OVERWRITE_KEY"): logger.info("Generating a new key pair") sk = Cryptographer.generate_key() Cryptographer.save_key_file(sk.to_der(), "teos_sk", config.get("DATA_DIR")) else: logger.info("Tower identity found. Loading keys") secret_key_der = Cryptographer.load_key_file(config.get("TEOS_SECRET_KEY")) if not secret_key_der: raise IOError("TEOS private key cannot be loaded") sk = Cryptographer.load_private_key_der(secret_key_der) try: TeosDaemon(config, sk, logger).start() except Exception as e: logger.error("An error occurred: {}. Shutting down".format(e)) exit(1)
if __name__ == "__main__": # Subprocess need to be run using "spawn" for consistent execution between different OS. No state is really shared # between process. multiprocessing.set_start_method("spawn") command_line_conf = {} data_dir = DATA_DIR opts, _ = getopt( argv[1:], "hd", [ "apibind=", "apiport=", "rpcbind=", "rpcport=", "btcnetwork=", "btcrpcuser=", "btcrpcpassword=", "btcrpcconnect=", "btcrpcport=", "btcfeedconnect=", "btcfeedport=", "datadir=", "wsgi=", "daemon", "overwritekey", "help", ], ) try: for opt, arg in opts: if opt in ["--apibind"]: command_line_conf["API_BIND"] = arg if opt in ["--apiport"]: try: command_line_conf["API_PORT"] = int(arg) except ValueError: exit("apiport must be an integer") if opt in ["--rpcbind"]: command_line_conf["RPC_BIND"] = arg if opt in ["--rpcport"]: try: command_line_conf["RPC_PORT"] = int(arg) except ValueError: exit("rpcport must be an integer") if opt in ["--btcnetwork"]: command_line_conf["BTC_NETWORK"] = arg if opt in ["--btcrpcuser"]: command_line_conf["BTC_RPC_USER"] = arg if opt in ["--btcrpcpassword"]: command_line_conf["BTC_RPC_PASSWORD"] = arg if opt in ["--btcrpcconnect"]: command_line_conf["BTC_RPC_CONNECT"] = arg if opt in ["--btcrpcport"]: try: command_line_conf["BTC_RPC_PORT"] = int(arg) except ValueError: exit("btcrpcport must be an integer") if opt in ["--btcfeedconnect"]: command_line_conf["BTC_FEED_CONNECT"] = arg if opt in ["--btcfeedport"]: try: command_line_conf["BTC_FEED_PORT"] = int(arg) except ValueError: exit("btcfeedport must be an integer") if opt in ["--datadir"]: data_dir = os.path.expanduser(arg) if opt in ["--wsgi"]: if arg in ["gunicorn", "flask"]: command_line_conf["WSGI"] = arg else: exit("wsgi must be either gunicorn or flask") if opt in ["-d", "--daemon"]: command_line_conf["DAEMON"] = True if opt in ["--overwritekey"]: command_line_conf["OVERWRITE_KEY"] = True if opt in ["-h", "--help"]: exit(show_usage()) except GetoptError as e: exit(e) config = get_config(command_line_conf, data_dir) if config.get("DAEMON"): print("Starting TEOS") with daemon.DaemonContext(): main(config) else: main(config)