From c42a5e91e0dcfc499b33a321ef45c0d310d366cc Mon Sep 17 00:00:00 2001 From: Tom Pantelis Date: Mon, 22 Aug 2016 17:32:59 -0400 Subject: [PATCH] Fix issues when persistence enabled When persistence is dynamically enabled, we start persisting subsequent log entries, which causes issues on restart due to a gap in the journal indexes. We need to persist a snapshot with the current state to avoid this. Also, if persistence is disabled, we still persist ReplicatedLogEntry instances that have a PersistentPayload, of which ServerConfigurationPayload is currently the only one. This can also cause gaps in the persisted journal indexes, which cause issues if persistence is later enabled. To avoid this, we really shouldn't persist ReplicatedLogEntry instances at all if data persistence is disabled, since we don't add them to the in-memory journal on recovery anyway - we just recover and apply the ServerConfigurationPayload. Instead, we should persist just the ServerConfigurationPayload. Change-Id: Ief78d68423b33aac1649220a36d32ff50f493eb7 Signed-off-by: Tom Pantelis --- .../controller/cluster/raft/RaftActor.java | 16 +++++++++---- ...ActorDelegatingPersistentDataProvider.java | 17 ++++++++------ .../raft/RaftActorRecoverySupport.java | 4 +++- ...rDelegatingPersistentDataProviderTest.java | 14 ++++++++--- ...ftActorServerConfigurationSupportTest.java | 23 ++++++++----------- .../controller/cluster/datastore/Shard.java | 6 +---- 6 files changed, 45 insertions(+), 35 deletions(-) diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActor.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActor.java index 93336c8acb..dce9ee8d59 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActor.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActor.java @@ -134,7 +134,7 @@ public abstract 
class RaftActor extends AbstractUntypedPersistentActor { context = new RaftActorContextImpl(this.getSelf(), this.getContext(), id, new ElectionTermImpl(persistentProvider, id, LOG), -1, -1, peerAddresses, - (configParams.isPresent() ? configParams.get(): new DefaultConfigParamsImpl()), + configParams.isPresent() ? configParams.get(): new DefaultConfigParamsImpl(), delegatingPersistenceProvider, LOG); context.setPayloadVersion(payloadVersion); @@ -275,7 +275,7 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor { } else if(message instanceof InitiateCaptureSnapshot) { captureSnapshot(); } else if(message instanceof SwitchBehavior) { - switchBehavior(((SwitchBehavior) message)); + switchBehavior((SwitchBehavior) message); } else if(message instanceof LeaderTransitioning) { onLeaderTransitioning(); } else if(message instanceof Shutdown) { @@ -488,7 +488,7 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor { } if (roleChangeNotifier.isPresent() && - (oldBehavior == null || (oldBehavior.state() != currentBehavior.state()))) { + (oldBehavior == null || oldBehavior.state() != currentBehavior.state())) { roleChangeNotifier.get().tell(new RoleChanged(getId(), oldBehaviorStateName , currentBehavior.state().name()), getSelf()); } @@ -663,9 +663,15 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor { } protected void setPersistence(boolean persistent) { - if(persistent) { + DataPersistenceProvider currentPersistence = persistence(); + if(persistent && (currentPersistence == null || !currentPersistence.isRecoveryApplicable())) { setPersistence(new PersistentDataProvider(this)); - } else { + + if(getCurrentBehavior() != null) { + LOG.info("{}: Persistence has been enabled - capturing snapshot", persistenceId()); + captureSnapshot(); + } + } else if(!persistent && (currentPersistence == null || currentPersistence.isRecoveryApplicable())) { setPersistence(new NonPersistentDataProvider() { /** * The way 
snapshotting works is, diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorDelegatingPersistentDataProvider.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorDelegatingPersistentDataProvider.java index 466609d893..84e2dafafa 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorDelegatingPersistentDataProvider.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorDelegatingPersistentDataProvider.java @@ -12,6 +12,7 @@ import com.google.common.base.Preconditions; import org.opendaylight.controller.cluster.DataPersistenceProvider; import org.opendaylight.controller.cluster.DelegatingPersistentDataProvider; import org.opendaylight.controller.cluster.PersistentDataProvider; +import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload; import org.opendaylight.controller.cluster.raft.protobuff.client.messages.PersistentPayload; /** @@ -30,17 +31,19 @@ class RaftActorDelegatingPersistentDataProvider extends DelegatingPersistentData } @Override - public void persist(T o, Procedure procedure) { + public void persist(final T o, final Procedure procedure) { if(getDelegate().isRecoveryApplicable()) { super.persist(o, procedure); } else { - boolean isPersistentPayload = false; if(o instanceof ReplicatedLogEntry) { - isPersistentPayload = ((ReplicatedLogEntry)o).getData() instanceof PersistentPayload; - } - - if(isPersistentPayload) { - persistentProvider.persist(o, procedure); + Payload payload = ((ReplicatedLogEntry)o).getData(); + if(payload instanceof PersistentPayload) { + // We persist the Payload but not the ReplicatedLogEntry to avoid gaps in the journal indexes + // on recovery if data persistence is later enabled. 
+ persistentProvider.persist(payload, p -> procedure.apply(o)); + } else { + super.persist(o, procedure); + } } else { super.persist(o, procedure); } diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorRecoverySupport.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorRecoverySupport.java index adcd6684bf..15d98b5289 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorRecoverySupport.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorRecoverySupport.java @@ -66,6 +66,8 @@ class RaftActorRecoverySupport { onRecoveredApplyLogEntries(((ApplyJournalEntries) message).getToIndex()); } else if (message instanceof DeleteEntries) { onDeleteEntries((DeleteEntries) message); + } else if (message instanceof ServerConfigurationPayload) { + context.updatePeerIds((ServerConfigurationPayload)message); } else if (message instanceof RecoveryCompleted) { recoveryComplete = true; onRecoveryCompletedMessage(persistentProvider); @@ -260,7 +262,7 @@ class RaftActorRecoverySupport { replicatedLog().getSnapshotTerm(), replicatedLog().size()); if(dataRecoveredWithPersistenceDisabled || - (hasMigratedDataRecovered && !context.getPersistenceProvider().isRecoveryApplicable())) { + hasMigratedDataRecovered && !context.getPersistenceProvider().isRecoveryApplicable()) { if(hasMigratedDataRecovered) { log.info("{}: Saving snapshot after recovery due to migrated messages", context.getId()); } else { diff --git a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/RaftActorDelegatingPersistentDataProviderTest.java b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/RaftActorDelegatingPersistentDataProviderTest.java index 3b45e1184b..c4a21d9663 100644 --- 
a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/RaftActorDelegatingPersistentDataProviderTest.java +++ b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/RaftActorDelegatingPersistentDataProviderTest.java @@ -7,11 +7,14 @@ */ package org.opendaylight.controller.cluster.raft; +import static org.mockito.Matchers.eq; import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.never; import akka.japi.Procedure; import org.junit.Before; import org.junit.Test; +import org.mockito.ArgumentCaptor; import org.mockito.Mock; import org.mockito.MockitoAnnotations; import org.opendaylight.controller.cluster.DataPersistenceProvider; @@ -72,13 +75,18 @@ public class RaftActorDelegatingPersistentDataProviderTest { verify(mockDelegateProvider).persist(OTHER_DATA_OBJECT, mockProcedure); } - @SuppressWarnings("unchecked") + @SuppressWarnings({ "unchecked", "rawtypes" }) @Test - public void testPersistWithPersistenceDisabled() { + public void testPersistWithPersistenceDisabled() throws Exception { doReturn(false).when(mockDelegateProvider).isRecoveryApplicable(); provider.persist(mockPersistentLogEntry, mockProcedure); - verify(mockPersistentProvider).persist(mockPersistentLogEntry, mockProcedure); + + ArgumentCaptor procedureCaptor = ArgumentCaptor.forClass(Procedure.class); + verify(mockPersistentProvider).persist(eq(PERSISTENT_PAYLOAD), procedureCaptor.capture()); + verify(mockDelegateProvider, never()).persist(mockNonPersistentLogEntry, mockProcedure); + procedureCaptor.getValue().apply(PERSISTENT_PAYLOAD); + verify(mockProcedure).apply(mockPersistentLogEntry); provider.persist(mockNonPersistentLogEntry, mockProcedure); verify(mockDelegateProvider).persist(mockNonPersistentLogEntry, mockProcedure); diff --git 
a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/RaftActorServerConfigurationSupportTest.java b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/RaftActorServerConfigurationSupportTest.java index 3145950b80..cba4dd9cdf 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/RaftActorServerConfigurationSupportTest.java +++ b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/RaftActorServerConfigurationSupportTest.java @@ -208,20 +208,15 @@ public class RaftActorServerConfigurationSupportTest extends AbstractActorTest { assertEquals("New follower commit index", 3, newFollowerActorContext.getCommitIndex()); assertEquals("New follower last applied index", 3, newFollowerActorContext.getLastApplied()); - List persistedLogEntries = InMemoryJournal.get(LEADER_ID, ReplicatedLogImplEntry.class); - assertEquals("Leader ReplicatedLogImplEntry entries", 1, persistedLogEntries.size()); - ReplicatedLogImplEntry logEntry = persistedLogEntries.get(0); - assertEquals("Leader ReplicatedLogImplEntry getTerm", 1, logEntry.getTerm()); - assertEquals("Leader ReplicatedLogImplEntry getIndex", 3, logEntry.getIndex()); - assertEquals("Leader ReplicatedLogImplEntry getData", ServerConfigurationPayload.class, logEntry.getData().getClass()); - - persistedLogEntries = InMemoryJournal.get(NEW_SERVER_ID, ReplicatedLogImplEntry.class); - assertEquals("New follower ReplicatedLogImplEntry entries", 1, persistedLogEntries.size()); - logEntry = persistedLogEntries.get(0); - assertEquals("New follower ReplicatedLogImplEntry getTerm", 1, logEntry.getTerm()); - assertEquals("New follower ReplicatedLogImplEntry getIndex", 3, logEntry.getIndex()); - assertEquals("New follower ReplicatedLogImplEntry getData", ServerConfigurationPayload.class, - logEntry.getData().getClass()); + assertEquals("Leader persisted ReplicatedLogImplEntry entries", 0, 
+ InMemoryJournal.get(LEADER_ID, ReplicatedLogImplEntry.class).size()); + assertEquals("Leader persisted ServerConfigurationPayload entries", 1, + InMemoryJournal.get(LEADER_ID, ServerConfigurationPayload.class).size()); + + assertEquals("New follower persisted ReplicatedLogImplEntry entries", 0, + InMemoryJournal.get(NEW_SERVER_ID, ReplicatedLogImplEntry.class).size()); + assertEquals("New follower persisted ServerConfigurationPayload entries", 1, + InMemoryJournal.get(NEW_SERVER_ID, ServerConfigurationPayload.class).size()); LOG.info("testAddServerWithExistingFollower ending"); } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java index 1b12462a4a..c3f76530bc 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java @@ -312,11 +312,7 @@ public class Shard extends RaftActor { setTransactionCommitTimeout(); - if (datastoreContext.isPersistent() && !persistence().isRecoveryApplicable()) { - setPersistence(true); - } else if (!datastoreContext.isPersistent() && persistence().isRecoveryApplicable()) { - setPersistence(false); - } + setPersistence(datastoreContext.isPersistent()); updateConfigParams(datastoreContext.getShardRaftConfig()); } -- 2.36.6