Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
28 commits
Select commit Hold shift + click to select a range
37c8553
add cdc client read data
JNSimba Nov 24, 2025
cd4e1e9
add a part of stream load sink
JNSimba Nov 25, 2025
4b56052
fix write split chunk api
JNSimba Nov 26, 2025
03fa875
add be request cdc client
JNSimba Nov 28, 2025
aa85742
add fe rpc interface and test
JNSimba Nov 28, 2025
c12ca09
fix cdc client manager compile
JNSimba Nov 28, 2025
3f90ae6
Merge branch 'mysql-cdc' of https://github.com/JNSimba/doris into mys…
JNSimba Nov 28, 2025
56ac92c
add streaming job split chunks and streaming multi task
JNSimba Dec 2, 2025
38507ae
add fe create streamt multi ask
JNSimba Dec 4, 2025
3c2d358
fix be forward request
JNSimba Dec 5, 2025
124d2ae
add multi table sync and checkstyle
JNSimba Dec 5, 2025
06c8ae7
add build script
JNSimba Dec 6, 2025
8f8f676
update check style
JNSimba Dec 6, 2025
59a5cf2
fix be fork
JNSimba Dec 8, 2025
2b783cd
fix offset bug
JNSimba Dec 8, 2025
6b2443f
Merge branch 'mysql-cdc' of https://github.com/JNSimba/doris into mys…
JNSimba Dec 8, 2025
79a71ed
fix create table and offset consumer bug
JNSimba Dec 9, 2025
4f3a1e5
add case for mysql sync
JNSimba Dec 10, 2025
8d9b14b
Merge branch 'master' into mysql-cdc
JNSimba Dec 10, 2025
f63e984
code style
JNSimba Dec 10, 2025
f4e1a9e
fix multi table bug
JNSimba Dec 10, 2025
0a27f8a
fix case
JNSimba Dec 11, 2025
7748211
fix
JNSimba Dec 11, 2025
b26a7b2
fix
JNSimba Dec 11, 2025
91c3bd2
fix
JNSimba Dec 11, 2025
3f8813d
extend to entity fe-common
JNSimba Dec 11, 2025
cfff8f9
extend fe model to common
JNSimba Dec 11, 2025
948aaf8
Merge branch 'mysql-cdc' of https://github.com/JNSimba/doris into mys…
JNSimba Dec 11, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions be/src/common/config.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,8 @@ DEFINE_Int32(brpc_port, "8060");

DEFINE_Int32(arrow_flight_sql_port, "8050");

DEFINE_Int32(cdc_client_port, "9096");

// If the external client cannot directly access priority_networks, set public_host to be accessible
// to external client.
// There are usually two usage scenarios:
Expand Down
3 changes: 3 additions & 0 deletions be/src/common/config.h
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,9 @@ DECLARE_Int32(brpc_port);
// Default -1, do not start arrow flight sql server.
DECLARE_Int32(arrow_flight_sql_port);

// port for cdc client scan oltp cdc data
DECLARE_Int32(cdc_client_port);

// If the external client cannot directly access priority_networks, set public_host to be accessible
// to external client.
// There are usually two usage scenarios:
Expand Down
303 changes: 303 additions & 0 deletions be/src/runtime/cdc_client_manager.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,303 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

#include "runtime/cdc_client_manager.h"

#include <brpc/closure_guard.h>
#include <fmt/core.h>
#include <gen_cpp/internal_service.pb.h>
#include <google/protobuf/stubs/callback.h>
#include <signal.h>
#include <sys/stat.h>
#include <sys/wait.h>
#ifndef __APPLE__
#include <sys/prctl.h>
#endif

#include <atomic>
#include <chrono>
#include <mutex>
#include <string>
#include <thread>

#include "common/config.h"
#include "common/logging.h"
#include "common/status.h"
#include "http/http_client.h"

