Skip to content

Commit

Permalink
feat(legacy): add carpark fallback (#82)
Browse files Browse the repository at this point in the history
# Goals

Return location claims that were never published properly on the blob
protocol from ~May - November 2024
fix #63

# Implementation

Essentially this ports storacha/blob-fetcher#14

- When no legacy claims are found in the legacy claims bucket, check if
the cid in question has an uploaded blob in carpark (via head request)
- if present, generate a location claim on demand, in much the same way
we do for pre-blob-protocol data (reading from the blob-index
table)
- however, here we fold into the normal indexing query process, and the
way we accomplish this is via identity CIDs. Which is... a bit funky,
but seems the simplest mechanism for making everything work together :)

# For Discussion

TODO:
- test for bucketFallbackMapper
- maybe integration test the whole thing
  • Loading branch information
hannahhoward authored Jan 10, 2025
1 parent 90d6b43 commit 07b80ce
Show file tree
Hide file tree
Showing 7 changed files with 424 additions and 5 deletions.
85 changes: 85 additions & 0 deletions pkg/aws/bucketfallbackmapper.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
package aws

import (
"context"
"errors"
"fmt"
"io"
"net/http"
"net/url"

"github.com/ipfs/go-cid"
"github.com/multiformats/go-multicodec"
multihash "github.com/multiformats/go-multihash"
"github.com/storacha/go-capabilities/pkg/assert"
"github.com/storacha/go-ucanto/core/delegation"
"github.com/storacha/go-ucanto/principal"
"github.com/storacha/indexing-service/pkg/internal/digestutil"
"github.com/storacha/indexing-service/pkg/types"
)

// ContentToClaimsMapper looks up the CIDs of claims associated with a content
// multihash.
//
// BucketFallbackMapper treats an error matching types.ErrKeyNotFound from
// GetClaims as "no claims known for this hash"; any other error is passed
// through to its caller unchanged.
type ContentToClaimsMapper interface {
// GetClaims returns the claim CIDs recorded for contentHash.
GetClaims(ctx context.Context, contentHash multihash.Multihash) ([]cid.Cid, error)
}

// BucketFallbackMapper is a ContentToClaimsMapper that first consults a base
// mapper and, when the base mapper reports types.ErrKeyNotFound, checks a
// bucket over HTTP for a blob with the requested hash. If the blob is present
// a location claim is generated on demand and returned as an identity CID.
type BucketFallbackMapper struct {
// id is the principal used to create generated location claims; GetClaims
// passes it (and its DID string) to assert.Location.Delegate.
id principal.Signer
// bucketURL is the base URL of the bucket. The blob key is joined onto it
// to form both the URL that is HEAD-requested and the claim's location.
bucketURL *url.URL
// baseMapper is consulted before falling back to the bucket.
baseMapper ContentToClaimsMapper
// getOpts is called each time a claim is generated to obtain the
// delegation options for it.
getOpts func() []delegation.Option
}

// NewBucketFallbackMapper builds a BucketFallbackMapper that wraps baseMapper.
//
// id is the principal used when generating fallback location claims, bucketURL
// is the base URL blobs are looked up under, and getOpts is invoked on every
// generated claim to supply its delegation options.
func NewBucketFallbackMapper(id principal.Signer, bucketURL *url.URL, baseMapper ContentToClaimsMapper, getOpts func() []delegation.Option) BucketFallbackMapper {
	var mapper BucketFallbackMapper
	mapper.id = id
	mapper.bucketURL = bucketURL
	mapper.baseMapper = baseMapper
	mapper.getOpts = getOpts
	return mapper
}

