summaryrefslogtreecommitdiffstats
path: root/predPy/predictor.py
diff options
context:
space:
mode:
authorLinghui Zeng <linghui.zeng@huawei.com>2016-05-09 16:26:57 +0800
committerLinghui Zeng <linghui.zeng@huawei.com>2016-05-09 16:26:57 +0800
commit2cbd32234e1e3c8e27f082ce0432f4797395d714 (patch)
tree66e6c6df41500bf3f5a0a5239b438d04918e6c28 /predPy/predictor.py
parent5b69961354a03dba2c0025255a08e2fde0f78048 (diff)
Add prediction algorithm run on Spark
JIRA: PREDICTION-64 Change-Id: I300aafbc791d9d7a87d70cc2bde888aff18dddf0 Signed-off-by: Linghui Zeng <linghui.zeng@huawei.com>
Diffstat (limited to 'predPy/predictor.py')
-rw-r--r--predPy/predictor.py139
1 files changed, 139 insertions, 0 deletions
diff --git a/predPy/predictor.py b/predPy/predictor.py
new file mode 100644
index 0000000..159fcf4
--- /dev/null
+++ b/predPy/predictor.py
@@ -0,0 +1,139 @@
+# Copyright (c) 2016 Huawei
+# All Rights Reserved.
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""
+SVM
+Logistic Regression with SGD
+Logistic Regression with LBFGS
+Multinomial Naive Bayes
+Decision Tree
+Random Forest
+Gradient Boosted Trees
+"""
+from __future__ import print_function
+
+from pyspark import SparkContext
+
+import csv
+import StringIO
+
+import tempfile
+from shutil import rmtree
+
+# from pyspark.mllib.classification import SVMWithSGD, SVMModel
+from pyspark.mllib.classification import LogisticRegressionWithSGD
+# from pyspark.mllib.classification import LogisticRegressionWithLBFGS
+from pyspark.mllib.classification import LogisticRegressionModel
+# from pyspark.mllib.classification import NaiveBayes, NaiveBayesModel
+# from pyspark.mllib.tree import DecisionTree, DecisionTreeModel
+# from pyspark.mllib.tree import RandomForest, RandomForestModel
+# from pyspark.mllib.tree import GradientBoostedTrees, GradientBoostedTreesModel
+from pyspark.mllib.linalg import Vectors
+from pyspark.mllib.regression import LabeledPoint
+
+
+def loadRecord(line):
+ """Load a CSV line and select 26 indicative parameters"""
+ input = StringIO.StringIO(line)
+ reader = csv.reader(input)
+ parameters = reader.next()
+ # Instances that were collected within seven days before the failures
+ # are used to train the failing model
+ if parameters[3] >= 168:
+ parameters[-1] = 0
+ selectedParameters = parameters[12:17] + parameters[19:20] \
+ + parameters[23:26] + parameters[39:47] + parameters[54:61] \
+ + parameters[62:]
+ return selectedParameters
+
+
+def parseLine(line):
+ """Parse a row """
+ label = float(line[-1])
+ features = Vectors.dense(map(float, line[:-1]))
+ return LabeledPoint(label, features)
+
+
+if __name__ == "__main__":
+
+ sc = SparkContext(appName="HardDriveFailurePrediction")
+
+ # $example on$
+ data = sc.textFile('hdd/harddrive1.csv').map(loadRecord)\
+ .map(parseLine)
+
+ # Split data aproximately into training (60%) and test (40%)
+ [trainingData, testData] = data.randomSplit([0.6, 0.4], seed=0)
+
+ # Train a SVM model
+# model = SVMWithSGD.train(trainingData, iterations=2)
+ # Train a logistic regression model
+ model = LogisticRegressionWithSGD.train(trainingData, iterations=3)
+# model = LogisticRegressionWithLBFGS.train(trainingData)
+ # Train a multinomial naive Bayes model given an RDD of LabeledPoint.
+# model = NaiveBayes.train(trainingData, 0.8)
+ # Train a decision tree model.
+ # Empty categoricalFeaturesInfo indicates all features are continuous.
+# model = DecisionTree.trainClassifier(trainingData, numClasses=2,
+# categoricalFeaturesInfo={},
+# impurity='entropy', maxDepth=5,
+# maxBins=32)
+ # Train a RandomForest model.
+ # Empty categoricalFeaturesInfo indicates all features are continuous.
+ # Note: Use larger numTrees in practice.
+ # Setting featureSubsetStrategy="auto" lets the algorithm choose.
+# model = RandomForest.trainClassifier(trainingData, numClasses=2,
+# categoricalFeaturesInfo={},
+# numTrees=3,
+# featureSubsetStrategy="auto",
+# impurity='gini', maxDepth=7,
+# maxBins=32)
+
+ # Train a GradientBoostedTrees model.
+ # Empty categoricalFeaturesInfo indicates all features are continuous.
+# model = GradientBoostedTrees.trainClassifier(trainingData,
+# categoricalFeaturesInfo={},
+# numIterations=3, maxDepth=3,
+# maxBins=32)
+ # Make prediction and test accuracy.
+# labelsAndPredictions = testData\
+# .map(lambda p: (p.label, model.predict(p.features)))
+# accuracy = labelsAndPredictions\
+# .filter(lambda (x, v): x == v).count() / float(testData.count())
+ predictions = model.predict(testData.map(lambda x: x.features))
+ labelsAndPredictions = testData.map(lambda p: p.label).zip(predictions)
+ accuracy = labelsAndPredictions.filter(lambda (v, p): v == p).\
+ count() / float(testData.count())
+ print('Test Accuracy = ' + str(accuracy))
+# print('Learned classification tree model:')
+# print(model.toDebugString())
+
+ # Save and load model
+ path = tempfile.mkdtemp(dir='.')
+ model.save(sc, path)
+# sameModel = SVMModel.load(sc, path)
+ sameModel = LogisticRegressionModel.load(sc, path)
+# sameModel = NaiveBayesModel.load(sc, path)
+# sameModel = DecisionTreeModel.load(sc, path)
+# sameModel = RandomForestModel.load(sc, path)
+# sameModel = GradientBoostedTreesModel.load(sc, path)
+ try:
+ rmtree(path)
+ except OSError:
+ pass