Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions .github/workflows/build-test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ jobs:
path: deps/3rd/usr/local
- name: BuildDebug
shell: bash
run: bash build.sh debug -DCONCURRENCY=ON -DENABLE_COVERAGE=ON -DWITH_BENCHMARK=ON -DWITH_MEMTRACER=ON -DWITH_UNIT_TESTS=ON --make -j4
run: bash build.sh debug -DENABLE_COVERAGE=ON -DWITH_BENCHMARK=ON -DWITH_MEMTRACER=ON -DWITH_UNIT_TESTS=ON --make -j4

- name: Test
shell: bash
Expand Down Expand Up @@ -125,7 +125,7 @@ jobs:
path: deps/3rd/usr/local
- name: BuildRelease
shell: bash
run: bash build.sh release -DCONCURRENCY=ON -DWITH_UNIT_TESTS=ON -DWITH_BENCHMARK=ON -DENABLE_ASAN=OFF -DWITH_MEMTRACER=ON --make -j4
run: bash build.sh release -DWITH_UNIT_TESTS=ON -DWITH_BENCHMARK=ON -DENABLE_ASAN=OFF -DWITH_MEMTRACER=ON --make -j4

- name: Upload release artifacts
uses: actions/upload-artifact@v4
Expand Down Expand Up @@ -386,4 +386,4 @@ jobs:
export PATH="/opt/homebrew/opt/bison/bin:$PATH"
sudo bash build.sh init
bash build.sh debug -DWITH_MEMTRACER=ON --make -j4
bash build.sh release -DCONCURRENCY=ON -DWITH_UNIT_TESTS=OFF -DWITH_BENCHMARK=ON -DWITH_MEMTRACER=ON --make -j4
bash build.sh release -DWITH_UNIT_TESTS=OFF -DWITH_BENCHMARK=ON -DWITH_MEMTRACER=ON --make -j4
7 changes: 0 additions & 7 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@ OPTION(WITH_BENCHMARK "Compile benchmark" OFF)
OPTION(WITH_MEMTRACER "Compile memtracer" OFF)
OPTION(ENABLE_COVERAGE "Enable unittest coverage" OFF)
OPTION(ENABLE_NOPIE "Enable no pie" OFF)
OPTION(CONCURRENCY "Support concurrency operations" OFF)
OPTION(STATIC_STDLIB "Link std library static or dynamic, such as libgcc, libstdc++, libasan" OFF)
OPTION(USE_SIMD "Use SIMD" OFF)
OPTION(USE_MUSL_LIBC "Use musl libc" OFF)
Expand Down Expand Up @@ -75,12 +74,6 @@ IF(USE_SIMD)
ADD_DEFINITIONS(-DUSE_SIMD)
ENDIF(USE_SIMD)

IF (CONCURRENCY)
MESSAGE(STATUS "CONCURRENCY is ON")
SET(CMAKE_COMMON_FLAGS "${CMAKE_COMMON_FLAGS} -DCONCURRENCY")
ADD_DEFINITIONS(-DCONCURRENCY)
ENDIF (CONCURRENCY)

