Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Change execution order for single-threaded server #1149

Open
wants to merge 6 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions src/api/demo.rkt
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@

(provide run-demo)

(define *demo?* (make-parameter false))
(define *demo-prefix* (make-parameter "/"))
(define *demo-log* (make-parameter false))

Expand Down
7 changes: 3 additions & 4 deletions src/api/run.rkt
Original file line number Diff line number Diff line change
Expand Up @@ -63,8 +63,7 @@
(define (merge-profile-jsons ps)
(profile->json (apply profile-merge (map json->profile (dict-values ps)))))

(define (generate-bench-report job-id bench-name test-number dir number-of-test)
(define result (wait-for-job job-id))
(define (generate-bench-report result bench-name test-number dir total-tests)
(define report-path (bench-folder-path bench-name test-number))
(define report-directory (build-path dir report-path))
(unless (directory-exists? report-directory)
Expand All @@ -79,7 +78,7 @@
(make-page page out result #t #f)))))

(define table-data (get-table-data-from-hash result report-path))
(print-test-result (+ test-number 1) number-of-test table-data)
(print-test-result (+ test-number 1) total-tests table-data)
table-data)

(define (run-tests tests #:dir dir #:threads threads)
Expand All @@ -96,7 +95,7 @@
(for/list ([job-id (in-list job-ids)]
[test (in-list tests)]
[test-number (in-naturals)])
(generate-bench-report job-id (test-name test) test-number dir (length tests))))
(generate-bench-report (wait-for-job job-id) (test-name test) test-number dir (length tests))))

(define info (make-report-info results #:seed seed))
(write-datafile (build-path dir "results.json") info)
Expand Down
70 changes: 33 additions & 37 deletions src/api/server.rkt
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@

(provide make-path
get-improve-table-data
make-improve-result
server-check-on
get-results-for
get-timeline-for
Expand All @@ -31,10 +30,8 @@
wait-for-job
start-job-server
write-results-to-disk
*demo?*
*demo-output*)

(define *demo?* (make-parameter false))
(define *demo-output* (make-parameter false))

(define log-level #f)
Expand Down Expand Up @@ -118,21 +115,25 @@
(place-channel-put manager (list* msg args))
(match msg
['start
(match-define (list hash-false command job-id) args)
(hash-set! completed-work job-id (herbie-do-server-job command job-id))])))
(match-define (list #f command job-id) args)
(hash-set! queued-jobs job-id command)])))

(define (manager-ask msg . args)
(log "Asking manager: ~a, ~a.\n" msg args)
(if manager
(manager-ask-with-callback msg args)
(match (list* msg args) ; public commands
[(list 'wait hash-false job-id) (hash-ref completed-work job-id)]
[(list 'result job-id) (hash-ref completed-work job-id #f)]
[(list 'timeline job-id) (hash-ref completed-work job-id #f)]
[(list 'check job-id) (and (hash-ref completed-work job-id #f) job-id)]
[(list 'wait hash-false job-id)
(define command (hash-ref queued-jobs job-id))
(define result (herbie-do-server-job command job-id))
(hash-set! completed-jobs job-id result)
result]
[(list 'result job-id) (hash-ref completed-jobs job-id #f)]
[(list 'timeline job-id) (hash-ref completed-jobs job-id #f)]
[(list 'check job-id) (and (hash-ref completed-jobs job-id #f) job-id)]
[(list 'count) (list 0 0)]
[(list 'improve)
(for/list ([(job-id result) (in-hash completed-work)]
(for/list ([(job-id result) (in-hash completed-jobs)]
#:when (equal? (hash-ref result 'command) "improve"))
(get-table-data-from-hash result (make-path job-id)))])))

Expand Down Expand Up @@ -161,7 +162,8 @@
'path
(make-path job-id)))

(define completed-work (make-hash))
(define queued-jobs (make-hash))
(define completed-jobs (make-hash))

(define (manager-ask-with-callback msg args)
(define-values (a b) (place-channel))
Expand Down Expand Up @@ -214,8 +216,6 @@
(parameterize ([params fresh] ...)
body ...))))]))

(struct work-item (command id))

(define (make-manager worker-count)
(place/context*
ch
Expand All @@ -240,39 +240,35 @@
(hash-set! waiting-workers i (make-worker i)))
(log "~a workers ready.\n" (hash-count waiting-workers))
(define waiting (make-hash))
(define job-queue (list))
(log "Manager waiting to assign work.\n")
(for ([i (in-naturals)])
(match (place-channel-get ch)
[(list 'start self command job-id)
; Check if the work has been completed already if not assign the work.
(if (hash-has-key? completed-work job-id)
(place-channel-put self (list 'send job-id (hash-ref completed-work job-id)))
(place-channel-put self (list 'queue self job-id command)))]
[(list 'queue self job-id command)
(set! job-queue (append job-queue (list (work-item command job-id))))
(place-channel-put self (list 'assign self))]
(cond
[(hash-has-key? completed-jobs job-id)
(place-channel-put self (list 'send job-id (hash-ref completed-jobs job-id)))]
[else
(hash-set! queued-jobs job-id command job-id)
(place-channel-put self (list 'assign self))])]
[(list 'assign self)
(define reassigned (make-hash))
(for ([(wid worker) (in-hash waiting-workers)]
[job (in-list job-queue)])
(log "Starting worker [~a] on [~a].\n"
(work-item-id job)
(test-name (herbie-command-test (work-item-command job))))
[(jid command) (in-hash queued-jobs)])
(log "Starting worker [~a] on [~a].\n" jid (test-name (herbie-command-test command)))
; Check if the job is already in progress.
(unless (hash-has-key? current-jobs (work-item-id job))
(hash-set! current-jobs (work-item-id job) wid)
(place-channel-put worker (list 'apply self (work-item-command job) (work-item-id job)))
(hash-set! reassigned wid worker)
(unless (hash-has-key? current-jobs jid)
(place-channel-put worker (list 'apply self command jid))
(hash-set! reassigned wid jid)
(hash-set! busy-workers wid worker)))
; remove X many jobs from the Q and update waiting-workers
(for ([(wid worker) (in-hash reassigned)])
(for ([(wid jid) (in-hash reassigned)])
(hash-remove! waiting-workers wid)
(set! job-queue (cdr job-queue)))]
(hash-remove! queued-jobs jid))]
; Job is finished save work and free worker. Move work to 'send state.
[(list 'finished self wid job-id result)
(log "Job ~a finished, saving result.\n" job-id)
(hash-set! completed-work job-id result)
(hash-set! completed-jobs job-id result)

; move worker to waiting list
(hash-remove! current-jobs job-id)
Expand All @@ -286,7 +282,7 @@
(log "Waiting for job: ~a\n" job-id)
; first we add the handler to the wait list.
(hash-update! waiting job-id (curry append (list handler)) '())
(define result (hash-ref completed-work job-id #f))
(define result (hash-ref completed-jobs job-id #f))
; check if the job is completed or not.
(unless (false? result)
(log "Done waiting for job: ~a\n" job-id)
Expand All @@ -298,7 +294,7 @@
(place-channel-put handle result))
(hash-remove! waiting job-id)]
; Get the result for the given id, return false if no work found.
[(list 'result handler job-id) (place-channel-put handler (hash-ref completed-work job-id #f))]
[(list 'result handler job-id) (place-channel-put handler (hash-ref completed-jobs job-id #f))]
[(list 'timeline handler job-id)
(define wid (hash-ref current-jobs job-id #f))
(cond
Expand All @@ -310,17 +306,17 @@
(place-channel-put handler requested-timeline)]
[else
(log "Job complete, no timeline, send result.\n")
(place-channel-put handler (hash-ref completed-work job-id #f))])]
(place-channel-put handler (hash-ref completed-jobs job-id #f))])]
[(list 'check handler job-id)
(place-channel-put handler (and (hash-has-key? completed-work job-id) job-id))]
(place-channel-put handler (and (hash-has-key? completed-jobs job-id) job-id))]
; Returns the current count of working workers.
[(list 'count handler)
(log "Count requested\n")
(place-channel-put handler (list (hash-count busy-workers) (length job-queue)))]
(place-channel-put handler (list (hash-count busy-workers) (hash-count queued-jobs)))]
; Retreive the improve results for results.json
[(list 'improve handler)
(define improved-list
(for/list ([(job-id result) (in-hash completed-work)]
(for/list ([(job-id result) (in-hash completed-jobs)]
#:when (equal? (hash-ref result 'command) "improve"))
(get-table-data-from-hash result (make-path job-id))))
(place-channel-put handler improved-list)]))))
Expand Down
Loading