8
\$\begingroup\$

I have recently been implementing a shared-memory-based IPC message queue in C on Linux. A few design choices I've made:

  • The queue will have only 1 producer and possibly 1 or more consumers. (Currently my implementation supports only 1 producer and 1 consumer.)
  • The queue will be bounded in size and wrap around in a circular fashion
  • When the consumers lag behind, the producer waits (by retrying the enqueue) until the queue has free space again

I have a basic prototype working and have wrapped it in a reusable API. I am seeking code review advice on both

  1. How to improve the code's readability and reusability
  2. How to make the code more performant

Here is the full code, including a benchmark harness:

  • header file: spmc_queue.h
#ifndef SPMC_QUEUE_H
#define SPMC_QUEUE_H

/*
 * Single-producer / multi-consumer message queue stored in POSIX shared
 * memory. NOTE: the current implementation supports exactly one consumer.
 */

#include <stdalign.h>
#include <stdatomic.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>

/* Version of the spmc_shared_t layout stored in shared memory. */
#define SPMC_QUEUE_VERSION 0

/*
 * Cache-line size used to keep writer_idx and reader_idx on separate lines
 * (avoids false sharing between producer and consumer). 64 was read from
 * /sys/devices/system/cpu/cpu0/cache/index0/coherency_line_size on the
 * development host; define it before including this header to override.
 * Writer and readers must agree on the value because it changes the layout.
 */
#ifndef L1_DCACHE_LINESIZE
#define L1_DCACHE_LINESIZE 64
#endif

/* NOTE(review): POSIX reserves identifiers ending in _t; kept for API compatibility. */
typedef _Atomic int64_t atomic_int64_t;

enum spmc_mode { spmc_mode_reader = 0, spmc_mode_writer };

/* Process-local bookkeeping for one attached queue. */
typedef struct spmc_header {
  int fd;             /* descriptor returned by shm_open() */
  char *path;         /* heap copy of the shared-memory object name */
  enum spmc_mode mode;
  size_t shared_size; /* bytes mapped: sizeof(spmc_shared_t) + payload */
} spmc_header_t;

/* Layout of the shared-memory segment, identical in every attached process. */
typedef struct spmc_shared {
  uint8_t version;
  size_t element_capacity;
  size_t element_size;
  /*
   * Written by one process and polled by another, so they must be atomic.
   * Lock-free atomics are address-free and therefore usable across processes.
   */
  atomic_bool initialized;
  atomic_bool client_connected;
  /* Monotonic counters; the slot index is counter % element_capacity. */
  alignas(L1_DCACHE_LINESIZE) atomic_int64_t writer_idx;
  alignas(L1_DCACHE_LINESIZE) atomic_int64_t reader_idx;
  /* element_capacity * element_size bytes of payload. */
  alignas(L1_DCACHE_LINESIZE) uint8_t data[];
} spmc_shared_t;

/* Handle returned by spmc_queue_create(). */
typedef struct spmc_queue {
  spmc_header_t header;
  spmc_shared_t *shared;
} spmc_queue_t;

/* Create (spmc_mode_writer) or attach to (spmc_mode_reader) the queue named
 * `path`. Returns NULL on failure; release the handle with spmc_queue_destroy(). */
spmc_queue_t *spmc_queue_create(const char *path, size_t element_size, size_t element_capacity, enum spmc_mode mode);
/* Unmap and free a handle; the writer additionally unlinks the shared-memory name. */
void spmc_queue_destroy(spmc_queue_t *queue);
/* Non-blocking: copy one element in; false if no reader is attached or the queue is full. */
bool spmc_queue_enqueue(spmc_queue_t *queue, uint8_t *src_data);
/* Non-blocking: copy one element out; false if the queue is empty. */
bool spmc_queue_dequeue(spmc_queue_t *queue, uint8_t *dst_data);

#endif /* SPMC_QUEUE_H */
  • source file: spmc_queue.c
#define _POSIX_C_SOURCE 200809L
#include <assert.h>
#include <errno.h>
#include <fcntl.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/mman.h>
#include <sys/stat.h>
#include <sys/types.h>
#include <unistd.h>

