BUG-9028: make NonPersistentDataProvider schedule invocation 56/62256/1
authorRobert Varga <robert.varga@pantheon.tech>
Tue, 22 Aug 2017 17:59:21 +0000 (19:59 +0200)
committerRobert Varga <nite@hq.sk>
Thu, 24 Aug 2017 08:37:39 +0000 (08:37 +0000)
We need to make NonPersistentDataProvider behave in a fashion
similar to what PersistentDataProvider does for asynchronous
persistence calls, which is schedule execution of the provided
procedure rather than direct execution (which is fair for synchronous
execution).

In order to make that work we introduce ExecuteInSelfActor, which
has an executeInSelf() method, which uses internal mechanics to
schedule the call at a later point.

Change-Id: I116708d98154c8244ea80b4a1a1aa615abc3075d
Signed-off-by: Robert Varga <robert.varga@pantheon.tech>
(cherry picked from commit b66d6180f06097e3501a88aac9fb684336addd58)

opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActor.java
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/MockRaftActorContext.java
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/RaftActorContextImplTest.java
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/RaftActorServerConfigurationSupportTest.java
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/RaftActorTest.java
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/CandidateTest.java
opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/NonPersistentDataProvider.java
opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/common/actor/AbstractUntypedActor.java
opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/common/actor/AbstractUntypedPersistentActor.java
opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/common/actor/ExecuteInSelfActor.java [new file with mode: 0644]
opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/common/actor/ExecuteInSelfMessage.java [new file with mode: 0644]

