您可以捐助,支持我们的公益事业。

1元 10元 50元





认证码:  验证码,看不清楚?请点击刷新验证码 必填



  求知 文章 文库 Lib 视频 iPerson 课程 认证 咨询 工具 讲座 Modeler   Code  
会员   
 
   
 
 
     
   
 订阅
  捐助
Hive Driver源码执行流程分析
 
作者:周旭龙 来源:博客园 发布于:2015-6-8
  2314  次浏览      35
 

接着上一篇来说执行入口的分析,CliDriver最终将用户指令command提交给了Driver的run方法(针对常用查询语句而言),在这里用户的command将会被编译,优化并生成MapReduce任务进行执行。所以Driver也是Hive的核心,他扮演了一个将用户查询和MapReduce Task转换并执行的角色,下面我们就看看Hive是如何一步一步操作的。

源码分析

在说run方法之前,由于CliDriver需要得到一个Driver类的实例,所以首先看一下Driver的构造方法。Driver有三个构造函数,主要功能也就是设置类的实例变量HiveConf。SessionState前文已经有介绍,SessionState返回了当前会话的一些信息,提取配置文件,初始化Driver实例。

public Driver() {
if (SessionState.get() != null) {
conf = SessionState.get().getConf();
}
}

run

下面就开始解析Driver内部对用户命令command的处理流程,首先是入口函数run. run函数通过调用runInternal方法处理用户指令,在处理完成runInternal之后,如果执行过程中出现出错,还附加了对错误码和错误信息的处理,此处省略。

public CommandProcessorResponse run(String command)
      throws CommandNeedRetryException {
    return run(command, false);
}

public CommandProcessorResponse run(String command, boolean alreadyCompiled)
        throws CommandNeedRetryException {
    CommandProcessorResponse cpr = runInternal(command, alreadyCompiled);
     ...
}

runInternal

runInternal方法包含的主要操作有,处理preRunHook(具体功能可以顾名思义哦),compile , execute, 处理postRunHook以及构造CommandProcessorResponse并返回。下面依次从代码的角度分析这几步的具体操作:

PreRunHook

处理preRunHook,首先根据配置文件和指令,构造用户Hook执行的上下文hookContext,然后读取用户PreRunHook配置指定的类(字符串), 此配置项对应于Hive配置文件当中的“hive.exec.driver.run.hooks”一项,利用反射机制Class.forName实例化PreRunHook类实例(getHook函数完成),依次执行各钩子的功能(preDriverRun函数完成)。

HiveDriverRunHookContext hookContext 
= new HiveDriverRunHookContextImpl(conf, command);
// Get all the driver run hooks and pre-execute them.
List<HiveDriverRunHook> driverRunHooks;
try{
driverRunHooks = getHooks(HiveConf.ConfVars.HIVE_DRIVER_RUN_HOOKS,
HiveDriverRunHook.class);
for (HiveDriverRunHook driverRunHook : driverRunHooks) {
driverRunHook.preDriverRun(hookContext);
}
}catch (Exception e) {
errorMessage = "FAILED: Hive Internal Error: " + Utilities.getNameMessage(e);
SQLState = ErrorMsg.findSQLState(e.getMessage());
downstreamError = e;
console.printError(errorMessage + "\n"
+ org.apache.hadoop.util.StringUtils.stringifyException(e));
return createProcessorResponse(12);
}

compile

编译,直接调用complieInternal函数编译用户指令,将指令翻译成MapReduce任务。这一个过程涉及的内容比较多,也很重要,后面将单独用一篇文章说明编译优化的过程。这里借用网上的一幅图,帮助对compile的功能有个整体的理解,参考文献: Hive实现原理.pdf。
编译流程

execute

在运行之前还有获取锁的操作,由于新版本添加了ACID事务的支持,还设置了事务管理器等,目前还没详细的弄懂这块的处理逻辑和功能,先放一下,主要看下execute函数执行了什么操作,也就是如何根据编译结果执行任务的。

首先是从编译得到的查询计划QueryPlan里获取基本的查询ID,查询字串等信息,并在回话状态中把当前查询字串和查询计划插入到历史记录中。

String queryId = plan.getQueryId();
String queryStr = plan.getQueryStr();

if (SessionState.get() != null) {
   SessionState.get().getHiveHistory().startQuery(queryStr,
       conf.getVar(HiveConf.ConfVars.HIVEQUERYID));
   SessionState.get().getHiveHistory().logPlanProgress(plan);
}

