X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-distributed-datastore%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fdatastore%2FShard.java;h=9cb015cfaf35796bdf98692d8d98b9b67dd86a13;hp=138b71c10e111969c59777eedffce7c384f5afdd;hb=f276ae33b951d173b51c467bb7bb1a5f5cf9a1e6;hpb=26721a6f2bc3ce4d524ccb562d7e7a38b4b76068
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java
index 138b71c10e..9cb015cfaf 100644
--- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java
+++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java
@@ -20,9 +20,12 @@ import java.io.IOException;
import java.util.Collection;
import java.util.Collections;
import java.util.Map;
+import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import javax.annotation.Nonnull;
import org.opendaylight.controller.cluster.common.actor.CommonConfig;
+import org.opendaylight.controller.cluster.common.actor.MessageTracker;
+import org.opendaylight.controller.cluster.common.actor.MessageTracker.Error;
import org.opendaylight.controller.cluster.common.actor.MeteringBehavior;
import org.opendaylight.controller.cluster.datastore.ShardCommitCoordinator.CohortEntry;
import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException;
@@ -52,7 +55,6 @@ import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContex
import org.opendaylight.controller.cluster.datastore.modification.Modification;
import org.opendaylight.controller.cluster.datastore.modification.MutableCompositeModification;
import org.opendaylight.controller.cluster.datastore.utils.Dispatchers;
-import org.opendaylight.controller.cluster.datastore.utils.MessageTracker;
import org.opendaylight.controller.cluster.notifications.LeaderStateChanged;
import org.opendaylight.controller.cluster.notifications.RegisterRoleChangeListener;
import org.opendaylight.controller.cluster.notifications.RoleChangeNotifier;
@@ -65,7 +67,6 @@ import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
import org.opendaylight.controller.cluster.raft.messages.ServerRemoved;
import org.opendaylight.controller.cluster.raft.protobuff.client.messages.CompositeModificationByteStringPayload;
import org.opendaylight.controller.cluster.raft.protobuff.client.messages.CompositeModificationPayload;
-import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree;
import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidate;
import org.opendaylight.yangtools.yang.data.api.schema.tree.DataValidationFailedException;
import org.opendaylight.yangtools.yang.data.api.schema.tree.ModificationType;
@@ -81,14 +82,24 @@ import scala.concurrent.duration.FiniteDuration;
*
*/
public class Shard extends RaftActor {
-
- protected static final Object TX_COMMIT_TIMEOUT_CHECK_MESSAGE = "txCommitTimeoutCheck";
-
@VisibleForTesting
- static final Object GET_SHARD_MBEAN_MESSAGE = "getShardMBeanMessage";
+ static final Object TX_COMMIT_TIMEOUT_CHECK_MESSAGE = new Object() {
+ @Override
+ public String toString() {
+ return "txCommitTimeoutCheck";
+ }
+ };
@VisibleForTesting
- static final String DEFAULT_NAME = "default";
+ static final Object GET_SHARD_MBEAN_MESSAGE = new Object() {
+ @Override
+ public String toString() {
+ return "getShardMBeanMessage";
+ }
+ };
+
+ // FIXME: shard names should be encapsulated in their own class and this should be exposed as a constant.
+ public static final String DEFAULT_NAME = "default";
// The state of this Shard
private final ShardDataTree store;
@@ -196,30 +207,28 @@ public class Shard extends RaftActor {
}
@Override
- public void onReceiveRecover(final Object message) throws Exception {
+ protected void handleRecover(final Object message) {
LOG.debug("{}: onReceiveRecover: Received message {} from {}", persistenceId(), message.getClass(),
getSender());
- super.onReceiveRecover(message);
+ super.handleRecover(message);
if (LOG.isTraceEnabled()) {
appendEntriesReplyTracker.begin();
}
}
@Override
- public void onReceiveCommand(final Object message) throws Exception {
-
- MessageTracker.Context context = appendEntriesReplyTracker.received(message);
-
- if(context.error().isPresent()){
- LOG.trace("{} : AppendEntriesReply failed to arrive at the expected interval {}", persistenceId(),
- context.error());
- }
+ protected void handleNonRaftCommand(final Object message) {
+ try (final MessageTracker.Context context = appendEntriesReplyTracker.received(message)) {
+ final Optional maybeError = context.error();
+ if (maybeError.isPresent()) {
+ LOG.trace("{} : AppendEntriesReply failed to arrive at the expected interval {}", persistenceId(),
+ maybeError.get());
+ }
- try {
if (CreateTransaction.isSerializedType(message)) {
handleCreateTransaction(message);
- } else if (BatchedModifications.class.isInstance(message)) {
+ } else if (message instanceof BatchedModifications) {
handleBatchedModifications((BatchedModifications)message);
} else if (message instanceof ForwardedReadyTransaction) {
handleForwardedReadyTransaction((ForwardedReadyTransaction) message);
@@ -243,7 +252,7 @@ public class Shard extends RaftActor {
PeerAddressResolved resolved = (PeerAddressResolved) message;
setPeerAddress(resolved.getPeerId().toString(),
resolved.getPeerAddress());
- } else if (message.equals(TX_COMMIT_TIMEOUT_CHECK_MESSAGE)) {
+ } else if (TX_COMMIT_TIMEOUT_CHECK_MESSAGE.equals(message)) {
commitCoordinator.checkForExpiredTransactions(transactionCommitTimeout, this);
} else if(message instanceof DatastoreContext) {
onDatastoreContext((DatastoreContext)message);
@@ -261,10 +270,8 @@ public class Shard extends RaftActor {
} else if(ShardTransactionMessageRetrySupport.TIMER_MESSAGE_CLASS.isInstance(message)) {
messageRetrySupport.onTimerMessage(message);
} else {
- super.onReceiveCommand(message);
+ super.handleNonRaftCommand(message);
}
- } finally {
- context.done();
}
}
@@ -287,9 +294,8 @@ public class Shard extends RaftActor {
@Override
protected LeaderStateChanged newLeaderStateChanged(String memberId, String leaderId, short leaderPayloadVersion) {
- return new ShardLeaderStateChanged(memberId, leaderId,
- isLeader() ? Optional.of(store.getDataTree()) : Optional.absent(),
- leaderPayloadVersion);
+ return isLeader() ? new ShardLeaderStateChanged(memberId, leaderId, store.getDataTree(), leaderPayloadVersion)
+ : new ShardLeaderStateChanged(memberId, leaderId, leaderPayloadVersion);
}
protected void onDatastoreContext(DatastoreContext context) {
@@ -327,8 +333,19 @@ public class Shard extends RaftActor {
}
private void handleCommitTransaction(final CommitTransaction commit) {
- if(!commitCoordinator.handleCommit(commit.getTransactionID(), getSender(), this)) {
- shardMBean.incrementFailedTransactionsCount();
+ if (isLeader()) {
+ if(!commitCoordinator.handleCommit(commit.getTransactionID(), getSender(), this)) {
+ shardMBean.incrementFailedTransactionsCount();
+ }
+ } else {
+ ActorSelection leader = getLeader();
+ if (leader == null) {
+ messageRetrySupport.addMessageToRetry(commit, getSender(),
+ "Could not commit transaction " + commit.getTransactionID());
+ } else {
+ LOG.debug("{}: Forwarding CommitTransaction to leader {}", persistenceId(), leader);
+ leader.forward(commit, getContext());
+ }
}
}
@@ -336,7 +353,27 @@ public class Shard extends RaftActor {
LOG.debug("{}: Finishing commit for transaction {}", persistenceId(), cohortEntry.getTransactionID());
try {
- cohortEntry.commit();
+ try {
+ cohortEntry.commit();
+ } catch(ExecutionException e) {
+ // We may get a "store tree and candidate base differ" IllegalStateException from commit under
+ // certain edge case scenarios so we'll try to re-apply the candidate from scratch as a last
+ // resort. Eg, we're a follower and a tx payload is replicated but the leader goes down before
+ // applying it to the state. We then become the leader and a second tx is pre-committed and
+ // replicated. When consensus occurs, this will cause the first tx to be applied as a foreign
+ // candidate via applyState prior to the second tx. Since the second tx has already been
+ // pre-committed, when it gets here to commit it will get an IllegalStateException.
+
+ // FIXME - this is not an ideal way to handle this scenario. This is temporary - a cleaner
+ // solution will be forthcoming.
+ if(e.getCause() instanceof IllegalStateException) {
+ LOG.debug("{}: commit failed for transaction {} - retrying as foreign candidate", persistenceId(),
+ transactionID, e);
+ store.applyForeignCandidate(transactionID, cohortEntry.getCandidate());
+ } else {
+ throw e;
+ }
+ }
sender.tell(CommitTransactionReply.instance(cohortEntry.getClientVersion()).toSerializable(), getSelf());
@@ -393,7 +430,19 @@ public class Shard extends RaftActor {
private void handleCanCommitTransaction(final CanCommitTransaction canCommit) {
LOG.debug("{}: Can committing transaction {}", persistenceId(), canCommit.getTransactionID());
- commitCoordinator.handleCanCommit(canCommit.getTransactionID(), getSender(), this);
+
+ if (isLeader()) {
+ commitCoordinator.handleCanCommit(canCommit.getTransactionID(), getSender(), this);
+ } else {
+ ActorSelection leader = getLeader();
+ if (leader == null) {
+ messageRetrySupport.addMessageToRetry(canCommit, getSender(),
+ "Could not canCommit transaction " + canCommit.getTransactionID());
+ } else {
+ LOG.debug("{}: Forwarding CanCommitTransaction to leader {}", persistenceId(), leader);
+ leader.forward(canCommit, getContext());
+ }
+ }
}
protected void handleBatchedModificationsLocal(BatchedModifications batched, ActorRef sender) {
@@ -689,9 +738,6 @@ public class Shard extends RaftActor {
}
store.closeAllTransactionChains();
-
- commitCoordinator.abortPendingTransactions(
- "The transacton was aborted due to inflight leadership change.", this);
}
if(hasLeader && !isIsolatedLeader()) {
@@ -703,7 +749,31 @@ public class Shard extends RaftActor {
protected void onLeaderChanged(String oldLeader, String newLeader) {
shardMBean.incrementLeadershipChangeCount();
- if(hasLeader() && !isIsolatedLeader()) {
+ boolean hasLeader = hasLeader();
+ if(hasLeader && !isLeader()) {
+ // Another leader was elected. If we were the previous leader and had pending transactions, convert
+ // them to transaction messages and send to the new leader.
+ ActorSelection leader = getLeader();
+ if(leader != null) {
+ Collection