Bug 1831 Batch messages on journal recovery
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / datastore / Shard.java
index 0737d2020bcde75125eafebbfe963fc19f2258f7..ddb5989f096145cddd3b716afa8dfc1dce3311e3 100644 (file)
@@ -20,6 +20,7 @@ import akka.serialization.Serialization;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Optional;
 import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
 import com.google.common.util.concurrent.CheckedFuture;
 import com.google.common.util.concurrent.FutureCallback;
 import com.google.common.util.concurrent.Futures;
@@ -35,7 +36,6 @@ import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shard.ShardStats
 import org.opendaylight.controller.cluster.datastore.messages.CloseTransactionChain;
 import org.opendaylight.controller.cluster.datastore.messages.CommitTransactionReply;
 import org.opendaylight.controller.cluster.datastore.messages.CreateTransaction;
-import org.opendaylight.controller.cluster.datastore.messages.CreateTransactionChainReply;
 import org.opendaylight.controller.cluster.datastore.messages.CreateTransactionReply;
 import org.opendaylight.controller.cluster.datastore.messages.EnableNotification;
 import org.opendaylight.controller.cluster.datastore.messages.ForwardedCommitTransaction;
@@ -48,11 +48,11 @@ import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContex
 import org.opendaylight.controller.cluster.datastore.modification.Modification;
 import org.opendaylight.controller.cluster.datastore.modification.MutableCompositeModification;
 import org.opendaylight.controller.cluster.datastore.node.NormalizedNodeToNodeCodec;
-import org.opendaylight.controller.cluster.raft.ConfigParams;
-import org.opendaylight.controller.cluster.raft.DefaultConfigParamsImpl;
 import org.opendaylight.controller.cluster.raft.RaftActor;
 import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
 import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshotReply;
+import org.opendaylight.controller.cluster.raft.protobuff.client.messages.CompositeModificationPayload;
+import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeListener;
 import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
 import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStore;
