diff --git a/geode-benchmarks/src/main/java/benchmark/geode/data/Session.java b/geode-benchmarks/src/main/java/benchmark/geode/data/Session.java new file mode 100644 index 000000000..bfb3fac14 --- /dev/null +++ b/geode-benchmarks/src/main/java/benchmark/geode/data/Session.java @@ -0,0 +1,267 @@ +/* + * Copyright 2018 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package benchmark.geode.data; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; +import java.time.Instant; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Set; + +import org.apache.geode.DataSerializable; +import org.apache.geode.DataSerializer; +import org.apache.geode.Delta; +import org.apache.geode.InvalidDeltaException; + +/** + * The {@link benchmark.geode.data.Session} class is an Abstract Data Type (ADT) modeling a user's + * session. + * + * @author John Blum + * @see Comparable + * @see DataSerializable + * @see DataSerializer + * @see Delta + */ +public class Session implements Comparable, DataSerializable, Delta { + + private static final boolean ALLOW_JAVA_SERIALIZATION = false; + + public static benchmark.geode.data.Session create(String id) { + return new benchmark.geode.data.Session(id); + } + + private transient boolean delta = false; + + protected transient volatile boolean toDataCalled = false; + protected transient volatile boolean toDeltaCalled = false; + + private Instant creationTime; + private Instant lastAccessedTime; + + private final Map attributes = new HashMap<>(); + private final Map attributeDeltas = new HashMap<>(); + + private String id; + + public Session(String id) { + + this.id = id; + this.creationTime = Instant.now(); + this.lastAccessedTime = this.creationTime; + this.delta = true; + } + + public synchronized String getId() { + return this.id; + } + + public synchronized Object setAttribute(String name, Object value) { + + return value != null + ? doSetAttribute(name, value) + : removeAttribute(name); + } + + public synchronized void setAttributes(Map attributes) { + this.attributes.putAll(attributes); + } + + private Object doSetAttribute(String name, Object value) { + + Object previousValue = this.attributes.put(name, value); + + if (!value.equals(previousValue)) { + this.attributeDeltas.put(name, value); + this.delta = true; + } + + return previousValue; + } + + public synchronized Object removeAttribute(String name) { + + if (getAttributes().containsKey(name)) { + this.attributeDeltas.put(name, null); + this.delta = true; + } + + return getAttributes().remove(name); + } + + public synchronized Object getAttribute(String name) { + return getAttributes().get(name); + } + + public synchronized Set getAttributeNames() { + return Collections.unmodifiableSet(new HashSet<>(getAttributes().keySet())); + } + + protected synchronized Map getAttributes() { + return this.attributes; + } + + public synchronized Instant getCreationTime() { + return this.creationTime; + } + + protected synchronized void setLastAccessedTime(Instant lastAccessedTime) { + this.lastAccessedTime = lastAccessedTime; + } + + public synchronized Instant getLastAccessedTime() { + return this.lastAccessedTime; + } + + public synchronized benchmark.geode.data.Session commit() { + + this.delta = false; + this.attributeDeltas.clear(); + + return this; + } + + public synchronized benchmark.geode.data.Session touch() { + + Instant newLastAccessedTime = Instant.now(); + + this.delta |= !newLastAccessedTime.equals(getLastAccessedTime()); + setLastAccessedTime(newLastAccessedTime); + + return this; + } + + @Override + public synchronized void toData(DataOutput out) throws IOException { + + this.toDataCalled = true; + + out.writeUTF(this.getId()); + out.writeLong(this.getCreationTime().toEpochMilli()); + out.writeLong(this.getLastAccessedTime().toEpochMilli()); + DataSerializer.writeHashMap(getAttributes(), out); + } + + @Override + public synchronized void fromData(DataInput in) throws IOException, ClassNotFoundException { + + this.id = in.readUTF(); + this.creationTime = Instant.ofEpochMilli(in.readLong()); + + setLastAccessedTime(Instant.ofEpochMilli(in.readLong())); + + this.setAttributes(DataSerializer.readHashMap(in)); + } + + @Override + public synchronized void toDelta(DataOutput out) throws IOException { + + this.toDeltaCalled = true; + + out.writeLong(getLastAccessedTime().toEpochMilli()); + out.writeInt(this.attributeDeltas.size()); + + for (Entry entry : this.attributeDeltas.entrySet()) { + out.writeUTF(entry.getKey()); + DataSerializer.writeObject(entry.getValue(), out, ALLOW_JAVA_SERIALIZATION); + } + } + + @Override + public synchronized void fromDelta(DataInput in) throws IOException, InvalidDeltaException { + + String key = null; + + try { + + setLastAccessedTime(Instant.ofEpochMilli(in.readLong())); + + for (int count = in.readInt(); count > 0; count--) { + key = in.readUTF(); + setAttribute(key, DataSerializer.readObject(in)); + } + } catch (ClassNotFoundException cause) { + throw new IOException( + String.format("Failed to resolve type in delta for attribute [%s]", key), cause); + } + } + + @Override + public synchronized boolean hasDelta() { + return this.delta || !this.attributeDeltas.isEmpty(); + } + + public synchronized boolean wasToDataCalled() { + + boolean toDataCalled = this.toDataCalled; + + this.toDataCalled = false; + + return toDataCalled; + } + + public synchronized boolean wasToDeltaCalled() { + + boolean toDeltaCalled = this.toDeltaCalled; + + this.toDeltaCalled = false; + + return toDeltaCalled; + } + + + @Override + public int compareTo(benchmark.geode.data.Session session) { + return this.getId().compareTo(session.getId()); + } + + @Override + public boolean equals(Object obj) { + + if (this == obj) { + return true; + } + + if (!(obj instanceof benchmark.geode.data.Session)) { + return false; + } + + benchmark.geode.data.Session that = (benchmark.geode.data.Session) obj; + + return this.getId().equals(that.getId()); + } + + @Override + public int hashCode() { + + int hashValue = 17; + + hashValue = 37 * hashValue + getId().hashCode(); + + return hashValue; + } + + @Override + public String toString() { + return getId(); + } +} diff --git a/geode-benchmarks/src/main/java/org/apache/geode/benchmark/parameters/HeapParameters.java b/geode-benchmarks/src/main/java/org/apache/geode/benchmark/parameters/HeapParameters.java index 0ef39c62a..b7c287faa 100644 --- a/geode-benchmarks/src/main/java/org/apache/geode/benchmark/parameters/HeapParameters.java +++ b/geode-benchmarks/src/main/java/org/apache/geode/benchmark/parameters/HeapParameters.java @@ -28,7 +28,7 @@ public class HeapParameters { public static void configure(final TestConfig testConfig) { final String heap = System.getProperty("withHeap", "8g"); logger.info("Configuring heap parameters {}.", heap); - configureAll(testConfig, "-Xmx" + heap, "-Xms" + heap); + configureAll(testConfig, "-Xmx" + heap, "-Xms" + heap, "-Xss228k"); } } diff --git a/geode-benchmarks/src/main/java/org/apache/geode/benchmark/tests/PartitionedWithDeltaAndUniqueObjectReferenceBenchmark.java b/geode-benchmarks/src/main/java/org/apache/geode/benchmark/tests/PartitionedWithDeltaAndUniqueObjectReferenceBenchmark.java new file mode 100644 index 000000000..dcdc719e7 --- /dev/null +++ b/geode-benchmarks/src/main/java/org/apache/geode/benchmark/tests/PartitionedWithDeltaAndUniqueObjectReferenceBenchmark.java @@ -0,0 +1,242 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.geode.benchmark.tests; + +import static org.apache.geode.benchmark.topology.ClientServerTopology.Roles.CLIENT; +import static org.apache.geode.benchmark.topology.ClientServerTopology.Roles.SERVER; + +import java.io.Serializable; +import java.util.ArrayList; +import java.util.Collection; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Random; +import java.util.UUID; +import java.util.concurrent.Callable; +import java.util.concurrent.Future; +import java.util.concurrent.atomic.AtomicReference; + +import benchmark.geode.data.Session; +import org.junit.jupiter.api.Test; +import org.yardstickframework.BenchmarkConfiguration; +import org.yardstickframework.BenchmarkDriverAdapter; + +import org.apache.geode.benchmark.tasks.CreatePartitionedRegion; +import org.apache.geode.benchmark.topology.ClientServerTopology; +import org.apache.geode.cache.Region; +import org.apache.geode.cache.client.ClientCache; +import org.apache.geode.cache.client.ClientCacheFactory; +import org.apache.geode.cache.client.ClientRegionShortcut; +import org.apache.geode.cache.client.PoolFactory; +import org.apache.geode.cache.client.PoolManager; +import org.apache.geode.perftest.PerformanceTest; +import org.apache.geode.perftest.Task; +import org.apache.geode.perftest.TestConfig; +import org.apache.geode.perftest.TestContext; +import org.apache.geode.perftest.TestRunners; + +public class PartitionedWithDeltaAndUniqueObjectReferenceBenchmark implements PerformanceTest { + + private static final String SUBSCRIPTION_POOL = "subscriptionPool"; + private static final String ID = UUID.randomUUID().toString(); + + public PartitionedWithDeltaAndUniqueObjectReferenceBenchmark() {} + + @Test + public void run() throws Exception { + TestRunners.defaultRunner().runTest(this); + } + + @Override + public TestConfig configure() { + TestConfig config = GeodeBenchmark.createConfig(); + config.threads(180); + config.durationSeconds(300); + config.warmupSeconds(30); + ClientServerTopology.configure(config); + config.before(new CreatePartitionedRegion(), SERVER); + config.before(new CreateClientPool(), CLIENT); + config.before(new CreateClientProxyRegionWithPool(), CLIENT); + config.workload(new PutWithDeltaTask(ID), CLIENT); + return config; + } + + public static class PutWithDeltaTask extends BenchmarkDriverAdapter implements Serializable { + + private Region region; + private final AtomicReference sessionId = new AtomicReference<>(null); + + private final String id; + + private final List existingSessionAttributeNames = + new ArrayList<>(); + + private final Random random = new Random(System.currentTimeMillis()); + + private Collection> workloadTasks; + + public PutWithDeltaTask(String id) { + this.id = id; + } + + @Override + public void setUp(BenchmarkConfiguration cfg) throws Exception { + super.setUp(cfg); + this.workloadTasks = newSessionWorkloadTasks(); + ClientCache cache = ClientCacheFactory.getAnyInstance(); + cache.setCopyOnRead(true); + region = cache.getRegion("region"); + save(Session.create(id)); + this.sessionId.set(id); + } + + @Override + public boolean test(Map ctx) throws Exception { + int sessionAttributeCount = runSessionWorkload(); + + assert sessionAttributeCount == this.existingSessionAttributeNames.size(); + Session session = findById(this.sessionId.get()); + + return true; + } + + private Session findById(String id) { + return Optional.ofNullable(this.region.get(id)) + .map(Session::commit) + .map(Session::touch) + .orElseThrow(() -> new IllegalStateException( + String.format("No Session with ID [%s] was found", id))); + } + + private Session save(Session session) { + if (session != null && session.hasDelta()) { + this.region.put(session.getId(), session); + session.commit(); + } + + return session; + } + + private int runSessionWorkload() { + return workloadTasks.stream().mapToInt(integerCallable -> { + try { + return integerCallable.call(); + } catch (Exception e) { + e.printStackTrace(); + } + return 0; + }).sum(); + } + + private Collection> newSessionWorkloadTasks() { + + Collection> sessionWorkloadTasks = new LinkedList<>(); + + sessionWorkloadTasks.add(newAddSessionAttributeTask()); + sessionWorkloadTasks.add(newRemoveSessionAttributeTask()); + sessionWorkloadTasks.add(newSessionReaderTask()); + return sessionWorkloadTasks; + } + + private Callable newAddSessionAttributeTask() { + return () -> { + Session session = findById(this.sessionId.get()); + + String name = UUID.randomUUID().toString(); + + session.setAttribute(name, System.currentTimeMillis()); + save(session); + + this.existingSessionAttributeNames.add(name); + return 1; + }; + } + + private Callable newRemoveSessionAttributeTask() { + return () -> { + int returnValue = 0; + + Session session = findById(this.sessionId.get()); + String attributeName = null; + + int size = this.existingSessionAttributeNames.size(); + if (size > 0) { + int index = this.random.nextInt(size); + attributeName = this.existingSessionAttributeNames.remove(index); + } + + if (session.getAttributeNames().contains(attributeName)) { + session.removeAttribute(attributeName); + returnValue = -1; + } else { + Optional.ofNullable(attributeName) + .filter(it -> !it.trim().isEmpty()) + .ifPresent(this.existingSessionAttributeNames::add); + } + + save(session); + + return returnValue; + }; + } + + private Callable newSessionReaderTask() { + return () -> { + Session session = findById(this.sessionId.get()); + save(session.touch()); + + return 0; + }; + } + + private int safeFutureGet(Future future) { + try { + return future.get(); + } catch (Exception cause) { + throw new RuntimeException("Session access task failure", cause); + } + } + } + + private class CreateClientPool implements Task { + + @Override + public void run(TestContext context) throws Exception { + PoolFactory poolFactory = PoolManager.createFactory(); + poolFactory + .addLocator("localhost", 10334) + .setMaxConnections(-1) + .setPingInterval(1000) + .setRetryAttempts(5) + .setSubscriptionEnabled(true) + .create(SUBSCRIPTION_POOL); + } + } + + private class CreateClientProxyRegionWithPool implements Task { + + @Override + public void run(TestContext context) throws Exception { + ClientCache clientCache = (ClientCache) context.getAttribute("CLIENT_CACHE"); + clientCache.createClientRegionFactory(ClientRegionShortcut.PROXY) + .setPoolName(SUBSCRIPTION_POOL) + .create("region"); + } + } +}