Spark Compute Engine: Spark SQL Explained
 
Author: 林夕1740
 
2020-4-23
 
Editor's note:
This article gives an overview of Spark SQL and explains in detail how to write Spark SQL programs. We hope it helps your study.
The article originally appeared on CSDN and was edited and recommended by Alice of 火龙果软件.

I. Spark SQL

1. Spark SQL Overview

1.1. The History of Spark SQL

Shark was a large-scale data warehouse system designed for Spark and compatible with Hive. It was built on the Hive codebase and worked by swapping out parts of Hive's physical execution plan. This approach let Shark users speed up Hive queries, but because Shark inherited Hive's large and complex codebase it was hard to optimize and maintain, and it was tied to particular Spark versions. As the project hit a ceiling on performance optimization and tried to integrate complex analytic features with SQL, it became clear that Hive's MapReduce-based design limited Shark's development. At the Spark Summit on July 1, 2014, Databricks announced that it was ending development of Shark to focus on Spark SQL.

1.2. What Is Spark SQL

Spark SQL is the Spark module for processing structured data. It provides a programming abstraction called DataFrame and also acts as a distributed SQL query engine.

Compared with the Spark RDD API, Spark SQL carries more information about the structure of the data and the computations performed on it. Spark SQL uses this extra information to apply additional optimizations, making operations on structured data more efficient and convenient.

There are several ways to use Spark SQL, including SQL, the DataFrames API, and the Datasets API. Whichever API or programming language you choose, they all share the same execution engine, so you can switch freely between them; each has its own strengths, and the choice comes down to which style you prefer.

1.3. Why Learn Spark SQL

ÎÒÃÇÒѾ­Ñ§Ï°ÁËHive£¬ËüÊǽ«Hive SQLת»»³ÉMapReduceÈ»ºóÌá½»µ½¼¯ÈºÖÐÈ¥Ö´ÐУ¬´ó´ó¼ò»¯Á˱àдMapReduce³ÌÐòµÄ¸´ÔÓÐÔ£¬ÓÉÓÚMapReduceÕâÖÖ¼ÆËãÄ£ÐÍÖ´ÐÐЧÂʱȽÏÂý£¬ËùÒÔSpark SQLÓ¦Ô˶øÉú£¬ËüÊǽ«Spark SQLת»»³ÉRDD£¬È»ºóÌá½»µ½¼¯ÈºÖÐÈ¥ÔËÐУ¬Ö´ÐÐЧÂʷdz£¿ì£¡

1. Easy integration

SQL queries mix seamlessly with Spark programs, and you can work through the Java, Scala, Python, or R APIs.

2. Uniform data access

Connect to any data source in the same way.

3. Hive compatibility

HiveQL syntax is supported.

4. Standard connectivity

Connect through industry-standard JDBC or ODBC.

2. DataFrame

2.1. What Is a DataFrame

DataFrame's predecessor was SchemaRDD; starting with Spark 1.3.0, SchemaRDD was renamed DataFrame. The main difference from SchemaRDD is that a DataFrame no longer inherits directly from RDD; instead it reimplements most RDD functionality itself. You can still call the rdd method on a DataFrame to convert it to an RDD.

ÔÚSparkÖУ¬DataFrameÊÇÒ»ÖÖÒÔRDDΪ»ù´¡µÄ·Ö²¼Ê½Êý¾Ý¼¯£¬ÀàËÆÓÚ´«Í³Êý¾Ý¿âµÄ¶þά±í¸ñ£¬DataFrame´øÓÐSchemaÔªÐÅÏ¢£¬¼´DataFrameËù±íʾµÄ¶þά±íÊý¾Ý¼¯µÄÿһÁж¼´øÓÐÃû³ÆºÍÀàÐÍ£¬µ«µ×²ã×öÁ˸ü¶àµÄÓÅ»¯¡£DataFrame¿ÉÒԴӺܶàÊý¾ÝÔ´¹¹½¨£¬±ÈÈ磺ÒѾ­´æÔÚµÄRDD¡¢½á¹¹»¯Îļþ¡¢ÍⲿÊý¾Ý¿â¡¢Hive±í¡£

2.2. How DataFrame Differs from RDD

An RDD can be viewed as a distributed collection of objects, and Spark knows nothing about the detailed schema of those objects. A DataFrame can be viewed as a distributed collection of Row objects that carries detailed, column-level schema information, which lets Spark SQL perform certain kinds of execution optimization. The logical difference between a DataFrame and a plain RDD is illustrated below.

The RDD[Person] on the left takes Person as its type parameter, but the Spark framework itself knows nothing about the internal structure of the Person class.

The DataFrame on the right, by contrast, exposes detailed structural information, so Spark SQL knows exactly which columns the dataset contains and the name and type of each. A DataFrame adds structural information about the data, i.e., a schema, which makes it read like a table. It also comes with new ways of manipulating data: the DataFrame API (e.g., df.select()) and SQL (select id, name from xx_table where ...).

In addition, DataFrame introduces off-heap storage: memory outside the JVM heap, managed directly by the operating system rather than the JVM. Spark serializes data (excluding the structure) in binary form into off-heap memory and operates directly on that memory when processing data. Because Spark understands the schema, it knows how to do so.

An RDD is a distributed collection of Java objects; a DataFrame is a distributed collection of Row objects. Beyond offering richer operators than RDDs, the more important benefits of DataFrames are higher execution efficiency, reduced data reads, and optimized execution plans.

With the higher-level DataFrame abstraction, processing data becomes much simpler; you can even use SQL, which is a big usability win for developers.

Moreover, data processed through the DataFrame API or SQL is automatically optimized by Spark's optimizer (Catalyst), so even an inefficient program or SQL statement can still run fast.

2.3. Strengths and Weaknesses of RDD and DataFrame

RDD's strengths and weaknesses:

Strengths:

(1) Compile-time type safety

Type errors are caught at compile time.

(2) An object-oriented programming style

Data is manipulated by calling methods directly on objects.

Weaknesses:

(1) Serialization and deserialization overhead

Both inter-node communication and I/O operations must serialize and deserialize each object's structure as well as its data.

(2) GC overhead

Frequently creating and destroying objects inevitably increases garbage-collection pressure.

DataFrame fixes RDD's weaknesses by introducing a schema and off-heap storage (memory outside the JVM heap, managed by the operating system). Because Spark can read the data through the schema, communication and I/O only need to serialize and deserialize the data itself, and the structural part can be omitted. Off-heap storage makes data operations fast while avoiding heavy GC. But DataFrame gives up RDD's strengths: it is not type-safe, and its API is not object-oriented in style.
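To make the trade-off concrete, here is a minimal sketch, reusing the personRDD and personDF built in section 2.4 below:

// RDDs are checked at compile time: misspelling a field will not compile
personRDD.map(p => p.age + 1)      // p.agee + 1 would be rejected by the compiler

// DataFrames refer to columns by string name, checked only at runtime
import org.apache.spark.sql.functions.col
personDF.select(col("age") + 1)    // col("agee") only fails once the query runs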

2.4. Creating a DataFrame from a Data Source

2.4.1 Creating a DataFrame from a Text File

Before Spark 2.0, SQLContext was the entry point in Spark SQL for creating DataFrames and executing SQL, and HiveContext, a subclass of SQLContext, was used to operate on Hive table data with Hive SQL statements, providing Hive compatibility. Since Spark 2.0 these are unified in SparkSession, which wraps SparkContext and SQLContext; both objects can be obtained from a SparkSession.
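For example, in a Spark 2.x spark-shell the SparkSession is already created as spark, and both legacy entry points can be reached from it:

val sc = spark.sparkContext        // the wrapped SparkContext
val sqlContext = spark.sqlContext  // the legacy SQLContext view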

(1) Create a file locally with three columns, id, name, and age, separated by spaces, then upload it to HDFS. The contents of person.txt:

1 zhangsan 20
2 lisi 29
3 wangwu 25
4 zhaoliu 30
5 tianqi 35
6 kobe 40

Upload the data file to HDFS:

hdfs dfs -put person.txt /

(2) Run the command below in the spark shell to read the data, splitting each line on the column separator.

First start the shell: spark-shell --master local[2]

val lineRDD = sc.textFile("/person.txt").map(_.split(" "))

(3) Define a case class (it plays the role of the table's schema)

case class Person(id:Int, name:String, age:Int)

(4) Associate the RDD with the case class

val personRDD = lineRDD.map(x => Person(x(0).toInt, x(1), x(2).toInt))

(5) Convert the RDD into a DataFrame

val personDF = personRDD.toDF

(6) Operate on the DataFrame

personDF.show

personDF.printSchema

(7) Building a DataFrame through SparkSession

Use the SparkSession object spark, already initialized in spark-shell, to generate a DataFrame:

val dataFrame = spark.read.text("/person.txt")

2.4.2 Creating a DataFrame from a JSON File

(1) Data file

Use the file shipped under the Spark installation directory:

/opt/bigdata/spark/examples/src/main/resources/people.json

(2) Read the data in the spark shell.
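A minimal sketch, assuming the resources path above:

val jsonDF = spark.read.json("/opt/bigdata/spark/examples/src/main/resources/people.json")
jsonDF.show()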

(3) From here the DataFrame's functions can be used to operate on it.

2.4.3 Creating a DataFrame from a Parquet (Columnar Storage) File

(1) Data file

Use the file shipped under the Spark installation directory:

/opt/bigdata/spark/examples/src/main/resources/users.parquet

(2) Read the data in the spark shell.
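A minimal sketch, assuming the resources path above:

val parquetDF = spark.read.parquet("/opt/bigdata/spark/examples/src/main/resources/users.parquet")
parquetDF.show()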

(3) From here the DataFrame's functions can be used to operate on it.

3. Common DataFrame Operations

3.1. DSL-Style Syntax

DataFrameÌṩÁËÒ»¸öÁìÓòÌØ¶¨ÓïÑÔ(DSL)ÒÔ·½±ã²Ù×÷½á¹¹»¯Êý¾Ý¡£ÏÂÃæÊÇһЩʹÓÃʾÀý

(1) View the DataFrame's contents by calling the show method

personDF.show

(2) View selected columns of the DataFrame

View the name column's data:

personDF.select(personDF.col("name")).show

Another way to select the name column:

personDF.select("name").show

View the name and age columns:

personDF.select(col("name"), col("age")).show

(3) Print the DataFrame's schema

personDF.printSchema

(4) Query every name and age, incrementing age by 1

personDF.select(col("id"), col("name"), col ("age") + 1) . show

It can also be written as:

personDF.select(personDF("id"), personDF("name") , personDF("age") + 1) . show

(5) Filter for age greater than or equal to 25, using the filter method

personDF.filter(col("age") >= 25).show

(6) Count the people whose age is greater than 30

personDF.filter(col("age")>30).count()

(7) Group by age and count the number of people of each age

personDF.groupBy("age").count().show

3.2. SQL-Style Syntax

One powerful feature of the DataFrame is that it can be treated as a relational table: you can run SQL queries against it with spark.sql() in a program, and the result comes back as a DataFrame.

To use SQL-style syntax, the DataFrame must first be registered as a table:

personDF.registerTempTable("t_person")

(In Spark 2.x, registerTempTable is deprecated in favor of createOrReplaceTempView, which the standalone programs later in this article use.)

(1) Query the two oldest people

spark.sql("select * from t_person order by age desc limit 2" ). show

(2) Show the table's schema

spark.sql("desc t_person").show

(3) Query the people older than 30

spark.sql("select * from t_personwhere age > 30 ") .show

4. DataSet

4.1. ʲôÊÇDataSet

A DataSet is a distributed collection of data, a new interface added in Spark 1.6. It combines the strengths of RDDs (strong typing and powerful lambda functions) with Spark SQL's optimized execution engine. A DataSet can be constructed from JVM objects and manipulated with functional transformations (map/flatMap/filter).

4.2. Differences Among DataFrame, DataSet, and RDD

Suppose each row of an RDD is an opaque object (say, a Person). In a DataFrame, the same data appears as the rows of a table with named, typed columns. In a Dataset, each row carries the schema as well, either as an untyped Row or as a strongly typed object (each row is an Object).
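The same contrast can be sketched in code (a minimal, self-contained example; the names and values are illustrative):

import org.apache.spark.sql.SparkSession

case class Person(name: String, age: Int)

object Representations {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("repr").master("local[2]").getOrCreate()
    import spark.implicits._

    // RDD[Person]: a bag of opaque JVM objects; Spark cannot look inside them
    val rdd = spark.sparkContext.parallelize(Seq(Person("zhangsan", 23), Person("lisi", 35)))

    // DataFrame (= Dataset[Row]): rows plus a schema (name: string, age: int)
    val df = rdd.toDF()
    df.printSchema()

    // Dataset[Person]: the schema plus a strongly typed object per row
    val ds = df.as[Person]
    ds.map(p => p.age + 1).show()

    spark.stop()
  }
}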

DataSet subsumes DataFrame's functionality; in Spark 2.0 the two were unified, with DataFrame represented as DataSet[Row], i.e., a special case of DataSet.

(1) DataSet checks types at compile time

(2) and provides an object-oriented programming interface.

Compared with DataFrame, Dataset adds compile-time type checking. For a distributed program, submitting a job is laborious (compile, package, upload, run), and discovering an error only once the job reaches the cluster wastes a great deal of time. This is an important reason Dataset was introduced.

4.3. Converting Between DataFrame and DataSet

DataFrame and DataSet can be converted into each other.

(1) DataFrame to DataSet

df.as[ElementType] converts a DataFrame into a DataSet.

(2) DataSet to DataFrame

ds.toDF() converts a DataSet into a DataFrame.
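A minimal sketch, reusing the personDF and the Person case class from section 2.4 (inside spark-shell, where the implicits are in scope):

import spark.implicits._
val personDS = personDF.as[Person]   // DataFrame -> Dataset[Person]
val backToDF = personDS.toDF()       // Dataset[Person] -> DataFrame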

4.4. Creating a DataSet

(1) Via spark.createDataset

(2) Via the toDS method

(3) By converting from a DataFrame, using as[] to turn it into a DataSet. A combined sketch of all three approaches follows.
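A minimal sketch of the three approaches (assuming spark-shell, plus the Person case class and the person.txt file from section 2.4):

import spark.implicits._

// (1) spark.createDataset from an existing collection (an RDD works as well)
val ds1 = spark.createDataset(Seq(1, 2, 3))

// (2) toDS on a local sequence of case-class instances
val ds2 = Seq(Person(1, "zhangsan", 20)).toDS()

// (3) converting a DataFrame with as[]
val ds3 = spark.read.text("/person.txt").as[String]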

III. Executing Spark SQL Queries Programmatically

1. Writing a Spark SQL Program That Converts an RDD to a DataFrame

Earlier we learned how to run SQL queries in the Spark shell; now let's write a Spark SQL query program of our own.

Spark SQL offers two ways to convert between DataFrame and RDD. The first uses the reflection mechanism: an RDD containing elements of a known type is converted, via reflection, into a DataFrame of that type. It suits cases where the RDD's schema is known in advance.

The second obtains the schema through a programmatic interface that interacts with the RDD and builds the DataFrame dynamically, deciding the columns and their types at runtime.

First, add the Spark SQL dependency to the Maven project's pom.xml:

<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_2.11</artifactId>
<version>2.0.2</version>
</dependency>

1.1. Inferring the Schema via Reflection

Scala supports converting an RDD of case-class instances into a DataFrame: the case class defines the schema, and its parameter names are read by reflection to become the table's column names. Such an RDD converts efficiently to a DataFrame and can be registered as a table.

´úÂëÈçÏ£º

package cn.itcast.sql

import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, SparkSession}

/**
 * Converting an RDD to a DataFrame using the reflection mechanism.
 */
// Define a sample case class Person (this becomes the schema)
case class Person(id: Int, name: String, age: Int) extends Serializable

object InferringSchema {

  def main(args: Array[String]): Unit = {
    // 1. Build the SparkSession, specifying the appName and the master address
    val spark: SparkSession = SparkSession.builder()
      .appName("InferringSchema")
      .master("local[2]").getOrCreate()
    // 2. Get the SparkContext from the SparkSession
    val sc: SparkContext = spark.sparkContext
    sc.setLogLevel("WARN") // set the log output level
    // 3. Load the data
    val dataRDD: RDD[String] = sc.textFile("D:\\person.txt")
    // 4. Split each line
    val lineArrayRDD: RDD[Array[String]] = dataRDD.map(_.split(" "))
    // 5. Associate the RDD with the Person class
    val personRDD: RDD[Person] = lineArrayRDD.map(x => Person(x(0).toInt, x(1), x(2).toInt))
    // 6. Create the DataFrame; the implicit conversions must be imported
    import spark.implicits._
    val personDF: DataFrame = personRDD.toDF()

    // ---------------- DSL-style operations: start ----------------
    // 1. Show the DataFrame's data (20 rows by default)
    personDF.show()
    // 2. Show the DataFrame's schema
    personDF.printSchema()
    // 3. Show the number of rows
    println(personDF.count())
    // 4. Show all of the DataFrame's columns
    personDF.columns.foreach(println)
    // 5. Fetch the DataFrame's first row
    println(personDF.head())
    // 6. Show all values of the name column
    personDF.select("name").show()
    // 7. Filter out the rows with age greater than 30
    personDF.filter($"age" > 30).show()
    // 8. Count the rows with age greater than 30
    println(personDF.filter($"age" > 30).count())
    // 9. Group by age and count each group
    personDF.groupBy("age").count().show()
    // ---------------- DSL-style operations: end ------------------

    // ---------------- SQL-style operations: start ----------------
    // Register the DataFrame as a table
    personDF.createOrReplaceTempView("t_person")
    // Pass in SQL statements and operate on the table
    spark.sql("select * from t_person").show()
    spark.sql("select * from t_person where name='zhangsan'").show()
    spark.sql("select * from t_person order by age desc").show()
    // ---------------- SQL-style operations: end ------------------
    sc.stop()
  }
}

1.2. Specifying the Schema Directly with StructType

When a case class cannot be defined ahead of time, a DataFrame can be built in code in three steps:

(1) Convert the RDD into an RDD of Row objects.

(2) Create a StructType schema that matches the Rows produced in step 1.

(3) Apply the schema to the Row RDD with SparkSession's createDataFrame method to produce the DataFrame.

package cn.itcast.sql

import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}
import org.apache.spark.sql.{DataFrame, Row, SparkSession}

/**
 * Converting an RDD to a DataFrame by specifying the schema explicitly.
 */
object SparkSqlSchema {
  def main(args: Array[String]): Unit = {
    // 1. Create the SparkSession, specifying the appName and master
    val spark: SparkSession = SparkSession.builder()
      .appName("SparkSqlSchema")
      .master("local[2]")
      .getOrCreate()
    // 2. Get the SparkContext
    val sc: SparkContext = spark.sparkContext
    // 3. Load the data
    val dataRDD: RDD[String] = sc.textFile("d:\\person.txt")
    // 4. Split each line
    val dataArrayRDD: RDD[Array[String]] = dataRDD.map(_.split(" "))
    // 5. Load the data into Row objects
    val personRDD: RDD[Row] = dataArrayRDD.map(x => Row(x(0).toInt, x(1), x(2).toInt))
    // 6. Create the schema
    val schema: StructType = StructType(Seq(
      StructField("id", IntegerType, false),
      StructField("name", StringType, false),
      StructField("age", IntegerType, false)
    ))

    // 7. Create the DataFrame from personRDD and the schema
    val personDF: DataFrame = spark.createDataFrame(personRDD, schema)

    // 8. Show the DataFrame's data with the DSL
    personDF.show()

    // 9. Register the DataFrame as a table
    personDF.createOrReplaceTempView("t_person")

    // 10. Run SQL statements
    spark.sql("select * from t_person").show()
    spark.sql("select count(*) from t_person").show()
    sc.stop()
  }
}

2. Writing a Spark SQL Program with HiveContext

HiveContextÊǶÔÓ¦spark-hiveÕâ¸öÏîÄ¿,ÓëhiveÓв¿·ÖñîºÏ, Ö§³Öhql,ÊÇSqlContextµÄ×ÓÀà,Ò²¾ÍÊÇ˵¼æÈÝSqlContext;

2.1. Adding the pom Dependency

<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-hive_2.11</artifactId>
<version>2.0.2</version>
</dependency>

2.2. Implementation

package itcast.sql

import org.apache.spark.sql.SparkSession

/**
 * Running Hive SQL through Spark.
 */
object HiveSupport {
  def main(args: Array[String]): Unit = {
    val warehouseLocation = "D:\\workSpace_IDEA_NEW\\day2017-10-12\\spark-warehouse"
    // 1. Create the SparkSession
    val spark: SparkSession = SparkSession.builder()
      .appName("HiveSupport")
      .master("local[2]")
      .config("spark.sql.warehouse.dir", warehouseLocation)
      .enableHiveSupport() // enable Hive support
      .getOrCreate()
    spark.sparkContext.setLogLevel("WARN") // set the log output level
    import spark.implicits._
    import spark.sql

    // 2. Run SQL statements
    sql("CREATE TABLE IF NOT EXISTS person (id int, name string, age int) row format delimited fields terminated by ' '")
    sql("LOAD DATA LOCAL INPATH '/person.txt' INTO TABLE person")
    sql("select * from person").show()
    spark.stop()
  }
}

IV. Data Sources

1. JDBC

Spark SQL can create a DataFrame by reading data from a relational database over JDBC and, after a series of computations on the DataFrame, write the data back into the relational database.

1.1. Loading Data from MySQL with Spark SQL

1.1.1 ͨ¹ýIDEA±àдSparkSql´úÂë

package itcast.sql

import java.util.Properties
import org.apache.spark.sql.{DataFrame, SparkSession}

/**
 * Loading data from MySQL with Spark SQL.
 */
object DataFromMysql {
  def main(args: Array[String]): Unit = {
    // 1. Create the SparkSession
    val spark: SparkSession = SparkSession.builder()
      .appName("DataFromMysql")
      .master("local[2]")
      .getOrCreate()
    // 2. Create a Properties object holding the MySQL username and password
    val properties: Properties = new Properties()
    properties.setProperty("user", "root")
    properties.setProperty("password", "123456")
    // 3. Read the data from MySQL
    val mysqlDF: DataFrame = spark.read.jdbc("jdbc:mysql://192.168.200.150:3306/spark", "iplocaltion", properties)
    // 4. Show the table's data
    mysqlDF.show()
    spark.stop()
  }
}

Run it and inspect the output.

1.1.2 ͨ¹ýspark-shellÔËÐÐ

(1) Start spark-shell (the MySQL connector driver jar must be specified)

spark-shell \
--master spark://hdp-node-01:7077 \
--executor-memory 1g \
--total-executor-cores 2 \
--jars /opt/bigdata/hive/lib/mysql-connector-java-5.1.35.jar \
--driver-class-path /opt/bigdata/hive/lib/mysql-connector-java-5.1.35.jar

(2) Load the data from MySQL

val mysqlDF = spark.read.format("jdbc")
  .options(Map(
    "url" -> "jdbc:mysql://192.168.200.100:3306/spark",
    "driver" -> "com.mysql.jdbc.Driver",
    "dbtable" -> "iplocaltion",
    "user" -> "root",
    "password" -> "123456"))
  .load()

£¨3£©¡¢Ö´Ðвéѯ

1.2. Writing Data to MySQL with Spark SQL

1.2.1 ͨ¹ýIDEA±àдSparkSql´úÂë

(1) The code

package itcast.sql

import java.util.Properties
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession}

/**
 * Writing data to MySQL with Spark SQL.
 */
object SparkSqlToMysql {
  def main(args: Array[String]): Unit = {
    // 1. Create the SparkSession
    val spark: SparkSession = SparkSession.builder()
      .appName("SparkSqlToMysql")
      .getOrCreate()
    // 2. Read the data
    val data: RDD[String] = spark.sparkContext.textFile(args(0))
    // 3. Split each line
    val arrRDD: RDD[Array[String]] = data.map(_.split(" "))
    // 4. Associate the RDD with the Student case class
    val studentRDD: RDD[Student] = arrRDD.map(x => Student(x(0).toInt, x(1), x(2).toInt))
    // Import the implicit conversions
    import spark.implicits._
    // 5. Convert the RDD into a DataFrame
    val studentDF: DataFrame = studentRDD.toDF()
    // 6. Register the DataFrame as a table
    studentDF.createOrReplaceTempView("student")
    // 7. Query the student table, sorted by age in descending order
    val resultDF: DataFrame = spark.sql("select * from student order by age desc")

    // 8. Save the result into a MySQL table.
    // Create a Properties object holding the MySQL username and password
    val prop = new Properties()
    prop.setProperty("user", "root")
    prop.setProperty("password", "123456")
    resultDF.write.jdbc("jdbc:mysql://192.168.200.150:3306/spark", "student", prop)
    // When writing to MySQL a save mode can be set: overwrite replaces the
    // table, append adds to it, ignore skips the write, and error (the
    // default) fails if the table already exists:
    //resultDF.write.mode(SaveMode.Overwrite).jdbc("jdbc:mysql://192.168.200.150:3306/spark", "student", prop)
    spark.stop()
  }
}
// The Student case class
case class Student(id: Int, name: String, age: Int)

(2) Package the program with Maven

Packaging through the IDEA tooling is sufficient.

(3) Submit the jar to the Spark cluster

spark-submit \
--class itcast.sql.SparkSqlToMysql \
--master spark://hdp-node-01:7077 \
--executor-memory 1g \
--total-executor-cores 2 \
--jars /opt/bigdata/hive/lib/mysql-connector-java-5.1.35.jar \
--driver-class-path /opt/bigdata/hive/lib/mysql-connector-java-5.1.35.jar \
/root/original-spark-2.0.2.jar /person.txt

(4) Check the data in the MySQL table
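For instance, from the MySQL client (a sketch; the database and table names are those configured above):

mysql> use spark;
mysql> select * from student order by age desc;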
