Take snapshot after recovery on migrated messages 47/43747/8
authorTom Pantelis <tpanteli@brocade.com>
Thu, 11 Aug 2016 01:19:59 +0000 (21:19 -0400)
committerTom Pantelis <tpanteli@brocade.com>
Mon, 22 Aug 2016 12:51:58 +0000 (12:51 +0000)
Modified RaftActorRecoverySupport to capture and persist a snapshot
after recovery when there are migrated messages recovered. It utilizes
the new MigratedSerializable interface.

I also created equivalent classes in the persisted packages for
UpdateElectionTerm, DeleteEntries and ApplyJournalEntries that implement
MigratedSerializable and use the Externalizable proxy pattern. The
existing classes were deprecated and readResolve to the new classes.

Change-Id: Ia2e664de9ffd59991c49160424b13bc8ca0bfcbf
Signed-off-by: Tom Pantelis <tpanteli@brocade.com>
35 files changed:
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/ElectionTermImpl.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActor.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorRecoverySupport.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/ReplicatedLogImpl.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/base/messages/ApplyJournalEntries.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/base/messages/DeleteEntries.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/base/messages/UpdateElectionTerm.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractRaftActorBehavior.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/persisted/ApplyJournalEntries.java [new file with mode: 0644]
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/persisted/DeleteEntries.java [new file with mode: 0644]
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/persisted/UpdateElectionTerm.java [new file with mode: 0644]
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/AbstractRaftActorIntegrationTest.java
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/ElectionTermImplTest.java
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/MigratedMessagesTest.java [new file with mode: 0644]
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/MockRaftActor.java
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/NonVotingFollowerIntegrationTest.java
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/RaftActorRecoverySupportTest.java
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/RaftActorServerConfigurationSupportTest.java
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/RaftActorTest.java
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/RecoveryIntegrationSingleNodeTest.java
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/RecoveryIntegrationTest.java
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/ReplicatedLogImplTest.java
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/ReplicationAndSnapshotsIntegrationTest.java
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/ReplicationAndSnapshotsWithLaggingFollowerIntegrationTest.java
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/ServerConfigurationPayloadTest.java
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/base/messages/DeleteEntriesTest.java
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/base/messages/UpdateElectionTermTest.java
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/LeaderTest.java
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/persisted/ApplyJournalEntriesTest.java [new file with mode: 0644]
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/persisted/DeleteEntriesTest.java [new file with mode: 0644]
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/persisted/ServerConfigurationPayloadTest.java [new file with mode: 0644]
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/persisted/UpdateElectionTermTest.java [new file with mode: 0644]
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardDataTree.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DistributedDataStoreRemotingIntegrationTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTest.java

index 8e2e3d5..e44247d 100644 (file)
@@ -8,7 +8,7 @@
 package org.opendaylight.controller.cluster.raft;
 
 import org.opendaylight.controller.cluster.DataPersistenceProvider;
-import org.opendaylight.controller.cluster.raft.base.messages.UpdateElectionTerm;
+import org.opendaylight.controller.cluster.raft.persisted.UpdateElectionTerm;
 import org.slf4j.Logger;
 
 /**
index 90f23dd..93336c8 100644 (file)
@@ -34,7 +34,6 @@ import org.opendaylight.controller.cluster.PersistentDataProvider;
 import org.opendaylight.controller.cluster.common.actor.AbstractUntypedPersistentActor;
 import org.opendaylight.controller.cluster.notifications.LeaderStateChanged;
 import org.opendaylight.controller.cluster.notifications.RoleChanged;
-import org.opendaylight.controller.cluster.raft.base.messages.ApplyJournalEntries;
 import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
 import org.opendaylight.controller.cluster.raft.base.messages.InitiateCaptureSnapshot;
 import org.opendaylight.controller.cluster.raft.base.messages.LeaderTransitioning;
@@ -50,6 +49,7 @@ import org.opendaylight.controller.cluster.raft.client.messages.FollowerInfo;
 import org.opendaylight.controller.cluster.raft.client.messages.GetOnDemandRaftState;
 import org.opendaylight.controller.cluster.raft.client.messages.OnDemandRaftState;
 import org.opendaylight.controller.cluster.raft.client.messages.Shutdown;
+import org.opendaylight.controller.cluster.raft.persisted.ApplyJournalEntries;
 import org.opendaylight.controller.cluster.raft.persisted.NoopPayload;
 import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
 import org.opendaylight.yangtools.concepts.Identifier;
@@ -839,7 +839,7 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
     }
 
     /**
-     * @deprecated Deprecated in favor of {@link org.opendaylight.controller.cluster.raft.base.messages.DeleteEntries}
+     * @deprecated Deprecated in favor of {@link org.opendaylight.controller.cluster.raft.persisted.DeleteEntries}
      *             whose type for fromIndex is long instead of int. This class was kept for backwards
      *             compatibility with Helium.
      */
@@ -856,10 +856,14 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
         public int getFromIndex() {
             return fromIndex;
         }
+
+        private Object readResolve() {
+            return org.opendaylight.controller.cluster.raft.persisted.DeleteEntries.createMigrated(fromIndex);
+        }
     }
 
     /**
-     * @deprecated Deprecated in favor of non-inner class {@link org.opendaylight.controller.cluster.raft.base.messages.UpdateElectionTerm}
+     * @deprecated Deprecated in favor of non-inner class {@link org.opendaylight.controller.cluster.raft.persisted.UpdateElectionTerm}
      *             which has serialVersionUID set. This class was kept for backwards compatibility with Helium.
      */
     // Suppressing this warning as we can't set serialVersionUID to maintain backwards compatibility.
@@ -881,6 +885,11 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
         public String getVotedFor() {
             return votedFor;
         }
