diff options
author | Martin v. Löwis <martin@v.loewis.de> | 2002-11-19 17:47:07 +0000 |
---|---|---|
committer | Martin v. Löwis <martin@v.loewis.de> | 2002-11-19 17:47:07 +0000 |
commit | 1c6b1a2b4ea38955a3f0514f4709bafd0be96c5e (patch) | |
tree | 1d75385f12cdeda7369e5a2428d33649e6b70380 /Lib/bsddb/test/test_thread.py | |
parent | a406b58619e3fd8fb26ced18ac64b475a48648d2 (diff) | |
download | cpython-1c6b1a2b4ea38955a3f0514f4709bafd0be96c5e.tar.gz cpython-1c6b1a2b4ea38955a3f0514f4709bafd0be96c5e.zip |
Importing test suite from bsddb3 3.4.0 (with modifications).
Diffstat (limited to 'Lib/bsddb/test/test_thread.py')
-rw-r--r-- | Lib/bsddb/test/test_thread.py | 487 |
1 files changed, 487 insertions, 0 deletions
diff --git a/Lib/bsddb/test/test_thread.py b/Lib/bsddb/test/test_thread.py new file mode 100644 index 00000000000..60d6dc56fb8 --- /dev/null +++ b/Lib/bsddb/test/test_thread.py @@ -0,0 +1,487 @@ +""" +TestCases for multi-threaded access to a DB. +""" + +import sys, os, string +import tempfile +import time +from pprint import pprint +from whrandom import random + +try: + from threading import Thread, currentThread + have_threads = 1 +except ImportError: + have_threads = 0 + + +import unittest +from test.test_support import verbose + +from bsddb import db + + +#---------------------------------------------------------------------- + +class BaseThreadedTestCase(unittest.TestCase): + dbtype = db.DB_UNKNOWN # must be set in derived class + dbopenflags = 0 + dbsetflags = 0 + envflags = 0 + + + def setUp(self): + homeDir = os.path.join(os.path.dirname(sys.argv[0]), 'db_home') + self.homeDir = homeDir + try: os.mkdir(homeDir) + except os.error: pass + self.env = db.DBEnv() + self.setEnvOpts() + self.env.open(homeDir, self.envflags | db.DB_CREATE) + + self.filename = self.__class__.__name__ + '.db' + self.d = db.DB(self.env) + if self.dbsetflags: + self.d.set_flags(self.dbsetflags) + self.d.open(self.filename, self.dbtype, self.dbopenflags|db.DB_CREATE) + + + def tearDown(self): + self.d.close() + self.env.close() + import glob + files = glob.glob(os.path.join(self.homeDir, '*')) + for file in files: + os.remove(file) + + + def setEnvOpts(self): + pass + + + def makeData(self, key): + return string.join([key] * 5, '-') + + +#---------------------------------------------------------------------- + + +class ConcurrentDataStoreBase(BaseThreadedTestCase): + dbopenflags = db.DB_THREAD + envflags = db.DB_THREAD | db.DB_INIT_CDB | db.DB_INIT_MPOOL + readers = 0 # derived class should set + writers = 0 + records = 1000 + + + def test01_1WriterMultiReaders(self): + if verbose: + print '\n', '-=' * 30 + print "Running %s.test01_1WriterMultiReaders..." % self.__class__.__name__ + + threads = [] + for x in range(self.writers): + wt = Thread(target = self.writerThread, + args = (self.d, self.records, x), + name = 'writer %d' % x, + )#verbose = verbose) + threads.append(wt) + + for x in range(self.readers): + rt = Thread(target = self.readerThread, + args = (self.d, x), + name = 'reader %d' % x, + )#verbose = verbose) + threads.append(rt) + + for t in threads: + t.start() + for t in threads: + t.join() + + + def writerThread(self, d, howMany, writerNum): + #time.sleep(0.01 * writerNum + 0.01) + name = currentThread().getName() + start, stop = howMany * writerNum, howMany * (writerNum + 1) - 1 + if verbose: + print "%s: creating records %d - %d" % (name, start, stop) + + for x in range(start, stop): + key = '%04d' % x + d.put(key, self.makeData(key)) + if verbose and x % 100 == 0: + print "%s: records %d - %d finished" % (name, start, x) + + if verbose: print "%s: finished creating records" % name + +## # Each write-cursor will be exclusive, the only one that can update the DB... +## if verbose: print "%s: deleting a few records" % name +## c = d.cursor(flags = db.DB_WRITECURSOR) +## for x in range(10): +## key = int(random() * howMany) + start +## key = '%04d' % key +## if d.has_key(key): +## c.set(key) +## c.delete() + +## c.close() + if verbose: print "%s: thread finished" % name + + + def readerThread(self, d, readerNum): + time.sleep(0.01 * readerNum) + name = currentThread().getName() + + for loop in range(5): + c = d.cursor() + count = 0 + rec = c.first() + while rec: + count = count + 1 + key, data = rec + assert self.makeData(key) == data + rec = c.next() + if verbose: print "%s: found %d records" % (name, count) + c.close() + time.sleep(0.05) + + if verbose: print "%s: thread finished" % name + + + +class BTreeConcurrentDataStore(ConcurrentDataStoreBase): + dbtype = db.DB_BTREE + writers = 2 + readers = 10 + records = 1000 + + +class HashConcurrentDataStore(ConcurrentDataStoreBase): + dbtype = db.DB_HASH + writers = 2 + readers = 10 + records = 1000 + +#---------------------------------------------------------------------- + +class SimpleThreadedBase(BaseThreadedTestCase): + dbopenflags = db.DB_THREAD + envflags = db.DB_THREAD | db.DB_INIT_MPOOL | db.DB_INIT_LOCK + readers = 5 + writers = 3 + records = 1000 + + + def setEnvOpts(self): + self.env.set_lk_detect(db.DB_LOCK_DEFAULT) + + + def test02_SimpleLocks(self): + if verbose: + print '\n', '-=' * 30 + print "Running %s.test02_SimpleLocks..." % self.__class__.__name__ + + threads = [] + for x in range(self.writers): + wt = Thread(target = self.writerThread, + args = (self.d, self.records, x), + name = 'writer %d' % x, + )#verbose = verbose) + threads.append(wt) + for x in range(self.readers): + rt = Thread(target = self.readerThread, + args = (self.d, x), + name = 'reader %d' % x, + )#verbose = verbose) + threads.append(rt) + + for t in threads: + t.start() + for t in threads: + t.join() + + + + def writerThread(self, d, howMany, writerNum): + name = currentThread().getName() + start, stop = howMany * writerNum, howMany * (writerNum + 1) - 1 + if verbose: + print "%s: creating records %d - %d" % (name, start, stop) + + # create a bunch of records + for x in xrange(start, stop): + key = '%04d' % x + d.put(key, self.makeData(key)) + + if verbose and x % 100 == 0: + print "%s: records %d - %d finished" % (name, start, x) + + # do a bit or reading too + if random() <= 0.05: + for y in xrange(start, x): + key = '%04d' % x + data = d.get(key) + assert data == self.makeData(key) + + # flush them + try: + d.sync() + except db.DBIncompleteError, val: + if verbose: + print "could not complete sync()..." + + # read them back, deleting a few + for x in xrange(start, stop): + key = '%04d' % x + data = d.get(key) + if verbose and x % 100 == 0: + print "%s: fetched record (%s, %s)" % (name, key, data) + assert data == self.makeData(key) + if random() <= 0.10: + d.delete(key) + if verbose: + print "%s: deleted record %s" % (name, key) + + if verbose: print "%s: thread finished" % name + + + def readerThread(self, d, readerNum): + time.sleep(0.01 * readerNum) + name = currentThread().getName() + + for loop in range(5): + c = d.cursor() + count = 0 + rec = c.first() + while rec: + count = count + 1 + key, data = rec + assert self.makeData(key) == data + rec = c.next() + if verbose: print "%s: found %d records" % (name, count) + c.close() + time.sleep(0.05) + + if verbose: print "%s: thread finished" % name + + + + +class BTreeSimpleThreaded(SimpleThreadedBase): + dbtype = db.DB_BTREE + + +class HashSimpleThreaded(SimpleThreadedBase): + dbtype = db.DB_BTREE + + +#---------------------------------------------------------------------- + + + +class ThreadedTransactionsBase(BaseThreadedTestCase): + dbopenflags = db.DB_THREAD + envflags = (db.DB_THREAD | + db.DB_INIT_MPOOL | + db.DB_INIT_LOCK | + db.DB_INIT_LOG | + db.DB_INIT_TXN + ) + readers = 0 + writers = 0 + records = 2000 + + txnFlag = 0 + + + def setEnvOpts(self): + #self.env.set_lk_detect(db.DB_LOCK_DEFAULT) + pass + + + def test03_ThreadedTransactions(self): + if verbose: + print '\n', '-=' * 30 + print "Running %s.test03_ThreadedTransactions..." % self.__class__.__name__ + + threads = [] + for x in range(self.writers): + wt = Thread(target = self.writerThread, + args = (self.d, self.records, x), + name = 'writer %d' % x, + )#verbose = verbose) + threads.append(wt) + + for x in range(self.readers): + rt = Thread(target = self.readerThread, + args = (self.d, x), + name = 'reader %d' % x, + )#verbose = verbose) + threads.append(rt) + + dt = Thread(target = self.deadlockThread) + dt.start() + + for t in threads: + t.start() + for t in threads: + t.join() + + self.doLockDetect = 0 + dt.join() + + + def doWrite(self, d, name, start, stop): + finished = 0 + while not finished: + try: + txn = self.env.txn_begin(None, self.txnFlag) + for x in range(start, stop): + key = '%04d' % x + d.put(key, self.makeData(key), txn) + if verbose and x % 100 == 0: + print "%s: records %d - %d finished" % (name, start, x) + txn.commit() + finished = 1 + except (db.DBLockDeadlockError, db.DBLockNotGrantedError), val: + if verbose: + print "%s: Aborting transaction (%s)" % (name, val[1]) + txn.abort() + time.sleep(0.05) + + + + def writerThread(self, d, howMany, writerNum): + name = currentThread().getName() + start, stop = howMany * writerNum, howMany * (writerNum + 1) - 1 + if verbose: + print "%s: creating records %d - %d" % (name, start, stop) + + step = 100 + for x in range(start, stop, step): + self.doWrite(d, name, x, min(stop, x+step)) + + if verbose: print "%s: finished creating records" % name + if verbose: print "%s: deleting a few records" % name + + finished = 0 + while not finished: + try: + recs = [] + txn = self.env.txn_begin(None, self.txnFlag) + for x in range(10): + key = int(random() * howMany) + start + key = '%04d' % key + data = d.get(key, None, txn, db.DB_RMW) + if data is not None: + d.delete(key, txn) + recs.append(key) + txn.commit() + finished = 1 + if verbose: print "%s: deleted records %s" % (name, recs) + except (db.DBLockDeadlockError, db.DBLockNotGrantedError), val: + if verbose: + print "%s: Aborting transaction (%s)" % (name, val[1]) + txn.abort() + time.sleep(0.05) + + if verbose: print "%s: thread finished" % name + + + def readerThread(self, d, readerNum): + time.sleep(0.01 * readerNum + 0.05) + name = currentThread().getName() + + for loop in range(5): + finished = 0 + while not finished: + try: + txn = self.env.txn_begin(None, self.txnFlag) + c = d.cursor(txn) + count = 0 + rec = c.first() + while rec: + count = count + 1 + key, data = rec + assert self.makeData(key) == data + rec = c.next() + if verbose: print "%s: found %d records" % (name, count) + c.close() + txn.commit() + finished = 1 + except (db.DBLockDeadlockError, db.DBLockNotGrantedError), val: + if verbose: + print "%s: Aborting transaction (%s)" % (name, val[1]) + c.close() + txn.abort() + time.sleep(0.05) + + time.sleep(0.05) + + if verbose: print "%s: thread finished" % name + + + def deadlockThread(self): + self.doLockDetect = 1 + while self.doLockDetect: + time.sleep(0.5) + try: + aborted = self.env.lock_detect(db.DB_LOCK_RANDOM, db.DB_LOCK_CONFLICT) + if verbose and aborted: + print "deadlock: Aborted %d deadlocked transaction(s)" % aborted + except db.DBError: + pass + + + +class BTreeThreadedTransactions(ThreadedTransactionsBase): + dbtype = db.DB_BTREE + writers = 3 + readers = 5 + records = 2000 + +class HashThreadedTransactions(ThreadedTransactionsBase): + dbtype = db.DB_HASH + writers = 1 + readers = 5 + records = 2000 + +class BTreeThreadedNoWaitTransactions(ThreadedTransactionsBase): + dbtype = db.DB_BTREE + writers = 3 + readers = 5 + records = 2000 + txnFlag = db.DB_TXN_NOWAIT + +class HashThreadedNoWaitTransactions(ThreadedTransactionsBase): + dbtype = db.DB_HASH + writers = 1 + readers = 5 + records = 2000 + txnFlag = db.DB_TXN_NOWAIT + + +#---------------------------------------------------------------------- + +def suite(): + theSuite = unittest.TestSuite() + + if have_threads: + theSuite.addTest(unittest.makeSuite(BTreeConcurrentDataStore)) + theSuite.addTest(unittest.makeSuite(HashConcurrentDataStore)) + theSuite.addTest(unittest.makeSuite(BTreeSimpleThreaded)) + theSuite.addTest(unittest.makeSuite(HashSimpleThreaded)) + theSuite.addTest(unittest.makeSuite(BTreeThreadedTransactions)) + theSuite.addTest(unittest.makeSuite(HashThreadedTransactions)) + theSuite.addTest(unittest.makeSuite(BTreeThreadedNoWaitTransactions)) + theSuite.addTest(unittest.makeSuite(HashThreadedNoWaitTransactions)) + + else: + print "Threads not available, skipping thread tests." + + return theSuite + + +if __name__ == '__main__': + unittest.main( defaultTest='suite' ) |