0%

ClickHouse中QueryPlan到PIPELINE的转换

QueryPlane->PIPELINE

使用官方教程的例子, 构建相应的表并导入数据集tutorial.hits_v1和tutorial.visits_v1。执行查询示例。

1
2
3
4
5
6
7
8
SELECT
StartURL AS URL,
AVG(Duration) AS AvgDuration
FROM tutorial.visits_v1
WHERE StartDate BETWEEN '2014-03-23' AND '2014-03-30'
GROUP BY URL
ORDER BY AvgDuration DESC
LIMIT 10

使用explain查询sql的queryPlane以及使用EXPLAIN PIPELINE查询pipeline情况。

QueryPlan结果如下

1
2
3
4
5
6
7
8
9
10
┌─explain─────────────────────────────────────────────────────────────────────────────┐
│ Expression (Projection) │
│ Limit (preliminary LIMIT (without OFFSET)) │
│ Sorting (Sorting for ORDER BY) │
│ Expression (Before ORDER BY) │
│ Aggregating │
│ Expression (Before GROUP BY) │
│ SettingQuotaAndLimits (Set limits and quota after reading from storage) │
│ ReadFromMergeTree (tutorial.visits_v1) │
└─────────────────────────────────────────────────────────────────────────────────────┘

QueryPlan是一个树形结构,对应到代码中表示为如下,箭头表示A->B表示A是B的父节点。xxxStep表示的是类名字

1
ExpressionStep->LimitStep->SortingSetp->ExpressionStep->AggregatingStep->ExpressionStep->SettingQuotaAndLimitsStep->ReadFromMergeTreeStep

PIPELINE结果如下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
┌─explain─────────────────────────────────────────┐
│ (Expression) │
│ ExpressionTransform │
│ (Limit) │
│ Limit │
│ (Sorting) │
│ MergeSortingTransform │
│ LimitsCheckingTransform │
│ PartialSortingTransform │
│ (Expression) │
│ ExpressionTransform │
│ (Aggregating) │
│ Resize 6 → 1 │
│ AggregatingTransform × 6 │
│ StrictResize 6 → 6 │
│ (Expression) │
│ ExpressionTransform × 6 │
│ (SettingQuotaAndLimits) │
│ (ReadFromMergeTree) │
│ MergeTreeThread × 6 0 → 1 │
└─────────────────────────────────────────────────┘

上述结果的 x 表示有几个这个类型的processor,比如有6个MergeTreeThread来拉取数据,而
x->y中,x表示有几个inPort,y表示有几个outPort。其中括号内的内容表示对应的queryPlanStep。笔者对ClickHouse中这一查询进行debug,其中具体pipeline表达如下图。相同颜色的表示同一类型的processor。

构建pipeline的代码调用栈如下图。

buildQueryPipeline方法本质上对queryPlan树形数据结构进行后序遍历,不过这里没有使用递归实现,而是使用栈。考虑原因一个是如果流水线太深,则会有太多的调用栈,可能会出现栈溢出问题。其次,栈实现效率上面本身也因为少了函数调用的开销而更有效率。主要过程为首先将queryplan的root节点入栈(1)处。其中比较巧妙的地方在于(3)处,通过判断当前栈顶元素pipelines中有几个已经构建好的pipeline,如果当前节点的每个孩子节点都已经处理完毕了,也就是说每个孩子节点对应的子树都已经转换为了一个pipeline,那么通过当前节点的updatePipeline方法来将当前节点加入并将孩子节点的pipleline连接起来(4)处。举一个简单的例子,如下图所示。这个queryplan有四个节点,首先后续遍历会处理node2节点,进入(3)代码处,因为他的孩子节点数目为零等于pipelines的元素个数也为零。因此,(3)中会将node2节点中生成的pipeline赋值给lastpipeline,然后将node2从栈中弹出。那么有后续遍历的规则,退出当前栈顶节点为node1,(2)处lastpipeline不为空,将lastpipeline放入之node1的pipelines中。当代码运行到(3)时,nextchild为1,而孩子个数为2,因此将node3入栈,当下一次循环时候,node3因为没有孩子节点会跟node2节点一样进入(3),从而生成pipeline并赋值给lastpipeline并弹出node3.因此node1再次成为栈顶节点,且这次piplelines中有了2个流水线,分别为node2节点和node3节点生成的,成功进入到(3),得以执行updatePipeline方法将本节点加入到pipleline,并将生成的新pipeline传给root节点,依次类推,root节点最终返回生成的最后的lastpipeline。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
QueryPipelineBuilderPtr QueryPlan::buildQueryPipeline(
const QueryPlanOptimizationSettings & optimization_settings,
const BuildQueryPipelineSettings & build_pipeline_settings)
{
...
struct Frame
{
Node * node = {};
QueryPipelineBuilders pipelines = {};
};

QueryPipelineBuilderPtr last_pipeline;

std::stack<Frame> stack;
stack.push(Frame{.node = root}); //(1)

while (!stack.empty())
{
auto & frame = stack.top();

if (last_pipeline) //(2)
{
frame.pipelines.emplace_back(std::move(last_pipeline));
last_pipeline = nullptr; //-V1048
}

size_t next_child = frame.pipelines.size();
if (next_child == frame.node->children.size()) //(3)
{
bool limit_max_threads = frame.pipelines.empty();
last_pipeline = frame.node->step->updatePipeline(std::move(frame.pipelines), //(4) build_pipeline_settings);

if (limit_max_threads && max_threads)
last_pipeline->limitMaxThreads(max_threads);

stack.pop();
}
else
stack.push(Frame{.node = frame.node->children[next_child]});
}

...
return last_pipeline;
}