+
+        private Object readResolve() {
+            return org.opendaylight.controller.cluster.raft.persisted.UpdateElectionTerm.createMigrated(
+                    currentTerm, votedFor);
+        }
     }
 
     /**
index 9e44908..adcd668 100644 (file)
@@ -9,19 +9,19 @@ package org.opendaylight.controller.cluster.raft;
 
 import akka.persistence.RecoveryCompleted;
 import akka.persistence.SnapshotOffer;
-import akka.persistence.SnapshotSelectionCriteria;
 import com.google.common.base.Stopwatch;
 import java.io.ByteArrayInputStream;
 import java.io.ObjectInputStream;
-import org.opendaylight.controller.cluster.DataPersistenceProvider;
+import java.util.Collections;
 import org.opendaylight.controller.cluster.PersistentDataProvider;
-import org.opendaylight.controller.cluster.raft.base.messages.ApplyJournalEntries;
 import org.opendaylight.controller.cluster.raft.base.messages.ApplySnapshot;
-import org.opendaylight.controller.cluster.raft.base.messages.DeleteEntries;
-import org.opendaylight.controller.cluster.raft.base.messages.UpdateElectionTerm;
+import org.opendaylight.controller.cluster.raft.persisted.ApplyJournalEntries;
+import org.opendaylight.controller.cluster.raft.persisted.DeleteEntries;
+import org.opendaylight.controller.cluster.raft.persisted.MigratedSerializable;
 import org.opendaylight.controller.cluster.raft.persisted.ServerConfigurationPayload;
+import org.opendaylight.controller.cluster.raft.persisted.UpdateElectionTerm;
+import org.opendaylight.controller.cluster.raft.protobuff.client.messages.PersistentPayload;
 import org.slf4j.Logger;
-
 /**
  * Support class that handles persistence recovery for a RaftActor.
  *
@@ -34,6 +34,7 @@ class RaftActorRecoverySupport {
     private int currentRecoveryBatchCount;
     private boolean dataRecoveredWithPersistenceDisabled;
     private boolean anyDataRecovered;
+    private boolean hasMigratedDataRecovered;
 
     private Stopwatch recoveryTimer;
     private final Logger log;
@@ -49,67 +50,25 @@ class RaftActorRecoverySupport {
 
         anyDataRecovered = anyDataRecovered || !(message instanceof RecoveryCompleted);
 
+        if(isMigratedSerializable(message)) {
+            hasMigratedDataRecovered = true;
+        }
+
         boolean recoveryComplete = false;
-        DataPersistenceProvider persistence = context.getPersistenceProvider();
-        if (message instanceof org.opendaylight.controller.cluster.raft.RaftActor.UpdateElectionTerm) {
-            // Handle this message for backwards compatibility with pre-Lithium versions.
-            org.opendaylight.controller.cluster.raft.RaftActor.UpdateElectionTerm update =
-                    (org.opendaylight.controller.cluster.raft.RaftActor.UpdateElectionTerm)message;
-            context.getTermInformation().update(update.getCurrentTerm(), update.getVotedFor());
-        } else if (message instanceof UpdateElectionTerm) {
+        if (message instanceof UpdateElectionTerm) {
             context.getTermInformation().update(((UpdateElectionTerm) message).getCurrentTerm(),
                     ((UpdateElectionTerm) message).getVotedFor());
-        } else if(persistence.isRecoveryApplicable()) {
-            if (message instanceof SnapshotOffer) {
-                onRecoveredSnapshot((SnapshotOffer) message);
-            } else if (message instanceof ReplicatedLogEntry) {
-                onRecoveredJournalLogEntry((ReplicatedLogEntry) message);
-            } else if (message instanceof ApplyJournalEntries) {
-                onRecoveredApplyLogEntries(((ApplyJournalEntries) message).getToIndex());
-            } else if (message instanceof DeleteEntries) {
-                replicatedLog().removeFrom(((DeleteEntries) message).getFromIndex());
-            } else if (message instanceof org.opendaylight.controller.cluster.raft.RaftActor.DeleteEntries) {
-                // Handle this message for backwards compatibility with pre-Lithium versions.
-                replicatedLog().removeFrom(((org.opendaylight.controller.cluster.raft.RaftActor.DeleteEntries) message).getFromIndex());
-            } else if (message instanceof RecoveryCompleted) {
-                onRecoveryCompletedMessage();
-                possiblyRestoreFromSnapshot();
-                recoveryComplete = true;
-            }
+        } else if (message instanceof SnapshotOffer) {
+            onRecoveredSnapshot((SnapshotOffer) message);
+        } else if (message instanceof ReplicatedLogEntry) {
+            onRecoveredJournalLogEntry((ReplicatedLogEntry) message);
+        } else if (message instanceof ApplyJournalEntries) {
+            onRecoveredApplyLogEntries(((ApplyJournalEntries) message).getToIndex());
+        } else if (message instanceof DeleteEntries) {
+            onDeleteEntries((DeleteEntries) message);
         } else if (message instanceof RecoveryCompleted) {
             recoveryComplete = true;
-
-            if(dataRecoveredWithPersistenceDisabled) {
-                // Data persistence is disabled but we recovered some data entries so we must have just
-                // transitioned to disabled or a persistence backup was restored. Either way, delete all the
-                // messages from the akka journal for efficiency and so that we do not end up with consistency
-                // issues in case persistence is -re-enabled.
-                persistentProvider.deleteMessages(persistentProvider.getLastSequenceNumber());
-
-                // Delete all the akka snapshots as they will not be needed
-                persistentProvider.deleteSnapshots(new SnapshotSelectionCriteria(scala.Long.MaxValue(),
-                        scala.Long.MaxValue(), 0L, 0L));
-
-                // Since we cleaned out the journal, we need to re-write the current election info.
-                context.getTermInformation().updateAndPersist(context.getTermInformation().getCurrentTerm(),
-                        context.getTermInformation().getVotedFor());
-            }
-
-            onRecoveryCompletedMessage();
-            possiblyRestoreFromSnapshot();
-        } else {
-            boolean isServerConfigPayload = false;
-            if(message instanceof ReplicatedLogEntry){
-                ReplicatedLogEntry repLogEntry = (ReplicatedLogEntry)message;
-                if(isServerConfigurationPayload(repLogEntry)){
-                    isServerConfigPayload = true;
-                    context.updatePeerIds((ServerConfigurationPayload)repLogEntry.getData());
-                }
-            }
-
-            if(!isServerConfigPayload){
-                dataRecoveredWithPersistenceDisabled = true;
-            }
+            onRecoveryCompletedMessage(persistentProvider);
         }
 
         return recoveryComplete;
@@ -157,6 +116,20 @@ class RaftActorRecoverySupport {
 
         Snapshot snapshot = (Snapshot) offer.snapshot();
 
+        for(ReplicatedLogEntry entry: snapshot.getUnAppliedEntries()) {
+            if(isMigratedPayload(entry)) {
+                hasMigratedDataRecovered = true;
+            }
+        }
+
+        if(!context.getPersistenceProvider().isRecoveryApplicable()) {
+            // We may have just transitioned to disabled and have a snapshot containing state data and/or log
+            // entries - we don't want to preserve these, only the server config and election term info.
+
+            snapshot = Snapshot.create(new byte[0], Collections.emptyList(), -1, -1, -1, -1,
+                    snapshot.getElectionTerm(), snapshot.getElectionVotedFor(), snapshot.getServerConfiguration());
+        }
+
         // Create a replicated log with the snapshot information
         // The replicated log can be used later on to retrieve this snapshot
         // when we need to install it on a peer
@@ -173,6 +146,10 @@ class RaftActorRecoverySupport {
 
         if (snapshot.getServerConfiguration() != null) {
             context.updatePeerIds(snapshot.getServerConfiguration());
+
+            if(isMigratedSerializable(snapshot.getServerConfiguration())) {
+                hasMigratedDataRecovered = true;
+            }
         }
 
         timer.stop();
@@ -190,10 +167,24 @@ class RaftActorRecoverySupport {
         if(isServerConfigurationPayload(logEntry)){
             context.updatePeerIds((ServerConfigurationPayload)logEntry.getData());
         }
-        replicatedLog().append(logEntry);
+
+        if(isMigratedPayload(logEntry)) {
+            hasMigratedDataRecovered = true;
+        }
+
+        if(context.getPersistenceProvider().isRecoveryApplicable()) {
+            replicatedLog().append(logEntry);
+        } else if(!isPersistentPayload(logEntry)) {
+            dataRecoveredWithPersistenceDisabled = true;
+        }
     }
 
     private void onRecoveredApplyLogEntries(long toIndex) {
+        if(!context.getPersistenceProvider().isRecoveryApplicable()) {
+            dataRecoveredWithPersistenceDisabled = true;
+            return;
+        }
+
         long lastUnappliedIndex = context.getLastApplied() + 1;
 
         if(log.isDebugEnabled()) {
@@ -220,6 +211,14 @@ class RaftActorRecoverySupport {
         context.setCommitIndex(lastApplied);
     }
 
+    private void onDeleteEntries(DeleteEntries deleteEntries) {
+        if(context.getPersistenceProvider().isRecoveryApplicable()) {
+            replicatedLog().removeFrom(deleteEntries.getFromIndex());
+        } else {
+            dataRecoveredWithPersistenceDisabled = true;
+        }
+    }
+
     private void batchRecoveredLogEntry(ReplicatedLogEntry logEntry) {
         initRecoveryTimer();
 
@@ -242,7 +241,7 @@ class RaftActorRecoverySupport {
         currentRecoveryBatchCount = 0;
     }
 
-    private void onRecoveryCompletedMessage() {
+    private void onRecoveryCompletedMessage(PersistentDataProvider persistentProvider) {
         if(currentRecoveryBatchCount > 0) {
             endCurrentLogRecoveryBatch();
         }
@@ -259,9 +258,49 @@ class RaftActorRecoverySupport {
                  " Last index in log = {}, snapshotIndex = {}, snapshotTerm = {}, " +
                  "journal-size = {}", replicatedLog().lastIndex(), replicatedLog().getSnapshotIndex(),
                  replicatedLog().getSnapshotTerm(), replicatedLog().size());
+
+        if(dataRecoveredWithPersistenceDisabled ||
+                (hasMigratedDataRecovered && !context.getPersistenceProvider().isRecoveryApplicable())) {
+            if(hasMigratedDataRecovered) {
+                log.info("{}: Saving snapshot after recovery due to migrated messages", context.getId());
+            } else {
+                log.info("{}: Saving snapshot after recovery due to data persistence disabled", context.getId());
+            }
+
+            // Either data persistence is disabled and we recovered some data entries (ie we must have just
+            // transitioned to disabled or a persistence backup was restored) or we recovered migrated
+            // messages. Either way, we persist a snapshot and delete all the messages from the akka journal
+            // to clean out unwanted messages.
+
+            Snapshot snapshot = Snapshot.create(new byte[0], Collections.<ReplicatedLogEntry>emptyList(), -1, -1, -1, -1,
+                    context.getTermInformation().getCurrentTerm(), context.getTermInformation().getVotedFor(),
+                    context.getPeerServerInfo(true));
+
+            persistentProvider.saveSnapshot(snapshot);
+
+            persistentProvider.deleteMessages(persistentProvider.getLastSequenceNumber());
+        } else if(hasMigratedDataRecovered) {
+            log.info("{}: Snapshot capture initiated after recovery due to migrated messages", context.getId());
+
+            context.getSnapshotManager().capture(replicatedLog().last(), -1);
+        } else {
+            possiblyRestoreFromSnapshot();
+        }
     }
 
     private static boolean isServerConfigurationPayload(ReplicatedLogEntry repLogEntry){
-        return (repLogEntry.getData() instanceof ServerConfigurationPayload);
+        return repLogEntry.getData() instanceof ServerConfigurationPayload;
+    }
+
+    private static boolean isPersistentPayload(ReplicatedLogEntry repLogEntry){
+        return repLogEntry.getData() instanceof PersistentPayload;
+    }
+
+    private static boolean isMigratedPayload(ReplicatedLogEntry repLogEntry){
+        return isMigratedSerializable(repLogEntry.getData());
+    }
+
+    private static boolean isMigratedSerializable(Object message){
+        return message instanceof MigratedSerializable && ((MigratedSerializable)message).isMigrated();
     }
 }
index dfaed9b..c81ecfb 100644 (file)
@@ -11,7 +11,7 @@ import akka.japi.Procedure;
 import com.google.common.base.Preconditions;
 import java.util.Collections;
 import java.util.List;
-import org.opendaylight.controller.cluster.raft.base.messages.DeleteEntries;
+import org.opendaylight.controller.cluster.raft.persisted.DeleteEntries;
 
 /**
  * Implementation of ReplicatedLog used by the RaftActor.
index ca251d2..4b90979 100644 (file)
@@ -16,7 +16,10 @@ import java.io.Serializable;
  * behavior to the RaftActor to persist.
  *
  * @author Thomas Pantelis
+ *
+ * @deprecated Use {@link org.opendaylight.controller.cluster.raft.persisted.ApplyJournalEntries} instead.
  */
