diff options
author | Sam Gross <colesbury@gmail.com> | 2023-09-19 11:54:29 -0400 |
---|---|---|
committer | GitHub <noreply@github.com> | 2023-09-19 09:54:29 -0600 |
commit | 0c89056fe59ac42f09978582479d40e58a236856 (patch) | |
tree | 06cd5a790da2a6dd3862567419c25572f96ae373 /Python/parking_lot.c | |
parent | 0a31ff0050eec5079fd4c9cafd33b4e3e9afd9ab (diff) | |
download | cpython-0c89056fe59ac42f09978582479d40e58a236856.tar.gz cpython-0c89056fe59ac42f09978582479d40e58a236856.zip |
gh-108724: Add PyMutex and _PyParkingLot APIs (gh-109344)
PyMutex is a one byte lock with fast, inlineable lock and unlock functions for the common uncontended case. The design is based on WebKit's WTF::Lock.
PyMutex is built using the _PyParkingLot APIs, which provide a cross-platform futex-like API (based on WebKit's WTF::ParkingLot). This internal API will be used for building other synchronization primitives used to implement PEP 703, such as one-time initialization and events.
This also includes tests and a mini benchmark in Tools/lockbench/lockbench.py to compare with the existing PyThread_type_lock.
Uncontended acquisition + release:
* Linux (x86-64): PyMutex: 11 ns, PyThread_type_lock: 44 ns
* macOS (arm64): PyMutex: 13 ns, PyThread_type_lock: 18 ns
* Windows (x86-64): PyMutex: 13 ns, PyThread_type_lock: 38 ns
PR Overview:
The primary purpose of this PR is to implement PyMutex, but there are a number of support pieces (described below).
* PyMutex: A 1-byte lock that doesn't require memory allocation to initialize and is generally faster than the existing PyThread_type_lock. The API is internal only for now.
* _PyParkingLot: A futex-like API based on the API of the same name in WebKit. Used to implement PyMutex.
* _PyRawMutex: A word-sized lock used to implement _PyParkingLot.
* PyEvent: A one time event. This was used a bunch in the "nogil" fork and is useful for testing the PyMutex implementation, so I've included it as part of the PR.
* pycore_llist.h: Defines common operations on doubly-linked lists. Not strictly necessary (could do the list operations manually), but they come up frequently in the "nogil" fork. (Similar to https://man.freebsd.org/cgi/man.cgi?queue)
---------
Co-authored-by: Eric Snow <ericsnowcurrently@gmail.com>
Diffstat (limited to 'Python/parking_lot.c')
-rw-r--r-- | Python/parking_lot.c | 370 |
1 files changed, 370 insertions, 0 deletions
diff --git a/Python/parking_lot.c b/Python/parking_lot.c new file mode 100644 index 00000000000..664e622cc17 --- /dev/null +++ b/Python/parking_lot.c @@ -0,0 +1,370 @@ +#include "Python.h" + +#include "pycore_llist.h" +#include "pycore_lock.h" // _PyRawMutex +#include "pycore_parking_lot.h" +#include "pycore_pyerrors.h" // _Py_FatalErrorFormat +#include "pycore_pystate.h" // _PyThreadState_GET +#include "pycore_semaphore.h" // _PySemaphore + +#include <stdbool.h> + + +typedef struct { + // The mutex protects the waiter queue and the num_waiters counter. + _PyRawMutex mutex; + + // Linked list of `struct wait_entry` waiters in this bucket. + struct llist_node root; + size_t num_waiters; +} Bucket; + +struct wait_entry { + void *park_arg; + uintptr_t addr; + _PySemaphore sema; + struct llist_node node; + bool is_unparking; +}; + +// Prime number to avoid correlations with memory addresses. +// We want this to be roughly proportional to the number of CPU cores +// to minimize contention on the bucket locks, but not too big to avoid +// wasting memory. The exact choice does not matter much. 
+#define NUM_BUCKETS 257 + +#define BUCKET_INIT(b, i) [i] = { .root = LLIST_INIT(b[i].root) } +#define BUCKET_INIT_2(b, i) BUCKET_INIT(b, i), BUCKET_INIT(b, i+1) +#define BUCKET_INIT_4(b, i) BUCKET_INIT_2(b, i), BUCKET_INIT_2(b, i+2) +#define BUCKET_INIT_8(b, i) BUCKET_INIT_4(b, i), BUCKET_INIT_4(b, i+4) +#define BUCKET_INIT_16(b, i) BUCKET_INIT_8(b, i), BUCKET_INIT_8(b, i+8) +#define BUCKET_INIT_32(b, i) BUCKET_INIT_16(b, i), BUCKET_INIT_16(b, i+16) +#define BUCKET_INIT_64(b, i) BUCKET_INIT_32(b, i), BUCKET_INIT_32(b, i+32) +#define BUCKET_INIT_128(b, i) BUCKET_INIT_64(b, i), BUCKET_INIT_64(b, i+64) +#define BUCKET_INIT_256(b, i) BUCKET_INIT_128(b, i), BUCKET_INIT_128(b, i+128) + +// Table of waiters (hashed by address) +static Bucket buckets[NUM_BUCKETS] = { + BUCKET_INIT_256(buckets, 0), + BUCKET_INIT(buckets, 256), +}; + +void +_PySemaphore_Init(_PySemaphore *sema) +{ +#if defined(MS_WINDOWS) + sema->platform_sem = CreateSemaphore( + NULL, // attributes + 0, // initial count + 10, // maximum count + NULL // unnamed + ); + if (!sema->platform_sem) { + Py_FatalError("parking_lot: CreateSemaphore failed"); + } +#elif defined(_Py_USE_SEMAPHORES) + if (sem_init(&sema->platform_sem, /*pshared=*/0, /*value=*/0) < 0) { + Py_FatalError("parking_lot: sem_init failed"); + } +#else + if (pthread_mutex_init(&sema->mutex, NULL) != 0) { + Py_FatalError("parking_lot: pthread_mutex_init failed"); + } + if (pthread_cond_init(&sema->cond, NULL)) { + Py_FatalError("parking_lot: pthread_cond_init failed"); + } + sema->counter = 0; +#endif +} + +void +_PySemaphore_Destroy(_PySemaphore *sema) +{ +#if defined(MS_WINDOWS) + CloseHandle(sema->platform_sem); +#elif defined(_Py_USE_SEMAPHORES) + sem_destroy(&sema->platform_sem); +#else + pthread_mutex_destroy(&sema->mutex); + pthread_cond_destroy(&sema->cond); +#endif +} + +static int +_PySemaphore_PlatformWait(_PySemaphore *sema, _PyTime_t timeout) +{ + int res; +#if defined(MS_WINDOWS) + DWORD wait; + DWORD millis = 0; + if (timeout < 
0) { + millis = INFINITE; + } + else { + millis = (DWORD) (timeout / 1000000); + } + wait = WaitForSingleObjectEx(sema->platform_sem, millis, FALSE); + if (wait == WAIT_OBJECT_0) { + res = Py_PARK_OK; + } + else if (wait == WAIT_TIMEOUT) { + res = Py_PARK_TIMEOUT; + } + else { + res = Py_PARK_INTR; + } +#elif defined(_Py_USE_SEMAPHORES) + int err; + if (timeout >= 0) { + struct timespec ts; + + _PyTime_t deadline = _PyTime_Add(_PyTime_GetSystemClock(), timeout); + _PyTime_AsTimespec(deadline, &ts); + + err = sem_timedwait(&sema->platform_sem, &ts); + } + else { + err = sem_wait(&sema->platform_sem); + } + if (err == -1) { + err = errno; + if (err == EINTR) { + res = Py_PARK_INTR; + } + else if (err == ETIMEDOUT) { + res = Py_PARK_TIMEOUT; + } + else { + _Py_FatalErrorFormat(__func__, + "unexpected error from semaphore: %d", + err); + } + } + else { + res = Py_PARK_OK; + } +#else + pthread_mutex_lock(&sema->mutex); + int err = 0; + if (sema->counter == 0) { + if (timeout >= 0) { + struct timespec ts; + + _PyTime_t deadline = _PyTime_Add(_PyTime_GetSystemClock(), timeout); + _PyTime_AsTimespec(deadline, &ts); + + err = pthread_cond_timedwait(&sema->cond, &sema->mutex, &ts); + } + else { + err = pthread_cond_wait(&sema->cond, &sema->mutex); + } + } + if (sema->counter > 0) { + sema->counter--; + res = Py_PARK_OK; + } + else if (err) { + res = Py_PARK_TIMEOUT; + } + else { + res = Py_PARK_INTR; + } + pthread_mutex_unlock(&sema->mutex); +#endif + return res; +} + +int +_PySemaphore_Wait(_PySemaphore *sema, _PyTime_t timeout, int detach) +{ + PyThreadState *tstate = NULL; + if (detach) { + tstate = _PyThreadState_GET(); + if (tstate) { + PyEval_ReleaseThread(tstate); + } + } + + int res = _PySemaphore_PlatformWait(sema, timeout); + + if (detach && tstate) { + PyEval_AcquireThread(tstate); + } + return res; +} + +void +_PySemaphore_Wakeup(_PySemaphore *sema) +{ +#if defined(MS_WINDOWS) + if (!ReleaseSemaphore(sema->platform_sem, 1, NULL)) { + Py_FatalError("parking_lot: 
ReleaseSemaphore failed"); + } +#elif defined(_Py_USE_SEMAPHORES) + int err = sem_post(&sema->platform_sem); + if (err != 0) { + Py_FatalError("parking_lot: sem_post failed"); + } +#else + pthread_mutex_lock(&sema->mutex); + sema->counter++; + pthread_cond_signal(&sema->cond); + pthread_mutex_unlock(&sema->mutex); +#endif +} + +static void +enqueue(Bucket *bucket, const void *address, struct wait_entry *wait) +{ + llist_insert_tail(&bucket->root, &wait->node); + ++bucket->num_waiters; +} + +static struct wait_entry * +dequeue(Bucket *bucket, const void *address) +{ + // find the first waiter that is waiting on `address` + struct llist_node *root = &bucket->root; + struct llist_node *node; + llist_for_each(node, root) { + struct wait_entry *wait = llist_data(node, struct wait_entry, node); + if (wait->addr == (uintptr_t)address) { + llist_remove(node); + --bucket->num_waiters; + return wait; + } + } + return NULL; +} + +static void +dequeue_all(Bucket *bucket, const void *address, struct llist_node *dst) +{ + // remove and append all matching waiters to dst + struct llist_node *root = &bucket->root; + struct llist_node *node; + llist_for_each_safe(node, root) { + struct wait_entry *wait = llist_data(node, struct wait_entry, node); + if (wait->addr == (uintptr_t)address) { + llist_remove(node); + llist_insert_tail(dst, node); + --bucket->num_waiters; + } + } +} + +// Checks that `*addr == *expected` (only works for 1, 2, 4, or 8 bytes) +static int +atomic_memcmp(const void *addr, const void *expected, size_t addr_size) +{ + switch (addr_size) { + case 1: return _Py_atomic_load_uint8(addr) == *(const uint8_t *)expected; + case 2: return _Py_atomic_load_uint16(addr) == *(const uint16_t *)expected; + case 4: return _Py_atomic_load_uint32(addr) == *(const uint32_t *)expected; + case 8: return _Py_atomic_load_uint64(addr) == *(const uint64_t *)expected; + default: Py_UNREACHABLE(); + } +} + +int +_PyParkingLot_Park(const void *addr, const void *expected, size_t size, + 
_PyTime_t timeout_ns, void *park_arg, int detach) +{ + struct wait_entry wait = { + .park_arg = park_arg, + .addr = (uintptr_t)addr, + .is_unparking = false, + }; + + Bucket *bucket = &buckets[((uintptr_t)addr) % NUM_BUCKETS]; + + _PyRawMutex_Lock(&bucket->mutex); + if (!atomic_memcmp(addr, expected, size)) { + _PyRawMutex_Unlock(&bucket->mutex); + return Py_PARK_AGAIN; + } + _PySemaphore_Init(&wait.sema); + enqueue(bucket, addr, &wait); + _PyRawMutex_Unlock(&bucket->mutex); + + int res = _PySemaphore_Wait(&wait.sema, timeout_ns, detach); + if (res == Py_PARK_OK) { + goto done; + } + + // timeout or interrupt + _PyRawMutex_Lock(&bucket->mutex); + if (wait.is_unparking) { + _PyRawMutex_Unlock(&bucket->mutex); + // Another thread has started to unpark us. Wait until we process the + // wakeup signal. + do { + res = _PySemaphore_Wait(&wait.sema, -1, detach); + } while (res != Py_PARK_OK); + goto done; + } + else { + llist_remove(&wait.node); + --bucket->num_waiters; + } + _PyRawMutex_Unlock(&bucket->mutex); + +done: + _PySemaphore_Destroy(&wait.sema); + return res; + +} + +void +_PyParkingLot_Unpark(const void *addr, _Py_unpark_fn_t *fn, void *arg) +{ + Bucket *bucket = &buckets[((uintptr_t)addr) % NUM_BUCKETS]; + + // Find the first waiter that is waiting on `addr` + _PyRawMutex_Lock(&bucket->mutex); + struct wait_entry *waiter = dequeue(bucket, addr); + if (waiter) { + waiter->is_unparking = true; + + int has_more_waiters = (bucket->num_waiters > 0); + fn(arg, waiter->park_arg, has_more_waiters); + } + else { + fn(arg, NULL, 0); + } + _PyRawMutex_Unlock(&bucket->mutex); + + if (waiter) { + // Wakeup the waiter outside of the bucket lock + _PySemaphore_Wakeup(&waiter->sema); + } +} + +void +_PyParkingLot_UnparkAll(const void *addr) +{ + struct llist_node head = LLIST_INIT(head); + Bucket *bucket = &buckets[((uintptr_t)addr) % NUM_BUCKETS]; + + _PyRawMutex_Lock(&bucket->mutex); + dequeue_all(bucket, addr, &head); + _PyRawMutex_Unlock(&bucket->mutex); + + struct 
llist_node *node; + llist_for_each_safe(node, &head) { + struct wait_entry *waiter = llist_data(node, struct wait_entry, node); + llist_remove(node); + _PySemaphore_Wakeup(&waiter->sema); + } +} + +void +_PyParkingLot_AfterFork(void) +{ + // After a fork only one thread remains. That thread cannot be blocked + // so all entries in the parking lot are for dead threads. + memset(buckets, 0, sizeof(buckets)); + for (Py_ssize_t i = 0; i < NUM_BUCKETS; i++) { + llist_init(&buckets[i].root); + } +} |