from __future__ import generators |
import os, sys, time, signal |
import py |
from py.__.execnet import gateway |
from py.__.conftest import option |
mypath = py.magic.autopath() |
|
from StringIO import StringIO |
from py.__.execnet.register import startup_modules, getsource |
|
TESTTIMEOUT = 10.0 |
|
def test_getsource_import_modules(): |
for dottedname in startup_modules: |
yield getsource, dottedname |
|
def test_getsource_no_colision(): |
seen = {} |
for dottedname in startup_modules: |
mod = __import__(dottedname, None, None, ['__doc__']) |
for name, value in vars(mod).items(): |
if py.std.inspect.isclass(value): |
if name in seen: |
olddottedname, oldval = seen[name] |
if oldval is not value: |
py.test.fail("duplicate class %r in %s and %s" % |
(name, dottedname, olddottedname)) |
seen[name] = (dottedname, value) |
|
def test_stdouterrin_setnull(): |
cap = py.io.StdCaptureFD() |
from py.__.execnet.register import stdouterrin_setnull |
stdouterrin_setnull() |
import os |
os.write(1, "hello") |
if os.name == "nt": |
os.write(2, "world") |
os.read(0, 1) |
out, err = cap.reset() |
assert not out |
assert not err |
|
|
class TestMessage: |
def test_wire_protocol(self): |
for cls in gateway.Message._types.values(): |
one = StringIO() |
cls(42, '23').writeto(one) |
two = StringIO(one.getvalue()) |
msg = gateway.Message.readfrom(two) |
assert isinstance(msg, cls) |
assert msg.channelid == 42 |
assert msg.data == '23' |
assert isinstance(repr(msg), str) |
|
|
class TestPureChannel: |
def setup_method(self, method): |
self.fac = gateway.ChannelFactory(None) |
|
def test_factory_create(self): |
chan1 = self.fac.new() |
assert chan1.id == 1 |
chan2 = self.fac.new() |
assert chan2.id == 3 |
|
def test_factory_getitem(self): |
chan1 = self.fac.new() |
assert self.fac._channels[chan1.id] == chan1 |
chan2 = self.fac.new() |
assert self.fac._channels[chan2.id] == chan2 |
|
def test_channel_timeouterror(self): |
channel = self.fac.new() |
py.test.raises(IOError, channel.waitclose, timeout=0.01) |
|
class PopenGatewayTestSetup: |
def setup_class(cls): |
cls.gw = py.execnet.PopenGateway() |
|
|
|
|
class BasicRemoteExecution: |
def test_correct_setup(self): |
assert self.gw._receiverthread.isAlive() |
|
def test_repr_doesnt_crash(self): |
assert isinstance(repr(self), str) |
|
def test_correct_setup_no_py(self): |
channel = self.gw.remote_exec(""" |
import sys |
channel.send(sys.modules.keys()) |
""") |
remotemodules = channel.receive() |
assert 'py' not in remotemodules, ( |
"py should not be imported on remote side") |
|
def test_remote_exec_waitclose(self): |
channel = self.gw.remote_exec('pass') |
channel.waitclose(TESTTIMEOUT) |
|
def test_remote_exec_waitclose_2(self): |
channel = self.gw.remote_exec('def gccycle(): pass') |
channel.waitclose(TESTTIMEOUT) |
|
def test_remote_exec_waitclose_noarg(self): |
channel = self.gw.remote_exec('pass') |
channel.waitclose() |
|
def test_remote_exec_error_after_close(self): |
channel = self.gw.remote_exec('pass') |
channel.waitclose(TESTTIMEOUT) |
py.test.raises(IOError, channel.send, 0) |
|
def test_remote_exec_channel_anonymous(self): |
channel = self.gw.remote_exec(''' |
obj = channel.receive() |
channel.send(obj) |
''') |
channel.send(42) |
result = channel.receive() |
assert result == 42 |
|
def test_channel_close_and_then_receive_error(self): |
channel = self.gw.remote_exec('raise ValueError') |
py.test.raises(channel.RemoteError, channel.receive) |
|
def test_channel_finish_and_then_EOFError(self): |
channel = self.gw.remote_exec('channel.send(42)') |
x = channel.receive() |
assert x == 42 |
py.test.raises(EOFError, channel.receive) |
py.test.raises(EOFError, channel.receive) |
py.test.raises(EOFError, channel.receive) |
|
def test_channel_close_and_then_receive_error_multiple(self): |
channel = self.gw.remote_exec('channel.send(42) ; raise ValueError') |
x = channel.receive() |
assert x == 42 |
py.test.raises(channel.RemoteError, channel.receive) |
|
def test_channel__local_close(self): |
channel = self.gw._channelfactory.new() |
self.gw._channelfactory._local_close(channel.id) |
channel.waitclose(0.1) |
|
def test_channel__local_close_error(self): |
channel = self.gw._channelfactory.new() |
self.gw._channelfactory._local_close(channel.id, |
channel.RemoteError("error")) |
py.test.raises(channel.RemoteError, channel.waitclose, 0.01) |
|
def test_channel_error_reporting(self): |
channel = self.gw.remote_exec('def foo():\n return foobar()\nfoo()\n') |
try: |
channel.receive() |
except channel.RemoteError, e: |
assert str(e).startswith('Traceback (most recent call last):') |
assert str(e).find('NameError: global name \'foobar\' ' |
'is not defined') > -1 |
else: |
py.test.fail('No exception raised') |
|
def test_channel_syntax_error(self): |
|
channel = self.gw.remote_exec('def foo()\n return 1\nfoo()\n') |
try: |
channel.receive() |
except channel.RemoteError, e: |
assert str(e).startswith('Traceback (most recent call last):') |
assert str(e).find('SyntaxError') > -1 |
|
def test_channel_iter(self): |
channel = self.gw.remote_exec(""" |
for x in range(3): |
channel.send(x) |
""") |
l = list(channel) |
assert l == range(3) |
|
def test_channel_passing_over_channel(self): |
channel = self.gw.remote_exec(''' |
c = channel.gateway.newchannel() |
channel.send(c) |
c.send(42) |
''') |
c = channel.receive() |
x = c.receive() |
assert x == 42 |
|
|
channel.waitclose(TESTTIMEOUT) |
|
newchan = self.gw.remote_exec(''' |
assert %d not in channel.gateway._channelfactory._channels |
''' % (channel.id)) |
newchan.waitclose(TESTTIMEOUT) |
assert channel.id not in self.gw._channelfactory._channels |
|
def test_channel_receiver_callback(self): |
l = [] |
|
channel = self.gw.remote_exec(source=''' |
channel.send(42) |
channel.send(13) |
channel.send(channel.gateway.newchannel()) |
''') |
channel.setcallback(callback=l.append) |
py.test.raises(IOError, channel.receive) |
channel.waitclose(TESTTIMEOUT) |
assert len(l) == 3 |
assert l[:2] == [42,13] |
assert isinstance(l[2], channel.__class__) |
|
def test_channel_callback_after_receive(self): |
l = [] |
channel = self.gw.remote_exec(source=''' |
channel.send(42) |
channel.send(13) |
channel.send(channel.gateway.newchannel()) |
''') |
x = channel.receive() |
assert x == 42 |
channel.setcallback(callback=l.append) |
py.test.raises(IOError, channel.receive) |
channel.waitclose(TESTTIMEOUT) |
assert len(l) == 2 |
assert l[0] == 13 |
assert isinstance(l[1], channel.__class__) |
|
def test_waiting_for_callbacks(self): |
l = [] |
def callback(msg): |
import time; time.sleep(0.2) |
l.append(msg) |
channel = self.gw.remote_exec(source=''' |
channel.send(42) |
''') |
channel.setcallback(callback) |
channel.waitclose(TESTTIMEOUT) |
assert l == [42] |
|
def test_channel_callback_stays_active(self, earlyfree=True): |
|
l = [] |
channel = self.gw.remote_exec(source=''' |
import thread, time |
def producer(subchannel): |
for i in range(5): |
time.sleep(0.15) |
subchannel.send(i*100) |
channel2 = channel.receive() |
thread.start_new_thread(producer, (channel2,)) |
del channel2 |
''') |
subchannel = self.gw.newchannel() |
subchannel.setcallback(l.append) |
channel.send(subchannel) |
if earlyfree: |
subchannel = None |
counter = 100 |
while len(l) < 5: |
if subchannel and subchannel.isclosed(): |
break |
counter -= 1 |
print counter |
if not counter: |
py.test.fail("timed out waiting for the answer[%d]" % len(l)) |
time.sleep(0.04) |
assert l == [0, 100, 200, 300, 400] |
return subchannel |
|
def test_channel_callback_remote_freed(self): |
channel = self.test_channel_callback_stays_active(False) |
channel.waitclose(TESTTIMEOUT) |
|
def test_channel_endmarker_callback(self): |
l = [] |
channel = self.gw.remote_exec(source=''' |
channel.send(42) |
channel.send(13) |
channel.send(channel.gateway.newchannel()) |
''') |
channel.setcallback(l.append, 999) |
py.test.raises(IOError, channel.receive) |
channel.waitclose(TESTTIMEOUT) |
assert len(l) == 4 |
assert l[:2] == [42,13] |
assert isinstance(l[2], channel.__class__) |
assert l[3] == 999 |
|
def test_remote_redirect_stdout(self): |
out = py.std.StringIO.StringIO() |
handle = self.gw._remote_redirect(stdout=out) |
c = self.gw.remote_exec("print 42") |
c.waitclose(TESTTIMEOUT) |
handle.close() |
s = out.getvalue() |
assert s.strip() == "42" |
|
def test_remote_exec_redirect_multi(self): |
num = 3 |
l = [[] for x in range(num)] |
channels = [self.gw.remote_exec("print %d" % i, |
stdout=l[i].append) |
for i in range(num)] |
for x in channels: |
x.waitclose(TESTTIMEOUT) |
|
for i in range(num): |
subl = l[i] |
assert subl |
s = subl[0] |
assert s.strip() == str(i) |
|
def test_channel_file(self): |
channel = self.gw.remote_exec(""" |
f = channel.makefile() |
print >>f, "hello world" |
f.close() |
channel.send(42) |
""") |
first = channel.receive() + channel.receive() |
assert first.strip() == 'hello world' |
second = channel.receive() |
assert second == 42 |
|
def test_channel_file_write_error(self): |
channel = self.gw.remote_exec("pass") |
f = channel.makefile() |
channel.waitclose(TESTTIMEOUT) |
py.test.raises(IOError, f.write, 'hello') |
|
def test_channel_file_proxyclose(self): |
channel = self.gw.remote_exec(""" |
f = channel.makefile(proxyclose=True) |
print >>f, "hello world" |
f.close() |
channel.send(42) |
""") |
first = channel.receive() + channel.receive() |
assert first.strip() == 'hello world' |
py.test.raises(EOFError, channel.receive) |
|
def test_confusion_from_os_write_stdout(self): |
channel = self.gw.remote_exec(""" |
import os |
os.write(1, 'confusion!') |
channel.send(channel.receive() * 6) |
channel.send(channel.receive() * 6) |
""") |
channel.send(3) |
res = channel.receive() |
assert res == 18 |
channel.send(7) |
res = channel.receive() |
assert res == 42 |
|
def test_confusion_from_os_write_stderr(self): |
channel = self.gw.remote_exec(""" |
import os |
os.write(2, 'test') |
channel.send(channel.receive() * 6) |
channel.send(channel.receive() * 6) |
""") |
channel.send(3) |
res = channel.receive() |
assert res == 18 |
channel.send(7) |
res = channel.receive() |
assert res == 42 |
|
def test_non_reverse_execution(self): |
gw = self.gw |
c1 = gw.remote_exec(""" |
c = channel.gateway.remote_exec("pass") |
try: |
c.waitclose() |
except c.RemoteError, e: |
channel.send(str(e)) |
""") |
text = c1.receive() |
assert text.find("execution disallowed") != -1 |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
class TestPopenGateway(PopenGatewayTestSetup, BasicRemoteExecution): |
|
def test_chdir_separation(self): |
old = py.test.ensuretemp('chdirtest').chdir() |
try: |
gw = py.execnet.PopenGateway() |
finally: |
waschangedir = old.chdir() |
c = gw.remote_exec("import os ; channel.send(os.getcwd())") |
x = c.receive() |
assert x == str(waschangedir) |
|
def test_many_popen(self): |
num = 4 |
l = [] |
for i in range(num): |
l.append(py.execnet.PopenGateway()) |
channels = [] |
for gw in l: |
channel = gw.remote_exec("""channel.send(42)""") |
channels.append(channel) |
|
|
|
|
|
|
|
|
|
|
|
while channels: |
channel = channels.pop() |
ret = channel.receive() |
assert ret == 42 |
|
def disabled_test_remote_is_killed_while_sending(self): |
gw = py.execnet.PopenGateway() |
channel = gw.remote_exec(""" |
import os |
import time |
channel.send(os.getppid()) |
channel.send(os.getpid()) |
while 1: |
channel.send('#'*1000) |
time.sleep(10) |
""") |
parent = channel.receive() |
remote = channel.receive() |
assert parent == os.getpid() |
time.sleep(0.5) |
os.kill(remote, signal.SIGKILL) |
time.sleep(1) |
channel.waitclose(TESTTIMEOUT) |
py.test.raises(EOFError, channel.receive) |
|
|
|
|
|
class SocketGatewaySetup: |
def setup_class(cls): |
|
cls.proxygw = py.execnet.PopenGateway() |
cls.gw = py.execnet.SocketGateway.new_remote(cls.proxygw, |
("127.0.0.1", 0) |
) |
|
|
|
|
|
class TestSocketGateway(SocketGatewaySetup, BasicRemoteExecution): |
pass |
|
class TestSshGateway(BasicRemoteExecution): |
def setup_class(cls): |
if option.sshtarget is None: |
py.test.skip("no known ssh target, use -S to set one") |
cls.gw = py.execnet.SshGateway(option.sshtarget) |
|
def test_sshaddress(self): |
assert self.gw.remoteaddress == option.sshtarget |
|
def test_failed_connexion(self): |
gw = py.execnet.SshGateway('nowhere.codespeak.net') |
try: |
channel = gw.remote_exec("...") |
except IOError: |
pass |
else: |
|
py.test.raises(EOFError, channel.receive) |
|
py.test.raises(IOError, gw.remote_exec, "...") |
|
def test_threads(): |
gw = py.execnet.PopenGateway() |
gw.remote_init_threads(3) |
c1 = gw.remote_exec("channel.send(channel.receive())") |
c2 = gw.remote_exec("channel.send(channel.receive())") |
c2.send(1) |
res = c2.receive() |
assert res == 1 |
c1.send(42) |
res = c1.receive() |
assert res == 42 |
gw.exit() |
|
def test_threads_twice(): |
gw = py.execnet.PopenGateway() |
gw.remote_init_threads(3) |
py.test.raises(IOError, gw.remote_init_threads, 3) |
gw.exit() |
|
|
|