Skip to content

Commit

Permalink
Merge pull request #8 from metalbear-co/aviram/mbe-109-add-sqs-consumer
Browse files Browse the repository at this point in the history
add sqs publishing and consuming
  • Loading branch information
aviramha authored Sep 23, 2024
2 parents 5b1932e + b77cfda commit c29b744
Show file tree
Hide file tree
Showing 23 changed files with 5,041 additions and 73 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
ip-visit-sqs-consumer/ip-visit-sqs-consumer
14 changes: 14 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,3 +2,17 @@

This repository contains different microservices and Kubernetes manifests to deploy them.
Each microservice has it's own `app.yaml` that should contain all of it's dependencies (besides other microservices).


## SQS

To enable SQS:

1. Install mirrord Operator in cluster (with SQS splitting enabled)
2. `aws iam create-user --user-name SQSPlayground`
3. `aws iam create-access-key --user-name SQSPlayground` - save data to file
4. `aws sqs create-queue --queue-name IpCount` - take QueueUrl to be used in deployment.yaml
5. You need to edit `ip-visit-sqs-consumer/policy.json` and set REGION and ACCOUNT_ID
6. `aws iam create-policy --policy-name SQSPlaygroundPolicy --policy-document file://ip-visit-sqs-consumer/policy.json`
7. `aws iam attach-user-policy --policy-arn arn:aws:iam::526936346962:policy/SQSPlaygroundPolicy --user-name SQSPlayground`
8. Set Region in app.yaml in `ip-visit-counter` and `ip-visit-sqs-consumer`
2 changes: 1 addition & 1 deletion ingress.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ spec:
service:
name: ip-visit-frontend
port:
number: 80
number: 3000
rules:
- http:
paths:
Expand Down
1 change: 1 addition & 0 deletions ip-visit-consumer/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ COPY go.sum ./
RUN go mod download
COPY *.go ./

ARG TARGETARCH
RUN GOARCH=$TARGETARCH go build -o /main

FROM gcr.io/distroless/static-debian11
Expand Down
17 changes: 6 additions & 11 deletions ip-visit-consumer/go.mod
Original file line number Diff line number Diff line change
@@ -1,18 +1,16 @@
module example/ip-visit-consumer

go 1.20
go 1.23

require (
github.com/confluentinc/confluent-kafka-go/v2 v2.2.0
github.com/gin-gonic/gin v1.9.0
github.com/redis/go-redis/v9 v9.0.2
github.com/spf13/viper v1.15.0
)

