Skip to content

Commit

Permalink
update pandora go sdk
Browse files Browse the repository at this point in the history
  • Loading branch information
李红 authored and 李红 committed Aug 28, 2018
1 parent 8680cbc commit 91da253
Show file tree
Hide file tree
Showing 10 changed files with 226 additions and 128 deletions.
11 changes: 4 additions & 7 deletions mgr/dataflow.go
Original file line number Diff line number Diff line change
Expand Up @@ -378,21 +378,18 @@ func checkSampleData(sampleData []string, logParser parser.Parser) ([]string, er
}

func getTransformerCreator(transformerConfig map[string]interface{}) (transforms.Creator, error) {
transformKeyType, ok := transformerConfig[KeyType]
transformKeyType, ok := transformerConfig[transforms.KeyType]
if !ok {
err := fmt.Errorf("missing param %s", KeyType)
return nil, err
return nil, fmt.Errorf("missing param %s", transforms.KeyType)
}
transformKeyTypeStr, ok := transformKeyType.(string)
if !ok {
err := fmt.Errorf("param %s must be of type string", KeyType)
return nil, err
return nil, fmt.Errorf("param %s must be of type string", transforms.KeyType)
}

create, ok := transforms.Transformers[transformKeyTypeStr]
if !ok {
err := fmt.Errorf("transformer of type %v not exist", transformKeyTypeStr)
return nil, err
return nil, fmt.Errorf("transformer of type %v not exist", transformKeyTypeStr)
}
return create, nil
}
Expand Down
76 changes: 50 additions & 26 deletions mgr/runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
"github.com/qiniu/logkit/sender"
_ "github.com/qiniu/logkit/sender/builtin"
"github.com/qiniu/logkit/transforms"
"github.com/qiniu/logkit/transforms/ip"
. "github.com/qiniu/logkit/utils/models"
)

Expand Down Expand Up @@ -291,15 +292,17 @@ func NewLogExportRunner(rc RunnerConfig, cleanChan chan<- cleaner.CleanSignal, r
senderConfig[sender.KeyPandoraDescription] = LogkitAutoCreateDescription
}
}
if senderConfig[sender.KeySenderType] == sender.TypePandora {
senderConfig = setSenderConfig(senderConfig, serverConfigs)
senderConfig, err := setSenderConfig(senderConfig, serverConfigs)
if err != nil {
return nil, err
}
s, err := sr.NewSender(senderConfig, meta.FtSaveLogPath())
if err != nil {
return nil, err
}
senders = append(senders, s)
delete(rc.SendersConfig[i], sender.InnerUserAgent)
delete(rc.SendersConfig[i], sender.KeyPandoraDescription)
}

