Skip to content
This repository has been archived by the owner on Mar 16, 2024. It is now read-only.

Commit

Permalink
fix: reduce unnecessary k8s caching (#9)
Browse files Browse the repository at this point in the history
Signed-off-by: Grant Linville <[email protected]>
  • Loading branch information
g-linville authored Jan 8, 2024
1 parent 84a8b06 commit 61ca0ed
Show file tree
Hide file tree
Showing 8 changed files with 13 additions and 368 deletions.
8 changes: 0 additions & 8 deletions aggregator/data.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,16 +126,8 @@ func (a *Aggregator) processk8s() {
a.processPod(d)
case k8s.SERVICE:
a.processSvc(d)
case k8s.REPLICASET:
a.processReplicaSet(d)
case k8s.DEPLOYMENT:
a.processDeployment(d)
case k8s.ENDPOINTS:
a.processEndpoints(d)
case k8s.CONTAINER:
a.processContainer(d)
case k8s.DAEMONSET:
a.processDaemonSet(d)
default:
log.Logger.Warn().Msgf("unknown resource type %s", d.ResourceType)
}
Expand Down
172 changes: 0 additions & 172 deletions aggregator/persist.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ import (
"github.com/ddosify/alaz/k8s"
"github.com/ddosify/alaz/log"

appsv1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
)

Expand Down Expand Up @@ -125,81 +124,6 @@ func (a *Aggregator) processSvc(d k8s.K8sResourceMessage) {
}
}

func (a *Aggregator) persistReplicaSet(dto datastore.ReplicaSet, eventType string) {
err := a.ds.PersistReplicaSet(dto, eventType)
if err != nil {
log.Logger.Error().Err(err).Msgf("error on persistReplicaset call to %s", eventType)
}
}

func (a *Aggregator) processReplicaSet(d k8s.K8sResourceMessage) {
replicaSet := d.Object.(*appsv1.ReplicaSet)

var ownerType, ownerID, ownerName string
if len(replicaSet.OwnerReferences) > 0 {
ownerType = replicaSet.OwnerReferences[0].Kind
ownerID = string(replicaSet.OwnerReferences[0].UID)
ownerName = replicaSet.OwnerReferences[0].Name
} else {
log.Logger.Debug().Msgf("ReplicaSet %s/%s has no owner, event: %s", replicaSet.Namespace, replicaSet.Name, d.EventType)
}

dtoReplicaSet := datastore.ReplicaSet{
UID: string(replicaSet.UID),
Name: ownerName,
Namespace: replicaSet.Namespace,
OwnerType: ownerType,
OwnerID: ownerID,
OwnerName: ownerName,
Replicas: replicaSet.Status.Replicas,
}

switch d.EventType {
case k8s.ADD:
go a.persistReplicaSet(dtoReplicaSet, ADD)
case k8s.UPDATE:
go a.persistReplicaSet(dtoReplicaSet, UPDATE)
case k8s.DELETE:
go a.persistReplicaSet(dtoReplicaSet, DELETE)
}

}

func (a *Aggregator) processDeployment(d k8s.K8sResourceMessage) {
deployment := d.Object.(*appsv1.Deployment)

dto := datastore.Deployment{
UID: string(deployment.UID),
Name: deployment.Name,
Namespace: deployment.Namespace,
Replicas: deployment.Status.Replicas,
}

switch d.EventType {
case k8s.ADD:
go func() {
err := a.ds.PersistDeployment(dto, ADD)
if err != nil {
log.Logger.Error().Err(err).Msgf("error on PersistDeployment call to %s, uid: %s", ADD, dto.UID)
}
}()
case k8s.UPDATE:
go func() {
err := a.ds.PersistDeployment(dto, UPDATE)
if err != nil {
log.Logger.Error().Err(err).Msgf("error on PersistDeployment call to %s, uid: %s", UPDATE, dto.UID)
}
}()
case k8s.DELETE:
go func() {
err := a.ds.PersistDeployment(dto, DELETE)
if err != nil {
log.Logger.Error().Err(err).Msgf("error on PersistDeployment call to %s, uid: %s", DELETE, dto.UID)
}
}()
}
}

func (a *Aggregator) processContainer(d k8s.K8sResourceMessage) {
c := d.Object.(*k8s.Container)

Expand Down Expand Up @@ -229,99 +153,3 @@ func (a *Aggregator) processContainer(d k8s.K8sResourceMessage) {
// No need for delete container
}
}

