vsync / queue / cachedq.h
Lockless, multi-producer, multi-consumer queue.
Groups: Linearizable
A variation of the DPDK ring buffer that uses cached variables to improve the performance. It accepts vuint64_t as data type.
Example:
#include <stdio.h>
#include <pthread.h>
#include <stdlib.h>
#include <vsync/common/assert.h>
#include <vsync/queue/cachedq.h>
#define BUFFER_ENTRY_NUM 6UL
#define WRITER_THREAD 2UL
#define READER_THREAD 2UL
#define ENQUEUE_BATCH 4UL
#define DEQUEUE_BATCH 3UL
#define DATA 12345UL
cachedq_t *q;
void *
writer(void *id)
{
vuint64_t buf[ENQUEUE_BATCH] = {0};
for (vsize_t i = 0; i < ENQUEUE_BATCH; i++) {
buf[i] = DATA;
}
vuint64_t count = cachedq_enqueue(q, buf, ENQUEUE_BATCH);
printf(
"Writer #%zu tries to write a batch with %ld items, %ld items are "
"success.\n",
(vsize_t)id, ENQUEUE_BATCH, count);
return NULL;
}
void *
reader(void *id)
{
vuint64_t buf[DEQUEUE_BATCH] = {0};
vuint64_t count = cachedq_dequeue(q, buf, DEQUEUE_BATCH);
for (vsize_t i = 0; i < count; i++) {
ASSERT(buf[i] == DATA);
}
printf(
"Reader #%zu tries to read a batch with %ld items, %ld items are "
"success.\n",
(vsize_t)id, DEQUEUE_BATCH, count);
return NULL;
}
int
main(void)
{
vsize_t buf_size = cachedq_memsize(BUFFER_ENTRY_NUM);
void *buf = malloc(buf_size);
q = cachedq_init(buf, buf_size);
if (q == NULL) {
perror("fail to create the cachedq");
}
pthread_t thread_w[WRITER_THREAD] = {0};
for (vsize_t i = 0; i < WRITER_THREAD; i++) {
pthread_create(&thread_w[i], NULL, writer, (void *)i);
}
pthread_t thread_r[READER_THREAD] = {0};
for (vsize_t i = 0; i < READER_THREAD; i++) {
pthread_create(&thread_r[i], NULL, reader, (void *)i);
}
for (vsize_t i = 0; i < WRITER_THREAD; i++) {
pthread_join(thread_w[i], NULL);
}
for (vsize_t i = 0; i < READER_THREAD; i++) {
pthread_join(thread_r[i], NULL);
}
free(buf);
return 0;
}
Functions
| Function | Description |
|---|---|
| cachedq_memsize | Returns the minimum allocation size for acachedq with the given capacity. |
| cachedq_init | Initializes the cachedq with a maximum capacity. |
| cachedq_enqueue | Enqueues one or more entries. |
| cachedq_dequeue | Dequeues one or more entries. |
| cachedq_count | Returns the current number of entries in the cachedq. |
Function cachedq_memsize
static vsize_t cachedq_memsize(vsize_t capacity)
Returns the minimum allocation size for acachedq with the given capacity.
Parameters:
capacity: maximum number of entries that can fit in thecachedq.
Returns: minimum number bytes the cachedq buffer should have.
Function cachedq_init
static cachedq_t* cachedq_init(void *buf, vsize_t capacity)
Initializes the cachedq with a maximum capacity.
Parameters:
buf: a buffer that can accommodate at leastcapacityelements.capacity: maximum number of entries that can fit in the cachedq.
Returns: buf pointer casted to cachedq_t*.
Use cachedq_memsize() to determine the size of buf.
Function cachedq_enqueue
static vsize_t cachedq_enqueue(cachedq_t *q, vuint64_t *buf, vsize_t count)
Enqueues one or more entries.
Multiple entries can be enqueued if buf points to an array. Use count to indicate how many entries should be enqueueed, starting from buf.
Parameters:
q: address ofcachedq_tobject.buf: pointer to first entry.count: number of entries to enqueue.
Returns: number of enqueued entries.
Function cachedq_dequeue
static vsize_t cachedq_dequeue(cachedq_t *q, vuint64_t *buf, vsize_t count)
Dequeues one or more entries.
Multiple entries can be dequeued if buf points to an array. Use count to indicate how many entries should be dequeued.
Parameters:
q: address ofcachedq_tobject.buf: pointer to preallocated memory for the first entry.count: number of entries to dequeue.
Returns: number of dequeued entries.
Function cachedq_count
static vsize_t cachedq_count(cachedq_t *q)
Returns the current number of entries in the cachedq.
Parameters:
q: address ofcachedq_tobject.
Returns: number of entries in the cachedq.