From 19a9546c5acfcd206463307f82047a48bbda64b4 Mon Sep 17 00:00:00 2001 From: wonderflow Date: Sun, 22 Jul 2018 14:49:29 +0800 Subject: [PATCH 1/3] =?UTF-8?q?=E4=BC=98=E5=8C=96ip=20transformer=20?= =?UTF-8?q?=E6=80=A7=E8=83=BD?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- reader/utils_test.go | 17 +-- sender/pandora/pandora.go | 4 +- transforms/ip/ip.go | 127 ++++++++++++---- transforms/ip/ip_test.go | 31 ++++ transforms/mutate/pandorakey_convert.go | 10 +- transforms/mutate/pandorakey_convert_test.go | 20 +++ utils/models/utils.go | 95 ++++++++---- utils/models/utils_test.go | 147 ++++++++++++++++++- 8 files changed, 383 insertions(+), 68 deletions(-) diff --git a/reader/utils_test.go b/reader/utils_test.go index f11f576fc..3eabf64d2 100644 --- a/reader/utils_test.go +++ b/reader/utils_test.go @@ -136,11 +136,11 @@ func TestGetTags(t *testing.T) { assert.Equal(t, exp, tags) } -func TestSetMapValueWithPrefix(t *testing.T) { +func TestSetMapValueExistWithPrefix(t *testing.T) { data1 := map[string]interface{}{ "a": "b", } - err1 := SetMapValueWithPrefix(data1, "newVal", "prefix", false, "a") + err1 := SetMapValueExistWithPrefix(data1, "newVal", "prefix", "a") assert.NoError(t, err1) exp1 := map[string]interface{}{ "a": "b", @@ -154,7 +154,7 @@ func TestSetMapValueWithPrefix(t *testing.T) { "age": 45, }, } - err2 := SetMapValueWithPrefix(data2, "newVal", "prefix", false, []string{"a", "name"}...) + err2 := SetMapValueExistWithPrefix(data2, "newVal", "prefix", []string{"a", "name"}...) assert.NoError(t, err2) exp2 := map[string]interface{}{ "a": map[string]interface{}{ @@ -165,10 +165,10 @@ func TestSetMapValueWithPrefix(t *testing.T) { } assert.Equal(t, exp2, data2) - err3 := SetMapValueWithPrefix(data2, "newVal", "prefix", false, []string{"xy", "name"}...) + err3 := SetMapValueExistWithPrefix(data2, "newVal", "prefix", []string{"xy", "name"}...) assert.Error(t, err3) - err4 := SetMapValueWithPrefix(data2, "newVal", "prefix", false, []string{"a", "hello"}...) + err4 := SetMapValueExistWithPrefix(data2, "newVal", "prefix", []string{"a", "hello"}...) assert.NoError(t, err4) exp4 := map[string]interface{}{ "a": map[string]interface{}{ @@ -180,11 +180,4 @@ func TestSetMapValueWithPrefix(t *testing.T) { } assert.Equal(t, exp4, data2) - data5 := map[string]interface{}{} - err5 := SetMapValueWithPrefix(data5, "newVal", "prefix", true, "a") - assert.NoError(t, err5) - exp5 := map[string]interface{}{ - "prefix_a": "newVal", - } - assert.Equal(t, exp5, data5) } diff --git a/sender/pandora/pandora.go b/sender/pandora/pandora.go index 13b6b665f..4b0d4f8b2 100644 --- a/sender/pandora/pandora.go +++ b/sender/pandora/pandora.go @@ -48,6 +48,7 @@ type Sender struct { microsecondCounter uint64 extraInfo map[string]string sendType string + keyCache map[string]KeyInfo } // UserSchema was parsed pandora schema from user's raw schema @@ -431,6 +432,7 @@ func newPandoraSender(opt *PandoraOption) (s *Sender, err error) { schemas: make(map[string]pipeline.RepoSchemaEntry), extraInfo: utilsos.GetExtraInfo(), sendType: opt.sendType, + keyCache: make(map[string]KeyInfo), } expandAttr := make([]string, 0) @@ -851,7 +853,7 @@ func (s *Sender) Send(datas []Data) (se error) { return s.rawSend(datas) default: for i, v := range datas { - datas[i] = DeepConvertKey(v) + datas[i] = DeepConvertKeyWithCache(v, s.keyCache) } return s.schemaFreeSend(datas) } diff --git a/transforms/ip/ip.go b/transforms/ip/ip.go index bcbe38317..b4dea7072 100644 --- a/transforms/ip/ip.go +++ b/transforms/ip/ip.go @@ -33,8 +33,18 @@ type Transformer struct { DataPath string `json:"data_path"` KeyAsPrefix bool `json:"key_as_prefix"` - loc Locator - stats StatsInfo + loc Locator + keys []string + lastEleKey string + keysRegion []string + keysCity []string + keysCountry []string + keysIsp []string + keysCountryCode []string + keysLatitude []string + keysLongitude []string + keysDistrictCode []string + stats StatsInfo } func (t *Transformer) Init() error { @@ -43,9 +53,32 @@ func (t *Transformer) Init() error { return fmt.Errorf("new locator: %v", err) } t.loc = loc + t.keys = GetKeys(t.Key) + + newKeys := make([]string, len(t.keys)) + copy(newKeys, t.keys) + t.lastEleKey = t.keys[len(t.keys)-1] + t.keysRegion = generateKeys(t.keys, Region, t.KeyAsPrefix) + t.keysCity = generateKeys(t.keys, City, t.KeyAsPrefix) + t.keysCountry = generateKeys(t.keys, Country, t.KeyAsPrefix) + t.keysIsp = generateKeys(t.keys, Isp, t.KeyAsPrefix) + t.keysCountryCode = generateKeys(t.keys, CountryCode, t.KeyAsPrefix) + t.keysLatitude = generateKeys(t.keys, Latitude, t.KeyAsPrefix) + t.keysLongitude = generateKeys(t.keys, Longitude, t.KeyAsPrefix) + t.keysDistrictCode = generateKeys(t.keys, DistrictCode, t.KeyAsPrefix) return nil } +func generateKeys(keys []string, lastEle string, keyAsPrefix bool) []string { + newKeys := make([]string, len(keys)) + copy(newKeys, keys) + if keyAsPrefix { + lastEle = keys[len(keys)-1] + "_" + lastEle + } + newKeys[len(keys)-1] = lastEle + return newKeys +} + func (_ *Transformer) RawTransform(datas []string) ([]string, error) { return datas, errors.New("IP transformer not support rawTransform") } @@ -54,18 +87,15 @@ func (t *Transformer) Transform(datas []Data) ([]Data, error) { var err, fmtErr error errNum := 0 if t.loc == nil { - loc, err := NewLocator(t.DataPath) + err := t.Init() if err != nil { - t.stats, _ = transforms.SetStatsInfo(err, t.stats, int64(errNum), int64(len(datas)), t.Type()) return datas, err } - t.loc = loc } - keys := GetKeys(t.Key) - newKeys := make([]string, len(keys)) + newKeys := make([]string, len(t.keys)) for i := range datas { - copy(newKeys, keys) - val, getErr := GetMapValue(datas[i], keys...) + copy(newKeys, t.keys) + val, getErr := GetMapValue(datas[i], t.keys...) if getErr != nil { errNum, err = transforms.SetError(errNum, getErr, transforms.GetErr, t.Key) continue @@ -81,29 +111,45 @@ func (t *Transformer) Transform(datas []Data) ([]Data, error) { errNum, err = transforms.SetError(errNum, findErr, transforms.General, "") continue } - newKeys[len(newKeys)-1] = Region - SetMapValueWithPrefix(datas[i], info.Region, keys[len(keys)-1], t.KeyAsPrefix, newKeys...) - newKeys[len(newKeys)-1] = City - SetMapValueWithPrefix(datas[i], info.City, keys[len(keys)-1], t.KeyAsPrefix, newKeys...) - newKeys[len(newKeys)-1] = Country - SetMapValueWithPrefix(datas[i], info.Country, keys[len(keys)-1], t.KeyAsPrefix, newKeys...) - newKeys[len(newKeys)-1] = Isp - SetMapValueWithPrefix(datas[i], info.Isp, keys[len(keys)-1], t.KeyAsPrefix, newKeys...) + findErr = t.SetMapValue(datas[i], info.Region, t.keysRegion...) + if findErr != nil { + errNum, err = transforms.SetError(errNum, findErr, transforms.General, "") + } + findErr = t.SetMapValue(datas[i], info.City, t.keysCity...) + if findErr != nil { + errNum, err = transforms.SetError(errNum, findErr, transforms.General, "") + } + findErr = t.SetMapValue(datas[i], info.Country, t.keysCountry...) + if findErr != nil { + errNum, err = transforms.SetError(errNum, findErr, transforms.General, "") + } + findErr = t.SetMapValue(datas[i], info.Isp, t.keysIsp...) + if findErr != nil { + errNum, err = transforms.SetError(errNum, findErr, transforms.General, "") + } if info.CountryCode != "" { - newKeys[len(newKeys)-1] = CountryCode - SetMapValueWithPrefix(datas[i], info.CountryCode, keys[len(keys)-1], t.KeyAsPrefix, newKeys...) + findErr = t.SetMapValue(datas[i], info.CountryCode, t.keysCountryCode...) + if findErr != nil { + errNum, err = transforms.SetError(errNum, findErr, transforms.General, "") + } } if info.Latitude != "" { - newKeys[len(newKeys)-1] = Latitude - SetMapValueWithPrefix(datas[i], info.Latitude, keys[len(keys)-1], t.KeyAsPrefix, newKeys...) + findErr = t.SetMapValue(datas[i], info.Latitude, t.keysLatitude...) + if findErr != nil { + errNum, err = transforms.SetError(errNum, findErr, transforms.General, "") + } } if info.Longitude != "" { - newKeys[len(newKeys)-1] = Longitude - SetMapValueWithPrefix(datas[i], info.Longitude, keys[len(keys)-1], t.KeyAsPrefix, newKeys...) + findErr = t.SetMapValue(datas[i], info.Longitude, t.keysLongitude...) + if findErr != nil { + errNum, err = transforms.SetError(errNum, findErr, transforms.General, "") + } } if info.DistrictCode != "" { - newKeys[len(newKeys)-1] = DistrictCode - SetMapValueWithPrefix(datas[i], info.DistrictCode, keys[len(keys)-1], t.KeyAsPrefix, newKeys...) + findErr = t.SetMapValue(datas[i], info.DistrictCode, t.keysDistrictCode...) + if findErr != nil { + errNum, err = transforms.SetError(errNum, findErr, transforms.General, "") + } } } @@ -111,6 +157,37 @@ func (t *Transformer) Transform(datas []Data) ([]Data, error) { return datas, fmtErr } +//通过层级key设置value值, 如果keys不存在则不加前缀,否则加前缀 +func (t *Transformer) SetMapValue(m map[string]interface{}, val interface{}, keys ...string) error { + if len(keys) == 0 { + return nil + } + var curr map[string]interface{} + curr = m + for _, k := range keys[0 : len(keys)-1] { + finalVal, ok := curr[k] + if !ok { + n := make(map[string]interface{}) + curr[k] = n + curr = n + continue + } + //判断val是否为map[string]interface{}类型 + if curr, ok = finalVal.(map[string]interface{}); ok { + continue + } + return fmt.Errorf("SetMapValueWithPrefix failed, %v is not the type of map[string]interface{}", keys) + } + //判断val(k)是否存在 + _, exist := curr[keys[len(keys)-1]] + if exist { + curr[t.lastEleKey+"_"+keys[len(keys)-1]] = val + } else { + curr[keys[len(keys)-1]] = val + } + return nil +} + func (_ *Transformer) Description() string { //return "transform ip to country region and isp" return "获取IP的区域、国家、城市和运营商信息" diff --git a/transforms/ip/ip_test.go b/transforms/ip/ip_test.go index b256f14eb..623f1f0d1 100644 --- a/transforms/ip/ip_test.go +++ b/transforms/ip/ip_test.go @@ -16,6 +16,7 @@ func TestTransformer(t *testing.T) { Key: "ip", DataPath: "./test_data/17monipdb.dat", } + assert.Nil(t, ipt.Init()) data, err := ipt.Transform([]Data{{"ip": "111.2.3.4"}, {"ip": "x.x.x.x"}}) assert.Error(t, err) exp := []Data{{ @@ -65,6 +66,7 @@ func TestTransformer(t *testing.T) { Key: "multi.ip", DataPath: "./test_data/17monipdb.dat", } + assert.Nil(t, ipt.Init()) data2, err2 := ipt2.Transform([]Data{{"multi": map[string]interface{}{"ip": "111.2.3.4"}}, {"multi": map[string]interface{}{"ip": "x.x.x.x"}}}) assert.Error(t, err2) exp2 := []Data{{ @@ -173,6 +175,35 @@ func TestTransformer(t *testing.T) { assert.Len(t, locatorStore.locators, 2) } +var dttest []Data + +//old: 1000000 1152 ns/op 432 B/op 16 allocs/op +//new: 2000000 621 ns/op 232 B/op 7 allocs/op +func BenchmarkIpTrans(b *testing.B) { + b.ReportAllocs() + ipt4 := &Transformer{ + Key: "multi.ip2", + DataPath: "./test_data/17monipdb.dat", + KeyAsPrefix: true, + } + ipt4.Init() + data := []Data{ + { + "multi": map[string]interface{}{ + "ip": "111.2.3.4", + "Region": "浙江", + "City": "宁波", + "Country": "中国", + "Isp": "N/A", + "ip2": "183.251.28.250", + }, + }, + } + for i := 0; i < b.N; i++ { + dttest, _ = ipt4.Transform(data) + } +} + func Test_badData(t *testing.T) { ipt := &Transformer{ Key: "ip", diff --git a/transforms/mutate/pandorakey_convert.go b/transforms/mutate/pandorakey_convert.go index 4fcdd5500..319893dff 100644 --- a/transforms/mutate/pandorakey_convert.go +++ b/transforms/mutate/pandorakey_convert.go @@ -14,15 +14,21 @@ var ( type PandoraKeyConvert struct { stats StatsInfo + cache map[string]KeyInfo } +func (g *PandoraKeyConvert) Init() error { + g.cache = make(map[string]KeyInfo) + return nil +} func (g *PandoraKeyConvert) RawTransform(datas []string) ([]string, error) { return datas, errors.New("pandora_key_convert transformer not support rawTransform") } func (g *PandoraKeyConvert) Transform(datas []Data) ([]Data, error) { for i, v := range datas { - datas[i] = DeepConvertKey(v) + datas[i] = DeepConvertKeyWithCache(v, g.cache) + //datas[i] = DeepConvertKey(v) } g.stats, _ = transforms.SetStatsInfo(nil, g.stats, 0, int64(len(datas)), g.Type()) @@ -63,6 +69,6 @@ func (g *PandoraKeyConvert) SetStats(err string) StatsInfo { func init() { transforms.Add("pandora_key_convert", func() transforms.Transformer { - return &PandoraKeyConvert{} + return &PandoraKeyConvert{cache: make(map[string]KeyInfo)} }) } diff --git a/transforms/mutate/pandorakey_convert_test.go b/transforms/mutate/pandorakey_convert_test.go index be3383544..a25ee695f 100644 --- a/transforms/mutate/pandorakey_convert_test.go +++ b/transforms/mutate/pandorakey_convert_test.go @@ -26,3 +26,23 @@ func TestDeepconvertkey(t *testing.T) { exp = []Data{{"ts_ts2": map[string]interface{}{"K200": 1, "a_xs_1": 2}}} assert.Equal(t, exp, got) } + +var got []Data + +//old(没有cache):500000 2846 ns/op 2536 B/op 33 allocs/op +//new(cache): 500000 2249 ns/op 2392 B/op 17 allocs/op +func BenchmarkCache(b *testing.B) { + pandoraConvert := &PandoraKeyConvert{cache: make(map[string]KeyInfo)} + b.ReportAllocs() + + for i := 0; i < b.N; i++ { + data := []Data{{"ts。ts2": "stamp1"}, {"ts-tes2/1.2": "stamp2"}} + got, _ = pandoraConvert.Transform(data) + + data = []Data{{"ts。ts2": map[string]interface{}{"_xs1_2s.xs.1": 1, "a.xs.1": 2}}, {"ts- ": "stamp2"}} + got, _ = pandoraConvert.Transform(data) + + data = []Data{{"ts。ts2": map[string]interface{}{"200": 1, "a.xs.1": 2}}} + got, _ = pandoraConvert.Transform(data) + } +} diff --git a/utils/models/utils.go b/utils/models/utils.go index a1bbd4d26..30dd4c76a 100644 --- a/utils/models/utils.go +++ b/utils/models/utils.go @@ -425,31 +425,30 @@ func SetMapValue(m map[string]interface{}, val interface{}, coercive bool, keys return nil } -//通过层级key设置value值, 如果keys不存在则不加前缀,否则加前缀,forceSet为true时无论原来的值存不存在,都加前缀. -func SetMapValueWithPrefix(m map[string]interface{}, val interface{}, prefix string, forceAdd bool, keys ...string) error { +//通过层级key设置value值, 如果keys不存在则不加前缀,否则加前缀 +func SetMapValueExistWithPrefix(m map[string]interface{}, val interface{}, prefix string, keys ...string) error { if len(keys) == 0 { return nil } var curr map[string]interface{} curr = m - var exist bool - for i, k := range keys { - if i < len(keys)-1 { - finalVal, ok := curr[k] - if !ok { - return fmt.Errorf("SetMapValueWithPrefix failed, keys %v are non-existent", val) - } - //判断val是否为map[string]interface{}类型 - if curr, ok = finalVal.(map[string]interface{}); ok { - continue - } - return fmt.Errorf("SetMapValueWithPrefix failed, %v is not the type of map[string]interface{}", keys) + for _, k := range keys[0 : len(keys)-1] { + finalVal, ok := curr[k] + if !ok { + n := make(map[string]interface{}) + curr[k] = n + curr = n + continue } - - //判断val(k)是否存在 - _, exist = curr[k] + //判断val是否为map[string]interface{}类型 + if curr, ok = finalVal.(map[string]interface{}); ok { + continue + } + return fmt.Errorf("SetMapValueWithPrefix failed, %v is not the type of map[string]interface{}", keys) } - if exist || forceAdd { + //判断val(k)是否存在 + _, exist := curr[keys[len(keys)-1]] + if exist { curr[prefix+"_"+keys[len(keys)-1]] = val } else { curr[keys[len(keys)-1]] = val @@ -752,6 +751,17 @@ func GetMapList(data string) map[string]string { return ret } +//为了提升性能做的一个预先检查,避免CPU浪费 +func CheckPandoraKey(key string) bool { + for _, c := range key { + if (c >= '0' && c <= '9') || (c >= 'a' && c <= 'z') || (c >= 'A' && c <= 'Z') { + continue + } + return false + } + return true +} + // 判断时只有数字和字母为合法字符,规则: // 1. 首字符为数字时,增加首字符 "K" // 2. 首字符为非法字符时,去掉首字符(例如,如果字符串全为非法字符,则转换后为空) @@ -793,33 +803,66 @@ func PandoraKey(key string) (string, bool) { for idx, c := range key { if c >= '0' && c <= '9' { if idx == 0 { - bp += copy(bytes, "K") + bytes[bp] = 'K' + bp++ } - bp += copy(bytes[bp:], string(c)) + bytes[bp] = byte(c) + bp++ continue } if (c >= 'a' && c <= 'z') || (c >= 'A' && c <= 'Z') { - bp += copy(bytes[bp:], string(c)) + bytes[bp] = byte(c) + bp++ continue } if idx > 0 { - bp += copy(bytes[bp:], "_") + bytes[bp] = '_' + bp++ } } - return string(bytes), false + return string(bytes), valid } func DeepConvertKey(data map[string]interface{}) map[string]interface{} { for k, v := range data { - nk, valid := PandoraKey(k) - if nv, ok := v.(map[string]interface{}); ok { + nv, ok := v.(map[string]interface{}) + if ok { v = DeepConvertKey(nv) } + valid := CheckPandoraKey(k) if !valid { delete(data, k) - data[nk] = v + k, _ := PandoraKey(k) + data[k] = v + } + } + return data +} + +//注意:cache如果是nil,这个函数就完全没有意义,不如调用 DeepConvertKey +func DeepConvertKeyWithCache(data map[string]interface{}, cache map[string]KeyInfo) map[string]interface{} { + for k, v := range data { + if nv, ok := v.(map[string]interface{}); ok { + v = DeepConvertKeyWithCache(nv, cache) + } + keyInfo, exist := cache[k] + if !exist { + keyInfo.NewKey, keyInfo.Valid = PandoraKey(k) + if cache == nil { + cache = make(map[string]KeyInfo) + } + cache[k] = keyInfo + } + if !keyInfo.Valid { + delete(data, k) + data[keyInfo.NewKey] = v } } return data } + +type KeyInfo struct { + Valid bool + NewKey string +} diff --git a/utils/models/utils_test.go b/utils/models/utils_test.go index b22e2b321..743f45b11 100644 --- a/utils/models/utils_test.go +++ b/utils/models/utils_test.go @@ -626,6 +626,15 @@ func TestPandoraKey(t *testing.T) { } } +func TestCheckPandoraKey(t *testing.T) { + testKeys := []string{"@timestamp", ".dot", "percent%100", "^^^^^^^^^^", "timestamp"} + expectValid := []bool{false, false, false, false, true} + for idx, key := range testKeys { + valid := CheckPandoraKey(key) + assert.Equal(t, expectValid[idx], valid) + } +} + func BenchmarkPandoraKey(b *testing.B) { b.ReportAllocs() testKeys := []string{"@timestamp", ".dot", "percent%100", "^^^^^^^^^^", "timestamp", "aaa"} @@ -636,12 +645,22 @@ func BenchmarkPandoraKey(b *testing.B) { } } +func BenchmarkCheckPandoraKey(b *testing.B) { + b.ReportAllocs() + testKeys := []string{"@timestamp", ".dot", "percent%100", "^^^^^^^^^^", "timestamp", "aaa"} + for i := 0; i < b.N; i++ { + for _, key := range testKeys { + CheckPandoraKey(key) + } + } +} + +//1000000 1493 ns/op 32 B/op 2 allocs/op func BenchmarkDeepConvertKey(b *testing.B) { b.ReportAllocs() testDatas := []map[string]interface{}{ { "@timestamp": "2018-07-18T10:17:36.549054846+08:00", - //"timestamp": "2018-07-19T10:17:36.549054846+08:00", }, { ".dot": map[string]interface{}{".dot2": "dot"}, @@ -651,6 +670,40 @@ func BenchmarkDeepConvertKey(b *testing.B) { "percent%100": 100, "^^^^^^^^^^": "mytest", }, + { + "timestamp": "2018-07-18T10:17:36.549054846+08:00", + }, + { + "dot": map[string]interface{}{"dot2": "dot"}, + }, + { + "dot": map[string]interface{}{"dot2": "dot"}, + "percent100": 100, + "axsxs": "mytest", + }, + { + "timestamp": "2018-07-18T10:17:36.549054846+08:00", + //"timestamp": "2018-07-19T10:17:36.549054846+08:00", + }, + { + "dot": map[string]interface{}{"dot2": "dot"}, + }, + { + "dot": map[string]interface{}{"dot2": "dot"}, + "percent100": 100, + "axsxs": "mytest", + }, + { + "timestamp": "2018-07-18T10:17:36.549054846+08:00", + }, + { + "dot": map[string]interface{}{"dot2": "dot"}, + }, + { + "dot": map[string]interface{}{"dot2": "dot"}, + "percent100": 100, + "axsxs": "mytest", + }, } for i := 0; i < b.N; i++ { for _, data := range testDatas { @@ -663,7 +716,6 @@ func TestDeepConvertKey(t *testing.T) { testDatas := []map[string]interface{}{ { "@timestamp": "2018-07-18T10:17:36.549054846+08:00", - //"timestamp": "2018-07-19T10:17:36.549054846+08:00", }, { ".dot": map[string]interface{}{".dot2": "dot"}, @@ -693,3 +745,94 @@ func TestDeepConvertKey(t *testing.T) { assert.Equal(t, expectDatas[idx], actual) } } + +func TestDeepConvertKeyWithCache(t *testing.T) { + testDatas := []map[string]interface{}{ + { + "@timestamp": "2018-07-18T10:17:36.549054846+08:00", + }, + { + ".dot": map[string]interface{}{".dot2": "dot"}, + }, + { + "dot": map[string]interface{}{".dot2": "dot"}, + "percent%100": 100, + "^^^^^^^^^^": "mytest", + }, + } + expectDatas := []map[string]interface{}{ + { + "timestamp": "2018-07-18T10:17:36.549054846+08:00", + }, + { + "dot": map[string]interface{}{"dot2": "dot"}, + }, + { + "dot": map[string]interface{}{"dot2": "dot"}, + "percent_100": 100, + "": "mytest", + }, + } + cache := make(map[string]KeyInfo) + for idx, data := range testDatas { + actual := DeepConvertKeyWithCache(data, cache) + assert.Equal(t, expectDatas[idx], actual) + } +} + +//1000000 1647 ns/op 0 B/op 0 allocs/op +func BenchmarkDeepConvertKeyWithCache(b *testing.B) { + b.ReportAllocs() + testDatas := []map[string]interface{}{ + { + "@timestamp": "2018-07-18T10:17:36.549054846+08:00", + }, + { + ".dot": map[string]interface{}{".dot2": "dot"}, + }, + { + "dot": map[string]interface{}{".dot2": "dot"}, + "percent%100": 100, + "^^^^^^^^^^": "mytest", + }, + { + "timestamp": "2018-07-18T10:17:36.549054846+08:00", + }, + { + "dot": map[string]interface{}{"dot2": "dot"}, + }, + { + "dot": map[string]interface{}{"dot2": "dot"}, + "percent100": 100, + "axsxs": "mytest", + }, + { + "timestamp": "2018-07-18T10:17:36.549054846+08:00", + }, + { + "dot": map[string]interface{}{"dot2": "dot"}, + }, + { + "dot": map[string]interface{}{"dot2": "dot"}, + "percent100": 100, + "axsxs": "mytest", + }, + { + "timestamp": "2018-07-18T10:17:36.549054846+08:00", + }, + { + "dot": map[string]interface{}{"dot2": "dot"}, + }, + { + "dot": map[string]interface{}{"dot2": "dot"}, + "percent100": 100, + "axsxs": "mytest", + }, + } + cache := make(map[string]KeyInfo) + for i := 0; i < b.N; i++ { + for _, data := range testDatas { + DeepConvertKeyWithCache(data, cache) + } + } +} From 32b15b079cb160ccd37a121824a73e6cfef3aee6 Mon Sep 17 00:00:00 2001 From: wonderflow Date: Sun, 22 Jul 2018 16:18:15 +0800 Subject: [PATCH 2/3] init with get keys --- reader/utils_test.go | 5 ++++- transforms/ip/ip.go | 8 +++++--- transforms/mutate/arrayexpand.go | 20 +++++++++++++----- transforms/mutate/discard.go | 20 ++++++++++++++---- transforms/mutate/json.go | 24 +++++++++++++++------- transforms/mutate/mapreplace.go | 24 ++++++++++++++-------- transforms/mutate/rename.go | 35 ++++++++++++++++++-------------- transforms/mutate/replace.go | 10 ++++++--- transforms/mutate/script.go | 1 + transforms/mutate/urlparam.go | 16 ++++++++++++--- transforms/mutate/xml.go | 26 ++++++++++++++++-------- utils/models/utils.go | 5 +++-- 12 files changed, 135 insertions(+), 59 deletions(-) diff --git a/reader/utils_test.go b/reader/utils_test.go index 3eabf64d2..2da363d68 100644 --- a/reader/utils_test.go +++ b/reader/utils_test.go @@ -166,7 +166,7 @@ func TestSetMapValueExistWithPrefix(t *testing.T) { assert.Equal(t, exp2, data2) err3 := SetMapValueExistWithPrefix(data2, "newVal", "prefix", []string{"xy", "name"}...) - assert.Error(t, err3) + assert.NoError(t, err3) err4 := SetMapValueExistWithPrefix(data2, "newVal", "prefix", []string{"a", "hello"}...) assert.NoError(t, err4) @@ -177,6 +177,9 @@ func TestSetMapValueExistWithPrefix(t *testing.T) { "prefix_name": "newVal", "hello": "newVal", }, + "xy": map[string]interface{}{ + "name": "newVal", + }, } assert.Equal(t, exp4, data2) diff --git a/transforms/ip/ip.go b/transforms/ip/ip.go index b4dea7072..905a581fa 100644 --- a/transforms/ip/ip.go +++ b/transforms/ip/ip.go @@ -33,7 +33,10 @@ type Transformer struct { DataPath string `json:"data_path"` KeyAsPrefix bool `json:"key_as_prefix"` - loc Locator + loc Locator + stats StatsInfo + + //为了提升性能提前做处理 keys []string lastEleKey string keysRegion []string @@ -44,13 +47,12 @@ type Transformer struct { keysLatitude []string keysLongitude []string keysDistrictCode []string - stats StatsInfo } func (t *Transformer) Init() error { loc, err := NewLocator(t.DataPath) if err != nil { - return fmt.Errorf("new locator: %v", err) + return err } t.loc = loc t.keys = GetKeys(t.Key) diff --git a/transforms/mutate/arrayexpand.go b/transforms/mutate/arrayexpand.go index cad9e4f2e..288fd2e06 100644 --- a/transforms/mutate/arrayexpand.go +++ b/transforms/mutate/arrayexpand.go @@ -13,11 +13,19 @@ import ( var ( _ transforms.StatsTransformer = &ArrayExpand{} _ transforms.Transformer = &ArrayExpand{} + _ transforms.Initializer = &ArrayExpand{} ) type ArrayExpand struct { Key string `json:"key"` stats StatsInfo + + keys []string +} + +func (p *ArrayExpand) Init() error { + p.keys = GetKeys(p.Key) + return nil } func (p *ArrayExpand) transformToMap(val interface{}, key string) map[string]interface{} { @@ -137,16 +145,18 @@ func (p *ArrayExpand) RawTransform(datas []string) ([]string, error) { func (p *ArrayExpand) Transform(datas []Data) ([]Data, error) { var err, fmtErr error errNum := 0 - keys := GetKeys(p.Key) - newKeys := make([]string, len(keys)) + if p.keys == nil { + p.Init() + } + newKeys := make([]string, len(p.keys)) for i := range datas { - copy(newKeys, keys) - val, getErr := GetMapValue(datas[i], keys...) + copy(newKeys, p.keys) + val, getErr := GetMapValue(datas[i], p.keys...) if getErr != nil { errNum, err = transforms.SetError(errNum, getErr, transforms.GetErr, p.Key) continue } - if resultMap := p.transformToMap(val, newKeys[len(keys)-1]); resultMap != nil { + if resultMap := p.transformToMap(val, newKeys[len(p.keys)-1]); resultMap != nil { for key, arrVal := range resultMap { suffix := 0 keyName := key diff --git a/transforms/mutate/discard.go b/transforms/mutate/discard.go index 38352bf20..7b36b88f7 100644 --- a/transforms/mutate/discard.go +++ b/transforms/mutate/discard.go @@ -10,12 +10,24 @@ import ( var ( _ transforms.StatsTransformer = &Discarder{} _ transforms.Transformer = &Discarder{} + _ transforms.Initializer = &Discarder{} ) type Discarder struct { Key string `json:"key"` StageTime string `json:"stage"` stats StatsInfo + + discardKeys [][]string +} + +func (g *Discarder) Init() error { + discardKeys := strings.Split(g.Key, ",") + g.discardKeys = make([][]string, len(discardKeys)) + for i := range g.discardKeys { + g.discardKeys[i] = GetKeys(discardKeys[i]) + } + return nil } func (g *Discarder) RawTransform(datas []string) ([]string, error) { @@ -32,14 +44,14 @@ func (g *Discarder) RawTransform(datas []string) ([]string, error) { } func (g *Discarder) Transform(datas []Data) ([]Data, error) { - discardKeys := strings.Split(g.Key, ",") - for _, v := range discardKeys { - keys := GetKeys(v) + if g.discardKeys == nil { + g.Init() + } + for _, keys := range g.discardKeys { for i := range datas { DeleteMapValue(datas[i], keys...) } } - g.stats, _ = transforms.SetStatsInfo(nil, g.stats, 0, int64(len(datas)), g.Type()) return datas, nil } diff --git a/transforms/mutate/json.go b/transforms/mutate/json.go index 99dcb2cb2..6d3e7fad8 100644 --- a/transforms/mutate/json.go +++ b/transforms/mutate/json.go @@ -15,6 +15,7 @@ import ( var ( _ transforms.StatsTransformer = &Json{} _ transforms.Transformer = &Json{} + _ transforms.Initializer = &Json{} ) type Json struct { @@ -22,16 +23,25 @@ type Json struct { New string `json:"new"` stats StatsInfo jsonTool jsoniter.API + + keys []string + news []string +} + +func (g *Json) Init() error { + g.keys = GetKeys(g.Key) + g.news = GetKeys(g.New) + return nil } func (g *Json) Transform(datas []Data) ([]Data, error) { var err, fmtErr error errNum := 0 - keys := GetKeys(g.Key) - news := GetKeys(g.New) - + if g.keys == nil { + g.Init() + } for i := range datas { - val, getErr := GetMapValue(datas[i], keys...) + val, getErr := GetMapValue(datas[i], g.keys...) if getErr != nil { errNum, err = transforms.SetError(errNum, getErr, transforms.GetErr, g.Key) continue @@ -52,10 +62,10 @@ func (g *Json) Transform(datas []Data) ([]Data, error) { continue } - if len(news) == 0 { - news = keys + if len(g.news) == 0 { + g.news = g.keys } - setErr := SetMapValue(datas[i], jsonVal, false, news...) + setErr := SetMapValue(datas[i], jsonVal, false, g.news...) if setErr != nil { errNum, err = transforms.SetError(errNum, setErr, transforms.SetErr, g.New) } diff --git a/transforms/mutate/mapreplace.go b/transforms/mutate/mapreplace.go index 93135edbd..63e06fe4f 100644 --- a/transforms/mutate/mapreplace.go +++ b/transforms/mutate/mapreplace.go @@ -25,9 +25,14 @@ type MapReplacer struct { New string `json:"new"` rp map[string]string stats StatsInfo + + keys []string + news []string } func (g *MapReplacer) Init() error { + g.keys = GetKeys(g.Key) + g.news = GetKeys(g.New) if g.Map != "" { g.rp = GetMapList(g.Map) if len(g.rp) < 1 { @@ -47,6 +52,7 @@ func (g *MapReplacer) Init() error { if err != nil { return fmt.Errorf("read %v as mapdata err %v", g.MapFile, err) } + return nil } @@ -61,11 +67,14 @@ func (g *MapReplacer) convert(value string) (string, bool) { func (g *MapReplacer) Transform(datas []Data) ([]Data, error) { var err, fmtErr error errNum := 0 - keys := GetKeys(g.Key) - news := GetKeys(g.New) - + if g.rp == nil { + err := g.Init() + if err != nil { + return datas, err + } + } for i := range datas { - val, getErr := GetMapValue(datas[i], keys...) + val, getErr := GetMapValue(datas[i], g.keys...) if getErr != nil { errNum, err = transforms.SetError(errNum, getErr, transforms.GetErr, g.Key) continue @@ -91,15 +100,14 @@ func (g *MapReplacer) Transform(datas []Data) ([]Data, error) { continue } } - - if len(news) == 0 { - news = keys + if len(g.news) == 0 { + g.news = g.keys } setVal, set := g.convert(strVal) if !set { continue } - setErr := SetMapValue(datas[i], setVal, false, news...) + setErr := SetMapValue(datas[i], setVal, false, g.news...) if setErr != nil { errNum, err = transforms.SetError(errNum, setErr, transforms.SetErr, g.Key) } diff --git a/transforms/mutate/rename.go b/transforms/mutate/rename.go index 716935fcb..48e9d98d6 100644 --- a/transforms/mutate/rename.go +++ b/transforms/mutate/rename.go @@ -10,31 +10,45 @@ import ( var ( _ transforms.StatsTransformer = &Rename{} _ transforms.Transformer = &Rename{} + _ transforms.Initializer = &Rename{} ) type Rename struct { Key string `json:"key"` NewKeyName string `json:"new_key_name"` + NewKey string `json:"new"` stats StatsInfo + + keys []string + news []string } +func (g *Rename) Init() error { + g.keys = GetKeys(g.Key) + if g.NewKey == "" { + g.NewKey = g.NewKeyName + } + g.news = GetKeys(g.NewKey) + return nil +} func (g *Rename) RawTransform(datas []string) ([]string, error) { return datas, errors.New("rename transformer not support rawTransform") } func (g *Rename) Transform(datas []Data) ([]Data, error) { + if g.keys == nil { + g.Init() + } var err, fmtErr error errNum := 0 - keySlice := GetKeys(g.Key) - newKeySlice := GetKeys(g.NewKeyName) for i := range datas { - val, getErr := GetMapValue(datas[i], keySlice...) + val, getErr := GetMapValue(datas[i], g.keys...) if getErr != nil { errNum, err = transforms.SetError(errNum, getErr, transforms.GetErr, g.Key) continue } - DeleteMapValue(datas[i], keySlice...) - setErr := SetMapValue(datas[i], val, false, newKeySlice...) + DeleteMapValue(datas[i], g.keys...) + setErr := SetMapValue(datas[i], val, false, g.news...) if setErr != nil { errNum, err = transforms.SetError(errNum, setErr, transforms.SetErr, g.NewKeyName) } @@ -64,16 +78,7 @@ func (g *Rename) SampleConfig() string { func (g *Rename) ConfigOptions() []Option { return []Option{ transforms.KeyFieldName, - { - KeyName: "new_key_name", - ChooseOnly: false, - Default: "", - Required: true, - Placeholder: "new_key_name", - DefaultNoUse: true, - Description: "修改后的字段名(new_key_name)", - Type: transforms.TransformTypeString, - }, + transforms.KeyFieldNewRequired, } } diff --git a/transforms/mutate/replace.go b/transforms/mutate/replace.go index f78de76ee..ce8050b57 100644 --- a/transforms/mutate/replace.go +++ b/transforms/mutate/replace.go @@ -11,6 +11,7 @@ import ( var ( _ transforms.StatsTransformer = &Replacer{} _ transforms.Transformer = &Replacer{} + _ transforms.Initializer = &Replacer{} ) type Replacer struct { @@ -21,6 +22,8 @@ type Replacer struct { Regex bool `json:"regex"` stats StatsInfo Regexp *regexp.Regexp + + keys []string } func (g *Replacer) Init() error { @@ -33,15 +36,16 @@ func (g *Replacer) Init() error { return err } g.Regexp = rgx + g.keys = GetKeys(g.Key) return nil } func (g *Replacer) Transform(datas []Data) ([]Data, error) { var err, fmtErr error errNum := 0 - keys := GetKeys(g.Key) + for i := range datas { - val, getErr := GetMapValue(datas[i], keys...) + val, getErr := GetMapValue(datas[i], g.keys...) if getErr != nil { errNum++ err = fmt.Errorf("transform key %v not exist in data", g.Key) @@ -53,7 +57,7 @@ func (g *Replacer) Transform(datas []Data) ([]Data, error) { err = fmt.Errorf("transform key %v data type is not string", g.Key) continue } - setErr := SetMapValue(datas[i], g.Regexp.ReplaceAllString(strVal, g.New), false, keys...) + setErr := SetMapValue(datas[i], g.Regexp.ReplaceAllString(strVal, g.New), false, g.keys...) if setErr != nil { errNum++ err = fmt.Errorf("value of %v is not the type of map[string]interface{}", g.Key) diff --git a/transforms/mutate/script.go b/transforms/mutate/script.go index 1254d99b2..bfe2e126b 100644 --- a/transforms/mutate/script.go +++ b/transforms/mutate/script.go @@ -18,6 +18,7 @@ import ( var ( _ transforms.StatsTransformer = &Script{} _ transforms.Transformer = &Script{} + _ transforms.Initializer = &Script{} ) type Script struct { diff --git a/transforms/mutate/urlparam.go b/transforms/mutate/urlparam.go index 5f81cca3c..e5c9e1d9f 100644 --- a/transforms/mutate/urlparam.go +++ b/transforms/mutate/urlparam.go @@ -20,11 +20,19 @@ const ( var ( _ transforms.StatsTransformer = &UrlParam{} _ transforms.Transformer = &UrlParam{} + _ transforms.Initializer = &UrlParam{} ) type UrlParam struct { Key string `json:"key"` stats StatsInfo + + keys []string +} + +func (p *UrlParam) Init() error { + p.keys = GetKeys(p.Key) + return nil } func (p *UrlParam) transformToMap(strVal string, key string) (map[string]interface{}, error) { @@ -77,12 +85,14 @@ func (p *UrlParam) RawTransform(datas []string) ([]string, error) { } func (p *UrlParam) Transform(datas []Data) ([]Data, error) { + if p.keys == nil { + p.Init() + } var err, fmtErr, toMapErr error errNum := 0 - keys := GetKeys(p.Key) - newKeys := make([]string, len(keys)) + newKeys := make([]string, len(p.keys)) for i := range datas { - copy(newKeys, keys) + copy(newKeys, p.keys) val, getErr := GetMapValue(datas[i], newKeys...) if getErr != nil { errNum, err = transforms.SetError(errNum, getErr, transforms.GetErr, p.Key) diff --git a/transforms/mutate/xml.go b/transforms/mutate/xml.go index 3cd53b0a2..e9210dc43 100644 --- a/transforms/mutate/xml.go +++ b/transforms/mutate/xml.go @@ -14,22 +14,32 @@ import ( var ( _ transforms.StatsTransformer = &Xml{} _ transforms.Transformer = &Xml{} + _ transforms.Initializer = &Xml{} ) type Xml struct { Key string `json:"key"` New string `json:"new"` stats StatsInfo + + keys []string + news []string +} + +func (g *Xml) Init() error { + g.keys = GetKeys(g.Key) + g.news = GetKeys(g.New) + return nil } func (g *Xml) Transform(datas []Data) ([]Data, error) { var err, fmtErr error errNum := 0 - keys := GetKeys(g.Key) - news := GetKeys(g.New) - + if g.keys == nil { + g.Init() + } for i := range datas { - val, getErr := GetMapValue(datas[i], keys...) + val, getErr := GetMapValue(datas[i], g.keys...) if getErr != nil { errNum, err = transforms.SetError(errNum, getErr, transforms.GetErr, g.Key) continue @@ -49,11 +59,11 @@ func (g *Xml) Transform(datas []Data) ([]Data, error) { errNum, err = transforms.SetError(errNum, perr, transforms.General, "") continue } - if len(news) == 0 { - DeleteMapValue(datas[i], keys...) - news = keys + if len(g.news) == 0 { + DeleteMapValue(datas[i], g.keys...) + g.news = g.keys } - setErr := SetMapValue(datas[i], xmlVal, false, news...) + setErr := SetMapValue(datas[i], xmlVal, false, g.news...) if setErr != nil { errNum, err = transforms.SetError(errNum, setErr, transforms.SetErr, g.New) } diff --git a/utils/models/utils.go b/utils/models/utils.go index 30dd4c76a..68ae45a96 100644 --- a/utils/models/utils.go +++ b/utils/models/utils.go @@ -531,14 +531,15 @@ func GetMapValue(m map[string]interface{}, keys ...string) (interface{}, error) var err error var val interface{} val = m - for i, k := range keys { + curKeys := keys + for i, k := range curKeys { //判断val是否为map[string]interface{}类型 if _, ok := val.(map[string]interface{}); ok { //判断val(k)是否存在 if _, ok := val.(map[string]interface{})[k]; ok { val = val.(map[string]interface{})[k] } else { - keys = keys[0 : i+1] + curKeys = curKeys[0 : i+1] err = fmt.Errorf("GetMapValue failed, keys %v are non-existent", keys) return nil, err } From 68a04f3f6565e5b71f079833bf6fd60babb65026 Mon Sep 17 00:00:00 2001 From: wonderflow Date: Mon, 23 Jul 2018 08:49:48 +0800 Subject: [PATCH 3/3] change pandora_auto_convert_date default to false --- sender/fault_tolerant.go | 47 +++++++--- sender/fault_tolerant/fault_tolerant_test.go | 99 ++++++++++++++++++++ sender/pandora/pandora.go | 5 - sender/pandora/pandora_test.go | 91 ------------------ sender/rest_senders_models.go | 4 +- sender/sender.go | 4 +- 6 files changed, 136 insertions(+), 114 deletions(-) diff --git a/sender/fault_tolerant.go b/sender/fault_tolerant.go index bfa100428..884bb6f26 100644 --- a/sender/fault_tolerant.go +++ b/sender/fault_tolerant.go @@ -35,19 +35,20 @@ var _ SkipDeepCopySender = &FtSender{} // FtSender fault tolerance sender wrapper type FtSender struct { - stopped int32 - exitChan chan struct{} - innerSender Sender - logQueue queue.BackendQueue - BackupQueue queue.BackendQueue - writeLimit int // 写入速度限制,单位MB - strategy string - procs int //发送并发数 - runnerName string - opt *FtOption - stats StatsInfo - statsMutex *sync.RWMutex - jsontool jsoniter.API + stopped int32 + exitChan chan struct{} + innerSender Sender + logQueue queue.BackendQueue + BackupQueue queue.BackendQueue + writeLimit int // 写入速度限制,单位MB + strategy string + procs int //发送并发数 + runnerName string + opt *FtOption + stats StatsInfo + statsMutex *sync.RWMutex + jsontool jsoniter.API + pandoraKeyCache map[string]KeyInfo } type FtOption struct { @@ -59,6 +60,7 @@ type FtOption struct { memoryChannel bool memoryChannelSize int longDataDiscard bool + innerSenderType string } type datasContext struct { @@ -66,7 +68,7 @@ type datasContext struct { } // NewFtSender Fault tolerant sender constructor -func NewFtSender(ftSender Sender, conf conf.MapConf, ftSaveLogPath string) (*FtSender, error) { +func NewFtSender(innerSender Sender, conf conf.MapConf, ftSaveLogPath string) (*FtSender, error) { memoryChannel, _ := conf.GetBoolOr(KeyFtMemoryChannel, false) memoryChannelSize, _ := conf.GetIntOr(KeyFtMemoryChannelSize, 100) logPath, _ := conf.GetStringOr(KeyFtSaveLogPath, ftSaveLogPath) @@ -74,6 +76,7 @@ func NewFtSender(ftSender Sender, conf conf.MapConf, ftSaveLogPath string) (*FtS writeLimit, _ := conf.GetIntOr(KeyFtWriteLimit, defaultWriteLimit) strategy, _ := conf.GetStringOr(KeyFtStrategy, KeyFtStrategyBackupOnly) longDataDiscard, _ := conf.GetBoolOr(KeyFtLongDataDiscard, false) + senderType, _ := conf.GetStringOr(KeySenderType, "") //此处不会没有SenderType,在调用NewFtSender时已经检查 switch strategy { case KeyFtStrategyAlwaysSave, KeyFtStrategyBackupOnly, KeyFtStrategyConcurrent: default: @@ -91,9 +94,10 @@ func NewFtSender(ftSender Sender, conf conf.MapConf, ftSaveLogPath string) (*FtS memoryChannel: memoryChannel, memoryChannelSize: memoryChannelSize, longDataDiscard: longDataDiscard, + innerSenderType: senderType, } - return newFtSender(ftSender, runnerName, opt) + return newFtSender(innerSender, runnerName, opt) } func newFtSender(innerSender Sender, runnerName string, opt *FtOption) (*FtSender, error) { @@ -123,6 +127,10 @@ func newFtSender(innerSender Sender, runnerName string, opt *FtOption) (*FtSende statsMutex: new(sync.RWMutex), jsontool: jsoniter.Config{EscapeHTML: true, UseNumber: true}.Froze(), } + + if opt.innerSenderType == TypePandora { + ftSender.pandoraKeyCache = make(map[string]KeyInfo) + } go ftSender.asyncSendLogFromDiskQueue() return &ftSender, nil } @@ -132,6 +140,15 @@ func (ft *FtSender) Name() string { } func (ft *FtSender) Send(datas []Data) error { + + switch ft.opt.innerSenderType { + case TypePandora: + for i, v := range datas { + datas[i] = DeepConvertKeyWithCache(v, ft.pandoraKeyCache) + } + default: + } + se := &StatsError{Ft: true} if ft.strategy == KeyFtStrategyBackupOnly { // 尝试直接发送数据,当数据失败的时候会加入到本地重试队列。外部不需要重试 diff --git a/sender/fault_tolerant/fault_tolerant_test.go b/sender/fault_tolerant/fault_tolerant_test.go index 4509fd66d..bf4bf16bb 100644 --- a/sender/fault_tolerant/fault_tolerant_test.go +++ b/sender/fault_tolerant/fault_tolerant_test.go @@ -531,3 +531,102 @@ func TestSkipDeepCopySender(t *testing.T) { assert.True(t, fs.SkipDeepCopy()) } } + +func TestPandoraExtraInfo(t *testing.T) { + pandoraServer, pt := mock_pandora.NewMockPandoraWithPrefix("/v2") + conf1 := conf.MapConf{ + "force_microsecond": "false", + "ft_memory_channel": "false", + "ft_strategy": "backup_only", + "ignore_invalid_field": "true", + "logkit_send_time": "false", + "pandora_extra_info": "true", + "pandora_ak": "ak", + "pandora_auto_convert_date": "true", + "pandora_gzip": "true", + "pandora_host": "http://127.0.0.1:" + pt, + "pandora_region": "nb", + "pandora_repo_name": "TestPandoraSenderTime", + "pandora_schema_free": "true", + "pandora_sk": "sk", + "runner_name": "runner.20171117110730", + "sender_type": "pandora", + "name": "TestPandoraSenderTime", + "KeyPandoraSchemaUpdateInterval": "1s", + } + + innerSender, err := pandora.NewSender(conf1) + if err != nil { + t.Fatal(err) + } + s, err := sender.NewFtSender(innerSender, conf1, fttestdir) + defer os.RemoveAll(fttestdir) + if err != nil { + t.Fatal(err) + } + d := Data{} + d["x1"] = "123.2" + d["hostname"] = "123.2" + d["hostname0"] = "123.2" + d["hostname1"] = "123.2" + d["hostname2"] = "123.2" + d["osinfo"] = "123.2" + err = s.Send([]Data{d}) + if st, ok := err.(*StatsError); ok { + err = st.ErrorDetail + } + if err != nil { + t.Error(err) + } + resp := pandoraServer.Body + assert.Equal(t, true, strings.Contains(resp, "core")) + assert.Equal(t, true, strings.Contains(resp, "x1=123.2")) + assert.Equal(t, true, strings.Contains(resp, "osinfo=123.2")) + assert.Equal(t, true, strings.Contains(resp, "hostname=123.2")) + assert.Equal(t, true, strings.Contains(resp, "hostname0=123.2")) + assert.Equal(t, true, strings.Contains(resp, "hostname1=123.2")) + assert.Equal(t, true, strings.Contains(resp, "hostname2=123.2")) + + conf2 := conf.MapConf{ + "force_microsecond": "false", + "ft_memory_channel": "false", + "ft_strategy": "backup_only", + "ignore_invalid_field": "true", + "logkit_send_time": "false", + "pandora_extra_info": "false", + "pandora_ak": "ak", + "pandora_auto_convert_date": "true", + "pandora_gzip": "true", + "pandora_host": "http://127.0.0.1:" + pt, + "pandora_region": "nb", + "pandora_repo_name": "TestPandoraSenderTime", + "pandora_schema_free": "true", + "pandora_sk": "sk", + "runner_name": "runner.20171117110730", + "sender_type": "pandora", + "name": "TestPandoraSenderTime", + "KeyPandoraSchemaUpdateInterval": "1s", + } + innerSender, err = pandora.NewSender(conf2) + if err != nil { + t.Fatal(err) + } + + s, err = sender.NewFtSender(innerSender, conf1, fttestdir) + d = Data{ + "*x1": "123.2", + "x2.dot": "123.2", + "@timestamp": "2018-07-18T10:17:36.549054846+08:00", + } + err = s.Send([]Data{d}) + if st, ok := err.(*StatsError); ok { + err = st.ErrorDetail + } + if err != nil { + t.Error(err) + } + resp = pandoraServer.Body + assert.Equal(t, true, strings.Contains(resp, "x1=123.2")) + assert.Equal(t, true, strings.Contains(resp, "x2_dot=123.2")) + assert.Equal(t, true, strings.Contains(resp, "timestamp=2018-07-18T10:17:36.549054846+08:00")) +} diff --git a/sender/pandora/pandora.go b/sender/pandora/pandora.go index 4b0d4f8b2..1c2609450 100644 --- a/sender/pandora/pandora.go +++ b/sender/pandora/pandora.go @@ -48,7 +48,6 @@ type Sender struct { microsecondCounter uint64 extraInfo map[string]string sendType string - keyCache map[string]KeyInfo } // UserSchema was parsed pandora schema from user's raw schema @@ -432,7 +431,6 @@ func newPandoraSender(opt *PandoraOption) (s *Sender, err error) { schemas: make(map[string]pipeline.RepoSchemaEntry), extraInfo: utilsos.GetExtraInfo(), sendType: opt.sendType, - keyCache: make(map[string]KeyInfo), } expandAttr := make([]string, 0) @@ -852,9 +850,6 @@ func (s *Sender) Send(datas []Data) (se error) { case SendTypeRaw: return s.rawSend(datas) default: - for i, v := range datas { - datas[i] = DeepConvertKeyWithCache(v, s.keyCache) - } return s.schemaFreeSend(datas) } return nil diff --git a/sender/pandora/pandora_test.go b/sender/pandora/pandora_test.go index f6bd5b836..8d9c1bfc4 100644 --- a/sender/pandora/pandora_test.go +++ b/sender/pandora/pandora_test.go @@ -951,94 +951,3 @@ func TestPandoraSenderTime(t *testing.T) { resp = pandora.Body assert.Equal(t, resp, "x1=123.2") } - -func TestPandoraExtraInfo(t *testing.T) { - pandora, pt := mockPandora.NewMockPandoraWithPrefix("/v2") - conf1 := conf.MapConf{ - "force_microsecond": "false", - "ft_memory_channel": "false", - "ft_strategy": "backup_only", - "ignore_invalid_field": "true", - "logkit_send_time": "false", - "pandora_extra_info": "true", - "pandora_ak": "ak", - "pandora_auto_convert_date": "true", - "pandora_gzip": "true", - "pandora_host": "http://127.0.0.1:" + pt, - "pandora_region": "nb", - "pandora_repo_name": "TestPandoraSenderTime", - "pandora_schema_free": "true", - "pandora_sk": "sk", - "runner_name": "runner.20171117110730", - "sender_type": "pandora", - "name": "TestPandoraSenderTime", - "KeyPandoraSchemaUpdateInterval": "1s", - } - s, err := NewSender(conf1) - if err != nil { - t.Fatal(err) - } - d := Data{} - d["x1"] = "123.2" - d["hostname"] = "123.2" - d["hostname0"] = "123.2" - d["hostname1"] = "123.2" - d["hostname2"] = "123.2" - d["osinfo"] = "123.2" - err = s.Send([]Data{d}) - if st, ok := err.(*StatsError); ok { - err = st.ErrorDetail - } - if err != nil { - t.Error(err) - } - resp := pandora.Body - assert.Equal(t, true, strings.Contains(resp, "core")) - assert.Equal(t, true, strings.Contains(resp, "x1=123.2")) - assert.Equal(t, true, strings.Contains(resp, "osinfo=123.2")) - assert.Equal(t, true, strings.Contains(resp, "hostname=123.2")) - assert.Equal(t, true, strings.Contains(resp, "hostname0=123.2")) - assert.Equal(t, true, strings.Contains(resp, "hostname1=123.2")) - assert.Equal(t, true, strings.Contains(resp, "hostname2=123.2")) - - conf2 := conf.MapConf{ - "force_microsecond": "false", - "ft_memory_channel": "false", - "ft_strategy": "backup_only", - "ignore_invalid_field": "true", - "logkit_send_time": "false", - "pandora_extra_info": "false", - "pandora_ak": "ak", - "pandora_auto_convert_date": "true", - "pandora_gzip": "true", - "pandora_host": "http://127.0.0.1:" + pt, - "pandora_region": "nb", - "pandora_repo_name": "TestPandoraSenderTime", - "pandora_schema_free": "true", - "pandora_sk": "sk", - "runner_name": "runner.20171117110730", - "sender_type": "pandora", - "name": "TestPandoraSenderTime", - "KeyPandoraSchemaUpdateInterval": "1s", - } - s, err = NewSender(conf2) - if err != nil { - t.Fatal(err) - } - d = Data{ - "*x1": "123.2", - "x2.dot": "123.2", - "@timestamp": "2018-07-18T10:17:36.549054846+08:00", - } - err = s.Send([]Data{d}) - if st, ok := err.(*StatsError); ok { - err = st.ErrorDetail - } - if err != nil { - t.Error(err) - } - resp = pandora.Body - assert.Equal(t, true, strings.Contains(resp, "x1=123.2")) - assert.Equal(t, true, strings.Contains(resp, "x2_dot=123.2")) - assert.Equal(t, true, strings.Contains(resp, "timestamp=2018-07-18T10:17:36.549054846+08:00")) -} diff --git a/sender/rest_senders_models.go b/sender/rest_senders_models.go index 3e9b9739a..e31a16912 100644 --- a/sender/rest_senders_models.go +++ b/sender/rest_senders_models.go @@ -500,8 +500,8 @@ var ModeKeyOptions = map[string][]Option{ KeyName: KeyPandoraAutoConvertDate, Element: Radio, ChooseOnly: true, - ChooseOptions: []interface{}{"true", "false"}, - Default: "true", + ChooseOptions: []interface{}{"false", "true"}, + Default: "false", DefaultNoUse: false, Description: "自动转换时间类型(pandora_auto_convert_date)", Advance: true, diff --git a/sender/sender.go b/sender/sender.go index 6f5705b37..24d7f52e9 100644 --- a/sender/sender.go +++ b/sender/sender.go @@ -266,7 +266,9 @@ func (r *Registry) NewSender(conf conf.MapConf, ftSaveLogPath string) (sender Se return } faultTolerant, _ := conf.GetBoolOr(KeyFaultTolerant, true) - if faultTolerant { + + //如果是 PandoraSender,目前的依赖必须启用 ftsender,依赖Ftsender做key转换检查 + if faultTolerant || sendType == TypePandora { sender, err = NewFtSender(sender, conf, ftSaveLogPath) if err != nil { return