Editor's note: This article, from the cloud community, walks through solving a multi-class text classification problem using the Spark Machine Learning Library and PySpark.
[Overview] Apache Spark is known for its excellent real-time data processing capability and is now widely used in industry. This article solves a multi-class text classification problem using the Spark Machine Learning Library and PySpark, covering data extraction, the model pipeline, the train/test split, and model training and evaluation. The details are in the full text below.
Multi-Class Text Classification with PySpark
Apache Spark is attracting more and more attention, mainly because of its ability to process real-time data. Huge volumes of data are produced every day, and analyzing them in real time has become critically important. In addition, Apache Spark can process large amounts of data quickly without sampling. Many industry experts have offered reasons for why you should use Spark for Machine Learning.
The Data
ÎÒÃǵÄÈÎÎñ£¬Êǽ«¾É½ðɽ·¸×ï¼Ç¼£¨San Francisco Crime
Description£©·ÖÀൽ33¸öÀàÄ¿ÖС£
¸ø¶¨Ò»¸ö·¸×ïÃèÊö£¬ÎÒÃÇÏëÖªµÀËüÊôÓÚ33Àà·¸×ïÖеÄÄÄÒ»Àà¡£·ÖÀàÆ÷¼ÙÉèÿ¸ö·¸×ïÒ»¶¨ÊôÓÚÇÒ½öÊôÓÚ33ÀàÖеÄÒ»Àà¡£ÕâÊÇÒ»¸ö¶à·ÖÀàµÄÎÊÌâ¡£
ÊäÈ룺·¸×ïÃèÊö¡£ÀýÈ磺¡° STOLEN AUTOMOBILE¡±
Êä³ö£ºÀà±ð¡£ÀýÈ磺VEHICLE THEFT
ΪÁ˽â¾öÕâ¸öÎÊÌ⣬ÎÒÃÇÔÚSparkµÄÓмලѧϰËã·¨ÖÐÓÃÁËÒ»Ð©ÌØÕ÷ÌáÈ¡¼¼Êõ¡£
Data Extraction
Load the CSV data directly with Spark's csv reader:
from pyspark.sql import SQLContext
from pyspark import SparkContext

sc = SparkContext()
sqlContext = SQLContext(sc)
data = sqlContext.read.format('com.databricks.spark.csv') \
    .options(header='true', inferschema='true') \
    .load('train.csv')
Drop the columns we don't need and show the first five rows:
drop_list = ['Dates', 'DayOfWeek', 'PdDistrict', 'Resolution', 'Address', 'X', 'Y']

data = data.select([column for column in data.columns if column not in drop_list])
data.show(5)
Use printSchema() to display the schema of the data:
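The output screenshot from the original post is omitted here; a minimal call for this step looks like the following:

# Print the inferred schema (column names and types) of the DataFrame
data.printSchema()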
The top 20 crime categories by count:
from pyspark.sql.functions import col

data.groupBy("Category") \
    .count() \
    .orderBy(col("count").desc()) \
    .show()
The top 20 crime descriptions by count:
from pyspark.sql.functions import col

# group by the crime description column
data.groupBy("Descript") \
    .count() \
    .orderBy(col("count").desc()) \
    .show()
Model Pipeline
ÎÒÃǵÄÁ÷³ÌºÍscikit-learn°æ±¾µÄºÜÏàËÆ£¬°üº¬3¸ö²½Ö裺
1. regexTokenizer£ºÀûÓÃÕýÔòÇзֵ¥´Ê
2. stopwordsRemover£ºÒƳýÍ£ÓôÊ
3. countVectors£º¹¹½¨´ÊƵÏòÁ¿
from pyspark.ml.feature import RegexTokenizer, StopWordsRemover, CountVectorizer
from pyspark.ml.classification import LogisticRegression

# regular expression tokenizer
regexTokenizer = RegexTokenizer(inputCol="Descript", outputCol="words", pattern="\\W")

# stop words
add_stopwords = ["http", "https", "amp", "rt", "t", "c", "the"]
stopwordsRemover = StopWordsRemover(inputCol="words", outputCol="filtered").setStopWords(add_stopwords)

# bag of words count
countVectors = CountVectorizer(inputCol="filtered", outputCol="features", vocabSize=10000, minDF=5)
StringIndexer
StringIndexer encodes a column of string labels into a column of label indices, ranging from 0 to the number of distinct labels minus 1. The indices are ordered by label frequency, so the most frequent label gets index 0.
In this example, the labels are encoded as integers from 0 to 32, and the most frequent label (LARCENY/THEFT) is encoded as 0.
from pyspark.ml import Pipeline
from pyspark.ml.feature import OneHotEncoder, StringIndexer, VectorAssembler

label_stringIdx = StringIndexer(inputCol="Category", outputCol="label")

pipeline = Pipeline(stages=[regexTokenizer, stopwordsRemover, countVectors, label_stringIdx])

# Fit the pipeline to training documents.
pipelineFit = pipeline.fit(data)
dataset = pipelineFit.transform(data)
dataset.show(5)
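If you want to verify the label-to-index mapping described above, one possible way (a sketch, not part of the original post) is to pull the fitted StringIndexerModel back out of the pipeline:

# Assumption: label_stringIdx is the last pipeline stage, so its fitted model is stages[-1]
label_model = pipelineFit.stages[-1]
print(label_model.labels[:10])  # labels ordered by frequency; index 0 is the most frequent (LARCENY/THEFT)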
Train/Test Split
# set seed for reproducibility
(trainingData, testData) = dataset.randomSplit([0.7, 0.3], seed=100)
print("Training Dataset Count: " + str(trainingData.count()))
print("Test Dataset Count: " + str(testData.count()))
Training Dataset Count: 5185
Test Dataset Count: 2104
Model Training and Evaluation
1. Logistic regression with count-vector (term frequency) features
We fit the model, score the test set, and look at the 10 predictions with the highest probabilities:
lr = LogisticRegression(maxIter=20, regParam=0.3, elasticNetParam=0)
lrModel = lr.fit(trainingData)
predictions = lrModel.transform(testData)

predictions.filter(predictions['prediction'] == 0) \
    .select("Descript", "Category", "probability", "label", "prediction") \
    .orderBy("probability", ascending=False) \
    .show(n=10, truncate=30)
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

evaluator = MulticlassClassificationEvaluator(predictionCol="prediction")
evaluator.evaluate(predictions)
The accuracy is 0.9610787444388802, which is quite good!
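A small caveat: MulticlassClassificationEvaluator defaults to the F1 metric, so if you specifically want accuracy you can request it explicitly (a minimal sketch, not part of the original post):

# Explicitly ask the evaluator for accuracy instead of the default F1 score
acc_evaluator = MulticlassClassificationEvaluator(predictionCol="prediction", metricName="accuracy")
print(acc_evaluator.evaluate(predictions))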
2. Logistic regression with TF-IDF features
from pyspark.ml.feature import HashingTF, IDF

hashingTF = HashingTF(inputCol="filtered", outputCol="rawFeatures", numFeatures=10000)
idf = IDF(inputCol="rawFeatures", outputCol="features", minDocFreq=5)  # minDocFreq: remove sparse terms

pipeline = Pipeline(stages=[regexTokenizer, stopwordsRemover, hashingTF, idf, label_stringIdx])
pipelineFit = pipeline.fit(data)
dataset = pipelineFit.transform(data)
(trainingData, testData) = dataset.randomSplit([0.7, 0.3], seed=100)

lr = LogisticRegression(maxIter=20, regParam=0.3, elasticNetParam=0)
lrModel = lr.fit(trainingData)
predictions = lrModel.transform(testData)

predictions.filter(predictions['prediction'] == 0) \
    .select("Descript", "Category", "probability", "label", "prediction") \
    .orderBy("probability", ascending=False) \
    .show(n=10, truncate=30)
evaluator = MulticlassClassificationEvaluator(predictionCol="prediction")
evaluator.evaluate(predictions)
The accuracy is 0.9616202660247297, about the same as the previous result.
3. Cross-validation
We use cross-validation to tune the hyperparameters, optimizing the logistic regression model built on count-vector features.
pipeline = Pipeline(stages=[regexTokenizer, stopwordsRemover, countVectors, label_stringIdx])
pipelineFit = pipeline.fit(data)
dataset = pipelineFit.transform(data)
(trainingData, testData) = dataset.randomSplit([0.7, 0.3], seed=100)

lr = LogisticRegression(maxIter=20, regParam=0.3, elasticNetParam=0)

from pyspark.ml.tuning import ParamGridBuilder, CrossValidator

# Create ParamGrid for Cross Validation
paramGrid = (ParamGridBuilder()
             .addGrid(lr.regParam, [0.1, 0.3, 0.5])  # regularization parameter
             .addGrid(lr.elasticNetParam, [0.0, 0.1, 0.2])  # Elastic Net parameter (Ridge = 0)
             # .addGrid(model.maxIter, [10, 20, 50])  # number of iterations
             # .addGrid(idf.numFeatures, [10, 100, 1000])  # number of features
             .build())

# Create 5-fold CrossValidator
cv = CrossValidator(estimator=lr,
                    estimatorParamMaps=paramGrid,
                    evaluator=evaluator,
                    numFolds=5)
cvModel = cv.fit(trainingData)

predictions = cvModel.transform(testData)

# Evaluate best model
evaluator = MulticlassClassificationEvaluator(predictionCol="prediction")
evaluator.evaluate(predictions)
׼ȷÂʱä³ÉÁË0.9851796929217101£¬»ñµÃÁËÌáÉý¡£
4. Naive Bayes
from pyspark.ml.classification import NaiveBayes

nb = NaiveBayes(smoothing=1)
model = nb.fit(trainingData)
predictions = model.transform(testData)

predictions.filter(predictions['prediction'] == 0) \
    .select("Descript", "Category", "probability", "label", "prediction") \
    .orderBy("probability", ascending=False) \
    .show(n=10, truncate=30)
evaluator = MulticlassClassificationEvaluator(predictionCol="prediction")
evaluator.evaluate(predictions)
Accuracy: 0.9625414629888848
5. Random Forest
from pyspark.ml.classification import RandomForestClassifier

rf = RandomForestClassifier(labelCol="label",
                            featuresCol="features",
                            numTrees=100,
                            maxDepth=4,
                            maxBins=32)

# Train model with Training Data
rfModel = rf.fit(trainingData)
predictions = rfModel.transform(testData)

predictions.filter(predictions['prediction'] == 0) \
    .select("Descript", "Category", "probability", "label", "prediction") \
    .orderBy("probability", ascending=False) \
    .show(n=10, truncate=30)
evaluator = MulticlassClassificationEvaluator(predictionCol="prediction")
evaluator.evaluate(predictions)
Accuracy: 0.6600326922344301
As the results above show, random forest is an excellent, robust, general-purpose model, but it is not a good choice for high-dimensional sparse data.
Clearly, we would choose the cross-validated logistic regression model.