Source code for futureproof.executors

import concurrent.futures as futures
import logging
import sys
import time
from threading import Lock

logger = logging.getLogger(__name__)


class _FutureProofExecutor:
    """A wrapper around a subclass of concurrent.futures.Executor.

    Requires the subclass to be passed as the first parameter to the constructor.
    Do not use this class directly, use either ThreadPoolExecutor or ProcessPoolExecutor.

    On top of the wrapped executor this class tracks the futures that are
    still pending and counts the ones that completed, so that a monitor task
    can periodically log the progress.
    """

    def __init__(self, executor_cls, *args, monitor_interval=2, **kwargs):
        """Instantiate the wrapped executor and the monitoring state.

        :param executor_cls: Executor class to instantiate, ``*args`` and
                             ``**kwargs`` are forwarded to it unchanged.
        :param monitor_interval: Seconds between two progress log records,
                                 0 disables the monitor.
        """
        self._executor = executor_cls(*args, **kwargs)  # type: futures.Executor
        self._monitor_interval = monitor_interval  # type: int
        # Progress is logged at INFO level when monitoring is enabled,
        # otherwise everything goes through DEBUG.
        self._log = logger.info if self._monitor_interval != 0 else logger.debug
        # Futures submitted through this wrapper that have not completed yet.
        self._current_futures = set()  # type: set
        self._current_futures_lock = Lock()  # type: Lock
        # Number of futures completed so far, incremented by the done callback.
        self._monitor_count = 0  # type: int
        # Stays None until the monitor is started by the first submit().
        self._monitor_future = None  # type: futures.Future
        self._monitor_lock = Lock()  # type: Lock

    def __enter__(self):
        """Return the wrapper itself so it can be used in a ``with`` block."""
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        """Delegate the context manager exit to the wrapped executor."""
        return self._executor.__exit__(exc_type, exc_val, exc_tb)

    @property
    def max_workers(self):
        """Number of workers configured on the wrapped executor."""
        # NOTE: relies on the private ``_max_workers`` attribute of the executor.
        return self._executor._max_workers

    def join(self):
        """Shut down the wrapped executor."""
        logger.debug("Shutting down executor")
        self._executor.shutdown()

    def initialize_worker(self):
        """Called on a newly spawned worker to perform initialization"""
        pass

    def submit(self, fn, *args, **kwargs):
        """Create a future invoking fn with the specified args and kwargs

        The first call also starts the monitor task, unless monitoring was
        disabled with ``monitor_interval=0``.

        :param fn: Callable to execute on a worker.
        :returns: The future created by the wrapped executor.
        """
        # A zero interval means "monitoring disabled": starting the monitor
        # anyway would permanently occupy a worker and make it spin on
        # ``time.sleep(0)`` while futures are pending.
        if self._monitor_interval != 0 and self._monitor_future is None:
            # TODO: we're taking up a worker for monitoring without alerting the user
            # This is especially bad in the case of a ProcessExecutor
            self._monitor_future = self._executor.submit(self.monitor)

        fut = self._executor.submit(fn, *args, **kwargs)
        # Register the future before attaching the callback so that _cb
        # always finds it in the set.
        with self._current_futures_lock:
            self._current_futures.add(fut)
        # NOTE: done callbacks will be executed immediately if the future is complete
        fut.add_done_callback(self._cb)
        return fut

    def _cb(self, future):
        """Done callback: forget the finished future and count it as completed."""
        with self._current_futures_lock:
            self._current_futures.remove(future)
        with self._monitor_lock:
            self._monitor_count += 1

    def monitor(self):
        """Monitor task checking on the status of the futures and logging the progress.

        Loops until the wrapped executor reports that it is shutting down.
        Any unexpected error is logged and ends the monitor instead of
        propagating.
        """
        try:
            self._log("Starting executor monitor")
            while True:
                # If there are no futures being processed we check if we're
                # shutting down or eagerly wait for more futures
                if not self._current_futures:
                    # NOTE: relies on the private ``_shutdown`` flag of the executor.
                    if self._executor._shutdown:
                        return
                    logger.debug("No current futures")
                    time.sleep(0.01)
                    continue

                start = time.time()
                previous_count = self._monitor_count

                time.sleep(self._monitor_interval)

                # Only hold the lock while reading the counter, logging may
                # be slow and would otherwise delay the done callbacks.
                with self._monitor_lock:
                    completed = self._monitor_count - previous_count
                self._log(
                    "%d task(s) completed in the last %.2f seconds",
                    completed,
                    time.time() - start,
                )

                if self._executor._shutdown:
                    self._log("Shutting down monitor...")
                    return
        except Exception:
            logger.exception("Error in monitor")


class ThreadPoolExecutor(_FutureProofExecutor):
    """Wrapper around concurrent.futures ThreadPoolExecutor.

    Arguments not specified below will be forwarded to the underlying executor.

    :param monitor_interval: Frequency in seconds for monitor logging,
                             defaults to 2 seconds, set to 0 to disable.
    """

    def __init__(self, *args, monitor_interval=2, **kwargs):
        # Bind the concrete executor class, everything else is passed through.
        super().__init__(
            futures.ThreadPoolExecutor,
            *args,
            monitor_interval=monitor_interval,
            **kwargs
        )
[docs]class ProcessPoolExecutor(_FutureProofExecutor): """Wrapper around concurrent.futures ProcessPoolExecutor. Available only in Python 3.7 and above. Arguments not specified below will be forwarded to the underlying executor. :param monitor_interval: Frequency in seconds for monitor logging, defaults to 2 seconds, set to 0 to disable. """ def __init__(self, *args, monitor_interval=2, **kwargs): if sys.version_info < (3, 7): raise NotImplementedError( "ProcessPoolExecutor are only available for Python 3.7+" ) super().__init__( futures.ProcessPoolExecutor, *args, monitor_interval=monitor_interval, **kwargs ) def join(self): logger.debug("Shutting down executor")
# Do not shutdown the executor manually as it logs an error # in the concurrent.futures.process registered exit handler