Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

poolset: Call GC when free mem is low #75

Merged
merged 2 commits into from
Jan 13, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion .github/workflows/test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ jobs:
strategy:
matrix:
julia-version:
- '~1.7'
- '~1.8'
- '~1.9'
- 'nightly'
Expand Down
4 changes: 3 additions & 1 deletion Project.toml
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,14 @@ DataStructures = "864edb3b-99cc-5e75-8d2d-829cb0a9cfe8"
Distributed = "8ba89e20-285c-5b6f-9357-94700520ee1b"
Mmap = "a63ad114-7e13-5084-954f-fe012c677804"
Random = "9a3f8284-a2c9-5f02-9a11-845980a1fd5c"
ScopedValues = "7e506255-f358-4e82-b7e4-beb19740aa63"
Serialization = "9e88b42a-f829-5b0c-bbe9-9e923198166b"
Sockets = "6462fe0b-24de-5631-8697-dd941f90decc"

[compat]
DataStructures = "0.18"
julia = "1.7"
ScopedValues = "1"
julia = "1.8"

[extras]
Distributed = "8ba89e20-285c-5b6f-9357-94700520ee1b"
Expand Down
5 changes: 5 additions & 0 deletions src/MemPool.jl
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import Serialization: serialize, deserialize
export DRef, FileRef, poolset, poolget, mmwrite, mmread, cleanup
import .Threads: ReentrantLock
using ScopedValues

## Wrapping-unwrapping of payloads:

Expand Down Expand Up @@ -117,6 +118,10 @@
DISKCACHE_CONFIG[] = diskcache_config = DiskCacheConfig()
setup_global_device!(diskcache_config)

if haskey(ENV, "JULIA_MEMPOOL_MEMORY_RESERVED")
MEM_RESERVED[] = parse(UInt, ENV["JULIA_MEMPOOL_MEMORY_RESERVED"])

Check warning on line 122 in src/MemPool.jl

View check run for this annotation

Codecov / codecov/patch

src/MemPool.jl#L122

Added line #L122 was not covered by tests
end

# Ensure we cleanup all references
atexit(exit_hook)
end
Expand Down
66 changes: 65 additions & 1 deletion src/datastore.jl
Original file line number Diff line number Diff line change
Expand Up @@ -203,15 +203,18 @@
mutable struct SendQueue
queue::Channel{Any}
@atomic task::Union{Task,Nothing}
processing::Bool
end
const SEND_QUEUE = SendQueue(Channel(typemax(Int)), nothing)
const SEND_QUEUE = SendQueue(Channel(typemax(Int)), nothing, false)
function _enqueue_work(f, args...; gc_context=false)
if SEND_QUEUE.task === nothing
task = Task() do
while true
try
work, _args = take!(SEND_QUEUE.queue)
SEND_QUEUE.processing = true
work(_args...)
SEND_QUEUE.processing = false
catch err
exit_flag[] && continue
err isa ProcessExitedException && continue # TODO: Remove proc from counters
Expand Down Expand Up @@ -348,12 +351,73 @@
isinmemory(x::DRef) = isinmemory(x.id)
isondisk(x::DRef) = isondisk(x.id)

const MEM_RESERVED = Ref{UInt}(512 * (1024^2)) # Reserve 512MB of RAM for OS
const MEM_RESERVE_LOCK = Threads.ReentrantLock()

"""
When called, ensures that at least `MEM_RESERVED[] + size` bytes are available
to the OS. If there is not enough memory available, then a variety of calls to
the GC are performed to free up memory until either the reservation limit is
satisfied, or `max_sweeps` number of cycles have elapsed.
"""
function ensure_memory_reserved(size::Integer=0; max_sweeps::Integer=5)
sat_sub(x::T, y::T) where T = x < y ? zero(T) : x-y

# Check whether the OS is running tight on memory
sweep_ctr = 0
while true
with(QUERY_MEM_OVERRIDE => true) do
Int(storage_available(CPURAMResource())) - size < MEM_RESERVED[]
end || break

# We need more memory! Let's encourage the GC to clear some memory...
sweep_start = time_ns()
mem_used = with(QUERY_MEM_OVERRIDE => true) do
storage_utilized(CPURAMResource())

