Skip to content

Commit

Permalink
修复es插件相关问题 (#988)
Browse files Browse the repository at this point in the history
* fix: 修复elasticsearch配置node_stats http不生效问题

* fix: 修复es开启export_indices配置后goroutine泄漏异常
  • Loading branch information
ttbug committed Jul 2, 2024
1 parent eb3fe99 commit ce930d6
Show file tree
Hide file tree
Showing 3 changed files with 75 additions and 9 deletions.
16 changes: 12 additions & 4 deletions inputs/elasticsearch/collector/indices.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"sort"
"strconv"
"strings"
"time"

"github.com/prometheus/client_golang/prometheus"

Expand Down Expand Up @@ -2363,10 +2364,17 @@ func NewIndices(client *http.Client, url *url.URL, shards bool, includeAliases b

// start go routine to fetch clusterinfo updates and save them to lastClusterinfo
go func() {
for ci := range indices.clusterInfoCh {
if ci != nil {
log.Println("received cluster info update, cluster: ", ci.ClusterName)
indices.lastClusterInfo = ci
timer := time.NewTimer(2 * time.Minute)
for {
select {
case ci := <-indices.clusterInfoCh:
if ci != nil {
log.Println("received cluster info update, cluster: ", ci.ClusterName)
indices.lastClusterInfo = ci
}
case <-timer.C:
close(indices.clusterInfoCh)
return
}
}
}()
Expand Down
50 changes: 50 additions & 0 deletions inputs/elasticsearch/collector/nodes.go
Original file line number Diff line number Diff line change
Expand Up @@ -190,6 +190,7 @@ type Nodes struct {
breakerMetrics []*breakerMetric
indicesMetrics []*nodeMetric
transportMetrics []*nodeMetric
httpMetrics []*nodeMetric
threadPoolMetrics []*threadPoolMetric
filesystemDataMetrics []*filesystemDataMetric
filesystemIODeviceMetrics []*filesystemIODeviceMetric
Expand Down Expand Up @@ -2105,6 +2106,40 @@ func NewNodes(client *http.Client, url *url.URL, all bool, node string, local bo
Labels: defaultFilesystemIODeviceLabelValues,
},
},
httpMetrics: []*nodeMetric{
{
Type: prometheus.GaugeValue,
Desc: prometheus.NewDesc(
prometheus.BuildFQName(namespace, "http", "current_open"),
"http current open",
defaultNodeLabels, nil,
),
Value: func(node NodeStatsNodeResponse) float64 {
if v, ok := node.HTTP["current_open"]; ok {
return v.(float64)
}

return 0
},
Labels: defaultNodeLabelValues,
},
{
Type: prometheus.CounterValue,
Desc: prometheus.NewDesc(
prometheus.BuildFQName(namespace, "http", "total_opened"),
"http total opened",
defaultNodeLabels, nil,
),
Value: func(node NodeStatsNodeResponse) float64 {
if v, ok := node.HTTP["total_opened"]; ok {
return v.(float64)
}

return 0
},
Labels: defaultNodeLabelValues,
},
},
}
}

Expand Down Expand Up @@ -2140,6 +2175,9 @@ func (c *Nodes) Describe(ch chan<- *prometheus.Desc) {
for _, metric := range c.filesystemIODeviceMetrics {
ch <- metric.Desc
}
for _, metric := range c.httpMetrics {
ch <- metric.Desc
}
ch <- c.up.Desc()
ch <- c.totalScrapes.Desc()
ch <- c.jsonParseFailures.Desc()
Expand Down Expand Up @@ -2431,6 +2469,18 @@ func (c *Nodes) Collect(ch chan<- prometheus.Metric) {
)
}
}

if isEnable("http", c.nodeStats) {
// HTTP Stats
for _, metric := range c.httpMetrics {
ch <- prometheus.MustNewConstMetric(
metric.Desc,
metric.Type,
metric.Value(node),
metric.Labels(nodeStatsResp.ClusterName, node)...,
)
}
}
}
}

Expand Down
18 changes: 13 additions & 5 deletions inputs/elasticsearch/collector/shards.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"net/http"
"net/url"
"path"
"time"

"flashcat.cloud/categraf/inputs/elasticsearch/pkg/clusterinfo"
"github.com/prometheus/client_golang/prometheus"
Expand Down Expand Up @@ -110,14 +111,21 @@ func NewShards(client *http.Client, url *url.URL) *Shards {

// start go routine to fetch clusterinfo updates and save them to lastClusterinfo
go func() {
timer := time.NewTimer(2 * time.Minute)
log.Println("starting cluster info receive loop")
for ci := range shards.clusterInfoCh {
if ci != nil {
log.Println("received cluster info update, cluster ", ci.ClusterName)
shards.lastClusterInfo = ci
for {
select {
case ci := <-shards.clusterInfoCh:
if ci != nil {
log.Println("received cluster info update, cluster ", ci.ClusterName)
shards.lastClusterInfo = ci
}
case <-timer.C:
close(shards.clusterInfoCh)
log.Println("exiting cluster info receive loop")
return
}
}
log.Println("exiting cluster info receive loop")
}()

return shards
Expand Down

0 comments on commit ce930d6

Please sign in to comment.