Class: Orocos::RemoteProcesses::Client

Inherits:
Object
  • Object
show all
Defined in:
lib/orocos/remote_processes/client.rb

Overview

Client-side API to a Server instance

Process servers make it possible to start, stop and monitor processes on remote machines. Instances of this class provide access to remote process servers.

Defined Under Namespace

Classes: Failed, StartupFailed, TimeoutError

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(host = 'localhost', port = DEFAULT_PORT, response_timeout: 10, root_loader: Orocos.default_loader, name_service: Orocos.name_service) ⇒ Client

Connects to the process server at host:port

Parameters:

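  • host (String) (defaults to: 'localhost')

    the hostname of the process server

  • port (defaults to: DEFAULT_PORT)

    the port of the process server

  • response_timeout (defaults to: 10)

    the default timeout, in seconds, used when waiting for a reply from the server

  • root_loader (defaults to: Orocos.default_loader)

    the loader given to the Loader object created for this client

  • name_service (defaults to: Orocos.name_service)

    the name service object returned by #name_service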



64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
# File 'lib/orocos/remote_processes/client.rb', line 64

# Connects to the process server at host:port
#
# The server PID is queried right away, as a way to check that the server
# answers. The socket is closed again if that initial query fails.
#
# @param host [String] hostname of the process server
# @param port port of the process server
# @param response_timeout default timeout, in seconds, used when waiting
#   for a reply from the server (including the initial PID query)
# @param root_loader the loader given to the {Loader} created for this
#   client
# @param name_service the name service returned by {#name_service}
# @raise [Errno::ECONNREFUSED] if the connection is refused
# @raise [StartupFailed] if the connection reaches end-of-file before the
#   PID reply could be read
# @raise [RuntimeError] if the server does not answer the PID query within
#   response_timeout seconds
def initialize(host = 'localhost', port = DEFAULT_PORT, response_timeout: 10, root_loader: Orocos.default_loader, name_service: Orocos.name_service)
    @host = host
    @port = port
    # Must be set before the first request: #pid uses it as default timeout.
    # Leaving it unset would make the initial query wait forever.
    @response_timeout = response_timeout
    @name_service = name_service
    @socket =
        begin TCPSocket.new(host, port)
        rescue Errno::ECONNREFUSED => e
            raise e.class, "cannot contact process server at '#{host}:#{port}': #{e.message}"
        end

    socket.setsockopt(Socket::IPPROTO_TCP, Socket::TCP_NODELAY, true)
    # Do not leak the connection into spawned processes. The previous
    # fcntl(Fcntl::FD_CLOEXEC, 1) call passed the flag as the fcntl command
    # and therefore never set it.
    socket.close_on_exec = true

    begin
        @server_pid = pid
    rescue EOFError
        socket.close
        raise StartupFailed, "process server failed at '#{host}:#{port}'"
    rescue StandardError
        # Do not leak the socket if the handshake fails for any other reason
        socket.close
        raise
    end

    @loader = Loader.new(self, root_loader)
    @root_loader = loader.root_loader
    @processes = Hash.new
    @death_queue = Array.new
    @host_id = "#{host}:#{port}:#{server_pid}"
end

Instance Attribute Details

#available_deploymentsObject (readonly)

Mapping from deployment names to the corresponding orogen project name. It lists the deployments that are available on the remote process server.



31
32
33
# File 'lib/orocos/remote_processes/client.rb', line 31

# Mapping from deployment names to the corresponding orogen project name
def available_deployments; @available_deployments end

#available_projectsObject (readonly)

Mapping from orogen project names to the corresponding content of the orogen files. These projects are the ones available to the remote process server



27
28
29
# File 'lib/orocos/remote_processes/client.rb', line 27

# Mapping from orogen project names to the content of their orogen files
def available_projects; @available_projects end

#available_typekitsObject (readonly)

Mapping from deployment names to the corresponding XML type registry for the typekits available on the process server



34
35
36
# File 'lib/orocos/remote_processes/client.rb', line 34

# Mapping to the XML type registries of the typekits available on the
# process server
def available_typekits; @available_typekits end

#hostObject (readonly)

The hostname we are connected to



40
41
42
# File 'lib/orocos/remote_processes/client.rb', line 40

# The hostname we are connected to
def host; @host end

#host_idObject (readonly)

A string that uniquely identifies this process server



46
47
48
# File 'lib/orocos/remote_processes/client.rb', line 46

# A string that uniquely identifies this process server
def host_id; @host_id end

#loaderLoader (readonly)

The loader object that gives access to the models from the remote server

Returns:



19
20
21
# File 'lib/orocos/remote_processes/client.rb', line 19

# The loader object giving access to the models from the remote server
def loader; @loader end

#name_serviceObject (readonly)

The name service object used to resolve tasks from this process server



49
50
51
# File 'lib/orocos/remote_processes/client.rb', line 49

# The name service object used to resolve tasks from this process server
def name_service; @name_service end

#portObject (readonly)

The port we are connected to on #host



42
43
44
# File 'lib/orocos/remote_processes/client.rb', line 42

# The port we are connected to
def port; @port end

#processesObject (readonly)

Mapping from a deployment name to the corresponding RemoteProcess instance, for processes that have been started by this client.



37
38
39
# File 'lib/orocos/remote_processes/client.rb', line 37

# Mapping from a deployment name to the process object, for the processes
# that have been started by this client
def processes; @processes end

#root_loaderOroGen::Loaders::Base (readonly)

The root loader object

Returns:

  • (OroGen::Loaders::Base)


22
23
24
# File 'lib/orocos/remote_processes/client.rb', line 22

# The root loader object
def root_loader; @root_loader end

#server_pidObject (readonly)

The PID of the server process



44
45
46
# File 'lib/orocos/remote_processes/client.rb', line 44

# The PID of the server process
def server_pid; @server_pid end

#socketObject (readonly)

The socket instance used to communicate with the server



16
17
18
# File 'lib/orocos/remote_processes/client.rb', line 16

# The socket instance used to communicate with the server
def socket; @socket end

Instance Method Details

#closeObject



277
278
279
# File 'lib/orocos/remote_processes/client.rb', line 277

# Closes the socket used to communicate with the server
def close; socket.close end

#create_log_dir(log_dir, time_tag, metadata = Hash.new) ⇒ Object

Creates a new log dir, and saves the given time tag in it (used later on by save_log_dir)



197
198
199
200
# File 'lib/orocos/remote_processes/client.rb', line 197

# Requests the creation of a new log dir, saving the given time tag in it
# (used later on by #save_log_dir)
#
# Sends COMMAND_CREATE_LOG followed by the marshalled
# [log_dir, time_tag, metadata] array. No reply is read here.
#
# @param log_dir the log directory to create
# @param time_tag the time tag to save in the log directory
# @param metadata [Hash] additional data sent along with the request
def create_log_dir(log_dir, time_tag, metadata = Hash.new)
    socket.write(COMMAND_CREATE_LOG)
    Marshal.dump([log_dir, time_tag, metadata], socket)
end

#disconnectObject



111
112
113
# File 'lib/orocos/remote_processes/client.rb', line 111

# Closes the connection to the process server
def disconnect
    connection = socket
    connection.close
end

#info(timeout: @response_timeout) ⇒ Object



103
104
105
106
107
108
109
# File 'lib/orocos/remote_processes/client.rb', line 103

# Sends COMMAND_GET_INFO to the server and returns the unmarshalled reply
#
# @param timeout how long to wait for the reply, as given to select
# @raise [RuntimeError] if nothing can be read within the timeout
def info(timeout: @response_timeout)
    socket.write(COMMAND_GET_INFO)
    ready = select([socket], [], [], timeout)
    raise "timeout while reading process server at '#{host}:#{port}'" unless ready

    Marshal.load(socket)
end

#inspectObject



54
# File 'lib/orocos/remote_processes/client.rb', line 54

# Same representation as #to_s
def inspect
    to_s
end

#join(deployment_name) ⇒ Object



263
264
265
266
267
268
269
270
271
# File 'lib/orocos/remote_processes/client.rb', line 263

# Blocks until the process registered under deployment_name is reported
# as terminated by #wait_termination
#
# Returns right away if no process is registered under that name.
def join(deployment_name)
    target = processes[deployment_name]
    return unless target

    until wait_termination(nil)[target]
        # other processes terminated, or nothing relevant yet: keep waiting
    end
end

#pid(timeout: @response_timeout) ⇒ Object



91
92
93
94
95
96
97
98
99
100
101
# File 'lib/orocos/remote_processes/client.rb', line 91

# Returns the PID of the process server
#
# The value is requested from the server (COMMAND_GET_PID) the first time
# and cached in @server_pid afterwards.
#
# @param timeout how long to wait for the reply, as given to select
# @raise [RuntimeError] if nothing can be read within the timeout
def pid(timeout: @response_timeout)
    return @server_pid if @server_pid

    socket.write(COMMAND_GET_PID)
    ready = select([socket], [], [], timeout)
    raise "timeout while reading process server at '#{host}:#{port}'" unless ready

    reply = Marshal.load(socket)
    @server_pid = Integer(reply.first)
end

#queue_death_announcementObject



202
203
204
# File 'lib/orocos/remote_processes/client.rb', line 202

# Reads one marshalled death announcement from the socket and appends it
# to the death queue
def queue_death_announcement
    announcement = Marshal.load(socket)
    @death_queue << announcement
end

#quit_serverObject



273
274
275
# File 'lib/orocos/remote_processes/client.rb', line 273

# Sends COMMAND_QUIT to the process server. No reply is read here.
def quit_server; socket.write(COMMAND_QUIT) end

#save_log_dir(log_dir, results_dir) ⇒ Object

Requests that the process server moves the log directory at log_dir to results_dir



190
191
192
193
# File 'lib/orocos/remote_processes/client.rb', line 190

# Requests that the process server moves the log directory at log_dir to
# results_dir
#
# Sends COMMAND_MOVE_LOG followed by the marshalled
# [log_dir, results_dir] pair. No reply is read here.
def save_log_dir(log_dir, results_dir)
    request = [log_dir, results_dir]
    connection = socket
    connection.write(COMMAND_MOVE_LOG)
    Marshal.dump(request, connection)
end

#start(process_name, deployment, name_mappings = Hash.new, options = Hash.new) ⇒ Object

Starts the given deployment on the remote server, without waiting for it to be ready.

Returns a RemoteProcess instance that represents the process on the remote side.

Raises Failed if the server reports a startup failure



154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
# File 'lib/orocos/remote_processes/client.rb', line 154

# Starts the given deployment on the remote server, without waiting for it
# to be ready.
#
# @param process_name the name under which the process is registered in
#   {#processes}
# @param deployment either a deployment name (anything responding to
#   #to_str), resolved through the root loader, or a deployment model
# @param name_mappings [Hash] explicit name mappings; they take precedence
#   over the ones derived from the :prefix option
# @param options [Hash] options forwarded to the server. The :prefix entry
#   is consumed locally and not forwarded. The hash given by the caller is
#   left untouched.
# @return the process object that represents the process on the remote side
# @raise [ArgumentError] if this client already started a process called
#   process_name
# @raise [OroGen::DeploymentModelNotFound] if the deployment is given by
#   name and is not available on the remote server
# @raise [Failed] if the server reports a startup failure
def start(process_name, deployment, name_mappings = Hash.new, options = Hash.new)
    if processes[process_name]
        raise ArgumentError, "this client already started a process called #{process_name}"
    end

    if deployment.respond_to?(:to_str)
        deployment_model = loader.root_loader.deployment_model_from_name(deployment)
        unless loader.has_deployment?(deployment)
            raise OroGen::DeploymentModelNotFound, "deployment #{deployment} exists locally but not on the remote process server #{self}"
        end
    else
        deployment_model = deployment
    end

    # Work on a copy: :prefix is removed before the options are sent, and
    # that must not alter the hash owned by the caller
    options = options.dup
    prefix_mappings = Orocos::ProcessBase.resolve_prefix(deployment_model, options.delete(:prefix))
    name_mappings = prefix_mappings.merge(name_mappings)

    socket.write(COMMAND_START)
    Marshal.dump([process_name, deployment_model.name, name_mappings, options], socket)
    wait_for_answer do |pid_s|
        if pid_s == RET_NO
            # The failure reply is followed by a marshalled message
            msg = Marshal.load(socket)
            raise Failed, "failed to start #{deployment_model.name}: #{msg}"
        elsif pid_s == RET_STARTED_PROCESS
            # The success reply is followed by the marshalled process PID
            pid = Marshal.load(socket)
            process = Process.new(process_name, deployment_model, self, pid)
            process.name_mappings = name_mappings
            processes[process_name] = process
            return process
        else
            raise InternalError, "unexpected reply #{pid_s} to the start command"
        end
    end
end

#stop(deployment_name, wait) ⇒ Object

Requests to stop the given deployment

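Unless wait is true, the call does not block until the process has quit, and you will have to call #wait_termination to wait for the process end. If wait is true, the call blocks (through #join) until the process termination has been reported.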



251
252
253
254
255
256
257
258
259
260
261
# File 'lib/orocos/remote_processes/client.rb', line 251

# Requests to stop the given deployment
#
# Sends COMMAND_END followed by the marshalled deployment name, then waits
# for the server acknowledgement.
#
# @param deployment_name the name of the deployment to stop
# @param wait if true, block in #join until the termination is reported
# @raise [Failed] if the server does not acknowledge the request
def stop(deployment_name, wait)
    socket.write(COMMAND_END)
    Marshal.dump(deployment_name, socket)
    raise Failed, "failed to quit #{deployment_name}" unless wait_for_ack

    join(deployment_name) if wait
end

#to_sObject



51
52
53
# File 'lib/orocos/remote_processes/client.rb', line 51

# Human-readable representation, including the host and port of the server
def to_s
    format('#<Orocos::RemoteProcesses::Client %s:%s>', host, port)
end

#wait_for_ackObject



135
136
137
138
139
140
141
142
143
144
145
# File 'lib/orocos/remote_processes/client.rb', line 135

# Waits for a yes/no answer from the server
#
# @return [Boolean] true if the server replied RET_YES, false if it
#   replied RET_NO
# @raise [InternalError] on any other reply
def wait_for_ack
    wait_for_answer do |reply|
        return true if reply == RET_YES
        return false if reply == RET_NO

        raise InternalError, "unexpected reply #{reply}"
    end
end

#wait_for_answer(timeout: @response_timeout) ⇒ Object



118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
# File 'lib/orocos/remote_processes/client.rb', line 118

# Reads one-byte replies from the server and yields them to the block
#
# Death announcements (EVENT_DEAD_PROCESS) are queued through
# #queue_death_announcement instead of being yielded. The method keeps
# reading until the block leaves it (e.g. with return or break) or an
# error is raised.
#
# @param timeout how long to wait for each reply, as given to select
# @raise [TimeoutError] if nothing can be read within the timeout
# @raise [Orocos::ComError] if reading from the socket returns nothing
def wait_for_answer(timeout: @response_timeout)
    while true
        ready = select([socket], [], [], timeout)
        raise TimeoutError, "reached timeout of #{timeout}s in #wait_for_answer" unless ready

        reply = socket.read(1)
        raise Orocos::ComError, "failed to read from process server #{self}" unless reply

        if reply == EVENT_DEAD_PROCESS
            queue_death_announcement
        else
            yield(reply)
        end
    end
end

#wait_termination(timeout = nil) ⇒ Object

Waits for processes to terminate. timeout is the number of seconds we should wait (it is passed as-is to select). If set to nil, the call will block until a process terminates

Returns a hash that maps the process objects (as registered in #processes) to the Process::Status object that represents their exit status.



212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
# File 'lib/orocos/remote_processes/client.rb', line 212

# Waits for processes to terminate
#
# If no death announcement is already queued, waits up to timeout for the
# server to send some, and reads all the announcements that are available.
# The queued announcements are then processed: the matching processes are
# removed from #processes and marked as dead.
#
# @param timeout the wait timeout, as given to select (nil blocks until
#   something can be read)
# @return [Hash] mapping from the terminated process objects to the status
#   announced by the server. Empty if nothing was received in time.
# @raise [ComError] if the server closed the connection
# @raise [RuntimeError] if something else than a death announcement is
#   received
def wait_termination(timeout = nil)
    if @death_queue.empty?
        ready = select([socket], nil, nil, timeout)
        return {} unless ready

        while ready
            # remote closed, probably a crash
            raise ComError, "communication to process server closed" if socket.eof?

            event = socket.read(1)
            return {} unless event
            raise "unexpected message #{event} from process server" if event != EVENT_DEAD_PROCESS

            queue_death_announcement
            # Zero timeout: only pick up what is already available
            ready = select([socket], nil, nil, 0)
        end
    end

    terminated = {}
    @death_queue.each do |name, status|
        Orocos.debug "#{name} died"
        process = processes.delete(name)
        if process
            process.dead!
            terminated[process] = status
        else
            Orocos.warn "process server reported the exit of '#{name}', but no process with that name is registered"
        end
    end
    @death_queue.clear

    terminated
end