Merge "Bug2268: Deprecate legacy Payload classes"
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / datastore / Shard.java
index 7ef6e040a9f3f0d94bf2fdc47790377d505184b3..9cd758ba30fdb94e85cd1703d99a8e0c55a50a17 100644 (file)
@@ -11,7 +11,6 @@ package org.opendaylight.controller.cluster.datastore;
 import akka.actor.ActorRef;
 import akka.actor.ActorSelection;
 import akka.actor.Cancellable;
-import akka.actor.PoisonPill;
 import akka.actor.Props;
 import akka.event.Logging;
 import akka.event.LoggingAdapter;
@@ -25,8 +24,7 @@ import com.google.common.collect.Lists;
 import com.google.common.util.concurrent.FutureCallback;
 import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.ListenableFuture;
-import com.google.protobuf.ByteString;
-import com.google.protobuf.InvalidProtocolBufferException;
+import java.io.IOException;
 import java.util.Collection;
 import java.util.HashMap;
 import java.util.List;
@@ -51,31 +49,29 @@ import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransacti
 import org.opendaylight.controller.cluster.datastore.messages.CloseTransactionChain;
 import org.opendaylight.controller.cluster.datastore.messages.CommitTransaction;
 import org.opendaylight.controller.cluster.datastore.messages.CommitTransactionReply;
+import org.opendaylight.controller.cluster.datastore.messages.CreateSnapshot;
 import org.opendaylight.controller.cluster.datastore.messages.CreateTransaction;
 import org.opendaylight.controller.cluster.datastore.messages.CreateTransactionReply;
 import org.opendaylight.controller.cluster.datastore.messages.EnableNotification;
 import org.opendaylight.controller.cluster.datastore.messages.ForwardedReadyTransaction;
 import org.opendaylight.controller.cluster.datastore.messages.PeerAddressResolved;
-import org.opendaylight.controller.cluster.datastore.messages.ReadData;
-import org.opendaylight.controller.cluster.datastore.messages.ReadDataReply;
 import org.opendaylight.controller.cluster.datastore.messages.ReadyTransactionReply;
 import org.opendaylight.controller.cluster.datastore.messages.RegisterChangeListener;
 import org.opendaylight.controller.cluster.datastore.messages.RegisterChangeListenerReply;
 import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext;
 import org.opendaylight.controller.cluster.datastore.modification.Modification;
+import org.opendaylight.controller.cluster.datastore.modification.ModificationPayload;
 import org.opendaylight.controller.cluster.datastore.modification.MutableCompositeModification;
-import org.opendaylight.controller.cluster.datastore.node.NormalizedNodeToNodeCodec;
+import org.opendaylight.controller.cluster.datastore.utils.SerializationUtils;
 import org.opendaylight.controller.cluster.notifications.RoleChangeNotifier;
 import org.opendaylight.controller.cluster.raft.RaftActor;
 import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
-import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshotReply;
 import org.opendaylight.controller.cluster.raft.protobuff.client.messages.CompositeModificationByteStringPayload;
 import org.opendaylight.controller.cluster.raft.protobuff.client.messages.CompositeModificationPayload;
 import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeListener;
 import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStore;
 import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStoreFactory;
-import org.opendaylight.controller.protobuff.messages.common.NormalizedNodeMessages;
 import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
 import org.opendaylight.controller.sal.core.spi.data.DOMStoreTransactionChain;
 import org.opendaylight.controller.sal.core.spi.data.DOMStoreTransactionFactory;