@@ -67,14 +67,12 @@ import org.opendaylight.yangtools.concepts.ListenerRegistration;
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
-import scala.concurrent.duration.FiniteDuration;
-
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ExecutionException;
-import java.util.concurrent.TimeUnit;
 
 /**
  * A Shard represents a portion of the logical data tree <br/>
@@ -84,8 +82,6 @@ import java.util.concurrent.TimeUnit;
  */
 public class Shard extends RaftActor {
 
-    private static final ConfigParams configParams = new ShardConfigParams();
-
     public static final String DEFAULT_NAME = "default";
 
     // The state of this Shard
@@ -114,11 +110,18 @@ public class Shard extends RaftActor {
 
     private ActorRef createSnapshotTransaction;
 
+    /**
+     * Coordinates persistence recovery on startup.
+     */
+    private ShardRecoveryCoordinator recoveryCoordinator;
+    private List<Object> currentLogRecoveryBatch;
+
     private final Map<String, DOMStoreTransactionChain> transactionChains = new HashMap<>();
 
-    private Shard(ShardIdentifier name, Map<ShardIdentifier, String> peerAddresses,
+    protected Shard(ShardIdentifier name, Map<ShardIdentifier, String> peerAddresses,
             DatastoreContext datastoreContext, SchemaContext schemaContext) {
-        super(name.toString(), mapPeerAddresses(peerAddresses), Optional.of(configParams));
+        super(name.toString(), mapPeerAddresses(peerAddresses),
+                Optional.of(datastoreContext.getShardRaftConfig()));
 
         this.name = name;
         this.datastoreContext = datastoreContext;
@@ -171,8 +174,11 @@ public class Shard extends RaftActor {
     }
 
     @Override public void onReceiveRecover(Object message) {
-        LOG.debug("onReceiveRecover: Received message {} from {}", message.getClass().toString(),
-            getSender());
+        if(LOG.isDebugEnabled()) {
+            LOG.debug("onReceiveRecover: Received message {} from {}",
+                message.getClass().toString(),
+                getSender());
+        }
 
         if (message instanceof RecoveryFailure){
             LOG.error(((RecoveryFailure) message).cause(), "Recovery failed because of this cause");
@@ -182,8 +188,11 @@ public class Shard extends RaftActor {
     }
 
     @Override public void onReceiveCommand(Object message) {
-        LOG.debug("onReceiveCommand: Received message {} from {}", message.getClass().toString(),
-            getSender());
+        if(LOG.isDebugEnabled()) {
+            LOG.debug("onReceiveCommand: Received message {} from {}",
+                message.getClass().toString(),
+                getSender());
+        }
 
         if(message.getClass().equals(ReadDataReply.SERIALIZABLE_CLASS)) {
             // This must be for install snapshot. Don't want to open this up and trigger
@@ -192,6 +201,7 @@ public class Shard extends RaftActor {
                 .tell(new CaptureSnapshotReply(ReadDataReply.getNormalizedNodeByteString(message)),
                     self());
 
+            createSnapshotTransaction = null;
             // Send a PoisonPill instead of sending close transaction because we do not really need
             // a response
             getSender().tell(PoisonPill.getInstance(), self());
@@ -297,7 +307,9 @@ public class Shard extends RaftActor {
             ShardTransactionIdentifier.builder()
                 .remoteTransactionId(remoteTransactionId)
                 .build();
-        LOG.debug("Creating transaction : {} ", transactionId);
+        if(LOG.isDebugEnabled()) {
+            LOG.debug("Creating transaction : {} ", transactionId);
+        }
         ActorRef transactionActor =
             createTypedTransactionActor(transactionType, transactionId, transactionChainId);
 
@@ -324,40 +336,22 @@ public class Shard extends RaftActor {
         DOMStoreThreePhaseCommitCohort cohort =
             modificationToCohort.remove(serialized);
         if (cohort == null) {
-            LOG.debug(
-                "Could not find cohort for modification : {}. Writing modification using a new transaction",
-                modification);
-            DOMStoreWriteTransaction transaction =
-                store.newWriteOnlyTransaction();
-
-            LOG.debug("Created new transaction {}", transaction.getIdentifier().toString());
-
-            modification.apply(transaction);
-            try {
-                syncCommitTransaction(transaction);
-            } catch (InterruptedException | ExecutionException e) {
-                shardMBean.incrementFailedTransactionsCount();
-                LOG.error("Failed to commit", e);
-                return;
-            }
-            //we want to just apply the recovery commit and return
-            shardMBean.incrementCommittedTransactionCount();
+            // If there's no cached cohort then we must be applying replicated state.
+            commitWithNewTransaction(serialized);
             return;
         }
 
-
-        if(sender == null){
+        if(sender == null) {
             LOG.error("Commit failed. Sender cannot be null");
             return;
         }
 
-        final ListenableFuture<Void> future = cohort.commit();
-        final ActorRef self = getSelf();
+        ListenableFuture<Void> future = cohort.commit();
 
         Futures.addCallback(future, new FutureCallback<Void>() {
             @Override
             public void onSuccess(Void v) {
-                sender.tell(new CommitTransactionReply().toSerializable(), self);
+                sender.tell(new CommitTransactionReply().toSerializable(), getSelf());
                 shardMBean.incrementCommittedTransactionCount();
                 shardMBean.setLastCommittedTransactionTime(System.currentTimeMillis());
             }
@@ -366,12 +360,24 @@ public class Shard extends RaftActor {
             public void onFailure(Throwable t) {
                 LOG.error(t, "An exception happened during commit");
                 shardMBean.incrementFailedTransactionsCount();
-                sender.tell(new akka.actor.Status.Failure(t), self);
+                sender.tell(new akka.actor.Status.Failure(t), getSelf());
             }
         });
 
     }
 
+    private void commitWithNewTransaction(Object modification) {
+        DOMStoreWriteTransaction tx = store.newWriteOnlyTransaction();
+        MutableCompositeModification.fromSerializable(modification, schemaContext).apply(tx);
+        try {
+            syncCommitTransaction(tx);
+            shardMBean.incrementCommittedTransactionCount();
+        } catch (InterruptedException | ExecutionException e) {
+            shardMBean.incrementFailedTransactionsCount();
+            LOG.error(e, "Failed to commit");
+        }
+    }
+
     private void handleForwardedCommit(ForwardedCommitTransaction message) {
         Object serializedModification =
             message.getModification().toSerializable();
@@ -400,8 +406,10 @@ public class Shard extends RaftActor {
     private void registerChangeListener(
         RegisterChangeListener registerChangeListener) {
 
-        LOG.debug("registerDataChangeListener for {}", registerChangeListener
-            .getPath());
+        if(LOG.isDebugEnabled()) {
+            LOG.debug("registerDataChangeListener for {}", registerChangeListener
+                .getPath());
+        }
 
 
         ActorSelection dataChangeListenerPath = getContext()
@@ -429,48 +437,118 @@ public class Shard extends RaftActor {
             getContext().actorOf(
                 DataChangeListenerRegistration.props(registration));
 
-        LOG.debug(
-            "registerDataChangeListener sending reply, listenerRegistrationPath = {} "
-            , listenerRegistration.path().toString());
+        if(LOG.isDebugEnabled()) {
+            LOG.debug(
+                "registerDataChangeListener sending reply, listenerRegistrationPath = {} "
+                , listenerRegistration.path().toString());
+        }
 
         getSender()
             .tell(new RegisterChangeListenerReply(listenerRegistration.path()),
                 getSelf());
     }
 
-    private void createTransactionChain() {
-        DOMStoreTransactionChain chain = store.createTransactionChain();
-        ActorRef transactionChain = getContext().actorOf(
-                ShardTransactionChain.props(chain, schemaContext, datastoreContext, shardMBean));
-        getSender().tell(new CreateTransactionChainReply(transactionChain.path()).toSerializable(),
-                getSelf());
-    }
-
     private boolean isMetricsCaptureEnabled(){
         CommonConfig config = new CommonConfig(getContext().system().settings().config());
         return config.isMetricCaptureEnabled();
     }
 
-    @Override protected void applyState(ActorRef clientActor, String identifier,
-        Object data) {
+    @Override
+    protected
+    void startLogRecoveryBatch(int maxBatchSize) {
+        currentLogRecoveryBatch = Lists.newArrayListWithCapacity(maxBatchSize);
+
+        if(LOG.isDebugEnabled()) {
+            LOG.debug("{} : starting log recovery batch with max size {}", persistenceId(), maxBatchSize);
+        }
+    }
+
+    @Override
+    protected void appendRecoveredLogEntry(Payload data) {
+        if (data instanceof CompositeModificationPayload) {
+            currentLogRecoveryBatch.add(((CompositeModificationPayload) data).getModification());
+        } else {
+            LOG.error("Unknown state received {} during recovery", data);
+        }
+    }
+
+    @Override
+    protected void applyRecoverySnapshot(ByteString snapshot) {
+        if(recoveryCoordinator == null) {
+            recoveryCoordinator = new ShardRecoveryCoordinator(persistenceId(), schemaContext);
+        }
+
+        recoveryCoordinator.submit(snapshot, store.newWriteOnlyTransaction());
+
+        if(LOG.isDebugEnabled()) {
+            LOG.debug("{} : submitted recovery sbapshot", persistenceId());
+        }
+    }
+
+    @Override
+    protected void applyCurrentLogRecoveryBatch() {
+        if(recoveryCoordinator == null) {
+            recoveryCoordinator = new ShardRecoveryCoordinator(persistenceId(), schemaContext);
+        }
+
+        recoveryCoordinator.submit(currentLogRecoveryBatch, store.newWriteOnlyTransaction());
+
+        if(LOG.isDebugEnabled()) {
+            LOG.debug("{} : submitted log recovery batch with size {}", persistenceId(),
+                    currentLogRecoveryBatch.size());
+        }
+    }
+
+    @Override
+    protected void onRecoveryComplete() {
+        if(recoveryCoordinator != null) {
+            Collection<DOMStoreWriteTransaction> txList = recoveryCoordinator.getTransactions();
+
+            if(LOG.isDebugEnabled()) {
+                LOG.debug("{} : recovery complete - committing {} Tx's", persistenceId(), txList.size());
+            }
+
+            for(DOMStoreWriteTransaction tx: txList) {
+                try {
+                    syncCommitTransaction(tx);
+                    shardMBean.incrementCommittedTransactionCount();
+                } catch (InterruptedException | ExecutionException e) {
+                    shardMBean.incrementFailedTransactionsCount();
+                    LOG.error(e, "Failed to commit");
+                }
+            }
+        }
+
+        recoveryCoordinator = null;
+        currentLogRecoveryBatch = null;
+        updateJournalStats();
+    }
+
+    @Override
+    protected void applyState(ActorRef clientActor, String identifier, Object data) {
 
         if (data instanceof CompositeModificationPayload) {
-            Object modification =
-                ((CompositeModificationPayload) data).getModification();
+            Object modification = ((CompositeModificationPayload) data).getModification();
 
             if (modification != null) {
                 commit(clientActor, modification);
             } else {
                 LOG.error(
                     "modification is null - this is very unexpected, clientActor = {}, identifier = {}",
-                    identifier, clientActor.path().toString());
+                    identifier, clientActor != null ? clientActor.path().toString() : null);
             }
 
         } else {
-            LOG.error("Unknown state received {}", data);
+            LOG.error("Unknown state received {} Class loader = {} CompositeNodeMod.ClassLoader = {}",
+                    data, data.getClass().getClassLoader(),
+                    CompositeModificationPayload.class.getClassLoader());
         }
 
-        // Update stats
+        updateJournalStats();
+
+    }
+
+    private void updateJournalStats() {
         ReplicatedLogEntry lastLogEntry = getLastLogEntry();
 
         if (lastLogEntry != null) {
@@ -480,10 +558,10 @@ public class Shard extends RaftActor {
 
         shardMBean.setCommitIndex(getCommitIndex());
         shardMBean.setLastApplied(getLastApplied());
-
     }
 
-    @Override protected void createSnapshot() {
+    @Override
+    protected void createSnapshot() {
         if (createSnapshotTransaction == null) {
 
             // Create a transaction. We are really going to treat the transaction as a worker
@@ -498,10 +576,14 @@ public class Shard extends RaftActor {
         }
     }
 
-    @VisibleForTesting @Override protected void applySnapshot(ByteString snapshot) {
+    @VisibleForTesting
+    @Override
+    protected void applySnapshot(ByteString snapshot) {
         // Since this will be done only on Recovery or when this actor is a Follower
         // we can safely commit everything in here. We not need to worry about event notifications
         // as they would have already been disabled on the follower
+
+        LOG.info("Applying snapshot");
         try {
             DOMStoreWriteTransaction transaction = store.newWriteOnlyTransaction();
             NormalizedNodeMessages.Node serializedNode = NormalizedNodeMessages.Node.parseFrom(snapshot);
@@ -516,6 +598,8 @@ public class Shard extends RaftActor {
             syncCommitTransaction(transaction);
         } catch (InvalidProtocolBufferException | InterruptedException | ExecutionException e) {
             LOG.error(e, "An exception occurred when applying snapshot");
+        } finally {
+            LOG.info("Done applying snapshot");
         }
     }
 
@@ -525,17 +609,17 @@ public class Shard extends RaftActor {
                 .tell(new EnableNotification(isLeader()), getSelf());
         }
 
-        if (getLeaderId() != null) {
-            shardMBean.setLeader(getLeaderId());
-        }
-
         shardMBean.setRaftState(getRaftState().name());
         shardMBean.setCurrentTerm(getCurrentTerm());
 
         // If this actor is no longer the leader close all the transaction chains
         if(!isLeader()){
             for(Map.Entry<String, DOMStoreTransactionChain> entry : transactionChains.entrySet()){
-                LOG.debug("onStateChanged: Closing transaction chain {} because shard {} is no longer the leader", entry.getKey(), getId());
+                if(LOG.isDebugEnabled()) {
+                    LOG.debug(
+                        "onStateChanged: Closing transaction chain {} because shard {} is no longer the leader",
+                        entry.getKey(), getId());
+                }
                 entry.getValue().close();
             }
 
@@ -543,18 +627,12 @@ public class Shard extends RaftActor {
         }
     }
 
-    @Override public String persistenceId() {
-        return this.name.toString();
+    @Override protected void onLeaderChanged(String oldLeader, String newLeader) {
+        shardMBean.setLeader(newLeader);
     }
 
-
-    private static class ShardConfigParams extends DefaultConfigParamsImpl {
-        public static final FiniteDuration HEART_BEAT_INTERVAL =
-            new FiniteDuration(500, TimeUnit.MILLISECONDS);
-
-        @Override public FiniteDuration getHeartBeatInterval() {
-            return HEART_BEAT_INTERVAL;
-        }
+    @Override public String persistenceId() {
+        return this.name.toString();
     }
 
     private static class ShardCreator implements Creator<Shard> {
@@ -580,20 +658,24 @@ public class Shard extends RaftActor {
         }
     }
 
-    @VisibleForTesting NormalizedNode readStore() throws ExecutionException, InterruptedException {
+    @VisibleForTesting
+    NormalizedNode<?,?> readStore(YangInstanceIdentifier id)
+            throws ExecutionException, InterruptedException {
         DOMStoreReadTransaction transaction = store.newReadOnlyTransaction();
 
         CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException> future =
-            transaction.read(YangInstanceIdentifier.builder().build());
+            transaction.read(id);
 
-        NormalizedNode<?, ?> node = future.get().get();
+        Optional<NormalizedNode<?, ?>> optional = future.get();
+        NormalizedNode<?, ?> node = optional.isPresent()? optional.get() : null;
 
         transaction.close();
 
         return node;
     }
 
-    @VisibleForTesting void writeToStore(YangInstanceIdentifier id, NormalizedNode node)
+    @VisibleForTesting
+    void writeToStore(YangInstanceIdentifier id, NormalizedNode<?,?> node)
         throws ExecutionException, InterruptedException {
         DOMStoreWriteTransaction transaction = store.newWriteOnlyTransaction();
 
@@ -602,4 +684,8 @@ public class Shard extends RaftActor {
         syncCommitTransaction(transaction);
     }
 
+    @VisibleForTesting
+    ShardStats getShardMBean() {
+        return shardMBean;
+    }
 }