与PreRunHook类似,在执行任务之前,检查并执行用户设定的"hive.pre.exec.hooks",此处不再详述。完成这部操作之后,向控制台简单的打印一些信息之后,就开始正式执行任务了。

DriverContext

创建执行上下文DriverContext,它记录的信息主要包括可执行的任务队列(Queue runnable), 正在运行的任务队列(Queue running), 当前启动的任务数curJobNo, statsTasks(Map<String, StatsTask>, what used for?)以及语义分析Semantic Analyzers依赖的Context对象等。

DriverContext driverCxt = new DriverContext(ctx); driverCxt.prepare(plan);

public DriverContext(Context ctx) {
    this.runnable = new ConcurrentLinkedQueue>();
    this.running = new LinkedBlockingQueue();
    this.ctx = ctx;
 }

public void prepare(QueryPlan plan) {
    // extract stats keys from StatsTask
    List> rootTasks = plan.getRootTasks();
    NodeUtils.iterateTask(rootTasks, StatsTask.class, new Function()  
    {
      public void apply(StatsTask statsTask) {
        statsTasks.put(statsTask.getWork().getAggKey(), statsTask);
      }
    });
 }

顺便提一下Context对象,在Context的源码注释当中提到, 每一个查询都要对应一个Context对象,不同查询之间Context对象是不可重用的, 执行完一个查询之后需要clear对应的Context对象(主要是语法分析用到的temp文件目录),在Hive的实现中也是这么做的。回顾上一篇文章,从CliDriver循环的读取用户指令,每读取到一条指令都要进行processLine,processCmd,processLocalCmd的处理,然后提交给Driver编译解析。Context对象是在compile函数中实例化的,也就说每一条查询都会创建一个Context对象,当执行完一条查询从Driver返回到processLocalCmd中时,都会调用Driver对象的close函数对Context进行清理(ctx.clear),这样就保证了一条查询对应一个Context对象。对于DriverContext对象也是类似,在execute函数中实例化,Driver的close函数中关闭(driverCtx.shutdown),和Context相比一个用来辅助语义分析,一个用来辅助任务执行。还有,我们发现在processCmd函数中通过CommandProcessorFactory设置了Driver类的实例对象,也就是每一条查询都需要一个Driver对象进行处理,那这些Driver对象之间是否可以共享呢?答案是肯定的,在CommandProcessorFactory中维持了一个HiveConf到Driver的Map,每次获取Driver对象时都是根据conf对象来查找到的,如果不存在才重新创建一个Driver对象,而HiveConf对象又是在CliDriver的run方法中实例化的,与一个CliSessionState对应,所以Driver实例应该是与一个Cli的会话对应,同一个会话内部的查询共享一个Driver实例。

Manage and run all tasks

扯得有点远,继续看Driver对查询任务的执行,在实例化DriverContext对象之后,就将查询计划plan中的任务放入到DriverContext的runnable队列中。

for (Task<? extends Serializable> tsk : plan.getRootTasks()) {
assert tsk.getParentTasks() == null || tsk.getParentTasks().isEmpty();
driverCxt.addToRunnable(tsk);
}

下面就开始运行任务Task,整个任务的运行由一个循环控制,只要DriverContext没有被关闭,并且runnable和running队列中还有任务就一直循环。为了方便描述,下文将一次对任务循环过程的每一步进行说明,这里只给出循环判断条件。

while (!destroyed && driverCxt.isRunning()) {}

public synchronized boolean isRunning() {
    return !shutdown && (!running.isEmpty() || !runnable.isEmpty());
}

1. Put all the tasks into runnable queue

在循环内部,首先不停的从runnable队列中抽取队首的任务,然后launch该任务。

 while (!destroyed && driverCxt.isRunning()) {
// Launch upto maxthreads tasks
Task<? extends Serializable> task;
while ((task = driverCxt.getRunnable(maxthreads)) != null) {
perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.TASK + task.getName() + "." + task.getId());
TaskRunner runner = launchTask(task, queryId, noName, jobname, jobs, driverCxt);
if (!runner.isRunning()) {
break;
}
}

2. Launch a task

