-
Notifications
You must be signed in to change notification settings - Fork 1
/
aggregate_command_handler.go
83 lines (67 loc) · 2.38 KB
/
aggregate_command_handler.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
package cqrs
import (
"context"
"fmt"
"github.com/mbict/go-commandbus"
)
type ErrorAggregateNotFound string
func (e ErrorAggregateNotFound) Error() string {
return fmt.Sprintf("aggregate not found for id: \"%s\"", string(e))
}
type ErrorNotAnAggregateCommand string
func (e ErrorNotAnAggregateCommand) Error() string {
return fmt.Sprintf("cannot convert to aggregate command for command: \"%s\"", string(e))
}
type ErrorAggregateCannotHandleCommand string
func (e ErrorAggregateCannotHandleCommand) Error() string {
return fmt.Sprintf("aggregate cannot handle commands directly for command: \"%s\"", string(e))
}
// AggregateCommandHandler is a command handler middleware who loads the aggregate
// calls the aggregate command handler to execute the business logic and saves the
// events to the aggregate store afterwards.
func AggregateCommandHandler(repository AggregateRepository) commandbus.CommandHandler {
return AggregateCommandHandlerCallback(repository, func(aggregate Aggregate, cmd Command) error {
agg, ok := aggregate.(AggregateHandlesCommands)
if !ok {
return ErrorAggregateCannotHandleCommand(cmd.CommandName())
}
return agg.HandleCommand(cmd)
})
}
type AggregateCommandHandlerFunc func(aggregate Aggregate, command Command) error
func AggregateCommandHandlerCallback(repository AggregateRepository, handler AggregateCommandHandlerFunc) commandbus.CommandHandler {
return commandbus.CommandHandlerFunc(func(_ context.Context, command commandbus.Command) error {
cmd, ok := command.(Command)
if !ok {
return ErrorNotAnAggregateCommand(command.CommandName())
}
// load aggregate from store
aggregate, err := repository.Load(cmd.AggregateId())
if err != nil {
return err
}
if aggregate == nil {
return ErrorAggregateNotFound(cmd.AggregateId().String())
}
// run validation if there is a validate structure implemented
if validate, ok := command.(Validate); ok {
if err := validate.Validate(); err != nil {
return err
}
}
// if it is an aggregate composition we need to get the real aggregate
var agg Aggregate = aggregate
if aggregateComposition, ok := aggregate.(AggregateComposition); ok {
agg = aggregateComposition.Aggregate()
}
//call the handler
if err = handler(agg, cmd); err != nil {
return err
}
// save event to the event store
if err = repository.Save(aggregate); err != nil {
return err
}
return nil
})
}