I have recently been working on a shared-memory-based IPC message queue in C on Linux. The main design choices are:
- The queue has a single producer and one or more consumers. (The current implementation supports only one producer and one consumer.)
- The queue is bounded in size and wraps around as a circular buffer.
- When the consumers lag behind, the producer waits until they free up space in the queue.
I have a basic prototype working and have wrapped it in a reusable API. I am looking for code-review advice on:
- How to make the code more readable and reusable
- How to make the code more performant
Here is the full code, including a benchmark harness:
- header file: spmc_queue.h
#pragma once
#include <stdalign.h>
#include <stdatomic.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>
/* Layout version of the shared segment; readers refuse segments written by a
 * different version. */
#define SPMC_QUEUE_VERSION 0
/* L1 data-cache line size, used to give the producer- and consumer-owned
 * indices their own cache lines. Value taken from
 * /sys/devices/system/cpu/cpu0/cache/index0/coherency_line_size on the
 * development host; adjust it for other CPUs. */
#define L1_DCACHE_LINESIZE 64
typedef _Atomic int64_t atomic_int64_t;
/* The queue indices live in memory shared between processes. Only lock-free
 * atomics are address-free; a non-lock-free implementation would protect them
 * with a process-local lock and silently lose cross-process synchronization. */
_Static_assert(ATOMIC_LLONG_LOCK_FREE == 2, "spmc_queue requires lock-free 64-bit atomics for shared memory");
/* Role of a process attaching to a queue. */
enum spmc_mode {
    spmc_mode_reader = 0, /* consumer: opens an existing shm object (no O_CREAT) */
    spmc_mode_writer = 1, /* producer: creates the shm object and unlinks it on destroy */
};
/* Process-local bookkeeping for one attachment to a queue; never placed in
 * shared memory. */
typedef struct spmc_header {
    int fd;               /* descriptor returned by shm_open() */
    char *path;           /* heap copy (strdup) of the shm object name */
    enum spmc_mode mode;  /* role of this attachment: reader or writer */
    size_t shared_size;   /* length in bytes of the mmap'ed region */
} spmc_header_t;
/* Layout of the shared-memory segment mapped by the writer and by readers.
 *
 * The writer fills the header fields during creation and readers validate
 * them when attaching. writer_idx and reader_idx are monotonically increasing
 * message counters (the slot is idx % element_capacity); each sits on its own
 * cache line so producer and consumer stores do not false-share. */
typedef struct spmc_shared {
    uint8_t version;          /* SPMC_QUEUE_VERSION of the writer */
    size_t element_capacity;  /* number of slots in data[] */
    size_t element_size;      /* bytes per slot */
    /* These flags are written by one process while another polls them, so
     * they must be atomic; a plain bool here is a data race. atomic_bool has
     * the same size, so the layout is unchanged. */
    atomic_bool initialized;       /* set by the writer; readers refuse to attach until true */
    atomic_bool client_connected;  /* set by a reader when it attaches */
    alignas(L1_DCACHE_LINESIZE) atomic_int64_t writer_idx;  /* next index the producer writes */
    alignas(L1_DCACHE_LINESIZE) atomic_int64_t reader_idx;  /* next index the consumer reads */
    alignas(L1_DCACHE_LINESIZE) uint8_t data[];             /* element_capacity * element_size bytes */
} spmc_shared_t;
/* Cross-process atomics must be lock-free (see the note on the indices). */
_Static_assert(ATOMIC_BOOL_LOCK_FREE == 2, "spmc_queue requires lock-free atomic_bool for shared memory");
/* Handle returned by spmc_queue_create(): process-local state plus a pointer
 * into the mapped shared segment. Release it with spmc_queue_destroy(). */
typedef struct spmc_queue {
    spmc_header_t header;   /* local bookkeeping (fd, name, role, mapping size) */
    spmc_shared_t *shared;  /* base address of the mmap'ed shared segment */
} spmc_queue_t;
/**
 * Create (writer) or attach to (reader) the POSIX shared-memory queue @p path.
 *
 * @param path             shm object name, e.g. "/my_queue"
 * @param element_size     bytes per element; a reader's value is validated against the writer's
 * @param element_capacity number of slots; a reader's value is validated against the writer's
 * @param mode             spmc_mode_writer creates the object, spmc_mode_reader opens it
 * @return heap-allocated handle, or NULL on failure (a diagnostic is printed)
 */