namespace doris {

namespace {
// Handle SIGCHLD signal to prevent zombie processes
void handle_sigchld(int sig_no) {
int status = 0;
pid_t pid = waitpid(0, &status, WNOHANG);
LOG(INFO) << "handle cdc process exit, result: " << pid << ", status: " << status;
}

// Check CDC client health
Status check_cdc_client_health(int retry_times, int sleep_time, std::string& health_response) {
const std::string cdc_health_url =
"http://127.0.0.1:" + std::to_string(doris::config::cdc_client_port) +
"/actuator/health";

auto health_request = [cdc_health_url, &health_response](HttpClient* client) {
RETURN_IF_ERROR(client->init(cdc_health_url));
client->set_timeout_ms(5000);
RETURN_IF_ERROR(client->execute(&health_response));
return Status::OK();
};

Status status = HttpClient::execute_with_retry(retry_times, sleep_time, health_request);

if (!status.ok()) {
return Status::InternalError(
fmt::format("CDC client health check failed: url={}", cdc_health_url));
}

bool is_up = health_response.find("UP") != std::string::npos;

if (!is_up) {
return Status::InternalError(fmt::format("CDC client unhealthy: url={}, response={}",
cdc_health_url, health_response));
}

return Status::OK();
}

} // anonymous namespace

CdcClientMgr::CdcClientMgr() = default;

CdcClientMgr::~CdcClientMgr() {
stop();
LOG(INFO) << "CdcClientMgr is destroyed";
}

void CdcClientMgr::stop() {
pid_t pid = _child_pid.load();
if (pid > 0) {
// Check if process is still alive
if (kill(pid, 0) == 0) {
LOG(INFO) << "Stopping CDC client process, pid=" << pid;

// Send SIGTERM for graceful shutdown
if (kill(pid, SIGTERM) == 0) {
// Wait up to 10 seconds for graceful shutdown
for (int i = 0; i < 10; ++i) {
std::this_thread::sleep_for(std::chrono::seconds(1));
if (kill(pid, 0) != 0) {
// Process has exited
_child_pid.store(0);
LOG(INFO) << "CDC client process stopped gracefully";
return;
}
}
}

// Force kill if still alive
if (kill(pid, 0) == 0) {
LOG(WARNING) << "Force killing CDC client process, pid=" << pid;
kill(pid, SIGKILL);
// Wait for process to exit
int status = 0;
waitpid(pid, &status, 0);
}
_child_pid.store(0);
} else {
// Process already dead
_child_pid.store(0);
}
}

LOG(INFO) << "CdcClientMgr is stopped";
}

Status CdcClientMgr::start_cdc_client(PRequestCdcClientResult* result) {
std::lock_guard<std::mutex> lock(_start_mutex);

pid_t existing_pid = _child_pid.load();
if (existing_pid > 0) {
// Check if process is still alive
if (kill(existing_pid, 0) == 0) {
std::string check_response;
auto check_st = check_cdc_client_health(1, 0, check_response);
if (check_st.ok()) {
LOG(INFO) << "cdc client already started, pid=" << existing_pid;
return Status::OK();
} else {
// Process exists but not healthy, reset PID
LOG(WARNING) << "CDC client process exists but unhealthy, pid=" << existing_pid;
_child_pid.store(0);
}
} else {
// Process is dead, reset PID
_child_pid.store(0);
}
}

Status st = Status::OK();

const char* doris_home = getenv("DORIS_HOME");
if (!doris_home) {
st = Status::InternalError("DORIS_HOME environment variable is not set");
if (result) {
st.to_protobuf(result->mutable_status());
}
return st;
}

const char* log_dir = getenv("LOG_DIR");
if (!log_dir) {
st = Status::InternalError("LOG_DIR environment variable is not set");
if (result) {
st.to_protobuf(result->mutable_status());
}
return st;
}

const std::string cdc_jar_path = std::string(doris_home) + "/lib/cdc_client/cdc-client.jar";
const std::string cdc_jar_port =
"--server.port=" + std::to_string(doris::config::cdc_client_port);
const std::string backend_http_port =
"--backend.http.port=" + std::to_string(config::webserver_port);
const std::string java_opts = "-Dlog.path=" + std::string(log_dir);

// check cdc jar exists
struct stat buffer;
if (stat(cdc_jar_path.c_str(), &buffer) != 0) {
st = Status::InternalError("Can not find cdc-client.jar.");
if (result) {
st.to_protobuf(result->mutable_status());
}
return st;
}

// check cdc process already started
std::string check_response;
auto check_st = check_cdc_client_health(1, 0, check_response);
if (check_st.ok()) {
LOG(INFO) << "cdc client already started.";
return Status::OK();
} else {
LOG(INFO) << "cdc client not started, to start.";
}

const auto* java_home = getenv("JAVA_HOME");
if (!java_home) {
st = Status::InternalError("Can not find java home.");
if (result) {
st.to_protobuf(result->mutable_status());
}
return st;
}
std::string path(java_home);

// Capture signal to prevent child process from becoming a zombie process
struct sigaction act;
act.sa_flags = 0;
act.sa_handler = handle_sigchld;
sigaction(SIGCHLD, &act, NULL);
LOG(INFO) << "Start to fork cdc client process with " << path;

pid_t pid = ::fork();
if (pid < 0) {
// Fork failed
st = Status::InternalError("Fork cdc client failed.");
if (result) {
st.to_protobuf(result->mutable_status());
}
return st;
} else if (pid == 0) {
// Child process
// When the parent process is killed, the child process also needs to exit
#ifndef __APPLE__
prctl(PR_SET_PDEATHSIG, SIGKILL);
#endif

LOG(INFO) << "Cdc client child process ready to start";
std::cout << "Cdc client child process ready to start." << std::endl;
std::string java_bin = path + "/bin/java";
// java -jar -Dlog.path=xx cdc-client.jar --server.port=9096 --backend.http.port=8040
execlp(java_bin.c_str(), "java", java_opts.c_str(), "-jar", cdc_jar_path.c_str(),
cdc_jar_port.c_str(), backend_http_port.c_str(), (char*)NULL);
std::cerr << "Cdc client child process error." << std::endl;
exit(1);
} else {
// Parent process: save PID and wait for startup
_child_pid.store(pid);

// Waiting for cdc to start, failed after more than 30 seconds
std::string health_response;
Status status = check_cdc_client_health(5, 6, health_response);
if (!status.ok()) {
// Reset PID if startup failed
_child_pid.store(0);
st = Status::InternalError("Start cdc client failed.");
if (result) {
st.to_protobuf(result->mutable_status());
}
} else {
LOG(INFO) << "Start cdc client success, pid=" << pid
<< ", status=" << status.to_string() << ", response=" << health_response;
}
}
return st;
}

void CdcClientMgr::request_cdc_client_impl(const PRequestCdcClientRequest* request,
PRequestCdcClientResult* result,
google::protobuf::Closure* done) {
VLOG_RPC << "request to cdc client, api " << request->api();
brpc::ClosureGuard closure_guard(done);

// Start CDC client if not started
Status start_st = start_cdc_client(result);
if (!start_st.ok()) {
LOG(ERROR) << "Failed to start CDC client, status=" << start_st.to_string();
start_st.to_protobuf(result->mutable_status());
return;
}

std::string cdc_response;
Status st = send_request_to_cdc_client(request->api(), request->params(), &cdc_response);
result->set_response(cdc_response);
st.to_protobuf(result->mutable_status());
}

Status CdcClientMgr::send_request_to_cdc_client(const std::string& api,
const std::string& params_body,
std::string* response) {
std::string remote_url_prefix =
fmt::format("http://127.0.0.1:{}{}", doris::config::cdc_client_port, api);

auto cdc_request = [&remote_url_prefix, response, &params_body](HttpClient* client) {
RETURN_IF_ERROR(client->init(remote_url_prefix));
client->set_timeout_ms(60 * 1000);
if (!params_body.empty()) {
client->set_payload(params_body);
}
client->set_content_type("application/json");
client->set_method(POST);
RETURN_IF_ERROR(client->execute(response));
return Status::OK();
};

return HttpClient::execute_with_retry(3, 1, cdc_request);
}

} // namespace doris
56 changes: 56 additions & 0 deletions be/src/runtime/cdc_client_manager.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

#pragma once

#include <gen_cpp/internal_service.pb.h>

#include <atomic>
#include <mutex>
#include <string>

#include "common/status.h"

namespace google::protobuf {
class Closure;
class RpcController;
} // namespace google::protobuf

namespace doris {

class CdcClientMgr {
public:
CdcClientMgr();
~CdcClientMgr();

void stop();

// Request CDC client to handle a request
void request_cdc_client_impl(const PRequestCdcClientRequest* request,
PRequestCdcClientResult* result, google::protobuf::Closure* done);

private:
Status send_request_to_cdc_client(const std::string& api, const std::string& params_body,
std::string* response);

Status start_cdc_client(PRequestCdcClientResult* result);

std::mutex _start_mutex;
std::atomic<pid_t> _child_pid {0};
};

} // namespace doris
Loading
Loading