Skip to content

Commit

Permalink
Fixes #736 - Added the notification of negotiated mesh-ID from edge t…
Browse files Browse the repository at this point in the history
…o interior.
  • Loading branch information
ted-ross committed Oct 3, 2022
1 parent 271d650 commit 842802a
Show file tree
Hide file tree
Showing 4 changed files with 133 additions and 2 deletions.
1 change: 1 addition & 0 deletions src/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,7 @@ set(qpid_dispatch_SOURCES
router_core/modules/test_hooks/core_test_hooks.c
router_core/modules/edge_addr_tracking/edge_addr_tracking.c
router_core/modules/mesh_discovery/mesh_discovery_edge.c
router_core/modules/mesh_discovery/mesh_discovery_interior.c
router_core/modules/address_lookup_client/address_lookup_client.c
router_core/modules/heartbeat_edge/heartbeat_edge.c
router_core/modules/heartbeat_server/heartbeat_server.c
Expand Down
4 changes: 2 additions & 2 deletions src/router_core/modules/mesh_discovery/mesh_discovery_edge.c
Original file line number Diff line number Diff line change
Expand Up @@ -232,8 +232,8 @@ static void on_transfer(void *link_context, qdr_delivery_t *delivery, qd_message
qd_iterator_t *iter = qd_message_field_iterator(message, QD_FIELD_APPLICATION_PROPERTIES);
qd_parsed_field_t *ap = qd_parse(iter);
if (!!ap) {
if (qd_parse_is_map(ap) && qd_parse_sub_count(ap) == 2) {
for (int i = 0; i < 2; i++) {
if (qd_parse_is_map(ap)) {
for (int i = 0; i < qd_parse_sub_count(ap); i++) {
qd_iterator_t *key_iter = qd_parse_raw(qd_parse_sub_key(ap, i));
qd_parsed_field_t *value = qd_parse_sub_value(ap, i);

Expand Down
129 changes: 129 additions & 0 deletions src/router_core/modules/mesh_discovery/mesh_discovery_interior.c
Original file line number Diff line number Diff line change
@@ -0,0 +1,129 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

#include "core_link_endpoint.h"
#include "delivery.h"
#include "module.h"

#include "qpid/dispatch/amqp.h"
#include "qpid/dispatch/ctools.h"

#include <stdio.h>


static struct {
qdr_core_t *core;
qdrc_endpoint_desc_t endpoint_desc;
} state;


static void on_first_attach(void *bind_context,
qdrc_endpoint_t *endpoint,
void **link_context,
qdr_terminus_t *remote_source,
qdr_terminus_t *remote_target)
{
qd_iterator_t *iter = qdr_terminus_get_address(remote_target);
qd_iterator_reset_view(iter, ITER_VIEW_ALL);

if (qd_iterator_equal(iter, (unsigned const char*) QD_TERMINUS_MESH_DISCOVERY)
&& qdrc_endpoint_get_direction_CT(endpoint) == QD_INCOMING) {
qdrc_endpoint_second_attach_CT(state.core, endpoint, remote_target, remote_source);
qdrc_endpoint_flow_CT(state.core, endpoint, 3, false);
*link_context = (void*) endpoint;
} else {
qdrc_endpoint_detach_CT(state.core, endpoint, 0);
qdr_terminus_free(remote_source);
qdr_terminus_free(remote_target);
}
}


static void on_transfer(void *link_context, qdr_delivery_t *delivery, qd_message_t *message)
{
qdrc_endpoint_t *endpoint = (qdrc_endpoint_t*) link_context;
char id[QD_DISCRIMINATOR_BYTES];
int values_found = 0;
qdr_link_t *link = qdr_delivery_link(delivery);
qdr_connection_t *conn = !!link ? link->conn : 0;

if (!!conn) {
if (qd_message_check_depth(message, QD_DEPTH_APPLICATION_PROPERTIES) == QD_MESSAGE_DEPTH_OK) {
qd_iterator_t *iter = qd_message_field_iterator(message, QD_FIELD_APPLICATION_PROPERTIES);
qd_parsed_field_t *ap = qd_parse(iter);
if (!!ap) {
if (qd_parse_is_map(ap)) {
for (int i = 0; i < qd_parse_sub_count(ap); i++) {
qd_iterator_t *key_iter = qd_parse_raw(qd_parse_sub_key(ap, i));
qd_parsed_field_t *value = qd_parse_sub_value(ap, i);

if (qd_iterator_equal(key_iter, (unsigned char*) QD_KEY_MESH_ID_ANNOUNCE_IDENTIFIER)) {
qd_iterator_ncopy(qd_parse_raw(value), (unsigned char*) id, QD_DISCRIMINATOR_BYTES);
values_found++;
}
}
}
qd_parse_free(ap);
qd_iterator_free(iter);
}
}
}

if (values_found == 1) {
memcpy(conn->edge_mesh_id, id, QD_DISCRIMINATOR_BYTES);
}

qdrc_endpoint_settle_CT(state.core, delivery, PN_ACCEPTED);

//
// Replenish the credit for this delivery
//
qdrc_endpoint_flow_CT(state.core, endpoint, 1, false);
}


static bool mesh_discovery_interior_enable_CT(qdr_core_t *core)
{
return core->router_mode == QD_ROUTER_MODE_INTERIOR;
}


static void mesh_discovery_interior_init_CT(qdr_core_t *core, void **module_context)
{
*module_context = &state;
ZERO(&state);
state.core = core;

//
// Bind to the static address QD_TERMINUS_EDGE_MESH_PING
//
state.endpoint_desc.label = "mesh_discovery_interior";
state.endpoint_desc.on_first_attach = on_first_attach;
state.endpoint_desc.on_transfer = on_transfer;
qdrc_endpoint_bind_mobile_address_CT(core, QD_TERMINUS_MESH_DISCOVERY, &state.endpoint_desc, &state);
}


static void mesh_discovery_interior_final_CT(void *module_context)
{
// Function intentionally left blank
}


QDR_CORE_MODULE_DECLARE("mesh_discovery_interior", mesh_discovery_interior_enable_CT, mesh_discovery_interior_init_CT, mesh_discovery_interior_final_CT)
1 change: 1 addition & 0 deletions src/router_core/router_core_private.h
Original file line number Diff line number Diff line change
Expand Up @@ -686,6 +686,7 @@ struct qdr_connection_t {
qdr_connection_list_t connection_group; ///< List of associated connection group members
qdr_connection_t *group_cursor; ///< Pointer to the next group member to use for traffic allocation
qdr_edge_peer_t *edge_peer; ///< Edge routers only - Mesh-peer that this connection links to
char edge_mesh_id[QD_DISCRIMINATOR_BYTES]; ///< Interior, edge-role only - Identity of the connected mesh
};

void qdr_core_delete_auto_link (qdr_core_t *core, qdr_auto_link_t *al);
Expand Down

0 comments on commit 842802a

Please # to comment.