From 5ac895c3913cfff3799fa6e45b0d9865e9f44275 Mon Sep 17 00:00:00 2001 From: Yi Zhang Date: Wed, 24 Sep 2025 11:08:27 +0800 Subject: [PATCH 1/2] [FLINK-38757][runtime] Introduce the base class for application --- .../flink/api/common/ApplicationID.java | 110 ++++++++ .../flink/api/common/ApplicationState.java | 63 +++++ .../application/AbstractApplication.java | 213 ++++++++++++++++ .../application/AbstractApplicationTest.java | 240 ++++++++++++++++++ 4 files changed, 626 insertions(+) create mode 100644 flink-core/src/main/java/org/apache/flink/api/common/ApplicationID.java create mode 100644 flink-core/src/main/java/org/apache/flink/api/common/ApplicationState.java create mode 100644 flink-runtime/src/main/java/org/apache/flink/runtime/application/AbstractApplication.java create mode 100644 flink-runtime/src/test/java/org/apache/flink/runtime/application/AbstractApplicationTest.java diff --git a/flink-core/src/main/java/org/apache/flink/api/common/ApplicationID.java b/flink-core/src/main/java/org/apache/flink/api/common/ApplicationID.java new file mode 100644 index 0000000000000..c7b0c8b7c5471 --- /dev/null +++ b/flink-core/src/main/java/org/apache/flink/api/common/ApplicationID.java @@ -0,0 +1,110 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.common; + +import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.util.AbstractID; +import org.apache.flink.util.StringUtils; + +import java.nio.ByteBuffer; + +/** Unique (at least statistically unique) identifier for a Flink Application. */ +@PublicEvolving +public final class ApplicationID extends AbstractID { + + private static final long serialVersionUID = 1L; + + /** Creates a new (statistically) random ApplicationID. */ + public ApplicationID() { + super(); + } + + /** + * Creates a new ApplicationID, using the given lower and upper parts. + * + * @param lowerPart The lower 8 bytes of the ID. + * @param upperPart The upper 8 bytes of the ID. + */ + public ApplicationID(long lowerPart, long upperPart) { + super(lowerPart, upperPart); + } + + /** + * Creates a new ApplicationID from the given byte sequence. The byte sequence must be exactly + * 16 bytes long. The first eight bytes make up the lower part of the ID, while the next 8 bytes + * make up the upper part of the ID. + * + * @param bytes The byte sequence. + */ + public ApplicationID(byte[] bytes) { + super(bytes); + } + + // ------------------------------------------------------------------------ + // Static factory methods + // ------------------------------------------------------------------------ + + /** + * Creates a new (statistically) random ApplicationID. + * + * @return A new random ApplicationID. + */ + public static ApplicationID generate() { + return new ApplicationID(); + } + + /** + * Creates a new ApplicationID from the given byte sequence. The byte sequence must be exactly + * 16 bytes long. The first eight bytes make up the lower part of the ID, while the next 8 bytes + * make up the upper part of the ID. + * + * @param bytes The byte sequence. + * @return A new ApplicationID corresponding to the ID encoded in the bytes. + */ + public static ApplicationID fromByteArray(byte[] bytes) { + return new ApplicationID(bytes); + } + + public static ApplicationID fromByteBuffer(ByteBuffer buf) { + long lower = buf.getLong(); + long upper = buf.getLong(); + return new ApplicationID(lower, upper); + } + + /** + * Parses an ApplicationID from the given string. + * + * @param hexString string representation of an ApplicationID + * @return Parsed ApplicationID + * @throws IllegalArgumentException if the ApplicationID could not be parsed from the given + * string + */ + public static ApplicationID fromHexString(String hexString) { + try { + return new ApplicationID(StringUtils.hexStringToByte(hexString)); + } catch (Exception e) { + throw new IllegalArgumentException( + "Cannot parse ApplicationID from \"" + + hexString + + "\". The expected format is " + + "[0-9a-fA-F]{32}, e.g. fd72014d4c864993a2e5a9287b4a9c5d.", + e); + } + } +} diff --git a/flink-core/src/main/java/org/apache/flink/api/common/ApplicationState.java b/flink-core/src/main/java/org/apache/flink/api/common/ApplicationState.java new file mode 100644 index 0000000000000..7880c92e97dec --- /dev/null +++ b/flink-core/src/main/java/org/apache/flink/api/common/ApplicationState.java @@ -0,0 +1,63 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.common; + +import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.configuration.DeploymentOptions; + +/** Possible states of an application. */ +@PublicEvolving +public enum ApplicationState { + + /** The application is newly created and has not started running. */ + CREATED(false), + + /** The application has started running. */ + RUNNING(false), + + /** The application has encountered a failure and is waiting for the cleanup to complete. */ + FAILING(false), + + /** The application has failed due to an exception. */ + FAILED(true), + + /** The application is being cancelled. */ + CANCELLING(false), + + /** The application has been cancelled. */ + CANCELED(true), + + /** + * All jobs in the application have completed, See {@link + * DeploymentOptions#TERMINATE_APPLICATION_ON_ANY_JOB_EXCEPTION} for more information. + */ + FINISHED(true); + + // -------------------------------------------------------------------------------------------- + + private final boolean terminalState; + + ApplicationState(boolean terminalState) { + this.terminalState = terminalState; + } + + public boolean isTerminalState() { + return terminalState; + } +} diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/application/AbstractApplication.java b/flink-runtime/src/main/java/org/apache/flink/runtime/application/AbstractApplication.java new file mode 100644 index 0000000000000..e351998506122 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/application/AbstractApplication.java @@ -0,0 +1,213 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.application; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.api.common.ApplicationID; +import org.apache.flink.api.common.ApplicationState; +import org.apache.flink.api.common.JobID; +import org.apache.flink.runtime.dispatcher.DispatcherGateway; +import org.apache.flink.runtime.messages.Acknowledge; +import org.apache.flink.runtime.rpc.FatalErrorHandler; +import org.apache.flink.util.concurrent.ScheduledExecutor; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.Serializable; +import java.util.Arrays; +import java.util.Collections; +import java.util.EnumMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.Executor; + +/** Base class for all applications. */ +@Internal +public abstract class AbstractApplication implements Serializable { + + private static final Logger LOG = LoggerFactory.getLogger(AbstractApplication.class); + + private static final long serialVersionUID = 1L; + + private final ApplicationID applicationId; + + private ApplicationState applicationState; + + /** + * Timestamps (in milliseconds as returned by {@code System.currentTimeMillis()}) when the + * application transitioned into a certain status. The index into this array is the ordinal of + * the enum value, i.e. the timestamp when the application went into state "RUNNING" is at + * {@code timestamps[RUNNING.ordinal()]}. + */ + private final long[] statusTimestamps; + + private final Set jobs = new HashSet<>(); + + public AbstractApplication(ApplicationID applicationId) { + this.applicationId = applicationId; + this.statusTimestamps = new long[ApplicationState.values().length]; + this.applicationState = ApplicationState.CREATED; + this.statusTimestamps[ApplicationState.CREATED.ordinal()] = System.currentTimeMillis(); + } + + /** + * Entry method to run the application asynchronously. + * + *

The returned CompletableFuture indicates that the execution request has been accepted and + * the application transitions to RUNNING state. + * + *

Note: This method must be called on the main thread. + * + * @param dispatcherGateway the dispatcher of the cluster to run the application. + * @param scheduledExecutor the executor to run the user logic. + * @param mainThreadExecutor the executor bound to the main thread. + * @param errorHandler the handler for fatal errors. + * @return a future indicating that the execution request has been accepted. + */ + public abstract CompletableFuture execute( + final DispatcherGateway dispatcherGateway, + final ScheduledExecutor scheduledExecutor, + final Executor mainThreadExecutor, + final FatalErrorHandler errorHandler); + + /** + * Cancels the application execution. + * + *

This method is responsible for initiating the cancellation process and handling the + * appropriate state transitions of the application. + * + *

Note: This method must be called from the main thread. + */ + public abstract void cancel(); + + /** + * Cleans up execution associated with the application. + * + *

This method is typically invoked when the cluster is shutting down. + */ + public abstract void dispose(); + + public abstract String getName(); + + public ApplicationID getApplicationId() { + return applicationId; + } + + public Set getJobs() { + return Collections.unmodifiableSet(jobs); + } + + /** + * Adds a job ID to the jobs set. + * + *

This method is not thread-safe and should not be called concurrently. + */ + public boolean addJob(JobID jobId) { + return jobs.add(jobId); + } + + public ApplicationState getApplicationStatus() { + return applicationState; + } + + // ------------------------------------------------------------------------ + // State Transitions + // ------------------------------------------------------------------------ + + private static final Map> ALLOWED_TRANSITIONS; + + static { + ALLOWED_TRANSITIONS = new EnumMap<>(ApplicationState.class); + ALLOWED_TRANSITIONS.put( + ApplicationState.CREATED, + new HashSet<>( + Arrays.asList(ApplicationState.RUNNING, ApplicationState.CANCELLING))); + ALLOWED_TRANSITIONS.put( + ApplicationState.RUNNING, + new HashSet<>( + Arrays.asList( + ApplicationState.FINISHED, + ApplicationState.FAILING, + ApplicationState.CANCELLING))); + ALLOWED_TRANSITIONS.put( + ApplicationState.FAILING, + new HashSet<>(Collections.singletonList(ApplicationState.FAILED))); + ALLOWED_TRANSITIONS.put( + ApplicationState.CANCELLING, + new HashSet<>(Collections.singletonList(ApplicationState.CANCELED))); + } + + /** All state transition methods must be called from the main method. */ + public void transitionToRunning() { + transitionState(ApplicationState.RUNNING); + } + + /** All state transition methods must be called from the main method. */ + public void transitionToCancelling() { + transitionState(ApplicationState.CANCELLING); + } + + /** All state transition methods must be called from the main method. */ + public void transitionToFailing() { + transitionState(ApplicationState.FAILING); + } + + /** All state transition methods must be called from the main method. */ + public void transitionToFailed() { + transitionState(ApplicationState.FAILED); + } + + /** All state transition methods must be called from the main method. */ + public void transitionToFinished() { + transitionState(ApplicationState.FINISHED); + } + + /** All state transition methods must be called from the main method. */ + public void transitionToCanceled() { + transitionState(ApplicationState.CANCELED); + } + + void transitionState(ApplicationState targetState) { + validateTransition(targetState); + LOG.info( + "Application {} ({}) switched from state {} to {}.", + getName(), + getApplicationId(), + applicationState, + targetState); + this.statusTimestamps[targetState.ordinal()] = System.currentTimeMillis(); + this.applicationState = targetState; + } + + private void validateTransition(ApplicationState targetState) { + Set allowedTransitions = ALLOWED_TRANSITIONS.get(applicationState); + if (allowedTransitions == null || !allowedTransitions.contains(targetState)) { + throw new IllegalStateException( + String.format( + "Invalid transition from %s to %s", applicationState, targetState)); + } + } + + public long getStatusTimestamp(ApplicationState status) { + return this.statusTimestamps[status.ordinal()]; + } +} diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/application/AbstractApplicationTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/application/AbstractApplicationTest.java new file mode 100644 index 0000000000000..850e7c85e2e14 --- /dev/null +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/application/AbstractApplicationTest.java @@ -0,0 +1,240 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.application; + +import org.apache.flink.api.common.ApplicationID; +import org.apache.flink.api.common.ApplicationState; +import org.apache.flink.api.common.JobID; +import org.apache.flink.runtime.dispatcher.DispatcherGateway; +import org.apache.flink.runtime.messages.Acknowledge; +import org.apache.flink.runtime.rpc.FatalErrorHandler; +import org.apache.flink.util.concurrent.ScheduledExecutor; + +import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.EnumSource; + +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.Executor; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; + +/** Tests for {@link AbstractApplication}. */ +public class AbstractApplicationTest { + + @Test + void testInitialization() { + ApplicationID applicationID = new ApplicationID(); + long start = System.currentTimeMillis(); + AbstractApplication application = new MockApplication(applicationID); + long end = System.currentTimeMillis(); + + assertEquals(applicationID, application.getApplicationId()); + + assertEquals(ApplicationState.CREATED, application.getApplicationStatus()); + + long ts = application.getStatusTimestamp(ApplicationState.CREATED); + assertTrue(start <= ts && ts <= end); + } + + @Test + void testAddJob() { + AbstractApplication application = new MockApplication(new ApplicationID()); + JobID jobId = JobID.generate(); + + boolean added = application.addJob(jobId); + assertTrue(added); + assertEquals(1, application.getJobs().size()); + assertTrue(application.getJobs().contains(jobId)); + + added = application.addJob(jobId); + assertFalse(added); + assertEquals(1, application.getJobs().size()); + } + + @ParameterizedTest + @EnumSource( + value = ApplicationState.class, + names = {"RUNNING", "CANCELLING"}) + void testTransitionFromCreated(ApplicationState targetState) { + AbstractApplication application = new MockApplication(new ApplicationID()); + + long start = System.currentTimeMillis(); + application.transitionState(targetState); + long end = System.currentTimeMillis(); + + assertEquals(targetState, application.getApplicationStatus()); + + long ts = application.getStatusTimestamp(targetState); + assertTrue(start <= ts && ts <= end); + } + + @ParameterizedTest + @EnumSource( + value = ApplicationState.class, + names = {"CREATED", "FINISHED", "FAILING", "CANCELED", "FAILED"}) + void testTransitionFromCreatedToUnsupportedStates(ApplicationState targetState) { + AbstractApplication application = new MockApplication(new ApplicationID()); + + assertThrows(IllegalStateException.class, () -> application.transitionState(targetState)); + } + + @ParameterizedTest + @EnumSource( + value = ApplicationState.class, + names = {"FINISHED", "FAILING", "CANCELLING"}) + void testTransitionFromRunning(ApplicationState targetState) { + AbstractApplication application = new MockApplication(new ApplicationID()); + application.transitionToRunning(); + + long start = System.currentTimeMillis(); + application.transitionState(targetState); + long end = System.currentTimeMillis(); + + assertEquals(targetState, application.getApplicationStatus()); + + long ts = application.getStatusTimestamp(targetState); + assertTrue(start <= ts && ts <= end); + } + + @ParameterizedTest + @EnumSource( + value = ApplicationState.class, + names = {"CREATED", "RUNNING", "FAILED", "CANCELED"}) + void testTransitionFromRunningToUnsupportedStates(ApplicationState targetState) { + AbstractApplication application = new MockApplication(new ApplicationID()); + application.transitionToRunning(); + + assertThrows(IllegalStateException.class, () -> application.transitionState(targetState)); + } + + @Test + void testTransitionFromCancelling() { + AbstractApplication application = new MockApplication(new ApplicationID()); + application.transitionToCancelling(); + + long start = System.currentTimeMillis(); + application.transitionToCanceled(); + long end = System.currentTimeMillis(); + + assertEquals(ApplicationState.CANCELED, application.getApplicationStatus()); + + long ts = application.getStatusTimestamp(ApplicationState.CANCELED); + assertTrue(start <= ts && ts <= end); + } + + @ParameterizedTest + @EnumSource( + value = ApplicationState.class, + names = {"CREATED", "RUNNING", "CANCELLING", "FAILING", "FINISHED", "FAILED"}) + void testTransitionFromCancelingToUnsupportedStates(ApplicationState targetState) { + AbstractApplication application = new MockApplication(new ApplicationID()); + application.transitionToCancelling(); + + assertThrows(IllegalStateException.class, () -> application.transitionState(targetState)); + } + + @Test + void testTransitionFromFailing() { + AbstractApplication application = new MockApplication(new ApplicationID()); + application.transitionToRunning(); + application.transitionToFailing(); + + long start = System.currentTimeMillis(); + application.transitionToFailed(); + long end = System.currentTimeMillis(); + + assertEquals(ApplicationState.FAILED, application.getApplicationStatus()); + + long ts = application.getStatusTimestamp(ApplicationState.FAILED); + assertTrue(start <= ts && ts <= end); + } + + @ParameterizedTest + @EnumSource( + value = ApplicationState.class, + names = {"CREATED", "RUNNING", "CANCELLING", "FAILING", "FINISHED", "CANCELED"}) + void testTransitionFromFailingToUnsupportedStates(ApplicationState targetState) { + AbstractApplication application = new MockApplication(new ApplicationID()); + application.transitionToRunning(); + application.transitionToFailing(); + + assertThrows(IllegalStateException.class, () -> application.transitionState(targetState)); + } + + @ParameterizedTest + @EnumSource(value = ApplicationState.class) + void testTransitionFromFinished(ApplicationState targetState) { + AbstractApplication application = new MockApplication(new ApplicationID()); + application.transitionToRunning(); + application.transitionToFinished(); + + assertThrows(IllegalStateException.class, () -> application.transitionState(targetState)); + } + + @ParameterizedTest + @EnumSource(value = ApplicationState.class) + void testTransitionFromCanceled(ApplicationState targetState) { + AbstractApplication application = new MockApplication(new ApplicationID()); + application.transitionToCancelling(); + application.transitionToCanceled(); + + assertThrows(IllegalStateException.class, () -> application.transitionState(targetState)); + } + + @ParameterizedTest + @EnumSource(value = ApplicationState.class) + void testTransitionFromFailed(ApplicationState targetState) { + AbstractApplication application = new MockApplication(new ApplicationID()); + application.transitionToRunning(); + application.transitionToFailing(); + application.transitionToFailed(); + + assertThrows(IllegalStateException.class, () -> application.transitionState(targetState)); + } + + private static class MockApplication extends AbstractApplication { + public MockApplication(ApplicationID applicationId) { + super(applicationId); + } + + @Override + public CompletableFuture execute( + DispatcherGateway dispatcherGateway, + ScheduledExecutor scheduledExecutor, + Executor mainThreadExecutor, + FatalErrorHandler errorHandler) { + return CompletableFuture.completedFuture(Acknowledge.get()); + } + + @Override + public void cancel() {} + + @Override + public void dispose() {} + + @Override + public String getName() { + return "Mock Application"; + } + } +} From f5fb572a83f42f58409622cc42ad0cef142581ae Mon Sep 17 00:00:00 2001 From: Yi Zhang Date: Mon, 15 Dec 2025 15:24:05 +0800 Subject: [PATCH 2/2] refactor: rename to 'canceling' and optimize comments --- .../flink/api/common/ApplicationState.java | 6 ++-- .../application/AbstractApplication.java | 32 +++++++++---------- .../application/AbstractApplicationTest.java | 16 +++++----- 3 files changed, 26 insertions(+), 28 deletions(-) diff --git a/flink-core/src/main/java/org/apache/flink/api/common/ApplicationState.java b/flink-core/src/main/java/org/apache/flink/api/common/ApplicationState.java index 7880c92e97dec..b5b7f5db04d2e 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/ApplicationState.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/ApplicationState.java @@ -37,10 +37,10 @@ public enum ApplicationState { /** The application has failed due to an exception. */ FAILED(true), - /** The application is being cancelled. */ - CANCELLING(false), + /** The application is being canceled. */ + CANCELING(false), - /** The application has been cancelled. */ + /** The application has been canceled. */ CANCELED(true), /** diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/application/AbstractApplication.java b/flink-runtime/src/main/java/org/apache/flink/runtime/application/AbstractApplication.java index e351998506122..94a0621fa4089 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/application/AbstractApplication.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/application/AbstractApplication.java @@ -18,10 +18,10 @@ package org.apache.flink.runtime.application; -import org.apache.flink.annotation.Internal; import org.apache.flink.api.common.ApplicationID; import org.apache.flink.api.common.ApplicationState; import org.apache.flink.api.common.JobID; +import org.apache.flink.runtime.dispatcher.Dispatcher; import org.apache.flink.runtime.dispatcher.DispatcherGateway; import org.apache.flink.runtime.messages.Acknowledge; import org.apache.flink.runtime.rpc.FatalErrorHandler; @@ -41,7 +41,6 @@ import java.util.concurrent.Executor; /** Base class for all applications. */ -@Internal public abstract class AbstractApplication implements Serializable { private static final Logger LOG = LoggerFactory.getLogger(AbstractApplication.class); @@ -75,7 +74,7 @@ public AbstractApplication(ApplicationID applicationId) { *

The returned CompletableFuture indicates that the execution request has been accepted and * the application transitions to RUNNING state. * - *

Note: This method must be called on the main thread. + *

Note: This method must be called in the main thread of the {@link Dispatcher}. * * @param dispatcherGateway the dispatcher of the cluster to run the application. * @param scheduledExecutor the executor to run the user logic. @@ -95,7 +94,7 @@ public abstract CompletableFuture execute( *

This method is responsible for initiating the cancellation process and handling the * appropriate state transitions of the application. * - *

Note: This method must be called from the main thread. + *

Note: This method must be called in the main thread of the {@link Dispatcher}. */ public abstract void cancel(); @@ -119,7 +118,7 @@ public Set getJobs() { /** * Adds a job ID to the jobs set. * - *

This method is not thread-safe and should not be called concurrently. + *

Note:This method must be called in the main thread of the {@link Dispatcher}. */ public boolean addJob(JobID jobId) { return jobs.add(jobId); @@ -139,49 +138,48 @@ public ApplicationState getApplicationStatus() { ALLOWED_TRANSITIONS = new EnumMap<>(ApplicationState.class); ALLOWED_TRANSITIONS.put( ApplicationState.CREATED, - new HashSet<>( - Arrays.asList(ApplicationState.RUNNING, ApplicationState.CANCELLING))); + new HashSet<>(Arrays.asList(ApplicationState.RUNNING, ApplicationState.CANCELING))); ALLOWED_TRANSITIONS.put( ApplicationState.RUNNING, new HashSet<>( Arrays.asList( ApplicationState.FINISHED, ApplicationState.FAILING, - ApplicationState.CANCELLING))); + ApplicationState.CANCELING))); ALLOWED_TRANSITIONS.put( ApplicationState.FAILING, new HashSet<>(Collections.singletonList(ApplicationState.FAILED))); ALLOWED_TRANSITIONS.put( - ApplicationState.CANCELLING, + ApplicationState.CANCELING, new HashSet<>(Collections.singletonList(ApplicationState.CANCELED))); } - /** All state transition methods must be called from the main method. */ + /** All state transition methods must be called in the main thread. */ public void transitionToRunning() { transitionState(ApplicationState.RUNNING); } - /** All state transition methods must be called from the main method. */ - public void transitionToCancelling() { - transitionState(ApplicationState.CANCELLING); + /** All state transition methods must be called in the main thread. */ + public void transitionToCanceling() { + transitionState(ApplicationState.CANCELING); } - /** All state transition methods must be called from the main method. */ + /** All state transition methods must be called in the main thread. */ public void transitionToFailing() { transitionState(ApplicationState.FAILING); } - /** All state transition methods must be called from the main method. */ + /** All state transition methods must be called in the main thread. */ public void transitionToFailed() { transitionState(ApplicationState.FAILED); } - /** All state transition methods must be called from the main method. */ + /** All state transition methods must be called in the main thread. */ public void transitionToFinished() { transitionState(ApplicationState.FINISHED); } - /** All state transition methods must be called from the main method. */ + /** All state transition methods must be called in the main thread. */ public void transitionToCanceled() { transitionState(ApplicationState.CANCELED); } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/application/AbstractApplicationTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/application/AbstractApplicationTest.java index 850e7c85e2e14..ef609e41c3ead 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/application/AbstractApplicationTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/application/AbstractApplicationTest.java @@ -74,7 +74,7 @@ void testAddJob() { @ParameterizedTest @EnumSource( value = ApplicationState.class, - names = {"RUNNING", "CANCELLING"}) + names = {"RUNNING", "CANCELING"}) void testTransitionFromCreated(ApplicationState targetState) { AbstractApplication application = new MockApplication(new ApplicationID()); @@ -101,7 +101,7 @@ void testTransitionFromCreatedToUnsupportedStates(ApplicationState targetState) @ParameterizedTest @EnumSource( value = ApplicationState.class, - names = {"FINISHED", "FAILING", "CANCELLING"}) + names = {"FINISHED", "FAILING", "CANCELING"}) void testTransitionFromRunning(ApplicationState targetState) { AbstractApplication application = new MockApplication(new ApplicationID()); application.transitionToRunning(); @@ -128,9 +128,9 @@ void testTransitionFromRunningToUnsupportedStates(ApplicationState targetState) } @Test - void testTransitionFromCancelling() { + void testTransitionFromCanceling() { AbstractApplication application = new MockApplication(new ApplicationID()); - application.transitionToCancelling(); + application.transitionToCanceling(); long start = System.currentTimeMillis(); application.transitionToCanceled(); @@ -145,10 +145,10 @@ void testTransitionFromCancelling() { @ParameterizedTest @EnumSource( value = ApplicationState.class, - names = {"CREATED", "RUNNING", "CANCELLING", "FAILING", "FINISHED", "FAILED"}) + names = {"CREATED", "RUNNING", "CANCELING", "FAILING", "FINISHED", "FAILED"}) void testTransitionFromCancelingToUnsupportedStates(ApplicationState targetState) { AbstractApplication application = new MockApplication(new ApplicationID()); - application.transitionToCancelling(); + application.transitionToCanceling(); assertThrows(IllegalStateException.class, () -> application.transitionState(targetState)); } @@ -172,7 +172,7 @@ void testTransitionFromFailing() { @ParameterizedTest @EnumSource( value = ApplicationState.class, - names = {"CREATED", "RUNNING", "CANCELLING", "FAILING", "FINISHED", "CANCELED"}) + names = {"CREATED", "RUNNING", "CANCELING", "FAILING", "FINISHED", "CANCELED"}) void testTransitionFromFailingToUnsupportedStates(ApplicationState targetState) { AbstractApplication application = new MockApplication(new ApplicationID()); application.transitionToRunning(); @@ -195,7 +195,7 @@ void testTransitionFromFinished(ApplicationState targetState) { @EnumSource(value = ApplicationState.class) void testTransitionFromCanceled(ApplicationState targetState) { AbstractApplication application = new MockApplication(new ApplicationID()); - application.transitionToCancelling(); + application.transitionToCanceling(); application.transitionToCanceled(); assertThrows(IllegalStateException.class, () -> application.transitionState(targetState));