vsync / queue / bounded_spsc.h

Single-producer, single-consumer, wait-free bounded queue.

Groups: Linearizable, Lock-free

Implementation of the classic single-producer, single-consumer, wait-free bounded queue algorithm.

Example:

#include <vsync/queue/bounded_spsc.h>
#include <pthread.h>
#include <assert.h>
#include <stdio.h>
#include <malloc.h>

#define IT      10
#define BUFF_SZ 1000

void *g_buffer[BUFF_SZ];

bounded_spsc_t g_queue;

typedef struct data_s {
    vsize_t id;
} data_t;

void *
producer(void *args)
{
    data_t *data         = NULL;
    bounded_ret_t result = 0;
    for (vsize_t i = 0; i < IT; i++) {
        data     = malloc(sizeof(data_t));
        data->id = i;
        result   = bounded_spsc_enq(&g_queue, data);
        if (result == QUEUE_BOUNDED_OK) {
            printf("[producer] enq %zu\n", data->id);
        } else {
            // queue is full
            free(data);
        }
    }

    (void)args;
    return NULL;
}

void *
consumer(void *args)
{
    void *data           = NULL;
    bounded_ret_t result = 0;
    for (vsize_t i = 0; i < IT; i++) {
        result = bounded_spsc_deq(&g_queue, &data);
        if (result == QUEUE_BOUNDED_OK) {
            printf("[consumer] deq %zu\n", ((data_t *)data)->id);
            free(data);
        }
    }

    (void)args;
    return NULL;
}

int
main(void)
{
    void *data = NULL;

    pthread_t t_consumer;
    pthread_t t_producer;

    bounded_spsc_init(&g_queue, g_buffer, BUFF_SZ);

    pthread_create(&t_producer, NULL, producer, NULL);
    pthread_create(&t_consumer, NULL, consumer, NULL);

    pthread_join(t_consumer, NULL);
    pthread_join(t_producer, NULL);

    while (bounded_spsc_deq(&g_queue, &data) != QUEUE_BOUNDED_EMPTY) {
        free(data);
    }
    return 0;
}

References:

L. Lamport. - Specifying concurrent program modules


Functions

Function Description
bounded_spsc_init Initializes the given queue object.
bounded_spsc_enq Tries to enqueue a value.
bounded_spsc_deq Tries to dequeue a value.

Function bounded_spsc_init

static void bounded_spsc_init(bounded_spsc_t *q, void **b, vuint32_t s)

Initializes the given queue object.

Parameters:

  • q: address of bounded_spsc_t object.
  • b: address of an array of void* slots with size s.
  • s: number of elements in array b (does not have to be power of 2).

Function bounded_spsc_enq

static bounded_ret_t bounded_spsc_enq(bounded_spsc_t *q, void *v)

Tries to enqueue a value.

Parameters:

  • q: address of bounded_spsc_t object.
  • v: address of object to enqueue.

Returns: QUEUE_BOUNDED_OK if successful.

Returns: QUEUE_BOUNDED_FULL if queue is full.

Function bounded_spsc_deq

static bounded_ret_t bounded_spsc_deq(bounded_spsc_t *q, void **v)

Tries to dequeue a value.

Parameters:

  • q: address of bounded_spsc_t object.
  • v: output parameter of type (void**). Contains the address of the dequeued object, if the queue is not empty.

Returns: QUEUE_BOUNDED_OK if successful.

Returns: QUEUE_BOUNDED_EMPTY if queue is empty.