diff --git a/config/config.md b/config/config.md index 54d09c247763..ad8d42f024f0 100644 --- a/config/config.md +++ b/config/config.md @@ -33,9 +33,7 @@ | `postgres.tls.key_path` | String | `None` | Private key file path. | | `postgres.tls.watch` | Bool | `false` | Watch for Certificate and key file change and auto reload | | `opentsdb` | -- | -- | OpenTSDB protocol options. | -| `opentsdb.enable` | Bool | `true` | Whether to enable | -| `opentsdb.addr` | String | `127.0.0.1:4242` | OpenTSDB telnet API server address. | -| `opentsdb.runtime_size` | Integer | `2` | The number of server worker threads. | +| `opentsdb.enable` | Bool | `true` | Whether to enable OpenTSDB put in HTTP API. | | `influxdb` | -- | -- | InfluxDB protocol options. | | `influxdb.enable` | Bool | `true` | Whether to enable InfluxDB protocol in HTTP API. | | `prom_store` | -- | -- | Prometheus remote storage options | @@ -168,9 +166,7 @@ | `postgres.tls.key_path` | String | `None` | Private key file path. | | `postgres.tls.watch` | Bool | `false` | Watch for Certificate and key file change and auto reload | | `opentsdb` | -- | -- | OpenTSDB protocol options. | -| `opentsdb.enable` | Bool | `true` | Whether to enable | -| `opentsdb.addr` | String | `127.0.0.1:4242` | OpenTSDB telnet API server address. | -| `opentsdb.runtime_size` | Integer | `2` | The number of server worker threads. | +| `opentsdb.enable` | Bool | `true` | Whether to enable OpenTSDB put in HTTP API. | | `influxdb` | -- | -- | InfluxDB protocol options. | | `influxdb.enable` | Bool | `true` | Whether to enable InfluxDB protocol in HTTP API. | | `prom_store` | -- | -- | Prometheus remote storage options | diff --git a/config/frontend.example.toml b/config/frontend.example.toml index 9123e4f8a00e..9724ea352586 100644 --- a/config/frontend.example.toml +++ b/config/frontend.example.toml @@ -88,12 +88,8 @@ watch = false ## OpenTSDB protocol options. [opentsdb] -## Whether to enable +## Whether to enable OpenTSDB put in HTTP API. enable = true -## OpenTSDB telnet API server address. -addr = "127.0.0.1:4242" -## The number of server worker threads. -runtime_size = 2 ## InfluxDB protocol options. [influxdb] diff --git a/config/standalone.example.toml b/config/standalone.example.toml index 55fc42f4b4f0..30b8fbe65538 100644 --- a/config/standalone.example.toml +++ b/config/standalone.example.toml @@ -83,12 +83,8 @@ watch = false ## OpenTSDB protocol options. [opentsdb] -## Whether to enable +## Whether to enable OpenTSDB put in HTTP API. enable = true -## OpenTSDB telnet API server address. -addr = "127.0.0.1:4242" -## The number of server worker threads. -runtime_size = 2 ## InfluxDB protocol options. [influxdb] diff --git a/src/cmd/src/frontend.rs b/src/cmd/src/frontend.rs index 0ff35846256f..918eaa990773 100644 --- a/src/cmd/src/frontend.rs +++ b/src/cmd/src/frontend.rs @@ -126,8 +126,6 @@ pub struct StartCommand { mysql_addr: Option, #[clap(long)] postgres_addr: Option, - #[clap(long)] - opentsdb_addr: Option, #[clap(short, long)] config_file: Option, #[clap(short, long)] @@ -198,11 +196,6 @@ impl StartCommand { opts.postgres.tls = tls_opts; } - if let Some(addr) = &self.opentsdb_addr { - opts.opentsdb.enable = true; - opts.opentsdb.addr.clone_from(addr); - } - if let Some(enable) = self.influxdb_enable { opts.influxdb.enable = enable; } @@ -316,7 +309,6 @@ mod tests { http_addr: Some("127.0.0.1:1234".to_string()), mysql_addr: Some("127.0.0.1:5678".to_string()), postgres_addr: Some("127.0.0.1:5432".to_string()), - opentsdb_addr: Some("127.0.0.1:4321".to_string()), influxdb_enable: Some(false), disable_dashboard: Some(false), ..Default::default() @@ -330,7 +322,6 @@ mod tests { assert_eq!(ReadableSize::mb(64), opts.http.body_limit); assert_eq!(opts.mysql.addr, "127.0.0.1:5678"); assert_eq!(opts.postgres.addr, "127.0.0.1:5432"); - assert_eq!(opts.opentsdb.addr, "127.0.0.1:4321"); let default_opts = FrontendOptions::default(); @@ -343,10 +334,6 @@ mod tests { default_opts.postgres.runtime_size ); assert!(opts.opentsdb.enable); - assert_eq!( - opts.opentsdb.runtime_size, - default_opts.opentsdb.runtime_size - ); assert!(!opts.influxdb.enable); } @@ -362,6 +349,9 @@ mod tests { timeout = "30s" body_limit = "2GB" + [opentsdb] + enable = false + [logging] level = "debug" dir = "/tmp/greptimedb/test/logs" @@ -386,6 +376,7 @@ mod tests { assert_eq!("debug", fe_opts.logging.level.as_ref().unwrap()); assert_eq!("/tmp/greptimedb/test/logs".to_string(), fe_opts.logging.dir); + assert!(!fe_opts.opentsdb.enable); } #[tokio::test] diff --git a/src/cmd/src/standalone.rs b/src/cmd/src/standalone.rs index 061c4f98e2bd..2daacdc1c047 100644 --- a/src/cmd/src/standalone.rs +++ b/src/cmd/src/standalone.rs @@ -253,8 +253,6 @@ pub struct StartCommand { mysql_addr: Option, #[clap(long)] postgres_addr: Option, - #[clap(long)] - opentsdb_addr: Option, #[clap(short, long)] influxdb_enable: bool, #[clap(short, long)] @@ -340,11 +338,6 @@ impl StartCommand { opts.postgres.tls = tls_opts; } - if let Some(addr) = &self.opentsdb_addr { - opts.opentsdb.enable = true; - opts.opentsdb.addr.clone_from(addr); - } - if self.influxdb_enable { opts.influxdb.enable = self.influxdb_enable; } @@ -586,6 +579,9 @@ mod tests { timeout = "33s" body_limit = "128MB" + [opentsdb] + enable = true + [logging] level = "debug" dir = "/tmp/greptimedb/test/logs" @@ -613,6 +609,7 @@ mod tests { assert_eq!(2, fe_opts.mysql.runtime_size); assert_eq!(None, fe_opts.mysql.reject_no_database); assert!(fe_opts.influxdb.enable); + assert!(fe_opts.opentsdb.enable); let DatanodeWalConfig::RaftEngine(raft_engine_config) = dn_opts.wal else { unreachable!() diff --git a/src/frontend/src/server.rs b/src/frontend/src/server.rs index 7ab5e65ed7d1..f7b5939279ef 100644 --- a/src/frontend/src/server.rs +++ b/src/frontend/src/server.rs @@ -24,7 +24,6 @@ use servers::grpc::{GrpcServer, GrpcServerConfig}; use servers::http::{HttpServer, HttpServerBuilder}; use servers::metrics_handler::MetricsHandler; use servers::mysql::server::{MysqlServer, MysqlSpawnConfig, MysqlSpawnRef}; -use servers::opentsdb::OpentsdbServer; use servers::postgres::PostgresServer; use servers::query_handler::grpc::ServerGrpcQueryHandlerAdapter; use servers::query_handler::sql::ServerSqlQueryHandlerAdapter; @@ -258,24 +257,6 @@ where handlers.insert((pg_server, pg_addr)).await; } - if opts.opentsdb.enable { - // Init OpenTSDB server - let opts = &opts.opentsdb; - let addr = parse_addr(&opts.addr)?; - - let io_runtime = Arc::new( - RuntimeBuilder::default() - .worker_threads(opts.runtime_size) - .thread_name("opentsdb-io-handlers") - .build() - .context(error::RuntimeResourceSnafu)?, - ); - - let server = OpentsdbServer::create_server(instance.clone(), io_runtime); - - handlers.insert((server, addr)).await; - } - Ok(handlers) } } diff --git a/src/frontend/src/service_config/opentsdb.rs b/src/frontend/src/service_config/opentsdb.rs index 59959f4c816c..e100dcd797c4 100644 --- a/src/frontend/src/service_config/opentsdb.rs +++ b/src/frontend/src/service_config/opentsdb.rs @@ -17,16 +17,10 @@ use serde::{Deserialize, Serialize}; #[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)] pub struct OpentsdbOptions { pub enable: bool, - pub addr: String, - pub runtime_size: usize, } impl Default for OpentsdbOptions { fn default() -> Self { - Self { - enable: true, - addr: "127.0.0.1:4242".to_string(), - runtime_size: 2, - } + Self { enable: true } } } diff --git a/src/servers/src/error.rs b/src/servers/src/error.rs index 2af6a38243d7..303819c12a82 100644 --- a/src/servers/src/error.rs +++ b/src/servers/src/error.rs @@ -190,13 +190,6 @@ pub enum Error { error: hyper::Error, }, - #[snafu(display("Invalid OpenTSDB line"))] - InvalidOpentsdbLine { - #[snafu(source)] - error: FromUtf8Error, - location: Location, - }, - #[snafu(display("Invalid OpenTSDB Json request"))] InvalidOpentsdbJsonRequest { #[snafu(source)] @@ -508,7 +501,6 @@ impl ErrorExt for Error { | InvalidQuery { .. } | InfluxdbLineProtocol { .. } | ConnResetByPeer { .. } - | InvalidOpentsdbLine { .. } | InvalidOpentsdbJsonRequest { .. } | DecodePromRemoteRequest { .. } | DecodeOtlpRequest { .. } @@ -664,7 +656,6 @@ impl IntoResponse for Error { Error::InfluxdbLineProtocol { .. } | Error::RowWriter { .. } | Error::PromSeriesWrite { .. } - | Error::InvalidOpentsdbLine { .. } | Error::InvalidOpentsdbJsonRequest { .. } | Error::DecodePromRemoteRequest { .. } | Error::DecodeOtlpRequest { .. } diff --git a/src/servers/src/lib.rs b/src/servers/src/lib.rs index 0459ca2a3f6c..876bcc348661 100644 --- a/src/servers/src/lib.rs +++ b/src/servers/src/lib.rs @@ -44,7 +44,6 @@ pub mod query_handler; mod repeated_field; mod row_writer; pub mod server; -mod shutdown; pub mod tls; #[derive(Clone, Debug, Serialize, Deserialize, Eq, PartialEq)] diff --git a/src/servers/src/metrics.rs b/src/servers/src/metrics.rs index 5fc63ba7d838..ac599df53565 100644 --- a/src/servers/src/metrics.rs +++ b/src/servers/src/metrics.rs @@ -25,8 +25,8 @@ use axum::response::IntoResponse; use hyper::Body; use lazy_static::lazy_static; use prometheus::{ - register_histogram, register_histogram_vec, register_int_counter, register_int_counter_vec, - register_int_gauge, Histogram, HistogramVec, IntCounter, IntCounterVec, IntGauge, + register_histogram_vec, register_int_counter, register_int_counter_vec, register_int_gauge, + Histogram, HistogramVec, IntCounter, IntCounterVec, IntGauge, }; use tonic::body::BoxBody; use tower::{Layer, Service}; @@ -130,11 +130,6 @@ lazy_static! { &[METRIC_DB_LABEL] ) .unwrap(); - pub static ref METRIC_TCP_OPENTSDB_LINE_WRITE_ELAPSED: Histogram = register_histogram!( - "greptime_servers_opentsdb_line_write_elapsed", - "servers opentsdb line write elapsed" - ) - .unwrap(); pub static ref METRIC_MYSQL_CONNECTIONS: IntGauge = register_int_gauge!( "greptime_servers_mysql_connection_count", "servers mysql connection count" diff --git a/src/servers/src/opentsdb.rs b/src/servers/src/opentsdb.rs index 37fc1264046e..9ae63c1b9ed6 100644 --- a/src/servers/src/opentsdb.rs +++ b/src/servers/src/opentsdb.rs @@ -13,127 +13,14 @@ // limitations under the License. pub mod codec; -pub mod connection; -mod handler; - -use std::future::Future; -use std::net::SocketAddr; -use std::sync::Arc; use api::v1::RowInsertRequests; -use async_trait::async_trait; -use common_error::ext::ErrorExt; use common_grpc::precision::Precision; use common_query::prelude::{GREPTIME_TIMESTAMP, GREPTIME_VALUE}; -use common_runtime::Runtime; -use common_telemetry::logging::{debug, error, warn}; -use futures::StreamExt; -use tokio::sync::broadcast; use self::codec::DataPoint; use crate::error::Result; -use crate::opentsdb::connection::Connection; -use crate::opentsdb::handler::Handler; -use crate::query_handler::OpentsdbProtocolHandlerRef; use crate::row_writer::{self, MultiTableData}; -use crate::server::{AbortableStream, BaseTcpServer, Server}; -use crate::shutdown::Shutdown; - -pub struct OpentsdbServer { - base_server: BaseTcpServer, - query_handler: OpentsdbProtocolHandlerRef, - - /// Broadcasts a shutdown signal to all active connections. - /// - /// When a connection task is spawned, it is passed a broadcast receiver handle. We can send - /// a `()` value via `notify_shutdown` or just drop `notify_shutdown`, then each active - /// connection receives it, reaches a safe terminal state, and completes the task. - notify_shutdown: Option>, -} - -impl OpentsdbServer { - pub fn create_server( - query_handler: OpentsdbProtocolHandlerRef, - io_runtime: Arc, - ) -> Box { - // When the provided `shutdown` future completes, we must send a shutdown - // message to all active connections. We use a broadcast channel for this - // purpose. The call below ignores the receiver of the broadcast pair, and when - // a receiver is needed, the subscribe() method on the sender is used to create - // one. - let (notify_shutdown, _) = broadcast::channel(1); - - Box::new(OpentsdbServer { - base_server: BaseTcpServer::create_server("OpenTSDB", io_runtime), - query_handler, - notify_shutdown: Some(notify_shutdown), - }) - } - - fn accept( - &self, - io_runtime: Arc, - stream: AbortableStream, - ) -> impl Future { - let query_handler = self.query_handler.clone(); - let notify_shutdown = self - .notify_shutdown - .clone() - .expect("`notify_shutdown` must be present when accepting connection!"); - stream.for_each(move |stream| { - let io_runtime = io_runtime.clone(); - let query_handler = query_handler.clone(); - let shutdown = Shutdown::new(notify_shutdown.subscribe()); - async move { - match stream { - Ok(stream) => { - if let Err(e) = stream.set_nodelay(true) { - warn!(e; "Failed to set TCP nodelay"); - } - let connection = Connection::new(stream); - let mut handler = Handler::new(query_handler, connection, shutdown); - - let _handle = io_runtime.spawn(async move { - if let Err(e) = handler.run().await { - if e.status_code().should_log_error() { - error!(e; "Unexpected error when handling OpenTSDB connection"); - } - } - }); - } - Err(error) => debug!("Broken pipe: {}", error), // IoError doesn't impl ErrorExt. - }; - } - }) - } -} - -pub const OPENTSDB_SERVER: &str = "OPENTSDB_SERVER"; - -#[async_trait] -impl Server for OpentsdbServer { - async fn shutdown(&self) -> Result<()> { - if let Some(tx) = &self.notify_shutdown { - // Err of broadcast sender does not mean that future calls to send will fail, so - // its return value is ignored here. - let _ = tx.send(()); - } - self.base_server.shutdown().await?; - Ok(()) - } - - async fn start(&self, listening: SocketAddr) -> Result { - let (stream, addr) = self.base_server.bind(listening).await?; - - let io_runtime = self.base_server.io_runtime(); - let join_handle = common_runtime::spawn_read(self.accept(io_runtime, stream)); - self.base_server.start_with(join_handle).await?; - Ok(addr) - } - fn name(&self) -> &str { - OPENTSDB_SERVER - } -} pub fn data_point_to_grpc_row_insert_requests( data_points: Vec, diff --git a/src/servers/src/opentsdb/connection.rs b/src/servers/src/opentsdb/connection.rs deleted file mode 100644 index e5f15d3c159d..000000000000 --- a/src/servers/src/opentsdb/connection.rs +++ /dev/null @@ -1,205 +0,0 @@ -// Copyright 2023 Greptime Team -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -//! Modified from Tokio's mini-redis example. - -use bytes::{Buf, BytesMut}; -use snafu::ResultExt; -use tokio::io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt, BufWriter}; - -use crate::error::{self, Result}; - -type Line = String; - -#[derive(Debug)] -pub struct Connection { - stream: BufWriter, - buffer: BytesMut, -} - -impl Connection { - pub fn new(stream: S) -> Connection { - Connection { - stream: BufWriter::new(stream), - buffer: BytesMut::with_capacity(4 * 1024), - } - } - - /// Read one line from the underlying stream. - /// - /// The function waits until it has retrieved enough data to parse a line (terminated by \r\n). - /// Any data remaining in the read buffer after the line has been parsed is kept there for the - /// next call to `read_line`. - /// - /// # Returns - /// - /// On success, the received line is returned. If the stream is closed in a way that - /// doesn't break a line in half, it returns `None`. Otherwise, an error is returned. - pub async fn read_line(&mut self) -> Result> { - loop { - // Attempt to parse a line from the buffered data. If enough data - // has been buffered, the line is returned. - if let Some(line) = self.parse_line()? { - return Ok(Some(line)); - } - - // There is not enough buffered data as a line. Attempt to read more from the socket. - // On success, the number of bytes is returned. `0` indicates "end of stream". - if self.stream.read_buf(&mut self.buffer).await? == 0 { - // The remote closed the connection. For this to be a clean shutdown, there should - // be no data in the read buffer. If there is, this means that the peer closed the - // socket while sending a line. - if self.buffer.is_empty() { - return Ok(None); - } else { - return error::ConnResetByPeerSnafu {}.fail(); - } - } - } - } - - /// Tries to parse a line from the buffer. - /// - /// If the buffer contains enough data, the line is returned and the buffered data is removed. - /// If not enough data has been buffered yet, `Ok(None)` is returned. - /// If the buffered data does not represent a valid UTF8 line, `Err` is returned. - fn parse_line(&mut self) -> Result> { - if self.buffer.is_empty() { - return Ok(None); - } - - let buf = &self.buffer[..]; - if let Some(pos) = buf.windows(2).position(|w| w == [b'\r', b'\n']) { - let line = buf[0..pos].to_vec(); - - self.buffer.advance(pos + 2); - - Ok(Some( - String::from_utf8(line).context(error::InvalidOpentsdbLineSnafu)?, - )) - } else { - // There is not enough data present in the read buffer to parse a single line. We must - // wait for more data to be received from the socket. - Ok(None) - } - } - - pub async fn write_line(&mut self, line: String) -> Result<()> { - self.stream - .write_all(line.as_bytes()) - .await - .context(error::InternalIoSnafu)?; - let _ = self - .stream - .write(b"\r\n") - .await - .context(error::InternalIoSnafu)?; - self.stream.flush().await.context(error::InternalIoSnafu)?; - Ok(()) - } -} - -#[cfg(test)] -mod tests { - use std::assert_matches::assert_matches; - use std::io::Write; - - use bytes::BufMut; - use common_error::ext::ErrorExt; - use tokio_test::io::Builder; - - use super::*; - - #[tokio::test] - async fn test_read_line() { - let mock = Builder::new() - .read(b"This is") - .read(b" a line.\r\n") - .build(); - let mut conn = Connection::new(mock); - let line = conn.read_line().await.unwrap(); - assert_eq!(line, Some("This is a line.".to_string())); - - let line = conn.read_line().await.unwrap(); - assert_eq!(line, None); - - let buffer = &mut conn.buffer; - buffer - .writer() - .write_all(b"simulating buffer has remaining data") - .unwrap(); - let result = conn.read_line().await; - assert!(result.is_err()); - assert!(result - .unwrap_err() - .to_string() - .contains("Connection reset by peer")); - } - - #[test] - fn test_parse_line() { - let mock = Builder::new().build(); - let mut conn = Connection::new(mock); - - // initially, no data in the buffer, so return None - let line = conn.parse_line(); - assert_matches!(line, Ok(None)); - - // still has no line, but we have data in the buffer - { - let buffer = &mut conn.buffer; - buffer.writer().write_all(b"This is a ").unwrap(); - let line = conn.parse_line(); - assert_matches!(line, Ok(None)); - } - let buffer = &conn.buffer[..]; - assert_eq!(String::from_utf8(buffer.to_vec()).unwrap(), "This is a "); - - // finally gets a line, and the buffer has the remaining data - { - let buffer = &mut conn.buffer; - buffer - .writer() - .write_all(b"line.\r\n another line's remaining data") - .unwrap(); - let line = conn.parse_line().unwrap(); - assert_eq!(line, Some("This is a line.".to_string())); - } - let buffer = &conn.buffer[..]; - assert_eq!( - String::from_utf8(buffer.to_vec()).unwrap(), - " another line's remaining data" - ); - - // expected failed on not valid utf-8 line - let buffer = &mut conn.buffer; - buffer.writer().write_all(b"Hello Wor\xffld.\r\n").unwrap(); - let result = conn.parse_line(); - assert!(result.is_err()); - let err = result.unwrap_err().output_msg(); - assert!(err.contains("invalid utf-8 sequence")); - } - - #[tokio::test] - async fn test_write_err() { - let mock = Builder::new() - .write(b"An OpenTSDB error.") - .write(b"\r\n") - .build(); - let mut conn = Connection::new(mock); - conn.write_line("An OpenTSDB error.".to_string()) - .await - .unwrap(); - } -} diff --git a/src/servers/src/opentsdb/handler.rs b/src/servers/src/opentsdb/handler.rs deleted file mode 100644 index 55cebb210b3d..000000000000 --- a/src/servers/src/opentsdb/handler.rs +++ /dev/null @@ -1,205 +0,0 @@ -// Copyright 2023 Greptime Team -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -//! Modified from Tokio's mini-redis example. - -use common_error::ext::ErrorExt; -use session::context::QueryContextBuilder; -use tokio::io::{AsyncRead, AsyncWrite}; - -use crate::error::Result; -use crate::opentsdb::codec::DataPoint; -use crate::opentsdb::connection::Connection; -use crate::query_handler::OpentsdbProtocolHandlerRef; -use crate::shutdown::Shutdown; - -/// Per-connection handler. Reads requests from `connection` and applies the OpenTSDB metric to -/// [OpentsdbProtocolHandlerRef]. -pub(crate) struct Handler { - query_handler: OpentsdbProtocolHandlerRef, - - /// The TCP connection decorated with OpenTSDB line protocol encoder / decoder implemented - /// using a buffered `TcpStream`. - /// - /// When TCP listener receives an inbound connection, the `TcpStream` is passed to - /// `Connection::new`, which initializes the associated buffers. The byte level protocol - /// parsing details is encapsulated in `Connection`. - connection: Connection, - - /// Listen for shutdown notifications. - /// - /// A wrapper around the `broadcast::Receiver` paired with the sender in TCP connections - /// listener. The connection handler processes requests from the connection until the peer - /// disconnects **or** a shutdown notification is received from `shutdown`. In the latter case, - /// any in-flight work being processed for the peer is continued until it reaches a safe state, - /// at which point the connection is terminated. (Graceful shutdown.) - shutdown: Shutdown, -} - -impl Handler { - pub(crate) fn new( - query_handler: OpentsdbProtocolHandlerRef, - connection: Connection, - shutdown: Shutdown, - ) -> Self { - Self { - query_handler, - connection, - shutdown, - } - } - - pub(crate) async fn run(&mut self) -> Result<()> { - // TODO(shuiyisong): figure out how to auth in tcp connection. - let ctx = QueryContextBuilder::default().build(); - while !self.shutdown.is_shutdown() { - // While reading a request, also listen for the shutdown signal. - let maybe_line = tokio::select! { - line = self.connection.read_line() => line?, - _ = self.shutdown.recv() => { - // If a shutdown signal is received, return from `run`. - // This will result in the task terminating. - return Ok(()); - } - }; - - // If `None` is returned from `read_line()` then the peer closed the socket. There is - // no further work to do and the task can be terminated. - let line = match maybe_line { - Some(line) => line, - None => return Ok(()), - }; - - // Close connection upon receiving "quit" line. With actual OpenTSDB, telnet just won't - // quit, the connection to OpenTSDB server can be closed only via terminating telnet - // session manually, for example, close the terminal window. That is a little annoying, - // so I added "quit" command to the line protocol, to make telnet client able to quit - // gracefully. - if line.trim().eq_ignore_ascii_case("quit") { - return Ok(()); - } - - match DataPoint::try_create(&line) { - Ok(data_point) => { - let _timer = - crate::metrics::METRIC_TCP_OPENTSDB_LINE_WRITE_ELAPSED.start_timer(); - let result = self.query_handler.exec(vec![data_point], ctx.clone()).await; - if let Err(e) = result { - self.connection.write_line(e.output_msg()).await?; - } - } - Err(e) => { - self.connection.write_line(e.output_msg()).await?; - } - } - } - Ok(()) - } -} - -#[cfg(test)] -mod tests { - use std::net::SocketAddr; - use std::sync::Arc; - - use async_trait::async_trait; - use session::context::QueryContextRef; - use tokio::net::{TcpListener, TcpStream}; - use tokio::sync::{broadcast, mpsc}; - - use super::*; - use crate::error; - use crate::query_handler::OpentsdbProtocolHandler; - - struct DummyQueryHandler { - tx: mpsc::Sender, - } - - #[async_trait] - impl OpentsdbProtocolHandler for DummyQueryHandler { - async fn exec(&self, data_points: Vec, _ctx: QueryContextRef) -> Result { - let metric = data_points.first().unwrap().metric(); - if metric == "should_failed" { - return error::InternalSnafu { - err_msg: "expected", - } - .fail(); - } - self.tx.send(metric.to_string()).await.unwrap(); - Ok(data_points.len()) - } - } - - #[tokio::test] - async fn test_run() { - let (tx, mut rx) = mpsc::channel(100); - - let query_handler = Arc::new(DummyQueryHandler { tx }); - let (notify_shutdown, _) = broadcast::channel(1); - let addr = start_server(query_handler, notify_shutdown).await; - - let stream = TcpStream::connect(addr).await.unwrap(); - let mut client = Connection::new(stream); - - client - .write_line("put my_metric_1 1000 1.0 host=web01".to_string()) - .await - .unwrap(); - assert_eq!(rx.recv().await.unwrap(), "my_metric_1"); - - client - .write_line("put my_metric_2 1000 1.0 host=web01".to_string()) - .await - .unwrap(); - assert_eq!(rx.recv().await.unwrap(), "my_metric_2"); - - client - .write_line("put should_failed 1000 1.0 host=web01".to_string()) - .await - .unwrap(); - let resp = client.read_line().await.unwrap(); - assert_eq!(resp, Some("Internal error: 1003".to_string())); - - client.write_line("get".to_string()).await.unwrap(); - let resp = client.read_line().await.unwrap(); - assert_eq!( - resp, - Some("Invalid query: unknown command get.".to_string()) - ); - } - - async fn start_server( - query_handler: OpentsdbProtocolHandlerRef, - notify_shutdown: broadcast::Sender<()>, - ) -> SocketAddr { - let listener = TcpListener::bind("127.0.0.1:0").await.unwrap(); - let addr = listener.local_addr().unwrap(); - - let _handle = common_runtime::spawn_read(async move { - loop { - let (stream, _) = listener.accept().await.unwrap(); - - let query_handler = query_handler.clone(); - let connection = Connection::new(stream); - let shutdown = Shutdown::new(notify_shutdown.subscribe()); - let _handle = common_runtime::spawn_read(async move { - Handler::new(query_handler, connection, shutdown) - .run() - .await - }); - } - }); - addr - } -} diff --git a/src/servers/src/shutdown.rs b/src/servers/src/shutdown.rs deleted file mode 100644 index b69b3e4da415..000000000000 --- a/src/servers/src/shutdown.rs +++ /dev/null @@ -1,65 +0,0 @@ -// Copyright 2023 Greptime Team -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -//! Copied from tokio's mini-redis example. - -use tokio::sync::broadcast; - -/// Listens for the server shutdown signal. -/// -/// Shutdown is signalled using a `broadcast::Receiver`. Only a single value is -/// ever sent. Once a value has been sent via the broadcast channel, the server -/// should shutdown. -/// -/// The `Shutdown` struct listens for the signal and tracks that the signal has -/// been received. Callers may query for whether the shutdown signal has been -/// received or not. -#[derive(Debug)] -pub(crate) struct Shutdown { - /// `true` if the shutdown signal has been received - shutdown: bool, - - /// The receive half of the channel used to listen for shutdown. - notify: broadcast::Receiver<()>, -} - -impl Shutdown { - /// Create a new `Shutdown` backed by the given `broadcast::Receiver`. - pub(crate) fn new(notify: broadcast::Receiver<()>) -> Shutdown { - Shutdown { - shutdown: false, - notify, - } - } - - /// Returns `true` if the shutdown signal has been received. - pub(crate) fn is_shutdown(&self) -> bool { - self.shutdown - } - - /// Receive the shutdown notice, waiting if necessary. - pub(crate) async fn recv(&mut self) { - // If the shutdown signal has already been received, then return - // immediately. - if self.shutdown { - return; - } - - // Cannot receive a "lag error" as only one value is ever sent. - let _ = self.notify.recv().await; - - // Remember that the signal has been received. - self.shutdown = true; - } -} diff --git a/src/servers/tests/mod.rs b/src/servers/tests/mod.rs index 8913c879d74b..450ed86497c1 100644 --- a/src/servers/tests/mod.rs +++ b/src/servers/tests/mod.rs @@ -40,7 +40,6 @@ mod grpc; mod http; mod interceptor; mod mysql; -mod opentsdb; mod postgres; mod py_script; diff --git a/src/servers/tests/opentsdb.rs b/src/servers/tests/opentsdb.rs deleted file mode 100644 index 79ac2ba21939..000000000000 --- a/src/servers/tests/opentsdb.rs +++ /dev/null @@ -1,283 +0,0 @@ -// Copyright 2023 Greptime Team -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -use std::net::SocketAddr; -use std::sync::atomic::{AtomicBool, Ordering}; -use std::sync::Arc; -use std::time::{Duration, Instant}; - -use async_trait::async_trait; -use common_runtime::Builder as RuntimeBuilder; -use rand::rngs::StdRng; -use rand::Rng; -use servers::error::{self as server_error, Error, Result}; -use servers::opentsdb::codec::DataPoint; -use servers::opentsdb::connection::Connection; -use servers::opentsdb::OpentsdbServer; -use servers::query_handler::OpentsdbProtocolHandler; -use servers::server::Server; -use session::context::QueryContextRef; -use tokio::net::TcpStream; -use tokio::sync::{mpsc, Notify}; - -struct DummyOpentsdbInstance { - tx: mpsc::Sender, -} - -#[async_trait] -impl OpentsdbProtocolHandler for DummyOpentsdbInstance { - async fn exec(&self, data_points: Vec, _ctx: QueryContextRef) -> Result { - let metric = data_points.first().unwrap().metric(); - if metric == "should_failed" { - return server_error::InternalSnafu { - err_msg: "expected", - } - .fail(); - } - let i = metric.parse::().unwrap(); - let _ = self.tx.send(i * i).await; - Ok(data_points.len()) - } -} - -fn create_opentsdb_server(tx: mpsc::Sender) -> Result> { - let query_handler = Arc::new(DummyOpentsdbInstance { tx }); - let io_runtime = Arc::new( - RuntimeBuilder::default() - .worker_threads(2) - .thread_name("opentsdb-io-handlers") - .build() - .unwrap(), - ); - Ok(OpentsdbServer::create_server(query_handler, io_runtime)) -} - -#[tokio::test] -async fn test_start_opentsdb_server() -> Result<()> { - let (tx, _) = mpsc::channel(100); - let server = create_opentsdb_server(tx)?; - let listening = "127.0.0.1:0".parse::().unwrap(); - let result = server.start(listening).await; - let _ = result.unwrap(); - - let result = server.start(listening).await; - assert!(result - .unwrap_err() - .to_string() - .contains("OpenTSDB server has been started.")); - Ok(()) -} - -#[tokio::test(flavor = "multi_thread", worker_threads = 2)] -async fn test_shutdown_opentsdb_server_concurrently() -> Result<()> { - let (tx, _) = mpsc::channel(100); - let server = create_opentsdb_server(tx)?; - let result = server.shutdown().await; - assert!(result - .unwrap_err() - .to_string() - .contains("OpenTSDB server is not started.")); - - let listening = "127.0.0.1:0".parse::().unwrap(); - let addr = server.start(listening).await?; - - let notify = Arc::new(Notify::new()); - let notify_in_task = notify.clone(); - let stop = Arc::new(AtomicBool::new(false)); - let stop_task = stop.clone(); - let stop_timeout = Duration::from_secs(5 * 60); - let join_handle = tokio::spawn(async move { - let mut i = 1; - let mut stop_time = None; - - loop { - let stream = TcpStream::connect(addr).await; - match stream { - Ok(stream) => { - let mut connection = Connection::new(stream); - let result = connection.write_line(format!("put {i} 1 1")).await; - i += 1; - - if i > 4 { - // Ensure the server has been started. - notify_in_task.notify_one(); - } - - if let Err(e) = result { - match e { - Error::InternalIo { .. } => return, - _ => panic!("Not IO error, err is {e}"), - } - } - - if stop.load(Ordering::Relaxed) { - let du_since_stop = stop_time.get_or_insert_with(Instant::now).elapsed(); - if du_since_stop > stop_timeout { - // Avoid hang on test. - panic!("Stop timeout"); - } - } - } - Err(_) => return, - } - } - }); - - notify.notified().await; - - server.shutdown().await.unwrap(); - - stop_task.store(true, Ordering::Relaxed); - join_handle.await.unwrap(); - - Ok(()) -} - -#[tokio::test] -async fn test_opentsdb_connection_shutdown() -> Result<()> { - let (tx, _) = mpsc::channel(100); - let server = create_opentsdb_server(tx)?; - let result = server.shutdown().await; - assert!(result - .unwrap_err() - .to_string() - .contains("OpenTSDB server is not started.")); - - let listening = "127.0.0.1:0".parse::().unwrap(); - let addr = server.start(listening).await?; - - let stream = TcpStream::connect(addr).await.unwrap(); - let mut connection = Connection::new(stream); - connection - .write_line("put 1 1 1".to_string()) - .await - .unwrap(); - - server.shutdown().await.unwrap(); - - let shutdown_time = Instant::now(); - let timeout = Duration::from_secs(5 * 60); - let mut i = 2; - loop { - // The connection may not be unwritable after shutdown immediately. - let result = connection.write_line(format!("put {i} 1 1")).await; - i += 1; - if result.is_err() { - if let Err(e) = result { - match e { - Error::InternalIo { .. } => break, - _ => panic!("Not IO error, err is {e}"), - } - } - } - if shutdown_time.elapsed() > timeout { - panic!("Shutdown timeout"); - } - } - - Ok(()) -} - -#[tokio::test] -async fn test_opentsdb_connect_after_shutdown() -> Result<()> { - let (tx, _) = mpsc::channel(100); - let server = create_opentsdb_server(tx)?; - let result = server.shutdown().await; - assert!(result - .unwrap_err() - .to_string() - .contains("OpenTSDB server is not started.")); - - let listening = "127.0.0.1:0".parse::().unwrap(); - let addr = server.start(listening).await?; - - server.shutdown().await.unwrap(); - - assert!(TcpStream::connect(addr).await.is_err()); - - Ok(()) -} - -#[tokio::test] -async fn test_query() -> Result<()> { - let (tx, mut rx) = mpsc::channel(10); - let server = create_opentsdb_server(tx)?; - let listening = "127.0.0.1:0".parse::().unwrap(); - let addr = server.start(listening).await?; - - let stream = TcpStream::connect(addr).await.unwrap(); - let mut connection = Connection::new(stream); - connection.write_line("put 100 1 1".to_string()).await?; - assert_eq!(rx.recv().await.unwrap(), 10000); - - connection - .write_line("foo illegal put line".to_string()) - .await - .unwrap(); - let result = connection.read_line().await?; - assert_eq!( - result, - Some("Invalid query: unknown command foo.".to_string()) - ); - Ok(()) -} - -#[tokio::test(flavor = "multi_thread", worker_threads = 4)] -async fn test_query_concurrently() -> Result<()> { - let threads = 4; - let expect_executed_queries_per_worker = 1000; - let (tx, mut rx) = mpsc::channel(threads * expect_executed_queries_per_worker); - - let server = create_opentsdb_server(tx)?; - let listening = "127.0.0.1:0".parse::().unwrap(); - let addr = server.start(listening).await?; - - let mut join_handles = vec![]; - for _ in 0..threads { - join_handles.push(tokio::spawn(async move { - let mut rand: StdRng = rand::SeedableRng::from_entropy(); - - let stream = TcpStream::connect(addr).await.unwrap(); - let mut connection = Connection::new(stream); - for i in 0..expect_executed_queries_per_worker { - connection.write_line(format!("put {i} 1 1")).await.unwrap(); - - let should_recreate_conn = rand.gen_range(0..100) == 1; - if should_recreate_conn { - let stream = TcpStream::connect(addr).await.unwrap(); - connection = Connection::new(stream); - } - } - expect_executed_queries_per_worker - })) - } - - let mut total_pending_queries = threads * expect_executed_queries_per_worker; - for handle in join_handles.iter_mut() { - total_pending_queries -= handle.await.unwrap(); - } - assert_eq!(0, total_pending_queries); - - let mut expected_result: i32 = (threads - * (0..expect_executed_queries_per_worker) - .map(|i| i * i) - .sum::()) as i32; - while let Some(i) = rx.recv().await { - expected_result -= i; - if expected_result == 0 { - break; - } - } - Ok(()) -} diff --git a/tests-integration/tests/http.rs b/tests-integration/tests/http.rs index d269859b869a..fe70b938abbd 100644 --- a/tests-integration/tests/http.rs +++ b/tests-integration/tests/http.rs @@ -770,8 +770,6 @@ watch = false [frontend.opentsdb] enable = true -addr = "127.0.0.1:4242" -runtime_size = 2 [frontend.influxdb] enable = true