Skip to content

Commit

Permalink
feat: implement just enough of the orocos'rb Async interface for the …
Browse files Browse the repository at this point in the history
…task inspector

This is only the API surface. Data is currently still not transferred.
  • Loading branch information
doudou committed Dec 30, 2024
1 parent d706958 commit 3a0b21a
Show file tree
Hide file tree
Showing 10 changed files with 249 additions and 61 deletions.
4 changes: 3 additions & 1 deletion lib/syskit/telemetry/async.rb
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,11 @@
require "syskit/telemetry/async/name_service"
require "syskit/telemetry/async/task_context"
require "syskit/telemetry/async/interface_object"
require "syskit/telemetry/async/readable_interface_object"
require "syskit/telemetry/async/attribute"
require "syskit/telemetry/async/property"
require "syskit/telemetry/async/port"
require "syskit/telemetry/async/input_port"
require "syskit/telemetry/async/output_port"

module Syskit
module Telemetry
Expand Down
9 changes: 8 additions & 1 deletion lib/syskit/telemetry/async/attribute.rb
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,14 @@ module Syskit
module Telemetry
module Async
# Callback-based API for remote task ports
class Attribute < InterfaceObject
class Attribute < ReadableInterfaceObject
def on_raw_change(&block)
on_raw_data(&block)
end

def on_change(&block)
on_data(&block)
end
end
end
end
Expand Down
18 changes: 18 additions & 0 deletions lib/syskit/telemetry/async/input_port.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
# frozen_string_literal: true

module Syskit
module Telemetry
module Async
# Async interface compatible with the orocos.rb's API
class InputPort < ReadableInterfaceObject
def output?
false
end

def input?
true
end
end
end
end
end
26 changes: 23 additions & 3 deletions lib/syskit/telemetry/async/interface_object.rb
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,7 @@ class InterfaceObjectHooks

define_hooks :on_reachable
define_hooks :on_unreachable
define_hooks :on_data
define_hooks :on_raw_data
define_hooks :on_error
end

# Callback-based API to the orocos.rb property API
Expand Down Expand Up @@ -48,10 +47,31 @@ def unreachable!
end

def on_reachable(&block)
super(&block)
super

block.call if @raw_object
end

def once_on_reachable(&block)
# on_reachable might call the block right away, in which case
# `listener` will be nil. Use the called flag to allow disposing
# of the listener the second time without causing a double call
# to the block
called = false
listener = on_reachable do
block.call unless called
called = true
listener&.dispose
end
end

def new_sample
@type.zero
end

def type_name
@type.name
end
end
end
end
Expand Down
18 changes: 18 additions & 0 deletions lib/syskit/telemetry/async/output_port.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
# frozen_string_literal: true

module Syskit
module Telemetry
module Async
# Async interface compatible with the orocos.rb's API
class OutputPort < ReadableInterfaceObject
def output?
true
end

def input?
false
end
end
end
end
end
11 changes: 0 additions & 11 deletions lib/syskit/telemetry/async/port.rb

This file was deleted.

9 changes: 8 additions & 1 deletion lib/syskit/telemetry/async/property.rb
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,14 @@ module Syskit
module Telemetry
module Async
# Callback-based API to the orocos.rb property API
class Property < InterfaceObject
class Property < ReadableInterfaceObject
def on_raw_change(&block)
on_raw_data(&block)
end

def on_change(&block)
on_data(&block)
end
end
end
end
Expand Down
54 changes: 54 additions & 0 deletions lib/syskit/telemetry/async/readable_interface_object.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
# frozen_string_literal: true

module Syskit
module Telemetry
module Async
# Definition of hooks related to reading data
class ReadableInterfaceObjectHooks < InterfaceObject
define_hooks :on_data
define_hooks :on_raw_data
end

# Base class for interface objects that allow to read data
class ReadableInterfaceObject < ReadableInterfaceObjectHooks
# Callback management object with the same API than orocos.rb's
class Listener
def initialize(object, event, block)
@object = object
@event = event
@block = block
end

def start
return if @disposable

@disposable = @object.send(@event, &@block)
end

def stop
@disposable&.dispose
@disposable = nil
end

def dispose
stop
end
end

alias __on_raw_data on_raw_data
def on_raw_data(&block)
listener = Listener.new(self, :__on_raw_data, block)
listener.start
listener
end

alias __on_data on_data
def on_data(&block)
listener = Listener.new(self, :__on_data, block)
listener.start
listener
end
end
end
end
end
114 changes: 80 additions & 34 deletions lib/syskit/telemetry/async/task_context.rb
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,10 @@
module Syskit
module Telemetry
module Async
# Definition of hooks for the {TaskContext} class
#
# This is made separately to allow overloading them in the main class in
# a natural way
class TaskContextHooks
include Roby::Hooks
include Roby::Hooks::InstanceHooks
Expand Down Expand Up @@ -30,6 +34,11 @@ class TaskContext < TaskContextHooks
# @return [String]
attr_reader :identity

