BUG-5626: make CloseTransactionChain implement Identifiable
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / datastore / Shard.java
index 57e85570a903fb7923dc80b1478b693071d8b848..b6b7ae12c8d9eaac33f4e5bdadaf5dc483a13f39 100644 (file)
@@ -16,17 +16,21 @@ import akka.serialization.Serialization;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Optional;
 import com.google.common.base.Preconditions;
+import com.google.common.base.Throwables;
 import java.io.IOException;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.Map;
+import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
 import javax.annotation.Nonnull;
+import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
 import org.opendaylight.controller.cluster.common.actor.CommonConfig;
+import org.opendaylight.controller.cluster.common.actor.MessageTracker;
+import org.opendaylight.controller.cluster.common.actor.MessageTracker.Error;
 import org.opendaylight.controller.cluster.common.actor.MeteringBehavior;
-import org.opendaylight.controller.cluster.datastore.ShardCommitCoordinator.CohortEntry;
 import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException;
 import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
-import org.opendaylight.controller.cluster.datastore.identifiers.ShardTransactionIdentifier;
 import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shard.ShardMBeanFactory;
 import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shard.ShardStats;
 import org.opendaylight.controller.cluster.datastore.messages.AbortTransaction;
@@ -48,10 +52,9 @@ import org.opendaylight.controller.cluster.datastore.messages.RegisterChangeList
 import org.opendaylight.controller.cluster.datastore.messages.RegisterDataTreeChangeListener;
 import org.opendaylight.controller.cluster.datastore.messages.ShardLeaderStateChanged;
 import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext;
-import org.opendaylight.controller.cluster.datastore.modification.Modification;
-import org.opendaylight.controller.cluster.datastore.modification.MutableCompositeModification;
+import org.opendaylight.controller.cluster.datastore.persisted.CommitTransactionPayload;
+import org.opendaylight.controller.cluster.datastore.persisted.DataTreeCandidateSupplier;
 import org.opendaylight.controller.cluster.datastore.utils.Dispatchers;
-import org.opendaylight.controller.cluster.datastore.utils.MessageTracker;
 import org.opendaylight.controller.cluster.notifications.LeaderStateChanged;
 import org.opendaylight.controller.cluster.notifications.RegisterRoleChangeListener;
 import org.opendaylight.controller.cluster.notifications.RoleChangeNotifier;
@@ -62,9 +65,8 @@ import org.opendaylight.controller.cluster.raft.RaftState;
 import org.opendaylight.controller.cluster.raft.base.messages.FollowerInitialSyncUpStatus;
 import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
 import org.opendaylight.controller.cluster.raft.messages.ServerRemoved;
-import org.opendaylight.controller.cluster.raft.protobuff.client.messages.CompositeModificationByteStringPayload;
-import org.opendaylight.controller.cluster.raft.protobuff.client.messages.CompositeModificationPayload;
-import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree;
+import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
+import org.opendaylight.yangtools.concepts.Identifier;
 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidate;
 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataValidationFailedException;
 import org.opendaylight.yangtools.yang.data.api.schema.tree.ModificationType;
@@ -81,13 +83,24 @@ import scala.concurrent.duration.FiniteDuration;
  */
 public class Shard extends RaftActor {
 
-    protected static final Object TX_COMMIT_TIMEOUT_CHECK_MESSAGE = "txCommitTimeoutCheck";
-
     @VisibleForTesting
-    static final Object GET_SHARD_MBEAN_MESSAGE = "getShardMBeanMessage";
+    static final Object TX_COMMIT_TIMEOUT_CHECK_MESSAGE = new Object() {
+        @Override
+        public String toString() {
+            return "txCommitTimeoutCheck";
+        }
+    };
 
     @VisibleForTesting
-    static final String DEFAULT_NAME = "default";
+    static final Object GET_SHARD_MBEAN_MESSAGE = new Object() {
+        @Override
+        public String toString() {
+            return "getShardMBeanMessage";
+        }
+    };
+
+    // FIXME: shard names should be encapsulated in their own class and this should be exposed as a constant.
+    public static final String DEFAULT_NAME = "default";
 
     // The state of this Shard
     private final ShardDataTree store;
@@ -161,7 +174,8 @@ public class Shard extends RaftActor {
                 new Dispatchers(context().system().dispatchers()).getDispatcherPath(
                         Dispatchers.DispatcherType.Transaction), self(), getContext(), shardMBean);
 
-        snapshotCohort = new ShardSnapshotCohort(transactionActorFactory, store, LOG, this.name);
+        snapshotCohort = new ShardSnapshotCohort(builder.getId().getMemberName(), transactionActorFactory, store,
+            LOG, this.name);
 
         messageRetrySupport = new ShardTransactionMessageRetrySupport(this);
     }
