data_etl.py

# -*- coding: utf-8 -*-
# Python 2 script: uses the ConfigParser module and xrange
import ConfigParser
import time

from pyspark import SparkContext

'''Helper functions'''
def to_timestamp(time_point):
    '''Parse a "YYYY-mm-dd HH:MM:SS" string into a Unix timestamp (seconds).'''
    d = time.strptime(time_point, "%Y-%m-%d %H:%M:%S")
    return time.mktime(d)

def to_datetime(time_interval):
    '''Convert a Unix timestamp (seconds) into a "YYYY-mm-dd HH:MM:SS" string.'''
    x = time.localtime(float(time_interval))
    return time.strftime('%Y-%m-%d %H:%M:%S', x)
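
# Round-trip sanity check, a hypothetical example rather than part of the
# original script (both helpers use the local timezone, so the round trip
# holds outside DST edge cases):
#   to_datetime(to_timestamp('2016-07-27 16:00:00')) == '2016-07-27 16:00:00'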

def etl(line, idx):
    '''Keep only the fields listed in idx (a comma-separated string of
    column indexes) from a tab-separated log line, converting the second
    selected field from a Unix timestamp to a datetime string.'''
    idx = [int(i) for i in idx.split(',')]
    fields = line.split('\t')
    rst = []
    for i in xrange(len(idx)):
        if i == 1:
            # The second selected field is assumed to hold a Unix timestamp
            fields[idx[i]] = to_datetime(fields[idx[i]])
        rst.append(fields[idx[i]])
    return ','.join(rst)
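
# A hypothetical worked example (the sample line and indexes are illustrative
# assumptions, not taken from the real log format): with
# extract_fields_indexes = "0,3" and the input line
#   "user42\tGET\t/index\t1469635200\t200"
# etl returns "user42,2016-07-27 16:00:00" (shown in UTC; to_datetime uses
# the local timezone, so the exact string depends on where the job runs).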

'''Read configuration'''
config = ConfigParser.ConfigParser()
config.read('para.conf')
# Spark connection parameters, [spark_conf] section
spark_host = config.get('spark_conf', 'spark_host')
spark_mode = config.get('spark_conf', 'spark_mode')
# Job parameters, [etl] section
app_name = config.get('etl', 'app_name')
read_data = config.get('etl', 'etl_read_data')
write_data = config.get('etl', 'etl_write_data')
extract_fields_names = config.get('etl', 'extract_fields_names')  # read but unused below
extract_fields_indexes = config.get('etl', 'extract_fields_indexes')
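
# A minimal sketch of what para.conf might look like; every value below is
# an assumption for illustration, not taken from the repository:
#   [spark_conf]
#   spark_host = hdfs://namenode:9000
#   spark_mode = local[*]
#
#   [etl]
#   app_name = data_etl
#   etl_read_data = /logs/raw/
#   etl_write_data = /logs/etl_out/
#   extract_fields_names = user_id,time
#   extract_fields_indexes = 0,3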

'''Data processing'''
sc = SparkContext(spark_mode, app_name)  # SparkContext(master, appName)
netlogs = sc.textFile(spark_host + read_data)
# Extract the configured fields, then sort by the first two output columns
st_data = netlogs.map(lambda line: etl(line, extract_fields_indexes)) \
                 .sortBy(lambda x: (x.split(',')[0], x.split(',')[1]))
st_data.saveAsTextFile(spark_host + write_data)
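
# To run (assuming a Spark installation with Python 2 workers and para.conf
# in the driver's working directory; note saveAsTextFile fails if the
# output path already exists):
#   spark-submit data_etl.py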