Skip to content
This repository has been archived by the owner on Aug 21, 2023. It is now read-only.

sql: fix pick up possible field (tidb#29399) (#399) #400

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
2 changes: 1 addition & 1 deletion v4/export/dump.go
Original file line number Diff line number Diff line change
Expand Up @@ -756,7 +756,7 @@ func checkTiDBTableRegionPkFields(pkFields, pkColTypes []string) (err error) {
err = errors.Errorf("unsupported primary key for selectTableRegion. pkFields: [%s], pkColTypes: [%s]", strings.Join(pkFields, ", "), strings.Join(pkColTypes, ", "))
return
}
if _, ok := dataTypeNum[pkColTypes[0]]; !ok {
if _, ok := dataTypeInt[pkColTypes[0]]; !ok {
err = errors.Errorf("unsupported primary key type for selectTableRegion. pkFields: [%s], pkColTypes: [%s]", strings.Join(pkFields, ", "), strings.Join(pkColTypes, ", "))
}
return
Expand Down
64 changes: 55 additions & 9 deletions v4/export/sql.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"database/sql"
"fmt"
"io"
"math"
"net/url"
"strconv"
"strings"
Expand Down Expand Up @@ -441,6 +442,9 @@ func GetPrimaryKeyColumns(db *sql.Conn, database, table string) ([]string, error
return cols, nil
}

// getNumericIndex picks up indices according to the following priority:
// primary key > unique key with the smallest count > key with the max cardinality
// primary key with multi cols is before unique key with single col because we will sort result by primary keys
func getNumericIndex(db *sql.Conn, meta TableMeta) (string, error) {
database, table := meta.DatabaseName(), meta.TableName()
colName2Type := string2Map(meta.ColumnNames(), meta.ColumnTypes())
Expand All @@ -449,22 +453,64 @@ func getNumericIndex(db *sql.Conn, meta TableMeta) (string, error) {
if err != nil {
return "", errors.Annotatef(err, "sql: %s", keyQuery)
}
results, err := GetSpecifiedColumnValuesAndClose(rows, "NON_UNIQUE", "KEY_NAME", "COLUMN_NAME")
results, err := GetSpecifiedColumnValuesAndClose(rows, "NON_UNIQUE", "SEQ_IN_INDEX", "KEY_NAME", "COLUMN_NAME", "CARDINALITY")
if err != nil {
return "", errors.Annotatef(err, "sql: %s", keyQuery)
}
uniqueColumnName := ""
type keyColumnPair struct {
colName string
count uint64
}
var (
uniqueKeyMap = map[string]keyColumnPair{} // unique key name -> key column name, unique key columns count
keyColumn string
maxCardinality int64 = -1
)

// check primary key first, then unique key
for _, oneRow := range results {
var ok bool
if _, ok = dataTypeNum[colName2Type[oneRow[2]]]; ok && oneRow[1] == "PRIMARY" {
return oneRow[2], nil
nonUnique, seqInIndex, keyName, colName, cardinality := oneRow[0], oneRow[1], oneRow[2], oneRow[3], oneRow[4]
// only try pick the first column, because the second column of pk/uk in where condition will trigger a full table scan
if seqInIndex != "1" {
if pair, ok := uniqueKeyMap[keyName]; ok {
seqInIndexInt, err := strconv.ParseUint(seqInIndex, 10, 64)
if err == nil && seqInIndexInt > pair.count {
uniqueKeyMap[keyName] = keyColumnPair{pair.colName, seqInIndexInt}
}
}
continue
}
if uniqueColumnName != "" && oneRow[0] == "0" && ok {
uniqueColumnName = oneRow[2]
_, numberColumn := dataTypeInt[colName2Type[colName]]
if numberColumn {
switch {
case keyName == "PRIMARY":
return colName, nil
case nonUnique == "0":
uniqueKeyMap[keyName] = keyColumnPair{colName, 1}
// pick index column with max cardinality when there is no unique index
case len(uniqueKeyMap) == 0:
cardinalityInt, err := strconv.ParseInt(cardinality, 10, 64)
if err == nil && cardinalityInt > maxCardinality {
keyColumn = colName
maxCardinality = cardinalityInt
}
}
}
}
if len(uniqueKeyMap) > 0 {
var (
minCols uint64 = math.MaxUint64
uniqueKeyColumn string
)
for _, pair := range uniqueKeyMap {
if pair.count < minCols {
uniqueKeyColumn = pair.colName
minCols = pair.count
}
}
return uniqueKeyColumn, nil
}
return uniqueColumnName, nil
return keyColumn, nil
}

// FlushTableWithReadLock flush tables with read lock
Expand Down Expand Up @@ -952,7 +998,7 @@ func pickupPossibleField(meta TableMeta, db *sql.Conn) (string, error) {
if meta.HasImplicitRowID() {
return "_tidb_rowid", nil
}
// try to use pk
// try to use pk or uk
fieldName, err := getNumericIndex(db, meta)
if err != nil {
return "", err
Expand Down
147 changes: 146 additions & 1 deletion v4/export/sql_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,10 @@ import (
"strings"
"testing"

tcontext "github.com/pingcap/dumpling/v4/context"
"github.com/stretchr/testify/require"

tcontext "github.com/pingcap/dumpling/v4/context"

"github.com/DATA-DOG/go-sqlmock"
"github.com/coreos/go-semver/semver"
"github.com/pingcap/errors"
Expand Down Expand Up @@ -1548,6 +1549,150 @@ func TestBuildVersion3RegionQueries(t *testing.T) {
}
}

// TestPickupPossibleField verifies the index-column selection priority used by
// pickupPossibleField: implicit _tidb_rowid first, then an integer primary key,
// then the unique key with the fewest columns, then the key with the largest
// cardinality; errors from SHOW INDEX are propagated to the caller.
func TestPickupPossibleField(t *testing.T) {
	db, mock, err := sqlmock.New()
	require.NoError(t, err)
	defer func() {
		require.NoError(t, db.Close())
	}()

	conn, err := db.Conn(context.Background())
	require.NoError(t, err)

	// table with one column of every relevant kind so each case can pick a target
	meta := &mockTableIR{
		dbName:   database,
		tblName:  table,
		colNames: []string{"string1", "int1", "int2", "float1", "bin1", "int3", "bool1", "int4"},
		colTypes: []string{"VARCHAR", "INT", "BIGINT", "FLOAT", "BINARY", "MEDIUMINT", "BOOL", "TINYINT"},
		specCmt: []string{
			"/*!40101 SET NAMES binary*/;",
		},
	}

	testCases := []struct {
		expectedErr      error
		expectedField    string
		hasImplicitRowID bool
		showIndexResults [][]driver.Value
	}{
		// SHOW INDEX fails: the error is surfaced and no field is chosen
		{
			expectedErr: errors.New("show index error"),
		},
		// implicit row id wins without ever querying the indexes
		{
			expectedField:    "_tidb_rowid",
			hasImplicitRowID: true,
		},
		// both primary and unique key columns are integers, use primary key first
		{
			expectedField: "int1",
			showIndexResults: [][]driver.Value{
				{table, 0, "PRIMARY", 1, "int1", "A", 2, nil, nil, "", "BTREE", "", ""},
				{table, 0, "PRIMARY", 2, "float1", "A", 2, nil, nil, "", "BTREE", "", ""},
				{table, 0, "int2", 1, "int2", "A", 2, nil, nil, "YES", "BTREE", "", ""},
				{table, 1, "string1", 1, "string1", "A", 2, nil, nil, "YES", "BTREE", "", ""},
				{table, 1, "int3", 1, "int3", "A", 20, nil, nil, "YES", "BTREE", "", ""},
			},
		},
		// primary key doesn't have an integer at seq 1, use unique key with integer
		{
			expectedField: "int2",
			showIndexResults: [][]driver.Value{
				{table, 0, "PRIMARY", 1, "float1", "A", 2, nil, nil, "", "BTREE", "", ""},
				{table, 0, "PRIMARY", 2, "int1", "A", 2, nil, nil, "", "BTREE", "", ""},
				{table, 0, "int2", 1, "int2", "A", 2, nil, nil, "YES", "BTREE", "", ""},
				{table, 1, "string1", 1, "string1", "A", 2, nil, nil, "YES", "BTREE", "", ""},
				{table, 1, "int3", 1, "int3", "A", 20, nil, nil, "YES", "BTREE", "", ""},
			},
		},
		// several unique keys, use the unique key that has an integer at seq 1
		{
			expectedField: "int1",
			showIndexResults: [][]driver.Value{
				{table, 0, "u1", 1, "int1", "A", 2, nil, nil, "YES", "BTREE", "", ""},
				{table, 0, "u1", 2, "string1", "A", 2, nil, nil, "YES", "BTREE", "", ""},
				{table, 0, "u1", 3, "bin1", "A", 2, nil, nil, "YES", "BTREE", "", ""},
				{table, 0, "u2", 1, "float1", "A", 2, nil, nil, "YES", "BTREE", "", ""},
				{table, 0, "u2", 2, "int2", "A", 2, nil, nil, "YES", "BTREE", "", ""},
			},
		},
		// several unique keys and ordinary keys, use the unique key that has an integer at seq 1
		{
			expectedField: "int1",
			showIndexResults: [][]driver.Value{
				{table, 0, "u1", 1, "float1", "A", 2, nil, nil, "YES", "BTREE", "", ""},
				{table, 0, "u1", 2, "int2", "A", 2, nil, nil, "YES", "BTREE", "", ""},
				{table, 0, "u2", 1, "int1", "A", 2, nil, nil, "YES", "BTREE", "", ""},
				{table, 0, "u2", 2, "string1", "A", 2, nil, nil, "YES", "BTREE", "", ""},
				{table, 0, "u2", 3, "bin1", "A", 2, nil, nil, "YES", "BTREE", "", ""},
				{table, 1, "int3", 1, "int3", "A", 2, nil, nil, "YES", "BTREE", "", ""},
			},
		},
		// several unique keys and ordinary keys, use the unique key that has fewer columns
		{
			expectedField: "int2",
			showIndexResults: [][]driver.Value{
				{table, 0, "u1", 1, "int1", "A", 2, nil, nil, "YES", "BTREE", "", ""},
				{table, 0, "u1", 2, "string1", "A", 2, nil, nil, "YES", "BTREE", "", ""},
				{table, 0, "u1", 3, "bin1", "A", 2, nil, nil, "YES", "BTREE", "", ""},
				{table, 0, "u2", 1, "int2", "A", 2, nil, nil, "YES", "BTREE", "", ""},
				{table, 0, "u2", 2, "string1", "A", 2, nil, nil, "YES", "BTREE", "", ""},
				{table, 1, "int3", 1, "int3", "A", 20, nil, nil, "YES", "BTREE", "", ""},
			},
		},
		// no usable pk/uk: fall back to the ordinary key with max cardinality
		{
			expectedField: "int2",
			showIndexResults: [][]driver.Value{
				{table, 0, "PRIMARY", 1, "string1", "A", 2, nil, nil, "", "BTREE", "", ""},
				{table, 0, "u1", 1, "float1", "A", 2, nil, nil, "YES", "BTREE", "", ""},
				{table, 0, "u1", 2, "int3", "A", 2, nil, nil, "YES", "BTREE", "", ""},
				{table, 1, "i1", 1, "int1", "A", 2, nil, nil, "YES", "BTREE", "", ""},
				{table, 1, "i2", 1, "int2", "A", 5, nil, nil, "YES", "BTREE", "", ""},
				{table, 1, "i2", 2, "bool1", "A", 2, nil, nil, "YES", "BTREE", "", ""},
				{table, 1, "i3", 1, "bin1", "A", 10, nil, nil, "YES", "BTREE", "", ""},
				{table, 1, "i3", 2, "int4", "A", 10, nil, nil, "YES", "BTREE", "", ""},
			},
		},
	}

	query := fmt.Sprintf("SHOW INDEX FROM `%s`.`%s`", database, table)
	for i, tc := range testCases {
		t.Logf("case #%d", i)

		meta.hasImplicitRowID = tc.hasImplicitRowID
		switch {
		case tc.expectedErr != nil:
			mock.ExpectQuery(query).WillReturnError(tc.expectedErr)
		case !tc.hasImplicitRowID:
			// only when the implicit row id is absent does the code run SHOW INDEX
			mockRows := sqlmock.NewRows(showIndexHeaders)
			for _, idxRow := range tc.showIndexResults {
				mockRows.AddRow(idxRow...)
			}
			mock.ExpectQuery(query).WillReturnRows(mockRows)
		}

		field, err := pickupPossibleField(meta, conn)
		if tc.expectedErr != nil {
			require.ErrorIs(t, errors.Cause(err), tc.expectedErr)
		} else {
			require.NoError(t, err)
			require.Equal(t, tc.expectedField, field)
		}
		require.NoError(t, mock.ExpectationsWereMet())
	}
}

func makeVersion(major, minor, patch int64, preRelease string) *semver.Version {
return &semver.Version{
Major: major,
Expand Down
13 changes: 9 additions & 4 deletions v4/export/sql_type.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,13 +31,16 @@ func initColTypeRowReceiverMap() {
"ENUM", "SET", "JSON", "NULL", "VAR_STRING",
}

dataTypeNumArr := []string{
dataTypeIntArr := []string{
"INTEGER", "BIGINT", "TINYINT", "SMALLINT", "MEDIUMINT",
"INT", "INT1", "INT2", "INT3", "INT8",
}

dataTypeNumArr := append(dataTypeIntArr, []string{
"FLOAT", "REAL", "DOUBLE", "DOUBLE PRECISION",
"DECIMAL", "NUMERIC", "FIXED",
"BOOL", "BOOLEAN",
}
}...)

dataTypeBinArr := []string{
"BLOB", "TINYBLOB", "MEDIUMBLOB", "LONGBLOB", "LONG",
Expand All @@ -49,8 +52,10 @@ func initColTypeRowReceiverMap() {
dataTypeString[s] = struct{}{}
colTypeRowReceiverMap[s] = SQLTypeStringMaker
}
for _, s := range dataTypeIntArr {
dataTypeInt[s] = struct{}{}
}
for _, s := range dataTypeNumArr {
dataTypeNum[s] = struct{}{}
colTypeRowReceiverMap[s] = SQLTypeNumberMaker
}
for _, s := range dataTypeBinArr {
Expand All @@ -59,7 +64,7 @@ func initColTypeRowReceiverMap() {
}
}

var dataTypeString, dataTypeNum, dataTypeBin = make(map[string]struct{}), make(map[string]struct{}), make(map[string]struct{})
var dataTypeString, dataTypeInt, dataTypeBin = make(map[string]struct{}), make(map[string]struct{}), make(map[string]struct{})

func escapeBackslashSQL(s []byte, bf *bytes.Buffer) {
var (
Expand Down