vsync / queue / vqueue_prio_heap_based.h
Unbounded heap-based priority queue.
Groups: Linearizable
This is a fine-grained blocking implementation, which uses rec_spinlock.h. It has an unbounded priority range, and bounded capacity.
Example:
#include <vsync/queue/vqueue_prio_heap_based.h>
#include <pthread.h>
#include <assert.h>
#include <stdio.h>
#include <stdlib.h>
#define IT 2
#define N 3
typedef struct data_s {
vsize_t id;
} data_t;
__thread vuint32_t my_tid; /* this is just an example, thread local storage is
not the best option */
vqueue_prio_t g_queue;
/* a unique number for the tid */
vuint32_t
get_tid_cb(void)
{
return my_tid;
}
void
destroy_cb(void *node, void *args)
{
/* the cast is not necessary it is just to emphasize that the
object you get here is the same as what has been inserted */
data_t *data = (data_t *)node;
free(data);
(void)args;
}
void *
run(void *args)
{
data_t *data = NULL;
/* this cast is not recommended it is just to demonstrate that each thread
* has a unique id */
my_tid = (vuint32_t)(vuintptr_t)args;
for (vsize_t i = 0; i < IT; i++) {
data = (data_t *)vqueue_prio_remove_min(&g_queue);
if (data == NULL) {
data = malloc(sizeof(data_t));
data->id = i;
vbool_t success = vqueue_prio_add(&g_queue, data, i);
if (success) {
printf("[T%u] enq %zu\n", my_tid, data->id);
} else {
printf("The queue is full");
free(data);
}
} else {
printf("[T%u] deq %zu\n", my_tid, data->id);
free(data);
}
}
return NULL;
}
int
main(void)
{
pthread_t threads[N];
vqueue_prio_init(&g_queue, get_tid_cb);
for (vsize_t i = 0; i < N; i++) {
pthread_create(&threads[i], NULL, run, (void *)i);
}
for (vsize_t i = 0; i < N; i++) {
pthread_join(threads[i], NULL);
}
vqueue_prio_destroy(&g_queue, destroy_cb, NULL);
return 0;
}
Note: If you wish to use this implementation in user-space. Contact the authors for recursive
pthread_mutexsupport.
References:
Maurice Herlihy, Nir Shavit - The Art of Multiprocessor Programming 15.4
Macros
| Macro | Description |
|---|---|
| VQUEUE_PRIO_HEAP_CAPACITY | VQUEUE_PRIO_HEAP_CAPACITY is equal to 1024 by default. |
Macro VQUEUE_PRIO_HEAP_CAPACITY
VQUEUE_PRIO_HEAP_CAPACITY is equal to 1024 by default.
Users may overwrite it by defining VQUEUE_PRIO_HEAP_CAPACITY=N. This value determines how many nodes can be stored in the queue.
Note:
VQUEUE_PRIO_HEAP_CAPACITYmust be power of two.
Functions
| Function | Description |
|---|---|
| vqueue_prio_init | Initializes the given priority queue object pqueue. |
| vqueue_prio_destroy | Destroys all remaining enqueued objects. |
| vqueue_prio_add | Inserts the given object data into the heap. |
| vqueue_prio_remove_min | Retrieves an object with the highest available priority. |
Function vqueue_prio_init
static void vqueue_prio_init(vqueue_prio_t *pqueue, vqueue_prio_fun_get_tid get_tid_fun)
Initializes the given priority queue object pqueue.
Parameters:
get_tid_fun: function pointer that returns the calling thread idvuint32_t.pqueue: address of vqueue_prio_t object.
Function vqueue_prio_destroy
static void vqueue_prio_destroy(vqueue_prio_t *pqueue, vqueue_prio_handle_node_t destroy_cb, void *args)
Destroys all remaining enqueued objects.
Note: this is not thread safe and must be called iff all threads are done accessing the queue.
Parameters:
pqueue: address of vqueue_prio_t object.destroy_cb: address of a callback function to call on each queue object to destroy.args: second argument ofdestroy_cb.
Function vqueue_prio_add
static vbool_t vqueue_prio_add(vqueue_prio_t *pqueue, void *data, vsize_t priority)
Inserts the given object data into the heap.
Parameters:
pqueue: address of vqueue_prio_t object.priority: priority of the given object.data: address of the object to enqueue.
Returns: true insertion succeeded.
Returns: false insertion failed the heap is full.
Function vqueue_prio_remove_min
static void* vqueue_prio_remove_min(vqueue_prio_t *pqueue)
Retrieves an object with the highest available priority.
Parameters:
pqueue: address of vqueue_prio_t object.
Returns: void* address of the retrieved object.