Skip to content

Commit

Permalink
Added changes requested by kgiusti in the PR review.
Browse files Browse the repository at this point in the history
  • Loading branch information
ted-ross committed Dec 12, 2022
1 parent 8ad41b8 commit 185aafa
Show file tree
Hide file tree
Showing 9 changed files with 44 additions and 24 deletions.
4 changes: 4 additions & 0 deletions include/qpid/dispatch/ctools.h
Original file line number Diff line number Diff line change
Expand Up @@ -211,6 +211,10 @@ do { \
#define MIN(a,b) (((a)<(b))?(a):(b))
#define MAX(a,b) (((a)>(b))?(a):(b))

#define BIT_SET(M,B) (M) |= (B)
#define BIT_CLEAR(M,B) (M) &= ~(B)
#define BIT_IS_SET(M,B) ((M) & (B))

//
// Heap allocation with abort() on failure
//
Expand Down
5 changes: 3 additions & 2 deletions include/qpid/dispatch/iterator.h
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,8 @@ typedef struct qd_iterator_t qd_iterator_t;
* L^^^^^^^
* amqp:/_edge/<router>/<local>
* H^^^^^^^^ [ interior mode ]
* L_edge [ edge mode ]
* H^^^^^^^^ [ edge mode where <router> is an edge-peer ]
* L_edge [ edge mode where <router> is not an edge-peer ]
*
* ITER_VIEW_NODE_HASH - Isolate the hashable part of a router-id, used for headers
*
Expand Down Expand Up @@ -140,7 +141,7 @@ void qd_iterator_set_address(bool edge_mode, const char *area, const char *route
* Add and delete peer-edge router identities. When in edge mode, peer edge routers
* result in different hash results than remote edge routers. These functions are used
* to maintain the current set of peer edge routers.
*
*
* @param router The identifier of a router that is a peer-edge
*/
void qd_iterator_add_peer_edge(const char *router);
Expand Down
20 changes: 15 additions & 5 deletions src/iterator.c
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ ALLOC_DEFINE(qd_iterator_t);

typedef struct qd_iterator_peer_edge_t {
DEQ_LINKS(struct qd_iterator_peer_edge_t);
const char *identity;
char *router_id;
} qd_iterator_peer_edge_t;

DEQ_DECLARE(qd_iterator_peer_edge_t, qd_iterator_peer_edge_list_t);
Expand All @@ -83,13 +83,22 @@ typedef enum {
STATE_AT_NODE_ID
} state_t;


//
// Static state that influences how the iterator operates.
//
static bool edge_mode = false;
static char *my_area = 0;
static char *my_router = 0;

//
// Used for edge routers only. This is a list of routers that are connected directly
// to this router in a mesh-of-edges.
//
static qd_iterator_peer_edge_list_t peer_edges = DEQ_EMPTY;

//
// Separator characters that can be used to divide addresses into segments.
//
static const char *SEPARATORS = "./";


Expand Down Expand Up @@ -196,7 +205,7 @@ static void parse_address_view(qd_iterator_t *iter)
qd_iterator_peer_edge_t *peer_edge = DEQ_HEAD(peer_edges);
qd_buffer_field_t save_pointer = iter->view_pointer;
while (!!peer_edge) {
if (qd_iterator_prefix(iter, peer_edge->identity)) {
if (qd_iterator_prefix(iter, peer_edge->router_id)) {
is_peer = true;
iter->view_pointer = save_pointer;
break;
Expand Down Expand Up @@ -498,7 +507,7 @@ void qd_iterator_add_peer_edge(const char *router)
{
qd_iterator_peer_edge_t *peer_edge = NEW(qd_iterator_peer_edge_t);
ZERO(peer_edge);
peer_edge->identity = router;
peer_edge->router_id = strdup(router);
DEQ_INSERT_TAIL(peer_edges, peer_edge);
}

Expand All @@ -508,8 +517,9 @@ void qd_iterator_del_peer_edge(const char *router)
qd_iterator_peer_edge_t *peer_edge = DEQ_HEAD(peer_edges);

while (!!peer_edge) {
if (peer_edge->identity == router) {
if (strcmp(peer_edge->router_id, router) == 0) {
DEQ_REMOVE(peer_edges, peer_edge);
free(peer_edge->router_id);
free(peer_edge);
return;
}
Expand Down
19 changes: 9 additions & 10 deletions src/router_core/connections.c
Original file line number Diff line number Diff line change
Expand Up @@ -1402,16 +1402,15 @@ void qdr_process_addr_attributes_CT(qdr_core_t *core, qdr_address_t *addr)
bool new_value = qd_bitmask_cardinality(addr->rnodes) > 0;

int router_bit, c;
for (QD_BITMASK_EACH(addr->rnodes, router_bit, c)) {
if (addr->remote_sole_destination_meshes == 0) {
new_value = false;
break;
}

char *ptr = addr->remote_sole_destination_meshes + (router_bit * QD_DISCRIMINATOR_BYTES);
if (memcmp(ptr, addr->destination_mesh_id, QD_DISCRIMINATOR_BYTES) != 0) {
new_value = false;
break;
if (addr->remote_sole_destination_meshes == 0) {
new_value = false;
} else {
for (QD_BITMASK_EACH(addr->rnodes, router_bit, c)) {
char *ptr = addr->remote_sole_destination_meshes + (router_bit * QD_DISCRIMINATOR_BYTES);
if (memcmp(ptr, addr->destination_mesh_id, QD_DISCRIMINATOR_BYTES) != 0) {
new_value = false;
break;
}
}
}

Expand Down
11 changes: 11 additions & 0 deletions src/router_core/forwarder.c
Original file line number Diff line number Diff line change
Expand Up @@ -462,11 +462,22 @@ static inline bool qdr_forward_edge_echo_CT(qdr_delivery_t *in_dlv, qdr_link_t *
return false;
}

//
// Compute 'mesh_loop' - True iff the message is annotated with an ingress-mesh and the mesh ID is the same as that
// of the outgoing connection. This means we are contemplating sending a delivery back to the edge-mesh from which
// it originated.
//
if (out_link->conn->role == QDR_ROLE_EDGE_CONNECTION && out_link->conn->edge_mesh_id[0] != '\0') {
qd_parsed_field_t *mesh_id = qd_message_get_ingress_mesh(in_dlv->msg);
mesh_loop = !!mesh_id && qd_iterator_equal_n(qd_parse_raw(mesh_id), (unsigned char*) out_link->conn->edge_mesh_id, QD_DISCRIMINATOR_BYTES);
}

//
// Sending the delivery on the out_link will result in edge-echo if any of the following are true:
// 1) The delivery came from an edge (we are an interior) and it's going to go back out the same connection
// 2) There will be a mesh-loop (as described above)
// 3) The out_link is a proxy (i.e. leads to a consumer on another router) and the in_connection is edge or inter-edge
//
return (((in_dlv->via_edge && in_link->conn == out_link->conn)
|| mesh_loop
|| ((in_link->conn->role == QDR_ROLE_INTER_EDGE || in_link->conn->role == QDR_ROLE_EDGE_CONNECTION) && out_link->proxy)));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -385,7 +385,7 @@ static void mesh_discovery_edge_init_CT(qdr_core_t *core, void **module_context)
state.core->edge_mesh_identifier[QD_DISCRIMINATOR_BYTES] = '\0';

//
// Bind to the static address QD_TERMINUS_EDGE_MESH_PING.
// Bind to the static address QD_TERMINUS_MESH_ID_NEGOTIATION.
//
state.endpoint_desc.label = "mesh_discovery_edge";
state.endpoint_desc.on_first_attach = on_first_attach;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ static void mesh_discovery_interior_init_CT(qdr_core_t *core, void **module_cont
state.core = core;

//
// Bind to the static address QD_TERMINUS_EDGE_MESH_PING
// Bind to the static address QD_TERMINUS_MESH_DISCOVERY
//
state.endpoint_desc.label = "mesh_discovery_interior";
state.endpoint_desc.on_first_attach = on_first_attach;
Expand Down
4 changes: 0 additions & 4 deletions src/router_core/modules/mobile_sync/mobile.c
Original file line number Diff line number Diff line change
Expand Up @@ -59,10 +59,6 @@ static const char *HAVE_SEQ = "have_seq";
#define ADDR_SYNC_ROUTER_MA_REQUESTED 0x00000001
#define ADDR_SYNC_ROUTER_VERSION_LOGGED 0x00000002

#define BIT_SET(M,B) (M) |= (B)
#define BIT_CLEAR(M,B) (M) &= ~(B)
#define BIT_IS_SET(M,B) ((M) & (B))

typedef struct {
qdr_core_t *core;
qdrc_event_subscription_t *event_sub;
Expand Down
1 change: 0 additions & 1 deletion tests/interrouter_msg.py
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,6 @@ def decode(data: bytes) -> 'RouterAnnotationsSection':
obj = Data()
obj.decode(data)
lcount = obj.get_list()
print("lcount == %d" % lcount)
assert lcount in (4, 5)
obj.enter()

Expand Down

0 comments on commit 185aafa

Please # to comment.