def test_threadout_multi_and_default(self): |
num = 3 |
defaults = [] |
def f(l): |
self.out.setwritefunc(l.append) |
print id(l), |
self.out.delwritefunc() |
print 1 |
self.out.setdefaultwriter(defaults.append) |
-> pool = WorkerPool() |
listlist = [] |
for x in range(num): |
l = [] |
listlist.append(l) |
pool.dispatch(f, l) |
pool.shutdown() |
for name, value in self.out.__dict__.items(): |
print >>sys.stderr, "%s: %s" %(name, value) |
pool.join(2.0) |
for i in range(num): |
item = listlist[i] |
assert item ==[str(id(item))] |
assert not self.out._tid2out |
assert defaults |
expect = ['1' for x in range(num)] |
defaults = [x for x in defaults if x.strip()] |
assert defaults == expect |