// GetClaims returns the claim CIDs for contentHash.
//
// The base mapper is consulted first; its result is returned as-is unless it
// fails with types.ErrKeyNotFound. In that case a HEAD request is issued for
// the blob's key under the bucket URL. If the blob exists, a location claim
// covering the whole blob is generated and returned as a single identity CID
// (CAR codec) whose digest is the archived delegation.
//
// The bucket lookup is best effort: a failed request, a non-2xx status, or a
// response without a known content length all yield types.ErrKeyNotFound.
func (cfm BucketFallbackMapper) GetClaims(ctx context.Context, contentHash multihash.Multihash) ([]cid.Cid, error) {
	claims, err := cfm.baseMapper.GetClaims(ctx, contentHash)
	if err == nil || !errors.Is(err, types.ErrKeyNotFound) {
		return claims, err
	}

	blobURL := cfm.bucketURL.JoinPath(toBlobKey(contentHash))

	// Bind the request to ctx so cancellation and deadlines are honoured.
	req, err := http.NewRequestWithContext(ctx, http.MethodHead, blobURL.String(), nil)
	if err != nil {
		return nil, fmt.Errorf("creating bucket head request: %w", err)
	}
	resp, err := http.DefaultClient.Do(req)
	if err != nil {
		return nil, types.ErrKeyNotFound
	}
	// Always release the response so the underlying connection can be reused.
	defer resp.Body.Close()
	if resp.StatusCode < 200 || resp.StatusCode >= 300 {
		return nil, types.ErrKeyNotFound
	}
	// ContentLength is -1 when the size is unknown; converting that to uint64
	// would produce a claim with a nonsensical range length.
	if resp.ContentLength < 0 {
		return nil, types.ErrKeyNotFound
	}
	size := uint64(resp.ContentLength)

	claim, err := assert.Location.Delegate(
		cfm.id,
		cfm.id,
		cfm.id.DID().String(),
		assert.LocationCaveats{
			Content:  assert.FromHash(contentHash),
			Location: []url.URL{*blobURL},
			Range:    &assert.Range{Offset: 0, Length: &size},
		},
		cfm.getOpts()...,
	)
	if err != nil {
		return nil, fmt.Errorf("generating delegation: %w", err)
	}
	claimData, err := io.ReadAll(claim.Archive())
	if err != nil {
		return nil, fmt.Errorf("serializing delegation: %w", err)
	}

	// Embed the serialized claim in an identity CID so the claim travels with
	// the CID itself.
	c, err := cid.Prefix{
		Version:  1,
		Codec:    uint64(multicodec.Car),
		MhType:   multihash.IDENTITY,
		MhLength: len(claimData),
	}.Sum(claimData)
	if err != nil {
		return nil, fmt.Errorf("generating identity cid: %w", err)
	}
	return []cid.Cid{c}, nil
}

// toBlobKey derives the bucket key of a blob from its multihash. The key has
// the form "<hash>/<hash>.blob", where <hash> is the digestutil.Format
// rendering of contentHash.
func toBlobKey(contentHash multihash.Multihash) string {
	formatted := digestutil.Format(contentHash)
	return formatted + "/" + formatted + ".blob"
}
195 changes: 195 additions & 0 deletions pkg/aws/bucketfallbackmapper_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,195 @@
package aws_test

import (
	"context"
	"errors"
	"fmt"
	"io"
	"net"
	"net/http"
	"net/url"
	"strconv"
	"strings"
	"testing"

	"github.com/ipfs/go-cid"
	cidlink "github.com/ipld/go-ipld-prime/linking/cid"
	"github.com/multiformats/go-multicodec"
	"github.com/multiformats/go-multihash"
	"github.com/storacha/go-capabilities/pkg/assert"
	"github.com/storacha/go-ucanto/core/delegation"
	"github.com/storacha/indexing-service/pkg/aws"
	"github.com/storacha/indexing-service/pkg/bytemap"
	"github.com/storacha/indexing-service/pkg/internal/digestutil"
	"github.com/storacha/indexing-service/pkg/internal/testutil"
	"github.com/storacha/indexing-service/pkg/types"
	"github.com/stretchr/testify/require"
)