senderCnt := len(senders)
Expand All @@ -314,7 +317,7 @@ func createTransformers(rc RunnerConfig) ([]transforms.Transformer, error) {
transformers := make([]transforms.Transformer, 0)
for idx := range rc.Transforms {
tConf := rc.Transforms[idx]
tp := tConf[KeyType]
tp := tConf[transforms.KeyType]
if tp == nil {
return nil, fmt.Errorf("transformer config type is empty %v", tConf)
}
Expand Down Expand Up @@ -1296,38 +1299,59 @@ func MergeExtraInfoTags(meta *reader.Meta, tags map[string]interface{}) map[stri
return tags
}

func setSenderConfig(senderConfig conf.MapConf, serverConfigs []map[string]interface{}) conf.MapConf {
func setSenderConfig(senderConfig conf.MapConf, serverConfigs []map[string]interface{}) (conf.MapConf, error) {
if senderConfig[sender.KeySenderType] != sender.TypePandora {
return senderConfig, nil
}

var err error
for _, serverConfig := range serverConfigs {
keyType, ok := serverConfig[KeyType].(string)
if !ok || keyType != KeyIP {
continue
}
localEnable, ok := serverConfig[LocalEnable].(bool)
keyType, ok := serverConfig[transforms.KeyType].(string)
if !ok {
continue
}

autoCreate := senderConfig[sender.KeyPandoraAutoCreate]
if localEnable {
schema := fmt.Sprintf(",%v ip", KeyIP)
if autoCreate == fmt.Sprintf("%v ip", KeyIP) {
autoCreate = ""
} else if index := strings.Index(autoCreate, schema); index != -1 {
autoCreate = autoCreate[:index] + autoCreate[index+len(schema):]
switch keyType {
case ip.Name:
if senderConfig, err = setIPConfig(senderConfig, serverConfig); err != nil {
return senderConfig, err
}
senderConfig[sender.KeyPandoraAutoCreate] = autoCreate
continue
}

if autoCreate == "" {
senderConfig[sender.KeyPandoraAutoCreate] = fmt.Sprintf("%v ip", KeyIP)
continue
}
}

return senderConfig, nil
}

func setIPConfig(senderConfig conf.MapConf, serverConfig map[string]interface{}) (conf.MapConf, error) {
key, keyOk := serverConfig["key"].(string)
if !keyOk {
return senderConfig, nil
}

if !strings.Contains(autoCreate, KeyIP) {
senderConfig[sender.KeyPandoraAutoCreate] += fmt.Sprintf(",%v ip", KeyIP)
if len(GetKeys(key)) > 1 {
return senderConfig, fmt.Errorf("key: %v ip transform key in server doesn't support dot(.)", key)
}
autoCreate := senderConfig[sender.KeyPandoraAutoCreate]
transformAt, transformAtOk := serverConfig[transforms.TransformAt].(string)
if !transformAtOk {
return senderConfig, nil
}
if transformAt == ip.Local {
schema := fmt.Sprintf(",%v ip", key)
if autoCreate == fmt.Sprintf("%v ip", key) {
autoCreate = ""
} else if index := strings.Index(autoCreate, schema); index != -1 {
autoCreate = autoCreate[:index] + autoCreate[index+len(schema):]
}
senderConfig[sender.KeyPandoraAutoCreate] = autoCreate
return senderConfig, nil
}

if autoCreate == "" {
senderConfig[sender.KeyPandoraAutoCreate] = fmt.Sprintf("%s %s", key, TypeIP)
return senderConfig, nil
}

return senderConfig
senderConfig[sender.KeyPandoraAutoCreate] += fmt.Sprintf(",%s %s", key, TypeIP)
return senderConfig, nil
}
61 changes: 61 additions & 0 deletions mgr/runner_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,9 @@ import (
_ "github.com/qiniu/logkit/sender/builtin"
"github.com/qiniu/logkit/sender/mock"
"github.com/qiniu/logkit/sender/pandora"
"github.com/qiniu/logkit/transforms"
_ "github.com/qiniu/logkit/transforms/builtin"
"github.com/qiniu/logkit/transforms/ip"
. "github.com/qiniu/logkit/utils/models"
)

Expand Down Expand Up @@ -1896,12 +1898,71 @@ DONE:
break DONE
default:
dft++

}
time.Sleep(50 * time.Millisecond)
if dft > 60 {
break
}
}
assert.Equal(t, 1, ret)
}

func Test_setSenderConfig(t *testing.T) {
senderConfig := conf.MapConf{
sender.KeySenderType: sender.TypePandora,
}

serverConfigs := []map[string]interface{}{
{
transforms.KeyType: ip.Name,
transforms.TransformAt: ip.Server,
},
}
actualConfig, err := setSenderConfig(senderConfig, serverConfigs)
assert.NoError(t, err)
assert.Equal(t, "", actualConfig[sender.KeyPandoraAutoCreate])

serverConfigs = []map[string]interface{}{
{
transforms.KeyType: ip.Name,
transforms.TransformAt: ip.Server,
"key": "ip",
},
}
actualConfig, err = setSenderConfig(senderConfig, serverConfigs)
assert.NoError(t, err)
assert.Equal(t, "ip ip", actualConfig[sender.KeyPandoraAutoCreate])

senderConfig = conf.MapConf{
sender.KeySenderType: sender.TypePandora,
}
serverConfigs = []map[string]interface{}{
{
transforms.KeyType: ip.Name,
transforms.TransformAt: ip.Local,
},
}
actualConfig, err = setSenderConfig(senderConfig, serverConfigs)
assert.NoError(t, err)
assert.Equal(t, "", actualConfig[sender.KeyPandoraAutoCreate])

serverConfigs = []map[string]interface{}{
{
transforms.KeyType: "other",
},
}
actualConfig, err = setSenderConfig(senderConfig, serverConfigs)
assert.NoError(t, err)
assert.Equal(t, "", actualConfig[sender.KeyPandoraAutoCreate])

serverConfigs = []map[string]interface{}{
{
transforms.KeyType: ip.Name,
transforms.TransformAt: ip.Server,
"key": "ip.ip",
},
}
actualConfig, err = setSenderConfig(senderConfig, serverConfigs)
assert.Error(t, err)
}
2 changes: 0 additions & 2 deletions sender/pandora/pandora.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,6 @@ type Sender struct {
microsecondCounter uint64
extraInfo map[string]string
sendType string
EnbleServerIp bool
}

// UserSchema was parsed pandora schema from user's raw schema
Expand Down Expand Up @@ -497,7 +496,6 @@ func newPandoraSender(opt *PandoraOption) (s *Sender, err error) {
log.Errorf("Runner[%v] Sender[%v]: auto create pandora repo error: %v, you can create on pandora portal, ignored...", opt.runnerName, opt.name, err)
err = nil
}
log.Infof("`````````````````````````````dsl: %v, schemas: %v, opt.autoCreate: %v", dsl, schemas, opt.autoCreate)
if initErr := s.client.InitOrUpdateWorkflow(&pipeline.InitOrUpdateWorkflowInput{
// 此处要的 schema 为 autoCreate 中用户指定的,所以 SchemaFree 要恒为 true
InitOptionChange: true,
Expand Down
Loading

0 comments on commit 91da253

Please sign in to comment.