assertNotNull("Module has not been created correctly.", messageBusAppImplModuleFactory.createModule("instanceName1", dependencyResolverMock, dynamicMBeanWithInstanceMock, bundleContextMock));
}
-}
+}
\ No newline at end of file
*/
package org.opendaylight.controller.config.yang.messagebus.app.impl;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
-import static org.mockito.Mockito.mock;
-
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
import org.opendaylight.controller.config.api.DependencyResolver;
+import org.opendaylight.controller.config.api.JmxAttribute;
import org.opendaylight.controller.config.api.ModuleIdentifier;
+import org.opendaylight.controller.md.sal.binding.api.DataBroker;
+import org.opendaylight.controller.md.sal.binding.api.MountPointService;
+import org.opendaylight.controller.md.sal.binding.api.WriteTransaction;
+import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
+import org.opendaylight.controller.md.sal.dom.api.DOMMountPointService;
+import org.opendaylight.controller.md.sal.dom.api.DOMNotificationPublishService;
+import org.opendaylight.controller.sal.binding.api.BindingAwareBroker;
+import org.opendaylight.controller.sal.binding.api.BindingAwareProvider;
+import org.opendaylight.controller.sal.binding.api.RpcProviderRegistry;
+import org.opendaylight.controller.sal.core.api.Broker;
+import org.opendaylight.controller.sal.core.api.Provider;
+import org.opendaylight.yangtools.yang.binding.DataObject;
+import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
import org.osgi.framework.BundleContext;
+import javax.management.ObjectName;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.doNothing;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
+
public class MessageBusAppImplModuleTest {
MessageBusAppImplModule messageBusAppImplModule;
assertEquals("Set and/or get method/s don't work correctly.", bundleContext, messageBusAppImplModule.getBundleContext());
}
- //TODO: create MessageBusAppImplModule.createInstance test
-}
+ @Test
+ public void createInstanceTest() throws Exception{
+ org.opendaylight.controller.sal.binding.api.BindingAwareBroker bindingAwareBrokerMock = mock(org.opendaylight.controller.sal.binding.api.BindingAwareBroker.class);
+ Broker brokerMock = mock(Broker.class);
+ doReturn(brokerMock).when(dependencyResolverMock).resolveInstance(eq(org.opendaylight.controller.sal.core.api.Broker.class), any(ObjectName.class), any(JmxAttribute.class));
+ doReturn(bindingAwareBrokerMock).when(dependencyResolverMock).resolveInstance(eq(org.opendaylight.controller.sal.binding.api.BindingAwareBroker.class), any(ObjectName.class), any(JmxAttribute.class));
+ messageBusAppImplModule.resolveDependencies();
+
+ BindingAwareBroker.ProviderContext providerContext = mock(BindingAwareBroker.ProviderContext.class);
+ doReturn(providerContext).when(bindingAwareBrokerMock).registerProvider(any(BindingAwareProvider.class));
+ Broker.ProviderSession providerSessionMock = mock(Broker.ProviderSession.class);
+ doReturn(providerSessionMock).when(brokerMock).registerProvider(any(Provider.class));
+ DataBroker dataBrokerMock = mock(DataBroker.class);
+ doReturn(dataBrokerMock).when(providerContext).getSALService(eq(DataBroker.class));
+ DOMNotificationPublishService domNotificationPublishServiceMock = mock(DOMNotificationPublishService.class);
+ doReturn(domNotificationPublishServiceMock).when(providerSessionMock).getService(DOMNotificationPublishService.class);
+ DOMMountPointService domMountPointServiceMock = mock(DOMMountPointService.class);
+ doReturn(domMountPointServiceMock).when(providerSessionMock).getService(DOMMountPointService.class);
+ MountPointService mountPointServiceMock = mock(MountPointService.class);
+ doReturn(mountPointServiceMock).when(providerContext).getSALService(eq(MountPointService.class));
+ RpcProviderRegistry rpcProviderRegistryMock = mock(RpcProviderRegistry.class);
+ doReturn(rpcProviderRegistryMock).when(providerContext).getSALService(eq(RpcProviderRegistry.class));
+
+ WriteTransaction writeTransactionMock = mock(WriteTransaction.class);
+ doReturn(writeTransactionMock).when(dataBrokerMock).newWriteOnlyTransaction();
+ doNothing().when(writeTransactionMock).put(any(LogicalDatastoreType.class), any(InstanceIdentifier.class), any(DataObject.class), eq(true));
+
+ assertNotNull("EventSourceRegistryWrapper has not been created correctly.", messageBusAppImplModule.createInstance());
+ }
+
+}
\ No newline at end of file
--- /dev/null
+/*
+ * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.messagebus.app.impl;
+
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.opendaylight.controller.messagebus.spi.EventSource;
+
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+
+public class EventSourceRegistrationImplTest {
+
+ EventSourceRegistrationImplLocal eventSourceRegistrationImplLocal;
+ EventSourceTopology eventSourceTopologyMock;
+
+ @BeforeClass
+ public static void initTestClass() throws IllegalAccessException, InstantiationException {
+ }
+
+ @Before
+ public void setUp() throws Exception {
+ EventSource eventSourceMock = mock(EventSource.class);
+ eventSourceTopologyMock = mock(EventSourceTopology.class);
+ eventSourceRegistrationImplLocal = new EventSourceRegistrationImplLocal(eventSourceMock, eventSourceTopologyMock);
+ }
+
+ @Test
+ public void removeRegistrationTest() {
+ eventSourceRegistrationImplLocal.removeRegistration();
+ verify(eventSourceTopologyMock, times(1)).unRegister(any(EventSource.class));
+ }
+
+
+ private class EventSourceRegistrationImplLocal extends EventSourceRegistrationImpl{
+
+ /**
+ * @param instance of EventSource that has been registered by {@link EventSourceRegistryImpl#registerEventSource(Node, org.opendaylight.controller.messagebus.spi.EventSource)}
+ * @param eventSourceTopology
+ */
+ public EventSourceRegistrationImplLocal(EventSource instance, EventSourceTopology eventSourceTopology) {
+ super(instance, eventSourceTopology);
+ }
+ }
+
+}
\ No newline at end of file
nodeIdMock = mock(NodeId.class);
doReturn(nodeIdMock).when(dataObjectMock).getId();
- doReturn("0").when(nodeIdMock).getValue();
+ doReturn("nodeIdPattern1").when(nodeIdMock).getValue();
}
@Test
verify(eventSourceServiceMock, times(1)).joinTopic(any(JoinTopicInput.class));
}
-}
+}
\ No newline at end of file
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
+import java.lang.reflect.Field;
import java.util.ArrayList;
import java.util.List;
+import java.util.Map;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
import org.opendaylight.controller.md.sal.binding.api.DataBroker;
+import org.opendaylight.controller.md.sal.binding.api.DataChangeListener;
import org.opendaylight.controller.md.sal.binding.api.ReadOnlyTransaction;
import org.opendaylight.controller.md.sal.binding.api.WriteTransaction;
import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
CreateTopicInput createTopicInputMock;
ListenerRegistration listenerRegistrationMock;
NodeKey nodeKey;
+ RpcRegistration<EventAggregatorService> aggregatorRpcReg;
@BeforeClass
public static void initTestClass() throws IllegalAccessException, InstantiationException {
}
private void constructorTestHelper(){
- RpcRegistration<EventAggregatorService> aggregatorRpcReg = mock(RpcRegistration.class);
+ aggregatorRpcReg = mock(RpcRegistration.class);
EventSourceService eventSourceService = mock(EventSourceService.class);
doReturn(aggregatorRpcReg).when(rpcProviderRegistryMock).addRpcImplementation(eq(EventAggregatorService.class), any(EventSourceTopology.class));
doReturn(eventSourceService).when(rpcProviderRegistryMock).getRpcService(EventSourceService.class);
doReturn(checkedFutureMock).when(writeTransactionMock).submit();
}
-//TODO: create test for createTopic
-// public void createTopicTest() throws Exception{
-// createTopicTestHelper();
-// assertNotNull("Topic has not been created correctly.", eventSourceTopology.createTopic(createTopicInputMock));
-// }
+ @Test
+ public void createTopicTest() throws Exception{
+ topicTestHelper();
+ assertNotNull("Topic has not been created correctly.", eventSourceTopology.createTopic(createTopicInputMock));
+ }
private void topicTestHelper() throws Exception{
constructorTestHelper();
assertNotNull("Instance has not been created correctly.", eventSourceTopology.destroyTopic(destroyTopicInput));
}
+ @Test
+ public void closeTest() throws Exception{
+ constructorTestHelper();
+ topicTestHelper();
+ Map<DataChangeListener, ListenerRegistration<DataChangeListener>> localMap = getTopicListenerRegistrations();
+ DataChangeListener dataChangeListenerMock = mock(DataChangeListener.class);
+ ListenerRegistration<DataChangeListener> listenerListenerRegistrationMock = (ListenerRegistration<DataChangeListener>) mock(ListenerRegistration.class);
+ localMap.put(dataChangeListenerMock, listenerListenerRegistrationMock);
+ eventSourceTopology.close();
+ verify(aggregatorRpcReg, times(1)).close();
+ verify(listenerListenerRegistrationMock, times(1)).close();
+ }
+
@Test
public void registerTest() throws Exception {
topicTestHelper();
verify(routedRpcRegistrationMock, times(1)).registerPath(eq(NodeContext.class), any(KeyedInstanceIdentifier.class));
}
-}
+ @Test
+ public void unregisterTest() throws Exception {
+ topicTestHelper();
+ EventSource eventSourceMock = mock(EventSource.class);
+ NodeId nodeId = new NodeId("nodeIdValue1");
+ nodeKey = new NodeKey(nodeId);
+ Map<NodeKey, BindingAwareBroker.RoutedRpcRegistration<EventSourceService>> localMap = getRoutedRpcRegistrations();
+ NodeKey nodeKeyMock = mock(NodeKey.class);
+ doReturn(nodeKeyMock).when(eventSourceMock).getSourceNodeKey();
+ BindingAwareBroker.RoutedRpcRegistration<EventSourceService> routedRpcRegistrationMock = (BindingAwareBroker.RoutedRpcRegistration<EventSourceService>) mock(BindingAwareBroker.RoutedRpcRegistration.class);
+ localMap.put(nodeKeyMock, routedRpcRegistrationMock);
+ eventSourceTopology.unRegister(eventSourceMock);
+ verify(routedRpcRegistrationMock, times(1)).close();
+ }
+
+ @Test
+ public void registerEventSourceTest() throws Exception {
+ topicTestHelper();
+ Node nodeMock = mock(Node.class);
+ EventSource eventSourceMock = mock(EventSource.class);
+ NodeId nodeId = new NodeId("nodeIdValue1");
+ nodeKey = new NodeKey(nodeId);
+ doReturn(nodeKey).when(nodeMock).getKey();
+ doReturn(nodeKey).when(eventSourceMock).getSourceNodeKey();
+ BindingAwareBroker.RoutedRpcRegistration routedRpcRegistrationMock = mock(BindingAwareBroker.RoutedRpcRegistration.class);
+ doReturn(routedRpcRegistrationMock).when(rpcProviderRegistryMock).addRoutedRpcImplementation(EventSourceService.class, eventSourceMock);
+ doNothing().when(routedRpcRegistrationMock).registerPath(eq(NodeContext.class), any(KeyedInstanceIdentifier.class));
+ assertNotNull("Return value has not been created correctly.", eventSourceTopology.registerEventSource(eventSourceMock));
+ }
+
+ private Map getTopicListenerRegistrations() throws Exception{
+ Field nesField = EventSourceTopology.class.getDeclaredField("topicListenerRegistrations");
+ nesField.setAccessible(true);
+ return (Map) nesField.get(eventSourceTopology);
+ }
+
+ private Map getRoutedRpcRegistrations() throws Exception{
+ Field nesField = EventSourceTopology.class.getDeclaredField("routedRpcRegistrations");
+ nesField.setAccessible(true);
+ return (Map) nesField.get(eventSourceTopology);
+ }
+
+}
\ No newline at end of file
netconfEventSourceManager =
NetconfEventSourceManager.create(dataBrokerMock,
- domNotificationPublishServiceMock,
- domMountPointServiceMock,
- mountPointServiceMock,
- eventSourceRegistry,
- namespaceToStreamList);
+ domNotificationPublishServiceMock,
+ domMountPointServiceMock,
+ mountPointServiceMock,
+ eventSourceRegistry,
+ namespaceToStreamList);
}
@Test
Map<InstanceIdentifier, DataObject> mapUpdate = new HashMap<>();
InstanceIdentifier instanceIdentifierMock = mock(InstanceIdentifier.class);
Node dataObjectMock = mock(Node.class);
+
if(create){
mapCreate.put(instanceIdentifierMock, dataObjectMock);
}
if(update){
mapUpdate.put(instanceIdentifierMock, dataObjectMock);
}
+
doReturn(mapCreate).when(asyncDataChangeEventMock).getCreatedData();
doReturn(mapUpdate).when(asyncDataChangeEventMock).getUpdatedData();
NetconfNode netconfNodeMock = mock(NetconfNode.class);
doReturn(esrMock).when(eventSourceRegistry).registerEventSource(any(EventSource.class));
}
-}
+}
\ No newline at end of file
*/
package org.opendaylight.controller.messagebus.app.impl;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
-import static org.mockito.Matchers.any;
-import static org.mockito.Mockito.doReturn;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.times;
-import static org.mockito.Mockito.verify;
-
-import java.lang.reflect.Field;
-import java.net.URI;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-
+import com.google.common.base.Optional;
+import com.google.common.util.concurrent.CheckedFuture;
import org.junit.Before;
import org.junit.Test;
import org.opendaylight.controller.md.sal.binding.api.BindingService;
import org.opendaylight.yangtools.yang.model.api.SchemaContext;
import org.opendaylight.yangtools.yang.model.api.SchemaPath;
-import com.google.common.base.Optional;
-import com.google.common.util.concurrent.CheckedFuture;
+import java.lang.reflect.Field;
+import java.net.URI;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
public class NetconfEventSourceTest {
NetconfEventSource netconfEventSource;
DOMMountPoint domMountPointMock;
JoinTopicInput joinTopicInputMock;
- AsyncDataChangeEvent asyncDataChangeEventMock;
- Node dataObjectMock;
@Before
public void setUp() throws Exception {
Map<String, String> streamMap = new HashMap<>();
- streamMap.put("string1", "string2");
+ streamMap.put("uriStr1", "string2");
domMountPointMock = mock(DOMMountPoint.class);
DOMNotificationPublishService domNotificationPublishServiceMock = mock(DOMNotificationPublishService.class);
MountPoint mountPointMock = mock(MountPoint.class);
doReturn(rpcConsumerRegistryMock).when(onlyOptionalMock).get();
doReturn(notificationsServiceMock).when(rpcConsumerRegistryMock).getRpcService(NotificationsService.class);
org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.Node node
- = mock(org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.Node.class);
+ = mock(org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.Node.class);
org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.NodeId nodeId
- = new org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.NodeId("NodeId1");
+ = new org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.NodeId("NodeId1");
doReturn(nodeId).when(node).getNodeId();
netconfEventSource = new NetconfEventSource(node, streamMap, domMountPointMock, domNotificationPublishServiceMock, mountPointMock);
}
doReturn(topicId).when(joinTopicInputMock).getTopicId();
NotificationPattern notificationPatternMock = mock(NotificationPattern.class);
doReturn(notificationPatternMock).when(joinTopicInputMock).getNotificationPattern();
- doReturn("regexString1").when(notificationPatternMock).getValue();
+ doReturn("uriStr1").when(notificationPatternMock).getValue();
SchemaContext schemaContextMock = mock(SchemaContext.class);
doReturn(schemaContextMock).when(domMountPointMock).getSchemaContext();
doReturn(domNotificationServiceMock).when(domNotificationServiceOptionalMock).get();
ListenerRegistration listenerRegistrationMock = mock(ListenerRegistration.class);
doReturn(listenerRegistrationMock).when(domNotificationServiceMock).registerNotificationListener(any(NetconfEventSource.class), any(List.class));
+
+ Optional<DOMService> optionalMock = (Optional<DOMService>) mock(Optional.class);
+ doReturn(optionalMock).when(domMountPointMock).getService(DOMRpcService.class);
+ DOMRpcService domRpcServiceMock = mock(DOMRpcService.class);
+ doReturn(domRpcServiceMock).when(optionalMock).get();
+ CheckedFuture checkedFutureMock = mock(CheckedFuture.class);
+ doReturn(checkedFutureMock).when(domRpcServiceMock).invokeRpc(any(SchemaPath.class), any(ContainerNode.class));
}
//TODO: create Test for NetConfEventSource#onNotification
return (Set) nesField.get(netconfEventSource);
}
-}
+}
\ No newline at end of file
String bodyString = "TopicDOMNotification [body=" + containerNodeBodyMockToString + "]";
assertEquals("String has not been created correctly.", bodyString, topicDOMNotification.toString());
}
-}
+}
\ No newline at end of file
}
private void onRecoveredApplyLogEntries(long toIndex) {
+ long lastUnappliedIndex = context.getLastApplied() + 1;
+
if(log.isDebugEnabled()) {
- log.debug("{}: Received ApplyLogEntries for recovery, applying to state: {} to {}",
- context.getId(), context.getLastApplied() + 1, toIndex);
+ log.debug("{}: Received apply journal entries for recovery, applying to state: {} to {}",
+ context.getId(), lastUnappliedIndex, toIndex);
}
- for (long i = context.getLastApplied() + 1; i <= toIndex; i++) {
- batchRecoveredLogEntry(replicatedLog().get(i));
+ long lastApplied = lastUnappliedIndex - 1;
+ for (long i = lastUnappliedIndex; i <= toIndex; i++) {
+ ReplicatedLogEntry logEntry = replicatedLog().get(i);
+ if(logEntry != null) {
+ lastApplied++;
+ batchRecoveredLogEntry(logEntry);
+ } else {
+ // Shouldn't happen but cover it anyway.
+ log.error("Log entry not found for index {}", i);
+ break;
+ }
}
- context.setLastApplied(toIndex);
- context.setCommitIndex(toIndex);
+ context.setLastApplied(lastApplied);
+ context.setCommitIndex(lastApplied);
}
private void batchRecoveredLogEntry(ReplicatedLogEntry logEntry) {
import akka.japi.Procedure;
import akka.persistence.SnapshotSelectionCriteria;
import com.google.protobuf.ByteString;
+import java.util.List;
import org.opendaylight.controller.cluster.DataPersistenceProvider;
import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshot;
import org.opendaylight.controller.cluster.raft.base.messages.SendInstallSnapshot;
public class SnapshotManager implements SnapshotState {
-
private final SnapshotState IDLE = new Idle();
private final SnapshotState CAPTURING = new Capturing();
private final SnapshotState PERSISTING = new Persisting();
private SnapshotState currentState = IDLE;
private CaptureSnapshot captureSnapshot;
+ private long lastSequenceNumber = -1;
public SnapshotManager(RaftActorContext context, Logger logger) {
this.context = context;
long newReplicatedToAllTerm = replicatedToAllTermInfoReader.getTerm();
// send a CaptureSnapshot to self to make the expensive operation async.
+
+ List<ReplicatedLogEntry> unAppliedEntries = context.getReplicatedLog().getFrom(lastAppliedIndex + 1);
+
captureSnapshot = new CaptureSnapshot(lastLogEntry.getIndex(),
lastLogEntry.getTerm(), lastAppliedIndex, lastAppliedTerm,
- newReplicatedToAllIndex, newReplicatedToAllTerm, targetFollower!=null);
+ newReplicatedToAllIndex, newReplicatedToAllTerm, unAppliedEntries, targetFollower != null);
SnapshotManager.this.currentState = CAPTURING;
- if(targetFollower != null){
- LOG.info("{}: Initiating snapshot capture {}", persistenceId(), captureSnapshot);
- } else {
+ if(captureSnapshot.isInstallSnapshotInitiated()) {
LOG.info("{}: Initiating snapshot capture {} to install on {}",
persistenceId(), captureSnapshot, targetFollower);
+ } else {
+ LOG.info("{}: Initiating snapshot capture {}", persistenceId(), captureSnapshot);
}
+ lastSequenceNumber = context.getPersistenceProvider().getLastSequenceNumber();
+
+ LOG.debug("lastSequenceNumber prior to capture: {}", lastSequenceNumber);
+
context.getActor().tell(captureSnapshot, context.getActor());
return true;
// when snapshot is saved async, SaveSnapshotSuccess is raised.
Snapshot sn = Snapshot.create(snapshotBytes,
- context.getReplicatedLog().getFrom(captureSnapshot.getLastAppliedIndex() + 1),
+ captureSnapshot.getUnAppliedEntries(),
captureSnapshot.getLastIndex(), captureSnapshot.getLastTerm(),
captureSnapshot.getLastAppliedIndex(), captureSnapshot.getLastAppliedTerm());
persistenceProvider.deleteSnapshots(new SnapshotSelectionCriteria(
sequenceNumber - context.getConfigParams().getSnapshotBatchCount(), 43200000));
- persistenceProvider.deleteMessages(sequenceNumber);
+ persistenceProvider.deleteMessages(lastSequenceNumber);
+ lastSequenceNumber = -1;
SnapshotManager.this.currentState = IDLE;
}
package org.opendaylight.controller.cluster.raft.base.messages;
+import java.util.Collections;
+import java.util.List;
+import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
+
public class CaptureSnapshot {
private final long lastAppliedIndex;
private final long lastAppliedTerm;
private final boolean installSnapshotInitiated;
private final long replicatedToAllIndex;
private final long replicatedToAllTerm;
+ private final List<ReplicatedLogEntry> unAppliedEntries;
- public CaptureSnapshot(long lastIndex, long lastTerm,
- long lastAppliedIndex, long lastAppliedTerm, long replicatedToAllIndex, long replicatedToAllTerm) {
- this(lastIndex, lastTerm, lastAppliedIndex, lastAppliedTerm, replicatedToAllIndex , replicatedToAllTerm, false);
+ public CaptureSnapshot(long lastIndex, long lastTerm, long lastAppliedIndex, long lastAppliedTerm,
+ long replicatedToAllIndex, long replicatedToAllTerm, List<ReplicatedLogEntry> unAppliedEntries) {
+ this(lastIndex, lastTerm, lastAppliedIndex, lastAppliedTerm, replicatedToAllIndex, replicatedToAllTerm,
+ unAppliedEntries, false);
}
- public CaptureSnapshot(long lastIndex, long lastTerm,long lastAppliedIndex,
- long lastAppliedTerm, long replicatedToAllIndex, long replicatedToAllTerm, boolean installSnapshotInitiated) {
+ public CaptureSnapshot(long lastIndex, long lastTerm, long lastAppliedIndex,
+ long lastAppliedTerm, long replicatedToAllIndex, long replicatedToAllTerm,
+ List<ReplicatedLogEntry> unAppliedEntries, boolean installSnapshotInitiated) {
this.lastIndex = lastIndex;
this.lastTerm = lastTerm;
this.lastAppliedIndex = lastAppliedIndex;
this.installSnapshotInitiated = installSnapshotInitiated;
this.replicatedToAllIndex = replicatedToAllIndex;
this.replicatedToAllTerm = replicatedToAllTerm;
+ this.unAppliedEntries = unAppliedEntries != null ? unAppliedEntries : Collections.<ReplicatedLogEntry>emptyList();
}
public long getLastAppliedIndex() {
return replicatedToAllTerm;
}
+ public List<ReplicatedLogEntry> getUnAppliedEntries() {
+ return unAppliedEntries;
+ }
+
@Override
public String toString() {
StringBuilder builder = new StringBuilder();
.append(lastAppliedTerm).append(", lastIndex=").append(lastIndex).append(", lastTerm=")
.append(lastTerm).append(", installSnapshotInitiated=").append(installSnapshotInitiated)
.append(", replicatedToAllIndex=").append(replicatedToAllIndex).append(", replicatedToAllTerm=")
- .append(replicatedToAllTerm).append("]");
+ .append(replicatedToAllTerm).append(", unAppliedEntries size=").append(unAppliedEntries.size()).append("]");
return builder.toString();
}
+
+
}
*/
package org.opendaylight.controller.cluster.raft;
-import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
import akka.actor.ActorRef;
import akka.actor.PoisonPill;
import com.google.common.base.Predicate;
import com.google.common.base.Supplier;
import com.google.common.collect.ImmutableMap;
+import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
private final TestActorRef<MessageCollectorActor> collectorActor;
private final Map<Class<?>, Boolean> dropMessages = new ConcurrentHashMap<>();
- private volatile byte[] snapshot;
private TestRaftActor(String id, Map<String, String> peerAddresses, ConfigParams config,
TestActorRef<MessageCollectorActor> collectorActor) {
@Override
public void createSnapshot(ActorRef actorRef) {
- if(snapshot != null) {
- getSelf().tell(new CaptureSnapshotReply(snapshot), ActorRef.noSender());
+ try {
+ actorRef.tell(new CaptureSnapshotReply(RaftActorTest.fromObject(getState()).toByteArray()), actorRef);
+ } catch (Exception e) {
+ e.printStackTrace();
}
}
- @Override
- public void applyRecoverySnapshot(byte[] bytes) {
- }
-
- void setSnapshot(byte[] snapshot) {
- this.snapshot = snapshot;
- }
-
public ActorRef collectorActor() {
return collectorActor;
}
protected long initialTerm = 5;
protected long currentTerm;
+ protected List<Object> expSnapshotState = new ArrayList<>();
+
@After
public void tearDown() {
InMemoryJournal.clear();
});
}
+ @SuppressWarnings("unchecked")
protected void verifySnapshot(String prefix, Snapshot snapshot, long lastAppliedTerm,
- int lastAppliedIndex, long lastTerm, long lastIndex, byte[] data) {
+ int lastAppliedIndex, long lastTerm, long lastIndex)
+ throws Exception {
assertEquals(prefix + " Snapshot getLastAppliedTerm", lastAppliedTerm, snapshot.getLastAppliedTerm());
assertEquals(prefix + " Snapshot getLastAppliedIndex", lastAppliedIndex, snapshot.getLastAppliedIndex());
assertEquals(prefix + " Snapshot getLastTerm", lastTerm, snapshot.getLastTerm());
assertEquals(prefix + " Snapshot getLastIndex", lastIndex, snapshot.getLastIndex());
- assertArrayEquals(prefix + " Snapshot getState", data, snapshot.getState());
+
+ List<Object> actualState = (List<Object>)MockRaftActor.toObject(snapshot.getState());
+ assertEquals(prefix + " Snapshot getState size", expSnapshotState.size(), actualState.size());
+ for(int i = 0; i < expSnapshotState.size(); i++) {
+ assertEquals(prefix + " Snapshot state " + i, expSnapshotState.get(i), actualState.get(i));
+ }
}
protected void verifyPersistedJournal(String persistenceId, List<? extends ReplicatedLogEntry> expJournal) {
return Props.create(new MockRaftActorCreator(peerAddresses, id, config, dataPersistenceProvider, roleChangeNotifier));
}
-
@Override protected void applyState(ActorRef clientActor, String identifier, Object data) {
actorDelegate.applyState(clientActor, identifier, data);
- LOG.info("{}: applyState called", persistenceId());
+ LOG.info("{}: applyState called: {}", persistenceId(), data);
+
+ state.add(data);
}
@Override
return this.getId();
}
- private Object toObject(byte[] bs) throws ClassNotFoundException, IOException {
+ public static Object toObject(byte[] bs) throws ClassNotFoundException, IOException {
Object obj = null;
ByteArrayInputStream bis = null;
ObjectInputStream ois = null;
@Test
public void testOnCaptureSnapshot() throws Exception {
- sendMessageToSupport(new CaptureSnapshot(3, 1, 2, 1, 2, 1));
+ sendMessageToSupport(new CaptureSnapshot(3, 1, 2, 1, 2, 1, null));
ArgumentCaptor<Procedure> procedure = ArgumentCaptor.forClass(Procedure.class);
verify(mockSnapshotManager).create(procedure.capture());
import org.opendaylight.controller.cluster.raft.utils.InMemoryJournal;
import org.opendaylight.controller.cluster.raft.utils.InMemorySnapshotStore;
import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import scala.concurrent.duration.FiniteDuration;
public class RaftActorTest extends AbstractActorTest {
+ static final Logger TEST_LOG = LoggerFactory.getLogger(RaftActorTest.class);
+
private TestActorFactory factory;
@Before
@Test
public void testRaftActorRecoveryWithPersistenceEnabled() throws Exception {
+ TEST_LOG.info("testRaftActorRecoveryWithPersistenceEnabled starting");
+
new JavaTestKit(getSystem()) {{
String persistenceId = factory.generateActorId("follower-");
// log entry.
config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
+ ImmutableMap<String, String> peerAddresses = ImmutableMap.<String, String>builder().put("member1", "address").build();
ActorRef followerActor = factory.createActor(MockRaftActor.props(persistenceId,
- ImmutableMap.<String, String>builder().put("member1", "address").build(),
- Optional.<ConfigParams>of(config)), persistenceId);
+ peerAddresses, Optional.<ConfigParams>of(config)), persistenceId);
watch(followerActor);
//reinstate the actor
TestActorRef<MockRaftActor> ref = factory.createTestActor(
- MockRaftActor.props(persistenceId, Collections.<String, String>emptyMap(),
- Optional.<ConfigParams>of(config)));
+ MockRaftActor.props(persistenceId, peerAddresses, Optional.<ConfigParams>of(config)));
MockRaftActor mockRaftActor = ref.underlyingActor();
assertEquals("getRaftState", RaftState.Follower, mockRaftActor.getRaftState());
}};
+
+ TEST_LOG.info("testRaftActorRecoveryWithPersistenceEnabled ending");
}
@Test
doReturn(true).when(mockSupport).handleSnapshotMessage(same(applySnapshot));
mockRaftActor.handleCommand(applySnapshot);
- CaptureSnapshot captureSnapshot = new CaptureSnapshot(1, 1, 1, 1, 0, 1);
+ CaptureSnapshot captureSnapshot = new CaptureSnapshot(1, 1, 1, 1, 0, 1, null);
doReturn(true).when(mockSupport).handleSnapshotMessage(same(captureSnapshot));
mockRaftActor.handleCommand(captureSnapshot);
}};
}
- private ByteString fromObject(Object snapshot) throws Exception {
+ public static ByteString fromObject(Object snapshot) throws Exception {
ByteArrayOutputStream b = null;
ObjectOutputStream o = null;
try {
--- /dev/null
+/*
+ * Copyright (c) 2015 Brocade Communications Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.raft;
+
+import static org.junit.Assert.assertEquals;
+import akka.persistence.SaveSnapshotSuccess;
+import com.google.common.collect.ImmutableMap;
+import java.util.Arrays;
+import org.junit.Before;
+import org.junit.Test;
+import org.opendaylight.controller.cluster.raft.MockRaftActorContext.MockPayload;
+import org.opendaylight.controller.cluster.raft.base.messages.ApplyJournalEntries;
+import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshot;
+import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshotReply;
+import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
+import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor;
+
+/**
+ * Tests raft actor persistence recovery end-to-end using real RaftActors and behavior communication.
+ *
+ * @author Thomas Pantelis
+ */
+public class RecoveryIntegrationTest extends AbstractRaftActorIntegrationTest {
+
+ private MockPayload payload0;
+ private MockPayload payload1;
+
+ @Before
+ public void setup() {
+ follower1Actor = newTestRaftActor(follower1Id, ImmutableMap.of(leaderId, testActorPath(leaderId)),
+ newFollowerConfigParams());
+
+ peerAddresses = ImmutableMap.<String, String>builder().
+ put(follower1Id, follower1Actor.path().toString()).build();
+
+ leaderConfigParams = newLeaderConfigParams();
+ leaderActor = newTestRaftActor(leaderId, peerAddresses, leaderConfigParams);
+
+ follower1CollectorActor = follower1Actor.underlyingActor().collectorActor();
+ leaderCollectorActor = leaderActor.underlyingActor().collectorActor();
+
+ leaderContext = leaderActor.underlyingActor().getRaftActorContext();
+ }
+
+ @Test
+ public void testStatePersistedBetweenSnapshotCaptureAndPersist() {
+
+ send2InitialPayloads();
+
+ // Block these messages initially so we can control the sequence.
+ leaderActor.underlyingActor().startDropMessages(CaptureSnapshot.class);
+ leaderActor.underlyingActor().startDropMessages(CaptureSnapshotReply.class);
+ follower1Actor.underlyingActor().startDropMessages(AppendEntries.class);
+
+ MockPayload payload2 = sendPayloadData(leaderActor, "two");
+
+ // This should trigger a snapshot.
+ MockPayload payload3 = sendPayloadData(leaderActor, "three");
+
+ MessageCollectorActor.expectMatching(follower1CollectorActor, AppendEntries.class, 3);
+
+ CaptureSnapshot captureSnapshot = MessageCollectorActor.expectFirstMatching(
+ leaderCollectorActor, CaptureSnapshot.class);
+
+ // First, deliver the CaptureSnapshot to the leader.
+ leaderActor.underlyingActor().stopDropMessages(CaptureSnapshot.class);
+ leaderActor.tell(captureSnapshot, leaderActor);
+
+ // Send another payload.
+ MockPayload payload4 = sendPayloadData(leaderActor, "four");
+
+ // Now deliver the AppendEntries to the follower
+ follower1Actor.underlyingActor().stopDropMessages(AppendEntries.class);
+
+ MessageCollectorActor.expectMatching(leaderCollectorActor, ApplyJournalEntries.class, 3);
+
+ // Now deliver the CaptureSnapshotReply to the leader.
+ CaptureSnapshotReply captureSnapshotReply = MessageCollectorActor.expectFirstMatching(
+ leaderCollectorActor, CaptureSnapshotReply.class);
+ leaderActor.underlyingActor().stopDropMessages(CaptureSnapshotReply.class);
+ leaderActor.tell(captureSnapshotReply, leaderActor);
+
+ // Wait for snapshot complete.
+ MessageCollectorActor.expectFirstMatching(leaderCollectorActor, SaveSnapshotSuccess.class);
+
+ reinstateLeaderActor();
+
+ assertEquals("Leader snapshot term", currentTerm, leaderContext.getReplicatedLog().getSnapshotTerm());
+ assertEquals("Leader snapshot index", 1, leaderContext.getReplicatedLog().getSnapshotIndex());
+ assertEquals("Leader journal log size", 3, leaderContext.getReplicatedLog().size());
+ assertEquals("Leader journal last index", 4, leaderContext.getReplicatedLog().lastIndex());
+ assertEquals("Leader commit index", 4, leaderContext.getCommitIndex());
+ assertEquals("Leader last applied", 4, leaderContext.getLastApplied());
+
+ assertEquals("Leader state", Arrays.asList(payload0, payload1, payload2, payload3, payload4),
+ leaderActor.underlyingActor().getState());
+ }
+
+ @Test
+ public void testStatePersistedBetweenInitiateSnapshotAndCapture() {
+
+ send2InitialPayloads();
+
+ // Block these messages initially so we can control the sequence.
+ leaderActor.underlyingActor().startDropMessages(CaptureSnapshot.class);
+ follower1Actor.underlyingActor().startDropMessages(AppendEntries.class);
+
+ MockPayload payload2 = sendPayloadData(leaderActor, "two");
+
+ // This should trigger a snapshot.
+ MockPayload payload3 = sendPayloadData(leaderActor, "three");
+
+ // Send another payload.
+ MockPayload payload4 = sendPayloadData(leaderActor, "four");
+
+ MessageCollectorActor.expectMatching(follower1CollectorActor, AppendEntries.class, 3);
+
+ CaptureSnapshot captureSnapshot = MessageCollectorActor.expectFirstMatching(
+ leaderCollectorActor, CaptureSnapshot.class);
+
+ // First, deliver the AppendEntries to the follower
+ follower1Actor.underlyingActor().stopDropMessages(AppendEntries.class);
+
+ MessageCollectorActor.expectMatching(leaderCollectorActor, ApplyJournalEntries.class, 3);
+
+ // Now deliver the CaptureSnapshot to the leader.
+ leaderActor.underlyingActor().stopDropMessages(CaptureSnapshot.class);
+ leaderActor.tell(captureSnapshot, leaderActor);
+
+ // Wait for snapshot complete.
+ MessageCollectorActor.expectFirstMatching(leaderCollectorActor, SaveSnapshotSuccess.class);
+
+ reinstateLeaderActor();
+
+ assertEquals("Leader snapshot term", currentTerm, leaderContext.getReplicatedLog().getSnapshotTerm());
+ assertEquals("Leader snapshot index", 1, leaderContext.getReplicatedLog().getSnapshotIndex());
+ assertEquals("Leader journal log size", 3, leaderContext.getReplicatedLog().size());
+ assertEquals("Leader journal last index", 4, leaderContext.getReplicatedLog().lastIndex());
+ assertEquals("Leader commit index", 4, leaderContext.getCommitIndex());
+ assertEquals("Leader last applied", 4, leaderContext.getLastApplied());
+
+ // payloads 2, 3, and 4 were applied after the snapshot was initiated and before it was captured so
+ // were included in the snapshot. They were also included as unapplied entries in the snapshot as
+ // they weren't yet applied to the state at the time the snapshot was initiated. They were applied to the
+ // state on recovery by the ApplyJournalEntries messages which remained in the persisted log.
+ // This is a side effect of trimming the persisted log to the sequence number captured at the time
+ // the snapshot was initiated.
+ assertEquals("Leader state", Arrays.asList(payload0, payload1, payload2, payload3, payload4, payload2,
+ payload3, payload4), leaderActor.underlyingActor().getState());
+ }
+
+ @Test
+ public void testApplyJournalEntriesPersistedAfterSnapshotPersisted() {
+
+ send2InitialPayloads();
+
+ // Block these messages initially so we can control the sequence.
+ follower1Actor.underlyingActor().startDropMessages(AppendEntries.class);
+
+ MockPayload payload2 = sendPayloadData(leaderActor, "two");
+
+ // This should trigger a snapshot.
+ MockPayload payload3 = sendPayloadData(leaderActor, "three");
+
+ // Send another payload.
+ MockPayload payload4 = sendPayloadData(leaderActor, "four");
+
+ MessageCollectorActor.expectMatching(follower1CollectorActor, AppendEntries.class, 3);
+
+ // Wait for snapshot complete.
+ MessageCollectorActor.expectFirstMatching(leaderCollectorActor, SaveSnapshotSuccess.class);
+
+ // Now deliver the AppendEntries to the follower
+ follower1Actor.underlyingActor().stopDropMessages(AppendEntries.class);
+
+ MessageCollectorActor.expectMatching(leaderCollectorActor, ApplyJournalEntries.class, 3);
+
+ reinstateLeaderActor();
+
+ assertEquals("Leader snapshot term", currentTerm, leaderContext.getReplicatedLog().getSnapshotTerm());
+ assertEquals("Leader snapshot index", 1, leaderContext.getReplicatedLog().getSnapshotIndex());
+ assertEquals("Leader journal log size", 3, leaderContext.getReplicatedLog().size());
+ assertEquals("Leader journal last index", 4, leaderContext.getReplicatedLog().lastIndex());
+ assertEquals("Leader commit index", 4, leaderContext.getCommitIndex());
+ assertEquals("Leader last applied", 4, leaderContext.getLastApplied());
+
+ assertEquals("Leader state", Arrays.asList(payload0, payload1, payload2, payload3, payload4),
+ leaderActor.underlyingActor().getState());
+ }
+
+ private void reinstateLeaderActor() {
+ killActor(leaderActor);
+
+ leaderActor = newTestRaftActor(leaderId, peerAddresses, leaderConfigParams);
+
+ leaderActor.underlyingActor().waitForRecoveryComplete();
+
+ leaderContext = leaderActor.underlyingActor().getRaftActorContext();
+ }
+
+ private void send2InitialPayloads() {
+ waitUntilLeader(leaderActor);
+ currentTerm = leaderContext.getTermInformation().getCurrentTerm();
+
+ payload0 = sendPayloadData(leaderActor, "zero");
+ payload1 = sendPayloadData(leaderActor, "one");
+
+ // Verify the leader applies the states.
+ MessageCollectorActor.expectMatching(leaderCollectorActor, ApplyJournalEntries.class, 2);
+
+ assertEquals("Leader last applied", 1, leaderContext.getLastApplied());
+
+ MessageCollectorActor.clearMessages(leaderCollectorActor);
+ MessageCollectorActor.clearMessages(follower1CollectorActor);
+ }
+}
import org.junit.Test;
import org.opendaylight.controller.cluster.raft.MockRaftActorContext.MockPayload;
import org.opendaylight.controller.cluster.raft.RaftActor.UpdateElectionTerm;
-import org.opendaylight.controller.cluster.raft.base.messages.ApplyLogEntries;
+import org.opendaylight.controller.cluster.raft.base.messages.ApplyJournalEntries;
import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshot;
import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshotReply;
private MockPayload recoveredPayload0;
private MockPayload recoveredPayload1;
private MockPayload recoveredPayload2;
+ private MockPayload payload3;
private MockPayload payload4;
private MockPayload payload5;
private MockPayload payload6;
private MockPayload payload7;
@Test
- public void runTest() {
+ public void runTest() throws Exception {
testLog.info("testReplicationAndSnapshots starting");
// Setup the persistent journal for the leader. We'll start up with 3 journal log entries (one less
InMemoryJournal.addEntry(leaderId, seqId++, new ReplicatedLogImplEntry(1, initialTerm, recoveredPayload1));
recoveredPayload2 = new MockPayload("two");
InMemoryJournal.addEntry(leaderId, seqId++, new ReplicatedLogImplEntry(2, initialTerm, recoveredPayload2));
- InMemoryJournal.addEntry(leaderId, seqId++, new ApplyLogEntries(2));
+ InMemoryJournal.addEntry(leaderId, seqId++, new ApplyJournalEntries(2));
origLeaderJournal = InMemoryJournal.get(leaderId, ReplicatedLogImplEntry.class);
* 4 and we already have 3 entries in the journal log, this should initiate a snapshot. In this
* scenario, the follower consensus and application of state is delayed until after the snapshot
* completes.
+ * @throws Exception
*/
- private void testFirstSnapshot() {
+ private void testFirstSnapshot() throws Exception {
testLog.info("testFirstSnapshot starting");
- byte[] snapshot = new byte[] {1,2,3,4};
- leaderActor.underlyingActor().setSnapshot(snapshot);
+ expSnapshotState.add(recoveredPayload0);
+ expSnapshotState.add(recoveredPayload1);
+ expSnapshotState.add(recoveredPayload2);
// Delay the consensus by temporarily dropping the AppendEntries to both followers.
follower1Actor.underlyingActor().startDropMessages(AppendEntries.class);
follower2Actor.underlyingActor().startDropMessages(AppendEntries.class);
// Send the payload.
- MockPayload payload3 = sendPayloadData(leaderActor, "three");
+ payload3 = sendPayloadData(leaderActor, "three");
// Wait for snapshot complete.
MessageCollectorActor.expectFirstMatching(leaderCollectorActor, SaveSnapshotSuccess.class);
// the last applied log entry (2) even though the leader hasn't yet advanced its cached snapshot index.
List<Snapshot> persistedSnapshots = InMemorySnapshotStore.getSnapshots(leaderId, Snapshot.class);
assertEquals("Persisted snapshots size", 1, persistedSnapshots.size());
- verifySnapshot("Persisted", persistedSnapshots.get(0), initialTerm, 2, currentTerm, 3, snapshot);
+ verifySnapshot("Persisted", persistedSnapshots.get(0), initialTerm, 2, currentTerm, 3);
List<ReplicatedLogEntry> unAppliedEntry = persistedSnapshots.get(0).getUnAppliedEntries();
assertEquals("Persisted Snapshot getUnAppliedEntries size", 1, unAppliedEntry.size());
verifyReplicatedLogEntry(unAppliedEntry.get(0), currentTerm, 3, payload3);
/**
* Send one more payload to trigger another snapshot. In this scenario, we delay the snapshot until
* consensus occurs and the leader applies the state.
+ * @throws Exception
*/
- private void testSecondSnapshot() {
+ private void testSecondSnapshot() throws Exception {
testLog.info("testSecondSnapshot starting");
- byte[] snapshot = new byte[] {5,6,7,8};
- leaderActor.underlyingActor().setSnapshot(snapshot);
+ expSnapshotState.add(payload3);
+ expSnapshotState.add(payload4);
+ expSnapshotState.add(payload5);
+ expSnapshotState.add(payload6);
// Delay the CaptureSnapshot message to the leader actor.
leaderActor.underlyingActor().startDropMessages(CaptureSnapshot.class);
assertEquals("Leader journal last index", 7, leaderContext.getReplicatedLog().lastIndex());
assertEquals("Leader commit index", 7, leaderContext.getCommitIndex());
- // Verify the persisted snapshot. This should reflect the advanced snapshot index as the last applied
- // log entry (6).
+ expSnapshotState.add(payload7);
+
+ // Verify the persisted snapshot. This should reflect the snapshot index as the last applied
+ // log entry (7) and shouldn't contain any unapplied entries as we capture persisted the snapshot data
+ // when the snapshot is created (ie when the CaptureSnapshot is processed).
List<Snapshot> persistedSnapshots = InMemorySnapshotStore.getSnapshots(leaderId, Snapshot.class);
assertEquals("Persisted snapshots size", 1, persistedSnapshots.size());
- verifySnapshot("Persisted", persistedSnapshots.get(0), currentTerm, 6, currentTerm, 7, snapshot);
+ verifySnapshot("Persisted", persistedSnapshots.get(0), currentTerm, 6, currentTerm, 7);
List<ReplicatedLogEntry> unAppliedEntry = persistedSnapshots.get(0).getUnAppliedEntries();
assertEquals("Persisted Snapshot getUnAppliedEntries size", 1, unAppliedEntry.size());
verifyReplicatedLogEntry(unAppliedEntry.get(0), currentTerm, 7, payload7);
leaderActor.underlyingActor().waitForRecoveryComplete();
+ leaderContext = leaderActor.underlyingActor().getRaftActorContext();
+
assertEquals("Leader snapshot term", currentTerm, leaderContext.getReplicatedLog().getSnapshotTerm());
assertEquals("Leader snapshot index", 6, leaderContext.getReplicatedLog().getSnapshotIndex());
assertEquals("Leader journal log size", 1, leaderContext.getReplicatedLog().size());
*/
package org.opendaylight.controller.cluster.raft;
-import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
import akka.persistence.SaveSnapshotSuccess;
import com.google.common.collect.ImmutableMap;
MessageCollectorActor.clearMessages(follower1CollectorActor);
MessageCollectorActor.clearMessages(follower2CollectorActor);
+ expSnapshotState.add(payload0);
+ expSnapshotState.add(payload1);
+ expSnapshotState.add(payload2);
+
testLog.info("testInitialReplications complete");
}
testLog.info("testSubsequentReplicationsAndSnapshots starting: sending first payload, replicatedToAllIndex: {}",
leader.getReplicatedToAllIndex());
- leaderActor.underlyingActor().setSnapshot(new byte[] {2});
-
follower2Actor.underlyingActor().startDropMessages(AppendEntries.class);
// Send the first payload - this should cause the first snapshot.
MessageCollectorActor.expectFirstMatching(leaderCollectorActor, SaveSnapshotSuccess.class);
- byte[] snapshot = new byte[] {6};
- leaderActor.underlyingActor().setSnapshot(snapshot);
+ expSnapshotState.add(payload3);
testLog.info("testSubsequentReplicationsAndSnapshots: sending 4 more payloads");
// Verify the leader's persisted snapshot.
List<Snapshot> persistedSnapshots = InMemorySnapshotStore.getSnapshots(leaderId, Snapshot.class);
assertEquals("Persisted snapshots size", 1, persistedSnapshots.size());
- verifySnapshot("Persisted", persistedSnapshots.get(0), currentTerm, 3, currentTerm, 7, snapshot);
+ verifySnapshot("Persisted", persistedSnapshots.get(0), currentTerm, 3, currentTerm, 7);
List<ReplicatedLogEntry> unAppliedEntry = persistedSnapshots.get(0).getUnAppliedEntries();
assertEquals("Persisted Snapshot getUnAppliedEntries size", 4, unAppliedEntry.size());
verifyReplicatedLogEntry(unAppliedEntry.get(0), currentTerm, 4, payload4);
MessageCollectorActor.clearMessages(follower1CollectorActor);
MessageCollectorActor.clearMessages(follower2CollectorActor);
+ expSnapshotState.add(payload4);
+ expSnapshotState.add(payload5);
+ expSnapshotState.add(payload6);
+ expSnapshotState.add(payload7);
+
testLog.info("testSubsequentReplicationsAndSnapshots complete");
}
leader.getReplicatedToAllIndex());
leaderActor.underlyingActor().setMockTotalMemory(1000);
- byte[] snapshot = new byte[] {6};
- leaderActor.underlyingActor().setSnapshot(snapshot);
// We'll expect a ReplicatedLogImplEntry message and an ApplyJournalEntries message added to the journal.
InMemoryJournal.addWriteMessagesCompleteLatch(leaderId, 2);
CaptureSnapshot captureSnapshot = MessageCollectorActor.getFirstMatching(leaderCollectorActor, CaptureSnapshot.class);
Assert.assertNull("Leader received unexpected CaptureSnapshot", captureSnapshot);
+ expSnapshotState.add(payload8);
+
// Send another payload with a large enough relative size in combination with the last payload
// that exceeds the memory threshold (70% * 1000 = 700) - this should do a snapshot.
payload9 = sendPayloadData(leaderActor, "nine", 201);
// Verify the leader's persisted snapshot.
List<Snapshot> persistedSnapshots = InMemorySnapshotStore.getSnapshots(leaderId, Snapshot.class);
assertEquals("Persisted snapshots size", 1, persistedSnapshots.size());
- verifySnapshot("Persisted", persistedSnapshots.get(0), currentTerm, 8, currentTerm, 9, snapshot);
+ verifySnapshot("Persisted", persistedSnapshots.get(0), currentTerm, 8, currentTerm, 9);
List<ReplicatedLogEntry> unAppliedEntry = persistedSnapshots.get(0).getUnAppliedEntries();
assertEquals("Persisted Snapshot getUnAppliedEntries size", 1, unAppliedEntry.size());
verifyReplicatedLogEntry(unAppliedEntry.get(0), currentTerm, 9, payload9);
MessageCollectorActor.clearMessages(leaderCollectorActor);
MessageCollectorActor.clearMessages(follower1CollectorActor);
MessageCollectorActor.clearMessages(follower2CollectorActor);
+
+ expSnapshotState.add(payload10);
}
/**
InstallSnapshot installSnapshot;
InstallSnapshotReply installSnapshotReply;
- byte[] snapshot = new byte[] {10};
- leaderActor.underlyingActor().setSnapshot(snapshot);
+ expSnapshotState.add(payload9);
// Now stop dropping AppendEntries in follower 2.
follower2Actor.underlyingActor().stopDropMessages(AppendEntries.class);
assertEquals("InstallSnapshot getTotalChunks", 1, installSnapshot.getTotalChunks());
assertEquals("InstallSnapshot getLastIncludedTerm", currentTerm, installSnapshot.getLastIncludedTerm());
assertEquals("InstallSnapshot getLastIncludedIndex", 8, installSnapshot.getLastIncludedIndex());
- assertArrayEquals("InstallSnapshot getData", snapshot, installSnapshot.getData().toByteArray());
+ //assertArrayEquals("InstallSnapshot getData", snapshot, installSnapshot.getData().toByteArray());
installSnapshotReply = MessageCollectorActor.expectFirstMatching(leaderCollectorActor, InstallSnapshotReply.class);
assertEquals("InstallSnapshotReply getTerm", currentTerm, installSnapshotReply.getTerm());
// Verify follower 2 applies the snapshot.
applySnapshot = MessageCollectorActor.expectFirstMatching(follower2CollectorActor, ApplySnapshot.class);
- verifySnapshot("Follower 2", applySnapshot.getSnapshot(), currentTerm, 8, currentTerm, 8, snapshot);
+ verifySnapshot("Follower 2", applySnapshot.getSnapshot(), currentTerm, 8, currentTerm, 8);
assertEquals("Persisted Snapshot getUnAppliedEntries size", 0, applySnapshot.getSnapshot().getUnAppliedEntries().size());
// Verify follower 2 only applies the second log entry (9) as the first one (8) was in the snapshot.
persistedSnapshots = InMemorySnapshotStore.getSnapshots(leaderId, Snapshot.class);
Assert.assertTrue("Expected at least 1 persisted snapshots", persistedSnapshots.size() > 0);
Snapshot persistedSnapshot = persistedSnapshots.get(persistedSnapshots.size() - 1);
- verifySnapshot("Persisted", persistedSnapshot, currentTerm, 9, currentTerm, 9, snapshot);
+ verifySnapshot("Persisted", persistedSnapshot, currentTerm, 9, currentTerm, 9);
unAppliedEntry = persistedSnapshot.getUnAppliedEntries();
assertEquals("Persisted Snapshot getUnAppliedEntries size", 0, unAppliedEntry.size());
/**
* Do another round of payloads and snapshot to verify replicatedToAllIndex gets back on track and
* snapshots works as expected after doing a follower snapshot. In this step we don't lag a follower.
+ * @throws Exception
*/
- private void testFinalReplicationsAndSnapshot() {
+ private void testFinalReplicationsAndSnapshot() throws Exception {
List<ApplyState> applyStates;
ApplyState applyState;
testLog.info("testFinalReplicationsAndSnapshot starting: replicatedToAllIndex: {}", leader.getReplicatedToAllIndex());
- byte[] snapshot = new byte[] {14};
- leaderActor.underlyingActor().setSnapshot(snapshot);
-
// Send another payload - a snapshot should occur.
payload11 = sendPayloadData(leaderActor, "eleven");
// Verify the leader's last persisted snapshot (previous ones may not be purged yet).
List<Snapshot> persistedSnapshots = InMemorySnapshotStore.getSnapshots(leaderId, Snapshot.class);
Snapshot persistedSnapshot = persistedSnapshots.get(persistedSnapshots.size() - 1);
- verifySnapshot("Persisted", persistedSnapshot, currentTerm, 10, currentTerm, 11, snapshot);
+ verifySnapshot("Persisted", persistedSnapshot, currentTerm, 10, currentTerm, 11);
List<ReplicatedLogEntry> unAppliedEntry = persistedSnapshot.getUnAppliedEntries();
assertEquals("Persisted Snapshot getUnAppliedEntries size", 1, unAppliedEntry.size());
verifyReplicatedLogEntry(unAppliedEntry.get(0), currentTerm, 11, payload11);
doReturn(10L).when(mockConfigParams).getSnapshotBatchCount();
doReturn(mockReplicatedLog).when(mockRaftActorContext).getReplicatedLog();
doReturn("123").when(mockRaftActorContext).getId();
+ doReturn(mockDataPersistenceProvider).when(mockRaftActorContext).getPersistenceProvider();
doReturn("123").when(mockRaftActorBehavior).getLeaderId();
ElectionTerm mockElectionTerm = mock(ElectionTerm.class);
@Test
public void testCommit(){
+ doReturn(50L).when(mockDataPersistenceProvider).getLastSequenceNumber();
+
// when replicatedToAllIndex = -1
snapshotManager.captureToInstall(new MockRaftActorContext.MockReplicatedLogEntry(6, 9,
new MockRaftActorContext.MockPayload()), -1, "follower-1");
verify(mockReplicatedLog).snapshotCommit();
- verify(mockDataPersistenceProvider).deleteMessages(100L);
+ verify(mockDataPersistenceProvider).deleteMessages(50L);
ArgumentCaptor<SnapshotSelectionCriteria> criteriaCaptor = ArgumentCaptor.forClass(SnapshotSelectionCriteria.class);
@Test
public void testCallingCommitMultipleTimesCausesNoHarm(){
+ doReturn(50L).when(mockDataPersistenceProvider).getLastSequenceNumber();
+
// when replicatedToAllIndex = -1
snapshotManager.captureToInstall(new MockRaftActorContext.MockReplicatedLogEntry(6, 9,
new MockRaftActorContext.MockPayload()), -1, "follower-1");
verify(mockReplicatedLog, times(1)).snapshotCommit();
- verify(mockDataPersistenceProvider, times(1)).deleteMessages(100L);
+ verify(mockDataPersistenceProvider, times(1)).deleteMessages(50L);
verify(mockDataPersistenceProvider, times(1)).deleteSnapshots(any(SnapshotSelectionCriteria.class));
}
@Override
public Future<Void> doAsyncDeleteMessagesTo(String persistenceId, long toSequenceNr, boolean permanent) {
+ LOG.trace("doAsyncDeleteMessagesTo: {}", toSequenceNr);
Map<Long, Object> journal = journals.get(persistenceId);
if(journal != null) {
synchronized (journal) {
import org.opendaylight.yangtools.util.concurrent.MappingCheckedFuture;
import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.DataValidationFailedException;
import org.opendaylight.yangtools.yang.model.api.SchemaContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
try {
proxyFuture.set(NormalizedNodeAggregator.aggregate(YangInstanceIdentifier.builder().build(),
future.get(), actorContext.getSchemaContext()));
- } catch (InterruptedException | ExecutionException e) {
+ } catch (DataValidationFailedException | InterruptedException | ExecutionException e) {
proxyFuture.setException(e);
}
}
* terms of the Eclipse Public License v1.0 which accompanies this distribution,
* and is available at http://www.eclipse.org/legal/epl-v10.html
*/
-
package org.opendaylight.controller.cluster.datastore.utils;
import com.google.common.base.Optional;
-import com.google.common.util.concurrent.CheckedFuture;
-import com.google.common.util.concurrent.MoreExecutors;
import java.util.List;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ExecutorService;
-import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
-import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStore;
-import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadTransaction;
-import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
-import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction;
import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidate;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModification;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.DataValidationFailedException;
+import org.opendaylight.yangtools.yang.data.impl.schema.tree.InMemoryDataTreeFactory;
import org.opendaylight.yangtools.yang.model.api.SchemaContext;
public class NormalizedNodeAggregator {
-
- private static final ExecutorService executorService = MoreExecutors.newDirectExecutorService();
-
private final YangInstanceIdentifier rootIdentifier;
private final List<Optional<NormalizedNode<?, ?>>> nodes;
- private final InMemoryDOMDataStore dataStore;
-
- NormalizedNodeAggregator(YangInstanceIdentifier rootIdentifier, List<Optional<NormalizedNode<?, ?>>> nodes,
- SchemaContext schemaContext){
+ private final DataTree dataTree;
+ private NormalizedNodeAggregator(YangInstanceIdentifier rootIdentifier, List<Optional<NormalizedNode<?, ?>>> nodes,
+ SchemaContext schemaContext) {
this.rootIdentifier = rootIdentifier;
this.nodes = nodes;
- this.dataStore = new InMemoryDOMDataStore("aggregator", executorService);
- this.dataStore.onGlobalContextUpdated(schemaContext);
+ this.dataTree = InMemoryDataTreeFactory.getInstance().create();
+ this.dataTree.setSchemaContext(schemaContext);
}
/**
* @param nodes
* @param schemaContext
* @return
- * @throws ExecutionException
- * @throws InterruptedException
+ * @throws DataValidationFailedException
*/
public static Optional<NormalizedNode<?,?>> aggregate(YangInstanceIdentifier rootIdentifier,
List<Optional<NormalizedNode<?, ?>>> nodes,
- SchemaContext schemaContext)
- throws ExecutionException, InterruptedException {
+ SchemaContext schemaContext) throws DataValidationFailedException {
return new NormalizedNodeAggregator(rootIdentifier, nodes, schemaContext).aggregate();
}
- private Optional<NormalizedNode<?,?>> aggregate() throws ExecutionException, InterruptedException {
+ private Optional<NormalizedNode<?,?>> aggregate() throws DataValidationFailedException {
return combine().getRootNode();
}
- private NormalizedNodeAggregator combine() throws InterruptedException, ExecutionException {
- DOMStoreWriteTransaction domStoreWriteTransaction = dataStore.newWriteOnlyTransaction();
+ private NormalizedNodeAggregator combine() throws DataValidationFailedException {
+ DataTreeModification mod = dataTree.takeSnapshot().newModification();
- for(Optional<NormalizedNode<?,?>> node : nodes) {
- if(node.isPresent()) {
- domStoreWriteTransaction.merge(rootIdentifier, node.get());
+ for (Optional<NormalizedNode<?,?>> node : nodes) {
+ if (node.isPresent()) {
+ mod.merge(rootIdentifier, node.get());
}
}
- DOMStoreThreePhaseCommitCohort ready = domStoreWriteTransaction.ready();
- ready.canCommit().get();
- ready.preCommit().get();
- ready.commit().get();
+
+ dataTree.validate(mod);
+ final DataTreeCandidate candidate = dataTree.prepare(mod);
+ dataTree.commit(candidate);
return this;
}
- private Optional<NormalizedNode<?, ?>> getRootNode() throws InterruptedException, ExecutionException {
- DOMStoreReadTransaction readTransaction = dataStore.newReadOnlyTransaction();
-
- CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException> read =
- readTransaction.read(rootIdentifier);
-
- return read.get();
+ private Optional<NormalizedNode<?, ?>> getRootNode() {
+ return dataTree.takeSnapshot().readNode(rootIdentifier);
}
-
-
}
import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode;
import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.DataValidationFailedException;
import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
import org.opendaylight.yangtools.yang.model.api.SchemaContext;
public class NormalizedNodeAggregatorTest {
@Test
- public void testAggregate() throws InterruptedException, ExecutionException, ReadFailedException {
+ public void testAggregate() throws InterruptedException, ExecutionException, ReadFailedException, DataValidationFailedException {
SchemaContext schemaContext = SchemaContextHelper.full();
NormalizedNode<?, ?> expectedNode1 = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
NormalizedNode<?, ?> expectedNode2 = ImmutableNodes.containerNode(CarsModel.CARS_QNAME);