summaryrefslogtreecommitdiffstatshomepage
diff options
context:
space:
mode:
-rw-r--r--drivers/onewire/ds18x20.py104
-rw-r--r--drivers/onewire/onewire.py336
2 files changed, 440 insertions, 0 deletions
diff --git a/drivers/onewire/ds18x20.py b/drivers/onewire/ds18x20.py
new file mode 100644
index 0000000000..06c9fc968b
--- /dev/null
+++ b/drivers/onewire/ds18x20.py
@@ -0,0 +1,104 @@
+"""
+DS18x20 temperature sensor driver for MicroPython.
+
+This driver uses the OneWire driver to control DS18S20 and DS18B20
+temperature sensors. It supports multiple devices on the same 1-wire bus.
+
+The following example assumes the ground of your DS18x20 is connected to
+Y11, vcc is connected to Y9 and the data pin is connected to Y10.
+
+>>> gnd = Pin('Y11')
+>>> gnd.init(Pin.OUT_PP)
+>>> gnd.low()
+
+>>> vcc = Pin('Y9')
+>>> vcc.init(Pin.OUT_PP)
+>>> vcc.high()
+
+>>> d = DS18X20(Pin('Y10'))
+
+Call read_temps to read all sensors:
+
+>>> result = d.read_temps()
+>>> print(result)
+[20.875, 20.8125]
+
+Call read_temp to read the temperature of a specific sensor:
+
+>>> result = d.read_temp(d.roms[0])
+>>> print(result)
+20.25
+
+If only one DS18x20 is attached to the bus, then you don't need to
+pass a ROM to read_temp:
+
+>>> result = d.read_temp()
+>>> print(result)
+20.25
+
+"""
+
+from onewire import OneWire
+
+class DS18X20(object):
+ def __init__(self, pin):
+ self.ow = OneWire(pin)
+ # Scan the 1-wire devices, but only keep those which have the
+ # correct # first byte in their rom for a DS18x20 device.
+ self.roms = [rom for rom in self.ow.scan() if rom[0] == 0x10 or rom[0] == 0x28]
+
+ def _select_rom(self, rom):
+ if rom:
+ self.ow.select_rom(rom)
+ else:
+ self.ow.skip_rom()
+
+ def read_temp(self, rom=None):
+ """
+ Read and return the temperature of one DS18x20 device.
+ Pass the 8-byte bytes object with the ROM of the specific device you want to read.
+ If only one DS18x20 device is attached to the bus you may omit the rom parameter.
+ """
+ ow = self.ow
+ ow.reset()
+ self._select_rom(rom)
+ ow.write_byte(0x44) # Convert Temp
+ while True:
+ if ow.read_bit():
+ break
+ ow.reset()
+ self._select_rom(rom)
+ ow.write_byte(0xbe) # Read scratch
+ data = ow.read_bytes(9)
+ return self.convert_temp(rom[0], data)
+
+ def read_temps(self):
+ """
+ Read and return the temperatures of all attached DS18x20 devices.
+ """
+ temps = []
+ for rom in self.roms:
+ temps.append(self.read_temp(rom))
+ return temps
+
+ def convert_temp(self, rom0, data):
+ """
+ Convert the raw temperature data into degrees celsius and return as a float.
+ """
+ temp_lsb = data[0]
+ temp_msb = data[1]
+ if rom0 == 0x10:
+ if temp_msb != 0:
+ # convert negative number
+ temp_read = temp_lsb >> 1 | 0x80 # truncate bit 0 by shifting, fill high bit with 1.
+ temp_read = -((~temp_read + 1) & 0xff) # now convert from two's complement
+ else:
+ temp_read = temp_lsb >> 1 # truncate bit 0 by shifting
+ count_remain = data[6]
+ count_per_c = data[7]
+ temp = temp_read - 0.25 + (count_per_c - count_remain) / count_per_c
+ return temp
+ elif rom0 == 0x28:
+ return (temp_msb << 8 | temp_lsb) / 16
+ else:
+ assert False
diff --git a/drivers/onewire/onewire.py b/drivers/onewire/onewire.py
new file mode 100644
index 0000000000..ffeb130d6e
--- /dev/null
+++ b/drivers/onewire/onewire.py
@@ -0,0 +1,336 @@
+"""
+OneWire library ported to MicroPython by Jason Hildebrand.
+
+
+TODO:
+ * implement and test parasite-power mode (as an init option)
+ * port the crc checks
+
+The original upstream copyright and terms follow.
+------------------------------------------------------------------------------
+
+Copyright (c) 2007, Jim Studt (original old version - many contributors since)
+
+OneWire has been maintained by Paul Stoffregen (paul@pjrc.com) since
+January 2010.
+
+26 Sept 2008 -- Robin James
+
+Jim Studt's original library was modified by Josh Larios.
+
+Tom Pollard, pollard@alum.mit.edu, contributed around May 20, 2008
+
+Permission is hereby granted, free of charge, to any person obtaining
+a copy of this software and associated documentation files (the
+"Software"), to deal in the Software without restriction, including
+without limitation the rights to use, copy, modify, merge, publish,
+distribute, sublicense, and/or sell copies of the Software, and to
+permit persons to whom the Software is furnished to do so, subject to
+the following conditions:
+
+The above copyright notice and this permission notice shall be
+included in all copies or substantial portions of the Software.
+
+THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
+EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
+MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
+NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE
+LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
+OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
+WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
+
+Much of the code was inspired by Derek Yerger's code, though I don't
+think much of that remains. In any event that was..
+ (copyleft) 2006 by Derek Yerger - Free to distribute freely.
+"""
+
+import pyb
+from pyb import disable_irq
+from pyb import enable_irq
+
+class OneWire:
+ def __init__(self, pin):
+ """
+ Pass the data pin connected to your one-wire device(s), for example Pin('X1').
+ The one-wire protocol allows for multiple devices to be attached.
+ """
+ self.data_pin = pin
+ self.write_delays = (1, 40, 40, 1)
+ self.read_delays = (1, 1, 40)
+
+ # cache a bunch of methods and attributes. This is necessary in _write_bit and
+ # _read_bit to achieve the timing required by the OneWire protocol.
+ self.cache = (pin.init, pin.value, pin.OUT_PP, pin.IN, pin.PULL_NONE)
+
+ pin.init(pin.IN, pin.PULL_UP)
+
+ def reset(self):
+ """
+ Perform the onewire reset function.
+ Returns 1 if a device asserted a presence pulse, 0 otherwise.
+
+ If you receive 0, then check your wiring and make sure you are providing
+ power and ground to your devices.
+ """
+ retries = 25
+ self.data_pin.init(self.data_pin.IN, self.data_pin.PULL_UP)
+
+ # We will wait up to 250uS for
+ # the bus to come high, if it doesn't then it is broken or shorted
+ # and we return a 0;
+
+ # wait until the wire is high... just in case
+ while True:
+ if self.data_pin.value():
+ break
+ retries -= 1
+ if retries == 0:
+ raise OSError("OneWire pin didn't go high")
+ pyb.udelay(10)
+
+ # pull the bus low for at least 480us
+ self.data_pin.low()
+ self.data_pin.init(self.data_pin.OUT_PP)
+ pyb.udelay(480)
+
+ # If there is a slave present, it should pull the bus low within 60us
+ i = pyb.disable_irq()
+ self.data_pin.init(self.data_pin.IN, self.data_pin.PULL_UP)
+ pyb.udelay(70)
+ presence = not self.data_pin.value()
+ pyb.enable_irq(i)
+ pyb.udelay(410)
+ return presence
+
+ def write_bit(self, value):
+ """
+ Write a single bit.
+ """
+ pin_init, pin_value, Pin_OUT_PP, Pin_IN, Pin_PULL_UP = self.cache
+ self._write_bit(value, pin_init, pin_value, Pin_OUT_PP)
+
+ def _write_bit(self, value, pin_init, pin_value, Pin_OUT_PP):
+ """
+ Write a single bit - requires cached methods/attributes be passed as arguments.
+ See also write_bit()
+ """
+ d0, d1, d2, d3 = self.write_delays
+ udelay = pyb.udelay
+ if value:
+ # write 1
+ i = disable_irq()
+ pin_value(0)
+ pin_init(Pin_OUT_PP)
+ udelay(d0)
+ pin_value(1)
+ enable_irq(i)
+ udelay(d1)
+ else:
+ # write 0
+ i = disable_irq()
+ pin_value(0)
+ pin_init(Pin_OUT_PP)
+ udelay(d2)
+ pin_value(1)
+ enable_irq(i)
+ udelay(d3)
+
+ def write_byte(self, value):
+ """
+ Write a byte. The pin will go tri-state at the end of the write to avoid
+ heating in a short or other mishap.
+ """
+ pin_init, pin_value, Pin_OUT_PP, Pin_IN, Pin_PULL_UP = self.cache
+ for i in range(8):
+ self._write_bit(value & 1, pin_init, pin_value, Pin_OUT_PP)
+ value >>= 1
+ pin_init(Pin_IN, Pin_PULL_UP)
+
+ def write_bytes(self, bytestring):
+ """
+ Write a sequence of bytes.
+ """
+ for byte in bytestring:
+ self.write_byte(byte)
+
+ def _read_bit(self, pin_init, pin_value, Pin_OUT_PP, Pin_IN, Pin_PULL_UP):
+ """
+ Read a single bit - requires cached methods/attributes be passed as arguments.
+ See also read_bit()
+ """
+ d0, d1, d2 = self.read_delays
+ udelay = pyb.udelay
+ pin_init(Pin_IN, Pin_PULL_UP) # TODO why do we need this?
+ i = disable_irq()
+ pin_value(0)
+ pin_init(Pin_OUT_PP)
+ udelay(d0)
+ pin_init(Pin_IN, Pin_PULL_UP)
+ udelay(d1)
+ value = pin_value()
+ enable_irq(i)
+ udelay(d2)
+ return value
+
+ def read_bit(self):
+ """
+ Read a single bit.
+ """
+ pin_init, pin_value, Pin_OUT_PP, Pin_IN, Pin_PULL_UP = self.cache
+ return self._read_bit(pin_init, pin_value, Pin_OUT_PP, Pin_IN, Pin_PULL_UP)
+
+ def read_byte(self):
+ """
+ Read a single byte and return the value as an integer.
+ See also read_bytes()
+ """
+ pin_init, pin_value, Pin_OUT_PP, Pin_IN, Pin_PULL_UP = self.cache
+ value = 0
+ for i in range(8):
+ bit = self._read_bit(pin_init, pin_value, Pin_OUT_PP, Pin_IN, Pin_PULL_UP)
+ value |= bit << i
+ return value
+
+ def read_bytes(self, count):
+ """
+ Read a sequence of N bytes.
+ The bytes are returned as a bytearray.
+ """
+ s = bytearray(count)
+ for i in range(count):
+ s[i] = self.read_byte()
+ return s
+
+ def select_rom(self, rom):
+ """
+ Select a specific device to talk to. Pass in rom as a bytearray (8 bytes).
+ """
+ assert len(rom) == 8, "ROM must be 8 bytes"
+ self.reset()
+ self.write_byte(0x55) # ROM MATCH
+ self.write_bytes(rom)
+
+ def read_rom(self):
+ """
+ Read the ROM - this works if there is only a single device attached.
+ """
+ self.reset()
+ self.write_byte(0x33) # READ ROM
+ rom = self.read_bytes(8)
+ # TODO: check CRC of the ROM
+ return rom
+
+ def skip_rom(self):
+ """
+ Send skip-rom command - this works if there is only one device attached.
+ """
+ self.write_byte(0xCC) # SKIP ROM
+
+ def depower(self):
+ self.data_pin.init(self.data_pin.IN, self.data_pin.PULL_NONE)
+
+ def scan(self):
+ """
+ Return a list of ROMs for all attached devices.
+ Each ROM is returned as a bytes object of 8 bytes.
+ """
+ devices = []
+ self._reset_search()
+ while True:
+ rom = self._search()
+ if not rom:
+ return devices
+ devices.append(rom)
+
+ def _reset_search(self):
+ self.last_discrepancy = 0
+ self.last_device_flag = False
+ self.last_family_discrepancy = 0
+ self.rom = bytearray(8)
+
+ def _search(self):
+ # initialize for search
+ id_bit_number = 1
+ last_zero = 0
+ rom_byte_number = 0
+ rom_byte_mask = 1
+ search_result = 0
+ pin_init, pin_value, Pin_OUT_PP, Pin_IN, Pin_PULL_UP = self.cache
+
+ # if the last call was not the last one
+ if not self.last_device_flag:
+ # 1-Wire reset
+ if not self.reset():
+ self._reset_search()
+ return None
+
+ # issue the search command
+ self.write_byte(0xF0)
+
+ # loop to do the search
+ while rom_byte_number < 8: # loop until through all ROM bytes 0-7
+ # read a bit and its complement
+ id_bit = self._read_bit(pin_init, pin_value, Pin_OUT_PP, Pin_IN, Pin_PULL_UP)
+ cmp_id_bit = self._read_bit(pin_init, pin_value, Pin_OUT_PP, Pin_IN, Pin_PULL_UP)
+
+ # check for no devices on 1-wire
+ if (id_bit == 1) and (cmp_id_bit == 1):
+ break
+ else:
+ # all devices coupled have 0 or 1
+ if (id_bit != cmp_id_bit):
+ search_direction = id_bit # bit write value for search
+ else:
+ # if this discrepancy if before the Last Discrepancy
+ # on a previous next then pick the same as last time
+ if (id_bit_number < self.last_discrepancy):
+ search_direction = (self.rom[rom_byte_number] & rom_byte_mask) > 0
+ else:
+ # if equal to last pick 1, if not then pick 0
+ search_direction = (id_bit_number == self.last_discrepancy)
+
+ # if 0 was picked then record its position in LastZero
+ if search_direction == 0:
+ last_zero = id_bit_number
+
+ # check for Last discrepancy in family
+ if last_zero < 9:
+ self.last_family_discrepancy = last_zero
+
+ # set or clear the bit in the ROM byte rom_byte_number
+ # with mask rom_byte_mask
+ if search_direction == 1:
+ self.rom[rom_byte_number] |= rom_byte_mask
+ else:
+ self.rom[rom_byte_number] &= ~rom_byte_mask
+
+ # serial number search direction write bit
+ #print('sd', search_direction)
+ self.write_bit(search_direction)
+
+ # increment the byte counter id_bit_number
+ # and shift the mask rom_byte_mask
+ id_bit_number += 1
+ rom_byte_mask <<= 1
+
+ # if the mask is 0 then go to new SerialNum byte rom_byte_number and reset mask
+ if rom_byte_mask == 0x100:
+ rom_byte_number += 1
+ rom_byte_mask = 1
+
+ # if the search was successful then
+ if not (id_bit_number < 65):
+ # search successful so set last_discrepancy,last_device_flag,search_result
+ self.last_discrepancy = last_zero
+
+ # check for last device
+ if self.last_discrepancy == 0:
+ self.last_device_flag = True
+ search_result = True
+
+ # if no device found then reset counters so next 'search' will be like a first
+ if not search_result or not self.rom[0]:
+ self._reset_search()
+ return None
+ else:
+ return bytes(self.rom)