[CELEBORN-2263] Fix IndexOutOfBoundsException while reading from S3 #3600
eolivelli wants to merge 3 commits into apache:main
@xy2953396112 @FMX @SteNicholas can you please take a look?
      length: Int): Array[Byte] = {
    if (copyBytes != null && copyBytes.length >= length) {
-     buffer.readBytes(copyBytes, 0, length)
+     buffer.getBytes(buffer.readerIndex(), copyBytes, 0, length)
The only difference here is whether the reader/writer index gets updated? How does this help?
The main difference is here:
when it "returns" the buffer, we use "readableBytes" to tell MemoryManager that the buffer has been returned.
Without this fix, the variable diskBufferCounter in MemoryManager keeps increasing and never goes down.
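The effect described above can be illustrated with a small sketch. It is not Celeborn code: `java.nio.ByteBuffer` stands in for Netty's `ByteBuf` (a relative `get` advances the position the way `readBytes` advances the reader index, and `remaining()` plays the role of `readableBytes()`), and the counter is a hypothetical stand-in for `diskBufferCounter`. It also assumes the release step subtracts whatever is still readable after the copy, which is one plausible reading of the comment above.

```scala
import java.nio.ByteBuffer

object ReaderIndexSketch {
  // Relative bulk read: advances the position, like ByteBuf.readBytes.
  def copyRelative(buf: ByteBuffer, dst: Array[Byte], length: Int): Unit = {
    buf.get(dst, 0, length)
    ()
  }

  // Absolute reads: leave the position untouched, like ByteBuf.getBytes.
  def copyAbsolute(buf: ByteBuffer, dst: Array[Byte], length: Int): Unit = {
    val start = buf.position()
    var i = 0
    while (i < length) {
      dst(i) = buf.get(start + i)
      i += 1
    }
  }

  // Hypothetical accounting: reserve by the readable size, copy, then
  // release by whatever is still readable. Returns what stays reserved.
  def flushOnce(copy: (ByteBuffer, Array[Byte], Int) => Unit): Long = {
    var counter = 0L
    val buf = ByteBuffer.wrap(Array[Byte](1, 2, 3, 4))
    counter += buf.remaining()
    copy(buf, new Array[Byte](8), buf.remaining())
    counter -= buf.remaining()
    counter
  }

  def main(args: Array[String]): Unit = {
    println(flushOnce(copyRelative)) // prints 4: nothing was released
    println(flushOnce(copyAbsolute)) // prints 0: fully released
  }
}
```

With the relative read, the buffer reports zero readable bytes by the time it is released, so under this assumed accounting the counter would only ever grow.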
    val readableBytes = buffer.readableBytes()
    val bytes = convertBufferToBytes(buffer, copyBytes, readableBytes)
-   val inputStream = new ByteArrayInputStream(bytes)
+   val inputStream = new ByteArrayInputStream(bytes, 0, readableBytes)
this matches what you say in the PR description, and makes sense to me.
cc @TheodoreLx @RexXiong, who originally wrote and committed this code, could you please take a look? @eolivelli, it's holiday season in China until the end of Feb, so replies from reviewers might be delayed.
thank you @pan3793
What changes were proposed in this pull request?
Properly pass the size of the array to the InputStream that feeds the flush.
Why are the changes needed?
Without this change, if the array is bigger than the buffer, the InputStream also returns the garbage bytes that sit past the end of the buffer's data, resulting in corrupted data on S3.
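A minimal sketch of the problem, using only `java.io`. The array size, fill value, and payload are made up for illustration; `copyBytes` here simply mimics a reusable scratch array that is larger than the data it currently holds.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream}

object StreamLengthSketch {
  // Drain a stream fully and return everything it yields.
  def drain(in: ByteArrayInputStream): Array[Byte] = {
    val out = new ByteArrayOutputStream()
    var b = in.read()
    while (b != -1) {
      out.write(b)
      b = in.read()
    }
    out.toByteArray
  }

  def main(args: Array[String]): Unit = {
    // Scratch array (hypothetical size) still holding bytes from an earlier use.
    val copyBytes = Array.fill[Byte](8)(0x7f.toByte)
    val payload = "abc".getBytes("UTF-8")
    System.arraycopy(payload, 0, copyBytes, 0, payload.length)

    // Unbounded: the stream exposes the whole array, stale tail included.
    val whole = drain(new ByteArrayInputStream(copyBytes))
    // Bounded: the stream stops after the valid bytes.
    val bounded = drain(new ByteArrayInputStream(copyBytes, 0, payload.length))

    println(s"unbounded stream yields ${whole.length} bytes") // 8
    println(s"bounded stream yields ${bounded.length} bytes") // 3
  }
}
```

The three-argument constructor limits the stream to `length` bytes starting at `offset`, which is what the change relies on.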
Does this PR resolve a correctness bug?
Yes.
Does this PR introduce any user-facing change?
No.
How was this patch tested?
New unit test + Manual testing.