captureSnapshot();
}
} else if (!persistent && (currentPersistence == null || currentPersistence.isRecoveryApplicable())) {
- setPersistence(new NonPersistentDataProvider() {
+ setPersistence(new NonPersistentDataProvider(this) {
/**
* The way snapshotting works is,
* <ol>
import java.util.Map;
import java.util.Optional;
import java.util.function.Consumer;
+import org.opendaylight.controller.cluster.DataPersistenceProvider;
import org.opendaylight.controller.cluster.NonPersistentDataProvider;
import org.opendaylight.controller.cluster.raft.behaviors.RaftActorBehavior;
import org.opendaylight.controller.cluster.raft.persisted.ByteState;
}
@Override
- public void update(long newTerm, String newVotedFor) {
+ public void update(final long newTerm, final String newVotedFor) {
this.currentTerm = newTerm;
this.votedFor = newVotedFor;
// TODO : Write to some persistent state
}
- @Override public void updateAndPersist(long newTerm, String newVotedFor) {
+ @Override public void updateAndPersist(final long newTerm, final String newVotedFor) {
update(newTerm, newVotedFor);
}
};
}
+ private static DataPersistenceProvider createProvider() {
+ return new NonPersistentDataProvider(Runnable::run);
+ }
+
public MockRaftActorContext() {
super(null, null, "test", newElectionTerm(), -1, -1, new HashMap<>(),
- new DefaultConfigParamsImpl(), new NonPersistentDataProvider(), applyState -> { }, LOG);
+ new DefaultConfigParamsImpl(), createProvider(), applyState -> { }, LOG);
setReplicatedLog(new MockReplicatedLogBuilder().build());
}
- public MockRaftActorContext(String id, ActorSystem system, ActorRef actor) {
+ public MockRaftActorContext(final String id, final ActorSystem system, final ActorRef actor) {
super(actor, null, id, newElectionTerm(), -1, -1, new HashMap<>(),
- new DefaultConfigParamsImpl(), new NonPersistentDataProvider(),
- applyState -> actor.tell(applyState, actor), LOG);
+ new DefaultConfigParamsImpl(), createProvider(), applyState -> actor.tell(applyState, actor), LOG);
this.system = system;
setLastApplied(replicatedLog.lastIndex());
}
- @Override public ActorRef actorOf(Props props) {
+ @Override public ActorRef actorOf(final Props props) {
return system.actorOf(props);
}
- @Override public ActorSelection actorSelection(String path) {
+ @Override public ActorSelection actorSelection(final String path) {
return system.actorSelection(path);
}
return this.system;
}
- @Override public ActorSelection getPeerActorSelection(String peerId) {
+ @Override public ActorSelection getPeerActorSelection(final String peerId) {
String peerAddress = getPeerAddress(peerId);
if (peerAddress != null) {
return actorSelection(peerAddress);
return null;
}
- public void setPeerAddresses(Map<String, String> peerAddresses) {
+ public void setPeerAddresses(final Map<String, String> peerAddresses) {
for (String id: getPeerIds()) {
removePeer(id);
}
snapshotManager.setSnapshotCohort(new RaftActorSnapshotCohort() {
@Override
- public State deserializeSnapshot(ByteSource snapshotBytes) throws IOException {
+ public State deserializeSnapshot(final ByteSource snapshotBytes) throws IOException {
return ByteState.of(snapshotBytes.read());
}
@Override
- public void createSnapshot(ActorRef actorRef, java.util.Optional<OutputStream> installSnapshotStream) {
+ public void createSnapshot(final ActorRef actorRef, final Optional<OutputStream> installSnapshotStream) {
}
@Override
- public void applySnapshot(State snapshotState) {
+ public void applySnapshot(final State snapshotState) {
}
});
return snapshotManager;
}
- public void setCreateSnapshotProcedure(Consumer<Optional<OutputStream>> createSnapshotProcedure) {
+ public void setCreateSnapshotProcedure(final Consumer<Optional<OutputStream>> createSnapshotProcedure) {
this.createSnapshotProcedure = createSnapshotProcedure;
}
return raftPolicy != null ? raftPolicy : super.getRaftPolicy();
}
- public void setRaftPolicy(RaftPolicy raftPolicy) {
+ public void setRaftPolicy(final RaftPolicy raftPolicy) {
this.raftPolicy = raftPolicy;
}
}
@Override
- public void captureSnapshotIfReady(ReplicatedLogEntry replicatedLogEntry) {
+ public void captureSnapshotIfReady(final ReplicatedLogEntry replicatedLogEntry) {
}
@Override
- public boolean shouldCaptureSnapshot(long logIndex) {
+ public boolean shouldCaptureSnapshot(final long logIndex) {
return false;
}
@Override
- public boolean removeFromAndPersist(long index) {
+ public boolean removeFromAndPersist(final long index) {
return removeFrom(index) >= 0;
}
@Override
@SuppressWarnings("checkstyle:IllegalCatch")
- public boolean appendAndPersist(ReplicatedLogEntry replicatedLogEntry, Procedure<ReplicatedLogEntry> callback,
- boolean doAsync) {
+ public boolean appendAndPersist(final ReplicatedLogEntry replicatedLogEntry,
+ final Procedure<ReplicatedLogEntry> callback, final boolean doAsync) {
append(replicatedLogEntry);
if (callback != null) {
public MockPayload() {
}
- public MockPayload(String data) {
+ public MockPayload(final String data) {
this.value = data;
size = value.length();
}
- public MockPayload(String data, int size) {
+ public MockPayload(final String data, final int size) {
this(data);
this.size = size;
}
}
@Override
- public boolean equals(Object obj) {
+ public boolean equals(final Object obj) {
if (this == obj) {
return true;
}
public static class MockReplicatedLogBuilder {
private final ReplicatedLog mockLog = new SimpleReplicatedLog();
- public MockReplicatedLogBuilder createEntries(int start, int end, int term) {
+ public MockReplicatedLogBuilder createEntries(final int start, final int end, final int term) {
for (int i = start; i < end; i++) {
this.mockLog.append(new SimpleReplicatedLogEntry(i, term,
new MockRaftActorContext.MockPayload(Integer.toString(i))));
return this;
}
- public MockReplicatedLogBuilder addEntry(int index, int term, MockPayload payload) {
+ public MockReplicatedLogBuilder addEntry(final int index, final int term, final MockPayload payload) {
this.mockLog.append(new SimpleReplicatedLogEntry(index, term, payload));
return this;
}
import java.util.Map;
import org.junit.After;
import org.junit.Test;
+import org.opendaylight.controller.cluster.DataPersistenceProvider;
import org.opendaylight.controller.cluster.NonPersistentDataProvider;
import org.opendaylight.controller.cluster.raft.persisted.ServerConfigurationPayload;
import org.opendaylight.controller.cluster.raft.persisted.ServerInfo;
peerMap.put("peer2", null);
DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
RaftActorContextImpl context = new RaftActorContextImpl(actor, actor.underlyingActor().getContext(),
- "test", new ElectionTermImpl(new NonPersistentDataProvider(), "test", log), -1, -1,
- peerMap, configParams, new NonPersistentDataProvider(), applyState -> { }, log);
+ "test", new ElectionTermImpl(createProvider(), "test", log), -1, -1,
+ peerMap, configParams, createProvider(), applyState -> { }, log);
assertEquals("getPeerAddress", "peerAddress1", context.getPeerAddress("peer1"));
assertEquals("getPeerAddress", null, context.getPeerAddress("peer2"));
public void testSetPeerAddress() {
DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
RaftActorContextImpl context = new RaftActorContextImpl(actor, actor.underlyingActor().getContext(),
- "test", new ElectionTermImpl(new NonPersistentDataProvider(), "test", log), -1, -1,
+ "test", new ElectionTermImpl(createProvider(), "test", log), -1, -1,
Maps.newHashMap(ImmutableMap.<String, String>of("peer1", "peerAddress1")), configParams,
- new NonPersistentDataProvider(), applyState -> { }, log);
+ createProvider(), applyState -> { }, log);
context.setPeerAddress("peer1", "peerAddress1_1");
assertEquals("getPeerAddress", "peerAddress1_1", context.getPeerAddress("peer1"));
@Test
public void testUpdatePeerIds() {
RaftActorContextImpl context = new RaftActorContextImpl(actor, actor.underlyingActor().getContext(),
- "self", new ElectionTermImpl(new NonPersistentDataProvider(), "test", log), -1, -1,
+ "self", new ElectionTermImpl(createProvider(), "test", log), -1, -1,
Maps.newHashMap(ImmutableMap.<String, String>of("peer1", "peerAddress1")),
- new DefaultConfigParamsImpl(), new NonPersistentDataProvider(), applyState -> { }, log);
+ new DefaultConfigParamsImpl(), createProvider(), applyState -> { }, log);
context.updatePeerIds(new ServerConfigurationPayload(Arrays.asList(new ServerInfo("self", false),
new ServerInfo("peer2", true), new ServerInfo("peer3", false))));
assertEquals("isVotingMember", false, context.isVotingMember());
}
- private static void verifyPeerInfo(RaftActorContextImpl context, String peerId, Boolean voting) {
+ private static DataPersistenceProvider createProvider() {
+ return new NonPersistentDataProvider(Runnable::run);
+ }
+
+ private static void verifyPeerInfo(final RaftActorContextImpl context, final String peerId, final Boolean voting) {
PeerInfo peerInfo = context.getPeerInfo(peerId);
if (voting != null) {
assertNotNull("Expected peer " + peerId, peerInfo);
DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
configParams.setHeartBeatInterval(new FiniteDuration(100, TimeUnit.MILLISECONDS));
configParams.setElectionTimeoutFactor(100000);
- NonPersistentDataProvider noPersistence = new NonPersistentDataProvider();
+ NonPersistentDataProvider noPersistence = new NonPersistentDataProvider(Runnable::run);
ElectionTermImpl termInfo = new ElectionTermImpl(noPersistence, id, LOG);
termInfo.update(1, LEADER_ID);
return new RaftActorContextImpl(actor, actor.underlyingActor().getContext(),
TestActorRef<MockRaftActor> ref = factory.createTestActor(MockRaftActor.props(persistenceId,
ImmutableMap.<String, String>builder().put("member1", "address").build(),
- config, new NonPersistentDataProvider()), persistenceId);
+ config, createProvider()), persistenceId);
MockRaftActor mockRaftActor = ref.underlyingActor();
TestActorRef<MockRaftActor> ref = factory.createTestActor(MockRaftActor.props(persistenceId,
ImmutableMap.<String, String>builder().put("member1", "address").build(),
- config, new NonPersistentDataProvider())
+ config, createProvider())
.withDispatcher(Dispatchers.DefaultDispatcherId()), persistenceId);
InMemoryJournal.waitForWriteMessagesComplete(persistenceId);
config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
ref = factory.createTestActor(MockRaftActor.props(persistenceId,
ImmutableMap.<String, String>builder().put("member1", "address").build(), config,
- new NonPersistentDataProvider()).withDispatcher(Dispatchers.DefaultDispatcherId()),
+ createProvider()).withDispatcher(Dispatchers.DefaultDispatcherId()),
factory.generateActorId("follower-"));
MockRaftActor actor = ref.underlyingActor();
final TestActorRef<MockRaftActor> raftActorRef = factory.createTestActor(MockRaftActor.builder()
.id(persistenceId).config(config).roleChangeNotifier(notifierActor).dataPersistenceProvider(
- new NonPersistentDataProvider()).props().withDispatcher(Dispatchers.DefaultDispatcherId()),
+ createProvider()).props().withDispatcher(Dispatchers.DefaultDispatcherId()),
persistenceId);
List<RoleChanged> matches = MessageCollectorActor.expectMatching(notifierActor, RoleChanged.class, 3);
final short newLeaderVersion = 6;
Follower follower = new Follower(raftActor.getRaftActorContext()) {
@Override
- public RaftActorBehavior handleMessage(ActorRef sender, Object message) {
+ public RaftActorBehavior handleMessage(final ActorRef sender, final Object message) {
setLeaderId(newLeaderId);
setLeaderPayloadVersion(newLeaderVersion);
return this;
config.setIsolatedLeaderCheckInterval(new FiniteDuration(1, TimeUnit.DAYS));
config.setSnapshotBatchCount(5);
- DataPersistenceProvider dataPersistenceProvider = new NonPersistentDataProvider();
+ DataPersistenceProvider dataPersistenceProvider = createProvider();
Map<String, String> peerAddresses = ImmutableMap.<String, String>builder().put("member1", "address").build();
config.setIsolatedLeaderCheckInterval(new FiniteDuration(1, TimeUnit.DAYS));
config.setSnapshotBatchCount(5);
- DataPersistenceProvider dataPersistenceProvider = new NonPersistentDataProvider();
+ DataPersistenceProvider dataPersistenceProvider = createProvider();
Map<String, String> peerAddresses = ImmutableMap.<String, String>builder().put("member1", "address").build();
assertEquals(3, leader.getReplicatedToAllIndex());
}
+ private static DataPersistenceProvider createProvider() {
+ return new NonPersistentDataProvider(Runnable::run);
+ }
+
@Test
public void testSwitchBehavior() {
String persistenceId = factory.generateActorId("leader-");
config.setIsolatedLeaderCheckInterval(new FiniteDuration(1, TimeUnit.DAYS));
config.setSnapshotBatchCount(5);
- DataPersistenceProvider dataPersistenceProvider = new NonPersistentDataProvider();
+ DataPersistenceProvider dataPersistenceProvider = createProvider();
Map<String, String> peerAddresses = ImmutableMap.<String, String>builder().build();
assertEquals(RaftState.Leader, leaderActor.getCurrentBehavior().state());
}
- public static ByteString fromObject(Object snapshot) throws Exception {
+ public static ByteString fromObject(final Object snapshot) throws Exception {
ByteArrayOutputStream bos = null;
ObjectOutputStream os = null;
try {
Mockito.doReturn(1L).when(mockElectionTerm).getCurrentTerm();
RaftActorContext raftActorContext = new RaftActorContextImpl(candidateActor, candidateActor.actorContext(),
"candidate", mockElectionTerm, -1, -1, setupPeers(4), new DefaultConfigParamsImpl(),
- new NonPersistentDataProvider(), applyState -> { }, LOG);
+ new NonPersistentDataProvider(Runnable::run), applyState -> { }, LOG);
raftActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().build());
raftActorContext.getPeerInfo("peer1").setVotingState(VotingState.NON_VOTING);
raftActorContext.getPeerInfo("peer4").setVotingState(VotingState.NON_VOTING);
*/
package org.opendaylight.controller.cluster;
+import static java.util.Objects.requireNonNull;
+
import akka.japi.Procedure;
import akka.persistence.SnapshotSelectionCriteria;
+import org.opendaylight.controller.cluster.common.actor.ExecuteInSelfActor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class NonPersistentDataProvider implements DataPersistenceProvider {
private static final Logger LOG = LoggerFactory.getLogger(NonPersistentDataProvider.class);
+ private final ExecuteInSelfActor actor;
+
+ public NonPersistentDataProvider(final ExecuteInSelfActor actor) {
+ this.actor = requireNonNull(actor);
+ }
+
@Override
public boolean isRecoveryApplicable() {
return false;
}
@Override
- @SuppressWarnings("checkstyle:IllegalCatch")
- public <T> void persist(T entry, Procedure<T> procedure) {
- try {
- procedure.apply(entry);
- } catch (Exception e) {
- LOG.error("An unexpected error occurred", e);
- }
+ public <T> void persist(final T entry, final Procedure<T> procedure) {
+ invokeProcedure(procedure, entry);
}
@Override
- public <T> void persistAsync(T entry, Procedure<T> procedure) {
- persist(entry, procedure);
+ public <T> void persistAsync(final T entry, final Procedure<T> procedure) {
+ actor.executeInSelf(() -> invokeProcedure(procedure, entry));
}
@Override
- public void saveSnapshot(Object snapshot) {
+ public void saveSnapshot(final Object snapshot) {
// no-op
}
@Override
- public void deleteSnapshots(SnapshotSelectionCriteria criteria) {
+ public void deleteSnapshots(final SnapshotSelectionCriteria criteria) {
// no-op
}
@Override
- public void deleteMessages(long sequenceNumber) {
+ public void deleteMessages(final long sequenceNumber) {
// no-op
}
public long getLastSequenceNumber() {
return -1;
}
+
+ @SuppressWarnings("checkstyle:IllegalCatch")
+ static <T> void invokeProcedure(final Procedure<T> procedure, final T argument) {
+ try {
+ procedure.apply(argument);
+ } catch (Exception e) {
+ LOG.error("An unexpected error occurred", e);
+ }
+ }
}
import akka.actor.ActorRef;
import akka.actor.UntypedActor;
+import org.eclipse.jdt.annotation.NonNull;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-public abstract class AbstractUntypedActor extends UntypedActor {
+public abstract class AbstractUntypedActor extends UntypedActor implements ExecuteInSelfActor {
// The member name should be lower case but it's referenced in many subclasses. Suppressing the CS warning for now.
@SuppressWarnings("checkstyle:MemberName")
protected final Logger LOG = LoggerFactory.getLogger(getClass());
}
@Override
- public final void onReceive(Object message) throws Exception {
- handleReceive(message);
+ public final void executeInSelf(@NonNull final Runnable runnable) {
+ final ExecuteInSelfMessage message = new ExecuteInSelfMessage(runnable);
+ self().tell(message, ActorRef.noSender());
+ }
+
+ @Override
+ public final void onReceive(final Object message) throws Exception {
+ if (message instanceof ExecuteInSelfMessage) {
+ ((ExecuteInSelfMessage) message).run();
+ } else {
+ handleReceive(message);
+ }
}
/**
*/
protected abstract void handleReceive(Object message) throws Exception;
- protected final void ignoreMessage(Object message) {
+ protected final void ignoreMessage(final Object message) {
LOG.debug("Ignoring unhandled message {}", message);
}
- protected final void unknownMessage(Object message) {
+ protected final void unknownMessage(final Object message) {
LOG.debug("Received unhandled message {}", message);
unhandled(message);
}
- protected boolean isValidSender(ActorRef sender) {
+ protected boolean isValidSender(final ActorRef sender) {
// If the caller passes in a null sender (ActorRef.noSender()), akka translates that to the
// deadLetters actor.
return sender != null && !getContext().system().deadLetters().equals(sender);
package org.opendaylight.controller.cluster.common.actor;
+import akka.actor.ActorRef;
import akka.persistence.UntypedPersistentActor;
+import org.eclipse.jdt.annotation.NonNull;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-public abstract class AbstractUntypedPersistentActor extends UntypedPersistentActor {
+public abstract class AbstractUntypedPersistentActor extends UntypedPersistentActor implements ExecuteInSelfActor {
// The member name should be lower case but it's referenced in many subclasses. Suppressing the CS warning for now.
@SuppressWarnings("checkstyle:MemberName")
}
@Override
- public final void onReceiveCommand(Object message) throws Exception {
+ public final void executeInSelf(@NonNull final Runnable runnable) {
+ final ExecuteInSelfMessage message = new ExecuteInSelfMessage(runnable);
+ LOG.trace("Scheduling execution of {}", message);
+ self().tell(message, ActorRef.noSender());
+ }
+
+ @Override
+ public final void onReceiveCommand(final Object message) throws Exception {
final String messageType = message.getClass().getSimpleName();
LOG.trace("Received message {}", messageType);
- handleCommand(message);
+ if (message instanceof ExecuteInSelfMessage) {
+ LOG.trace("Executing {}", message);
+ ((ExecuteInSelfMessage) message).run();
+ } else {
+ handleCommand(message);
+ }
LOG.trace("Done handling message {}", messageType);
}
@Override
- public final void onReceiveRecover(Object message) throws Exception {
+ public final void onReceiveRecover(final Object message) throws Exception {
final String messageType = message.getClass().getSimpleName();
LOG.trace("Received message {}", messageType);
handleRecover(message);
protected abstract void handleCommand(Object message) throws Exception;
- protected void ignoreMessage(Object message) {
+ protected void ignoreMessage(final Object message) {
LOG.debug("Unhandled message {} ", message);
}
- protected void unknownMessage(Object message) throws Exception {
+ protected void unknownMessage(final Object message) throws Exception {
LOG.debug("Received unhandled message {}", message);
unhandled(message);
}
--- /dev/null
+/*
+ * Copyright (c) 2017 Pantheon Technologies, s.r.o. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.common.actor;
+
+import akka.japi.Procedure;
+import com.google.common.annotations.Beta;
+import org.eclipse.jdt.annotation.NonNull;
+
+/**
+ * Interface implemented by Actors, who can schedule invocation of a {@link Procedure} in their context.
+ *
+ * @author Robert Varga
+ */
+@Beta
+public interface ExecuteInSelfActor {
+ /**
+ * Run a Runnable in the context of this actor.
+ *
+ * @param runnable Runnable to run
+ */
+ void executeInSelf(@NonNull Runnable runnable);
+}
--- /dev/null
+/*
+ * Copyright (c) 2017 Pantheon Technologies, s.r.o. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.common.actor;
+
+import static java.util.Objects.requireNonNull;
+
+import akka.dispatch.ControlMessage;
+import org.eclipse.jdt.annotation.NonNull;
+
+/**
+ * Message internal to {@link ExecuteInSelfActor} implementations in this package.
+ *
+ * @author Robert Varga
+ */
+final class ExecuteInSelfMessage implements ControlMessage {
+ private final Runnable runnable;
+
+ ExecuteInSelfMessage(final @NonNull Runnable runnable) {
+ this.runnable = requireNonNull(runnable);
+ }
+
+ void run() {
+ runnable.run();
+ }
+}