// ********************************************************************** // // Copyright (c) 2000 // Object Oriented Concepts, Inc. // Billerica, MA, USA // // All Rights Reserved // // ********************************************************************** #include #include #include #include #include #include #include #include #include #include #include #include #include #if defined(HAVE_POSIX_THREADS) #include #include #endif #ifdef HAVE_STD_IOSTREAM using namespace std; #endif #ifndef WIN32 // // This helper class is used to re-acquire the the recursive mutex. // It's in place to help with robustness in the event of // pthread_cancel. // class JTCCondHelper { JTCRecursiveMutex& mut_; int count_; public: JTCCondHelper(JTCRecursiveMutex& m, int c) : mut_(m), count_(c) { } ~JTCCondHelper() { // // We need to acquire the mutex rationally... We unlock the // mutexes critical section, and lock the mutex. Restore // saved information. NOTE: It's not correct to set the count // and the owner of the mutex, the lock API method MUST be // called. // pthread_mutex_unlock(&mut_.crit_); mut_.lock(count_); } }; #endif // ---------------------------------------------------------------------- // JTCCond private member implementation // ---------------------------------------------------------------------- // // Wait for the condition variable to be signalled. The posix threads // and NT threads implementation are essentially the same, except that // the NT threads implementation uses an NT semaphore instead of a // native condition variable. // // A timeout value of -1 means wait forever. Otherwise, only wait for // timeout milliseconds. // void JTCCond::_wait(long timeout) { #if defined(HAVE_POSIX_THREADS) || defined(HAVE_DCE_THREADS) // // The mutex is locked at this point. We need to acquire internal, // save count. The cond_wait will unlock the mutexes critical // section. // // To see that this code is correct, consider the following cases: // // - If no thread is currently attempting to lock the mutex then code // code is correct. // // - If another thread is attempting to unlock the mutex the // application is in error, since the thread calling wait owns the // mutex. // // - If thread B is attempting to lock the mutex, while thread A // is calling wait then the following cases need to be considered: // // - Thread B obtains lock on internal first. The mutex count // will be 0, the owner will be nullThreadId. (set below) // Thread B unlocks internal. Thread A can now obtain it's lock // on internal. It locks internal, discovers count is 0, // increments count, sets owner, and attempts to lock // crit_. This lock cannot proceed until pthread_cond_wait below // is called. // // - Thread A obtains lock on internal first. Count is non-zero, // and thread A does not own the mutex. Thread A attempts to // lock crit_, which blocks until pthread_cond_wait below is // called. // int count; { JTCSynchronized guard(mutex_.internal_); // // Save internal state information to be restored on method // exit. // count = mutex_.count_; mutex_.count_ = 0; mutex_.owner_ = JTCThreadId(); } // // Calculate the timeout period. // struct timespec abstime; if(timeout >= 0) { struct timeval tv; gettimeofday(&tv, 0); //123456789 - 10^9 const long oneBillion = 1000000000; abstime.tv_sec = tv.tv_sec + (timeout/1000); abstime.tv_nsec = (tv.tv_usec * 1000) + ((timeout%1000) * 1000000); if(abstime.tv_nsec > oneBillion) { ++abstime.tv_sec; abstime.tv_nsec -= oneBillion; } } try { JTCCondHelper unlock(mutex_, count); if(timeout < 0) { JTC_SYSCALL_2(pthread_cond_wait, &cond_, &mutex_.crit_, < 0); } else { JTC_SYSCALL_3(pthread_cond_timedwait, &cond_, &mutex_.crit_, &abstime, < 0); } } catch(const JTCSystemCallException& e) { // // EINTR is translated to InterruptedException. // if(e.getError() == EINTR) throw JTCInterruptedException(); // // EAGAIN means that the wait time has passed. // else if (e.getError() == EAGAIN) return; // // Rethrow caught exception. // throw; } #endif #if defined(HAVE_WIN32_THREADS) { JTCSynchronized guard(waitersMutex_); // // mutex_ is locked on entry. // ++waiters_; } int count; if(timeout < 0) timeout = INFINITE; // // Save state information. // { JTCSynchronized guard(mutex_.internal_); count = mutex_.count_; mutex_.count_ = 0; mutex_.owner_ = JTCThreadId(); LeaveCriticalSection(&mutex_.crit_); } try { JTC_SYSCALL_2(WaitForSingleObject, sema_, timeout, == WAIT_ABANDONED); } catch(const JTCSystemCallException&) { // // Restore the mutex lock count times. // { JTCSynchronized guard(waitersMutex_); --waiters_; } mutex_.lock(count); throw; } // // Restore the mutex lock count times. // { JTCSynchronized guard(waitersMutex_); --waiters_; if (waiters_ == 0 && haveBroadcast_) broadcastEvent_.post(); } mutex_.lock(count); #endif } // ---------------------------------------------------------------------- // JTCCond constructor and destructor // ---------------------------------------------------------------------- JTCCond::JTCCond(JTCRecursiveMutex& mut) : mutex_(mut) { #if defined(HAVE_POSIX_THREADS) JTC_SYSCALL_2(pthread_cond_init, &cond_, 0, != 0) #endif #if defined(HAVE_DCE_THREADS) JTC_SYSCALL_2(pthread_cond_init, &cond_, pthread_condattr_default, != 0) #endif #if defined(HAVE_WIN32_THREADS) haveBroadcast_ = false; waiters_ = 0; JTC_SYSCALL_4(sema_ = CreateSemaphore, 0, 0, 0x7fffffff, 0, == INVALID_HANDLE_VALUE) #endif } JTCCond::~JTCCond() { #if defined(HAVE_POSIX_THREADS) || defined(HAVE_DCE_THREADS) pthread_cond_destroy(&cond_); #endif #if defined(HAVE_WIN32_THREADS) CloseHandle(sema_); #endif } // ---------------------------------------------------------------------- // JTCMonitor public member implementation // ---------------------------------------------------------------------- // // Wait to be signalled for timeout milliseconds. // void JTCCond::wait(long timeout) { if(timeout < 0) throw JTCIllegalArgumentException("timeout value is negative"); _wait(timeout); } // // Wait to be signalled. // void JTCCond::wait() { _wait(-1); } // // Wake one waiting thread. // void JTCCond::signal() { #if defined(HAVE_POSIX_THREADS) || defined(HAVE_DCE_THREADS) JTC_SYSCALL_1(pthread_cond_signal,&cond_, != 0) #endif #if defined(HAVE_WIN32_THREADS) bool haveWaiters; { JTCSynchronized guard(waitersMutex_); haveWaiters = (waiters_ > 0); } if (haveWaiters) { JTC_SYSCALL_3(ReleaseSemaphore, sema_, 1, 0, == 0) } #endif } // // Wake all waiting threads. // void JTCCond::broadcast() { #if defined(HAVE_POSIX_THREADS) || defined(HAVE_DCE_THREADS) JTC_SYSCALL_1(pthread_cond_broadcast,&cond_, != 0) #endif #if defined(HAVE_WIN32_THREADS) bool haveWaiters; int numWaiters; { JTCSynchronized guard(waitersMutex_); haveWaiters = (waiters_ > 0); numWaiters = waiters_; } if (haveWaiters) { // // This is a broadcast event, so set the flag and reset // the Event synchronization object. // haveBroadcast_ = true; broadcastEvent_.reset(); // // Release the semaphore waiters_ times. // JTC_SYSCALL_3(ReleaseSemaphore, sema_, numWaiters, 0, == 0) // // Wait for each thread to wake. // After reset the haveBroadcast_ flag. // broadcastEvent_.wait(); haveBroadcast_ = false; } #endif }