It has been over a year since Michael Armbrust presented Catalyst at Spark Summit 2013. In that time, the number of Spark SQL contributors has grown from a handful to several dozen, and development has been remarkably rapid. In my view, there are two main reasons for this:
1. Integration: SQL-style query languages are integrated into Spark's core RDD abstraction, so SQL can be applied across many kinds of workloads, including stream processing, batch processing, and even machine learning.
2. Efficiency: Shark was constrained by Hive's programming model and could not be optimized any further to fit Spark's model.
ǰһ¶Îʱ¼ä²âÊÔ¹ýShark£¬²¢ÇÒ¶ÔSpark SQLÒ²½øÐÐÁËһЩ²âÊÔ£¬µ«ÊÇ»¹ÊÇÈ̲»×¡¶ÔSpark
SQLһ̽¾¿¾¹£¬¾Í´ÓÔ´´úÂëµÄ½Ç¶ÈÀ´¿´Ò»ÏÂSpark SQLµÄºËÐÄÖ´ÐÐÁ÷³Ì°É¡£
1. Introduction
Let's start with a simple Spark SQL program:
1. val sqlContext = new org.apache.spark.sql.SQLContext(sc)
2. import sqlContext._
3. case class Person(name: String, age: Int)
4. val people = sc.textFile("examples/src/main/resources/people.txt").map(_.split(",")).map(p => Person(p(0), p(1).trim.toInt))
5. people.registerAsTable("people")
6. val teenagers = sql("SELECT name FROM people WHERE age >= 13 AND age <= 19")
7. teenagers.map(t => "Name: " + t(0)).collect().foreach(println)
Lines 1 and 2 create a SQLContext and import everything under sqlContext, setting up the context environment for running Spark SQL.
Lines 3 through 5 define the schema, load the data source, and register it as a table.
Line 6 is the real entry point: the sql function takes a SQL string and first returns a SchemaRDD. This step is lazy; the SQL is not actually executed until the collect action on line 7 runs.
2. SQLContext
SQLContext is the context object for executing SQL. First, let's look at the members it holds:
Catalog
A map-like structure storing <tableName, logicalPlan> pairs: the catalog for looking up relations, responsible for registering tables, unregistering tables, and resolving the mapping between tables and logical plans.
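To make the idea concrete, here is a minimal toy sketch (my own illustration, not Spark's actual Catalog code) of a catalog as a map from table names to logical plans:

```scala
import scala.collection.mutable

// Hypothetical stand-in for Catalyst's LogicalPlan
case class LogicalPlan(description: String)

class SimpleCatalog {
  private val tables = mutable.Map.empty[String, LogicalPlan]

  // Register a table name against its logical plan
  def registerTable(name: String, plan: LogicalPlan): Unit = tables(name) = plan

  // Remove a table from the catalog
  def unregisterTable(name: String): Unit = tables -= name

  // Look up the logical plan for a table, if registered
  def lookupRelation(name: String): Option[LogicalPlan] = tables.get(name)
}

val catalog = new SimpleCatalog
catalog.registerTable("people", LogicalPlan("scan people.txt"))
println(catalog.lookupRelation("people")) // Some(LogicalPlan(scan people.txt))
```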

SqlParser
Parses the incoming SQL, tokenizing it and building a syntax tree, and returns a logical plan.

Analyzer
The analyzer for logical plans.

Optimizer
The optimizer for logical plans.

LogicalPlan
The logical plan, composed of Catalyst TreeNodes; you can see there are three kinds of syntax trees.
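The shapes of those trees, and the rule-based rewriting that analysis and optimization perform on them, can be sketched with a toy expression tree (illustrative only, not Catalyst's real classes), using constant folding as the example rule:

```scala
// Leaf, unary, and binary nodes mirror the node shapes of Catalyst trees
sealed trait Expr {
  // Apply a rewrite rule bottom-up over the whole tree
  def transformUp(rule: PartialFunction[Expr, Expr]): Expr = {
    val rewritten = this match {
      case Add(l, r) => Add(l.transformUp(rule), r.transformUp(rule))
      case Neg(e)    => Neg(e.transformUp(rule))
      case leaf      => leaf
    }
    rule.applyOrElse(rewritten, (e: Expr) => e)
  }
}
case class Lit(v: Int) extends Expr                  // leaf node
case class Neg(child: Expr) extends Expr             // unary node
case class Add(left: Expr, right: Expr) extends Expr // binary node

// A rewrite rule: fold an addition of two literals into one literal
val constantFold: PartialFunction[Expr, Expr] = {
  case Add(Lit(a), Lit(b)) => Lit(a + b)
}

val tree = Add(Lit(1), Add(Lit(2), Lit(3)))
println(tree.transformUp(constantFold)) // Lit(6)
```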

SparkPlanner
Holds a set of different strategies used to optimize and produce the physical execution plan.
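Conceptually, each strategy maps a logical plan to zero or more physical plan candidates, and the planner tries the strategies in order. Here is a toy sketch of that idea (the names and strategies are made up for illustration):

```scala
// Hypothetical stand-ins for logical and physical plan nodes
case class Logical(op: String)
case class Physical(op: String)

// A strategy produces zero or more physical plans for a logical plan
type Strategy = Logical => Seq[Physical]

val joinStrategy: Strategy = {
  case Logical("join") => Seq(Physical("HashJoin"))
  case _               => Seq.empty
}
val scanStrategy: Strategy = {
  case Logical("scan") => Seq(Physical("TableScan"))
  case _               => Seq.empty
}

// Try each strategy in order, yielding candidate physical plans
def plan(logical: Logical, strategies: Seq[Strategy]): Iterator[Physical] =
  strategies.iterator.flatMap(s => s(logical))

// Like Spark's planner(optimizedPlan).next(): take the first candidate
println(plan(Logical("join"), Seq(scanStrategy, joinStrategy)).next()) // Physical(HashJoin)
```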

QueryExecution
sqlÖ´ÐеĻ·¾³ÉÏÏÂÎÄ

These objects make up the Spark SQL runtime. It looks quite elegant: there is static metadata storage, plus an analyzer, an optimizer, logical plans, physical plans, and the execution runtime.
So how do these objects cooperate to execute a SQL statement?
3. The Spark SQL Execution Flow
Without further ado, here is a diagram. I drew it with the online diagramming tool ProcessOn; it is not pretty, but it gets the idea across:

The core components are the green boxes, the result of each step is a blue box, and the methods being invoked are the orange boxes.
To summarize first, the rough execution flow is:
Parse SQL -> Analyze Logical Plan -> Optimize Logical Plan -> Generate Physical Plan -> Prepare Spark Plan -> Execute SQL -> Generate RDD
The more detailed execution flow:
sql or hql -> sql parser (parse) generates an unresolved logical plan ->
analyzer (analysis) generates an analyzed logical plan ->
optimizer (optimize) generates an optimized logical plan ->
spark planner (use strategies to plan) generates a physical plan ->
different Strategies are applied to produce a spark plan ->
spark plan (prepare) yields a prepared spark plan ->
call toRdd (which invokes the execute() method) to run the SQL and generate the RDD
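The flow above can be sketched as a simple chain of functions, where each stage consumes the previous stage's result (a toy illustration; the names and stages are stand-ins, not Spark's API):

```scala
// A plan that records which stages it has passed through
case class Plan(history: List[String])

val parse: String => Plan  = sql => Plan(List("unresolved logical plan"))
val analyze: Plan => Plan  = p => Plan(p.history :+ "analyzed logical plan")
val optimize: Plan => Plan = p => Plan(p.history :+ "optimized logical plan")
val plan: Plan => Plan     = p => Plan(p.history :+ "physical plan")
val prepare: Plan => Plan  = p => Plan(p.history :+ "prepared spark plan")

// Compose the stages in the same order as Spark SQL's pipeline
val pipeline = parse andThen analyze andThen optimize andThen plan andThen prepare

println(pipeline("SELECT name FROM people").history.last) // prepared spark plan
```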
3.1 Parse SQL
Going back to the program at the beginning: when we call the sql function, we are actually calling SQLContext's sql function. Its implementation news up a SchemaRDD, and the parseSql method is invoked as the SchemaRDD is constructed.
/**
 * Executes a SQL query using Spark, returning the result as a SchemaRDD.
 *
 * @group userf
 */
def sql(sqlText: String): SchemaRDD = new SchemaRDD(this, parseSql(sqlText))
The result is a logical plan:
@transient
protected[sql] val parser = new catalyst.SqlParser

protected[sql] def parseSql(sql: String): LogicalPlan = parser(sql)
3.2 Analyze to Execution
µ±ÎÒÃǵ÷ÓÃSchemaRDDÀïÃæµÄcollect·½·¨Ê±£¬Ôò»á³õʼ»¯QueryExecution£¬¿ªÊ¼Æô¶¯Ö´ÐС£
override def collect(): Array[Row] = queryExecution.executedPlan.executeCollect()
We can see the execution steps very clearly:
protected abstract class QueryExecution {
  def logical: LogicalPlan

  lazy val analyzed = analyzer(logical)         // first, the analyzer analyzes the logical plan
  lazy val optimizedPlan = optimizer(analyzed)  // then, the optimizer optimizes the analyzed logical plan
  // TODO: Don't just pick the first one...
  lazy val sparkPlan = planner(optimizedPlan).next() // the planner applies its strategies to generate the physical plan
  // executedPlan should not be used to initialize any SparkPlan. It should be
  // only used for execution.
  lazy val executedPlan: SparkPlan = prepareForExecution(sparkPlan) // finally, produce the prepared Spark Plan

  /** Internal version of the RDD. Avoids copies and has no schema */
  lazy val toRdd: RDD[Row] = executedPlan.execute() // calling toRdd runs the job and returns the result as an RDD

  protected def stringOrError[A](f: => A): String =
    try f.toString catch { case e: Throwable => e.toString }

  def simpleString: String = stringOrError(executedPlan)

  override def toString: String =
    s"""== Logical Plan ==
       |${stringOrError(analyzed)}
       |== Optimized Logical Plan ==
       |${stringOrError(optimizedPlan)}
       |== Physical Plan ==
       |${stringOrError(executedPlan)}
    """.stripMargin.trim
}
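Notice that every stage is a lazy val: QueryExecution can declare the entire chain up front without doing any work, because Scala defers each lazy val until first access and then caches it. Here is a minimal, self-contained sketch of that behavior (toy stage names, not Spark's):

```scala
class ToyQueryExecution(sql: String) {
  var log: List[String] = Nil // record which stages actually ran, in order

  lazy val analyzed: String      = { log :+= "analyze"; s"analyzed($sql)" }
  lazy val optimizedPlan: String = { val a = analyzed; log :+= "optimize"; s"optimized($a)" }
  lazy val executedPlan: String  = { val o = optimizedPlan; log :+= "prepare"; s"prepared($o)" }
  lazy val toRdd: String         = { val p = executedPlan; log :+= "execute"; s"rdd($p)" }
}

val qe = new ToyQueryExecution("SELECT 1")
println(qe.log) // List() -- declaring the chain ran nothing
qe.toRdd        // forcing the last stage pulls in the whole chain
println(qe.log) // List(analyze, optimize, prepare, execute)
```

This is why nothing happens when sql() returns a SchemaRDD, and the whole pipeline only fires when an action such as collect finally touches executedPlan.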
This completes the entire flow.
4. Summary
By analyzing SQLContext, we learned which components Spark SQL contains: SqlParser, Analyzer, Optimizer, LogicalPlan, SparkPlanner (which produces the Physical Plan), and QueryExecution.
By stepping through the code, we learned Spark SQL's execution flow:
sql or hql -> sql parser (parse) generates an unresolved logical plan ->
analyzer (analysis) generates an analyzed logical plan ->
optimizer (optimize) generates an optimized logical plan ->
spark planner (use strategies to plan) generates a physical plan ->
different Strategies are applied to produce a spark plan ->
spark plan (prepare) yields a prepared spark plan ->
call toRdd (which invokes the execute() method) to run the SQL and generate the RDD
Next, I will study each of these component objects in turn to see exactly what optimizations Catalyst performs.