Skip to content

Commit

Permalink
Merge pull request #151 from Altinity/customizations/21.1
Browse files Browse the repository at this point in the history
  • Loading branch information
Enmk authored Jun 1, 2022
2 parents bb327c3 + a0418cb commit b84aa2e
Show file tree
Hide file tree
Showing 23 changed files with 252 additions and 72 deletions.
6 changes: 3 additions & 3 deletions cmake/autogenerated_versions.txt
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,8 @@
SET(VERSION_REVISION 54445)
SET(VERSION_MAJOR 21)
SET(VERSION_MINOR 1)
SET(VERSION_PATCH 9)
SET(VERSION_PATCH 10)
SET(VERSION_GITHASH 99169ea26e214bd3cf711061a093dfdb7dc60a2c)
SET(VERSION_DESCRIBE v21.1.9.41-stable)
SET(VERSION_STRING 21.1.9.41)
SET(VERSION_DESCRIBE v21.1.10.1-stable)
SET(VERSION_STRING 21.1.10.1)
# end of autochange
4 changes: 2 additions & 2 deletions debian/changelog
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
clickhouse (21.1.9.1) unstable; urgency=low
clickhouse (21.1.10.1) unstable; urgency=low

* Modified source code

-- clickhouse-release <[email protected]> Wed, 07 Apr 2021 15:43:25 +0300
-- clickhouse-release <[email protected]> Tue, 13 Apr 2021 14:31:11 +0300
2 changes: 1 addition & 1 deletion docker/client/Dockerfile
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
FROM ubuntu:18.04

ARG repository="deb https://repo.clickhouse.tech/deb/stable/ main/"
ARG version=21.1.9.*
ARG version=21.1.10.*

RUN apt-get update \
&& apt-get install --yes --no-install-recommends \
Expand Down
2 changes: 1 addition & 1 deletion docker/server/Dockerfile
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
FROM ubuntu:20.04

ARG repository="deb https://repo.clickhouse.tech/deb/stable/ main/"
ARG version=21.1.9.*
ARG version=21.1.10.*
ARG gosu_ver=1.10

# user/group precreated explicitly with fixed uid/gid on purpose.
Expand Down
2 changes: 1 addition & 1 deletion docker/test/Dockerfile
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
FROM ubuntu:18.04

ARG repository="deb https://repo.clickhouse.tech/deb/stable/ main/"
ARG version=21.1.9.*
ARG version=21.1.10.*

