Skip to content

Commit

Permalink
net: lwm2m: Add observe callback for observe and notification events
Browse files Browse the repository at this point in the history
Added an observe callback so that the application can register to
receive events like observer added/deleted, and notification acked/
timed out. The notifications can be traced back to the exact data
contained within them by use of the user_data pointer.

Fixes #38531.

Signed-off-by: Maik Vermeulen <maik.vermeulen@innotractor.com>
  • Loading branch information
Maik Vermeulen committed Oct 23, 2021
1 parent 2253c6e commit 6b38ba9
Show file tree
Hide file tree
Showing 4 changed files with 155 additions and 29 deletions.
68 changes: 63 additions & 5 deletions include/net/lwm2m.h
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,37 @@
/* clang-format on */

typedef void (*lwm2m_socket_fault_cb_t)(int error);
typedef void (*lwm2m_notify_timeout_cb_t)(void);

struct lwm2m_obj_path {
uint16_t obj_id;
uint16_t obj_inst_id;
uint16_t res_id;
uint16_t res_inst_id;
uint8_t level; /* 0/1/2/3/4 (4 = resource instance) */
};

/**
* @brief Observe callback events
*/
enum lwm2m_observe_event {
LWM2M_OBSERVE_EVENT_OBSERVER_ADDED,
LWM2M_OBSERVE_EVENT_OBSERVER_REMOVED,
LWM2M_OBSERVE_EVENT_NOTIFY_ACK,
LWM2M_OBSERVE_EVENT_NOTIFY_TIMEOUT,
};

/**
* @brief Observe callback indicating observer adds and deletes, and
* notification ACKs and timeouts
*
* @param[in] event Observer add/delete or notification ack/timeout
* @param[in] path LwM2M path
* @param[in] user_data Pointer to user_data buffer, as provied in
* send_traceable_notification(). Used to determine for which
* data the ACKed/timed out notification was.
*/
typedef void (*lwm2m_observe_cb_t)(enum lwm2m_observe_event event, struct lwm2m_obj_path *path,
void *user_data);

/**
* @brief LwM2M context structure to maintain information for a single
Expand Down Expand Up @@ -126,10 +156,10 @@ struct lwm2m_ctx {
*/
lwm2m_socket_fault_cb_t fault_cb;

/** Notify Timeout Callback. LwM2M processing thread will call this
* callback in case of notify timeout.
/** Callback for new or cancelled observations, and acknowledged or timed
* out notifications.
*/
lwm2m_notify_timeout_cb_t notify_timeout_cb;
lwm2m_observe_cb_t observe_cb;

/** Validation buffer. Used as a temporary buffer to decode the resource
* value before validation. On successful validation, its content is
Expand Down Expand Up @@ -960,9 +990,13 @@ typedef void (*lwm2m_ctx_event_cb_t)(struct lwm2m_ctx *ctx,
* @param[in] ep_name Registered endpoint name
* @param[in] flags Flags used to configure current LwM2M session.
* @param[in] event_cb Client event callback function
* @param[in] observe_cb Observe callback function called when an observer was
* added or deleted, and when a notification was acked or
* has timed out
*/
void lwm2m_rd_client_start(struct lwm2m_ctx *client_ctx, const char *ep_name,
uint32_t flags, lwm2m_ctx_event_cb_t event_cb);
uint32_t flags, lwm2m_ctx_event_cb_t event_cb,
lwm2m_observe_cb_t observe_cb);

/**
* @brief Stop the LwM2M RD (De-register) Client
Expand All @@ -985,5 +1019,29 @@ void lwm2m_rd_client_stop(struct lwm2m_ctx *client_ctx,
*/
void lwm2m_rd_client_update(void);

/**
* @brief Helper function to print path objects' contents to log
*
* @param[in] buf The buffer to use for formatting the string
* @param[in] path The path to stringify
*/
char *lwm2m_path_log_strdup(char *buf, struct lwm2m_obj_path *path);

/**
* @brief Send a notification that traces back to some user data
*
* In the case of e.g. buffering, the user may want to send notifications
* for specific data, like buffered measurements. This function lets the
* user trigger a notification, which can be traced back to such a
* buffered measurement using a provided pointer. This pointer is provided
* back to the user in the lwm2m_observe_cb_t as well, so that ACKs and timeouts
* can be traced back to the specific data that was contained in that
* notification.
*
* @param[in] path Path for which to send the notification
* @param[in] user_data Pointer to user data, e.g. stored measurement
*/
int send_traceable_notification(struct lwm2m_obj_path *path, void *user_data);

#endif /* ZEPHYR_INCLUDE_NET_LWM2M_H_ */
/**@} */
104 changes: 89 additions & 15 deletions subsys/net/lib/lwm2m/lwm2m_engine.c
Original file line number Diff line number Diff line change
Expand Up @@ -521,10 +521,33 @@ static int engine_add_observer(struct lwm2m_message *msg,
log_strdup(sprint_token(token, tkl)),
log_strdup(lwm2m_sprint_ip_addr(&msg->ctx->remote_addr)));