index 9441f28fbc2f51a64716a02295a87087a801f68b..fd6eb17384448bbef1b0885fd0b99d7b4997f0d1 100644 (file)
@@ -750,7 +750,7 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
                 captureSnapshot();
             }
         } else if (!persistent && (currentPersistence == null || currentPersistence.isRecoveryApplicable())) {
-            setPersistence(new NonPersistentDataProvider() {
+            setPersistence(new NonPersistentDataProvider(this) {
                 /**
                  * The way snapshotting works is,
                  * <ol>
index f4f2b9f6a8d8e31bbf8719ab4e33683fb178cda2..2e05a7e5708b49381082353518b81cd299521f4f 100644 (file)
@@ -21,6 +21,7 @@ import java.util.HashMap;
 import java.util.Map;
 import java.util.Optional;
 import java.util.function.Consumer;
+import org.opendaylight.controller.cluster.DataPersistenceProvider;
 import org.opendaylight.controller.cluster.NonPersistentDataProvider;
 import org.opendaylight.controller.cluster.raft.behaviors.RaftActorBehavior;
 import org.opendaylight.controller.cluster.raft.persisted.ByteState;
@@ -67,16 +68,19 @@ public class MockRaftActorContext extends RaftActorContextImpl {
         };
     }
 
+    private static DataPersistenceProvider createProvider() {
+        return new NonPersistentDataProvider(Runnable::run);
+    }
+
     public MockRaftActorContext() {
         super(null, null, "test", newElectionTerm(), -1, -1, new HashMap<>(),
-                new DefaultConfigParamsImpl(), new NonPersistentDataProvider(), applyState -> { }, LOG);
+                new DefaultConfigParamsImpl(), createProvider(), applyState -> { }, LOG);
         setReplicatedLog(new MockReplicatedLogBuilder().build());
     }
 
     public MockRaftActorContext(final String id, final ActorSystem system, final ActorRef actor) {
         super(actor, null, id, newElectionTerm(), -1, -1, new HashMap<>(),
-            new DefaultConfigParamsImpl(), new NonPersistentDataProvider(),
-            applyState -> actor.tell(applyState, actor), LOG);
+            new DefaultConfigParamsImpl(), createProvider(), applyState -> actor.tell(applyState, actor), LOG);
 
         this.system = system;
 
index 8945de6bafcb16651768335f79bce74b50c180c9..2dd1c211af37d42e2c251d81db5cc329e1d9b153 100644 (file)
@@ -26,6 +26,7 @@ import java.util.HashMap;
 import java.util.Map;
 import org.junit.After;
 import org.junit.Test;
+import org.opendaylight.controller.cluster.DataPersistenceProvider;
 import org.opendaylight.controller.cluster.NonPersistentDataProvider;
 import org.opendaylight.controller.cluster.raft.persisted.ServerConfigurationPayload;
 import org.opendaylight.controller.cluster.raft.persisted.ServerInfo;
@@ -58,8 +59,8 @@ public class RaftActorContextImplTest extends AbstractActorTest {
         peerMap.put("peer2", null);
         DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
         RaftActorContextImpl context = new RaftActorContextImpl(actor, actor.underlyingActor().getContext(),
-                "test", new ElectionTermImpl(new NonPersistentDataProvider(), "test", LOG), -1, -1,
-                peerMap, configParams, new NonPersistentDataProvider(), applyState -> { }, LOG);
+                "test", new ElectionTermImpl(createProvider(), "test", LOG), -1, -1,
+                peerMap, configParams, createProvider(), applyState -> { }, LOG);
 
         assertEquals("getPeerAddress", "peerAddress1", context.getPeerAddress("peer1"));
         assertEquals("getPeerAddress", null, context.getPeerAddress("peer2"));
@@ -82,9 +83,9 @@ public class RaftActorContextImplTest extends AbstractActorTest {
     public void testSetPeerAddress() {
         DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
         RaftActorContextImpl context = new RaftActorContextImpl(actor, actor.underlyingActor().getContext(),
-                "test", new ElectionTermImpl(new NonPersistentDataProvider(), "test", LOG), -1, -1,
+                "test", new ElectionTermImpl(createProvider(), "test", LOG), -1, -1,
                 Maps.newHashMap(ImmutableMap.<String, String>of("peer1", "peerAddress1")), configParams,
-                new NonPersistentDataProvider(), applyState -> { }, LOG);
+                createProvider(), applyState -> { }, LOG);
 
         context.setPeerAddress("peer1", "peerAddress1_1");
         assertEquals("getPeerAddress", "peerAddress1_1", context.getPeerAddress("peer1"));
@@ -96,9 +97,9 @@ public class RaftActorContextImplTest extends AbstractActorTest {
     @Test
     public void testUpdatePeerIds() {
         RaftActorContextImpl context = new RaftActorContextImpl(actor, actor.underlyingActor().getContext(),
-                "self", new ElectionTermImpl(new NonPersistentDataProvider(), "test", LOG), -1, -1,
+                "self", new ElectionTermImpl(createProvider(), "test", LOG), -1, -1,
                 Maps.newHashMap(ImmutableMap.<String, String>of("peer1", "peerAddress1")),
-                new DefaultConfigParamsImpl(), new NonPersistentDataProvider(), applyState -> { }, LOG);
+                new DefaultConfigParamsImpl(), createProvider(), applyState -> { }, LOG);
 
         context.updatePeerIds(new ServerConfigurationPayload(Arrays.asList(new ServerInfo("self", false),
                 new ServerInfo("peer2", true), new ServerInfo("peer3", false))));
@@ -120,7 +121,11 @@ public class RaftActorContextImplTest extends AbstractActorTest {
         assertEquals("isVotingMember", false, context.isVotingMember());
     }
 
-    private static void verifyPeerInfo(RaftActorContextImpl context, String peerId, Boolean voting) {
+    private static DataPersistenceProvider createProvider() {
+        return new NonPersistentDataProvider(Runnable::run);
+    }
+
+    private static void verifyPeerInfo(final RaftActorContextImpl context, final String peerId, final Boolean voting) {
         PeerInfo peerInfo = context.getPeerInfo(peerId);
         if (voting != null) {
             assertNotNull("Expected peer " + peerId, peerInfo);
index 929ed6053bc3e7d49ea8e97f58d2db6c3d9d9abc..c22a0e567e592dabbf7461ac8b9fe3af0992578f 100644 (file)
@@ -1494,7 +1494,7 @@ public class RaftActorServerConfigurationSupportTest extends AbstractActorTest {
         DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
         configParams.setHeartBeatInterval(new FiniteDuration(100, TimeUnit.MILLISECONDS));
         configParams.setElectionTimeoutFactor(100000);
-        NonPersistentDataProvider noPersistence = new NonPersistentDataProvider();
+        NonPersistentDataProvider noPersistence = new NonPersistentDataProvider(Runnable::run);
         ElectionTermImpl termInfo = new ElectionTermImpl(noPersistence, id, LOG);
         termInfo.update(1, LEADER_ID);
         return new RaftActorContextImpl(actor, actor.underlyingActor().getContext(),
index db6fe1a687c64865c6b31bbee1fa2979e81478a8..a810fe0a42524091d22bd95f06adb86336556a40 100644 (file)
@@ -222,7 +222,7 @@ public class RaftActorTest extends AbstractActorTest {
 
         TestActorRef<MockRaftActor> ref = factory.createTestActor(MockRaftActor.props(persistenceId,
                 ImmutableMap.<String, String>builder().put("member1", "address").build(),
-                config, new NonPersistentDataProvider()), persistenceId);
+                config, createProvider()), persistenceId);
 
         MockRaftActor mockRaftActor = ref.underlyingActor();
 
@@ -245,7 +245,7 @@ public class RaftActorTest extends AbstractActorTest {
 
         TestActorRef<MockRaftActor> ref = factory.createTestActor(MockRaftActor.props(persistenceId,
                 ImmutableMap.<String, String>builder().put("member1", "address").build(),
-                config, new NonPersistentDataProvider())
+                config, createProvider())
                 .withDispatcher(Dispatchers.DefaultDispatcherId()), persistenceId);
 
         InMemoryJournal.waitForWriteMessagesComplete(persistenceId);
@@ -258,7 +258,7 @@ public class RaftActorTest extends AbstractActorTest {
         config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
         ref = factory.createTestActor(MockRaftActor.props(persistenceId,
                 ImmutableMap.<String, String>builder().put("member1", "address").build(), config,
-                new NonPersistentDataProvider()).withDispatcher(Dispatchers.DefaultDispatcherId()),
+                createProvider()).withDispatcher(Dispatchers.DefaultDispatcherId()),
                 factory.generateActorId("follower-"));
 
         MockRaftActor actor = ref.underlyingActor();
@@ -433,7 +433,7 @@ public class RaftActorTest extends AbstractActorTest {
 
         final TestActorRef<MockRaftActor> raftActorRef = factory.createTestActor(MockRaftActor.builder()
                 .id(persistenceId).config(config).roleChangeNotifier(notifierActor).dataPersistenceProvider(
-                        new NonPersistentDataProvider()).props().withDispatcher(Dispatchers.DefaultDispatcherId()),
+                    createProvider()).props().withDispatcher(Dispatchers.DefaultDispatcherId()),
                 persistenceId);
 
         List<RoleChanged> matches =  MessageCollectorActor.expectMatching(notifierActor, RoleChanged.class, 3);
@@ -470,7 +470,7 @@ public class RaftActorTest extends AbstractActorTest {
         final short newLeaderVersion = 6;
         Follower follower = new Follower(raftActor.getRaftActorContext()) {
             @Override
-            public RaftActorBehavior handleMessage(ActorRef sender, Object message) {
+            public RaftActorBehavior handleMessage(final ActorRef sender, final Object message) {
                 setLeaderId(newLeaderId);
                 setLeaderPayloadVersion(newLeaderVersion);
                 return this;
@@ -820,7 +820,7 @@ public class RaftActorTest extends AbstractActorTest {
         config.setIsolatedLeaderCheckInterval(new FiniteDuration(1, TimeUnit.DAYS));
         config.setSnapshotBatchCount(5);
 
-        DataPersistenceProvider dataPersistenceProvider = new NonPersistentDataProvider();
+        DataPersistenceProvider dataPersistenceProvider = createProvider();
 
         Map<String, String> peerAddresses = ImmutableMap.<String, String>builder().put("member1", "address").build();
 
@@ -864,7 +864,7 @@ public class RaftActorTest extends AbstractActorTest {
         config.setIsolatedLeaderCheckInterval(new FiniteDuration(1, TimeUnit.DAYS));
         config.setSnapshotBatchCount(5);
 
-        DataPersistenceProvider dataPersistenceProvider = new NonPersistentDataProvider();
+        DataPersistenceProvider dataPersistenceProvider = createProvider();
 
         Map<String, String> peerAddresses = ImmutableMap.<String, String>builder().put("member1", "address").build();
 
@@ -897,6 +897,10 @@ public class RaftActorTest extends AbstractActorTest {
         assertEquals(3, leader.getReplicatedToAllIndex());
     }
 
+    private static DataPersistenceProvider createProvider() {
+        return new NonPersistentDataProvider(Runnable::run);
+    }
+
     @Test
     public void testSwitchBehavior() {
         String persistenceId = factory.generateActorId("leader-");
@@ -906,7 +910,7 @@ public class RaftActorTest extends AbstractActorTest {
         config.setIsolatedLeaderCheckInterval(new FiniteDuration(1, TimeUnit.DAYS));
         config.setSnapshotBatchCount(5);
 
-        DataPersistenceProvider dataPersistenceProvider = new NonPersistentDataProvider();
+        DataPersistenceProvider dataPersistenceProvider = createProvider();
 
         Map<String, String> peerAddresses = ImmutableMap.<String, String>builder().build();
 
@@ -938,7 +942,7 @@ public class RaftActorTest extends AbstractActorTest {
         assertEquals(RaftState.Leader, leaderActor.getCurrentBehavior().state());
     }
 
-    public static ByteString fromObject(Object snapshot) throws Exception {
+    public static ByteString fromObject(final Object snapshot) throws Exception {
         ByteArrayOutputStream bos = null;
         ObjectOutputStream os = null;
         try {
index 87fae0fe240305fe2646eaae863860fe6c1a92c6..e39a6744267f5ad3f5155f8edbd9c0d885ed6de8 100644 (file)
@@ -177,7 +177,7 @@ public class CandidateTest extends AbstractRaftActorBehaviorTest<Candidate> {
         Mockito.doReturn(1L).when(mockElectionTerm).getCurrentTerm();
         RaftActorContext raftActorContext = new RaftActorContextImpl(candidateActor, candidateActor.actorContext(),
                 "candidate", mockElectionTerm, -1, -1, setupPeers(4), new DefaultConfigParamsImpl(),
-                new NonPersistentDataProvider(), applyState -> { }, LOG);
+                new NonPersistentDataProvider(Runnable::run), applyState -> { }, LOG);
         raftActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().build());
         raftActorContext.getPeerInfo("peer1").setVotingState(VotingState.NON_VOTING);
         raftActorContext.getPeerInfo("peer4").setVotingState(VotingState.NON_VOTING);
index d4381f7cd09119b8bf1128c5ddc76d5a458f4573..9a4a34cf596d00dbf1d1eca8dd1973b88f3d236d 100644 (file)
@@ -7,8 +7,11 @@
  */
 package org.opendaylight.controller.cluster;
 
+import static java.util.Objects.requireNonNull;
+
 import akka.japi.Procedure;
 import akka.persistence.SnapshotSelectionCriteria;
+import org.opendaylight.controller.cluster.common.actor.ExecuteInSelfActor;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -18,38 +21,39 @@ import org.slf4j.LoggerFactory;
 public class NonPersistentDataProvider implements DataPersistenceProvider {
     private static final Logger LOG = LoggerFactory.getLogger(NonPersistentDataProvider.class);
 
+    private final ExecuteInSelfActor actor;
+
+    public NonPersistentDataProvider(final ExecuteInSelfActor actor) {
+        this.actor = requireNonNull(actor);
+    }
+
     @Override
     public boolean isRecoveryApplicable() {
         return false;
     }
 
     @Override
-    @SuppressWarnings("checkstyle:IllegalCatch")
-    public <T> void persist(T entry, Procedure<T> procedure) {
-        try {
-            procedure.apply(entry);
-        } catch (Exception e) {
-            LOG.error("An unexpected error occurred", e);
-        }
+    public <T> void persist(final T entry, final Procedure<T> procedure) {
+        invokeProcedure(procedure, entry);
     }
 
     @Override
-    public <T> void persistAsync(T entry, Procedure<T> procedure) {
-        persist(entry, procedure);
+    public <T> void persistAsync(final T entry, final Procedure<T> procedure) {
+        actor.executeInSelf(() -> invokeProcedure(procedure, entry));
     }
 
     @Override
-    public void saveSnapshot(Object snapshot) {
+    public void saveSnapshot(final Object snapshot) {
         // no-op
     }
 
     @Override
-    public void deleteSnapshots(SnapshotSelectionCriteria criteria) {
+    public void deleteSnapshots(final SnapshotSelectionCriteria criteria) {
         // no-op
     }
 
     @Override
-    public void deleteMessages(long sequenceNumber) {
+    public void deleteMessages(final long sequenceNumber) {
         // no-op
     }
 
@@ -57,4 +61,13 @@ public class NonPersistentDataProvider implements DataPersistenceProvider {
     public long getLastSequenceNumber() {
         return -1;
     }
+
+    @SuppressWarnings("checkstyle:IllegalCatch")
+    static <T> void invokeProcedure(final Procedure<T> procedure, final T argument) {
+        try {
+            procedure.apply(argument);
+        } catch (Exception e) {
+            LOG.error("An unexpected error occurred", e);
+        }
+    }
 }
index 3a4419610a155d4cc6c853709cf0b8e1296ac544..d40ec55b94ac96caa4dcd0a7f01de091e3685f74 100644 (file)
@@ -10,10 +10,11 @@ package org.opendaylight.controller.cluster.common.actor;
 
 import akka.actor.ActorRef;
 import akka.actor.UntypedActor;
+import org.eclipse.jdt.annotation.NonNull;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-public abstract class AbstractUntypedActor extends UntypedActor {
+public abstract class AbstractUntypedActor extends UntypedActor implements ExecuteInSelfActor {
     // The member name should be lower case but it's referenced in many subclasses. Suppressing the CS warning for now.
     @SuppressWarnings("checkstyle:MemberName")
     protected final Logger LOG = LoggerFactory.getLogger(getClass());
@@ -24,8 +25,18 @@ public abstract class AbstractUntypedActor extends UntypedActor {
     }
 
     @Override
-    public final void onReceive(Object message) throws Exception {
-        handleReceive(message);
+    public final void executeInSelf(@NonNull final Runnable runnable) {
+        final ExecuteInSelfMessage message = new ExecuteInSelfMessage(runnable);
+        self().tell(message, ActorRef.noSender());
+    }
+
+    @Override
+    public final void onReceive(final Object message) throws Exception {
+        if (message instanceof ExecuteInSelfMessage) {
+            ((ExecuteInSelfMessage) message).run();
+        } else {
+            handleReceive(message);
+        }
     }
 
     /**
@@ -37,16 +48,16 @@ public abstract class AbstractUntypedActor extends UntypedActor {
      */
     protected abstract void handleReceive(Object message) throws Exception;
 
-    protected final void ignoreMessage(Object message) {
+    protected final void ignoreMessage(final Object message) {
         LOG.debug("Ignoring unhandled message {}", message);
     }
 
-    protected final void unknownMessage(Object message) {
+    protected final void unknownMessage(final Object message) {
         LOG.debug("Received unhandled message {}", message);
         unhandled(message);
     }
 
-    protected boolean isValidSender(ActorRef sender) {
+    protected boolean isValidSender(final ActorRef sender) {
         // If the caller passes in a null sender (ActorRef.noSender()), akka translates that to the
         // deadLetters actor.
         return sender != null && !getContext().system().deadLetters().equals(sender);
index c50ec64e841d288bcb8fafd4f0c10745042db314..7058e77d87b235da2de3dc77f913fb4a486121fa 100644 (file)
@@ -8,11 +8,13 @@
 
 package org.opendaylight.controller.cluster.common.actor;
 
+import akka.actor.ActorRef;
 import akka.persistence.UntypedPersistentActor;
+import org.eclipse.jdt.annotation.NonNull;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-public abstract class AbstractUntypedPersistentActor extends UntypedPersistentActor {
+public abstract class AbstractUntypedPersistentActor extends UntypedPersistentActor implements ExecuteInSelfActor {
 
     // The member name should be lower case but it's referenced in many subclasses. Suppressing the CS warning for now.
     @SuppressWarnings("checkstyle:MemberName")
@@ -24,17 +26,29 @@ public abstract class AbstractUntypedPersistentActor extends UntypedPersistentAc
     }
 
     @Override
-    public final void onReceiveCommand(Object message) throws Exception {
+    public final void executeInSelf(@NonNull final Runnable runnable) {
+        final ExecuteInSelfMessage message = new ExecuteInSelfMessage(runnable);
+        LOG.trace("Scheduling execution of {}", message);
+        self().tell(message, ActorRef.noSender());
+    }
+
+    @Override
+    public final void onReceiveCommand(final Object message) throws Exception {
         final String messageType = message.getClass().getSimpleName();
         LOG.trace("Received message {}", messageType);
 
-        handleCommand(message);
+        if (message instanceof ExecuteInSelfMessage) {
+            LOG.trace("Executing {}", message);
+            ((ExecuteInSelfMessage) message).run();
+        } else {
+            handleCommand(message);
+        }
 
         LOG.trace("Done handling message {}", messageType);
     }
 
     @Override
-    public final void onReceiveRecover(Object message) throws Exception {
+    public final void onReceiveRecover(final Object message) throws Exception {
         final String messageType = message.getClass().getSimpleName();
         LOG.trace("Received message {}", messageType);
         handleRecover(message);
@@ -45,11 +59,11 @@ public abstract class AbstractUntypedPersistentActor extends UntypedPersistentAc
 
     protected abstract void handleCommand(Object message) throws Exception;
 
-    protected void ignoreMessage(Object message) {
+    protected void ignoreMessage(final Object message) {
         LOG.debug("Unhandled message {} ", message);
     }
 
-    protected void unknownMessage(Object message) throws Exception {
+    protected void unknownMessage(final Object message) throws Exception {
         LOG.debug("Received unhandled message {}", message);
         unhandled(message);
     }
diff --git a/opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/common/actor/ExecuteInSelfActor.java b/opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/common/actor/ExecuteInSelfActor.java
new file mode 100644 (file)
index 0000000..fe0bbed
--- /dev/null
@@ -0,0 +1,27 @@
+/*
+ * Copyright (c) 2017 Pantheon Technologies, s.r.o. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.common.actor;
+
+import akka.japi.Procedure;
+import com.google.common.annotations.Beta;
+import org.eclipse.jdt.annotation.NonNull;
+
+/**
+ * Interface implemented by Actors, who can schedule invocation of a {@link Procedure} in their context.
+ *
+ * @author Robert Varga
+ */
+@Beta
+public interface ExecuteInSelfActor {
+    /**
+     * Run a Runnable in the context of this actor.
+     *
+     * @param runnable Runnable to run
+     */
+    void executeInSelf(@NonNull Runnable runnable);
+}
diff --git a/opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/common/actor/ExecuteInSelfMessage.java b/opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/common/actor/ExecuteInSelfMessage.java
new file mode 100644 (file)
index 0000000..1b14e48
--- /dev/null
@@ -0,0 +1,30 @@
+/*
+ * Copyright (c) 2017 Pantheon Technologies, s.r.o. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.common.actor;
+
+import static java.util.Objects.requireNonNull;
+
+import akka.dispatch.ControlMessage;
+import org.eclipse.jdt.annotation.NonNull;
+
+/**
+ * Message internal to {@link ExecuteInSelfActor} implementations in this package.
+ *
+ * @author Robert Varga
+ */
+final class ExecuteInSelfMessage implements ControlMessage {
+    private final Runnable runnable;
+
+    ExecuteInSelfMessage(final @NonNull Runnable runnable) {
+        this.runnable = requireNonNull(runnable);
+    }
+
+    void run() {
+        runnable.run();
+    }
+}