#include "spmc_queue.h"

spmc_queue_t *spmc_queue_create(const char *path, size_t element_size, size_t element_capacity, enum spmc_mode mode) {
  if (!path || element_size == 0 || element_capacity == 0 || (mode != spmc_mode_reader && mode != spmc_mode_writer)) {
    fprintf(stderr, "spmc_queue_create: invalid arguments (path:%s element_size:%zu element_capacity:%zu mode:%d\n",
            path, element_size, element_capacity, mode);
    return NULL;
  }
  printf("creating spmc_queue %s of size %zu and capacity %zu with mode %s\n", path, element_size, element_capacity,
         (mode == spmc_mode_reader) ? "reader" : "writer");
  spmc_queue_t *queue = NULL;
  spmc_shared_t *shared = NULL;
  int fd = -1;
  size_t shared_size = sizeof(spmc_shared_t) + element_size * element_capacity;
  int oflag = (mode == spmc_mode_reader) ? O_RDWR : O_RDWR | O_CREAT | O_EXCL;

  fd = shm_open(path, oflag, S_IRUSR | S_IWUSR | S_IRGRP | S_IROTH);
  if (fd == -1) {
    perror("shm_open");
    goto cleanup;
  }
  if (mode == spmc_mode_writer) {
    int rt = ftruncate(fd, shared_size);
    if (rt == -1) {
      perror("ftruncate");
      goto cleanup;
    }
  }
  queue = calloc(1, sizeof(spmc_queue_t));
  if (!queue) {
    perror("calloc(1, sizeof(spmc_queue_t))");
    goto cleanup;
  }
  shared = mmap(NULL, shared_size, PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0);
  if (shared == MAP_FAILED) {
    perror("mmap");
    goto cleanup;
  }

  queue->header.fd = fd;
  queue->header.mode = mode;
  queue->header.path = strdup(path);
  if (!queue->header.path) {
    perror("strdup");
    goto cleanup;
  }
  queue->header.shared_size = shared_size;
  queue->shared = shared;

  if (mode == spmc_mode_writer) {
    memset(shared, 0, shared_size);
    queue->shared->version = SPMC_QUEUE_VERSION;
    queue->shared->initialized = true;
    queue->shared->client_connected = false;
    queue->shared->element_size = element_size;
    queue->shared->element_capacity = element_capacity;
    queue->shared->writer_idx = 0;
  }
  if (mode == spmc_mode_reader) {
    if (!queue->shared->initialized) {
      fprintf(stderr, "spmc_queue: reader mode encounters shared memory not initialized yet by writer\n");
      goto cleanup;
    }
    if (queue->shared->element_capacity != element_capacity) {
      fprintf(stderr, "spmc_queue: element_capacity %zu != queue's element_capacity %zu\n",
              queue->shared->element_capacity, element_capacity);
      goto cleanup;
    }
    if (queue->shared->element_size < element_size) {
      fprintf(stderr, "spmc_queue: element size %zu > queue's element size %zu\n", element_size,
              queue->shared->element_size);
      goto cleanup;
    }
    queue->shared->client_connected = true;
    queue->shared->reader_idx = 0;
  }
  printf("spmc_queue created\n");
  return queue;

cleanup:
  if (fd != -1)
    close(fd);
  if (shared && shared != MAP_FAILED)
    munmap(shared, shared_size);
  if (mode == spmc_mode_writer && path)
    shm_unlink(path);
  if (queue && queue->header.path)
    free(queue->header.path);
  if (queue)
    free(queue);
  return NULL;
}

/*
 * Release a handle returned by spmc_queue_create(): unmap the shared segment,
 * close its descriptor and free the handle. The writer owns the lifetime of
 * the shared-memory object and therefore also unlinks its name; a reader only
 * detaches. Passing NULL is a no-op, mirroring free().
 */
