Line data Source code
1 : /*
2 : * lwan - web server
3 : * Copyright (c) 2020 L. A. F. Pereira <l@tia.mat.br>
4 : *
5 : * This program is free software; you can redistribute it and/or
6 : * modify it under the terms of the GNU General Public License
7 : * as published by the Free Software Foundation; either version 2
8 : * of the License, or any later version.
9 : *
10 : * This program is distributed in the hope that it will be useful,
11 : * but WITHOUT ANY WARRANTY; without even the implied warranty of
12 : * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 : * GNU General Public License for more details.
14 : *
15 : * You should have received a copy of the GNU General Public License
16 : * along with this program; if not, write to the Free Software
17 : * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301,
18 : * USA.
19 : */
20 :
21 : #define _GNU_SOURCE
22 : #include <stdarg.h>
23 : #include <stdio.h>
24 : #include <pthread.h>
25 :
26 : #include "list.h"
27 : #include "ringbuffer.h"
28 : #include "lwan-private.h"
29 :
30 : struct lwan_pubsub_topic {
31 : struct list_head subscribers;
32 : pthread_mutex_t lock;
33 : };
34 :
35 : struct lwan_pubsub_msg {
36 : struct lwan_value value;
37 : unsigned int refcount;
38 : };
39 :
40 0 : DEFINE_RING_BUFFER_TYPE(lwan_pubsub_msg_ref_ring, struct lwan_pubsub_msg *, 16)
41 :
42 : struct lwan_pubsub_msg_ref {
43 : struct list_node ref;
44 : struct lwan_pubsub_msg_ref_ring ring;
45 : };
46 :
47 : struct lwan_pubsub_subscriber {
48 : struct list_node subscriber;
49 :
50 : pthread_mutex_t lock;
51 : struct list_head msg_refs;
52 : };
53 :
54 0 : static void lwan_pubsub_queue_init(struct lwan_pubsub_subscriber *sub)
55 : {
56 0 : list_head_init(&sub->msg_refs);
57 0 : }
58 :
59 0 : static bool lwan_pubsub_queue_put(struct lwan_pubsub_subscriber *sub,
60 : const struct lwan_pubsub_msg *msg)
61 : {
62 : struct lwan_pubsub_msg_ref *ref;
63 :
64 0 : ref = list_tail(&sub->msg_refs, struct lwan_pubsub_msg_ref, ref);
65 0 : if (ref) {
66 : /* Try putting the message in the last ringbuffer in this queue: if it's
67 : * full, will need to allocate a new ring buffer, even if others might
68 : * have space in them: the FIFO order must be preserved, and short of
69 : * compacting the queue at this point -- which will eventually happen
70 : * as it is consumed -- this is the only option. */
71 0 : if (lwan_pubsub_msg_ref_ring_try_put(&ref->ring, &msg))
72 0 : return true;
73 : }
74 :
75 0 : ref = malloc(sizeof(*ref));
76 0 : if (!ref)
77 0 : return false;
78 :
79 0 : lwan_pubsub_msg_ref_ring_init(&ref->ring);
80 0 : lwan_pubsub_msg_ref_ring_put(&ref->ring, &msg);
81 0 : list_add_tail(&sub->msg_refs, &ref->ref);
82 :
83 0 : return true;
84 : }
85 :
86 : static struct lwan_pubsub_msg *
87 0 : lwan_pubsub_queue_get(struct lwan_pubsub_subscriber *sub)
88 : {
89 : struct lwan_pubsub_msg_ref *ref, *next;
90 :
91 0 : list_for_each_safe (&sub->msg_refs, ref, next, ref) {
92 : struct lwan_pubsub_msg *msg;
93 :
94 0 : if (lwan_pubsub_msg_ref_ring_empty(&ref->ring)) {
95 0 : list_del(&ref->ref);
96 0 : free(ref);
97 0 : continue;
98 : }
99 :
100 0 : msg = lwan_pubsub_msg_ref_ring_get(&ref->ring);
101 :
102 0 : if (ref->ref.next != ref->ref.prev) {
103 : /* If this segment isn't the last one, try pulling in just one
104 : * element from the next segment, as there's space in the
105 : * current segment now.
106 : *
107 : * This might lead to an empty ring buffer segment in the middle
108 : * of the linked list. This is by design, to introduce some
109 : * hysteresis and avoid the pathological case where malloc churn
110 : * will happen when subscribers consume at the same rate as
111 : * publishers are able to publish.
112 : *
113 : * The condition above will take care of these empty segments
114 : * once they're dealt with, eventually compacting the queue
115 : * completely (and ultimately reducing it to an empty list
116 : * without any ring buffers).
117 : */
118 : struct lwan_pubsub_msg_ref *next_ring;
119 :
120 0 : next_ring = container_of(ref->ref.next, struct lwan_pubsub_msg_ref, ref);
121 0 : if (!lwan_pubsub_msg_ref_ring_empty(&next_ring->ring)) {
122 : const struct lwan_pubsub_msg *next_msg;
123 :
124 0 : next_msg = lwan_pubsub_msg_ref_ring_get(&next_ring->ring);
125 0 : lwan_pubsub_msg_ref_ring_put(&ref->ring, &next_msg);
126 : }
127 : }
128 :
129 0 : return msg;
130 : }
131 :
132 0 : return NULL;
133 : }
134 :
/* Forward declaration: lwan_pubsub_free_topic() calls this before its
 * definition appears further down in the file. */