spmc_queue_t *spmc_queue_create(const char *path,
                                size_t element_size,
                                size_t element_capacity,
                                enum spmc_mode mode);

/** Unmap and free @p queue; the writer additionally unlinks the shm object. */
void spmc_queue_destroy(spmc_queue_t *queue);

/**
 * Copy one element (element_size bytes) from @p src_data into the queue.
 * @return false if no reader has connected yet or the queue is full.
 */
bool spmc_queue_enqueue(spmc_queue_t *queue, uint8_t *src_data);

/**
 * Copy the oldest element into @p dst_data (element_size bytes).
 * @return false if the queue is empty.
 */
bool spmc_queue_dequeue(spmc_queue_t *queue, uint8_t *dst_data);
- source file: spmc_queue.c
#define _POSIX_C_SOURCE 200809L
#include <assert.h>
#include <errno.h>
#include <fcntl.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/mman.h>
#include <sys/stat.h>
#include <sys/types.h>
#include <unistd.h>
#include "spmc_queue.h"
spmc_queue_t *spmc_queue_create(const char *path, size_t element_size, size_t element_capacity, enum spmc_mode mode) {
if (!path || element_size == 0 || element_capacity == 0 || (mode != spmc_mode_reader && mode != spmc_mode_writer)) {
fprintf(stderr, "spmc_queue_create: invalid arguments (path:%s element_size:%zu element_capacity:%zu mode:%d\n",
path, element_size, element_capacity, mode);
return NULL;
}
printf("creating spmc_queue %s of size %zu and capacity %zu with mode %s\n", path, element_size, element_capacity,
(mode == spmc_mode_reader) ? "reader" : "writer");
spmc_queue_t *queue = NULL;
spmc_shared_t *shared = NULL;
int fd = -1;
size_t shared_size = sizeof(spmc_shared_t) + element_size * element_capacity;
int oflag = (mode == spmc_mode_reader) ? O_RDWR : O_RDWR | O_CREAT | O_EXCL;
fd = shm_open(path, oflag, S_IRUSR | S_IWUSR | S_IRGRP | S_IROTH);
if (fd == -1) {
perror("shm_open");
goto cleanup;
}
if (mode == spmc_mode_writer) {
int rt = ftruncate(fd, shared_size);
if (rt == -1) {
perror("ftruncate");
goto cleanup;
}
}
queue = calloc(1, sizeof(spmc_queue_t));
if (!queue) {
perror("calloc(1, sizeof(spmc_queue_t))");
goto cleanup;
}
shared = mmap(NULL, shared_size, PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0);
if (shared == MAP_FAILED) {
perror("mmap");
goto cleanup;
}
queue->header.fd = fd;
queue->header.mode = mode;
queue->header.path = strdup(path);
if (!queue->header.path) {
perror("strdup");
goto cleanup;
}
queue->header.shared_size = shared_size;
queue->shared = shared;
if (mode == spmc_mode_writer) {
memset(shared, 0, shared_size);
queue->shared->version = SPMC_QUEUE_VERSION;
queue->shared->initialized = true;
queue->shared->client_connected = false;
queue->shared->element_size = element_size;
queue->shared->element_capacity = element_capacity;
queue->shared->writer_idx = 0;
}
if (mode == spmc_mode_reader) {
if (!queue->shared->initialized) {
fprintf(stderr, "spmc_queue: reader mode encounters shared memory not initialized yet by writer\n");
goto cleanup;
}
if (queue->shared->element_capacity != element_capacity) {
fprintf(stderr, "spmc_queue: element_capacity %zu != queue's element_capacity %zu\n",
queue->shared->element_capacity, element_capacity);
goto cleanup;
}
if (queue->shared->element_size < element_size) {
fprintf(stderr, "spmc_queue: element size %zu > queue's element size %zu\n", element_size,
queue->shared->element_size);
goto cleanup;
}
queue->shared->client_connected = true;
queue->shared->reader_idx = 0;
}
printf("spmc_queue created\n");
return queue;
cleanup:
if (fd != -1)
close(fd);
if (shared && shared != MAP_FAILED)
munmap(shared, shared_size);
if (mode == spmc_mode_writer && path)
shm_unlink(path);
if (queue && queue->header.path)
free(queue->header.path);
if (queue)
free(queue);
return NULL;
}
/* Detach from the queue and release every resource held by the handle.
 *
 * The writer owns the queue's lifetime and also unlinks the shm object; a
 * reader only unmaps and closes. Passing NULL is a no-op, like free().
 * Failures of the teardown syscalls are reported but do not stop cleanup. */
