Skip to content

Commit

Permalink
Merge pull request #27 from commercetools/azure/namespace-settings
Browse files Browse the repository at this point in the history
Add Azure global settings for queue creation
  • Loading branch information
satabin authored Jul 16, 2024
2 parents 8cab801 + cd6a959 commit e0a8850
Show file tree
Hide file tree
Showing 5 changed files with 150 additions and 15 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
/*
* Copyright 2024 Commercetools GmbH
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.commercetools.queue.azure.servicebus

/** Represents a size in kibibytes (1 KiB = 1024 bytes). */
final class Size private (val kib: Int) extends AnyVal {

/** Represents a size in mibibytes (1 MiB = 1024 KiB). */
def mib: Int = kib / 1024

/** Represents a size in gibibytes (1 GiB = 1024 MiB). */
def gib: Int = mib / 1024
}

object Size {

/**
* Creates the size with the given amount of KiB.
*/
def kib(kib: Int): Size = new Size(kib)

/**
* Creates the size with the given amount of MiB.
* 1 MiB = 1024 Kib
*/
def mib(mib: Int): Size = new Size(mib * 1024)

/**
* Creates the size with the given amount of GiB.
* 1 GiB = 1024 Mib
*/
def gib(gib: Int): Size = new Size(gib * 1024 * 1024)

}

/**
* Global settings that will be used for new queues created by the library.
*
* @param partitioned Whether the new queues are partitioned (set to true if your namespace is partitioned)
* @param newQueueSize The max size of created queues
* @param maxMessageSize The max allowed size for messages
*/
final case class NewQueueSettings(
partitioned: Option[Boolean],
queueSize: Option[Size],
maxMessageSize: Option[Size])

object NewQueueSettings {

/**
* The default settings. Default Azure defined values will be used.
*/
val default: NewQueueSettings = NewQueueSettings(None, None, None)

}
Original file line number Diff line number Diff line change
Expand Up @@ -26,17 +26,22 @@ import com.commercetools.queue.{QueueAdministration, QueueConfiguration}
import java.time.Duration
import scala.concurrent.duration._

