Skip to content

Commit

Permalink
log-backup: do not rewrite shortvalue iif the value is rollback recor…
Browse files Browse the repository at this point in the history
…d. (#39596) (#39599)

close #39575
  • Loading branch information
ti-chi-bot committed Dec 5, 2022
1 parent aedbd4f commit 77776e1
Show file tree
Hide file tree
Showing 3 changed files with 165 additions and 8 deletions.
60 changes: 58 additions & 2 deletions br/pkg/stream/meta_kv.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,15 +111,34 @@ const (
flagShortValuePrefix = byte('v')
flagOverlappedRollback = byte('R')
flagGCFencePrefix = byte('F')
flagLastChangePrefix = byte('l')
flagTxnSourcePrefix = byte('S')
)

// RawWriteCFValue represents the value in write columnFamily.
// Detail see line: https://github.com/tikv/tikv/blob/release-6.5/components/txn_types/src/write.rs#L70
type RawWriteCFValue struct {
t WriteType
startTs uint64
shortValue []byte
hasOverlappedRollback bool
hasGCFence bool
gcFence uint64

// Records the next version after this version when overlapping rollback
// happens on an already existed commit record.
//
// See [`Write::gc_fence`] for more detail.
hasGCFence bool
gcFence uint64

// The number of versions that need skipping from this record
// to find the latest PUT/DELETE record.
// If versions_to_last_change > 0 but last_change_ts == 0, the key does not
// have a PUT/DELETE record before this write record.
lastChangeTs uint64
versionsToLastChange uint64

// The source of this txn.
txnSource uint64
}

// ParseFrom decodes the value to get the struct `RawWriteCFValue`.
Expand All @@ -146,6 +165,10 @@ l_for:
switch data[0] {
case flagShortValuePrefix:
vlen := data[1]
if len(data[2:]) < int(vlen) {
return errors.Annotatef(berrors.ErrInvalidArgument,
"the length of short value is invalid, vlen: %v", int(vlen))
}
v.shortValue = data[2 : vlen+2]
data = data[vlen+2:]
case flagOverlappedRollback:
Expand All @@ -157,13 +180,37 @@ l_for:
if err != nil {
return errors.Annotate(berrors.ErrInvalidArgument, "decode gc fence failed")
}
case flagLastChangePrefix:
data, v.lastChangeTs, err = codec.DecodeUint(data[1:])
if err != nil {
return errors.Annotate(berrors.ErrInvalidArgument, "decode last change ts failed")
}
data, v.versionsToLastChange, err = codec.DecodeUvarint(data)
if err != nil {
return errors.Annotate(berrors.ErrInvalidArgument, "decode versions to last change failed")
}
case flagTxnSourcePrefix:
data, v.txnSource, err = codec.DecodeUvarint(data[1:])
if err != nil {
return errors.Annotate(berrors.ErrInvalidArgument, "decode txn source failed")
}
default:
break l_for
}
}
return nil
}

// IsRollback checks whether the value in cf is a `rollback` record.
func (v *RawWriteCFValue) IsRollback() bool {
return v.GetWriteType() == WriteTypeRollback
}

// IsRollback checks whether the value in cf is a `delete` record.
func (v *RawWriteCFValue) IsDelete() bool {
return v.GetWriteType() == WriteTypeDelete
}

// HasShortValue checks whether short value is stored in write cf.
func (v *RawWriteCFValue) HasShortValue() bool {
return len(v.shortValue) > 0
Expand Down Expand Up @@ -204,5 +251,14 @@ func (v *RawWriteCFValue) EncodeTo() []byte {
data = append(data, flagGCFencePrefix)
data = codec.EncodeUint(data, v.gcFence)
}
if v.lastChangeTs > 0 || v.versionsToLastChange > 0 {
data = append(data, flagLastChangePrefix)
data = codec.EncodeUint(data, v.lastChangeTs)
data = codec.EncodeUvarint(data, v.versionsToLastChange)
}
if v.txnSource > 0 {
data = append(data, flagTxnSourcePrefix)
data = codec.EncodeUvarint(data, v.txnSource)
}
return data
}
101 changes: 96 additions & 5 deletions br/pkg/stream/meta_kv_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,29 +68,49 @@ func TestWriteType(t *testing.T) {
}

func TestWriteCFValueNoShortValue(t *testing.T) {
var (
ts uint64 = 400036290571534337
txnSource uint64 = 9527
)

buff := make([]byte, 0, 9)
buff = append(buff, byte('P'))
buff = codec.EncodeUvarint(buff, 400036290571534337)
buff = append(buff, WriteTypePut)
buff = codec.EncodeUvarint(buff, ts)
buff = append(buff, flagTxnSourcePrefix)
buff = codec.EncodeUvarint(buff, txnSource)

v := new(RawWriteCFValue)
err := v.ParseFrom(buff)
require.NoError(t, err)
require.False(t, v.IsDelete())
require.False(t, v.IsRollback())
require.False(t, v.HasShortValue())
require.False(t, v.hasGCFence)
require.Equal(t, v.lastChangeTs, uint64(0))
require.Equal(t, v.versionsToLastChange, uint64(0))
require.Equal(t, v.txnSource, txnSource)

encodedBuff := v.EncodeTo()
require.True(t, bytes.Equal(buff, encodedBuff))
}

