fix events API timeout handling for nil blocks #7184

Merged
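The change keys timeout handling in the events API by the epoch being applied rather than by the head tipset's height, so state-change timeouts still fire when the timeout epoch falls on null rounds. It also adds a deal-expiry integration test, converts TestStateChangedTimeout into a table-driven test covering the nil-block case, and threads a MinBlocksDuration parameter through the deal harness so the test can make short-lived deals.

The following standalone Go sketch (not lotus code; the map, names, and numbers are illustrative only) shows why an epoch-keyed lookup matters when the registered timeout epoch lands on null rounds:

package main

import "fmt"

func main() {
	// A trigger registered to fire at epoch 23 (timeout height 20 + confidence 3).
	timeouts := map[int][]string{23: {"state-change timeout"}}

	// Suppose epochs 20-23 are null rounds, so the next mined tipset has height 24.
	headHeight := 24

	// Old behaviour (sketch): look up only by the head tipset's height.
	// The lookup misses epoch 23 and the timeout never fires.
	fmt.Println("keyed by head height:", timeouts[headHeight]) // prints an empty slice

	// New behaviour (sketch): walk every epoch as it is applied, including
	// null rounds, so the trigger registered at epoch 23 still fires.
	for at := 20; at <= headHeight; at++ {
		if triggers, ok := timeouts[at]; ok {
			fmt.Println("keyed by applied epoch", at, "->", triggers)
		}
	}
}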
14 changes: 7 additions & 7 deletions chain/events/events_called.go
@@ -157,7 +157,7 @@ func (e *hcEvents) processHeadChangeEvent(rev, app []*types.TipSet) error {
// Apply any queued events and timeouts that were targeted at the
// current chain height
e.applyWithConfidence(ts, at)
- e.applyTimeouts(ts)
+ e.applyTimeouts(at)
}

// Update the latest known tipset
@@ -273,8 +273,8 @@ func (e *hcEvents) applyWithConfidence(ts *types.TipSet, height abi.ChainEpoch)
}

// Apply any timeouts that expire at this height
- func (e *hcEvents) applyTimeouts(ts *types.TipSet) {
- triggers, ok := e.timeouts[ts.Height()]
+ func (e *hcEvents) applyTimeouts(at abi.ChainEpoch) {
+ triggers, ok := e.timeouts[at]
if !ok {
return // nothing to do
}
@@ -288,14 +288,14 @@ func (e *hcEvents) applyTimeouts(ts *types.TipSet) {
continue
}

- timeoutTs, err := e.tsc.get(ts.Height() - abi.ChainEpoch(trigger.confidence))
+ timeoutTs, err := e.tsc.get(at - abi.ChainEpoch(trigger.confidence))
if err != nil {
log.Errorf("events: applyTimeouts didn't find tipset for event; wanted %d; current %d", ts.Height()-abi.ChainEpoch(trigger.confidence), ts.Height())
log.Errorf("events: applyTimeouts didn't find tipset for event; wanted %d; current %d", at-abi.ChainEpoch(trigger.confidence), at)
}

- more, err := trigger.handle(nil, nil, timeoutTs, ts.Height())
+ more, err := trigger.handle(nil, nil, timeoutTs, at)
if err != nil {
log.Errorf("chain trigger (call @H %d, called @ %d) failed: %s", timeoutTs.Height(), ts.Height(), err)
log.Errorf("chain trigger (call @H %d, called @ %d) failed: %s", timeoutTs.Height(), at, err)
continue // don't revert failed calls
}

153 changes: 80 additions & 73 deletions chain/events/events_test.go
@@ -1293,81 +1293,88 @@ func TestStateChangedRevert(t *testing.T) {
}

func TestStateChangedTimeout(t *testing.T) {
- fcs := &fakeCS{
- t: t,
- h: 1,
-
- msgs: map[cid.Cid]fakeMsg{},
- blkMsgs: map[cid.Cid]cid.Cid{},
- tsc: newTSCache(2*build.ForkLengthThreshold, nil),
- callNumber: map[string]int{},
- }
- require.NoError(t, fcs.tsc.add(fcs.makeTs(t, nil, 1, dummyCid)))
-
- events := NewEvents(context.Background(), fcs)
-
- called := false
-
- err := events.StateChanged(func(ts *types.TipSet) (d bool, m bool, e error) {
- return false, true, nil
- }, func(oldTs, newTs *types.TipSet, data StateChange, curH abi.ChainEpoch) (bool, error) {
- called = true
- require.Nil(t, data)
- require.Equal(t, abi.ChainEpoch(20), newTs.Height())
- require.Equal(t, abi.ChainEpoch(23), curH)
- return false, nil
- }, func(_ context.Context, ts *types.TipSet) error {
- t.Fatal("revert on timeout")
- return nil
- }, 3, 20, func(oldTs, newTs *types.TipSet) (bool, StateChange, error) {
- return false, nil, nil
- })
-
- require.NoError(t, err)
-
- fcs.advance(0, 21, nil)
- require.False(t, called)
-
- fcs.advance(0, 5, nil)
- require.True(t, called)
- called = false
-
- // with check func reporting done
-
- fcs = &fakeCS{
- t: t,
- h: 1,
+ timeoutHeight := abi.ChainEpoch(20)
+ confidence := 3

- msgs: map[cid.Cid]fakeMsg{},
- blkMsgs: map[cid.Cid]cid.Cid{},
- callNumber: map[string]int{},
- tsc: newTSCache(2*build.ForkLengthThreshold, nil),
+ testCases := []struct {
+ name string
+ checkFn CheckFunc
+ nilBlocks []int
+ expectTimeout bool
+ }{{
+ // Verify that the state changed timeout is called at the expected height
+ name: "state changed timeout",
+ checkFn: func(ts *types.TipSet) (d bool, m bool, e error) {
+ return false, true, nil
+ },
+ expectTimeout: true,
+ }, {
+ // Verify that the state changed timeout is called even if the timeout
+ // falls on nil block
+ name: "state changed timeout falls on nil block",
+ checkFn: func(ts *types.TipSet) (d bool, m bool, e error) {
+ return false, true, nil
+ },
+ nilBlocks: []int{20, 21, 22, 23},
+ expectTimeout: true,
+ }, {
+ // Verify that the state changed timeout is not called if the check
+ // function reports that it's complete
+ name: "no timeout callback if check func reports done",
+ checkFn: func(ts *types.TipSet) (d bool, m bool, e error) {
+ return true, true, nil
+ },
+ expectTimeout: false,
+ }}

+ for _, tc := range testCases {
+ tc := tc
+ t.Run(tc.name, func(t *testing.T) {
+ fcs := &fakeCS{
+ t: t,
+ h: 1,
+
+ msgs: map[cid.Cid]fakeMsg{},
+ blkMsgs: map[cid.Cid]cid.Cid{},
+ tsc: newTSCache(2*build.ForkLengthThreshold, nil),
+ callNumber: map[string]int{},
+ }
+ require.NoError(t, fcs.tsc.add(fcs.makeTs(t, nil, 1, dummyCid)))
+
+ events := NewEvents(context.Background(), fcs)
+
+ // Track whether the callback was called
+ called := false
+
+ // Set up state change tracking that will timeout at the given height
+ err := events.StateChanged(
+ tc.checkFn,
+ func(oldTs, newTs *types.TipSet, data StateChange, curH abi.ChainEpoch) (bool, error) {
+ // Expect the callback to be called at the timeout height with nil data
+ called = true
+ require.Nil(t, data)
+ require.Equal(t, timeoutHeight, newTs.Height())
+ require.Equal(t, timeoutHeight+abi.ChainEpoch(confidence), curH)
+ return false, nil
+ }, func(_ context.Context, ts *types.TipSet) error {
+ t.Fatal("revert on timeout")
+ return nil
+ }, confidence, timeoutHeight, func(oldTs, newTs *types.TipSet) (bool, StateChange, error) {
+ return false, nil, nil
+ })
+
+ require.NoError(t, err)
+
+ // Advance to timeout height
+ fcs.advance(0, int(timeoutHeight)+1, nil)
+ require.False(t, called)
+
+ // Advance past timeout height
+ fcs.advance(0, 5, nil, tc.nilBlocks...)
+ require.Equal(t, tc.expectTimeout, called)
+ called = false
+ })
+ }
- require.NoError(t, fcs.tsc.add(fcs.makeTs(t, nil, 1, dummyCid)))
-
- events = NewEvents(context.Background(), fcs)
-
- err = events.StateChanged(func(ts *types.TipSet) (d bool, m bool, e error) {
- return true, true, nil
- }, func(oldTs, newTs *types.TipSet, data StateChange, curH abi.ChainEpoch) (bool, error) {
- called = true
- require.Nil(t, data)
- require.Equal(t, abi.ChainEpoch(20), newTs.Height())
- require.Equal(t, abi.ChainEpoch(23), curH)
- return false, nil
- }, func(_ context.Context, ts *types.TipSet) error {
- t.Fatal("revert on timeout")
- return nil
- }, 3, 20, func(oldTs, newTs *types.TipSet) (bool, StateChange, error) {
- return false, nil, nil
- })
- require.NoError(t, err)
-
- fcs.advance(0, 21, nil)
- require.False(t, called)
-
- fcs.advance(0, 5, nil)
- require.False(t, called)
}

func TestCalledMultiplePerEpoch(t *testing.T) {
117 changes: 117 additions & 0 deletions itests/deals_expiry_test.go
@@ -0,0 +1,117 @@
package itests

import (
"context"
"testing"
"time"

"github.com/filecoin-project/go-fil-markets/storagemarket"
"github.com/filecoin-project/lotus/itests/kit"
market2 "github.com/filecoin-project/specs-actors/v2/actors/builtin/market"
market3 "github.com/filecoin-project/specs-actors/v3/actors/builtin/market"
market4 "github.com/filecoin-project/specs-actors/v4/actors/builtin/market"
market5 "github.com/filecoin-project/specs-actors/v5/actors/builtin/market"
"github.com/stretchr/testify/require"
)

// Test that the deal state eventually moves to "Expired" on both client and miner
func TestDealExpiry(t *testing.T) {
kit.QuietMiningLogs()

// reset minimum deal duration to 0, so we can make very short-lived deals.
// NOTE: this will need updating with every new specs-actors version.
market2.DealMinDuration = 0
market3.DealMinDuration = 0
market4.DealMinDuration = 0
market5.DealMinDuration = 0

ctx := context.Background()

var (
client kit.TestFullNode
miner1 kit.TestMiner
)

ens := kit.NewEnsemble(t, kit.MockProofs())
ens.FullNode(&client)
ens.Miner(&miner1, &client, kit.WithAllSubsystems())
bm := ens.Start().InterconnectAll().BeginMining(50 * time.Millisecond)

dh := kit.NewDealHarness(t, &client, &miner1, &miner1)

client.WaitTillChain(ctx, kit.HeightAtLeast(5))

// Make a deal with a short duration
dealProposalCid, _, _ := dh.MakeOnlineDeal(ctx, kit.MakeFullDealParams{
Rseed: 0,
FastRet: true,
// Needs to be far enough in the future to ensure the deal has been sealed
StartEpoch: 3000,
// Short deal duration
MinBlocksDuration: 50,
})

// Inject null blocks each time the chain advances by a block so as to
// get to deal expiration faster
go func() {
ch, _ := client.ChainNotify(ctx)
for range ch {
bm[0].InjectNulls(10)
}
}()

clientExpired := false
minerExpired := false
for {
ts, err := client.ChainHead(ctx)
require.NoError(t, err)

t.Logf("Chain height: %d", ts.Height())

// Get the miner deal
minerDeals, err := miner1.MarketListIncompleteDeals(ctx)
require.NoError(t, err)
require.Greater(t, len(minerDeals), 0)

var minerDeal *storagemarket.MinerDeal
for _, d := range minerDeals {
if d.ProposalCid == *dealProposalCid {
minerDeal = &d
}
}
require.NotNil(t, minerDeal)

t.Logf("Miner deal:")
t.Logf(" %s -> %s", minerDeal.Proposal.Client, minerDeal.Proposal.Provider)
t.Logf(" StartEpoch: %d", minerDeal.Proposal.StartEpoch)
t.Logf(" EndEpoch: %d", minerDeal.Proposal.EndEpoch)
t.Logf(" State: %s", storagemarket.DealStates[minerDeal.State])
//spew.Dump(d)

// Get the client deal
clientDeals, err := client.ClientListDeals(ctx)
require.NoError(t, err)

t.Logf("Client deal state: %s", storagemarket.DealStates[clientDeals[0].State])

// Expect the deal to eventually expire on the client and the miner
if clientDeals[0].State == storagemarket.StorageDealExpired {
t.Logf("Client deal expired")
clientExpired = true
}
if minerDeal.State == storagemarket.StorageDealExpired {
t.Logf("Miner deal expired")
minerExpired = true
}
if clientExpired && minerExpired {
t.Logf("PASS: Client and miner deal expired")
return
}

if ts.Height() > 5000 {
t.Fatalf("Reached height %d without client and miner deals expiring", ts.Height())
}

time.Sleep(2 * time.Second)
}
}
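A brief usage note, assuming a standard lotus checkout where the itests kit builds: the new expiry test can be run on its own with the regular Go test runner.

go test ./itests -run TestDealExpiry -v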
8 changes: 5 additions & 3 deletions itests/kit/deals.go
@@ -33,9 +33,10 @@ type DealHarness struct {
}

type MakeFullDealParams struct {
- Rseed int
- FastRet bool
- StartEpoch abi.ChainEpoch
+ Rseed int
+ FastRet bool
+ StartEpoch abi.ChainEpoch
+ MinBlocksDuration uint64

// SuspendUntilCryptoeconStable suspends deal-making, until cryptoecon
// parameters are stabilised. This affects projected collateral, and tests
@@ -92,6 +93,7 @@ func (dh *DealHarness) MakeOnlineDeal(ctx context.Context, params MakeFullDealPa
dp.Data.Root = res.Root
dp.DealStartEpoch = params.StartEpoch
dp.FastRetrieval = params.FastRet
+ dp.MinBlocksDuration = params.MinBlocksDuration
deal = dh.StartDeal(ctx, dp)

// TODO: this sleep is only necessary because deals don't immediately get logged in the dealstore, we should fix this
2 changes: 1 addition & 1 deletion node/modules/storageminer.go
@@ -572,7 +572,7 @@ func BasicDealFilter(user dtypes.StorageDealFilter) func(onlineOk dtypes.Conside
earliest := abi.ChainEpoch(sealEpochs) + ht
if deal.Proposal.StartEpoch < earliest {
log.Warnw("proposed deal would start before sealing can be completed; rejecting storage deal proposal from client", "piece_cid", deal.Proposal.PieceCID, "client", deal.Client.String(), "seal_duration", sealDuration, "earliest", earliest, "curepoch", ht)
return false, fmt.Sprintf("cannot seal a sector before %s", deal.Proposal.StartEpoch), nil
return false, fmt.Sprintf("proposed deal start epoch %s too early, cannot seal a sector before %s", deal.Proposal.StartEpoch, earliest), nil
}

sd, err := startDelay()