threadable.py   [plain text]



# Twisted, the Framework of Your Internet
# Copyright (C) 2001 Matthew W. Lefkowitz
#
# This library is free software; you can redistribute it and/or
# modify it under the terms of version 2.1 of the GNU Lesser General Public
# License as published by the Free Software Foundation.
#
# This library is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public
# License along with this library; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA

"""
A module that will allow your program to be multi-threaded,
micro-threaded, and single-threaded.  Currently microthreads are
unimplemented.  The idea is to abstract away some commonly used
functionality so that I don't have to special-case it in all programs.
"""

import traceback
import sys

class ThreadableError(Exception): pass

class _Waiter:
    def __init__(self):
        self.callbacks = {}
        self.results = {}
        self.conditions = {}

    def registerCallback(self, key, callback=None, errback=None):
        self.callbacks[key] = errback, callback

    def hasCallback(self, key):
        return self.callbacks.has_key(key)

    def preWait(self, key):
        pass

    def block(self):
        pass

    def wait(self, key):
        self.conditions[key] = 1
        while not self.results.has_key(key):
            # This call will ostensibly eventually populate this
            # dictionary.
            self.block()
        r, is_ok = self.results[key]
        del self.results[key]
        if is_ok:
            return r
        else:
            raise r


    def runCallback(self, key, value, ok):
        call_or_err = self.callbacks.get(key)
        if not call_or_err:
            return 0
        callback = call_or_err[ok]
        if callback is None:
            del self.callbacks[key]
            return 1

        try:
            callback(value)
            del self.callbacks[key]
            return 1
        except:
            log.deferr()
            del self.callbacks[key]
            return 2

    def unwait(self, key, value, ok):
        if not self.runCallback(key, value, ok):
            self.results[key] = (value, ok)
            if self.conditions.has_key(key):
                del self.conditions[key]

    def unwait_all(self):
        for k in self.conditions.keys():
            self.unwait(k, ThreadableError("Shut Down."), 0)


class _ThreadedWaiter(_Waiter):
    synchronized = ['registerCallback',
                    'hasCallback',
                    'preWait',
                    'unwait']

    def __init__(self):
        _Waiter.__init__(self)

    def preWait(self, key):
        global ioThread
        if threadmodule.get_ident() == ioThread:
            return _Waiter.preWait(self, key)
        cond = self.conditions[key] = threadingmodule.Condition()
        cond.acquire()

    def wait(self, key):
        global ioThread
        if threadmodule.get_ident() == ioThread:
            return _Waiter.wait(self, key)
        cond = self.conditions[key]
        cond.wait()
        # ...
        r, is_ok = self.results[key]
        del self.conditions[key]
        del self.results[key]
        cond.release()

        if is_ok:
            return r
        else:
            raise r


    def unwait(self, key, value, ok):
        if not self.runCallback(key, value, ok):
            cond = self.conditions.get(key)
            if cond is None:
                self.results[key] = (value,ok)
            else:
                cond.acquire()
                self.results[key] = (value, ok)
                cond.notify()
                cond.release()

class _XLock:
    """
    Exclusive lock class.  The advantage of this over threading.RLock
    is that it's picklable (kinda...).  The goal is to one day not be
    dependent upon threading, and to have this work for any old
    'thread'
    """
    
    def __init__(self):
        assert threaded,\
               "Locks may not be allocated in an unthreaded environment!"

        self.block = threadmodule.allocate_lock()
        self.count = 0
        self.owner = 0

    def __setstate__(self, state):
        self.__init__()

    def __getstate__(self):
        return None

    def withThisLocked(self, func, *args, **kw):
        self.acquire()
        try:
            return apply(func,args,kw)
        finally:
            self.release()

    def acquire(self):
        current = threadmodule.get_ident()
        if self.owner == current:
            self.count = self.count + 1
            return 1
        self.block.acquire()
        self.owner = current
        self.count = 1

    def release(self):
        current = threadmodule.get_ident()
        if self.owner != current:
            raise "Release of unacquired lock."

        self.count = self.count - 1

        if self.count == 0:
            self.owner = None
            self.block.release()

##def _synch_init(self, *a, **b):
##    self.lock = XLock()

def _synchPre(self, *a, **b):
    if not self.__dict__.has_key('_threadable_lock'):
        _synchLockCreator.acquire()
        if not self.__dict__.has_key('_threadable_lock'):
            self.__dict__['_threadable_lock'] = XLock()
        _synchLockCreator.release()
    self._threadable_lock.acquire()

def _synchPost(self, *a, **b):
    self._threadable_lock.release()

_to_be_synched = []

def synchronize(*klasses):
    """Make all methods listed in each class' synchronized attribute synchronized.

    The synchronized attribute should be a list of strings, consisting of the
    names of methods that must be synchronized. If we are running in threaded
    mode these methods will be wrapped with a lock.
    """
    global _to_be_synched
    if not threaded:
        map(_to_be_synched.append, klasses)
        return

    if threaded:
        import hook
        for klass in klasses:
##            hook.addPre(klass, '__init__', _synch_init)
            for methodName in klass.synchronized:
                hook.addPre(klass, methodName, _synchPre)
                hook.addPost(klass, methodName, _synchPost)

threaded = None
ioThread = None
threadCallbacks = []

def whenThreaded(cb):
    if threaded:
        cb()
    else:
        threadCallbacks.append(cb)

def init(with_threads=1):
    """Initialize threading. Should be run once, at the beginning of program.
    """
    global threaded, _to_be_synched, Waiter
    global threadingmodule, threadmodule, XLock, _synchLockCreator
    if threaded == with_threads:
        return
    elif threaded:
        raise RuntimeError("threads cannot be disabled, once enabled")
    threaded = with_threads
    if threaded:
        log.msg('Enabling Multithreading.')
        import thread, threading
        threadmodule = thread
        threadingmodule = threading
        Waiter = _ThreadedWaiter
        XLock = _XLock
        _synchLockCreator = XLock()
        synchronize(*_to_be_synched)
        _to_be_synched = []
        for cb in threadCallbacks:
            cb()
    else:
        Waiter = _Waiter
        # Hack to allow XLocks to be unpickled on an unthreaded system.
        class DummyXLock:
            pass
        XLock = DummyXLock


def isInIOThread():
    """Are we in the thread responsable for I/O requests (the event loop)?
    """
    global threaded
    global ioThread
    if threaded:
        if (ioThread == threadmodule.get_ident()):
            return 1
        else:
            return 0
    return 1


def registerAsIOThread():
    """Mark the current thread as responsable for I/O requests.
    """
    global threaded
    global ioThread
    if threaded:
        import thread
        ioThread = thread.get_ident()


synchronize(_ThreadedWaiter)
init(0)

# sibling imports
import log