Editor's note: this article gives an overview of Spark SQL and walks through, in detail, how to write Spark SQL programs.
I. Spark SQL
1. Spark SQL Overview
1.1. The history of Spark SQL
Shark was a large-scale data warehouse system designed for Spark and compatible with Hive. Shark was built on the Hive codebase and worked by swapping out parts of Hive's physical execution plan. This approach let Shark users speed up Hive queries, but Shark inherited Hive's large, complex codebase, which made it hard to optimize and maintain, and it was also tied to specific Spark versions. As performance optimization hit a ceiling and more complex SQL analytic features were integrated, it became clear that Hive's MapReduce-oriented design limited Shark's development. At the Spark Summit on July 1, 2014, Databricks announced it was ending development of Shark to focus on Spark SQL.
1.2. What is Spark SQL
Spark SQL is the Spark module for processing structured data. It provides a programming abstraction called DataFrame and also acts as a distributed SQL query engine.
Compared with the Spark RDD API, Spark SQL carries more information about the structure of the data and the computations performed on it, and it uses this information to apply extra optimizations, making operations on structured data more efficient and convenient.
There are several ways to use Spark SQL, including SQL, the DataFrames API, and the Datasets API. Whichever API or programming language you choose, they all share the same execution engine, so you can switch freely between them; each has its own character, so pick the style you prefer.
1.3. Why learn Spark SQL
We have already studied Hive, which converts Hive SQL into MapReduce jobs and submits them to the cluster, greatly reducing the complexity of writing MapReduce programs; but because the MapReduce computation model executes slowly, Spark SQL emerged. It converts SQL into RDD operations and submits them to the cluster, executing very fast. Spark SQL's main features:
1. Easy integration
SQL queries mix seamlessly with Spark programs, through APIs in Java, Scala, Python, R, and other languages.
2. Unified data access
Connect to any data source in the same way.
3. Hive compatibility
HiveQL syntax is supported.
4. Standard data connectivity
Industry-standard JDBC or ODBC connections can be used.
2. DataFrame
2.1. What is a DataFrame
The predecessor of DataFrame was SchemaRDD; starting with Spark 1.3.0, SchemaRDD was renamed DataFrame. The main difference is that DataFrame no longer inherits directly from RDD but implements most of RDD's functionality itself. You can still call the rdd method on a DataFrame to convert it into an RDD.
In Spark, a DataFrame is a distributed dataset built on top of RDDs, analogous to a two-dimensional table in a traditional database. A DataFrame carries schema metadata: every column of the two-dimensional dataset it represents has a name and a type, and the underlying layer performs additional optimizations. A DataFrame can be built from many data sources, for example an existing RDD, structured files, external databases, or Hive tables.
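As a quick sketch of this relationship (illustrative names only, assuming a spark-shell session where spark.implicits._ is already imported):
case class Person(id: Int, name: String, age: Int)
val personDF = Seq(Person(1, "zhangsan", 20), Person(2, "lisi", 29)).toDF() // DataFrame built from an existing collection
val personRDD = personDF.rdd // back to an RDD[Row] via the rdd method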
2.2. Differences between DataFrame and RDD
An RDD can be viewed as a distributed collection of objects; Spark does not know the objects' detailed schema. A DataFrame can be viewed as a distributed collection of Row objects that provides detailed, column-level schema information, enabling Spark SQL to perform certain forms of execution optimization. The logical difference between a DataFrame and an ordinary RDD is shown below:

The figure above shows the difference between DataFrame and RDD intuitively.
Although the RDD[Person] on the left takes Person as its type parameter, the Spark framework itself knows nothing about the internal structure of the Person class.
The DataFrame on the right, by contrast, provides detailed structure information, so Spark SQL knows exactly which columns the dataset contains and what each column's name and type are. A DataFrame adds structural information about the data, i.e., a schema, which makes it look like a table. DataFrame also comes with new ways of operating on the data: the DataFrame API (e.g., df.select()) and SQL (select id, name from xx_table where ...).
In addition, DataFrame introduces off-heap storage: memory outside the JVM heap, managed directly by the operating system rather than the JVM. Spark can serialize data (excluding its structure) in binary form into off-heap memory and operate on that memory directly. Because Spark understands the schema, it knows how to interpret the bytes.
An RDD is a distributed collection of Java objects; a DataFrame is a distributed collection of Row objects. Besides offering richer operators than RDD, DataFrame's more important traits are higher execution efficiency, reduced data reads, and optimized execution plans.
With the higher-level DataFrame abstraction, processing data becomes much simpler; it can even be done in SQL, a big usability gain for developers.
Moreover, data processed through the DataFrame API or SQL automatically passes through Spark's optimizer (Catalyst), so even an inefficient program or SQL statement can run fast.
2.3. Pros and cons of DataFrame and RDD
Pros and cons of RDD:
Pros:
(1) Compile-time type safety
Type errors are caught at compile time.
(2) Object-oriented programming style
Data is manipulated by calling methods directly on objects.
Cons:
(1) Serialization and deserialization overhead
Both inter-node communication and IO operations must serialize and deserialize the object's structure as well as its data.
(2) GC overhead
Frequent creation and destruction of objects inevitably increases garbage collection.
DataFrame fixes these drawbacks of RDD by introducing schema and off-heap memory (memory outside the JVM heap, using the operating system's memory). With a schema, Spark can understand the data, so communication and IO only need to serialize and deserialize the data itself, omitting the structure; with off-heap storage, data can be manipulated quickly while avoiding heavy GC. But it gives up RDD's advantages: DataFrame is not type-safe, and its API is not object-oriented in style.
2.4. Creating DataFrames from data sources
2.4.1 Creating a DataFrame from a text file
Before Spark 2.0, SQLContext was the entry point in Spark SQL for creating DataFrames and executing SQL, while HiveContext (a subclass of SQLContext) supported Hive-compatible operations on Hive table data via Hive SQL statements. Since Spark 2.0 these are unified in SparkSession, which encapsulates SparkContext and SQLContext; both objects can be obtained from a SparkSession.

(1) Create a file locally with three columns — id, name, and age — separated by spaces, then upload it to HDFS. The content of person.txt:
1 zhangsan 20
2 lisi 29
3 wangwu 25
4 zhaoliu 30
5 tianqi 35
6 kobe 40
Upload the data file to HDFS:
hdfs dfs -put person.txt /
(2) Run the following in the spark-shell to read the data and split each line on the column delimiter.
First start the shell: spark-shell --master local[2]
val lineRDD = sc.textFile("/person.txt").map(_.split(" "))

(3) Define a case class (it acts as the table's schema)
case class Person(id:Int, name:String, age:Int)

(4) Associate the RDD with the case class
val personRDD = lineRDD.map(x => Person(x(0).toInt, x(1), x(2).toInt))

(5) Convert the RDD to a DataFrame
val personDF = personRDD.toDF

(6) Work with the DataFrame
personDF.show

personDF.printSchema

(7) Build a DataFrame via SparkSession
Use the SparkSession object spark, already initialized in the spark-shell, to create a DataFrame:
val dataFrame = spark.read.text("/person.txt")

2.4.2 Creating a DataFrame from a JSON file
(1) Data file
Use the people.json file shipped with the Spark installation:
/opt/bigdata/spark/examples/src/main/resources/people.json
(2) Run the following in the spark-shell to read the data.
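For example, something like the following should work (peopleDF is an illustrative name):
val peopleDF = spark.read.json("/opt/bigdata/spark/examples/src/main/resources/people.json")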

(3) You can then operate on the DataFrame with its methods.
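For instance (a sketch, reusing the peopleDF value read above):
peopleDF.show()
peopleDF.printSchema()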


2.4.3 Creating a DataFrame from a Parquet columnar-format file
(1) Data file
Use the users.parquet file shipped with the Spark installation:
/opt/bigdata/spark/examples/src/main/resources/users.parquet
(2) Run the following in the spark-shell to read the data.
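For example, something like the following should work (usersDF is an illustrative name):
val usersDF = spark.read.parquet("/opt/bigdata/spark/examples/src/main/resources/users.parquet")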

(3) You can then operate on the DataFrame with its methods.
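For instance (a sketch, reusing the usersDF value read above):
usersDF.show()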


3. Common DataFrame operations
3.1. DSL-style syntax
DataFrame provides a domain-specific language (DSL) for conveniently manipulating structured data. Some usage examples follow.
(1) View the contents of the DataFrame by calling the show method
personDF.show

(2) View selected columns of the DataFrame
View the data of the name column:
personDF.select(personDF.col("name")).show

Another way to view the name column:
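For example, the column name can also be passed as a plain string:
personDF.select("name").show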

View the name and age columns:
personDF.select(col("name"), col("age")).show

(3) Print the DataFrame's schema
personDF.printSchema

(4) Query everyone's id, name, and age, adding 1 to age
personDF.select(col("id"), col("name"), col("age") + 1).show

The same query can also be written as:
personDF.select(personDF("id"), personDF("name"), personDF("age") + 1).show

(5) Filter rows where age is greater than or equal to 25, using the filter method
personDF.filter(col("age") >= 25).show

(6) Count the people older than 30
personDF.filter(col("age")>30).count()

(7) Group by age and count the number of people of each age
personDF.groupBy("age").count().show

3.2. SQL-style syntax
One powerful aspect of DataFrame is that it can be treated as a relational table and queried by calling spark.sql() in a program; the result comes back as another DataFrame.
To use SQL-style syntax, the DataFrame must first be registered as a table:
personDF.registerTempTable("t_person")
(Since Spark 2.0, registerTempTable is deprecated in favor of createOrReplaceTempView, which the standalone programs later in this article use.)
(1) Query the two oldest people
spark.sql("select * from t_person order by age desc limit 2").show

(2) Show the table's schema
spark.sql("desc t_person").show

(3) Query the information of people older than 30
spark.sql("select * from t_person where age > 30").show

4. DataSet
4.1. What is a DataSet
A DataSet is a distributed collection of data and a new interface added in Spark 1.6. It combines the advantages of RDDs (strong typing and the ability to use powerful lambda functions) with Spark SQL's optimized execution engine. A DataSet can be constructed from JVM objects and manipulated with functional transformations (map/flatMap/filter).
4.2. Differences among DataFrame, DataSet, and RDD
Suppose two rows of data in an RDD look like this:

Then the same data in a DataFrame looks like this:

And in a Dataset it looks like this:

Or like this (each row of data is an object):
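As a rough sketch using the person data from earlier: in an RDD[Person], each row is an opaque Person object such as Person(1, "zhangsan", 20) whose internals Spark cannot see; in a DataFrame, each row is an untyped Row laid out in columns id/name/age whose names and types Spark knows; in a Dataset[Person], each row is again a strongly typed Person object, but the schema is retained as well, combining compile-time types with Catalyst optimization.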

DataSet subsumes the functionality of DataFrame; the two were unified in Spark 2.0, with DataFrame represented as DataSet[Row], i.e., a special case of DataSet.
(1) A DataSet can be type-checked at compile time,
(2) and it offers an object-oriented programming interface.
Compared with DataFrame, Dataset adds compile-time type checking. For a distributed program, submitting a job is laborious (compile, package, upload, run); discovering an error only once the job reaches the cluster wastes a great deal of time. This is an important reason Dataset was introduced.
4.3. Converting between DataFrame and DataSet
DataFrame and DataSet can be converted into each other.
(1) DataFrame to DataSet
df.as[ElementType] converts a DataFrame into a DataSet.
(2) DataSet to DataFrame
ds.toDF() converts a DataSet into a DataFrame.
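A minimal round trip with the Person case class from earlier (a sketch, assuming import spark.implicits._ is in scope):
val personDS = personDF.as[Person] // DataFrame -> DataSet[Person]
val personDF2 = personDS.toDF() // DataSet -> DataFrame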
4.4. Creating a DataSet
(1) Via spark.createDataset
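For example (a sketch in the spark-shell, where the implicits are already imported):
val numDS = spark.createDataset(1 to 10)
numDS.show()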

(2) Via the toDS method
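For example (a sketch, assuming import spark.implicits._ and the Person case class from earlier):
val personDS = Seq(Person(1, "zhangsan", 20)).toDS()
personDS.show()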

(3) By converting a DataFrame
Use as[] to convert it into a DataSet:
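For example (a sketch, reusing the personDF built earlier):
val personDS2 = personDF.as[Person]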

III. Executing Spark SQL queries programmatically
1. Writing a Spark SQL program that converts an RDD to a DataFrame
Earlier we learned how to run SQL queries in the Spark shell; now we write Spark SQL query programs inside our own applications.
Spark SQL provides two ways to convert between an RDD and a DataFrame. The first uses the reflection mechanism: given an RDD containing objects of a known type, reflection derives the schema and converts the RDD into a typed DataFrame; this suits cases where the RDD's schema is known in advance.
The second obtains the schema through a programmatic interface and creates the DataFrame dynamically, deciding the columns and their types at runtime.
First add the Spark SQL dependency to the Maven project's pom.xml:
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-sql_2.11</artifactId>
    <version>2.0.2</version>
</dependency>
1.1. Inferring the schema via reflection
Scala can convert an RDD of case class instances into a DataFrame: the schema is created from the case class, whose parameter names are read via reflection and become the table's column names. Such an RDD converts efficiently into a DataFrame that can be registered as a table.
The code is as follows:
package cn.itcast.sql

import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, SparkSession}

/**
 * Converting an RDD to a DataFrame via the reflection mechanism
 */
//todo: define a case class Person (it supplies the schema)
case class Person(id: Int, name: String, age: Int) extends Serializable

object InferringSchema {
  def main(args: Array[String]): Unit = {
    //todo: 1. build a SparkSession, specifying the appName and the master address
    val spark: SparkSession = SparkSession.builder()
      .appName("InferringSchema")
      .master("local[2]").getOrCreate()
    //todo: 2. obtain the SparkContext from the SparkSession
    val sc: SparkContext = spark.sparkContext
    sc.setLogLevel("WARN") // set the log level
    //todo: 3. load the data
    val dataRDD: RDD[String] = sc.textFile("D:\\person.txt")
    //todo: 4. split each line
    val lineArrayRDD: RDD[Array[String]] = dataRDD.map(_.split(" "))
    //todo: 5. associate the RDD with the Person class
    val personRDD: RDD[Person] = lineArrayRDD.map(x => Person(x(0).toInt, x(1), x(2).toInt))
    //todo: 6. create the DataFrame; the implicit conversions must be imported
    import spark.implicits._
    val personDF: DataFrame = personRDD.toDF()
    //todo ------------------- DSL-style operations: start --------------
    // 1. show the DataFrame's data (20 rows by default)
    personDF.show()
    // 2. print the DataFrame's schema
    personDF.printSchema()
    // 3. print the number of records
    println(personDF.count())
    // 4. print all column names of the DataFrame
    personDF.columns.foreach(println)
    // 5. fetch the first row of the DataFrame
    println(personDF.head())
    // 6. show all values of the name column
    personDF.select("name").show()
    // 7. filter out the records with age greater than 30
    personDF.filter($"age" > 30).show()
    // 8. count the people older than 30
    println(personDF.filter($"age" > 30).count())
    // 9. group by age and count the people in each group
    personDF.groupBy("age").count().show()
    //todo ------------------- DSL-style operations: end -------------
    //todo -------------------- SQL-style operations: start -----------
    //todo: register the DataFrame as a table
    personDF.createOrReplaceTempView("t_person")
    //todo: pass in SQL statements to operate on the table
    spark.sql("select * from t_person").show()
    spark.sql("select * from t_person where name = 'zhangsan'").show()
    spark.sql("select * from t_person order by age desc").show()
    //todo -------------------- SQL-style operations: end -------------
    sc.stop()
  }
}
1.2. Specifying the schema directly with StructType
When a case class cannot be defined ahead of time, a DataFrame can be created in code in three steps:
(1) Convert the RDD into an RDD of Row objects.
(2) Create a schema of type StructType that matches the RDD built in step 1.
(3) Apply the schema to that RDD via SparkSession's createDataFrame method to create the DataFrame.
package cn.itcast.sql

import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}
import org.apache.spark.sql.{DataFrame, Row, SparkSession}

/**
 * Converting an RDD to a DataFrame by specifying the schema explicitly
 */
object SparkSqlSchema {
  def main(args: Array[String]): Unit = {
    //todo: 1. create a SparkSession, specifying the appName and master
    val spark: SparkSession = SparkSession.builder()
      .appName("SparkSqlSchema")
      .master("local[2]")
      .getOrCreate()
    //todo: 2. obtain the SparkContext
    val sc: SparkContext = spark.sparkContext
    //todo: 3. load the data
    val dataRDD: RDD[String] = sc.textFile("d:\\person.txt")
    //todo: 4. split each line
    val dataArrayRDD: RDD[Array[String]] = dataRDD.map(_.split(" "))
    //todo: 5. load the data into Row objects
    val personRDD: RDD[Row] = dataArrayRDD.map(x => Row(x(0).toInt, x(1), x(2).toInt))
    //todo: 6. create the schema
    val schema: StructType = StructType(Seq(
      StructField("id", IntegerType, false),
      StructField("name", StringType, false),
      StructField("age", IntegerType, false)
    ))
    //todo: 7. create the DataFrame from personRDD and the schema
    val personDF: DataFrame = spark.createDataFrame(personRDD, schema)
    //todo: 8. show the DataFrame's data via the DSL
    personDF.show()
    //todo: 9. register the DataFrame as a table
    personDF.createOrReplaceTempView("t_person")
    //todo: 10. run SQL statements
    spark.sql("select * from t_person").show()
    spark.sql("select count(*) from t_person").show()
    sc.stop()
  }
}
2. Writing a Spark SQL program against Hive (HiveContext)
HiveContext corresponds to the spark-hive project. It is partially coupled with Hive and supports HQL; it is a subclass of SQLContext, and therefore compatible with SQLContext.
2.1. Adding the pom dependency
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-hive_2.11</artifactId>
    <version>2.0.2</version>
</dependency>
2.2. Code
package itcast.sql

import org.apache.spark.sql.SparkSession

/**
 * todo: Hive-compatible SQL operations
 */
object HiveSupport {
  def main(args: Array[String]): Unit = {
    val warehouseLocation = "D:\\workSpace_IDEA_NEW\\day2017-10-12\\spark-warehouse"
    //todo: 1. create a SparkSession
    val spark: SparkSession = SparkSession.builder()
      .appName("HiveSupport")
      .master("local[2]")
      .config("spark.sql.warehouse.dir", warehouseLocation)
      .enableHiveSupport() // enable Hive support
      .getOrCreate()
    spark.sparkContext.setLogLevel("WARN") // set the log level
    import spark.implicits._
    import spark.sql
    //todo: 2. run SQL statements
    sql("CREATE TABLE IF NOT EXISTS person (id int, name string, age int) " +
      "row format delimited fields terminated by ' '")
    sql("LOAD DATA LOCAL INPATH '/person.txt' INTO TABLE person")
    sql("select * from person").show()
    spark.stop()
  }
}
IV. Data sources
1. JDBC
Spark SQL can create a DataFrame by reading data from a relational database over JDBC; after a series of computations on the DataFrame, the data can also be written back to the relational database.
1.1. Loading data from MySQL with Spark SQL
1.1.1 Writing the Spark SQL code in IDEA
package itcast.sql

import java.util.Properties
import org.apache.spark.sql.{DataFrame, SparkSession}

/**
 * todo: loading data from MySQL with Spark SQL
 */
object DataFromMysql {
  def main(args: Array[String]): Unit = {
    //todo: 1. create a SparkSession
    val spark: SparkSession = SparkSession.builder()
      .appName("DataFromMysql")
      .master("local[2]")
      .getOrCreate()
    //todo: 2. create a Properties object holding the MySQL user name and password
    val properties: Properties = new Properties()
    properties.setProperty("user", "root")
    properties.setProperty("password", "123456")
    //todo: 3. read the data from MySQL
    val mysqlDF: DataFrame = spark.read.jdbc("jdbc:mysql://192.168.200.150:3306/spark", "iplocaltion", properties)
    //todo: 4. show the data of the MySQL table
    mysqlDF.show()
    spark.stop()
  }
}
Run the program and check the result:

1.1.2 Running in the spark-shell
(1) Start the spark-shell (the MySQL connector JAR must be specified):
spark-shell \
--master spark://hdp-node-01:7077 \
--executor-memory 1g \
--total-executor-cores 2 \
--jars /opt/bigdata/hive/lib/mysql-connector-java-5.1.35.jar \
--driver-class-path /opt/bigdata/hive/lib/mysql-connector-java-5.1.35.jar
(2) Load the data from MySQL:
val mysqlDF = spark.read.format("jdbc").options(Map(
  "url" -> "jdbc:mysql://192.168.200.100:3306/spark",
  "driver" -> "com.mysql.jdbc.Driver",
  "dbtable" -> "iplocaltion",
  "user" -> "root",
  "password" -> "123456")).load()
£¨3£©¡¢Ö´Ðвéѯ

1.2. Writing data to MySQL with Spark SQL
1.2.1 Writing the Spark SQL code in IDEA
(1) Write the code
package itcast.sql

import java.util.Properties
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession}

/**
 * todo: writing data to MySQL with Spark SQL
 */
object SparkSqlToMysql {
  def main(args: Array[String]): Unit = {
    //todo: 1. create a SparkSession
    val spark: SparkSession = SparkSession.builder()
      .appName("SparkSqlToMysql")
      .getOrCreate()
    //todo: 2. read the data
    val data: RDD[String] = spark.sparkContext.textFile(args(0))
    //todo: 3. split each line
    val arrRDD: RDD[Array[String]] = data.map(_.split(" "))
    //todo: 4. associate the RDD with the Student case class
    val studentRDD: RDD[Student] = arrRDD.map(x => Student(x(0).toInt, x(1), x(2).toInt))
    //todo: import the implicit conversions
    import spark.implicits._
    //todo: 5. convert the RDD to a DataFrame
    val studentDF: DataFrame = studentRDD.toDF()
    //todo: 6. register the DataFrame as a table
    studentDF.createOrReplaceTempView("student")
    //todo: 7. query the student table, sorted by age in descending order
    val resultDF: DataFrame = spark.sql("select * from student order by age desc")
    //todo: 8. save the result into a MySQL table
    //todo: create a Properties object holding the MySQL user name and password
    val prop = new Properties()
    prop.setProperty("user", "root")
    prop.setProperty("password", "123456")
    resultDF.write.jdbc("jdbc:mysql://192.168.200.150:3306/spark", "student", prop)
    //todo: when writing to MySQL, a save mode can be set: overwrite, append, ignore,
    // or error (the default, which fails if the table already exists)
    //resultDF.write.mode(SaveMode.Overwrite).jdbc("jdbc:mysql://192.168.200.150:3306/spark", "student", prop)
    spark.stop()
  }
}

//todo: define the Student case class
case class Student(id: Int, name: String, age: Int)
(2) Package the program with Maven
Packaging through IDEA works fine.
(3) Submit the JAR to the Spark cluster
spark-submit \
--class itcast.sql.SparkSqlToMysql \
--master spark://hdp-node-01:7077 \
--executor-memory 1g \
--total-executor-cores 2 \
--jars /opt/bigdata/hive/lib/mysql-connector-java-5.1.35.jar \
--driver-class-path /opt/bigdata/hive/lib/mysql-connector-java-5.1.35.jar \
/root/original-spark-2.0.2.jar /person.txt

(4) Check the data in the MySQL table