MESSAGE(STATUS "CMAKE_CXX_COMPILER_ID is " ${CMAKE_CXX_COMPILER_ID})
IF ("${CMAKE_CXX_COMPILER_ID}" STREQUAL "GNU" AND ${STATIC_STDLIB})
ADD_LINK_OPTIONS(-static-libgcc -static-libstdc++)
Expand Down
13 changes: 0 additions & 13 deletions docs/docs/how_to_run.md
Original file line number Diff line number Diff line change
Expand Up @@ -36,19 +36,6 @@ title: 如何运行
```
这会连接到服务端的miniob.sock文件。

**并发模式**

默认情况下,编译出的程序是不支持并发的。如果需要支持并发,需要在编译时增加选项 `-DCONCURRENCY=ON`:
```bash
cmake -DCONCURRENCY=ON ..
```

或者

```bash
bash build.sh -DCONCURRENCY=ON
```

然后使用上面的命令启动服务端程序,就可以支持并发了。

**启动参数介绍**
Expand Down
39 changes: 1 addition & 38 deletions src/common/lang/mutex.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -278,36 +278,26 @@ void DebugMutex::unlock()
////////////////////////////////////////////////////////////////////////////////
void Mutex::lock()
{
#ifdef CONCURRENCY
lock_.lock();
LOG_DEBUG("lock %p, lbt=%s", &lock_, lbt());
#endif
}

bool Mutex::try_lock()
{
#ifdef CONCURRENCY
bool result = lock_.try_lock();
if (result) {
LOG_DEBUG("try lock success %p, lbt=%s", &lock_, lbt());
}
return result;
#else
return true;
#endif
}

void Mutex::unlock()
{
#ifdef CONCURRENCY
LOG_DEBUG("unlock %p, lbt=%s", &lock_, lbt());
lock_.unlock();
#endif
}

////////////////////////////////////////////////////////////////////////////////
#ifdef CONCURRENCY

void SharedMutex::lock()
{
lock_.lock();
Expand Down Expand Up @@ -346,33 +336,7 @@ void SharedMutex::unlock_shared()
lock_.unlock_shared();
}

#else // CONCURRENCY undefined

void SharedMutex::lock() {}
bool SharedMutex::try_lock() { return true; }
void SharedMutex::unlock() // unlock exclusive
{}

void SharedMutex::lock_shared() {}
bool SharedMutex::try_lock_shared() { return true; }
void SharedMutex::unlock_shared() {}

#endif // CONCURRENCY end

////////////////////////////////////////////////////////////////////////////////
#ifndef CONCURRENCY
void RecursiveSharedMutex::lock_shared() {}

bool RecursiveSharedMutex::try_lock_shared() { return true; }

void RecursiveSharedMutex::unlock_shared() {}

void RecursiveSharedMutex::lock() {}

void RecursiveSharedMutex::unlock() {}

#else // ifdef CONCURRENCY

void RecursiveSharedMutex::lock_shared()
{
unique_lock<mutex> lock(mutex_);
Expand Down Expand Up @@ -431,6 +395,5 @@ void RecursiveSharedMutex::unlock()
}
}
}
#endif // CONCURRENCY

} // namespace common
} // namespace common
7 changes: 0 additions & 7 deletions src/common/lang/mutex.h
Original file line number Diff line number Diff line change
Expand Up @@ -275,9 +275,7 @@ class Mutex final
void unlock();

private:
#ifdef CONCURRENCY
mutex lock_;
#endif
};

class SharedMutex final
Expand All @@ -295,16 +293,13 @@ class SharedMutex final
void unlock_shared();

private:
#ifdef CONCURRENCY
shared_mutex lock_;
#endif
};

/**
* 支持写锁递归加锁的读写锁
* 读锁本身就可以递归加锁。但是某个线程加了读锁后,也不能再加写锁。
* 但是一个线程可以加多次写锁
* 与其它类型的锁一样,在CONCURRENCY编译模式下才会真正的生效
*/
class RecursiveSharedMutex
{
Expand All @@ -320,15 +315,13 @@ class RecursiveSharedMutex
void unlock();

private:
#ifdef CONCURRENCY
mutex mutex_;
condition_variable shared_lock_cv_;
condition_variable exclusive_lock_cv_;
int shared_lock_count_ = 0;
int exclusive_lock_count_ = 0;
thread::id recursive_owner_;
int recursive_count_ = 0; // 表示当前线程加写锁加了多少次
#endif // CONCURRENCY
};

} // namespace common
150 changes: 136 additions & 14 deletions src/observer/sql/executor/execute_stage.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -14,36 +14,113 @@ See the Mulan PSL v2 for more details. */

#include "sql/executor/execute_stage.h"

#include <string>
#include <map>
#include <vector>
#include <algorithm>
#include <strings.h> // 引入 strcasecmp

#include "common/log/log.h"
#include "common/lang/string.h"
#include "event/session_event.h"
#include "event/sql_event.h"
#include "sql/executor/command_executor.h"
#include "sql/operator/calc_physical_operator.h"
#include "sql/stmt/select_stmt.h"
#include "sql/operator/physical_operator.h"
#include "sql/operator/string_list_physical_operator.h"
#include "sql/stmt/stmt.h"
#include "storage/default/default_handler.h"
#include "sql/parser/parse_defs.h"
#include "session/session.h"
#include "sql/expr/tuple.h"
#include "storage/field/field_meta.h"

using namespace common;

// 辅助函数:处理 SET 变量命令
RC do_set_variable(SessionEvent *session_event, SetVariableSqlNode *sql_node) {
Session *session = session_event->session();
const std::string &name = sql_node->name;
const Value &val = sql_node->value;

// 转换 Value 为 bool
bool bool_value = false;
if (val.attr_type() == AttrType::INTS) {
bool_value = (val.get_int() != 0);
} else if (val.attr_type() == AttrType::CHARS) {
std::string s = val.get_string();
if (strcasecmp(s.c_str(), "true") == 0 || strcmp(s.c_str(), "1") == 0) {
bool_value = true;
}
} else {
// 默认尝试按整型处理
bool_value = (val.get_int() != 0);
}

// 使用 strcasecmp 代替 common::string_util::case_ignore_equal
if (strcasecmp(name.c_str(), "sql_debug") == 0) {
session->set_sql_debug(bool_value);
} else if (strcasecmp(name.c_str(), "hash_join") == 0) {
session->set_hash_join(bool_value);
} else if (strcasecmp(name.c_str(), "use_cascade") == 0) {
session->set_use_cascade(bool_value);
} else {
LOG_WARN("Unknown variable or read-only: %s", name.c_str());
}
return RC::SUCCESS;
}

RC ExecuteStage::handle_request(SQLStageEvent *sql_event)
{
RC rc = RC::SUCCESS;
SessionEvent *session_event = sql_event->session_event();

const unique_ptr<PhysicalOperator> &physical_operator = sql_event->physical_operator();
if (physical_operator != nullptr) {
return handle_request_with_physical_operator(sql_event);
// 从 sql_event 获取解析后的 SQL 节点
ParsedSqlNode *sql_node = sql_event->sql_node().get();

if (!sql_node) {
LOG_WARN("SQL node is null");
return RC::INTERNAL;
}

SessionEvent *session_event = sql_event->session_event();
switch (sql_node->flag) {
case SCF_SET_VARIABLE: {
rc = do_set_variable(session_event, &sql_node->set_variable);
break;
}
case SCF_SHOW_VARIABLES: {
rc = do_show_variables(session_event, &sql_node->show_variables);
break;
}
case SCF_EXIT: {
// 这里的 API 不支持 set_is_terminate,直接返回 SUCCESS
// MiniOB 的上层逻辑会处理连接关闭,或者 CommandExecutor 会再次处理
rc = RC::SUCCESS;
break;
}
default: {
// 默认处理流程
const unique_ptr<PhysicalOperator> &physical_operator = sql_event->physical_operator();
if (physical_operator != nullptr) {
return handle_request_with_physical_operator(sql_event);
}

Stmt *stmt = sql_event->stmt();
if (stmt != nullptr) {
CommandExecutor command_executor;
rc = command_executor.execute(sql_event);
session_event->sql_result()->set_return_code(rc);
} else {
return RC::INTERNAL;
Stmt *stmt = sql_event->stmt();
if (stmt != nullptr) {
CommandExecutor command_executor;
rc = command_executor.execute(sql_event);
session_event->sql_result()->set_return_code(rc);
} else {
// 如果是 HELP 等命令,Parser 解析了但没有生成 Stmt 和 Operator
if (sql_node->flag == SCF_HELP) {
// Help 可以在这里处理,或者交给 default 的 CommandExecutor
} else {
LOG_WARN("Unknown command flag: %d", sql_node->flag);
rc = RC::INTERNAL; // 使用 INTERNAL 替代 UNEXPECTED
}
}
break;
}
}

return rc;
}

Expand All @@ -58,3 +135,48 @@ RC ExecuteStage::handle_request_with_physical_operator(SQLStageEvent *sql_event)
sql_result->set_operator(std::move(physical_operator));
return rc;
}

// 实现 SHOW VARIABLES
RC ExecuteStage::do_show_variables(SessionEvent *session_event, ShowVariablesSqlNode *sql_node)
{
// 1. 构造 Schema
TupleSchema schema;
schema.append_cell(TupleCellSpec("Variable_name"));
schema.append_cell(TupleCellSpec("Value"));

// 2. 创建 StringListPhysicalOperator
auto op = std::make_unique<StringListPhysicalOperator>();

// 关键修正:StringListPhysicalOperator 似乎没有 set_schema 接口
// 我们直接将 Schema 设置到 SqlResult 中,这是最安全的做法
session_event->sql_result()->set_tuple_schema(schema);

// 3. 收集变量数据
Session *session = session_event->session();
std::map<std::string, std::string> variables;

// 从 Session 获取状态
variables["sql_debug"] = session->sql_debug_on() ? "true" : "false";
variables["hash_join"] = session->hash_join_on() ? "true" : "false";
variables["use_cascade"] = session->use_cascade() ? "true" : "false";

// 4. 填充数据
const std::string &pattern = sql_node->pattern;

for (const auto &item : variables) {
const std::string &key = item.first;
const std::string &val = item.second;

// 简单匹配逻辑 (substring match)
// 如果 pattern 为空或者 key 包含 pattern
if (pattern.empty() || key.find(pattern) != std::string::npos) {
// 使用 initializer_list 插入数据
op->append({key, val});
}
}

// 5. 设置 Operator 到 Result
session_event->sql_result()->set_operator(std::move(op));

return RC::SUCCESS;
}
6 changes: 5 additions & 1 deletion src/observer/sql/executor/execute_stage.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ See the Mulan PSL v2 for more details. */
class SQLStageEvent;
class SessionEvent;
class SelectStmt;
class ShowVariablesSqlNode;

/**
* @brief 执行SQL语句的Stage,包括DML和DDL
Expand All @@ -31,4 +32,7 @@ class ExecuteStage
public:
RC handle_request(SQLStageEvent *event);
RC handle_request_with_physical_operator(SQLStageEvent *sql_event);
};

private:
RC do_show_variables(SessionEvent *session_event, ShowVariablesSqlNode *sql_node);
};
2 changes: 2 additions & 0 deletions src/observer/sql/parser/lex_sql.l
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,8 @@ ANALYZE RETURN_TOKEN(ANALYZE);
FIELDS RETURN_TOKEN(FIELDS);
TERMINATED RETURN_TOKEN(TERMINATED);
ENCLOSED RETURN_TOKEN(ENCLOSED);
VARIABLES RETURN_TOKEN(VARIABLES);
LIKE RETURN_TOKEN(LIKE);
{ID} yylval->cstring=strdup(yytext); static_cast<std::vector<char*>*>(yyextra)->push_back(yylval->cstring); RETURN_TOKEN(ID);
"(" RETURN_TOKEN(LBRACE);
")" RETURN_TOKEN(RBRACE);
Expand Down
Loading
Loading