Skip to content

Commit

Permalink
Merge pull request #3 from junotx/master
Browse files Browse the repository at this point in the history
fix bug with excessive cpu usage
  • Loading branch information
Benjamin Huo authored Jul 23, 2020
2 parents 93ef314 + 445a2dc commit 6fa66c6
Show file tree
Hide file tree
Showing 6 changed files with 106 additions and 60 deletions.
9 changes: 6 additions & 3 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -62,20 +62,23 @@ $(KE_DOCGEN_BINARY): cmd/docgen/kube-events-docgen.go

image-push: image
docker push $(REPO_OPERATOR):$(TAG)
docker push $(REPO_OPERATOR):latest
docker push $(REPO_EXPORTER):$(TAG)
docker push $(REPO_EXPORTER):latest
docker push $(REPO_RULER):$(TAG)
docker push $(REPO_RULER):latest

.PHONY: image
image: operator-image exporter-image ruler-image

operator-image: cmd/operator/Dockerfile
docker build -t $(REPO_OPERATOR):$(TAG) -f cmd/operator/Dockerfile .
docker build -t $(REPO_OPERATOR):$(TAG) -t $(REPO_OPERATOR):latest -f cmd/operator/Dockerfile .

exporter-image: cmd/exporter/Dockerfile
docker build -t $(REPO_EXPORTER):$(TAG) -f cmd/exporter/Dockerfile .
docker build -t $(REPO_EXPORTER):$(TAG) -t $(REPO_EXPORTER):latest -f cmd/exporter/Dockerfile .

ruler-image: cmd/ruler/Dockerfile
docker build -t $(REPO_RULER):$(TAG) -f cmd/ruler/Dockerfile .
docker build -t $(REPO_RULER):$(TAG) -t $(REPO_RULER):latest -f cmd/ruler/Dockerfile .


ca-secret:
Expand Down
7 changes: 6 additions & 1 deletion cmd/exporter/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,12 @@ WORKDIR /workspace
COPY . .
RUN CGO_ENABLED=0 GO111MODULE=on go build -a -o ./bin/kube-events-exporter ./cmd/exporter/main.go

FROM alpine
FROM alpine:3.9
WORKDIR /
COPY --from=builder /workspace/bin/kube-events-exporter /usr/local/bin/kube-events-exporter

RUN adduser -D -g eventer -u 1002 eventer && \
chown -R eventer:eventer /usr/local/bin/kube-events-exporter
USER eventer

ENTRYPOINT ["kube-events-exporter"]
7 changes: 6 additions & 1 deletion cmd/operator/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,12 @@ WORKDIR /workspace
COPY . .
RUN CGO_ENABLED=0 GO111MODULE=on go build -a -o ./bin/kube-events-operator ./cmd/operator/main.go

FROM alpine
FROM alpine:3.9
WORKDIR /
COPY --from=builder /workspace/bin/kube-events-operator /usr/local/bin/kube-events-operator

RUN adduser -D -g eventer -u 1002 eventer && \
chown -R eventer:eventer /usr/local/bin/kube-events-operator
USER eventer

ENTRYPOINT ["kube-events-operator"]
7 changes: 6 additions & 1 deletion cmd/ruler/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,12 @@ WORKDIR /workspace
COPY . .
RUN CGO_ENABLED=0 GO111MODULE=on go build -a -o ./bin/kube-events-ruler ./cmd/ruler/main.go

FROM alpine
FROM alpine:3.9
WORKDIR /
COPY --from=builder /workspace/bin/kube-events-ruler /usr/local/bin/kube-events-ruler

RUN adduser -D -g eventer -u 1002 eventer && \
chown -R eventer:eventer /usr/local/bin/kube-events-ruler
USER eventer

ENTRYPOINT ["kube-events-ruler"]
36 changes: 22 additions & 14 deletions pkg/exporter/kube_events_exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ import (
"k8s.io/klog"
)

var maxBatchSize = 500

