Bug 1831 Batch messages on journal recovery 36/11136/11
authortpantelis <tpanteli@brocade.com>
Sun, 31 Aug 2014 15:15:18 +0000 (11:15 -0400)
committertpantelis <tpanteli@brocade.com>
Thu, 18 Sep 2014 14:25:34 +0000 (10:25 -0400)
Added journal log recovery batching support in RaftActor along with
additonal abstract methods for granular recovery control.

It turns out that batching helps a little but the real performance hog
was deserialization. There was a couple inefficient PreConditions in the
NormalizedNodeSerializer (unwanted String concats in the fast path) that
accounted for significant performance degradation. I also made a few
other minor performance enhancements.

Although deserialization is much better with the fixes, I also
implemented some parallelization during shard recovery. I added a
ShardRecoveryCoordinator class that parallelizes deserialization
of journal log enry batches and snapshots for faster recovery time.
The resulting transactions are still committed to the data store in the
order the corresponding snapshot or log batch are received to preserve
data store integrity.

The journal recovery log batch size is configurable vai the config XML.
I also made the shard heartbeat interval and shard snapshot batch count
configurable.
`
Change-Id: I52ef1690bfb6cc486c329ee60f77c52720c24469
Signed-off-by: tpantelis <tpanteli@brocade.com>
24 files changed:
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/example/ExampleActor.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/AbstractReplicatedLogImpl.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/ConfigParams.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/DefaultConfigParamsImpl.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActor.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorContextImpl.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/ReplicatedLog.java
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/RaftActorTest.java
opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/datastore/node/utils/serialization/NormalizedNodeSerializer.java
opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/datastore/node/utils/serialization/PathArgumentSerializer.java
opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/datastore/node/utils/serialization/ValueSerializer.java
opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/datastore/node/utils/serialization/ValueType.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DatastoreContext.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardRecoveryCoordinator.java [new file with mode: 0644]
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/config/yang/config/distributed_datastore_provider/DistributedConfigDataStoreProviderModule.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/config/yang/config/distributed_datastore_provider/DistributedOperationalDataStoreProviderModule.java
opendaylight/md-sal/sal-distributed-datastore/src/main/yang/distributed-datastore-provider.yang
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/AbstractActorTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTransactionTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/utils/InMemoryJournal.java [new file with mode: 0644]
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/utils/InMemorySnapshotStore.java
opendaylight/md-sal/sal-distributed-datastore/src/test/resources/application.conf

index 3bfdf732cf01cd3d3898158bc4b8e62585e5a9f5..04df7785ad8d27c69e77f9e0cd85a5cbb64208b0 100644 (file)
@@ -11,8 +11,10 @@ package org.opendaylight.controller.cluster.example;
 import akka.actor.ActorRef;
 import akka.actor.Props;
 import akka.japi.Creator;
+
 import com.google.common.base.Optional;
 import com.google.protobuf.ByteString;
+
 import org.opendaylight.controller.cluster.example.messages.KeyValue;
 import org.opendaylight.controller.cluster.example.messages.KeyValueSaved;
 import org.opendaylight.controller.cluster.example.messages.PrintRole;
@@ -165,4 +167,24 @@ public class ExampleActor extends RaftActor {
     @Override public String persistenceId() {
         return getId();
     }
+
+    @Override
+    protected void startLogRecoveryBatch(int maxBatchSize) {
+    }
+
+    @Override
+    protected void appendRecoveredLogEntry(Payload data) {
+    }
+
+    @Override
+    protected void applyCurrentLogRecoveryBatch() {
+    }
+
+    @Override
+    protected void onRecoveryComplete() {
+    }
+
+    @Override
+    protected void applyRecoverySnapshot(ByteString snapshot) {
+    }
 }
index b436bce50061f98d0362ce3df4336c4279977d63..2be4a0c36f91bb41be4d21f79f928ef1c0bfe71f 100644 (file)
@@ -18,13 +18,14 @@ import java.util.List;
  */
 public abstract class AbstractReplicatedLogImpl implements ReplicatedLog {
 
-    protected List<ReplicatedLogEntry> journal;
+    // We define this as ArrayList so we can use ensureCapacity.
+    protected ArrayList<ReplicatedLogEntry> journal;
     protected ByteString snapshot;
     protected long snapshotIndex = -1;
     protected long snapshotTerm = -1;
 
     // to be used for rollback during save snapshot failure
-    protected List<ReplicatedLogEntry> snapshottedJournal;
+    protected ArrayList<ReplicatedLogEntry> snapshottedJournal;
     protected ByteString previousSnapshot;
     protected long previousSnapshotIndex = -1;
     protected long previousSnapshotTerm = -1;
@@ -106,6 +107,11 @@ public abstract class AbstractReplicatedLogImpl implements ReplicatedLog {
         journal.add(replicatedLogEntry);
     }
 
+    @Override
+    public void increaseJournalLogCapacity(int amount) {
+        journal.ensureCapacity(journal.size() + amount);
+    }
+
     @Override
     public List<ReplicatedLogEntry> getFrom(long logEntryIndex) {
         return getFrom(logEntryIndex, journal.size());
@@ -208,7 +214,6 @@ public abstract class AbstractReplicatedLogImpl implements ReplicatedLog {
 
     @Override
     public void snapshotCommit() {
-        snapshottedJournal.clear();
         snapshottedJournal = null;
         previousSnapshotIndex = -1;
         previousSnapshotTerm = -1;
@@ -218,7 +223,6 @@ public abstract class AbstractReplicatedLogImpl implements ReplicatedLog {
     @Override
     public void snapshotRollback() {
         snapshottedJournal.addAll(journal);
-        journal.clear();
         journal = snapshottedJournal;
         snapshottedJournal = null;
 
index ed6439d8c33bceb545927fd6c2f892665b160f8a..bff2a2779733761bff220e7e1223e85c46d18a73 100644 (file)
@@ -26,7 +26,7 @@ public interface ConfigParams {
      *
      * @return long
      */
-    public long getSnapshotBatchCount();
+    long getSnapshotBatchCount();
 
     /**
      * The interval at which a heart beat message will be sent to the remote
@@ -34,7 +34,7 @@ public interface ConfigParams {
      *
      * @return FiniteDuration
      */
-    public FiniteDuration getHeartBeatInterval();
+    FiniteDuration getHeartBeatInterval();
 
     /**
      * The interval in which a new election would get triggered if no leader is found
@@ -43,7 +43,7 @@ public interface ConfigParams {
      *
      * @return FiniteDuration
      */
-    public FiniteDuration getElectionTimeOutInterval();
+    FiniteDuration getElectionTimeOutInterval();
 
     /**
      * The maximum election time variance. The election is scheduled using both
@@ -51,10 +51,15 @@ public interface ConfigParams {
      *
      * @return int
      */
-    public int getElectionTimeVariance();
+    int getElectionTimeVariance();
 
     /**
      * The size (in bytes) of the snapshot chunk sent from Leader
      */
-    public int getSnapshotChunkSize();
+    int getSnapshotChunkSize();
+
+    /**
+     * The number of journal log entries to batch on recovery before applying.
+     */
+    int getJournalRecoveryLogBatchSize();
 }
index 9d06f6360473097beefbbce34962d7433f447f88..dc4145358aa332febafa690baa549fbfccfa91b2 100644 (file)
@@ -20,12 +20,14 @@ public class DefaultConfigParamsImpl implements ConfigParams {
 
     private static final int SNAPSHOT_BATCH_COUNT = 20000;
 
+    private static final int JOURNAL_RECOVERY_LOG_BATCH_SIZE = 1000;
+
     /**
      * The maximum election time variance
      */
     private static final int ELECTION_TIME_MAX_VARIANCE = 100;
 
-    private final int SNAPSHOT_CHUNK_SIZE = 2048 * 1000; //2MB
+    private static final int SNAPSHOT_CHUNK_SIZE = 2048 * 1000; //2MB
 
 
     /**
@@ -39,17 +41,32 @@ public class DefaultConfigParamsImpl implements ConfigParams {
         new FiniteDuration(100, TimeUnit.MILLISECONDS);
 
 
+    private FiniteDuration heartBeatInterval = HEART_BEAT_INTERVAL;
+    private long snapshotBatchCount = SNAPSHOT_BATCH_COUNT;
+    private int journalRecoveryLogBatchSize = JOURNAL_RECOVERY_LOG_BATCH_SIZE;
+
+    public void setHeartBeatInterval(FiniteDuration heartBeatInterval) {
+        this.heartBeatInterval = heartBeatInterval;
+    }
+
+    public void setSnapshotBatchCount(long snapshotBatchCount) {
+        this.snapshotBatchCount = snapshotBatchCount;
+    }
+
+    public void setJournalRecoveryLogBatchSize(int journalRecoveryLogBatchSize) {
+        this.journalRecoveryLogBatchSize = journalRecoveryLogBatchSize;
+    }
+
     @Override
     public long getSnapshotBatchCount() {
-        return SNAPSHOT_BATCH_COUNT;
+        return snapshotBatchCount;
     }
 
     @Override
     public FiniteDuration getHeartBeatInterval() {
-        return HEART_BEAT_INTERVAL;
+        return heartBeatInterval;
     }
 
-
     @Override
     public FiniteDuration getElectionTimeOutInterval() {
         // returns 2 times the heart beat interval
@@ -65,4 +82,9 @@ public class DefaultConfigParamsImpl implements ConfigParams {
     public int getSnapshotChunkSize() {
         return SNAPSHOT_CHUNK_SIZE;
     }
+
+    @Override
+    public int getJournalRecoveryLogBatchSize() {
+        return journalRecoveryLogBatchSize;
+    }
 }
index 0a4e2170e527e0dd74cae73802ae7893eba436be..64fa7496042466e58bd51cf0a488c265898866da 100644 (file)
@@ -20,6 +20,7 @@ import akka.persistence.SnapshotOffer;
 import akka.persistence.SnapshotSelectionCriteria;
 import akka.persistence.UntypedPersistentActor;
 import com.google.common.base.Optional;
+import com.google.common.base.Stopwatch;
 import com.google.protobuf.ByteString;
 import org.opendaylight.controller.cluster.raft.base.messages.ApplyLogEntries;
 import org.opendaylight.controller.cluster.raft.base.messages.ApplySnapshot;
@@ -39,7 +40,6 @@ import org.opendaylight.controller.cluster.raft.client.messages.RemoveRaftPeer;
 import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
 import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
 import org.opendaylight.controller.protobuff.messages.cluster.raft.AppendEntriesMessages;
-
 import java.io.Serializable;
 import java.util.Map;
 
@@ -97,7 +97,7 @@ public abstract class RaftActor extends UntypedPersistentActor {
      * This context should NOT be passed directly to any other actor it is
      * only to be consumed by the RaftActorBehaviors
      */
-    protected RaftActorContext context;
+    private final RaftActorContext context;
 
     /**
      * The in-memory journal
@@ -108,6 +108,10 @@ public abstract class RaftActor extends UntypedPersistentActor {
 
     private volatile boolean hasSnapshotCaptureInitiated = false;
 
+    private Stopwatch recoveryTimer;
+
+    private int currentRecoveryBatchCount;
+
     public RaftActor(String id, Map<String, String> peerAddresses) {
         this(id, peerAddresses, Optional.<ConfigParams>absent());
     }
@@ -122,71 +126,134 @@ public abstract class RaftActor extends UntypedPersistentActor {
             LOG);
     }
 
-    @Override public void onReceiveRecover(Object message) {
+    private void initRecoveryTimer() {
+        if(recoveryTimer == null) {
+            recoveryTimer = new Stopwatch();
+            recoveryTimer.start();
+        }
+    }
+
+    @Override
+    public void preStart() throws Exception {
+        LOG.info("Starting recovery for {} with journal batch size {}", persistenceId(),
+                context.getConfigParams().getJournalRecoveryLogBatchSize());
+        super.preStart();
+    }
+
+    @Override
+    public void onReceiveRecover(Object message) {
         if (message instanceof SnapshotOffer) {
-            LOG.info("SnapshotOffer called..");
-            SnapshotOffer offer = (SnapshotOffer) message;
-            Snapshot snapshot = (Snapshot) offer.snapshot();
+            onRecoveredSnapshot((SnapshotOffer)message);
+        } else if (message instanceof ReplicatedLogEntry) {
+            onRecoveredJournalLogEntry((ReplicatedLogEntry)message);
+        } else if (message instanceof ApplyLogEntries) {
+            onRecoveredApplyLogEntries((ApplyLogEntries)message);
+        } else if (message instanceof DeleteEntries) {
+            replicatedLog.removeFrom(((DeleteEntries) message).getFromIndex());
+        } else if (message instanceof UpdateElectionTerm) {
+            context.getTermInformation().update(((UpdateElectionTerm) message).getCurrentTerm(),
+                    ((UpdateElectionTerm) message).getVotedFor());
+        } else if (message instanceof RecoveryCompleted) {
+            onRecoveryCompletedMessage();
+        }
+    }
 
-            // Create a replicated log with the snapshot information
-            // The replicated log can be used later on to retrieve this snapshot
-            // when we need to install it on a peer
-            replicatedLog = new ReplicatedLogImpl(snapshot);
+    private void onRecoveredSnapshot(SnapshotOffer offer) {
+        LOG.debug("SnapshotOffer called..");
 
-            context.setReplicatedLog(replicatedLog);
-            context.setLastApplied(snapshot.getLastAppliedIndex());
-            context.setCommitIndex(snapshot.getLastAppliedIndex());
+        initRecoveryTimer();
 
-            LOG.info("Applied snapshot to replicatedLog. " +
-                    "snapshotIndex={}, snapshotTerm={}, journal-size={}",
-                replicatedLog.snapshotIndex, replicatedLog.snapshotTerm,
-                replicatedLog.size()
-            );
+        Snapshot snapshot = (Snapshot) offer.snapshot();
 
-            // Apply the snapshot to the actors state
-            applySnapshot(ByteString.copyFrom(snapshot.getState()));
+        // Create a replicated log with the snapshot information
+        // The replicated log can be used later on to retrieve this snapshot
+        // when we need to install it on a peer
+        replicatedLog = new ReplicatedLogImpl(snapshot);
 
-        } else if (message instanceof ReplicatedLogEntry) {
-            ReplicatedLogEntry logEntry = (ReplicatedLogEntry) message;
-            if(LOG.isDebugEnabled()) {
-                LOG.debug("Received ReplicatedLogEntry for recovery:{}", logEntry.getIndex());
-            }
-            replicatedLog.append(logEntry);
+        context.setReplicatedLog(replicatedLog);
+        context.setLastApplied(snapshot.getLastAppliedIndex());
+        context.setCommitIndex(snapshot.getLastAppliedIndex());
 
-        } else if (message instanceof ApplyLogEntries) {
-            ApplyLogEntries ale = (ApplyLogEntries) message;
+        Stopwatch timer = new Stopwatch();
+        timer.start();
 
-            if(LOG.isDebugEnabled()) {
-                LOG.debug("Received ApplyLogEntries for recovery, applying to state:{} to {}",
+        // Apply the snapshot to the actors state
+        applyRecoverySnapshot(ByteString.copyFrom(snapshot.getState()));
+
+        timer.stop();
+        LOG.info("Recovery snapshot applied for {} in {}: snapshotIndex={}, snapshotTerm={}, journal-size=" +
+                replicatedLog.size(), persistenceId(), timer.toString(),
+                replicatedLog.snapshotIndex, replicatedLog.snapshotTerm);
+    }
+
+    private void onRecoveredJournalLogEntry(ReplicatedLogEntry logEntry) {
+        if(LOG.isDebugEnabled()) {
+            LOG.debug("Received ReplicatedLogEntry for recovery: {}", logEntry.getIndex());
+        }
+
+        replicatedLog.append(logEntry);
+    }
+
+    private void onRecoveredApplyLogEntries(ApplyLogEntries ale) {
+        if(LOG.isDebugEnabled()) {
+            LOG.debug("Received ApplyLogEntries for recovery, applying to state: {} to {}",
                     context.getLastApplied() + 1, ale.getToIndex());
-            }
+        }
 
-            for (long i = context.getLastApplied() + 1; i <= ale.getToIndex(); i++) {
-                applyState(null, "recovery", replicatedLog.get(i).getData());
-            }
-            context.setLastApplied(ale.getToIndex());
-            context.setCommitIndex(ale.getToIndex());
+        for (long i = context.getLastApplied() + 1; i <= ale.getToIndex(); i++) {
+            batchRecoveredLogEntry(replicatedLog.get(i));
+        }
 
-        } else if (message instanceof DeleteEntries) {
-            replicatedLog.removeFrom(((DeleteEntries) message).getFromIndex());
+        context.setLastApplied(ale.getToIndex());
+        context.setCommitIndex(ale.getToIndex());
+    }
 
-        } else if (message instanceof UpdateElectionTerm) {
-            context.getTermInformation().update(((UpdateElectionTerm) message).getCurrentTerm(),
-                ((UpdateElectionTerm) message).getVotedFor());
+    private void batchRecoveredLogEntry(ReplicatedLogEntry logEntry) {
+        initRecoveryTimer();
 
-        } else if (message instanceof RecoveryCompleted) {
-            LOG.info(
-                "RecoveryCompleted - Switching actor to Follower - " +
-                    "Persistence Id =  " + persistenceId() +
-                    " Last index in log:{}, snapshotIndex={}, snapshotTerm={}, " +
-                    "journal-size={}",
-                replicatedLog.lastIndex(), replicatedLog.snapshotIndex,
-                replicatedLog.snapshotTerm, replicatedLog.size());
-            currentBehavior = switchBehavior(RaftState.Follower);
-            onStateChanged();
+        int batchSize = context.getConfigParams().getJournalRecoveryLogBatchSize();
+        if(currentRecoveryBatchCount == 0) {
+            startLogRecoveryBatch(batchSize);
+        }
+
+        appendRecoveredLogEntry(logEntry.getData());
+
+        if(++currentRecoveryBatchCount >= batchSize) {
+            endCurrentLogRecoveryBatch();
         }
     }
 
+    private void endCurrentLogRecoveryBatch() {
+        applyCurrentLogRecoveryBatch();
+        currentRecoveryBatchCount = 0;
+    }
+
+    private void onRecoveryCompletedMessage() {
+        if(currentRecoveryBatchCount > 0) {
+            endCurrentLogRecoveryBatch();
+        }
+
+        onRecoveryComplete();
+
+        String recoveryTime = "";
+        if(recoveryTimer != null) {
+            recoveryTimer.stop();
+            recoveryTime = " in " + recoveryTimer.toString();
+            recoveryTimer = null;
+        }
+
+        LOG.info(
+            "Recovery completed" + recoveryTime + " - Switching actor to Follower - " +
+                "Persistence Id =  " + persistenceId() +
+                " Last index in log={}, snapshotIndex={}, snapshotTerm={}, " +
+                "journal-size={}",
+            replicatedLog.lastIndex(), replicatedLog.snapshotIndex,
+            replicatedLog.snapshotTerm, replicatedLog.size());
+
+        currentBehavior = switchBehavior(RaftState.Follower);
+        onStateChanged();
+    }
+
     @Override public void onReceiveCommand(Object message) {
         if (message instanceof ApplyState){
             ApplyState applyState = (ApplyState) message;
@@ -397,6 +464,10 @@ public abstract class RaftActor extends UntypedPersistentActor {
         return context.getLastApplied();
     }
 
+    protected RaftActorContext getRaftActorContext() {
+        return context;
+    }
+
     /**
      * setPeerAddress sets the address of a known peer at a later time.
      * <p>
@@ -437,6 +508,38 @@ public abstract class RaftActor extends UntypedPersistentActor {
     protected abstract void applyState(ActorRef clientActor, String identifier,
         Object data);
 
+    /**
+     * This method is called during recovery at the start of a batch of state entries. Derived
+     * classes should perform any initialization needed to start a batch.
+     */
+    protected abstract void startLogRecoveryBatch(int maxBatchSize);
+
+    /**
+     * This method is called during recovery to append state data to the current batch. This method
+     * is called 1 or more times after {@link #startRecoveryStateBatch}.
+     *
+     * @param data the state data
+     */
+    protected abstract void appendRecoveredLogEntry(Payload data);
+
+    /**
+     * This method is called during recovery to reconstruct the state of the actor.
+     *
+     * @param snapshot A snapshot of the state of the actor
+     */
+    protected abstract void applyRecoverySnapshot(ByteString snapshot);
+
+    /**
+     * This method is called during recovery at the end of a batch to apply the current batched
+     * log entries. This method is called after {@link #appendRecoveryLogEntry}.
+     */
+    protected abstract void applyCurrentLogRecoveryBatch();
+
+    /**
+     * This method is called when recovery is complete.
+     */
+    protected abstract void onRecoveryComplete();
+
     /**
      * This method will be called by the RaftActor when a snapshot needs to be
      * created. The derived actor should respond with its current state.
@@ -449,10 +552,7 @@ public abstract class RaftActor extends UntypedPersistentActor {
     protected abstract void createSnapshot();
 
     /**
-     * This method will be called by the RaftActor during recovery to
-     * reconstruct the state of the actor.
-     * <p/>
-     * This method may also be called at any other point during normal
+     * This method can be called at any other point during normal
      * operations when the derived actor is out of sync with it's peers
      * and the only way to bring it in sync is by applying a snapshot
      *
@@ -609,6 +709,7 @@ public abstract class RaftActor extends UntypedPersistentActor {
             // of a single command.
             persist(replicatedLogEntry,
                 new Procedure<ReplicatedLogEntry>() {
+                    @Override
                     public void apply(ReplicatedLogEntry evt) throws Exception {
                         // when a snaphsot is being taken, captureSnapshot != null
                         if (hasSnapshotCaptureInitiated == false &&
@@ -673,10 +774,12 @@ public abstract class RaftActor extends UntypedPersistentActor {
         private long currentTerm = 0;
         private String votedFor = null;
 
+        @Override
         public long getCurrentTerm() {
             return currentTerm;
         }
 
+        @Override
         public String getVotedFor() {
             return votedFor;
         }
index 25da37105cea18e46270a4baccfdae9b459a2500..e4aef0a8445d2400e54fff5c1d09c9044747134f 100644 (file)
@@ -59,26 +59,32 @@ public class RaftActorContextImpl implements RaftActorContext {
         this.LOG = logger;
     }
 
+    @Override
     public ActorRef actorOf(Props props){
         return context.actorOf(props);
     }
 
+    @Override
     public ActorSelection actorSelection(String path){
         return context.actorSelection(path);
     }
 
+    @Override
     public String getId() {
         return id;
     }
 
+    @Override
     public ActorRef getActor() {
         return actor;
     }
 
+    @Override
     public ElectionTerm getTermInformation() {
         return termInformation;
     }
 
+    @Override
     public long getCommitIndex() {
         return commitIndex;
     }
@@ -87,6 +93,7 @@ public class RaftActorContextImpl implements RaftActorContext {
         this.commitIndex = commitIndex;
     }
 
+    @Override
     public long getLastApplied() {
         return lastApplied;
     }
index c17f5448c6e256a97c4f7134959bb6c2d88a0971..85893333c2892eadf43e9aacc374f574c1b4a03a 100644 (file)
@@ -74,6 +74,13 @@ public interface ReplicatedLog {
      */
     void append(ReplicatedLogEntry replicatedLogEntry);
 
+    /**
+     * Optimization method to increase the capacity of the journal log prior to appending entries.
+     *
+     * @param amount the amount to increase by
+     */
+    void increaseJournalLogCapacity(int amount);
+
     /**
      *
      * @param replicatedLogEntry
index 998c198756d191df038de6f6cc75a8091fd0d1f1..22f374319cfdbca12115fad320949c7b277a45a5 100644 (file)
@@ -4,19 +4,22 @@ import akka.actor.ActorRef;
 import akka.actor.ActorSystem;
 import akka.actor.PoisonPill;
 import akka.actor.Props;
+import akka.actor.Terminated;
 import akka.event.Logging;
 import akka.japi.Creator;
 import akka.testkit.JavaTestKit;
 import akka.testkit.TestActorRef;
+import com.google.common.base.Optional;
 import com.google.protobuf.ByteString;
 import org.junit.After;
 import org.junit.Test;
 import org.opendaylight.controller.cluster.raft.base.messages.ApplyLogEntries;
 import org.opendaylight.controller.cluster.raft.client.messages.FindLeader;
 import org.opendaylight.controller.cluster.raft.client.messages.FindLeaderReply;
+import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
 import org.opendaylight.controller.cluster.raft.utils.MockAkkaJournal;
 import org.opendaylight.controller.cluster.raft.utils.MockSnapshotStore;
-
+import scala.concurrent.duration.FiniteDuration;
 import java.io.ByteArrayInputStream;
 import java.io.ByteArrayOutputStream;
 import java.io.IOException;
@@ -27,9 +30,9 @@ import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
-
-import static junit.framework.Assert.assertTrue;
-import static junit.framework.TestCase.assertEquals;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import static org.junit.Assert.assertEquals;
 
 public class RaftActorTest extends AbstractActorTest {
 
@@ -42,58 +45,90 @@ public class RaftActorTest extends AbstractActorTest {
 
     public static class MockRaftActor extends RaftActor {
 
-        private boolean applySnapshotCalled = false;
-        private List<Object> state;
+        public static final class MockRaftActorCreator implements Creator<MockRaftActor> {
+            private final Map<String, String> peerAddresses;
+            private final String id;
+            private final Optional<ConfigParams> config;
 
-        public MockRaftActor(String id,
-            Map<String, String> peerAddresses) {
-            super(id, peerAddresses);
-            state = new ArrayList<>();
+            private MockRaftActorCreator(Map<String, String> peerAddresses, String id,
+                    Optional<ConfigParams> config) {
+                this.peerAddresses = peerAddresses;
+                this.id = id;
+                this.config = config;
+            }
+
+            @Override
+            public MockRaftActor create() throws Exception {
+                return new MockRaftActor(id, peerAddresses, config);
+            }
         }
 
-        public RaftActorContext getRaftActorContext() {
-            return context;
+        private final CountDownLatch recoveryComplete = new CountDownLatch(1);
+        private final List<Object> state;
+
+        public MockRaftActor(String id, Map<String, String> peerAddresses, Optional<ConfigParams> config) {
+            super(id, peerAddresses, config);
+            state = new ArrayList<>();
         }
 
-        public boolean isApplySnapshotCalled() {
-            return applySnapshotCalled;
+        public void waitForRecoveryComplete() {
+            try {
+                assertEquals("Recovery complete", true, recoveryComplete.await(5,  TimeUnit.SECONDS));
+            } catch (InterruptedException e) {
+                e.printStackTrace();
+            }
         }
 
         public List<Object> getState() {
             return state;
         }
 
-        public static Props props(final String id, final Map<String, String> peerAddresses){
-            return Props.create(new Creator<MockRaftActor>(){
-
-                @Override public MockRaftActor create() throws Exception {
-                    return new MockRaftActor(id, peerAddresses);
-                }
-            });
+        public static Props props(final String id, final Map<String, String> peerAddresses,
+                Optional<ConfigParams> config){
+            return Props.create(new MockRaftActorCreator(peerAddresses, id, config));
         }
 
         @Override protected void applyState(ActorRef clientActor, String identifier, Object data) {
+        }
+
+        @Override
+        protected void startLogRecoveryBatch(int maxBatchSize) {
+        }
+
+        @Override
+        protected void appendRecoveredLogEntry(Payload data) {
             state.add(data);
         }
 
-        @Override protected void createSnapshot() {
-            throw new UnsupportedOperationException("createSnapshot");
+        @Override
+        protected void applyCurrentLogRecoveryBatch() {
         }
 
-        @Override protected void applySnapshot(ByteString snapshot) {
-            applySnapshotCalled = true;
+        @Override
+        protected void onRecoveryComplete() {
+            recoveryComplete.countDown();
+        }
+
+        @Override
+        protected void applyRecoverySnapshot(ByteString snapshot) {
             try {
                 Object data = toObject(snapshot);
+                System.out.println("!!!!!applyRecoverySnapshot: "+data);
                 if (data instanceof List) {
                     state.addAll((List) data);
                 }
-            } catch (ClassNotFoundException e) {
-                e.printStackTrace();
-            } catch (IOException e) {
+            } catch (Exception e) {
                 e.printStackTrace();
             }
         }
 
+        @Override protected void createSnapshot() {
+            throw new UnsupportedOperationException("createSnapshot");
+        }
+
+        @Override protected void applySnapshot(ByteString snapshot) {
+        }
+
         @Override protected void onStateChanged() {
         }
 
@@ -130,9 +165,8 @@ public class RaftActorTest extends AbstractActorTest {
         public RaftActorTestKit(ActorSystem actorSystem, String actorName) {
             super(actorSystem);
 
-            raftActor = this.getSystem()
-                .actorOf(MockRaftActor.props(actorName,
-                    Collections.EMPTY_MAP), actorName);
+            raftActor = this.getSystem().actorOf(MockRaftActor.props(actorName,
+                    Collections.EMPTY_MAP, Optional.<ConfigParams>absent()), actorName);
 
         }
 
@@ -142,6 +176,7 @@ public class RaftActorTest extends AbstractActorTest {
             return
                 new JavaTestKit.EventFilter<Boolean>(Logging.Info.class
                 ) {
+                    @Override
                     protected Boolean run() {
                         return true;
                     }
@@ -153,37 +188,15 @@ public class RaftActorTest extends AbstractActorTest {
         }
 
         public void findLeader(final String expectedLeader){
+            raftActor.tell(new FindLeader(), getRef());
 
-
-            new Within(duration("1 seconds")) {
-                protected void run() {
-
-                    raftActor.tell(new FindLeader(), getRef());
-
-                    String s = new ExpectMsg<String>(duration("1 seconds"),
-                        "findLeader") {
-                        // do not put code outside this method, will run afterwards
-                        protected String match(Object in) {
-                            if (in instanceof FindLeaderReply) {
-                                return ((FindLeaderReply) in).getLeaderActor();
-                            } else {
-                                throw noMatch();
-                            }
-                        }
-                    }.get();// this extracts the received message
-
-                    assertEquals(expectedLeader, s);
-
-                }
-
-
-            };
+            FindLeaderReply reply = expectMsgClass(duration("5 seconds"), FindLeaderReply.class);
+            assertEquals("getLeaderActor", expectedLeader, reply.getLeaderActor());
         }
 
         public ActorRef getRaftActor() {
             return raftActor;
         }
-
     }
 
 
@@ -201,89 +214,84 @@ public class RaftActorTest extends AbstractActorTest {
     }
 
     @Test
-    public void testRaftActorRecovery() {
+    public void testRaftActorRecovery() throws Exception {
         new JavaTestKit(getSystem()) {{
-            new Within(duration("1 seconds")) {
-                protected void run() {
-
-                    String persistenceId = "follower10";
-
-                    ActorRef followerActor = getSystem().actorOf(
-                        MockRaftActor.props(persistenceId, Collections.EMPTY_MAP), persistenceId);
-
-                    List<ReplicatedLogEntry> snapshotUnappliedEntries = new ArrayList<>();
-                    ReplicatedLogEntry entry1 = new MockRaftActorContext.MockReplicatedLogEntry(1, 4, new MockRaftActorContext.MockPayload("E"));
-                    snapshotUnappliedEntries.add(entry1);
-
-                    int lastAppliedDuringSnapshotCapture = 3;
-                    int lastIndexDuringSnapshotCapture = 4;
-
-                    ByteString snapshotBytes = null;
-                    try {
-                        // 4 messages as part of snapshot, which are applied to state
-                        snapshotBytes  = fromObject(Arrays.asList(new MockRaftActorContext.MockPayload("A"),
-                            new MockRaftActorContext.MockPayload("B"),
-                            new MockRaftActorContext.MockPayload("C"),
-                            new MockRaftActorContext.MockPayload("D")));
-                    } catch (Exception e) {
-                        e.printStackTrace();
-                    }
-                    Snapshot snapshot = Snapshot.create(snapshotBytes.toByteArray(),
-                        snapshotUnappliedEntries, lastIndexDuringSnapshotCapture, 1 ,
-                        lastAppliedDuringSnapshotCapture, 1);
-                    MockSnapshotStore.setMockSnapshot(snapshot);
-                    MockSnapshotStore.setPersistenceId(persistenceId);
-
-                    // add more entries after snapshot is taken
-                    List<ReplicatedLogEntry> entries = new ArrayList<>();
-                    ReplicatedLogEntry entry2 = new MockRaftActorContext.MockReplicatedLogEntry(1, 5, new MockRaftActorContext.MockPayload("F"));
-                    ReplicatedLogEntry entry3 = new MockRaftActorContext.MockReplicatedLogEntry(1, 6, new MockRaftActorContext.MockPayload("G"));
-                    ReplicatedLogEntry entry4 = new MockRaftActorContext.MockReplicatedLogEntry(1, 7, new MockRaftActorContext.MockPayload("H"));
-                    entries.add(entry2);
-                    entries.add(entry3);
-                    entries.add(entry4);
-
-                    int lastAppliedToState = 5;
-                    int lastIndex = 7;
-
-                    MockAkkaJournal.addToJournal(5, entry2);
-                    // 2 entries are applied to state besides the 4 entries in snapshot
-                    MockAkkaJournal.addToJournal(6, new ApplyLogEntries(lastAppliedToState));
-                    MockAkkaJournal.addToJournal(7, entry3);
-                    MockAkkaJournal.addToJournal(8, entry4);
-
-                    // kill the actor
-                    followerActor.tell(PoisonPill.getInstance(), null);
-
-                    try {
-                        // give some time for actor to die
-                        Thread.sleep(200);
-                    } catch (InterruptedException e) {
-                        e.printStackTrace();
-                    }
-
-                    //reinstate the actor
-                    TestActorRef<MockRaftActor> ref = TestActorRef.create(getSystem(),
-                        MockRaftActor.props(persistenceId, Collections.EMPTY_MAP));
-
-                    try {
-                        //give some time for snapshot offer to get called.
-                        Thread.sleep(200);
-                    } catch (InterruptedException e) {
-                        e.printStackTrace();
-                    }
-
-                    RaftActorContext context = ref.underlyingActor().getRaftActorContext();
-                    assertEquals(snapshotUnappliedEntries.size() + entries.size(), context.getReplicatedLog().size());
-                    assertEquals(lastIndex, context.getReplicatedLog().lastIndex());
-                    assertEquals(lastAppliedToState, context.getLastApplied());
-                    assertEquals(lastAppliedToState, context.getCommitIndex());
-                    assertTrue(ref.underlyingActor().isApplySnapshotCalled());
-                    assertEquals(6, ref.underlyingActor().getState().size());
-                }
-            };
+            String persistenceId = "follower10";
+
+            DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
+            // Set the heartbeat interval high to essentially disable election otherwise the test
+            // may fail if the actor is switched to Leader and the commitIndex is set to the last
+            // log entry.
+            config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
+
+            ActorRef followerActor = getSystem().actorOf(MockRaftActor.props(persistenceId,
+                    Collections.EMPTY_MAP, Optional.<ConfigParams>of(config)), persistenceId);
+
+            watch(followerActor);
+
+            List<ReplicatedLogEntry> snapshotUnappliedEntries = new ArrayList<>();
+            ReplicatedLogEntry entry1 = new MockRaftActorContext.MockReplicatedLogEntry(1, 4,
+                    new MockRaftActorContext.MockPayload("E"));
+            snapshotUnappliedEntries.add(entry1);
+
+            int lastAppliedDuringSnapshotCapture = 3;
+            int lastIndexDuringSnapshotCapture = 4;
+
+                // 4 messages as part of snapshot, which are applied to state
+            ByteString snapshotBytes  = fromObject(Arrays.asList(
+                        new MockRaftActorContext.MockPayload("A"),
+                        new MockRaftActorContext.MockPayload("B"),
+                        new MockRaftActorContext.MockPayload("C"),
+                        new MockRaftActorContext.MockPayload("D")));
+
+            Snapshot snapshot = Snapshot.create(snapshotBytes.toByteArray(),
+                    snapshotUnappliedEntries, lastIndexDuringSnapshotCapture, 1 ,
+                    lastAppliedDuringSnapshotCapture, 1);
+            MockSnapshotStore.setMockSnapshot(snapshot);
+            MockSnapshotStore.setPersistenceId(persistenceId);
+
+            // add more entries after snapshot is taken
+            List<ReplicatedLogEntry> entries = new ArrayList<>();
+            ReplicatedLogEntry entry2 = new MockRaftActorContext.MockReplicatedLogEntry(1, 5,
+                    new MockRaftActorContext.MockPayload("F"));
+            ReplicatedLogEntry entry3 = new MockRaftActorContext.MockReplicatedLogEntry(1, 6,
+                    new MockRaftActorContext.MockPayload("G"));
+            ReplicatedLogEntry entry4 = new MockRaftActorContext.MockReplicatedLogEntry(1, 7,
+                    new MockRaftActorContext.MockPayload("H"));
+            entries.add(entry2);
+            entries.add(entry3);
+            entries.add(entry4);
+
+            int lastAppliedToState = 5;
+            int lastIndex = 7;
+
+            MockAkkaJournal.addToJournal(5, entry2);
+            // 2 entries are applied to state besides the 4 entries in snapshot
+            MockAkkaJournal.addToJournal(6, new ApplyLogEntries(lastAppliedToState));
+            MockAkkaJournal.addToJournal(7, entry3);
+            MockAkkaJournal.addToJournal(8, entry4);
+
+            // kill the actor
+            followerActor.tell(PoisonPill.getInstance(), null);
+            expectMsgClass(duration("5 seconds"), Terminated.class);
+
+            unwatch(followerActor);
+
+            //reinstate the actor
+            TestActorRef<MockRaftActor> ref = TestActorRef.create(getSystem(),
+                    MockRaftActor.props(persistenceId, Collections.EMPTY_MAP,
+                            Optional.<ConfigParams>of(config)));
+
+            ref.underlyingActor().waitForRecoveryComplete();
+
+            RaftActorContext context = ref.underlyingActor().getRaftActorContext();
+            assertEquals("Journal log size", snapshotUnappliedEntries.size() + entries.size(),
+                    context.getReplicatedLog().size());
+            assertEquals("Last index", lastIndex, context.getReplicatedLog().lastIndex());
+            assertEquals("Last applied", lastAppliedToState, context.getLastApplied());
+            assertEquals("Commit index", lastAppliedToState, context.getCommitIndex());
+            assertEquals("Recovered state size", 6, ref.underlyingActor().getState().size());
         }};
-
     }
 
     private ByteString fromObject(Object snapshot) throws Exception {
index 3e1bd35632385d3ec973ba96c882587867170b94..44da4a56683558e9b15b8a1e94d1af8c10d1a3df 100644 (file)
@@ -9,6 +9,7 @@
 package org.opendaylight.controller.cluster.datastore.node.utils.serialization;
 
 import com.google.common.base.Preconditions;
+
 import org.opendaylight.controller.protobuff.messages.common.NormalizedNodeMessages;
 import org.opendaylight.yangtools.yang.common.SimpleDateFormatUtil;
 import org.opendaylight.yangtools.yang.data.api.Node;
@@ -36,6 +37,7 @@ import org.opendaylight.yangtools.yang.data.impl.schema.builder.api.NormalizedNo
 import java.net.URI;
 import java.util.ArrayList;
 import java.util.Date;
+import java.util.EnumMap;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -232,7 +234,7 @@ public class NormalizedNodeSerializer {
 
     private static class DeSerializer implements NormalizedNodeDeSerializationContext {
         private static Map<NormalizedNodeType, DeSerializationFunction>
-            deSerializationFunctions = new HashMap<>();
+            deSerializationFunctions = new EnumMap<>(NormalizedNodeType.class);
 
         static {
             deSerializationFunctions.put(CONTAINER_NODE_TYPE,
@@ -447,8 +449,9 @@ public class NormalizedNodeSerializer {
 
         private NormalizedNode deSerialize(NormalizedNodeMessages.Node node){
             Preconditions.checkNotNull(node, "node should not be null");
-            DeSerializationFunction deSerializationFunction =
-                Preconditions.checkNotNull(deSerializationFunctions.get(NormalizedNodeType.values()[node.getIntType()]), "Unknown type " + node);
+
+            DeSerializationFunction deSerializationFunction = deSerializationFunctions.get(
+                    NormalizedNodeType.values()[node.getIntType()]);
 
             return deSerializationFunction.apply(this, node);
         }
@@ -544,8 +547,4 @@ public class NormalizedNodeSerializer {
             NormalizedNode apply(DeSerializer deserializer, NormalizedNodeMessages.Node node);
         }
     }
-
-
-
-
 }
index d7627c008eb9062a76b3540a679706c08c686b07..4fb676e5189bcb70158cccd1a563f3e54ba36405 100644 (file)
@@ -9,6 +9,7 @@
 package org.opendaylight.controller.cluster.datastore.node.utils.serialization;
 
 import com.google.common.base.Preconditions;
+
 import org.opendaylight.controller.cluster.datastore.node.utils.NodeIdentifierFactory;
 import org.opendaylight.controller.cluster.datastore.node.utils.QNameFactory;
 import org.opendaylight.controller.protobuff.messages.common.NormalizedNodeMessages;
@@ -26,6 +27,7 @@ import java.util.Set;
 import static org.opendaylight.controller.cluster.datastore.node.utils.serialization.PathArgumentType.getSerializablePathArgumentType;
 
 public class PathArgumentSerializer {
+    private static final String REVISION_ARG = "?revision=";
     private static final Map<Class, PathArgumentAttributesGetter> pathArgumentAttributesGetters = new HashMap<>();
 
     public static NormalizedNodeMessages.PathArgument serialize(NormalizedNodeSerializationContext context, YangInstanceIdentifier.PathArgument pathArgument){
@@ -190,27 +192,24 @@ public class PathArgumentSerializer {
         // If this serializer is used qName cannot be null (see encodeQName)
         // adding null check only in case someone tried to deSerialize a protocol buffer node
         // that was not serialized using the PathArgumentSerializer
-        Preconditions.checkNotNull(qName, "qName should not be null");
-        Preconditions.checkArgument(!"".equals(qName.getLocalName()),
-            "qName.localName cannot be empty qName = " + qName.toString());
-        Preconditions.checkArgument(qName.getNamespace() != -1, "qName.namespace should be valid");
+//        Preconditions.checkNotNull(qName, "qName should not be null");
+//        Preconditions.checkArgument(qName.getNamespace() != -1, "qName.namespace should be valid");
 
-        StringBuilder sb = new StringBuilder();
         String namespace = context.getNamespace(qName.getNamespace());
-        String revision = "";
         String localName = context.getLocalName(qName.getLocalName());
+        StringBuilder sb;
         if(qName.getRevision() != -1){
-            revision = context.getRevision(qName.getRevision());
-            sb.append("(").append(namespace).append("?revision=").append(
-                revision).append(")").append(
-                localName);
+            String revision = context.getRevision(qName.getRevision());
+            sb = new StringBuilder(namespace.length() + REVISION_ARG.length() + revision.length() +
+                    localName.length() + 2);
+            sb.append('(').append(namespace).append(REVISION_ARG).append(
+                revision).append(')').append(localName);
         } else {
-            sb.append("(").append(namespace).append(")").append(
-                localName);
+            sb = new StringBuilder(namespace.length() + localName.length() + 2);
+            sb.append('(').append(namespace).append(')').append(localName);
         }
 
         return sb.toString();
-
     }
 
     /**
@@ -223,10 +222,6 @@ public class PathArgumentSerializer {
         NormalizedNodeDeSerializationContext context,
         NormalizedNodeMessages.PathArgument pathArgument) {
 
-        Preconditions.checkArgument(pathArgument.getIntType() >= 0
-            && pathArgument.getIntType() < PathArgumentType.values().length,
-            "Illegal PathArgumentType " + pathArgument.getIntType());
-
         switch(PathArgumentType.values()[pathArgument.getIntType()]){
             case NODE_IDENTIFIER_WITH_VALUE : {
 
@@ -272,13 +267,21 @@ public class PathArgumentSerializer {
         NormalizedNodeDeSerializationContext context,
         List<NormalizedNodeMessages.PathArgumentAttribute> attributesList) {
 
-        Map<QName, Object> map = new HashMap<>();
-
-        for(NormalizedNodeMessages.PathArgumentAttribute attribute : attributesList){
+        Map<QName, Object> map;
+        if(attributesList.size() == 1) {
+            NormalizedNodeMessages.PathArgumentAttribute attribute = attributesList.get(0);
             NormalizedNodeMessages.QName name = attribute.getName();
             Object value = parseAttribute(context, attribute);
+            map = Collections.singletonMap(QNameFactory.create(qNameToString(context, name)), value);
+        } else {
+            map = new HashMap<>();
+
+            for(NormalizedNodeMessages.PathArgumentAttribute attribute : attributesList){
+                NormalizedNodeMessages.QName name = attribute.getName();
+                Object value = parseAttribute(context, attribute);
 
-            map.put(QNameFactory.create(qNameToString(context, name)), value);
+                map.put(QNameFactory.create(qNameToString(context, name)), value);
+            }
         }
 
         return map;
index 04c95d61ceb20854a8f19308df74481998b4c776..8def754f117792ef8e02f327455a9aa54785cf3f 100644 (file)
@@ -8,7 +8,6 @@
 
 package org.opendaylight.controller.cluster.datastore.node.utils.serialization;
 
-import com.google.common.base.Preconditions;
 import org.opendaylight.controller.cluster.datastore.node.utils.QNameFactory;
 import org.opendaylight.controller.cluster.datastore.util.InstanceIdentifierUtils;
 import org.opendaylight.controller.protobuff.messages.common.NormalizedNodeMessages;
@@ -70,9 +69,6 @@ public class ValueSerializer {
 
 
     private static Object deSerializeBasicTypes(int valueType, String value) {
-        Preconditions.checkArgument(valueType >= 0 && valueType < ValueType.values().length,
-            "Illegal value type " + valueType );
-
         switch(ValueType.values()[valueType]){
            case SHORT_TYPE: {
                return Short.valueOf(value);
index 8724dfe43a2b2ba70c41b3108ac50bb2ca040994..49db8967a685914924eb4e90d5914ecdbc1b43a8 100644 (file)
@@ -50,8 +50,9 @@ public enum ValueType {
     public static final ValueType getSerializableType(Object node){
         Preconditions.checkNotNull(node, "node should not be null");
 
-        if(types.containsKey(node.getClass())) {
-            return types.get(node.getClass());
+        ValueType type = types.get(node.getClass());
+        if(type != null) {
+            return type;
         } else if(node instanceof Set){
             return BITS_TYPE;
         }
index 1021ddeee7a5348f26cbaa8845059a515c537f92..83164b07d9431a70ce609139a476190a5488ab9a 100644 (file)
@@ -8,11 +8,12 @@
 
 package org.opendaylight.controller.cluster.datastore;
 
-import com.google.common.base.Preconditions;
-
+import org.opendaylight.controller.cluster.raft.ConfigParams;
+import org.opendaylight.controller.cluster.raft.DefaultConfigParamsImpl;
 import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStoreConfigProperties;
 
 import scala.concurrent.duration.Duration;
+import scala.concurrent.duration.FiniteDuration;
 
 import java.util.concurrent.TimeUnit;
 
@@ -27,22 +28,30 @@ public class DatastoreContext {
     private final Duration shardTransactionIdleTimeout;
     private final int operationTimeoutInSeconds;
     private final String dataStoreMXBeanType;
+    private final ConfigParams shardRaftConfig;
 
     public DatastoreContext() {
-        this.dataStoreProperties = null;
-        this.dataStoreMXBeanType = "DistributedDatastore";
-        this.shardTransactionIdleTimeout = Duration.create(10, TimeUnit.MINUTES);
-        this.operationTimeoutInSeconds = 5;
+        this("DistributedDatastore", null, Duration.create(10, TimeUnit.MINUTES), 5, 1000, 20000, 500);
     }
 
     public DatastoreContext(String dataStoreMXBeanType,
             InMemoryDOMDataStoreConfigProperties dataStoreProperties,
             Duration shardTransactionIdleTimeout,
-            int operationTimeoutInSeconds) {
+            int operationTimeoutInSeconds,
+            int shardJournalRecoveryLogBatchSize,
+            int shardSnapshotBatchCount,
+            int shardHeartbeatIntervalInMillis) {
         this.dataStoreMXBeanType = dataStoreMXBeanType;
-        this.dataStoreProperties = Preconditions.checkNotNull(dataStoreProperties);
+        this.dataStoreProperties = dataStoreProperties;
         this.shardTransactionIdleTimeout = shardTransactionIdleTimeout;
         this.operationTimeoutInSeconds = operationTimeoutInSeconds;
+
+        DefaultConfigParamsImpl raftConfig = new DefaultConfigParamsImpl();
+        raftConfig.setHeartBeatInterval(new FiniteDuration(shardHeartbeatIntervalInMillis,
+                TimeUnit.MILLISECONDS));
+        raftConfig.setJournalRecoveryLogBatchSize(shardJournalRecoveryLogBatchSize);
+        raftConfig.setSnapshotBatchCount(shardSnapshotBatchCount);
+        shardRaftConfig = raftConfig;
     }
 
     public InMemoryDOMDataStoreConfigProperties getDataStoreProperties() {
@@ -60,4 +69,8 @@ public class DatastoreContext {
     public int getOperationTimeoutInSeconds() {
         return operationTimeoutInSeconds;
     }
+
+    public ConfigParams getShardRaftConfig() {
+        return shardRaftConfig;
+    }
 }
index 0fa27706e19382c0b84cc44b93d890ee9f0d1c8e..ddb5989f096145cddd3b716afa8dfc1dce3311e3 100644 (file)
@@ -20,6 +20,7 @@ import akka.serialization.Serialization;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Optional;
 import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
 import com.google.common.util.concurrent.CheckedFuture;
 import com.google.common.util.concurrent.FutureCallback;
 import com.google.common.util.concurrent.Futures;
@@ -47,12 +48,11 @@ import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContex
 import org.opendaylight.controller.cluster.datastore.modification.Modification;
 import org.opendaylight.controller.cluster.datastore.modification.MutableCompositeModification;
 import org.opendaylight.controller.cluster.datastore.node.NormalizedNodeToNodeCodec;
-import org.opendaylight.controller.cluster.raft.ConfigParams;
-import org.opendaylight.controller.cluster.raft.DefaultConfigParamsImpl;
 import org.opendaylight.controller.cluster.raft.RaftActor;
 import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
 import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshotReply;
 import org.opendaylight.controller.cluster.raft.protobuff.client.messages.CompositeModificationPayload;
+import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeListener;
 import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
 import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStore;
@@ -67,14 +67,12 @@ import org.opendaylight.yangtools.concepts.ListenerRegistration;
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
-import scala.concurrent.duration.FiniteDuration;
-
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ExecutionException;
-import java.util.concurrent.TimeUnit;
 
 /**
  * A Shard represents a portion of the logical data tree <br/>
@@ -84,8 +82,6 @@ import java.util.concurrent.TimeUnit;
  */
 public class Shard extends RaftActor {
 
-    private static final ConfigParams configParams = new ShardConfigParams();
-
     public static final String DEFAULT_NAME = "default";
 
     // The state of this Shard
@@ -114,11 +110,18 @@ public class Shard extends RaftActor {
 
     private ActorRef createSnapshotTransaction;
 
+    /**
+     * Coordinates persistence recovery on startup.
+     */
+    private ShardRecoveryCoordinator recoveryCoordinator;
+    private List<Object> currentLogRecoveryBatch;
+
     private final Map<String, DOMStoreTransactionChain> transactionChains = new HashMap<>();
 
-    private Shard(ShardIdentifier name, Map<ShardIdentifier, String> peerAddresses,
+    protected Shard(ShardIdentifier name, Map<ShardIdentifier, String> peerAddresses,
             DatastoreContext datastoreContext, SchemaContext schemaContext) {
-        super(name.toString(), mapPeerAddresses(peerAddresses), Optional.of(configParams));
+        super(name.toString(), mapPeerAddresses(peerAddresses),
+                Optional.of(datastoreContext.getShardRaftConfig()));
 
         this.name = name;
         this.datastoreContext = datastoreContext;
@@ -333,35 +336,12 @@ public class Shard extends RaftActor {
         DOMStoreThreePhaseCommitCohort cohort =
             modificationToCohort.remove(serialized);
         if (cohort == null) {
-
-            if(LOG.isDebugEnabled()) {
-                LOG.debug(
-                    "Could not find cohort for modification : {}. Writing modification using a new transaction",
-                    modification);
-            }
-
-            DOMStoreWriteTransaction transaction =
-                store.newWriteOnlyTransaction();
-
-            if(LOG.isDebugEnabled()) {
-                LOG.debug("Created new transaction {}", transaction.getIdentifier().toString());
-            }
-
-            modification.apply(transaction);
-            try {
-                syncCommitTransaction(transaction);
-            } catch (InterruptedException | ExecutionException e) {
-                shardMBean.incrementFailedTransactionsCount();
-                LOG.error("Failed to commit", e);
-                return;
-            }
-            //we want to just apply the recovery commit and return
-            shardMBean.incrementCommittedTransactionCount();
+            // If there's no cached cohort then we must be applying replicated state.
+            commitWithNewTransaction(serialized);
             return;
         }
 
-
-        if(sender == null){
+        if(sender == null) {
             LOG.error("Commit failed. Sender cannot be null");
             return;
         }
@@ -386,6 +366,18 @@ public class Shard extends RaftActor {
 
     }
 
+    private void commitWithNewTransaction(Object modification) {
+        DOMStoreWriteTransaction tx = store.newWriteOnlyTransaction();
+        MutableCompositeModification.fromSerializable(modification, schemaContext).apply(tx);
+        try {
+            syncCommitTransaction(tx);
+            shardMBean.incrementCommittedTransactionCount();
+        } catch (InterruptedException | ExecutionException e) {
+            shardMBean.incrementFailedTransactionsCount();
+            LOG.error(e, "Failed to commit");
+        }
+    }
+
     private void handleForwardedCommit(ForwardedCommitTransaction message) {
         Object serializedModification =
             message.getModification().toSerializable();
@@ -461,26 +453,102 @@ public class Shard extends RaftActor {
         return config.isMetricCaptureEnabled();
     }
 
-    @Override protected void applyState(ActorRef clientActor, String identifier,
-        Object data) {
+    @Override
+    protected
+    void startLogRecoveryBatch(int maxBatchSize) {
+        currentLogRecoveryBatch = Lists.newArrayListWithCapacity(maxBatchSize);
+
+        if(LOG.isDebugEnabled()) {
+            LOG.debug("{} : starting log recovery batch with max size {}", persistenceId(), maxBatchSize);
+        }
+    }
+
+    @Override
+    protected void appendRecoveredLogEntry(Payload data) {
+        if (data instanceof CompositeModificationPayload) {
+            currentLogRecoveryBatch.add(((CompositeModificationPayload) data).getModification());
+        } else {
+            LOG.error("Unknown state received {} during recovery", data);
+        }
+    }
+
+    @Override
+    protected void applyRecoverySnapshot(ByteString snapshot) {
+        if(recoveryCoordinator == null) {
+            recoveryCoordinator = new ShardRecoveryCoordinator(persistenceId(), schemaContext);
+        }
+
+        recoveryCoordinator.submit(snapshot, store.newWriteOnlyTransaction());
+
+        if(LOG.isDebugEnabled()) {
+            LOG.debug("{} : submitted recovery sbapshot", persistenceId());
+        }
+    }
+
+    @Override
+    protected void applyCurrentLogRecoveryBatch() {
+        if(recoveryCoordinator == null) {
+            recoveryCoordinator = new ShardRecoveryCoordinator(persistenceId(), schemaContext);
+        }
+
+        recoveryCoordinator.submit(currentLogRecoveryBatch, store.newWriteOnlyTransaction());
+
+        if(LOG.isDebugEnabled()) {
+            LOG.debug("{} : submitted log recovery batch with size {}", persistenceId(),
+                    currentLogRecoveryBatch.size());
+        }
+    }
+
+    @Override
+    protected void onRecoveryComplete() {
+        if(recoveryCoordinator != null) {
+            Collection<DOMStoreWriteTransaction> txList = recoveryCoordinator.getTransactions();
+
+            if(LOG.isDebugEnabled()) {
+                LOG.debug("{} : recovery complete - committing {} Tx's", persistenceId(), txList.size());
+            }
+
+            for(DOMStoreWriteTransaction tx: txList) {
+                try {
+                    syncCommitTransaction(tx);
+                    shardMBean.incrementCommittedTransactionCount();
+                } catch (InterruptedException | ExecutionException e) {
+                    shardMBean.incrementFailedTransactionsCount();
+                    LOG.error(e, "Failed to commit");
+                }
+            }
+        }
+
+        recoveryCoordinator = null;
+        currentLogRecoveryBatch = null;
+        updateJournalStats();
+    }
+
+    @Override
+    protected void applyState(ActorRef clientActor, String identifier, Object data) {
 
         if (data instanceof CompositeModificationPayload) {
-            Object modification =
-                ((CompositeModificationPayload) data).getModification();
+            Object modification = ((CompositeModificationPayload) data).getModification();
 
             if (modification != null) {
                 commit(clientActor, modification);
             } else {
                 LOG.error(
                     "modification is null - this is very unexpected, clientActor = {}, identifier = {}",
-                    identifier, clientActor.path().toString());
+                    identifier, clientActor != null ? clientActor.path().toString() : null);
             }
 
         } else {
-            LOG.error("Unknown state received {} Class loader = {} CompositeNodeMod.ClassLoader = {}", data, data.getClass().getClassLoader(), CompositeModificationPayload.class.getClassLoader());
+            LOG.error("Unknown state received {} Class loader = {} CompositeNodeMod.ClassLoader = {}",
+                    data, data.getClass().getClassLoader(),
+                    CompositeModificationPayload.class.getClassLoader());
         }
 
-        // Update stats
+        updateJournalStats();
+
+    }
+
+    private void updateJournalStats() {
         ReplicatedLogEntry lastLogEntry = getLastLogEntry();
 
         if (lastLogEntry != null) {
@@ -490,10 +558,10 @@ public class Shard extends RaftActor {
 
         shardMBean.setCommitIndex(getCommitIndex());
         shardMBean.setLastApplied(getLastApplied());
-
     }
 
-    @Override protected void createSnapshot() {
+    @Override
+    protected void createSnapshot() {
         if (createSnapshotTransaction == null) {
 
             // Create a transaction. We are really going to treat the transaction as a worker
@@ -508,7 +576,9 @@ public class Shard extends RaftActor {
         }
     }
 
-    @VisibleForTesting @Override protected void applySnapshot(ByteString snapshot) {
+    @VisibleForTesting
+    @Override
+    protected void applySnapshot(ByteString snapshot) {
         // Since this will be done only on Recovery or when this actor is a Follower
         // we can safely commit everything in here. We not need to worry about event notifications
         // as they would have already been disabled on the follower
@@ -565,16 +635,6 @@ public class Shard extends RaftActor {
         return this.name.toString();
     }
 
-
-    private static class ShardConfigParams extends DefaultConfigParamsImpl {
-        public static final FiniteDuration HEART_BEAT_INTERVAL =
-            new FiniteDuration(500, TimeUnit.MILLISECONDS);
-
-        @Override public FiniteDuration getHeartBeatInterval() {
-            return HEART_BEAT_INTERVAL;
-        }
-    }
-
     private static class ShardCreator implements Creator<Shard> {
 
         private static final long serialVersionUID = 1L;
@@ -598,20 +658,24 @@ public class Shard extends RaftActor {
         }
     }
 
-    @VisibleForTesting NormalizedNode readStore() throws ExecutionException, InterruptedException {
+    @VisibleForTesting
+    NormalizedNode<?,?> readStore(YangInstanceIdentifier id)
+            throws ExecutionException, InterruptedException {
         DOMStoreReadTransaction transaction = store.newReadOnlyTransaction();
 
         CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException> future =
-            transaction.read(YangInstanceIdentifier.builder().build());
+            transaction.read(id);
 
-        NormalizedNode<?, ?> node = future.get().get();
+        Optional<NormalizedNode<?, ?>> optional = future.get();
+        NormalizedNode<?, ?> node = optional.isPresent()? optional.get() : null;
 
         transaction.close();
 
         return node;
     }
 
-    @VisibleForTesting void writeToStore(YangInstanceIdentifier id, NormalizedNode node)
+    @VisibleForTesting
+    void writeToStore(YangInstanceIdentifier id, NormalizedNode<?,?> node)
         throws ExecutionException, InterruptedException {
         DOMStoreWriteTransaction transaction = store.newWriteOnlyTransaction();
 
@@ -620,4 +684,8 @@ public class Shard extends RaftActor {
         syncCommitTransaction(transaction);
     }
 
+    @VisibleForTesting
+    ShardStats getShardMBean() {
+        return shardMBean;
+    }
 }
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardRecoveryCoordinator.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardRecoveryCoordinator.java
new file mode 100644 (file)
index 0000000..8afdb4c
--- /dev/null
@@ -0,0 +1,157 @@
+/*
+ * Copyright (c) 2014 Brocade Communications Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.datastore;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+
+import org.opendaylight.controller.cluster.datastore.modification.MutableCompositeModification;
+import org.opendaylight.controller.cluster.datastore.node.NormalizedNodeToNodeCodec;
+import org.opendaylight.controller.protobuff.messages.common.NormalizedNodeMessages;
+import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
+import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
+import org.opendaylight.yangtools.yang.model.api.SchemaContext;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.collect.Lists;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import com.google.protobuf.ByteString;
+import com.google.protobuf.InvalidProtocolBufferException;
+
+/**
+ * Coordinates persistence recovery of journal log entries and snapshots for a shard. Each snapshot
+ * and journal log entry batch are de-serialized and applied to their own write transaction
+ * instance in parallel on a thread pool for faster recovery time. However the transactions are
+ * committed to the data store in the order the corresponding snapshot or log batch are received
+ * to preserve data store integrity.
+ *
+ * @author Thomas Panetelis
+ */
+class ShardRecoveryCoordinator {
+
+    private static final int TIME_OUT = 10;
+
+    private static final Logger LOG = LoggerFactory.getLogger(ShardRecoveryCoordinator.class);
+
+    private final List<DOMStoreWriteTransaction> resultingTxList = Lists.newArrayList();
+    private final SchemaContext schemaContext;
+    private final String shardName;
+    private final ExecutorService executor;
+
+    ShardRecoveryCoordinator(String shardName, SchemaContext schemaContext) {
+        this.schemaContext = schemaContext;
+        this.shardName = shardName;
+
+        executor = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors(),
+                new ThreadFactoryBuilder().setDaemon(true)
+                        .setNameFormat("ShardRecovery-" + shardName + "-%d").build());
+    }
+
+    /**
+     * Submits a batch of journal log entries.
+     *
+     * @param logEntries the serialized journal log entries
+     * @param resultingTx the write Tx to which to apply the entries
+     */
+    void submit(List<Object> logEntries, DOMStoreWriteTransaction resultingTx) {
+        LogRecoveryTask task = new LogRecoveryTask(logEntries, resultingTx);
+        resultingTxList.add(resultingTx);
+        executor.execute(task);
+    }
+
+    /**
+     * Submits a snapshot.
+     *
+     * @param snapshot the serialized snapshot
+     * @param resultingTx the write Tx to which to apply the entries
+     */
+    void submit(ByteString snapshot, DOMStoreWriteTransaction resultingTx) {
+        SnapshotRecoveryTask task = new SnapshotRecoveryTask(snapshot, resultingTx);
+        resultingTxList.add(resultingTx);
+        executor.execute(task);
+    }
+
+    Collection<DOMStoreWriteTransaction> getTransactions() {
+        // Shutdown the executor and wait for task completion.
+        executor.shutdown();
+
+        try {
+            if(executor.awaitTermination(TIME_OUT, TimeUnit.MINUTES))  {
+                return resultingTxList;
+            } else {
+                LOG.error("Recovery for shard {} timed out after {} minutes", shardName, TIME_OUT);
+            }
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+        }
+
+        return Collections.emptyList();
+    }
+
+    private static abstract class ShardRecoveryTask implements Runnable {
+
+        final DOMStoreWriteTransaction resultingTx;
+
+        ShardRecoveryTask(DOMStoreWriteTransaction resultingTx) {
+            this.resultingTx = resultingTx;
+        }
+    }
+
+    private class LogRecoveryTask extends ShardRecoveryTask {
+
+        private final List<Object> logEntries;
+
+        LogRecoveryTask(List<Object> logEntries, DOMStoreWriteTransaction resultingTx) {
+            super(resultingTx);
+            this.logEntries = logEntries;
+        }
+
+        @Override
+        public void run() {
+            for(int i = 0; i < logEntries.size(); i++) {
+                MutableCompositeModification.fromSerializable(
+                        logEntries.get(i), schemaContext).apply(resultingTx);
+                // Null out to GC quicker.
+                logEntries.set(i, null);
+            }
+        }
+    }
+
+    private class SnapshotRecoveryTask extends ShardRecoveryTask {
+
+        private final ByteString snapshot;
+
+        SnapshotRecoveryTask(ByteString snapshot, DOMStoreWriteTransaction resultingTx) {
+            super(resultingTx);
+            this.snapshot = snapshot;
+        }
+
+        @Override
+        public void run() {
+            try {
+                NormalizedNodeMessages.Node serializedNode = NormalizedNodeMessages.Node.parseFrom(snapshot);
+                NormalizedNode<?, ?> node = new NormalizedNodeToNodeCodec(schemaContext).decode(
+                        YangInstanceIdentifier.builder().build(), serializedNode);
+
+                // delete everything first
+                resultingTx.delete(YangInstanceIdentifier.builder().build());
+
+                // Add everything from the remote node back
+                resultingTx.write(YangInstanceIdentifier.builder().build(), node);
+            } catch (InvalidProtocolBufferException e) {
+                LOG.error("Error deserializing snapshot", e);
+            }
+        }
+    }
+}
index e7a7aab406677c53713e33b92d04f0c3a03f29aa..84614bd7bb43a0bbf43f3fe055673f64fe2cd10d 100644 (file)
@@ -42,13 +42,16 @@ public class DistributedConfigDataStoreProviderModule extends
 
         DatastoreContext datastoreContext = new DatastoreContext("DistributedConfigDatastore",
                 InMemoryDOMDataStoreConfigProperties.create(
-                        props.getMaxShardDataChangeExecutorPoolSize().getValue(),
-                        props.getMaxShardDataChangeExecutorQueueSize().getValue(),
-                        props.getMaxShardDataChangeListenerQueueSize().getValue(),
-                        props.getMaxShardDataStoreExecutorQueueSize().getValue()),
+                        props.getMaxShardDataChangeExecutorPoolSize().getValue().intValue(),
+                        props.getMaxShardDataChangeExecutorQueueSize().getValue().intValue(),
+                        props.getMaxShardDataChangeListenerQueueSize().getValue().intValue(),
+                        props.getMaxShardDataStoreExecutorQueueSize().getValue().intValue()),
                 Duration.create(props.getShardTransactionIdleTimeoutInMinutes().getValue(),
                         TimeUnit.MINUTES),
-                props.getOperationTimeoutInSeconds().getValue());
+                props.getOperationTimeoutInSeconds().getValue(),
+                props.getShardJournalRecoveryLogBatchSize().getValue().intValue(),
+                props.getShardSnapshotBatchCount().getValue().intValue(),
+                props.getShardHearbeatIntervalInMillis().getValue());
 
         return DistributedDataStoreFactory.createInstance("config", getConfigSchemaServiceDependency(),
                 datastoreContext, bundleContext);
index 814e6f606ac00bc311eb191c81497e46b56e2357..3183527eb032d287623c03e1a4fc57bca9252f2d 100644 (file)
@@ -42,13 +42,16 @@ public class DistributedOperationalDataStoreProviderModule extends
 
         DatastoreContext datastoreContext = new DatastoreContext("DistributedOperationalDatastore",
                 InMemoryDOMDataStoreConfigProperties.create(
-                        props.getMaxShardDataChangeExecutorPoolSize().getValue(),
-                        props.getMaxShardDataChangeExecutorQueueSize().getValue(),
-                        props.getMaxShardDataChangeListenerQueueSize().getValue(),
-                        props.getMaxShardDataStoreExecutorQueueSize().getValue()),
+                        props.getMaxShardDataChangeExecutorPoolSize().getValue().intValue(),
+                        props.getMaxShardDataChangeExecutorQueueSize().getValue().intValue(),
+                        props.getMaxShardDataChangeListenerQueueSize().getValue().intValue(),
+                        props.getMaxShardDataStoreExecutorQueueSize().getValue().intValue()),
                 Duration.create(props.getShardTransactionIdleTimeoutInMinutes().getValue(),
                         TimeUnit.MINUTES),
-                props.getOperationTimeoutInSeconds().getValue());
+                props.getOperationTimeoutInSeconds().getValue(),
+                props.getShardJournalRecoveryLogBatchSize().getValue().intValue(),
+                props.getShardSnapshotBatchCount().getValue().intValue(),
+                props.getShardHearbeatIntervalInMillis().getValue());
 
         return DistributedDataStoreFactory.createInstance("operational",
                 getOperationalSchemaServiceDependency(), datastoreContext, bundleContext);
index e19a76703f69f61ec8df33decddf483cfc6e7192..af43f953ffb52ffdd24bd892fe60d8369a0d74da 100644 (file)
@@ -36,8 +36,8 @@ module distributed-datastore-provider {
                 config:java-name-prefix DistributedOperationalDataStoreProvider;
      }
 
-    typedef non-zero-uint16-type {
-        type uint16 {
+    typedef non-zero-uint32-type {
+        type uint32 {
             range "1..max";
         }
     }
@@ -48,43 +48,67 @@ module distributed-datastore-provider {
         }
     }
 
+    typedef heartbeat-interval-type {
+        type uint16 {
+            range "100..max";
+        }
+    }
+
     grouping data-store-properties {
         leaf max-shard-data-change-executor-queue-size {
             default 1000;
-            type non-zero-uint16-type;
+            type non-zero-uint32-type;
             description "The maximum queue size for each shard's data store data change notification executor.";
          }
 
          leaf max-shard-data-change-executor-pool-size {
             default 20;
-            type non-zero-uint16-type;
+            type non-zero-uint32-type;
             description "The maximum thread pool size for each shard's data store data change notification executor.";
          }
 
          leaf max-shard-data-change-listener-queue-size {
             default 1000;
-            type non-zero-uint16-type;
+            type non-zero-uint32-type;
             description "The maximum queue size for each shard's data store data change listeners.";
          }
 
          leaf max-shard-data-store-executor-queue-size {
             default 5000;
-            type non-zero-uint16-type;
+            type non-zero-uint32-type;
             description "The maximum queue size for each shard's data store executor.";
          }
 
          leaf shard-transaction-idle-timeout-in-minutes {
             default 10;
-            type non-zero-uint16-type;
+            type non-zero-uint32-type;
             description "The maximum amount of time a shard transaction can be idle without receiving any messages before it self-destructs.";
          }
 
+         leaf shard-snapshot-batch-count {
+            default 20000;
+            type non-zero-uint32-type;
+            description "The minimum number of entries to be present in the in-memory journal log before a snapshot to be taken.";
+         }
+
+         leaf shard-hearbeat-interval-in-millis {
+            default 500;
+            type heartbeat-interval-type;
+            description "The interval at which a shard will send a heart beat message to its remote shard.";
+         }
+
          leaf operation-timeout-in-seconds {
             default 5;
             type operation-timeout-type;
             description "The maximum amount of time for akka operations (remote or local) to complete before failing.";
          }
 
+         leaf shard-journal-recovery-log-batch-size {
+            default 5000;
+            type non-zero-uint32-type;
+            description "The maximum number of journal log entries to batch on recovery for a shard before committing to the data store.";
+         }
+
          leaf enable-metric-capture {
             default false;
             type boolean;
@@ -93,7 +117,7 @@ module distributed-datastore-provider {
 
          leaf bounded-mailbox-capacity {
              default 1000;
-             type non-zero-uint16-type;
+             type non-zero-uint32-type;
              description "Max queue size that an actor's mailbox can reach";
          }
     }
index 022ef9bbafef949921ec24041357cff64013ea12..fae21f27091a2382f3e1feabf7a3c26ce7004ba5 100644 (file)
@@ -10,11 +10,10 @@ package org.opendaylight.controller.cluster.datastore;
 
 import akka.actor.ActorSystem;
 import akka.testkit.JavaTestKit;
-import org.apache.commons.io.FileUtils;
+
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
 
-import java.io.File;
 import java.io.IOException;
 
 public abstract class AbstractActorTest {
@@ -25,35 +24,15 @@ public abstract class AbstractActorTest {
 
         System.setProperty("shard.persistent", "false");
         system = ActorSystem.create("test");
-
-        deletePersistenceFiles();
     }
 
     @AfterClass
     public static void tearDownClass() throws IOException {
         JavaTestKit.shutdownActorSystem(system);
         system = null;
-
-        deletePersistenceFiles();
-    }
-
-    protected static void deletePersistenceFiles() throws IOException {
-        File journal = new File("journal");
-
-        if(journal.exists()) {
-            FileUtils.deleteDirectory(journal);
-        }
-
-        File snapshots = new File("snapshots");
-
-        if(snapshots.exists()){
-            FileUtils.deleteDirectory(snapshots);
-        }
-
     }
 
     protected ActorSystem getSystem() {
         return system;
     }
-
 }
index deb71c2df4aa9cc522904e4014ed66d536aa2fa4..a3e0b3a07d70993b5c403e7712dfeb1c3ad0c96d 100644 (file)
@@ -4,23 +4,43 @@ import akka.actor.ActorRef;
 import akka.actor.ActorSystem;
 import akka.actor.Props;
 import akka.event.Logging;
+import akka.japi.Creator;
 import akka.testkit.JavaTestKit;
 import akka.testkit.TestActorRef;
 import com.google.common.base.Optional;
 import com.google.common.util.concurrent.CheckedFuture;
+import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.ListenableFuture;
 import com.google.common.util.concurrent.MoreExecutors;
+import org.junit.After;
 import org.junit.Assert;
+import org.junit.Before;
 import org.junit.Test;
 import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
+import org.opendaylight.controller.cluster.datastore.messages.CommitTransactionReply;
 import org.opendaylight.controller.cluster.datastore.messages.CreateTransaction;
 import org.opendaylight.controller.cluster.datastore.messages.EnableNotification;
+import org.opendaylight.controller.cluster.datastore.messages.ForwardedCommitTransaction;
 import org.opendaylight.controller.cluster.datastore.messages.PeerAddressResolved;
 import org.opendaylight.controller.cluster.datastore.messages.RegisterChangeListener;
 import org.opendaylight.controller.cluster.datastore.messages.RegisterChangeListenerReply;
 import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext;
+import org.opendaylight.controller.cluster.datastore.modification.MergeModification;
+import org.opendaylight.controller.cluster.datastore.modification.Modification;
+import org.opendaylight.controller.cluster.datastore.modification.MutableCompositeModification;
+import org.opendaylight.controller.cluster.datastore.modification.WriteModification;
 import org.opendaylight.controller.cluster.datastore.node.NormalizedNodeToNodeCodec;
+import org.opendaylight.controller.cluster.datastore.utils.InMemoryJournal;
+import org.opendaylight.controller.cluster.datastore.utils.InMemorySnapshotStore;
+import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
+import org.opendaylight.controller.cluster.raft.ReplicatedLogImplEntry;
+import org.opendaylight.controller.cluster.raft.Snapshot;
+import org.opendaylight.controller.cluster.raft.base.messages.ApplyLogEntries;
+import org.opendaylight.controller.cluster.raft.base.messages.ApplySnapshot;
+import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
 import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshot;
+import org.opendaylight.controller.cluster.raft.protobuff.client.messages.CompositeModificationPayload;
+import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
 import org.opendaylight.controller.md.cluster.datastore.model.SchemaContextHelper;
 import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker;
@@ -28,222 +48,138 @@ import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeEvent;
 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeListener;
 import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
 import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStore;
+import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStoreFactory;
 import org.opendaylight.controller.protobuff.messages.common.NormalizedNodeMessages;
 import org.opendaylight.controller.protobuff.messages.transaction.ShardTransactionMessages.CreateTransactionReply;
 import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadTransaction;
 import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
 import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction;
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.PathArgument;
+import org.opendaylight.yangtools.yang.data.api.schema.DataContainerChild;
+import org.opendaylight.yangtools.yang.data.api.schema.MapEntryNode;
 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
 import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
-
+import org.opendaylight.yangtools.yang.model.api.SchemaContext;
+import scala.concurrent.duration.Duration;
 import java.io.IOException;
 import java.util.Collections;
-import java.util.HashMap;
-import java.util.Map;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutionException;
-
+import java.util.concurrent.TimeUnit;
 import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.verify;
 
 public class ShardTest extends AbstractActorTest {
 
-    private static final DatastoreContext DATA_STORE_CONTEXT = new DatastoreContext();
+    private static final DatastoreContext DATA_STORE_CONTEXT =
+            new DatastoreContext("", null, Duration.create(10, TimeUnit.MINUTES), 5, 3, 5000, 500);
 
-    @Test
-    public void testOnReceiveRegisterListener() throws Exception {
-        new JavaTestKit(getSystem()) {{
-            final ShardIdentifier identifier =
-                ShardIdentifier.builder().memberName("member-1")
-                    .shardName("inventory").type("config").build();
+    private static final SchemaContext SCHEMA_CONTEXT = TestModel.createTestContext();
 
-            final Props props = Shard.props(identifier, Collections.EMPTY_MAP, DATA_STORE_CONTEXT, TestModel.createTestContext());
-            final ActorRef subject =
-                getSystem().actorOf(props, "testRegisterChangeListener");
+    private static final ShardIdentifier IDENTIFIER = ShardIdentifier.builder().memberName("member-1")
+            .shardName("inventory").type("config").build();
 
-            new Within(duration("3 seconds")) {
-                @Override
-                protected void run() {
+    @Before
+    public void setUp() {
+        System.setProperty("shard.persistent", "false");
 
-                    subject.tell(
-                        new UpdateSchemaContext(SchemaContextHelper.full()),
-                        getRef());
+        InMemorySnapshotStore.clear();
+        InMemoryJournal.clear();
+    }
 
-                    subject.tell(new RegisterChangeListener(TestModel.TEST_PATH,
-                        getRef().path(), AsyncDataBroker.DataChangeScope.BASE),
-                        getRef());
+    @After
+    public void tearDown() {
+        InMemorySnapshotStore.clear();
+        InMemoryJournal.clear();
+    }
 
-                    final Boolean notificationEnabled = new ExpectMsg<Boolean>(
-                                                   duration("3 seconds"), "enable notification") {
-                        // do not put code outside this method, will run afterwards
-                        @Override
-                        protected Boolean match(Object in) {
-                            if(in instanceof EnableNotification){
-                                return ((EnableNotification) in).isEnabled();
-                            } else {
-                                throw noMatch();
-                            }
-                        }
-                    }.get(); // this extracts the received message
-
-                    assertFalse(notificationEnabled);
-
-                    final String out = new ExpectMsg<String>(duration("3 seconds"), "match hint") {
-                        // do not put code outside this method, will run afterwards
-                        @Override
-                        protected String match(Object in) {
-                            if (in.getClass().equals(RegisterChangeListenerReply.class)) {
-                                RegisterChangeListenerReply reply =
-                                    (RegisterChangeListenerReply) in;
-                                return reply.getListenerRegistrationPath()
-                                    .toString();
-                            } else {
-                                throw noMatch();
-                            }
-                        }
-                    }.get(); // this extracts the received message
+    private Props newShardProps() {
+        return Shard.props(IDENTIFIER, Collections.<ShardIdentifier,String>emptyMap(),
+                DATA_STORE_CONTEXT, SCHEMA_CONTEXT);
+    }
 
-                    assertTrue(out.matches(
-                        "akka:\\/\\/test\\/user\\/testRegisterChangeListener\\/\\$.*"));
-                }
+    @Test
+    public void testOnReceiveRegisterListener() throws Exception {
+        new JavaTestKit(getSystem()) {{
+            ActorRef subject = getSystem().actorOf(newShardProps(), "testRegisterChangeListener");
 
+            subject.tell(new UpdateSchemaContext(SchemaContextHelper.full()), getRef());
 
-            };
+            subject.tell(new RegisterChangeListener(TestModel.TEST_PATH,
+                    getRef().path(), AsyncDataBroker.DataChangeScope.BASE), getRef());
+
+            EnableNotification enable = expectMsgClass(duration("3 seconds"), EnableNotification.class);
+            assertEquals("isEnabled", false, enable.isEnabled());
+
+            RegisterChangeListenerReply reply = expectMsgClass(duration("3 seconds"),
+                    RegisterChangeListenerReply.class);
+            assertTrue(reply.getListenerRegistrationPath().toString().matches(
+                    "akka:\\/\\/test\\/user\\/testRegisterChangeListener\\/\\$.*"));
         }};
     }
 
     @Test
     public void testCreateTransaction(){
-        new JavaTestKit(getSystem()) {{
-            final ShardIdentifier identifier =
-                ShardIdentifier.builder().memberName("member-1")
-                    .shardName("inventory").type("config").build();
-
-            final Props props = Shard.props(identifier, Collections.EMPTY_MAP, DATA_STORE_CONTEXT, TestModel.createTestContext());
-            final ActorRef subject =
-                getSystem().actorOf(props, "testCreateTransaction");
-
-            // Wait for a specific log message to show up
-            final boolean result =
-                new JavaTestKit.EventFilter<Boolean>(Logging.Info.class
-                ) {
-                    @Override
-                    protected Boolean run() {
-                        return true;
-                    }
-                }.from(subject.path().toString())
-                    .message("Switching from state Candidate to Leader")
-                    .occurrences(1).exec();
-
-            Assert.assertEquals(true, result);
+        new ShardTestKit(getSystem()) {{
+            ActorRef subject = getSystem().actorOf(newShardProps(), "testCreateTransaction");
 
-            new Within(duration("3 seconds")) {
-                @Override
-                protected void run() {
+            waitUntilLeader(subject);
 
-                    subject.tell(
-                        new UpdateSchemaContext(TestModel.createTestContext()),
-                        getRef());
+            subject.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
 
-                    subject.tell(new CreateTransaction("txn-1", TransactionProxy.TransactionType.READ_ONLY.ordinal() ).toSerializable(),
-                        getRef());
+            subject.tell(new CreateTransaction("txn-1",
+                    TransactionProxy.TransactionType.READ_ONLY.ordinal() ).toSerializable(), getRef());
 
-                    final String out = new ExpectMsg<String>(duration("3 seconds"), "match hint") {
-                        // do not put code outside this method, will run afterwards
-                        @Override
-                        protected String match(Object in) {
-                            if (in instanceof CreateTransactionReply) {
-                                CreateTransactionReply reply =
-                                    (CreateTransactionReply) in;
-                                return reply.getTransactionActorPath()
-                                    .toString();
-                            } else {
-                                throw noMatch();
-                            }
-                        }
-                    }.get(); // this extracts the received message
+            CreateTransactionReply reply = expectMsgClass(duration("3 seconds"),
+                    CreateTransactionReply.class);
 
-                    assertTrue("Unexpected transaction path " + out,
-                        out.contains("akka://test/user/testCreateTransaction/shard-txn-1"));
-                    expectNoMsg();
-                }
-            };
+            String path = reply.getTransactionActorPath().toString();
+            assertTrue("Unexpected transaction path " + path,
+                    path.contains("akka://test/user/testCreateTransaction/shard-txn-1"));
+            expectNoMsg();
         }};
     }
 
     @Test
     public void testCreateTransactionOnChain(){
-        new JavaTestKit(getSystem()) {{
-            final ShardIdentifier identifier =
-                ShardIdentifier.builder().memberName("member-1")
-                    .shardName("inventory").type("config").build();
-
-            final Props props = Shard.props(identifier, Collections.EMPTY_MAP, DATA_STORE_CONTEXT, TestModel.createTestContext());
-            final ActorRef subject =
-                getSystem().actorOf(props, "testCreateTransactionOnChain");
-
-            // Wait for a specific log message to show up
-            final boolean result =
-                new JavaTestKit.EventFilter<Boolean>(Logging.Info.class
-                ) {
-                    @Override
-                    protected Boolean run() {
-                        return true;
-                    }
-                }.from(subject.path().toString())
-                    .message("Switching from state Candidate to Leader")
-                    .occurrences(1).exec();
-
-            Assert.assertEquals(true, result);
-
-            new Within(duration("3 seconds")) {
-                @Override
-                protected void run() {
+        new ShardTestKit(getSystem()) {{
+            final ActorRef subject = getSystem().actorOf(newShardProps(), "testCreateTransactionOnChain");
 
-                    subject.tell(
-                        new UpdateSchemaContext(TestModel.createTestContext()),
-                        getRef());
+            waitUntilLeader(subject);
 
-                    subject.tell(new CreateTransaction("txn-1", TransactionProxy.TransactionType.READ_ONLY.ordinal() , "foobar").toSerializable(),
-                        getRef());
+            subject.tell(new CreateTransaction("txn-1",
+                    TransactionProxy.TransactionType.READ_ONLY.ordinal() , "foobar").toSerializable(),
+                    getRef());
 
-                    final String out = new ExpectMsg<String>(duration("3 seconds"), "match hint") {
-                        // do not put code outside this method, will run afterwards
-                        @Override
-                        protected String match(Object in) {
-                            if (in instanceof CreateTransactionReply) {
-                                CreateTransactionReply reply =
-                                    (CreateTransactionReply) in;
-                                return reply.getTransactionActorPath()
-                                    .toString();
-                            } else {
-                                throw noMatch();
-                            }
-                        }
-                    }.get(); // this extracts the received message
+            CreateTransactionReply reply = expectMsgClass(duration("3 seconds"),
+                    CreateTransactionReply.class);
 
-                    assertTrue("Unexpected transaction path " + out,
-                        out.contains("akka://test/user/testCreateTransactionOnChain/shard-txn-1"));
-                    expectNoMsg();
-                }
-            };
+            String path = reply.getTransactionActorPath().toString();
+            assertTrue("Unexpected transaction path " + path,
+                    path.contains("akka://test/user/testCreateTransactionOnChain/shard-txn-1"));
+            expectNoMsg();
         }};
     }
 
     @Test
     public void testPeerAddressResolved(){
         new JavaTestKit(getSystem()) {{
-            Map<ShardIdentifier, String> peerAddresses = new HashMap<>();
-
             final ShardIdentifier identifier =
                 ShardIdentifier.builder().memberName("member-1")
                     .shardName("inventory").type("config").build();
 
-            peerAddresses.put(identifier, null);
-            final Props props = Shard.props(identifier, peerAddresses, DATA_STORE_CONTEXT, TestModel.createTestContext());
-            final ActorRef subject =
-                getSystem().actorOf(props, "testPeerAddressResolved");
+            Props props = Shard.props(identifier,
+                    Collections.<ShardIdentifier, String>singletonMap(identifier, null),
+                    DATA_STORE_CONTEXT, SCHEMA_CONTEXT);
+            final ActorRef subject = getSystem().actorOf(props, "testPeerAddressResolved");
 
             new Within(duration("3 seconds")) {
                 @Override
@@ -261,99 +197,205 @@ public class ShardTest extends AbstractActorTest {
 
     @Test
     public void testApplySnapshot() throws ExecutionException, InterruptedException {
-        Map<ShardIdentifier, String> peerAddresses = new HashMap<>();
+        TestActorRef<Shard> ref = TestActorRef.create(getSystem(), newShardProps());
 
-        final ShardIdentifier identifier =
-            ShardIdentifier.builder().memberName("member-1")
-                .shardName("inventory").type("config").build();
+        NormalizedNodeToNodeCodec codec =
+            new NormalizedNodeToNodeCodec(SCHEMA_CONTEXT);
 
-        peerAddresses.put(identifier, null);
-        final Props props = Shard.props(identifier, peerAddresses, DATA_STORE_CONTEXT, TestModel.createTestContext());
+        ref.underlyingActor().writeToStore(TestModel.TEST_PATH, ImmutableNodes.containerNode(
+                TestModel.TEST_QNAME));
 
-        TestActorRef<Shard> ref = TestActorRef.create(getSystem(), props);
+        YangInstanceIdentifier root = YangInstanceIdentifier.builder().build();
+        NormalizedNode<?,?> expected = ref.underlyingActor().readStore(root);
 
-        ref.underlyingActor().updateSchemaContext(TestModel.createTestContext());
+        NormalizedNodeMessages.Container encode = codec.encode(root, expected);
 
-        NormalizedNodeToNodeCodec codec =
-            new NormalizedNodeToNodeCodec(TestModel.createTestContext());
+        ApplySnapshot applySnapshot = new ApplySnapshot(Snapshot.create(
+                encode.getNormalizedNode().toByteString().toByteArray(),
+                Collections.<ReplicatedLogEntry>emptyList(), 1, 2, 3, 4));
 
-        ref.underlyingActor().writeToStore(TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
+        ref.underlyingActor().onReceiveCommand(applySnapshot);
 
-        NormalizedNode expected = ref.underlyingActor().readStore();
+        NormalizedNode<?,?> actual = ref.underlyingActor().readStore(root);
 
-        NormalizedNodeMessages.Container encode = codec
-            .encode(YangInstanceIdentifier.builder().build(), expected);
+        assertEquals(expected, actual);
+    }
 
+    @Test
+    public void testApplyState() throws Exception {
 
-        ref.underlyingActor().applySnapshot(encode.getNormalizedNode().toByteString());
+        TestActorRef<Shard> shard = TestActorRef.create(getSystem(), newShardProps());
 
-        NormalizedNode actual = ref.underlyingActor().readStore();
+        NormalizedNode<?, ?> node = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
 
-        assertEquals(expected, actual);
-    }
+        MutableCompositeModification compMod = new MutableCompositeModification();
+        compMod.addModification(new WriteModification(TestModel.TEST_PATH, node, SCHEMA_CONTEXT));
+        Payload payload = new CompositeModificationPayload(compMod.toSerializable());
+        ApplyState applyState = new ApplyState(null, "test",
+                new ReplicatedLogImplEntry(1, 2, payload));
 
-    private static class ShardTestKit extends JavaTestKit {
+        shard.underlyingActor().onReceiveCommand(applyState);
 
-        private ShardTestKit(ActorSystem actorSystem) {
-            super(actorSystem);
+        NormalizedNode<?,?> actual = shard.underlyingActor().readStore(TestModel.TEST_PATH);
+        assertEquals("Applied state", node, actual);
+    }
+
+    @SuppressWarnings("serial")
+    @Test
+    public void testRecovery() throws Exception {
+
+        // Set up the InMemorySnapshotStore.
+
+        InMemoryDOMDataStore testStore = InMemoryDOMDataStoreFactory.create("Test", null, null);
+        testStore.onGlobalContextUpdated(SCHEMA_CONTEXT);
+
+        DOMStoreWriteTransaction writeTx = testStore.newWriteOnlyTransaction();
+        writeTx.write(TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
+        DOMStoreThreePhaseCommitCohort commitCohort = writeTx.ready();
+        commitCohort.preCommit().get();
+        commitCohort.commit().get();
+
+        DOMStoreReadTransaction readTx = testStore.newReadOnlyTransaction();
+        NormalizedNode<?, ?> root = readTx.read(YangInstanceIdentifier.builder().build()).get().get();
+
+        InMemorySnapshotStore.addSnapshot(IDENTIFIER.toString(), Snapshot.create(
+                new NormalizedNodeToNodeCodec(SCHEMA_CONTEXT).encode(
+                        YangInstanceIdentifier.builder().build(), root).
+                                getNormalizedNode().toByteString().toByteArray(),
+                                Collections.<ReplicatedLogEntry>emptyList(), 0, 1, -1, -1));
+
+        // Set up the InMemoryJournal.
+
+        InMemoryJournal.addEntry(IDENTIFIER.toString(), 0, new ReplicatedLogImplEntry(0, 1, newPayload(
+                  new WriteModification(TestModel.OUTER_LIST_PATH,
+                          ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build(),
+                          SCHEMA_CONTEXT))));
+
+        int nListEntries = 11;
+        Set<Integer> listEntryKeys = new HashSet<>();
+        for(int i = 1; i <= nListEntries; i++) {
+            listEntryKeys.add(Integer.valueOf(i));
+            YangInstanceIdentifier path = YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH)
+                    .nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, i).build();
+            Modification mod = new MergeModification(path,
+                    ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, i),
+                    SCHEMA_CONTEXT);
+            InMemoryJournal.addEntry(IDENTIFIER.toString(), i, new ReplicatedLogImplEntry(i, 1,
+                    newPayload(mod)));
         }
 
-        protected void waitForLogMessage(final Class logLevel, ActorRef subject, String logMessage){
-            // Wait for a specific log message to show up
-            final boolean result =
-                new JavaTestKit.EventFilter<Boolean>(logLevel
-                ) {
+        InMemoryJournal.addEntry(IDENTIFIER.toString(), nListEntries + 1,
+                new ApplyLogEntries(nListEntries));
+
+        // Create the actor and wait for recovery complete.
+
+        final CountDownLatch recoveryComplete = new CountDownLatch(1);
+
+        Creator<Shard> creator = new Creator<Shard>() {
+            @Override
+            public Shard create() throws Exception {
+                return new Shard(IDENTIFIER, Collections.<ShardIdentifier,String>emptyMap(),
+                        DATA_STORE_CONTEXT, SCHEMA_CONTEXT) {
                     @Override
-                    protected Boolean run() {
-                        return true;
+                    protected void onRecoveryComplete() {
+                        try {
+                            super.onRecoveryComplete();
+                        } finally {
+                            recoveryComplete.countDown();
+                        }
                     }
-                }.from(subject.path().toString())
-                    .message(logMessage)
-                    .occurrences(1).exec();
+                };
+            }
+        };
 
-            Assert.assertEquals(true, result);
+        TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
+                Props.create(new DelegatingShardCreator(creator)), "testRecovery");
+
+        assertEquals("Recovery complete", true, recoveryComplete.await(5, TimeUnit.SECONDS));
+
+        // Verify data in the data store.
+
+        NormalizedNode<?, ?> outerList = shard.underlyingActor().readStore(TestModel.OUTER_LIST_PATH);
+        assertNotNull(TestModel.OUTER_LIST_QNAME.getLocalName() + " not found", outerList);
+        assertTrue(TestModel.OUTER_LIST_QNAME.getLocalName() + " value is not Iterable",
+                outerList.getValue() instanceof Iterable);
+        for(Object entry: (Iterable<?>) outerList.getValue()) {
+            assertTrue(TestModel.OUTER_LIST_QNAME.getLocalName() + " entry is not MapEntryNode",
+                    entry instanceof MapEntryNode);
+            MapEntryNode mapEntry = (MapEntryNode)entry;
+            Optional<DataContainerChild<? extends PathArgument, ?>> idLeaf =
+                    mapEntry.getChild(new YangInstanceIdentifier.NodeIdentifier(TestModel.ID_QNAME));
+            assertTrue("Missing leaf " + TestModel.ID_QNAME.getLocalName(), idLeaf.isPresent());
+            Object value = idLeaf.get().getValue();
+            assertTrue("Unexpected value for leaf "+ TestModel.ID_QNAME.getLocalName() + ": " + value,
+                    listEntryKeys.remove(value));
+        }
 
+        if(!listEntryKeys.isEmpty()) {
+            fail("Missing " + TestModel.OUTER_LIST_QNAME.getLocalName() + " entries with keys: " +
+                    listEntryKeys);
         }
 
+        assertEquals("Last log index", nListEntries,
+                shard.underlyingActor().getShardMBean().getLastLogIndex());
+        assertEquals("Commit index", nListEntries,
+                shard.underlyingActor().getShardMBean().getCommitIndex());
+        assertEquals("Last applied", nListEntries,
+                shard.underlyingActor().getShardMBean().getLastApplied());
     }
 
+    private CompositeModificationPayload newPayload(Modification... mods) {
+        MutableCompositeModification compMod = new MutableCompositeModification();
+        for(Modification mod: mods) {
+            compMod.addModification(mod);
+        }
+
+        return new CompositeModificationPayload(compMod.toSerializable());
+    }
+
+    @SuppressWarnings("unchecked")
     @Test
-    public void testCreateSnapshot() throws IOException, InterruptedException {
+    public void testForwardedCommitTransactionWithPersistence() throws IOException {
+        System.setProperty("shard.persistent", "true");
+
         new ShardTestKit(getSystem()) {{
-            final ShardIdentifier identifier =
-                ShardIdentifier.builder().memberName("member-1")
-                    .shardName("inventory").type("config").build();
+            TestActorRef<Shard> shard = TestActorRef.create(getSystem(), newShardProps());
 
-            final Props props = Shard.props(identifier, Collections.EMPTY_MAP, DATA_STORE_CONTEXT, TestModel.createTestContext());
-            final ActorRef subject =
-                getSystem().actorOf(props, "testCreateSnapshot");
+            waitUntilLeader(shard);
 
-            // Wait for a specific log message to show up
-            this.waitForLogMessage(Logging.Info.class, subject, "Switching from state Candidate to Leader");
+            NormalizedNode<?, ?> node = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
 
+            DOMStoreThreePhaseCommitCohort cohort = mock(DOMStoreThreePhaseCommitCohort.class);
+            doReturn(Futures.immediateFuture(null)).when(cohort).commit();
 
-            new Within(duration("3 seconds")) {
-                @Override
-                protected void run() {
+            MutableCompositeModification modification = new MutableCompositeModification();
+            modification.addModification(new WriteModification(TestModel.TEST_PATH, node,
+                    SCHEMA_CONTEXT));
 
-                    subject.tell(
-                        new UpdateSchemaContext(TestModel.createTestContext()),
-                        getRef());
+            shard.tell(new ForwardedCommitTransaction(cohort, modification), getRef());
 
-                    subject.tell(new CaptureSnapshot(-1,-1,-1,-1),
-                        getRef());
+            expectMsgClass(duration("5 seconds"), CommitTransactionReply.SERIALIZABLE_CLASS);
 
-                    waitForLogMessage(Logging.Info.class, subject, "CaptureSnapshotReply received by actor");
+            verify(cohort).commit();
 
-                    subject.tell(new CaptureSnapshot(-1,-1,-1,-1),
-                        getRef());
+            assertEquals("Last log index", 0, shard.underlyingActor().getShardMBean().getLastLogIndex());
+        }};
+    }
 
-                    waitForLogMessage(Logging.Info.class, subject, "CaptureSnapshotReply received by actor");
+    @Test
+    public void testCreateSnapshot() throws IOException, InterruptedException {
+        new ShardTestKit(getSystem()) {{
+            final ActorRef subject = getSystem().actorOf(newShardProps(), "testCreateSnapshot");
 
-                }
-            };
+            waitUntilLeader(subject);
+
+            subject.tell(new CaptureSnapshot(-1,-1,-1,-1), getRef());
+
+            waitForLogMessage(Logging.Info.class, subject, "CaptureSnapshotReply received by actor");
+
+            subject.tell(new CaptureSnapshot(-1,-1,-1,-1), getRef());
 
-            deletePersistenceFiles();
+            waitForLogMessage(Logging.Info.class, subject, "CaptureSnapshotReply received by actor");
         }};
     }
 
@@ -366,7 +408,7 @@ public class ShardTest extends AbstractActorTest {
         InMemoryDOMDataStore store = new InMemoryDOMDataStore("test", MoreExecutors.listeningDecorator(
             MoreExecutors.sameThreadExecutor()), MoreExecutors.sameThreadExecutor());
 
-        store.onGlobalContextUpdated(TestModel.createTestContext());
+        store.onGlobalContextUpdated(SCHEMA_CONTEXT);
 
         DOMStoreWriteTransaction putTransaction = store.newWriteOnlyTransaction();
         putTransaction.write(TestModel.TEST_PATH,
@@ -424,4 +466,46 @@ public class ShardTest extends AbstractActorTest {
             }
         };
     }
+
+    private static final class DelegatingShardCreator implements Creator<Shard> {
+        private final Creator<Shard> delegate;
+
+        DelegatingShardCreator(Creator<Shard> delegate) {
+            this.delegate = delegate;
+        }
+
+        @Override
+        public Shard create() throws Exception {
+            return delegate.create();
+        }
+    }
+
+    private static class ShardTestKit extends JavaTestKit {
+
+        private ShardTestKit(ActorSystem actorSystem) {
+            super(actorSystem);
+        }
+
+        protected void waitForLogMessage(final Class logLevel, ActorRef subject, String logMessage){
+            // Wait for a specific log message to show up
+            final boolean result =
+                new JavaTestKit.EventFilter<Boolean>(logLevel
+                ) {
+                    @Override
+                    protected Boolean run() {
+                        return true;
+                    }
+                }.from(subject.path().toString())
+                    .message(logMessage)
+                    .occurrences(1).exec();
+
+            Assert.assertEquals(true, result);
+
+        }
+
+        protected void waitUntilLeader(ActorRef subject) {
+            waitForLogMessage(Logging.Info.class, subject,
+                    "Switching from state Candidate to Leader");
+        }
+    }
 }
index 0beb00b435ebe622d888dd58b40d938877c4d612..3f31591c79c4f7d3aa6dff45f518c2f8d38726e0 100644 (file)
@@ -503,7 +503,7 @@ public class ShardTransactionTest extends AbstractActorTest {
 
         datastoreContext = new DatastoreContext("Test",
                 InMemoryDOMDataStoreConfigProperties.getDefault(),
-                Duration.create(500, TimeUnit.MILLISECONDS), 5);
+                Duration.create(500, TimeUnit.MILLISECONDS), 5, 1000, 1000, 500);
 
         new JavaTestKit(getSystem()) {{
             final ActorRef shard = createShard();
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/utils/InMemoryJournal.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/utils/InMemoryJournal.java
new file mode 100644 (file)
index 0000000..c9a0eaf
--- /dev/null
@@ -0,0 +1,87 @@
+/*
+ * Copyright (c) 2014 Brocade Communications Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.datastore.utils;
+
+import java.util.Map;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ConcurrentHashMap;
+import com.google.common.collect.Maps;
+import scala.concurrent.Future;
+import akka.dispatch.Futures;
+import akka.japi.Procedure;
+import akka.persistence.PersistentConfirmation;
+import akka.persistence.PersistentId;
+import akka.persistence.PersistentImpl;
+import akka.persistence.PersistentRepr;
+import akka.persistence.journal.japi.AsyncWriteJournal;
+
+public class InMemoryJournal extends AsyncWriteJournal {
+
+    private static Map<String, Map<Long, Object>> journals = new ConcurrentHashMap<>();
+
+    public static void addEntry(String persistenceId, long sequenceNr, Object data) {
+        Map<Long, Object> journal = journals.get(persistenceId);
+        if(journal == null) {
+            journal = Maps.newLinkedHashMap();
+            journals.put(persistenceId, journal);
+        }
+
+        journal.put(sequenceNr, data);
+    }
+
+    public static void clear() {
+        journals.clear();
+    }
+
+    @Override
+    public Future<Void> doAsyncReplayMessages(final String persistenceId, long fromSequenceNr,
+            long toSequenceNr, long max, final Procedure<PersistentRepr> replayCallback) {
+        return Futures.future(new Callable<Void>() {
+            @Override
+            public Void call() throws Exception {
+                Map<Long, Object> journal = journals.get(persistenceId);
+                if(journal == null) {
+                    return null;
+                }
+
+                for (Map.Entry<Long,Object> entry : journal.entrySet()) {
+                    PersistentRepr persistentMessage =
+                        new PersistentImpl(entry.getValue(), entry.getKey(), persistenceId, false, null, null);
+                    replayCallback.apply(persistentMessage);
+                }
+
+                return null;
+            }
+        }, context().dispatcher());
+    }
+
+    @Override
+    public Future<Long> doAsyncReadHighestSequenceNr(String persistenceId, long fromSequenceNr) {
+        return Futures.successful(new Long(0));
+    }
+
+    @Override
+    public Future<Void> doAsyncWriteMessages(Iterable<PersistentRepr> messages) {
+        return Futures.successful(null);
+    }
+
+    @Override
+    public Future<Void> doAsyncWriteConfirmations(Iterable<PersistentConfirmation> confirmations) {
+        return Futures.successful(null);
+    }
+
+    @Override
+    public Future<Void> doAsyncDeleteMessages(Iterable<PersistentId> messageIds, boolean permanent) {
+        return Futures.successful(null);
+    }
+
+    @Override
+    public Future<Void> doAsyncDeleteMessagesTo(String persistenceId, long toSequenceNr, boolean permanent) {
+        return Futures.successful(null);
+    }
+}
index 0e492f0fbbc57ada924540b7f2958ea9c81ae437..22e522b760792366997fe0e4576ab4d576fc390f 100644 (file)
@@ -16,46 +16,66 @@ import akka.persistence.SnapshotSelectionCriteria;
 import akka.persistence.snapshot.japi.SnapshotStore;
 import com.google.common.collect.Iterables;
 import scala.concurrent.Future;
-
 import java.util.ArrayList;
-import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import org.opendaylight.controller.cluster.raft.Snapshot;
 
 public class InMemorySnapshotStore extends SnapshotStore {
 
-    Map<String, List<Snapshot>> snapshots = new HashMap<>();
+    private static Map<String, List<StoredSnapshot>> snapshots = new ConcurrentHashMap<>();
+
+    public static void addSnapshot(String persistentId, Snapshot snapshot) {
+        List<StoredSnapshot> snapshotList = snapshots.get(persistentId);
+
+        if(snapshotList == null) {
+            snapshotList = new ArrayList<>();
+            snapshots.put(persistentId, snapshotList);
+        }
+
+        snapshotList.add(new StoredSnapshot(new SnapshotMetadata(persistentId, snapshotList.size(),
+                System.currentTimeMillis()), snapshot));
+    }
+
+    public static void clear() {
+        snapshots.clear();
+    }
 
-    @Override public Future<Option<SelectedSnapshot>> doLoadAsync(String s,
+    @Override
+    public Future<Option<SelectedSnapshot>> doLoadAsync(String s,
         SnapshotSelectionCriteria snapshotSelectionCriteria) {
-        List<Snapshot> snapshotList = snapshots.get(s);
+        List<StoredSnapshot> snapshotList = snapshots.get(s);
         if(snapshotList == null){
             return Futures.successful(Option.<SelectedSnapshot>none());
         }
 
-        Snapshot snapshot = Iterables.getLast(snapshotList);
+        StoredSnapshot snapshot = Iterables.getLast(snapshotList);
         SelectedSnapshot selectedSnapshot =
             new SelectedSnapshot(snapshot.getMetadata(), snapshot.getData());
         return Futures.successful(Option.some(selectedSnapshot));
     }
 
-    @Override public Future<Void> doSaveAsync(SnapshotMetadata snapshotMetadata, Object o) {
-        List<Snapshot> snapshotList = snapshots.get(snapshotMetadata.persistenceId());
+    @Override
+    public Future<Void> doSaveAsync(SnapshotMetadata snapshotMetadata, Object o) {
+        List<StoredSnapshot> snapshotList = snapshots.get(snapshotMetadata.persistenceId());
 
         if(snapshotList == null){
             snapshotList = new ArrayList<>();
             snapshots.put(snapshotMetadata.persistenceId(), snapshotList);
         }
-        snapshotList.add(new Snapshot(snapshotMetadata, o));
+        snapshotList.add(new StoredSnapshot(snapshotMetadata, o));
 
         return Futures.successful(null);
     }
 
-    @Override public void onSaved(SnapshotMetadata snapshotMetadata) throws Exception {
+    @Override
+    public void onSaved(SnapshotMetadata snapshotMetadata) throws Exception {
     }
 
-    @Override public void doDelete(SnapshotMetadata snapshotMetadata) throws Exception {
-        List<Snapshot> snapshotList = snapshots.get(snapshotMetadata.persistenceId());
+    @Override
+    public void doDelete(SnapshotMetadata snapshotMetadata) throws Exception {
+        List<StoredSnapshot> snapshotList = snapshots.get(snapshotMetadata.persistenceId());
 
         if(snapshotList == null){
             return;
@@ -64,7 +84,7 @@ public class InMemorySnapshotStore extends SnapshotStore {
         int deleteIndex = -1;
 
         for(int i=0;i<snapshotList.size(); i++){
-            Snapshot snapshot = snapshotList.get(i);
+            StoredSnapshot snapshot = snapshotList.get(i);
             if(snapshotMetadata.equals(snapshot.getMetadata())){
                 deleteIndex = i;
                 break;
@@ -77,9 +97,10 @@ public class InMemorySnapshotStore extends SnapshotStore {
 
     }
 
-    @Override public void doDelete(String s, SnapshotSelectionCriteria snapshotSelectionCriteria)
+    @Override
+    public void doDelete(String s, SnapshotSelectionCriteria snapshotSelectionCriteria)
         throws Exception {
-        List<Snapshot> snapshotList = snapshots.get(s);
+        List<StoredSnapshot> snapshotList = snapshots.get(s);
 
         if(snapshotList == null){
             return;
@@ -90,11 +111,11 @@ public class InMemorySnapshotStore extends SnapshotStore {
         snapshots.remove(s);
     }
 
-    private static class Snapshot {
+    private static class StoredSnapshot {
         private final SnapshotMetadata metadata;
         private final Object data;
 
-        private Snapshot(SnapshotMetadata metadata, Object data) {
+        private StoredSnapshot(SnapshotMetadata metadata, Object data) {
             this.metadata = metadata;
             this.data = data;
         }
index f0dadc618b2b4769b0240ff1c55567b28c924fb9..3a37dd937656d028bd20e5591975d509521d7a93 100644 (file)
@@ -1,5 +1,6 @@
 akka {
     persistence.snapshot-store.plugin = "in-memory-snapshot-store"
+    persistence.journal.plugin = "in-memory-journal"
 
     loggers = ["akka.testkit.TestEventListener", "akka.event.slf4j.Slf4jLogger"]
 
@@ -17,6 +18,10 @@ akka {
     }
 }
 
+in-memory-journal {
+    class = "org.opendaylight.controller.cluster.datastore.utils.InMemoryJournal"
+}
+
 in-memory-snapshot-store {
   # Class name of the plugin.
   class = "org.opendaylight.controller.cluster.datastore.utils.InMemorySnapshotStore"