/// ThreadPool with std::thread for threads. using FreeThreadPool = ThreadPoolImpl<std::thread>; class GlobalThreadPool : public FreeThreadPool, private boost::noncopyable
ReturnType ThreadPoolImpl<Thread>::scheduleImpl(Job job, int priority, std::optional<uint64_t> wait_microseconds) { .... /// Check if there are enough threads to process job. if (threads.size() < std::min(max_threads, scheduled_jobs + 1)) { try { threads.emplace_front(); } catch (...) { /// Most likely this is a std::bad_alloc exception return on_error("cannot allocate thread slot"); }
void ThreadPoolImpl<Thread>::worker(typename std::list<Thread>::iterator thread_it) { while (true) { /// This is inside the loop to also reset previous thread names set inside the jobs. setThreadName("ThreadPool");
if (!jobs.empty()) { /// boost::priority_queue does not provide interface for getting non-const reference to an element /// to prevent us from modifying its priority. We have to use const_cast to force move semantics on JobWithPriority::job. job = std::move(const_cast<Job &>(jobs.top().job)); jobs.pop(); } else { /// shutdown is true, simply finish the thread. return; }
}
if (!need_shutdown) { try { ... job(); // 执行真正的任务 /// job should be reset before decrementing scheduled_jobs to /// ensure that the Job destroyed before wait() returns. job = {}; } catch (...) { /// job should be reset before decrementing scheduled_jobs to /// ensure that the Job destroyed before wait() returns. job = {};
{ std::unique_lock lock(mutex); if (!first_exception) first_exception = std::current_exception(); // NOLINT if (shutdown_on_exception) shutdown = true; --scheduled_jobs; }
ClickHouse 的线程由类 ThreadFromGlobalPool 实现，使用方法类似于 std::thread，只不过额外添加了 ClickHouse 的 ThreadStatus。ThreadFromGlobalPool 的核心是它的构造函数：在创建 ThreadFromGlobalPool 对象的同时，也向 GlobalThreadPool 提交了一个 job（scheduleOrThrow 方法会调用上面讲到的 scheduleImpl 方法，从而将任务提交到线程池中去）。
/// These moves are needed to destroy function and arguments before exit. /// It will guarantee that after ThreadFromGlobalPool::join all captured params are destroyed. auto function = std::move(func); auto arguments = std::move(args);
/// Thread status holds raw pointer on query context, thus it always must be destroyed /// before sending signal that permits to join this thread. DB::ThreadStatus thread_status; std::apply(function, arguments); });