Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
- Loading branch information
Showing
8 changed files
with
146 additions
and
131 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,22 +1,22 @@ | ||
from sgp30 import SGP30 | ||
import time | ||
import sys | ||
|
||
sgp30 = SGP30() | ||
|
||
# result = sgp30.command('set_baseline', (0xFECA, 0xBEBA)) | ||
# result = sgp30.command('get_baseline') | ||
# print(["{:02x}".format(n) for n in result]) | ||
|
||
print("Sensor warming up, please wait...") | ||
def crude_progress_bar(): | ||
sys.stdout.write('.') | ||
sys.stdout.flush() | ||
|
||
sgp30.start_measurement(crude_progress_bar) | ||
sys.stdout.write('\n') | ||
|
||
while True: | ||
result = sgp30.get_air_quality() | ||
print(result) | ||
time.sleep(1.0) | ||
from sgp30 import SGP30 | ||
import time | ||
import sys | ||
|
||
sgp30 = SGP30() | ||
|
||
# result = sgp30.command('set_baseline', (0xFECA, 0xBEBA)) | ||
# result = sgp30.command('get_baseline') | ||
# print(["{:02x}".format(n) for n in result]) | ||
|
||
print("Sensor warming up, please wait...") | ||
def crude_progress_bar(): | ||
sys.stdout.write('.') | ||
sys.stdout.flush() | ||
|
||
sgp30.start_measurement(crude_progress_bar) | ||
sys.stdout.write('\n') | ||
|
||
while True: | ||
result = sgp30.get_air_quality() | ||
print(result) | ||
time.sleep(1.0) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -2,7 +2,7 @@ | |
import time | ||
|
||
|
||
__version__ = '0.0.1' | ||
__version__ = '0.0.2' | ||
|
||
SGP30_I2C_ADDR = 0x58 | ||
|
||
|
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,22 +1,22 @@ | ||
from tools import MockI2CDev, MockI2CMsg | ||
|
||
|
||
def test_setup(): | ||
from sgp30 import SGP30 | ||
dev = MockI2CDev() | ||
assert dev._open is True | ||
sgp30 = SGP30(i2c_dev=dev, i2c_msg=MockI2CMsg()) | ||
del sgp30 | ||
assert dev._open is False | ||
|
||
|
||
def test_get_unique_id(): | ||
from sgp30 import SGP30 | ||
sgp30 = SGP30(i2c_dev=MockI2CDev(), i2c_msg=MockI2CMsg()) | ||
assert sgp30.get_unique_id() == 0xffffffffffff | ||
|
||
|
||
def test_get_feature_set_version(): | ||
from sgp30 import SGP30 | ||
sgp30 = SGP30(i2c_dev=MockI2CDev(), i2c_msg=MockI2CMsg()) | ||
assert sgp30.get_feature_set_version() == (0xc, 0xfe) | ||
from tools import MockI2CDev, MockI2CMsg | ||
|
||
|
||
def test_setup(): | ||
from sgp30 import SGP30 | ||
dev = MockI2CDev() | ||
assert dev._open is True | ||
sgp30 = SGP30(i2c_dev=dev, i2c_msg=MockI2CMsg()) | ||
del sgp30 | ||
assert dev._open is False | ||
|
||
|
||
def test_get_unique_id(): | ||
from sgp30 import SGP30 | ||
sgp30 = SGP30(i2c_dev=MockI2CDev(), i2c_msg=MockI2CMsg()) | ||
assert sgp30.get_unique_id() == 0xffffffffffff | ||
|
||
|
||
def test_get_feature_set_version(): | ||
from sgp30 import SGP30 | ||
sgp30 = SGP30(i2c_dev=MockI2CDev(), i2c_msg=MockI2CMsg()) | ||
assert sgp30.get_feature_set_version() == (0xc, 0xfe) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,81 +1,81 @@ | ||
import struct | ||
|
||
|
||
class MockI2CDev(): | ||
def __init__(self): | ||
self._open = True | ||
self._last_write = None | ||
|
||
def i2c_rdwr(self, a, b=None): | ||
if isinstance(a, MockI2CMsgW): | ||
self._last_write = a.process() | ||
if isinstance(a, MockI2CMsgR): | ||
a.process(self._last_write) | ||
if b is not None: | ||
b.process(self._last_write) | ||
|
||
def close(self): | ||
self._open = False | ||
|
||
|
||
class MockI2CMsg(): | ||
def write(self, i2c_addr, data): | ||
return MockI2CMsgW(i2c_addr, data) | ||
|
||
def read(self, i2c_addr, response_len): | ||
return MockI2CMsgR(i2c_addr, response_len) | ||
|
||
|
||
class MockI2CMsgR(): | ||
def __init__(self, i2c_addr, response_len): | ||
self._num_words = response_len // 3 | ||
self.buf = None | ||
|
||
def process(self, last_command): | ||
result = [0, 0, 0] | ||
if last_command[0] == 0x3682: # get_serial_id | ||
result = [0xffff, 0xffff, 0xffff] | ||
if last_command[0] == 0x202f: # get_feature_set_version | ||
result = [0xCAFE, 0x0000, 0x0000] | ||
|
||
buf = [] | ||
|
||
for i in range(self._num_words): | ||
word = result[i] | ||
packed = bytearray(struct.pack('>H', word)) | ||
buf.append(packed[0]) | ||
buf.append(packed[1]) | ||
buf.append(self.calculate_crc(word)) | ||
|
||
self.buf = bytearray(buf) | ||
|
||
def calculate_crc(self, data): | ||
"""Calculate an 8-bit CRC from a 16-bit word | ||
Defined in section 6.6 of the SGP30 datasheet. | ||
Polynominal: 0x31 (x8 + x5 + x4 + x1) | ||
Initialization: 0xFF | ||
Reflect input/output: False | ||
Final XOR: 0x00 | ||
""" | ||
crc = 0xff # Initialization value | ||
# calculates 8-Bit checksum with given polynomial | ||
for byte in [(data & 0xff00) >> 8, data & 0x00ff]: | ||
crc ^= byte | ||
for _ in range(8): | ||
if crc & 0x80: | ||
crc = (crc << 1) ^ 0x31 # XOR with polynominal | ||
else: | ||
crc <<= 1 | ||
return crc & 0xff | ||
|
||
|
||
class MockI2CMsgW(): | ||
def __init__(self, i2c_addr, data): | ||
self._data = data | ||
|
||
def process(self): | ||
command_len = (len(self._data) - 2) // 3 | ||
return struct.unpack('>H' + ('Hx' * command_len), self._data) | ||
import struct | ||
|
||
|
||
class MockI2CDev(): | ||
def __init__(self): | ||
self._open = True | ||
self._last_write = None | ||
|
||
def i2c_rdwr(self, a, b=None): | ||
if isinstance(a, MockI2CMsgW): | ||
self._last_write = a.process() | ||
if isinstance(a, MockI2CMsgR): | ||
a.process(self._last_write) | ||
if b is not None: | ||
b.process(self._last_write) | ||
|
||
def close(self): | ||
self._open = False | ||
|
||
|
||
class MockI2CMsg(): | ||
def write(self, i2c_addr, data): | ||
return MockI2CMsgW(i2c_addr, data) | ||
|
||
def read(self, i2c_addr, response_len): | ||
return MockI2CMsgR(i2c_addr, response_len) | ||
|
||
|
||
class MockI2CMsgR(): | ||
def __init__(self, i2c_addr, response_len): | ||
self._num_words = response_len // 3 | ||
self.buf = None | ||
|
||
def process(self, last_command): | ||
result = [0, 0, 0] | ||
if last_command[0] == 0x3682: # get_serial_id | ||
result = [0xffff, 0xffff, 0xffff] | ||
if last_command[0] == 0x202f: # get_feature_set_version | ||
result = [0xCAFE, 0x0000, 0x0000] | ||
|
||
buf = [] | ||
|
||
for i in range(self._num_words): | ||
word = result[i] | ||
packed = bytearray(struct.pack('>H', word)) | ||
buf.append(packed[0]) | ||
buf.append(packed[1]) | ||
buf.append(self.calculate_crc(word)) | ||
|
||
self.buf = bytearray(buf) | ||
|
||
def calculate_crc(self, data): | ||
"""Calculate an 8-bit CRC from a 16-bit word | ||
Defined in section 6.6 of the SGP30 datasheet. | ||
Polynominal: 0x31 (x8 + x5 + x4 + x1) | ||
Initialization: 0xFF | ||
Reflect input/output: False | ||
Final XOR: 0x00 | ||
""" | ||
crc = 0xff # Initialization value | ||
# calculates 8-Bit checksum with given polynomial | ||
for byte in [(data & 0xff00) >> 8, data & 0x00ff]: | ||
crc ^= byte | ||
for _ in range(8): | ||
if crc & 0x80: | ||
crc = (crc << 1) ^ 0x31 # XOR with polynominal | ||
else: | ||
crc <<= 1 | ||
return crc & 0xff | ||
|
||
|
||
class MockI2CMsgW(): | ||
def __init__(self, i2c_addr, data): | ||
self._data = data | ||
|
||
def process(self): | ||
command_len = (len(self._data) - 2) // 3 | ||
return struct.unpack('>H' + ('Hx' * command_len), self._data) |