summaryrefslogtreecommitdiffstatshomepage
diff options
context:
space:
mode:
-rw-r--r--drivers/onewire/ds18x20.py148
-rw-r--r--drivers/onewire/onewire.py389
-rw-r--r--esp8266/modules/ds18x20.py51
-rw-r--r--esp8266/modules/onewire.py91
4 files changed, 121 insertions, 558 deletions
diff --git a/drivers/onewire/ds18x20.py b/drivers/onewire/ds18x20.py
index 72adcbfd46..bf06094835 100644
--- a/drivers/onewire/ds18x20.py
+++ b/drivers/onewire/ds18x20.py
@@ -1,101 +1,51 @@
-"""
-DS18x20 temperature sensor driver for MicroPython.
-
-This driver uses the OneWire driver to control DS18S20 and DS18B20
-temperature sensors. It supports multiple devices on the same 1-wire bus.
-
-The following example assumes the ground of your DS18x20 is connected to
-Y11, vcc is connected to Y9 and the data pin is connected to Y10.
-
->>> from machine import Pin
->>> gnd = Pin('Y11', Pin.OUT_PP)
->>> gnd.off()
->>> vcc = Pin('Y9', Pin.OUT_PP)
->>> vcc.on()
-
->>> from ds18x20 import DS18X20
->>> d = DS18X20(Pin('Y10'))
-
-Call read_temps to read all sensors:
-
->>> result = d.read_temps()
->>> print(result)
-[20.875, 20.8125]
-
-Call read_temp to read the temperature of a specific sensor:
-
->>> result = d.read_temp(d.roms[0])
->>> print(result)
-20.25
-
-If only one DS18x20 is attached to the bus, then you don't need to
-pass a ROM to read_temp:
-
->>> result = d.read_temp()
->>> print(result)
-20.25
-
-"""
-
-from onewire import OneWire
-
-class DS18X20(object):
- def __init__(self, pin):
- self.ow = OneWire(pin)
- # Scan the 1-wire devices, but only keep those which have the
- # correct # first byte in their rom for a DS18x20 device.
- self.roms = [rom for rom in self.ow.scan() if rom[0] == 0x10 or rom[0] == 0x28]
-
- def read_temp(self, rom=None):
- """
- Read and return the temperature of one DS18x20 device.
- Pass the 8-byte bytes object with the ROM of the specific device you want to read.
- If only one DS18x20 device is attached to the bus you may omit the rom parameter.
- """
- rom = rom or self.roms[0]
- ow = self.ow
- ow.reset()
- ow.select_rom(rom)
- ow.write_byte(0x44) # Convert Temp
- while True:
- if ow.read_bit():
- break
- ow.reset()
- ow.select_rom(rom)
- ow.write_byte(0xbe) # Read scratch
- data = ow.read_bytes(9)
- return self.convert_temp(rom[0], data)
-
- def read_temps(self):
- """
- Read and return the temperatures of all attached DS18x20 devices.
- """
- temps = []
- for rom in self.roms:
- temps.append(self.read_temp(rom))
- return temps
-
- def convert_temp(self, rom0, data):
- """
- Convert the raw temperature data into degrees celsius and return as a float.
- """
- temp_lsb = data[0]
- temp_msb = data[1]
- if rom0 == 0x10:
- if temp_msb != 0:
- # convert negative number
- temp_read = temp_lsb >> 1 | 0x80 # truncate bit 0 by shifting, fill high bit with 1.
- temp_read = -((~temp_read + 1) & 0xff) # now convert from two's complement
+# DS18x20 temperature sensor driver for MicroPython.
+# MIT license; Copyright (c) 2016 Damien P. George
+
+from micropython import const
+
+_CONVERT = const(0x44)
+_RD_SCRATCH = const(0xbe)
+_WR_SCRATCH = const(0x4e)
+
+class DS18X20:
+ def __init__(self, onewire):
+ self.ow = onewire
+ self.buf = bytearray(9)
+
+ def scan(self):
+ return [rom for rom in self.ow.scan() if rom[0] == 0x10 or rom[0] == 0x28]
+
+ def convert_temp(self):
+ self.ow.reset(True)
+ self.ow.writebyte(self.ow.SKIP_ROM)
+ self.ow.writebyte(_CONVERT)
+
+ def read_scratch(self, rom):
+ self.ow.reset(True)
+ self.ow.select_rom(rom)
+ self.ow.writebyte(_RD_SCRATCH)
+ self.ow.readinto(self.buf)
+ if self.ow.crc8(self.buf):
+ raise Exception('CRC error')
+ return self.buf
+
+ def write_scratch(self, rom, buf):
+ self.ow.reset(True)
+ self.ow.select_rom(rom)
+ self.ow.writebyte(_WR_SCRATCH)
+ self.ow.write(buf)
+
+ def read_temp(self, rom):
+ buf = self.read_scratch(rom)
+ if rom[0] == 0x10:
+ if buf[1]:
+ t = buf[0] >> 1 | 0x80
+ t = -((~t + 1) & 0xff)
else:
- temp_read = temp_lsb >> 1 # truncate bit 0 by shifting
- count_remain = data[6]
- count_per_c = data[7]
- temp = temp_read - 0.25 + (count_per_c - count_remain) / count_per_c
- return temp
- elif rom0 == 0x28:
- temp = (temp_msb << 8 | temp_lsb) / 16
- if (temp_msb & 0xf8) == 0xf8: # for negative temperature
- temp -= 0x1000
- return temp
+ t = buf[0] >> 1
+ return t - 0.25 + (buf[7] - buf[6]) / buf[7]
else:
- assert False
+ t = buf[1] << 8 | buf[0]
+ if t & 0x8000: # sign bit set
+ t = -((t ^ 0xffff) + 1)
+ return t / 16
diff --git a/drivers/onewire/onewire.py b/drivers/onewire/onewire.py
index c8016c0dac..546a69b304 100644
--- a/drivers/onewire/onewire.py
+++ b/drivers/onewire/onewire.py
@@ -1,336 +1,91 @@
-"""
-OneWire library ported to MicroPython by Jason Hildebrand.
+# 1-Wire driver for MicroPython
+# MIT license; Copyright (c) 2016 Damien P. George
+from micropython import const
+import _onewire as _ow
-TODO:
- * implement and test parasite-power mode (as an init option)
- * port the crc checks
-
-The original upstream copyright and terms follow.
-------------------------------------------------------------------------------
-
-Copyright (c) 2007, Jim Studt (original old version - many contributors since)
-
-OneWire has been maintained by Paul Stoffregen (paul@pjrc.com) since
-January 2010.
-
-26 Sept 2008 -- Robin James
-
-Jim Studt's original library was modified by Josh Larios.
-
-Tom Pollard, pollard@alum.mit.edu, contributed around May 20, 2008
-
-Permission is hereby granted, free of charge, to any person obtaining
-a copy of this software and associated documentation files (the
-"Software"), to deal in the Software without restriction, including
-without limitation the rights to use, copy, modify, merge, publish,
-distribute, sublicense, and/or sell copies of the Software, and to
-permit persons to whom the Software is furnished to do so, subject to
-the following conditions:
-
-The above copyright notice and this permission notice shall be
-included in all copies or substantial portions of the Software.
-
-THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
-EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
-MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
-NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE
-LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
-OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
-WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
-
-Much of the code was inspired by Derek Yerger's code, though I don't
-think much of that remains. In any event that was..
- (copyleft) 2006 by Derek Yerger - Free to distribute freely.
-"""
-
-import pyb
-from pyb import disable_irq
-from pyb import enable_irq
+class OneWireError(Exception):
+ pass
class OneWire:
- def __init__(self, pin):
- """
- Pass the data pin connected to your one-wire device(s), for example Pin('X1').
- The one-wire protocol allows for multiple devices to be attached.
- """
- self.data_pin = pin
- self.write_delays = (1, 40, 40, 1)
- self.read_delays = (1, 1, 40)
-
- # cache a bunch of methods and attributes. This is necessary in _write_bit and
- # _read_bit to achieve the timing required by the OneWire protocol.
- self.cache = (pin.init, pin.value, pin.OUT_PP, pin.IN, pin.PULL_UP)
-
- pin.init(pin.IN, pin.PULL_UP)
-
- def reset(self):
- """
- Perform the onewire reset function.
- Returns 1 if a device asserted a presence pulse, 0 otherwise.
-
- If you receive 0, then check your wiring and make sure you are providing
- power and ground to your devices.
- """
- retries = 25
- self.data_pin.init(self.data_pin.IN, self.data_pin.PULL_UP)
-
- # We will wait up to 250uS for
- # the bus to come high, if it doesn't then it is broken or shorted
- # and we return a 0;
-
- # wait until the wire is high... just in case
- while True:
- if self.data_pin.value():
- break
- retries -= 1
- if retries == 0:
- raise OSError("OneWire pin didn't go high")
- pyb.udelay(10)
-
- # pull the bus low for at least 480us
- self.data_pin.low()
- self.data_pin.init(self.data_pin.OUT_PP)
- pyb.udelay(480)
+ SEARCH_ROM = const(0xf0)
+ MATCH_ROM = const(0x55)
+ SKIP_ROM = const(0xcc)
- # If there is a slave present, it should pull the bus low within 60us
- i = pyb.disable_irq()
- self.data_pin.init(self.data_pin.IN, self.data_pin.PULL_UP)
- pyb.udelay(70)
- presence = not self.data_pin.value()
- pyb.enable_irq(i)
- pyb.udelay(410)
- return presence
-
- def write_bit(self, value):
- """
- Write a single bit.
- """
- pin_init, pin_value, Pin_OUT_PP, Pin_IN, Pin_PULL_UP = self.cache
- self._write_bit(value, pin_init, pin_value, Pin_OUT_PP)
+ def __init__(self, pin):
+ self.pin = pin
+ self.pin.init(pin.OPEN_DRAIN)
- def _write_bit(self, value, pin_init, pin_value, Pin_OUT_PP):
- """
- Write a single bit - requires cached methods/attributes be passed as arguments.
- See also write_bit()
- """
- d0, d1, d2, d3 = self.write_delays
- udelay = pyb.udelay
- if value:
- # write 1
- i = disable_irq()
- pin_value(0)
- pin_init(Pin_OUT_PP)
- udelay(d0)
- pin_value(1)
- enable_irq(i)
- udelay(d1)
- else:
- # write 0
- i = disable_irq()
- pin_value(0)
- pin_init(Pin_OUT_PP)
- udelay(d2)
- pin_value(1)
- enable_irq(i)
- udelay(d3)
+ def reset(self, required=False):
+ reset = _ow.reset(self.pin)
+ if required and not reset:
+ raise OneWireError
+ return reset
- def write_byte(self, value):
- """
- Write a byte. The pin will go tri-state at the end of the write to avoid
- heating in a short or other mishap.
- """
- pin_init, pin_value, Pin_OUT_PP, Pin_IN, Pin_PULL_UP = self.cache
- for i in range(8):
- self._write_bit(value & 1, pin_init, pin_value, Pin_OUT_PP)
- value >>= 1
- pin_init(Pin_IN, Pin_PULL_UP)
+ def readbit(self):
+ return _ow.readbit(self.pin)
- def write_bytes(self, bytestring):
- """
- Write a sequence of bytes.
- """
- for byte in bytestring:
- self.write_byte(byte)
+ def readbyte(self):
+ return _ow.readbyte(self.pin)
- def _read_bit(self, pin_init, pin_value, Pin_OUT_PP, Pin_IN, Pin_PULL_UP):
- """
- Read a single bit - requires cached methods/attributes be passed as arguments.
- See also read_bit()
- """
- d0, d1, d2 = self.read_delays
- udelay = pyb.udelay
- pin_init(Pin_IN, Pin_PULL_UP) # TODO why do we need this?
- i = disable_irq()
- pin_value(0)
- pin_init(Pin_OUT_PP)
- udelay(d0)
- pin_init(Pin_IN, Pin_PULL_UP)
- udelay(d1)
- value = pin_value()
- enable_irq(i)
- udelay(d2)
- return value
+ def readinto(self, buf):
+ for i in range(len(buf)):
+ buf[i] = _ow.readbyte(self.pin)
- def read_bit(self):
- """
- Read a single bit.
- """
- pin_init, pin_value, Pin_OUT_PP, Pin_IN, Pin_PULL_UP = self.cache
- return self._read_bit(pin_init, pin_value, Pin_OUT_PP, Pin_IN, Pin_PULL_UP)
+ def writebit(self, value):
+ return _ow.writebit(self.pin, value)
- def read_byte(self):
- """
- Read a single byte and return the value as an integer.
- See also read_bytes()
- """
- pin_init, pin_value, Pin_OUT_PP, Pin_IN, Pin_PULL_UP = self.cache
- value = 0
- for i in range(8):
- bit = self._read_bit(pin_init, pin_value, Pin_OUT_PP, Pin_IN, Pin_PULL_UP)
- value |= bit << i
- return value
+ def writebyte(self, value):
+ return _ow.writebyte(self.pin, value)
- def read_bytes(self, count):
- """
- Read a sequence of N bytes.
- The bytes are returned as a bytearray.
- """
- s = bytearray(count)
- for i in range(count):
- s[i] = self.read_byte()
- return s
+ def write(self, buf):
+ for b in buf:
+ _ow.writebyte(self.pin, b)
def select_rom(self, rom):
- """
- Select a specific device to talk to. Pass in rom as a bytearray (8 bytes).
- """
- assert len(rom) == 8, "ROM must be 8 bytes"
self.reset()
- self.write_byte(0x55) # ROM MATCH
- self.write_bytes(rom)
-
- def read_rom(self):
- """
- Read the ROM - this works if there is only a single device attached.
- """
- self.reset()
- self.write_byte(0x33) # READ ROM
- rom = self.read_bytes(8)
- # TODO: check CRC of the ROM
- return rom
-
- def skip_rom(self):
- """
- Send skip-rom command - this works if there is only one device attached.
- """
- self.write_byte(0xCC) # SKIP ROM
-
- def depower(self):
- self.data_pin.init(self.data_pin.IN, self.data_pin.PULL_NONE)
+ self.writebyte(MATCH_ROM)
+ self.write(rom)
def scan(self):
- """
- Return a list of ROMs for all attached devices.
- Each ROM is returned as a bytes object of 8 bytes.
- """
devices = []
- self._reset_search()
- while True:
- rom = self._search()
- if not rom:
- return devices
- devices.append(rom)
-
- def _reset_search(self):
- self.last_discrepancy = 0
- self.last_device_flag = False
- self.last_family_discrepancy = 0
- self.rom = bytearray(8)
-
- def _search(self):
- # initialize for search
- id_bit_number = 1
- last_zero = 0
- rom_byte_number = 0
- rom_byte_mask = 1
- search_result = 0
- pin_init, pin_value, Pin_OUT_PP, Pin_IN, Pin_PULL_UP = self.cache
-
- # if the last call was not the last one
- if not self.last_device_flag:
- # 1-Wire reset
- if not self.reset():
- self._reset_search()
- return None
-
- # issue the search command
- self.write_byte(0xF0)
-
- # loop to do the search
- while rom_byte_number < 8: # loop until through all ROM bytes 0-7
- # read a bit and its complement
- id_bit = self._read_bit(pin_init, pin_value, Pin_OUT_PP, Pin_IN, Pin_PULL_UP)
- cmp_id_bit = self._read_bit(pin_init, pin_value, Pin_OUT_PP, Pin_IN, Pin_PULL_UP)
-
- # check for no devices on 1-wire
- if (id_bit == 1) and (cmp_id_bit == 1):
- break
+ diff = 65
+ rom = False
+ for i in range(0xff):
+ rom, diff = self._search_rom(rom, diff)
+ if rom:
+ devices += [rom]
+ if diff == 0:
+ break
+ return devices
+
+ def _search_rom(self, l_rom, diff):
+ if not self.reset():
+ return None, 0
+ self.writebyte(SEARCH_ROM)
+ if not l_rom:
+ l_rom = bytearray(8)
+ rom = bytearray(8)
+ next_diff = 0
+ i = 64
+ for byte in range(8):
+ r_b = 0
+ for bit in range(8):
+ b = self.readbit()
+ if self.readbit():
+ if b: # there are no devices or there is an error on the bus
+ return None, 0
else:
- # all devices coupled have 0 or 1
- if (id_bit != cmp_id_bit):
- search_direction = id_bit # bit write value for search
- else:
- # if this discrepancy if before the Last Discrepancy
- # on a previous next then pick the same as last time
- if (id_bit_number < self.last_discrepancy):
- search_direction = (self.rom[rom_byte_number] & rom_byte_mask) > 0
- else:
- # if equal to last pick 1, if not then pick 0
- search_direction = (id_bit_number == self.last_discrepancy)
-
- # if 0 was picked then record its position in LastZero
- if search_direction == 0:
- last_zero = id_bit_number
-
- # check for Last discrepancy in family
- if last_zero < 9:
- self.last_family_discrepancy = last_zero
-
- # set or clear the bit in the ROM byte rom_byte_number
- # with mask rom_byte_mask
- if search_direction == 1:
- self.rom[rom_byte_number] |= rom_byte_mask
- else:
- self.rom[rom_byte_number] &= ~rom_byte_mask
-
- # serial number search direction write bit
- #print('sd', search_direction)
- self.write_bit(search_direction)
-
- # increment the byte counter id_bit_number
- # and shift the mask rom_byte_mask
- id_bit_number += 1
- rom_byte_mask <<= 1
-
- # if the mask is 0 then go to new SerialNum byte rom_byte_number and reset mask
- if rom_byte_mask == 0x100:
- rom_byte_number += 1
- rom_byte_mask = 1
-
- # if the search was successful then
- if not (id_bit_number < 65):
- # search successful so set last_discrepancy,last_device_flag,search_result
- self.last_discrepancy = last_zero
-
- # check for last device
- if self.last_discrepancy == 0:
- self.last_device_flag = True
- search_result = True
-
- # if no device found then reset counters so next 'search' will be like a first
- if not search_result or not self.rom[0]:
- self._reset_search()
- return None
- else:
- return bytes(self.rom)
+ if not b: # collision, two devices with different bit meaning
+ if diff > i or ((l_rom[byte] & (1 << bit)) and diff != i):
+ b = 1
+ next_diff = i
+ self.writebit(b)
+ if b:
+ r_b |= 1 << bit
+ i -= 1
+ rom[byte] = r_b
+ return rom, next_diff
+
+ def crc8(self, data):
+ return _ow.crc8(data)
diff --git a/esp8266/modules/ds18x20.py b/esp8266/modules/ds18x20.py
deleted file mode 100644
index bf06094835..0000000000
--- a/esp8266/modules/ds18x20.py
+++ /dev/null
@@ -1,51 +0,0 @@
-# DS18x20 temperature sensor driver for MicroPython.
-# MIT license; Copyright (c) 2016 Damien P. George
-
-from micropython import const
-
-_CONVERT = const(0x44)
-_RD_SCRATCH = const(0xbe)
-_WR_SCRATCH = const(0x4e)
-
-class DS18X20:
- def __init__(self, onewire):
- self.ow = onewire
- self.buf = bytearray(9)
-
- def scan(self):
- return [rom for rom in self.ow.scan() if rom[0] == 0x10 or rom[0] == 0x28]
-
- def convert_temp(self):
- self.ow.reset(True)
- self.ow.writebyte(self.ow.SKIP_ROM)
- self.ow.writebyte(_CONVERT)
-
- def read_scratch(self, rom):
- self.ow.reset(True)
- self.ow.select_rom(rom)
- self.ow.writebyte(_RD_SCRATCH)
- self.ow.readinto(self.buf)
- if self.ow.crc8(self.buf):
- raise Exception('CRC error')
- return self.buf
-
- def write_scratch(self, rom, buf):
- self.ow.reset(True)
- self.ow.select_rom(rom)
- self.ow.writebyte(_WR_SCRATCH)
- self.ow.write(buf)
-
- def read_temp(self, rom):
- buf = self.read_scratch(rom)
- if rom[0] == 0x10:
- if buf[1]:
- t = buf[0] >> 1 | 0x80
- t = -((~t + 1) & 0xff)
- else:
- t = buf[0] >> 1
- return t - 0.25 + (buf[7] - buf[6]) / buf[7]
- else:
- t = buf[1] << 8 | buf[0]
- if t & 0x8000: # sign bit set
- t = -((t ^ 0xffff) + 1)
- return t / 16
diff --git a/esp8266/modules/onewire.py b/esp8266/modules/onewire.py
deleted file mode 100644
index 83318d1a47..0000000000
--- a/esp8266/modules/onewire.py
+++ /dev/null
@@ -1,91 +0,0 @@
-# 1-Wire driver for MicroPython on ESP8266
-# MIT license; Copyright (c) 2016 Damien P. George
-
-from micropython import const
-import _onewire as _ow
-
-class OneWireError(Exception):
- pass
-
-class OneWire:
- SEARCH_ROM = const(0xf0)
- MATCH_ROM = const(0x55)
- SKIP_ROM = const(0xcc)
-
- def __init__(self, pin):
- self.pin = pin
- self.pin.init(pin.OPEN_DRAIN)
-
- def reset(self, required=False):
- reset = _ow.reset(self.pin)
- if required and not reset:
- raise OneWireError
- return reset
-
- def readbit(self):
- return _ow.readbit(self.pin)
-
- def readbyte(self):
- return _ow.readbyte(self.pin)
-
- def readinto(self, buf):
- for i in range(len(buf)):
- buf[i] = _ow.readbyte(self.pin)
-
- def writebit(self, value):
- return _ow.writebit(self.pin, value)
-
- def writebyte(self, value):
- return _ow.writebyte(self.pin, value)
-
- def write(self, buf):
- for b in buf:
- _ow.writebyte(self.pin, b)
-
- def select_rom(self, rom):
- self.reset()
- self.writebyte(MATCH_ROM)
- self.write(rom)
-
- def scan(self):
- devices = []
- diff = 65
- rom = False
- for i in range(0xff):
- rom, diff = self._search_rom(rom, diff)
- if rom:
- devices += [rom]
- if diff == 0:
- break
- return devices
-
- def _search_rom(self, l_rom, diff):
- if not self.reset():
- return None, 0
- self.writebyte(SEARCH_ROM)
- if not l_rom:
- l_rom = bytearray(8)
- rom = bytearray(8)
- next_diff = 0
- i = 64
- for byte in range(8):
- r_b = 0
- for bit in range(8):
- b = self.readbit()
- if self.readbit():
- if b: # there are no devices or there is an error on the bus
- return None, 0
- else:
- if not b: # collision, two devices with different bit meaning
- if diff > i or ((l_rom[byte] & (1 << bit)) and diff != i):
- b = 1
- next_diff = i
- self.writebit(b)
- if b:
- r_b |= 1 << bit
- i -= 1
- rom[byte] = r_b
- return rom, next_diff
-
- def crc8(self, data):
- return _ow.crc8(data)