in producer consumer scenario
SEM_INIT(3) Linux Programmer's Manual SEM_INIT(3)
NAME
sem_init - initialize an unnamed semaphore
SYNOPSIS
#include <semaphore.h>
int sem_init(sem_t *sem, int pshared, unsigned int value);
Link with -pthread.
DESCRIPTION
sem_init() initializes the unnamed semaphore at the address pointed to by sem. The value argument specifies
the initial value for the semaphore.
The pshared argument indicates whether this semaphore is to be shared between the threads of a process, or be‐
tween processes.
If pshared has the value 0, then the semaphore is shared between the threads of a process, and should be lo‐
cated at some address that is visible to all threads (e.g., a global variable, or a variable allocated dynami‐
cally on the heap).
If pshared is nonzero, then the semaphore is shared between processes, and should be located in a region of
shared memory (see shm_open(3), mmap(2), and shmget(2)). (Since a child created by fork(2) inherits its par‐
ent's memory mappings, it can also access the semaphore.) Any process that can access the shared memory re‐
gion can operate on the semaphore using sem_post(3), sem_wait(3), and so on.
Initializing a semaphore that has already been initialized results in undefined behavior.
RETURN VALUE
sem_init() returns 0 on success; on error, -1 is returned, and errno is set to indicate the error.
ERRORS
EINVAL value exceeds SEM_VALUE_MAX.
Excerpts
from threading import Semaphore
# Create a semaphore with 3 permits
printer_sem = Semaphore(3)
Basis | Reentrant mutex | Semaphore |
---|---|---|
Owner tracking | Reentrant mutex remembers which thread owns it and only allows that same thread to re-acquire it | Semaphore doesn’t track ownership - any thread can take available permits |
Release behavior | Reentrant mutex must be unlocked by the same thread that locked it | Semaphore’s release() can be called by any thread, not just the one that called acquire() |
Counter semantics | Reentrant mutex’s counter tracks how many times the owning thread has locked it | Semaphore’s counter tracks how many total permits are available to any thread |
sem_open()
sem_init()
Version 1 | my code
My version 1 of reactor pattern code blocks if I send the events in succession without
sleep
in between. See commit 9fc755b042163a33a006ef893b22c475dace0a82
bool reactor_send(ReactorEvent event)
{
mtx_lock(&queue_lock);
if(q_idx >= 10) {
for(int i = 1; i < 10; i++){
//// drop the earliest event
simple_queue[i-1] = simple_queue[i];
}
q_idx = 9;
}
simple_queue[q_idx] = event;
q_idx++;
mtx_unlock(&queue_lock);
mtx_unlock(&event_lock);
return true;
}
int task_function(void* arg)
{
while(task_run_status) {
DBG;
mtx_lock(&event_lock);
if(task_run_status == false) {
break;
}
mtx_lock(&queue_lock);
if(q_idx > 0) {
ReactorEvent event = simple_queue[0];
for(int i = 1; i < q_idx; i++){
simple_queue[i-1] = simple_queue[i];
}
q_idx--;
DBG;
printf("\nPrabal's debug: event data = %s\n", event.data);
}
mtx_unlock(&queue_lock);
}
return 0;
}
Version 2 | by AI
bool reactor_send(ReactorEvent event) {
mtx_lock(&queue_lock);
bool result = enqueue_event(&event_queue, event);
cnd_signal(&queue_cond);
mtx_unlock(&queue_lock);
return result;
}
int task_function(void* arg) {
while (task_run_status) {
ReactorEvent event;
bool has_event = false;
mtx_lock(&queue_lock);
while (queue_is_empty(&event_queue) && task_run_status) {
cnd_wait(&queue_cond, &queue_lock);
}
if (task_run_status) {
has_event = dequeue_event(&event_queue, &event);
}
mtx_unlock(&queue_lock);
if (has_event) {
printf("\nProcessing event: %s\n", event.data);
}
}
return 0;
}
Version 3 | by AI
bool reactor_send(ReactorEvent event) {
mtx_lock(&queue_lock);
bool result = enqueue_event(&event_queue, event);
mtx_unlock(&queue_lock);
// Signal that new event is available
if (result) {
sem_post(&queue_sem);
}
return result;
}
int task_function(void* arg) {
while (task_run_status) {
ReactorEvent event;
bool has_event = false;
// Wait for event to be available
if (sem_wait(&queue_sem) != 0) {
// Handle error
continue;
}
if (!task_run_status) {
break;
}
mtx_lock(&queue_lock);
has_event = dequeue_event(&event_queue, &event);
mtx_unlock(&queue_lock);
if (has_event) {
printf("\nProcessing event: %s\n", event.data);
}
}
return 0;
}
sem_wait()
automatically blocks until an event is availablesem_post()
signals the availability of new events