Skip to content

Commit

Permalink
Attempt to improve names
Browse files Browse the repository at this point in the history
  • Loading branch information
judfs committed Nov 22, 2023
1 parent 6a0628a commit f574176
Show file tree
Hide file tree
Showing 3 changed files with 380 additions and 345 deletions.
40 changes: 20 additions & 20 deletions lcm-logger/lcm_logger.c
Original file line number Diff line number Diff line change
Expand Up @@ -220,12 +220,12 @@ static void *write_thread(void *user_data)
return NULL;
}
// nope. write the event to disk
lcm_eventlog_event_t *le = (lcm_eventlog_event_t *) msg;
int64_t sz = sizeof(lcm_eventlog_event_t) + le->channellen + 1 + le->datalen;
lcm_eventlog_event_t *log_event = (lcm_eventlog_event_t *) msg;
int64_t sz = sizeof(lcm_eventlog_event_t) + log_event->channellen + 1 + log_event->datalen;
logger->write_queue_size -= sz;
g_mutex_unlock(&logger->mutex);

if (0 != lcm_eventlog_write_event(logger->log, le)) {
if (0 != lcm_eventlog_write_event(logger->log, log_event)) {
static int64_t last_spew_utime = 0;
char *reason = strdup(strerror(errno));
int64_t now = g_get_real_time();
Expand All @@ -234,30 +234,30 @@ static void *write_thread(void *user_data)
last_spew_utime = now;
}
free(reason);
free(le);
free(log_event);
if (errno == ENOSPC) {
exit(1);
} else {
continue;
}
}
if (logger->fflush_interval_ms >= 0 &&
(le->timestamp - logger->last_fflush_time) > logger->fflush_interval_ms * 1000) {
(log_event->timestamp - logger->last_fflush_time) > logger->fflush_interval_ms * 1000) {
fflush(logger->log->f);
#ifndef WIN32
// Perform a full fsync operation after flush
fdatasync(fileno(logger->log->f));
#endif
logger->last_fflush_time = le->timestamp;
logger->last_fflush_time = log_event->timestamp;
}

// bookkeeping, cleanup
int64_t offset_utime = le->timestamp - logger->time0;
int64_t offset_utime = log_event->timestamp - logger->time0;
logger->nevents++;
logger->events_since_last_report++;
logger->logsize += 4 + 8 + 8 + 4 + le->channellen + 4 + le->datalen;
logger->logsize += 4 + 8 + 8 + 4 + log_event->channellen + 4 + log_event->datalen;

free(le);
free(log_event);

if (!logger->quiet && (offset_utime - logger->last_report_time > 1000000)) {
double dt = (offset_utime - logger->last_report_time) / 1000000.0;
Expand Down Expand Up @@ -319,21 +319,21 @@ static void message_handler(const lcm_recv_buf_t *rbuf, const char *channel, voi
}

// queue up the message for writing to disk by the write thread
lcm_eventlog_event_t *le = (lcm_eventlog_event_t *) malloc(mem_sz);
memset(le, 0, mem_sz);
lcm_eventlog_event_t *log_event = (lcm_eventlog_event_t *) malloc(mem_sz);
memset(log_event, 0, mem_sz);

le->timestamp = rbuf->recv_utime;
le->channellen = channellen;
le->datalen = rbuf->data_size;
log_event->timestamp = rbuf->recv_utime;
log_event->channellen = channellen;
log_event->datalen = rbuf->data_size;
// log_write_event will handle le.eventnum.

le->channel = ((char *) le) + sizeof(lcm_eventlog_event_t);
strcpy(le->channel, channel);
le->data = le->channel + channellen + 1;
assert((char *) le->data + rbuf->data_size == (char *) le + mem_sz);
memcpy(le->data, rbuf->data, rbuf->data_size);
log_event->channel = ((char *) log_event) + sizeof(lcm_eventlog_event_t);
strcpy(log_event->channel, channel);
log_event->data = log_event->channel + channellen + 1;
assert((char *) log_event->data + rbuf->data_size == (char *) log_event + mem_sz);
memcpy(log_event->data, rbuf->data, rbuf->data_size);

g_async_queue_push(logger->write_queue, le);
g_async_queue_push(logger->write_queue, log_event);
}

