Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 18 additions & 2 deletions cpp/celeborn/client/ShuffleClient.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,11 @@ ShuffleClientImpl::ShuffleClientImpl(
: appUniqueId_(appUniqueId),
conf_(conf),
clientFactory_(clientEndpoint.clientFactory()),
pushDataRetryPool_(clientEndpoint.pushDataRetryPool()) {
pushDataRetryPool_(clientEndpoint.pushDataRetryPool()),
pushReplicateEnabled_(conf->clientPushReplicateEnabled()),
fetchExcludeWorkerOnFailureEnabled_(
conf->clientFetchExcludeWorkerOnFailureEnabled()),
fetchExcludedWorkers_(std::make_shared<FetchExcludedWorkers>()) {
CELEBORN_CHECK_NOT_NULL(clientFactory_);
CELEBORN_CHECK_NOT_NULL(pushDataRetryPool_);
}
Expand Down Expand Up @@ -286,7 +290,19 @@ std::unique_ptr<CelebornInputStream> ShuffleClientImpl::readPartition(
attemptNumber,
startMapIndex,
endMapIndex,
needCompression);
needCompression,
fetchExcludedWorkers_);
}

void ShuffleClientImpl::excludeFailedFetchLocation(
const std::string& hostAndFetchPort,
const std::exception& e) {
if (pushReplicateEnabled_ && fetchExcludeWorkerOnFailureEnabled_) {
auto now = std::chrono::duration_cast<std::chrono::milliseconds>(
std::chrono::steady_clock::now().time_since_epoch())
.count();
fetchExcludedWorkers_->set(hostAndFetchPort, now);
}
}

