Skip to content

Commit

Permalink
fix: move block index table legacy service to regular legacy claims f…
Browse files Browse the repository at this point in the history
…low (#138)

Resolves #135 

This PR takes the logic from the block index table legacy service and
turns it into a ContentToClaimsMapper. Then adds that mapper to the list
of mappers used by the regular legacy claims flow so that its results
are handled and cached the same way as with the results from other
mappers.

---------

Co-authored-by: Hannah Howard <hannah@hannahhoward.net>
  • Loading branch information
volmedo and hannahhoward authored Feb 27, 2025
1 parent 8e507ed commit fefd23c
Show file tree
Hide file tree
Showing 8 changed files with 324 additions and 493 deletions.
162 changes: 162 additions & 0 deletions pkg/aws/blockindextablemapper.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,162 @@
package aws

import (
"context"
"errors"
"fmt"
"io"
"net/url"
"strings"
"time"

"github.com/ipfs/go-cid"
"github.com/ipld/go-ipld-prime"
cidlink "github.com/ipld/go-ipld-prime/linking/cid"
"github.com/multiformats/go-multibase"
"github.com/multiformats/go-multicodec"
multihash "github.com/multiformats/go-multihash"
cassert "github.com/storacha/go-libstoracha/capabilities/assert"
ctypes "github.com/storacha/go-libstoracha/capabilities/types"
"github.com/storacha/go-ucanto/core/delegation"
"github.com/storacha/go-ucanto/principal"
"github.com/storacha/indexing-service/pkg/service/providerindex"
)

type BlockIndexStore interface {
Query(ctx context.Context, digest multihash.Multihash) ([]BlockIndexRecord, error)
}

type BlockIndexRecord struct {
CarPath string
Offset uint64
Length uint64
}

type blockIndexTableMapper struct {
id principal.Signer
blockIndexStore BlockIndexStore
bucketURL url.URL
claimExp time.Duration
}

var _ providerindex.ContentToClaimsMapper = blockIndexTableMapper{}

// NewBlockIndexTableMapper creates a new ContentToClaimsMapper that synthethizes location claims from data in the
// blockIndexStore - a giant index of historical data, mapping multihashes to bucket keys/URLs and their byte offsets.
//
// The data referenced by bucket keys in the blockIndexStore has been consolidated into a single bucket. So this
// instance does the work of mapping old bucket keys to URLs, where the base URL is the passed bucketURL param.
//
// Using the data in the blockIndexStore, the service will materialize content claims using the id param as the
// signing key. Claims will be set to expire in the amount of time given by the claimExpiration parameter.
func NewBlockIndexTableMapper(id principal.Signer, blockIndexStore BlockIndexStore, bucketURL string, claimExpiration time.Duration) (blockIndexTableMapper, error) {
burl, err := url.Parse(bucketURL)
if err != nil {
return blockIndexTableMapper{}, fmt.Errorf("parsing bucket URL: %w", err)
}

return blockIndexTableMapper{
id: id,
blockIndexStore: blockIndexStore,
bucketURL: *burl,
claimExp: claimExpiration,
}, nil
}

// GetClaims implements providerindex.ContentToClaimsMapper.
// Although it returns a list of CIDs, they are identity CIDs, so they contain the actual claims the refer to.
func (bit blockIndexTableMapper) GetClaims(ctx context.Context, contentHash multihash.Multihash) ([]cid.Cid, error) {
var locs []cassert.LocationCaveats

// lets see if we can materialize some location claims
content := ctypes.FromHash(contentHash)
records, err := bit.blockIndexStore.Query(ctx, content.Hash())
if err != nil {
return nil, err
}

for _, r := range records {
u, err := url.Parse(r.CarPath)
if err != nil || !u.IsAbs() {
// non-URL is legacy region/bucket/key format
// e.g. us-west-2/dotstorage-prod-1/raw/bafy...
parts := strings.Split(r.CarPath, "/")
key := strings.Join(parts[2:], "/")
shard, err := bucketKeyToShardLink(key)
if err != nil {
continue
}

u = bit.bucketURL.JoinPath(fmt.Sprintf("/%s/%s.car", shard.String(), shard.String()))
locs = append(locs, cassert.LocationCaveats{
Content: content,
Location: []url.URL{*u},
Range: &cassert.Range{Offset: r.Offset, Length: &r.Length},
})
} else {
locs = append(locs, cassert.LocationCaveats{
Content: content,
Location: []url.URL{*u},
Range: &cassert.Range{Offset: r.Offset, Length: &r.Length},
})
}
}

claimCids := make([]cid.Cid, 0, len(locs))
for _, loc := range locs {
claim, err := cassert.Location.Delegate(
bit.id,
bit.id,
bit.id.DID().String(),
loc,
delegation.WithExpiration(int(time.Now().Add(bit.claimExp).Unix())),
)
if err != nil {
continue
}

claimData, err := io.ReadAll(claim.Archive())
if err != nil {
continue
}

c, err := cid.Prefix{
Version: 1,
Codec: uint64(multicodec.Car),
MhType: multihash.IDENTITY,
MhLength: len(claimData),
}.Sum(claimData)
if err != nil {
continue
}

claimCids = append(claimCids, c)
}

return claimCids, nil
}

func bucketKeyToShardLink(key string) (ipld.Link, error) {
parts := strings.Split(key, "/")
filename := parts[len(parts)-1]
hash := strings.Split(filename, ".")[0]

// recent buckets encode CAR CID in filename
shard, err := cid.Parse(hash)
if err != nil {
// older buckets base32 encode a CAR multihash <base32(car-multihash)>.car
_, digestBytes, err := multibase.Decode(string(multibase.Base32) + hash)
if err != nil {
return nil, err
}
digest, err := multihash.Cast(digestBytes)
if err != nil {
return nil, err
}
return cidlink.Link{Cid: cid.NewCidV1(uint64(multicodec.Car), digest)}, nil
}
if shard.Prefix().Codec != uint64(multicodec.Car) {
return nil, errors.New("not a CAR CID")
}
return cidlink.Link{Cid: shard}, nil
}
138 changes: 138 additions & 0 deletions pkg/aws/blockindextablemapper_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,138 @@
package aws

import (
"context"
"net/url"
"testing"
"time"

"github.com/multiformats/go-multihash"
cassert "github.com/storacha/go-libstoracha/capabilities/assert"
"github.com/storacha/go-ucanto/core/delegation"
"github.com/storacha/indexing-service/pkg/bytemap"
"github.com/storacha/indexing-service/pkg/internal/digestutil"
"github.com/storacha/indexing-service/pkg/internal/testutil"
"github.com/storacha/indexing-service/pkg/types"
"github.com/stretchr/testify/require"
)

func TestBlockIndexTableMapper(t *testing.T) {
id := testutil.Service
bucketURL := testutil.Must(url.Parse("https://test.bucket.example.com"))(t)

fixtures := []struct {
name string
digest multihash.Multihash
record BlockIndexRecord
// the expected location URL in the materlized claim
expectedURL *url.URL
// signals that no claim can be materialized from fixture
noClaim bool
}{
{
name: "b32 multihash key",
digest: testutil.Must(digestutil.Parse("zQmNUfyG3ynAkCzPFLsijsJwEFpPXqJZF1CJpT9GLYmgBBd"))(t),
record: BlockIndexRecord{
CarPath: "us-west-2/dotstorage-prod-1/raw/bafyreifvbqc4e5qphijgpj43qxk5ndw2vbfbhkzuuuvpo4cturr2dfk45e/315318734258474846/ciqd7nsjnsrsi6pqulv5j46qel7gw6oeo644o5ef3zopne37xad5oui.car",
Offset: 128844,
Length: 200,
},
expectedURL: bucketURL.JoinPath("/bagbaierah63es3fder47bixl2tz5aix6nn44i55zy52ilxs462jx7oah25iq/bagbaierah63es3fder47bixl2tz5aix6nn44i55zy52ilxs462jx7oah25iq.car"),
},
{
name: "b32 CAR CID key",
digest: testutil.Must(digestutil.Parse("zQmNVL7AESETquhed2Sv7VRq8ujqiL8NiPVmTBMCoKioZXh"))(t),
record: BlockIndexRecord{
CarPath: "us-west-2/carpark-prod-0/bagbaieras4pzdxrc6rxlqfu73a4g4zmbtn54e77gwxyq4lvwqouwkknndquq/bagbaieras4pzdxrc6rxlqfu73a4g4zmbtn54e77gwxyq4lvwqouwkknndquq.car",
Offset: 9196818,
Length: 262144,
},
expectedURL: bucketURL.JoinPath("/bagbaieras4pzdxrc6rxlqfu73a4g4zmbtn54e77gwxyq4lvwqouwkknndquq/bagbaieras4pzdxrc6rxlqfu73a4g4zmbtn54e77gwxyq4lvwqouwkknndquq.car"),
},
{
name: "b32 root CID key",
digest: testutil.Must(digestutil.Parse("zQmPc8FCfDtjgC5xB2EXArnuYs2d53vT5kbH7HJejkYwCz4"))(t),
record: BlockIndexRecord{
CarPath: "us-west-2/dotstorage-prod-1/complete/bafybeihya44jmfali7ret42wvhasnkacg6s5pfuxt4ydszdyp5ib4knzjm.car",
Offset: 8029928,
Length: 58,
},
expectedURL: nil,
noClaim: true,
},
{
name: "b58 multihash URL",
digest: testutil.Must(digestutil.Parse("zQmaRyqqRHaGmqdRBAWTsbC1cezEgtbCmVftcNVyXFcJ4n6"))(t),
record: BlockIndexRecord{
CarPath: "https://carpark-prod-0.r2.w3s.link/zQmRYBmBVN28FpKprXj8FiRxE8KLSkQ96gNsBu8LtnK7sEe/zQmRYBmBVN28FpKprXj8FiRxE8KLSkQ96gNsBu8LtnK7sEe.blob",
Offset: 5401120,
Length: 36876,
},
expectedURL: testutil.Must(url.Parse("https://carpark-prod-0.r2.w3s.link/zQmRYBmBVN28FpKprXj8FiRxE8KLSkQ96gNsBu8LtnK7sEe/zQmRYBmBVN28FpKprXj8FiRxE8KLSkQ96gNsBu8LtnK7sEe.blob"))(t),
},
}

for _, f := range fixtures {
t.Run(f.name, func(t *testing.T) {
mockStore := newMockBlockIndexStore()
mockStore.data.Set(f.digest, []BlockIndexRecord{f.record})
bitMapper, err := NewBlockIndexTableMapper(id, mockStore, bucketURL.String(), time.Hour)
require.NoError(t, err)

claimCids, err := bitMapper.GetClaims(context.Background(), f.digest)
require.NoError(t, err)

if f.noClaim {
require.Empty(t, claimCids)
return
}

require.Equal(t, 1, len(claimCids))

dh, err := multihash.Decode(claimCids[0].Hash())
require.NoError(t, err)

claim, err := delegation.Extract(dh.Digest)
require.NoError(t, err)

require.Equal(t, id.DID().String(), claim.Issuer().DID().String())
require.Equal(t, cassert.LocationAbility, claim.Capabilities()[0].Can())
require.NotNil(t, claim.Expiration())

nb, err := cassert.LocationCaveatsReader.Read(claim.Capabilities()[0].Nb())
require.NoError(t, err)
require.Equal(t, f.digest, nb.Content.Hash())
require.Equal(t, 1, len(nb.Location))
require.Equal(t, f.expectedURL.String(), nb.Location[0].String())
require.Equal(t, f.record.Offset, nb.Range.Offset)
require.Equal(t, f.record.Length, *nb.Range.Length)
})
}

t.Run("returns ErrKeyNotFound when block index errors with not found", func(t *testing.T) {
mockStore := newMockBlockIndexStore()
bitMapper, err := NewBlockIndexTableMapper(id, mockStore, bucketURL.String(), time.Hour)
require.NoError(t, err)

_, err = bitMapper.GetClaims(context.Background(), testutil.RandomMultihash())
require.ErrorIs(t, err, types.ErrKeyNotFound)
})
}

type mockBlockIndexStore struct {
data bytemap.ByteMap[multihash.Multihash, []BlockIndexRecord]
}

func (bs *mockBlockIndexStore) Query(ctx context.Context, digest multihash.Multihash) ([]BlockIndexRecord, error) {
records := bs.data.Get(digest)
if len(records) == 0 {
return nil, types.ErrKeyNotFound
}
return records, nil
}

func newMockBlockIndexStore() *mockBlockIndexStore {
return &mockBlockIndexStore{
data: bytemap.NewByteMap[multihash.Multihash, []BlockIndexRecord](1),
}
}
9 changes: 4 additions & 5 deletions pkg/aws/dynamoblockindextable.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ import (
"github.com/aws/aws-sdk-go-v2/service/dynamodb"
multihash "github.com/multiformats/go-multihash"
"github.com/storacha/indexing-service/pkg/internal/digestutil"
"github.com/storacha/indexing-service/pkg/service/legacy"
"github.com/storacha/indexing-service/pkg/types"
)

Expand All @@ -25,7 +24,7 @@ type blockIndexItem struct {
Length uint64 `dynamodbav:"length"`
}

func (d *DynamoProviderBlockIndexTable) Query(ctx context.Context, digest multihash.Multihash) ([]legacy.BlockIndexRecord, error) {
func (d *DynamoProviderBlockIndexTable) Query(ctx context.Context, digest multihash.Multihash) ([]BlockIndexRecord, error) {
digestAttr, err := attributevalue.Marshal(digestutil.Format(digest))
if err != nil {
return nil, err
Expand All @@ -37,7 +36,7 @@ func (d *DynamoProviderBlockIndexTable) Query(ctx context.Context, digest multih
return nil, err
}

records := []legacy.BlockIndexRecord{}
records := []BlockIndexRecord{}

queryPaginator := dynamodb.NewQueryPaginator(d.client, &dynamodb.QueryInput{
TableName: aws.String(d.tableName),
Expand All @@ -60,7 +59,7 @@ func (d *DynamoProviderBlockIndexTable) Query(ctx context.Context, digest multih
}

for _, item := range items {
records = append(records, legacy.BlockIndexRecord(item))
records = append(records, BlockIndexRecord(item))
}
}

Expand All @@ -71,7 +70,7 @@ func (d *DynamoProviderBlockIndexTable) Query(ctx context.Context, digest multih
return records, nil
}

var _ legacy.BlockIndexStore = (*DynamoProviderBlockIndexTable)(nil)
var _ BlockIndexStore = (*DynamoProviderBlockIndexTable)(nil)

func NewDynamoProviderBlockIndexTable(client dynamodb.QueryAPIClient, tableName string) *DynamoProviderBlockIndexTable {
return &DynamoProviderBlockIndexTable{client, tableName}
Expand Down
3 changes: 1 addition & 2 deletions pkg/aws/dynamoblockindextable_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ import (
"github.com/storacha/indexing-service/pkg/internal/digestutil"
"github.com/storacha/indexing-service/pkg/internal/link"
"github.com/storacha/indexing-service/pkg/internal/testutil"
"github.com/storacha/indexing-service/pkg/service/legacy"
istypes "github.com/storacha/indexing-service/pkg/types"
"github.com/stretchr/testify/require"
"github.com/testcontainers/testcontainers-go"
Expand Down Expand Up @@ -107,7 +106,7 @@ func TestDynamoProviderBlockIndexTable(t *testing.T) {
require.Equal(t, len(items), len(results))

for _, i := range items {
require.True(t, slices.ContainsFunc(results, func(r legacy.BlockIndexRecord) bool {
require.True(t, slices.ContainsFunc(results, func(r BlockIndexRecord) bool {
return r.CarPath == i.path && r.Offset == uint64(i.offset) && r.Length == uint64(i.length)
}))
}
Expand Down
Loading

0 comments on commit fefd23c

Please # to comment.