+@Deprecated
 public class ApplyJournalEntries implements Serializable {
     private static final long serialVersionUID = 1L;
 
@@ -30,6 +33,10 @@ public class ApplyJournalEntries implements Serializable {
         return toIndex;
     }
 
+    private Object readResolve() {
+        return org.opendaylight.controller.cluster.raft.persisted.ApplyJournalEntries.createMigrated(toIndex);
+    }
+
     @Override
     public String toString() {
         StringBuilder builder = new StringBuilder();
index 97742c0..059361a 100644 (file)
@@ -13,7 +13,10 @@ import java.io.Serializable;
  * Internal message that is stored in the akka's persistent journal to delete journal entries.
  *
  * @author Thomas Pantelis
+ *
+ * @deprecated Use {@link org.opendaylight.controller.cluster.raft.persisted.DeleteEntries} instead.
  */
+@Deprecated
 public class DeleteEntries implements Serializable {
     private static final long serialVersionUID = 1L;
 
@@ -27,6 +30,10 @@ public class DeleteEntries implements Serializable {
         return fromIndex;
     }
 
+    private Object readResolve() {
+        return org.opendaylight.controller.cluster.raft.persisted.DeleteEntries.createMigrated(fromIndex);
+    }
+
     @Override
     public String toString() {
         StringBuilder builder = new StringBuilder();
index a94a1ff..9fac37e 100644 (file)
@@ -12,7 +12,10 @@ import java.io.Serializable;
 
 /**
  * Message class to persist election term information.
+ *
+ * @deprecated Use {@link org.opendaylight.controller.cluster.raft.persisted.UpdateElectionTerm} instead.
  */
+@Deprecated
 public class UpdateElectionTerm implements Serializable {
     private static final long serialVersionUID = 1L;
 
@@ -32,6 +35,11 @@ public class UpdateElectionTerm implements Serializable {
         return votedFor;
     }
 
+    private Object readResolve() {
+        return org.opendaylight.controller.cluster.raft.persisted.UpdateElectionTerm.createMigrated(
+                currentTerm, votedFor);
+    }
+
     @Override
     public String toString() {
         StringBuilder builder = new StringBuilder();
index d8b23f9..d6a4e9d 100644 (file)
@@ -17,13 +17,13 @@ import org.opendaylight.controller.cluster.raft.ClientRequestTracker;
 import org.opendaylight.controller.cluster.raft.RaftActorContext;
 import org.opendaylight.controller.cluster.raft.RaftState;
 import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
-import org.opendaylight.controller.cluster.raft.base.messages.ApplyJournalEntries;
 import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
 import org.opendaylight.controller.cluster.raft.base.messages.ElectionTimeout;
 import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
 import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
 import org.opendaylight.controller.cluster.raft.messages.RequestVote;
 import org.opendaylight.controller.cluster.raft.messages.RequestVoteReply;
+import org.opendaylight.controller.cluster.raft.persisted.ApplyJournalEntries;
 import org.slf4j.Logger;
 import scala.concurrent.duration.FiniteDuration;
 
diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/persisted/ApplyJournalEntries.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/persisted/ApplyJournalEntries.java
new file mode 100644 (file)
index 0000000..a21c959
--- /dev/null
@@ -0,0 +1,90 @@
+/*
+ * Copyright (c) 2016 Brocade Communications Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.raft.persisted;
+
+import java.io.Externalizable;
+import java.io.IOException;
+import java.io.ObjectInput;
+import java.io.ObjectOutput;
+import java.io.Serializable;
+
+/**
+ * This is an internal message that is stored in the akka's persistent journal. During recovery, this
+ * message is used to apply recovered journal entries to the state whose indexes range from the context's
+ * current lastApplied index to "toIndex" contained in the message. This message is sent internally from a
+ * behavior to the RaftActor to persist.
+ *
+ * @author Thomas Pantelis
+ */
+public class ApplyJournalEntries implements Serializable, MigratedSerializable {
+    private static final class Proxy implements Externalizable {
+        private static final long serialVersionUID = 1L;
+
+        private ApplyJournalEntries applyEntries;
+
+        public Proxy() {
+            // For Externalizable
+        }
+
+        Proxy(final ApplyJournalEntries applyEntries) {
+            this.applyEntries = applyEntries;
+        }
+
+        @Override
+        public void writeExternal(final ObjectOutput out) throws IOException {
+            out.writeLong(applyEntries.toIndex);
+         }
+
+        @Override
+        public void readExternal(final ObjectInput in) throws IOException, ClassNotFoundException {
+            applyEntries = new ApplyJournalEntries(in.readLong());
+        }
+
+        private Object readResolve() {
+            return applyEntries;
+        }
+    }
+
+    private static final long serialVersionUID = 1L;
+
+    private final long toIndex;
+    private final boolean migrated;
+
+    private ApplyJournalEntries(final long toIndex, final boolean migrated) {
+        this.toIndex = toIndex;
+        this.migrated = migrated;
+    }
+
+    public ApplyJournalEntries(final long toIndex) {
+        this(toIndex, false);
+    }
+
+    public long getToIndex() {
+        return toIndex;
+    }
+
+    @Override
+    public boolean isMigrated() {
+        return migrated;
+    }
+
+    @Override
+    public Object writeReplace() {
+        return new Proxy(this);
+    }
+
+    @Deprecated
+    public static ApplyJournalEntries createMigrated(final long fromIndex) {
+        return new ApplyJournalEntries(fromIndex, true);
+    }
+
+    @Override
+    public String toString() {
+        return "ApplyJournalEntries [toIndex=" + toIndex + "]";
+    }
+}
diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/persisted/DeleteEntries.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/persisted/DeleteEntries.java
new file mode 100644 (file)
index 0000000..5650360
--- /dev/null
@@ -0,0 +1,87 @@
+/*
+ * Copyright (c) 2016 Brocade Communications Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.raft.persisted;
+
+import java.io.Externalizable;
+import java.io.IOException;
+import java.io.ObjectInput;
+import java.io.ObjectOutput;
+import java.io.Serializable;
+
+/**
+ * Internal message that is stored in the akka's persistent journal to delete journal entries.
+ *
+ * @author Thomas Pantelis
+ */
+public class DeleteEntries implements Serializable, MigratedSerializable {
+    private static final class Proxy implements Externalizable {
+        private static final long serialVersionUID = 1L;
+
+        private DeleteEntries deleteEntries;
+
+        public Proxy() {
+            // For Externalizable
+        }
+
+        Proxy(final DeleteEntries deleteEntries) {
+            this.deleteEntries = deleteEntries;
+        }
+
+        @Override
+        public void writeExternal(final ObjectOutput out) throws IOException {
+            out.writeLong(deleteEntries.fromIndex);
+         }
+
+        @Override
+        public void readExternal(final ObjectInput in) throws IOException, ClassNotFoundException {
+            deleteEntries = new DeleteEntries(in.readLong());
+        }
+
+        private Object readResolve() {
+            return deleteEntries;
+        }
+    }
+
+    private static final long serialVersionUID = 1L;
+
+    private final long fromIndex;
+    private final boolean migrated;
+
+    private DeleteEntries(final long fromIndex, final boolean migrated) {
+        this.fromIndex = fromIndex;
+        this.migrated = migrated;
+    }
+
+    public DeleteEntries(final long fromIndex) {
+        this(fromIndex, false);
+    }
+
+    public long getFromIndex() {
+        return fromIndex;
+    }
+
+    @Override
+    public boolean isMigrated() {
+        return migrated;
+    }
+
+    @Override
+    public Object writeReplace() {
+        return new Proxy(this);
+    }
+
+    @Deprecated
+    public static DeleteEntries createMigrated(final long fromIndex) {
+        return new DeleteEntries(fromIndex, true);
+    }
+
+    @Override
+    public String toString() {
+        return "DeleteEntries [fromIndex=" + fromIndex + "]";
+    }
+}
diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/persisted/UpdateElectionTerm.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/persisted/UpdateElectionTerm.java
new file mode 100644 (file)
index 0000000..ae86921
--- /dev/null
@@ -0,0 +1,93 @@
+/*
+ * Copyright (c) 2016 Brocade Communications Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.raft.persisted;
+
+import java.io.Externalizable;
+import java.io.IOException;
+import java.io.ObjectInput;
+import java.io.ObjectOutput;
+import java.io.Serializable;
+
+/**
+ * Message class to persist election term information.
+ */
+public class UpdateElectionTerm implements Serializable, MigratedSerializable {
+    private static final class Proxy implements Externalizable {
+        private static final long serialVersionUID = 1L;
+
+        private UpdateElectionTerm updateElectionTerm;
+
+        public Proxy() {
+            // For Externalizable
+        }
+
+        Proxy(final UpdateElectionTerm updateElectionTerm) {
+            this.updateElectionTerm = updateElectionTerm;
+        }
+
+        @Override
+        public void writeExternal(final ObjectOutput out) throws IOException {
+            out.writeLong(updateElectionTerm.currentTerm);
+            out.writeObject(updateElectionTerm.votedFor);
+         }
+
+        @Override
+        public void readExternal(final ObjectInput in) throws IOException, ClassNotFoundException {
+            updateElectionTerm = new UpdateElectionTerm(in.readLong(), (String) in.readObject());
+        }
+
+        private Object readResolve() {
+            return updateElectionTerm;
+        }
+    }
+
+    private static final long serialVersionUID = 1L;
+
+    private final long currentTerm;
+    private final String votedFor;
+    private final boolean migrated;
+
+    private UpdateElectionTerm(final long currentTerm, final String votedFor, final boolean migrated) {
+        this.currentTerm = currentTerm;
+        this.votedFor = votedFor;
+        this.migrated = migrated;
+    }
+
+    public UpdateElectionTerm(final long currentTerm, final String votedFor) {
+        this(currentTerm, votedFor, false);
+    }
+
+    public long getCurrentTerm() {
+        return currentTerm;
+    }
+
+    public String getVotedFor() {
+        return votedFor;
+    }
+
+    @Override
+    public boolean isMigrated() {
+        return migrated;
+    }
+
+    @Override
+    public Object writeReplace() {
+        return new Proxy(this);
+    }
+
+    @Deprecated
+    public static UpdateElectionTerm createMigrated(final long currentTerm, final String votedFor) {
+        return new UpdateElectionTerm(currentTerm, votedFor, true);
+    }
+
+    @Override
+    public String toString() {
+        return "UpdateElectionTerm [currentTerm=" + currentTerm + ", votedFor=" + votedFor + "]";
+    }
+}
+
index 343febb..5d3b2aa 100644 (file)
@@ -26,11 +26,11 @@ import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.TimeUnit;
 import org.junit.After;
 import org.opendaylight.controller.cluster.raft.MockRaftActorContext.MockPayload;
-import org.opendaylight.controller.cluster.raft.base.messages.ApplyJournalEntries;
 import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
 import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshotReply;
 import org.opendaylight.controller.cluster.raft.base.messages.SendHeartBeat;
 import org.opendaylight.controller.cluster.raft.behaviors.RaftActorBehavior;
+import org.opendaylight.controller.cluster.raft.persisted.ApplyJournalEntries;
 import org.opendaylight.controller.cluster.raft.persisted.ServerConfigurationPayload;
 import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
 import org.opendaylight.controller.cluster.raft.utils.InMemoryJournal;
index 9dd5e40..972cafd 100644 (file)
@@ -16,7 +16,7 @@ import org.mockito.ArgumentCaptor;
 import org.mockito.Mock;
 import org.mockito.MockitoAnnotations;
 import org.opendaylight.controller.cluster.DataPersistenceProvider;
-import org.opendaylight.controller.cluster.raft.base.messages.UpdateElectionTerm;
+import org.opendaylight.controller.cluster.raft.persisted.UpdateElectionTerm;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
diff --git a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/MigratedMessagesTest.java b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/MigratedMessagesTest.java
new file mode 100644 (file)
index 0000000..2f1a391
--- /dev/null
@@ -0,0 +1,242 @@
+/*
+ * Copyright (c) 2016 Brocade Communications Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.raft;
+
+import static org.junit.Assert.assertEquals;
+import com.google.common.base.Optional;
+import com.google.common.util.concurrent.Uninterruptibles;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Consumer;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshotReply;
+import org.opendaylight.controller.cluster.raft.persisted.ServerConfigurationPayload;
+import org.opendaylight.controller.cluster.raft.persisted.ServerInfo;
+import org.opendaylight.controller.cluster.raft.persisted.UpdateElectionTerm;
+import org.opendaylight.controller.cluster.raft.policy.DisableElectionsRaftPolicy;
+import org.opendaylight.controller.cluster.raft.utils.InMemoryJournal;
+import org.opendaylight.controller.cluster.raft.utils.InMemorySnapshotStore;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import akka.actor.ActorRef;
+import akka.dispatch.Dispatchers;
+import akka.testkit.JavaTestKit;
+import akka.testkit.TestActorRef;
+
+/**
+ * Unit tests for migrated messages on recovery.
+ *
+ * @author Thomas Pantelis
+ */
+public class MigratedMessagesTest extends AbstractActorTest {
+    static final Logger TEST_LOG = LoggerFactory.getLogger(MigratedMessagesTest.class);
+
+    private TestActorFactory factory;
+
+    @Before
+    public void setUp(){
+        factory = new TestActorFactory(getSystem());
+    }
+
+    @After
+    public void tearDown() throws Exception {
+        factory.close();
+        InMemoryJournal.clear();
+        InMemorySnapshotStore.clear();
+    }
+
+    @Test
+    public void testSnapshotAfterStartupWithMigratedServerConfigPayloadAndPersistenceEnabled() {
+        TEST_LOG.info("testSnapshotAfterStartupWithMigratedServerConfigPayloadAndPersistenceEnabled starting");
+        doTestSnapshotAfterStartupWithMigratedServerConfigPayload(true);
+        TEST_LOG.info("testSnapshotAfterStartupWithMigratedServerConfigPayloadAndPersistenceEnabled ending");
+    }
+
+    @Test
+    public void testSnapshotAfterStartupWithMigratedServerConfigPayloadAndPersistenceDisabled() {
+        TEST_LOG.info("testSnapshotAfterStartupWithMigratedServerConfigPayloadAndPersistenceDisabled starting");
+
+        TestActorRef<MockRaftActor> actor = doTestSnapshotAfterStartupWithMigratedServerConfigPayload(false);
+        MockRaftActor mockRaftActor = actor.underlyingActor();
+        String id = mockRaftActor.persistenceId();
+        ConfigParams config = mockRaftActor.getRaftActorContext().getConfigParams();
+
+        factory.killActor(actor, new JavaTestKit(getSystem()));
+
+        actor = factory.createTestActor(MockRaftActor.builder().id(id).config(config).persistent(Optional.of(false)).props().
+                    withDispatcher(Dispatchers.DefaultDispatcherId()), id);
+        mockRaftActor = actor.underlyingActor();
+        mockRaftActor.waitForRecoveryComplete();
+
+        assertEquals("electionTerm", 1,
+                mockRaftActor.getRaftActorContext().getTermInformation().getCurrentTerm());
+        assertEquals("votedFor", id,
+                mockRaftActor.getRaftActorContext().getTermInformation().getVotedFor());
+
+        TEST_LOG.info("testSnapshotAfterStartupWithMigratedServerConfigPayloadAndPersistenceDisabled ending");
+    }
+
+    @Test
+    public void testSnapshotAfterStartupWithMigratedUpdateElectionTermAndPersistenceEnabled() {
+        TEST_LOG.info("testSnapshotAfterStartupWithMigratedUpdateElectionTermAndPersistenceEnabled starting");
+
+        String persistenceId = factory.generateActorId("test-actor-");
+
+        org.opendaylight.controller.cluster.raft.base.messages.UpdateElectionTerm updateElectionTerm =
+                new org.opendaylight.controller.cluster.raft.base.messages.UpdateElectionTerm(5, persistenceId);
+
+        InMemoryJournal.addEntry(persistenceId, 1, updateElectionTerm);
+
+        doTestSnapshotAfterStartupWithMigratedMessage(persistenceId, true, snapshot -> {
+            assertEquals("getElectionVotedFor", persistenceId, snapshot.getElectionVotedFor());
+            assertEquals("getElectionTerm", 5, snapshot.getElectionTerm());
+        });
+
+        TEST_LOG.info("testSnapshotAfterStartupWithMigratedUpdateElectionTermAndPersistenceEnabled ending");
+    }
+
+    @Test
+    public void testSnapshotAfterStartupWithMigratedUpdateElectionTermAndPersistenceDisabled() {
+        TEST_LOG.info("testSnapshotAfterStartupWithMigratedUpdateElectionTermAndPersistenceDisabled starting");
+
+        String persistenceId = factory.generateActorId("test-actor-");
+
+        org.opendaylight.controller.cluster.raft.base.messages.UpdateElectionTerm updateElectionTerm =
+                new org.opendaylight.controller.cluster.raft.base.messages.UpdateElectionTerm(5, persistenceId);
+
+        InMemoryJournal.addEntry(persistenceId, 1, updateElectionTerm);
+
+        doTestSnapshotAfterStartupWithMigratedMessage(persistenceId, false, snapshot -> {
+            assertEquals("getElectionVotedFor", persistenceId, snapshot.getElectionVotedFor());
+            assertEquals("getElectionTerm", 5, snapshot.getElectionTerm());
+        });
+
+        TEST_LOG.info("testSnapshotAfterStartupWithMigratedUpdateElectionTermAndPersistenceDisabled ending");
+    }
+
+    @Test
+    public void testSnapshotAfterStartupWithMigratedApplyJournalEntries() {
+        TEST_LOG.info("testSnapshotAfterStartupWithMigratedApplyJournalEntries starting");
+
+        String persistenceId = factory.generateActorId("test-actor-");
+
+        InMemoryJournal.addEntry(persistenceId, 1, new UpdateElectionTerm(1, persistenceId));
+        InMemoryJournal.addEntry(persistenceId, 2, new ReplicatedLogImplEntry(0, 1,
+                new MockRaftActorContext.MockPayload("A")));
+        InMemoryJournal.addEntry(persistenceId, 3,
+                new org.opendaylight.controller.cluster.raft.base.messages.ApplyJournalEntries(0));
+
+
+        doTestSnapshotAfterStartupWithMigratedMessage(persistenceId, true, snapshot -> {
+            assertEquals("getLastAppliedIndex", 0, snapshot.getLastAppliedIndex());
+            assertEquals("getLastAppliedTerm", 1, snapshot.getLastAppliedTerm());
+            assertEquals("getLastIndex", 0, snapshot.getLastIndex());
+            assertEquals("getLastTerm", 1, snapshot.getLastTerm());
+        });
+
+        TEST_LOG.info("testSnapshotAfterStartupWithMigratedApplyJournalEntries ending");
+    }
+
+    @Test
+    public void testNoSnapshotAfterStartupWithNoMigratedMessages() {
+        String id = factory.generateActorId("test-actor-");
+        DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
+        config.setCustomRaftPolicyImplementationClass(DisableElectionsRaftPolicy.class.getName());
+
+        RaftActorSnapshotCohort snapshotCohort = new RaftActorSnapshotCohort() {
+            @Override
+            public void createSnapshot(ActorRef actorRef) {
+                actorRef.tell(new CaptureSnapshotReply(new byte[0]), actorRef);
+            }
+
+            @Override
+            public void applySnapshot(byte[] snapshotBytes) {
+            }
+        };
+
+        TestActorRef<MockRaftActor> raftActorRef = factory.createTestActor(MockRaftActor.builder().id(id).
+                config(config).snapshotCohort(snapshotCohort).persistent(Optional.of(true)).props().
+                    withDispatcher(Dispatchers.DefaultDispatcherId()), id);
+        MockRaftActor mockRaftActor = raftActorRef.underlyingActor();
+
+        mockRaftActor.waitForRecoveryComplete();
+
+        Uninterruptibles.sleepUninterruptibly(750, TimeUnit.MILLISECONDS);
+
+        List<Snapshot> snapshots = InMemorySnapshotStore.getSnapshots(id, Snapshot.class);
+        assertEquals("Snapshots", 0, snapshots.size());
+    }
+
+    private TestActorRef<MockRaftActor> doTestSnapshotAfterStartupWithMigratedServerConfigPayload(boolean persistent) {
+        String persistenceId = factory.generateActorId("test-actor-");
+
+        org.opendaylight.controller.cluster.raft.ServerConfigurationPayload persistedServerConfig =
+                new org.opendaylight.controller.cluster.raft.ServerConfigurationPayload(Arrays.asList(
+                    new org.opendaylight.controller.cluster.raft.ServerConfigurationPayload.ServerInfo(
+                            persistenceId, true),
+                    new org.opendaylight.controller.cluster.raft.ServerConfigurationPayload.ServerInfo(
+                            "downNode", true)));
+
+        ServerConfigurationPayload expectedServerConfig = new ServerConfigurationPayload(Arrays.asList(
+                new ServerInfo(persistenceId, true), new ServerInfo("downNode", true)));
+
+        InMemoryJournal.addEntry(persistenceId, 1, new UpdateElectionTerm(1, persistenceId));
+        InMemoryJournal.addEntry(persistenceId, 3, new ReplicatedLogImplEntry(0, 1, persistedServerConfig));
+
+        TestActorRef<MockRaftActor> actor = doTestSnapshotAfterStartupWithMigratedMessage(persistenceId,
+                persistent, snapshot -> {
+            assertEquals("getElectionVotedFor", persistenceId, snapshot.getElectionVotedFor());
+            assertEquals("getElectionTerm", 1, snapshot.getElectionTerm());
+            assertEquals("getServerConfiguration", new HashSet<>(expectedServerConfig.getServerConfig()),
+                    new HashSet<>(snapshot.getServerConfiguration().getServerConfig()));
+        });
+
+        return actor;
+    }
+
+
+    private TestActorRef<MockRaftActor> doTestSnapshotAfterStartupWithMigratedMessage(String id, boolean persistent,
+            Consumer<Snapshot> snapshotVerifier) {
+        InMemorySnapshotStore.addSnapshotSavedLatch(id);
+        InMemoryJournal.addDeleteMessagesCompleteLatch(id);
+        DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
+        config.setCustomRaftPolicyImplementationClass(DisableElectionsRaftPolicy.class.getName());
+
+        RaftActorSnapshotCohort snapshotCohort = new RaftActorSnapshotCohort() {
+            @Override
+            public void createSnapshot(ActorRef actorRef) {
+                actorRef.tell(new CaptureSnapshotReply(new byte[0]), actorRef);
+            }
+
+            @Override
+            public void applySnapshot(byte[] snapshotBytes) {
+            }
+        };
+
+        TestActorRef<MockRaftActor> raftActorRef = factory.createTestActor(MockRaftActor.builder().id(id).
+                config(config).snapshotCohort(snapshotCohort).persistent(Optional.of(persistent)).props().
+                    withDispatcher(Dispatchers.DefaultDispatcherId()), id);
+        MockRaftActor mockRaftActor = raftActorRef.underlyingActor();
+
+        mockRaftActor.waitForRecoveryComplete();
+
+        Snapshot snapshot = InMemorySnapshotStore.waitForSavedSnapshot(id, Snapshot.class);
+        snapshotVerifier.accept(snapshot);
+
+        InMemoryJournal.waitForDeleteMessagesComplete(id);
+
+        assertEquals("InMemoryJournal size", 0, InMemoryJournal.get(id).size());
+
+        return raftActorRef;
+    }
+
+}
index 3ba664b..88865d5 100644 (file)
@@ -52,7 +52,9 @@ public class MockRaftActor extends RaftActor implements RaftActorRecoveryCohort,
         state = new ArrayList<>();
         this.actorDelegate = mock(RaftActor.class);
         this.recoveryCohortDelegate = mock(RaftActorRecoveryCohort.class);
-        this.snapshotCohortDelegate = mock(RaftActorSnapshotCohort.class);
+
+        this.snapshotCohortDelegate = builder.snapshotCohort != null ? builder.snapshotCohort :
+            mock(RaftActorSnapshotCohort.class);
 
         if(builder.dataPersistenceProvider == null){
             setPersistence(builder.persistent.isPresent() ? builder.persistent.get() : true);
@@ -284,6 +286,7 @@ public class MockRaftActor extends RaftActor implements RaftActorRecoveryCohort,
         private Optional<Boolean> persistent = Optional.absent();
         private final Class<A> actorClass;
         private Function<Runnable, Void> pauseLeaderFunction;
+        private RaftActorSnapshotCohort snapshotCohort;
 
         protected AbstractBuilder(Class<A> actorClass) {
             this.actorClass = actorClass;
@@ -339,6 +342,11 @@ public class MockRaftActor extends RaftActor implements RaftActorRecoveryCohort,
             return self();
         }
 
+        public T snapshotCohort(RaftActorSnapshotCohort snapshotCohort) {
+            this.snapshotCohort = snapshotCohort;
+            return self();
+        }
+
         public Props props() {
             return Props.create(actorClass, this);
         }
index 74ac12d..7bf22b4 100644 (file)
@@ -8,8 +8,6 @@
 package org.opendaylight.controller.cluster.raft;
 
 import static org.junit.Assert.assertEquals;
-import akka.actor.ActorRef;
-import akka.dispatch.Dispatchers;
 import com.google.common.base.Optional;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Sets;
@@ -21,13 +19,15 @@ import org.opendaylight.controller.cluster.raft.AbstractRaftActorIntegrationTest
 import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
 import org.opendaylight.controller.cluster.raft.base.messages.ElectionTimeout;
 import org.opendaylight.controller.cluster.raft.base.messages.SnapshotComplete;
-import org.opendaylight.controller.cluster.raft.base.messages.UpdateElectionTerm;
 import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
 import org.opendaylight.controller.cluster.raft.persisted.ServerConfigurationPayload;
 import org.opendaylight.controller.cluster.raft.persisted.ServerInfo;
+import org.opendaylight.controller.cluster.raft.persisted.UpdateElectionTerm;
 import org.opendaylight.controller.cluster.raft.policy.DisableElectionsRaftPolicy;
 import org.opendaylight.controller.cluster.raft.utils.InMemoryJournal;
 import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor;
+import akka.actor.ActorRef;
+import akka.dispatch.Dispatchers;
 import scala.concurrent.duration.FiniteDuration;
 
 /**
index af19b49..3795103 100644 (file)
@@ -10,17 +10,17 @@ package org.opendaylight.controller.cluster.raft;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
+import static org.mockito.AdditionalMatchers.aryEq;
 import static org.mockito.Matchers.any;
 import static org.mockito.Matchers.anyInt;
+import static org.mockito.Mockito.doNothing;
 import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.never;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.verifyNoMoreInteractions;
-import akka.japi.Procedure;
 import akka.persistence.RecoveryCompleted;
 import akka.persistence.SnapshotMetadata;
 import akka.persistence.SnapshotOffer;
-import akka.persistence.SnapshotSelectionCriteria;
 import com.google.common.collect.Sets;
 import java.util.Arrays;
 import java.util.Collections;
@@ -35,11 +35,11 @@ import org.mockito.Mockito;
 import org.mockito.MockitoAnnotations;
 import org.opendaylight.controller.cluster.DataPersistenceProvider;
 import org.opendaylight.controller.cluster.PersistentDataProvider;
-import org.opendaylight.controller.cluster.raft.base.messages.ApplyJournalEntries;
-import org.opendaylight.controller.cluster.raft.base.messages.DeleteEntries;
-import org.opendaylight.controller.cluster.raft.base.messages.UpdateElectionTerm;
+import org.opendaylight.controller.cluster.raft.persisted.ApplyJournalEntries;
+import org.opendaylight.controller.cluster.raft.persisted.DeleteEntries;
 import org.opendaylight.controller.cluster.raft.persisted.ServerConfigurationPayload;
 import org.opendaylight.controller.cluster.raft.persisted.ServerInfo;
+import org.opendaylight.controller.cluster.raft.persisted.UpdateElectionTerm;
 import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -240,22 +240,6 @@ public class RaftActorRecoverySupportTest {
         verifyNoMoreInteractions(mockCohort);
     }
 
-    @Test
-    public void testOnDeprecatedDeleteEntries() {
-        ReplicatedLog replicatedLog = context.getReplicatedLog();
-        replicatedLog.append(new MockRaftActorContext.MockReplicatedLogEntry(1,
-                0, new MockRaftActorContext.MockPayload("0")));
-        replicatedLog.append(new MockRaftActorContext.MockReplicatedLogEntry(1,
-                1, new MockRaftActorContext.MockPayload("1")));
-        replicatedLog.append(new MockRaftActorContext.MockReplicatedLogEntry(1,
-                2, new MockRaftActorContext.MockPayload("2")));
-
-        sendMessageToSupport(new org.opendaylight.controller.cluster.raft.RaftActor.DeleteEntries(1));
-
-        assertEquals("Journal log size", 1, context.getReplicatedLog().size());
-        assertEquals("Last index", 0, context.getReplicatedLog().lastIndex());
-    }
-
     @Test
     public void testOnDeleteEntries() {
         ReplicatedLog replicatedLog = context.getReplicatedLog();
@@ -281,28 +265,20 @@ public class RaftActorRecoverySupportTest {
         assertEquals("Voted For", "member2", context.getTermInformation().getVotedFor());
     }
 
-    @Test
-    public void testDeprecatedUpdateElectionTerm() {
-
-        sendMessageToSupport(new org.opendaylight.controller.cluster.raft.RaftActor.UpdateElectionTerm(5, "member2"));
-
-        assertEquals("Current term", 5, context.getTermInformation().getCurrentTerm());
-        assertEquals("Voted For", "member2", context.getTermInformation().getVotedFor());
-    }
-
     @SuppressWarnings("unchecked")
     @Test
     public void testDataRecoveredWithPersistenceDisabled() {
+        doNothing().when(mockCohort).applyRecoverySnapshot(aryEq(new byte[0]));
         doReturn(false).when(mockPersistence).isRecoveryApplicable();
         doReturn(10L).when(mockPersistentProvider).getLastSequenceNumber();
 
-        sendMessageToSupport(new UpdateElectionTerm(5, "member2"));
-
         Snapshot snapshot = Snapshot.create(new byte[]{1}, Collections.<ReplicatedLogEntry>emptyList(), 3, 1, 3, 1);
         SnapshotOffer snapshotOffer = new SnapshotOffer(new SnapshotMetadata("test", 6, 12345), snapshot);
 
         sendMessageToSupport(snapshotOffer);
 
+        sendMessageToSupport(new UpdateElectionTerm(5, "member2"));
+
         sendMessageToSupport(new MockRaftActorContext.MockReplicatedLogEntry(1,
                 4, new MockRaftActorContext.MockPayload("4")));
         sendMessageToSupport(new MockRaftActorContext.MockReplicatedLogEntry(1,
@@ -312,8 +288,6 @@ public class RaftActorRecoverySupportTest {
 
         sendMessageToSupport(new DeleteEntries(5));
 
-        sendMessageToSupport(new org.opendaylight.controller.cluster.raft.RaftActor.DeleteEntries(5));
-
         assertEquals("Journal log size", 0, context.getReplicatedLog().size());
         assertEquals("Last index", -1, context.getReplicatedLog().lastIndex());
         assertEquals("Last applied", -1, context.getLastApplied());
@@ -326,12 +300,11 @@ public class RaftActorRecoverySupportTest {
 
         sendMessageToSupport(RecoveryCompleted.getInstance(), true);
 
-        verify(mockCohort).getRestoreFromSnapshot();
+        verify(mockCohort).applyRecoverySnapshot(aryEq(new byte[0]));
+        verify(mockCohort, never()).getRestoreFromSnapshot();
         verifyNoMoreInteractions(mockCohort);
 
         verify(mockPersistentProvider).deleteMessages(10L);
-        verify(mockPersistentProvider).deleteSnapshots(any(SnapshotSelectionCriteria.class));
-        verify(mockPersistentProvider).persist(updateElectionTerm(5, "member2"), any(Procedure.class));
     }
 
     static UpdateElectionTerm updateElectionTerm(final long term, final String votedFor) {
index 630ec7f..3145950 100644 (file)
@@ -34,14 +34,12 @@ import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
 import org.opendaylight.controller.cluster.NonPersistentDataProvider;
-import org.opendaylight.controller.cluster.raft.base.messages.ApplyJournalEntries;
 import org.opendaylight.controller.cluster.raft.base.messages.ApplySnapshot;
 import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
 import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshotReply;
 import org.opendaylight.controller.cluster.raft.base.messages.ElectionTimeout;
 import org.opendaylight.controller.cluster.raft.base.messages.InitiateCaptureSnapshot;
 import org.opendaylight.controller.cluster.raft.base.messages.TimeoutNow;
-import org.opendaylight.controller.cluster.raft.base.messages.UpdateElectionTerm;
 import org.opendaylight.controller.cluster.raft.behaviors.AbstractLeader;
 import org.opendaylight.controller.cluster.raft.behaviors.Follower;
 import org.opendaylight.controller.cluster.raft.behaviors.Leader;
@@ -58,8 +56,10 @@ import org.opendaylight.controller.cluster.raft.messages.ServerChangeReply;
 import org.opendaylight.controller.cluster.raft.messages.ServerChangeStatus;
 import org.opendaylight.controller.cluster.raft.messages.ServerRemoved;
 import org.opendaylight.controller.cluster.raft.messages.UnInitializedFollowerSnapshotReply;
+import org.opendaylight.controller.cluster.raft.persisted.ApplyJournalEntries;
 import org.opendaylight.controller.cluster.raft.persisted.ServerConfigurationPayload;
 import org.opendaylight.controller.cluster.raft.persisted.ServerInfo;
+import org.opendaylight.controller.cluster.raft.persisted.UpdateElectionTerm;
 import org.opendaylight.controller.cluster.raft.policy.DisableElectionsRaftPolicy;
 import org.opendaylight.controller.cluster.raft.utils.ForwardMessageToBehaviorActor;
 import org.opendaylight.controller.cluster.raft.utils.InMemoryJournal;
index 3c4a96c..7654648 100644 (file)
@@ -63,16 +63,13 @@ import org.opendaylight.controller.cluster.PersistentDataProvider;
 import org.opendaylight.controller.cluster.notifications.LeaderStateChanged;
 import org.opendaylight.controller.cluster.notifications.RoleChanged;
 import org.opendaylight.controller.cluster.raft.MockRaftActorContext.MockPayload;
-import org.opendaylight.controller.cluster.raft.base.messages.ApplyJournalEntries;
 import org.opendaylight.controller.cluster.raft.base.messages.ApplySnapshot;
 import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
 import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshot;
 import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshotReply;
-import org.opendaylight.controller.cluster.raft.base.messages.DeleteEntries;
 import org.opendaylight.controller.cluster.raft.base.messages.LeaderTransitioning;
 import org.opendaylight.controller.cluster.raft.base.messages.SendHeartBeat;
 import org.opendaylight.controller.cluster.raft.base.messages.SwitchBehavior;
-import org.opendaylight.controller.cluster.raft.base.messages.UpdateElectionTerm;
 import org.opendaylight.controller.cluster.raft.behaviors.Follower;
 import org.opendaylight.controller.cluster.raft.behaviors.Leader;
 import org.opendaylight.controller.cluster.raft.behaviors.RaftActorBehavior;
@@ -80,8 +77,11 @@ import org.opendaylight.controller.cluster.raft.client.messages.GetSnapshot;
 import org.opendaylight.controller.cluster.raft.client.messages.GetSnapshotReply;
 import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
 import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
+import org.opendaylight.controller.cluster.raft.persisted.ApplyJournalEntries;
+import org.opendaylight.controller.cluster.raft.persisted.DeleteEntries;
 import org.opendaylight.controller.cluster.raft.persisted.ServerConfigurationPayload;
 import org.opendaylight.controller.cluster.raft.persisted.ServerInfo;
+import org.opendaylight.controller.cluster.raft.persisted.UpdateElectionTerm;
 import org.opendaylight.controller.cluster.raft.policy.DisableElectionsRaftPolicy;
 import org.opendaylight.controller.cluster.raft.utils.InMemoryJournal;
 import org.opendaylight.controller.cluster.raft.utils.InMemorySnapshotStore;
@@ -312,24 +312,14 @@ public class RaftActorTest extends AbstractActorTest {
         DeleteEntries deleteEntries = new DeleteEntries(1);
         mockRaftActor.handleRecover(deleteEntries);
 
-        org.opendaylight.controller.cluster.raft.RaftActor.DeleteEntries deprecatedDeleteEntries =
-                new org.opendaylight.controller.cluster.raft.RaftActor.DeleteEntries(1);
-        mockRaftActor.handleRecover(deprecatedDeleteEntries);
-
         UpdateElectionTerm updateElectionTerm = new UpdateElectionTerm(5, "member2");
         mockRaftActor.handleRecover(updateElectionTerm);
 
-        org.opendaylight.controller.cluster.raft.RaftActor.UpdateElectionTerm deprecatedUpdateElectionTerm =
-                new org.opendaylight.controller.cluster.raft.RaftActor.UpdateElectionTerm(6, "member3");
-        mockRaftActor.handleRecover(deprecatedUpdateElectionTerm);
-
         verify(mockSupport).handleRecoveryMessage(same(snapshotOffer), any(PersistentDataProvider.class));
         verify(mockSupport).handleRecoveryMessage(same(logEntry), any(PersistentDataProvider.class));
         verify(mockSupport).handleRecoveryMessage(same(applyJournalEntries), any(PersistentDataProvider.class));
         verify(mockSupport).handleRecoveryMessage(same(deleteEntries), any(PersistentDataProvider.class));
-        verify(mockSupport).handleRecoveryMessage(same(deprecatedDeleteEntries), any(PersistentDataProvider.class));
         verify(mockSupport).handleRecoveryMessage(same(updateElectionTerm), any(PersistentDataProvider.class));
-        verify(mockSupport).handleRecoveryMessage(same(deprecatedUpdateElectionTerm), any(PersistentDataProvider.class));
     }
 
     @Test
index a934586..ac46397 100644 (file)
@@ -16,7 +16,7 @@ import com.google.common.collect.Lists;
 import java.util.List;
 import org.junit.Before;
 import org.junit.Test;
-import org.opendaylight.controller.cluster.raft.base.messages.ApplyJournalEntries;
+import org.opendaylight.controller.cluster.raft.persisted.ApplyJournalEntries;
 import org.opendaylight.controller.cluster.raft.utils.InMemoryJournal;
 import org.opendaylight.controller.cluster.raft.utils.InMemorySnapshotStore;
 import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor;
index b264739..b18148d 100644 (file)
@@ -18,10 +18,10 @@ import java.util.Map;
 import org.junit.Before;
 import org.junit.Test;
 import org.opendaylight.controller.cluster.raft.MockRaftActorContext.MockPayload;
-import org.opendaylight.controller.cluster.raft.base.messages.ApplyJournalEntries;
 import org.opendaylight.controller.cluster.raft.base.messages.ApplySnapshot;
 import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshotReply;
 import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
+import org.opendaylight.controller.cluster.raft.persisted.ApplyJournalEntries;
 import org.opendaylight.controller.cluster.raft.utils.InMemoryJournal;
 import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor;
 
index d2fe7dd..1caef35 100644 (file)
@@ -29,8 +29,8 @@ import org.mockito.internal.matchers.Same;
 import org.opendaylight.controller.cluster.DataPersistenceProvider;
 import org.opendaylight.controller.cluster.raft.MockRaftActorContext.MockPayload;
 import org.opendaylight.controller.cluster.raft.MockRaftActorContext.MockReplicatedLogEntry;
-import org.opendaylight.controller.cluster.raft.base.messages.DeleteEntries;
 import org.opendaylight.controller.cluster.raft.behaviors.RaftActorBehavior;
+import org.opendaylight.controller.cluster.raft.persisted.DeleteEntries;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
index c658ef0..c880428 100644 (file)
@@ -13,12 +13,12 @@ import com.google.common.collect.ImmutableMap;
 import java.util.List;
 import org.junit.Test;
 import org.opendaylight.controller.cluster.raft.MockRaftActorContext.MockPayload;
-import org.opendaylight.controller.cluster.raft.base.messages.ApplyJournalEntries;
 import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
 import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshotReply;
-import org.opendaylight.controller.cluster.raft.base.messages.UpdateElectionTerm;
 import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
 import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
+import org.opendaylight.controller.cluster.raft.persisted.ApplyJournalEntries;
+import org.opendaylight.controller.cluster.raft.persisted.UpdateElectionTerm;
 import org.opendaylight.controller.cluster.raft.utils.InMemoryJournal;
 import org.opendaylight.controller.cluster.raft.utils.InMemorySnapshotStore;
 import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor;
index d2ba8ac..747bbe7 100644 (file)
@@ -21,19 +21,19 @@ import javax.annotation.Nullable;
 import org.junit.Assert;
 import org.junit.Test;
 import org.opendaylight.controller.cluster.raft.MockRaftActorContext.MockPayload;
-import org.opendaylight.controller.cluster.raft.base.messages.ApplyJournalEntries;
 import org.opendaylight.controller.cluster.raft.base.messages.ApplySnapshot;
 import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
 import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshot;
-import org.opendaylight.controller.cluster.raft.base.messages.UpdateElectionTerm;
 import org.opendaylight.controller.cluster.raft.behaviors.AbstractLeader;
 import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
 import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
 import org.opendaylight.controller.cluster.raft.messages.InstallSnapshot;
 import org.opendaylight.controller.cluster.raft.messages.InstallSnapshotReply;
 import org.opendaylight.controller.cluster.raft.messages.RequestVoteReply;
+import org.opendaylight.controller.cluster.raft.persisted.ApplyJournalEntries;
 import org.opendaylight.controller.cluster.raft.persisted.ServerConfigurationPayload;
 import org.opendaylight.controller.cluster.raft.persisted.ServerInfo;
+import org.opendaylight.controller.cluster.raft.persisted.UpdateElectionTerm;
 import org.opendaylight.controller.cluster.raft.utils.InMemoryJournal;
 import org.opendaylight.controller.cluster.raft.utils.InMemorySnapshotStore;
 import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor;
index f689172..3338e26 100644 (file)
@@ -8,32 +8,31 @@
 package org.opendaylight.controller.cluster.raft;
 
 import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
+import com.google.common.collect.ImmutableSet;
 import java.util.Arrays;
 import org.apache.commons.lang.SerializationUtils;
 import org.junit.Test;
-import org.opendaylight.controller.cluster.raft.persisted.ServerConfigurationPayload;
-import org.opendaylight.controller.cluster.raft.persisted.ServerInfo;
 
 /**
  * Unit tests for ServerConfigurationPayload.
  *
  * @author Thomas Pantelis
  */
+@Deprecated
 public class ServerConfigurationPayloadTest {
 
     @Test
     public void testSerialization() {
-        ServerConfigurationPayload expected = new ServerConfigurationPayload(Arrays.asList(new ServerInfo("1", true),
-                new ServerInfo("2", false)));
-        ServerConfigurationPayload cloned = (ServerConfigurationPayload) SerializationUtils.clone(expected);
+        ServerConfigurationPayload expected = new ServerConfigurationPayload(Arrays.asList(
+                new ServerConfigurationPayload.ServerInfo("1", true),
+                new ServerConfigurationPayload.ServerInfo("2", false)));
+        org.opendaylight.controller.cluster.raft.persisted.ServerConfigurationPayload cloned =
+                (org.opendaylight.controller.cluster.raft.persisted.ServerConfigurationPayload) SerializationUtils.clone(expected);
 
-        assertEquals("getServerConfig", expected.getServerConfig(), cloned.getServerConfig());
-    }
-
-    @Test
-    public void testSize() {
-        ServerConfigurationPayload expected = new ServerConfigurationPayload(Arrays.asList(new ServerInfo("1", true)));
-        assertTrue(expected.size() > 0);
+        assertEquals("getServerConfig", ImmutableSet.of(
+                new org.opendaylight.controller.cluster.raft.persisted.ServerInfo("1", true),
+                new org.opendaylight.controller.cluster.raft.persisted.ServerInfo("2", false)),
+                ImmutableSet.copyOf(cloned.getServerConfig()));
+        assertEquals("isMigrated", true, cloned.isMigrated());
     }
 }
index 55d2bcc..1fcb248 100644 (file)
@@ -16,15 +16,16 @@ import org.junit.Test;
  *
  * @author Thomas Pantelis
  */
+@Deprecated
 public class DeleteEntriesTest {
 
     @Test
     public void testSerialization() {
-
         DeleteEntries deleteEntries = new DeleteEntries(11);
-
-        DeleteEntries clone = (DeleteEntries) SerializationUtils.clone(deleteEntries);
+        org.opendaylight.controller.cluster.raft.persisted.DeleteEntries clone =
+                (org.opendaylight.controller.cluster.raft.persisted.DeleteEntries) SerializationUtils.clone(deleteEntries);
 
         Assert.assertEquals("getFromIndex", 11, clone.getFromIndex());
+        Assert.assertEquals("isMigrated", true, clone.isMigrated());
     }
 }
index 4887963..f831aae 100644 (file)
@@ -16,16 +16,17 @@ import org.junit.Test;
  *
  * @author Thomas Pantelis
  */
+@Deprecated
 public class UpdateElectionTermTest {
 
     @Test
     public void testSerialization() {
-
-        UpdateElectionTerm deleteEntries = new UpdateElectionTerm(5, "member1");
-
-        UpdateElectionTerm clone = (UpdateElectionTerm) SerializationUtils.clone(deleteEntries);
+        UpdateElectionTerm expected = new UpdateElectionTerm(5, "member1");
+        org.opendaylight.controller.cluster.raft.persisted.UpdateElectionTerm clone =
+                (org.opendaylight.controller.cluster.raft.persisted.UpdateElectionTerm) SerializationUtils.clone(expected);
 
         Assert.assertEquals("getCurrentTerm", 5, clone.getCurrentTerm());
         Assert.assertEquals("getVotedFor", "member1", clone.getVotedFor());
+        Assert.assertEquals("isMigrated", true, clone.isMigrated());
     }
 }
index 0e88236..eb81e51 100644 (file)
@@ -44,7 +44,6 @@ import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
 import org.opendaylight.controller.cluster.raft.ReplicatedLogImplEntry;
 import org.opendaylight.controller.cluster.raft.Snapshot;
 import org.opendaylight.controller.cluster.raft.VotingState;
-import org.opendaylight.controller.cluster.raft.base.messages.ApplyJournalEntries;
 import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
 import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshot;
 import org.opendaylight.controller.cluster.raft.base.messages.ElectionTimeout;
@@ -59,6 +58,7 @@ import org.opendaylight.controller.cluster.raft.messages.InstallSnapshot;
 import org.opendaylight.controller.cluster.raft.messages.InstallSnapshotReply;
 import org.opendaylight.controller.cluster.raft.messages.RaftRPC;
 import org.opendaylight.controller.cluster.raft.messages.RequestVoteReply;
+import org.opendaylight.controller.cluster.raft.persisted.ApplyJournalEntries;
 import org.opendaylight.controller.cluster.raft.policy.DefaultRaftPolicy;
 import org.opendaylight.controller.cluster.raft.policy.RaftPolicy;
 import org.opendaylight.controller.cluster.raft.utils.ForwardMessageToBehaviorActor;
diff --git a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/persisted/ApplyJournalEntriesTest.java b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/persisted/ApplyJournalEntriesTest.java
new file mode 100644 (file)
index 0000000..d0e5cb6
--- /dev/null
@@ -0,0 +1,29 @@
+/*
+ * Copyright (c) 2016 Brocade Communications Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.raft.persisted;
+
+import static org.junit.Assert.assertEquals;
+import org.apache.commons.lang.SerializationUtils;
+import org.junit.Test;
+
+/**
+ * Unit tests for ApplyJournalEntries.
+ *
+ * @author Thomas Pantelis
+ */
+public class ApplyJournalEntriesTest {
+
+    @Test
+    public void testSerialization() {
+        ApplyJournalEntries expected = new ApplyJournalEntries(5);
+        ApplyJournalEntries cloned = (ApplyJournalEntries) SerializationUtils.clone(expected);
+
+        assertEquals("getFromIndex", expected.getToIndex(), cloned.getToIndex());
+        assertEquals("isMigrated", false, cloned.isMigrated());
+    }
+}
diff --git a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/persisted/DeleteEntriesTest.java b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/persisted/DeleteEntriesTest.java
new file mode 100644 (file)
index 0000000..85f644c
--- /dev/null
@@ -0,0 +1,29 @@
+/*
+ * Copyright (c) 2016 Brocade Communications Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.raft.persisted;
+
+import static org.junit.Assert.assertEquals;
+import org.apache.commons.lang.SerializationUtils;
+import org.junit.Test;
+
+/**
+ * Unit tests for DeleteEntries.
+ *
+ * @author Thomas Pantelis
+ */
+public class DeleteEntriesTest {
+
+    @Test
+    public void testSerialization() {
+        DeleteEntries expected = new DeleteEntries(5);
+        DeleteEntries cloned = (DeleteEntries) SerializationUtils.clone(expected);
+
+        assertEquals("getFromIndex", expected.getFromIndex(), cloned.getFromIndex());
+        assertEquals("isMigrated", false, cloned.isMigrated());
+    }
+}
diff --git a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/persisted/ServerConfigurationPayloadTest.java b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/persisted/ServerConfigurationPayloadTest.java
new file mode 100644 (file)
index 0000000..6f1facb
--- /dev/null
@@ -0,0 +1,38 @@
+/*
+ * Copyright (c) 2016 Brocade Communications Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.raft.persisted;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import java.util.Arrays;
+import org.apache.commons.lang.SerializationUtils;
+import org.junit.Test;
+
+/**
+ * Unit tests for ServerConfigurationPayload.
+ *
+ * @author Thomas Pantelis
+ */
+public class ServerConfigurationPayloadTest {
+
+    @Test
+    public void testSerialization() {
+        ServerConfigurationPayload expected = new ServerConfigurationPayload(Arrays.asList(new ServerInfo("1", true),
+                new ServerInfo("2", false)));
+        ServerConfigurationPayload cloned = (ServerConfigurationPayload) SerializationUtils.clone(expected);
+
+        assertEquals("getServerConfig", expected.getServerConfig(), cloned.getServerConfig());
+        assertEquals("isMigrated", false, cloned.isMigrated());
+    }
+
+    @Test
+    public void testSize() {
+        ServerConfigurationPayload expected = new ServerConfigurationPayload(Arrays.asList(new ServerInfo("1", true)));
+        assertTrue(expected.size() > 0);
+    }
+}
\ No newline at end of file
diff --git a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/persisted/UpdateElectionTermTest.java b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/persisted/UpdateElectionTermTest.java
new file mode 100644 (file)
index 0000000..e34aaa0
--- /dev/null
@@ -0,0 +1,30 @@
+/*
+ * Copyright (c) 2016 Brocade Communications Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.raft.persisted;
+
+import static org.junit.Assert.assertEquals;
+import org.apache.commons.lang.SerializationUtils;
+import org.junit.Test;
+
+/**
+ * Unit tests for UpdateElectionTerm.
+ *
+ * @author Thomas Pantelis
+ */
+public class UpdateElectionTermTest {
+
+    @Test
+    public void testSerialization() {
+        UpdateElectionTerm expected = new UpdateElectionTerm(5, "leader");
+        UpdateElectionTerm cloned = (UpdateElectionTerm) SerializationUtils.clone(expected);
+
+        assertEquals("getCurrentTerm", expected.getCurrentTerm(), cloned.getCurrentTerm());
+        assertEquals("getVotedFor", expected.getVotedFor(), cloned.getVotedFor());
+        assertEquals("isMigrated", false, cloned.isMigrated());
+    }
+}
index ed7e9bb..a990b50 100644 (file)
@@ -283,7 +283,7 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
         } else if (payload instanceof DataTreeCandidatePayload) {
             applyRecoveryCandidate(((DataTreeCandidatePayload) payload).getCandidate());
         } else {
-            LOG.warn("{}: ignoring unhandled payload {}", logContext, payload);
+            LOG.debug("{}: ignoring unhandled payload {}", logContext, payload);
         }
     }
 
index 2f7c790..d6faff6 100644 (file)
@@ -58,9 +58,9 @@ import org.opendaylight.controller.cluster.datastore.messages.ReadyLocalTransact
 import org.opendaylight.controller.cluster.datastore.messages.ReadyTransactionReply;
 import org.opendaylight.controller.cluster.datastore.modification.MergeModification;
 import org.opendaylight.controller.cluster.datastore.modification.WriteModification;
-import org.opendaylight.controller.cluster.raft.base.messages.ApplyJournalEntries;
 import org.opendaylight.controller.cluster.raft.client.messages.OnDemandRaftState;
 import org.opendaylight.controller.cluster.raft.client.messages.Shutdown;
+import org.opendaylight.controller.cluster.raft.persisted.ApplyJournalEntries;
 import org.opendaylight.controller.cluster.raft.policy.DisableElectionsRaftPolicy;
 import org.opendaylight.controller.cluster.raft.utils.InMemoryJournal;
 import org.opendaylight.controller.md.cluster.datastore.model.CarsModel;
index 243b2cb..3634507 100644 (file)
@@ -87,7 +87,6 @@ import org.opendaylight.controller.cluster.raft.RaftActorContext;
 import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
 import org.opendaylight.controller.cluster.raft.ReplicatedLogImplEntry;
 import org.opendaylight.controller.cluster.raft.Snapshot;
-import org.opendaylight.controller.cluster.raft.base.messages.ApplyJournalEntries;
 import org.opendaylight.controller.cluster.raft.base.messages.ApplySnapshot;
 import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
 import org.opendaylight.controller.cluster.raft.base.messages.ElectionTimeout;
@@ -99,6 +98,7 @@ import org.opendaylight.controller.cluster.raft.client.messages.GetOnDemandRaftS
 import org.opendaylight.controller.cluster.raft.client.messages.OnDemandRaftState;
 import org.opendaylight.controller.cluster.raft.messages.RequestVote;
 import org.opendaylight.controller.cluster.raft.messages.ServerRemoved;
+import org.opendaylight.controller.cluster.raft.persisted.ApplyJournalEntries;
 import org.opendaylight.controller.cluster.raft.policy.DisableElectionsRaftPolicy;
 import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
 import org.opendaylight.controller.cluster.raft.utils.InMemoryJournal;

©2013 OpenDaylight, A Linux Foundation Collaborative Project. All Rights Reserved.
OpenDaylight is a registered trademark of The OpenDaylight Project, Inc.
Linux Foundation and OpenDaylight are registered trademarks of the Linux Foundation.
Linux is a registered trademark of Linus Torvalds.