Merge "BUG 2773 : Transition Shard to Leader state when it has no peers"
authorTom Pantelis <tpanteli@brocade.com>
Fri, 27 Mar 2015 12:52:22 +0000 (12:52 +0000)
committerGerrit Code Review <gerrit@opendaylight.org>
Fri, 27 Mar 2015 12:52:23 +0000 (12:52 +0000)
34 files changed:
opendaylight/commons/opendaylight/pom.xml
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/FollowerLogInformationImpl.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActor.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorContext.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorContextImpl.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/SnapshotManager.java [new file with mode: 0644]
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/SnapshotState.java [new file with mode: 0644]
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractLeader.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractRaftActorBehavior.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/Follower.java
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/MockRaftActorContext.java
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/RaftActorTest.java
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/SnapshotManagerTest.java [new file with mode: 0644]
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/LeaderTest.java
opendaylight/md-sal/sal-clustering-config/src/main/resources/initial/datastore.cfg
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ConcurrentDOMDataBroker.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardManager.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardRecoveryCoordinator.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/ActorNotInitialized.java [deleted file]
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/FindPrimary.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/PrimaryFound.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/PrimaryNotFound.java [deleted file]
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/utils/ActorContext.java
opendaylight/md-sal/sal-distributed-datastore/src/main/yang/distributed-datastore-provider.yang
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ConcurrentDOMDataBrokerTest.java [moved from opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DOMConcurrentDataCommitCoordinatorTest.java with 99% similarity]
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DataChangeListenerRegistrationProxyTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardManagerTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/utils/ActorContextTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/utils/MockClusterWrapper.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/utils/MockConfiguration.java
opendaylight/md-sal/sal-distributed-datastore/src/test/resources/application.conf
opendaylight/netconf/netconf-ssh/src/test/java/org/opendaylight/controller/netconf/netty/SSHTest.java

index 130cb11c5afec8039831cf78d271c91a1e674074..2915a8dfd93de373373476ea717779937673e78d 100644 (file)
     <yang-ext.version>2013.09.07.7-SNAPSHOT</yang-ext.version>
     <yang-jmx-generator.version>1.1.0-SNAPSHOT</yang-jmx-generator.version>
     <yangtools.version>0.7.0-SNAPSHOT</yangtools.version>
-    <sshd-core.version>0.12.0</sshd-core.version>
+    <sshd-core.version>0.14.0</sshd-core.version>
     <jmh.version>0.9.7</jmh.version>
     <lmax.version>3.3.0</lmax.version>
   </properties>
