Main Page | Namespace List | Class Hierarchy | Alphabetical List | Class List | File List | Namespace Members | Class Members | File Members

wait.cpp

00001 // wait.cpp - written and placed in the public domain by Wei Dai 00002 00003 #include "pch.h" 00004 #include "wait.h" 00005 #include "misc.h" 00006 00007 #ifdef SOCKETS_AVAILABLE 00008 00009 #ifdef USE_BERKELEY_STYLE_SOCKETS 00010 #include <errno.h> 00011 #include <sys/types.h> 00012 #include <sys/time.h> 00013 #include <unistd.h> 00014 #endif 00015 00016 NAMESPACE_BEGIN(CryptoPP) 00017 00018 WaitObjectContainer::WaitObjectContainer() 00019 { 00020 Clear(); 00021 } 00022 00023 void WaitObjectContainer::Clear() 00024 { 00025 #ifdef USE_WINDOWS_STYLE_SOCKETS 00026 m_handles.clear(); 00027 #else 00028 m_maxFd = 0; 00029 FD_ZERO(&m_readfds); 00030 FD_ZERO(&m_writefds); 00031 #endif 00032 m_noWait = false; 00033 } 00034 00035 #ifdef USE_WINDOWS_STYLE_SOCKETS 00036 00037 struct WaitingThreadData 00038 { 00039 bool waitingToWait, terminate; 00040 HANDLE startWaiting, stopWaiting; 00041 const HANDLE *waitHandles; 00042 unsigned int count; 00043 HANDLE threadHandle; 00044 DWORD threadId; 00045 DWORD* error; 00046 }; 00047 00048 WaitObjectContainer::~WaitObjectContainer() 00049 { 00050 try // don't let exceptions escape destructor 00051 { 00052 if (!m_threads.empty()) 00053 { 00054 HANDLE threadHandles[MAXIMUM_WAIT_OBJECTS]; 00055 unsigned int i; 00056 for (i=0; i<m_threads.size(); i++) 00057 { 00058 WaitingThreadData &thread = *m_threads[i]; 00059 while (!thread.waitingToWait) // spin until thread is in the initial "waiting to wait" state 00060 Sleep(0); 00061 thread.terminate = true; 00062 threadHandles[i] = thread.threadHandle; 00063 } 00064 PulseEvent(m_startWaiting); 00065 ::WaitForMultipleObjects(m_threads.size(), threadHandles, TRUE, INFINITE); 00066 for (i=0; i<m_threads.size(); i++) 00067 CloseHandle(threadHandles[i]); 00068 CloseHandle(m_startWaiting); 00069 CloseHandle(m_stopWaiting); 00070 } 00071 } 00072 catch (...) 00073 { 00074 } 00075 } 00076 00077 00078 void WaitObjectContainer::AddHandle(HANDLE handle) 00079 { 00080 m_handles.push_back(handle); 00081 } 00082 00083 DWORD WINAPI WaitingThread(LPVOID lParam) 00084 { 00085 std::auto_ptr<WaitingThreadData> pThread((WaitingThreadData *)lParam); 00086 WaitingThreadData &thread = *pThread; 00087 std::vector<HANDLE> handles; 00088 00089 while (true) 00090 { 00091 thread.waitingToWait = true; 00092 ::WaitForSingleObject(thread.startWaiting, INFINITE); 00093 thread.waitingToWait = false; 00094 00095 if (thread.terminate) 00096 break; 00097 if (!thread.count) 00098 continue; 00099 00100 handles.resize(thread.count + 1); 00101 handles[0] = thread.stopWaiting; 00102 std::copy(thread.waitHandles, thread.waitHandles+thread.count, handles.begin()+1); 00103 00104 DWORD result = ::WaitForMultipleObjects(handles.size(), &handles[0], FALSE, INFINITE); 00105 00106 if (result == WAIT_OBJECT_0) 00107 continue; // another thread finished waiting first, so do nothing 00108 SetEvent(thread.stopWaiting); 00109 if (!(result > WAIT_OBJECT_0 && result < WAIT_OBJECT_0 + handles.size())) 00110 { 00111 assert(!"error in WaitingThread"); // break here so we can see which thread has an error 00112 *thread.error = ::GetLastError(); 00113 } 00114 } 00115 00116 return S_OK; // return a value here to avoid compiler warning 00117 } 00118 00119 void WaitObjectContainer::CreateThreads(unsigned int count) 00120 { 00121 unsigned int currentCount = m_threads.size(); 00122 if (currentCount == 0) 00123 { 00124 m_startWaiting = ::CreateEvent(NULL, TRUE, FALSE, NULL); 00125 m_stopWaiting = ::CreateEvent(NULL, TRUE, FALSE, NULL); 00126 } 00127 00128 if (currentCount < count) 00129 { 00130 m_threads.resize(count); 00131 for (unsigned int i=currentCount; i<count; i++) 00132 { 00133 m_threads[i] = new WaitingThreadData; 00134 WaitingThreadData &thread = *m_threads[i]; 00135 thread.terminate = false; 00136 thread.startWaiting = m_startWaiting; 00137 thread.stopWaiting = m_stopWaiting; 00138 thread.waitingToWait = false; 00139 thread.threadHandle = CreateThread(NULL, 0, &WaitingThread, &thread, 0, &thread.threadId); 00140 } 00141 } 00142 } 00143 00144 bool WaitObjectContainer::Wait(unsigned long milliseconds) 00145 { 00146 if (m_noWait || m_handles.empty()) 00147 return true; 00148 00149 if (m_handles.size() > MAXIMUM_WAIT_OBJECTS) 00150 { 00151 // too many wait objects for a single WaitForMultipleObjects call, so use multiple threads 00152 static const unsigned int WAIT_OBJECTS_PER_THREAD = MAXIMUM_WAIT_OBJECTS-1; 00153 unsigned int nThreads = (m_handles.size() + WAIT_OBJECTS_PER_THREAD - 1) / WAIT_OBJECTS_PER_THREAD; 00154 if (nThreads > MAXIMUM_WAIT_OBJECTS) // still too many wait objects, maybe implement recursive threading later? 00155 throw Err("WaitObjectContainer: number of wait objects exceeds limit"); 00156 CreateThreads(nThreads); 00157 DWORD error = S_OK; 00158 00159 for (unsigned int i=0; i<m_threads.size(); i++) 00160 { 00161 WaitingThreadData &thread = *m_threads[i]; 00162 while (!thread.waitingToWait) // spin until thread is in the initial "waiting to wait" state 00163 Sleep(0); 00164 if (i<nThreads) 00165 { 00166 thread.waitHandles = &m_handles[i*WAIT_OBJECTS_PER_THREAD]; 00167 thread.count = STDMIN(WAIT_OBJECTS_PER_THREAD, m_handles.size() - i*WAIT_OBJECTS_PER_THREAD); 00168 thread.error = &error; 00169 } 00170 else 00171 thread.count = 0; 00172 } 00173 00174 ResetEvent(m_stopWaiting); 00175 PulseEvent(m_startWaiting); 00176 00177 DWORD result = ::WaitForSingleObject(m_stopWaiting, milliseconds); 00178 if (result == WAIT_OBJECT_0) 00179 { 00180 if (error == S_OK) 00181 return true; 00182 else 00183 throw Err("WaitObjectContainer: WaitForMultipleObjects failed with error " + IntToString(error)); 00184 } 00185 SetEvent(m_stopWaiting); 00186 if (result == WAIT_TIMEOUT) 00187 return false; 00188 else 00189 throw Err("WaitObjectContainer: WaitForSingleObject failed with error " + IntToString(::GetLastError())); 00190 } 00191 else 00192 { 00193 DWORD result = ::WaitForMultipleObjects(m_handles.size(), &m_handles[0], FALSE, milliseconds); 00194 if (result >= WAIT_OBJECT_0 && result < WAIT_OBJECT_0 + m_handles.size()) 00195 return true; 00196 else if (result == WAIT_TIMEOUT) 00197 return false; 00198 else 00199 throw Err("WaitObjectContainer: WaitForMultipleObjects failed with error " + IntToString(::GetLastError())); 00200 } 00201 } 00202 00203 #else 00204 00205 void WaitObjectContainer::AddReadFd(int fd) 00206 { 00207 FD_SET(fd, &m_readfds); 00208 m_maxFd = STDMAX(m_maxFd, fd); 00209 } 00210 00211 void WaitObjectContainer::AddWriteFd(int fd) 00212 { 00213 FD_SET(fd, &m_writefds); 00214 m_maxFd = STDMAX(m_maxFd, fd); 00215 } 00216 00217 bool WaitObjectContainer::Wait(unsigned long milliseconds) 00218 { 00219 if (m_noWait || m_maxFd == 0) 00220 return true; 00221 00222 timeval tv, *timeout; 00223 00224 if (milliseconds == INFINITE_TIME) 00225 timeout = NULL; 00226 else 00227 { 00228 tv.tv_sec = milliseconds / 1000; 00229 tv.tv_usec = (milliseconds % 1000) * 1000; 00230 timeout = &tv; 00231 } 00232 00233 int result = select(m_maxFd+1, &m_readfds, &m_writefds, NULL, timeout); 00234 00235 if (result > 0) 00236 return true; 00237 else if (result == 0) 00238 return false; 00239 else 00240 throw Err("WaitObjectContainer: select failed with error " + errno); 00241 } 00242 00243 #endif 00244 00245 // ******************************************************** 00246 00247 bool Waitable::Wait(unsigned long milliseconds) 00248 { 00249 WaitObjectContainer container; 00250 GetWaitObjects(container); 00251 return container.Wait(milliseconds); 00252 } 00253 00254 NAMESPACE_END 00255 00256 #endif

Generated on Fri Aug 13 09:56:55 2004 for Crypto++ by doxygen 1.3.7