class ServiceBusAdministration[F[_]](client: ServiceBusAdministrationClient)(implicit F: Async[F])
class ServiceBusAdministration[F[_]](
client: ServiceBusAdministrationClient,
newQueueSettings: NewQueueSettings
)(implicit F: Async[F])
extends QueueAdministration[F] {

override def create(name: String, messageTTL: FiniteDuration, lockTTL: FiniteDuration): F[Unit] =
F.blocking(
client.createQueue(
name,
new CreateQueueOptions()
.setDefaultMessageTimeToLive(Duration.ofMillis(messageTTL.toMillis))
.setLockDuration(Duration.ofMillis(lockTTL.toMillis))))
.void
F.blocking {
val options = new CreateQueueOptions()
.setDefaultMessageTimeToLive(Duration.ofMillis(messageTTL.toMillis))
.setLockDuration(Duration.ofMillis(lockTTL.toMillis))
newQueueSettings.partitioned.foreach(options.setPartitioningEnabled(_))
newQueueSettings.queueSize.foreach(size => options.setMaxSizeInMegabytes(size.mib))
newQueueSettings.maxMessageSize.foreach(size => options.setMaxMessageSizeInKilobytes(size.kib.toLong))
client.createQueue(name, options)
}.void
.adaptError(makeQueueException(_, name))

override def update(name: String, messageTTL: Option[FiniteDuration], lockTTL: Option[FiniteDuration]): F[Unit] =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,12 +25,13 @@ import com.commercetools.queue.{Deserializer, QueueAdministration, QueueClient,

class ServiceBusClient[F[_]] private (
clientBuilder: ServiceBusClientBuilder,
adminBuilder: ServiceBusAdministrationClientBuilder
adminBuilder: ServiceBusAdministrationClientBuilder,
newQueueSettings: NewQueueSettings
)(implicit F: Async[F])
extends QueueClient[F] {

override def administration: QueueAdministration[F] =
new ServiceBusAdministration(adminBuilder.buildClient())
new ServiceBusAdministration(adminBuilder.buildClient(), newQueueSettings)

override def statistics(name: String): QueueStatistics[F] =
new ServiceBusStatistics(name, adminBuilder)
Expand All @@ -45,7 +46,15 @@ class ServiceBusClient[F[_]] private (

object ServiceBusClient {

def apply[F[_]](connectionString: String)(implicit F: Async[F]): Resource[F, ServiceBusClient[F]] =
/**
* Creates a new client from the connection string.
* You can optionally provide global settings for when queues are created via the library.
*/
def fromConnectionString[F[_]](
connectionString: String,
newQueueSettings: NewQueueSettings = NewQueueSettings.default
)(implicit F: Async[F]
): Resource[F, ServiceBusClient[F]] =
for {
clientBuilder <- Resource.eval {
F.delay {
Expand All @@ -58,11 +67,16 @@ object ServiceBusClient {
.connectionString(connectionString)
}
}
} yield new ServiceBusClient(clientBuilder, adminBuilder)
} yield new ServiceBusClient(clientBuilder, adminBuilder, newQueueSettings)

/**
* Creates a new client for the given namespace and some credentials.
* You can optionally provide global settings for when queues are created via the library.
*/
def apply[F[_]](
namespace: String,
credentials: TokenCredential,
newQueueSettings: NewQueueSettings = NewQueueSettings.default,
options: Option[ClientOptions] = None
)(implicit F: Async[F]
): Resource[F, ServiceBusClient[F]] =
Expand All @@ -79,13 +93,21 @@ object ServiceBusClient {
options.fold(base)(base.clientOptions(_))
}
}
} yield new ServiceBusClient(clientBuilder, adminBuilder)
} yield new ServiceBusClient(clientBuilder, adminBuilder, newQueueSettings)

/**
* Creates a new client wrapping unmanaged SDK client.
* This is useful when integrating the library in a codebase that already manages a Java SDK client.
* Otherwise, prefer the other variants to construct a client.
*
* You can optionally provide global settings for when queues are created via the library.
*/
def unmanaged[F[_]](
clientBuilder: ServiceBusClientBuilder,
adminBuilder: ServiceBusAdministrationClientBuilder
adminBuilder: ServiceBusAdministrationClientBuilder,
newQueueSettings: NewQueueSettings = NewQueueSettings.default
)(implicit F: Async[F]
): ServiceBusClient[F] =
new ServiceBusClient(clientBuilder, adminBuilder)
new ServiceBusClient(clientBuilder, adminBuilder, newQueueSettings)

}
10 changes: 10 additions & 0 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,16 @@ lazy val azureServiceBus = crossProject(JVMPlatform)
// TODO: Remove once 0.3 is published
mimaBinaryIssueFilters ++= List(
ProblemFilters.exclude[DirectMissingMethodProblem](
"com.commercetools.queue.azure.servicebus.ServiceBusAdministration.this"),
ProblemFilters.exclude[DirectMissingMethodProblem](
"com.commercetools.queue.azure.servicebus.ServiceBusClient.this"),
ProblemFilters.exclude[DirectMissingMethodProblem](
"com.commercetools.queue.azure.servicebus.ServiceBusClient.unmanaged"),
ProblemFilters.exclude[DirectMissingMethodProblem](
"com.commercetools.queue.azure.servicebus.ServiceBusClient.apply"),
ProblemFilters.exclude[IncompatibleResultTypeProblem](
"com.commercetools.queue.azure.servicebus.ServiceBusClient.apply$default$3"),
ProblemFilters.exclude[IncompatibleResultTypeProblem](
"com.commercetools.queue.azure.servicebus.ServiceBusQueueSubscriber.this")
)
)
Expand Down
29 changes: 29 additions & 0 deletions docs/systems/service-bus.md
Original file line number Diff line number Diff line change
Expand Up @@ -25,4 +25,33 @@ The client is managed, meaning that it uses a conection pool that will get shut

If integrating with an existing code base where you already have builders that you would like to share, you can use the `unmanaged` variant.

## Global setting for queue creation

When queues are created using the library, you can define global settings at client instantiation time. These settings will be applied to any queue created with the client instance.

For instance, this comes in handy when creating queues in a partitioned Premium namespace, you need to indicate that the queue is partitioned.

```scala mdoc:compile-only
import cats.effect.IO
import com.commercetools.queue.azure.servicebus._
import com.azure.identity.DefaultAzureCredentialBuilder

import scala.concurrent.duration._

val namespace = "{namespace}.servicebus.windows.net" // your namespace
val credentials = new DefaultAzureCredentialBuilder().build() // however you want to authenticate

// queues will be partitioned and have a size of 20GiB
// message size is not changed
val globalQueueSettings =
NewQueueSettings.default.copy(
partitioned = Some(true),
queueSize = Some(Size.gib(20)))

ServiceBusClient[IO](namespace, credentials, newQueueSettings = globalQueueSettings).use { client =>
// the queue will be partitioned and will be able to store up to 20GiB of data
client.administration.create("my-queue", messageTTL = 1.hour, lockTTL = 10.seconds)
}
```

[service-bus]: https://learn.microsoft.com/en-us/azure/service-bus-messaging/service-bus-messaging-overview

0 comments on commit e0a8850

Please sign in to comment.