Merge "BUG 2954 : Thread local DocumentBuilder's for XmlUtil"
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / datastore / Shard.java
index b53d12c0c8cc420129b281b685cbdea03b2a0568..e62e918e5e7eff53b8f7e00dbe1e8300d270b59f 100644 (file)
@@ -48,12 +48,14 @@ import org.opendaylight.controller.cluster.datastore.messages.ForwardedReadyTran
 import org.opendaylight.controller.cluster.datastore.messages.PeerAddressResolved;
 import org.opendaylight.controller.cluster.datastore.messages.RegisterChangeListener;
 import org.opendaylight.controller.cluster.datastore.messages.RegisterDataTreeChangeListener;
+import org.opendaylight.controller.cluster.datastore.messages.ShardLeaderStateChanged;
 import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext;
 import org.opendaylight.controller.cluster.datastore.modification.Modification;
 import org.opendaylight.controller.cluster.datastore.modification.ModificationPayload;
 import org.opendaylight.controller.cluster.datastore.modification.MutableCompositeModification;
 import org.opendaylight.controller.cluster.datastore.utils.Dispatchers;
 import org.opendaylight.controller.cluster.datastore.utils.MessageTracker;
+import org.opendaylight.controller.cluster.notifications.LeaderStateChanged;
 import org.opendaylight.controller.cluster.notifications.RegisterRoleChangeListener;
 import org.opendaylight.controller.cluster.notifications.RoleChangeNotifier;
 import org.opendaylight.controller.cluster.raft.RaftActor;
@@ -63,6 +65,10 @@ import org.opendaylight.controller.cluster.raft.base.messages.FollowerInitialSyn
 import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
 import org.opendaylight.controller.cluster.raft.protobuff.client.messages.CompositeModificationByteStringPayload;
 import org.opendaylight.controller.cluster.raft.protobuff.client.messages.CompositeModificationPayload;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidate;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.DataValidationFailedException;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.ModificationType;
 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
 import scala.concurrent.duration.Duration;
 import scala.concurrent.duration.FiniteDuration;
@@ -262,6 +268,12 @@ public class Shard extends RaftActor {
         return roleChangeNotifier;
     }
 
+    @Override
+    protected LeaderStateChanged newLeaderStateChanged(String memberId, String leaderId) {
+        return new ShardLeaderStateChanged(memberId, leaderId,
+                isLeader() ? Optional.<DataTree>of(store.getDataTree()) : Optional.<DataTree>absent());
+    }
+
     private void onDatastoreContext(DatastoreContext context) {
         datastoreContext = context;
 
@@ -291,15 +303,21 @@ public class Shard extends RaftActor {
         }
     }
 
+    private static boolean isEmptyCommit(final DataTreeCandidate candidate) {
+        return ModificationType.UNMODIFIED.equals(candidate.getRootNode().getModificationType());
+    }
+
     void continueCommit(final CohortEntry cohortEntry) throws Exception {
+        final DataTreeCandidate candidate = cohortEntry.getCohort().getCandidate();
+
         // If we do not have any followers and we are not using persistence
         // or if cohortEntry has no modifications
         // we can apply modification to the state immediately
-        if((!hasFollowers() && !persistence().isRecoveryApplicable()) || (!cohortEntry.hasModifications())){
-            applyModificationToState(getSender(), cohortEntry.getTransactionID(), cohortEntry.getModification());
+        if ((!hasFollowers() && !persistence().isRecoveryApplicable()) || isEmptyCommit(candidate)) {
+            applyModificationToState(getSender(), cohortEntry.getTransactionID(), candidate);
         } else {
             Shard.this.persistData(getSender(), cohortEntry.getTransactionID(),
-                    new ModificationPayload(cohortEntry.getModification()));
+                DataTreeCandidatePayload.create(candidate));
         }
     }
 
@@ -309,12 +327,37 @@ public class Shard extends RaftActor {
         }
     }
 
