Class: Orocos::RemoteProcesses::Server
- Inherits: Object
  - Object
  - Orocos::RemoteProcesses::Server
- Defined in: lib/orocos/remote_processes/server.rb
Overview
A remote process management server.
It starts, stops and monitors processes in a client/server fashion.
Use ProcessClient to access a server.
Constant Summary
- DEFAULT_OPTIONS = { :wait => false, :output => '%m-%p.txt' }
- INTERNAL_SIGCHLD_TRIGGERED = "S"
Instance Attribute Summary
- #default_start_options ⇒ Object (readonly)
  The startup options to be passed to Orocos.run.
- #loader ⇒ OroGen::Loaders::Base (readonly)
  The object we use to load oroGen models.
- #port ⇒ Integer? (readonly)
  The TCP port we are listening to.
- #processes ⇒ Object (readonly)
  A mapping from the deployment names to the corresponding Process object.
- #required_port ⇒ Integer (readonly)
  The TCP port we are required to bind to.
Class Method Summary
- .create_pkgconfig_loader ⇒ Object
- .run(options = DEFAULT_OPTIONS, port = DEFAULT_PORT) ⇒ Object
  Start a standalone process server using the given options and port.
- .unique_dirname(base_dir, path_spec, date_tag = nil) ⇒ Object
  Returns a unique directory name as a subdirectory of base_dir, based on path_spec.
Instance Method Summary
- #build_system_info ⇒ Object
- #create_log_dir(log_dir, time_tag, metadata = Hash.new) ⇒ Object
- #each_client(&block) ⇒ Object
- #end_process(p) ⇒ Object
- #exec ⇒ Object
- #handle_command(socket) ⇒ Object
  Helper method that deals with one client request.
- #initialize(default_start_options = DEFAULT_OPTIONS, port = DEFAULT_PORT, loader = self.class.create_pkgconfig_loader) ⇒ Server (constructor)
  A new instance of Server.
- #listen ⇒ Object
  Main server loop.
- #move_log_dir(log_dir, results_dir) ⇒ Object
- #open ⇒ Object
- #process_dead_processes ⇒ Object
- #quit ⇒ Object
- #quit_and_join ⇒ Object
  Helper method that stops all running processes.
- #start_process(name, deployment_name, name_mappings, options) ⇒ Object
Constructor Details
#initialize(default_start_options = DEFAULT_OPTIONS, port = DEFAULT_PORT, loader = self.class.create_pkgconfig_loader) ⇒ Server
Returns a new instance of Server
    # File 'lib/orocos/remote_processes/server.rb', line 101
    def initialize(default_start_options = DEFAULT_OPTIONS,
                   port = DEFAULT_PORT,
                   loader = self.class.create_pkgconfig_loader)

        @default_start_options = Kernel.validate_options default_start_options,
            :wait => false,
            :output => '%m-%p.txt'
        @loader = loader
        @required_port = port
        @port = nil
        @processes = Hash.new
        @all_ios = Array.new
    end
Instance Attribute Details
#default_start_options ⇒ Object (readonly)
The startup options to be passed to Orocos.run
    # File 'lib/orocos/remote_processes/server.rb', line 69
    def default_start_options
        @default_start_options
    end
#loader ⇒ OroGen::Loaders::Base (readonly)
The object we use to load oroGen models.
It is commonly an OroGen::Loaders::PkgConfig loader object.
    # File 'lib/orocos/remote_processes/server.rb', line 95
    def loader
        @loader
    end
#port ⇒ Integer? (readonly)
The TCP port we are listening to.
In general, it is equal to #required_port. Only if #required_port is zero does #port differ: it then holds the port actually allocated by the operating system.
It is nil until the server socket is created.
    # File 'lib/orocos/remote_processes/server.rb', line 87
    def port
        @port
    end
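The relationship between #required_port and #port can be illustrated without the library. The following is a minimal sketch, assuming only Ruby's standard socket library; the local variable names are illustrative and the loopback address is an assumption made to keep the example local. Binding to port 0 lets the operating system choose a free port, which is then read back from the socket address, the same way #open does with server.addr[1].

```ruby
require 'socket'

# Illustrative stand-in for the server's required_port: 0 asks the OS to pick.
required_port = 0

server = TCPServer.new('127.0.0.1', required_port)
# addr is [family, port, hostname, address]; index 1 is the bound port.
actual_port = server.addr[1]
puts "required_port=#{required_port}, port=#{actual_port}"
server.close
```

With a non-zero required_port the two values would be equal, which is the general case described above.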
#processes ⇒ Object (readonly)
A mapping from the deployment names to the corresponding Process object.
    # File 'lib/orocos/remote_processes/server.rb', line 90
    def processes
        @processes
    end
#required_port ⇒ Integer (readonly)
The TCP port we are required to bind to.
It is the port given to #initialize. In general, it is equal to #port. Only if it is zero does #port differ: #port then holds the port actually allocated by the operating system.
    # File 'lib/orocos/remote_processes/server.rb', line 77
    def required_port
        @required_port
    end
Class Method Details
.create_pkgconfig_loader ⇒ Object
    # File 'lib/orocos/remote_processes/server.rb', line 97
    def self.create_pkgconfig_loader
        OroGen::Loaders::RTT.new(Orocos.orocos_target)
    end
.run(options = DEFAULT_OPTIONS, port = DEFAULT_PORT) ⇒ Object
Start a standalone process server using the given options and port. The options become the server's default start options (see #default_start_options), which are passed to Orocos.run when a new deployment is started.
    # File 'lib/orocos/remote_processes/server.rb', line 60
    def self.run(options = DEFAULT_OPTIONS, port = DEFAULT_PORT)
        Orocos.disable_sigchld_handler = true
        Orocos.initialize
        new({ :wait => false }.merge(options), port).exec

    rescue Interrupt
    end
.unique_dirname(base_dir, path_spec, date_tag = nil) ⇒ Object
Returns a unique directory name as a subdirectory of base_dir, based on path_spec. The generated name is of the form <base_dir>/a/b/c/YYYYMMDD-HHMM-basename if path_spec = "a/b/c/basename". A .<number> suffix is appended if the path already exists.
Shamelessly taken from Roby
    # File 'lib/orocos/remote_processes/server.rb', line 22
    def self.unique_dirname(base_dir, path_spec, date_tag = nil)
        if path_spec =~ /\/$/
            basename = ""
            dirname = path_spec
        else
            basename = File.basename(path_spec)
            dirname = File.dirname(path_spec)
        end

        date_tag ||= Time.now.strftime('%Y%m%d-%H%M')
        if basename && !basename.empty?
            basename = date_tag + "-" + basename
        else
            basename = date_tag
        end

        # Check if +basename+ already exists, and if it is the case add a
        # .x suffix to it
        full_path = File.expand_path(File.join(dirname, basename), base_dir)
        base_dir = File.dirname(full_path)

        unless File.exists?(base_dir)
            FileUtils.mkdir_p(base_dir)
        end

        final_path, i = full_path, 0
        while File.exists?(final_path)
            i += 1
            final_path = full_path + ".#{i}"
        end

        final_path
    end
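To make the naming scheme concrete, here is a small standalone sketch that mirrors the logic described above without loading the library. The method name unique_dirname_sketch, the temporary base directory and the fixed date tag are all hypothetical and exist only for this illustration. It uses File.exist? rather than File.exists?, since the latter is not available on recent Ruby versions.

```ruby
require 'fileutils'
require 'tmpdir'

# Hypothetical re-implementation of the naming scheme, for illustration only.
def unique_dirname_sketch(base_dir, path_spec, date_tag)
  if path_spec.end_with?('/')
    basename = ''
    dirname = path_spec
  else
    basename = File.basename(path_spec)
    dirname = File.dirname(path_spec)
  end
  basename = basename.empty? ? date_tag : "#{date_tag}-#{basename}"

  full_path = File.expand_path(File.join(dirname, basename), base_dir)
  FileUtils.mkdir_p(File.dirname(full_path))

  # Append .1, .2, ... until the candidate path does not exist.
  final_path = full_path
  i = 0
  while File.exist?(final_path)
    i += 1
    final_path = "#{full_path}.#{i}"
  end
  final_path
end

base = Dir.mktmpdir
first = unique_dirname_sketch(base, 'a/b/c/basename', '20240101-1200')
FileUtils.mkdir_p(first) # simulate the first name being taken
second = unique_dirname_sketch(base, 'a/b/c/basename', '20240101-1200')
puts first
puts second
```

The second call returns the first path with a .1 suffix, because the first path exists by then.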
Instance Method Details
#build_system_info ⇒ Object
    # File 'lib/orocos/remote_processes/server.rb', line 358
    def build_system_info
        available_projects = Hash.new
        available_typekits = Hash.new
        available_deployments = Hash.new
        loader.each_available_project_name do |name|
            available_projects[name] = loader.project_model_text_from_name(name)
        end
        loader.each_available_typekit_name do |name|
            available_typekits[name] = loader.typekit_model_text_from_name(name)
        end
        loader.each_available_deployment_name do |name|
            available_deployments[name] = loader.find_project_from_deployment_name(name)
        end
        return available_projects, available_deployments, available_typekits
    end
#create_log_dir(log_dir, time_tag, metadata = Hash.new) ⇒ Object
    # File 'lib/orocos/remote_processes/server.rb', line 337
    def create_log_dir(log_dir, time_tag, metadata = Hash.new)
        log_dir = File.expand_path(log_dir)
        Server.debug " #{log_dir}, time: #{time_tag}"
        FileUtils.mkdir_p(log_dir)
        File.open(File.join(log_dir, 'time_tag'), 'w') do |io|
            io.write(time_tag)
        end
        File.open(File.join(log_dir, 'info.yml'), 'w') do |io|
            YAML.dump(Hash['time' => time_tag].merge(metadata), io)
        end
    end
#each_client(&block) ⇒ Object
    # File 'lib/orocos/remote_processes/server.rb', line 116
    def each_client(&block)
        clients = @all_ios[2..-1]
        if clients
            clients.each(&block)
        end
    end
#end_process(p) ⇒ Object
    # File 'lib/orocos/remote_processes/server.rb', line 382
    def end_process(p)
        p.kill(false)
    end
#exec ⇒ Object
    # File 'lib/orocos/remote_processes/server.rb', line 123
    def exec
        open
        listen
    end
#handle_command(socket) ⇒ Object
Helper method that deals with one client request
    # File 'lib/orocos/remote_processes/server.rb', line 236
    def handle_command(socket) # :nodoc:
        cmd_code = socket.read(1)
        raise EOFError if !cmd_code

        if cmd_code == COMMAND_GET_PID
            Server.debug "#{socket} requested PID"
            Marshal.dump([::Process.pid], socket)

        elsif cmd_code == COMMAND_GET_INFO
            Server.debug "#{socket} requested system information"
            Marshal.dump(build_system_info, socket)
        elsif cmd_code == COMMAND_MOVE_LOG
            Server.debug "#{socket} requested moving a log directory"
            begin
                log_dir, results_dir = Marshal.load(socket)
                log_dir = File.expand_path(log_dir)
                results_dir = File.expand_path(results_dir)
                move_log_dir(log_dir, results_dir)
            rescue Interrupt
                raise
            rescue Exception => e
                Server.warn "failed to move log directory from #{log_dir} to #{results_dir}: #{e.message}"
                (e.backtrace || Array.new).each do |line|
                    Server.warn " #{line}"
                end
            end

        elsif cmd_code == COMMAND_CREATE_LOG
            begin
                Server.debug "#{socket} requested creating a log directory"
                log_dir, time_tag, metadata = Marshal.load(socket)
                metadata ||= Hash.new # compatible with older clients
                if log_dir
                    log_dir = File.expand_path(log_dir)
                end
                create_log_dir(log_dir, time_tag, metadata)
            rescue Interrupt
                raise
            rescue Exception => e
                Server.warn "failed to create log directory #{log_dir}: #{e.message}"
                (e.backtrace || Array.new).each do |line|
                    Server.warn " #{line}"
                end
            end

        elsif cmd_code == COMMAND_START
            name, deployment_name, name_mappings, options = Marshal.load(socket)
            options ||= Hash.new
            Server.debug "#{socket} requested startup of #{name} with #{options} and mappings #{name_mappings}"
            begin
                p = start_process(name, deployment_name, name_mappings, options)
                Server.debug "#{name}, from #{deployment_name}, is started (#{p.pid})"
                socket.write(RET_STARTED_PROCESS)
                Marshal.dump(p.pid, socket)
            rescue Interrupt
                raise
            rescue Exception => e
                Server.warn "failed to start #{name}: #{e.message}"
                (e.backtrace || Array.new).each do |line|
                    Server.warn " #{line}"
                end
                socket.write(RET_NO)
                socket.write Marshal.dump(e.message)
            end
        elsif cmd_code == COMMAND_END
            name = Marshal.load(socket)
            Server.debug "#{socket} requested end of #{name}"
            if p = processes[name]
                begin
                    end_process(p)
                    socket.write(RET_YES)
                rescue Interrupt
                    raise
                rescue Exception => e
                    Server.warn "exception raised while calling #{p}#kill(false)"
                    Server.log_pp(:warn, e)
                    socket.write(RET_NO)
                end
            else
                Server.warn "no process named #{name} to end"
                socket.write(RET_NO)
            end
        elsif cmd_code == COMMAND_QUIT
            quit
        end

        true
    rescue Interrupt
        raise
    rescue Exception => e
        Server.fatal "protocol error on #{socket}: #{e}"
        Server.fatal "while serving command #{cmd_code}"
        e.backtrace.each do |bt|
            Server.fatal " #{bt}"
        end
        false
    rescue EOFError
        false
    end
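The request framing used by #handle_command is a single command byte followed by a Marshal-encoded payload, answered with a single reply byte. The sketch below reproduces that framing over a local socket pair. The one-byte values 'E', 'Y' and 'N', the constant names ending in _SKETCH and the in-memory known table are assumptions made for the example; the real codes are defined by the library and are not shown on this page.

```ruby
require 'socket'

# Hypothetical one-byte codes; the real values are defined by the library.
COMMAND_END_SKETCH = 'E'
RET_YES_SKETCH = 'Y'
RET_NO_SKETCH = 'N'

client, server = UNIXSocket.pair

# Client side: send the command byte, then the marshalled argument.
client.write(COMMAND_END_SKETCH)
Marshal.dump('my_deployment', client)

# Server side: read the code, then let Marshal consume exactly one object.
known = { 'my_deployment' => :running } # stand-in for the processes table
cmd_code = server.read(1)
name = Marshal.load(server)
found = cmd_code == COMMAND_END_SKETCH && known.key?(name)
server.write(found ? RET_YES_SKETCH : RET_NO_SKETCH)

# Client side: read the one-byte reply.
answer = client.read(1)
puts "#{cmd_code.inspect} #{name.inspect} -> #{answer.inspect}"
client.close
server.close
```

Because Marshal.load reads exactly one object from the stream, several requests can follow each other on the same connection without any extra length prefix.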
#listen ⇒ Object
Main server loop. This blocks and returns only when the server is interrupted (CTRL+C) or when a client sends the quit command, which raises Interrupt through #quit.
All started processes are stopped when the server quits.
    # File 'lib/orocos/remote_processes/server.rb', line 148
    def listen
        Server.info "process server listening on port #{port}"
        server_io, com_r = *@all_ios[0, 2]

        while true
            readable_sockets, _ = select(@all_ios, nil, nil)
            if readable_sockets.include?(server_io)
                readable_sockets.delete(server_io)
                client_socket = server_io.accept
                client_socket.setsockopt(Socket::IPPROTO_TCP, Socket::TCP_NODELAY, true)
                client_socket.fcntl(Fcntl::FD_CLOEXEC, 1)
                Server.debug "new connection: #{client_socket}"
                @all_ios << client_socket
            end

            if readable_sockets.include?(com_r)
                readable_sockets.delete(com_r)
                cmd = com_r.read(1)
                if cmd == INTERNAL_SIGCHLD_TRIGGERED
                    process_dead_processes
                elsif cmd
                    Server.warn "unknown internal communication code #{cmd.inspect}"
                end
            end

            readable_sockets.each do |socket|
                if !handle_command(socket)
                    Server.debug "#{socket} closed or errored"
                    socket.close
                    @all_ios.delete(socket)
                end
            end
        end

    rescue Exception => e
        if e.class == Interrupt # normal procedure
            Server.fatal "process server exited normally"
            return
        end

        Server.fatal "process server exited because of unhandled exception"
        Server.fatal "#{e.message} #{e.class}"
        e.backtrace.each do |line|
            Server.fatal " #{line}"
        end

    ensure
        quit_and_join
    end
#move_log_dir(log_dir, results_dir) ⇒ Object
    # File 'lib/orocos/remote_processes/server.rb', line 349
    def move_log_dir(log_dir, results_dir)
        date_tag = File.read(File.join(log_dir, 'time_tag')).strip
        Server.debug " #{log_dir} => #{results_dir}"
        if File.directory?(log_dir)
            dirname = Server.unique_dirname(results_dir + '/', '', date_tag)
            FileUtils.mv log_dir, dirname
        end
    end
#open ⇒ Object
    # File 'lib/orocos/remote_processes/server.rb', line 130
    def open
        Server.info "starting on port #{required_port}"

        server = TCPServer.new(nil, required_port)
        server.fcntl(Fcntl::FD_CLOEXEC, 1)
        @port = server.addr[1]

        com_r, com_w = IO.pipe
        @all_ios.clear
        @all_ios << server << com_r

        trap 'SIGCHLD' do
            com_w.write INTERNAL_SIGCHLD_TRIGGERED
        end
    end
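The pipe created in #open turns the asynchronous SIGCHLD signal into a readable byte that the select loop of #listen can wait on. The following standalone sketch shows that pattern in isolation, assuming a POSIX system where fork is available. The constant SIGCHLD_BYTE and the five-second timeout are choices made for this example only; the byte value matches INTERNAL_SIGCHLD_TRIGGERED from the constant summary.

```ruby
SIGCHLD_BYTE = 'S' # same value as INTERNAL_SIGCHLD_TRIGGERED

com_r, com_w = IO.pipe
# The handler only writes one byte; the real work happens in the main loop.
previous = trap('CHLD') { com_w.write(SIGCHLD_BYTE) }

pid = fork { exit 3 }

# Block until the handler wakes us up, as a select-based loop would.
ready, = IO.select([com_r], nil, nil, 5)
code = ready ? com_r.read(1) : nil
reaped_pid, status = Process.wait2(pid)

# Restore whatever handler was installed before the example ran.
trap('CHLD', previous || 'DEFAULT')
puts "woken by #{code.inspect}, child #{reaped_pid} exited with #{status.exitstatus}"
```

Keeping the signal handler down to a single write avoids doing non-trivial work in trap context, and lets socket traffic and child exits be handled by the same loop.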
#process_dead_processes ⇒ Object
    # File 'lib/orocos/remote_processes/server.rb', line 198
    def process_dead_processes
        while exited = ::Process.wait2(-1, ::Process::WNOHANG)
            pid, exit_status = *exited
            process_name, process = processes.find { |_, p| p.pid == pid }
            next if !process_name

            process.dead!(exit_status)
            processes.delete(process_name)

            Server.debug "announcing death: #{process_name}"
            each_client do |socket|
                begin
                    Server.debug " announcing to #{socket}"
                    socket.write(EVENT_DEAD_PROCESS)
                    Marshal.dump([process_name, exit_status], socket)
                rescue IOError
                    Server.debug " #{socket}: IOError"
                end
            end
        end
    rescue Errno::ECHILD
    end
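The reaping loop relies on two behaviours of Process.wait2 with WNOHANG: it returns nil when children exist but none has exited yet, and it raises Errno::ECHILD when there are no children left. A minimal sketch of that loop, assuming a POSIX system with fork, is shown below. The children table and the child_N names are hypothetical stand-ins for the #processes mapping.

```ruby
# Spawn three short-lived children with distinct exit codes.
children = {}
[0, 1, 2].each do |code|
  pid = fork { exit code }
  children[pid] = "child_#{code}" # hypothetical names, keyed by PID
end

exit_codes = {}
until children.empty?
  begin
    # Returns nil when children exist but none has exited yet.
    while (exited = Process.wait2(-1, Process::WNOHANG))
      pid, status = exited
      name = children.delete(pid)
      next unless name # not one of ours: ignore it
      exit_codes[name] = status.exitstatus
    end
  rescue Errno::ECHILD
    break # no children left at all
  end
  sleep 0.01 unless children.empty?
end

puts exit_codes.inspect
```

Looping until wait2 returns nil matters because several children can exit before the signal is processed, so one wake-up may correspond to more than one dead process.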
#quit ⇒ Object
    # File 'lib/orocos/remote_processes/server.rb', line 386
    def quit
        raise Interrupt
    end
#quit_and_join ⇒ Object
Helper method that stops all running processes
    # File 'lib/orocos/remote_processes/server.rb', line 221
    def quit_and_join # :nodoc:
        Server.warn "stopping process server"
        processes.each_value do |p|
            Server.warn "killing #{p.name}"
            p.kill
        end

        each_client do |socket|
            begin socket.close
            rescue IOError
            end
        end
    end
#start_process(name, deployment_name, name_mappings, options) ⇒ Object
    # File 'lib/orocos/remote_processes/server.rb', line 374
    def start_process(name, deployment_name, name_mappings, options)
        p = Orocos::Process.new(name, deployment_name,
                                loader: @loader,
                                name_mappings: name_mappings)
        p.spawn(**self.default_start_options.merge(options))
        processes[name] = p
    end