Skip to content

Commit

Permalink
Merge pull request #2 from Codigami/add-parallelism
Browse files Browse the repository at this point in the history
Add option to process messages in parallel
  • Loading branch information
ameykpatil authored May 30, 2017
2 parents fe6af07 + 9140e20 commit c16ce1f
Show file tree
Hide file tree
Showing 3 changed files with 34 additions and 2 deletions.
8 changes: 8 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,11 @@
## v1.1 (May 29, 2017)

* Add option to process messages in parallel ([#2](https://github.com/Codigami/gohaqd/pull/2), [@ApsOps](https://github.com/ApsOps))

## v1.0 (May 9, 2017)

* Initial stable release

## v0.3 (December 21, 2016)

* Instantiate SQS object only once ([#5](https://github.com/ApsOps/gohaqd/pull/5), [@ApsOps](https://github.com/ApsOps))
Expand Down
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ It pulls data off a queue, inserts it into the message body, and sends an HTTP P
## Flags:
```
--aws-region string AWS Region for the SQS queue (default "us-east-1")
--parallel int Number of messages to be consumed in parallel (default 1)
-q, --queue-name string queue name to use
--sqs-endpoint string SQS Endpoint for using with fake_sqs
-u, --url string endpoint to send an HTTP POST request with contents of queue message in the body
Expand Down
27 changes: 25 additions & 2 deletions cmd/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ var queueName string
var url string
var awsRegion string
var sqsEndpoint string
var parallelRequests int

// RootCmd represents the base command when called without any subcommands
var RootCmd = &cobra.Command{
Expand All @@ -62,6 +63,7 @@ func init() {
RootCmd.PersistentFlags().StringVarP(&url, "url", "u", "", "endpoint to send an HTTP POST request with contents of queue message in the body")
RootCmd.PersistentFlags().StringVar(&awsRegion, "aws-region", "us-east-1", "AWS Region for the SQS queue")
RootCmd.PersistentFlags().StringVar(&sqsEndpoint, "sqs-endpoint", "", "SQS Endpoint for using with fake_sqs")
RootCmd.PersistentFlags().IntVar(&parallelRequests, "parallel", 1, "Number of messages to be consumed in parallel")
RootCmd.MarkPersistentFlagRequired("queuename")
RootCmd.MarkPersistentFlagRequired("url")

Expand All @@ -72,6 +74,7 @@ func init() {
var svc *sqs.SQS
var msgparams *sqs.ReceiveMessageInput
var httpClient *http.Client
var sem chan *sqs.Message

func startGohaqd(cmd *cobra.Command, args []string) {
var config *aws.Config
Expand All @@ -97,18 +100,37 @@ func startGohaqd(cmd *cobra.Command, args []string) {
QueueUrl: q.QueueUrl,
WaitTimeSeconds: aws.Int64(20),
}

// Create semaphore channel for passing messages to consumers
sem = make(chan *sqs.Message)

// Start multiple goroutines for consumers base on --parallel flag
for i := 0; i < parallelRequests; i++ {
go startConsumer(q.QueueUrl)
}

// Infinitely poll SQS queue for messages
for {
pollSQS(q.QueueUrl)
pollSQS()
}
}

func pollSQS(queueURL *string) {
// Receives messages from SQS queue and adds to semaphore channel
func pollSQS() {
resp, err := svc.ReceiveMessage(msgparams)
if err != nil {
log.Fatalf(err.Error())
}

for _, msg := range resp.Messages {
sem <- msg
}
}

// Receives messages from semaphore channel and
// deletes a message from SQS queue is it's consumed successfully
func startConsumer(queueURL *string) {
for msg := range sem {
if sendMessageToURL(*msg.Body) {
_, err := svc.DeleteMessage(&sqs.DeleteMessageInput{
QueueUrl: queueURL,
Expand All @@ -121,6 +143,7 @@ func pollSQS(queueURL *string) {
}
}

// Sends a POST request to consumption endpoint with the SQS message as body
func sendMessageToURL(msg string) bool {
var resp *http.Response
var err error
Expand Down

0 comments on commit c16ce1f

Please sign in to comment.