Line data Source code
1 : /*
2 : * SPSC Bounded Queue
3 : * Based on public domain C++ version by mstump[1]. Released under
4 : * the same license terms.
5 : *
6 : * [1]
7 : * https://github.com/mstump/queues/blob/master/include/spsc-bounded-queue.hpp
8 : */
9 :
10 : #include <errno.h>
11 : #include <stdbool.h>
12 : #include <stdint.h>
13 : #include <stdlib.h>
14 : #include <string.h>
15 :
16 : #include "queue.h"
17 : #include "lwan-private.h"
18 :
19 : #if !defined(ATOMIC_RELAXED)
20 :
21 : #define ATOMIC_RELAXED __ATOMIC_RELAXED
22 : #define ATOMIC_ACQUIRE __ATOMIC_ACQUIRE
23 : #define ATOMIC_RELEASE __ATOMIC_RELEASE
24 :
25 : #endif
26 :
27 : #if defined(__GNUC__)
28 :
29 : #if (__GNUC__ * 100 + __GNUC_MINOR__ >= 470)
30 : #define HAS_GCC_ATOMIC 1
31 : #else
32 : #define HAS_SYNC_ATOMIC 1
33 : #endif
34 :
35 : #endif
36 :
37 : #if HAS_GCC_ATOMIC
38 :
39 : #define ATOMIC_INIT(P, V) \
40 : do { \
41 : (P) = (V); \
42 : } while (0)
43 :
44 : #define ATOMIC_LOAD(P, O) __atomic_load_n((P), (O))
45 : #define ATOMIC_STORE(P, V, O) __atomic_store_n((P), (V), (O))
46 :
47 : #elif HAS_SYNC_ATOMIC
48 :
49 : #define ATOMIC_INIT(P, V) \
50 : do { \
51 : (P) = (V); \
52 : } while (0)
53 :
54 : #define ATOMIC_LOAD(P, O) __sync_fetch_and_add((P), 0)
55 : #define ATOMIC_STORE(P, V, O) \
56 : ({ \
57 : __sync_synchronize(); \
58 : __sync_lock_test_and_set((P), (V)); \
59 : })
60 :
61 : #else
62 :
63 : #error Unsupported compiler.
64 :
65 : #endif
66 :
67 186 : int spsc_queue_init(struct spsc_queue *q, size_t size)
68 : {
69 186 : if (size == 0)
70 0 : return -EINVAL;
71 :
72 186 : size = lwan_nextpow2(size);
73 186 : q->buffer = calloc(1 + size, sizeof(int));
74 186 : if (!q->buffer)
75 0 : return -errno;
76 :
77 186 : ATOMIC_INIT(q->head, 0);
78 186 : ATOMIC_INIT(q->tail, 0);
79 :
80 186 : q->size = size;
81 186 : q->mask = size - 1;
82 :
83 186 : return 0;
84 : }
85 :
86 0 : void spsc_queue_free(struct spsc_queue *q) { free(q->buffer); }
87 :
88 304 : bool spsc_queue_push(struct spsc_queue *q, int input)
89 : {
90 304 : const size_t head = ATOMIC_LOAD(&q->head, ATOMIC_RELAXED);
91 :
92 304 : if (((ATOMIC_LOAD(&q->tail, ATOMIC_ACQUIRE) - (head + 1)) & q->mask) >= 1) {
93 304 : q->buffer[head & q->mask] = input;
94 304 : ATOMIC_STORE(&q->head, head + 1, ATOMIC_RELEASE);
95 :
96 304 : return true;
97 : }
98 :
99 0 : return false;
100 : }
101 :
102 608 : bool spsc_queue_pop(struct spsc_queue *q, int *output)
103 : {
104 608 : const size_t tail = ATOMIC_LOAD(&q->tail, ATOMIC_RELAXED);
105 :
106 608 : if (((ATOMIC_LOAD(&q->head, ATOMIC_ACQUIRE) - tail) & q->mask) >= 1) {
107 304 : *output = q->buffer[tail & q->mask];
108 :
109 304 : ATOMIC_STORE(&q->tail, tail + 1, ATOMIC_RELEASE);
110 :
111 304 : return true;
112 : }
113 :
114 304 : return false;
115 : }
|