Fix issues when persistence enabled 21/44521/3
authorTom Pantelis <tpanteli@brocade.com>
Mon, 22 Aug 2016 21:32:59 +0000 (17:32 -0400)
committerTom Pantelis <tpanteli@brocade.com>
Tue, 23 Aug 2016 19:23:51 +0000 (19:23 +0000)
When persistence is dynamically enabled, we start persisting subsequent log
entries which causes issues on restart due to a gap in journal indexes. We need to
persist a snapshot with the current state to avoid this.

Also if persistence is disabled, we still persist ReplicatedLogEntry instances that
have a PersistentPayload, of which ServerConfigurationPayload is currently the only one.
This also can cause gaps in the persisted journal indexes which cause issues if
persistence is later enabled. To avoid this, we really shouldn't persist
ReplicatedLogEntry instances at all if data persistence is disabled since we don't
add them to the in-memory journal on recovery anyway - we just recover and apply the
ServerConfigurationPayload. Instead we should persist just the
ServerConfigurationPayload.

Change-Id: Ief78d68423b33aac1649220a36d32ff50f493eb7
Signed-off-by: Tom Pantelis <tpanteli@brocade.com>
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActor.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorDelegatingPersistentDataProvider.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorRecoverySupport.java
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/RaftActorDelegatingPersistentDataProviderTest.java
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/RaftActorServerConfigurationSupportTest.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java

index 93336c8acbecd4ec52d0b9ea39e1f954eba94363..dce9ee8d596dccf6bc46b0f8b98bf4e5365570db 100644 (file)
@@ -134,7 +134,7 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
         context = new RaftActorContextImpl(this.getSelf(),
             this.getContext(), id, new ElectionTermImpl(persistentProvider, id, LOG),
             -1, -1, peerAddresses,
         context = new RaftActorContextImpl(this.getSelf(),
             this.getContext(), id, new ElectionTermImpl(persistentProvider, id, LOG),
             -1, -1, peerAddresses,
-            (configParams.isPresent() ? configParams.get(): new DefaultConfigParamsImpl()),
+            configParams.isPresent() ? configParams.get(): new DefaultConfigParamsImpl(),
             delegatingPersistenceProvider, LOG);
 
         context.setPayloadVersion(payloadVersion);
             delegatingPersistenceProvider, LOG);
 
         context.setPayloadVersion(payloadVersion);
@@ -275,7 +275,7 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
         } else if(message instanceof InitiateCaptureSnapshot) {
             captureSnapshot();
         } else if(message instanceof SwitchBehavior) {
         } else if(message instanceof InitiateCaptureSnapshot) {
             captureSnapshot();
         } else if(message instanceof SwitchBehavior) {
-            switchBehavior(((SwitchBehavior) message));
+            switchBehavior((SwitchBehavior) message);
         } else if(message instanceof LeaderTransitioning) {
             onLeaderTransitioning();
         } else if(message instanceof Shutdown) {
         } else if(message instanceof LeaderTransitioning) {
             onLeaderTransitioning();
         } else if(message instanceof Shutdown) {
@@ -488,7 +488,7 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
         }
 
         if (roleChangeNotifier.isPresent() &&
         }
 
         if (roleChangeNotifier.isPresent() &&
-                (oldBehavior == null || (oldBehavior.state() != currentBehavior.state()))) {
+                (oldBehavior == null || oldBehavior.state() != currentBehavior.state())) {
             roleChangeNotifier.get().tell(new RoleChanged(getId(), oldBehaviorStateName ,
                     currentBehavior.state().name()), getSelf());
         }
             roleChangeNotifier.get().tell(new RoleChanged(getId(), oldBehaviorStateName ,
                     currentBehavior.state().name()), getSelf());
         }
