vsync / queue / bounded_locked.h
Multi-producer, multi-consumer bounded queue protected by a spinlock.
Groups: Linearizable
This is a non-blocking, multi-producer, multi-consumer queue protected by a spinlock. The queue has a bounded size and returns errors in case the queue is full, empty or the spinlock cannot be acquired.
Note: This queue implementation is very simple and may be inefficient due to the spinlock bottleneck. Prefer using bounded_mpmc.h instead.
Example:
#include <vsync/queue/bounded_locked.h>
#include <pthread.h>
#include <assert.h>
#include <stdio.h>
#include <malloc.h>
#define IT 10
#define N 12
#define BUFF_SZ 1000
void *g_buffer[BUFF_SZ];
bounded_locked_t g_queue;
typedef struct data_s {
vsize_t id;
} data_t;
void
produce(vsize_t tid)
{
data_t *data = NULL;
bounded_ret_t result = 0;
for (vsize_t i = 0; i < IT; i++) {
data = malloc(sizeof(data_t));
data->id = i;
result = bounded_locked_enq(&g_queue, data);
if (result == QUEUE_BOUNDED_OK) {
printf("[T%zu] enq %zu\n", tid, data->id);
} else {
// either full/ busy
free(data);
}
}
}
void
consume(vsize_t tid)
{
void *data = NULL;
bounded_ret_t result = 0;
for (vsize_t i = 0; i < IT; i++) {
if (bounded_locked_empty(&g_queue)) {
continue;
}
result = bounded_locked_deq(&g_queue, &data);
if (result == QUEUE_BOUNDED_OK) {
printf("[T%zu] deq %zu\n", tid, ((data_t *)data)->id);
free(data);
}
}
}
void *
run(void *args)
{
vsize_t tid = (vsize_t)args;
if (tid % 2 == 0) {
produce(tid);
} else {
consume(tid);
}
return NULL;
}
int
main(void)
{
void *data = NULL;
pthread_t threads[N];
bounded_locked_init(&g_queue, g_buffer, BUFF_SZ);
for (vsize_t i = 0; i < N; i++) {
pthread_create(&threads[i], NULL, run, (void *)i);
}
for (vsize_t i = 0; i < N; i++) {
pthread_join(threads[i], NULL);
}
while (bounded_locked_deq(&g_queue, &data) != QUEUE_BOUNDED_EMPTY) {
free(data);
}
return 0;
}
Functions
| Function | Description |
|---|---|
| bounded_locked_init | Initializes the given queue object. |
| bounded_locked_enq | Tries to enqueue a value. |
| bounded_locked_deq | Tries to dequeue a value. |
| bounded_locked_empty | Tests whether queue is empty without performing an action. |
Function bounded_locked_init
static void bounded_locked_init(bounded_locked_t *q, void **b, vuint32_t s)
Initializes the given queue object.
Parameters:
q: address of bounded_locked_t object.b: address of an array of void* slots with sizes.s: capacity ofb(in number of array elements/slots).
Function bounded_locked_enq
static bounded_ret_t bounded_locked_enq(bounded_locked_t *q, void *v)
Tries to enqueue a value.
Parameters:
q: address of bounded_locked_t object.v: address of object to enqueue.
Returns: QUEUE_BOUNDED_OK if successful.
Returns: QUEUE_BOUNDED_FULL if queue is full.
Returns: QUEUE_AGAIN if queue is currently locked and the caller should try again.
Function bounded_locked_deq
static bounded_ret_t bounded_locked_deq(bounded_locked_t *q, void **v)
Tries to dequeue a value.
Parameters:
q: address of bounded_locked_t object.v: output parameter of type (void**). Contains the address of the dequeued object, if the dequeue was successful.
Returns: QUEUE_BOUNDED_OK if successful.
Returns: QUEUE_BOUNDED_EMPTY if queue is empty.
Returns: QUEUE_BOUNDED_AGAIN if queue is currently locked and the caller should try again.
Note: The output parameter
vis only set when QUEUE_BOUNDED_OK is returned.
Function bounded_locked_empty
static vbool_t bounded_locked_empty(bounded_locked_t *q)
Tests whether queue is empty without performing an action.
Note: this function is blocking.
Parameters:
q: address of bounded_locked_t object.
Returns: true if empty.
Returns: false if not empty.