1
0
mirror of https://github.com/sjlongland/tornado-gallery.git synced 2025-09-13 08:42:23 +10:00
tornado-gallery/tornado_gallery/pool.py
Stuart Longland 8372935fe1
resizer: Use Tornado-friendly pool
Standard pool will block when too many requests are pushed into the
stack, which means *everything* blocks even if a request is not
dependent on the pool.
2018-04-18 17:35:34 +10:00

94 lines
2.5 KiB
Python

from tornado.gen import coroutine, Future, Return
from tornado.ioloop import IOLoop
from tornado.queues import Queue
from tornado.locks import Semaphore
from threading import Thread
from multiprocessing import cpu_count
from sys import exc_info
class WorkerPool(object):
"""
The WorkerPool object represents a pool of worker threads which
each run a task in an external thread.
"""
def __init__(self, workers=None, io_loop=None):
if workers is None:
workers = cpu_count()
if io_loop is None:
io_loop = IOLoop.current()
self._io_loop = io_loop
self._sem = Semaphore(workers)
self._queue = Queue()
self._active = False
@coroutine
def apply(self, func, args=None, kwds=None):
"""
Enqueue a request to be processed in a worker thread.
"""
if args is None: args = ()
if kwds is None: kwds = {}
# Our result placeholder
future = Future()
# Enqueue the request
yield self._queue.put((future, func, args, kwds))
# Kick-start the queue manager if not already running
self._io_loop.add_callback(self._queue_manager)
# Get back the result
result = yield future
raise Return(result)
@coroutine
def _apply(self, future, func, args=None, kwds=None):
"""
Execute a function in a worker thread. Wrapper function.
"""
yield self._sem.acquire()
# Receive the result back; sets the future result
def _recv_result(err, res):
self._sem.release()
if err is not None:
future.set_exc_info(err)
else:
future.set_result(res)
# Run the function; in a worker thread
def _exec():
err = None
res = None
try:
res = func(*args, **kwds)
except:
err = exc_info()
self._io_loop.add_callback(_recv_result, err, res)
# Spawn the worker thread
thread = Thread(target=_exec)
thread.start()
@coroutine
def _queue_manager(self):
"""
Queue manager co-routine.
"""
if self._active:
# Already active
return
try:
self._active = True
while True:
(future, func, args, kwds) = yield self._queue.get()
self._io_loop.add_callback(
self._apply, future, func, args, kwds)
finally:
self._active = False