type K8sEventSource struct {
workqueue workqueue.RateLimitingInterface
inf cache.SharedIndexInformer
Expand Down Expand Up @@ -76,20 +78,26 @@ func (s *K8sEventSource) waitForCacheSync(stopc <-chan struct{}) error {
return nil
}

func (s *K8sEventSource) drainEvents() ([]*corev1.Event, bool) {
l := s.workqueue.Len()
if l <= 0 {
return nil, s.workqueue.ShuttingDown()
func (s *K8sEventSource) drainEvents() (evts []*corev1.Event, shutdown bool) {
var (
i = 0
m = s.workqueue.Len()
)
if m > maxBatchSize {
m = maxBatchSize
}
var evts []*corev1.Event
for i := 0; i < l; i++ {
obj, sd := s.workqueue.Get()
if sd {
return evts, sd
for {
var obj interface{}
obj, shutdown = s.workqueue.Get()
if obj != nil {
evts = append(evts, obj.(*corev1.Event))
}
i++
if i >= m {
break
}
evts = append(evts, obj.(*corev1.Event))
}
return evts, false
return
}

func (s *K8sEventSource) sinkEvents(ctx context.Context) {
Expand All @@ -100,10 +108,10 @@ func (s *K8sEventSource) sinkEvents(ctx context.Context) {
default:
}
evts, shutdown := s.drainEvents()
if shutdown {
return
}
if len(evts) == 0 {
if shutdown {
return
}
continue
}

Expand Down
100 changes: 60 additions & 40 deletions pkg/ruler/kube_events_ruler.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@ import (
"k8s.io/klog"
)

var maxBatchSize = 500

type KubeEventsRuler struct {
kcfg *rest.Config

Expand Down Expand Up @@ -109,52 +111,69 @@ func (r *KubeEventsRuler) AddEvents(evts []*types.Event) {
}

func (r *KubeEventsRuler) drainEvents() (evts []*types.Event, shutdown bool) {
l := r.evtQueue.Len()
if l <= 0 {
shutdown = r.evtQueue.ShuttingDown()
return
var (
i = 0
m = r.evtQueue.Len()
)
if m > maxBatchSize {
m = maxBatchSize
}
for i := 0; i < l; i++ {
obj, sd := r.evtQueue.Get()
if sd {
shutdown = sd
for {
var obj interface{}
obj, shutdown = r.evtQueue.Get()
if obj != nil {
evts = append(evts, obj.(*types.Event))
}
i++
if i >= m {
break
}
evts = append(evts, obj.(*types.Event))
}
return
}

func (r *KubeEventsRuler) drainNotifications() ([]*types.EventNotification, bool) {
l := r.notificaQueue.Len()
if l <= 0 {
return nil, r.notificaQueue.ShuttingDown()
func (r *KubeEventsRuler) drainNotifications() (evtNotifications []*types.EventNotification, shutdown bool) {
var (
i = 0
m = r.notificaQueue.Len()
)
if m > maxBatchSize {
m = maxBatchSize
}
var evtNotifications []*types.EventNotification
for i := 0; i < l; i++ {
obj, sd := r.notificaQueue.Get()
if sd {
return evtNotifications, sd
for {
var obj interface{}
obj, shutdown = r.notificaQueue.Get()
if obj != nil {
evtNotifications = append(evtNotifications, obj.(*types.EventNotification))
}
i++
if i >= m {
break
}
evtNotifications = append(evtNotifications, obj.(*types.EventNotification))
}
return evtNotifications, false
return
}

func (r *KubeEventsRuler) drainAlerts() ([]*types.EventAlert, bool) {
l := r.alertQueue.Len()
if l <= 0 {
return nil, r.alertQueue.ShuttingDown()
func (r *KubeEventsRuler) drainAlerts() (evtAlerts []*types.EventAlert, shutdown bool) {
var (
i = 0
m = r.alertQueue.Len()
)
if m > maxBatchSize {
m = maxBatchSize
}
var evtAlerts []*types.EventAlert
for i := 0; i < l; i++ {
obj, sd := r.alertQueue.Get()
if sd {
return evtAlerts, sd
for {
var obj interface{}
obj, shutdown = r.alertQueue.Get()
if obj != nil {
evtAlerts = append(evtAlerts, obj.(*types.EventAlert))
}
i++
if i >= m {
break
}
evtAlerts = append(evtAlerts, obj.(*types.EventAlert))
}
return evtAlerts, false
return
}

func (r *KubeEventsRuler) evalEvents(ctx context.Context) {
Expand All @@ -165,10 +184,10 @@ func (r *KubeEventsRuler) evalEvents(ctx context.Context) {
default:
}
evts, shutdown := r.drainEvents()
if shutdown {
return
}
if len(evts) == 0 {
if shutdown {
return
}
continue
}
rc := r.getRulerConds()
Expand Down Expand Up @@ -218,10 +237,10 @@ func (r *KubeEventsRuler) sinkNotifications(ctx context.Context) {
default:
}
notificas, shutdown := r.drainNotifications()
if shutdown {
return
}
if len(notificas) == 0 {
if shutdown {
return
}
continue
}

Expand Down Expand Up @@ -256,12 +275,13 @@ func (r *KubeEventsRuler) sinkAlerts(ctx context.Context) {
default:
}
alerts, shutdown := r.drainAlerts()
if shutdown {
return
}
if len(alerts) == 0 {
if shutdown {
return
}
continue
}

func() {
postFunc := r.alertQueue.Forget
defer func() {
Expand Down

0 comments on commit 6fa66c6

Please sign in to comment.