*/
private final RaftActorContextImpl context;
- private final DelegatingPersistentDataProvider delegatingPersistenceProvider = new DelegatingPersistentDataProvider(null);
+ private final DelegatingPersistentDataProvider delegatingPersistenceProvider;
private final PersistentDataProvider persistentProvider;
Optional<ConfigParams> configParams, short payloadVersion) {
persistentProvider = new PersistentDataProvider(this);
+ delegatingPersistenceProvider = new RaftActorDelegatingPersistentDataProvider(null, persistentProvider);
+
context = new RaftActorContextImpl(this.getSelf(),
this.getContext(), id, new ElectionTermImpl(persistentProvider, id, LOG),
-1, -1, peerAddresses,
--- /dev/null
+/*
+ * Copyright (c) 2015 Brocade Communications Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.raft;
+
+import akka.japi.Procedure;
+import com.google.common.base.Preconditions;
+import org.opendaylight.controller.cluster.DataPersistenceProvider;
+import org.opendaylight.controller.cluster.DelegatingPersistentDataProvider;
+import org.opendaylight.controller.cluster.PersistentDataProvider;
+import org.opendaylight.controller.cluster.raft.protobuff.client.messages.PersistentPayload;
+
+/**
+ * The DelegatingPersistentDataProvider used by RaftActor to override the configured persistent provider to
+ * persist ReplicatedLogEntry's based on whether or not the payload is a PersistentPayload instance.
+ *
+ * @author Thomas Pantelis
+ */
+class RaftActorDelegatingPersistentDataProvider extends DelegatingPersistentDataProvider {
+ private final PersistentDataProvider persistentProvider;
+
+ RaftActorDelegatingPersistentDataProvider(DataPersistenceProvider delegate,
+ PersistentDataProvider persistentProvider) {
+ super(delegate);
+ this.persistentProvider = Preconditions.checkNotNull(persistentProvider);
+ }
+
+ @Override
+ public <T> void persist(T o, Procedure<T> procedure) {
+ if(getDelegate().isRecoveryApplicable()) {
+ super.persist(o, procedure);
+ } else {
+ boolean isPersistentPayload = false;
+ if(o instanceof ReplicatedLogEntry) {
+ isPersistentPayload = ((ReplicatedLogEntry)o).getData() instanceof PersistentPayload;
+ }
+
+ if(isPersistentPayload) {
+ persistentProvider.persist(o, procedure);
+ } else {
+ super.persist(o, procedure);
+ }
+ }
+ }
+}
import java.util.Map;
import javax.annotation.Nonnull;
import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
+import org.opendaylight.controller.cluster.raft.protobuff.client.messages.PersistentPayload;
import org.opendaylight.controller.protobuff.messages.cluster.raft.AppendEntriesMessages;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
*
* @author Thomas Pantelis
*/
-public class ServerConfigurationPayload extends Payload implements Serializable {
+public class ServerConfigurationPayload extends Payload implements PersistentPayload, Serializable {
private static final long serialVersionUID = 1L;
private static final Logger LOG = LoggerFactory.getLogger(ServerConfigurationPayload.class);
--- /dev/null
+/*
+ * Copyright (c) 2015 Brocade Communications Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.raft;
+
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.verify;
+import akka.japi.Procedure;
+import com.google.protobuf.GeneratedMessage.GeneratedExtension;
+import java.util.Map;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.Mock;
+import org.mockito.MockitoAnnotations;
+import org.opendaylight.controller.cluster.DataPersistenceProvider;
+import org.opendaylight.controller.cluster.PersistentDataProvider;
+import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
+import org.opendaylight.controller.cluster.raft.protobuff.client.messages.PersistentPayload;
+
+/**
+ * Unit tests for RaftActorDelegatingPersistentDataProvider.
+ *
+ * @author Thomas Pantelis
+ */
+public class RaftActorDelegatingPersistentDataProviderTest {
+ private static final Payload PERSISTENT_PAYLOAD = new TestPersistentPayload();
+
+ private static final Payload NON_PERSISTENT_PAYLOAD = new TestNonPersistentPayload();
+
+ private static final Object OTHER_DATA_OBJECT = new Object();
+
+ @Mock
+ private ReplicatedLogEntry mockPersistentLogEntry;
+
+ @Mock
+ private ReplicatedLogEntry mockNonPersistentLogEntry;
+
+ @Mock
+ private DataPersistenceProvider mockDelegateProvider;
+
+ @Mock
+ private PersistentDataProvider mockPersistentProvider;
+
+ @SuppressWarnings("rawtypes")
+ @Mock
+ private Procedure mockProcedure;
+
+ private RaftActorDelegatingPersistentDataProvider provider;
+
+ @Before
+ public void setup() {
+ MockitoAnnotations.initMocks(this);
+ doReturn(PERSISTENT_PAYLOAD).when(mockPersistentLogEntry).getData();
+ doReturn(NON_PERSISTENT_PAYLOAD).when(mockNonPersistentLogEntry).getData();
+ provider = new RaftActorDelegatingPersistentDataProvider(mockDelegateProvider, mockPersistentProvider);
+ }
+
+ @SuppressWarnings("unchecked")
+ @Test
+ public void testPersistWithPersistenceEnabled() {
+ doReturn(true).when(mockDelegateProvider).isRecoveryApplicable();
+
+ provider.persist(mockPersistentLogEntry, mockProcedure);
+ verify(mockDelegateProvider).persist(mockPersistentLogEntry, mockProcedure);
+
+ provider.persist(mockNonPersistentLogEntry, mockProcedure);
+ verify(mockDelegateProvider).persist(mockNonPersistentLogEntry, mockProcedure);
+
+ provider.persist(OTHER_DATA_OBJECT, mockProcedure);
+ verify(mockDelegateProvider).persist(OTHER_DATA_OBJECT, mockProcedure);
+ }
+
+ @SuppressWarnings("unchecked")
+ @Test
+ public void testPersistWithPersistenceDisabled() {
+ doReturn(false).when(mockDelegateProvider).isRecoveryApplicable();
+
+ provider.persist(mockPersistentLogEntry, mockProcedure);
+ verify(mockPersistentProvider).persist(mockPersistentLogEntry, mockProcedure);
+
+ provider.persist(mockNonPersistentLogEntry, mockProcedure);
+ verify(mockDelegateProvider).persist(mockNonPersistentLogEntry, mockProcedure);
+
+ provider.persist(OTHER_DATA_OBJECT, mockProcedure);
+ verify(mockDelegateProvider).persist(OTHER_DATA_OBJECT, mockProcedure);
+ }
+
+ static class TestNonPersistentPayload extends Payload {
+ @SuppressWarnings("rawtypes")
+ @Override
+ public <T> Map<GeneratedExtension, T> encode() {
+ return null;
+ }
+
+ @Override
+ public Payload decode(
+ org.opendaylight.controller.protobuff.messages.cluster.raft.AppendEntriesMessages.AppendEntries.ReplicatedLogEntry.Payload payload) {
+ return null;
+ }
+
+ @Override
+ public int size() {
+ return 0;
+ }
+ }
+
+ static class TestPersistentPayload extends TestNonPersistentPayload implements PersistentPayload {
+ }
+}
assertEquals("Follower last applied index", 3, followerActorContext.getLastApplied());
assertEquals("New follower commit index", 3, newFollowerActorContext.getCommitIndex());
assertEquals("New follower last applied index", 3, newFollowerActorContext.getLastApplied());
+
+ List<ReplicatedLogImplEntry> persistedLogEntries = InMemoryJournal.get(LEADER_ID, ReplicatedLogImplEntry.class);
+ assertEquals("Leader ReplicatedLogImplEntry entries", 1, persistedLogEntries.size());
+ ReplicatedLogImplEntry logEntry = persistedLogEntries.get(0);
+ assertEquals("Leader ReplicatedLogImplEntry getTerm", 1, logEntry.getTerm());
+ assertEquals("Leader ReplicatedLogImplEntry getIndex", 3, logEntry.getIndex());
+ assertEquals("Leader ReplicatedLogImplEntry getData", ServerConfigurationPayload.class, logEntry.getData().getClass());
+
+ persistedLogEntries = InMemoryJournal.get(NEW_SERVER_ID, ReplicatedLogImplEntry.class);
+ assertEquals("New follower ReplicatedLogImplEntry entries", 1, persistedLogEntries.size());
+ logEntry = persistedLogEntries.get(0);
+ assertEquals("New follower ReplicatedLogImplEntry getTerm", 1, logEntry.getTerm());
+ assertEquals("New follower ReplicatedLogImplEntry getIndex", 3, logEntry.getIndex());
+ assertEquals("New follower ReplicatedLogImplEntry getData", ServerConfigurationPayload.class,
+ logEntry.getData().getClass());
}
@Test
public static class MockNewFollowerRaftActor extends AbstractMockRaftActor {
public MockNewFollowerRaftActor(ConfigParams config, TestActorRef<MessageCollectorActor> collectorActor) {
super(NEW_SERVER_ID, Maps.<String, String>newHashMap(), Optional.of(config), null, collectorActor);
+ setPersistence(false);
}
static Props props(ConfigParams config, TestActorRef<MessageCollectorActor> collectorActor) {
--- /dev/null
+/*
+ * Copyright (c) 2015 Brocade Communications Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.raft.protobuff.client.messages;
+
+/**
+ * This is a tagging interface for a Payload implementation that needs to always be persisted regardless of
+ * whether or not the component is configured to be persistent.
+ *
+ * @author Thomas Pantelis
+ */
+public interface PersistentPayload {
+}