#ifdef USE_SIGHUP
Expand Down
126 changes: 66 additions & 60 deletions lcm/lcm.c
Original file line number Diff line number Diff line change
Expand Up @@ -157,32 +157,34 @@ static void map_free_handlers_callback(gpointer _key, gpointer _value, gpointer
free(_key);
}

static void lcm_handler_free(lcm_subscription_t *h)
static void lcm_handler_free(lcm_subscription_t *subscription)
{
assert(!h->callback_scheduled);
g_regex_unref(h->regex);
free(h->channel);
memset(h, 0, sizeof(lcm_subscription_t));
free(h);
assert(!subscription->callback_scheduled);
g_regex_unref(subscription->regex);
free(subscription->channel);
memset(subscription, 0, sizeof(lcm_subscription_t));
free(subscription);
}

void lcm_destroy(lcm_t *lcm)
{
if (lcm->provider) {
for (unsigned int i = 0; i < lcm->handlers_all->len; i++) {
// unsubscribe from all handlers
lcm_subscription_t *h = (lcm_subscription_t *) g_ptr_array_index(lcm->handlers_all, i);
lcm_unsubscribe(lcm, h);
lcm_subscription_t *subscription =
(lcm_subscription_t *) g_ptr_array_index(lcm->handlers_all, i);
lcm_unsubscribe(lcm, subscription);
}
lcm->vtable->destroy(lcm->provider);
}
g_hash_table_foreach(lcm->handlers_map, map_free_handlers_callback, NULL);
g_hash_table_destroy(lcm->handlers_map);

for (unsigned int i = 0; i < lcm->handlers_all->len; i++) {
lcm_subscription_t *h = (lcm_subscription_t *) g_ptr_array_index(lcm->handlers_all, i);
h->callback_scheduled = 0; // XXX hack...
lcm_handler_free(h);
lcm_subscription_t *subscription =
(lcm_subscription_t *) g_ptr_array_index(lcm->handlers_all, i);
subscription->callback_scheduled = 0; // XXX hack...
lcm_handler_free(subscription);
}
g_ptr_array_free(lcm->handlers_all, TRUE);

Expand Down Expand Up @@ -251,31 +253,31 @@ int lcm_publish(lcm_t *lcm, const char *channel, const void *data, unsigned int
return -1;
}

static int is_handler_subscriber(lcm_subscription_t *h, const char *channel_name)
static int is_handler_subscriber(lcm_subscription_t *subscription, const char *channel_name)
{
return g_regex_match(h->regex, channel_name, (GRegexMatchFlags) 0, NULL);
return g_regex_match(subscription->regex, channel_name, (GRegexMatchFlags) 0, NULL);
}

// add the handler to any channel's handler list if its subscription matches
static void map_add_handler_callback(gpointer _key, gpointer _value, gpointer _data)
{
lcm_subscription_t *h = (lcm_subscription_t *) _data;
lcm_subscription_t *subscription = (lcm_subscription_t *) _data;
char *channel_name = (char *) _key;
GPtrArray *handlers = (GPtrArray *) _value;

if (!is_handler_subscriber(h, channel_name))
if (!is_handler_subscriber(subscription, channel_name))
return;

g_ptr_array_add(handlers, h);
g_ptr_array_add(handlers, subscription);
}

// remove from a channel's handler list
static void map_remove_handler_callback(gpointer _key, gpointer _value, gpointer _data)
{
(void) _key;
lcm_subscription_t *h = (lcm_subscription_t *) _data;
lcm_subscription_t *subscription = (lcm_subscription_t *) _data;
GPtrArray *handlers = (GPtrArray *) _value;
g_ptr_array_remove_fast(handlers, h);
g_ptr_array_remove_fast(handlers, subscription);
}

lcm_subscription_t *lcm_subscribe(lcm_t *lcm, const char *channel, lcm_msg_handler_t handler,
Expand All @@ -290,53 +292,54 @@ lcm_subscription_t *lcm_subscribe(lcm_t *lcm, const char *channel, lcm_msg_handl
}

// create and populate a new message handler struct
lcm_subscription_t *h = (lcm_subscription_t *) calloc(1, sizeof(lcm_subscription_t));
h->channel = strdup(channel);
h->handler = handler;
h->userdata = userdata;
h->callback_scheduled = 0;
h->marked_for_deletion = 0;
h->max_num_queued_messages = lcm->default_max_num_queued_messages;
h->num_queued_messages = 0;
h->lcm = lcm;
lcm_subscription_t *subscription = (lcm_subscription_t *) calloc(1, sizeof(lcm_subscription_t));
subscription->channel = strdup(channel);
subscription->handler = handler;
subscription->userdata = userdata;
subscription->callback_scheduled = 0;
subscription->marked_for_deletion = 0;
subscription->max_num_queued_messages = lcm->default_max_num_queued_messages;
subscription->num_queued_messages = 0;
subscription->lcm = lcm;

char *regexbuf = g_strdup_printf("^%s$", channel);
GError *rerr = NULL;
h->regex = g_regex_new(regexbuf, (GRegexCompileFlags) 0, (GRegexMatchFlags) 0, &rerr);
subscription->regex =
g_regex_new(regexbuf, (GRegexCompileFlags) 0, (GRegexMatchFlags) 0, &rerr);
g_free(regexbuf);
if (rerr) {
fprintf(stderr, "%s: %s\n", __FUNCTION__, rerr->message);
dbg(DBG_LCM, "%s: %s\n", __FUNCTION__, rerr->message);
g_error_free(rerr);
free(h);
free(subscription);
return NULL;
}
g_rec_mutex_lock(&lcm->mutex);
g_ptr_array_add(lcm->handlers_all, h);
g_hash_table_foreach(lcm->handlers_map, map_add_handler_callback, h);
g_ptr_array_add(lcm->handlers_all, subscription);
g_hash_table_foreach(lcm->handlers_map, map_add_handler_callback, subscription);
g_rec_mutex_unlock(&lcm->mutex);

return h;
return subscription;
}

int lcm_unsubscribe(lcm_t *lcm, lcm_subscription_t *h)
int lcm_unsubscribe(lcm_t *lcm, lcm_subscription_t *subscription)
{
g_rec_mutex_lock(&lcm->mutex);

// remove the handler from the master list
int foundit = g_ptr_array_remove(lcm->handlers_all, h);
int foundit = g_ptr_array_remove(lcm->handlers_all, subscription);

if (lcm->provider && lcm->vtable->unsubscribe) {
lcm->vtable->unsubscribe(lcm->provider, h->channel);
lcm->vtable->unsubscribe(lcm->provider, subscription->channel);
}

if (foundit) {
// remove the handler from all the lists in the hash table
g_hash_table_foreach(lcm->handlers_map, map_remove_handler_callback, h);
if (!h->callback_scheduled)
lcm_handler_free(h);
g_hash_table_foreach(lcm->handlers_map, map_remove_handler_callback, subscription);
if (!subscription->callback_scheduled)
lcm_handler_free(subscription);
else
h->marked_for_deletion = 1;
subscription->marked_for_deletion = 1;
}

g_rec_mutex_unlock(&lcm->mutex);
Expand All @@ -361,9 +364,10 @@ GPtrArray *lcm_get_handlers(lcm_t *lcm, const char *channel)

// find all the matching handlers
for (unsigned int i = 0; i < lcm->handlers_all->len; i++) {
lcm_subscription_t *h = (lcm_subscription_t *) g_ptr_array_index(lcm->handlers_all, i);
if (is_handler_subscriber(h, channel))
g_ptr_array_add(handlers, h);
lcm_subscription_t *subscription =
(lcm_subscription_t *) g_ptr_array_index(lcm->handlers_all, i);
if (is_handler_subscriber(subscription, channel))
g_ptr_array_add(handlers, subscription);
}

finished:
Expand All @@ -377,10 +381,10 @@ int lcm_try_enqueue_message(lcm_t *lcm, const char *channel)
GPtrArray *handlers = lcm_get_handlers(lcm, channel);
int num_keepers = 0;
for (unsigned int i = 0; i < handlers->len; i++) {
lcm_subscription_t *h = (lcm_subscription_t *) g_ptr_array_index(handlers, i);
if (h->num_queued_messages < h->max_num_queued_messages ||
h->max_num_queued_messages <= 0) {
h->num_queued_messages++;
lcm_subscription_t *subscription = (lcm_subscription_t *) g_ptr_array_index(handlers, i);
if (subscription->num_queued_messages < subscription->max_num_queued_messages ||
subscription->max_num_queued_messages <= 0) {
subscription->num_queued_messages++;
num_keepers++;
}
}
Expand Down Expand Up @@ -412,35 +416,37 @@ int lcm_dispatch_handlers(lcm_t *lcm, lcm_recv_buf_t *buf, const char *channel)
// callbacks.
int nhandlers = handlers->len;
for (int i = 0; i < nhandlers; i++) {
lcm_subscription_t *h = (lcm_subscription_t *) g_ptr_array_index(handlers, i);
h->callback_scheduled = 1;
lcm_subscription_t *subscription = (lcm_subscription_t *) g_ptr_array_index(handlers, i);
subscription->callback_scheduled = 1;
}

// now, call the handlers.
for (int i = 0; i < nhandlers; i++) {
lcm_subscription_t *h = (lcm_subscription_t *) g_ptr_array_index(handlers, i);
if (!h->marked_for_deletion && h->num_queued_messages > 0) {
h->num_queued_messages--;
lcm_subscription_t *subscription = (lcm_subscription_t *) g_ptr_array_index(handlers, i);

if (!subscription->marked_for_deletion && subscription->num_queued_messages > 0) {
subscription->num_queued_messages--;
g_rec_mutex_unlock(&lcm->mutex);
h->handler(buf, channel, h->userdata);
subscription->handler(buf, channel, subscription->userdata);
g_rec_mutex_lock(&lcm->mutex);
}
}

// unref the handlers and check if any should be deleted
GList *to_remove = NULL;
for (int i = 0; i < nhandlers; i++) {
lcm_subscription_t *h = (lcm_subscription_t *) g_ptr_array_index(handlers, i);
h->callback_scheduled = 0;
if (h->marked_for_deletion)
to_remove = g_list_prepend(to_remove, h);
lcm_subscription_t *subscription = (lcm_subscription_t *) g_ptr_array_index(handlers, i);

subscription->callback_scheduled = 0;
if (subscription->marked_for_deletion)
to_remove = g_list_prepend(to_remove, subscription);
}
// actually delete handlers marked for deletion
for (; to_remove; to_remove = g_list_delete_link(to_remove, to_remove)) {
lcm_subscription_t *h = (lcm_subscription_t *) to_remove->data;
g_ptr_array_remove(lcm->handlers_all, h);
g_hash_table_foreach(lcm->handlers_map, map_remove_handler_callback, h);
lcm_handler_free(h);
lcm_subscription_t *subscription = (lcm_subscription_t *) to_remove->data;
g_ptr_array_remove(lcm->handlers_all, subscription);
g_hash_table_foreach(lcm->handlers_map, map_remove_handler_callback, subscription);
lcm_handler_free(subscription);
}
g_rec_mutex_unlock(&lcm->mutex);

Expand Down
Loading

0 comments on commit f574176

Please sign in to comment.