在launch一个任务的过程中,根据任务类型(是不是MapReduceTask或者ConditialTask),做一些操作(don't know what used for),将DriverContext当前已启动任务数curJobNo加1,然后根据配置文件conf,查询计划plan,执行上下文cxt(DriverContext),初始化一个任务,接着创建任务结果TaskResult对象和任务执行对象TaskRunner,将TaskRunner放入DriverContext的running队列中,表示该任务正在运行。最后,根据配置文件指定的任务运行模式,即是否支持并行运行,启动任务。

private TaskRunner launchTask(Task<? extends Serializable> tsk, 
String queryId, boolean noName,
String jobname, int jobs, DriverContext cxt) throws HiveException {

if (SessionState.get() != null) {
SessionState.get().getHiveHistory().startTask(queryId, tsk,
tsk.getClass().getName());
}

if (tsk.isMapRedTask() && !(tsk instanceof ConditionalTask)) {
if (noName) {
conf.setVar(HiveConf.ConfVars.HADOOPJOBNAME, jobname + "(" +
tsk.getId() + ")");
}
conf.set("mapreduce.workflow.node.name", tsk.getId());
Utilities.setWorkflowAdjacencies(conf, plan);
cxt.incCurJobNo(1);
console.printInfo("Launching Job " + cxt.getCurJobNo() + " out of " + jobs);
}

tsk.initialize(conf, plan, cxt);
TaskResult tskRes = new TaskResult();
TaskRunner tskRun = new TaskRunner(tsk, tskRes);

cxt.launching(tskRun);
// Launch Task
if (HiveConf.getBoolVar(conf, HiveConf.ConfVars.EXECPARALLEL)
&& (tsk.isMapRedTask() || (tsk instanceof MoveTask))) {
// Launch it in the parallel mode, as a separate thread only for MR tasks
//并发执行
if (LOG.isInfoEnabled()){
LOG.info("Starting task [" + tsk + "] in parallel");
}
tskRun.setOperationLog(OperationLog.getCurrentOperationLog());
tskRun.start();
} else {
if (LOG.isInfoEnabled()){
LOG.info("Starting task [" + tsk + "] in serial mode");
}
//顺序执行
tskRun.runSequential();
}
return tskRun;
}

3. Poll a finished task

完成任务的启动之后,将调用DriverContext的pollFinished函数,查看任务是否执行完毕,如果有任务完成,则将该任务出队,并将已完成的任务添加到钩子上下文HookContext中。

TaskRunner tskRun = driverCxt.pollFinished();
        if (tskRun == null) {
          continue;
}

hookContext.addCompleteTask(tskRun);

public synchronized TaskRunner pollFinished() throws InterruptedException {
    while (!shutdown) {
      Iterator it = running.iterator();
      while (it.hasNext()) {
        TaskRunner runner = it.next();
        if (runner != null && !runner.isRunning()) {
          it.remove();
          return runner;
        }
      }
      wait(SLEEP_TIME);
    }
    return null;
  }

4. Handle the finished task

针对一个已完成的任务,首先获取任务的结果对象TaskResult和退出状态, 如果任务非正常退出,则第一步先判断任务是否支持Retry,如果支持,关闭当前DriverContext,设置jobTracker为初始状态,抛出CommandNeedRetry异常,这个异常会在CliDriver的processLocalCmd中捕获,然后尝试重新处理该命令,参见上一篇文章的说明。如果任务不支持Retry,则启动备份任务backupTask(类似于回滚?),并添加到runnable队列,在下次循环过程中执行。如果没有backupTask,则查找用户配置“hive.exec.failure.hooks”,根据用户配置相应出错处理,并关闭DriverContext, 返回退出码。

Task<? extends Serializable> tsk = tskRun.getTask();
TaskResult result = tskRun.getTaskResult();

int exitVal = result.getExitVal();
if (exitVal != 0) {
if (tsk.ifRetryCmdWhenFail()) {
driverCxt.shutdown();
// in case we decided to run everything in local mode, restore the
// the jobtracker setting to its initial value
ctx.restoreOriginalTracker();
throw new CommandNeedRetryException();
}
Task<? extends Serializable> backupTask = tsk.getAndInitBackupTask();
if (backupTask != null) {
setErrorMsgAndDetail(exitVal, result.getTaskError(), tsk);
console.printError(errorMessage);
errorMessage = "ATTEMPT: Execute BackupTask: " + backupTask.getClass().getName();
console.printError(errorMessage);

// add backup task to runnable
if (DriverContext.isLaunchable(backupTask)) {
driverCxt.addToRunnable(backupTask);
}

continue;

} else {
hookContext.setHookType(HookContext.HookType.ON_FAILURE_HOOK);
// Get all the failure execution hooks and execute them.
for (Hook ofh : getHooks(HiveConf.ConfVars.ONFAILUREHOOKS)) {
perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.FAILURE_HOOK + ofh.getClass().getName());

((ExecuteWithHookContext) ofh).run(hookContext);

perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.FAILURE_HOOK + ofh.getClass().getName());
}
setErrorMsgAndDetail(exitVal, result.getTaskError(), tsk);
SQLState = "08S01";
console.printError(errorMessage);
driverCxt.shutdown();
// in case we decided to run everything in local mode, restore the
// the jobtracker setting to its initial value
ctx.restoreOriginalTracker();
return exitVal;
}
}

