import akka.actor.ActorRef;
import akka.actor.PoisonPill;
-import akka.persistence.UntypedPersistentActor;
+import akka.persistence.AbstractPersistentActor;
import com.google.common.annotations.Beta;
import org.opendaylight.controller.cluster.access.concepts.FrontendIdentifier;
import org.slf4j.Logger;
* @author Robert Varga
*/
@Beta
-public abstract class AbstractClientActor extends UntypedPersistentActor {
+public abstract class AbstractClientActor extends AbstractPersistentActor {
private static final Logger LOG = LoggerFactory.getLogger(AbstractClientActor.class);
private AbstractClientActorBehavior<?> currentBehavior;
}
@Override
- public final void onReceiveCommand(final Object command) {
+ public Receive createReceive() {
+ return receiveBuilder().matchAny(this::onReceiveCommand).build();
+ }
+
+ @Override
+ public Receive createReceiveRecover() {
+ return receiveBuilder().matchAny(this::onReceiveRecover).build();
+ }
+
+ private void onReceiveCommand(final Object command) {
if (command == null) {
LOG.debug("{}: ignoring null command", persistenceId());
return;
}
}
- @Override
- public final void onReceiveRecover(final Object recover) {
+ private void onReceiveRecover(final Object recover) {
switchBehavior(currentBehavior.onReceiveRecover(recover));
}
mockRaftActor.waitUntilLeader();
- mockRaftActor.onReceiveCommand(new ApplyJournalEntries(10));
+ mockRaftActor.handleCommand(new ApplyJournalEntries(10));
verify(dataPersistenceProvider).persistAsync(any(ApplyJournalEntries.class), any(Procedure.class));
}
assertEquals(RaftState.Leader, leaderActor.getCurrentBehavior().state());
//fake snapshot on index 5
- leaderActor.onReceiveCommand(new AppendEntriesReply(follower1Id, 1, true, 5, 1, (short)0));
+ leaderActor.handleCommand(new AppendEntriesReply(follower1Id, 1, true, 5, 1, (short)0));
assertEquals(8, leaderActor.getReplicatedLog().size());
//fake snapshot on index 6
assertEquals(RaftState.Leader, leaderActor.getCurrentBehavior().state());
- leaderActor.onReceiveCommand(new AppendEntriesReply(follower1Id, 1, true, 6, 1, (short)0));
+ leaderActor.handleCommand(new AppendEntriesReply(follower1Id, 1, true, 6, 1, (short)0));
assertEquals(8, leaderActor.getReplicatedLog().size());
assertEquals(RaftState.Leader, leaderActor.getCurrentBehavior().state());
new SimpleReplicatedLogEntry(8, 1, new MockRaftActorContext.MockPayload("foo-8")));
//fake snapshot on index 7, since lastApplied = 7 , we would keep the last applied
- leaderActor.onReceiveCommand(new AppendEntriesReply(follower1Id, 1, true, 7, 1, (short)0));
+ leaderActor.handleCommand(new AppendEntriesReply(follower1Id, 1, true, 7, 1, (short)0));
assertEquals(2, leaderActor.getReplicatedLog().size());
assertEquals(8, leaderActor.getReplicatedLog().lastIndex());
}
//fake snapshot on index 6
List<ReplicatedLogEntry> entries = Arrays.asList(
(ReplicatedLogEntry) new SimpleReplicatedLogEntry(6, 1, new MockRaftActorContext.MockPayload("foo-6")));
- followerActor.onReceiveCommand(new AppendEntries(1, leaderId, 5, 1, entries, 5, 5, (short)0));
+ followerActor.handleCommand(new AppendEntries(1, leaderId, 5, 1, entries, 5, 5, (short)0));
assertEquals(7, followerActor.getReplicatedLog().size());
//fake snapshot on index 7
entries = Arrays.asList((ReplicatedLogEntry) new SimpleReplicatedLogEntry(7, 1,
new MockRaftActorContext.MockPayload("foo-7")));
- followerActor.onReceiveCommand(new AppendEntries(1, leaderId, 6, 1, entries, 6, 6, (short) 0));
+ followerActor.handleCommand(new AppendEntries(1, leaderId, 6, 1, entries, 6, 6, (short) 0));
assertEquals(8, followerActor.getReplicatedLog().size());
assertEquals(RaftState.Follower, followerActor.getCurrentBehavior().state());
new MockRaftActorContext.MockPayload("foo-2"),
new MockRaftActorContext.MockPayload("foo-3"),
new MockRaftActorContext.MockPayload("foo-4")));
- followerActor.onReceiveCommand(new CaptureSnapshotReply(ByteState.of(snapshotBytes.toByteArray()),
+ followerActor.handleCommand(new CaptureSnapshotReply(ByteState.of(snapshotBytes.toByteArray()),
java.util.Optional.empty()));
assertTrue(followerActor.getRaftActorContext().getSnapshotManager().isCapturing());
entries = Arrays.asList((ReplicatedLogEntry) new SimpleReplicatedLogEntry(8, 1,
new MockRaftActorContext.MockPayload("foo-7")));
// send an additional entry 8 with leaderCommit = 7
- followerActor.onReceiveCommand(new AppendEntries(1, leaderId, 7, 1, entries, 7, 7, (short) 0));
+ followerActor.handleCommand(new AppendEntries(1, leaderId, 7, 1, entries, 7, 7, (short) 0));
// 7 and 8, as lastapplied is 7
assertEquals(2, followerActor.getReplicatedLog().size());
assertEquals(5, leaderActor.getReplicatedLog().size());
assertEquals(RaftState.Leader, leaderActor.getCurrentBehavior().state());
- leaderActor.onReceiveCommand(new AppendEntriesReply(follower1Id, 1, true, 9, 1, (short) 0));
+ leaderActor.handleCommand(new AppendEntriesReply(follower1Id, 1, true, 9, 1, (short) 0));
assertEquals(5, leaderActor.getReplicatedLog().size());
assertEquals(RaftState.Leader, leaderActor.getCurrentBehavior().state());
// set the 2nd follower nextIndex to 1 which has been snapshotted
- leaderActor.onReceiveCommand(new AppendEntriesReply(follower2Id, 1, true, 0, 1, (short)0));
+ leaderActor.handleCommand(new AppendEntriesReply(follower2Id, 1, true, 0, 1, (short)0));
assertEquals(5, leaderActor.getReplicatedLog().size());
assertEquals(RaftState.Leader, leaderActor.getCurrentBehavior().state());
// simulate a real snapshot
- leaderActor.onReceiveCommand(SendHeartBeat.INSTANCE);
+ leaderActor.handleCommand(SendHeartBeat.INSTANCE);
assertEquals(5, leaderActor.getReplicatedLog().size());
assertEquals(String.format("expected to be Leader but was %s. Current Leader = %s ",
leaderActor.getCurrentBehavior().state(), leaderActor.getLeaderId()),
//reply from a slow follower does not initiate a fake snapshot
- leaderActor.onReceiveCommand(new AppendEntriesReply(follower2Id, 1, true, 9, 1, (short)0));
+ leaderActor.handleCommand(new AppendEntriesReply(follower2Id, 1, true, 9, 1, (short)0));
assertEquals("Fake snapshot should not happen when Initiate is in progress", 5,
leaderActor.getReplicatedLog().size());
new MockRaftActorContext.MockPayload("foo-2"),
new MockRaftActorContext.MockPayload("foo-3"),
new MockRaftActorContext.MockPayload("foo-4")));
- leaderActor.onReceiveCommand(new CaptureSnapshotReply(ByteState.of(snapshotBytes.toByteArray()),
+ leaderActor.handleCommand(new CaptureSnapshotReply(ByteState.of(snapshotBytes.toByteArray()),
java.util.Optional.empty()));
assertTrue(leaderActor.getRaftActorContext().getSnapshotManager().isCapturing());
leaderActor.getReplicatedLog().size());
//reply from a slow follower after should not raise errors
- leaderActor.onReceiveCommand(new AppendEntriesReply(follower2Id, 1, true, 5, 1, (short) 0));
+ leaderActor.handleCommand(new AppendEntriesReply(follower2Id, 1, true, 5, 1, (short) 0));
assertEquals(0, leaderActor.getReplicatedLog().size());
}
assertEquals("isPersistencePending", true, logEntry.isPersistencePending());
assertEquals("getCommitIndex", -1, leaderActor.getRaftActorContext().getCommitIndex());
- leaderActor.onReceiveCommand(new AppendEntriesReply(followerId, 1, true, 0, 1, (short)0));
+ leaderActor.handleCommand(new AppendEntriesReply(followerId, 1, true, 0, 1, (short)0));
assertEquals("getCommitIndex", -1, leaderActor.getRaftActorContext().getCommitIndex());
ArgumentCaptor<Procedure> callbackCaptor = ArgumentCaptor.forClass(Procedure.class);
MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
MessageCollectorActor.clearMessages(followerActor);
- leaderActor.onReceiveCommand(new AppendEntriesReply(followerId, 1, true, -1, -1, (short)0));
+ leaderActor.handleCommand(new AppendEntriesReply(followerId, 1, true, -1, -1, (short)0));
leaderActor.persistData(leaderActorRef, new MockIdentifier("1"), new MockPayload("1"), true);
MessageCollectorActor.assertNoneMatching(followerActor, AppendEntries.class, 500);
*/
package org.opendaylight.controller.cluster;
+import static java.util.Objects.requireNonNull;
+
import akka.japi.Procedure;
+import akka.persistence.AbstractPersistentActor;
import akka.persistence.SnapshotSelectionCriteria;
-import akka.persistence.UntypedPersistentActor;
-import com.google.common.base.Preconditions;
/**
* A DataPersistenceProvider implementation with persistence enabled.
*/
public class PersistentDataProvider implements DataPersistenceProvider {
- private final UntypedPersistentActor persistentActor;
+ private final AbstractPersistentActor persistentActor;
- public PersistentDataProvider(UntypedPersistentActor persistentActor) {
- this.persistentActor = Preconditions.checkNotNull(persistentActor, "persistentActor can't be null");
+ public PersistentDataProvider(AbstractPersistentActor persistentActor) {
+ this.persistentActor = requireNonNull(persistentActor, "persistentActor can't be null");
}
@Override
package org.opendaylight.controller.cluster.common.actor;
+import akka.actor.AbstractActor;
import akka.actor.ActorRef;
-import akka.actor.UntypedActor;
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import org.eclipse.jdt.annotation.NonNull;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-public abstract class AbstractUntypedActor extends UntypedActor implements ExecuteInSelfActor {
+public abstract class AbstractUntypedActor extends AbstractActor implements ExecuteInSelfActor {
// The member name should be lower case but it's referenced in many subclasses. Suppressing the CS warning for now.
@SuppressFBWarnings("SLF4J_LOGGER_SHOULD_BE_PRIVATE")
@SuppressWarnings("checkstyle:MemberName")
}
@Override
- public final void onReceive(final Object message) {
- if (message instanceof ExecuteInSelfMessage) {
- ((ExecuteInSelfMessage) message).run();
- } else {
- handleReceive(message);
- }
+ public Receive createReceive() {
+ return receiveBuilder()
+ .match(ExecuteInSelfMessage.class, ExecuteInSelfMessage::run)
+ .matchAny(this::handleReceive)
+ .build();
}
/**
* terms of the Eclipse Public License v1.0 which accompanies this distribution,
* and is available at http://www.eclipse.org/legal/epl-v10.html
*/
-
package org.opendaylight.controller.cluster.common.actor;
import akka.actor.ActorRef;
-import akka.persistence.UntypedPersistentActor;
+import akka.persistence.AbstractPersistentActor;
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import org.eclipse.jdt.annotation.NonNull;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-public abstract class AbstractUntypedPersistentActor extends UntypedPersistentActor implements ExecuteInSelfActor {
+public abstract class AbstractUntypedPersistentActor extends AbstractPersistentActor implements ExecuteInSelfActor {
// The member name should be lower case but it's referenced in many subclasses. Suppressing the CS warning for now.
@SuppressFBWarnings("SLF4J_LOGGER_SHOULD_BE_PRIVATE")
}
@Override
- public final void onReceiveCommand(final Object message) throws Exception {
- final String messageType = message.getClass().getSimpleName();
- LOG.trace("Received message {}", messageType);
-
- if (message instanceof ExecuteInSelfMessage) {
- LOG.trace("Executing {}", message);
- ((ExecuteInSelfMessage) message).run();
- } else {
- handleCommand(message);
- }
-
- LOG.trace("Done handling message {}", messageType);
+ public final Receive createReceive() {
+ return receiveBuilder()
+ .match(ExecuteInSelfMessage.class, ExecuteInSelfMessage::run)
+ .matchAny(this::handleCommand)
+ .build();
}
@Override
- public final void onReceiveRecover(final Object message) throws Exception {
- final String messageType = message.getClass().getSimpleName();
- LOG.trace("Received message {}", messageType);
- handleRecover(message);
- LOG.trace("Done handling message {}", messageType);
+ public final Receive createReceiveRecover() {
+ return receiveBuilder().matchAny(this::handleRecover).build();
}
protected abstract void handleRecover(Object message) throws Exception;
*/
package org.opendaylight.controller.cluster.common.actor;
-import akka.actor.UntypedActor;
-import akka.japi.Procedure;
+import akka.actor.AbstractActor;
import com.codahale.metrics.MetricRegistry;
import com.codahale.metrics.Timer;
-import com.google.common.base.Preconditions;
-import com.google.common.base.Throwables;
import org.opendaylight.controller.cluster.reporting.MetricsReporter;
+import scala.PartialFunction;
+import scala.runtime.AbstractPartialFunction;
+import scala.runtime.BoxedUnit;
/**
* Represents behaviour that can be exhibited by actors of type {@link akka.actor.UntypedActor}
* </ul>
* The information is reported to {@link org.opendaylight.controller.cluster.reporting.MetricsReporter}
*/
-public class MeteringBehavior implements Procedure<Object> {
+public class MeteringBehavior extends AbstractPartialFunction<Object, BoxedUnit> {
public static final String DOMAIN = "org.opendaylight.controller.actor.metric";
private static final String MSG_PROCESSING_RATE = "msg-rate";
- private final UntypedActor meteredActor;
-
private final MetricRegistry metricRegistry = MetricsReporter.getInstance(DOMAIN).getMetricsRegistry();
-
- private String actorQualifiedName;
- private Timer msgProcessingTimer;
+ private final String actorQualifiedName;
+ private final Timer msgProcessingTimer;
+ private final PartialFunction<Object, BoxedUnit> receive;
+
+ private MeteringBehavior(final String actorName, final AbstractActor meteredActor) {
+ actorQualifiedName = meteredActor.getSelf().path().parent().toStringWithoutAddress() + "/" + actorName;
+ msgProcessingTimer = metricRegistry.timer(MetricRegistry.name(actorQualifiedName, MSG_PROCESSING_RATE));
+ receive = meteredActor.createReceive().onMessage();
+ }
/**
* Constructs an instance.
* @param actor whose behaviour needs to be metered
*/
public MeteringBehavior(final AbstractUntypedActorWithMetering actor) {
- Preconditions.checkArgument(actor != null, "actor must not be null");
- this.meteredActor = actor;
-
- String actorName = actor.getActorNameOverride() != null ? actor.getActorNameOverride()
- : actor.getSelf().path().name();
- init(actorName);
+ this(actor.getActorNameOverride() != null ? actor.getActorNameOverride() : actor.getSelf().path().name(),
+ actor);
}
- public MeteringBehavior(final UntypedActor actor) {
- Preconditions.checkArgument(actor != null, "actor must not be null");
- this.meteredActor = actor;
-
- String actorName = actor.getSelf().path().name();
- init(actorName);
+ public MeteringBehavior(final AbstractActor actor) {
+ this(actor.getSelf().path().name(), actor);
}
- private void init(final String actorName) {
- actorQualifiedName = meteredActor.getSelf().path().parent().toStringWithoutAddress()
- + "/" + actorName;
-
- final String msgProcessingTime = MetricRegistry.name(actorQualifiedName, MSG_PROCESSING_RATE);
- msgProcessingTimer = metricRegistry.timer(msgProcessingTime);
+ @Override
+ public boolean isDefinedAt(final Object obj) {
+ return receive.isDefinedAt(obj);
}
/**
* http://dropwizard.github.io/metrics/manual/core/#timers</a>
*
* @param message the message to process
- * @throws Exception on message failure
*/
- @SuppressWarnings("checkstyle:IllegalCatch")
@Override
- public void apply(final Object message) throws Exception {
+ public BoxedUnit apply(Object message) {
final String messageType = message.getClass().getSimpleName();
-
final String msgProcessingTimeByMsgType =
MetricRegistry.name(actorQualifiedName, MSG_PROCESSING_RATE, messageType);
-
final Timer msgProcessingTimerByMsgType = metricRegistry.timer(msgProcessingTimeByMsgType);
//start timers
final Timer.Context contextByMsgType = msgProcessingTimerByMsgType.time();
try {
- meteredActor.onReceive(message);
- } catch (Throwable e) {
- Throwables.propagateIfPossible(e, Exception.class);
- throw new RuntimeException(e);
+ return receive.apply(message);
} finally {
//stop timers
contextByMsgType.stop();
*/
package org.opendaylight.controller.cluster.datastore;
+import static java.util.Objects.requireNonNull;
+
+import akka.actor.AbstractActor.ActorContext;
import akka.actor.ActorRef;
-import akka.actor.UntypedActorContext;
-import com.google.common.base.Preconditions;
import java.util.concurrent.atomic.AtomicLong;
import org.opendaylight.controller.cluster.access.concepts.ClientIdentifier;
import org.opendaylight.controller.cluster.access.concepts.FrontendIdentifier;
private final DatastoreContext datastoreContext;
private final String txnDispatcherPath;
private final ShardStats shardMBean;
- private final UntypedActorContext actorContext;
+ private final ActorContext actorContext;
private final ActorRef shardActor;
private final String shardName;
ShardTransactionActorFactory(ShardDataTree dataTree, DatastoreContext datastoreContext,
- String txnDispatcherPath, ActorRef shardActor, UntypedActorContext actorContext, ShardStats shardMBean,
+ String txnDispatcherPath, ActorRef shardActor, ActorContext actorContext, ShardStats shardMBean,
String shardName) {
- this.dataTree = Preconditions.checkNotNull(dataTree);
- this.datastoreContext = Preconditions.checkNotNull(datastoreContext);
- this.txnDispatcherPath = Preconditions.checkNotNull(txnDispatcherPath);
- this.shardMBean = Preconditions.checkNotNull(shardMBean);
- this.actorContext = Preconditions.checkNotNull(actorContext);
- this.shardActor = Preconditions.checkNotNull(shardActor);
- this.shardName = Preconditions.checkNotNull(shardName);
+ this.dataTree = requireNonNull(dataTree);
+ this.datastoreContext = requireNonNull(datastoreContext);
+ this.txnDispatcherPath = requireNonNull(txnDispatcherPath);
+ this.shardMBean = requireNonNull(shardMBean);
+ this.actorContext = requireNonNull(actorContext);
+ this.shardActor = requireNonNull(shardActor);
+ this.shardName = requireNonNull(shardName);
}
private String actorNameFor(final TransactionIdentifier txId) {
* terms of the Eclipse Public License v1.0 which accompanies this distribution,
* and is available at http://www.eclipse.org/legal/epl-v10.html
*/
-
package org.opendaylight.controller.cluster.sharding;
import akka.actor.ActorRef;
final DOMDataTreeIdentifier prefix = message.getPrefix();
- final ActorUtils context = prefix.getDatastoreType() == LogicalDatastoreType.CONFIGURATION
+ final ActorUtils utils = prefix.getDatastoreType() == LogicalDatastoreType.CONFIGURATION
? distributedConfigDatastore.getActorUtils() : distributedOperDatastore.getActorUtils();
// schedule a notification task for the reply
actorSystem.scheduler().scheduleOnce(SHARD_LOOKUP_TASK_INTERVAL,
new ShardCreationLookupTask(actorSystem, getSender(), clusterWrapper,
- context, shardingService, prefix, lookupTaskMaxRetries), actorSystem.dispatcher());
+ utils, shardingService, prefix, lookupTaskMaxRetries), actorSystem.dispatcher());
}
private void onPrefixShardCreated(final PrefixShardCreated message) {
TestShardManager shardManager = newTestShardManager();
String memberId = "member-1-shard-default-" + shardMrgIDSuffix;
- shardManager.onReceiveCommand(new RoleChangeNotification(
+ shardManager.handleCommand(new RoleChangeNotification(
memberId, RaftState.Candidate.name(), RaftState.Leader.name()));
verify(ready, never()).countDown();
- shardManager.onReceiveCommand(new ShardLeaderStateChanged(memberId, memberId,
+ shardManager.handleCommand(new ShardLeaderStateChanged(memberId, memberId,
mock(DataTree.class), DataStoreVersions.CURRENT_VERSION));
verify(ready, times(1)).countDown();
TestShardManager shardManager = newTestShardManager();
String memberId = "member-1-shard-default-" + shardMrgIDSuffix;
- shardManager.onReceiveCommand(new RoleChangeNotification(memberId, null, RaftState.Follower.name()));
+ shardManager.handleCommand(new RoleChangeNotification(memberId, null, RaftState.Follower.name()));
verify(ready, never()).countDown();
- shardManager.onReceiveCommand(MockClusterWrapper.createMemberUp("member-2", kit.getRef().path().toString()));
+ shardManager.handleCommand(MockClusterWrapper.createMemberUp("member-2", kit.getRef().path().toString()));
- shardManager.onReceiveCommand(
+ shardManager.handleCommand(
new ShardLeaderStateChanged(memberId, "member-2-shard-default-" + shardMrgIDSuffix,
mock(DataTree.class), DataStoreVersions.CURRENT_VERSION));
TestShardManager shardManager = newTestShardManager();
String memberId = "member-1-shard-default-" + shardMrgIDSuffix;
- shardManager.onReceiveCommand(new RoleChangeNotification(memberId, null, RaftState.Follower.name()));
+ shardManager.handleCommand(new RoleChangeNotification(memberId, null, RaftState.Follower.name()));
verify(ready, never()).countDown();
- shardManager.onReceiveCommand(
+ shardManager.handleCommand(
new ShardLeaderStateChanged(memberId, "member-2-shard-default-" + shardMrgIDSuffix,
mock(DataTree.class), DataStoreVersions.CURRENT_VERSION));
- shardManager.onReceiveCommand(MockClusterWrapper.createMemberUp("member-2", kit.getRef().path().toString()));
+ shardManager.handleCommand(MockClusterWrapper.createMemberUp("member-2", kit.getRef().path().toString()));
verify(ready, times(1)).countDown();
}
public void testRoleChangeNotificationDoNothingForUnknownShard() throws Exception {
TestShardManager shardManager = newTestShardManager();
- shardManager.onReceiveCommand(new RoleChangeNotification("unknown", RaftState.Candidate.name(),
+ shardManager.handleCommand(new RoleChangeNotification("unknown", RaftState.Candidate.name(),
RaftState.Leader.name()));
verify(ready, never()).countDown();
public void testWhenShardIsLeaderSyncStatusIsTrue() throws Exception {
TestShardManager shardManager = newTestShardManager();
- shardManager.onReceiveCommand(new RoleChangeNotification("member-1-shard-default-" + shardMrgIDSuffix,
+ shardManager.handleCommand(new RoleChangeNotification("member-1-shard-default-" + shardMrgIDSuffix,
RaftState.Follower.name(), RaftState.Leader.name()));
assertTrue(shardManager.getMBean().getSyncStatus());
TestShardManager shardManager = newTestShardManager();
String shardId = "member-1-shard-default-" + shardMrgIDSuffix;
- shardManager.onReceiveCommand(new RoleChangeNotification(shardId,
+ shardManager.handleCommand(new RoleChangeNotification(shardId,
RaftState.Follower.name(), RaftState.Candidate.name()));
assertFalse(shardManager.getMBean().getSyncStatus());
// Send a FollowerInitialSyncStatus with status = true for the replica whose current state is candidate
- shardManager.onReceiveCommand(new FollowerInitialSyncUpStatus(
+ shardManager.handleCommand(new FollowerInitialSyncUpStatus(
true, shardId));
assertFalse(shardManager.getMBean().getSyncStatus());
TestShardManager shardManager = newTestShardManager();
String shardId = "member-1-shard-default-" + shardMrgIDSuffix;
- shardManager.onReceiveCommand(new RoleChangeNotification(shardId,
+ shardManager.handleCommand(new RoleChangeNotification(shardId,
RaftState.Candidate.name(), RaftState.Follower.name()));
// Initially will be false
assertFalse(shardManager.getMBean().getSyncStatus());
// Send status true will make sync status true
- shardManager.onReceiveCommand(new FollowerInitialSyncUpStatus(true, shardId));
+ shardManager.handleCommand(new FollowerInitialSyncUpStatus(true, shardId));
assertTrue(shardManager.getMBean().getSyncStatus());
// Send status false will make sync status false
- shardManager.onReceiveCommand(new FollowerInitialSyncUpStatus(false, shardId));
+ shardManager.handleCommand(new FollowerInitialSyncUpStatus(false, shardId));
assertFalse(shardManager.getMBean().getSyncStatus());
}
// Make default shard leader
String defaultShardId = "member-1-shard-default-" + shardMrgIDSuffix;
- shardManager.onReceiveCommand(new RoleChangeNotification(defaultShardId,
+ shardManager.handleCommand(new RoleChangeNotification(defaultShardId,
RaftState.Follower.name(), RaftState.Leader.name()));
// default = Leader, astronauts is unknown so sync status remains false
// Make astronauts shard leader as well
String astronautsShardId = "member-1-shard-astronauts-" + shardMrgIDSuffix;
- shardManager.onReceiveCommand(new RoleChangeNotification(astronautsShardId,
+ shardManager.handleCommand(new RoleChangeNotification(astronautsShardId,
RaftState.Follower.name(), RaftState.Leader.name()));
// Now sync status should be true
assertTrue(shardManager.getMBean().getSyncStatus());
// Make astronauts a Follower
- shardManager.onReceiveCommand(new RoleChangeNotification(astronautsShardId,
+ shardManager.handleCommand(new RoleChangeNotification(astronautsShardId,
RaftState.Leader.name(), RaftState.Follower.name()));
// Sync status is not true
assertFalse(shardManager.getMBean().getSyncStatus());
// Make the astronauts follower sync status true
- shardManager.onReceiveCommand(new FollowerInitialSyncUpStatus(true, astronautsShardId));
+ shardManager.handleCommand(new FollowerInitialSyncUpStatus(true, astronautsShardId));
// Sync status is now true
assertTrue(shardManager.getMBean().getSyncStatus());