Skip to content

Commit

Permalink
Add more instrumentation.
Browse files Browse the repository at this point in the history
  • Loading branch information
ioquatix committed Aug 26, 2024
1 parent a4684f0 commit a90d993
Show file tree
Hide file tree
Showing 4 changed files with 111 additions and 7 deletions.
2 changes: 2 additions & 0 deletions .github/workflows/test.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ permissions:

env:
CONSOLE_OUTPUT: XTerm
TRACES_BACKEND: traces/backend/test
METRICS_BACKEND: metrics/backend/test

jobs:
test:
Expand Down
2 changes: 2 additions & 0 deletions async-pool.gemspec
Original file line number Diff line number Diff line change
Expand Up @@ -26,4 +26,6 @@ Gem::Specification.new do |spec|
spec.required_ruby_version = ">= 3.1"

spec.add_dependency "async", ">= 1.25"
spec.add_dependency "traces"
spec.add_dependency "metrics"
end
64 changes: 57 additions & 7 deletions lib/async/pool/controller.rb
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,9 @@
require 'async/notification'
require 'async/semaphore'

require 'traces'
require 'metrics'

module Async
module Pool
# A resource pool controller.
Expand Down Expand Up @@ -154,7 +157,9 @@ def release(resource)
retire(resource) unless processed
end

private def drain
def drain
Console.debug(self, "Draining pool...", size: @resources.size)

# Enumerate all existing resources and retire them:
while resource = acquire_existing_resource
retire(resource)
Expand Down Expand Up @@ -262,6 +267,7 @@ def availability_summary

def reuse(resource)
Console.debug(self) {"Reuse #{resource}"}

usage = @resources[resource]

if usage.nil? || usage.zero?
Expand All @@ -286,11 +292,7 @@ def wait_for_resource
@notification.wait
end

Console.debug(self) {"Wait for resource -> #{resource}"}

# if resource.concurrency > 1
# @notification.signal
# end
# Be careful not to context switch or fail here.

return resource
end
Expand Down Expand Up @@ -318,12 +320,16 @@ def create_resource
def available_resource
resource = nil

Console.debug(self, "Acquiring concurrency guard...", blocking: @guard.blocking?)

@guard.acquire do
Console.debug(self, "Acquired concurrency guard.")

resource = acquire_or_create_resource
end

return resource
rescue Exception
rescue Exception => error
reuse(resource) if resource
raise
end
Expand Down Expand Up @@ -375,6 +381,50 @@ def acquire_or_create_resource
return create_resource
end
end

Traces::Provider(self) do
def create_resource(...)
attributes = {
concurrency: @guard.limit,
size: @resources.size,
limit: @limit,
}

Traces.trace('async.pool.create', attributes: attributes) {super}
end

def drain(...)
attributes = {
size: @resources.size,
}

Traces.trace('async.pool.drain', attributes: attributes) {super}
end
end

Metrics::Provider(self) do
ACQUIRE_COUNT = Metrics.metric('async.pool.acquire', :counter, description: 'Number of times a resource was invoked.')
RELEASE_COUNT = Metrics.metric('async.pool.release', :counter, description: 'Number of times a resource was released.')
RETIRE_COUNT = Metrics.metric('async.pool.retire', :counter, description: 'Number of times a resource was retired.')

def acquire(...)
ACQUIRE_COUNT.emit(1)

super
end

def release(...)
super.tap do
RELEASE_COUNT.emit(1)
end
end

def retire(...)
super.tap do
RETIRE_COUNT.emit(1)
end
end
end
end
end
end
50 changes: 50 additions & 0 deletions test/async/pool/failure.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
# frozen_string_literal: true

# Released under the MIT License.
# Copyright, 2021-2024, by Samuel Williams.

require 'async/pool/controller'
require 'async/pool/resource'
require 'async/queue'

require 'sus/fixtures/async/reactor_context'

describe Async::Pool::Controller do
include Sus::Fixtures::Async::ReactorContext

let(:resources) {Async::Queue.new}

let(:constructor) do
lambda do
resource = resources.dequeue

if resource.is_a?(Exception)
raise resource
end

resource
end
end

let(:pool) {subject.new(constructor)}

with "a constructor that fails" do
it "robustly creates new resources" do
resource1 = Async::Pool::Resource.new
resource2 = Async::Pool::Resource.new

resources.enqueue(RuntimeError.new("Failed to connect"))
resources.enqueue(resource1)
resources.enqueue(RuntimeError.new("Failed to connect"))
resources.enqueue(resource2)

expect{pool.acquire}.to raise_exception(RuntimeError)
expect(pool.acquire).to be == resource1
expect{pool.acquire}.to raise_exception(RuntimeError)
expect(pool.acquire).to be == resource2
ensure
pool.release(resource1)
pool.release(resource2)
end
end
end

0 comments on commit a90d993

Please sign in to comment.