+    private void finishCommit(@Nonnull final ActorRef sender, @Nonnull final String transactionID, @Nonnull final CohortEntry cohortEntry) {
+        LOG.debug("{}: Finishing commit for transaction {}", persistenceId(), cohortEntry.getTransactionID());
+
+        try {
+            // We block on the future here so we don't have to worry about possibly accessing our
+            // state on a different thread outside of our dispatcher. Also, the data store
+            // currently uses a same thread executor anyway.
+            cohortEntry.getCohort().commit().get();
+
+            sender.tell(CommitTransactionReply.INSTANCE.toSerializable(), getSelf());
+
+            shardMBean.incrementCommittedTransactionCount();
+            shardMBean.setLastCommittedTransactionTime(System.currentTimeMillis());
+
+        } catch (Exception e) {
+            sender.tell(new akka.actor.Status.Failure(e), getSelf());
+
+            LOG.error("{}, An exception occurred while committing transaction {}", persistenceId(),
+                    transactionID, e);
+            shardMBean.incrementFailedTransactionsCount();
+        } finally {
+            commitCoordinator.currentTransactionComplete(transactionID, true);
+        }
+    }
+
     private void finishCommit(@Nonnull final ActorRef sender, final @Nonnull String transactionID) {
         // With persistence enabled, this method is called via applyState by the leader strategy
         // after the commit has been replicated to a majority of the followers.
 
         CohortEntry cohortEntry = commitCoordinator.getCohortEntryIfCurrent(transactionID);
-        if(cohortEntry == null) {
+        if (cohortEntry == null) {
             // The transaction is no longer the current commit. This can happen if the transaction
             // was aborted prior, most likely due to timeout in the front-end. We need to finish
             // committing the transaction though since it was successfully persisted and replicated
@@ -323,7 +366,13 @@ public class Shard extends RaftActor {
             // transaction.
             cohortEntry = commitCoordinator.getAndRemoveCohortEntry(transactionID);
             if(cohortEntry != null) {
-                commitWithNewTransaction(cohortEntry.getModification());
+                try {
+                    store.applyForeignCandidate(transactionID, cohortEntry.getCohort().getCandidate());
+                } catch (DataValidationFailedException e) {
+                    shardMBean.incrementFailedTransactionsCount();
+                    LOG.error("{}: Failed to re-apply transaction {}", persistenceId(), transactionID, e);
+                }
+
                 sender.tell(CommitTransactionReply.INSTANCE.toSerializable(), getSelf());
             } else {
                 // This really shouldn't happen - it likely means that persistence or replication
@@ -334,31 +383,8 @@ public class Shard extends RaftActor {
                 LOG.error(ex.getMessage());
                 sender.tell(new akka.actor.Status.Failure(ex), getSelf());
             }
-
-            return;
-        }
-
-        LOG.debug("{}: Finishing commit for transaction {}", persistenceId(), cohortEntry.getTransactionID());
-
-        try {
-            // We block on the future here so we don't have to worry about possibly accessing our
-            // state on a different thread outside of our dispatcher. Also, the data store
-            // currently uses a same thread executor anyway.
-            cohortEntry.getCohort().commit().get();
-
-            sender.tell(CommitTransactionReply.INSTANCE.toSerializable(), getSelf());
-
-            shardMBean.incrementCommittedTransactionCount();
-            shardMBean.setLastCommittedTransactionTime(System.currentTimeMillis());
-
-        } catch (Exception e) {
-            sender.tell(new akka.actor.Status.Failure(e), getSelf());
-
-            LOG.error("{}, An exception occurred while committing transaction {}", persistenceId(),
-                    transactionID, e);
-            shardMBean.incrementFailedTransactionsCount();
-        } finally {
-            commitCoordinator.currentTransactionComplete(transactionID, true);
+        } else {
+            finishCommit(sender, transactionID, cohortEntry);
         }
     }
 
@@ -556,15 +582,25 @@ public class Shard extends RaftActor {
 
     @Override
     protected void applyState(final ActorRef clientActor, final String identifier, final Object data) {
-
-        if(data instanceof ModificationPayload) {
+        if (data instanceof DataTreeCandidatePayload) {
+            if (clientActor == null) {
+                // No clientActor indicates a replica coming from the leader
+                try {
+                    store.applyForeignCandidate(identifier, ((DataTreeCandidatePayload)data).getCandidate());
+                } catch (DataValidationFailedException | IOException e) {
+                    LOG.error("{}: Error applying replica {}", persistenceId(), identifier, e);
+                }
+            } else {
+                // Replication consensus reached, proceed to commit
+                finishCommit(clientActor, identifier);
+            }
+        } else if (data instanceof ModificationPayload) {
             try {
                 applyModificationToState(clientActor, identifier, ((ModificationPayload) data).getModification());
             } catch (ClassNotFoundException | IOException e) {
                 LOG.error("{}: Error extracting ModificationPayload", persistenceId(), e);
             }
-        }
-        else if (data instanceof CompositeModificationPayload) {
+        } else if (data instanceof CompositeModificationPayload) {
             Object modification = ((CompositeModificationPayload) data).getModification();
 
             applyModificationToState(clientActor, identifier, modification);