// TestBucketFallbackMapper exercises BucketFallbackMapper.GetClaims against a
// mock base mapper and a local HTTP server standing in for the bucket. It
// covers: results from the base mapper, a non-"not found" base error, a
// non-success bucket response, and a successful HEAD that yields a generated
// location claim.
func TestBucketFallbackMapper(t *testing.T) {
	ctx := context.Background()
	baseMap := bytemap.NewByteMap[multihash.Multihash, cidsAndError](-1)
	responses := bytemap.NewByteMap[multihash.Multihash, resp](-1)
	signer := testutil.RandomSigner()
	serverURL := testutil.Must(url.Parse("http://localhost:8888"))(t)

	hasBaseResultsHash := testutil.RandomMultihash()
	hasBaseResultCids := []cid.Cid{testutil.RandomCID().(cidlink.Link).Cid}
	baseMap.Set(hasBaseResultsHash, cidsAndError{hasBaseResultCids, nil})

	hasBaseErrorHash := testutil.RandomMultihash()
	hasBaseError := errors.New("something went real wrong")
	baseMap.Set(hasBaseErrorHash, cidsAndError{nil, hasBaseError})

	hasNonSuccessHash := testutil.RandomMultihash()
	responses.Set(hasNonSuccessHash, resp{0, http.StatusInternalServerError})

	hasSuccessHash := testutil.RandomMultihash()
	hasSuccessContentLength := uint64(500)
	responses.Set(hasSuccessHash, resp{int64(hasSuccessContentLength), http.StatusOK})
	hasSuccessClaim := testutil.Must(assert.Location.Delegate(
		signer,
		signer,
		signer.DID().String(),
		assert.LocationCaveats{
			Content: assert.FromHash(hasSuccessHash),
			Location: []url.URL{
				*serverURL.JoinPath(digestutil.Format(hasSuccessHash), fmt.Sprintf("%s.blob", digestutil.Format(hasSuccessHash))),
			},
			Range: &assert.Range{
				Offset: 0,
				Length: &hasSuccessContentLength,
			},
		},
		delegation.WithNoExpiration(),
	))(t)

	data := testutil.Must(io.ReadAll(hasSuccessClaim.Archive()))(t)

	// The identity digest is the archived claim itself, so its length is
	// len(data), not the blob's content length.
	hasSuccessCids := []cid.Cid{testutil.Must(cid.Prefix{
		Version:  1,
		Codec:    uint64(multicodec.Car),
		MhType:   multihash.IDENTITY,
		MhLength: len(data),
	}.Sum(data))(t)}

	testCases := []struct {
		name          string
		hash          multihash.Multihash
		expectedCids  []cid.Cid
		expectedErr   error
		expectedClaim delegation.Delegation
	}{
		{
			name:         "base mapper has results",
			hash:         hasBaseResultsHash,
			expectedCids: hasBaseResultCids,
		},
		{
			name:        "base mapper has error other than not found",
			hash:        hasBaseErrorHash,
			expectedErr: hasBaseError,
		},
		{
			name:        "non 200 status code from fallback bucket",
			hash:        hasNonSuccessHash,
			expectedErr: types.ErrKeyNotFound,
		},
		{
			name:          "200 status code on head generates claim",
			hash:          hasSuccessHash,
			expectedCids:  hasSuccessCids,
			expectedClaim: hasSuccessClaim,
		},
	}
	for _, testCase := range testCases {
		t.Run(testCase.name, func(t *testing.T) {
			// Bind the listener synchronously so the server is guaranteed to be
			// accepting connections before GetClaims issues its HEAD request.
			listener, err := net.Listen("tcp", serverURL.Host)
			require.NoError(t, err)

			mux := http.NewServeMux()
			mux.Handle("/{multihash1}/{multihash2}", mockServer{responses})
			server := &http.Server{Handler: mux}
			doneErr := make(chan error, 1)
			go func() {
				doneErr <- server.Serve(listener)
			}()
			// Shut down in a cleanup so the port is released even when an
			// assertion below aborts the subtest.
			t.Cleanup(func() {
				if err := server.Shutdown(ctx); err != nil {
					t.Errorf("shutting down server: %v", err)
				}
				if err := <-doneErr; !errors.Is(err, http.ErrServerClosed) {
					t.Errorf("unexpected serve error: %v", err)
				}
			})

			bucketFallbackMapper := aws.NewBucketFallbackMapper(signer, serverURL, mockMapper{baseMap}, func() []delegation.Option {
				return []delegation.Option{delegation.WithNoExpiration()}
			})
			cids, err := bucketFallbackMapper.GetClaims(ctx, testCase.hash)
			if testCase.expectedErr != nil {
				require.ErrorIs(t, err, testCase.expectedErr)
				require.Len(t, cids, 0)
				return
			}
			require.NoError(t, err)
			require.Equal(t, testCase.expectedCids, cids)
			if testCase.expectedClaim != nil {
				require.Len(t, cids, 1)
				require.Equal(t, cids[0].Prefix().MhType, uint64(multihash.IDENTITY))
				// The claim is carried inside the identity CID's digest.
				decoded := testutil.Must(multihash.Decode(cids[0].Hash()))(t)
				claim := testutil.Must(delegation.Extract(decoded.Digest))(t)
				testutil.RequireEqualDelegation(t, testCase.expectedClaim, claim)
			}
		})
	}
}

// resp is a canned response served by mockServer for a given multihash.
type resp struct {
// contentLength is written as the Content-Length response header.
contentLength int64
// status is written as the HTTP status code.
status int
}