void spmc_queue_destroy(spmc_queue_t *queue) {
    if (!queue) {
        return;
    }
    printf("destroying spmc_queue %s of mode %s\n", queue->header.path,
           (queue->header.mode == spmc_mode_reader) ? "reader" : "writer");
    if (munmap(queue->shared, queue->header.shared_size) == -1) {
        perror("munmap");
    }
    if (close(queue->header.fd) == -1) {
        perror("close");
    }
    if (queue->header.mode == spmc_mode_writer && shm_unlink(queue->header.path) == -1) {
        perror("shm_unlink");
    }
    free(queue->header.path);
    free(queue);
    printf("spmc_queue destroyed\n");
}
/* Copy one element (element_size bytes) from src_data into the next free slot.
 *
 * Non-blocking: returns false when no reader has attached yet or the queue is
 * full, leaving the retry/back-off policy to the caller. Must only be called
 * by the single producer.
 *
 * NOTE(review): reader_idx is re-read on every call, which pulls the
 * consumer's cache line across cores each time. A producer-local cached copy,
 * refreshed only when the cached value says "full", would reduce that traffic. */
bool spmc_queue_enqueue(spmc_queue_t *queue, uint8_t *src_data) {
    spmc_shared_t *shared = queue->shared;
    if (!shared->client_connected) {
        return false;
    }
    /* Header fields do not change after creation; read them once. */
    const size_t capacity = shared->element_capacity;
    const size_t elem_size = shared->element_size;
    /* Only the producer stores writer_idx, so its own value needs no ordering. */
    int64_t writer_idx = atomic_load_explicit(&shared->writer_idx, memory_order_relaxed);
    /* Acquire pairs with the consumer's release store: once reader_idx has
     * advanced past a slot, the consumer has finished copying it out. */
    int64_t reader_idx = atomic_load_explicit(&shared->reader_idx, memory_order_acquire);
    /* Fill level is in [0, capacity]; compare in unsigned arithmetic instead
     * of mixing int64_t with size_t. */
    if ((uint64_t)(writer_idx - reader_idx) >= (uint64_t)capacity) {
        return false; /* queue full */
    }
    size_t slot = (size_t)((uint64_t)writer_idx % capacity);
    memcpy(&shared->data[slot * elem_size], src_data, elem_size);
    /* Release publishes the element bytes before the advanced index. */
    atomic_store_explicit(&shared->writer_idx, writer_idx + 1, memory_order_release);
    return true;
}
/* Copy the oldest unread element into dst_data (element_size bytes).
 *
 * Non-blocking: returns false when nothing is available. Only one consumer
 * may call this per queue: reader_idx is a single shared counter advanced by
 * a separate load and store, not an atomic read-modify-write. */
bool spmc_queue_dequeue(spmc_queue_t *queue, uint8_t *dst_data) {
    spmc_shared_t *const sh = queue->shared;
    /* Only the consumer stores reader_idx, so its own value needs no ordering. */
    const int64_t head = atomic_load_explicit(&sh->reader_idx, memory_order_relaxed);
    /* Acquire pairs with the producer's release store of writer_idx, making
     * the element bytes visible before they are copied. */
    const int64_t tail = atomic_load_explicit(&sh->writer_idx, memory_order_acquire);
    if (!(head < tail)) {
        return false; /* queue empty */
    }
    const int64_t slot = head % sh->element_capacity;
    const uint8_t *src = sh->data + slot * sh->element_size;
    memcpy(dst_data, src, sh->element_size);
    /* Release: the producer may reuse this slot only after the copy is done. */
    atomic_store_explicit(&sh->reader_idx, head + 1, memory_order_release);
    return true;
}
- benchmark harness: benchmark.c
#define _POSIX_C_SOURCE 200809L
#define _GNU_SOURCE
#include "spmc_queue.h"
#include <assert.h>
#include <pthread.h>
#include <sched.h>
#include <stdbool.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <time.h>
#define UNUSED(arg) ((void)(arg))
/* Parenthesized so the macro expands safely inside larger expressions.
 * 64 Mi messages * 64 bytes each = 4 GiB of payload. */
