Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Handle idempotent retry after error during initializing phase #1406

Merged
merged 5 commits into from
Sep 28, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -73,12 +73,12 @@ $(eval $(call makemock, internal/assets, Manager, assetm
$(eval $(call makemock, internal/contracts, Manager, contractmocks))
$(eval $(call makemock, internal/spievents, Manager, spieventsmocks))
$(eval $(call makemock, internal/orchestrator, Orchestrator, orchestratormocks))
$(eval $(call makemock, internal/apiserver, FFISwaggerGen, apiservermocks))
$(eval $(call makemock, internal/apiserver, Server, apiservermocks))
$(eval $(call makemock, internal/cache, Manager, cachemocks))
$(eval $(call makemock, internal/metrics, Manager, metricsmocks))
$(eval $(call makemock, internal/operations, Manager, operationmocks))
$(eval $(call makemock, internal/multiparty, Manager, multipartymocks))
$(eval $(call makemock, internal/apiserver, FFISwaggerGen, apiservermocks))
$(eval $(call makemock, internal/apiserver, Server, apiservermocks))

firefly-nocgo: ${GOFILES}
CGO_ENABLED=0 $(VGO) build -o ${BINARY_NAME}-nocgo -ldflags "-X main.buildDate=$(DATE) -X main.buildVersion=$(BUILD_VERSION) -X 'github.com/hyperledger/firefly/cmd.BuildVersionOverride=$(BUILD_VERSION)' -X 'github.com/hyperledger/firefly/cmd.BuildDate=$(DATE)' -X 'github.com/hyperledger/firefly/cmd.BuildCommit=$(GIT_REF)'" -tags=prod -tags=prod -v
Expand Down
2 changes: 1 addition & 1 deletion internal/assets/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ type Manager interface {

// From operations.OperationHandler
PrepareOperation(ctx context.Context, op *core.Operation) (*core.PreparedOperation, error)
RunOperation(ctx context.Context, op *core.PreparedOperation) (outputs fftypes.JSONObject, complete bool, err error)
RunOperation(ctx context.Context, op *core.PreparedOperation) (outputs fftypes.JSONObject, phase core.OpPhase, err error)
}

type assetManager struct {
Expand Down
30 changes: 16 additions & 14 deletions internal/assets/operations.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"github.com/hyperledger/firefly-common/pkg/i18n"
"github.com/hyperledger/firefly-common/pkg/log"
"github.com/hyperledger/firefly/internal/coremsgs"
"github.com/hyperledger/firefly/internal/operations"
"github.com/hyperledger/firefly/internal/txcommon"
"github.com/hyperledger/firefly/pkg/core"
)
Expand Down Expand Up @@ -99,49 +100,50 @@ func (am *assetManager) PrepareOperation(ctx context.Context, op *core.Operation
}
}

func (am *assetManager) RunOperation(ctx context.Context, op *core.PreparedOperation) (outputs fftypes.JSONObject, complete bool, err error) {
func (am *assetManager) RunOperation(ctx context.Context, op *core.PreparedOperation) (outputs fftypes.JSONObject, phase core.OpPhase, err error) {
switch data := op.Data.(type) {
case createPoolData:
plugin, err := am.selectTokenPlugin(ctx, data.Pool.Connector)
if err != nil {
return nil, false, err
return nil, core.OpPhaseInitializing, err
}
complete, err = plugin.CreateTokenPool(ctx, op.NamespacedIDString(), data.Pool)
return nil, complete, err
phase, err = plugin.CreateTokenPool(ctx, op.NamespacedIDString(), data.Pool)
return nil, phase, err

case activatePoolData:
plugin, err := am.selectTokenPlugin(ctx, data.Pool.Connector)
if err != nil {
return nil, false, err
return nil, core.OpPhaseInitializing, err
}
complete, err = plugin.ActivateTokenPool(ctx, data.Pool)
return nil, complete, err
phase, err = plugin.ActivateTokenPool(ctx, data.Pool)
return nil, phase, err

case transferData:
plugin, err := am.selectTokenPlugin(ctx, data.Pool.Connector)
if err != nil {
return nil, false, err
return nil, core.OpPhaseInitializing, err
}
switch data.Transfer.Type {
case core.TokenTransferTypeMint:
return nil, false, plugin.MintTokens(ctx, op.NamespacedIDString(), data.Pool.Locator, data.Transfer, data.Pool.Methods)
err = plugin.MintTokens(ctx, op.NamespacedIDString(), data.Pool.Locator, data.Transfer, data.Pool.Methods)
case core.TokenTransferTypeTransfer:
return nil, false, plugin.TransferTokens(ctx, op.NamespacedIDString(), data.Pool.Locator, data.Transfer, data.Pool.Methods)
err = plugin.TransferTokens(ctx, op.NamespacedIDString(), data.Pool.Locator, data.Transfer, data.Pool.Methods)
case core.TokenTransferTypeBurn:
return nil, false, plugin.BurnTokens(ctx, op.NamespacedIDString(), data.Pool.Locator, data.Transfer, data.Pool.Methods)
err = plugin.BurnTokens(ctx, op.NamespacedIDString(), data.Pool.Locator, data.Transfer, data.Pool.Methods)
default:
panic(fmt.Sprintf("unknown transfer type: %v", data.Transfer.Type))
}
return nil, operations.ErrTernary(err, core.OpPhaseInitializing, core.OpPhasePending), err

case approvalData:
plugin, err := am.selectTokenPlugin(ctx, data.Pool.Connector)
if err != nil {
return nil, false, err
return nil, core.OpPhaseInitializing, err
}
return nil, false, plugin.TokensApproval(ctx, op.NamespacedIDString(), data.Pool.Locator, data.Approval, data.Pool.Methods)
return nil, core.OpPhaseInitializing, plugin.TokensApproval(ctx, op.NamespacedIDString(), data.Pool.Locator, data.Approval, data.Pool.Methods)

default:
return nil, false, i18n.NewError(ctx, coremsgs.MsgOperationDataIncorrect, op.Data)
return nil, core.OpPhaseInitializing, i18n.NewError(ctx, coremsgs.MsgOperationDataIncorrect, op.Data)
}
}

Expand Down
62 changes: 31 additions & 31 deletions internal/assets/operations_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,15 +46,15 @@ func TestPrepareAndRunCreatePool(t *testing.T) {
assert.NoError(t, err)

mti := am.tokens["magic-tokens"].(*tokenmocks.Plugin)
mti.On("CreateTokenPool", context.Background(), "ns1:"+op.ID.String(), pool).Return(false, nil)
mti.On("CreateTokenPool", context.Background(), "ns1:"+op.ID.String(), pool).Return(core.OpPhaseComplete, nil)

po, err := am.PrepareOperation(context.Background(), op)
assert.NoError(t, err)
assert.Equal(t, pool, po.Data.(createPoolData).Pool)

_, complete, err := am.RunOperation(context.Background(), po)
_, phase, err := am.RunOperation(context.Background(), po)

assert.False(t, complete)
assert.Equal(t, core.OpPhaseComplete, phase)
assert.NoError(t, err)

mti.AssertExpectations(t)
Expand All @@ -78,16 +78,16 @@ func TestPrepareAndRunActivatePool(t *testing.T) {

mti := am.tokens["magic-tokens"].(*tokenmocks.Plugin)
mdi := am.database.(*databasemocks.Plugin)
mti.On("ActivateTokenPool", context.Background(), pool).Return(true, nil)
mti.On("ActivateTokenPool", context.Background(), pool).Return(core.OpPhaseComplete, nil)
mdi.On("GetTokenPoolByID", context.Background(), "ns1", pool.ID).Return(pool, nil)

po, err := am.PrepareOperation(context.Background(), op)
assert.NoError(t, err)
assert.Equal(t, pool, po.Data.(activatePoolData).Pool)

_, complete, err := am.RunOperation(context.Background(), po)
_, phase, err := am.RunOperation(context.Background(), po)

assert.True(t, complete)
assert.Equal(t, core.OpPhaseComplete, phase)
assert.NoError(t, err)

mti.AssertExpectations(t)
Expand Down Expand Up @@ -124,9 +124,9 @@ func TestPrepareAndRunTransfer(t *testing.T) {
assert.Equal(t, pool, po.Data.(transferData).Pool)
assert.Equal(t, transfer, po.Data.(transferData).Transfer)

_, complete, err := am.RunOperation(context.Background(), po)
_, phase, err := am.RunOperation(context.Background(), po)

assert.False(t, complete)
assert.Equal(t, core.OpPhasePending, phase)
assert.NoError(t, err)

mti.AssertExpectations(t)
Expand Down Expand Up @@ -163,9 +163,9 @@ func TestPrepareAndRunApproval(t *testing.T) {
assert.Equal(t, pool, po.Data.(approvalData).Pool)
assert.Equal(t, approval, po.Data.(approvalData).Approval)

_, complete, err := am.RunOperation(context.Background(), po)
_, phase, err := am.RunOperation(context.Background(), po)

assert.False(t, complete)
assert.Equal(t, core.OpPhaseInitializing, phase)
assert.NoError(t, err)

mti.AssertExpectations(t)
Expand Down Expand Up @@ -352,9 +352,9 @@ func TestRunOperationNotSupported(t *testing.T) {
am, cancel := newTestAssets(t)
defer cancel()

_, complete, err := am.RunOperation(context.Background(), &core.PreparedOperation{})
_, phase, err := am.RunOperation(context.Background(), &core.PreparedOperation{})

assert.False(t, complete)
assert.Equal(t, core.OpPhaseInitializing, phase)
assert.Regexp(t, "FF10378", err)
}

Expand All @@ -365,9 +365,9 @@ func TestRunOperationCreatePoolBadPlugin(t *testing.T) {
op := &core.Operation{}
pool := &core.TokenPool{}

_, complete, err := am.RunOperation(context.Background(), opCreatePool(op, pool))
_, phase, err := am.RunOperation(context.Background(), opCreatePool(op, pool))

assert.False(t, complete)
assert.Equal(t, core.OpPhaseInitializing, phase)
assert.Regexp(t, "FF10272", err)
}

Expand All @@ -384,11 +384,11 @@ func TestRunOperationCreatePool(t *testing.T) {
}

mti := am.tokens["magic-tokens"].(*tokenmocks.Plugin)
mti.On("CreateTokenPool", context.Background(), "ns1:"+op.ID.String(), pool).Return(false, nil)
mti.On("CreateTokenPool", context.Background(), "ns1:"+op.ID.String(), pool).Return(core.OpPhaseInitializing, nil)

_, complete, err := am.RunOperation(context.Background(), opCreatePool(op, pool))
_, phase, err := am.RunOperation(context.Background(), opCreatePool(op, pool))

assert.False(t, complete)
assert.Equal(t, core.OpPhaseInitializing, phase)
assert.NoError(t, err)

mti.AssertExpectations(t)
Expand All @@ -401,9 +401,9 @@ func TestRunOperationActivatePoolBadPlugin(t *testing.T) {
op := &core.Operation{}
pool := &core.TokenPool{}

_, complete, err := am.RunOperation(context.Background(), opActivatePool(op, pool))
_, phase, err := am.RunOperation(context.Background(), opActivatePool(op, pool))

assert.False(t, complete)
assert.Equal(t, core.OpPhaseInitializing, phase)
assert.Regexp(t, "FF10272", err)
}

Expand All @@ -415,9 +415,9 @@ func TestRunOperationTransferBadPlugin(t *testing.T) {
pool := &core.TokenPool{}
transfer := &core.TokenTransfer{}

_, complete, err := am.RunOperation(context.Background(), opTransfer(op, pool, transfer))
_, phase, err := am.RunOperation(context.Background(), opTransfer(op, pool, transfer))

assert.False(t, complete)
assert.Equal(t, core.OpPhaseInitializing, phase)
assert.Regexp(t, "FF10272", err)
}

Expand All @@ -429,9 +429,9 @@ func TestRunOperationApprovalBadPlugin(t *testing.T) {
pool := &core.TokenPool{}
approval := &core.TokenApproval{}

_, complete, err := am.RunOperation(context.Background(), opApproval(op, pool, approval))
_, phase, err := am.RunOperation(context.Background(), opApproval(op, pool, approval))

assert.False(t, complete)
assert.Equal(t, core.OpPhaseInitializing, phase)
assert.Regexp(t, "FF10272", err)
}

Expand Down Expand Up @@ -473,9 +473,9 @@ func TestRunOperationTransferMint(t *testing.T) {
mti := am.tokens["magic-tokens"].(*tokenmocks.Plugin)
mti.On("MintTokens", context.Background(), "ns1:"+op.ID.String(), "F1", transfer, (*fftypes.JSONAny)(nil)).Return(nil)

_, complete, err := am.RunOperation(context.Background(), opTransfer(op, pool, transfer))
_, phase, err := am.RunOperation(context.Background(), opTransfer(op, pool, transfer))

assert.False(t, complete)
assert.Equal(t, core.OpPhasePending, phase)
assert.NoError(t, err)

mti.AssertExpectations(t)
Expand All @@ -501,9 +501,9 @@ func TestRunOperationTransferMintWithInterface(t *testing.T) {
mti := am.tokens["magic-tokens"].(*tokenmocks.Plugin)
mti.On("MintTokens", context.Background(), "ns1:"+op.ID.String(), "F1", transfer, pool.Methods).Return(nil)

_, complete, err := am.RunOperation(context.Background(), opTransfer(op, pool, transfer))
_, phase, err := am.RunOperation(context.Background(), opTransfer(op, pool, transfer))

assert.False(t, complete)
assert.Equal(t, core.OpPhasePending, phase)
assert.NoError(t, err)

mti.AssertExpectations(t)
Expand All @@ -528,9 +528,9 @@ func TestRunOperationTransferBurn(t *testing.T) {
mti := am.tokens["magic-tokens"].(*tokenmocks.Plugin)
mti.On("BurnTokens", context.Background(), "ns1:"+op.ID.String(), "F1", transfer, (*fftypes.JSONAny)(nil)).Return(nil)

_, complete, err := am.RunOperation(context.Background(), opTransfer(op, pool, transfer))
_, phase, err := am.RunOperation(context.Background(), opTransfer(op, pool, transfer))

assert.False(t, complete)
assert.Equal(t, core.OpPhasePending, phase)
assert.NoError(t, err)

mti.AssertExpectations(t)
Expand All @@ -555,9 +555,9 @@ func TestRunOperationTransfer(t *testing.T) {
mti := am.tokens["magic-tokens"].(*tokenmocks.Plugin)
mti.On("TransferTokens", context.Background(), "ns1:"+op.ID.String(), "F1", transfer, (*fftypes.JSONAny)(nil)).Return(nil)

_, complete, err := am.RunOperation(context.Background(), opTransfer(op, pool, transfer))
_, phase, err := am.RunOperation(context.Background(), opTransfer(op, pool, transfer))

assert.False(t, complete)
assert.Equal(t, core.OpPhasePending, phase)
assert.NoError(t, err)

mti.AssertExpectations(t)
Expand Down
30 changes: 20 additions & 10 deletions internal/assets/token_approval.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,10 +34,11 @@ func (am *assetManager) GetTokenApprovals(ctx context.Context, filter ffapi.AndF
}

type approveSender struct {
mgr *assetManager
approval *core.TokenApprovalInput
resolved bool
msgSender syncasync.Sender
mgr *assetManager
approval *core.TokenApprovalInput
resolved bool
msgSender syncasync.Sender
idempotentSubmit bool
}

func (s *approveSender) Prepare(ctx context.Context) error {
Expand All @@ -58,8 +59,9 @@ func (s *approveSender) setDefaults() {

func (am *assetManager) NewApproval(approval *core.TokenApprovalInput) syncasync.Sender {
sender := &approveSender{
mgr: am,
approval: approval,
mgr: am,
approval: approval,
idempotentSubmit: approval.IdempotencyKey != "",
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not sure we really need to store this separate bool here, as we can always compute it from the approval above at the point we need it (but I'm fine either way)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would have to do more investigation to convince myself of that.. and if I did so, I'd probably want to rename approval to approvalInput (rather than the implying it's the actual approval object).

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So... if it's ok, think I'd like to leave it as is for this PR

Copy link
Contributor

@awrichar awrichar Sep 28, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

But yes, the thing we store is the input DTO, so possible it's named incorrectly.

}
sender.setDefaults()
return sender
Expand Down Expand Up @@ -107,20 +109,28 @@ func (s *approveSender) resolve(ctx context.Context) (opResubmitted bool, err er
if err != nil {
// Check if we've clashed on idempotency key. There might be operations still in "Initialized" state that need
// submitting to their handlers
resubmitWholeTX := false
if idemErr, ok := err.(*sqlcommon.IdempotencyError); ok {
resubmitted, resubmitErr := s.mgr.operations.ResubmitOperations(ctx, idemErr.ExistingTXID)
total, resubmitted, resubmitErr := s.mgr.operations.ResubmitOperations(ctx, idemErr.ExistingTXID)
if resubmitErr != nil {
// Error doing resubmit, return the new error
return false, resubmitErr
}
if len(resubmitted) > 0 {
if total == 0 {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This logic feels slightly confusing to me. I guess what we're trying to differentiate is these 3 cases:

  • no operations have ever been started for this transaction
  • some operations were previously initialized for this transaction, but we think they weren't fully submitted, so we've now retried them
  • some operations were previously initialized for this transaction, but we think they're all currently in flight, so we can't do anything but continue waiting

We're expressing these 3 states with a combination of two integer returns... wonder if there's a cleaner way to express it?

Copy link
Contributor Author

@peterbroadhurst peterbroadhurst Sep 28, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes - agreed.

I think pushing all this into a combined operation+transaction manager as a function that has plug points for the differentiation, rather than the whole of this section being boilerplate copied in lots of places would be good.

Choice is whether this little bit of extension to the boilerplate is the point to take on the bigger refactor.

I could merge the SubmitNewTransaction and ResubmitOperations functions to a new function, contains this logic in an opinionated way and take in the idempotencyKey out of the input-request, and return:

  1. It's all done now - please return without error to the user
  2. Nothing was done, here's the existing transaction ID
  3. We've never heard of this idempotency key, here's a shiny new TX
  4. This is already submitted completely, 409 to your user
  5. Something else went wrong

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Right, point taken - it may not be worth continuing to iterate on this interface until we have the cycles to do a larger refactor. We should capture that next step as a task though.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Raised #1410

// We didn't do anything last time - just start again
txid = idemErr.ExistingTXID
resubmitWholeTX = true
err = nil
} else if len(resubmitted) > 0 {
// We resubmitted something - translate the status code to 200 (true return)
s.approval.TX.ID = idemErr.ExistingTXID
s.approval.TX.Type = core.TransactionTypeTokenApproval
return true, nil
}
}
return false, err
if !resubmitWholeTX {
return false, err
}
}
s.approval.TX.ID = txid
s.approval.TX.Type = core.TransactionTypeTokenApproval
Expand Down Expand Up @@ -194,7 +204,7 @@ func (s *approveSender) sendInternal(ctx context.Context, method sendMethod) (er
}
}

_, err = s.mgr.operations.RunOperation(ctx, opApproval(op, pool, &s.approval.TokenApproval))
_, err = s.mgr.operations.RunOperation(ctx, opApproval(op, pool, &s.approval.TokenApproval), s.idempotentSubmit)
return err
}

Expand Down
Loading
Loading