Skip to content

Commit

Permalink
Updated TTY interface.
Browse files Browse the repository at this point in the history
  • Loading branch information
interkosmos committed Mar 5, 2024
1 parent c65ac4f commit d7ee6c4
Show file tree
Hide file tree
Showing 10 changed files with 625 additions and 300 deletions.
1 change: 0 additions & 1 deletion .github/workflows/build.yml
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@ jobs:
- name: Configure System (Linux)
if: contains(matrix.os, 'ubuntu')
run: |
ls -al /dev/
sudo sysctl fs.mqueue.msg_max=100
sudo sysctl fs.mqueue.msgsize_max=16384
Expand Down
18 changes: 9 additions & 9 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -213,11 +213,11 @@ SRC = src/dm_version.f90 src/dm_kind.f90 src/dm_platform.f90 src/dm_ascii.f90 \
src/dm_time.f90 src/dm_timer.f90 src/dm_base64.f90 src/dm_path.f90 \
src/dm_file.f90 src/dm_hash.f90 src/dm_hash_table.f90 src/dm_hdf5.f90 \
src/dm_unit.f90 src/dm_id.f90 src/dm_uuid.f90 src/dm_arg.f90 \
src/dm_signal.f90 src/dm_system.f90 src/dm_pipe.f90 src/dm_tty.f90 \
src/dm_sem.f90 src/dm_mutex.f90 src/dm_dp.f90 src/dm_fifo.f90 \
src/dm_node.f90 src/dm_sensor.f90 src/dm_target.f90 src/dm_response.f90 \
src/dm_request.f90 src/dm_observ.f90 src/dm_log.f90 src/dm_job.f90 \
src/dm_plot.f90 src/dm_report.f90 src/dm_regex.f90 src/dm_sync.f90 \
src/dm_signal.f90 src/dm_system.f90 src/dm_pipe.f90 src/dm_sem.f90 \
src/dm_mutex.f90 src/dm_dp.f90 src/dm_fifo.f90 src/dm_node.f90 \
src/dm_sensor.f90 src/dm_target.f90 src/dm_response.f90 src/dm_request.f90 \
src/dm_observ.f90 src/dm_log.f90 src/dm_job.f90 src/dm_tty.f90 \
src/dm_plot.f90 src/dm_report.f90 src/dm_regex.f90 src/dm_sync.f90 \
src/dm_beat.f90 src/dm_mqueue.f90 src/dm_logger.f90 src/dm_test.f90 \
src/dm_nml.f90 src/dm_sql.f90 src/dm_db.f90 src/dm_z.f90 src/dm_person.f90 \
src/dm_mail.f90 src/dm_http.f90 src/dm_mime.f90 src/dm_api.f90 \
Expand All @@ -234,9 +234,9 @@ OBJ = dm_version.o dm_kind.o dm_platform.o dm_ascii.o dm_const.o dm_error.o \
dm_string.o dm_type.o dm_format.o dm_ansi.o dm_env.o dm_util.o dm_time.o \
dm_timer.o dm_base64.o dm_path.o dm_file.o dm_hash.o dm_hash_table.o \
dm_hdf5.o dm_unit.o dm_id.o dm_uuid.o dm_arg.o dm_signal.o dm_system.o \
dm_pipe.o dm_tty.o dm_sem.o dm_mutex.o dm_dp.o dm_fifo.o dm_node.o \
dm_sensor.o dm_target.o dm_response.o dm_request.o dm_observ.o dm_log.o \
dm_job.o dm_plot.o dm_report.o dm_regex.o dm_sync.o dm_beat.o dm_mqueue.o \
dm_pipe.o dm_sem.o dm_mutex.o dm_dp.o dm_fifo.o dm_node.o dm_sensor.o \
dm_target.o dm_response.o dm_request.o dm_observ.o dm_log.o dm_job.o \
dm_tty.o dm_plot.o dm_report.o dm_regex.o dm_sync.o dm_beat.o dm_mqueue.o \
dm_logger.o dm_test.o dm_nml.o dm_sql.o dm_db.o dm_z.o dm_person.o dm_mail.o \
dm_http.o dm_mime.o dm_api.o dm_rpc.o dm_mqtt.o dm_cgi.o dm_fcgi.o dm_block.o \
dm_csv.o dm_json.o dm_jsonl.o dm_html.o dm_atom.o dm_cgi_router.o dm_la.o \
Expand Down Expand Up @@ -381,7 +381,6 @@ $(OBJ): $(SRC)
$(FC) $(FFLAGS) $(LDFLAGS) -c src/dm_signal.f90
$(FC) $(FFLAGS) $(LDFLAGS) -c src/dm_system.f90
$(FC) $(FFLAGS) $(LDFLAGS) -c src/dm_pipe.f90
$(FC) $(FFLAGS) $(LDFLAGS) -c src/dm_tty.f90
$(FC) $(FFLAGS) $(LDFLAGS) -c src/dm_sem.f90
$(FC) $(FFLAGS) $(LDFLAGS) -c src/dm_mutex.f90
$(FC) $(FFLAGS) $(LDFLAGS) -c src/dm_dp.f90
Expand All @@ -394,6 +393,7 @@ $(OBJ): $(SRC)
$(FC) $(FFLAGS) $(LDFLAGS) -c src/dm_observ.f90
$(FC) $(FFLAGS) $(LDFLAGS) -c src/dm_log.f90
$(FC) $(FFLAGS) $(LDFLAGS) -c src/dm_job.f90
$(FC) $(FFLAGS) $(LDFLAGS) -c src/dm_tty.f90
$(FC) $(FFLAGS) $(LDFLAGS) -c src/dm_plot.f90
$(FC) $(FFLAGS) $(LDFLAGS) -c src/dm_report.f90
$(FC) $(FFLAGS) $(LDFLAGS) -c src/dm_regex.f90
Expand Down
82 changes: 57 additions & 25 deletions app/dmfs.f90
Original file line number Diff line number Diff line change
Expand Up @@ -221,13 +221,17 @@ integer function read_observ(observ, node_id, sensor_id, source, debug) result(r
character(len=*), intent(in) :: source !! Source of observation.
logical, intent(in), optional :: debug !! Output debug messages.

character(len=REQUEST_RESPONSE_LEN) :: raw ! Raw response (unescaped).

integer :: delay
integer :: fu, stat
integer :: i, j
integer :: i, j, n
logical :: debug_
type(request_type), pointer :: request ! Next request to execute.
type(response_type), pointer :: response ! Single response in request.

rc = E_EMPTY

debug_ = .true.
if (present(debug)) debug_ = debug

Expand All @@ -238,16 +242,21 @@ integer function read_observ(observ, node_id, sensor_id, source, debug) result(r
observ%source = source
observ%timestamp = dm_time_now()

if (observ%nrequests == 0) then
n = observ%nrequests

if (n == 0) then
if (debug_) call dm_log_debug('no requests in observ ' // observ%name, observ=observ)
observ%error = rc
return
end if

! Read files in requests sequentially.
req_loop: do i = 1, observ%nrequests
req_loop: do i = 1, n
! Get pointer to next request.
request => observ%requests(i)

if (debug_) call dm_log_debug('starting ' // request_name_string(request%name, i, n), observ=observ)

! Initialise request.
request%timestamp = dm_time_now()
request%error = E_IO
Expand All @@ -264,32 +273,39 @@ integer function read_observ(observ, node_id, sensor_id, source, debug) result(r
if (stat == 0) request%error = E_NONE

if (dm_is_error(request%error)) then
call dm_log_error('failed to open ' // trim(request%request), &
call dm_log_error('failed to open file ' // trim(request%request), &
observ=observ, error=request%error)
cycle req_loop
end if

! Read until the request pattern matches.
! Read until the request pattern matches or end is reached.
read_loop: do
rc = E_EOF
read (fu, '(a)', iostat=stat) request%response
read (fu, '(a)', iostat=stat) raw
if (is_iostat_end(stat)) exit read_loop
if (stat /= 0) cycle read_loop

! Try to extract the response values.
request%response = dm_ascii_escape(raw)
rc = dm_regex_request(request)

if (dm_is_error(rc)) then
if (debug_) call dm_log_debug('line does not match pattern', observ=observ, error=rc)
if (debug_) then
call dm_log_debug('response of ' // request_name_string(request%name, i) // &
' does not match pattern', observ=observ, error=request%error)
end if

cycle read_loop
end if

! Check responses.
do j = 1, request%nresponses
response => request%responses(j)
if (dm_is_ok(response%error)) cycle

call dm_log_warning('failed to extract response ' // trim(response%name) // &
' of request ' // dm_itoa(i), observ=observ, error=response%error)
' of ' // request_name_string(request%name, i), &
observ=observ, error=response%error)
end do

! Cycle on error or exit on success.
Expand All @@ -300,9 +316,7 @@ integer function read_observ(observ, node_id, sensor_id, source, debug) result(r
! Close file.
close (fu)

! Save response and return code.
request%response = dm_ascii_escape(request%response)
request%error = rc
request%error = rc

! Create log message and repeat.
if (dm_is_error(rc)) then
Expand All @@ -311,24 +325,42 @@ integer function read_observ(observ, node_id, sensor_id, source, debug) result(r
cycle req_loop
end if

if (debug_) then
call dm_log_debug('finished request ' // dm_itoa(i) // ' of ' // &
dm_itoa(observ%nrequests), observ=observ)
end if
if (debug_) call dm_log_debug('finished ' // request_name_string(request%name, i, n), observ=observ)

! Wait the set delay time of the request.
delay = max(0, request%delay)
if (delay <= 0) cycle req_loop

if (debug_) then
call dm_log_debug('next request of observ ' // trim(observ%name) // &
if (debug_ .and. i < n) then
call dm_log_debug('next ' // request_name_string(observ%requests(i + 1)%name, i + 1, n, observ%name) // &
' in ' // dm_itoa(delay / 1000) // ' sec', observ=observ)
else if (debug_) then
call dm_log_debug('next observ in ' // dm_itoa(delay / 1000) // ' sec', observ=observ)
end if

call dm_usleep(delay * 1000)
call dm_usleep(delay * 1000) ! [msec] to [us].
end do req_loop
end function read_observ

pure function request_name_string(request_name, i, n, observ_name) result(str)
!! Returns string of request name and index for logging.
character(len=*), intent(in) :: request_name !! Request name.
integer, intent(in) :: i !! Request index.
integer, intent(in), optional :: n !! Number of requests in observation.
character(len=*), intent(in), optional :: observ_name !! Observation name.
character(len=:), allocatable :: str !! Result.

if (present(n)) then
str = 'request ' // trim(request_name) // ' (' // dm_itoa(i) // ')'
else
str = 'request ' // trim(request_name) // ' (' // dm_itoa(i) // ' of ' // dm_itoa(n) // ')'
end if

if (present(observ_name)) then
str = str // ' of observ ' // trim(observ_name)
end if
end function request_name_string

integer function write_observ(observ, unit, format) result(rc)
!! Writes observation to file unit, in CSV or JSON Lines format.
type(observ_type), intent(inout) :: observ !! Observation to write.
Expand Down Expand Up @@ -385,23 +417,23 @@ subroutine run(app)
observ => job%observ

if (debug) then
call dm_log_debug('starting observ ' // trim(observ%name) // &
' for sensor ' // app%sensor, observ=observ)
call dm_log_debug('starting observ ' // trim(observ%name) // ' for sensor ' // &
app%sensor, observ=observ)
end if

! Read observation from file system.
rc = read_observ(observ, app%node, app%sensor, app%name, debug=debug)

if (debug) then
call dm_log_debug('finished observ ' // trim(observ%name) // &
' for sensor ' // app%sensor, observ=observ)
end if

! Forward observation via message queue.
rc = dm_mqueue_forward(observ, app%name, APP_MQ_BLOCKING)

! Output observation.
rc = output_observ(observ, app%output_type)

if (debug) then
call dm_log_debug('finished observ ' // trim(observ%name) // ' for sensor ' // &
app%sensor, observ=observ)
end if
end if

! Wait delay time of the job if set (absolute).
Expand Down
Loading

0 comments on commit d7ee6c4

Please sign in to comment.