Skip to content

Commit

Permalink
Added Quorum Queue Support
Browse files Browse the repository at this point in the history
Signed-off-by: [email protected] <Rishi Kumar S>
Signed-off-by: Rishi Kumar S <[email protected]>
  • Loading branch information
[email protected] authored and sonicboom15 committed Jan 20, 2025
1 parent 1132db5 commit 5582a0d
Show file tree
Hide file tree
Showing 4 changed files with 59 additions and 0 deletions.
1 change: 1 addition & 0 deletions bindings/rabbitmq/rabbitmq.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,7 @@ type rabbitMQMetadata struct {
ClientCert string `mapstructure:"clientCert"`
ClientKey string `mapstructure:"clientKey"`
ExternalSasl bool `mapstructure:"externalSasl"`
QueueType string `mapstructure:"queueType"`
}

// NewRabbitMQ returns a new rabbitmq instance.
Expand Down
42 changes: 42 additions & 0 deletions bindings/rabbitmq/rabbitmq_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -447,3 +447,45 @@ func TestPublishWithHeaders(t *testing.T) {
// assert.Contains(t, msg.Header, "custom_header1")
// assert.Contains(t, msg.Header, "custom_header2")
}
func TestQuorumQueue(t *testing.T) {
rabbitmqHost := getTestRabbitMQHost()
assert.NotEmpty(t, rabbitmqHost, fmt.Sprintf("RabbitMQ host configuration must be set in environment variable '%s' (example 'amqp://guest:guest@localhost:5672/')", testRabbitMQHostEnvKey))

queueName := uuid.New().String()
durable := true
exclusive := false

metadata := bindings.Metadata{
Base: contribMetadata.Base{
Name: "testQueue",
Properties: map[string]string{
"queueName": queueName,
"host": rabbitmqHost,
"deleteWhenUnused": strconv.FormatBool(exclusive),
"durable": strconv.FormatBool(durable),
"queueType": "quorum",
},
},
}

logger := logger.NewLogger("test")

r := NewRabbitMQ(logger).(*RabbitMQ)
err := r.Init(context.Background(), metadata)
require.NoError(t, err)

// Assert that the queue is created with quorum type
conn, err := amqp.Dial(rabbitmqHost)
require.NoError(t, err)
defer conn.Close()

ch, err := conn.Channel()
require.NoError(t, err)
defer ch.Close()

queue, err := ch.QueueDeclarePassive(queueName, durable, false, exclusive, false, amqp.Table{})
require.NoError(t, err)
assert.Equal(t, "quorum", queue.Arguments["x-queue-type"])

require.NoError(t, r.Close())
}
9 changes: 9 additions & 0 deletions bindings/rabbitmq/rabbitmq_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ func TestParseMetadata(t *testing.T) {
expectedClientKey string
expectedCACert string
expectedSaslExternal bool
expectedQueueType string
}{
{
name: "Delete / Durable",
Expand Down Expand Up @@ -101,6 +102,13 @@ func TestParseMetadata(t *testing.T) {
expectedDurable: false,
expectedExclusive: true,
},
{
name: "Quorum Queue",
properties: map[string]string{"queueName": queueName, "host": host, "deleteWhenUnused": "false", "durable": "false", "queueType": "quorum"},
expectedDeleteWhenUnused: false,
expectedDurable: false,
expectedQueueType: "quorum",
},
{
name: "With maxPriority",
properties: map[string]string{"queueName": queueName, "host": host, "deleteWhenUnused": "false", "durable": "false", "maxPriority": "1"},
Expand Down Expand Up @@ -158,6 +166,7 @@ func TestParseMetadata(t *testing.T) {
assert.Equal(t, tt.expectedClientKey, r.metadata.ClientKey)
assert.Equal(t, tt.expectedCACert, r.metadata.CaCert)
assert.Equal(t, tt.expectedSaslExternal, r.metadata.ExternalSasl)
assert.Equal(t, tt.expectedQueueType, r.metadata.QueueType)
if tt.expectedReconnectWaitCheck != nil {
assert.True(t, tt.expectedReconnectWaitCheck(r.metadata.ReconnectWait))
}
Expand Down
7 changes: 7 additions & 0 deletions pubsub/rabbitmq/metadata.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ type rabbitmqMetadata struct {
SaslExternal bool `mapstructure:"saslExternal"`
Concurrency pubsub.ConcurrencyMode `mapstructure:"concurrency"`
DefaultQueueTTL *time.Duration `mapstructure:"ttlInSeconds"`
QueueType string `mapstructure:"queueType"`
}

const (
Expand Down Expand Up @@ -82,6 +83,7 @@ const (
metadataClientNameKey = "clientName"
metadataHeartBeatKey = "heartBeat"
metadataQueueNameKey = "queueName"
metadataQueueType = "queueType"

defaultReconnectWaitSeconds = 3

Expand All @@ -102,6 +104,7 @@ func createMetadata(pubSubMetadata pubsub.Metadata, log logger.Logger) (*rabbitm
PublisherConfirm: false,
SaslExternal: false,
HeartBeat: defaultHeartbeat,
QueueType: amqp.QueueTypeClassic,
}

// upgrade metadata
Expand Down Expand Up @@ -158,6 +161,10 @@ func createMetadata(pubSubMetadata pubsub.Metadata, log logger.Logger) (*rabbitm
return &result, fmt.Errorf("%s can only be set to true, when all these properties are set: %s, %s, %s", metadataSaslExternal, pubsub.CACert, pubsub.ClientCert, pubsub.ClientKey)
}

if result.QueueType != amqp.QueueTypeClassic && result.QueueType != amqp.QueueTypeQuorum {
return &result, fmt.Errorf("%s invalid RabbitMQ queue type %s", errorMessagePrefix, result.QueueType)
}

result.Concurrency, err = pubsub.Concurrency(pubSubMetadata.Properties)
return &result, err
}
Expand Down

0 comments on commit 5582a0d

Please sign in to comment.