-
Notifications
You must be signed in to change notification settings - Fork 119
/
Copy pathtitanic.py
68 lines (44 loc) · 1.92 KB
/
titanic.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
import pyspark.ml.classification as cl
from pyspark.ml.feature import PCA
from pyspark.ml.feature import StringIndexer,OneHotEncoderEstimator,VectorAssembler
from pyspark.ml import Pipeline
class Titanic:
    """Assemble, fit and apply a Spark ML pipeline that predicts ``Survived``.

    The input CSV is expected to provide the columns this class references:
    ``Sex``, ``Embarked``, ``Pclass``, ``Age``, ``Fare`` and ``Survived``.

    Intended call order::

        load -> clean -> create_preprocessors -> dimensionaity_reduction
             -> create_estimators -> create_pipeline -> split_data
             -> fit -> predict

    ``create_estimators`` reads the ``feature_data`` column, which is only
    produced by the PCA stage, so ``dimensionaity_reduction`` must be called
    before the pipeline is fitted.
    """

    def __init__(self, spark, input_data, output_data):
        """Store the Spark session and the input/output locations.

        Args:
            spark: Session object whose ``read.csv`` is used by ``load``.
            input_data: Path of the CSV file to read.
            output_data: Output location; stored but not used by the methods
                of this class.
        """
        self.spark = spark
        self.input = input_data
        self.output = output_data
        # Start with an empty stage list so the stage-appending methods do not
        # fail with AttributeError when create_preprocessors() has not run.
        self.stages = []

    def load(self):
        """Read the input CSV (header row, inferred schema) and cache it."""
        self.data_df = self.spark.read.csv(
            self.input, inferSchema=True, header=True
        )
        self.data_df.cache()

    def clean(self):
        """Fill missing ``Embarked`` values with ``'S'`` and ``Age`` with 29."""
        # Two separate fillna calls on purpose: each value is applied only to
        # its own column subset.
        self.data_df = (
            self.data_df
            .fillna('S', ['Embarked'])
            .fillna(29, ['Age'])
        )

    def create_preprocessors(self):
        """Reset the stage list and add indexing, encoding and assembling.

        Stages, in order: one ``StringIndexer`` per categorical column
        (``<col>_si``), one one-hot encoder over the indexed columns
        (``<col>_en``), and a ``VectorAssembler`` that combines the numeric
        and encoded columns into ``feature_vec``.
        """
        cat_cols = ['Sex', 'Embarked']
        num_cols = ['Pclass', 'Age', 'Fare']
        # Derive the intermediate column names from cat_cols so the indexer
        # outputs and the encoder inputs cannot drift apart.
        indexed_cols = [name + '_si' for name in cat_cols]
        encoded_cols = [name + '_en' for name in cat_cols]

        # Rebinding (not extending) keeps repeated calls from duplicating
        # the preprocessing stages.
        self.stages = [
            StringIndexer(inputCol=name, outputCol=indexed)
            for name, indexed in zip(cat_cols, indexed_cols)
        ]
        # NOTE(review): OneHotEncoderEstimator exists in Spark 2.3/2.4 only;
        # Spark 3.0 renamed it to OneHotEncoder. Update the import when the
        # project moves to Spark 3.
        self.stages.append(
            OneHotEncoderEstimator(
                inputCols=indexed_cols, outputCols=encoded_cols
            )
        )
        self.stages.append(
            VectorAssembler(
                inputCols=num_cols + encoded_cols, outputCol='feature_vec'
            )
        )

    def dimensionaity_reduction(self, k=3):
        """Append a PCA stage mapping ``feature_vec`` to ``feature_data``.

        Args:
            k: Number of principal components to keep (default 3).
        """
        self.stages.append(
            PCA(k=k, inputCol='feature_vec', outputCol='feature_data')
        )

    # Correctly spelled alias; the misspelled name stays for existing callers.
    dimensionality_reduction = dimensionaity_reduction

    def create_estimators(self):
        """Append a logistic-regression stage trained on ``feature_data``."""
        self.stages.append(
            cl.LogisticRegression(
                maxIter=10,
                regParam=0.01,
                labelCol='Survived',
                featuresCol='feature_data',
            )
        )

    def create_pipeline(self):
        """Build the ``Pipeline`` from the stages collected so far."""
        self.pipeline = Pipeline(stages=self.stages)

    def split_data(self, weights=(0.7, 0.3), seed=None):
        """Randomly split the loaded data.

        Args:
            weights: Relative sizes of the splits; defaults to 70/30.
            seed: Optional seed forwarded to ``randomSplit`` so that a split
                can be reproduced. ``None`` keeps the unseeded behaviour.

        Returns:
            Whatever ``data_df.randomSplit`` returns for the given weights.
        """
        return self.data_df.randomSplit(list(weights), seed)

    def fit(self, train):
        """Fit the pipeline on ``train`` and keep the fitted model."""
        self.pipeline_model = self.pipeline.fit(train)

    def predict(self, test):
        """Return ``test`` transformed by the fitted pipeline model."""
        return self.pipeline_model.transform(test)