Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions dbms/src/Common/ProfileEvents.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,12 @@
M(S3CopyObject) \
M(S3GetObjectRetry) \
M(S3PutObjectRetry) \
<<<<<<< HEAD
=======
M(S3IORead) \
M(S3IOSeek) \
M(S3IOSeekBackward) \
>>>>>>> 28e47db635 (io: Fix checksum seek at end (#10341))
M(FileCacheHit) \
M(FileCacheMiss) \
M(FileCacheEvict) \
Expand Down
74 changes: 74 additions & 0 deletions dbms/src/IO/Checksum/ChecksumBuffer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,80 @@ namespace DB
{
using namespace DB::Digest;

/// Repositions the read cursor of a frame-checksummed file.
///
/// `offset` addresses the data without frame headers: it is split into a frame index
/// (`offset / frame_size`) and a position inside that frame (`offset % frame_size`).
/// With SEEK_CUR the offset is first made absolute by adding `getPositionInFile()`.
///
/// Returns the absolute offset that was requested.
/// Throws TiFlashException with
///  - Errors::Checksum::Internal       if `whence` is neither SEEK_SET nor SEEK_CUR,
///  - Errors::Checksum::IOFailure      if the underlying file cannot seek,
///  - Errors::Checksum::DataCorruption if the frame read back has an unexpected length.
template <typename Backend>
off_t FramedChecksumReadBuffer<Backend>::doSeek(off_t offset, int whence)
{
    // The frame header is stored right in front of the working buffer.
    auto * const header_begin = working_buffer.begin() - sizeof(ChecksumFrame<Backend>);
    auto & frame = *reinterpret_cast<ChecksumFrame<Backend> *>(header_begin); // align should not fail

    switch (whence)
    {
    case SEEK_SET:
        break;
    case SEEK_CUR:
        offset += getPositionInFile();
        break;
    default:
        throw TiFlashException(
            "FramedChecksumReadBuffer::seek expects SEEK_SET or SEEK_CUR as whence",
            Errors::Checksum::Internal);
    }

    const auto frame_index = offset / frame_size;
    const auto offset_in_frame = offset % frame_size;

    // Places `pos` inside the currently loaded frame; a position past the loaded
    // bytes is clamped to the end of the working buffer.
    const auto place_cursor = [&] {
        if (unlikely(offset_in_frame > working_buffer.size()))
            pos = working_buffer.end();
        else
            pos = working_buffer.begin() + offset_in_frame;
    };

    // Fast path: the wanted frame is already loaded. An empty working buffer means a
    // previous seek reached EOF and cleared it, so the frame has to be read again.
    const bool frame_is_loaded = (frame_index == current_frame) && (working_buffer.size() > 0);
    if (frame_is_loaded)
    {
        place_cursor();
        return offset;
    }

    // Slow path: jump to the header of the wanted frame in the underlying file
    // and read the header together with the body.
    const auto physical_offset = frame_index * (sizeof(ChecksumFrame<Backend>) + frame_size);
    const auto seek_result = in->seek(static_cast<off_t>(physical_offset), SEEK_SET);
    if (seek_result == -1)
    {
        throw TiFlashException(
            Errors::Checksum::IOFailure,
            "checksum framed file {} is not seekable",
            in->getFileName());
    }

    const auto bytes_read = expectRead(header_begin, sizeof(ChecksumFrame<Backend>) + frame_size);
    if (bytes_read == 0)
    {
        // EOF: remember the frame and leave an empty working buffer behind.
        current_frame = frame_index;
        pos = working_buffer.begin();
        working_buffer.resize(0);
        return offset;
    }
    if (unlikely(bytes_read != sizeof(ChecksumFrame<Backend>) + frame.bytes))
    {
        throw TiFlashException(
            Errors::Checksum::DataCorruption,
            "frame length (header = {}, body = {}, read = {}) mismatch for {}",
            sizeof(ChecksumFrame<Backend>),
            frame.bytes,
            bytes_read,
            in->getFileName());
    }

    // Examine the body checksum before exposing the data.
    checkBody();

    // The new frame is valid: record it and position the cursor.
    current_frame = frame_index;
    place_cursor();

    return offset;
}

template class FramedChecksumReadBuffer<None>;
template class FramedChecksumReadBuffer<CRC32>;
template class FramedChecksumReadBuffer<CRC64>;
Expand Down
67 changes: 1 addition & 66 deletions dbms/src/IO/Checksum/ChecksumBuffer.h
Original file line number Diff line number Diff line change
Expand Up @@ -415,72 +415,7 @@ class FramedChecksumReadBuffer : public ReadBufferFromFileDescriptor
return true;
}

/// Repositions the read cursor of a frame-checksummed file.
///
/// `offset` is split into a frame index (`offset / frame_size`) and a position inside
/// that frame (`offset % frame_size`). With SEEK_CUR the offset is first made absolute
/// by adding `getPositionInFile()`; any `whence` other than SEEK_SET / SEEK_CUR throws.
///
/// Returns the absolute offset that was requested.
/// Throws TiFlashException (Internal, IOFailure or DataCorruption) on an unsupported
/// `whence`, a failed seek of the underlying file, or a frame of unexpected length.
off_t doSeek(off_t offset, int whence) override
{
    auto & frame = reinterpret_cast<ChecksumFrame<Backend> &>(
        *(this->working_buffer.begin() - sizeof(ChecksumFrame<Backend>))); // align should not fail

    if (whence == SEEK_CUR)
    {
        offset = getPositionInFile() + offset;
    }
    else if (whence != SEEK_SET)
    {
        throw TiFlashException(
            "FramedChecksumReadBuffer::seek expects SEEK_SET or SEEK_CUR as whence",
            Errors::Checksum::Internal);
    }
    auto target_frame = offset / frame_size;
    auto target_offset = offset % frame_size;

    // Reuse the loaded frame only when it still holds data. A previous seek that hit
    // EOF records `current_frame` but clears the working buffer, so in that case the
    // frame must be read again instead of pointing `pos` past an empty buffer.
    if (target_frame == current_frame && working_buffer.size() > 0)
    {
        // Never move `pos` beyond the bytes that were actually loaded.
        if (unlikely(target_offset > working_buffer.size()))
            pos = working_buffer.end();
        else
            pos = working_buffer.begin() + target_offset;
        return offset;
    }

    // read the header and the body
    auto header_offset = target_frame * (sizeof(ChecksumFrame<Backend>) + frame_size);
    auto result = in->seek(static_cast<off_t>(header_offset), SEEK_SET);
    if (result == -1)
    {
        throw TiFlashException(
            "checksum framed file " + in->getFileName() + " is not seekable",
            Errors::Checksum::IOFailure);
    }
    auto length = expectRead(
        working_buffer.begin() - sizeof(ChecksumFrame<Backend>),
        sizeof(ChecksumFrame<Backend>) + frame_size);
    if (length == 0)
    {
        current_frame = target_frame;
        pos = working_buffer.begin();
        working_buffer.resize(0);
        return offset; // EOF
    }
    if (unlikely(length != sizeof(ChecksumFrame<Backend>) + frame.bytes))
    {
        throw TiFlashException(
            fmt::format(
                "frame length (header = {}, body = {}, read = {}) mismatch for {}",
                sizeof(ChecksumFrame<Backend>),
                frame.bytes,
                length,
                in->getFileName()),
            Errors::Checksum::DataCorruption);
    }

    // body checksum examination
    checkBody();

    // update statistics
    current_frame = target_frame;
    // Clamp the cursor to the loaded bytes, same as on the fast path above.
    if (unlikely(target_offset > working_buffer.size()))
        pos = working_buffer.end();
    else
        pos = working_buffer.begin() + target_offset;

    return offset;
}
off_t doSeek(off_t offset, int whence) override;
};

} // namespace DB
140 changes: 131 additions & 9 deletions dbms/src/IO/Checksum/tests/gtest_dm_checksum_buffer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

