Skip to content

Commit

Permalink
Add working_dir option for Fujitsu TCS job scheduler
Browse files Browse the repository at this point in the history
  • Loading branch information
mnakao committed Nov 2, 2023
1 parent e7f334e commit f2eb5d1
Showing 1 changed file with 28 additions and 11 deletions.
39 changes: 28 additions & 11 deletions lib/ood_core/job/adapters/fujitsu_tcs.rb
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,13 @@ class Factory
# @param config [#to_h] the configuration for job adapter
# @option config [Object] :bin (nil) Path to Fujitsu TCS resource manager binaries
# @option config [#to_h] :bin_overrides ({}) Optional overrides to Fujitsu TCS resource manager executables
# @option config [Object] :working_dir (nil) Working directory for submitting a batch script
def self.build_fujitsu_tcs(config)
c = config.to_h.symbolize_keys
bin = c.fetch(:bin, nil)
bin_overrides = c.fetch(:bin_overrides, {})
fujitsu_tcs = Adapters::Fujitsu_TCS::Batch.new(bin: bin, bin_overrides: bin_overrides)
working_dir = c.fetch(:working_dir, nil)
fujitsu_tcs = Adapters::Fujitsu_TCS::Batch.new(bin: bin, bin_overrides: bin_overrides, working_dir: working_dir)
Adapters::Fujitsu_TCS.new(fujitsu_tcs: fujitsu_tcs)
end
end
Expand All @@ -43,6 +45,11 @@ class Batch
# @return Hash<String, String>
attr_reader :bin_overrides

# Working directory for submitting a batch script
# @example
# my_batch.working_dir #=> "HOME" or Dir.pwd
attr_reader :working_dir

# The root exception class that all Fujitsu TCS specific exceptions inherit
# from
class Error < StandardError; end
Expand All @@ -52,9 +59,17 @@ class Fujitsu_TCS_TimeoutError < Error; end

# @param bin [#to_s] path to Fujitsu TCS installation binaries
# @param bin_overrides [#to_h] a hash of bin ovverides to be used in job
def initialize(bin: nil, bin_overrides: {})
# @param working_dir [] Working directory for submitting a batch script
def initialize(bin: nil, bin_overrides: {}, working_dir: nil)
@bin = Pathname.new(bin.to_s)
@bin_overrides = bin_overrides
if working_dir == nil
@working_dir = Dir.pwd
elsif working_dir == "HOME"
@working_dir = Dir.home
else
raise(Error, "Unknown working_dir")
end
end

# Get a list of hashes detailing each of the jobs on the batch server
Expand All @@ -79,7 +94,7 @@ def initialize(bin: nil, bin_overrides: {})
# @raise [Error] if `pjstat` command exited unsuccessfully
# @return [Array<Hash>] list of details for jobs
def get_jobs(id: "", owner: nil)
args = ["-s", "--data", "--choose=jid,jnam,rscg,st,std,stde,adt,sdt,nnumr,usr,elpl,elp"]
args = ["-A", "-s", "--data", "--choose=jid,jnam,rscg,st,std,stde,adt,sdt,nnumr,usr,elpl,elp"]
args.concat ["--filter", "jid=" + id.to_s] unless id.to_s.empty?
args.concat ["--filter", "usr=" + owner.to_s] unless owner.to_s.empty?

Expand All @@ -88,10 +103,10 @@ def get_jobs(id: "", owner: nil)
jobs = []
output.each_line do |line|
l = line.split(",")
jobs << {:JOB_ID => l[1], :JOB_NAME => l[2], :RSC_GRP => l[3].split[0],
jobs << {:JOB_ID => l[1], :JOB_NAME => l[2], :RSC_GRP => l[3].split(" ")[0],
:ST => l[4], :STD => l[5], :STDE => l[6],
:ACCEPT => l[7], :START_DATE => l[8], :NODES => l[9].split(":")[0],
:USER => l[10], :ELAPSE_LIM => l[11], :ELAPSE_TIM => l[12].split[0] }
:USER => l[10], :ELAPSE_LIM => l[11], :ELAPSE_TIM => l[12].split(" ")[0] }
end
jobs
end
Expand Down Expand Up @@ -136,16 +151,18 @@ def delete_job(id)
# @return [String] the id of the job that was created
def submit_string(str, args: [])
args = args.map(&:to_s)
call("pjsub", *args, stdin: str.to_s).split[5]
call("pjsub", *args, stdin: str.to_s).split(" ")[5]
end

private
# Call a forked Fujitsu TCS command
def call(cmd, *args, stdin: "")
cmd = OodCore::Job::Adapters::Helper.bin_path(cmd, bin, bin_overrides)
cmd = OodCore::Job::Adapters::Helper.bin_path(cmd, bin, bin_overrides)
args = args.map(&:to_s)
o, e, s = Open3.capture3(cmd, *(args.map(&:to_s)), stdin_data: stdin.to_s)
s.success? ? o : raise(Error, e)
Dir.chdir(working_dir) do
o, e, s = Open3.capture3(cmd, *(args.map(&:to_s)), stdin_data: stdin.to_s)
s.success? ? o : raise(Error, e)
end
end
end

Expand Down Expand Up @@ -235,7 +252,7 @@ def submit(script, after: [], afterok: [], afternotok: [], afterany: [])
args.concat ["-X"] if script.copy_environment?

# Set native options
args.concat script.native[0].split if script.native
args.concat script.native if script.native

# Set content
content = if script.shell_path.nil?
Expand Down Expand Up @@ -368,7 +385,7 @@ def directive_prefix
private
# Convert duration to seconds
def duration_in_seconds(time)
return 0 if time.nil?
return 0 if time.nil? or time == "-"
time, days = time.split("-").reverse
days.to_i * 24 * 3600 +
time.split(':').map { |v| v.to_i }.inject(0) { |total, v| total * 60 + v }
Expand Down

0 comments on commit f2eb5d1

Please sign in to comment.