Class: OodCore::Job::Adapters::Sge::Batch Private

Inherits:
Object
Defined in:
lib/ood_core/job/adapters/sge/batch.rb

Overview

This class is part of a private API. You should avoid using this class if possible, as it may be removed or be changed in the future.

Object used for simplified communication with an SGE batch server.

Defined Under Namespace

Classes: Error

Constant Summary

STATE_MAP =

This constant is part of a private API. You should avoid using this constant if possible, as it may be removed or be changed in the future.

Adapted from www.softpanorama.org/HPC/Grid_engine/Queues/queue_states.shtml

{
  'EhRqw'   => :undetermined, # all pending states with error
  'Ehqw'    => :undetermined, # all pending states with error
  'Eqw'     => :undetermined, # all pending states with error
  'RS'      => :suspended,    # all suspended with re-submit
  'RT'      => :suspended,    # all suspended with re-submit
  'Rr'      => :running,      # running, re-submit
  'Rs'      => :suspended,    # all suspended with re-submit
  'Rt'      => :running,      # transferring, re-submit
  'RtS'     => :suspended,    # all suspended with re-submit
  'RtT'     => :suspended,    # all suspended with re-submit
  'Rts'     => :suspended,    # all suspended with re-submit
  'S'       => :suspended,    # queue suspended
  'T'       => :suspended,    # queue suspended by alarm
  'dRS'     => :completed,    # all running and suspended states with deletion
  'dRT'     => :completed,    # all running and suspended states with deletion
  'dRr'     => :completed,    # all running and suspended states with deletion
  'dRs'     => :completed,    # all running and suspended states with deletion
  'dRt'     => :completed,    # all running and suspended states with deletion
  'dS'      => :completed,    # all running and suspended states with deletion
  'dT'      => :completed,    # all running and suspended states with deletion
  'dr'      => :completed,    # all running and suspended states with deletion
  'ds'      => :completed,    # all running and suspended states with deletion
  'dt'      => :completed,    # all running and suspended states with deletion
  'hRwq'    => :queued_held,  # pending, system hold, re-queue
  'hqw'     => :queued_held,  # pending, system hold
  'qw'      => :queued,       # pending
  'r'       => :running,      # running
  's'       => :suspended,    # suspended
  't'       => :running,      # transferring
  'tS'      => :suspended,    # queue suspended
  'tT'      => :suspended,    # queue suspended by alarm
  'ts'      => :suspended,    # job suspended
}

Constructor Details

#initialize(config) ⇒ Batch

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Returns a new instance of Batch.

Parameters:

  • config (#to_h)

    the options defining this adapter


# File 'lib/ood_core/job/adapters/sge/batch.rb', line 34

def initialize(config)
  @cluster          = config.fetch(:cluster, nil)
  @bin              = Pathname.new(config.fetch(:bin, nil).to_s)
  @sge_root         = Pathname.new(config[:sge_root] || ENV['SGE_ROOT'] || "/var/lib/gridengine")
  @bin_overrides    = config.fetch(:bin_overrides, {})
  @submit_host      = config.fetch(:submit_host, "")
  @strict_host_checking = config.fetch(:strict_host_checking, true)

  # FIXME: hack as this affects env of the process!
  ENV['SGE_ROOT'] = @sge_root.to_s

  if config[:libdrmaa_path]
    load_drmaa(config[:libdrmaa_path])
    @can_use_drmaa    = true
  else
    @can_use_drmaa    = false
  end

  @helper = OodCore::Job::Adapters::Sge::Helper.new
end
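
The lookup order for `sge_root` in the constructor (explicit config value, then the `SGE_ROOT` environment variable, then a fixed default) can be tried in isolation. The following is a minimal sketch and not part of `ood_core`: `resolve_sge_root` is a hypothetical helper, and the environment is passed in as a plain hash so the example does not modify `ENV`.

```ruby
require "pathname"

# Hypothetical helper mirroring the lookup order used in #initialize:
# config[:sge_root], then env["SGE_ROOT"], then a fixed default path.
def resolve_sge_root(config, env = ENV)
  Pathname.new(config[:sge_root] || env["SGE_ROOT"] || "/var/lib/gridengine")
end

explicit = resolve_sge_root({ sge_root: "/opt/sge" }, { "SGE_ROOT" => "/usr/sge" })
from_env = resolve_sge_root({}, { "SGE_ROOT" => "/usr/sge" })
fallback = resolve_sge_root({}, {})

puts explicit, from_env, fallback
```

Unlike the sketch, the real constructor also writes the resolved path back to `ENV['SGE_ROOT']`, which affects the whole process.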

Instance Attribute Details

#bin ⇒ Object (readonly)

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.



# File 'lib/ood_core/job/adapters/sge/batch.rb', line 18

def bin
  @bin
end

#bin_overrides ⇒ Object (readonly)

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.



# File 'lib/ood_core/job/adapters/sge/batch.rb', line 18

def bin_overrides
  @bin_overrides
end

#cluster ⇒ Object (readonly)

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.



# File 'lib/ood_core/job/adapters/sge/batch.rb', line 18

def cluster
  @cluster
end

#conf ⇒ Object (readonly)

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.



# File 'lib/ood_core/job/adapters/sge/batch.rb', line 18

def conf
  @conf
end

#helper ⇒ Object (readonly)

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.



# File 'lib/ood_core/job/adapters/sge/batch.rb', line 18

def helper
  @helper
end

#strict_host_checking ⇒ Object (readonly)

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.



# File 'lib/ood_core/job/adapters/sge/batch.rb', line 18

def strict_host_checking
  @strict_host_checking
end

#submit_host ⇒ Object (readonly)

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.



# File 'lib/ood_core/job/adapters/sge/batch.rb', line 18

def submit_host
  @submit_host
end

Instance Method Details

#call(cmd, *args, env: {}, stdin: "", chdir: nil) ⇒ Object

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Call a forked SGE command for a given batch server



# File 'lib/ood_core/job/adapters/sge/batch.rb', line 172

def call(cmd, *args, env: {}, stdin: "", chdir: nil)
  cmd = OodCore::Job::Adapters::Helper.bin_path(cmd, bin, bin_overrides)
  env = env.to_h.each_with_object({}) { |(k, v), h| h[k.to_s] = v.to_s }
  cmd, args = OodCore::Job::Adapters::Helper.ssh_wrap(submit_host, cmd, args, strict_host_checking, env)
  chdir ||= "."
  o, e, s = Open3.capture3(env, cmd, *(args.map(&:to_s)), stdin_data: stdin.to_s, chdir: chdir.to_s)
  s.success? ? o : raise(Error, e)
end
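
The core of `#call` (stringify the environment, run the command with `Open3.capture3`, return stdout or raise with stderr) can be exercised without an SGE installation. The sketch below leaves out the `bin_path` and `ssh_wrap` steps; `run_command` and `CommandError` are hypothetical names, and the Ruby interpreter itself stands in for an SGE binary.

```ruby
require "open3"
require "rbconfig"

# Hypothetical stand-in for the adapter's Error class.
class CommandError < StandardError; end

# Simplified sketch of #call: normalise env keys and values to strings,
# run the command, and return stdout on success or raise with stderr.
def run_command(cmd, *args, env: {}, stdin: "", chdir: nil)
  env = env.to_h.each_with_object({}) { |(k, v), h| h[k.to_s] = v.to_s }
  chdir ||= "."
  out, err, status = Open3.capture3(env, cmd, *args.map(&:to_s), stdin_data: stdin.to_s, chdir: chdir.to_s)
  status.success? ? out : raise(CommandError, err)
end

# The child process echoes one environment variable and its standard input.
script = 'print ENV["JOB_TAG"] + ":" + STDIN.read'
puts run_command(RbConfig.ruby, "-e", script, env: { JOB_TAG: 42 }, stdin: "hello") # prints "42:hello"
```

Because symbol keys and non-string values are converted with `to_s`, callers can pass `env: { JOB_TAG: 42 }` even though `Open3.capture3` expects string keys and values.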

#can_use_drmaa? ⇒ Boolean

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Returns:

  • (Boolean)


# File 'lib/ood_core/job/adapters/sge/batch.rb', line 139

def can_use_drmaa?
  @can_use_drmaa
end

#delete(job_id) ⇒ void

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

This method returns an undefined value.

Call qdel

Parameters:

  • job_id (#to_s)


# File 'lib/ood_core/job/adapters/sge/batch.rb', line 160

def delete(job_id)
  call('qdel', job_id)
end

#get_all(owner: nil) ⇒ Array<OodCore::Job::Info>

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Get OodCore::Job::Info for every enqueued job, optionally filtering on owner

Parameters:

  • owner (#to_s) (defaults to: nil)

    the owner or owner list

Returns:

  • (Array<OodCore::Job::Info>)



# File 'lib/ood_core/job/adapters/sge/batch.rb', line 64

def get_all(owner: nil)
  begin
    listener = QstatXmlRListener.new
    argv = ['qstat', '-r', '-xml']
    argv.concat ['-u', owner] unless owner.nil?
    REXML::Parsers::StreamParser.new(call(*argv), listener).parse

    listener.parsed_jobs.map do |job_hash|
      OodCore::Job::Info.new(**post_process_qstat_job_hash(job_hash))
    end
  rescue REXML::ParseException => e
    warn("Error parsing response: #{e}")
    []
  end
end
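
`#get_all` feeds the qstat output to a REXML stream parser and collects jobs from a listener object. As a rough illustration of that pattern, the sketch below uses a much smaller listener than `QstatXmlRListener`. `TinyQstatListener` and `parse_jobs` are hypothetical, and the XML is a simplified, assumed shape rather than verbatim qstat output.

```ruby
require "rexml/parsers/streamparser"
require "rexml/streamlistener"

# Hypothetical, much smaller stand-in for QstatXmlRListener.
class TinyQstatListener
  include REXML::StreamListener

  attr_reader :parsed_jobs

  def initialize
    @parsed_jobs = []
    @current_job = nil
    @current_text = +""
  end

  # Start a new job hash for each <job_list>; reset the text buffer per tag.
  def tag_start(name, _attrs)
    @current_job = {} if name == "job_list"
    @current_text = +""
  end

  def text(data)
    @current_text << data
  end

  # Copy the buffered text into the job hash when a known element closes.
  def tag_end(name)
    return if @current_job.nil?

    case name
    when "JB_job_number" then @current_job[:id] = @current_text.strip
    when "JB_owner"      then @current_job[:job_owner] = @current_text.strip
    when "state"         then @current_job[:status] = @current_text.strip
    when "job_list"
      @parsed_jobs << @current_job
      @current_job = nil
    end
  end
end

def parse_jobs(xml)
  listener = TinyQstatListener.new
  REXML::Parsers::StreamParser.new(xml, listener).parse
  listener.parsed_jobs
end

# Assumed, simplified document shape for illustration only.
xml = <<~XML
  <job_info>
    <job_list>
      <JB_job_number>101</JB_job_number>
      <JB_owner>alice</JB_owner>
      <state>r</state>
    </job_list>
    <job_list>
      <JB_job_number>102</JB_job_number>
      <JB_owner>bob</JB_owner>
      <state>qw</state>
    </job_list>
  </job_info>
XML

parse_jobs(xml).each { |job| puts "#{job[:id]} #{job[:job_owner]} #{job[:status]}" }
```

A stream listener avoids building a full document tree, which matters when qstat reports a large number of jobs.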

#get_info_enqueued_job(job_id) ⇒ OodCore::Job::Info

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Get OodCore::Job::Info for a job_id that may still be in the queue

If libdrmaa is not loaded then we cannot use DRMAA. Using DRMAA provides better job status and should always be chosen if it is possible.

When qstat is called in XML mode for a job id that is not in the queue, it returns invalid XML. The second line of that output contains the string '<unknown_jobs', which is used to recognize this case.

Parameters:

  • job_id (#to_s)

Returns:

  • (OodCore::Job::Info)



# File 'lib/ood_core/job/adapters/sge/batch.rb', line 91

def get_info_enqueued_job(job_id)
  job_info = OodCore::Job::Info.new(id: job_id.to_s, status: :completed)
  argv = ['qstat', '-r', '-xml', '-j', job_id.to_s]

  begin
    results = call(*argv)
    listener = QstatXmlJRListener.new
    REXML::Parsers::StreamParser.new(results, listener).parse

    job_hash = listener.parsed_job

    if job_hash[:id]
      update_job_hash_status!(job_hash)
    else
      job_hash[:id] = job_id
      job_hash[:status] = :completed
    end

    job_info = OodCore::Job::Info.new(**job_hash)
  rescue REXML::ParseException => e
    # If the error is something other than a job not being found by qstat re-raise the error
    unless results =~ /unknown_jobs/
      raise e, "REXML::ParseException error and command '#{argv.join(' ')}' produced results that didn't contain string 'unknown_jobs'. ParseException: #{e.message}"
    end
  rescue StandardError => e
    # Note that DRMAA is not guaranteed to be defined, hence the tests
    raise e unless ( can_use_drmaa? && e.is_a?(DRMAA::DRMAAInvalidJobError))  # raised when job is not found
  end

  job_info
end
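
The error handling above treats a parse failure as "job not found" only when the raw output mentions `unknown_jobs`, and re-raises otherwise. A minimal sketch of that decision follows. `parse_or_assume_completed` is a hypothetical helper, and the parser is supplied as a block so the example does not depend on real qstat output.

```ruby
require "rexml/parseexception"

# Hypothetical helper: run the given parser block on the qstat output.
# A REXML::ParseException is swallowed only when the output contains
# "unknown_jobs"; in that case the job is assumed to have completed.
def parse_or_assume_completed(results, job_id)
  yield(results)
rescue REXML::ParseException => e
  raise e unless results.include?("unknown_jobs")

  { id: job_id.to_s, status: :completed }
end

# Stub parser that always fails, standing in for the stream parser.
failing_parser = ->(_results) { raise REXML::ParseException.new("malformed XML") }

job = parse_or_assume_completed("<unknown_jobs>", 123, &failing_parser)
puts job[:status] # prints "completed"
```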

#get_status_from_drmaa?(job_hash) ⇒ Boolean

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Returns:

  • (Boolean)


# File 'lib/ood_core/job/adapters/sge/batch.rb', line 133

def get_status_from_drmaa?(job_hash)
  # DRMAA does not recognize the parent task in job arrays
  # e.g. 123 is invalid if it is an array job, while 123.4 is valid
  can_use_drmaa? && job_hash[:tasks].empty?
end

#get_status_from_drmma(job_id) ⇒ Object

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Get the job status using DRMAA



# File 'lib/ood_core/job/adapters/sge/batch.rb', line 238

def get_status_from_drmma(job_id)
  translate_drmaa_state(DRMAA::SessionSingleton.instance.job_ps(job_id.to_s))
end

#hold(job_id) ⇒ void

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

This method returns an undefined value.

Call qhold

Parameters:

  • job_id (#to_s)


# File 'lib/ood_core/job/adapters/sge/batch.rb', line 146

def hold(job_id)
  call('qhold', job_id)
end

#load_drmaa(libdrmaa_path) ⇒ Object

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.



# File 'lib/ood_core/job/adapters/sge/batch.rb', line 55

def load_drmaa(libdrmaa_path)
  FFI_DRMAA.libdrmaa_path = libdrmaa_path if libdrmaa_path
  require "ood_core/job/adapters/drmaa"
  require "ood_core/refinements/drmaa_extensions"
end

#post_process_qstat_job_hash(job_hash) ⇒ Object

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.



# File 'lib/ood_core/job/adapters/sge/batch.rb', line 225

def post_process_qstat_job_hash(job_hash)
  # :dispatch_time is not set if the job is not running
  if ! job_hash.key?(:wallclock_time)
    job_hash[:wallclock_time] = job_hash.key?(:dispatch_time) ? Time.now.to_i - job_hash[:dispatch_time] : 0
  end

  job_hash[:status] = translate_sge_state(job_hash[:status])
  update_job_hash_status!(job_hash)

  job_hash
end
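
The wallclock fallback can be shown on its own: when qstat did not report `:wallclock_time`, it is derived from `:dispatch_time`, or set to 0 for jobs that have not been dispatched. In this sketch `fill_wallclock_time` is a hypothetical helper, `:dispatch_time` is assumed to be an integer count of epoch seconds, and the current time is injectable so the result is deterministic.

```ruby
# Hypothetical helper isolating the wallclock fallback from
# #post_process_qstat_job_hash. `now` defaults to the current epoch time.
def fill_wallclock_time(job_hash, now: Time.now.to_i)
  return job_hash if job_hash.key?(:wallclock_time)

  job_hash[:wallclock_time] = job_hash.key?(:dispatch_time) ? now - job_hash[:dispatch_time] : 0
  job_hash
end

running = fill_wallclock_time({ id: "101", dispatch_time: 1_000 }, now: 1_090)
pending = fill_wallclock_time({ id: "102" }, now: 1_090)

puts running[:wallclock_time] # prints 90
puts pending[:wallclock_time] # prints 0
```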

#release(job_id) ⇒ void

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

This method returns an undefined value.

Call qrls

Parameters:

  • job_id (#to_s)


# File 'lib/ood_core/job/adapters/sge/batch.rb', line 153

def release(job_id)
  call('qrls', job_id)
end

#submit(content, args) ⇒ Object

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

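Call qsub with the given arguments and the script's content

Parameters:

  • content (#to_s)

    the script content, passed to qsub on standard input

  • args (Array<#to_s>)

    the command-line arguments passed to qsub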


# File 'lib/ood_core/job/adapters/sge/batch.rb', line 167

def submit(content, args)
  @helper.parse_job_id_from_qsub(call('qsub', *args, stdin: content))
end

#translate_drmaa_state(drmaa_state_code) ⇒ Object

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.



# File 'lib/ood_core/job/adapters/sge/batch.rb', line 221

def translate_drmaa_state(drmaa_state_code)
  DRMAA::DRMMA_TO_OOD_STATE_MAP.fetch(drmaa_state_code, :undetermined)
end

#translate_sge_state(sge_state_code) ⇒ Object

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.



# File 'lib/ood_core/job/adapters/sge/batch.rb', line 217

def translate_sge_state(sge_state_code)
  STATE_MAP.fetch(sge_state_code, :undetermined)
end
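
Because the lookup uses `Hash#fetch` with a default, any state code missing from the table maps to `:undetermined` instead of raising `KeyError`. A short sketch with an excerpt of the table follows; `STATE_MAP_EXCERPT` and `translate_state` are illustrative names, not part of the adapter.

```ruby
# Excerpt of STATE_MAP, enough to show the lookup behaviour.
STATE_MAP_EXCERPT = {
  'qw'  => :queued,
  'hqw' => :queued_held,
  'r'   => :running,
  's'   => :suspended,
  'Eqw' => :undetermined,
}.freeze

def translate_state(sge_state_code)
  STATE_MAP_EXCERPT.fetch(sge_state_code, :undetermined)
end

p translate_state('r')   # prints :running
p translate_state('zzz') # prints :undetermined (unknown code falls back to the default)
```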

#update_job_hash_status!(job_hash) ⇒ Object

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.



# File 'lib/ood_core/job/adapters/sge/batch.rb', line 123

def update_job_hash_status!(job_hash)
  if get_status_from_drmaa?(job_hash)
    begin
      job_hash[:status] = get_status_from_drmma(job_hash[:id])
    rescue DRMAA::DRMAAException => e
      # log DRMAA error?
    end
  end
end
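
The method applies a best-effort override: the qstat-derived status is kept unless a second lookup succeeds, and array jobs (non-empty `:tasks`) skip the lookup entirely. The sketch below reproduces that shape without DRMAA. `refine_status!`, `FakeDrmaaError`, and the `lookup` callable are hypothetical stand-ins.

```ruby
# Hypothetical stand-in for DRMAA::DRMAAException.
class FakeDrmaaError < StandardError; end

# Keep the existing status unless the secondary lookup succeeds.
# `lookup` is any callable that takes a job id and returns a status.
def refine_status!(job_hash, lookup)
  return job_hash unless job_hash[:tasks].empty?

  begin
    job_hash[:status] = lookup.call(job_hash[:id])
  rescue FakeDrmaaError
    # Lookup failed: leave the qstat-derived status in place.
  end
  job_hash
end

ok_lookup      = ->(_id) { :running }
failing_lookup = ->(_id) { raise FakeDrmaaError, "session unavailable" }

p refine_status!({ id: "101", status: :queued, tasks: [] }, ok_lookup)[:status]      # prints :running
p refine_status!({ id: "102", status: :queued, tasks: [] }, failing_lookup)[:status] # prints :queued
```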