Fix issues when persistence enabled 21/44521/3
authorTom Pantelis <tpanteli@brocade.com>
Mon, 22 Aug 2016 21:32:59 +0000 (17:32 -0400)
committerTom Pantelis <tpanteli@brocade.com>
Tue, 23 Aug 2016 19:23:51 +0000 (19:23 +0000)
When persistence is dynamically enabled, we start persisting subsequent log
entries which causes issues on restart due to a gap in journal indexes. We need to
persist a snapshot with the current state to avoid this.

Also if persistence is disabled, we still persist ReplicatedLogEntry instances that
have a PersistentPayload, of which ServerConfigurationPayload is currently the only one.
This also can cause gaps in the persisted journal indexes which cause issues if
persistence is later enabled. To avoid this, we really shouldn't persist
ReplicatedLogEntry instances at all if data persistence is disabled since we don't
add them to the in-memory journal on recovery anyway - we just recover and apply the
ServerConfigurationPayload. Instead we should persist just the
ServerConfigurationPayload.

Change-Id: Ief78d68423b33aac1649220a36d32ff50f493eb7
Signed-off-by: Tom Pantelis <tpanteli@brocade.com>
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActor.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorDelegatingPersistentDataProvider.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorRecoverySupport.java
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/RaftActorDelegatingPersistentDataProviderTest.java
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/RaftActorServerConfigurationSupportTest.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java

index 93336c8..dce9ee8 100644 (file)
@@ -134,7 +134,7 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
         context = new RaftActorContextImpl(this.getSelf(),
             this.getContext(), id, new ElectionTermImpl(persistentProvider, id, LOG),
             -1, -1, peerAddresses,
-            (configParams.isPresent() ? configParams.get(): new DefaultConfigParamsImpl()),
+            configParams.isPresent() ? configParams.get(): new DefaultConfigParamsImpl(),
             delegatingPersistenceProvider, LOG);
 
         context.setPayloadVersion(payloadVersion);
@@ -275,7 +275,7 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
         } else if(message instanceof InitiateCaptureSnapshot) {
             captureSnapshot();
         } else if(message instanceof SwitchBehavior) {
-            switchBehavior(((SwitchBehavior) message));
+            switchBehavior((SwitchBehavior) message);
         } else if(message instanceof LeaderTransitioning) {
             onLeaderTransitioning();
         } else if(message instanceof Shutdown) {
@@ -488,7 +488,7 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
         }
 
         if (roleChangeNotifier.isPresent() &&
-                (oldBehavior == null || (oldBehavior.state() != currentBehavior.state()))) {
+                (oldBehavior == null || oldBehavior.state() != currentBehavior.state())) {
             roleChangeNotifier.get().tell(new RoleChanged(getId(), oldBehaviorStateName ,
                     currentBehavior.state().name()), getSelf());
         }