void spmc_queue_destroy(spmc_queue_t *queue) {
  if (!queue)
    return;
  printf("destroying spmc_queue %s of mode %s\n", queue->header.path,
         (queue->header.mode == spmc_mode_reader) ? "reader" : "writer");
  char *path = queue->header.path;
  munmap(queue->shared, queue->header.shared_size);
  close(queue->header.fd);
  if (queue->header.mode == spmc_mode_writer) {
    // writer owns the lifecycle of the queue
    shm_unlink(path);
  }
  free(path);
  free(queue);
  printf("spmc_queue destroyed\n");
}

/*
 * Non-blocking enqueue of one element.
 *
 * Copies queue->shared->element_size bytes from src_data into the next free
 * slot, then publishes it by advancing writer_idx. Returns false without
 * touching the ring when no reader has attached yet (client_connected unset)
 * or when element_capacity elements are still unread; the caller retries.
 *
 * Indices only ever grow and are reduced modulo element_capacity to address
 * a slot. NOTE(review): overflow of the signed 64-bit index is not guarded;
 * at 10^9 enqueues per second it would take roughly 292 years.
 */
bool spmc_queue_enqueue(spmc_queue_t *queue, uint8_t *src_data) {
  spmc_shared_t *const ring = queue->shared;
  if (!ring->client_connected)
    return false;

  /* Acquire: every slot below reader_idx has been fully copied out, so it may
     be overwritten. */
  const int64_t consumed = atomic_load_explicit(&ring->reader_idx, memory_order_acquire);
  /* writer_idx is only stored by the producer itself, so relaxed is enough. */
  const int64_t produced = atomic_load_explicit(&ring->writer_idx, memory_order_relaxed);
  const bool full = produced >= consumed + ring->element_capacity;
  if (full)
    return false;

  uint8_t *const slot = ring->data + (produced % ring->element_capacity) * ring->element_size;
  memcpy(slot, src_data, ring->element_size);
  /* Release: the slot contents become visible before the new writer_idx. */
  atomic_store_explicit(&ring->writer_idx, produced + 1, memory_order_release);
  return true;
}

/*
 * Non-blocking dequeue of one element.
 *
 * Copies queue->shared->element_size bytes from the oldest unread slot into
 * dst_data (which must have room for that many bytes), then frees the slot by
 * advancing reader_idx. Returns false, leaving dst_data untouched, when the
 * ring is empty.
 *
 * NOTE(review): reader_idx is advanced with a plain load/store pair rather
 * than a compare-and-swap, so this is only correct with a single consumer.
 */
bool spmc_queue_dequeue(spmc_queue_t *queue, uint8_t *dst_data) {
  spmc_shared_t *const ring = queue->shared;
  const int64_t consumed = atomic_load_explicit(&ring->reader_idx, memory_order_relaxed);
  /* Acquire pairs with the producer's release store, making the slot contents visible. */
  const int64_t produced = atomic_load_explicit(&ring->writer_idx, memory_order_acquire);
  if (consumed >= produced)
    return false; /* empty */

  const uint8_t *const slot = ring->data + (consumed % ring->element_capacity) * ring->element_size;
  memcpy(dst_data, slot, ring->element_size);
  /* Release: the producer must not reuse the slot before the copy above completes. */
  atomic_store_explicit(&ring->reader_idx, consumed + 1, memory_order_release);
  return true;
}
  • benchmark harness: benchmark.c
#define _POSIX_C_SOURCE 200809L
#define _GNU_SOURCE
#include "spmc_queue.h"
#include <assert.h>
#include <pthread.h>
#include <sched.h>
#include <stdbool.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <time.h>

#define UNUSED(arg) ((void)arg)
#define TEST_MESSAGE_COUNT 1024 * 1024 * 64 // 64 MB count * 64 bytes = 4 GB data

#define QUEUE_CAPACITY 1024
struct message {
  int64_t num;
  char padding[L1_DCACHE_LINESIZE - sizeof(int64_t)];
};

static spmc_queue_t *producer_queue = NULL;
static spmc_queue_t *consumer_queue = NULL;
static pthread_t producer_thread;
static pthread_t consumer_thread;
static struct message *test_messages;

