BUG-9028: make NonPersistentDataProvider schedule invocation 69/62169/3
authorRobert Varga <robert.varga@pantheon.tech>
Tue, 22 Aug 2017 17:59:21 +0000 (19:59 +0200)
committerRobert Varga <robert.varga@pantheon.tech>
Wed, 23 Aug 2017 09:28:06 +0000 (11:28 +0200)
We need to make NonPersistentDataProvider behave in a fashion
similar to what PersistentDataProvider does for asynchronous
persistence calls, which is schedule execution of the provided
procedure rather than direct execution (which is fair for synchronous
execution).

In order to make that work we introduce ExecuteInSelfActor, which
has an executeInSelf() method, which uses internal mechanics to
schedule the call at a later point.

Change-Id: I116708d98154c8244ea80b4a1a1aa615abc3075d
Signed-off-by: Robert Varga <robert.varga@pantheon.tech>
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActor.java
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/MockRaftActorContext.java
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/RaftActorContextImplTest.java
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/RaftActorServerConfigurationSupportTest.java
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/RaftActorTest.java
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/CandidateTest.java
opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/NonPersistentDataProvider.java
opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/common/actor/AbstractUntypedActor.java
opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/common/actor/AbstractUntypedPersistentActor.java
opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/common/actor/ExecuteInSelfActor.java [new file with mode: 0644]
opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/common/actor/ExecuteInSelfMessage.java [new file with mode: 0644]

