Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion core/src/main/java/com/yahoo/oak/BasicChunk.java
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,10 @@ protected void updateBasicChild(BasicChunk<K, V> child) {

abstract boolean readValueFromEntryIndex(ValueBuffer value, int ei);

abstract void lookUp(ThreadContext ctx, K key);
/* the lookUp method throws this exception for OrderedChunks only while trying to access a released chunk.
* the lookUp method is not allowed to throw the exception in the case of HashChunk.
*/
abstract void lookUp(ThreadContext ctx, K key) throws DeletedMemoryAccessException;

/*-------------- Publishing related methods and getters ---------------*/
/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,11 @@

package com.yahoo.oak;

public class DeletedMemoryAccessException extends RuntimeException {
/**
* This Exception is thrown when using the Nova MemoryManager, such exception indicates that we are trying
* to access some off heap memory location that was deleted concurrently to the access.
*/
public class DeletedMemoryAccessException extends Exception {
public DeletedMemoryAccessException() {
super();
}
Expand Down
6 changes: 3 additions & 3 deletions core/src/main/java/com/yahoo/oak/EntryArray.java
Original file line number Diff line number Diff line change
Expand Up @@ -182,9 +182,9 @@ int getNextNonZeroIndex(int currentIndex) {
int nextIndex = currentIndex;
while (!isNotZero && isIndexInBound(++nextIndex)) {

isNotZero = getEntryFieldLong(nextIndex, KEY_REF_OFFSET) != 0 ||
getEntryFieldLong(nextIndex, VALUE_REF_OFFSET) != 0 ||
getEntryFieldLong(nextIndex, OPT_FIELD_OFFSET) != 0;
isNotZero = array.getEntryFieldLong(nextIndex, KEY_REF_OFFSET) != 0 ||
array.getEntryFieldLong(nextIndex, VALUE_REF_OFFSET) != 0 ||
array.getEntryFieldLong(nextIndex, OPT_FIELD_OFFSET) != 0;
}
if (isIndexInBound(nextIndex)) {
return nextIndex;
Expand Down
6 changes: 5 additions & 1 deletion core/src/main/java/com/yahoo/oak/EntryOrderedSet.java
Original file line number Diff line number Diff line change
Expand Up @@ -297,9 +297,13 @@ boolean isEntrySetValidAfterRebalance() {
return true;
}

//This method is called by only one thread on a release chunk.
void releaseAllDeletedKeys() {
KeyBuffer key = new KeyBuffer(config.keysMemoryManager.getEmptySlice());
for (int i = 0; i < numOfEntries.get() ; i++) {
for (int i = 0; i < nextFreeIndex.get() - 1 && i < numOfEntries.get() ; i++) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This looks to me very weird condition and comment.

  1. releaseAllDeletedKeys() should be called only by one thread that succeeds to change the chunk's state from FROZEN to RELEASED (CAS success). I do not see any reason for other threads to invoke releaseAllDeletedKeys(). Can you please check that this is how it happens? And definitely add a header comment about releaseAllDeletedKeys() usage intention.
  2. This release happens as a very last step in the Chunk rebalance. At the beginning of the rebalance step the thread proceeding with the rebalance waits for all updates to finish. This is why we have publish/unpublish staff. So if nextFreeIndex is concurrently updated this is a bug... Maybe to add an explanatory comment here and simplify the loop condition. I think checking only nextFreeIndex should be enough.

//the second condition is for case where multiple threads increment the nextfreeIndex
//which results in being bigger than the array size, this happens when multiple threads try
// to increase the nextFreeIndex
if (!config.valuesMemoryManager.isReferenceDeleted(getValueReference(i))) {
continue;
}
Expand Down
54 changes: 25 additions & 29 deletions core/src/main/java/com/yahoo/oak/InternalOakBasics.java
Original file line number Diff line number Diff line change
Expand Up @@ -165,8 +165,16 @@ V replace(K key, V value, OakTransformer<V> valueDeserializeTransformer) {
ThreadContext ctx = getThreadContext();

for (int i = 0; i < MAX_RETRIES; i++) {
BasicChunk<K, V> chunk = findChunk(key, ctx); // find orderedChunk matching key
chunk.lookUp(ctx, key);
BasicChunk<K, V> chunk = findChunk(key, ctx); // find Chunk matching key
try {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The comment above speaks about orderedChunk, do we know it is not going to happen for hash?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you please add an explanatory comment here?

chunk.lookUp(ctx, key);
//the look up method might encounter a chunk which is released, while using Nova as a memory manager
//as a result we might access an already deleted key, thus the need to catch the exception
} catch (DeletedMemoryAccessException e) {
assert !(chunk instanceof HashChunk);
//Hash deals with deleted keys in an earlier stage
continue;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we add an assert that chunk is not hashChunk? As a sanity check...

Copy link
Collaborator Author

@li0nr li0nr Oct 4, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this opened to be discussed

}
if (!ctx.isValueValid()) {
return null;
}
Expand All @@ -184,29 +192,7 @@ V replace(K key, V value, OakTransformer<V> valueDeserializeTransformer) {
throw new RuntimeException("replace failed: reached retry limit (1024).");
}

boolean replace(K key, V oldValue, V newValue, OakTransformer<V> valueDeserializeTransformer) {
ThreadContext ctx = getThreadContext();

for (int i = 0; i < MAX_RETRIES; i++) {
// find chunk matching key, puts this key hash into ctx.operationKeyHash
BasicChunk<K, V> c = findChunk(key, ctx); // find orderedChunk matching key
c.lookUp(ctx, key);
if (!ctx.isValueValid()) {
return false;
}

ValueUtils.ValueResult res = getValueOperator().compareExchange(c, ctx, oldValue, newValue,
valueDeserializeTransformer, getValueSerializer());
if (res == ValueUtils.ValueResult.RETRY) {
// it might be that this chunk is proceeding with rebalance -> help
helpRebalanceIfInProgress(c);
continue;
}
return res == ValueUtils.ValueResult.TRUE;
}

throw new RuntimeException("replace failed: reached retry limit (1024).");
}
abstract boolean replace(K key, V oldValue, V newValue, OakTransformer<V> valueDeserializeTransformer);

abstract boolean putIfAbsentComputeIfPresent(K key, V value, Consumer<OakScopedWriteBuffer> computer);

Expand Down Expand Up @@ -355,6 +341,9 @@ public boolean hasNext() {
return (state != null);
}

// for more detail on this method see implementation
protected abstract void initStateWithMinKey(BasicChunk<K, V> chunk);

protected abstract void initAfterRebalance();

// the actual next()
Expand Down Expand Up @@ -471,7 +460,8 @@ protected void invalidatePrevState() {

protected abstract BasicChunk<K, V> getNextChunk(BasicChunk<K, V> current);

protected abstract BasicChunk.BasicChunkIter getChunkIter(BasicChunk<K, V> current);
protected abstract BasicChunk.BasicChunkIter getChunkIter(BasicChunk<K, V> current)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It would be good to explain in which cases the exception is thrown. As I remember we were asking ourselves as well. Please add a comment.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

to be discussed

throws DeletedMemoryAccessException;

/**
* advance state to the new position
Expand All @@ -483,21 +473,27 @@ protected boolean advanceState() {
BasicChunk.BasicChunkIter chunkIter = getState().getChunkIter();

updatePreviousState();

while (!chunkIter.hasNext()) { // skip empty chunks
chunk = getNextChunk(chunk);
if (chunk == null) {
//End of iteration
setState(null);
return false;
}
chunkIter = getChunkIter(chunk);
try {
chunkIter = getChunkIter(chunk);
} catch (DeletedMemoryAccessException e) {
assert chunk.state.get() == BasicChunk.State.RELEASED;
initStateWithMinKey(chunk);
return true;
}
}

int nextIndex = chunkIter.next(ctx);
getState().set(chunk, chunkIter, nextIndex);

return true;
return true;
}

}
Expand Down
21 changes: 14 additions & 7 deletions core/src/main/java/com/yahoo/oak/InternalOakHash.java
Original file line number Diff line number Diff line change
Expand Up @@ -268,7 +268,7 @@ boolean replace(K key, V oldValue, V newValue, OakTransformer<V> valueDeserializ

for (int i = 0; i < MAX_RETRIES; i++) {
// find chunk matching key, puts this key hash into ctx.operationKeyHash
BasicChunk<K, V> c = findChunk(key, ctx);
HashChunk<K, V> c = findChunk(key, ctx);
c.lookUp(ctx, key);
if (!ctx.isValueValid()) {
return false;
Expand Down Expand Up @@ -406,9 +406,8 @@ boolean computeIfPresent(K key, Consumer<OakScopedWriteBuffer> computer) {

for (int i = 0; i < MAX_RETRIES; i++) {
// find chunk matching key, puts this key hash into ctx.operationKeyHash
BasicChunk<K, V> c = findChunk(key, ctx);
HashChunk<K, V> c = findChunk(key, ctx);
c.lookUp(ctx, key);

if (ctx.isValueValid()) {
ValueUtils.ValueResult res = config.valueOperator.compute(ctx.value, computer);
if (res == ValueUtils.ValueResult.TRUE) {
Expand Down Expand Up @@ -440,9 +439,8 @@ Result remove(K key, V oldValue, OakTransformer<V> transformer) {

for (int i = 0; i < MAX_RETRIES; i++) {
// find chunk matching key, puts this key hash into ctx.operationKeyHash
BasicChunk<K, V> c = findChunk(key, ctx);
HashChunk<K, V> c = findChunk(key, ctx);
c.lookUp(ctx, key);

if (!ctx.isKeyValid()) {
// There is no such key. If we did logical deletion and someone else did the physical deletion,
// then the old value is saved in v. Otherwise v is (correctly) null
Expand Down Expand Up @@ -569,6 +567,10 @@ protected void initAfterRebalance() {
initState();
}

@Override
protected void initStateWithMinKey(BasicChunk c) { //nextKey is null here
initState();
}

/**
* Advances next to higher entry.
Expand Down Expand Up @@ -799,8 +801,9 @@ class EntryTransformIterator<T> extends HashIter<T> {
}

public T next() {
ValueUtils.ValueResult res;
advance(true);
ValueUtils.ValueResult res = ctx.value.s.preRead();
res = ctx.value.s.preRead();
if (res == ValueUtils.ValueResult.FALSE) {
return next();
} else if (res == ValueUtils.ValueResult.RETRY) {
Expand All @@ -817,7 +820,11 @@ public T next() {
new AbstractMap.SimpleEntry<>(ctx.key, ctx.value);

T transformation = transformer.apply(entry);
ctx.value.s.postRead();
try {
ctx.value.s.postRead();
} catch (DeletedMemoryAccessException e) {
return next();
}
return transformation;
}
}
Expand Down
Loading