Class: OodCore::Job::Adapters::Sge::Batch Private
- Inherits:
- Object
- Object
- OodCore::Job::Adapters::Sge::Batch
- Defined in:
- lib/ood_core/job/adapters/sge/batch.rb
Overview
This class is part of a private API. You should avoid using this class if possible, as it may be removed or be changed in the future.
Object used for simplified communication with an SGE batch server.
Defined Under Namespace
Classes: Error
Constant Summary
- STATE_MAP =
This constant is part of a private API. You should avoid using this constant if possible, as it may be removed or be changed in the future.
Adapted from www.softpanorama.org/HPC/Grid_engine/Queues/queue_states.shtml
{
  'EhRqw' => :undetermined, # all pending states with error
  'Ehqw' => :undetermined,  # all pending states with error
  'Eqw' => :undetermined,   # all pending states with error
  'RS' => :suspended,       # all suspended with re-submit
  'RT' => :suspended,       # all suspended with re-submit
  'Rr' => :running,         # running, re-submit
  'Rs' => :suspended,       # all suspended with re-submit
  'Rt' => :running,         # transferring, re-submit
  'RtS' => :suspended,      # all suspended with re-submit
  'RtT' => :suspended,      # all suspended with re-submit
  'Rts' => :suspended,      # all suspended with re-submit
  'S' => :suspended,        # queue suspended
  'T' => :suspended,        # queue suspended by alarm
  'dRS' => :completed,      # all running and suspended states with deletion
  'dRT' => :completed,      # all running and suspended states with deletion
  'dRr' => :completed,      # all running and suspended states with deletion
  'dRs' => :completed,      # all running and suspended states with deletion
  'dRt' => :completed,      # all running and suspended states with deletion
  'dS' => :completed,       # all running and suspended states with deletion
  'dT' => :completed,       # all running and suspended states with deletion
  'dr' => :completed,       # all running and suspended states with deletion
  'ds' => :completed,       # all running and suspended states with deletion
  'dt' => :completed,       # all running and suspended states with deletion
  'hRwq' => :queued_held,   # pending, system hold, re-queue
  'hqw' => :queued_held,    # pending, system hold
  'qw' => :queued,          # pending
  'r' => :running,          # running
  's' => :suspended,        # suspended
  't' => :running,          # transferring
  'tS' => :suspended,       # queue suspended
  'tT' => :suspended,       # queue suspended by alarm
  'ts' => :suspended,       # obsuspended
}
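The map is consulted with a default, so state codes it does not list resolve to :undetermined. A minimal sketch of that lookup follows; STATE_MAP_SUBSET and translate are hypothetical names for illustration, and the subset holds only a few of the entries above.

```ruby
# Hypothetical excerpt of the state map; the real constant has many more entries.
STATE_MAP_SUBSET = {
  'Eqw' => :undetermined, # pending with error
  'hqw' => :queued_held,  # pending, system hold
  'qw'  => :queued,       # pending
  'r'   => :running,      # running
  'dr'  => :completed     # running, marked for deletion
}.freeze

# Illustrative stand-in for translate_sge_state: unknown codes fall back
# to :undetermined through the default argument of Hash#fetch.
def translate(sge_state_code)
  STATE_MAP_SUBSET.fetch(sge_state_code, :undetermined)
end

p translate('qw')  # => :queued
p translate('zzz') # => :undetermined
```

Using fetch with a default instead of [] keeps an unexpected qstat code from turning into a nil status.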
Instance Attribute Summary
- #bin ⇒ Object readonly private
- #bin_overrides ⇒ Object readonly private
- #cluster ⇒ Object readonly private
- #conf ⇒ Object readonly private
- #helper ⇒ Object readonly private
- #strict_host_checking ⇒ Object readonly private
- #submit_host ⇒ Object readonly private
Instance Method Summary
- #call(cmd, *args, env: {}, stdin: "", chdir: nil) ⇒ Object private
  Call a forked SGE command for a given batch server.
- #can_use_drmaa? ⇒ Boolean private
- #delete(job_id) ⇒ void private
  Call qdel.
- #get_all(owner: nil) ⇒ Array<OodCore::Job::Info> private
  Get OodCore::Job::Info for every enqueued job, optionally filtering on owner.
- #get_info_enqueued_job(job_id) ⇒ OodCore::Job::Info private
  Get OodCore::Job::Info for a job_id that may still be in the queue.
- #get_status_from_drmaa?(job_hash) ⇒ Boolean private
- #get_status_from_drmma(job_id) ⇒ Object private
  Get the job status using DRMAA.
- #hold(job_id) ⇒ void private
  Call qhold.
- #initialize(config) ⇒ Batch constructor private
  A new instance of Batch.
- #load_drmaa(libdrmaa_path) ⇒ Object private
- #post_process_qstat_job_hash(job_hash) ⇒ Object private
- #release(job_id) ⇒ void private
  Call qrls.
- #submit(content, args) ⇒ Object private
  Call qsub with arguments and the script's content.
- #translate_drmaa_state(drmaa_state_code) ⇒ Object private
- #translate_sge_state(sge_state_code) ⇒ Object private
- #update_job_hash_status!(job_hash) ⇒ Object private
Constructor Details
#initialize(config) ⇒ Batch
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Returns a new instance of Batch.
# File 'lib/ood_core/job/adapters/sge/batch.rb', line 34

def initialize(config)
  @cluster = config.fetch(:cluster, nil)
  @bin = Pathname.new(config.fetch(:bin, nil).to_s)
  @sge_root = Pathname.new(config[:sge_root] || ENV['SGE_ROOT'] || "/var/lib/gridengine")
  @bin_overrides = config.fetch(:bin_overrides, {})
  @submit_host = config.fetch(:submit_host, "")
  @strict_host_checking = config.fetch(:strict_host_checking, true)

  # FIXME: hack as this affects env of the process!
  ENV['SGE_ROOT'] = @sge_root.to_s

  if config[:libdrmaa_path]
    load_drmaa(config[:libdrmaa_path])
    @can_use_drmaa = true
  else
    @can_use_drmaa = false
  end

  @helper = OodCore::Job::Adapters::Sge::Helper.new
end
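The constructor resolves the SGE root in a fixed order: the :sge_root config key, then the SGE_ROOT environment variable, then "/var/lib/gridengine". The sketch below isolates that precedence. resolve_sge_root is a hypothetical helper, not part of the class, and the paths in sample_config are made up for illustration.

```ruby
require 'pathname'

# Hypothetical helper mirroring the constructor's precedence:
# explicit config value, then SGE_ROOT from the environment, then the default.
# `env` is injectable so the lookup does not depend on the real process ENV.
def resolve_sge_root(config, env = ENV)
  Pathname.new(config[:sge_root] || env['SGE_ROOT'] || '/var/lib/gridengine')
end

# Made-up config values; only the keys come from the constructor above.
sample_config = {
  cluster: 'example_cluster',
  bin: '/opt/sge/bin',
  sge_root: '/opt/sge',
  strict_host_checking: false
}

puts resolve_sge_root(sample_config, {})                  # config wins
puts resolve_sge_root({}, { 'SGE_ROOT' => '/usr/share/sge' }) # env is next
puts resolve_sge_root({}, {})                             # default last
```

Note that the real constructor also writes the resolved path back to ENV['SGE_ROOT'], which affects the whole process; the sketch deliberately leaves ENV untouched.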
Instance Attribute Details
#bin ⇒ Object (readonly)
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
# File 'lib/ood_core/job/adapters/sge/batch.rb', line 18

def bin
  @bin
end
#bin_overrides ⇒ Object (readonly)
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
# File 'lib/ood_core/job/adapters/sge/batch.rb', line 18

def bin_overrides
  @bin_overrides
end
#cluster ⇒ Object (readonly)
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
# File 'lib/ood_core/job/adapters/sge/batch.rb', line 18

def cluster
  @cluster
end
#conf ⇒ Object (readonly)
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
# File 'lib/ood_core/job/adapters/sge/batch.rb', line 18

def conf
  @conf
end
#helper ⇒ Object (readonly)
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
# File 'lib/ood_core/job/adapters/sge/batch.rb', line 18

def helper
  @helper
end
#strict_host_checking ⇒ Object (readonly)
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
# File 'lib/ood_core/job/adapters/sge/batch.rb', line 18

def strict_host_checking
  @strict_host_checking
end
#submit_host ⇒ Object (readonly)
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
# File 'lib/ood_core/job/adapters/sge/batch.rb', line 18

def submit_host
  @submit_host
end
Instance Method Details
#call(cmd, *args, env: {}, stdin: "", chdir: nil) ⇒ Object
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Call a forked SGE command for a given batch server
# File 'lib/ood_core/job/adapters/sge/batch.rb', line 169

def call(cmd, *args, env: {}, stdin: "", chdir: nil)
  cmd = OodCore::Job::Adapters::Helper.bin_path(cmd, bin, bin_overrides)
  env = env.to_h.each_with_object({}) { |(k, v), h| h[k.to_s] = v.to_s }
  cmd, args = OodCore::Job::Adapters::Helper.ssh_wrap(submit_host, cmd, args, strict_host_checking, env)
  chdir ||= "."
  o, e, s = Open3.capture3(env, cmd, *(args.map(&:to_s)), stdin_data: stdin.to_s, chdir: chdir.to_s)
  s.success? ? o : raise(Error, e)
end
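The core of this method is the environment normalization followed by Open3.capture3. The sketch below reproduces only that part under stated assumptions: run_command and CommandError are hypothetical stand-ins for call and Error, and the bin_path and ssh_wrap steps are left out so the example runs without ood_core or an SGE installation.

```ruby
require 'open3'
require 'rbconfig'

# Hypothetical stand-in for OodCore::Job::Adapters::Sge::Batch::Error.
class CommandError < StandardError; end

# Simplified sketch of #call: no bin_path resolution and no SSH wrapping.
def run_command(cmd, *args, env: {}, stdin: '', chdir: nil)
  # Open3 requires string keys and values, so symbols and integers are converted.
  env = env.to_h.each_with_object({}) { |(k, v), h| h[k.to_s] = v.to_s }
  chdir ||= '.'
  out, err, status = Open3.capture3(env, cmd, *args.map(&:to_s),
                                    stdin_data: stdin.to_s, chdir: chdir.to_s)
  # Return stdout on success; surface stderr as the exception message otherwise.
  status.success? ? out : raise(CommandError, err)
end

# Usage: pipe text through a child Ruby process (RbConfig.ruby is the
# path of the interpreter running this script).
puts run_command(RbConfig.ruby, '-e', 'print STDIN.read.upcase', stdin: 'hello')
```

Because stderr becomes the exception message, a failing command reports the tool's own diagnostic rather than a generic exit-status error.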
#can_use_drmaa? ⇒ Boolean
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
# File 'lib/ood_core/job/adapters/sge/batch.rb', line 136

def can_use_drmaa?
  @can_use_drmaa
end
#delete(job_id) ⇒ void
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
This method returns an undefined value.
Call qdel
# File 'lib/ood_core/job/adapters/sge/batch.rb', line 157

def delete(job_id)
  call('qdel', job_id)
end
#get_all(owner: nil) ⇒ Array<OodCore::Job::Info>
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Get OodCore::Job::Info for every enqueued job, optionally filtering on owner
# File 'lib/ood_core/job/adapters/sge/batch.rb', line 64

def get_all(owner: nil)
  listener = QstatXmlRListener.new
  argv = ['qstat', '-r', '-xml']
  argv.concat ['-u', owner] unless owner.nil?
  REXML::Parsers::StreamParser.new(call(*argv), listener).parse

  listener.parsed_jobs.map{
    |job_hash| OodCore::Job::Info.new(
      **post_process_qstat_job_hash(job_hash)
    )
  }
end
#get_info_enqueued_job(job_id) ⇒ OodCore::Job::Info
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Get OodCore::Job::Info for a job_id that may still be in the queue
DRMAA can be used only if libdrmaa is loaded. It provides better job status and should be preferred whenever it is available.
When qstat is called in XML mode for a job id that is not in the queue, it returns invalid XML. The second line of that output contains the string '<unknown_jobs', which is used to recognize this case.
# File 'lib/ood_core/job/adapters/sge/batch.rb', line 88

def get_info_enqueued_job(job_id)
  job_info = OodCore::Job::Info.new(id: job_id.to_s, status: :completed)
  argv = ['qstat', '-r', '-xml', '-j', job_id.to_s]

  begin
    results = call(*argv)
    listener = QstatXmlJRListener.new
    REXML::Parsers::StreamParser.new(results, listener).parse

    job_hash = listener.parsed_job

    if job_hash[:id]
      update_job_hash_status!(job_hash)
    else
      job_hash[:id] = job_id
      job_hash[:status] = :completed
    end

    job_info = OodCore::Job::Info.new(**job_hash)
  rescue REXML::ParseException => e
    # If the error is something other than a job not being found by qstat re-raise the error
    unless results =~ /unknown_jobs/
      raise e, "REXML::ParseException error and command '#{argv.join(' ')}' produced results that didn't contain string 'unknown_jobs'. ParseException: #{e.message}"
    end
  rescue StandardError => e
    # Note that DRMAA is not guaranteed to be defined, hence the tests
    raise e unless ( can_use_drmaa? && e.is_a?(DRMAA::DRMAAInvalidJobError)) # raised when job is not found
  end

  job_info
end
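The fallback logic can be sketched without qstat or REXML: start from a :completed default, and treat a parse failure as "job has left the queue" only when the raw output mentions unknown_jobs. Everything named here is hypothetical: ParseError stands in for REXML::ParseException, info_for for the method above, and the fetch and parse lambdas for the qstat call and the stream listener. The sample strings are invented placeholders, not real qstat output.

```ruby
# Hypothetical stand-in for REXML::ParseException.
class ParseError < StandardError; end

# `fetch` returns raw qstat-like text; `parse` returns a job hash or raises ParseError.
def info_for(job_id, fetch:, parse:)
  info = { id: job_id.to_s, status: :completed } # default: job is no longer queued
  results = nil
  begin
    results = fetch.call(job_id)
    job_hash = parse.call(results)
    info = job_hash if job_hash[:id]
  rescue ParseError => e
    # Swallow the error only when the output says the job is unknown.
    raise e unless results =~ /unknown_jobs/
  end
  info
end

# Toy parser: accepts text containing "<job", rejects anything else.
toy_parse = lambda do |text|
  raise ParseError, 'bad xml' unless text.include?('<job')
  { id: '42', status: :running }
end

p info_for(42, fetch: ->(_) { "<job id='42'/>" }, parse: toy_parse)
p info_for(7, fetch: ->(_) { "<root>\n<unknown_jobs" }, parse: toy_parse)
```

Any other parse failure is re-raised, so genuinely malformed output is not silently reported as a completed job.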
#get_status_from_drmaa?(job_hash) ⇒ Boolean
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
# File 'lib/ood_core/job/adapters/sge/batch.rb', line 130

def get_status_from_drmaa?(job_hash)
  # DRMAA does not recognize the parent task in job arrays
  # e.g. 123 is invalid if it is an array job, while 123.4 is valid
  can_use_drmaa? && job_hash[:tasks].empty?
end
#get_status_from_drmma(job_id) ⇒ Object
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Get the job status using DRMAA
# File 'lib/ood_core/job/adapters/sge/batch.rb', line 235

def get_status_from_drmma(job_id)
  translate_drmaa_state(DRMAA::SessionSingleton.instance.job_ps(job_id.to_s))
end
#hold(job_id) ⇒ void
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
This method returns an undefined value.
Call qhold
# File 'lib/ood_core/job/adapters/sge/batch.rb', line 143

def hold(job_id)
  call('qhold', job_id)
end
#load_drmaa(libdrmaa_path) ⇒ Object
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
# File 'lib/ood_core/job/adapters/sge/batch.rb', line 55

def load_drmaa(libdrmaa_path)
  FFI_DRMAA.libdrmaa_path = libdrmaa_path if libdrmaa_path
  require "ood_core/job/adapters/drmaa"
  require "ood_core/refinements/drmaa_extensions"
end
#post_process_qstat_job_hash(job_hash) ⇒ Object
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
# File 'lib/ood_core/job/adapters/sge/batch.rb', line 222

def post_process_qstat_job_hash(job_hash)
  # dispatch is not set if the job is not running
  if ! job_hash.key?(:wallclock_time)
    job_hash[:wallclock_time] = job_hash.key?(:dispatch_time) ? Time.now.to_i - job_hash[:dispatch_time] : 0
  end

  job_hash[:status] = translate_sge_state(job_hash[:status])

  update_job_hash_status!(job_hash)

  job_hash
end
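The wallclock rule in this method is: keep an existing value, otherwise use the time elapsed since dispatch, otherwise 0 for jobs that have not been dispatched. A small sketch of just that rule follows; fill_wallclock_time is a hypothetical helper, and its injectable now parameter is an addition made here so the arithmetic is deterministic.

```ruby
# Hypothetical standalone version of the wallclock step. `now` defaults to the
# current epoch seconds but can be passed in for reproducible results.
def fill_wallclock_time(job_hash, now: Time.now.to_i)
  unless job_hash.key?(:wallclock_time)
    # dispatch_time is absent when the job is not running, so elapsed time is 0.
    job_hash[:wallclock_time] =
      job_hash.key?(:dispatch_time) ? now - job_hash[:dispatch_time] : 0
  end
  job_hash
end

p fill_wallclock_time({ dispatch_time: 1_000 }, now: 1_600)[:wallclock_time] # => 600
p fill_wallclock_time({}, now: 1_600)[:wallclock_time]                       # => 0
```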
#release(job_id) ⇒ void
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
This method returns an undefined value.
Call qrls
# File 'lib/ood_core/job/adapters/sge/batch.rb', line 150

def release(job_id)
  call('qrls', job_id)
end
#submit(content, args) ⇒ Object
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Call qsub with arguments and the script's content
# File 'lib/ood_core/job/adapters/sge/batch.rb', line 164

def submit(content, args)
  @helper.parse_job_id_from_qsub(call('qsub', *args, :stdin => content))
end
#translate_drmaa_state(drmaa_state_code) ⇒ Object
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
# File 'lib/ood_core/job/adapters/sge/batch.rb', line 218

def translate_drmaa_state(drmaa_state_code)
  DRMAA::DRMMA_TO_OOD_STATE_MAP.fetch(drmaa_state_code, :undetermined)
end
#translate_sge_state(sge_state_code) ⇒ Object
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
# File 'lib/ood_core/job/adapters/sge/batch.rb', line 214

def translate_sge_state(sge_state_code)
  STATE_MAP.fetch(sge_state_code, :undetermined)
end
#update_job_hash_status!(job_hash) ⇒ Object
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
# File 'lib/ood_core/job/adapters/sge/batch.rb', line 120

def update_job_hash_status!(job_hash)
  if get_status_from_drmaa?(job_hash)
    begin
      job_hash[:status] = get_status_from_drmma(job_hash[:id])
    rescue DRMAA::DRMAAException => e
      # log DRMAA error?
    end
  end
end
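Taken together with get_status_from_drmaa?, this method overrides the qstat-derived status only for non-array jobs when DRMAA is available, and keeps the existing status if the DRMAA lookup fails. The sketch below models that behaviour without the DRMAA bindings. refresh_status!, FakeDrmaaError and the lookup lambda are all hypothetical stand-ins.

```ruby
# Hypothetical stand-in for DRMAA::DRMAAException.
class FakeDrmaaError < StandardError; end

# `lookup` plays the role of the DRMAA status query; `drmaa_available`
# plays the role of can_use_drmaa?.
def refresh_status!(job_hash, drmaa_available:, lookup:)
  # Array jobs (non-empty :tasks) are skipped, as DRMAA rejects the parent id.
  return job_hash unless drmaa_available && job_hash[:tasks].empty?

  begin
    job_hash[:status] = lookup.call(job_hash[:id])
  rescue FakeDrmaaError
    # Lookup failed: keep the status that qstat reported.
  end
  job_hash
end

single = { id: '123', status: :queued, tasks: [] }
p refresh_status!(single, drmaa_available: true, lookup: ->(_) { :running })[:status]
```

Swallowing the lookup error trades visibility for robustness; the "log DRMAA error?" comment in the source suggests logging was considered but not added.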