离线任务调度框架

这两天为项目需要写了个查询工具,想想还是有些东西可以提炼出来总结的,所以这篇来描述下我做的事情。

背景

首先,我们把广告主的数据做了汇总,最初的数据构建是为系统使用,并不是为用户数据分析使用的(为用户分析和为系统使用的区别,简单举个例子,用户分析关心账户的名称,系统只需要它的ID),但项目里想想既然都花时间做了数据,那索性给用户导出一些分析数据(ps,很理解,但从技术角度讲,导数据的辛苦程度相比做数据也不逞多让,尤其是我们的数据开始并不是为用户准备的,还好我在导出兴趣信息的时候多想想,不仅导出了ID,还导出来兴趣名称…)。

开始的需求是比较模糊的,可能是广告类型的投放分析,国家的投放分析,将来很可能会扩展和定制。

设计目标

原则上讲,每个查询需求实际都是对应一个hive查询。如果仅仅是处理目前报告的内容,最直接的方法是理解查询需求,然后写hive脚本,导出结果。

这样做虽然对少量的查询比较方便,但长远考虑,有明显的问题:

  1. Hive QL包含大量重复内容,按国家维度可能是查ctr, cpa,按性别维度也是,按年龄维度也是,如果不能保存查询语句,会浪费很多时间重敲语句
  2. 查询对用户不友好,因为查询都是要直接写Hive QL完成,一般非技术人员没有能力这样做,最终导致需求全部叠加到技术人员身上
  3. 查询效率不高,这里指的不是每条Hive QL语句(好吧为了突出优点有点拼,这里并没有对Hive QL本身的优化,在最后的缺点也会提到),而是多个查询本身是可以并行执行的,如果人工输入却不得不串行执行。尤其认真说,考虑到hive查询本身是比较慢的,等待时间并不十分乐观
  4. Hive数据与导出数据不一致:这点看起来很小,但实际影响很大。虽然有很多查询可以直接使用Hive QL的输出,但还有一些查询需要再对原始数据做进一步的处理。举个例子,我们的国家是按 [A,B,C]保存的,但用户最终要的是国家A,B和C分别保存的信息,忽略或者特殊处理这种合并的情况,直接把hive查询结果返回是不能满足用户需求的
  5. 通用操作:比如结果展现给用户的形式,通过web界面,或者通过邮件,对不同的查询是通用的

为解决上述问题,我在设计上考虑框架应该实现的几个属性

  1. 任务化:把每个查询作为一个独立的任务对待,通过db ORM读取再处理,这样做主要是考虑将来可能和用户做交互时,用户的查询需要持久化
  2. 任务划分:

    Hive查询任务,更通用的说,是和db做交互的任务

    用户定义任务,基于Hive查询任务或其它用户定义任务,做类似ETL或者格式规范化方面的内容

  3. 信息抽取:主要是针对Hive查询任务,对每个查询语句都有select, from, tablename, where等,这些语法信息是用户不关心也无需了解的,让框架来完成这些通用的内容处理,用户只填写相关的信息

  4. 任务调度:hive查询是一个高IO低CPU的操作,可以通过多线程提高效率。同时由于用户定义任务对其它任务的依赖存在,这个调度变成比较经典的有向无环图(DAG)遍历问题
  5. 通用处理:我依赖pandas DataFrame作为数据存储的容器,每个hive查询任务的结果会被转为DataFrame再做持久化,用户依赖任务会依赖DataFrame做进一步操作(的确多了一次落盘读盘,后续考虑改进),DataFrame本身很方便的与csv文件做转换,以及对数据处理的强大支持(嗯还没有内存放不下的数据不用考虑分布式),也会方便后续的处理

框架流程

Framework

实现

  1. Task表定义

基于Django的ORM定义,Task表里把Hive QL的基本组成部分做了独立的拆分,比如dimension, metric,为方便用户输入,这些字段采用json化的list或者list of dict,这些模块后续会由Task的引擎负责检查和拼装成正确的Hive QL

Class QueryTask(models.Model):

    id = models.IntegerField(primary\_key=True)

    name = models.CharField(max\_length=255, blank=True)

    status = models.IntegerField(blank=True, null=True)

    paused = models.IntegerField(blank=True, null=True)

    dimension = models.CharField(max\_length=1024, blank=True)

    metric = models.CharField(max\_length=1024, blank=True)

    filter = models.CharField(max\_length=1024, blank=True)

    order = models.CharField(max\_length=1024, blank=True)

    tablename = models.CharField(max\_length=1024, blank=True)

    limit = models.IntegerField(blank=True, null=True)

    start\_dt = models.IntegerField(blank=True, null=True)

    end\_dt = models.IntegerField(blank=True, null=True)

    create\_time = models.IntegerField(blank=True, null=True)

parent\_task\_ids = models.CharField(max\_length=45, blank=True)

后续如果有用户交互的需求,可以把用户的查询保存到db,两边都通过ORM读写。在Django项目外使用modeles.py会有一些小坑,但都不算太大,这里不再描述。

  1. 任务依赖执行

有向无环图的多线程遍历,我依赖的是两个队列,未执行队列放入所有入度为0的任务,已执行队列保存所有执行完毕的任务。子线程负责从未执行队列里读取任务,执行任务,向完成队列里放入任务,主线程负责从已执行队列里取任务,减少对它依赖任务的入度,并将入度为0的任务放入未执行队列。

Iter

  • HiveTask

    引擎拼装HiveQL

    查询执行

    结果转换为DataFrame再做dump

  • UserDefineTask

    加载依赖Task的DataFrame

    数据处理,这里采用组合的方式,把任务的实际处理交给Worker来完成

    保存数据

代码

Github task_manager

问题

作为一个粗糙的版本,虽然解决了开始提到的问题,但本身还是有很多的局限性:

  1. 不支持table join,单个任务的查询数据只能来自一个table,这个是硬伤,对小项目可能还行,但大一点的查询就不够了。想想之前微策略做的就是根据table schema去构建table relationship,然后再封装成db无关的BI查询,还是挺敬仰的。
  2. 还是有很多项目耦合的内容在,比如HiveTask可以更一般化到DbTask,读取Task的方法可以考虑db之外更灵活的方式(比如xml),但这个的确是因为时间所限,还是以满足任务需求的扩展为主了。
  3. 用户定义任务通用内容的抽取:现在是通过把任务交给worker的方式解耦,但worker里的通用任务还不多,这点在后续的持续应用中应该会得到增加和改善。