Class: Orocos::RemoteProcesses::Client
- Inherits: Object
- Defined in: lib/orocos/remote_processes/client.rb
Overview
Client-side API to a Server instance
Process servers make it possible to start, stop, and monitor processes on remote machines. Instances of this class provide access to remote process servers.
Defined Under Namespace
Classes: Failed, StartupFailed, TimeoutError
Instance Attribute Summary
-
#available_deployments ⇒ Object
readonly
Mapping from deployment names to the corresponding orogen project name.
-
#available_projects ⇒ Object
readonly
Mapping from orogen project names to the corresponding content of the orogen files.
-
#available_typekits ⇒ Object
readonly
Mapping from deployment names to the corresponding XML type registry for the typekits available on the process server.
-
#host ⇒ Object
readonly
The hostname we are connected to.
-
#host_id ⇒ Object
readonly
A string that uniquely identifies this process server.
-
#loader ⇒ Loader
readonly
The loader object that gives access to models from the remote server.
-
#name_service ⇒ Object
readonly
The name service object used to resolve tasks from this process server.
-
#port ⇒ Object
readonly
The port we are connected to on host.
-
#processes ⇒ Object
readonly
Mapping from a deployment name to the corresponding RemoteProcess instance, for processes that have been started by this client.
-
#root_loader ⇒ OroGen::Loaders::Base
readonly
The root loader object.
-
#server_pid ⇒ Object
readonly
The PID of the server process.
-
#socket ⇒ Object
readonly
The socket instance used to communicate with the server.
Instance Method Summary
- #close ⇒ Object
-
#create_log_dir(log_dir, time_tag, metadata = Hash.new) ⇒ Object
Creates a new log directory and saves the given time tag in it (used later by save_log_dir).
- #disconnect ⇒ Object
- #info(timeout: @response_timeout) ⇒ Object
-
#initialize(host = 'localhost', port = DEFAULT_PORT, response_timeout: 10, root_loader: Orocos.default_loader, name_service: Orocos.name_service) ⇒ Client
constructor
Connects to the process server at host:port.
- #inspect ⇒ Object
- #join(deployment_name) ⇒ Object
- #pid(timeout: @response_timeout) ⇒ Object
- #queue_death_announcement ⇒ Object
- #quit_server ⇒ Object
-
#save_log_dir(log_dir, results_dir) ⇒ Object
Requests that the process server move the log directory at log_dir to results_dir.
-
#start(process_name, deployment, name_mappings = Hash.new, options = Hash.new) ⇒ Object
Starts the given deployment on the remote server, without waiting for it to be ready.
-
#stop(deployment_name, wait) ⇒ Object
Requests to stop the given deployment.
- #to_s ⇒ Object
- #wait_for_ack ⇒ Object
- #wait_for_answer(timeout: @response_timeout) ⇒ Object
-
#wait_termination(timeout = nil) ⇒ Object
Waits for processes to terminate.
Constructor Details
#initialize(host = 'localhost', port = DEFAULT_PORT, response_timeout: 10, root_loader: Orocos.default_loader, name_service: Orocos.name_service) ⇒ Client
Connects to the process server at host:port.
# File 'lib/orocos/remote_processes/client.rb', line 64

def initialize(host = 'localhost', port = DEFAULT_PORT,
               response_timeout: 10,
               root_loader: Orocos.default_loader,
               name_service: Orocos.name_service)
  @host = host
  @port = port
  @socket =
    begin TCPSocket.new(host, port)
    rescue Errno::ECONNREFUSED => e
      raise e.class, "cannot contact process server at '#{host}:#{port}': #{e.message}"
    end
  socket.setsockopt(Socket::IPPROTO_TCP, Socket::TCP_NODELAY, true)
  socket.fcntl(Fcntl::FD_CLOEXEC, 1)
  @name_service = name_service

  begin
    @server_pid = pid
  rescue EOFError
    raise StartupFailed, "process server failed at '#{host}:#{port}'"
  end

  @loader = Loader.new(self, root_loader)
  @root_loader = loader.root_loader
  @processes = Hash.new
  @death_queue = Array.new
  @host_id = "#{host}:#{port}:#{server_pid}"
  @response_timeout = response_timeout
end
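The constructor re-raises a refused connection with the same exception class but a message that names the target server. A minimal, network-free sketch of that pattern follows. The `connect` helper, the `connector` lambda and port 20202 are illustrative assumptions, not part of the Orocos API.

```ruby
# Hypothetical helper: `connector` stands in for TCPSocket.new so the
# sketch runs without a network or a process server.
def connect(host, port, connector)
  connector.call(host, port)
rescue Errno::ECONNREFUSED => e
  # Keep the exception class, add the host:port context to the message.
  raise e.class, "cannot contact process server at '#{host}:#{port}': #{e.message}"
end

# A connector that always refuses, as if nothing were listening.
refusing = ->(_host, _port) { raise Errno::ECONNREFUSED }

error =
  begin
    connect('localhost', 20202, refusing) # 20202 is an arbitrary example port
    nil
  rescue Errno::ECONNREFUSED => e
    e
  end

puts error.message
```

Callers can still rescue `Errno::ECONNREFUSED` as usual, and the message now tells them which server was unreachable.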
Instance Attribute Details
#available_deployments ⇒ Object (readonly)
Mapping from deployment names to the corresponding orogen project name. It lists the deployments that are available on the remote process server.
# File 'lib/orocos/remote_processes/client.rb', line 31

def available_deployments
  @available_deployments
end
#available_projects ⇒ Object (readonly)
Mapping from orogen project names to the corresponding content of the orogen files. These projects are the ones available to the remote process server.

# File 'lib/orocos/remote_processes/client.rb', line 27

def available_projects
  @available_projects
end
#available_typekits ⇒ Object (readonly)
Mapping from deployment names to the corresponding XML type registry for the typekits available on the process server.

# File 'lib/orocos/remote_processes/client.rb', line 34

def available_typekits
  @available_typekits
end
#host ⇒ Object (readonly)
The hostname we are connected to.

# File 'lib/orocos/remote_processes/client.rb', line 40

def host
  @host
end
#host_id ⇒ Object (readonly)
A string that uniquely identifies this process server.

# File 'lib/orocos/remote_processes/client.rb', line 46

def host_id
  @host_id
end
#loader ⇒ Loader (readonly)
The loader object that gives access to models from the remote server.

# File 'lib/orocos/remote_processes/client.rb', line 19

def loader
  @loader
end
#name_service ⇒ Object (readonly)
The name service object used to resolve tasks from this process server.

# File 'lib/orocos/remote_processes/client.rb', line 49

def name_service
  @name_service
end
#port ⇒ Object (readonly)
The port we are connected to on host.

# File 'lib/orocos/remote_processes/client.rb', line 42

def port
  @port
end
#processes ⇒ Object (readonly)
Mapping from a deployment name to the corresponding RemoteProcess instance, for processes that have been started by this client.
# File 'lib/orocos/remote_processes/client.rb', line 37

def processes
  @processes
end
#root_loader ⇒ OroGen::Loaders::Base (readonly)
The root loader object.

# File 'lib/orocos/remote_processes/client.rb', line 22

def root_loader
  @root_loader
end
#server_pid ⇒ Object (readonly)
The PID of the server process.

# File 'lib/orocos/remote_processes/client.rb', line 44

def server_pid
  @server_pid
end
#socket ⇒ Object (readonly)
The socket instance used to communicate with the server.

# File 'lib/orocos/remote_processes/client.rb', line 16

def socket
  @socket
end
Instance Method Details
#close ⇒ Object
# File 'lib/orocos/remote_processes/client.rb', line 277

def close
  socket.close
end
#create_log_dir(log_dir, time_tag, metadata = Hash.new) ⇒ Object
Creates a new log directory and saves the given time tag in it (used later by save_log_dir).

# File 'lib/orocos/remote_processes/client.rb', line 197

def create_log_dir(log_dir, time_tag, metadata = Hash.new)
  socket.write(COMMAND_CREATE_LOG)
  Marshal.dump([log_dir, time_tag, metadata], socket)
end
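Commands such as #create_log_dir and #save_log_dir are framed the same way: one command code is written to the socket, followed by the marshalled arguments. The sketch below reproduces that framing against an in-memory buffer. The value of `COMMAND_CREATE_LOG`, the `send_command` helper and the sample arguments are assumptions made for illustration; the real constant is defined by the library.

```ruby
require 'stringio'

# Hypothetical command code; the library's actual value is not shown here.
COMMAND_CREATE_LOG = 'C'

# Hypothetical helper: writes the command code, then the marshalled payload.
def send_command(io, code, payload)
  io.write(code)
  Marshal.dump(payload, io)
end

# A binary in-memory buffer stands in for the TCP socket.
wire = StringIO.new(String.new)
send_command(wire, COMMAND_CREATE_LOG, ['/tmp/logs', '20240101-1200', {}])

# What a receiver would do: read the code, then unmarshal the arguments.
wire.rewind
code = wire.read(1)
log_dir, time_tag, metadata = Marshal.load(wire)
```

Because Marshal is self-delimiting, the receiver needs no explicit length prefix after the command code.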
#disconnect ⇒ Object
# File 'lib/orocos/remote_processes/client.rb', line 111

def disconnect
  socket.close
end
#info(timeout: @response_timeout) ⇒ Object
# File 'lib/orocos/remote_processes/client.rb', line 103

def info(timeout: @response_timeout)
  socket.write(COMMAND_GET_INFO)
  if !select([socket], [], [], timeout)
    raise "timeout while reading process server at '#{host}:#{port}'"
  end
  Marshal.load(socket)
end
#inspect ⇒ Object
# File 'lib/orocos/remote_processes/client.rb', line 54

def inspect; to_s end
#join(deployment_name) ⇒ Object
# File 'lib/orocos/remote_processes/client.rb', line 263

def join(deployment_name)
  process = processes[deployment_name]
  return if !process

  while true
    result = wait_termination(nil)
    return if result[process]
  end
end
#pid(timeout: @response_timeout) ⇒ Object
# File 'lib/orocos/remote_processes/client.rb', line 91

def pid(timeout: @response_timeout)
  if @server_pid
    return @server_pid
  end

  socket.write(COMMAND_GET_PID)
  if !select([socket], [], [], timeout)
    raise "timeout while reading process server at '#{host}:#{port}'"
  end
  @server_pid = Integer(Marshal.load(socket).first)
end
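Both #pid and #info wait for the socket to become readable, with a timeout given in seconds, before unmarshalling the reply. A minimal sketch of that timed read follows, using a pipe in place of the TCP socket. The `read_reply` helper and the PID 4242 are illustrative assumptions.

```ruby
# Hypothetical helper (not part of Orocos): waits up to `timeout` seconds
# for `io` to become readable, then unmarshals one object from it.
def read_reply(io, timeout)
  unless IO.select([io], nil, nil, timeout)
    raise "timeout while reading reply"
  end
  Marshal.load(io)
end

# A pipe stands in for the connection to the process server.
reader, writer = IO.pipe

# Nothing has been written yet, so a short timeout expires.
timed_out =
  begin
    read_reply(reader, 0.05)
    false
  rescue RuntimeError
    true
  end

# The stand-in server answers with a one-element array, which is the
# shape #pid expects (it calls .first on the reply).
Marshal.dump([4242], writer)
server_pid = Integer(read_reply(reader, 1).first)
```

Passing `nil` as the timeout to `IO.select` blocks indefinitely, so a caller that needs a bounded wait has to supply a number.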
#queue_death_announcement ⇒ Object
# File 'lib/orocos/remote_processes/client.rb', line 202

def queue_death_announcement
  @death_queue.push Marshal.load(socket)
end
#quit_server ⇒ Object
# File 'lib/orocos/remote_processes/client.rb', line 273

def quit_server
  socket.write(COMMAND_QUIT)
end
#save_log_dir(log_dir, results_dir) ⇒ Object
Requests that the process server move the log directory at log_dir to results_dir.

# File 'lib/orocos/remote_processes/client.rb', line 190

def save_log_dir(log_dir, results_dir)
  socket.write(COMMAND_MOVE_LOG)
  Marshal.dump([log_dir, results_dir], socket)
end
#start(process_name, deployment, name_mappings = Hash.new, options = Hash.new) ⇒ Object
Starts the given deployment on the remote server, without waiting for it to be ready.
Returns a RemoteProcess instance that represents the process on the remote side.
Raises Failed if the server reports a startup failure.

# File 'lib/orocos/remote_processes/client.rb', line 154

def start(process_name, deployment, name_mappings = Hash.new, options = Hash.new)
  if processes[process_name]
    raise ArgumentError, "this client already started a process called #{process_name}"
  end

  if deployment.respond_to?(:to_str)
    deployment_model = loader.root_loader.deployment_model_from_name(deployment)
    if !loader.has_deployment?(deployment)
      raise OroGen::DeploymentModelNotFound, "deployment #{deployment} exists locally but not on the remote process server #{self}"
    end
  else
    deployment_model = deployment
  end

  prefix_mappings = Orocos::ProcessBase.resolve_prefix(deployment_model, options.delete(:prefix))
  name_mappings = prefix_mappings.merge(name_mappings)

  socket.write(COMMAND_START)
  Marshal.dump([process_name, deployment_model.name, name_mappings, options], socket)
  wait_for_answer do |pid_s|
    if pid_s == RET_NO
      msg = Marshal.load(socket)
      raise Failed, "failed to start #{deployment_model.name}: #{msg}"
    elsif pid_s == RET_STARTED_PROCESS
      pid = Marshal.load(socket)
      process = Process.new(process_name, deployment_model, self, pid)
      process.name_mappings = name_mappings
      processes[process_name] = process
      return process
    else
      raise InternalError, "unexpected reply #{pid_s} to the start command"
    end
  end
end
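Before sending the start command, #start removes :prefix from the options and merges the prefix-derived mappings with the explicit name_mappings, so explicit entries win on conflict. The sketch below illustrates only that merge order. The `resolve_prefix` stand-in, its assumed behavior (prepending the prefix to every task name), and the `working_directory` option are hypothetical and may differ from what Orocos::ProcessBase.resolve_prefix actually does.

```ruby
# Hypothetical stand-in for Orocos::ProcessBase.resolve_prefix. It is
# ASSUMED here to map each task name to the prefixed name.
def resolve_prefix(task_names, prefix)
  return {} unless prefix
  task_names.to_h { |name| [name, "#{prefix}#{name}"] }
end

# `working_directory` is an invented option used only to show that the
# remaining options are left untouched.
options = { prefix: 'left_', working_directory: '/tmp' }

# :prefix is consumed here, so it is not forwarded with the other options.
prefix_mappings = resolve_prefix(%w[camera driver], options.delete(:prefix))

# Explicit mappings are merged last and override the prefixed names.
name_mappings = prefix_mappings.merge('camera' => 'front_camera')
```

With these inputs, `driver` keeps its prefixed name while `camera` takes the explicitly requested one.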
#stop(deployment_name, wait) ⇒ Object
Requests to stop the given deployment.
Unless wait is true, the call does not block until the process has quit. In that case, call #wait_termination (or #join) to wait for the process to end.

# File 'lib/orocos/remote_processes/client.rb', line 251

def stop(deployment_name, wait)
  socket.write(COMMAND_END)
  Marshal.dump(deployment_name, socket)

  if !wait_for_ack
    raise Failed, "failed to quit #{deployment_name}"
  end

  if wait
    join(deployment_name)
  end
end
#to_s ⇒ Object
# File 'lib/orocos/remote_processes/client.rb', line 51

def to_s
  "#<Orocos::RemoteProcesses::Client #{host}:#{port}>"
end
#wait_for_ack ⇒ Object
# File 'lib/orocos/remote_processes/client.rb', line 135

def wait_for_ack
  wait_for_answer do |reply|
    if reply == RET_YES
      return true
    elsif reply == RET_NO
      return false
    else
      raise InternalError, "unexpected reply #{reply}"
    end
  end
end
#wait_for_answer(timeout: @response_timeout) ⇒ Object
# File 'lib/orocos/remote_processes/client.rb', line 118

def wait_for_answer(timeout: @response_timeout)
  while true
    if !select([socket], [], [], timeout)
      raise TimeoutError, "reached timeout of #{timeout}s in #wait_for_answer"
    end

    reply = socket.read(1)
    if !reply
      raise Orocos::ComError, "failed to read from process server #{self}"
    elsif reply == EVENT_DEAD_PROCESS
      queue_death_announcement
    else
      yield(reply)
    end
  end
end
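The loop in #wait_for_answer has to cope with death announcements that arrive while the client is waiting for a reply: they are queued and the loop keeps reading until a real reply shows up. The sketch below shows that interleaving over a pipe. The one-byte values of `RET_YES`, `RET_NO` and `EVENT_DEAD_PROCESS`, the `wait_for_ack` helper signature and the `['camera', 0]` payload are assumptions; the library defines the real ones.

```ruby
# Hypothetical one-byte protocol codes (the library's values are not shown).
RET_YES = 'Y'
RET_NO = 'N'
EVENT_DEAD_PROCESS = 'D'

# Hypothetical helper: reads codes until an acknowledgement arrives,
# queueing any death announcement seen in the meantime.
def wait_for_ack(io, death_queue, timeout: 1)
  loop do
    raise "timeout waiting for reply" unless IO.select([io], nil, nil, timeout)

    reply = io.read(1)
    raise "connection closed" unless reply

    case reply
    when EVENT_DEAD_PROCESS
      # The announcement payload follows the event code on the wire.
      death_queue << Marshal.load(io)
    when RET_YES then return true
    when RET_NO then return false
    else raise "unexpected reply #{reply}"
    end
  end
end

# A pipe stands in for the socket: one death event, then the ack.
reader, writer = IO.pipe
writer.write(EVENT_DEAD_PROCESS)
Marshal.dump(['camera', 0], writer)
writer.write(RET_YES)

death_queue = []
acked = wait_for_ack(reader, death_queue)
```

The queued announcement is not lost: it stays in `death_queue` until something like #wait_termination consumes it.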
#wait_termination(timeout = nil) ⇒ Object
Waits for processes to terminate. timeout is the number of seconds we should wait (it is passed unchanged to select). If set to nil, the call will block until a process terminates.
Returns a hash that maps the terminated process objects to the Process::Status object that represents their exit status.

# File 'lib/orocos/remote_processes/client.rb', line 212

def wait_termination(timeout = nil)
  if @death_queue.empty?
    reader = select([socket], nil, nil, timeout)
    return Hash.new if !reader
    while reader
      if socket.eof? # remote closed, probably a crash
        raise ComError, "communication to process server closed"
      end

      data = socket.read(1)
      if !data
        return Hash.new
      elsif data != EVENT_DEAD_PROCESS
        raise "unexpected message #{data} from process server"
      end
      queue_death_announcement
      reader = select([socket], nil, nil, 0)
    end
  end

  result = Hash.new
  @death_queue.each do |name, status|
    Orocos.debug "#{name} died"
    if p = processes.delete(name)
      p.dead!
      result[p] = status
    else
      Orocos.warn "process server reported the exit of '#{name}', but no process with that name is registered"
    end
  end
  @death_queue.clear

  result
end
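The second half of #wait_termination turns queued death announcements into the returned hash: known processes are unregistered, marked dead and used as keys, while unknown names only produce a warning. A self-contained sketch of that bookkeeping follows. `FakeProcess`, `drain_death_queue` and the integer statuses are stand-ins invented for illustration; the real code works with RemoteProcess and Process::Status objects.

```ruby
# Hypothetical stand-in for the process objects tracked by the client.
FakeProcess = Struct.new(:name, :alive) do
  def dead!
    self.alive = false
  end
end

# Hypothetical helper mirroring the end of #wait_termination.
def drain_death_queue(death_queue, processes)
  result = {}
  death_queue.each do |name, status|
    if (process = processes.delete(name))
      process.dead!
      result[process] = status # keyed by the process object, not its name
    else
      warn "exit reported for unregistered process '#{name}'"
    end
  end
  death_queue.clear
  result
end

camera = FakeProcess.new('camera', true)
processes = { 'camera' => camera }
death_queue = [['camera', 0], ['ghost', 1]] # 'ghost' was never registered

result = drain_death_queue(death_queue, processes)
```

Keying the result by process object is what lets a caller such as #join test `result[process]` directly.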