When persistence is dynamically enabled, we start persisting subsequent log
entries which causes issues on restart due to a gap in journal indexes. We need to
persist a snapshot with the current state to avoid this.
Also if persistence is disabled, we still persist ReplicatedLogEntry instances that
have a PersistentPayload, of which ServerConfigurationPayload is currently the only one.
This also can cause gaps in the persisted journal indexes which cause issues if
persistence is later enabled. To avoid this, we really shouldn't persist
ReplicatedLogEntry instances at all if data persistence is disabled since we don't
add them to the in-memory journal on recovery anyway - we just recover and apply the
ServerConfigurationPayload. Instead we should persist just the
ServerConfigurationPayload.
Change-Id: Ief78d68423b33aac1649220a36d32ff50f493eb7
Signed-off-by: Tom Pantelis <tpanteli@brocade.com>
(cherry picked from commit
c42a5e91e0dcfc499b33a321ef45c0d310d366cc)
context = new RaftActorContextImpl(this.getSelf(),
this.getContext(), id, new ElectionTermImpl(persistentProvider, id, LOG),
-1, -1, peerAddresses,
- (configParams.isPresent() ? configParams.get(): new DefaultConfigParamsImpl()),
+ configParams.isPresent() ? configParams.get(): new DefaultConfigParamsImpl(),
delegatingPersistenceProvider, LOG);
context.setPayloadVersion(payloadVersion);
} else if(message instanceof InitiateCaptureSnapshot) {
captureSnapshot();
} else if(message instanceof SwitchBehavior) {
- switchBehavior(((SwitchBehavior) message));
+ switchBehavior((SwitchBehavior) message);
} else if(message instanceof LeaderTransitioning) {
onLeaderTransitioning();
} else if(message instanceof Shutdown) {
}
if (roleChangeNotifier.isPresent() &&
- (oldBehavior == null || (oldBehavior.state() != currentBehavior.state()))) {
+ (oldBehavior == null || oldBehavior.state() != currentBehavior.state())) {
roleChangeNotifier.get().tell(new RoleChanged(getId(), oldBehaviorStateName ,
currentBehavior.state().name()), getSelf());
}
}
protected void setPersistence(boolean persistent) {
- if(persistent) {
+ DataPersistenceProvider currentPersistence = persistence();
+ if(persistent && (currentPersistence == null || !currentPersistence.isRecoveryApplicable())) {
setPersistence(new PersistentDataProvider(this));
- } else {
+
+ if(getCurrentBehavior() != null) {
+ LOG.info("{}: Persistence has been enabled - capturing snapshot", persistenceId());
+ captureSnapshot();
+ }
+ } else if(!persistent && (currentPersistence == null || currentPersistence.isRecoveryApplicable())) {
setPersistence(new NonPersistentDataProvider() {
/**
* The way snapshotting works is,
import org.opendaylight.controller.cluster.DataPersistenceProvider;
import org.opendaylight.controller.cluster.DelegatingPersistentDataProvider;
import org.opendaylight.controller.cluster.PersistentDataProvider;
+import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
import org.opendaylight.controller.cluster.raft.protobuff.client.messages.PersistentPayload;
/**
}
@Override
- public <T> void persist(T o, Procedure<T> procedure) {
+ public <T> void persist(final T o, final Procedure<T> procedure) {
if(getDelegate().isRecoveryApplicable()) {
super.persist(o, procedure);
} else {
- boolean isPersistentPayload = false;
if(o instanceof ReplicatedLogEntry) {
- isPersistentPayload = ((ReplicatedLogEntry)o).getData() instanceof PersistentPayload;
- }
-
- if(isPersistentPayload) {
- persistentProvider.persist(o, procedure);
+ Payload payload = ((ReplicatedLogEntry)o).getData();
+ if(payload instanceof PersistentPayload) {
+ // We persist the Payload but not the ReplicatedLogEntry to avoid gaps in the journal indexes
+ // on recovery if data persistence is later enabled.
+ persistentProvider.persist(payload, p -> procedure.apply(o));
+ } else {
+ super.persist(o, procedure);
+ }
} else {
super.persist(o, procedure);
}
onRecoveredApplyLogEntries(((ApplyJournalEntries) message).getToIndex());
} else if (message instanceof DeleteEntries) {
onDeleteEntries((DeleteEntries) message);
+ } else if (message instanceof ServerConfigurationPayload) {
+ context.updatePeerIds((ServerConfigurationPayload)message);
} else if (message instanceof RecoveryCompleted) {
recoveryComplete = true;
onRecoveryCompletedMessage(persistentProvider);
replicatedLog().getSnapshotTerm(), replicatedLog().size());
if(dataRecoveredWithPersistenceDisabled ||
- (hasMigratedDataRecovered && !context.getPersistenceProvider().isRecoveryApplicable())) {
+ hasMigratedDataRecovered && !context.getPersistenceProvider().isRecoveryApplicable()) {
if(hasMigratedDataRecovered) {
log.info("{}: Saving snapshot after recovery due to migrated messages", context.getId());
} else {
*/
package org.opendaylight.controller.cluster.raft;
+import static org.mockito.Matchers.eq;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.never;
import akka.japi.Procedure;
import org.junit.Before;
import org.junit.Test;
+import org.mockito.ArgumentCaptor;
import org.mockito.Mock;
import org.mockito.MockitoAnnotations;
import org.opendaylight.controller.cluster.DataPersistenceProvider;
verify(mockDelegateProvider).persist(OTHER_DATA_OBJECT, mockProcedure);
}
- @SuppressWarnings("unchecked")
+ @SuppressWarnings({ "unchecked", "rawtypes" })
@Test
- public void testPersistWithPersistenceDisabled() {
+ public void testPersistWithPersistenceDisabled() throws Exception {
doReturn(false).when(mockDelegateProvider).isRecoveryApplicable();
provider.persist(mockPersistentLogEntry, mockProcedure);
- verify(mockPersistentProvider).persist(mockPersistentLogEntry, mockProcedure);
+
+ ArgumentCaptor<Procedure> procedureCaptor = ArgumentCaptor.forClass(Procedure.class);
+ verify(mockPersistentProvider).persist(eq(PERSISTENT_PAYLOAD), procedureCaptor.capture());
+ verify(mockDelegateProvider, never()).persist(mockNonPersistentLogEntry, mockProcedure);
+ procedureCaptor.getValue().apply(PERSISTENT_PAYLOAD);
+ verify(mockProcedure).apply(mockPersistentLogEntry);
provider.persist(mockNonPersistentLogEntry, mockProcedure);
verify(mockDelegateProvider).persist(mockNonPersistentLogEntry, mockProcedure);
assertEquals("New follower commit index", 3, newFollowerActorContext.getCommitIndex());
assertEquals("New follower last applied index", 3, newFollowerActorContext.getLastApplied());
- List<ReplicatedLogImplEntry> persistedLogEntries = InMemoryJournal.get(LEADER_ID, ReplicatedLogImplEntry.class);
- assertEquals("Leader ReplicatedLogImplEntry entries", 1, persistedLogEntries.size());
- ReplicatedLogImplEntry logEntry = persistedLogEntries.get(0);
- assertEquals("Leader ReplicatedLogImplEntry getTerm", 1, logEntry.getTerm());
- assertEquals("Leader ReplicatedLogImplEntry getIndex", 3, logEntry.getIndex());
- assertEquals("Leader ReplicatedLogImplEntry getData", ServerConfigurationPayload.class, logEntry.getData().getClass());
-
- persistedLogEntries = InMemoryJournal.get(NEW_SERVER_ID, ReplicatedLogImplEntry.class);
- assertEquals("New follower ReplicatedLogImplEntry entries", 1, persistedLogEntries.size());
- logEntry = persistedLogEntries.get(0);
- assertEquals("New follower ReplicatedLogImplEntry getTerm", 1, logEntry.getTerm());
- assertEquals("New follower ReplicatedLogImplEntry getIndex", 3, logEntry.getIndex());
- assertEquals("New follower ReplicatedLogImplEntry getData", ServerConfigurationPayload.class,
- logEntry.getData().getClass());
+ assertEquals("Leader persisted ReplicatedLogImplEntry entries", 0,
+ InMemoryJournal.get(LEADER_ID, ReplicatedLogImplEntry.class).size());
+ assertEquals("Leader persisted ServerConfigurationPayload entries", 1,
+ InMemoryJournal.get(LEADER_ID, ServerConfigurationPayload.class).size());
+
+ assertEquals("New follower persisted ReplicatedLogImplEntry entries", 0,
+ InMemoryJournal.get(NEW_SERVER_ID, ReplicatedLogImplEntry.class).size());
+ assertEquals("New follower persisted ServerConfigurationPayload entries", 1,
+ InMemoryJournal.get(NEW_SERVER_ID, ServerConfigurationPayload.class).size());
LOG.info("testAddServerWithExistingFollower ending");
}
setTransactionCommitTimeout();
- if (datastoreContext.isPersistent() && !persistence().isRecoveryApplicable()) {
- setPersistence(true);
- } else if (!datastoreContext.isPersistent() && persistence().isRecoveryApplicable()) {
- setPersistence(false);
- }
+ setPersistence(datastoreContext.isPersistent());
updateConfigParams(datastoreContext.getShardRaftConfig());
}