From 77776e17159e9c6faf6aeedf3085a7a2524d3205 Mon Sep 17 00:00:00 2001 From: Ti Chi Robot Date: Mon, 5 Dec 2022 19:08:04 +0800 Subject: [PATCH] log-backup: do not rewrite shortvalue iif the value is rollback record. (#39596) (#39599) close pingcap/tidb#39575 --- br/pkg/stream/meta_kv.go | 60 ++++++++++++++++- br/pkg/stream/meta_kv_test.go | 101 ++++++++++++++++++++++++++-- br/pkg/stream/rewrite_meta_rawkv.go | 12 +++- 3 files changed, 165 insertions(+), 8 deletions(-) diff --git a/br/pkg/stream/meta_kv.go b/br/pkg/stream/meta_kv.go index 9d054f0bef454..fb7c2f79f17d1 100644 --- a/br/pkg/stream/meta_kv.go +++ b/br/pkg/stream/meta_kv.go @@ -111,15 +111,34 @@ const ( flagShortValuePrefix = byte('v') flagOverlappedRollback = byte('R') flagGCFencePrefix = byte('F') + flagLastChangePrefix = byte('l') + flagTxnSourcePrefix = byte('S') ) +// RawWriteCFValue represents the value in write columnFamily. +// For details, see: https://github.com/tikv/tikv/blob/release-6.5/components/txn_types/src/write.rs#L70 type RawWriteCFValue struct { t WriteType startTs uint64 shortValue []byte hasOverlappedRollback bool - hasGCFence bool - gcFence uint64 + + // Records the next version after this version when overlapping rollback + // happens on an already existing commit record. + // + // See [`Write::gc_fence`] for more detail. + hasGCFence bool + gcFence uint64 + + // The number of versions that need skipping from this record + // to find the latest PUT/DELETE record. + // If versions_to_last_change > 0 but last_change_ts == 0, the key does not + // have a PUT/DELETE record before this write record. + lastChangeTs uint64 + versionsToLastChange uint64 + + // The source of this txn. + txnSource uint64 } // ParseFrom decodes the value to get the struct `RawWriteCFValue`. 
@@ -146,6 +165,10 @@ l_for: switch data[0] { case flagShortValuePrefix: vlen := data[1] + if len(data[2:]) < int(vlen) { + return errors.Annotatef(berrors.ErrInvalidArgument, + "the length of short value is invalid, vlen: %v", int(vlen)) + } v.shortValue = data[2 : vlen+2] data = data[vlen+2:] case flagOverlappedRollback: @@ -157,6 +180,20 @@ l_for: if err != nil { return errors.Annotate(berrors.ErrInvalidArgument, "decode gc fence failed") } + case flagLastChangePrefix: + data, v.lastChangeTs, err = codec.DecodeUint(data[1:]) + if err != nil { + return errors.Annotate(berrors.ErrInvalidArgument, "decode last change ts failed") + } + data, v.versionsToLastChange, err = codec.DecodeUvarint(data) + if err != nil { + return errors.Annotate(berrors.ErrInvalidArgument, "decode versions to last change failed") + } + case flagTxnSourcePrefix: + data, v.txnSource, err = codec.DecodeUvarint(data[1:]) + if err != nil { + return errors.Annotate(berrors.ErrInvalidArgument, "decode txn source failed") + } default: break l_for } @@ -164,6 +201,16 @@ l_for: return nil } +// IsRollback checks whether the value in cf is a `rollback` record. +func (v *RawWriteCFValue) IsRollback() bool { + return v.GetWriteType() == WriteTypeRollback +} + +// IsDelete checks whether the value in cf is a `delete` record. +func (v *RawWriteCFValue) IsDelete() bool { + return v.GetWriteType() == WriteTypeDelete +} + // HasShortValue checks whether short value is stored in write cf. 
func (v *RawWriteCFValue) HasShortValue() bool { return len(v.shortValue) > 0 @@ -204,5 +251,14 @@ func (v *RawWriteCFValue) EncodeTo() []byte { data = append(data, flagGCFencePrefix) data = codec.EncodeUint(data, v.gcFence) } + if v.lastChangeTs > 0 || v.versionsToLastChange > 0 { + data = append(data, flagLastChangePrefix) + data = codec.EncodeUint(data, v.lastChangeTs) + data = codec.EncodeUvarint(data, v.versionsToLastChange) + } + if v.txnSource > 0 { + data = append(data, flagTxnSourcePrefix) + data = codec.EncodeUvarint(data, v.txnSource) + } return data } diff --git a/br/pkg/stream/meta_kv_test.go b/br/pkg/stream/meta_kv_test.go index eaebf64526243..7a8c5e4fed8b6 100644 --- a/br/pkg/stream/meta_kv_test.go +++ b/br/pkg/stream/meta_kv_test.go @@ -68,29 +68,49 @@ func TestWriteType(t *testing.T) { } func TestWriteCFValueNoShortValue(t *testing.T) { + var ( + ts uint64 = 400036290571534337 + txnSource uint64 = 9527 + ) + buff := make([]byte, 0, 9) - buff = append(buff, byte('P')) - buff = codec.EncodeUvarint(buff, 400036290571534337) + buff = append(buff, WriteTypePut) + buff = codec.EncodeUvarint(buff, ts) + buff = append(buff, flagTxnSourcePrefix) + buff = codec.EncodeUvarint(buff, txnSource) v := new(RawWriteCFValue) err := v.ParseFrom(buff) require.NoError(t, err) + require.False(t, v.IsDelete()) + require.False(t, v.IsRollback()) require.False(t, v.HasShortValue()) + require.False(t, v.hasGCFence) + require.Equal(t, v.lastChangeTs, uint64(0)) + require.Equal(t, v.versionsToLastChange, uint64(0)) + require.Equal(t, v.txnSource, txnSource) encodedBuff := v.EncodeTo() require.True(t, bytes.Equal(buff, encodedBuff)) } func TestWriteCFValueWithShortValue(t *testing.T) { - var ts uint64 = 400036290571534337 - shortValue := []byte("pingCAP") + var ( + ts uint64 = 400036290571534337 + shortValue = []byte("pingCAP") + lastChangeTs uint64 = 9527 + versionsToLastChange uint64 = 95271 + ) buff := make([]byte, 0, 9) - buff = append(buff, byte('P')) + buff = 
append(buff, WriteTypePut) buff = codec.EncodeUvarint(buff, ts) buff = append(buff, flagShortValuePrefix) buff = append(buff, byte(len(shortValue))) buff = append(buff, shortValue...) + buff = append(buff, flagLastChangePrefix) + buff = codec.EncodeUint(buff, lastChangeTs) + buff = codec.EncodeUvarint(buff, versionsToLastChange) v := new(RawWriteCFValue) err := v.ParseFrom(buff) @@ -99,7 +119,78 @@ func TestWriteCFValueWithShortValue(t *testing.T) { require.True(t, bytes.Equal(v.GetShortValue(), shortValue)) require.False(t, v.hasGCFence) require.False(t, v.hasOverlappedRollback) + require.Equal(t, v.lastChangeTs, lastChangeTs) + require.Equal(t, v.versionsToLastChange, versionsToLastChange) + require.Equal(t, v.txnSource, uint64(0)) data := v.EncodeTo() require.True(t, bytes.Equal(data, buff)) } + +func TestWriteCFValueWithRollback(t *testing.T) { + var ( + ts uint64 = 400036290571534337 + protectedRollbackShortValue = []byte{'P'} + ) + + buff := make([]byte, 0, 9) + buff = append(buff, WriteTypeRollback) + buff = codec.EncodeUvarint(buff, ts) + buff = append(buff, flagShortValuePrefix, byte(len(protectedRollbackShortValue))) + buff = append(buff, protectedRollbackShortValue...) 
+ + v := new(RawWriteCFValue) + err := v.ParseFrom(buff) + require.NoError(t, err) + require.True(t, v.IsRollback()) + require.True(t, v.HasShortValue()) + require.Equal(t, v.GetShortValue(), protectedRollbackShortValue) + require.Equal(t, v.startTs, ts) + require.Equal(t, v.lastChangeTs, uint64(0)) + require.Equal(t, v.versionsToLastChange, uint64(0)) + require.Equal(t, v.txnSource, uint64(0)) + + data := v.EncodeTo() + require.Equal(t, data, buff) +} + +func TestWriteCFValueWithDelete(t *testing.T) { + var ts uint64 = 400036290571534337 + buff := make([]byte, 0, 9) + buff = append(buff, byte('D')) + buff = codec.EncodeUvarint(buff, ts) + + v := new(RawWriteCFValue) + err := v.ParseFrom(buff) + require.NoError(t, err) + require.True(t, v.IsDelete()) + require.False(t, v.HasShortValue()) + + data := v.EncodeTo() + require.Equal(t, data, buff) +} + +func TestWriteCFValueWithGcFence(t *testing.T) { + var ( + ts uint64 = 400036290571534337 + gcFence uint64 = 9527 + ) + + buff := make([]byte, 0, 9) + buff = append(buff, WriteTypePut) + buff = codec.EncodeUvarint(buff, ts) + buff = append(buff, flagOverlappedRollback) + buff = append(buff, flagGCFencePrefix) + buff = codec.EncodeUint(buff, gcFence) + + v := new(RawWriteCFValue) + err := v.ParseFrom(buff) + require.NoError(t, err) + require.Equal(t, v.startTs, ts) + require.True(t, v.hasGCFence) + require.Equal(t, v.gcFence, gcFence) + require.True(t, v.hasOverlappedRollback) + + data := v.EncodeTo() + require.Equal(t, data, buff) +} diff --git a/br/pkg/stream/rewrite_meta_rawkv.go b/br/pkg/stream/rewrite_meta_rawkv.go index 40e76a6130358..7398abdbb2cb9 100644 --- a/br/pkg/stream/rewrite_meta_rawkv.go +++ b/br/pkg/stream/rewrite_meta_rawkv.go @@ -451,13 +451,20 @@ func (sr *SchemasReplace) rewriteValueV2(value []byte, cf string, rewrite func([ return rewriteResult{}, errors.Trace(err) } - if rawWriteCFValue.t == WriteTypeDelete { + if rawWriteCFValue.IsDelete() { return rewriteResult{ NewValue: value, NeedRewrite: true, 
Deleted: true, }, nil } + if rawWriteCFValue.IsRollback() { + return rewriteResult{ + NewValue: value, + NeedRewrite: true, + Deleted: false, + }, nil + } if !rawWriteCFValue.HasShortValue() { return rewriteResult{ NewValue: value, @@ -467,6 +474,9 @@ func (sr *SchemasReplace) rewriteValueV2(value []byte, cf string, rewrite func([ shortValue, needWrite, err := rewrite(rawWriteCFValue.GetShortValue()) if err != nil { + log.Info("failed to rewrite short value", + zap.ByteString("write-type", []byte{rawWriteCFValue.GetWriteType()}), + zap.Int("short-value-len", len(rawWriteCFValue.GetShortValue()))) return rewriteResult{}, errors.Trace(err) } if !needWrite {