Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -23,4 +23,9 @@ public interface CompressionOperation<CL, R> extends Operation<CL, R> {
String compressValue(String value, ConnectionContext ctx);

String decompressValue(String value, ConnectionContext ctx);

byte[] compressValue(byte[] value, ConnectionContext ctx);

byte[] decompressValue(byte[] value, ConnectionContext ctx);

}
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,11 @@ public interface Connection<CL> {
*/
public void open() throws DynoException;

/**
* Reset connection before adding back to pool.
*/
public void reset();

/**
* Can be used by clients to indicate connection exception.
* This can be analyzed by connection pools later
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,4 +13,9 @@ public interface MultiKeyCompressionOperation<CL, R> extends Operation<CL, R> {
String[] compressMultiKeyValue(ConnectionContext ctx, String... value);

String decompressValue(ConnectionContext ctx, String value);

byte[][] compressMultiKeyValue(ConnectionContext ctx, byte[]... value);

byte[] decompressValue(ConnectionContext ctx, byte[] value);

}
Original file line number Diff line number Diff line change
Expand Up @@ -343,8 +343,6 @@ public <R> OperationResult<R> executeWithFailover(Operation<CL, R> op) throws Dy
} else {
cpMonitor.incOperationFailure(null, e);
}
} catch (Throwable t) {
throw new RuntimeException(t);
} finally {
if (connection != null) {
if (connection.getLastException() != null
Expand Down Expand Up @@ -422,8 +420,6 @@ public <R> Collection<OperationResult<R>> executeWithRing(TokenRackMapper tokenR
cpHealthTracker.trackConnectionError(connection.getParentConnectionPool(), lastException);
}

} catch (Throwable t) {
throw new RuntimeException(t);
} finally {
connection.getContext().reset();
connection.getParentConnectionPool().returnConnection(connection);
Expand All @@ -442,7 +438,7 @@ public <R> Collection<OperationResult<R>> executeWithRing(TokenRackMapper tokenR
connectionToClose.getContext().reset();
connectionToClose.getParentConnectionPool().returnConnection(connectionToClose);
} catch (Throwable t) {

Logger.warn("Error returning connection to pool", t);
}
}
}
Expand All @@ -467,9 +463,34 @@ private void updateConnectionContext(ConnectionContext context, Host host) {
* @param baseOperation
* @return
*/
public <R> Connection<CL> getConnectionForOperation(BaseOperation<CL, R> baseOperation) {
return selectionStrategy.getConnection(baseOperation, cpConfiguration.getMaxTimeoutWhenExhausted(),
TimeUnit.MILLISECONDS);
public <R> Connection<CL> getConnectionWithFailover(BaseOperation<CL, R> baseOperation) {
RetryPolicy retry = cpConfiguration.getRetryPolicyFactory().getRetryPolicy();
retry.begin();

DynoException lastException = null;

do {
try {
Connection<CL> connection = selectionStrategy.getConnectionUsingRetryPolicy(baseOperation,
cpConfiguration.getMaxTimeoutWhenExhausted(), TimeUnit.MILLISECONDS, retry);
updateConnectionContext(connection.getContext(), connection.getHost());
retry.success();
return connection;
} catch (NoAvailableHostsException e) {
cpMonitor.incOperationFailure(null, e);
throw e;
} catch (PoolExhaustedException e) {
Logger.warn("Pool exhausted: " + e.getMessage());
cpMonitor.incOperationFailure(null, e);
cpHealthTracker.trackConnectionError(e.getHostConnectionPool(), e);
} catch (DynoException e) {
retry.failure(e);
lastException = e;
}

} while (retry.allowRetry());

throw lastException;
}

@Override
Expand Down Expand Up @@ -702,8 +723,6 @@ public <R> ListenableFuture<OperationResult<R>> executeAsync(AsyncOperation<CL,
cpHealthTracker.trackConnectionError(connection.getParentConnectionPool(), lastException);
}

} catch (Throwable t) {
t.printStackTrace();
} finally {
if (connection != null) {
connection.getParentConnectionPool().returnConnection(connection);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -346,8 +346,14 @@ public boolean returnConnection(Connection<CL> connection) {
return closeConnection(connection);
} else {
// Add the given connection back to the pool
availableConnections.add(connection);
return false;
try {
connection.reset();
availableConnections.add(connection);
return false;
} catch (Exception e) {
recycleConnection(connection);
return true;
}
}
} finally {
monitor.incConnectionReturned(host);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,13 @@
******************************************************************************/
package com.netflix.dyno.connectionpool.impl.lb;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

import com.netflix.dyno.connectionpool.BaseOperation;
import com.netflix.dyno.connectionpool.HashPartitioner;
import com.netflix.dyno.connectionpool.HostConnectionPool;
Expand All @@ -25,13 +32,8 @@
import com.netflix.dyno.connectionpool.impl.hash.Murmur1HashPartitioner;
import com.netflix.dyno.connectionpool.impl.utils.CollectionUtils;
import com.netflix.dyno.connectionpool.impl.utils.CollectionUtils.Transform;
import org.apache.commons.lang3.StringUtils;

import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.commons.lang3.StringUtils;

/**
* Concrete implementation of the {@link HostSelectionStrategy} interface using
Expand Down Expand Up @@ -92,7 +94,7 @@ public HostConnectionPool<CL> getPoolForOperation(BaseOperation<CL, ?> op, Strin
if (hashtag == null || hashtag.isEmpty()) {
hToken = this.getTokenForKey(key);
} else {
String hashValue = StringUtils.substringBetween(key, Character.toString(hashtag.charAt(0)), Character.toString(hashtag.charAt(1)));
String hashValue = getHashValue(key, hashtag);
hToken = this.getTokenForKey(hashValue);
}

Expand All @@ -108,9 +110,15 @@ public HostConnectionPool<CL> getPoolForOperation(BaseOperation<CL, ?> op, Strin
} else {
// the key is binary
byte[] binaryKey = op.getBinaryKey();
hToken = this.getTokenForKey(binaryKey);
if (hashtag == null || hashtag.isEmpty()) {
hToken = this.getTokenForKey(binaryKey);
} else {
byte[] hashValue = getHashValue(binaryKey, hashtag);
hToken = this.getTokenForKey(hashValue);
}

if (hToken == null) {
throw new NoAvailableHostsException("Token not found for key " + binaryKey.toString());
throw new NoAvailableHostsException("Token not found for key " + Arrays.toString(binaryKey));
}

hostPool = tokenPools.get(hToken.getToken());
Expand All @@ -124,6 +132,44 @@ public HostConnectionPool<CL> getPoolForOperation(BaseOperation<CL, ?> op, Strin

}

public static String getHashValue(String key, String hashtag) {
if (key == null || hashtag == null || hashtag.length() < 2) {
throw new RuntimeException(
"Hash value calculation not possible for key: " + key + ", hashtag: " + hashtag);
}
return StringUtils.substringBetween(
key, Character.toString(hashtag.charAt(0)), Character.toString(hashtag.charAt(1))
);
}

public static byte[] getHashValue(byte[] key, String hashtag) {
if (key == null || hashtag == null || hashtag.length() < 2) {
throw new RuntimeException(
"Hash value calculation not possible for key: " + Arrays.toString(key) + ", hashtag: " + hashtag);
}

char startChar = hashtag.charAt(0);
char endChar = hashtag.charAt(1);

int s = -1;
int e = -1;
boolean sFound = false;
for (int i = 0; i < key.length; i++) {
if (key[i] == startChar && !sFound) {
s = i;
sFound = true;
}
if (key[i] == endChar && sFound) {
e = i;
break;
}
}
if (s > -1 && e > -1 && e != s + 1) {
return Arrays.copyOfRange(key, s + 1, e);
}
return key;
}

@Override
public Map<HostConnectionPool<CL>, BaseOperation<CL, ?>> getPoolsForOperationBatch(
Collection<BaseOperation<CL, ?>> ops) throws NoAvailableHostsException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,10 @@ public Host getHost() {
public void open() throws DynoException {
}

@Override
public void reset() {
}

@Override
public DynoConnectException getLastException() {
return ex;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,10 @@ public Host getHost() {
public void open() throws DynoException {
}

@Override
public void reset() {
}

@Override
public DynoConnectException getLastException() {
return ex;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,8 +93,8 @@ public void runSimpleTest() throws Exception {
}
// read
for (int i = 0; i < 10; i++) {
OperationResult<String> result = client.d_get("" + i);
System.out.println("Key: " + i + ", Value: " + result.getResult() + " " + result.getNode());
String result = client.get("" + i);
System.out.println("Key: " + i + ", Value: " + result);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -248,16 +248,16 @@ public void runSimpleTest() throws Exception {
}
// read
for (int i = 0; i < numKeys; i++) {
OperationResult<String> result = client.d_get("DynoClientTest-" + i);
System.out.println("Reading Key: " + i + ", Value: " + result.getResult() + " " + result.getNode());
String result = client.get("DynoClientTest-" + i);
System.out.println("Reading Key: " + i + ", Value: " + result);
}

// read from shadow cluster
if (shadowClusterClient != null) {
// read
for (int i = 0; i < numKeys; i++) {
OperationResult<String> result = shadowClusterClient.d_get("DynoClientTest-" + i);
System.out.println("Reading Key: " + i + ", Value: " + result.getResult() + " " + result.getNode());
String result = shadowClusterClient.get("DynoClientTest-" + i);
System.out.println("Reading Key: " + i + ", Value: " + result);
}
}
}
Expand Down Expand Up @@ -285,21 +285,21 @@ public void runSimpleDualWriterPipelineTest() {
// read
System.out.println("Reading keys from dual writer pipeline client");
for (int i = 0; i < numKeys; i++) {
OperationResult<String> result = client.d_hget("DynoClientTest", "DynoClientTest-" + i);
System.out.println("Reading Key: DynoClientTest/" + i + ", Value: " + result.getResult() + " " + result.getNode());
result = client.d_hget("DynoClientTest-1", "DynoClientTest-" + i);
System.out.println("Reading Key: DynoClientTest-1/" + i + ", Value: " + result.getResult() + " " + result.getNode());
String result = client.hget("DynoClientTest", "DynoClientTest-" + i);
System.out.println("Reading Key: DynoClientTest/" + i + ", Value: " + result);
result = client.hget("DynoClientTest-1", "DynoClientTest-" + i);
System.out.println("Reading Key: DynoClientTest-1/" + i + ", Value: " + result);
}

// read from shadow cluster
System.out.println("Reading keys from shadow Jedis client");
if (shadowClusterClient != null) {
// read
for (int i = 0; i < numKeys; i++) {
OperationResult<String> result = shadowClusterClient.d_hget("DynoClientTest", "DynoClientTest-" + i);
System.out.println("Reading Key: DynoClientTest/" + i + ", Value: " + result.getResult() + " " + result.getNode());
result = shadowClusterClient.d_hget("DynoClientTest-1", "DynoClientTest-" + i);
System.out.println("Reading Key: DynoClientTest-1/" + i + ", Value: " + result.getResult() + " " + result.getNode());
String result = shadowClusterClient.hget("DynoClientTest", "DynoClientTest-" + i);
System.out.println("Reading Key: DynoClientTest/" + i + ", Value: " + result);
result = shadowClusterClient.hget("DynoClientTest-1", "DynoClientTest-" + i);
System.out.println("Reading Key: DynoClientTest-1/" + i + ", Value: " + result);
}
}

Expand Down Expand Up @@ -337,8 +337,8 @@ public void runBinaryKeyTest() throws Exception {
System.out.println("Writing Key: " + new String(overallKey, Charset.forName("UTF-8")));

// read
OperationResult<byte[]> result = client.d_get(newKey);
System.out.println("Reading Key: " + new String(newKey, Charset.forName("UTF-8")) + ", Value: " + result.getResult().toString() + " " + result.getNode());
byte[] result = client.get(newKey);
System.out.println("Reading Key: " + new String(newKey, Charset.forName("UTF-8")) + ", Value: " + Arrays.toString(result));

}

Expand All @@ -361,9 +361,9 @@ public void runSimpleTestWithHashtag() throws Exception {
}
// read
for (int i = 0; i < numKeys; i++) {
OperationResult<String> result = client.d_get(i + "-{bar}");
String result = client.get(i + "-{bar}");
System.out.println(
"Reading Key: " + i + "-{bar}" + " , Value: " + result.getResult() + " " + result.getNode());
"Reading Key: " + i + "-{bar}" + " , Value: " + result);
}
}

Expand Down Expand Up @@ -1000,12 +1000,13 @@ public void runLongTest() throws InterruptedException {

private class SAXHandler extends DefaultHandler {

private final List<Map<String, String>> list = new ArrayList<Map<String, String>>();
private final List<Map<String, String>> list = new ArrayList<>();
private final String rootElement;
private final Set<String> interestElements = new HashSet<String>();
private final Set<String> interestElements = new HashSet<>();

private Map<String, String> currentPayload = null;
private String currentInterestElement = null;
private StringBuilder valueBuffer = new StringBuilder();

private SAXHandler(String root, String... interests) {

Expand All @@ -1016,12 +1017,11 @@ private SAXHandler(String root, String... interests) {
}

@Override
public void startElement(String uri, String localName, String qName, Attributes attributes)
throws SAXException {
public void startElement(String uri, String localName, String qName, Attributes attributes) {

if (qName.equalsIgnoreCase(rootElement)) {
// prep for next instance
currentPayload = new HashMap<String, String>();
currentPayload = new HashMap<>();
return;
}

Expand All @@ -1033,7 +1033,13 @@ public void startElement(String uri, String localName, String qName, Attributes
}

@Override
public void endElement(String uri, String localName, String qName) throws SAXException {
public void endElement(String uri, String localName, String qName) {
if (currentInterestElement != null && currentPayload != null) {
String value = valueBuffer.toString();
currentPayload.put(currentInterestElement, value);
currentInterestElement = null;
valueBuffer = new StringBuilder(); //reset buffer
}

// add host to list
if (qName.equalsIgnoreCase(rootElement)) {
Expand All @@ -1043,13 +1049,9 @@ public void endElement(String uri, String localName, String qName) throws SAXExc
}

@Override
public void characters(char[] ch, int start, int length) throws SAXException {

String value = new String(ch, start, length);

public void characters(char[] ch, int start, int length) {
if (currentInterestElement != null && currentPayload != null) {
currentPayload.put(currentInterestElement, value);
currentInterestElement = null;
valueBuffer.append(ch, start, length);
}
}

Expand Down
Loading