Merge changes I3e404877,Ida2a5c32,I9e6ce426,I6a4b90f6,I79717533
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / datastore / Shard.java
index a22e535fad1f6fa7052121c6be736a299e0b48cb..a3ef0339b7571172dfb0d5b2338f52911a86cf9c 100644 (file)
@@ -69,6 +69,7 @@ import org.opendaylight.controller.cluster.notifications.RoleChangeNotifier;
 import org.opendaylight.controller.cluster.raft.RaftActor;
 import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
 import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshotReply;
+import org.opendaylight.controller.cluster.raft.protobuff.client.messages.CompositeModificationByteStringPayload;
 import org.opendaylight.controller.cluster.raft.protobuff.client.messages.CompositeModificationPayload;
 import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeListener;
@@ -94,11 +95,10 @@ import scala.concurrent.duration.FiniteDuration;
  */
 public class Shard extends RaftActor {
 
-    private static final Object COMMIT_TRANSACTION_REPLY = new CommitTransactionReply().toSerializable();
-
     private static final Object TX_COMMIT_TIMEOUT_CHECK_MESSAGE = "txCommitTimeoutCheck";
 
-    public static final String DEFAULT_NAME = "default";
+    @VisibleForTesting
+    static final String DEFAULT_NAME = "default";
 
     // The state of this Shard
     private final InMemoryDOMDataStore store;
@@ -132,7 +132,7 @@ public class Shard extends RaftActor {
 
     private Cancellable txCommitTimeoutCheckSchedule;
 
-    private Optional<ActorRef> roleChangeNotifier;
+    private final Optional<ActorRef> roleChangeNotifier;
 
     /**
      * Coordinates persistence recovery on startup.
@@ -320,8 +320,14 @@ public class Shard extends RaftActor {
             // currently uses a same thread executor anyway.
             cohortEntry.getCohort().preCommit().get();
 
-            Shard.this.persistData(getSender(), transactionID,
-                    new CompositeModificationPayload(cohortEntry.getModification().toSerializable()));
+            // If we do not have any followers and we are not using persistence we can
+            // apply modification to the state immediately
+            if(!hasFollowers() && !persistence().isRecoveryApplicable()){
+                applyModificationToState(getSender(), transactionID, cohortEntry.getModification());
+            } else {
+                Shard.this.persistData(getSender(), transactionID,
+                        new CompositeModificationByteStringPayload(cohortEntry.getModification().toSerializable()));
+            }
         } catch (InterruptedException | ExecutionException e) {
             LOG.error(e, "An exception occurred while preCommitting transaction {}",
                     cohortEntry.getTransactionID());
@@ -347,7 +353,7 @@ public class Shard extends RaftActor {
             cohortEntry = commitCoordinator.getAndRemoveCohortEntry(transactionID);
             if(cohortEntry != null) {
                 commitWithNewTransaction(cohortEntry.getModification());
-                sender.tell(COMMIT_TRANSACTION_REPLY, getSelf());
+                sender.tell(CommitTransactionReply.INSTANCE.toSerializable(), getSelf());
             } else {
                 // This really shouldn't happen - it likely means that persistence or replication
                 // took so long to complete such that the cohort entry was expired from the cache.
@@ -369,7 +375,7 @@ public class Shard extends RaftActor {
             // currently uses a same thread executor anyway.
             cohortEntry.getCohort().commit().get();
 
-            sender.tell(COMMIT_TRANSACTION_REPLY, getSelf());
+            sender.tell(CommitTransactionReply.INSTANCE.toSerializable(), getSelf());
 
             shardMBean.incrementCommittedTransactionCount();
             shardMBean.setLastCommittedTransactionTime(System.currentTimeMillis());
@@ -405,7 +411,7 @@ public class Shard extends RaftActor {
         // transactionId so to maintain backwards compatibility, we create a separate cohort actor
         // to provide the compatible behavior.
         ActorRef replyActorPath = self();
-        if(ready.getTxnClientVersion() < CreateTransaction.HELIUM_1_VERSION) {
+        if(ready.getTxnClientVersion() < DataStoreVersions.HELIUM_1_VERSION) {
             LOG.debug("Creating BackwardsCompatibleThreePhaseCommitCohort");
             replyActorPath = getContext().actorOf(BackwardsCompatibleThreePhaseCommitCohort.props(
                     ready.getTransactionID()));
@@ -421,7 +427,7 @@ public class Shard extends RaftActor {
         doAbortTransaction(abort.getTransactionID(), getSender());
     }
 
-    private void doAbortTransaction(final String transactionID, final ActorRef sender) {
+    void doAbortTransaction(final String transactionID, final ActorRef sender) {
         final CohortEntry cohortEntry = commitCoordinator.getCohortEntryIfCurrent(transactionID);
         if(cohortEntry != null) {
             LOG.debug("Aborting transaction {}", transactionID);
@@ -440,7 +446,7 @@ public class Shard extends RaftActor {
                     shardMBean.incrementAbortTransactionsCount();
 
                     if(sender != null) {
-                        sender.tell(new AbortTransactionReply().toSerializable(), self);
+                        sender.tell(AbortTransactionReply.INSTANCE.toSerializable(), self);
                     }
                 }
 
@@ -473,7 +479,7 @@ public class Shard extends RaftActor {
         // This must be for install snapshot. Don't want to open this up and trigger
         // deSerialization
 
-        self().tell(new CaptureSnapshotReply(ReadDataReply.getNormalizedNodeByteString(message)),
+        self().tell(new CaptureSnapshotReply(ReadDataReply.fromSerializableAsByteString(message)),
                 self());
 
         createSnapshotTransaction = null;
@@ -493,7 +499,8 @@ public class Shard extends RaftActor {
     }
 
     private ActorRef createTypedTransactionActor(int transactionType,
-            ShardTransactionIdentifier transactionId, String transactionChainId, int clientVersion ) {
+            ShardTransactionIdentifier transactionId, String transactionChainId,
+            short clientVersion ) {
 
         DOMStoreTransactionFactory factory = store;
 
@@ -561,7 +568,7 @@ public class Shard extends RaftActor {
     }
 
     private ActorRef createTransaction(int transactionType, String remoteTransactionId,
-            String transactionChainId, int clientVersion) {
+            String transactionChainId, short clientVersion) {
 
         ShardTransactionIdentifier transactionId =
             ShardTransactionIdentifier.builder()
@@ -652,7 +659,7 @@ public class Shard extends RaftActor {
         dataChangeListeners.add(dataChangeListenerPath);
 
         AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>> listener =
-                new DataChangeListenerProxy(schemaContext, dataChangeListenerPath);
+                new DataChangeListenerProxy(dataChangeListenerPath);
 
         LOG.debug("Registering for path {}", registerChangeListener.getPath());
 
@@ -679,6 +686,8 @@ public class Shard extends RaftActor {
     protected void appendRecoveredLogEntry(final Payload data) {
         if (data instanceof CompositeModificationPayload) {
             currentLogRecoveryBatch.add(((CompositeModificationPayload) data).getModification());
+        } else if (data instanceof CompositeModificationByteStringPayload) {
+            currentLogRecoveryBatch.add(((CompositeModificationByteStringPayload) data).getModification());
         } else {
             LOG.error("Unknown state received {} during recovery", data);
         }
@@ -755,19 +764,12 @@ public class Shard extends RaftActor {
         if (data instanceof CompositeModificationPayload) {
             Object modification = ((CompositeModificationPayload) data).getModification();
 
-            if(modification == null) {
-                LOG.error(
-                     "modification is null - this is very unexpected, clientActor = {}, identifier = {}",
-                     identifier, clientActor != null ? clientActor.path().toString() : null);
-            } else if(clientActor == null) {
-                // There's no clientActor to which to send a commit reply so we must be applying
-                // replicated state from the leader.
-                commitWithNewTransaction(MutableCompositeModification.fromSerializable(
-                        modification, schemaContext));
-            } else {
-                // This must be the OK to commit after replication consensus.
-                finishCommit(clientActor, identifier);
-            }
+            applyModificationToState(clientActor, identifier, modification);
+        } else if(data instanceof CompositeModificationByteStringPayload ){
+            Object modification = ((CompositeModificationByteStringPayload) data).getModification();
+
+            applyModificationToState(clientActor, identifier, modification);
+
         } else {
             LOG.error("Unknown state received {} Class loader = {} CompositeNodeMod.ClassLoader = {}",
                     data, data.getClass().getClassLoader(),
@@ -778,6 +780,22 @@ public class Shard extends RaftActor {
 
     }
 
+    private void applyModificationToState(ActorRef clientActor, String identifier, Object modification) {
+        if(modification == null) {
+            LOG.error(
+                    "modification is null - this is very unexpected, clientActor = {}, identifier = {}",
+                    identifier, clientActor != null ? clientActor.path().toString() : null);
+        } else if(clientActor == null) {
+            // There's no clientActor to which to send a commit reply so we must be applying
+            // replicated state from the leader.
+            commitWithNewTransaction(MutableCompositeModification.fromSerializable(
+                    modification, schemaContext));
+        } else {
+            // This must be the OK to commit after replication consensus.
+            finishCommit(clientActor, identifier);
+        }
+    }
+
     private void updateJournalStats() {
         ReplicatedLogEntry lastLogEntry = getLastLogEntry();
 
@@ -800,7 +818,7 @@ public class Shard extends RaftActor {
             createSnapshotTransaction = createTransaction(
                 TransactionProxy.TransactionType.READ_ONLY.ordinal(),
                 "createSnapshot" + ++createSnapshotTransactionCounter, "",
-                CreateTransaction.CURRENT_VERSION);
+                DataStoreVersions.CURRENT_VERSION);
 
             createSnapshotTransaction.tell(
                 new ReadData(YangInstanceIdentifier.builder().build()).toSerializable(), self());