Using a String for the message identity has the effect of the equality
running on Strings, which has the effect that message is not really
isolated, but can be sent by anyone who knows the message content.
Instantiate simple Objects instead, which force a proper identity
equality check. Retain debuggability by having these instances override
toString().
Change-Id: Ia581e0a1e023ace10b5dbb81a44092ca04a4ff8f
Signed-off-by: Robert Varga <rovarga@cisco.com>
* @author Thomas Pantelis
*/
class RaftActorSnapshotMessageSupport {
* @author Thomas Pantelis
*/
class RaftActorSnapshotMessageSupport {
- static final String COMMIT_SNAPSHOT = "commit_snapshot";
+ static final Object COMMIT_SNAPSHOT = new Object() {
+ @Override
+ public String toString() {
+ return "commit_snapshot";
+ }
+ };
private final RaftActorContext context;
private final RaftActorSnapshotCohort cohort;
private final RaftActorContext context;
private final RaftActorSnapshotCohort cohort;
onSaveSnapshotFailure((SaveSnapshotFailure) message);
} else if (message instanceof CaptureSnapshotReply) {
onCaptureSnapshotReply(((CaptureSnapshotReply) message).getSnapshot());
onSaveSnapshotFailure((SaveSnapshotFailure) message);
} else if (message instanceof CaptureSnapshotReply) {
onCaptureSnapshotReply(((CaptureSnapshotReply) message).getSnapshot());
- } else if (message.equals(COMMIT_SNAPSHOT)) {
+ } else if (COMMIT_SNAPSHOT.equals(message)) {
context.getSnapshotManager().commit(-1);
} else if (message instanceof GetSnapshot) {
onGetSnapshot(sender);
context.getSnapshotManager().commit(-1);
} else if (message instanceof GetSnapshot) {
onGetSnapshot(sender);
static final String NEW_SERVER_ID = "new-server";
static final String NEW_SERVER_ID2 = "new-server2";
private static final Logger LOG = LoggerFactory.getLogger(RaftActorServerConfigurationSupportTest.class);
static final String NEW_SERVER_ID = "new-server";
static final String NEW_SERVER_ID2 = "new-server2";
private static final Logger LOG = LoggerFactory.getLogger(RaftActorServerConfigurationSupportTest.class);
+ private static final Class<?> COMMIT_MESSAGE_CLASS = RaftActorSnapshotMessageSupport.COMMIT_SNAPSHOT.getClass();
private static final boolean NO_PERSISTENCE = false;
private static final boolean PERSISTENT = true;
private static final boolean NO_PERSISTENCE = false;
private static final boolean PERSISTENT = true;
leaderActor.tell(new InitiateCaptureSnapshot(), leaderActor);
leaderActor.tell(new InitiateCaptureSnapshot(), leaderActor);
- String commitMsg = expectFirstMatching(leaderCollectorActor, String.class);
+ Object commitMsg = expectFirstMatching(leaderCollectorActor, COMMIT_MESSAGE_CLASS);
leaderActor.tell(new AddServer(NEW_SERVER_ID, newFollowerRaftActor.path().toString(), true), testKit.getRef());
leaderActor.tell(new AddServer(NEW_SERVER_ID, newFollowerRaftActor.path().toString(), true), testKit.getRef());
((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setElectionTimeoutFactor(1);
// Drop commit message so the snapshot doesn't complete.
((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setElectionTimeoutFactor(1);
// Drop commit message so the snapshot doesn't complete.
- leaderRaftActor.setDropMessageOfType(String.class);
+ leaderRaftActor.setDropMessageOfType(COMMIT_MESSAGE_CLASS);
leaderActor.tell(new InitiateCaptureSnapshot(), leaderActor);
leaderActor.tell(new InitiateCaptureSnapshot(), leaderActor);
TestActorRef<MessageCollectorActor> leaderCollectorActor = newLeaderCollectorActor(leaderRaftActor);
// Drop the commit message so the snapshot doesn't complete yet.
TestActorRef<MessageCollectorActor> leaderCollectorActor = newLeaderCollectorActor(leaderRaftActor);
// Drop the commit message so the snapshot doesn't complete yet.
- leaderRaftActor.setDropMessageOfType(String.class);
+ leaderRaftActor.setDropMessageOfType(COMMIT_MESSAGE_CLASS);
leaderActor.tell(new InitiateCaptureSnapshot(), leaderActor);
leaderActor.tell(new AddServer(NEW_SERVER_ID, newFollowerRaftActor.path().toString(), true), testKit.getRef());
leaderActor.tell(new InitiateCaptureSnapshot(), leaderActor);
leaderActor.tell(new AddServer(NEW_SERVER_ID, newFollowerRaftActor.path().toString(), true), testKit.getRef());
- String commitMsg = expectFirstMatching(leaderCollectorActor, String.class);
+ Object commitMsg = expectFirstMatching(leaderCollectorActor, COMMIT_MESSAGE_CLASS);
// Change the leader behavior to follower
leaderActor.tell(new Follower(leaderActorContext), leaderActor);
// Change the leader behavior to follower
leaderActor.tell(new Follower(leaderActorContext), leaderActor);
* </p>
*/
public class Shard extends RaftActor {
* </p>
*/
public class Shard extends RaftActor {
-
- protected static final Object TX_COMMIT_TIMEOUT_CHECK_MESSAGE = "txCommitTimeoutCheck";
+ @VisibleForTesting
+ static final Object TX_COMMIT_TIMEOUT_CHECK_MESSAGE = new Object() {
+ @Override
+ public String toString() {
+ return "txCommitTimeoutCheck";
+ }
+ };
- static final Object GET_SHARD_MBEAN_MESSAGE = "getShardMBeanMessage";
+ static final Object GET_SHARD_MBEAN_MESSAGE = new Object() {
+ @Override
+ public String toString() {
+ return "getShardMBeanMessage";
+ }
+ };
// FIXME: shard names should be encapsulated in their own class and this should be exposed as a constant.
public static final String DEFAULT_NAME = "default";
// FIXME: shard names should be encapsulated in their own class and this should be exposed as a constant.
public static final String DEFAULT_NAME = "default";
* @author Thomas Pantelis
*/
class EntityOwnershipShardCommitCoordinator {
* @author Thomas Pantelis
*/
class EntityOwnershipShardCommitCoordinator {
- private static final Object COMMIT_RETRY_MESSAGE = "entityCommitRetry";
+ private static final Object COMMIT_RETRY_MESSAGE = new Object() {
+ @Override
+ public String toString() {
+ return "entityCommitRetry";
+ }
+ };
private final Logger log;
private int transactionIDCounter = 0;
private final Logger log;
private int transactionIDCounter = 0;
} else if(message instanceof akka.actor.Status.Failure) {
// Failure reply from a local commit.
inflightCommitFailure(((Failure)message).cause(), shard);
} else if(message instanceof akka.actor.Status.Failure) {
// Failure reply from a local commit.
inflightCommitFailure(((Failure)message).cause(), shard);
- } else if(message.equals(COMMIT_RETRY_MESSAGE)) {
+ } else if(COMMIT_RETRY_MESSAGE.equals(message)) {
retryInflightCommit(shard);
} else {
handled = false;
retryInflightCommit(shard);
} else {
handled = false;
@Override
public void handleCommand(final Object message) {
super.handleCommand(message);
@Override
public void handleCommand(final Object message) {
super.handleCommand(message);
- if(message.equals(TX_COMMIT_TIMEOUT_CHECK_MESSAGE)) {
+ if(TX_COMMIT_TIMEOUT_CHECK_MESSAGE.equals(message)) {
if(cleaupCheckLatch.get() != null) {
cleaupCheckLatch.get().countDown();
}
if(cleaupCheckLatch.get() != null) {
cleaupCheckLatch.get().countDown();
}
public void handleCommand(final Object message) {
super.handleCommand(message);
public void handleCommand(final Object message) {
super.handleCommand(message);
- if (message instanceof SaveSnapshotSuccess || message.equals("commit_snapshot")) {
+ // XXX: commit_snapshot equality check references RaftActorSnapshotMessageSupport.COMMIT_SNAPSHOT
+ if (message instanceof SaveSnapshotSuccess || "commit_snapshot".equals(message.toString())) {
latch.get().countDown();
}
}
latch.get().countDown();
}
}