/// Processor interface as shown in this excerpt: the port containers plus the
/// 'prepare' / 'work' entry points. `Status`, `PortNumbers` and `getName()` are used
/// here but their declarations are not visible in this excerpt.
class IProcessor
{
protected:
    InputPorts inputs;
    OutputPorts outputs;

public:
    /** Method 'prepare' is responsible for all cheap ("instantaneous": O(1) of data volume, no wait) calculations.
      *
      * It may access input and output ports,
      *  indicate the need for work by another processor by returning NeedData or PortFull,
      *  or indicate the absence of work by returning Finished or Unneeded,
      *  it may pull data from input ports and push data to output ports.
      *
      * The method is not thread-safe and must be called from a single thread in one moment of time,
      *  even for different connected processors.
      *
      * Instead of all long work (CPU calculations or waiting) it should just prepare all required data
      *  and return Ready or Async.
      *
      * Thread safety and parallel execution:
      * - no methods (prepare, work, schedule) of single object can be executed in parallel;
      * - method 'work' can be executed in parallel for different objects, even for connected processors;
      * - method 'prepare' cannot be executed in parallel even for different objects,
      *   if they are connected (including indirectly) to each other by their ports;
      *
      * The default implementation throws Exception with ErrorCodes::NOT_IMPLEMENTED.
      */
    virtual Status prepare()
    {
        throw Exception("Method 'prepare' is not implemented for " + getName() + " processor", ErrorCodes::NOT_IMPLEMENTED);
    }

    /// Optimization for prepare in case we know ports were updated.
    /// The default implementation ignores both port lists and forwards to prepare().
    virtual Status prepare(const PortNumbers & /*updated_input_ports*/, const PortNumbers & /*updated_output_ports*/)
    {
        return prepare();
    }

    /** You may call this method if 'prepare' returned Ready.
      * This method cannot access any ports. It should use only data that was prepared by 'prepare' method.
      *
      * Method work can be executed in parallel for different processors.
      *
      * The default implementation throws Exception with ErrorCodes::NOT_IMPLEMENTED.
      */
    virtual void work()
    {
        throw Exception("Method 'work' is not implemented for " + getName() + " processor", ErrorCodes::NOT_IMPLEMENTED);
    }
};
/// Edge represents connection between OutputPort and InputPort. /// For every connection, two edges are created: direct and backward (it is specified by backward flag). struct Edge { { update_info.update_list = update_list; update_info.id = this; }
/// Processor id this edge points to. /// It is processor with output_port for direct edge or processor with input_port for backward. bool backward; /// Port numbers. They are same for direct and backward edges. uint64_t input_port_number; uint64_t output_port_number;
/// Edge version is increased when port's state is changed (e.g. when data is pushed). See Port.h for details. /// To compare version with prev_version we can decide if neighbour processor need to be prepared. Port::UpdateInfo update_info; };
struct Node { /// Processor and it's position in graph. IProcessor * processor = nullptr; uint64_t processors_id = 0;
/// Direct edges are for output ports, back edges are for input ports. Edges direct_edges; Edges back_edges;
/// Ports which have changed their state since last processor->prepare() call. /// They changed when neighbour processors interact with connected ports. /// They will be used as arguments for next processor->prepare() (and will be cleaned after that). IProcessor::PortNumbers updated_input_ports; IProcessor::PortNumbers updated_output_ports;
/// Ports that have changed their state during last processor->prepare() call. /// We use this data to fill updated_input_ports and updated_output_ports for neighbour nodes. /// This containers are temporary, and used only after processor->prepare() is called. /// They could have been local variables, but we need persistent storage for Port::UpdateInfo. Port::UpdateInfo::UpdateList post_updated_input_ports; Port::UpdateInfo::UpdateList post_updated_output_ports;
using ThreadsData = std::vector<ThreadFromGlobalPool>; //(2) ThreadsData threads; threads.reserve(num_threads);
bool finished_flag = false; ...
if (num_threads > 1) { auto thread_group = CurrentThread::getGroup();
for (size_t i = 0; i < num_threads; ++i) { //(3)empalce_back函数接受的参数是lambda表达式会被转化为job。 threads.emplace_back([this, thread_group, thread_num = i] { /// ThreadStatus thread_status;
setThreadName("QueryPipelineEx");
if (thread_group) CurrentThread::attachTo(thread_group);
try { executeSingleThread(thread_num); //(4) } catch (...) { /// In case of exception from executor itself, stop other threads. finish(); tasks.getThreadContext(thread_num).setException(std::current_exception()); } }); }
/// Add childless processors to stack. uint64_t num_processors = nodes.size(); for (uint64_t proc = 0; proc < num_processors; ++proc) { if (nodes[proc]->direct_edges.empty()) { stack.push(proc); /// do not lock mutex, as this function is executed in single thread nodes[proc]->status = ExecutingGraph::ExecStatus::Preparing; } } ... while (!stack.empty()) { uint64_t proc = stack.top(); stack.pop(); updateNode(proc, queue, async_queue); ... } }
/// Translate the status returned by the processor into the node's execution status
/// and schedule the node where required.
switch (node.last_processor_status)
{
    case IProcessor::Status::NeedData:
    case IProcessor::Status::PortFull:
    {
        node.status = ExecutingGraph::ExecStatus::Idle;
        break;
    }
    case IProcessor::Status::Finished:
    {
        node.status = ExecutingGraph::ExecStatus::Finished;
        break;
    }
    case IProcessor::Status::Ready:
    {
        node.status = ExecutingGraph::ExecStatus::Executing;
        /// Status Ready means the data of this node's processor is prepared and can be processed,
        /// so the node is put into the queue. After this method returns, a thread from the pool
        /// takes the node from the queue and calls its work() method.
        queue.push(&node);
        break;
    }
    case IProcessor::Status::Async:
    {
        node.status = ExecutingGraph::ExecStatus::Executing;
        async_queue.push(&node);
        break;
    }
    case IProcessor::Status::ExpandPipeline:
    {
        /// Node status is left untouched here; only the flag is raised.
        need_expand_pipeline = true;
        break;
    }
}
if (!need_expand_pipeline)
{
    /// Output ports first: walk the recorded entries in reverse order, queue each edge
    /// and trigger its update info.
    for (auto it = node.post_updated_output_ports.rbegin(); it != node.post_updated_output_ports.rend(); ++it)
    {
        auto * edge = static_cast<ExecutingGraph::Edge *>(*it);
        updated_edges.push(edge);
        edge->update_info.trigger();
    }

    /// Then the input ports, handled the same way.
    for (auto it = node.post_updated_input_ports.rbegin(); it != node.post_updated_input_ports.rend(); ++it)
    {
        auto * edge = static_cast<ExecutingGraph::Edge *>(*it);
        updated_edges.push(edge);
        edge->update_info.trigger();
    }
    /// (this branch continues beyond the excerpt)
/// Per-thread execution context of the worker identified by thread_num.
auto & context = tasks.getThreadContext(thread_num); // (1)
bool yield = false;

while (!tasks.isFinished() && !yield)
{
    /// First, find any processor to execute.
    /// Just traverse graph and prepare any processor.
    while (!tasks.isFinished() && !context.hasTask())
        tasks.tryGetTask(context); // (2)

    /// Run tasks while this thread has one, unless asked to yield or execution has finished.
    while (context.hasTask() && !yield)
    {
        if (tasks.isFinished())
            break;