index fecf223346dcda3a8d2ab01b726506de0fe58b29..bc03b24ac0dca7266e0cc50e9a116f564d632757 100644 (file)
@@ -745,7 +745,7 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
                 captureSnapshot();
             }
         } else if (!persistent && (currentPersistence == null || currentPersistence.isRecoveryApplicable())) {
-            setPersistence(new NonPersistentDataProvider() {
+            setPersistence(new NonPersistentDataProvider(this) {
                 /**
                  * The way snapshotting works is,
                  * <ol>
index 05b1d34a0f8b921cffbd192c0b9bb9cfefc6c494..858c249735d36f37c029eb751d3069f885596a40 100644 (file)
@@ -22,6 +22,7 @@ import java.util.HashMap;
 import java.util.Map;
 import java.util.Optional;
 import java.util.function.Consumer;
+import org.opendaylight.controller.cluster.DataPersistenceProvider;
 import org.opendaylight.controller.cluster.NonPersistentDataProvider;
 import org.opendaylight.controller.cluster.raft.behaviors.RaftActorBehavior;
 import org.opendaylight.controller.cluster.raft.persisted.ByteState;
@@ -55,29 +56,32 @@ public class MockRaftActorContext extends RaftActorContextImpl {
             }
 
             @Override
-            public void update(long newTerm, String newVotedFor) {
+            public void update(final long newTerm, final String newVotedFor) {
                 this.currentTerm = newTerm;
                 this.votedFor = newVotedFor;
 
                 // TODO : Write to some persistent state
             }
 
-            @Override public void updateAndPersist(long newTerm, String newVotedFor) {
+            @Override public void updateAndPersist(final long newTerm, final String newVotedFor) {
                 update(newTerm, newVotedFor);
             }
         };
     }
 
+    private static DataPersistenceProvider createProvider() {
+        return new NonPersistentDataProvider(Runnable::run);
+    }
+
     public MockRaftActorContext() {
         super(null, null, "test", newElectionTerm(), -1, -1, new HashMap<>(),
-                new DefaultConfigParamsImpl(), new NonPersistentDataProvider(), applyState -> { }, LOG);
+                new DefaultConfigParamsImpl(), createProvider(), applyState -> { }, LOG);
         setReplicatedLog(new MockReplicatedLogBuilder().build());
     }
 
-    public MockRaftActorContext(String id, ActorSystem system, ActorRef actor) {
+    public MockRaftActorContext(final String id, final ActorSystem system, final ActorRef actor) {
         super(actor, null, id, newElectionTerm(), -1, -1, new HashMap<>(),
-            new DefaultConfigParamsImpl(), new NonPersistentDataProvider(),
-            applyState -> actor.tell(applyState, actor), LOG);
+            new DefaultConfigParamsImpl(), createProvider(), applyState -> actor.tell(applyState, actor), LOG);
 
         this.system = system;
 
@@ -95,11 +99,11 @@ public class MockRaftActorContext extends RaftActorContextImpl {
         setLastApplied(replicatedLog.lastIndex());
     }
 
-    @Override public ActorRef actorOf(Props props) {
+    @Override public ActorRef actorOf(final Props props) {
         return system.actorOf(props);
     }
 
-    @Override public ActorSelection actorSelection(String path) {
+    @Override public ActorSelection actorSelection(final String path) {
         return system.actorSelection(path);
     }
 
@@ -107,7 +111,7 @@ public class MockRaftActorContext extends RaftActorContextImpl {
         return this.system;
     }
 
-    @Override public ActorSelection getPeerActorSelection(String peerId) {
+    @Override public ActorSelection getPeerActorSelection(final String peerId) {
         String peerAddress = getPeerAddress(peerId);
         if (peerAddress != null) {
             return actorSelection(peerAddress);
@@ -115,7 +119,7 @@ public class MockRaftActorContext extends RaftActorContextImpl {
         return null;
     }
 
-    public void setPeerAddresses(Map<String, String> peerAddresses) {
+    public void setPeerAddresses(final Map<String, String> peerAddresses) {
         for (String id: getPeerIds()) {
             removePeer(id);
         }
@@ -132,23 +136,23 @@ public class MockRaftActorContext extends RaftActorContextImpl {
 
         snapshotManager.setSnapshotCohort(new RaftActorSnapshotCohort() {
             @Override
-            public State deserializeSnapshot(ByteSource snapshotBytes) throws IOException {
+            public State deserializeSnapshot(final ByteSource snapshotBytes) throws IOException {
                 return ByteState.of(snapshotBytes.read());
             }
 
             @Override
-            public void createSnapshot(ActorRef actorRef, java.util.Optional<OutputStream> installSnapshotStream) {
+            public void createSnapshot(final ActorRef actorRef, final Optional<OutputStream> installSnapshotStream) {
             }
 
             @Override
-            public void applySnapshot(State snapshotState) {
+            public void applySnapshot(final State snapshotState) {
             }
         });
 
         return snapshotManager;
     }
 
-    public void setCreateSnapshotProcedure(Consumer<Optional<OutputStream>> createSnapshotProcedure) {
+    public void setCreateSnapshotProcedure(final Consumer<Optional<OutputStream>> createSnapshotProcedure) {
         this.createSnapshotProcedure = createSnapshotProcedure;
     }
 
@@ -157,7 +161,7 @@ public class MockRaftActorContext extends RaftActorContextImpl {
         return raftPolicy != null ? raftPolicy : super.getRaftPolicy();
     }
 
-    public void setRaftPolicy(RaftPolicy raftPolicy) {
+    public void setRaftPolicy(final RaftPolicy raftPolicy) {
         this.raftPolicy = raftPolicy;
     }
 
@@ -168,23 +172,23 @@ public class MockRaftActorContext extends RaftActorContextImpl {
         }
 
         @Override
-        public void captureSnapshotIfReady(ReplicatedLogEntry replicatedLogEntry) {
+        public void captureSnapshotIfReady(final ReplicatedLogEntry replicatedLogEntry) {
         }
 
         @Override
-        public boolean shouldCaptureSnapshot(long logIndex) {
+        public boolean shouldCaptureSnapshot(final long logIndex) {
             return false;
         }
 
         @Override
-        public boolean removeFromAndPersist(long index) {
+        public boolean removeFromAndPersist(final long index) {
             return removeFrom(index) >= 0;
         }
 
         @Override
         @SuppressWarnings("checkstyle:IllegalCatch")
-        public boolean appendAndPersist(ReplicatedLogEntry replicatedLogEntry, Procedure<ReplicatedLogEntry> callback,
-                boolean doAsync) {
+        public boolean appendAndPersist(final ReplicatedLogEntry replicatedLogEntry,
+                final Procedure<ReplicatedLogEntry> callback, final boolean doAsync) {
             append(replicatedLogEntry);
 
             if (callback != null) {
@@ -207,12 +211,12 @@ public class MockRaftActorContext extends RaftActorContextImpl {
         public MockPayload() {
         }
 
-        public MockPayload(String data) {
+        public MockPayload(final String data) {
             this.value = data;
             size = value.length();
         }
 
-        public MockPayload(String data, int size) {
+        public MockPayload(final String data, final int size) {
             this(data);
             this.size = size;
         }
@@ -236,7 +240,7 @@ public class MockRaftActorContext extends RaftActorContextImpl {
         }
 
         @Override
-        public boolean equals(Object obj) {
+        public boolean equals(final Object obj) {
             if (this == obj) {
                 return true;
             }
@@ -261,7 +265,7 @@ public class MockRaftActorContext extends RaftActorContextImpl {
     public static class MockReplicatedLogBuilder {
         private final ReplicatedLog mockLog = new SimpleReplicatedLog();
 
-        public  MockReplicatedLogBuilder createEntries(int start, int end, int term) {
+        public  MockReplicatedLogBuilder createEntries(final int start, final int end, final int term) {
             for (int i = start; i < end; i++) {
                 this.mockLog.append(new SimpleReplicatedLogEntry(i, term,
                         new MockRaftActorContext.MockPayload(Integer.toString(i))));
@@ -269,7 +273,7 @@ public class MockRaftActorContext extends RaftActorContextImpl {
             return this;
         }
 
-        public  MockReplicatedLogBuilder addEntry(int index, int term, MockPayload payload) {
+        public  MockReplicatedLogBuilder addEntry(final int index, final int term, final MockPayload payload) {
             this.mockLog.append(new SimpleReplicatedLogEntry(index, term, payload));
             return this;
         }
index 56523cb60b233a5487da8c6c83173ec63b2a1cad..8db7aa48ee14666d1403a19d602c0ff6e8231305 100644 (file)
@@ -26,6 +26,7 @@ import java.util.HashMap;
 import java.util.Map;
 import org.junit.After;
 import org.junit.Test;
+import org.opendaylight.controller.cluster.DataPersistenceProvider;
 import org.opendaylight.controller.cluster.NonPersistentDataProvider;
 import org.opendaylight.controller.cluster.raft.persisted.ServerConfigurationPayload;
 import org.opendaylight.controller.cluster.raft.persisted.ServerInfo;
@@ -58,8 +59,8 @@ public class RaftActorContextImplTest extends AbstractActorTest {
         peerMap.put("peer2", null);
         DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
         RaftActorContextImpl context = new RaftActorContextImpl(actor, actor.underlyingActor().getContext(),
-                "test", new ElectionTermImpl(new NonPersistentDataProvider(), "test", log), -1, -1,
-                peerMap, configParams, new NonPersistentDataProvider(), applyState -> { }, log);
+                "test", new ElectionTermImpl(createProvider(), "test", log), -1, -1,
+                peerMap, configParams, createProvider(), applyState -> { }, log);
 
         assertEquals("getPeerAddress", "peerAddress1", context.getPeerAddress("peer1"));
         assertEquals("getPeerAddress", null, context.getPeerAddress("peer2"));
@@ -82,9 +83,9 @@ public class RaftActorContextImplTest extends AbstractActorTest {
     public void testSetPeerAddress() {
         DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
         RaftActorContextImpl context = new RaftActorContextImpl(actor, actor.underlyingActor().getContext(),
-                "test", new ElectionTermImpl(new NonPersistentDataProvider(), "test", log), -1, -1,
+                "test", new ElectionTermImpl(createProvider(), "test", log), -1, -1,
                 Maps.newHashMap(ImmutableMap.<String, String>of("peer1", "peerAddress1")), configParams,
-                new NonPersistentDataProvider(), applyState -> { }, log);
+                createProvider(), applyState -> { }, log);
 
         context.setPeerAddress("peer1", "peerAddress1_1");
         assertEquals("getPeerAddress", "peerAddress1_1", context.getPeerAddress("peer1"));
@@ -96,9 +97,9 @@ public class RaftActorContextImplTest extends AbstractActorTest {
     @Test
     public void testUpdatePeerIds() {
         RaftActorContextImpl context = new RaftActorContextImpl(actor, actor.underlyingActor().getContext(),
-                "self", new ElectionTermImpl(new NonPersistentDataProvider(), "test", log), -1, -1,
+                "self", new ElectionTermImpl(createProvider(), "test", log), -1, -1,
                 Maps.newHashMap(ImmutableMap.<String, String>of("peer1", "peerAddress1")),
-                new DefaultConfigParamsImpl(), new NonPersistentDataProvider(), applyState -> { }, log);
+                new DefaultConfigParamsImpl(), createProvider(), applyState -> { }, log);
 
         context.updatePeerIds(new ServerConfigurationPayload(Arrays.asList(new ServerInfo("self", false),
                 new ServerInfo("peer2", true), new ServerInfo("peer3", false))));
@@ -120,7 +121,11 @@ public class RaftActorContextImplTest extends AbstractActorTest {
         assertEquals("isVotingMember", false, context.isVotingMember());
     }
 
-    private static void verifyPeerInfo(RaftActorContextImpl context, String peerId, Boolean voting) {
+    private static DataPersistenceProvider createProvider() {
+        return new NonPersistentDataProvider(Runnable::run);
+    }
+
+    private static void verifyPeerInfo(final RaftActorContextImpl context, final String peerId, final Boolean voting) {
         PeerInfo peerInfo = context.getPeerInfo(peerId);
         if (voting != null) {
             assertNotNull("Expected peer " + peerId, peerInfo);
index 929ed6053bc3e7d49ea8e97f58d2db6c3d9d9abc..c22a0e567e592dabbf7461ac8b9fe3af0992578f 100644 (file)
@@ -1494,7 +1494,7 @@ public class RaftActorServerConfigurationSupportTest extends AbstractActorTest {
         DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
         configParams.setHeartBeatInterval(new FiniteDuration(100, TimeUnit.MILLISECONDS));
         configParams.setElectionTimeoutFactor(100000);
-        NonPersistentDataProvider noPersistence = new NonPersistentDataProvider();
+        NonPersistentDataProvider noPersistence = new NonPersistentDataProvider(Runnable::run);
         ElectionTermImpl termInfo = new ElectionTermImpl(noPersistence, id, LOG);
         termInfo.update(1, LEADER_ID);
         return new RaftActorContextImpl(actor, actor.underlyingActor().getContext(),
index db6fe1a687c64865c6b31bbee1fa2979e81478a8..a810fe0a42524091d22bd95f06adb86336556a40 100644 (file)
@@ -222,7 +222,7 @@ public class RaftActorTest extends AbstractActorTest {
 
         TestActorRef<MockRaftActor> ref = factory.createTestActor(MockRaftActor.props(persistenceId,
                 ImmutableMap.<String, String>builder().put("member1", "address").build(),
-                config, new NonPersistentDataProvider()), persistenceId);
+                config, createProvider()), persistenceId);
 
         MockRaftActor mockRaftActor = ref.underlyingActor();
 
@@ -245,7 +245,7 @@ public class RaftActorTest extends AbstractActorTest {
 
         TestActorRef<MockRaftActor> ref = factory.createTestActor(MockRaftActor.props(persistenceId,
                 ImmutableMap.<String, String>builder().put("member1", "address").build(),
-                config, new NonPersistentDataProvider())
+                config, createProvider())
                 .withDispatcher(Dispatchers.DefaultDispatcherId()), persistenceId);
 
         InMemoryJournal.waitForWriteMessagesComplete(persistenceId);
@@ -258,7 +258,7 @@ public class RaftActorTest extends AbstractActorTest {
         config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
         ref = factory.createTestActor(MockRaftActor.props(persistenceId,
                 ImmutableMap.<String, String>builder().put("member1", "address").build(), config,
-                new NonPersistentDataProvider()).withDispatcher(Dispatchers.DefaultDispatcherId()),
+                createProvider()).withDispatcher(Dispatchers.DefaultDispatcherId()),
                 factory.generateActorId("follower-"));
 
         MockRaftActor actor = ref.underlyingActor();
@@ -433,7 +433,7 @@ public class RaftActorTest extends AbstractActorTest {
 
         final TestActorRef<MockRaftActor> raftActorRef = factory.createTestActor(MockRaftActor.builder()
                 .id(persistenceId).config(config).roleChangeNotifier(notifierActor).dataPersistenceProvider(
-                        new NonPersistentDataProvider()).props().withDispatcher(Dispatchers.DefaultDispatcherId()),
+                    createProvider()).props().withDispatcher(Dispatchers.DefaultDispatcherId()),
                 persistenceId);
 
         List<RoleChanged> matches =  MessageCollectorActor.expectMatching(notifierActor, RoleChanged.class, 3);
@@ -470,7 +470,7 @@ public class RaftActorTest extends AbstractActorTest {
         final short newLeaderVersion = 6;
         Follower follower = new Follower(raftActor.getRaftActorContext()) {
             @Override
-            public RaftActorBehavior handleMessage(ActorRef sender, Object message) {
+            public RaftActorBehavior handleMessage(final ActorRef sender, final Object message) {
                 setLeaderId(newLeaderId);
                 setLeaderPayloadVersion(newLeaderVersion);
                 return this;
@@ -820,7 +820,7 @@ public class RaftActorTest extends AbstractActorTest {
         config.setIsolatedLeaderCheckInterval(new FiniteDuration(1, TimeUnit.DAYS));
         config.setSnapshotBatchCount(5);
 
-        DataPersistenceProvider dataPersistenceProvider = new NonPersistentDataProvider();
+        DataPersistenceProvider dataPersistenceProvider = createProvider();
 
         Map<String, String> peerAddresses = ImmutableMap.<String, String>builder().put("member1", "address").build();
 
@@ -864,7 +864,7 @@ public class RaftActorTest extends AbstractActorTest {
         config.setIsolatedLeaderCheckInterval(new FiniteDuration(1, TimeUnit.DAYS));
         config.setSnapshotBatchCount(5);
 
-        DataPersistenceProvider dataPersistenceProvider = new NonPersistentDataProvider();
+        DataPersistenceProvider dataPersistenceProvider = createProvider();
 
         Map<String, String> peerAddresses = ImmutableMap.<String, String>builder().put("member1", "address").build();
 
@@ -897,6 +897,10 @@ public class RaftActorTest extends AbstractActorTest {
         assertEquals(3, leader.getReplicatedToAllIndex());
     }
 
+    private static DataPersistenceProvider createProvider() {
+        return new NonPersistentDataProvider(Runnable::run);
+    }
+
     @Test
     public void testSwitchBehavior() {
         String persistenceId = factory.generateActorId("leader-");
@@ -906,7 +910,7 @@ public class RaftActorTest extends AbstractActorTest {
         config.setIsolatedLeaderCheckInterval(new FiniteDuration(1, TimeUnit.DAYS));
         config.setSnapshotBatchCount(5);
 
-        DataPersistenceProvider dataPersistenceProvider = new NonPersistentDataProvider();
+        DataPersistenceProvider dataPersistenceProvider = createProvider();
 
         Map<String, String> peerAddresses = ImmutableMap.<String, String>builder().build();
 
@@ -938,7 +942,7 @@ public class RaftActorTest extends AbstractActorTest {
         assertEquals(RaftState.Leader, leaderActor.getCurrentBehavior().state());
     }
 
-    public static ByteString fromObject(Object snapshot) throws Exception {
+    public static ByteString fromObject(final Object snapshot) throws Exception {
         ByteArrayOutputStream bos = null;
         ObjectOutputStream os = null;
         try {
index 4124a94d5fb3f6be5c0ed86f0cc21f56cd39d2cc..61c7f631ddf00ddf211d9fff4a7da00d838de507 100644 (file)
@@ -177,7 +177,7 @@ public class CandidateTest extends AbstractRaftActorBehaviorTest<Candidate> {
         Mockito.doReturn(1L).when(mockElectionTerm).getCurrentTerm();
         RaftActorContext raftActorContext = new RaftActorContextImpl(candidateActor, candidateActor.actorContext(),
                 "candidate", mockElectionTerm, -1, -1, setupPeers(4), new DefaultConfigParamsImpl(),
-                new NonPersistentDataProvider(), applyState -> { }, LOG);
+                new NonPersistentDataProvider(Runnable::run), applyState -> { }, LOG);
         raftActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().build());
         raftActorContext.getPeerInfo("peer1").setVotingState(VotingState.NON_VOTING);
         raftActorContext.getPeerInfo("peer4").setVotingState(VotingState.NON_VOTING);
index d4381f7cd09119b8bf1128c5ddc76d5a458f4573..9a4a34cf596d00dbf1d1eca8dd1973b88f3d236d 100644 (file)
@@ -7,8 +7,11 @@
  */
 package org.opendaylight.controller.cluster;
 
+import static java.util.Objects.requireNonNull;
+
 import akka.japi.Procedure;
 import akka.persistence.SnapshotSelectionCriteria;
+import org.opendaylight.controller.cluster.common.actor.ExecuteInSelfActor;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -18,38 +21,39 @@ import org.slf4j.LoggerFactory;
 public class NonPersistentDataProvider implements DataPersistenceProvider {
     private static final Logger LOG = LoggerFactory.getLogger(NonPersistentDataProvider.class);
 
+    private final ExecuteInSelfActor actor;
+
+    public NonPersistentDataProvider(final ExecuteInSelfActor actor) {
+        this.actor = requireNonNull(actor);
+    }
+
     @Override
     public boolean isRecoveryApplicable() {
         return false;
     }
 
     @Override
-    @SuppressWarnings("checkstyle:IllegalCatch")
-    public <T> void persist(T entry, Procedure<T> procedure) {
-        try {
-            procedure.apply(entry);
-        } catch (Exception e) {
-            LOG.error("An unexpected error occurred", e);
-        }
+    public <T> void persist(final T entry, final Procedure<T> procedure) {
+        invokeProcedure(procedure, entry);
     }
 
     @Override
-    public <T> void persistAsync(T entry, Procedure<T> procedure) {
-        persist(entry, procedure);
+    public <T> void persistAsync(final T entry, final Procedure<T> procedure) {
+        actor.executeInSelf(() -> invokeProcedure(procedure, entry));
     }
 
     @Override
-    public void saveSnapshot(Object snapshot) {
+    public void saveSnapshot(final Object snapshot) {
         // no-op
     }
 
     @Override
-    public void deleteSnapshots(SnapshotSelectionCriteria criteria) {
+    public void deleteSnapshots(final SnapshotSelectionCriteria criteria) {
         // no-op
     }
 
     @Override
-    public void deleteMessages(long sequenceNumber) {
+    public void deleteMessages(final long sequenceNumber) {
         // no-op
     }
 
@@ -57,4 +61,13 @@ public class NonPersistentDataProvider implements DataPersistenceProvider {
     public long getLastSequenceNumber() {
         return -1;
     }
+
+    @SuppressWarnings("checkstyle:IllegalCatch")
+    static <T> void invokeProcedure(final Procedure<T> procedure, final T argument) {
+        try {
+            procedure.apply(argument);
+        } catch (Exception e) {
+            LOG.error("An unexpected error occurred", e);
+        }
+    }
 }
index 3a4419610a155d4cc6c853709cf0b8e1296ac544..d40ec55b94ac96caa4dcd0a7f01de091e3685f74 100644 (file)
@@ -10,10 +10,11 @@ package org.opendaylight.controller.cluster.common.actor;
 
 import akka.actor.ActorRef;
 import akka.actor.UntypedActor;
+import org.eclipse.jdt.annotation.NonNull;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-public abstract class AbstractUntypedActor extends UntypedActor {
+public abstract class AbstractUntypedActor extends UntypedActor implements ExecuteInSelfActor {
     // The member name should be lower case but it's referenced in many subclasses. Suppressing the CS warning for now.
     @SuppressWarnings("checkstyle:MemberName")
     protected final Logger LOG = LoggerFactory.getLogger(getClass());
@@ -24,8 +25,18 @@ public abstract class AbstractUntypedActor extends UntypedActor {
     }
 
     @Override
-    public final void onReceive(Object message) throws Exception {
-        handleReceive(message);
+    public final void executeInSelf(@NonNull final Runnable runnable) {
+        final ExecuteInSelfMessage message = new ExecuteInSelfMessage(runnable);
+        self().tell(message, ActorRef.noSender());
+    }
+
+    @Override
+    public final void onReceive(final Object message) throws Exception {
+        if (message instanceof ExecuteInSelfMessage) {
+            ((ExecuteInSelfMessage) message).run();
+        } else {
+            handleReceive(message);
+        }
     }
 
     /**
@@ -37,16 +48,16 @@ public abstract class AbstractUntypedActor extends UntypedActor {
      */
     protected abstract void handleReceive(Object message) throws Exception;
 
-    protected final void ignoreMessage(Object message) {
+    protected final void ignoreMessage(final Object message) {
         LOG.debug("Ignoring unhandled message {}", message);
     }
 
-    protected final void unknownMessage(Object message) {
+    protected final void unknownMessage(final Object message) {
         LOG.debug("Received unhandled message {}", message);
         unhandled(message);
     }
 
-    protected boolean isValidSender(ActorRef sender) {
+    protected boolean isValidSender(final ActorRef sender) {
         // If the caller passes in a null sender (ActorRef.noSender()), akka translates that to the
         // deadLetters actor.
         return sender != null && !getContext().system().deadLetters().equals(sender);
index c50ec64e841d288bcb8fafd4f0c10745042db314..7058e77d87b235da2de3dc77f913fb4a486121fa 100644 (file)
@@ -8,11 +8,13 @@
 
 package org.opendaylight.controller.cluster.common.actor;
 
+import akka.actor.ActorRef;
 import akka.persistence.UntypedPersistentActor;
+import org.eclipse.jdt.annotation.NonNull;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-public abstract class AbstractUntypedPersistentActor extends UntypedPersistentActor {
+public abstract class AbstractUntypedPersistentActor extends UntypedPersistentActor implements ExecuteInSelfActor {
 
     // The member name should be lower case but it's referenced in many subclasses. Suppressing the CS warning for now.
     @SuppressWarnings("checkstyle:MemberName")
@@ -24,17 +26,29 @@ public abstract class AbstractUntypedPersistentActor extends UntypedPersistentAc
     }
 
     @Override
-    public final void onReceiveCommand(Object message) throws Exception {
+    public final void executeInSelf(@NonNull final Runnable runnable) {
+        final ExecuteInSelfMessage message = new ExecuteInSelfMessage(runnable);
+        LOG.trace("Scheduling execution of {}", message);
+        self().tell(message, ActorRef.noSender());
+    }
+
+    @Override
+    public final void onReceiveCommand(final Object message) throws Exception {
         final String messageType = message.getClass().getSimpleName();
         LOG.trace("Received message {}", messageType);
 
-        handleCommand(message);
+        if (message instanceof ExecuteInSelfMessage) {
+            LOG.trace("Executing {}", message);
+            ((ExecuteInSelfMessage) message).run();
+        } else {
+            handleCommand(message);
+        }
 
         LOG.trace("Done handling message {}", messageType);
     }
 
     @Override
-    public final void onReceiveRecover(Object message) throws Exception {
+    public final void onReceiveRecover(final Object message) throws Exception {
         final String messageType = message.getClass().getSimpleName();
         LOG.trace("Received message {}", messageType);
         handleRecover(message);
@@ -45,11 +59,11 @@ public abstract class AbstractUntypedPersistentActor extends UntypedPersistentAc
 
     protected abstract void handleCommand(Object message) throws Exception;
 
-    protected void ignoreMessage(Object message) {
+    protected void ignoreMessage(final Object message) {
         LOG.debug("Unhandled message {} ", message);
     }
 
-    protected void unknownMessage(Object message) throws Exception {
+    protected void unknownMessage(final Object message) throws Exception {
         LOG.debug("Received unhandled message {}", message);
         unhandled(message);
     }
diff --git a/opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/common/actor/ExecuteInSelfActor.java b/opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/common/actor/ExecuteInSelfActor.java
new file mode 100644 (file)
index 0000000..fe0bbed
--- /dev/null
@@ -0,0 +1,27 @@
+/*
+ * Copyright (c) 2017 Pantheon Technologies, s.r.o. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.common.actor;
+
+import akka.japi.Procedure;
+import com.google.common.annotations.Beta;
+import org.eclipse.jdt.annotation.NonNull;
+
+/**
+ * Interface implemented by Actors, who can schedule invocation of a {@link Procedure} in their context.
+ *
+ * @author Robert Varga
+ */
+@Beta
+public interface ExecuteInSelfActor {
+    /**
+     * Run a Runnable in the context of this actor.
+     *
+     * @param runnable Runnable to run
+     */
+    void executeInSelf(@NonNull Runnable runnable);
+}
diff --git a/opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/common/actor/ExecuteInSelfMessage.java b/opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/common/actor/ExecuteInSelfMessage.java
new file mode 100644 (file)
index 0000000..1b14e48
--- /dev/null
@@ -0,0 +1,30 @@
+/*
+ * Copyright (c) 2017 Pantheon Technologies, s.r.o. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.common.actor;
+
+import static java.util.Objects.requireNonNull;
+
+import akka.dispatch.ControlMessage;
+import org.eclipse.jdt.annotation.NonNull;
+
+/**
+ * Message internal to {@link ExecuteInSelfActor} implementations in this package.
+ *
+ * @author Robert Varga
+ */
+final class ExecuteInSelfMessage implements ControlMessage {
+    private final Runnable runnable;
+
+    ExecuteInSelfMessage(final @NonNull Runnable runnable) {
+        this.runnable = requireNonNull(runnable);
+    }
+
+    void run() {
+        runnable.run();
+    }
+}