diff --git a/dbms/src/Common/ProfileEvents.cpp b/dbms/src/Common/ProfileEvents.cpp index a5a28179b93..39b8ee70fb7 100644 --- a/dbms/src/Common/ProfileEvents.cpp +++ b/dbms/src/Common/ProfileEvents.cpp @@ -132,6 +132,12 @@ M(S3CopyObject) \ M(S3GetObjectRetry) \ M(S3PutObjectRetry) \ +<<<<<<< HEAD +======= + M(S3IORead) \ + M(S3IOSeek) \ + M(S3IOSeekBackward) \ +>>>>>>> 28e47db635 (io: Fix checksum seek at end (#10341)) M(FileCacheHit) \ M(FileCacheMiss) \ M(FileCacheEvict) \ diff --git a/dbms/src/IO/Checksum/ChecksumBuffer.cpp b/dbms/src/IO/Checksum/ChecksumBuffer.cpp index b40784e53ad..3d90b1cbf19 100644 --- a/dbms/src/IO/Checksum/ChecksumBuffer.cpp +++ b/dbms/src/IO/Checksum/ChecksumBuffer.cpp @@ -18,6 +18,80 @@ namespace DB { using namespace DB::Digest; +template +off_t FramedChecksumReadBuffer::doSeek(off_t offset, int whence) +{ + auto & frame = reinterpret_cast &>( + *(this->working_buffer.begin() - sizeof(ChecksumFrame))); // align should not fail + + if (whence == SEEK_CUR) + { + offset = getPositionInFile() + offset; + } + else if (whence != SEEK_SET) + { + throw TiFlashException( + "FramedChecksumReadBuffer::seek expects SEEK_SET or SEEK_CUR as whence", + Errors::Checksum::Internal); + } + auto target_frame = offset / frame_size; + auto target_offset = offset % frame_size; + + // If we have already seek to EOF, then working_buffer was cleared + if (target_frame == current_frame && working_buffer.size() > 0) + { + if (unlikely(target_offset > working_buffer.size())) + pos = working_buffer.end(); + else + pos = working_buffer.begin() + target_offset; + return offset; + } + + // Seek according to `target_frame` and `target_offset` + // read the header and the body + auto header_offset = target_frame * (sizeof(ChecksumFrame) + frame_size); + auto result = in->seek(static_cast(header_offset), SEEK_SET); + if (result == -1) + { + throw TiFlashException( + Errors::Checksum::IOFailure, + "checksum framed file {} is not seekable", + in->getFileName()); + } + auto length = expectRead( + working_buffer.begin() - sizeof(ChecksumFrame), + sizeof(ChecksumFrame) + frame_size); + if (length == 0) + { + current_frame = target_frame; + pos = working_buffer.begin(); + working_buffer.resize(0); + return offset; // EOF + } + if (unlikely(length != sizeof(ChecksumFrame) + frame.bytes)) + { + throw TiFlashException( + Errors::Checksum::DataCorruption, + "frame length (header = {}, body = {}, read = {}) mismatch for {}", + sizeof(ChecksumFrame), + frame.bytes, + length, + in->getFileName()); + } + + // body checksum examination + checkBody(); + + // update statistics + current_frame = target_frame; + if (unlikely(target_offset > working_buffer.size())) + pos = working_buffer.end(); + else + pos = working_buffer.begin() + target_offset; + + return offset; +} + template class FramedChecksumReadBuffer; template class FramedChecksumReadBuffer; template class FramedChecksumReadBuffer; diff --git a/dbms/src/IO/Checksum/ChecksumBuffer.h b/dbms/src/IO/Checksum/ChecksumBuffer.h index 78b234babdf..24433f85d17 100644 --- a/dbms/src/IO/Checksum/ChecksumBuffer.h +++ b/dbms/src/IO/Checksum/ChecksumBuffer.h @@ -415,72 +415,7 @@ class FramedChecksumReadBuffer : public ReadBufferFromFileDescriptor return true; } - off_t doSeek(off_t offset, int whence) override - { - auto & frame = reinterpret_cast &>( - *(this->working_buffer.begin() - sizeof(ChecksumFrame))); // align should not fail - - if (whence == SEEK_CUR) - { - offset = getPositionInFile() + offset; - } - else if (whence != SEEK_SET) - { - throw TiFlashException( - "FramedChecksumReadBuffer::seek expects SEEK_SET or SEEK_CUR as whence", - Errors::Checksum::Internal); - } - auto target_frame = offset / frame_size; - auto target_offset = offset % frame_size; - - if (target_frame == current_frame) - { - pos = working_buffer.begin() + target_offset; - return offset; - } - else - { - // read the header and the body - auto header_offset = target_frame * (sizeof(ChecksumFrame) + frame_size); - auto result = in->seek(static_cast(header_offset), SEEK_SET); - if (result == -1) - { - throw TiFlashException( - "checksum framed file " + in->getFileName() + " is not seekable", - Errors::Checksum::IOFailure); - } - auto length = expectRead( - working_buffer.begin() - sizeof(ChecksumFrame), - sizeof(ChecksumFrame) + frame_size); - if (length == 0) - { - current_frame = target_frame; - pos = working_buffer.begin(); - working_buffer.resize(0); - return offset; // EOF - } - if (unlikely(length != sizeof(ChecksumFrame) + frame.bytes)) - { - throw TiFlashException( - fmt::format( - "frame length (header = {}, body = {}, read = {}) mismatch for {}", - sizeof(ChecksumFrame), - frame.bytes, - length, - in->getFileName()), - Errors::Checksum::DataCorruption); - } - - // body checksum examination - checkBody(); - - // update statistics - current_frame = target_frame; - pos = working_buffer.begin() + target_offset; - } - - return offset; - } + off_t doSeek(off_t offset, int whence) override; }; } // namespace DB diff --git a/dbms/src/IO/Checksum/tests/gtest_dm_checksum_buffer.cpp b/dbms/src/IO/Checksum/tests/gtest_dm_checksum_buffer.cpp index 54147819afb..f615b5e95a0 100644 --- a/dbms/src/IO/Checksum/tests/gtest_dm_checksum_buffer.cpp +++ b/dbms/src/IO/Checksum/tests/gtest_dm_checksum_buffer.cpp @@ -12,10 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -#pragma GCC diagnostic push -#pragma GCC diagnostic ignored "-Wsign-compare" -#include -#pragma GCC diagnostic pop +#include #include #include #include @@ -28,14 +25,15 @@ #include #include #include +#include #include +#include #include -namespace DB -{ -namespace tests +namespace DB::tests { + namespace { std::random_device dev; // NOLINT(cert-err58-cpp) @@ -372,5 +370,129 @@ TEST_STACKED_SEEKING(CRC32) TEST_STACKED_SEEKING(CRC64) TEST_STACKED_SEEKING(City128) TEST_STACKED_SEEKING(XXH3) -} // namespace tests -} // namespace DB + +template +void runCompressedSeekableReaderBufferTest() +try +{ + auto log = Logger::get(); + // Create a temporary file for testing + const std::string temp_file_path = "/tmp/tiflash_compressed_seek_test.dat"; + SCOPE_EXIT({ + Poco::File file(temp_file_path); + if (file.exists()) + file.remove(); + }); + // Test data - create multiple blocks with different patterns + std::vector test_blocks; + + test_blocks = { + std::string(1500, 'A') + "BLOCK0_END", + std::string(800, 'B') + "BLOCK1_END", + "", // Block 2 is empty + "", // Block 3 is empty + }; + + std::vector block_compressed_offsets; + std::vector block_decompressed_sizes; + + auto [limiter, provider] = prepareIO(); + auto config = DM::DMChecksumConfig{{}, TIFLASH_DEFAULT_CHECKSUM_FRAME_SIZE, D}; + + // Write compressed data to file + { + auto plain_file = ChecksumWriteBufferBuilder::build( + true, + provider, + temp_file_path, + EncryptionPath(temp_file_path, temp_file_path), + false, + limiter->getWriteLimiter(), + config.getChecksumAlgorithm(), + config.getChecksumFrameLength(), + /*flags*/ + -1, + /*mode*/ 0666, + 1048576); + auto compressed_buf + = CompressedWriteBuffer<>::build(*plain_file, CompressionSettings(CompressionMethod::LZ4), false); + + for (const auto & block_data : test_blocks) + { + // Record the compressed file offset before writing this block + block_compressed_offsets.push_back(plain_file->count()); + block_decompressed_sizes.push_back(block_data.size()); + + // Write the block data + compressed_buf->write(block_data.data(), block_data.size()); + compressed_buf->next(); // Force compression of this block + } + } + + LOG_INFO(log, "Created compressed file with {} blocks", test_blocks.size()); + for (size_t i = 0; i < block_compressed_offsets.size(); ++i) + { + LOG_INFO( + log, + "Block {}: compressed_offset={}, decompressed_size={}", + i, + block_compressed_offsets[i], + block_decompressed_sizes[i]); + } + + + auto compressed_in = CompressedReadBufferFromFileBuilder::build( + provider, + temp_file_path, + EncryptionPath(temp_file_path, temp_file_path), + config.getChecksumFrameLength(), + limiter->getReadLimiter(), + config.getChecksumAlgorithm(), + config.getChecksumFrameLength()); + + // 1. Check seek + read + for (size_t i = 0; i < test_blocks.size(); ++i) + { + // Seek to the start of each block + LOG_INFO(log, "Seeking to block {} at offset {}", i, block_compressed_offsets[i]); + compressed_in->seek(block_compressed_offsets[i], 0); + + // Read the data + std::string read_data; + read_data.resize(block_decompressed_sizes[i]); + compressed_in->readBig(read_data.data(), block_decompressed_sizes[i]); + + // Verify the data matches + ASSERT_EQ(read_data, test_blocks[i]) << "Block " << i << " data mismatch"; + } + + // Seek in inverse order to test seek again + for (size_t i = 0; i < test_blocks.size(); ++i) + { + assert(i + 1 <= test_blocks.size()); + const size_t target_block = test_blocks.size() - i - 1; + compressed_in->seek(block_compressed_offsets[target_block], 0); + std::string read_data; + read_data.resize(block_decompressed_sizes[target_block]); + size_t num_read = compressed_in->readBig(read_data.data(), test_blocks[target_block].size()); + ASSERT_EQ(num_read, test_blocks[target_block].size()); + read_data.resize(num_read); + ASSERT_EQ(read_data, test_blocks[target_block]) + << "Block " << target_block << " data mismatch after seek again"; + } +} +CATCH + +#define TEST_COMPRESSEDSEEKABLE(ALGO) \ + TEST(DMChecksumBuffer##ALGO, CompressedSeekable) \ + { \ + runCompressedSeekableReaderBufferTest(); \ + } // NOLINT(cert-err58-cpp) + +TEST_COMPRESSEDSEEKABLE(None) +TEST_COMPRESSEDSEEKABLE(CRC32) +TEST_COMPRESSEDSEEKABLE(CRC64) +TEST_COMPRESSEDSEEKABLE(City128) +TEST_COMPRESSEDSEEKABLE(XXH3) + +} // namespace DB::tests diff --git a/dbms/src/Storages/DeltaMerge/tests/gtest_dm_file.cpp b/dbms/src/Storages/DeltaMerge/tests/gtest_dm_file.cpp index 13432ead82d..d5df1857ce2 100644 --- a/dbms/src/Storages/DeltaMerge/tests/gtest_dm_file.cpp +++ b/dbms/src/Storages/DeltaMerge/tests/gtest_dm_file.cpp @@ -12,6 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. +#include #include #include #include @@ -923,6 +924,178 @@ try } CATCH +TEST_F(DMFileMetaV2Test, WriteReadNullableVectorColumn) +try +{ + auto log = Logger::get(); + // Verify that compression and checksum settings are configured correctly + ASSERT_EQ(dbContext().getSettingsRef().dt_compression_method.get(), CompressionMethod::LZ4); + ASSERT_EQ(dbContext().getSettingsRef().dt_checksum_algorithm.get(), ChecksumAlgo::XXH3); + ASSERT_TRUE(dm_file->getConfiguration().has_value()); + ASSERT_EQ(dm_file->getConfiguration()->getChecksumAlgorithm(), ChecksumAlgo::XXH3); + + // Define columns: ID (Int64, not null) and embedding (Array(Float32), nullable) + auto cols = std::make_shared(); + + // ID column (not null) + ColumnDefine id_cd(1, "id", std::make_shared()); + cols->emplace_back(id_cd); + + // Embedding column (nullable vector with 1536 dimensions) + ColumnDefine embedding_cd( + 2, + "embedding", + std::make_shared(std::make_shared(std::make_shared()))); + cols->emplace_back(embedding_cd); + + const size_t total_rows = 1000; + const size_t filled_embedding_rows = 100; + const size_t embedding_dimensions = 1536; + + // Prepare data in batches to avoid memory issues + const size_t batch_size = 50; + const size_t num_batches = total_rows / batch_size; + + { + auto stream = std::make_shared(dbContext(), dm_file, *cols); + stream->writePrefix(); + + for (size_t batch = 0; batch < num_batches; ++batch) + { + size_t batch_start = batch * batch_size; + size_t batch_end = (batch + 1) * batch_size; + + Block block; + + // Create ID column data (fully filled) + std::vector id_data; + id_data.reserve(batch_size); + for (size_t i = batch_start; i < batch_end; ++i) + { + id_data.push_back(static_cast(i)); + } + block.insert(DB::tests::createColumn(id_data, id_cd.name, id_cd.id)); + + // Create embedding column data (nullable, only first 100 rows filled) + // Use a simpler approach similar to existing tests + std::vector> embedding_data; + embedding_data.reserve(batch_size); + + for (size_t i = batch_start; i < batch_end; ++i) + { + if (i < filled_embedding_rows) + { + // Create a 1536-dimension vector with values based on row index + Array vec; + vec.reserve(embedding_dimensions); + for (size_t dim = 0; dim < embedding_dimensions; ++dim) + { + // Use a simple pattern: row_id + dimension_index * 0.001 + vec.push_back(static_cast(i + dim * 0.001)); + } + embedding_data.push_back(vec); + } + else + { + // Null value for rows beyond the first 100 + embedding_data.push_back(std::nullopt); + } + } + + // Create nullable array column using the tuple approach + auto embedding_col = DB::tests::createColumn>( + std::make_tuple(std::make_shared()), + embedding_data); + embedding_col.name = embedding_cd.name; + embedding_col.column_id = embedding_cd.id; + block.insert(embedding_col); + + stream->write(block, DMFileBlockOutputStream::BlockProperty{0, 0, 0, 0}); + } + + stream->writeSuffix(); + } + + // Read back and verify the data + { + DMFileBlockInputStreamBuilder builder(dbContext()); + builder.setRowsThreshold(100); + LOG_INFO(log, "Begin to verify reading blocks from DMFile"); + auto stream = builder.build( + dm_file, + *cols, + RowKeyRanges{RowKeyRange::newAll(false, 1)}, + std::make_shared()); + + size_t total_rows_read = 0; + size_t non_null_embedding_count = 0; + size_t null_embedding_count = 0; + + while (Block block = stream->read()) + { + if (!block) + break; + + ASSERT_EQ(block.columns(), 2); + + auto id_column = block.getByName("id").column; + auto embedding_column = block.getByName("embedding").column; + + // Verify column types + ASSERT_TRUE(id_column->isNumeric()); + ASSERT_TRUE(embedding_column->isColumnNullable()); + + const auto * nullable_embedding = static_cast(embedding_column.get()); + auto nested_column = nullable_embedding->getNestedColumnPtr(); + auto null_map = nullable_embedding->getNullMapColumnPtr(); + + // Check if nested column is array type + ASSERT_TRUE(typeid_cast(nested_column.get()) != nullptr); + + for (size_t i = 0; i < block.rows(); ++i) + { + // Verify ID values + Int64 id_value = id_column->getInt(i); + ASSERT_EQ(id_value, static_cast(total_rows_read + i)); + + // Verify embedding values + bool is_null = null_map->getUInt(i) != 0; + if (total_rows_read + i < filled_embedding_rows) + { + // Should not be null for first 100 rows + ASSERT_FALSE(is_null) << "Row " << (total_rows_read + i) << " should not be null"; + + if (!is_null) + { + const auto * array_column = static_cast(nested_column.get()); + + // Verify array size (should be 1536 dimensions) + size_t array_size = array_column->sizeAt(non_null_embedding_count); + ASSERT_EQ(array_size, embedding_dimensions) + << "Row " << (total_rows_read + i) << " has wrong embedding dimension"; + + non_null_embedding_count++; + } + } + else + { + // Should be null for rows beyond first 100 + ASSERT_TRUE(is_null) << "Row " << (total_rows_read + i) << " should be null"; + null_embedding_count++; + } + } + + total_rows_read += block.rows(); + } + + // Final verification for first DMFile + ASSERT_EQ(total_rows_read, total_rows); + ASSERT_EQ(non_null_embedding_count, filled_embedding_rows); + ASSERT_EQ(null_embedding_count, total_rows - filled_embedding_rows); + } +} +CATCH + class DMFileTest : public DMFileMetaV2Test , public testing::WithParamInterface @@ -1908,7 +2081,6 @@ try } CATCH - INSTANTIATE_TEST_CASE_P( DTFileMode, // DMFileTest, diff --git a/dbms/src/Storages/S3/S3RandomAccessFile.cpp b/dbms/src/Storages/S3/S3RandomAccessFile.cpp index ca54144931d..6330cb3bd08 100644 --- a/dbms/src/Storages/S3/S3RandomAccessFile.cpp +++ b/dbms/src/Storages/S3/S3RandomAccessFile.cpp @@ -32,6 +32,12 @@ namespace ProfileEvents extern const Event S3GetObject; extern const Event S3ReadBytes; extern const Event S3GetObjectRetry; +<<<<<<< HEAD +======= +extern const Event S3IORead; +extern const Event S3IOSeek; +extern const Event S3IOSeekBackward; +>>>>>>> 28e47db635 (io: Fix checksum seek at end (#10341)) } // namespace ProfileEvents namespace DB::S3 @@ -58,7 +64,7 @@ constexpr int S3StreamError = -2; bool isRetryableError(int ret, int err) { - return ret == S3StreamError || err == ECONNRESET || err == EAGAIN; + return ret == S3StreamError || err == ECONNRESET || err == EAGAIN || err == EINPROGRESS; } } // namespace @@ -86,15 +92,14 @@ ssize_t S3RandomAccessFile::readImpl(char * buf, size_t size) istr.read(buf, size); size_t gcount = istr.gcount(); // Theoretically, `istr.eof()` is equivalent to `cur_offset + gcount != static_cast(content_length)`. - // It's just a double check for more safty. + // It's just a double check for more safety. if (gcount < size && (!istr.eof() || cur_offset + gcount != static_cast(content_length))) { auto state = istr.rdstate(); LOG_ERROR( log, "Cannot read from istream, size={} gcount={} state=0x{:02X} cur_offset={} content_length={} errno={} " - "errmsg={} " - "cost={}ns", + "errmsg={} cost={}ns", size, gcount, state, @@ -143,7 +148,7 @@ off_t S3RandomAccessFile::seekImpl(off_t offset_, int whence) { RUNTIME_CHECK_MSG(whence == SEEK_SET, "Only SEEK_SET mode is allowed, but {} is received", whence); RUNTIME_CHECK_MSG( - offset_ >= cur_offset && offset_ <= content_length, + offset_ >= 0 && offset_ <= content_length, "Seek position is out of bounds: offset={}, cur_offset={}, content_length={}", offset_, cur_offset, @@ -153,6 +158,21 @@ off_t S3RandomAccessFile::seekImpl(off_t offset_, int whence) { return cur_offset; } + + if (offset_ < cur_offset) + { + ProfileEvents::increment(ProfileEvents::S3IOSeekBackward, 1); + cur_offset = offset_; + cur_retry = 0; + + if (!initialize()) + { + return S3StreamError; + } + return cur_offset; + } + + // Forward seek Stopwatch sw; auto & istr = read_result.GetBody(); if (!istr.ignore(offset_ - cur_offset))