diff --git a/flink-core/src/main/java/org/apache/flink/api/common/ApplicationID.java b/flink-core/src/main/java/org/apache/flink/api/common/ApplicationID.java new file mode 100644 index 0000000000000..c7b0c8b7c5471 --- /dev/null +++ b/flink-core/src/main/java/org/apache/flink/api/common/ApplicationID.java @@ -0,0 +1,110 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.common; + +import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.util.AbstractID; +import org.apache.flink.util.StringUtils; + +import java.nio.ByteBuffer; + +/** Unique (at least statistically unique) identifier for a Flink Application. */ +@PublicEvolving +public final class ApplicationID extends AbstractID { + + private static final long serialVersionUID = 1L; + + /** Creates a new (statistically) random ApplicationID. */ + public ApplicationID() { + super(); + } + + /** + * Creates a new ApplicationID, using the given lower and upper parts. + * + * @param lowerPart The lower 8 bytes of the ID. + * @param upperPart The upper 8 bytes of the ID. + */ + public ApplicationID(long lowerPart, long upperPart) { + super(lowerPart, upperPart); + } + + /** + * Creates a new ApplicationID from the given byte sequence. The byte sequence must be exactly + * 16 bytes long. The first eight bytes make up the lower part of the ID, while the next 8 bytes + * make up the upper part of the ID. + * + * @param bytes The byte sequence. + */ + public ApplicationID(byte[] bytes) { + super(bytes); + } + + // ------------------------------------------------------------------------ + // Static factory methods + // ------------------------------------------------------------------------ + + /** + * Creates a new (statistically) random ApplicationID. + * + * @return A new random ApplicationID. + */ + public static ApplicationID generate() { + return new ApplicationID(); + } + + /** + * Creates a new ApplicationID from the given byte sequence. The byte sequence must be exactly + * 16 bytes long. The first eight bytes make up the lower part of the ID, while the next 8 bytes + * make up the upper part of the ID. + * + * @param bytes The byte sequence. + * @return A new ApplicationID corresponding to the ID encoded in the bytes. + */ + public static ApplicationID fromByteArray(byte[] bytes) { + return new ApplicationID(bytes); + } + + public static ApplicationID fromByteBuffer(ByteBuffer buf) { + long lower = buf.getLong(); + long upper = buf.getLong(); + return new ApplicationID(lower, upper); + } + + /** + * Parses an ApplicationID from the given string. + * + * @param hexString string representation of an ApplicationID + * @return Parsed ApplicationID + * @throws IllegalArgumentException if the ApplicationID could not be parsed from the given + * string + */ + public static ApplicationID fromHexString(String hexString) { + try { + return new ApplicationID(StringUtils.hexStringToByte(hexString)); + } catch (Exception e) { + throw new IllegalArgumentException( + "Cannot parse ApplicationID from \"" + + hexString + + "\". The expected format is " + + "[0-9a-fA-F]{32}, e.g. fd72014d4c864993a2e5a9287b4a9c5d.", + e); + } + } +} diff --git a/flink-core/src/main/java/org/apache/flink/api/common/ApplicationState.java b/flink-core/src/main/java/org/apache/flink/api/common/ApplicationState.java new file mode 100644 index 0000000000000..b5b7f5db04d2e --- /dev/null +++ b/flink-core/src/main/java/org/apache/flink/api/common/ApplicationState.java @@ -0,0 +1,63 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.common; + +import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.configuration.DeploymentOptions; + +/** Possible states of an application. */ +@PublicEvolving +public enum ApplicationState { + + /** The application is newly created and has not started running. */ + CREATED(false), + + /** The application has started running. */ + RUNNING(false), + + /** The application has encountered a failure and is waiting for the cleanup to complete. */ + FAILING(false), + + /** The application has failed due to an exception. */ + FAILED(true), + + /** The application is being canceled. */ + CANCELING(false), + + /** The application has been canceled. */ + CANCELED(true), + + /** + * All jobs in the application have completed, See {@link + * DeploymentOptions#TERMINATE_APPLICATION_ON_ANY_JOB_EXCEPTION} for more information. + */ + FINISHED(true); + + // -------------------------------------------------------------------------------------------- + + private final boolean terminalState; + + ApplicationState(boolean terminalState) { + this.terminalState = terminalState; + } + + public boolean isTerminalState() { + return terminalState; + } +} diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/application/AbstractApplication.java b/flink-runtime/src/main/java/org/apache/flink/runtime/application/AbstractApplication.java new file mode 100644 index 0000000000000..94a0621fa4089 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/application/AbstractApplication.java @@ -0,0 +1,211 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.application; + +import org.apache.flink.api.common.ApplicationID; +import org.apache.flink.api.common.ApplicationState; +import org.apache.flink.api.common.JobID; +import org.apache.flink.runtime.dispatcher.Dispatcher; +import org.apache.flink.runtime.dispatcher.DispatcherGateway; +import org.apache.flink.runtime.messages.Acknowledge; +import org.apache.flink.runtime.rpc.FatalErrorHandler; +import org.apache.flink.util.concurrent.ScheduledExecutor; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.Serializable; +import java.util.Arrays; +import java.util.Collections; +import java.util.EnumMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.Executor; + +/** Base class for all applications. */ +public abstract class AbstractApplication implements Serializable { + + private static final Logger LOG = LoggerFactory.getLogger(AbstractApplication.class); + + private static final long serialVersionUID = 1L; + + private final ApplicationID applicationId; + + private ApplicationState applicationState; + + /** + * Timestamps (in milliseconds as returned by {@code System.currentTimeMillis()}) when the + * application transitioned into a certain status. The index into this array is the ordinal of + * the enum value, i.e. the timestamp when the application went into state "RUNNING" is at + * {@code timestamps[RUNNING.ordinal()]}. + */ + private final long[] statusTimestamps; + + private final Set jobs = new HashSet<>(); + + public AbstractApplication(ApplicationID applicationId) { + this.applicationId = applicationId; + this.statusTimestamps = new long[ApplicationState.values().length]; + this.applicationState = ApplicationState.CREATED; + this.statusTimestamps[ApplicationState.CREATED.ordinal()] = System.currentTimeMillis(); + } + + /** + * Entry method to run the application asynchronously. + * + *

The returned CompletableFuture indicates that the execution request has been accepted and + * the application transitions to RUNNING state. + * + *

Note: This method must be called in the main thread of the {@link Dispatcher}. + * + * @param dispatcherGateway the dispatcher of the cluster to run the application. + * @param scheduledExecutor the executor to run the user logic. + * @param mainThreadExecutor the executor bound to the main thread. + * @param errorHandler the handler for fatal errors. + * @return a future indicating that the execution request has been accepted. + */ + public abstract CompletableFuture execute( + final DispatcherGateway dispatcherGateway, + final ScheduledExecutor scheduledExecutor, + final Executor mainThreadExecutor, + final FatalErrorHandler errorHandler); + + /** + * Cancels the application execution. + * + *

This method is responsible for initiating the cancellation process and handling the + * appropriate state transitions of the application. + * + *

Note: This method must be called in the main thread of the {@link Dispatcher}. + */ + public abstract void cancel(); + + /** + * Cleans up execution associated with the application. + * + *

This method is typically invoked when the cluster is shutting down. + */ + public abstract void dispose(); + + public abstract String getName(); + + public ApplicationID getApplicationId() { + return applicationId; + } + + public Set getJobs() { + return Collections.unmodifiableSet(jobs); + } + + /** + * Adds a job ID to the jobs set. + * + *

Note:This method must be called in the main thread of the {@link Dispatcher}. + */ + public boolean addJob(JobID jobId) { + return jobs.add(jobId); + } + + public ApplicationState getApplicationStatus() { + return applicationState; + } + + // ------------------------------------------------------------------------ + // State Transitions + // ------------------------------------------------------------------------ + + private static final Map> ALLOWED_TRANSITIONS; + + static { + ALLOWED_TRANSITIONS = new EnumMap<>(ApplicationState.class); + ALLOWED_TRANSITIONS.put( + ApplicationState.CREATED, + new HashSet<>(Arrays.asList(ApplicationState.RUNNING, ApplicationState.CANCELING))); + ALLOWED_TRANSITIONS.put( + ApplicationState.RUNNING, + new HashSet<>( + Arrays.asList( + ApplicationState.FINISHED, + ApplicationState.FAILING, + ApplicationState.CANCELING))); + ALLOWED_TRANSITIONS.put( + ApplicationState.FAILING, + new HashSet<>(Collections.singletonList(ApplicationState.FAILED))); + ALLOWED_TRANSITIONS.put( + ApplicationState.CANCELING, + new HashSet<>(Collections.singletonList(ApplicationState.CANCELED))); + } + + /** All state transition methods must be called in the main thread. */ + public void transitionToRunning() { + transitionState(ApplicationState.RUNNING); + } + + /** All state transition methods must be called in the main thread. */ + public void transitionToCanceling() { + transitionState(ApplicationState.CANCELING); + } + + /** All state transition methods must be called in the main thread. */ + public void transitionToFailing() { + transitionState(ApplicationState.FAILING); + } + + /** All state transition methods must be called in the main thread. */ + public void transitionToFailed() { + transitionState(ApplicationState.FAILED); + } + + /** All state transition methods must be called in the main thread. */ + public void transitionToFinished() { + transitionState(ApplicationState.FINISHED); + } + + /** All state transition methods must be called in the main thread. */ + public void transitionToCanceled() { + transitionState(ApplicationState.CANCELED); + } + + void transitionState(ApplicationState targetState) { + validateTransition(targetState); + LOG.info( + "Application {} ({}) switched from state {} to {}.", + getName(), + getApplicationId(), + applicationState, + targetState); + this.statusTimestamps[targetState.ordinal()] = System.currentTimeMillis(); + this.applicationState = targetState; + } + + private void validateTransition(ApplicationState targetState) { + Set allowedTransitions = ALLOWED_TRANSITIONS.get(applicationState); + if (allowedTransitions == null || !allowedTransitions.contains(targetState)) { + throw new IllegalStateException( + String.format( + "Invalid transition from %s to %s", applicationState, targetState)); + } + } + + public long getStatusTimestamp(ApplicationState status) { + return this.statusTimestamps[status.ordinal()]; + } +} diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/application/AbstractApplicationTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/application/AbstractApplicationTest.java new file mode 100644 index 0000000000000..ef609e41c3ead --- /dev/null +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/application/AbstractApplicationTest.java @@ -0,0 +1,240 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.application; + +import org.apache.flink.api.common.ApplicationID; +import org.apache.flink.api.common.ApplicationState; +import org.apache.flink.api.common.JobID; +import org.apache.flink.runtime.dispatcher.DispatcherGateway; +import org.apache.flink.runtime.messages.Acknowledge; +import org.apache.flink.runtime.rpc.FatalErrorHandler; +import org.apache.flink.util.concurrent.ScheduledExecutor; + +import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.EnumSource; + +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.Executor; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; + +/** Tests for {@link AbstractApplication}. */ +public class AbstractApplicationTest { + + @Test + void testInitialization() { + ApplicationID applicationID = new ApplicationID(); + long start = System.currentTimeMillis(); + AbstractApplication application = new MockApplication(applicationID); + long end = System.currentTimeMillis(); + + assertEquals(applicationID, application.getApplicationId()); + + assertEquals(ApplicationState.CREATED, application.getApplicationStatus()); + + long ts = application.getStatusTimestamp(ApplicationState.CREATED); + assertTrue(start <= ts && ts <= end); + } + + @Test + void testAddJob() { + AbstractApplication application = new MockApplication(new ApplicationID()); + JobID jobId = JobID.generate(); + + boolean added = application.addJob(jobId); + assertTrue(added); + assertEquals(1, application.getJobs().size()); + assertTrue(application.getJobs().contains(jobId)); + + added = application.addJob(jobId); + assertFalse(added); + assertEquals(1, application.getJobs().size()); + } + + @ParameterizedTest + @EnumSource( + value = ApplicationState.class, + names = {"RUNNING", "CANCELING"}) + void testTransitionFromCreated(ApplicationState targetState) { + AbstractApplication application = new MockApplication(new ApplicationID()); + + long start = System.currentTimeMillis(); + application.transitionState(targetState); + long end = System.currentTimeMillis(); + + assertEquals(targetState, application.getApplicationStatus()); + + long ts = application.getStatusTimestamp(targetState); + assertTrue(start <= ts && ts <= end); + } + + @ParameterizedTest + @EnumSource( + value = ApplicationState.class, + names = {"CREATED", "FINISHED", "FAILING", "CANCELED", "FAILED"}) + void testTransitionFromCreatedToUnsupportedStates(ApplicationState targetState) { + AbstractApplication application = new MockApplication(new ApplicationID()); + + assertThrows(IllegalStateException.class, () -> application.transitionState(targetState)); + } + + @ParameterizedTest + @EnumSource( + value = ApplicationState.class, + names = {"FINISHED", "FAILING", "CANCELING"}) + void testTransitionFromRunning(ApplicationState targetState) { + AbstractApplication application = new MockApplication(new ApplicationID()); + application.transitionToRunning(); + + long start = System.currentTimeMillis(); + application.transitionState(targetState); + long end = System.currentTimeMillis(); + + assertEquals(targetState, application.getApplicationStatus()); + + long ts = application.getStatusTimestamp(targetState); + assertTrue(start <= ts && ts <= end); + } + + @ParameterizedTest + @EnumSource( + value = ApplicationState.class, + names = {"CREATED", "RUNNING", "FAILED", "CANCELED"}) + void testTransitionFromRunningToUnsupportedStates(ApplicationState targetState) { + AbstractApplication application = new MockApplication(new ApplicationID()); + application.transitionToRunning(); + + assertThrows(IllegalStateException.class, () -> application.transitionState(targetState)); + } + + @Test + void testTransitionFromCanceling() { + AbstractApplication application = new MockApplication(new ApplicationID()); + application.transitionToCanceling(); + + long start = System.currentTimeMillis(); + application.transitionToCanceled(); + long end = System.currentTimeMillis(); + + assertEquals(ApplicationState.CANCELED, application.getApplicationStatus()); + + long ts = application.getStatusTimestamp(ApplicationState.CANCELED); + assertTrue(start <= ts && ts <= end); + } + + @ParameterizedTest + @EnumSource( + value = ApplicationState.class, + names = {"CREATED", "RUNNING", "CANCELING", "FAILING", "FINISHED", "FAILED"}) + void testTransitionFromCancelingToUnsupportedStates(ApplicationState targetState) { + AbstractApplication application = new MockApplication(new ApplicationID()); + application.transitionToCanceling(); + + assertThrows(IllegalStateException.class, () -> application.transitionState(targetState)); + } + + @Test + void testTransitionFromFailing() { + AbstractApplication application = new MockApplication(new ApplicationID()); + application.transitionToRunning(); + application.transitionToFailing(); + + long start = System.currentTimeMillis(); + application.transitionToFailed(); + long end = System.currentTimeMillis(); + + assertEquals(ApplicationState.FAILED, application.getApplicationStatus()); + + long ts = application.getStatusTimestamp(ApplicationState.FAILED); + assertTrue(start <= ts && ts <= end); + } + + @ParameterizedTest + @EnumSource( + value = ApplicationState.class, + names = {"CREATED", "RUNNING", "CANCELING", "FAILING", "FINISHED", "CANCELED"}) + void testTransitionFromFailingToUnsupportedStates(ApplicationState targetState) { + AbstractApplication application = new MockApplication(new ApplicationID()); + application.transitionToRunning(); + application.transitionToFailing(); + + assertThrows(IllegalStateException.class, () -> application.transitionState(targetState)); + } + + @ParameterizedTest + @EnumSource(value = ApplicationState.class) + void testTransitionFromFinished(ApplicationState targetState) { + AbstractApplication application = new MockApplication(new ApplicationID()); + application.transitionToRunning(); + application.transitionToFinished(); + + assertThrows(IllegalStateException.class, () -> application.transitionState(targetState)); + } + + @ParameterizedTest + @EnumSource(value = ApplicationState.class) + void testTransitionFromCanceled(ApplicationState targetState) { + AbstractApplication application = new MockApplication(new ApplicationID()); + application.transitionToCanceling(); + application.transitionToCanceled(); + + assertThrows(IllegalStateException.class, () -> application.transitionState(targetState)); + } + + @ParameterizedTest + @EnumSource(value = ApplicationState.class) + void testTransitionFromFailed(ApplicationState targetState) { + AbstractApplication application = new MockApplication(new ApplicationID()); + application.transitionToRunning(); + application.transitionToFailing(); + application.transitionToFailed(); + + assertThrows(IllegalStateException.class, () -> application.transitionState(targetState)); + } + + private static class MockApplication extends AbstractApplication { + public MockApplication(ApplicationID applicationId) { + super(applicationId); + } + + @Override + public CompletableFuture execute( + DispatcherGateway dispatcherGateway, + ScheduledExecutor scheduledExecutor, + Executor mainThreadExecutor, + FatalErrorHandler errorHandler) { + return CompletableFuture.completedFuture(Acknowledge.get()); + } + + @Override + public void cancel() {} + + @Override + public void dispose() {} + + @Override + public String getName() { + return "Mock Application"; + } + } +}