Skip to content
This repository has been archived by the owner on Jul 24, 2024. It is now read-only.

lightning: cherry-pick some PRs #1458

Merged
merged 5 commits into from
Jan 25, 2022
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 5 additions & 3 deletions pkg/backup/schema.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,10 +90,12 @@ func (ss *Schemas) BackupSchemas(
metaWriter.StartWriteMetasAsync(ctx, op)
for _, s := range ss.schemas {
schema := s
// Because schema.dbInfo is a pointer that many tables point to.
// Remove "add Temporary-prefix into dbName" from closure to prevent concurrent operations.
if utils.IsSysDB(schema.dbInfo.Name.L) {
schema.dbInfo.Name = utils.TemporaryDBName(schema.dbInfo.Name.O)
}
workerPool.ApplyOnErrorGroup(errg, func() error {
if utils.IsSysDB(schema.dbInfo.Name.L) {
schema.dbInfo.Name = utils.TemporaryDBName(schema.dbInfo.Name.O)
}
logger := log.With(
zap.String("db", schema.dbInfo.Name.O),
zap.String("table", schema.tableInfo.Name.O),
Expand Down
37 changes: 37 additions & 0 deletions pkg/backup/schema_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,9 @@ package backup_test

import (
"context"
"fmt"
"math"
"strings"
"sync/atomic"

"github.com/golang/protobuf/proto"
Expand All @@ -20,6 +22,7 @@ import (
"github.com/pingcap/br/pkg/metautil"
"github.com/pingcap/br/pkg/mock"
"github.com/pingcap/br/pkg/storage"
"github.com/pingcap/br/pkg/utils"
)

var _ = Suite(&testBackupSchemaSuite{})
Expand Down Expand Up @@ -240,3 +243,37 @@ func (s *testBackupSchemaSuite) TestBuildBackupRangeAndSchemaWithBrokenStats(c *
c.Assert(schemas2[0].Info, DeepEquals, schemas[0].Info)
c.Assert(schemas2[0].DB, DeepEquals, schemas[0].DB)
}

func (s *testBackupSchemaSuite) TestBackupSchemasForSystemTable(c *C) {
tk := testkit.NewTestKit(c, s.mock.Storage)
es2 := s.GetRandomStorage(c)

systemTablesCount := 32
tablePrefix := "systable"
tk.MustExec("use mysql")
for i := 1; i <= systemTablesCount; i++ {
query := fmt.Sprintf("create table %s%d (a char(1));", tablePrefix, i)
tk.MustExec(query)
}

f, err := filter.Parse([]string{"mysql.systable*"})
c.Assert(err, IsNil)
_, backupSchemas, err := backup.BuildBackupRangeAndSchema(s.mock.Storage, f, math.MaxUint64)
c.Assert(err, IsNil)
c.Assert(backupSchemas.Len(), Equals, systemTablesCount)

ctx := context.Background()
updateCh := new(simpleProgress)

metaWriter2 := metautil.NewMetaWriter(es2, metautil.MetaFileSize, false)
err = backupSchemas.BackupSchemas(ctx, metaWriter2, s.mock.Storage, nil,
math.MaxUint64, 1, variable.DefChecksumTableConcurrency, true, updateCh)
c.Assert(err, IsNil)

schemas2 := s.GetSchemasFromMeta(c, es2)
c.Assert(schemas2, HasLen, systemTablesCount)
for _, schema := range schemas2 {
c.Assert(schema.DB.Name, Equals, utils.TemporaryDBName("mysql"))
c.Assert(strings.HasPrefix(schema.Info.Name.O, tablePrefix), Equals, true)
}
}
14 changes: 12 additions & 2 deletions pkg/lightning/mydump/region.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,11 @@ import (
"github.com/pingcap/br/pkg/utils"
)

const tableRegionSizeWarningThreshold int64 = 1024 * 1024 * 1024
const (
tableRegionSizeWarningThreshold int64 = 1024 * 1024 * 1024
// the increment ratio of large CSV file size threshold by `region-split-size`
largeCSVLowerThresholdRation = 10
)

type TableRegion struct {
EngineID int32
Expand Down Expand Up @@ -268,7 +272,10 @@ func makeSourceFileRegion(
}
// If a csv file is overlarge, we need to split it into multiple regions.
// Note: We can only split a csv file whose format is strict.
if isCsvFile && dataFileSize > int64(cfg.Mydumper.MaxRegionSize) && cfg.Mydumper.StrictFormat {
// We increase the check threshold by 1/10 of the `max-region-size` because the source file size dumped by tools
// like dumpling might be slight exceed the threshold when it is equal `max-region-size`, so we can
// avoid split a lot of small chunks.
if isCsvFile && cfg.Mydumper.StrictFormat && dataFileSize > int64(cfg.Mydumper.MaxRegionSize+cfg.Mydumper.MaxRegionSize/largeCSVLowerThresholdRation) {
_, regions, subFileSizes, err := SplitLargeFile(ctx, meta, cfg, fi, divisor, 0, ioWorkers, store)
return regions, subFileSizes, err
}
Expand Down Expand Up @@ -359,6 +366,9 @@ func SplitLargeFile(
columns = parser.Columns()
startOffset, _ = parser.Pos()
endOffset = startOffset + maxRegionSize
if endOffset > dataFile.FileMeta.FileSize {
endOffset = dataFile.FileMeta.FileSize
}
}
for {
curRowsCnt := (endOffset - startOffset) / divisor
Expand Down
56 changes: 56 additions & 0 deletions pkg/lightning/mydump/region_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -332,3 +332,59 @@ func (s *testMydumpRegionSuite) TestSplitLargeFileWithCustomTerminator(c *C) {
c.Assert(regions[i].Chunk.EndOffset, Equals, offsets[i][1])
}
}

func (s *testMydumpRegionSuite) TestSplitLargeFileOnlyOneChunk(c *C) {
meta := &MDTableMeta{
DB: "csv",
Name: "large_csv_file",
}
cfg := &config.Config{
Mydumper: config.MydumperRuntime{
ReadBlockSize: config.ReadBlockSize,
CSV: config.CSVConfig{
Separator: ",",
Delimiter: "",
Header: true,
TrimLastSep: false,
NotNull: false,
Null: "NULL",
BackslashEscape: true,
},
StrictFormat: true,
Filter: []string{"*.*"},
MaxRegionSize: 15,
},
}

dir := c.MkDir()

fileName := "test.csv"
filePath := filepath.Join(dir, fileName)

content := []byte("field1,field2\r\n123,456\r\n")
err := os.WriteFile(filePath, content, 0o644)
c.Assert(err, IsNil)

dataFileInfo, err := os.Stat(filePath)
c.Assert(err, IsNil)
fileSize := dataFileInfo.Size()
fileInfo := FileInfo{FileMeta: SourceFileMeta{Path: fileName, Type: SourceTypeCSV, FileSize: fileSize}}
colCnt := int64(2)
columns := []string{"field1", "field2"}
prevRowIdxMax := int64(0)
ioWorker := worker.NewPool(context.Background(), 4, "io")

store, err := storage.NewLocalStorage(dir)
c.Assert(err, IsNil)

offsets := [][]int64{{14, 24}}

_, regions, _, err := SplitLargeFile(context.Background(), meta, cfg, fileInfo, colCnt, prevRowIdxMax, ioWorker, store)
c.Assert(err, IsNil)
c.Assert(regions, HasLen, len(offsets))
for i := range offsets {
c.Assert(regions[i].Chunk.Offset, Equals, offsets[i][0])
c.Assert(regions[i].Chunk.EndOffset, Equals, offsets[i][1])
c.Assert(regions[i].Chunk.Columns, DeepEquals, columns)
}
}
7 changes: 1 addition & 6 deletions pkg/storage/gcs.go
Original file line number Diff line number Diff line change
Expand Up @@ -174,11 +174,6 @@ func (s *gcsStorage) WalkDir(ctx context.Context, opt *WalkOption, fn func(strin
opt = &WalkOption{}
}

maxKeys := int64(1000)
if opt.ListCount > 0 {
maxKeys = opt.ListCount
}

prefix := path.Join(s.gcs.Prefix, opt.SubDir)
if len(prefix) > 0 && !strings.HasSuffix(prefix, "/") {
prefix += "/"
Expand All @@ -188,7 +183,7 @@ func (s *gcsStorage) WalkDir(ctx context.Context, opt *WalkOption, fn func(strin
// only need each object's name and size
query.SetAttrSelection([]string{"Name", "Size"})
iter := s.bucket.Objects(ctx, query)
for i := int64(0); i != maxKeys; i++ {
for {
attrs, err := iter.Next()
if err == iterator.Done {
break
Expand Down
26 changes: 26 additions & 0 deletions pkg/storage/gcs_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ package storage

import (
"context"
"fmt"
"io"
"os"

Expand Down Expand Up @@ -76,6 +77,31 @@ func (r *testStorageSuite) TestGCS(c *C) {
c.Assert(list, Equals, "keykey1key2")
c.Assert(totalSize, Equals, int64(42))

// test 1003 files
totalSize = 0
for i := 0; i < 1000; i += 1 {
err = stg.WriteFile(ctx, fmt.Sprintf("f%d", i), []byte("data"))
c.Assert(err, IsNil)
}
filesSet := make(map[string]struct{}, 1003)
err = stg.WalkDir(ctx, nil, func(name string, size int64) error {
filesSet[name] = struct{}{}
totalSize += size
return nil
})
c.Assert(err, IsNil)
c.Assert(totalSize, Equals, int64(42+4000))
_, ok := filesSet["key"]
c.Assert(ok, IsTrue)
_, ok = filesSet["key1"]
c.Assert(ok, IsTrue)
_, ok = filesSet["key2"]
c.Assert(ok, IsTrue)
for i := 0; i < 1000; i += 1 {
_, ok = filesSet[fmt.Sprintf("f%d", i)]
c.Assert(ok, IsTrue)
}

efr, err := stg.Open(ctx, "key2")
c.Assert(err, IsNil)

Expand Down
14 changes: 14 additions & 0 deletions pkg/storage/s3.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
"go.uber.org/zap"

berrors "github.com/pingcap/br/pkg/errors"
"github.com/pingcap/br/pkg/logutil"
)

const (
Expand Down Expand Up @@ -654,6 +655,19 @@ func (r *s3ObjectReader) Seek(offset int64, whence int) (int64, error) {

if realOffset == r.pos {
return realOffset, nil
} else if realOffset >= r.rangeInfo.Size {
// See: https://www.w3.org/Protocols/rfc2616/rfc2616-sec14.html#sec14.35
// because s3's GetObject interface doesn't allow get a range that matches zero length data,
// so if the position is out of range, we need to always return io.EOF after the seek operation.

// close current read and open a new one which target offset
if err := r.reader.Close(); err != nil {
log.L().Warn("close s3 reader failed, will ignore this error", logutil.ShortError(err))
}

r.reader = io.NopCloser(bytes.NewReader(nil))
r.pos = r.rangeInfo.Size
return r.pos, nil
}

// if seek ahead no more than 64k, we discard these data
Expand Down
9 changes: 9 additions & 0 deletions pkg/storage/s3_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -694,6 +694,15 @@ func (s *s3Suite) TestOpenSeek(c *C) {
c.Assert(err, IsNil)
c.Assert(n, Equals, 100)
c.Assert(slice, DeepEquals, someRandomBytes[990100:990200])

// test seek to the file end or bigger positions
for _, p := range []int64{1000000, 1000001, 2000000} {
offset, err = reader.Seek(p, io.SeekStart)
c.Assert(offset, Equals, int64(1000000))
c.Assert(err, IsNil)
_, err := reader.Read(slice)
c.Assert(err, Equals, io.EOF)
}
}

type limitedBytesReader struct {
Expand Down