#pragma GCC diagnostic push
#pragma GCC diagnostic ignored "-Wsign-compare"
#include <gtest/gtest.h>
#pragma GCC diagnostic pop
#include <Common/Logger.h>
#include <IO/BaseFile/PosixRandomAccessFile.h>
#include <IO/BaseFile/PosixWritableFile.h>
#include <IO/BaseFile/RateLimiter.h>
Expand All @@ -28,14 +25,15 @@
#include <Poco/File.h>
#include <Storages/DeltaMerge/DMChecksumConfig.h>
#include <Storages/Page/PageUtil.h>
#include <TestUtils/TiFlashTestBasic.h>
#include <fmt/format.h>

#include <ext/scope_guard.h>
#include <random>

namespace DB
{
namespace tests
namespace DB::tests
{

namespace
{
std::random_device dev; // NOLINT(cert-err58-cpp)
Expand Down Expand Up @@ -372,5 +370,129 @@ TEST_STACKED_SEEKING(CRC32)
TEST_STACKED_SEEKING(CRC64)
TEST_STACKED_SEEKING(City128)
TEST_STACKED_SEEKING(XXH3)
} // namespace tests
} // namespace DB

template <ChecksumAlgo D>
void runCompressedSeekableReaderBufferTest()
try
{
auto log = Logger::get();
// Create a temporary file for testing
const std::string temp_file_path = "/tmp/tiflash_compressed_seek_test.dat";
SCOPE_EXIT({
Poco::File file(temp_file_path);
if (file.exists())
file.remove();
});
// Test data - create multiple blocks with different patterns
std::vector<std::string> test_blocks;

test_blocks = {
std::string(1500, 'A') + "BLOCK0_END",
std::string(800, 'B') + "BLOCK1_END",
"", // Block 2 is empty
"", // Block 3 is empty
};

std::vector<size_t> block_compressed_offsets;
std::vector<size_t> block_decompressed_sizes;

auto [limiter, provider] = prepareIO();
auto config = DM::DMChecksumConfig{{}, TIFLASH_DEFAULT_CHECKSUM_FRAME_SIZE, D};

// Write compressed data to file
{
auto plain_file = ChecksumWriteBufferBuilder::build(
true,
provider,
temp_file_path,
EncryptionPath(temp_file_path, temp_file_path),
false,
limiter->getWriteLimiter(),
config.getChecksumAlgorithm(),
config.getChecksumFrameLength(),
/*flags*/
-1,
/*mode*/ 0666,
1048576);
auto compressed_buf
= CompressedWriteBuffer<>::build(*plain_file, CompressionSettings(CompressionMethod::LZ4), false);

for (const auto & block_data : test_blocks)
{
// Record the compressed file offset before writing this block
block_compressed_offsets.push_back(plain_file->count());
block_decompressed_sizes.push_back(block_data.size());

// Write the block data
compressed_buf->write(block_data.data(), block_data.size());
compressed_buf->next(); // Force compression of this block
}
}

LOG_INFO(log, "Created compressed file with {} blocks", test_blocks.size());
for (size_t i = 0; i < block_compressed_offsets.size(); ++i)
{
LOG_INFO(
log,
"Block {}: compressed_offset={}, decompressed_size={}",
i,
block_compressed_offsets[i],
block_decompressed_sizes[i]);
}


auto compressed_in = CompressedReadBufferFromFileBuilder::build(
provider,
temp_file_path,
EncryptionPath(temp_file_path, temp_file_path),
config.getChecksumFrameLength(),
limiter->getReadLimiter(),
config.getChecksumAlgorithm(),
config.getChecksumFrameLength());

// 1. Check seek + read
for (size_t i = 0; i < test_blocks.size(); ++i)
{
// Seek to the start of each block
LOG_INFO(log, "Seeking to block {} at offset {}", i, block_compressed_offsets[i]);
compressed_in->seek(block_compressed_offsets[i], 0);

// Read the data
std::string read_data;
read_data.resize(block_decompressed_sizes[i]);
compressed_in->readBig(read_data.data(), block_decompressed_sizes[i]);

// Verify the data matches
ASSERT_EQ(read_data, test_blocks[i]) << "Block " << i << " data mismatch";
}

// Seek in inverse order to test seek again
for (size_t i = 0; i < test_blocks.size(); ++i)
{
assert(i + 1 <= test_blocks.size());
const size_t target_block = test_blocks.size() - i - 1;
compressed_in->seek(block_compressed_offsets[target_block], 0);
std::string read_data;
read_data.resize(block_decompressed_sizes[target_block]);
size_t num_read = compressed_in->readBig(read_data.data(), test_blocks[target_block].size());
ASSERT_EQ(num_read, test_blocks[target_block].size());
read_data.resize(num_read);
ASSERT_EQ(read_data, test_blocks[target_block])
<< "Block " << target_block << " data mismatch after seek again";
}
}
CATCH

// Defines a gtest case `DMChecksumBuffer<ALGO>.CompressedSeekable` that runs
// runCompressedSeekableReaderBufferTest with ChecksumAlgo::ALGO.
#define TEST_COMPRESSEDSEEKABLE(ALGO) \
TEST(DMChecksumBuffer##ALGO, CompressedSeekable) \
{ \
runCompressedSeekableReaderBufferTest<ChecksumAlgo::ALGO>(); \
} // NOLINT(cert-err58-cpp)

// One test case per checksum algorithm listed here.
TEST_COMPRESSEDSEEKABLE(None)
TEST_COMPRESSEDSEEKABLE(CRC32)
TEST_COMPRESSEDSEEKABLE(CRC64)
TEST_COMPRESSEDSEEKABLE(City128)
TEST_COMPRESSEDSEEKABLE(XXH3)

} // namespace DB::tests
Loading