aboutsummaryrefslogtreecommitdiffstatshomepage
path: root/Lib/bsddb/test/test_thread.py
diff options
context:
space:
mode:
authorMartin v. Löwis <martin@v.loewis.de>2002-11-19 17:47:07 +0000
committerMartin v. Löwis <martin@v.loewis.de>2002-11-19 17:47:07 +0000
commit1c6b1a2b4ea38955a3f0514f4709bafd0be96c5e (patch)
tree1d75385f12cdeda7369e5a2428d33649e6b70380 /Lib/bsddb/test/test_thread.py
parenta406b58619e3fd8fb26ced18ac64b475a48648d2 (diff)
downloadcpython-1c6b1a2b4ea38955a3f0514f4709bafd0be96c5e.tar.gz
cpython-1c6b1a2b4ea38955a3f0514f4709bafd0be96c5e.zip
Importing test suite from bsddb3 3.4.0 (with modifications).
Diffstat (limited to 'Lib/bsddb/test/test_thread.py')
-rw-r--r--Lib/bsddb/test/test_thread.py487
1 files changed, 487 insertions, 0 deletions
diff --git a/Lib/bsddb/test/test_thread.py b/Lib/bsddb/test/test_thread.py
new file mode 100644
index 00000000000..60d6dc56fb8
--- /dev/null
+++ b/Lib/bsddb/test/test_thread.py
@@ -0,0 +1,487 @@
+"""
+TestCases for multi-threaded access to a DB.
+"""
+
+import sys, os, string
+import tempfile
+import time
+from pprint import pprint
+from whrandom import random
+
+try:
+ from threading import Thread, currentThread
+ have_threads = 1
+except ImportError:
+ have_threads = 0
+
+
+import unittest
+from test.test_support import verbose
+
+from bsddb import db
+
+
+#----------------------------------------------------------------------
+
+class BaseThreadedTestCase(unittest.TestCase):
+ dbtype = db.DB_UNKNOWN # must be set in derived class
+ dbopenflags = 0
+ dbsetflags = 0
+ envflags = 0
+
+
+ def setUp(self):
+ homeDir = os.path.join(os.path.dirname(sys.argv[0]), 'db_home')
+ self.homeDir = homeDir
+ try: os.mkdir(homeDir)
+ except os.error: pass
+ self.env = db.DBEnv()
+ self.setEnvOpts()
+ self.env.open(homeDir, self.envflags | db.DB_CREATE)
+
+ self.filename = self.__class__.__name__ + '.db'
+ self.d = db.DB(self.env)
+ if self.dbsetflags:
+ self.d.set_flags(self.dbsetflags)
+ self.d.open(self.filename, self.dbtype, self.dbopenflags|db.DB_CREATE)
+
+
+ def tearDown(self):
+ self.d.close()
+ self.env.close()
+ import glob
+ files = glob.glob(os.path.join(self.homeDir, '*'))
+ for file in files:
+ os.remove(file)
+
+
+ def setEnvOpts(self):
+ pass
+
+
+ def makeData(self, key):
+ return string.join([key] * 5, '-')
+
+
+#----------------------------------------------------------------------
+
+
+class ConcurrentDataStoreBase(BaseThreadedTestCase):
+ dbopenflags = db.DB_THREAD
+ envflags = db.DB_THREAD | db.DB_INIT_CDB | db.DB_INIT_MPOOL
+ readers = 0 # derived class should set
+ writers = 0
+ records = 1000
+
+
+ def test01_1WriterMultiReaders(self):
+ if verbose:
+ print '\n', '-=' * 30
+ print "Running %s.test01_1WriterMultiReaders..." % self.__class__.__name__
+
+ threads = []
+ for x in range(self.writers):
+ wt = Thread(target = self.writerThread,
+ args = (self.d, self.records, x),
+ name = 'writer %d' % x,
+ )#verbose = verbose)
+ threads.append(wt)
+
+ for x in range(self.readers):
+ rt = Thread(target = self.readerThread,
+ args = (self.d, x),
+ name = 'reader %d' % x,
+ )#verbose = verbose)
+ threads.append(rt)
+
+ for t in threads:
+ t.start()
+ for t in threads:
+ t.join()
+
+
+ def writerThread(self, d, howMany, writerNum):
+ #time.sleep(0.01 * writerNum + 0.01)
+ name = currentThread().getName()
+ start, stop = howMany * writerNum, howMany * (writerNum + 1) - 1
+ if verbose:
+ print "%s: creating records %d - %d" % (name, start, stop)
+
+ for x in range(start, stop):
+ key = '%04d' % x
+ d.put(key, self.makeData(key))
+ if verbose and x % 100 == 0:
+ print "%s: records %d - %d finished" % (name, start, x)
+
+ if verbose: print "%s: finished creating records" % name
+
+## # Each write-cursor will be exclusive, the only one that can update the DB...
+## if verbose: print "%s: deleting a few records" % name
+## c = d.cursor(flags = db.DB_WRITECURSOR)
+## for x in range(10):
+## key = int(random() * howMany) + start
+## key = '%04d' % key
+## if d.has_key(key):
+## c.set(key)
+## c.delete()
+
+## c.close()
+ if verbose: print "%s: thread finished" % name
+
+
+ def readerThread(self, d, readerNum):
+ time.sleep(0.01 * readerNum)
+ name = currentThread().getName()
+
+ for loop in range(5):
+ c = d.cursor()
+ count = 0
+ rec = c.first()
+ while rec:
+ count = count + 1
+ key, data = rec
+ assert self.makeData(key) == data
+ rec = c.next()
+ if verbose: print "%s: found %d records" % (name, count)
+ c.close()
+ time.sleep(0.05)
+
+ if verbose: print "%s: thread finished" % name
+
+
+
+class BTreeConcurrentDataStore(ConcurrentDataStoreBase):
+ dbtype = db.DB_BTREE
+ writers = 2
+ readers = 10
+ records = 1000
+
+
+class HashConcurrentDataStore(ConcurrentDataStoreBase):
+ dbtype = db.DB_HASH
+ writers = 2
+ readers = 10
+ records = 1000
+
+#----------------------------------------------------------------------
+
+class SimpleThreadedBase(BaseThreadedTestCase):
+ dbopenflags = db.DB_THREAD
+ envflags = db.DB_THREAD | db.DB_INIT_MPOOL | db.DB_INIT_LOCK
+ readers = 5
+ writers = 3
+ records = 1000
+
+
+ def setEnvOpts(self):
+ self.env.set_lk_detect(db.DB_LOCK_DEFAULT)
+
+
+ def test02_SimpleLocks(self):
+ if verbose:
+ print '\n', '-=' * 30
+ print "Running %s.test02_SimpleLocks..." % self.__class__.__name__
+
+ threads = []
+ for x in range(self.writers):
+ wt = Thread(target = self.writerThread,
+ args = (self.d, self.records, x),
+ name = 'writer %d' % x,
+ )#verbose = verbose)
+ threads.append(wt)
+ for x in range(self.readers):
+ rt = Thread(target = self.readerThread,
+ args = (self.d, x),
+ name = 'reader %d' % x,
+ )#verbose = verbose)
+ threads.append(rt)
+
+ for t in threads:
+ t.start()
+ for t in threads:
+ t.join()
+
+
+
+ def writerThread(self, d, howMany, writerNum):
+ name = currentThread().getName()
+ start, stop = howMany * writerNum, howMany * (writerNum + 1) - 1
+ if verbose:
+ print "%s: creating records %d - %d" % (name, start, stop)
+
+ # create a bunch of records
+ for x in xrange(start, stop):
+ key = '%04d' % x
+ d.put(key, self.makeData(key))
+
+ if verbose and x % 100 == 0:
+ print "%s: records %d - %d finished" % (name, start, x)
+
+ # do a bit or reading too
+ if random() <= 0.05:
+ for y in xrange(start, x):
+ key = '%04d' % x
+ data = d.get(key)
+ assert data == self.makeData(key)
+
+ # flush them
+ try:
+ d.sync()
+ except db.DBIncompleteError, val:
+ if verbose:
+ print "could not complete sync()..."
+
+ # read them back, deleting a few
+ for x in xrange(start, stop):
+ key = '%04d' % x
+ data = d.get(key)
+ if verbose and x % 100 == 0:
+ print "%s: fetched record (%s, %s)" % (name, key, data)
+ assert data == self.makeData(key)
+ if random() <= 0.10:
+ d.delete(key)
+ if verbose:
+ print "%s: deleted record %s" % (name, key)
+
+ if verbose: print "%s: thread finished" % name
+
+
+ def readerThread(self, d, readerNum):
+ time.sleep(0.01 * readerNum)
+ name = currentThread().getName()
+
+ for loop in range(5):
+ c = d.cursor()
+ count = 0
+ rec = c.first()
+ while rec:
+ count = count + 1
+ key, data = rec
+ assert self.makeData(key) == data
+ rec = c.next()
+ if verbose: print "%s: found %d records" % (name, count)
+ c.close()
+ time.sleep(0.05)
+
+ if verbose: print "%s: thread finished" % name
+
+
+
+
+class BTreeSimpleThreaded(SimpleThreadedBase):
+ dbtype = db.DB_BTREE
+
+
+class HashSimpleThreaded(SimpleThreadedBase):
+ dbtype = db.DB_BTREE
+
+
+#----------------------------------------------------------------------
+
+
+
+class ThreadedTransactionsBase(BaseThreadedTestCase):
+ dbopenflags = db.DB_THREAD
+ envflags = (db.DB_THREAD |
+ db.DB_INIT_MPOOL |
+ db.DB_INIT_LOCK |
+ db.DB_INIT_LOG |
+ db.DB_INIT_TXN
+ )
+ readers = 0
+ writers = 0
+ records = 2000
+
+ txnFlag = 0
+
+
+ def setEnvOpts(self):
+ #self.env.set_lk_detect(db.DB_LOCK_DEFAULT)
+ pass
+
+
+ def test03_ThreadedTransactions(self):
+ if verbose:
+ print '\n', '-=' * 30
+ print "Running %s.test03_ThreadedTransactions..." % self.__class__.__name__
+
+ threads = []
+ for x in range(self.writers):
+ wt = Thread(target = self.writerThread,
+ args = (self.d, self.records, x),
+ name = 'writer %d' % x,
+ )#verbose = verbose)
+ threads.append(wt)
+
+ for x in range(self.readers):
+ rt = Thread(target = self.readerThread,
+ args = (self.d, x),
+ name = 'reader %d' % x,
+ )#verbose = verbose)
+ threads.append(rt)
+
+ dt = Thread(target = self.deadlockThread)
+ dt.start()
+
+ for t in threads:
+ t.start()
+ for t in threads:
+ t.join()
+
+ self.doLockDetect = 0
+ dt.join()
+
+
+ def doWrite(self, d, name, start, stop):
+ finished = 0
+ while not finished:
+ try:
+ txn = self.env.txn_begin(None, self.txnFlag)
+ for x in range(start, stop):
+ key = '%04d' % x
+ d.put(key, self.makeData(key), txn)
+ if verbose and x % 100 == 0:
+ print "%s: records %d - %d finished" % (name, start, x)
+ txn.commit()
+ finished = 1
+ except (db.DBLockDeadlockError, db.DBLockNotGrantedError), val:
+ if verbose:
+ print "%s: Aborting transaction (%s)" % (name, val[1])
+ txn.abort()
+ time.sleep(0.05)
+
+
+
+ def writerThread(self, d, howMany, writerNum):
+ name = currentThread().getName()
+ start, stop = howMany * writerNum, howMany * (writerNum + 1) - 1
+ if verbose:
+ print "%s: creating records %d - %d" % (name, start, stop)
+
+ step = 100
+ for x in range(start, stop, step):
+ self.doWrite(d, name, x, min(stop, x+step))
+
+ if verbose: print "%s: finished creating records" % name
+ if verbose: print "%s: deleting a few records" % name
+
+ finished = 0
+ while not finished:
+ try:
+ recs = []
+ txn = self.env.txn_begin(None, self.txnFlag)
+ for x in range(10):
+ key = int(random() * howMany) + start
+ key = '%04d' % key
+ data = d.get(key, None, txn, db.DB_RMW)
+ if data is not None:
+ d.delete(key, txn)
+ recs.append(key)
+ txn.commit()
+ finished = 1
+ if verbose: print "%s: deleted records %s" % (name, recs)
+ except (db.DBLockDeadlockError, db.DBLockNotGrantedError), val:
+ if verbose:
+ print "%s: Aborting transaction (%s)" % (name, val[1])
+ txn.abort()
+ time.sleep(0.05)
+
+ if verbose: print "%s: thread finished" % name
+
+
+ def readerThread(self, d, readerNum):
+ time.sleep(0.01 * readerNum + 0.05)
+ name = currentThread().getName()
+
+ for loop in range(5):
+ finished = 0
+ while not finished:
+ try:
+ txn = self.env.txn_begin(None, self.txnFlag)
+ c = d.cursor(txn)
+ count = 0
+ rec = c.first()
+ while rec:
+ count = count + 1
+ key, data = rec
+ assert self.makeData(key) == data
+ rec = c.next()
+ if verbose: print "%s: found %d records" % (name, count)
+ c.close()
+ txn.commit()
+ finished = 1
+ except (db.DBLockDeadlockError, db.DBLockNotGrantedError), val:
+ if verbose:
+ print "%s: Aborting transaction (%s)" % (name, val[1])
+ c.close()
+ txn.abort()
+ time.sleep(0.05)
+
+ time.sleep(0.05)
+
+ if verbose: print "%s: thread finished" % name
+
+
+ def deadlockThread(self):
+ self.doLockDetect = 1
+ while self.doLockDetect:
+ time.sleep(0.5)
+ try:
+ aborted = self.env.lock_detect(db.DB_LOCK_RANDOM, db.DB_LOCK_CONFLICT)
+ if verbose and aborted:
+ print "deadlock: Aborted %d deadlocked transaction(s)" % aborted
+ except db.DBError:
+ pass
+
+
+
+class BTreeThreadedTransactions(ThreadedTransactionsBase):
+ dbtype = db.DB_BTREE
+ writers = 3
+ readers = 5
+ records = 2000
+
+class HashThreadedTransactions(ThreadedTransactionsBase):
+ dbtype = db.DB_HASH
+ writers = 1
+ readers = 5
+ records = 2000
+
+class BTreeThreadedNoWaitTransactions(ThreadedTransactionsBase):
+ dbtype = db.DB_BTREE
+ writers = 3
+ readers = 5
+ records = 2000
+ txnFlag = db.DB_TXN_NOWAIT
+
+class HashThreadedNoWaitTransactions(ThreadedTransactionsBase):
+ dbtype = db.DB_HASH
+ writers = 1
+ readers = 5
+ records = 2000
+ txnFlag = db.DB_TXN_NOWAIT
+
+
+#----------------------------------------------------------------------
+
+def suite():
+ theSuite = unittest.TestSuite()
+
+ if have_threads:
+ theSuite.addTest(unittest.makeSuite(BTreeConcurrentDataStore))
+ theSuite.addTest(unittest.makeSuite(HashConcurrentDataStore))
+ theSuite.addTest(unittest.makeSuite(BTreeSimpleThreaded))
+ theSuite.addTest(unittest.makeSuite(HashSimpleThreaded))
+ theSuite.addTest(unittest.makeSuite(BTreeThreadedTransactions))
+ theSuite.addTest(unittest.makeSuite(HashThreadedTransactions))
+ theSuite.addTest(unittest.makeSuite(BTreeThreadedNoWaitTransactions))
+ theSuite.addTest(unittest.makeSuite(HashThreadedNoWaitTransactions))
+
+ else:
+ print "Threads not available, skipping thread tests."
+
+ return theSuite
+
+
+if __name__ == '__main__':
+ unittest.main( defaultTest='suite' )