The introduction of the DataFrame API sheds the aloof functional-programming posture of the RDD API and makes Spark far more approachable. The external data source API, in turn, embodies inclusiveness: Spark SQL's unified structured data processing capability is being progressively unlocked.
¹ØÓÚ×÷ÕߣºÁ¬³Ç£¬Databricks¹¤³Ìʦ£¬Spark committer£¬Spark SQLÖ÷Òª¿ª·¢ÕßÖ®Ò»¡£ÔÚ4ÔÂ18ÈÕÕÙ¿ªµÄ 2015 Spark¼¼Êõ·å»á ÉÏ£¬Á¬³Ç½«×öÃûΪ¡°ËÄÁ½²¦Ç§½ï¡ª¡ªSpark SQL½á¹¹»¯Êý¾Ý·ÖÎö¡±µÄÖ÷ÌâÑݽ²¡£
Since its debut in March 2013, Spark SQL has grown into the largest Spark component outside of Spark Core. Besides taking over the baton from Shark and continuing to offer Spark users a high-performance SQL-on-Hadoop solution, it also brings Spark a general-purpose, efficient, unified capability for structured data processing. The freshly released 1.3.0 version shows off two major upgrades to Spark SQL in full.
DataFrame
In terms of ease of use, it is no exaggeration to say that Spark's RDD API is an order-of-magnitude leap over the traditional MapReduce API. Still, for newcomers without MapReduce or functional programming experience, the RDD API presents a real barrier to entry. Meanwhile, the traditional data frameworks familiar to data scientists, such as R and Pandas, offer intuitive APIs but are confined to a single machine and cannot handle big data workloads. To resolve this tension, Spark SQL 1.3.0 builds on the existing SchemaRDD to provide a DataFrame API similar in style to R and Pandas. The new DataFrame API not only substantially lowers the learning curve for ordinary developers, but also supports all three of Scala, Java, and Python. More importantly, having evolved from SchemaRDD, DataFrames are naturally suited to distributed big data scenarios.
What Is a DataFrame?
ÔÚSparkÖУ¬DataFrameÊÇÒ»ÖÖÒÔRDDΪ»ù´¡µÄ·Ö²¼Ê½Êý¾Ý¼¯£¬ÀàËÆÓÚ´«Í³Êý¾Ý¿âÖеĶþά±í¸ñ¡£DataFrameÓëRDDµÄÖ÷񻂿±ðÔÚÓÚ£¬Ç°Õß´øÓÐschemaÔªÐÅÏ¢£¬¼´DataFrameËù±íʾµÄ¶þά±íÊý¾Ý¼¯µÄÿһÁж¼´øÓÐÃû³ÆºÍÀàÐÍ¡£ÕâʹµÃSpark SQLµÃÒÔ¶´²ì¸ü¶àµÄ½á¹¹ÐÅÏ¢£¬´Ó¶ø¶Ô²ØÓÚDataFrame±³ºóµÄÊý¾ÝÔ´ÒÔ¼°×÷ÓÃÓÚDataFrameÖ®Éϵı任½øÐÐÁËÕë¶ÔÐÔµÄÓÅ»¯£¬×îÖÕ´ïµ½´ó·ùÌáÉýÔËÐÐʱЧÂʵÄÄ¿±ê¡£·´¹ÛRDD£¬ÓÉÓÚÎÞ´ÓµÃÖªËù´æÊý¾ÝÔªËØµÄ¾ßÌåÄÚ²¿½á¹¹£¬Spark CoreÖ»ÄÜÔÚstage²ãÃæ½øÐмòµ¥¡¢Í¨ÓõÄÁ÷Ë®ÏßÓÅ»¯¡£

Creating DataFrames
ÔÚSpark SQLÖУ¬¿ª·¢Õß¿ÉÒԷdz£±ã½ÝµØ½«¸÷ÖÖÄÚ¡¢ÍⲿµÄµ¥»ú¡¢·Ö²¼Ê½Êý¾Ýת»»ÎªDataFrame¡£ÒÔÏÂPythonʾÀý´úÂë³ä·ÖÌåÏÖÁËSpark SQL 1.3.0ÖÐDataFrameÊý¾ÝÔ´µÄ·á¸»¶àÑùºÍ¼òµ¥Ò×Óãº
# Construct a DataFrame from the users table in Hive
users = sqlContext.table("users")

# Load a JSON file on S3
logs = sqlContext.load("s3n://path/to/data.json", "json")

# Load a Parquet file on HDFS
clicks = sqlContext.load("hdfs://path/to/data.parquet", "parquet")

# Access MySQL via JDBC
comments = sqlContext.jdbc("jdbc:mysql://localhost/comments", "user")

# Turn an ordinary RDD into a DataFrame
rdd = sparkContext.textFile("article.txt") \
    .flatMap(lambda line: line.split()) \
    .map(lambda word: (word, 1)) \
    .reduceByKey(lambda a, b: a + b)
wordCounts = sqlContext.createDataFrame(rdd, ["word", "count"])

# Turn a local data collection into a DataFrame
data = [("Alice", 21), ("Bob", 24)]
people = sqlContext.createDataFrame(data, ["name", "age"])

# Turn a Pandas DataFrame into a Spark DataFrame (Python API only)
sparkDF = sqlContext.createDataFrame(pandasDF)
As you can see, everything from Hive tables, to the various sources supported by the external data source API (JSON, Parquet, JDBC), to RDDs and even local collections can be conveniently and quickly loaded and converted into DataFrames. The same functionality is also available in Spark SQL's Scala and Java APIs.
Using DataFrames
Like R and Pandas, Spark DataFrames come with a full DSL for manipulating data. Semantically, this DSL is very close to SQL relational queries, which is one of the key reasons Spark SQL can support DataFrames so seamlessly. Here is a set of user data analysis examples:
# Create a DataFrame containing only "young" users
young = users.filter(users.age < 21)

# Pandas-style syntax also works
young = users[users.age < 21]

# Increment everybody's age by 1
young.select(young.name, young.age + 1)

# Count the number of young users by gender
young.groupBy("gender").count()

# Join all young users against another DataFrame named logs
young.join(logs, logs.userId == users.userId, "left_outer")
Besides the DSL, we can of course also process DataFrames with SQL, just as before:
young.registerTempTable("young")
sqlContext.sql("SELECT count(*) FROM young")
Finally, once the data analysis logic is written, we can save the final results or display them:
# Append to a Parquet file on HDFS
young.save(path="hdfs://path/to/data.parquet",
           source="parquet",
           mode="append")

# Overwrite a JSON file on S3
young.save(path="s3n://path/to/data.json",
           source="json",
           mode="overwrite")

# Save as a SQL table
young.saveAsTable(tableName="young", source="parquet", mode="overwrite")

# Convert to a Pandas DataFrame (Python API only)
pandasDF = young.toPandas()

# Print the contents in tabular form
young.show()
Behind the Scenes: Spark SQL's Query Optimizer and Code Generation
Just as RDD transformations merely construct an RDD DAG, DataFrame transformations are likewise lazy. Instead of computing results immediately, they assemble the transformations into a logical query plan analogous to an RDD DAG. As noted above, because a DataFrame carries schema metadata, Spark SQL's query optimizer can see into the fine-grained structure of both the data and the computation, and apply highly targeted optimizations. The optimized logical plan is then translated into a physical plan and finally realized as an RDD DAG.
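For example (a sketch reusing the users DataFrame from earlier; nothing below touches the cluster until the final action):

// Each transformation merely extends the logical query plan
val young  = users.filter(users("age") < 21)
val counts = young.groupBy("gender").count()

// Only an action triggers optimization, physical planning, and execution
counts.show()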

The benefits of this design show up in several ways:
1. Users can state their computation logic in a small amount of declarative code, while the physical execution path is chosen by Spark SQL itself. This lowers development cost on one hand and the barrier to entry on the other: in many cases, even if a newcomer writes a fairly inefficient query, Spark SQL can still optimize it effectively through strategies such as predicate pushdown and column pruning (see the sketch after this list). The RDD API offers nothing comparable.
2. Spark SQL can dynamically generate JVM bytecode for the expressions in a physical plan, enabling low-level optimizations such as avoiding virtual function call overhead and cutting down object allocations, so that final query execution performance rivals that of hand-written code.
3. For PySpark, programming with DataFrames only requires building a compact logical plan; physical execution is handled entirely on the JVM side, eliminating a great deal of unnecessary cross-process communication between the Python interpreter and the JVM. As the figure above shows, in a simple benchmark aggregating ten million integer pairs, the DataFrame API in PySpark outperformed the RDD API by nearly a factor of five. Moreover, PySpark will benefit for free from every future optimizer improvement Spark SQL makes on the Scala side.
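Points 1 and 2 can be made concrete with a small Scala sketch (paths are illustrative, and the exact plan output varies by version):

// Even a "naive" ordering -- load everything, project, then filter -- gets rewritten:
// the optimizer pushes the filter toward the Parquet source and prunes unused columns
val df = sqlContext.load("hdfs://path/to/data.parquet", "parquet")
val q  = df.select("name", "age").filter("age < 21")
q.explain(true)  // prints the analyzed, optimized, and physical plans

// Expression code generation is gated behind a flag in the 1.3 era (off by default)
sqlContext.setConf("spark.sql.codegen", "true")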
External Data Source API Enhancements

As we have already seen, Spark 1.3.0 provides DataFrames with a rich variety of data source support. The centerpiece is the external data source API introduced in Spark 1.2.0. In 1.3.0, this API has been further enhanced.
Data Write Support
In Spark 1.2.0, the external data source API could only read data from external sources into Spark; it could not write computation results back. Moreover, tables imported and registered through a data source could only be temporary tables, with no way to persist their metadata. Version 1.3.0 adds complete data write support, filling in the last major piece of the multi-source interoperability puzzle. The arbitrary conversions among Hive, Parquet, JSON, Pandas, and other data sources in the earlier examples are a direct result of this enhancement.
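For instance, data loaded from one source can now be written straight back out to another. A Scala sketch mirroring the earlier Python examples (paths are illustrative):

import org.apache.spark.sql.SaveMode

// Read JSON from S3 and write the result back out as Parquet on HDFS
val logs = sqlContext.load("s3n://path/to/data.json", "json")
logs.save("hdfs://path/to/data.parquet", "parquet", SaveMode.Append)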
From the perspective of a developer building a Spark SQL external data source, the data write support API mainly consists of:
1. Persisting data source table metadata
1.3.0 introduces new DDL syntax for external data sources (SQL snippet):
CREATE [TEMPORARY] TABLE [IF NOT EXISTS] <table-name>
    [(col-name data-type [, ...])]
    USING <source>
    [OPTIONS ...]
    [AS <select-query>]
With this, a SQL table registered from external data can be either a temporary table or persisted into the Hive metastore. An external data source that needs persistence support must implement CreatableRelationProvider in addition to the original RelationProvider.
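A hypothetical usage example of the DDL above (table name and path are made up; note that a non-TEMPORARY table requires a HiveContext, since its metadata is persisted into the Hive metastore):

hiveContext.sql(
  """CREATE TABLE logs
    |USING json
    |OPTIONS (path 'hdfs://path/to/data.json')
  """.stripMargin)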
2. InsertableRelation
The relation class of an external data source that supports writes must additionally extend the trait InsertableRelation and implement the data insertion logic in its insert method.
The JSON and Parquet data sources built into Spark 1.3.0 both implement the APIs above and can serve as reference examples for developing external data sources.
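For orientation, here is a heavily compressed sketch of the hooks such a source implements against the 1.3.0 org.apache.spark.sql.sources API. DemoRelation and its one-string-column, line-per-row text format are invented for illustration; a real source must also honor SaveMode semantics and handle options and types properly:

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, Row, SQLContext, SaveMode}
import org.apache.spark.sql.sources._
import org.apache.spark.sql.types._

class DefaultSource extends RelationProvider with CreatableRelationProvider {
  // Read path: backs sqlContext.load(...) and CREATE [TEMPORARY] TABLE ... USING ...
  override def createRelation(
      sqlContext: SQLContext,
      parameters: Map[String, String]): BaseRelation =
    DemoRelation(parameters("path"))(sqlContext)

  // Write path: backs df.save(...) and CREATE TABLE ... USING ... AS SELECT ...
  override def createRelation(
      sqlContext: SQLContext,
      mode: SaveMode,
      parameters: Map[String, String],
      data: DataFrame): BaseRelation = {
    val relation = DemoRelation(parameters("path"))(sqlContext)
    relation.insert(data, overwrite = mode == SaveMode.Overwrite)
    relation
  }
}

case class DemoRelation(path: String)(@transient val sqlContext: SQLContext)
  extends BaseRelation with TableScan with InsertableRelation {

  // A single nullable string column named "value"
  override val schema: StructType =
    StructType(StructField("value", StringType, nullable = true) :: Nil)

  // Scan: turn each text line into a Row
  override def buildScan(): RDD[Row] =
    sqlContext.sparkContext.textFile(path).map(Row(_))

  // Insert: called for INSERT [OVERWRITE] INTO and by the write path above
  // (for brevity this ignores `overwrite`, which a real source must respect)
  override def insert(data: DataFrame, overwrite: Boolean): Unit =
    data.rdd.map(_.getString(0)).saveAsTextFile(path)
}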
A Unified load/save API
ÔÚSpark 1.2.0ÖУ¬ÒªÏ뽫SchemaRDDÖеĽá¹û±£´æÏÂÀ´£¬±ã½ÝµÄÑ¡Ôñ²¢²»¶à¡£³£ÓõÄһЩ°üÀ¨£º
- rdd.saveAsParquetFile(...)
- rdd.saveAsTextFile(...)
- rdd.toJSON.saveAsTextFile(...)
- rdd.saveAsTable(...)
- ...
As you can see, different ways of writing data out required different APIs. Worse still, there was no flexible way to extend the system with new output formats.
To address this, 1.3.0 unifies the load/save API, letting users freely pick external data sources as needed. The API includes the following (a Scala sketch follows the list):
1. SQLContext.table
Loads a DataFrame from a SQL table.
2. SQLContext.load
Loads a DataFrame from a specified external data source.
3. SQLContext.createExternalTable
Saves the data at a specified location as an external SQL table, stores its metadata in the Hive metastore, and returns a DataFrame containing the corresponding data.
4. DataFrame.save
Writes a DataFrame to a specified external data source.
5. DataFrame.saveAsTable
Saves the DataFrame as a SQL table, storing the metadata in the Hive metastore while writing the data to the specified location.
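The same five calls in a hedged Scala sketch (paths and table names are illustrative; the metastore-backed calls assume a HiveContext named hiveContext):

import org.apache.spark.sql.SaveMode

// 1. Load a DataFrame from a SQL table
val users = sqlContext.table("users")

// 2. Load a DataFrame from an external data source
val events = sqlContext.load("hdfs://path/to/events.parquet", "parquet")

// 3. Register existing data as a persistent external table (metastore-backed,
//    hence the HiveContext) and get back a DataFrame over it
val clicks = hiveContext.createExternalTable("clicks", "hdfs://path/to/clicks", "parquet")

// 4. Write a DataFrame out to an external data source
events.save("hdfs://path/to/backup.parquet", "parquet", SaveMode.Overwrite)

// 5. Persist a DataFrame as a SQL table, writing both metadata and data
events.saveAsTable("events_table", "parquet", SaveMode.ErrorIfExists)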
Parquet Data Source Enhancements
Spark SQL has shipped with built-in support for Parquet, an efficient columnar storage format, from the very beginning. Since the external data source API was opened up, the original Parquet support has been gradually migrating onto it. In 1.3.0, the Parquet external data source gained significantly stronger capabilities, chiefly schema merging and automatic partition handling.
1. Schema merging
Like Protocol Buffers and Thrift, Parquet allows users to add new columns over time after a schema has been defined. As long as the metadata of existing columns is left untouched, old and new schemas remain compatible. This lets users add new data columns on demand without worrying about data migration.
2. Partition discovery
Storing the data of a single table partitioned across directories is a common layout used by systems such as Hive. The new Parquet data source automatically discovers and infers partition information from the directory structure.
3. Partition pruning
Partitioning effectively provides a coarse-grained index. When a query's predicates touch only some of the partitions, partition pruning skips the partition directories that need not be scanned, which can improve query performance dramatically.
The following Scala example demonstrates all of these Parquet data source capabilities in 1.3.0 together (Scala snippet):
// Create two simple DataFrames and save them into two separate partition directories
val df1 = (1 to 5).map(i => (i, i * 2)).toDF("single", "double")
df1.save("data/test_table/key=1", "parquet", SaveMode.Append)

val df2 = (6 to 10).map(i => (i, i * 2)).toDF("single", "double")
df2.save("data/test_table/key=2", "parquet", SaveMode.Append)

// Introduce a new column in a third DataFrame and save it into another partition directory
val df3 = (11 to 15).map(i => (i, i * 3)).toDF("single", "triple")
df3.save("data/test_table/key=3", "parquet", SaveMode.Append)

// Read the entire partitioned table in one go
val df4 = sqlContext.load("data/test_table", "parquet")

// Filter by partition and show the results
val df5 = df4.filter($"key" >= 2)
df5.show()
Running this code produces:
single double triple key
6      12     null   2
7      14     null   2
8      16     null   2
9      18     null   2
10     20     null   2
11     null   33     3
12     null   36     3
13     null   39     3
14     null   42     3
15     null   45     3
As shown, the Parquet data source automatically discovered the partition column key from the file paths and correctly merged two different but compatible schemas. Note also that the predicate in the final query skips the key=1 partition. Spark SQL's query optimizer prunes that partition directory based on the predicate and never scans the data inside it, improving query performance.
С½á
The introduction of the DataFrame API sheds the aloof functional-programming posture of the RDD API, makes Spark far more approachable, and brings the development experience of big data analysis ever closer to that of traditional single-machine analysis. The external data source API, for its part, embodies inclusiveness: beyond the built-in JSON, Parquet, and JDBC sources, the community has already produced CSV, Avro, HBase, and other data sources, and Spark SQL's unified structured data processing capability is being progressively unlocked.
Ϊ¿ª·¢ÕßÌṩ¸ü¶àµÄÀ©Õ¹µã£¬ÊÇSpark¹á´©Õû¸ö2015ÄêµÄÖ÷ÌâÖ®Ò»¡£ÎÒÃÇÏ£Íûͨ¹ýÕâЩÀ©Õ¹API£¬ÇÐʵµØÒý±¬ÉçÇøµÄÄÜÁ¿£¬ÁîSparkµÄÉú̬¸ü¼Ó·áÂúºÍ¶àÑù¡£