@@ -663,9 +663,15 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
     }
 
     protected void setPersistence(boolean persistent) {
     }
 
     protected void setPersistence(boolean persistent) {
-        if(persistent) {
+        DataPersistenceProvider currentPersistence = persistence();
+        if(persistent && (currentPersistence == null || !currentPersistence.isRecoveryApplicable())) {
             setPersistence(new PersistentDataProvider(this));
             setPersistence(new PersistentDataProvider(this));
-        } else {
+
+            if(getCurrentBehavior() != null) {
+                LOG.info("{}: Persistence has been enabled - capturing snapshot", persistenceId());
+                captureSnapshot();
+            }
+        } else if(!persistent && (currentPersistence == null || currentPersistence.isRecoveryApplicable())) {
             setPersistence(new NonPersistentDataProvider() {
                 /**
                  * The way snapshotting works is,
             setPersistence(new NonPersistentDataProvider() {
                 /**
                  * The way snapshotting works is,
index 466609d8938c7c9674af0d3954fb6b9e7fa6c7f3..84e2dafafae8ac78bf94cc539af89a3675662dd3 100644 (file)
@@ -12,6 +12,7 @@ import com.google.common.base.Preconditions;
 import org.opendaylight.controller.cluster.DataPersistenceProvider;
 import org.opendaylight.controller.cluster.DelegatingPersistentDataProvider;
 import org.opendaylight.controller.cluster.PersistentDataProvider;
 import org.opendaylight.controller.cluster.DataPersistenceProvider;
 import org.opendaylight.controller.cluster.DelegatingPersistentDataProvider;
 import org.opendaylight.controller.cluster.PersistentDataProvider;
+import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
 import org.opendaylight.controller.cluster.raft.protobuff.client.messages.PersistentPayload;
 
 /**
 import org.opendaylight.controller.cluster.raft.protobuff.client.messages.PersistentPayload;
 
 /**
@@ -30,17 +31,19 @@ class RaftActorDelegatingPersistentDataProvider extends DelegatingPersistentData
     }
 
     @Override
     }
 
     @Override
-    public <T> void persist(T o, Procedure<T> procedure) {
+    public <T> void persist(final T o, final Procedure<T> procedure) {
         if(getDelegate().isRecoveryApplicable()) {
             super.persist(o, procedure);
         } else {
         if(getDelegate().isRecoveryApplicable()) {
             super.persist(o, procedure);
         } else {
-            boolean isPersistentPayload = false;
             if(o instanceof ReplicatedLogEntry) {
             if(o instanceof ReplicatedLogEntry) {
-                isPersistentPayload = ((ReplicatedLogEntry)o).getData() instanceof PersistentPayload;
-            }
-
-            if(isPersistentPayload) {
-                persistentProvider.persist(o, procedure);
+                Payload payload = ((ReplicatedLogEntry)o).getData();
+                if(payload instanceof PersistentPayload) {
+                    // We persist the Payload but not the ReplicatedLogEntry to avoid gaps in the journal indexes
+                    // on recovery if data persistence is later enabled.
+                    persistentProvider.persist(payload, p -> procedure.apply(o));
+                } else {
+                    super.persist(o, procedure);
+                }
             } else {
                 super.persist(o, procedure);
             }
             } else {
                 super.persist(o, procedure);
             }
index adcd6684bf78892d105634df0b0fe55ffee73227..15d98b5289d1db98050e1f956dbf8a4859830d4e 100644 (file)
@@ -66,6 +66,8 @@ class RaftActorRecoverySupport {
             onRecoveredApplyLogEntries(((ApplyJournalEntries) message).getToIndex());
         } else if (message instanceof DeleteEntries) {
             onDeleteEntries((DeleteEntries) message);
             onRecoveredApplyLogEntries(((ApplyJournalEntries) message).getToIndex());
         } else if (message instanceof DeleteEntries) {
             onDeleteEntries((DeleteEntries) message);
+        } else if (message instanceof ServerConfigurationPayload) {
+            context.updatePeerIds((ServerConfigurationPayload)message);
         } else if (message instanceof RecoveryCompleted) {
             recoveryComplete = true;
             onRecoveryCompletedMessage(persistentProvider);
         } else if (message instanceof RecoveryCompleted) {
             recoveryComplete = true;
             onRecoveryCompletedMessage(persistentProvider);
@@ -260,7 +262,7 @@ class RaftActorRecoverySupport {
                  replicatedLog().getSnapshotTerm(), replicatedLog().size());
 
         if(dataRecoveredWithPersistenceDisabled ||
                  replicatedLog().getSnapshotTerm(), replicatedLog().size());
 
         if(dataRecoveredWithPersistenceDisabled ||
-                (hasMigratedDataRecovered && !context.getPersistenceProvider().isRecoveryApplicable())) {
+                hasMigratedDataRecovered && !context.getPersistenceProvider().isRecoveryApplicable()) {
             if(hasMigratedDataRecovered) {
                 log.info("{}: Saving snapshot after recovery due to migrated messages", context.getId());
             } else {
             if(hasMigratedDataRecovered) {
                 log.info("{}: Saving snapshot after recovery due to migrated messages", context.getId());
             } else {
index 3b45e1184b644605927e84b192984b143bfa89dd..c4a21d9663da77367c4cc46838319839285a1d67 100644 (file)
@@ -7,11 +7,14 @@
  */
 package org.opendaylight.controller.cluster.raft;
 
  */
 package org.opendaylight.controller.cluster.raft;
 
+import static org.mockito.Matchers.eq;
 import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.never;
 import akka.japi.Procedure;
 import org.junit.Before;
 import org.junit.Test;
 import akka.japi.Procedure;
 import org.junit.Before;
 import org.junit.Test;
+import org.mockito.ArgumentCaptor;
 import org.mockito.Mock;
 import org.mockito.MockitoAnnotations;
 import org.opendaylight.controller.cluster.DataPersistenceProvider;
 import org.mockito.Mock;
 import org.mockito.MockitoAnnotations;
 import org.opendaylight.controller.cluster.DataPersistenceProvider;
@@ -72,13 +75,18 @@ public class RaftActorDelegatingPersistentDataProviderTest {
         verify(mockDelegateProvider).persist(OTHER_DATA_OBJECT, mockProcedure);
     }
 
         verify(mockDelegateProvider).persist(OTHER_DATA_OBJECT, mockProcedure);
     }
 
-    @SuppressWarnings("unchecked")
+    @SuppressWarnings({ "unchecked", "rawtypes" })
     @Test
     @Test
-    public void testPersistWithPersistenceDisabled() {
+    public void testPersistWithPersistenceDisabled() throws Exception {
         doReturn(false).when(mockDelegateProvider).isRecoveryApplicable();
 
         provider.persist(mockPersistentLogEntry, mockProcedure);
         doReturn(false).when(mockDelegateProvider).isRecoveryApplicable();
 
         provider.persist(mockPersistentLogEntry, mockProcedure);
-        verify(mockPersistentProvider).persist(mockPersistentLogEntry, mockProcedure);
+
+        ArgumentCaptor<Procedure> procedureCaptor = ArgumentCaptor.forClass(Procedure.class);
+        verify(mockPersistentProvider).persist(eq(PERSISTENT_PAYLOAD), procedureCaptor.capture());
+        verify(mockDelegateProvider, never()).persist(mockNonPersistentLogEntry, mockProcedure);
+        procedureCaptor.getValue().apply(PERSISTENT_PAYLOAD);
+        verify(mockProcedure).apply(mockPersistentLogEntry);
 
         provider.persist(mockNonPersistentLogEntry, mockProcedure);
         verify(mockDelegateProvider).persist(mockNonPersistentLogEntry, mockProcedure);
 
         provider.persist(mockNonPersistentLogEntry, mockProcedure);
         verify(mockDelegateProvider).persist(mockNonPersistentLogEntry, mockProcedure);
index 3145950b80ed5ca5af4b4cac2bd110d83d9fa407..cba4dd9cdf9c627e9aad499f1ed67b80cecf08b6 100644 (file)
@@ -208,20 +208,15 @@ public class RaftActorServerConfigurationSupportTest extends AbstractActorTest {
         assertEquals("New follower commit index", 3, newFollowerActorContext.getCommitIndex());
         assertEquals("New follower last applied index", 3, newFollowerActorContext.getLastApplied());
 
         assertEquals("New follower commit index", 3, newFollowerActorContext.getCommitIndex());
         assertEquals("New follower last applied index", 3, newFollowerActorContext.getLastApplied());
 
-        List<ReplicatedLogImplEntry> persistedLogEntries = InMemoryJournal.get(LEADER_ID, ReplicatedLogImplEntry.class);
-        assertEquals("Leader ReplicatedLogImplEntry entries", 1, persistedLogEntries.size());
-        ReplicatedLogImplEntry logEntry = persistedLogEntries.get(0);
-        assertEquals("Leader ReplicatedLogImplEntry getTerm", 1, logEntry.getTerm());
-        assertEquals("Leader ReplicatedLogImplEntry getIndex", 3, logEntry.getIndex());
-        assertEquals("Leader ReplicatedLogImplEntry getData", ServerConfigurationPayload.class, logEntry.getData().getClass());
-
-        persistedLogEntries = InMemoryJournal.get(NEW_SERVER_ID, ReplicatedLogImplEntry.class);
-        assertEquals("New follower ReplicatedLogImplEntry entries", 1, persistedLogEntries.size());
-        logEntry = persistedLogEntries.get(0);
-        assertEquals("New follower ReplicatedLogImplEntry getTerm", 1, logEntry.getTerm());
-        assertEquals("New follower ReplicatedLogImplEntry getIndex", 3, logEntry.getIndex());
-        assertEquals("New follower ReplicatedLogImplEntry getData", ServerConfigurationPayload.class,
-                logEntry.getData().getClass());
+        assertEquals("Leader persisted ReplicatedLogImplEntry entries", 0,
+                InMemoryJournal.get(LEADER_ID, ReplicatedLogImplEntry.class).size());
+        assertEquals("Leader persisted ServerConfigurationPayload entries", 1,
+                InMemoryJournal.get(LEADER_ID, ServerConfigurationPayload.class).size());
+
+        assertEquals("New follower persisted ReplicatedLogImplEntry entries", 0,
+                InMemoryJournal.get(NEW_SERVER_ID, ReplicatedLogImplEntry.class).size());
+        assertEquals("New follower persisted ServerConfigurationPayload entries", 1,
+                InMemoryJournal.get(NEW_SERVER_ID, ServerConfigurationPayload.class).size());
 
         LOG.info("testAddServerWithExistingFollower ending");
     }
 
         LOG.info("testAddServerWithExistingFollower ending");
     }
index 1b12462a4ab8e4125788f45de3e76f9e1417f6a9..c3f76530bccfd8924fb245adb70216de6e5a7e5b 100644 (file)
@@ -312,11 +312,7 @@ public class Shard extends RaftActor {
 
         setTransactionCommitTimeout();
 
 
         setTransactionCommitTimeout();
 
-        if (datastoreContext.isPersistent() && !persistence().isRecoveryApplicable()) {
-            setPersistence(true);
-        } else if (!datastoreContext.isPersistent() && persistence().isRecoveryApplicable()) {
-            setPersistence(false);
-        }
+        setPersistence(datastoreContext.isPersistent());
 
         updateConfigParams(datastoreContext.getShardRaftConfig());
     }
 
         updateConfigParams(datastoreContext.getShardRaftConfig());
     }