5. Find children tasks

最后调用DriverContext的finished函数,对完成的任务进行处理(处理逻辑没看懂), 然后判断当前任务是否包含子任务,如果包含则依次将子任务添加到runnable队列,下次循环中被启动执行。

riverCxt.finished(tskRun);

if (tsk.getChildTasks() != 
for (Task<? extends Serializable> child : tsk.getChildTasks()) {
if (DriverContext.isLaunchable(child)) {
driverCxt.addToRunnable(child);
}
}
}

6. Do something before return

当所有的任务都完成之后,如果发现DriverContext已经被关闭,表明任务取消,打印信息并返回对应的状态码。最后清楚任务执行中不完整的输出,并加载执行用户指定的"hive.exec.post.hooks",完成对应的钩子功能。对于执行过程中出现的异常,CommandNeedRetryException将会直接向上抛出,其他Exception,直接打印出错信息。无论是否发生异常,只要能够获取到任务执行过程中的MapReduce状态信息,都将在finally语句块中打印。(限于篇幅,此处只给出部分代码,钩子的处理方式前文已经给出不再详述,异常处理的部分,有兴趣的执行查看)

//判断DriverContext是否被关闭
if (driverCxt.isShutdown()) {
SQLState = "HY008";
errorMessage = "FAILED: Operation cancelled";
console.printError(errorMessage);
return 1000;
} //删除不完整的输出
HashSet<WriteEntity> remOutputs = new HashSet<WriteEntity>();
for (WriteEntity output : plan.getOutputs()) {
if (!output.isComplete()) {
remOutputs.add(output);
}
} for (WriteEntity output : remOutputs) {
plan.getOutputs().remove(output);
}

最后的最后,如果所有的任务都正常执行完毕,此次查询完成,plan.setDone(),打印OK~

PostRunHook and return

还没完~当execute函数执行完成后,返回到runInternal函数中,接着释放锁,与之前的PreRunHook相对应,还需要加载相应用户自定义的PostRunHook(代码不再重复),最后才调用creatProcessorResponse,创建响应对象CommandProcessorResponse并返回。

private CommandProcessorResponse createProcessorResponse(int ret) {
return new CommandProcessorResponse(ret, errorMessage, SQLState, downstreamError);
}
   
2314 次浏览       35
相关文章

基于EA的数据库建模
数据流建模(EA指南)
“数据湖”:概念、特征、架构与案例
在线商城数据库系统设计 思路+效果
 
相关文档

Greenplum数据库基础培训
MySQL5.1性能优化方案
某电商数据中台架构实践
MySQL高扩展架构设计
相关课程

数据治理、数据架构及数据标准
MongoDB实战课程
并发、大容量、高性能数据库设计与优化
PostgreSQL数据库实战培训
最新活动计划
嵌入式软件架构设计 12-11[北京]
LLM大模型与智能体开发实战 12-18[北京]
嵌入式软件测试 12-25[北京]
AI原生应用的微服务架构 1-9[北京]
AI大模型编写高质量代码 1-14[北京]
需求分析与管理 1-22[北京]

MySQL索引背后的数据结构
MySQL性能调优与架构设计
SQL Server数据库备份与恢复
让数据库飞起来 10大DB2优化
oracle的临时表空间写满磁盘
数据库的跨平台设计
更多...   

并发、大容量、高性能数据库
高级数据库架构设计师
Hadoop原理与实践
Oracle 数据仓库
数据仓库和数据挖掘
Oracle数据库开发与管理

GE 区块链技术与实现培训
航天科工某子公司 Nodejs高级应用开发
中盛益华 卓越管理者必须具备的五项能力
某信息技术公司 Python培训
某博彩IT系统厂商 易用性测试与评估
中国邮储银行 测试成熟度模型集成(TMMI)
中物院 产品经理与产品管理
更多...