diff --git a/dbms/src/Flash/Coprocessor/GenSchemaAndColumn.cpp b/dbms/src/Flash/Coprocessor/GenSchemaAndColumn.cpp
index 9502a7d6ebf..86844cb7710 100644
--- a/dbms/src/Flash/Coprocessor/GenSchemaAndColumn.cpp
+++ b/dbms/src/Flash/Coprocessor/GenSchemaAndColumn.cpp
@@ -12,6 +12,7 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
+#include <DataStreams/GeneratedColumnPlaceholderBlockInputStream.h>
 #include
 #include
 #include
@@ -94,14 +95,23 @@ NamesAndTypes genNamesAndTypes(const TiDBTableScan & table_scan, const StringRef
     return genNamesAndTypes(table_scan.getColumns(), column_prefix);
 }
 
-std::tuple<DM::ColumnDefinesPtr, int> genColumnDefinesForDisaggregatedRead(const TiDBTableScan & table_scan)
+std::tuple<DM::ColumnDefinesPtr, int, std::vector<std::tuple<UInt64, String, DataTypePtr>>> genColumnDefinesForDisaggregatedRead(
+    const TiDBTableScan & table_scan)
 {
     auto column_defines = std::make_shared<DM::ColumnDefines>();
     int extra_table_id_index = MutSup::invalid_col_id;
     column_defines->reserve(table_scan.getColumnSize());
+    std::vector<std::tuple<UInt64, String, DataTypePtr>> generated_column_infos;
     for (Int32 i = 0; i < table_scan.getColumnSize(); ++i)
     {
         const auto & column_info = table_scan.getColumns()[i];
+        if (column_info.hasGeneratedColumnFlag())
+        {
+            const auto & data_type = getDataTypeByColumnInfoForComputingLayer(column_info);
+            const auto & col_name = GeneratedColumnPlaceholderBlockInputStream::getColumnName(i);
+            generated_column_infos.push_back(std::make_tuple(i, col_name, data_type));
+            continue;
+        }
         // Now the upper level seems to treat disagg read as an ExchangeReceiver output, so
         // use this as output column prefix.
         // Even if the id is pk_column or extra_table_id, we still output it as
@@ -117,10 +127,6 @@ std::tuple<DM::ColumnDefinesPtr, int> genColumnDefinesForDisaggregatedRead(const
             break;
         case MutSup::extra_table_id_col_id:
         {
-            column_defines->emplace_back(DM::ColumnDefine{
-                MutSup::extra_table_id_col_id,
-                output_name, // MutSup::extra_table_id_column_name
-                MutSup::getExtraTableIdColumnType()});
             extra_table_id_index = i;
             break;
         }
@@ -133,7 +139,7 @@ std::tuple<DM::ColumnDefinesPtr, int> genColumnDefinesForDisaggregatedRead(const
             break;
         }
     }
-    return {std::move(column_defines), extra_table_id_index};
+    return {std::move(column_defines), extra_table_id_index, std::move(generated_column_infos)};
 }
 
 ColumnsWithTypeAndName getColumnWithTypeAndName(const NamesAndTypes & names_and_types)
diff --git a/dbms/src/Flash/Coprocessor/GenSchemaAndColumn.h b/dbms/src/Flash/Coprocessor/GenSchemaAndColumn.h
index 4b6281e8a77..1eb87891ba9 100644
--- a/dbms/src/Flash/Coprocessor/GenSchemaAndColumn.h
+++ b/dbms/src/Flash/Coprocessor/GenSchemaAndColumn.h
@@ -34,7 +34,8 @@ NamesAndTypes genNamesAndTypes(const TiDB::ColumnInfos & column_infos, const Str
 ColumnsWithTypeAndName getColumnWithTypeAndName(const NamesAndTypes & names_and_types);
 NamesAndTypes toNamesAndTypes(const DAGSchema & dag_schema);
 
-// The column defines and `extra table id index`
-std::tuple<DM::ColumnDefinesPtr, int> genColumnDefinesForDisaggregatedRead(const TiDBTableScan & table_scan);
+// The column defines, `extra table id index` and `generated column infos` for disaggregated read.
+std::tuple<DM::ColumnDefinesPtr, int, std::vector<std::tuple<UInt64, String, DataTypePtr>>> genColumnDefinesForDisaggregatedRead(
+    const TiDBTableScan & table_scan);
 } // namespace DB
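Context for the change above: `genColumnDefinesForDisaggregatedRead` now skips generated columns when building the schema for the storage read, and instead records `(offset, placeholder name, data type)` triples so a later step can re-insert placeholder columns at the right positions. A minimal standalone sketch of that bookkeeping in plain C++; the types and names here are invented stand-ins, not TiFlash's actual API:

```cpp
// Standalone sketch (invented types, not TiFlash's API): generated columns are
// skipped by the storage read and re-inserted as placeholders at their
// original schema offsets afterwards.
#include <cstddef>
#include <cstdint>
#include <iostream>
#include <string>
#include <tuple>
#include <vector>

// (offset in the table scan schema, placeholder column name, type name)
using GeneratedColumnInfo = std::tuple<uint64_t, std::string, std::string>;

std::vector<std::string> insertPlaceholders(
    std::vector<std::string> block_columns, // column names the storage read produced
    const std::vector<GeneratedColumnInfo> & infos) // must be sorted by offset ascending
{
    // Inserting in ascending offset order keeps every later offset valid,
    // because all columns that precede it are already in place.
    for (const auto & [offset, name, type] : infos)
        block_columns.insert(
            block_columns.begin() + static_cast<std::ptrdiff_t>(offset),
            name + " <placeholder:" + type + ">");
    return block_columns;
}

int main()
{
    // Scan schema: c0, c1 (generated), c2 -> the storage read returns only c0 and c2.
    const auto columns = insertPlaceholders({"c0", "c2"}, {{1, "generated_1", "Int64"}});
    for (const auto & col : columns)
        std::cout << col << '\n'; // c0, generated_1 <placeholder:Int64>, c2
}
```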
diff --git a/dbms/src/Storages/StorageDisaggregated.h b/dbms/src/Storages/StorageDisaggregated.h
index 70ffbe65925..27e6208f520 100644
--- a/dbms/src/Storages/StorageDisaggregated.h
+++ b/dbms/src/Storages/StorageDisaggregated.h
@@ -108,7 +108,7 @@ class StorageDisaggregated : public IStorage
     std::tuple<DM::RSOperatorPtr, DM::ColumnRangePtr> buildRSOperatorAndColumnRange(
         const Context & db_context,
         const DM::ColumnDefinesPtr & columns_to_read);
-    std::variant<DM::SegmentReadTaskPoolPtr, DM::Remote::RNWorkersPtr> packSegmentReadTasks(
+    std::tuple<std::variant<DM::SegmentReadTaskPoolPtr, DM::Remote::RNWorkersPtr>, DM::ColumnDefinesPtr> packSegmentReadTasks(
         const Context & db_context,
         DM::SegmentReadTasks && read_tasks,
         const DM::ColumnDefinesPtr & column_defines,
@@ -161,5 +161,7 @@ class StorageDisaggregated : public IStorage
     std::unique_ptr<DAGExpressionAnalyzer> analyzer;
     static constexpr auto ZONE_LABEL_KEY = "zone";
     std::optional<String> zone_label;
+    // For generated columns we only need a placeholder column here; TiDB will fill in the actual values.
+    std::vector<std::tuple<UInt64, String, DataTypePtr>> generated_column_infos;
 };
 } // namespace DB
diff --git a/dbms/src/Storages/StorageDisaggregatedRemote.cpp b/dbms/src/Storages/StorageDisaggregatedRemote.cpp
index 77dc3ccaa88..0f870a83813 100644
--- a/dbms/src/Storages/StorageDisaggregatedRemote.cpp
+++ b/dbms/src/Storages/StorageDisaggregatedRemote.cpp
@@ -115,6 +115,8 @@ BlockInputStreams StorageDisaggregated::readThroughS3(const Context & db_context
         num_streams,
         pipeline,
         scan_context);
+    // Handle generated columns if necessary.
+    executeGeneratedColumnPlaceholder(generated_column_infos, log, pipeline);
 
     NamesAndTypes source_columns;
     source_columns.reserve(table_scan.getColumnSize());
@@ -149,6 +151,8 @@ void StorageDisaggregated::readThroughS3(
         buildReadTaskWithBackoff(db_context, scan_context),
         num_streams,
         scan_context);
+    // Handle generated columns if necessary.
+    executeGeneratedColumnPlaceholder(exec_context, group_builder, generated_column_infos, log);
 
     NamesAndTypes source_columns;
     auto header = group_builder.getCurrentHeader();
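The `packSegmentReadTasks` signature change above bundles two outputs that used to travel separately: which of the two read-pool implementations was chosen (a `std::variant`) and which column defines the read will actually produce. A rough standalone sketch of that shape in plain C++, with invented stand-in types; this is not the TiFlash implementation:

```cpp
// Standalone sketch (invented types): return both the chosen pool variant and
// the column defines the read will actually produce, e.g. after a pushed-down cast.
#include <iostream>
#include <memory>
#include <string>
#include <tuple>
#include <variant>
#include <vector>

struct ReadThreadPool {}; // stand-in for DM::SegmentReadTaskPool
struct WorkerPool {};     // stand-in for DM::Remote::RNWorkers
using ColumnDefines = std::vector<std::string>;
using ColumnDefinesPtr = std::shared_ptr<ColumnDefines>;
using PackedTasks = std::variant<std::shared_ptr<ReadThreadPool>, std::shared_ptr<WorkerPool>>;

std::tuple<PackedTasks, ColumnDefinesPtr> pack(
    bool enable_read_thread,
    const ColumnDefinesPtr & base_defines,
    const ColumnDefinesPtr & defines_after_cast) // null when no cast was pushed down
{
    // Mirror of the `final_columns_defines` selection: prefer the post-cast schema.
    const auto & final_defines = defines_after_cast ? defines_after_cast : base_defines;
    if (enable_read_thread)
        return {std::make_shared<ReadThreadPool>(), final_defines};
    return {std::make_shared<WorkerPool>(), final_defines};
}

int main()
{
    auto base = std::make_shared<ColumnDefines>(ColumnDefines{"a", "b"});
    auto casted = std::make_shared<ColumnDefines>(ColumnDefines{"cast(a)", "b"});
    auto [tasks, defines] = pack(true, base, casted);
    std::cout << std::holds_alternative<std::shared_ptr<ReadThreadPool>>(tasks)
              << ' ' << defines->front() << '\n'; // 1 cast(a)
}
```

Returning the defines alongside the pool spares the callers from re-deriving the post-cast schema; that is what the `final_column_defines` binding in the later hunks consumes.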
@@ -609,13 +613,14 @@ std::tuple<DM::RSOperatorPtr, DM::ColumnRangePtr> StorageDisaggregated::buildRSO
     return {rs_operator, column_range};
 }
 
-std::variant<DM::SegmentReadTaskPoolPtr, DM::Remote::RNWorkersPtr> StorageDisaggregated::packSegmentReadTasks(
-    const Context & db_context,
-    DM::SegmentReadTasks && read_tasks,
-    const DM::ColumnDefinesPtr & column_defines,
-    const DM::ScanContextPtr & scan_context,
-    size_t num_streams,
-    int extra_table_id_index)
+std::tuple<std::variant<DM::SegmentReadTaskPoolPtr, DM::Remote::RNWorkersPtr>, DM::ColumnDefinesPtr> StorageDisaggregated::
+    packSegmentReadTasks(
+        const Context & db_context,
+        DM::SegmentReadTasks && read_tasks,
+        const DM::ColumnDefinesPtr & column_defines,
+        const DM::ScanContextPtr & scan_context,
+        size_t num_streams,
+        int extra_table_id_index)
 {
     const auto & executor_id = table_scan.getTableScanExecutorID();
 
@@ -651,53 +656,61 @@ std::variant<DM::SegmentReadTaskPoolPtr, DM::Remote::RNWorkersPtr> StorageDisagg
     scan_context->read_mode = read_mode;
     const UInt64 start_ts = sender_target_mpp_task_id.gather_id.query_id.start_ts;
     const auto enable_read_thread = db_context.getSettingsRef().dt_enable_read_thread;
+    const auto & final_columns_defines = push_down_executor && push_down_executor->extra_cast
+        ? push_down_executor->columns_after_cast
+        : column_defines;
     RUNTIME_CHECK(num_streams > 0, num_streams);
     LOG_INFO(
         log,
         "packSegmentReadTasks: enable_read_thread={} read_mode={} is_fast_scan={} keep_order={} task_count={} "
-        "num_streams={} column_defines={}",
+        "num_streams={} column_defines={} final_columns_defines={}",
         enable_read_thread,
         magic_enum::enum_name(read_mode),
         table_scan.isFastScan(),
         table_scan.keepOrder(),
         read_tasks.size(),
         num_streams,
-        *column_defines);
+        *column_defines,
+        *final_columns_defines);
 
     if (enable_read_thread)
     {
         // Under disagg arch, now we use blocking IO to read data from cloud storage. So it requires more active
         // segments to fully utilize the read threads.
        const size_t read_thread_num_active_seg = 10 * num_streams;
-        return std::make_shared<DM::SegmentReadTaskPool>(
-            extra_table_id_index,
-            *column_defines,
-            push_down_executor,
-            start_ts,
-            db_context.getSettingsRef().max_block_size,
-            read_mode,
-            std::move(read_tasks),
-            /*after_segment_read*/ [](const DM::DMContextPtr &, const DM::SegmentPtr &) {},
-            log->identifier(),
-            /*enable_read_thread*/ true,
-            num_streams,
-            read_thread_num_active_seg,
-            context.getDAGContext()->getKeyspaceID(),
-            context.getDAGContext()->getResourceGroupName());
+        return {
+            std::make_shared<DM::SegmentReadTaskPool>(
+                extra_table_id_index,
+                *final_columns_defines,
+                push_down_executor,
+                start_ts,
+                db_context.getSettingsRef().max_block_size,
+                read_mode,
+                std::move(read_tasks),
+                /*after_segment_read*/ [](const DM::DMContextPtr &, const DM::SegmentPtr &) {},
+                log->identifier(),
+                /*enable_read_thread*/ true,
+                num_streams,
+                read_thread_num_active_seg,
+                context.getDAGContext()->getKeyspaceID(),
+                context.getDAGContext()->getResourceGroupName()),
+            final_columns_defines};
     }
     else
     {
-        return DM::Remote::RNWorkers::create(
-            db_context,
-            std::move(read_tasks),
-            {
-                .log = log->getChild(executor_id),
-                .columns_to_read = column_defines,
-                .start_ts = start_ts,
-                .push_down_executor = push_down_executor,
-                .read_mode = read_mode,
-            },
-            num_streams);
+        return {
+            DM::Remote::RNWorkers::create(
+                db_context,
+                std::move(read_tasks),
+                {
+                    .log = log->getChild(executor_id),
+                    .columns_to_read = final_columns_defines,
+                    .start_ts = start_ts,
+                    .push_down_executor = push_down_executor,
+                    .read_mode = read_mode,
+                },
+                num_streams),
+            final_columns_defines};
     }
 }
 
@@ -739,8 +752,11 @@ void StorageDisaggregated::buildRemoteSegmentInputStreams(
     const DM::ScanContextPtr & scan_context)
 {
     // Build the input streams to read blocks from remote segments
-    auto [column_defines, extra_table_id_index] = genColumnDefinesForDisaggregatedRead(table_scan);
-    auto packed_read_tasks = packSegmentReadTasks(
+    DM::ColumnDefinesPtr column_defines;
+    int extra_table_id_index;
+    std::tie(column_defines, extra_table_id_index, generated_column_infos)
+        = genColumnDefinesForDisaggregatedRead(table_scan);
+    auto [packed_read_tasks, final_column_defines] = packSegmentReadTasks(
         db_context,
         std::move(read_tasks),
         column_defines,
@@ -751,7 +767,7 @@ void StorageDisaggregated::buildRemoteSegmentInputStreams(
 
     InputStreamBuilder builder{
         .tracing_id = log->identifier(),
-        .columns_to_read = column_defines,
+        .columns_to_read = final_column_defines,
         .extra_table_id_index = extra_table_id_index,
     };
     for (size_t stream_idx = 0; stream_idx < num_streams; ++stream_idx)
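One detail worth calling out in `buildRemoteSegmentInputStreams` above: the switch from structured bindings to `std::tie` is forced, because structured bindings always introduce new local names and therefore cannot assign into the existing `generated_column_infos` member. A minimal illustration in plain C++ with invented names:

```cpp
// Standalone illustration (invented names): why the diff declares explicit
// locals and uses std::tie instead of structured bindings.
#include <string>
#include <tuple>
#include <vector>

std::tuple<int, std::string, std::vector<int>> produce()
{
    return {7, "extra", {1, 2, 3}};
}

struct Storage
{
    std::vector<int> generated_column_infos; // existing member we must assign into

    void build()
    {
        // auto [a, b, generated_column_infos] = produce();
        // ^ would declare a brand-new local, leaving the member untouched.
        int a = 0;
        std::string b;
        std::tie(a, b, generated_column_infos) = produce(); // assigns the member
    }
};

int main()
{
    Storage s;
    s.build();
    return s.generated_column_infos.size() == 3 ? 0 : 1;
}
```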
@@ -810,8 +826,11 @@ void StorageDisaggregated::buildRemoteSegmentSourceOps(
     const DM::ScanContextPtr & scan_context)
 {
     // Build the input streams to read blocks from remote segments
-    auto [column_defines, extra_table_id_index] = genColumnDefinesForDisaggregatedRead(table_scan);
-    auto packed_read_tasks = packSegmentReadTasks(
+    DM::ColumnDefinesPtr column_defines;
+    int extra_table_id_index;
+    std::tie(column_defines, extra_table_id_index, generated_column_infos)
+        = genColumnDefinesForDisaggregatedRead(table_scan);
+    auto [packed_read_tasks, final_column_defines] = packSegmentReadTasks(
         db_context,
         std::move(read_tasks),
         column_defines,
@@ -821,7 +840,7 @@ void StorageDisaggregated::buildRemoteSegmentSourceOps(
 
     SourceOpBuilder builder{
         .tracing_id = log->identifier(),
-        .column_defines = column_defines,
+        .column_defines = final_column_defines,
         .extra_table_id_index = extra_table_id_index,
         .exec_context = exec_context,
     };
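Downstream of `packSegmentReadTasks`, the stream and source-op builders receive `packed_read_tasks` as a `std::variant` and presumably branch on whichever alternative is populated; `std::visit` is the generic pattern for that. A sketch with invented stand-in types, not the actual builder code:

```cpp
// Standalone sketch (invented types): dispatch on whichever pool alternative
// the packing step returned.
#include <iostream>
#include <memory>
#include <type_traits>
#include <variant>

struct ReadThreadPool {}; // stand-in for DM::SegmentReadTaskPool
struct WorkerPool {};     // stand-in for DM::Remote::RNWorkers
using PackedTasks = std::variant<std::shared_ptr<ReadThreadPool>, std::shared_ptr<WorkerPool>>;

void buildStreams(const PackedTasks & tasks)
{
    std::visit(
        [](const auto & pool)
        {
            using T = std::decay_t<decltype(pool)>;
            if constexpr (std::is_same_v<T, std::shared_ptr<ReadThreadPool>>)
                std::cout << "wire streams to the read-thread pool\n";
            else
                std::cout << "wire streams to the RN workers pool\n";
        },
        tasks);
}

int main()
{
    buildStreams(PackedTasks{std::make_shared<WorkerPool>()});
}
```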
diff --git a/tests/docker/next-gen-config/tidb.toml b/tests/docker/next-gen-config/tidb.toml
index 4c1c7e6c764..40cc6e9e68a 100644
--- a/tests/docker/next-gen-config/tidb.toml
+++ b/tests/docker/next-gen-config/tidb.toml
@@ -19,6 +19,8 @@ disaggregated-tiflash = true
 # Now tests are run on the SYSTEM keyspace tidb.
 keyspace-name = "SYSTEM"
 
+tikv-worker-url = "tikv-worker0:19000"
+
 enable-telemetry = false
 temp-dir = "/data/tmp"
 [performance]
diff --git a/tests/docker/next-gen-config/tikv-worker.toml b/tests/docker/next-gen-config/tikv-worker.toml
new file mode 100644
index 00000000000..bb2a52ba88f
--- /dev/null
+++ b/tests/docker/next-gen-config/tikv-worker.toml
@@ -0,0 +1,28 @@
+# Copyright 2025 PingCAP, Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+# object storage for next-gen
+[dfs]
+prefix = "tikv"
+s3-endpoint = "http://minio0:9000"
+s3-key-id = "minioadmin"
+s3-secret-key = "minioadmin"
+s3-bucket = "tiflash-test"
+s3-region = "local"
+
+[schema-manager]
+dir = "/data/schemas"
+enabled = true
+keyspace-refresh-interval = "10s"
+schema-refresh-threshold = 1
diff --git a/tests/docker/next-gen-config/tikv.toml b/tests/docker/next-gen-config/tikv.toml
index 0e0fe67baee..d00976dc9ca 100644
--- a/tests/docker/next-gen-config/tikv.toml
+++ b/tests/docker/next-gen-config/tikv.toml
@@ -18,6 +18,7 @@ reserve-space = "0"
 # Enable keyspace and ttl for next-gen
 api-version = 2
 enable-ttl = true
+low-space-threshold = 0
 
 [raftstore]
 capacity = "100GB"
diff --git a/tests/docker/next-gen-yaml/cluster.yaml b/tests/docker/next-gen-yaml/cluster.yaml
index 28bd63f2748..c0acdf2418c 100644
--- a/tests/docker/next-gen-yaml/cluster.yaml
+++ b/tests/docker/next-gen-yaml/cluster.yaml
@@ -49,6 +49,19 @@ services:
       - "pd0"
       - "minio0"
     restart: on-failure
+  tikv-worker0:
+    image: ${TIKV_IMAGE:-us-docker.pkg.dev/pingcap-testing-account/hub/tikv/tikv/image:dedicated-next-gen}
+    security_opt:
+      - seccomp:unconfined
+    volumes:
+      - ./next-gen-config/tikv-worker.toml:/tikv-worker.toml:ro
+      - ./data/tikv-worker0:/data
+      - ./log/tikv-worker0:/log
+    entrypoint: /tikv-worker
+    command: --addr=0.0.0.0:19000 --pd-endpoints=pd0:2379 --config=/tikv-worker.toml --data-dir=/data --log-file=/log/tikv-worker.log
+    depends_on:
+      - "pd0"
+    restart: on-failure
   tidb0:
     image: ${TIDB_IMAGE:-us-docker.pkg.dev/pingcap-testing-account/hub/pingcap/tidb/images/tidb-server:master-next-gen}
    security_opt:
diff --git a/tests/fullstack-test-next-gen/run.sh b/tests/fullstack-test-next-gen/run.sh
index d7192820896..5f828cf4b66 100755
--- a/tests/fullstack-test-next-gen/run.sh
+++ b/tests/fullstack-test-next-gen/run.sh
@@ -69,15 +69,15 @@ if [[ -n "$ENABLE_NEXT_GEN" && "$ENABLE_NEXT_GEN" != "false" && "$ENABLE_NEXT_GE
     ENV_ARGS="ENABLE_NEXT_GEN=true verbose=${verbose} "
     # most failpoints are expected to be set on the compute layer, use tiflash-cn0 to run tests
     ${COMPOSE} -f next-gen-cluster.yaml -f "${DISAGG_TIFLASH_YAML}" exec -T tiflash-cn0 bash -c "cd /tests ; ${ENV_ARGS} ./run-test.sh fullstack-test/sample.test"
-    ${COMPOSE} -f next-gen-cluster.yaml -f "${DISAGG_TIFLASH_YAML}" exec -T tiflash-cn0 bash -c "cd /tests ; ${ENV_ARGS} ./run-test.sh fullstack-test-index/vector"
+    ${COMPOSE} -f next-gen-cluster.yaml -f "${DISAGG_TIFLASH_YAML}" exec -T tiflash-cn0 bash -c "cd /tests ; ${ENV_ARGS} ./run-test.sh fullstack-test-index"
     ${COMPOSE} -f next-gen-cluster.yaml -f "${DISAGG_TIFLASH_YAML}" exec -T tiflash-cn0 bash -c "cd /tests ; ${ENV_ARGS} ./run-test.sh fullstack-test-next-gen/placement"
     ${COMPOSE} -f next-gen-cluster.yaml -f "${DISAGG_TIFLASH_YAML}" exec -T tiflash-cn0 bash -c "cd /tests ; ${ENV_ARGS} ./run-test.sh fullstack-test2/clustered_index"
     ${COMPOSE} -f next-gen-cluster.yaml -f "${DISAGG_TIFLASH_YAML}" exec -T tiflash-cn0 bash -c "cd /tests ; ${ENV_ARGS} ./run-test.sh fullstack-test2/dml"
     ${COMPOSE} -f next-gen-cluster.yaml -f "${DISAGG_TIFLASH_YAML}" exec -T tiflash-cn0 bash -c "cd /tests ; ${ENV_ARGS} ./run-test.sh fullstack-test2/variables"
     ${COMPOSE} -f next-gen-cluster.yaml -f "${DISAGG_TIFLASH_YAML}" exec -T tiflash-cn0 bash -c "cd /tests ; ${ENV_ARGS} ./run-test.sh fullstack-test2/mpp"
-    # TODO: enable the following tests after they are fixed. And maybe we need to split them into parallel pipelines because they take too long to run.
-    #${COMPOSE} -f next-gen-cluster.yaml -f "${DISAGG_TIFLASH_YAML}" exec -T tiflash-cn0 bash -c "cd /tests ; ${ENV_ARGS} ./run-test.sh fullstack-test/expr"
-    #${COMPOSE} -f next-gen-cluster.yaml -f "${DISAGG_TIFLASH_YAML}" exec -T tiflash-cn0 bash -c "cd /tests ; ${ENV_ARGS} ./run-test.sh fullstack-test/mpp"
+    # These tests take a long time to run; maybe we need to split them into parallel pipelines.
+    ${COMPOSE} -f next-gen-cluster.yaml -f "${DISAGG_TIFLASH_YAML}" exec -T tiflash-cn0 bash -c "cd /tests ; ${ENV_ARGS} ./run-test.sh fullstack-test/expr"
+    ${COMPOSE} -f next-gen-cluster.yaml -f "${DISAGG_TIFLASH_YAML}" exec -T tiflash-cn0 bash -c "cd /tests ; ${ENV_ARGS} ./run-test.sh fullstack-test/mpp"
 
     ${COMPOSE} -f next-gen-cluster.yaml -f "${DISAGG_TIFLASH_YAML}" down
     clean_data_log
diff --git a/tests/fullstack-test/mpp/extra_physical_table_column.test b/tests/fullstack-test/mpp/extra_physical_table_column.test
index 2fe6af2d1f0..098f95c7d4e 100644
--- a/tests/fullstack-test/mpp/extra_physical_table_column.test
+++ b/tests/fullstack-test/mpp/extra_physical_table_column.test
@@ -12,9 +12,6 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-# line 27: Pessimistic lock response corrupted
-#SKIP_FOR_NEXT_GEN
-
 # Preparation.
 => DBGInvoke __init_fail_point()