00001
00002
00003
00004
00005
00006
00007 #include "wvsubprocqueue.h"
00008 #include <unistd.h>
00009 #include <assert.h>
00010
00011
00012 WvSubProcQueue::WvSubProcQueue(unsigned _maxrunning)
00013 {
00014 maxrunning = _maxrunning;
00015 }
00016
00017
00018 WvSubProcQueue::~WvSubProcQueue()
00019 {
00020 }
00021
00022
00023 void WvSubProcQueue::add(void *cookie, WvSubProc *proc)
00024 {
00025 assert(proc);
00026 assert(!proc->running);
00027 if (cookie)
00028 {
00029
00030 EntList::Iter i(waitq);
00031 for (i.rewind(); i.next(); )
00032 {
00033 if (i->cookie == cookie)
00034 {
00035
00036
00037
00038
00039
00040 Ent *e = i.ptr();
00041 if (i.next())
00042 e->redo = true;
00043 delete proc;
00044 return;
00045 }
00046 }
00047 }
00048
00049 waitq.append(new Ent(cookie, proc), true);
00050 }
00051
00052
00053 void WvSubProcQueue::add(void *cookie,
00054 const char *cmd, const char * const *argv)
00055 {
00056 WvSubProc *p = new WvSubProc;
00057 p->preparev(cmd, argv);
00058 add(cookie, p);
00059 }
00060
00061
00062 bool WvSubProcQueue::cookie_running()
00063 {
00064 EntList::Iter i(runq);
00065 for (i.rewind(); i.next(); )
00066 if (i->cookie)
00067 return true;
00068 return false;
00069 }
00070
00071
00072 int WvSubProcQueue::go()
00073 {
00074 int started = 0;
00075
00076
00077
00078
00079
00080 {
00081 EntList::Iter i(runq);
00082 for (i.rewind(); i.next(); )
00083 {
00084 Ent *e = i.ptr();
00085
00086 e->proc->wait(0, true);
00087 if (!e->proc->running)
00088 {
00089 if (e->redo)
00090 {
00091
00092
00093 e->redo = false;
00094 i.xunlink(false);
00095 waitq.append(e, true);
00096 }
00097 else
00098 i.xunlink();
00099 }
00100 }
00101 }
00102
00103 while (!waitq.isempty() && runq.count() < maxrunning)
00104 {
00105 EntList::Iter i(waitq);
00106 for (i.rewind(); i.next(); )
00107 {
00108
00109
00110
00111
00112 if (i->cookie && !runq.isempty())
00113 goto out;
00114 if (cookie_running())
00115 goto out;
00116
00117
00118
00119 Ent *e = i.ptr();
00120 i.xunlink(false);
00121 runq.append(e, true);
00122 e->proc->start_again();
00123 started++;
00124 break;
00125 }
00126 }
00127
00128 out:
00129 assert(runq.count() <= maxrunning);
00130 return started;
00131 }
00132
00133
00134 unsigned WvSubProcQueue::running() const
00135 {
00136 return runq.count();
00137 }
00138
00139
00140 unsigned WvSubProcQueue::remaining() const
00141 {
00142 return runq.count() + waitq.count();
00143 }
00144
00145
00146 bool WvSubProcQueue::isempty() const
00147 {
00148 return runq.isempty() && waitq.isempty();
00149 }
00150
00151
00152 void WvSubProcQueue::finish()
00153 {
00154 while (!isempty())
00155 {
00156 go();
00157 if (!isempty())
00158 usleep(100*1000);
00159 }
00160 }