From 29b7b4e86c3f253c0eabfd874ecd807bfe12991d Mon Sep 17 00:00:00 2001 From: David Robinson Date: Wed, 10 Jul 2024 11:15:50 -0400 Subject: [PATCH 1/4] Imperative transaction ordering --- .../dclare/IImperativeTransaction.java | 4 ++ .../dclare/ImperativeTransaction.java | 2 +- .../dclare/TerminalImperativeTransaction.java | 5 ++ .../dclare/UniverseTransaction.java | 68 ++++++++++++++----- 4 files changed, 61 insertions(+), 18 deletions(-) create mode 100644 src/main/java/org/modelingvalue/dclare/IImperativeTransaction.java create mode 100644 src/main/java/org/modelingvalue/dclare/TerminalImperativeTransaction.java diff --git a/src/main/java/org/modelingvalue/dclare/IImperativeTransaction.java b/src/main/java/org/modelingvalue/dclare/IImperativeTransaction.java new file mode 100644 index 00000000..a4839f7b --- /dev/null +++ b/src/main/java/org/modelingvalue/dclare/IImperativeTransaction.java @@ -0,0 +1,4 @@ +package org.modelingvalue.dclare; + +public sealed interface IImperativeTransaction permits TerminalImperativeTransaction, ImperativeTransaction { +} diff --git a/src/main/java/org/modelingvalue/dclare/ImperativeTransaction.java b/src/main/java/org/modelingvalue/dclare/ImperativeTransaction.java index 4e0f8fe8..a4589f73 100644 --- a/src/main/java/org/modelingvalue/dclare/ImperativeTransaction.java +++ b/src/main/java/org/modelingvalue/dclare/ImperativeTransaction.java @@ -31,7 +31,7 @@ import org.modelingvalue.collections.util.NamedIdentity; import org.modelingvalue.dclare.Priority.Queued; -public class ImperativeTransaction extends LeafTransaction { +public final class ImperativeTransaction extends LeafTransaction implements IImperativeTransaction { @SuppressWarnings("rawtypes") protected static final DefaultMap> SETTED_MAP = DefaultMap.of(k -> Set.of()); diff --git a/src/main/java/org/modelingvalue/dclare/TerminalImperativeTransaction.java b/src/main/java/org/modelingvalue/dclare/TerminalImperativeTransaction.java new file mode 100644 index 00000000..d6356680 --- /dev/null +++ b/src/main/java/org/modelingvalue/dclare/TerminalImperativeTransaction.java @@ -0,0 +1,5 @@ +package org.modelingvalue.dclare; + +public final class TerminalImperativeTransaction implements IImperativeTransaction { + // for imperative transaction ordering in graph +} diff --git a/src/main/java/org/modelingvalue/dclare/UniverseTransaction.java b/src/main/java/org/modelingvalue/dclare/UniverseTransaction.java index 24a81363..012739b1 100644 --- a/src/main/java/org/modelingvalue/dclare/UniverseTransaction.java +++ b/src/main/java/org/modelingvalue/dclare/UniverseTransaction.java @@ -26,15 +26,12 @@ import java.util.TimerTask; import java.util.concurrent.BlockingQueue; import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; import java.util.function.Consumer; import java.util.function.Predicate; -import org.modelingvalue.collections.Collection; -import org.modelingvalue.collections.DefaultMap; -import org.modelingvalue.collections.Entry; -import org.modelingvalue.collections.List; -import org.modelingvalue.collections.Set; +import org.modelingvalue.collections.*; import org.modelingvalue.collections.util.Concurrent; import org.modelingvalue.collections.util.ContextPool; import org.modelingvalue.collections.util.StatusProvider; @@ -50,6 +47,8 @@ public class UniverseTransaction extends MutableTransaction { private static final Setable STOPPED = Setable.of("stopped", false); + private static final TerminalImperativeTransaction SOURCE = new TerminalImperativeTransaction(); + private static final TerminalImperativeTransaction TARGET = new TerminalImperativeTransaction(); // private final DclareConfig config; protected final Concurrent, ActionTransaction>> actionTransactions = Concurrent.of(() -> new ReusableTransaction<>(this)); @@ -94,7 +93,7 @@ public class UniverseTransaction extends MutableTransaction { private List> timeTravelingActions = List.of(backward, forward); private List> preActions = List.of(); private List> postActions = List.of(); - private List imperativeTransactions = List.of(); + private Graph> imperativeTransactions = Graph.of(); private List history = List.of(); private List future = List.of(); private State preState; @@ -282,7 +281,7 @@ public void run() { handleTooManyChanges(state); runActions(postActions); } - commit(state, timeTraveling, imperativeTransactions.iterator()); + commit(state, timeTraveling); if (!killed && inQueue.isEmpty() && isStopped(state)) { break; } @@ -709,32 +708,67 @@ public void addPostAction(Action action) { public ImperativeTransaction addImperative(String id, StateDeltaHandler diffHandler, Consumer scheduler, boolean keepTransaction) { ImperativeTransaction n = ImperativeTransaction.of(Imperative.of(id), preState, this, scheduler, diffHandler, keepTransaction); synchronized (this) { - imperativeTransactions = imperativeTransactions.add(n); + imperativeTransactions = imperativeTransactions.putEdge(SOURCE, n, Void.class); } return n; } + public void orderImperativeTransactions(ImperativeTransaction first, ImperativeTransaction second) { + imperativeTransactions = imperativeTransactions.putEdge(first, second, Void.class); + } + + public void orderImperativeTransactions(String first, String second) { + orderImperativeTransactions(getImperativeTransaction(first), getImperativeTransaction(second)); + } + + @SuppressWarnings("unchecked") public List getImperativeTransactions() { - return imperativeTransactions; + return imperativeTransactions.getNodes().filter(ImperativeTransaction.class::isInstance).map(ImperativeTransaction.class::cast).asList(); } public ImperativeTransaction getImperativeTransaction(String id) { - for (ImperativeTransaction it : imperativeTransactions) { - if (it.imperative().id().equals(id)) { + for (IImperativeTransaction iit : imperativeTransactions.getNodes()) { + if (iit instanceof ImperativeTransaction it && it.imperative().id().equals(id)) { return it; } } return null; } - private void commit(State state, boolean timeTraveling, Iterator it) { - if (!killed && it.hasNext()) { - ImperativeTransaction itx = it.next(); - itx.schedule(() -> { - if (itx.commit(state, timeTraveling)) { - commit(itx.state(), timeTraveling, it); + private void commit(State state, boolean timeTraveling) { + if (imperativeTransactions.hasCycles(n -> true, e -> !Objects.equals(e.a(), e.c()))) + throw new Error("Circular native group ordering"); + + AtomicReference> started = new AtomicReference<>(Set.of()); + AtomicReference> stopped = new AtomicReference<>(Set.of()); + AtomicBoolean inSync = new AtomicBoolean(true); + + imperativeTransactions = imperativeTransactions.putEdge(SOURCE, TARGET, Void.class); + tryCommit(state, timeTraveling, SOURCE, started, stopped, inSync); + } + + private void tryCommit(State state, boolean timeTraveling, IImperativeTransaction it, AtomicReference> started, AtomicReference> stopped, AtomicBoolean inSync) { + if (!inSync.get() || killed || Objects.equals(it, TARGET)) return; + + Set incoming = imperativeTransactions.getIncomingNodes(it); + if (!stopped.get().containsAll(incoming)) return; + + Set s = started.getAndUpdate(old -> old.add(it)); + if (s.contains(it)) return; + + stopped.updateAndGet(old -> old.add(it)); + if (it instanceof ImperativeTransaction im) { + im.schedule(() -> { + if (im.commit(state, timeTraveling)) { + imperativeTransactions.getOutgoingNodes(it).forEach(next -> + tryCommit(state, timeTraveling, next, started, stopped, inSync)); + } else { + inSync.set(false); } }); + } else { + imperativeTransactions.getOutgoingNodes(it).forEach(next -> + tryCommit(state, timeTraveling, next, started, stopped, inSync)); } } From f9a56791e29e5fc0de9b67a7c577693be3f17736 Mon Sep 17 00:00:00 2001 From: David Robinson Date: Tue, 16 Jul 2024 12:56:48 -0400 Subject: [PATCH 2/4] Started writing tests for imperative transaction ordering --- .../dclare/UniverseTransaction.java | 4 +- .../dclare/test/ImperativeOrderingTests.java | 124 ++++++++++++++ .../test/support/OrderedTestUniverse.java | 162 ++++++++++++++++++ 3 files changed, 288 insertions(+), 2 deletions(-) create mode 100644 src/test/java/org/modelingvalue/dclare/test/ImperativeOrderingTests.java create mode 100644 src/test/java/org/modelingvalue/dclare/test/support/OrderedTestUniverse.java diff --git a/src/main/java/org/modelingvalue/dclare/UniverseTransaction.java b/src/main/java/org/modelingvalue/dclare/UniverseTransaction.java index 012739b1..e8ec4f19 100644 --- a/src/main/java/org/modelingvalue/dclare/UniverseTransaction.java +++ b/src/main/java/org/modelingvalue/dclare/UniverseTransaction.java @@ -20,7 +20,6 @@ package org.modelingvalue.dclare; -import java.util.Iterator; import java.util.Objects; import java.util.Timer; import java.util.TimerTask; @@ -756,10 +755,10 @@ private void tryCommit(State state, boolean timeTraveling, IImperativeTransactio Set s = started.getAndUpdate(old -> old.add(it)); if (s.contains(it)) return; - stopped.updateAndGet(old -> old.add(it)); if (it instanceof ImperativeTransaction im) { im.schedule(() -> { if (im.commit(state, timeTraveling)) { + stopped.updateAndGet(old -> old.add(it)); imperativeTransactions.getOutgoingNodes(it).forEach(next -> tryCommit(state, timeTraveling, next, started, stopped, inSync)); } else { @@ -767,6 +766,7 @@ private void tryCommit(State state, boolean timeTraveling, IImperativeTransactio } }); } else { + stopped.updateAndGet(old -> old.add(it)); imperativeTransactions.getOutgoingNodes(it).forEach(next -> tryCommit(state, timeTraveling, next, started, stopped, inSync)); } diff --git a/src/test/java/org/modelingvalue/dclare/test/ImperativeOrderingTests.java b/src/test/java/org/modelingvalue/dclare/test/ImperativeOrderingTests.java new file mode 100644 index 00000000..8f1a12a2 --- /dev/null +++ b/src/test/java/org/modelingvalue/dclare/test/ImperativeOrderingTests.java @@ -0,0 +1,124 @@ +package org.modelingvalue.dclare.test; + +import org.junit.jupiter.api.RepeatedTest; +import org.junit.jupiter.api.RepetitionInfo; +import org.modelingvalue.collections.List; +import org.modelingvalue.collections.Set; +import org.modelingvalue.collections.util.Concurrent; +import org.modelingvalue.collections.util.Pair; +import org.modelingvalue.collections.util.StatusProvider; +import org.modelingvalue.dclare.*; +import org.modelingvalue.dclare.test.support.*; + +import java.util.stream.IntStream; + +import static org.junit.jupiter.api.Assertions.*; +import static org.modelingvalue.dclare.CoreSetableModifier.containment; +import static org.modelingvalue.dclare.test.support.Shared.THE_POOL; +import static org.modelingvalue.dclare.test.support.TestNewable.create; + +public class ImperativeOrderingTests { + private static final DclareConfig BASE_CONFIG = new DclareConfig() // + .withDevMode(true) // + .withCheckOrphanState(true) // + .withMaxNrOfChanges(16) // + .withMaxTotalNrOfChanges(1000) // + .withMaxNrOfObserved(40) // + .withMaxNrOfObservers(40) // + .withTraceUniverse(false) // + .withTraceMutable(false) // + .withTraceActions(false) // + .withTraceMatching(false) // + .withTraceRippleOut(false) // + .withTraceDerivation(false); + private static final DclareConfig[] CONFIGS = new DclareConfig[]{ // + BASE_CONFIG, // + BASE_CONFIG // + .withDevMode(true) // + .withRunSequential(true) // + }; + + private boolean imperativeTest(DclareConfig config, int size, Set> edges) { + Observed> cs = Observed.of("cs", List.of(), containment); + TestMutableClass U = TestMutableClass.of("Universe", cs); + + OrderedTestUniverse universe = OrderedTestUniverse.of("universe", U, size, edges); + UniverseTransaction utx = new UniverseTransaction(universe, THE_POOL, config); + + Set> actions = Set.of(IntStream.range(0, size) + .mapToObj(i -> Pair.of(i, (Runnable) () -> {})).toList()); + + run(utx, "init", actions); + + run(utx, "stop", Set.of(Pair.of(0, utx::stop))); + utx.waitForEnd(); + return universe.passed(); + } + +// private boolean imperativeTest(DclareConfig config, int size, Set> expectedEdges, Set> actualEdges) { +// Observed> cs = Observed.of("cs", List.of(), containment); +// TestMutableClass U = TestMutableClass.of("Universe", cs); +// +// OrderedTestUniverse universe = OrderedTestUniverse.of("universe", U, size, expectedEdges, actualEdges); +// UniverseTransaction utx = new UniverseTransaction(universe, THE_POOL, config); +// +// Set> actions = Set.of(IntStream.range(0, size) +// .mapToObj(i -> Pair.of(i, (Runnable) () -> {})).toList()); +// +// run(utx, "init", actions); +// +// run(utx, "stop", Set.of(Pair.of(0, utx::stop))); +// utx.waitForEnd(); +// return universe.passed(); +// } + + @RepeatedTest(64) + public void basicOrdering(RepetitionInfo repetitionInfo) { + DclareConfig config = CONFIGS[(repetitionInfo.getCurrentRepetition() - 1) / 32]; + + assertTrue(imperativeTest(config, 1, Set.of())); + assertTrue(imperativeTest(config, 2, Set.of())); + assertTrue(imperativeTest(config, 2, Set.of(Pair.of(0, 1)))); + } + + @RepeatedTest(64) + public void complexOrdering(RepetitionInfo repetitionInfo) { + DclareConfig config = CONFIGS[(repetitionInfo.getCurrentRepetition() - 1) / 32]; + + assertTrue(imperativeTest(config, 6, Set.of( + Pair.of(0, 2), Pair.of(0, 3), Pair.of(1, 3), + Pair.of(2, 4), Pair.of(3, 4), Pair.of(3, 5) + ))); + } + + @RepeatedTest(64) + public void cyclicOrdering(RepetitionInfo repetitionInfo) { + DclareConfig config = CONFIGS[(repetitionInfo.getCurrentRepetition() - 1) / 32]; + + assertThrows(Error.class, () -> imperativeTest(config, 2, Set.of(Pair.of(0, 1), Pair.of(1, 0)))); +// System.out.println("1"); + assertThrows(Error.class, () -> imperativeTest(config, 1, Set.of(Pair.of(0, 0)))); +// System.out.println("2"); + } + + @RepeatedTest(64) + public void wrongOrdering(RepetitionInfo repetitionInfo) { + DclareConfig config = CONFIGS[(repetitionInfo.getCurrentRepetition() - 1) / 32]; + +// assertFalse(imperativeTest(config, 2, Set.of(Pair.of(0, 1)), Set.of(Pair.of(1, 0)))); + } + + private void run(UniverseTransaction utx, String id, Set> actions) { + StatusProvider.StatusIterator it = utx.getStatusIterator(); + UniverseTransaction.Status status = it.waitForStoppedOr(UniverseTransaction.Status::isIdle); + if (!status.isStopped()) { + if (utx.getConfig().isTraceUniverse()) { + System.err.println("-------------------------- " + id + " -------------------------------------------"); + } + OrderedTestUniverse u = (OrderedTestUniverse) utx.universe(); + u.schedule(actions); + it.waitForStoppedOr(s -> !s.active.isEmpty()); + } + } + +} diff --git a/src/test/java/org/modelingvalue/dclare/test/support/OrderedTestUniverse.java b/src/test/java/org/modelingvalue/dclare/test/support/OrderedTestUniverse.java new file mode 100644 index 00000000..a157d752 --- /dev/null +++ b/src/test/java/org/modelingvalue/dclare/test/support/OrderedTestUniverse.java @@ -0,0 +1,162 @@ +//~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ +// (C) Copyright 2018-2024 Modeling Value Group B.V. (http://modelingvalue.org) ~ +// ~ +// Licensed under the GNU Lesser General Public License v3.0 (the 'License'). You may not use this file except in ~ +// compliance with the License. You may obtain a copy of the License at: https://choosealicense.com/licenses/lgpl-3.0 ~ +// Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on ~ +// an 'AS IS' BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the ~ +// specific language governing permissions and limitations under the License. ~ +// ~ +// Maintainers: ~ +// Wim Bast, Tom Brus ~ +// ~ +// Contributors: ~ +// Ronald Krijgsheld ✝, Arjan Kok, Carel Bast ~ +// --------------------------------------------------------------------------------------------------------------------- ~ +// In Memory of Ronald Krijgsheld, 1972 - 2023 ~ +// Ronald was suddenly and unexpectedly taken from us. He was not only our long-term colleague and team member ~ +// but also our friend. "He will live on in many of the lines of code you see below." ~ +//~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ + +package org.modelingvalue.dclare.test.support; + +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicReference; + +import org.modelingvalue.collections.Entry; +import org.modelingvalue.collections.List; +import org.modelingvalue.collections.Map; +import org.modelingvalue.collections.Set; +import org.modelingvalue.collections.util.Pair; +import org.modelingvalue.dclare.ImperativeTransaction; +import org.modelingvalue.dclare.LeafTransaction; +import org.modelingvalue.dclare.Mutable; +import org.modelingvalue.dclare.Setable; +import org.modelingvalue.dclare.State; +import org.modelingvalue.dclare.Universe; +import org.modelingvalue.dclare.UniverseTransaction; + +@SuppressWarnings("unused") +public class OrderedTestUniverse extends TestMutable implements Universe { +// public static OrderedTestUniverse of(Object id, TestMutableClass clazz, int size, Set> expectedEdges, Set> actualEdges) { +// return new OrderedTestUniverse(id, clazz, size, expectedEdges, actualEdges); +// } + + public static OrderedTestUniverse of(Object id, TestMutableClass clazz, int size, Set> edges) { + return new OrderedTestUniverse(id, clazz, size, edges); + } + + private static final Setable DUMMY = Setable.of("$DUMMY", 0l); + + private final TestScheduler scheduler = TestScheduler.of(); + private final AtomicInteger counter = new AtomicInteger(0); + + private UniverseTransaction universeTransaction; + + private List imperativeTransactions = List.of(); + private final int size; + private Set> expectedEdges; + private Set> actualEdges; + private Set> edges; + private Map> incoming = Map.of(); + AtomicReference> passed = new AtomicReference<>(Map.of()); + private AtomicBoolean flag = new AtomicBoolean(true); + +// private OrderedTestUniverse(Object id, TestMutableClass clazz, int size, Set> expectedEdges, Set> actualEdges) { +// super(id, clazz); +// this.size = size; +// this.expectedEdges = expectedEdges; +// this.actualEdges = actualEdges; +// } + + private OrderedTestUniverse(Object id, TestMutableClass clazz, int size, Set> edges) { + super(id, clazz); + this.size = size; + this.edges = edges; + } + + @Override + public void init() { + System.out.println(size); + scheduler.start(); + Universe.super.init(); + universeTransaction = LeafTransaction.getCurrent().universeTransaction(); + + for (int i = 0; i < size; i++) { + int finalI = i; + passed.getAndUpdate(map -> map.put(finalI, false)); + } + + for (int i = 0; i < size; i++) { + int finalI = i; + System.out.println(i); + imperativeTransactions = imperativeTransactions.add(universeTransaction.addImperative("TEST" + i, (pre, post, last, setted) -> { + passed.getAndUpdate(list -> { + if (incoming.containsKey(finalI)) { + for (Integer inc : incoming.get((Integer) finalI)) { + if (!list.get(inc)) { + flag.set(false); + } + } + } + + return list.put(finalI, true); + }); + + System.out.println("?"); + pre.diff(post, o -> o instanceof TestNewable, s -> s == Mutable.D_PARENT_CONTAINING).forEach(e -> { + if (e.getValue().get(Mutable.D_PARENT_CONTAINING).b() != null) { + TestNewable n = (TestNewable) e.getKey(); + if (n.dInitialConstruction().isDerived()) { + TestNewable.construct(n, "init" + uniqueInt()); + } + } + }); + }, scheduler, false)); + } + + for (var edge : expectedEdges) { + incoming = incoming.put(edge.b(), incoming.getOrDefault(edge.b(), Set.of()).add(edge.a())); + } + for (var edge : actualEdges) { + System.out.println(edge.a() + " -> " + edge.b()); + universeTransaction.orderImperativeTransactions("TEST" + edge.a(), "TEST" + edge.b()); + } + } + + @Override + public void exit() { + scheduler.stop(); + Universe.super.exit(); + } + + public boolean passed() { + return flag.get() && passed.get().toValues().allMatch(b -> b); + } + + public int uniqueInt() { + return counter.getAndIncrement(); + } + + public void schedule(Set> actions) { + actions.forEach(action -> imperativeTransactions.get(action.a()).schedule(() -> { + DUMMY.set(this, Long::sum, 1L); + action.b().run(); + })); + } + + public State waitForEnd(UniverseTransaction universeTransaction) throws Throwable { + try { + return universeTransaction.waitForEnd(); + } catch (Error e) { + throw e.getCause(); + } + } + + @Override + public boolean dIsOrphan(State state) { + return Universe.super.dIsOrphan(state); + } + +} From bde88da3c4134e4a75b7fce2bdb2bba66c2317f8 Mon Sep 17 00:00:00 2001 From: David Robinson Date: Tue, 16 Jul 2024 17:48:24 -0400 Subject: [PATCH 3/4] Finished writing tests for imperative transaction ordering --- .../dclare/UniverseTransaction.java | 2 +- .../dclare/test/ImperativeOrderingTests.java | 34 +++++++------------ .../test/support/OrderedTestUniverse.java | 25 +++----------- 3 files changed, 19 insertions(+), 42 deletions(-) diff --git a/src/main/java/org/modelingvalue/dclare/UniverseTransaction.java b/src/main/java/org/modelingvalue/dclare/UniverseTransaction.java index e8ec4f19..ecee8b36 100644 --- a/src/main/java/org/modelingvalue/dclare/UniverseTransaction.java +++ b/src/main/java/org/modelingvalue/dclare/UniverseTransaction.java @@ -735,7 +735,7 @@ public ImperativeTransaction getImperativeTransaction(String id) { } private void commit(State state, boolean timeTraveling) { - if (imperativeTransactions.hasCycles(n -> true, e -> !Objects.equals(e.a(), e.c()))) + if (imperativeTransactions.hasCycles(n -> true, e -> true)) throw new Error("Circular native group ordering"); AtomicReference> started = new AtomicReference<>(Set.of()); diff --git a/src/test/java/org/modelingvalue/dclare/test/ImperativeOrderingTests.java b/src/test/java/org/modelingvalue/dclare/test/ImperativeOrderingTests.java index 8f1a12a2..71192416 100644 --- a/src/test/java/org/modelingvalue/dclare/test/ImperativeOrderingTests.java +++ b/src/test/java/org/modelingvalue/dclare/test/ImperativeOrderingTests.java @@ -39,10 +39,14 @@ public class ImperativeOrderingTests { }; private boolean imperativeTest(DclareConfig config, int size, Set> edges) { + return imperativeTest(config, size, edges, edges); + } + + private boolean imperativeTest(DclareConfig config, int size, Set> expectedEdges, Set> actualEdges) { Observed> cs = Observed.of("cs", List.of(), containment); TestMutableClass U = TestMutableClass.of("Universe", cs); - OrderedTestUniverse universe = OrderedTestUniverse.of("universe", U, size, edges); + OrderedTestUniverse universe = OrderedTestUniverse.of("universe", U, size, expectedEdges, actualEdges); UniverseTransaction utx = new UniverseTransaction(universe, THE_POOL, config); Set> actions = Set.of(IntStream.range(0, size) @@ -55,23 +59,6 @@ private boolean imperativeTest(DclareConfig config, int size, Set> expectedEdges, Set> actualEdges) { -// Observed> cs = Observed.of("cs", List.of(), containment); -// TestMutableClass U = TestMutableClass.of("Universe", cs); -// -// OrderedTestUniverse universe = OrderedTestUniverse.of("universe", U, size, expectedEdges, actualEdges); -// UniverseTransaction utx = new UniverseTransaction(universe, THE_POOL, config); -// -// Set> actions = Set.of(IntStream.range(0, size) -// .mapToObj(i -> Pair.of(i, (Runnable) () -> {})).toList()); -// -// run(utx, "init", actions); -// -// run(utx, "stop", Set.of(Pair.of(0, utx::stop))); -// utx.waitForEnd(); -// return universe.passed(); -// } - @RepeatedTest(64) public void basicOrdering(RepetitionInfo repetitionInfo) { DclareConfig config = CONFIGS[(repetitionInfo.getCurrentRepetition() - 1) / 32]; @@ -96,16 +83,21 @@ public void cyclicOrdering(RepetitionInfo repetitionInfo) { DclareConfig config = CONFIGS[(repetitionInfo.getCurrentRepetition() - 1) / 32]; assertThrows(Error.class, () -> imperativeTest(config, 2, Set.of(Pair.of(0, 1), Pair.of(1, 0)))); -// System.out.println("1"); assertThrows(Error.class, () -> imperativeTest(config, 1, Set.of(Pair.of(0, 0)))); -// System.out.println("2"); + assertThrows(Error.class, () -> imperativeTest(config, 4, Set.of( + Pair.of(0, 1), Pair.of(1, 2), Pair.of(2, 3), Pair.of(3, 1) + ))); } @RepeatedTest(64) public void wrongOrdering(RepetitionInfo repetitionInfo) { DclareConfig config = CONFIGS[(repetitionInfo.getCurrentRepetition() - 1) / 32]; -// assertFalse(imperativeTest(config, 2, Set.of(Pair.of(0, 1)), Set.of(Pair.of(1, 0)))); + assertFalse(imperativeTest(config, 2, Set.of(Pair.of(0, 1)), Set.of(Pair.of(1, 0)))); + assertFalse(imperativeTest(config, 3, + Set.of(Pair.of(0, 1), Pair.of(0, 2), Pair.of(1, 2)), + Set.of(Pair.of(0, 1), Pair.of(0, 2), Pair.of(2, 1)) + )); } private void run(UniverseTransaction utx, String id, Set> actions) { diff --git a/src/test/java/org/modelingvalue/dclare/test/support/OrderedTestUniverse.java b/src/test/java/org/modelingvalue/dclare/test/support/OrderedTestUniverse.java index a157d752..ef46aa23 100644 --- a/src/test/java/org/modelingvalue/dclare/test/support/OrderedTestUniverse.java +++ b/src/test/java/org/modelingvalue/dclare/test/support/OrderedTestUniverse.java @@ -39,12 +39,8 @@ @SuppressWarnings("unused") public class OrderedTestUniverse extends TestMutable implements Universe { -// public static OrderedTestUniverse of(Object id, TestMutableClass clazz, int size, Set> expectedEdges, Set> actualEdges) { -// return new OrderedTestUniverse(id, clazz, size, expectedEdges, actualEdges); -// } - - public static OrderedTestUniverse of(Object id, TestMutableClass clazz, int size, Set> edges) { - return new OrderedTestUniverse(id, clazz, size, edges); + public static OrderedTestUniverse of(Object id, TestMutableClass clazz, int size, Set> expectedEdges, Set> actualEdges) { + return new OrderedTestUniverse(id, clazz, size, expectedEdges, actualEdges); } private static final Setable DUMMY = Setable.of("$DUMMY", 0l); @@ -58,27 +54,19 @@ public static OrderedTestUniverse of(Object id, TestMutableClass clazz, int size private final int size; private Set> expectedEdges; private Set> actualEdges; - private Set> edges; private Map> incoming = Map.of(); AtomicReference> passed = new AtomicReference<>(Map.of()); private AtomicBoolean flag = new AtomicBoolean(true); -// private OrderedTestUniverse(Object id, TestMutableClass clazz, int size, Set> expectedEdges, Set> actualEdges) { -// super(id, clazz); -// this.size = size; -// this.expectedEdges = expectedEdges; -// this.actualEdges = actualEdges; -// } - - private OrderedTestUniverse(Object id, TestMutableClass clazz, int size, Set> edges) { + private OrderedTestUniverse(Object id, TestMutableClass clazz, int size, Set> expectedEdges, Set> actualEdges) { super(id, clazz); this.size = size; - this.edges = edges; + this.expectedEdges = expectedEdges; + this.actualEdges = actualEdges; } @Override public void init() { - System.out.println(size); scheduler.start(); Universe.super.init(); universeTransaction = LeafTransaction.getCurrent().universeTransaction(); @@ -90,7 +78,6 @@ public void init() { for (int i = 0; i < size; i++) { int finalI = i; - System.out.println(i); imperativeTransactions = imperativeTransactions.add(universeTransaction.addImperative("TEST" + i, (pre, post, last, setted) -> { passed.getAndUpdate(list -> { if (incoming.containsKey(finalI)) { @@ -104,7 +91,6 @@ public void init() { return list.put(finalI, true); }); - System.out.println("?"); pre.diff(post, o -> o instanceof TestNewable, s -> s == Mutable.D_PARENT_CONTAINING).forEach(e -> { if (e.getValue().get(Mutable.D_PARENT_CONTAINING).b() != null) { TestNewable n = (TestNewable) e.getKey(); @@ -120,7 +106,6 @@ public void init() { incoming = incoming.put(edge.b(), incoming.getOrDefault(edge.b(), Set.of()).add(edge.a())); } for (var edge : actualEdges) { - System.out.println(edge.a() + " -> " + edge.b()); universeTransaction.orderImperativeTransactions("TEST" + edge.a(), "TEST" + edge.b()); } } From cae9d39418722f9d8382aae4765ef5f8453b86e6 Mon Sep 17 00:00:00 2001 From: David Robinson Date: Wed, 31 Jul 2024 09:33:00 -0400 Subject: [PATCH 4/4] Resolved issues and added comments --- .../dclare/UniverseTransaction.java | 164 +++++++++++++++--- .../dclare/test/ImperativeOrderingTests.java | 16 +- .../test/support/OrderedTestUniverse.java | 2 +- 3 files changed, 147 insertions(+), 35 deletions(-) diff --git a/src/main/java/org/modelingvalue/dclare/UniverseTransaction.java b/src/main/java/org/modelingvalue/dclare/UniverseTransaction.java index ecee8b36..002aa591 100644 --- a/src/main/java/org/modelingvalue/dclare/UniverseTransaction.java +++ b/src/main/java/org/modelingvalue/dclare/UniverseTransaction.java @@ -25,18 +25,14 @@ import java.util.TimerTask; import java.util.concurrent.BlockingQueue; import java.util.concurrent.LinkedBlockingQueue; -import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; import java.util.function.Consumer; import java.util.function.Predicate; import org.modelingvalue.collections.*; -import org.modelingvalue.collections.util.Concurrent; -import org.modelingvalue.collections.util.ContextPool; -import org.modelingvalue.collections.util.StatusProvider; +import org.modelingvalue.collections.util.*; import org.modelingvalue.collections.util.StatusProvider.AbstractStatus; import org.modelingvalue.collections.util.StatusProvider.StatusIterator; -import org.modelingvalue.collections.util.TraceTimer; import org.modelingvalue.dclare.NonCheckingObserver.NonCheckingTransaction; import org.modelingvalue.dclare.Priority.MutableStates; import org.modelingvalue.dclare.ex.ConsistencyError; @@ -197,6 +193,7 @@ public UniverseTransaction(Universe universe, ContextPool pool, boolean pull, Dc if (startStatusConsumer != null) { startStatusConsumer.accept(startStatus); } + imperativeTransactions = imperativeTransactions.putEdge(SOURCE, TARGET, Void.class); } @Override @@ -704,6 +701,16 @@ public void addPostAction(Action action) { } } + /** + * Constructs a new imperative transaction and add it to the universe transaction. Synchronized + * on {@code this}. + * + * @param id string identifier + * @param diffHandler handles the incoming difference after fixpoint achieved, always executed on {@code scheduler} + * @param scheduler a single-threaded consumer that always uses the same thread + * @param keepTransaction if true, will persist the association of this transaction on the {@code scheduler}'s thread after scheduled actions have completed + * @return the created imperative transaction + */ public ImperativeTransaction addImperative(String id, StateDeltaHandler diffHandler, Consumer scheduler, boolean keepTransaction) { ImperativeTransaction n = ImperativeTransaction.of(Imperative.of(id), preState, this, scheduler, diffHandler, keepTransaction); synchronized (this) { @@ -712,12 +719,119 @@ public ImperativeTransaction addImperative(String id, StateDeltaHandler diffHand return n; } - public void orderImperativeTransactions(ImperativeTransaction first, ImperativeTransaction second) { - imperativeTransactions = imperativeTransactions.putEdge(first, second, Void.class); + /** + * Constructs a new imperative transaction and add it to the universe transaction, adding any + * orderings between imperative transactions from {@code dependencies} and {@code dependents}. + * Synchronized on {@code this}. + * + * @param id string identifier + * @param diffHandler handles the incoming difference after fixpoint achieved, always executed on {@code scheduler} + * @param scheduler a single-threaded consumer that always uses the same thread + * @param keepTransaction if true, will persist the association of this transaction on the {@code scheduler}'s thread after scheduled actions have completed + * @param dependencies set of imperative transactions that will only run before this imperative transaction + * @param dependents set of imperative transactions that will only run after this imperative transaction + * @return the created imperative transaction + */ + public ImperativeTransaction addImperative(String id, StateDeltaHandler diffHandler, Consumer scheduler, boolean keepTransaction, Set dependencies, Set dependents) { + ImperativeTransaction n = ImperativeTransaction.of(Imperative.of(id), preState, this, scheduler, diffHandler, keepTransaction); + synchronized (this) { + imperativeTransactions = imperativeTransactions.putEdge(SOURCE, n, Void.class); + + for (var it : dependencies) { + imperativeTransactions = imperativeTransactions.putEdge(it, n, Void.class); + } + + for (var it : dependents) { + imperativeTransactions = imperativeTransactions.putEdge(n, it, Void.class); + } + + if (imperativeTransactions.getIncomingNodes(n).size() > 1) + imperativeTransactions = imperativeTransactions.removeEdge(SOURCE, n, Void.class); + } + return n; } - public void orderImperativeTransactions(String first, String second) { - orderImperativeTransactions(getImperativeTransaction(first), getImperativeTransaction(second)); + /** + * Removes the given imperative transaction if it has no dependents. Synchronized on + * {@code this}. + * + * @param it imperative transaction to be removed + */ + public void removeImperative(ImperativeTransaction it) { + synchronized (this) { + Set outgoing = imperativeTransactions.getOutgoingNodes(it); + + if (!outgoing.isEmpty()) { + throw new Error("Cannot remove " + it + " because it is a dependent of " + outgoing); + } + + imperativeTransactions = imperativeTransactions.removeNode(it); + } + } + /** + * Removes the given imperative transaction if it has no dependents. Synchronized on + * {@code this}. + * + * @param id id of imperative transaction to be removed + */ + public void removeImperative(String id) { + removeImperative(getImperativeTransaction(id)); + } + + /** + * Creates an ordering between the two imperative transactions, ensuring that {@code first} + * will always run before {@code second}. Synchronized on {@code this}. + * + * @param first imperative transaction to be run before {@code second} + * @param second imperative transaction to be run after {@code first} + */ + public void orderImperatives(ImperativeTransaction first, ImperativeTransaction second) { + synchronized (this) { + imperativeTransactions = imperativeTransactions.putEdge(first, second, Void.class); + if (imperativeTransactions.hasCycles(n -> true, e -> true, first)) + throw new Error("Circular native group ordering"); + + if (imperativeTransactions.getIncomingNodes(second).size() > 1) + imperativeTransactions = imperativeTransactions.removeEdge(SOURCE, second, Void.class); + } + } + + /** + * Creates an ordering between the two imperative transactions, ensuring that {@code first} + * will always run before {@code second}. Synchronized on {@code this}. + * + * @param first id of imperative transaction to be run before {@code second} + * @param second id of imperative transaction to be run after {@code first} + */ + public void orderImperatives(String first, String second) { + orderImperatives(getImperativeTransaction(first), getImperativeTransaction(second)); + } + + /** + * Removes an ordering between the two imperative transactions if it exists. Synchronized on + * {@code this}. + * + * @param first imperative transaction that was run before {@code second} + * @param second imperative transaction that was run after {@code first} + */ + public void unorderImperatives(ImperativeTransaction first, ImperativeTransaction second) { + synchronized (this) { + if (imperativeTransactions.getIncomingNodes(second).size() == 1 && imperativeTransactions.containsEdge(first, second, Void.class)) + imperativeTransactions = imperativeTransactions.putEdge(SOURCE, second, Void.class); + + imperativeTransactions = imperativeTransactions.removeEdge(first, second, Void.class); + } + } + + /** + * Removes an ordering between the two imperative transactions if it exists. Synchronized on + * {@code this}. + * + * @param first id of imperative transaction that runs before {@code second} + * @param second id of imperative transaction that runs after {@code first} + */ + public void unorderImperatives(String first, String second) { + unorderImperatives(getImperativeTransaction(first), getImperativeTransaction(second)); } @SuppressWarnings("unchecked") @@ -735,40 +849,36 @@ public ImperativeTransaction getImperativeTransaction(String id) { } private void commit(State state, boolean timeTraveling) { - if (imperativeTransactions.hasCycles(n -> true, e -> true)) - throw new Error("Circular native group ordering"); - - AtomicReference> started = new AtomicReference<>(Set.of()); - AtomicReference> stopped = new AtomicReference<>(Set.of()); - AtomicBoolean inSync = new AtomicBoolean(true); + AtomicReference> status = new AtomicReference<>(Map.of()); + status.updateAndGet(map -> map.put(TARGET, 1)); - imperativeTransactions = imperativeTransactions.putEdge(SOURCE, TARGET, Void.class); - tryCommit(state, timeTraveling, SOURCE, started, stopped, inSync); + tryCommit(state, timeTraveling, SOURCE, status); } - private void tryCommit(State state, boolean timeTraveling, IImperativeTransaction it, AtomicReference> started, AtomicReference> stopped, AtomicBoolean inSync) { - if (!inSync.get() || killed || Objects.equals(it, TARGET)) return; + private void tryCommit(State state, boolean timeTraveling, IImperativeTransaction it, AtomicReference> status) { + if (status.get().get(TARGET) == 0 || killed || Objects.equals(it, TARGET)) return; Set incoming = imperativeTransactions.getIncomingNodes(it); - if (!stopped.get().containsAll(incoming)) return; + Map map = status.get(); + if (!incoming.isEmpty() && !incoming.allMatch(e -> map.getOrDefault(e, -1) == 2)) return; - Set s = started.getAndUpdate(old -> old.add(it)); - if (s.contains(it)) return; + Map m = status.getAndUpdate(old -> old.put(it, 1)); + if (m.getOrDefault(it, 0) == 1) return; if (it instanceof ImperativeTransaction im) { im.schedule(() -> { if (im.commit(state, timeTraveling)) { - stopped.updateAndGet(old -> old.add(it)); + status.updateAndGet(old -> old.put(it, 2)); imperativeTransactions.getOutgoingNodes(it).forEach(next -> - tryCommit(state, timeTraveling, next, started, stopped, inSync)); + tryCommit(state, timeTraveling, next, status)); } else { - inSync.set(false); + status.getAndUpdate(old -> old.put(TARGET, 0)); } }); } else { - stopped.updateAndGet(old -> old.add(it)); + status.updateAndGet(old -> old.put(it, 2)); imperativeTransactions.getOutgoingNodes(it).forEach(next -> - tryCommit(state, timeTraveling, next, started, stopped, inSync)); + tryCommit(state, timeTraveling, next, status)); } } diff --git a/src/test/java/org/modelingvalue/dclare/test/ImperativeOrderingTests.java b/src/test/java/org/modelingvalue/dclare/test/ImperativeOrderingTests.java index 71192416..ab343a09 100644 --- a/src/test/java/org/modelingvalue/dclare/test/ImperativeOrderingTests.java +++ b/src/test/java/org/modelingvalue/dclare/test/ImperativeOrderingTests.java @@ -4,18 +4,17 @@ import org.junit.jupiter.api.RepetitionInfo; import org.modelingvalue.collections.List; import org.modelingvalue.collections.Set; -import org.modelingvalue.collections.util.Concurrent; import org.modelingvalue.collections.util.Pair; import org.modelingvalue.collections.util.StatusProvider; import org.modelingvalue.dclare.*; import org.modelingvalue.dclare.test.support.*; +import java.time.Duration; import java.util.stream.IntStream; import static org.junit.jupiter.api.Assertions.*; import static org.modelingvalue.dclare.CoreSetableModifier.containment; import static org.modelingvalue.dclare.test.support.Shared.THE_POOL; -import static org.modelingvalue.dclare.test.support.TestNewable.create; public class ImperativeOrderingTests { private static final DclareConfig BASE_CONFIG = new DclareConfig() // @@ -82,11 +81,14 @@ public void complexOrdering(RepetitionInfo repetitionInfo) { public void cyclicOrdering(RepetitionInfo repetitionInfo) { DclareConfig config = CONFIGS[(repetitionInfo.getCurrentRepetition() - 1) / 32]; - assertThrows(Error.class, () -> imperativeTest(config, 2, Set.of(Pair.of(0, 1), Pair.of(1, 0)))); - assertThrows(Error.class, () -> imperativeTest(config, 1, Set.of(Pair.of(0, 0)))); - assertThrows(Error.class, () -> imperativeTest(config, 4, Set.of( - Pair.of(0, 1), Pair.of(1, 2), Pair.of(2, 3), Pair.of(3, 1) - ))); + assertTimeoutPreemptively(Duration.ofSeconds(2), () -> assertThrows(Error.class, () -> + imperativeTest(config, 2, Set.of(Pair.of(0, 1), Pair.of(1, 0))))); + assertTimeoutPreemptively(Duration.ofSeconds(2), () -> assertThrows(Error.class, () -> + imperativeTest(config, 1, Set.of(Pair.of(0, 0))))); + assertTimeoutPreemptively(Duration.ofSeconds(2), () -> assertThrows(Error.class, () -> + imperativeTest(config, 4, Set.of( + Pair.of(0, 1), Pair.of(1, 2), Pair.of(2, 3), Pair.of(3, 1) + )))); } @RepeatedTest(64) diff --git a/src/test/java/org/modelingvalue/dclare/test/support/OrderedTestUniverse.java b/src/test/java/org/modelingvalue/dclare/test/support/OrderedTestUniverse.java index ef46aa23..969db6ad 100644 --- a/src/test/java/org/modelingvalue/dclare/test/support/OrderedTestUniverse.java +++ b/src/test/java/org/modelingvalue/dclare/test/support/OrderedTestUniverse.java @@ -106,7 +106,7 @@ public void init() { incoming = incoming.put(edge.b(), incoming.getOrDefault(edge.b(), Set.of()).add(edge.a())); } for (var edge : actualEdges) { - universeTransaction.orderImperativeTransactions("TEST" + edge.a(), "TEST" + edge.b()); + universeTransaction.orderImperatives("TEST" + edge.a(), "TEST" + edge.b()); } }