void ShuffleClientImpl::updateReducerFileGroup(int shuffleId) {
Expand Down
14 changes: 14 additions & 0 deletions cpp/celeborn/client/ShuffleClient.h
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,8 @@ class ShuffleClient {

virtual void updateReducerFileGroup(int shuffleId) = 0;

using FetchExcludedWorkers = utils::ConcurrentHashMap<std::string, int64_t>;

virtual std::unique_ptr<CelebornInputStream> readPartition(
int shuffleId,
int partitionId,
Expand All @@ -68,6 +70,10 @@ class ShuffleClient {
int endMapIndex,
bool needCompression) = 0;

virtual void excludeFailedFetchLocation(
const std::string& hostAndFetchPort,
const std::exception& e) = 0;

virtual bool cleanupShuffle(int shuffleId) = 0;

virtual void shutdown() = 0;
Expand Down Expand Up @@ -161,6 +167,10 @@ class ShuffleClientImpl
int endMapIndex,
bool needCompression) override;

void excludeFailedFetchLocation(
const std::string& hostAndFetchPort,
const std::exception& e) override;

void updateReducerFileGroup(int shuffleId) override;

bool cleanupShuffle(int shuffleId) override;
Expand Down Expand Up @@ -266,6 +276,10 @@ class ShuffleClientImpl
mapperEndSets_;
utils::ConcurrentHashSet<int> stageEndShuffleSet_;

bool pushReplicateEnabled_;
bool fetchExcludeWorkerOnFailureEnabled_;
std::shared_ptr<FetchExcludedWorkers> fetchExcludedWorkers_;

// TODO: pushExcludedWorker is not supported yet
};
} // namespace client
Expand Down
115 changes: 110 additions & 5 deletions cpp/celeborn/client/reader/CelebornInputStream.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,8 @@ CelebornInputStream::CelebornInputStream(
int attemptNumber,
int startMapIndex,
int endMapIndex,
bool needCompression)
bool needCompression,
const std::shared_ptr<FetchExcludedWorkers>& fetchExcludedWorkers)
: shuffleKey_(shuffleKey),
conf_(conf),
clientFactory_(clientFactory),
Expand All @@ -45,7 +46,21 @@ CelebornInputStream::CelebornInputStream(
shouldDecompress_(
conf_->shuffleCompressionCodec() !=
protocol::CompressionCodec::NONE &&
needCompression) {
needCompression),
fetchChunkRetryCnt_(0),
fetchChunkMaxRetry_(
conf_->clientPushReplicateEnabled()
? conf_->clientFetchMaxRetriesForEachReplica() * 2
: conf_->clientFetchMaxRetriesForEachReplica()),
retryWait_(conf_->networkIoRetryWait()),
fetchExcludedWorkers_(fetchExcludedWorkers),
fetchExcludedWorkerExpireTimeoutMs_(
std::chrono::duration_cast<std::chrono::milliseconds>(
conf_->clientFetchExcludedWorkerExpireTimeout())
.count()),
readSkewPartitionWithoutMapRange_(
conf_->clientAdaptiveOptimizeSkewedPartitionReadEnabled() &&
startMapIndex > endMapIndex) {
if (shouldDecompress_) {
decompressor_ = compress::Decompressor::createDecompressor(
conf_->shuffleCompressionCodec());
Expand Down Expand Up @@ -178,6 +193,7 @@ void CelebornInputStream::moveToNextReader() {
if (!location) {
return;
}
fetchChunkRetryCnt_ = 0;
currReader_ = createReaderWithRetry(*location);
currLocationIndex_++;
if (currReader_->hasNext()) {
Expand All @@ -189,9 +205,58 @@ void CelebornInputStream::moveToNextReader() {

std::shared_ptr<PartitionReader> CelebornInputStream::createReaderWithRetry(
const protocol::PartitionLocation& location) {
// TODO: support retrying when createReader failed. Maybe switch to peer
// location?
return createReader(location);
const protocol::PartitionLocation* currentLocation = &location;
std::exception_ptr lastException;

while (fetchChunkRetryCnt_ < fetchChunkMaxRetry_) {
try {
VLOG(1) << "Create reader for location " << currentLocation->host << ":"
<< currentLocation->fetchPort;
if (isExcluded(*currentLocation)) {
throw std::runtime_error(
"Fetch data from excluded worker! " +
currentLocation->hostAndFetchPort());
}
auto reader = createReader(*currentLocation);
return reader;
} catch (const std::exception& e) {
lastException = std::current_exception();
excludeFailedFetchLocation(currentLocation->hostAndFetchPort(), e);
fetchChunkRetryCnt_++;

if (currentLocation->hasPeer() && !readSkewPartitionWithoutMapRange_) {
if (fetchChunkRetryCnt_ % 2 == 0) {
std::this_thread::sleep_for(
std::chrono::milliseconds(retryWait_.count()));
}
LOG(WARNING) << "CreatePartitionReader failed " << fetchChunkRetryCnt_
<< "/" << fetchChunkMaxRetry_ << " times for location "
<< currentLocation->hostAndFetchPort()
<< ", change to peer. Error: " << e.what();
// TODO: When stream handlers are supported, send BUFFER_STREAM_END
// to close the active stream before switching to peer, matching the
// Java CelebornInputStream behavior. Currently, the C++ client does
// not have pre-opened stream handlers, so stream cleanup is handled
// by WorkerPartitionReader's destructor.
currentLocation = currentLocation->getPeer();
} else {
LOG(WARNING) << "CreatePartitionReader failed " << fetchChunkRetryCnt_
<< "/" << fetchChunkMaxRetry_ << " times for location "
<< currentLocation->hostAndFetchPort()
<< ", retry the same location. Error: " << e.what();
std::this_thread::sleep_for(
std::chrono::milliseconds(retryWait_.count()));
}
}
}

// Max retries exceeded, rethrow the last exception wrapped with context
throw utils::CelebornRuntimeError(
lastException,
"createPartitionReader failed after " +
std::to_string(fetchChunkRetryCnt_) + " retries for location " +
location.hostAndFetchPort(),
false);
}

std::shared_ptr<PartitionReader> CelebornInputStream::createReader(
Expand Down Expand Up @@ -236,6 +301,46 @@ std::unordered_set<int>& CelebornInputStream::getBatchRecord(int mapId) {
return *batchRecords_[mapId];
}

bool CelebornInputStream::isExcluded(
const protocol::PartitionLocation& location) {
auto now = std::chrono::duration_cast<std::chrono::milliseconds>(
std::chrono::steady_clock::now().time_since_epoch())
.count();
auto timestamp = fetchExcludedWorkers_->get(location.hostAndFetchPort());
if (!timestamp.has_value()) {
return false;
}
if (now - timestamp.value() > fetchExcludedWorkerExpireTimeoutMs_) {
fetchExcludedWorkers_->erase(location.hostAndFetchPort());
return false;
}
if (location.hasPeer()) {
auto peerTimestamp =
fetchExcludedWorkers_->get(location.getPeer()->hostAndFetchPort());
// To avoid both replicate locations being excluded, if peer was added to
// excluded list earlier, change to try peer.
if (!peerTimestamp.has_value() ||
peerTimestamp.value() < timestamp.value()) {
return true;
}
return false;
}
return true;
}

void CelebornInputStream::excludeFailedFetchLocation(
const std::string& hostAndFetchPort,
const std::exception& e) {
if (conf_->clientPushReplicateEnabled() &&
conf_->clientFetchExcludeWorkerOnFailureEnabled() &&
utils::isCriticalCauseForFetch(e)) {
auto now = std::chrono::duration_cast<std::chrono::milliseconds>(
std::chrono::steady_clock::now().time_since_epoch())
.count();
fetchExcludedWorkers_->set(hostAndFetchPort, now);
}
}

void CelebornInputStream::cleanupReader() {
currReader_ = nullptr;
}
Expand Down
21 changes: 20 additions & 1 deletion cpp/celeborn/client/reader/CelebornInputStream.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,19 @@

#pragma once

#include <thread>

#include "celeborn/client/compress/Decompressor.h"
#include "celeborn/client/reader/WorkerPartitionReader.h"
#include "celeborn/conf/CelebornConf.h"
#include "celeborn/utils/CelebornUtils.h"

namespace celeborn {
namespace client {
class CelebornInputStream {
public:
using FetchExcludedWorkers = utils::ConcurrentHashMap<std::string, int64_t>;

CelebornInputStream(
const std::string& shuffleKey,
const std::shared_ptr<const conf::CelebornConf>& conf,
Expand All @@ -35,7 +40,8 @@ class CelebornInputStream {
int attemptNumber,
int startMapIndex,
int endMapIndex,
bool needCompression);
bool needCompression,
const std::shared_ptr<FetchExcludedWorkers>& fetchExcludedWorkers);

int read(uint8_t* buffer, size_t offset, size_t len);

Expand All @@ -56,6 +62,12 @@ class CelebornInputStream {
std::shared_ptr<PartitionReader> createReader(
const protocol::PartitionLocation& location);

bool isExcluded(const protocol::PartitionLocation& location);

void excludeFailedFetchLocation(
const std::string& hostAndFetchPort,
const std::exception& e);

std::shared_ptr<const protocol::PartitionLocation> nextReadableLocation();

std::unordered_set<int>& getBatchRecord(int mapId);
Expand All @@ -81,6 +93,13 @@ class CelebornInputStream {
size_t currBatchSize_;
std::shared_ptr<PartitionReader> currReader_;
std::vector<std::unique_ptr<std::unordered_set<int>>> batchRecords_;

int fetchChunkRetryCnt_;
int fetchChunkMaxRetry_;
utils::Timeout retryWait_;
std::shared_ptr<FetchExcludedWorkers> fetchExcludedWorkers_;
int64_t fetchExcludedWorkerExpireTimeoutMs_;
bool readSkewPartitionWithoutMapRange_;
};
} // namespace client
} // namespace celeborn
38 changes: 36 additions & 2 deletions cpp/celeborn/conf/CelebornConf.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -162,8 +162,12 @@ CelebornConf::defaultProperties() {
kShuffleCompressionCodec,
protocol::toString(protocol::CompressionCodec::NONE)),
NUM_PROP(kShuffleCompressionZstdCompressLevel, 1),
// NUM_PROP(kNumExample, 50'000),
// BOOL_PROP(kBoolExample, false),
NUM_PROP(kClientFetchMaxRetriesForEachReplica, 3),
STR_PROP(kNetworkIoRetryWait, "5s"),
BOOL_PROP(kClientPushReplicateEnabled, false),
BOOL_PROP(kClientFetchExcludeWorkerOnFailureEnabled, false),
STR_PROP(kClientFetchExcludedWorkerExpireTimeout, "60s"),
BOOL_PROP(kClientAdaptiveOptimizeSkewedPartitionReadEnabled, false),
};
return defaultProp;
}
Expand Down Expand Up @@ -308,5 +312,35 @@ int CelebornConf::shuffleCompressionZstdCompressLevel() const {
return std::stoi(
optionalProperty(kShuffleCompressionZstdCompressLevel).value());
}

int CelebornConf::clientFetchMaxRetriesForEachReplica() const {
return std::stoi(
optionalProperty(kClientFetchMaxRetriesForEachReplica).value());
}

Timeout CelebornConf::networkIoRetryWait() const {
return utils::toTimeout(
toDuration(optionalProperty(kNetworkIoRetryWait).value()));
}

bool CelebornConf::clientPushReplicateEnabled() const {
return folly::to<bool>(optionalProperty(kClientPushReplicateEnabled).value());
}

bool CelebornConf::clientFetchExcludeWorkerOnFailureEnabled() const {
return folly::to<bool>(
optionalProperty(kClientFetchExcludeWorkerOnFailureEnabled).value());
}

Timeout CelebornConf::clientFetchExcludedWorkerExpireTimeout() const {
return utils::toTimeout(toDuration(
optionalProperty(kClientFetchExcludedWorkerExpireTimeout).value()));
}

bool CelebornConf::clientAdaptiveOptimizeSkewedPartitionReadEnabled() const {
return folly::to<bool>(
optionalProperty(kClientAdaptiveOptimizeSkewedPartitionReadEnabled)
.value());
}
} // namespace conf
} // namespace celeborn
31 changes: 31 additions & 0 deletions cpp/celeborn/conf/CelebornConf.h
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,25 @@ class CelebornConf : public BaseConf {
static constexpr std::string_view kShuffleCompressionZstdCompressLevel{
"celeborn.client.shuffle.compression.zstd.level"};

static constexpr std::string_view kClientFetchMaxRetriesForEachReplica{
"celeborn.client.fetch.maxRetriesForEachReplica"};

static constexpr std::string_view kNetworkIoRetryWait{
"celeborn.data.io.retryWait"};

static constexpr std::string_view kClientPushReplicateEnabled{
"celeborn.client.push.replicate.enabled"};

static constexpr std::string_view kClientFetchExcludeWorkerOnFailureEnabled{
"celeborn.client.fetch.excludeWorkerOnFailure.enabled"};

static constexpr std::string_view kClientFetchExcludedWorkerExpireTimeout{
"celeborn.client.fetch.excludedWorker.expireTimeout"};

static constexpr std::string_view
kClientAdaptiveOptimizeSkewedPartitionReadEnabled{
"celeborn.client.adaptive.optimizeSkewedPartitionRead.enabled"};

CelebornConf();

CelebornConf(const std::string& filename);
Expand Down Expand Up @@ -176,6 +195,18 @@ class CelebornConf : public BaseConf {
protocol::CompressionCodec shuffleCompressionCodec() const;

int shuffleCompressionZstdCompressLevel() const;

int clientFetchMaxRetriesForEachReplica() const;

Timeout networkIoRetryWait() const;

bool clientPushReplicateEnabled() const;

bool clientFetchExcludeWorkerOnFailureEnabled() const;

Timeout clientFetchExcludedWorkerExpireTimeout() const;

bool clientAdaptiveOptimizeSkewedPartitionReadEnabled() const;
};
} // namespace conf
} // namespace celeborn
Loading