diff --git a/.env.example b/.env.example index 0a185e0..7a6f196 100644 --- a/.env.example +++ b/.env.example @@ -1,2 +1,6 @@ -PORT = 8999 -DISCORD_PUBLIC_KEY = "" \ No newline at end of file +PORT = "" # Default :8999 +DISCORD_PUBLIC_KEY = "" +GUILD_ID = "" +BOT_TOKEN = "" +DISCORD_QUEUE = "" # Default :DISCORD_QUEUE +QUEUE_URL = "" # Default :amqp://localhost diff --git a/README.md b/README.md index 4d21c96..7f98d8f 100644 --- a/README.md +++ b/README.md @@ -34,6 +34,31 @@ Before running the project, ensure that you have the following installed: To install Air, follow the installation steps here: [Air Installation Guide](https://github.com/air-verse/air) +## Running RabbitMQ with Docker + +1. Ensure Docker is installed and running on your machine. +2. Navigate to the project directory. +3. Create a `docker-compose.yml` file with the following content: + + ```yaml + version: '3.8' + + services: + rabbitmq: + image: rabbitmq:3.13-management + container_name: rabbitmq + ports: + - '5672:5672' + - '15672:15672' + ``` + +4. Start the RabbitMQ container: + + ```sh + docker-compose up -d + ``` + +5. Verify that RabbitMQ is running by accessing the management interface at [http://localhost:15672](http://localhost:15672). The default username and password are both `guest`. ## Running the Project Using Go @@ -62,7 +87,6 @@ Before running the project, ensure that you have the following installed: air ``` - ## Running the Project Using Make You can run the project using the `Makefile`, which provides several commands for various tasks. Below are the steps to run the project: diff --git a/config/config.go b/config/config.go index da0a0ab..625a8c9 100644 --- a/config/config.go +++ b/config/config.go @@ -13,6 +13,9 @@ type Config struct { DISCORD_PUBLIC_KEY string GUILD_ID string BOT_TOKEN string + QUEUE_URL string + QUEUE_NAME string + MAX_RETRIES int } var AppConfig Config @@ -26,9 +29,12 @@ func init() { AppConfig = Config{ Port: loadEnv("PORT"), + QUEUE_URL: loadEnv("QUEUE_URL"), DISCORD_PUBLIC_KEY: loadEnv("DISCORD_PUBLIC_KEY"), GUILD_ID: loadEnv("GUILD_ID"), BOT_TOKEN: loadEnv("BOT_TOKEN"), + QUEUE_NAME: loadEnv("QUEUE_NAME"), + MAX_RETRIES: 5, } } diff --git a/controllers/queueHandler.go b/controllers/queueHandler.go new file mode 100644 index 0000000..f6a1c1b --- /dev/null +++ b/controllers/queueHandler.go @@ -0,0 +1,21 @@ +package controllers + +import ( + "io" + "net/http" + + "github.com/julienschmidt/httprouter" + "github.com/sirupsen/logrus" +) + +func QueueHandler(response http.ResponseWriter, request *http.Request, params httprouter.Params) { + body, err := io.ReadAll(request.Body) + if err != nil { + http.Error(response, "Failed to read request body", http.StatusInternalServerError) + return + } + logrus.Infof("QueueHandler: %s\n", string(body)) + response.Header().Set("Content-Type", "application/json") + response.WriteHeader(http.StatusOK) + +} diff --git a/controllers/queueHandler_test.go b/controllers/queueHandler_test.go new file mode 100644 index 0000000..cefc712 --- /dev/null +++ b/controllers/queueHandler_test.go @@ -0,0 +1,41 @@ +package controllers_test + +import ( + "bytes" + "errors" + "net/http" + "net/http/httptest" + "testing" + + "github.com/Real-Dev-Squad/discord-service/controllers" + "github.com/julienschmidt/httprouter" + "github.com/stretchr/testify/assert" +) + +type errorReader struct{} + +func (e *errorReader) Read(p []byte) (n int, err error) { + return 0, errors.New("simulated read error") +} +func TestQueueHandler(t *testing.T) { + + router := httprouter.New() + router.POST("/queue", controllers.QueueHandler) + t.Run("should return 200 OK and log the request body", func(t *testing.T) { + body := []byte(`{"message": "test message"}`) + req, err := http.NewRequest("POST", "/queue", bytes.NewBuffer(body)) + assert.NoError(t, err) + rr := httptest.NewRecorder() + router.ServeHTTP(rr, req) + assert.Equal(t, http.StatusOK, rr.Code) + assert.Equal(t, "application/json", rr.Header().Get("Content-Type")) + }) + + t.Run("should return 500 Internal Server Error if payload is unable to be decoded", func(t *testing.T) { + req, err := http.NewRequest("POST", "/queue", &errorReader{}) + assert.NoError(t, err) + rr := httptest.NewRecorder() + router.ServeHTTP(rr, req) + assert.Equal(t, http.StatusInternalServerError, rr.Code) + }) +} diff --git a/dtos/queue.go b/dtos/queue.go new file mode 100644 index 0000000..c3e9cac --- /dev/null +++ b/dtos/queue.go @@ -0,0 +1,6 @@ +package dtos + +type TextMessage struct { + Text string + Priority uint8 +} diff --git a/go.mod b/go.mod index 2ee5740..e98308c 100644 --- a/go.mod +++ b/go.mod @@ -15,6 +15,7 @@ require ( github.com/davecgh/go-spew v1.1.1 // indirect github.com/gorilla/websocket v1.5.3 // indirect github.com/pmezard/go-difflib v1.0.0 // indirect + github.com/rabbitmq/amqp091-go v1.10.0 // indirect golang.org/x/crypto v0.28.0 // indirect golang.org/x/sys v0.26.0 // indirect gopkg.in/yaml.v3 v3.0.1 // indirect diff --git a/go.sum b/go.sum index 937bf1a..0971de0 100644 --- a/go.sum +++ b/go.sum @@ -12,6 +12,8 @@ github.com/julienschmidt/httprouter v1.3.0 h1:U0609e9tgbseu3rBINet9P48AI/D3oJs4d github.com/julienschmidt/httprouter v1.3.0/go.mod h1:JR6WtHb+2LUe8TCKY3cZOxFyyO8IZAc4RVcycCCAKdM= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/rabbitmq/amqp091-go v1.10.0 h1:STpn5XsHlHGcecLmMFCtg7mqq0RnD+zFr4uzukfVhBw= +github.com/rabbitmq/amqp091-go v1.10.0/go.mod h1:Hy4jKW5kQART1u+JkDTF9YYOQUHXqMuhrgxOEeS7G4o= github.com/rs/cors v1.11.1 h1:eU3gRzXLRK57F5rKMGMZURNdIG4EoAmX8k94r9wXWHA= github.com/rs/cors v1.11.1/go.mod h1:XyqrcTp5zjWr1wsJ8PIRZssZ8b/WMcMf71DJnit4EMU= github.com/sirupsen/logrus v1.9.3 h1:dueUQJ1C2q9oE3F7wvmSGAaVtTmUizReu6fjN8uqzbQ= diff --git a/main.go b/main.go index 815fe0e..bd2250f 100644 --- a/main.go +++ b/main.go @@ -2,11 +2,13 @@ package main import ( config "github.com/Real-Dev-Squad/discord-service/config" + queue "github.com/Real-Dev-Squad/discord-service/queue" "github.com/Real-Dev-Squad/discord-service/routes" "github.com/sirupsen/logrus" ) func main() { logrus.Info("Starting server on port " + config.AppConfig.Port) + queue.GetQueueInstance() routes.Listen(":" + config.AppConfig.Port) } diff --git a/queue/main.go b/queue/main.go new file mode 100644 index 0000000..6969105 --- /dev/null +++ b/queue/main.go @@ -0,0 +1,114 @@ +package queue + +import ( + "sync" + + "github.com/Real-Dev-Squad/discord-service/config" + "github.com/Real-Dev-Squad/discord-service/dtos" + "github.com/Real-Dev-Squad/discord-service/utils" + amqp "github.com/rabbitmq/amqp091-go" + "github.com/sirupsen/logrus" +) + +var ( + queueInstance *Queue + once sync.Once +) + +type sessionInterface interface { + dial() error + createChannel() error + declareQueue() error +} + +type Queue struct { + Connection *amqp.Connection + Queue amqp.Queue + Name string + Channel *amqp.Channel +} + +func (q *Queue) dial() error { + var err error + q.Connection, err = amqp.Dial(config.AppConfig.QUEUE_URL) + return err +} + +func (q *Queue) createChannel() error { + var err error + q.Channel, err = q.Connection.Channel() + return err +} + +func (q *Queue) declareQueue() error { + var err error + q.Queue, err = q.Channel.QueueDeclare( + config.AppConfig.QUEUE_NAME, // name + true, // durable + false, // delete when unused + false, // exclusive + false, // no-wait + amqp.Table{"x-max-priority": 2}, // arguments + ) + return err +} + +func InitQueueConnection(openSession sessionInterface) { + var err error + f := func() error { + err = openSession.dial() + if err != nil { + return err + } + err = openSession.createChannel() + if err != nil { + return err + } + err = openSession.declareQueue() + return err + } + + err = utils.ExponentialBackoffRetry(config.AppConfig.MAX_RETRIES, f) + if err != nil { + logrus.Errorf("Failed to initialize queue after %d attempts: %s", config.AppConfig.MAX_RETRIES, err) + return + } + logrus.Infof("Established a connection to RabbitMQ named %s", config.AppConfig.QUEUE_NAME) + +} + +func queueHandler() { + queueInstance = &Queue{} + InitQueueConnection(queueInstance) +} + +var GetQueueInstance = func() *Queue { + once.Do(queueHandler) + return queueInstance +} + +func SendMessage(message dtos.TextMessage) { + queue := GetQueueInstance() + + if queue.Channel == nil { + logrus.Errorf("Queue channel is not initialized") + return + } + + err := queue.Channel.Publish( + "", // default exchange + queue.Queue.Name, // use the actual queue name + false, // mandatory + false, // immediate + amqp.Publishing{ + ContentType: "text/plain", + Body: []byte(message.Text), + Priority: message.Priority, + }) + + if err != nil { + logrus.Errorf("Failed to publish message: %v", err) + return + } + logrus.Info("Message sent successfully") +} diff --git a/queue/main_test.go b/queue/main_test.go new file mode 100644 index 0000000..7985544 --- /dev/null +++ b/queue/main_test.go @@ -0,0 +1,117 @@ +package queue + +import ( + "errors" + "testing" + + "github.com/Real-Dev-Squad/discord-service/config" + "github.com/Real-Dev-Squad/discord-service/dtos" + _ "github.com/Real-Dev-Squad/discord-service/tests/helpers" + "github.com/Real-Dev-Squad/discord-service/utils" + amqp "github.com/rabbitmq/amqp091-go" + + "github.com/stretchr/testify/assert" +) + +type mockQueue struct { + dialError error + channelError error + queueError error +} + +func (m *mockQueue) dial() error { + return m.dialError +} + +func (m *mockQueue) createChannel() error { + return m.channelError +} +func (m *mockQueue) declareQueue() error { + return m.queueError +} + +func TestInitQueueConnection(t *testing.T) { + config.AppConfig.MAX_RETRIES = 1 + t.Run("should not panic when Dial() returns error", func(t *testing.T) { + mockQueue := &mockQueue{dialError: errors.New("connection failed")} + assert.NotPanics(t, func() { + InitQueueConnection(mockQueue) + }, "InitQueueConnection should not panic when Dial is unsuccessful") + + }) + + t.Run("should not panic when CreateChannel() returns error", func(t *testing.T) { + mockQueue := &mockQueue{channelError: errors.New("channel failed")} + assert.NotPanics(t, func() { + InitQueueConnection(mockQueue) + }, "InitQueueConnection should not panic when CreateChannel is unsuccessful") + + }) + + t.Run("should not panic when DeclareQueue() returns error", func(t *testing.T) { + mockQueue := &mockQueue{queueError: errors.New("queue failed")} + assert.NotPanics(t, func() { + InitQueueConnection(mockQueue) + }, "InitQueueConnection should not when DeclareQueue is unsuccessful") + + }) + + t.Run("should pass when no error is returned", func(t *testing.T) { + mockQueue := &mockQueue{} + assert.NotPanics(t, func() { + InitQueueConnection(mockQueue) + }) + }) +} + +func TestGetQueueInstance(t *testing.T) { + t.Run("Should use ExponentialBackoffRetry via GetQueueInstance", func(t *testing.T) { + attempt := 0 + originalFunc := utils.ExponentialBackoffRetry + utils.ExponentialBackoffRetry = func(maxRetries int, operation func() error) error { + attempt++ + return errors.New("error") + } + defer func() { utils.ExponentialBackoffRetry = originalFunc }() + assert.NotNil(t, GetQueueInstance()) + assert.Equal(t, 1, attempt) + }) +} + +func TestSessionWrapper(t *testing.T) { + sessionWrapper := &Queue{} + + t.Run("SessionWrapper should always implement dial() method", func(t *testing.T) { + err := sessionWrapper.dial() + assert.Error(t, err) + }) + + t.Run("SessionWrapper should always implement createChannel() method", func(t *testing.T) { + sessionWrapper.Connection = &amqp.Connection{} + assert.Panics(t, func() { + sessionWrapper.createChannel() + }) + + }) + + t.Run("SessionWrapper should always implement declareQueue() method", func(t *testing.T) { + sessionWrapper.Channel = &amqp.Channel{} + assert.Panics(t, func() { + sessionWrapper.declareQueue() + }) + }) + +} + +func TestSendMessage(t *testing.T) { + t.Run("Should not panic when SendMessage returns error", func(t *testing.T) { + config.AppConfig.MAX_RETRIES = 1 + message := dtos.TextMessage{ + Text: "test", + Priority: 1, + } + assert.NotPanics(t, func() { + SendMessage(message) + }, "SendMessage should panic when SendMessage returns error") + }) +} diff --git a/routes/baseRoute.go b/routes/baseRoute.go index 10206d1..01e5cb6 100644 --- a/routes/baseRoute.go +++ b/routes/baseRoute.go @@ -9,4 +9,5 @@ import ( func SetupBaseRoutes(router *httprouter.Router) { router.POST("/", middleware.VerifyCommand(controllers.HomeHandler)) router.GET("/health", controllers.HealthCheckHandler) + router.POST("/queue", controllers.QueueHandler) } diff --git a/routes/main.go b/routes/main.go index 8e4526c..70930cc 100644 --- a/routes/main.go +++ b/routes/main.go @@ -24,6 +24,6 @@ func Listen(listenAddress string) { router := SetupV1Routes() err := http.ListenAndServe(listenAddress, router) if err != nil { - logrus.Error(err) + logrus.Fatal(err) } } diff --git a/tests/helpers/config.go b/tests/helpers/config.go index 8d2db63..10715bd 100644 --- a/tests/helpers/config.go +++ b/tests/helpers/config.go @@ -9,4 +9,6 @@ func init() { os.Setenv("DISCORD_PUBLIC_KEY", "8933e3749b4feb4d76169b26ed372af3c378f4353c2024fee0601f2a2e7918e1") os.Setenv("GUILD_ID", "8933e3749b4feb4d76169b26ed372af3c378f4353c2024fee0601f2a2e7918e1") os.Setenv("BOT_TOKEN", "8933e3749b4feb4d76169b26ed372af3c378f4353c2024fee0601f2a2e7918e1") + os.Setenv("QUEUE_NAME", "DISCORD_QUEUE") + os.Setenv("QUEUE_URL", "local:5672") } diff --git a/utils/helper.go b/utils/helper.go new file mode 100644 index 0000000..895ac7d --- /dev/null +++ b/utils/helper.go @@ -0,0 +1,23 @@ +package utils + +import ( + "math" + "time" + + "github.com/sirupsen/logrus" +) + +var ExponentialBackoffRetry = func(maxRetries int, operation func() error) error { + var err error + for i := 0; i < maxRetries; i++ { + err = operation() + if err == nil { + return nil + } + logrus.Errorf("Attempt %d: Operation failed: %s", i+1, err) + if i < maxRetries-1 { + time.Sleep(time.Duration(math.Pow(2, float64(i))) * time.Second) + } + } + return err +} diff --git a/utils/helper_test.go b/utils/helper_test.go new file mode 100644 index 0000000..717945b --- /dev/null +++ b/utils/helper_test.go @@ -0,0 +1,58 @@ +package utils + +import ( + "errors" + "testing" + + "github.com/stretchr/testify/assert" +) + +func TestExponentialBackoffRetry_Success(t *testing.T) { + attempts := 0 + operation := func() error { + attempts++ + if attempts < 3 { + return errors.New("temporary error") + } + return nil + } + + err := ExponentialBackoffRetry(5, operation) + assert.NoError(t, err) + assert.Equal(t, 3, attempts) +} + +func TestExponentialBackoffRetry_Failure(t *testing.T) { + attempts := 0 + operation := func() error { + attempts++ + return errors.New("permanent error") + } + + err := ExponentialBackoffRetry(3, operation) + assert.Error(t, err) + assert.Equal(t, 3, attempts) +} + +func TestExponentialBackoffRetry_NoRetries(t *testing.T) { + attempts := 0 + operation := func() error { + attempts++ + return errors.New("error") + } + + err := ExponentialBackoffRetry(0, operation) + assert.Nil(t, err) + assert.Equal(t, 0, attempts) +} + +func TestExponentialBackoffRetry_ImmediateSuccess(t *testing.T) { + attempts := 0 + operation := func() error { + attempts++ + return nil + } + err := ExponentialBackoffRetry(5, operation) + assert.NoError(t, err) + assert.Equal(t, 1, attempts) +}