@@ -663,9 +663,15 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
     }
 
     protected void setPersistence(boolean persistent) {
-        if(persistent) {
+        DataPersistenceProvider currentPersistence = persistence();
+        if(persistent && (currentPersistence == null || !currentPersistence.isRecoveryApplicable())) {
             setPersistence(new PersistentDataProvider(this));
-        } else {
+
+            if(getCurrentBehavior() != null) {
+                LOG.info("{}: Persistence has been enabled - capturing snapshot", persistenceId());
+                captureSnapshot();
+            }
+        } else if(!persistent && (currentPersistence == null || currentPersistence.isRecoveryApplicable())) {
             setPersistence(new NonPersistentDataProvider() {
                 /**
                  * The way snapshotting works is,
index 466609d..84e2daf 100644 (file)
@@ -12,6 +12,7 @@ import com.google.common.base.Preconditions;
 import org.opendaylight.controller.cluster.DataPersistenceProvider;
 import org.opendaylight.controller.cluster.DelegatingPersistentDataProvider;
 import org.opendaylight.controller.cluster.PersistentDataProvider;
+import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
 import org.opendaylight.controller.cluster.raft.protobuff.client.messages.PersistentPayload;
 
 /**
@@ -30,17 +31,19 @@ class RaftActorDelegatingPersistentDataProvider extends DelegatingPersistentData
     }
 
     @Override
-    public <T> void persist(T o, Procedure<T> procedure) {
+    public <T> void persist(final T o, final Procedure<T> procedure) {
         if(getDelegate().isRecoveryApplicable()) {
             super.persist(o, procedure);
         } else {
-            boolean isPersistentPayload = false;
             if(o instanceof ReplicatedLogEntry) {
-                isPersistentPayload = ((ReplicatedLogEntry)o).getData() instanceof PersistentPayload;
-            }
-
-            if(isPersistentPayload) {
-                persistentProvider.persist(o, procedure);
+                Payload payload = ((ReplicatedLogEntry)o).getData();
+                if(payload instanceof PersistentPayload) {
+                    // We persist the Payload but not the ReplicatedLogEntry to avoid gaps in the journal indexes
+                    // on recovery if data persistence is later enabled.
+                    persistentProvider.persist(payload, p -> procedure.apply(o));
+                } else {
+                    super.persist(o, procedure);
+                }
             } else {
                 super.persist(o, procedure);
             }
index adcd668..15d98b5 100644 (file)
@@ -66,6 +66,8 @@ class RaftActorRecoverySupport {
             onRecoveredApplyLogEntries(((ApplyJournalEntries) message).getToIndex());
         } else if (message instanceof DeleteEntries) {
             onDeleteEntries((DeleteEntries) message);
+        } else if (message instanceof ServerConfigurationPayload) {
+            context.updatePeerIds((ServerConfigurationPayload)message);
         } else if (message instanceof RecoveryCompleted) {
             recoveryComplete = true;
             onRecoveryCompletedMessage(persistentProvider);
@@ -260,7 +262,7 @@ class RaftActorRecoverySupport {
                  replicatedLog().getSnapshotTerm(), replicatedLog().size());
 
         if(dataRecoveredWithPersistenceDisabled ||
-                (hasMigratedDataRecovered && !context.getPersistenceProvider().isRecoveryApplicable())) {
+                hasMigratedDataRecovered && !context.getPersistenceProvider().isRecoveryApplicable()) {
             if(hasMigratedDataRecovered) {
                 log.info("{}: Saving snapshot after recovery due to migrated messages", context.getId());
             } else {
index 3b45e11..c4a21d9 100644 (file)
@@ -7,11 +7,14 @@
  */
 package org.opendaylight.controller.cluster.raft;
 
+import static org.mockito.Matchers.eq;
 import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.never;
 import akka.japi.Procedure;
 import org.junit.Before;
 import org.junit.Test;
+import org.mockito.ArgumentCaptor;
 import org.mockito.Mock;
 import org.mockito.MockitoAnnotations;
 import org.opendaylight.controller.cluster.DataPersistenceProvider;
@@ -72,13 +75,18 @@ public class RaftActorDelegatingPersistentDataProviderTest {
         verify(mockDelegateProvider).persist(OTHER_DATA_OBJECT, mockProcedure);
     }
 
-    @SuppressWarnings("unchecked")
+    @SuppressWarnings({ "unchecked", "rawtypes" })
     @Test
-    public void testPersistWithPersistenceDisabled() {
+    public void testPersistWithPersistenceDisabled() throws Exception {
         doReturn(false).when(mockDelegateProvider).isRecoveryApplicable();
 
         provider.persist(mockPersistentLogEntry, mockProcedure);
-        verify(mockPersistentProvider).persist(mockPersistentLogEntry, mockProcedure);
+
+        ArgumentCaptor<Procedure> procedureCaptor = ArgumentCaptor.forClass(Procedure.class);
+        verify(mockPersistentProvider).persist(eq(PERSISTENT_PAYLOAD), procedureCaptor.capture());
+        verify(mockDelegateProvider, never()).persist(mockNonPersistentLogEntry, mockProcedure);
+        procedureCaptor.getValue().apply(PERSISTENT_PAYLOAD);
+        verify(mockProcedure).apply(mockPersistentLogEntry);
 
         provider.persist(mockNonPersistentLogEntry, mockProcedure);
         verify(mockDelegateProvider).persist(mockNonPersistentLogEntry, mockProcedure);
index 3145950..cba4dd9 100644 (file)
@@ -208,20 +208,15 @@ public class RaftActorServerConfigurationSupportTest extends AbstractActorTest {
         assertEquals("New follower commit index", 3, newFollowerActorContext.getCommitIndex());
         assertEquals("New follower last applied index", 3, newFollowerActorContext.getLastApplied());
 
-        List<ReplicatedLogImplEntry> persistedLogEntries = InMemoryJournal.get(LEADER_ID, ReplicatedLogImplEntry.class);
-        assertEquals("Leader ReplicatedLogImplEntry entries", 1, persistedLogEntries.size());
-        ReplicatedLogImplEntry logEntry = persistedLogEntries.get(0);
-        assertEquals("Leader ReplicatedLogImplEntry getTerm", 1, logEntry.getTerm());
-        assertEquals("Leader ReplicatedLogImplEntry getIndex", 3, logEntry.getIndex());
-        assertEquals("Leader ReplicatedLogImplEntry getData", ServerConfigurationPayload.class, logEntry.getData().getClass());
-
-        persistedLogEntries = InMemoryJournal.get(NEW_SERVER_ID, ReplicatedLogImplEntry.class);
-        assertEquals("New follower ReplicatedLogImplEntry entries", 1, persistedLogEntries.size());
-        logEntry = persistedLogEntries.get(0);
-        assertEquals("New follower ReplicatedLogImplEntry getTerm", 1, logEntry.getTerm());
-        assertEquals("New follower ReplicatedLogImplEntry getIndex", 3, logEntry.getIndex());
-        assertEquals("New follower ReplicatedLogImplEntry getData", ServerConfigurationPayload.class,
-                logEntry.getData().getClass());
+        assertEquals("Leader persisted ReplicatedLogImplEntry entries", 0,
+                InMemoryJournal.get(LEADER_ID, ReplicatedLogImplEntry.class).size());
+        assertEquals("Leader persisted ServerConfigurationPayload entries", 1,
+                InMemoryJournal.get(LEADER_ID, ServerConfigurationPayload.class).size());
+
+        assertEquals("New follower persisted ReplicatedLogImplEntry entries", 0,
+                InMemoryJournal.get(NEW_SERVER_ID, ReplicatedLogImplEntry.class).size());
+        assertEquals("New follower persisted ServerConfigurationPayload entries", 1,
+                InMemoryJournal.get(NEW_SERVER_ID, ServerConfigurationPayload.class).size());
 
         LOG.info("testAddServerWithExistingFollower ending");
     }
index 1b12462..c3f7653 100644 (file)
@@ -312,11 +312,7 @@ public class Shard extends RaftActor {
 
         setTransactionCommitTimeout();
 
-        if (datastoreContext.isPersistent() && !persistence().isRecoveryApplicable()) {
-            setPersistence(true);
-        } else if (!datastoreContext.isPersistent() && persistence().isRecoveryApplicable()) {
-            setPersistence(false);
-        }
+        setPersistence(datastoreContext.isPersistent());
 
         updateConfigParams(datastoreContext.getShardRaftConfig());
     }

©2013 OpenDaylight, A Linux Foundation Collaborative Project. All Rights Reserved.
OpenDaylight is a registered trademark of The OpenDaylight Project, Inc.
Linux Foundation and OpenDaylight are registered trademarks of the Linux Foundation.
Linux is a registered trademark of Linus Torvalds.