static void
lwan_pubsub_unsubscribe_internal(struct lwan_pubsub_topic *topic,
                                 struct lwan_pubsub_subscriber *sub,
                                 bool take_topic_lock);
138 :
139 0 : struct lwan_pubsub_topic *lwan_pubsub_new_topic(void)
140 : {
141 0 : struct lwan_pubsub_topic *topic = calloc(1, sizeof(*topic));
142 :
143 0 : if (!topic)
144 0 : return NULL;
145 :
146 0 : list_head_init(&topic->subscribers);
147 0 : pthread_mutex_init(&topic->lock, NULL);
148 :
149 0 : return topic;
150 : }
151 :
152 0 : void lwan_pubsub_free_topic(struct lwan_pubsub_topic *topic)
153 : {
154 : struct lwan_pubsub_subscriber *iter, *next;
155 :
156 0 : pthread_mutex_lock(&topic->lock);
157 0 : list_for_each_safe (&topic->subscribers, iter, next, subscriber)
158 0 : lwan_pubsub_unsubscribe_internal(topic, iter, false);
159 0 : pthread_mutex_unlock(&topic->lock);
160 :
161 0 : pthread_mutex_destroy(&topic->lock);
162 :
163 0 : free(topic);
164 0 : }
165 :
166 0 : void lwan_pubsub_msg_done(struct lwan_pubsub_msg *msg)
167 : {
168 0 : if (!ATOMIC_DEC(msg->refcount)) {
169 0 : free(msg->value.value);
170 0 : free(msg);
171 : }
172 0 : }
173 :
174 0 : static bool lwan_pubsub_publish_value(struct lwan_pubsub_topic *topic,
175 : const struct lwan_value value)
176 : {
177 0 : struct lwan_pubsub_msg *msg = malloc(sizeof(*msg));
178 : struct lwan_pubsub_subscriber *sub;
179 :
180 0 : if (!msg)
181 0 : return false;
182 :
183 : /* Initialize refcount to 1, so we can drop one ref after publishing to
184 : * all subscribers. If it drops to 0, it means we didn't publish the
185 : * message and we can free it. */
186 0 : msg->refcount = 1;
187 0 : msg->value = value;
188 :
189 0 : pthread_mutex_lock(&topic->lock);
190 0 : list_for_each (&topic->subscribers, sub, subscriber) {
191 0 : ATOMIC_INC(msg->refcount);
192 :
193 0 : pthread_mutex_lock(&sub->lock);
194 0 : if (!lwan_pubsub_queue_put(sub, msg)) {
195 0 : lwan_status_warning("Couldn't enqueue message, dropping");
196 0 : ATOMIC_DEC(msg->refcount);
197 : }
198 0 : pthread_mutex_unlock(&sub->lock);
199 : }
200 0 : pthread_mutex_unlock(&topic->lock);
201 :
202 0 : lwan_pubsub_msg_done(msg);
203 :
204 0 : return true;
205 : }
206 :
/* Returns a freshly malloc()ed copy of the first len bytes at src, or NULL
 * if the allocation failed.  The caller owns the result and releases it
 * with free().
 *
 * A zero-length request still yields a valid, freeable pointer, so that
 * callers can tell "empty" apart from "out of memory". */