/*
 * Start-up handshake flags shared between main() and the worker threads.
 * They must be atomic: `volatile` does not make concurrent access race-free
 * in C11, and the spin loops rely on observing another thread's store (and
 * on test_messages being visible once test_may_start is seen).
 */
static atomic_bool producer_thread_ready = false;
static atomic_bool consumer_thread_ready = false;
static atomic_bool test_may_start = false;
/* Each is written by a single thread; main() reads them only after
   pthread_join(), which provides the needed synchronisation. */
static int64_t test_producer_sum = 0;
static int64_t test_consumer_sum = 0;

/*
 * Reset the shared benchmark state, create the writer and reader handles on
 * "/spmc_benchmark_queue", and fill test_messages with TEST_MESSAGE_COUNT
 * values rand() % 5, accumulating their sum in test_producer_sum.
 * On any setup failure it releases what was created and exits the process.
 * These are explicit checks rather than assert(), so they still run in
 * -DNDEBUG builds.
 */
static void initialize_benchmark(void) {
  printf("Initializing the performance benchmark...\n");
  producer_thread_ready = false;
  consumer_thread_ready = false;
  test_may_start = false;
  test_producer_sum = 0;
  test_consumer_sum = 0;
  producer_queue = spmc_queue_create("/spmc_benchmark_queue", sizeof(struct message), QUEUE_CAPACITY, spmc_mode_writer);
  consumer_queue = spmc_queue_create("/spmc_benchmark_queue", sizeof(struct message), QUEUE_CAPACITY, spmc_mode_reader);
  /* TEST_MESSAGE_COUNT * sizeof(struct message) bytes: a large allocation that can fail. */
  test_messages = calloc(TEST_MESSAGE_COUNT, sizeof *test_messages);
  if (!producer_queue || !consumer_queue || !test_messages) {
    fprintf(stderr, "Failed to initialize the performance benchmark\n");
    if (consumer_queue)
      spmc_queue_destroy(consumer_queue);
    if (producer_queue)
      spmc_queue_destroy(producer_queue); /* also unlinks the shm name */
    free(test_messages);
    exit(EXIT_FAILURE);
  }
  for (int i = 0; i < TEST_MESSAGE_COUNT; i++) {
    int random_number = rand() % 5;
    test_messages[i].num = random_number;
    test_producer_sum += random_number;
  }
  printf("Initialized performance benchmark\n");
}

/*
 * Best-effort: restrict the calling thread to CPU `core_num`.
 * Failure (e.g. the core does not exist on this machine) is reported on
 * stderr and the thread keeps running unpinned. The original assert() was
 * compiled out under NDEBUG and aborted otherwise.
 */
static void pin_to_core(int core_num) {
  if (core_num < 0 || core_num >= CPU_SETSIZE) {
    /* CPU_SET does not bounds-check its argument. */
    fprintf(stderr, "warning: core %d is outside the cpu_set_t range\n", core_num);
    return;
  }
  cpu_set_t cpuset;
  CPU_ZERO(&cpuset);
  CPU_SET(core_num, &cpuset);
  int rc = pthread_setaffinity_np(pthread_self(), sizeof cpuset, &cpuset);
  if (rc != 0) {
    /* pthread functions return the error number instead of setting errno. */
    fprintf(stderr, "warning: could not pin thread to core %d: %s\n", core_num, strerror(rc));
  }
}

/*
 * Tear down what initialize_benchmark() created: both queue handles (the
 * writer's destroy also unlinks the shm name) and the message buffer.
 * Globals are reset to NULL so they never dangle after being released.
 */
static void destroy_benchmark(void) {
  printf("Destroying the performance benchmark...\n");
  spmc_queue_destroy(producer_queue);
  producer_queue = NULL;
  spmc_queue_destroy(consumer_queue);
  consumer_queue = NULL;
  free(test_messages);
  test_messages = NULL;
  printf("Destroyed performance benchmark\n");
}