#define TEST_MESSAGE_COUNT (1024 * 1024 * 64)
/* Number of slots in the benchmark queue. */
#define QUEUE_CAPACITY 1024
/* One benchmark payload: a value to checksum, padded out to one cache line. */
struct message {
    int64_t num;                                         /* summed by producer and consumer */
    char padding[L1_DCACHE_LINESIZE - sizeof(int64_t)];  /* fills the rest of the line */
};
static spmc_queue_t *producer_queue = NULL; /* writer handle */
static spmc_queue_t *consumer_queue = NULL; /* reader handle */
static pthread_t producer_thread;
static pthread_t consumer_thread;
static struct message *test_messages;
/* Start-line handshake between main() and the worker threads. One thread
 * writes these while another polls them, so they must be atomic: volatile
 * provides neither atomicity nor inter-thread ordering in C11. */
static atomic_bool producer_thread_ready = false;
static atomic_bool consumer_thread_ready = false;
static atomic_bool test_may_start = false;
/* The producer sum is computed before the threads start; the consumer sum is
 * written only by the consumer thread and read after pthread_join(). */
static int64_t test_producer_sum = 0;
static int64_t test_consumer_sum = 0;
/* Reset shared state, create the writer and reader handles for the benchmark
 * queue, and generate the message payloads, recording their total in
 * test_producer_sum for later verification. Exits the process if a resource
 * cannot be acquired; these checks, unlike assert(), survive -DNDEBUG. */
static void initialize_benchmark(void) {
    printf("Initializing the performance benchmark...\n");
    producer_thread_ready = false;
    consumer_thread_ready = false;
    test_may_start = false;
    test_producer_sum = 0;
    test_consumer_sum = 0;
    /* The writer must exist first: it creates and initializes the segment
     * that the reader then attaches to. */
    producer_queue = spmc_queue_create("/spmc_benchmark_queue", sizeof(struct message), QUEUE_CAPACITY, spmc_mode_writer);
    if (!producer_queue) {
        fprintf(stderr, "failed to create the producer queue\n");
        exit(EXIT_FAILURE);
    }
    consumer_queue = spmc_queue_create("/spmc_benchmark_queue", sizeof(struct message), QUEUE_CAPACITY, spmc_mode_reader);
    if (!consumer_queue) {
        fprintf(stderr, "failed to create the consumer queue\n");
        spmc_queue_destroy(producer_queue); /* unlinks the segment the writer created */
        exit(EXIT_FAILURE);
    }
    test_messages = calloc(TEST_MESSAGE_COUNT, sizeof *test_messages);
    if (!test_messages) {
        perror("calloc");
        spmc_queue_destroy(consumer_queue);
        spmc_queue_destroy(producer_queue);
        exit(EXIT_FAILURE);
    }
    for (int i = 0; i < TEST_MESSAGE_COUNT; i++) {
        int random_number = rand() % 5;
        test_messages[i].num = random_number;
        test_producer_sum += random_number;
    }
    printf("Initialized performance benchmark\n");
}
/* Pin the calling thread to CPU core_num to reduce scheduling noise.
 * Failure (e.g. the machine has fewer cores than requested) is reported as a
 * warning rather than aborting: the benchmark still runs, just with less
 * stable numbers. */
static void pin_to_core(int core_num) {
    if (core_num < 0 || core_num >= CPU_SETSIZE) {
        fprintf(stderr, "warning: core %d is outside the cpu_set_t range; thread not pinned\n", core_num);
        return;
    }
    cpu_set_t cpuset;
    CPU_ZERO(&cpuset);
    CPU_SET(core_num, &cpuset);
    /* pthread_* functions return the error number instead of setting errno. */
    int rc = pthread_setaffinity_np(pthread_self(), sizeof cpuset, &cpuset);
    if (rc != 0) {
        fprintf(stderr, "warning: could not pin thread to core %d: %s\n", core_num, strerror(rc));
    }
}
/* Release everything initialize_benchmark() acquired and clear the globals so
 * no dangling pointer survives. The writer handle is destroyed first, which
 * unlinks the shm name; the reader's existing mapping remains usable until its
 * own destroy unmaps it. */
