Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 16 additions & 12 deletions dbms/src/Flash/ResourceControl/LocalAdmissionController.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,8 @@ double ResourceGroup::getAcquireRUNumWithoutLock(double speed, uint32_t n_sec, d
if unlikely (acquire_num == 0.0 && remaining_ru == 0.0)
acquire_num = DEFAULT_BUFFER_TOKENS;

// The purpose of subtracting remaining_ru is try to ensure that the number of local tokens
// always stays same with the amount consumed.
acquire_num -= remaining_ru;
acquire_num = (acquire_num > 0.0 ? acquire_num : 0.0);
return acquire_num;
Expand Down Expand Up @@ -351,11 +353,9 @@ void LocalAdmissionController::mainLoop()
static_assert(
tick_interval <= ResourceGroup::COMPUTE_RU_CONSUMPTION_SPEED_INTERVAL && tick_interval <= DEGRADE_MODE_DURATION
&& tick_interval <= DEFAULT_TARGET_PERIOD);
auto cur_tick_beg = current_tick;
auto cur_tick_end = cur_tick_beg + tick_interval;
auto cur_tick_end = current_tick + tick_interval;
while (!stopped.load())
{
if (current_tick < cur_tick_end)
{
std::unique_lock<std::mutex> lock(mu);
if (low_token_resource_groups.empty())
Expand All @@ -375,19 +375,16 @@ void LocalAdmissionController::mainLoop()
try
{
while (current_tick >= cur_tick_end)
{
updateRUConsumptionSpeed();
cur_tick_beg = cur_tick_end;
cur_tick_end += tick_interval;
}

updateRUConsumptionSpeed();
if (const auto gac_req_opt = buildGACRequest(/*is_final_report=*/false); gac_req_opt.has_value())
{
std::lock_guard lock(gac_requests_mu);
gac_requests.push_back(gac_req_opt.value());
gac_requests_cv.notify_all();
}
clearCPUTimeWithoutLock(current_tick);
clearCPUTime(current_tick);
checkDegradeMode();
}
catch (...)
Expand Down Expand Up @@ -423,13 +420,16 @@ std::optional<resource_manager::TokenBucketsRequest> LocalAdmissionController::b
else
{
std::unordered_set<std::string> local_low_token_resource_groups;
std::unordered_map<std::string, ResourceGroupPtr> local_resource_groups;
{
std::lock_guard lock(mu);
local_low_token_resource_groups = low_token_resource_groups;
low_token_resource_groups.clear();

local_resource_groups = resource_groups;
}

for (const auto & iter : resource_groups)
for (const auto & iter : local_resource_groups)
{
const auto rg_name = iter.first;
const bool need_fetch_token = local_low_token_resource_groups.contains(rg_name);
Expand Down Expand Up @@ -621,7 +621,7 @@ std::vector<std::string> LocalAdmissionController::handleTokenBucketsResp(

const String err_msg = fmt::format("handle acquire token resp failed: rg: {}", name);
// It's possible for one_resp.granted_r_u_tokens() to be empty
// when the acquire_token_req is only for report RU consumption.
// when the acquire_token_req is only for report RU consumption or GAC got error(like nan token).
if (one_resp.granted_r_u_tokens().empty())
{
resource_group->endRequest();
Expand All @@ -630,6 +630,7 @@ std::vector<std::string> LocalAdmissionController::handleTokenBucketsResp(

if unlikely (one_resp.granted_r_u_tokens().size() != 1)
{
resource_group->endRequest();
LOG_ERROR(
log,
"{} unexpected resp.granted_r_u_tokens().size(): {} one_resp: {}",
Expand All @@ -642,13 +643,15 @@ std::vector<std::string> LocalAdmissionController::handleTokenBucketsResp(
const resource_manager::GrantedRUTokenBucket & granted_token_bucket = one_resp.granted_r_u_tokens()[0];
if unlikely (granted_token_bucket.type() != resource_manager::RequestUnitType::RU)
{
resource_group->endRequest();
LOG_ERROR(log, "{} unexpected request type, one_resp: {}", err_msg, one_resp.ShortDebugString());
continue;
}

const auto trickle_ms = granted_token_bucket.trickle_time_ms();
if unlikely (trickle_ms < 0)
{
resource_group->endRequest();
LOG_ERROR(
log,
"{} unexpected trickle_ms: {} one_resp: {}",
Expand All @@ -663,6 +666,7 @@ std::vector<std::string> LocalAdmissionController::handleTokenBucketsResp(
double added_tokens = granted_token_bucket.granted_tokens().tokens();
if unlikely (!std::isfinite(added_tokens) || added_tokens < 0.0)
{
resource_group->endRequest();
LOG_ERROR(
log,
"{} invalid added_tokens: {} one_resp: {}",
Expand Down Expand Up @@ -833,7 +837,7 @@ bool LocalAdmissionController::handleDeleteEvent(const mvccpb::KeyValue & kv, st
updateMaxRUPerSecAfterDeleteWithoutLock(deleted_user_ru_per_sec);
}
}
LOG_DEBUG(log, "delete resource group {}, erase_num: {}", name, erase_num);
LOG_INFO(log, "delete resource group {}, erase_num: {}", name, erase_num);
return true;
}

Expand Down Expand Up @@ -868,7 +872,7 @@ bool LocalAdmissionController::handlePutEvent(const mvccpb::KeyValue & kv, std::
updateMaxRUPerSecAfterDeleteWithoutLock(deleted_user_ru_per_sec);
}
}
LOG_DEBUG(log, "modify resource group to: {}", group_pb.ShortDebugString());
LOG_INFO(log, "modify resource group to: {}", group_pb.ShortDebugString());
return true;
}

Expand Down
3 changes: 2 additions & 1 deletion dbms/src/Flash/ResourceControl/LocalAdmissionController.h
Original file line number Diff line number Diff line change
Expand Up @@ -538,9 +538,10 @@ class LocalAdmissionController final : private boost::noncopyable
std::string & err_msg);
void updateMaxRUPerSecAfterDeleteWithoutLock(uint64_t deleted_user_ru_per_sec);

void clearCPUTimeWithoutLock(const SteadyClock::time_point & now)
void clearCPUTime(const SteadyClock::time_point & now)
{
static_assert(CLEAR_CPU_TIME_DURATION > ResourceGroup::COMPUTE_RU_CONSUMPTION_SPEED_INTERVAL);
std::lock_guard lock(mu);
if (now - last_clear_cpu_time >= CLEAR_CPU_TIME_DURATION)
{
for (auto & resource_group : resource_groups)
Expand Down