require (
github.com/bytedance/sonic v1.8.0 // indirect
github.com/cespare/xxhash/v2 v2.2.0 // indirect
github.com/chenzhuoyu/base64x v0.0.0-20221115062448-fe3a3abad311 // indirect
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect
github.com/fsnotify/fsnotify v1.6.0 // indirect
github.com/gin-contrib/sse v0.1.0 // indirect
github.com/go-playground/locales v0.14.1 // indirect
Expand All @@ -21,7 +19,6 @@ require (
github.com/goccy/go-json v0.10.0 // indirect
github.com/hashicorp/hcl v1.0.0 // indirect
github.com/json-iterator/go v1.1.12 // indirect
github.com/klauspost/compress v1.15.9 // indirect
github.com/klauspost/cpuid/v2 v2.0.9 // indirect
github.com/leodido/go-urn v1.2.1 // indirect
github.com/magiconair/properties v1.8.7 // indirect
Expand All @@ -30,8 +27,6 @@ require (
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect
github.com/modern-go/reflect2 v1.0.2 // indirect
github.com/pelletier/go-toml/v2 v2.0.6 // indirect
github.com/pierrec/lz4/v4 v4.1.15 // indirect
github.com/segmentio/kafka-go v0.4.42 // indirect
github.com/spf13/afero v1.9.3 // indirect
github.com/spf13/cast v1.5.0 // indirect
github.com/spf13/jwalterweatherman v1.1.0 // indirect
Expand All @@ -41,10 +36,10 @@ require (
github.com/ugorji/go/codec v1.2.9 // indirect
golang.org/x/arch v0.0.0-20210923205945-b76863e36670 // indirect
golang.org/x/crypto v0.5.0 // indirect
golang.org/x/net v0.7.0 // indirect
golang.org/x/sys v0.5.0 // indirect
golang.org/x/text v0.7.0 // indirect
google.golang.org/protobuf v1.28.1 // indirect
golang.org/x/net v0.8.0 // indirect
golang.org/x/sys v0.6.0 // indirect
golang.org/x/text v0.8.0 // indirect
google.golang.org/protobuf v1.30.0 // indirect
gopkg.in/ini.v1 v1.67.0 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
)
1,996 changes: 1,977 additions & 19 deletions ip-visit-consumer/go.sum

Large diffs are not rendered by default.

57 changes: 29 additions & 28 deletions ip-visit-consumer/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,27 +3,15 @@ package main
import (
"context"
"fmt"
"log"
"net/http"
"time"

"github.com/segmentio/kafka-go"

"github.com/confluentinc/confluent-kafka-go/v2/kafka"
"github.com/gin-gonic/gin"
"github.com/spf13/viper"
)

var ctx = context.Background()
var KafkaWriter *kafka.Writer

// SetupKafka
// Initialize the Kafka Writer
func SetupKafka(address, topic string) {
KafkaWriter = &kafka.Writer{
Addr: kafka.TCP(address),
Topic: topic,
Balancer: &kafka.LeastBytes{},
}
}

// Config
// Struct that holds local service port, remote redis host and port
Expand Down Expand Up @@ -54,31 +42,44 @@ func loadConfig() Config {
}

func StartKafkaReader(address, topic, group string) {
r := kafka.NewReader(kafka.ReaderConfig{
Brokers: []string{address},
GroupID: group,
Topic: topic,
MaxBytes: 10e6, // 10MB

c, err := kafka.NewConsumer(&kafka.ConfigMap{
"bootstrap.servers": address,
"group.id": group,
"auto.offset.reset": "earliest",
})

for {
m, err := r.ReadMessage(context.Background())
if err != nil {
break
}
fmt.Printf("message at topic/partition/offset %v/%v/%v: %s = %s\n", m.Topic, m.Partition, m.Offset, string(m.Key), string(m.Value))
if err != nil {
panic(err)
}

if err := r.Close(); err != nil {
log.Fatal("failed to close reader:", err)
c.SubscribeTopics([]string{topic}, nil)

// A signal handler or similar could be used to set this to false to break the loop.
run := true

for run {
msg, err := c.ReadMessage(time.Second)
if err == nil {
if msg != nil {
fmt.Printf("Message on %s: %s\n", msg.TopicPartition, string(msg.Value))
}
} else if !err.(kafka.Error).IsTimeout() {
// The client will automatically try to recover from all errors.
// Timeout is not considered an error because it is raised by
// ReadMessage in absence of messages.
fmt.Printf("Consumer error: %v (%v)\n", err, msg)
}
}

c.Close()

}

func main() {
config := loadConfig()

go StartKafkaReader(config.KafkaAddress, config.KafkaTopic, config.KafkaConsumerGroup)

router := gin.Default()
router.GET("/health", func(ctx *gin.Context) { ctx.Status(http.StatusOK) })
fmt.Print("loaded")
Expand Down
5 changes: 3 additions & 2 deletions ip-visit-counter/Dockerfile
Original file line number Diff line number Diff line change
@@ -1,14 +1,15 @@
FROM --platform=$BUILDPLATFORM golang:1.20-alpine as build-env
FROM --platform=$BUILDPLATFORM golang:1.21-alpine AS build-env

WORKDIR /app
COPY go.mod ./
COPY go.sum ./
RUN go mod download
COPY *.go ./

ARG TARGETARCH
RUN GOARCH=$TARGETARCH go build -o /main

FROM gcr.io/distroless/static-debian11
FROM gcr.io/distroless/static-debian11

COPY response.txt /app/response.txt
COPY --from=build-env /main /main
Expand Down
16 changes: 15 additions & 1 deletion ip-visit-counter/app.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -61,4 +61,18 @@ spec:
- name: KAFKATOPIC
value: "ip-visit"
- name: IPINFOADDRESS
value: "http://ip-info"
value: "http://ip-info"
- name: SQSQUEUENAME
value: "IpCount"
- name: AWS_DEFAULT_REGION
value: "eu-north-1"
- name: AWS_ACCESS_KEY_ID
valueFrom:
secretKeyRef:
name: aws-credentials
key: AWS_ACCESS_KEY_ID
- name: AWS_SECRET_ACCESS_KEY
valueFrom:
secretKeyRef:
name: aws-credentials
key: AWS_SECRET_ACCESS_KEY
16 changes: 15 additions & 1 deletion ip-visit-counter/go.mod
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
module example/ip-visit-counter

go 1.20
go 1.21

require (
github.com/gin-gonic/gin v1.9.1
Expand All @@ -10,6 +10,20 @@ require (
)

require (
github.com/aws/aws-sdk-go-v2 v1.31.0 // indirect
github.com/aws/aws-sdk-go-v2/config v1.27.36 // indirect
github.com/aws/aws-sdk-go-v2/credentials v1.17.34 // indirect
github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.16.14 // indirect
github.com/aws/aws-sdk-go-v2/internal/configsources v1.3.18 // indirect
github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.6.18 // indirect
github.com/aws/aws-sdk-go-v2/internal/ini v1.8.1 // indirect
github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding v1.11.5 // indirect
github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.11.20 // indirect
github.com/aws/aws-sdk-go-v2/service/sqs v1.35.0 // indirect
github.com/aws/aws-sdk-go-v2/service/sso v1.23.0 // indirect
github.com/aws/aws-sdk-go-v2/service/ssooidc v1.27.0 // indirect
github.com/aws/aws-sdk-go-v2/service/sts v1.31.0 // indirect
github.com/aws/smithy-go v1.21.0 // indirect
github.com/bytedance/sonic v1.10.1 // indirect
github.com/cespare/xxhash/v2 v2.2.0 // indirect
github.com/chenzhuoyu/base64x v0.0.0-20230717121745-296ad89f973d // indirect
Expand Down
28 changes: 28 additions & 0 deletions ip-visit-counter/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,34 @@ cloud.google.com/go/storage v1.14.0/go.mod h1:GrKmX003DSIwi9o29oFT7YDnHYwZoctc3f
dmitri.shuralyov.com/gpu/mtl v0.0.0-20190408044501-666a987793e9/go.mod h1:H6x//7gZCb22OMCxBHrMx7a5I7Hp++hsVxbQ4BYO7hU=
github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU=
github.com/BurntSushi/xgb v0.0.0-20160522181843-27f122750802/go.mod h1:IVnqGOEym/WlBOVXweHU+Q+/VP0lqqI8lqeDx9IjBqo=
github.com/aws/aws-sdk-go-v2 v1.31.0 h1:3V05LbxTSItI5kUqNwhJrrrY1BAXxXt0sN0l72QmG5U=
github.com/aws/aws-sdk-go-v2 v1.31.0/go.mod h1:ztolYtaEUtdpf9Wftr31CJfLVjOnD/CVRkKOOYgF8hA=
github.com/aws/aws-sdk-go-v2/config v1.27.36 h1:4IlvHh6Olc7+61O1ktesh0jOcqmq/4WG6C2Aj5SKXy0=
github.com/aws/aws-sdk-go-v2/config v1.27.36/go.mod h1:IiBpC0HPAGq9Le0Xxb1wpAKzEfAQ3XlYgJLYKEVYcfw=
github.com/aws/aws-sdk-go-v2/credentials v1.17.34 h1:gmkk1l/cDGSowPRzkdxYi8edw+gN4HmVK151D/pqGNc=
github.com/aws/aws-sdk-go-v2/credentials v1.17.34/go.mod h1:4R9OEV3tgFMsok4ZeFpExn7zQaZRa9MRGFYnI/xC/vs=
github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.16.14 h1:C/d03NAmh8C4BZXhuRNboF/DqhBkBCeDiJDcaqIT5pA=
github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.16.14/go.mod h1:7I0Ju7p9mCIdlrfS+JCgqcYD0VXz/N4yozsox+0o078=
github.com/aws/aws-sdk-go-v2/internal/configsources v1.3.18 h1:kYQ3H1u0ANr9KEKlGs/jTLrBFPo8P8NaH/w7A01NeeM=
github.com/aws/aws-sdk-go-v2/internal/configsources v1.3.18/go.mod h1:r506HmK5JDUh9+Mw4CfGJGSSoqIiLCndAuqXuhbv67Y=
github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.6.18 h1:Z7IdFUONvTcvS7YuhtVxN99v2cCoHRXOS4mTr0B/pUc=
github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.6.18/go.mod h1:DkKMmksZVVyat+Y+r1dEOgJEfUeA7UngIHWeKsi0yNc=
github.com/aws/aws-sdk-go-v2/internal/ini v1.8.1 h1:VaRN3TlFdd6KxX1x3ILT5ynH6HvKgqdiXoTxAF4HQcQ=
github.com/aws/aws-sdk-go-v2/internal/ini v1.8.1/go.mod h1:FbtygfRFze9usAadmnGJNc8KsP346kEe+y2/oyhGAGc=
github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding v1.11.5 h1:QFASJGfT8wMXtuP3D5CRmMjARHv9ZmzFUMJznHDOY3w=
github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding v1.11.5/go.mod h1:QdZ3OmoIjSX+8D1OPAzPxDfjXASbBMDsz9qvtyIhtik=
github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.11.20 h1:Xbwbmk44URTiHNx6PNo0ujDE6ERlsCKJD3u1zfnzAPg=
github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.11.20/go.mod h1:oAfOFzUB14ltPZj1rWwRc3d/6OgD76R8KlvU3EqM9Fg=
github.com/aws/aws-sdk-go-v2/service/sqs v1.35.0 h1:lvWhMxvNP7A9Gf+c12mRVIb5xH3u7DOehm8pV0fnrTE=
github.com/aws/aws-sdk-go-v2/service/sqs v1.35.0/go.mod h1:WuGxWQhu2LXoPGA2HBIbotpwhM6T4hAz0Ip/HjdxfJg=
github.com/aws/aws-sdk-go-v2/service/sso v1.23.0 h1:fHySkG0IGj2nepgGJPmmhZYL9ndnsq1Tvc6MeuVQCaQ=
github.com/aws/aws-sdk-go-v2/service/sso v1.23.0/go.mod h1:XRlMvmad0ZNL+75C5FYdMvbbLkd6qiqz6foR1nA1PXY=
github.com/aws/aws-sdk-go-v2/service/ssooidc v1.27.0 h1:cU/OeQPNReyMj1JEBgjE29aclYZYtXcsPMXbTkVGMFk=
github.com/aws/aws-sdk-go-v2/service/ssooidc v1.27.0/go.mod h1:FnvDM4sfa+isJ3kDXIzAB9GAwVSzFzSy97uZ3IsHo4E=
github.com/aws/aws-sdk-go-v2/service/sts v1.31.0 h1:GNVxIHBTi2EgwCxpNiozhNasMOK+ROUA2Z3X+cSBX58=
github.com/aws/aws-sdk-go-v2/service/sts v1.31.0/go.mod h1:yMWe0F+XG0DkRZK5ODZhG7BEFYhLXi2dqGsv6tX0cgI=
github.com/aws/smithy-go v1.21.0 h1:H7L8dtDRk0P1Qm6y0ji7MCYMQObJ5R9CRpyPhRUkLYA=
github.com/aws/smithy-go v1.21.0/go.mod h1:irrKGvNn1InZwb2d7fkIRNucdfwR8R+Ts3wxYa/cJHg=
github.com/bsm/ginkgo/v2 v2.5.0 h1:aOAnND1T40wEdAtkGSkvSICWeQ8L3UASX7YVCqQx+eQ=
github.com/bsm/gomega v1.20.0 h1:JhAwLmtRzXFTx2AkALSLa8ijZafntmhSoU63Ok18Uq8=
github.com/bytedance/sonic v1.5.0/go.mod h1:ED5hyg4y6t3/9Ku1R6dU/4KyJ48DZ4jPhfY1O2AihPM=
Expand Down
Loading

0 comments on commit c29b744

Please sign in to comment.