vsync / queue / unbounded_queue_lf_recycle.h
Lock-free recycle unbounded queue.
Groups: Linearizable, Lock-free, Unbounded-Queue
Note: Retired nodes of this queue cannot be freed, but they can be recycled. This queue makes the occurrence of the ABA problem improbable by using the 7 least significant bits (LSBs) of node addresses as a stamp.
A cas can succeed iff the stamps match.
Note:
vqueue_ub_node_t objects must be 128-byte aligned (i.e. the 7 LSBs of their addresses are always zero), e.g. allocate them using memalign.
Note: maintain a thread-local list of free nodes: place retired nodes on it and reuse them for new allocations; allocate fresh nodes only when the list is empty.
Example:
#include <vsync/queue/unbounded_queue_lf_recycle.h>
#include <pthread.h>
#include <assert.h>
#include <stdio.h>
#include <malloc.h>
/* number of loop iterations each worker thread performs */
#define IT 100
/* number of worker threads, and therefore of per-thread pools */
#define N 12
/* This is just a proof of concept and not an optimal solution. Each thread
 * shall have its own pool. Slot `tid` holds the head of the list of retired
 * nodes that thread `tid` can recycle. */
vqueue_ub_node_t *local_pools[N] = {0};
/* payload object stored in the queue */
typedef struct data_s {
/* value recorded by the enqueuing thread (its loop counter) */
vsize_t id;
} data_t;
/* the queue shared by all worker threads */
vqueue_ub_t g_queue;
/* Release every node that is still parked in one of the per-thread pools.
 * Walks each pool as a singly linked list through the nodes' `next` field. */
void
destroy_pools(void)
{
    for (vsize_t tid = 0; tid < N; tid++) {
        vqueue_ub_node_t *node = local_pools[tid];
        while (node != NULL) {
            /* fetch the successor first: `node` is gone after free() */
            vqueue_ub_node_t *following =
                vatomicptr_stamped_get_pointer(&node->next);
            free(node);
            node = following;
        }
    }
}
/* Node handler used during teardown: frees the node it is handed.
 * Nodes can be freed only after queue destruction, so this callback must not
 * be used while threads still operate on the queue.
 * `args` is part of the handler signature and is ignored here. */
void
destroy_cb(vqueue_ub_node_t *qnode, void *args)
{
    (void)args;
    free(qnode);
}
/* Retire callback: parks a node detached from the queue on the pool of the
 * calling thread so that it can be recycled by alloc_qnode().
 *
 * qnode: the node handed over for recycling.
 * args:  the thread id, passed by value through the void pointer; it selects
 *        the slot of local_pools to use.
 *
 * @note: Users should not rely on the qnode internal fields, they should
 * have an independent pool. Reusing `next` as the pool link is just a proof
 * of concept. */
void
retire_cb(vqueue_ub_node_t *qnode, void *args)
{
    vsize_t tid            = (vsize_t)args;
    vuint8_t current_stamp = 0;

    /* only the stamp is of interest; the returned pointer is discarded */
    (void)vatomicptr_stamped_get(&qnode->next, &current_stamp);
    /* connect the retired node in front of the local pool, storing the
     * incremented stamp alongside the new link */
    vatomicptr_stamped_set(&qnode->next, local_pools[tid], current_stamp + 1);
    local_pools[tid] = qnode;
}
/* Hand out a queue node for thread `tid`.
 * A node from the thread's local pool is preferred; only when that pool is
 * empty is a new node allocated with memalign, using
 * RECYCLE_QUEUE_NODE_ALIGNMENT as the alignment.
 * Asserts that the returned node is not NULL. */
vqueue_ub_node_t *
alloc_qnode(vsize_t tid)
{
    vqueue_ub_node_t *pool_head = local_pools[tid];
    vqueue_ub_node_t *result    = NULL;

    if (pool_head == NULL) {
        /* nothing to recycle: allocate an aligned node */
        result =
            memalign(RECYCLE_QUEUE_NODE_ALIGNMENT, sizeof(vqueue_ub_node_t));
        printf("New allocation\n");
    } else {
        /* pop the head of the local pool and advance the pool to its
         * successor */
        result           = pool_head;
        local_pools[tid] = vatomicptr_stamped_get_pointer(&pool_head->next);
        printf("Recycle allocation\n");
    }

    assert(result);
    return result;
}
/* Worker thread body.
 * args carries the thread id by value. For IT iterations the thread tries to
 * dequeue; if the queue is empty it enqueues a new data object instead,
 * otherwise it prints and frees the dequeued object.
 * Nodes detached by the dequeue are passed to retire_cb together with the
 * thread id. Always returns NULL. */
