diff --git a/pkg/ddl/BUILD.bazel b/pkg/ddl/BUILD.bazel index 95f2a9375156c..83f0f68d29032 100644 --- a/pkg/ddl/BUILD.bazel +++ b/pkg/ddl/BUILD.bazel @@ -153,6 +153,7 @@ go_library( "//pkg/util/execdetails", "//pkg/util/filter", "//pkg/util/gcutil", + "//pkg/util/generatedexpr", "//pkg/util/generic", "//pkg/util/hack", "//pkg/util/intest", @@ -319,6 +320,7 @@ go_test( "//pkg/store/gcworker", "//pkg/store/helper", "//pkg/store/mockstore", + "//pkg/store/mockstore/unistore", "//pkg/table", "//pkg/table/tables", "//pkg/tablecodec", @@ -347,6 +349,7 @@ go_test( "@com_github_ngaut_pools//:pools", "@com_github_pingcap_errors//:errors", "@com_github_pingcap_failpoint//:failpoint", + "@com_github_pingcap_kvproto//pkg/metapb", "@com_github_stretchr_testify//assert", "@com_github_stretchr_testify//require", "@com_github_tikv_client_go_v2//oracle", diff --git a/pkg/ddl/cancel_test.go b/pkg/ddl/cancel_test.go index c414f8cad0477..e4d1557630435 100644 --- a/pkg/ddl/cancel_test.go +++ b/pkg/ddl/cancel_test.go @@ -24,6 +24,7 @@ import ( "github.com/pingcap/failpoint" "github.com/pingcap/tidb/pkg/ddl" "github.com/pingcap/tidb/pkg/ddl/testutil" + "github.com/pingcap/tidb/pkg/domain/infosync" "github.com/pingcap/tidb/pkg/errno" "github.com/pingcap/tidb/pkg/meta/model" "github.com/pingcap/tidb/pkg/testkit" @@ -68,6 +69,15 @@ var allTestCase = []testCancelJob{ {"alter table t add index idx_c2(c2)", true, model.StateDeleteOnly, true, true, nil}, {"alter table t add index idx_c2(c2)", true, model.StateWriteOnly, true, true, nil}, {"alter table t add index idx_cx2(c2)", false, model.StatePublic, false, true, nil}, + // Drop vector index + {"alter table t drop index v_idx_1", true, model.StatePublic, true, false, []string{"alter table t add vector index v_idx_1((VEC_L2_DISTANCE(v2))) USING HNSW"}}, + {"alter table t drop index v_idx_2", false, model.StateWriteOnly, true, false, []string{"alter table t add vector index v_idx_2((VEC_COSINE_DISTANCE(v2))) USING HNSW"}}, + {"alter table t drop 
index v_idx_3", false, model.StateDeleteOnly, false, true, []string{"alter table t add vector index v_idx_3((VEC_COSINE_DISTANCE(v2))) USING HNSW"}}, + {"alter table t drop index v_idx_4", false, model.StateDeleteReorganization, false, true, []string{"alter table t add vector index v_idx_4((VEC_COSINE_DISTANCE(v2))) USING HNSW"}}, + // Add vector key + {"alter table t add vector index v_idx((VEC_COSINE_DISTANCE(v2))) USING HNSW", true, model.StateNone, true, false, nil}, + {"alter table t add vector index v_idx((VEC_COSINE_DISTANCE(v2))) USING HNSW", true, model.StateDeleteOnly, true, true, nil}, + {"alter table t add vector index v_idx((VEC_COSINE_DISTANCE(v2))) USING HNSW", true, model.StateWriteOnly, true, true, nil}, // Add column. {"alter table t add column c4 bigint", true, model.StateNone, true, false, nil}, {"alter table t add column c4 bigint", true, model.StateDeleteOnly, true, true, nil}, @@ -204,7 +214,7 @@ func cancelSuccess(rs *testkit.Result) bool { return strings.Contains(rs.Rows()[0][1].(string), "success") } -func TestCancel(t *testing.T) { +func TestCancelVariousJobs(t *testing.T) { var enterCnt, exitCnt atomic.Int32 testfailpoint.EnableCall(t, "github.com/pingcap/tidb/pkg/ddl/beforeDeliveryJob", func(job *model.Job) { enterCnt.Add(1) }) testfailpoint.EnableCall(t, "github.com/pingcap/tidb/pkg/ddl/afterDeliveryJob", func(job *model.Job) { exitCnt.Add(1) }) @@ -213,10 +223,18 @@ func TestCancel(t *testing.T) { return enterCnt.Load() == exitCnt.Load() }, 10*time.Second, 10*time.Millisecond) } - store := testkit.CreateMockStoreWithSchemaLease(t, 100*time.Millisecond) + store := testkit.CreateMockStoreWithSchemaLease(t, 100*time.Millisecond, withMockTiFlash(2)) tk := testkit.NewTestKit(t, store) tkCancel := testkit.NewTestKit(t, store) + tiflash := infosync.NewMockTiFlash() + infosync.SetMockTiFlash(tiflash) + defer func() { + tiflash.Lock() + tiflash.StatusServer.Close() + tiflash.Unlock() + }() + // Prepare schema. 
tk.MustExec("use test") tk.MustExec("drop table if exists t_partition;") @@ -231,14 +249,16 @@ func TestCancel(t *testing.T) { partition p4 values less than (7096) );`) tk.MustExec(`create table t ( - c1 int, c2 int, c3 int, c11 tinyint, index fk_c1(c1) + c1 int, c2 int, c3 int, c11 tinyint, v2 vector(3), index fk_c1(c1) );`) + tk.MustExec("alter table t set tiflash replica 2 location labels 'a','b';") // Prepare data. for i := 0; i <= 2048; i++ { tk.MustExec(fmt.Sprintf("insert into t_partition values(%d, %d, %d)", i*3, i*2, i)) tk.MustExec(fmt.Sprintf("insert into t(c1, c2, c3) values(%d, %d, %d)", i*3, i*2, i)) } + testfailpoint.Enable(t, "github.com/pingcap/tidb/pkg/ddl/MockCheckVectorIndexProcess", `return(2048)`) // Change some configurations. ddl.ReorgWaitTimeout = 10 * time.Millisecond diff --git a/pkg/ddl/create_table.go b/pkg/ddl/create_table.go index fd47121c5e331..ee9ec8715ee9a 100644 --- a/pkg/ddl/create_table.go +++ b/pkg/ddl/create_table.go @@ -1313,10 +1313,11 @@ func BuildTableInfo( // build index info. 
idxInfo, err := BuildIndexInfo( ctx, - tbInfo.Columns, + tbInfo, pmodel.NewCIStr(indexName), primary, unique, + false, constr.Keys, constr.Option, model.StatePublic, @@ -1480,7 +1481,7 @@ func addIndexForForeignKey(ctx sessionctx.Context, tbInfo *model.TableInfo) erro Length: types.UnspecifiedLength, }) } - idxInfo, err := BuildIndexInfo(ctx, tbInfo.Columns, idxName, false, false, keys, nil, model.StatePublic) + idxInfo, err := BuildIndexInfo(ctx, tbInfo, idxName, false, false, false, keys, nil, model.StatePublic) if err != nil { return errors.Trace(err) } diff --git a/pkg/ddl/delete_range.go b/pkg/ddl/delete_range.go index 8f9e103fe0152..e1d999694add0 100644 --- a/pkg/ddl/delete_range.go +++ b/pkg/ddl/delete_range.go @@ -376,14 +376,9 @@ func insertJobIntoDeleteRangeTable(ctx context.Context, wrapper DelRangeExecWrap } case model.ActionDropIndex, model.ActionDropPrimaryKey: tableID := job.TableID - var indexName any - var partitionIDs []int64 - ifExists := make([]bool, 1) - allIndexIDs := make([]int64, 1) - if err := job.DecodeArgs(&indexName, &ifExists[0], &allIndexIDs[0], &partitionIDs); err != nil { - if err = job.DecodeArgs(&indexName, &ifExists, &allIndexIDs, &partitionIDs); err != nil { - return errors.Trace(err) - } + _, _, allIndexIDs, partitionIDs, _, err := job.DecodeDropIndexFinishedArgs() + if err != nil { + return errors.Trace(err) } // partitionIDs len is 0 if the dropped index is a global index, even if it is a partitioned table. 
if len(partitionIDs) == 0 { diff --git a/pkg/ddl/executor.go b/pkg/ddl/executor.go index 043ca7225d167..8542507b1a77c 100644 --- a/pkg/ddl/executor.go +++ b/pkg/ddl/executor.go @@ -1824,7 +1824,7 @@ func (e *executor) AlterTable(ctx context.Context, sctx sessionctx.Context, stmt err = e.CreateCheckConstraint(sctx, ident, pmodel.NewCIStr(constr.Name), spec.Constraint) } case ast.ConstraintVector: - err = createVectorIndex() + err = e.createVectorIndex(sctx, ident, pmodel.NewCIStr(constr.Name), spec.Constraint.Keys, constr.Option, constr.IfNotExists) default: // Nothing to do now. } @@ -4492,11 +4492,11 @@ func (e *executor) CreatePrimaryKey(ctx sessionctx.Context, ti ast.Ident, indexN tblInfo := t.Meta() // Check before the job is put to the queue. // This check is redundant, but useful. If DDL check fail before the job is put - // to job queue, the fail path logic is super fast. + // to job queue, the fail path logic is particularly fast. // After DDL job is put to the queue, and if the check fail, TiDB will run the DDL cancel logic. // The recover step causes DDL wait a few seconds, makes the unit test painfully slow. // For same reason, decide whether index is global here. - indexColumns, _, err := buildIndexColumns(ctx, tblInfo.Columns, indexPartSpecifications) + indexColumns, _, err := buildIndexColumns(ctx, tblInfo.Columns, indexPartSpecifications, false) if err != nil { return errors.Trace(err) } @@ -4556,8 +4556,155 @@ func (e *executor) CreatePrimaryKey(ctx sessionctx.Context, ti ast.Ident, indexN return errors.Trace(err) } -func createVectorIndex() error { - return dbterror.ErrUnsupportedAddVectorIndex.FastGenByArgs("not currently supported") +func checkIndexNameAndColumns(ctx sessionctx.Context, t table.Table, indexName pmodel.CIStr, + indexPartSpecifications []*ast.IndexPartSpecification, isVector, ifNotExists bool) (pmodel.CIStr, []*model.ColumnInfo, error) { + // Deal with anonymous index. 
+ if len(indexName.L) == 0 { + colName := pmodel.NewCIStr("expression_index") + if isVector { + colName = pmodel.NewCIStr("vector_index") + } + if indexPartSpecifications[0].Column != nil { + colName = indexPartSpecifications[0].Column.Name + } + indexName = GetName4AnonymousIndex(t, colName, pmodel.NewCIStr("")) + } + + var err error + if indexInfo := t.Meta().FindIndexByName(indexName.L); indexInfo != nil { + if indexInfo.State != model.StatePublic { + // NOTE: explicit error message. See issue #18363. + err = dbterror.ErrDupKeyName.GenWithStack("index already exist %s; "+ + "a background job is trying to add the same index, "+ + "please check by `ADMIN SHOW DDL JOBS`", indexName) + } else { + err = dbterror.ErrDupKeyName.GenWithStackByArgs(indexName) + } + if ifNotExists { + ctx.GetSessionVars().StmtCtx.AppendNote(err) + return pmodel.CIStr{}, nil, nil + } + return pmodel.CIStr{}, nil, err + } + + if err = checkTooLongIndex(indexName); err != nil { + return pmodel.CIStr{}, nil, errors.Trace(err) + } + + // Build hidden columns if necessary. 
+	var hiddenCols []*model.ColumnInfo
+	if !isVector {
+		hiddenCols, err = buildHiddenColumnInfoWithCheck(ctx, indexPartSpecifications, indexName, t.Meta(), t.Cols())
+		if err != nil {
+			return pmodel.CIStr{}, nil, err
+		}
+	}
+	if err = checkAddColumnTooManyColumns(len(t.Cols()) + len(hiddenCols)); err != nil {
+		return pmodel.CIStr{}, nil, errors.Trace(err)
+	}
+
+	return indexName, hiddenCols, nil
+}
+
+// checkTableTypeForVectorIndex reports whether a vector index may be built on tblInfo.
+// It rejects cached tables, temporary tables, partitioned tables and tables that have
+// no TiFlash replica configured.
+func checkTableTypeForVectorIndex(tblInfo *model.TableInfo) error {
+	if tblInfo.TableCacheStatusType != model.TableCacheStatusDisable {
+		return errors.Trace(dbterror.ErrOptOnCacheTable.GenWithStackByArgs("Create Vector Index"))
+	}
+	if tblInfo.TempTableType != model.TempTableNone {
+		return dbterror.ErrOptOnTemporaryTable.FastGenByArgs("vector index")
+	}
+	if tblInfo.GetPartitionInfo() != nil {
+		return dbterror.ErrUnsupportedAddVectorIndex.FastGenByArgs("unsupported partition table")
+	}
+	if tblInfo.TiFlashReplica == nil || tblInfo.TiFlashReplica.Count == 0 {
+		return dbterror.ErrUnsupportedAddVectorIndex.FastGenByArgs("unsupported empty TiFlash replica, the replica is nil")
+	}
+
+	return nil
+}
+
+// createVectorIndex validates an ADD VECTOR INDEX request and submits the
+// ActionAddVectorIndex DDL job for it.
+//
+// The table type, the index name, the distance function and the indexed column are all
+// checked before the job is queued. When ifNotExists is true and an index with the same
+// name already exists, a note is appended to the statement context and nil is returned
+// without submitting a job.
+func (e *executor) createVectorIndex(ctx sessionctx.Context, ti ast.Ident, indexName pmodel.CIStr,
+	indexPartSpecifications []*ast.IndexPartSpecification, indexOption *ast.IndexOption, ifNotExists bool) error {
+	schema, t, err := e.getSchemaAndTableByIdent(ti)
+	if err != nil {
+		return errors.Trace(err)
+	}
+
+	tblInfo := t.Meta()
+	if err := checkTableTypeForVectorIndex(tblInfo); err != nil {
+		return errors.Trace(err)
+	}
+
+	indexName, _, err = checkIndexNameAndColumns(ctx, t, indexName, indexPartSpecifications, true, ifNotExists)
+	if err != nil {
+		return errors.Trace(err)
+	}
+	if len(indexName.L) == 0 {
+		// checkIndexNameAndColumns returns an empty name together with a nil error only
+		// when the index already exists and IF NOT EXISTS was given (the note has already
+		// been appended there), so there is nothing left to create. Continuing would
+		// submit a job with an empty index name.
+		return nil
+	}
+	_, funcExpr, err := buildVectorInfoWithCheck(indexPartSpecifications, tblInfo)
+	if err != nil {
+		return errors.Trace(err)
+	}
+
+	// Check before the job is put to the queue.
+	// This check is redundant, but useful. If DDL check fail before the job is put
+	// to job queue, the fail path logic is particularly fast.
+	// After DDL job is put to the queue, and if the check fail, TiDB will run the DDL cancel logic.
+	// The recover step causes DDL wait a few seconds, makes the unit test painfully slow.
+	_, _, err = buildIndexColumns(ctx, tblInfo.Columns, indexPartSpecifications, true)
+	if err != nil {
+		return errors.Trace(err)
+	}
+
+	// The index comment may be truncated here when it is too long, depending on sql_mode.
+	// indexOption is optional (other index builders in this package nil-check it), so the
+	// comment is only validated when an option was supplied.
+	// NOTE(review): the table-comment error is passed for an index comment — confirm
+	// whether an index-specific error should be used instead.
+	if indexOption != nil {
+		sessionVars := ctx.GetSessionVars()
+		if _, err = validateCommentLength(sessionVars.StmtCtx.ErrCtx(), sessionVars.SQLMode, indexName.String(), &indexOption.Comment, dbterror.ErrTooLongTableComment); err != nil {
+			return errors.Trace(err)
+		}
+	}
+
+	job, err := buildAddIndexJobWithoutTypeAndArgs(ctx, schema, t)
+	if err != nil {
+		return errors.Trace(err)
+	}
+	job.Type = model.ActionAddVectorIndex
+	// The expression is carried in the job args as its restored text (funcExpr), so the
+	// AST node is cleared before the part specification is stored in the args.
+	indexPartSpecifications[0].Expr = nil
+	job.Args = []any{indexName, indexPartSpecifications[0], indexOption, funcExpr}
+	// TODO: support CDCWriteSource
+
+	err = e.DoDDLJob(ctx, job)
+	// key exists, but if_not_exists flags is true, so we ignore this error.
+ if dbterror.ErrDupKeyName.Equal(err) && ifNotExists { + ctx.GetSessionVars().StmtCtx.AppendNote(err) + return nil + } + return errors.Trace(err) +} + +func buildAddIndexJobWithoutTypeAndArgs(ctx sessionctx.Context, schema *model.DBInfo, t table.Table) (*model.Job, error) { + tzName, tzOffset := ddlutil.GetTimeZone(ctx) + charset, collate := ctx.GetSessionVars().GetCharsetInfo() + job := &model.Job{ + SchemaID: schema.ID, + TableID: t.Meta().ID, + SchemaName: schema.Name.L, + TableName: t.Meta().Name.L, + BinlogInfo: &model.HistoryInfo{}, + ReorgMeta: &model.DDLReorgMeta{ + SQLMode: ctx.GetSessionVars().SQLMode, + Warnings: make(map[errors.ErrorID]*terror.Error), + WarningsCount: make(map[errors.ErrorID]int64), + Location: &model.TimeZoneLocation{Name: tzName, Offset: tzOffset}, + }, + Priority: ctx.GetSessionVars().DDLReorgPriority, + Charset: charset, + Collate: collate, + SQLMode: ctx.GetSessionVars().SQLMode, + } + reorgMeta, err := newReorgMetaFromVariables(job, ctx) + if err != nil { + return nil, errors.Trace(err) + } + job.ReorgMeta = reorgMeta + return job, nil } func (e *executor) CreateIndex(ctx sessionctx.Context, stmt *ast.CreateIndexStmt) error { @@ -4595,7 +4742,7 @@ func (e *executor) createIndex(ctx sessionctx.Context, ti ast.Ident, keyType ast return dbterror.ErrUnsupportedIndexType.GenWithStack("FULLTEXT and SPATIAL index is not supported") } if keyType == ast.IndexKeyTypeVector { - return createVectorIndex() + return e.createVectorIndex(ctx, ti, indexName, indexPartSpecifications, indexOption, ifNotExists) } unique := keyType == ast.IndexKeyTypeUnique schema, t, err := e.getSchemaAndTableByIdent(ti) @@ -4606,56 +4753,22 @@ func (e *executor) createIndex(ctx sessionctx.Context, ti ast.Ident, keyType ast if t.Meta().TableCacheStatusType != model.TableCacheStatusDisable { return errors.Trace(dbterror.ErrOptOnCacheTable.GenWithStackByArgs("Create Index")) } - // Deal with anonymous index. 
- if len(indexName.L) == 0 { - colName := pmodel.NewCIStr("expression_index") - if indexPartSpecifications[0].Column != nil { - colName = indexPartSpecifications[0].Column.Name - } - indexName = GetName4AnonymousIndex(t, colName, pmodel.NewCIStr("")) - } - - if indexInfo := t.Meta().FindIndexByName(indexName.L); indexInfo != nil { - if indexInfo.State != model.StatePublic { - // NOTE: explicit error message. See issue #18363. - err = dbterror.ErrDupKeyName.GenWithStack("Duplicate key name '%s'; "+ - "a background job is trying to add the same index, "+ - "please check by `ADMIN SHOW DDL JOBS`", indexName) - } else { - err = dbterror.ErrDupKeyName.GenWithStackByArgs(indexName) - } - if ifNotExists { - ctx.GetSessionVars().StmtCtx.AppendNote(err) - return nil - } - return err - } - - if err = checkTooLongIndex(indexName); err != nil { - return errors.Trace(err) - } - - tblInfo := t.Meta() - - // Build hidden columns if necessary. - hiddenCols, err := buildHiddenColumnInfoWithCheck(ctx, indexPartSpecifications, indexName, t.Meta(), t.Cols()) + indexName, hiddenCols, err := checkIndexNameAndColumns(ctx, t, indexName, indexPartSpecifications, false, ifNotExists) if err != nil { - return err - } - if err = checkAddColumnTooManyColumns(len(t.Cols()) + len(hiddenCols)); err != nil { return errors.Trace(err) } + tblInfo := t.Meta() finalColumns := make([]*model.ColumnInfo, len(tblInfo.Columns), len(tblInfo.Columns)+len(hiddenCols)) copy(finalColumns, tblInfo.Columns) finalColumns = append(finalColumns, hiddenCols...) // Check before the job is put to the queue. // This check is redundant, but useful. If DDL check fail before the job is put - // to job queue, the fail path logic is super fast. + // to job queue, the fail path logic is particularly fast. // After DDL job is put to the queue, and if the check fail, TiDB will run the DDL cancel logic. // The recover step causes DDL wait a few seconds, makes the unit test painfully slow. 
// For same reason, decide whether index is global here. - indexColumns, _, err := buildIndexColumns(ctx, finalColumns, indexPartSpecifications) + indexColumns, _, err := buildIndexColumns(ctx, finalColumns, indexPartSpecifications, false) if err != nil { return errors.Trace(err) } @@ -4700,7 +4813,7 @@ func (e *executor) createIndex(ctx sessionctx.Context, ti ast.Ident, keyType ast } if indexOption != nil && indexOption.Tp == pmodel.IndexTypeHypo { // for hypo-index - indexInfo, err := BuildIndexInfo(ctx, tblInfo.Columns, indexName, false, unique, + indexInfo, err := BuildIndexInfo(ctx, tblInfo, indexName, false, unique, false, indexPartSpecifications, indexOption, model.StatePublic) if err != nil { return err @@ -4708,30 +4821,16 @@ func (e *executor) createIndex(ctx sessionctx.Context, ti ast.Ident, keyType ast return e.addHypoIndexIntoCtx(ctx, ti.Schema, ti.Name, indexInfo) } - chs, coll := ctx.GetSessionVars().GetCharsetInfo() // global is set to 'false' is just there to be backwards compatible, // to avoid unmarshal issues, it is now part of indexOption. 
global := false - job := &model.Job{ - SchemaID: schema.ID, - TableID: t.Meta().ID, - SchemaName: schema.Name.L, - TableName: t.Meta().Name.L, - Type: model.ActionAddIndex, - BinlogInfo: &model.HistoryInfo{}, - ReorgMeta: nil, - Args: []any{unique, indexName, indexPartSpecifications, indexOption, hiddenCols, global}, - Priority: ctx.GetSessionVars().DDLReorgPriority, - Charset: chs, - Collate: coll, - CDCWriteSource: ctx.GetSessionVars().CDCWriteSource, - SQLMode: ctx.GetSessionVars().SQLMode, - } - reorgMeta, err := newReorgMetaFromVariables(job, ctx) + job, err := buildAddIndexJobWithoutTypeAndArgs(ctx, schema, t) if err != nil { - return err + return errors.Trace(err) } - job.ReorgMeta = reorgMeta + job.Type = model.ActionAddIndex + job.Args = []any{unique, indexName, indexPartSpecifications, indexOption, hiddenCols, global} + job.CDCWriteSource = ctx.GetSessionVars().CDCWriteSource err = e.DoDDLJob(ctx, job) // key exists, but if_not_exists flags is true, so we ignore this error. diff --git a/pkg/ddl/index.go b/pkg/ddl/index.go index 897f788c26542..b9fd09e875c19 100644 --- a/pkg/ddl/index.go +++ b/pkg/ddl/index.go @@ -40,6 +40,7 @@ import ( "github.com/pingcap/tidb/pkg/disttask/framework/proto" "github.com/pingcap/tidb/pkg/disttask/framework/scheduler" "github.com/pingcap/tidb/pkg/disttask/framework/storage" + "github.com/pingcap/tidb/pkg/domain/infosync" "github.com/pingcap/tidb/pkg/errctx" "github.com/pingcap/tidb/pkg/infoschema" "github.com/pingcap/tidb/pkg/kv" @@ -65,6 +66,7 @@ import ( "github.com/pingcap/tidb/pkg/util/chunk" "github.com/pingcap/tidb/pkg/util/codec" "github.com/pingcap/tidb/pkg/util/dbterror" + "github.com/pingcap/tidb/pkg/util/generatedexpr" tidblogutil "github.com/pingcap/tidb/pkg/util/logutil" decoder "github.com/pingcap/tidb/pkg/util/rowDecoder" "github.com/pingcap/tidb/pkg/util/size" @@ -99,7 +101,7 @@ func suppressErrorTooLongKeyForSchemaTracker(sctx sessionctx.Context) bool { return false } -func buildIndexColumns(ctx 
sessionctx.Context, columns []*model.ColumnInfo, indexPartSpecifications []*ast.IndexPartSpecification) ([]*model.IndexColumn, bool, error) { +func buildIndexColumns(ctx sessionctx.Context, columns []*model.ColumnInfo, indexPartSpecifications []*ast.IndexPartSpecification, isVector bool) ([]*model.IndexColumn, bool, error) { // Build offsets. idxParts := make([]*model.IndexColumn, 0, len(indexPartSpecifications)) var col *model.ColumnInfo @@ -113,7 +115,11 @@ func buildIndexColumns(ctx sessionctx.Context, columns []*model.ColumnInfo, inde return nil, false, dbterror.ErrKeyColumnDoesNotExits.GenWithStack("column does not exist: %s", ip.Column.Name) } - if err := checkIndexColumn(ctx, col, ip.Length); err != nil { + if isVector && col.FieldType.GetType() != mysql.TypeTiDBVectorFloat32 { + return nil, false, dbterror.ErrUnsupportedAddVectorIndex.FastGenByArgs(fmt.Sprintf("only support vector type, but this is type: %s", col.FieldType.String())) + } + + if err := checkIndexColumn(ctx, col, ip.Length, isVector); err != nil { return nil, false, err } if col.FieldType.IsArray() { @@ -210,7 +216,7 @@ func indexColumnsLen(cols []*model.ColumnInfo, idxCols []*model.IndexColumn) (co return } -func checkIndexColumn(ctx sessionctx.Context, col *model.ColumnInfo, indexColumnLen int) error { +func checkIndexColumn(ctx sessionctx.Context, col *model.ColumnInfo, indexColumnLen int, isVectorIndex bool) error { if col.GetFlen() == 0 && (types.IsTypeChar(col.FieldType.GetType()) || types.IsTypeVarchar(col.FieldType.GetType())) { if col.Hidden { return errors.Trace(dbterror.ErrWrongKeyColumnFunctionalIndex.GenWithStackByArgs(col.GeneratedExprString)) @@ -231,7 +237,9 @@ func checkIndexColumn(ctx sessionctx.Context, col *model.ColumnInfo, indexColumn if col.Hidden { return errors.Errorf("Cannot create an expression index on a function that returns a VECTOR value") } - return errors.Trace(dbterror.ErrWrongKeyColumn.GenWithStackByArgs(col.Name)) + if !isVectorIndex { + return 
dbterror.ErrUnsupportedAddVectorIndex.FastGenByArgs("unsupported adding a general index on a vector column") + } } // Length must be specified and non-zero for BLOB and TEXT column indexes. @@ -325,10 +333,9 @@ func calcBytesLengthForDecimal(m int) int { // BuildIndexInfo builds a new IndexInfo according to the index information. func BuildIndexInfo( ctx sessionctx.Context, - allTableColumns []*model.ColumnInfo, + tblInfo *model.TableInfo, indexName pmodel.CIStr, - isPrimary bool, - isUnique bool, + isPrimary, isUnique, isVector bool, indexPartSpecifications []*ast.IndexPartSpecification, indexOption *ast.IndexOption, state model.SchemaState, @@ -337,19 +344,27 @@ func BuildIndexInfo( return nil, errors.Trace(err) } - idxColumns, mvIndex, err := buildIndexColumns(ctx, allTableColumns, indexPartSpecifications) - if err != nil { - return nil, errors.Trace(err) - } - // Create index info. idxInfo := &model.IndexInfo{ Name: indexName, - Columns: idxColumns, State: state, Primary: isPrimary, Unique: isUnique, - MVIndex: mvIndex, + } + + if isVector { + vectorInfo, _, err := buildVectorInfoWithCheck(indexPartSpecifications, tblInfo) + if err != nil { + return nil, errors.Trace(err) + } + idxInfo.VectorInfo = vectorInfo + } + + var err error + allTableColumns := tblInfo.Columns + idxInfo.Columns, idxInfo.MVIndex, err = buildIndexColumns(ctx, allTableColumns, indexPartSpecifications, isVector) + if err != nil { + return nil, errors.Trace(err) } if indexOption != nil { @@ -360,6 +375,8 @@ func BuildIndexInfo( if indexOption.Tp == pmodel.IndexTypeInvalid { // Use btree as default index type. 
idxInfo.Tp = pmodel.IndexTypeBtree + } else if !isVector && indexOption.Tp == pmodel.IndexTypeHNSW { + return nil, dbterror.ErrUnsupportedIndexType.FastGenByArgs("Only support vector index with HNSW type, but it's non-vector index") } else { idxInfo.Tp = indexOption.Tp } @@ -372,6 +389,61 @@ func BuildIndexInfo( return idxInfo, nil } +func buildVectorInfoWithCheck(indexPartSpecifications []*ast.IndexPartSpecification, + tblInfo *model.TableInfo) (*model.VectorIndexInfo, string, error) { + if len(indexPartSpecifications) != 1 { + return nil, "", dbterror.ErrUnsupportedAddVectorIndex.FastGenByArgs("unsupported no function") + } + + idxPart := indexPartSpecifications[0] + f, ok := idxPart.Expr.(*ast.FuncCallExpr) + if !ok { + return nil, "", dbterror.ErrUnsupportedAddVectorIndex.FastGenByArgs(fmt.Sprintf("unsupported function: %v", idxPart.Expr)) + } + distanceMetric, ok := variable.DistanceMetric4VectorIndex[f.FnName.L] + if !ok { + return nil, "", dbterror.ErrUnsupportedAddVectorIndex.FastGenByArgs("unsupported function") + } + colExpr, ok := f.Args[0].(*ast.ColumnNameExpr) + if !ok { + return nil, "", dbterror.ErrUnsupportedAddVectorIndex.FastGenByArgs(fmt.Sprintf("unsupported function args: %v", f.Args[0])) + } + colInfo := findColumnByName(colExpr.Name.Name.L, tblInfo) + if colInfo == nil { + return nil, "", infoschema.ErrColumnNotExists.GenWithStackByArgs(colExpr.Name.Name.String()) + } + + // check duplicated function on the same column + for _, idx := range tblInfo.Indices { + if idx.VectorInfo == nil { + continue + } + if idxCol := idx.FindColumnByName(colInfo.Name.L); idxCol == nil { + continue + } + if idx.VectorInfo.DistanceMetric == distanceMetric { + return nil, "", dbterror.ErrDupKeyName.FastGen(fmt.Sprintf("Duplicate vector index function name 'vector index: %s, column name: %s, duplicate function name: %s'", idx.Name, colInfo.Name, f.FnName)) + } + } + if colInfo.FieldType.GetFlen() <= 0 { + return nil, "", errors.Errorf("add vector index can only be 
defined on fixed-dimension vector columns") + } + + exprStr, err := restoreFuncCall(f) + if err != nil { + return nil, "", errors.Trace(err) + } + + // It's used for build buildIndexColumns. + idxPart.Column = &ast.ColumnName{Name: colInfo.Name} + idxPart.Length = types.UnspecifiedLength + + return &model.VectorIndexInfo{ + Dimension: uint64(colInfo.FieldType.GetFlen()), + DistanceMetric: distanceMetric, + }, exprStr, nil +} + // AddIndexColumnFlag aligns the column flags of columns in TableInfo to IndexInfo. func AddIndexColumnFlag(tblInfo *model.TableInfo, indexInfo *model.IndexInfo) { if indexInfo.Primary { @@ -599,6 +671,280 @@ func decodeAddIndexArgs(job *model.Job) ( return } +func checkAndBuildIndexInfo(job *model.Job, tblInfo *model.TableInfo, indexName pmodel.CIStr, isPK, unique, isVector bool, + indexPartSpecifications []*ast.IndexPartSpecification, indexOption *ast.IndexOption, hiddenCols []*model.ColumnInfo) (*model.IndexInfo, error) { + var err error + indexInfo := tblInfo.FindIndexByName(indexName.L) + if indexInfo != nil { + if indexInfo.State == model.StatePublic { + err = dbterror.ErrDupKeyName.GenWithStack("index already exist %s", indexName) + if isPK { + err = infoschema.ErrMultiplePriKey + } + return nil, err + } + return indexInfo, nil + } + + for _, hiddenCol := range hiddenCols { + columnInfo := model.FindColumnInfo(tblInfo.Columns, hiddenCol.Name.L) + if columnInfo != nil && columnInfo.State == model.StatePublic { + // We already have a column with the same column name. 
+ // TODO: refine the error message + return nil, infoschema.ErrColumnExists.GenWithStackByArgs(hiddenCol.Name) + } + } + + if len(hiddenCols) > 0 { + for _, hiddenCol := range hiddenCols { + InitAndAddColumnToTable(tblInfo, hiddenCol) + } + } + if err = checkAddColumnTooManyColumns(len(tblInfo.Columns)); err != nil { + return nil, errors.Trace(err) + } + indexInfo, err = BuildIndexInfo( + nil, + tblInfo, + indexName, + isPK, + unique, + isVector, + indexPartSpecifications, + indexOption, + model.StateNone, + ) + if err != nil { + return nil, errors.Trace(err) + } + if isPK { + if _, err = CheckPKOnGeneratedColumn(tblInfo, indexPartSpecifications); err != nil { + return nil, err + } + } + indexInfo.ID = AllocateIndexID(tblInfo) + tblInfo.Indices = append(tblInfo.Indices, indexInfo) + if err = checkTooManyIndexes(tblInfo.Indices); err != nil { + return nil, errors.Trace(err) + } + // Here we need do this check before set state to `DeleteOnly`, + // because if hidden columns has been set to `DeleteOnly`, + // the `DeleteOnly` columns are missing when we do this check. + if err := checkInvisibleIndexOnPK(tblInfo); err != nil { + return nil, err + } + logutil.DDLLogger().Info("[ddl] run add index job", zap.String("job", job.String()), zap.Reflect("indexInfo", indexInfo)) + return indexInfo, nil +} + +func (w *worker) onCreateVectorIndex(jobCtx *jobContext, t *meta.Meta, job *model.Job) (ver int64, err error) { + // Handle the rolling back job. + if job.IsRollingback() { + ver, err = onDropIndex(jobCtx, t, job) + if err != nil { + return ver, errors.Trace(err) + } + return ver, nil + } + + // Handle normal job. 
+	schemaID := job.SchemaID
+	tblInfo, err := GetTableInfoAndCancelFaultJob(t, job, schemaID)
+	if err != nil {
+		return ver, errors.Trace(err)
+	}
+	if err := checkTableTypeForVectorIndex(tblInfo); err != nil {
+		return ver, errors.Trace(err)
+	}
+
+	var (
+		indexName              pmodel.CIStr
+		indexOption            *ast.IndexOption
+		indexPartSpecification *ast.IndexPartSpecification
+		funcExpr               string
+	)
+	err = job.DecodeArgs(&indexName, &indexPartSpecification, &indexOption, &funcExpr)
+	if err != nil {
+		job.State = model.JobStateCancelled
+		return ver, errors.Trace(err)
+	}
+	// The job args carry the distance function as text; parse it back into an AST node
+	// for the checks below and clear it again when this step returns.
+	indexPartSpecification.Expr, err = generatedexpr.ParseExpression(funcExpr)
+	if err != nil {
+		job.State = model.JobStateCancelled
+		return ver, errors.Trace(err)
+	}
+	defer func() {
+		indexPartSpecification.Expr = nil
+	}()
+
+	indexInfo, err := checkAndBuildIndexInfo(job, tblInfo, indexName, false, false, true, []*ast.IndexPartSpecification{indexPartSpecification}, indexOption, nil)
+	if err != nil {
+		// checkAndBuildIndexInfo only fails when the index is already public or when a
+		// new index definition cannot be built, so cancel the job instead of letting it
+		// be retried, the same way onCreateIndex handles this call.
+		job.State = model.JobStateCancelled
+		return ver, errors.Trace(err)
+	}
+	originalState := indexInfo.State
+	switch indexInfo.State {
+	case model.StateNone:
+		// none -> delete only
+		indexInfo.State = model.StateDeleteOnly
+		ver, err = updateVersionAndTableInfoWithCheck(jobCtx, t, job, tblInfo, originalState != indexInfo.State)
+		if err != nil {
+			return ver, err
+		}
+		job.SchemaState = model.StateDeleteOnly
+	case model.StateDeleteOnly:
+		// delete only -> write only
+		indexInfo.State = model.StateWriteOnly
+		ver, err = updateVersionAndTableInfo(jobCtx, t, job, tblInfo, originalState != indexInfo.State)
+		if err != nil {
+			return ver, err
+		}
+		job.SchemaState = model.StateWriteOnly
+	case model.StateWriteOnly:
+		// write only -> reorganization
+		indexInfo.State = model.StateWriteReorganization
+		ver, err = updateVersionAndTableInfo(jobCtx, t, job, tblInfo, originalState != indexInfo.State)
+		if err != nil {
+			return ver, err
+		}
+		// Initialize SnapshotVer to 0 for later reorganization check.
+ job.SnapshotVer = 0 + job.SchemaState = model.StateWriteReorganization + case model.StateWriteReorganization: + // reorganization -> public + tbl, err := getTable(jobCtx.getAutoIDRequirement(), schemaID, tblInfo) + if err != nil { + return ver, errors.Trace(err) + } + + if job.IsCancelling() { + return convertAddIdxJob2RollbackJob(jobCtx, t, job, tbl.Meta(), []*model.IndexInfo{indexInfo}, dbterror.ErrCancelledDDLJob) + } + + // Send sync schema notification to TiFlash. + if job.SnapshotVer == 0 { + currVer, err := getValidCurrentVersion(jobCtx.store) + if err != nil { + return ver, errors.Trace(err) + } + err = infosync.SyncTiFlashTableSchema(jobCtx.ctx, tbl.Meta().ID) + if err != nil { + return ver, errors.Trace(err) + } + job.SnapshotVer = currVer.Ver + return ver, nil + } + + // Check the progress of the TiFlash backfill index. + var done bool + done, ver, err = w.checkVectorIndexProcessOnTiFlash(jobCtx, t, job, tbl, indexInfo) + if err != nil || !done { + return ver, err + } + + indexInfo.State = model.StatePublic + ver, err = updateVersionAndTableInfo(jobCtx, t, job, tblInfo, originalState != indexInfo.State) + if err != nil { + return ver, errors.Trace(err) + } + job.Args = []any{indexInfo.ID, false /*if exists*/, getPartitionIDs(tblInfo)} + // Finish this job. 
+		job.FinishTableJob(model.JobStateDone, model.StatePublic, ver, tblInfo)
+		logutil.DDLLogger().Info("[ddl] run add vector index job done",
+			zap.Int64("ver", ver),
+			zap.String("charset", job.Charset),
+			zap.String("collation", job.Collate))
+	default:
+		err = dbterror.ErrInvalidDDLState.GenWithStackByArgs("index", indexInfo.State)
+	}
+
+	return ver, errors.Trace(err)
+}
+
+// checkVectorIndexProcessOnTiFlash waits for the vector index backfill and maps the
+// outcome onto the job.
+//
+// It returns done=true once checkVectorIndexProcess succeeds. A wait timeout is reported
+// as done=false with a nil error so the caller can check again later. A non-retryable
+// error converts the job to a rollback job.
+func (w *worker) checkVectorIndexProcessOnTiFlash(jobCtx *jobContext, t *meta.Meta, job *model.Job, tbl table.Table, indexInfo *model.IndexInfo,
+) (done bool, ver int64, err error) {
+	err = w.checkVectorIndexProcess(jobCtx, tbl, job, indexInfo)
+	if err != nil {
+		if dbterror.ErrWaitReorgTimeout.Equal(err) {
+			return false, ver, nil
+		}
+		if !errorIsRetryable(err, job) {
+			logutil.DDLLogger().Warn("run add vector index job failed, convert job to rollback", zap.Stringer("job", job), zap.Error(err))
+			ver, err = convertAddIdxJob2RollbackJob(jobCtx, t, job, tbl.Meta(), []*model.IndexInfo{indexInfo}, err)
+		}
+		return false, ver, errors.Trace(err)
+	}
+
+	return true, ver, nil
+}
+
+// checkVectorIndexProcess polls the backfill progress of a vector index until it is
+// finished, the worker context is cancelled, this node is no longer the DDL owner, or
+// ReorgWaitTimeout has elapsed (in which case ErrWaitReorgTimeout is returned).
+// job.RowCount is updated with the row count reported by every poll.
+func (w *worker) checkVectorIndexProcess(jobCtx *jobContext, tbl table.Table, job *model.Job, index *model.IndexInfo) error {
+	waitTimeout := ReorgWaitTimeout
+	// The timer must be created once, outside the loop: the select below has a default
+	// case, so a timer created inside it would be brand new on every iteration and its
+	// channel could never be ready, which means the timeout would never fire.
+	timeout := time.After(waitTimeout)
+	for {
+		select {
+		case <-w.ddlCtx.ctx.Done():
+			return dbterror.ErrInvalidWorker.GenWithStack("worker is closed")
+		case <-timeout:
+			logutil.DDLLogger().Info("[ddl] index backfill state running, check vector index process",
+				zap.Stringer("job", job), zap.Stringer("index name", index.Name), zap.Int64("index ID", index.ID),
+				zap.Duration("wait time", waitTimeout), zap.Int64("total added row count", job.RowCount))
+			return dbterror.ErrWaitReorgTimeout
+		default:
+		}
+
+		if !w.ddlCtx.isOwner() {
+			// If it's not the owner, we will try later, so here just returns an error.
+ logutil.DDLLogger().Info("DDL is not the DDL owner", zap.String("ID", w.ddlCtx.uuid)) + return errors.Trace(dbterror.ErrNotOwner) + } + + isDone, addedIndexCnt, err := w.checkVectorIndexProcessOnce(jobCtx, tbl, index.ID) + if err != nil { + return errors.Trace(err) + } + job.RowCount = addedIndexCnt + + if isDone { + break + } + + time.Sleep(500 * time.Millisecond) + } + return nil +} + +// checkVectorIndexProcessOnce checks the backfill process of a vector index from TiFlash once. +func (w *worker) checkVectorIndexProcessOnce(jobCtx *jobContext, tbl table.Table, indexID int64) (bool, int64, error) { + failpoint.Inject("MockCheckVectorIndexProcess", func(val failpoint.Value) { + if valInt, ok := val.(int); ok { + if valInt == -1 { + failpoint.Return(false, 0, nil) + } else { + failpoint.Return(true, int64(valInt), nil) + } + } + }) + + // TODO: We need to add error_msg for to show error information. + sql := fmt.Sprintf("select rows_stable_not_indexed, rows_stable_indexed from information_schema.tiflash_indexes where table_id = %d and index_id = %d;", + tbl.Meta().ID, indexID) + rows, err := w.sess.Execute(jobCtx.ctx, sql, "add_vector_index_check_result") + if err != nil || len(rows) == 0 { + return false, 0, errors.Trace(err) + } + + // handle info from multiple TiFlash nodes + notAddedIndexCnt, addedIndexCnt := int64(0), int64(0) + for _, row := range rows { + notAddedIndexCnt += row.GetInt64(0) + addedIndexCnt += row.GetInt64(1) + } + if notAddedIndexCnt != 0 { + return false, 0, nil + } + + return true, rows[0].GetInt64(1), nil +} + func (w *worker) onCreateIndex(jobCtx *jobContext, t *meta.Meta, job *model.Job, isPK bool) (ver int64, err error) { // Handle the rolling back job. 
if job.IsRollingback() { @@ -640,70 +986,10 @@ func (w *worker) onCreateIndex(jobCtx *jobContext, t *meta.Meta, job *model.Job, allIndexInfos := make([]*model.IndexInfo, 0, len(indexNames)) for i, indexName := range indexNames { - indexInfo := tblInfo.FindIndexByName(indexName.L) - if indexInfo != nil && indexInfo.State == model.StatePublic { + indexInfo, err := checkAndBuildIndexInfo(job, tblInfo, indexName, isPK, uniques[i], false, indexPartSpecifications[i], indexOption[i], hiddenCols[i]) + if err != nil { job.State = model.JobStateCancelled - err = dbterror.ErrDupKeyName.GenWithStack("index already exist %s", indexName) - if isPK { - err = infoschema.ErrMultiplePriKey - } - return ver, err - } - if indexInfo == nil { - for _, hiddenCol := range hiddenCols[i] { - columnInfo := model.FindColumnInfo(tblInfo.Columns, hiddenCol.Name.L) - if columnInfo != nil && columnInfo.State == model.StatePublic { - // We already have a column with the same column name. - job.State = model.JobStateCancelled - // TODO: refine the error message - return ver, infoschema.ErrColumnExists.GenWithStackByArgs(hiddenCol.Name) - } - } - } - if indexInfo == nil { - if len(hiddenCols) > 0 { - for _, hiddenCol := range hiddenCols[i] { - InitAndAddColumnToTable(tblInfo, hiddenCol) - } - } - if err = checkAddColumnTooManyColumns(len(tblInfo.Columns)); err != nil { - job.State = model.JobStateCancelled - return ver, errors.Trace(err) - } - indexInfo, err = BuildIndexInfo( - nil, - tblInfo.Columns, - indexName, - isPK, - uniques[i], - indexPartSpecifications[i], - indexOption[i], - model.StateNone, - ) - if err != nil { - job.State = model.JobStateCancelled - return ver, errors.Trace(err) - } - if isPK { - if _, err = CheckPKOnGeneratedColumn(tblInfo, indexPartSpecifications[i]); err != nil { - job.State = model.JobStateCancelled - return ver, err - } - } - indexInfo.ID = AllocateIndexID(tblInfo) - tblInfo.Indices = append(tblInfo.Indices, indexInfo) - if err = 
checkTooManyIndexes(tblInfo.Indices); err != nil { - job.State = model.JobStateCancelled - return ver, errors.Trace(err) - } - // Here we need do this check before set state to `DeleteOnly`, - // because if hidden columns has been set to `DeleteOnly`, - // the `DeleteOnly` columns are missing when we do this check. - if err := checkInvisibleIndexOnPK(tblInfo); err != nil { - job.State = model.JobStateCancelled - return ver, err - } - logutil.DDLLogger().Info("run add index job", zap.Stringer("job", job), zap.Reflect("indexInfo", indexInfo)) + return ver, errors.Trace(err) } allIndexInfos = append(allIndexInfos, indexInfo) } @@ -836,7 +1122,7 @@ SwitchIndexState: zap.String("charset", job.Charset), zap.String("collation", job.Collate)) default: - err = dbterror.ErrInvalidDDLState.GenWithStackByArgs("index", tblInfo.State) + err = dbterror.ErrInvalidDDLState.GenWithStackByArgs("index", allIndexInfos[0].State) } return ver, errors.Trace(err) @@ -1176,12 +1462,13 @@ func onDropIndex(jobCtx *jobContext, t *meta.Meta, job *model.Job) (ver int64, _ // the partition ids were append by convertAddIdxJob2RollbackJob, it is weird, but for the compatibility, // we should keep appending the partitions in the convertAddIdxJob2RollbackJob. job.FinishTableJob(model.JobStateDone, model.StateNone, ver, tblInfo) + isVector := allIndexInfos[0].VectorInfo != nil // Global index key has t{tableID}_ prefix. // Assign partitionIDs empty to guarantee correct prefix in insertJobIntoDeleteRangeTable. 
if allIndexInfos[0].Global { - job.Args = append(job.Args, idxIDs[0], []int64{}) + job.Args = append(job.Args, idxIDs[0], []int64{}, isVector) } else { - job.Args = append(job.Args, idxIDs[0], getPartitionIDs(tblInfo)) + job.Args = append(job.Args, idxIDs[0], getPartitionIDs(tblInfo), isVector) } } default: diff --git a/pkg/ddl/index_modify_test.go b/pkg/ddl/index_modify_test.go index 1d6b84b8473a4..40cd21fb998bf 100644 --- a/pkg/ddl/index_modify_test.go +++ b/pkg/ddl/index_modify_test.go @@ -26,24 +26,36 @@ import ( "time" "github.com/pingcap/errors" + "github.com/pingcap/kvproto/pkg/metapb" "github.com/pingcap/tidb/pkg/ddl" testddlutil "github.com/pingcap/tidb/pkg/ddl/testutil" + "github.com/pingcap/tidb/pkg/domain/infosync" "github.com/pingcap/tidb/pkg/errno" "github.com/pingcap/tidb/pkg/kv" + "github.com/pingcap/tidb/pkg/meta" "github.com/pingcap/tidb/pkg/meta/model" + pmodel "github.com/pingcap/tidb/pkg/parser/model" + "github.com/pingcap/tidb/pkg/parser/terror" + sessiontypes "github.com/pingcap/tidb/pkg/session/types" "github.com/pingcap/tidb/pkg/sessionctx" "github.com/pingcap/tidb/pkg/sessionctx/variable" "github.com/pingcap/tidb/pkg/sessiontxn" "github.com/pingcap/tidb/pkg/store/mockstore" + "github.com/pingcap/tidb/pkg/store/mockstore/unistore" "github.com/pingcap/tidb/pkg/table" "github.com/pingcap/tidb/pkg/table/tables" "github.com/pingcap/tidb/pkg/tablecodec" "github.com/pingcap/tidb/pkg/testkit" "github.com/pingcap/tidb/pkg/testkit/external" + "github.com/pingcap/tidb/pkg/testkit/testfailpoint" "github.com/pingcap/tidb/pkg/types" + "github.com/pingcap/tidb/pkg/util/chunk" "github.com/pingcap/tidb/pkg/util/codec" + contextutil "github.com/pingcap/tidb/pkg/util/context" "github.com/pingcap/tidb/pkg/util/dbterror" + "github.com/pingcap/tidb/pkg/util/sqlexec" "github.com/stretchr/testify/require" + "github.com/tikv/client-go/v2/testutils" ) const indexModifyLease = 600 * time.Millisecond @@ -1059,16 +1071,282 @@ func TestAddIndexUniqueFailOnDuplicate(t 
*testing.T) { ddl.ResultCounterForTest = nil } -func TestAddVectorIndex(t *testing.T) { - store := testkit.CreateMockStore(t) +// withMockTiFlash sets the mockStore to have N TiFlash stores (naming as tiflash0, tiflash1, ...). +func withMockTiFlash(nodes int) mockstore.MockTiKVStoreOption { + return mockstore.WithMultipleOptions( + mockstore.WithClusterInspector(func(c testutils.Cluster) { + mockCluster := c.(*unistore.Cluster) + _, _, region1 := mockstore.BootstrapWithSingleStore(c) + tiflashIdx := 0 + for tiflashIdx < nodes { + store2 := c.AllocID() + peer2 := c.AllocID() + addr2 := fmt.Sprintf("tiflash%d", tiflashIdx) + mockCluster.AddStore(store2, addr2, &metapb.StoreLabel{Key: "engine", Value: "tiflash"}) + mockCluster.AddPeer(region1, store2, peer2) + tiflashIdx++ + } + }), + mockstore.WithStoreType(mockstore.EmbedUnistore), + ) +} + +func getJobsBySQL(se sessiontypes.Session, tbl, condition string) ([]*model.Job, error) { + rs, err := se.Execute(context.Background(), fmt.Sprintf("select job_meta from mysql.%s %s", tbl, condition)) + if err != nil { + return nil, errors.Trace(err) + } + if len(rs) != 1 { + return nil, errors.New("row cnt is wrong") + } + var rows []chunk.Row + defer terror.Call(rs[0].Close) + if rows, err = sqlexec.DrainRecordSet(context.Background(), rs[0], 8); err != nil { + return nil, errors.Trace(err) + } + jobs := make([]*model.Job, 0, 16) + for _, row := range rows { + jobBinary := row.GetBytes(0) + job := model.Job{} + err := job.Decode(jobBinary) + if err != nil { + return nil, errors.Trace(err) + } + jobs = append(jobs, &job) + } + return jobs, nil +} + +func TestAddVectorIndexSimple(t *testing.T) { + store, dom := testkit.CreateMockStoreAndDomainWithSchemaLease(t, tiflashReplicaLease, withMockTiFlash(2)) tk := testkit.NewTestKit(t, store) tk.MustExec("use test") + tk.MustExec("drop table if exists t, pt;") + + tiflash := infosync.NewMockTiFlash() + infosync.SetMockTiFlash(tiflash) + defer func() { + tiflash.Lock() + 
tiflash.StatusServer.Close() + tiflash.Unlock() + }() + + // test for errors + // for partition table + tk.MustExec(`create table pt( + a int, + b vector, + c int) + PARTITION BY RANGE ( a ) ( + PARTITION p0 VALUES LESS THAN (6), + PARTITION p1 VALUES LESS THAN (11), + PARTITION p2 VALUES LESS THAN (21) + );`) + tk.MustContainErrMsg("alter table pt add vector index idx((vec_cosine_distance(b))) USING HNSW;", + "Unsupported add vector index: unsupported partition table") + // for TiFlash replica + tk.MustExec("create table t (a int, b vector, c vector(3), d vector(4));") + tk.MustContainErrMsg("alter table t add vector index idx((VEC_COSINE_DISTANCE(b))) USING HNSW COMMENT 'b comment';", + "unsupported empty TiFlash replica, the replica is nil") + tk.MustExec("alter table t set tiflash replica 2 location labels 'a','b';") + tk.MustContainErrMsg("alter table t add key idx(a) USING HNSW;", + "Only support vector index with HNSW type, but it's non-vector index") + // for a wrong column + tk.MustGetErrCode("alter table t add vector index ((vec_cosine_distance(n))) USING HNSW;", errno.ErrBadField) + // for wrong functions + tk.MustGetErrCode("alter table t add vector index ((vec_cosine_distance(a))) USING HNSW;", errno.ErrUnsupportedDDLOperation) + tk.MustContainErrMsg("alter table t add vector index ((vec_cosine_distance(a,'[1,2.1,3.3]'))) USING HNSW;", + "Unsupported add vector index: only support vector type, but this is type: int(11)") + tk.MustGetErrCode("alter table t add vector index ((vec_l1_distance(b))) USING HNSW;", errno.ErrUnsupportedDDLOperation) + tk.MustGetErrCode("alter table t add vector index ((vec_negative_inner_product(b))) USING HNSW;", errno.ErrUnsupportedDDLOperation) + tk.MustGetErrCode("alter table t add vector index ((lower(b))) USING HNSW;", errno.ErrUnsupportedDDLOperation) + + // for duplicated index name + tk.MustExec("alter table t add key idx(a);") + tk.MustGetErrCode("alter table t add vector index idx((vec_cosine_distance(c))) USING 
HNSW;", errno.ErrDupKeyName) + // for duplicated function + testfailpoint.Enable(t, "github.com/pingcap/tidb/pkg/ddl/MockCheckVectorIndexProcess", `return(1)`) + tk.MustContainErrMsg("alter table t add vector index vecIdx((vec_cosine_distance(b))) USING HNSW;", + "add vector index can only be defined on fixed-dimension vector columns") + tk.MustExec("alter table t add vector index vecIdx((vec_cosine_distance(c))) USING HNSW;") + tk.MustGetErrCode("alter table t add vector index vecIdx1((vec_cosine_distance(c))) USING HNSW;", errno.ErrDupKeyName) + tk.MustExec("alter table t add vector index vecIdx1((vec_cosine_distance(d))) USING HNSW;") + tk.MustExec("alter table t add vector index vecIdx2((vec_l2_distance(c))) USING HNSW;") + // for "if not exists" + tk.MustExec("alter table t drop index vecIdx2") + tk.MustExec("alter table t add vector index if not exists idx((vec_l2_distance(c))) USING HNSW;") + warnings := tk.Session().GetSessionVars().StmtCtx.GetWarnings() + require.GreaterOrEqual(t, len(warnings), 1) + lastWarn := warnings[len(warnings)-1] + require.Truef(t, terror.ErrorEqual(dbterror.ErrDupKeyName, lastWarn.Err), "err %v", lastWarn.Err) + require.Equal(t, contextutil.WarnLevelNote, lastWarn.Level) + tk.MustContainErrMsg("alter table t add vector index if not exists idx((vec_cosine_distance(c))) USING HNSW;", + "Duplicate vector index function name 'vector index: vecIdx, column name: c, duplicate function name: vec_cosine_distance'") + + // normal test cases tk.MustExec("drop table if exists t;") - tk.MustContainErrMsg("create table t(a int, b vector(3), vector index((VEC_COSINE_DISTANCE(b))) USING HNSW);", - "Unsupported add vector index: not currently supported") tk.MustExec("create table t (a int, b vector(3));") - tk.MustContainErrMsg("alter table t add vector index idx((VEC_COSINE_DISTANCE(b))) USING HNSW COMMENT 'b comment';", - "Unsupported add vector index: not currently supported") - tk.MustContainErrMsg("create vector index idx on t 
((VEC_COSINE_DISTANCE(b))) USING HNSW COMMENT 'b comment';", - "Unsupported add vector index: not currently supported") + tk.MustExec("alter table t set tiflash replica 2 location labels 'a','b';") + tk.MustExec("insert into t values (1, '[1,2.1,3.3]');") + tk.MustQuery("SELECT * FROM INFORMATION_SCHEMA.KEY_COLUMN_USAGE WHERE table_name = 't'").Check(testkit.Rows()) + + tbl, err := dom.InfoSchema().TableByName(context.Background(), pmodel.NewCIStr("test"), pmodel.NewCIStr("t")) + require.NoError(t, err) + indexes := tbl.Meta().Indices + require.Equal(t, 0, len(indexes)) + tk.MustExec("alter table t add vector index idx((VEC_COSINE_DISTANCE(b))) USING HNSW COMMENT 'b comment';") + tbl, err = dom.InfoSchema().TableByName(context.Background(), pmodel.NewCIStr("test"), pmodel.NewCIStr("t")) + require.NoError(t, err) + indexes = tbl.Meta().Indices + require.Equal(t, 1, len(indexes)) + require.Equal(t, pmodel.IndexTypeHNSW, indexes[0].Tp) + require.Equal(t, model.DistanceMetricCosine, indexes[0].VectorInfo.DistanceMetric) + // test row count + jobs, err := getJobsBySQL(tk.Session(), "tidb_ddl_history", "order by job_id desc limit 1") + require.NoError(t, err) + require.Equal(t, 1, len(jobs)) + require.Equal(t, model.ActionAddVectorIndex, jobs[0].Type) + require.Equal(t, int64(1), jobs[0].RowCount) + + tk.MustQuery("select * from t;").Check(testkit.Rows("1 [1,2.1,3.3]")) + tk.MustExec("admin check table t") + tk.MustExec("admin check index t idx") + tk.MustQuery("show create table t").Check(testkit.Rows("t CREATE TABLE `t` (\n" + + " `a` int(11) DEFAULT NULL,\n" + + " `b` vector(3) DEFAULT NULL,\n" + + " VECTOR INDEX `idx`((VEC_COSINE_DISTANCE(`b`))) COMMENT 'b comment'\n" + + ") ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin")) + + tk.MustExec("alter table t drop index idx;") + tbl, err = dom.InfoSchema().TableByName(context.Background(), pmodel.NewCIStr("test"), pmodel.NewCIStr("t")) + require.NoError(t, err) + indexes = tbl.Meta().Indices + require.Equal(t, 
0, len(indexes)) + gcCnt := tk.MustQuery("select count(*) from mysql.gc_delete_range").Rows()[0][0] + require.Equal(t, "0", gcCnt) + tk.MustQuery("select * from t;").Check(testkit.Rows("1 [1,2.1,3.3]")) + tk.MustQuery("show create table t").Check(testkit.Rows("t CREATE TABLE `t` (\n" + + " `a` int(11) DEFAULT NULL,\n" + + " `b` vector(3) DEFAULT NULL\n" + + ") ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin")) + + // test create a vector index with same name + tk.MustExec("create vector index idx on t ((VEC_COSINE_DISTANCE(b))) USING HNSW COMMENT 'b comment';") + tbl, err = dom.InfoSchema().TableByName(context.Background(), pmodel.NewCIStr("test"), pmodel.NewCIStr("t")) + require.NoError(t, err) + indexes = tbl.Meta().Indices + require.Equal(t, 1, len(indexes)) + require.Equal(t, pmodel.IndexTypeHNSW, indexes[0].Tp) + require.Equal(t, model.DistanceMetricCosine, indexes[0].VectorInfo.DistanceMetric) + tk.MustQuery("select * from t;").Check(testkit.Rows("1 [1,2.1,3.3]")) + tk.MustQuery("show create table t").Check(testkit.Rows("t CREATE TABLE `t` (\n" + + " `a` int(11) DEFAULT NULL,\n" + + " `b` vector(3) DEFAULT NULL,\n" + + " VECTOR INDEX `idx`((VEC_COSINE_DISTANCE(`b`))) COMMENT 'b comment'\n" + + ") ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin")) + + // test anonymous index + tk.MustExec("alter table t drop index idx;") + tk.MustExec("alter table t add vector index ((vec_l2_distance(b))) USING HNSW;") + tbl, err = dom.InfoSchema().TableByName(context.Background(), pmodel.NewCIStr("test"), pmodel.NewCIStr("t")) + require.NoError(t, err) + require.Equal(t, 1, len(tbl.Meta().Indices)) + idx := tbl.Meta().Indices[0] + require.Equal(t, "vector_index", idx.Name.O) + require.Equal(t, pmodel.IndexTypeHNSW, idx.Tp) + require.Equal(t, model.DistanceMetricL2, idx.VectorInfo.DistanceMetric) + tk.MustExec("alter table t add key vector_index_2(a);") + tk.MustExec("alter table t add vector index ((VEC_COSINE_DISTANCE(b))) USING HNSW;") + tbl, err = 
dom.InfoSchema().TableByName(context.Background(), pmodel.NewCIStr("test"), pmodel.NewCIStr("t")) + require.NoError(t, err) + require.Equal(t, 3, len(tbl.Meta().Indices)) + require.Equal(t, "vector_index_2", tbl.Meta().Indices[1].Name.O) + require.Equal(t, true, tbl.Meta().Indices[1].VectorInfo == nil) + require.Equal(t, "vector_index_3", tbl.Meta().Indices[2].Name.O) + require.Equal(t, false, tbl.Meta().Indices[2].VectorInfo == nil) +} + +func TestAddVectorIndexRollback(t *testing.T) { + store, _ := testkit.CreateMockStoreAndDomainWithSchemaLease(t, tiflashReplicaLease, withMockTiFlash(2)) + tk := testkit.NewTestKit(t, store) + tk.MustExec("use test") + tk.MustExec("drop table if exists t;") + limit := variable.GetDDLErrorCountLimit() + variable.SetDDLErrorCountLimit(5) + defer func() { + variable.SetDDLErrorCountLimit(limit) + }() + + // mock TiFlash replicas + tk.MustExec("create table t1 (c1 int, b vector, c vector(3), unique key(c1));") + tk.MustExec("alter table t1 set tiflash replica 2 location labels 'a','b';") + + tk.MustExec("insert into t1 values (1, '[1,6.6]', '[1,8.88,9.99]'), (2, '[2,6.6]', '[2,8.88,9.99]'), (3, '[3,6.6]', '[3,8.88,9.99]'), (4, '[4,6.6]', '[4,8.88,9.99]')") + ddl.SetWaitTimeWhenErrorOccurred(100 * time.Millisecond) + addIdxSQL := "alter table t1 add vector index v_idx((VEC_COSINE_DISTANCE(c))) USING HNSW COMMENT 'b comment';" + + // Check whether the reorg information is cleaned up, and check the rollback info. 
+ checkRollbackInfo := func(expectState model.JobState) { + jobs, err := getJobsBySQL(tk.Session(), "tidb_ddl_history", "order by job_id desc limit 1") + require.NoError(t, err) + currJob := jobs[0] + require.Equal(t, model.ActionAddVectorIndex, currJob.Type) + require.Equal(t, expectState, currJob.State) + // check reorg meta + element, start, end, physicalID, err := ddl.NewReorgHandlerForTest(testkit.NewTestKit(t, store).Session()).GetDDLReorgHandle(currJob) + require.True(t, meta.ErrDDLReorgElementNotExist.Equal(err)) + require.Nil(t, element) + require.Nil(t, start) + require.Nil(t, end) + require.Equal(t, int64(0), physicalID) + } + + // Case1: call SyncTiFlashTableSchema failed to rollback job. + tk.MustGetErrMsg(addIdxSQL, "[ddl:-1]DDL job rollback, error msg: MockTiFlash is not accessible") + checkRollbackInfo(model.JobStateRollbackDone) + + // Case2: do 'admin cancel ddl job to rollback job. + tiflash := infosync.NewMockTiFlash() + infosync.SetMockTiFlash(tiflash) + defer func() { + tiflash.Lock() + tiflash.StatusServer.Close() + tiflash.Unlock() + }() + + times := 1 + var checkErr error + tk1 := testkit.NewTestKit(t, store) + tk1.MustExec("use test") + testfailpoint.Enable(t, "github.com/pingcap/tidb/pkg/ddl/MockCheckVectorIndexProcess", `return(1)`) + onJobUpdatedExportedFunc := func(job *model.Job) { + if checkErr != nil { + return + } + if job.SchemaState == model.StateWriteReorganization { + if times == 2 { + time.Sleep(10 * time.Millisecond) + rs := tk1.MustQuery(fmt.Sprintf("admin cancel ddl jobs %d", job.ID)) + if !strings.Contains(rs.Rows()[0][1].(string), "success") { + checkErr = errors.New("admin cancel ddl job failed") + } + } + times++ + } + } + testfailpoint.EnableCall(t, "github.com/pingcap/tidb/pkg/ddl/onJobUpdated", onJobUpdatedExportedFunc) + + tk.MustGetErrMsg(addIdxSQL, "[ddl:8214]Cancelled DDL job") + require.NoError(t, checkErr) + tk.MustQuery("select count(1) from t1;").Check(testkit.Rows("4")) + 
checkRollbackInfo(model.JobStateRollbackDone) + + // Case3: add a vector index normally. + testfailpoint.Disable(t, "github.com/pingcap/tidb/pkg/ddl/onJobUpdated") + testfailpoint.Enable(t, "github.com/pingcap/tidb/pkg/ddl/MockCheckVectorIndexProcess", `return(4)`) + tk.MustExec(addIdxSQL) + // TODO: add mock TiFlash to make sure the vector index count is equal to row count. + // tk.MustQuery("select count(1) from t1 use index(v_idx);").Check(testkit.Rows("4")) + + testfailpoint.Disable(t, "github.com/pingcap/tidb/pkg/ddl/MockCheckVectorIndexProcess") } diff --git a/pkg/ddl/job_worker.go b/pkg/ddl/job_worker.go index d07afa0b1e6b5..c7525a124136b 100644 --- a/pkg/ddl/job_worker.go +++ b/pkg/ddl/job_worker.go @@ -315,7 +315,7 @@ func JobNeedGC(job *model.Job) bool { } switch job.Type { case model.ActionDropSchema, model.ActionDropTable, - model.ActionTruncateTable, model.ActionDropIndex, + model.ActionTruncateTable, model.ActionDropPrimaryKey, model.ActionDropTablePartition, model.ActionTruncateTablePartition, model.ActionDropColumn, model.ActionModifyColumn, @@ -323,6 +323,17 @@ func JobNeedGC(job *model.Job) bool { model.ActionReorganizePartition, model.ActionRemovePartitioning, model.ActionAlterTablePartitioning: return true + case model.ActionDropIndex: + _, _, _, _, hasVectors, err := job.DecodeDropIndexFinishedArgs() + if err != nil { + return false + } + // If it's a vector index, it needn't to store key ranges to gc_delete_range. + // We don't support drop vector index in multi-schema, so we only check the first one. 
+ if len(hasVectors) > 0 && hasVectors[0] { + return false + } + return true case model.ActionMultiSchemaChange: for i, sub := range job.MultiSchemaInfo.SubJobs { proxyJob := sub.ToProxyJob(job, i) @@ -858,6 +869,8 @@ func (w *worker) runOneJobStep( ver, err = w.onCreateIndex(jobCtx, t, job, false) case model.ActionAddPrimaryKey: ver, err = w.onCreateIndex(jobCtx, t, job, true) + case model.ActionAddVectorIndex: + ver, err = w.onCreateVectorIndex(jobCtx, t, job) case model.ActionDropIndex, model.ActionDropPrimaryKey: ver, err = onDropIndex(jobCtx, t, job) case model.ActionRenameIndex: diff --git a/pkg/ddl/modify_column.go b/pkg/ddl/modify_column.go index cb3edff8b574d..83a69171115cb 100644 --- a/pkg/ddl/modify_column.go +++ b/pkg/ddl/modify_column.go @@ -1070,7 +1070,7 @@ func checkColumnWithIndexConstraint(tbInfo *model.TableInfo, originalCol, newCol if !modified { return } - err = checkIndexInModifiableColumns(columns, indexInfo.Columns) + err = checkIndexInModifiableColumns(columns, indexInfo.Columns, indexInfo.VectorInfo != nil) if err != nil { return } @@ -1103,7 +1103,7 @@ func checkColumnWithIndexConstraint(tbInfo *model.TableInfo, originalCol, newCol return nil } -func checkIndexInModifiableColumns(columns []*model.ColumnInfo, idxColumns []*model.IndexColumn) error { +func checkIndexInModifiableColumns(columns []*model.ColumnInfo, idxColumns []*model.IndexColumn, isVectorIndex bool) error { for _, ic := range idxColumns { col := model.FindColumnInfo(columns, ic.Name.L) if col == nil { @@ -1116,7 +1116,7 @@ func checkIndexInModifiableColumns(columns []*model.ColumnInfo, idxColumns []*mo // if the type is still prefixable and larger than old prefix length. 
prefixLength = ic.Length } - if err := checkIndexColumn(nil, col, prefixLength); err != nil { + if err := checkIndexColumn(nil, col, prefixLength, isVectorIndex); err != nil { return err } } diff --git a/pkg/ddl/rollingback.go b/pkg/ddl/rollingback.go index a5d1972043aef..cbebd83d3a813 100644 --- a/pkg/ddl/rollingback.go +++ b/pkg/ddl/rollingback.go @@ -100,7 +100,7 @@ func convertAddIdxJob2RollbackJob( // convertNotReorgAddIdxJob2RollbackJob converts the add index job that are not started workers to rollingbackJob, // to rollback add index operations. job.SnapshotVer == 0 indicates the workers are not started. -func convertNotReorgAddIdxJob2RollbackJob(jobCtx *jobContext, t *meta.Meta, job *model.Job, occuredErr error) (ver int64, err error) { +func convertNotReorgAddIdxJob2RollbackJob(jobCtx *jobContext, t *meta.Meta, job *model.Job, occuredErr error, isVector bool) (ver int64, err error) { defer func() { if ingest.LitBackCtxMgr != nil { ingest.LitBackCtxMgr.Unregister(job.ID) @@ -112,14 +112,20 @@ func convertNotReorgAddIdxJob2RollbackJob(jobCtx *jobContext, t *meta.Meta, job return ver, errors.Trace(err) } + var funcExpr string + var indexPartSpecification *ast.IndexPartSpecification unique := make([]bool, 1) indexName := make([]pmodel.CIStr, 1) indexPartSpecifications := make([][]*ast.IndexPartSpecification, 1) indexOption := make([]*ast.IndexOption, 1) - err = job.DecodeArgs(&unique[0], &indexName[0], &indexPartSpecifications[0], &indexOption[0]) - if err != nil { - err = job.DecodeArgs(&unique, &indexName, &indexPartSpecifications, &indexOption) + if !isVector { + err = job.DecodeArgs(&unique[0], &indexName[0], &indexPartSpecifications[0], &indexOption[0]) + if err != nil { + err = job.DecodeArgs(&unique, &indexName, &indexPartSpecifications, &indexOption) + } + } else { + err = job.DecodeArgs(&indexName[0], &indexPartSpecification, &indexOption[0], &funcExpr) } if err != nil { job.State = model.JobStateCancelled @@ -260,6 +266,18 @@ func 
rollingbackDropIndex(jobCtx *jobContext, t *meta.Meta, job *model.Job) (ver } } +func rollingbackAddVectorIndex(w *worker, jobCtx *jobContext, t *meta.Meta, job *model.Job) (ver int64, err error) { + if job.SchemaState == model.StateWriteReorganization { + // Add vector index workers are started. need to ask them to exit. + w.jobLogger(job).Info("run the cancelling DDL job", zap.String("job", job.String())) + ver, err = w.onCreateVectorIndex(jobCtx, t, job) + } else { + // add index's reorg workers are not running, remove the indexInfo in tableInfo. + ver, err = convertNotReorgAddIdxJob2RollbackJob(jobCtx, t, job, dbterror.ErrCancelledDDLJob, true) + } + return +} + func rollingbackAddIndex(w *worker, jobCtx *jobContext, t *meta.Meta, job *model.Job, isPK bool) (ver int64, err error) { if needNotifyAndStopReorgWorker(job) { // add index workers are started. need to ask them to exit. @@ -268,7 +286,7 @@ func rollingbackAddIndex(w *worker, jobCtx *jobContext, t *meta.Meta, job *model ver, err = w.onCreateIndex(jobCtx, t, job, isPK) } else { // add index's reorg workers are not running, remove the indexInfo in tableInfo. 
- ver, err = convertNotReorgAddIdxJob2RollbackJob(jobCtx, t, job, dbterror.ErrCancelledDDLJob) + ver, err = convertNotReorgAddIdxJob2RollbackJob(jobCtx, t, job, dbterror.ErrCancelledDDLJob, false) } return } @@ -481,6 +499,8 @@ func convertJob2RollbackJob(w *worker, jobCtx *jobContext, t *meta.Meta, job *mo ver, err = rollingbackAddIndex(w, jobCtx, t, job, false) case model.ActionAddPrimaryKey: ver, err = rollingbackAddIndex(w, jobCtx, t, job, true) + case model.ActionAddVectorIndex: + ver, err = rollingbackAddVectorIndex(w, jobCtx, t, job) case model.ActionAddTablePartition: ver, err = rollingbackAddTablePartition(jobCtx, t, job) case model.ActionReorganizePartition, model.ActionRemovePartitioning, diff --git a/pkg/ddl/sanity_check.go b/pkg/ddl/sanity_check.go index cfe4ea197a9dd..42d273bd2aa7c 100644 --- a/pkg/ddl/sanity_check.go +++ b/pkg/ddl/sanity_check.go @@ -141,14 +141,13 @@ func expectedDeleteRangeCnt(ctx delRangeCntCtx, job *model.Job) (int, error) { } return ret, nil case model.ActionDropIndex, model.ActionDropPrimaryKey: - var indexName any - ifNotExists := make([]bool, 1) - indexID := make([]int64, 1) - var partitionIDs []int64 - if err := job.DecodeArgs(&indexName, &ifNotExists[0], &indexID[0], &partitionIDs); err != nil { - if err := job.DecodeArgs(&indexName, &ifNotExists, &indexID, &partitionIDs); err != nil { - return 0, errors.Trace(err) - } + _, _, _, partitionIDs, hasVectors, err := job.DecodeDropIndexFinishedArgs() + if err != nil { + return 0, errors.Trace(err) + } + // We don't support drop vector index in multi-schema, so we only check the first one. 
+ if len(hasVectors) > 0 && hasVectors[0] { + return 0, nil } return mathutil.Max(len(partitionIDs), 1), nil case model.ActionDropColumn: diff --git a/pkg/ddl/schematracker/dm_tracker.go b/pkg/ddl/schematracker/dm_tracker.go index 5e38d10559a8b..3f03052e8d739 100644 --- a/pkg/ddl/schematracker/dm_tracker.go +++ b/pkg/ddl/schematracker/dm_tracker.go @@ -417,20 +417,17 @@ func (d *SchemaTracker) createIndex( if err != nil { return err } - finalColumns := make([]*model.ColumnInfo, len(tblInfo.Columns), len(tblInfo.Columns)+len(hiddenCols)) - copy(finalColumns, tblInfo.Columns) - finalColumns = append(finalColumns, hiddenCols...) - for _, hiddenCol := range hiddenCols { ddl.InitAndAddColumnToTable(tblInfo, hiddenCol) } indexInfo, err := ddl.BuildIndexInfo( ctx, - finalColumns, + tblInfo, indexName, false, unique, + false, indexPartSpecifications, indexOption, model.StatePublic, @@ -871,10 +868,11 @@ func (d *SchemaTracker) createPrimaryKey( indexInfo, err := ddl.BuildIndexInfo( ctx, - tblInfo.Columns, + tblInfo, indexName, true, true, + false, indexPartSpecifications, indexOption, model.StatePublic, diff --git a/pkg/ddl/tests/adminpause/BUILD.bazel b/pkg/ddl/tests/adminpause/BUILD.bazel index 144bb1872474a..5c05ac89be040 100644 --- a/pkg/ddl/tests/adminpause/BUILD.bazel +++ b/pkg/ddl/tests/adminpause/BUILD.bazel @@ -13,8 +13,10 @@ go_library( "//pkg/ddl", "//pkg/ddl/logutil", "//pkg/domain", + "//pkg/domain/infosync", "//pkg/meta/model", "//pkg/testkit", + "//pkg/types", ], ) diff --git a/pkg/ddl/tests/adminpause/ddl_data_generation.go b/pkg/ddl/tests/adminpause/ddl_data_generation.go index f135847942db6..9f72ee8e3eccc 100644 --- a/pkg/ddl/tests/adminpause/ddl_data_generation.go +++ b/pkg/ddl/tests/adminpause/ddl_data_generation.go @@ -19,7 +19,9 @@ import ( "math/rand" "time" + "github.com/pingcap/tidb/pkg/domain/infosync" "github.com/pingcap/tidb/pkg/testkit" + "github.com/pingcap/tidb/pkg/types" ) // AgeMax limits the max number of tuple generated for t_user.age @@ 
-36,10 +38,12 @@ type TestTableUser struct { phone string createdTime time.Time updatedTime time.Time + vec types.VectorFloat32 } // Frequently referenced definition of `table` in all test cases among `admin pause test cases` const adminPauseTestTable string = "t_user" +const adminPauseTestTableWithVec string = "t_user_vec" const adminPauseTestTableStmt string = `CREATE TABLE if not exists ` + adminPauseTestTable + ` ( id int(11) NOT NULL AUTO_INCREMENT, tenant varchar(128) NOT NULL, @@ -51,6 +55,18 @@ const adminPauseTestTableStmt string = `CREATE TABLE if not exists ` + adminPaus created_time datetime NOT NULL, updated_time datetime NOT NULL );` +const adminPauseTestTableStmtWithVec string = `CREATE TABLE if not exists ` + adminPauseTestTableWithVec + ` ( + id int(11) NOT NULL AUTO_INCREMENT, + tenant varchar(128) NOT NULL, + name varchar(128) NOT NULL, + age int(11) NOT NULL, + province varchar(32) NOT NULL DEFAULT '', + city varchar(32) NOT NULL DEFAULT '', + phone varchar(16) NOT NULL DEFAULT '', + created_time datetime NOT NULL, + updated_time datetime NOT NULL, + vec vector(3) + );` const adminPauseTestPartitionTable string = "t_user_partition" const adminPauseTestPartitionTableStmt string = `CREATE TABLE if not exists ` + adminPauseTestPartitionTable + ` ( @@ -117,16 +133,25 @@ func (tu *TestTableUser) generateAttributes(id int) (err error) { } tu.createdTime = time.Now() tu.updatedTime = time.Now() + tu.vec = types.InitVectorFloat32(3) return nil } func (tu *TestTableUser) insertStmt(tableName string, count int) string { sql := fmt.Sprintf("INSERT INTO %s(tenant, name, age, province, city, phone, created_time, updated_time) VALUES ", tableName) + if tableName == adminPauseTestTableWithVec { + sql = fmt.Sprintf("INSERT INTO %s(tenant, name, age, province, city, phone, created_time, updated_time, vec) VALUES ", tableName) + } for n := 0; n < count; n++ { _ = tu.generateAttributes(n) - sql += fmt.Sprintf("('%s', '%s', %d, '%s', '%s', '%s', '%s', '%s')", - 
tu.tenant, tu.name, tu.age, tu.province, tu.city, tu.phone, tu.createdTime, tu.updatedTime) + if tableName == adminPauseTestTable { + sql += fmt.Sprintf("('%s', '%s', %d, '%s', '%s', '%s', '%s', '%s')", + tu.tenant, tu.name, tu.age, tu.province, tu.city, tu.phone, tu.createdTime, tu.updatedTime) + } else { + sql += fmt.Sprintf("('%s', '%s', %d, '%s', '%s', '%s', '%s', '%s', '%s')", + tu.tenant, tu.name, tu.age, tu.province, tu.city, tu.phone, tu.createdTime, tu.updatedTime, tu.vec) + } if n != count-1 { sql += ", " } @@ -139,11 +164,32 @@ func generateTblUser(tk *testkit.TestKit, rowCount int) error { if rowCount == 0 { return nil } + tu := &TestTableUser{} tk.MustExec(tu.insertStmt(adminPauseTestTable, rowCount)) return nil } +func generateTblUserWithVec(tk *testkit.TestKit, rowCount int) error { + tk.MustExec(adminPauseTestTableStmtWithVec) + if rowCount == 0 { + return nil + } + + tk.MustExec(fmt.Sprintf("alter table %s set tiflash replica 3 location labels 'a','b';", adminPauseTestTableWithVec)) + tiflash := infosync.NewMockTiFlash() + infosync.SetMockTiFlash(tiflash) + defer func() { + tiflash.Lock() + tiflash.StatusServer.Close() + tiflash.Unlock() + }() + + tu := &TestTableUser{} + tk.MustExec(tu.insertStmt(adminPauseTestTableWithVec, rowCount)) + return nil +} + func generateTblUserParition(tk *testkit.TestKit) error { tk.MustExec(adminPauseTestPartitionTableStmt) return nil diff --git a/pkg/ddl/tests/adminpause/ddl_stmt_cases.go b/pkg/ddl/tests/adminpause/ddl_stmt_cases.go index cf70377fd7741..5cace09e9791c 100644 --- a/pkg/ddl/tests/adminpause/ddl_stmt_cases.go +++ b/pkg/ddl/tests/adminpause/ddl_stmt_cases.go @@ -114,6 +114,9 @@ const dropUniqueIndexStmt string = alterTableDropPrefix + "index if exists idx_p const addIndexStmt string = alterTableAddPrefix + "index if not exists idx_name (name);" const dropIndexStmt string = alterTableDropPrefix + "index if exists idx_name;" +const addVectorIndexStmt string = "alter table " + adminPauseTestTableWithVec + " 
add vector index v_idx((VEC_COSINE_DISTANCE(vec))) USING HNSW;" +const dropVectorIndexStmt string = "alter table " + adminPauseTestTableWithVec + " drop index if exists v_idx;" + var indexDDLStmtCase = [...]StmtCase{ // Add primary key {ai.globalID(), addPrimaryIndexStmt, model.StateNone, true, nil, []string{dropPrimaryIndexStmt}}, @@ -124,7 +127,7 @@ var indexDDLStmtCase = [...]StmtCase{ // Drop primary key {ai.globalID(), dropPrimaryIndexStmt, model.StatePublic, true, []string{addPrimaryIndexStmt}, []string{dropPrimaryIndexStmt}}, - {ai.globalID(), dropPrimaryIndexStmt, model.StateWriteOnly, false, []string{addPrimaryIndexStmt}, []string{dropPrimaryIndexStmt}}, + {ai.globalID(), dropPrimaryIndexStmt, model.StateDeleteReorganization, false, []string{addPrimaryIndexStmt}, []string{dropPrimaryIndexStmt}}, {ai.globalID(), dropPrimaryIndexStmt, model.StateWriteOnly, false, []string{addPrimaryIndexStmt}, []string{dropPrimaryIndexStmt}}, {ai.globalID(), dropPrimaryIndexStmt, model.StateDeleteOnly, false, []string{addPrimaryIndexStmt}, []string{dropPrimaryIndexStmt}}, @@ -142,9 +145,21 @@ var indexDDLStmtCase = [...]StmtCase{ {ai.globalID(), addIndexStmt, model.StateWriteReorganization, true, nil, []string{dropIndexStmt}}, {ai.globalID(), addIndexStmt, model.StatePublic, false, nil, []string{dropIndexStmt}}, + // Add vector index + {ai.globalID(), addVectorIndexStmt, model.StateNone, true, nil, []string{dropVectorIndexStmt}}, + {ai.globalID(), addVectorIndexStmt, model.StateDeleteOnly, true, nil, []string{dropVectorIndexStmt}}, + {ai.globalID(), addVectorIndexStmt, model.StateWriteOnly, true, nil, []string{dropVectorIndexStmt}}, + {ai.globalID(), addVectorIndexStmt, model.StatePublic, false, nil, []string{dropVectorIndexStmt}}, + + // Drop vector index + {ai.globalID(), dropVectorIndexStmt, model.StatePublic, true, []string{addVectorIndexStmt}, []string{dropVectorIndexStmt}}, + {ai.globalID(), dropVectorIndexStmt, model.StateWriteOnly, false, 
[]string{addVectorIndexStmt}, []string{dropVectorIndexStmt}}, + {ai.globalID(), dropVectorIndexStmt, model.StateDeleteOnly, false, []string{addVectorIndexStmt}, []string{dropVectorIndexStmt}}, + {ai.globalID(), dropVectorIndexStmt, model.StateDeleteReorganization, false, []string{addVectorIndexStmt}, []string{dropVectorIndexStmt}}, + // Drop normal key {ai.globalID(), dropIndexStmt, model.StatePublic, true, []string{addIndexStmt}, []string{dropIndexStmt}}, - {ai.globalID(), dropIndexStmt, model.StateWriteOnly, false, []string{addIndexStmt}, []string{dropIndexStmt}}, + {ai.globalID(), dropIndexStmt, model.StateDeleteReorganization, false, []string{addIndexStmt}, []string{dropIndexStmt}}, {ai.globalID(), dropIndexStmt, model.StateWriteOnly, false, []string{addIndexStmt}, []string{dropIndexStmt}}, {ai.globalID(), dropIndexStmt, model.StateDeleteOnly, false, []string{addIndexStmt}, []string{dropIndexStmt}}, } diff --git a/pkg/ddl/tests/adminpause/pause_cancel_test.go b/pkg/ddl/tests/adminpause/pause_cancel_test.go index d958b6e4d41e1..d3391455aecb4 100644 --- a/pkg/ddl/tests/adminpause/pause_cancel_test.go +++ b/pkg/ddl/tests/adminpause/pause_cancel_test.go @@ -190,7 +190,11 @@ func TestPauseCancelAndRerunSchemaStmt(t *testing.T) { func TestPauseCancelAndRerunIndexStmt(t *testing.T) { var dom, stmtKit, adminCommandKit = prepareDomain(t) + testfailpoint.Enable(t, "github.com/pingcap/tidb/pkg/infoschema/mockTiFlashStoreCount", `return(true)`) + testfailpoint.Enable(t, "github.com/pingcap/tidb/pkg/ddl/MockCheckVectorIndexProcess", `return(1)`) + require.Nil(t, generateTblUser(stmtKit, 10)) + require.Nil(t, generateTblUserWithVec(stmtKit, 10)) for _, stmtCase := range indexDDLStmtCase { pauseAndCancelStmt(t, stmtKit, adminCommandKit, dom, &stmtCase) diff --git a/pkg/ddl/tests/adminpause/pause_resume_test.go b/pkg/ddl/tests/adminpause/pause_resume_test.go index a73873e765b3b..13d44812630dd 100644 --- a/pkg/ddl/tests/adminpause/pause_resume_test.go +++ 
b/pkg/ddl/tests/adminpause/pause_resume_test.go @@ -231,7 +231,11 @@ func TestPauseAndResumeSchemaStmt(t *testing.T) { func TestPauseAndResumeIndexStmt(t *testing.T) { var dom, stmtKit, adminCommandKit = prepareDomain(t) + testfailpoint.Enable(t, "github.com/pingcap/tidb/pkg/infoschema/mockTiFlashStoreCount", `return(true)`) + testfailpoint.Enable(t, "github.com/pingcap/tidb/pkg/ddl/MockCheckVectorIndexProcess", `return(1)`) + require.Nil(t, generateTblUser(stmtKit, 10)) + require.Nil(t, generateTblUserWithVec(stmtKit, 10)) for _, stmtCase := range indexDDLStmtCase { pauseResumeAndCancel(t, stmtKit, adminCommandKit, dom, &stmtCase, false) @@ -298,7 +302,11 @@ func TestPauseResumeCancelAndRerunSchemaStmt(t *testing.T) { func TestPauseResumeCancelAndRerunIndexStmt(t *testing.T) { var dom, stmtKit, adminCommandKit = prepareDomain(t) + testfailpoint.Enable(t, "github.com/pingcap/tidb/pkg/infoschema/mockTiFlashStoreCount", `return(true)`) + testfailpoint.Enable(t, "github.com/pingcap/tidb/pkg/ddl/MockCheckVectorIndexProcess", `return(1)`) + require.Nil(t, generateTblUser(stmtKit, 10)) + require.Nil(t, generateTblUserWithVec(stmtKit, 10)) for _, stmtCase := range indexDDLStmtCase { pauseResumeAndCancel(t, stmtKit, adminCommandKit, dom, &stmtCase, true) diff --git a/pkg/domain/infosync/info.go b/pkg/domain/infosync/info.go index 3d1334a5211f3..3d402b374e00c 100644 --- a/pkg/domain/infosync/info.go +++ b/pkg/domain/infosync/info.go @@ -1117,6 +1117,25 @@ func GetLabelRules(ctx context.Context, ruleIDs []string) (map[string]*label.Rul return is.labelRuleManager.GetLabelRules(ctx, ruleIDs) } +// SyncTiFlashTableSchema syncs TiFlash table schema. 
+func SyncTiFlashTableSchema(ctx context.Context, tableID int64) error { + is, err := getGlobalInfoSyncer() + if err != nil { + return errors.Trace(err) + } + tikvStats, err := is.tiflashReplicaManager.GetStoresStat(ctx) + if err != nil { + return errors.Trace(err) + } + tiflashStores := make([]pdhttp.StoreInfo, 0, len(tikvStats.Stores)) + for _, store := range tikvStats.Stores { + if engine.IsTiFlashHTTPResp(&store.Store) { + tiflashStores = append(tiflashStores, store) + } + } + return is.tiflashReplicaManager.SyncTiFlashTableSchema(tableID, tiflashStores) +} + // CalculateTiFlashProgress calculates TiFlash replica progress func CalculateTiFlashProgress(tableID int64, replicaCount uint64, tiFlashStores map[int64]pdhttp.StoreInfo) (float64, error) { is, err := getGlobalInfoSyncer() diff --git a/pkg/domain/infosync/tiflash_manager.go b/pkg/domain/infosync/tiflash_manager.go index bd84d5fb4c043..cb0278a8f1d91 100644 --- a/pkg/domain/infosync/tiflash_manager.go +++ b/pkg/domain/infosync/tiflash_manager.go @@ -74,6 +74,8 @@ type TiFlashReplicaManager interface { DeleteTiFlashProgressFromCache(tableID int64) // CleanTiFlashProgressCache clean progress cache CleanTiFlashProgressCache() + // SyncTiFlashTableSchema syncs the table's schema to TiFlash. + SyncTiFlashTableSchema(tableID int64, storesStat []pd.StoreInfo) error // Close is to close TiFlashReplicaManager Close(ctx context.Context) } @@ -169,6 +171,20 @@ func (m *TiFlashReplicaManagerCtx) CalculateTiFlashProgress(tableID int64, repli return calculateTiFlashProgress(m.codec.GetKeyspaceID(), tableID, replicaCount, tiFlashStores) } +// SyncTiFlashTableSchema syncs the table's schema to TiFlash. 
+func (m *TiFlashReplicaManagerCtx) SyncTiFlashTableSchema(tableID int64, tiFlashStores []pd.StoreInfo) error { + for _, store := range tiFlashStores { + err := helper.SyncTableSchemaToTiFlash(store.Store.StatusAddress, m.codec.GetKeyspaceID(), tableID) + if err != nil { + logutil.BgLogger().Error("Fail to sync peer schema to TiFlash", + zap.Int64("storeID", store.Store.ID), + zap.Int64("tableID", tableID)) + return err + } + } + return nil +} + // UpdateTiFlashProgressCache updates tiflashProgressCache func (m *TiFlashReplicaManagerCtx) UpdateTiFlashProgressCache(tableID int64, progress float64) { m.Lock() @@ -738,6 +754,11 @@ func (tiflash *MockTiFlash) SetNetworkError(e bool) { tiflash.NetworkError = e } +// SyncTiFlashTableSchema syncs the table's schema to TiFlash. +func (*mockTiFlashReplicaManagerCtx) SyncTiFlashTableSchema(_ int64, _ []pd.StoreInfo) error { + return nil +} + // CalculateTiFlashProgress return truncated string to avoid float64 comparison. func (*mockTiFlashReplicaManagerCtx) CalculateTiFlashProgress(tableID int64, replicaCount uint64, tiFlashStores map[int64]pd.StoreInfo) (float64, error) { return calculateTiFlashProgress(tikv.NullspaceID, tableID, replicaCount, tiFlashStores) diff --git a/pkg/executor/builder.go b/pkg/executor/builder.go index 05cc85b2cad21..532faa60b5f13 100644 --- a/pkg/executor/builder.go +++ b/pkg/executor/builder.go @@ -495,23 +495,23 @@ func buildIndexLookUpChecker(b *executorBuilder, p *plannercore.PhysicalIndexLoo } func (b *executorBuilder) buildCheckTable(v *plannercore.CheckTable) exec.Executor { - noMVIndexOrPrefixIndex := true + noMVIndexOrPrefixIndexOrVectorIndex := true for _, idx := range v.IndexInfos { - if idx.MVIndex { - noMVIndexOrPrefixIndex = false + if idx.MVIndex || idx.VectorInfo != nil { + noMVIndexOrPrefixIndexOrVectorIndex = false break } for _, col := range idx.Columns { if col.Length != types.UnspecifiedLength { - noMVIndexOrPrefixIndex = false + noMVIndexOrPrefixIndexOrVectorIndex = false break 
} } - if !noMVIndexOrPrefixIndex { + if !noMVIndexOrPrefixIndexOrVectorIndex { break } } - if b.ctx.GetSessionVars().FastCheckTable && noMVIndexOrPrefixIndex { + if b.ctx.GetSessionVars().FastCheckTable && noMVIndexOrPrefixIndexOrVectorIndex { e := &FastCheckTableExec{ BaseExecutor: exec.NewBaseExecutor(b.ctx, v.Schema(), v.ID()), dbName: v.DBName, diff --git a/pkg/executor/executor.go b/pkg/executor/executor.go index ede5b300afcdb..ea47648e625a6 100644 --- a/pkg/executor/executor.go +++ b/pkg/executor/executor.go @@ -928,7 +928,7 @@ func (e *CheckTableExec) Next(ctx context.Context, _ *chunk.Chunk) error { idxNames := make([]string, 0, len(e.indexInfos)) for _, idx := range e.indexInfos { - if idx.MVIndex { + if idx.MVIndex || idx.VectorInfo != nil { continue } idxNames = append(idxNames, idx.Name.O) diff --git a/pkg/executor/infoschema_reader_test.go b/pkg/executor/infoschema_reader_test.go index 448ebaa303f7d..5a83d311a2960 100644 --- a/pkg/executor/infoschema_reader_test.go +++ b/pkg/executor/infoschema_reader_test.go @@ -605,7 +605,7 @@ func TestColumnTable(t *testing.T) { testkit.RowsWithSep("|", "test|tbl1|col_2")) tk.MustQuery(`select count(*) from information_schema.columns;`).Check( - testkit.RowsWithSep("|", "4948")) + testkit.RowsWithSep("|", "4949")) } func TestIndexUsageTable(t *testing.T) { diff --git a/pkg/executor/show.go b/pkg/executor/show.go index 1291f0d54e214..fdb914ab13881 100644 --- a/pkg/executor/show.go +++ b/pkg/executor/show.go @@ -1211,6 +1211,8 @@ func constructResultOfShowCreateTable(ctx sessionctx.Context, dbName *pmodel.CIS buf.WriteString(" PRIMARY KEY ") } else if idxInfo.Unique { fmt.Fprintf(buf, " UNIQUE KEY %s ", stringutil.Escape(idxInfo.Name.O, sqlMode)) + } else if idxInfo.VectorInfo != nil { + fmt.Fprintf(buf, " VECTOR INDEX %s", stringutil.Escape(idxInfo.Name.O, sqlMode)) } else { fmt.Fprintf(buf, " KEY %s ", stringutil.Escape(idxInfo.Name.O, sqlMode)) } @@ -1228,7 +1230,12 @@ func constructResultOfShowCreateTable(ctx 
sessionctx.Context, dbName *pmodel.CIS } cols = append(cols, colInfo) } - fmt.Fprintf(buf, "(%s)", strings.Join(cols, ",")) + if idxInfo.VectorInfo != nil { + funcName := variable.Function4VectorIndex[idxInfo.VectorInfo.DistanceMetric] + fmt.Fprintf(buf, "((%s(%s)))", strings.ToUpper(funcName), strings.Join(cols, ",")) + } else { + fmt.Fprintf(buf, "(%s)", strings.Join(cols, ",")) + } if idxInfo.Invisible { fmt.Fprintf(buf, ` /*!80000 INVISIBLE */`) } diff --git a/pkg/infoschema/tables.go b/pkg/infoschema/tables.go index eee02c23ebadb..98388aeb16ec5 100644 --- a/pkg/infoschema/tables.go +++ b/pkg/infoschema/tables.go @@ -1514,11 +1514,12 @@ var tableTableTiFlashSegmentsCols = []columnInfo{ } var tableTiFlashIndexesCols = []columnInfo{ - {name: "TIDB_DATABASE", tp: mysql.TypeVarchar, size: 21}, - {name: "TIDB_TABLE", tp: mysql.TypeVarchar, size: 21}, + {name: "TIDB_DATABASE", tp: mysql.TypeVarchar, size: 64}, + {name: "TIDB_TABLE", tp: mysql.TypeVarchar, size: 64}, {name: "TABLE_ID", tp: mysql.TypeLonglong, size: 21}, {name: "COLUMN_NAME", tp: mysql.TypeVarchar, size: 64}, {name: "COLUMN_ID", tp: mysql.TypeLonglong, size: 64}, + {name: "INDEX_ID", tp: mysql.TypeLonglong, size: 21}, {name: "INDEX_KIND", tp: mysql.TypeVarchar, size: 64}, {name: "ROWS_STABLE_INDEXED", tp: mysql.TypeLonglong, size: 64}, {name: "ROWS_STABLE_NOT_INDEXED", tp: mysql.TypeLonglong, size: 64}, diff --git a/pkg/meta/model/index.go b/pkg/meta/model/index.go index d33c82483251b..6515e75325ed5 100644 --- a/pkg/meta/model/index.go +++ b/pkg/meta/model/index.go @@ -19,23 +19,45 @@ import ( "github.com/pingcap/tidb/pkg/parser/types" ) +// DistanceMetric is the distance metric used by the vector index. +// `DistanceMetric` is actually vector functions in ast package. Use `DistanceMetric` to avoid cycle dependency +type DistanceMetric string + +// Note: tipb.VectorDistanceMetric's enum names must be aligned with these constant values. 
+const ( + DistanceMetricL2 DistanceMetric = "L2" + // DistanceMetricCosine is cosine distance. + DistanceMetricCosine DistanceMetric = "COSINE" + // DistanceMetricInnerProduct is inner product. + DistanceMetricInnerProduct DistanceMetric = "INNER_PRODUCT" +) + +// VectorIndexInfo is the information of vector index of a column. +type VectorIndexInfo struct { + // Dimension is the dimension of the vector. + Dimension uint64 `json:"dimension"` // Set to 0 when initially parsed from comment. Will be assigned to flen later. + // DistanceMetric is the distance metric used by the index. + DistanceMetric DistanceMetric `json:"distance_metric"` +} + // IndexInfo provides meta data describing a DB index. // It corresponds to the statement `CREATE INDEX Name ON Table (Column);` // See https://dev.mysql.com/doc/refman/5.7/en/create-index.html type IndexInfo struct { - ID int64 `json:"id"` - Name model.CIStr `json:"idx_name"` // Index name. - Table model.CIStr `json:"tbl_name"` // Table name. - Columns []*IndexColumn `json:"idx_cols"` // Index columns. - State SchemaState `json:"state"` - BackfillState BackfillState `json:"backfill_state"` - Comment string `json:"comment"` // Comment - Tp model.IndexType `json:"index_type"` // Index type: Btree, Hash or Rtree - Unique bool `json:"is_unique"` // Whether the index is unique. - Primary bool `json:"is_primary"` // Whether the index is primary key. - Invisible bool `json:"is_invisible"` // Whether the index is invisible. - Global bool `json:"is_global"` // Whether the index is global. - MVIndex bool `json:"mv_index"` // Whether the index is multivalued index. + ID int64 `json:"id"` + Name model.CIStr `json:"idx_name"` // Index name. + Table model.CIStr `json:"tbl_name"` // Table name. + Columns []*IndexColumn `json:"idx_cols"` // Index columns. 
+ State SchemaState `json:"state"` + BackfillState BackfillState `json:"backfill_state"` + Comment string `json:"comment"` // Comment + Tp model.IndexType `json:"index_type"` // Index type: Btree, Hash or Rtree + Unique bool `json:"is_unique"` // Whether the index is unique. + Primary bool `json:"is_primary"` // Whether the index is primary key. + Invisible bool `json:"is_invisible"` // Whether the index is invisible. + Global bool `json:"is_global"` // Whether the index is global. + MVIndex bool `json:"mv_index"` // Whether the index is multivalued index. + VectorInfo *VectorIndexInfo `json:"vector_index"` // VectorInfo is the vector index information. } // Clone clones IndexInfo. diff --git a/pkg/meta/model/job.go b/pkg/meta/model/job.go index 04cb601fb9a8c..bf096d9df0497 100644 --- a/pkg/meta/model/job.go +++ b/pkg/meta/model/job.go @@ -110,6 +110,7 @@ const ( ActionDropResourceGroup ActionType = 70 ActionAlterTablePartitioning ActionType = 71 ActionRemovePartitioning ActionType = 72 + ActionAddVectorIndex ActionType = 73 ) // ActionMap is the map of DDL ActionType to string. @@ -566,6 +567,20 @@ func (job *Job) DecodeArgs(args ...any) error { return nil } +// DecodeDropIndexFinishedArgs decodes the drop index job's args when it's finished. +func (job *Job) DecodeDropIndexFinishedArgs() ( + indexName any, ifExists []bool, indexIDs []int64, partitionIDs []int64, hasVectors []bool, err error) { + ifExists = make([]bool, 1) + indexIDs = make([]int64, 1) + hasVectors = make([]bool, 1) + if err := job.DecodeArgs(&indexName, &ifExists[0], &indexIDs[0], &partitionIDs, &hasVectors[0]); err != nil { + if err := job.DecodeArgs(&indexName, &ifExists, &indexIDs, &partitionIDs, &hasVectors); err != nil { + return nil, []bool{false}, []int64{-1}, nil, []bool{false}, errors.Trace(err) + } + } + return +} + // String implements fmt.Stringer interface. 
func (job *Job) String() string { rowCount := job.GetRowCount() @@ -733,6 +748,10 @@ func (job *Job) IsPausing() bool { // IsPausable checks whether we can pause the job. func (job *Job) IsPausable() bool { + // TODO: We can remove it after TiFlash support pause operation. + if job.Type == ActionAddVectorIndex && job.SchemaState == StateWriteReorganization { + return false + } return job.NotStarted() || (job.IsRunning() && job.IsRollbackable()) } diff --git a/pkg/parser/parser_test.go b/pkg/parser/parser_test.go index ab01473e8f514..276b25b82b546 100644 --- a/pkg/parser/parser_test.go +++ b/pkg/parser/parser_test.go @@ -3379,6 +3379,7 @@ func TestDDL(t *testing.T) { {"CREATE VECTOR INDEX IF NOT EXISTS idx ON t ((VEC_COSINE_DISTANCE(a))) USING HNSW", true, "CREATE VECTOR INDEX IF NOT EXISTS `idx` ON `t` ((VEC_COSINE_DISTANCE(`a`))) USING HNSW"}, {"CREATE VECTOR INDEX IF NOT EXISTS idx ON t ((VEC_COSINE_DISTANCE(a))) TYPE HNSW", true, "CREATE VECTOR INDEX IF NOT EXISTS `idx` ON `t` ((VEC_COSINE_DISTANCE(`a`))) USING HNSW"}, {"CREATE VECTOR INDEX ident TYPE HNSW ON d_n.t_n ((VEC_COSINE_DISTANCE(a)))", true, "CREATE VECTOR INDEX `ident` ON `d_n`.`t_n` ((VEC_COSINE_DISTANCE(`a`))) USING HNSW"}, + {"CREATE VECTOR INDEX idx USING HNSW ON t ((VEC_COSINE_DISTANCE(a)))", true, "CREATE VECTOR INDEX `idx` ON `t` ((VEC_COSINE_DISTANCE(`a`))) USING HNSW"}, {"CREATE VECTOR INDEX ident ON d_n.t_n ( ident , ident ASC ) TYPE HNSW", false, ""}, {"CREATE UNIQUE INDEX ident USING HNSW ON d_n.t_n ( ident , ident ASC )", false, ""}, diff --git a/pkg/planner/core/BUILD.bazel b/pkg/planner/core/BUILD.bazel index b511fdfbde465..d3d8f571f7c98 100644 --- a/pkg/planner/core/BUILD.bazel +++ b/pkg/planner/core/BUILD.bazel @@ -273,6 +273,7 @@ go_test( deps = [ "//pkg/config", "//pkg/domain", + "//pkg/domain/infosync", "//pkg/expression", "//pkg/expression/aggregation", "//pkg/expression/context", @@ -311,6 +312,7 @@ go_test( "//pkg/testkit/ddlhelper", "//pkg/testkit/external", 
"//pkg/testkit/testdata", + "//pkg/testkit/testfailpoint", "//pkg/testkit/testmain", "//pkg/testkit/testsetup", "//pkg/testkit/testutil", diff --git a/pkg/planner/core/indexmerge_path_test.go b/pkg/planner/core/indexmerge_path_test.go index 517ccddc2f801..fcc75e5944326 100644 --- a/pkg/planner/core/indexmerge_path_test.go +++ b/pkg/planner/core/indexmerge_path_test.go @@ -20,7 +20,10 @@ import ( "math/rand" "strings" "testing" + "time" + "github.com/pingcap/tidb/pkg/domain" + "github.com/pingcap/tidb/pkg/domain/infosync" "github.com/pingcap/tidb/pkg/expression" "github.com/pingcap/tidb/pkg/infoschema" "github.com/pingcap/tidb/pkg/parser" @@ -29,8 +32,10 @@ import ( "github.com/pingcap/tidb/pkg/planner/core" "github.com/pingcap/tidb/pkg/planner/core/base" "github.com/pingcap/tidb/pkg/planner/core/resolve" + "github.com/pingcap/tidb/pkg/planner/util/coretestsdk" "github.com/pingcap/tidb/pkg/sessiontxn" "github.com/pingcap/tidb/pkg/testkit" + "github.com/pingcap/tidb/pkg/testkit/testfailpoint" "github.com/pingcap/tidb/pkg/util/hint" "github.com/stretchr/testify/require" ) @@ -435,3 +440,58 @@ func randMVIndexValue(opts randMVIndexValOpts) string { } return "" } + +func TestAnalyzeVectorIndex(t *testing.T) { + store, dom := testkit.CreateMockStoreAndDomainWithSchemaLease(t, 200*time.Millisecond, coretestsdk.WithMockTiFlash(2)) + tk := testkit.NewTestKit(t, store) + tk.MustExec("use test") + tk.MustExec("drop table if exists t;") + + tiflash := infosync.NewMockTiFlash() + infosync.SetMockTiFlash(tiflash) + defer func() { + tiflash.Lock() + tiflash.StatusServer.Close() + tiflash.Unlock() + }() + tk.MustExec(`create table t(a int, b vector(2), c vector(3), j json, index(a))`) + tk.MustExec("alter table t set tiflash replica 2 location labels 'a','b';") + tblInfo, err := dom.InfoSchema().TableByName(context.Background(), model.NewCIStr("test"), model.NewCIStr("t")) + require.NoError(t, err) + err = 
domain.GetDomain(tk.Session()).DDLExecutor().UpdateTableReplicaInfo(tk.Session(), tblInfo.Meta().ID, true) + require.NoError(t, err) + + testfailpoint.Enable(t, "github.com/pingcap/tidb/pkg/ddl/MockCheckVectorIndexProcess", `return(0)`) + tk.MustExec("alter table t add vector index idx((VEC_COSINE_DISTANCE(b))) USING HNSW") + tk.MustExec("alter table t add vector index idx2((VEC_COSINE_DISTANCE(c))) USING HNSW") + + tk.MustExec("set tidb_analyze_version=2") + tk.MustExec("analyze table t") + tk.MustQuery("show warnings").Sort().Check(testkit.Rows( + "Note 1105 Analyze use auto adjusted sample rate 1.000000 for table test.t, reason to use this rate is \"use min(1, 110000/10000) as the sample-rate=1\"", + "Warning 1105 No predicate column has been collected yet for table test.t, so only indexes and the columns composing the indexes will be analyzed", + "Warning 1105 analyzing vector index is not supported, skip idx", + "Warning 1105 analyzing vector index is not supported, skip idx2")) + tk.MustExec("analyze table t index idx") + tk.MustQuery("show warnings").Sort().Check(testkit.Rows( + "Note 1105 Analyze use auto adjusted sample rate 1.000000 for table test.t, reason to use this rate is \"TiDB assumes that the table is empty, use sample-rate=1\"", + "Warning 1105 No predicate column has been collected yet for table test.t, so only indexes and the columns composing the indexes will be analyzed", + "Warning 1105 The version 2 would collect all statistics not only the selected indexes", + "Warning 1105 analyzing vector index is not supported, skip idx", + "Warning 1105 analyzing vector index is not supported, skip idx2")) + + tk.MustExec("set tidb_analyze_version=1") + tk.MustExec("analyze table t") + tk.MustQuery("show warnings").Sort().Check(testkit.Rows( + "Warning 1105 analyzing vector index is not supported, skip idx", + "Warning 1105 analyzing vector index is not supported, skip idx2")) + tk.MustExec("analyze table t index idx") + tk.MustQuery("show 
warnings").Sort().Check(testkit.Rows( + "Warning 1105 analyzing vector index is not supported, skip idx")) + tk.MustExec("analyze table t index a") + tk.MustQuery("show warnings").Sort().Check(testkit.Rows()) + tk.MustExec("analyze table t index a, idx, idx2") + tk.MustQuery("show warnings").Sort().Check(testkit.Rows( + "Warning 1105 analyzing vector index is not supported, skip idx", + "Warning 1105 analyzing vector index is not supported, skip idx2")) +} diff --git a/pkg/planner/core/planbuilder.go b/pkg/planner/core/planbuilder.go index 0c0f35862fac0..bf7dbd1278f11 100644 --- a/pkg/planner/core/planbuilder.go +++ b/pkg/planner/core/planbuilder.go @@ -2052,7 +2052,7 @@ func (b *PlanBuilder) getMustAnalyzedColumns(tbl *resolve.TableNameW, cols *calc } virtualExprs := make([]expression.Expression, 0, len(tblInfo.Columns)) for _, idx := range tblInfo.Indices { - if idx.State != model.StatePublic || idx.MVIndex { + if idx.State != model.StatePublic || idx.MVIndex || idx.VectorInfo != nil { continue } for _, idxCol := range idx.Columns { @@ -2306,6 +2306,7 @@ func getColOffsetForAnalyze(colsInfo []*model.ColumnInfo, colID int64) int { // For multi-valued index, we need to collect it separately here and analyze it as independent index analyze task. // See comments for AnalyzeResults.ForMVIndex for more details. func getModifiedIndexesInfoForAnalyze( + stmtCtx *stmtctx.StatementContext, tblInfo *model.TableInfo, allColumns bool, colsInfo []*model.ColumnInfo, @@ -2320,6 +2321,10 @@ func getModifiedIndexesInfoForAnalyze( independentIdxsInfo = append(independentIdxsInfo, originIdx) continue } + if originIdx.VectorInfo != nil { + stmtCtx.AppendWarning(errors.NewNoStackErrorf("analyzing vector index is not supported, skip %s", originIdx.Name.L)) + continue + } if allColumns { // If all the columns need to be analyzed, we don't need to modify IndexColumn.Offset. 
idxsInfo = append(idxsInfo, originIdx) @@ -2436,7 +2441,7 @@ func (b *PlanBuilder) buildAnalyzeFullSamplingTask( } execColsInfo = b.filterSkipColumnTypes(execColsInfo, tbl, &mustAnalyzedCols) allColumns := len(tbl.TableInfo.Columns) == len(execColsInfo) - indexes, independentIndexes := getModifiedIndexesInfoForAnalyze(tbl.TableInfo, allColumns, execColsInfo) + indexes, independentIndexes := getModifiedIndexesInfoForAnalyze(b.ctx.GetSessionVars().StmtCtx, tbl.TableInfo, allColumns, execColsInfo) handleCols := BuildHandleColsForAnalyze(b.ctx, tbl.TableInfo, allColumns, execColsInfo) newTask := AnalyzeColumnsTask{ HandleCols: handleCols, @@ -2694,6 +2699,10 @@ func (b *PlanBuilder) buildAnalyzeTable(as *ast.AnalyzeTableStmt, opts map[ast.A b.ctx.GetSessionVars().StmtCtx.AppendWarning(errors.NewNoStackErrorf("analyzing multi-valued indexes is not supported, skip %s", idx.Name.L)) continue } + if idx.VectorInfo != nil { + b.ctx.GetSessionVars().StmtCtx.AppendWarning(errors.NewNoStackErrorf("analyzing vector index is not supported, skip %s", idx.Name.L)) + continue + } p.IdxTasks = append(p.IdxTasks, generateIndexTasks(idx, as, tnW.TableInfo, partitionNames, physicalIDs, version)...) } handleCols := BuildHandleColsForAnalyze(b.ctx, tnW.TableInfo, true, nil) @@ -2771,6 +2780,10 @@ func (b *PlanBuilder) buildAnalyzeIndex(as *ast.AnalyzeTableStmt, opts map[ast.A b.ctx.GetSessionVars().StmtCtx.AppendWarning(errors.NewNoStackErrorf("analyzing multi-valued indexes is not supported, skip %s", idx.Name.L)) continue } + if idx.VectorInfo != nil { + b.ctx.GetSessionVars().StmtCtx.AppendWarning(errors.NewNoStackErrorf("analyzing vector index is not supported, skip %s", idx.Name.L)) + continue + } p.IdxTasks = append(p.IdxTasks, generateIndexTasks(idx, as, tblInfo, names, physicalIDs, version)...) 
} return p, nil @@ -2802,6 +2815,10 @@ func (b *PlanBuilder) buildAnalyzeAllIndex(as *ast.AnalyzeTableStmt, opts map[as b.ctx.GetSessionVars().StmtCtx.AppendWarning(errors.NewNoStackErrorf("analyzing multi-valued indexes is not supported, skip %s", idx.Name.L)) continue } + if idx.VectorInfo != nil { + b.ctx.GetSessionVars().StmtCtx.AppendWarning(errors.NewNoStackErrorf("analyzing vector index is not supported, skip %s", idx.Name.L)) + continue + } p.IdxTasks = append(p.IdxTasks, generateIndexTasks(idx, as, tblInfo, names, physicalIDs, version)...) } diff --git a/pkg/sessionctx/variable/varsutil.go b/pkg/sessionctx/variable/varsutil.go index 0e3ae63162417..543de7e97c31f 100644 --- a/pkg/sessionctx/variable/varsutil.go +++ b/pkg/sessionctx/variable/varsutil.go @@ -27,6 +27,7 @@ import ( "github.com/docker/go-units" "github.com/pingcap/errors" + "github.com/pingcap/tidb/pkg/meta/model" "github.com/pingcap/tidb/pkg/parser/ast" "github.com/pingcap/tidb/pkg/parser/charset" "github.com/pingcap/tidb/pkg/parser/mysql" @@ -632,3 +633,15 @@ func parseSchemaCacheSize(s *SessionVars, normalizedValue string, originalValue return 0, "", ErrTruncatedWrongValue.GenWithStackByArgs(TiDBSchemaCacheSize, originalValue) } + +// DistanceMetric4VectorIndex stores distance metrics for the vector index. +var DistanceMetric4VectorIndex = map[string]model.DistanceMetric{ + ast.VecCosineDistance: model.DistanceMetricCosine, + ast.VecL2Distance: model.DistanceMetricL2, +} + +// Function4VectorIndex stores functions for the vector index. 
+var Function4VectorIndex = map[model.DistanceMetric]string{ + model.DistanceMetricCosine: ast.VecCosineDistance, + model.DistanceMetricL2: ast.VecL2Distance, +} diff --git a/pkg/store/helper/helper.go b/pkg/store/helper/helper.go index b39afc1a31387..5920ebd6956a5 100644 --- a/pkg/store/helper/helper.go +++ b/pkg/store/helper/helper.go @@ -907,3 +907,26 @@ func CollectTiFlashStatus(statusAddress string, keyspaceID tikv.KeyspaceID, tabl } return nil } + +// SyncTableSchemaToTiFlash query sync schema of one table to TiFlash store. +func SyncTableSchemaToTiFlash(statusAddress string, keyspaceID tikv.KeyspaceID, tableID int64) error { + // The new query schema is like: http://{statusAddress}/tiflash/sync-schema/keyspace/{keyspaceID}/table/{tableID}. + // For TiDB forward compatibility, we define the Nullspace as the "keyspace" of the old table. + // The query URL is like: http://{statusAddress}/sync-schema/keyspace/{keyspaceID}/table/{tableID} + statURL := fmt.Sprintf("%s://%s/tiflash/sync-schema/keyspace/%d/table/%d", + util.InternalHTTPSchema(), + statusAddress, + keyspaceID, + tableID, + ) + resp, err := util.InternalHTTPClient().Get(statURL) + if err != nil { + return errors.Trace(err) + } + + err = resp.Body.Close() + if err != nil { + logutil.BgLogger().Error("close body failed", zap.Error(err)) + } + return nil +}