Bug 2268: Use streaming for Modification payload
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / datastore / Shard.java
index cf4bd1db43b59f1af750bb81ec7a588da9f97d83..dea377a810cb73be01cf4c17dab93c4c2b476c1b 100644 (file)
@@ -27,6 +27,7 @@ import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.ListenableFuture;
 import com.google.protobuf.ByteString;
 import com.google.protobuf.InvalidProtocolBufferException;
+import java.io.IOException;
 import java.util.Collection;
 import java.util.HashMap;
 import java.util.List;
@@ -63,6 +64,7 @@ import org.opendaylight.controller.cluster.datastore.messages.RegisterChangeList
 import org.opendaylight.controller.cluster.datastore.messages.RegisterChangeListenerReply;
 import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext;
 import org.opendaylight.controller.cluster.datastore.modification.Modification;
+import org.opendaylight.controller.cluster.datastore.modification.ModificationPayload;
 import org.opendaylight.controller.cluster.datastore.modification.MutableCompositeModification;
 import org.opendaylight.controller.cluster.datastore.node.NormalizedNodeToNodeCodec;
 import org.opendaylight.controller.cluster.notifications.RoleChangeNotifier;
@@ -95,11 +97,10 @@ import scala.concurrent.duration.FiniteDuration;
  */
 public class Shard extends RaftActor {
 
-    private static final Object COMMIT_TRANSACTION_REPLY = new CommitTransactionReply().toSerializable();
-
     private static final Object TX_COMMIT_TIMEOUT_CHECK_MESSAGE = "txCommitTimeoutCheck";
 
-    public static final String DEFAULT_NAME = "default";
+    @VisibleForTesting
+    static final String DEFAULT_NAME = "default";
 
     // The state of this Shard
     private final InMemoryDOMDataStore store;
@@ -133,7 +134,7 @@ public class Shard extends RaftActor {
 
     private Cancellable txCommitTimeoutCheckSchedule;
 
-    private Optional<ActorRef> roleChangeNotifier;
+    private final Optional<ActorRef> roleChangeNotifier;
 
     /**
      * Coordinates persistence recovery on startup.
@@ -327,9 +328,9 @@ public class Shard extends RaftActor {
                 applyModificationToState(getSender(), transactionID, cohortEntry.getModification());
             } else {
                 Shard.this.persistData(getSender(), transactionID,
-                        new CompositeModificationByteStringPayload(cohortEntry.getModification().toSerializable()));
+                        new ModificationPayload(cohortEntry.getModification()));
             }
-        } catch (InterruptedException | ExecutionException e) {
+        } catch (InterruptedException | ExecutionException | IOException e) {
             LOG.error(e, "An exception occurred while preCommitting transaction {}",
                     cohortEntry.getTransactionID());
             shardMBean.incrementFailedTransactionsCount();
@@ -354,7 +355,7 @@ public class Shard extends RaftActor {
             cohortEntry = commitCoordinator.getAndRemoveCohortEntry(transactionID);
             if(cohortEntry != null) {
                 commitWithNewTransaction(cohortEntry.getModification());
-                sender.tell(COMMIT_TRANSACTION_REPLY, getSelf());
+                sender.tell(CommitTransactionReply.INSTANCE.toSerializable(), getSelf());
             } else {
                 // This really shouldn't happen - it likely means that persistence or replication
                 // took so long to complete such that the cohort entry was expired from the cache.
@@ -376,7 +377,7 @@ public class Shard extends RaftActor {
             // currently uses a same thread executor anyway.
             cohortEntry.getCohort().commit().get();
 
-            sender.tell(COMMIT_TRANSACTION_REPLY, getSelf());
+            sender.tell(CommitTransactionReply.INSTANCE.toSerializable(), getSelf());
 
             shardMBean.incrementCommittedTransactionCount();
             shardMBean.setLastCommittedTransactionTime(System.currentTimeMillis());
@@ -412,7 +413,7 @@ public class Shard extends RaftActor {
         // transactionId so to maintain backwards compatibility, we create a separate cohort actor
         // to provide the compatible behavior.
         ActorRef replyActorPath = self();
-        if(ready.getTxnClientVersion() < CreateTransaction.HELIUM_1_VERSION) {
+        if(ready.getTxnClientVersion() < DataStoreVersions.HELIUM_1_VERSION) {
             LOG.debug("Creating BackwardsCompatibleThreePhaseCommitCohort");
             replyActorPath = getContext().actorOf(BackwardsCompatibleThreePhaseCommitCohort.props(
                     ready.getTransactionID()));
@@ -428,7 +429,7 @@ public class Shard extends RaftActor {
         doAbortTransaction(abort.getTransactionID(), getSender());
     }
 
-    private void doAbortTransaction(final String transactionID, final ActorRef sender) {
+    void doAbortTransaction(final String transactionID, final ActorRef sender) {
         final CohortEntry cohortEntry = commitCoordinator.getCohortEntryIfCurrent(transactionID);
         if(cohortEntry != null) {
             LOG.debug("Aborting transaction {}", transactionID);
@@ -447,7 +448,7 @@ public class Shard extends RaftActor {
                     shardMBean.incrementAbortTransactionsCount();
 
                     if(sender != null) {
-                        sender.tell(new AbortTransactionReply().toSerializable(), self);
+                        sender.tell(AbortTransactionReply.INSTANCE.toSerializable(), self);
                     }
                 }
 
@@ -480,7 +481,7 @@ public class Shard extends RaftActor {
         // This must be for install snapshot. Don't want to open this up and trigger
         // deSerialization
 
-        self().tell(new CaptureSnapshotReply(ReadDataReply.getNormalizedNodeByteString(message)),
+        self().tell(new CaptureSnapshotReply(ReadDataReply.fromSerializableAsByteString(message)),
                 self());
 
         createSnapshotTransaction = null;
@@ -500,7 +501,8 @@ public class Shard extends RaftActor {
     }
 
     private ActorRef createTypedTransactionActor(int transactionType,
-            ShardTransactionIdentifier transactionId, String transactionChainId, int clientVersion ) {
+            ShardTransactionIdentifier transactionId, String transactionChainId,
+            short clientVersion ) {
 
         DOMStoreTransactionFactory factory = store;
 
@@ -568,7 +570,7 @@ public class Shard extends RaftActor {
     }
 
     private ActorRef createTransaction(int transactionType, String remoteTransactionId,
-            String transactionChainId, int clientVersion) {
+            String transactionChainId, short clientVersion) {
 
         ShardTransactionIdentifier transactionId =
             ShardTransactionIdentifier.builder()
@@ -659,7 +661,7 @@ public class Shard extends RaftActor {
         dataChangeListeners.add(dataChangeListenerPath);
 
         AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>> listener =
-                new DataChangeListenerProxy(schemaContext, dataChangeListenerPath);
+                new DataChangeListenerProxy(dataChangeListenerPath);
 
         LOG.debug("Registering for path {}", registerChangeListener.getPath());
 
@@ -684,7 +686,13 @@ public class Shard extends RaftActor {
 
     @Override
     protected void appendRecoveredLogEntry(final Payload data) {
-        if (data instanceof CompositeModificationPayload) {
+        if(data instanceof ModificationPayload) {
+            try {
+                currentLogRecoveryBatch.add(((ModificationPayload) data).getModification());
+            } catch (ClassNotFoundException | IOException e) {
+                LOG.error(e, "Error extracting ModificationPayload");
+            }
+        } else if (data instanceof CompositeModificationPayload) {
             currentLogRecoveryBatch.add(((CompositeModificationPayload) data).getModification());
         } else if (data instanceof CompositeModificationByteStringPayload) {
             currentLogRecoveryBatch.add(((CompositeModificationByteStringPayload) data).getModification());
@@ -761,7 +769,14 @@ public class Shard extends RaftActor {
     @Override
     protected void applyState(final ActorRef clientActor, final String identifier, final Object data) {
 
-        if (data instanceof CompositeModificationPayload) {
+        if(data instanceof ModificationPayload) {
+            try {
+                applyModificationToState(clientActor, identifier, ((ModificationPayload) data).getModification());
+            } catch (ClassNotFoundException | IOException e) {
+                LOG.error(e, "Error extracting ModificationPayload");
+            }
+        }
+        else if (data instanceof CompositeModificationPayload) {
             Object modification = ((CompositeModificationPayload) data).getModification();
 
             applyModificationToState(clientActor, identifier, modification);
@@ -769,7 +784,6 @@ public class Shard extends RaftActor {
             Object modification = ((CompositeModificationByteStringPayload) data).getModification();
 
             applyModificationToState(clientActor, identifier, modification);
-
         } else {
             LOG.error("Unknown state received {} Class loader = {} CompositeNodeMod.ClassLoader = {}",
                     data, data.getClass().getClassLoader(),
@@ -788,8 +802,7 @@ public class Shard extends RaftActor {
         } else if(clientActor == null) {
             // There's no clientActor to which to send a commit reply so we must be applying
             // replicated state from the leader.
-            commitWithNewTransaction(MutableCompositeModification.fromSerializable(
-                    modification, schemaContext));
+            commitWithNewTransaction(MutableCompositeModification.fromSerializable(modification));
         } else {
             // This must be the OK to commit after replication consensus.
             finishCommit(clientActor, identifier);
@@ -818,7 +831,7 @@ public class Shard extends RaftActor {
             createSnapshotTransaction = createTransaction(
                 TransactionProxy.TransactionType.READ_ONLY.ordinal(),
                 "createSnapshot" + ++createSnapshotTransactionCounter, "",
-                CreateTransaction.CURRENT_VERSION);
+                DataStoreVersions.CURRENT_VERSION);
 
             createSnapshotTransaction.tell(
                 new ReadData(YangInstanceIdentifier.builder().build()).toSerializable(), self());