From b3782c87b0ba9750f032b3be0f268d1817aa53eb Mon Sep 17 00:00:00 2001 From: vbcssc Date: Sat, 12 Oct 2024 16:53:18 +0800 Subject: [PATCH] adding threading module --- examples/threading/lock_rlock.py | 83 ++++ package/threading/PikaPlatformEx.c | 458 ++++++++++++++++++ package/threading/PikaPlatformEx.h | 53 ++ package/threading/pika_hal_ex.c | 180 +++++++ package/threading/pika_hal_ex.h | 44 ++ package/threading/threading.c | 5 + package/threading/threading.pyi | 52 ++ package/threading/threading_Condition.c | 45 ++ package/threading/threading_Lock.c | 49 ++ package/threading/threading_RLock.c | 34 ++ port/linux/package/pikascript/module_list.txt | 3 +- .../pikascript-lib/threading/PikaPlatformEx.c | 458 ++++++++++++++++++ .../pikascript-lib/threading/PikaPlatformEx.h | 53 ++ .../pikascript-lib/threading/pika_hal_ex.c | 180 +++++++ .../pikascript-lib/threading/pika_hal_ex.h | 44 ++ .../pikascript-lib/threading/threading.c | 5 + .../threading/threading_Condition.c | 45 ++ .../pikascript-lib/threading/threading_Lock.c | 49 ++ .../threading/threading_RLock.c | 34 ++ port/linux/package/pikascript/threading.pyi | 52 ++ port/linux/test/module-test.cpp | 2 + .../linux/test/python/threading/lock_rlock.py | 83 ++++ 22 files changed, 2010 insertions(+), 1 deletion(-) create mode 100644 examples/threading/lock_rlock.py create mode 100644 package/threading/PikaPlatformEx.c create mode 100644 package/threading/PikaPlatformEx.h create mode 100644 package/threading/pika_hal_ex.c create mode 100644 package/threading/pika_hal_ex.h create mode 100644 package/threading/threading.c create mode 100644 package/threading/threading.pyi create mode 100644 package/threading/threading_Condition.c create mode 100644 package/threading/threading_Lock.c create mode 100644 package/threading/threading_RLock.c create mode 100644 port/linux/package/pikascript/pikascript-lib/threading/PikaPlatformEx.c create mode 100644 port/linux/package/pikascript/pikascript-lib/threading/PikaPlatformEx.h create mode 100644 port/linux/package/pikascript/pikascript-lib/threading/pika_hal_ex.c create mode 100644 port/linux/package/pikascript/pikascript-lib/threading/pika_hal_ex.h create mode 100644 port/linux/package/pikascript/pikascript-lib/threading/threading.c create mode 100644 port/linux/package/pikascript/pikascript-lib/threading/threading_Condition.c create mode 100644 port/linux/package/pikascript/pikascript-lib/threading/threading_Lock.c create mode 100644 port/linux/package/pikascript/pikascript-lib/threading/threading_RLock.c create mode 100644 port/linux/package/pikascript/threading.pyi create mode 100644 port/linux/test/python/threading/lock_rlock.py diff --git a/examples/threading/lock_rlock.py b/examples/threading/lock_rlock.py new file mode 100644 index 000000000..6e1dca99e --- /dev/null +++ b/examples/threading/lock_rlock.py @@ -0,0 +1,83 @@ +import _thread +import time +import threading +import PikaStdLib +# 共享资源 +shared_resource = 0 + +# 互斥锁 +mutex = threading.Lock() + +# 线程函数 + +finished = 0 + + +def thread_function(name, delay): + global shared_resource + global mutex, finished + print("delay : %s" % str(delay)) + k = 0 + i = 0 + mem = PikaStdLib.MemChecker() + for i in range(5): + # while 1: + + try: + # 获取互斥锁 + print("%s try to acquire lock. #1" % name) + res = mutex.acquire(True, None) + print("res: %s" % str(res)) + if 1: # 测试RLock或者Lock的超时加上 + print("%s try to acquire lock. #2" % name) + res = mutex.acquire(True, 0.5) + print("res: %s" % str(res)) + if res: + print("%s acquire lock SUCC." % name) + else: + print("%s acquire lock FAIL." % name) + # 打印当前线程名称和共享资源的值 + print("Thread %s: Iteration %d, Shared Resource: %d" % + (name, i, shared_resource)) + + # 更新共享资源 + shared_resource += 1 + + # 模拟工作时间 + + time.sleep(delay) + print("wake") + + # 释放互斥锁 + mutex.release() + mutex.release() + k += 1 + + print("%s i = %d." % (name, i)) + # print('mem used now:') + # mem.now() + + except: + print("------------- error ---------------") + + print("%s exit , at last, i = %d." % (name, k)) + finished += 1 + +# 主函数 + + +def main(): + # 创建第一个线程 + _thread.start_new_thread(thread_function, ("Thread-1", 0.1)) + time.sleep(0.5) + # 创建第二个线程 + _thread.start_new_thread(thread_function, ("Thread-2", 0.2)) + + # 主线程等待子线程结束 + # 由于 _thread 没有 join 方法,我们通过 sleep 来模拟等待 + # time.sleep(60) + while finished < 2: + time.sleep(1) + + +main() diff --git a/package/threading/PikaPlatformEx.c b/package/threading/PikaPlatformEx.c new file mode 100644 index 000000000..71cb717d6 --- /dev/null +++ b/package/threading/PikaPlatformEx.c @@ -0,0 +1,458 @@ +#include "PikaPlatformEx.h" + +//----------------------------- mutex ------------------------------- +// 带超时的互斥锁加锁 +int pika_platform_thread_mutex_timedlock(pika_platform_thread_mutex_t* m, + pika_bool block, + Arg* timeout) { +#if defined(__linux) || (PIKA_WIN_PTHREAD_ENABLE) + ArgType timout_type = arg_getType(timeout); + pika_float timeout_f; + int result; + if (!(timout_type == ARG_TYPE_FLOAT || timout_type == ARG_TYPE_INT || + timout_type == ARG_TYPE_NONE)) { + return PIKA_RES_ERR_INVALID_PARAM; + } + + if (timout_type == ARG_TYPE_FLOAT || timout_type == ARG_TYPE_INT) { + // printf("==== #01\n"); + if (timout_type == ARG_TYPE_FLOAT) { + timeout_f = arg_getFloat(timeout); + } + if (timout_type == ARG_TYPE_INT) { + int timeout_d = arg_getInt(timeout); + timeout_f = (pika_float)timeout_d; + // printf("==== #04 %lf\n", timeout_f); + } + if (timeout_f < 0.0f) { + return PIKA_RES_ERR_INVALID_PARAM; + } + + struct timespec ts; + clock_gettime(CLOCK_REALTIME, &ts); // 获取当前时间 + + // 将浮点数秒转换为秒和纳秒 + long sec = (long)timeout_f; + long nsec = (long)((timeout_f - (pika_float)sec) * 1000000000.0); + + ts.tv_sec += sec; + ts.tv_nsec += nsec; + + // 如果纳秒数超过 1 秒,则需要调整秒数和纳秒数 + if (ts.tv_nsec >= 1000000000) { + ts.tv_nsec -= 1000000000; // 减去 1 秒的纳秒数 + ts.tv_sec += 1; // 增加 1 秒 + } + + pika_GIL_EXIT(); + result = pthread_mutex_timedlock(&m->mutex, &ts); + pika_GIL_ENTER(); + return result == 0 ? 0 : -1; + + } else if (timout_type == ARG_TYPE_NONE) { + if (block) { + // printf("==== #02\n"); + pika_GIL_EXIT(); + result = pthread_mutex_lock(&m->mutex); + pika_GIL_ENTER(); + return result == 0 ? 0 : -1; + } else { + // printf("==== #03\n"); + pika_GIL_EXIT(); + result = pthread_mutex_trylock(&m->mutex); + pika_GIL_ENTER(); + return result == 0 ? 0 : -1; + } + } else { + return PIKA_RES_ERR_INVALID_PARAM; + } + +#elif PIKA_FREERTOS_ENABLE + if (pdTRUE == xSemaphoreTake(m->mutex, (TickType_t)(timeout * 1000.0f))) { + return 0; + } + return -1; + +#elif PIKA_RTTHREAD_ENABLE + return rt_mutex_take((m->mutex), (rt_tick_t)(timeout * RT_TICK_PER_SECOND)); + +#elif PIKA_ZEUSOS_ENABLE + return zos_mutex_lock(m->mutex, (uint32_t)(timeout * 1000.0f)); + +#else + WEAK_FUNCTION_NEED_OVERRIDE_ERROR(_); + return -1; +#endif +} + +//----------------------------- rtmutex ------------------------------- +// 初始化递归互斥锁 +void pika_platform_thread_rtmutex_init(pika_platform_thread_rtmutex_t* rtm) { +#if defined(__linux) || (PIKA_WIN_PTHREAD_ENABLE) + pthread_mutexattr_t attr; + if (pthread_mutexattr_init(&attr) != 0) { + perror("pthread_mutexattr_init"); + exit(EXIT_FAILURE); + } + // 设置互斥锁类型为递归互斥锁 + if (pthread_mutexattr_settype(&attr, PTHREAD_MUTEX_RECURSIVE) != 0) { + perror("pthread_mutexattr_settype"); + pthread_mutexattr_destroy(&attr); + exit(EXIT_FAILURE); + } + + pthread_mutex_init(&rtm->mutex, &attr); + pthread_cond_init(&rtm->cond, NULL); + rtm->owner = (pthread_t)0; + rtm->count = 0; +#elif PIKA_FREERTOS_ENABLE + +#elif PIKA_RTTHREAD_ENABLE +#elif PIKA_ZEUSOS_ENABLE +#else + WEAK_FUNCTION_NEED_OVERRIDE_ERROR(_); +#endif +} + +// 销毁递归互斥锁 +void pika_platform_thread_rtmutex_destroy(pika_platform_thread_rtmutex_t* rtm) { +#if defined(__linux) || (PIKA_WIN_PTHREAD_ENABLE) + pthread_cond_destroy(&rtm->cond); + pthread_mutex_destroy(&rtm->mutex); +#elif PIKA_FREERTOS_ENABLE +#elif PIKA_RTTHREAD_ENABLE +#elif PIKA_ZEUSOS_ENABLE +#else + WEAK_FUNCTION_NEED_OVERRIDE_ERROR(_); +#endif +} + +// 带超时的递归互斥锁加锁 +int pika_platform_thread_rtmutex_lock(pika_platform_thread_rtmutex_t* rtm, + pika_bool block, + Arg* timeout) { +#if defined(__linux) || (PIKA_WIN_PTHREAD_ENABLE) + ArgType timout_type = arg_getType(timeout); + pika_float timeout_f; + int result; + + if (!(timout_type == ARG_TYPE_FLOAT || timout_type == ARG_TYPE_INT || + timout_type == ARG_TYPE_NONE)) { + return PIKA_RES_ERR_INVALID_PARAM; + } + + pika_GIL_EXIT(); + pthread_mutex_lock(&rtm->mutex); + pika_GIL_ENTER(); + + if (rtm->owner == pthread_self()) { + // 如果当前线程已经持有锁,则递归深度加1 + rtm->count++; + // printf("rtm->count = %d\n", rtm->count); + pthread_mutex_unlock(&rtm->mutex); + // printf("succ\n"); + return 0; + } + + if (timout_type == ARG_TYPE_FLOAT || timout_type == ARG_TYPE_INT) { + // printf("==== #01\n"); + if (timout_type == ARG_TYPE_FLOAT) { + timeout_f = arg_getFloat(timeout); + } + if (timout_type == ARG_TYPE_INT) { + int timeout_d = arg_getInt(timeout); + timeout_f = (pika_float)timeout_d; + // printf("==== #04 %lf\n", timeout_f); + } + if (timeout_f < 0.0f) { + return PIKA_RES_ERR_INVALID_PARAM; + } + + struct timespec ts; + clock_gettime(CLOCK_REALTIME, &ts); // 获取当前时间 + + // 将浮点数秒转换为秒和纳秒 + long sec = (long)timeout_f; + long nsec = (long)((timeout_f - (pika_float)sec) * 1000000000.0); + + ts.tv_sec += sec; + ts.tv_nsec += nsec; + + // 如果纳秒数超过 1 秒,则需要调整秒数和纳秒数 + if (ts.tv_nsec >= 1000000000) { + ts.tv_nsec -= 1000000000; // 减去 1 秒的纳秒数 + ts.tv_sec += 1; // 增加 1 秒 + } + + // 等待直到获得锁或超时 + while (rtm->owner != (pthread_t)0) { + pika_GIL_EXIT(); + result = pthread_cond_timedwait(&rtm->cond, &rtm->mutex, &ts); + pika_GIL_ENTER(); + + if (result != 0) { + pthread_mutex_unlock(&rtm->mutex); + return -1; + } + } + // 设置当前线程为锁的持有者 + rtm->owner = pthread_self(); + rtm->count = 1; + pthread_mutex_unlock(&rtm->mutex); + return 0; + + } else if (timout_type == ARG_TYPE_NONE) { + if (block) { + // 永久等待 + while (rtm->owner != (pthread_t)0) { + pika_GIL_EXIT(); + result = pthread_cond_wait(&rtm->cond, &rtm->mutex); + pika_GIL_ENTER(); + + if (result != 0) { + pthread_mutex_unlock(&rtm->mutex); + return -1; + } + } + // 设置当前线程为锁的持有者 + rtm->owner = pthread_self(); + rtm->count = 1; + pthread_mutex_unlock(&rtm->mutex); + return 0; + + } else { + // 非阻塞模式 + if (rtm->owner == (pthread_t)0) { + // 如果没有其他线程持有锁,获取锁 + rtm->owner = pthread_self(); + rtm->count = 1; + pthread_mutex_unlock(&rtm->mutex); + return 0; + } else { + // 如果已经有其他线程持有锁,立即返回 -1 + pthread_mutex_unlock(&rtm->mutex); + return -1; + } + } + } else { + return PIKA_RES_ERR_INVALID_PARAM; + } + +#elif PIKA_FREERTOS_ENABLE +#elif PIKA_RTTHREAD_ENABLE +#elif PIKA_ZEUSOS_ENABLE +#else + WEAK_FUNCTION_NEED_OVERRIDE_ERROR(_); +#endif +} + +#if 1 +// 释放递归互斥锁 +int pika_platform_thread_rtmutex_unlock(pika_platform_thread_rtmutex_t* rtm) { +#if defined(__linux) || (PIKA_WIN_PTHREAD_ENABLE) + pthread_t self = pthread_self(); + + pika_GIL_EXIT(); + pthread_mutex_lock(&rtm->mutex); + pika_GIL_ENTER(); + + // printf("rtm->owner = %lu\n", rtm->owner); + if (rtm->owner != self) { + perror("Attempt to unlock a mutex not owned by the current thread"); + pthread_mutex_unlock(&rtm->mutex); + return -1; + } + + rtm->count--; + if (rtm->count == 0) { + rtm->owner = (pthread_t)0; + pthread_cond_signal(&rtm->cond); + // printf("rtm->owner = %lu\n", rtm->owner); + } + + pthread_mutex_unlock(&rtm->mutex); + return 0; +#elif PIKA_FREERTOS_ENABLE +#elif PIKA_RTTHREAD_ENABLE +#elif PIKA_ZEUSOS_ENABLE +#else + WEAK_FUNCTION_NEED_OVERRIDE_ERROR(_); +#endif +} +#endif + +// 检查递归互斥锁是否已被当前线程获取 +int pika_platform_thread_rtmutex_locked(pika_platform_thread_rtmutex_t* rtm) { +#if defined(__linux) || (PIKA_WIN_PTHREAD_ENABLE) + pthread_t self = pthread_self(); + pika_GIL_EXIT(); + pthread_mutex_lock(&rtm->mutex); + pika_GIL_ENTER(); + int is_locked = (rtm->owner == self && rtm->count > 0); + + pthread_mutex_unlock(&rtm->mutex); + + return is_locked; +#elif PIKA_FREERTOS_ENABLE +#elif PIKA_RTTHREAD_ENABLE +#elif PIKA_ZEUSOS_ENABLE +#else + WEAK_FUNCTION_NEED_OVERRIDE_ERROR(_); +#endif +} + +//----------------------------- cond ------------------------------- + +void pika_platform_thread_cond_init(pika_platform_thread_cond_t* cond) { +#if defined(__linux) || (PIKA_WIN_PTHREAD_ENABLE) + pika_platform_thread_rtmutex_init(&cond->rtmutex); + pthread_cond_init(&cond->cond, NULL); + cond->owner = (pthread_t)0; + +#elif PIKA_FREERTOS_ENABLE + +#elif PIKA_RTTHREAD_ENABLE +#elif PIKA_ZEUSOS_ENABLE +#else + WEAK_FUNCTION_NEED_OVERRIDE_ERROR(_); +#endif +} + +void pika_platform_thread_cond_destroy(pika_platform_thread_cond_t* cond) { +#if defined(__linux) || (PIKA_WIN_PTHREAD_ENABLE) + pthread_cond_destroy(&cond->cond); + pika_platform_thread_rtmutex_destroy(&cond->rtmutex); + cond->owner = (pthread_t)0; // 释放资源后重置 owner +#elif PIKA_FREERTOS_ENABLE +#elif PIKA_RTTHREAD_ENABLE +#elif PIKA_ZEUSOS_ENABLE +#else + WEAK_FUNCTION_NEED_OVERRIDE_ERROR(_); +#endif +} + +// 检查当前线程是否持有互斥锁 +static int is_mutex_owned(pika_platform_thread_cond_t* cond) { + // pthread_t current_thread = pthread_self(); + + // 使用 pthread_mutex_trylock 来检查是否已经持有锁 + if (pthread_mutex_trylock(&cond->rtmutex.mutex) == EBUSY) { + // 如果锁已经被持有,尝试解锁并检查是否是当前线程持有的 + if (pthread_mutex_unlock(&cond->rtmutex.mutex) == 0) { + return 1; // 当前线程持有锁 + } + } + + return 0; +} + +// 带阻塞和超时功能的条件变量等待 +int pika_platform_thread_cond_timedwait(pika_platform_thread_cond_t* cond, + Arg* timeout) { +#if defined(__linux) || (PIKA_WIN_PTHREAD_ENABLE) + ArgType timout_type = arg_getType(timeout); + pika_float timeout_f; + int result; + + // 检查是否已经获得了互斥锁 + if (!is_mutex_owned(cond)) { + return -1; + } + + if (!(timout_type == ARG_TYPE_FLOAT || timout_type == ARG_TYPE_INT || + timout_type == ARG_TYPE_NONE)) { + return PIKA_RES_ERR_INVALID_PARAM; + } + + if (timout_type == ARG_TYPE_FLOAT || timout_type == ARG_TYPE_INT) { + if (timout_type == ARG_TYPE_FLOAT) { + timeout_f = arg_getFloat(timeout); + } else { + int timeout_d = arg_getInt(timeout); + timeout_f = (pika_float)timeout_d; + } + + if (timeout_f < 0.0f) { + return PIKA_RES_ERR_INVALID_PARAM; + } + + struct timespec ts; + clock_gettime(CLOCK_REALTIME, &ts); // 获取当前时间 + + // 将浮点数秒转换为秒和纳秒 + long sec = (long)timeout_f; + long nsec = (long)((timeout_f - (pika_float)sec) * 1000000000.0); + + ts.tv_sec += sec; + ts.tv_nsec += nsec; + + // 如果纳秒数超过 1 秒,则需要调整秒数和纳秒数 + if (ts.tv_nsec >= 1000000000) { + ts.tv_nsec -= 1000000000; // 减去 1 秒的纳秒数 + ts.tv_sec += 1; // 增加 1 秒 + } + + // 等待直到被通知或超时 + pika_GIL_EXIT(); + result = pthread_cond_timedwait(&cond->cond, &cond->rtmutex.mutex, &ts); + pika_GIL_ENTER(); + + if (result != 0) { + if (result == ETIMEDOUT) { + return -1; // 超时 + } + perror("pthread_cond_timedwait"); + return -1; // 其他错误 + } + } else if (timout_type == ARG_TYPE_NONE) { + // 永久等待 + pika_GIL_EXIT(); + result = pthread_cond_wait(&cond->cond, &cond->rtmutex.mutex); + pika_GIL_ENTER(); + + if (result != 0) { + perror("pthread_cond_wait"); + return -1; + } + } else { + return PIKA_RES_ERR_INVALID_PARAM; + } + + return 0; + +#elif PIKA_FREERTOS_ENABLE +#elif PIKA_RTTHREAD_ENABLE +#elif PIKA_ZEUSOS_ENABLE +#else + WEAK_FUNCTION_NEED_OVERRIDE_ERROR(_); +#endif +} +// 信号量通知 +int pika_platform_thread_cond_signal(pika_platform_thread_cond_t* cond) { +#if defined(__linux) || (PIKA_WIN_PTHREAD_ENABLE) + int result; + result = pthread_cond_signal(&cond->cond); + cond->owner = (pthread_t)0; // 通知后重置 owner + return result == 0 ? 0 : -1; +#elif PIKA_FREERTOS_ENABLE +#elif PIKA_RTTHREAD_ENABLE +#elif PIKA_ZEUSOS_ENABLE +#else + WEAK_FUNCTION_NEED_OVERRIDE_ERROR(_); +#endif +} + +// 信号量广播 +int pika_platform_thread_cond_broadcast(pika_platform_thread_cond_t* cond) { +#if defined(__linux) || (PIKA_WIN_PTHREAD_ENABLE) + int result; + result = pthread_cond_broadcast(&cond->cond); + cond->owner = (pthread_t)0; // 广播后重置 owner + return result == 0 ? 0 : -1; +#elif PIKA_FREERTOS_ENABLE +#elif PIKA_RTTHREAD_ENABLE +#elif PIKA_ZEUSOS_ENABLE +#else + WEAK_FUNCTION_NEED_OVERRIDE_ERROR(_); +#endif +} diff --git a/package/threading/PikaPlatformEx.h b/package/threading/PikaPlatformEx.h new file mode 100644 index 000000000..add5a126a --- /dev/null +++ b/package/threading/PikaPlatformEx.h @@ -0,0 +1,53 @@ +#ifndef ___PikaPlatformEx__H +#define ___PikaPlatformEx__H +#include "PikaPlatform.h" +#include "PikaObj.h" +#include +#include +#include +int pika_platform_thread_mutex_timedlock(pika_platform_thread_mutex_t* m, + pika_bool block, + Arg* timeout); + +//------------------------------- + +// 递归带超时互斥锁结构体 +typedef struct { + pthread_mutex_t mutex; + pthread_cond_t cond; + pthread_t owner; // 当前持有锁的线程 ID + int count; // 递归深度 +} pika_platform_thread_rtmutex_t; + +// 初始化递归互斥锁 +void pika_platform_thread_rtmutex_init(pika_platform_thread_rtmutex_t* rtm); + +// 销毁递归互斥锁 +void pika_platform_thread_rtmutex_destroy(pika_platform_thread_rtmutex_t* rtm); + +// 带超时的递归互斥锁加锁 +int pika_platform_thread_rtmutex_lock(pika_platform_thread_rtmutex_t* rtm, + pika_bool block, + Arg* timeout); + +// 递归互斥锁解锁 +int pika_platform_thread_rtmutex_unlock(pika_platform_thread_rtmutex_t* rtm); + +// 是否已获得锁 +int pika_platform_thread_rtmutex_locked(pika_platform_thread_rtmutex_t* rtm); + +//------------------------------------- + +typedef struct { + pika_platform_thread_rtmutex_t rtmutex; + pthread_cond_t cond; + pthread_t owner; // 当前持有锁的线程 ID +} pika_platform_thread_cond_t; + +void pika_platform_thread_cond_init(pika_platform_thread_cond_t* cond); +void pika_platform_thread_cond_destroy(pika_platform_thread_cond_t* cond); +int pika_platform_thread_cond_signal(pika_platform_thread_cond_t* cond); +int pika_platform_thread_cond_broadcast(pika_platform_thread_cond_t* cond); +int pika_platform_thread_cond_timedwait(pika_platform_thread_cond_t* cond, + Arg* timeout); +#endif diff --git a/package/threading/pika_hal_ex.c b/package/threading/pika_hal_ex.c new file mode 100644 index 000000000..b9d6acb3e --- /dev/null +++ b/package/threading/pika_hal_ex.c @@ -0,0 +1,180 @@ + +#include "pika_hal_ex.h" + +pika_hal_CircularPtrQueue* pika_hal_circularPtrQueue_create(size_t capacity) { + pika_hal_CircularPtrQueue* cb = (pika_hal_CircularPtrQueue*)pikaMalloc( + sizeof(pika_hal_CircularPtrQueue)); + if (NULL == cb) { + return NULL; + } + cb->head = 0; + cb->tail = 0; + cb->count = 0; +#if PIKA_HAL_CIRCULAR_QUEUE_MUTEX_ENABLE + pika_platform_thread_mutex_init(&cb->mutex); +#endif + cb->capacity = capacity; + cb->buffer = (void**)pikaMalloc(capacity * + sizeof(void*)); // 分配足够的空间来存储指针 + if (NULL == cb->buffer) { + pikaFree(cb, sizeof(pika_hal_CircularPtrQueue)); + return NULL; + } + return cb; +} + +int pika_hal_circularPtrQueue_enqueue(pika_hal_CircularPtrQueue* cb, + void* data) { + int ret = 0; +#if PIKA_HAL_CIRCULAR_QUEUE_MUTEX_ENABLE + pika_platform_thread_mutex_lock(&cb->mutex); +#endif + if (cb->count == cb->capacity) { + ret = -1; + goto __exit; + } + + cb->buffer[cb->tail] = data; + cb->tail = (cb->tail + 1) % cb->capacity; + cb->count++; +__exit: +#if PIKA_HAL_CIRCULAR_QUEUE_MUTEX_ENABLE + pika_platform_thread_mutex_unlock(&cb->mutex); +#endif + return ret; +} + +int pika_hal_circularPtrQueue_dequeue(pika_hal_CircularPtrQueue* cb, + void** value) { + int ret = 0; +#if PIKA_HAL_CIRCULAR_QUEUE_MUTEX_ENABLE + pika_platform_thread_mutex_lock(&cb->mutex); +#endif + if (cb->count == 0) { + ret = -1; + goto __exit; + } + + *value = cb->buffer[cb->head]; + cb->head = (cb->head + 1) % cb->capacity; + cb->count--; +__exit: +#if PIKA_HAL_CIRCULAR_QUEUE_MUTEX_ENABLE + pika_platform_thread_mutex_unlock(&cb->mutex); +#endif + return ret; +} + +int pika_hal_circularPtrQueue_deinit(pika_hal_CircularPtrQueue* cb) { +#if PIKA_HAL_CIRCULAR_QUEUE_MUTEX_ENABLE + pika_platform_thread_mutex_lock(&cb->mutex); +#endif + pikaFree(cb->buffer, cb->capacity * sizeof(void*)); // 释放指针数组 + cb->buffer = NULL; + cb->head = 0; + cb->tail = 0; + cb->count = 0; + cb->capacity = 0; +#if PIKA_HAL_CIRCULAR_QUEUE_MUTEX_ENABLE + pika_platform_thread_mutex_destroy(&cb->mutex); +#endif + pikaFree(cb, sizeof(pika_hal_CircularPtrQueue)); + return 0; +} + +size_t pika_hal_circularPtrQueue_getCount(pika_hal_CircularPtrQueue* cb) { +#if PIKA_HAL_CIRCULAR_QUEUE_MUTEX_ENABLE + pika_platform_thread_mutex_lock(&cb->mutex); +#endif + size_t count = cb->count; +#if PIKA_HAL_CIRCULAR_QUEUE_MUTEX_ENABLE + pika_platform_thread_mutex_unlock(&cb->mutex); +#endif + return count; +} + +int pika_hal_circularPtrQueue_isEmpty(pika_hal_CircularPtrQueue* cb) { + return cb->count == 0; +} + +int pika_hal_circularPtrQueue_isFull(pika_hal_CircularPtrQueue* cb) { + return cb->count == cb->capacity; +} + +int pika_hal_circularPtrQueue_peek(pika_hal_CircularPtrQueue* cb, + void** value) { + if (cb->count == 0) { + return -1; + } + +#if PIKA_HAL_CIRCULAR_QUEUE_MUTEX_ENABLE + pika_platform_thread_mutex_lock(&cb->mutex); +#endif + *value = cb->buffer[cb->head]; +#if PIKA_HAL_CIRCULAR_QUEUE_MUTEX_ENABLE + pika_platform_thread_mutex_unlock(&cb->mutex); +#endif + + return 0; +} + +// 以下待测试 +int pika_hal_circularPtrQueue_enqueueHead(pika_hal_CircularPtrQueue* cb, + void* data) { + int ret = 0; +#if PIKA_HAL_CIRCULAR_QUEUE_MUTEX_ENABLE + pika_platform_thread_mutex_lock(&cb->mutex); +#endif + if (cb->count == cb->capacity) { + ret = -1; // 队列已满 + goto __exit; + } + + // 更新 head 指针前的位置,然后更新 head + cb->head = (cb->head - 1 + cb->capacity) % cb->capacity; + cb->buffer[cb->head] = data; + cb->count++; +__exit: +#if PIKA_HAL_CIRCULAR_QUEUE_MUTEX_ENABLE + pika_platform_thread_mutex_unlock(&cb->mutex); +#endif + return ret; +} + +int pika_hal_circularPtrQueue_dequeueTail(pika_hal_CircularPtrQueue* cb, + void** value) { + int ret = 0; +#if PIKA_HAL_CIRCULAR_QUEUE_MUTEX_ENABLE + pika_platform_thread_mutex_lock(&cb->mutex); +#endif + if (cb->count == 0) { + ret = -1; // 队列为空 + goto __exit; + } + + *value = cb->buffer[cb->tail - 1]; + cb->tail = (cb->tail - 1 + cb->capacity) % cb->capacity; + cb->count--; +__exit: +#if PIKA_HAL_CIRCULAR_QUEUE_MUTEX_ENABLE + pika_platform_thread_mutex_unlock(&cb->mutex); +#endif + return ret; +} + +int pika_hal_circularPtrQueue_peekTail(pika_hal_CircularPtrQueue* cb, + void** value) { + if (cb->count == 0) { + return -1; // 队列为空 + } + +#if PIKA_HAL_CIRCULAR_QUEUE_MUTEX_ENABLE + pika_platform_thread_mutex_lock(&cb->mutex); +#endif + *value = cb->buffer[(cb->tail - 1 + cb->capacity) % cb->capacity]; +#if PIKA_HAL_CIRCULAR_QUEUE_MUTEX_ENABLE + pika_platform_thread_mutex_unlock(&cb->mutex); +#endif + + return 0; +} diff --git a/package/threading/pika_hal_ex.h b/package/threading/pika_hal_ex.h new file mode 100644 index 000000000..8b0612240 --- /dev/null +++ b/package/threading/pika_hal_ex.h @@ -0,0 +1,44 @@ +#ifndef PIKA_HAL_CIRCULAR_PTR_QUEUE_H +#define PIKA_HAL_CIRCULAR_PTR_QUEUE_H +#include "pika_hal.h" +#include "PikaObj.h" + +// 定义循环指针队列的结构体 +typedef struct pika_hal_CircularPtrQueue { + void** buffer; // 存储元素的缓冲区 + size_t head; // 队头指针 + size_t tail; // 队尾指针 + size_t count; // 当前元素数量 + size_t capacity; // 缓冲区容量 +#if PIKA_HAL_CIRCULAR_QUEUE_MUTEX_ENABLE + pika_platform_thread_mutex_t mutex; // 互斥锁 +#endif +} pika_hal_CircularPtrQueue; + +// 创建一个新的循环指针队列 +pika_hal_CircularPtrQueue* pika_hal_circularPtrQueue_create(size_t capacity); + +// 向队列中添加一个元素 +int pika_hal_circularPtrQueue_enqueue(pika_hal_CircularPtrQueue* cb, + void* data); + +// 从队列中移除一个元素 +int pika_hal_circularPtrQueue_dequeue(pika_hal_CircularPtrQueue* cb, + void** value); + +// 销毁队列并释放相关资源 +int pika_hal_circularPtrQueue_deinit(pika_hal_CircularPtrQueue* cb); + +// 获取队列中的元素数量 +size_t pika_hal_circularPtrQueue_getCount(pika_hal_CircularPtrQueue* cb); + +// 检查队列是否为空 +int pika_hal_circularPtrQueue_isEmpty(pika_hal_CircularPtrQueue* cb); + +// 检查队列是否已满 +int pika_hal_circularPtrQueue_isFull(pika_hal_CircularPtrQueue* cb); + +// 查看队列头部的元素,但不移除它 +int pika_hal_circularPtrQueue_peek(pika_hal_CircularPtrQueue* cb, void** value); + +#endif // PIKA_HAL_CIRCULAR_PTR_QUEUE_H diff --git a/package/threading/threading.c b/package/threading/threading.c new file mode 100644 index 000000000..834230df3 --- /dev/null +++ b/package/threading/threading.c @@ -0,0 +1,5 @@ +#include "threading.h" + +int64_t threading_get_ident(PikaObj* self) { + return (int64_t)pika_platform_thread_self(); +} diff --git a/package/threading/threading.pyi b/package/threading/threading.pyi new file mode 100644 index 000000000..7040384b1 --- /dev/null +++ b/package/threading/threading.pyi @@ -0,0 +1,52 @@ +import _thread +from PikaObj import * + +def get_ident() ->int64: ... + +class Lock(): + + def __init__(self): # real signature unknown + self._mutex_ = None + + def acquire(self, block:bool, timeout:any) -> bool: ... # real signature unknown; restored from __doc__ + + def locked(self) -> bool: ...# real signature unknown; restored from __doc__ + + def release(self): ... # real signature unknown; restored from __doc__ + + def __del__(self): ... + + +class RLock(): + + def __init__(self): # real signature unknown + self._rmutex_ = None + + def acquire(self, block:bool, timeout:any) -> bool: ...# real signature unknown; restored from __doc__ + + def locked(self) -> bool: ... # real signature unknown; restored from __doc__ + + def release(self): ...# real signature unknown; restored from __doc__ + + def __del__(self): ... + + +class Condition: + + def __init__(self): # real signature unknown + self._cond_ = None + self._rtmetux_ = None + + def wait(self, timeout:any) -> bool: ... + + def notify(self, n:int): ... + + def notify_all(self): ... + + def _is_owned(self) -> bool: ... + + def __del__(self): ... + + def acquire(self, block:bool, timeout:any) -> bool: ... + + def release(self): ... diff --git a/package/threading/threading_Condition.c b/package/threading/threading_Condition.c new file mode 100644 index 000000000..9b2a7a6aa --- /dev/null +++ b/package/threading/threading_Condition.c @@ -0,0 +1,45 @@ +#include "threading_Condition.h" +#include "PikaPlatformEx.h" + +void threading_Condition___del__(PikaObj* self) { + pika_platform_thread_cond_t* cond = obj_getPtr(self, "_cond_"); + pika_platform_thread_cond_destroy(cond); + pikaFree(cond, sizeof(pika_platform_thread_cond_t)); +} + +void threading_Condition___init__(PikaObj* self) { + pika_platform_thread_cond_t* cond = + pikaMalloc(sizeof(pika_platform_thread_cond_t)); + pika_platform_thread_cond_init(cond); + obj_setPtr(self, "_cond_", cond); +} + +pika_bool threading_Condition__is_owned(PikaObj* self) { + pika_platform_thread_cond_t* cond = obj_getPtr(self, "_cond_"); + if (cond->owner != 0) { + return pika_true; + } else { + return pika_false; + } +} + +void threading_Condition_notify(PikaObj* self, int n) { + pika_platform_thread_cond_t* cond = obj_getPtr(self, "_cond_"); + pika_platform_thread_cond_signal(cond); +} + +void threading_Condition_notify_all(PikaObj* self) { + pika_platform_thread_cond_t* cond = obj_getPtr(self, "_cond_"); + pika_platform_thread_cond_broadcast(cond); +} + +pika_bool threading_Condition_wait(PikaObj* self, Arg* timeout) { + // pika_platform_thread_cond_t * cond = obj_getPtr(self, "_cond_"); + return pika_false; +} +pika_bool threading_Condition_acquire(PikaObj* self, + pika_bool block, + Arg* timeout) { + return pika_false; +} +void threading_Condition_release(PikaObj* self) {} diff --git a/package/threading/threading_Lock.c b/package/threading/threading_Lock.c new file mode 100644 index 000000000..aaee17490 --- /dev/null +++ b/package/threading/threading_Lock.c @@ -0,0 +1,49 @@ +#include "threading_Lock.h" +#include "PikaPlatformEx.h" + +void threading_Lock___del__(PikaObj* self) { + pika_platform_thread_mutex_t* m = obj_getPtr(self, "_mutex_"); + // pika_platform_thread_mutex_unlock(m); + pika_platform_thread_mutex_destroy(m); + pikaFree(m, sizeof(pika_platform_thread_mutex_t)); +} + +void threading_Lock___init__(PikaObj* self) { + pika_platform_thread_mutex_t* m = + pikaMalloc(sizeof(pika_platform_thread_mutex_t)); + pika_platform_thread_mutex_init(m); + obj_setPtr(self, "_mutex_", m); +} +pika_bool threading_Lock_acquire(PikaObj* self, pika_bool block, Arg* timeout) { + pika_platform_thread_mutex_t* m = obj_getPtr(self, "_mutex_"); + int result = pika_platform_thread_mutex_timedlock(m, block, timeout); + if (result == PIKA_RES_ERR_INVALID_PARAM) { + obj_setErrorCode(self, PIKA_RES_ERR_INVALID_PARAM); + obj_setSysOut(self, "invalid param!"); + } + return result == 0 ? pika_true : pika_false; +} +pika_bool threading_Lock_locked(PikaObj* self) { + pika_platform_thread_mutex_t* m = obj_getPtr(self, "_mutex_"); + pika_GIL_EXIT(); + int result = pika_platform_thread_mutex_trylock(m); + pika_GIL_ENTER(); + if (result == 0) { + // 成功获得了锁,需要解锁 + // pika_GIL_EXIT(); + pika_platform_thread_mutex_unlock(m); + // pika_GIL_ENTER();/* */ + return pika_false; // 锁未被占用/* */ + } else { + // 锁已被占用或发生了其他错误 + // perror("pthread_mutex_trylock"); + return pika_true; + } +} + +void threading_Lock_release(PikaObj* self) { + pika_platform_thread_mutex_t* m = obj_getPtr(self, "_mutex_"); + // pika_GIL_EXIT(); + pika_platform_thread_mutex_unlock(m); + // pika_GIL_ENTER(); +} diff --git a/package/threading/threading_RLock.c b/package/threading/threading_RLock.c new file mode 100644 index 000000000..b9c43a857 --- /dev/null +++ b/package/threading/threading_RLock.c @@ -0,0 +1,34 @@ +#include "threading_Lock.h" +#include "PikaPlatformEx.h" + +void threading_RLock___del__(PikaObj* self) { + pika_platform_thread_rtmutex_t* rm = obj_getPtr(self, "_rmutex_"); + pika_platform_thread_rtmutex_destroy(rm); + pikaFree(rm, sizeof(pika_platform_thread_rtmutex_t)); +} +void threading_RLock___init__(PikaObj* self) { + pika_platform_thread_rtmutex_t* rm = + pikaMalloc(sizeof(pika_platform_thread_rtmutex_t)); + pika_platform_thread_rtmutex_init(rm); + obj_setPtr(self, "_rmutex_", rm); +} +pika_bool threading_RLock_acquire(PikaObj* self, + pika_bool block, + Arg* timeout) { + pika_platform_thread_rtmutex_t* rm = obj_getPtr(self, "_rmutex_"); + + int result = pika_platform_thread_rtmutex_lock(rm, block, timeout); + + return result == 0 ? pika_true : pika_false; +} +pika_bool threading_RLock_locked(PikaObj* self) { + pika_platform_thread_rtmutex_t* rm = obj_getPtr(self, "_rmutex_"); + + int result = pika_platform_thread_rtmutex_locked(rm); + + return result != 0 ? pika_true : pika_false; +} +void threading_RLock_release(PikaObj* self) { + pika_platform_thread_rtmutex_t* rm = obj_getPtr(self, "_rmutex_"); + pika_platform_thread_rtmutex_unlock(rm); +} diff --git a/port/linux/package/pikascript/module_list.txt b/port/linux/package/pikascript/module_list.txt index d0a23afb2..31b939926 100644 --- a/port/linux/package/pikascript/module_list.txt +++ b/port/linux/package/pikascript/module_list.txt @@ -44,4 +44,5 @@ flashdb fuzzypid pika_lvgl import_err -_from_import_pyi \ No newline at end of file +_from_import_pyi +threading \ No newline at end of file diff --git a/port/linux/package/pikascript/pikascript-lib/threading/PikaPlatformEx.c b/port/linux/package/pikascript/pikascript-lib/threading/PikaPlatformEx.c new file mode 100644 index 000000000..71cb717d6 --- /dev/null +++ b/port/linux/package/pikascript/pikascript-lib/threading/PikaPlatformEx.c @@ -0,0 +1,458 @@ +#include "PikaPlatformEx.h" + +//----------------------------- mutex ------------------------------- +// 带超时的互斥锁加锁 +int pika_platform_thread_mutex_timedlock(pika_platform_thread_mutex_t* m, + pika_bool block, + Arg* timeout) { +#if defined(__linux) || (PIKA_WIN_PTHREAD_ENABLE) + ArgType timout_type = arg_getType(timeout); + pika_float timeout_f; + int result; + if (!(timout_type == ARG_TYPE_FLOAT || timout_type == ARG_TYPE_INT || + timout_type == ARG_TYPE_NONE)) { + return PIKA_RES_ERR_INVALID_PARAM; + } + + if (timout_type == ARG_TYPE_FLOAT || timout_type == ARG_TYPE_INT) { + // printf("==== #01\n"); + if (timout_type == ARG_TYPE_FLOAT) { + timeout_f = arg_getFloat(timeout); + } + if (timout_type == ARG_TYPE_INT) { + int timeout_d = arg_getInt(timeout); + timeout_f = (pika_float)timeout_d; + // printf("==== #04 %lf\n", timeout_f); + } + if (timeout_f < 0.0f) { + return PIKA_RES_ERR_INVALID_PARAM; + } + + struct timespec ts; + clock_gettime(CLOCK_REALTIME, &ts); // 获取当前时间 + + // 将浮点数秒转换为秒和纳秒 + long sec = (long)timeout_f; + long nsec = (long)((timeout_f - (pika_float)sec) * 1000000000.0); + + ts.tv_sec += sec; + ts.tv_nsec += nsec; + + // 如果纳秒数超过 1 秒,则需要调整秒数和纳秒数 + if (ts.tv_nsec >= 1000000000) { + ts.tv_nsec -= 1000000000; // 减去 1 秒的纳秒数 + ts.tv_sec += 1; // 增加 1 秒 + } + + pika_GIL_EXIT(); + result = pthread_mutex_timedlock(&m->mutex, &ts); + pika_GIL_ENTER(); + return result == 0 ? 0 : -1; + + } else if (timout_type == ARG_TYPE_NONE) { + if (block) { + // printf("==== #02\n"); + pika_GIL_EXIT(); + result = pthread_mutex_lock(&m->mutex); + pika_GIL_ENTER(); + return result == 0 ? 0 : -1; + } else { + // printf("==== #03\n"); + pika_GIL_EXIT(); + result = pthread_mutex_trylock(&m->mutex); + pika_GIL_ENTER(); + return result == 0 ? 0 : -1; + } + } else { + return PIKA_RES_ERR_INVALID_PARAM; + } + +#elif PIKA_FREERTOS_ENABLE + if (pdTRUE == xSemaphoreTake(m->mutex, (TickType_t)(timeout * 1000.0f))) { + return 0; + } + return -1; + +#elif PIKA_RTTHREAD_ENABLE + return rt_mutex_take((m->mutex), (rt_tick_t)(timeout * RT_TICK_PER_SECOND)); + +#elif PIKA_ZEUSOS_ENABLE + return zos_mutex_lock(m->mutex, (uint32_t)(timeout * 1000.0f)); + +#else + WEAK_FUNCTION_NEED_OVERRIDE_ERROR(_); + return -1; +#endif +} + +//----------------------------- rtmutex ------------------------------- +// 初始化递归互斥锁 +void pika_platform_thread_rtmutex_init(pika_platform_thread_rtmutex_t* rtm) { +#if defined(__linux) || (PIKA_WIN_PTHREAD_ENABLE) + pthread_mutexattr_t attr; + if (pthread_mutexattr_init(&attr) != 0) { + perror("pthread_mutexattr_init"); + exit(EXIT_FAILURE); + } + // 设置互斥锁类型为递归互斥锁 + if (pthread_mutexattr_settype(&attr, PTHREAD_MUTEX_RECURSIVE) != 0) { + perror("pthread_mutexattr_settype"); + pthread_mutexattr_destroy(&attr); + exit(EXIT_FAILURE); + } + + pthread_mutex_init(&rtm->mutex, &attr); + pthread_cond_init(&rtm->cond, NULL); + rtm->owner = (pthread_t)0; + rtm->count = 0; +#elif PIKA_FREERTOS_ENABLE + +#elif PIKA_RTTHREAD_ENABLE +#elif PIKA_ZEUSOS_ENABLE +#else + WEAK_FUNCTION_NEED_OVERRIDE_ERROR(_); +#endif +} + +// 销毁递归互斥锁 +void pika_platform_thread_rtmutex_destroy(pika_platform_thread_rtmutex_t* rtm) { +#if defined(__linux) || (PIKA_WIN_PTHREAD_ENABLE) + pthread_cond_destroy(&rtm->cond); + pthread_mutex_destroy(&rtm->mutex); +#elif PIKA_FREERTOS_ENABLE +#elif PIKA_RTTHREAD_ENABLE +#elif PIKA_ZEUSOS_ENABLE +#else + WEAK_FUNCTION_NEED_OVERRIDE_ERROR(_); +#endif +} + +// 带超时的递归互斥锁加锁 +int pika_platform_thread_rtmutex_lock(pika_platform_thread_rtmutex_t* rtm, + pika_bool block, + Arg* timeout) { +#if defined(__linux) || (PIKA_WIN_PTHREAD_ENABLE) + ArgType timout_type = arg_getType(timeout); + pika_float timeout_f; + int result; + + if (!(timout_type == ARG_TYPE_FLOAT || timout_type == ARG_TYPE_INT || + timout_type == ARG_TYPE_NONE)) { + return PIKA_RES_ERR_INVALID_PARAM; + } + + pika_GIL_EXIT(); + pthread_mutex_lock(&rtm->mutex); + pika_GIL_ENTER(); + + if (rtm->owner == pthread_self()) { + // 如果当前线程已经持有锁,则递归深度加1 + rtm->count++; + // printf("rtm->count = %d\n", rtm->count); + pthread_mutex_unlock(&rtm->mutex); + // printf("succ\n"); + return 0; + } + + if (timout_type == ARG_TYPE_FLOAT || timout_type == ARG_TYPE_INT) { + // printf("==== #01\n"); + if (timout_type == ARG_TYPE_FLOAT) { + timeout_f = arg_getFloat(timeout); + } + if (timout_type == ARG_TYPE_INT) { + int timeout_d = arg_getInt(timeout); + timeout_f = (pika_float)timeout_d; + // printf("==== #04 %lf\n", timeout_f); + } + if (timeout_f < 0.0f) { + return PIKA_RES_ERR_INVALID_PARAM; + } + + struct timespec ts; + clock_gettime(CLOCK_REALTIME, &ts); // 获取当前时间 + + // 将浮点数秒转换为秒和纳秒 + long sec = (long)timeout_f; + long nsec = (long)((timeout_f - (pika_float)sec) * 1000000000.0); + + ts.tv_sec += sec; + ts.tv_nsec += nsec; + + // 如果纳秒数超过 1 秒,则需要调整秒数和纳秒数 + if (ts.tv_nsec >= 1000000000) { + ts.tv_nsec -= 1000000000; // 减去 1 秒的纳秒数 + ts.tv_sec += 1; // 增加 1 秒 + } + + // 等待直到获得锁或超时 + while (rtm->owner != (pthread_t)0) { + pika_GIL_EXIT(); + result = pthread_cond_timedwait(&rtm->cond, &rtm->mutex, &ts); + pika_GIL_ENTER(); + + if (result != 0) { + pthread_mutex_unlock(&rtm->mutex); + return -1; + } + } + // 设置当前线程为锁的持有者 + rtm->owner = pthread_self(); + rtm->count = 1; + pthread_mutex_unlock(&rtm->mutex); + return 0; + + } else if (timout_type == ARG_TYPE_NONE) { + if (block) { + // 永久等待 + while (rtm->owner != (pthread_t)0) { + pika_GIL_EXIT(); + result = pthread_cond_wait(&rtm->cond, &rtm->mutex); + pika_GIL_ENTER(); + + if (result != 0) { + pthread_mutex_unlock(&rtm->mutex); + return -1; + } + } + // 设置当前线程为锁的持有者 + rtm->owner = pthread_self(); + rtm->count = 1; + pthread_mutex_unlock(&rtm->mutex); + return 0; + + } else { + // 非阻塞模式 + if (rtm->owner == (pthread_t)0) { + // 如果没有其他线程持有锁,获取锁 + rtm->owner = pthread_self(); + rtm->count = 1; + pthread_mutex_unlock(&rtm->mutex); + return 0; + } else { + // 如果已经有其他线程持有锁,立即返回 -1 + pthread_mutex_unlock(&rtm->mutex); + return -1; + } + } + } else { + return PIKA_RES_ERR_INVALID_PARAM; + } + +#elif PIKA_FREERTOS_ENABLE +#elif PIKA_RTTHREAD_ENABLE +#elif PIKA_ZEUSOS_ENABLE +#else + WEAK_FUNCTION_NEED_OVERRIDE_ERROR(_); +#endif +} + +#if 1 +// 释放递归互斥锁 +int pika_platform_thread_rtmutex_unlock(pika_platform_thread_rtmutex_t* rtm) { +#if defined(__linux) || (PIKA_WIN_PTHREAD_ENABLE) + pthread_t self = pthread_self(); + + pika_GIL_EXIT(); + pthread_mutex_lock(&rtm->mutex); + pika_GIL_ENTER(); + + // printf("rtm->owner = %lu\n", rtm->owner); + if (rtm->owner != self) { + perror("Attempt to unlock a mutex not owned by the current thread"); + pthread_mutex_unlock(&rtm->mutex); + return -1; + } + + rtm->count--; + if (rtm->count == 0) { + rtm->owner = (pthread_t)0; + pthread_cond_signal(&rtm->cond); + // printf("rtm->owner = %lu\n", rtm->owner); + } + + pthread_mutex_unlock(&rtm->mutex); + return 0; +#elif PIKA_FREERTOS_ENABLE +#elif PIKA_RTTHREAD_ENABLE +#elif PIKA_ZEUSOS_ENABLE +#else + WEAK_FUNCTION_NEED_OVERRIDE_ERROR(_); +#endif +} +#endif + +// 检查递归互斥锁是否已被当前线程获取 +int pika_platform_thread_rtmutex_locked(pika_platform_thread_rtmutex_t* rtm) { +#if defined(__linux) || (PIKA_WIN_PTHREAD_ENABLE) + pthread_t self = pthread_self(); + pika_GIL_EXIT(); + pthread_mutex_lock(&rtm->mutex); + pika_GIL_ENTER(); + int is_locked = (rtm->owner == self && rtm->count > 0); + + pthread_mutex_unlock(&rtm->mutex); + + return is_locked; +#elif PIKA_FREERTOS_ENABLE +#elif PIKA_RTTHREAD_ENABLE +#elif PIKA_ZEUSOS_ENABLE +#else + WEAK_FUNCTION_NEED_OVERRIDE_ERROR(_); +#endif +} + +//----------------------------- cond ------------------------------- + +void pika_platform_thread_cond_init(pika_platform_thread_cond_t* cond) { +#if defined(__linux) || (PIKA_WIN_PTHREAD_ENABLE) + pika_platform_thread_rtmutex_init(&cond->rtmutex); + pthread_cond_init(&cond->cond, NULL); + cond->owner = (pthread_t)0; + +#elif PIKA_FREERTOS_ENABLE + +#elif PIKA_RTTHREAD_ENABLE +#elif PIKA_ZEUSOS_ENABLE +#else + WEAK_FUNCTION_NEED_OVERRIDE_ERROR(_); +#endif +} + +void pika_platform_thread_cond_destroy(pika_platform_thread_cond_t* cond) { +#if defined(__linux) || (PIKA_WIN_PTHREAD_ENABLE) + pthread_cond_destroy(&cond->cond); + pika_platform_thread_rtmutex_destroy(&cond->rtmutex); + cond->owner = (pthread_t)0; // 释放资源后重置 owner +#elif PIKA_FREERTOS_ENABLE +#elif PIKA_RTTHREAD_ENABLE +#elif PIKA_ZEUSOS_ENABLE +#else + WEAK_FUNCTION_NEED_OVERRIDE_ERROR(_); +#endif +} + +// 检查当前线程是否持有互斥锁 +static int is_mutex_owned(pika_platform_thread_cond_t* cond) { + // pthread_t current_thread = pthread_self(); + + // 使用 pthread_mutex_trylock 来检查是否已经持有锁 + if (pthread_mutex_trylock(&cond->rtmutex.mutex) == EBUSY) { + // 如果锁已经被持有,尝试解锁并检查是否是当前线程持有的 + if (pthread_mutex_unlock(&cond->rtmutex.mutex) == 0) { + return 1; // 当前线程持有锁 + } + } + + return 0; +} + +// 带阻塞和超时功能的条件变量等待 +int pika_platform_thread_cond_timedwait(pika_platform_thread_cond_t* cond, + Arg* timeout) { +#if defined(__linux) || (PIKA_WIN_PTHREAD_ENABLE) + ArgType timout_type = arg_getType(timeout); + pika_float timeout_f; + int result; + + // 检查是否已经获得了互斥锁 + if (!is_mutex_owned(cond)) { + return -1; + } + + if (!(timout_type == ARG_TYPE_FLOAT || timout_type == ARG_TYPE_INT || + timout_type == ARG_TYPE_NONE)) { + return PIKA_RES_ERR_INVALID_PARAM; + } + + if (timout_type == ARG_TYPE_FLOAT || timout_type == ARG_TYPE_INT) { + if (timout_type == ARG_TYPE_FLOAT) { + timeout_f = arg_getFloat(timeout); + } else { + int timeout_d = arg_getInt(timeout); + timeout_f = (pika_float)timeout_d; + } + + if (timeout_f < 0.0f) { + return PIKA_RES_ERR_INVALID_PARAM; + } + + struct timespec ts; + clock_gettime(CLOCK_REALTIME, &ts); // 获取当前时间 + + // 将浮点数秒转换为秒和纳秒 + long sec = (long)timeout_f; + long nsec = (long)((timeout_f - (pika_float)sec) * 1000000000.0); + + ts.tv_sec += sec; + ts.tv_nsec += nsec; + + // 如果纳秒数超过 1 秒,则需要调整秒数和纳秒数 + if (ts.tv_nsec >= 1000000000) { + ts.tv_nsec -= 1000000000; // 减去 1 秒的纳秒数 + ts.tv_sec += 1; // 增加 1 秒 + } + + // 等待直到被通知或超时 + pika_GIL_EXIT(); + result = pthread_cond_timedwait(&cond->cond, &cond->rtmutex.mutex, &ts); + pika_GIL_ENTER(); + + if (result != 0) { + if (result == ETIMEDOUT) { + return -1; // 超时 + } + perror("pthread_cond_timedwait"); + return -1; // 其他错误 + } + } else if (timout_type == ARG_TYPE_NONE) { + // 永久等待 + pika_GIL_EXIT(); + result = pthread_cond_wait(&cond->cond, &cond->rtmutex.mutex); + pika_GIL_ENTER(); + + if (result != 0) { + perror("pthread_cond_wait"); + return -1; + } + } else { + return PIKA_RES_ERR_INVALID_PARAM; + } + + return 0; + +#elif PIKA_FREERTOS_ENABLE +#elif PIKA_RTTHREAD_ENABLE +#elif PIKA_ZEUSOS_ENABLE +#else + WEAK_FUNCTION_NEED_OVERRIDE_ERROR(_); +#endif +} +// 信号量通知 +int pika_platform_thread_cond_signal(pika_platform_thread_cond_t* cond) { +#if defined(__linux) || (PIKA_WIN_PTHREAD_ENABLE) + int result; + result = pthread_cond_signal(&cond->cond); + cond->owner = (pthread_t)0; // 通知后重置 owner + return result == 0 ? 0 : -1; +#elif PIKA_FREERTOS_ENABLE +#elif PIKA_RTTHREAD_ENABLE +#elif PIKA_ZEUSOS_ENABLE +#else + WEAK_FUNCTION_NEED_OVERRIDE_ERROR(_); +#endif +} + +// 信号量广播 +int pika_platform_thread_cond_broadcast(pika_platform_thread_cond_t* cond) { +#if defined(__linux) || (PIKA_WIN_PTHREAD_ENABLE) + int result; + result = pthread_cond_broadcast(&cond->cond); + cond->owner = (pthread_t)0; // 广播后重置 owner + return result == 0 ? 0 : -1; +#elif PIKA_FREERTOS_ENABLE +#elif PIKA_RTTHREAD_ENABLE +#elif PIKA_ZEUSOS_ENABLE +#else + WEAK_FUNCTION_NEED_OVERRIDE_ERROR(_); +#endif +} diff --git a/port/linux/package/pikascript/pikascript-lib/threading/PikaPlatformEx.h b/port/linux/package/pikascript/pikascript-lib/threading/PikaPlatformEx.h new file mode 100644 index 000000000..add5a126a --- /dev/null +++ b/port/linux/package/pikascript/pikascript-lib/threading/PikaPlatformEx.h @@ -0,0 +1,53 @@ +#ifndef ___PikaPlatformEx__H +#define ___PikaPlatformEx__H +#include "PikaPlatform.h" +#include "PikaObj.h" +#include +#include +#include +int pika_platform_thread_mutex_timedlock(pika_platform_thread_mutex_t* m, + pika_bool block, + Arg* timeout); + +//------------------------------- + +// 递归带超时互斥锁结构体 +typedef struct { + pthread_mutex_t mutex; + pthread_cond_t cond; + pthread_t owner; // 当前持有锁的线程 ID + int count; // 递归深度 +} pika_platform_thread_rtmutex_t; + +// 初始化递归互斥锁 +void pika_platform_thread_rtmutex_init(pika_platform_thread_rtmutex_t* rtm); + +// 销毁递归互斥锁 +void pika_platform_thread_rtmutex_destroy(pika_platform_thread_rtmutex_t* rtm); + +// 带超时的递归互斥锁加锁 +int pika_platform_thread_rtmutex_lock(pika_platform_thread_rtmutex_t* rtm, + pika_bool block, + Arg* timeout); + +// 递归互斥锁解锁 +int pika_platform_thread_rtmutex_unlock(pika_platform_thread_rtmutex_t* rtm); + +// 是否已获得锁 +int pika_platform_thread_rtmutex_locked(pika_platform_thread_rtmutex_t* rtm); + +//------------------------------------- + +typedef struct { + pika_platform_thread_rtmutex_t rtmutex; + pthread_cond_t cond; + pthread_t owner; // 当前持有锁的线程 ID +} pika_platform_thread_cond_t; + +void pika_platform_thread_cond_init(pika_platform_thread_cond_t* cond); +void pika_platform_thread_cond_destroy(pika_platform_thread_cond_t* cond); +int pika_platform_thread_cond_signal(pika_platform_thread_cond_t* cond); +int pika_platform_thread_cond_broadcast(pika_platform_thread_cond_t* cond); +int pika_platform_thread_cond_timedwait(pika_platform_thread_cond_t* cond, + Arg* timeout); +#endif diff --git a/port/linux/package/pikascript/pikascript-lib/threading/pika_hal_ex.c b/port/linux/package/pikascript/pikascript-lib/threading/pika_hal_ex.c new file mode 100644 index 000000000..b9d6acb3e --- /dev/null +++ b/port/linux/package/pikascript/pikascript-lib/threading/pika_hal_ex.c @@ -0,0 +1,180 @@ + +#include "pika_hal_ex.h" + +pika_hal_CircularPtrQueue* pika_hal_circularPtrQueue_create(size_t capacity) { + pika_hal_CircularPtrQueue* cb = (pika_hal_CircularPtrQueue*)pikaMalloc( + sizeof(pika_hal_CircularPtrQueue)); + if (NULL == cb) { + return NULL; + } + cb->head = 0; + cb->tail = 0; + cb->count = 0; +#if PIKA_HAL_CIRCULAR_QUEUE_MUTEX_ENABLE + pika_platform_thread_mutex_init(&cb->mutex); +#endif + cb->capacity = capacity; + cb->buffer = (void**)pikaMalloc(capacity * + sizeof(void*)); // 分配足够的空间来存储指针 + if (NULL == cb->buffer) { + pikaFree(cb, sizeof(pika_hal_CircularPtrQueue)); + return NULL; + } + return cb; +} + +int pika_hal_circularPtrQueue_enqueue(pika_hal_CircularPtrQueue* cb, + void* data) { + int ret = 0; +#if PIKA_HAL_CIRCULAR_QUEUE_MUTEX_ENABLE + pika_platform_thread_mutex_lock(&cb->mutex); +#endif + if (cb->count == cb->capacity) { + ret = -1; + goto __exit; + } + + cb->buffer[cb->tail] = data; + cb->tail = (cb->tail + 1) % cb->capacity; + cb->count++; +__exit: +#if PIKA_HAL_CIRCULAR_QUEUE_MUTEX_ENABLE + pika_platform_thread_mutex_unlock(&cb->mutex); +#endif + return ret; +} + +int pika_hal_circularPtrQueue_dequeue(pika_hal_CircularPtrQueue* cb, + void** value) { + int ret = 0; +#if PIKA_HAL_CIRCULAR_QUEUE_MUTEX_ENABLE + pika_platform_thread_mutex_lock(&cb->mutex); +#endif + if (cb->count == 0) { + ret = -1; + goto __exit; + } + + *value = cb->buffer[cb->head]; + cb->head = (cb->head + 1) % cb->capacity; + cb->count--; +__exit: +#if PIKA_HAL_CIRCULAR_QUEUE_MUTEX_ENABLE + pika_platform_thread_mutex_unlock(&cb->mutex); +#endif + return ret; +} + +int pika_hal_circularPtrQueue_deinit(pika_hal_CircularPtrQueue* cb) { +#if PIKA_HAL_CIRCULAR_QUEUE_MUTEX_ENABLE + pika_platform_thread_mutex_lock(&cb->mutex); +#endif + pikaFree(cb->buffer, cb->capacity * sizeof(void*)); // 释放指针数组 + cb->buffer = NULL; + cb->head = 0; + cb->tail = 0; + cb->count = 0; + cb->capacity = 0; +#if PIKA_HAL_CIRCULAR_QUEUE_MUTEX_ENABLE + pika_platform_thread_mutex_destroy(&cb->mutex); +#endif + pikaFree(cb, sizeof(pika_hal_CircularPtrQueue)); + return 0; +} + +size_t pika_hal_circularPtrQueue_getCount(pika_hal_CircularPtrQueue* cb) { +#if PIKA_HAL_CIRCULAR_QUEUE_MUTEX_ENABLE + pika_platform_thread_mutex_lock(&cb->mutex); +#endif + size_t count = cb->count; +#if PIKA_HAL_CIRCULAR_QUEUE_MUTEX_ENABLE + pika_platform_thread_mutex_unlock(&cb->mutex); +#endif + return count; +} + +int pika_hal_circularPtrQueue_isEmpty(pika_hal_CircularPtrQueue* cb) { + return cb->count == 0; +} + +int pika_hal_circularPtrQueue_isFull(pika_hal_CircularPtrQueue* cb) { + return cb->count == cb->capacity; +} + +int pika_hal_circularPtrQueue_peek(pika_hal_CircularPtrQueue* cb, + void** value) { + if (cb->count == 0) { + return -1; + } + +#if PIKA_HAL_CIRCULAR_QUEUE_MUTEX_ENABLE + pika_platform_thread_mutex_lock(&cb->mutex); +#endif + *value = cb->buffer[cb->head]; +#if PIKA_HAL_CIRCULAR_QUEUE_MUTEX_ENABLE + pika_platform_thread_mutex_unlock(&cb->mutex); +#endif + + return 0; +} + +// 以下待测试 +int pika_hal_circularPtrQueue_enqueueHead(pika_hal_CircularPtrQueue* cb, + void* data) { + int ret = 0; +#if PIKA_HAL_CIRCULAR_QUEUE_MUTEX_ENABLE + pika_platform_thread_mutex_lock(&cb->mutex); +#endif + if (cb->count == cb->capacity) { + ret = -1; // 队列已满 + goto __exit; + } + + // 更新 head 指针前的位置,然后更新 head + cb->head = (cb->head - 1 + cb->capacity) % cb->capacity; + cb->buffer[cb->head] = data; + cb->count++; +__exit: +#if PIKA_HAL_CIRCULAR_QUEUE_MUTEX_ENABLE + pika_platform_thread_mutex_unlock(&cb->mutex); +#endif + return ret; +} + +int pika_hal_circularPtrQueue_dequeueTail(pika_hal_CircularPtrQueue* cb, + void** value) { + int ret = 0; +#if PIKA_HAL_CIRCULAR_QUEUE_MUTEX_ENABLE + pika_platform_thread_mutex_lock(&cb->mutex); +#endif + if (cb->count == 0) { + ret = -1; // 队列为空 + goto __exit; + } + + *value = cb->buffer[cb->tail - 1]; + cb->tail = (cb->tail - 1 + cb->capacity) % cb->capacity; + cb->count--; +__exit: +#if PIKA_HAL_CIRCULAR_QUEUE_MUTEX_ENABLE + pika_platform_thread_mutex_unlock(&cb->mutex); +#endif + return ret; +} + +int pika_hal_circularPtrQueue_peekTail(pika_hal_CircularPtrQueue* cb, + void** value) { + if (cb->count == 0) { + return -1; // 队列为空 + } + +#if PIKA_HAL_CIRCULAR_QUEUE_MUTEX_ENABLE + pika_platform_thread_mutex_lock(&cb->mutex); +#endif + *value = cb->buffer[(cb->tail - 1 + cb->capacity) % cb->capacity]; +#if PIKA_HAL_CIRCULAR_QUEUE_MUTEX_ENABLE + pika_platform_thread_mutex_unlock(&cb->mutex); +#endif + + return 0; +} diff --git a/port/linux/package/pikascript/pikascript-lib/threading/pika_hal_ex.h b/port/linux/package/pikascript/pikascript-lib/threading/pika_hal_ex.h new file mode 100644 index 000000000..8b0612240 --- /dev/null +++ b/port/linux/package/pikascript/pikascript-lib/threading/pika_hal_ex.h @@ -0,0 +1,44 @@ +#ifndef PIKA_HAL_CIRCULAR_PTR_QUEUE_H +#define PIKA_HAL_CIRCULAR_PTR_QUEUE_H +#include "pika_hal.h" +#include "PikaObj.h" + +// 定义循环指针队列的结构体 +typedef struct pika_hal_CircularPtrQueue { + void** buffer; // 存储元素的缓冲区 + size_t head; // 队头指针 + size_t tail; // 队尾指针 + size_t count; // 当前元素数量 + size_t capacity; // 缓冲区容量 +#if PIKA_HAL_CIRCULAR_QUEUE_MUTEX_ENABLE + pika_platform_thread_mutex_t mutex; // 互斥锁 +#endif +} pika_hal_CircularPtrQueue; + +// 创建一个新的循环指针队列 +pika_hal_CircularPtrQueue* pika_hal_circularPtrQueue_create(size_t capacity); + +// 向队列中添加一个元素 +int pika_hal_circularPtrQueue_enqueue(pika_hal_CircularPtrQueue* cb, + void* data); + +// 从队列中移除一个元素 +int pika_hal_circularPtrQueue_dequeue(pika_hal_CircularPtrQueue* cb, + void** value); + +// 销毁队列并释放相关资源 +int pika_hal_circularPtrQueue_deinit(pika_hal_CircularPtrQueue* cb); + +// 获取队列中的元素数量 +size_t pika_hal_circularPtrQueue_getCount(pika_hal_CircularPtrQueue* cb); + +// 检查队列是否为空 +int pika_hal_circularPtrQueue_isEmpty(pika_hal_CircularPtrQueue* cb); + +// 检查队列是否已满 +int pika_hal_circularPtrQueue_isFull(pika_hal_CircularPtrQueue* cb); + +// 查看队列头部的元素,但不移除它 +int pika_hal_circularPtrQueue_peek(pika_hal_CircularPtrQueue* cb, void** value); + +#endif // PIKA_HAL_CIRCULAR_PTR_QUEUE_H diff --git a/port/linux/package/pikascript/pikascript-lib/threading/threading.c b/port/linux/package/pikascript/pikascript-lib/threading/threading.c new file mode 100644 index 000000000..834230df3 --- /dev/null +++ b/port/linux/package/pikascript/pikascript-lib/threading/threading.c @@ -0,0 +1,5 @@ +#include "threading.h" + +int64_t threading_get_ident(PikaObj* self) { + return (int64_t)pika_platform_thread_self(); +} diff --git a/port/linux/package/pikascript/pikascript-lib/threading/threading_Condition.c b/port/linux/package/pikascript/pikascript-lib/threading/threading_Condition.c new file mode 100644 index 000000000..9b2a7a6aa --- /dev/null +++ b/port/linux/package/pikascript/pikascript-lib/threading/threading_Condition.c @@ -0,0 +1,45 @@ +#include "threading_Condition.h" +#include "PikaPlatformEx.h" + +void threading_Condition___del__(PikaObj* self) { + pika_platform_thread_cond_t* cond = obj_getPtr(self, "_cond_"); + pika_platform_thread_cond_destroy(cond); + pikaFree(cond, sizeof(pika_platform_thread_cond_t)); +} + +void threading_Condition___init__(PikaObj* self) { + pika_platform_thread_cond_t* cond = + pikaMalloc(sizeof(pika_platform_thread_cond_t)); + pika_platform_thread_cond_init(cond); + obj_setPtr(self, "_cond_", cond); +} + +pika_bool threading_Condition__is_owned(PikaObj* self) { + pika_platform_thread_cond_t* cond = obj_getPtr(self, "_cond_"); + if (cond->owner != 0) { + return pika_true; + } else { + return pika_false; + } +} + +void threading_Condition_notify(PikaObj* self, int n) { + pika_platform_thread_cond_t* cond = obj_getPtr(self, "_cond_"); + pika_platform_thread_cond_signal(cond); +} + +void threading_Condition_notify_all(PikaObj* self) { + pika_platform_thread_cond_t* cond = obj_getPtr(self, "_cond_"); + pika_platform_thread_cond_broadcast(cond); +} + +pika_bool threading_Condition_wait(PikaObj* self, Arg* timeout) { + // pika_platform_thread_cond_t * cond = obj_getPtr(self, "_cond_"); + return pika_false; +} +pika_bool threading_Condition_acquire(PikaObj* self, + pika_bool block, + Arg* timeout) { + return pika_false; +} +void threading_Condition_release(PikaObj* self) {} diff --git a/port/linux/package/pikascript/pikascript-lib/threading/threading_Lock.c b/port/linux/package/pikascript/pikascript-lib/threading/threading_Lock.c new file mode 100644 index 000000000..aaee17490 --- /dev/null +++ b/port/linux/package/pikascript/pikascript-lib/threading/threading_Lock.c @@ -0,0 +1,49 @@ +#include "threading_Lock.h" +#include "PikaPlatformEx.h" + +void threading_Lock___del__(PikaObj* self) { + pika_platform_thread_mutex_t* m = obj_getPtr(self, "_mutex_"); + // pika_platform_thread_mutex_unlock(m); + pika_platform_thread_mutex_destroy(m); + pikaFree(m, sizeof(pika_platform_thread_mutex_t)); +} + +void threading_Lock___init__(PikaObj* self) { + pika_platform_thread_mutex_t* m = + pikaMalloc(sizeof(pika_platform_thread_mutex_t)); + pika_platform_thread_mutex_init(m); + obj_setPtr(self, "_mutex_", m); +} +pika_bool threading_Lock_acquire(PikaObj* self, pika_bool block, Arg* timeout) { + pika_platform_thread_mutex_t* m = obj_getPtr(self, "_mutex_"); + int result = pika_platform_thread_mutex_timedlock(m, block, timeout); + if (result == PIKA_RES_ERR_INVALID_PARAM) { + obj_setErrorCode(self, PIKA_RES_ERR_INVALID_PARAM); + obj_setSysOut(self, "invalid param!"); + } + return result == 0 ? pika_true : pika_false; +} +pika_bool threading_Lock_locked(PikaObj* self) { + pika_platform_thread_mutex_t* m = obj_getPtr(self, "_mutex_"); + pika_GIL_EXIT(); + int result = pika_platform_thread_mutex_trylock(m); + pika_GIL_ENTER(); + if (result == 0) { + // 成功获得了锁,需要解锁 + // pika_GIL_EXIT(); + pika_platform_thread_mutex_unlock(m); + // pika_GIL_ENTER();/* */ + return pika_false; // 锁未被占用/* */ + } else { + // 锁已被占用或发生了其他错误 + // perror("pthread_mutex_trylock"); + return pika_true; + } +} + +void threading_Lock_release(PikaObj* self) { + pika_platform_thread_mutex_t* m = obj_getPtr(self, "_mutex_"); + // pika_GIL_EXIT(); + pika_platform_thread_mutex_unlock(m); + // pika_GIL_ENTER(); +} diff --git a/port/linux/package/pikascript/pikascript-lib/threading/threading_RLock.c b/port/linux/package/pikascript/pikascript-lib/threading/threading_RLock.c new file mode 100644 index 000000000..b9c43a857 --- /dev/null +++ b/port/linux/package/pikascript/pikascript-lib/threading/threading_RLock.c @@ -0,0 +1,34 @@ +#include "threading_Lock.h" +#include "PikaPlatformEx.h" + +void threading_RLock___del__(PikaObj* self) { + pika_platform_thread_rtmutex_t* rm = obj_getPtr(self, "_rmutex_"); + pika_platform_thread_rtmutex_destroy(rm); + pikaFree(rm, sizeof(pika_platform_thread_rtmutex_t)); +} +void threading_RLock___init__(PikaObj* self) { + pika_platform_thread_rtmutex_t* rm = + pikaMalloc(sizeof(pika_platform_thread_rtmutex_t)); + pika_platform_thread_rtmutex_init(rm); + obj_setPtr(self, "_rmutex_", rm); +} +pika_bool threading_RLock_acquire(PikaObj* self, + pika_bool block, + Arg* timeout) { + pika_platform_thread_rtmutex_t* rm = obj_getPtr(self, "_rmutex_"); + + int result = pika_platform_thread_rtmutex_lock(rm, block, timeout); + + return result == 0 ? pika_true : pika_false; +} +pika_bool threading_RLock_locked(PikaObj* self) { + pika_platform_thread_rtmutex_t* rm = obj_getPtr(self, "_rmutex_"); + + int result = pika_platform_thread_rtmutex_locked(rm); + + return result != 0 ? pika_true : pika_false; +} +void threading_RLock_release(PikaObj* self) { + pika_platform_thread_rtmutex_t* rm = obj_getPtr(self, "_rmutex_"); + pika_platform_thread_rtmutex_unlock(rm); +} diff --git a/port/linux/package/pikascript/threading.pyi b/port/linux/package/pikascript/threading.pyi new file mode 100644 index 000000000..7040384b1 --- /dev/null +++ b/port/linux/package/pikascript/threading.pyi @@ -0,0 +1,52 @@ +import _thread +from PikaObj import * + +def get_ident() ->int64: ... + +class Lock(): + + def __init__(self): # real signature unknown + self._mutex_ = None + + def acquire(self, block:bool, timeout:any) -> bool: ... # real signature unknown; restored from __doc__ + + def locked(self) -> bool: ...# real signature unknown; restored from __doc__ + + def release(self): ... # real signature unknown; restored from __doc__ + + def __del__(self): ... + + +class RLock(): + + def __init__(self): # real signature unknown + self._rmutex_ = None + + def acquire(self, block:bool, timeout:any) -> bool: ...# real signature unknown; restored from __doc__ + + def locked(self) -> bool: ... # real signature unknown; restored from __doc__ + + def release(self): ...# real signature unknown; restored from __doc__ + + def __del__(self): ... + + +class Condition: + + def __init__(self): # real signature unknown + self._cond_ = None + self._rtmetux_ = None + + def wait(self, timeout:any) -> bool: ... + + def notify(self, n:int): ... + + def notify_all(self): ... + + def _is_owned(self) -> bool: ... + + def __del__(self): ... + + def acquire(self, block:bool, timeout:any) -> bool: ... + + def release(self): ... diff --git a/port/linux/test/module-test.cpp b/port/linux/test/module-test.cpp index 471e96d06..5583df41e 100644 --- a/port/linux/test/module-test.cpp +++ b/port/linux/test/module-test.cpp @@ -1373,4 +1373,6 @@ TEST(jrpc, exec_concat_str_space) { free(response); } +TEST_RUN_SINGLE_FILE(threading, lock_rlock, "test/python/threading/lock_rlock.py") + TEST_END diff --git a/port/linux/test/python/threading/lock_rlock.py b/port/linux/test/python/threading/lock_rlock.py new file mode 100644 index 000000000..6e1dca99e --- /dev/null +++ b/port/linux/test/python/threading/lock_rlock.py @@ -0,0 +1,83 @@ +import _thread +import time +import threading +import PikaStdLib +# 共享资源 +shared_resource = 0 + +# 互斥锁 +mutex = threading.Lock() + +# 线程函数 + +finished = 0 + + +def thread_function(name, delay): + global shared_resource + global mutex, finished + print("delay : %s" % str(delay)) + k = 0 + i = 0 + mem = PikaStdLib.MemChecker() + for i in range(5): + # while 1: + + try: + # 获取互斥锁 + print("%s try to acquire lock. #1" % name) + res = mutex.acquire(True, None) + print("res: %s" % str(res)) + if 1: # 测试RLock或者Lock的超时加上 + print("%s try to acquire lock. #2" % name) + res = mutex.acquire(True, 0.5) + print("res: %s" % str(res)) + if res: + print("%s acquire lock SUCC." % name) + else: + print("%s acquire lock FAIL." % name) + # 打印当前线程名称和共享资源的值 + print("Thread %s: Iteration %d, Shared Resource: %d" % + (name, i, shared_resource)) + + # 更新共享资源 + shared_resource += 1 + + # 模拟工作时间 + + time.sleep(delay) + print("wake") + + # 释放互斥锁 + mutex.release() + mutex.release() + k += 1 + + print("%s i = %d." % (name, i)) + # print('mem used now:') + # mem.now() + + except: + print("------------- error ---------------") + + print("%s exit , at last, i = %d." % (name, k)) + finished += 1 + +# 主函数 + + +def main(): + # 创建第一个线程 + _thread.start_new_thread(thread_function, ("Thread-1", 0.1)) + time.sleep(0.5) + # 创建第二个线程 + _thread.start_new_thread(thread_function, ("Thread-2", 0.2)) + + # 主线程等待子线程结束 + # 由于 _thread 没有 join 方法,我们通过 sleep 来模拟等待 + # time.sleep(60) + while finished < 2: + time.sleep(1) + + +main()