static void *my_memdup(const void *src, size_t len)
{
    /* malloc(0) is allowed to return NULL; always ask for at least one byte
     * so that NULL unambiguously means failure. */
    void *dup = malloc(len ? len : 1);

    if (!dup)
        return NULL;

    /* memcpy() requires valid pointers even when copying zero bytes, and
     * src may well be NULL for an empty buffer. */
    if (len)
        memcpy(dup, src, len);

    return dup;
}
213 :
214 0 : bool lwan_pubsub_publish(struct lwan_pubsub_topic *topic,
215 : const void *contents,
216 : size_t len)
217 : {
218 0 : const struct lwan_value value = { .value = my_memdup(contents, len), .len = len };
219 :
220 0 : if (!value.value)
221 0 : return false;
222 :
223 0 : return lwan_pubsub_publish_value(topic, value);
224 : }
225 :
226 0 : bool lwan_pubsub_publishf(struct lwan_pubsub_topic *topic,
227 : const char *format,
228 : ...)
229 : {
230 : char *msg;
231 : int len;
232 : va_list ap;
233 :
234 0 : va_start(ap, format);
235 0 : len = vasprintf(&msg, format, ap);
236 0 : va_end(ap);
237 :
238 0 : if (len < 0)
239 0 : return false;
240 :
241 0 : const struct lwan_value value = { .value = msg, .len = (size_t)len };
242 0 : return lwan_pubsub_publish_value(topic, value);
243 : }
244 :
245 : struct lwan_pubsub_subscriber *
246 0 : lwan_pubsub_subscribe(struct lwan_pubsub_topic *topic)
247 : {
248 0 : struct lwan_pubsub_subscriber *sub = calloc(1, sizeof(*sub));
249 :
250 0 : if (!sub)
251 0 : return NULL;
252 :
253 0 : pthread_mutex_init(&sub->lock, NULL);
254 0 : lwan_pubsub_queue_init(sub);
255 :
256 0 : pthread_mutex_lock(&topic->lock);
257 0 : list_add(&topic->subscribers, &sub->subscriber);
258 0 : pthread_mutex_unlock(&topic->lock);
259 :
260 0 : return sub;
261 : }
262 :
263 0 : struct lwan_pubsub_msg *lwan_pubsub_consume(struct lwan_pubsub_subscriber *sub)
264 : {
265 : struct lwan_pubsub_msg *msg;
266 :
267 0 : pthread_mutex_lock(&sub->lock);
268 0 : msg = lwan_pubsub_queue_get(sub);
269 0 : pthread_mutex_unlock(&sub->lock);
270 :
271 0 : return msg;
272 : }
273 :
274 0 : static void lwan_pubsub_unsubscribe_internal(struct lwan_pubsub_topic *topic,
275 : struct lwan_pubsub_subscriber *sub,
276 : bool take_topic_lock)
277 : {
278 : struct lwan_pubsub_msg *iter;
279 :
280 0 : if (take_topic_lock)
281 0 : pthread_mutex_lock(&topic->lock);
282 0 : list_del(&sub->subscriber);
283 0 : if (take_topic_lock)
284 0 : pthread_mutex_unlock(&topic->lock);
285 :
286 0 : pthread_mutex_lock(&sub->lock);
287 0 : while ((iter = lwan_pubsub_queue_get(sub)))
288 0 : lwan_pubsub_msg_done(iter);
289 0 : pthread_mutex_unlock(&sub->lock);
290 :
291 0 : pthread_mutex_destroy(&sub->lock);
292 0 : free(sub);
293 0 : }
294 :
/* Detaches a subscriber from the topic and frees it, together with any
 * messages still queued for it.  The topic lock is taken internally, so the
 * caller must not be holding it.  sub must not be used afterwards. */
void lwan_pubsub_unsubscribe(struct lwan_pubsub_topic *topic,
                             struct lwan_pubsub_subscriber *sub)
{
    /* Plain call: ISO C does not allow a return statement with an
     * expression in a function returning void. */
    lwan_pubsub_unsubscribe_internal(topic, sub, true);
}
300 :
301 0 : const struct lwan_value *lwan_pubsub_msg_value(const struct lwan_pubsub_msg *msg)
302 : {
303 0 : return &msg->value;
304 : }
|