Skip to content

Commit

Permalink
Minor updates and fixes.
Browse files Browse the repository at this point in the history
  • Loading branch information
interkosmos committed Feb 25, 2024
1 parent 5b36fdc commit e5fe98d
Show file tree
Hide file tree
Showing 6 changed files with 97 additions and 74 deletions.
14 changes: 7 additions & 7 deletions app/dmfeed.f90
Original file line number Diff line number Diff line change
Expand Up @@ -178,15 +178,15 @@ subroutine create_feed(app, error)

is_file = (len_trim(app%output) > 0 .and. app%output /= '-')

! Connect to database.
rc = dm_db_open(db, app%database, timeout=DB_TIMEOUT_DEFAULT)
feed_block: block
! Connect to database.
rc = dm_db_open(db, app%database, timeout=DB_TIMEOUT_DEFAULT)

if (dm_is_error(rc)) then
call dm_error_out(rc, 'failed to open database')
return
end if
if (dm_is_error(rc)) then
call dm_error_out(rc, 'failed to open database')
exit feed_block
end if

feed_block: block
! Get logs from database.
if (len_trim(app%node) > 0) then
rc = dm_db_select(db = db, &
Expand Down
47 changes: 27 additions & 20 deletions app/dmfs.f90
Original file line number Diff line number Diff line change
Expand Up @@ -221,11 +221,12 @@ integer function read_observ(observ, node_id, sensor_id, debug) result(rc)
character(len=*), intent(in) :: sensor_id !! Sensor id of observation.
logical, intent(in), optional :: debug !! Output debug messages.

character(len=LOG_MESSAGE_LEN) :: message
integer :: delay, i, j, fu, stat
logical :: debug_
type(request_type), pointer :: request ! Next request to execute.
type(response_type), pointer :: response ! Single response in request.
integer :: delay
integer :: fu, stat
integer :: i, j
logical :: debug_
type(request_type), pointer :: request ! Next request to execute.
type(response_type), pointer :: response ! Single response in request.

debug_ = .true.
if (present(debug)) debug_ = debug
Expand Down Expand Up @@ -269,9 +270,7 @@ integer function read_observ(observ, node_id, sensor_id, debug) result(rc)

! Read until the request pattern matches.
read_loop: do
rc = E_READ
request%response = ' '

rc = E_EOF
read (fu, '(a)', iostat=stat) request%response
if (is_iostat_end(stat)) exit read_loop
if (stat /= 0) cycle read_loop
Expand All @@ -280,16 +279,16 @@ integer function read_observ(observ, node_id, sensor_id, debug) result(rc)
rc = dm_regex_request(request)

if (dm_is_error(rc)) then
call dm_log(LOG_WARNING, 'failed to match response', observ=observ, error=rc)
if (debug_) call dm_log(LOG_DEBUG, 'line does not match pattern', observ=observ, error=rc)
cycle read_loop
end if

! Check responses.
do j = 1, request%nresponses
response => request%responses(j)
if (dm_is_ok(response%error)) cycle
call dm_log(LOG_WARNING, 'failed to read response ' // response%name, &
observ=observ, error=response%error)
call dm_log(LOG_WARNING, 'failed to extract response ' // trim(response%name) // &
' of request ' // dm_itoa(i), observ=observ, error=response%error)
end do

! Cycle on error or exit on success.
Expand All @@ -302,7 +301,7 @@ integer function read_observ(observ, node_id, sensor_id, debug) result(rc)

! Save response and return code.
request%response = dm_ascii_escape(request%response)
request%error = rc
request%error = rc

! Create log message and repeat.
if (dm_is_error(rc)) then
Expand All @@ -311,7 +310,7 @@ integer function read_observ(observ, node_id, sensor_id, debug) result(rc)
cycle req_loop
end if

if (debug) then ! Log only if needed.
if (debug_) then
call dm_log(LOG_DEBUG, 'finished request ' // dm_itoa(i) // ' of ' // &
dm_itoa(observ%nrequests), observ=observ)
end if
Expand All @@ -320,9 +319,9 @@ integer function read_observ(observ, node_id, sensor_id, debug) result(rc)
delay = max(0, request%delay)
if (delay <= 0) cycle req_loop

if (debug) then ! Log only if needed.
write (message, '("next request of observ ", a, " in ", i0, " sec")') trim(observ%name), delay / 1000
call dm_log(LOG_DEBUG, message)
if (debug_) then
call dm_log(LOG_DEBUG, 'next request of observ ' // trim(observ%name) // &
' in ' // dm_itoa(delay / 1000) // ' sec', observ=observ)
end if

call dm_usleep(delay * 1000)
Expand Down Expand Up @@ -371,7 +370,7 @@ subroutine run(app)
exit job_loop
end if

call dm_log(LOG_DEBUG, dm_itoa(njobs) // ' job(s) left in job queue')
if (debug) call dm_log(LOG_DEBUG, dm_itoa(njobs) // ' job(s) left in job queue')

! Get next job as deep copy.
rc = dm_job_list_next(app%jobs, job)
Expand All @@ -384,12 +383,20 @@ subroutine run(app)
if (job%valid) then
observ => job%observ

if (debug) then
call dm_log(LOG_DEBUG, 'starting observ ' // trim(observ%name) // &
' for sensor ' // app%sensor, observ=observ)
end if

! Read observation from file system.
call dm_log(LOG_DEBUG, 'starting observ ' // trim(observ%name) // ' for sensor ' // app%sensor, observ=observ)
rc = read_observ(observ, app%node, app%sensor, debug=debug)

if (debug) then
call dm_log(LOG_DEBUG, 'finished observ ' // trim(observ%name) // &
' for sensor ' // app%sensor, observ=observ)
end if

! Forward observation via message queue.
call dm_log(LOG_DEBUG, 'finished observ ' // trim(observ%name) // ' for sensor ' // app%sensor, observ=observ)
rc = dm_mqueue_forward(observ, app%name, APP_MQ_BLOCKING)

! Output observation.
Expand All @@ -399,7 +406,7 @@ subroutine run(app)
! Wait delay time of the job if set (absolute).
delay = max(0, job%delay)
if (delay <= 0) cycle job_loop
call dm_log(LOG_DEBUG, 'next job in ' // dm_itoa(delay / 1000) // ' sec')
if (debug) call dm_log(LOG_DEBUG, 'next job in ' // dm_itoa(delay / 1000) // ' sec', observ=observ)
call dm_usleep(delay * 1000)
end do job_loop
end subroutine run
Expand Down
24 changes: 15 additions & 9 deletions app/dmpipe.f90
Original file line number Diff line number Diff line change
Expand Up @@ -240,25 +240,28 @@ subroutine run(app)
integer :: delay, njobs
integer :: i, j, rc
integer(kind=i8) :: sz
logical :: debug

type(job_type), target :: job ! Next job to run.
type(pipe_type) :: pipe ! Pipe to process.
type(observ_type), pointer :: observ ! Next observation to perform.
type(request_type), pointer :: request ! Next request to execute.
type(response_type), pointer :: response ! Response in request.

debug = (app%debug .or. app%verbose)

call dm_log(LOG_INFO, 'started ' // app%name)

! Run until no jobs are left.
job_loop: do
njobs = dm_job_list_count(app%jobs)

if (njobs == 0) then
call dm_log(LOG_DEBUG, 'no jobs left')
if (debug) call dm_log(LOG_DEBUG, 'no jobs left')
exit job_loop
end if

call dm_log(LOG_DEBUG, dm_itoa(njobs) // ' job(s) left in job queue')
if (debug) call dm_log(LOG_DEBUG, dm_itoa(njobs) // ' job(s) left in job queue')

! Get next job as deep copy.
rc = dm_job_list_next(app%jobs, job)
Expand All @@ -272,7 +275,7 @@ subroutine run(app)
! Get pointer to job observation.
observ => job%observ

call dm_log(LOG_DEBUG, 'starting observ ' // observ%name, observ=observ)
if (debug) call dm_log(LOG_DEBUG, 'starting observ ' // observ%name, observ=observ)

! Initialise observation.
observ%id = dm_uuid4()
Expand All @@ -281,7 +284,7 @@ subroutine run(app)
observ%timestamp = dm_time_now()

if (observ%nrequests == 0) then
call dm_log(LOG_DEBUG, 'no requests in observ ' // observ%name, observ=observ)
if (debug) call dm_log(LOG_DEBUG, 'no requests in observ ' // observ%name, observ=observ)
exit observ_if
end if

Expand Down Expand Up @@ -314,7 +317,8 @@ subroutine run(app)
rc = dm_regex_request(request)

if (dm_is_error(rc)) then
call dm_log(LOG_WARNING, 'failed to match response', observ=observ, error=rc)
call dm_log(LOG_WARNING, 'response to request ' // dm_itoa(i) // ' does not match pattern', &
observ=observ, error=rc)
cycle read_loop
end if

Expand Down Expand Up @@ -346,13 +350,15 @@ subroutine run(app)
! Wait the set delay time of the request.
delay = max(0, request%delay)
if (delay <= 0) cycle req_loop
call dm_log(LOG_DEBUG, 'next request of observ ' // trim(observ%name) // &
' in ' // dm_itoa(delay / 1000) // ' sec')
if (debug) then
call dm_log(LOG_DEBUG, 'next request of observ ' // trim(observ%name) // &
' in ' // dm_itoa(delay / 1000) // ' sec', observ=observ)
end if
call dm_usleep(delay * 1000)
end do req_loop

! Forward observation.
call dm_log(LOG_DEBUG, 'finished observ ' // observ%name, observ=observ)
if (debug) call dm_log(LOG_DEBUG, 'finished observ ' // observ%name, observ=observ)
rc = dm_mqueue_forward(observ, app%name, APP_MQ_BLOCKING)

! Output observation.
Expand All @@ -362,7 +368,7 @@ subroutine run(app)
! Wait the set delay time of the job (absolute).
delay = max(0, job%delay)
if (delay <= 0) cycle job_loop
call dm_log(LOG_DEBUG, 'next job in ' // dm_itoa(delay / 1000) // ' sec')
if (debug) call dm_log(LOG_DEBUG, 'next job in ' // dm_itoa(delay / 1000) // ' sec', observ=observ)
call dm_usleep(delay * 1000)
end do job_loop
end subroutine run
Expand Down
57 changes: 33 additions & 24 deletions app/dmserial.f90
Original file line number Diff line number Diff line change
Expand Up @@ -322,11 +322,14 @@ integer function read_observ(tty, observ, debug) result(rc)
logical, intent(in), optional :: debug !! Output debug messages.

character(len=REQUEST_REQUEST_LEN) :: raw_request ! Raw request (unescaped).
character(len=REQUEST_RESPONSE_LEN) :: raw_response ! Raw response (unescaped).
character(len=REQUEST_DELIMITER_LEN) :: raw_delimiter ! Raw delimiter (unescaped).
integer :: delay, i, j
logical :: debug_ ! Create debug messages only if necessary.
type(request_type), pointer :: request ! Next request to execute.
type(response_type), pointer :: response ! Single response in request.

integer :: delay
integer :: i, j
logical :: debug_ ! Create debug messages only if necessary.
type(request_type), pointer :: request ! Next request to execute.
type(response_type), pointer :: response ! Single response in request.

debug_ = .true.
if (present(debug)) debug_ = debug
Expand All @@ -349,21 +352,20 @@ integer function read_observ(tty, observ, debug) result(rc)
if (debug_) then
call dm_log(LOG_DEBUG, 'starting request ' // dm_itoa(i) // ' of ' // &
dm_itoa(observ%nrequests), observ=observ)
call dm_log(LOG_DEBUG, 'sending request: ' // request%request, observ=observ)
end if

! Set raw values.
raw_request = dm_ascii_unescape(request%request)
raw_delimiter = dm_ascii_unescape(request%delimiter)

! Set default error of responses.
! Prepare request.
rc = dm_request_set_response_error(request, E_INCOMPLETE)

! Send request to sensor.
if (debug_) call dm_log(LOG_DEBUG, 'sending request: ' // raw_request, observ=observ)
raw_request = dm_ascii_unescape(request%request)
raw_delimiter = dm_ascii_unescape(request%delimiter)
raw_response = ' '

request%response = ' '
request%timestamp = dm_time_now()

! Send request to sensor.
request%error = dm_tty_flush(tty, output=.false.)
request%error = dm_tty_write(tty, trim(raw_request))

Expand All @@ -373,34 +375,34 @@ integer function read_observ(tty, observ, debug) result(rc)
cycle req_loop
end if

! Ignore sensor response if no delimiter is set.
if (len_trim(raw_delimiter) == 0) then
if (debug_) call dm_log(LOG_DEBUG, 'no delimiter set in request ' // dm_itoa(i), observ=observ)
cycle req_loop
end if

! Read raw sensor response from TTY.
request%error = dm_tty_read(tty, request%response, trim(raw_delimiter))
! Read sensor response from TTY.
request%error = dm_tty_read(tty, raw_response, trim(raw_delimiter))
request%response = dm_ascii_escape(raw_response)

if (dm_is_error(request%error)) then
call dm_log(LOG_ERROR, 'failed to read from TTY ' // app%tty, &
observ=observ, error=request%error)
cycle req_loop
end if

if (debug_) call dm_log(LOG_DEBUG, 'received raw response: ' // request%response, observ=observ)
if (debug_) call dm_log(LOG_DEBUG, 'received response: ' // raw_response, observ=observ)

! Do not extract responses if no pattern is set.
if (len_trim(request%pattern) == 0) then
if (debug_) call dm_log(LOG_DEBUG, 'no pattern in request ' // dm_itoa(i), observ=observ)
cycle req_loop
end if

! Try to extract the response values if a regex pattern is given.
if (debug_) call dm_log(LOG_DEBUG, 'extracting response values', observ=observ)
if (debug_) call dm_log(LOG_DEBUG, 'extracting response values of request ' // dm_itoa(i), observ=observ)
request%error = dm_regex_request(request)

! Unescape raw response.
request%response = dm_ascii_escape(request%response)

if (dm_is_error(request%error)) then
call dm_log(LOG_WARNING, 'response to request ' // dm_itoa(i) // ' does not match pattern', &
observ=observ, error=request%error)
Expand All @@ -413,7 +415,7 @@ integer function read_observ(tty, observ, debug) result(rc)

if (dm_is_error(response%error)) then
call dm_log(LOG_WARNING, 'failed to extract response ' // trim(response%name) // &
' to request ' // dm_itoa(i), observ=observ, error=response%error)
' of request ' // dm_itoa(i), observ=observ, error=response%error)
cycle
end if

Expand All @@ -431,10 +433,9 @@ integer function read_observ(tty, observ, debug) result(rc)
! Wait the set delay time of the request.
delay = max(0, request%delay)
if (delay <= 0) cycle req_loop

if (debug_) then
call dm_log(LOG_DEBUG, 'next request of observ ' // trim(observ%name) // &
' in ' // dm_itoa(delay / 1000) // ' sec')
' in ' // dm_itoa(delay / 1000) // ' sec', observ=observ)
end if

call dm_usleep(delay * 1000)
Expand Down Expand Up @@ -496,12 +497,20 @@ integer function run(app, tty) result(rc)
observ%sensor_id = app%sensor
observ%path = trim(app%tty)

if (debug) then
call dm_log(LOG_DEBUG, 'starting observ ' // trim(observ%name) // &
' for sensor ' // app%sensor, observ=observ)
end if

! Read observation from TTY.
call dm_log(LOG_DEBUG, 'starting observ ' // trim(observ%name) // ' for sensor ' // app%sensor, observ=observ)
rc = read_observ(tty, observ, debug=debug)

if (debug) then
call dm_log(LOG_DEBUG, 'finished observ ' // trim(observ%name) // &
' for sensor ' // app%sensor, observ=observ)
end if

! Forward observation.
call dm_log(LOG_DEBUG, 'finished observ ' // trim(observ%name) // ' for sensor ' // app%sensor, observ=observ)
rc = dm_mqueue_forward(observ, app%name, blocking=APP_MQ_BLOCKING)

! Output observation.
Expand All @@ -511,7 +520,7 @@ integer function run(app, tty) result(rc)
! Wait the set delay time of the job (absolute).
delay = max(0, job%delay)
if (delay <= 0) cycle job_loop
if (debug) call dm_log(LOG_DEBUG, 'next job in ' // dm_itoa(delay / 1000) // ' sec')
if (debug) call dm_log(LOG_DEBUG, 'next job in ' // dm_itoa(delay / 1000) // ' sec', observ=observ)
call dm_usleep(delay * 1000)
end do job_loop

Expand Down
Loading

0 comments on commit e5fe98d

Please sign in to comment.