void *
run(void *args)
{
    data_t *data = NULL;
    vsize_t tid  = (vsize_t)args;

    for (vsize_t i = 0; i < IT; i++) {
        data = vqueue_ub_deq(&g_queue, retire_cb, (void *)tid);
        if (data == NULL) {
            /* queue was empty: produce an element instead */
            data = malloc(sizeof(data_t));
            /* same failure policy as alloc_qnode: do not dereference a
             * failed allocation */
            assert(data);
            data->id                = i;
            vqueue_ub_node_t *qnode = alloc_qnode(tid);
            printf("[T%zu] enq %zu\n", tid, data->id);
            vqueue_ub_enq(&g_queue, qnode, data);
        } else {
            /* the dequeued data object is owned by this thread now */
            printf("[T%zu] deq %zu\n", tid, data->id);
            free(data);
        }
    }
    return NULL;
}
/* Example driver: initializes the queue, runs N worker threads to completion,
 * then tears everything down (leftover data, queue nodes, local pools). */
int
main(void)
{
    pthread_t workers[N];

    /* must happen before any thread touches the queue */
    vqueue_ub_init(&g_queue);

    for (vsize_t t = 0; t < N; t++) {
        /* the thread id travels by value inside the void pointer */
        pthread_create(&workers[t], NULL, run, (void *)t);
    }
    for (vsize_t t = 0; t < N; t++) {
        pthread_join(workers[t], NULL);
    }

    /* dequeue all remaining nodes, to be able to destroy data */
    for (;;) {
        data_t *leftover = vqueue_ub_deq(&g_queue, destroy_cb, NULL);
        if (leftover == NULL) {
            break;
        }
        free(leftover);
    }

    /* destroy the queue to destroy the remaining sentinel */
    vqueue_ub_destroy(&g_queue, destroy_cb, NULL);

    /* destroy local pools */
    destroy_pools();
    return 0;
}
Operating conditions
- vqueue_ub_node_t objects must be 128-byte aligned
- Nodes cannot be freed, they can only be recycled
Recycling tips
- maintain local pool per thread
- when a thread retires a node, add the node to the local pool
- when a thread wants to allocate a new node, fetch a node from the local pool unless it is empty. If it is empty, allocate the node with your normal allocator, e.g. memalign
> Note: the above suggestions work well for threads that perform roughly the same number of enqueues and dequeues. If that is not the case, a thread needs a strategy for stealing nodes from other pools.
References:
Maurice Herlihy, Nir Shavit - The Art of Multiprocessor Programming 10.6
Functions
| Function | Description |
|---|---|
| vqueue_ub_init | Initialize the queue. |
| vqueue_ub_destroy | Destroys all remaining nodes in the queue. |
| vqueue_ub_enq | Enqueue the given node qnode in the given queue q. |
| vqueue_ub_empty | Check if the queue is empty. |
| vqueue_ub_deq | Dequeue a node from the given queue q. |
| vqueue_ub_get_length | Iterate through the queue nodes and count them. |
Function vqueue_ub_init
static void vqueue_ub_init(vqueue_ub_t *q)
Initialize the queue.
Note: Function must be called before threads start operating on the queue.
Parameters:
q: address of vqueue_ub_t object
Function vqueue_ub_destroy
static void vqueue_ub_destroy(vqueue_ub_t *q, vqueue_ub_node_handler_t retire, void *arg)
Destroys all remaining nodes in the queue.
Note: call only after all threads finished operating on the queue.
Note: NOT THREAD-SAFE.
Parameters:
q: address of vqueue_ub_t object
retire: function pointer of the callback that handles recycling/freeing the remaining nodes.
arg: extra parameter passed to retire.
Function vqueue_ub_enq
static void vqueue_ub_enq(vqueue_ub_t *q, vqueue_ub_node_t *qnode, void *data)
Enqueue the given node qnode in the given queue q.
Parameters:
q: address of vqueue_ub_t object
qnode: address of vqueue_ub_node_t
data: address of data object to enqueue.
Function vqueue_ub_empty
static vbool_t vqueue_ub_empty(vqueue_ub_t *q)
Check if the queue is empty.
Note: Use function with caution. It is subject to ABA problems and might return empty when the queue is not empty.
Parameters:
q: address of vqueue_ub_t object
Returns: true if the queue is empty
Returns: false if the queue is not empty
Function vqueue_ub_deq
static void* vqueue_ub_deq(vqueue_ub_t *q, vqueue_ub_node_handler_t retire, void *arg)
Dequeue a node from the given queue q.
Parameters:
q: address of vqueue_ub_t object
retire: function pointer of the callback that recycles a detached vqueue_ub_node_t.
arg: extra parameter passed to retire.
Returns: NULL when the queue is empty.
Returns: a non NULL address of the dequeued data object.
Function vqueue_ub_get_length
static vsize_t vqueue_ub_get_length(vqueue_ub_t *q)
Iterate through the queue nodes and count them.
Note: returns an atomic snapshot of the length, which may be stale
Parameters:
q: address of vqueue_ub_t object
Returns: vsize_t the length of the queue