Skip to content

Commit

Permalink
Merge pull request #17876 from LabNConsulting/chopps/backend-ds-notify
Browse files Browse the repository at this point in the history
Handle datastore notifications correctly in backend clients (daemons)
  • Loading branch information
donaldsharp authored Jan 18, 2025
2 parents 13838a8 + 61949e4 commit 3c3b559
Show file tree
Hide file tree
Showing 11 changed files with 567 additions and 200 deletions.
44 changes: 35 additions & 9 deletions lib/mgmt_be_client.c
Original file line number Diff line number Diff line change
Expand Up @@ -1114,40 +1114,66 @@ static void be_client_handle_notify(struct mgmt_be_client *client, void *msgbuf,
size_t msg_len)
{
struct mgmt_msg_notify_data *notif_msg = msgbuf;
struct nb_node *nb_node;
struct lyd_node *dnode;
struct nb_node *nb_node, *nb_parent;
struct lyd_node *dnode = NULL;
const char *data = NULL;
const char *notif;
LY_ERR err;
bool is_yang_notify;
LY_ERR err = LY_SUCCESS;

debug_be_client("Received notification for client %s", client->name);

notif = mgmt_msg_native_xpath_data_decode(notif_msg, msg_len, data);
if (!notif || !data) {
if (!notif) {
log_err_be_client("Corrupt notify msg");
return;
}
if (!data && (notif_msg->op == NOTIFY_OP_DS_REPLACE || notif_msg->op == NOTIFY_OP_DS_PATCH)) {
log_err_be_client("Corrupt replace/patch notify msg: missing data");
return;
}

nb_node = nb_node_find(notif);
if (!nb_node) {
log_err_be_client("No schema found for notification: %s", notif);
return;
}

if (!nb_node->cbs.notify) {
is_yang_notify = !!CHECK_FLAG(nb_node->snode->nodetype, LYS_NOTIF);

if (is_yang_notify && !nb_node->cbs.notify) {
debug_be_client("No notification callback for: %s", notif);
return;
}

err = yang_parse_notification(notif, notif_msg->result_type, data,
if (!nb_node->cbs.notify) {
/*
* See if a parent has a callback, this is so backend's can
* listen for changes on an entire datastore sub-tree.
*/
for (nb_parent = nb_node->parent; nb_parent; nb_parent = nb_node->parent)
if (nb_parent->cbs.notify)
break;
if (!nb_parent) {
debug_be_client("Including parents, no DS notification callback for: %s",
notif);
return;
}
nb_node = nb_parent;
}

if (data && is_yang_notify) {
err = yang_parse_notification(notif, notif_msg->result_type, data, &dnode);
} else if (data) {
err = yang_parse_data(notif, notif_msg->result_type, false, true, false, data,
&dnode);
}
if (err) {
log_err_be_client("Can't parse notification data for: %s",
notif);
log_err_be_client("Can't parse notification data for: %s", notif);
return;
}

nb_callback_notify(nb_node, notif, dnode);
nb_callback_notify(nb_node, notif_msg->op, notif, dnode);

lyd_free_all(dnode);
}
Expand Down
3 changes: 2 additions & 1 deletion lib/northbound.c
Original file line number Diff line number Diff line change
Expand Up @@ -1857,14 +1857,15 @@ int nb_callback_rpc(const struct nb_node *nb_node, const char *xpath,
return nb_node->cbs.rpc(&args);
}

void nb_callback_notify(const struct nb_node *nb_node, const char *xpath,
void nb_callback_notify(const struct nb_node *nb_node, uint8_t op, const char *xpath,
struct lyd_node *dnode)
{
struct nb_cb_notify_args args = {};

DEBUGD(&nb_dbg_cbs_notify, "northbound notify: %s", xpath);

args.xpath = xpath;
args.op = op;
args.dnode = dnode;
nb_node->cbs.notify(&args);
}
Expand Down
3 changes: 2 additions & 1 deletion lib/northbound.h
Original file line number Diff line number Diff line change
Expand Up @@ -305,6 +305,7 @@ struct nb_cb_rpc_args {
struct nb_cb_notify_args {
/* XPath of the notification. */
const char *xpath;
uint8_t op;

/*
* libyang data node representing the notification. If the notification
Expand Down Expand Up @@ -861,7 +862,7 @@ extern const void *nb_callback_lookup_next(const struct nb_node *nb_node,
extern int nb_callback_rpc(const struct nb_node *nb_node, const char *xpath,
const struct lyd_node *input, struct lyd_node *output,
char *errmsg, size_t errmsg_len);
extern void nb_callback_notify(const struct nb_node *nb_node, const char *xpath,
extern void nb_callback_notify(const struct nb_node *nb_node, uint8_t op, const char *xpath,
struct lyd_node *dnode);

/*
Expand Down
148 changes: 87 additions & 61 deletions lib/northbound_notif.c
Original file line number Diff line number Diff line change
Expand Up @@ -480,118 +480,124 @@ static struct op_changes_group *op_changes_group_next(void)
/* Query for changes and notify */
/* ---------------------------- */

static void timer_walk_abort(struct nb_notif_walk_args *args);
static void timer_walk_continue(struct event *event);
static void timer_walk_done(struct nb_notif_walk_args *args);

static struct op_change *__next_change(struct op_changes_group *group)
{
struct op_change *next = RB_NEXT(op_changes, group->cur_change);

/* Remove and free current so retry works */
RB_REMOVE(op_changes, group->cur_changes, group->cur_change);
op_change_free(group->cur_change);
return next;
}

static struct op_changes_group *__next_group(struct op_changes_group *group)
{
__dbg("done with oper-path collection for group");
op_changes_group_free(group);
return op_changes_group_next();
}

static enum nb_error oper_walk_done(const struct lyd_node *tree, void *arg, enum nb_error ret)
{
struct nb_notif_walk_args *args = arg;
struct op_changes_group *group = args->group;
const char *path = group->cur_change->path;
const char *op = group->cur_changes == &group->adds ? "add" : "delete";

/* we don't send batches when yielding as we need completed edit in any patch */
assert(ret != NB_YIELD);

nb_notif_walk = NULL;

if (ret == NB_ERR_NOT_FOUND) {
__dbg("Path not found while walking oper tree: %s", path);
XFREE(MTYPE_NB_NOTIF_WALK_ARGS, args);
return ret;
}
/* Something else went wrong with the walk */
if (ret != NB_OK) {
ret = NB_OK;
} else if (ret != NB_OK) {
error:
__log_err("Error notifying for datastore change on path: %s: %s", path,
nb_err_name(ret));
XFREE(MTYPE_NB_NOTIF_WALK_ARGS, args);
/* XXX Need to inform mgmtd/front-ends things are out-of-sync */
return ret;
}

__dbg("done with oper-path collection for %s path: %s", op, path);

/* Do we need this? */
while (tree->parent)
tree = lyd_parent(tree);

/* Send the add (replace) notification */
if (mgmt_be_send_ds_replace_notification(path, tree)) {
ret = NB_ERR;
goto error;
__log_err("Error notifying for datastore path: %s: %s", path, nb_err_name(ret));

timer_walk_abort(args);
goto done;
} else {
__dbg("Done with oper-path collection for path: %s", path);

/* Do we need this? */
while (tree->parent)
tree = lyd_parent(tree);

/* Send the add (replace) notification */
if (mgmt_be_send_ds_replace_notification(path, tree)) {
__log_err("Error sending notification message for path: %s", path);
ret = NB_ERR;
goto error;
}
}

/*
* Advance to next change (either dels or adds or both).
* Advance to next change.
*/

group->cur_change = RB_NEXT(op_changes, group->cur_change);
group->cur_change = __next_change(group);
if (!group->cur_change) {
__dbg("done with oper-path collection for group");
op_changes_group_free(group);

group = op_changes_group_next();
args->group = group;
if (!group) {
__dbg("done with ALL oper-path collection for notification");
XFREE(MTYPE_NB_NOTIF_WALK_ARGS, args);
args->group = __next_group(group);
if (!args->group) {
timer_walk_done(args);
goto done;
}
}

/* Run next walk after giving other events a shot to run */
event_add_timer_msec(nb_notif_master, timer_walk_continue, args, 0, &nb_notif_timer);
done:
/* Done with current walk and scheduled next one if there is more */
nb_notif_walk = NULL;

return NB_OK;
return ret;
}

static LY_ERR nb_notify_delete_changes(struct nb_notif_walk_args *args)
static int nb_notify_delete_changes(struct nb_notif_walk_args *args)
{
struct op_changes_group *group = args->group;
LY_ERR err;

group->cur_change = RB_MIN(op_changes, group->cur_changes);
while (group->cur_change) {
err = mgmt_be_send_ds_delete_notification(group->cur_change->path);
assert(err == LY_SUCCESS); /* XXX */

group->cur_change = RB_NEXT(op_changes, group->cur_change);
if (mgmt_be_send_ds_delete_notification(group->cur_change->path)) {
__log_err("Error sending delete notification message for path: %s",
group->cur_change->path);
return 1;
}
group->cur_change = __next_change(group);
}

return LY_SUCCESS;
return 0;
}

static void timer_walk_continue(struct event *event)
{
struct nb_notif_walk_args *args = EVENT_ARG(event);
struct op_changes_group *group = args->group;
const char *path;
LY_ERR err;
int ret;

/*
* Notify about deletes until we have add changes to collect.
*/
while (group->cur_changes == &group->dels) {
err = nb_notify_delete_changes(args);
assert(err == LY_SUCCESS); /* XXX */
assert(!group->cur_change); /* we send all the deletes in one message */
ret = nb_notify_delete_changes(args);
if (ret) {
timer_walk_abort(args);
return;
}

/* after deletes advance to adds */
group->cur_changes = &group->adds;
group->cur_change = RB_MIN(op_changes, group->cur_changes);
if (group->cur_change)
break;

__dbg("done with oper-path change group");
op_changes_group_free(group);

group = op_changes_group_next();
args->group = group;
if (!group) {
__dbg("done with ALL oper-path changes");
XFREE(MTYPE_NB_NOTIF_WALK_ARGS, args);
args->group = __next_group(group);
if (!args->group) {
timer_walk_done(args);
return;
}
}
Expand Down Expand Up @@ -621,6 +627,22 @@ static void timer_walk_start(struct event *event)
timer_walk_continue(event);
}

static void timer_walk_abort(struct nb_notif_walk_args *args)
{
__dbg("Failed notifying datastore changes, will retry");

__dbg("oper-state notify setting retry timer to fire in: %d msec ", NB_NOTIF_TIMER_MSEC);
event_add_timer_msec(nb_notif_master, timer_walk_continue, args, NB_NOTIF_TIMER_MSEC,
&nb_notif_timer);
}

static void timer_walk_done(struct nb_notif_walk_args *args)
{
__dbg("Finished notifying for all datastore changes");
assert(!args->group);
XFREE(MTYPE_NB_NOTIF_WALK_ARGS, args);
}

static void nb_notif_set_walk_timer(void)
{
if (nb_notif_walk) {
Expand Down Expand Up @@ -659,19 +681,23 @@ void nb_notif_init(struct event_loop *tm)

void nb_notif_terminate(void)
{
struct nb_notif_walk_args *args;
struct nb_notif_walk_args *args = nb_notif_timer ? EVENT_ARG(nb_notif_timer) : NULL;
struct op_changes_group *group;

__dbg("terminating: timer: %p timer arg: %p walk %p", nb_notif_timer, args, nb_notif_walk);

EVENT_OFF(nb_notif_timer);

if (nb_notif_walk) {
nb_oper_cancel_walk(nb_notif_walk);
/* need to free the group that's in the walk */
/* Grab walk args from walk if active. */
args = nb_oper_walk_finish_arg(nb_notif_walk);
if (args)
op_changes_group_free(args->group);
nb_oper_cancel_walk(nb_notif_walk);
nb_notif_walk = NULL;
}
if (args) {
op_changes_group_free(args->group);
XFREE(MTYPE_NB_NOTIF_WALK_ARGS, args);
}

while ((group = op_changes_group_next()))
op_changes_group_free(group);
Expand Down
Loading

0 comments on commit 3c3b559

Please sign in to comment.