Added requuired-capabilities to the impl/.../config/default-config.xml and added...
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / datastore / Shard.java
index af16d02eea2909ee75e83e2b62dc58bc633b7031..9cd758ba30fdb94e85cd1703d99a8e0c55a50a17 100644 (file)
@@ -11,7 +11,6 @@ package org.opendaylight.controller.cluster.datastore;
 import akka.actor.ActorRef;
 import akka.actor.ActorSelection;
 import akka.actor.Cancellable;
-import akka.actor.PoisonPill;
 import akka.actor.Props;
 import akka.event.Logging;
 import akka.event.LoggingAdapter;
@@ -25,8 +24,7 @@ import com.google.common.collect.Lists;
 import com.google.common.util.concurrent.FutureCallback;
 import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.ListenableFuture;
-import com.google.protobuf.ByteString;
-import com.google.protobuf.InvalidProtocolBufferException;
+import java.io.IOException;
 import java.util.Collection;
 import java.util.HashMap;
 import java.util.List;
@@ -51,30 +49,29 @@ import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransacti
 import org.opendaylight.controller.cluster.datastore.messages.CloseTransactionChain;
 import org.opendaylight.controller.cluster.datastore.messages.CommitTransaction;
 import org.opendaylight.controller.cluster.datastore.messages.CommitTransactionReply;
+import org.opendaylight.controller.cluster.datastore.messages.CreateSnapshot;
 import org.opendaylight.controller.cluster.datastore.messages.CreateTransaction;
 import org.opendaylight.controller.cluster.datastore.messages.CreateTransactionReply;
 import org.opendaylight.controller.cluster.datastore.messages.EnableNotification;
 import org.opendaylight.controller.cluster.datastore.messages.ForwardedReadyTransaction;
 import org.opendaylight.controller.cluster.datastore.messages.PeerAddressResolved;
-import org.opendaylight.controller.cluster.datastore.messages.ReadData;
-import org.opendaylight.controller.cluster.datastore.messages.ReadDataReply;
 import org.opendaylight.controller.cluster.datastore.messages.ReadyTransactionReply;
 import org.opendaylight.controller.cluster.datastore.messages.RegisterChangeListener;
 import org.opendaylight.controller.cluster.datastore.messages.RegisterChangeListenerReply;
 import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext;
 import org.opendaylight.controller.cluster.datastore.modification.Modification;
+import org.opendaylight.controller.cluster.datastore.modification.ModificationPayload;
 import org.opendaylight.controller.cluster.datastore.modification.MutableCompositeModification;
-import org.opendaylight.controller.cluster.datastore.node.NormalizedNodeToNodeCodec;
+import org.opendaylight.controller.cluster.datastore.utils.SerializationUtils;
 import org.opendaylight.controller.cluster.notifications.RoleChangeNotifier;
 import org.opendaylight.controller.cluster.raft.RaftActor;
 import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
-import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshotReply;
+import org.opendaylight.controller.cluster.raft.protobuff.client.messages.CompositeModificationByteStringPayload;
 import org.opendaylight.controller.cluster.raft.protobuff.client.messages.CompositeModificationPayload;
 import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeListener;
 import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStore;
 import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStoreFactory;
-import org.opendaylight.controller.protobuff.messages.common.NormalizedNodeMessages;
 import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
 import org.opendaylight.controller.sal.core.spi.data.DOMStoreTransactionChain;
 import org.opendaylight.controller.sal.core.spi.data.DOMStoreTransactionFactory;
