BUG-9028: make NonPersistentDataProvider schedule invocation 55/62255/1
authorRobert Varga <robert.varga@pantheon.tech>
Tue, 22 Aug 2017 17:59:21 +0000 (19:59 +0200)
committerRobert Varga <robert.varga@pantheon.tech>
Thu, 24 Aug 2017 08:37:10 +0000 (10:37 +0200)
We need to make NonPersistentDataProvider behave in a fashion
similar to what PersistentDataProvider does for asynchronous
persistence calls, which is schedule execution of the provided
procedure rather than direct execution (which is fair for synchronous
execution).

In order to make that work we introduce ExecuteInSelfActor, which
has an executeInSelf() method, which uses internal mechanics to
schedule the call at a later point.

Change-Id: I116708d98154c8244ea80b4a1a1aa615abc3075d
Signed-off-by: Robert Varga <robert.varga@pantheon.tech>
(cherry picked from commit b66d6180f06097e3501a88aac9fb684336addd58)

opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActor.java
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/MockRaftActorContext.java
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/RaftActorContextImplTest.java
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/RaftActorServerConfigurationSupportTest.java
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/RaftActorTest.java
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/CandidateTest.java
opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/NonPersistentDataProvider.java
opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/common/actor/AbstractUntypedActor.java
opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/common/actor/AbstractUntypedPersistentActor.java
opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/common/actor/ExecuteInSelfActor.java [new file with mode: 0644]
opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/common/actor/ExecuteInSelfMessage.java [new file with mode: 0644]