static void *consumer_main(void *arg) {
  UNUSED(arg);
  pin_to_core(5);
  printf("consumer thread spawns\n");
  consumer_thread_ready = true;
  static struct message message_buf;
  while (!test_may_start) {
  }
  int idx = 0;
  while (idx < TEST_MESSAGE_COUNT) {
    bool dequeued = spmc_queue_dequeue(consumer_queue, (unsigned char *)&message_buf);
    if (dequeued) {
      idx++;
      test_consumer_sum += message_buf.num;
    }
  }
  return NULL;
}

static void *producer_main(void *arg) {
  UNUSED(arg);
  pin_to_core(7);
  printf("producer thread spawns\n");
  producer_thread_ready = true;
  while (!test_may_start) {
  }
  int idx = 0;
  while (idx < TEST_MESSAGE_COUNT) {
    idx += (int)spmc_queue_enqueue(consumer_queue, (unsigned char *)&test_messages[idx]);
  }
  return NULL;
}

/*
 * Benchmark driver: sets up the queues and payload, starts one producer and
 * one consumer thread, times the transfer of TEST_MESSAGE_COUNT messages and
 * checks that the consumer's checksum matches the producer's.
 * Returns EXIT_SUCCESS when the sums match, EXIT_FAILURE otherwise.
 */
int main(void) {
  struct timespec start;
  struct timespec end;
  const double total_bytes = (double)TEST_MESSAGE_COUNT * sizeof(struct message);

  initialize_benchmark();

  int rc = pthread_create(&producer_thread, NULL, producer_main, NULL);
  if (rc != 0) {
    fprintf(stderr, "pthread_create(producer): %s\n", strerror(rc));
    destroy_benchmark();
    return EXIT_FAILURE;
  }
  rc = pthread_create(&consumer_thread, NULL, consumer_main, NULL);
  if (rc != 0) {
    fprintf(stderr, "pthread_create(consumer): %s\n", strerror(rc));
    /* The producer only touches the queue after test_may_start, which was
       never set; returning from main terminates it. */
    destroy_benchmark();
    return EXIT_FAILURE;
  }

  bool prompted = false;
  while (!producer_thread_ready || !consumer_thread_ready) {
    if (!prompted) {
      printf("waiting for the producer & consumer thread to be ready...\n");
      prompted = true;
    }
  }
  clock_gettime(CLOCK_MONOTONIC, &start);
  test_may_start = true;
  printf("performance benchmark starts\n");
  pthread_join(producer_thread, NULL);
  pthread_join(consumer_thread, NULL);
  clock_gettime(CLOCK_MONOTONIC, &end);
  printf("performance benchmark ends\n");

  const double elapsed_sec =
      (double)(end.tv_sec - start.tv_sec) + (double)(end.tv_nsec - start.tv_nsec) / 1e9;
  const double throughput_mb = total_bytes / elapsed_sec / (1024 * 1024);
  printf("Elapsed time: %.3f seconds\n", elapsed_sec);
  printf("Throughput: %.3f MB/s\n", throughput_mb);
  /* %lld + cast: int64_t is not `long` on every ABI (e.g. ILP32 targets). */
  printf("test_producer_sum = %lld and test_consumer_sum = %lld\n", (long long)test_producer_sum,
         (long long)test_consumer_sum);
  /* Explicit check instead of assert() so it also runs under -DNDEBUG, and
     the shm object is still cleaned up on mismatch. */
  const bool sums_match = (test_producer_sum == test_consumer_sum);
  destroy_benchmark();
  if (!sums_match) {
    fprintf(stderr, "checksum mismatch between producer and consumer\n");
    return EXIT_FAILURE;
  }
  return EXIT_SUCCESS;
}

On my host (Ubuntu 24.04.2), the benchmark harness currently reports a throughput of 1241.186 MB/s.

New contributor
Yukun Jiang is a new contributor to this site. Take care in asking for clarification, commenting, and answering. Check out our Code of Conduct.
\$\endgroup\$
1
  • \$\begingroup\$ Welcome to Code Review! Please don't modify the code in your question once it has been answered. You could post improved code as a new question, as an answer, or as a link to an external site - as described in I improved my code based on the reviews. What next?. I have rolled back the edit, so that it's clear exactly what version has been reviewed. \$\endgroup\$ Commented yesterday

