Handle idempotent retry after error during initializing phase #1406

Merged
merged 5 commits into from
Sep 28, 2023
Changes from 4 commits
4 changes: 2 additions & 2 deletions Makefile
@@ -73,12 +73,12 @@ $(eval $(call makemock, internal/assets, Manager, assetm
$(eval $(call makemock, internal/contracts, Manager, contractmocks))
$(eval $(call makemock, internal/spievents, Manager, spieventsmocks))
$(eval $(call makemock, internal/orchestrator, Orchestrator, orchestratormocks))
$(eval $(call makemock, internal/apiserver, FFISwaggerGen, apiservermocks))
$(eval $(call makemock, internal/apiserver, Server, apiservermocks))
$(eval $(call makemock, internal/cache, Manager, cachemocks))
$(eval $(call makemock, internal/metrics, Manager, metricsmocks))
$(eval $(call makemock, internal/operations, Manager, operationmocks))
$(eval $(call makemock, internal/multiparty, Manager, multipartymocks))
$(eval $(call makemock, internal/apiserver, FFISwaggerGen, apiservermocks))
$(eval $(call makemock, internal/apiserver, Server, apiservermocks))

firefly-nocgo: ${GOFILES}
CGO_ENABLED=0 $(VGO) build -o ${BINARY_NAME}-nocgo -ldflags "-X main.buildDate=$(DATE) -X main.buildVersion=$(BUILD_VERSION) -X 'github.com/hyperledger/firefly/cmd.BuildVersionOverride=$(BUILD_VERSION)' -X 'github.com/hyperledger/firefly/cmd.BuildDate=$(DATE)' -X 'github.com/hyperledger/firefly/cmd.BuildCommit=$(GIT_REF)'" -tags=prod -tags=prod -v
2 changes: 1 addition & 1 deletion internal/assets/manager.go
@@ -70,7 +70,7 @@ type Manager interface {

// From operations.OperationHandler
PrepareOperation(ctx context.Context, op *core.Operation) (*core.PreparedOperation, error)
RunOperation(ctx context.Context, op *core.PreparedOperation) (outputs fftypes.JSONObject, complete bool, err error)
RunOperation(ctx context.Context, op *core.PreparedOperation) (outputs fftypes.JSONObject, phase core.OpPhase, err error)
}

type assetManager struct {
28 changes: 14 additions & 14 deletions internal/assets/operations.go
@@ -99,49 +99,49 @@ func (am *assetManager) PrepareOperation(ctx context.Context, op *core.Operation
}
}

func (am *assetManager) RunOperation(ctx context.Context, op *core.PreparedOperation) (outputs fftypes.JSONObject, complete bool, err error) {
func (am *assetManager) RunOperation(ctx context.Context, op *core.PreparedOperation) (outputs fftypes.JSONObject, phase core.OpPhase, err error) {
switch data := op.Data.(type) {
case createPoolData:
plugin, err := am.selectTokenPlugin(ctx, data.Pool.Connector)
if err != nil {
return nil, false, err
return nil, core.OpPhaseInitializing, err
}
complete, err = plugin.CreateTokenPool(ctx, op.NamespacedIDString(), data.Pool)
return nil, complete, err
phase, err = plugin.CreateTokenPool(ctx, op.NamespacedIDString(), data.Pool)
return nil, phase, err

case activatePoolData:
plugin, err := am.selectTokenPlugin(ctx, data.Pool.Connector)
if err != nil {
return nil, false, err
return nil, core.OpPhaseInitializing, err
}
complete, err = plugin.ActivateTokenPool(ctx, data.Pool)
return nil, complete, err
phase, err = plugin.ActivateTokenPool(ctx, data.Pool)
return nil, phase, err

case transferData:
plugin, err := am.selectTokenPlugin(ctx, data.Pool.Connector)
if err != nil {
return nil, false, err
return nil, core.OpPhaseInitializing, err
}
switch data.Transfer.Type {
case core.TokenTransferTypeMint:
return nil, false, plugin.MintTokens(ctx, op.NamespacedIDString(), data.Pool.Locator, data.Transfer, data.Pool.Methods)
return nil, core.OpPhaseInitializing, plugin.MintTokens(ctx, op.NamespacedIDString(), data.Pool.Locator, data.Transfer, data.Pool.Methods)
Contributor

Why are these transfer operations all considered "initializing"? If we get a successful return from the plugin, doesn't that mean the transaction has made it all the way through the token connector to the blockchain connector?

Contributor Author

It's the phase we're in as a result of the error in this layer of the code, rather than the operation state.

e.g. this error occurred before we submitted the operation to the connector, so the calling code can decide if that means we don't store the operation at all, or we store it in state Initializing
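
The point about the phase describing the error rather than the operation state can be sketched as follows. This is a minimal illustration, not the FireFly implementation: `OpPhase` is a local stand-in for `core.OpPhase`, `OpPhasePending` is an assumed third value that does not appear in this diff, and `decide`, `followUp`, and `errConnector` are hypothetical names.

```go
package main

import (
	"errors"
	"fmt"
)

// OpPhase is a local stand-in for core.OpPhase. Only Initializing and
// Complete appear in the diff; Pending is an assumed extra value.
type OpPhase int

const (
	OpPhaseComplete     OpPhase = iota // the work finished synchronously
	OpPhasePending                     // assumption: handed off, result arrives later
	OpPhaseInitializing                // the failure happened before submission to the connector
)

// followUp is a hypothetical description of what the calling code does next.
type followUp string

const (
	noErrorHandling followUp = "no error handling needed"
	keepInitialized followUp = "keep operation in Initialized state for retry"
	markFailed      followUp = "mark operation failed"
)

// errConnector is an illustrative error value.
var errConnector = errors.New("connector unavailable")

// decide consults the phase only on the error path: an error raised while
// still initializing can be retried safely when the caller supplied an
// idempotency key, while any other error is treated as a failure.
func decide(phase OpPhase, err error, idempotentSubmit bool) followUp {
	switch {
	case err == nil:
		return noErrorHandling
	case phase == OpPhaseInitializing && idempotentSubmit:
		return keepInitialized
	default:
		return markFailed
	}
}

func main() {
	fmt.Println(decide(OpPhaseInitializing, errConnector, true))
	fmt.Println(decide(OpPhaseInitializing, errConnector, false))
	fmt.Println(decide(OpPhaseInitializing, nil, true))
}
```

Under these assumptions the same `OpPhaseInitializing` return leads to different handling depending on whether an error accompanied it, which is why a successful mint can still report that phase.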

Contributor Author (@peterbroadhurst, Sep 28, 2023)

I have not analyzed every scenario to know whether the error path is such that it could never succeed.
If I had done that (quite large) piece of work everywhere, I could have introduced a phase of OpPhaseRejected to mean "this is never going to work, tell them it's broken, without even storing the operation at all".

Contributor

OK - so the phase is more meaningful in an error return case in order to determine how to respond to the error (not so much in a success case)?

Contributor

I noticed in some managers you used operations.ErrTernary, but not here.

Contributor Author

Sorry - yes, this should be operations.ErrTernary almost certainly - I'd thought this was an error return.

Contributor Author

ok - added a commit for that
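
For readers unfamiliar with the helper, the change agreed in this thread might look roughly like the sketch below. It assumes `operations.ErrTernary` picks one of two phases depending on whether the error is nil; the signature of the local `errTernary`, the `OpPhasePending` value, and the `mintTokens` and `runMint` functions are all assumptions made for illustration, not code taken from the PR.

```go
package main

import (
	"errors"
	"fmt"
)

// OpPhase is a local stand-in for core.OpPhase.
type OpPhase int

const (
	OpPhaseInitializing OpPhase = iota // failed before the connector accepted the request
	OpPhasePending                     // assumption: accepted, waiting for an async result
)

// errTernary is a local stand-in for operations.ErrTernary. The signature is
// an assumption: return ifErr when err is non-nil, otherwise ifNoErr.
func errTernary(err error, ifErr, ifNoErr OpPhase) OpPhase {
	if err != nil {
		return ifErr
	}
	return ifNoErr
}

// mintTokens is a hypothetical replacement for plugin.MintTokens.
func mintTokens(fail bool) error {
	if fail {
		return errors.New("mint rejected")
	}
	return nil
}

// runMint captures the error first so the phase can be derived from it.
// Computing both in a single return statement would not work, because the
// phase depends on the result of the plugin call.
func runMint(fail bool) (OpPhase, error) {
	err := mintTokens(fail)
	return errTernary(err, OpPhaseInitializing, OpPhasePending), err
}

func main() {
	phase, err := runMint(true)
	fmt.Println(phase == OpPhaseInitializing, err)
	phase, err = runMint(false)
	fmt.Println(phase == OpPhasePending, err)
}
```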

case core.TokenTransferTypeTransfer:
return nil, false, plugin.TransferTokens(ctx, op.NamespacedIDString(), data.Pool.Locator, data.Transfer, data.Pool.Methods)
return nil, core.OpPhaseInitializing, plugin.TransferTokens(ctx, op.NamespacedIDString(), data.Pool.Locator, data.Transfer, data.Pool.Methods)
case core.TokenTransferTypeBurn:
return nil, false, plugin.BurnTokens(ctx, op.NamespacedIDString(), data.Pool.Locator, data.Transfer, data.Pool.Methods)
return nil, core.OpPhaseInitializing, plugin.BurnTokens(ctx, op.NamespacedIDString(), data.Pool.Locator, data.Transfer, data.Pool.Methods)
default:
panic(fmt.Sprintf("unknown transfer type: %v", data.Transfer.Type))
}

case approvalData:
plugin, err := am.selectTokenPlugin(ctx, data.Pool.Connector)
if err != nil {
return nil, false, err
return nil, core.OpPhaseInitializing, err
}
return nil, false, plugin.TokensApproval(ctx, op.NamespacedIDString(), data.Pool.Locator, data.Approval, data.Pool.Methods)
return nil, core.OpPhaseInitializing, plugin.TokensApproval(ctx, op.NamespacedIDString(), data.Pool.Locator, data.Approval, data.Pool.Methods)

default:
return nil, false, i18n.NewError(ctx, coremsgs.MsgOperationDataIncorrect, op.Data)
return nil, core.OpPhaseInitializing, i18n.NewError(ctx, coremsgs.MsgOperationDataIncorrect, op.Data)
}
}

62 changes: 31 additions & 31 deletions internal/assets/operations_test.go
@@ -46,15 +46,15 @@ func TestPrepareAndRunCreatePool(t *testing.T) {
assert.NoError(t, err)

mti := am.tokens["magic-tokens"].(*tokenmocks.Plugin)
mti.On("CreateTokenPool", context.Background(), "ns1:"+op.ID.String(), pool).Return(false, nil)
mti.On("CreateTokenPool", context.Background(), "ns1:"+op.ID.String(), pool).Return(core.OpPhaseComplete, nil)

po, err := am.PrepareOperation(context.Background(), op)
assert.NoError(t, err)
assert.Equal(t, pool, po.Data.(createPoolData).Pool)

_, complete, err := am.RunOperation(context.Background(), po)
_, phase, err := am.RunOperation(context.Background(), po)

assert.False(t, complete)
assert.Equal(t, core.OpPhaseComplete, phase)
assert.NoError(t, err)

mti.AssertExpectations(t)
@@ -78,16 +78,16 @@ func TestPrepareAndRunActivatePool(t *testing.T) {

mti := am.tokens["magic-tokens"].(*tokenmocks.Plugin)
mdi := am.database.(*databasemocks.Plugin)
mti.On("ActivateTokenPool", context.Background(), pool).Return(true, nil)
mti.On("ActivateTokenPool", context.Background(), pool).Return(core.OpPhaseComplete, nil)
mdi.On("GetTokenPoolByID", context.Background(), "ns1", pool.ID).Return(pool, nil)

po, err := am.PrepareOperation(context.Background(), op)
assert.NoError(t, err)
assert.Equal(t, pool, po.Data.(activatePoolData).Pool)

_, complete, err := am.RunOperation(context.Background(), po)
_, phase, err := am.RunOperation(context.Background(), po)

assert.True(t, complete)
assert.Equal(t, core.OpPhaseComplete, phase)
assert.NoError(t, err)

mti.AssertExpectations(t)
@@ -124,9 +124,9 @@ func TestPrepareAndRunTransfer(t *testing.T) {
assert.Equal(t, pool, po.Data.(transferData).Pool)
assert.Equal(t, transfer, po.Data.(transferData).Transfer)

_, complete, err := am.RunOperation(context.Background(), po)
_, phase, err := am.RunOperation(context.Background(), po)

assert.False(t, complete)
assert.Equal(t, core.OpPhaseInitializing, phase)
assert.NoError(t, err)

mti.AssertExpectations(t)
@@ -163,9 +163,9 @@ func TestPrepareAndRunApproval(t *testing.T) {
assert.Equal(t, pool, po.Data.(approvalData).Pool)
assert.Equal(t, approval, po.Data.(approvalData).Approval)

_, complete, err := am.RunOperation(context.Background(), po)
_, phase, err := am.RunOperation(context.Background(), po)

assert.False(t, complete)
assert.Equal(t, core.OpPhaseInitializing, phase)
assert.NoError(t, err)

mti.AssertExpectations(t)
@@ -352,9 +352,9 @@ func TestRunOperationNotSupported(t *testing.T) {
am, cancel := newTestAssets(t)
defer cancel()

_, complete, err := am.RunOperation(context.Background(), &core.PreparedOperation{})
_, phase, err := am.RunOperation(context.Background(), &core.PreparedOperation{})

assert.False(t, complete)
assert.Equal(t, core.OpPhaseInitializing, phase)
assert.Regexp(t, "FF10378", err)
}

@@ -365,9 +365,9 @@ func TestRunOperationCreatePoolBadPlugin(t *testing.T) {
op := &core.Operation{}
pool := &core.TokenPool{}

_, complete, err := am.RunOperation(context.Background(), opCreatePool(op, pool))
_, phase, err := am.RunOperation(context.Background(), opCreatePool(op, pool))

assert.False(t, complete)
assert.Equal(t, core.OpPhaseInitializing, phase)
assert.Regexp(t, "FF10272", err)
}

@@ -384,11 +384,11 @@ func TestRunOperationCreatePool(t *testing.T) {
}

mti := am.tokens["magic-tokens"].(*tokenmocks.Plugin)
mti.On("CreateTokenPool", context.Background(), "ns1:"+op.ID.String(), pool).Return(false, nil)
mti.On("CreateTokenPool", context.Background(), "ns1:"+op.ID.String(), pool).Return(core.OpPhaseInitializing, nil)

_, complete, err := am.RunOperation(context.Background(), opCreatePool(op, pool))
_, phase, err := am.RunOperation(context.Background(), opCreatePool(op, pool))

assert.False(t, complete)
assert.Equal(t, core.OpPhaseInitializing, phase)
assert.NoError(t, err)

mti.AssertExpectations(t)
@@ -401,9 +401,9 @@ func TestRunOperationActivatePoolBadPlugin(t *testing.T) {
op := &core.Operation{}
pool := &core.TokenPool{}

_, complete, err := am.RunOperation(context.Background(), opActivatePool(op, pool))
_, phase, err := am.RunOperation(context.Background(), opActivatePool(op, pool))

assert.False(t, complete)
assert.Equal(t, core.OpPhaseInitializing, phase)
assert.Regexp(t, "FF10272", err)
}

@@ -415,9 +415,9 @@ func TestRunOperationTransferBadPlugin(t *testing.T) {
pool := &core.TokenPool{}
transfer := &core.TokenTransfer{}

_, complete, err := am.RunOperation(context.Background(), opTransfer(op, pool, transfer))
_, phase, err := am.RunOperation(context.Background(), opTransfer(op, pool, transfer))

assert.False(t, complete)
assert.Equal(t, core.OpPhaseInitializing, phase)
assert.Regexp(t, "FF10272", err)
}

@@ -429,9 +429,9 @@ func TestRunOperationApprovalBadPlugin(t *testing.T) {
pool := &core.TokenPool{}
approval := &core.TokenApproval{}

_, complete, err := am.RunOperation(context.Background(), opApproval(op, pool, approval))
_, phase, err := am.RunOperation(context.Background(), opApproval(op, pool, approval))

assert.False(t, complete)
assert.Equal(t, core.OpPhaseInitializing, phase)
assert.Regexp(t, "FF10272", err)
}

@@ -473,9 +473,9 @@ func TestRunOperationTransferMint(t *testing.T) {
mti := am.tokens["magic-tokens"].(*tokenmocks.Plugin)
mti.On("MintTokens", context.Background(), "ns1:"+op.ID.String(), "F1", transfer, (*fftypes.JSONAny)(nil)).Return(nil)

_, complete, err := am.RunOperation(context.Background(), opTransfer(op, pool, transfer))
_, phase, err := am.RunOperation(context.Background(), opTransfer(op, pool, transfer))

assert.False(t, complete)
assert.Equal(t, core.OpPhaseInitializing, phase)
assert.NoError(t, err)

mti.AssertExpectations(t)
@@ -501,9 +501,9 @@ func TestRunOperationTransferMintWithInterface(t *testing.T) {
mti := am.tokens["magic-tokens"].(*tokenmocks.Plugin)
mti.On("MintTokens", context.Background(), "ns1:"+op.ID.String(), "F1", transfer, pool.Methods).Return(nil)

_, complete, err := am.RunOperation(context.Background(), opTransfer(op, pool, transfer))
_, phase, err := am.RunOperation(context.Background(), opTransfer(op, pool, transfer))

assert.False(t, complete)
assert.Equal(t, core.OpPhaseInitializing, phase)
assert.NoError(t, err)

mti.AssertExpectations(t)
@@ -528,9 +528,9 @@ func TestRunOperationTransferBurn(t *testing.T) {
mti := am.tokens["magic-tokens"].(*tokenmocks.Plugin)
mti.On("BurnTokens", context.Background(), "ns1:"+op.ID.String(), "F1", transfer, (*fftypes.JSONAny)(nil)).Return(nil)

_, complete, err := am.RunOperation(context.Background(), opTransfer(op, pool, transfer))
_, phase, err := am.RunOperation(context.Background(), opTransfer(op, pool, transfer))

assert.False(t, complete)
assert.Equal(t, core.OpPhaseInitializing, phase)
assert.NoError(t, err)

mti.AssertExpectations(t)
@@ -555,9 +555,9 @@ func TestRunOperationTransfer(t *testing.T) {
mti := am.tokens["magic-tokens"].(*tokenmocks.Plugin)
mti.On("TransferTokens", context.Background(), "ns1:"+op.ID.String(), "F1", transfer, (*fftypes.JSONAny)(nil)).Return(nil)

_, complete, err := am.RunOperation(context.Background(), opTransfer(op, pool, transfer))
_, phase, err := am.RunOperation(context.Background(), opTransfer(op, pool, transfer))

assert.False(t, complete)
assert.Equal(t, core.OpPhaseInitializing, phase)
assert.NoError(t, err)

mti.AssertExpectations(t)
30 changes: 20 additions & 10 deletions internal/assets/token_approval.go
@@ -34,10 +34,11 @@ func (am *assetManager) GetTokenApprovals(ctx context.Context, filter ffapi.AndF
}

type approveSender struct {
mgr *assetManager
approval *core.TokenApprovalInput
resolved bool
msgSender syncasync.Sender
mgr *assetManager
approval *core.TokenApprovalInput
resolved bool
msgSender syncasync.Sender
idempotentSubmit bool
}

func (s *approveSender) Prepare(ctx context.Context) error {
@@ -58,8 +59,9 @@ func (s *approveSender) setDefaults() {

func (am *assetManager) NewApproval(approval *core.TokenApprovalInput) syncasync.Sender {
sender := &approveSender{
mgr: am,
approval: approval,
mgr: am,
approval: approval,
idempotentSubmit: approval.IdempotencyKey != "",
Contributor

Not sure we really need to store this separate bool here, as we can always compute it from the approval above at the point we need it (but I'm fine either way)

Contributor Author

I would have to do more investigation to convince myself of that, and if I did so, I'd probably want to rename approval to approvalInput (rather than implying it's the actual approval object).

Contributor Author

So... if it's ok, I think I'd like to leave it as is for this PR.

Contributor (@awrichar, Sep 28, 2023)

But yes, the thing we store is the input DTO, so it's possible it's named incorrectly.

}
sender.setDefaults()
return sender
@@ -107,20 +109,28 @@ func (s *approveSender) resolve(ctx context.Context) (opResubmitted bool, err er
if err != nil {
// Check if we've clashed on idempotency key. There might be operations still in "Initialized" state that need
// submitting to their handlers
resubmitWholeTX := false
if idemErr, ok := err.(*sqlcommon.IdempotencyError); ok {
resubmitted, resubmitErr := s.mgr.operations.ResubmitOperations(ctx, idemErr.ExistingTXID)
total, resubmitted, resubmitErr := s.mgr.operations.ResubmitOperations(ctx, idemErr.ExistingTXID)
if resubmitErr != nil {
// Error doing resubmit, return the new error
return false, resubmitErr
}
if len(resubmitted) > 0 {
if total == 0 {
Contributor

This logic feels slightly confusing to me. I guess what we're trying to differentiate is these 3 cases:

  • no operations have ever been started for this transaction
  • some operations were previously initialized for this transaction, but we think they weren't fully submitted, so we've now retried them
  • some operations were previously initialized for this transaction, but we think they're all currently in flight, so we can't do anything but continue waiting

We're expressing these 3 states with a combination of two integer returns... wonder if there's a cleaner way to express it?
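
One way to make the three cases explicit is a small named type derived from the two counts. The sketch below is an illustration only: `resubmitState`, its values, and `classify` are hypothetical names, and the two integer parameters stand in for the total number of operations found and the number that were resubmitted.

```go
package main

import "fmt"

// resubmitState names the three cases listed in the comment above.
type resubmitState int

const (
	neverStarted resubmitState = iota // no operations were ever started for this transaction
	retried                           // some operations had not been fully submitted and were retried
	inFlight                          // operations exist and all are already in flight
)

// String makes the state readable when printed.
func (s resubmitState) String() string {
	switch s {
	case neverStarted:
		return "neverStarted"
	case retried:
		return "retried"
	default:
		return "inFlight"
	}
}

// classify maps the two counts onto a single state, following the same order
// of checks as the diff: total == 0 first, then resubmitted > 0.
func classify(total, resubmitted int) resubmitState {
	switch {
	case total == 0:
		return neverStarted
	case resubmitted > 0:
		return retried
	default:
		return inFlight
	}
}

func main() {
	fmt.Println(classify(0, 0))
	fmt.Println(classify(3, 1))
	fmt.Println(classify(3, 0))
}
```

A caller could then switch on the state instead of repeating the count comparisons at every call site.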

Contributor Author (@peterbroadhurst, Sep 28, 2023)

Yes - agreed.

I think it would be good to push all this into a combined operation+transaction manager, as a function that has plug points for the differentiation, rather than having the whole of this section be boilerplate copied in lots of places.

The choice is whether this small extension to the boilerplate is the point at which to take on the bigger refactor.

I could merge the SubmitNewTransaction and ResubmitOperations functions into a new function that contains this logic in an opinionated way, takes the idempotencyKey from the input request, and returns one of:

  1. It's all done now - please return without error to the user
  2. Nothing was done, here's the existing transaction ID
  3. We've never heard of this idempotency key, here's a shiny new TX
  4. This is already submitted completely, 409 to your user
  5. Something else went wrong
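
The merged function proposed here could return a single result value along the lines of the sketch below. Everything in it is hypothetical: `outcome`, `submitResult`, `priorTX`, and `submitOrResume` are invented names, and the lookup by idempotency key is simulated by passing a `*priorTX` directly. Case 5 ("something else went wrong") would be an ordinary error return and is left out because nothing in this simulation can fail.

```go
package main

import "fmt"

// outcome enumerates the first four results from the list above.
type outcome int

const (
	outcomeResubmitted      outcome = iota + 1 // 1: done now, return without error
	outcomeRestartExisting                     // 2: nothing was done, reuse the existing TX ID
	outcomeNewTX                               // 3: unknown idempotency key, new TX
	outcomeAlreadySubmitted                    // 4: fully submitted, caller reports 409
)

// submitResult pairs the outcome with the transaction ID to use.
type submitResult struct {
	Outcome outcome
	TXID    string
}

// priorTX is a hypothetical record of what was found for an idempotency key.
type priorTX struct {
	ID          string
	Total       int // operations recorded for the transaction
	Resubmitted int // operations that were just resubmitted
}

// submitOrResume folds the new-transaction and resubmit paths into one
// decision. A nil prior means the idempotency key has never been seen.
func submitOrResume(prior *priorTX, newTXID string) submitResult {
	switch {
	case prior == nil:
		return submitResult{outcomeNewTX, newTXID}
	case prior.Total == 0:
		return submitResult{outcomeRestartExisting, prior.ID}
	case prior.Resubmitted > 0:
		return submitResult{outcomeResubmitted, prior.ID}
	default:
		return submitResult{outcomeAlreadySubmitted, prior.ID}
	}
}

func main() {
	fmt.Printf("%+v\n", submitOrResume(nil, "tx-new"))
	fmt.Printf("%+v\n", submitOrResume(&priorTX{ID: "tx-old"}, "tx-new"))
	fmt.Printf("%+v\n", submitOrResume(&priorTX{ID: "tx-old", Total: 2, Resubmitted: 1}, "tx-new"))
	fmt.Printf("%+v\n", submitOrResume(&priorTX{ID: "tx-old", Total: 2}, "tx-new"))
}
```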

Contributor

Right, point taken - it may not be worth continuing to iterate on this interface until we have the cycles to do a larger refactor. We should capture that next step as a task though.

Contributor Author

Raised #1410

// We didn't do anything last time - just start again
txid = idemErr.ExistingTXID
resubmitWholeTX = true
err = nil
} else if len(resubmitted) > 0 {
// We resubmitted something - translate the status code to 200 (true return)
s.approval.TX.ID = idemErr.ExistingTXID
s.approval.TX.Type = core.TransactionTypeTokenApproval
return true, nil
}
}
return false, err
if !resubmitWholeTX {
return false, err
}
}
s.approval.TX.ID = txid
s.approval.TX.Type = core.TransactionTypeTokenApproval
@@ -194,7 +204,7 @@ func (s *approveSender) sendInternal(ctx context.Context, method sendMethod) (er
}
}

_, err = s.mgr.operations.RunOperation(ctx, opApproval(op, pool, &s.approval.TokenApproval))
_, err = s.mgr.operations.RunOperation(ctx, opApproval(op, pool, &s.approval.TokenApproval), s.idempotentSubmit)
return err
}
