From 77d6683c021ce14b73276d270e88f2f0e39c3cf7 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Aitor=20P=C3=A9rez=20Cedres?= Date: Wed, 12 Jul 2023 17:25:13 +0100 Subject: [PATCH 01/20] Make target to spin up RabbitMQ + TLS MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Aitor PĂ©rez Cedres --- Makefile | 16 ++++++++++++++++ certs.sh | 8 ++++---- rabbitmq-confs/tls/90-tls.conf | 8 ++++++++ 3 files changed, 28 insertions(+), 4 deletions(-) create mode 100644 rabbitmq-confs/tls/90-tls.conf diff --git a/Makefile b/Makefile index 69e9e2b..7dc71bc 100644 --- a/Makefile +++ b/Makefile @@ -39,3 +39,19 @@ rabbitmq-server: ## Start a RabbitMQ server using Docker. Container name can be .PHONY: stop-rabbitmq-server stop-rabbitmq-server: ## Stop a RabbitMQ server using Docker. Container name can be customised with CONTAINER_NAME=some-rabbit docker stop $(CONTAINER_NAME) + +certs: + ./certs.sh + +.PHONY: certs-rm +certs-rm: + rm -r ./certs/ + +.PHONY: rabbitmq-server-tls +rabbitmq-server-tls: | certs ## Start a RabbitMQ server using Docker. Container name can be customised with CONTAINER_NAME=some-rabbit + docker run --detach --rm --name $(CONTAINER_NAME) \ + --publish 5672:5672 --publish 5671:5671 --publish 15672:15672 \ + --mount type=bind,src=./certs/server,dst=/certs \ + --mount type=bind,src=./certs/ca/cacert.pem,dst=/certs/cacert.pem,readonly \ + --mount type=bind,src=./rabbitmq-confs/tls/90-tls.conf,dst=/etc/rabbitmq/conf.d/90-tls.conf \ + --pull always rabbitmq:3-management diff --git a/certs.sh b/certs.sh index 403e80c..0bbb1c6 100755 --- a/certs.sh +++ b/certs.sh @@ -71,12 +71,12 @@ keyUsage = keyCertSign, cRLSign [ client_ca_extensions ] basicConstraints = CA:false -keyUsage = digitalSignature +keyUsage = keyEncipherment,digitalSignature extendedKeyUsage = 1.3.6.1.5.5.7.3.2 [ server_ca_extensions ] basicConstraints = CA:false -keyUsage = keyEncipherment +keyUsage = keyEncipherment,digitalSignature extendedKeyUsage = 1.3.6.1.5.5.7.3.1 subjectAltName = @alt_names @@ -106,7 +106,7 @@ openssl req \ -new \ -nodes \ -config openssl.cnf \ - -subj "/CN=127.0.0.1/O=server/" \ + -subj "/CN=localhost/O=server/" \ -key $root/server/key.pem \ -out $root/server/req.pem \ -outform PEM @@ -115,7 +115,7 @@ openssl req \ -new \ -nodes \ -config openssl.cnf \ - -subj "/CN=127.0.0.1/O=client/" \ + -subj "/CN=localhost/O=client/" \ -key $root/client/key.pem \ -out $root/client/req.pem \ -outform PEM diff --git a/rabbitmq-confs/tls/90-tls.conf b/rabbitmq-confs/tls/90-tls.conf new file mode 100644 index 0000000..ace1d70 --- /dev/null +++ b/rabbitmq-confs/tls/90-tls.conf @@ -0,0 +1,8 @@ +listeners.ssl.default = 5671 + +ssl_options.cacertfile = /certs/cacert.pem +ssl_options.certfile = /certs/cert.pem +ssl_options.keyfile = /certs/key.pem +ssl_options.depth = 2 +ssl_options.verify = verify_none +ssl_options.fail_if_no_peer_cert = false From 150c971c1efab4b139c1d1dbe1cb3fb348dd87ec Mon Sep 17 00:00:00 2001 From: Luke Bakken Date: Fri, 28 Jul 2023 14:41:34 -0700 Subject: [PATCH 02/20] Misc Windows CI updates * Bump Erlang and RabbitMQ versions * Modify how RabbitMQ install dir is found --- .ci/install.ps1 | 10 ++++++++-- .ci/versions.json | 4 ++-- 2 files changed, 10 insertions(+), 4 deletions(-) diff --git a/.ci/install.ps1 b/.ci/install.ps1 index 44296e8..eecf5fe 100644 --- a/.ci/install.ps1 +++ b/.ci/install.ps1 @@ -36,6 +36,7 @@ Write-Host "[INFO] Installing Erlang to $erlang_install_dir..." $rabbitmq_installer_download_url = "https://github.com/rabbitmq/rabbitmq-server/releases/download/v$rabbitmq_ver/rabbitmq-server-$rabbitmq_ver.exe" $rabbitmq_installer_path = Join-Path -Path $base_installers_dir -ChildPath "rabbitmq-server-$rabbitmq_ver.exe" +Write-Host "[INFO] rabbitmq installer path $rabbitmq_installer_path" $erlang_reg_path = 'HKLM:\SOFTWARE\Ericsson\Erlang' if (Test-Path 'HKLM:\SOFTWARE\WOW6432Node\') @@ -86,13 +87,14 @@ Write-Host '[INFO] Installing and starting RabbitMQ with default config...' & $rabbitmq_installer_path '/S' | Out-Null (Get-Service -Name RabbitMQ).Status +$rabbitmq_base_path = (Get-ItemProperty -Name Install_Dir -Path 'HKLM:\SOFTWARE\WOW6432Node\VMware, Inc.\RabbitMQ Server').Install_Dir $regPath = 'HKLM:\SOFTWARE\Microsoft\Windows\CurrentVersion\Uninstall\RabbitMQ' if (Test-Path 'HKLM:\SOFTWARE\WOW6432Node\') { $regPath = 'HKLM:\SOFTWARE\WOW6432Node\Microsoft\Windows\CurrentVersion\Uninstall\RabbitMQ' } -$rabbitmq_base_path = Split-Path -Parent (Get-ItemProperty $regPath 'UninstallString').UninstallString -$rabbitmq_version = (Get-ItemProperty $regPath "DisplayVersion").DisplayVersion +$rabbitmq_version = (Get-ItemProperty $regPath 'DisplayVersion').DisplayVersion +Write-Host "[INFO] RabbitMQ version path: $rabbitmq_base_path and version: $rabbitmq_version" $rabbitmq_home = Join-Path -Path $rabbitmq_base_path -ChildPath "rabbitmq_server-$rabbitmq_version" Write-Host "[INFO] Setting RABBITMQ_HOME to '$rabbitmq_home'..." @@ -155,3 +157,7 @@ Do { $ErrorActionPreference = 'Continue' Write-Host '[INFO] Getting RabbitMQ status...' & $rabbitmqctl_path status + +$ErrorActionPreference = 'Continue' +Write-Host '[INFO] Enabling plugins...' +& $rabbitmq_plugins_path enable rabbitmq_management rabbitmq_stream rabbitmq_stream_management rabbitmq_amqp1_0 diff --git a/.ci/versions.json b/.ci/versions.json index 3e27e8d..fed4526 100644 --- a/.ci/versions.json +++ b/.ci/versions.json @@ -1,4 +1,4 @@ { - "erlang": "25.3", - "rabbitmq": "3.11.10" + "erlang": "26.0.2", + "rabbitmq": "3.13.0-beta.3" } From 4792e3b438e28c13d1ad96db0bab7a6b875b537c Mon Sep 17 00:00:00 2001 From: Amirreza Sabzian Date: Tue, 8 Aug 2023 17:58:52 +0430 Subject: [PATCH 03/20] remove extra word 'accept' from ExchangeDeclare description --- channel.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/channel.go b/channel.go index 9222d19..7bc3db8 100644 --- a/channel.go +++ b/channel.go @@ -1283,7 +1283,7 @@ Note: RabbitMQ declares the default exchange types like 'amq.fanout' as durable, so queues that bind to these pre-declared exchanges must also be durable. -Exchanges declared as `internal` do not accept accept publishings. Internal +Exchanges declared as `internal` do not accept publishings. Internal exchanges are useful when you wish to implement inter-exchange topologies that should not be exposed to users of the broker. From 97a7db0b0dbcdd6be1e760a2b559ab23eb998c6d Mon Sep 17 00:00:00 2001 From: Luke Bakken Date: Fri, 8 Sep 2023 08:13:08 -0700 Subject: [PATCH 04/20] Bump versions Golang 1.19 is no longer supported. Usa oldstable and stable aliases Since actions/setup-go v3, we can use oldstable and stable aliases to always refer to the two latest supported versions of Go. Signed-off-by: Aitor Perez Cedres --- .ci/versions.json | 4 ++-- .github/workflows/golangci-lint.yml | 2 +- .github/workflows/tests.yml | 4 ++-- 3 files changed, 5 insertions(+), 5 deletions(-) diff --git a/.ci/versions.json b/.ci/versions.json index fed4526..602c19a 100644 --- a/.ci/versions.json +++ b/.ci/versions.json @@ -1,4 +1,4 @@ { - "erlang": "26.0.2", - "rabbitmq": "3.13.0-beta.3" + "erlang": "26.1", + "rabbitmq": "3.12.4" } diff --git a/.github/workflows/golangci-lint.yml b/.github/workflows/golangci-lint.yml index e1d3572..a510ef7 100644 --- a/.github/workflows/golangci-lint.yml +++ b/.github/workflows/golangci-lint.yml @@ -16,7 +16,7 @@ jobs: steps: - uses: actions/setup-go@v3 with: - go-version: '1.19' + go-version: 'stable' check-latest: true - uses: actions/checkout@v3 - uses: golangci/golangci-lint-action@v3 diff --git a/.github/workflows/tests.yml b/.github/workflows/tests.yml index e04781b..672125b 100644 --- a/.github/workflows/tests.yml +++ b/.github/workflows/tests.yml @@ -13,7 +13,7 @@ jobs: strategy: fail-fast: true matrix: - go-version: ['1.19', '1.20'] + go-version: ['oldstable', 'stable'] steps: - uses: actions/checkout@v3 - uses: actions/setup-go@v3 @@ -36,7 +36,7 @@ jobs: runs-on: ubuntu-latest strategy: matrix: - go-version: ['1.19', '1.20'] + go-version: ['oldstable', 'stable'] services: rabbitmq: image: rabbitmq From 4a77835dc690606790f169b45060f1c0667a01cf Mon Sep 17 00:00:00 2001 From: pinkfish Date: Fri, 22 Sep 2023 17:14:06 -0700 Subject: [PATCH 05/20] Update write.go Change the time comparison to use Equal, which is safer with timestamps in go. Use IsZero() --- write.go | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/write.go b/write.go index d0011f8..dcec314 100644 --- a/write.go +++ b/write.go @@ -72,7 +72,6 @@ func (f *heartbeatFrame) write(w io.Writer) (err error) { // short short long long short remainder... func (f *headerFrame) write(w io.Writer) (err error) { var payload bytes.Buffer - var zeroTime time.Time if err = binary.Write(&payload, binary.BigEndian, f.ClassId); err != nil { return @@ -118,7 +117,7 @@ func (f *headerFrame) write(w io.Writer) (err error) { if len(f.Properties.MessageId) > 0 { mask = mask | flagMessageId } - if f.Properties.Timestamp != zeroTime { + if !f.Properties.Timestamp.IsZero() { mask = mask | flagTimestamp } if len(f.Properties.Type) > 0 { From 1e38f27908a63840e998aca47ea1df55786cdfb0 Mon Sep 17 00:00:00 2001 From: yywing <386542536@qq.com> Date: Wed, 27 Sep 2023 14:55:10 +0800 Subject: [PATCH 06/20] fix: channel mult close may lead connection channel leak --- allocator.go | 40 ++++++++++++++++++++++++---------------- allocator_test.go | 22 ++++++++++++++++++++++ channel.go | 4 ++++ connection.go | 13 ++++++++----- 4 files changed, 58 insertions(+), 21 deletions(-) diff --git a/allocator.go b/allocator.go index 0688e4b..f2925e7 100644 --- a/allocator.go +++ b/allocator.go @@ -18,10 +18,10 @@ const ( // allocator maintains a bitset of allocated numbers. type allocator struct { - pool *big.Int - last int - low int - high int + pool *big.Int + follow int + low int + high int } // NewAllocator reserves and frees integers out of a range between low and @@ -31,10 +31,10 @@ type allocator struct { // sizeof(big.Word) func newAllocator(low, high int) *allocator { return &allocator{ - pool: big.NewInt(0), - last: low, - low: low, - high: high, + pool: big.NewInt(0), + follow: low, + low: low, + high: high, } } @@ -69,21 +69,29 @@ func (a allocator) String() string { // O(N) worst case runtime where N is allocated, but usually O(1) due to a // rolling index into the oldest allocation. func (a *allocator) next() (int, bool) { - wrapped := a.last + wrapped := a.follow + defer func() { + // make a.follow point to next value + if a.follow == a.high { + a.follow = a.low + } else { + a.follow += 1 + } + }() // Find trailing bit - for ; a.last <= a.high; a.last++ { - if a.reserve(a.last) { - return a.last, true + for ; a.follow <= a.high; a.follow++ { + if a.reserve(a.follow) { + return a.follow, true } } // Find preceding free'd pool - a.last = a.low + a.follow = a.low - for ; a.last < wrapped; a.last++ { - if a.reserve(a.last) { - return a.last, true + for ; a.follow < wrapped; a.follow++ { + if a.reserve(a.follow) { + return a.follow, true } } diff --git a/allocator_test.go b/allocator_test.go index c2e7e44..2accbf0 100644 --- a/allocator_test.go +++ b/allocator_test.go @@ -76,6 +76,28 @@ func TestAllocatorShouldReuseReleased(t *testing.T) { } } +func TestAllocatorShouldNotReuseEarly(t *testing.T) { + a := newAllocator(1, 2) + + first, _ := a.next() + if want, got := 1, first; want != got { + t.Fatalf("expected allocation to be %d, got: %d", want, got) + } + + a.release(first) + + second, _ := a.next() + if want, got := 2, second; want != got { + t.Fatalf("expected second allocation to be %d, got: %d", want, got) + } + + third, _ := a.next() + if want, got := first, third; want != got { + t.Fatalf("expected third allocation to be %d, got: %d", want, got) + } + +} + func TestAllocatorReleasesKeepUpWithAllocationsForAllSizes(t *testing.T) { if testing.Short() { t.Skip() diff --git a/channel.go b/channel.go index 7bc3db8..ccc9b89 100644 --- a/channel.go +++ b/channel.go @@ -468,6 +468,10 @@ code set to '200'. It is safe to call this method multiple times. */ func (ch *Channel) Close() error { + if ch.IsClosed() { + return nil + } + defer ch.connection.closeChannel(ch, nil) return ch.call( &channelClose{ReplyCode: replySuccess}, diff --git a/connection.go b/connection.go index 3d50d95..3e83bf3 100644 --- a/connection.go +++ b/connection.go @@ -813,13 +813,16 @@ func (c *Connection) allocateChannel() (*Channel, error) { // releaseChannel removes a channel from the registry as the final part of the // channel lifecycle -func (c *Connection) releaseChannel(id uint16) { +func (c *Connection) releaseChannel(ch *Channel) { c.m.Lock() defer c.m.Unlock() if !c.IsClosed() { - delete(c.channels, id) - c.allocator.release(int(id)) + got, ok := c.channels[ch.id] + if ok && got == ch { + delete(c.channels, ch.id) + c.allocator.release(int(ch.id)) + } } } @@ -831,7 +834,7 @@ func (c *Connection) openChannel() (*Channel, error) { } if err := ch.open(); err != nil { - c.releaseChannel(ch.id) + c.releaseChannel(ch) return nil, err } return ch, nil @@ -842,7 +845,7 @@ func (c *Connection) openChannel() (*Channel, error) { // this connection. func (c *Connection) closeChannel(ch *Channel, e *Error) { ch.shutdown(e) - c.releaseChannel(ch.id) + c.releaseChannel(ch) } /* From 919a8937470f26b4ba85f31dc18cfc429551be76 Mon Sep 17 00:00:00 2001 From: yywing <386542536@qq.com> Date: Wed, 27 Sep 2023 15:00:36 +0800 Subject: [PATCH 07/20] fix: connection reader block --- connection.go | 31 +++++++++++++++++++------------ 1 file changed, 19 insertions(+), 12 deletions(-) diff --git a/connection.go b/connection.go index 3e83bf3..c9040b2 100644 --- a/connection.go +++ b/connection.go @@ -640,9 +640,12 @@ func (c *Connection) dispatch0(f frame) { // kthx - all reads reset our deadline. so we can drop this default: // lolwat - channel0 only responds to methods and heartbeats - if err := c.closeWith(ErrUnexpectedFrame); err != nil { - Logger.Printf("error sending connectionCloseOk with ErrUnexpectedFrame, error: %+v", err) - } + // closeWith use call don't block reader + go func() { + if err := c.closeWith(ErrUnexpectedFrame); err != nil { + Logger.Printf("error sending connectionCloseOk with ErrUnexpectedFrame, error: %+v", err) + } + }() } } @@ -689,9 +692,12 @@ func (c *Connection) dispatchClosed(f frame) { // we are already closed, so do nothing default: // unexpected method on closed channel - if err := c.closeWith(ErrClosed); err != nil { - Logger.Printf("error sending connectionCloseOk with ErrClosed, error: %+v", err) - } + // closeWith use call don't block reader + go func() { + if err := c.closeWith(ErrClosed); err != nil { + Logger.Printf("error sending connectionCloseOk with ErrClosed, error: %+v", err) + } + }() } } } @@ -866,13 +872,14 @@ func (c *Connection) call(req message, res ...message) error { } } - msg, ok := <-c.rpc - if !ok { - err, errorsChanIsOpen := <-c.errors - if !errorsChanIsOpen { - return ErrClosed + var msg message + select { + case e, ok := <-c.errors: + if ok { + return e } - return err + return ErrClosed + case msg = <-c.rpc: } // Try to match one of the result types From f4ea4b2b288a3863365f021f45a429099e305086 Mon Sep 17 00:00:00 2001 From: yywing <386542536@qq.com> Date: Thu, 28 Sep 2023 11:57:19 +0800 Subject: [PATCH 08/20] fix: fix reader block --- channel.go | 9 ++++++++- connection.go | 12 +++++++++++- 2 files changed, 19 insertions(+), 2 deletions(-) diff --git a/channel.go b/channel.go index ccc9b89..0dcec90 100644 --- a/channel.go +++ b/channel.go @@ -41,6 +41,7 @@ type Channel struct { // closed is set to 1 when the channel has been closed - see Channel.send() closed int32 + close chan struct{} // true when we will never notify again noNotify bool @@ -86,6 +87,7 @@ func newChannel(c *Connection, id uint16) *Channel { confirms: newConfirms(), recv: (*Channel).recvMethod, errors: make(chan *Error, 1), + close: make(chan struct{}), } } @@ -146,6 +148,7 @@ func (ch *Channel) shutdown(e *Error) { } close(ch.errors) + close(ch.close) ch.noNotify = true }) } @@ -368,7 +371,11 @@ func (ch *Channel) dispatch(msg message) { // deliveries are in flight and a no-wait cancel has happened default: - ch.rpc <- msg + select { + case <-ch.close: + return + case ch.rpc <- msg: + } } } diff --git a/connection.go b/connection.go index c9040b2..d5e5baa 100644 --- a/connection.go +++ b/connection.go @@ -112,6 +112,8 @@ type Connection struct { blocks []chan Blocking errors chan *Error + // if connection is closed should close this chan + close chan struct{} Config Config // The negotiated Config after connection.open @@ -263,6 +265,7 @@ func Open(conn io.ReadWriteCloser, config Config) (*Connection, error) { rpc: make(chan message), sends: make(chan time.Time), errors: make(chan *Error, 1), + close: make(chan struct{}), deadlines: make(chan readDeadliner, 1), } go c.reader(conn) @@ -597,6 +600,8 @@ func (c *Connection) shutdown(err *Error) { } c.conn.Close() + // reader exit + close(c.close) c.channels = nil c.allocator = nil @@ -634,7 +639,12 @@ func (c *Connection) dispatch0(f frame) { c <- Blocking{Active: false} } default: - c.rpc <- m + select { + case <-c.close: + return + case c.rpc <- m: + } + } case *heartbeatFrame: // kthx - all reads reset our deadline. so we can drop this From ceadd2fe8d725192244bb39cf9741205bdd0edae Mon Sep 17 00:00:00 2001 From: pinkfish Date: Wed, 27 Sep 2023 12:06:12 -0700 Subject: [PATCH 09/20] Update spec091.go Fix up a go lint error with no space between the // and the text. --- spec091.go | 128 ++++++++++++++++++++++++++--------------------------- 1 file changed, 64 insertions(+), 64 deletions(-) diff --git a/spec091.go b/spec091.go index d86e753..6e02ba9 100644 --- a/spec091.go +++ b/spec091.go @@ -2817,7 +2817,7 @@ func (r *reader) parseMethodFrame(channel uint16, size uint32) (f frame, err err switch mf.MethodId { case 10: // connection start - //fmt.Println("NextMethod: class:10 method:10") + // fmt.Println("NextMethod: class:10 method:10") method := &connectionStart{} if err = method.read(r.r); err != nil { return @@ -2825,7 +2825,7 @@ func (r *reader) parseMethodFrame(channel uint16, size uint32) (f frame, err err mf.Method = method case 11: // connection start-ok - //fmt.Println("NextMethod: class:10 method:11") + // fmt.Println("NextMethod: class:10 method:11") method := &connectionStartOk{} if err = method.read(r.r); err != nil { return @@ -2833,7 +2833,7 @@ func (r *reader) parseMethodFrame(channel uint16, size uint32) (f frame, err err mf.Method = method case 20: // connection secure - //fmt.Println("NextMethod: class:10 method:20") + // fmt.Println("NextMethod: class:10 method:20") method := &connectionSecure{} if err = method.read(r.r); err != nil { return @@ -2841,7 +2841,7 @@ func (r *reader) parseMethodFrame(channel uint16, size uint32) (f frame, err err mf.Method = method case 21: // connection secure-ok - //fmt.Println("NextMethod: class:10 method:21") + // fmt.Println("NextMethod: class:10 method:21") method := &connectionSecureOk{} if err = method.read(r.r); err != nil { return @@ -2849,7 +2849,7 @@ func (r *reader) parseMethodFrame(channel uint16, size uint32) (f frame, err err mf.Method = method case 30: // connection tune - //fmt.Println("NextMethod: class:10 method:30") + // fmt.Println("NextMethod: class:10 method:30") method := &connectionTune{} if err = method.read(r.r); err != nil { return @@ -2857,7 +2857,7 @@ func (r *reader) parseMethodFrame(channel uint16, size uint32) (f frame, err err mf.Method = method case 31: // connection tune-ok - //fmt.Println("NextMethod: class:10 method:31") + // fmt.Println("NextMethod: class:10 method:31") method := &connectionTuneOk{} if err = method.read(r.r); err != nil { return @@ -2865,7 +2865,7 @@ func (r *reader) parseMethodFrame(channel uint16, size uint32) (f frame, err err mf.Method = method case 40: // connection open - //fmt.Println("NextMethod: class:10 method:40") + // fmt.Println("NextMethod: class:10 method:40") method := &connectionOpen{} if err = method.read(r.r); err != nil { return @@ -2873,7 +2873,7 @@ func (r *reader) parseMethodFrame(channel uint16, size uint32) (f frame, err err mf.Method = method case 41: // connection open-ok - //fmt.Println("NextMethod: class:10 method:41") + // fmt.Println("NextMethod: class:10 method:41") method := &connectionOpenOk{} if err = method.read(r.r); err != nil { return @@ -2881,7 +2881,7 @@ func (r *reader) parseMethodFrame(channel uint16, size uint32) (f frame, err err mf.Method = method case 50: // connection close - //fmt.Println("NextMethod: class:10 method:50") + // fmt.Println("NextMethod: class:10 method:50") method := &connectionClose{} if err = method.read(r.r); err != nil { return @@ -2889,7 +2889,7 @@ func (r *reader) parseMethodFrame(channel uint16, size uint32) (f frame, err err mf.Method = method case 51: // connection close-ok - //fmt.Println("NextMethod: class:10 method:51") + // fmt.Println("NextMethod: class:10 method:51") method := &connectionCloseOk{} if err = method.read(r.r); err != nil { return @@ -2897,7 +2897,7 @@ func (r *reader) parseMethodFrame(channel uint16, size uint32) (f frame, err err mf.Method = method case 60: // connection blocked - //fmt.Println("NextMethod: class:10 method:60") + // fmt.Println("NextMethod: class:10 method:60") method := &connectionBlocked{} if err = method.read(r.r); err != nil { return @@ -2905,7 +2905,7 @@ func (r *reader) parseMethodFrame(channel uint16, size uint32) (f frame, err err mf.Method = method case 61: // connection unblocked - //fmt.Println("NextMethod: class:10 method:61") + // fmt.Println("NextMethod: class:10 method:61") method := &connectionUnblocked{} if err = method.read(r.r); err != nil { return @@ -2913,7 +2913,7 @@ func (r *reader) parseMethodFrame(channel uint16, size uint32) (f frame, err err mf.Method = method case 70: // connection update-secret - //fmt.Println("NextMethod: class:10 method:70") + // fmt.Println("NextMethod: class:10 method:70") method := &connectionUpdateSecret{} if err = method.read(r.r); err != nil { return @@ -2921,7 +2921,7 @@ func (r *reader) parseMethodFrame(channel uint16, size uint32) (f frame, err err mf.Method = method case 71: // connection update-secret-ok - //fmt.Println("NextMethod: class:10 method:71") + // fmt.Println("NextMethod: class:10 method:71") method := &connectionUpdateSecretOk{} if err = method.read(r.r); err != nil { return @@ -2936,7 +2936,7 @@ func (r *reader) parseMethodFrame(channel uint16, size uint32) (f frame, err err switch mf.MethodId { case 10: // channel open - //fmt.Println("NextMethod: class:20 method:10") + // fmt.Println("NextMethod: class:20 method:10") method := &channelOpen{} if err = method.read(r.r); err != nil { return @@ -2944,7 +2944,7 @@ func (r *reader) parseMethodFrame(channel uint16, size uint32) (f frame, err err mf.Method = method case 11: // channel open-ok - //fmt.Println("NextMethod: class:20 method:11") + // fmt.Println("NextMethod: class:20 method:11") method := &channelOpenOk{} if err = method.read(r.r); err != nil { return @@ -2952,7 +2952,7 @@ func (r *reader) parseMethodFrame(channel uint16, size uint32) (f frame, err err mf.Method = method case 20: // channel flow - //fmt.Println("NextMethod: class:20 method:20") + // fmt.Println("NextMethod: class:20 method:20") method := &channelFlow{} if err = method.read(r.r); err != nil { return @@ -2960,7 +2960,7 @@ func (r *reader) parseMethodFrame(channel uint16, size uint32) (f frame, err err mf.Method = method case 21: // channel flow-ok - //fmt.Println("NextMethod: class:20 method:21") + // fmt.Println("NextMethod: class:20 method:21") method := &channelFlowOk{} if err = method.read(r.r); err != nil { return @@ -2968,7 +2968,7 @@ func (r *reader) parseMethodFrame(channel uint16, size uint32) (f frame, err err mf.Method = method case 40: // channel close - //fmt.Println("NextMethod: class:20 method:40") + // fmt.Println("NextMethod: class:20 method:40") method := &channelClose{} if err = method.read(r.r); err != nil { return @@ -2976,7 +2976,7 @@ func (r *reader) parseMethodFrame(channel uint16, size uint32) (f frame, err err mf.Method = method case 41: // channel close-ok - //fmt.Println("NextMethod: class:20 method:41") + // fmt.Println("NextMethod: class:20 method:41") method := &channelCloseOk{} if err = method.read(r.r); err != nil { return @@ -2991,7 +2991,7 @@ func (r *reader) parseMethodFrame(channel uint16, size uint32) (f frame, err err switch mf.MethodId { case 10: // exchange declare - //fmt.Println("NextMethod: class:40 method:10") + // fmt.Println("NextMethod: class:40 method:10") method := &exchangeDeclare{} if err = method.read(r.r); err != nil { return @@ -2999,7 +2999,7 @@ func (r *reader) parseMethodFrame(channel uint16, size uint32) (f frame, err err mf.Method = method case 11: // exchange declare-ok - //fmt.Println("NextMethod: class:40 method:11") + // fmt.Println("NextMethod: class:40 method:11") method := &exchangeDeclareOk{} if err = method.read(r.r); err != nil { return @@ -3007,7 +3007,7 @@ func (r *reader) parseMethodFrame(channel uint16, size uint32) (f frame, err err mf.Method = method case 20: // exchange delete - //fmt.Println("NextMethod: class:40 method:20") + // fmt.Println("NextMethod: class:40 method:20") method := &exchangeDelete{} if err = method.read(r.r); err != nil { return @@ -3015,7 +3015,7 @@ func (r *reader) parseMethodFrame(channel uint16, size uint32) (f frame, err err mf.Method = method case 21: // exchange delete-ok - //fmt.Println("NextMethod: class:40 method:21") + // fmt.Println("NextMethod: class:40 method:21") method := &exchangeDeleteOk{} if err = method.read(r.r); err != nil { return @@ -3023,7 +3023,7 @@ func (r *reader) parseMethodFrame(channel uint16, size uint32) (f frame, err err mf.Method = method case 30: // exchange bind - //fmt.Println("NextMethod: class:40 method:30") + // fmt.Println("NextMethod: class:40 method:30") method := &exchangeBind{} if err = method.read(r.r); err != nil { return @@ -3031,7 +3031,7 @@ func (r *reader) parseMethodFrame(channel uint16, size uint32) (f frame, err err mf.Method = method case 31: // exchange bind-ok - //fmt.Println("NextMethod: class:40 method:31") + // fmt.Println("NextMethod: class:40 method:31") method := &exchangeBindOk{} if err = method.read(r.r); err != nil { return @@ -3039,7 +3039,7 @@ func (r *reader) parseMethodFrame(channel uint16, size uint32) (f frame, err err mf.Method = method case 40: // exchange unbind - //fmt.Println("NextMethod: class:40 method:40") + // fmt.Println("NextMethod: class:40 method:40") method := &exchangeUnbind{} if err = method.read(r.r); err != nil { return @@ -3047,7 +3047,7 @@ func (r *reader) parseMethodFrame(channel uint16, size uint32) (f frame, err err mf.Method = method case 51: // exchange unbind-ok - //fmt.Println("NextMethod: class:40 method:51") + // fmt.Println("NextMethod: class:40 method:51") method := &exchangeUnbindOk{} if err = method.read(r.r); err != nil { return @@ -3062,7 +3062,7 @@ func (r *reader) parseMethodFrame(channel uint16, size uint32) (f frame, err err switch mf.MethodId { case 10: // queue declare - //fmt.Println("NextMethod: class:50 method:10") + // fmt.Println("NextMethod: class:50 method:10") method := &queueDeclare{} if err = method.read(r.r); err != nil { return @@ -3070,7 +3070,7 @@ func (r *reader) parseMethodFrame(channel uint16, size uint32) (f frame, err err mf.Method = method case 11: // queue declare-ok - //fmt.Println("NextMethod: class:50 method:11") + // fmt.Println("NextMethod: class:50 method:11") method := &queueDeclareOk{} if err = method.read(r.r); err != nil { return @@ -3078,7 +3078,7 @@ func (r *reader) parseMethodFrame(channel uint16, size uint32) (f frame, err err mf.Method = method case 20: // queue bind - //fmt.Println("NextMethod: class:50 method:20") + // fmt.Println("NextMethod: class:50 method:20") method := &queueBind{} if err = method.read(r.r); err != nil { return @@ -3086,7 +3086,7 @@ func (r *reader) parseMethodFrame(channel uint16, size uint32) (f frame, err err mf.Method = method case 21: // queue bind-ok - //fmt.Println("NextMethod: class:50 method:21") + // fmt.Println("NextMethod: class:50 method:21") method := &queueBindOk{} if err = method.read(r.r); err != nil { return @@ -3094,7 +3094,7 @@ func (r *reader) parseMethodFrame(channel uint16, size uint32) (f frame, err err mf.Method = method case 50: // queue unbind - //fmt.Println("NextMethod: class:50 method:50") + // fmt.Println("NextMethod: class:50 method:50") method := &queueUnbind{} if err = method.read(r.r); err != nil { return @@ -3102,7 +3102,7 @@ func (r *reader) parseMethodFrame(channel uint16, size uint32) (f frame, err err mf.Method = method case 51: // queue unbind-ok - //fmt.Println("NextMethod: class:50 method:51") + // fmt.Println("NextMethod: class:50 method:51") method := &queueUnbindOk{} if err = method.read(r.r); err != nil { return @@ -3110,7 +3110,7 @@ func (r *reader) parseMethodFrame(channel uint16, size uint32) (f frame, err err mf.Method = method case 30: // queue purge - //fmt.Println("NextMethod: class:50 method:30") + // fmt.Println("NextMethod: class:50 method:30") method := &queuePurge{} if err = method.read(r.r); err != nil { return @@ -3118,7 +3118,7 @@ func (r *reader) parseMethodFrame(channel uint16, size uint32) (f frame, err err mf.Method = method case 31: // queue purge-ok - //fmt.Println("NextMethod: class:50 method:31") + // fmt.Println("NextMethod: class:50 method:31") method := &queuePurgeOk{} if err = method.read(r.r); err != nil { return @@ -3126,7 +3126,7 @@ func (r *reader) parseMethodFrame(channel uint16, size uint32) (f frame, err err mf.Method = method case 40: // queue delete - //fmt.Println("NextMethod: class:50 method:40") + // fmt.Println("NextMethod: class:50 method:40") method := &queueDelete{} if err = method.read(r.r); err != nil { return @@ -3134,7 +3134,7 @@ func (r *reader) parseMethodFrame(channel uint16, size uint32) (f frame, err err mf.Method = method case 41: // queue delete-ok - //fmt.Println("NextMethod: class:50 method:41") + // fmt.Println("NextMethod: class:50 method:41") method := &queueDeleteOk{} if err = method.read(r.r); err != nil { return @@ -3149,7 +3149,7 @@ func (r *reader) parseMethodFrame(channel uint16, size uint32) (f frame, err err switch mf.MethodId { case 10: // basic qos - //fmt.Println("NextMethod: class:60 method:10") + // fmt.Println("NextMethod: class:60 method:10") method := &basicQos{} if err = method.read(r.r); err != nil { return @@ -3157,7 +3157,7 @@ func (r *reader) parseMethodFrame(channel uint16, size uint32) (f frame, err err mf.Method = method case 11: // basic qos-ok - //fmt.Println("NextMethod: class:60 method:11") + // fmt.Println("NextMethod: class:60 method:11") method := &basicQosOk{} if err = method.read(r.r); err != nil { return @@ -3165,7 +3165,7 @@ func (r *reader) parseMethodFrame(channel uint16, size uint32) (f frame, err err mf.Method = method case 20: // basic consume - //fmt.Println("NextMethod: class:60 method:20") + // fmt.Println("NextMethod: class:60 method:20") method := &basicConsume{} if err = method.read(r.r); err != nil { return @@ -3173,7 +3173,7 @@ func (r *reader) parseMethodFrame(channel uint16, size uint32) (f frame, err err mf.Method = method case 21: // basic consume-ok - //fmt.Println("NextMethod: class:60 method:21") + // fmt.Println("NextMethod: class:60 method:21") method := &basicConsumeOk{} if err = method.read(r.r); err != nil { return @@ -3181,7 +3181,7 @@ func (r *reader) parseMethodFrame(channel uint16, size uint32) (f frame, err err mf.Method = method case 30: // basic cancel - //fmt.Println("NextMethod: class:60 method:30") + // fmt.Println("NextMethod: class:60 method:30") method := &basicCancel{} if err = method.read(r.r); err != nil { return @@ -3189,7 +3189,7 @@ func (r *reader) parseMethodFrame(channel uint16, size uint32) (f frame, err err mf.Method = method case 31: // basic cancel-ok - //fmt.Println("NextMethod: class:60 method:31") + // fmt.Println("NextMethod: class:60 method:31") method := &basicCancelOk{} if err = method.read(r.r); err != nil { return @@ -3197,7 +3197,7 @@ func (r *reader) parseMethodFrame(channel uint16, size uint32) (f frame, err err mf.Method = method case 40: // basic publish - //fmt.Println("NextMethod: class:60 method:40") + // fmt.Println("NextMethod: class:60 method:40") method := &basicPublish{} if err = method.read(r.r); err != nil { return @@ -3205,7 +3205,7 @@ func (r *reader) parseMethodFrame(channel uint16, size uint32) (f frame, err err mf.Method = method case 50: // basic return - //fmt.Println("NextMethod: class:60 method:50") + // fmt.Println("NextMethod: class:60 method:50") method := &basicReturn{} if err = method.read(r.r); err != nil { return @@ -3213,7 +3213,7 @@ func (r *reader) parseMethodFrame(channel uint16, size uint32) (f frame, err err mf.Method = method case 60: // basic deliver - //fmt.Println("NextMethod: class:60 method:60") + // fmt.Println("NextMethod: class:60 method:60") method := &basicDeliver{} if err = method.read(r.r); err != nil { return @@ -3221,7 +3221,7 @@ func (r *reader) parseMethodFrame(channel uint16, size uint32) (f frame, err err mf.Method = method case 70: // basic get - //fmt.Println("NextMethod: class:60 method:70") + // fmt.Println("NextMethod: class:60 method:70") method := &basicGet{} if err = method.read(r.r); err != nil { return @@ -3229,7 +3229,7 @@ func (r *reader) parseMethodFrame(channel uint16, size uint32) (f frame, err err mf.Method = method case 71: // basic get-ok - //fmt.Println("NextMethod: class:60 method:71") + // fmt.Println("NextMethod: class:60 method:71") method := &basicGetOk{} if err = method.read(r.r); err != nil { return @@ -3237,7 +3237,7 @@ func (r *reader) parseMethodFrame(channel uint16, size uint32) (f frame, err err mf.Method = method case 72: // basic get-empty - //fmt.Println("NextMethod: class:60 method:72") + // fmt.Println("NextMethod: class:60 method:72") method := &basicGetEmpty{} if err = method.read(r.r); err != nil { return @@ -3245,7 +3245,7 @@ func (r *reader) parseMethodFrame(channel uint16, size uint32) (f frame, err err mf.Method = method case 80: // basic ack - //fmt.Println("NextMethod: class:60 method:80") + // fmt.Println("NextMethod: class:60 method:80") method := &basicAck{} if err = method.read(r.r); err != nil { return @@ -3253,7 +3253,7 @@ func (r *reader) parseMethodFrame(channel uint16, size uint32) (f frame, err err mf.Method = method case 90: // basic reject - //fmt.Println("NextMethod: class:60 method:90") + // fmt.Println("NextMethod: class:60 method:90") method := &basicReject{} if err = method.read(r.r); err != nil { return @@ -3261,7 +3261,7 @@ func (r *reader) parseMethodFrame(channel uint16, size uint32) (f frame, err err mf.Method = method case 100: // basic recover-async - //fmt.Println("NextMethod: class:60 method:100") + // fmt.Println("NextMethod: class:60 method:100") method := &basicRecoverAsync{} if err = method.read(r.r); err != nil { return @@ -3269,7 +3269,7 @@ func (r *reader) parseMethodFrame(channel uint16, size uint32) (f frame, err err mf.Method = method case 110: // basic recover - //fmt.Println("NextMethod: class:60 method:110") + // fmt.Println("NextMethod: class:60 method:110") method := &basicRecover{} if err = method.read(r.r); err != nil { return @@ -3277,7 +3277,7 @@ func (r *reader) parseMethodFrame(channel uint16, size uint32) (f frame, err err mf.Method = method case 111: // basic recover-ok - //fmt.Println("NextMethod: class:60 method:111") + // fmt.Println("NextMethod: class:60 method:111") method := &basicRecoverOk{} if err = method.read(r.r); err != nil { return @@ -3285,7 +3285,7 @@ func (r *reader) parseMethodFrame(channel uint16, size uint32) (f frame, err err mf.Method = method case 120: // basic nack - //fmt.Println("NextMethod: class:60 method:120") + // fmt.Println("NextMethod: class:60 method:120") method := &basicNack{} if err = method.read(r.r); err != nil { return @@ -3300,7 +3300,7 @@ func (r *reader) parseMethodFrame(channel uint16, size uint32) (f frame, err err switch mf.MethodId { case 10: // tx select - //fmt.Println("NextMethod: class:90 method:10") + // fmt.Println("NextMethod: class:90 method:10") method := &txSelect{} if err = method.read(r.r); err != nil { return @@ -3308,7 +3308,7 @@ func (r *reader) parseMethodFrame(channel uint16, size uint32) (f frame, err err mf.Method = method case 11: // tx select-ok - //fmt.Println("NextMethod: class:90 method:11") + // fmt.Println("NextMethod: class:90 method:11") method := &txSelectOk{} if err = method.read(r.r); err != nil { return @@ -3316,7 +3316,7 @@ func (r *reader) parseMethodFrame(channel uint16, size uint32) (f frame, err err mf.Method = method case 20: // tx commit - //fmt.Println("NextMethod: class:90 method:20") + // fmt.Println("NextMethod: class:90 method:20") method := &txCommit{} if err = method.read(r.r); err != nil { return @@ -3324,7 +3324,7 @@ func (r *reader) parseMethodFrame(channel uint16, size uint32) (f frame, err err mf.Method = method case 21: // tx commit-ok - //fmt.Println("NextMethod: class:90 method:21") + // fmt.Println("NextMethod: class:90 method:21") method := &txCommitOk{} if err = method.read(r.r); err != nil { return @@ -3332,7 +3332,7 @@ func (r *reader) parseMethodFrame(channel uint16, size uint32) (f frame, err err mf.Method = method case 30: // tx rollback - //fmt.Println("NextMethod: class:90 method:30") + // fmt.Println("NextMethod: class:90 method:30") method := &txRollback{} if err = method.read(r.r); err != nil { return @@ -3340,7 +3340,7 @@ func (r *reader) parseMethodFrame(channel uint16, size uint32) (f frame, err err mf.Method = method case 31: // tx rollback-ok - //fmt.Println("NextMethod: class:90 method:31") + // fmt.Println("NextMethod: class:90 method:31") method := &txRollbackOk{} if err = method.read(r.r); err != nil { return @@ -3355,7 +3355,7 @@ func (r *reader) parseMethodFrame(channel uint16, size uint32) (f frame, err err switch mf.MethodId { case 10: // confirm select - //fmt.Println("NextMethod: class:85 method:10") + // fmt.Println("NextMethod: class:85 method:10") method := &confirmSelect{} if err = method.read(r.r); err != nil { return @@ -3363,7 +3363,7 @@ func (r *reader) parseMethodFrame(channel uint16, size uint32) (f frame, err err mf.Method = method case 11: // confirm select-ok - //fmt.Println("NextMethod: class:85 method:11") + // fmt.Println("NextMethod: class:85 method:11") method := &confirmSelectOk{} if err = method.read(r.r); err != nil { return From 607b8178dc16e3ff962cd8c5ca7511bff44ea57e Mon Sep 17 00:00:00 2001 From: Luke Bakken Date: Mon, 2 Oct 2023 08:28:11 -0700 Subject: [PATCH 10/20] Fix gen.go, formatting --- gen.ps1 | 14 ++++++++++++++ spec/gen.go | 27 ++++++++++++++------------- 2 files changed, 28 insertions(+), 13 deletions(-) create mode 100644 gen.ps1 diff --git a/gen.ps1 b/gen.ps1 new file mode 100644 index 0000000..c933543 --- /dev/null +++ b/gen.ps1 @@ -0,0 +1,14 @@ +$DebugPreference = 'Continue' +$ErrorActionPreference = 'Stop' + +Set-PSDebug -Off +Set-StrictMode -Version 'Latest' -ErrorAction 'Stop' -Verbose + +New-Variable -Name curdir -Option Constant -Value $PSScriptRoot + +$specDir = Resolve-Path -LiteralPath (Join-Path -Path $curdir -ChildPath 'spec') +$amqpSpecXml = Resolve-Path -LiteralPath (Join-Path -Path $specDir -ChildPath 'amqp0-9-1.stripped.extended.xml') +$gen = Resolve-Path -LiteralPath (Join-Path -Path $specDir -ChildPath 'gen.go') +$spec091 = Resolve-Path -LiteralPath (Join-Path -Path $curdir -ChildPath 'spec091.go') + +Get-Content -LiteralPath $amqpSpecXml | go run $gen | gofmt | Set-Content -Force -Path $spec091 diff --git a/spec/gen.go b/spec/gen.go index c0f7518..7b640f5 100644 --- a/spec/gen.go +++ b/spec/gen.go @@ -15,14 +15,15 @@ import ( "fmt" "io" "log" + "os" "regexp" "strings" "text/template" ) var ( - ErrUnknownType = errors.New("Unknown field type in gen") - ErrUnknownDomain = errors.New("Unknown domain type in gen") + ErrUnknownType = errors.New("unknown field type in gen") + ErrUnknownDomain = errors.New("unknown domain type in gen") ) var amqpTypeToNative = map[string]string{ @@ -147,17 +148,17 @@ var ( // Use of this source code is governed by a BSD-style // license that can be found in the LICENSE file. - /* GENERATED FILE - DO NOT EDIT */ - /* Rebuild from the spec/gen.go tool */ + /* GENERATED FILE - DO NOT EDIT */ + /* Rebuild from the spec/gen.go tool */ - {{with .Root}} - package amqp091 + {{with .Root}} + package amqp091 - import ( - "fmt" - "encoding/binary" - "io" - ) + import ( + "fmt" + "encoding/binary" + "io" + ) // Error codes that can be sent from the server during a connection or // channel exception or used by the client to indicate a class of error like @@ -241,7 +242,7 @@ var ( switch mf.MethodId { {{range .Methods}} case {{.Index}}: // {{$class.Name}} {{.Name}} - //fmt.Println("NextMethod: class:{{$class.Index}} method:{{.Index}}") + // fmt.Println("NextMethod: class:{{$class.Index}} method:{{.Index}}") method := &{{$.StructName $class.Name .Name}}{} if err = method.read(r.r); err != nil { return @@ -434,7 +435,7 @@ func (renderer *renderer) Domain(field Field) (domain Domain, err error) { } } return domain, nil - //return domain, ErrUnknownDomain + // return domain, ErrUnknownDomain } func (renderer *renderer) FieldName(field Field) (t string) { From 5eb51bef315be9a8721bb396540a5765df38bf0e Mon Sep 17 00:00:00 2001 From: Luke Bakken Date: Mon, 2 Oct 2023 12:11:52 -0700 Subject: [PATCH 11/20] Version 1.9.0 --- .ci/versions.json | 4 ++-- connection.go | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/.ci/versions.json b/.ci/versions.json index 602c19a..89002ac 100644 --- a/.ci/versions.json +++ b/.ci/versions.json @@ -1,4 +1,4 @@ { - "erlang": "26.1", - "rabbitmq": "3.12.4" + "erlang": "26.1.1", + "rabbitmq": "3.12.6" } diff --git a/connection.go b/connection.go index d5e5baa..c8bb820 100644 --- a/connection.go +++ b/connection.go @@ -28,7 +28,7 @@ const ( defaultHeartbeat = 10 * time.Second defaultConnectionTimeout = 30 * time.Second defaultProduct = "AMQP 0.9.1 Client" - buildVersion = "1.8.1" + buildVersion = "1.9.0" platform = "golang" // Safer default that makes channel leaks a lot easier to spot // before they create operational headaches. See https://github.com/rabbitmq/rabbitmq-server/issues/1593. From 69013f16824c2227d9d10d174713d565d92f13ea Mon Sep 17 00:00:00 2001 From: Luke Bakken Date: Mon, 2 Oct 2023 12:28:46 -0700 Subject: [PATCH 12/20] CHANGELOG v1.9.0 skip-checks:true --- CHANGELOG.md | 34 ++++++++++++++++++++++++++++++++++ 1 file changed, 34 insertions(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index db633d4..a8583ee 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,39 @@ # Changelog +## [v1.9.0](https://github.com/rabbitmq/amqp091-go/tree/v1.9.0) (2023-10-02) + +[Full Changelog](https://github.com/rabbitmq/amqp091-go/compare/v1.8.1...v1.9.0) + +**Implemented enhancements:** + +- Use of buffered delivery channels when prefetch\_count is not null [\#200](https://github.com/rabbitmq/amqp091-go/issues/200) + +**Fixed bugs:** + +- connection block when write connection reset by peer [\#222](https://github.com/rabbitmq/amqp091-go/issues/222) +- Test failure on 32bit architectures [\#202](https://github.com/rabbitmq/amqp091-go/issues/202) + +**Closed issues:** + +- Add a constant to set consumer timeout as queue argument [\#201](https://github.com/rabbitmq/amqp091-go/issues/201) +- Add a constant for CQ version [\#199](https://github.com/rabbitmq/amqp091-go/issues/199) +- Examples may need to be updated after \#140 [\#153](https://github.com/rabbitmq/amqp091-go/issues/153) + +**Merged pull requests:** + +- Update spec091.go [\#224](https://github.com/rabbitmq/amqp091-go/pull/224) ([pinkfish](https://github.com/pinkfish)) +- Closes 222 [\#223](https://github.com/rabbitmq/amqp091-go/pull/223) ([yywing](https://github.com/yywing)) +- Update write.go [\#221](https://github.com/rabbitmq/amqp091-go/pull/221) ([pinkfish](https://github.com/pinkfish)) +- Bump versions [\#219](https://github.com/rabbitmq/amqp091-go/pull/219) ([lukebakken](https://github.com/lukebakken)) +- remove extra word 'accept' from ExchangeDeclare description [\#217](https://github.com/rabbitmq/amqp091-go/pull/217) ([a-sabzian](https://github.com/a-sabzian)) +- Misc Windows CI updates [\#216](https://github.com/rabbitmq/amqp091-go/pull/216) ([lukebakken](https://github.com/lukebakken)) +- Stop using deprecated Publish function [\#207](https://github.com/rabbitmq/amqp091-go/pull/207) ([Zerpet](https://github.com/Zerpet)) +- Constant for consumer timeout queue argument [\#206](https://github.com/rabbitmq/amqp091-go/pull/206) ([Zerpet](https://github.com/Zerpet)) +- Add a constant for CQ v2 queue argument [\#205](https://github.com/rabbitmq/amqp091-go/pull/205) ([Zerpet](https://github.com/Zerpet)) +- Fix example for 32-bit compatibility [\#204](https://github.com/rabbitmq/amqp091-go/pull/204) ([Zerpet](https://github.com/Zerpet)) +- Fix to increase timeout milliseconds since it's too tight [\#203](https://github.com/rabbitmq/amqp091-go/pull/203) ([t2y](https://github.com/t2y)) +- Add Channel.ConsumeWithContext to be able to cancel delivering [\#192](https://github.com/rabbitmq/amqp091-go/pull/192) ([t2y](https://github.com/t2y)) + ## [v1.8.1](https://github.com/rabbitmq/amqp091-go/tree/v1.8.1) (2023-05-04) [Full Changelog](https://github.com/rabbitmq/amqp091-go/compare/v1.8.0...v1.8.1) From 3e3940ac8035e54ced3c391706991dc61f8eb441 Mon Sep 17 00:00:00 2001 From: "dependabot[bot]" <49699333+dependabot[bot]@users.noreply.github.com> Date: Wed, 25 Oct 2023 02:41:53 +0000 Subject: [PATCH 13/20] Bump go.uber.org/goleak from 1.2.1 to 1.3.0 Bumps [go.uber.org/goleak](https://github.com/uber-go/goleak) from 1.2.1 to 1.3.0. - [Release notes](https://github.com/uber-go/goleak/releases) - [Changelog](https://github.com/uber-go/goleak/blob/master/CHANGELOG.md) - [Commits](https://github.com/uber-go/goleak/compare/v1.2.1...v1.3.0) --- updated-dependencies: - dependency-name: go.uber.org/goleak dependency-type: direct:production update-type: version-update:semver-minor ... Signed-off-by: dependabot[bot] --- go.mod | 2 +- go.sum | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/go.mod b/go.mod index 0b1f2fe..1690170 100644 --- a/go.mod +++ b/go.mod @@ -2,4 +2,4 @@ module github.com/rabbitmq/amqp091-go go 1.16 -require go.uber.org/goleak v1.2.1 +require go.uber.org/goleak v1.3.0 diff --git a/go.sum b/go.sum index 9aec9d7..a17aaea 100644 --- a/go.sum +++ b/go.sum @@ -11,8 +11,8 @@ github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSS github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= github.com/stretchr/testify v1.8.0 h1:pSgiaMZlXftHpm5L7V1+rVB+AZJydKsMxsQBIJw4PKk= github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU= -go.uber.org/goleak v1.2.1 h1:NBol2c7O1ZokfZ0LEU9K6Whx/KnwvepVetCUhtKja4A= -go.uber.org/goleak v1.2.1/go.mod h1:qlT2yGI9QafXHhZZLxlSuNsMw3FFLxBr+tBRlmO1xH4= +go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto= +go.uber.org/goleak v1.3.0/go.mod h1:CoHD4mav9JJNrW/WLlf7HGZPjdw8EucARQHekz1X6bE= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= From 194791d10bd55ec038dea5a213dd89d548fc36ca Mon Sep 17 00:00:00 2001 From: Wisa Powthongchin <32290196+wisaTong@users.noreply.github.com> Date: Fri, 10 Nov 2023 16:12:17 +0700 Subject: [PATCH 14/20] fix comment typo in example_client_test.go --- example_client_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/example_client_test.go b/example_client_test.go index d285a34..34c7fc3 100644 --- a/example_client_test.go +++ b/example_client_test.go @@ -190,7 +190,7 @@ func (client *Client) connect(addr string) (*amqp.Connection, error) { return conn, nil } -// handleReconnect will wait for a channel error +// handleReInit will wait for a channel error // and then continuously attempt to re-initialize both channels func (client *Client) handleReInit(conn *amqp.Connection) bool { for { From eb83b1b2c8fef22e7def70a0ad3a1ae6c1af29c0 Mon Sep 17 00:00:00 2001 From: Dominik Steffen Date: Wed, 29 Nov 2023 10:47:29 +0100 Subject: [PATCH 15/20] clarifies message ttl expiration - adds documentation that describes how the Expiration field should be used - provides two constants for the most presumable pitfalls --- types.go | 44 ++++++++++++++++++++++++++++++++------------ 1 file changed, 32 insertions(+), 12 deletions(-) diff --git a/types.go b/types.go index 8f43a72..0e42083 100644 --- a/types.go +++ b/types.go @@ -144,6 +144,19 @@ const ( flagReserved1 = 0x0004 ) +// Expiration. These constants should be used to set a messages expiration TTL. +// They should be viewed as a clarification of the expiration functionality in +// messages and their usage is not enforced by this pkg. +// +// The server requires a string value that is interpreted as milliseconds. If +// no value is set, which translates to the nil value of string, the message +// will never expire by itself. This does not influence queue configured TTL +// configurations. +const ( + NeverExpire string = "" // empty value means never expire + ImmediatelyExpire string = "0" // 0 means immediately expire +) + // Queue captures the current server state of the queue on the server returned // from Channel.QueueDeclare or Channel.QueueInspect. type Queue struct { @@ -162,18 +175,25 @@ type Publishing struct { Headers Table // Properties - ContentType string // MIME content type - ContentEncoding string // MIME content encoding - DeliveryMode uint8 // Transient (0 or 1) or Persistent (2) - Priority uint8 // 0 to 9 - CorrelationId string // correlation identifier - ReplyTo string // address to to reply to (ex: RPC) - Expiration string // message expiration spec - MessageId string // message identifier - Timestamp time.Time // message timestamp - Type string // message type name - UserId string // creating user id - ex: "guest" - AppId string // creating application id + ContentType string // MIME content type + ContentEncoding string // MIME content encoding + DeliveryMode uint8 // Transient (0 or 1) or Persistent (2) + Priority uint8 // 0 to 9 + CorrelationId string // correlation identifier + ReplyTo string // address to to reply to (ex: RPC) + // Expiration represents the message TTL in milliseconds. A value of "0" + // indicates that the message will immediately expire if the message arrives + // at its destination and the message is not directly handled by a consumer + // that currently has the capacatity to do so. If you wish the message to + // not expire on its own, set this value to empty string or use the + // corresponding constant NeverExpire. This does not influence queue + // configured TTL values. + Expiration string + MessageId string // message identifier + Timestamp time.Time // message timestamp + Type string // message type name + UserId string // creating user id - ex: "guest" + AppId string // creating application id // The application specific payload of the message Body []byte From ddbf4b3ebe5a56ef63ae255900b792231db0e6f7 Mon Sep 17 00:00:00 2001 From: Dominik Steffen Date: Wed, 29 Nov 2023 11:00:40 +0100 Subject: [PATCH 16/20] improves doc --- types.go | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/types.go b/types.go index 0e42083..d7d8f26 100644 --- a/types.go +++ b/types.go @@ -144,14 +144,14 @@ const ( flagReserved1 = 0x0004 ) -// Expiration. These constants should be used to set a messages expiration TTL. +// Expiration. These constants can be used to set a messages expiration TTL. // They should be viewed as a clarification of the expiration functionality in // messages and their usage is not enforced by this pkg. // -// The server requires a string value that is interpreted as milliseconds. If -// no value is set, which translates to the nil value of string, the message -// will never expire by itself. This does not influence queue configured TTL -// configurations. +// The server requires a string value that is interpreted by the server as +// milliseconds. If no value is set, which translates to the nil value of +// string, the message will never expire by itself. This does not influence queue +// configured TTL configurations. const ( NeverExpire string = "" // empty value means never expire ImmediatelyExpire string = "0" // 0 means immediately expire @@ -185,9 +185,9 @@ type Publishing struct { // indicates that the message will immediately expire if the message arrives // at its destination and the message is not directly handled by a consumer // that currently has the capacatity to do so. If you wish the message to - // not expire on its own, set this value to empty string or use the - // corresponding constant NeverExpire. This does not influence queue - // configured TTL values. + // not expire on its own, set this value to any ttl value, empty string or + // use the corresponding constants NeverExpire and ImmediatelyExpire. This + // does not influence queue configured TTL values. Expiration string MessageId string // message identifier Timestamp time.Time // message timestamp From a6fa7f7d76ecb00f1f46d42d02adfe0454c5514f Mon Sep 17 00:00:00 2001 From: Aitor Perez Cedres Date: Mon, 18 Dec 2023 12:43:36 +0000 Subject: [PATCH 17/20] Remove incorrect comment [skip ci] Signed-off-by: Aitor Perez Cedres --- channel.go | 3 --- 1 file changed, 3 deletions(-) diff --git a/channel.go b/channel.go index 0dcec90..09ce37e 100644 --- a/channel.go +++ b/channel.go @@ -971,9 +971,6 @@ func (ch *Channel) QueueBind(name, key, exchange string, noWait bool, args Table /* QueueUnbind removes a binding between an exchange and queue matching the key and arguments. - -It is possible to send and empty string for the exchange name which means to -unbind the queue from the default exchange. */ func (ch *Channel) QueueUnbind(name, key, exchange string, args Table) error { if err := args.Validate(); err != nil { From fdcaa26aab45209772f266b297907ad51ff4e7a1 Mon Sep 17 00:00:00 2001 From: Mahmud Ridwan Date: Tue, 23 Jan 2024 20:58:42 +0600 Subject: [PATCH 18/20] Use correct mutex --- channel.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/channel.go b/channel.go index 09ce37e..96ebd0b 100644 --- a/channel.go +++ b/channel.go @@ -1826,8 +1826,8 @@ func (ch *Channel) Reject(tag uint64, requeue bool) error { // GetNextPublishSeqNo returns the sequence number of the next message to be // published, when in confirm mode. func (ch *Channel) GetNextPublishSeqNo() uint64 { - ch.confirms.m.Lock() - defer ch.confirms.m.Unlock() + ch.confirms.publishedMut.Lock() + defer ch.confirms.publishedMut.Unlock() return ch.confirms.published + 1 } From 936f704f88301b2888b62c4938edd620cef09a7f Mon Sep 17 00:00:00 2001 From: Mahmud Ridwan Date: Tue, 23 Jan 2024 21:21:13 +0600 Subject: [PATCH 19/20] Add test --- integration_test.go | 51 +++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 51 insertions(+) diff --git a/integration_test.go b/integration_test.go index f92d788..80ab7b3 100644 --- a/integration_test.go +++ b/integration_test.go @@ -2025,6 +2025,57 @@ func TestIntegrationGetNextPublishSeqNo(t *testing.T) { } } +func TestIntegrationGetNextPublishSeqNoRace(t *testing.T) { + if c := integrationConnection(t, "GetNextPublishSeqNoRace"); c != nil { + defer c.Close() + + ch, err := c.Channel() + if err != nil { + t.Fatalf("channel: %v", err) + } + + if err = ch.Confirm(false); err != nil { + t.Fatalf("could not confirm") + } + + ex := "test-get-next-pub" + if err = ch.ExchangeDeclare(ex, "direct", false, false, false, false, nil); err != nil { + t.Fatalf("cannot declare %v: got: %v", ex, err) + } + + n := ch.GetNextPublishSeqNo() + if n != 1 { + t.Fatalf("wrong next publish seqence number before any publish, expected: %d, got: %d", 1, n) + } + + wg := sync.WaitGroup{} + + wg.Add(2) + + go func() { + defer wg.Done() + n := ch.GetNextPublishSeqNo() + if n <= 0 { + t.Fatalf("wrong next publish seqence number, expected: > %d, got: %d", 0, n) + } + }() + + go func() { + defer wg.Done() + if err := ch.PublishWithContext(context.TODO(), "test-get-next-pub-seq", "", false, false, Publishing{}); err != nil { + t.Fatalf("publish error: %v", err) + } + }() + + wg.Wait() + + n = ch.GetNextPublishSeqNo() + if n != 2 { + t.Fatalf("wrong next publish seqence number after 15 publishing, expected: %d, got: %d", 2, n) + } + } +} + // https://github.com/rabbitmq/amqp091-go/pull/44 func TestShouldNotWaitAfterConnectionClosedIssue44(t *testing.T) { conn := integrationConnection(t, "TestShouldNotWaitAfterConnectionClosedIssue44") From 977c4d20faa7ec9d287741e1016ba760d8ef57d0 Mon Sep 17 00:00:00 2001 From: Mahmud Ridwan Date: Tue, 23 Jan 2024 23:28:32 +0600 Subject: [PATCH 20/20] Fix lint issues --- integration_test.go | 12 +++++++----- 1 file changed, 7 insertions(+), 5 deletions(-) diff --git a/integration_test.go b/integration_test.go index 80ab7b3..50c6507 100644 --- a/integration_test.go +++ b/integration_test.go @@ -2049,25 +2049,27 @@ func TestIntegrationGetNextPublishSeqNoRace(t *testing.T) { } wg := sync.WaitGroup{} + fail := false wg.Add(2) go func() { defer wg.Done() - n := ch.GetNextPublishSeqNo() - if n <= 0 { - t.Fatalf("wrong next publish seqence number, expected: > %d, got: %d", 0, n) - } + _ = ch.GetNextPublishSeqNo() }() go func() { defer wg.Done() if err := ch.PublishWithContext(context.TODO(), "test-get-next-pub-seq", "", false, false, Publishing{}); err != nil { - t.Fatalf("publish error: %v", err) + t.Logf("publish error: %v", err) + fail = true } }() wg.Wait() + if fail { + t.FailNow() + } n = ch.GetNextPublishSeqNo() if n != 2 {