Skip to content

Commit

Permalink
Trigger
Browse files Browse the repository at this point in the history
  • Loading branch information
dominikwerder committed Sep 6, 2023
1 parent c85f0ea commit 84d045f
Show file tree
Hide file tree
Showing 5 changed files with 27 additions and 47 deletions.
22 changes: 11 additions & 11 deletions .github/workflows/build-rhel7.yml
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
name: build-daqingest-rhel7
on:
push:
tags:
- buildaction
# push:
# tags:
# - buildaction
workflow_dispatch:
inputs:
flags:
Expand Down Expand Up @@ -84,20 +84,20 @@ jobs:
- run: find $GITHUB_WORKSPACE -type f -and \( -name \*.rs -or -name \*.toml \)
- run: find ${{steps.wdset.outputs.gh}} -type f -and \( -name \*.rs -or -name \*.toml \)
working-directory: ${{steps.wdset.outputs.gh}}/build
# - run: cargo build --release
# working-directory: ${{steps.wdset.outputs.gh}}/build/daqingest
- run: mkdir -p target/release && cp /usr/bin/cat target/release/daqingest
working-directory: ${{steps.wdset.outputs.gh}}/build/daqingest/
- run: cargo build --release
working-directory: ${{steps.wdset.outputs.gh}}/build/daqingest
# - run: mkdir -p target/release && cp /usr/bin/cat target/release/daqingest
# working-directory: ${{steps.wdset.outputs.gh}}/build/daqingest/
- run: ls -l
working-directory: ${{steps.wdset.outputs.gh}}/build/daqingest/target/release
- run: ./daqingest --version
working-directory: ${{steps.wdset.outputs.gh}}/build/daqingest/target/release
# - run: echo "daqingest_version=$(./daqingest --version)" >> "$GITHUB_OUTPUT"
# id: daqingest_version_set
# working-directory: ${{steps.wdset.outputs.gh}}/build/daqingest/target/release
- run: echo "daqingest_version=0.0.0-dummy.0" >> "$GITHUB_OUTPUT"
- run: echo "daqingest_version=$(./daqingest --version)" >> "$GITHUB_OUTPUT"
id: daqingest_version_set
working-directory: ${{steps.wdset.outputs.gh}}/build/daqingest/target/release
# - run: echo "daqingest_version=0.0.0-dummy.0" >> "$GITHUB_OUTPUT"
# id: daqingest_version_set
# working-directory: ${{steps.wdset.outputs.gh}}/build/daqingest/target/release
- uses: actions/upload-artifact@v3
with:
name: daqingest-${{steps.daqingest_version_set.outputs.daqingest_version}}
Expand Down
6 changes: 6 additions & 0 deletions daqingest/src/daemon.rs
Original file line number Diff line number Diff line change
Expand Up @@ -216,6 +216,7 @@ pub struct Daemon {
insert_rx_weak: WeakReceiver<QueryItem>,
channel_info_query_tx: Sender<ChannelInfoQuery>,
connset_ctrl: CaConnSetCtrl,
query_item_tx: Sender<QueryItem>,
}

impl Daemon {
Expand Down Expand Up @@ -335,6 +336,7 @@ impl Daemon {
insert_rx_weak: common_insert_item_queue_2.downgrade(),
channel_info_query_tx,
connset_ctrl: conn_set_ctrl,
query_item_tx: common_insert_item_queue.sender().unwrap().inner().clone(),
};
Ok(ret)
}
Expand Down Expand Up @@ -1003,6 +1005,10 @@ impl Daemon {
}
}
}
QueryItem(item) => {
self.query_item_tx.send(item).await?;
Ok(())
}
EndOfStream => {
self.stats.ca_conn_status_done_inc();
self.handle_ca_conn_done(addr).await
Expand Down
43 changes: 7 additions & 36 deletions netfetch/src/ca/conn.rs
Original file line number Diff line number Diff line change
Expand Up @@ -373,6 +373,7 @@ pub enum CaConnEventValue {
None,
EchoTimeout,
ConnCommandResult(ConnCommandResult),
QueryItem(QueryItem),
EndOfStream,
}

Expand Down Expand Up @@ -785,38 +786,6 @@ impl CaConn {
}
}

fn handle_insert_futs(&mut self, cx: &mut Context) -> Poll<Result<(), Error>> {
use Poll::*;
loop {
break {
self.stats.caconn_loop4_count_inc();
if self.sender_polling.is_sending() {
match self.sender_polling.poll_unpin(cx) {
Ready(Ok(())) => {
self.stats.inserts_queue_push_inc();
continue;
}
Ready(Err(e)) => {
use crate::senderpolling::Error::*;
match e {
NoSendInProgress => break Ready(Err(Error::with_msg_no_trace("no send in progress"))),
Closed(_item) => break Ready(Err(Error::with_msg_no_trace("insert channel closed"))),
}
}
Pending => Pending,
}
} else {
if let Some(item) = self.insert_item_queue.pop_front() {
self.sender_polling.send2(item);
continue;
} else {
Ready(Ok(()))
}
}
};
}
}

fn check_channels_alive(&mut self) -> Result<(), Error> {
let tsnow = Instant::now();
trace!("CheckChannelsAlive {addr:?}", addr = &self.remote_addr_dbg);
Expand Down Expand Up @@ -1784,6 +1753,12 @@ impl Stream for CaConn {
})))
} else if let Some(item) = self.ca_conn_event_out_queue.pop_front() {
Ready(Some(Ok(item)))
} else if let Some(item) = self.insert_item_queue.pop_front() {
let ev = CaConnEvent {
ts: Instant::now(),
value: CaConnEventValue::QueryItem(item),
};
Ready(Some(Ok(ev)))
} else {
let ret = loop {
self.stats.caconn_loop1_count_inc();
Expand All @@ -1807,10 +1782,6 @@ impl Stream for CaConn {
}
};
}
match self.handle_insert_futs(cx) {
Ready(_) => {}
Pending => break Pending,
}
if self.is_shutdown() {
if self.outgoing_queues_empty() {
debug!("shut down and all items flushed {}", self.remote_addr_dbg);
Expand Down
2 changes: 2 additions & 0 deletions netfetch/src/ca/connset.rs
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,7 @@ pub struct CaConnSet {
connset_tx: Sender<CaConnSetEvent>,
connset_rx: Receiver<CaConnSetEvent>,
channel_info_query_tx: Sender<ChannelInfoQuery>,
storage_insert_tx: Sender<QueryItem>,
shutdown: bool,
}

Expand All @@ -108,6 +109,7 @@ impl CaConnSet {
connset_tx: connset_tx.clone(),
connset_rx,
channel_info_query_tx,
storage_insert_tx,
shutdown: false,
};
// TODO await on jh
Expand Down
1 change: 1 addition & 0 deletions netfetch/src/daemon_common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ impl DaemonEvent {
None => format!("CaConnEvent/None"),
EchoTimeout => format!("CaConnEvent/EchoTimeout"),
ConnCommandResult(_) => format!("CaConnEvent/ConnCommandResult"),
QueryItem(_) => format!("CaConnEvent/QueryItem"),
EndOfStream => format!("CaConnEvent/EndOfStream"),
}
}
Expand Down

0 comments on commit 84d045f

Please sign in to comment.