RUN apt-get update && \
apt-get install -y apt-transport-https dirmngr && \
Expand Down
51 changes: 35 additions & 16 deletions src/AggregateFunctions/QuantileTDigest.h
Original file line number Diff line number Diff line change
Expand Up @@ -195,15 +195,6 @@ class QuantileTDigest
BetterFloat l_count = l->count;
while (r != centroids.end())
{
if (l->mean == r->mean) // Perfect aggregation (fast). We compare l->mean, not l_mean, to avoid identical elements after compress
{
l_count += r->count;
l->count = l_count;
++r;
continue;
}
// we use quantile which gives us the smallest error

/// The fraction of the whole histogram that lies to the left of l, including half of l itself. That is, the quantile level at position l.
BetterFloat ql = (sum + l_count * 0.5) / count;
BetterFloat err = ql * (1 - ql);
Expand Down Expand Up @@ -320,16 +311,29 @@ class QuantileTDigest
Float64 prev_x = 0;
Count sum = 0;
Value prev_mean = centroids.front().mean;
Count prev_count = centroids.front().count;

for (const auto & c : centroids)
{
Float64 current_x = sum + c.count * 0.5;

if (current_x >= x)
return interpolate(x, prev_x, prev_mean, current_x, c.mean);
{
/// Special handling of singletons.
Float64 left = prev_x + 0.5*(prev_count == 1);
Float64 right = current_x - 0.5*(c.count == 1);

if (x <= left)
return prev_mean;
else if (x >= right)
return c.mean;
else
return interpolate(x, left, prev_mean, right, c.mean);
}

sum += c.count;
prev_mean = c.mean;
prev_count = c.count;
prev_x = current_x;
}

Expand Down Expand Up @@ -364,25 +368,40 @@ class QuantileTDigest
Float64 prev_x = 0;
Count sum = 0;
Value prev_mean = centroids.front().mean;
Count prev_count = centroids.front().count;

size_t result_num = 0;
for (const auto & c : centroids)
{
Float64 current_x = sum + c.count * 0.5;

while (current_x >= x)
if (current_x >= x)
{
result[levels_permutation[result_num]] = interpolate(x, prev_x, prev_mean, current_x, c.mean);
/// Special handling of singletons.
Float64 left = prev_x + 0.5*(prev_count == 1);
Float64 right = current_x - 0.5*(c.count == 1);

while (current_x >= x)
{

if (x <= left)
result[levels_permutation[result_num]] = prev_mean;
else if (x >= right)
result[levels_permutation[result_num]] = c.mean;
else
result[levels_permutation[result_num]] = interpolate(x, left, prev_mean, right, c.mean);

++result_num;
if (result_num >= size)
return;
++result_num;
if (result_num >= size)
return;

x = levels[levels_permutation[result_num]] * count;
x = levels[levels_permutation[result_num]] * count;
}
}

sum += c.count;
prev_mean = c.mean;
prev_count = c.count;
prev_x = current_x;
}

Expand Down
4 changes: 4 additions & 0 deletions src/Compression/CompressionCodecDelta.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,10 @@ void CompressionCodecDelta::doDecompressData(const char * source, UInt32 source_
throw Exception("Cannot decompress. File has wrong header", ErrorCodes::CANNOT_DECOMPRESS);

UInt8 bytes_size = source[0];

if (bytes_size == 0)
throw Exception("Cannot decompress. File has wrong header", ErrorCodes::CANNOT_DECOMPRESS);

UInt8 bytes_to_skip = uncompressed_size % bytes_size;

if (UInt32(2 + bytes_to_skip) > source_size)
Expand Down
4 changes: 4 additions & 0 deletions src/Compression/CompressionCodecDoubleDelta.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -385,6 +385,10 @@ void CompressionCodecDoubleDelta::doDecompressData(const char * source, UInt32 s
throw Exception("Cannot decompress. File has wrong header", ErrorCodes::CANNOT_DECOMPRESS);

UInt8 bytes_size = source[0];

if (bytes_size == 0)
throw Exception("Cannot decompress. File has wrong header", ErrorCodes::CANNOT_DECOMPRESS);

UInt8 bytes_to_skip = uncompressed_size % bytes_size;

if (UInt32(2 + bytes_to_skip) > source_size)
Expand Down
4 changes: 4 additions & 0 deletions src/Compression/CompressionCodecGorilla.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -301,6 +301,10 @@ void CompressionCodecGorilla::doDecompressData(const char * source, UInt32 sourc
throw Exception("Cannot decompress. File has wrong header", ErrorCodes::CANNOT_DECOMPRESS);

UInt8 bytes_size = source[0];

if (bytes_size == 0)
throw Exception("Cannot decompress. File has wrong header", ErrorCodes::CANNOT_DECOMPRESS);

UInt8 bytes_to_skip = uncompressed_size % bytes_size;

if (UInt32(2 + bytes_to_skip) > source_size)
Expand Down
12 changes: 8 additions & 4 deletions src/Compression/CompressionCodecLZ4.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,10 @@ namespace DB

namespace ErrorCodes
{
extern const int CANNOT_COMPRESS;
extern const int ILLEGAL_SYNTAX_FOR_CODEC_TYPE;
extern const int ILLEGAL_CODEC_PARAMETER;
extern const int CANNOT_COMPRESS;
extern const int CANNOT_DECOMPRESS;
extern const int ILLEGAL_SYNTAX_FOR_CODEC_TYPE;
extern const int ILLEGAL_CODEC_PARAMETER;
}

CompressionCodecLZ4::CompressionCodecLZ4()
Expand Down Expand Up @@ -51,7 +52,10 @@ UInt32 CompressionCodecLZ4::doCompressData(const char * source, UInt32 source_si

void CompressionCodecLZ4::doDecompressData(const char * source, UInt32 source_size, char * dest, UInt32 uncompressed_size) const
{
LZ4::decompress(source, dest, source_size, uncompressed_size, lz4_stat);
bool success = LZ4::decompress(source, dest, source_size, uncompressed_size, lz4_stat);

if (!success)
throw Exception("Cannot decompress", ErrorCodes::CANNOT_DECOMPRESS);
}

void registerCodecLZ4(CompressionCodecFactory & factory)
Expand Down
44 changes: 33 additions & 11 deletions src/Compression/LZ4_decompress_faster.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -412,13 +412,16 @@ template <> void inline copyOverlap<32, false>(UInt8 * op, const UInt8 *& match,
/// See also https://stackoverflow.com/a/30669632

template <size_t copy_amount, bool use_shuffle>
void NO_INLINE decompressImpl(
bool NO_INLINE decompressImpl(
const char * const source,
char * const dest,
size_t source_size,
size_t dest_size)
{
const UInt8 * ip = reinterpret_cast<const UInt8 *>(source);
UInt8 * op = reinterpret_cast<UInt8 *>(dest);
const UInt8 * const input_end = ip + source_size;
UInt8 * const output_begin = op;
UInt8 * const output_end = op + dest_size;

/// Unrolling with clang causes a >10% performance degradation.
Expand All @@ -436,11 +439,14 @@ void NO_INLINE decompressImpl(
{
s = *ip++;
length += s;
} while (unlikely(s == 255));
} while (unlikely(s == 255 && ip < input_end));
};

/// Get literal length.

if (unlikely(ip >= input_end))
return false;

const unsigned token = *ip++;
length = token >> 4;
if (length == 0x0F)
Expand All @@ -461,20 +467,29 @@ void NO_INLINE decompressImpl(
/// output: xyzHello, w
/// ^-op (we will overwrite excessive bytes on next iteration)

if (unlikely(copy_end > output_end))
return false;

wildCopy<copy_amount>(op, ip, copy_end); /// Here we can write up to copy_amount - 1 bytes after buffer.

if (copy_end == output_end)
return true;

ip += length;
op = copy_end;

if (copy_end >= output_end)
return;
if (unlikely(ip + 1 >= input_end))
return false;

/// Get match offset.

size_t offset = unalignedLoad<UInt16>(ip);
ip += 2;
const UInt8 * match = op - offset;

if (unlikely(match < output_begin))
return false;

/// Get match length.

length = token & 0x0F;
Expand Down Expand Up @@ -515,7 +530,11 @@ void NO_INLINE decompressImpl(

copy<copy_amount>(op, match); /// copy_amount + copy_amount - 1 - 4 * 2 bytes after buffer.
if (length > copy_amount * 2)
{
if (unlikely(copy_end > output_end))
return false;
wildCopy<copy_amount>(op + copy_amount, match + copy_amount, copy_end);
}

op = copy_end;
}
Expand All @@ -524,15 +543,15 @@ void NO_INLINE decompressImpl(
}


void decompress(
bool decompress(
const char * const source,
char * const dest,
size_t source_size,
size_t dest_size,
PerformanceStatistics & statistics [[maybe_unused]])
{
if (source_size == 0 || dest_size == 0)
return;
return true;

/// Don't run timer if the block is too small.
if (dest_size >= 32768)
Expand All @@ -542,24 +561,27 @@ void decompress(
/// Run the selected method and measure time.

Stopwatch watch;
bool success = true;
if (best_variant == 0)
decompressImpl<16, true>(source, dest, dest_size);
success = decompressImpl<16, true>(source, dest, source_size, dest_size);
if (best_variant == 1)
decompressImpl<16, false>(source, dest, dest_size);
success = decompressImpl<16, false>(source, dest, source_size, dest_size);
if (best_variant == 2)
decompressImpl<8, true>(source, dest, dest_size);
success = decompressImpl<8, true>(source, dest, source_size, dest_size);
if (best_variant == 3)
decompressImpl<32, false>(source, dest, dest_size);
success = decompressImpl<32, false>(source, dest, source_size, dest_size);

watch.stop();

/// Update performance statistics.

statistics.data[best_variant].update(watch.elapsedSeconds(), dest_size);

return success;
}
else
{
decompressImpl<8, false>(source, dest, dest_size);
return decompressImpl<8, false>(source, dest, source_size, dest_size);
}
}

Expand Down
6 changes: 3 additions & 3 deletions src/Compression/LZ4_decompress_faster.h
Original file line number Diff line number Diff line change
Expand Up @@ -122,14 +122,14 @@ struct PerformanceStatistics
return choose_method;
}

PerformanceStatistics() {}
PerformanceStatistics(ssize_t choose_method_) : choose_method(choose_method_) {}
PerformanceStatistics() = default;
explicit PerformanceStatistics(ssize_t choose_method_) : choose_method(choose_method_) {}
};


/** This method dispatches to one of several implementations depending on performance statistics.
*/
void decompress(
bool decompress(
const char * const source,
char * const dest,
size_t source_size,
Expand Down
2 changes: 1 addition & 1 deletion src/Core/Settings.h
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ class IColumn;
M(UInt64, background_buffer_flush_schedule_pool_size, 16, "Number of threads performing background flush for tables with Buffer engine. Only has meaning at server startup.", 0) \
M(UInt64, background_pool_size, 16, "Number of threads performing background work for tables (for example, merging in merge tree). Only has meaning at server startup.", 0) \
M(UInt64, background_move_pool_size, 8, "Number of threads performing background moves for tables. Only has meaning at server startup.", 0) \
M(UInt64, background_fetches_pool_size, 3, "Number of threads performing background fetches for replicated tables. Only has meaning at server startup.", 0) \
M(UInt64, background_fetches_pool_size, 8, "Number of threads performing background fetches for replicated tables. Only has meaning at server startup.", 0) \
M(UInt64, background_schedule_pool_size, 16, "Number of threads performing background tasks for replicated tables, dns cache updates. Only has meaning at server startup.", 0) \
M(UInt64, background_message_broker_schedule_pool_size, 16, "Number of threads performing background tasks for message streaming. Only has meaning at server startup.", 0) \
M(UInt64, background_distributed_schedule_pool_size, 16, "Number of threads performing background tasks for distributed sends. Only has meaning at server startup.", 0) \
Expand Down
27 changes: 26 additions & 1 deletion src/IO/S3/PocoHTTPClient.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -185,7 +185,32 @@ void PocoHTTPClient::makeRequestInternal(

Poco::Net::HTTPRequest poco_request(Poco::Net::HTTPRequest::HTTP_1_1);

poco_request.setURI(poco_uri.getPathAndQuery());
/** Aws::Http::URI will encode URL in appropriate way for AWS S3 server.
* Poco::URI also does that correctly but it's not compatible with AWS.
* For example, `+` symbol will not be converted to `%2B` by Poco and would
* be received as space symbol.
*
* References:
* https://github.com/aws/aws-sdk-java/issues/1946
* https://forums.aws.amazon.com/thread.jspa?threadID=55746
*
* Example:
* Suppose we are requesting a file: abc+def.txt
* To do this correctly, we need to construct a URL containing either:
* - abc%2Bdef.txt
* this is also technically correct:
* - abc+def.txt
* but AWS servers don't support it properly, interpreting plus character as whitespace
* although it is in path part, not in query string.
* e.g. this is not correct:
* - abc%20def.txt
*
* Poco will keep plus character as is (which is correct) while AWS servers will treat it as whitespace, which is not what is intended.
* To overcome this limitation, we encode URL with "Aws::Http::URI" and then pass already prepared URL to Poco.
*/

Aws::Http::URI aws_target_uri(uri);
poco_request.setURI(aws_target_uri.GetPath() + aws_target_uri.GetQueryString());

switch (request.GetMethod())
{
Expand Down
Loading

0 comments on commit b84aa2e

Please sign in to comment.