Tuesday, 27 December 2016

Data science prototyping with Spark/Scala

The common practice today is to use R or Python for data science prototyping; once the model is validated, the algorithm and models are migrated to the Big Data Hadoop environment and ported to Spark.

But with Scala available in interactive mode, and with notebooks like Zeppelin and Databricks, it is possible to evaluate models directly in Spark/Scala. The main difficulty is that graph visualisations are not as easy.
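Zeppelin does give a partial answer here: the ZeppelinContext object can push a DataFrame into the notebook's built-in table/chart display. A minimal sketch, assuming the standard Spark interpreter is bound and a DataFrame named df already exists:

// render df with Zeppelin's display system (table view plus basic bar/line/scatter/pie charts)
z.show(df)

It is no substitute for ggplot2 or matplotlib, but it covers quick exploratory charts without leaving the notebook.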

Just to experiment, I used the Scala code from
https://github.com/mblanc/spark-ml/blob/master/src/main/scala/fr/xebia/sparkml/Titanic.scala
in Zeppelin, with Spark running in local mode.
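One setup note: the notebook below reads the CSV through the com.databricks.spark.csv data source. Depending on the Spark version bound to the interpreter, that may require the external spark-csv package on the classpath (newer Spark releases map that name to the built-in CSV reader). If it is needed, one option is Zeppelin's dynamic dependency loading, run in a %dep paragraph before the first Spark paragraph; the artifact coordinates below are an assumption and should match your Scala/Spark build:

%dep
// fetch the spark-csv package from Maven Central before the Spark interpreter starts
z.load("com.databricks:spark-csv_2.11:1.5.0")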

It was really fun to use Spark interactively in a notebook. At the very least, it is useful for learners like me to work with Scala/Spark in a notebook.


Here is the Zeppelin notebook (TitanicKaggle), cells followed by their output:
import java.io.File
import org.apache.commons.io.FileUtils
import org.apache.spark.ml.classification.LogisticRegression
import org.apache.spark.ml.evaluation.BinaryClassificationEvaluator
import org.apache.spark.ml.feature._
import org.apache.spark.ml.tuning.{CrossValidator, ParamGridBuilder}
import org.apache.spark.ml.{Pipeline, PipelineModel}
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.types._
import org.apache.spark.{SparkConf, SparkContext}

val csv = sqlContext.read.format("com.databricks.spark.csv")
  .option("header", "true")
  .load(...)   // path to the Titanic train.csv

csv: org.apache.spark.sql.DataFrame = [PassengerId: string, Survived: string ... 10 more fields]

csv.printSchema()
root
 |-- PassengerId: string (nullable = true)
 |-- Survived: string (nullable = true)
 |-- Pclass: string (nullable = true)
 |-- Name: string (nullable = true)
 |-- Sex: string (nullable = true)
 |-- Age: string (nullable = true)
 |-- SibSp: string (nullable = true)
 |-- Parch: string (nullable = true)
 |-- Ticket: string (nullable = true)
 |-- Fare: string (nullable = true)
 |-- Cabin: string (nullable = true)
 |-- Embarked: string (nullable = true)


val df = csv.select(
  $"Survived".as("label").cast(DoubleType),
  $"Age".as("age").cast(IntegerType),
  $"Fare".as("fare").cast(DoubleType),
  $"Pclass".as("class").cast(DoubleType),
  $"Sex".as("sex"),
  $"Name".as("name")
)

df: org.apache.spark.sql.DataFrame = [label: double, age: int ... 4 more fields]

df.printSchema()
root
 |-- label: double (nullable = true)
 |-- age: integer (nullable = true)
 |-- fare: double (nullable = true)
 |-- class: double (nullable = true)
 |-- sex: string (nullable = true)
 |-- name: string (nullable = true)


df.show()

+-----+----+-------+-----+------+--------------------+
|label| age|   fare|class|   sex|                name|
+-----+----+-------+-----+------+--------------------+
|  0.0|  22|   7.25|  3.0|  male|Braund, Mr. Owen ...|
|  1.0|  38|71.2833|  1.0|female|Cumings, Mrs. Joh...|
|  1.0|  26|  7.925|  3.0|female|Heikkinen, Miss. ...|
|  1.0|  35|   53.1|  1.0|female|Futrelle, Mrs. Ja...|
|  0.0|  35|   8.05|  3.0|  male|Allen, Mr. Willia...|
|  0.0|null| 8.4583|  3.0|  male|    Moran, Mr. James|
|  0.0|  54|51.8625|  1.0|  male|McCarthy, Mr. Tim...|
|  0.0|   2| 21.075|  3.0|  male|Palsson, Master. ...|
|  1.0|  27|11.1333|  3.0|female|Johnson, Mrs. Osc...|
|  1.0|  14|30.0708|  2.0|female|Nasser, Mrs. Nich...|
|  1.0|   4|   16.7|  3.0|female|Sandstrom, Miss. ...|
|  1.0|  58|  26.55|  1.0|female|Bonnell, Miss. El...|
|  0.0|  20|   8.05|  3.0|  male|Saundercock, Mr. ...|
|  0.0|  39| 31.275|  3.0|  male|Andersson, Mr. An...|
|  0.0|  14| 7.8542|  3.0|female|Vestrom, Miss. Hu...|
|  1.0|  55|   16.0|  2.0|female|Hewlett, Mrs. (Ma...|
|  0.0|   2| 29.125|  3.0|  male|Rice, Master. Eugene|
|  1.0|null|   13.0|  2.0|  male|Williams, Mr. Cha...|
|  0.0|  31|   18.0|  3.0|female|Vander Planke, Mr...|
|  1.0|null|  7.225|  3.0|female|Masselmani, Mrs. ...|
+-----+----+-------+-----+------+--------------------+
only showing top 20 rows

df.describe(df.columns: _*).show()

+-------+-------------------+------------------+-----------------+------------------+------+--------------------+
|summary|              label|               age|             fare|             class|   sex|                name|
+-------+-------------------+------------------+-----------------+------------------+------+--------------------+
|  count|                891|               714|              891|               891|   891|                 891|
|   mean| 0.3838383838383838|29.712885154061624| 32.2042079685746| 2.308641975308642|  null|                null|
| stddev|0.48659245426485753|14.529273128376575|49.69342859718089|0.8360712409770491|  null|                null|
|    min|                0.0|                 0|              0.0|               1.0|female|"Andersson, Mr. A...|
|    max|                1.0|                80|         512.3292|               3.0|  male|van Melkebeke, Mr...|
+-------+-------------------+------------------+-----------------+------------------+------+--------------------+

val select = df.na.fill(Map("age" -> 30, "fare" -> 32.2))
select: org.apache.spark.sql.DataFrame = [label: double, age: int ... 4 more fields]

val Array(trainSet, validationSet) = select.randomSplit(Array(0.75, 0.25))
trainSet: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [label: double, age: int ... 4 more fields]
validationSet: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [label: double, age: int ... 4 more fields]

// The stages of our pipeline
val sexIndexer = new StringIndexer().setInputCol("sex").setOutputCol("sexIndex")
val classEncoder = new OneHotEncoder().setInputCol("class").setOutputCol("classVec")
val tokenizer = new Tokenizer().setInputCol("name").setOutputCol("words")
val hashingTF = new HashingTF().setInputCol(tokenizer.getOutputCol).setOutputCol("hash")
val vectorAssembler = new VectorAssembler()
  .setInputCols(Array("hash", "age", "fare", "sexIndex", "classVec"))
  .setOutputCol("features")
val logisticRegression = new LogisticRegression()
val pipeline = new Pipeline()
  .setStages(Array(sexIndexer, classEncoder, tokenizer, hashingTF, vectorAssembler, logisticRegression))

sexIndexer: org.apache.spark.ml.feature.StringIndexer = strIdx_9a6568bd71ad
classEncoder: org.apache.spark.ml.feature.OneHotEncoder = oneHot_d8409501e075
tokenizer: org.apache.spark.ml.feature.Tokenizer = tok_024529536be6
hashingTF: org.apache.spark.ml.feature.HashingTF = hashingTF_dbfc7eaf30f2
vectorAssembler: org.apache.spark.ml.feature.VectorAssembler = vecAssembler_0c2becd4ab86
logisticRegression: org.apache.spark.ml.classification.LogisticRegression = logreg_a92cd69c19ea
pipeline: org.apache.spark.ml.Pipeline = pipeline_d00b1d3771e8

val crossValidator = new CrossValidator()
  .setEstimator(pipeline)
  .setEvaluator(new BinaryClassificationEvaluator)

crossValidator: org.apache.spark.ml.tuning.CrossValidator = cv_ee05a0b10cc4

// set the params
val paramGrid = new ParamGridBuilder()
  .addGrid(hashingTF.numFeatures, Array(2, 5, 1000))
  .addGrid(logisticRegression.regParam, Array(1, 0.1, 0.01))
  .addGrid(logisticRegression.maxIter, Array(10, 50, 100))
  .build()
crossValidator.setEstimatorParamMaps(paramGrid)

paramGrid: Array[org.apache.spark.ml.param.ParamMap] = Array({
  logreg_a92cd69c19ea-maxIter: 10,
  hashingTF_dbfc7eaf30f2-numFeatures: 2,
  logreg_a92cd69c19ea-regParam: 1.0
}, {
  logreg_a92cd69c19ea-maxIter: 50,
  hashingTF_dbfc7eaf30f2-numFeatures: 2,
  logreg_a92cd69c19ea-regParam: 1.0
}, {
  logreg_a92cd69c19ea-maxIter: 100,
  hashingTF_dbfc7eaf30f2-numFeatures: 2,
  logreg_a92cd69c19ea-regParam: 1.0
}, {
  logreg_a92cd69c19ea-maxIter: 10,
  hashingTF_dbfc7eaf30f2-numFeatures: 5,
  logreg_a92cd69c19ea-regParam: 1.0
}, {
  logreg_a92cd69c19ea-maxIter: 50,
  hashingTF_dbfc7eaf30f2-numFeatures: 5,
  logreg_a92cd69c19ea-regParam: 1.0
}, {
  logreg_a92cd69c19ea-maxIter: 100,
  hashingTF_dbfc7eaf30f2-numFeatures: 5,
  logreg_a92cd69c19ea-regParam: 1.0
}, {
  logreg_a92cd69c19ea-maxIter: 10,
  hashingTF_db...

res18: crossValidator.type = cv_ee05a0b10cc4

crossValidator.setNumFolds(3)
res19: crossValidator.type = cv_ee05a0b10cc4
val cvModel = crossValidator.fit(trainSet)
cvModel: org.apache.spark.ml.tuning.CrossValidatorModel = cv_ee05a0b10cc4
for (stage <- cvModel.bestModel.asInstanceOf[PipelineModel].stages) println(stage.explainParams())

handleInvalid: how to handle invalid entries. Options are skip (which will filter out rows with bad values), or error (which will throw an error). More options may be added later (default: error)
inputCol: input column name (current: sex)
outputCol: output column name (default: strIdx_9a6568bd71ad__output, current: sexIndex)
dropLast: whether to drop the last category (default: true)
inputCol: input column name (current: class)
outputCol: output column name (default: oneHot_d8409501e075__output, current: classVec)
inputCol: input column name (current: name)
outputCol: output column name (default: tok_024529536be6__output, current: words)
binary: If true, all non zero counts are set to 1. This is useful for discrete probabilistic models that model binary events rather than integer counts (default: false)
inputCol: input column name (current: words)
numFeatures: number of features (> 0) (default: 262144, current: 2)
outputCol: output column name (default: hashingTF_dbfc7eaf30f2__output, current: hash)
inputCols: input column names (current: [Ljava.lang.String;@54455089)
outputCol: output column name (default: vecAssembler_0c2becd4ab86__output, current: features)
elasticNetParam: the ElasticNet mixing parameter, in range [0, 1]. For alpha = 0, the penalty is an L2 penalty. For alpha = 1, it is an L1 penalty (default: 0.0)
featuresCol: features column name (default: features)
fitIntercept: whether to fit an intercept term (default: true)
labelCol: label column name (default: label)
maxIter: maximum number of iterations (>= 0) (default: 100, current: 50)
predictionCol: prediction column name (default: prediction)
probabilityCol: Column name for predicted class conditional probabilities. Note: Not all models output well-calibrated probability estimates! These probabilities should be treated as confidences, not precise probabilities (default: probability)
rawPredictionCol: raw prediction (a.k.a. confidence) column name (default: rawPrediction)
regParam: regularization parameter (>= 0) (default: 0.0, current: 0.01)
standardization: whether to standardize the training features before fitting the model (default: true)
threshold: threshold in binary classification prediction, in range [0, 1] (default: 0.5)
thresholds: Thresholds in multi-class classification to adjust the probability of predicting each class. Array must have length equal to the number of classes, with values >= 0. The class with largest value p/t is predicted, where p is the original probability of that class and t is the class' threshold (undefined)
tol: the convergence tolerance for iterative algorithms (default: 1.0E-6)
weightCol: weight column name. If this is not set or empty, we treat all instance weights as 1.0 (undefined)
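Rather than scanning that dump for the "current:" values, the winning hyper-parameters can be read back from the best model programmatically. A small sketch under the stage order used above; the extra import of LogisticRegressionModel is an addition, not part of the original notebook:

import org.apache.spark.ml.classification.LogisticRegressionModel

val bestPipeline = cvModel.bestModel.asInstanceOf[PipelineModel]
// HashingTF is a plain Transformer, so the fitted pipeline keeps the stage itself (index 3 in the stage array)
val bestHashingTF = bestPipeline.stages(3).asInstanceOf[HashingTF]
// the last stage of the fitted pipeline is the trained logistic regression model
val bestLR = bestPipeline.stages.last.asInstanceOf[LogisticRegressionModel]
println(s"numFeatures=${bestHashingTF.getNumFeatures} regParam=${bestLR.getRegParam} maxIter=${bestLR.getMaxIter}")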

val validationPredictions = cvModel.transform(validationSet)
validationPredictions: org.apache.spark.sql.DataFrame = [label: double, age: int ... 12 more fields]
val binaryClassificationEvaluator: BinaryClassificationEvaluator = new BinaryClassificationEvaluator()
binaryClassificationEvaluator: org.apache.spark.ml.evaluation.BinaryClassificationEvaluator = binEval_24074166d153
println(s"${binaryClassificationEvaluator.getMetricName} ${binaryClassificationEvaluator.evaluate(validationPredictions)}")
areaUnderROC 0.839148351648352
val total = validationPredictions.count()
total: Long = 218
val goodPredictionCount = validationPredictions
  .filter(validationPredictions("label") === validationPredictions("prediction"))
  .count()
goodPredictionCount: Long = 173
println(s"correct prediction percentage : ${goodPredictionCount / total.toDouble}")
correct prediction percentage : 0.7935779816513762

Sunday, 21 August 2016

Effectiveness of weight lifting exercise

mysoln.R