index 15063cff5b4a8a9c0114dbad73709f27172fc050..bcfd472bf6c394ab88b5e10ced1a0263be01e9f6 100644 (file)
@@ -10,21 +10,17 @@ package org.opendaylight.controller.cluster.raft;
 
 import com.google.common.base.Stopwatch;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicLongFieldUpdater;
 
 public class FollowerLogInformationImpl implements FollowerLogInformation {
-    private static final AtomicLongFieldUpdater<FollowerLogInformationImpl> NEXT_INDEX_UPDATER = AtomicLongFieldUpdater.newUpdater(FollowerLogInformationImpl.class, "nextIndex");
-    private static final AtomicLongFieldUpdater<FollowerLogInformationImpl> MATCH_INDEX_UPDATER = AtomicLongFieldUpdater.newUpdater(FollowerLogInformationImpl.class, "matchIndex");
-
     private final String id;
 
     private final Stopwatch stopwatch = Stopwatch.createUnstarted();
 
     private final RaftActorContext context;
 
-    private volatile long nextIndex;
+    private long nextIndex;
 
-    private volatile long matchIndex;
+    private long matchIndex;
 
     private long lastReplicatedIndex = -1L;
 
@@ -39,13 +35,13 @@ public class FollowerLogInformationImpl implements FollowerLogInformation {
     }
 
     @Override
-    public long incrNextIndex(){
-        return NEXT_INDEX_UPDATER.incrementAndGet(this);
+    public long incrNextIndex() {
+        return nextIndex++;
     }
 
     @Override
     public long decrNextIndex() {
-        return NEXT_INDEX_UPDATER.decrementAndGet(this);
+        return nextIndex--;
     }
 
     @Override
@@ -60,7 +56,7 @@ public class FollowerLogInformationImpl implements FollowerLogInformation {
 
     @Override
     public long incrMatchIndex(){
-        return MATCH_INDEX_UPDATER.incrementAndGet(this);
+        return matchIndex++;
     }
 
     @Override
index aa72485187cc9143fbcf6eac5f1adb0b7815b27e..b74259d4851153659df0c2866f6323b9234eff06 100644 (file)
@@ -22,7 +22,6 @@ import com.google.common.base.Optional;
 import com.google.common.base.Stopwatch;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Lists;
-import com.google.protobuf.ByteString;
 import java.io.Serializable;
 import java.util.Collection;
 import java.util.List;
@@ -40,7 +39,6 @@ import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
 import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshot;
 import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshotReply;
 import org.opendaylight.controller.cluster.raft.base.messages.Replicate;
-import org.opendaylight.controller.cluster.raft.base.messages.SendInstallSnapshot;
 import org.opendaylight.controller.cluster.raft.behaviors.AbstractLeader;
 import org.opendaylight.controller.cluster.raft.behaviors.AbstractRaftActorBehavior;
 import org.opendaylight.controller.cluster.raft.behaviors.Follower;
@@ -104,6 +102,7 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
                 public void apply(ApplyJournalEntries param) throws Exception {
                 }
             };
+    private static final String COMMIT_SNAPSHOT = "commit_snapshot";
 
     protected final Logger LOG = LoggerFactory.getLogger(getClass());
 
@@ -119,13 +118,13 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
      */
     private final RaftActorContextImpl context;
 
+    private final Procedure<Void> createSnapshotProcedure = new CreateSnapshotProcedure();
+
     /**
      * The in-memory journal
      */
     private ReplicatedLogImpl replicatedLog = new ReplicatedLogImpl();
 
-    private CaptureSnapshot captureSnapshot = null;
-
     private Stopwatch recoveryTimer;
 
     private int currentRecoveryBatchCount;
@@ -379,26 +378,19 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
             LOG.error("{}: SaveSnapshotFailure received for snapshot Cause:",
                     persistenceId(), saveSnapshotFailure.cause());
 
-            context.getReplicatedLog().snapshotRollback();
-
-            LOG.info("{}: Replicated Log rollbacked. Snapshot will be attempted in the next cycle." +
-                "snapshotIndex:{}, snapshotTerm:{}, log-size:{}", persistenceId(),
-                context.getReplicatedLog().getSnapshotIndex(),
-                context.getReplicatedLog().getSnapshotTerm(),
-                context.getReplicatedLog().size());
+            context.getSnapshotManager().rollback();
 
         } else if (message instanceof CaptureSnapshot) {
             LOG.debug("{}: CaptureSnapshot received by actor: {}", persistenceId(), message);
 
-            if(captureSnapshot == null) {
-                captureSnapshot = (CaptureSnapshot)message;
-                createSnapshot();
-            }
+            context.getSnapshotManager().create(createSnapshotProcedure);
 
-        } else if (message instanceof CaptureSnapshotReply){
+        } else if (message instanceof CaptureSnapshotReply) {
             handleCaptureSnapshotReply(((CaptureSnapshotReply) message).getSnapshot());
         } else if(message instanceof GetOnDemandRaftState) {
             onGetOnDemandRaftStats();
+        } else if (message.equals(COMMIT_SNAPSHOT)) {
+            commitSnapshot(-1);
         } else {
             reusableBehaviorStateHolder.init(currentBehavior);
 
@@ -416,7 +408,7 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
                 .currentTerm(context.getTermInformation().getCurrentTerm())
                 .inMemoryJournalDataSize(replicatedLog.dataSize())
                 .inMemoryJournalLogSize(replicatedLog.size())
-                .isSnapshotCaptureInitiated(context.isSnapshotCaptureInitiated())
+                .isSnapshotCaptureInitiated(context.getSnapshotManager().isCapturing())
                 .lastApplied(context.getLastApplied())
                 .lastIndex(replicatedLog.lastIndex())
                 .lastTerm(replicatedLog.lastTerm())
@@ -515,15 +507,8 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
                             // the state to durable storage
                             self().tell(new ApplyJournalEntries(replicatedLogEntry.getIndex()), self());
 
-                            // Check if the "real" snapshot capture has been initiated. If no then do the fake snapshot
-                            if(!context.isSnapshotCaptureInitiated()){
-                                raftContext.getReplicatedLog().snapshotPreCommit(raftContext.getLastApplied(),
-                                        raftContext.getTermInformation().getCurrentTerm());
-                                raftContext.getReplicatedLog().snapshotCommit();
-                            } else {
-                                LOG.debug("{}: Skipping fake snapshotting for {} because real snapshotting is in progress",
-                                        persistenceId(), getId());
-                            }
+                            context.getSnapshotManager().trimLog(context.getLastApplied(), currentBehavior);
+
                         } else if (clientActor != null) {
                             // Send message for replication
                             currentBehavior.handleMessage(getSelf(),
@@ -621,10 +606,7 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
     }
 
     protected void commitSnapshot(long sequenceNumber) {
-        context.getReplicatedLog().snapshotCommit();
-
-        // TODO: Not sure if we want to be this aggressive with trimming stuff
-        trimPersistentData(sequenceNumber);
+        context.getSnapshotManager().commit(persistence(), sequenceNumber);
     }
 
     /**
@@ -716,17 +698,6 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
 
     protected void onLeaderChanged(String oldLeader, String newLeader){};
 
-    private void trimPersistentData(long sequenceNumber) {
-        // Trim akka snapshots
-        // FIXME : Not sure how exactly the SnapshotSelectionCriteria is applied
-        // For now guessing that it is ANDed.
-        persistence().deleteSnapshots(new SnapshotSelectionCriteria(
-            sequenceNumber - context.getConfigParams().getSnapshotBatchCount(), 43200000));
-
-        // Trim akka journal
-        persistence().deleteMessages(sequenceNumber);
-    }
-
     private String getLeaderAddress(){
         if(isLeader()){
             return getSelf().path().toString();
@@ -747,67 +718,7 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
     private void handleCaptureSnapshotReply(byte[] snapshotBytes) {
         LOG.debug("{}: CaptureSnapshotReply received by actor: snapshot size {}", persistenceId(), snapshotBytes.length);
 
-        // create a snapshot object from the state provided and save it
-        // when snapshot is saved async, SaveSnapshotSuccess is raised.
-
-        Snapshot sn = Snapshot.create(snapshotBytes,
-            context.getReplicatedLog().getFrom(captureSnapshot.getLastAppliedIndex() + 1),
-            captureSnapshot.getLastIndex(), captureSnapshot.getLastTerm(),
-            captureSnapshot.getLastAppliedIndex(), captureSnapshot.getLastAppliedTerm());
-
-        persistence().saveSnapshot(sn);
-
-        LOG.info("{}: Persisting of snapshot done:{}", persistenceId(), sn.getLogMessage());
-
-        long dataThreshold = getTotalMemory() *
-                getRaftActorContext().getConfigParams().getSnapshotDataThresholdPercentage() / 100;
-        if (context.getReplicatedLog().dataSize() > dataThreshold) {
-
-            if(LOG.isDebugEnabled()) {
-                LOG.debug("{}: dataSize {} exceeds dataThreshold {} - doing snapshotPreCommit with index {}",
-                        persistenceId(), context.getReplicatedLog().dataSize(), dataThreshold,
-                        captureSnapshot.getLastAppliedIndex());
-            }
-
-            // if memory is less, clear the log based on lastApplied.
-            // this could/should only happen if one of the followers is down
-            // as normally we keep removing from the log when its replicated to all.
-            context.getReplicatedLog().snapshotPreCommit(captureSnapshot.getLastAppliedIndex(),
-                    captureSnapshot.getLastAppliedTerm());
-
-            // Don't reset replicatedToAllIndex to -1 as this may prevent us from trimming the log after an
-            // install snapshot to a follower.
-            if(captureSnapshot.getReplicatedToAllIndex() >= 0) {
-                getCurrentBehavior().setReplicatedToAllIndex(captureSnapshot.getReplicatedToAllIndex());
-            }
-        } else if(captureSnapshot.getReplicatedToAllIndex() != -1){
-            // clear the log based on replicatedToAllIndex
-            context.getReplicatedLog().snapshotPreCommit(captureSnapshot.getReplicatedToAllIndex(),
-                    captureSnapshot.getReplicatedToAllTerm());
-
-            getCurrentBehavior().setReplicatedToAllIndex(captureSnapshot.getReplicatedToAllIndex());
-        } else {
-            // The replicatedToAllIndex was not found in the log
-            // This means that replicatedToAllIndex never moved beyond -1 or that it is already in the snapshot.
-            // In this scenario we may need to save the snapshot to the akka persistence
-            // snapshot for recovery but we do not need to do the replicated log trimming.
-            context.getReplicatedLog().snapshotPreCommit(replicatedLog.getSnapshotIndex(),
-                    replicatedLog.getSnapshotTerm());
-        }
-
-
-        LOG.info("{}: Removed in-memory snapshotted entries, adjusted snaphsotIndex: {} " +
-            "and term: {}", persistenceId(), replicatedLog.getSnapshotIndex(),
-            replicatedLog.getSnapshotTerm());
-
-        if (isLeader() && captureSnapshot.isInstallSnapshotInitiated()) {
-            // this would be call straight to the leader and won't initiate in serialization
-            currentBehavior.handleMessage(getSelf(), new SendInstallSnapshot(
-                    ByteString.copyFrom(snapshotBytes)));
-        }
-
-        captureSnapshot = null;
-        context.setSnapshotCaptureInitiated(false);
+        context.getSnapshotManager().persist(persistence(), snapshotBytes, currentBehavior, getTotalMemory());
     }
 
     protected long getTotalMemory() {
@@ -819,9 +730,9 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
     }
 
     private class ReplicatedLogImpl extends AbstractReplicatedLogImpl {
-
         private static final int DATA_SIZE_DIVIDER = 5;
-        private long dataSizeSinceLastSnapshot = 0;
+        private long dataSizeSinceLastSnapshot = 0L;
+
 
         public ReplicatedLogImpl(Snapshot snapshot) {
             super(snapshot.getLastAppliedIndex(), snapshot.getLastAppliedTerm(),
@@ -887,9 +798,8 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
                         long dataSizeForCheck = dataSize;
 
                         dataSizeSinceLastSnapshot += logEntrySize;
-                        long journalSize = lastIndex() + 1;
 
-                        if(!hasFollowers()) {
+                        if (!hasFollowers()) {
                             // When we do not have followers we do not maintain an in-memory log
                             // due to this the journalSize will never become anything close to the
                             // snapshot batch count. In fact will mostly be 1.
@@ -903,51 +813,22 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
                             // as if we were maintaining a real snapshot
                             dataSizeForCheck = dataSizeSinceLastSnapshot / DATA_SIZE_DIVIDER;
                         }
-
+                        long journalSize = replicatedLogEntry.getIndex() + 1;
                         long dataThreshold = getTotalMemory() *
-                                getRaftActorContext().getConfigParams().getSnapshotDataThresholdPercentage() / 100;
-
-                        // when a snaphsot is being taken, captureSnapshot != null
-                        if (!context.isSnapshotCaptureInitiated() &&
-                                ( journalSize % context.getConfigParams().getSnapshotBatchCount() == 0 ||
-                                        dataSizeForCheck > dataThreshold)) {
+                                context.getConfigParams().getSnapshotDataThresholdPercentage() / 100;
 
-                            dataSizeSinceLastSnapshot = 0;
+                        if ((journalSize % context.getConfigParams().getSnapshotBatchCount() == 0
+                                || dataSizeForCheck > dataThreshold)) {
 
-                            LOG.info("{}: Initiating Snapshot Capture, journalSize = {}, dataSizeForCheck = {}," +
-                                " dataThreshold = {}", persistenceId(), journalSize, dataSizeForCheck, dataThreshold);
+                            boolean started = context.getSnapshotManager().capture(replicatedLogEntry,
+                                    currentBehavior.getReplicatedToAllIndex());
 
-                            long lastAppliedIndex = -1;
-                            long lastAppliedTerm = -1;
-
-                            ReplicatedLogEntry lastAppliedEntry = get(context.getLastApplied());
-                            if (!hasFollowers()) {
-                                lastAppliedIndex = replicatedLogEntry.getIndex();
-                                lastAppliedTerm = replicatedLogEntry.getTerm();
-                            } else if (lastAppliedEntry != null) {
-                                lastAppliedIndex = lastAppliedEntry.getIndex();
-                                lastAppliedTerm = lastAppliedEntry.getTerm();
-                            }
-
-                            if(LOG.isDebugEnabled()) {
-                                LOG.debug("{}: Snapshot Capture logSize: {}", persistenceId(), journal.size());
-                                LOG.debug("{}: Snapshot Capture lastApplied:{} ",
-                                        persistenceId(), context.getLastApplied());
-                                LOG.debug("{}: Snapshot Capture lastAppliedIndex:{}", persistenceId(),
-                                        lastAppliedIndex);
-                                LOG.debug("{}: Snapshot Capture lastAppliedTerm:{}", persistenceId(),
-                                        lastAppliedTerm);
+                            if(started){
+                                dataSizeSinceLastSnapshot = 0;
                             }
 
-                            // send a CaptureSnapshot to self to make the expensive operation async.
-                            long replicatedToAllIndex = getCurrentBehavior().getReplicatedToAllIndex();
-                            ReplicatedLogEntry replicatedToAllEntry = context.getReplicatedLog().get(replicatedToAllIndex);
-                            getSelf().tell(new CaptureSnapshot(lastIndex(), lastTerm(), lastAppliedIndex, lastAppliedTerm,
-                                (replicatedToAllEntry != null ? replicatedToAllEntry.getIndex() : -1),
-                                (replicatedToAllEntry != null ? replicatedToAllEntry.getTerm() : -1)),
-                                null);
-                            context.setSnapshotCaptureInitiated(true);
                         }
+
                         if (callback != null){
                             callback.apply(replicatedLogEntry);
                         }
@@ -1051,7 +932,18 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
         @Override
         public void saveSnapshot(Object o) {
             // Make saving Snapshot successful
-            commitSnapshot(-1L);
+            // Committing the snapshot here would end up calling commit in the creating state which would
+            // be a state violation. That's why now we send a message to commit the snapshot.
+            self().tell(COMMIT_SNAPSHOT, self());
+        }
+    }
+
+
+    private class CreateSnapshotProcedure implements Procedure<Void> {
+
+        @Override
+        public void apply(Void aVoid) throws Exception {
+            createSnapshot();
         }
     }
 
index 9d391a1588ba31c575125572db2ef642e3e14e58..2e7eb5eb3aaf221e2889b412e0942d7ea71c241c 100644 (file)
@@ -166,8 +166,6 @@ public interface RaftActorContext {
      */
     ConfigParams getConfigParams();
 
-    void setSnapshotCaptureInitiated(boolean snapshotCaptureInitiated);
-
-    boolean isSnapshotCaptureInitiated();
+    SnapshotManager getSnapshotManager();
 
 }
index 6fc5e4369bb4879e1e8dd2ca01f92b98d5ec4ba4..eb059d60fbee1c89d6a309e5cfaedbf5bc608c64 100644 (file)
@@ -41,6 +41,10 @@ public class RaftActorContextImpl implements RaftActorContext {
 
     private boolean snapshotCaptureInitiated;
 
+    // Snapshot manager will need to be created on demand as it needs raft actor context which cannot
+    // be passed to it in the constructor
+    private SnapshotManager snapshotManager;
+
     public RaftActorContextImpl(ActorRef actor, UntypedActorContext context,
         String id,
         ElectionTerm termInformation, long commitIndex,
@@ -134,16 +138,6 @@ public class RaftActorContextImpl implements RaftActorContext {
         return configParams;
     }
 
-    @Override
-    public void setSnapshotCaptureInitiated(boolean snapshotCaptureInitiated) {
-        this.snapshotCaptureInitiated = snapshotCaptureInitiated;
-    }
-
-    @Override
-    public boolean isSnapshotCaptureInitiated() {
-        return snapshotCaptureInitiated;
-    }
-
     @Override public void addToPeers(String name, String address) {
         peerAddresses.put(name, address);
     }
@@ -166,4 +160,11 @@ public class RaftActorContextImpl implements RaftActorContext {
 
         peerAddresses.put(peerId, peerAddress);
     }
+
+    public SnapshotManager getSnapshotManager() {
+        if(snapshotManager == null){
+            snapshotManager = new SnapshotManager(this, LOG);
+        }
+        return snapshotManager;
+    }
 }
diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/SnapshotManager.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/SnapshotManager.java
new file mode 100644 (file)
index 0000000..432d678
--- /dev/null
@@ -0,0 +1,431 @@
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.cluster.raft;
+
+import akka.japi.Procedure;
+import akka.persistence.SnapshotSelectionCriteria;
+import com.google.protobuf.ByteString;
+import org.opendaylight.controller.cluster.DataPersistenceProvider;
+import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshot;
+import org.opendaylight.controller.cluster.raft.base.messages.SendInstallSnapshot;
+import org.opendaylight.controller.cluster.raft.behaviors.RaftActorBehavior;
+import org.slf4j.Logger;
+
+public class SnapshotManager implements SnapshotState {
+
+
+    private final SnapshotState IDLE = new Idle();
+    private final SnapshotState CAPTURING = new Capturing();
+    private final SnapshotState PERSISTING = new Persisting();
+    private final SnapshotState CREATING = new Creating();
+
+    private final Logger LOG;
+    private final RaftActorContext context;
+    private final LastAppliedTermInformationReader lastAppliedTermInformationReader =
+            new LastAppliedTermInformationReader();
+    private final ReplicatedToAllTermInformationReader replicatedToAllTermInformationReader =
+            new ReplicatedToAllTermInformationReader();
+
+
+    private SnapshotState currentState = IDLE;
+    private CaptureSnapshot captureSnapshot;
+
+    public SnapshotManager(RaftActorContext context, Logger logger) {
+        this.context = context;
+        this.LOG = logger;
+    }
+
+    @Override
+    public boolean isCapturing() {
+        return currentState.isCapturing();
+    }
+
+    @Override
+    public boolean captureToInstall(ReplicatedLogEntry lastLogEntry, long replicatedToAllIndex, String targetFollower) {
+        return currentState.captureToInstall(lastLogEntry, replicatedToAllIndex, targetFollower);
+    }
+
+    @Override
+    public boolean capture(ReplicatedLogEntry lastLogEntry, long replicatedToAllIndex) {
+        return currentState.capture(lastLogEntry, replicatedToAllIndex);
+    }
+
+    @Override
+    public void create(Procedure<Void> callback) {
+        currentState.create(callback);
+    }
+
+    @Override
+    public void persist(DataPersistenceProvider persistenceProvider, byte[] snapshotBytes,
+                        RaftActorBehavior currentBehavior, long totalMemory) {
+        currentState.persist(persistenceProvider, snapshotBytes, currentBehavior, totalMemory);
+    }
+
+    @Override
+    public void commit(DataPersistenceProvider persistenceProvider, long sequenceNumber) {
+        currentState.commit(persistenceProvider, sequenceNumber);
+    }
+
+    @Override
+    public void rollback() {
+        currentState.rollback();
+    }
+
+    @Override
+    public long trimLog(long desiredTrimIndex, RaftActorBehavior currentBehavior) {
+        return currentState.trimLog(desiredTrimIndex, currentBehavior);
+    }
+
+    private boolean hasFollowers(){
+        return context.getPeerAddresses().keySet().size() > 0;
+    }
+
+    private String persistenceId(){
+        return context.getId();
+    }
+
+    private class AbstractSnapshotState implements SnapshotState {
+
+        @Override
+        public boolean isCapturing() {
+            return false;
+        }
+
+        @Override
+        public boolean capture(ReplicatedLogEntry lastLogEntry, long replicatedToAllIndex) {
+            LOG.debug("capture should not be called in state {}", this);
+            return false;
+        }
+
+        @Override
+        public boolean captureToInstall(ReplicatedLogEntry lastLogEntry, long replicatedToAllIndex, String targetFollower) {
+            LOG.debug("captureToInstall should not be called in state {}", this);
+            return false;
+        }
+
+        @Override
+        public void create(Procedure<Void> callback) {
+            LOG.debug("create should not be called in state {}", this);
+        }
+
+        @Override
+        public void persist(DataPersistenceProvider persistenceProvider, byte[] snapshotBytes,
+                            RaftActorBehavior currentBehavior, long totalMemory) {
+            LOG.debug("persist should not be called in state {}", this);
+        }
+
+        @Override
+        public void commit(DataPersistenceProvider persistenceProvider, long sequenceNumber) {
+            LOG.debug("commit should not be called in state {}", this);
+        }
+
+        @Override
+        public void rollback() {
+            LOG.debug("rollback should not be called in state {}", this);
+        }
+
+        @Override
+        public long trimLog(long desiredTrimIndex, RaftActorBehavior currentBehavior) {
+            LOG.debug("trimLog should not be called in state {}", this);
+            return -1;
+        }
+
+        protected long doTrimLog(long desiredTrimIndex, RaftActorBehavior currentBehavior){
+            //  we would want to keep the lastApplied as its used while capturing snapshots
+            long lastApplied = context.getLastApplied();
+            long tempMin = Math.min(desiredTrimIndex, (lastApplied > -1 ? lastApplied - 1 : -1));
+
+            if(LOG.isTraceEnabled()) {
+                LOG.trace("{}: performSnapshotWithoutCapture: desiredTrimIndex: {}, lastApplied: {}, tempMin: {}",
+                        persistenceId(), desiredTrimIndex, lastApplied, tempMin);
+            }
+
+            if (tempMin > -1 && context.getReplicatedLog().isPresent(tempMin)) {
+                LOG.debug("{}: fakeSnapshot purging log to {} for term {}", persistenceId(), tempMin,
+                        context.getTermInformation().getCurrentTerm());
+
+                //use the term of the temp-min, since we check for isPresent, entry will not be null
+                ReplicatedLogEntry entry = context.getReplicatedLog().get(tempMin);
+                context.getReplicatedLog().snapshotPreCommit(tempMin, entry.getTerm());
+                context.getReplicatedLog().snapshotCommit();
+                return tempMin;
+            } else if(tempMin > currentBehavior.getReplicatedToAllIndex()) {
+                // It's possible a follower was lagging and an install snapshot advanced its match index past
+                // the current replicatedToAllIndex. Since the follower is now caught up we should advance the
+                // replicatedToAllIndex (to tempMin). The fact that tempMin wasn't found in the log is likely
+                // due to a previous snapshot triggered by the memory threshold exceeded, in that case we
+                // trim the log to the last applied index even if previous entries weren't replicated to all followers.
+                currentBehavior.setReplicatedToAllIndex(tempMin);
+            }
+            return -1;
+        }
+    }
+
+    private class Idle extends AbstractSnapshotState {
+
+        private boolean capture(ReplicatedLogEntry lastLogEntry, long replicatedToAllIndex, String targetFollower) {
+            TermInformationReader lastAppliedTermInfoReader =
+                    lastAppliedTermInformationReader.init(context.getReplicatedLog(), context.getLastApplied(),
+                            lastLogEntry, hasFollowers());
+
+            long lastAppliedIndex = lastAppliedTermInfoReader.getIndex();
+            long lastAppliedTerm = lastAppliedTermInfoReader.getTerm();
+
+            TermInformationReader replicatedToAllTermInfoReader =
+                    replicatedToAllTermInformationReader.init(context.getReplicatedLog(), replicatedToAllIndex);
+
+            long newReplicatedToAllIndex = replicatedToAllTermInfoReader.getIndex();
+            long newReplicatedToAllTerm = replicatedToAllTermInfoReader.getTerm();
+
+            // send a CaptureSnapshot to self to make the expensive operation async.
+            captureSnapshot = new CaptureSnapshot(lastLogEntry.getIndex(),
+                    lastLogEntry.getTerm(), lastAppliedIndex, lastAppliedTerm,
+                    newReplicatedToAllIndex, newReplicatedToAllTerm, targetFollower!=null);
+
+            SnapshotManager.this.currentState = CAPTURING;
+
+            if(targetFollower != null){
+                LOG.info("{}: Initiating snapshot capture {}", persistenceId(), captureSnapshot);
+            } else {
+                LOG.info("{}: Initiating snapshot capture {} to install on {}",
+                        persistenceId(), captureSnapshot, targetFollower);
+            }
+
+            context.getActor().tell(captureSnapshot, context.getActor());
+
+            return true;
+        }
+
+        @Override
+        public boolean capture(ReplicatedLogEntry lastLogEntry, long replicatedToAllIndex) {
+            return capture(lastLogEntry, replicatedToAllIndex, null);
+        }
+
+        @Override
+        public boolean captureToInstall(ReplicatedLogEntry lastLogEntry, long replicatedToAllIndex, String targetFollower) {
+            return capture(lastLogEntry, replicatedToAllIndex, targetFollower);
+        }
+
+        @Override
+        public String toString() {
+            return "Idle";
+        }
+
+        @Override
+        public long trimLog(long desiredTrimIndex, RaftActorBehavior currentBehavior) {
+            return doTrimLog(desiredTrimIndex, currentBehavior);
+        }
+    }
+
+    private class Capturing extends AbstractSnapshotState {
+
+        @Override
+        public boolean isCapturing() {
+            return true;
+        }
+
+        @Override
+        public void create(Procedure<Void> callback) {
+            try {
+                callback.apply(null);
+                SnapshotManager.this.currentState = CREATING;
+            } catch (Exception e) {
+                LOG.error("Unexpected error occurred", e);
+            }
+        }
+
+        @Override
+        public String toString() {
+            return "Capturing";
+        }
+
+    }
+
+    private class Creating extends AbstractSnapshotState {
+
+        @Override
+        public boolean isCapturing() {
+            return true;
+        }
+
+        @Override
+        public void persist(DataPersistenceProvider persistenceProvider, byte[] snapshotBytes,
+                            RaftActorBehavior currentBehavior, long totalMemory) {
+            // create a snapshot object from the state provided and save it
+            // when snapshot is saved async, SaveSnapshotSuccess is raised.
+
+            Snapshot sn = Snapshot.create(snapshotBytes,
+                    context.getReplicatedLog().getFrom(captureSnapshot.getLastAppliedIndex() + 1),
+                    captureSnapshot.getLastIndex(), captureSnapshot.getLastTerm(),
+                    captureSnapshot.getLastAppliedIndex(), captureSnapshot.getLastAppliedTerm());
+
+            persistenceProvider.saveSnapshot(sn);
+
+            LOG.info("{}: Persisting of snapshot done:{}", persistenceId(), sn.getLogMessage());
+
+            long dataThreshold = totalMemory *
+                    context.getConfigParams().getSnapshotDataThresholdPercentage() / 100;
+            if (context.getReplicatedLog().dataSize() > dataThreshold) {
+
+                if(LOG.isDebugEnabled()) {
+                    LOG.debug("{}: dataSize {} exceeds dataThreshold {} - doing snapshotPreCommit with index {}",
+                            persistenceId(), context.getReplicatedLog().dataSize(), dataThreshold,
+                            captureSnapshot.getLastAppliedIndex());
+                }
+
+                // if memory is less, clear the log based on lastApplied.
+                // this could/should only happen if one of the followers is down
+                // as normally we keep removing from the log when its replicated to all.
+                context.getReplicatedLog().snapshotPreCommit(captureSnapshot.getLastAppliedIndex(),
+                        captureSnapshot.getLastAppliedTerm());
+
+                // Don't reset replicatedToAllIndex to -1 as this may prevent us from trimming the log after an
+                // install snapshot to a follower.
+                if(captureSnapshot.getReplicatedToAllIndex() >= 0) {
+                    currentBehavior.setReplicatedToAllIndex(captureSnapshot.getReplicatedToAllIndex());
+                }
+
+            } else if(captureSnapshot.getReplicatedToAllIndex() != -1){
+                // clear the log based on replicatedToAllIndex
+                context.getReplicatedLog().snapshotPreCommit(captureSnapshot.getReplicatedToAllIndex(),
+                        captureSnapshot.getReplicatedToAllTerm());
+
+                currentBehavior.setReplicatedToAllIndex(captureSnapshot.getReplicatedToAllIndex());
+            } else {
+                // The replicatedToAllIndex was not found in the log
+                // This means that replicatedToAllIndex never moved beyond -1 or that it is already in the snapshot.
+                // In this scenario we may need to save the snapshot to the akka persistence
+                // snapshot for recovery but we do not need to do the replicated log trimming.
+                context.getReplicatedLog().snapshotPreCommit(context.getReplicatedLog().getSnapshotIndex(),
+                        context.getReplicatedLog().getSnapshotTerm());
+            }
+
+            LOG.info("{}: Removed in-memory snapshotted entries, adjusted snaphsotIndex:{} " +
+                            "and term:{}", persistenceId(), captureSnapshot.getLastAppliedIndex(),
+                    captureSnapshot.getLastAppliedTerm());
+
+            if (context.getId().equals(currentBehavior.getLeaderId())
+                    && captureSnapshot.isInstallSnapshotInitiated()) {
+                // this would be call straight to the leader and won't initiate in serialization
+                currentBehavior.handleMessage(context.getActor(), new SendInstallSnapshot(
+                        ByteString.copyFrom(snapshotBytes)));
+            }
+
+            captureSnapshot = null;
+            SnapshotManager.this.currentState = PERSISTING;
+        }
+
+        @Override
+        public String toString() {
+            return "Creating";
+        }
+
+    }
+
+    private class Persisting extends AbstractSnapshotState {
+
+        @Override
+        public void commit(DataPersistenceProvider persistenceProvider, long sequenceNumber) {
+            context.getReplicatedLog().snapshotCommit();
+            persistenceProvider.deleteSnapshots(new SnapshotSelectionCriteria(
+                    sequenceNumber - context.getConfigParams().getSnapshotBatchCount(), 43200000));
+
+            persistenceProvider.deleteMessages(sequenceNumber);
+
+            SnapshotManager.this.currentState = IDLE;
+        }
+
+        @Override
+        public void rollback() {
+            context.getReplicatedLog().snapshotRollback();
+
+            LOG.info("{}: Replicated Log rolled back. Snapshot will be attempted in the next cycle." +
+                            "snapshotIndex:{}, snapshotTerm:{}, log-size:{}", persistenceId(),
+                    context.getReplicatedLog().getSnapshotIndex(),
+                    context.getReplicatedLog().getSnapshotTerm(),
+                    context.getReplicatedLog().size());
+
+            SnapshotManager.this.currentState = IDLE;
+        }
+
+        @Override
+        public String toString() {
+            return "Persisting";
+        }
+
+    }
+
+    private static interface TermInformationReader {
+        long getIndex();
+        long getTerm();
+    }
+
+    private static class LastAppliedTermInformationReader implements TermInformationReader{
+        private long index;
+        private long term;
+
+        public LastAppliedTermInformationReader init(ReplicatedLog log, long originalIndex,
+                                         ReplicatedLogEntry lastLogEntry, boolean hasFollowers){
+            ReplicatedLogEntry entry = log.get(originalIndex);
+            this.index = -1L;
+            this.term = -1L;
+            if (!hasFollowers) {
+                if(lastLogEntry != null) {
+                    index = lastLogEntry.getIndex();
+                    term = lastLogEntry.getTerm();
+                }
+            } else if (entry != null) {
+                index = entry.getIndex();
+                term = entry.getTerm();
+            } else if(log.getSnapshotIndex() > -1){
+                index = log.getSnapshotIndex();
+                term = log.getSnapshotTerm();
+            }
+            return this;
+        }
+
+        @Override
+        public long getIndex(){
+            return this.index;
+        }
+
+        @Override
+        public long getTerm(){
+            return this.term;
+        }
+    }
+
+    private static class ReplicatedToAllTermInformationReader implements TermInformationReader{
+        private long index;
+        private long term;
+
+        ReplicatedToAllTermInformationReader init(ReplicatedLog log, long originalIndex){
+            ReplicatedLogEntry entry = log.get(originalIndex);
+            this.index = -1L;
+            this.term = -1L;
+
+            if (entry != null) {
+                index = entry.getIndex();
+                term = entry.getTerm();
+            }
+
+            return this;
+        }
+
+        @Override
+        public long getIndex(){
+            return this.index;
+        }
+
+        @Override
+        public long getTerm(){
+            return this.term;
+        }
+    }
+}
diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/SnapshotState.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/SnapshotState.java
new file mode 100644 (file)
index 0000000..9a9bf1c
--- /dev/null
@@ -0,0 +1,81 @@
+/*
+ * Copyright (c) 2015 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.cluster.raft;
+
+import akka.japi.Procedure;
+import org.opendaylight.controller.cluster.DataPersistenceProvider;
+import org.opendaylight.controller.cluster.raft.behaviors.RaftActorBehavior;
+
+public interface SnapshotState {
+    /**
+     * Should return true when a snapshot is being captured
+     * @return
+     */
+    boolean isCapturing();
+
+    /**
+     * Initiate capture snapshot
+     *
+     * @param lastLogEntry the last entry in the replicated log
+     * @param replicatedToAllIndex the current replicatedToAllIndex
+     *
+     * @return true if capture was started
+     */
+    boolean capture(ReplicatedLogEntry lastLogEntry, long replicatedToAllIndex);
+
+    /**
+     * Initiate capture snapshot for the purposing of installing that snapshot
+     *
+     * @param lastLogEntry
+     * @param replicatedToAllIndex
+     * @param targetFollower
+     *
+     * @return true if capture was started
+     */
+    boolean captureToInstall(ReplicatedLogEntry lastLogEntry, long replicatedToAllIndex, String targetFollower);
+
+    /**
+     * Create the snapshot
+     *
+     * @param callback a procedure to be called which should create the snapshot
+     */
+    void create(Procedure<Void> callback);
+
+    /**
+     * Persist the snapshot
+     *
+     * @param persistenceProvider
+     * @param snapshotBytes
+     * @param currentBehavior
+     * @param totalMemory
+     */
+    void persist(DataPersistenceProvider persistenceProvider, byte[] snapshotBytes, RaftActorBehavior currentBehavior
+            ,long totalMemory);
+
+    /**
+     * Commit the snapshot by trimming the log
+     *
+     * @param persistenceProvider
+     * @param sequenceNumber
+     */
+    void commit(DataPersistenceProvider persistenceProvider, long sequenceNumber);
+
+    /**
+     * Rollback the snapshot
+     */
+    void rollback();
+
+    /**
+     * Trim the log
+     *
+     * @param desiredTrimIndex
+     * @return the actual trim index
+     */
+    long trimLog(long desiredTrimIndex, RaftActorBehavior currentBehavior);
+}
index a63c62fa30740b5830676ab6f15f3de9c1988e7b..2c433f90076b20cda5002b819bf08b4dd7211104 100644 (file)
@@ -33,7 +33,6 @@ import org.opendaylight.controller.cluster.raft.FollowerLogInformationImpl;
 import org.opendaylight.controller.cluster.raft.RaftActorContext;
 import org.opendaylight.controller.cluster.raft.RaftState;
 import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
-import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshot;
 import org.opendaylight.controller.cluster.raft.base.messages.Replicate;
 import org.opendaylight.controller.cluster.raft.base.messages.SendHeartBeat;
 import org.opendaylight.controller.cluster.raft.base.messages.SendInstallSnapshot;
@@ -235,7 +234,7 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
             applyLogToStateMachine(context.getCommitIndex());
         }
 
