±à¼ÍƼö: |
±¾ÎÄÀ´×Ôcsdn£¬±¾Îļòµ¥½éÉÜÁËPython+sparkµÄÅäÖÃÔËÐм°ÊµÀý½éÉÜ£¬Ï£Íû¶ÔÄúµÄѧϰÓÐËùÆôµÏ¡£ |
|
0¡¢Ç°Ìá
0.1 ÅäÖÃ
¿É²Î¿¼£º
windowsÉÏÅäÖà Python+spark¿ª·¢»·¾³
0.2 ÓйØspark
˵Ã÷£º
spark ²»¼æÈÝ Python3.6
°²×°×¢Òâ°æ±¾
¿ÉÏÂÔØ£º
anaconda4.2
Ò»¡¢ÊµÀý·ÖÎö
1.1 Êý¾Ý student.txt

1.2 ´úÂë
#studentExample
Àý×Ó Á·Ï°
def map_func(x):
s = x.split()
return (s[0], [int(s[1]),int(s[2]),int(s[3])])
#·µ»ØÎª£¨key,vaklue£©¸ñʽ£¬ÆäÖÐkey:x[0],value:x[1]ÇÒΪÓÐÈý¸öÔªËØµÄÁбí
#return (s[0],[int(s[1],s[2],s[3])]) #×¢Òâ´ËÓ÷¨²»ºÏ·¨
def has100(x):
for y in x:
if(y == 100): #°Ñx¡¢yÀí½âΪ xÖá¡¢yÖá
return True
return False
def allis0(x):
if(type(x)==list and sum(x) == 0): #ÀàÐÍΪlistÇÒ×Ü·ÖΪ0
ÕßΪtrue£»ÆäÖÐtype(x)==list :ÅжÏÀàÐÍÊÇ·ñÏàͬ
return True
return False
def subMax(x,y):
m = [x[1][i] if(x[1][i] > y[1][i]) else y[1][i]
for i in range(3)]
return('Maximum subject score', m)
def sumSub(x,y):
n = [x[1][i]+y[1][i] for i in range(3)]
#»òÕß n = ([x[1][0]+y[1][0],x[1][1]+y[1][0],x[1][2]+y[1][2]])
return('Total subject score', n)
def sumPer(x):
return (x[0],sum(x[1]))#ֹ֮ͣǰµÄSparkContext£¬²»È»ÖØÐÂÔËÐлòÕß´´½¨¹¤×÷»áʧ°Ü£»ÁíÍ⣬ֻÓÐ
sc.stop()Ò²¿ÉÒÔ£¬µ«ÊÇÊ×´ÎÔËÐлáÓÐÎó
try:
sc.stop()
except:
pass
from pyspark import SparkContext #µ¼ÈëÄ£¿é
sc=SparkContext(appName='Student') #̟̞
lines=sc.textFile("student.txt").map(lambda
x:map_func(x)).cache() #µ¼ÈëÊý¾ÝÇÒ±£³ÖÔÚÄÚ´æÖУ¬ÆäÖÐcache()£ºÊý¾Ý±£³ÖÔÚÄÚ´æÖÐ
count=lines.count() #¶ÔRDDÖеÄÊý¾Ý¸öÊý½øÐмÆÊý£»ÆäÖУ¬RDDÒ»ÐÐΪһ¸öÊý¾Ý¼¯#RDD'ת»»'ÔËËã
£¨É¸Ñ¡ ¹Ø¼ü×Öfilter£©
whohas100 = lines.filter(lambda x: has100(x[1])).collect()
#×¢Ò⣺´¦ÀíµÄÊÇvalueÁÐ±í£¬Ò²¾ÍÊÇx[1]
whois0 = lines.filter(lambda x: allis0(x[1])).collect()
sumScore = lines.map(lambda x: (x[0],sum(x[1]))).collect()
#¡®¶¯×÷¡¯ÔËËã
maxScore = max(sumScore,key=lambda x: x[1]) #×Ü·Ö×î¸ßÕß
minScore = min(sumScore,key=lambda x: x[1]) #×Ü·Ö×îµÍÕß
sumSubScore = lines.reduce(lambda x,y: sumSub(x,y))
avgScore = [x/count for x in sumSubScore[1]]#µ¥¿Æ³É¼¨Æ½¾ùÖµ
#RDD key-value¡®×ª»»¡¯ÔËËã
subM = lines.reduce(lambda x,y: subMax(x,y))
redByK = lines.reduceByKey(lambda x,y: [x[i]+y[i]
for i in range(3)]).collect() #ºÏ²¢keyÏàͬµÄvalueÖµx[0]+y[0],x[1]+y[1],x[2]+y[2]
#RDD'ת»»'ÔËËã
sumPerSore = lines.map(lambda x: sumPer(x)).collect()
#ÿ¸öÈ˵Ä×Ü·Ö #sumSore = lines.map(lambda x: (x[0],sum(x[1]))).collect()
sorted = lines.sortBy(lambda x: sum(x[1])) #×ܳɼ¨µÍµ½¸ßµÄѧÉú³É¼¨ÅÅÐò
sortedWithRank = sorted.zipWithIndex().collect()#°´×Ü·ÖÅÅÐò
first3 = sorted.takeOrdered(3,key=lambda x:-sum(x[1]))
#×Ü·ÖǰÈýÕß#ÏÞ¶¨ÒÔ¿Õ¸ñµÄÐÎʽÊä³öµ½ÎļþÖÐ
first3RDD = sc.parallelize(first3)\
.map(lambda x:str(x[0])+' '+str(x[1][0])+' '+str(x[1][1])+'
'+str(x[1][2])).saveAsTextFile("result") |
#print(lines.collect())
print("Êý¾Ý¼¯¸öÊý£¨ÐУ©:",count)
print("µ¥¿ÆÂú·ÖÕߣº",whohas100)
print("µ¥¿ÆÁã·ÖÕß:",whois0)
print("µ¥¿Æ×î¸ß·ÖÕߣº",subM)
print("µ¥¿Æ×Ü·Ö£º",sumSubScore)
print("ºÏ²¢Ãû×ÖÏàͬµÄ·ÖÊý£º",redByK)
print("×Ü·Ö/£¨ÈË£©",sumPerSore)
print("×î¸ß×Ü·ÖÕߣº",maxScore)
print("×îµÍ×Ü·ÖÕߣº",minScore)
print("ÿ¿ÆÆ½¾ù³É¼¨£º",avgScore)
print("×Ü·Öµ¹Ðò£º",sortedWithRank)
print("×Ü·ÖǰÈýÕߣº",first3)
print(first3RDD)
sc.stop() |
1.3 ½á¹ûչʾ
Êý¾Ý¼¯¸öÊý£¨ÐУ©: 7
µ¥¿ÆÂú·ÖÕߣº [('li', [100, 54, 0]), ('li', [100, 54,
0])]
µ¥¿ÆÁã·ÖÕß: [('yanf', [0, 0, 0])]
µ¥¿Æ×î¸ß·ÖÕߣº ('Maximum subject score', [100, 90, 100])
µ¥¿Æ×Ü·Ö£º ('Total subject score', [485, 438, 280])
ºÏ²¢Ãû×ÖÏàͬµÄ·ÖÊý£º [('li', [200, 108, 0]), ('zhang', [180,
180, 200]), ('yang', [85, 90, 30]), ('wang', [20,
60, 50]), ('yanf', [0, 0, 0])]
×Ü·Ö/£¨ÈË£© [('yang', 205), ('wang', 130), ('zhang',
280), ('zhang', 280), ('li', 154), ('li', 154),
('yanf', 0)]
×î¸ß×Ü·ÖÕߣº ('zhang', 280)
×îµÍ×Ü·ÖÕߣº ('yanf', 0)
ÿ¿ÆÆ½¾ù³É¼¨£º [69.28571428571429, 62.57142857142857,
40.0]
×Ü·Öµ¹Ðò£º [(('yanf', [0, 0, 0]), 0), (('wang', [20,
60, 50]), 1), (('li', [100, 54, 0]), 2), (('li',
[100, 54, 0]), 3), (('yang', [85, 90, 30]), 4),
(('zhang', [90, 90, 100]), 5), (('zhang', [90,
90, 100]), 6)]
×Ü·ÖǰÈýÕߣº [('zhang', [90, 90, 100]), ('zhang', [90,
90, 100]), ('yang', [85, 90, 30])]
None |
¶þ¡¢´úÂë½âÎö
2.1º¯Êý½âÎö
2.1.1 collect()
RDDµÄÌØÐÔ

ÔÚ½øÐлù±¾RDD¡°×ª»»¡±ÔËËãʱ²»»áÁ¢¼´Ö´ÐУ¬½á¹û²»»áÏÔʾÔÚÏÔʾÆÁÖУ¬collect£¨£©ÊÇÒ»¸ö¡°¶¯×÷¡±ÔËË㣬»áÁ¢¿ÌÖ´ÐУ¬ÏÔʾ½á¹û¡£
2.1.2 reduce()
˵Ã÷
reduce()º¯Êý»á¶Ô²ÎÊýÐòÁÐÖеÄÔªËØ½øÐÐÀÛ»ý¡£
Óï·¨
reduce(function, iterable[, initializer])
²ÎÊý
function ¨C º¯Êý£¬ÓÐÁ½¸ö²ÎÊý
iterable ¨C ¿Éµü´ú¶ÔÏó
initializer ¨C ¿ÉÑ¡£¬³õʼ²ÎÊý
ʵÀý
˵Ã÷£ºPython3µÄÄÚ½¨º¯ÊýÒÆ³ýÁËreduceº¯Êý£¬reduceº¯Êý·ÅÔÚfunctoolsÄ£¿é
In [24]:
#r = reduce(lambda x, y: x+y, [4,4,5,5]) # ʹÓÃ
lambda ÄäÃûº¯Êý
from functools import reduce
def add(x, y) : # Á½ÊýÏà¼Ó
return x + y
reduce(add, [1,2,3,4,5])
Out[24]:
15
In [25]:
reduce(lambda x, y: x+y, [1,2,3,4,5]) # ʹÓà lambda
ÄäÃûº¯Êý
Out[25]:
15 |
2.1.3 type()
Óï·¨
class type(name, bases, dict)
²ÎÊý
name ¨C ÀàµÄÃû³Æ¡£
bases ¨C »ùÀàµÄÔª×é¡£
dict ¨C ×ֵ䣬ÀàÄÚ¶¨ÒåµÄÃüÃû¿Õ¼ä±äÁ¿¡£
·µ»ØÖµ
Ò»¸ö²ÎÊý·µ»Ø¶ÔÏóÀàÐÍ, Èý¸ö²ÎÊý£¬·µ»ØÐµÄÀàÐͶÔÏó¡£
ʵÀý
# Ò»¸ö²ÎÊýʵÀý
In [1]:
type(1)
Out[1]:
int
In [2]:
type([2])
Out[2]:
list
In [3]:
type({3:'three'})
Out[3]:
dict
In [5]:
x = 5
type(x) == list #ÅжÏxµÄÀàÐÍÊÇ·ñΪlist
Out[5]:
False |
# Èý¸ö²ÎÊýʵÀý
class y(object):
z = 5
?
x = type('y',(object,),dict(z=5))
?
print(x) <class '__main__.y'> #²úÉúÒ»¸öеÄÀàÐÍ |
Èý¡¢ÎÊÌâ·ÖÎö
An error occurred
while calling z:org.apache.spark.api.python.PythonRDD.collectAndServe.
: org.apache.spark.SparkException: Job aborted
due to stage failure: Task 1 in stage 2.0 failed
1 times, most recent failure: Lost task 1.0 in
stage 2.0 (TID 5, localhost, executor driver):
org.apache.spark.api.python.PythonException: Traceback
(most recent call last): |
½âÎö
1¡¢¼ì²éƴдÊÇ·ñÓÐÎó
2¡¢¼ì²éËõ½øÊÇ·ñºÏ¹æ
3¡¢¼ì²é£¨£©ÊÇ·ñÒ»Ò»Åä¶Ô
ËÄ¡¢ÊµÀý СÁ·
4.1 Êý¾Ý user_small
1441900799.728000
1441900802.452000 8618245698655 0134730038 729312 2 1 1IPHONE_5 17999 20693 10.67.23.157
111.13.34.100 6 58986 80 GET mmsns.qpic.cn/mmsns/PdibpV1sFDHdaOTqNXb8VGS NicyYpOVa9R7icxSr4BkwbsSyzJbBTmE5Zz5aZichejbkKuia7twzraqk /150?tp=webp&length=1136 &width=640 weixin.qq.com/?version=369229843&uin=2925174340 &nettype=0&scene=moment WeChat/6.2.0.19
CFNetwork /711.3.18 Darwin/14.0.0 200 59 image/webp 7504
706 8212 7 1827
1441900750.023000 1441900754.063000 8613836044032
0136210021269713 2 1 1 IPHONE_5 17752 25632
10.67.21.71 117. 144.242.26 6 52941 80 POST short.weixin.qq.com http://short.weixin.qq.com/cgi-bin /micromsg-bin /tenpay - MicroMessenger Client - - - - 715
0 7 1827
1441900755.480472 1441900756.762000 8618246899077
0131830068670612 2 1 1 IPHONE_4S 17875 61433 10.67.43.51 120.192.84.86
6 58684 31271 GETi.gtimg.cn http://i.gtimg.cn/qqshow/admindata/comdata/vip_emoji_aio_ ios_new_ config/ xydata.json - QQ/5.7.0.469 CFNetwork/672.0.8
Darwin/14.0.0 304 83 x-json - 0 0 18 1041
1441900754.860000 1441900755.480472 8618246899077
0131830068670612 2 1 1 IPHONE_4S 17875 61433 10.67.43.51 120.192.84.86
6 58684 31271 GET i.gtimg.cn http://i.gtimg.cn/club/item/avatar/zip/0/i0 /all. zip - QQ/5.7.0.469 CFNetwork/672.0.8 Darwin/14.0.0
404 210 text/ html 85 487 411 18 10411441900753.786000 1441900755.726000
861824 6195634 9900026543899411 2 1 1 IPHONE_4S 17783 19302 10.67.29.55 111.40.194.207
6 49412 80 GET sb.symcd. com /MFYwVKADAgEAME0wSzBJMAkGBSsOAwIaBQAEFDmvGLQcAh85EJZW%2F cbTWO90h YuZBBROQ8 gddu83U3pP8lhvlPM44tW93wIQd9jUM82by0%2FVy957MNapGQ%3D%3D
- securityd (unknown version) CFNetwork/672.0.2 Darwin/14.0.0
- - - - 522 0 18 1041
1441900761.308739 1441900761.408000 8615045213668
0127590050857822 2 1 1 IPHONE_4 17772 50621 10.67.63.219 183.232.95.61
6 49337 80 POST szminorshort .weixin.qq.com http://szminorshort.weixin.qq.com/cgi-bin/micromsg-bin /rtkvreport - MicroMessenger Client - - - - 500 16 7 1827
1441900696.427624 1441900761.308739 8615045213668
0127590050857822 2 1 1IPHONE_4 17772 50621 10.67.63.219 183.232.95.61
6 49337 80 POST szminorshort .weixin.qq.com http://szminorshort.weixin.qq.com/cgi-bin/micromsg- bin/rtkvreport - MicroMessenger Client - - - - 500 16 7 1827
1441900693.219000 1441900696.427624 8615045213668
0127590050857822 2 1 1 IPHONE_4 17772 50621 10.67.63.219 183.232.95.61
6 49337 80 POST szminorshort .weixin.qq.com http://szminorshort.weixin.qq.com/cgi-bi n/micromsg -bin/rtkvreport - MicroMessenger Client - - - - 502 16 7 1827
1441900750.845345 1441900753.537000 8618246195634
9900026543899411 2 1 1 IPHONE_4S 17783 19302 10.67.29.55 117.135.169.124
6 49411 80 GET b227.photo. store.qq.com /psb?/V12jlwSP30SPej/VE1V5LlXFMzHeg5gTzpyu CueaEVEGV* 0X6BbSyJZRhs! /b/dCWGUIc. HQAA&ek=1&kp=1&pt=0&bo=yAD6AAAAAAABBxI!&t=5
v1_iph_sq_5.6.0_1_app _a-4-2 QQ/5. 6.0.438 CFNetwork/672.0.2 Darwin/14.0.0 - -
- - 792 0 18 1041
1441900748.094000 1441900750.845345 8618246195634
9900026543899411 2 1 1 IPHONE_4S 17783 19302 10.67.29.55 117.135.169.124 6 49411 80 GETb227.photo. store.qq.com /psb?/V12jlwSP30SPej/VE1V5LlXFMzHeg5gTzpyu CueaEVEGV *0X6BbSyJZRhs! /b/dCWGUIc. HQAA&ek=1&kp=1&pt=0&bo=yAD6AAAAAAABBxI!&t=5
v1_iph_sq _5.6.0_1_ app_a-4-2 QQ/ 5.6.0.438 CFNetwork/672.0.2 Darwin/14.0.0 -
- - - 792 0 18 1041
|
4.2 Óû§ÉÏÍø¼Ç¼ͳ¼Æ(Ò»ÐÐΪһÌõ¼Ç¼).£¨Óû§£ºµÚ3ÁУ©
#test 1_1 Óû§ÉÏÍø¼Ç¼ͳ¼Æ
sc.stop()
from pyspark import SparkContext
sc = SparkContext(appName='test1')
rdd = sc.textFile('user_small')\
.map(lambda x:x.split('\t'))\
.map(lambda x:(x[3],1))\
.reduceByKey(lambda x,y:x+y)\
.map(lambda x:str(x[0])+' '+str(x[0][1])).collect()
#.saveAsTextFile('text1_1') #ÏÞ¶¨Îª¿Õ¸ñ¼üÊä³öµ½Îļþ
print(rdd) |
4.2Óû§Á÷Á¿Í³¼Æ¡£·Ö±ðͳ¼ÆÉÏÐÐÁ÷Á¿¼°ÏÂÐÐÁ÷Á¿²¢½«½á¹û¸÷ÁÐÒÔ¿Õ¸ñ¼ü¸ô¿ªÊä³öµ½Îļþ¡££¨Óû§£ºµÚ3ÁУ»ÉÏÐÐÁ÷Á¿£ºµÚ25ÁУ»ÏÂÐÐÁ÷Á¿£ºµÚ26ÁУ©
['0127590050857822
1', '9900026543899411 9', '0131830068670612 1',
'0136210021269713 1', '0134730038729312 1'] |
4.3 ͳ¼ÆÓû§×ÜÁ÷Á¿
#test 1_2 ͳ¼ÆÓû§ÉÏÍø
×ÜÁ÷Á¿
try:
sc.stop() #ֹ֮ͣǰµÄSparkContext£¬²»È»ÖØÐÂÔËÐлòÕß´´½¨¹¤×÷»áʧ°Ü
except:
pass
from pyspark import SparkContext
sc = SparkContext(appName='test1')
rdd = sc.textFile('user_small')\
.map(lambda x:x.split('\t'))\
.map(lambda x:(x[2],int(x[24])+int(x[25])))\
.reduceByKey(lambda x,y:x+y)\
.map(lambda x:str(x[0])+' '+str(x[1])).collect()
print(rdd)
sc.stop() |
['8618246899077
898', '8615045213668 1550', '8618245698655 8918',
'8613836044032 715', '8618246195634 2106'] |
4.4¡¢Î¢ÐÅAPPÁ÷Á¿Í³¼Æ¡££¨Î¢ÐÅAPPÌØÕ÷MicroMessenger£¬Î»ÓÚµÚ20ÁУ¬Í³¼Æ¶ÔÓ¦µÄÏÂÐÐÁ÷Á¿Öµ¡ª¡ªµÚ26ÁеÄÊýÖµ¡££©
#test 1_3
sc.stop()
from pyspark import SparkContext
sc = SparkContext(appName='test1')
rdd = sc.textFile('user_small')\
.map(lambda x:x.split('\t'))\
.map(lambda x:(x[19],int(x[25])))\
.filter(lambda x: 'WeChat' or 'MicroMessenger'
in x[1])#ɸѡ\
.reduceByKey(lambda x,y:x+y)\
.map(lambda x:str(x[0])+' '+str(x[1])).collect()
print(rdd) |
['securityd (unknown
version) CFNetwork/672.0.2 Darwin/14.0.0 0', 'QQ/5.6.0.438
CFNetwork/672.0.2 Darwin/14.0.0 0', 'QQ/5.7.0.469
CFNetwork/672.0.8 Darwin/14.0.0 411', 'MicroMessenger
Client 48', 'WeChat/6.2.0.19 CFNetwork/711.3.18
Darwin/14.0.0 8212'] |
|