@@ -94,11 +91,12 @@ import scala.concurrent.duration.FiniteDuration;
  */
 public class Shard extends RaftActor {
 
-    private static final Object COMMIT_TRANSACTION_REPLY = new CommitTransactionReply().toSerializable();
+    private static final YangInstanceIdentifier DATASTORE_ROOT = YangInstanceIdentifier.builder().build();
 
     private static final Object TX_COMMIT_TIMEOUT_CHECK_MESSAGE = "txCommitTimeoutCheck";
 
-    public static final String DEFAULT_NAME = "default";
+    @VisibleForTesting
+    static final String DEFAULT_NAME = "default";
 
     // The state of this Shard
     private final InMemoryDOMDataStore store;
@@ -122,8 +120,6 @@ public class Shard extends RaftActor {
 
     private SchemaContext schemaContext;
 
-    private ActorRef createSnapshotTransaction;
-
     private int createSnapshotTransactionCounter;
 
     private final ShardCommitCoordinator commitCoordinator;
@@ -132,7 +128,7 @@ public class Shard extends RaftActor {
 
     private Cancellable txCommitTimeoutCheckSchedule;
 
-    private Optional<ActorRef> roleChangeNotifier;
+    private final Optional<ActorRef> roleChangeNotifier;
 
     /**
      * Coordinates persistence recovery on startup.
@@ -242,9 +238,7 @@ public class Shard extends RaftActor {
             LOG.debug("onReceiveCommand: Received message {} from {}", message, getSender());
         }
 
-        if(message.getClass().equals(ReadDataReply.SERIALIZABLE_CLASS)) {
-            handleReadDataReply(message);
-        } else if (message.getClass().equals(CreateTransaction.SERIALIZABLE_CLASS)) {
+        if (message.getClass().equals(CreateTransaction.SERIALIZABLE_CLASS)) {
             handleCreateTransaction(message);
         } else if(message instanceof ForwardedReadyTransaction) {
             handleForwardedReadyTransaction((ForwardedReadyTransaction)message);
@@ -320,9 +314,15 @@ public class Shard extends RaftActor {
             // currently uses a same thread executor anyway.
             cohortEntry.getCohort().preCommit().get();
 
-            Shard.this.persistData(getSender(), transactionID,
-                    new CompositeModificationPayload(cohortEntry.getModification().toSerializable()));
-        } catch (InterruptedException | ExecutionException e) {
+            // If we do not have any followers and we are not using persistence we can
+            // apply modification to the state immediately
+            if(!hasFollowers() && !persistence().isRecoveryApplicable()){
+                applyModificationToState(getSender(), transactionID, cohortEntry.getModification());
+            } else {
+                Shard.this.persistData(getSender(), transactionID,
+                        new ModificationPayload(cohortEntry.getModification()));
+            }
+        } catch (InterruptedException | ExecutionException | IOException e) {
             LOG.error(e, "An exception occurred while preCommitting transaction {}",
                     cohortEntry.getTransactionID());
             shardMBean.incrementFailedTransactionsCount();
@@ -347,7 +347,7 @@ public class Shard extends RaftActor {
             cohortEntry = commitCoordinator.getAndRemoveCohortEntry(transactionID);
             if(cohortEntry != null) {
                 commitWithNewTransaction(cohortEntry.getModification());
-                sender.tell(COMMIT_TRANSACTION_REPLY, getSelf());
+                sender.tell(CommitTransactionReply.INSTANCE.toSerializable(), getSelf());
             } else {
                 // This really shouldn't happen - it likely means that persistence or replication
                 // took so long to complete such that the cohort entry was expired from the cache.
@@ -369,7 +369,7 @@ public class Shard extends RaftActor {
             // currently uses a same thread executor anyway.
             cohortEntry.getCohort().commit().get();
 
-            sender.tell(COMMIT_TRANSACTION_REPLY, getSelf());
+            sender.tell(CommitTransactionReply.INSTANCE.toSerializable(), getSelf());
 
             shardMBean.incrementCommittedTransactionCount();
             shardMBean.setLastCommittedTransactionTime(System.currentTimeMillis());
@@ -405,7 +405,7 @@ public class Shard extends RaftActor {
         // transactionId so to maintain backwards compatibility, we create a separate cohort actor
         // to provide the compatible behavior.
         ActorRef replyActorPath = self();
-        if(ready.getTxnClientVersion() < CreateTransaction.HELIUM_1_VERSION) {
+        if(ready.getTxnClientVersion() < DataStoreVersions.HELIUM_1_VERSION) {
             LOG.debug("Creating BackwardsCompatibleThreePhaseCommitCohort");
             replyActorPath = getContext().actorOf(BackwardsCompatibleThreePhaseCommitCohort.props(
                     ready.getTransactionID()));
@@ -421,7 +421,7 @@ public class Shard extends RaftActor {
         doAbortTransaction(abort.getTransactionID(), getSender());
     }
 
-    private void doAbortTransaction(final String transactionID, final ActorRef sender) {
+    void doAbortTransaction(final String transactionID, final ActorRef sender) {
         final CohortEntry cohortEntry = commitCoordinator.getCohortEntryIfCurrent(transactionID);
         if(cohortEntry != null) {
             LOG.debug("Aborting transaction {}", transactionID);
@@ -440,7 +440,7 @@ public class Shard extends RaftActor {
                     shardMBean.incrementAbortTransactionsCount();
 
                     if(sender != null) {
-                        sender.tell(new AbortTransactionReply().toSerializable(), self);
+                        sender.tell(AbortTransactionReply.INSTANCE.toSerializable(), self);
                     }
                 }
 
@@ -469,20 +469,6 @@ public class Shard extends RaftActor {
         }
     }
 
-    private void handleReadDataReply(final Object message) {
-        // This must be for install snapshot. Don't want to open this up and trigger
-        // deSerialization
-
-        self().tell(new CaptureSnapshotReply(ReadDataReply.getNormalizedNodeByteString(message)),
-                self());
-
-        createSnapshotTransaction = null;
-
-        // Send a PoisonPill instead of sending close transaction because we do not really need
-        // a response
-        getSender().tell(PoisonPill.getInstance(), self());
-    }
-
     private void closeTransactionChain(final CloseTransactionChain closeTransactionChain) {
         DOMStoreTransactionChain chain =
             transactionChains.remove(closeTransactionChain.getTransactionChainId());
@@ -493,7 +479,8 @@ public class Shard extends RaftActor {
     }
 
     private ActorRef createTypedTransactionActor(int transactionType,
-            ShardTransactionIdentifier transactionId, String transactionChainId, int clientVersion ) {
+            ShardTransactionIdentifier transactionId, String transactionChainId,
+            short clientVersion ) {
 
         DOMStoreTransactionFactory factory = store;
 
@@ -561,7 +548,7 @@ public class Shard extends RaftActor {
     }
 
     private ActorRef createTransaction(int transactionType, String remoteTransactionId,
-            String transactionChainId, int clientVersion) {
+            String transactionChainId, short clientVersion) {
 
         ShardTransactionIdentifier transactionId =
             ShardTransactionIdentifier.builder()
@@ -652,7 +639,7 @@ public class Shard extends RaftActor {
         dataChangeListeners.add(dataChangeListenerPath);
 
         AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>> listener =
-                new DataChangeListenerProxy(schemaContext, dataChangeListenerPath);
+                new DataChangeListenerProxy(dataChangeListenerPath);
 
         LOG.debug("Registering for path {}", registerChangeListener.getPath());
 
@@ -677,20 +664,28 @@ public class Shard extends RaftActor {
 
     @Override
     protected void appendRecoveredLogEntry(final Payload data) {
-        if (data instanceof CompositeModificationPayload) {
+        if(data instanceof ModificationPayload) {
+            try {
+                currentLogRecoveryBatch.add(((ModificationPayload) data).getModification());
+            } catch (ClassNotFoundException | IOException e) {
+                LOG.error(e, "Error extracting ModificationPayload");
+            }
+        } else if (data instanceof CompositeModificationPayload) {
             currentLogRecoveryBatch.add(((CompositeModificationPayload) data).getModification());
+        } else if (data instanceof CompositeModificationByteStringPayload) {
+            currentLogRecoveryBatch.add(((CompositeModificationByteStringPayload) data).getModification());
         } else {
             LOG.error("Unknown state received {} during recovery", data);
         }
     }
 
     @Override
-    protected void applyRecoverySnapshot(final ByteString snapshot) {
+    protected void applyRecoverySnapshot(final byte[] snapshotBytes) {
         if(recoveryCoordinator == null) {
             recoveryCoordinator = new ShardRecoveryCoordinator(persistenceId(), schemaContext);
         }
 
-        recoveryCoordinator.submit(snapshot, store.newWriteOnlyTransaction());
+        recoveryCoordinator.submit(snapshotBytes, store.newWriteOnlyTransaction());
 
         if(LOG.isDebugEnabled()) {
             LOG.debug("{} : submitted recovery sbapshot", persistenceId());
@@ -752,22 +747,21 @@ public class Shard extends RaftActor {
     @Override
     protected void applyState(final ActorRef clientActor, final String identifier, final Object data) {
 
-        if (data instanceof CompositeModificationPayload) {
+        if(data instanceof ModificationPayload) {
+            try {
+                applyModificationToState(clientActor, identifier, ((ModificationPayload) data).getModification());
+            } catch (ClassNotFoundException | IOException e) {
+                LOG.error(e, "Error extracting ModificationPayload");
+            }
+        }
+        else if (data instanceof CompositeModificationPayload) {
             Object modification = ((CompositeModificationPayload) data).getModification();
 
-            if(modification == null) {
-                LOG.error(
-                     "modification is null - this is very unexpected, clientActor = {}, identifier = {}",
-                     identifier, clientActor != null ? clientActor.path().toString() : null);
-            } else if(clientActor == null) {
-                // There's no clientActor to which to send a commit reply so we must be applying
-                // replicated state from the leader.
-                commitWithNewTransaction(MutableCompositeModification.fromSerializable(
-                        modification, schemaContext));
-            } else {
-                // This must be the OK to commit after replication consensus.
-                finishCommit(clientActor, identifier);
-            }
+            applyModificationToState(clientActor, identifier, modification);
+        } else if(data instanceof CompositeModificationByteStringPayload ){
+            Object modification = ((CompositeModificationByteStringPayload) data).getModification();
+
+            applyModificationToState(clientActor, identifier, modification);
         } else {
             LOG.error("Unknown state received {} Class loader = {} CompositeNodeMod.ClassLoader = {}",
                     data, data.getClass().getClassLoader(),
@@ -778,6 +772,21 @@ public class Shard extends RaftActor {
 
     }
 
+    private void applyModificationToState(ActorRef clientActor, String identifier, Object modification) {
+        if(modification == null) {
+            LOG.error(
+                    "modification is null - this is very unexpected, clientActor = {}, identifier = {}",
+                    identifier, clientActor != null ? clientActor.path().toString() : null);
+        } else if(clientActor == null) {
+            // There's no clientActor to which to send a commit reply so we must be applying
+            // replicated state from the leader.
+            commitWithNewTransaction(MutableCompositeModification.fromSerializable(modification));
+        } else {
+            // This must be the OK to commit after replication consensus.
+            finishCommit(clientActor, identifier);
+        }
+    }
+
     private void updateJournalStats() {
         ReplicatedLogEntry lastLogEntry = getLastLogEntry();
 
@@ -788,29 +797,26 @@ public class Shard extends RaftActor {
 
         shardMBean.setCommitIndex(getCommitIndex());
         shardMBean.setLastApplied(getLastApplied());
-        shardMBean.setDataSize(getRaftActorContext().getReplicatedLog().dataSize());
+        shardMBean.setInMemoryJournalDataSize(getRaftActorContext().getReplicatedLog().dataSize());
     }
 
     @Override
     protected void createSnapshot() {
-        if (createSnapshotTransaction == null) {
+        // Create a transaction actor. We are really going to treat the transaction as a worker
+        // so that this actor does not get block building the snapshot. THe transaction actor will
+        // after processing the CreateSnapshot message.
 
-            // Create a transaction. We are really going to treat the transaction as a worker
-            // so that this actor does not get block building the snapshot
-            createSnapshotTransaction = createTransaction(
+        ActorRef createSnapshotTransaction = createTransaction(
                 TransactionProxy.TransactionType.READ_ONLY.ordinal(),
                 "createSnapshot" + ++createSnapshotTransactionCounter, "",
-                CreateTransaction.CURRENT_VERSION);
-
-            createSnapshotTransaction.tell(
-                new ReadData(YangInstanceIdentifier.builder().build()).toSerializable(), self());
+                DataStoreVersions.CURRENT_VERSION);
 
-        }
+        createSnapshotTransaction.tell(CreateSnapshot.INSTANCE, self());
     }
 
     @VisibleForTesting
     @Override
-    protected void applySnapshot(final ByteString snapshot) {
+    protected void applySnapshot(final byte[] snapshotBytes) {
         // Since this will be done only on Recovery or when this actor is a Follower
         // we can safely commit everything in here. We not need to worry about event notifications
         // as they would have already been disabled on the follower
@@ -818,17 +824,16 @@ public class Shard extends RaftActor {
         LOG.info("Applying snapshot");
         try {
             DOMStoreWriteTransaction transaction = store.newWriteOnlyTransaction();
-            NormalizedNodeMessages.Node serializedNode = NormalizedNodeMessages.Node.parseFrom(snapshot);
-            NormalizedNode<?, ?> node = new NormalizedNodeToNodeCodec(schemaContext)
-                .decode(serializedNode);
+
+            NormalizedNode<?, ?> node = SerializationUtils.deserializeNormalizedNode(snapshotBytes);
 
             // delete everything first
-            transaction.delete(YangInstanceIdentifier.builder().build());
+            transaction.delete(DATASTORE_ROOT);
 
             // Add everything from the remote node back
-            transaction.write(YangInstanceIdentifier.builder().build(), node);
+            transaction.write(DATASTORE_ROOT, node);
             syncCommitTransaction(transaction);
-        } catch (InvalidProtocolBufferException | InterruptedException | ExecutionException e) {
+        } catch (InterruptedException | ExecutionException e) {
             LOG.error(e, "An exception occurred when applying snapshot");
         } finally {
             LOG.info("Done applying snapshot");