index 9441f28..fd6eb17 100644 (file)
@@ -750,7 +750,7 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
                 captureSnapshot();
             }
         } else if (!persistent && (currentPersistence == null || currentPersistence.isRecoveryApplicable())) {
-            setPersistence(new NonPersistentDataProvider() {
+            setPersistence(new NonPersistentDataProvider(this) {
                 /**
                  * The way snapshotting works is,
                  * <ol>
index f4f2b9f..2e05a7e 100644 (file)
@@ -21,6 +21,7 @@ import java.util.HashMap;
 import java.util.Map;
 import java.util.Optional;
 import java.util.function.Consumer;
+import org.opendaylight.controller.cluster.DataPersistenceProvider;
 import org.opendaylight.controller.cluster.NonPersistentDataProvider;
 import org.opendaylight.controller.cluster.raft.behaviors.RaftActorBehavior;
 import org.opendaylight.controller.cluster.raft.persisted.ByteState;
@@ -67,16 +68,19 @@ public class MockRaftActorContext extends RaftActorContextImpl {
         };
     }
 
+    private static DataPersistenceProvider createProvider() {
+        return new NonPersistentDataProvider(Runnable::run);
+    }
+
     public MockRaftActorContext() {
         super(null, null, "test", newElectionTerm(), -1, -1, new HashMap<>(),
-                new DefaultConfigParamsImpl(), new NonPersistentDataProvider(), applyState -> { }, LOG);
+                new DefaultConfigParamsImpl(), createProvider(), applyState -> { }, LOG);
         setReplicatedLog(new MockReplicatedLogBuilder().build());
     }
 
     public MockRaftActorContext(final String id, final ActorSystem system, final ActorRef actor) {
         super(actor, null, id, newElectionTerm(), -1, -1, new HashMap<>(),
-            new DefaultConfigParamsImpl(), new NonPersistentDataProvider(),
-            applyState -> actor.tell(applyState, actor), LOG);
+            new DefaultConfigParamsImpl(), createProvider(), applyState -> actor.tell(applyState, actor), LOG);
 
         this.system = system;
 
index 8945de6..2dd1c21 100644 (file)
@@ -26,6 +26,7 @@ import java.util.HashMap;
 import java.util.Map;
 import org.junit.After;
 import org.junit.Test;
+import org.opendaylight.controller.cluster.DataPersistenceProvider;
 import org.opendaylight.controller.cluster.NonPersistentDataProvider;
 import org.opendaylight.controller.cluster.raft.persisted.ServerConfigurationPayload;
 import org.opendaylight.controller.cluster.raft.persisted.ServerInfo;
@@ -58,8 +59,8 @@ public class RaftActorContextImplTest extends AbstractActorTest {
         peerMap.put("peer2", null);
         DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
         RaftActorContextImpl context = new RaftActorContextImpl(actor, actor.underlyingActor().getContext(),
-                "test", new ElectionTermImpl(new NonPersistentDataProvider(), "test", LOG), -1, -1,
-                peerMap, configParams, new NonPersistentDataProvider(), applyState -> { }, LOG);
+                "test", new ElectionTermImpl(createProvider(), "test", LOG), -1, -1,
+                peerMap, configParams, createProvider(), applyState -> { }, LOG);
 
         assertEquals("getPeerAddress", "peerAddress1", context.getPeerAddress("peer1"));
         assertEquals("getPeerAddress", null, context.getPeerAddress("peer2"));
@@ -82,9 +83,9 @@ public class RaftActorContextImplTest extends AbstractActorTest {
     public void testSetPeerAddress() {
         DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
         RaftActorContextImpl context = new RaftActorContextImpl(actor, actor.underlyingActor().getContext(),
-                "test", new ElectionTermImpl(new NonPersistentDataProvider(), "test", LOG), -1, -1,
+                "test", new ElectionTermImpl(createProvider(), "test", LOG), -1, -1,
                 Maps.newHashMap(ImmutableMap.<String, String>of("peer1", "peerAddress1")), configParams,
-                new NonPersistentDataProvider(), applyState -> { }, LOG);
+                createProvider(), applyState -> { }, LOG);
 
         context.setPeerAddress("peer1", "peerAddress1_1");
         assertEquals("getPeerAddress", "peerAddress1_1", context.getPeerAddress("peer1"));
@@ -96,9 +97,9 @@ public class RaftActorContextImplTest extends AbstractActorTest {
     @Test
     public void testUpdatePeerIds() {
         RaftActorContextImpl context = new RaftActorContextImpl(actor, actor.underlyingActor().getContext(),
-                "self", new ElectionTermImpl(new NonPersistentDataProvider(), "test", LOG), -1, -1,
+                "self", new ElectionTermImpl(createProvider(), "test", LOG), -1, -1,
                 Maps.newHashMap(ImmutableMap.<String, String>of("peer1", "peerAddress1")),
-                new DefaultConfigParamsImpl(), new NonPersistentDataProvider(), applyState -> { }, LOG);
+                new DefaultConfigParamsImpl(), createProvider(), applyState -> { }, LOG);
 
         context.updatePeerIds(new ServerConfigurationPayload(Arrays.asList(new ServerInfo("self", false),
                 new ServerInfo("peer2", true), new ServerInfo("peer3", false))));
@@ -120,7 +121,11 @@ public class RaftActorContextImplTest extends AbstractActorTest {
         assertEquals("isVotingMember", false, context.isVotingMember());
     }
 
-    private static void verifyPeerInfo(RaftActorContextImpl context, String peerId, Boolean voting) {
+    private static DataPersistenceProvider createProvider() {
+        return new NonPersistentDataProvider(Runnable::run);
+    }
+
+    private static void verifyPeerInfo(final RaftActorContextImpl context, final String peerId, final Boolean voting) {
         PeerInfo peerInfo = context.getPeerInfo(peerId);
         if (voting != null) {
             assertNotNull("Expected peer " + peerId, peerInfo);
index 929ed60..c22a0e5 100644 (file)
@@ -1494,7 +1494,7 @@ public class RaftActorServerConfigurationSupportTest extends AbstractActorTest {
         DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
         configParams.setHeartBeatInterval(new FiniteDuration(100, TimeUnit.MILLISECONDS));
         configParams.setElectionTimeoutFactor(100000);
-        NonPersistentDataProvider noPersistence = new NonPersistentDataProvider();
+        NonPersistentDataProvider noPersistence = new NonPersistentDataProvider(Runnable::run);
         ElectionTermImpl termInfo = new ElectionTermImpl(noPersistence, id, LOG);
         termInfo.update(1, LEADER_ID);
         return new RaftActorContextImpl(actor, actor.underlyingActor().getContext(),
index db6fe1a..a810fe0 100644 (file)
@@ -222,7 +222,7 @@ public class RaftActorTest extends AbstractActorTest {
 
         TestActorRef<MockRaftActor> ref = factory.createTestActor(MockRaftActor.props(persistenceId,
                 ImmutableMap.<String, String>builder().put("member1", "address").build(),
-                config, new NonPersistentDataProvider()), persistenceId);
+                config, createProvider()), persistenceId);
 
         MockRaftActor mockRaftActor = ref.underlyingActor();
 
@@ -245,7 +245,7 @@ public class RaftActorTest extends AbstractActorTest {
 
         TestActorRef<MockRaftActor> ref = factory.createTestActor(MockRaftActor.props(persistenceId,
                 ImmutableMap.<String, String>builder().put("member1", "address").build(),
-                config, new NonPersistentDataProvider())
+                config, createProvider())
                 .withDispatcher(Dispatchers.DefaultDispatcherId()), persistenceId);
 
         InMemoryJournal.waitForWriteMessagesComplete(persistenceId);
@@ -258,7 +258,7 @@ public class RaftActorTest extends AbstractActorTest {
         config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
         ref = factory.createTestActor(MockRaftActor.props(persistenceId,
                 ImmutableMap.<String, String>builder().put("member1", "address").build(), config,
-                new NonPersistentDataProvider()).withDispatcher(Dispatchers.DefaultDispatcherId()),
+                createProvider()).withDispatcher(Dispatchers.DefaultDispatcherId()),
                 factory.generateActorId("follower-"));
 
         MockRaftActor actor = ref.underlyingActor();
@@ -433,7 +433,7 @@ public class RaftActorTest extends AbstractActorTest {
 
         final TestActorRef<MockRaftActor> raftActorRef = factory.createTestActor(MockRaftActor.builder()
                 .id(persistenceId).config(config).roleChangeNotifier(notifierActor).dataPersistenceProvider(
-                        new NonPersistentDataProvider()).props().withDispatcher(Dispatchers.DefaultDispatcherId()),
+                    createProvider()).props().withDispatcher(Dispatchers.DefaultDispatcherId()),
                 persistenceId);
 
         List<RoleChanged> matches =  MessageCollectorActor.expectMatching(notifierActor, RoleChanged.class, 3);
@@ -470,7 +470,7 @@ public class RaftActorTest extends AbstractActorTest {
         final short newLeaderVersion = 6;
         Follower follower = new Follower(raftActor.getRaftActorContext()) {
             @Override
-            public RaftActorBehavior handleMessage(ActorRef sender, Object message) {
+            public RaftActorBehavior handleMessage(final ActorRef sender, final Object message) {
                 setLeaderId(newLeaderId);
                 setLeaderPayloadVersion(newLeaderVersion);
                 return this;
@@ -820,7 +820,7 @@ public class RaftActorTest extends AbstractActorTest {
         config.setIsolatedLeaderCheckInterval(new FiniteDuration(1, TimeUnit.DAYS));
         config.setSnapshotBatchCount(5);
 
-        DataPersistenceProvider dataPersistenceProvider = new NonPersistentDataProvider();
+        DataPersistenceProvider dataPersistenceProvider = createProvider();
 
         Map<String, String> peerAddresses = ImmutableMap.<String, String>builder().put("member1", "address").build();
 
@@ -864,7 +864,7 @@ public class RaftActorTest extends AbstractActorTest {
         config.setIsolatedLeaderCheckInterval(new FiniteDuration(1, TimeUnit.DAYS));
         config.setSnapshotBatchCount(5);
 
-        DataPersistenceProvider dataPersistenceProvider = new NonPersistentDataProvider();
+        DataPersistenceProvider dataPersistenceProvider = createProvider();
 
         Map<String, String> peerAddresses = ImmutableMap.<String, String>builder().put("member1", "address").build();
 
@@ -897,6 +897,10 @@ public class RaftActorTest extends AbstractActorTest {
         assertEquals(3, leader.getReplicatedToAllIndex());
     }
 
+    private static DataPersistenceProvider createProvider() {
+        return new NonPersistentDataProvider(Runnable::run);
+    }
+
     @Test
     public void testSwitchBehavior() {
         String persistenceId = factory.generateActorId("leader-");
@@ -906,7 +910,7 @@ public class RaftActorTest extends AbstractActorTest {
         config.setIsolatedLeaderCheckInterval(new FiniteDuration(1, TimeUnit.DAYS));
         config.setSnapshotBatchCount(5);
 
-        DataPersistenceProvider dataPersistenceProvider = new NonPersistentDataProvider();
+        DataPersistenceProvider dataPersistenceProvider = createProvider();
 
         Map<String, String> peerAddresses = ImmutableMap.<String, String>builder().build();
 
@@ -938,7 +942,7 @@ public class RaftActorTest extends AbstractActorTest {
         assertEquals(RaftState.Leader, leaderActor.getCurrentBehavior().state());
     }
 
-    public static ByteString fromObject(Object snapshot) throws Exception {
+    public static ByteString fromObject(final Object snapshot) throws Exception {
         ByteArrayOutputStream bos = null;
         ObjectOutputStream os = null;
         try {
index 87fae0f..e39a674 100644 (file)
@@ -177,7 +177,7 @@ public class CandidateTest extends AbstractRaftActorBehaviorTest<Candidate> {
         Mockito.doReturn(1L).when(mockElectionTerm).getCurrentTerm();
         RaftActorContext raftActorContext = new RaftActorContextImpl(candidateActor, candidateActor.actorContext(),
                 "candidate", mockElectionTerm, -1, -1, setupPeers(4), new DefaultConfigParamsImpl(),
-                new NonPersistentDataProvider(), applyState -> { }, LOG);
+                new NonPersistentDataProvider(Runnable::run), applyState -> { }, LOG);
         raftActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().build());
         raftActorContext.getPeerInfo("peer1").setVotingState(VotingState.NON_VOTING);
         raftActorContext.getPeerInfo("peer4").setVotingState(VotingState.NON_VOTING);
index d4381f7..9a4a34c 100644 (file)
@@ -7,8 +7,11 @@
  */
 package org.opendaylight.controller.cluster;
 
+import static java.util.Objects.requireNonNull;
+
 import akka.japi.Procedure;
 import akka.persistence.SnapshotSelectionCriteria;
+import org.opendaylight.controller.cluster.common.actor.ExecuteInSelfActor;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -18,38 +21,39 @@ import org.slf4j.LoggerFactory;
 public class NonPersistentDataProvider implements DataPersistenceProvider {
     private static final Logger LOG = LoggerFactory.getLogger(NonPersistentDataProvider.class);
 
+    private final ExecuteInSelfActor actor;
+
+    public NonPersistentDataProvider(final ExecuteInSelfActor actor) {
+        this.actor = requireNonNull(actor);
+    }
+
     @Override
     public boolean isRecoveryApplicable() {
         return false;
     }
 
     @Override
-    @SuppressWarnings("checkstyle:IllegalCatch")
-    public <T> void persist(T entry, Procedure<T> procedure) {
-        try {
-            procedure.apply(entry);
-        } catch (Exception e) {
-            LOG.error("An unexpected error occurred", e);
-        }
+    public <T> void persist(final T entry, final Procedure<T> procedure) {
+        invokeProcedure(procedure, entry);
     }
 
     @Override
-    public <T> void persistAsync(T entry, Procedure<T> procedure) {
-        persist(entry, procedure);
+    public <T> void persistAsync(final T entry, final Procedure<T> procedure) {
+        actor.executeInSelf(() -> invokeProcedure(procedure, entry));
     }
 
     @Override
-    public void saveSnapshot(Object snapshot) {
+    public void saveSnapshot(final Object snapshot) {
         // no-op
     }
 
     @Override
-    public void deleteSnapshots(SnapshotSelectionCriteria criteria) {
+    public void deleteSnapshots(final SnapshotSelectionCriteria criteria) {
         // no-op
     }
 
     @Override
-    public void deleteMessages(long sequenceNumber) {
+    public void deleteMessages(final long sequenceNumber) {
         // no-op
     }
 
@@ -57,4 +61,13 @@ public class NonPersistentDataProvider implements DataPersistenceProvider {
     public long getLastSequenceNumber() {
         return -1;
     }
+
+    @SuppressWarnings("checkstyle:IllegalCatch")
+    static <T> void invokeProcedure(final Procedure<T> procedure, final T argument) {
+        try {
+            procedure.apply(argument);
+        } catch (Exception e) {
+            LOG.error("An unexpected error occurred", e);
+        }
+    }
 }
index 3a44196..d40ec55 100644 (file)
@@ -10,10 +10,11 @@ package org.opendaylight.controller.cluster.common.actor;
 
 import akka.actor.ActorRef;
 import akka.actor.UntypedActor;
+import org.eclipse.jdt.annotation.NonNull;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-public abstract class AbstractUntypedActor extends UntypedActor {
+public abstract class AbstractUntypedActor extends UntypedActor implements ExecuteInSelfActor {
     // The member name should be lower case but it's referenced in many subclasses. Suppressing the CS warning for now.
     @SuppressWarnings("checkstyle:MemberName")
     protected final Logger LOG = LoggerFactory.getLogger(getClass());
@@ -24,8 +25,18 @@ public abstract class AbstractUntypedActor extends UntypedActor {
     }
 
     @Override
-    public final void onReceive(Object message) throws Exception {
-        handleReceive(message);
+    public final void executeInSelf(@NonNull final Runnable runnable) {
+        final ExecuteInSelfMessage message = new ExecuteInSelfMessage(runnable);
+        self().tell(message, ActorRef.noSender());
+    }
+
+    @Override
+    public final void onReceive(final Object message) throws Exception {
+        if (message instanceof ExecuteInSelfMessage) {
+            ((ExecuteInSelfMessage) message).run();
+        } else {
+            handleReceive(message);
+        }
     }
 
     /**
@@ -37,16 +48,16 @@ public abstract class AbstractUntypedActor extends UntypedActor {
      */
     protected abstract void handleReceive(Object message) throws Exception;
 
-    protected final void ignoreMessage(Object message) {
+    protected final void ignoreMessage(final Object message) {
         LOG.debug("Ignoring unhandled message {}", message);
     }
 
-    protected final void unknownMessage(Object message) {
+    protected final void unknownMessage(final Object message) {
         LOG.debug("Received unhandled message {}", message);
         unhandled(message);
     }
 
-    protected boolean isValidSender(ActorRef sender) {
+    protected boolean isValidSender(final ActorRef sender) {
         // If the caller passes in a null sender (ActorRef.noSender()), akka translates that to the
         // deadLetters actor.
         return sender != null && !getContext().system().deadLetters().equals(sender);
index c50ec64..7058e77 100644 (file)
@@ -8,11 +8,13 @@
 
 package org.opendaylight.controller.cluster.common.actor;
 
+import akka.actor.ActorRef;
 import akka.persistence.UntypedPersistentActor;
+import org.eclipse.jdt.annotation.NonNull;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-public abstract class AbstractUntypedPersistentActor extends UntypedPersistentActor {
+public abstract class AbstractUntypedPersistentActor extends UntypedPersistentActor implements ExecuteInSelfActor {
 
     // The member name should be lower case but it's referenced in many subclasses. Suppressing the CS warning for now.
     @SuppressWarnings("checkstyle:MemberName")
@@ -24,17 +26,29 @@ public abstract class AbstractUntypedPersistentActor extends UntypedPersistentAc
     }
 
     @Override
-    public final void onReceiveCommand(Object message) throws Exception {
+    public final void executeInSelf(@NonNull final Runnable runnable) {
+        final ExecuteInSelfMessage message = new ExecuteInSelfMessage(runnable);
+        LOG.trace("Scheduling execution of {}", message);
+        self().tell(message, ActorRef.noSender());
+    }
+
+    @Override
+    public final void onReceiveCommand(final Object message) throws Exception {
         final String messageType = message.getClass().getSimpleName();
         LOG.trace("Received message {}", messageType);
 
-        handleCommand(message);
+        if (message instanceof ExecuteInSelfMessage) {
+            LOG.trace("Executing {}", message);
+            ((ExecuteInSelfMessage) message).run();
+        } else {
+            handleCommand(message);
+        }
 
         LOG.trace("Done handling message {}", messageType);
     }
 
     @Override
-    public final void onReceiveRecover(Object message) throws Exception {
+    public final void onReceiveRecover(final Object message) throws Exception {
         final String messageType = message.getClass().getSimpleName();
         LOG.trace("Received message {}", messageType);
         handleRecover(message);
@@ -45,11 +59,11 @@ public abstract class AbstractUntypedPersistentActor extends UntypedPersistentAc
 
     protected abstract void handleCommand(Object message) throws Exception;
 
-    protected void ignoreMessage(Object message) {
+    protected void ignoreMessage(final Object message) {
         LOG.debug("Unhandled message {} ", message);
     }
 
-    protected void unknownMessage(Object message) throws Exception {
+    protected void unknownMessage(final Object message) throws Exception {
         LOG.debug("Received unhandled message {}", message);
         unhandled(message);
     }
diff --git a/opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/common/actor/ExecuteInSelfActor.java b/opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/common/actor/ExecuteInSelfActor.java
new file mode 100644 (file)
index 0000000..fe0bbed
--- /dev/null
@@ -0,0 +1,27 @@
+/*
+ * Copyright (c) 2017 Pantheon Technologies, s.r.o. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.common.actor;
+
+import akka.japi.Procedure;
+import com.google.common.annotations.Beta;
+import org.eclipse.jdt.annotation.NonNull;
+
+/**
+ * Interface implemented by Actors, who can schedule invocation of a {@link Procedure} in their context.
+ *
+ * @author Robert Varga
+ */
+@Beta
+public interface ExecuteInSelfActor {
+    /**
+     * Run a Runnable in the context of this actor.
+     *
+     * @param runnable Runnable to run
+     */
+    void executeInSelf(@NonNull Runnable runnable);
+}
diff --git a/opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/common/actor/ExecuteInSelfMessage.java b/opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/common/actor/ExecuteInSelfMessage.java
new file mode 100644 (file)
index 0000000..1b14e48
--- /dev/null
@@ -0,0 +1,30 @@
+/*
+ * Copyright (c) 2017 Pantheon Technologies, s.r.o. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.common.actor;
+
+import static java.util.Objects.requireNonNull;
+
+import akka.dispatch.ControlMessage;
+import org.eclipse.jdt.annotation.NonNull;
+
+/**
+ * Message internal to {@link ExecuteInSelfActor} implementations in this package.
+ *
+ * @author Robert Varga
+ */
+final class ExecuteInSelfMessage implements ControlMessage {
+    private final Runnable runnable;
+
+    ExecuteInSelfMessage(final @NonNull Runnable runnable) {
+        this.runnable = requireNonNull(runnable);
+    }
+
+    void run() {
+        runnable.run();
+    }
+}

©2013 OpenDaylight, A Linux Foundation Collaborative Project. All Rights Reserved.
OpenDaylight is a registered trademark of The OpenDaylight Project, Inc.
Linux Foundation and OpenDaylight are registered trademarks of the Linux Foundation.
Linux is a registered trademark of Linus Torvalds.