// mockServer is an http.Handler that answers HEAD requests for blob keys
// using canned responses.
type mockServer struct {
// hashes maps a blob's multihash to the response to serve for it.
hashes bytemap.ByteMap[multihash.Multihash, resp]
}

// ServeHTTP implements http.Handler.
//
// Only HEAD requests are accepted. The request path must have the form
// "/<hash>/<hash>.blob" (both path segments naming the same hash); the hash
// is parsed with digestutil.Parse and looked up in m.hashes. Known hashes get
// their canned Content-Length header and status code; unknown hashes get 404.
func (m mockServer) ServeHTTP(w http.ResponseWriter, r *http.Request) {
	if r.Method != http.MethodHead {
		http.Error(w, "method not allowed", http.StatusMethodNotAllowed)
		return
	}
	mhString := r.PathValue("multihash1")
	mhString2 := r.PathValue("multihash2")
	mhTrimmed, hadSuffix := strings.CutSuffix(mhString2, ".blob")
	if mhString != mhTrimmed || !hadSuffix || mhString == "" {
		http.Error(w, "invalid multihash", http.StatusBadRequest)
		return
	}
	mh, err := digestutil.Parse(mhString)
	if err != nil {
		http.Error(w, fmt.Sprintf("parsing multihash: %s", err.Error()), http.StatusBadRequest)
		// Stop here: the response has been written and mh is not usable.
		return
	}
	if !m.hashes.Has(mh) {
		http.Error(w, "not found", http.StatusNotFound)
		// Stop here so a second status is not written after the 404.
		return
	}
	entry := m.hashes.Get(mh)
	w.Header().Add("Content-Length", strconv.FormatInt(entry.contentLength, 10))
	w.WriteHeader(entry.status)
}

// cidsAndError is the canned result mockMapper returns for a multihash.
type cidsAndError struct {
// cids is the slice of claim CIDs to return.
cids []cid.Cid
// err is the error to return alongside cids.
err error
}

// mockMapper is a test double for aws.ContentToClaimsMapper backed by an
// in-memory map of canned results.
type mockMapper struct {
// contentMap maps a content multihash to the result GetClaims returns.
contentMap bytemap.ByteMap[multihash.Multihash, cidsAndError]
}