func (a *Aggregator) processEndpoints(ep k8s.K8sResourceMessage) {
endpoints := ep.Object.(*corev1.Endpoints)

// subsets
adrs := []datastore.Address{}

// subset[0].address -> ips
// subset[0].ports -> ports

for _, subset := range endpoints.Subsets {
ips := []datastore.AddressIP{}
ports := []datastore.AddressPort{}

for _, addr := range subset.Addresses {
// Probably external IP
if addr.TargetRef == nil {
ips = append(ips, datastore.AddressIP{
IP: addr.IP,
})
continue
}

// TargetRef: Pod probably
ips = append(ips, datastore.AddressIP{
Type: string(addr.TargetRef.Kind),
ID: string(addr.TargetRef.UID),
Name: addr.TargetRef.Name,
Namespace: addr.TargetRef.Namespace,
IP: addr.IP,
})
}

for _, port := range subset.Ports {
ports = append(ports, datastore.AddressPort{
Port: port.Port,
Protocol: string(port.Protocol),
})
}

adrs = append(adrs, datastore.Address{
IPs: ips,
Ports: ports,
})
}

dto := datastore.Endpoints{
UID: string(endpoints.UID),
Name: endpoints.Name,
Namespace: endpoints.Namespace,
Addresses: adrs,
}

switch ep.EventType {
case k8s.ADD:
go func() {
err := a.ds.PersistEndpoints(dto, ADD)
if err != nil {
log.Logger.Error().Err(err).Msgf("error on PersistEndpoints call to %s, uid: %s", ADD, dto.UID)
}
}()
case k8s.UPDATE:
go func() {
err := a.ds.PersistEndpoints(dto, UPDATE)
if err != nil {
log.Logger.Error().Err(err).Msgf("error on PersistEndpoints call to %s, uid: %s", UPDATE, dto.UID)
}
}()
case k8s.DELETE:
go func() {
err := a.ds.PersistEndpoints(dto, DELETE)
if err != nil {
log.Logger.Error().Err(err).Msgf("error on PersistEndpoints call to %s, uid: %s", DELETE, dto.UID)
}
}()
}
}

func (a *Aggregator) processDaemonSet(d k8s.K8sResourceMessage) {
daemonSet := d.Object.(*appsv1.DaemonSet)

dtoDaemonSet := datastore.DaemonSet{
UID: string(daemonSet.UID),
Name: daemonSet.Name,
Namespace: daemonSet.Namespace,
}

switch d.EventType {
case k8s.ADD:
go a.ds.PersistDaemonSet(dtoDaemonSet, ADD)
case k8s.UPDATE:
go a.ds.PersistDaemonSet(dtoDaemonSet, UPDATE)
case k8s.DELETE:
go a.ds.PersistDaemonSet(dtoDaemonSet, DELETE)
}
}
18 changes: 8 additions & 10 deletions ebpf/throughput/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,16 +90,14 @@ func DeployAndWait(ctx context.Context, ch chan interface{}, eventChan <-chan in

bpfEvent := (*ThroughputEventBpf)(unsafe.Pointer(&record.RawSample[0]))

go func() {
ch <- ThroughputEvent{
Timestamp: bpfEvent.Timestamp,
Size: bpfEvent.Size,
SPort: bpfEvent.SPort,
DPort: bpfEvent.DPort,
SAddr: fmt.Sprintf("%d.%d.%d.%d", bpfEvent.SAddr[0], bpfEvent.SAddr[1], bpfEvent.SAddr[2], bpfEvent.SAddr[3]),
DAddr: fmt.Sprintf("%d.%d.%d.%d", bpfEvent.DAddr[0], bpfEvent.DAddr[1], bpfEvent.DAddr[2], bpfEvent.DAddr[3]),
}
}()
ch <- ThroughputEvent{
Timestamp: bpfEvent.Timestamp,
Size: bpfEvent.Size,
SPort: bpfEvent.SPort,
DPort: bpfEvent.DPort,
SAddr: fmt.Sprintf("%d.%d.%d.%d", bpfEvent.SAddr[0], bpfEvent.SAddr[1], bpfEvent.SAddr[2], bpfEvent.SAddr[3]),
DAddr: fmt.Sprintf("%d.%d.%d.%d", bpfEvent.DAddr[0], bpfEvent.DAddr[1], bpfEvent.DAddr[2], bpfEvent.DAddr[3]),
}
}

for {
Expand Down
31 changes: 0 additions & 31 deletions k8s/daemonset.go

This file was deleted.

31 changes: 0 additions & 31 deletions k8s/deployment.go

This file was deleted.

31 changes: 0 additions & 31 deletions k8s/endpoints.go

This file was deleted.

Loading

0 comments on commit 61ca0ed

Please sign in to comment.