Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feat : Added queue handler #37

Merged
merged 15 commits into from
Jan 26, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 6 additions & 2 deletions .env.example
Original file line number Diff line number Diff line change
@@ -1,2 +1,6 @@
PORT = 8999
DISCORD_PUBLIC_KEY = "<DISCORD_PUBLIC_KEY>"
PORT = "<PORT>" # Default :8999
DISCORD_PUBLIC_KEY = "<DISCORD_PUBLIC_KEY>"
GUILD_ID = "<DISCORD_GUILD_ID>"
BOT_TOKEN = "<BOT_TOKEN>"
DISCORD_QUEUE = "<DISCORD_QUEUE>" # Default :DISCORD_QUEUE
QUEUE_URL = "<QUEUE_URL>" # Default :amqp://localhost
26 changes: 25 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,31 @@ Before running the project, ensure that you have the following installed:
To install Air, follow the installation steps here:
[Air Installation Guide](https://github.com/air-verse/air)

## Running RabbitMQ with Docker

1. Ensure Docker is installed and running on your machine.
2. Navigate to the project directory.
3. Create a `docker-compose.yml` file with the following content:

```yaml
version: '3.8'

services:
rabbitmq:
image: rabbitmq:3.13-management
container_name: rabbitmq
ports:
- '5672:5672'
- '15672:15672'
```

4. Start the RabbitMQ container:

```sh
docker-compose up -d
```

5. Verify that RabbitMQ is running by accessing the management interface at [http://localhost:15672](http://localhost:15672). The default username and password are both `guest`.

## Running the Project Using Go

Expand Down Expand Up @@ -62,7 +87,6 @@ Before running the project, ensure that you have the following installed:
air
```


## Running the Project Using Make

You can run the project using the `Makefile`, which provides several commands for various tasks. Below are the steps to run the project:
Expand Down
6 changes: 6 additions & 0 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,9 @@ type Config struct {
DISCORD_PUBLIC_KEY string
GUILD_ID string
BOT_TOKEN string
QUEUE_URL string
QUEUE_NAME string
MAX_RETRIES int
}

var AppConfig Config
Expand All @@ -26,9 +29,12 @@ func init() {

AppConfig = Config{
Port: loadEnv("PORT"),
QUEUE_URL: loadEnv("QUEUE_URL"),
DISCORD_PUBLIC_KEY: loadEnv("DISCORD_PUBLIC_KEY"),
GUILD_ID: loadEnv("GUILD_ID"),
BOT_TOKEN: loadEnv("BOT_TOKEN"),
QUEUE_NAME: loadEnv("QUEUE_NAME"),
MAX_RETRIES: 5,
}
}

Expand Down
21 changes: 21 additions & 0 deletions controllers/queueHandler.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
package controllers

import (
"io"
"net/http"

"github.com/julienschmidt/httprouter"
"github.com/sirupsen/logrus"
)

func QueueHandler(response http.ResponseWriter, request *http.Request, params httprouter.Params) {
body, err := io.ReadAll(request.Body)
if err != nil {
http.Error(response, "Failed to read request body", http.StatusInternalServerError)
return
}
logrus.Infof("QueueHandler: %s\n", string(body))
response.Header().Set("Content-Type", "application/json")
response.WriteHeader(http.StatusOK)

}
41 changes: 41 additions & 0 deletions controllers/queueHandler_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
package controllers_test

import (
"bytes"
"errors"
"net/http"
"net/http/httptest"
"testing"

"github.com/Real-Dev-Squad/discord-service/controllers"
"github.com/julienschmidt/httprouter"
"github.com/stretchr/testify/assert"
)

type errorReader struct{}

func (e *errorReader) Read(p []byte) (n int, err error) {
return 0, errors.New("simulated read error")
}
func TestQueueHandler(t *testing.T) {

router := httprouter.New()
router.POST("/queue", controllers.QueueHandler)
t.Run("should return 200 OK and log the request body", func(t *testing.T) {
body := []byte(`{"message": "test message"}`)
req, err := http.NewRequest("POST", "/queue", bytes.NewBuffer(body))
assert.NoError(t, err)
rr := httptest.NewRecorder()
router.ServeHTTP(rr, req)
assert.Equal(t, http.StatusOK, rr.Code)
assert.Equal(t, "application/json", rr.Header().Get("Content-Type"))
})

t.Run("should return 500 Internal Server Error if payload is unable to be decoded", func(t *testing.T) {
req, err := http.NewRequest("POST", "/queue", &errorReader{})
assert.NoError(t, err)
rr := httptest.NewRecorder()
router.ServeHTTP(rr, req)
assert.Equal(t, http.StatusInternalServerError, rr.Code)
})
}
6 changes: 6 additions & 0 deletions dtos/queue.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
package dtos

type TextMessage struct {
Text string
Priority uint8
}
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ require (
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/gorilla/websocket v1.5.3 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/rabbitmq/amqp091-go v1.10.0 // indirect
golang.org/x/crypto v0.28.0 // indirect
golang.org/x/sys v0.26.0 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@ github.com/julienschmidt/httprouter v1.3.0 h1:U0609e9tgbseu3rBINet9P48AI/D3oJs4d
github.com/julienschmidt/httprouter v1.3.0/go.mod h1:JR6WtHb+2LUe8TCKY3cZOxFyyO8IZAc4RVcycCCAKdM=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/rabbitmq/amqp091-go v1.10.0 h1:STpn5XsHlHGcecLmMFCtg7mqq0RnD+zFr4uzukfVhBw=
github.com/rabbitmq/amqp091-go v1.10.0/go.mod h1:Hy4jKW5kQART1u+JkDTF9YYOQUHXqMuhrgxOEeS7G4o=
github.com/rs/cors v1.11.1 h1:eU3gRzXLRK57F5rKMGMZURNdIG4EoAmX8k94r9wXWHA=
github.com/rs/cors v1.11.1/go.mod h1:XyqrcTp5zjWr1wsJ8PIRZssZ8b/WMcMf71DJnit4EMU=
github.com/sirupsen/logrus v1.9.3 h1:dueUQJ1C2q9oE3F7wvmSGAaVtTmUizReu6fjN8uqzbQ=
Expand Down
2 changes: 2 additions & 0 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,13 @@ package main

import (
config "github.com/Real-Dev-Squad/discord-service/config"
queue "github.com/Real-Dev-Squad/discord-service/queue"
"github.com/Real-Dev-Squad/discord-service/routes"
"github.com/sirupsen/logrus"
)

func main() {
logrus.Info("Starting server on port " + config.AppConfig.Port)
queue.GetQueueInstance()
routes.Listen(":" + config.AppConfig.Port)
}
114 changes: 114 additions & 0 deletions queue/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,114 @@
package queue

import (
"sync"

"github.com/Real-Dev-Squad/discord-service/config"
"github.com/Real-Dev-Squad/discord-service/dtos"
"github.com/Real-Dev-Squad/discord-service/utils"
amqp "github.com/rabbitmq/amqp091-go"
"github.com/sirupsen/logrus"
)

var (
queueInstance *Queue
once sync.Once
)

type sessionInterface interface {
dial() error
createChannel() error
declareQueue() error
}

type Queue struct {
Connection *amqp.Connection
Queue amqp.Queue
Name string
Channel *amqp.Channel
}

func (q *Queue) dial() error {
var err error
q.Connection, err = amqp.Dial(config.AppConfig.QUEUE_URL)
return err
}

func (q *Queue) createChannel() error {
var err error
q.Channel, err = q.Connection.Channel()
return err
}

func (q *Queue) declareQueue() error {
var err error
q.Queue, err = q.Channel.QueueDeclare(
config.AppConfig.QUEUE_NAME, // name
true, // durable
false, // delete when unused
false, // exclusive
false, // no-wait
amqp.Table{"x-max-priority": 2}, // arguments
)
return err
}

func InitQueueConnection(openSession sessionInterface) {
var err error
f := func() error {
err = openSession.dial()
if err != nil {
return err
}
err = openSession.createChannel()
if err != nil {
return err
}
err = openSession.declareQueue()
return err
}

err = utils.ExponentialBackoffRetry(config.AppConfig.MAX_RETRIES, f)
if err != nil {
logrus.Errorf("Failed to initialize queue after %d attempts: %s", config.AppConfig.MAX_RETRIES, err)
return
}
logrus.Infof("Established a connection to RabbitMQ named %s", config.AppConfig.QUEUE_NAME)

}

func queueHandler() {
queueInstance = &Queue{}
InitQueueConnection(queueInstance)
}

var GetQueueInstance = func() *Queue {
once.Do(queueHandler)
return queueInstance
}

func SendMessage(message dtos.TextMessage) {
queue := GetQueueInstance()

if queue.Channel == nil {
logrus.Errorf("Queue channel is not initialized")
return
}

err := queue.Channel.Publish(
"", // default exchange
queue.Queue.Name, // use the actual queue name
false, // mandatory
false, // immediate
amqp.Publishing{
ContentType: "text/plain",
Body: []byte(message.Text),
Priority: message.Priority,
})

if err != nil {
logrus.Errorf("Failed to publish message: %v", err)
return
}
logrus.Info("Message sent successfully")
}
Loading
Loading