gcoap: Add server-side Observe registration and notifications.

master
Ken Bannister 6 years ago
parent 05fe8527a2
commit 9d37be2729

@ -25,6 +25,10 @@
* port, which supports RFC 6282 compression. Internally, gcoap depends on the
* nanocoap package for base level structs and functionality.
*
* gcoap also supports the Observe extension (RFC 7641) for a server. gcoap
* provides functions to generate and send an observe notification that are
* similar to the functions to send a client request.
*
* ## Server Operation ##
*
* gcoap listens for requests on GCOAP_PORT, 5683 by default. You can redefine
@ -75,7 +79,7 @@
*
* ### Creating a request ###
*
* Here is the expected sequence for preparing and sending a request:
* Here is the expected sequence to prepare and send a request:
*
* Allocate a buffer and a coap_pkt_t for the request.
*
@ -109,6 +113,43 @@
* _content_type_ attributes.
* -# Read the payload, if any.
*
* ## Observe Server Operation
*
* A CoAP client may register for Observe notifications for any resource that
* an application has registered with gcoap. An application does not need to
* take any action to support Observe client registration. However, gcoap
* limits registration for a given resource to a _single_ observer.
*
* An Observe notification is considered a response to the original client
* registration request. So, the Observe server only needs to create and send
* the notification -- no further communication or callbacks are required.
*
* ### Creating a notification ###
*
* Here is the expected sequence to prepare and send a notification:
*
* Allocate a buffer and a coap_pkt_t for the notification, then follow the
* steps below.
*
* -# Call gcoap_obs_init() to initialize the notification for a resource.
* Test the return value, which may indicate there is not an observer for
* the resource. If so, you are done.
* -# Write the notification payload, starting at the updated _payload_ pointer
* in the coap_pkt_t.
* -# Call gcoap_finish(), which updates the packet for the payload.
*
* Finally, call gcoap_obs_send() for the resource.
*
* ### Other considerations ###
*
* By default, the value for the Observe option in a notification is three
* bytes long. For resources that change slowly, this length can be reduced via
* GCOAP_OBS_VALUE_WIDTH.
*
* To cancel a notification, the server expects to receive a GET request with
* the Observe option value set to 1. The server does not support cancellation
* via a reset (RST) response to a non-confirmable notification.
*
* ## Implementation Notes ##
*
* ### Building a packet ###
@ -175,6 +216,13 @@ extern "C" {
*/
#define GCOAP_RESP_OPTIONS_BUF (8)
/**
* @brief Size of the buffer used to write options in an Observe notification.
*
* Accommodates Content-Format and Observe.
*/
#define GCOAP_OBS_OPTIONS_BUF (8)
/** @brief Maximum number of requests awaiting a response */
#define GCOAP_REQ_WAITING_MAX (2)
@ -225,6 +273,69 @@ extern "C" {
*/
#define GCOAP_MSG_TYPE_INTR (0x1502)
/** @brief Maximum number of Observe clients; use 2 if not defined */
#ifndef GCOAP_OBS_CLIENTS_MAX
#define GCOAP_OBS_CLIENTS_MAX (2)
#endif
/**
* @brief Maximum number of registrations for Observable resources; use 2 if
* not defined
*/
#ifndef GCOAP_OBS_REGISTRATIONS_MAX
#define GCOAP_OBS_REGISTRATIONS_MAX (2)
#endif
/**
* @name States for the memo used to track Observe registrations
* @{
*/
#define GCOAP_OBS_MEMO_UNUSED (0) /**< This memo is unused */
#define GCOAP_OBS_MEMO_IDLE (1) /**< Registration OK; no current activity */
#define GCOAP_OBS_MEMO_PENDING (2) /**< Resource changed; notification pending */
/** @} */
/**
* @brief Width in bytes of the Observe option value for a notification.
*
* This width is used to determine the length of the 'tick' used to measure
* the time between observable changes to a resource. A tick is expressed
* internally as GCOAP_OBS_TICK_EXPONENT, which is the base-2 log value of the
* tick length in microseconds.
*
* The canonical setting for the value width is 3 (exponent 5), which results
* in a tick length of 32 usec, per sec. 3.4, 4.4 of the RFC. Width 2
* (exponent 16) results in a tick length of ~65 msec, and width 1 (exponent
* 24) results in a tick length of ~17 sec.
*
* The tick length must be short enough so that the Observe value strictly
* increases for each new notification. The purpose of the value is to allow a
* client to detect message reordering within the network latency period (128
* sec). For resources that change only slowly, the reduced message length is
* useful when packet size is limited.
*/
#ifndef GCOAP_OBS_VALUE_WIDTH
#define GCOAP_OBS_VALUE_WIDTH (3)
#endif
/** @brief See GCOAP_OBS_VALUE_WIDTH. */
#if (GCOAP_OBS_VALUE_WIDTH == 3)
#define GCOAP_OBS_TICK_EXPONENT (5)
#elif (GCOAP_OBS_VALUE_WIDTH == 2)
#define GCOAP_OBS_TICK_EXPONENT (16)
#elif (GCOAP_OBS_VALUE_WIDTH == 1)
#define GCOAP_OBS_TICK_EXPONENT (24)
#endif
/**
* @name Return values for gcoap_obs_init()
* @{
*/
#define GCOAP_OBS_INIT_OK (0)
#define GCOAP_OBS_INIT_ERR (-1)
#define GCOAP_OBS_INIT_UNUSED (-2)
/** @} */
/**
* @brief A modular collection of resources for a server
*/
@ -255,16 +366,29 @@ typedef struct {
msg_t timeout_msg; /**< For response timer */
} gcoap_request_memo_t;
/** @brief Memo for Observe registration and notifications */
typedef struct {
sock_udp_ep_t *observer; /**< Client endpoint; unused if null */
coap_resource_t *resource; /**< Entity being observed */
uint8_t token[GCOAP_TOKENLEN_MAX]; /**< Client token for notifications */
unsigned token_len; /**< Actual length of token attribute */
} gcoap_observe_memo_t;
/**
* @brief Container for the state of gcoap itself
*/
typedef struct {
gcoap_listener_t *listeners; /**< List of registered listeners */
gcoap_listener_t *listeners; /**< List of registered listeners */
gcoap_request_memo_t open_reqs[GCOAP_REQ_WAITING_MAX];
/**< Storage for open requests; if first
byte of an entry is zero, the entry
is available */
uint16_t last_message_id; /**< Last message ID used */
/**< Storage for open requests; if first
byte of an entry is zero, the entry
is available */
uint16_t last_message_id; /**< Last message ID used */
sock_udp_ep_t observers[GCOAP_OBS_CLIENTS_MAX];
/**< Observe clients; allows reuse for
observe memos */
gcoap_observe_memo_t observe_memos[GCOAP_OBS_REGISTRATIONS_MAX];
/**< Observed resource registrations */
} gcoap_state_t;
/**
@ -402,6 +526,39 @@ static inline ssize_t gcoap_response(coap_pkt_t *pdu, uint8_t *buf, size_t len,
: -1;
}
/**
* @brief Initializes a CoAP Observe notification packet on a buffer, for the
* observer registered for a resource.
*
* First verifies that an observer has been registered for the resource.
*
* @param[in] pdu Notification metadata
* @param[in] buf Buffer containing the PDU
* @param[in] len Length of the buffer
* @param[in] resource Resource for the notification
*
* @return GCOAP_OBS_INIT_OK on success
* @return GCOAP_OBS_INIT_ERR on error
* @return GCOAP_OBS_INIT_UNUSED if no observer for resource
*/
int gcoap_obs_init(coap_pkt_t *pdu, uint8_t *buf, size_t len,
const coap_resource_t *resource);
/**
* @brief Sends a buffer containing a CoAP Observe notification to the
* observer registered for a resource.
*
* Assumes a single observer for a resource.
*
* @param[in] buf Buffer containing the PDU
* @param[in] len Length of the buffer
* @param[in] resource Resource to send
*
* @return length of the packet
* @return 0 if cannot send
*/
size_t gcoap_obs_send(uint8_t *buf, size_t len, const coap_resource_t *resource);
/**
* @brief Provides important operational statistics.
*

@ -34,11 +34,19 @@ static void *_event_loop(void *arg);
static void _listen(sock_udp_t *sock);
static ssize_t _well_known_core_handler(coap_pkt_t* pdu, uint8_t *buf, size_t len);
static ssize_t _write_options(coap_pkt_t *pdu, uint8_t *buf, size_t len);
static size_t _handle_req(coap_pkt_t *pdu, uint8_t *buf, size_t len);
static size_t _handle_req(coap_pkt_t *pdu, uint8_t *buf, size_t len,
sock_udp_ep_t *remote);
static ssize_t _finish_pdu(coap_pkt_t *pdu, uint8_t *buf, size_t len);
static void _expire_request(gcoap_request_memo_t *memo);
static void _find_req_memo(gcoap_request_memo_t **memo_ptr, coap_pkt_t *pdu,
uint8_t *buf, size_t len);
static void _find_resource(coap_pkt_t *pdu, coap_resource_t **resource_ptr,
gcoap_listener_t **listener_ptr);
static int _find_observer(sock_udp_ep_t **observer, sock_udp_ep_t *remote);
static int _find_obs_memo(gcoap_observe_memo_t **memo, sock_udp_ep_t *remote,
coap_pkt_t *pdu);
static void _find_obs_memo_resource(gcoap_observe_memo_t **memo,
const coap_resource_t *resource);
/* Internal variables */
const coap_resource_t _default_resources[] = {
@ -132,7 +140,7 @@ static void _listen(sock_udp_t *sock)
/* incoming request */
if (coap_get_code_class(&pdu) == COAP_CLASS_REQ) {
size_t pdu_len = _handle_req(&pdu, buf, sizeof(buf));
size_t pdu_len = _handle_req(&pdu, buf, sizeof(buf), &remote);
if (pdu_len > 0) {
sock_udp_send(sock, buf, pdu_len, &remote);
}
@ -152,8 +160,104 @@ static void _listen(sock_udp_t *sock)
* Main request handler: generates response PDU in the provided buffer.
*
* Caller must finish the PDU and send it.
*
* return length of response pdu, or < 0 if can't handle
*/
static size_t _handle_req(coap_pkt_t *pdu, uint8_t *buf, size_t len,
sock_udp_ep_t *remote)
{
coap_resource_t *resource;
gcoap_listener_t *listener;
sock_udp_ep_t *observer = NULL;
gcoap_observe_memo_t *memo = NULL;
gcoap_observe_memo_t *resource_memo = NULL;
_find_resource(pdu, &resource, &listener);
if (resource == NULL) {
return gcoap_response(pdu, buf, len, COAP_CODE_PATH_NOT_FOUND);
}
else {
/* used below to ensure a memo not already recorded for the resource */
_find_obs_memo_resource(&resource_memo, resource);
}
if (coap_get_observe(pdu) == COAP_OBS_REGISTER) {
int empty_slot = _find_obs_memo(&memo, remote, pdu);
/* record observe memo */
if (memo == NULL) {
if (empty_slot >= 0 && resource_memo == NULL) {
int obs_slot = _find_observer(&observer, remote);
/* cache new observer */
if (observer == NULL) {
if (obs_slot >= 0) {
observer = &_coap_state.observers[obs_slot];
memcpy(observer, remote, sizeof(sock_udp_ep_t));
} else {
DEBUG("gcoap: can't register observer\n");
}
}
if (observer != NULL) {
memo = &_coap_state.observe_memos[empty_slot];
}
}
if (memo == NULL) {
coap_clear_observe(pdu);
DEBUG("gcoap: can't register observe memo\n");
}
}
if (memo != NULL) {
memo->observer = observer;
memo->resource = resource;
memo->token_len = coap_get_token_len(pdu);
if (memo->token_len) {
memcpy(&memo->token[0], pdu->token, memo->token_len);
}
DEBUG("gcoap: Registered observer for: %s\n", memo->resource->path);
/* generate initial notification value */
uint32_t now = xtimer_now_usec();
pdu->observe_value = (now >> GCOAP_OBS_TICK_EXPONENT) & 0xFFFFFF;
}
} else if (coap_get_observe(pdu) == COAP_OBS_DEREGISTER) {
_find_obs_memo(&memo, remote, pdu);
/* clear memo, and clear observer if no other memos */
if (memo != NULL) {
DEBUG("gcoap: Deregistering observer for: %s\n", memo->resource->path);
memo->observer = NULL;
memo = NULL;
_find_obs_memo(&memo, remote, NULL);
if (memo == NULL) {
_find_observer(&observer, remote);
if (observer != NULL) {
observer->family = AF_UNSPEC;
}
}
}
coap_clear_observe(pdu);
} else if (coap_has_observe(pdu)) {
/* bogus request; don't respond */
DEBUG("gcoap: Observe value unexpected: %" PRIu32 "\n", coap_get_observe(pdu));
return -1;
}
ssize_t pdu_len = resource->handler(pdu, buf, len);
if (pdu_len < 0) {
pdu_len = gcoap_response(pdu, buf, len,
COAP_CODE_INTERNAL_SERVER_ERROR);
}
return pdu_len;
}
/*
* Searches listener registrations for the resource matching the path in a PDU.
*
* param[out] resource_ptr -- found resource
* param[out] listener_ptr -- listener for found resource
*/
static size_t _handle_req(coap_pkt_t *pdu, uint8_t *buf, size_t len)
static void _find_resource(coap_pkt_t *pdu, coap_resource_t **resource_ptr,
gcoap_listener_t **listener_ptr)
{
unsigned method_flag = coap_method2flag(coap_get_code_detail(pdu));
@ -178,18 +282,16 @@ static size_t _handle_req(coap_pkt_t *pdu, uint8_t *buf, size_t len)
break;
}
else {
ssize_t pdu_len = resource->handler(pdu, buf, len);
if (pdu_len < 0) {
pdu_len = gcoap_response(pdu, buf, len,
COAP_CODE_INTERNAL_SERVER_ERROR);
}
return pdu_len;
*resource_ptr = resource;
*listener_ptr = listener;
return;
}
}
listener = listener->next;
}
/* resource not found */
return gcoap_response(pdu, buf, len, COAP_CODE_PATH_NOT_FOUND);
*resource_ptr = NULL;
*listener_ptr = NULL;
}
/*
@ -329,6 +431,22 @@ static ssize_t _write_options(coap_pkt_t *pdu, uint8_t *buf, size_t len)
uint8_t *bufpos = buf + coap_get_total_hdr_len(pdu); /* position for write */
/* Observe for notification or registration response */
if (coap_get_code_class(pdu) == COAP_CLASS_SUCCESS && coap_has_observe(pdu)) {
uint32_t nval = htonl(pdu->observe_value);
uint8_t *nbyte = (uint8_t *)&nval;
unsigned i;
/* find address of non-zero MSB; max 3 bytes */
for (i = 1; i < 4; i++) {
if (*(nbyte+i) > 0) {
break;
}
}
bufpos += coap_put_option(bufpos, last_optnum, COAP_OPT_OBSERVE,
nbyte+i, 4-i);
last_optnum = COAP_OPT_OBSERVE;
}
/* Uri-Path for request */
if (coap_get_code_class(pdu) == COAP_CLASS_REQ) {
size_t url_len = strlen((char *)pdu->url);
@ -355,6 +473,108 @@ static ssize_t _write_options(coap_pkt_t *pdu, uint8_t *buf, size_t len)
return bufpos - buf;
}
/*
* Find registered observer for a remote address and port.
*
* observer[out] -- Registered observer, or NULL if not found
* remote[in] -- Endpoint to match
*
* return Index of empty slot, suitable for registering new observer; or -1
* if no empty slots. Undefined if observer found.
*/
static int _find_observer(sock_udp_ep_t **observer, sock_udp_ep_t *remote)
{
int empty_slot = -1;
*observer = NULL;
for (unsigned i = 0; i < GCOAP_OBS_CLIENTS_MAX; i++) {
unsigned cmplen = 0;
if (_coap_state.observers[i].family == AF_UNSPEC) {
cmplen = 0;
empty_slot = i;
}
else if (_coap_state.observers[i].family == AF_INET6) {
cmplen = 16;
}
else {
cmplen = 4;
}
if (cmplen &&
memcmp(&_coap_state.observers[i].addr.ipv6[0], &remote->addr.ipv6[0],
cmplen) == 0
&& _coap_state.observers[i].port == remote->port) {
*observer = &_coap_state.observers[i];
break;
}
}
return empty_slot;
}
/*
* Find registered observe memo for a remote address and token.
*
* memo[out] -- Registered observe memo, or NULL if not found
* remote[in] -- Endpoint for address to match
* pdu[in] -- PDU for token to match, or NULL to match only on remote address
*
* return Index of empty slot, suitable for registering new memo; or -1 if no
* empty slots. Undefined if memo found.
*/
static int _find_obs_memo(gcoap_observe_memo_t **memo, sock_udp_ep_t *remote,
coap_pkt_t *pdu)
{
int empty_slot = -1;
*memo = NULL;
sock_udp_ep_t *remote_observer = NULL;
_find_observer(&remote_observer, remote);
for (unsigned i = 0; i < GCOAP_OBS_REGISTRATIONS_MAX; i++) {
if (_coap_state.observe_memos[i].observer == NULL) {
empty_slot = i;
continue;
}
if (_coap_state.observe_memos[i].observer == remote_observer) {
if (pdu == NULL) {
*memo = &_coap_state.observe_memos[i];
break;
}
if (_coap_state.observe_memos[i].token_len == coap_get_token_len(pdu)) {
unsigned cmplen = _coap_state.observe_memos[i].token_len;
if (cmplen &&
memcmp(&_coap_state.observe_memos[i].token[0], &pdu->token[0],
cmplen) == 0) {
*memo = &_coap_state.observe_memos[i];
break;
}
}
}
}
return empty_slot;
}
/*
* Find registered observe memo for a resource.
*
* memo[out] -- Registered observe memo, or NULL if not found
* resource[in] -- Resource to match
*/
static void _find_obs_memo_resource(gcoap_observe_memo_t **memo,
const coap_resource_t *resource)
{
*memo = NULL;
for (int i = 0; i < GCOAP_OBS_REGISTRATIONS_MAX; i++) {
if (_coap_state.observe_memos[i].observer != NULL
&& _coap_state.observe_memos[i].resource == resource) {
*memo = &_coap_state.observe_memos[i];
break;
}
}
}
/*
* gcoap interface functions
*/
@ -367,8 +587,10 @@ kernel_pid_t gcoap_init(void)
_pid = thread_create(_msg_stack, sizeof(_msg_stack), THREAD_PRIORITY_MAIN - 1,
THREAD_CREATE_STACKTEST, _event_loop, NULL, "coap");
/* Blank list of open requests so we know if an entry is available. */
/* Blank lists so we know if an entry is available. */
memset(&_coap_state.open_reqs[0], 0, sizeof(_coap_state.open_reqs));
memset(&_coap_state.observers[0], 0, sizeof(_coap_state.observers));
memset(&_coap_state.observe_memos[0], 0, sizeof(_coap_state.observe_memos));
/* randomize initial value */
_coap_state.last_message_id = random_uint32() & 0xFFFF;
@ -518,6 +740,55 @@ int gcoap_resp_init(coap_pkt_t *pdu, uint8_t *buf, size_t len, unsigned code)
return 0;
}
int gcoap_obs_init(coap_pkt_t *pdu, uint8_t *buf, size_t len,
const coap_resource_t *resource)
{
ssize_t hdrlen;
gcoap_observe_memo_t *memo = NULL;
_find_obs_memo_resource(&memo, resource);
if (memo == NULL) {
/* Unique return value to specify there is not an observer */
return GCOAP_OBS_INIT_UNUSED;
}
pdu->hdr = (coap_hdr_t *)buf;
hdrlen = coap_build_hdr(pdu->hdr, COAP_TYPE_NON, &memo->token[0],
memo->token_len, COAP_CODE_CONTENT,
++_coap_state.last_message_id);
if (hdrlen > 0) {
uint32_t now = xtimer_now_usec();
pdu->observe_value = (now >> GCOAP_OBS_TICK_EXPONENT) & 0xFFFFFF;
/* Reserve some space between the header and payload to write options later */
pdu->payload = buf + coap_get_total_hdr_len(pdu) + GCOAP_OBS_OPTIONS_BUF;
/* Payload length really zero at this point, but we set this to the available
* length in the buffer. Allows us to reconstruct buffer length later. */
pdu->payload_len = len - (pdu->payload - buf);
pdu->content_type = COAP_FORMAT_NONE;
return GCOAP_OBS_INIT_OK;
}
else {
/* reason for negative hdrlen is not defined, so we also are vague */
return GCOAP_OBS_INIT_ERR;
}
}
size_t gcoap_obs_send(uint8_t *buf, size_t len, const coap_resource_t *resource)
{
gcoap_observe_memo_t *memo = NULL;
_find_obs_memo_resource(&memo, resource);
if (memo) {
return sock_udp_send(&_sock, buf, len, memo->observer);
}
else {
return 0;
}
}
uint8_t gcoap_op_state(void)
{
uint8_t count = 0;

Loading…
Cancel
Save