From 514d1077917a92294abf6b81d6ad719d09f67ceb Mon Sep 17 00:00:00 2001 From: Udo Kohlmeyer Date: Mon, 9 Mar 2020 12:33:03 -0700 Subject: [PATCH 1/3] PartitionedWithDeltaAndUniqueObjectReferenceBenchmark --- .../java/benchmark/geode/data/Session.java | 276 ++++++++++++++++++ ...eltaAndUniqueObjectReferenceBenchmark.java | 249 ++++++++++++++++ 2 files changed, 525 insertions(+) create mode 100644 geode-benchmarks/src/main/java/benchmark/geode/data/Session.java create mode 100644 geode-benchmarks/src/main/java/org/apache/geode/benchmark/tests/PartitionedWithDeltaAndUniqueObjectReferenceBenchmark.java diff --git a/geode-benchmarks/src/main/java/benchmark/geode/data/Session.java b/geode-benchmarks/src/main/java/benchmark/geode/data/Session.java new file mode 100644 index 000000000..b86b6449e --- /dev/null +++ b/geode-benchmarks/src/main/java/benchmark/geode/data/Session.java @@ -0,0 +1,276 @@ +/* + * Copyright 2018 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package benchmark.geode.data; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; +import java.time.Instant; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Set; +import java.util.UUID; + +import org.apache.geode.DataSerializable; +import org.apache.geode.DataSerializer; +import org.apache.geode.Delta; +import org.apache.geode.InvalidDeltaException; + +/** + * The {@link benchmark.geode.data.Session} class is an Abstract Data Type (ADT) modeling a user's session. + * + * @author John Blum + * @see Comparable + * @see DataSerializable + * @see DataSerializer + * @see Delta + */ +public class Session implements Comparable, DataSerializable, Delta { + + private static final boolean ALLOW_JAVA_SERIALIZATION = false; + + public static benchmark.geode.data.Session create() { + return new benchmark.geode.data.Session(); + } + + private transient boolean delta = false; + + protected volatile transient boolean toDataCalled = false; + protected volatile transient boolean toDeltaCalled = false; + + private Instant creationTime; + private Instant lastAccessedTime; + + private Map attributes = new HashMap<>(); + private Map attributeDeltas = new HashMap<>(); + + private String id; + + public Session() { + + this.id = UUID.randomUUID().toString(); + this.creationTime = Instant.now(); + this.lastAccessedTime = this.creationTime; + this.delta = true; + } + + public synchronized String getId() { + return this.id; + } + + public synchronized Object setAttribute(String name, Object value) { + + return value != null + ? doSetAttribute(name, value) + : removeAttribute(name); + } + + public synchronized void setAttributes(Map attributes) { + this.attributes.putAll(attributes); + } + + private Object doSetAttribute(String name, Object value) { + + Object previousValue = this.attributes.put(name, value); + + if (!value.equals(previousValue)) { + this.attributeDeltas.put(name, value); + this.delta = true; + } + + return previousValue; + } + + public synchronized Object removeAttribute(String name) { + + if (getAttributes().containsKey(name)) { + this.attributeDeltas.put(name, null); + this.delta = true; + } + + return getAttributes().remove(name); + } + + public synchronized Object getAttribute(String name) { + return getAttributes().get(name); + } + + public synchronized Set getAttributeNames() { + return Collections.unmodifiableSet(new HashSet<>(getAttributes().keySet())); + } + + protected synchronized Map getAttributes() { + return this.attributes; + } + + public synchronized Instant getCreationTime() { + return this.creationTime; + } + + protected synchronized void setLastAccessedTime(Instant lastAccessedTime) { + this.lastAccessedTime = lastAccessedTime; + } + + public synchronized Instant getLastAccessedTime() { + return this.lastAccessedTime; + } + + public synchronized benchmark.geode.data.Session commit() { + + this.delta = false; + this.attributeDeltas.clear(); + + return this; + } + + public synchronized benchmark.geode.data.Session touch() { + + Instant newLastAccessedTime = Instant.now(); + + this.delta |= !newLastAccessedTime.equals(getLastAccessedTime()); + setLastAccessedTime(newLastAccessedTime); + + return this; + } + + @Override + public synchronized void toData(DataOutput out) throws IOException { + + this.toDataCalled = true; + + out.writeUTF(this.getId()); + out.writeLong(this.getCreationTime().toEpochMilli()); + out.writeLong(this.getLastAccessedTime().toEpochMilli()); + DataSerializer.writeHashMap(getAttributes(),out); +// out.writeInt(getAttributes().size()); +// +// for (Entry entry : getAttributes().entrySet()) { +// out.writeUTF(entry.getKey()); +// DataSerializer.writeObject(entry.getValue(), out, ALLOW_JAVA_SERIALIZATION); +// } + } + + @Override + public synchronized void fromData(DataInput in) throws IOException, ClassNotFoundException { + + this.id = in.readUTF(); + this.creationTime = Instant.ofEpochMilli(in.readLong()); + + setLastAccessedTime(Instant.ofEpochMilli(in.readLong())); + + this.setAttributes(DataSerializer.readHashMap(in)); +// for (int count = in.readInt(); count > 0; count--) { +// setAttribute(in.readUTF(), DataSerializer.readObject(in)); +// } + } + + @Override + public synchronized void toDelta(DataOutput out) throws IOException { + + this.toDeltaCalled = true; + + out.writeLong(getLastAccessedTime().toEpochMilli()); + out.writeInt(this.attributeDeltas.size()); + + for (Entry entry : this.attributeDeltas.entrySet()) { + out.writeUTF(entry.getKey()); + DataSerializer.writeObject(entry.getValue(), out, ALLOW_JAVA_SERIALIZATION); + } + } + + @Override + public synchronized void fromDelta(DataInput in) throws IOException, InvalidDeltaException { + + String key = null; + + try { + + setLastAccessedTime(Instant.ofEpochMilli(in.readLong())); + + for (int count = in.readInt(); count > 0; count--) { + key = in.readUTF(); + setAttribute(key, DataSerializer.readObject(in)); + } + } + catch (ClassNotFoundException cause) { + throw new IOException(String.format("Failed to resolve type in delta for attribute [%s]", key), cause); + } + } + + @Override + public synchronized boolean hasDelta() { + return this.delta || !this.attributeDeltas.isEmpty(); + } + + public synchronized boolean wasToDataCalled() { + + boolean toDataCalled = this.toDataCalled; + + this.toDataCalled = false; + + return toDataCalled; + } + + public synchronized boolean wasToDeltaCalled() { + + boolean toDeltaCalled = this.toDeltaCalled; + + this.toDeltaCalled = false; + + return toDeltaCalled; + } + + + @Override + public int compareTo(benchmark.geode.data.Session session) { + return this.getId().compareTo(session.getId()); + } + + @Override + public boolean equals(Object obj) { + + if (this == obj) { + return true; + } + + if (!(obj instanceof benchmark.geode.data.Session)) { + return false; + } + + benchmark.geode.data.Session that = (benchmark.geode.data.Session) obj; + + return this.getId().equals(that.getId()); + } + + @Override + public int hashCode() { + + int hashValue = 17; + + hashValue = 37 * hashValue + getId().hashCode(); + + return hashValue; + } + + @Override + public String toString() { + return getId(); + } +} diff --git a/geode-benchmarks/src/main/java/org/apache/geode/benchmark/tests/PartitionedWithDeltaAndUniqueObjectReferenceBenchmark.java b/geode-benchmarks/src/main/java/org/apache/geode/benchmark/tests/PartitionedWithDeltaAndUniqueObjectReferenceBenchmark.java new file mode 100644 index 000000000..29aace884 --- /dev/null +++ b/geode-benchmarks/src/main/java/org/apache/geode/benchmark/tests/PartitionedWithDeltaAndUniqueObjectReferenceBenchmark.java @@ -0,0 +1,249 @@ +package org.apache.geode.benchmark.tests; + +import static org.apache.geode.benchmark.topology.ClientServerTopology.Roles.CLIENT; +import static org.apache.geode.benchmark.topology.ClientServerTopology.Roles.SERVER; + +import java.io.Serializable; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Random; +import java.util.UUID; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicReference; + +import benchmark.geode.data.Session; +import org.junit.jupiter.api.Test; +import org.yardstickframework.BenchmarkConfiguration; +import org.yardstickframework.BenchmarkDriverAdapter; + +import org.apache.geode.benchmark.tasks.CreatePartitionedRegion; +import org.apache.geode.benchmark.topology.ClientServerTopology; +import org.apache.geode.cache.Region; +import org.apache.geode.cache.client.ClientCache; +import org.apache.geode.cache.client.ClientCacheFactory; +import org.apache.geode.cache.client.ClientRegionShortcut; +import org.apache.geode.cache.client.PoolFactory; +import org.apache.geode.cache.client.PoolManager; +import org.apache.geode.perftest.PerformanceTest; +import org.apache.geode.perftest.Task; +import org.apache.geode.perftest.TestConfig; +import org.apache.geode.perftest.TestContext; +import org.apache.geode.perftest.TestRunners; + +public class PartitionedWithDeltaAndUniqueObjectReferenceBenchmark implements PerformanceTest { + + private static final String SUBSCRIPTION_POOL = "subscriptionPool"; + private static final int THREAD_COUNT = 180; + private static final int WORKLOAD_SIZE = 10000; + + public PartitionedWithDeltaAndUniqueObjectReferenceBenchmark() { + } + + @Test + public void run() throws Exception { + TestRunners.defaultRunner().runTest(this); + } + + @Override + public TestConfig configure() { + TestConfig config = GeodeBenchmark.createConfig(); + config.threads(Runtime.getRuntime().availableProcessors() * 2); + ClientServerTopology.configure(config); + config.before(new CreatePartitionedRegion(), SERVER); + config.before(new CreateClientPool(), CLIENT); + config.before(new CreateClientProxyRegionWithPool(), CLIENT); + config.workload(new PutWithDeltaTask(THREAD_COUNT, WORKLOAD_SIZE), CLIENT); + return config; + } + + public static class PutWithDeltaTask extends BenchmarkDriverAdapter implements Serializable { + + private Region region; + private final AtomicInteger threadCounter = new AtomicInteger(0); + private final AtomicReference sessionId = new AtomicReference<>(null); + + private final List + existingSessionAttributeNames = + Collections.synchronizedList(new ArrayList<>()); + + private final Random random = new Random(System.currentTimeMillis()); + + private Collection> workloadTasks; + + public PutWithDeltaTask(int threadCount, int workloadSize) { + + } + + @Override + public void setUp(BenchmarkConfiguration cfg) throws Exception { + super.setUp(cfg); + this.workloadTasks = newSessionWorkloadTasks(); + ClientCache cache = ClientCacheFactory.getAnyInstance(); + region = cache.getRegion("region"); + } + + @Override + public boolean test(Map ctx) throws Exception { + int sessionAttributeCount = runSessionWorkload(); + + Session session = findById(this.sessionId.get()); + + return true; + } + + private Session findById(String id) { + + return Optional.ofNullable(this.region.get(id)) + .map(Session::commit) + .map(Session::touch) + .orElseThrow(() -> new IllegalStateException( + String.format("No Session with ID [%s] was found", id))); + } + + private Session save(Session session) { + if (session != null && session.hasDelta()) { + this.region.put(session.getId(), session); + session.commit(); + } + + return session; + } + + private int runSessionWorkload() throws InterruptedException { + ExecutorService sessionBatchWorkloadExecutor = newSessionWorkloadExecutor(); + + try { + Collection> results = + sessionBatchWorkloadExecutor.invokeAll(workloadTasks); + return results.stream() + .mapToInt(this::safeFutureGet) + .sum(); + } finally { + Optional.of(sessionBatchWorkloadExecutor).ifPresent(ExecutorService::shutdownNow); + } + } + + private ExecutorService newSessionWorkloadExecutor() { + + return Executors.newFixedThreadPool(THREAD_COUNT, runnable -> { + + Thread sessionThread = new Thread(runnable); + + sessionThread + .setName(String.format("Session Thread %d", this.threadCounter.incrementAndGet())); + sessionThread.setDaemon(true); + sessionThread.setPriority(Thread.NORM_PRIORITY); + + return sessionThread; + }); + } + + private Collection> newSessionWorkloadTasks() { + + Collection> sessionWorkloadTasks = new LinkedList<>(); + + for (int count = 0; count < WORKLOAD_SIZE; count++) { + sessionWorkloadTasks.add(count % 79 != 0 + ? newAddSessionAttributeTask() + : count % 237 != 0 + ? newRemoveSessionAttributeTask() + : newSessionReaderTask()); + } + + return sessionWorkloadTasks; + } + + private Callable newAddSessionAttributeTask() { + return () -> { + Session session = findById(this.sessionId.get()); + + String name = UUID.randomUUID().toString(); + + session.setAttribute(name, System.currentTimeMillis()); + save(session); + + this.existingSessionAttributeNames.add(name); + return 1; + }; + } + + private Callable newRemoveSessionAttributeTask() { + return () -> { + int returnValue = 0; + + Session session = findById(this.sessionId.get()); + String attributeName = null; + + synchronized (this.existingSessionAttributeNames) { + int size = this.existingSessionAttributeNames.size(); + if (size > 0) { + int index = this.random.nextInt(size); + attributeName = this.existingSessionAttributeNames.remove(index); + } + } + + if (session.getAttributeNames().contains(attributeName)) { + session.removeAttribute(attributeName); + returnValue = -1; + } else { + Optional.ofNullable(attributeName) + .filter(it -> !it.trim().isEmpty()) + .ifPresent(this.existingSessionAttributeNames::add); + } + + save(session); + + return returnValue; + }; + } + + private Callable newSessionReaderTask() { + return () -> { + Session session = findById(this.sessionId.get()); + save(session.touch()); + + return 0; + }; + } + + private int safeFutureGet(Future future) { + try { + return future.get(); + } catch (Exception cause) { + throw new RuntimeException("Session access task failure", cause); + } + } + } + + private class CreateClientPool implements Task { + @Override + public void run(TestContext context) throws Exception { + PoolFactory poolFactory = PoolManager.createFactory(); + poolFactory + .setMaxConnections(-1) + .setPingInterval(1000) + .setRetryAttempts(5) + .setSubscriptionEnabled(true) + .create(SUBSCRIPTION_POOL); + } + } + + private class CreateClientProxyRegionWithPool implements Task { + @Override + public void run(TestContext context) throws Exception { + ClientCache clientCache = (ClientCache) context.getAttribute("CLIENT_CACHE"); + clientCache.createClientRegionFactory(ClientRegionShortcut.PROXY) + .setPoolName(SUBSCRIPTION_POOL) + .create("region"); + } + } +} From 4a057cf187db94018a425b52c2da493decc75a88 Mon Sep 17 00:00:00 2001 From: Udo Kohlmeyer Date: Mon, 23 Mar 2020 07:52:40 -0700 Subject: [PATCH 2/3] GEODE-7763: Benchmark for GEODE-7763. In this benchmark a client has many thread (180) which all try and access/modify the same key. This is to test the performance of GEODE since the removal of the client Futures cache, which was removed in 1.10. --- .../java/benchmark/geode/data/Session.java | 43 ++++----- .../benchmark/parameters/HeapParameters.java | 2 +- ...eltaAndUniqueObjectReferenceBenchmark.java | 93 +++++++------------ 3 files changed, 53 insertions(+), 85 deletions(-) diff --git a/geode-benchmarks/src/main/java/benchmark/geode/data/Session.java b/geode-benchmarks/src/main/java/benchmark/geode/data/Session.java index b86b6449e..bfb3fac14 100644 --- a/geode-benchmarks/src/main/java/benchmark/geode/data/Session.java +++ b/geode-benchmarks/src/main/java/benchmark/geode/data/Session.java @@ -5,7 +5,7 @@ * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -26,7 +26,6 @@ import java.util.Map; import java.util.Map.Entry; import java.util.Set; -import java.util.UUID; import org.apache.geode.DataSerializable; import org.apache.geode.DataSerializer; @@ -34,7 +33,8 @@ import org.apache.geode.InvalidDeltaException; /** - * The {@link benchmark.geode.data.Session} class is an Abstract Data Type (ADT) modeling a user's session. + * The {@link benchmark.geode.data.Session} class is an Abstract Data Type (ADT) modeling a user's + * session. * * @author John Blum * @see Comparable @@ -46,26 +46,26 @@ public class Session implements Comparable, DataSerializable, Delta { private static final boolean ALLOW_JAVA_SERIALIZATION = false; - public static benchmark.geode.data.Session create() { - return new benchmark.geode.data.Session(); + public static benchmark.geode.data.Session create(String id) { + return new benchmark.geode.data.Session(id); } private transient boolean delta = false; - protected volatile transient boolean toDataCalled = false; - protected volatile transient boolean toDeltaCalled = false; + protected transient volatile boolean toDataCalled = false; + protected transient volatile boolean toDeltaCalled = false; private Instant creationTime; private Instant lastAccessedTime; - private Map attributes = new HashMap<>(); - private Map attributeDeltas = new HashMap<>(); + private final Map attributes = new HashMap<>(); + private final Map attributeDeltas = new HashMap<>(); private String id; - public Session() { + public Session(String id) { - this.id = UUID.randomUUID().toString(); + this.id = id; this.creationTime = Instant.now(); this.lastAccessedTime = this.creationTime; this.delta = true; @@ -78,8 +78,8 @@ public synchronized String getId() { public synchronized Object setAttribute(String name, Object value) { return value != null - ? doSetAttribute(name, value) - : removeAttribute(name); + ? doSetAttribute(name, value) + : removeAttribute(name); } public synchronized void setAttributes(Map attributes) { @@ -158,13 +158,7 @@ public synchronized void toData(DataOutput out) throws IOException { out.writeUTF(this.getId()); out.writeLong(this.getCreationTime().toEpochMilli()); out.writeLong(this.getLastAccessedTime().toEpochMilli()); - DataSerializer.writeHashMap(getAttributes(),out); -// out.writeInt(getAttributes().size()); -// -// for (Entry entry : getAttributes().entrySet()) { -// out.writeUTF(entry.getKey()); -// DataSerializer.writeObject(entry.getValue(), out, ALLOW_JAVA_SERIALIZATION); -// } + DataSerializer.writeHashMap(getAttributes(), out); } @Override @@ -176,9 +170,6 @@ public synchronized void fromData(DataInput in) throws IOException, ClassNotFoun setLastAccessedTime(Instant.ofEpochMilli(in.readLong())); this.setAttributes(DataSerializer.readHashMap(in)); -// for (int count = in.readInt(); count > 0; count--) { -// setAttribute(in.readUTF(), DataSerializer.readObject(in)); -// } } @Override @@ -208,9 +199,9 @@ public synchronized void fromDelta(DataInput in) throws IOException, InvalidDelt key = in.readUTF(); setAttribute(key, DataSerializer.readObject(in)); } - } - catch (ClassNotFoundException cause) { - throw new IOException(String.format("Failed to resolve type in delta for attribute [%s]", key), cause); + } catch (ClassNotFoundException cause) { + throw new IOException( + String.format("Failed to resolve type in delta for attribute [%s]", key), cause); } } diff --git a/geode-benchmarks/src/main/java/org/apache/geode/benchmark/parameters/HeapParameters.java b/geode-benchmarks/src/main/java/org/apache/geode/benchmark/parameters/HeapParameters.java index 0ef39c62a..b7c287faa 100644 --- a/geode-benchmarks/src/main/java/org/apache/geode/benchmark/parameters/HeapParameters.java +++ b/geode-benchmarks/src/main/java/org/apache/geode/benchmark/parameters/HeapParameters.java @@ -28,7 +28,7 @@ public class HeapParameters { public static void configure(final TestConfig testConfig) { final String heap = System.getProperty("withHeap", "8g"); logger.info("Configuring heap parameters {}.", heap); - configureAll(testConfig, "-Xmx" + heap, "-Xms" + heap); + configureAll(testConfig, "-Xmx" + heap, "-Xms" + heap, "-Xss228k"); } } diff --git a/geode-benchmarks/src/main/java/org/apache/geode/benchmark/tests/PartitionedWithDeltaAndUniqueObjectReferenceBenchmark.java b/geode-benchmarks/src/main/java/org/apache/geode/benchmark/tests/PartitionedWithDeltaAndUniqueObjectReferenceBenchmark.java index 29aace884..99c3075db 100644 --- a/geode-benchmarks/src/main/java/org/apache/geode/benchmark/tests/PartitionedWithDeltaAndUniqueObjectReferenceBenchmark.java +++ b/geode-benchmarks/src/main/java/org/apache/geode/benchmark/tests/PartitionedWithDeltaAndUniqueObjectReferenceBenchmark.java @@ -6,7 +6,6 @@ import java.io.Serializable; import java.util.ArrayList; import java.util.Collection; -import java.util.Collections; import java.util.LinkedList; import java.util.List; import java.util.Map; @@ -14,10 +13,7 @@ import java.util.Random; import java.util.UUID; import java.util.concurrent.Callable; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; import java.util.concurrent.Future; -import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; import benchmark.geode.data.Session; @@ -42,11 +38,9 @@ public class PartitionedWithDeltaAndUniqueObjectReferenceBenchmark implements PerformanceTest { private static final String SUBSCRIPTION_POOL = "subscriptionPool"; - private static final int THREAD_COUNT = 180; - private static final int WORKLOAD_SIZE = 10000; + private static final String ID = UUID.randomUUID().toString(); - public PartitionedWithDeltaAndUniqueObjectReferenceBenchmark() { - } + public PartitionedWithDeltaAndUniqueObjectReferenceBenchmark() {} @Test public void run() throws Exception { @@ -56,31 +50,33 @@ public void run() throws Exception { @Override public TestConfig configure() { TestConfig config = GeodeBenchmark.createConfig(); - config.threads(Runtime.getRuntime().availableProcessors() * 2); + config.threads(180); + config.durationSeconds(300); + config.warmupSeconds(30); ClientServerTopology.configure(config); config.before(new CreatePartitionedRegion(), SERVER); config.before(new CreateClientPool(), CLIENT); config.before(new CreateClientProxyRegionWithPool(), CLIENT); - config.workload(new PutWithDeltaTask(THREAD_COUNT, WORKLOAD_SIZE), CLIENT); + config.workload(new PutWithDeltaTask(ID), CLIENT); return config; } public static class PutWithDeltaTask extends BenchmarkDriverAdapter implements Serializable { private Region region; - private final AtomicInteger threadCounter = new AtomicInteger(0); private final AtomicReference sessionId = new AtomicReference<>(null); - private final List - existingSessionAttributeNames = - Collections.synchronizedList(new ArrayList<>()); + private final String id; + + private final List existingSessionAttributeNames = + new ArrayList<>(); private final Random random = new Random(System.currentTimeMillis()); private Collection> workloadTasks; - public PutWithDeltaTask(int threadCount, int workloadSize) { - + public PutWithDeltaTask(String id) { + this.id = id; } @Override @@ -88,20 +84,23 @@ public void setUp(BenchmarkConfiguration cfg) throws Exception { super.setUp(cfg); this.workloadTasks = newSessionWorkloadTasks(); ClientCache cache = ClientCacheFactory.getAnyInstance(); + cache.setCopyOnRead(true); region = cache.getRegion("region"); + save(Session.create(id)); + this.sessionId.set(id); } @Override public boolean test(Map ctx) throws Exception { int sessionAttributeCount = runSessionWorkload(); + assert sessionAttributeCount == this.existingSessionAttributeNames.size(); Session session = findById(this.sessionId.get()); return true; } private Session findById(String id) { - return Optional.ofNullable(this.region.get(id)) .map(Session::commit) .map(Session::touch) @@ -118,47 +117,24 @@ private Session save(Session session) { return session; } - private int runSessionWorkload() throws InterruptedException { - ExecutorService sessionBatchWorkloadExecutor = newSessionWorkloadExecutor(); - - try { - Collection> results = - sessionBatchWorkloadExecutor.invokeAll(workloadTasks); - return results.stream() - .mapToInt(this::safeFutureGet) - .sum(); - } finally { - Optional.of(sessionBatchWorkloadExecutor).ifPresent(ExecutorService::shutdownNow); - } - } - - private ExecutorService newSessionWorkloadExecutor() { - - return Executors.newFixedThreadPool(THREAD_COUNT, runnable -> { - - Thread sessionThread = new Thread(runnable); - - sessionThread - .setName(String.format("Session Thread %d", this.threadCounter.incrementAndGet())); - sessionThread.setDaemon(true); - sessionThread.setPriority(Thread.NORM_PRIORITY); - - return sessionThread; - }); + private int runSessionWorkload() { + return workloadTasks.stream().mapToInt(integerCallable -> { + try { + return integerCallable.call(); + } catch (Exception e) { + e.printStackTrace(); + } + return 0; + }).sum(); } private Collection> newSessionWorkloadTasks() { Collection> sessionWorkloadTasks = new LinkedList<>(); - for (int count = 0; count < WORKLOAD_SIZE; count++) { - sessionWorkloadTasks.add(count % 79 != 0 - ? newAddSessionAttributeTask() - : count % 237 != 0 - ? newRemoveSessionAttributeTask() - : newSessionReaderTask()); - } - + sessionWorkloadTasks.add(newAddSessionAttributeTask()); + sessionWorkloadTasks.add(newRemoveSessionAttributeTask()); + sessionWorkloadTasks.add(newSessionReaderTask()); return sessionWorkloadTasks; } @@ -183,12 +159,10 @@ private Callable newRemoveSessionAttributeTask() { Session session = findById(this.sessionId.get()); String attributeName = null; - synchronized (this.existingSessionAttributeNames) { - int size = this.existingSessionAttributeNames.size(); - if (size > 0) { - int index = this.random.nextInt(size); - attributeName = this.existingSessionAttributeNames.remove(index); - } + int size = this.existingSessionAttributeNames.size(); + if (size > 0) { + int index = this.random.nextInt(size); + attributeName = this.existingSessionAttributeNames.remove(index); } if (session.getAttributeNames().contains(attributeName)) { @@ -225,10 +199,12 @@ private int safeFutureGet(Future future) { } private class CreateClientPool implements Task { + @Override public void run(TestContext context) throws Exception { PoolFactory poolFactory = PoolManager.createFactory(); poolFactory + .addLocator("localhost", 10334) .setMaxConnections(-1) .setPingInterval(1000) .setRetryAttempts(5) @@ -238,6 +214,7 @@ public void run(TestContext context) throws Exception { } private class CreateClientProxyRegionWithPool implements Task { + @Override public void run(TestContext context) throws Exception { ClientCache clientCache = (ClientCache) context.getAttribute("CLIENT_CACHE"); From 135e6995dd54c630fcf673cacde66bf8260be9d5 Mon Sep 17 00:00:00 2001 From: Udo Kohlmeyer Date: Thu, 16 Apr 2020 09:43:46 -0700 Subject: [PATCH 3/3] GEODE-7763: Updating licensing on PartitionedWithDeltaAndUniqueObjectReferenceBenchmark --- ...thDeltaAndUniqueObjectReferenceBenchmark.java | 16 ++++++++++++++++ 1 file changed, 16 insertions(+) diff --git a/geode-benchmarks/src/main/java/org/apache/geode/benchmark/tests/PartitionedWithDeltaAndUniqueObjectReferenceBenchmark.java b/geode-benchmarks/src/main/java/org/apache/geode/benchmark/tests/PartitionedWithDeltaAndUniqueObjectReferenceBenchmark.java index 99c3075db..dcdc719e7 100644 --- a/geode-benchmarks/src/main/java/org/apache/geode/benchmark/tests/PartitionedWithDeltaAndUniqueObjectReferenceBenchmark.java +++ b/geode-benchmarks/src/main/java/org/apache/geode/benchmark/tests/PartitionedWithDeltaAndUniqueObjectReferenceBenchmark.java @@ -1,3 +1,19 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ package org.apache.geode.benchmark.tests; import static org.apache.geode.benchmark.topology.ClientServerTopology.Roles.CLIENT;