【小(xiǎo)編推薦】在Apache Spark上(shàn>​↑g)跑Logistic Regression算(suàn)法

2015-07-24  &n§≥bsp;|   發布者:梁國(guó)芳&nb​'<sp;  |   查∞"→↔看(kàn):3320次

IT新聞
 本文(wén)旨在介紹使用(yòng)機(jī)器(qì)學習(x'₩↓í)算(suàn)法,來(lái)介紹Apache Spark數(shù)據©₩處理(lǐ)引擎。我們一(yī)開(kāi)始♥​γ↕會(huì)先簡單介紹一(yī)下(xià)Spark,然後我們将開Ω☆σ(kāi)始實踐一(yī)個(gè)機(jī) ←>器(qì)學習(xí)的(de)例子(zǐ)。我們将使用(yòng)Qual→↔itative Bankruptcy數(shù)據集 ±,來(lái)自(zì)UCI機(jī)器(qì)學習(xí)數(®★Ωshù)據倉庫。雖然Spark支持同時(shí)Java£φ,Scala,Python和(hé)R,在本教程中我們将使用(yòng)Sca₩"la作(zuò)為(wèi)編程語言。不(bù)用(yòn✔¥"σg)擔心你(nǐ)沒有(yǒu)使用(yòng)Scal•>a的(de)經驗。練習(xí)中的(de)每個(gè£γα)代碼段,我們都(dōu)會(huì)詳細解釋一(yī)遍。'&÷

APACHE SPARK

Apache Spark是(shì)一(yī)個(gè)開(k∑πāi)源的(de)集群計(jì)算(suàn)框架,用(yòng)β÷¥Spark編寫的(de)應用(yòng)程序可(kě)以比Hado ♥op MapReduce範式的(de)速度高(gāo)1δ€00倍以上(shàng)。Spark的(de)一(π yī)個(gè)主要(yào)的(de)特點,基于內(nèiπ±≠)存,運行(xíng)速度快(kuài),不(bù)僅如(rú)此π₽✔,複雜(zá)應用(yòng)在Spark≤★φ$系統上(shàng)運行(xíng),也(yě)比基于磁盤的£↕β(de)MapReduce更有(yǒu)效。Spark還(hái)旨在更 ​₹通(tōng)用(yòng),因此它提供了(le)以λ©σ下(xià)庫:

 

 

正如(rú)已經提到(dào)的(de),S"&park支持Java,Scala,Python和(h₩ é)R編程語言。它還(hái)集成了(le)其他 ∞•¶(tā)大(dà)數(shù)據工(gōng)具。特别是(sh'✘→ì),Spark可(kě)以運行(xíng)在☆φ<¥Hadoop集群,可(kě)以訪問(wèn)任何數β∑λ₩(shù)據源,包括Hadoop Cassandra。

Spark核心概念

在一(yī)個(gè)高(gāo)的(de)抽象≠¶層面,一(yī)個(gè)Spark的(de)應用(yòng)程β←<序由一(yī)個(gè)驅動程序作(zuò)為(•✘→βwèi)入口,在一(yī)個(gè)集群上(shàng±λ≥)運行(xíng)各種并行(xíng)操作(zuò)。驅 ≠動程序包含了(le)你(nǐ)的(de)>®↕應用(yòng)程序的(de)main函數(shù),然後将這(z±π₹hè)些(xiē)應用(yòng)程序分(fēn)配給集群成員(yuán)執行©‌(xíng)。驅動程序通(tōng)過SparkCont★∏ext對(duì)象來(lái)訪問(wèn)計(jì)算(suàn)集群。對•Ω÷↔(duì)于交互式的(de)shell應用(yò☆£ng),SparkContext默認可(kě)通(tōng)過sδ<★♦c變量訪問(wèn)。

Spark的(de)一(yī)個(gè)非常重要(yào)的(de)概念是(s←☆€hì)RDD–彈性分(fēn)布式數(shù)據集。這(zh£≤è)是(shì)一(yī)個(gè)不(bù)可(kě)改變的($¥♦de)對(duì)象集合。每個(gè)RDD會(huì)分(σ ≤fēn)成多(duō)個(gè)分(fēn)區(qū £),每個(gè)分(fēn)區(qū)可(kě)能(néng)在不(bù)ε'σ同的(de)群集節點上(shàng)參與計(jì)算(suàn)。RDD可♠¶©(kě)以包含任何類型的(de)Java,Scβ‌↓ala對(duì)象,Python或R,包括用(yòng)戶自(♠≠♠<zì)定義的(de)類。RDDS的(de)産生(shēng)有∏£(yǒu)兩種基本方式:通(tōng)過加載☆σ外(wài)部數(shù)據集或分(fēn)配對(du♥ε αì)象的(de)集合如(rú),list或set。

在創建了(le)RDDs之後,我們可(kě)以對(duì)RD₽≠✘αDs做(zuò)2種不(bù)同類型的(de)操作(zuò•≤):

 

 

RDDs通(tōng)過lazy的(de)方式計(jì)算(suà←©"n) - 即當RDDs碰到(dào)Act&>€ion操作(zuò)時(shí),才會(h₩φ$♠uì)開(kāi)始計(jì)算(suàn)。Spark的(de)Trα€ansformations操作(zuò),都(dōΩ£ u)會(huì)積累成一(yī)條鏈,隻有(yǒu)當α✔需要(yào)數(shù)據的(de)時(shβσ♦í)候,才會(huì)執行(xíng)這(zhè)些(xiē)Transγ'formations操作(zuò)。每一(yī)次RDD進行Ω↕ (xíng)Action操作(zuò)時∞λ(shí),RDD都(dōu)會(huì)重新生(shēng)成。如(rú↔♠✘)果你(nǐ)希望某些(xiē)中間(ji★•ān)的(de)計(jì)算(suàn)結果能(néng)被其他(✘↕tā)的(de)Action操作(zuò§Ωδ¥)複用(yòng),那(nà)麽你(nǐ)需要(yào)調用(yòng)Spa&' rk的(de)RDD.persist()來(lái)保存中間(ji®÷←‌ān)數(shù)據。

Spark支持多(duō)種運行(xíng∏ ‌)模式,你(nǐ)可(kě)以使用(yòng)↕↑交互式的(de)Shell,或者單獨運行(xíng)一(yī)個(g↓∑è)standalone的(de)Spark程序。不(bù)σ∞管哪一(yī)種方式,你(nǐ)都(dō£​☆u)會(huì)有(yǒu)如(rú)下(xià)的(de)工φ€÷(gōng)作(zuò)流:

 

 

安裝Apache Spark

為(wèi)了(le)開(kāi)始使用(yòng)Spar§λεγk,需要(yào)先從(cóng)官網下(xià)載。選擇“Pre-built for Had≤←δ oop 2.4 and later”"≠版本然後點擊“Direct Download”。π‌₩¶如(rú)果是(shì)Windows用(yòn​♥™★g)戶,建議(yì)将Spark放(fàng)進名字沒有(yǒu)空(kōng)∑<®格的(de)文(wén)件(jiàn)夾中。比如(rú)說(δ€≈shuō),将文(wén)件(jiàn)解壓到♠÷ ₹(dào):C:\spark。

正如(rú)上(shàng)面所說(shuō)的(de),我們将會(h✔§uì)使用(yòng)Scala編程語言。進入Spark的(de)安裝路(lù ‍)徑,運行(xíng)如(rú)下(xià)命令:

 

// Linux and Mac users

bin/spark-₩Ωσshell

// Windows user₹®→ s

bin\spark shell

 

然後你(nǐ)可(kě)以在控制(zhì)台中看(kàn)到(dào)Sc×‌ala:

 

scala>

 

QUALITATIVE 破産分(fēn)類

現(xiàn)實生(shēng)活中的(de)問(wèn)題是(shì)可¥δ(kě)以用(yòng)機(jī)器(qì)學習(xí)算(su​÷£àn)法來(lái)預測的(de)。我們将試圖解決的(de),通(t♣γōng)過一(yī)個(gè)公司的(de)定性信↓€↑♣息,預測該公司是(shì)否會(huì)破δδ産。數(shù)據集可(kě)以從(cóng)UCI機(jī)♥'≤器(qì)學習(xí)庫https://archive.ics.uci.eδδdu/ml/datasets/qualitative_bankruptc∏₹y下(xià)載。在Spark的(de)安裝文(wén)件(∏∏jiàn)夾中,創建一(yī)個(gè)新的​×'(de)文(wén)件(jiàn)夾命名為(wèi)playgλ≤round。複制(zhì) qualitative_bankru✘​↕↓ptcy.data.txt文(wén)件(jλ¶iàn)到(dào)這(zhè)裡(lǐ)面。這(zhè)将是(sδ∏hì)我們的(de)訓練數(shù)據。

數(shù)據集包含250個(gè)實例,其中143個(gè)實例為(wèi)§↑α非破産,107個(gè)破産實例。

每一(yī)個(gè)實例數(shù)據格式"₹如(rú)下(xià):

 

 

這(zhè)些(xiē)被稱為(wèi)定性參數Ω≈(shù),因為(wèi)它們不(bù)能(×™βnéng)被表示為(wèi)一(yī)個(gè)數(shù)字。每一(yī​¥'♦)個(gè)參數(shù)可(kě)以取下(xià)以下(±≥©♣xià)值:

 

 

數(shù)據集的(de)最後一(yī)個(gè)列是(shì)每‌§φ&個(gè)實例的(de)分(fēn)類:B為(wèi)破産或NB非破産♦™✔。

鑒于此數(shù)據集,我們必須訓練一(yī)±₽個(gè)模型,它可(kě)以用(yòng)來(lái®↑)分(fēn)類新的(de)數(shù)據實例,這(zφε∏hè)是(shì)一(yī)個(gè)典型的(de>γ)分(fēn)類問(wèn)題。

解決問(wèn)題的(de)步驟如(rú± π)下(xià):

 

 

SPARK LOGISTIC REGRESSIO ←N

我們将用(yòng)Spark的(de)邏輯回歸算(su♦→àn)法訓練分(fēn)類模型。如(rú>↑≈≈)果你(nǐ)想知(zhī)道(dào)更多(duō)邏輯回歸算(suàn)☆£法的(de)原理(lǐ),你(nǐ)可(kě)以閱讀(dú)→‌Ω以下(xià)教程http://technobium.↑<✘com/logistic-regression÷✔®-using-apache-mahout。

在Spark的(de)Scala Shell中粘π±₩貼以下(xià)import語句:

 

import org.apache.spark.mllib.★< ↓classification.{LogisticRegressi₩↑±onWithLBFGS, LogisticReg←σ₹ ressionModel}

import org.ap ®±≈ache.spark.mllib.regression≤γ.LabeledPoint

import o£≈♥rg.apache.spark.mllib.lin®←♣alg.{Vector, Vectors}

 

這(zhè)将導入所需的(de)庫。

接下(xià)來(lái)我們将創建一(yī)個(gèλ✘)Scala函數(shù),将數(shù§¶₩)據集中的(de)qualitative數(sh✘∏&ù)據轉換為(wèi)Double型數(shù)值。鍵入或粘貼以下(xià)代 ≤碼并回車(chē),在Spark Scala Shell。

 

def getDoubleValue( input:String ) : D >§ouble = {

    var result:D₩≈ ±ouble = 0.0

    if (input == "σΩπP")  result = 3.0 

    if£≤₹ (input == "A")  resu§π←₹lt = 2.0

    if (inpu<₽±t == "N")  result =">" 1.0

    if (input == "NB&q∑αδ↔uot;) result = 1.0

 ↑≈δ§   if (input == "B←¶€")  result = 0.0

 ≠®    return result

   }

 

如(rú)果所有(yǒu)的(de)運行(xíng)都(dōu)沒有÷π♦×(yǒu)問(wèn)題,你(nǐ)應該看(kàn)到(dào)這(zhè)σδ★φ樣的(de)輸出:

 

getDoubleValue: (input: String)D↕£¶©ouble

 

現(xiàn)在,我們可(kě)以讀(dú)取到(dàoσ‍§)qualitative_bankruptc∞♠≤ y.data.txt文(wén)件(jiàn)中的(de)數♣ ₹λ(shù)據。從(cóng)Spark的(de♦€±)角度來(lái)看(kàn),這(zhè)是(shì)一(₹≈α÷yī)個(gè)Transformation操作(zuò)。在這(zh'××→è)個(gè)階段,數(shù)據實際上(shàng)♦β$π不(bù)被讀(dú)入內(nèi)存。如π₩(rú)前所述,這(zhè)是(shì)一(yī)個(gè)laφ↔zy的(de)方式執行(xíng)。實際的(de)讀(dú)取操作∏♥(zuò)是(shì)由count()引發,這(zhè)是(ε♦'shì)一(yī)個(gè)Action操↑₽ ε作(zuò)。

 

val data = sc.textFile("playgr£∑ound/Qualitative_Bankruptc"✔δy.data.txt")

data.count()

 

用(yòng)我們val關鍵字聲明(míng)一(yī)個(g让)常量data。它是(shì)一(yī)個(gè)包含輸入數(shε÷ù)據所有(yǒu)行(xíng)的(de)☆πRDD。讀(dú)操作(zuò)被SC或sparkcontext上(shε☆πàng)下(xià)文(wén)變量監聽(tīng)。count操σ≠←作(zuò)應返回以下(xià)結果:

 

res0: Long = 250

 

現(xiàn)在是(shì)時(shí)候☆≤©為(wèi)邏輯回歸算(suàn)法準備數(shù)據,将字符串轉換為β£♣♠(wèi)數(shù)值型。

 

val parsedData = data.map{l$"$ine =

    val parts = line.split(≥•",")

    LabeledPo  £int(getDoubleValue(pa<¶ rts(6)), Vectors.dense(parts.slice(0,♠±6).map(x =getDoubleValue(x))))&¶©

}

 

在這(zhè)裡(lǐ),我們聲明(míng)了(le)另外(wài)一&α&¥(yī)個(gè)常量,命名為(wèi)par↓<♥sedData。對(duì)于data變量中的(de)每一(yī)行(xínε÷​✘g)數(shù)據,我們将做(zuò)以下(xià)操§€λ作(zuò):

 

 

Spark支持map()轉換操作(zuò),Action動作(z€βuò)執行(xíng)時(shí),第一(yī)個(gè)執行(xí Ωδng)的(de)就(jiù)是(shì)map()。

我們來(lái)看(kàn)看(kàn)¶♣β我們準備好(hǎo)的(de)數(shù±™)據,使用(yòng)take():

 

parsedData.take(10)

 

上(shàng)面的(de)代碼,告訴Spark從(cóng)parse​÷π÷dData數(shù)組中取出10個(gè)♥φ•樣本,并打印到(dào)控制(zhì)台。一(yī)樣的(de),tγ× ake()操作(zuò)之前,會(huì)∑♣先執行(xíng)map()。輸出結果如(rú)下(xi →à):

 

res5: Array[org.apache.spa±♠→✔rk.mllib.regression.Labele≈♠dPoint] = Array((1.0,[3.0,3.₩₩0,2.0,2.0,2.0,3.0]), (1.0,[1.0,1>≥.0,2.0,2.0,2.0,1.0]), (1.0,[2.0,2.0,2.0'‌,2.0,2.0,2.0]), (1.0,[3.0,3.0,3.0,>Ω3.0,3.0,3.0]), (1.0,[1.0,1.0,3.0,3.₹↔∞0,3.0,1.0]), (1.0,[2.0,2.0,3.0,3.∞®™ 0,3.0,2.0]), (1.0,[3‌₩.0,3.0,2.0,3.0,3.0,3.0]), (≤δ1.0,[3.0,3.0,3.0,2.0,2.0,3.0]), (1.0,[3¶€ ™.0,3.0,2.0,3.0,2.0,3.0]), (1.0,[3. ¶"0,3.0,2.0,2.0,3.0,3.0]))

 

接著(zhe)我們劃分(fēn)一(yī)下(xià)訓練數(shù)據和(©♦✘φhé)測試數(shù)據,将parsedData的(d§Ω‌≈e)60%分(fēn)為(wèi)訓練數(shù)據,40%分(fēn)為(w€↕èi)測試數(shù)據。

 

val splits = parsedD÷'ata.randomSplit(Arra&§≥y(0.6, 0.4), seed = 11L)

val trainγ₹ingData = splits(0)
&®± 
val testData = splits(1)

 

訓練數(shù)據和(hé)測試數(shù)據也(yě)♥σ可(kě)以像上(shàng)面一(yī)樣,使用(yòng)take()者cφ‍©₹ount()查看(kàn)。

激動人(rén)心的(de)時(shí)刻,我們現(xi&"βàn)在開(kāi)始使用(yòng)Spark的(de)Logisti÷♦δ‌cRegressioinWithLBFGS()來(lái)訓₩÷ →練模型。設置好(hǎo)分(fēn)類個(gè)數(shù),這(zhè)裡(✘÷±​lǐ)是(shì)2個(gè)(破産和(hé)非破産):

 

val model = new LogisticRegre<γssionWithLBFGS().setNum×÷'€Classes(2).run(trainingData)

 

當模型訓練完,我們可(kě)以使用(yòng)testData來(lái‍↕)檢驗一(yī)下(xià)模型的(de)出錯(cuò)率。

 

val labelAndPreds = testData.map {→  point =

  val prediction↑ $ = model.predict(point.≠♦•features)

  (point.label ∏γβ, prediction)

}

val trainErr =φ>↔" labelAndPreds.filter(r = r._1π↕ != r._2).count.toDouble / testDβ&ata.count

 

變量labelAndPreds保存了(le)map()轉σ≠換操作(zuò),map()将每一(yī)個≥​♥←(gè)行(xíng)轉換成二元組。二元組包含了(l↔σ>αe)testData的(de)标簽數(shù)據(point.label,分←σ(fēn)類數(shù)據)和(hé)預測出來(l∞£&ái)的(de)分(fēn)類數(shù)據(predi↑≥‌ction)。模型使用(yòng)point.features作(zu↑φ≥♣ò)為(wèi)輸入數(shù)據。

最後一(yī)行(xíng)代碼,我們使用(yò✔£∏ng)filter()轉換操作(zuò)和γε(hé)count()動作(zuò)操作(zuò)來(lái↓♠​↕)計(jì)算(suàn)模型出錯(cuò)率。filt​✔β÷er()中,保留預測分(fēn)類和(hé)所屬分(fēn∞★γ)類不(bù)一(yī)緻的(de)元組。在Scala中_1和(h¥™σ‍é)_2可(kě)以用(yòng)來(lá↕₩β÷i)訪問(wèn)元組的(de)第一(yī)個(gè)元素和(hé÷→ )第二個(gè)元素。最後用(yòng)預測出錯(cuò)的(de)數(sh♥←>ù)量除以testData訓練集的(de)★∏↓數(shù)量,我們可(kě)以得(de)到(dδ♣ào)模型出錯(cuò)率:

 

trainErr: Double = 0.204301075268817​₹↑22

 

總結

在這(zhè)個(gè)教程中,你(nǐ)已經看(kàn)到(dào)了(le‍↕​)Apache Spark可(kě)以用(yòng)于機(jī​φ )器(qì)學習(xí)的(de)任務,如(rú)logistic ♠§regression。雖然這(zhè)隻是(shì)非分(fēnε  →)布式的(de)單機(jī)環境的(de)Scala shell dem♠×₩&o,但(dàn)是(shì)Spark的(de)真正強大(dà)在于分(  fēn)布式下(xià)的(de)內(nèi)存并行(xíngα₩)處理(lǐ)能(néng)力。

在大(dà)數(shù)據領域,Spark是(shì)目前★↓&‍最活躍的(de)開(kāi)源項目,在過去(qù)幾年(nián)已迅速獲得(≤  de)關注和(hé)發展。在過去(qù)的(de)幾年(niγ≤→±án)裡(lǐ)。采訪了(le)超過2100受訪者,各種各δ€樣的(de)使用(yòng)情況和(hé)環境。

[參考資料]

“Learning Spark&rdqγ‍uo; by HoldenKarau, ↓≥₽Andy Konwinski, Patrick Wendel•♠✔l and Matei Zaharia, O’Re¥φilly Media 2015

Lichman, M. (2013). UCI Mach ≈ine Learning Repositorα↑¶y [http://archive.ics.uci.edu/ml].↓​φ Irvine, CA: University of California, ₩‍♦School of Information and Computer SciΩ↔ε♥ence

https://spark.apache.org/docs/lγ✔ atest/mllib-linear-methods.html#lo© →"gistic-regression

https://spark.apache.org/docs/1.¥♥≠π1.0/mllib-data-types.html

https://archive.ics.uci.edu/ml/datase¥♦ts/Qualitative_Bankruptc♣¥y

https://databricks.com/blog/∞±≤2015/01/27/big-data-projects-are-hungry™✘‌≈-for-simpler-and-more-powerful-tools-s•↑¥∑urvey-validates-apache-spark-is-'←Ω₹gaining-developer-traction.html