summaryrefslogtreecommitdiffstatshomepage
path: root/esp8266/tests/onewire.py
blob: 15fec1e55c82a806b58081d1058b67e368950806 (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
import time
import pyb
import _onewire as _ow

class OneWire:
    CMD_SEARCHROM = const(0xf0)
    CMD_READROM = const(0x33)
    CMD_MATCHROM = const(0x55)
    CMD_SKIPROM = const(0xcc)

    def __init__(self, pin):
        self.pin = pin
        self.pin.init(pin.OPEN_DRAIN, pin.PULL_NONE)

    def reset(self):
        return _ow.reset(self.pin)

    def read_bit(self):
        return _ow.readbit(self.pin)

    def read_byte(self):
        return _ow.readbyte(self.pin)

    def read_bytes(self, count):
        buf = bytearray(count)
        for i in range(count):
            buf[i] = _ow.readbyte(self.pin)
        return buf

    def write_bit(self, value):
        return _ow.writebit(self.pin, value)

    def write_byte(self, value):
        return _ow.writebyte(self.pin, value)

    def write_bytes(self, buf):
        for b in buf:
            _ow.writebyte(self.pin, b)

    def select_rom(self, rom):
        self.reset()
        self.write_byte(CMD_MATCHROM)
        self.write_bytes(rom)

    def scan(self):
        devices = []
        diff = 65
        rom = False
        for i in range(0xff):
            rom, diff = self._search_rom(rom, diff)
            if rom:
                devices += [rom]
            if diff == 0:
                break
        return devices

    def _search_rom(self, l_rom, diff):
        if not self.reset():
            return None, 0
        self.write_byte(CMD_SEARCHROM)
        if not l_rom:
            l_rom = bytearray(8)
        rom = bytearray(8)
        next_diff = 0
        i = 64
        for byte in range(8):
            r_b = 0
            for bit in range(8):
                b = self.read_bit()
                if self.read_bit():
                    if b: # there are no devices or there is an error on the bus
                        return None, 0
                else:
                    if not b: # collision, two devices with different bit meaning
                        if diff > i or ((l_rom[byte] & (1 << bit)) and diff != i):
                            b = 1
                            next_diff = i
                self.write_bit(b)
                if b:
                    r_b |= 1 << bit
                i -= 1
            rom[byte] = r_b
        return rom, next_diff

    def crc8(self, data):
        return _ow.crc8(data)

class DS18B20:
    THERM_CMD_CONVERTTEMP = const(0x44)
    THERM_CMD_RSCRATCHPAD = const(0xbe)

    def __init__(self, onewire):
        self.ow = onewire
        self.roms = []

    def scan(self):
        self.roms = []
        for rom in self.ow.scan():
            if rom[0] == 0x28:
                self.roms += [rom]
        return self.roms

    def start_measure(self):
        if not self.ow.reset():
            return False
        self.ow.write_byte(CMD_SKIPROM)
        self.ow.write_byte(THERM_CMD_CONVERTTEMP)
        return True

    def get_temp(self, rom):
        if not self.ow.reset():
            return None

        self.ow.select_rom(rom)
        self.ow.write_byte(THERM_CMD_RSCRATCHPAD)

        buf = self.ow.read_bytes(9)
        if self.ow.crc8(buf):
            return None

        return self._convert_temp(buf)

    def _convert_temp(self, data):
        temp_lsb = data[0]
        temp_msb = data[1]
        return (temp_msb << 8 | temp_lsb) / 16

# connect 1-wire temp sensors to GPIO12 for this test
def test():
    dat = pyb.Pin(12)
    ow = OneWire(dat)

    ds = DS18B20(ow)
    roms = ow.scan()
    print('found devices:', roms)

    for i in range(4):
        print('temperatures:', end=' ')
        ds.start_measure()
        time.sleep_ms(750)
        for rom in roms:
            print(ds.get_temp(rom), end=' ')
        print()

#pyb.freq(80000000)
#pyb.freq(160000000)
test()