Source code for teos.chain_monitor

from enum import Enum
from queue import Queue
import zmq
import binascii
from threading import Thread, Event, Condition

from teos.logger import get_logger


[docs]class ChainMonitorStatus(Enum): IDLE = 0 LISTENING = 1 ACTIVE = 2 TERMINATED = 3
[docs]class ChainMonitor: """ The :obj:`ChainMonitor` is in charge of monitoring the blockchain (via ``bitcoind``) to detect new blocks on top of the best chain. If a new best block is spotted, the chain monitor will notify the given queues. The :obj:`ChainMonitor` monitors the chain using two methods: ``zmq`` and ``polling``. Blocks are only notified once per queue and the notification is triggered by the method that detects the block faster. The :obj:`ChainMonitor` lifecycle goes through 4 states: idle, listening, active and terminated. When a :obj:`ChainMonitor` instance is created, it is not yet monitoring the chain and the ``status`` attribute is set to ``ChainMonitorStatus.IDLE``. Once the ``monitor_chain`` method is called, the chain monitor changes ``status`` to ``ChainMonitorStatus.LISTENING``, and starts monitoring the chain for new blocks; it does not yet notify the receiving queues, but keeps the block hashes in the order they where spotted in an internal queue. Once the ``activate`` method is called, the ``status`` changes to ``ChainMonitorStatus.ACTIVE``, and the receiving queues are notified in order for all the block hashes that are in the internal queue or any new one that is detected. Finally, once the ``terminate`` method is called, the ``status`` is changed to ``ChainMonitorStatus.TERMINATED``, the chain monitor stops monitoring the chain and no receiving queue will be notified about new blocks (including any block that is currently in the internal queue). A final ``"END"`` message is sent to all the subscribers. Args: receiving_queues (:obj:`list`): a list of :obj:`Queue` objects that will be notified when the chain_monitor is active and it received new blocks hashes. block_processor (:obj:`BlockProcessor <teos.block_processor.BlockProcessor>`): a :obj:`BlockProcessor` instance. bitcoind_feed_params (:obj:`dict`): a dict with the feed (ZMQ) connection parameters. Attributes: logger (:obj:`Logger <teos.logger.Logger>`): The logger for this component. last_tips (:obj:`list`): A list of last chain tips. Used as a sliding window to avoid notifying about old tips. check_tip (:obj:`Event`): An event that is triggered at fixed time intervals and controls the polling thread. lock (:obj:`Condition`): A lock used to protect concurrent access to the queues by the zmq and polling threads. zmqSubSocket (:obj:`socket`): A socket to connect to ``bitcoind`` via ``zmq``. polling_delta (:obj:`int`): Time between polls (in seconds). max_block_window_size (:obj:`int`): Max size of ``last_tips``. queue (:obj:`Queue`): A queue where blocks are stored before they are processed. status (:obj:`ChainMonitorStatus`): The current status of the monitor, either ``ChainMonitorStatus.IDLE``, ``ChainMonitorStatus.LISTENING``, ``ChainMonitorStatus.ACTIVE`` or ``ChainMonitorStatus.TERMINATED``. """ def __init__(self, receiving_queues, block_processor, bitcoind_feed_params): self.logger = get_logger(component=ChainMonitor.__name__) self.last_tips = [] self.check_tip = Event() self.lock = Condition() self.zmqContext = zmq.Context() self.zmqSubSocket = self.zmqContext.socket(zmq.SUB) self.zmqSubSocket.setsockopt(zmq.RCVHWM, 0) self.zmqSubSocket.setsockopt_string(zmq.SUBSCRIBE, "hashblock") self.zmqSubSocket.connect( "%s://%s:%s" % ( bitcoind_feed_params.get("BTC_FEED_PROTOCOL"), bitcoind_feed_params.get("BTC_FEED_CONNECT"), bitcoind_feed_params.get("BTC_FEED_PORT"), ) ) self.receiving_queues = receiving_queues self.polling_delta = 60 self.max_block_window_size = 10 self.block_processor = block_processor self.queue = Queue() self.status = ChainMonitorStatus.IDLE
[docs] def enqueue(self, block_hash): """ Adds a new block hash to the internal queue of the :obj:`ChainMonitor` and the internal state. The state contains the list of ``last_tips`` to prevent notifying about old blocks. ``last_tips`` is bounded to ``max_block_window_size``. Args: block_hash (:obj:`str`): the new best tip. Returns: :obj:`bool`: True if the state was successfully updated, False otherwise. """ if block_hash not in self.last_tips: with self.lock: self.queue.put(block_hash) self.last_tips.append(block_hash) if len(self.last_tips) > self.max_block_window_size: self.last_tips.pop(0) return True else: return False
[docs] def monitor_chain_polling(self): """ Monitors ``bitcoind`` via polling. Once the method is fired, it keeps monitoring as long as the ``status`` attribute is not ``ChainMonitorStatus.TERMINATED``. Polling is performed once every ``polling_delta`` seconds. If a new best tip is found, it is added to the internal queue. """ while self.status != ChainMonitorStatus.TERMINATED: self.check_tip.wait(timeout=self.polling_delta) current_tip = self.block_processor.get_best_block_hash() # get_best_block_hash may return None if the RPC times out. if current_tip and current_tip not in self.last_tips: self.logger.info("New block received via polling", block_hash=current_tip) self.enqueue(current_tip)
[docs] def monitor_chain_zmq(self): """ Monitors ``bitcoind`` via zmq. Once the method is fired, it keeps monitoring as long as the ``status`` attribute is not ``ChainMonitorStatus.TERMINATED``. If a new best tip is found, it is added to the internal queue. """ while self.status != ChainMonitorStatus.TERMINATED: msg = self.zmqSubSocket.recv_multipart() topic = msg[0] body = msg[1] if topic == b"hashblock": block_hash = binascii.hexlify(body).decode("utf-8") if block_hash not in self.last_tips: self.logger.info("New block received via zmq", block_hash=block_hash) self.enqueue(block_hash)
[docs] def notify_subscribers(self): """ Once the method is fired, it keeps getting the elements added to the internal queue and notifies the receiving queues about them. It terminates whenever the internal state is set to ``ChainMonitorStatus.TERMINATED``. """ while self.status != ChainMonitorStatus.TERMINATED: message = self.queue.get() # A special "END" message is added to the queue after the status is set to TERMINATED # In all the other cases, message is a block_hash with self.lock: for rec_queue in self.receiving_queues: rec_queue.put(message)
[docs] def monitor_chain(self): """ Changes the ``status`` of the :obj:`ChainMonitor` from idle to listening. It initializes the ``last_tips`` list to terminate the current best tip (by querying the :obj:`BlockProcessor <teos.block_processor.BlockProcessor>`) and creates two threads, one per each monitoring approach (``zmq`` and ``polling``). Raises: :obj:`RuntimeError`: if the ``status`` was not ``ChainMonitor.IDLE`` when the method was called. """ if self.status != ChainMonitorStatus.IDLE: raise RuntimeError(f"This method can only be called in IDLE status. Current status is {self.status.name}.") self.status = ChainMonitorStatus.LISTENING self.last_tips.append(self.block_processor.get_best_block_hash()) Thread(target=self.monitor_chain_polling, daemon=True).start() Thread(target=self.monitor_chain_zmq, daemon=True).start()
[docs] def activate(self): """ Changes the ``status`` of the :obj:`ChainMonitor` from listening to active. It creates a new thread that runs the ``notify_subscribers`` method, which is in charge of notifying the receiving queue for each block hash that is added to the internal queue. Raises: :obj:`RuntimeError`: if the ``status`` was not ``ChainMonitor.LISTENING`` when the method was called. """ if self.status != ChainMonitorStatus.LISTENING: raise RuntimeError( f"This method can only be called in LISTENING status. Current status is {self.status.name}." ) self.status = ChainMonitorStatus.ACTIVE Thread(target=self.notify_subscribers, daemon=True).start()
[docs] def terminate(self): """ Changes the ``status`` of the :obj:`ChainMonitor` to terminated and sends the "END" message to the internal queue. All the threads will stop as soon as possible. """ self.status = ChainMonitorStatus.TERMINATED self.queue.put("END")