captureSnapshot();
}
} else if (!persistent && (currentPersistence == null || currentPersistence.isRecoveryApplicable())) {
- setPersistence(new NonPersistentDataProvider() {
+ setPersistence(new NonPersistentDataProvider(this) {
/**
* The way snapshotting works is,
* <ol>
import java.util.Map;
import java.util.Optional;
import java.util.function.Consumer;
+import org.opendaylight.controller.cluster.DataPersistenceProvider;
import org.opendaylight.controller.cluster.NonPersistentDataProvider;
import org.opendaylight.controller.cluster.raft.behaviors.RaftActorBehavior;
import org.opendaylight.controller.cluster.raft.persisted.ByteState;
};
}
+ private static DataPersistenceProvider createProvider() {
+ return new NonPersistentDataProvider(Runnable::run);
+ }
+
public MockRaftActorContext() {
super(null, null, "test", newElectionTerm(), -1, -1, new HashMap<>(),
- new DefaultConfigParamsImpl(), new NonPersistentDataProvider(), applyState -> { }, LOG);
+ new DefaultConfigParamsImpl(), createProvider(), applyState -> { }, LOG);
setReplicatedLog(new MockReplicatedLogBuilder().build());
}
public MockRaftActorContext(final String id, final ActorSystem system, final ActorRef actor) {
super(actor, null, id, newElectionTerm(), -1, -1, new HashMap<>(),
- new DefaultConfigParamsImpl(), new NonPersistentDataProvider(),
- applyState -> actor.tell(applyState, actor), LOG);
+ new DefaultConfigParamsImpl(), createProvider(), applyState -> actor.tell(applyState, actor), LOG);
this.system = system;
import java.util.Map;
import org.junit.After;
import org.junit.Test;
+import org.opendaylight.controller.cluster.DataPersistenceProvider;
import org.opendaylight.controller.cluster.NonPersistentDataProvider;
import org.opendaylight.controller.cluster.raft.persisted.ServerConfigurationPayload;
import org.opendaylight.controller.cluster.raft.persisted.ServerInfo;
peerMap.put("peer2", null);
DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
RaftActorContextImpl context = new RaftActorContextImpl(actor, actor.underlyingActor().getContext(),
- "test", new ElectionTermImpl(new NonPersistentDataProvider(), "test", LOG), -1, -1,
- peerMap, configParams, new NonPersistentDataProvider(), applyState -> { }, LOG);
+ "test", new ElectionTermImpl(createProvider(), "test", LOG), -1, -1,
+ peerMap, configParams, createProvider(), applyState -> { }, LOG);
assertEquals("getPeerAddress", "peerAddress1", context.getPeerAddress("peer1"));
assertEquals("getPeerAddress", null, context.getPeerAddress("peer2"));
public void testSetPeerAddress() {
DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
RaftActorContextImpl context = new RaftActorContextImpl(actor, actor.underlyingActor().getContext(),
- "test", new ElectionTermImpl(new NonPersistentDataProvider(), "test", LOG), -1, -1,
+ "test", new ElectionTermImpl(createProvider(), "test", LOG), -1, -1,
Maps.newHashMap(ImmutableMap.<String, String>of("peer1", "peerAddress1")), configParams,
- new NonPersistentDataProvider(), applyState -> { }, LOG);
+ createProvider(), applyState -> { }, LOG);
context.setPeerAddress("peer1", "peerAddress1_1");
assertEquals("getPeerAddress", "peerAddress1_1", context.getPeerAddress("peer1"));
@Test
public void testUpdatePeerIds() {
RaftActorContextImpl context = new RaftActorContextImpl(actor, actor.underlyingActor().getContext(),
- "self", new ElectionTermImpl(new NonPersistentDataProvider(), "test", LOG), -1, -1,
+ "self", new ElectionTermImpl(createProvider(), "test", LOG), -1, -1,
Maps.newHashMap(ImmutableMap.<String, String>of("peer1", "peerAddress1")),
- new DefaultConfigParamsImpl(), new NonPersistentDataProvider(), applyState -> { }, LOG);
+ new DefaultConfigParamsImpl(), createProvider(), applyState -> { }, LOG);
context.updatePeerIds(new ServerConfigurationPayload(Arrays.asList(new ServerInfo("self", false),
new ServerInfo("peer2", true), new ServerInfo("peer3", false))));
assertEquals("isVotingMember", false, context.isVotingMember());
}
- private static void verifyPeerInfo(RaftActorContextImpl context, String peerId, Boolean voting) {
+ private static DataPersistenceProvider createProvider() {
+ return new NonPersistentDataProvider(Runnable::run);
+ }
+
+ private static void verifyPeerInfo(final RaftActorContextImpl context, final String peerId, final Boolean voting) {
PeerInfo peerInfo = context.getPeerInfo(peerId);
if (voting != null) {
assertNotNull("Expected peer " + peerId, peerInfo);
DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
configParams.setHeartBeatInterval(new FiniteDuration(100, TimeUnit.MILLISECONDS));
configParams.setElectionTimeoutFactor(100000);
- NonPersistentDataProvider noPersistence = new NonPersistentDataProvider();
+ NonPersistentDataProvider noPersistence = new NonPersistentDataProvider(Runnable::run);
ElectionTermImpl termInfo = new ElectionTermImpl(noPersistence, id, LOG);
termInfo.update(1, LEADER_ID);
return new RaftActorContextImpl(actor, actor.underlyingActor().getContext(),
TestActorRef<MockRaftActor> ref = factory.createTestActor(MockRaftActor.props(persistenceId,
ImmutableMap.<String, String>builder().put("member1", "address").build(),
- config, new NonPersistentDataProvider()), persistenceId);
+ config, createProvider()), persistenceId);
MockRaftActor mockRaftActor = ref.underlyingActor();
TestActorRef<MockRaftActor> ref = factory.createTestActor(MockRaftActor.props(persistenceId,
ImmutableMap.<String, String>builder().put("member1", "address").build(),
- config, new NonPersistentDataProvider())
+ config, createProvider())
.withDispatcher(Dispatchers.DefaultDispatcherId()), persistenceId);
InMemoryJournal.waitForWriteMessagesComplete(persistenceId);
config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
ref = factory.createTestActor(MockRaftActor.props(persistenceId,
ImmutableMap.<String, String>builder().put("member1", "address").build(), config,
- new NonPersistentDataProvider()).withDispatcher(Dispatchers.DefaultDispatcherId()),
+ createProvider()).withDispatcher(Dispatchers.DefaultDispatcherId()),
factory.generateActorId("follower-"));
MockRaftActor actor = ref.underlyingActor();
final TestActorRef<MockRaftActor> raftActorRef = factory.createTestActor(MockRaftActor.builder()
.id(persistenceId).config(config).roleChangeNotifier(notifierActor).dataPersistenceProvider(
- new NonPersistentDataProvider()).props().withDispatcher(Dispatchers.DefaultDispatcherId()),
+ createProvider()).props().withDispatcher(Dispatchers.DefaultDispatcherId()),
persistenceId);
List<RoleChanged> matches = MessageCollectorActor.expectMatching(notifierActor, RoleChanged.class, 3);
final short newLeaderVersion = 6;
Follower follower = new Follower(raftActor.getRaftActorContext()) {
@Override
- public RaftActorBehavior handleMessage(ActorRef sender, Object message) {
+ public RaftActorBehavior handleMessage(final ActorRef sender, final Object message) {
setLeaderId(newLeaderId);
setLeaderPayloadVersion(newLeaderVersion);
return this;
config.setIsolatedLeaderCheckInterval(new FiniteDuration(1, TimeUnit.DAYS));
config.setSnapshotBatchCount(5);
- DataPersistenceProvider dataPersistenceProvider = new NonPersistentDataProvider();
+ DataPersistenceProvider dataPersistenceProvider = createProvider();
Map<String, String> peerAddresses = ImmutableMap.<String, String>builder().put("member1", "address").build();
config.setIsolatedLeaderCheckInterval(new FiniteDuration(1, TimeUnit.DAYS));
config.setSnapshotBatchCount(5);
- DataPersistenceProvider dataPersistenceProvider = new NonPersistentDataProvider();
+ DataPersistenceProvider dataPersistenceProvider = createProvider();
Map<String, String> peerAddresses = ImmutableMap.<String, String>builder().put("member1", "address").build();
assertEquals(3, leader.getReplicatedToAllIndex());
}
+ private static DataPersistenceProvider createProvider() {
+ return new NonPersistentDataProvider(Runnable::run);
+ }
+
@Test
public void testSwitchBehavior() {
String persistenceId = factory.generateActorId("leader-");
config.setIsolatedLeaderCheckInterval(new FiniteDuration(1, TimeUnit.DAYS));
config.setSnapshotBatchCount(5);
- DataPersistenceProvider dataPersistenceProvider = new NonPersistentDataProvider();
+ DataPersistenceProvider dataPersistenceProvider = createProvider();
Map<String, String> peerAddresses = ImmutableMap.<String, String>builder().build();
assertEquals(RaftState.Leader, leaderActor.getCurrentBehavior().state());
}
- public static ByteString fromObject(Object snapshot) throws Exception {
+ public static ByteString fromObject(final Object snapshot) throws Exception {
ByteArrayOutputStream bos = null;
ObjectOutputStream os = null;
try {
Mockito.doReturn(1L).when(mockElectionTerm).getCurrentTerm();
RaftActorContext raftActorContext = new RaftActorContextImpl(candidateActor, candidateActor.actorContext(),
"candidate", mockElectionTerm, -1, -1, setupPeers(4), new DefaultConfigParamsImpl(),
- new NonPersistentDataProvider(), applyState -> { }, LOG);
+ new NonPersistentDataProvider(Runnable::run), applyState -> { }, LOG);
raftActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().build());
raftActorContext.getPeerInfo("peer1").setVotingState(VotingState.NON_VOTING);
raftActorContext.getPeerInfo("peer4").setVotingState(VotingState.NON_VOTING);
*/
package org.opendaylight.controller.cluster;
+import static java.util.Objects.requireNonNull;
+
import akka.japi.Procedure;
import akka.persistence.SnapshotSelectionCriteria;
+import org.opendaylight.controller.cluster.common.actor.ExecuteInSelfActor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class NonPersistentDataProvider implements DataPersistenceProvider {
private static final Logger LOG = LoggerFactory.getLogger(NonPersistentDataProvider.class);
+ private final ExecuteInSelfActor actor;
+
+ public NonPersistentDataProvider(final ExecuteInSelfActor actor) {
+ this.actor = requireNonNull(actor);
+ }
+
@Override
public boolean isRecoveryApplicable() {
return false;
}
@Override
- @SuppressWarnings("checkstyle:IllegalCatch")
- public <T> void persist(T entry, Procedure<T> procedure) {
- try {
- procedure.apply(entry);
- } catch (Exception e) {
- LOG.error("An unexpected error occurred", e);
- }
+ public <T> void persist(final T entry, final Procedure<T> procedure) {
+ invokeProcedure(procedure, entry);
}
@Override
- public <T> void persistAsync(T entry, Procedure<T> procedure) {
- persist(entry, procedure);
+ public <T> void persistAsync(final T entry, final Procedure<T> procedure) {
+ actor.executeInSelf(() -> invokeProcedure(procedure, entry));
}
@Override
- public void saveSnapshot(Object snapshot) {
+ public void saveSnapshot(final Object snapshot) {
// no-op
}
@Override
- public void deleteSnapshots(SnapshotSelectionCriteria criteria) {
+ public void deleteSnapshots(final SnapshotSelectionCriteria criteria) {
// no-op
}
@Override
- public void deleteMessages(long sequenceNumber) {
+ public void deleteMessages(final long sequenceNumber) {
// no-op
}
public long getLastSequenceNumber() {
return -1;
}
+
+ @SuppressWarnings("checkstyle:IllegalCatch")
+ static <T> void invokeProcedure(final Procedure<T> procedure, final T argument) {
+ try {
+ procedure.apply(argument);
+ } catch (Exception e) {
+ LOG.error("An unexpected error occurred", e);
+ }
+ }
}
import akka.actor.ActorRef;
import akka.actor.UntypedActor;
+import org.eclipse.jdt.annotation.NonNull;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-public abstract class AbstractUntypedActor extends UntypedActor {
+public abstract class AbstractUntypedActor extends UntypedActor implements ExecuteInSelfActor {
// The member name should be lower case but it's referenced in many subclasses. Suppressing the CS warning for now.
@SuppressWarnings("checkstyle:MemberName")
protected final Logger LOG = LoggerFactory.getLogger(getClass());
}
@Override
- public final void onReceive(Object message) throws Exception {
- handleReceive(message);
+ public final void executeInSelf(@NonNull final Runnable runnable) {
+ final ExecuteInSelfMessage message = new ExecuteInSelfMessage(runnable);
+ self().tell(message, ActorRef.noSender());
+ }
+
+ @Override
+ public final void onReceive(final Object message) throws Exception {
+ if (message instanceof ExecuteInSelfMessage) {
+ ((ExecuteInSelfMessage) message).run();
+ } else {
+ handleReceive(message);
+ }
}
/**
*/
protected abstract void handleReceive(Object message) throws Exception;
- protected final void ignoreMessage(Object message) {
+ protected final void ignoreMessage(final Object message) {
LOG.debug("Ignoring unhandled message {}", message);
}
- protected final void unknownMessage(Object message) {
+ protected final void unknownMessage(final Object message) {
LOG.debug("Received unhandled message {}", message);
unhandled(message);
}
- protected boolean isValidSender(ActorRef sender) {
+ protected boolean isValidSender(final ActorRef sender) {
// If the caller passes in a null sender (ActorRef.noSender()), akka translates that to the
// deadLetters actor.
return sender != null && !getContext().system().deadLetters().equals(sender);
package org.opendaylight.controller.cluster.common.actor;
+import akka.actor.ActorRef;
import akka.persistence.UntypedPersistentActor;
+import org.eclipse.jdt.annotation.NonNull;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-public abstract class AbstractUntypedPersistentActor extends UntypedPersistentActor {
+public abstract class AbstractUntypedPersistentActor extends UntypedPersistentActor implements ExecuteInSelfActor {
// The member name should be lower case but it's referenced in many subclasses. Suppressing the CS warning for now.
@SuppressWarnings("checkstyle:MemberName")
}
@Override
- public final void onReceiveCommand(Object message) throws Exception {
+ public final void executeInSelf(@NonNull final Runnable runnable) {
+ final ExecuteInSelfMessage message = new ExecuteInSelfMessage(runnable);
+ LOG.trace("Scheduling execution of {}", message);
+ self().tell(message, ActorRef.noSender());
+ }
+
+ @Override
+ public final void onReceiveCommand(final Object message) throws Exception {
final String messageType = message.getClass().getSimpleName();
LOG.trace("Received message {}", messageType);
- handleCommand(message);
+ if (message instanceof ExecuteInSelfMessage) {
+ LOG.trace("Executing {}", message);
+ ((ExecuteInSelfMessage) message).run();
+ } else {
+ handleCommand(message);
+ }
LOG.trace("Done handling message {}", messageType);
}
@Override
- public final void onReceiveRecover(Object message) throws Exception {
+ public final void onReceiveRecover(final Object message) throws Exception {
final String messageType = message.getClass().getSimpleName();
LOG.trace("Received message {}", messageType);
handleRecover(message);
protected abstract void handleCommand(Object message) throws Exception;
- protected void ignoreMessage(Object message) {
+ protected void ignoreMessage(final Object message) {
LOG.debug("Unhandled message {} ", message);
}
- protected void unknownMessage(Object message) throws Exception {
+ protected void unknownMessage(final Object message) throws Exception {
LOG.debug("Received unhandled message {}", message);
unhandled(message);
}
--- /dev/null
+/*
+ * Copyright (c) 2017 Pantheon Technologies, s.r.o. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.common.actor;
+
+import akka.japi.Procedure;
+import com.google.common.annotations.Beta;
+import org.eclipse.jdt.annotation.NonNull;
+
+/**
+ * Interface implemented by Actors, who can schedule invocation of a {@link Procedure} in their context.
+ *
+ * @author Robert Varga
+ */
+@Beta
+public interface ExecuteInSelfActor {
+ /**
+ * Run a Runnable in the context of this actor.
+ *
+ * @param runnable Runnable to run
+ */
+ void executeInSelf(@NonNull Runnable runnable);
+}
--- /dev/null
+/*
+ * Copyright (c) 2017 Pantheon Technologies, s.r.o. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.common.actor;
+
+import static java.util.Objects.requireNonNull;
+
+import akka.dispatch.ControlMessage;
+import org.eclipse.jdt.annotation.NonNull;
+
+/**
+ * Message internal to {@link ExecuteInSelfActor} implementations in this package.
+ *
+ * @author Robert Varga
+ */
+final class ExecuteInSelfMessage implements ControlMessage {
+ private final Runnable runnable;
+
+ ExecuteInSelfMessage(final @NonNull Runnable runnable) {
+ this.runnable = requireNonNull(runnable);
+ }
+
+ void run() {
+ runnable.run();
+ }
+}