Skip to content

Commit

Permalink
Merge pull request #13 from toyokumo/bugfix/not-to-take-extra-signs
Browse files Browse the repository at this point in the history
Do not take extra signs from consume-chan
  • Loading branch information
liquidz authored Aug 8, 2022
2 parents 8c71148 + eae4bb0 commit 5e81a73
Show file tree
Hide file tree
Showing 3 changed files with 26 additions and 18 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
# Change Log

## [Unreleased]
### Fixed
* Fixed `respond` and `raise` not to take extra signs when these functions are called more than once.

## 0.4.87
### Changed
Expand Down
40 changes: 22 additions & 18 deletions src/gluttony/record/consumer.clj
Original file line number Diff line number Diff line change
Expand Up @@ -51,27 +51,31 @@

(defn- respond*
[{:keys [client queue-url consume-chan]} p message]
(deliver p :respond)
(a/go
(when consume-chan
;; takes a sign and make a space in which next consume can work
(a/<! consume-chan)
(log/debugf "takes a sign of message-id:%s" (:message-id message)))
(a/<! (sqs/delete-message client {:queue-url queue-url
:receipt-handle (:receipt-handle message)}))))
(let [already-realized? (realized? p)]
(deliver p :respond)
(a/go
(when (and consume-chan
(not already-realized?))
;; takes a sign and make a space in which next consume can work
(a/<! consume-chan)
(log/debugf "takes a sign of message-id:%s" (:message-id message)))
(a/<! (sqs/delete-message client {:queue-url queue-url
:receipt-handle (:receipt-handle message)})))))

(defn- raise*
[{:keys [client queue-url consume-chan]} p message & [retry-delay]]
(deliver p :raise)
(a/go
(when consume-chan
;; takes a sign and make a space in which next consume can work
(a/<! consume-chan)
(log/debugf "takes a sign of message-id:%s" (:message-id message)))
(let [retry-delay (or retry-delay 0)]
(a/<! (sqs/change-message-visibility client {:queue-url queue-url
:receipt-handle (:receipt-handle message)
:visibility-timeout retry-delay})))))
(let [already-realized? (realized? p)]
(deliver p :raise)
(a/go
(when (and consume-chan
(not already-realized?))
;; takes a sign and make a space in which next consume can work
(a/<! consume-chan)
(log/debugf "takes a sign of message-id:%s" (:message-id message)))
(let [retry-delay (or retry-delay 0)]
(a/<! (sqs/change-message-visibility client {:queue-url queue-url
:receipt-handle (:receipt-handle message)
:visibility-timeout retry-delay}))))))

(defn- heartbeat*
"When heartbeat parameter is set, a heartbeat process start after the first heartbeat"
Expand Down
2 changes: 2 additions & 0 deletions test/gluttony/core_test.clj
Original file line number Diff line number Diff line change
Expand Up @@ -181,6 +181,8 @@
(a/<! (a/timeout 10)) ; Make a point of park
(swap! collected
conj Integer/MIN_VALUE)
(respond)
;; Respond twice on purpose
(respond)))
consumer (start-consumer queue-url consume
{:client th/client
Expand Down

0 comments on commit 5e81a73

Please sign in to comment.