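QueryPlan -> Pipeline

Following the example in the official tutorial, create the corresponding tables and import the datasets tutorial.hits_v1 and tutorial.visits_v1, then run the sample query.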
    SELECT
        StartURL AS URL,
        AVG(Duration) AS AvgDuration
    FROM tutorial.visits_v1
    WHERE StartDate BETWEEN '2014-03-23' AND '2014-03-30'
    GROUP BY URL
    ORDER BY AvgDuration DESC
    LIMIT 10
Use EXPLAIN to view the query plan for this SQL statement, and EXPLAIN PIPELINE to view the resulting pipeline.
    ┌─explain─────────────────────────────────────────────────────────────────────────────┐
    │ Expression (Projection)                                                             │
    │   Limit (preliminary LIMIT (without OFFSET))                                        │
    │     Sorting (Sorting for ORDER BY)                                                  │
    │       Expression (Before ORDER BY)                                                  │
    │         Aggregating                                                                 │
    │           Expression (Before GROUP BY)                                              │
    │             SettingQuotaAndLimits (Set limits and quota after reading from storage) │
    │               ReadFromMergeTree (tutorial.visits_v1)                                │
    └─────────────────────────────────────────────────────────────────────────────────────┘
    ExpressionStep -> LimitStep -> SortingStep -> ExpressionStep -> AggregatingStep -> ExpressionStep -> SettingQuotaAndLimitsStep -> ReadFromMergeTreeStep
    ┌─explain─────────────────────────────────────────┐
    │ (Expression)                                    │
    │ ExpressionTransform                             │
    │   (Limit)                                       │
    │   Limit                                         │
    │     (Sorting)                                   │
    │     MergeSortingTransform                       │
    │       LimitsCheckingTransform                   │
    │         PartialSortingTransform                 │
    │           (Expression)                          │
    │           ExpressionTransform                   │
    │             (Aggregating)                       │
    │             Resize 6 → 1                        │
    │               AggregatingTransform × 6          │
    │                 StrictResize 6 → 6              │
    │                   (Expression)                  │
    │                   ExpressionTransform × 6       │
    │                     (SettingQuotaAndLimits)     │
    │                       (ReadFromMergeTree)       │
    │                       MergeTreeThread × 6 0 → 1 │
    └─────────────────────────────────────────────────┘
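In the output above, "× N" gives the number of processors of that type; for example, six MergeTreeThread processors read the data. In "x → y", x is the number of input ports and y is the number of output ports. The names in parentheses are the corresponding QueryPlan steps.

I stepped through this query in ClickHouse with a debugger; the concrete pipeline is shown in the figure below, where processors of the same type share a color.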
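To make the "× N" and "x → y" notation concrete, here is a minimal sketch of how one such output line could be assembled. The helper describeProcessorGroup is hypothetical, and the rule it applies (omit "× N" when there is a single processor, omit the port counts when a processor has exactly one input and one output) is only inferred from the output above, not taken from the ClickHouse source.

```cpp
#include <cassert>
#include <cstddef>
#include <string>

// Hypothetical helper (not a ClickHouse API): formats one line of the
// compact pipeline output for a group of identical processors.
//   count   - how many processors of this type run in parallel
//   inputs  - number of input ports on each processor
//   outputs - number of output ports on each processor
std::string describeProcessorGroup(
    const std::string & name, std::size_t count, std::size_t inputs, std::size_t outputs)
{
    assert(count >= 1); // a group contains at least one processor

    std::string line = name;

    // Assumed rule: the multiplicity is printed only for more than one processor.
    if (count > 1)
        line += " × " + std::to_string(count);

    // Assumed rule: port counts are printed only when they differ from 1 -> 1.
    if (inputs != 1 || outputs != 1)
        line += " " + std::to_string(inputs) + " → " + std::to_string(outputs);

    return line;
}
```

Under these assumptions, describeProcessorGroup("MergeTreeThread", 6, 0, 1) gives "MergeTreeThread × 6 0 → 1": six source processors, each with no input port and one output port. A single Resize with six inputs and one output gives "Resize 6 → 1".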
    QueryPipelineBuilderPtr QueryPlan::buildQueryPipeline(
        const QueryPlanOptimizationSettings & optimization_settings,
        const BuildQueryPipelineSettings & build_pipeline_settings)
    {
        ...
        struct Frame
        {
            Node * node = {};
            QueryPipelineBuilders pipelines = {};
        };

        QueryPipelineBuilderPtr last_pipeline;

        std::stack<Frame> stack;
        stack.push(Frame{.node = root}); //(1)

        while (!stack.empty())
        {
            auto & frame = stack.top();

            if (last_pipeline) //(2)
            {
                frame.pipelines.emplace_back(std::move(last_pipeline));
                last_pipeline = nullptr; //-V1048
            }

            size_t next_child = frame.pipelines.size();
            if (next_child == frame.node->children.size()) //(3)
            {
                bool limit_max_threads = frame.pipelines.empty();
                last_pipeline = frame.node->step->updatePipeline(std::move(frame.pipelines), //(4)
                    build_pipeline_settings);

                if (limit_max_threads && max_threads)
                    last_pipeline->limitMaxThreads(max_threads);

                stack.pop();
            }
            else
                stack.push(Frame{.node = frame.node->children[next_child]});
        }
        ...
        return last_pipeline;
    }
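The loop above is an iterative post-order traversal: a node's step is applied only after pipelines have been built for all of its children. The following toy version is a simplified sketch of that control flow, not ClickHouse code. ToyNode, ToyPipeline, buildToyPipeline and this updatePipeline are hypothetical stand-ins, and a "pipeline" here is just the list of step names applied so far.

```cpp
#include <cassert>
#include <cstddef>
#include <memory>
#include <stack>
#include <string>
#include <utility>
#include <vector>

// Hypothetical stand-in for a QueryPlan node: a step name plus child nodes.
struct ToyNode
{
    std::string step;
    std::vector<ToyNode *> children;
};

// Hypothetical stand-in for a pipeline: step names in the order they were applied.
using ToyPipeline = std::vector<std::string>;

// Toy counterpart of step->updatePipeline(): merge the child pipelines,
// then append this node's own step.
ToyPipeline updatePipeline(const std::string & step, std::vector<ToyPipeline> inputs)
{
    ToyPipeline result;
    for (const auto & input : inputs)
        result.insert(result.end(), input.begin(), input.end());
    result.push_back(step);
    return result;
}

// Same stack discipline as buildQueryPipeline; root must not be null.
ToyPipeline buildToyPipeline(ToyNode * root)
{
    struct Frame
    {
        ToyNode * node;
        std::vector<ToyPipeline> pipelines; // pipelines of the children built so far
    };

    std::unique_ptr<ToyPipeline> last; // result of the most recently finished node
    std::stack<Frame> stack;
    stack.push(Frame{root, {}}); // (1) start from the root

    while (!stack.empty())
    {
        Frame & frame = stack.top();

        if (last) // (2) a child just finished: hand its pipeline to the parent
        {
            frame.pipelines.push_back(std::move(*last));
            last.reset();
        }

        std::size_t next_child = frame.pipelines.size();
        if (next_child == frame.node->children.size()) // (3) all children are built
        {
            // (4) apply this node's step on top of the child pipelines
            last = std::make_unique<ToyPipeline>(
                updatePipeline(frame.node->step, std::move(frame.pipelines)));
            stack.pop();
        }
        else
            stack.push(Frame{frame.node->children[next_child], {}});
    }

    assert(last != nullptr); // the root always produces a pipeline
    return std::move(*last);
}
```

For a chain Aggregating -> Expression -> ReadFromMergeTree, buildToyPipeline returns the steps leaf first (ReadFromMergeTree, Expression, Aggregating). This matches the order in which the real pipeline is assembled: the source processors are created first, and each step above them adds its transforms. The number of pipelines already collected in a frame doubles as the index of the next child to visit, so no separate counter is needed.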