func TestWriteCFValueWithShortValue(t *testing.T) {
var ts uint64 = 400036290571534337
shortValue := []byte("pingCAP")
var (
ts uint64 = 400036290571534337
shortValue = []byte("pingCAP")
lastChangeTs uint64 = 9527
versionsToLastChange uint64 = 95271
)

buff := make([]byte, 0, 9)
buff = append(buff, byte('P'))
buff = append(buff, WriteTypePut)
buff = codec.EncodeUvarint(buff, ts)
buff = append(buff, flagShortValuePrefix)
buff = append(buff, byte(len(shortValue)))
buff = append(buff, shortValue...)
buff = append(buff, flagLastChangePrefix)
buff = codec.EncodeUint(buff, lastChangeTs)
buff = codec.EncodeUvarint(buff, versionsToLastChange)

v := new(RawWriteCFValue)
err := v.ParseFrom(buff)
Expand All @@ -99,7 +119,78 @@ func TestWriteCFValueWithShortValue(t *testing.T) {
require.True(t, bytes.Equal(v.GetShortValue(), shortValue))
require.False(t, v.hasGCFence)
require.False(t, v.hasOverlappedRollback)
require.Equal(t, v.lastChangeTs, lastChangeTs)
require.Equal(t, v.versionsToLastChange, versionsToLastChange)
require.Equal(t, v.txnSource, uint64(0))

data := v.EncodeTo()
require.True(t, bytes.Equal(data, buff))
}

func TestWriteCFValueWithRollback(t *testing.T) {
var (
ts uint64 = 400036290571534337
protectedRollbackShortValue = []byte{'P'}
)

buff := make([]byte, 0, 9)
buff = append(buff, WriteTypeRollback)
buff = codec.EncodeUvarint(buff, ts)
buff = append(buff, flagShortValuePrefix, byte(len(protectedRollbackShortValue)))
buff = append(buff, protectedRollbackShortValue...)

v := new(RawWriteCFValue)
err := v.ParseFrom(buff)
require.NoError(t, err)
require.True(t, v.IsRollback())
require.True(t, v.HasShortValue())
require.Equal(t, v.GetShortValue(), protectedRollbackShortValue)
require.Equal(t, v.startTs, ts)
require.Equal(t, v.lastChangeTs, uint64(0))
require.Equal(t, v.versionsToLastChange, uint64(0))
require.Equal(t, v.txnSource, uint64(0))

data := v.EncodeTo()
require.Equal(t, data, buff)
}

func TestWriteCFValueWithDelete(t *testing.T) {
var ts uint64 = 400036290571534337
buff := make([]byte, 0, 9)
buff = append(buff, byte('D'))
buff = codec.EncodeUvarint(buff, ts)

v := new(RawWriteCFValue)
err := v.ParseFrom(buff)
require.NoError(t, err)
require.True(t, v.IsDelete())
require.False(t, v.HasShortValue())

data := v.EncodeTo()
require.Equal(t, data, buff)
}

func TestWriteCFValueWithGcFence(t *testing.T) {
var (
ts uint64 = 400036290571534337
gcFence uint64 = 9527
)

buff := make([]byte, 0, 9)
buff = append(buff, WriteTypePut)
buff = codec.EncodeUvarint(buff, ts)
buff = append(buff, flagOverlappedRollback)
buff = append(buff, flagGCFencePrefix)
buff = codec.EncodeUint(buff, gcFence)

v := new(RawWriteCFValue)
err := v.ParseFrom(buff)
require.NoError(t, err)
require.Equal(t, v.startTs, ts)
require.True(t, v.hasGCFence)
require.Equal(t, v.gcFence, gcFence)
require.True(t, v.hasOverlappedRollback)

data := v.EncodeTo()
require.Equal(t, data, buff)
}
12 changes: 11 additions & 1 deletion br/pkg/stream/rewrite_meta_rawkv.go
Original file line number Diff line number Diff line change
Expand Up @@ -451,13 +451,20 @@ func (sr *SchemasReplace) rewriteValueV2(value []byte, cf string, rewrite func([
return rewriteResult{}, errors.Trace(err)
}

if rawWriteCFValue.t == WriteTypeDelete {
if rawWriteCFValue.IsDelete() {
return rewriteResult{
NewValue: value,
NeedRewrite: true,
Deleted: true,
}, nil
}
if rawWriteCFValue.IsRollback() {
return rewriteResult{
NewValue: value,
NeedRewrite: true,
Deleted: false,
}, nil
}
if !rawWriteCFValue.HasShortValue() {
return rewriteResult{
NewValue: value,
Expand All @@ -467,6 +474,9 @@ func (sr *SchemasReplace) rewriteValueV2(value []byte, cf string, rewrite func([

shortValue, needWrite, err := rewrite(rawWriteCFValue.GetShortValue())
if err != nil {
log.Info("failed to rewrite short value",
zap.ByteString("write-type", []byte{rawWriteCFValue.GetWriteType()}),
zap.Int("short-value-len", len(rawWriteCFValue.GetShortValue())))
return rewriteResult{}, errors.Trace(err)
}
if !needWrite {
Expand Down

0 comments on commit 77776e1

Please sign in to comment.