@@ -95,11 +91,12 @@ import scala.concurrent.duration.FiniteDuration;
  */
 public class Shard extends RaftActor {
 
-    private static final Object COMMIT_TRANSACTION_REPLY = new CommitTransactionReply().toSerializable();
+    private static final YangInstanceIdentifier DATASTORE_ROOT = YangInstanceIdentifier.builder().build();
 
     private static final Object TX_COMMIT_TIMEOUT_CHECK_MESSAGE = "txCommitTimeoutCheck";
 
-    public static final String DEFAULT_NAME = "default";
+    @VisibleForTesting
+    static final String DEFAULT_NAME = "default";
 
     // The state of this Shard
     private final InMemoryDOMDataStore store;
@@ -123,8 +120,6 @@ public class Shard extends RaftActor {
 
     private SchemaContext schemaContext;
 
-    private ActorRef createSnapshotTransaction;
-
     private int createSnapshotTransactionCounter;
 
     private final ShardCommitCoordinator commitCoordinator;
@@ -243,9 +238,7 @@ public class Shard extends RaftActor {
             LOG.debug("onReceiveCommand: Received message {} from {}", message, getSender());
         }
 
-        if(message.getClass().equals(ReadDataReply.SERIALIZABLE_CLASS)) {
-            handleReadDataReply(message);
-        } else if (message.getClass().equals(CreateTransaction.SERIALIZABLE_CLASS)) {
+        if (message.getClass().equals(CreateTransaction.SERIALIZABLE_CLASS)) {
             handleCreateTransaction(message);
         } else if(message instanceof ForwardedReadyTransaction) {
             handleForwardedReadyTransaction((ForwardedReadyTransaction)message);
@@ -327,9 +320,9 @@ public class Shard extends RaftActor {
                 applyModificationToState(getSender(), transactionID, cohortEntry.getModification());
             } else {
                 Shard.this.persistData(getSender(), transactionID,
-                        new CompositeModificationByteStringPayload(cohortEntry.getModification().toSerializable()));
+                        new ModificationPayload(cohortEntry.getModification()));
             }
-        } catch (InterruptedException | ExecutionException e) {
+        } catch (InterruptedException | ExecutionException | IOException e) {
             LOG.error(e, "An exception occurred while preCommitting transaction {}",
                     cohortEntry.getTransactionID());
             shardMBean.incrementFailedTransactionsCount();
@@ -354,7 +347,7 @@ public class Shard extends RaftActor {
             cohortEntry = commitCoordinator.getAndRemoveCohortEntry(transactionID);
             if(cohortEntry != null) {
                 commitWithNewTransaction(cohortEntry.getModification());
-                sender.tell(COMMIT_TRANSACTION_REPLY, getSelf());
+                sender.tell(CommitTransactionReply.INSTANCE.toSerializable(), getSelf());
             } else {
                 // This really shouldn't happen - it likely means that persistence or replication
                 // took so long to complete such that the cohort entry was expired from the cache.
@@ -376,7 +369,7 @@ public class Shard extends RaftActor {
             // currently uses a same thread executor anyway.
             cohortEntry.getCohort().commit().get();
 
-            sender.tell(COMMIT_TRANSACTION_REPLY, getSelf());
+            sender.tell(CommitTransactionReply.INSTANCE.toSerializable(), getSelf());
 
             shardMBean.incrementCommittedTransactionCount();
             shardMBean.setLastCommittedTransactionTime(System.currentTimeMillis());
@@ -412,7 +405,7 @@ public class Shard extends RaftActor {
         // transactionId so to maintain backwards compatibility, we create a separate cohort actor
         // to provide the compatible behavior.
         ActorRef replyActorPath = self();
-        if(ready.getTxnClientVersion() < CreateTransaction.HELIUM_1_VERSION) {
+        if(ready.getTxnClientVersion() < DataStoreVersions.HELIUM_1_VERSION) {
             LOG.debug("Creating BackwardsCompatibleThreePhaseCommitCohort");
             replyActorPath = getContext().actorOf(BackwardsCompatibleThreePhaseCommitCohort.props(
                     ready.getTransactionID()));
@@ -447,7 +440,7 @@ public class Shard extends RaftActor {
                     shardMBean.incrementAbortTransactionsCount();
 
                     if(sender != null) {
-                        sender.tell(new AbortTransactionReply().toSerializable(), self);
+                        sender.tell(AbortTransactionReply.INSTANCE.toSerializable(), self);
                     }
                 }
 
@@ -476,20 +469,6 @@ public class Shard extends RaftActor {
         }
     }
 
-    private void handleReadDataReply(final Object message) {
-        // This must be for install snapshot. Don't want to open this up and trigger
-        // deSerialization
-
-        self().tell(new CaptureSnapshotReply(ReadDataReply.getNormalizedNodeByteString(message)),
-                self());
-
-        createSnapshotTransaction = null;
-
-        // Send a PoisonPill instead of sending close transaction because we do not really need
-        // a response
-        getSender().tell(PoisonPill.getInstance(), self());
-    }
-
     private void closeTransactionChain(final CloseTransactionChain closeTransactionChain) {
         DOMStoreTransactionChain chain =
             transactionChains.remove(closeTransactionChain.getTransactionChainId());
@@ -500,7 +479,8 @@ public class Shard extends RaftActor {
     }
 
     private ActorRef createTypedTransactionActor(int transactionType,
-            ShardTransactionIdentifier transactionId, String transactionChainId, int clientVersion ) {
+            ShardTransactionIdentifier transactionId, String transactionChainId,
+            short clientVersion ) {
 
         DOMStoreTransactionFactory factory = store;
 
@@ -568,7 +548,7 @@ public class Shard extends RaftActor {
     }
 
     private ActorRef createTransaction(int transactionType, String remoteTransactionId,
-            String transactionChainId, int clientVersion) {
+            String transactionChainId, short clientVersion) {
 
         ShardTransactionIdentifier transactionId =
             ShardTransactionIdentifier.builder()
@@ -659,7 +639,7 @@ public class Shard extends RaftActor {
         dataChangeListeners.add(dataChangeListenerPath);
 
         AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>> listener =
-                new DataChangeListenerProxy(schemaContext, dataChangeListenerPath);
+                new DataChangeListenerProxy(dataChangeListenerPath);
 
         LOG.debug("Registering for path {}", registerChangeListener.getPath());
 
@@ -684,7 +664,13 @@ public class Shard extends RaftActor {
 
     @Override
     protected void appendRecoveredLogEntry(final Payload data) {
-        if (data instanceof CompositeModificationPayload) {
+        if(data instanceof ModificationPayload) {
+            try {
+                currentLogRecoveryBatch.add(((ModificationPayload) data).getModification());
+            } catch (ClassNotFoundException | IOException e) {
+                LOG.error(e, "Error extracting ModificationPayload");
+            }
+        } else if (data instanceof CompositeModificationPayload) {
             currentLogRecoveryBatch.add(((CompositeModificationPayload) data).getModification());
         } else if (data instanceof CompositeModificationByteStringPayload) {
             currentLogRecoveryBatch.add(((CompositeModificationByteStringPayload) data).getModification());
@@ -694,12 +680,12 @@ public class Shard extends RaftActor {
     }
 
     @Override
-    protected void applyRecoverySnapshot(final ByteString snapshot) {
+    protected void applyRecoverySnapshot(final byte[] snapshotBytes) {
         if(recoveryCoordinator == null) {
             recoveryCoordinator = new ShardRecoveryCoordinator(persistenceId(), schemaContext);
         }
 
-        recoveryCoordinator.submit(snapshot, store.newWriteOnlyTransaction());
+        recoveryCoordinator.submit(snapshotBytes, store.newWriteOnlyTransaction());
 
         if(LOG.isDebugEnabled()) {
             LOG.debug("{} : submitted recovery sbapshot", persistenceId());
@@ -761,7 +747,14 @@ public class Shard extends RaftActor {
     @Override
     protected void applyState(final ActorRef clientActor, final String identifier, final Object data) {
 
-        if (data instanceof CompositeModificationPayload) {
+        if(data instanceof ModificationPayload) {
+            try {
+                applyModificationToState(clientActor, identifier, ((ModificationPayload) data).getModification());
+            } catch (ClassNotFoundException | IOException e) {
+                LOG.error(e, "Error extracting ModificationPayload");
+            }
+        }
+        else if (data instanceof CompositeModificationPayload) {
             Object modification = ((CompositeModificationPayload) data).getModification();
 
             applyModificationToState(clientActor, identifier, modification);
@@ -769,7 +762,6 @@ public class Shard extends RaftActor {
             Object modification = ((CompositeModificationByteStringPayload) data).getModification();
 
             applyModificationToState(clientActor, identifier, modification);
-
         } else {
             LOG.error("Unknown state received {} Class loader = {} CompositeNodeMod.ClassLoader = {}",
                     data, data.getClass().getClassLoader(),
@@ -788,8 +780,7 @@ public class Shard extends RaftActor {
         } else if(clientActor == null) {
             // There's no clientActor to which to send a commit reply so we must be applying
             // replicated state from the leader.
-            commitWithNewTransaction(MutableCompositeModification.fromSerializable(
-                    modification, schemaContext));
+            commitWithNewTransaction(MutableCompositeModification.fromSerializable(modification));
         } else {
             // This must be the OK to commit after replication consensus.
             finishCommit(clientActor, identifier);
@@ -811,24 +802,21 @@ public class Shard extends RaftActor {
 
     @Override
     protected void createSnapshot() {
-        if (createSnapshotTransaction == null) {
+        // Create a transaction actor. We are really going to treat the transaction as a worker
+        // so that this actor does not get block building the snapshot. THe transaction actor will
+        // after processing the CreateSnapshot message.
 
-            // Create a transaction. We are really going to treat the transaction as a worker
-            // so that this actor does not get block building the snapshot
-            createSnapshotTransaction = createTransaction(
+        ActorRef createSnapshotTransaction = createTransaction(
                 TransactionProxy.TransactionType.READ_ONLY.ordinal(),
                 "createSnapshot" + ++createSnapshotTransactionCounter, "",
-                CreateTransaction.CURRENT_VERSION);
+                DataStoreVersions.CURRENT_VERSION);
 
-            createSnapshotTransaction.tell(
-                new ReadData(YangInstanceIdentifier.builder().build()).toSerializable(), self());
-
-        }
+        createSnapshotTransaction.tell(CreateSnapshot.INSTANCE, self());
     }
 
     @VisibleForTesting
     @Override
-    protected void applySnapshot(final ByteString snapshot) {
+    protected void applySnapshot(final byte[] snapshotBytes) {
         // Since this will be done only on Recovery or when this actor is a Follower
         // we can safely commit everything in here. We not need to worry about event notifications
         // as they would have already been disabled on the follower
@@ -836,17 +824,16 @@ public class Shard extends RaftActor {
         LOG.info("Applying snapshot");
         try {
             DOMStoreWriteTransaction transaction = store.newWriteOnlyTransaction();
-            NormalizedNodeMessages.Node serializedNode = NormalizedNodeMessages.Node.parseFrom(snapshot);
-            NormalizedNode<?, ?> node = new NormalizedNodeToNodeCodec(schemaContext)
-                .decode(serializedNode);
+
+            NormalizedNode<?, ?> node = SerializationUtils.deserializeNormalizedNode(snapshotBytes);
 
             // delete everything first
-            transaction.delete(YangInstanceIdentifier.builder().build());
+            transaction.delete(DATASTORE_ROOT);
 
             // Add everything from the remote node back
-            transaction.write(YangInstanceIdentifier.builder().build(), node);
+            transaction.write(DATASTORE_ROOT, node);
             syncCommitTransaction(transaction);
-        } catch (InvalidProtocolBufferException | InterruptedException | ExecutionException e) {
+        } catch (InterruptedException | ExecutionException e) {
             LOG.error(e, "An exception occurred when applying snapshot");
         } finally {
             LOG.info("Done applying snapshot");