context = new RaftActorContextImpl(this.getSelf(),
this.getContext(), id, new ElectionTermImpl(persistentProvider, id, LOG),
-1, -1, peerAddresses,
- (configParams.isPresent() ? configParams.get(): new DefaultConfigParamsImpl()),
+ configParams.isPresent() ? configParams.get(): new DefaultConfigParamsImpl(),
delegatingPersistenceProvider, LOG);
context.setPayloadVersion(payloadVersion);
} else if(message instanceof InitiateCaptureSnapshot) {
captureSnapshot();
} else if(message instanceof SwitchBehavior) {
- switchBehavior(((SwitchBehavior) message));
+ switchBehavior((SwitchBehavior) message);
} else if(message instanceof LeaderTransitioning) {
onLeaderTransitioning();
} else if(message instanceof Shutdown) {
}
if (roleChangeNotifier.isPresent() &&
- (oldBehavior == null || (oldBehavior.state() != currentBehavior.state()))) {
+ (oldBehavior == null || oldBehavior.state() != currentBehavior.state())) {
roleChangeNotifier.get().tell(new RoleChanged(getId(), oldBehaviorStateName ,
currentBehavior.state().name()), getSelf());
}
}
protected void setPersistence(boolean persistent) {
- if(persistent) {
+ DataPersistenceProvider currentPersistence = persistence();
+ if(persistent && (currentPersistence == null || !currentPersistence.isRecoveryApplicable())) {
setPersistence(new PersistentDataProvider(this));
- } else {
+
+ if(getCurrentBehavior() != null) {
+ LOG.info("{}: Persistence has been enabled - capturing snapshot", persistenceId());
+ captureSnapshot();
+ }
+ } else if(!persistent && (currentPersistence == null || currentPersistence.isRecoveryApplicable())) {
setPersistence(new NonPersistentDataProvider() {
/**
* The way snapshotting works is,
import org.opendaylight.controller.cluster.DataPersistenceProvider;
import org.opendaylight.controller.cluster.DelegatingPersistentDataProvider;
import org.opendaylight.controller.cluster.PersistentDataProvider;
+import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
import org.opendaylight.controller.cluster.raft.protobuff.client.messages.PersistentPayload;
/**
}
@Override
- public <T> void persist(T o, Procedure<T> procedure) {
+ public <T> void persist(final T o, final Procedure<T> procedure) {
if(getDelegate().isRecoveryApplicable()) {
super.persist(o, procedure);
} else {
- boolean isPersistentPayload = false;
if(o instanceof ReplicatedLogEntry) {
- isPersistentPayload = ((ReplicatedLogEntry)o).getData() instanceof PersistentPayload;
- }
-
- if(isPersistentPayload) {
- persistentProvider.persist(o, procedure);
+ Payload payload = ((ReplicatedLogEntry)o).getData();
+ if(payload instanceof PersistentPayload) {
+ // We persist the Payload but not the ReplicatedLogEntry to avoid gaps in the journal indexes
+ // on recovery if data persistence is later enabled.
+ persistentProvider.persist(payload, p -> procedure.apply(o));
+ } else {
+ super.persist(o, procedure);
+ }
} else {
super.persist(o, procedure);
}
onRecoveredApplyLogEntries(((ApplyJournalEntries) message).getToIndex());
} else if (message instanceof DeleteEntries) {
onDeleteEntries((DeleteEntries) message);
+ } else if (message instanceof ServerConfigurationPayload) {
+ context.updatePeerIds((ServerConfigurationPayload)message);
} else if (message instanceof RecoveryCompleted) {
recoveryComplete = true;
onRecoveryCompletedMessage(persistentProvider);
replicatedLog().getSnapshotTerm(), replicatedLog().size());
if(dataRecoveredWithPersistenceDisabled ||
- (hasMigratedDataRecovered && !context.getPersistenceProvider().isRecoveryApplicable())) {
+ hasMigratedDataRecovered && !context.getPersistenceProvider().isRecoveryApplicable()) {
if(hasMigratedDataRecovered) {
log.info("{}: Saving snapshot after recovery due to migrated messages", context.getId());
} else {
*/
package org.opendaylight.controller.cluster.raft;
+import static org.mockito.Matchers.eq;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.never;
import akka.japi.Procedure;
import org.junit.Before;
import org.junit.Test;
+import org.mockito.ArgumentCaptor;
import org.mockito.Mock;
import org.mockito.MockitoAnnotations;
import org.opendaylight.controller.cluster.DataPersistenceProvider;
verify(mockDelegateProvider).persist(OTHER_DATA_OBJECT, mockProcedure);
}
- @SuppressWarnings("unchecked")
+ @SuppressWarnings({ "unchecked", "rawtypes" })
@Test
- public void testPersistWithPersistenceDisabled() {
+ public void testPersistWithPersistenceDisabled() throws Exception {
doReturn(false).when(mockDelegateProvider).isRecoveryApplicable();
provider.persist(mockPersistentLogEntry, mockProcedure);
- verify(mockPersistentProvider).persist(mockPersistentLogEntry, mockProcedure);
+
+ ArgumentCaptor<Procedure> procedureCaptor = ArgumentCaptor.forClass(Procedure.class);
+ verify(mockPersistentProvider).persist(eq(PERSISTENT_PAYLOAD), procedureCaptor.capture());
+ verify(mockDelegateProvider, never()).persist(mockNonPersistentLogEntry, mockProcedure);
+ procedureCaptor.getValue().apply(PERSISTENT_PAYLOAD);
+ verify(mockProcedure).apply(mockPersistentLogEntry);
provider.persist(mockNonPersistentLogEntry, mockProcedure);
verify(mockDelegateProvider).persist(mockNonPersistentLogEntry, mockProcedure);
assertEquals("New follower commit index", 3, newFollowerActorContext.getCommitIndex());
assertEquals("New follower last applied index", 3, newFollowerActorContext.getLastApplied());
- List<ReplicatedLogImplEntry> persistedLogEntries = InMemoryJournal.get(LEADER_ID, ReplicatedLogImplEntry.class);
- assertEquals("Leader ReplicatedLogImplEntry entries", 1, persistedLogEntries.size());
- ReplicatedLogImplEntry logEntry = persistedLogEntries.get(0);
- assertEquals("Leader ReplicatedLogImplEntry getTerm", 1, logEntry.getTerm());
- assertEquals("Leader ReplicatedLogImplEntry getIndex", 3, logEntry.getIndex());
- assertEquals("Leader ReplicatedLogImplEntry getData", ServerConfigurationPayload.class, logEntry.getData().getClass());
-
- persistedLogEntries = InMemoryJournal.get(NEW_SERVER_ID, ReplicatedLogImplEntry.class);
- assertEquals("New follower ReplicatedLogImplEntry entries", 1, persistedLogEntries.size());
- logEntry = persistedLogEntries.get(0);
- assertEquals("New follower ReplicatedLogImplEntry getTerm", 1, logEntry.getTerm());
- assertEquals("New follower ReplicatedLogImplEntry getIndex", 3, logEntry.getIndex());
- assertEquals("New follower ReplicatedLogImplEntry getData", ServerConfigurationPayload.class,
- logEntry.getData().getClass());
+ assertEquals("Leader persisted ReplicatedLogImplEntry entries", 0,
+ InMemoryJournal.get(LEADER_ID, ReplicatedLogImplEntry.class).size());
+ assertEquals("Leader persisted ServerConfigurationPayload entries", 1,
+ InMemoryJournal.get(LEADER_ID, ServerConfigurationPayload.class).size());
+
+ assertEquals("New follower persisted ReplicatedLogImplEntry entries", 0,
+ InMemoryJournal.get(NEW_SERVER_ID, ReplicatedLogImplEntry.class).size());
+ assertEquals("New follower persisted ServerConfigurationPayload entries", 1,
+ InMemoryJournal.get(NEW_SERVER_ID, ServerConfigurationPayload.class).size());
LOG.info("testAddServerWithExistingFollower ending");
}
setTransactionCommitTimeout();
- if (datastoreContext.isPersistent() && !persistence().isRecoveryApplicable()) {
- setPersistence(true);
- } else if (!datastoreContext.isPersistent() && persistence().isRecoveryApplicable()) {
- setPersistence(false);
- }
+ setPersistence(datastoreContext.isPersistent());
updateConfigParams(datastoreContext.getShardRaftConfig());
}