[CELEBORN-2263] Fix IndexOutOfBoundsException while reading from S3#3600

Open
eolivelli wants to merge 3 commits into apache:main from eolivelli:CELEBORN-2263-apache

@eolivelli eolivelli commented Feb 10, 2026

What changes were proposed in this pull request?

Properly pass the size of the array to the InputStream that feeds the flush.

Why are the changes needed?

Without this change, if the array is bigger than the buffer, the InputStream also returns the garbage bytes that follow the valid data, resulting in corrupted data on S3.
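
A minimal sketch of this failure mode, using only the JDK. The class name `OversizedArrayDemo`, the 16-byte scratch array, and the `hello` payload are illustrative assumptions and are not taken from the Celeborn code; only the two `ByteArrayInputStream` constructors are the real API.

```java
import java.io.ByteArrayInputStream;
import java.nio.charset.StandardCharsets;

final class OversizedArrayDemo {
    // Drains the stream and returns how many bytes it produced.
    static int drain(ByteArrayInputStream in) {
        int count = 0;
        while (in.read() != -1) {
            count++;
        }
        return count;
    }

    public static void main(String[] args) {
        // Hypothetical reusable scratch array, larger than the payload.
        byte[] scratch = new byte[16];
        byte[] payload = "hello".getBytes(StandardCharsets.US_ASCII);
        System.arraycopy(payload, 0, scratch, 0, payload.length);

        // Whole-array stream: also yields the 11 bytes after the payload.
        int unbounded = drain(new ByteArrayInputStream(scratch));
        // Bounded stream: yields only the payload bytes.
        int bounded = drain(new ByteArrayInputStream(scratch, 0, payload.length));

        System.out.println(unbounded + " vs " + bounded); // prints "16 vs 5"
    }
}
```

The single-argument constructor exposes the whole backing array, so a consumer of that stream has no way to tell where the valid data ends.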

Does this PR resolve a correctness bug?

Yes.

Does this PR introduce any user-facing change?

No.

How was this patch tested?

New unit test + Manual testing.

@eolivelli eolivelli force-pushed the CELEBORN-2263-apache branch from 0519b2c to 6132a43 Compare February 11, 2026 08:23
@eolivelli
Contributor Author

@xy2953396112 @FMX @SteNicholas can you please take a look?

      length: Int): Array[Byte] = {
    if (copyBytes != null && copyBytes.length >= length) {
-     buffer.readBytes(copyBytes, 0, length)
+     buffer.getBytes(buffer.readerIndex(), copyBytes, 0, length)
Member

@pan3793 pan3793 Feb 12, 2026

the only difference here is whether to update the reader/writer index? how does this help?

Contributor Author

The main difference shows up here:

def returnBuffer(buffer: CompositeByteBuf, keepBuffer: Boolean = false): Unit = {
  val bufferSize = buffer.readableBytes()
  MemoryManager.instance().releaseDiskBuffer(bufferSize)
  Option(CongestionController.instance())

When the buffer is "returned", we use readableBytes() to tell MemoryManager how many bytes have been released. readBytes advances the reader index, so by that point readableBytes() is already 0 and nothing is released.

Without this fix, the diskBufferCounter variable in MemoryManager keeps increasing and never goes down:

private final AtomicLong diskBufferCounter = new AtomicLong(0);
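
A rough sketch of the accounting problem described above. `TinyBuf` is a hypothetical stand-in that mimics only the index semantics of a Netty-style buffer (a relative read advances the reader index, an absolute get does not); it is not the real `ByteBuf`. The local `AtomicLong` merely models `diskBufferCounter`, and the "reserve" step is an assumption about how the counter is incremented.

```java
import java.util.concurrent.atomic.AtomicLong;

final class ReaderIndexAccountingDemo {
    /** Hypothetical stand-in for a Netty-style buffer; not the real ByteBuf. */
    static final class TinyBuf {
        private final byte[] data;
        private int readerIndex = 0;

        TinyBuf(byte[] data) { this.data = data; }

        int readerIndex() { return readerIndex; }

        int readableBytes() { return data.length - readerIndex; }

        // Copies and advances the reader index, like a relative read.
        void readBytes(byte[] dst, int dstIndex, int length) {
            System.arraycopy(data, readerIndex, dst, dstIndex, length);
            readerIndex += length;
        }

        // Copies from an absolute index and leaves the reader index alone.
        void getBytes(int index, byte[] dst, int dstIndex, int length) {
            System.arraycopy(data, index, dst, dstIndex, length);
        }
    }

    /** Returns the counter value left after reserve, copy, and release. */
    static long leftover(boolean consume) {
        AtomicLong counter = new AtomicLong(0); // models diskBufferCounter
        TinyBuf buf = new TinyBuf(new byte[8]);
        counter.addAndGet(buf.readableBytes()); // assumed reservation of 8 bytes

        byte[] dst = new byte[8];
        int length = buf.readableBytes();
        if (consume) {
            buf.readBytes(dst, 0, length);
        } else {
            buf.getBytes(buf.readerIndex(), dst, 0, length);
        }

        // Release by readableBytes(), as returnBuffer does in the snippet above.
        counter.addAndGet(-buf.readableBytes());
        return counter.get();
    }

    public static void main(String[] args) {
        System.out.println(leftover(true));  // prints 8: nothing was released
        System.out.println(leftover(false)); // prints 0: all 8 bytes released
    }
}
```

In this model the consuming copy leaves the counter at 8 on every flush, which matches the ever-growing counter described in the comment; the non-consuming copy brings it back to 0.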

  val readableBytes = buffer.readableBytes()
  val bytes = convertBufferToBytes(buffer, copyBytes, readableBytes)
- val inputStream = new ByteArrayInputStream(bytes)
+ val inputStream = new ByteArrayInputStream(bytes, 0, readableBytes)
Member

this matches what you say in the PR description, and makes sense to me.

pan3793 commented Feb 12, 2026

cc @TheodoreLx @RexXiong, who originally wrote and committed this code, could you please take a look?

@eolivelli, it's holiday season in China until the end of Feb, so replies from reviewers might be delayed.

@eolivelli
Contributor Author

thank you @pan3793
