diff --git a/mgr/dataflow.go b/mgr/dataflow.go index d759c278e..1d030a128 100644 --- a/mgr/dataflow.go +++ b/mgr/dataflow.go @@ -378,21 +378,18 @@ func checkSampleData(sampleData []string, logParser parser.Parser) ([]string, er } func getTransformerCreator(transformerConfig map[string]interface{}) (transforms.Creator, error) { - transformKeyType, ok := transformerConfig[KeyType] + transformKeyType, ok := transformerConfig[transforms.KeyType] if !ok { - err := fmt.Errorf("missing param %s", KeyType) - return nil, err + return nil, fmt.Errorf("missing param %s", transforms.KeyType) } transformKeyTypeStr, ok := transformKeyType.(string) if !ok { - err := fmt.Errorf("param %s must be of type string", KeyType) - return nil, err + return nil, fmt.Errorf("param %s must be of type string", transforms.KeyType) } create, ok := transforms.Transformers[transformKeyTypeStr] if !ok { - err := fmt.Errorf("transformer of type %v not exist", transformKeyTypeStr) - return nil, err + return nil, fmt.Errorf("transformer of type %v not exist", transformKeyTypeStr) } return create, nil } diff --git a/mgr/runner.go b/mgr/runner.go index 6e550421e..f2ceb4638 100644 --- a/mgr/runner.go +++ b/mgr/runner.go @@ -28,6 +28,7 @@ import ( "github.com/qiniu/logkit/sender" _ "github.com/qiniu/logkit/sender/builtin" "github.com/qiniu/logkit/transforms" + "github.com/qiniu/logkit/transforms/ip" . "github.com/qiniu/logkit/utils/models" ) @@ -291,8 +292,9 @@ func NewLogExportRunner(rc RunnerConfig, cleanChan chan<- cleaner.CleanSignal, r senderConfig[sender.KeyPandoraDescription] = LogkitAutoCreateDescription } } - if senderConfig[sender.KeySenderType] == sender.TypePandora { - senderConfig = setSenderConfig(senderConfig, serverConfigs) + senderConfig, err := setSenderConfig(senderConfig, serverConfigs) + if err != nil { + return nil, err } s, err := sr.NewSender(senderConfig, meta.FtSaveLogPath()) if err != nil { @@ -300,6 +302,7 @@ func NewLogExportRunner(rc RunnerConfig, cleanChan chan<- cleaner.CleanSignal, r } senders = append(senders, s) delete(rc.SendersConfig[i], sender.InnerUserAgent) + delete(rc.SendersConfig[i], sender.KeyPandoraDescription) } senderCnt := len(senders) @@ -314,7 +317,7 @@ func createTransformers(rc RunnerConfig) ([]transforms.Transformer, error) { transformers := make([]transforms.Transformer, 0) for idx := range rc.Transforms { tConf := rc.Transforms[idx] - tp := tConf[KeyType] + tp := tConf[transforms.KeyType] if tp == nil { return nil, fmt.Errorf("transformer config type is empty %v", tConf) } @@ -1296,38 +1299,59 @@ func MergeExtraInfoTags(meta *reader.Meta, tags map[string]interface{}) map[stri return tags } -func setSenderConfig(senderConfig conf.MapConf, serverConfigs []map[string]interface{}) conf.MapConf { +func setSenderConfig(senderConfig conf.MapConf, serverConfigs []map[string]interface{}) (conf.MapConf, error) { + if senderConfig[sender.KeySenderType] != sender.TypePandora { + return senderConfig, nil + } + + var err error for _, serverConfig := range serverConfigs { - keyType, ok := serverConfig[KeyType].(string) - if !ok || keyType != KeyIP { - continue - } - localEnable, ok := serverConfig[LocalEnable].(bool) + keyType, ok := serverConfig[transforms.KeyType].(string) if !ok { continue } - - autoCreate := senderConfig[sender.KeyPandoraAutoCreate] - if localEnable { - schema := fmt.Sprintf(",%v ip", KeyIP) - if autoCreate == fmt.Sprintf("%v ip", KeyIP) { - autoCreate = "" - } else if index := strings.Index(autoCreate, schema); index != -1 { - autoCreate = autoCreate[:index] + autoCreate[index+len(schema):] + switch keyType { + case ip.Name: + if senderConfig, err = setIPConfig(senderConfig, serverConfig); err != nil { + return senderConfig, err } - senderConfig[sender.KeyPandoraAutoCreate] = autoCreate - continue } - if autoCreate == "" { - senderConfig[sender.KeyPandoraAutoCreate] = fmt.Sprintf("%v ip", KeyIP) - continue - } + } + + return senderConfig, nil +} + +func setIPConfig(senderConfig conf.MapConf, serverConfig map[string]interface{}) (conf.MapConf, error) { + key, keyOk := serverConfig["key"].(string) + if !keyOk { + return senderConfig, nil + } - if !strings.Contains(autoCreate, KeyIP) { - senderConfig[sender.KeyPandoraAutoCreate] += fmt.Sprintf(",%v ip", KeyIP) + if len(GetKeys(key)) > 1 { + return senderConfig, fmt.Errorf("key: %v ip transform key in server doesn't support dot(.)", key) + } + autoCreate := senderConfig[sender.KeyPandoraAutoCreate] + transformAt, transformAtOk := serverConfig[transforms.TransformAt].(string) + if !transformAtOk { + return senderConfig, nil + } + if transformAt == ip.Local { + schema := fmt.Sprintf(",%v ip", key) + if autoCreate == fmt.Sprintf("%v ip", key) { + autoCreate = "" + } else if index := strings.Index(autoCreate, schema); index != -1 { + autoCreate = autoCreate[:index] + autoCreate[index+len(schema):] } + senderConfig[sender.KeyPandoraAutoCreate] = autoCreate + return senderConfig, nil + } + + if autoCreate == "" { + senderConfig[sender.KeyPandoraAutoCreate] = fmt.Sprintf("%s %s", key, TypeIP) + return senderConfig, nil } - return senderConfig + senderConfig[sender.KeyPandoraAutoCreate] += fmt.Sprintf(",%s %s", key, TypeIP) + return senderConfig, nil } diff --git a/mgr/runner_test.go b/mgr/runner_test.go index b616fbba8..2db8b3179 100644 --- a/mgr/runner_test.go +++ b/mgr/runner_test.go @@ -26,7 +26,9 @@ import ( _ "github.com/qiniu/logkit/sender/builtin" "github.com/qiniu/logkit/sender/mock" "github.com/qiniu/logkit/sender/pandora" + "github.com/qiniu/logkit/transforms" _ "github.com/qiniu/logkit/transforms/builtin" + "github.com/qiniu/logkit/transforms/ip" . "github.com/qiniu/logkit/utils/models" ) @@ -1896,6 +1898,7 @@ DONE: break DONE default: dft++ + } time.Sleep(50 * time.Millisecond) if dft > 60 { @@ -1903,5 +1906,63 @@ DONE: } } assert.Equal(t, 1, ret) +} + +func Test_setSenderConfig(t *testing.T) { + senderConfig := conf.MapConf{ + sender.KeySenderType: sender.TypePandora, + } + + serverConfigs := []map[string]interface{}{ + { + transforms.KeyType: ip.Name, + transforms.TransformAt: ip.Server, + }, + } + actualConfig, err := setSenderConfig(senderConfig, serverConfigs) + assert.NoError(t, err) + assert.Equal(t, "", actualConfig[sender.KeyPandoraAutoCreate]) + + serverConfigs = []map[string]interface{}{ + { + transforms.KeyType: ip.Name, + transforms.TransformAt: ip.Server, + "key": "ip", + }, + } + actualConfig, err = setSenderConfig(senderConfig, serverConfigs) + assert.NoError(t, err) + assert.Equal(t, "ip ip", actualConfig[sender.KeyPandoraAutoCreate]) + senderConfig = conf.MapConf{ + sender.KeySenderType: sender.TypePandora, + } + serverConfigs = []map[string]interface{}{ + { + transforms.KeyType: ip.Name, + transforms.TransformAt: ip.Local, + }, + } + actualConfig, err = setSenderConfig(senderConfig, serverConfigs) + assert.NoError(t, err) + assert.Equal(t, "", actualConfig[sender.KeyPandoraAutoCreate]) + + serverConfigs = []map[string]interface{}{ + { + transforms.KeyType: "other", + }, + } + actualConfig, err = setSenderConfig(senderConfig, serverConfigs) + assert.NoError(t, err) + assert.Equal(t, "", actualConfig[sender.KeyPandoraAutoCreate]) + + serverConfigs = []map[string]interface{}{ + { + transforms.KeyType: ip.Name, + transforms.TransformAt: ip.Server, + "key": "ip.ip", + }, + } + actualConfig, err = setSenderConfig(senderConfig, serverConfigs) + assert.Error(t, err) } diff --git a/sender/pandora/pandora.go b/sender/pandora/pandora.go index 57992886f..4bf5e41d3 100644 --- a/sender/pandora/pandora.go +++ b/sender/pandora/pandora.go @@ -48,7 +48,6 @@ type Sender struct { microsecondCounter uint64 extraInfo map[string]string sendType string - EnbleServerIp bool } // UserSchema was parsed pandora schema from user's raw schema @@ -497,7 +496,6 @@ func newPandoraSender(opt *PandoraOption) (s *Sender, err error) { log.Errorf("Runner[%v] Sender[%v]: auto create pandora repo error: %v, you can create on pandora portal, ignored...", opt.runnerName, opt.name, err) err = nil } - log.Infof("`````````````````````````````dsl: %v, schemas: %v, opt.autoCreate: %v", dsl, schemas, opt.autoCreate) if initErr := s.client.InitOrUpdateWorkflow(&pipeline.InitOrUpdateWorkflowInput{ // 此处要的 schema 为 autoCreate 中用户指定的,所以 SchemaFree 要恒为 true InitOptionChange: true, diff --git a/transforms/ip/ip.go b/transforms/ip/ip.go index 94ef45e10..dcccd2eba 100644 --- a/transforms/ip/ip.go +++ b/transforms/ip/ip.go @@ -20,6 +20,9 @@ const ( Latitude = "Latitude" Longitude = "Longitude" DistrictCode = "DistrictCode" + + Local = "本地" + Server = "服务端" ) var ( @@ -33,7 +36,7 @@ type Transformer struct { StageTime string `json:"stage"` Key string `json:"key"` DataPath string `json:"data_path"` - LocalEnable bool `json:"local_enable"` + TransformAt string `json:"transform_at"` KeyAsPrefix bool `json:"key_as_prefix"` Language string `json:"language"` @@ -54,13 +57,13 @@ type Transformer struct { } func (t *Transformer) Init() error { - if t.Key != "" { - t.LocalEnable = true + if t.TransformAt == "" { + t.TransformAt = Local } - - if !t.LocalEnable { + if t.TransformAt != Local { return nil } + if t.Language == "" { t.Language = "zh-CN" } @@ -100,9 +103,10 @@ func (_ *Transformer) RawTransform(datas []string) ([]string, error) { } func (t *Transformer) Transform(datas []Data) ([]Data, error) { - if !t.LocalEnable { + if t.TransformAt != Local { return datas, nil } + var err, fmtErr error errNum := 0 if t.loc == nil { @@ -232,62 +236,65 @@ func (_ *Transformer) SampleConfig() string { func (_ *Transformer) ConfigOptions() []Option { return []Option{ { - KeyName: LocalEnable, + KeyName: transforms.TransformAt, Element: Radio, ChooseOnly: true, - ChooseOptions: []interface{}{true, false}, - Default: false, + ChooseOptions: []interface{}{Local, Server}, + Default: Local, Required: true, DefaultNoUse: false, - Description: "使用本地解析", - Type: transforms.TransformTypeBoolean, + Description: "运行方式", + Type: transforms.TransformTypeString, + ToolTip: "本地运行使用客户自己的IP库,更为灵活。服务端运行固定使用七牛IP库,用户无需提供IP库", }, { - KeyName: "key", - ChooseOnly: false, - Default: "", - Required: true, - Placeholder: "my_field_keyname", - DefaultNoUse: true, - Description: "要进行Transform变化的键(key)", - ToolTip: "对该字段的值进行transform变换", - Type: transforms.TransformTypeString, - AdvanceDepend: LocalEnable, + KeyName: "key", + ChooseOnly: false, + Default: "", + Required: true, + Placeholder: "my_field_keyname", + DefaultNoUse: true, + Description: "要进行Transform变化的键(key)", + ToolTip: "对该字段的值进行transform变换, 服务端不支持嵌套(.)", + Type: transforms.TransformTypeString, }, { - KeyName: "data_path", - ChooseOnly: false, - Default: "", - Required: true, - Placeholder: "your/path/to/ip.dat(x)", - DefaultNoUse: true, - Description: "IP数据库路径(data_path)", - Type: transforms.TransformTypeString, - AdvanceDepend: LocalEnable, + KeyName: "data_path", + ChooseOnly: false, + Default: "", + Required: true, + Placeholder: "your/path/to/ip.dat(x)", + DefaultNoUse: true, + Description: "IP数据库路径(data_path)", + Type: transforms.TransformTypeString, + AdvanceDepend: transforms.TransformAt, + AdvanceDependValue: Local, }, { - KeyName: "key_as_prefix", - ChooseOnly: true, - ChooseOptions: []interface{}{false, true}, - Required: false, - Default: true, - DefaultNoUse: false, - Element: Checkbox, - Description: "字段名称作为前缀(key_as_prefix)", - Type: transforms.TransformTypeString, - AdvanceDepend: LocalEnable, + KeyName: "key_as_prefix", + ChooseOnly: true, + ChooseOptions: []interface{}{false, true}, + Required: false, + Default: true, + DefaultNoUse: false, + Element: Checkbox, + Description: "字段名称作为前缀(key_as_prefix)", + Type: transforms.TransformTypeString, + AdvanceDepend: transforms.TransformAt, + AdvanceDependValue: Local, }, { - KeyName: "language", - ChooseOnly: false, - Default: "zh-CN", - Required: true, - Placeholder: "zh-CN", - DefaultNoUse: true, - Description: "mmdb格式库使用的语种", - Advance: true, - Type: transforms.TransformTypeString, - AdvanceDepend: LocalEnable, + KeyName: "language", + ChooseOnly: false, + Default: "zh-CN", + Required: true, + Placeholder: "zh-CN", + DefaultNoUse: true, + Description: "mmdb格式库使用的语种", + Advance: true, + Type: transforms.TransformTypeString, + AdvanceDepend: transforms.TransformAt, + AdvanceDependValue: Local, }, } } @@ -314,8 +321,10 @@ func (t *Transformer) Close() error { func (t *Transformer) ServerConfig() map[string]interface{} { config := make(map[string]interface{}) - config[KeyType] = Name - config[LocalEnable] = t.LocalEnable + config[transforms.KeyType] = Name + config[transforms.TransformAt] = t.TransformAt + config["key"] = t.Key + return config } diff --git a/transforms/ip/ip_test.go b/transforms/ip/ip_test.go index f090a1c19..632738b1c 100644 --- a/transforms/ip/ip_test.go +++ b/transforms/ip/ip_test.go @@ -307,7 +307,7 @@ func Test_badData(t *testing.T) { ipt := &Transformer{ Key: "ip", DataPath: "./test_data/bad.dat", - LocalEnable: true, + TransformAt: Local, } _, err := ipt.Transform([]Data{{"ip": "111.2.3.4"}, {"ip": "x.x.x.x"}}) assert.Error(t, err) @@ -318,7 +318,7 @@ func Test_badData(t *testing.T) { ipt = &Transformer{ Key: "ip", DataPath: "./test_data/bad.datx", - LocalEnable: true, + TransformAt: Local, } _, err = ipt.Transform([]Data{{"ip": "111.2.3.4"}, {"ip": "x.x.x.x"}}) assert.Error(t, err) @@ -335,7 +335,7 @@ func Test_badData(t *testing.T) { ipt = &Transformer{ Key: "ip", DataPath: "./test_data/bad.datn", - LocalEnable: true, + TransformAt: Local, } _, err = ipt.Transform([]Data{{"ip": "111.2.3.4"}, {"ip": "x.x.x.x"}}) assert.Error(t, err) @@ -344,7 +344,7 @@ func Test_badData(t *testing.T) { ipt = &Transformer{ Key: "ip", DataPath: "./test_data/bad.mmdb", - LocalEnable: true, + TransformAt: Local, } _, err = ipt.Transform([]Data{{"ip": "111.2.3.4"}, {"ip": "x.x.x.x"}}) assert.Error(t, err) diff --git a/transforms/registry.go b/transforms/registry.go index 0cb1f10e6..7374e393f 100644 --- a/transforms/registry.go +++ b/transforms/registry.go @@ -4,6 +4,12 @@ import ( . "github.com/qiniu/logkit/utils/models" ) +const ( + KeyType = "type" + + TransformAt = "transform_at" +) + const ( TransformTypeString = "string" TransformTypeLong = "long" diff --git a/utils/models/models.go b/utils/models/models.go index 9565eab25..032b29d9b 100644 --- a/utils/models/models.go +++ b/utils/models/models.go @@ -36,8 +36,7 @@ const ( KeyPandoraStash = "pandora_stash" // 当只有一条数据且 sendError 时候,将其转化为 raw 发送到 pandora_stash 这个字段 KeyPandoraSeparateId = "pandora_separate_id" // 当一条数据大于2M且 sendError 时候,将其切片,切片记录到 pandora_separate_id 这个字段 - KeyIP = "ip" // schema ip - KeyType = "type" + TypeIP = "ip" // schema ip SchemaFreeTokensPrefix = "schema_free_tokens_" LogDBTokensPrefix = "logdb_tokens_" @@ -61,8 +60,6 @@ const ( Checkbox = "checkbox" Radio = "radio" InputNumber = "inputNumber" - - LocalEnable = "local_enable" ) var ( @@ -73,22 +70,23 @@ var ( ) type Option struct { - KeyName string - ChooseOnly bool - Element string // 前端显示类型 - ChooseOptions []interface{} - Default interface{} - DefaultNoUse bool // 是否使用默认值,true为不使用默认值,false为使用默认值 - Description string - CheckRegex string - Style string `json:"style"` - Required bool `json:"required"` // 是否必填 - Placeholder string `json:"placeholder"` - Type string `json:"Type,omitempty"` - Secret bool - Advance bool `json:"advance,omitempty"` - AdvanceDepend string `json:"advance_depend,omitempty"` - ToolTip string `json:"tooltip,omitempty"` // 该选项说明 + KeyName string + ChooseOnly bool + Element string // 前端显示类型 + ChooseOptions []interface{} + Default interface{} + DefaultNoUse bool // 是否使用默认值,true为不使用默认值,false为使用默认值 + Description string + CheckRegex string + Style string `json:"style"` + Required bool `json:"required"` // 是否必填 + Placeholder string `json:"placeholder"` + Type string `json:"Type,omitempty"` + Secret bool + Advance bool `json:"advance,omitempty"` + AdvanceDepend string `json:"advance_depend,omitempty"` + AdvanceDependValue interface{} `json:"advance_depend_value,omitempty"` + ToolTip string `json:"tooltip,omitempty"` // 该选项说明 } type KeyValue struct { diff --git a/vendor/github.com/qiniu/pandora-go-sdk/pipeline/schemafree.go b/vendor/github.com/qiniu/pandora-go-sdk/pipeline/schemafree.go index 8a5b3e1b0..a25d56b8d 100644 --- a/vendor/github.com/qiniu/pandora-go-sdk/pipeline/schemafree.go +++ b/vendor/github.com/qiniu/pandora-go-sdk/pipeline/schemafree.go @@ -710,7 +710,6 @@ func (c *Pipeline) getOrCreateWorkflow(input *InitOrUpdateWorkflowInput, ns *boo if input.Description != nil { createWorkflowInput.Comment = *input.Description } - log.Infof("`````````````````createWorkflowInput: %v", createWorkflowInput) if err = c.CreateWorkflow(createWorkflowInput); err != nil && reqerr.IsExistError(err) { workflow, err = c.GetWorkflow(&GetWorkflowInput{ WorkflowName: input.WorkflowName, diff --git a/vendor/vendor.json b/vendor/vendor.json index 9215cfdda..778f77691 100644 --- a/vendor/vendor.json +++ b/vendor/vendor.json @@ -471,38 +471,38 @@ { "checksumSHA1": "0JEyusBC8nfE7dF2sKLylHTh93Y=", "path": "github.com/qiniu/pandora-go-sdk/base", - "revision": "ac6de8a2a2ebc0a6d571d14344e6f6c75dba431c", - "revisionTime": "2018-08-28T06:40:15Z" + "revision": "e65f67ba741f939567a66989aeefe81112735b15", + "revisionTime": "2018-08-28T09:09:24Z" }, { "checksumSHA1": "7t2Z+kAE+w1wEigJqyPp4yjDWYA=", "path": "github.com/qiniu/pandora-go-sdk/base/config", - "revision": "ac6de8a2a2ebc0a6d571d14344e6f6c75dba431c", - "revisionTime": "2018-08-28T06:40:15Z" + "revision": "e65f67ba741f939567a66989aeefe81112735b15", + "revisionTime": "2018-08-28T09:09:24Z" }, { "checksumSHA1": "k5JfcSQ5YsAFd6DSHpSeuNWbql8=", "path": "github.com/qiniu/pandora-go-sdk/base/models", - "revision": "ac6de8a2a2ebc0a6d571d14344e6f6c75dba431c", - "revisionTime": "2018-08-28T06:40:15Z" + "revision": "e65f67ba741f939567a66989aeefe81112735b15", + "revisionTime": "2018-08-28T09:09:24Z" }, { "checksumSHA1": "lV2zb3SZ4BKaa1XpC4Q3ktbVgDo=", "path": "github.com/qiniu/pandora-go-sdk/base/ratelimit", - "revision": "ac6de8a2a2ebc0a6d571d14344e6f6c75dba431c", - "revisionTime": "2018-08-28T06:40:15Z" + "revision": "e65f67ba741f939567a66989aeefe81112735b15", + "revisionTime": "2018-08-28T09:09:24Z" }, { "checksumSHA1": "sLeqqUJX9pa++wH3PtRSVbaCejs=", "path": "github.com/qiniu/pandora-go-sdk/base/reqerr", - "revision": "ac6de8a2a2ebc0a6d571d14344e6f6c75dba431c", - "revisionTime": "2018-08-28T06:40:15Z" + "revision": "e65f67ba741f939567a66989aeefe81112735b15", + "revisionTime": "2018-08-28T09:09:24Z", }, { "checksumSHA1": "oEpRonb6KY/u9OWNOxSEjyyqqXk=", "path": "github.com/qiniu/pandora-go-sdk/base/request", - "revision": "ac6de8a2a2ebc0a6d571d14344e6f6c75dba431c", - "revisionTime": "2018-08-28T06:40:15Z" + "revision": "e65f67ba741f939567a66989aeefe81112735b15", + "revisionTime": "2018-08-28T09:09:24Z" }, { "checksumSHA1": "Hoo04OF4fQYRDb6iBAP/gZ/Z7Q4=", @@ -513,14 +513,20 @@ { "checksumSHA1": "cbRliv4W/RdDpuUy7tVfQAhGl6c=", "path": "github.com/qiniu/pandora-go-sdk/pipeline", - "revision": "ac6de8a2a2ebc0a6d571d14344e6f6c75dba431c", - "revisionTime": "2018-08-28T06:40:15Z" + "revision": "e65f67ba741f939567a66989aeefe81112735b15", + "revisionTime": "2018-08-28T09:09:24Z" + }, + { + "checksumSHA1": "J7sL+KL0K9HbjhsOg/+wm4nciDM=", + "path": "github.com/qiniu/pandora-go-sdk/pipeline", + "revision": "e65f67ba741f939567a66989aeefe81112735b15", + "revisionTime": "2018-08-28T09:09:24Z" }, { "checksumSHA1": "YeSUJIE3zLnpjK3g161wiMeIbmk=", "path": "github.com/qiniu/pandora-go-sdk/tsdb", - "revision": "ac6de8a2a2ebc0a6d571d14344e6f6c75dba431c", - "revisionTime": "2018-08-28T06:40:15Z" + "revision": "e65f67ba741f939567a66989aeefe81112735b15", + "revisionTime": "2018-08-28T09:09:24Z" }, { "checksumSHA1": "KAzbLjI9MzW2tjfcAsK75lVRp6I=",