Spark SQL Source Code Analysis: The Core Execution Flow
 
Author: OopsOutOfMemory, 火龙果软件. Published 2014-08-20
 

Since Michael Armbrust presented Catalyst at Spark Summit 2013, a little over a year ago, Spark SQL's contributors have grown from a handful of people to several dozen, and development has been remarkably fast. In my view there are two main reasons:

1. Integration: SQL-style querying is integrated into Spark's core RDD concept, so it can be applied to many kinds of workloads; SQL can be brought into stream processing, batch processing, and even machine learning.

2. Efficiency: Shark was constrained by Hive's programming model and could not be optimized any further to fit Spark's model.

ǰһ¶Îʱ¼ä²âÊÔ¹ýShark£¬²¢ÇÒ¶ÔSpark SQLÒ²½øÐÐÁËһЩ²âÊÔ£¬µ«ÊÇ»¹ÊÇÈ̲»×¡¶ÔSpark SQLһ̽¾¿¾¹£¬¾Í´ÓÔ´´úÂëµÄ½Ç¶ÈÀ´¿´Ò»ÏÂSpark SQLµÄºËÐÄÖ´ÐÐÁ÷³Ì°É¡£

1. Prologue

Let's start with a simple Spark SQL program:

1. val sqlContext = new org.apache.spark.sql.SQLContext(sc)
2. import sqlContext._
3. case class Person(name: String, age: Int)
4. val people = sc.textFile("examples/src/main/resources/people.txt").map(_.split(",")).map(p => Person(p(0), p(1).trim.toInt))
5. people.registerAsTable("people")
6. val teenagers = sql("SELECT name FROM people WHERE age >= 13 AND age <= 19")
7. teenagers.map(t => "Name: " + t(0)).collect().foreach(println)

The first two lines, 1 and 2, create the SQLContext and import everything under sqlContext (its implicits), which is the context environment for running Spark SQL.

Lines 3 and 4 define the Person schema and load the data source, and line 5 registers it as the people table.

Line 6 is the real entry point: the sql function. You pass it a SQL string and it first returns a SchemaRDD. This step is lazy; the SQL is only executed when the collect action on line 7 runs.
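
A quick way to see this laziness at work is to peek at the SchemaRDD's queryExecution field before running any action. This is a small sketch; it assumes the SchemaRDD in your Spark version exposes queryExecution (it does in the 1.x codebase discussed in this post), so verify that against your version before relying on it:

// Building the SchemaRDD records a logical plan but runs nothing yet.
val teenagers = sql("SELECT name FROM people WHERE age >= 13 AND age <= 19")

// Assumption: SchemaRDD exposes its QueryExecution; printing it shows the
// analyzed, optimized, and physical plans without executing the query.
println(teenagers.queryExecution)

// Only an action such as collect() actually runs the physical plan.
teenagers.collect().foreach(println)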

2. SQLContext

SQLContext is the context object for executing SQL. First, let's look at the members it holds:

Catalog

A class that stores a map of <tableName, logicalPlan>: the catalog used to look up relations, register and unregister tables, and resolve table names to their logical plans (a toy sketch of this appears below, after the list).

SqlParser

Parses the incoming SQL text, builds the syntax tree, and returns a logical plan.

Analyzer

The analyzer that resolves the logical plan.

Optimizer

The optimizer for logical plans.

LogicalPlan

The logical plan, built from Catalyst TreeNodes; three kinds of syntax trees can be seen here.

SparkPlanner

Holds the different strategies used to turn the optimized logical plan into a physical execution plan.

QueryExecution

The context object in which the SQL is actually executed.

These objects together make up the Spark SQL runtime. It looks quite neat: there is static metadata storage, an analyzer, an optimizer, logical plans, physical plans, and an execution runtime.
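
To make the Catalog entry above concrete, here is a minimal, purely illustrative sketch of a <tableName, logicalPlan> registry. It is a toy stand-in, not the real Catalyst catalog implementation, and the import path is an assumption based on the Spark 1.x source layout; it only captures the register/unregister/lookup role the Catalog plays:

import scala.collection.mutable
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan  // path as in the Spark 1.x source tree (assumption)

// Toy catalog: a mutable map from table name to the registered logical plan.
class ToyCatalog {
  private val tables = mutable.Map.empty[String, LogicalPlan]

  // Called when a table is registered (e.g. via registerAsTable).
  def registerTable(name: String, plan: LogicalPlan): Unit = tables(name) = plan

  // Remove a table from the catalog.
  def unregisterTable(name: String): Unit = tables -= name

  // Analysis asks the catalog for the plan behind a table name.
  def lookupRelation(name: String): LogicalPlan =
    tables.getOrElse(name, sys.error(s"Table not found: $name"))
}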

So how do these objects cooperate to execute a SQL statement?

3. The Spark SQL Execution Flow

Without further ado, here is a diagram. I drew it with an online diagramming tool, ProcessOn; it is not pretty, but it gets the idea across:

The core components are the green boxes, the result of each step is a blue box, and the methods being called are the orange boxes.

To summarize first, the rough execution flow is:

Parse SQL -> Analyze Logical Plan -> Optimize Logical Plan -> Generate Physical Plan -> Prepared Spark Plan -> Execute SQL -> Generate RDD

More specifically, the execution flow is:

sql or hql
  -> SqlParser (parse): produces an unresolved logical plan
  -> Analyzer (analysis): produces an analyzed logical plan
  -> Optimizer (optimize): produces an optimized logical plan
  -> SparkPlanner (plan): its Strategies turn the optimized plan into a physical plan, i.e. a Spark plan
  -> prepareForExecution (prepare): produces a prepared Spark plan
  -> toRdd (which calls execute()): runs the SQL and produces an RDD

3.1 Parse SQL

Back to the program at the beginning: when we call the sql function, it is really SQLContext's sql function, whose implementation news up a SchemaRDD; parseSql is called while that SchemaRDD is being constructed.

/**
 * Executes a SQL query using Spark, returning the result as a SchemaRDD.
 *
 * @group userf
 */
def sql(sqlText: String): SchemaRDD = new SchemaRDD(this, parseSql(sqlText))

The result is that a logical plan is produced:

@transient
protected[sql] val parser = new catalyst.SqlParser

protected[sql] def parseSql(sql: String): LogicalPlan = parser(sql)
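
As a small, hedged illustration of what parseSql does under the hood (this assumes catalyst.SqlParser is instantiable from user code in your Spark version, which may not hold; inside SQLContext the equivalent call is simply parseSql(...)):

import org.apache.spark.sql.catalyst.SqlParser
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan

// Applying the Catalyst parser to the query from the example yields an
// *unresolved* LogicalPlan: the people table and its columns are recorded
// by name only and are not yet bound to any registered relation.
val parser = new SqlParser
val unresolvedPlan: LogicalPlan =
  parser("SELECT name FROM people WHERE age >= 13 AND age <= 19")
println(unresolvedPlan)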

3.2 Analyze to Execution

µ±ÎÒÃǵ÷ÓÃSchemaRDDÀïÃæµÄcollect·½·¨Ê±£¬Ôò»á³õʼ»¯QueryExecution£¬¿ªÊ¼Æô¶¯Ö´ÐС£

 override def collect(): Array[Row] = queryExecution.executedPlan.executeCollect()

The execution steps can be seen very clearly here:

protected abstract class QueryExecution {
  def logical: LogicalPlan

  lazy val analyzed = analyzer(logical)              // the analyzer first resolves the logical plan
  lazy val optimizedPlan = optimizer(analyzed)       // the optimizer then rewrites the analyzed logical plan
  // TODO: Don't just pick the first one...
  lazy val sparkPlan = planner(optimizedPlan).next() // the planner's strategies produce the physical plan
  // executedPlan should not be used to initialize any SparkPlan. It should be
  // only used for execution.
  lazy val executedPlan: SparkPlan = prepareForExecution(sparkPlan) // finally, the prepared Spark plan

  /** Internal version of the RDD. Avoids copies and has no schema */
  lazy val toRdd: RDD[Row] = executedPlan.execute()  // toRdd executes the plan and returns the result as an RDD

  protected def stringOrError[A](f: => A): String =
    try f.toString catch { case e: Throwable => e.toString }

  def simpleString: String = stringOrError(executedPlan)

  override def toString: String =
    s"""== Logical Plan ==
       |${stringOrError(analyzed)}
       |== Optimized Logical Plan ==
       |${stringOrError(optimizedPlan)}
       |== Physical Plan ==
       |${stringOrError(executedPlan)}
    """.stripMargin.trim
}

At this point the whole flow is complete.

4. Summary

By analyzing SQLContext we now know which components Spark SQL contains: Catalog, SqlParser, Analyzer, Optimizer, LogicalPlan, SparkPlanner (which produces the physical plan), and QueryExecution.

And by stepping through the code, we learned Spark SQL's execution flow:

sql or hql
  -> SqlParser (parse): produces an unresolved logical plan
  -> Analyzer (analysis): produces an analyzed logical plan
  -> Optimizer (optimize): produces an optimized logical plan
  -> SparkPlanner (plan): its Strategies turn the optimized plan into a physical plan, i.e. a Spark plan
  -> prepareForExecution (prepare): produces a prepared Spark plan
  -> toRdd (which calls execute()): runs the SQL and produces an RDD

In follow-up posts each of these component objects will be examined in turn, to see exactly which optimizations Catalyst performs.

   