Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account-related emails.

Already on GitHub? Sign in to your account

lightning/parser: support STARTING BY #40821

Merged
merged 3 commits into from
Jan 30, 2023
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions br/pkg/lightning/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -455,6 +455,9 @@ type CSVConfig struct {
TrimLastSep bool `toml:"trim-last-separator" json:"trim-last-separator"`
NotNull bool `toml:"not-null" json:"not-null"`
BackslashEscape bool `toml:"backslash-escape" json:"backslash-escape"`
// hide these options for lightning configuration file, they can only be used by LOAD DATA
// https://dev.mysql.com/doc/refman/8.0/en/load-data.html#load-data-field-line-handling
StartingBy string `toml:"-" json:"-"`
lance6716 marked this conversation as resolved.
Show resolved Hide resolved
}

type MydumperRuntime struct {
Expand Down
63 changes: 54 additions & 9 deletions br/pkg/lightning/mydump/csv_parser.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,9 +47,10 @@ type CSVParser struct {
blockParser
cfg *config.CSVConfig

comma []byte
quote []byte
newLine []byte
comma []byte
quote []byte
newLine []byte
startingBy []byte

charsetConvertor *CharsetConvertor
// These variables are used with IndexAnyByte to search a byte slice for the
Expand Down Expand Up @@ -120,6 +121,12 @@ func NewCSVParser(
}
unquoteStopSet = append(unquoteStopSet, newLineStopSet...)

if len(cfg.StartingBy) > 0 {
if strings.Contains(cfg.StartingBy, terminator) {
return nil, errors.New("starting-by cannot contain (line) terminator")
}
}

escFlavor := backslashEscapeFlavorNone
if cfg.BackslashEscape {
escFlavor = backslashEscapeFlavorMySQL
Expand All @@ -138,6 +145,7 @@ func NewCSVParser(
comma: []byte(separator),
quote: []byte(delimiter),
newLine: []byte(terminator),
startingBy: []byte(cfg.StartingBy),
escFlavor: escFlavor,
quoteByteSet: makeByteSet(quoteStopSet),
unquoteByteSet: makeByteSet(unquoteStopSet),
Expand Down Expand Up @@ -370,11 +378,43 @@ func (parser *CSVParser) readRecord(dst []string) ([]string, error) {

isEmptyLine := true
whitespaceLine := true
foundStartingByThisLine := false
prevToken := csvTokenNewLine
var firstToken csvToken

outside:
for {
// we should drop
// 1. the whole line if it does not contain startingBy
// 2. any character before startingBy
// since we have checked startingBy does not contain terminator, we can
// split at terminator to check the substring contains startingBy. Even
// if the terminator is inside a quoted field which means it's not the
// end of a line, the substring can still be dropped by rule 2.
if len(parser.startingBy) > 0 && !foundStartingByThisLine {
sleepymole marked this conversation as resolved.
Show resolved Hide resolved
oldPos := parser.pos
content, _, err := parser.ReadUntilTerminator()
if err != nil {
if !(errors.Cause(err) == io.EOF) {
return nil, err
}
if len(content) == 0 {
return nil, err
}
// if we reached EOF, we should still check the content contains
// startingBy and try to put back and parse it.
}
idx := bytes.Index(content, parser.startingBy)
if idx == -1 {
continue
}
foundStartingByThisLine = true
content = content[idx+len(parser.startingBy):]
content = append(content, parser.newLine...)
parser.buf = append(content, parser.buf...)
parser.pos = oldPos + int64(idx+len(parser.startingBy))
sleepymole marked this conversation as resolved.
Show resolved Hide resolved
}

content, firstByte, err := parser.readUntil(&parser.unquoteByteSet)

if len(content) > 0 {
Expand Down Expand Up @@ -415,6 +455,7 @@ outside:
}
whitespaceLine = false
case csvTokenNewLine:
foundStartingByThisLine = false
// new line = end of record (ignore empty lines)
prevToken = firstToken
if isEmptyLine {
Expand Down Expand Up @@ -578,17 +619,21 @@ func (parser *CSVParser) ReadColumns() error {
}

// ReadUntilTerminator seeks the file until the terminator token is found, and
// returns the file offset beyond the terminator.
// This function is used in strict-format dividing a CSV file.
func (parser *CSVParser) ReadUntilTerminator() (int64, error) {
// returns
// - the content before terminator
// - the file offset beyond the terminator
// - error
// Note that the terminator string pattern may be the content of a field, which
// means it's inside quotes. Caller should make sure to handle this case.
func (parser *CSVParser) ReadUntilTerminator() ([]byte, int64, error) {
for {
_, firstByte, err := parser.readUntil(&parser.newLineByteSet)
content, firstByte, err := parser.readUntil(&parser.newLineByteSet)
if err != nil {
return 0, err
return content, 0, err
}
parser.skipBytes(1)
if ok, err := parser.tryReadNewLine(firstByte); ok || err != nil {
return parser.pos, err
return content, parser.pos, err
}
}
}
234 changes: 234 additions & 0 deletions br/pkg/lightning/mydump/csv_parser_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,35 @@ func runTestCasesCSV(t *testing.T, cfg *config.MydumperRuntime, blockBufSize int
}
}

// runTestCasesCSVIgnoreNLines runs each test case through a fresh CSV parser,
// first skipping ignoreNLines terminator-delimited lines (the way
// LOAD DATA ... IGNORE N LINES does), then asserting the remaining parsed rows
// equal tc.expected.
func runTestCasesCSVIgnoreNLines(t *testing.T, cfg *config.MydumperRuntime, blockBufSize int64, cases []testCase, ignoreNLines int) {
nextCase:
	for _, tc := range cases {
		charsetConvertor, err := mydump.NewCharsetConvertor(cfg.DataCharacterSet, cfg.DataInvalidCharReplace)
		assert.NoError(t, err)
		parser, err := mydump.NewCSVParser(context.Background(), &cfg.CSV, mydump.NewStringReader(tc.input), blockBufSize, ioWorkers, false, charsetConvertor)
		assert.NoError(t, err)

		// Use a per-case counter: decrementing the shared ignoreNLines
		// parameter would make every case after the first skip zero lines.
		for remaining := ignoreNLines; remaining > 0; remaining-- {
			// IGNORE N LINES will directly find (line) terminator without
			// checking whether it's inside quotes.
			_, _, err = parser.ReadUntilTerminator()
			if errors.Cause(err) == io.EOF {
				// Input had fewer than N lines: everything is skipped, so the
				// case must not expect any rows. Continue with the next case
				// instead of returning and silently dropping the rest.
				assert.Len(t, tc.expected, 0, "input = %q", tc.input)
				continue nextCase
			}
			assert.NoError(t, err)
		}

		for i, row := range tc.expected {
			comment := fmt.Sprintf("input = %q, row = %d", tc.input, i+1)
			e := parser.ReadRow()
			assert.NoErrorf(t, e, "input = %q, row = %d, error = %s", tc.input, i+1, errors.ErrorStack(e))
			assert.Equal(t, int64(i)+1, parser.LastRow().RowID, comment)
			assert.Equal(t, row, parser.LastRow().Row, comment)
		}
		assert.ErrorIsf(t, errors.Cause(parser.ReadRow()), io.EOF, "input = %q", tc.input)
	}
}

func runFailingTestCasesCSV(t *testing.T, cfg *config.MydumperRuntime, blockBufSize int64, cases []string) {
for _, tc := range cases {
charsetConvertor, err := mydump.NewCharsetConvertor(cfg.DataCharacterSet, cfg.DataInvalidCharReplace)
Expand Down Expand Up @@ -935,6 +964,211 @@ func TestTerminator(t *testing.T) {
runTestCasesCSV(t, &cfg, 1, testCases)
}

// TestStartingBy checks the LINES STARTING BY behavior of the CSV parser:
// only lines containing the starting-by marker produce rows, bytes up to and
// including the marker are dropped, and lines without the marker are skipped
// entirely.
func TestStartingBy(t *testing.T) {
cfg := config.MydumperRuntime{
CSV: config.CSVConfig{
Separator: ",",
Delimiter: `"`,
Terminator: "\n",
StartingBy: "xxx",
},
}
// quoted fields after the marker; the third line has no marker and is dropped
testCases := []testCase{
{
input: `xxx"abc",1
something xxx"def",2
"ghi",3`,
expected: [][]types.Datum{
{types.NewStringDatum("abc"), types.NewStringDatum("1")},
{types.NewStringDatum("def"), types.NewStringDatum("2")},
},
},
}
runTestCasesCSV(t, &cfg, 1, testCases)

// unquoted fields after the marker; the last line has no marker and is
// dropped even though it contains a quoting error
testCases = []testCase{
{
input: `xxxabc,1
something xxxdef,2
ghi,3
"bad syntax"aaa`,
expected: [][]types.Datum{
{types.NewStringDatum("abc"), types.NewStringDatum("1")},
{types.NewStringDatum("def"), types.NewStringDatum("2")},
},
},
}
runTestCasesCSV(t, &cfg, 1, testCases)

// test that special characters may appear before StartingBy, and that
// StartingBy only takes effect once per line

testCases = []testCase{
{
input: `xxx"abc",1
something xxxdef,2
"ghi",3
"yyy"xxx"yyy",4
"yyy",5,xxxyyy,5
qwe,zzzxxxyyy,6
"yyyxxx"yyyxxx",7
yyy",5,xxxxxx,8
`,
expected: [][]types.Datum{
{types.NewStringDatum("abc"), types.NewStringDatum("1")},
{types.NewStringDatum("def"), types.NewStringDatum("2")},
{types.NewStringDatum("yyy"), types.NewStringDatum("4")},
{types.NewStringDatum("yyy"), types.NewStringDatum("5")},
{types.NewStringDatum("yyy"), types.NewStringDatum("6")},
{types.NewStringDatum("yyyxxx"), types.NewStringDatum("7")},
{types.NewStringDatum("xxx"), types.NewStringDatum("8")},
},
},
}
runTestCasesCSV(t, &cfg, 1, testCases)

// test a StartingBy that contains special characters: here the field
// separator

cfg = config.MydumperRuntime{
CSV: config.CSVConfig{
Separator: ",",
Delimiter: `"`,
Terminator: "\n",
StartingBy: "x,xx",
},
}
testCases = []testCase{
{
input: `x,xx"abc",1
something x,xxdef,2
"ghi",3
"yyy"xxx"yyy",4
"yyy",5,xxxyyy,5
qwe,zzzxxxyyy,6
"yyyxxx"yyyxxx",7
yyy",5,xx,xxxx,8`,
expected: [][]types.Datum{
{types.NewStringDatum("abc"), types.NewStringDatum("1")},
{types.NewStringDatum("def"), types.NewStringDatum("2")},
{types.NewStringDatum("xx"), types.NewStringDatum("8")},
},
},
}
runTestCasesCSV(t, &cfg, 1, testCases)

// a StartingBy that contains the field delimiter (quote character)
cfg = config.MydumperRuntime{
CSV: config.CSVConfig{
Separator: ",",
Delimiter: `"`,
Terminator: "\n",
StartingBy: `x"xx`,
},
}
testCases = []testCase{
{
input: `x"xx"abc",1
something x"xxdef,2
"ghi",3
"yyy"xxx"yyy",4
"yyy",5,xxxyyy,5
qwe,zzzxxxyyy,6
"yyyxxx"yyyxxx",7
yyy",5,xx"xxxx,8
`,
expected: [][]types.Datum{
{types.NewStringDatum("abc"), types.NewStringDatum("1")},
{types.NewStringDatum("def"), types.NewStringDatum("2")},
{types.NewStringDatum("xx"), types.NewStringDatum("8")},
},
},
}
runTestCasesCSV(t, &cfg, 1, testCases)

// a StartingBy containing the line terminator must be rejected when the
// parser is constructed
cfg = config.MydumperRuntime{
CSV: config.CSVConfig{
Separator: ",",
Delimiter: `"`,
Terminator: "\n",
StartingBy: "x\nxx",
},
}
_, err := mydump.NewCSVParser(context.Background(), &cfg.CSV, nil, 1, ioWorkers, false, nil)
require.ErrorContains(t, err, "starting-by cannot contain (line) terminator")
}

func TestCallerCanIgnoreNLines(t *testing.T) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It seems it's better to move this test to LOAD DATA. The parser shouldn't care about how the caller uses it.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

OK, I'll do it in next PR

cfg := config.MydumperRuntime{
CSV: config.CSVConfig{
Separator: ",",
Delimiter: `"`,
Terminator: "\n",
},
}
testCases := []testCase{
{
input: `1,1
2,2
3,3`,
expected: [][]types.Datum{
{types.NewStringDatum("3"), types.NewStringDatum("3")},
},
},
}
runTestCasesCSVIgnoreNLines(t, &cfg, 1, testCases, 2)

testCases = []testCase{
{
input: `"bad syntax"1
"b",2
"c",3`,
expected: [][]types.Datum{
{types.NewStringDatum("c"), types.NewStringDatum("3")},
},
},
}
runTestCasesCSVIgnoreNLines(t, &cfg, 1, testCases, 2)

cfg = config.MydumperRuntime{
CSV: config.CSVConfig{
Separator: ",",
Delimiter: `"`,
Terminator: "\n",
},
}
testCases = []testCase{
{
input: `1,1
2,2
3,3`,
expected: [][]types.Datum{},
},
}
runTestCasesCSVIgnoreNLines(t, &cfg, 1, testCases, 100)

// test IGNORE N LINES will directly find (line) terminator without checking it's inside quotes

cfg = config.MydumperRuntime{
CSV: config.CSVConfig{
Separator: ",",
Delimiter: `"`,
Terminator: "\n",
},
}
testCases = []testCase{
{
input: `"a
",1
"b
",2
"c",3`,
expected: [][]types.Datum{
{types.NewStringDatum("b\n"), types.NewStringDatum("2")},
{types.NewStringDatum("c"), types.NewStringDatum("3")},
},
},
}
runTestCasesCSVIgnoreNLines(t, &cfg, 1, testCases, 2)
}

func TestCharsetConversion(t *testing.T) {
cfg := config.MydumperRuntime{
CSV: config.CSVConfig{
Expand Down
2 changes: 1 addition & 1 deletion br/pkg/lightning/mydump/region.go
Original file line number Diff line number Diff line change
Expand Up @@ -431,7 +431,7 @@ func SplitLargeFile(
if err = parser.SetPos(endOffset, prevRowIDMax); err != nil {
return 0, nil, nil, err
}
pos, err := parser.ReadUntilTerminator()
_, pos, err := parser.ReadUntilTerminator()
if err != nil {
if !errors.ErrorEqual(err, io.EOF) {
return 0, nil, nil, err
Expand Down