// GetClaims implements aws.ContentToClaimsMapper.
//
// It returns the canned CIDs and error registered for contentHash, or
// types.ErrKeyNotFound when nothing is registered for it.
func (m mockMapper) GetClaims(ctx context.Context, contentHash multihash.Multihash) ([]cid.Cid, error) {
	if m.contentMap.Has(contentHash) {
		entry := m.contentMap.Get(contentHash)
		return entry.cids, entry.err
	}
	return nil, types.ErrKeyNotFound
}
20 changes: 17 additions & 3 deletions pkg/aws/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,10 @@ import (
"crypto/tls"
"errors"
"fmt"
"net/url"
"os"
"strings"
"time"

"github.com/aws/aws-sdk-go-v2/aws"
"github.com/aws/aws-sdk-go-v2/config"
Expand All @@ -15,6 +17,7 @@ import (
"github.com/libp2p/go-libp2p/core/crypto"
"github.com/redis/go-redis/v9"
"github.com/storacha/go-metadata"
"github.com/storacha/go-ucanto/core/delegation"
"github.com/storacha/go-ucanto/did"
"github.com/storacha/go-ucanto/principal"
ed25519 "github.com/storacha/go-ucanto/principal/ed25519/signer"
Expand Down Expand Up @@ -158,9 +161,20 @@ func Construct(cfg Config) (types.Service, error) {
chunkLinksTable := NewDynamoProviderContextTable(cfg.Config, cfg.ChunkLinksTableName)
metadataTable := NewDynamoProviderContextTable(cfg.Config, cfg.MetadataTableName)
publisherStore := store.NewPublisherStore(ipniStore, chunkLinksTable, metadataTable, store.WithMetadataContext(metadata.MetadataContext))
legacyClaimsMapper := NewDynamoContentToClaimsMapper(dynamodb.NewFromConfig(cfg.Config), cfg.LegacyClaimsTableName)
legacyDataBucketURL, err := url.Parse(cfg.LegacyDataBucketURL)
if err != nil {
return nil, fmt.Errorf("parsing carpark url: %s", err)
}
legacyClaimsMapper := NewBucketFallbackMapper(
cfg.Signer,
legacyDataBucketURL,
NewDynamoContentToClaimsMapper(dynamodb.NewFromConfig(cfg.Config), cfg.LegacyClaimsTableName),
func() []delegation.Option {
return []delegation.Option{delegation.WithExpiration(int(time.Now().Add(time.Hour).Unix()))}
},
)
legacyClaimsBucket := contentclaims.NewStoreFromBucket(NewS3Store(cfg.Config, cfg.LegacyClaimsBucket, ""))
legacyClaimsUrl := fmt.Sprintf("https://%s.s3.%s.amazonaws.com/{claim}/{claim}.car", cfg.LegacyClaimsBucket, cfg.Config.Region)
legacyClaimsURL := fmt.Sprintf("https://%s.s3.%s.amazonaws.com/{claim}/{claim}.car", cfg.LegacyClaimsBucket, cfg.Config.Region)

service, err := construct.Construct(
cfg.ServiceConfig,
Expand All @@ -169,7 +183,7 @@ func Construct(cfg Config) (types.Service, error) {
construct.WithPublisherStore(publisherStore),
construct.WithStartIPNIServer(false),
construct.WithClaimsStore(claimBucketStore),
construct.WithLegacyClaims(legacyClaimsMapper, legacyClaimsBucket, legacyClaimsUrl),
construct.WithLegacyClaims(legacyClaimsMapper, legacyClaimsBucket, legacyClaimsURL),
)
if err != nil {
return nil, err
Expand Down
2 changes: 1 addition & 1 deletion pkg/construct/construct.go
Original file line number Diff line number Diff line change
Expand Up @@ -343,7 +343,7 @@ func Construct(sc ServiceConfig, opts ...Option) (Service, error) {
if !strings.Contains(cfg.legacyClaimsUrl, service.ClaimUrlPlaceholder) {
return nil, fmt.Errorf("legacy claims url %s must contain the claim placeholder %s", cfg.legacyClaimsUrl, service.ClaimUrlPlaceholder)
}
legacyFinder := contentclaims.WithCache(contentclaims.WithStore(contentclaims.NewNotFoundFinder(), cfg.legacyClaimsBucket), claimsCache)
legacyFinder := contentclaims.WithIdentityCids(contentclaims.WithCache(contentclaims.WithStore(contentclaims.NewNotFoundFinder(), cfg.legacyClaimsBucket), claimsCache))
legacyClaims, err = providerindex.NewLegacyClaimsStore(cfg.legacyClaimsMapper, legacyFinder, cfg.legacyClaimsUrl)
if err != nil {
return nil, fmt.Errorf("creating legacy claims store: %w", err)
Expand Down
39 changes: 39 additions & 0 deletions pkg/service/contentclaims/identitycidfinder.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
package contentclaims

import (
"context"
"net/url"

"github.com/ipld/go-ipld-prime"
cidlink "github.com/ipld/go-ipld-prime/linking/cid"
"github.com/multiformats/go-multihash"
"github.com/storacha/go-ucanto/core/delegation"
)

// identityCidFinder is a Finder that resolves identity CIDs locally, by
// decoding the claim embedded in the CID, and defers every other lookup to
// the wrapped finder.
type identityCidFinder struct {
	// finder handles lookups for links that are not identity CIDs.
	finder Finder
}

// Compile-time check that *identityCidFinder satisfies Finder.
var _ Finder = (*identityCidFinder)(nil)

// WithIdentityCids wraps finder so that claims addressed by an identity CID
// are decoded straight from the CID's digest; all other lookups are passed
// through to finder.
func WithIdentityCids(finder Finder) Finder {
	wrapped := identityCidFinder{finder: finder}
	return &wrapped
}

// Find returns the claim identified by id.
//
// When id is a CID link whose multihash type is IDENTITY, the claim is
// extracted directly from the digest embedded in the CID and the wrapped
// finder is not consulted. Any other link is resolved by the wrapped finder
// using fetchURL.
func (idf *identityCidFinder) Find(ctx context.Context, id ipld.Link, fetchURL url.URL) (delegation.Delegation, error) {
	cidLink, isCidLink := id.(cidlink.Link)
	if !isCidLink || cidLink.Cid.Prefix().MhType != multihash.IDENTITY {
		// Not an identity CID: defer to the underlying claim finder.
		return idf.finder.Find(ctx, id, fetchURL)
	}

	decoded, err := multihash.Decode(cidLink.Cid.Hash())
	if err != nil {
		return nil, err
	}
	return delegation.Extract(decoded.Digest)
}
Loading

0 comments on commit 07b80ce

Please # to comment.