diff --git a/dbms/src/Flash/ResourceControl/LocalAdmissionController.cpp b/dbms/src/Flash/ResourceControl/LocalAdmissionController.cpp index 39fb2342d7f..93aa3bd2f23 100644 --- a/dbms/src/Flash/ResourceControl/LocalAdmissionController.cpp +++ b/dbms/src/Flash/ResourceControl/LocalAdmissionController.cpp @@ -140,6 +140,8 @@ double ResourceGroup::getAcquireRUNumWithoutLock(double speed, uint32_t n_sec, d if unlikely (acquire_num == 0.0 && remaining_ru == 0.0) acquire_num = DEFAULT_BUFFER_TOKENS; + // The purpose of subtracting remaining_ru is try to ensure that the number of local tokens + // always stays same with the amount consumed. acquire_num -= remaining_ru; acquire_num = (acquire_num > 0.0 ? acquire_num : 0.0); return acquire_num; @@ -351,11 +353,9 @@ void LocalAdmissionController::mainLoop() static_assert( tick_interval <= ResourceGroup::COMPUTE_RU_CONSUMPTION_SPEED_INTERVAL && tick_interval <= DEGRADE_MODE_DURATION && tick_interval <= DEFAULT_TARGET_PERIOD); - auto cur_tick_beg = current_tick; - auto cur_tick_end = cur_tick_beg + tick_interval; + auto cur_tick_end = current_tick + tick_interval; while (!stopped.load()) { - if (current_tick < cur_tick_end) { std::unique_lock lock(mu); if (low_token_resource_groups.empty()) @@ -375,19 +375,16 @@ void LocalAdmissionController::mainLoop() try { while (current_tick >= cur_tick_end) - { - updateRUConsumptionSpeed(); - cur_tick_beg = cur_tick_end; cur_tick_end += tick_interval; - } + updateRUConsumptionSpeed(); if (const auto gac_req_opt = buildGACRequest(/*is_final_report=*/false); gac_req_opt.has_value()) { std::lock_guard lock(gac_requests_mu); gac_requests.push_back(gac_req_opt.value()); gac_requests_cv.notify_all(); } - clearCPUTimeWithoutLock(current_tick); + clearCPUTime(current_tick); checkDegradeMode(); } catch (...) @@ -423,13 +420,16 @@ std::optional LocalAdmissionController::b else { std::unordered_set local_low_token_resource_groups; + std::unordered_map local_resource_groups; { std::lock_guard lock(mu); local_low_token_resource_groups = low_token_resource_groups; low_token_resource_groups.clear(); + + local_resource_groups = resource_groups; } - for (const auto & iter : resource_groups) + for (const auto & iter : local_resource_groups) { const auto rg_name = iter.first; const bool need_fetch_token = local_low_token_resource_groups.contains(rg_name); @@ -621,7 +621,7 @@ std::vector LocalAdmissionController::handleTokenBucketsResp( const String err_msg = fmt::format("handle acquire token resp failed: rg: {}", name); // It's possible for one_resp.granted_r_u_tokens() to be empty - // when the acquire_token_req is only for report RU consumption. + // when the acquire_token_req is only for report RU consumption or GAC got error(like nan token). if (one_resp.granted_r_u_tokens().empty()) { resource_group->endRequest(); @@ -630,6 +630,7 @@ std::vector LocalAdmissionController::handleTokenBucketsResp( if unlikely (one_resp.granted_r_u_tokens().size() != 1) { + resource_group->endRequest(); LOG_ERROR( log, "{} unexpected resp.granted_r_u_tokens().size(): {} one_resp: {}", @@ -642,6 +643,7 @@ std::vector LocalAdmissionController::handleTokenBucketsResp( const resource_manager::GrantedRUTokenBucket & granted_token_bucket = one_resp.granted_r_u_tokens()[0]; if unlikely (granted_token_bucket.type() != resource_manager::RequestUnitType::RU) { + resource_group->endRequest(); LOG_ERROR(log, "{} unexpected request type, one_resp: {}", err_msg, one_resp.ShortDebugString()); continue; } @@ -649,6 +651,7 @@ std::vector LocalAdmissionController::handleTokenBucketsResp( const auto trickle_ms = granted_token_bucket.trickle_time_ms(); if unlikely (trickle_ms < 0) { + resource_group->endRequest(); LOG_ERROR( log, "{} unexpected trickle_ms: {} one_resp: {}", @@ -663,6 +666,7 @@ std::vector LocalAdmissionController::handleTokenBucketsResp( double added_tokens = granted_token_bucket.granted_tokens().tokens(); if unlikely (!std::isfinite(added_tokens) || added_tokens < 0.0) { + resource_group->endRequest(); LOG_ERROR( log, "{} invalid added_tokens: {} one_resp: {}", @@ -833,7 +837,7 @@ bool LocalAdmissionController::handleDeleteEvent(const mvccpb::KeyValue & kv, st updateMaxRUPerSecAfterDeleteWithoutLock(deleted_user_ru_per_sec); } } - LOG_DEBUG(log, "delete resource group {}, erase_num: {}", name, erase_num); + LOG_INFO(log, "delete resource group {}, erase_num: {}", name, erase_num); return true; } @@ -868,7 +872,7 @@ bool LocalAdmissionController::handlePutEvent(const mvccpb::KeyValue & kv, std:: updateMaxRUPerSecAfterDeleteWithoutLock(deleted_user_ru_per_sec); } } - LOG_DEBUG(log, "modify resource group to: {}", group_pb.ShortDebugString()); + LOG_INFO(log, "modify resource group to: {}", group_pb.ShortDebugString()); return true; } diff --git a/dbms/src/Flash/ResourceControl/LocalAdmissionController.h b/dbms/src/Flash/ResourceControl/LocalAdmissionController.h index 097062cd3e7..965acb42764 100644 --- a/dbms/src/Flash/ResourceControl/LocalAdmissionController.h +++ b/dbms/src/Flash/ResourceControl/LocalAdmissionController.h @@ -538,9 +538,10 @@ class LocalAdmissionController final : private boost::noncopyable std::string & err_msg); void updateMaxRUPerSecAfterDeleteWithoutLock(uint64_t deleted_user_ru_per_sec); - void clearCPUTimeWithoutLock(const SteadyClock::time_point & now) + void clearCPUTime(const SteadyClock::time_point & now) { static_assert(CLEAR_CPU_TIME_DURATION > ResourceGroup::COMPUTE_RU_CONSUMPTION_SPEED_INTERVAL); + std::lock_guard lock(mu); if (now - last_clear_cpu_time >= CLEAR_CPU_TIME_DURATION) { for (auto & resource_group : resource_groups)