aboutsummaryrefslogtreecommitdiffstatshomepage
path: root/Lib
diff options
context:
space:
mode:
Diffstat (limited to 'Lib')
-rw-r--r--Lib/test/test_weakref.py152
-rw-r--r--Lib/weakref.py137
2 files changed, 287 insertions, 2 deletions
diff --git a/Lib/test/test_weakref.py b/Lib/test/test_weakref.py
index e32e2485615..551d95cb91b 100644
--- a/Lib/test/test_weakref.py
+++ b/Lib/test/test_weakref.py
@@ -7,11 +7,15 @@ import operator
import contextlib
import copy
-from test import support
+from test import support, script_helper
# Used in ReferencesTestCase.test_ref_created_during_del() .
ref_from_del = None
+# Used by FinalizeTestCase as a global that may be replaced by None
+# when the interpreter shuts down.
+_global_var = 'foobar'
+
class C:
def method(self):
pass
@@ -1551,6 +1555,151 @@ class WeakKeyDictionaryTestCase(mapping_tests.BasicTestMappingProtocol):
def _reference(self):
return self.__ref.copy()
+
+class FinalizeTestCase(unittest.TestCase):
+
+ class A:
+ pass
+
+ def _collect_if_necessary(self):
+ # we create no ref-cycles so in CPython no gc should be needed
+ if sys.implementation.name != 'cpython':
+ support.gc_collect()
+
+ def test_finalize(self):
+ def add(x,y,z):
+ res.append(x + y + z)
+ return x + y + z
+
+ a = self.A()
+
+ res = []
+ f = weakref.finalize(a, add, 67, 43, z=89)
+ self.assertEqual(f.alive, True)
+ self.assertEqual(f.peek(), (a, add, (67,43), {'z':89}))
+ self.assertEqual(f(), 199)
+ self.assertEqual(f(), None)
+ self.assertEqual(f(), None)
+ self.assertEqual(f.peek(), None)
+ self.assertEqual(f.detach(), None)
+ self.assertEqual(f.alive, False)
+ self.assertEqual(res, [199])
+
+ res = []
+ f = weakref.finalize(a, add, 67, 43, 89)
+ self.assertEqual(f.peek(), (a, add, (67,43,89), {}))
+ self.assertEqual(f.detach(), (a, add, (67,43,89), {}))
+ self.assertEqual(f(), None)
+ self.assertEqual(f(), None)
+ self.assertEqual(f.peek(), None)
+ self.assertEqual(f.detach(), None)
+ self.assertEqual(f.alive, False)
+ self.assertEqual(res, [])
+
+ res = []
+ f = weakref.finalize(a, add, x=67, y=43, z=89)
+ del a
+ self._collect_if_necessary()
+ self.assertEqual(f(), None)
+ self.assertEqual(f(), None)
+ self.assertEqual(f.peek(), None)
+ self.assertEqual(f.detach(), None)
+ self.assertEqual(f.alive, False)
+ self.assertEqual(res, [199])
+
+ def test_order(self):
+ a = self.A()
+ res = []
+
+ f1 = weakref.finalize(a, res.append, 'f1')
+ f2 = weakref.finalize(a, res.append, 'f2')
+ f3 = weakref.finalize(a, res.append, 'f3')
+ f4 = weakref.finalize(a, res.append, 'f4')
+ f5 = weakref.finalize(a, res.append, 'f5')
+
+ # make sure finalizers can keep themselves alive
+ del f1, f4
+
+ self.assertTrue(f2.alive)
+ self.assertTrue(f3.alive)
+ self.assertTrue(f5.alive)
+
+ self.assertTrue(f5.detach())
+ self.assertFalse(f5.alive)
+
+ f5() # nothing because previously unregistered
+ res.append('A')
+ f3() # => res.append('f3')
+ self.assertFalse(f3.alive)
+ res.append('B')
+ f3() # nothing because previously called
+ res.append('C')
+ del a
+ self._collect_if_necessary()
+ # => res.append('f4')
+ # => res.append('f2')
+ # => res.append('f1')
+ self.assertFalse(f2.alive)
+ res.append('D')
+ f2() # nothing because previously called by gc
+
+ expected = ['A', 'f3', 'B', 'C', 'f4', 'f2', 'f1', 'D']
+ self.assertEqual(res, expected)
+
+ def test_all_freed(self):
+ # we want a weakrefable subclass of weakref.finalize
+ class MyFinalizer(weakref.finalize):
+ pass
+
+ a = self.A()
+ res = []
+ def callback():
+ res.append(123)
+ f = MyFinalizer(a, callback)
+
+ wr_callback = weakref.ref(callback)
+ wr_f = weakref.ref(f)
+ del callback, f
+
+ self.assertIsNotNone(wr_callback())
+ self.assertIsNotNone(wr_f())
+
+ del a
+ self._collect_if_necessary()
+
+ self.assertIsNone(wr_callback())
+ self.assertIsNone(wr_f())
+ self.assertEqual(res, [123])
+
+ @classmethod
+ def run_in_child(cls):
+ def error():
+ # Create an atexit finalizer from inside a finalizer called
+ # at exit. This should be the next to be run.
+ g1 = weakref.finalize(cls, print, 'g1')
+ print('f3 error')
+ 1/0
+
+ # cls should stay alive till atexit callbacks run
+ f1 = weakref.finalize(cls, print, 'f1', _global_var)
+ f2 = weakref.finalize(cls, print, 'f2', _global_var)
+ f3 = weakref.finalize(cls, error)
+ f4 = weakref.finalize(cls, print, 'f4', _global_var)
+
+ assert f1.atexit == True
+ f2.atexit = False
+ assert f3.atexit == True
+ assert f4.atexit == True
+
+ def test_atexit(self):
+ prog = ('from test.test_weakref import FinalizeTestCase;'+
+ 'FinalizeTestCase.run_in_child()')
+ rc, out, err = script_helper.assert_python_ok('-c', prog)
+ out = out.decode('ascii').splitlines()
+ self.assertEqual(out, ['f4 foobar', 'f3 error', 'g1', 'f1 foobar'])
+ self.assertTrue(b'ZeroDivisionError' in err)
+
+
libreftest = """ Doctest for examples in the library reference: weakref.rst
>>> import weakref
@@ -1644,6 +1793,7 @@ def test_main():
WeakValueDictionaryTestCase,
WeakKeyDictionaryTestCase,
SubclassableWeakrefTestCase,
+ FinalizeTestCase,
)
support.run_doctest(sys.modules[__name__])
diff --git a/Lib/weakref.py b/Lib/weakref.py
index 8f9c107aa6f..7a17d17fede 100644
--- a/Lib/weakref.py
+++ b/Lib/weakref.py
@@ -21,13 +21,16 @@ from _weakref import (
from _weakrefset import WeakSet, _IterationGuard
import collections # Import after _weakref to avoid circular import.
+import sys
+import atexit
+import itertools
ProxyTypes = (ProxyType, CallableProxyType)
__all__ = ["ref", "proxy", "getweakrefcount", "getweakrefs",
"WeakKeyDictionary", "ReferenceType", "ProxyType",
"CallableProxyType", "ProxyTypes", "WeakValueDictionary",
- "WeakSet", "WeakMethod"]
+ "WeakSet", "WeakMethod", "finalize"]
class WeakMethod(ref):
@@ -436,3 +439,135 @@ class WeakKeyDictionary(collections.MutableMapping):
d[ref(key, self._remove)] = value
if len(kwargs):
self.update(kwargs)
+
+
+class finalize:
+ """Class for finalization of weakrefable objects
+
+ finalize(obj, func, *args, **kwargs) returns a callable finalizer
+ object which will be called when obj is garbage collected. The
+ first time the finalizer is called it evaluates func(*arg, **kwargs)
+ and returns the result. After this the finalizer is dead, and
+ calling it just returns None.
+
+ When the program exits any remaining finalizers for which the
+ atexit attribute is true will be run in reverse order of creation.
+ By default atexit is true.
+ """
+
+ # Finalizer objects don't have any state of their own. They are
+ # just used as keys to lookup _Info objects in the registry. This
+ # ensures that they cannot be part of a ref-cycle.
+
+ __slots__ = ()
+ _registry = {}
+ _shutdown = False
+ _index_iter = itertools.count()
+ _dirty = False
+
+ class _Info:
+ __slots__ = ("weakref", "func", "args", "kwargs", "atexit", "index")
+
+ def __init__(self, obj, func, *args, **kwargs):
+ info = self._Info()
+ info.weakref = ref(obj, self)
+ info.func = func
+ info.args = args
+ info.kwargs = kwargs or None
+ info.atexit = True
+ info.index = next(self._index_iter)
+ self._registry[self] = info
+ finalize._dirty = True
+
+ def __call__(self, _=None):
+ """If alive then mark as dead and return func(*args, **kwargs);
+ otherwise return None"""
+ info = self._registry.pop(self, None)
+ if info and not self._shutdown:
+ return info.func(*info.args, **(info.kwargs or {}))
+
+ def detach(self):
+ """If alive then mark as dead and return (obj, func, args, kwargs);
+ otherwise return None"""
+ info = self._registry.get(self)
+ obj = info and info.weakref()
+ if obj is not None and self._registry.pop(self, None):
+ return (obj, info.func, info.args, info.kwargs or {})
+
+ def peek(self):
+ """If alive then return (obj, func, args, kwargs);
+ otherwise return None"""
+ info = self._registry.get(self)
+ obj = info and info.weakref()
+ if obj is not None:
+ return (obj, info.func, info.args, info.kwargs or {})
+
+ @property
+ def alive(self):
+ """Whether finalizer is alive"""
+ return self in self._registry
+
+ @property
+ def atexit(self):
+ """Whether finalizer should be called at exit"""
+ info = self._registry.get(self)
+ return bool(info) and info.atexit
+
+ @atexit.setter
+ def atexit(self, value):
+ info = self._registry.get(self)
+ if info:
+ info.atexit = bool(value)
+
+ def __repr__(self):
+ info = self._registry.get(self)
+ obj = info and info.weakref()
+ if obj is None:
+ return '<%s object at %#x; dead>' % (type(self).__name__, id(self))
+ else:
+ return '<%s object at %#x; for %r at %#x>' % \
+ (type(self).__name__, id(self), type(obj).__name__, id(obj))
+
+ @classmethod
+ def _select_for_exit(cls):
+ # Return live finalizers marked for exit, oldest first
+ L = [(f,i) for (f,i) in cls._registry.items() if i.atexit]
+ L.sort(key=lambda item:item[1].index)
+ return [f for (f,i) in L]
+
+ @classmethod
+ def _exitfunc(cls):
+ # At shutdown invoke finalizers for which atexit is true.
+ # This is called once all other non-daemonic threads have been
+ # joined.
+ reenable_gc = False
+ try:
+ if cls._registry:
+ import gc
+ if gc.isenabled():
+ reenable_gc = True
+ gc.disable()
+ pending = None
+ while True:
+ if pending is None or finalize._dirty:
+ pending = cls._select_for_exit()
+ finalize._dirty = False
+ if not pending:
+ break
+ f = pending.pop()
+ try:
+ # gc is disabled, so (assuming no daemonic
+ # threads) the following is the only line in
+ # this function which might trigger creation
+ # of a new finalizer
+ f()
+ except Exception:
+ sys.excepthook(*sys.exc_info())
+ assert f not in cls._registry
+ finally:
+ # prevent any more finalizers from executing during shutdown
+ finalize._shutdown = True
+ if reenable_gc:
+ gc.enable()
+
+atexit.register(finalize._exitfunc)