diff --git a/lib/base/threadpool.cpp b/lib/base/threadpool.cpp index 71e5aad54..cfefdee14 100644 --- a/lib/base/threadpool.cpp +++ b/lib/base/threadpool.cpp @@ -71,23 +71,13 @@ void ThreadPool::Join(void) lock.lock(); } - int alive; + for (size_t i = 0; i < sizeof(m_Threads) / sizeof(m_Threads[0]); i++) { + lock.unlock(); + m_Threads[i].Thread.join(); + lock.lock(); - do { - alive = 0; - for (size_t i = 0; i < sizeof(m_ThreadStats) / sizeof(m_ThreadStats[0]); i++) { - if (m_ThreadStats[i].State != ThreadDead) { - alive++; - KillWorker(); - } - } - - if (alive > 0) { - lock.unlock(); - Utility::Sleep(0.5); - lock.lock(); - } - } while (alive > 0); + m_Threads[i].State = ThreadDead; + } m_ManagerThread.join(); m_StatsThread.join(); @@ -110,10 +100,10 @@ void ThreadPool::QueueThreadProc(int tid) UpdateThreadUtilization(tid, ThreadIdle); - while (m_WorkItems.empty() && !m_Stopped && !m_ThreadStats[tid].Zombie) + while (m_WorkItems.empty() && !m_Stopped && !m_Threads[tid].Zombie) m_WorkCV.wait(lock); - if (m_ThreadStats[tid].Zombie) + if (m_Threads[tid].Zombie) break; if (m_WorkItems.empty() && m_Stopped) @@ -194,7 +184,7 @@ void ThreadPool::QueueThreadProc(int tid) boost::mutex::scoped_lock lock(m_Mutex); UpdateThreadUtilization(tid, ThreadDead); - m_ThreadStats[tid].Zombie = false; + m_Threads[tid].Zombie = false; } /** @@ -246,10 +236,10 @@ void ThreadPool::ManagerThreadProc(void) alive = 0; - for (size_t i = 0; i < sizeof(m_ThreadStats) / sizeof(m_ThreadStats[0]); i++) { - if (m_ThreadStats[i].State != ThreadDead && !m_ThreadStats[i].Zombie) { + for (size_t i = 0; i < sizeof(m_Threads) / sizeof(m_Threads[0]); i++) { + if (m_Threads[i].State != ThreadDead && !m_Threads[i].Zombie) { alive++; - utilization += m_ThreadStats[i].Utilization * 100; + utilization += m_Threads[i].Utilization * 100; } } @@ -307,13 +297,14 @@ void ThreadPool::ManagerThreadProc(void) */ void ThreadPool::SpawnWorker(void) { - for (size_t i = 0; i < sizeof(m_ThreadStats) / sizeof(m_ThreadStats[0]); i++) { - if (m_ThreadStats[i].State == ThreadDead) { + for (size_t i = 0; i < sizeof(m_Threads) / sizeof(m_Threads[0]); i++) { + if (m_Threads[i].State == ThreadDead) { Log(LogDebug, "debug", "Spawning worker thread."); - m_ThreadStats[i] = ThreadStats(ThreadIdle); - boost::thread worker(boost::bind(&ThreadPool::QueueThreadProc, this, i)); - worker.detach(); + m_Threads[i].State = ThreadIdle; + + boost::thread thread(boost::bind(&ThreadPool::QueueThreadProc, this, i)); + m_Threads[i].Thread = boost::move(thread); break; } @@ -325,11 +316,11 @@ void ThreadPool::SpawnWorker(void) */ void ThreadPool::KillWorker(void) { - for (size_t i = 0; i < sizeof(m_ThreadStats) / sizeof(m_ThreadStats[0]); i++) { - if (m_ThreadStats[i].State == ThreadIdle && !m_ThreadStats[i].Zombie) { + for (size_t i = 0; i < sizeof(m_Threads) / sizeof(m_Threads[0]); i++) { + if (m_Threads[i].State == ThreadIdle && !m_Threads[i].Zombie) { Log(LogDebug, "base", "Killing worker thread."); - m_ThreadStats[i].Zombie = true; + m_Threads[i].Zombie = true; m_WorkCV.notify_all(); break; @@ -352,7 +343,7 @@ void ThreadPool::StatsThreadProc(void) if (m_Stopped) break; - for (size_t i = 0; i < sizeof(m_ThreadStats) / sizeof(m_ThreadStats[0]); i++) + for (size_t i = 0; i < sizeof(m_Threads) / sizeof(m_Threads[0]); i++) UpdateThreadUtilization(i); } } @@ -364,7 +355,7 @@ void ThreadPool::UpdateThreadUtilization(int tid, ThreadState state) { double utilization; - switch (m_ThreadStats[tid].State) { + switch (m_Threads[tid].State) { case ThreadDead: return; case ThreadIdle: @@ -378,16 +369,16 @@ void ThreadPool::UpdateThreadUtilization(int tid, ThreadState state) } double now = Utility::GetTime(); - double time = now - m_ThreadStats[tid].LastUpdate; + double time = now - m_Threads[tid].LastUpdate; const double avg_time = 5.0; if (time > avg_time) time = avg_time; - m_ThreadStats[tid].Utilization = (m_ThreadStats[tid].Utilization * (avg_time - time) + utilization * time) / avg_time; - m_ThreadStats[tid].LastUpdate = now; + m_Threads[tid].Utilization = (m_Threads[tid].Utilization * (avg_time - time) + utilization * time) / avg_time; + m_Threads[tid].LastUpdate = now; if (state != ThreadUnspecified) - m_ThreadStats[tid].State = state; + m_Threads[tid].State = state; } diff --git a/lib/base/threadpool.h b/lib/base/threadpool.h index 7e4ae6733..c59960e42 100644 --- a/lib/base/threadpool.h +++ b/lib/base/threadpool.h @@ -57,14 +57,15 @@ private: ThreadBusy }; - struct ThreadStats + struct WorkerThread { + boost::thread Thread; ThreadState State; bool Zombie; double Utilization; double LastUpdate; - ThreadStats(ThreadState state = ThreadDead) + WorkerThread(ThreadState state = ThreadDead) : State(state), Zombie(false), Utilization(0), LastUpdate(0) { } }; @@ -72,7 +73,7 @@ private: int m_ID; static int m_NextID; - ThreadStats m_ThreadStats[512]; + WorkerThread m_Threads[512]; boost::thread m_ManagerThread; boost::thread m_StatsThread;