The Python Big Data Library PySpark in Action: Multi-Class Text Classification with PySpark
 
 2019-9-12
 
Editor's note:
This article, from 云社区, works through a multi-class text classification problem using the Spark Machine Learning Library and PySpark.

[Overview] As we know, Apache Spark excels at processing real-time data and is now widely used in industry. This article solves a multi-class text classification problem using the Spark Machine Learning Library and PySpark, covering data extraction, the model pipeline, the training/test split, and model training and evaluation. The full details follow below.

Multi-Class Text Classification with PySpark

Apache SparkÊܵ½Ô½À´Ô½¶àµÄ¹Ø×¢£¬Ö÷ÒªÊÇÒòΪËü´¦ÀíʵʱÊý¾ÝµÄÄÜÁ¦¡£Ã¿Ìì¶¼ÓдóÁ¿µÄÊý¾ÝÐèÒª±»´¦Àí£¬ÈçºÎʵʱµØ·ÖÎöÕâЩÊý¾Ý±äµÃ¼«ÆäÖØÒª¡£ÁíÍ⣬Apache Spark¿ÉÒÔÔÙ²»²ÉÑùµÄÇé¿öÏ¿ìËÙ´¦Àí´óÁ¿µÄÊý¾Ý¡£Ðí¶à¹¤Òµ½çµÄר¼ÒÌṩÁËÀíÓÉ£º why you should use Spark for Machine Learning?

Data

Our task is to classify San Francisco crime descriptions into 33 categories.

Given a crime description, we want to know which of the 33 crime categories it belongs to. The classifier assumes that each crime belongs to one and only one of the 33 classes, making this a multi-class classification problem.

Input: a crime description, e.g. "STOLEN AUTOMOBILE"

Output: a category, e.g. VEHICLE THEFT

ΪÁ˽â¾öÕâ¸öÎÊÌ⣬ÎÒÃÇÔÚSparkµÄÓмලѧϰËã·¨ÖÐÓÃÁËÒ»Ð©ÌØÕ÷ÌáÈ¡¼¼Êõ¡£

Data Extraction

Load the CSV data directly using Spark's csv library:

from pyspark.sql import SQLContext
from pyspark import SparkContext

sc = SparkContext()
sqlContext = SQLContext(sc)
data = sqlContext.read.format('com.databricks.spark.csv') \
    .options(header='true', inferschema='true') \
    .load('train.csv')
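
On Spark 2.0 and later the CSV reader is built in, so the same load can go through a SparkSession instead of the external com.databricks package. A minimal sketch, assuming a local session (the application name here is arbitrary):

from pyspark.sql import SparkSession

# Spark 2.x+ equivalent of the load above; no external csv package needed
spark = SparkSession.builder.appName("sf-crime").getOrCreate()
data = spark.read.csv('train.csv', header=True, inferSchema=True)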

³ýȥһЩ²»ÒªµÄÁУ¬²¢Õ¹Ê¾Ç°ÎåÐУº

drop_list = ['Dates', 'DayOfWeek', 'PdDistrict', 'Resolution', 'Address', 'X', 'Y']
data = data.select([column for column in data.columns if column not in drop_list])
data.show(5)

Use the printSchema() method to display the structure of the data:

data.printSchema()
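
With the columns above dropped, only the description text and its category label should remain; assuming the standard San Francisco crime train.csv layout, the printed schema would look roughly like:

root
 |-- Category: string (nullable = true)
 |-- Descript: string (nullable = true)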

The 20 crime categories with the most records:

from pyspark.sql.functions import col
data.groupBy("Category") \
    .count() \
    .orderBy(col("count").desc()) \
    .show()

The 20 crime descriptions with the most records:

from pyspark.sql.functions import col
data.groupBy("Descript") \
    .count() \
    .orderBy(col("count").desc()) \
    .show()

Á÷Ë®Ïߣ¨Model Pipeline£©

Our pipeline is very similar to the scikit-learn version and consists of three steps:

1. regexTokenizer: tokenization using a regular expression

2. stopwordsRemover: remove stop words

3. countVectors: build term-frequency count vectors

from pyspark.ml.feature import RegexTokenizer, StopWordsRemover, CountVectorizer
from pyspark.ml.classification import LogisticRegression

# regular expression tokenizer
regexTokenizer = RegexTokenizer(inputCol="Descript", outputCol="words", pattern="\\W")
# stop words
add_stopwords = ["http", "https", "amp", "rt", "t", "c", "the"]
stopwordsRemover = StopWordsRemover(inputCol="words", outputCol="filtered") \
    .setStopWords(add_stopwords)
# bag of words count
countVectors = CountVectorizer(inputCol="filtered", outputCol="features",
    vocabSize=10000, minDF=5)

StringIndexer

StringIndexer encodes a column of string labels into a column of label indices, ranging from 0 to the number of distinct labels minus 1. The indices are ordered by label frequency, so the most frequent label receives index 0.

In this example, the labels are encoded as integers from 0 to 32, and the most frequent label (LARCENY/THEFT) is encoded as 0.

from pyspark.ml import Pipeline
from pyspark.ml.feature import OneHotEncoder, StringIndexer, VectorAssembler

label_stringIdx = StringIndexer(inputCol="Category", outputCol="label")
pipeline = Pipeline(stages=[regexTokenizer, stopwordsRemover, countVectors,
    label_stringIdx])
# Fit the pipeline to training documents.
pipelineFit = pipeline.fit(data)
dataset = pipelineFit.transform(data)
dataset.show(5)
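
At prediction time, the integer indices can be mapped back to the original category strings with IndexToString. A minimal sketch using the labels learned by the fitted StringIndexer (the stage position below assumes the four-stage pipeline above; the output column name is arbitrary):

from pyspark.ml.feature import IndexToString

# labels learned by the fitted StringIndexer (stage 3 of the pipeline above)
labels = pipelineFit.stages[3].labels
converter = IndexToString(inputCol="prediction",
    outputCol="predictedCategory", labels=labels)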

Training/Test Set Split

# set seed for reproducibility
(trainingData, testData) = dataset.randomSplit([0.7, 0.3], seed = 100)
print("Training Dataset Count: " + str(trainingData.count()))
print("Test Dataset Count: " + str(testData.count()))

Training set size: 5185

Test set size: 2104

Model Training and Evaluation

1. Logistic regression using term-frequency features

We make predictions and score the model on the test set, then look at the 10 results with the highest prediction probabilities:

lr = LogisticRegression(maxIter=20, regParam=0.3, elasticNetParam=0)
lrModel = lr.fit(trainingData)
predictions = lrModel.transform(testData)
predictions.filter(predictions['prediction'] == 0) \
    .select("Descript", "Category", "probability", "label", "prediction") \
    .orderBy("probability", ascending=False) \
    .show(n=10, truncate=30)

from pyspark.ml.evaluation import MulticlassClassificationEvaluator
evaluator = MulticlassClassificationEvaluator(predictionCol="prediction")
evaluator.evaluate(predictions)

The accuracy is 0.9610787444388802, which is quite good!
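
One caveat: with no metricName set, MulticlassClassificationEvaluator reports the F1 score by default rather than raw accuracy, so the scores quoted in this article are strictly F1 values. To measure accuracy explicitly (supported since Spark 2.0), set the metric by hand:

# default metric is "f1"; request accuracy explicitly
evaluator = MulticlassClassificationEvaluator(predictionCol="prediction",
    metricName="accuracy")
evaluator.evaluate(predictions)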

2. Logistic regression using TF-IDF features

from pyspark.ml.feature import HashingTF, IDF

hashingTF = HashingTF(inputCol="filtered", outputCol="rawFeatures", numFeatures=10000)
# minDocFreq: remove sparse terms
idf = IDF(inputCol="rawFeatures", outputCol="features", minDocFreq=5)
pipeline = Pipeline(stages=[regexTokenizer, stopwordsRemover, hashingTF, idf,
    label_stringIdx])
pipelineFit = pipeline.fit(data)
dataset = pipelineFit.transform(data)
(trainingData, testData) = dataset.randomSplit([0.7, 0.3], seed=100)
lr = LogisticRegression(maxIter=20, regParam=0.3, elasticNetParam=0)
lrModel = lr.fit(trainingData)
predictions = lrModel.transform(testData)
predictions.filter(predictions['prediction'] == 0) \
    .select("Descript", "Category", "probability", "label", "prediction") \
    .orderBy("probability", ascending=False) \
    .show(n=10, truncate=30)

evaluator = MulticlassClassificationEvaluator(predictionCol="prediction")
evaluator.evaluate(predictions)

The accuracy is 0.9616202660247297, roughly the same as the result above.

3. Cross-validation

We use cross-validation to tune the hyperparameters, here optimizing the logistic regression model built on term-frequency features.

pipeline = Pipeline(stages=[regexTokenizer, stopwordsRemover, countVectors, label_stringIdx])
pipelineFit = pipeline.fit(data)
dataset = pipelineFit.transform(data)
(trainingData, testData) = dataset.randomSplit([0.7, 0.3], seed=100)
lr = LogisticRegression(maxIter=20, regParam=0.3, elasticNetParam=0)

from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
# Create ParamGrid for Cross Validation
paramGrid = (ParamGridBuilder()
    .addGrid(lr.regParam, [0.1, 0.3, 0.5])  # regularization parameter
    .addGrid(lr.elasticNetParam, [0.0, 0.1, 0.2])  # elastic net parameter (ridge = 0)
    # .addGrid(lr.maxIter, [10, 20, 50])  # number of iterations
    # .addGrid(idf.numFeatures, [10, 100, 1000])  # number of features
    .build())
# Create 5-fold CrossValidator
cv = CrossValidator(estimator=lr,
    estimatorParamMaps=paramGrid,
    evaluator=evaluator,
    numFolds=5)
cvModel = cv.fit(trainingData)

predictions = cvModel.transform(testData)
# Evaluate best model
evaluator = MulticlassClassificationEvaluator(predictionCol="prediction")
evaluator.evaluate(predictions)

The accuracy rises to 0.9851796929217101, a clear improvement.
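
The winning hyperparameters can be read back from the fitted CrossValidator. A short sketch, assuming a recent PySpark (3.x) in which the trained model exposes param getters:

# the single best model found on the grid
bestModel = cvModel.bestModel
print(bestModel.getRegParam(), bestModel.getElasticNetParam())
# average cross-validation metric for each grid point
print(cvModel.avgMetrics)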

4. Naive Bayes

from pyspark.ml.classification import NaiveBayes

nb = NaiveBayes(smoothing=1)
model = nb.fit(trainingData)
predictions = model.transform(testData)
predictions.filter(predictions['prediction'] == 0) \
    .select("Descript", "Category", "probability", "label", "prediction") \
    .orderBy("probability", ascending=False) \
    .show(n=10, truncate=30)

evaluator = MulticlassClassificationEvaluator(predictionCol="prediction")
evaluator.evaluate(predictions)

Accuracy: 0.9625414629888848

5. Random Forest

from pyspark.ml.classification import RandomForestClassifier

rf = RandomForestClassifier(labelCol="label",
    featuresCol="features",
    numTrees=100,
    maxDepth=4,
    maxBins=32)
# Train model with Training Data
rfModel = rf.fit(trainingData)
predictions = rfModel.transform(testData)
predictions.filter(predictions['prediction'] == 0) \
    .select("Descript", "Category", "probability", "label", "prediction") \
    .orderBy("probability", ascending=False) \
    .show(n=10, truncate=30)

evaluator = MulticlassClassificationEvaluator(predictionCol="prediction")
evaluator.evaluate(predictions)

Accuracy: 0.6600326922344301

As the results above show, a random forest is an excellent, robust, general-purpose model, but it is not a good choice for high-dimensional sparse data.

Clearly, we would choose the logistic regression model tuned with cross-validation.

   