* @author Thomas Pantelis
*/
class RaftActorSnapshotMessageSupport {
- static final String COMMIT_SNAPSHOT = "commit_snapshot";
+ static final Object COMMIT_SNAPSHOT = new Object() {
+ @Override
+ public String toString() {
+ return "commit_snapshot";
+ }
+ };
private final RaftActorContext context;
private final RaftActorSnapshotCohort cohort;
onSaveSnapshotFailure((SaveSnapshotFailure) message);
} else if (message instanceof CaptureSnapshotReply) {
onCaptureSnapshotReply(((CaptureSnapshotReply) message).getSnapshot());
- } else if (message.equals(COMMIT_SNAPSHOT)) {
+ } else if (COMMIT_SNAPSHOT.equals(message)) {
context.getSnapshotManager().commit(-1);
} else if (message instanceof GetSnapshot) {
onGetSnapshot(sender);
static final String NEW_SERVER_ID = "new-server";
static final String NEW_SERVER_ID2 = "new-server2";
private static final Logger LOG = LoggerFactory.getLogger(RaftActorServerConfigurationSupportTest.class);
+ private static final Class<?> COMMIT_MESSAGE_CLASS = RaftActorSnapshotMessageSupport.COMMIT_SNAPSHOT.getClass();
private static final boolean NO_PERSISTENCE = false;
private static final boolean PERSISTENT = true;
leaderActor.tell(new InitiateCaptureSnapshot(), leaderActor);
- String commitMsg = expectFirstMatching(leaderCollectorActor, String.class);
+ Object commitMsg = expectFirstMatching(leaderCollectorActor, COMMIT_MESSAGE_CLASS);
leaderActor.tell(new AddServer(NEW_SERVER_ID, newFollowerRaftActor.path().toString(), true), testKit.getRef());
((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setElectionTimeoutFactor(1);
// Drop commit message so the snapshot doesn't complete.
- leaderRaftActor.setDropMessageOfType(String.class);
+ leaderRaftActor.setDropMessageOfType(COMMIT_MESSAGE_CLASS);
leaderActor.tell(new InitiateCaptureSnapshot(), leaderActor);
TestActorRef<MessageCollectorActor> leaderCollectorActor = newLeaderCollectorActor(leaderRaftActor);
// Drop the commit message so the snapshot doesn't complete yet.
- leaderRaftActor.setDropMessageOfType(String.class);
+ leaderRaftActor.setDropMessageOfType(COMMIT_MESSAGE_CLASS);
leaderActor.tell(new InitiateCaptureSnapshot(), leaderActor);
leaderActor.tell(new AddServer(NEW_SERVER_ID, newFollowerRaftActor.path().toString(), true), testKit.getRef());
- String commitMsg = expectFirstMatching(leaderCollectorActor, String.class);
+ Object commitMsg = expectFirstMatching(leaderCollectorActor, COMMIT_MESSAGE_CLASS);
// Change the leader behavior to follower
leaderActor.tell(new Follower(leaderActorContext), leaderActor);
* </p>
*/
public class Shard extends RaftActor {
-
- protected static final Object TX_COMMIT_TIMEOUT_CHECK_MESSAGE = "txCommitTimeoutCheck";
+ @VisibleForTesting
+ static final Object TX_COMMIT_TIMEOUT_CHECK_MESSAGE = new Object() {
+ @Override
+ public String toString() {
+ return "txCommitTimeoutCheck";
+ }
+ };
@VisibleForTesting
- static final Object GET_SHARD_MBEAN_MESSAGE = "getShardMBeanMessage";
+ static final Object GET_SHARD_MBEAN_MESSAGE = new Object() {
+ @Override
+ public String toString() {
+ return "getShardMBeanMessage";
+ }
+ };
// FIXME: shard names should be encapsulated in their own class and this should be exposed as a constant.
public static final String DEFAULT_NAME = "default";
* @author Thomas Pantelis
*/
class EntityOwnershipShardCommitCoordinator {
- private static final Object COMMIT_RETRY_MESSAGE = "entityCommitRetry";
+ private static final Object COMMIT_RETRY_MESSAGE = new Object() {
+ @Override
+ public String toString() {
+ return "entityCommitRetry";
+ }
+ };
private final Logger log;
private int transactionIDCounter = 0;
} else if(message instanceof akka.actor.Status.Failure) {
// Failure reply from a local commit.
inflightCommitFailure(((Failure)message).cause(), shard);
- } else if(message.equals(COMMIT_RETRY_MESSAGE)) {
+ } else if(COMMIT_RETRY_MESSAGE.equals(message)) {
retryInflightCommit(shard);
} else {
handled = false;
@Override
public void handleCommand(final Object message) {
super.handleCommand(message);
- if(message.equals(TX_COMMIT_TIMEOUT_CHECK_MESSAGE)) {
+ if(TX_COMMIT_TIMEOUT_CHECK_MESSAGE.equals(message)) {
if(cleaupCheckLatch.get() != null) {
cleaupCheckLatch.get().countDown();
}
public void handleCommand(final Object message) {
super.handleCommand(message);
- if (message instanceof SaveSnapshotSuccess || message.equals("commit_snapshot")) {
+ // XXX: commit_snapshot equality check references RaftActorSnapshotMessageSupport.COMMIT_SNAPSHOT
+ if (message instanceof SaveSnapshotSuccess || "commit_snapshot".equals(message.toString())) {
latch.get().countDown();
}
}