import org.opendaylight.controller.cluster.raft.messages.ServerRemoved;
import org.opendaylight.controller.cluster.raft.protobuff.client.messages.CompositeModificationByteStringPayload;
import org.opendaylight.controller.cluster.raft.protobuff.client.messages.CompositeModificationPayload;
-import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree;
import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidate;
import org.opendaylight.yangtools.yang.data.api.schema.tree.DataValidationFailedException;
import org.opendaylight.yangtools.yang.data.api.schema.tree.ModificationType;
* </p>
*/
public class Shard extends RaftActor {
-
- protected static final Object TX_COMMIT_TIMEOUT_CHECK_MESSAGE = "txCommitTimeoutCheck";
+ @VisibleForTesting
+ static final Object TX_COMMIT_TIMEOUT_CHECK_MESSAGE = new Object() {
+ @Override
+ public String toString() {
+ return "txCommitTimeoutCheck";
+ }
+ };
@VisibleForTesting
- static final Object GET_SHARD_MBEAN_MESSAGE = "getShardMBeanMessage";
+ static final Object GET_SHARD_MBEAN_MESSAGE = new Object() {
+ @Override
+ public String toString() {
+ return "getShardMBeanMessage";
+ }
+ };
// FIXME: shard names should be encapsulated in their own class and this should be exposed as a constant.
public static final String DEFAULT_NAME = "default";
}
@Override
- protected void handleCommand(final Object message) {
-
- final MessageTracker.Context context = appendEntriesReplyTracker.received(message);
- final Optional<Error> maybeError = context.error();
- if (maybeError.isPresent()) {
- LOG.trace("{} : AppendEntriesReply failed to arrive at the expected interval {}", persistenceId(),
- maybeError.get());
- }
+ protected void handleNonRaftCommand(final Object message) {
+ try (final MessageTracker.Context context = appendEntriesReplyTracker.received(message)) {
+ final Optional<Error> maybeError = context.error();
+ if (maybeError.isPresent()) {
+ LOG.trace("{} : AppendEntriesReply failed to arrive at the expected interval {}", persistenceId(),
+ maybeError.get());
+ }
- try {
if (CreateTransaction.isSerializedType(message)) {
handleCreateTransaction(message);
} else if (message instanceof BatchedModifications) {
PeerAddressResolved resolved = (PeerAddressResolved) message;
setPeerAddress(resolved.getPeerId().toString(),
resolved.getPeerAddress());
- } else if (message.equals(TX_COMMIT_TIMEOUT_CHECK_MESSAGE)) {
+ } else if (TX_COMMIT_TIMEOUT_CHECK_MESSAGE.equals(message)) {
commitCoordinator.checkForExpiredTransactions(transactionCommitTimeout, this);
} else if(message instanceof DatastoreContext) {
onDatastoreContext((DatastoreContext)message);
} else if(ShardTransactionMessageRetrySupport.TIMER_MESSAGE_CLASS.isInstance(message)) {
messageRetrySupport.onTimerMessage(message);
} else {
- super.handleCommand(message);
+ super.handleNonRaftCommand(message);
}
- } finally {
- context.done();
}
}
@Override
protected LeaderStateChanged newLeaderStateChanged(String memberId, String leaderId, short leaderPayloadVersion) {
- return new ShardLeaderStateChanged(memberId, leaderId,
- isLeader() ? Optional.<DataTree>of(store.getDataTree()) : Optional.<DataTree>absent(),
- leaderPayloadVersion);
+ return isLeader() ? new ShardLeaderStateChanged(memberId, leaderId, store.getDataTree(), leaderPayloadVersion)
+ : new ShardLeaderStateChanged(memberId, leaderId, leaderPayloadVersion);
}
protected void onDatastoreContext(DatastoreContext context) {