From 5f7b805d4ffe223e0d9ea9dba517dfd9976ba43d Mon Sep 17 00:00:00 2001 From: Varun Patil Date: Fri, 3 Jan 2025 00:20:53 +0000 Subject: [PATCH 01/10] fw: make inface non-optional --- fw/defn/pkt.go | 5 +++-- fw/dispatch/face.go | 2 +- fw/face/ndnlp-link-service.go | 6 +++--- fw/fw/thread.go | 35 ++++++++++++----------------------- 4 files changed, 19 insertions(+), 29 deletions(-) diff --git a/fw/defn/pkt.go b/fw/defn/pkt.go index b970ffa8..30617ed2 100644 --- a/fw/defn/pkt.go +++ b/fw/defn/pkt.go @@ -21,7 +21,8 @@ type Pkt struct { PitToken []byte CongestionMark *uint64 - IncomingFaceID *uint64 - NextHopFaceID *uint64 CachePolicy *uint64 + + IncomingFaceID uint64 + NextHopFaceID *uint64 } diff --git a/fw/dispatch/face.go b/fw/dispatch/face.go index 75bf66f9..9edfcc77 100644 --- a/fw/dispatch/face.go +++ b/fw/dispatch/face.go @@ -33,7 +33,7 @@ type Face interface { type OutPkt struct { Pkt *defn.Pkt PitToken []byte - InFace *uint64 + InFace uint64 } // FaceDispatch is used to allow forwarding to interact with faces without a circular dependency issue. diff --git a/fw/face/ndnlp-link-service.go b/fw/face/ndnlp-link-service.go index b64c7014..94f74b98 100644 --- a/fw/face/ndnlp-link-service.go +++ b/fw/face/ndnlp-link-service.go @@ -256,8 +256,8 @@ func sendPacket(l *NDNLPLinkService, out dispatch.OutPkt) { } // Incoming face indication - if l.options.IsIncomingFaceIndicationEnabled && out.InFace != nil { - fragment.IncomingFaceId = out.InFace + if l.options.IsIncomingFaceIndicationEnabled { + fragment.IncomingFaceId = utils.IdPtr(out.InFace) } // Congestion marking @@ -293,7 +293,7 @@ func (l *NDNLPLinkService) handleIncomingFrame(frame []byte) { // All incoming frames come through a link service // Attempt to decode buffer into LpPacket pkt := &defn.Pkt{ - IncomingFaceID: utils.IdPtr(l.faceID), + IncomingFaceID: l.faceID, } L2, err := ReadPacketUnverified(enc.NewBufferReader(wire)) diff --git a/fw/fw/thread.go b/fw/fw/thread.go index 081190cb..d59ac37d 100644 --- a/fw/fw/thread.go +++ b/fw/fw/thread.go @@ -170,16 +170,11 @@ func (t *Thread) processIncomingInterest(packet *defn.Pkt) { panic("processIncomingInterest called with non-Interest packet") } - // Ensure incoming face is indicated - if packet.IncomingFaceID == nil { - core.LogError(t, "Interest missing IncomingFaceId - DROP") - return - } // Already asserted that this is an Interest in link service // Get incoming face - incomingFace := dispatch.GetFace(*packet.IncomingFaceID) + incomingFace := dispatch.GetFace(packet.IncomingFaceID) if incomingFace == nil { - core.LogError(t, "Non-existent incoming FaceID=", *packet.IncomingFaceID, + core.LogError(t, "Non-existent incoming FaceID=", packet.IncomingFaceID, " for Interest=", packet.Name, " - DROP") return } @@ -376,7 +371,7 @@ func (t *Thread) processOutgoingInterest( outgoingFace.SendPacket(dispatch.OutPkt{ Pkt: packet, PitToken: pitToken, - InFace: utils.IdPtr(inFace), + InFace: inFace, }) return true @@ -400,12 +395,6 @@ func (t *Thread) processIncomingData(packet *defn.Pkt) { panic("processIncomingData called with non-Data packet") } - // Ensure incoming face is indicated - if packet.IncomingFaceID == nil { - core.LogError(t, "Data missing IncomingFaceId - DROP") - return - } - // Get PIT if present var pitToken *uint32 //lint:ignore S1009 removing the nil check causes a segfault ¯\_(ツ)_/¯ @@ -414,9 +403,9 @@ func (t *Thread) processIncomingData(packet *defn.Pkt) { } // Get incoming face - incomingFace := dispatch.GetFace(*packet.IncomingFaceID) + incomingFace := dispatch.GetFace(packet.IncomingFaceID) if incomingFace == nil { - core.LogError(t, "Non-existent nexthop FaceID=", *packet.IncomingFaceID, " for Data=", packet.Name, " DROP") + core.LogError(t, "Non-existent nexthop FaceID=", packet.IncomingFaceID, " for Data=", packet.Name, " DROP") return } @@ -425,7 +414,7 @@ func (t *Thread) processIncomingData(packet *defn.Pkt) { // Check if violates /localhost if incomingFace.Scope() == defn.NonLocal && len(packet.Name) > 0 && bytes.Equal(data.NameV[0].Val, LOCALHOST) { - core.LogWarn(t, "Data ", packet.Name, " from non-local FaceID=", *packet.IncomingFaceID, " violates /localhost scope - DROP") + core.LogWarn(t, "Data ", packet.Name, " from non-local FaceID=", packet.IncomingFaceID, " violates /localhost scope - DROP") return } @@ -438,7 +427,7 @@ func (t *Thread) processIncomingData(packet *defn.Pkt) { pitEntries := t.pitCS.FindInterestPrefixMatchByDataEnc(data, pitToken) if len(pitEntries) == 0 { // Unsolicited Data - nothing more to do - core.LogDebug(t, "Unsolicited data ", packet.Name, " FaceID=", *packet.IncomingFaceID, " - DROP") + core.LogDebug(t, "Unsolicited data ", packet.Name, " FaceID=", packet.IncomingFaceID, " - DROP") return } @@ -456,7 +445,7 @@ func (t *Thread) processIncomingData(packet *defn.Pkt) { // Invoke strategy's AfterReceiveData core.LogTrace(t, "Sending Data=", packet.Name, " to strategy=", strategyName) - strategy.AfterReceiveData(packet, pitEntry, *packet.IncomingFaceID) + strategy.AfterReceiveData(packet, pitEntry, packet.IncomingFaceID) // Mark PIT entry as satisfied pitEntry.SetSatisfied(true) @@ -478,7 +467,7 @@ func (t *Thread) processIncomingData(packet *defn.Pkt) { // Store all pending downstreams (except face Data packet arrived on) and PIT tokens downstreams := make(map[uint64][]byte) for face, record := range pitEntry.InRecords() { - if face != *packet.IncomingFaceID { + if face != packet.IncomingFaceID { // TODO: Ad-hoc faces downstreams[face] = make([]byte, len(record.PitToken)) copy(downstreams[face], record.PitToken) @@ -489,7 +478,7 @@ func (t *Thread) processIncomingData(packet *defn.Pkt) { table.SetExpirationTimerToNow(pitEntry) // Invoke strategy's BeforeSatisfyInterest - strategy.BeforeSatisfyInterest(pitEntry, *packet.IncomingFaceID) + strategy.BeforeSatisfyInterest(pitEntry, packet.IncomingFaceID) // Mark PIT entry as satisfied pitEntry.SetSatisfied(true) @@ -506,7 +495,7 @@ func (t *Thread) processIncomingData(packet *defn.Pkt) { // Call outgoing Data pipeline for each pending downstream for face, token := range downstreams { core.LogTrace(t, "Multiple PIT entries Data=", packet.Name) - t.processOutgoingData(packet, face, token, *packet.IncomingFaceID) + t.processOutgoingData(packet, face, token, packet.IncomingFaceID) } } } @@ -545,6 +534,6 @@ func (t *Thread) processOutgoingData( outgoingFace.SendPacket(dispatch.OutPkt{ Pkt: packet, PitToken: pitToken, - InFace: utils.IdPtr(inFace), + InFace: inFace, }) } From 9bd01599fcd6f596d0a13ce4be8350f2ffa98c7a Mon Sep 17 00:00:00 2001 From: Varun Patil Date: Fri, 3 Jan 2025 00:35:03 +0000 Subject: [PATCH 02/10] std: refactor localhost and localhop constants --- dv/config/config.go | 10 ++++------ dv/dv/advert_data.go | 3 +-- dv/dv/router.go | 3 +-- dv/table/neighbor_table.go | 2 +- fw/fw/thread.go | 15 +++++---------- std/encoding/init.go | 3 +++ std/ndn/mgmt_2022/mgmt.go | 4 ++-- tools/nfdc/nfdc.go | 2 +- 8 files changed, 18 insertions(+), 24 deletions(-) diff --git a/dv/config/config.go b/dv/config/config.go index 74159864..144adccb 100644 --- a/dv/config/config.go +++ b/dv/config/config.go @@ -8,11 +8,9 @@ import ( ) const CostInfinity = uint64(16) -const MulticastStrategy = "/localhost/nfd/strategy/multicast" const NlsrOrigin = uint64(128) -var Localhop = enc.Name{enc.NewStringComponent(enc.TypeGenericNameComponent, "localhop")} -var Localhost = enc.Name{enc.NewStringComponent(enc.TypeGenericNameComponent, "localhost")} +var MulticastStrategy, _ = enc.NameFromStr("/localhost/nfd/strategy/multicast") type Config struct { // Network should be the same for all routers in the network. @@ -81,7 +79,7 @@ func (c *Config) Parse() (err error) { } // Create name table - c.advSyncPfxN = append(Localhop, append(c.networkNameN, + c.advSyncPfxN = append(enc.Name{enc.LOCALHOP}, append(c.networkNameN, enc.NewStringComponent(enc.TypeKeywordNameComponent, "DV"), enc.NewStringComponent(enc.TypeKeywordNameComponent, "ADS"), )...) @@ -91,7 +89,7 @@ func (c *Config) Parse() (err error) { c.advSyncPassivePfxN = append(c.advSyncPfxN, enc.NewStringComponent(enc.TypeKeywordNameComponent, "PSV"), ) - c.advDataPfxN = append(Localhop, append(c.routerNameN, + c.advDataPfxN = append(enc.Name{enc.LOCALHOP}, append(c.routerNameN, enc.NewStringComponent(enc.TypeKeywordNameComponent, "DV"), enc.NewStringComponent(enc.TypeKeywordNameComponent, "ADV"), )...) @@ -103,7 +101,7 @@ func (c *Config) Parse() (err error) { enc.NewStringComponent(enc.TypeKeywordNameComponent, "DV"), enc.NewStringComponent(enc.TypeKeywordNameComponent, "PFX"), ) - c.localPfxN = append(Localhost, + c.localPfxN = append(enc.Name{enc.LOCALHOST}, enc.NewStringComponent(enc.TypeGenericNameComponent, "nlsr"), ) diff --git a/dv/dv/advert_data.go b/dv/dv/advert_data.go index 02f8fab4..efd8a5e6 100644 --- a/dv/dv/advert_data.go +++ b/dv/dv/advert_data.go @@ -3,7 +3,6 @@ package dv import ( "time" - "github.com/named-data/ndnd/dv/config" "github.com/named-data/ndnd/dv/tlv" enc "github.com/named-data/ndnd/std/encoding" "github.com/named-data/ndnd/std/log" @@ -20,7 +19,7 @@ func (dv *Router) advertDataFetch(nodeId enc.Name, seqNo uint64) { return } - advName := append(config.Localhop, append(nodeId, + advName := append(enc.Name{enc.LOCALHOP}, append(nodeId, enc.NewStringComponent(enc.TypeKeywordNameComponent, "DV"), enc.NewStringComponent(enc.TypeKeywordNameComponent, "ADV"), enc.NewSequenceNumComponent(seqNo), // unused for now diff --git a/dv/dv/router.go b/dv/dv/router.go index fb357769..a1f43d26 100644 --- a/dv/dv/router.go +++ b/dv/dv/router.go @@ -228,7 +228,6 @@ func (dv *Router) register() (err error) { } // Set strategy to multicast for sync prefixes - mcast, _ := enc.NameFromStr(config.MulticastStrategy) pfxs = []enc.Name{ dv.config.AdvertisementSyncPrefix(), dv.config.PrefixTableSyncPrefix(), @@ -240,7 +239,7 @@ func (dv *Router) register() (err error) { Args: &mgmt.ControlArgs{ Name: prefix, Strategy: &mgmt.Strategy{ - Name: mcast, + Name: config.MulticastStrategy, }, }, Retries: -1, diff --git a/dv/table/neighbor_table.go b/dv/table/neighbor_table.go index 96cecb76..38f675d0 100644 --- a/dv/table/neighbor_table.go +++ b/dv/table/neighbor_table.go @@ -127,7 +127,7 @@ func (ns *NeighborState) delete() { } func (ns *NeighborState) localRoute() enc.Name { - return append(config.Localhop, append(ns.Name, + return append(enc.Name{enc.LOCALHOP}, append(ns.Name, enc.NewStringComponent(enc.TypeKeywordNameComponent, "DV"), )...) } diff --git a/fw/fw/thread.go b/fw/fw/thread.go index d59ac37d..2ca9f931 100644 --- a/fw/fw/thread.go +++ b/fw/fw/thread.go @@ -8,7 +8,6 @@ package fw import ( - "bytes" "encoding/binary" "runtime" "strconv" @@ -26,14 +25,13 @@ const MaxFwThreads = 32 // Threads contains all forwarding threads var Threads []*Thread -var LOCALHOST = []byte{0x6c, 0x6f, 0x63, 0x61, 0x6c, 0x68, 0x6f, 0x73, 0x74} // HashNameToFwThread hashes an NDN name to a forwarding thread. func HashNameToFwThread(name enc.Name) int { // Dispatch all management requests to thread 0 // this is fine, all it does is make sure the pitcs table in thread 0 has the management stuff. // This is not actually touching management. - if len(name) > 0 && bytes.Equal((name)[0].Val, LOCALHOST) { + if len(name) > 0 && name[0].Equal(enc.LOCALHOST) { return 0 } // to prevent negative modulos because we converted from uint to int @@ -46,7 +44,7 @@ func HashNameToAllPrefixFwThreads(name enc.Name) []bool { threads := make([]bool, len(Threads)) // Dispatch all management requests to thread 0 - if len(name) > 0 && bytes.Equal((name)[0].Val, LOCALHOST) { + if len(name) > 0 && name[0].Equal(enc.LOCALHOST) { threads[0] = true return threads } @@ -191,9 +189,7 @@ func (t *Thread) processIncomingInterest(packet *defn.Pkt) { core.LogTrace(t, "OnIncomingInterest: ", packet.Name, ", FaceID=", incomingFace.FaceID(), ", PitTokenL=", len(packet.PitToken)) // Check if violates /localhost - if incomingFace.Scope() == defn.NonLocal && - len(interest.NameV) > 0 && - bytes.Equal(interest.NameV[0].Val, LOCALHOST) { + if incomingFace.Scope() == defn.NonLocal && len(packet.Name) > 0 && packet.Name[0].Equal(enc.LOCALHOST) { core.LogWarn(t, "Interest ", packet.Name, " from non-local face=", incomingFace.FaceID(), " violates /localhost scope - DROP") return } @@ -412,8 +408,7 @@ func (t *Thread) processIncomingData(packet *defn.Pkt) { t.NInData++ // Check if violates /localhost - if incomingFace.Scope() == defn.NonLocal && len(packet.Name) > 0 && - bytes.Equal(data.NameV[0].Val, LOCALHOST) { + if incomingFace.Scope() == defn.NonLocal && len(packet.Name) > 0 && packet.Name[0].Equal(enc.LOCALHOST) { core.LogWarn(t, "Data ", packet.Name, " from non-local FaceID=", packet.IncomingFaceID, " violates /localhost scope - DROP") return } @@ -522,7 +517,7 @@ func (t *Thread) processOutgoingData( } // Check if violates /localhost - if outgoingFace.Scope() == defn.NonLocal && len(data.NameV) > 0 && bytes.Equal(data.NameV[0].Val, LOCALHOST) { + if outgoingFace.Scope() == defn.NonLocal && len(packet.Name) > 0 && packet.Name[0].Equal(enc.LOCALHOST) { core.LogWarn(t, "Data ", packet.Name, " cannot be sent to non-local FaceID=", nexthop, " since violates /localhost scope - DROP") return } diff --git a/std/encoding/init.go b/std/encoding/init.go index 6e73f48d..3135e339 100644 --- a/std/encoding/init.go +++ b/std/encoding/init.go @@ -1,5 +1,8 @@ package encoding +var LOCALHOST = NewStringComponent(TypeGenericNameComponent, "localhost") +var LOCALHOP = NewStringComponent(TypeGenericNameComponent, "localhop") + func init() { initComponentConventions() } diff --git a/std/ndn/mgmt_2022/mgmt.go b/std/ndn/mgmt_2022/mgmt.go index 2d2c3870..9fcd3a38 100644 --- a/std/ndn/mgmt_2022/mgmt.go +++ b/std/ndn/mgmt_2022/mgmt.go @@ -23,9 +23,9 @@ func (mgmt *MgmtConfig) MakeCmd(module string, cmd string, var name enc.Name if mgmt.local { - name = append(name, enc.NewStringComponent(enc.TypeGenericNameComponent, "localhost")) + name = append(name, enc.LOCALHOST) } else { - name = append(name, enc.NewStringComponent(enc.TypeGenericNameComponent, "localhop")) + name = append(name, enc.LOCALHOP) } name = append(name, diff --git a/tools/nfdc/nfdc.go b/tools/nfdc/nfdc.go index 467a3eb5..bad73265 100644 --- a/tools/nfdc/nfdc.go +++ b/tools/nfdc/nfdc.go @@ -107,7 +107,7 @@ func (n *Nfdc) Stop() { func (n *Nfdc) GetPrefix() enc.Name { return enc.Name{ - enc.NewStringComponent(enc.TypeGenericNameComponent, "localhost"), + enc.LOCALHOST, enc.NewStringComponent(enc.TypeGenericNameComponent, "nfd"), } } From 62bf7c519f214e5139cdbba430a0277e0f654a59 Mon Sep 17 00:00:00 2001 From: Varun Patil Date: Fri, 3 Jan 2025 00:37:36 +0000 Subject: [PATCH 03/10] fw: fix internal transport close panic --- fw/face/internal-transport.go | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/fw/face/internal-transport.go b/fw/face/internal-transport.go index 3ce46838..860ed9d6 100644 --- a/fw/face/internal-transport.go +++ b/fw/face/internal-transport.go @@ -150,7 +150,6 @@ func (t *InternalTransport) runReceive() { func (t *InternalTransport) Close() { if t.running.Swap(false) { - close(t.recvQueue) - close(t.sendQueue) + // do not close the queues, let them be garbage collected } } From b7fffebc810c42be5997df5d7235910c632df345 Mon Sep 17 00:00:00 2001 From: Varun Patil Date: Fri, 3 Jan 2025 00:38:31 +0000 Subject: [PATCH 04/10] fw: remove duplicate face lookup --- fw/fw/thread.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/fw/fw/thread.go b/fw/fw/thread.go index 2ca9f931..6a76c531 100644 --- a/fw/fw/thread.go +++ b/fw/fw/thread.go @@ -286,9 +286,9 @@ func (t *Thread) processIncomingInterest(packet *defn.Pkt) { // If NextHopFaceId set, forward to that face (if it exists) or drop if packet.NextHopFaceID != nil { - if dispatch.GetFace(*packet.NextHopFaceID) != nil { + if face := dispatch.GetFace(*packet.NextHopFaceID); face != nil { core.LogTrace(t, "NextHopFaceId is set for Interest ", packet.Name, " - dispatching directly to face") - dispatch.GetFace(*packet.NextHopFaceID).SendPacket(dispatch.OutPkt{ + face.SendPacket(dispatch.OutPkt{ Pkt: packet, PitToken: packet.PitToken, // TODO: ?? InFace: packet.IncomingFaceID, From 091aac17668eb8c1861940650a11c35cc51131a9 Mon Sep 17 00:00:00 2001 From: Varun Patil Date: Fri, 3 Jan 2025 00:50:24 +0000 Subject: [PATCH 05/10] fw: implement localhop (close #73) --- fw/fw/thread.go | 18 +++++++++++++++--- 1 file changed, 15 insertions(+), 3 deletions(-) diff --git a/fw/fw/thread.go b/fw/fw/thread.go index 6a76c531..707b82cc 100644 --- a/fw/fw/thread.go +++ b/fw/fw/thread.go @@ -305,13 +305,25 @@ func (t *Thread) processIncomingInterest(packet *defn.Pkt) { lookupName = fhName } - // Query the FIB for possible nexthops + // Query the FIB for all possible nexthops nexthops := table.FibStrategyTable.FindNextHopsEnc(lookupName) - // Exclude faces that have an in-record for this interest - // TODO: unclear where NFD dev guide specifies such behavior (if any) + // If the first component is /localhop, we do not forward interests received + // on non-local faces to non-local faces + localFacesOnly := incomingFace.Scope() != defn.Local && len(packet.Name) > 0 && packet.Name[0].Equal(enc.LOCALHOP) + + // Filter the nexthops that are allowed for this Interest allowedNexthops := make([]*table.FibNextHopEntry, 0, len(nexthops)) for _, nexthop := range nexthops { + // Exclude non-local faces for localhop enforcement + if localFacesOnly { + if face := dispatch.GetFace(nexthop.Nexthop); face != nil && face.Scope() != defn.Local { + continue + } + } + + // Exclude faces that have an in-record for this interest + // TODO: unclear where NFD dev guide specifies such behavior (if any) record := pitEntry.InRecords()[nexthop.Nexthop] if record == nil || nexthop.Nexthop == incomingFace.FaceID() { allowedNexthops = append(allowedNexthops, nexthop) From 7e2938569f0f41463ca569d4d05b61dc7447263e Mon Sep 17 00:00:00 2001 From: Varun Patil Date: Fri, 3 Jan 2025 03:12:51 +0000 Subject: [PATCH 06/10] nfdc: pretty-print origin --- fw/mgmt/nlsr_readvertiser.go | 18 +++++------ fw/mgmt/rib.go | 6 ++-- fw/table/rib.go | 22 ++----------- std/ndn/mgmt_2022/definitions.go | 6 ---- std/ndn/mgmt_2022/persistency.go | 31 +++++++++++------- std/ndn/mgmt_2022/route.go | 54 ++++++++++++++++++++++++++++++++ tools/nfdc/nfdc_cmd.go | 11 +++---- tools/nfdc/nfdc_route.go | 15 +++++++-- 8 files changed, 106 insertions(+), 57 deletions(-) create mode 100644 std/ndn/mgmt_2022/route.go diff --git a/fw/mgmt/nlsr_readvertiser.go b/fw/mgmt/nlsr_readvertiser.go index c9e1c7e3..8e00641a 100644 --- a/fw/mgmt/nlsr_readvertiser.go +++ b/fw/mgmt/nlsr_readvertiser.go @@ -6,7 +6,7 @@ import ( "github.com/named-data/ndnd/fw/core" "github.com/named-data/ndnd/fw/table" enc "github.com/named-data/ndnd/std/encoding" - ndn_mgmt "github.com/named-data/ndnd/std/ndn/mgmt_2022" + spec_mgmt "github.com/named-data/ndnd/std/ndn/mgmt_2022" "github.com/named-data/ndnd/std/utils" ) @@ -32,7 +32,7 @@ func (r *NlsrReadvertiser) String() string { } func (r *NlsrReadvertiser) Announce(name enc.Name, route *table.Route) { - if route.Origin != table.RouteOriginClient { + if route.Origin != uint64(spec_mgmt.RouteOriginClient) { core.LogDebug(r, "skip advertise=", name, " origin=", route.Origin) return } @@ -42,14 +42,14 @@ func (r *NlsrReadvertiser) Announce(name enc.Name, route *table.Route) { defer r.mutex.Unlock() r.advertised[name.Hash()] += 1 - params := &ndn_mgmt.ControlArgs{ + params := &spec_mgmt.ControlArgs{ Name: name, Origin: utils.IdPtr(route.Origin), Cost: utils.IdPtr(route.Cost), } - iparams := &ndn_mgmt.ControlParameters{ - Val: &ndn_mgmt.ControlArgs{Name: name}, + iparams := &spec_mgmt.ControlParameters{ + Val: &spec_mgmt.ControlArgs{Name: name}, } cmd, _ := enc.NameFromStr("/localhost/nlsr/rib/register") cmd = append(cmd, enc.NewBytesComponent(enc.TypeGenericNameComponent, iparams.Encode().Join())) @@ -58,7 +58,7 @@ func (r *NlsrReadvertiser) Announce(name enc.Name, route *table.Route) { } func (r *NlsrReadvertiser) Withdraw(name enc.Name, route *table.Route) { - if route.Origin != table.RouteOriginClient { + if route.Origin != uint64(spec_mgmt.RouteOriginClient) { core.LogDebug(r, "skip withdraw=", name, " origin=", route.Origin) return } @@ -74,13 +74,13 @@ func (r *NlsrReadvertiser) Withdraw(name enc.Name, route *table.Route) { } core.LogInfo(r, "withdraw=", name) - params := &ndn_mgmt.ControlArgs{ + params := &spec_mgmt.ControlArgs{ Name: name, Origin: utils.IdPtr(route.Origin), } - iparams := &ndn_mgmt.ControlParameters{ - Val: &ndn_mgmt.ControlArgs{Name: name}, + iparams := &spec_mgmt.ControlParameters{ + Val: &spec_mgmt.ControlArgs{Name: name}, } cmd, _ := enc.NameFromStr("/localhost/nlsr/rib/unregister") cmd = append(cmd, enc.NewBytesComponent(enc.TypeGenericNameComponent, iparams.Encode().Join())) diff --git a/fw/mgmt/rib.go b/fw/mgmt/rib.go index b07bccdf..a5abaf71 100644 --- a/fw/mgmt/rib.go +++ b/fw/mgmt/rib.go @@ -93,7 +93,7 @@ func (r *RIBModule) register(interest *spec.Interest, pitToken []byte, inFace ui } } - origin := table.RouteOriginApp + origin := uint64(mgmt.RouteOriginApp) if params.Origin != nil { origin = *params.Origin } @@ -103,7 +103,7 @@ func (r *RIBModule) register(interest *spec.Interest, pitToken []byte, inFace ui cost = *params.Cost } - flags := table.RouteFlagChildInherit + flags := uint64(mgmt.RouteFlagChildInherit) if params.Flags != nil { flags = *params.Flags } @@ -172,7 +172,7 @@ func (r *RIBModule) unregister(interest *spec.Interest, pitToken []byte, inFace faceID = *params.FaceId } - origin := table.RouteOriginApp + origin := uint64(mgmt.RouteOriginApp) if params.Origin != nil { origin = *params.Origin } diff --git a/fw/table/rib.go b/fw/table/rib.go index 7bce86a4..15224492 100644 --- a/fw/table/rib.go +++ b/fw/table/rib.go @@ -13,6 +13,7 @@ import ( "time" enc "github.com/named-data/ndnd/std/encoding" + spec_mgmt "github.com/named-data/ndnd/std/ndn/mgmt_2022" ) // RibTable represents the Routing Information Base (RIB). @@ -42,23 +43,6 @@ type Route struct { ExpirationPeriod *time.Duration } -// Route flags. -const ( - RouteFlagChildInherit uint64 = 0x01 - RouteFlagCapture uint64 = 0x02 -) - -// Route origins. -const ( - RouteOriginApp uint64 = 0 - RouteOriginStatic uint64 = 255 - RouteOriginNLSR uint64 = 128 - RouteOriginPrefixAnn uint64 = 129 - RouteOriginClient uint64 = 65 - RouteOriginAutoreg uint64 = 64 - RouteOriginAutoconf uint64 = 66 -) - // Rib is the Routing Information Base. var Rib = RibTable{ root: RibEntry{ @@ -269,9 +253,9 @@ func (r *RibEntry) HasCaptureRoute() bool { } func (r *Route) HasCaptureFlag() bool { - return r.Flags&RouteFlagCapture != 0 + return r.Flags&uint64(spec_mgmt.RouteFlagCapture) != 0 } func (r *Route) HasChildInheritFlag() bool { - return r.Flags&RouteFlagChildInherit != 0 + return r.Flags&uint64(spec_mgmt.RouteFlagChildInherit) != 0 } diff --git a/std/ndn/mgmt_2022/definitions.go b/std/ndn/mgmt_2022/definitions.go index a390c2a2..1b440ff2 100644 --- a/std/ndn/mgmt_2022/definitions.go +++ b/std/ndn/mgmt_2022/definitions.go @@ -31,12 +31,6 @@ const ( FaceFlagCongestionMarkingEnabled = uint64(4) ) -const ( - RouteFlagNoFlag = uint64(0) - RouteFlagChildInherit = uint64(1) - RouteFlagCapture = uint64(2) -) - const ( FaceEventCreated = uint64(1) FaceEventDestroyed = uint64(2) diff --git a/std/ndn/mgmt_2022/persistency.go b/std/ndn/mgmt_2022/persistency.go index 0460a1ed..06f9c7ba 100644 --- a/std/ndn/mgmt_2022/persistency.go +++ b/std/ndn/mgmt_2022/persistency.go @@ -1,24 +1,33 @@ package mgmt_2022 -// Persistency represents the persistency of a face. +import "errors" + type Persistency uint64 -// Face persistencies (shared with management). const ( PersistencyPersistent Persistency = 0 PersistencyOnDemand Persistency = 1 PersistencyPermanent Persistency = 2 ) +var PersistencyList = map[Persistency]string{ + PersistencyPersistent: "persistent", + PersistencyOnDemand: "on-demand", + PersistencyPermanent: "permanent", +} + func (p Persistency) String() string { - switch p { - case PersistencyPersistent: - return "Persistent" - case PersistencyOnDemand: - return "OnDemand" - case PersistencyPermanent: - return "Permanent" - default: - return "Unknown" + if s, ok := PersistencyList[p]; ok { + return s + } + return "unknown" +} + +func ParsePersistency(s string) (Persistency, error) { + for k, v := range PersistencyList { + if v == s { + return k, nil + } } + return 0, errors.New("unknown persistency") } diff --git a/std/ndn/mgmt_2022/route.go b/std/ndn/mgmt_2022/route.go new file mode 100644 index 00000000..fd29e957 --- /dev/null +++ b/std/ndn/mgmt_2022/route.go @@ -0,0 +1,54 @@ +package mgmt_2022 + +type RouteFlag uint64 + +const ( + RouteFlagNoFlag RouteFlag = 0 + RouteFlagChildInherit RouteFlag = 1 + RouteFlagCapture RouteFlag = 2 +) + +var RouteFlagList = map[RouteFlag]string{ + RouteFlagChildInherit: "child-inherit", + RouteFlagCapture: "capture", +} + +func (v RouteFlag) String() string { + if s, ok := RouteFlagList[v]; ok { + return s + } + return "unknown" +} + +func (v RouteFlag) IsSet(flags uint64) bool { + return uint64(v)&flags != 0 +} + +type RouteOrigin uint64 + +const ( + RouteOriginApp RouteOrigin = 0 + RouteOriginStatic RouteOrigin = 255 + RouteOriginNLSR RouteOrigin = 128 + RouteOriginPrefixAnn RouteOrigin = 129 + RouteOriginClient RouteOrigin = 65 + RouteOriginAutoreg RouteOrigin = 64 + RouteOriginAutoconf RouteOrigin = 66 +) + +var RouteOriginList = map[RouteOrigin]string{ + RouteOriginApp: "app", + RouteOriginStatic: "static", + RouteOriginNLSR: "nlsr", + RouteOriginPrefixAnn: "prefixann", + RouteOriginClient: "client", + RouteOriginAutoreg: "autoreg", + RouteOriginAutoconf: "autoconf", +} + +func (v RouteOrigin) String() string { + if s, ok := RouteOriginList[v]; ok { + return s + } + return "unknown" +} diff --git a/tools/nfdc/nfdc_cmd.go b/tools/nfdc/nfdc_cmd.go index e0fa7c09..a9f02eee 100644 --- a/tools/nfdc/nfdc_cmd.go +++ b/tools/nfdc/nfdc_cmd.go @@ -168,15 +168,12 @@ func (n *Nfdc) convCmdArg(ctrlArgs *mgmt.ControlArgs, key string, val string) { case "mtu": ctrlArgs.Mtu = utils.IdPtr(parseUint(val)) case "persistency": - switch val { - case "permanent": - ctrlArgs.FacePersistency = utils.IdPtr(uint64(mgmt.PersistencyPermanent)) - case "persistent": - ctrlArgs.FacePersistency = utils.IdPtr(uint64(mgmt.PersistencyPersistent)) - default: + persistency, err := mgmt.ParsePersistency(val) + if err != nil { fmt.Fprintf(os.Stderr, "Invalid persistency: %s\n", val) os.Exit(9) } + ctrlArgs.FacePersistency = utils.IdPtr(uint64(persistency)) // route arguments case "prefix": @@ -219,6 +216,8 @@ func (n *Nfdc) printCtrlResponse(res *mgmt.ControlResponse) { switch key { case "FacePersistency": val = mgmt.Persistency(val.(uint64)).String() + case "Origin": + val = mgmt.RouteOrigin(val.(uint64)).String() } fmt.Printf(" %s=%v\n", key, val) diff --git a/tools/nfdc/nfdc_route.go b/tools/nfdc/nfdc_route.go index 0e8e2848..12c2bc63 100644 --- a/tools/nfdc/nfdc_route.go +++ b/tools/nfdc/nfdc_route.go @@ -35,9 +35,18 @@ func (n *Nfdc) ExecRouteList(args []string) { expiry = (time.Duration(*route.ExpirationPeriod) * time.Millisecond).String() } - // TODO: convert origin, flags to string - fmt.Printf("prefix=%s nexthop=%d origin=%d cost=%d flags=%d expires=%s\n", - entry.Name, route.FaceId, route.Origin, route.Cost, route.Flags, expiry) + flagList := make([]string, 0) + for flag, name := range mgmt.RouteFlagList { + if flag.IsSet(route.Flags) { + flagList = append(flagList, name) + } + } + flags := strings.Join(flagList, ", ") + + origin := mgmt.RouteOrigin(route.Origin) + + fmt.Printf("prefix=%s nexthop=%d origin=%s cost=%d flags={%s} expires=%s\n", + entry.Name, route.FaceId, origin, route.Cost, flags, expiry) } } } From f3ec652db6345a11d31e56fce1290d3649daa61c Mon Sep 17 00:00:00 2001 From: Varun Patil Date: Fri, 3 Jan 2025 03:27:29 +0000 Subject: [PATCH 07/10] dv: remove old TODO --- dv/dv/advert_sync.go | 1 - 1 file changed, 1 deletion(-) diff --git a/dv/dv/advert_sync.go b/dv/dv/advert_sync.go index 39363250..8a0b2b44 100644 --- a/dv/dv/advert_sync.go +++ b/dv/dv/advert_sync.go @@ -40,7 +40,6 @@ func (dv *Router) advertSyncSendInterestImpl(prefix enc.Name) (err error) { } // State Vector for our group - // TODO: switch to new TLV types sv := &svs_2024.StateVectorAppParam{ StateVector: &svs_2024.StateVector{ Entries: []*svs_2024.StateVectorEntry{{ From de4ebcf2d548f6f81b7c5c1c82ee11b35a03394e Mon Sep 17 00:00:00 2001 From: Varun Patil Date: Fri, 3 Jan 2025 03:29:54 +0000 Subject: [PATCH 08/10] putchunks: add signal handler --- tools/putchunks.go | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/tools/putchunks.go b/tools/putchunks.go index 858b8b5f..08162253 100644 --- a/tools/putchunks.go +++ b/tools/putchunks.go @@ -4,6 +4,8 @@ import ( "fmt" "io" "os" + "os/signal" + "syscall" enc "github.com/named-data/ndnd/std/encoding" "github.com/named-data/ndnd/std/engine" @@ -92,6 +94,7 @@ func (pc *PutChunks) run() { } // wait forever - // TODO: quit on SIGTERM, SIGINT or face failure - select {} + sigchan := make(chan os.Signal, 1) + signal.Notify(sigchan, os.Interrupt, syscall.SIGTERM) + <-sigchan } From 43dc7e8c7dd940bb8b3b2dd796c4ffcc35e0a7c3 Mon Sep 17 00:00:00 2001 From: Varun Patil Date: Fri, 3 Jan 2025 16:01:44 +0000 Subject: [PATCH 09/10] fw: fix broken fragmentation --- fw/face/ndnlp-link-service.go | 98 ++++++++++++++++++++--------------- 1 file changed, 55 insertions(+), 43 deletions(-) diff --git a/fw/face/ndnlp-link-service.go b/fw/face/ndnlp-link-service.go index 94f74b98..9b98a0ec 100644 --- a/fw/face/ndnlp-link-service.go +++ b/fw/face/ndnlp-link-service.go @@ -21,7 +21,7 @@ import ( "github.com/named-data/ndnd/std/utils" ) -const lpPacketOverhead = 1 + 3 +const lpPacketOverhead = 1 + 3 + 1 + 3 // LpPacket+Fragment const pitTokenOverhead = 1 + 1 + 6 const congestionMarkOverhead = 3 + 1 + 8 @@ -116,10 +116,8 @@ func (l *NDNLPLinkService) computeHeaderOverhead() { if l.options.IsFragmentationEnabled { l.headerOverhead += 1 + 1 + 8 // Sequence - } - - if l.options.IsFragmentationEnabled { - l.headerOverhead += 1 + 1 + 2 + 1 + 1 + 2 // FragIndex/FragCount (Type + Length + up to 2^16 fragments) + l.headerOverhead += 1 + 1 + 2 // FragIndex (max 2^16 fragments) + l.headerOverhead += 1 + 1 + 2 // FragCount } if l.options.IsIncomingFaceIndicationEnabled { @@ -183,17 +181,23 @@ func sendPacket(l *NDNLPLinkService, out dispatch.OutPkt) { l.nOutData++ } - now := time.Now() + // Congestion marking + congestionMark := pkt.CongestionMark // from upstream + if l.checkCongestion(wire) && congestionMark == nil { + core.LogWarn(l, "Marking congestion") + congestionMark = utils.IdPtr(uint64(1)) // ours + } + // Calculate effective MTU after accounting for packet-specific overhead effectiveMtu := l.transport.MTU() - l.headerOverhead if pkt.PitToken != nil { effectiveMtu -= pitTokenOverhead } - if pkt.CongestionMark != nil { + if congestionMark != nil { effectiveMtu -= congestionMarkOverhead } - // Fragmentation + // Fragment packet if necessary var fragments []*spec.LpPacket if len(wire) > effectiveMtu { if !l.options.IsFragmentationEnabled { @@ -202,50 +206,35 @@ func sendPacket(l *NDNLPLinkService, out dispatch.OutPkt) { } // Split up fragment - nFragments := int((len(wire) + effectiveMtu - 1) / effectiveMtu) - fragments = make([]*spec.LpPacket, nFragments) + fragCount := (len(wire) + effectiveMtu - 1) / effectiveMtu + fragCountPtr := utils.IdPtr(uint64(fragCount)) + fragments = make([]*spec.LpPacket, fragCount) + reader := enc.NewBufferReader(wire) - for i := 0; i < nFragments; i++ { + for i := range fragments { + // Read till effective mtu or end of wire readSize := effectiveMtu - if i == nFragments-1 { - readSize = len(wire) - effectiveMtu*(nFragments-1) + if i == fragCount-1 { + readSize = len(wire) - effectiveMtu*(fragCount-1) } frag, err := reader.ReadWire(readSize) if err != nil { core.LogFatal(l, "Unexpected Wire reading error") } - fragments[i] = &spec.LpPacket{Fragment: frag} - } - } else { - fragments = []*spec.LpPacket{{Fragment: enc.Wire{wire}}} - } - // Sequence - if len(fragments) > 1 { - for _, fragment := range fragments { - fragment.Sequence = utils.IdPtr(l.nextSequence) + // Create fragment with sequence and index l.nextSequence++ - } - } - - // Congestion marking - congestionMark := pkt.CongestionMark // from upstream - if congestionMarking { - // GetSendQueueSize is expensive, so only check every 1/2 of the threshold - // and only if we can mark congestion for this particular packet - if l.congestionCheck > l.options.DefaultCongestionThresholdBytes { - if now.After(l.lastTimeCongestionMarked.Add(l.options.BaseCongestionMarkingInterval)) && - l.transport.GetSendQueueSize() > l.options.DefaultCongestionThresholdBytes { - core.LogWarn(l, "Marking congestion") - congestionMark = utils.IdPtr[uint64](1) // ours - l.lastTimeCongestionMarked = now + fragments[i] = &spec.LpPacket{ + Fragment: frag, + Sequence: utils.IdPtr(l.nextSequence), + FragIndex: utils.IdPtr(uint64(i)), + FragCount: fragCountPtr, } - - l.congestionCheck = 0 } - - l.congestionCheck += uint64(len(wire)) // approx + } else { + // No fragmentation necessary + fragments = []*spec.LpPacket{{Fragment: enc.Wire{wire}}} } // Send fragment(s) @@ -265,6 +254,7 @@ func sendPacket(l *NDNLPLinkService, out dispatch.OutPkt) { fragment.CongestionMark = congestionMark } + // Encode final LP frame pkt := &spec.Packet{ LpPacket: fragment, } @@ -296,7 +286,7 @@ func (l *NDNLPLinkService) handleIncomingFrame(frame []byte) { IncomingFaceID: l.faceID, } - L2, err := ReadPacketUnverified(enc.NewBufferReader(wire)) + L2, err := readPacketUnverified(enc.NewBufferReader(wire)) if err != nil { core.LogError(l, err) return @@ -370,7 +360,7 @@ func (l *NDNLPLinkService) handleIncomingFrame(frame []byte) { } // Parse inner packet in place - L3, err := ReadPacketUnverified(enc.NewBufferReader(wire)) + L3, err := readPacketUnverified(enc.NewBufferReader(wire)) if err != nil { return } @@ -433,6 +423,28 @@ func (l *NDNLPLinkService) reassemblePacket( return nil } +func (l *NDNLPLinkService) checkCongestion(wire []byte) bool { + if !congestionMarking { + return false + } + + // GetSendQueueSize is expensive, so only check every 1/2 of the threshold + // and only if we can mark congestion for this particular packet + if l.congestionCheck > l.options.DefaultCongestionThresholdBytes { + now := time.Now() + if now.After(l.lastTimeCongestionMarked.Add(l.options.BaseCongestionMarkingInterval)) && + l.transport.GetSendQueueSize() > l.options.DefaultCongestionThresholdBytes { + l.lastTimeCongestionMarked = now + return true + } + + l.congestionCheck = 0 // reset + } + + l.congestionCheck += uint64(len(wire)) // approx + return false +} + func (op *NDNLPLinkServiceOptions) Flags() (ret uint64) { if op.IsConsumerControlledForwardingEnabled { ret |= FaceFlagLocalFields @@ -444,7 +456,7 @@ func (op *NDNLPLinkServiceOptions) Flags() (ret uint64) { } // Reads a packet without validating the internal fields -func ReadPacketUnverified(reader enc.ParseReader) (*spec.Packet, error) { +func readPacketUnverified(reader enc.ParseReader) (*spec.Packet, error) { context := spec.PacketParsingContext{} context.Init() return context.Parse(reader, false) From 81384ba3f11ddfb7f6f1b7fff417d7769601dd50 Mon Sep 17 00:00:00 2001 From: Varun Patil Date: Fri, 3 Jan 2025 16:48:52 +0000 Subject: [PATCH 10/10] fw: use ring buffer for reassembly (fix #65) --- fw/face/ndnlp-link-service.go | 85 +++++++++++++++++++++-------------- 1 file changed, 51 insertions(+), 34 deletions(-) diff --git a/fw/face/ndnlp-link-service.go b/fw/face/ndnlp-link-service.go index 9b98a0ec..07f156f6 100644 --- a/fw/face/ndnlp-link-service.go +++ b/fw/face/ndnlp-link-service.go @@ -63,14 +63,17 @@ type NDNLPLinkService struct { options NDNLPLinkServiceOptions headerOverhead int - // Receive - partialMessageStore map[uint64][][]byte + // Fragment reassembly ring buffer + reassemblyIndex int + reassemblyBuffers [16]struct { + sequence uint64 + buffer enc.Wire + } - // Send + // Outgoing packet state nextSequence uint64 nextTxSequence uint64 lastTimeCongestionMarked time.Time - BufferReader enc.BufferReader congestionCheck uint64 outFrame []byte } @@ -84,11 +87,12 @@ func MakeNDNLPLinkService(transport transport, options NDNLPLinkServiceOptions) l.options = options l.computeHeaderOverhead() - l.partialMessageStore = make(map[uint64][][]byte) + // Initialize outgoing packet state l.nextSequence = 0 l.nextTxSequence = 0 l.congestionCheck = 0 l.outFrame = make([]byte, defn.MaxNDNPacketSize) + return l } @@ -323,7 +327,7 @@ func (l *NDNLPLinkService) handleIncomingFrame(frame []byte) { if fragIndex == 0 && fragCount == 1 { // Bypass reassembly since only one fragment } else { - fragment = l.reassemblePacket(LP, baseSequence, fragIndex, fragCount) + fragment = l.reassemble(LP, baseSequence, fragIndex, fragCount) if fragment == nil { // Nothing more to be done, so return return @@ -380,47 +384,60 @@ func (l *NDNLPLinkService) handleIncomingFrame(frame []byte) { } } -func (l *NDNLPLinkService) reassemblePacket( +func (l *NDNLPLinkService) reassemble( frame *spec.LpPacket, baseSequence uint64, fragIndex uint64, fragCount uint64, ) enc.Wire { - _, hasSequence := l.partialMessageStore[baseSequence] - if !hasSequence { - // Create map entry - l.partialMessageStore[baseSequence] = make([][]byte, fragCount) + var buffer enc.Wire = nil + var bufIndex int = 0 + + // Check if reassembly buffer already exists + for i := range l.reassemblyBuffers { + if l.reassemblyBuffers[i].sequence == baseSequence { + bufIndex = i + buffer = l.reassemblyBuffers[bufIndex].buffer + break + } } - // Insert into PartialMessageStore - // Safe to call Join since there is only one fragment - if len(frame.Fragment) > 1 { - core.LogError("LpPacket should only have one fragment.") + // Use the next available buffer if this is a new sequence + if buffer == nil { + bufIndex = (l.reassemblyIndex + 1) % len(l.reassemblyBuffers) + l.reassemblyIndex = bufIndex + l.reassemblyBuffers[bufIndex].sequence = baseSequence + l.reassemblyBuffers[bufIndex].buffer = make(enc.Wire, fragCount) + buffer = l.reassemblyBuffers[bufIndex].buffer } - l.partialMessageStore[baseSequence][fragIndex] = frame.Fragment.Join() - - // Determine whether it is time to reassemble - receivedCount := 0 - receivedTotalLen := 0 - for _, fragment := range l.partialMessageStore[baseSequence] { - if len(fragment) != 0 { - receivedCount++ - receivedTotalLen += len(fragment) - } + + // Validate fragCount has not changed + if fragCount != uint64(len(buffer)) { + core.LogWarn(l, "Received fragment count ", fragCount, " does not match expected count ", len(buffer), " for base sequence ", baseSequence, " - DROP") + return nil } - if receivedCount == len(l.partialMessageStore[baseSequence]) { - // Time to reassemble! - reassembled := make(enc.Wire, len(l.partialMessageStore[baseSequence])) - for i, fragment := range l.partialMessageStore[baseSequence] { - reassembled[i] = fragment - } + // Validate fragIndex is valid + if fragIndex >= uint64(len(buffer)) { + core.LogWarn(l, "Received fragment index ", fragIndex, " out of range for base sequence ", baseSequence, " - DROP") + return nil + } + + // Store fragment in buffer + buffer[fragIndex] = frame.Fragment.Join() // should be only one fragment - delete(l.partialMessageStore, baseSequence) - return reassembled + // Check if all fragments have been received + for _, frag := range buffer { + if len(frag) == 0 { + return nil // not all fragments received + } } - return nil + // All fragments received, free up buffer + l.reassemblyBuffers[bufIndex].sequence = 0 + l.reassemblyBuffers[bufIndex].buffer = nil + + return buffer } func (l *NDNLPLinkService) checkCongestion(wire []byte) bool {