# The task model
#
# @return [OroGen::Spec::TaskContext]
attr_reader :model

# Discover information about a Orocos::TaskContext and create the
# corresponding {TaskContext}
#
Expand All @@ -41,27 +50,46 @@ def self.discover(task)
state_reader = task.state_reader(
pull: true, type: :circular_buffer, size: 10
)
puts "#{Time.now} #{task.name}: created state reader"
raw_attributes = task.attribute_names.map { task.attribute(_1) }
puts "#{Time.now} #{task.name}: created attributes"
raw_properties = task.property_names.map { task.property(_1) }
puts "#{Time.now} #{task.name}: created properties"
raw_ports = task.port_names.map { task.port(_1) }
puts "#{Time.now} #{task.name}: created ports"
discover_attributes(async_task, task)
discover_properties(async_task, task)
discover_ports(async_task, task)

# We can do this here ONLY BECAUSE we're populating an initial
# state. Further updates need to call the `discover_` methods in
# the main thread
async_task.reachable!(task, state_reader: state_reader)
async_task
end

# @api private
#
# Discover a remote task's attributes
def self.discover_attributes(async_task, task)
raw_attributes = task.attribute_names.map { task.attribute(_1) }
async_task.discover_attributes(raw_attributes)
end

# @api private
#
# Discover a remote task's properties
def self.discover_properties(async_task, task)
raw_properties = task.property_names.map { task.property(_1) }
async_task.discover_properties(raw_properties)
end

# @api private
#
# Discover a remote task's ports
def self.discover_ports(async_task, task)
raw_ports = task.port_names.map { task.port(_1) }
async_task.discover_ports(raw_ports)
puts "#{Time.now} #{task.name}: discovered"
async_task
end

def initialize(name)
def initialize(name, model: self.class.dummy_orogen_model(name))
super()

@name = name
@model = model

@attributes = {}
@properties = {}
Expand All @@ -70,6 +98,13 @@ def initialize(name)
@current_state = nil
end

@dummy_orogen_models = Concurrent::Hash.new

def self.dummy_orogen_model(name)
@dummy_orogen_models[name] ||=
Orocos.create_orogen_task_context_model(name)
end

def to_proxy
self
end
Expand All @@ -81,22 +116,24 @@ def to_proxy
def unreachable!
run_hook :on_unreachable

@attributes.each_value do
run_hook :on_attribute_unreachable, _1.name
_1.unreachable!
end
run_interface_unreachable_hooks(
@attributes.each_value, :on_attribute_unreachable
)
run_interface_unreachable_hooks(
@properties.each_value, :on_property_unreachable
)
run_interface_unreachable_hooks(
@ports.each_value, :on_port_unreachable
)

@properties.each_value do
run_hook :on_property_unreachable, _1.name
_1.unreachable!
end
dispose
end

@ports.each_value do
run_hook :on_port_unreachable, _1.name
def run_interface_unreachable_hooks(objects, event)
objects.each do
run_hook event, _1.name
_1.unreachable!
end

dispose
end

def reachable?
Expand Down Expand Up @@ -147,8 +184,12 @@ def each_port(&block)
@ports.each_value(&block)
end

def each_property(&block)
@properties.each_value(&block)
def each_input_port(&block)
@ports.each_value.find_all(&:input?).each(&block)
end

def each_output_port(&block)
@ports.each_value.find_all { !_1.input? }.each(&block)
end

def on_attribute_reachable(&block)
Expand Down Expand Up @@ -206,7 +247,15 @@ def discover_properties(raw_properties)
def discover_ports(raw_ports)
@ports =
raw_ports.each_with_object({}) do |p, h|
async = Port.new(p.name, p.type)
klass =
case p
when Orocos::InputPort
InputPort
else
OutputPort
end

async = klass.new(p.name, p.type)
async.reachable!(p)
h[p.name] = async
end
Expand All @@ -215,7 +264,6 @@ def discover_ports(raw_ports)
end

def dispose
@raw_task_context.dispose
@raw_task_context = nil

Concurrent::Promises.future(@state_reader, &:disconnect)
Expand All @@ -234,19 +282,17 @@ def state_read_poll_thread(reader, queue, stop, period: 0.1)
end

def poll(period: 0.1)
begin
while (new_state = read_new_state)
@current_state = new_state
run_hook :on_state_change, new_state
end
rescue ThreadError
sleep(period)
while (new_state = read_new_state)
@current_state = new_state
run_hook :on_state_change, new_state
end
rescue ThreadError
sleep(period)
end

def read_new_state
@state_read_queue.pop(true)
rescue ThreadError
rescue ThreadError # rubocop:disable Lint/SuppressedException
end
end
end
Expand Down
Loading

0 comments on commit 3a0b21a

Please sign in to comment.