1 Answer 1

7
\$\begingroup\$

Please add an include guard to the header so that it can safely be included more than once in a translation unit.


Take care with names:

typedef _Atomic int64_t atomic_int64_t;

POSIX reserves all names ending in _t for future expansion, so this one is risky. It's not clear why we use a signed type here rather than, say, size_t. The mixed-signedness arithmetic results in many warnings when I compile this code.


client_connected can be changed by the other process - it should be atomic.


In the implementation file, I recommend including the corresponding header as the first include. This gives us confidence that the header is complete and has no hidden dependencies.


This looks like leftover debugging, rather than something we'd want to be intermingled with program output:

  printf("creating spmc_queue %s of size %zu and capacity %zu with mode %s\n", path, element_size, element_capacity,
         (mode == spmc_mode_reader) ? "reader" : "writer");

calloc() initialises the content to all-bits-zero. This doesn't necessarily result in null pointers (it's possible that Linux defines all-zero as a null pointer, but better to be portable). I think it's better to use malloc() and initialise the members immediately (move the mmap() call outside of the queue initialisation).


Beware that although POSIX specifies that errno is set on allocation failure, the C standard doesn't require that, so we have a non-portable assumption here - perhaps worth a comment?

It's curious that the other perror() calls supply just the function name, but we are inconsistent with the calloc one.


We're inconsistent here:

  queue->shared = shared;

  if (mode == spmc_mode_writer) {
    memset(shared, 0, shared_size);
    queue->shared->version = SPMC_QUEUE_VERSION;

It would be clearer to refer to shared or queue->shared consistently here.


This shouldn't be an error:

    if (!queue->shared->initialized) {
      fprintf(stderr, "spmc_queue: reader mode encounters shared memory not initialized yet by writer\n");
      goto cleanup;
    }

It's a quite predictable race condition to find that the file exists before the writer has completed the initialisation. Consider polling this variable (remember to make it atomic) or having the writer create the file with a temporary name and renaming once initialisation is complete.


The cleanup block over-tests. It's perfectly reasonable to pass a null pointer to free(), and there's no benefit to avoiding that. I believe it's also safe to pass MAP_FAILED to munmap() (which should then result in EINVAL). Also, we know that path is not null because we tested at the start of the function (declaring as const char *const path would help readers see that it's not subsequently assigned).

That would allow:

  spmc_shared_t *shared = MAP_FAILED;  /* better choice than NULL */
  ⋮

cleanup:
  if (fd != -1)
    close(fd);
  munmap(shared, shared_size);
  if (mode == spmc_mode_writer)
    shm_unlink(path);
  if (queue)
    free(queue->header.path);
  free(queue);

When we enqueue or dequeue, we add one to the index and never reset it. I think it needs a comment to justify why we'll never have (signed) arithmetic overflow here (it's just a matter of time!).


I provide no review of the benchmark program.

\$\endgroup\$
2
  • \$\begingroup\$ Hi @Toby Speight Really appreciate the code review comments. I've made corresponding adjustment to the code and posted the newer version. There is 1 part I don't quite follow on your comments. You said ``` calloc() initialises the content to char zero. This doesn't necessarily result in null pointers ``` I checked en.cppreference.com/w/c/memory/calloc.html it need initialize all bytes to zero. So the concerns here is that NULL the alias for null pointer might not be of byte 0? Is it possible? \$\endgroup\$ Commented yesterday
  • \$\begingroup\$ Other way around - a pointer whose bytes are all zero is not necessarily a null pointer. That's possible in general; as I say, I don't know whether Linux specifies additional constraints that such a pointer must be a null pointer. (Remember that NULL expands to a null pointer constant such as 0 or ((void *)0), which must convert to a null pointer; all null pointers compare equal). \$\endgroup\$ Commented yesterday

Your Answer

By clicking “Post Your Answer”, you agree to our terms of service and acknowledge you have read our privacy policy.

Start asking to get answers

Find the answer to your question by asking.

Ask question

Explore related questions

See similar questions with these tags.