LCOV - code coverage report
Current view: top level - lib - lwan-pubsub.c (source / functions) Hit Total Coverage
Test: coverage.info.cleaned Lines: 0 115 0.0 %
Date: 2023-04-18 16:19:03 Functions: 0 23 0.0 %

          Line data    Source code
       1             : /*
       2             :  * lwan - web server
       3             :  * Copyright (c) 2020 L. A. F. Pereira <l@tia.mat.br>
       4             :  *
       5             :  * This program is free software; you can redistribute it and/or
       6             :  * modify it under the terms of the GNU General Public License
       7             :  * as published by the Free Software Foundation; either version 2
       8             :  * of the License, or any later version.
       9             :  *
      10             :  * This program is distributed in the hope that it will be useful,
      11             :  * but WITHOUT ANY WARRANTY; without even the implied warranty of
      12             :  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
      13             :  * GNU General Public License for more details.
      14             :  *
      15             :  * You should have received a copy of the GNU General Public License
      16             :  * along with this program; if not, write to the Free Software
      17             :  * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301,
      18             :  * USA.
      19             :  */
      20             : 
      21             : #define _GNU_SOURCE
      22             : #include <stdarg.h>
      23             : #include <stdio.h>
      24             : #include <pthread.h>
      25             : 
      26             : #include "list.h"
      27             : #include "ringbuffer.h"
      28             : #include "lwan-private.h"
      29             : 
      30             : struct lwan_pubsub_topic {
      31             :     struct list_head subscribers;
      32             :     pthread_mutex_t lock;
      33             : };
      34             : 
      35             : struct lwan_pubsub_msg {
      36             :     struct lwan_value value;
      37             :     unsigned int refcount;
      38             : };
      39             : 
      40           0 : DEFINE_RING_BUFFER_TYPE(lwan_pubsub_msg_ref_ring, struct lwan_pubsub_msg *, 16)
      41             : 
      42             : struct lwan_pubsub_msg_ref {
      43             :     struct list_node ref;
      44             :     struct lwan_pubsub_msg_ref_ring ring;
      45             : };
      46             : 
      47             : struct lwan_pubsub_subscriber {
      48             :     struct list_node subscriber;
      49             : 
      50             :     pthread_mutex_t lock;
      51             :     struct list_head msg_refs;
      52             : };
      53             : 
      54           0 : static void lwan_pubsub_queue_init(struct lwan_pubsub_subscriber *sub)
      55             : {
      56           0 :     list_head_init(&sub->msg_refs);
      57           0 : }
      58             : 
      59           0 : static bool lwan_pubsub_queue_put(struct lwan_pubsub_subscriber *sub,
      60             :                                   const struct lwan_pubsub_msg *msg)
      61             : {
      62             :     struct lwan_pubsub_msg_ref *ref;
      63             : 
      64           0 :     ref = list_tail(&sub->msg_refs, struct lwan_pubsub_msg_ref, ref);
      65           0 :     if (ref) {
      66             :         /* Try putting the message in the last ringbuffer in this queue: if it's
      67             :          * full, will need to allocate a new ring buffer, even if others might
      68             :          * have space in them:  the FIFO order must be preserved, and short of
      69             :          * compacting the queue at this point -- which will eventually happen
      70             :          * as it is consumed -- this is the only option. */
      71           0 :         if (lwan_pubsub_msg_ref_ring_try_put(&ref->ring, &msg))
      72           0 :             return true;
      73             :     }
      74             : 
      75           0 :     ref = malloc(sizeof(*ref));
      76           0 :     if (!ref)
      77           0 :         return false;
      78             : 
      79           0 :     lwan_pubsub_msg_ref_ring_init(&ref->ring);
      80           0 :     lwan_pubsub_msg_ref_ring_put(&ref->ring, &msg);
      81           0 :     list_add_tail(&sub->msg_refs, &ref->ref);
      82             : 
      83           0 :     return true;
      84             : }
      85             : 
      86             : static struct lwan_pubsub_msg *
      87           0 : lwan_pubsub_queue_get(struct lwan_pubsub_subscriber *sub)
      88             : {
      89             :     struct lwan_pubsub_msg_ref *ref, *next;
      90             : 
      91           0 :     list_for_each_safe (&sub->msg_refs, ref, next, ref) {
      92             :         struct lwan_pubsub_msg *msg;
      93             : 
      94           0 :         if (lwan_pubsub_msg_ref_ring_empty(&ref->ring)) {
      95           0 :             list_del(&ref->ref);
      96           0 :             free(ref);
      97           0 :             continue;
      98             :         }
      99             : 
     100           0 :         msg = lwan_pubsub_msg_ref_ring_get(&ref->ring);
     101             : 
     102           0 :         if (ref->ref.next != ref->ref.prev) {
     103             :             /* If this segment isn't the last one, try pulling in just one
     104             :              * element from the next segment, as there's space in the
     105             :              * current segment now.
     106             :              *
     107             :              * This might lead to an empty ring buffer segment in the middle
     108             :              * of the linked list.  This is by design, to introduce some
     109             :              * hysteresis and avoid the pathological case where malloc churn
     110             :              * will happen when subscribers consume at the same rate as
     111             :              * publishers are able to publish.
     112             :              *
     113             :              * The condition above will take care of these empty segments
     114             :              * once they're dealt with, eventually compacting the queue
     115             :              * completely (and ultimately reducing it to an empty list
     116             :              * without any ring buffers).
     117             :              */
     118             :             struct lwan_pubsub_msg_ref *next_ring;
     119             : 
     120           0 :             next_ring = container_of(ref->ref.next, struct lwan_pubsub_msg_ref, ref);
     121           0 :             if (!lwan_pubsub_msg_ref_ring_empty(&next_ring->ring)) {
     122             :                 const struct lwan_pubsub_msg *next_msg;
     123             : 
     124           0 :                 next_msg = lwan_pubsub_msg_ref_ring_get(&next_ring->ring);
     125           0 :                 lwan_pubsub_msg_ref_ring_put(&ref->ring, &next_msg);
     126             :             }
     127             :         }
     128             : 
     129           0 :         return msg;
     130             :     }
     131             : 
     132           0 :     return NULL;
     133             : }
     134             : 
     135             : static void lwan_pubsub_unsubscribe_internal(struct lwan_pubsub_topic *topic,
     136             :                                              struct lwan_pubsub_subscriber *sub,
     137             :                                              bool take_topic_lock);
     138             : 
     139           0 : struct lwan_pubsub_topic *lwan_pubsub_new_topic(void)
     140             : {
     141           0 :     struct lwan_pubsub_topic *topic = calloc(1, sizeof(*topic));
     142             : 
     143           0 :     if (!topic)
     144           0 :         return NULL;
     145             : 
     146           0 :     list_head_init(&topic->subscribers);
     147           0 :     pthread_mutex_init(&topic->lock, NULL);
     148             : 
     149           0 :     return topic;
     150             : }
     151             : 
     152           0 : void lwan_pubsub_free_topic(struct lwan_pubsub_topic *topic)
     153             : {
     154             :     struct lwan_pubsub_subscriber *iter, *next;
     155             : 
     156           0 :     pthread_mutex_lock(&topic->lock);
     157           0 :     list_for_each_safe (&topic->subscribers, iter, next, subscriber)
     158           0 :         lwan_pubsub_unsubscribe_internal(topic, iter, false);
     159           0 :     pthread_mutex_unlock(&topic->lock);
     160             : 
     161           0 :     pthread_mutex_destroy(&topic->lock);
     162             : 
     163           0 :     free(topic);
     164           0 : }
     165             : 
     166           0 : void lwan_pubsub_msg_done(struct lwan_pubsub_msg *msg)
     167             : {
     168           0 :     if (!ATOMIC_DEC(msg->refcount)) {
     169           0 :         free(msg->value.value);
     170           0 :         free(msg);
     171             :     }
     172           0 : }
     173             : 
     174           0 : static bool lwan_pubsub_publish_value(struct lwan_pubsub_topic *topic,
     175             :                                       const struct lwan_value value)
     176             : {
     177           0 :     struct lwan_pubsub_msg *msg = malloc(sizeof(*msg));
     178             :     struct lwan_pubsub_subscriber *sub;
     179             : 
     180           0 :     if (!msg)
     181           0 :         return false;
     182             : 
     183             :     /* Initialize refcount to 1, so we can drop one ref after publishing to
     184             :      * all subscribers.  If it drops to 0, it means we didn't publish the
     185             :      * message and we can free it. */
     186           0 :     msg->refcount = 1;
     187           0 :     msg->value = value;
     188             : 
     189           0 :     pthread_mutex_lock(&topic->lock);
     190           0 :     list_for_each (&topic->subscribers, sub, subscriber) {
     191           0 :         ATOMIC_INC(msg->refcount);
     192             : 
     193           0 :         pthread_mutex_lock(&sub->lock);
     194           0 :         if (!lwan_pubsub_queue_put(sub, msg)) {
     195           0 :             lwan_status_warning("Couldn't enqueue message, dropping");
     196           0 :             ATOMIC_DEC(msg->refcount);
     197             :         }
     198           0 :         pthread_mutex_unlock(&sub->lock);
     199             :     }
     200           0 :     pthread_mutex_unlock(&topic->lock);
     201             : 
     202           0 :     lwan_pubsub_msg_done(msg);
     203             : 
     204           0 :     return true;
     205             : }
     206             : 
     207           0 : static void *my_memdup(const void *src, size_t len)
     208             : {
     209           0 :     void *dup = malloc(len);
     210             : 
     211           0 :     return dup ? memcpy(dup, src, len) : NULL;
     212             : }
     213             : 
     214           0 : bool lwan_pubsub_publish(struct lwan_pubsub_topic *topic,
     215             :                          const void *contents,
     216             :                          size_t len)
     217             : {
     218           0 :     const struct lwan_value value = { .value = my_memdup(contents, len), .len = len };
     219             : 
     220           0 :     if (!value.value)
     221           0 :         return false;
     222             : 
     223           0 :     return lwan_pubsub_publish_value(topic, value);
     224             : }
     225             : 
     226           0 : bool lwan_pubsub_publishf(struct lwan_pubsub_topic *topic,
     227             :                           const char *format,
     228             :                           ...)
     229             : {
     230             :     char *msg;
     231             :     int len;
     232             :     va_list ap;
     233             : 
     234           0 :     va_start(ap, format);
     235           0 :     len = vasprintf(&msg, format, ap);
     236           0 :     va_end(ap);
     237             : 
     238           0 :     if (len < 0)
     239           0 :         return false;
     240             : 
     241           0 :     const struct lwan_value value = { .value = msg, .len = (size_t)len };
     242           0 :     return lwan_pubsub_publish_value(topic, value);
     243             : }
     244             : 
     245             : struct lwan_pubsub_subscriber *
     246           0 : lwan_pubsub_subscribe(struct lwan_pubsub_topic *topic)
     247             : {
     248           0 :     struct lwan_pubsub_subscriber *sub = calloc(1, sizeof(*sub));
     249             : 
     250           0 :     if (!sub)
     251           0 :         return NULL;
     252             : 
     253           0 :     pthread_mutex_init(&sub->lock, NULL);
     254           0 :     lwan_pubsub_queue_init(sub);
     255             : 
     256           0 :     pthread_mutex_lock(&topic->lock);
     257           0 :     list_add(&topic->subscribers, &sub->subscriber);
     258           0 :     pthread_mutex_unlock(&topic->lock);
     259             : 
     260           0 :     return sub;
     261             : }
     262             : 
     263           0 : struct lwan_pubsub_msg *lwan_pubsub_consume(struct lwan_pubsub_subscriber *sub)
     264             : {
     265             :     struct lwan_pubsub_msg *msg;
     266             : 
     267           0 :     pthread_mutex_lock(&sub->lock);
     268           0 :     msg = lwan_pubsub_queue_get(sub);
     269           0 :     pthread_mutex_unlock(&sub->lock);
     270             : 
     271           0 :     return msg;
     272             : }
     273             : 
     274           0 : static void lwan_pubsub_unsubscribe_internal(struct lwan_pubsub_topic *topic,
     275             :                                              struct lwan_pubsub_subscriber *sub,
     276             :                                              bool take_topic_lock)
     277             : {
     278             :     struct lwan_pubsub_msg *iter;
     279             : 
     280           0 :     if (take_topic_lock)
     281           0 :         pthread_mutex_lock(&topic->lock);
     282           0 :     list_del(&sub->subscriber);
     283           0 :     if (take_topic_lock)
     284           0 :         pthread_mutex_unlock(&topic->lock);
     285             : 
     286           0 :     pthread_mutex_lock(&sub->lock);
     287           0 :     while ((iter = lwan_pubsub_queue_get(sub)))
     288           0 :         lwan_pubsub_msg_done(iter);
     289           0 :     pthread_mutex_unlock(&sub->lock);
     290             : 
     291           0 :     pthread_mutex_destroy(&sub->lock);
     292           0 :     free(sub);
     293           0 : }
     294             : 
     295           0 : void lwan_pubsub_unsubscribe(struct lwan_pubsub_topic *topic,
     296             :                              struct lwan_pubsub_subscriber *sub)
     297             : {
     298           0 :     return (void)lwan_pubsub_unsubscribe_internal(topic, sub, true);
     299             : }
     300             : 
     301           0 : const struct lwan_value *lwan_pubsub_msg_value(const struct lwan_pubsub_msg *msg)
     302             : {
     303           0 :     return &msg->value;
     304             : }

Generated by: LCOV version 1.15-2-gb9d6727