-
Notifications
You must be signed in to change notification settings - Fork 8
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
- Loading branch information
1 parent
42d2272
commit 80fc376
Showing
4 changed files
with
7 additions
and
177 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -27,4 +27,6 @@ go.work | |
.idea/ | ||
example/main | ||
|
||
**/mock_*.go | ||
**/mock_*.go | ||
|
||
**/main |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -171,190 +171,18 @@ The `Signature` struct represents a single task invocation and has the following | |
|
||
### Publisher (Enqueue Task) | ||
In order to enqueue jobs, you'll need to make a WorkerPool. publish jobs to broker. | ||
```go | ||
package main | ||
|
||
import ( | ||
"context" | ||
"encoding/json" | ||
"os" | ||
|
||
"github.com/sirupsen/logrus" | ||
"github.com/surendratiwari3/paota/config" | ||
"github.com/surendratiwari3/paota/logger" | ||
"github.com/surendratiwari3/paota/task" | ||
"github.com/surendratiwari3/paota/workerpool" | ||
) | ||
|
||
func main() { | ||
// Set up the logger | ||
logger.ApplicationLogger = logrus.StandardLogger() | ||
|
||
// Configure Paota | ||
cnf := config.Config{ | ||
Broker: "amqp", | ||
TaskQueueName: "paota_task_queue", | ||
AMQP: &config.AMQPConfig{ | ||
Url: "amqp://guest:guest@localhost:55005/", | ||
Exchange: "paota_task_exchange", | ||
ExchangeType: "direct", | ||
BindingKey: "paota_task_binding_key", | ||
PrefetchCount: 100, | ||
ConnectionPoolSize: 10, | ||
}, | ||
} | ||
|
||
err := config.GetConfigProvider().SetApplicationConfig(cnf) | ||
if err != nil { | ||
logger.ApplicationLogger.Error("config error", err) | ||
return | ||
} | ||
|
||
newWorkerPool, err := workerpool.NewWorkerPool(context.Background(), 10, "testWorker") | ||
if err != nil { | ||
logger.ApplicationLogger.Error("workerPool is not created", err) | ||
os.Exit(0) | ||
} else if newWorkerPool == nil { | ||
logger.ApplicationLogger.Info("workerPool is nil") | ||
os.Exit(0) | ||
} | ||
logger.ApplicationLogger.Info("newWorkerPool created successfully") | ||
|
||
// Register tasks | ||
regTasks := map[string]interface{}{ | ||
"Print": Print, | ||
} | ||
err = newWorkerPool.RegisterTasks(regTasks) | ||
if err != nil { | ||
logger.ApplicationLogger.Info("error while registering task") | ||
return | ||
} | ||
logger.ApplicationLogger.Info(newWorkerPool.IsTaskRegistered("Print")) | ||
|
||
logger.ApplicationLogger.Info("Worker is also started") | ||
|
||
// UserRecord represents the structure of user records. | ||
type UserRecord struct { | ||
ID string `json:"id"` | ||
Name string `json:"name"` | ||
Email string `json:"email"` | ||
// Add other fields as needed | ||
} | ||
|
||
// Replace this with the received user record | ||
user := UserRecord{ | ||
ID: "1", | ||
Name: "John Doe", | ||
Email: "[email protected]", | ||
} | ||
|
||
// Convert the struct to a JSON string | ||
userJSON, err := json.Marshal(user) | ||
if err != nil { | ||
// | ||
} | ||
|
||
printJob := &task.Signature{ | ||
Name: "Print", | ||
Args: []task.Arg{ | ||
{ | ||
Type: "string", | ||
Value: string(userJSON), | ||
}, | ||
}, | ||
IgnoreWhenTaskNotRegistered: true, | ||
} | ||
|
||
go func() { | ||
for i := 0; i < 100000; i++ { | ||
newWorkerPool.SendTaskWithContext(context.Background(), printJob) | ||
} | ||
}() | ||
} | ||
|
||
func Print(arg *task.Signature) error { | ||
logger.ApplicationLogger.Info("Print Function Completed") | ||
return nil | ||
} | ||
```go | ||
example/producer/main.go | ||
``` | ||
|
||
### Consumer (Task Processor) | ||
In order to process jobs, you'll need to make a WorkerPool. Add jobs to the pool, and start the pool. | ||
```go | ||
package main | ||
|
||
import ( | ||
"context" | ||
"os" | ||
|
||
"github.com/sirupsen/logrus" | ||
"github.com/surendratiwari3/paota/config" | ||
"github.com/surendratiwari3/paota/logger" | ||
"github.com/surendratiwari3/paota/task" | ||
"github.com/surendratiwari3/paota/workerpool" | ||
) | ||
|
||
func main() { | ||
// Set up the logger | ||
logger.ApplicationLogger = logrus.StandardLogger() | ||
|
||
// Configure Paota | ||
cnf := config.Config{ | ||
Broker: "amqp", | ||
TaskQueueName: "paota_task_queue", | ||
AMQP: &config.AMQPConfig{ | ||
Url: "amqp://guest:guest@localhost:55005/", | ||
Exchange: "paota_task_exchange", | ||
ExchangeType: "direct", | ||
BindingKey: "paota_task_binding_key", | ||
PrefetchCount: 100, | ||
ConnectionPoolSize: 10, | ||
}, | ||
} | ||
|
||
err := config.GetConfigProvider().SetApplicationConfig(cnf) | ||
if err != nil { | ||
logger.ApplicationLogger.Error("config error", err) | ||
return | ||
} | ||
|
||
// Create a new worker pool | ||
newWorkerPool, err := workerpool.NewWorkerPool(context.Background(), 10, "testWorker") | ||
if err != nil { | ||
logger.ApplicationLogger.Error("workerPool is not created", err) | ||
os.Exit(0) | ||
} else if newWorkerPool == nil { | ||
logger.ApplicationLogger.Info("workerPool is nil") | ||
os.Exit(0) | ||
} | ||
logger.ApplicationLogger.Info("newWorkerPool created successfully") | ||
|
||
// Register tasks | ||
regTasks := map[string]interface{}{ | ||
"Print": Print, | ||
} | ||
err = newWorkerPool.RegisterTasks(regTasks) | ||
if err != nil { | ||
logger.ApplicationLogger.Info("error while registering task") | ||
return | ||
} | ||
logger.ApplicationLogger.Info(newWorkerPool.IsTaskRegistered("Print")) | ||
|
||
logger.ApplicationLogger.Info("Worker is also started") | ||
|
||
// Start the worker pool | ||
newWorkerPool.Start() | ||
} | ||
|
||
// Print is an example task function | ||
func Print(arg *task.Signature) error { | ||
logger.ApplicationLogger.Info("Print Function Completed") | ||
return nil | ||
} | ||
|
||
```go | ||
example/consumer/main.go | ||
``` | ||
|
||
|
||
### Mocks for this repository are generated using mockery(v2) | ||
```bash | ||
mockery --all --output=mocks | ||
|
Binary file not shown.
Binary file not shown.