From: Moiz Raja Date: Wed, 15 Apr 2015 18:29:20 +0000 (+0000) Subject: Merge "Split out sal-akka-raft example" X-Git-Tag: release/lithium~264 X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=commitdiff_plain;h=e3a22ae5edead2319553bb4dfce59e359386d535;hp=bf72d1435affe9307ec5a0f11220747f8e4ec050 Merge "Split out sal-akka-raft example" --- diff --git a/opendaylight/md-sal/messagebus-impl/src/test/java/org/opendaylight/controller/config/yang/messagebus/app/impl/MessageBusAppImplModuleFactoryTest.java b/opendaylight/md-sal/messagebus-impl/src/test/java/org/opendaylight/controller/config/yang/messagebus/app/impl/MessageBusAppImplModuleFactoryTest.java index 7db7dcc333..d06a1a8273 100644 --- a/opendaylight/md-sal/messagebus-impl/src/test/java/org/opendaylight/controller/config/yang/messagebus/app/impl/MessageBusAppImplModuleFactoryTest.java +++ b/opendaylight/md-sal/messagebus-impl/src/test/java/org/opendaylight/controller/config/yang/messagebus/app/impl/MessageBusAppImplModuleFactoryTest.java @@ -49,4 +49,4 @@ public class MessageBusAppImplModuleFactoryTest { assertNotNull("Module has not been created correctly.", messageBusAppImplModuleFactory.createModule("instanceName1", dependencyResolverMock, dynamicMBeanWithInstanceMock, bundleContextMock)); } -} +} \ No newline at end of file diff --git a/opendaylight/md-sal/messagebus-impl/src/test/java/org/opendaylight/controller/config/yang/messagebus/app/impl/MessageBusAppImplModuleTest.java b/opendaylight/md-sal/messagebus-impl/src/test/java/org/opendaylight/controller/config/yang/messagebus/app/impl/MessageBusAppImplModuleTest.java index 85d1a1b109..e26502f949 100644 --- a/opendaylight/md-sal/messagebus-impl/src/test/java/org/opendaylight/controller/config/yang/messagebus/app/impl/MessageBusAppImplModuleTest.java +++ b/opendaylight/md-sal/messagebus-impl/src/test/java/org/opendaylight/controller/config/yang/messagebus/app/impl/MessageBusAppImplModuleTest.java @@ -7,17 +7,37 @@ */ package org.opendaylight.controller.config.yang.messagebus.app.impl; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertNotNull; -import static org.mockito.Mockito.mock; - import org.junit.Before; import org.junit.BeforeClass; import org.junit.Test; import org.opendaylight.controller.config.api.DependencyResolver; +import org.opendaylight.controller.config.api.JmxAttribute; import org.opendaylight.controller.config.api.ModuleIdentifier; +import org.opendaylight.controller.md.sal.binding.api.DataBroker; +import org.opendaylight.controller.md.sal.binding.api.MountPointService; +import org.opendaylight.controller.md.sal.binding.api.WriteTransaction; +import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType; +import org.opendaylight.controller.md.sal.dom.api.DOMMountPointService; +import org.opendaylight.controller.md.sal.dom.api.DOMNotificationPublishService; +import org.opendaylight.controller.sal.binding.api.BindingAwareBroker; +import org.opendaylight.controller.sal.binding.api.BindingAwareProvider; +import org.opendaylight.controller.sal.binding.api.RpcProviderRegistry; +import org.opendaylight.controller.sal.core.api.Broker; +import org.opendaylight.controller.sal.core.api.Provider; +import org.opendaylight.yangtools.yang.binding.DataObject; +import org.opendaylight.yangtools.yang.binding.InstanceIdentifier; import org.osgi.framework.BundleContext; +import javax.management.ObjectName; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.mockito.Matchers.any; +import static org.mockito.Matchers.eq; +import static org.mockito.Mockito.doNothing; +import static org.mockito.Mockito.doReturn; +import static org.mockito.Mockito.mock; + public class MessageBusAppImplModuleTest { MessageBusAppImplModule messageBusAppImplModule; @@ -55,5 +75,34 @@ public class MessageBusAppImplModuleTest { assertEquals("Set and/or get method/s don't work correctly.", bundleContext, messageBusAppImplModule.getBundleContext()); } - //TODO: create MessageBusAppImplModule.createInstance test -} + @Test + public void createInstanceTest() throws Exception{ + org.opendaylight.controller.sal.binding.api.BindingAwareBroker bindingAwareBrokerMock = mock(org.opendaylight.controller.sal.binding.api.BindingAwareBroker.class); + Broker brokerMock = mock(Broker.class); + doReturn(brokerMock).when(dependencyResolverMock).resolveInstance(eq(org.opendaylight.controller.sal.core.api.Broker.class), any(ObjectName.class), any(JmxAttribute.class)); + doReturn(bindingAwareBrokerMock).when(dependencyResolverMock).resolveInstance(eq(org.opendaylight.controller.sal.binding.api.BindingAwareBroker.class), any(ObjectName.class), any(JmxAttribute.class)); + messageBusAppImplModule.resolveDependencies(); + + BindingAwareBroker.ProviderContext providerContext = mock(BindingAwareBroker.ProviderContext.class); + doReturn(providerContext).when(bindingAwareBrokerMock).registerProvider(any(BindingAwareProvider.class)); + Broker.ProviderSession providerSessionMock = mock(Broker.ProviderSession.class); + doReturn(providerSessionMock).when(brokerMock).registerProvider(any(Provider.class)); + DataBroker dataBrokerMock = mock(DataBroker.class); + doReturn(dataBrokerMock).when(providerContext).getSALService(eq(DataBroker.class)); + DOMNotificationPublishService domNotificationPublishServiceMock = mock(DOMNotificationPublishService.class); + doReturn(domNotificationPublishServiceMock).when(providerSessionMock).getService(DOMNotificationPublishService.class); + DOMMountPointService domMountPointServiceMock = mock(DOMMountPointService.class); + doReturn(domMountPointServiceMock).when(providerSessionMock).getService(DOMMountPointService.class); + MountPointService mountPointServiceMock = mock(MountPointService.class); + doReturn(mountPointServiceMock).when(providerContext).getSALService(eq(MountPointService.class)); + RpcProviderRegistry rpcProviderRegistryMock = mock(RpcProviderRegistry.class); + doReturn(rpcProviderRegistryMock).when(providerContext).getSALService(eq(RpcProviderRegistry.class)); + + WriteTransaction writeTransactionMock = mock(WriteTransaction.class); + doReturn(writeTransactionMock).when(dataBrokerMock).newWriteOnlyTransaction(); + doNothing().when(writeTransactionMock).put(any(LogicalDatastoreType.class), any(InstanceIdentifier.class), any(DataObject.class), eq(true)); + + assertNotNull("EventSourceRegistryWrapper has not been created correctly.", messageBusAppImplModule.createInstance()); + } + +} \ No newline at end of file diff --git a/opendaylight/md-sal/messagebus-impl/src/test/java/org/opendaylight/controller/messagebus/app/impl/EventSourceRegistrationImplTest.java b/opendaylight/md-sal/messagebus-impl/src/test/java/org/opendaylight/controller/messagebus/app/impl/EventSourceRegistrationImplTest.java new file mode 100644 index 0000000000..9cce623523 --- /dev/null +++ b/opendaylight/md-sal/messagebus-impl/src/test/java/org/opendaylight/controller/messagebus/app/impl/EventSourceRegistrationImplTest.java @@ -0,0 +1,54 @@ +/* + * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.controller.messagebus.app.impl; + +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Test; +import org.opendaylight.controller.messagebus.spi.EventSource; + +import static org.mockito.Matchers.any; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; + +public class EventSourceRegistrationImplTest { + + EventSourceRegistrationImplLocal eventSourceRegistrationImplLocal; + EventSourceTopology eventSourceTopologyMock; + + @BeforeClass + public static void initTestClass() throws IllegalAccessException, InstantiationException { + } + + @Before + public void setUp() throws Exception { + EventSource eventSourceMock = mock(EventSource.class); + eventSourceTopologyMock = mock(EventSourceTopology.class); + eventSourceRegistrationImplLocal = new EventSourceRegistrationImplLocal(eventSourceMock, eventSourceTopologyMock); + } + + @Test + public void removeRegistrationTest() { + eventSourceRegistrationImplLocal.removeRegistration(); + verify(eventSourceTopologyMock, times(1)).unRegister(any(EventSource.class)); + } + + + private class EventSourceRegistrationImplLocal extends EventSourceRegistrationImpl{ + + /** + * @param instance of EventSource that has been registered by {@link EventSourceRegistryImpl#registerEventSource(Node, org.opendaylight.controller.messagebus.spi.EventSource)} + * @param eventSourceTopology + */ + public EventSourceRegistrationImplLocal(EventSource instance, EventSourceTopology eventSourceTopology) { + super(instance, eventSourceTopology); + } + } + +} \ No newline at end of file diff --git a/opendaylight/md-sal/messagebus-impl/src/test/java/org/opendaylight/controller/messagebus/app/impl/EventSourceTopicTest.java b/opendaylight/md-sal/messagebus-impl/src/test/java/org/opendaylight/controller/messagebus/app/impl/EventSourceTopicTest.java index f369a128ad..9f513c464b 100644 --- a/opendaylight/md-sal/messagebus-impl/src/test/java/org/opendaylight/controller/messagebus/app/impl/EventSourceTopicTest.java +++ b/opendaylight/md-sal/messagebus-impl/src/test/java/org/opendaylight/controller/messagebus/app/impl/EventSourceTopicTest.java @@ -74,7 +74,7 @@ public class EventSourceTopicTest { nodeIdMock = mock(NodeId.class); doReturn(nodeIdMock).when(dataObjectMock).getId(); - doReturn("0").when(nodeIdMock).getValue(); + doReturn("nodeIdPattern1").when(nodeIdMock).getValue(); } @Test @@ -84,4 +84,4 @@ public class EventSourceTopicTest { verify(eventSourceServiceMock, times(1)).joinTopic(any(JoinTopicInput.class)); } -} +} \ No newline at end of file diff --git a/opendaylight/md-sal/messagebus-impl/src/test/java/org/opendaylight/controller/messagebus/app/impl/EventSourceTopologyTest.java b/opendaylight/md-sal/messagebus-impl/src/test/java/org/opendaylight/controller/messagebus/app/impl/EventSourceTopologyTest.java index ced2e1f01b..50ae4d9389 100644 --- a/opendaylight/md-sal/messagebus-impl/src/test/java/org/opendaylight/controller/messagebus/app/impl/EventSourceTopologyTest.java +++ b/opendaylight/md-sal/messagebus-impl/src/test/java/org/opendaylight/controller/messagebus/app/impl/EventSourceTopologyTest.java @@ -16,13 +16,16 @@ import static org.mockito.Mockito.mock; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; +import java.lang.reflect.Field; import java.util.ArrayList; import java.util.List; +import java.util.Map; import org.junit.Before; import org.junit.BeforeClass; import org.junit.Test; import org.opendaylight.controller.md.sal.binding.api.DataBroker; +import org.opendaylight.controller.md.sal.binding.api.DataChangeListener; import org.opendaylight.controller.md.sal.binding.api.ReadOnlyTransaction; import org.opendaylight.controller.md.sal.binding.api.WriteTransaction; import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType; @@ -57,6 +60,7 @@ public class EventSourceTopologyTest { CreateTopicInput createTopicInputMock; ListenerRegistration listenerRegistrationMock; NodeKey nodeKey; + RpcRegistration aggregatorRpcReg; @BeforeClass public static void initTestClass() throws IllegalAccessException, InstantiationException { @@ -76,7 +80,7 @@ public class EventSourceTopologyTest { } private void constructorTestHelper(){ - RpcRegistration aggregatorRpcReg = mock(RpcRegistration.class); + aggregatorRpcReg = mock(RpcRegistration.class); EventSourceService eventSourceService = mock(EventSourceService.class); doReturn(aggregatorRpcReg).when(rpcProviderRegistryMock).addRpcImplementation(eq(EventAggregatorService.class), any(EventSourceTopology.class)); doReturn(eventSourceService).when(rpcProviderRegistryMock).getRpcService(EventSourceService.class); @@ -87,11 +91,11 @@ public class EventSourceTopologyTest { doReturn(checkedFutureMock).when(writeTransactionMock).submit(); } -//TODO: create test for createTopic -// public void createTopicTest() throws Exception{ -// createTopicTestHelper(); -// assertNotNull("Topic has not been created correctly.", eventSourceTopology.createTopic(createTopicInputMock)); -// } + @Test + public void createTopicTest() throws Exception{ + topicTestHelper(); + assertNotNull("Topic has not been created correctly.", eventSourceTopology.createTopic(createTopicInputMock)); + } private void topicTestHelper() throws Exception{ constructorTestHelper(); @@ -138,6 +142,19 @@ public class EventSourceTopologyTest { assertNotNull("Instance has not been created correctly.", eventSourceTopology.destroyTopic(destroyTopicInput)); } + @Test + public void closeTest() throws Exception{ + constructorTestHelper(); + topicTestHelper(); + Map> localMap = getTopicListenerRegistrations(); + DataChangeListener dataChangeListenerMock = mock(DataChangeListener.class); + ListenerRegistration listenerListenerRegistrationMock = (ListenerRegistration) mock(ListenerRegistration.class); + localMap.put(dataChangeListenerMock, listenerListenerRegistrationMock); + eventSourceTopology.close(); + verify(aggregatorRpcReg, times(1)).close(); + verify(listenerListenerRegistrationMock, times(1)).close(); + } + @Test public void registerTest() throws Exception { topicTestHelper(); @@ -154,4 +171,46 @@ public class EventSourceTopologyTest { verify(routedRpcRegistrationMock, times(1)).registerPath(eq(NodeContext.class), any(KeyedInstanceIdentifier.class)); } -} + @Test + public void unregisterTest() throws Exception { + topicTestHelper(); + EventSource eventSourceMock = mock(EventSource.class); + NodeId nodeId = new NodeId("nodeIdValue1"); + nodeKey = new NodeKey(nodeId); + Map> localMap = getRoutedRpcRegistrations(); + NodeKey nodeKeyMock = mock(NodeKey.class); + doReturn(nodeKeyMock).when(eventSourceMock).getSourceNodeKey(); + BindingAwareBroker.RoutedRpcRegistration routedRpcRegistrationMock = (BindingAwareBroker.RoutedRpcRegistration) mock(BindingAwareBroker.RoutedRpcRegistration.class); + localMap.put(nodeKeyMock, routedRpcRegistrationMock); + eventSourceTopology.unRegister(eventSourceMock); + verify(routedRpcRegistrationMock, times(1)).close(); + } + + @Test + public void registerEventSourceTest() throws Exception { + topicTestHelper(); + Node nodeMock = mock(Node.class); + EventSource eventSourceMock = mock(EventSource.class); + NodeId nodeId = new NodeId("nodeIdValue1"); + nodeKey = new NodeKey(nodeId); + doReturn(nodeKey).when(nodeMock).getKey(); + doReturn(nodeKey).when(eventSourceMock).getSourceNodeKey(); + BindingAwareBroker.RoutedRpcRegistration routedRpcRegistrationMock = mock(BindingAwareBroker.RoutedRpcRegistration.class); + doReturn(routedRpcRegistrationMock).when(rpcProviderRegistryMock).addRoutedRpcImplementation(EventSourceService.class, eventSourceMock); + doNothing().when(routedRpcRegistrationMock).registerPath(eq(NodeContext.class), any(KeyedInstanceIdentifier.class)); + assertNotNull("Return value has not been created correctly.", eventSourceTopology.registerEventSource(eventSourceMock)); + } + + private Map getTopicListenerRegistrations() throws Exception{ + Field nesField = EventSourceTopology.class.getDeclaredField("topicListenerRegistrations"); + nesField.setAccessible(true); + return (Map) nesField.get(eventSourceTopology); + } + + private Map getRoutedRpcRegistrations() throws Exception{ + Field nesField = EventSourceTopology.class.getDeclaredField("routedRpcRegistrations"); + nesField.setAccessible(true); + return (Map) nesField.get(eventSourceTopology); + } + +} \ No newline at end of file diff --git a/opendaylight/md-sal/messagebus-impl/src/test/java/org/opendaylight/controller/messagebus/app/impl/NetconfEventSourceManagerTest.java b/opendaylight/md-sal/messagebus-impl/src/test/java/org/opendaylight/controller/messagebus/app/impl/NetconfEventSourceManagerTest.java index 61fa30f40e..1d6b825c9f 100644 --- a/opendaylight/md-sal/messagebus-impl/src/test/java/org/opendaylight/controller/messagebus/app/impl/NetconfEventSourceManagerTest.java +++ b/opendaylight/md-sal/messagebus-impl/src/test/java/org/opendaylight/controller/messagebus/app/impl/NetconfEventSourceManagerTest.java @@ -84,11 +84,11 @@ public class NetconfEventSourceManagerTest { netconfEventSourceManager = NetconfEventSourceManager.create(dataBrokerMock, - domNotificationPublishServiceMock, - domMountPointServiceMock, - mountPointServiceMock, - eventSourceRegistry, - namespaceToStreamList); + domNotificationPublishServiceMock, + domMountPointServiceMock, + mountPointServiceMock, + eventSourceRegistry, + namespaceToStreamList); } @Test @@ -125,12 +125,14 @@ public class NetconfEventSourceManagerTest { Map mapUpdate = new HashMap<>(); InstanceIdentifier instanceIdentifierMock = mock(InstanceIdentifier.class); Node dataObjectMock = mock(Node.class); + if(create){ mapCreate.put(instanceIdentifierMock, dataObjectMock); } if(update){ mapUpdate.put(instanceIdentifierMock, dataObjectMock); } + doReturn(mapCreate).when(asyncDataChangeEventMock).getCreatedData(); doReturn(mapUpdate).when(asyncDataChangeEventMock).getUpdatedData(); NetconfNode netconfNodeMock = mock(NetconfNode.class); @@ -171,4 +173,4 @@ public class NetconfEventSourceManagerTest { doReturn(esrMock).when(eventSourceRegistry).registerEventSource(any(EventSource.class)); } -} +} \ No newline at end of file diff --git a/opendaylight/md-sal/messagebus-impl/src/test/java/org/opendaylight/controller/messagebus/app/impl/NetconfEventSourceTest.java b/opendaylight/md-sal/messagebus-impl/src/test/java/org/opendaylight/controller/messagebus/app/impl/NetconfEventSourceTest.java index 58da9e3eb1..ed9025780a 100644 --- a/opendaylight/md-sal/messagebus-impl/src/test/java/org/opendaylight/controller/messagebus/app/impl/NetconfEventSourceTest.java +++ b/opendaylight/md-sal/messagebus-impl/src/test/java/org/opendaylight/controller/messagebus/app/impl/NetconfEventSourceTest.java @@ -7,22 +7,8 @@ */ package org.opendaylight.controller.messagebus.app.impl; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertNotNull; -import static org.mockito.Matchers.any; -import static org.mockito.Mockito.doReturn; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.times; -import static org.mockito.Mockito.verify; - -import java.lang.reflect.Field; -import java.net.URI; -import java.util.HashMap; -import java.util.HashSet; -import java.util.List; -import java.util.Map; -import java.util.Set; - +import com.google.common.base.Optional; +import com.google.common.util.concurrent.CheckedFuture; import org.junit.Before; import org.junit.Test; import org.opendaylight.controller.md.sal.binding.api.BindingService; @@ -53,21 +39,32 @@ import org.opendaylight.yangtools.yang.model.api.NotificationDefinition; import org.opendaylight.yangtools.yang.model.api.SchemaContext; import org.opendaylight.yangtools.yang.model.api.SchemaPath; -import com.google.common.base.Optional; -import com.google.common.util.concurrent.CheckedFuture; +import java.lang.reflect.Field; +import java.net.URI; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.mockito.Matchers.any; +import static org.mockito.Mockito.doReturn; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; public class NetconfEventSourceTest { NetconfEventSource netconfEventSource; DOMMountPoint domMountPointMock; JoinTopicInput joinTopicInputMock; - AsyncDataChangeEvent asyncDataChangeEventMock; - Node dataObjectMock; @Before public void setUp() throws Exception { Map streamMap = new HashMap<>(); - streamMap.put("string1", "string2"); + streamMap.put("uriStr1", "string2"); domMountPointMock = mock(DOMMountPoint.class); DOMNotificationPublishService domNotificationPublishServiceMock = mock(DOMNotificationPublishService.class); MountPoint mountPointMock = mock(MountPoint.class); @@ -80,9 +77,9 @@ public class NetconfEventSourceTest { doReturn(rpcConsumerRegistryMock).when(onlyOptionalMock).get(); doReturn(notificationsServiceMock).when(rpcConsumerRegistryMock).getRpcService(NotificationsService.class); org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.Node node - = mock(org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.Node.class); + = mock(org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.Node.class); org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.NodeId nodeId - = new org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.NodeId("NodeId1"); + = new org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.NodeId("NodeId1"); doReturn(nodeId).when(node).getNodeId(); netconfEventSource = new NetconfEventSource(node, streamMap, domMountPointMock, domNotificationPublishServiceMock, mountPointMock); } @@ -143,7 +140,7 @@ public class NetconfEventSourceTest { doReturn(topicId).when(joinTopicInputMock).getTopicId(); NotificationPattern notificationPatternMock = mock(NotificationPattern.class); doReturn(notificationPatternMock).when(joinTopicInputMock).getNotificationPattern(); - doReturn("regexString1").when(notificationPatternMock).getValue(); + doReturn("uriStr1").when(notificationPatternMock).getValue(); SchemaContext schemaContextMock = mock(SchemaContext.class); doReturn(schemaContextMock).when(domMountPointMock).getSchemaContext(); @@ -165,6 +162,13 @@ public class NetconfEventSourceTest { doReturn(domNotificationServiceMock).when(domNotificationServiceOptionalMock).get(); ListenerRegistration listenerRegistrationMock = mock(ListenerRegistration.class); doReturn(listenerRegistrationMock).when(domNotificationServiceMock).registerNotificationListener(any(NetconfEventSource.class), any(List.class)); + + Optional optionalMock = (Optional) mock(Optional.class); + doReturn(optionalMock).when(domMountPointMock).getService(DOMRpcService.class); + DOMRpcService domRpcServiceMock = mock(DOMRpcService.class); + doReturn(domRpcServiceMock).when(optionalMock).get(); + CheckedFuture checkedFutureMock = mock(CheckedFuture.class); + doReturn(checkedFutureMock).when(domRpcServiceMock).invokeRpc(any(SchemaPath.class), any(ContainerNode.class)); } //TODO: create Test for NetConfEventSource#onNotification @@ -175,4 +179,4 @@ public class NetconfEventSourceTest { return (Set) nesField.get(netconfEventSource); } -} +} \ No newline at end of file diff --git a/opendaylight/md-sal/messagebus-impl/src/test/java/org/opendaylight/controller/messagebus/app/impl/TopicDOMNotificationTest.java b/opendaylight/md-sal/messagebus-impl/src/test/java/org/opendaylight/controller/messagebus/app/impl/TopicDOMNotificationTest.java index 6dacb9738a..b3f6438cc4 100644 --- a/opendaylight/md-sal/messagebus-impl/src/test/java/org/opendaylight/controller/messagebus/app/impl/TopicDOMNotificationTest.java +++ b/opendaylight/md-sal/messagebus-impl/src/test/java/org/opendaylight/controller/messagebus/app/impl/TopicDOMNotificationTest.java @@ -57,4 +57,4 @@ public class TopicDOMNotificationTest { String bodyString = "TopicDOMNotification [body=" + containerNodeBodyMockToString + "]"; assertEquals("String has not been created correctly.", bodyString, topicDOMNotification.toString()); } -} +} \ No newline at end of file diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorRecoverySupport.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorRecoverySupport.java index 5dc8361cc4..57603a5058 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorRecoverySupport.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorRecoverySupport.java @@ -122,17 +122,28 @@ class RaftActorRecoverySupport { } private void onRecoveredApplyLogEntries(long toIndex) { + long lastUnappliedIndex = context.getLastApplied() + 1; + if(log.isDebugEnabled()) { - log.debug("{}: Received ApplyLogEntries for recovery, applying to state: {} to {}", - context.getId(), context.getLastApplied() + 1, toIndex); + log.debug("{}: Received apply journal entries for recovery, applying to state: {} to {}", + context.getId(), lastUnappliedIndex, toIndex); } - for (long i = context.getLastApplied() + 1; i <= toIndex; i++) { - batchRecoveredLogEntry(replicatedLog().get(i)); + long lastApplied = lastUnappliedIndex - 1; + for (long i = lastUnappliedIndex; i <= toIndex; i++) { + ReplicatedLogEntry logEntry = replicatedLog().get(i); + if(logEntry != null) { + lastApplied++; + batchRecoveredLogEntry(logEntry); + } else { + // Shouldn't happen but cover it anyway. + log.error("Log entry not found for index {}", i); + break; + } } - context.setLastApplied(toIndex); - context.setCommitIndex(toIndex); + context.setLastApplied(lastApplied); + context.setCommitIndex(lastApplied); } private void batchRecoveredLogEntry(ReplicatedLogEntry logEntry) { diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/SnapshotManager.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/SnapshotManager.java index 8121f75191..f4f936bf16 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/SnapshotManager.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/SnapshotManager.java @@ -11,6 +11,7 @@ package org.opendaylight.controller.cluster.raft; import akka.japi.Procedure; import akka.persistence.SnapshotSelectionCriteria; import com.google.protobuf.ByteString; +import java.util.List; import org.opendaylight.controller.cluster.DataPersistenceProvider; import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshot; import org.opendaylight.controller.cluster.raft.base.messages.SendInstallSnapshot; @@ -19,7 +20,6 @@ import org.slf4j.Logger; public class SnapshotManager implements SnapshotState { - private final SnapshotState IDLE = new Idle(); private final SnapshotState CAPTURING = new Capturing(); private final SnapshotState PERSISTING = new Persisting(); @@ -35,6 +35,7 @@ public class SnapshotManager implements SnapshotState { private SnapshotState currentState = IDLE; private CaptureSnapshot captureSnapshot; + private long lastSequenceNumber = -1; public SnapshotManager(RaftActorContext context, Logger logger) { this.context = context; @@ -184,19 +185,26 @@ public class SnapshotManager implements SnapshotState { long newReplicatedToAllTerm = replicatedToAllTermInfoReader.getTerm(); // send a CaptureSnapshot to self to make the expensive operation async. + + List unAppliedEntries = context.getReplicatedLog().getFrom(lastAppliedIndex + 1); + captureSnapshot = new CaptureSnapshot(lastLogEntry.getIndex(), lastLogEntry.getTerm(), lastAppliedIndex, lastAppliedTerm, - newReplicatedToAllIndex, newReplicatedToAllTerm, targetFollower!=null); + newReplicatedToAllIndex, newReplicatedToAllTerm, unAppliedEntries, targetFollower != null); SnapshotManager.this.currentState = CAPTURING; - if(targetFollower != null){ - LOG.info("{}: Initiating snapshot capture {}", persistenceId(), captureSnapshot); - } else { + if(captureSnapshot.isInstallSnapshotInitiated()) { LOG.info("{}: Initiating snapshot capture {} to install on {}", persistenceId(), captureSnapshot, targetFollower); + } else { + LOG.info("{}: Initiating snapshot capture {}", persistenceId(), captureSnapshot); } + lastSequenceNumber = context.getPersistenceProvider().getLastSequenceNumber(); + + LOG.debug("lastSequenceNumber prior to capture: {}", lastSequenceNumber); + context.getActor().tell(captureSnapshot, context.getActor()); return true; @@ -261,7 +269,7 @@ public class SnapshotManager implements SnapshotState { // when snapshot is saved async, SaveSnapshotSuccess is raised. Snapshot sn = Snapshot.create(snapshotBytes, - context.getReplicatedLog().getFrom(captureSnapshot.getLastAppliedIndex() + 1), + captureSnapshot.getUnAppliedEntries(), captureSnapshot.getLastIndex(), captureSnapshot.getLastTerm(), captureSnapshot.getLastAppliedIndex(), captureSnapshot.getLastAppliedTerm()); @@ -336,8 +344,9 @@ public class SnapshotManager implements SnapshotState { persistenceProvider.deleteSnapshots(new SnapshotSelectionCriteria( sequenceNumber - context.getConfigParams().getSnapshotBatchCount(), 43200000)); - persistenceProvider.deleteMessages(sequenceNumber); + persistenceProvider.deleteMessages(lastSequenceNumber); + lastSequenceNumber = -1; SnapshotManager.this.currentState = IDLE; } diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/base/messages/CaptureSnapshot.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/base/messages/CaptureSnapshot.java index daa8f7768a..7c182f04e4 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/base/messages/CaptureSnapshot.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/base/messages/CaptureSnapshot.java @@ -8,6 +8,10 @@ package org.opendaylight.controller.cluster.raft.base.messages; +import java.util.Collections; +import java.util.List; +import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry; + public class CaptureSnapshot { private final long lastAppliedIndex; private final long lastAppliedTerm; @@ -16,14 +20,17 @@ public class CaptureSnapshot { private final boolean installSnapshotInitiated; private final long replicatedToAllIndex; private final long replicatedToAllTerm; + private final List unAppliedEntries; - public CaptureSnapshot(long lastIndex, long lastTerm, - long lastAppliedIndex, long lastAppliedTerm, long replicatedToAllIndex, long replicatedToAllTerm) { - this(lastIndex, lastTerm, lastAppliedIndex, lastAppliedTerm, replicatedToAllIndex , replicatedToAllTerm, false); + public CaptureSnapshot(long lastIndex, long lastTerm, long lastAppliedIndex, long lastAppliedTerm, + long replicatedToAllIndex, long replicatedToAllTerm, List unAppliedEntries) { + this(lastIndex, lastTerm, lastAppliedIndex, lastAppliedTerm, replicatedToAllIndex, replicatedToAllTerm, + unAppliedEntries, false); } - public CaptureSnapshot(long lastIndex, long lastTerm,long lastAppliedIndex, - long lastAppliedTerm, long replicatedToAllIndex, long replicatedToAllTerm, boolean installSnapshotInitiated) { + public CaptureSnapshot(long lastIndex, long lastTerm, long lastAppliedIndex, + long lastAppliedTerm, long replicatedToAllIndex, long replicatedToAllTerm, + List unAppliedEntries, boolean installSnapshotInitiated) { this.lastIndex = lastIndex; this.lastTerm = lastTerm; this.lastAppliedIndex = lastAppliedIndex; @@ -31,6 +38,7 @@ public class CaptureSnapshot { this.installSnapshotInitiated = installSnapshotInitiated; this.replicatedToAllIndex = replicatedToAllIndex; this.replicatedToAllTerm = replicatedToAllTerm; + this.unAppliedEntries = unAppliedEntries != null ? unAppliedEntries : Collections.emptyList(); } public long getLastAppliedIndex() { @@ -61,6 +69,10 @@ public class CaptureSnapshot { return replicatedToAllTerm; } + public List getUnAppliedEntries() { + return unAppliedEntries; + } + @Override public String toString() { StringBuilder builder = new StringBuilder(); @@ -68,7 +80,9 @@ public class CaptureSnapshot { .append(lastAppliedTerm).append(", lastIndex=").append(lastIndex).append(", lastTerm=") .append(lastTerm).append(", installSnapshotInitiated=").append(installSnapshotInitiated) .append(", replicatedToAllIndex=").append(replicatedToAllIndex).append(", replicatedToAllTerm=") - .append(replicatedToAllTerm).append("]"); + .append(replicatedToAllTerm).append(", unAppliedEntries size=").append(unAppliedEntries.size()).append("]"); return builder.toString(); } + + } diff --git a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/AbstractRaftActorIntegrationTest.java b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/AbstractRaftActorIntegrationTest.java index 1289ed7f90..977cf0ef5e 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/AbstractRaftActorIntegrationTest.java +++ b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/AbstractRaftActorIntegrationTest.java @@ -7,7 +7,6 @@ */ package org.opendaylight.controller.cluster.raft; -import static org.junit.Assert.assertArrayEquals; import static org.junit.Assert.assertEquals; import akka.actor.ActorRef; import akka.actor.PoisonPill; @@ -20,6 +19,7 @@ import com.google.common.base.Optional; import com.google.common.base.Predicate; import com.google.common.base.Supplier; import com.google.common.collect.ImmutableMap; +import java.util.ArrayList; import java.util.Collections; import java.util.List; import java.util.Map; @@ -51,7 +51,6 @@ public abstract class AbstractRaftActorIntegrationTest extends AbstractActorTest private final TestActorRef collectorActor; private final Map, Boolean> dropMessages = new ConcurrentHashMap<>(); - private volatile byte[] snapshot; private TestRaftActor(String id, Map peerAddresses, ConfigParams config, TestActorRef collectorActor) { @@ -112,19 +111,13 @@ public abstract class AbstractRaftActorIntegrationTest extends AbstractActorTest @Override public void createSnapshot(ActorRef actorRef) { - if(snapshot != null) { - getSelf().tell(new CaptureSnapshotReply(snapshot), ActorRef.noSender()); + try { + actorRef.tell(new CaptureSnapshotReply(RaftActorTest.fromObject(getState()).toByteArray()), actorRef); + } catch (Exception e) { + e.printStackTrace(); } } - @Override - public void applyRecoverySnapshot(byte[] bytes) { - } - - void setSnapshot(byte[] snapshot) { - this.snapshot = snapshot; - } - public ActorRef collectorActor() { return collectorActor; } @@ -158,6 +151,8 @@ public abstract class AbstractRaftActorIntegrationTest extends AbstractActorTest protected long initialTerm = 5; protected long currentTerm; + protected List expSnapshotState = new ArrayList<>(); + @After public void tearDown() { InMemoryJournal.clear(); @@ -215,13 +210,20 @@ public abstract class AbstractRaftActorIntegrationTest extends AbstractActorTest }); } + @SuppressWarnings("unchecked") protected void verifySnapshot(String prefix, Snapshot snapshot, long lastAppliedTerm, - int lastAppliedIndex, long lastTerm, long lastIndex, byte[] data) { + int lastAppliedIndex, long lastTerm, long lastIndex) + throws Exception { assertEquals(prefix + " Snapshot getLastAppliedTerm", lastAppliedTerm, snapshot.getLastAppliedTerm()); assertEquals(prefix + " Snapshot getLastAppliedIndex", lastAppliedIndex, snapshot.getLastAppliedIndex()); assertEquals(prefix + " Snapshot getLastTerm", lastTerm, snapshot.getLastTerm()); assertEquals(prefix + " Snapshot getLastIndex", lastIndex, snapshot.getLastIndex()); - assertArrayEquals(prefix + " Snapshot getState", data, snapshot.getState()); + + List actualState = (List)MockRaftActor.toObject(snapshot.getState()); + assertEquals(prefix + " Snapshot getState size", expSnapshotState.size(), actualState.size()); + for(int i = 0; i < expSnapshotState.size(); i++) { + assertEquals(prefix + " Snapshot state " + i, expSnapshotState.get(i), actualState.get(i)); + } } protected void verifyPersistedJournal(String persistenceId, List expJournal) { diff --git a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/MockRaftActor.java b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/MockRaftActor.java index 53110b3583..586ca8cda0 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/MockRaftActor.java +++ b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/MockRaftActor.java @@ -154,10 +154,11 @@ public class MockRaftActor extends RaftActor implements RaftActorRecoveryCohort, return Props.create(new MockRaftActorCreator(peerAddresses, id, config, dataPersistenceProvider, roleChangeNotifier)); } - @Override protected void applyState(ActorRef clientActor, String identifier, Object data) { actorDelegate.applyState(clientActor, identifier, data); - LOG.info("{}: applyState called", persistenceId()); + LOG.info("{}: applyState called: {}", persistenceId(), data); + + state.add(data); } @Override @@ -235,7 +236,7 @@ public class MockRaftActor extends RaftActor implements RaftActorRecoveryCohort, return this.getId(); } - private Object toObject(byte[] bs) throws ClassNotFoundException, IOException { + public static Object toObject(byte[] bs) throws ClassNotFoundException, IOException { Object obj = null; ByteArrayInputStream bis = null; ObjectInputStream ois = null; diff --git a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/RaftActorSnapshotMessageSupportTest.java b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/RaftActorSnapshotMessageSupportTest.java index 2ced72c531..ae9c784a55 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/RaftActorSnapshotMessageSupportTest.java +++ b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/RaftActorSnapshotMessageSupportTest.java @@ -124,7 +124,7 @@ public class RaftActorSnapshotMessageSupportTest { @Test public void testOnCaptureSnapshot() throws Exception { - sendMessageToSupport(new CaptureSnapshot(3, 1, 2, 1, 2, 1)); + sendMessageToSupport(new CaptureSnapshot(3, 1, 2, 1, 2, 1, null)); ArgumentCaptor procedure = ArgumentCaptor.forClass(Procedure.class); verify(mockSnapshotManager).create(procedure.capture()); diff --git a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/RaftActorTest.java b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/RaftActorTest.java index 5062f8f6e0..82ebcd1fbd 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/RaftActorTest.java +++ b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/RaftActorTest.java @@ -60,10 +60,14 @@ import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply; import org.opendaylight.controller.cluster.raft.utils.InMemoryJournal; import org.opendaylight.controller.cluster.raft.utils.InMemorySnapshotStore; import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import scala.concurrent.duration.FiniteDuration; public class RaftActorTest extends AbstractActorTest { + static final Logger TEST_LOG = LoggerFactory.getLogger(RaftActorTest.class); + private TestActorFactory factory; @Before @@ -91,6 +95,8 @@ public class RaftActorTest extends AbstractActorTest { @Test public void testRaftActorRecoveryWithPersistenceEnabled() throws Exception { + TEST_LOG.info("testRaftActorRecoveryWithPersistenceEnabled starting"); + new JavaTestKit(getSystem()) {{ String persistenceId = factory.generateActorId("follower-"); @@ -101,9 +107,9 @@ public class RaftActorTest extends AbstractActorTest { // log entry. config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS)); + ImmutableMap peerAddresses = ImmutableMap.builder().put("member1", "address").build(); ActorRef followerActor = factory.createActor(MockRaftActor.props(persistenceId, - ImmutableMap.builder().put("member1", "address").build(), - Optional.of(config)), persistenceId); + peerAddresses, Optional.of(config)), persistenceId); watch(followerActor); @@ -156,8 +162,7 @@ public class RaftActorTest extends AbstractActorTest { //reinstate the actor TestActorRef ref = factory.createTestActor( - MockRaftActor.props(persistenceId, Collections.emptyMap(), - Optional.of(config))); + MockRaftActor.props(persistenceId, peerAddresses, Optional.of(config))); MockRaftActor mockRaftActor = ref.underlyingActor(); @@ -176,6 +181,8 @@ public class RaftActorTest extends AbstractActorTest { assertEquals("getRaftState", RaftState.Follower, mockRaftActor.getRaftState()); }}; + + TEST_LOG.info("testRaftActorRecoveryWithPersistenceEnabled ending"); } @Test @@ -275,7 +282,7 @@ public class RaftActorTest extends AbstractActorTest { doReturn(true).when(mockSupport).handleSnapshotMessage(same(applySnapshot)); mockRaftActor.handleCommand(applySnapshot); - CaptureSnapshot captureSnapshot = new CaptureSnapshot(1, 1, 1, 1, 0, 1); + CaptureSnapshot captureSnapshot = new CaptureSnapshot(1, 1, 1, 1, 0, 1, null); doReturn(true).when(mockSupport).handleSnapshotMessage(same(captureSnapshot)); mockRaftActor.handleCommand(captureSnapshot); @@ -863,7 +870,7 @@ public class RaftActorTest extends AbstractActorTest { }}; } - private ByteString fromObject(Object snapshot) throws Exception { + public static ByteString fromObject(Object snapshot) throws Exception { ByteArrayOutputStream b = null; ObjectOutputStream o = null; try { diff --git a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/RecoveryIntegrationTest.java b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/RecoveryIntegrationTest.java new file mode 100644 index 0000000000..a8f490e751 --- /dev/null +++ b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/RecoveryIntegrationTest.java @@ -0,0 +1,221 @@ +/* + * Copyright (c) 2015 Brocade Communications Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.controller.cluster.raft; + +import static org.junit.Assert.assertEquals; +import akka.persistence.SaveSnapshotSuccess; +import com.google.common.collect.ImmutableMap; +import java.util.Arrays; +import org.junit.Before; +import org.junit.Test; +import org.opendaylight.controller.cluster.raft.MockRaftActorContext.MockPayload; +import org.opendaylight.controller.cluster.raft.base.messages.ApplyJournalEntries; +import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshot; +import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshotReply; +import org.opendaylight.controller.cluster.raft.messages.AppendEntries; +import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor; + +/** + * Tests raft actor persistence recovery end-to-end using real RaftActors and behavior communication. + * + * @author Thomas Pantelis + */ +public class RecoveryIntegrationTest extends AbstractRaftActorIntegrationTest { + + private MockPayload payload0; + private MockPayload payload1; + + @Before + public void setup() { + follower1Actor = newTestRaftActor(follower1Id, ImmutableMap.of(leaderId, testActorPath(leaderId)), + newFollowerConfigParams()); + + peerAddresses = ImmutableMap.builder(). + put(follower1Id, follower1Actor.path().toString()).build(); + + leaderConfigParams = newLeaderConfigParams(); + leaderActor = newTestRaftActor(leaderId, peerAddresses, leaderConfigParams); + + follower1CollectorActor = follower1Actor.underlyingActor().collectorActor(); + leaderCollectorActor = leaderActor.underlyingActor().collectorActor(); + + leaderContext = leaderActor.underlyingActor().getRaftActorContext(); + } + + @Test + public void testStatePersistedBetweenSnapshotCaptureAndPersist() { + + send2InitialPayloads(); + + // Block these messages initially so we can control the sequence. + leaderActor.underlyingActor().startDropMessages(CaptureSnapshot.class); + leaderActor.underlyingActor().startDropMessages(CaptureSnapshotReply.class); + follower1Actor.underlyingActor().startDropMessages(AppendEntries.class); + + MockPayload payload2 = sendPayloadData(leaderActor, "two"); + + // This should trigger a snapshot. + MockPayload payload3 = sendPayloadData(leaderActor, "three"); + + MessageCollectorActor.expectMatching(follower1CollectorActor, AppendEntries.class, 3); + + CaptureSnapshot captureSnapshot = MessageCollectorActor.expectFirstMatching( + leaderCollectorActor, CaptureSnapshot.class); + + // First, deliver the CaptureSnapshot to the leader. + leaderActor.underlyingActor().stopDropMessages(CaptureSnapshot.class); + leaderActor.tell(captureSnapshot, leaderActor); + + // Send another payload. + MockPayload payload4 = sendPayloadData(leaderActor, "four"); + + // Now deliver the AppendEntries to the follower + follower1Actor.underlyingActor().stopDropMessages(AppendEntries.class); + + MessageCollectorActor.expectMatching(leaderCollectorActor, ApplyJournalEntries.class, 3); + + // Now deliver the CaptureSnapshotReply to the leader. + CaptureSnapshotReply captureSnapshotReply = MessageCollectorActor.expectFirstMatching( + leaderCollectorActor, CaptureSnapshotReply.class); + leaderActor.underlyingActor().stopDropMessages(CaptureSnapshotReply.class); + leaderActor.tell(captureSnapshotReply, leaderActor); + + // Wait for snapshot complete. + MessageCollectorActor.expectFirstMatching(leaderCollectorActor, SaveSnapshotSuccess.class); + + reinstateLeaderActor(); + + assertEquals("Leader snapshot term", currentTerm, leaderContext.getReplicatedLog().getSnapshotTerm()); + assertEquals("Leader snapshot index", 1, leaderContext.getReplicatedLog().getSnapshotIndex()); + assertEquals("Leader journal log size", 3, leaderContext.getReplicatedLog().size()); + assertEquals("Leader journal last index", 4, leaderContext.getReplicatedLog().lastIndex()); + assertEquals("Leader commit index", 4, leaderContext.getCommitIndex()); + assertEquals("Leader last applied", 4, leaderContext.getLastApplied()); + + assertEquals("Leader state", Arrays.asList(payload0, payload1, payload2, payload3, payload4), + leaderActor.underlyingActor().getState()); + } + + @Test + public void testStatePersistedBetweenInitiateSnapshotAndCapture() { + + send2InitialPayloads(); + + // Block these messages initially so we can control the sequence. + leaderActor.underlyingActor().startDropMessages(CaptureSnapshot.class); + follower1Actor.underlyingActor().startDropMessages(AppendEntries.class); + + MockPayload payload2 = sendPayloadData(leaderActor, "two"); + + // This should trigger a snapshot. + MockPayload payload3 = sendPayloadData(leaderActor, "three"); + + // Send another payload. + MockPayload payload4 = sendPayloadData(leaderActor, "four"); + + MessageCollectorActor.expectMatching(follower1CollectorActor, AppendEntries.class, 3); + + CaptureSnapshot captureSnapshot = MessageCollectorActor.expectFirstMatching( + leaderCollectorActor, CaptureSnapshot.class); + + // First, deliver the AppendEntries to the follower + follower1Actor.underlyingActor().stopDropMessages(AppendEntries.class); + + MessageCollectorActor.expectMatching(leaderCollectorActor, ApplyJournalEntries.class, 3); + + // Now deliver the CaptureSnapshot to the leader. + leaderActor.underlyingActor().stopDropMessages(CaptureSnapshot.class); + leaderActor.tell(captureSnapshot, leaderActor); + + // Wait for snapshot complete. + MessageCollectorActor.expectFirstMatching(leaderCollectorActor, SaveSnapshotSuccess.class); + + reinstateLeaderActor(); + + assertEquals("Leader snapshot term", currentTerm, leaderContext.getReplicatedLog().getSnapshotTerm()); + assertEquals("Leader snapshot index", 1, leaderContext.getReplicatedLog().getSnapshotIndex()); + assertEquals("Leader journal log size", 3, leaderContext.getReplicatedLog().size()); + assertEquals("Leader journal last index", 4, leaderContext.getReplicatedLog().lastIndex()); + assertEquals("Leader commit index", 4, leaderContext.getCommitIndex()); + assertEquals("Leader last applied", 4, leaderContext.getLastApplied()); + + // payloads 2, 3, and 4 were applied after the snapshot was initiated and before it was captured so + // were included in the snapshot. They were also included as unapplied entries in the snapshot as + // they weren't yet applied to the state at the time the snapshot was initiated. They were applied to the + // state on recovery by the ApplyJournalEntries messages which remained in the persisted log. + // This is a side effect of trimming the persisted log to the sequence number captured at the time + // the snapshot was initiated. + assertEquals("Leader state", Arrays.asList(payload0, payload1, payload2, payload3, payload4, payload2, + payload3, payload4), leaderActor.underlyingActor().getState()); + } + + @Test + public void testApplyJournalEntriesPersistedAfterSnapshotPersisted() { + + send2InitialPayloads(); + + // Block these messages initially so we can control the sequence. + follower1Actor.underlyingActor().startDropMessages(AppendEntries.class); + + MockPayload payload2 = sendPayloadData(leaderActor, "two"); + + // This should trigger a snapshot. + MockPayload payload3 = sendPayloadData(leaderActor, "three"); + + // Send another payload. + MockPayload payload4 = sendPayloadData(leaderActor, "four"); + + MessageCollectorActor.expectMatching(follower1CollectorActor, AppendEntries.class, 3); + + // Wait for snapshot complete. + MessageCollectorActor.expectFirstMatching(leaderCollectorActor, SaveSnapshotSuccess.class); + + // Now deliver the AppendEntries to the follower + follower1Actor.underlyingActor().stopDropMessages(AppendEntries.class); + + MessageCollectorActor.expectMatching(leaderCollectorActor, ApplyJournalEntries.class, 3); + + reinstateLeaderActor(); + + assertEquals("Leader snapshot term", currentTerm, leaderContext.getReplicatedLog().getSnapshotTerm()); + assertEquals("Leader snapshot index", 1, leaderContext.getReplicatedLog().getSnapshotIndex()); + assertEquals("Leader journal log size", 3, leaderContext.getReplicatedLog().size()); + assertEquals("Leader journal last index", 4, leaderContext.getReplicatedLog().lastIndex()); + assertEquals("Leader commit index", 4, leaderContext.getCommitIndex()); + assertEquals("Leader last applied", 4, leaderContext.getLastApplied()); + + assertEquals("Leader state", Arrays.asList(payload0, payload1, payload2, payload3, payload4), + leaderActor.underlyingActor().getState()); + } + + private void reinstateLeaderActor() { + killActor(leaderActor); + + leaderActor = newTestRaftActor(leaderId, peerAddresses, leaderConfigParams); + + leaderActor.underlyingActor().waitForRecoveryComplete(); + + leaderContext = leaderActor.underlyingActor().getRaftActorContext(); + } + + private void send2InitialPayloads() { + waitUntilLeader(leaderActor); + currentTerm = leaderContext.getTermInformation().getCurrentTerm(); + + payload0 = sendPayloadData(leaderActor, "zero"); + payload1 = sendPayloadData(leaderActor, "one"); + + // Verify the leader applies the states. + MessageCollectorActor.expectMatching(leaderCollectorActor, ApplyJournalEntries.class, 2); + + assertEquals("Leader last applied", 1, leaderContext.getLastApplied()); + + MessageCollectorActor.clearMessages(leaderCollectorActor); + MessageCollectorActor.clearMessages(follower1CollectorActor); + } +} diff --git a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/ReplicationAndSnapshotsIntegrationTest.java b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/ReplicationAndSnapshotsIntegrationTest.java index bd670fd581..c74705d13f 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/ReplicationAndSnapshotsIntegrationTest.java +++ b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/ReplicationAndSnapshotsIntegrationTest.java @@ -14,7 +14,7 @@ import java.util.List; import org.junit.Test; import org.opendaylight.controller.cluster.raft.MockRaftActorContext.MockPayload; import org.opendaylight.controller.cluster.raft.RaftActor.UpdateElectionTerm; -import org.opendaylight.controller.cluster.raft.base.messages.ApplyLogEntries; +import org.opendaylight.controller.cluster.raft.base.messages.ApplyJournalEntries; import org.opendaylight.controller.cluster.raft.base.messages.ApplyState; import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshot; import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshotReply; @@ -36,13 +36,14 @@ public class ReplicationAndSnapshotsIntegrationTest extends AbstractRaftActorInt private MockPayload recoveredPayload0; private MockPayload recoveredPayload1; private MockPayload recoveredPayload2; + private MockPayload payload3; private MockPayload payload4; private MockPayload payload5; private MockPayload payload6; private MockPayload payload7; @Test - public void runTest() { + public void runTest() throws Exception { testLog.info("testReplicationAndSnapshots starting"); // Setup the persistent journal for the leader. We'll start up with 3 journal log entries (one less @@ -55,7 +56,7 @@ public class ReplicationAndSnapshotsIntegrationTest extends AbstractRaftActorInt InMemoryJournal.addEntry(leaderId, seqId++, new ReplicatedLogImplEntry(1, initialTerm, recoveredPayload1)); recoveredPayload2 = new MockPayload("two"); InMemoryJournal.addEntry(leaderId, seqId++, new ReplicatedLogImplEntry(2, initialTerm, recoveredPayload2)); - InMemoryJournal.addEntry(leaderId, seqId++, new ApplyLogEntries(2)); + InMemoryJournal.addEntry(leaderId, seqId++, new ApplyJournalEntries(2)); origLeaderJournal = InMemoryJournal.get(leaderId, ReplicatedLogImplEntry.class); @@ -157,19 +158,21 @@ public class ReplicationAndSnapshotsIntegrationTest extends AbstractRaftActorInt * 4 and we already have 3 entries in the journal log, this should initiate a snapshot. In this * scenario, the follower consensus and application of state is delayed until after the snapshot * completes. + * @throws Exception */ - private void testFirstSnapshot() { + private void testFirstSnapshot() throws Exception { testLog.info("testFirstSnapshot starting"); - byte[] snapshot = new byte[] {1,2,3,4}; - leaderActor.underlyingActor().setSnapshot(snapshot); + expSnapshotState.add(recoveredPayload0); + expSnapshotState.add(recoveredPayload1); + expSnapshotState.add(recoveredPayload2); // Delay the consensus by temporarily dropping the AppendEntries to both followers. follower1Actor.underlyingActor().startDropMessages(AppendEntries.class); follower2Actor.underlyingActor().startDropMessages(AppendEntries.class); // Send the payload. - MockPayload payload3 = sendPayloadData(leaderActor, "three"); + payload3 = sendPayloadData(leaderActor, "three"); // Wait for snapshot complete. MessageCollectorActor.expectFirstMatching(leaderCollectorActor, SaveSnapshotSuccess.class); @@ -185,7 +188,7 @@ public class ReplicationAndSnapshotsIntegrationTest extends AbstractRaftActorInt // the last applied log entry (2) even though the leader hasn't yet advanced its cached snapshot index. List persistedSnapshots = InMemorySnapshotStore.getSnapshots(leaderId, Snapshot.class); assertEquals("Persisted snapshots size", 1, persistedSnapshots.size()); - verifySnapshot("Persisted", persistedSnapshots.get(0), initialTerm, 2, currentTerm, 3, snapshot); + verifySnapshot("Persisted", persistedSnapshots.get(0), initialTerm, 2, currentTerm, 3); List unAppliedEntry = persistedSnapshots.get(0).getUnAppliedEntries(); assertEquals("Persisted Snapshot getUnAppliedEntries size", 1, unAppliedEntry.size()); verifyReplicatedLogEntry(unAppliedEntry.get(0), currentTerm, 3, payload3); @@ -286,12 +289,15 @@ public class ReplicationAndSnapshotsIntegrationTest extends AbstractRaftActorInt /** * Send one more payload to trigger another snapshot. In this scenario, we delay the snapshot until * consensus occurs and the leader applies the state. + * @throws Exception */ - private void testSecondSnapshot() { + private void testSecondSnapshot() throws Exception { testLog.info("testSecondSnapshot starting"); - byte[] snapshot = new byte[] {5,6,7,8}; - leaderActor.underlyingActor().setSnapshot(snapshot); + expSnapshotState.add(payload3); + expSnapshotState.add(payload4); + expSnapshotState.add(payload5); + expSnapshotState.add(payload6); // Delay the CaptureSnapshot message to the leader actor. leaderActor.underlyingActor().startDropMessages(CaptureSnapshot.class); @@ -341,11 +347,14 @@ public class ReplicationAndSnapshotsIntegrationTest extends AbstractRaftActorInt assertEquals("Leader journal last index", 7, leaderContext.getReplicatedLog().lastIndex()); assertEquals("Leader commit index", 7, leaderContext.getCommitIndex()); - // Verify the persisted snapshot. This should reflect the advanced snapshot index as the last applied - // log entry (6). + expSnapshotState.add(payload7); + + // Verify the persisted snapshot. This should reflect the snapshot index as the last applied + // log entry (7) and shouldn't contain any unapplied entries as we capture persisted the snapshot data + // when the snapshot is created (ie when the CaptureSnapshot is processed). List persistedSnapshots = InMemorySnapshotStore.getSnapshots(leaderId, Snapshot.class); assertEquals("Persisted snapshots size", 1, persistedSnapshots.size()); - verifySnapshot("Persisted", persistedSnapshots.get(0), currentTerm, 6, currentTerm, 7, snapshot); + verifySnapshot("Persisted", persistedSnapshots.get(0), currentTerm, 6, currentTerm, 7); List unAppliedEntry = persistedSnapshots.get(0).getUnAppliedEntries(); assertEquals("Persisted Snapshot getUnAppliedEntries size", 1, unAppliedEntry.size()); verifyReplicatedLogEntry(unAppliedEntry.get(0), currentTerm, 7, payload7); @@ -404,6 +413,8 @@ public class ReplicationAndSnapshotsIntegrationTest extends AbstractRaftActorInt leaderActor.underlyingActor().waitForRecoveryComplete(); + leaderContext = leaderActor.underlyingActor().getRaftActorContext(); + assertEquals("Leader snapshot term", currentTerm, leaderContext.getReplicatedLog().getSnapshotTerm()); assertEquals("Leader snapshot index", 6, leaderContext.getReplicatedLog().getSnapshotIndex()); assertEquals("Leader journal log size", 1, leaderContext.getReplicatedLog().size()); diff --git a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/ReplicationAndSnapshotsWithLaggingFollowerIntegrationTest.java b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/ReplicationAndSnapshotsWithLaggingFollowerIntegrationTest.java index d4a9f7701b..ff9b8ce630 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/ReplicationAndSnapshotsWithLaggingFollowerIntegrationTest.java +++ b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/ReplicationAndSnapshotsWithLaggingFollowerIntegrationTest.java @@ -7,7 +7,6 @@ */ package org.opendaylight.controller.cluster.raft; -import static org.junit.Assert.assertArrayEquals; import static org.junit.Assert.assertEquals; import akka.persistence.SaveSnapshotSuccess; import com.google.common.collect.ImmutableMap; @@ -185,6 +184,10 @@ public class ReplicationAndSnapshotsWithLaggingFollowerIntegrationTest extends A MessageCollectorActor.clearMessages(follower1CollectorActor); MessageCollectorActor.clearMessages(follower2CollectorActor); + expSnapshotState.add(payload0); + expSnapshotState.add(payload1); + expSnapshotState.add(payload2); + testLog.info("testInitialReplications complete"); } @@ -198,8 +201,6 @@ public class ReplicationAndSnapshotsWithLaggingFollowerIntegrationTest extends A testLog.info("testSubsequentReplicationsAndSnapshots starting: sending first payload, replicatedToAllIndex: {}", leader.getReplicatedToAllIndex()); - leaderActor.underlyingActor().setSnapshot(new byte[] {2}); - follower2Actor.underlyingActor().startDropMessages(AppendEntries.class); // Send the first payload - this should cause the first snapshot. @@ -207,8 +208,7 @@ public class ReplicationAndSnapshotsWithLaggingFollowerIntegrationTest extends A MessageCollectorActor.expectFirstMatching(leaderCollectorActor, SaveSnapshotSuccess.class); - byte[] snapshot = new byte[] {6}; - leaderActor.underlyingActor().setSnapshot(snapshot); + expSnapshotState.add(payload3); testLog.info("testSubsequentReplicationsAndSnapshots: sending 4 more payloads"); @@ -273,7 +273,7 @@ public class ReplicationAndSnapshotsWithLaggingFollowerIntegrationTest extends A // Verify the leader's persisted snapshot. List persistedSnapshots = InMemorySnapshotStore.getSnapshots(leaderId, Snapshot.class); assertEquals("Persisted snapshots size", 1, persistedSnapshots.size()); - verifySnapshot("Persisted", persistedSnapshots.get(0), currentTerm, 3, currentTerm, 7, snapshot); + verifySnapshot("Persisted", persistedSnapshots.get(0), currentTerm, 3, currentTerm, 7); List unAppliedEntry = persistedSnapshots.get(0).getUnAppliedEntries(); assertEquals("Persisted Snapshot getUnAppliedEntries size", 4, unAppliedEntry.size()); verifyReplicatedLogEntry(unAppliedEntry.get(0), currentTerm, 4, payload4); @@ -313,6 +313,11 @@ public class ReplicationAndSnapshotsWithLaggingFollowerIntegrationTest extends A MessageCollectorActor.clearMessages(follower1CollectorActor); MessageCollectorActor.clearMessages(follower2CollectorActor); + expSnapshotState.add(payload4); + expSnapshotState.add(payload5); + expSnapshotState.add(payload6); + expSnapshotState.add(payload7); + testLog.info("testSubsequentReplicationsAndSnapshots complete"); } @@ -327,8 +332,6 @@ public class ReplicationAndSnapshotsWithLaggingFollowerIntegrationTest extends A leader.getReplicatedToAllIndex()); leaderActor.underlyingActor().setMockTotalMemory(1000); - byte[] snapshot = new byte[] {6}; - leaderActor.underlyingActor().setSnapshot(snapshot); // We'll expect a ReplicatedLogImplEntry message and an ApplyJournalEntries message added to the journal. InMemoryJournal.addWriteMessagesCompleteLatch(leaderId, 2); @@ -351,6 +354,8 @@ public class ReplicationAndSnapshotsWithLaggingFollowerIntegrationTest extends A CaptureSnapshot captureSnapshot = MessageCollectorActor.getFirstMatching(leaderCollectorActor, CaptureSnapshot.class); Assert.assertNull("Leader received unexpected CaptureSnapshot", captureSnapshot); + expSnapshotState.add(payload8); + // Send another payload with a large enough relative size in combination with the last payload // that exceeds the memory threshold (70% * 1000 = 700) - this should do a snapshot. payload9 = sendPayloadData(leaderActor, "nine", 201); @@ -383,7 +388,7 @@ public class ReplicationAndSnapshotsWithLaggingFollowerIntegrationTest extends A // Verify the leader's persisted snapshot. List persistedSnapshots = InMemorySnapshotStore.getSnapshots(leaderId, Snapshot.class); assertEquals("Persisted snapshots size", 1, persistedSnapshots.size()); - verifySnapshot("Persisted", persistedSnapshots.get(0), currentTerm, 8, currentTerm, 9, snapshot); + verifySnapshot("Persisted", persistedSnapshots.get(0), currentTerm, 8, currentTerm, 9); List unAppliedEntry = persistedSnapshots.get(0).getUnAppliedEntries(); assertEquals("Persisted Snapshot getUnAppliedEntries size", 1, unAppliedEntry.size()); verifyReplicatedLogEntry(unAppliedEntry.get(0), currentTerm, 9, payload9); @@ -451,6 +456,8 @@ public class ReplicationAndSnapshotsWithLaggingFollowerIntegrationTest extends A MessageCollectorActor.clearMessages(leaderCollectorActor); MessageCollectorActor.clearMessages(follower1CollectorActor); MessageCollectorActor.clearMessages(follower2CollectorActor); + + expSnapshotState.add(payload10); } /** @@ -467,8 +474,7 @@ public class ReplicationAndSnapshotsWithLaggingFollowerIntegrationTest extends A InstallSnapshot installSnapshot; InstallSnapshotReply installSnapshotReply; - byte[] snapshot = new byte[] {10}; - leaderActor.underlyingActor().setSnapshot(snapshot); + expSnapshotState.add(payload9); // Now stop dropping AppendEntries in follower 2. follower2Actor.underlyingActor().stopDropMessages(AppendEntries.class); @@ -480,7 +486,7 @@ public class ReplicationAndSnapshotsWithLaggingFollowerIntegrationTest extends A assertEquals("InstallSnapshot getTotalChunks", 1, installSnapshot.getTotalChunks()); assertEquals("InstallSnapshot getLastIncludedTerm", currentTerm, installSnapshot.getLastIncludedTerm()); assertEquals("InstallSnapshot getLastIncludedIndex", 8, installSnapshot.getLastIncludedIndex()); - assertArrayEquals("InstallSnapshot getData", snapshot, installSnapshot.getData().toByteArray()); + //assertArrayEquals("InstallSnapshot getData", snapshot, installSnapshot.getData().toByteArray()); installSnapshotReply = MessageCollectorActor.expectFirstMatching(leaderCollectorActor, InstallSnapshotReply.class); assertEquals("InstallSnapshotReply getTerm", currentTerm, installSnapshotReply.getTerm()); @@ -490,7 +496,7 @@ public class ReplicationAndSnapshotsWithLaggingFollowerIntegrationTest extends A // Verify follower 2 applies the snapshot. applySnapshot = MessageCollectorActor.expectFirstMatching(follower2CollectorActor, ApplySnapshot.class); - verifySnapshot("Follower 2", applySnapshot.getSnapshot(), currentTerm, 8, currentTerm, 8, snapshot); + verifySnapshot("Follower 2", applySnapshot.getSnapshot(), currentTerm, 8, currentTerm, 8); assertEquals("Persisted Snapshot getUnAppliedEntries size", 0, applySnapshot.getSnapshot().getUnAppliedEntries().size()); // Verify follower 2 only applies the second log entry (9) as the first one (8) was in the snapshot. @@ -523,7 +529,7 @@ public class ReplicationAndSnapshotsWithLaggingFollowerIntegrationTest extends A persistedSnapshots = InMemorySnapshotStore.getSnapshots(leaderId, Snapshot.class); Assert.assertTrue("Expected at least 1 persisted snapshots", persistedSnapshots.size() > 0); Snapshot persistedSnapshot = persistedSnapshots.get(persistedSnapshots.size() - 1); - verifySnapshot("Persisted", persistedSnapshot, currentTerm, 9, currentTerm, 9, snapshot); + verifySnapshot("Persisted", persistedSnapshot, currentTerm, 9, currentTerm, 9); unAppliedEntry = persistedSnapshot.getUnAppliedEntries(); assertEquals("Persisted Snapshot getUnAppliedEntries size", 0, unAppliedEntry.size()); @@ -535,16 +541,14 @@ public class ReplicationAndSnapshotsWithLaggingFollowerIntegrationTest extends A /** * Do another round of payloads and snapshot to verify replicatedToAllIndex gets back on track and * snapshots works as expected after doing a follower snapshot. In this step we don't lag a follower. + * @throws Exception */ - private void testFinalReplicationsAndSnapshot() { + private void testFinalReplicationsAndSnapshot() throws Exception { List applyStates; ApplyState applyState; testLog.info("testFinalReplicationsAndSnapshot starting: replicatedToAllIndex: {}", leader.getReplicatedToAllIndex()); - byte[] snapshot = new byte[] {14}; - leaderActor.underlyingActor().setSnapshot(snapshot); - // Send another payload - a snapshot should occur. payload11 = sendPayloadData(leaderActor, "eleven"); @@ -557,7 +561,7 @@ public class ReplicationAndSnapshotsWithLaggingFollowerIntegrationTest extends A // Verify the leader's last persisted snapshot (previous ones may not be purged yet). List persistedSnapshots = InMemorySnapshotStore.getSnapshots(leaderId, Snapshot.class); Snapshot persistedSnapshot = persistedSnapshots.get(persistedSnapshots.size() - 1); - verifySnapshot("Persisted", persistedSnapshot, currentTerm, 10, currentTerm, 11, snapshot); + verifySnapshot("Persisted", persistedSnapshot, currentTerm, 10, currentTerm, 11); List unAppliedEntry = persistedSnapshot.getUnAppliedEntries(); assertEquals("Persisted Snapshot getUnAppliedEntries size", 1, unAppliedEntry.size()); verifyReplicatedLogEntry(unAppliedEntry.get(0), currentTerm, 11, payload11); diff --git a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/SnapshotManagerTest.java b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/SnapshotManagerTest.java index 5a0d5aed74..8ab762f786 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/SnapshotManagerTest.java +++ b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/SnapshotManagerTest.java @@ -69,6 +69,7 @@ public class SnapshotManagerTest extends AbstractActorTest { doReturn(10L).when(mockConfigParams).getSnapshotBatchCount(); doReturn(mockReplicatedLog).when(mockRaftActorContext).getReplicatedLog(); doReturn("123").when(mockRaftActorContext).getId(); + doReturn(mockDataPersistenceProvider).when(mockRaftActorContext).getPersistenceProvider(); doReturn("123").when(mockRaftActorBehavior).getLeaderId(); ElectionTerm mockElectionTerm = mock(ElectionTerm.class); @@ -384,6 +385,8 @@ public class SnapshotManagerTest extends AbstractActorTest { @Test public void testCommit(){ + doReturn(50L).when(mockDataPersistenceProvider).getLastSequenceNumber(); + // when replicatedToAllIndex = -1 snapshotManager.captureToInstall(new MockRaftActorContext.MockReplicatedLogEntry(6, 9, new MockRaftActorContext.MockPayload()), -1, "follower-1"); @@ -397,7 +400,7 @@ public class SnapshotManagerTest extends AbstractActorTest { verify(mockReplicatedLog).snapshotCommit(); - verify(mockDataPersistenceProvider).deleteMessages(100L); + verify(mockDataPersistenceProvider).deleteMessages(50L); ArgumentCaptor criteriaCaptor = ArgumentCaptor.forClass(SnapshotSelectionCriteria.class); @@ -438,6 +441,8 @@ public class SnapshotManagerTest extends AbstractActorTest { @Test public void testCallingCommitMultipleTimesCausesNoHarm(){ + doReturn(50L).when(mockDataPersistenceProvider).getLastSequenceNumber(); + // when replicatedToAllIndex = -1 snapshotManager.captureToInstall(new MockRaftActorContext.MockReplicatedLogEntry(6, 9, new MockRaftActorContext.MockPayload()), -1, "follower-1"); @@ -453,7 +458,7 @@ public class SnapshotManagerTest extends AbstractActorTest { verify(mockReplicatedLog, times(1)).snapshotCommit(); - verify(mockDataPersistenceProvider, times(1)).deleteMessages(100L); + verify(mockDataPersistenceProvider, times(1)).deleteMessages(50L); verify(mockDataPersistenceProvider, times(1)).deleteSnapshots(any(SnapshotSelectionCriteria.class)); } diff --git a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/utils/InMemoryJournal.java b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/utils/InMemoryJournal.java index 0737d75a7f..d482e28401 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/utils/InMemoryJournal.java +++ b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/utils/InMemoryJournal.java @@ -216,6 +216,7 @@ public class InMemoryJournal extends AsyncWriteJournal { @Override public Future doAsyncDeleteMessagesTo(String persistenceId, long toSequenceNr, boolean permanent) { + LOG.trace("doAsyncDeleteMessagesTo: {}", toSequenceNr); Map journal = journals.get(persistenceId); if(journal != null) { synchronized (journal) { diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionProxy.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionProxy.java index a0987cd5d6..71799c92d4 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionProxy.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionProxy.java @@ -40,6 +40,7 @@ import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadWriteTransactio import org.opendaylight.yangtools.util.concurrent.MappingCheckedFuture; import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode; +import org.opendaylight.yangtools.yang.data.api.schema.tree.DataValidationFailedException; import org.opendaylight.yangtools.yang.model.api.SchemaContext; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -219,7 +220,7 @@ public class TransactionProxy extends AbstractDOMStoreTransaction>> nodes; - private final InMemoryDOMDataStore dataStore; - - NormalizedNodeAggregator(YangInstanceIdentifier rootIdentifier, List>> nodes, - SchemaContext schemaContext){ + private final DataTree dataTree; + private NormalizedNodeAggregator(YangInstanceIdentifier rootIdentifier, List>> nodes, + SchemaContext schemaContext) { this.rootIdentifier = rootIdentifier; this.nodes = nodes; - this.dataStore = new InMemoryDOMDataStore("aggregator", executorService); - this.dataStore.onGlobalContextUpdated(schemaContext); + this.dataTree = InMemoryDataTreeFactory.getInstance().create(); + this.dataTree.setSchemaContext(schemaContext); } /** @@ -46,44 +37,35 @@ public class NormalizedNodeAggregator { * @param nodes * @param schemaContext * @return - * @throws ExecutionException - * @throws InterruptedException + * @throws DataValidationFailedException */ public static Optional> aggregate(YangInstanceIdentifier rootIdentifier, List>> nodes, - SchemaContext schemaContext) - throws ExecutionException, InterruptedException { + SchemaContext schemaContext) throws DataValidationFailedException { return new NormalizedNodeAggregator(rootIdentifier, nodes, schemaContext).aggregate(); } - private Optional> aggregate() throws ExecutionException, InterruptedException { + private Optional> aggregate() throws DataValidationFailedException { return combine().getRootNode(); } - private NormalizedNodeAggregator combine() throws InterruptedException, ExecutionException { - DOMStoreWriteTransaction domStoreWriteTransaction = dataStore.newWriteOnlyTransaction(); + private NormalizedNodeAggregator combine() throws DataValidationFailedException { + DataTreeModification mod = dataTree.takeSnapshot().newModification(); - for(Optional> node : nodes) { - if(node.isPresent()) { - domStoreWriteTransaction.merge(rootIdentifier, node.get()); + for (Optional> node : nodes) { + if (node.isPresent()) { + mod.merge(rootIdentifier, node.get()); } } - DOMStoreThreePhaseCommitCohort ready = domStoreWriteTransaction.ready(); - ready.canCommit().get(); - ready.preCommit().get(); - ready.commit().get(); + + dataTree.validate(mod); + final DataTreeCandidate candidate = dataTree.prepare(mod); + dataTree.commit(candidate); return this; } - private Optional> getRootNode() throws InterruptedException, ExecutionException { - DOMStoreReadTransaction readTransaction = dataStore.newReadOnlyTransaction(); - - CheckedFuture>, ReadFailedException> read = - readTransaction.read(rootIdentifier); - - return read.get(); + private Optional> getRootNode() { + return dataTree.takeSnapshot().readNode(rootIdentifier); } - - } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/utils/NormalizedNodeAggregatorTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/utils/NormalizedNodeAggregatorTest.java index 40d3704d2c..8c8631089c 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/utils/NormalizedNodeAggregatorTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/utils/NormalizedNodeAggregatorTest.java @@ -29,13 +29,14 @@ import org.opendaylight.yangtools.yang.common.QName; import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode; import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode; +import org.opendaylight.yangtools.yang.data.api.schema.tree.DataValidationFailedException; import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes; import org.opendaylight.yangtools.yang.model.api.SchemaContext; public class NormalizedNodeAggregatorTest { @Test - public void testAggregate() throws InterruptedException, ExecutionException, ReadFailedException { + public void testAggregate() throws InterruptedException, ExecutionException, ReadFailedException, DataValidationFailedException { SchemaContext schemaContext = SchemaContextHelper.full(); NormalizedNode expectedNode1 = ImmutableNodes.containerNode(TestModel.TEST_QNAME); NormalizedNode expectedNode2 = ImmutableNodes.containerNode(CarsModel.CARS_QNAME);