@@ -195,30 +209,28 @@ public class Shard extends RaftActor {
     }
 
     @Override
-    public void onReceiveRecover(final Object message) throws Exception {
+    protected void handleRecover(final Object message) {
         LOG.debug("{}: onReceiveRecover: Received message {} from {}", persistenceId(), message.getClass(),
             getSender());
 
-        super.onReceiveRecover(message);
+        super.handleRecover(message);
         if (LOG.isTraceEnabled()) {
             appendEntriesReplyTracker.begin();
         }
     }
 
     @Override
-    public void onReceiveCommand(final Object message) throws Exception {
-
-        MessageTracker.Context context = appendEntriesReplyTracker.received(message);
-
-        if(context.error().isPresent()){
-            LOG.trace("{} : AppendEntriesReply failed to arrive at the expected interval {}", persistenceId(),
-                context.error());
-        }
+    protected void handleNonRaftCommand(final Object message) {
+        try (final MessageTracker.Context context = appendEntriesReplyTracker.received(message)) {
+            final Optional<Error> maybeError = context.error();
+            if (maybeError.isPresent()) {
+                LOG.trace("{} : AppendEntriesReply failed to arrive at the expected interval {}", persistenceId(),
+                    maybeError.get());
+            }
 
-        try {
             if (CreateTransaction.isSerializedType(message)) {
                 handleCreateTransaction(message);
-            } else if (BatchedModifications.class.isInstance(message)) {
+            } else if (message instanceof BatchedModifications) {
                 handleBatchedModifications((BatchedModifications)message);
             } else if (message instanceof ForwardedReadyTransaction) {
                 handleForwardedReadyTransaction((ForwardedReadyTransaction) message);
@@ -242,7 +254,7 @@ public class Shard extends RaftActor {
                 PeerAddressResolved resolved = (PeerAddressResolved) message;
                 setPeerAddress(resolved.getPeerId().toString(),
                         resolved.getPeerAddress());
-            } else if (message.equals(TX_COMMIT_TIMEOUT_CHECK_MESSAGE)) {
+            } else if (TX_COMMIT_TIMEOUT_CHECK_MESSAGE.equals(message)) {
                 commitCoordinator.checkForExpiredTransactions(transactionCommitTimeout, this);
             } else if(message instanceof DatastoreContext) {
                 onDatastoreContext((DatastoreContext)message);
@@ -259,11 +271,12 @@ public class Shard extends RaftActor {
                 context().parent().forward(message, context());
             } else if(ShardTransactionMessageRetrySupport.TIMER_MESSAGE_CLASS.isInstance(message)) {
                 messageRetrySupport.onTimerMessage(message);
+            } else if (message instanceof DataTreeCohortActorRegistry.CohortRegistryCommand) {
+                commitCoordinator.processCohortRegistryCommand(getSender(),
+                        (DataTreeCohortActorRegistry.CohortRegistryCommand) message);
             } else {
-                super.onReceiveCommand(message);
+                super.handleNonRaftCommand(message);
             }
-        } finally {
-            context.done();
         }
     }
 
@@ -286,9 +299,8 @@ public class Shard extends RaftActor {
 
     @Override
     protected LeaderStateChanged newLeaderStateChanged(String memberId, String leaderId, short leaderPayloadVersion) {
-        return new ShardLeaderStateChanged(memberId, leaderId,
-                isLeader() ? Optional.<DataTree>of(store.getDataTree()) : Optional.<DataTree>absent(),
-                leaderPayloadVersion);
+        return isLeader() ? new ShardLeaderStateChanged(memberId, leaderId, store.getDataTree(), leaderPayloadVersion)
+                : new ShardLeaderStateChanged(memberId, leaderId, leaderPayloadVersion);
     }
 
     protected void onDatastoreContext(DatastoreContext context) {
@@ -313,29 +325,72 @@ public class Shard extends RaftActor {
 
     void continueCommit(final CohortEntry cohortEntry) {
         final DataTreeCandidate candidate = cohortEntry.getCandidate();
+        final TransactionIdentifier transactionId = cohortEntry.getTransactionID();
 
         // If we do not have any followers and we are not using persistence
         // or if cohortEntry has no modifications
         // we can apply modification to the state immediately
         if ((!hasFollowers() && !persistence().isRecoveryApplicable()) || isEmptyCommit(candidate)) {
-            applyModificationToState(cohortEntry.getReplySender(), cohortEntry.getTransactionID(), candidate);
-        } else {
-            Shard.this.persistData(cohortEntry.getReplySender(), cohortEntry.getTransactionID(),
-                    DataTreeCandidatePayload.create(candidate));
+            applyModificationToState(cohortEntry.getReplySender(), transactionId, candidate);
+            return;
         }
+
+        final Payload payload;
+        try {
+            payload = CommitTransactionPayload.create(transactionId, candidate);
+        } catch (IOException e) {
+            LOG.error("{}: failed to encode transaction {} candidate {}", persistenceId(), transactionId, candidate,
+                e);
+            // TODO: do we need to do something smarter here?
+            throw Throwables.propagate(e);
+        }
+
+        persistData(cohortEntry.getReplySender(), cohortEntry.getTransactionID(), payload);
     }
 
     private void handleCommitTransaction(final CommitTransaction commit) {
-        if(!commitCoordinator.handleCommit(commit.getTransactionID(), getSender(), this)) {
-            shardMBean.incrementFailedTransactionsCount();
+        if (isLeader()) {
+            if(!commitCoordinator.handleCommit(commit.getTransactionID(), getSender(), this)) {
+                shardMBean.incrementFailedTransactionsCount();
+            }
+        } else {
+            ActorSelection leader = getLeader();
+            if (leader == null) {
+                messageRetrySupport.addMessageToRetry(commit, getSender(),
+                        "Could not commit transaction " + commit.getTransactionID());
+            } else {
+                LOG.debug("{}: Forwarding CommitTransaction to leader {}", persistenceId(), leader);
+                leader.forward(commit, getContext());
+            }
         }
     }
 
-    private void finishCommit(@Nonnull final ActorRef sender, @Nonnull final String transactionID, @Nonnull final CohortEntry cohortEntry) {
+    private void finishCommit(@Nonnull final ActorRef sender, @Nonnull final Identifier transactionID,
+            @Nonnull final CohortEntry cohortEntry) {
         LOG.debug("{}: Finishing commit for transaction {}", persistenceId(), cohortEntry.getTransactionID());
 
         try {
+            try {
             cohortEntry.commit();
+            } catch(ExecutionException e) {
+                // We may get a "store tree and candidate base differ" IllegalStateException from commit under
+                // certain edge case scenarios so we'll try to re-apply the candidate from scratch as a last
+                // resort. Eg, we're a follower and a tx payload is replicated but the leader goes down before
+                // applying it to the state. We then become the leader and a second tx is pre-committed and
+                // replicated. When consensus occurs, this will cause the first tx to be applied as a foreign
+                // candidate via applyState prior to the second tx. Since the second tx has already been
+                // pre-committed, when it gets here to commit it will get an IllegalStateException.
+
+                // FIXME - this is not an ideal way to handle this scenario. This is temporary - a cleaner
+                // solution will be forthcoming.
+                if(e.getCause() instanceof IllegalStateException) {
+                    LOG.debug("{}: commit failed for transaction {} - retrying as foreign candidate", persistenceId(),
+                            transactionID, e);
+                    store.applyForeignCandidate(transactionID, cohortEntry.getCandidate());
+                } else {
+                    throw e;
+                }
+            }
 
             sender.tell(CommitTransactionReply.instance(cohortEntry.getClientVersion()).toSerializable(), getSelf());
 
@@ -353,7 +408,7 @@ public class Shard extends RaftActor {
         }
     }
 
-    private void finishCommit(@Nonnull final ActorRef sender, final @Nonnull String transactionID) {
+    private void finishCommit(@Nonnull final ActorRef sender, final @Nonnull Identifier transactionID) {
         // With persistence enabled, this method is called via applyState by the leader strategy
         // after the commit has been replicated to a majority of the followers.
 
@@ -392,12 +447,24 @@ public class Shard extends RaftActor {
 
     private void handleCanCommitTransaction(final CanCommitTransaction canCommit) {
         LOG.debug("{}: Can committing transaction {}", persistenceId(), canCommit.getTransactionID());
+
+        if (isLeader()) {
         commitCoordinator.handleCanCommit(canCommit.getTransactionID(), getSender(), this);
+        } else {
+            ActorSelection leader = getLeader();
+            if (leader == null) {
+                messageRetrySupport.addMessageToRetry(canCommit, getSender(),
+                        "Could not canCommit transaction " + canCommit.getTransactionID());
+            } else {
+                LOG.debug("{}: Forwarding CanCommitTransaction to leader {}", persistenceId(), leader);
+                leader.forward(canCommit, getContext());
+            }
+        }
     }
 
     protected void handleBatchedModificationsLocal(BatchedModifications batched, ActorRef sender) {
         try {
-            commitCoordinator.handleBatchedModifications(batched, sender, this);
+            commitCoordinator.handleBatchedModifications(batched, sender, this, store.getSchemaContext());
         } catch (Exception e) {
             LOG.error("{}: Error handling BatchedModifications for Tx {}", persistenceId(),
                     batched.getTransactionID(), e);
@@ -427,11 +494,19 @@ public class Shard extends RaftActor {
                 messageRetrySupport.addMessageToRetry(batched, getSender(),
                         "Could not commit transaction " + batched.getTransactionID());
             } else {
-                // TODO: what if this is not the first batch and leadership changed in between batched messages?
-                // We could check if the commitCoordinator already has a cached entry and forward all the previous
-                // batched modifications.
-                LOG.debug("{}: Forwarding BatchedModifications to leader {}", persistenceId(), leader);
-                leader.forward(batched, getContext());
+                // If this is not the first batch and leadership changed in between batched messages,
+                // we need to reconstruct previous BatchedModifications from the transaction
+                // DataTreeModification, honoring the max batched modification count, and forward all the
+                // previous BatchedModifications to the new leader.
+                Collection<BatchedModifications> newModifications = commitCoordinator.createForwardedBatchedModifications(
+                        batched, datastoreContext.getShardBatchedModificationCount());
+
+                LOG.debug("{}: Forwarding {} BatchedModifications to leader {}", persistenceId(),
+                        newModifications.size(), leader);
+
+                for(BatchedModifications bm: newModifications) {
+                    leader.forward(bm, getContext());
+                }
             }
         }
     }
@@ -458,7 +533,7 @@ public class Shard extends RaftActor {
         boolean isLeaderActive = isLeaderActive();
         if (isLeader() && isLeaderActive) {
             try {
-                commitCoordinator.handleReadyLocalTransaction(message, getSender(), this);
+                commitCoordinator.handleReadyLocalTransaction(message, getSender(), this, store.getSchemaContext());
             } catch (Exception e) {
                 LOG.error("{}: Error handling ReadyLocalTransaction for Tx {}", persistenceId(),
                         message.getTransactionID(), e);
@@ -482,7 +557,8 @@ public class Shard extends RaftActor {
 
         boolean isLeaderActive = isLeaderActive();
         if (isLeader() && isLeaderActive) {
-            commitCoordinator.handleForwardedReadyTransaction(forwardedReady, getSender(), this);
+            commitCoordinator.handleForwardedReadyTransaction(forwardedReady, getSender(), this,
+                    store.getSchemaContext());
         } else {
             ActorSelection leader = getLeader();
             if (!isLeaderActive || leader == null) {
@@ -503,7 +579,7 @@ public class Shard extends RaftActor {
         doAbortTransaction(abort.getTransactionID(), getSender());
     }
 
-    void doAbortTransaction(final String transactionID, final ActorRef sender) {
+    void doAbortTransaction(final TransactionIdentifier transactionID, final ActorRef sender) {
         commitCoordinator.handleAbort(transactionID, sender, this);
     }
 
@@ -519,14 +595,7 @@ public class Shard extends RaftActor {
     }
 
     private void closeTransactionChain(final CloseTransactionChain closeTransactionChain) {
-        store.closeTransactionChain(closeTransactionChain.getTransactionChainId());
-    }
-
-    private ActorRef createTypedTransactionActor(int transactionType,
-            ShardTransactionIdentifier transactionId, String transactionChainId) {
-
-        return transactionActorFactory.newShardTransaction(TransactionType.fromInt(transactionType),
-                transactionId, transactionChainId);
+        store.closeTransactionChain(closeTransactionChain.getIdentifier());
     }
 
     private void createTransaction(CreateTransaction createTransaction) {
@@ -537,7 +606,7 @@ public class Shard extends RaftActor {
             }
 
             ActorRef transactionActor = createTransaction(createTransaction.getTransactionType(),
-                createTransaction.getTransactionId(), createTransaction.getTransactionChainId());
+                createTransaction.getTransactionId());
 
             getSender().tell(new CreateTransactionReply(Serialization.serializedActorPath(transactionActor),
                     createTransaction.getTransactionId(), createTransaction.getVersion()).toSerializable(), getSelf());
@@ -546,24 +615,14 @@ public class Shard extends RaftActor {
         }
     }
 
-    private ActorRef createTransaction(int transactionType, String remoteTransactionId,
-            String transactionChainId) {
-
-
-        ShardTransactionIdentifier transactionId = new ShardTransactionIdentifier(remoteTransactionId);
-
-        if(LOG.isDebugEnabled()) {
-            LOG.debug("{}: Creating transaction : {} ", persistenceId(), transactionId);
-        }
-
-        ActorRef transactionActor = createTypedTransactionActor(transactionType, transactionId,
-                transactionChainId);
-
-        return transactionActor;
+    private ActorRef createTransaction(int transactionType, TransactionIdentifier transactionId) {
+        LOG.debug("{}: Creating transaction : {} ", persistenceId(), transactionId);
+        return transactionActorFactory.newShardTransaction(TransactionType.fromInt(transactionType),
+            transactionId);
     }
 
-    private void commitWithNewTransaction(final Modification modification) {
-        ReadWriteShardDataTreeTransaction tx = store.newReadWriteTransaction(modification.toString(), null);
+    private void commitWithNewTransaction(final BatchedModifications modification) {
+        ReadWriteShardDataTreeTransaction tx = store.newReadWriteTransaction(modification.getTransactionID());
         modification.apply(tx.getSnapshot());
         try {
             snapshotCohort.syncCommitTransaction(tx);
@@ -621,12 +680,12 @@ public class Shard extends RaftActor {
     }
 
     @Override
-    protected void applyState(final ActorRef clientActor, final String identifier, final Object data) {
-        if (data instanceof DataTreeCandidatePayload) {
+    protected void applyState(final ActorRef clientActor, final Identifier identifier, final Object data) {
+        if (data instanceof DataTreeCandidateSupplier) {
             if (clientActor == null) {
                 // No clientActor indicates a replica coming from the leader
                 try {
-                    store.applyForeignCandidate(identifier, ((DataTreeCandidatePayload)data).getCandidate());
+                    store.applyForeignCandidate(identifier, ((DataTreeCandidateSupplier)data).getCandidate().getValue());
                 } catch (DataValidationFailedException | IOException e) {
                     LOG.error("{}: Error applying replica {}", persistenceId(), identifier, e);
                 }
@@ -634,22 +693,13 @@ public class Shard extends RaftActor {
                 // Replication consensus reached, proceed to commit
                 finishCommit(clientActor, identifier);
             }
-        } else if (data instanceof CompositeModificationPayload) {
-            Object modification = ((CompositeModificationPayload) data).getModification();
-
-            applyModificationToState(clientActor, identifier, modification);
-        } else if(data instanceof CompositeModificationByteStringPayload ){
-            Object modification = ((CompositeModificationByteStringPayload) data).getModification();
-
-            applyModificationToState(clientActor, identifier, modification);
         } else {
-            LOG.error("{}: Unknown state received {} Class loader = {} CompositeNodeMod.ClassLoader = {}",
-                    persistenceId(), data, data.getClass().getClassLoader(),
-                    CompositeModificationPayload.class.getClassLoader());
+            LOG.error("{}: Unknown state received {} ClassLoader {}", persistenceId(), data,
+                data.getClass().getClassLoader());
         }
     }
 
-    private void applyModificationToState(ActorRef clientActor, String identifier, Object modification) {
+    private void applyModificationToState(ActorRef clientActor, Identifier identifier, Object modification) {
         if(modification == null) {
             LOG.error(
                     "{}: modification is null - this is very unexpected, clientActor = {}, identifier = {}",
@@ -657,7 +707,11 @@ public class Shard extends RaftActor {
         } else if(clientActor == null) {
             // There's no clientActor to which to send a commit reply so we must be applying
             // replicated state from the leader.
-            commitWithNewTransaction(MutableCompositeModification.fromSerializable(modification));
+
+            // The only implementation we know of is BatchedModifications, which also carries a transaction
+            // identifier -- which we really need that.
+            Preconditions.checkArgument(modification instanceof BatchedModifications);
+            commitWithNewTransaction((BatchedModifications)modification);
         } else {
             // This must be the OK to commit after replication consensus.
             finishCommit(clientActor, identifier);
@@ -680,9 +734,6 @@ public class Shard extends RaftActor {
             }
 
             store.closeAllTransactionChains();
-
-            commitCoordinator.abortPendingTransactions(
-                    "The transacton was aborted due to inflight leadership change.", this);
         }
 
         if(hasLeader && !isIsolatedLeader()) {
@@ -694,7 +745,31 @@ public class Shard extends RaftActor {
     protected void onLeaderChanged(String oldLeader, String newLeader) {
         shardMBean.incrementLeadershipChangeCount();
 
-        if(hasLeader() && !isIsolatedLeader()) {
+        boolean hasLeader = hasLeader();
+        if(hasLeader && !isLeader()) {
+            // Another leader was elected. If we were the previous leader and had pending transactions, convert
+            // them to transaction messages and send to the new leader.
+            ActorSelection leader = getLeader();
+            if(leader != null) {
+                Collection<Object> messagesToForward = commitCoordinator.convertPendingTransactionsToMessages(
+                        datastoreContext.getShardBatchedModificationCount());
+
+                if(!messagesToForward.isEmpty()) {
+                    LOG.debug("{}: Forwarding {} pending transaction messages to leader {}", persistenceId(),
+                            messagesToForward.size(), leader);
+
+                    for(Object message: messagesToForward) {
+                        leader.tell(message, self());
+                    }
+                }
+            } else {
+                commitCoordinator.abortPendingTransactions(
+                        "The transacton was aborted due to inflight leadership change and the leader address isn't available.",
+                        this);
+            }
+        }
+
+        if(hasLeader && !isIsolatedLeader()) {
             messageRetrySupport.retryMessages();
         }
     }