Skip to content

Commit

Permalink
Merge branch 'main' into flaky-test
Browse files Browse the repository at this point in the history
  • Loading branch information
Zerpet authored Jan 31, 2024
2 parents 56e06ca + 4a009c7 commit 7d093ad
Show file tree
Hide file tree
Showing 21 changed files with 357 additions and 148 deletions.
10 changes: 8 additions & 2 deletions .ci/install.ps1
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ Write-Host "[INFO] Installing Erlang to $erlang_install_dir..."

$rabbitmq_installer_download_url = "https://github.com/rabbitmq/rabbitmq-server/releases/download/v$rabbitmq_ver/rabbitmq-server-$rabbitmq_ver.exe"
$rabbitmq_installer_path = Join-Path -Path $base_installers_dir -ChildPath "rabbitmq-server-$rabbitmq_ver.exe"
Write-Host "[INFO] rabbitmq installer path $rabbitmq_installer_path"

$erlang_reg_path = 'HKLM:\SOFTWARE\Ericsson\Erlang'
if (Test-Path 'HKLM:\SOFTWARE\WOW6432Node\')
Expand Down Expand Up @@ -86,13 +87,14 @@ Write-Host '[INFO] Installing and starting RabbitMQ with default config...'
& $rabbitmq_installer_path '/S' | Out-Null
(Get-Service -Name RabbitMQ).Status

$rabbitmq_base_path = (Get-ItemProperty -Name Install_Dir -Path 'HKLM:\SOFTWARE\WOW6432Node\VMware, Inc.\RabbitMQ Server').Install_Dir
$regPath = 'HKLM:\SOFTWARE\Microsoft\Windows\CurrentVersion\Uninstall\RabbitMQ'
if (Test-Path 'HKLM:\SOFTWARE\WOW6432Node\')
{
$regPath = 'HKLM:\SOFTWARE\WOW6432Node\Microsoft\Windows\CurrentVersion\Uninstall\RabbitMQ'
}
$rabbitmq_base_path = Split-Path -Parent (Get-ItemProperty $regPath 'UninstallString').UninstallString
$rabbitmq_version = (Get-ItemProperty $regPath "DisplayVersion").DisplayVersion
$rabbitmq_version = (Get-ItemProperty $regPath 'DisplayVersion').DisplayVersion
Write-Host "[INFO] RabbitMQ version path: $rabbitmq_base_path and version: $rabbitmq_version"

$rabbitmq_home = Join-Path -Path $rabbitmq_base_path -ChildPath "rabbitmq_server-$rabbitmq_version"
Write-Host "[INFO] Setting RABBITMQ_HOME to '$rabbitmq_home'..."
Expand Down Expand Up @@ -155,3 +157,7 @@ Do {
$ErrorActionPreference = 'Continue'
Write-Host '[INFO] Getting RabbitMQ status...'
& $rabbitmqctl_path status

$ErrorActionPreference = 'Continue'
Write-Host '[INFO] Enabling plugins...'
& $rabbitmq_plugins_path enable rabbitmq_management rabbitmq_stream rabbitmq_stream_management rabbitmq_amqp1_0
4 changes: 2 additions & 2 deletions .ci/versions.json
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
{
"erlang": "25.3",
"rabbitmq": "3.11.10"
"erlang": "26.1.1",
"rabbitmq": "3.12.6"
}
2 changes: 1 addition & 1 deletion .github/workflows/golangci-lint.yml
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ jobs:
steps:
- uses: actions/setup-go@v3
with:
go-version: '1.19'
go-version: 'stable'
check-latest: true
- uses: actions/checkout@v3
- uses: golangci/golangci-lint-action@v3
Expand Down
4 changes: 2 additions & 2 deletions .github/workflows/tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ jobs:
strategy:
fail-fast: true
matrix:
go-version: ['1.19', '1.20']
go-version: ['oldstable', 'stable']
steps:
- uses: actions/checkout@v3
- uses: actions/setup-go@v3
Expand All @@ -36,7 +36,7 @@ jobs:
runs-on: ubuntu-latest
strategy:
matrix:
go-version: ['1.19', '1.20']
go-version: ['oldstable', 'stable']
services:
rabbitmq:
image: rabbitmq
Expand Down
34 changes: 34 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,39 @@
# Changelog

## [v1.9.0](https://github.com/rabbitmq/amqp091-go/tree/v1.9.0) (2023-10-02)

[Full Changelog](https://github.com/rabbitmq/amqp091-go/compare/v1.8.1...v1.9.0)

**Implemented enhancements:**

- Use of buffered delivery channels when prefetch\_count is not null [\#200](https://github.com/rabbitmq/amqp091-go/issues/200)

**Fixed bugs:**

- connection block when write connection reset by peer [\#222](https://github.com/rabbitmq/amqp091-go/issues/222)
- Test failure on 32bit architectures [\#202](https://github.com/rabbitmq/amqp091-go/issues/202)

**Closed issues:**

- Add a constant to set consumer timeout as queue argument [\#201](https://github.com/rabbitmq/amqp091-go/issues/201)
- Add a constant for CQ version [\#199](https://github.com/rabbitmq/amqp091-go/issues/199)
- Examples may need to be updated after \#140 [\#153](https://github.com/rabbitmq/amqp091-go/issues/153)

**Merged pull requests:**

- Update spec091.go [\#224](https://github.com/rabbitmq/amqp091-go/pull/224) ([pinkfish](https://github.com/pinkfish))
- Closes 222 [\#223](https://github.com/rabbitmq/amqp091-go/pull/223) ([yywing](https://github.com/yywing))
- Update write.go [\#221](https://github.com/rabbitmq/amqp091-go/pull/221) ([pinkfish](https://github.com/pinkfish))
- Bump versions [\#219](https://github.com/rabbitmq/amqp091-go/pull/219) ([lukebakken](https://github.com/lukebakken))
- remove extra word 'accept' from ExchangeDeclare description [\#217](https://github.com/rabbitmq/amqp091-go/pull/217) ([a-sabzian](https://github.com/a-sabzian))
- Misc Windows CI updates [\#216](https://github.com/rabbitmq/amqp091-go/pull/216) ([lukebakken](https://github.com/lukebakken))
- Stop using deprecated Publish function [\#207](https://github.com/rabbitmq/amqp091-go/pull/207) ([Zerpet](https://github.com/Zerpet))
- Constant for consumer timeout queue argument [\#206](https://github.com/rabbitmq/amqp091-go/pull/206) ([Zerpet](https://github.com/Zerpet))
- Add a constant for CQ v2 queue argument [\#205](https://github.com/rabbitmq/amqp091-go/pull/205) ([Zerpet](https://github.com/Zerpet))
- Fix example for 32-bit compatibility [\#204](https://github.com/rabbitmq/amqp091-go/pull/204) ([Zerpet](https://github.com/Zerpet))
- Fix to increase timeout milliseconds since it's too tight [\#203](https://github.com/rabbitmq/amqp091-go/pull/203) ([t2y](https://github.com/t2y))
- Add Channel.ConsumeWithContext to be able to cancel delivering [\#192](https://github.com/rabbitmq/amqp091-go/pull/192) ([t2y](https://github.com/t2y))

## [v1.8.1](https://github.com/rabbitmq/amqp091-go/tree/v1.8.1) (2023-05-04)

[Full Changelog](https://github.com/rabbitmq/amqp091-go/compare/v1.8.0...v1.8.1)
Expand Down
16 changes: 16 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -39,3 +39,19 @@ rabbitmq-server: ## Start a RabbitMQ server using Docker. Container name can be
.PHONY: stop-rabbitmq-server
stop-rabbitmq-server: ## Stop a RabbitMQ server using Docker. Container name can be customised with CONTAINER_NAME=some-rabbit
docker stop $(CONTAINER_NAME)

certs:
./certs.sh

.PHONY: certs-rm
certs-rm:
rm -r ./certs/

.PHONY: rabbitmq-server-tls
rabbitmq-server-tls: | certs ## Start a RabbitMQ server using Docker. Container name can be customised with CONTAINER_NAME=some-rabbit
docker run --detach --rm --name $(CONTAINER_NAME) \
--publish 5672:5672 --publish 5671:5671 --publish 15672:15672 \
--mount type=bind,src=./certs/server,dst=/certs \
--mount type=bind,src=./certs/ca/cacert.pem,dst=/certs/cacert.pem,readonly \
--mount type=bind,src=./rabbitmq-confs/tls/90-tls.conf,dst=/etc/rabbitmq/conf.d/90-tls.conf \
--pull always rabbitmq:3-management
40 changes: 24 additions & 16 deletions allocator.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,10 @@ const (

// allocator maintains a bitset of allocated numbers.
type allocator struct {
pool *big.Int
last int
low int
high int
pool *big.Int
follow int
low int
high int
}

// NewAllocator reserves and frees integers out of a range between low and
Expand All @@ -31,10 +31,10 @@ type allocator struct {
// sizeof(big.Word)
func newAllocator(low, high int) *allocator {
return &allocator{
pool: big.NewInt(0),
last: low,
low: low,
high: high,
pool: big.NewInt(0),
follow: low,
low: low,
high: high,
}
}

Expand Down Expand Up @@ -69,21 +69,29 @@ func (a allocator) String() string {
// O(N) worst case runtime where N is allocated, but usually O(1) due to a
// rolling index into the oldest allocation.
func (a *allocator) next() (int, bool) {
wrapped := a.last
wrapped := a.follow
defer func() {
// make a.follow point to next value
if a.follow == a.high {
a.follow = a.low
} else {
a.follow += 1
}
}()

// Find trailing bit
for ; a.last <= a.high; a.last++ {
if a.reserve(a.last) {
return a.last, true
for ; a.follow <= a.high; a.follow++ {
if a.reserve(a.follow) {
return a.follow, true
}
}

// Find preceding free'd pool
a.last = a.low
a.follow = a.low

for ; a.last < wrapped; a.last++ {
if a.reserve(a.last) {
return a.last, true
for ; a.follow < wrapped; a.follow++ {
if a.reserve(a.follow) {
return a.follow, true
}
}

Expand Down
22 changes: 22 additions & 0 deletions allocator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,28 @@ func TestAllocatorShouldReuseReleased(t *testing.T) {
}
}

func TestAllocatorShouldNotReuseEarly(t *testing.T) {
a := newAllocator(1, 2)

first, _ := a.next()
if want, got := 1, first; want != got {
t.Fatalf("expected allocation to be %d, got: %d", want, got)
}

a.release(first)

second, _ := a.next()
if want, got := 2, second; want != got {
t.Fatalf("expected second allocation to be %d, got: %d", want, got)
}

third, _ := a.next()
if want, got := first, third; want != got {
t.Fatalf("expected third allocation to be %d, got: %d", want, got)
}

}

func TestAllocatorReleasesKeepUpWithAllocationsForAllSizes(t *testing.T) {
if testing.Short() {
t.Skip()
Expand Down
8 changes: 4 additions & 4 deletions certs.sh
Original file line number Diff line number Diff line change
Expand Up @@ -71,12 +71,12 @@ keyUsage = keyCertSign, cRLSign
[ client_ca_extensions ]
basicConstraints = CA:false
keyUsage = digitalSignature
keyUsage = keyEncipherment,digitalSignature
extendedKeyUsage = 1.3.6.1.5.5.7.3.2
[ server_ca_extensions ]
basicConstraints = CA:false
keyUsage = keyEncipherment
keyUsage = keyEncipherment,digitalSignature
extendedKeyUsage = 1.3.6.1.5.5.7.3.1
subjectAltName = @alt_names
Expand Down Expand Up @@ -106,7 +106,7 @@ openssl req \
-new \
-nodes \
-config openssl.cnf \
-subj "/CN=127.0.0.1/O=server/" \
-subj "/CN=localhost/O=server/" \
-key $root/server/key.pem \
-out $root/server/req.pem \
-outform PEM
Expand All @@ -115,7 +115,7 @@ openssl req \
-new \
-nodes \
-config openssl.cnf \
-subj "/CN=127.0.0.1/O=client/" \
-subj "/CN=localhost/O=client/" \
-key $root/client/key.pem \
-out $root/client/req.pem \
-outform PEM
Expand Down
22 changes: 15 additions & 7 deletions channel.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ type Channel struct {

// closed is set to 1 when the channel has been closed - see Channel.send()
closed int32
close chan struct{}

// true when we will never notify again
noNotify bool
Expand Down Expand Up @@ -86,6 +87,7 @@ func newChannel(c *Connection, id uint16) *Channel {
confirms: newConfirms(),
recv: (*Channel).recvMethod,
errors: make(chan *Error, 1),
close: make(chan struct{}),
}
}

Expand Down Expand Up @@ -146,6 +148,7 @@ func (ch *Channel) shutdown(e *Error) {
}

close(ch.errors)
close(ch.close)
ch.noNotify = true
})
}
Expand Down Expand Up @@ -368,7 +371,11 @@ func (ch *Channel) dispatch(msg message) {
// deliveries are in flight and a no-wait cancel has happened

default:
ch.rpc <- msg
select {
case <-ch.close:
return
case ch.rpc <- msg:
}
}
}

Expand Down Expand Up @@ -468,6 +475,10 @@ code set to '200'.
It is safe to call this method multiple times.
*/
func (ch *Channel) Close() error {
if ch.IsClosed() {
return nil
}

defer ch.connection.closeChannel(ch, nil)
return ch.call(
&channelClose{ReplyCode: replySuccess},
Expand Down Expand Up @@ -960,9 +971,6 @@ func (ch *Channel) QueueBind(name, key, exchange string, noWait bool, args Table
/*
QueueUnbind removes a binding between an exchange and queue matching the key and
arguments.
It is possible to send and empty string for the exchange name which means to
unbind the queue from the default exchange.
*/
func (ch *Channel) QueueUnbind(name, key, exchange string, args Table) error {
if err := args.Validate(); err != nil {
Expand Down Expand Up @@ -1283,7 +1291,7 @@ Note: RabbitMQ declares the default exchange types like 'amq.fanout' as
durable, so queues that bind to these pre-declared exchanges must also be
durable.
Exchanges declared as `internal` do not accept accept publishings. Internal
Exchanges declared as `internal` do not accept publishings. Internal
exchanges are useful when you wish to implement inter-exchange topologies
that should not be exposed to users of the broker.
Expand Down Expand Up @@ -1818,8 +1826,8 @@ func (ch *Channel) Reject(tag uint64, requeue bool) error {
// GetNextPublishSeqNo returns the sequence number of the next message to be
// published, when in confirm mode.
func (ch *Channel) GetNextPublishSeqNo() uint64 {
ch.confirms.m.Lock()
defer ch.confirms.m.Unlock()
ch.confirms.publishedMut.Lock()
defer ch.confirms.publishedMut.Unlock()

return ch.confirms.published + 1
}
Loading

0 comments on commit 7d093ad

Please sign in to comment.