diff --git a/dbms/src/Columns/ColumnAggregateFunction.h b/dbms/src/Columns/ColumnAggregateFunction.h index 873f773df96..4b8c347ad32 100644 --- a/dbms/src/Columns/ColumnAggregateFunction.h +++ b/dbms/src/Columns/ColumnAggregateFunction.h @@ -98,7 +98,7 @@ class ColumnAggregateFunction final : public COWPtrHelper ColumnPtr permute(const Permutation & perm, size_t limit) const override; int compareAt(size_t n, size_t m, const IColumn & rhs_, int nan_direction_hint) const override; void getPermutation(bool reverse, size_t limit, int nan_direction_hint, Permutation & res) const override; - void reserve(size_t n) override; + size_t capacity() const override { return data->capacity(); } + void reserveWithStrategy(size_t n, ReserveStrategy strategy) override; void reserveAlign(size_t n, size_t alignment) override; size_t byteSize() const override; size_t byteSize(size_t offset, size_t limit) const override; diff --git a/dbms/src/Columns/ColumnConst.h b/dbms/src/Columns/ColumnConst.h index 7548e07b2a8..53f31dd8007 100644 --- a/dbms/src/Columns/ColumnConst.h +++ b/dbms/src/Columns/ColumnConst.h @@ -261,6 +261,8 @@ class ColumnConst final : public COWPtrHelper ColumnPtr permute(const Permutation & perm, size_t limit) const override; void getPermutation(bool reverse, size_t limit, int nan_direction_hint, Permutation & res) const override; + size_t capacity() const override { return data->capacity(); } + size_t byteSize() const override { return data->byteSize() + sizeof(s); } size_t byteSize(size_t /*offset*/, size_t /*limit*/) const override { return byteSize(); } diff --git a/dbms/src/Columns/ColumnDecimal.cpp b/dbms/src/Columns/ColumnDecimal.cpp index 872b874d19a..0243721499a 100644 --- a/dbms/src/Columns/ColumnDecimal.cpp +++ b/dbms/src/Columns/ColumnDecimal.cpp @@ -764,6 +764,20 @@ MutableColumnPtr ColumnDecimal::cloneResized(size_t size) const return res; } +template +void ColumnDecimal::reserveWithStrategy(size_t n, IColumn::ReserveStrategy strategy) +{ + switch (strategy) + { + case IColumn::ReserveStrategy::Default: + data.reserve(n); + break; + case IColumn::ReserveStrategy::ScaleFactor1_5: + data.reserve_exact(n / 2 * 3); + break; + } +} + template void ColumnDecimal::insertData(const char * src [[maybe_unused]], size_t /*length*/) { diff --git a/dbms/src/Columns/ColumnDecimal.h b/dbms/src/Columns/ColumnDecimal.h index 43f1f869a3a..576348743aa 100644 --- a/dbms/src/Columns/ColumnDecimal.h +++ b/dbms/src/Columns/ColumnDecimal.h @@ -113,8 +113,9 @@ class ColumnDecimal final : public COWPtrHelper void getPermutation(bool reverse, size_t limit, int nan_direction_hint, Permutation & res) const override; + size_t capacity() const override { return chars.capacity() / n; } + void insertRangeFrom(const IColumn & src, size_t start, size_t length) override; ColumnPtr filter(const IColumn::Filter & filt, ssize_t result_size_hint) const override; @@ -223,7 +225,7 @@ class ColumnFixedString final : public COWPtrHelper void gather(ColumnGathererStream & gatherer_stream) override; - void reserve(size_t size) override { chars.reserve(n * size); } + void reserveWithStrategy(size_t size, IColumn::ReserveStrategy strategy) override; void reserveAlign(size_t size, size_t alignment) override { chars.reserve(n * size, alignment); } void getExtremes(Field & min, Field & max) const override; diff --git a/dbms/src/Columns/ColumnFunction.h b/dbms/src/Columns/ColumnFunction.h index 229e0ce9ce6..0cb51dbf40d 100644 --- a/dbms/src/Columns/ColumnFunction.h +++ b/dbms/src/Columns/ColumnFunction.h @@ -272,6 +272,9 @@ class ColumnFunction final : public COWPtrHelper throw Exception("getPermutation is not implemented for " + getName(), ErrorCodes::NOT_IMPLEMENTED); } + // A fake capacity for ColumnFunction, as it does not hold data in the same way as other columns. + size_t capacity() const override { return 0; } + void gather(ColumnGathererStream &) override { throw Exception("Method gather is not supported for " + getName(), ErrorCodes::NOT_IMPLEMENTED); diff --git a/dbms/src/Columns/ColumnNothing.h b/dbms/src/Columns/ColumnNothing.h index 697e020b0ff..d5999ee4468 100644 --- a/dbms/src/Columns/ColumnNothing.h +++ b/dbms/src/Columns/ColumnNothing.h @@ -29,9 +29,12 @@ class ColumnNothing final : public COWPtrHelper public: const char * getFamilyName() const override { return "Nothing"; } - MutableColumnPtr cloneDummy(size_t s) const override { return ColumnNothing::create(s); }; + MutableColumnPtr cloneDummy(size_t s) const override { return ColumnNothing::create(s); } bool canBeInsideNullable() const override { return true; } + + // ColumnNothing does not hold any data, so capacity is 0. + size_t capacity() const override { return 1; } }; } // namespace DB diff --git a/dbms/src/Columns/ColumnNullable.cpp b/dbms/src/Columns/ColumnNullable.cpp index 8a5ceaa1b0c..f1769e8210f 100644 --- a/dbms/src/Columns/ColumnNullable.cpp +++ b/dbms/src/Columns/ColumnNullable.cpp @@ -632,10 +632,23 @@ void ColumnNullable::gather(ColumnGathererStream & gatherer) gatherer.gather(*this); } -void ColumnNullable::reserve(size_t n) +size_t ColumnNullable::capacity() const { - getNestedColumn().reserve(n); - getNullMapData().reserve(n); + return getNestedColumn().capacity(); +} + +void ColumnNullable::reserveWithStrategy(size_t n, IColumn::ReserveStrategy strategy) +{ + getNestedColumn().reserveWithStrategy(n, strategy); + switch (strategy) + { + case ReserveStrategy::Default: + getNullMapData().reserve(n); + break; + case ReserveStrategy::ScaleFactor1_5: + getNullMapData().reserve_exact(n / 2 * 3); + break; + } } void ColumnNullable::reserveAlign(size_t n, size_t alignment) diff --git a/dbms/src/Columns/ColumnNullable.h b/dbms/src/Columns/ColumnNullable.h index ed1fd3f8439..6ae969bd76b 100644 --- a/dbms/src/Columns/ColumnNullable.h +++ b/dbms/src/Columns/ColumnNullable.h @@ -181,7 +181,8 @@ class ColumnNullable final : public COWPtrHelper Permutation & res) const override; void adjustPermutationWithNullDirection(bool reverse, size_t limit, int null_direction_hint, Permutation & res) const; - void reserve(size_t n) override; + size_t capacity() const override; + void reserveWithStrategy(size_t n, IColumn::ReserveStrategy strategy) override; void reserveAlign(size_t n, size_t alignment) override; void reserveWithTotalMemoryHint(size_t n, Int64 total_memory_hint) override; void reserveAlignWithTotalMemoryHint(size_t n, Int64 total_memory_hint, size_t alignment) override; diff --git a/dbms/src/Columns/ColumnSet.h b/dbms/src/Columns/ColumnSet.h index 94c0bbdc71b..f6b7b04c5b7 100644 --- a/dbms/src/Columns/ColumnSet.h +++ b/dbms/src/Columns/ColumnSet.h @@ -45,6 +45,9 @@ class ColumnSet final : public COWPtrHelper ConstSetPtr getData() const { return data; } + // A fake capacity for ColumnSet, as it does not hold data in the same way as other columns. + size_t capacity() const override { return 1; } + private: ConstSetPtr data; }; diff --git a/dbms/src/Columns/ColumnString.cpp b/dbms/src/Columns/ColumnString.cpp index 65a9e888aad..aeab4b6177d 100644 --- a/dbms/src/Columns/ColumnString.cpp +++ b/dbms/src/Columns/ColumnString.cpp @@ -288,11 +288,26 @@ void ColumnString::gather(ColumnGathererStream & gatherer) gatherer.gather(*this); } +size_t ColumnString::capacity() const +{ + return chars.capacity() / APPROX_STRING_SIZE; // Approximate capacity based on average string size. +} -void ColumnString::reserve(size_t n) +void ColumnString::reserveWithStrategy(size_t n, IColumn::ReserveStrategy strategy) { - offsets.reserve(n); - chars.reserve(n * APPROX_STRING_SIZE); + switch (strategy) + { + case IColumn::ReserveStrategy::Default: + offsets.reserve(n); + chars.reserve(n * APPROX_STRING_SIZE); + break; + case IColumn::ReserveStrategy::ScaleFactor1_5: + // offsets.reserve_exact(n / 2 * 3); + // chars.reserve_exact(n * APPROX_STRING_SIZE / 2 * 3); + offsets.resize(n / 2 * 3); + chars.resize(n / 2 * APPROX_STRING_SIZE * 3); + break; + } } void ColumnString::reserveAlign(size_t n, size_t alignment) diff --git a/dbms/src/Columns/ColumnString.h b/dbms/src/Columns/ColumnString.h index 828abd7f169..2938b47f15c 100644 --- a/dbms/src/Columns/ColumnString.h +++ b/dbms/src/Columns/ColumnString.h @@ -454,7 +454,9 @@ class ColumnString final : public COWPtrHelper void gather(ColumnGathererStream & gatherer_stream) override; - void reserve(size_t n) override; + size_t capacity() const override; + + void reserveWithStrategy(size_t n, IColumn::ReserveStrategy strategy) override; void reserveAlign(size_t n, size_t alignment) override; void reserveWithTotalMemoryHint(size_t n, Int64 total_memory_hint) override; diff --git a/dbms/src/Columns/ColumnTuple.cpp b/dbms/src/Columns/ColumnTuple.cpp index bf4372f3375..5b284fc4acb 100644 --- a/dbms/src/Columns/ColumnTuple.cpp +++ b/dbms/src/Columns/ColumnTuple.cpp @@ -421,11 +421,18 @@ void ColumnTuple::gather(ColumnGathererStream & gatherer) gatherer.gather(*this); } -void ColumnTuple::reserve(size_t n) +size_t ColumnTuple::capacity() const +{ + return columns.empty() ? 0 : columns[0]->capacity(); +} + +void ColumnTuple::reserveWithStrategy(size_t n, IColumn::ReserveStrategy strategy) { const size_t tuple_size = columns.size(); for (size_t i = 0; i < tuple_size; ++i) - getColumn(i).reserve(n); + { + getColumn(i).reserveWithStrategy(n, strategy); + } } void ColumnTuple::reserveAlign(size_t n, size_t alignment) diff --git a/dbms/src/Columns/ColumnTuple.h b/dbms/src/Columns/ColumnTuple.h index 6ae50cbd283..95861b65791 100644 --- a/dbms/src/Columns/ColumnTuple.h +++ b/dbms/src/Columns/ColumnTuple.h @@ -256,7 +256,8 @@ class ColumnTuple final : public COWPtrHelper int compareAt(size_t n, size_t m, const IColumn & rhs, int nan_direction_hint) const override; void getExtremes(Field & min, Field & max) const override; void getPermutation(bool reverse, size_t limit, int nan_direction_hint, Permutation & res) const override; - void reserve(size_t n) override; + size_t capacity() const override; + void reserveWithStrategy(size_t n, IColumn::ReserveStrategy strategy) override; void reserveAlign(size_t n, size_t alignment) override; size_t byteSize() const override; size_t byteSize(size_t offset, size_t limit) const override; diff --git a/dbms/src/Columns/ColumnVector.h b/dbms/src/Columns/ColumnVector.h index 067b696cdfe..194cccd0313 100644 --- a/dbms/src/Columns/ColumnVector.h +++ b/dbms/src/Columns/ColumnVector.h @@ -421,7 +421,19 @@ class ColumnVector final : public COWPtrHelper virtual void gather(ColumnGathererStream & gatherer_stream) = 0; /** Computes minimum and maximum element of the column. - * In addition to numeric types, the funtion is completely implemented for Date and DateTime. - * For strings and arrays function should retrurn default value. + * In addition to numeric types, the function is completely implemented for Date and DateTime. + * For strings and arrays function should return default value. * (except for constant columns; they should return value of the constant). * If column is empty function should return default value. */ virtual void getExtremes(Field & min, Field & max) const = 0; + /// Returns the number of elements that this column can hold without reallocating. + virtual size_t capacity() const = 0; + + enum class ReserveStrategy + { + // Use the default behavior of PODArray, which doubles the capacity each time. + Default, + // Use the strategy that reserves memory for 1.5 times the requested size. + ScaleFactor1_5, + }; + /// Reserves memory for specified amount of elements. If reservation isn't possible, does nothing. /// It affects performance only (not correctness). - virtual void reserve(size_t /*n*/) {} + void reserve(size_t n) { reserveWithStrategy(n, ReserveStrategy::Default); } + + /// Reserves memory for specified amount of elements with strategy. + /// If reservation isn't possible, does nothing. It affects performance only (not correctness). + virtual void reserveWithStrategy(size_t /*n*/, ReserveStrategy /*strategy*/) {} /// Reserves aligned memory for specified amount of elements. If reservation isn't possible, does nothing. /// It affects performance only (not correctness). diff --git a/dbms/src/Columns/IColumnDummy.h b/dbms/src/Columns/IColumnDummy.h index 86b3e6c00bd..cdc5a5a5554 100644 --- a/dbms/src/Columns/IColumnDummy.h +++ b/dbms/src/Columns/IColumnDummy.h @@ -45,6 +45,7 @@ class IColumnDummy : public IColumn virtual MutableColumnPtr cloneDummy(size_t s_) const = 0; MutableColumnPtr cloneResized(size_t s_) const override { return cloneDummy(s_); } + size_t capacity() const override { return s; } size_t size() const override { return s; } void insertDefault() override { ++s; } diff --git a/dbms/src/Common/CurrentMetrics.h b/dbms/src/Common/CurrentMetrics.h index ce45cc7e52c..cc48affb703 100644 --- a/dbms/src/Common/CurrentMetrics.h +++ b/dbms/src/Common/CurrentMetrics.h @@ -18,7 +18,6 @@ #include #include -#include #include /** Allows to count number of simultaneously happening processes or current value of some metric. diff --git a/dbms/src/Common/PODArray.cpp b/dbms/src/Common/PODArray.cpp index 66647a55303..295202778b9 100644 --- a/dbms/src/Common/PODArray.cpp +++ b/dbms/src/Common/PODArray.cpp @@ -16,7 +16,37 @@ namespace DB { +namespace ErrorCodes +{ +extern const int CANNOT_ALLOCATE_MEMORY; +} + /// Used for left padding of PODArray when empty const char EmptyPODArray[EmptyPODArraySize]{}; +namespace PODArrayDetails +{ + +ALWAYS_INLINE size_t byte_size(size_t num_elements, size_t element_size) +{ + size_t amount; + if (__builtin_mul_overflow(num_elements, element_size, &amount)) + throw Exception( + ErrorCodes::CANNOT_ALLOCATE_MEMORY, + "Amount of memory requested to allocate is more than allowed"); + return amount; +} + +ALWAYS_INLINE size_t +minimum_memory_for_elements(size_t num_elements, size_t element_size, size_t pad_left, size_t pad_right) +{ + size_t amount; + if (__builtin_add_overflow(byte_size(num_elements, element_size), pad_left + pad_right, &amount)) + throw Exception( + ErrorCodes::CANNOT_ALLOCATE_MEMORY, + "Amount of memory requested to allocate is more than allowed"); + return amount; +} + +} // namespace PODArrayDetails } // namespace DB diff --git a/dbms/src/Common/PODArray.h b/dbms/src/Common/PODArray.h index 4a71d6ec5fc..7ebe6708028 100644 --- a/dbms/src/Common/PODArray.h +++ b/dbms/src/Common/PODArray.h @@ -28,7 +28,6 @@ #include #include #include -#include #include #ifndef NDEBUG @@ -48,6 +47,20 @@ inline constexpr size_t integerRoundUp(size_t value, size_t dividend) return ((value + dividend - 1) / dividend) * dividend; } +namespace PODArrayDetails +{ +/// The amount of memory occupied by the num_elements of the elements. +size_t byte_size(size_t num_elements, size_t element_size); /// NOLINT + +/// Minimum amount of memory to allocate for num_elements, including padding. +size_t minimum_memory_for_elements( + size_t num_elements, + size_t element_size, + size_t pad_left, + size_t pad_right); /// NOLINT + +}; // namespace PODArrayDetails + /** A dynamic array for POD types. * Designed for a small number of large arrays (rather than a lot of small ones). * To be more precise - for use in ColumnVector. @@ -109,22 +122,12 @@ class PODArrayBase bool is_shared_memory; - [[nodiscard]] __attribute__((always_inline)) std::optional swicthMemoryTracker() + [[nodiscard]] __attribute__((always_inline)) std::optional switchMemoryTracker() { return is_shared_memory ? std::make_optional(true, shared_column_data_mem_tracker.get()) : std::nullopt; } - /// The amount of memory occupied by the num_elements of the elements. - static size_t byte_size(size_t num_elements) { return num_elements * ELEMENT_SIZE; } - - /// Minimum amount of memory to allocate for num_elements, including padding. - static size_t minimum_memory_for_elements(size_t num_elements) - { - auto res = byte_size(num_elements) + pad_right + pad_left; - return res; - } - void alloc_for_num_elements(size_t num_elements) { //alloc_for_num_elements is only used when initialized PODArray based on size or two iterators. @@ -133,14 +136,14 @@ class PODArrayBase //If the users want to do PODArray initialize first, and also will push_back other elements later, //in push_back or emplace_back will do the reserveForNextSize to alloc extra memory. //Thus, we don't need do roundUpToPowerOfTwoOrZero here, and it can cut down extra memory usage, - //and will not have bad affact on performance. - alloc(minimum_memory_for_elements(num_elements)); + //and will not have bad affect on performance. + alloc(PODArrayDetails::minimum_memory_for_elements(num_elements, ELEMENT_SIZE, pad_left, pad_right)); } template void alloc(size_t bytes, TAllocatorParams &&... allocator_params) { - auto guard = swicthMemoryTracker(); + auto guard = switchMemoryTracker(); c_start = c_end = reinterpret_cast(TAllocator::alloc(bytes, std::forward(allocator_params)...)) + pad_left; @@ -157,7 +160,7 @@ class PODArrayBase unprotect(); - auto guard = swicthMemoryTracker(); + auto guard = switchMemoryTracker(); TAllocator::free(c_start - pad_left, allocated_bytes()); } @@ -172,7 +175,7 @@ class PODArrayBase unprotect(); - auto guard = swicthMemoryTracker(); + auto guard = switchMemoryTracker(); ptrdiff_t end_diff = c_end - c_start; c_start = reinterpret_cast(TAllocator::realloc( @@ -197,12 +200,14 @@ class PODArrayBase template void reserveForNextSize(TAllocatorParams &&... allocator_params) { - if (size() == 0) + if (empty()) { // The allocated memory should be multiplication of ELEMENT_SIZE to hold the element, otherwise, // memory issue such as corruption could appear in edge case. realloc( - std::max(((INITIAL_SIZE - 1) / ELEMENT_SIZE + 1) * ELEMENT_SIZE, minimum_memory_for_elements(1)), + std::max( + ((INITIAL_SIZE - 1) / ELEMENT_SIZE + 1) * ELEMENT_SIZE, + PODArrayDetails::minimum_memory_for_elements(1, ELEMENT_SIZE, pad_left, pad_right)), std::forward(allocator_params)...); } else @@ -240,7 +245,7 @@ class PODArrayBase size_t capacity() const { return (c_end_of_storage - c_start) / ELEMENT_SIZE; } /// This method is safe to use only for information about memory usage. - size_t allocated_bytes() const { return c_end_of_storage - c_start + pad_right + pad_left; } + size_t allocated_bytes() const { return c_end_of_storage - c_start + pad_right + pad_left; } // NOLINT void clear() { c_end = c_start; } @@ -249,7 +254,17 @@ class PODArrayBase { if (n > capacity()) realloc( - roundUpToPowerOfTwoOrZero(minimum_memory_for_elements(n)), + roundUpToPowerOfTwoOrZero( + PODArrayDetails::minimum_memory_for_elements(n, ELEMENT_SIZE, pad_left, pad_right)), + std::forward(allocator_params)...); + } + + template + void reserve_exact(size_t n, TAllocatorParams &&... allocator_params) /// NOLINT + { + if (n > capacity()) + realloc( + PODArrayDetails::minimum_memory_for_elements(n, ELEMENT_SIZE, pad_left, pad_right), std::forward(allocator_params)...); } @@ -260,20 +275,26 @@ class PODArrayBase resize_assume_reserved(n); } - void resize_assume_reserved(const size_t n) { c_end = c_start + byte_size(n); } + void resize_assume_reserved(const size_t n) // NOLINT + { + c_end = c_start + PODArrayDetails::byte_size(n, ELEMENT_SIZE); + } - const char * raw_data() const { return c_start; } + const char * raw_data() const { return c_start; } // NOLINT(readability-identifier-naming) template - void push_back_raw(const char * ptr, TAllocatorParams &&... allocator_params) + void push_back_raw(const char * ptr, TAllocatorParams &&... allocator_params) // NOLINT { push_back_raw_many(1, ptr, std::forward(allocator_params)...); } template - void push_back_raw_many(size_t number_of_items, const void * ptr, TAllocatorParams &&... allocator_params) + void push_back_raw_many( // NOLINT(readability-identifier-naming) + size_t number_of_items, + const void * ptr, + TAllocatorParams &&... allocator_params) { - size_t items_byte_size = byte_size(number_of_items); + size_t items_byte_size = PODArrayDetails::byte_size(number_of_items, ELEMENT_SIZE); if (unlikely(c_end + items_byte_size > c_end_of_storage)) reserve(size() + number_of_items, std::forward(allocator_params)...); memcpy(c_end, ptr, items_byte_size); @@ -323,30 +344,34 @@ class PODArray : public PODArrayBase(this->c_end); } const T * t_end_of_storage() const { return reinterpret_cast(this->c_end_of_storage); } + size_t byte_size(size_t n) const { return PODArrayDetails::byte_size(n, sizeof(T)); } + public: using value_type = T; /// You can not just use `typedef`, because there is ambiguity for the constructors and `assign` functions. - struct iterator : public boost::iterator_adaptor + struct iterator // NOLINT(readability-identifier-naming) + : public boost::iterator_adaptor { - iterator() {} - iterator(T * ptr_) + iterator() = default; + iterator(T * ptr_) // NOLINT : iterator::iterator_adaptor_(ptr_) {} }; - struct const_iterator : public boost::iterator_adaptor + struct const_iterator // NOLINT(readability-identifier-naming) + : public boost::iterator_adaptor { - const_iterator() {} - const_iterator(const T * ptr_) + const_iterator() = default; + const_iterator(const T * ptr_) // NOLINT : const_iterator::iterator_adaptor_(ptr_) {} }; - PODArray() {} + PODArray() = default; - PODArray(size_t n) + PODArray(size_t n) // NOLINT { this->alloc_for_num_elements(n); this->c_end += this->byte_size(n); @@ -398,7 +423,7 @@ class PODArray : public PODArrayBase - void resize_fill_zero(size_t n, TAllocatorParams &&... allocator_params) + void resize_fill_zero(size_t n, TAllocatorParams &&... allocator_params) // NOLINT(readability-identifier-naming) { size_t old_size = this->size(); if (n > old_size) @@ -410,7 +435,10 @@ class PODArray : public PODArrayBase - void resize_fill(size_t n, const T & value, TAllocatorParams &&... allocator_params) + void resize_fill( // NOLINT(readability-identifier-naming) + size_t n, + const T & value, + TAllocatorParams &&... allocator_params) { size_t old_size = this->size(); if (n > old_size) @@ -422,7 +450,7 @@ class PODArray : public PODArrayBase - void push_back(U && x, TAllocatorParams &&... allocator_params) + void push_back(U && x, TAllocatorParams &&... allocator_params) // NOLINT(readability-identifier-naming) { // FIXME:: It's dangerous here !! if (unlikely(this->c_end >= this->c_end_of_storage)) @@ -436,7 +464,7 @@ class PODArray : public PODArrayBase - void emplace_back(Args &&... args) + void emplace_back(Args &&... args) // NOLINT(readability-identifier-naming) { if (unlikely(this->c_end >= this->c_end_of_storage)) this->reserveForNextSize(); @@ -445,7 +473,7 @@ class PODArray : public PODArrayBasec_end += this->byte_size(1); } - void pop_back() { this->c_end -= this->byte_size(1); } + void pop_back() { this->c_end -= this->byte_size(1); } // NOLINT(readability-identifier-naming) /// Do not insert into the array a piece of itself. Because with the resize, the iterators on themselves can be invalidated. template @@ -494,7 +522,7 @@ class PODArray : public PODArrayBase - void insert_assume_reserved(It1 from_begin, It2 from_end) + void insert_assume_reserved(It1 from_begin, It2 from_end) // NOLINT(readability-identifier-naming) { size_t bytes_to_copy = this->byte_size(from_end - from_begin); memcpy(this->c_end, reinterpret_cast(&*from_begin), bytes_to_copy); diff --git a/dbms/src/Core/Block.cpp b/dbms/src/Core/Block.cpp index dd919cc6abe..78adb8422e3 100644 --- a/dbms/src/Core/Block.cpp +++ b/dbms/src/Core/Block.cpp @@ -370,7 +370,10 @@ MutableColumns Block::cloneEmptyColumns() const size_t num_columns = data.size(); MutableColumns columns(num_columns); for (size_t i = 0; i < num_columns; ++i) + { columns[i] = data[i].column ? data[i].column->cloneEmpty() : data[i].type->createColumn(); + columns[i]->reserveWithStrategy(0, IColumn::ReserveStrategy::ScaleFactor1_5); + } return columns; } diff --git a/dbms/src/DataStreams/AsynchronousBlockInputStream.h b/dbms/src/DataStreams/AsynchronousBlockInputStream.h index 8b2b56b0a17..06bae9eaef5 100644 --- a/dbms/src/DataStreams/AsynchronousBlockInputStream.h +++ b/dbms/src/DataStreams/AsynchronousBlockInputStream.h @@ -14,7 +14,6 @@ #pragma once -#include #include #include #include diff --git a/dbms/src/DataStreams/ParallelInputsProcessor.h b/dbms/src/DataStreams/ParallelInputsProcessor.h index 2265e28f14f..7f8e21981ce 100644 --- a/dbms/src/DataStreams/ParallelInputsProcessor.h +++ b/dbms/src/DataStreams/ParallelInputsProcessor.h @@ -14,7 +14,6 @@ #pragma once -#include #include #include #include @@ -24,10 +23,6 @@ #include #include -#include -#include -#include -#include /** Allows to process multiple block input streams (sources) in parallel, using specified number of threads. diff --git a/dbms/src/Dictionaries/CacheDictionary.h b/dbms/src/Dictionaries/CacheDictionary.h index 9a45907ee5e..989df4f70bd 100644 --- a/dbms/src/Dictionaries/CacheDictionary.h +++ b/dbms/src/Dictionaries/CacheDictionary.h @@ -16,14 +16,12 @@ #include #include -#include #include #include #include #include #include -#include #include #include #include diff --git a/dbms/src/Interpreters/ProcessList.h b/dbms/src/Interpreters/ProcessList.h index 2bf5e02140c..ddc0a131a10 100644 --- a/dbms/src/Interpreters/ProcessList.h +++ b/dbms/src/Interpreters/ProcessList.h @@ -14,7 +14,6 @@ #pragma once -#include #include #include #include diff --git a/dbms/src/Interpreters/QueryPriorities.h b/dbms/src/Interpreters/QueryPriorities.h index ad201cd97bd..946cc5e34c1 100644 --- a/dbms/src/Interpreters/QueryPriorities.h +++ b/dbms/src/Interpreters/QueryPriorities.h @@ -14,7 +14,6 @@ #pragma once -#include #include #include diff --git a/dbms/src/Server/TCPHandler.h b/dbms/src/Server/TCPHandler.h index a83c03fed63..846163eb038 100644 --- a/dbms/src/Server/TCPHandler.h +++ b/dbms/src/Server/TCPHandler.h @@ -15,7 +15,6 @@ #pragma once #include -#include #include #include #include diff --git a/dbms/src/Storages/DeltaMerge/ColumnFile/ColumnFileInMemory.cpp b/dbms/src/Storages/DeltaMerge/ColumnFile/ColumnFileInMemory.cpp index 57f249d436c..7629fe48da9 100644 --- a/dbms/src/Storages/DeltaMerge/ColumnFile/ColumnFileInMemory.cpp +++ b/dbms/src/Storages/DeltaMerge/ColumnFile/ColumnFileInMemory.cpp @@ -91,18 +91,51 @@ ColumnFile::AppendResult ColumnFileInMemory::append( return AppendResult{false, 0}; size_t new_alloc_block_bytes = 0; + size_t max_capacity = 0; for (size_t i = 0; i < cache->block.columns(); ++i) { const auto & col = data.getByPosition(i).column; const auto & cache_col = *cache->block.getByPosition(i).column; auto * mutable_cache_col = const_cast(&cache_col); size_t alloc_bytes = mutable_cache_col->allocatedBytes(); + if (mutable_cache_col->capacity() < mutable_cache_col->size() + limit) + { + // If the column is not enough, we reserve more space by 1.5 factor + mutable_cache_col->reserveWithStrategy( + mutable_cache_col->size() + limit, + IColumn::ReserveStrategy::ScaleFactor1_5); + } mutable_cache_col->insertRangeFrom(*col, offset, limit); + max_capacity = std::max(max_capacity, mutable_cache_col->capacity()); new_alloc_block_bytes += mutable_cache_col->allocatedBytes() - alloc_bytes; } rows += limit; bytes += data_bytes; + // TODO: Remove this logging + LOG_INFO( + Logger::get(), + "Append rows to ColumnFileInMemory, new_rows={} new_bytes={} new_alloc_bytes={} max_capacity={} total_rows={} " + "total_bytes={}", + limit - offset, + data_bytes, + new_alloc_block_bytes, + max_capacity, + rows, + bytes); + for (size_t i = 0; i < cache->block.columns(); ++i) + { + const auto & cache_col = *cache->block.getByPosition(i).column; + LOG_INFO( + Logger::get(""), + "col i={} name={} type={} rows={} bytes={} alloc_bytes={}", + i, + cache->block.getByPosition(i).name, + cache_col.getName(), + cache_col.size(), + cache_col.byteSize(), + cache_col.allocatedBytes()); + } return AppendResult{true, new_alloc_block_bytes}; } diff --git a/dbms/src/Storages/DeltaMerge/ColumnFile/ColumnFileInMemory.h b/dbms/src/Storages/DeltaMerge/ColumnFile/ColumnFileInMemory.h index 257b3327434..e5f8fd8ad2f 100644 --- a/dbms/src/Storages/DeltaMerge/ColumnFile/ColumnFileInMemory.h +++ b/dbms/src/Storages/DeltaMerge/ColumnFile/ColumnFileInMemory.h @@ -98,6 +98,7 @@ class ColumnFileInMemory : public ColumnFile bool isAppendable() const override { return !disable_append; } void disableAppend() override; + AppendResult append( const DMContext & dm_context, const Block & data, diff --git a/dbms/src/Storages/DeltaMerge/ColumnFile/tests/gtest_dm_column_file.cpp b/dbms/src/Storages/DeltaMerge/ColumnFile/tests/gtest_dm_column_file.cpp index 9d63a7aef04..130d631fba8 100644 --- a/dbms/src/Storages/DeltaMerge/ColumnFile/tests/gtest_dm_column_file.cpp +++ b/dbms/src/Storages/DeltaMerge/ColumnFile/tests/gtest_dm_column_file.cpp @@ -38,6 +38,8 @@ #include #include +#include "Storages/DeltaMerge/ColumnFile/ColumnFileInMemory.h" + namespace DB::DM::tests { @@ -228,6 +230,35 @@ try } CATCH +TEST_P(ColumnFileTest, CreateColumnFileInMemory) +try +{ + size_t num_rows_write = 10; + Block block = DMTestEnv::prepareSimpleWriteBlock(0, num_rows_write, false); + for (Int64 i = 0; i < 1000; ++i) + { + auto null_col = ColumnNullable::create(ColumnString::create(), ColumnUInt8::create()); + for (size_t j = 0; j < num_rows_write; ++j) + { + null_col->insertDefault(); + } + block.insert(ColumnWithTypeAndName{ + std::move(null_col), + std::make_shared(std::make_shared()), + fmt::format("field_{}", i), + }); + } + // A 1000 columns empty block takes about 238KB + auto schema = ColumnFileSchema(block); + ColumnFileInMemoryPtr cf_in_mem = std::make_shared(std::make_shared(schema)); + LOG_INFO( + Logger::get(), + "ColumnFileInMemory, bytes={} alloc_bytes={}", + cf_in_mem->getBytes(), + cf_in_mem->getAllocateBytes()); +} +CATCH + TEST_P(ColumnFileTest, ReadColumns) try { diff --git a/dbms/src/Storages/DeltaMerge/Delta/ColumnFilePersistedSet.h b/dbms/src/Storages/DeltaMerge/Delta/ColumnFilePersistedSet.h index 1d8a8d9f4c1..8277c836e9d 100644 --- a/dbms/src/Storages/DeltaMerge/Delta/ColumnFilePersistedSet.h +++ b/dbms/src/Storages/DeltaMerge/Delta/ColumnFilePersistedSet.h @@ -14,7 +14,6 @@ #pragma once -#include #include #include #include diff --git a/dbms/src/Storages/DeltaMerge/File/DMFileMetaV2.cpp b/dbms/src/Storages/DeltaMerge/File/DMFileMetaV2.cpp index 700c6bb214a..2c92f319c41 100644 --- a/dbms/src/Storages/DeltaMerge/File/DMFileMetaV2.cpp +++ b/dbms/src/Storages/DeltaMerge/File/DMFileMetaV2.cpp @@ -284,7 +284,7 @@ void DMFileMetaV2::read(const FileProviderPtr & file_provider, const ReadMode & { break; } - LOG_WARNING(log, "{}'s size is larger than {}", metaPath(), buf.size()); + // LOG_WARNING(log, "{}'s size is larger than {}", metaPath(), buf.size()); buf.resize(buf.size() + meta_buffer_size); } buf.resize(read_bytes); diff --git a/dbms/src/Storages/DeltaMerge/VersionChain/tests/mvcc_test_utils.h b/dbms/src/Storages/DeltaMerge/VersionChain/tests/mvcc_test_utils.h index 9f0ad183691..58db88a54cc 100644 --- a/dbms/src/Storages/DeltaMerge/VersionChain/tests/mvcc_test_utils.h +++ b/dbms/src/Storages/DeltaMerge/VersionChain/tests/mvcc_test_utils.h @@ -14,7 +14,6 @@ #pragma once -#include #include #include #include diff --git a/dbms/src/Storages/DeltaMerge/tests/gtest_segment.cpp b/dbms/src/Storages/DeltaMerge/tests/gtest_segment.cpp index 2d2b4d38503..e6cb692d351 100644 --- a/dbms/src/Storages/DeltaMerge/tests/gtest_segment.cpp +++ b/dbms/src/Storages/DeltaMerge/tests/gtest_segment.cpp @@ -15,6 +15,7 @@ #include #include #include +#include #include #include #include @@ -607,6 +608,102 @@ try } CATCH +double get_process_resident_mb() +{ + auto mu = DB::get_process_mem_usage(); + return mu.resident_bytes / 1024.0 / 1024; +} + +TEST_F(SegmentOperationTest, TestMassiveSegment) +try +{ + const size_t level = 1; + const size_t segment_range_size = 500; + for (size_t lvl = 0; lvl < level; ++lvl) + { + size_t num_expected_segs = 200; + // size_t num_expected_segs = 10; + size_t progress_interval = 100; + const auto lvl_beg_seg_id = segments.rbegin()->first; + { + auto seg = segments[lvl_beg_seg_id]; + LOG_INFO(log, "lvl={} beg_seg_id={} rowkey={}", lvl, lvl_beg_seg_id, seg->getRowKeyRange().toString()); + } + auto next_split_seg_id = lvl_beg_seg_id; + for (size_t i = 0; i < num_expected_segs; ++i) + { + auto split_point = (lvl * num_expected_segs + 1 + i) * segment_range_size; + auto n_seg_id = splitSegmentAt(next_split_seg_id, split_point, Segment::SplitMode::Logical); + ASSERT_TRUE(n_seg_id.has_value()) << fmt::format("i={} sp={}", i, split_point); + next_split_seg_id = *n_seg_id; + if (i % progress_interval == 0) + { + LOG_INFO( + log, + "lvl={} round={} split_point={} next_seg_id={} mem_resident_set={:.3f}MB)", + lvl, + i, + split_point, + *n_seg_id, + get_process_resident_mb()); + } + } + LOG_INFO(log, "lvl={} round={} mem_resident_set={:.3f}MB", lvl, num_expected_segs, get_process_resident_mb()); + + size_t round = 0; + for (auto && [seg_id, seg] : segments) + { + // next_split_seg_id is the last segment created in this level, skip it + if (seg_id == next_split_seg_id) + continue; + + if (seg_id < lvl_beg_seg_id) + continue; // skip segments created in previous levels + + size_t write_rows = 4; + size_t write_rows_sub = 2; + if (round % progress_interval == 0) + { + LOG_INFO( + log, + "lvl={} round={} written_rows={} mem_resident_set={:.3f}MB", + lvl, + round, + write_rows * round, + get_process_resident_mb()); + } + for (size_t k = 0; k < 2; ++k) + { + writeToCache( + seg_id, + write_rows_sub, + /* start_at */ lvl * num_expected_segs * segment_range_size + round * segment_range_size, + false, + std::nullopt); + LOG_INFO( + log, + "lvl={} round={} k={} seg_id={} written_rows={} mem_tbl_bytes={} mem_tbl_alloc_bytes={}", + lvl, + round, + k, + seg_id, + write_rows, + segments[seg_id]->getDelta()->getTotalCacheBytes(), + segments[seg_id]->getDelta()->getTotalAllocatedBytes()); + } + round++; + } + { + LOG_INFO( + log, + "TestMassiveSegment done, segments.size()={} lvl={} mem_resident_set={:.3f}MB", + segments.size(), + lvl, + get_process_resident_mb()); + } + } +} +CATCH class SegmentEnableLogicalSplitTest : public SegmentOperationTest { diff --git a/dbms/src/Storages/DeltaMerge/tests/gtest_segment_test_basic.cpp b/dbms/src/Storages/DeltaMerge/tests/gtest_segment_test_basic.cpp index 76540fd0949..495914f0fe8 100644 --- a/dbms/src/Storages/DeltaMerge/tests/gtest_segment_test_basic.cpp +++ b/dbms/src/Storages/DeltaMerge/tests/gtest_segment_test_basic.cpp @@ -92,8 +92,18 @@ void SegmentTestBasic::reloadWithOptions(SegmentTestOptions config) TiFlashStorageTestBasic::SetUp(); options = config; table_columns = std::make_shared(); + ColumnDefinesPtr cols = DMTestEnv::getDefaultColumns(); + for (Int64 i = 0; i < 1000; ++i) + { + // add a new column + cols->emplace_back(ColumnDefine{ + i + 100, + fmt::format("field_{}", i), + DB::tests::typeFromString("Nullable(String)"), + }); + } - root_segment = reload(config.is_common_handle, nullptr, std::move(config.db_settings)); + root_segment = reload(config.is_common_handle, cols, std::move(config.db_settings)); ASSERT_EQ(root_segment->segmentId(), DELTA_MERGE_FIRST_SEGMENT_ID); segments.clear(); segments[DELTA_MERGE_FIRST_SEGMENT_ID] = root_segment; @@ -543,6 +553,19 @@ Block SegmentTestBasic::prepareWriteBlockInSegmentRange( RUNTIME_CHECK(write_end_key_this_round <= segment_end_key); Block block = prepareWriteBlock(*write_start_key, write_end_key_this_round, is_deleted, including_right_boundary, ts); + for (Int64 i = 0; i < 1000; ++i) + { + auto null_col = ColumnNullable::create(ColumnString::create(), ColumnUInt8::create()); + for (size_t j = 0; j < write_rows_this_round; ++j) + { + null_col->insertDefault(); + } + block.insert(ColumnWithTypeAndName{ + std::move(null_col), + std::make_shared(std::make_shared()), + fmt::format("field_{}", i), + }); + } blocks.emplace_back(block); remaining_rows -= write_rows_this_round + including_right_boundary; diff --git a/dbms/src/Storages/System/StorageSystemColumns.cpp b/dbms/src/Storages/System/StorageSystemColumns.cpp index 9436d994d5b..b61c3ac7c7b 100644 --- a/dbms/src/Storages/System/StorageSystemColumns.cpp +++ b/dbms/src/Storages/System/StorageSystemColumns.cpp @@ -15,7 +15,6 @@ #include #include #include -#include #include #include #include @@ -24,6 +23,7 @@ #include #include #include +#include namespace DB diff --git a/dbms/src/Storages/System/StorageSystemTables.cpp b/dbms/src/Storages/System/StorageSystemTables.cpp index 1fc7b7b8c61..47e8b8ad425 100644 --- a/dbms/src/Storages/System/StorageSystemTables.cpp +++ b/dbms/src/Storages/System/StorageSystemTables.cpp @@ -14,7 +14,6 @@ #include #include -#include #include #include #include @@ -29,6 +28,7 @@ #include #include #include +#include #include #include diff --git a/dbms/src/Columns/VirtualColumnUtils.cpp b/dbms/src/Storages/System/VirtualColumnUtils.cpp similarity index 98% rename from dbms/src/Columns/VirtualColumnUtils.cpp rename to dbms/src/Storages/System/VirtualColumnUtils.cpp index a01864d42f2..72261354889 100644 --- a/dbms/src/Columns/VirtualColumnUtils.cpp +++ b/dbms/src/Storages/System/VirtualColumnUtils.cpp @@ -13,7 +13,6 @@ // limitations under the License. #include -#include #include #include #include @@ -24,6 +23,7 @@ #include #include #include +#include namespace DB::VirtualColumnUtils diff --git a/dbms/src/Columns/VirtualColumnUtils.h b/dbms/src/Storages/System/VirtualColumnUtils.h similarity index 100% rename from dbms/src/Columns/VirtualColumnUtils.h rename to dbms/src/Storages/System/VirtualColumnUtils.h