From a2f3b77caac25c6cbddbef893333d85985960576 Mon Sep 17 00:00:00 2001 From: xingying01 Date: Wed, 27 Dec 2023 19:15:14 +0800 Subject: [PATCH] [fix](stream_load)fix bug for stream 1. forbid thed stream_load without content-length or chunked Transfer Encoding 2. forbid thed stream_load both with content-length and chunked Transfer Encoding --- be/src/http/action/stream_load.cpp | 14 ++++ .../stream_load/test_stream_load.groovy | 70 +++++++++++++++++++ 2 files changed, 84 insertions(+) diff --git a/be/src/http/action/stream_load.cpp b/be/src/http/action/stream_load.cpp index d8c49f7c147cd2..e23a0344d64a06 100644 --- a/be/src/http/action/stream_load.cpp +++ b/be/src/http/action/stream_load.cpp @@ -285,6 +285,20 @@ Status StreamLoadAction::_on_header(HttpRequest* http_req, std::shared_ptrheader(HttpHeaders::CONTENT_LENGTH).empty() && + !ctx->is_chunked_transfer))) { + LOG(WARNING) << "content_length is empty and transfer-encoding!=chunked, please set " + "content_length or transfer-encoding=chunked"; + return Status::InvalidArgument( + "content_length is empty and transfer-encoding!=chunked, please set content_length " + "or transfer-encoding=chunked"); + } else if (UNLIKELY(!http_req->header(HttpHeaders::CONTENT_LENGTH).empty() && + ctx->is_chunked_transfer)) { + LOG(WARNING) << "please do not set both content_length and transfer-encoding"; + return Status::InvalidArgument( + "please do not set both content_length and transfer-encoding"); + } + if (!http_req->header(HTTP_TIMEOUT).empty()) { try { ctx->timeout_second = std::stoi(http_req->header(HTTP_TIMEOUT)); diff --git a/regression-test/suites/load_p0/stream_load/test_stream_load.groovy b/regression-test/suites/load_p0/stream_load/test_stream_load.groovy index e9a93f82f129a0..310e995b01a997 100644 --- a/regression-test/suites/load_p0/stream_load/test_stream_load.groovy +++ b/regression-test/suites/load_p0/stream_load/test_stream_load.groovy @@ -1044,5 +1044,75 @@ suite("test_stream_load", "p0") { } finally { sql """ DROP TABLE IF EXISTS ${tableName16} FORCE""" } + + def sql_result = sql """ select Host, HttpPort from backends() where alive = true limit 1; """ + + log.info(sql_result[0][0].toString()) + log.info(sql_result[0][1].toString()) + log.info(sql_result[0].size.toString()) + + def beHost=sql_result[0][0] + def beHttpPort=sql_result[0][1] + log.info("${beHost}".toString()) + log.info("${beHttpPort}".toString()); + + //test be : chunked transfer + Content Length + try { + sql """ DROP TABLE IF EXISTS ${tableName16} """ + sql """ + CREATE TABLE IF NOT EXISTS ${tableName16} ( + `k1` bigint(20) NULL DEFAULT "1", + `k2` bigint(20) NULL , + `v1` tinyint(4) NULL, + `v2` tinyint(4) NULL, + `v3` tinyint(4) NULL, + `v4` DATETIME NULL + ) ENGINE=OLAP + DISTRIBUTED BY HASH(`k1`) BUCKETS 3 + PROPERTIES ("replication_allocation" = "tag.location.default: 1"); + """ + + def command = "curl --location-trusted -u ${context.config.feHttpUser}:${context.config.feHttpPassword} -H column_separator:| -H ${db}:${tableName16} -H Content-Length:0 -H Transfer-Encoding:chunked -H columns:k1,k2,v1,v2,v3 -T ${context.dataPath}/test_chunked_transfer.csv http://${beHost}:${beHttpPort}/api/${db}/${tableName16}/_stream_load" + log.info("test chunked transfer command: ${command}") + def process = command.execute() + code = process.waitFor() + out = process.text + log.info("test chunked transfer result: ${out}".toString()) + def json = parseJson(out) + assertEquals("fail", json.Status.toLowerCase()) + assertTrue(json.Message.contains("please do not set both content_length and transfer-encoding")) + } finally { + sql """ DROP TABLE IF EXISTS ${tableName16} FORCE""" + } + + + //test be : no chunked transfer + no Content Length + try { + sql """ DROP TABLE IF EXISTS ${tableName16} """ + sql """ + CREATE TABLE IF NOT EXISTS ${tableName16} ( + `k1` bigint(20) NULL DEFAULT "1", + `k2` bigint(20) NULL , + `v1` tinyint(4) NULL, + `v2` tinyint(4) NULL, + `v3` tinyint(4) NULL, + `v4` DATETIME NULL + ) ENGINE=OLAP + DISTRIBUTED BY HASH(`k1`) BUCKETS 3 + PROPERTIES ("replication_allocation" = "tag.location.default: 1"); + """ + + def command = "curl --location-trusted -u ${context.config.feHttpUser}:${context.config.feHttpPassword} -H column_separator:| -H ${db}:${tableName16} -H Content-Length: -H Transfer-Encoding: -T ${context.dataPath}/test_chunked_transfer.csv http://${beHost}:${beHttpPort}/api/${db}/${tableName16}/_stream_load" + log.info("test chunked transfer command: ${command}") + def process = command.execute() + code = process.waitFor() + out = process.text + log.info("test chunked transfer result: ${out}".toString()) + def json = parseJson(out) + assertEquals("fail", json.Status.toLowerCase()) + assertTrue(json.Message.contains("content_length is empty and transfer-encoding!=chunked, please set content_length or transfer-encoding=chunked")) + } finally { + sql """ DROP TABLE IF EXISTS ${tableName16} FORCE""" + } }