Skip to content

Commit

Permalink
1. bug fixes program crash problem.
Browse files Browse the repository at this point in the history
2. sync Pod to the latest version before updating its Annotation information.
3. add more debug tracing.
  • Loading branch information
Kevin Yang committed Jun 25, 2018
1 parent 4853146 commit 1efb546
Show file tree
Hide file tree
Showing 2 changed files with 37 additions and 18 deletions.
37 changes: 27 additions & 10 deletions kubernetes.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package main
import (
log "github.com/sirupsen/logrus"
corev1 "k8s.io/api/core/v1"
meta_v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/watch"
Expand All @@ -26,14 +27,23 @@ var (

type PODStatus int
type PODEvent struct {
Pod *corev1.Pod
Status PODStatus
MetricType string
Endpoints string
FechingInterval string
FechingTimeout string
LabeledNamespace string
HasAnnotation bool
Pod *corev1.Pod
Status PODStatus
MetricType string
Endpoints string
FechingInterval string
FechingTimeout string
LabeledNamespace string
HasAnnotation bool
NeededAppendingAnnotation string
}

func (e *PODEvent) UpdateAnnotation() {
if e.NeededAppendingAnnotation == "" {
delete(e.Pod.Annotations, automaticTaggedAnnotationKey)
} else {
e.Pod.Annotations[automaticTaggedAnnotationKey] = e.NeededAppendingAnnotation
}
}

func (e *PODEvent) ParseAnnotation() {
Expand Down Expand Up @@ -137,10 +147,17 @@ func handlePodModify(pod *corev1.Pod, status PODStatus) {
}

func updatePod(e *PODEvent) {
_, err := k8sClient.CoreV1().Pods(e.Pod.Namespace).Update(e.Pod)
newPod, err := k8sClient.CoreV1().Pods(e.Pod.Namespace).Get(e.Pod.Name, meta_v1.GetOptions{})
if err != nil {
log.Errorf("Cannot update POD: %s to newest status, error: %s", e.Pod.Name, err.Error())
return
}
e.Pod = newPod
e.UpdateAnnotation()
_, err = k8sClient.CoreV1().Pods(e.Pod.Namespace).Update(e.Pod)
if err != nil {
log.Errorf("Failed to update POD's annotation to the remote Kubernetes cluster, error: %s", err.Error())
} else {
log.Infof("Successfully updated POD's annotation (%s) by automatic Prometheus metrics disconvery.", e.Pod.Namespace)
log.Infof("Successfully updated POD's annotation (%s) by automatic Prometheus metrics disconvery.", e.Pod.Name)
}
}
18 changes: 10 additions & 8 deletions pod_event_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@ var (
prometheusOutputChan chan *PrometheusData
monitoringPods map[types.UID]*PODMetricsMonitor
lock *sync.Mutex
parser *expfmt.TextParser
fetchSucceedCounter = prometheus.NewCounter(prometheus.CounterOpts{Name: "fetch_prometheus_metrics_succeed_count_total", Help: "Total count of successful fetch the remote Prometheus metric endpoints."})
fetchFailedCounter = prometheus.NewCounter(prometheus.CounterOpts{Name: "fetch_prometheus_metrics_failed_count_total", Help: "Total count of failed fetching the remote Prometheus metric endpoints."})
)
Expand Down Expand Up @@ -75,8 +74,11 @@ func (m *PODMetricsMonitor) Start() {
}

func doFetch(m *PODMetricsMonitor) {
url := fmt.Sprintf("http://%s%s", m.Event.Pod.Status.PodIP, m.Event.Endpoints)
log.Debugf("%#v", m.Event.Pod.Status)
log.Debugf("Preparing to fetch metrics URL: %s, POD IP: %s", url, m.Event.Pod.Status.PodIP)
//m.Event.Endpoints support ONLY one address by now.
req, err := http.NewRequest("GET", fmt.Sprintf("http://%s%s", m.Event.Pod.Status.PodIP, m.Event.Endpoints), nil)
req, err := http.NewRequest("GET", url, nil)
if err != nil {
fetchFailedCounter.Inc()
log.Errorf("[Fetching Metric] Failed to fetch POD's metric, error: %s", err.Error())
Expand Down Expand Up @@ -116,7 +118,6 @@ func initKubernetesPODEventProcessor(eventChan chan *PODEvent) chan *PrometheusD
prometheusOutputChan = make(chan *PrometheusData, args.PrometheusDataSyncBufferSize)
}
lock = &sync.Mutex{}
parser = &expfmt.TextParser{}
monitoringPods = make(map[types.UID]*PODMetricsMonitor)
go readPodEvents(eventChan)
return prometheusOutputChan
Expand Down Expand Up @@ -159,6 +160,10 @@ func processPodEvent(e *PODEvent) {
}
} else {
if e.Status == POD_ADD || (e.Status == POD_UPDATE && e.HasAnnotation) {
if e.Pod.Status.PodIP == "" {
log.Debugf("Ignored POD \"%s\" without any IP.", e.Pod.Name)
return
}
ctx, cancel := context.WithCancel(context.Background())
pmm := &PODMetricsMonitor{Event: *e, Ctx: ctx, Cancel: cancel}
monitoringPods[e.Pod.UID] = pmm
Expand Down Expand Up @@ -211,6 +216,7 @@ func sendMessage(e *PODEvent, data []byte, needDelete bool) {

func needUpdateAnnotation(e *PODEvent, obj *PrometheusData) (bool, error) {
var err error
parser := expfmt.TextParser{}
metrics, err := parser.TextToMetricFamilies(bytes.NewReader(obj.RspData))
if err != nil {
return false, err
Expand All @@ -225,11 +231,7 @@ func needUpdateAnnotation(e *PODEvent, obj *PrometheusData) (bool, error) {
oldAnnotatedStr := e.Pod.Annotations[automaticTaggedAnnotationKey]
newAnnotatedStr := sb.String()
if oldAnnotatedStr != newAnnotatedStr {
if newAnnotatedStr == "" {
delete(e.Pod.Annotations, automaticTaggedAnnotationKey)
} else {
e.Pod.Annotations[automaticTaggedAnnotationKey] = newAnnotatedStr
}
e.NeededAppendingAnnotation = newAnnotatedStr
return true, nil
}
return false, nil
Expand Down

0 comments on commit 1efb546

Please sign in to comment.