diff options
Diffstat (limited to 'Objects/obmalloc.c')
-rw-r--r-- | Objects/obmalloc.c | 133 |
1 files changed, 116 insertions, 17 deletions
diff --git a/Objects/obmalloc.c b/Objects/obmalloc.c index b209808da90..deb7fd957e5 100644 --- a/Objects/obmalloc.c +++ b/Objects/obmalloc.c @@ -124,6 +124,33 @@ _PyMem_mi_page_is_safe_to_free(mi_page_t *page) } +#ifdef Py_GIL_DISABLED + +// If we are deferring collection of more than this amount of memory for +// mimalloc pages, advance the write sequence. Advancing allows these +// pages to be re-used in a different thread or for a different size class. +#define QSBR_PAGE_MEM_LIMIT 4096*20 + +// Return true if the global write sequence should be advanced for a mimalloc +// page that is deferred from collection. +static bool +should_advance_qsbr_for_page(struct _qsbr_thread_state *qsbr, mi_page_t *page) +{ + size_t bsize = mi_page_block_size(page); + size_t page_size = page->capacity*bsize; + if (page_size > QSBR_PAGE_MEM_LIMIT) { + qsbr->deferred_page_memory = 0; + return true; + } + qsbr->deferred_page_memory += page_size; + if (qsbr->deferred_page_memory > QSBR_PAGE_MEM_LIMIT) { + qsbr->deferred_page_memory = 0; + return true; + } + return false; +} +#endif + static bool _PyMem_mi_page_maybe_free(mi_page_t *page, mi_page_queue_t *pq, bool force) { @@ -139,7 +166,14 @@ _PyMem_mi_page_maybe_free(mi_page_t *page, mi_page_queue_t *pq, bool force) _PyMem_mi_page_clear_qsbr(page); page->retire_expire = 0; - page->qsbr_goal = _Py_qsbr_deferred_advance(tstate->qsbr); + + if (should_advance_qsbr_for_page(tstate->qsbr, page)) { + page->qsbr_goal = _Py_qsbr_advance(tstate->qsbr->shared); + } + else { + page->qsbr_goal = _Py_qsbr_shared_next(tstate->qsbr->shared); + } + llist_insert_tail(&tstate->mimalloc.page_list, &page->qsbr_node); return false; } @@ -1141,8 +1175,44 @@ free_work_item(uintptr_t ptr, delayed_dealloc_cb cb, void *state) } } + +#ifdef Py_GIL_DISABLED + +// For deferred advance on free: the number of deferred items before advancing +// the write sequence. This is based on WORK_ITEMS_PER_CHUNK. We ideally +// want to process a chunk before it overflows. +#define QSBR_DEFERRED_LIMIT 127 + +// If the deferred memory exceeds 1 MiB, advance the write sequence. This +// helps limit memory usage due to QSBR delaying frees too long. +#define QSBR_FREE_MEM_LIMIT 1024*1024 + +// Return true if the global write sequence should be advanced for a deferred +// memory free. +static bool +should_advance_qsbr_for_free(struct _qsbr_thread_state *qsbr, size_t size) +{ + if (size > QSBR_FREE_MEM_LIMIT) { + qsbr->deferred_count = 0; + qsbr->deferred_memory = 0; + qsbr->should_process = true; + return true; + } + qsbr->deferred_count++; + qsbr->deferred_memory += size; + if (qsbr->deferred_count > QSBR_DEFERRED_LIMIT || + qsbr->deferred_memory > QSBR_FREE_MEM_LIMIT) { + qsbr->deferred_count = 0; + qsbr->deferred_memory = 0; + qsbr->should_process = true; + return true; + } + return false; +} +#endif + static void -free_delayed(uintptr_t ptr) +free_delayed(uintptr_t ptr, size_t size) { #ifndef Py_GIL_DISABLED free_work_item(ptr, NULL, NULL); @@ -1200,23 +1270,32 @@ free_delayed(uintptr_t ptr) } assert(buf != NULL && buf->wr_idx < WORK_ITEMS_PER_CHUNK); - uint64_t seq = _Py_qsbr_deferred_advance(tstate->qsbr); + uint64_t seq; + if (should_advance_qsbr_for_free(tstate->qsbr, size)) { + seq = _Py_qsbr_advance(tstate->qsbr->shared); + } + else { + seq = _Py_qsbr_shared_next(tstate->qsbr->shared); + } buf->array[buf->wr_idx].ptr = ptr; buf->array[buf->wr_idx].qsbr_goal = seq; buf->wr_idx++; if (buf->wr_idx == WORK_ITEMS_PER_CHUNK) { + // Normally the processing of delayed items is done from the eval + // breaker. Processing here is a safety measure to ensure too much + // work does not accumulate. _PyMem_ProcessDelayed((PyThreadState *)tstate); } #endif } void -_PyMem_FreeDelayed(void *ptr) +_PyMem_FreeDelayed(void *ptr, size_t size) { assert(!((uintptr_t)ptr & 0x01)); if (ptr != NULL) { - free_delayed((uintptr_t)ptr); + free_delayed((uintptr_t)ptr, size); } } @@ -1226,7 +1305,25 @@ _PyObject_XDecRefDelayed(PyObject *ptr) { assert(!((uintptr_t)ptr & 0x01)); if (ptr != NULL) { - free_delayed(((uintptr_t)ptr)|0x01); + // We use 0 as the size since we don't have an easy way to know the + // actual size. If we are freeing many objects, the write sequence + // will be advanced due to QSBR_DEFERRED_LIMIT. + free_delayed(((uintptr_t)ptr)|0x01, 0); + } +} +#endif + +#ifdef Py_GIL_DISABLED +void +_PyObject_XSetRefDelayed(PyObject **ptr, PyObject *value) +{ + PyObject *old = *ptr; + FT_ATOMIC_STORE_PTR_RELEASE(*ptr, value); + if (old == NULL) { + return; + } + if (!_Py_IsImmortal(old)) { + _PyObject_XDecRefDelayed(old); } } #endif @@ -1238,7 +1335,7 @@ work_queue_first(struct llist_node *head) } static void -process_queue(struct llist_node *head, struct _qsbr_thread_state *qsbr, +process_queue(struct llist_node *head, _PyThreadStateImpl *tstate, bool keep_empty, delayed_dealloc_cb cb, void *state) { while (!llist_empty(head)) { @@ -1246,7 +1343,7 @@ process_queue(struct llist_node *head, struct _qsbr_thread_state *qsbr, if (buf->rd_idx < buf->wr_idx) { struct _mem_work_item *item = &buf->array[buf->rd_idx]; - if (!_Py_qsbr_poll(qsbr, item->qsbr_goal)) { + if (!_Py_qsbr_poll(tstate->qsbr, item->qsbr_goal)) { return; } @@ -1270,11 +1367,11 @@ process_queue(struct llist_node *head, struct _qsbr_thread_state *qsbr, static void process_interp_queue(struct _Py_mem_interp_free_queue *queue, - struct _qsbr_thread_state *qsbr, delayed_dealloc_cb cb, + _PyThreadStateImpl *tstate, delayed_dealloc_cb cb, void *state) { assert(PyMutex_IsLocked(&queue->mutex)); - process_queue(&queue->head, qsbr, false, cb, state); + process_queue(&queue->head, tstate, false, cb, state); int more_work = !llist_empty(&queue->head); _Py_atomic_store_int_relaxed(&queue->has_work, more_work); @@ -1282,7 +1379,7 @@ process_interp_queue(struct _Py_mem_interp_free_queue *queue, static void maybe_process_interp_queue(struct _Py_mem_interp_free_queue *queue, - struct _qsbr_thread_state *qsbr, delayed_dealloc_cb cb, + _PyThreadStateImpl *tstate, delayed_dealloc_cb cb, void *state) { if (!_Py_atomic_load_int_relaxed(&queue->has_work)) { @@ -1291,7 +1388,7 @@ maybe_process_interp_queue(struct _Py_mem_interp_free_queue *queue, // Try to acquire the lock, but don't block if it's already held. if (_PyMutex_LockTimed(&queue->mutex, 0, 0) == PY_LOCK_ACQUIRED) { - process_interp_queue(queue, qsbr, cb, state); + process_interp_queue(queue, tstate, cb, state); PyMutex_Unlock(&queue->mutex); } } @@ -1302,11 +1399,13 @@ _PyMem_ProcessDelayed(PyThreadState *tstate) PyInterpreterState *interp = tstate->interp; _PyThreadStateImpl *tstate_impl = (_PyThreadStateImpl *)tstate; + tstate_impl->qsbr->should_process = false; + // Process thread-local work - process_queue(&tstate_impl->mem_free_queue, tstate_impl->qsbr, true, NULL, NULL); + process_queue(&tstate_impl->mem_free_queue, tstate_impl, true, NULL, NULL); // Process shared interpreter work - maybe_process_interp_queue(&interp->mem_free_queue, tstate_impl->qsbr, NULL, NULL); + maybe_process_interp_queue(&interp->mem_free_queue, tstate_impl, NULL, NULL); } void @@ -1316,10 +1415,10 @@ _PyMem_ProcessDelayedNoDealloc(PyThreadState *tstate, delayed_dealloc_cb cb, voi _PyThreadStateImpl *tstate_impl = (_PyThreadStateImpl *)tstate; // Process thread-local work - process_queue(&tstate_impl->mem_free_queue, tstate_impl->qsbr, true, cb, state); + process_queue(&tstate_impl->mem_free_queue, tstate_impl, true, cb, state); // Process shared interpreter work - maybe_process_interp_queue(&interp->mem_free_queue, tstate_impl->qsbr, cb, state); + maybe_process_interp_queue(&interp->mem_free_queue, tstate_impl, cb, state); } void @@ -1348,7 +1447,7 @@ _PyMem_AbandonDelayed(PyThreadState *tstate) // Process the merged queue now (see gh-130794). _PyThreadStateImpl *this_tstate = (_PyThreadStateImpl *)_PyThreadState_GET(); - process_interp_queue(&interp->mem_free_queue, this_tstate->qsbr, NULL, NULL); + process_interp_queue(&interp->mem_free_queue, this_tstate, NULL, NULL); PyMutex_Unlock(&interp->mem_free_queue.mutex); |