if (msg->ctx->observe_cb) {
msg->ctx->observe_cb(LWM2M_OBSERVE_EVENT_OBSERVER_ADDED, &msg->path, NULL);
}

return 0;
}

static int engine_remove_observer(struct lwm2m_ctx *ctx, const uint8_t *token, uint8_t tkl)
static void remove_observer_from_list(sys_slist_t *list, sys_snode_t *prev_node,
struct observe_node *obs)
{
if (!obs) {
return;
}

char buf[LWM2M_MAX_PATH_STR_LEN];

LOG_DBG("Removing observer %p for path %s", obs, lwm2m_path_log_strdup(buf, &obs->path));

if (obs->ctx->observe_cb) {
obs->ctx->observe_cb(LWM2M_OBSERVE_EVENT_OBSERVER_REMOVED, &obs->path, NULL);
}

sys_slist_remove(list, prev_node, &obs->node);
(void)memset(obs, 0, sizeof(*obs));
}

static int engine_remove_observer_by_token(struct lwm2m_ctx *ctx, const uint8_t *token, uint8_t tkl)
{
struct observe_node *obs, *found_obj = NULL;
sys_snode_t *prev_node = NULL;
Expand All @@ -549,8 +572,7 @@ static int engine_remove_observer(struct lwm2m_ctx *ctx, const uint8_t *token, u
return -ENOENT;
}

sys_slist_remove(&ctx->observer, prev_node, &found_obj->node);
(void)memset(found_obj, 0, sizeof(*found_obj));
remove_observer_from_list(&ctx->observer, prev_node, found_obj);

LOG_DBG("observer '%s' removed", log_strdup(sprint_token(token, tkl)));

Expand Down Expand Up @@ -599,8 +621,8 @@ static int engine_remove_observer_by_path(struct lwm2m_obj_path *path)

LOG_INF("Removing observer for path %s",
lwm2m_path_log_strdup(buf, path));
sys_slist_remove(&engine_observer_list, prev_node, &found_obj->node);
(void)memset(found_obj, 0, sizeof(*found_obj));

remove_observer_from_list(prev_node, found_obj);

return 0;
}
Expand All @@ -622,8 +644,7 @@ static void engine_remove_observer_by_id(uint16_t obj_id, int32_t obj_inst_id)
continue;
}