static void destroy_benchmark(void) {
    printf("Destroying the performance benchmark...\n");
    spmc_queue_destroy(producer_queue);
    producer_queue = NULL;
    spmc_queue_destroy(consumer_queue);
    consumer_queue = NULL;
    free(test_messages);
    test_messages = NULL;
    printf("Destroyed performance benchmark\n");
}
/* Consumer thread: wait for the start signal, then dequeue exactly
 * TEST_MESSAGE_COUNT messages (spinning while the queue is empty) and add
 * their values to test_consumer_sum. */
static void *consumer_main(void *arg) {
    UNUSED(arg);
    pin_to_core(5);
    printf("consumer thread spawns\n");
    consumer_thread_ready = true;
    /* Automatic storage: a function-level static would be shared by every
     * thread running this function. */
    struct message message_buf;
    while (!test_may_start) {
        /* spin until main() releases both threads together */
    }
    int64_t sum = 0;
    for (int received = 0; received < TEST_MESSAGE_COUNT;) {
        if (spmc_queue_dequeue(consumer_queue, (uint8_t *)&message_buf)) {
            received++;
            sum += message_buf.num;
        }
    }
    /* main() reads this only after pthread_join(), which synchronizes. */
    test_consumer_sum += sum;
    return NULL;
}
static void *producer_main(void *arg) {
UNUSED(arg);
pin_to_core(7);
printf("producer thread spawns\n");
producer_thread_ready = true;
while (!test_may_start) {
}
int idx = 0;
while (idx < TEST_MESSAGE_COUNT) {
idx += (int)spmc_queue_enqueue(consumer_queue, (unsigned char *)&test_messages[idx]);
}
return NULL;
}
/* Run the throughput benchmark: start both threads, release them together,
 * time the full transfer, report throughput and verify the checksums.
 * Returns EXIT_SUCCESS only if the consumer saw exactly the producer's sum;
 * the check is explicit so it still runs under -DNDEBUG. */
int main(void) {
    const double total_bytes = (double)TEST_MESSAGE_COUNT * sizeof(struct message);

    initialize_benchmark();

    int rc = pthread_create(&producer_thread, NULL, producer_main, NULL);
    if (rc == 0) {
        rc = pthread_create(&consumer_thread, NULL, consumer_main, NULL);
    }
    if (rc != 0) {
        fprintf(stderr, "pthread_create: %s\n", strerror(rc));
        destroy_benchmark();
        return EXIT_FAILURE;
    }

    if (!producer_thread_ready || !consumer_thread_ready) {
        printf("waiting for the producer & consumer thread to be ready...\n");
    }
    while (!producer_thread_ready || !consumer_thread_ready) {
        /* spin until both workers have pinned themselves and checked in */
    }

    struct timespec start;
    struct timespec end;
    clock_gettime(CLOCK_MONOTONIC, &start);
    test_may_start = true;
    printf("performance benchmark starts\n");
    pthread_join(producer_thread, NULL);
    pthread_join(consumer_thread, NULL);
    clock_gettime(CLOCK_MONOTONIC, &end);
    printf("performance benchmark ends\n");

    double elapsed_sec = (double)(end.tv_sec - start.tv_sec) + (double)(end.tv_nsec - start.tv_nsec) / 1e9;
    double throughput_mb = total_bytes / elapsed_sec / (1024 * 1024);
    printf("Elapsed time: %.3f seconds\n", elapsed_sec);
    printf("Throughput: %.3f MB/s\n", throughput_mb);
    /* int64_t is not necessarily long; print through long long portably. */
    printf("test_producer_sum = %lld and test_consumer_sum = %lld\n", (long long)test_producer_sum,
           (long long)test_consumer_sum);

    bool sums_match = (test_producer_sum == test_consumer_sum);
    if (!sums_match) {
        fprintf(stderr, "FAILED: producer and consumer checksums differ\n");
    }
    destroy_benchmark();
    return sums_match ? EXIT_SUCCESS : EXIT_FAILURE;
}
On my host (Ubuntu 24.04.2), the benchmark harness currently reports a throughput of 1241.186 MB/s.