-
Notifications
You must be signed in to change notification settings - Fork 4
/
PMS7003.py
185 lines (127 loc) · 6.19 KB
/
PMS7003.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
"""
* PMS7003 데이터 수신 프로그램
* 수정 : 2018. 11. 19
* 제작 : eleparts 부설연구소
* SW ver. 1.0.2
> 관련자료
파이썬 라이브러리
https://docs.python.org/3/library/struct.html
점프 투 파이썬
https://wikidocs.net/book/1
PMS7003 datasheet
http://eleparts.co.kr/data/_gextends/good-pdf/201803/good-pdf-4208690-1.pdf
"""
import serial
import struct
import time
class PMS7003(object):
# PMS7003 protocol data (HEADER 2byte + 30byte)
PMS_7003_PROTOCOL_SIZE = 32
# PMS7003 data list
HEADER_HIGH = 0 # 0x42
HEADER_LOW = 1 # 0x4d
FRAME_LENGTH = 2 # 2x13+2(data+check bytes)
DUST_PM1_0_CF1 = 3 # PM1.0 concentration unit μ g/m3(CF=1,standard particle)
DUST_PM2_5_CF1 = 4 # PM2.5 concentration unit μ g/m3(CF=1,standard particle)
DUST_PM10_0_CF1 = 5 # PM10 concentration unit μ g/m3(CF=1,standard particle)
DUST_PM1_0_ATM = 6 # PM1.0 concentration unit μ g/m3(under atmospheric environment)
DUST_PM2_5_ATM = 7 # PM2.5 concentration unit μ g/m3(under atmospheric environment)
DUST_PM10_0_ATM = 8 # PM10 concentration unit μ g/m3 (under atmospheric environment)
DUST_AIR_0_3 = 9 # indicates the number of particles with diameter beyond 0.3 um in 0.1 L of air.
DUST_AIR_0_5 = 10 # indicates the number of particles with diameter beyond 0.5 um in 0.1 L of air.
DUST_AIR_1_0 = 11 # indicates the number of particles with diameter beyond 1.0 um in 0.1 L of air.
DUST_AIR_2_5 = 12 # indicates the number of particles with diameter beyond 2.5 um in 0.1 L of air.
DUST_AIR_5_0 = 13 # indicates the number of particles with diameter beyond 5.0 um in 0.1 L of air.
DUST_AIR_10_0 = 14 # indicates the number of particles with diameter beyond 10 um in 0.1 L of air.
RESERVEDF = 15 # Data13 Reserved high 8 bits
RESERVEDB = 16 # Data13 Reserved low 8 bits
CHECKSUM = 17 # Checksum code
# header check
def header_chk(self, buffer):
if (buffer[self.HEADER_HIGH] == 66 and buffer[self.HEADER_LOW] == 77):
return True
else:
return False
# chksum value calculation
def chksum_cal(self, buffer):
buffer = buffer[0:self.PMS_7003_PROTOCOL_SIZE]
# data unpack (Byte -> Tuple (30 x unsigned char <B> + unsigned short <H>))
chksum_data = struct.unpack('!30BH', buffer)
chksum = 0
for i in range(30):
chksum = chksum + chksum_data[i]
return chksum
# checksum check
def chksum_chk(self, buffer):
chk_result = self.chksum_cal(buffer)
chksum_buffer = buffer[30:self.PMS_7003_PROTOCOL_SIZE]
chksum = struct.unpack('!H', chksum_buffer)
if (chk_result == chksum[0]):
return True
else:
return False
# protocol size(small) check
def protocol_size_chk(self, buffer):
if(self.PMS_7003_PROTOCOL_SIZE <= len(buffer)):
return True
else:
return False
# protocol check
def protocol_chk(self, buffer):
if(self.protocol_size_chk(buffer)):
if(self.header_chk(buffer)):
if(self.chksum_chk(buffer)):
return True
else:
print("Chksum err")
else:
print("Header err")
else:
print("Protol err")
return False
# unpack data
# <Tuple (13 x unsigned short <H> + 2 x unsigned char <B> + unsigned short <H>)>
def unpack_data(self, buffer):
buffer = buffer[0:self.PMS_7003_PROTOCOL_SIZE]
# data unpack (Byte -> Tuple (13 x unsigned short <H> + 2 x unsigned char <B> + unsigned short <H>))
data = struct.unpack('!2B13H2BH', buffer)
return data
def print_serial(self, buffer):
chksum = self.chksum_cal(buffer)
data = self.unpack_data(buffer)
print ("============================================================================")
print ("Header : %c %c \t\t | Frame length : %s" % (data[self.HEADER_HIGH], data[self.HEADER_LOW], data[self.FRAME_LENGTH]))
print ("PM 1.0 (CF=1) : %s\t | PM 1.0 : %s" % (data[self.DUST_PM1_0_CF1], data[self.DUST_PM1_0_ATM]))
print ("PM 2.5 (CF=1) : %s\t | PM 2.5 : %s" % (data[self.DUST_PM2_5_CF1], data[self.DUST_PM2_5_ATM]))
print ("PM 10.0 (CF=1) : %s\t | PM 10.0 : %s" % (data[self.DUST_PM10_0_CF1], data[self.DUST_PM10_0_ATM]))
print ("0.3um in 0.1L of air : %s" % (data[self.DUST_AIR_0_3]))
print ("0.5um in 0.1L of air : %s" % (data[self.DUST_AIR_0_5]))
print ("1.0um in 0.1L of air : %s" % (data[self.DUST_AIR_1_0]))
print ("2.5um in 0.1L of air : %s" % (data[self.DUST_AIR_2_5]))
print ("5.0um in 0.1L of air : %s" % (data[self.DUST_AIR_5_0]))
print ("10.0um in 0.1L of air : %s" % (data[self.DUST_AIR_10_0]))
print ("Reserved F : %s | Reserved B : %s" % (data[self.RESERVEDF],data[self.RESERVEDB]))
print ("CHKSUM : %s | read CHKSUM : %s | CHKSUM result : %s" % (chksum, data[self.CHECKSUM], chksum == data[self.CHECKSUM]))
print ("============================================================================")
# UART / USB Serial : 'dmesg | grep ttyUSB'
USB0 = '/dev/ttyUSB0'
UART = '/dev/ttyAMA0'
# USE PORT
SERIAL_PORT = UART
# Baud Rate
Speed = 9600
# example
if __name__=='__main__':
#serial setting
ser = serial.Serial(SERIAL_PORT, Speed, timeout = 1)
dust = PMS7003()
while True:
ser.flushInput()
buffer = ser.read(1024)
if(dust.protocol_chk(buffer)):
print("DATA read success")
# print data
dust.print_serial(buffer)
else:
print("DATA read fail...")
ser.close()