-        if (!context.isSnapshotCaptureInitiated()) {
+        if (!context.getSnapshotManager().isCapturing()) {
             purgeInMemoryLog();
         }
 
@@ -388,7 +387,7 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
                 followerToSnapshot.markSendStatus(false);
             }
 
-            if (wasLastChunk && !context.isSnapshotCaptureInitiated()) {
+            if (wasLastChunk && !context.getSnapshotManager().isCapturing()) {
                 // Since the follower is now caught up try to purge the log.
                 purgeInMemoryLog();
             } else if (!wasLastChunk && followerToSnapshot.canSendNextChunk()) {
@@ -491,7 +490,7 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
                         sendAppendEntries = true;
                     }
                 } else if (isFollowerActive && followerNextIndex >= 0 &&
-                    leaderLastIndex > followerNextIndex && !context.isSnapshotCaptureInitiated()) {
+                    leaderLastIndex > followerNextIndex && !context.getSnapshotManager().isCapturing()) {
                     // if the followers next index is not present in the leaders log, and
                     // if the follower is just not starting and if leader's index is more than followers index
                     // then snapshot should be sent
@@ -562,37 +561,10 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
                 final ActorSelection followerActor = context.getPeerActorSelection(followerId);
                 sendSnapshotChunk(followerActor, followerId);
 
-            } else if (!context.isSnapshotCaptureInitiated()) {
 
-                ReplicatedLogEntry lastAppliedEntry = context.getReplicatedLog().get(context.getLastApplied());
-                long lastAppliedIndex = -1;
-                long lastAppliedTerm = -1;
-
-                if (lastAppliedEntry != null) {
-                    lastAppliedIndex = lastAppliedEntry.getIndex();
-                    lastAppliedTerm = lastAppliedEntry.getTerm();
-                } else if (context.getReplicatedLog().getSnapshotIndex() > -1) {
-                    lastAppliedIndex = context.getReplicatedLog().getSnapshotIndex();
-                    lastAppliedTerm = context.getReplicatedLog().getSnapshotTerm();
-                }
-
-                boolean isInstallSnapshotInitiated = true;
-                long replicatedToAllIndex = super.getReplicatedToAllIndex();
-                ReplicatedLogEntry replicatedToAllEntry = context.getReplicatedLog().get(replicatedToAllIndex);
-
-                CaptureSnapshot captureSnapshot = new CaptureSnapshot(
-                        lastIndex(), lastTerm(), lastAppliedIndex, lastAppliedTerm,
-                        (replicatedToAllEntry != null ? replicatedToAllEntry.getIndex() : -1),
-                        (replicatedToAllEntry != null ? replicatedToAllEntry.getTerm() : -1),
-                        isInstallSnapshotInitiated);
-
-                if(LOG.isDebugEnabled()) {
-                    LOG.debug("{}: Initiating install snapshot to follower {}: {}", logName(), followerId,
-                            captureSnapshot);
-                }
-
-                actor().tell(captureSnapshot, actor());
-                context.setSnapshotCaptureInitiated(true);
+            } else {
+                context.getSnapshotManager().captureToInstall(context.getReplicatedLog().last(),
+                        this.getReplicatedToAllIndex(), followerId);
             }
         }
     }
