Browse Source

core: mbox: introduce thread decoupled message queues

pr/spi.typo
Kaspar Schleiser 7 years ago
parent
commit
3a6f95008d
  1. 1
      Makefile.pseudomodules
  2. 163
      core/include/mbox.h
  3. 11
      core/include/thread.h
  4. 126
      core/mbox.c

1
Makefile.pseudomodules

@ -4,6 +4,7 @@ PSEUDOMODULES += conn_ip
PSEUDOMODULES += conn_tcp
PSEUDOMODULES += conn_udp
PSEUDOMODULES += core_msg
PSEUDOMODULES += core_mbox
PSEUDOMODULES += core_thread_flags
PSEUDOMODULES += emb6_router
PSEUDOMODULES += gnrc_ipv6_default

163
core/include/mbox.h

@ -0,0 +1,163 @@
/*
* Copyright (C) 2016 Kaspar Schleiser <kaspar@schleiser.de>
*
* This file is subject to the terms and conditions of the GNU Lesser
* General Public License v2.1. See the file LICENSE in the top level
* directory for more details.
*/
/**
* @defgroup core_mbox Mailboxes
* @ingroup core
* @brief Mailbox implementation
*
* @{
*
* @file
* @brief Mailbox API
*
* @author Kaspar Schleiser <kaspar@schleiser.de>
*/
#ifndef MBOX_H
#define MBOX_H
#include "list.h"
#include "cib.h"
#include "msg.h"
#ifdef __cplusplus
extern "C" {
#endif
/** Static initializer for mbox objects */
#define MBOX_INIT(queue, queue_size) {{0}, {0}, CIB_INIT(queue_size), queue}
/**
* @brief Mailbox struct definition
*/
typedef struct {
list_node_t readers; /**< list of threads waiting for message */
list_node_t writers; /**< list of threads waiting to send */
cib_t cib; /**< cib for msg array */
msg_t *msg_array; /**< ptr to array of msg queue */
} mbox_t;
enum {
NON_BLOCKING = 0, /**< non-blocking mode */
BLOCKING, /**< blocking mode */
};
/**
* @brief Initialize mbox object
*
* @note The message queue size must be a power of two!
*
* @param[in] mbox ptr to mailbox to initialize
* @param[in] queue array of msg_t used as queue
* @param[in] queue_size number of msg_t objects in queue
*/
static inline void mbox_init(mbox_t *mbox, msg_t *queue, unsigned int queue_size)
{
mbox_t m = MBOX_INIT(queue, queue_size);
*mbox = m;
}
/**
* @brief Add message to mailbox
*
* If the mailbox is full, this fuction will return right away.
*
* @internal
*
* @param[in] mbox ptr to mailbox to operate on
* @param[in] msg ptr to message that will be copied into mailbox
* @param[in] blocking block if 1, don't block if 0
*
* @return 1 if msg could be delivered
* @return 0 otherwise
*/
int _mbox_put(mbox_t *mbox, msg_t *msg, int blocking);
/**
* @brief Get message from mailbox
*
* If the mailbox is empty, this fuction will return right away.
*
* @internal
*
* @param[in] mbox ptr to mailbox to operate on
* @param[in] msg ptr to storage for retrieved message
* @param[in] blocking block if 1, don't block if 0
*
* @return 1 if msg could be retrieved
* @return 0 otherwise
*/
int _mbox_get(mbox_t *mbox, msg_t *msg, int blocking);
/**
* @brief Add message to mailbox
*
* If the mailbox is full, this fuction will block until space becomes
* available.
*
* @param[in] mbox ptr to mailbox to operate on
* @param[in] msg ptr to message that will be copied into mailbox
*/
static inline void mbox_put(mbox_t *mbox, msg_t *msg)
{
_mbox_put(mbox, msg, BLOCKING);
}
/**
* @brief Add message to mailbox
*
* If the mailbox is full, this fuction will return right away.
*
* @param[in] mbox ptr to mailbox to operate on
* @param[in] msg ptr to message that will be copied into mailbox
*
* @return 1 if msg could be delivered
* @return 0 otherwise
*/
static inline int mbox_try_put(mbox_t *mbox, msg_t *msg)
{
return _mbox_put(mbox, msg, NON_BLOCKING);
}
/**
* @brief Get message from mailbox
*
* If the mailbox is empty, this fuction will block until a message becomes
* available.
*
* @param[in] mbox ptr to mailbox to operate on
* @param[in] msg ptr to storage for retrieved message
*/
static inline void mbox_get(mbox_t *mbox, msg_t *msg)
{
_mbox_get(mbox, msg, BLOCKING);
}
/**
* @brief Get message from mailbox
*
* If the mailbox is empty, this fuction will return right away.
*
* @param[in] mbox ptr to mailbox to operate on
* @param[in] msg ptr to storage for retrieved message
*
* @return 1 if msg could be retrieved
* @return 0 otherwise
*/
static inline int mbox_try_get(mbox_t *mbox, msg_t *msg)
{
return _mbox_get(mbox, msg, NON_BLOCKING);
}
#ifdef __cplusplus
}
#endif
/** @} */
#endif /* MBOX_H */

11
core/include/thread.h