Check warning on line 376 in src/datastore.jl

View check run for this annotation

Codecov / codecov/patch

src/datastore.jl#L374-L376

Added lines #L374 - L376 were not covered by tests
end
if sweep_ctr == 0
@debug "Not enough memory to continue! Sweeping up unused memory..."
GC.gc(false)
elseif sweep_ctr == 1
GC.gc(true)

Check warning on line 382 in src/datastore.jl

View check run for this annotation

Codecov / codecov/patch

src/datastore.jl#L378-L382

Added lines #L378 - L382 were not covered by tests
else
@everywhere GC.gc(true)

Check warning on line 384 in src/datastore.jl

View check run for this annotation

Codecov / codecov/patch

src/datastore.jl#L384

Added line #L384 was not covered by tests
end

# Let finalizers run
yield()

Check warning on line 388 in src/datastore.jl

View check run for this annotation

Codecov / codecov/patch

src/datastore.jl#L388

Added line #L388 was not covered by tests

# Wait for send queue to clear
while SEND_QUEUE.processing
yield()
end

Check warning on line 393 in src/datastore.jl

View check run for this annotation

Codecov / codecov/patch

src/datastore.jl#L391-L393

Added lines #L391 - L393 were not covered by tests

with(QUERY_MEM_OVERRIDE => true) do
mem_freed = sat_sub(mem_used, storage_utilized(CPURAMResource()))
@debug "Freed $(Base.format_bytes(mem_freed)) bytes, available: $(Base.format_bytes(storage_available(CPURAMResource())))"

Check warning on line 397 in src/datastore.jl

View check run for this annotation

Codecov / codecov/patch

src/datastore.jl#L395-L397

Added lines #L395 - L397 were not covered by tests
end

sweep_ctr += 1
if sweep_ctr == max_sweeps
@debug "Made too many sweeps, bailing out..."
break

Check warning on line 403 in src/datastore.jl

View check run for this annotation

Codecov / codecov/patch

src/datastore.jl#L400-L403

Added lines #L400 - L403 were not covered by tests
end
end

Check warning on line 405 in src/datastore.jl

View check run for this annotation

Codecov / codecov/patch

src/datastore.jl#L405

Added line #L405 was not covered by tests
if sweep_ctr > 0
@debug "Swept for $sweep_ctr cycles"

Check warning on line 407 in src/datastore.jl

View check run for this annotation

Codecov / codecov/patch

src/datastore.jl#L407

Added line #L407 was not covered by tests
end
end

function poolset(@nospecialize(x), pid=myid(); size=approx_size(x),
retain=false, restore=false,
device=GLOBAL_DEVICE[], leaf_device=initial_leaf_device(device),
tag=nothing, leaf_tag=Tag(),
destructor=nothing)
if pid == myid()
if !restore
@lock MEM_RESERVE_LOCK ensure_memory_reserved(size)
end

id = atomic_add!(id_counter, 1)
sstate = if !restore
StorageState(Some{Any}(x),
Expand Down
3 changes: 2 additions & 1 deletion src/storage.jl
Original file line number Diff line number Diff line change
Expand Up @@ -186,6 +186,7 @@ QueriedMemInfo() = QueriedMemInfo(UInt64(0), UInt64(0))
const QUERY_MEM_AVAILABLE = Ref(QueriedMemInfo())
const QUERY_MEM_CAPACITY = Ref(QueriedMemInfo())
const QUERY_MEM_PERIOD = 10 * 1000^2 # 10ms
const QUERY_MEM_OVERRIDE = ScopedValue(false)
function _query_mem_periodically(kind::Symbol)
if !(kind in (:available, :capacity))
throw(ArgumentError("Invalid memory query kind: $kind"))
Expand All @@ -197,7 +198,7 @@ function _query_mem_periodically(kind::Symbol)
end
mem_info = mem_bin[]
now_ns = time_ns()
if mem_info.last_ns < now_ns - QUERY_MEM_PERIOD
if QUERY_MEM_OVERRIDE[] || mem_info.last_ns < now_ns - QUERY_MEM_PERIOD
if kind == :available
new_mem_info = QueriedMemInfo(free_memory(), now_ns)
elseif kind == :capacity
Expand Down
Loading