From 84d045f245b832033bdd9ec67cc57cc7fab14918 Mon Sep 17 00:00:00 2001 From: Dominik Werder Date: Wed, 6 Sep 2023 11:59:08 +0200 Subject: [PATCH] Trigger --- .github/workflows/build-rhel7.yml | 22 ++++++++-------- daqingest/src/daemon.rs | 6 +++++ netfetch/src/ca/conn.rs | 43 +++++-------------------------- netfetch/src/ca/connset.rs | 2 ++ netfetch/src/daemon_common.rs | 1 + 5 files changed, 27 insertions(+), 47 deletions(-) diff --git a/.github/workflows/build-rhel7.yml b/.github/workflows/build-rhel7.yml index 95b1d03..9f6191e 100644 --- a/.github/workflows/build-rhel7.yml +++ b/.github/workflows/build-rhel7.yml @@ -1,8 +1,8 @@ name: build-daqingest-rhel7 on: - push: - tags: - - buildaction + # push: + # tags: + # - buildaction workflow_dispatch: inputs: flags: @@ -84,20 +84,20 @@ jobs: - run: find $GITHUB_WORKSPACE -type f -and \( -name \*.rs -or -name \*.toml \) - run: find ${{steps.wdset.outputs.gh}} -type f -and \( -name \*.rs -or -name \*.toml \) working-directory: ${{steps.wdset.outputs.gh}}/build - # - run: cargo build --release - # working-directory: ${{steps.wdset.outputs.gh}}/build/daqingest - - run: mkdir -p target/release && cp /usr/bin/cat target/release/daqingest - working-directory: ${{steps.wdset.outputs.gh}}/build/daqingest/ + - run: cargo build --release + working-directory: ${{steps.wdset.outputs.gh}}/build/daqingest + # - run: mkdir -p target/release && cp /usr/bin/cat target/release/daqingest + # working-directory: ${{steps.wdset.outputs.gh}}/build/daqingest/ - run: ls -l working-directory: ${{steps.wdset.outputs.gh}}/build/daqingest/target/release - run: ./daqingest --version working-directory: ${{steps.wdset.outputs.gh}}/build/daqingest/target/release - # - run: echo "daqingest_version=$(./daqingest --version)" >> "$GITHUB_OUTPUT" - # id: daqingest_version_set - # working-directory: ${{steps.wdset.outputs.gh}}/build/daqingest/target/release - - run: echo "daqingest_version=0.0.0-dummy.0" >> "$GITHUB_OUTPUT" + - run: echo "daqingest_version=$(./daqingest --version)" >> "$GITHUB_OUTPUT" id: daqingest_version_set working-directory: ${{steps.wdset.outputs.gh}}/build/daqingest/target/release + # - run: echo "daqingest_version=0.0.0-dummy.0" >> "$GITHUB_OUTPUT" + # id: daqingest_version_set + # working-directory: ${{steps.wdset.outputs.gh}}/build/daqingest/target/release - uses: actions/upload-artifact@v3 with: name: daqingest-${{steps.daqingest_version_set.outputs.daqingest_version}} diff --git a/daqingest/src/daemon.rs b/daqingest/src/daemon.rs index 234ed63..0c96c9e 100644 --- a/daqingest/src/daemon.rs +++ b/daqingest/src/daemon.rs @@ -216,6 +216,7 @@ pub struct Daemon { insert_rx_weak: WeakReceiver, channel_info_query_tx: Sender, connset_ctrl: CaConnSetCtrl, + query_item_tx: Sender, } impl Daemon { @@ -335,6 +336,7 @@ impl Daemon { insert_rx_weak: common_insert_item_queue_2.downgrade(), channel_info_query_tx, connset_ctrl: conn_set_ctrl, + query_item_tx: common_insert_item_queue.sender().unwrap().inner().clone(), }; Ok(ret) } @@ -1003,6 +1005,10 @@ impl Daemon { } } } + QueryItem(item) => { + self.query_item_tx.send(item).await?; + Ok(()) + } EndOfStream => { self.stats.ca_conn_status_done_inc(); self.handle_ca_conn_done(addr).await diff --git a/netfetch/src/ca/conn.rs b/netfetch/src/ca/conn.rs index 2dd036e..877e901 100644 --- a/netfetch/src/ca/conn.rs +++ b/netfetch/src/ca/conn.rs @@ -373,6 +373,7 @@ pub enum CaConnEventValue { None, EchoTimeout, ConnCommandResult(ConnCommandResult), + QueryItem(QueryItem), EndOfStream, } @@ -785,38 +786,6 @@ impl CaConn { } } - fn handle_insert_futs(&mut self, cx: &mut Context) -> Poll> { - use Poll::*; - loop { - break { - self.stats.caconn_loop4_count_inc(); - if self.sender_polling.is_sending() { - match self.sender_polling.poll_unpin(cx) { - Ready(Ok(())) => { - self.stats.inserts_queue_push_inc(); - continue; - } - Ready(Err(e)) => { - use crate::senderpolling::Error::*; - match e { - NoSendInProgress => break Ready(Err(Error::with_msg_no_trace("no send in progress"))), - Closed(_item) => break Ready(Err(Error::with_msg_no_trace("insert channel closed"))), - } - } - Pending => Pending, - } - } else { - if let Some(item) = self.insert_item_queue.pop_front() { - self.sender_polling.send2(item); - continue; - } else { - Ready(Ok(())) - } - } - }; - } - } - fn check_channels_alive(&mut self) -> Result<(), Error> { let tsnow = Instant::now(); trace!("CheckChannelsAlive {addr:?}", addr = &self.remote_addr_dbg); @@ -1784,6 +1753,12 @@ impl Stream for CaConn { }))) } else if let Some(item) = self.ca_conn_event_out_queue.pop_front() { Ready(Some(Ok(item))) + } else if let Some(item) = self.insert_item_queue.pop_front() { + let ev = CaConnEvent { + ts: Instant::now(), + value: CaConnEventValue::QueryItem(item), + }; + Ready(Some(Ok(ev))) } else { let ret = loop { self.stats.caconn_loop1_count_inc(); @@ -1807,10 +1782,6 @@ impl Stream for CaConn { } }; } - match self.handle_insert_futs(cx) { - Ready(_) => {} - Pending => break Pending, - } if self.is_shutdown() { if self.outgoing_queues_empty() { debug!("shut down and all items flushed {}", self.remote_addr_dbg); diff --git a/netfetch/src/ca/connset.rs b/netfetch/src/ca/connset.rs index f4c4d44..a20899b 100644 --- a/netfetch/src/ca/connset.rs +++ b/netfetch/src/ca/connset.rs @@ -94,6 +94,7 @@ pub struct CaConnSet { connset_tx: Sender, connset_rx: Receiver, channel_info_query_tx: Sender, + storage_insert_tx: Sender, shutdown: bool, } @@ -108,6 +109,7 @@ impl CaConnSet { connset_tx: connset_tx.clone(), connset_rx, channel_info_query_tx, + storage_insert_tx, shutdown: false, }; // TODO await on jh diff --git a/netfetch/src/daemon_common.rs b/netfetch/src/daemon_common.rs index f55663d..3c573ea 100644 --- a/netfetch/src/daemon_common.rs +++ b/netfetch/src/daemon_common.rs @@ -45,6 +45,7 @@ impl DaemonEvent { None => format!("CaConnEvent/None"), EchoTimeout => format!("CaConnEvent/EchoTimeout"), ConnCommandResult(_) => format!("CaConnEvent/ConnCommandResult"), + QueryItem(_) => format!("CaConnEvent/QueryItem"), EndOfStream => format!("CaConnEvent/EndOfStream"), } }