sys_slist_remove(&sock_ctx[i]->observer, prev_node, &obs->node);
(void)memset(obs, 0, sizeof(*obs));
remove_observer_from_list(&sock_ctx[i]->observer, prev_node, obs);
}
}
}
Expand Down Expand Up @@ -3829,7 +3850,7 @@ static int handle_request(struct coap_packet *request,
}
} else if (observe == 1) {
/* remove observer */
r = engine_remove_observer(msg->ctx, token, tkl);
r = engine_remove_observer_by_token(msg->ctx, token, tkl);
if (r < 0) {
#if defined(CONFIG_LWM2M_CANCEL_OBSERVE_BY_PATH)
r = engine_remove_observer_by_path(&msg->path);
Expand Down Expand Up @@ -4036,7 +4057,7 @@ static void lwm2m_udp_receive(struct lwm2m_ctx *client_ctx,
}

/* skip release if reply->user_data has error condition */
if (reply && reply->user_data != COAP_REPLY_STATUS_NONE) {
if (reply && reply->user_data == (void *)COAP_REPLY_STATUS_ERROR) {
/* reset reply->user_data for next time */
reply->user_data = (void *)COAP_REPLY_STATUS_NONE;
LOG_DBG("reply %p NOT removed", reply);
Expand Down Expand Up @@ -4154,8 +4175,9 @@ static void notify_message_timeout_cb(struct lwm2m_message *msg)
if (msg->ctx != NULL) {
struct lwm2m_ctx *client_ctx = msg->ctx;

if (client_ctx->notify_timeout_cb != NULL) {
client_ctx->notify_timeout_cb();
if (client_ctx->observe_cb) {
client_ctx->observe_cb(LWM2M_OBSERVE_EVENT_NOTIFY_TIMEOUT,
&msg->path, NULL);
}
}

Expand All @@ -4179,11 +4201,29 @@ static int notify_message_reply_cb(const struct coap_packet *response,
COAP_RESPONSE_CODE_DETAIL(code),
log_strdup(sprint_token(reply->token, reply->tkl)));

/* Find the correct CTX */
struct observe_node *obs, *found_obj = NULL;

/* find the node index */
SYS_SLIST_FOR_EACH_CONTAINER(&engine_observer_list, obs, node) {
if (memcmp(obs->token, reply->token, reply->tkl) == 0) {
found_obj = obs;
break;
}
}

if (found_obj) {
if (obs->ctx->observe_cb) {
obs->ctx->observe_cb(LWM2M_OBSERVE_EVENT_NOTIFY_ACK,
&obs->path, reply->user_data);
}
}

/* remove observer on COAP_TYPE_RESET */
if (type == COAP_TYPE_RESET) {
if (reply->tkl > 0) {
msg = find_msg(NULL, reply);
ret = engine_remove_observer(msg->ctx, reply->token, reply->tkl);
ret = engine_remove_observer_by_token(msg->ctx, reply->token, reply->tkl);
if (ret) {
LOG_ERR("remove observe error: %d", ret);
}
Expand All @@ -4197,7 +4237,8 @@ static int notify_message_reply_cb(const struct coap_packet *response,

static int generate_notify_message(struct lwm2m_ctx *ctx,
struct observe_node *obs,
bool manual_trigger)
bool manual_trigger,
void *user_data)
{
struct lwm2m_message *msg;
struct lwm2m_engine_obj_inst *obj_inst;
Expand Down Expand Up @@ -4248,6 +4289,9 @@ static int generate_notify_message(struct lwm2m_ctx *ctx,
goto cleanup;
}

/* lwm2m_init_message() cleans the coap reply fields, so we assign our data here */
msg->reply->user_data = user_data;

/* each notification should increment the obs counter */
obs->counter++;
ret = coap_append_option_int(&msg->cpkt, COAP_OPTION_OBSERVE,
Expand Down Expand Up @@ -4276,6 +4320,36 @@ static int generate_notify_message(struct lwm2m_ctx *ctx,
return ret;
}

int send_traceable_notification(struct lwm2m_ctx *client_ctx, struct lwm2m_obj_path *path,
void *user_data)
{
/* TODO This should generate the notifications in place and not use the resource buffers,
* or generate_notify_message(). More discussion is going on in issue #32610.
*/

/* Find the observe node if there is one */
struct observe_node *obs, *found_obj = NULL;

/* find the node index */
SYS_SLIST_FOR_EACH_CONTAINER(&engine_observer_list, obs, node) {
if (memcmp(path, &obs->path, sizeof(*path)) == 0) {
found_obj = obs;
break;
}
}

char buf[LWM2M_MAX_PATH_STR_LEN];

if (!found_obj) {
LOG_ERR("Couldn't find observer for path %s", lwm2m_path_log_strdup(buf, path));
return -ENOENT;
}

LOG_INF("Sending notification for path %s", lwm2m_path_log_strdup(buf, path));

return generate_notify_message(client_ctx, obs, true, user_data);
}

static int32_t engine_next_service_timeout_ms(uint32_t max_timeout,
const int64_t timestamp)
{
Expand Down Expand Up @@ -4372,7 +4446,7 @@ int lwm2m_engine_context_close(struct lwm2m_ctx *client_ctx)
while (!sys_slist_is_empty(&client_ctx->observer)) {
obs_node = sys_slist_get_not_empty(&client_ctx->observer);
obs = SYS_SLIST_CONTAINER(obs_node, obs, node);
(void)memset(obs, 0, sizeof(*obs));
remove_observer_from_list(&client_ctx->observer, prev_node, obs);
}

for (i = 0, msg = messages; i < ARRAY_SIZE(messages); i++, msg++) {
Expand Down Expand Up @@ -4472,7 +4546,7 @@ static void check_notifications(struct lwm2m_ctx *ctx,
if (!manual_notify && !automatic_notify) {
continue;
}
rc = generate_notify_message(ctx, obs, manual_notify);
rc = generate_notify_message(ctx, obs, manual_notify, NULL);
if (rc == -ENOMEM) {
/* no memory/messages available, retry later */
return;
Expand Down
8 changes: 0 additions & 8 deletions subsys/net/lib/lwm2m/lwm2m_object.h
Original file line number Diff line number Diff line change
Expand Up @@ -148,14 +148,6 @@ struct lwm2m_message;
#define LWM2M_PATH_LEVEL_RESOURCE_INST 4

/* path representing object instances */
struct lwm2m_obj_path {
uint16_t obj_id;
uint16_t obj_inst_id;
uint16_t res_id;
uint16_t res_inst_id;
uint8_t level; /* 0/1/2/3/4 (4 = resource instance) */
};

#define OBJ_FIELD(_id, _perm, _type) \
{ .res_id = _id, \
.permissions = LWM2M_PERM_ ## _perm, \
Expand Down
4 changes: 3 additions & 1 deletion subsys/net/lib/lwm2m/lwm2m_rd_client.c
Original file line number Diff line number Diff line change
Expand Up @@ -1052,7 +1052,8 @@ static void lwm2m_rd_client_service(struct k_work *work)
}

void lwm2m_rd_client_start(struct lwm2m_ctx *client_ctx, const char *ep_name,
uint32_t flags, lwm2m_ctx_event_cb_t event_cb)
uint32_t flags, lwm2m_ctx_event_cb_t event_cb,
lwm2m_observe_cb_t observe_cb)
{
k_mutex_lock(&client.mutex, K_FOREVER);

Expand All @@ -1068,6 +1069,7 @@ void lwm2m_rd_client_start(struct lwm2m_ctx *client_ctx, const char *ep_name,
client.ctx = client_ctx;
client.ctx->sock_fd = -1;
client.ctx->fault_cb = socket_fault_cb;
client.ctx->observe_cb = observe_cb;
client.event_cb = event_cb;
client.use_bootstrap = flags & LWM2M_RD_CLIENT_FLAG_BOOTSTRAP;

Expand Down

0 comments on commit 6b38ba9

Please sign in to comment.