Skip to content

Commit

Permalink
poolset: Call GC when free mem is low
Browse files Browse the repository at this point in the history
  • Loading branch information
jpsamaroo committed Dec 4, 2023
1 parent 7a6447d commit d2d80dc
Show file tree
Hide file tree
Showing 2 changed files with 69 additions and 1 deletion.
4 changes: 4 additions & 0 deletions src/MemPool.jl
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,10 @@ function __init__()
DISKCACHE_CONFIG[] = diskcache_config = DiskCacheConfig()
setup_global_device!(diskcache_config)

if haskey(ENV, "JULIA_MEMPOOL_MEMORY_RESERVED")
MEM_RESERVED[] = parse(UInt, ENV["JULIA_MEMPOOL_MEMORY_RESERVED"])

Check warning on line 121 in src/MemPool.jl

View check run for this annotation

Codecov / codecov/patch

src/MemPool.jl#L121

Added line #L121 was not covered by tests
end

# Ensure we cleanup all references
atexit(exit_hook)
end
Expand Down
66 changes: 65 additions & 1 deletion src/datastore.jl
Original file line number Diff line number Diff line change
Expand Up @@ -203,15 +203,18 @@ end
mutable struct SendQueue
queue::Channel{Any}
@atomic task::Union{Task,Nothing}
processing::Bool
end
const SEND_QUEUE = SendQueue(Channel(typemax(Int)), nothing)
const SEND_QUEUE = SendQueue(Channel(typemax(Int)), nothing, false)
function _enqueue_work(f, args...; gc_context=false)
if SEND_QUEUE.task === nothing
task = Task() do
while true
try
work, _args = take!(SEND_QUEUE.queue)
SEND_QUEUE.processing = true
work(_args...)
SEND_QUEUE.processing = false
catch err
exit_flag[] && continue
err isa ProcessExitedException && continue # TODO: Remove proc from counters
Expand Down Expand Up @@ -348,11 +351,72 @@ isondisk(id::Int) =
isinmemory(x::DRef) = isinmemory(x.id)
isondisk(x::DRef) = isondisk(x.id)

const MEM_RESERVED = Ref{UInt}(512 * (1024^2)) # Reserve 512MB of RAM for OS

"""
When called, ensures that at least `MEM_RESERVED[] + size` bytes are available
to the OS. If there is not enough memory available, then a variety of calls to
the GC are performed to free up memory until either the reservation limit is
satisfied, or `max_sweeps` number of cycles have elapsed.
"""
function ensure_memory_reserved(size::Integer=0; max_sweeps::Integer=5)
sat_sub(x::T, y::T) where T = x < y ? zero(T) : x-y

# Check whether the OS is running tight on memory
sweep_ctr = 0
while Int(storage_available(CPURAMResource())) - size < MEM_RESERVED[]
# We need more memory! Let's encourage the GC to clear some memory...
sweep_start = time_ns()
mem_used = storage_utilized(CPURAMResource())
if sweep_ctr == 0
@debug "Not enough memory to continue! Freeing..."
GC.gc(false)
elseif sweep_ctr == 1
GC.gc(true)

Check warning on line 375 in src/datastore.jl

View check run for this annotation

Codecov / codecov/patch

src/datastore.jl#L369-L375

Added lines #L369 - L375 were not covered by tests
else
@everywhere GC.gc(true)

Check warning on line 377 in src/datastore.jl

View check run for this annotation

Codecov / codecov/patch

src/datastore.jl#L377

Added line #L377 was not covered by tests
end

# Let finalizers run
yield()

Check warning on line 381 in src/datastore.jl

View check run for this annotation

Codecov / codecov/patch

src/datastore.jl#L381

Added line #L381 was not covered by tests

# Wait for send queue to clear
while SEND_QUEUE.processing
sleep(0.001)
end

Check warning on line 386 in src/datastore.jl

View check run for this annotation

Codecov / codecov/patch

src/datastore.jl#L384-L386

Added lines #L384 - L386 were not covered by tests

mem_freed = sat_sub(mem_used, storage_utilized(CPURAMResource()))
@debug "Memory freed: $(Base.format_bytes(mem_freed))"

Check warning on line 389 in src/datastore.jl

View check run for this annotation

Codecov / codecov/patch

src/datastore.jl#L388-L389

Added lines #L388 - L389 were not covered by tests

sweep_elapsed = time_ns() - sweep_start
if sweep_elapsed < QUERY_MEM_PERIOD

Check warning on line 392 in src/datastore.jl

View check run for this annotation

Codecov / codecov/patch

src/datastore.jl#L391-L392

Added lines #L391 - L392 were not covered by tests
# Make sure we get a fresh memory query
to_sleep = (QUERY_MEM_PERIOD - sweep_elapsed) / (1000^3)
@debug "Sleeping for $to_sleep seconds..."
sleep(to_sleep)

Check warning on line 396 in src/datastore.jl

View check run for this annotation

Codecov / codecov/patch

src/datastore.jl#L394-L396

Added lines #L394 - L396 were not covered by tests
end
@debug "Free space: $(Base.format_bytes(storage_available(CPURAMResource())))"

Check warning on line 398 in src/datastore.jl

View check run for this annotation

Codecov / codecov/patch

src/datastore.jl#L398

Added line #L398 was not covered by tests

sweep_ctr += 1
if sweep_ctr == max_sweeps
@debug "Too many sweeps, bailing out..."
break

Check warning on line 403 in src/datastore.jl

View check run for this annotation

Codecov / codecov/patch

src/datastore.jl#L400-L403

Added lines #L400 - L403 were not covered by tests
end
end

Check warning on line 405 in src/datastore.jl

View check run for this annotation

Codecov / codecov/patch

src/datastore.jl#L405

Added line #L405 was not covered by tests
if sweep_ctr > 0
@debug "Freed enough memory to continue!"

Check warning on line 407 in src/datastore.jl

View check run for this annotation

Codecov / codecov/patch

src/datastore.jl#L407

Added line #L407 was not covered by tests
end
end

function poolset(@nospecialize(x), pid=myid(); size=approx_size(x),
retain=false, restore=false,
device=GLOBAL_DEVICE[], leaf_device=initial_leaf_device(device),
tag=nothing, leaf_tag=Tag())
if pid == myid()
if !restore
ensure_memory_reserved(size)
end

id = atomic_add!(id_counter, 1)
sstate = if !restore
StorageState(Some{Any}(x),
Expand Down

0 comments on commit d2d80dc

Please sign in to comment.