From b19f538ada4bc1e68ea97de8268aa29783beefae Mon Sep 17 00:00:00 2001 From: JaySon-Huang Date: Tue, 1 Jul 2025 22:46:59 +0800 Subject: [PATCH 1/4] Update to tiflash system table Signed-off-by: JaySon-Huang --- .../ColumnFile/ColumnFileInMemory.cpp | 17 +++ .../src/Storages/DeltaMerge/DeltaMergeStore.h | 1 + .../Storages/DeltaMerge/File/DMFileMetaV2.cpp | 2 +- .../DeltaMerge/tests/gtest_segment.cpp | 135 ++++++++++++++++++ .../tests/gtest_segment_test_basic.cpp | 25 +++- 5 files changed, 178 insertions(+), 2 deletions(-) diff --git a/dbms/src/Storages/DeltaMerge/ColumnFile/ColumnFileInMemory.cpp b/dbms/src/Storages/DeltaMerge/ColumnFile/ColumnFileInMemory.cpp index 57f249d436c..afe14a63865 100644 --- a/dbms/src/Storages/DeltaMerge/ColumnFile/ColumnFileInMemory.cpp +++ b/dbms/src/Storages/DeltaMerge/ColumnFile/ColumnFileInMemory.cpp @@ -97,12 +97,29 @@ ColumnFile::AppendResult ColumnFileInMemory::append( const auto & cache_col = *cache->block.getByPosition(i).column; auto * mutable_cache_col = const_cast(&cache_col); size_t alloc_bytes = mutable_cache_col->allocatedBytes(); + if (mutable_cache_col->capacity() < mutable_cache_col->size() + limit) + { + // If the column is not enough, we reserve more space by 1.5 factor + mutable_cache_col->reserveWithStrategy( + mutable_cache_col->size() + limit, + IColumn::ReserveStrategy::ScaleFactor1_5); + } mutable_cache_col->insertRangeFrom(*col, offset, limit); new_alloc_block_bytes += mutable_cache_col->allocatedBytes() - alloc_bytes; } rows += limit; bytes += data_bytes; + // TODO: Remove this logging + LOG_INFO( + Logger::get(), + "Append rows to ColumnFileInMemory, new_rows={} new_bytes={} new_alloc_bytes={} max_capacity={} total_rows={} " + "total_bytes={}", + limit - offset, + data_bytes, + new_alloc_block_bytes, + rows, + bytes); return AppendResult{true, new_alloc_block_bytes}; } diff --git a/dbms/src/Storages/DeltaMerge/DeltaMergeStore.h b/dbms/src/Storages/DeltaMerge/DeltaMergeStore.h index 3b9e3c2f0d1..5df509ba44f 100644 --- a/dbms/src/Storages/DeltaMerge/DeltaMergeStore.h +++ b/dbms/src/Storages/DeltaMerge/DeltaMergeStore.h @@ -29,6 +29,7 @@ #include #include #include +#include #include #include #include diff --git a/dbms/src/Storages/DeltaMerge/File/DMFileMetaV2.cpp b/dbms/src/Storages/DeltaMerge/File/DMFileMetaV2.cpp index 700c6bb214a..2c92f319c41 100644 --- a/dbms/src/Storages/DeltaMerge/File/DMFileMetaV2.cpp +++ b/dbms/src/Storages/DeltaMerge/File/DMFileMetaV2.cpp @@ -284,7 +284,7 @@ void DMFileMetaV2::read(const FileProviderPtr & file_provider, const ReadMode & { break; } - LOG_WARNING(log, "{}'s size is larger than {}", metaPath(), buf.size()); + // LOG_WARNING(log, "{}'s size is larger than {}", metaPath(), buf.size()); buf.resize(buf.size() + meta_buffer_size); } buf.resize(read_bytes); diff --git a/dbms/src/Storages/DeltaMerge/tests/gtest_segment.cpp b/dbms/src/Storages/DeltaMerge/tests/gtest_segment.cpp index 2d2b4d38503..11409cf66fe 100644 --- a/dbms/src/Storages/DeltaMerge/tests/gtest_segment.cpp +++ b/dbms/src/Storages/DeltaMerge/tests/gtest_segment.cpp @@ -608,6 +608,141 @@ try CATCH +namespace +{ +struct ProcessMemoryUsage +{ + double resident_mb; + Int64 cur_proc_num_threads; + double cur_virt_mb; +}; + +bool process_mem_usage(double & resident_set, Int64 & cur_proc_num_threads, UInt64 & cur_virt_size) +{ + resident_set = 0.0; + + // 'file' stat seems to give the most reliable results + std::ifstream stat_stream("/proc/self/stat", std::ios_base::in); + // if "/proc/self/stat" is not supported + if (!stat_stream.is_open()) + return false; + + // dummy vars for leading entries in stat that we don't care about + std::string pid, comm, state, ppid, pgrp, session, tty_nr; + std::string tpgid, flags, minflt, cminflt, majflt, cmajflt; + std::string utime, stime, cutime, cstime, priority, nice; + std::string itrealvalue, starttime; + + // the field we want + Int64 rss; + + stat_stream >> pid >> comm >> state >> ppid >> pgrp >> session >> tty_nr >> tpgid >> flags >> minflt >> cminflt + >> majflt >> cmajflt >> utime >> stime >> cutime >> cstime >> priority >> nice >> cur_proc_num_threads + >> itrealvalue >> starttime >> cur_virt_size >> rss; // don't care about the rest + + stat_stream.close(); + + Int64 page_size_kb = sysconf(_SC_PAGE_SIZE) / 1024; // in case x86-64 is configured to use 2MB pages + resident_set = rss * page_size_kb; + return true; +} +ProcessMemoryUsage get_process_mem_usage() +{ + double resident_set; + Int64 cur_proc_num_threads = 1; + UInt64 cur_virt_size = 0; + process_mem_usage(resident_set, cur_proc_num_threads, cur_virt_size); + resident_set *= 1024; // unit: byte + return ProcessMemoryUsage{ + resident_set / 1024.0 / 1024, + cur_proc_num_threads, + cur_virt_size / 1024.0 / 1024, + }; +} +} // namespace + +TEST_F(SegmentOperationTest, TestMassiveSegment) +try +{ + const size_t level = 5; + for (size_t lvl = 0; lvl < level; ++lvl) + { + size_t num_expected_segs = 1000; + // size_t num_expected_segs = 10; + size_t progress_interval = 100; + const auto lvl_beg_seg_id = segments.rbegin()->first; + { + auto seg = segments[lvl_beg_seg_id]; + LOG_INFO(log, "lvl={} beg_seg_id={} rowkey={}", lvl, lvl_beg_seg_id, seg->getRowKeyRange().toString()); + } + auto next_split_seg_id = lvl_beg_seg_id; + for (size_t i = 0; i < num_expected_segs; ++i) + { + auto split_point = (lvl * num_expected_segs + 1 + i) * 500; + auto n_seg_id = splitSegmentAt(next_split_seg_id, split_point, Segment::SplitMode::Logical); + ASSERT_TRUE(n_seg_id.has_value()) << fmt::format("i={} sp={}", i, split_point); + next_split_seg_id = *n_seg_id; + if (i % progress_interval == 0) + { + auto mu = get_process_mem_usage(); + LOG_INFO( + log, + "lvl={} round={} split_point={} next_seg_id={} mem_resident_set={:.3f}MB)", + lvl, + i, + split_point, + *n_seg_id, + mu.resident_mb); + } + } + { + auto mu = get_process_mem_usage(); + LOG_INFO(log, "lvl={} round={} mem_resident_set={:.3f}MB", lvl, num_expected_segs, mu.resident_mb); + } + + size_t round = 0; + for (auto && [seg_id, seg] : segments) + { + // next_split_seg_id is the last segment created in this level, skip it + if (seg_id == next_split_seg_id) + continue; + + if (seg_id < lvl_beg_seg_id) + continue; // skip segments created in previous levels + + auto write_rows = 500; + if (round % progress_interval == 0) + { + auto mu = get_process_mem_usage(); + LOG_INFO( + log, + "lvl={} round={} written_rows={} mem_resident_set={:.3f}MB", + lvl, + round, + write_rows * round, + mu.resident_mb); + } + writeToCache( + seg_id, + write_rows, + /* start_at */ lvl * num_expected_segs * write_rows + round * write_rows, + false, + std::nullopt); + round++; + } + { + auto mu = get_process_mem_usage(); + LOG_INFO( + log, + "TestMassiveSegment done, segments.size()={} lvl={} mem_resident_set={:.3f}MB", + lvl, + segments.size(), + mu.resident_mb); + } + } +} +CATCH + class SegmentEnableLogicalSplitTest : public SegmentOperationTest { protected: diff --git a/dbms/src/Storages/DeltaMerge/tests/gtest_segment_test_basic.cpp b/dbms/src/Storages/DeltaMerge/tests/gtest_segment_test_basic.cpp index 76540fd0949..495914f0fe8 100644 --- a/dbms/src/Storages/DeltaMerge/tests/gtest_segment_test_basic.cpp +++ b/dbms/src/Storages/DeltaMerge/tests/gtest_segment_test_basic.cpp @@ -92,8 +92,18 @@ void SegmentTestBasic::reloadWithOptions(SegmentTestOptions config) TiFlashStorageTestBasic::SetUp(); options = config; table_columns = std::make_shared(); + ColumnDefinesPtr cols = DMTestEnv::getDefaultColumns(); + for (Int64 i = 0; i < 1000; ++i) + { + // add a new column + cols->emplace_back(ColumnDefine{ + i + 100, + fmt::format("field_{}", i), + DB::tests::typeFromString("Nullable(String)"), + }); + } - root_segment = reload(config.is_common_handle, nullptr, std::move(config.db_settings)); + root_segment = reload(config.is_common_handle, cols, std::move(config.db_settings)); ASSERT_EQ(root_segment->segmentId(), DELTA_MERGE_FIRST_SEGMENT_ID); segments.clear(); segments[DELTA_MERGE_FIRST_SEGMENT_ID] = root_segment; @@ -543,6 +553,19 @@ Block SegmentTestBasic::prepareWriteBlockInSegmentRange( RUNTIME_CHECK(write_end_key_this_round <= segment_end_key); Block block = prepareWriteBlock(*write_start_key, write_end_key_this_round, is_deleted, including_right_boundary, ts); + for (Int64 i = 0; i < 1000; ++i) + { + auto null_col = ColumnNullable::create(ColumnString::create(), ColumnUInt8::create()); + for (size_t j = 0; j < write_rows_this_round; ++j) + { + null_col->insertDefault(); + } + block.insert(ColumnWithTypeAndName{ + std::move(null_col), + std::make_shared(std::make_shared()), + fmt::format("field_{}", i), + }); + } blocks.emplace_back(block); remaining_rows -= write_rows_this_round + including_right_boundary; From a26c2df3dabcf4c68931d86fd9496281ff5eb858 Mon Sep 17 00:00:00 2001 From: JaySon-Huang Date: Thu, 3 Jul 2025 12:56:37 +0800 Subject: [PATCH 2/4] Add IColumn::capacity Signed-off-by: JaySon-Huang --- dbms/src/Columns/ColumnAggregateFunction.h | 4 +- dbms/src/Columns/ColumnArray.cpp | 14 ++- dbms/src/Columns/ColumnArray.h | 3 +- dbms/src/Columns/ColumnConst.h | 2 + dbms/src/Columns/ColumnDecimal.cpp | 14 +++ dbms/src/Columns/ColumnDecimal.h | 5 +- dbms/src/Columns/ColumnFixedString.cpp | 13 +++ dbms/src/Columns/ColumnFixedString.h | 4 +- dbms/src/Columns/ColumnFunction.h | 3 + dbms/src/Columns/ColumnNothing.h | 5 +- dbms/src/Columns/ColumnNullable.cpp | 19 ++- dbms/src/Columns/ColumnNullable.h | 3 +- dbms/src/Columns/ColumnSet.h | 3 + dbms/src/Columns/ColumnString.cpp | 19 ++- dbms/src/Columns/ColumnString.h | 4 +- dbms/src/Columns/ColumnTuple.cpp | 11 +- dbms/src/Columns/ColumnTuple.h | 3 +- dbms/src/Columns/ColumnVector.h | 14 ++- dbms/src/Columns/IColumn.h | 21 +++- dbms/src/Columns/IColumnDummy.h | 1 + dbms/src/Common/CurrentMetrics.cpp | 5 +- dbms/src/Common/CurrentMetrics.h | 1 - dbms/src/Common/PODArray.cpp | 30 +++++ dbms/src/Common/PODArray.h | 108 +++++++++++------- .../AsynchronousBlockInputStream.h | 1 - .../src/DataStreams/ParallelInputsProcessor.h | 5 - dbms/src/Dictionaries/CacheDictionary.h | 2 - dbms/src/Interpreters/ProcessList.h | 1 - dbms/src/Interpreters/QueryPriorities.h | 1 - dbms/src/Server/TCPHandler.h | 1 - .../ColumnFile/ColumnFileInMemory.cpp | 3 + .../ColumnFile/ColumnFileInMemory.h | 1 + .../DeltaMerge/Delta/ColumnFilePersistedSet.h | 1 - .../src/Storages/DeltaMerge/DeltaMergeStore.h | 1 - .../VersionChain/tests/mvcc_test_utils.h | 1 - .../DeltaMerge/tests/gtest_segment.cpp | 12 +- .../Storages/System/StorageSystemColumns.cpp | 2 +- .../Storages/System/StorageSystemTables.cpp | 2 +- .../System}/VirtualColumnUtils.cpp | 2 +- .../System}/VirtualColumnUtils.h | 0 40 files changed, 260 insertions(+), 85 deletions(-) rename dbms/src/{Columns => Storages/System}/VirtualColumnUtils.cpp (98%) rename dbms/src/{Columns => Storages/System}/VirtualColumnUtils.h (100%) diff --git a/dbms/src/Columns/ColumnAggregateFunction.h b/dbms/src/Columns/ColumnAggregateFunction.h index 873f773df96..4b8c347ad32 100644 --- a/dbms/src/Columns/ColumnAggregateFunction.h +++ b/dbms/src/Columns/ColumnAggregateFunction.h @@ -98,7 +98,7 @@ class ColumnAggregateFunction final : public COWPtrHelper ColumnPtr permute(const Permutation & perm, size_t limit) const override; int compareAt(size_t n, size_t m, const IColumn & rhs_, int nan_direction_hint) const override; void getPermutation(bool reverse, size_t limit, int nan_direction_hint, Permutation & res) const override; - void reserve(size_t n) override; + size_t capacity() const override { return data->capacity(); } + void reserveWithStrategy(size_t n, ReserveStrategy strategy) override; void reserveAlign(size_t n, size_t alignment) override; size_t byteSize() const override; size_t byteSize(size_t offset, size_t limit) const override; diff --git a/dbms/src/Columns/ColumnConst.h b/dbms/src/Columns/ColumnConst.h index 7548e07b2a8..53f31dd8007 100644 --- a/dbms/src/Columns/ColumnConst.h +++ b/dbms/src/Columns/ColumnConst.h @@ -261,6 +261,8 @@ class ColumnConst final : public COWPtrHelper ColumnPtr permute(const Permutation & perm, size_t limit) const override; void getPermutation(bool reverse, size_t limit, int nan_direction_hint, Permutation & res) const override; + size_t capacity() const override { return data->capacity(); } + size_t byteSize() const override { return data->byteSize() + sizeof(s); } size_t byteSize(size_t /*offset*/, size_t /*limit*/) const override { return byteSize(); } diff --git a/dbms/src/Columns/ColumnDecimal.cpp b/dbms/src/Columns/ColumnDecimal.cpp index 872b874d19a..0243721499a 100644 --- a/dbms/src/Columns/ColumnDecimal.cpp +++ b/dbms/src/Columns/ColumnDecimal.cpp @@ -764,6 +764,20 @@ MutableColumnPtr ColumnDecimal::cloneResized(size_t size) const return res; } +template +void ColumnDecimal::reserveWithStrategy(size_t n, IColumn::ReserveStrategy strategy) +{ + switch (strategy) + { + case IColumn::ReserveStrategy::Default: + data.reserve(n); + break; + case IColumn::ReserveStrategy::ScaleFactor1_5: + data.reserve_exact(n / 2 * 3); + break; + } +} + template void ColumnDecimal::insertData(const char * src [[maybe_unused]], size_t /*length*/) { diff --git a/dbms/src/Columns/ColumnDecimal.h b/dbms/src/Columns/ColumnDecimal.h index 43f1f869a3a..576348743aa 100644 --- a/dbms/src/Columns/ColumnDecimal.h +++ b/dbms/src/Columns/ColumnDecimal.h @@ -113,8 +113,9 @@ class ColumnDecimal final : public COWPtrHelper void getPermutation(bool reverse, size_t limit, int nan_direction_hint, Permutation & res) const override; + size_t capacity() const override { return chars.capacity() / n; } + void insertRangeFrom(const IColumn & src, size_t start, size_t length) override; ColumnPtr filter(const IColumn::Filter & filt, ssize_t result_size_hint) const override; @@ -223,7 +225,7 @@ class ColumnFixedString final : public COWPtrHelper void gather(ColumnGathererStream & gatherer_stream) override; - void reserve(size_t size) override { chars.reserve(n * size); } + void reserveWithStrategy(size_t size, IColumn::ReserveStrategy strategy) override; void reserveAlign(size_t size, size_t alignment) override { chars.reserve(n * size, alignment); } void getExtremes(Field & min, Field & max) const override; diff --git a/dbms/src/Columns/ColumnFunction.h b/dbms/src/Columns/ColumnFunction.h index 229e0ce9ce6..0cb51dbf40d 100644 --- a/dbms/src/Columns/ColumnFunction.h +++ b/dbms/src/Columns/ColumnFunction.h @@ -272,6 +272,9 @@ class ColumnFunction final : public COWPtrHelper throw Exception("getPermutation is not implemented for " + getName(), ErrorCodes::NOT_IMPLEMENTED); } + // A fake capacity for ColumnFunction, as it does not hold data in the same way as other columns. + size_t capacity() const override { return 0; } + void gather(ColumnGathererStream &) override { throw Exception("Method gather is not supported for " + getName(), ErrorCodes::NOT_IMPLEMENTED); diff --git a/dbms/src/Columns/ColumnNothing.h b/dbms/src/Columns/ColumnNothing.h index 697e020b0ff..d5999ee4468 100644 --- a/dbms/src/Columns/ColumnNothing.h +++ b/dbms/src/Columns/ColumnNothing.h @@ -29,9 +29,12 @@ class ColumnNothing final : public COWPtrHelper public: const char * getFamilyName() const override { return "Nothing"; } - MutableColumnPtr cloneDummy(size_t s) const override { return ColumnNothing::create(s); }; + MutableColumnPtr cloneDummy(size_t s) const override { return ColumnNothing::create(s); } bool canBeInsideNullable() const override { return true; } + + // ColumnNothing does not hold any data, so capacity is 0. + size_t capacity() const override { return 1; } }; } // namespace DB diff --git a/dbms/src/Columns/ColumnNullable.cpp b/dbms/src/Columns/ColumnNullable.cpp index 8a5ceaa1b0c..f1769e8210f 100644 --- a/dbms/src/Columns/ColumnNullable.cpp +++ b/dbms/src/Columns/ColumnNullable.cpp @@ -632,10 +632,23 @@ void ColumnNullable::gather(ColumnGathererStream & gatherer) gatherer.gather(*this); } -void ColumnNullable::reserve(size_t n) +size_t ColumnNullable::capacity() const { - getNestedColumn().reserve(n); - getNullMapData().reserve(n); + return getNestedColumn().capacity(); +} + +void ColumnNullable::reserveWithStrategy(size_t n, IColumn::ReserveStrategy strategy) +{ + getNestedColumn().reserveWithStrategy(n, strategy); + switch (strategy) + { + case ReserveStrategy::Default: + getNullMapData().reserve(n); + break; + case ReserveStrategy::ScaleFactor1_5: + getNullMapData().reserve_exact(n / 2 * 3); + break; + } } void ColumnNullable::reserveAlign(size_t n, size_t alignment) diff --git a/dbms/src/Columns/ColumnNullable.h b/dbms/src/Columns/ColumnNullable.h index ed1fd3f8439..6ae969bd76b 100644 --- a/dbms/src/Columns/ColumnNullable.h +++ b/dbms/src/Columns/ColumnNullable.h @@ -181,7 +181,8 @@ class ColumnNullable final : public COWPtrHelper Permutation & res) const override; void adjustPermutationWithNullDirection(bool reverse, size_t limit, int null_direction_hint, Permutation & res) const; - void reserve(size_t n) override; + size_t capacity() const override; + void reserveWithStrategy(size_t n, IColumn::ReserveStrategy strategy) override; void reserveAlign(size_t n, size_t alignment) override; void reserveWithTotalMemoryHint(size_t n, Int64 total_memory_hint) override; void reserveAlignWithTotalMemoryHint(size_t n, Int64 total_memory_hint, size_t alignment) override; diff --git a/dbms/src/Columns/ColumnSet.h b/dbms/src/Columns/ColumnSet.h index 94c0bbdc71b..f6b7b04c5b7 100644 --- a/dbms/src/Columns/ColumnSet.h +++ b/dbms/src/Columns/ColumnSet.h @@ -45,6 +45,9 @@ class ColumnSet final : public COWPtrHelper ConstSetPtr getData() const { return data; } + // A fake capacity for ColumnSet, as it does not hold data in the same way as other columns. + size_t capacity() const override { return 1; } + private: ConstSetPtr data; }; diff --git a/dbms/src/Columns/ColumnString.cpp b/dbms/src/Columns/ColumnString.cpp index 65a9e888aad..76e62725623 100644 --- a/dbms/src/Columns/ColumnString.cpp +++ b/dbms/src/Columns/ColumnString.cpp @@ -288,11 +288,24 @@ void ColumnString::gather(ColumnGathererStream & gatherer) gatherer.gather(*this); } +size_t ColumnString::capacity() const +{ + return chars.capacity() / APPROX_STRING_SIZE; // Approximate capacity based on average string size. +} -void ColumnString::reserve(size_t n) +void ColumnString::reserveWithStrategy(size_t n, IColumn::ReserveStrategy strategy) { - offsets.reserve(n); - chars.reserve(n * APPROX_STRING_SIZE); + switch (strategy) + { + case IColumn::ReserveStrategy::Default: + offsets.reserve(n); + chars.reserve(n * APPROX_STRING_SIZE); + break; + case IColumn::ReserveStrategy::ScaleFactor1_5: + offsets.reserve_exact(n / 2 * 3); + chars.reserve_exact(n * APPROX_STRING_SIZE / 2 * 3); + break; + } } void ColumnString::reserveAlign(size_t n, size_t alignment) diff --git a/dbms/src/Columns/ColumnString.h b/dbms/src/Columns/ColumnString.h index 828abd7f169..2938b47f15c 100644 --- a/dbms/src/Columns/ColumnString.h +++ b/dbms/src/Columns/ColumnString.h @@ -454,7 +454,9 @@ class ColumnString final : public COWPtrHelper void gather(ColumnGathererStream & gatherer_stream) override; - void reserve(size_t n) override; + size_t capacity() const override; + + void reserveWithStrategy(size_t n, IColumn::ReserveStrategy strategy) override; void reserveAlign(size_t n, size_t alignment) override; void reserveWithTotalMemoryHint(size_t n, Int64 total_memory_hint) override; diff --git a/dbms/src/Columns/ColumnTuple.cpp b/dbms/src/Columns/ColumnTuple.cpp index bf4372f3375..5b284fc4acb 100644 --- a/dbms/src/Columns/ColumnTuple.cpp +++ b/dbms/src/Columns/ColumnTuple.cpp @@ -421,11 +421,18 @@ void ColumnTuple::gather(ColumnGathererStream & gatherer) gatherer.gather(*this); } -void ColumnTuple::reserve(size_t n) +size_t ColumnTuple::capacity() const +{ + return columns.empty() ? 0 : columns[0]->capacity(); +} + +void ColumnTuple::reserveWithStrategy(size_t n, IColumn::ReserveStrategy strategy) { const size_t tuple_size = columns.size(); for (size_t i = 0; i < tuple_size; ++i) - getColumn(i).reserve(n); + { + getColumn(i).reserveWithStrategy(n, strategy); + } } void ColumnTuple::reserveAlign(size_t n, size_t alignment) diff --git a/dbms/src/Columns/ColumnTuple.h b/dbms/src/Columns/ColumnTuple.h index 6ae50cbd283..95861b65791 100644 --- a/dbms/src/Columns/ColumnTuple.h +++ b/dbms/src/Columns/ColumnTuple.h @@ -256,7 +256,8 @@ class ColumnTuple final : public COWPtrHelper int compareAt(size_t n, size_t m, const IColumn & rhs, int nan_direction_hint) const override; void getExtremes(Field & min, Field & max) const override; void getPermutation(bool reverse, size_t limit, int nan_direction_hint, Permutation & res) const override; - void reserve(size_t n) override; + size_t capacity() const override; + void reserveWithStrategy(size_t n, IColumn::ReserveStrategy strategy) override; void reserveAlign(size_t n, size_t alignment) override; size_t byteSize() const override; size_t byteSize(size_t offset, size_t limit) const override; diff --git a/dbms/src/Columns/ColumnVector.h b/dbms/src/Columns/ColumnVector.h index 067b696cdfe..194cccd0313 100644 --- a/dbms/src/Columns/ColumnVector.h +++ b/dbms/src/Columns/ColumnVector.h @@ -421,7 +421,19 @@ class ColumnVector final : public COWPtrHelper virtual void gather(ColumnGathererStream & gatherer_stream) = 0; /** Computes minimum and maximum element of the column. - * In addition to numeric types, the funtion is completely implemented for Date and DateTime. - * For strings and arrays function should retrurn default value. + * In addition to numeric types, the function is completely implemented for Date and DateTime. + * For strings and arrays function should return default value. * (except for constant columns; they should return value of the constant). * If column is empty function should return default value. */ virtual void getExtremes(Field & min, Field & max) const = 0; + /// Returns the number of elements that this column can hold without reallocating. + virtual size_t capacity() const = 0; + + enum class ReserveStrategy + { + // Use the default behavior of PODArray, which doubles the capacity each time. + Default, + // Use the strategy that reserves memory for 1.5 times the requested size. + ScaleFactor1_5, + }; + /// Reserves memory for specified amount of elements. If reservation isn't possible, does nothing. /// It affects performance only (not correctness). - virtual void reserve(size_t /*n*/) {} + void reserve(size_t n) { reserveWithStrategy(n, ReserveStrategy::Default); } + + /// Reserves memory for specified amount of elements with strategy. + /// If reservation isn't possible, does nothing. It affects performance only (not correctness). + virtual void reserveWithStrategy(size_t /*n*/, ReserveStrategy /*strategy*/) {} /// Reserves aligned memory for specified amount of elements. If reservation isn't possible, does nothing. /// It affects performance only (not correctness). diff --git a/dbms/src/Columns/IColumnDummy.h b/dbms/src/Columns/IColumnDummy.h index 86b3e6c00bd..cdc5a5a5554 100644 --- a/dbms/src/Columns/IColumnDummy.h +++ b/dbms/src/Columns/IColumnDummy.h @@ -45,6 +45,7 @@ class IColumnDummy : public IColumn virtual MutableColumnPtr cloneDummy(size_t s_) const = 0; MutableColumnPtr cloneResized(size_t s_) const override { return cloneDummy(s_); } + size_t capacity() const override { return s; } size_t size() const override { return s; } void insertDefault() override { ++s; } diff --git a/dbms/src/Common/CurrentMetrics.cpp b/dbms/src/Common/CurrentMetrics.cpp index c2cbbba2d80..839886298fc 100644 --- a/dbms/src/Common/CurrentMetrics.cpp +++ b/dbms/src/Common/CurrentMetrics.cpp @@ -95,7 +95,10 @@ M(ConnectionPoolSize) \ M(MemoryTrackingQueryStorageTask) \ M(MemoryTrackingFetchPages) \ - M(MemoryTrackingSharedColumnData) + M(MemoryTrackingSharedColumnData) \ + M(NumSegments) \ + M(NumDeltaCache) \ + M(BytesDeltaCache) namespace CurrentMetrics { diff --git a/dbms/src/Common/CurrentMetrics.h b/dbms/src/Common/CurrentMetrics.h index ce45cc7e52c..cc48affb703 100644 --- a/dbms/src/Common/CurrentMetrics.h +++ b/dbms/src/Common/CurrentMetrics.h @@ -18,7 +18,6 @@ #include #include -#include #include /** Allows to count number of simultaneously happening processes or current value of some metric. diff --git a/dbms/src/Common/PODArray.cpp b/dbms/src/Common/PODArray.cpp index 66647a55303..295202778b9 100644 --- a/dbms/src/Common/PODArray.cpp +++ b/dbms/src/Common/PODArray.cpp @@ -16,7 +16,37 @@ namespace DB { +namespace ErrorCodes +{ +extern const int CANNOT_ALLOCATE_MEMORY; +} + /// Used for left padding of PODArray when empty const char EmptyPODArray[EmptyPODArraySize]{}; +namespace PODArrayDetails +{ + +ALWAYS_INLINE size_t byte_size(size_t num_elements, size_t element_size) +{ + size_t amount; + if (__builtin_mul_overflow(num_elements, element_size, &amount)) + throw Exception( + ErrorCodes::CANNOT_ALLOCATE_MEMORY, + "Amount of memory requested to allocate is more than allowed"); + return amount; +} + +ALWAYS_INLINE size_t +minimum_memory_for_elements(size_t num_elements, size_t element_size, size_t pad_left, size_t pad_right) +{ + size_t amount; + if (__builtin_add_overflow(byte_size(num_elements, element_size), pad_left + pad_right, &amount)) + throw Exception( + ErrorCodes::CANNOT_ALLOCATE_MEMORY, + "Amount of memory requested to allocate is more than allowed"); + return amount; +} + +} // namespace PODArrayDetails } // namespace DB diff --git a/dbms/src/Common/PODArray.h b/dbms/src/Common/PODArray.h index 4a71d6ec5fc..7ebe6708028 100644 --- a/dbms/src/Common/PODArray.h +++ b/dbms/src/Common/PODArray.h @@ -28,7 +28,6 @@ #include #include #include -#include #include #ifndef NDEBUG @@ -48,6 +47,20 @@ inline constexpr size_t integerRoundUp(size_t value, size_t dividend) return ((value + dividend - 1) / dividend) * dividend; } +namespace PODArrayDetails +{ +/// The amount of memory occupied by the num_elements of the elements. +size_t byte_size(size_t num_elements, size_t element_size); /// NOLINT + +/// Minimum amount of memory to allocate for num_elements, including padding. +size_t minimum_memory_for_elements( + size_t num_elements, + size_t element_size, + size_t pad_left, + size_t pad_right); /// NOLINT + +}; // namespace PODArrayDetails + /** A dynamic array for POD types. * Designed for a small number of large arrays (rather than a lot of small ones). * To be more precise - for use in ColumnVector. @@ -109,22 +122,12 @@ class PODArrayBase bool is_shared_memory; - [[nodiscard]] __attribute__((always_inline)) std::optional swicthMemoryTracker() + [[nodiscard]] __attribute__((always_inline)) std::optional switchMemoryTracker() { return is_shared_memory ? std::make_optional(true, shared_column_data_mem_tracker.get()) : std::nullopt; } - /// The amount of memory occupied by the num_elements of the elements. - static size_t byte_size(size_t num_elements) { return num_elements * ELEMENT_SIZE; } - - /// Minimum amount of memory to allocate for num_elements, including padding. - static size_t minimum_memory_for_elements(size_t num_elements) - { - auto res = byte_size(num_elements) + pad_right + pad_left; - return res; - } - void alloc_for_num_elements(size_t num_elements) { //alloc_for_num_elements is only used when initialized PODArray based on size or two iterators. @@ -133,14 +136,14 @@ class PODArrayBase //If the users want to do PODArray initialize first, and also will push_back other elements later, //in push_back or emplace_back will do the reserveForNextSize to alloc extra memory. //Thus, we don't need do roundUpToPowerOfTwoOrZero here, and it can cut down extra memory usage, - //and will not have bad affact on performance. - alloc(minimum_memory_for_elements(num_elements)); + //and will not have bad affect on performance. + alloc(PODArrayDetails::minimum_memory_for_elements(num_elements, ELEMENT_SIZE, pad_left, pad_right)); } template void alloc(size_t bytes, TAllocatorParams &&... allocator_params) { - auto guard = swicthMemoryTracker(); + auto guard = switchMemoryTracker(); c_start = c_end = reinterpret_cast(TAllocator::alloc(bytes, std::forward(allocator_params)...)) + pad_left; @@ -157,7 +160,7 @@ class PODArrayBase unprotect(); - auto guard = swicthMemoryTracker(); + auto guard = switchMemoryTracker(); TAllocator::free(c_start - pad_left, allocated_bytes()); } @@ -172,7 +175,7 @@ class PODArrayBase unprotect(); - auto guard = swicthMemoryTracker(); + auto guard = switchMemoryTracker(); ptrdiff_t end_diff = c_end - c_start; c_start = reinterpret_cast(TAllocator::realloc( @@ -197,12 +200,14 @@ class PODArrayBase template void reserveForNextSize(TAllocatorParams &&... allocator_params) { - if (size() == 0) + if (empty()) { // The allocated memory should be multiplication of ELEMENT_SIZE to hold the element, otherwise, // memory issue such as corruption could appear in edge case. realloc( - std::max(((INITIAL_SIZE - 1) / ELEMENT_SIZE + 1) * ELEMENT_SIZE, minimum_memory_for_elements(1)), + std::max( + ((INITIAL_SIZE - 1) / ELEMENT_SIZE + 1) * ELEMENT_SIZE, + PODArrayDetails::minimum_memory_for_elements(1, ELEMENT_SIZE, pad_left, pad_right)), std::forward(allocator_params)...); } else @@ -240,7 +245,7 @@ class PODArrayBase size_t capacity() const { return (c_end_of_storage - c_start) / ELEMENT_SIZE; } /// This method is safe to use only for information about memory usage. - size_t allocated_bytes() const { return c_end_of_storage - c_start + pad_right + pad_left; } + size_t allocated_bytes() const { return c_end_of_storage - c_start + pad_right + pad_left; } // NOLINT void clear() { c_end = c_start; } @@ -249,7 +254,17 @@ class PODArrayBase { if (n > capacity()) realloc( - roundUpToPowerOfTwoOrZero(minimum_memory_for_elements(n)), + roundUpToPowerOfTwoOrZero( + PODArrayDetails::minimum_memory_for_elements(n, ELEMENT_SIZE, pad_left, pad_right)), + std::forward(allocator_params)...); + } + + template + void reserve_exact(size_t n, TAllocatorParams &&... allocator_params) /// NOLINT + { + if (n > capacity()) + realloc( + PODArrayDetails::minimum_memory_for_elements(n, ELEMENT_SIZE, pad_left, pad_right), std::forward(allocator_params)...); } @@ -260,20 +275,26 @@ class PODArrayBase resize_assume_reserved(n); } - void resize_assume_reserved(const size_t n) { c_end = c_start + byte_size(n); } + void resize_assume_reserved(const size_t n) // NOLINT + { + c_end = c_start + PODArrayDetails::byte_size(n, ELEMENT_SIZE); + } - const char * raw_data() const { return c_start; } + const char * raw_data() const { return c_start; } // NOLINT(readability-identifier-naming) template - void push_back_raw(const char * ptr, TAllocatorParams &&... allocator_params) + void push_back_raw(const char * ptr, TAllocatorParams &&... allocator_params) // NOLINT { push_back_raw_many(1, ptr, std::forward(allocator_params)...); } template - void push_back_raw_many(size_t number_of_items, const void * ptr, TAllocatorParams &&... allocator_params) + void push_back_raw_many( // NOLINT(readability-identifier-naming) + size_t number_of_items, + const void * ptr, + TAllocatorParams &&... allocator_params) { - size_t items_byte_size = byte_size(number_of_items); + size_t items_byte_size = PODArrayDetails::byte_size(number_of_items, ELEMENT_SIZE); if (unlikely(c_end + items_byte_size > c_end_of_storage)) reserve(size() + number_of_items, std::forward(allocator_params)...); memcpy(c_end, ptr, items_byte_size); @@ -323,30 +344,34 @@ class PODArray : public PODArrayBase(this->c_end); } const T * t_end_of_storage() const { return reinterpret_cast(this->c_end_of_storage); } + size_t byte_size(size_t n) const { return PODArrayDetails::byte_size(n, sizeof(T)); } + public: using value_type = T; /// You can not just use `typedef`, because there is ambiguity for the constructors and `assign` functions. - struct iterator : public boost::iterator_adaptor + struct iterator // NOLINT(readability-identifier-naming) + : public boost::iterator_adaptor { - iterator() {} - iterator(T * ptr_) + iterator() = default; + iterator(T * ptr_) // NOLINT : iterator::iterator_adaptor_(ptr_) {} }; - struct const_iterator : public boost::iterator_adaptor + struct const_iterator // NOLINT(readability-identifier-naming) + : public boost::iterator_adaptor { - const_iterator() {} - const_iterator(const T * ptr_) + const_iterator() = default; + const_iterator(const T * ptr_) // NOLINT : const_iterator::iterator_adaptor_(ptr_) {} }; - PODArray() {} + PODArray() = default; - PODArray(size_t n) + PODArray(size_t n) // NOLINT { this->alloc_for_num_elements(n); this->c_end += this->byte_size(n); @@ -398,7 +423,7 @@ class PODArray : public PODArrayBase - void resize_fill_zero(size_t n, TAllocatorParams &&... allocator_params) + void resize_fill_zero(size_t n, TAllocatorParams &&... allocator_params) // NOLINT(readability-identifier-naming) { size_t old_size = this->size(); if (n > old_size) @@ -410,7 +435,10 @@ class PODArray : public PODArrayBase - void resize_fill(size_t n, const T & value, TAllocatorParams &&... allocator_params) + void resize_fill( // NOLINT(readability-identifier-naming) + size_t n, + const T & value, + TAllocatorParams &&... allocator_params) { size_t old_size = this->size(); if (n > old_size) @@ -422,7 +450,7 @@ class PODArray : public PODArrayBase - void push_back(U && x, TAllocatorParams &&... allocator_params) + void push_back(U && x, TAllocatorParams &&... allocator_params) // NOLINT(readability-identifier-naming) { // FIXME:: It's dangerous here !! if (unlikely(this->c_end >= this->c_end_of_storage)) @@ -436,7 +464,7 @@ class PODArray : public PODArrayBase - void emplace_back(Args &&... args) + void emplace_back(Args &&... args) // NOLINT(readability-identifier-naming) { if (unlikely(this->c_end >= this->c_end_of_storage)) this->reserveForNextSize(); @@ -445,7 +473,7 @@ class PODArray : public PODArrayBasec_end += this->byte_size(1); } - void pop_back() { this->c_end -= this->byte_size(1); } + void pop_back() { this->c_end -= this->byte_size(1); } // NOLINT(readability-identifier-naming) /// Do not insert into the array a piece of itself. Because with the resize, the iterators on themselves can be invalidated. template @@ -494,7 +522,7 @@ class PODArray : public PODArrayBase - void insert_assume_reserved(It1 from_begin, It2 from_end) + void insert_assume_reserved(It1 from_begin, It2 from_end) // NOLINT(readability-identifier-naming) { size_t bytes_to_copy = this->byte_size(from_end - from_begin); memcpy(this->c_end, reinterpret_cast(&*from_begin), bytes_to_copy); diff --git a/dbms/src/DataStreams/AsynchronousBlockInputStream.h b/dbms/src/DataStreams/AsynchronousBlockInputStream.h index 8b2b56b0a17..06bae9eaef5 100644 --- a/dbms/src/DataStreams/AsynchronousBlockInputStream.h +++ b/dbms/src/DataStreams/AsynchronousBlockInputStream.h @@ -14,7 +14,6 @@ #pragma once -#include #include #include #include diff --git a/dbms/src/DataStreams/ParallelInputsProcessor.h b/dbms/src/DataStreams/ParallelInputsProcessor.h index 2265e28f14f..7f8e21981ce 100644 --- a/dbms/src/DataStreams/ParallelInputsProcessor.h +++ b/dbms/src/DataStreams/ParallelInputsProcessor.h @@ -14,7 +14,6 @@ #pragma once -#include #include #include #include @@ -24,10 +23,6 @@ #include #include -#include -#include -#include -#include /** Allows to process multiple block input streams (sources) in parallel, using specified number of threads. diff --git a/dbms/src/Dictionaries/CacheDictionary.h b/dbms/src/Dictionaries/CacheDictionary.h index 9a45907ee5e..989df4f70bd 100644 --- a/dbms/src/Dictionaries/CacheDictionary.h +++ b/dbms/src/Dictionaries/CacheDictionary.h @@ -16,14 +16,12 @@ #include #include -#include #include #include #include #include #include -#include #include #include #include diff --git a/dbms/src/Interpreters/ProcessList.h b/dbms/src/Interpreters/ProcessList.h index 2bf5e02140c..ddc0a131a10 100644 --- a/dbms/src/Interpreters/ProcessList.h +++ b/dbms/src/Interpreters/ProcessList.h @@ -14,7 +14,6 @@ #pragma once -#include #include #include #include diff --git a/dbms/src/Interpreters/QueryPriorities.h b/dbms/src/Interpreters/QueryPriorities.h index ad201cd97bd..946cc5e34c1 100644 --- a/dbms/src/Interpreters/QueryPriorities.h +++ b/dbms/src/Interpreters/QueryPriorities.h @@ -14,7 +14,6 @@ #pragma once -#include #include #include diff --git a/dbms/src/Server/TCPHandler.h b/dbms/src/Server/TCPHandler.h index a83c03fed63..846163eb038 100644 --- a/dbms/src/Server/TCPHandler.h +++ b/dbms/src/Server/TCPHandler.h @@ -15,7 +15,6 @@ #pragma once #include -#include #include #include #include diff --git a/dbms/src/Storages/DeltaMerge/ColumnFile/ColumnFileInMemory.cpp b/dbms/src/Storages/DeltaMerge/ColumnFile/ColumnFileInMemory.cpp index afe14a63865..efd13a58a7a 100644 --- a/dbms/src/Storages/DeltaMerge/ColumnFile/ColumnFileInMemory.cpp +++ b/dbms/src/Storages/DeltaMerge/ColumnFile/ColumnFileInMemory.cpp @@ -91,6 +91,7 @@ ColumnFile::AppendResult ColumnFileInMemory::append( return AppendResult{false, 0}; size_t new_alloc_block_bytes = 0; + size_t max_capacity = 0; for (size_t i = 0; i < cache->block.columns(); ++i) { const auto & col = data.getByPosition(i).column; @@ -105,6 +106,7 @@ ColumnFile::AppendResult ColumnFileInMemory::append( IColumn::ReserveStrategy::ScaleFactor1_5); } mutable_cache_col->insertRangeFrom(*col, offset, limit); + max_capacity = std::max(max_capacity, mutable_cache_col->capacity()); new_alloc_block_bytes += mutable_cache_col->allocatedBytes() - alloc_bytes; } @@ -118,6 +120,7 @@ ColumnFile::AppendResult ColumnFileInMemory::append( limit - offset, data_bytes, new_alloc_block_bytes, + max_capacity, rows, bytes); return AppendResult{true, new_alloc_block_bytes}; diff --git a/dbms/src/Storages/DeltaMerge/ColumnFile/ColumnFileInMemory.h b/dbms/src/Storages/DeltaMerge/ColumnFile/ColumnFileInMemory.h index 257b3327434..e5f8fd8ad2f 100644 --- a/dbms/src/Storages/DeltaMerge/ColumnFile/ColumnFileInMemory.h +++ b/dbms/src/Storages/DeltaMerge/ColumnFile/ColumnFileInMemory.h @@ -98,6 +98,7 @@ class ColumnFileInMemory : public ColumnFile bool isAppendable() const override { return !disable_append; } void disableAppend() override; + AppendResult append( const DMContext & dm_context, const Block & data, diff --git a/dbms/src/Storages/DeltaMerge/Delta/ColumnFilePersistedSet.h b/dbms/src/Storages/DeltaMerge/Delta/ColumnFilePersistedSet.h index 1d8a8d9f4c1..8277c836e9d 100644 --- a/dbms/src/Storages/DeltaMerge/Delta/ColumnFilePersistedSet.h +++ b/dbms/src/Storages/DeltaMerge/Delta/ColumnFilePersistedSet.h @@ -14,7 +14,6 @@ #pragma once -#include #include #include #include diff --git a/dbms/src/Storages/DeltaMerge/DeltaMergeStore.h b/dbms/src/Storages/DeltaMerge/DeltaMergeStore.h index 5df509ba44f..3b9e3c2f0d1 100644 --- a/dbms/src/Storages/DeltaMerge/DeltaMergeStore.h +++ b/dbms/src/Storages/DeltaMerge/DeltaMergeStore.h @@ -29,7 +29,6 @@ #include #include #include -#include #include #include #include diff --git a/dbms/src/Storages/DeltaMerge/VersionChain/tests/mvcc_test_utils.h b/dbms/src/Storages/DeltaMerge/VersionChain/tests/mvcc_test_utils.h index 9f0ad183691..58db88a54cc 100644 --- a/dbms/src/Storages/DeltaMerge/VersionChain/tests/mvcc_test_utils.h +++ b/dbms/src/Storages/DeltaMerge/VersionChain/tests/mvcc_test_utils.h @@ -14,7 +14,6 @@ #pragma once -#include #include #include #include diff --git a/dbms/src/Storages/DeltaMerge/tests/gtest_segment.cpp b/dbms/src/Storages/DeltaMerge/tests/gtest_segment.cpp index 11409cf66fe..00018ae21c8 100644 --- a/dbms/src/Storages/DeltaMerge/tests/gtest_segment.cpp +++ b/dbms/src/Storages/DeltaMerge/tests/gtest_segment.cpp @@ -664,10 +664,10 @@ ProcessMemoryUsage get_process_mem_usage() TEST_F(SegmentOperationTest, TestMassiveSegment) try { - const size_t level = 5; + const size_t level = 1; for (size_t lvl = 0; lvl < level; ++lvl) { - size_t num_expected_segs = 1000; + size_t num_expected_segs = 500; // size_t num_expected_segs = 10; size_t progress_interval = 100; const auto lvl_beg_seg_id = segments.rbegin()->first; @@ -728,6 +728,14 @@ try /* start_at */ lvl * num_expected_segs * write_rows + round * write_rows, false, std::nullopt); + LOG_INFO( + log, + "lvl={} round={} seg_id={} written_rows={} mem_tbl_bytes={}", + lvl, + round, + seg_id, + write_rows, + segments[seg_id]->getDelta()->getTotalCacheBytes()); round++; } { diff --git a/dbms/src/Storages/System/StorageSystemColumns.cpp b/dbms/src/Storages/System/StorageSystemColumns.cpp index 9436d994d5b..b61c3ac7c7b 100644 --- a/dbms/src/Storages/System/StorageSystemColumns.cpp +++ b/dbms/src/Storages/System/StorageSystemColumns.cpp @@ -15,7 +15,6 @@ #include #include #include -#include #include #include #include @@ -24,6 +23,7 @@ #include #include #include +#include namespace DB diff --git a/dbms/src/Storages/System/StorageSystemTables.cpp b/dbms/src/Storages/System/StorageSystemTables.cpp index 1fc7b7b8c61..47e8b8ad425 100644 --- a/dbms/src/Storages/System/StorageSystemTables.cpp +++ b/dbms/src/Storages/System/StorageSystemTables.cpp @@ -14,7 +14,6 @@ #include #include -#include #include #include #include @@ -29,6 +28,7 @@ #include #include #include +#include #include #include diff --git a/dbms/src/Columns/VirtualColumnUtils.cpp b/dbms/src/Storages/System/VirtualColumnUtils.cpp similarity index 98% rename from dbms/src/Columns/VirtualColumnUtils.cpp rename to dbms/src/Storages/System/VirtualColumnUtils.cpp index a01864d42f2..72261354889 100644 --- a/dbms/src/Columns/VirtualColumnUtils.cpp +++ b/dbms/src/Storages/System/VirtualColumnUtils.cpp @@ -13,7 +13,6 @@ // limitations under the License. #include -#include #include #include #include @@ -24,6 +23,7 @@ #include #include #include +#include namespace DB::VirtualColumnUtils diff --git a/dbms/src/Columns/VirtualColumnUtils.h b/dbms/src/Storages/System/VirtualColumnUtils.h similarity index 100% rename from dbms/src/Columns/VirtualColumnUtils.h rename to dbms/src/Storages/System/VirtualColumnUtils.h From c1cafc58a28f5eb95dd7dd124da021e710e2c28e Mon Sep 17 00:00:00 2001 From: JaySon-Huang Date: Wed, 9 Jul 2025 10:40:07 +0800 Subject: [PATCH 3/4] Add ut about empty memtable with 1000 columns Signed-off-by: JaySon-Huang --- dbms/src/Columns/ColumnString.cpp | 6 ++- dbms/src/Core/Block.cpp | 3 ++ .../ColumnFile/ColumnFileInMemory.cpp | 13 ++++++ .../ColumnFile/tests/gtest_dm_column_file.cpp | 31 +++++++++++++ .../DeltaMerge/tests/gtest_segment.cpp | 43 +++++++++++-------- 5 files changed, 76 insertions(+), 20 deletions(-) diff --git a/dbms/src/Columns/ColumnString.cpp b/dbms/src/Columns/ColumnString.cpp index 76e62725623..aeab4b6177d 100644 --- a/dbms/src/Columns/ColumnString.cpp +++ b/dbms/src/Columns/ColumnString.cpp @@ -302,8 +302,10 @@ void ColumnString::reserveWithStrategy(size_t n, IColumn::ReserveStrategy strate chars.reserve(n * APPROX_STRING_SIZE); break; case IColumn::ReserveStrategy::ScaleFactor1_5: - offsets.reserve_exact(n / 2 * 3); - chars.reserve_exact(n * APPROX_STRING_SIZE / 2 * 3); + // offsets.reserve_exact(n / 2 * 3); + // chars.reserve_exact(n * APPROX_STRING_SIZE / 2 * 3); + offsets.resize(n / 2 * 3); + chars.resize(n / 2 * APPROX_STRING_SIZE * 3); break; } } diff --git a/dbms/src/Core/Block.cpp b/dbms/src/Core/Block.cpp index dd919cc6abe..78adb8422e3 100644 --- a/dbms/src/Core/Block.cpp +++ b/dbms/src/Core/Block.cpp @@ -370,7 +370,10 @@ MutableColumns Block::cloneEmptyColumns() const size_t num_columns = data.size(); MutableColumns columns(num_columns); for (size_t i = 0; i < num_columns; ++i) + { columns[i] = data[i].column ? data[i].column->cloneEmpty() : data[i].type->createColumn(); + columns[i]->reserveWithStrategy(0, IColumn::ReserveStrategy::ScaleFactor1_5); + } return columns; } diff --git a/dbms/src/Storages/DeltaMerge/ColumnFile/ColumnFileInMemory.cpp b/dbms/src/Storages/DeltaMerge/ColumnFile/ColumnFileInMemory.cpp index efd13a58a7a..7629fe48da9 100644 --- a/dbms/src/Storages/DeltaMerge/ColumnFile/ColumnFileInMemory.cpp +++ b/dbms/src/Storages/DeltaMerge/ColumnFile/ColumnFileInMemory.cpp @@ -123,6 +123,19 @@ ColumnFile::AppendResult ColumnFileInMemory::append( max_capacity, rows, bytes); + for (size_t i = 0; i < cache->block.columns(); ++i) + { + const auto & cache_col = *cache->block.getByPosition(i).column; + LOG_INFO( + Logger::get(""), + "col i={} name={} type={} rows={} bytes={} alloc_bytes={}", + i, + cache->block.getByPosition(i).name, + cache_col.getName(), + cache_col.size(), + cache_col.byteSize(), + cache_col.allocatedBytes()); + } return AppendResult{true, new_alloc_block_bytes}; } diff --git a/dbms/src/Storages/DeltaMerge/ColumnFile/tests/gtest_dm_column_file.cpp b/dbms/src/Storages/DeltaMerge/ColumnFile/tests/gtest_dm_column_file.cpp index 9d63a7aef04..130d631fba8 100644 --- a/dbms/src/Storages/DeltaMerge/ColumnFile/tests/gtest_dm_column_file.cpp +++ b/dbms/src/Storages/DeltaMerge/ColumnFile/tests/gtest_dm_column_file.cpp @@ -38,6 +38,8 @@ #include #include +#include "Storages/DeltaMerge/ColumnFile/ColumnFileInMemory.h" + namespace DB::DM::tests { @@ -228,6 +230,35 @@ try } CATCH +TEST_P(ColumnFileTest, CreateColumnFileInMemory) +try +{ + size_t num_rows_write = 10; + Block block = DMTestEnv::prepareSimpleWriteBlock(0, num_rows_write, false); + for (Int64 i = 0; i < 1000; ++i) + { + auto null_col = ColumnNullable::create(ColumnString::create(), ColumnUInt8::create()); + for (size_t j = 0; j < num_rows_write; ++j) + { + null_col->insertDefault(); + } + block.insert(ColumnWithTypeAndName{ + std::move(null_col), + std::make_shared(std::make_shared()), + fmt::format("field_{}", i), + }); + } + // A 1000 columns empty block takes about 238KB + auto schema = ColumnFileSchema(block); + ColumnFileInMemoryPtr cf_in_mem = std::make_shared(std::make_shared(schema)); + LOG_INFO( + Logger::get(), + "ColumnFileInMemory, bytes={} alloc_bytes={}", + cf_in_mem->getBytes(), + cf_in_mem->getAllocateBytes()); +} +CATCH + TEST_P(ColumnFileTest, ReadColumns) try { diff --git a/dbms/src/Storages/DeltaMerge/tests/gtest_segment.cpp b/dbms/src/Storages/DeltaMerge/tests/gtest_segment.cpp index 00018ae21c8..4de7e990580 100644 --- a/dbms/src/Storages/DeltaMerge/tests/gtest_segment.cpp +++ b/dbms/src/Storages/DeltaMerge/tests/gtest_segment.cpp @@ -665,9 +665,10 @@ TEST_F(SegmentOperationTest, TestMassiveSegment) try { const size_t level = 1; + const size_t segment_range_size = 500; for (size_t lvl = 0; lvl < level; ++lvl) { - size_t num_expected_segs = 500; + size_t num_expected_segs = 200; // size_t num_expected_segs = 10; size_t progress_interval = 100; const auto lvl_beg_seg_id = segments.rbegin()->first; @@ -678,7 +679,7 @@ try auto next_split_seg_id = lvl_beg_seg_id; for (size_t i = 0; i < num_expected_segs; ++i) { - auto split_point = (lvl * num_expected_segs + 1 + i) * 500; + auto split_point = (lvl * num_expected_segs + 1 + i) * segment_range_size; auto n_seg_id = splitSegmentAt(next_split_seg_id, split_point, Segment::SplitMode::Logical); ASSERT_TRUE(n_seg_id.has_value()) << fmt::format("i={} sp={}", i, split_point); next_split_seg_id = *n_seg_id; @@ -710,7 +711,8 @@ try if (seg_id < lvl_beg_seg_id) continue; // skip segments created in previous levels - auto write_rows = 500; + size_t write_rows = 4; + size_t write_rows_sub = 2; if (round % progress_interval == 0) { auto mu = get_process_mem_usage(); @@ -722,20 +724,25 @@ try write_rows * round, mu.resident_mb); } - writeToCache( - seg_id, - write_rows, - /* start_at */ lvl * num_expected_segs * write_rows + round * write_rows, - false, - std::nullopt); - LOG_INFO( - log, - "lvl={} round={} seg_id={} written_rows={} mem_tbl_bytes={}", - lvl, - round, - seg_id, - write_rows, - segments[seg_id]->getDelta()->getTotalCacheBytes()); + for (size_t k = 0; k < 2; ++k) + { + writeToCache( + seg_id, + write_rows_sub, + /* start_at */ lvl * num_expected_segs * segment_range_size + round * segment_range_size, + false, + std::nullopt); + LOG_INFO( + log, + "lvl={} round={} k={} seg_id={} written_rows={} mem_tbl_bytes={} mem_tbl_alloc_bytes={}", + lvl, + round, + k, + seg_id, + write_rows, + segments[seg_id]->getDelta()->getTotalCacheBytes(), + segments[seg_id]->getDelta()->getTotalAllocatedBytes()); + } round++; } { @@ -743,8 +750,8 @@ try LOG_INFO( log, "TestMassiveSegment done, segments.size()={} lvl={} mem_resident_set={:.3f}MB", - lvl, segments.size(), + lvl, mu.resident_mb); } } From 579f533af08a19bbca5dac8ab4f2755fc89595e9 Mon Sep 17 00:00:00 2001 From: JaySon-Huang Date: Wed, 9 Jul 2025 23:53:03 +0800 Subject: [PATCH 4/4] Update test Signed-off-by: JaySon-Huang --- dbms/src/Common/CurrentMetrics.cpp | 5 +- .../DeltaMerge/tests/gtest_segment.cpp | 69 +++---------------- 2 files changed, 9 insertions(+), 65 deletions(-) diff --git a/dbms/src/Common/CurrentMetrics.cpp b/dbms/src/Common/CurrentMetrics.cpp index 839886298fc..c2cbbba2d80 100644 --- a/dbms/src/Common/CurrentMetrics.cpp +++ b/dbms/src/Common/CurrentMetrics.cpp @@ -95,10 +95,7 @@ M(ConnectionPoolSize) \ M(MemoryTrackingQueryStorageTask) \ M(MemoryTrackingFetchPages) \ - M(MemoryTrackingSharedColumnData) \ - M(NumSegments) \ - M(NumDeltaCache) \ - M(BytesDeltaCache) + M(MemoryTrackingSharedColumnData) namespace CurrentMetrics { diff --git a/dbms/src/Storages/DeltaMerge/tests/gtest_segment.cpp b/dbms/src/Storages/DeltaMerge/tests/gtest_segment.cpp index 4de7e990580..e6cb692d351 100644 --- a/dbms/src/Storages/DeltaMerge/tests/gtest_segment.cpp +++ b/dbms/src/Storages/DeltaMerge/tests/gtest_segment.cpp @@ -15,6 +15,7 @@ #include #include #include +#include #include #include #include @@ -607,59 +608,11 @@ try } CATCH - -namespace -{ -struct ProcessMemoryUsage -{ - double resident_mb; - Int64 cur_proc_num_threads; - double cur_virt_mb; -}; - -bool process_mem_usage(double & resident_set, Int64 & cur_proc_num_threads, UInt64 & cur_virt_size) +double get_process_resident_mb() { - resident_set = 0.0; - - // 'file' stat seems to give the most reliable results - std::ifstream stat_stream("/proc/self/stat", std::ios_base::in); - // if "/proc/self/stat" is not supported - if (!stat_stream.is_open()) - return false; - - // dummy vars for leading entries in stat that we don't care about - std::string pid, comm, state, ppid, pgrp, session, tty_nr; - std::string tpgid, flags, minflt, cminflt, majflt, cmajflt; - std::string utime, stime, cutime, cstime, priority, nice; - std::string itrealvalue, starttime; - - // the field we want - Int64 rss; - - stat_stream >> pid >> comm >> state >> ppid >> pgrp >> session >> tty_nr >> tpgid >> flags >> minflt >> cminflt - >> majflt >> cmajflt >> utime >> stime >> cutime >> cstime >> priority >> nice >> cur_proc_num_threads - >> itrealvalue >> starttime >> cur_virt_size >> rss; // don't care about the rest - - stat_stream.close(); - - Int64 page_size_kb = sysconf(_SC_PAGE_SIZE) / 1024; // in case x86-64 is configured to use 2MB pages - resident_set = rss * page_size_kb; - return true; + auto mu = DB::get_process_mem_usage(); + return mu.resident_bytes / 1024.0 / 1024; } -ProcessMemoryUsage get_process_mem_usage() -{ - double resident_set; - Int64 cur_proc_num_threads = 1; - UInt64 cur_virt_size = 0; - process_mem_usage(resident_set, cur_proc_num_threads, cur_virt_size); - resident_set *= 1024; // unit: byte - return ProcessMemoryUsage{ - resident_set / 1024.0 / 1024, - cur_proc_num_threads, - cur_virt_size / 1024.0 / 1024, - }; -} -} // namespace TEST_F(SegmentOperationTest, TestMassiveSegment) try @@ -685,7 +638,6 @@ try next_split_seg_id = *n_seg_id; if (i % progress_interval == 0) { - auto mu = get_process_mem_usage(); LOG_INFO( log, "lvl={} round={} split_point={} next_seg_id={} mem_resident_set={:.3f}MB)", @@ -693,13 +645,10 @@ try i, split_point, *n_seg_id, - mu.resident_mb); + get_process_resident_mb()); } } - { - auto mu = get_process_mem_usage(); - LOG_INFO(log, "lvl={} round={} mem_resident_set={:.3f}MB", lvl, num_expected_segs, mu.resident_mb); - } + LOG_INFO(log, "lvl={} round={} mem_resident_set={:.3f}MB", lvl, num_expected_segs, get_process_resident_mb()); size_t round = 0; for (auto && [seg_id, seg] : segments) @@ -715,14 +664,13 @@ try size_t write_rows_sub = 2; if (round % progress_interval == 0) { - auto mu = get_process_mem_usage(); LOG_INFO( log, "lvl={} round={} written_rows={} mem_resident_set={:.3f}MB", lvl, round, write_rows * round, - mu.resident_mb); + get_process_resident_mb()); } for (size_t k = 0; k < 2; ++k) { @@ -746,13 +694,12 @@ try round++; } { - auto mu = get_process_mem_usage(); LOG_INFO( log, "TestMassiveSegment done, segments.size()={} lvl={} mem_resident_set={:.3f}MB", segments.size(), lvl, - mu.resident_mb); + get_process_resident_mb()); } } }