import threading, weakref, sys |
import Queue |
if 'Message' not in globals(): |
from py.__.execnet.message import Message |
|
class RemoteError(EOFError): |
""" Contains an Exceptions from the other side. """ |
def __init__(self, formatted): |
self.formatted = formatted |
EOFError.__init__(self) |
|
def __str__(self): |
return self.formatted |
|
def __repr__(self): |
return "%s: %s" %(self.__class__.__name__, self.formatted) |
|
def warn(self): |
|
print >> sys.stderr, "Warning: unhandled %r" % (self,) |
|
NO_ENDMARKER_WANTED = object() |
|
|
class Channel(object): |
"""Communication channel between two possibly remote threads of code. """ |
RemoteError = RemoteError |
|
def __init__(self, gateway, id): |
assert isinstance(id, int) |
self.gateway = gateway |
self.id = id |
self._items = Queue.Queue() |
self._closed = False |
self._receiveclosed = threading.Event() |
self._remoteerrors = [] |
|
def setcallback(self, callback, endmarker=NO_ENDMARKER_WANTED): |
queue = self._items |
lock = self.gateway._channelfactory._receivelock |
lock.acquire() |
try: |
_callbacks = self.gateway._channelfactory._callbacks |
dictvalue = (callback, endmarker) |
if _callbacks.setdefault(self.id, dictvalue) != dictvalue: |
raise IOError("%r has callback already registered" %(self,)) |
self._items = None |
while 1: |
try: |
olditem = queue.get(block=False) |
except Queue.Empty: |
break |
else: |
if olditem is ENDMARKER: |
queue.put(olditem) |
break |
else: |
callback(olditem) |
if self._closed or self._receiveclosed.isSet(): |
|
self.gateway._channelfactory._close_callback(self.id) |
finally: |
lock.release() |
|
def __repr__(self): |
flag = self.isclosed() and "closed" or "open" |
return "<Channel id=%d %s>" % (self.id, flag) |
|
def __del__(self): |
if self.gateway is None: |
return |
self.gateway._trace("Channel(%d).__del__" % self.id) |
|
if self._closed: |
|
for error in self._remoteerrors: |
error.warn() |
elif self._receiveclosed.isSet(): |
|
|
pass |
else: |
|
if self._items is None: |
Msg = Message.CHANNEL_LAST_MESSAGE |
else: |
Msg = Message.CHANNEL_CLOSE |
self.gateway._outgoing.put(Msg(self.id)) |
|
def _getremoteerror(self): |
try: |
return self._remoteerrors.pop(0) |
except IndexError: |
return None |
|
|
|
|
def isclosed(self): |
""" return True if the channel is closed. A closed |
channel may still hold items. |
""" |
return self._closed |
|
def makefile(self, mode='w', proxyclose=False): |
""" return a file-like object. Only supported mode right |
now is 'w' for binary writes. If you want to have |
a subsequent file.close() mean to close the channel |
as well, then pass proxyclose=True. |
""" |
assert mode == 'w', "mode %r not availabe" %(mode,) |
return ChannelFile(channel=self, proxyclose=proxyclose) |
|
def close(self, error=None): |
""" close down this channel on both sides. """ |
if not self._closed: |
|
|
|
put = self.gateway._outgoing.put |
if error is not None: |
put(Message.CHANNEL_CLOSE_ERROR(self.id, str(error))) |
else: |
put(Message.CHANNEL_CLOSE(self.id)) |
if isinstance(error, RemoteError): |
self._remoteerrors.append(error) |
self._closed = True |
self._receiveclosed.set() |
queue = self._items |
if queue is not None: |
queue.put(ENDMARKER) |
self.gateway._channelfactory._no_longer_opened(self.id) |
|
def waitclose(self, timeout=None): |
""" wait until this channel is closed (or the remote side |
otherwise signalled that no more data was being sent). |
The channel may still hold receiveable items, but not receive |
more. waitclose() reraises exceptions from executing code on |
the other side as channel.RemoteErrors containing a a textual |
representation of the remote traceback. |
""" |
self._receiveclosed.wait(timeout=timeout) |
if not self._receiveclosed.isSet(): |
raise IOError, "Timeout" |
error = self._getremoteerror() |
if error: |
raise error |
|
def send(self, item): |
"""sends the given item to the other side of the channel, |
possibly blocking if the sender queue is full. |
Note that an item needs to be marshallable. |
""" |
if self.isclosed(): |
raise IOError, "cannot send to %r" %(self,) |
if isinstance(item, Channel): |
data = Message.CHANNEL_NEW(self.id, item.id) |
else: |
data = Message.CHANNEL_DATA(self.id, item) |
self.gateway._outgoing.put(data) |
|
def receive(self): |
"""receives an item that was sent from the other side, |
possibly blocking if there is none. |
Note that exceptions from the other side will be |
reraised as channel.RemoteError exceptions containing |
a textual representation of the remote traceback. |
""" |
queue = self._items |
if queue is None: |
raise IOError("calling receive() on channel with receiver callback") |
x = queue.get() |
if x is ENDMARKER: |
queue.put(x) |
raise self._getremoteerror() or EOFError() |
else: |
return x |
|
def __iter__(self): |
return self |
|
def next(self): |
try: |
return self.receive() |
except EOFError: |
raise StopIteration |
|
|
|
|
|
ENDMARKER = object() |
|
class ChannelFactory(object): |
RemoteError = RemoteError |
|
def __init__(self, gateway, startcount=1): |
self._channels = weakref.WeakValueDictionary() |
self._callbacks = {} |
self._writelock = threading.Lock() |
self._receivelock = threading.RLock() |
self.gateway = gateway |
self.count = startcount |
self.finished = False |
|
def new(self, id=None): |
""" create a new Channel with 'id' (or create new id if None). """ |
self._writelock.acquire() |
try: |
if self.finished: |
raise IOError("connexion already closed: %s" % (self.gateway,)) |
if id is None: |
id = self.count |
self.count += 2 |
channel = Channel(self.gateway, id) |
self._channels[id] = channel |
return channel |
finally: |
self._writelock.release() |
|
def channels(self): |
return self._channels.values() |
|
|
|
|
def _no_longer_opened(self, id): |
try: |
del self._channels[id] |
except KeyError: |
pass |
self._close_callback(id) |
|
def _close_callback(self, id): |
try: |
callback, endmarker = self._callbacks.pop(id) |
except KeyError: |
pass |
else: |
if endmarker is not NO_ENDMARKER_WANTED: |
callback(endmarker) |
|
def _local_close(self, id, remoteerror=None): |
channel = self._channels.get(id) |
if channel is None: |
|
if remoteerror: |
remoteerror.warn() |
else: |
|
if remoteerror: |
channel._remoteerrors.append(remoteerror) |
channel._closed = True |
channel._receiveclosed.set() |
queue = channel._items |
if queue is not None: |
queue.put(ENDMARKER) |
self._no_longer_opened(id) |
|
def _local_last_message(self, id): |
channel = self._channels.get(id) |
if channel is None: |
|
pass |
else: |
|
channel._receiveclosed.set() |
queue = channel._items |
if queue is not None: |
queue.put(ENDMARKER) |
self._no_longer_opened(id) |
|
def _local_receive(self, id, data): |
|
self._receivelock.acquire() |
try: |
try: |
callback, endmarker = self._callbacks[id] |
except KeyError: |
channel = self._channels.get(id) |
queue = channel and channel._items |
if queue is None: |
pass |
else: |
queue.put(data) |
else: |
callback(data) |
finally: |
self._receivelock.release() |
|
def _finished_receiving(self): |
self._writelock.acquire() |
try: |
self.finished = True |
finally: |
self._writelock.release() |
for id in self._channels.keys(): |
self._local_last_message(id) |
for id in self._callbacks.keys(): |
self._close_callback(id) |
|
|
class ChannelFile: |
def __init__(self, channel, proxyclose=True): |
self.channel = channel |
self._proxyclose = proxyclose |
|
def write(self, out): |
self.channel.send(out) |
|
def flush(self): |
pass |
|
def close(self): |
if self._proxyclose: |
self.channel.close() |
|
def __repr__(self): |
state = self.channel.isclosed() and 'closed' or 'open' |
return '<ChannelFile %d %s>' %(self.channel.id, state) |
|
|