index 45671ea31e4c804f9993df96e3d534ceaf6e4247..c276d32cce33d5b5bfada40f7f62afb6244a2e07 100644 (file)
@@ -462,31 +462,10 @@ public abstract class AbstractRaftActorBehavior implements RaftActorBehavior {
      * @param snapshotCapturedIndex
      */
     protected void performSnapshotWithoutCapture(final long snapshotCapturedIndex) {
-        //  we would want to keep the lastApplied as its used while capturing snapshots
-        long lastApplied = context.getLastApplied();
-        long tempMin = Math.min(snapshotCapturedIndex, (lastApplied > -1 ? lastApplied - 1 : -1));
+        long actualIndex = context.getSnapshotManager().trimLog(snapshotCapturedIndex, this);
 
-        if(LOG.isTraceEnabled()) {
-            LOG.trace("{}: performSnapshotWithoutCapture: snapshotCapturedIndex: {}, lastApplied: {}, tempMin: {}",
-                    logName, snapshotCapturedIndex, lastApplied, tempMin);
-        }
-
-        if (tempMin > -1 && context.getReplicatedLog().isPresent(tempMin))  {
-            LOG.debug("{}: fakeSnapshot purging log to {} for term {}", logName(), tempMin,
-                    context.getTermInformation().getCurrentTerm());
-
-            //use the term of the temp-min, since we check for isPresent, entry will not be null
-            ReplicatedLogEntry entry = context.getReplicatedLog().get(tempMin);
-            context.getReplicatedLog().snapshotPreCommit(tempMin, entry.getTerm());
-            context.getReplicatedLog().snapshotCommit();
-            setReplicatedToAllIndex(tempMin);
-        } else if(tempMin > getReplicatedToAllIndex()) {
-            // It's possible a follower was lagging and an install snapshot advanced its match index past
-            // the current replicatedToAllIndex. Since the follower is now caught up we should advance the
-            // replicatedToAllIndex (to tempMin). The fact that tempMin wasn't found in the log is likely
-            // due to a previous snapshot triggered by the memory threshold exceeded, in that case we
-            // trim the log to the last applied index even if previous entries weren't replicated to all followers.
-            setReplicatedToAllIndex(tempMin);
+        if(actualIndex != -1){
+            setReplicatedToAllIndex(actualIndex);
         }
     }
 
index a1174d70dcfd40d4272f3ef664bc8ed3c78d6868..a6722e6ff98dbbe9ab68df6c9e04915c23c8721a 100644 (file)
@@ -260,7 +260,7 @@ public class Follower extends AbstractRaftActorBehavior {
 
         sender.tell(reply, actor());
 
-        if (!context.isSnapshotCaptureInitiated()) {
+        if (!context.getSnapshotManager().isCapturing()) {
             super.performSnapshotWithoutCapture(appendEntries.getReplicatedToAllIndex());
         }
 
index 1cc7b5f576e81df7d160bc0b7a2f1ca65eba97bc..53cca237413ea7383c912cda09a8b9e6d6e6f098 100644 (file)
@@ -35,6 +35,7 @@ public class MockRaftActorContext implements RaftActorContext {
     private Map<String, String> peerAddresses = new HashMap<>();
     private ConfigParams configParams;
     private boolean snapshotCaptureInitiated;
+    private SnapshotManager snapshotManager;
 
     public MockRaftActorContext(){
         electionTerm = new ElectionTerm() {
@@ -191,13 +192,11 @@ public class MockRaftActorContext implements RaftActorContext {
     }
 
     @Override
-    public void setSnapshotCaptureInitiated(boolean snapshotCaptureInitiated) {
-        this.snapshotCaptureInitiated = snapshotCaptureInitiated;
-    }
-
-    @Override
-    public boolean isSnapshotCaptureInitiated() {
-        return snapshotCaptureInitiated;
+    public SnapshotManager getSnapshotManager() {
+        if(this.snapshotManager == null){
+            this.snapshotManager = new SnapshotManager(this, getLogger());
+        }
+        return this.snapshotManager;
     }
 
     public void setConfigParams(ConfigParams configParams) {
index a3b070e2cc4e91a60b0d1a073010b3ed15955289..0a4a2c7717facfcc9fc883c2234d4577ff49f876 100644 (file)
@@ -61,7 +61,6 @@ import org.opendaylight.controller.cluster.raft.base.messages.ApplyJournalEntrie
 import org.opendaylight.controller.cluster.raft.base.messages.ApplyLogEntries;
 import org.opendaylight.controller.cluster.raft.base.messages.ApplySnapshot;
 import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
-import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshot;
 import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshotReply;
 import org.opendaylight.controller.cluster.raft.base.messages.SendHeartBeat;
 import org.opendaylight.controller.cluster.raft.behaviors.Follower;
@@ -760,10 +759,12 @@ public class RaftActorTest extends AbstractActorTest {
                         new MockRaftActorContext.MockPayload("C"),
                         new MockRaftActorContext.MockPayload("D")));
 
-                mockRaftActor.onReceiveCommand(new CaptureSnapshot(-1, 1,-1, 1, -1, 1));
-
                 RaftActorContext raftActorContext = mockRaftActor.getRaftActorContext();
 
+                raftActorContext.getSnapshotManager().capture(
+                        new MockRaftActorContext.MockReplicatedLogEntry(1, -1,
+                                new MockRaftActorContext.MockPayload("D")), -1);
+
                 mockRaftActor.setCurrentBehavior(new Leader(raftActorContext));
 
                 mockRaftActor.onReceiveCommand(new CaptureSnapshotReply(snapshotBytes.toByteArray()));
@@ -792,12 +793,13 @@ public class RaftActorTest extends AbstractActorTest {
                 MockRaftActor mockRaftActor = mockActorRef.underlyingActor();
 
                 mockRaftActor.waitForInitializeBehaviorComplete();
+                MockRaftActorContext.MockReplicatedLogEntry lastEntry = new MockRaftActorContext.MockReplicatedLogEntry(1, 4, mock(Payload.class));
 
                 mockRaftActor.getReplicatedLog().append(new MockRaftActorContext.MockReplicatedLogEntry(1, 0, mock(Payload.class)));
                 mockRaftActor.getReplicatedLog().append(new MockRaftActorContext.MockReplicatedLogEntry(1, 1, mock(Payload.class)));
                 mockRaftActor.getReplicatedLog().append(new MockRaftActorContext.MockReplicatedLogEntry(1, 2, mock(Payload.class)));
                 mockRaftActor.getReplicatedLog().append(new MockRaftActorContext.MockReplicatedLogEntry(1, 3, mock(Payload.class)));
-                mockRaftActor.getReplicatedLog().append(new MockRaftActorContext.MockReplicatedLogEntry(1, 4, mock(Payload.class)));
+                mockRaftActor.getReplicatedLog().append(lastEntry);
 
                 ByteString snapshotBytes = fromObject(Arrays.asList(
                         new MockRaftActorContext.MockPayload("A"),
@@ -809,7 +811,8 @@ public class RaftActorTest extends AbstractActorTest {
                 mockRaftActor.setCurrentBehavior(new Follower(raftActorContext));
 
                 long replicatedToAllIndex = 1;
-                mockRaftActor.onReceiveCommand(new CaptureSnapshot(-1, 1, 2, 1, replicatedToAllIndex, 1));
+
+                mockRaftActor.getRaftActorContext().getSnapshotManager().capture(lastEntry, replicatedToAllIndex);
 
                 verify(mockRaftActor.delegate).createSnapshot();
 
@@ -951,7 +954,9 @@ public class RaftActorTest extends AbstractActorTest {
 
                 mockRaftActor.setCurrentBehavior(new Leader(raftActorContext));
 
-                mockRaftActor.onReceiveCommand(new CaptureSnapshot(-1, 1, -1, 1, -1, 1));
+                raftActorContext.getSnapshotManager().capture(
+                        new MockRaftActorContext.MockReplicatedLogEntry(1, 1,
+                                new MockRaftActorContext.MockPayload("D")), 1);
 
                 mockRaftActor.onReceiveCommand(new CaptureSnapshotReply(snapshotBytes.toByteArray()));
 
@@ -1126,9 +1131,10 @@ public class RaftActorTest extends AbstractActorTest {
 
                 assertEquals(8, leaderActor.getReplicatedLog().size());
 
-                leaderActor.onReceiveCommand(new CaptureSnapshot(6, 1, 4, 1, 4, 1));
+                leaderActor.getRaftActorContext().getSnapshotManager()
+                        .capture(new MockRaftActorContext.MockReplicatedLogEntry(1, 6,
+                                new MockRaftActorContext.MockPayload("x")), 4);
 
-                leaderActor.getRaftActorContext().setSnapshotCaptureInitiated(true);
                 verify(leaderActor.delegate).createSnapshot();
 
                 assertEquals(8, leaderActor.getReplicatedLog().size());
@@ -1154,8 +1160,14 @@ public class RaftActorTest extends AbstractActorTest {
                         new MockRaftActorContext.MockPayload("foo-2"),
                         new MockRaftActorContext.MockPayload("foo-3"),
                         new MockRaftActorContext.MockPayload("foo-4")));
-                leaderActor.onReceiveCommand(new CaptureSnapshotReply(snapshotBytes.toByteArray()));
-                assertFalse(leaderActor.getRaftActorContext().isSnapshotCaptureInitiated());
+
+                leaderActor.getRaftActorContext().getSnapshotManager().persist(new NonPersistentProvider()
+                        , snapshotBytes.toByteArray(), leader, Runtime.getRuntime().totalMemory());
+
+                assertFalse(leaderActor.getRaftActorContext().getSnapshotManager().isCapturing());
+
+                // The commit is needed to complete the snapshot creation process
+                leaderActor.getRaftActorContext().getSnapshotManager().commit(new NonPersistentProvider(), -1);
 
                 // capture snapshot reply should remove the snapshotted entries only
                 assertEquals(3, leaderActor.getReplicatedLog().size());
@@ -1218,9 +1230,10 @@ public class RaftActorTest extends AbstractActorTest {
                 assertEquals(6, followerActor.getReplicatedLog().size());
 
                 //snapshot on 4
-                followerActor.onReceiveCommand(new CaptureSnapshot(5, 1, 4, 1, 4, 1));
+                followerActor.getRaftActorContext().getSnapshotManager().capture(
+                        new MockRaftActorContext.MockReplicatedLogEntry(1, 5,
+                                new MockRaftActorContext.MockPayload("D")), 4);
 
-                followerActor.getRaftActorContext().setSnapshotCaptureInitiated(true);
                 verify(followerActor.delegate).createSnapshot();
 
                 assertEquals(6, followerActor.getReplicatedLog().size());
@@ -1255,7 +1268,10 @@ public class RaftActorTest extends AbstractActorTest {
                         new MockRaftActorContext.MockPayload("foo-3"),
                         new MockRaftActorContext.MockPayload("foo-4")));
                 followerActor.onReceiveCommand(new CaptureSnapshotReply(snapshotBytes.toByteArray()));
-                assertFalse(followerActor.getRaftActorContext().isSnapshotCaptureInitiated());
+                assertFalse(followerActor.getRaftActorContext().getSnapshotManager().isCapturing());
+
+                // The commit is needed to complete the snapshot creation process
+                followerActor.getRaftActorContext().getSnapshotManager().commit(new NonPersistentProvider(), -1);
 
                 // capture snapshot reply should remove the snapshotted entries only till replicatedToAllIndex
                 assertEquals(3, followerActor.getReplicatedLog().size()); //indexes 5,6,7 left in the log
@@ -1353,7 +1369,7 @@ public class RaftActorTest extends AbstractActorTest {
                         new MockRaftActorContext.MockPayload("foo-3"),
                         new MockRaftActorContext.MockPayload("foo-4")));
                 leaderActor.onReceiveCommand(new CaptureSnapshotReply(snapshotBytes.toByteArray()));
-                assertFalse(leaderActor.getRaftActorContext().isSnapshotCaptureInitiated());
+                assertFalse(leaderActor.getRaftActorContext().getSnapshotManager().isCapturing());
 
                 assertEquals("Real snapshot didn't clear the log till replicatedToAllIndex", 0, leaderActor.getReplicatedLog().size());
 
@@ -1437,7 +1453,7 @@ public class RaftActorTest extends AbstractActorTest {
 
             // Trimming log in this scenario is a no-op
             assertEquals(-1, leaderActor.getReplicatedLog().getSnapshotIndex());
-            assertFalse(leaderActor.getRaftActorContext().isSnapshotCaptureInitiated());
+            assertFalse(leaderActor.getRaftActorContext().getSnapshotManager().isCapturing());
             assertEquals(-1, leader.getReplicatedToAllIndex());
 
         }};
@@ -1480,7 +1496,7 @@ public class RaftActorTest extends AbstractActorTest {
 
             // Trimming log in this scenario is a no-op
             assertEquals(3, leaderActor.getReplicatedLog().getSnapshotIndex());
-            assertFalse(leaderActor.getRaftActorContext().isSnapshotCaptureInitiated());
+            assertFalse(leaderActor.getRaftActorContext().getSnapshotManager().isCapturing());
             assertEquals(3, leader.getReplicatedToAllIndex());
 
         }};
diff --git a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/SnapshotManagerTest.java b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/SnapshotManagerTest.java
new file mode 100644 (file)
index 0000000..3d75edb
--- /dev/null
@@ -0,0 +1,551 @@
+package org.opendaylight.controller.cluster.raft;
+
+import static junit.framework.TestCase.assertFalse;
+import static junit.framework.TestCase.assertTrue;
+import static org.junit.Assert.assertEquals;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyLong;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.reset;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import akka.actor.ActorRef;
+import akka.japi.Procedure;
+import akka.persistence.SnapshotSelectionCriteria;
+import akka.testkit.TestActorRef;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.ArgumentCaptor;
+import org.mockito.Mock;
+import org.mockito.MockitoAnnotations;
+import org.opendaylight.controller.cluster.DataPersistenceProvider;
+import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshot;
+import org.opendaylight.controller.cluster.raft.base.messages.SendInstallSnapshot;
+import org.opendaylight.controller.cluster.raft.behaviors.RaftActorBehavior;
+import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor;
+import org.slf4j.LoggerFactory;
+
+public class SnapshotManagerTest extends AbstractActorTest {
+
+    @Mock
+    private RaftActorContext mockRaftActorContext;
+
+    @Mock
+    private ConfigParams mockConfigParams;
+
+    @Mock
+    private ReplicatedLog mockReplicatedLog;
+
+    @Mock
+    private DataPersistenceProvider mockDataPersistenceProvider;
+
+    @Mock
+    private RaftActorBehavior mockRaftActorBehavior;
+
+    @Mock
+    private Procedure<Void> mockProcedure;
+
+    private SnapshotManager snapshotManager;
+
+    private TestActorFactory factory;
+
+    private TestActorRef<MessageCollectorActor> actorRef;
+
+    @Before
+    public void setUp(){
+        MockitoAnnotations.initMocks(this);
+
+        doReturn(new HashMap<>()).when(mockRaftActorContext).getPeerAddresses();
+        doReturn(mockConfigParams).when(mockRaftActorContext).getConfigParams();
+        doReturn(10L).when(mockConfigParams).getSnapshotBatchCount();
+        doReturn(mockReplicatedLog).when(mockRaftActorContext).getReplicatedLog();
+        doReturn("123").when(mockRaftActorContext).getId();
+        doReturn("123").when(mockRaftActorBehavior).getLeaderId();
+
+        snapshotManager = new SnapshotManager(mockRaftActorContext, LoggerFactory.getLogger(this.getClass()));
+        factory = new TestActorFactory(getSystem());
+
+        actorRef = factory.createTestActor(MessageCollectorActor.props(), factory.generateActorId("test-"));
+        doReturn(actorRef).when(mockRaftActorContext).getActor();
+
+    }
+
+    @After
+    public void tearDown(){
+        factory.close();
+    }
+
+    @Test
+    public void testConstruction(){
+        assertEquals(false, snapshotManager.isCapturing());
+    }
+
+    @Test
+    public void testCaptureToInstall(){
+
+        // Force capturing toInstall = true
+        snapshotManager.captureToInstall(new MockRaftActorContext.MockReplicatedLogEntry(1, 0,
+                new MockRaftActorContext.MockPayload()), 0, "follower-1");
+
+        assertEquals(true, snapshotManager.isCapturing());
+
+        CaptureSnapshot captureSnapshot = MessageCollectorActor.expectFirstMatching(actorRef, CaptureSnapshot.class);
+
+        // LastIndex and LastTerm are picked up from the lastLogEntry
+        assertEquals(0L, captureSnapshot.getLastIndex());
+        assertEquals(1L, captureSnapshot.getLastTerm());
+
+        // Since the actor does not have any followers (no peer addresses) lastApplied will be from lastLogEntry
+        assertEquals(0L, captureSnapshot.getLastAppliedIndex());
+        assertEquals(1L, captureSnapshot.getLastAppliedTerm());
+
+        //
+        assertEquals(-1L, captureSnapshot.getReplicatedToAllIndex());
+        assertEquals(-1L, captureSnapshot.getReplicatedToAllTerm());
+        actorRef.underlyingActor().clear();
+    }
+
+    @Test
+    public void testCapture(){
+        boolean capture = snapshotManager.capture(new MockRaftActorContext.MockReplicatedLogEntry(1,9,
+                new MockRaftActorContext.MockPayload()), 9);
+
+        assertTrue(capture);
+
+        assertEquals(true, snapshotManager.isCapturing());
+
+        CaptureSnapshot captureSnapshot = MessageCollectorActor.expectFirstMatching(actorRef, CaptureSnapshot.class);
+        // LastIndex and LastTerm are picked up from the lastLogEntry
+        assertEquals(9L, captureSnapshot.getLastIndex());
+        assertEquals(1L, captureSnapshot.getLastTerm());
+
+        // Since the actor does not have any followers (no peer addresses) lastApplied will be from lastLogEntry
+        assertEquals(9L, captureSnapshot.getLastAppliedIndex());
+        assertEquals(1L, captureSnapshot.getLastAppliedTerm());
+
+        //
+        assertEquals(-1L, captureSnapshot.getReplicatedToAllIndex());
+        assertEquals(-1L, captureSnapshot.getReplicatedToAllTerm());
+
+        actorRef.underlyingActor().clear();
+
+    }
+
+    @Test
+    public void testIllegalCapture() throws Exception {
+        boolean capture = snapshotManager.capture(new MockRaftActorContext.MockReplicatedLogEntry(1,9,
+                new MockRaftActorContext.MockPayload()), 9);
+
+        assertTrue(capture);
+
+        List<CaptureSnapshot> allMatching = MessageCollectorActor.getAllMatching(actorRef, CaptureSnapshot.class);
+
+        assertEquals(1, allMatching.size());
+
+        // This will not cause snapshot capture to start again
+        capture = snapshotManager.capture(new MockRaftActorContext.MockReplicatedLogEntry(1,9,
+                new MockRaftActorContext.MockPayload()), 9);
+
+        assertFalse(capture);
+
+        allMatching = MessageCollectorActor.getAllMatching(actorRef, CaptureSnapshot.class);
+
+        assertEquals(1, allMatching.size());
+    }
+
+    @Test
+    public void testPersistWhenReplicatedToAllIndexMinusOne(){
+        doReturn("123").when(mockRaftActorContext).getId();
+        doReturn(45L).when(mockReplicatedLog).getSnapshotIndex();
+        doReturn(6L).when(mockReplicatedLog).getSnapshotTerm();
+
+        // when replicatedToAllIndex = -1
+        snapshotManager.capture(new MockRaftActorContext.MockReplicatedLogEntry(6,9,
+                new MockRaftActorContext.MockPayload()), -1);
+
+        snapshotManager.create(mockProcedure);
+
+        byte[] bytes = new byte[] {1,2,3,4,5,6,7,8,9,10};
+        snapshotManager.persist(mockDataPersistenceProvider, bytes, mockRaftActorBehavior
+                , Runtime.getRuntime().totalMemory());
+
+        ArgumentCaptor<Snapshot> snapshotArgumentCaptor = ArgumentCaptor.forClass(Snapshot.class);
+        verify(mockDataPersistenceProvider).saveSnapshot(snapshotArgumentCaptor.capture());
+
+        Snapshot snapshot = snapshotArgumentCaptor.getValue();
+
+        assertEquals(6, snapshot.getLastAppliedTerm());
+        assertEquals(9, snapshot.getLastAppliedIndex());
+        assertEquals(9, snapshot.getLastIndex());
+        assertEquals(6, snapshot.getLastTerm());
+        assertEquals(10, snapshot.getState().length);
+        assertTrue(Arrays.equals(bytes, snapshot.getState()));
+        assertEquals(0, snapshot.getUnAppliedEntries().size());
+
+        verify(mockReplicatedLog).snapshotPreCommit(45L, 6L);
+    }
+
+
+    @Test
+    public void testCreate() throws Exception {
+        // when replicatedToAllIndex = -1
+        snapshotManager.capture(new MockRaftActorContext.MockReplicatedLogEntry(6,9,
+                new MockRaftActorContext.MockPayload()), -1);
+
+        snapshotManager.create(mockProcedure);
+
+        verify(mockProcedure).apply(null);
+    }
+
+    @Test
+    public void testCallingCreateMultipleTimesCausesNoHarm() throws Exception {
+        // when replicatedToAllIndex = -1
+        snapshotManager.capture(new MockRaftActorContext.MockReplicatedLogEntry(6,9,
+                new MockRaftActorContext.MockPayload()), -1);
+
+        snapshotManager.create(mockProcedure);
+
+        snapshotManager.create(mockProcedure);
+
+        verify(mockProcedure, times(1)).apply(null);
+    }
+
+    @Test
+    public void testCallingCreateBeforeCapture() throws Exception {
+        snapshotManager.create(mockProcedure);
+
+        verify(mockProcedure, times(0)).apply(null);
+    }
+
+    @Test
+    public void testCallingCreateAfterPersist() throws Exception {
+        // when replicatedToAllIndex = -1
+        snapshotManager.capture(new MockRaftActorContext.MockReplicatedLogEntry(6,9,
+                new MockRaftActorContext.MockPayload()), -1);
+
+        snapshotManager.create(mockProcedure);
+
+        verify(mockProcedure, times(1)).apply(null);
+
+        snapshotManager.persist(mockDataPersistenceProvider, new byte[]{}, mockRaftActorBehavior
+                , Runtime.getRuntime().totalMemory());
+
+        reset(mockProcedure);
+
+        snapshotManager.create(mockProcedure);
+
+        verify(mockProcedure, never()).apply(null);
+    }
+
+    @Test
+    public void testPersistWhenReplicatedToAllIndexNotMinus(){
+        doReturn(45L).when(mockReplicatedLog).getSnapshotIndex();
+        doReturn(6L).when(mockReplicatedLog).getSnapshotTerm();
+        ReplicatedLogEntry replicatedLogEntry = mock(ReplicatedLogEntry.class);
+        doReturn(replicatedLogEntry).when(mockReplicatedLog).get(9);
+        doReturn(6L).when(replicatedLogEntry).getTerm();
+        doReturn(9L).when(replicatedLogEntry).getIndex();
+
+        // when replicatedToAllIndex != -1
+        snapshotManager.capture(new MockRaftActorContext.MockReplicatedLogEntry(6,9,
+                new MockRaftActorContext.MockPayload()), 9);
+
+        snapshotManager.create(mockProcedure);
+
+        snapshotManager.persist(mockDataPersistenceProvider, new byte[]{}, mockRaftActorBehavior
+                , Runtime.getRuntime().totalMemory());
+
+        verify(mockDataPersistenceProvider).saveSnapshot(any(Snapshot.class));
+
+        verify(mockReplicatedLog).snapshotPreCommit(9L, 6L);
+
+        verify(mockRaftActorBehavior).setReplicatedToAllIndex(9);
+    }
+
+
+    @Test
+    public void testPersistWhenReplicatedLogDataSizeGreaterThanThreshold(){
+        doReturn(Integer.MAX_VALUE).when(mockReplicatedLog).dataSize();
+
+        // when replicatedToAllIndex = -1
+        snapshotManager.capture(new MockRaftActorContext.MockReplicatedLogEntry(6,9,
+                new MockRaftActorContext.MockPayload()), -1);
+
+        snapshotManager.create(mockProcedure);
+
+        snapshotManager.persist(mockDataPersistenceProvider, new byte[]{}, mockRaftActorBehavior
+                , Runtime.getRuntime().totalMemory());
+
+        verify(mockDataPersistenceProvider).saveSnapshot(any(Snapshot.class));
+
+        verify(mockReplicatedLog).snapshotPreCommit(9L, 6L);
+    }
+
+    @Test
+    public void testPersistSendInstallSnapshot(){
+        doReturn(Integer.MAX_VALUE).when(mockReplicatedLog).dataSize();
+
+        // when replicatedToAllIndex = -1
+        boolean capture = snapshotManager.captureToInstall(new MockRaftActorContext.MockReplicatedLogEntry(6, 9,
+                new MockRaftActorContext.MockPayload()), -1, "follower-1");
+
+        assertTrue(capture);
+
+        snapshotManager.create(mockProcedure);
+
+        byte[] bytes = new byte[] {1,2,3,4,5,6,7,8,9,10};
+
+        snapshotManager.persist(mockDataPersistenceProvider, bytes, mockRaftActorBehavior
+                , Runtime.getRuntime().totalMemory());
+
+        verify(mockDataPersistenceProvider).saveSnapshot(any(Snapshot.class));
+
+        verify(mockReplicatedLog).snapshotPreCommit(9L, 6L);
+
+        ArgumentCaptor<SendInstallSnapshot> sendInstallSnapshotArgumentCaptor
+                = ArgumentCaptor.forClass(SendInstallSnapshot.class);
+
+        verify(mockRaftActorBehavior).handleMessage(any(ActorRef.class), sendInstallSnapshotArgumentCaptor.capture());
+
+        SendInstallSnapshot sendInstallSnapshot = sendInstallSnapshotArgumentCaptor.getValue();
+
+        assertTrue(Arrays.equals(bytes, sendInstallSnapshot.getSnapshot().toByteArray()));
+    }
+
+    @Test
+    public void testCallingPersistWithoutCaptureWillDoNothing(){
+        snapshotManager.persist(mockDataPersistenceProvider, new byte[]{}, mockRaftActorBehavior
+                , Runtime.getRuntime().totalMemory());
+
+        verify(mockDataPersistenceProvider, never()).saveSnapshot(any(Snapshot.class));
+
+        verify(mockReplicatedLog, never()).snapshotPreCommit(9L, 6L);
+
+        verify(mockRaftActorBehavior, never()).handleMessage(any(ActorRef.class), any(SendInstallSnapshot.class));
+    }
+    @Test
+    public void testCallingPersistTwiceWillDoNoHarm(){
+        doReturn(Integer.MAX_VALUE).when(mockReplicatedLog).dataSize();
+
+        // when replicatedToAllIndex = -1
+        snapshotManager.captureToInstall(new MockRaftActorContext.MockReplicatedLogEntry(6, 9,
+                new MockRaftActorContext.MockPayload()), -1, "follower-1");
+
+        snapshotManager.create(mockProcedure);
+
+        snapshotManager.persist(mockDataPersistenceProvider, new byte[]{}, mockRaftActorBehavior
+                , Runtime.getRuntime().totalMemory());
+
+        snapshotManager.persist(mockDataPersistenceProvider, new byte[]{}, mockRaftActorBehavior
+                , Runtime.getRuntime().totalMemory());
+
+        verify(mockDataPersistenceProvider).saveSnapshot(any(Snapshot.class));
+
+        verify(mockReplicatedLog).snapshotPreCommit(9L, 6L);
+
+        verify(mockRaftActorBehavior).handleMessage(any(ActorRef.class), any(SendInstallSnapshot.class));
+    }
+
+    @Test
+    public void testCommit(){
+        // when replicatedToAllIndex = -1
+        snapshotManager.captureToInstall(new MockRaftActorContext.MockReplicatedLogEntry(6, 9,
+                new MockRaftActorContext.MockPayload()), -1, "follower-1");
+
+        snapshotManager.create(mockProcedure);
+
+        snapshotManager.persist(mockDataPersistenceProvider, new byte[]{}, mockRaftActorBehavior
+                , Runtime.getRuntime().totalMemory());
+
+        snapshotManager.commit(mockDataPersistenceProvider, 100L);
+
+        verify(mockReplicatedLog).snapshotCommit();
+
+        verify(mockDataPersistenceProvider).deleteMessages(100L);
+
+        ArgumentCaptor<SnapshotSelectionCriteria> criteriaCaptor = ArgumentCaptor.forClass(SnapshotSelectionCriteria.class);
+
+        verify(mockDataPersistenceProvider).deleteSnapshots(criteriaCaptor.capture());
+
+        assertEquals(90, criteriaCaptor.getValue().maxSequenceNr()); // sequenceNumber = 100
+                                                                     // config snapShotBatchCount = 10
+                                                                     // therefore maxSequenceNumber = 90
+    }
+
+    @Test
+    public void testCommitBeforePersist(){
+        // when replicatedToAllIndex = -1
+        snapshotManager.captureToInstall(new MockRaftActorContext.MockReplicatedLogEntry(6, 9,
+                new MockRaftActorContext.MockPayload()), -1, "follower-1");
+
+        snapshotManager.commit(mockDataPersistenceProvider, 100L);
+
+        verify(mockReplicatedLog, never()).snapshotCommit();
+
+        verify(mockDataPersistenceProvider, never()).deleteMessages(100L);
+
+        verify(mockDataPersistenceProvider, never()).deleteSnapshots(any(SnapshotSelectionCriteria.class));
+
+    }
+
+    @Test
+    public void testCommitBeforeCapture(){
+        snapshotManager.commit(mockDataPersistenceProvider, 100L);
+
+        verify(mockReplicatedLog, never()).snapshotCommit();
+
+        verify(mockDataPersistenceProvider, never()).deleteMessages(anyLong());
+
+        verify(mockDataPersistenceProvider, never()).deleteSnapshots(any(SnapshotSelectionCriteria.class));
+
+    }
+
+    @Test
+    public void testCallingCommitMultipleTimesCausesNoHarm(){
+        // when replicatedToAllIndex = -1
+        snapshotManager.captureToInstall(new MockRaftActorContext.MockReplicatedLogEntry(6, 9,
+                new MockRaftActorContext.MockPayload()), -1, "follower-1");
+
+        snapshotManager.create(mockProcedure);
+
+        snapshotManager.persist(mockDataPersistenceProvider, new byte[]{}, mockRaftActorBehavior
+                , Runtime.getRuntime().totalMemory());
+
+        snapshotManager.commit(mockDataPersistenceProvider, 100L);
+
+        snapshotManager.commit(mockDataPersistenceProvider, 100L);
+
+        verify(mockReplicatedLog, times(1)).snapshotCommit();
+
+        verify(mockDataPersistenceProvider, times(1)).deleteMessages(100L);
+
+        verify(mockDataPersistenceProvider, times(1)).deleteSnapshots(any(SnapshotSelectionCriteria.class));
+    }
+
+    @Test
+    public void testRollback(){
+        // when replicatedToAllIndex = -1
+        snapshotManager.captureToInstall(new MockRaftActorContext.MockReplicatedLogEntry(6, 9,
+                new MockRaftActorContext.MockPayload()), -1, "follower-1");
+
+        snapshotManager.create(mockProcedure);
+
+        snapshotManager.persist(mockDataPersistenceProvider, new byte[]{}, mockRaftActorBehavior
+                , Runtime.getRuntime().totalMemory());
+
+        snapshotManager.rollback();
+
+        verify(mockReplicatedLog).snapshotRollback();
+    }
+
+
+    @Test
+    public void testRollbackBeforePersist(){
+        // when replicatedToAllIndex = -1
+        snapshotManager.captureToInstall(new MockRaftActorContext.MockReplicatedLogEntry(6, 9,
+                new MockRaftActorContext.MockPayload()), -1, "follower-1");
+
+        snapshotManager.rollback();
+
+        verify(mockReplicatedLog, never()).snapshotRollback();
+    }
+
+    @Test
+    public void testRollbackBeforeCapture(){
+        snapshotManager.rollback();
+
+        verify(mockReplicatedLog, never()).snapshotRollback();
+    }
+
+    @Test
+    public void testCallingRollbackMultipleTimesCausesNoHarm(){
+        // when replicatedToAllIndex = -1
+        snapshotManager.captureToInstall(new MockRaftActorContext.MockReplicatedLogEntry(6, 9,
+                new MockRaftActorContext.MockPayload()), -1, "follower-1");
+
+        snapshotManager.create(mockProcedure);
+
+        snapshotManager.persist(mockDataPersistenceProvider, new byte[]{}, mockRaftActorBehavior
+                , Runtime.getRuntime().totalMemory());
+
+        snapshotManager.rollback();
+
+        snapshotManager.rollback();
+
+        verify(mockReplicatedLog, times(1)).snapshotRollback();
+    }
+
+    @Test
+    public void testTrimLog(){
+        ElectionTerm mockElectionTerm = mock(ElectionTerm.class);
+        ReplicatedLogEntry replicatedLogEntry = mock(ReplicatedLogEntry.class);
+        doReturn(20L).when(mockRaftActorContext).getLastApplied();
+        doReturn(true).when(mockReplicatedLog).isPresent(10);
+        doReturn(mockElectionTerm).when(mockRaftActorContext).getTermInformation();
+        doReturn(5L).when(mockElectionTerm).getCurrentTerm();
+        doReturn(replicatedLogEntry).when((mockReplicatedLog)).get(10);
+        doReturn(5L).when(replicatedLogEntry).getTerm();
+
+        snapshotManager.trimLog(10, mockRaftActorBehavior);
+
+        verify(mockReplicatedLog).snapshotPreCommit(10, 5);
+        verify(mockReplicatedLog).snapshotCommit();
+    }
+
+    @Test
+    public void testTrimLogAfterCapture(){
+        boolean capture = snapshotManager.capture(new MockRaftActorContext.MockReplicatedLogEntry(1,9,
+                new MockRaftActorContext.MockPayload()), 9);
+
+        assertTrue(capture);
+
+        assertEquals(true, snapshotManager.isCapturing());
+
+        ElectionTerm mockElectionTerm = mock(ElectionTerm.class);
+        ReplicatedLogEntry replicatedLogEntry = mock(ReplicatedLogEntry.class);
+        doReturn(20L).when(mockRaftActorContext).getLastApplied();
+        doReturn(true).when(mockReplicatedLog).isPresent(10);
+        doReturn(mockElectionTerm).when(mockRaftActorContext).getTermInformation();
+        doReturn(5L).when(mockElectionTerm).getCurrentTerm();
+        doReturn(replicatedLogEntry).when((mockReplicatedLog)).get(10);
+        doReturn(5L).when(replicatedLogEntry).getTerm();
+
+        snapshotManager.trimLog(10, mockRaftActorBehavior);
+
+        verify(mockReplicatedLog, never()).snapshotPreCommit(anyLong(), anyLong());
+        verify(mockReplicatedLog, never()).snapshotCommit();
+
+    }
+
+    @Test
+    public void testTrimLogAfterCaptureToInstall(){
+        boolean capture = snapshotManager.captureToInstall(new MockRaftActorContext.MockReplicatedLogEntry(1,9,
+                new MockRaftActorContext.MockPayload()), 9, "follower-1");
+
+        assertTrue(capture);
+
+        assertEquals(true, snapshotManager.isCapturing());
+
+        ElectionTerm mockElectionTerm = mock(ElectionTerm.class);
+        ReplicatedLogEntry replicatedLogEntry = mock(ReplicatedLogEntry.class);
+        doReturn(20L).when(mockRaftActorContext).getLastApplied();
+        doReturn(true).when(mockReplicatedLog).isPresent(10);
+        doReturn(mockElectionTerm).when(mockRaftActorContext).getTermInformation();
+        doReturn(5L).when(mockElectionTerm).getCurrentTerm();
+        doReturn(replicatedLogEntry).when((mockReplicatedLog)).get(10);
+        doReturn(5L).when(replicatedLogEntry).getTerm();
+
+        snapshotManager.trimLog(10, mockRaftActorBehavior);
+
+        verify(mockReplicatedLog, never()).snapshotPreCommit(10, 5);
+        verify(mockReplicatedLog, never()).snapshotCommit();
+
+    }
+
+}
\ No newline at end of file
index 095e756409ab52aa98f0033d307f80a0bcdf0cd7..ba0bd0f29c96237ab758487b719eac959d115edc 100644 (file)
@@ -525,6 +525,8 @@ public class LeaderTest extends AbstractLeaderTest {
                 new ReplicatedLogImplEntry(newEntryIndex, currentTerm,
                         new MockRaftActorContext.MockPayload("D"));
 
+        actorContext.getReplicatedLog().append(entry);
+
         //update follower timestamp
         leader.markFollowerActive(FOLLOWER_ID);
 
index 7df398355e0ab0cb2c9e96329652c69261910857..cfbf9450aa2b77661f21bd00904a995b48c9c3d7 100644 (file)
@@ -35,8 +35,8 @@ operational.persistent=false
 # failing an operation (eg transaction create and change listener registration).
 #shard-initialization-timeout-in-seconds=300
 
-# The minimum number of entries to be present in the in-memory journal log before a snapshot is to be taken.
-#shard-journal-recovery-log-batch-size=5000
+# The maximum number of journal log entries to batch on recovery for a shard before committing to the data store.
+#shard-journal-recovery-log-batch-size=1000
 
 # The minimum number of entries to be present in the in-memory journal log before a snapshot is to be taken.
 #shard-snapshot-batch-count=20000
index 886c4730678208fdbf129be10463eddf252f5c0f..538f2981daae891406934ffcf9605a9aecdd72c2 100644 (file)
@@ -15,12 +15,12 @@ import com.google.common.util.concurrent.FutureCallback;
 import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.ListenableFuture;
 import java.util.Collection;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.Executor;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
 import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
 import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException;
 import org.opendaylight.controller.md.sal.dom.api.DOMDataWriteTransaction;
@@ -91,8 +91,9 @@ public class ConcurrentDOMDataBroker extends AbstractDOMDataBroker {
 
         final long startTime = System.nanoTime();
 
+        final Iterator<DOMStoreThreePhaseCommitCohort> cohortIterator = cohorts.iterator();
+
         // Not using Futures.allAsList here to avoid its internal overhead.
-        final AtomicInteger remaining = new AtomicInteger(cohorts.size());
         FutureCallback<Boolean> futureCallback = new FutureCallback<Boolean>() {
             @Override
             public void onSuccess(Boolean result) {
@@ -102,9 +103,12 @@ public class ConcurrentDOMDataBroker extends AbstractDOMDataBroker {
                             new TransactionCommitFailedException(
                                             "Can Commit failed, no detailed cause available."));
                 } else {
-                    if(remaining.decrementAndGet() == 0) {
+                    if(!cohortIterator.hasNext()) {
                         // All cohorts completed successfully - we can move on to the preCommit phase
                         doPreCommit(startTime, clientSubmitFuture, transaction, cohorts);
+                    } else {
+                        ListenableFuture<Boolean> canCommitFuture = cohortIterator.next().canCommit();
+                        Futures.addCallback(canCommitFuture, this, internalFutureCallbackExecutor);
                     }
                 }
             }
@@ -116,24 +120,26 @@ public class ConcurrentDOMDataBroker extends AbstractDOMDataBroker {
             }
         };
 
-        for(DOMStoreThreePhaseCommitCohort cohort: cohorts) {
-            ListenableFuture<Boolean> canCommitFuture = cohort.canCommit();
-            Futures.addCallback(canCommitFuture, futureCallback, internalFutureCallbackExecutor);
-        }
+        ListenableFuture<Boolean> canCommitFuture = cohortIterator.next().canCommit();
+        Futures.addCallback(canCommitFuture, futureCallback, internalFutureCallbackExecutor);
     }
 
     private void doPreCommit(final long startTime, final AsyncNotifyingSettableFuture clientSubmitFuture,
             final DOMDataWriteTransaction transaction,
             final Collection<DOMStoreThreePhaseCommitCohort> cohorts) {
 
+        final Iterator<DOMStoreThreePhaseCommitCohort> cohortIterator = cohorts.iterator();
+
         // Not using Futures.allAsList here to avoid its internal overhead.
-        final AtomicInteger remaining = new AtomicInteger(cohorts.size());
         FutureCallback<Void> futureCallback = new FutureCallback<Void>() {
             @Override
             public void onSuccess(Void notUsed) {
-                if(remaining.decrementAndGet() == 0) {
+                if(!cohortIterator.hasNext()) {
                     // All cohorts completed successfully - we can move on to the commit phase
                     doCommit(startTime, clientSubmitFuture, transaction, cohorts);
+                } else {
+                    ListenableFuture<Void> preCommitFuture = cohortIterator.next().preCommit();
+                    Futures.addCallback(preCommitFuture, this, internalFutureCallbackExecutor);
                 }
             }
 
@@ -144,26 +150,28 @@ public class ConcurrentDOMDataBroker extends AbstractDOMDataBroker {
             }
         };
 
-        for(DOMStoreThreePhaseCommitCohort cohort: cohorts) {
-            ListenableFuture<Void> preCommitFuture = cohort.preCommit();
-            Futures.addCallback(preCommitFuture, futureCallback, internalFutureCallbackExecutor);
-        }
+        ListenableFuture<Void> preCommitFuture = cohortIterator.next().preCommit();
+        Futures.addCallback(preCommitFuture, futureCallback, internalFutureCallbackExecutor);
     }
 
     private void doCommit(final long startTime, final AsyncNotifyingSettableFuture clientSubmitFuture,
             final DOMDataWriteTransaction transaction,
             final Collection<DOMStoreThreePhaseCommitCohort> cohorts) {
 
+        final Iterator<DOMStoreThreePhaseCommitCohort> cohortIterator = cohorts.iterator();
+
         // Not using Futures.allAsList here to avoid its internal overhead.
-        final AtomicInteger remaining = new AtomicInteger(cohorts.size());
         FutureCallback<Void> futureCallback = new FutureCallback<Void>() {
             @Override
             public void onSuccess(Void notUsed) {
-                if(remaining.decrementAndGet() == 0) {
+                if(!cohortIterator.hasNext()) {
                     // All cohorts completed successfully - we're done.
                     commitStatsTracker.addDuration(System.nanoTime() - startTime);
 
                     clientSubmitFuture.set();
+                } else {
+                    ListenableFuture<Void> commitFuture = cohortIterator.next().commit();
+                    Futures.addCallback(commitFuture, this, internalFutureCallbackExecutor);
                 }
             }
 
@@ -174,10 +182,8 @@ public class ConcurrentDOMDataBroker extends AbstractDOMDataBroker {
             }
         };
 
-        for(DOMStoreThreePhaseCommitCohort cohort: cohorts) {
-            ListenableFuture<Void> commitFuture = cohort.commit();
-            Futures.addCallback(commitFuture, futureCallback, internalFutureCallbackExecutor);
-        }
+        ListenableFuture<Void> commitFuture = cohortIterator.next().commit();
+        Futures.addCallback(commitFuture, futureCallback, internalFutureCallbackExecutor);
     }
 
     private void handleException(final AsyncNotifyingSettableFuture clientSubmitFuture,
index a30b6f7516981411e589577f3fda9a9a0f9bc887..8e00a1389ca6a057bef39292ffcc0b04958be794 100644 (file)
@@ -23,7 +23,6 @@ import com.google.common.util.concurrent.FutureCallback;
 import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.ListenableFuture;
 import java.io.IOException;
-import java.util.Collection;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -140,7 +139,6 @@ public class Shard extends RaftActor {
      * Coordinates persistence recovery on startup.
      */
     private ShardRecoveryCoordinator recoveryCoordinator;
-    private List<Object> currentLogRecoveryBatch;
 
     private final DOMTransactionFactory transactionFactory;
 
@@ -190,6 +188,8 @@ public class Shard extends RaftActor {
 
         appendEntriesReplyTracker = new MessageTracker(AppendEntriesReply.class,
                 getRaftActorContext().getConfigParams().getIsolatedCheckIntervalInMillis());
+
+        recoveryCoordinator = new ShardRecoveryCoordinator(store, persistenceId(), LOG);
     }
 
     private void setTransactionCommitTimeout() {
@@ -713,81 +713,27 @@ public class Shard extends RaftActor {
     @Override
     protected
     void startLogRecoveryBatch(final int maxBatchSize) {
-        currentLogRecoveryBatch = Lists.newArrayListWithCapacity(maxBatchSize);
-
-        if(LOG.isDebugEnabled()) {
-            LOG.debug("{}: starting log recovery batch with max size {}", persistenceId(), maxBatchSize);
-        }
+        recoveryCoordinator.startLogRecoveryBatch(maxBatchSize);
     }
 
     @Override
     protected void appendRecoveredLogEntry(final Payload data) {
-        if(data instanceof ModificationPayload) {
-            try {
-                currentLogRecoveryBatch.add(((ModificationPayload) data).getModification());
-            } catch (ClassNotFoundException | IOException e) {
-                LOG.error("{}: Error extracting ModificationPayload", persistenceId(), e);
-            }
-        } else if (data instanceof CompositeModificationPayload) {
-            currentLogRecoveryBatch.add(((CompositeModificationPayload) data).getModification());
-        } else if (data instanceof CompositeModificationByteStringPayload) {
-            currentLogRecoveryBatch.add(((CompositeModificationByteStringPayload) data).getModification());
-        } else {
-            LOG.error("{}: Unknown state received {} during recovery", persistenceId(), data);
-        }
+        recoveryCoordinator.appendRecoveredLogPayload(data);
     }
 
     @Override
     protected void applyRecoverySnapshot(final byte[] snapshotBytes) {
-        if(recoveryCoordinator == null) {
-            recoveryCoordinator = new ShardRecoveryCoordinator(persistenceId(), schemaContext,
-                    LOG, name.toString());
-        }
-
-        recoveryCoordinator.submit(snapshotBytes, store.newWriteOnlyTransaction());
-
-        if(LOG.isDebugEnabled()) {
-            LOG.debug("{}: submitted recovery sbapshot", persistenceId());
-        }
+        recoveryCoordinator.applyRecoveredSnapshot(snapshotBytes);
     }
 
     @Override
     protected void applyCurrentLogRecoveryBatch() {
-        if(recoveryCoordinator == null) {
-            recoveryCoordinator = new ShardRecoveryCoordinator(persistenceId(), schemaContext,
-                    LOG, name.toString());
-        }
-
-        recoveryCoordinator.submit(currentLogRecoveryBatch, store.newWriteOnlyTransaction());
-
-        if(LOG.isDebugEnabled()) {
-            LOG.debug("{}: submitted log recovery batch with size {}", persistenceId(),
-                    currentLogRecoveryBatch.size());
-        }
+        recoveryCoordinator.applyCurrentLogRecoveryBatch();
     }
 
     @Override
     protected void onRecoveryComplete() {
-        if(recoveryCoordinator != null) {
-            Collection<DOMStoreWriteTransaction> txList = recoveryCoordinator.getTransactions();
-
-            if(LOG.isDebugEnabled()) {
-                LOG.debug("{}: recovery complete - committing {} Tx's", persistenceId(), txList.size());
-            }
-
-            for(DOMStoreWriteTransaction tx: txList) {
-                try {
-                    syncCommitTransaction(tx);
-                    shardMBean.incrementCommittedTransactionCount();
-                } catch (InterruptedException | ExecutionException e) {
-                    shardMBean.incrementFailedTransactionsCount();
-                    LOG.error("{}: Failed to commit", persistenceId(), e);
-                }
-            }
-        }
-
         recoveryCoordinator = null;
-        currentLogRecoveryBatch = null;
 
         //notify shard manager
         getContext().parent().tell(new ActorInitialized(), getSelf());
index bc4c825351cc72148f5276fc28d5a94e2e64f79d..52762b4eb352ff9de295e44969b13c4410b7f2f0 100644 (file)
@@ -43,19 +43,19 @@ import java.util.concurrent.CountDownLatch;
 import org.opendaylight.controller.cluster.DataPersistenceProvider;
 import org.opendaylight.controller.cluster.common.actor.AbstractUntypedPersistentActorWithMetering;
 import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException;
+import org.opendaylight.controller.cluster.datastore.exceptions.NotInitializedException;
+import org.opendaylight.controller.cluster.datastore.exceptions.PrimaryNotFoundException;
 import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
 import org.opendaylight.controller.cluster.datastore.identifiers.ShardManagerIdentifier;
 import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shardmanager.ShardManagerInfo;
 import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shardmanager.ShardManagerInfoMBean;
 import org.opendaylight.controller.cluster.datastore.messages.ActorInitialized;
-import org.opendaylight.controller.cluster.datastore.messages.ActorNotInitialized;
 import org.opendaylight.controller.cluster.datastore.messages.FindLocalShard;
 import org.opendaylight.controller.cluster.datastore.messages.FindPrimary;
 import org.opendaylight.controller.cluster.datastore.messages.LocalShardFound;
 import org.opendaylight.controller.cluster.datastore.messages.LocalShardNotFound;
 import org.opendaylight.controller.cluster.datastore.messages.PeerAddressResolved;
 import org.opendaylight.controller.cluster.datastore.messages.PrimaryFound;
-import org.opendaylight.controller.cluster.datastore.messages.PrimaryNotFound;
 import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext;
 import org.opendaylight.controller.cluster.datastore.utils.Dispatchers;
 import org.opendaylight.controller.cluster.notifications.LeaderStateChanged;
@@ -96,6 +96,8 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
     // A data store could be of type config/operational
     private final String type;
 
+    private final String shardManagerIdentifierString;
+
     private final ClusterWrapper cluster;
 
     private final Configuration configuration;
@@ -122,6 +124,7 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
         this.datastoreContext = datastoreContext;
         this.dataPersistenceProvider = createDataPersistenceProvider(datastoreContext.isPersistent());
         this.type = datastoreContext.getDataStoreType();
+        this.shardManagerIdentifierString = ShardManagerIdentifier.builder().type(type).build().toString();
         this.shardDispatcherPath =
                 new Dispatchers(context().system().dispatchers()).getDispatcherPath(Dispatchers.DispatcherType.Shard);
         this.waitTillReadyCountdownLatch = waitTillReadyCountdownLatch;
@@ -158,8 +161,8 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
 
     @Override
     public void handleCommand(Object message) throws Exception {
-        if (FindPrimary.SERIALIZABLE_CLASS.isInstance(message)) {
-            findPrimary(FindPrimary.fromSerializable(message));
+        if (message  instanceof FindPrimary) {
+            findPrimary((FindPrimary)message);
         } else if(message instanceof FindLocalShard){
             findLocalShard((FindLocalShard) message);
         } else if (message instanceof UpdateSchemaContext) {
@@ -194,6 +197,13 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
         ShardInformation shardInformation = findShardInformation(leaderStateChanged.getMemberId());
         if(shardInformation != null) {
             shardInformation.setLeaderId(leaderStateChanged.getLeaderId());
+            if (isReadyWithLeaderId()) {
+                LOG.info("{}: All Shards are ready - data store {} is ready, available count is {}",
+                        persistenceId(), type, waitTillReadyCountdownLatch.getCount());
+
+                waitTillReadyCountdownLatch.countDown();
+            }
+
         } else {
             LOG.debug("No shard found with member Id {}", leaderStateChanged.getMemberId());
         }
@@ -203,13 +213,15 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
         ShardInformation shardInfo = message.getShardInfo();
 
         LOG.debug("{}: Received ShardNotInitializedTimeout message for shard {}", persistenceId(),
-                shardInfo.getShardId());
+                shardInfo.getShardName());
 
         shardInfo.removeOnShardInitialized(message.getOnShardInitialized());
 
         if(!shardInfo.isShardInitialized()) {
-            message.getSender().tell(new ActorNotInitialized(), getSelf());
+            LOG.debug("{}: Returning NotInitializedException for shard {}", persistenceId(), shardInfo.getShardName());
+            message.getSender().tell(createNotInitializedException(shardInfo.shardId), getSelf());
         } else {
+            LOG.debug("{}: Returning NoShardLeaderException for shard {}", persistenceId(), shardInfo.getShardName());
             message.getSender().tell(createNoShardLeaderException(shardInfo.shardId), getSelf());
         }
     }
@@ -236,7 +248,7 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
         if(shardInformation != null) {
             shardInformation.setRole(roleChanged.getNewRole());
 
-            if (isReady()) {
+            if (isReadyWithLeaderId()) {
                 LOG.info("{}: All Shards are ready - data store {} is ready, available count is {}",
                         persistenceId(), type, waitTillReadyCountdownLatch.getCount());
 
@@ -258,10 +270,10 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
         return null;
     }
 
-    private boolean isReady() {
+    private boolean isReadyWithLeaderId() {
         boolean isReady = true;
         for (ShardInformation info : localShards.values()) {
-            if(!info.isShardReady()){
+            if(!info.isShardReadyWithLeaderId()){
                 isReady = false;
                 break;
             }
@@ -297,7 +309,7 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
     }
 
     private void markShardAsInitialized(String shardName) {
-        LOG.debug("Initializing shard [{}]", shardName);
+        LOG.debug("{}: Initializing shard [{}]", persistenceId(), shardName);
 
         ShardInformation shardInformation = localShards.get(shardName);
         if (shardInformation != null) {
@@ -367,6 +379,8 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
 
                 shardInformation.addOnShardInitialized(onShardInitialized);
 
+                LOG.debug("{}: Scheduling timer to wait for shard {}", persistenceId(), shardInformation.getShardName());
+
                 Cancellable timeoutSchedule = getContext().system().scheduler().scheduleOnce(
                         datastoreContext.getShardInitializationTimeout().duration(), getSelf(),
                         new ShardNotInitializedTimeout(shardInformation, onShardInitialized, sender),
@@ -375,8 +389,12 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
                 onShardInitialized.setTimeoutSchedule(timeoutSchedule);
 
             } else if (!shardInformation.isShardInitialized()) {
-                getSender().tell(new ActorNotInitialized(), getSelf());
+                LOG.debug("{}: Returning NotInitializedException for shard {}", persistenceId(),
+                        shardInformation.getShardName());
+                getSender().tell(createNotInitializedException(shardInformation.shardId), getSelf());
             } else {
+                LOG.debug("{}: Returning NoShardLeaderException for shard {}", persistenceId(),
+                        shardInformation.getShardName());
                 getSender().tell(createNoShardLeaderException(shardInformation.shardId), getSelf());
             }
 
@@ -392,13 +410,26 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
                 "recovering and a leader is being elected. Try again later.", shardId));
     }
 
+    private NotInitializedException createNotInitializedException(ShardIdentifier shardId) {
+        return new NotInitializedException(String.format(
+                "Found primary shard %s but it's not initialized yet. Please try again later", shardId));
+    }
+
     private void memberRemoved(ClusterEvent.MemberRemoved message) {
+        String memberName = message.member().roles().head();
+
+        LOG.debug("{}: Received MemberRemoved: memberName: {}, address: {}", persistenceId(), memberName,
+                message.member().address());
+
         memberNameToAddress.remove(message.member().roles().head());
     }
 
     private void memberUp(ClusterEvent.MemberUp message) {
         String memberName = message.member().roles().head();
 
+        LOG.debug("{}: Received MemberUp: memberName: {}, address: {}", persistenceId(), memberName,
+                message.member().address());
+
         memberNameToAddress.put(memberName, message.member().address());
 
         for(ShardInformation info : localShards.values()){
@@ -461,6 +492,11 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
 
     }
 
+    @VisibleForTesting
+    protected ClusterWrapper getCluster() {
+        return cluster;
+    }
+
     @VisibleForTesting
     protected ActorRef newShardActor(final SchemaContext schemaContext, ShardInformation info) {
         return getContext().actorOf(Shard.props(info.getShardId(),
@@ -469,6 +505,8 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
     }
 
     private void findPrimary(FindPrimary message) {
+        LOG.debug("{}: In findPrimary: {}", persistenceId(), message);
+
         final String shardName = message.getShardName();
 
         // First see if the there is a local replica for the shard
@@ -477,10 +515,10 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
             sendResponse(info, message.isWaitUntilReady(), true, new Supplier<Object>() {
                 @Override
                 public Object get() {
-                    Object found = new PrimaryFound(info.getSerializedLeaderActor()).toSerializable();
+                    Object found = new PrimaryFound(info.getSerializedLeaderActor());
 
                     if(LOG.isDebugEnabled()) {
-                        LOG.debug("{}: Found primary for {}: {}", shardName, found);
+                        LOG.debug("{}: Found primary for {}: {}", persistenceId(), shardName, found);
                     }
 
                     return found;
@@ -490,38 +528,35 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
             return;
         }
 
-        List<String> members = configuration.getMembersFromShardName(shardName);
+        for(Map.Entry<String, Address> entry: memberNameToAddress.entrySet()) {
+            if(!cluster.getCurrentMemberName().equals(entry.getKey())) {
+                String path = getShardManagerActorPathBuilder(entry.getValue()).toString();
 
-        if(cluster.getCurrentMemberName() != null) {
-            members.remove(cluster.getCurrentMemberName());
-        }
+                LOG.debug("{}: findPrimary for {} forwarding to remote ShardManager {}", persistenceId(),
+                        shardName, path);
 
-        /**
-         * FIXME: Instead of sending remote shard actor path back to sender,
-         * forward FindPrimary message to remote shard manager
-         */
-        // There is no way for us to figure out the primary (for now) so assume
-        // that one of the remote nodes is a primary
-        for(String memberName : members) {
-            Address address = memberNameToAddress.get(memberName);
-            if(address != null){
-                String path =
-                    getShardActorPath(shardName, memberName);
-                getSender().tell(new PrimaryFound(path).toSerializable(), getSelf());
+                getContext().actorSelection(path).forward(message, getContext());
                 return;
             }
         }
-        getSender().tell(new PrimaryNotFound(shardName).toSerializable(), getSelf());
+
+        LOG.debug("{}: No shard found for {}", persistenceId(), shardName);
+
+        getSender().tell(new PrimaryNotFoundException(
+                String.format("No primary shard found for %s.", shardName)), getSelf());
+    }
+
+    private StringBuilder getShardManagerActorPathBuilder(Address address) {
+        StringBuilder builder = new StringBuilder();
+        builder.append(address.toString()).append("/user/").append(shardManagerIdentifierString);
+        return builder;
     }
 
     private String getShardActorPath(String shardName, String memberName) {
         Address address = memberNameToAddress.get(memberName);
         if(address != null) {
-            StringBuilder builder = new StringBuilder();
-            builder.append(address.toString())
-                .append("/user/")
-                .append(ShardManagerIdentifier.builder().type(type).build().toString())
-                .append("/")
+            StringBuilder builder = getShardManagerActorPathBuilder(address);
+            builder.append("/")
                 .append(getShardIdentifier(memberName, shardName));
             return builder.toString();
         }
index 50528575e77123ce433bf0b4e6ab73f0077c39c5..7e547d7257dc495dc34a796d1c7b7a0944cb0e76 100644 (file)
@@ -8,19 +8,19 @@
 package org.opendaylight.controller.cluster.datastore;
 
 import com.google.common.collect.Lists;
-import com.google.common.util.concurrent.ThreadFactoryBuilder;
-import java.util.Collection;
-import java.util.Collections;
+import java.io.IOException;
 import java.util.List;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.TimeUnit;
+import org.opendaylight.controller.cluster.datastore.modification.ModificationPayload;
 import org.opendaylight.controller.cluster.datastore.modification.MutableCompositeModification;
 import org.opendaylight.controller.cluster.datastore.utils.SerializationUtils;
+import org.opendaylight.controller.cluster.raft.protobuff.client.messages.CompositeModificationByteStringPayload;
+import org.opendaylight.controller.cluster.raft.protobuff.client.messages.CompositeModificationPayload;
+import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
+import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStore;
+import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
 import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction;
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
-import org.opendaylight.yangtools.yang.model.api.SchemaContext;
 import org.slf4j.Logger;
 
 /**
@@ -34,115 +34,86 @@ import org.slf4j.Logger;
  */
 class ShardRecoveryCoordinator {
 
-    private static final int TIME_OUT = 10;
-
-    private final List<DOMStoreWriteTransaction> resultingTxList = Lists.newArrayList();
-    private final SchemaContext schemaContext;
+    private final InMemoryDOMDataStore store;
+    private List<ModificationPayload> currentLogRecoveryBatch;
     private final String shardName;
-    private final ExecutorService executor;
     private final Logger log;
-    private final String name;
 
-    ShardRecoveryCoordinator(String shardName, SchemaContext schemaContext, Logger log,
-            String name) {
-        this.schemaContext = schemaContext;
+    ShardRecoveryCoordinator(InMemoryDOMDataStore store, String shardName, Logger log) {
+        this.store = store;
         this.shardName = shardName;
         this.log = log;
-        this.name = name;
-
-        executor = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors(),
-                new ThreadFactoryBuilder().setDaemon(true)
-                        .setNameFormat("ShardRecovery-" + shardName + "-%d").build());
     }
 
-    /**
-     * Submits a batch of journal log entries.
-     *
-     * @param logEntries the serialized journal log entries
-     * @param resultingTx the write Tx to which to apply the entries
-     */
-    void submit(List<Object> logEntries, DOMStoreWriteTransaction resultingTx) {
-        LogRecoveryTask task = new LogRecoveryTask(logEntries, resultingTx);
-        resultingTxList.add(resultingTx);
-        executor.execute(task);
-    }
+    void startLogRecoveryBatch(int maxBatchSize) {
+        currentLogRecoveryBatch = Lists.newArrayListWithCapacity(maxBatchSize);
 
-    /**
-     * Submits a snapshot.
-     *
-     * @param snapshotBytes the serialized snapshot
-     * @param resultingTx the write Tx to which to apply the entries
-     */
-    void submit(byte[] snapshotBytes, DOMStoreWriteTransaction resultingTx) {
-        SnapshotRecoveryTask task = new SnapshotRecoveryTask(snapshotBytes, resultingTx);
-        resultingTxList.add(resultingTx);
-        executor.execute(task);
+        log.debug("{}: starting log recovery batch with max size {}", shardName, maxBatchSize);
     }
 
-    Collection<DOMStoreWriteTransaction> getTransactions() {
-        // Shutdown the executor and wait for task completion.
-        executor.shutdown();
-
+    void appendRecoveredLogPayload(Payload payload) {
         try {
-            if(executor.awaitTermination(TIME_OUT, TimeUnit.MINUTES))  {
-                return resultingTxList;
+            if(payload instanceof ModificationPayload) {
+                currentLogRecoveryBatch.add((ModificationPayload) payload);
+            } else if (payload instanceof CompositeModificationPayload) {
+                currentLogRecoveryBatch.add(new ModificationPayload(MutableCompositeModification.fromSerializable(
+                        ((CompositeModificationPayload) payload).getModification())));
+            } else if (payload instanceof CompositeModificationByteStringPayload) {
+                currentLogRecoveryBatch.add(new ModificationPayload(MutableCompositeModification.fromSerializable(
+                        ((CompositeModificationByteStringPayload) payload).getModification())));
             } else {
-                log.error("{}: Recovery for shard {} timed out after {} minutes", name, shardName, TIME_OUT);
+                log.error("{}: Unknown payload {} received during recovery", shardName, payload);
             }
-        } catch (InterruptedException e) {
-            Thread.currentThread().interrupt();
+        } catch (IOException e) {
+            log.error("{}: Error extracting ModificationPayload", shardName, e);
         }
 
-        return Collections.emptyList();
     }
 
-    private static abstract class ShardRecoveryTask implements Runnable {
-
-        final DOMStoreWriteTransaction resultingTx;
-
-        ShardRecoveryTask(DOMStoreWriteTransaction resultingTx) {
-            this.resultingTx = resultingTx;
+    private void commitTransaction(DOMStoreWriteTransaction transaction) {
+        DOMStoreThreePhaseCommitCohort commitCohort = transaction.ready();
+        try {
+            commitCohort.preCommit().get();
+            commitCohort.commit().get();
+        } catch (Exception e) {
+            log.error("{}: Failed to commit Tx on recovery", shardName, e);
         }
     }
 
-    private class LogRecoveryTask extends ShardRecoveryTask {
-
-        private final List<Object> logEntries;
-
-        LogRecoveryTask(List<Object> logEntries, DOMStoreWriteTransaction resultingTx) {
-            super(resultingTx);
-            this.logEntries = logEntries;
-        }
-
-        @Override
-        public void run() {
-            for(int i = 0; i < logEntries.size(); i++) {
-                MutableCompositeModification.fromSerializable(
-                        logEntries.get(i)).apply(resultingTx);
-                // Null out to GC quicker.
-                logEntries.set(i, null);
+    /**
+     * Applies the current batched log entries to the data store.
+     */
+    void applyCurrentLogRecoveryBatch() {
+        log.debug("{}: Applying current log recovery batch with size {}", shardName, currentLogRecoveryBatch.size());
+
+        DOMStoreWriteTransaction writeTx = store.newWriteOnlyTransaction();
+        for(ModificationPayload payload: currentLogRecoveryBatch) {
+            try {
+                MutableCompositeModification.fromSerializable(payload.getModification()).apply(writeTx);
+            } catch (Exception e) {
+                log.error("{}: Error extracting ModificationPayload", shardName, e);
             }
         }
-    }
 
-    private class SnapshotRecoveryTask extends ShardRecoveryTask {
+        commitTransaction(writeTx);
+
+        currentLogRecoveryBatch = null;
+    }
 
-        private final byte[] snapshotBytes;
+    /**
+     * Applies a recovered snapshot to the data store.
+     *
+     * @param snapshotBytes the serialized snapshot
+     */
+    void applyRecoveredSnapshot(final byte[] snapshotBytes) {
+        log.debug("{}: Applyng recovered sbapshot", shardName);
 
-        SnapshotRecoveryTask(byte[] snapshotBytes, DOMStoreWriteTransaction resultingTx) {
-            super(resultingTx);
-            this.snapshotBytes = snapshotBytes;
-        }
+        DOMStoreWriteTransaction writeTx = store.newWriteOnlyTransaction();
 
-        @Override
-        public void run() {
-            NormalizedNode<?, ?> node = SerializationUtils.deserializeNormalizedNode(snapshotBytes);
+        NormalizedNode<?, ?> node = SerializationUtils.deserializeNormalizedNode(snapshotBytes);
 
-            // delete everything first
-            resultingTx.delete(YangInstanceIdentifier.builder().build());
+        writeTx.write(YangInstanceIdentifier.builder().build(), node);
 
-            // Add everything from the remote node back
-            resultingTx.write(YangInstanceIdentifier.builder().build(), node);
-        }
+        commitTransaction(writeTx);
     }
 }
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/ActorNotInitialized.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/ActorNotInitialized.java
deleted file mode 100644 (file)
index 576010f..0000000
+++ /dev/null
@@ -1,14 +0,0 @@
-/*
- * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
- *
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,
- * and is available at http://www.eclipse.org/legal/epl-v10.html
- */
-package org.opendaylight.controller.cluster.datastore.messages;
-
-import java.io.Serializable;
-
-public class ActorNotInitialized implements Serializable {
-    private static final long serialVersionUID = 1L;
-}
index d51d6800a23b44f2c14ff932a1be5c21421d5c5d..2c18eaa86fff9c3d6449068b723906defc930427 100644 (file)
@@ -9,13 +9,14 @@
 package org.opendaylight.controller.cluster.datastore.messages;
 
 import com.google.common.base.Preconditions;
+import java.io.Serializable;
 
 /**
  * The FindPrimary message is used to locate the primary of any given shard
  *
  */
-public class FindPrimary implements SerializableMessage{
-    public static final Class<FindPrimary> SERIALIZABLE_CLASS = FindPrimary.class;
+public class FindPrimary implements Serializable {
+    private static final long serialVersionUID = 1L;
 
     private final String shardName;
     private final boolean waitUntilReady;
@@ -36,15 +37,6 @@ public class FindPrimary implements SerializableMessage{
         return waitUntilReady;
     }
 
-    @Override
-    public Object toSerializable() {
-        return this;
-    }
-
-    public static FindPrimary fromSerializable(Object message){
-        return (FindPrimary) message;
-    }
-
     @Override
     public String toString() {
         StringBuilder builder = new StringBuilder();
index a5565020edc171e43d550f7ace0a8dc6a80739e3..4c154d43ae007268b153f2503ad949250031cb21 100644 (file)
@@ -8,56 +8,48 @@
 
 package org.opendaylight.controller.cluster.datastore.messages;
 
+import java.io.Serializable;
 
-public class PrimaryFound implements SerializableMessage {
-  public static final Class<PrimaryFound> SERIALIZABLE_CLASS = PrimaryFound.class;
-  private final String primaryPath;
+public class PrimaryFound implements Serializable {
+    private static final long serialVersionUID = 1L;
 
-  public PrimaryFound(final String primaryPath) {
-    this.primaryPath = primaryPath;
-  }
+    private final String primaryPath;
 
-  public String getPrimaryPath() {
-    return primaryPath;
-  }
-
-  @Override
-  public boolean equals(final Object o) {
-    if (this == o) {
-        return true;
+    public PrimaryFound(final String primaryPath) {
+        this.primaryPath = primaryPath;
     }
-    if (o == null || getClass() != o.getClass()) {
-        return false;
-    }
-
-    PrimaryFound that = (PrimaryFound) o;
 
-    if (!primaryPath.equals(that.primaryPath)) {
-        return false;
+    public String getPrimaryPath() {
+        return primaryPath;
     }
 
-    return true;
-  }
+    @Override
+    public boolean equals(final Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (o == null || getClass() != o.getClass()) {
+            return false;
+        }
 
-  @Override
-  public int hashCode() {
-    return primaryPath.hashCode();
-  }
+        PrimaryFound that = (PrimaryFound) o;
 
-  @Override
-  public String toString() {
-    return "PrimaryFound{" +
-            "primaryPath='" + primaryPath + '\'' +
-            '}';
-  }
+        if (!primaryPath.equals(that.primaryPath)) {
+            return false;
+        }
 
+        return true;
+    }
 
-  @Override
-  public Object toSerializable() {
-    return  this;
-  }
+    @Override
+    public int hashCode() {
+        return primaryPath.hashCode();
+    }
 
-  public static PrimaryFound fromSerializable(final Object message){
-    return (PrimaryFound) message;
-  }
+    @Override
+    public String toString() {
+        StringBuilder builder = new StringBuilder();
+        builder.append("PrimaryFound [primaryPath=").append(primaryPath).append("]");
+        return builder.toString();
+    }
 }
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/PrimaryNotFound.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/PrimaryNotFound.java
deleted file mode 100644 (file)
index b47c91b..0000000
+++ /dev/null
@@ -1,56 +0,0 @@
-/*
- * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
- *
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,
- * and is available at http://www.eclipse.org/legal/epl-v10.html
- */
-
-package org.opendaylight.controller.cluster.datastore.messages;
-
-import com.google.common.base.Preconditions;
-
-public class PrimaryNotFound implements SerializableMessage {
-  public static final Class<PrimaryNotFound> SERIALIZABLE_CLASS = PrimaryNotFound.class;
-
-    private final String shardName;
-
-    public PrimaryNotFound(final String shardName){
-
-        Preconditions.checkNotNull(shardName, "shardName should not be null");
-
-        this.shardName = shardName;
-    }
-
-    @Override
-    public boolean equals(final Object o) {
-        if (this == o) {
-            return true;
-        }
-        if (o == null || getClass() != o.getClass()) {
-            return false;
-        }
-
-        PrimaryNotFound that = (PrimaryNotFound) o;
-
-        if (shardName != null ? !shardName.equals(that.shardName) : that.shardName != null) {
-            return false;
-        }
-
-        return true;
-    }
-
-    @Override
-    public int hashCode() {
-        return shardName != null ? shardName.hashCode() : 0;
-    }
-
-  @Override
-  public Object toSerializable() {
-    return this;
-  }
-
-  public static PrimaryNotFound fromSerializable(final Object message){
-    return (PrimaryNotFound) message;
-  }
-}
index 6f9bb7fc9feb4ede3a00d06b07d689d615f51458..b6250fc1cccbfbde152a8af7e1e2f4b9b3c6f7cd 100644 (file)
@@ -41,13 +41,11 @@ import org.opendaylight.controller.cluster.datastore.exceptions.NotInitializedEx
 import org.opendaylight.controller.cluster.datastore.exceptions.PrimaryNotFoundException;
 import org.opendaylight.controller.cluster.datastore.exceptions.TimeoutException;
 import org.opendaylight.controller.cluster.datastore.exceptions.UnknownMessageException;
-import org.opendaylight.controller.cluster.datastore.messages.ActorNotInitialized;
 import org.opendaylight.controller.cluster.datastore.messages.FindLocalShard;
 import org.opendaylight.controller.cluster.datastore.messages.FindPrimary;
 import org.opendaylight.controller.cluster.datastore.messages.LocalShardFound;
 import org.opendaylight.controller.cluster.datastore.messages.LocalShardNotFound;
 import org.opendaylight.controller.cluster.datastore.messages.PrimaryFound;
-import org.opendaylight.controller.cluster.datastore.messages.PrimaryNotFound;
 import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext;
 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
 import org.slf4j.Logger;
@@ -209,25 +207,22 @@ public class ActorContext {
             return ret;
         }
         Future<Object> future = executeOperationAsync(shardManager,
-                new FindPrimary(shardName, true).toSerializable(), shardInitializationTimeout);
+                new FindPrimary(shardName, true), shardInitializationTimeout);
 
         return future.transform(new Mapper<Object, ActorSelection>() {
             @Override
             public ActorSelection checkedApply(Object response) throws Exception {
-                if(PrimaryFound.SERIALIZABLE_CLASS.isInstance(response)) {
-                    PrimaryFound found = PrimaryFound.fromSerializable(response);
+                if(response instanceof PrimaryFound) {
+                    PrimaryFound found = (PrimaryFound)response;
 
                     LOG.debug("Primary found {}", found.getPrimaryPath());
                     ActorSelection actorSelection = actorSystem.actorSelection(found.getPrimaryPath());
                     primaryShardActorSelectionCache.put(shardName, Futures.successful(actorSelection));
                     return actorSelection;
-                } else if(response instanceof ActorNotInitialized) {
-                    throw new NotInitializedException(
-                            String.format("Found primary shard %s but it's not initialized yet. " +
-                                          "Please try again later", shardName));
-                } else if(response instanceof PrimaryNotFound) {
-                    throw new PrimaryNotFoundException(
-                            String.format("No primary shard found for %S.", shardName));
+                } else if(response instanceof NotInitializedException) {
+                    throw (NotInitializedException)response;
+                } else if(response instanceof PrimaryNotFoundException) {
+                    throw (PrimaryNotFoundException)response;
                 } else if(response instanceof NoShardLeaderException) {
                     throw (NoShardLeaderException)response;
                 }
@@ -274,10 +269,8 @@ public class ActorContext {
                     LocalShardFound found = (LocalShardFound)response;
                     LOG.debug("Local shard found {}", found.getPath());
                     return found.getPath();
-                } else if(response instanceof ActorNotInitialized) {
-                    throw new NotInitializedException(
-                            String.format("Found local shard for %s but it's not initialized yet.",
-                                    shardName));
+                } else if(response instanceof NotInitializedException) {
+                    throw (NotInitializedException)response;
                 } else if(response instanceof LocalShardNotFound) {
                     throw new LocalShardNotFoundException(
                             String.format("Local shard for %s does not exist.", shardName));
index b775cf0a9914be9ed37f8595bbafeb662e57321e..dc83af9a756374694e6203c61eebc49d72e214fc 100644 (file)
@@ -124,7 +124,7 @@ module distributed-datastore-provider {
          }
 
          leaf shard-journal-recovery-log-batch-size {
-            default 5000;
+            default 1000;
             type non-zero-uint32-type;
             description "The maximum number of journal log entries to batch on recovery for a shard before committing to the data store.";
          }
@@ -47,7 +47,7 @@ import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCoh
  *
  * @author Thomas Pantelis
  */
-public class DOMConcurrentDataCommitCoordinatorTest {
+public class ConcurrentDOMDataBrokerTest {
 
     private final DOMDataWriteTransaction transaction = mock(DOMDataWriteTransaction.class);
     private final DOMStoreThreePhaseCommitCohort mockCohort1 = mock(DOMStoreThreePhaseCommitCohort.class);
index 8ffb705df21af1db39b6ac26e15a7a5be75391c4..57e0e26c116c036f218b3779d2120a7bf1a21f81 100644 (file)
@@ -7,10 +7,10 @@
  */
 package org.opendaylight.controller.cluster.datastore;
 
-import static org.mockito.Mockito.any;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.eq;
 import static org.mockito.Mockito.doAnswer;
 import static org.mockito.Mockito.doReturn;
-import static org.mockito.Mockito.eq;
 import static org.mockito.Mockito.mock;
 import akka.actor.ActorRef;
 import akka.actor.ActorSystem;
@@ -28,7 +28,7 @@ import org.junit.Test;
 import org.mockito.Mockito;
 import org.mockito.invocation.InvocationOnMock;
 import org.mockito.stubbing.Answer;
-import org.opendaylight.controller.cluster.datastore.messages.ActorNotInitialized;
+import org.opendaylight.controller.cluster.datastore.exceptions.NotInitializedException;
 import org.opendaylight.controller.cluster.datastore.messages.CloseDataChangeListenerRegistration;
 import org.opendaylight.controller.cluster.datastore.messages.FindLocalShard;
 import org.opendaylight.controller.cluster.datastore.messages.LocalShardFound;
@@ -173,7 +173,7 @@ public class DataChangeListenerRegistrationProxyTest extends AbstractActorTest {
             FindLocalShard findLocalShard = expectMsgClass(timeout, FindLocalShard.class);
             Assert.assertEquals("getShardName", "shard-1", findLocalShard.getShardName());
 
-            reply(new ActorNotInitialized());
+            reply(new NotInitializedException("not initialized"));
 
             new Within(duration("1 seconds")) {
                 @Override
index ae7a4f96c53fec04dbadbb612c9bc0369952f654..b676cf225c801039d1e679298e6d7cb23d2808ac 100644 (file)
@@ -9,16 +9,23 @@ import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 import akka.actor.ActorRef;
+import akka.actor.ActorSystem;
+import akka.actor.AddressFromURIString;
 import akka.actor.Props;
+import akka.cluster.Cluster;
+import akka.cluster.ClusterEvent;
+import akka.dispatch.Dispatchers;
 import akka.japi.Creator;
 import akka.pattern.Patterns;
 import akka.persistence.RecoveryCompleted;
 import akka.testkit.JavaTestKit;
 import akka.testkit.TestActorRef;
 import akka.util.Timeout;
+import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.Sets;
 import com.google.common.util.concurrent.Uninterruptibles;
+import com.typesafe.config.ConfigFactory;
 import java.net.URI;
 import java.util.Arrays;
 import java.util.Collection;
@@ -35,15 +42,16 @@ import org.mockito.Mock;
 import org.mockito.MockitoAnnotations;
 import org.opendaylight.controller.cluster.DataPersistenceProvider;
 import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException;
+import org.opendaylight.controller.cluster.datastore.exceptions.NotInitializedException;
+import org.opendaylight.controller.cluster.datastore.exceptions.PrimaryNotFoundException;
 import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
+import org.opendaylight.controller.cluster.datastore.identifiers.ShardManagerIdentifier;
 import org.opendaylight.controller.cluster.datastore.messages.ActorInitialized;
-import org.opendaylight.controller.cluster.datastore.messages.ActorNotInitialized;
 import org.opendaylight.controller.cluster.datastore.messages.FindLocalShard;
 import org.opendaylight.controller.cluster.datastore.messages.FindPrimary;
 import org.opendaylight.controller.cluster.datastore.messages.LocalShardFound;
 import org.opendaylight.controller.cluster.datastore.messages.LocalShardNotFound;
 import org.opendaylight.controller.cluster.datastore.messages.PrimaryFound;
-import org.opendaylight.controller.cluster.datastore.messages.PrimaryNotFound;
 import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext;
 import org.opendaylight.controller.cluster.datastore.utils.MessageCollectorActor;
 import org.opendaylight.controller.cluster.datastore.utils.MockClusterWrapper;
@@ -75,6 +83,11 @@ public class ShardManagerTest extends AbstractActorTest {
     private final DatastoreContext.Builder datastoreContextBuilder = DatastoreContext.newBuilder().
             dataStoreType(shardMrgIDSuffix).shardInitializationTimeout(600, TimeUnit.MILLISECONDS);
 
+    private static ActorRef newMockShardActor(ActorSystem system, String shardName, String memberName) {
+        String name = new ShardIdentifier(shardName, memberName,"config").toString();
+        return TestActorRef.create(system, Props.create(MessageCollectorActor.class), name);
+    }
+
     @Before
     public void setUp() {
         MockitoAnnotations.initMocks(this);
@@ -100,21 +113,22 @@ public class ShardManagerTest extends AbstractActorTest {
     }
 
     private Props newPropsShardMgrWithMockShardActor() {
+        return newPropsShardMgrWithMockShardActor("shardManager", mockShardActor, new MockClusterWrapper(),
+                new MockConfiguration());
+    }
+
+    private Props newPropsShardMgrWithMockShardActor(final String name, final ActorRef shardActor,
+            final ClusterWrapper clusterWrapper, final Configuration config) {
         Creator<ShardManager> creator = new Creator<ShardManager>() {
             private static final long serialVersionUID = 1L;
             @Override
             public ShardManager create() throws Exception {
-                return new ShardManager(new MockClusterWrapper(), new MockConfiguration(),
-                        datastoreContextBuilder.build(), ready) {
-                    @Override
-                    protected ActorRef newShardActor(SchemaContext schemaContext, ShardInformation info) {
-                        return mockShardActor;
-                    }
-                };
+                return new ForwardingShardManager(clusterWrapper, config, datastoreContextBuilder.build(),
+                        ready, name, shardActor);
             }
         };
 
-        return Props.create(new DelegatingShardManagerCreator(creator));
+        return Props.create(new DelegatingShardManagerCreator(creator)).withDispatcher(Dispatchers.DefaultDispatcherId());
     }
 
     @Test
@@ -124,9 +138,9 @@ public class ShardManagerTest extends AbstractActorTest {
 
             shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
 
-            shardManager.tell(new FindPrimary("non-existent", false).toSerializable(), getRef());
+            shardManager.tell(new FindPrimary("non-existent", false), getRef());
 
-            expectMsgEquals(duration("5 seconds"), new PrimaryNotFound("non-existent").toSerializable());
+            expectMsgClass(duration("5 seconds"), PrimaryNotFoundException.class);
         }};
     }
 
@@ -146,9 +160,9 @@ public class ShardManagerTest extends AbstractActorTest {
             shardManager.tell((new RoleChangeNotification(memberId, RaftState.Candidate.name(),
                     RaftState.Leader.name())), mockShardActor);
 
-            shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, false).toSerializable(), getRef());
+            shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, false), getRef());
 
-            PrimaryFound primaryFound = expectMsgClass(duration("5 seconds"), PrimaryFound.SERIALIZABLE_CLASS);
+            PrimaryFound primaryFound = expectMsgClass(duration("5 seconds"), PrimaryFound.class);
             assertTrue("Unexpected primary path " +  primaryFound.getPrimaryPath(),
                     primaryFound.getPrimaryPath().contains("member-1-shard-default"));
         }};
@@ -170,9 +184,9 @@ public class ShardManagerTest extends AbstractActorTest {
                     RaftState.Candidate.name(), RaftState.Follower.name()), mockShardActor);
             shardManager.tell(new LeaderStateChanged(memberId1, memberId2), mockShardActor);
 
-            shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, false).toSerializable(), getRef());
+            shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, false), getRef());
 
-            PrimaryFound primaryFound = expectMsgClass(duration("5 seconds"), PrimaryFound.SERIALIZABLE_CLASS);
+            PrimaryFound primaryFound = expectMsgClass(duration("5 seconds"), PrimaryFound.class);
             assertTrue("Unexpected primary path " +  primaryFound.getPrimaryPath(),
                     primaryFound.getPrimaryPath().contains("member-2-shard-default"));
         }};
@@ -183,9 +197,9 @@ public class ShardManagerTest extends AbstractActorTest {
         new JavaTestKit(getSystem()) {{
             final ActorRef shardManager = getSystem().actorOf(newPropsShardMgrWithMockShardActor());
 
-            shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, false).toSerializable(), getRef());
+            shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, false), getRef());
 
-            expectMsgClass(duration("5 seconds"), ActorNotInitialized.class);
+            expectMsgClass(duration("5 seconds"), NotInitializedException.class);
         }};
     }
 
@@ -197,7 +211,7 @@ public class ShardManagerTest extends AbstractActorTest {
             shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
             shardManager.tell(new ActorInitialized(), mockShardActor);
 
-            shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, false).toSerializable(), getRef());
+            shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, false), getRef());
 
             expectMsgClass(duration("5 seconds"), NoShardLeaderException.class);
         }};
@@ -215,15 +229,15 @@ public class ShardManagerTest extends AbstractActorTest {
             shardManager.tell(new RoleChangeNotification(memberId,
                     RaftState.Candidate.name(), RaftState.Follower.name()), mockShardActor);
 
-            shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, false).toSerializable(), getRef());
+            shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, false), getRef());
 
             expectMsgClass(duration("5 seconds"), NoShardLeaderException.class);
 
             shardManager.tell(new LeaderStateChanged(memberId, memberId), mockShardActor);
 
-            shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, false).toSerializable(), getRef());
+            shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, false), getRef());
 
-            PrimaryFound primaryFound = expectMsgClass(duration("5 seconds"), PrimaryFound.SERIALIZABLE_CLASS);
+            PrimaryFound primaryFound = expectMsgClass(duration("5 seconds"), PrimaryFound.class);
             assertTrue("Unexpected primary path " +  primaryFound.getPrimaryPath(),
                     primaryFound.getPrimaryPath().contains("member-1-shard-default"));
         }};
@@ -238,7 +252,7 @@ public class ShardManagerTest extends AbstractActorTest {
 
             // We're passing waitUntilInitialized = true to FindPrimary so the response should be
             // delayed until we send ActorInitialized and RoleChangeNotification.
-            shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, true).toSerializable(), getRef());
+            shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, true), getRef());
 
             expectNoMsg(FiniteDuration.create(150, TimeUnit.MILLISECONDS));
 
@@ -254,7 +268,7 @@ public class ShardManagerTest extends AbstractActorTest {
 
             shardManager.tell(new LeaderStateChanged(memberId, memberId), mockShardActor);
 
-            PrimaryFound primaryFound = expectMsgClass(duration("5 seconds"), PrimaryFound.SERIALIZABLE_CLASS);
+            PrimaryFound primaryFound = expectMsgClass(duration("5 seconds"), PrimaryFound.class);
             assertTrue("Unexpected primary path " +  primaryFound.getPrimaryPath(),
                     primaryFound.getPrimaryPath().contains("member-1-shard-default"));
 
@@ -269,9 +283,9 @@ public class ShardManagerTest extends AbstractActorTest {
 
             shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
 
-            shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, true).toSerializable(), getRef());
+            shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, true), getRef());
 
-            expectMsgClass(duration("2 seconds"), ActorNotInitialized.class);
+            expectMsgClass(duration("2 seconds"), NotInitializedException.class);
 
             shardManager.tell(new ActorInitialized(), mockShardActor);
 
@@ -289,7 +303,7 @@ public class ShardManagerTest extends AbstractActorTest {
             shardManager.tell(new RoleChangeNotification("member-1-shard-default-" + shardMrgIDSuffix,
                     null, RaftState.Candidate.name()), mockShardActor);
 
-            shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, true).toSerializable(), getRef());
+            shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, true), getRef());
 
             expectMsgClass(duration("2 seconds"), NoShardLeaderException.class);
         }};
@@ -303,12 +317,78 @@ public class ShardManagerTest extends AbstractActorTest {
             shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
             shardManager.tell(new ActorInitialized(), mockShardActor);
 
-            shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, true).toSerializable(), getRef());
+            shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, true), getRef());
 
             expectMsgClass(duration("2 seconds"), NoShardLeaderException.class);
         }};
     }
 
+    @Test
+    public void testOnReceiveFindPrimaryForRemoteShard() throws Exception {
+        String shardManagerID = ShardManagerIdentifier.builder().type(shardMrgIDSuffix).build().toString();
+
+        // Create an ActorSystem ShardManager actor for member-1.
+
+        final ActorSystem system1 = ActorSystem.create("cluster-test", ConfigFactory.load().getConfig("Member1"));
+        Cluster.get(system1).join(AddressFromURIString.parse("akka.tcp://cluster-test@127.0.0.1:2558"));
+
+        ActorRef mockShardActor1 = newMockShardActor(system1, Shard.DEFAULT_NAME, "member-1");
+
+        final TestActorRef<ForwardingShardManager> shardManager1 = TestActorRef.create(system1,
+                newPropsShardMgrWithMockShardActor("shardManager1", mockShardActor1, new ClusterWrapperImpl(system1),
+                        new MockConfiguration()), shardManagerID);
+
+        // Create an ActorSystem ShardManager actor for member-2.
+
+        final ActorSystem system2 = ActorSystem.create("cluster-test", ConfigFactory.load().getConfig("Member2"));
+
+        Cluster.get(system2).join(AddressFromURIString.parse("akka.tcp://cluster-test@127.0.0.1:2558"));
+
+        final ActorRef mockShardActor2 = newMockShardActor(system2, "astronauts", "member-2");
+
+        MockConfiguration mockConfig2 = new MockConfiguration(ImmutableMap.<String, List<String>>builder().
+                put("default", Arrays.asList("member-1", "member-2")).
+                put("astronauts", Arrays.asList("member-2")).build());
+
+        final TestActorRef<ForwardingShardManager> shardManager2 = TestActorRef.create(system2,
+                newPropsShardMgrWithMockShardActor("shardManager2", mockShardActor2, new ClusterWrapperImpl(system2),
+                        mockConfig2), shardManagerID);
+
+        new JavaTestKit(system1) {{
+
+            shardManager1.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
+            shardManager2.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
+
+            shardManager2.tell(new ActorInitialized(), mockShardActor2);
+
+            String memberId2 = "member-2-shard-astronauts-" + shardMrgIDSuffix;
+            shardManager2.tell(new LeaderStateChanged(memberId2, memberId2), mockShardActor2);
+            shardManager2.tell(new RoleChangeNotification(memberId2,
+                    RaftState.Candidate.name(), RaftState.Leader.name()), mockShardActor2);
+
+            shardManager1.underlyingActor().waitForMemberUp();
+
+            shardManager1.tell(new FindPrimary("astronauts", false), getRef());
+
+            PrimaryFound found = expectMsgClass(duration("5 seconds"), PrimaryFound.class);
+            String path = found.getPrimaryPath();
+            assertTrue("Unexpected primary path " + path, path.contains("member-2-shard-astronauts-config"));
+
+            shardManager2.underlyingActor().verifyFindPrimary();
+
+            Cluster.get(system2).down(AddressFromURIString.parse("akka.tcp://cluster-test@127.0.0.1:2558"));
+
+            shardManager1.underlyingActor().waitForMemberRemoved();
+
+            shardManager1.tell(new FindPrimary("astronauts", false), getRef());
+
+            expectMsgClass(duration("5 seconds"), PrimaryNotFoundException.class);
+        }};
+
+        JavaTestKit.shutdownActorSystem(system1);
+        JavaTestKit.shutdownActorSystem(system2);
+    }
+
     @Test
     public void testOnReceiveFindLocalShardForNonExistentShard() throws Exception {
         new JavaTestKit(getSystem()) {{
@@ -348,7 +428,7 @@ public class ShardManagerTest extends AbstractActorTest {
 
             shardManager.tell(new FindLocalShard(Shard.DEFAULT_NAME, false), getRef());
 
-            expectMsgClass(duration("5 seconds"), ActorNotInitialized.class);
+            expectMsgClass(duration("5 seconds"), NotInitializedException.class);
         }};
     }
 
@@ -371,42 +451,6 @@ public class ShardManagerTest extends AbstractActorTest {
         }};
     }
 
-    @Test
-    public void testOnReceiveMemberUp() throws Exception {
-        new JavaTestKit(getSystem()) {{
-            final ActorRef shardManager = getSystem().actorOf(newShardMgrProps());
-
-            MockClusterWrapper.sendMemberUp(shardManager, "member-2", getRef().path().toString());
-
-            shardManager.tell(new FindPrimary("astronauts", false).toSerializable(), getRef());
-
-            PrimaryFound found = PrimaryFound.fromSerializable(expectMsgClass(duration("5 seconds"),
-                    PrimaryFound.SERIALIZABLE_CLASS));
-            String path = found.getPrimaryPath();
-            assertTrue("Found path contains " + path, path.contains("member-2-shard-astronauts-config"));
-        }};
-    }
-
-    @Test
-    public void testOnReceiveMemberDown() throws Exception {
-
-        new JavaTestKit(getSystem()) {{
-            final ActorRef shardManager = getSystem().actorOf(newShardMgrProps());
-
-            MockClusterWrapper.sendMemberUp(shardManager, "member-2", getRef().path().toString());
-
-            shardManager.tell(new FindPrimary("astronauts", false).toSerializable(), getRef());
-
-            expectMsgClass(duration("5 seconds"), PrimaryFound.SERIALIZABLE_CLASS);
-
-            MockClusterWrapper.sendMemberRemoved(shardManager, "member-2", getRef().path().toString());
-
-            shardManager.tell(new FindPrimary("astronauts", false).toSerializable(), getRef());
-
-            expectMsgClass(duration("5 seconds"), PrimaryNotFound.SERIALIZABLE_CLASS);
-        }};
-    }
-
     @Test
     public void testOnRecoveryJournalIsCleaned() {
         InMemoryJournal.addEntry(shardMgrID, 1L, new ShardManager.SchemaContextModules(
@@ -595,7 +639,7 @@ public class ShardManagerTest extends AbstractActorTest {
     }
 
     @Test
-    public void testRoleChangeNotificationReleaseReady() throws Exception {
+    public void testRoleChangeNotificationAndLeaderStateChangedReleaseReady() throws Exception {
         new JavaTestKit(getSystem()) {
             {
                 TestActorRef<ShardManager> shardManager = TestActorRef.create(getSystem(), newShardMgrProps());
@@ -604,11 +648,35 @@ public class ShardManagerTest extends AbstractActorTest {
                 shardManager.underlyingActor().onReceiveCommand(new RoleChangeNotification(
                         memberId, RaftState.Candidate.name(), RaftState.Leader.name()));
 
+                verify(ready, never()).countDown();
+
+                shardManager.underlyingActor().onReceiveCommand(new LeaderStateChanged(memberId, memberId));
+
                 verify(ready, times(1)).countDown();
 
             }};
     }
 
+    @Test
+    public void testRoleChangeNotificationToFollowerWithLeaderStateChangedReleaseReady() throws Exception {
+        new JavaTestKit(getSystem()) {
+            {
+                TestActorRef<ShardManager> shardManager = TestActorRef.create(getSystem(), newShardMgrProps());
+
+                String memberId = "member-1-shard-default-" + shardMrgIDSuffix;
+                shardManager.underlyingActor().onReceiveCommand(new RoleChangeNotification(
+                        memberId, null, RaftState.Follower.name()));
+
+                verify(ready, never()).countDown();
+
+                shardManager.underlyingActor().onReceiveCommand(new LeaderStateChanged(memberId, "member-2-shard-default-" + shardMrgIDSuffix));
+
+                verify(ready, times(1)).countDown();
+
+            }};
+    }
+
+
     @Test
     public void testRoleChangeNotificationDoNothingForUnknownShard() throws Exception {
         new JavaTestKit(getSystem()) {
@@ -804,4 +872,69 @@ public class ShardManagerTest extends AbstractActorTest {
             return delegate.create();
         }
     }
+
+    private static class ForwardingShardManager extends ShardManager {
+        private CountDownLatch findPrimaryMessageReceived = new CountDownLatch(1);
+        private CountDownLatch memberUpReceived = new CountDownLatch(1);
+        private CountDownLatch memberRemovedReceived = new CountDownLatch(1);
+        private final ActorRef shardActor;
+        private final String name;
+
+        protected ForwardingShardManager(ClusterWrapper cluster, Configuration configuration,
+                DatastoreContext datastoreContext, CountDownLatch waitTillReadyCountdownLatch, String name,
+                ActorRef shardActor) {
+            super(cluster, configuration, datastoreContext, waitTillReadyCountdownLatch);
+            this.shardActor = shardActor;
+            this.name = name;
+        }
+
+        @Override
+        public void handleCommand(Object message) throws Exception {
+            try{
+                super.handleCommand(message);
+            } finally {
+                if(message instanceof FindPrimary) {
+                    findPrimaryMessageReceived.countDown();
+                } else if(message instanceof ClusterEvent.MemberUp) {
+                    String role = ((ClusterEvent.MemberUp)message).member().roles().head();
+                    if(!getCluster().getCurrentMemberName().equals(role)) {
+                        memberUpReceived.countDown();
+                    }
+                } else if(message instanceof ClusterEvent.MemberRemoved) {
+                    String role = ((ClusterEvent.MemberRemoved)message).member().roles().head();
+                    if(!getCluster().getCurrentMemberName().equals(role)) {
+                        memberRemovedReceived.countDown();
+                    }
+                }
+            }
+        }
+
+        @Override
+        public String persistenceId() {
+            return name;
+        }
+
+        @Override
+        protected ActorRef newShardActor(SchemaContext schemaContext, ShardInformation info) {
+            return shardActor;
+        }
+
+        void waitForMemberUp() {
+            assertEquals("MemberUp received", true,
+                    Uninterruptibles.awaitUninterruptibly(memberUpReceived, 5, TimeUnit.SECONDS));
+            memberUpReceived = new CountDownLatch(1);
+        }
+
+        void waitForMemberRemoved() {
+            assertEquals("MemberRemoved received", true,
+                    Uninterruptibles.awaitUninterruptibly(memberRemovedReceived, 5, TimeUnit.SECONDS));
+            memberRemovedReceived = new CountDownLatch(1);
+        }
+
+        void verifyFindPrimary() {
+            assertEquals("FindPrimary received", true,
+                    Uninterruptibles.awaitUninterruptibly(findPrimaryMessageReceived, 5, TimeUnit.SECONDS));
+            findPrimaryMessageReceived = new CountDownLatch(1);
+        }
+    }
 }
index 78a451953bc955a73b0853bb2ec27faec8440ea3..cc96d0d3b0d070623c737dc8f78340c45a20539f 100644 (file)
@@ -41,6 +41,7 @@ import java.util.concurrent.atomic.AtomicReference;
 import org.junit.Test;
 import org.mockito.InOrder;
 import org.opendaylight.controller.cluster.DataPersistenceProvider;
+import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
 import org.opendaylight.controller.cluster.datastore.messages.AbortTransaction;
 import org.opendaylight.controller.cluster.datastore.messages.AbortTransactionReply;
 import org.opendaylight.controller.cluster.datastore.messages.BatchedModifications;
@@ -69,13 +70,13 @@ import org.opendaylight.controller.cluster.datastore.utils.MockDataChangeListene
 import org.opendaylight.controller.cluster.datastore.utils.SerializationUtils;
 import org.opendaylight.controller.cluster.notifications.RegisterRoleChangeListener;
 import org.opendaylight.controller.cluster.notifications.RegisterRoleChangeListenerReply;
+import org.opendaylight.controller.cluster.raft.RaftActorContext;
 import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
 import org.opendaylight.controller.cluster.raft.ReplicatedLogImplEntry;
 import org.opendaylight.controller.cluster.raft.Snapshot;
 import org.opendaylight.controller.cluster.raft.base.messages.ApplyJournalEntries;
 import org.opendaylight.controller.cluster.raft.base.messages.ApplySnapshot;
 import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
-import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshot;
 import org.opendaylight.controller.cluster.raft.base.messages.ElectionTimeout;
 import org.opendaylight.controller.cluster.raft.base.messages.FollowerInitialSyncUpStatus;
 import org.opendaylight.controller.cluster.raft.client.messages.FindLeader;
@@ -99,6 +100,7 @@ import org.opendaylight.yangtools.yang.data.api.schema.DataContainerChild;
 import org.opendaylight.yangtools.yang.data.api.schema.MapEntryNode;
 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
 import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
+import org.opendaylight.yangtools.yang.model.api.SchemaContext;
 import scala.concurrent.Await;
 import scala.concurrent.Future;
 import scala.concurrent.duration.FiniteDuration;
@@ -1425,31 +1427,44 @@ public class ShardTest extends AbstractShardTest {
 
         dataStoreContextBuilder.persistent(persistent);
 
+
+
         new ShardTestKit(getSystem()) {{
             final AtomicReference<CountDownLatch> latch = new AtomicReference<>(new CountDownLatch(1));
-            Creator<Shard> creator = new Creator<Shard>() {
-                @Override
-                public Shard create() throws Exception {
-                    return new Shard(shardID, Collections.<String,String>emptyMap(),
-                            newDatastoreContext(), SCHEMA_CONTEXT) {
 
-                        DelegatingPersistentDataProvider delegating;
+            class TestShard extends Shard {
 
-                        @Override
-                        protected DataPersistenceProvider persistence() {
-                            if(delegating == null) {
-                                delegating = new DelegatingPersistentDataProvider(super.persistence());
-                            }
+                protected TestShard(ShardIdentifier name, Map<String, String> peerAddresses,
+                                    DatastoreContext datastoreContext, SchemaContext schemaContext) {
+                    super(name, peerAddresses, datastoreContext, schemaContext);
+                }
 
-                            return delegating;
-                        }
+                DelegatingPersistentDataProvider delegating;
 
-                        @Override
-                        protected void commitSnapshot(final long sequenceNumber) {
-                            super.commitSnapshot(sequenceNumber);
-                            latch.get().countDown();
-                        }
-                    };
+                protected DataPersistenceProvider persistence() {
+                    if(delegating == null) {
+                        delegating = new DelegatingPersistentDataProvider(super.persistence());
+                    }
+                    return delegating;
+                }
+
+                @Override
+                protected void commitSnapshot(final long sequenceNumber) {
+                    super.commitSnapshot(sequenceNumber);
+                    latch.get().countDown();
+                }
+
+                @Override
+                public RaftActorContext getRaftActorContext() {
+                    return super.getRaftActorContext();
+                }
+            }
+
+            Creator<Shard> creator = new Creator<Shard>() {
+                @Override
+                public Shard create() throws Exception {
+                    return new TestShard(shardID, Collections.<String,String>emptyMap(),
+                            newDatastoreContext(), SCHEMA_CONTEXT);
                 }
             };
 
@@ -1462,8 +1477,9 @@ public class ShardTest extends AbstractShardTest {
 
             NormalizedNode<?,?> expectedRoot = readStore(shard, YangInstanceIdentifier.builder().build());
 
-            CaptureSnapshot capture = new CaptureSnapshot(-1, -1, -1, -1, -1, -1);
-            shard.tell(capture, getRef());
+            // Trigger creation of a snapshot by ensuring
+            RaftActorContext raftActorContext = ((TestShard) shard.underlyingActor()).getRaftActorContext();
+            raftActorContext.getSnapshotManager().capture(mock(ReplicatedLogEntry.class), -1);
 
             assertEquals("Snapshot saved", true, latch.get().await(5, TimeUnit.SECONDS));
 
@@ -1475,7 +1491,7 @@ public class ShardTest extends AbstractShardTest {
             latch.set(new CountDownLatch(1));
             savedSnapshot.set(null);
 
-            shard.tell(capture, getRef());
+            raftActorContext.getSnapshotManager().capture(mock(ReplicatedLogEntry.class), -1);
 
             assertEquals("Snapshot saved", true, latch.get().await(5, TimeUnit.SECONDS));
 
index 2746bcf982906af7046c8c8e3cb930711929df43..6b4f6337785a753e68e1330f8296927aeb70b005 100644 (file)
@@ -37,13 +37,11 @@ import org.opendaylight.controller.cluster.datastore.DatastoreContext;
 import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException;
 import org.opendaylight.controller.cluster.datastore.exceptions.NotInitializedException;
 import org.opendaylight.controller.cluster.datastore.exceptions.PrimaryNotFoundException;
-import org.opendaylight.controller.cluster.datastore.messages.ActorNotInitialized;
 import org.opendaylight.controller.cluster.datastore.messages.FindLocalShard;
 import org.opendaylight.controller.cluster.datastore.messages.FindPrimary;
 import org.opendaylight.controller.cluster.datastore.messages.LocalShardFound;
 import org.opendaylight.controller.cluster.datastore.messages.LocalShardNotFound;
 import org.opendaylight.controller.cluster.datastore.messages.PrimaryFound;
-import org.opendaylight.controller.cluster.datastore.messages.PrimaryNotFound;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import scala.concurrent.Await;
@@ -458,7 +456,7 @@ public class ActorContextTest extends AbstractActorTest{
                             mock(Configuration.class), dataStoreContext) {
                         @Override
                         protected Future<Object> doAsk(ActorRef actorRef, Object message, Timeout timeout) {
-                            return Futures.successful((Object) new PrimaryNotFound("foobar"));
+                            return Futures.successful((Object) new PrimaryNotFoundException("not found"));
                         }
                     };
 
@@ -491,7 +489,7 @@ public class ActorContextTest extends AbstractActorTest{
                             mock(Configuration.class), dataStoreContext) {
                         @Override
                         protected Future<Object> doAsk(ActorRef actorRef, Object message, Timeout timeout) {
-                            return Futures.successful((Object) new ActorNotInitialized());
+                            return Futures.successful((Object) new NotInitializedException("not iniislized"));
                         }
                     };
 
@@ -518,8 +516,8 @@ public class ActorContextTest extends AbstractActorTest{
 
             TestActorRef<MockShardManager> shardManagerActorRef = TestActorRef.create(getSystem(), MockShardManager.props());
             MockShardManager shardManagerActor = shardManagerActorRef.underlyingActor();
-            shardManagerActor.addFindPrimaryResp("shard1", new PrimaryFound(shardActorRef1.path().toString()).toSerializable());
-            shardManagerActor.addFindPrimaryResp("shard2", new PrimaryFound(shardActorRef2.path().toString()).toSerializable());
+            shardManagerActor.addFindPrimaryResp("shard1", new PrimaryFound(shardActorRef1.path().toString()));
+            shardManagerActor.addFindPrimaryResp("shard2", new PrimaryFound(shardActorRef2.path().toString()));
             shardManagerActor.addFindPrimaryResp("shard3", new NoShardLeaderException("not found"));
 
             Configuration mockConfig = mock(Configuration.class);
index fe40aa0fd4571c65f431124d3b58704b9b62b9c6..810b270cfcee82aab53ca55f96a777e87bdc3141 100644 (file)
@@ -14,14 +14,22 @@ import akka.actor.AddressFromURIString;
 import akka.cluster.ClusterEvent;
 import akka.cluster.MemberStatus;
 import akka.cluster.UniqueAddress;
-import org.opendaylight.controller.cluster.datastore.ClusterWrapper;
-import scala.collection.JavaConversions;
 import java.util.HashSet;
 import java.util.Set;
+import org.opendaylight.controller.cluster.datastore.ClusterWrapper;
+import scala.collection.JavaConversions;
 
 public class MockClusterWrapper implements ClusterWrapper{
 
     private Address selfAddress = new Address("akka.tcp", "test", "127.0.0.1", 2550);
+    private String currentMemberName = "member-1";
+
+    public MockClusterWrapper() {
+    }
+
+    public MockClusterWrapper(String currentMemberName) {
+        this.currentMemberName = currentMemberName;
+    }
 
     @Override
     public void subscribeToMemberEvents(ActorRef actorRef) {
@@ -29,7 +37,7 @@ public class MockClusterWrapper implements ClusterWrapper{
 
     @Override
     public String getCurrentMemberName() {
-        return "member-1";
+        return currentMemberName;
     }
 
     @Override
index 4ef7d65857b3c86a76d405eeca52c888f6f9fc55..0bc561f1bd053f49674f34ac396afd50e6ba70ae 100644 (file)
@@ -9,6 +9,8 @@
 package org.opendaylight.controller.cluster.datastore.utils;
 
 import com.google.common.base.Optional;
+import com.google.common.collect.ImmutableMap;
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
@@ -18,11 +20,23 @@ import org.opendaylight.controller.cluster.datastore.Configuration;
 import org.opendaylight.controller.cluster.datastore.shardstrategy.ShardStrategy;
 
 public class MockConfiguration implements Configuration{
-    @Override public List<String> getMemberShardNames(final String memberName) {
-        return Arrays.asList("default");
+    private Map<String, List<String>> shardMembers = ImmutableMap.<String, List<String>>builder().
+            put("default", Arrays.asList("member-1", "member-2")).
+            /*put("astronauts", Arrays.asList("member-2", "member-3")).*/build();
+
+    public MockConfiguration() {
+    }
+
+    public MockConfiguration(Map<String, List<String>> shardMembers) {
+        this.shardMembers = shardMembers;
     }
 
-    @Override public Optional<String> getModuleNameFromNameSpace(
+    @Override
+    public List<String> getMemberShardNames(final String memberName) {
+        return new ArrayList<>(shardMembers.keySet());
+    }
+    @Override
+    public Optional<String> getModuleNameFromNameSpace(
         final String nameSpace) {
         return Optional.absent();
     }
@@ -44,7 +58,8 @@ public class MockConfiguration implements Configuration{
             return Arrays.asList("member-2", "member-3");
         }
 
-        return Collections.emptyList();
+        List<String> members = shardMembers.get(shardName);
+        return members != null ? members : Collections.<String>emptyList();
     }
 
     @Override public Set<String> getAllShardNames() {
index badec6f8313f8c8bbd7b7dfa3e859e1e49034277..03634627d643ab1042f00c31fa3bb2054b0a31a2 100644 (file)
@@ -34,3 +34,105 @@ bounded-mailbox {
   mailbox-capacity = 1000
   mailbox-push-timeout-time = 100ms
 }
+
+Member1 {
+  bounded-mailbox {
+    mailbox-type = "org.opendaylight.controller.cluster.common.actor.MeteredBoundedMailbox"
+    mailbox-capacity = 1000
+    mailbox-push-timeout-time = 100ms
+  }
+
+  in-memory-journal {
+    class = "org.opendaylight.controller.cluster.raft.utils.InMemoryJournal"
+  }
+
+  in-memory-snapshot-store {
+    class = "org.opendaylight.controller.cluster.raft.utils.InMemorySnapshotStore"
+    plugin-dispatcher = "akka.persistence.dispatchers.default-plugin-dispatcher"
+  }
+
+  akka {
+    persistence.snapshot-store.plugin = "in-memory-snapshot-store"
+    persistence.journal.plugin = "in-memory-journal"
+    
+    loglevel = "DEBUG"
+    
+    actor {
+      provider = "akka.cluster.ClusterActorRefProvider"
+      
+      serializers {
+          java = "akka.serialization.JavaSerializer"
+          proto = "akka.remote.serialization.ProtobufSerializer"
+      }
+
+      serialization-bindings {
+          "com.google.protobuf.Message" = proto
+      }
+    }
+    remote {
+      log-remote-lifecycle-events = off
+      netty.tcp {
+        hostname = "127.0.0.1"
+        port = 2558
+      }
+    }
+
+    cluster {
+      auto-down-unreachable-after = 100s
+      
+      roles = [
+        "member-1"
+      ]
+    }
+  }
+}
+
+Member2 {
+  bounded-mailbox {
+    mailbox-type = "org.opendaylight.controller.cluster.common.actor.MeteredBoundedMailbox"
+    mailbox-capacity = 1000
+    mailbox-push-timeout-time = 100ms
+  }
+  
+  in-memory-journal {
+    class = "org.opendaylight.controller.cluster.raft.utils.InMemoryJournal"
+  }
+
+  in-memory-snapshot-store {
+    class = "org.opendaylight.controller.cluster.raft.utils.InMemorySnapshotStore"
+    plugin-dispatcher = "akka.persistence.dispatchers.default-plugin-dispatcher"
+  }
+  
+  akka {
+    persistence.snapshot-store.plugin = "in-memory-snapshot-store"
+    persistence.journal.plugin = "in-memory-journal"
+    
+    actor {
+      provider = "akka.cluster.ClusterActorRefProvider"
+      
+      serializers {
+          java = "akka.serialization.JavaSerializer"
+          proto = "akka.remote.serialization.ProtobufSerializer"
+      }
+
+      serialization-bindings {
+          "com.google.protobuf.Message" = proto
+      }
+    }
+    remote {
+      log-remote-lifecycle-events = off
+      netty.tcp {
+        hostname = "127.0.0.1"
+        port = 2559
+      }
+    }
+
+    cluster {
+      auto-down-unreachable-after = 100s
+      
+      roles = [
+        "member-2"
+      ]
+    }
+  }
+}
index ab731260216f4452f6ba3d5e8c6b64c8179bdad2..d7d8660ae49149923cf60ada5b236dff745f9aed 100644 (file)
@@ -61,7 +61,7 @@ public class SSHTest {
     @AfterClass
     public static void tearDown() throws Exception {
         hashedWheelTimer.stop();
-        nettyGroup.shutdownGracefully().await();
+        nettyGroup.shutdownGracefully().await(5, TimeUnit.SECONDS);
         minaTimerEx.shutdownNow();
         nioExec.shutdownNow();
     }