@ -56,6 +56,7 @@
#define STATUS_REPLY_BLOCKED 5 /**< waiting for a message response */
#define STATUS_FLAG_BLOCKED_ANY 6 /**< waiting for any flag from flag_mask*/
#define STATUS_FLAG_BLOCKED_ALL 7 /**< waiting for all flags in flag_mask */
#define STATUS_MBOX_BLOCKED 8 /**< waiting for get/put on mbox */
/** @} */
/**
@ -63,8 +64,8 @@
* @{*/
#define STATUS_ON_RUNQUEUE STATUS_RUNNING /**< to check if on run queue:
`st >= STATUS_ON_RUNQUEUE` */
#define STATUS_RUNNING 8 /**< currently running */
#define STATUS_PENDING 9 /**< waiting to be scheduled to run */
#define STATUS_RUNNING 9 /**< currently running */
#define STATUS_PENDING 10 /**< waiting to be scheduled to run */
/** @} */
/** @} */
@ -84,8 +85,10 @@ struct _thread {
clist_node_t rq_entry; /**< run queue entry */
#if defined(MODULE_CORE_MSG) || defined(MODULE_CORE_THREAD_FLAGS)
void *wait_data; /**< used by msg and thread flags */
#if defined(MODULE_CORE_MSG) || defined(MODULE_CORE_THREAD_FLAGS) \
|| defined(MODULE_CORE_MBOX)
void *wait_data; /**< used by msg, mbox and thread
flags */
#endif
#if defined(MODULE_CORE_MSG)
list_node_t msg_waiters; /**< threads waiting on message */

126
core/mbox.c

@ -0,0 +1,126 @@
/*
* Copyright (C) 2016 Kaspar Schleiser <kaspar@schleiser.de>
*
* This file is subject to the terms and conditions of the GNU Lesser
* General Public License v2.1. See the file LICENSE in the top level
* directory for more details.
*/
/**
* @ingroup core_mbox
* @{
*
* @file
* @brief mailbox implementation
*
* @author Kaspar Schleiser <kaspar@schleiser.de>
*
* @}
*/
#include <string.h>
#include "mbox.h"
#include "irq.h"
#include "sched.h"
#include "thread.h"
#define ENABLE_DEBUG (0)
#include "debug.h"
#ifdef MODULE_CORE_MBOX
static void _wake_waiter(thread_t *thread, unsigned irqstate)
{
sched_set_status(thread, STATUS_PENDING);
DEBUG("mbox: Thread %"PRIkernel_pid": _wake_waiter(): waking up "
"%"PRIkernel_pid".\n", sched_active_pid, thread->pid);
uint16_t process_priority = thread->priority;
irq_restore(irqstate);
sched_switch(process_priority);
}
static void _wait(list_node_t *wait_list, unsigned irqstate)
{
DEBUG("mbox: Thread %"PRIkernel_pid" _wait(): going blocked.\n",
sched_active_pid);
thread_t *me = (thread_t*) sched_active_thread;
sched_set_status(me, STATUS_MBOX_BLOCKED);
thread_add_to_list(wait_list, me);
irq_restore(irqstate);
thread_yield();
DEBUG("mbox: Thread %"PRIkernel_pid" _wait(): woke up.\n",
sched_active_pid);
}
int _mbox_put(mbox_t *mbox, msg_t *msg, int blocking)
{
unsigned irqstate = irq_disable();
list_node_t *next = (list_node_t*) list_remove_head(&mbox->readers);
if (next) {
DEBUG("mbox: Thread %"PRIkernel_pid" mbox 0x%08x: _tryput(): "
"there's a waiter.\n", sched_active_pid, (unsigned)mbox);
thread_t *thread = container_of((clist_node_t*)next, thread_t, rq_entry);
*(msg_t *)thread->wait_data = *msg;
_wake_waiter(thread, irqstate);
return 1;
}
else {
if (cib_full(&mbox->cib)) {
if (blocking) {
_wait(&mbox->writers, irqstate);
irqstate = irq_disable();
}
else {
irq_restore(irqstate);
return 0;
}
}
DEBUG("mbox: Thread %"PRIkernel_pid" mbox 0x%08x: _tryput(): "
"queued message.\n", sched_active_pid, (unsigned)mbox);
msg->sender_pid = sched_active_pid;
/* copy msg into queue */
mbox->msg_array[cib_put_unsafe(&mbox->cib)] = *msg;
irq_restore(irqstate);
return 1;
}
}
int _mbox_get(mbox_t *mbox, msg_t *msg, int blocking)
{
unsigned irqstate = irq_disable();
if (cib_avail(&mbox->cib)) {
DEBUG("mbox: Thread %"PRIkernel_pid" mbox 0x%08x: _tryget(): "
"got queued message.\n", sched_active_pid, (unsigned)mbox);
/* copy msg from queue */
*msg = mbox->msg_array[cib_get_unsafe(&mbox->cib)];
list_node_t *next = (list_node_t*) list_remove_head(&mbox->writers);
if (next) {
thread_t *thread = container_of((clist_node_t*)next, thread_t, rq_entry);
_wake_waiter(thread, irqstate);
}
else {
irq_restore(irqstate);
}
return 1;
}
else if (blocking) {
sched_active_thread->wait_data = (void*)msg;
_wait(&mbox->readers, irqstate);
/* sender has copied message */
return 1;
}
else {
irq_restore(irqstate);
return 0;
}
}
#endif /* MODULE_CORE_MBOX */
Loading…
Cancel
Save