diff --git a/pkg/aws/blockindextablemapper.go b/pkg/aws/blockindextablemapper.go
new file mode 100644
index 0000000..e33f2b6
--- /dev/null
+++ b/pkg/aws/blockindextablemapper.go
@@ -0,0 +1,162 @@
+package aws
+
+import (
+	"context"
+	"errors"
+	"fmt"
+	"io"
+	"net/url"
+	"strings"
+	"time"
+
+	"github.com/ipfs/go-cid"
+	"github.com/ipld/go-ipld-prime"
+	cidlink "github.com/ipld/go-ipld-prime/linking/cid"
+	"github.com/multiformats/go-multibase"
+	"github.com/multiformats/go-multicodec"
+	multihash "github.com/multiformats/go-multihash"
+	cassert "github.com/storacha/go-libstoracha/capabilities/assert"
+	ctypes "github.com/storacha/go-libstoracha/capabilities/types"
+	"github.com/storacha/go-ucanto/core/delegation"
+	"github.com/storacha/go-ucanto/principal"
+	"github.com/storacha/indexing-service/pkg/service/providerindex"
+)
+
+type BlockIndexStore interface {
+	Query(ctx context.Context, digest multihash.Multihash) ([]BlockIndexRecord, error)
+}
+
+type BlockIndexRecord struct {
+	CarPath string
+	Offset  uint64
+	Length  uint64
+}
+
+type blockIndexTableMapper struct {
+	id              principal.Signer
+	blockIndexStore BlockIndexStore
+	bucketURL       url.URL
+	claimExp        time.Duration
+}
+
+var _ providerindex.ContentToClaimsMapper = blockIndexTableMapper{}
+
+// NewBlockIndexTableMapper creates a new ContentToClaimsMapper that synthesizes location claims from data in the
+// blockIndexStore - a giant index of historical data, mapping multihashes to bucket keys/URLs and their byte offsets.
+//
+// The data referenced by bucket keys in the blockIndexStore has been consolidated into a single bucket, so this
+// instance does the work of mapping old bucket keys to URLs, where the base URL is the passed bucketURL param.
+//
+// Using the data in the blockIndexStore, the service will materialize content claims using the id param as the
+// signing key. Claims will be set to expire in the amount of time given by the claimExpiration parameter.
+func NewBlockIndexTableMapper(id principal.Signer, blockIndexStore BlockIndexStore, bucketURL string, claimExpiration time.Duration) (blockIndexTableMapper, error) {
+	burl, err := url.Parse(bucketURL)
+	if err != nil {
+		return blockIndexTableMapper{}, fmt.Errorf("parsing bucket URL: %w", err)
+	}
+
+	return blockIndexTableMapper{
+		id:              id,
+		blockIndexStore: blockIndexStore,
+		bucketURL:       *burl,
+		claimExp:        claimExpiration,
+	}, nil
+}
+
+// GetClaims implements providerindex.ContentToClaimsMapper.
+// Although it returns a list of CIDs, they are identity CIDs, so they contain the actual claims they refer to.
+func (bit blockIndexTableMapper) GetClaims(ctx context.Context, contentHash multihash.Multihash) ([]cid.Cid, error) {
+	var locs []cassert.LocationCaveats
+
+	// let's see if we can materialize some location claims
+	content := ctypes.FromHash(contentHash)
+	records, err := bit.blockIndexStore.Query(ctx, content.Hash())
+	if err != nil {
+		return nil, err
+	}
+
+	for _, r := range records {
+		u, err := url.Parse(r.CarPath)
+		if err != nil || !u.IsAbs() {
+			// non-URL is legacy region/bucket/key format
+			// e.g. us-west-2/dotstorage-prod-1/raw/bafy...
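+			// The region and bucket segments are dropped, the shard CID (or
+			// CAR multihash) is recovered from the key's filename by
+			// bucketKeyToShardLink below, and the location URL becomes
+			// <bucketURL>/<shard>/<shard>.car.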
+			parts := strings.Split(r.CarPath, "/")
+			key := strings.Join(parts[2:], "/")
+			shard, err := bucketKeyToShardLink(key)
+			if err != nil {
+				continue
+			}
+
+			u = bit.bucketURL.JoinPath(fmt.Sprintf("/%s/%s.car", shard.String(), shard.String()))
+		}
+
+		locs = append(locs, cassert.LocationCaveats{
+			Content:  content,
+			Location: []url.URL{*u},
+			Range:    &cassert.Range{Offset: r.Offset, Length: &r.Length},
+		})
+	}
+
+	claimCids := make([]cid.Cid, 0, len(locs))
+	for _, loc := range locs {
+		claim, err := cassert.Location.Delegate(
+			bit.id,
+			bit.id,
+			bit.id.DID().String(),
+			loc,
+			delegation.WithExpiration(int(time.Now().Add(bit.claimExp).Unix())),
+		)
+		if err != nil {
+			continue
+		}
+
+		claimData, err := io.ReadAll(claim.Archive())
+		if err != nil {
+			continue
+		}
+
+		c, err := cid.Prefix{
+			Version:  1,
+			Codec:    uint64(multicodec.Car),
+			MhType:   multihash.IDENTITY,
+			MhLength: len(claimData),
+		}.Sum(claimData)
+		if err != nil {
+			continue
+		}
+
+		claimCids = append(claimCids, c)
+	}
+
+	return claimCids, nil
+}
+
+func bucketKeyToShardLink(key string) (ipld.Link, error) {
+	parts := strings.Split(key, "/")
+	filename := parts[len(parts)-1]
+	hash := strings.Split(filename, ".")[0]
+
+	// recent buckets encode the CAR CID in the filename
+	shard, err := cid.Parse(hash)
+	if err != nil {
+		// older buckets use a base32-encoded CAR multihash as the filename
+		_, digestBytes, err := multibase.Decode(string(multibase.Base32) + hash)
+		if err != nil {
+			return nil, err
+		}
+		digest, err := multihash.Cast(digestBytes)
+		if err != nil {
+			return nil, err
+		}
+		return cidlink.Link{Cid: cid.NewCidV1(uint64(multicodec.Car), digest)}, nil
+	}
+	if shard.Prefix().Codec != uint64(multicodec.Car) {
+		return nil, errors.New("not a CAR CID")
+	}
+	return cidlink.Link{Cid: shard}, nil
+}
diff --git a/pkg/aws/blockindextablemapper_test.go b/pkg/aws/blockindextablemapper_test.go
new file mode 100644
index 0000000..bc90085
--- /dev/null
+++ b/pkg/aws/blockindextablemapper_test.go
@@ -0,0 +1,138 @@
+package aws
+
+import (
+	"context"
+	"net/url"
+	"testing"
+	"time"
+
+	"github.com/multiformats/go-multihash"
+	cassert "github.com/storacha/go-libstoracha/capabilities/assert"
+	"github.com/storacha/go-ucanto/core/delegation"
+	"github.com/storacha/indexing-service/pkg/bytemap"
+	"github.com/storacha/indexing-service/pkg/internal/digestutil"
+	"github.com/storacha/indexing-service/pkg/internal/testutil"
+	"github.com/storacha/indexing-service/pkg/types"
+	"github.com/stretchr/testify/require"
+)
+
+func TestBlockIndexTableMapper(t *testing.T) {
+	id := testutil.Service
+	bucketURL := testutil.Must(url.Parse("https://test.bucket.example.com"))(t)
+
+	fixtures := []struct {
+		name   string
+		digest multihash.Multihash
+		record BlockIndexRecord
+		// the expected location URL in the materialized claim
+		expectedURL *url.URL
+		// signals that no claim can be materialized from this fixture
+		noClaim bool
+	}{
+		{
+			name:   "b32 multihash key",
+			digest: testutil.Must(digestutil.Parse("zQmNUfyG3ynAkCzPFLsijsJwEFpPXqJZF1CJpT9GLYmgBBd"))(t),
+			record: BlockIndexRecord{
+				CarPath: "us-west-2/dotstorage-prod-1/raw/bafyreifvbqc4e5qphijgpj43qxk5ndw2vbfbhkzuuuvpo4cturr2dfk45e/315318734258474846/ciqd7nsjnsrsi6pqulv5j46qel7gw6oeo644o5ef3zopne37xad5oui.car",
+				Offset:  128844,
+				Length:  200,
+			},
+			expectedURL:
bucketURL.JoinPath("/bagbaierah63es3fder47bixl2tz5aix6nn44i55zy52ilxs462jx7oah25iq/bagbaierah63es3fder47bixl2tz5aix6nn44i55zy52ilxs462jx7oah25iq.car"), + }, + { + name: "b32 CAR CID key", + digest: testutil.Must(digestutil.Parse("zQmNVL7AESETquhed2Sv7VRq8ujqiL8NiPVmTBMCoKioZXh"))(t), + record: BlockIndexRecord{ + CarPath: "us-west-2/carpark-prod-0/bagbaieras4pzdxrc6rxlqfu73a4g4zmbtn54e77gwxyq4lvwqouwkknndquq/bagbaieras4pzdxrc6rxlqfu73a4g4zmbtn54e77gwxyq4lvwqouwkknndquq.car", + Offset: 9196818, + Length: 262144, + }, + expectedURL: bucketURL.JoinPath("/bagbaieras4pzdxrc6rxlqfu73a4g4zmbtn54e77gwxyq4lvwqouwkknndquq/bagbaieras4pzdxrc6rxlqfu73a4g4zmbtn54e77gwxyq4lvwqouwkknndquq.car"), + }, + { + name: "b32 root CID key", + digest: testutil.Must(digestutil.Parse("zQmPc8FCfDtjgC5xB2EXArnuYs2d53vT5kbH7HJejkYwCz4"))(t), + record: BlockIndexRecord{ + CarPath: "us-west-2/dotstorage-prod-1/complete/bafybeihya44jmfali7ret42wvhasnkacg6s5pfuxt4ydszdyp5ib4knzjm.car", + Offset: 8029928, + Length: 58, + }, + expectedURL: nil, + noClaim: true, + }, + { + name: "b58 multihash URL", + digest: testutil.Must(digestutil.Parse("zQmaRyqqRHaGmqdRBAWTsbC1cezEgtbCmVftcNVyXFcJ4n6"))(t), + record: BlockIndexRecord{ + CarPath: "https://carpark-prod-0.r2.w3s.link/zQmRYBmBVN28FpKprXj8FiRxE8KLSkQ96gNsBu8LtnK7sEe/zQmRYBmBVN28FpKprXj8FiRxE8KLSkQ96gNsBu8LtnK7sEe.blob", + Offset: 5401120, + Length: 36876, + }, + expectedURL: testutil.Must(url.Parse("https://carpark-prod-0.r2.w3s.link/zQmRYBmBVN28FpKprXj8FiRxE8KLSkQ96gNsBu8LtnK7sEe/zQmRYBmBVN28FpKprXj8FiRxE8KLSkQ96gNsBu8LtnK7sEe.blob"))(t), + }, + } + + for _, f := range fixtures { + t.Run(f.name, func(t *testing.T) { + mockStore := newMockBlockIndexStore() + mockStore.data.Set(f.digest, []BlockIndexRecord{f.record}) + bitMapper, err := NewBlockIndexTableMapper(id, mockStore, bucketURL.String(), time.Hour) + require.NoError(t, err) + + claimCids, err := bitMapper.GetClaims(context.Background(), f.digest) + require.NoError(t, err) + + if f.noClaim { + require.Empty(t, claimCids) + return + } + + require.Equal(t, 1, len(claimCids)) + + dh, err := multihash.Decode(claimCids[0].Hash()) + require.NoError(t, err) + + claim, err := delegation.Extract(dh.Digest) + require.NoError(t, err) + + require.Equal(t, id.DID().String(), claim.Issuer().DID().String()) + require.Equal(t, cassert.LocationAbility, claim.Capabilities()[0].Can()) + require.NotNil(t, claim.Expiration()) + + nb, err := cassert.LocationCaveatsReader.Read(claim.Capabilities()[0].Nb()) + require.NoError(t, err) + require.Equal(t, f.digest, nb.Content.Hash()) + require.Equal(t, 1, len(nb.Location)) + require.Equal(t, f.expectedURL.String(), nb.Location[0].String()) + require.Equal(t, f.record.Offset, nb.Range.Offset) + require.Equal(t, f.record.Length, *nb.Range.Length) + }) + } + + t.Run("returns ErrKeyNotFound when block index errors with not found", func(t *testing.T) { + mockStore := newMockBlockIndexStore() + bitMapper, err := NewBlockIndexTableMapper(id, mockStore, bucketURL.String(), time.Hour) + require.NoError(t, err) + + _, err = bitMapper.GetClaims(context.Background(), testutil.RandomMultihash()) + require.ErrorIs(t, err, types.ErrKeyNotFound) + }) +} + +type mockBlockIndexStore struct { + data bytemap.ByteMap[multihash.Multihash, []BlockIndexRecord] +} + +func (bs *mockBlockIndexStore) Query(ctx context.Context, digest multihash.Multihash) ([]BlockIndexRecord, error) { + records := bs.data.Get(digest) + if len(records) == 0 { + return nil, types.ErrKeyNotFound + } + return records, nil +} + +func 
newMockBlockIndexStore() *mockBlockIndexStore { + return &mockBlockIndexStore{ + data: bytemap.NewByteMap[multihash.Multihash, []BlockIndexRecord](1), + } +} diff --git a/pkg/aws/dynamoblockindextable.go b/pkg/aws/dynamoblockindextable.go index 99fc407..4740b3c 100644 --- a/pkg/aws/dynamoblockindextable.go +++ b/pkg/aws/dynamoblockindextable.go @@ -10,7 +10,6 @@ import ( "github.com/aws/aws-sdk-go-v2/service/dynamodb" multihash "github.com/multiformats/go-multihash" "github.com/storacha/indexing-service/pkg/internal/digestutil" - "github.com/storacha/indexing-service/pkg/service/legacy" "github.com/storacha/indexing-service/pkg/types" ) @@ -25,7 +24,7 @@ type blockIndexItem struct { Length uint64 `dynamodbav:"length"` } -func (d *DynamoProviderBlockIndexTable) Query(ctx context.Context, digest multihash.Multihash) ([]legacy.BlockIndexRecord, error) { +func (d *DynamoProviderBlockIndexTable) Query(ctx context.Context, digest multihash.Multihash) ([]BlockIndexRecord, error) { digestAttr, err := attributevalue.Marshal(digestutil.Format(digest)) if err != nil { return nil, err @@ -37,7 +36,7 @@ func (d *DynamoProviderBlockIndexTable) Query(ctx context.Context, digest multih return nil, err } - records := []legacy.BlockIndexRecord{} + records := []BlockIndexRecord{} queryPaginator := dynamodb.NewQueryPaginator(d.client, &dynamodb.QueryInput{ TableName: aws.String(d.tableName), @@ -60,7 +59,7 @@ func (d *DynamoProviderBlockIndexTable) Query(ctx context.Context, digest multih } for _, item := range items { - records = append(records, legacy.BlockIndexRecord(item)) + records = append(records, BlockIndexRecord(item)) } } @@ -71,7 +70,7 @@ func (d *DynamoProviderBlockIndexTable) Query(ctx context.Context, digest multih return records, nil } -var _ legacy.BlockIndexStore = (*DynamoProviderBlockIndexTable)(nil) +var _ BlockIndexStore = (*DynamoProviderBlockIndexTable)(nil) func NewDynamoProviderBlockIndexTable(client dynamodb.QueryAPIClient, tableName string) *DynamoProviderBlockIndexTable { return &DynamoProviderBlockIndexTable{client, tableName} diff --git a/pkg/aws/dynamoblockindextable_test.go b/pkg/aws/dynamoblockindextable_test.go index 357d7c2..fc4216f 100644 --- a/pkg/aws/dynamoblockindextable_test.go +++ b/pkg/aws/dynamoblockindextable_test.go @@ -20,7 +20,6 @@ import ( "github.com/storacha/indexing-service/pkg/internal/digestutil" "github.com/storacha/indexing-service/pkg/internal/link" "github.com/storacha/indexing-service/pkg/internal/testutil" - "github.com/storacha/indexing-service/pkg/service/legacy" istypes "github.com/storacha/indexing-service/pkg/types" "github.com/stretchr/testify/require" "github.com/testcontainers/testcontainers-go" @@ -107,7 +106,7 @@ func TestDynamoProviderBlockIndexTable(t *testing.T) { require.Equal(t, len(items), len(results)) for _, i := range items { - require.True(t, slices.ContainsFunc(results, func(r legacy.BlockIndexRecord) bool { + require.True(t, slices.ContainsFunc(results, func(r BlockIndexRecord) bool { return r.CarPath == i.path && r.Offset == uint64(i.offset) && r.Length == uint64(i.length) })) } diff --git a/pkg/aws/service.go b/pkg/aws/service.go index 4453087..29047cb 100644 --- a/pkg/aws/service.go +++ b/pkg/aws/service.go @@ -30,7 +30,6 @@ import ( "github.com/storacha/indexing-service/pkg/presets" "github.com/storacha/indexing-service/pkg/redis" "github.com/storacha/indexing-service/pkg/service/contentclaims" - "github.com/storacha/indexing-service/pkg/service/legacy" "github.com/storacha/indexing-service/pkg/service/providerindex" 
"github.com/storacha/indexing-service/pkg/telemetry" "github.com/storacha/indexing-service/pkg/types" @@ -233,6 +232,17 @@ func Construct(cfg Config) (types.Service, error) { return []delegation.Option{delegation.WithExpiration(int(time.Now().Add(time.Hour).Unix()))} }, ) + blockIndexCfg := cfg.Config.Copy() + blockIndexCfg.Region = cfg.LegacyBlockIndexTableRegion + legacyBlockIndexStore := NewDynamoProviderBlockIndexTable(dynamodb.NewFromConfig(blockIndexCfg), cfg.LegacyBlockIndexTableName) + + // allow claims synthethized from the block index table to live longer after they are expired in the cache + // so that the service doesn't return cached but expired delegations + synthetizedClaimExp := time.Duration(cfg.ClaimsCacheExpirationSeconds)*time.Second + 1*time.Hour + blockIndexTableMapper, err := NewBlockIndexTableMapper(cfg.Signer, legacyBlockIndexStore, cfg.LegacyDataBucketURL, synthetizedClaimExp) + if err != nil { + return nil, fmt.Errorf("creating block index table mapper: %w", err) + } legacyClaimsBucket := contentclaims.NewStoreFromBucket(NewS3Store(legacyClaimsCfg, cfg.LegacyClaimsBucket, "")) legacyClaimsURL := fmt.Sprintf("https://%s.s3.%s.amazonaws.com/{claim}/{claim}.car", cfg.LegacyClaimsBucket, cfg.Config.Region) @@ -243,7 +253,7 @@ func Construct(cfg Config) (types.Service, error) { construct.WithPublisherStore(publisherStore), construct.WithStartIPNIServer(false), construct.WithClaimsStore(claimBucketStore), - construct.WithLegacyClaims([]providerindex.ContentToClaimsMapper{legacyClaimsMapper, bucketFallbackMapper}, legacyClaimsBucket, legacyClaimsURL), + construct.WithLegacyClaims([]providerindex.ContentToClaimsMapper{legacyClaimsMapper, bucketFallbackMapper, blockIndexTableMapper}, legacyClaimsBucket, legacyClaimsURL), construct.WithHTTPClient(httpClient), construct.WithProvidersClient(providersClient), construct.WithClaimsClient(claimsClient), @@ -255,8 +265,6 @@ func Construct(cfg Config) (types.Service, error) { if err != nil { return nil, err } - blockIndexCfg := cfg.Config.Copy() - blockIndexCfg.Region = cfg.LegacyBlockIndexTableRegion - legacyBlockIndexStore := NewDynamoProviderBlockIndexTable(dynamodb.NewFromConfig(blockIndexCfg), cfg.LegacyBlockIndexTableName) - return legacy.NewService(cfg.Signer, service, legacyBlockIndexStore, cfg.LegacyDataBucketURL) + + return service, nil } diff --git a/pkg/service/legacy/service.go b/pkg/service/legacy/service.go deleted file mode 100644 index f42d91f..0000000 --- a/pkg/service/legacy/service.go +++ /dev/null @@ -1,207 +0,0 @@ -package legacy - -import ( - "context" - "errors" - "fmt" - "net/url" - "strings" - "time" - - "github.com/ipfs/go-cid" - "github.com/ipld/go-ipld-prime/datamodel" - cidlink "github.com/ipld/go-ipld-prime/linking/cid" - "github.com/libp2p/go-libp2p/core/peer" - "github.com/multiformats/go-multibase" - "github.com/multiformats/go-multicodec" - "github.com/multiformats/go-multihash" - cassert "github.com/storacha/go-libstoracha/capabilities/assert" - ctypes "github.com/storacha/go-libstoracha/capabilities/types" - "github.com/storacha/go-ucanto/core/delegation" - "github.com/storacha/go-ucanto/core/ipld" - "github.com/storacha/go-ucanto/principal" - "github.com/storacha/indexing-service/pkg/blobindex" - "github.com/storacha/indexing-service/pkg/bytemap" - "github.com/storacha/indexing-service/pkg/internal/link" - "github.com/storacha/indexing-service/pkg/service/queryresult" - "github.com/storacha/indexing-service/pkg/types" -) - -type IndexingService struct { - id principal.Signer - indexingService 
types.Service - blockIndexStore BlockIndexStore - bucketURL url.URL -} - -func (l *IndexingService) Cache(ctx context.Context, provider peer.AddrInfo, claim delegation.Delegation) error { - return l.indexingService.Cache(ctx, provider, claim) -} - -func (l *IndexingService) Get(ctx context.Context, claim datamodel.Link) (delegation.Delegation, error) { - return l.indexingService.Get(ctx, claim) -} - -func (l *IndexingService) Publish(ctx context.Context, claim delegation.Delegation) error { - return l.indexingService.Publish(ctx, claim) -} - -func (l *IndexingService) Query(ctx context.Context, q types.Query) (types.QueryResult, error) { - // Create a cancellable context for querying the indexStore so we can stop it early if the indexService succeeds. - bisCtx, cancelBis := context.WithCancel(ctx) - defer cancelBis() - - // We'll capture the results of indexStore query in a channel, using the result if the indexService - // doesn't yield anything. - type indexResult struct { - claims map[cid.Cid]delegation.Delegation - indexes bytemap.ByteMap[types.EncodedContextID, blobindex.ShardedDagIndexView] - err error - } - indexResCh := make(chan indexResult, 1) - - // Query the indexStore async. - go func() { - var locs []cassert.LocationCaveats - for _, h := range q.Hashes { - // lets see if we can materialize some location claims - content := ctypes.FromHash(h) - records, err := l.blockIndexStore.Query(bisCtx, content.Hash()) - if err != nil { - if errors.Is(err, types.ErrKeyNotFound) { - continue - } - // bail if non-continuable error. - indexResCh <- indexResult{err: err} - return - } - - for _, r := range records { - u, err := url.Parse(r.CarPath) - if err != nil || !u.IsAbs() { - // non-URL is legacy region/bucket/key format - // e.g. us-west-2/dotstorage-prod-1/raw/bafy... - parts := strings.Split(r.CarPath, "/") - key := strings.Join(parts[2:], "/") - shard, err := bucketKeyToShardLink(key) - if err != nil { - continue - } - - u = l.bucketURL.JoinPath(fmt.Sprintf("/%s/%s.car", shard.String(), shard.String())) - locs = append(locs, cassert.LocationCaveats{ - Content: content, - Location: []url.URL{*u}, - Range: &cassert.Range{Offset: r.Offset, Length: &r.Length}, - }) - continue - } - - locs = append(locs, cassert.LocationCaveats{ - Content: content, - Location: []url.URL{*u}, - Range: &cassert.Range{Offset: r.Offset, Length: &r.Length}, - }) - } - } - - claims := make(map[cid.Cid]delegation.Delegation, len(locs)) - for _, loc := range locs { - claim, err := cassert.Location.Delegate( - l.id, - l.id, - l.id.DID().String(), - loc, - delegation.WithExpiration(int(time.Now().Add(time.Hour).Unix())), - ) - if err != nil { - indexResCh <- indexResult{err: err} - return - } - claims[link.ToCID(claim.Link())] = claim - } - - indexes := bytemap.NewByteMap[types.EncodedContextID, blobindex.ShardedDagIndexView](0) - indexResCh <- indexResult{claims: claims, indexes: indexes, err: nil} - }() - - results, err := l.indexingService.Query(ctx, q) - if err != nil { - // if we fail to query the indexService, kill the indexStore query and bail with error. - // cancellation of the indexStore query is handled in defer statement at top of method. - return nil, err - } - if len(results.Claims()) > 0 || len(results.Indexes()) > 0 || len(q.Hashes) == 0 { - // indexService produced a result, kill the blockIndex query and return results. - // cancellation of the blockStore query is handled in defer statement at top of method. 
- return results, nil - } - - // indexService query yields empty result, check the indexStore. - - bsRes := <-indexResCh - if bsRes.err != nil { - return nil, bsRes.err - } - - // the indexService yielded an empty result, use the indexStore query result. - return queryresult.Build(bsRes.claims, bsRes.indexes) -} - -var _ types.Service = (*IndexingService)(nil) - -type BlockIndexRecord struct { - CarPath string - Offset uint64 - Length uint64 -} - -type BlockIndexStore interface { - Query(ctx context.Context, digest multihash.Multihash) ([]BlockIndexRecord, error) -} - -// NewService creates a new indexing service that wraps the passed service and -// transparently proxies to it, with the exception of the call to [Query], which -// calls the wrapped service and then inspects the results. If they are empty -// then it will query the blockIndexStore - a giant index of historical data, -// mapping multihashes to bucket keys/URLs and their byte offsets. -// -// The data referenced by bucket keys in the blockIndexStore has been -// consolidated into a single bucket. So this instance does the work of mapping -// old bucket keys to URLs, where the base URL is the passed bucketURL param. -// -// Using the data in the blockIndexStore, the service will materialize content -// claims using the id param as the signing key, and add them to the query -// results before returning them back to the caller. -func NewService(id principal.Signer, indexer types.Service, blockIndexStore BlockIndexStore, bucketURL string) (*IndexingService, error) { - burl, err := url.Parse(bucketURL) - if err != nil { - return nil, fmt.Errorf("parsing bucket URL: %w", err) - } - return &IndexingService{id, indexer, blockIndexStore, *burl}, nil -} - -func bucketKeyToShardLink(key string) (ipld.Link, error) { - parts := strings.Split(key, "/") - filename := parts[len(parts)-1] - hash := strings.Split(filename, ".")[0] - - // recent buckets encode CAR CID in filename - shard, err := cid.Parse(hash) - if err != nil { - // older buckets base32 encode a CAR multihash .car - _, digestBytes, err := multibase.Decode(string(multibase.Base32) + hash) - if err != nil { - return nil, err - } - digest, err := multihash.Cast(digestBytes) - if err != nil { - return nil, err - } - return cidlink.Link{Cid: cid.NewCidV1(uint64(multicodec.Car), digest)}, nil - } - if shard.Prefix().Codec != uint64(multicodec.Car) { - return nil, errors.New("not a CAR CID") - } - return cidlink.Link{Cid: shard}, nil -} diff --git a/pkg/service/legacy/service_test.go b/pkg/service/legacy/service_test.go deleted file mode 100644 index ea0416e..0000000 --- a/pkg/service/legacy/service_test.go +++ /dev/null @@ -1,270 +0,0 @@ -package legacy - -import ( - "context" - "errors" - "io" - "net/url" - "testing" - - "github.com/ipfs/go-cid" - "github.com/ipld/go-ipld-prime/datamodel" - "github.com/libp2p/go-libp2p/core/peer" - "github.com/multiformats/go-multicodec" - "github.com/multiformats/go-multihash" - cassert "github.com/storacha/go-libstoracha/capabilities/assert" - ctypes "github.com/storacha/go-libstoracha/capabilities/types" - "github.com/storacha/go-ucanto/core/dag/blockstore" - "github.com/storacha/go-ucanto/core/delegation" - "github.com/storacha/indexing-service/pkg/blobindex" - "github.com/storacha/indexing-service/pkg/bytemap" - "github.com/storacha/indexing-service/pkg/internal/digestutil" - "github.com/storacha/indexing-service/pkg/internal/link" - "github.com/storacha/indexing-service/pkg/internal/testutil" - 
"github.com/storacha/indexing-service/pkg/service/queryresult" - "github.com/storacha/indexing-service/pkg/types" - "github.com/stretchr/testify/require" -) - -func TestLegacyService(t *testing.T) { - id := testutil.Service - bucketURL, err := url.Parse("https://test.bucket.example.com") - require.NoError(t, err) - - fixtures := []struct { - name string - digest multihash.Multihash - record BlockIndexRecord - // the expected location URL in the materlized claim - expectedURL *url.URL - // signals that no claim can be materialized from fixture - noClaim bool - }{ - { - name: "b32 multihash key", - digest: testutil.Must(digestutil.Parse("zQmNUfyG3ynAkCzPFLsijsJwEFpPXqJZF1CJpT9GLYmgBBd"))(t), - record: BlockIndexRecord{ - CarPath: "us-west-2/dotstorage-prod-1/raw/bafyreifvbqc4e5qphijgpj43qxk5ndw2vbfbhkzuuuvpo4cturr2dfk45e/315318734258474846/ciqd7nsjnsrsi6pqulv5j46qel7gw6oeo644o5ef3zopne37xad5oui.car", - Offset: 128844, - Length: 200, - }, - expectedURL: bucketURL.JoinPath("/bagbaierah63es3fder47bixl2tz5aix6nn44i55zy52ilxs462jx7oah25iq/bagbaierah63es3fder47bixl2tz5aix6nn44i55zy52ilxs462jx7oah25iq.car"), - }, - { - name: "b32 CAR CID key", - digest: testutil.Must(digestutil.Parse("zQmNVL7AESETquhed2Sv7VRq8ujqiL8NiPVmTBMCoKioZXh"))(t), - record: BlockIndexRecord{ - CarPath: "us-west-2/carpark-prod-0/bagbaieras4pzdxrc6rxlqfu73a4g4zmbtn54e77gwxyq4lvwqouwkknndquq/bagbaieras4pzdxrc6rxlqfu73a4g4zmbtn54e77gwxyq4lvwqouwkknndquq.car", - Offset: 9196818, - Length: 262144, - }, - expectedURL: bucketURL.JoinPath("/bagbaieras4pzdxrc6rxlqfu73a4g4zmbtn54e77gwxyq4lvwqouwkknndquq/bagbaieras4pzdxrc6rxlqfu73a4g4zmbtn54e77gwxyq4lvwqouwkknndquq.car"), - }, - { - name: "b32 root CID key", - digest: testutil.Must(digestutil.Parse("zQmPc8FCfDtjgC5xB2EXArnuYs2d53vT5kbH7HJejkYwCz4"))(t), - record: BlockIndexRecord{ - CarPath: "us-west-2/dotstorage-prod-1/complete/bafybeihya44jmfali7ret42wvhasnkacg6s5pfuxt4ydszdyp5ib4knzjm.car", - Offset: 8029928, - Length: 58, - }, - expectedURL: nil, - noClaim: true, - }, - { - name: "b58 multihash URL", - digest: testutil.Must(digestutil.Parse("zQmaRyqqRHaGmqdRBAWTsbC1cezEgtbCmVftcNVyXFcJ4n6"))(t), - record: BlockIndexRecord{ - CarPath: "https://carpark-prod-0.r2.w3s.link/zQmRYBmBVN28FpKprXj8FiRxE8KLSkQ96gNsBu8LtnK7sEe/zQmRYBmBVN28FpKprXj8FiRxE8KLSkQ96gNsBu8LtnK7sEe.blob", - Offset: 5401120, - Length: 36876, - }, - expectedURL: testutil.Must(url.Parse("https://carpark-prod-0.r2.w3s.link/zQmRYBmBVN28FpKprXj8FiRxE8KLSkQ96gNsBu8LtnK7sEe/zQmRYBmBVN28FpKprXj8FiRxE8KLSkQ96gNsBu8LtnK7sEe.blob"))(t), - }, - } - - for _, f := range fixtures { - t.Run(f.name, func(t *testing.T) { - mockStore := newMockBlockIndexStore() - mockStore.data.Set(f.digest, []BlockIndexRecord{f.record}) - mockService := mockIndexingService{} - service, err := NewService(id, &mockService, mockStore, bucketURL.String()) - require.NoError(t, err) - - query := types.Query{Hashes: []multihash.Multihash{f.digest}} - results, err := service.Query(context.Background(), query) - require.NoError(t, err) - require.Empty(t, results.Indexes()) - - if f.noClaim { - require.Empty(t, results.Claims()) - return - } - - require.Equal(t, 1, len(results.Claims())) - - br, err := blockstore.NewBlockReader(blockstore.WithBlocksIterator(results.Blocks())) - require.NoError(t, err) - - claim, err := delegation.NewDelegationView(results.Claims()[0], br) - require.NoError(t, err) - require.Equal(t, id.DID().String(), claim.Issuer().DID().String()) - require.Equal(t, cassert.LocationAbility, claim.Capabilities()[0].Can()) - require.NotNil(t, 
claim.Expiration()) - - nb, err := cassert.LocationCaveatsReader.Read(claim.Capabilities()[0].Nb()) - require.NoError(t, err) - require.Equal(t, f.digest, nb.Content.Hash()) - require.Equal(t, 1, len(nb.Location)) - require.Equal(t, f.expectedURL.String(), nb.Location[0].String()) - require.Equal(t, f.record.Offset, nb.Range.Offset) - require.Equal(t, f.record.Length, *nb.Range.Length) - }) - } - - t.Run("returns claims from underlying indexing service", func(t *testing.T) { - mockStore := newMockBlockIndexStore() - digest := testutil.RandomMultihash() - nb := cassert.LocationCaveats{Content: ctypes.FromHash(digest)} - claim, err := cassert.Location.Delegate(id, id, id.DID().String(), nb) - require.NoError(t, err) - - claims := map[cid.Cid]delegation.Delegation{link.ToCID(claim.Link()): claim} - indexes := bytemap.NewByteMap[types.EncodedContextID, blobindex.ShardedDagIndexView](0) - result, err := queryresult.Build(claims, indexes) - require.NoError(t, err) - - mockService := mockIndexingService{result, nil} - service, err := NewService(id, &mockService, mockStore, bucketURL.String()) - require.NoError(t, err) - - query := types.Query{Hashes: []multihash.Multihash{digest}} - results, err := service.Query(context.Background(), query) - require.NoError(t, err) - require.Empty(t, results.Indexes()) - require.Equal(t, 1, len(results.Claims())) - require.Equal(t, claim.Link(), results.Claims()[0]) - }) - - t.Run("returns indexes from underlying indexing service", func(t *testing.T) { - mockStore := newMockBlockIndexStore() - root := testutil.RandomCID() - digest := link.ToCID(root).Hash() - - index := blobindex.NewShardedDagIndexView(root, 0) - indexBytes, err := io.ReadAll(testutil.Must(index.Archive())(t)) - require.NoError(t, err) - - indexCID, err := cid.Prefix{ - Version: 1, - Codec: uint64(multicodec.Car), - MhType: multihash.SHA2_256, - MhLength: -1, - }.Sum(indexBytes) - require.NoError(t, err) - - claims := map[cid.Cid]delegation.Delegation{} - indexes := bytemap.NewByteMap[types.EncodedContextID, blobindex.ShardedDagIndexView](1) - indexes.Set(types.EncodedContextID(digest), index) - result, err := queryresult.Build(claims, indexes) - require.NoError(t, err) - - mockService := mockIndexingService{result, nil} - service, err := NewService(id, &mockService, mockStore, bucketURL.String()) - require.NoError(t, err) - - query := types.Query{Hashes: []multihash.Multihash{digest}} - results, err := service.Query(context.Background(), query) - require.NoError(t, err) - require.Empty(t, results.Claims()) - require.Equal(t, 1, len(results.Indexes())) - require.Equal(t, indexCID.String(), results.Indexes()[0].String()) - }) - - t.Run("calls through to underlying indexing service", func(t *testing.T) { - digest := testutil.RandomMultihash() - mockStore := newMockBlockIndexStore() - mockService := mockIndexingService{nil, errNotImplemented} - service, err := NewService(id, &mockService, mockStore, bucketURL.String()) - require.NoError(t, err) - - nb := cassert.LocationCaveats{Content: ctypes.FromHash(digest)} - claim, err := cassert.Location.Delegate(id, id, id.DID().String(), nb) - require.NoError(t, err) - - err = service.Cache(context.Background(), peer.AddrInfo{}, claim) - require.True(t, errors.Is(err, errNotImplemented)) - - _, err = service.Get(context.Background(), testutil.RandomCID()) - require.True(t, errors.Is(err, errNotImplemented)) - - err = service.Publish(context.Background(), claim) - require.True(t, errors.Is(err, errNotImplemented)) - - query := types.Query{Hashes: 
[]multihash.Multihash{digest}} - _, err = service.Query(context.Background(), query) - require.True(t, errors.Is(err, errNotImplemented)) - }) - - t.Run("returns empty results when block index errors with not found", func(t *testing.T) { - mockStore := newMockBlockIndexStore() - mockService := mockIndexingService{nil, nil} - service, err := NewService(id, &mockService, mockStore, bucketURL.String()) - require.NoError(t, err) - - query := types.Query{Hashes: []multihash.Multihash{testutil.RandomMultihash()}} - results, err := service.Query(context.Background(), query) - require.NoError(t, err) - require.Len(t, results.Claims(), 0) - require.Len(t, results.Indexes(), 0) - }) -} - -var errNotImplemented = errors.New("not implemented") - -type mockIndexingService struct { - queryResult types.QueryResult - queryError error -} - -func (is *mockIndexingService) Cache(ctx context.Context, provider peer.AddrInfo, claim delegation.Delegation) error { - return errNotImplemented -} - -func (is *mockIndexingService) Get(ctx context.Context, claim datamodel.Link) (delegation.Delegation, error) { - return nil, errNotImplemented -} - -func (is *mockIndexingService) Publish(ctx context.Context, claim delegation.Delegation) error { - return errNotImplemented -} - -func (is *mockIndexingService) Query(ctx context.Context, q types.Query) (types.QueryResult, error) { - if is.queryError != nil { - return nil, is.queryError - } - if is.queryResult != nil { - return is.queryResult, nil - } - claims := map[cid.Cid]delegation.Delegation{} - indexes := bytemap.NewByteMap[types.EncodedContextID, blobindex.ShardedDagIndexView](0) - return queryresult.Build(claims, indexes) -} - -type mockBlockIndexStore struct { - data bytemap.ByteMap[multihash.Multihash, []BlockIndexRecord] -} - -func (bs *mockBlockIndexStore) Query(ctx context.Context, digest multihash.Multihash) ([]BlockIndexRecord, error) { - records := bs.data.Get(digest) - if len(records) == 0 { - return nil, types.ErrKeyNotFound - } - return records, nil -} - -func newMockBlockIndexStore() *mockBlockIndexStore { - return &mockBlockIndexStore{ - data: bytemap.NewByteMap[multihash.Multihash, []BlockIndexRecord](1), - } -} diff --git a/pkg/service/providerindex/legacy.go b/pkg/service/providerindex/legacy.go index 7b81691..e375859 100644 --- a/pkg/service/providerindex/legacy.go +++ b/pkg/service/providerindex/legacy.go @@ -77,14 +77,16 @@ func NewLegacyClaimsStore(contentToClaimsMappers []ContentToClaimsMapper, claimS // relevant claims, these will be returned and no more mappers will be checked. func (ls LegacyClaimsStore) Find(ctx context.Context, contentHash multihash.Multihash, targetClaims []multicodec.Code) ([]model.ProviderResult, error) { for _, mapper := range ls.mappers { - claims, err := ls.findInMapper(ctx, contentHash, targetClaims, mapper) + results, err := ls.findInMapper(ctx, contentHash, targetClaims, mapper) if err != nil { return nil, err } - if len(claims) > 0 { - return claims, nil + + if len(results) > 0 { + return results, nil } } + return []model.ProviderResult{}, nil }
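Note for reviewers: a minimal sketch of how the relocated logic is consumed now that legacy.NewService is gone. Claims come back from the mapper as identity CIDs and are unpacked the same way the new test does; the signer, store, bucket URL, and the helper name locationClaimFor below are illustrative placeholders, not values from this change.

package example

import (
	"context"
	"time"

	"github.com/multiformats/go-multihash"
	"github.com/storacha/go-ucanto/core/delegation"
	"github.com/storacha/go-ucanto/principal"
	"github.com/storacha/indexing-service/pkg/aws"
	"github.com/storacha/indexing-service/pkg/types"
)

// locationClaimFor synthesizes a location claim for digest and unpacks it from
// the identity CID returned by GetClaims.
func locationClaimFor(signer principal.Signer, store aws.BlockIndexStore, digest multihash.Multihash) (delegation.Delegation, error) {
	// any BlockIndexStore works here, e.g. the DynamoProviderBlockIndexTable above
	mapper, err := aws.NewBlockIndexTableMapper(signer, store, "https://bucket.example.com", time.Hour)
	if err != nil {
		return nil, err
	}
	cids, err := mapper.GetClaims(context.Background(), digest)
	if err != nil {
		return nil, err // e.g. types.ErrKeyNotFound when the digest is unknown
	}
	if len(cids) == 0 {
		return nil, types.ErrKeyNotFound
	}
	// the claim archive is embedded in the CID's identity multihash digest
	dh, err := multihash.Decode(cids[0].Hash())
	if err != nil {
		return nil, err
	}
	return delegation.Extract(dh.Digest)
}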