Merge "Split out sal-akka-raft example"
authorMoiz Raja <moraja@cisco.com>
Wed, 15 Apr 2015 18:29:20 +0000 (18:29 +0000)
committerGerrit Code Review <gerrit@opendaylight.org>
Wed, 15 Apr 2015 18:29:20 +0000 (18:29 +0000)
23 files changed:
opendaylight/md-sal/messagebus-impl/src/test/java/org/opendaylight/controller/config/yang/messagebus/app/impl/MessageBusAppImplModuleFactoryTest.java
opendaylight/md-sal/messagebus-impl/src/test/java/org/opendaylight/controller/config/yang/messagebus/app/impl/MessageBusAppImplModuleTest.java
opendaylight/md-sal/messagebus-impl/src/test/java/org/opendaylight/controller/messagebus/app/impl/EventSourceRegistrationImplTest.java [new file with mode: 0644]
opendaylight/md-sal/messagebus-impl/src/test/java/org/opendaylight/controller/messagebus/app/impl/EventSourceTopicTest.java
opendaylight/md-sal/messagebus-impl/src/test/java/org/opendaylight/controller/messagebus/app/impl/EventSourceTopologyTest.java
opendaylight/md-sal/messagebus-impl/src/test/java/org/opendaylight/controller/messagebus/app/impl/NetconfEventSourceManagerTest.java
opendaylight/md-sal/messagebus-impl/src/test/java/org/opendaylight/controller/messagebus/app/impl/NetconfEventSourceTest.java
opendaylight/md-sal/messagebus-impl/src/test/java/org/opendaylight/controller/messagebus/app/impl/TopicDOMNotificationTest.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorRecoverySupport.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/SnapshotManager.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/base/messages/CaptureSnapshot.java
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/AbstractRaftActorIntegrationTest.java
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/MockRaftActor.java
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/RaftActorSnapshotMessageSupportTest.java
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/RaftActorTest.java
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/RecoveryIntegrationTest.java [new file with mode: 0644]
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/ReplicationAndSnapshotsIntegrationTest.java
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/ReplicationAndSnapshotsWithLaggingFollowerIntegrationTest.java
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/SnapshotManagerTest.java
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/utils/InMemoryJournal.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionProxy.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/utils/NormalizedNodeAggregator.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/utils/NormalizedNodeAggregatorTest.java

index 85d1a1b..e26502f 100644 (file)
@@ -7,17 +7,37 @@
  */
 package org.opendaylight.controller.config.yang.messagebus.app.impl;
 
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
-import static org.mockito.Mockito.mock;
-
 import org.junit.Before;
 import org.junit.BeforeClass;
 import org.junit.Test;
 import org.opendaylight.controller.config.api.DependencyResolver;
+import org.opendaylight.controller.config.api.JmxAttribute;
 import org.opendaylight.controller.config.api.ModuleIdentifier;
+import org.opendaylight.controller.md.sal.binding.api.DataBroker;
+import org.opendaylight.controller.md.sal.binding.api.MountPointService;
+import org.opendaylight.controller.md.sal.binding.api.WriteTransaction;
+import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
+import org.opendaylight.controller.md.sal.dom.api.DOMMountPointService;
+import org.opendaylight.controller.md.sal.dom.api.DOMNotificationPublishService;
+import org.opendaylight.controller.sal.binding.api.BindingAwareBroker;
+import org.opendaylight.controller.sal.binding.api.BindingAwareProvider;
+import org.opendaylight.controller.sal.binding.api.RpcProviderRegistry;
+import org.opendaylight.controller.sal.core.api.Broker;
+import org.opendaylight.controller.sal.core.api.Provider;
+import org.opendaylight.yangtools.yang.binding.DataObject;
+import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
 import org.osgi.framework.BundleContext;
 
+import javax.management.ObjectName;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.doNothing;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
+
 public class MessageBusAppImplModuleTest {
 
     MessageBusAppImplModule messageBusAppImplModule;
@@ -55,5 +75,34 @@ public class MessageBusAppImplModuleTest {
         assertEquals("Set and/or get method/s don't work correctly.", bundleContext, messageBusAppImplModule.getBundleContext());
     }
 
-    //TODO: create MessageBusAppImplModule.createInstance test
-}
+    @Test
+    public void createInstanceTest() throws Exception{
+        org.opendaylight.controller.sal.binding.api.BindingAwareBroker bindingAwareBrokerMock = mock(org.opendaylight.controller.sal.binding.api.BindingAwareBroker.class);
+        Broker brokerMock = mock(Broker.class);
+        doReturn(brokerMock).when(dependencyResolverMock).resolveInstance(eq(org.opendaylight.controller.sal.core.api.Broker.class), any(ObjectName.class), any(JmxAttribute.class));
+        doReturn(bindingAwareBrokerMock).when(dependencyResolverMock).resolveInstance(eq(org.opendaylight.controller.sal.binding.api.BindingAwareBroker.class), any(ObjectName.class), any(JmxAttribute.class));
+        messageBusAppImplModule.resolveDependencies();
+
+        BindingAwareBroker.ProviderContext providerContext = mock(BindingAwareBroker.ProviderContext.class);
+        doReturn(providerContext).when(bindingAwareBrokerMock).registerProvider(any(BindingAwareProvider.class));
+        Broker.ProviderSession providerSessionMock = mock(Broker.ProviderSession.class);
+        doReturn(providerSessionMock).when(brokerMock).registerProvider(any(Provider.class));
+        DataBroker dataBrokerMock = mock(DataBroker.class);
+        doReturn(dataBrokerMock).when(providerContext).getSALService(eq(DataBroker.class));
+        DOMNotificationPublishService domNotificationPublishServiceMock = mock(DOMNotificationPublishService.class);
+        doReturn(domNotificationPublishServiceMock).when(providerSessionMock).getService(DOMNotificationPublishService.class);
+        DOMMountPointService domMountPointServiceMock = mock(DOMMountPointService.class);
+        doReturn(domMountPointServiceMock).when(providerSessionMock).getService(DOMMountPointService.class);
+        MountPointService mountPointServiceMock = mock(MountPointService.class);
+        doReturn(mountPointServiceMock).when(providerContext).getSALService(eq(MountPointService.class));
+        RpcProviderRegistry rpcProviderRegistryMock = mock(RpcProviderRegistry.class);
+        doReturn(rpcProviderRegistryMock).when(providerContext).getSALService(eq(RpcProviderRegistry.class));
+
+        WriteTransaction writeTransactionMock = mock(WriteTransaction.class);
+        doReturn(writeTransactionMock).when(dataBrokerMock).newWriteOnlyTransaction();
+        doNothing().when(writeTransactionMock).put(any(LogicalDatastoreType.class), any(InstanceIdentifier.class), any(DataObject.class), eq(true));
+
+        assertNotNull("EventSourceRegistryWrapper has not been created correctly.", messageBusAppImplModule.createInstance());
+    }
+
+}
\ No newline at end of file
diff --git a/opendaylight/md-sal/messagebus-impl/src/test/java/org/opendaylight/controller/messagebus/app/impl/EventSourceRegistrationImplTest.java b/opendaylight/md-sal/messagebus-impl/src/test/java/org/opendaylight/controller/messagebus/app/impl/EventSourceRegistrationImplTest.java
new file mode 100644 (file)
index 0000000..9cce623
--- /dev/null
@@ -0,0 +1,54 @@
+/*
+ * Copyright (c) 2015 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.messagebus.app.impl;
+
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.opendaylight.controller.messagebus.spi.EventSource;
+
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+
+public class EventSourceRegistrationImplTest {
+
+    EventSourceRegistrationImplLocal eventSourceRegistrationImplLocal;
+    EventSourceTopology eventSourceTopologyMock;
+
+    @BeforeClass
+    public static void initTestClass() throws IllegalAccessException, InstantiationException {
+    }
+
+    @Before
+    public void setUp() throws Exception {
+        EventSource eventSourceMock = mock(EventSource.class);
+        eventSourceTopologyMock = mock(EventSourceTopology.class);
+        eventSourceRegistrationImplLocal = new EventSourceRegistrationImplLocal(eventSourceMock, eventSourceTopologyMock);
+    }
+
+    @Test
+    public void removeRegistrationTest() {
+        eventSourceRegistrationImplLocal.removeRegistration();
+        verify(eventSourceTopologyMock, times(1)).unRegister(any(EventSource.class));
+    }
+
+
+    private class EventSourceRegistrationImplLocal extends EventSourceRegistrationImpl{
+
+        /**
+         * @param instance            of EventSource that has been registered by {@link EventSourceRegistryImpl#registerEventSource(Node, org.opendaylight.controller.messagebus.spi.EventSource)}
+         * @param eventSourceTopology
+         */
+        public EventSourceRegistrationImplLocal(EventSource instance, EventSourceTopology eventSourceTopology) {
+            super(instance, eventSourceTopology);
+        }
+    }
+
+}
\ No newline at end of file
index f369a12..9f513c4 100644 (file)
@@ -74,7 +74,7 @@ public class EventSourceTopicTest {
 
         nodeIdMock = mock(NodeId.class);
         doReturn(nodeIdMock).when(dataObjectMock).getId();
-        doReturn("0").when(nodeIdMock).getValue();
+        doReturn("nodeIdPattern1").when(nodeIdMock).getValue();
     }
 
     @Test
@@ -84,4 +84,4 @@ public class EventSourceTopicTest {
         verify(eventSourceServiceMock, times(1)).joinTopic(any(JoinTopicInput.class));
     }
 
-}
+}
\ No newline at end of file
index ced2e1f..50ae4d9 100644 (file)
@@ -16,13 +16,16 @@ import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 
+import java.lang.reflect.Field;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Map;
 
 import org.junit.Before;
 import org.junit.BeforeClass;
 import org.junit.Test;
 import org.opendaylight.controller.md.sal.binding.api.DataBroker;
+import org.opendaylight.controller.md.sal.binding.api.DataChangeListener;
 import org.opendaylight.controller.md.sal.binding.api.ReadOnlyTransaction;
 import org.opendaylight.controller.md.sal.binding.api.WriteTransaction;
 import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
@@ -57,6 +60,7 @@ public class EventSourceTopologyTest {
     CreateTopicInput createTopicInputMock;
     ListenerRegistration listenerRegistrationMock;
     NodeKey nodeKey;
+    RpcRegistration<EventAggregatorService> aggregatorRpcReg;
 
     @BeforeClass
     public static void initTestClass() throws IllegalAccessException, InstantiationException {
@@ -76,7 +80,7 @@ public class EventSourceTopologyTest {
     }
 
     private void constructorTestHelper(){
-        RpcRegistration<EventAggregatorService> aggregatorRpcReg = mock(RpcRegistration.class);
+        aggregatorRpcReg = mock(RpcRegistration.class);
         EventSourceService eventSourceService = mock(EventSourceService.class);
         doReturn(aggregatorRpcReg).when(rpcProviderRegistryMock).addRpcImplementation(eq(EventAggregatorService.class), any(EventSourceTopology.class));
         doReturn(eventSourceService).when(rpcProviderRegistryMock).getRpcService(EventSourceService.class);
@@ -87,11 +91,11 @@ public class EventSourceTopologyTest {
         doReturn(checkedFutureMock).when(writeTransactionMock).submit();
     }
 
-//TODO: create test for createTopic
-//    public void createTopicTest() throws Exception{
-//        createTopicTestHelper();
-//        assertNotNull("Topic has not been created correctly.", eventSourceTopology.createTopic(createTopicInputMock));
-//    }
+    @Test
+    public void createTopicTest() throws Exception{
+        topicTestHelper();
+        assertNotNull("Topic has not been created correctly.", eventSourceTopology.createTopic(createTopicInputMock));
+    }
 
     private void topicTestHelper() throws Exception{
         constructorTestHelper();
@@ -138,6 +142,19 @@ public class EventSourceTopologyTest {
         assertNotNull("Instance has not been created correctly.", eventSourceTopology.destroyTopic(destroyTopicInput));
     }
 
+    @Test
+    public void closeTest() throws Exception{
+        constructorTestHelper();
+        topicTestHelper();
+        Map<DataChangeListener, ListenerRegistration<DataChangeListener>> localMap = getTopicListenerRegistrations();
+        DataChangeListener dataChangeListenerMock = mock(DataChangeListener.class);
+        ListenerRegistration<DataChangeListener> listenerListenerRegistrationMock = (ListenerRegistration<DataChangeListener>) mock(ListenerRegistration.class);
+        localMap.put(dataChangeListenerMock, listenerListenerRegistrationMock);
+        eventSourceTopology.close();
+        verify(aggregatorRpcReg, times(1)).close();
+        verify(listenerListenerRegistrationMock, times(1)).close();
+    }
+
     @Test
     public void registerTest() throws Exception {
         topicTestHelper();
@@ -154,4 +171,46 @@ public class EventSourceTopologyTest {
         verify(routedRpcRegistrationMock, times(1)).registerPath(eq(NodeContext.class), any(KeyedInstanceIdentifier.class));
     }
 
-}
+    @Test
+    public void unregisterTest() throws Exception {
+        topicTestHelper();
+        EventSource eventSourceMock = mock(EventSource.class);
+        NodeId nodeId = new NodeId("nodeIdValue1");
+        nodeKey = new NodeKey(nodeId);
+        Map<NodeKey, BindingAwareBroker.RoutedRpcRegistration<EventSourceService>> localMap = getRoutedRpcRegistrations();
+        NodeKey nodeKeyMock = mock(NodeKey.class);
+        doReturn(nodeKeyMock).when(eventSourceMock).getSourceNodeKey();
+        BindingAwareBroker.RoutedRpcRegistration<EventSourceService> routedRpcRegistrationMock = (BindingAwareBroker.RoutedRpcRegistration<EventSourceService>) mock(BindingAwareBroker.RoutedRpcRegistration.class);
+        localMap.put(nodeKeyMock, routedRpcRegistrationMock);
+        eventSourceTopology.unRegister(eventSourceMock);
+        verify(routedRpcRegistrationMock, times(1)).close();
+    }
+
+    @Test
+    public void registerEventSourceTest() throws Exception {
+        topicTestHelper();
+        Node nodeMock = mock(Node.class);
+        EventSource eventSourceMock = mock(EventSource.class);
+        NodeId nodeId = new NodeId("nodeIdValue1");
+        nodeKey = new NodeKey(nodeId);
+        doReturn(nodeKey).when(nodeMock).getKey();
+        doReturn(nodeKey).when(eventSourceMock).getSourceNodeKey();
+        BindingAwareBroker.RoutedRpcRegistration routedRpcRegistrationMock = mock(BindingAwareBroker.RoutedRpcRegistration.class);
+        doReturn(routedRpcRegistrationMock).when(rpcProviderRegistryMock).addRoutedRpcImplementation(EventSourceService.class, eventSourceMock);
+        doNothing().when(routedRpcRegistrationMock).registerPath(eq(NodeContext.class), any(KeyedInstanceIdentifier.class));
+        assertNotNull("Return value has not been created correctly.", eventSourceTopology.registerEventSource(eventSourceMock));
+    }
+
+    private Map getTopicListenerRegistrations() throws Exception{
+        Field nesField = EventSourceTopology.class.getDeclaredField("topicListenerRegistrations");
+        nesField.setAccessible(true);
+        return (Map) nesField.get(eventSourceTopology);
+    }
+
+    private Map getRoutedRpcRegistrations() throws Exception{
+        Field nesField = EventSourceTopology.class.getDeclaredField("routedRpcRegistrations");
+        nesField.setAccessible(true);
+        return (Map) nesField.get(eventSourceTopology);
+    }
+
+}
\ No newline at end of file
index 61fa30f..1d6b825 100644 (file)
@@ -84,11 +84,11 @@ public class NetconfEventSourceManagerTest {
 
         netconfEventSourceManager =
                 NetconfEventSourceManager.create(dataBrokerMock,
-                                                 domNotificationPublishServiceMock,
-                                                 domMountPointServiceMock,
-                                                 mountPointServiceMock,
-                                                 eventSourceRegistry,
-                                                 namespaceToStreamList);
+                        domNotificationPublishServiceMock,
+                        domMountPointServiceMock,
+                        mountPointServiceMock,
+                        eventSourceRegistry,
+                        namespaceToStreamList);
     }
 
     @Test
@@ -125,12 +125,14 @@ public class NetconfEventSourceManagerTest {
         Map<InstanceIdentifier, DataObject> mapUpdate = new HashMap<>();
         InstanceIdentifier instanceIdentifierMock = mock(InstanceIdentifier.class);
         Node dataObjectMock = mock(Node.class);
+
         if(create){
             mapCreate.put(instanceIdentifierMock, dataObjectMock);
         }
         if(update){
             mapUpdate.put(instanceIdentifierMock, dataObjectMock);
         }
+
         doReturn(mapCreate).when(asyncDataChangeEventMock).getCreatedData();
         doReturn(mapUpdate).when(asyncDataChangeEventMock).getUpdatedData();
         NetconfNode netconfNodeMock = mock(NetconfNode.class);
@@ -171,4 +173,4 @@ public class NetconfEventSourceManagerTest {
         doReturn(esrMock).when(eventSourceRegistry).registerEventSource(any(EventSource.class));
     }
 
-}
+}
\ No newline at end of file
index 58da9e3..ed90257 100644 (file)
@@ -7,22 +7,8 @@
  */
 package org.opendaylight.controller.messagebus.app.impl;
 
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
-import static org.mockito.Matchers.any;
-import static org.mockito.Mockito.doReturn;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.times;
-import static org.mockito.Mockito.verify;
-
-import java.lang.reflect.Field;
-import java.net.URI;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-
+import com.google.common.base.Optional;
+import com.google.common.util.concurrent.CheckedFuture;
 import org.junit.Before;
 import org.junit.Test;
 import org.opendaylight.controller.md.sal.binding.api.BindingService;
@@ -53,21 +39,32 @@ import org.opendaylight.yangtools.yang.model.api.NotificationDefinition;
 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
 import org.opendaylight.yangtools.yang.model.api.SchemaPath;
 
-import com.google.common.base.Optional;
-import com.google.common.util.concurrent.CheckedFuture;
+import java.lang.reflect.Field;
+import java.net.URI;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
 
 public class NetconfEventSourceTest {
 
     NetconfEventSource netconfEventSource;
     DOMMountPoint domMountPointMock;
     JoinTopicInput joinTopicInputMock;
-    AsyncDataChangeEvent asyncDataChangeEventMock;
-    Node dataObjectMock;
 
     @Before
     public void setUp() throws Exception {
         Map<String, String> streamMap = new HashMap<>();
-        streamMap.put("string1", "string2");
+        streamMap.put("uriStr1", "string2");
         domMountPointMock = mock(DOMMountPoint.class);
         DOMNotificationPublishService domNotificationPublishServiceMock = mock(DOMNotificationPublishService.class);
         MountPoint mountPointMock = mock(MountPoint.class);
@@ -80,9 +77,9 @@ public class NetconfEventSourceTest {
         doReturn(rpcConsumerRegistryMock).when(onlyOptionalMock).get();
         doReturn(notificationsServiceMock).when(rpcConsumerRegistryMock).getRpcService(NotificationsService.class);
         org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.Node node
-            = mock(org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.Node.class);
+                = mock(org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.Node.class);
         org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.NodeId nodeId
-            = new org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.NodeId("NodeId1");
+                = new org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.NodeId("NodeId1");
         doReturn(nodeId).when(node).getNodeId();
         netconfEventSource = new NetconfEventSource(node, streamMap, domMountPointMock, domNotificationPublishServiceMock, mountPointMock);
     }
@@ -143,7 +140,7 @@ public class NetconfEventSourceTest {
         doReturn(topicId).when(joinTopicInputMock).getTopicId();
         NotificationPattern notificationPatternMock = mock(NotificationPattern.class);
         doReturn(notificationPatternMock).when(joinTopicInputMock).getNotificationPattern();
-        doReturn("regexString1").when(notificationPatternMock).getValue();
+        doReturn("uriStr1").when(notificationPatternMock).getValue();
 
         SchemaContext schemaContextMock = mock(SchemaContext.class);
         doReturn(schemaContextMock).when(domMountPointMock).getSchemaContext();
@@ -165,6 +162,13 @@ public class NetconfEventSourceTest {
         doReturn(domNotificationServiceMock).when(domNotificationServiceOptionalMock).get();
         ListenerRegistration listenerRegistrationMock = mock(ListenerRegistration.class);
         doReturn(listenerRegistrationMock).when(domNotificationServiceMock).registerNotificationListener(any(NetconfEventSource.class), any(List.class));
+
+        Optional<DOMService> optionalMock = (Optional<DOMService>) mock(Optional.class);
+        doReturn(optionalMock).when(domMountPointMock).getService(DOMRpcService.class);
+        DOMRpcService domRpcServiceMock = mock(DOMRpcService.class);
+        doReturn(domRpcServiceMock).when(optionalMock).get();
+        CheckedFuture checkedFutureMock = mock(CheckedFuture.class);
+        doReturn(checkedFutureMock).when(domRpcServiceMock).invokeRpc(any(SchemaPath.class), any(ContainerNode.class));
     }
 
 //TODO: create Test for NetConfEventSource#onNotification
@@ -175,4 +179,4 @@ public class NetconfEventSourceTest {
         return (Set) nesField.get(netconfEventSource);
     }
 
-}
+}
\ No newline at end of file
index 6dacb97..b3f6438 100644 (file)
@@ -57,4 +57,4 @@ public class TopicDOMNotificationTest {
         String bodyString = "TopicDOMNotification [body=" + containerNodeBodyMockToString + "]";
         assertEquals("String has not been created correctly.", bodyString, topicDOMNotification.toString());
     }
-}
+}
\ No newline at end of file
index 5dc8361..57603a5 100644 (file)
@@ -122,17 +122,28 @@ class RaftActorRecoverySupport {
     }
 
     private void onRecoveredApplyLogEntries(long toIndex) {
+        long lastUnappliedIndex = context.getLastApplied() + 1;
+
         if(log.isDebugEnabled()) {
-            log.debug("{}: Received ApplyLogEntries for recovery, applying to state: {} to {}",
-                    context.getId(), context.getLastApplied() + 1, toIndex);
+            log.debug("{}: Received apply journal entries for recovery, applying to state: {} to {}",
+                    context.getId(), lastUnappliedIndex, toIndex);
         }
 
-        for (long i = context.getLastApplied() + 1; i <= toIndex; i++) {
-            batchRecoveredLogEntry(replicatedLog().get(i));
+        long lastApplied = lastUnappliedIndex - 1;
+        for (long i = lastUnappliedIndex; i <= toIndex; i++) {
+            ReplicatedLogEntry logEntry = replicatedLog().get(i);
+            if(logEntry != null) {
+                lastApplied++;
+                batchRecoveredLogEntry(logEntry);
+            } else {
+                // Shouldn't happen but cover it anyway.
+                log.error("Log entry not found for index {}", i);
+                break;
+            }
         }
 
-        context.setLastApplied(toIndex);
-        context.setCommitIndex(toIndex);
+        context.setLastApplied(lastApplied);
+        context.setCommitIndex(lastApplied);
     }
 
     private void batchRecoveredLogEntry(ReplicatedLogEntry logEntry) {
index 8121f75..f4f936b 100644 (file)
@@ -11,6 +11,7 @@ package org.opendaylight.controller.cluster.raft;
 import akka.japi.Procedure;
 import akka.persistence.SnapshotSelectionCriteria;
 import com.google.protobuf.ByteString;
+import java.util.List;
 import org.opendaylight.controller.cluster.DataPersistenceProvider;
 import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshot;
 import org.opendaylight.controller.cluster.raft.base.messages.SendInstallSnapshot;
@@ -19,7 +20,6 @@ import org.slf4j.Logger;
 
 public class SnapshotManager implements SnapshotState {
 
-
     private final SnapshotState IDLE = new Idle();
     private final SnapshotState CAPTURING = new Capturing();
     private final SnapshotState PERSISTING = new Persisting();
@@ -35,6 +35,7 @@ public class SnapshotManager implements SnapshotState {
 
     private SnapshotState currentState = IDLE;
     private CaptureSnapshot captureSnapshot;
+    private long lastSequenceNumber = -1;
 
     public SnapshotManager(RaftActorContext context, Logger logger) {
         this.context = context;
@@ -184,19 +185,26 @@ public class SnapshotManager implements SnapshotState {
             long newReplicatedToAllTerm = replicatedToAllTermInfoReader.getTerm();
 
             // send a CaptureSnapshot to self to make the expensive operation async.
+
+            List<ReplicatedLogEntry> unAppliedEntries = context.getReplicatedLog().getFrom(lastAppliedIndex + 1);
+
             captureSnapshot = new CaptureSnapshot(lastLogEntry.getIndex(),
                     lastLogEntry.getTerm(), lastAppliedIndex, lastAppliedTerm,
-                    newReplicatedToAllIndex, newReplicatedToAllTerm, targetFollower!=null);
+                    newReplicatedToAllIndex, newReplicatedToAllTerm, unAppliedEntries, targetFollower != null);
 
             SnapshotManager.this.currentState = CAPTURING;
 
-            if(targetFollower != null){
-                LOG.info("{}: Initiating snapshot capture {}", persistenceId(), captureSnapshot);
-            } else {
+            if(captureSnapshot.isInstallSnapshotInitiated()) {
                 LOG.info("{}: Initiating snapshot capture {} to install on {}",
                         persistenceId(), captureSnapshot, targetFollower);
+            } else {
+                LOG.info("{}: Initiating snapshot capture {}", persistenceId(), captureSnapshot);
             }
 
+            lastSequenceNumber = context.getPersistenceProvider().getLastSequenceNumber();
+
+            LOG.debug("lastSequenceNumber prior to capture: {}", lastSequenceNumber);
+
             context.getActor().tell(captureSnapshot, context.getActor());
 
             return true;
@@ -261,7 +269,7 @@ public class SnapshotManager implements SnapshotState {
             // when snapshot is saved async, SaveSnapshotSuccess is raised.
 
             Snapshot sn = Snapshot.create(snapshotBytes,
-                    context.getReplicatedLog().getFrom(captureSnapshot.getLastAppliedIndex() + 1),
+                    captureSnapshot.getUnAppliedEntries(),
                     captureSnapshot.getLastIndex(), captureSnapshot.getLastTerm(),
                     captureSnapshot.getLastAppliedIndex(), captureSnapshot.getLastAppliedTerm());
 
@@ -336,8 +344,9 @@ public class SnapshotManager implements SnapshotState {
             persistenceProvider.deleteSnapshots(new SnapshotSelectionCriteria(
                     sequenceNumber - context.getConfigParams().getSnapshotBatchCount(), 43200000));
 
-            persistenceProvider.deleteMessages(sequenceNumber);
+            persistenceProvider.deleteMessages(lastSequenceNumber);
 
+            lastSequenceNumber = -1;
             SnapshotManager.this.currentState = IDLE;
         }
 
index daa8f77..7c182f0 100644 (file)
@@ -8,6 +8,10 @@
 
 package org.opendaylight.controller.cluster.raft.base.messages;
 
+import java.util.Collections;
+import java.util.List;
+import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
+
 public class CaptureSnapshot {
     private final long lastAppliedIndex;
     private final long lastAppliedTerm;
@@ -16,14 +20,17 @@ public class CaptureSnapshot {
     private final boolean installSnapshotInitiated;
     private final long replicatedToAllIndex;
     private final long replicatedToAllTerm;
+    private final List<ReplicatedLogEntry> unAppliedEntries;
 
-    public CaptureSnapshot(long lastIndex, long lastTerm,
-        long lastAppliedIndex, long lastAppliedTerm, long replicatedToAllIndex, long replicatedToAllTerm) {
-        this(lastIndex, lastTerm, lastAppliedIndex, lastAppliedTerm, replicatedToAllIndex , replicatedToAllTerm, false);
+    public CaptureSnapshot(long lastIndex, long lastTerm, long lastAppliedIndex, long lastAppliedTerm,
+            long replicatedToAllIndex, long replicatedToAllTerm, List<ReplicatedLogEntry> unAppliedEntries) {
+        this(lastIndex, lastTerm, lastAppliedIndex, lastAppliedTerm, replicatedToAllIndex, replicatedToAllTerm,
+                unAppliedEntries, false);
     }
 
-    public CaptureSnapshot(long lastIndex, long lastTerm,long lastAppliedIndex,
-        long lastAppliedTerm, long replicatedToAllIndex, long replicatedToAllTerm, boolean installSnapshotInitiated) {
+    public CaptureSnapshot(long lastIndex, long lastTerm, long lastAppliedIndex,
+            long lastAppliedTerm, long replicatedToAllIndex, long replicatedToAllTerm,
+            List<ReplicatedLogEntry> unAppliedEntries, boolean installSnapshotInitiated) {
         this.lastIndex = lastIndex;
         this.lastTerm = lastTerm;
         this.lastAppliedIndex = lastAppliedIndex;
@@ -31,6 +38,7 @@ public class CaptureSnapshot {
         this.installSnapshotInitiated = installSnapshotInitiated;
         this.replicatedToAllIndex = replicatedToAllIndex;
         this.replicatedToAllTerm = replicatedToAllTerm;
+        this.unAppliedEntries = unAppliedEntries != null ? unAppliedEntries : Collections.<ReplicatedLogEntry>emptyList();
     }
 
     public long getLastAppliedIndex() {
@@ -61,6 +69,10 @@ public class CaptureSnapshot {
         return replicatedToAllTerm;
     }
 
+    public List<ReplicatedLogEntry> getUnAppliedEntries() {
+        return unAppliedEntries;
+    }
+
     @Override
     public String toString() {
         StringBuilder builder = new StringBuilder();
@@ -68,7 +80,9 @@ public class CaptureSnapshot {
                 .append(lastAppliedTerm).append(", lastIndex=").append(lastIndex).append(", lastTerm=")
                 .append(lastTerm).append(", installSnapshotInitiated=").append(installSnapshotInitiated)
                 .append(", replicatedToAllIndex=").append(replicatedToAllIndex).append(", replicatedToAllTerm=")
-                .append(replicatedToAllTerm).append("]");
+                .append(replicatedToAllTerm).append(", unAppliedEntries size=").append(unAppliedEntries.size()).append("]");
         return builder.toString();
     }
+
+
 }
index 1289ed7..977cf0e 100644 (file)
@@ -7,7 +7,6 @@
  */
 package org.opendaylight.controller.cluster.raft;
 
-import static org.junit.Assert.assertArrayEquals;
 import static org.junit.Assert.assertEquals;
 import akka.actor.ActorRef;
 import akka.actor.PoisonPill;
@@ -20,6 +19,7 @@ import com.google.common.base.Optional;
 import com.google.common.base.Predicate;
 import com.google.common.base.Supplier;
 import com.google.common.collect.ImmutableMap;
+import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
@@ -51,7 +51,6 @@ public abstract class AbstractRaftActorIntegrationTest extends AbstractActorTest
 
         private final TestActorRef<MessageCollectorActor> collectorActor;
         private final Map<Class<?>, Boolean> dropMessages = new ConcurrentHashMap<>();
-        private volatile byte[] snapshot;
 
         private TestRaftActor(String id, Map<String, String> peerAddresses, ConfigParams config,
                 TestActorRef<MessageCollectorActor> collectorActor) {
@@ -112,19 +111,13 @@ public abstract class AbstractRaftActorIntegrationTest extends AbstractActorTest
 
         @Override
         public void createSnapshot(ActorRef actorRef) {
-            if(snapshot != null) {
-                getSelf().tell(new CaptureSnapshotReply(snapshot), ActorRef.noSender());
+            try {
+                actorRef.tell(new CaptureSnapshotReply(RaftActorTest.fromObject(getState()).toByteArray()), actorRef);
+            } catch (Exception e) {
+                e.printStackTrace();
             }
         }
 
-        @Override
-        public void applyRecoverySnapshot(byte[] bytes) {
-        }
-
-        void setSnapshot(byte[] snapshot) {
-            this.snapshot = snapshot;
-        }
-
         public ActorRef collectorActor() {
             return collectorActor;
         }
@@ -158,6 +151,8 @@ public abstract class AbstractRaftActorIntegrationTest extends AbstractActorTest
     protected long initialTerm = 5;
     protected long currentTerm;
 
+    protected List<Object> expSnapshotState = new ArrayList<>();
+
     @After
     public void tearDown() {
         InMemoryJournal.clear();
@@ -215,13 +210,20 @@ public abstract class AbstractRaftActorIntegrationTest extends AbstractActorTest
         });
     }
 
+    @SuppressWarnings("unchecked")
     protected void verifySnapshot(String prefix, Snapshot snapshot, long lastAppliedTerm,
-            int lastAppliedIndex, long lastTerm, long lastIndex, byte[] data) {
+            int lastAppliedIndex, long lastTerm, long lastIndex)
+                    throws Exception {
         assertEquals(prefix + " Snapshot getLastAppliedTerm", lastAppliedTerm, snapshot.getLastAppliedTerm());
         assertEquals(prefix + " Snapshot getLastAppliedIndex", lastAppliedIndex, snapshot.getLastAppliedIndex());
         assertEquals(prefix + " Snapshot getLastTerm", lastTerm, snapshot.getLastTerm());
         assertEquals(prefix + " Snapshot getLastIndex", lastIndex, snapshot.getLastIndex());
-        assertArrayEquals(prefix + " Snapshot getState", data, snapshot.getState());
+
+        List<Object> actualState = (List<Object>)MockRaftActor.toObject(snapshot.getState());
+        assertEquals(prefix + " Snapshot getState size", expSnapshotState.size(), actualState.size());
+        for(int i = 0; i < expSnapshotState.size(); i++) {
+            assertEquals(prefix + " Snapshot state " + i, expSnapshotState.get(i), actualState.get(i));
+        }
     }
 
     protected void verifyPersistedJournal(String persistenceId, List<? extends ReplicatedLogEntry> expJournal) {
index 53110b3..586ca8c 100644 (file)
@@ -154,10 +154,11 @@ public class MockRaftActor extends RaftActor implements RaftActorRecoveryCohort,
         return Props.create(new MockRaftActorCreator(peerAddresses, id, config, dataPersistenceProvider, roleChangeNotifier));
     }
 
-
     @Override protected void applyState(ActorRef clientActor, String identifier, Object data) {
         actorDelegate.applyState(clientActor, identifier, data);
-        LOG.info("{}: applyState called", persistenceId());
+        LOG.info("{}: applyState called: {}", persistenceId(), data);
+
+        state.add(data);
     }
 
     @Override
@@ -235,7 +236,7 @@ public class MockRaftActor extends RaftActor implements RaftActorRecoveryCohort,
         return this.getId();
     }
 
-    private Object toObject(byte[] bs) throws ClassNotFoundException, IOException {
+    public static Object toObject(byte[] bs) throws ClassNotFoundException, IOException {
         Object obj = null;
         ByteArrayInputStream bis = null;
         ObjectInputStream ois = null;
index 2ced72c..ae9c784 100644 (file)
@@ -124,7 +124,7 @@ public class RaftActorSnapshotMessageSupportTest {
     @Test
     public void testOnCaptureSnapshot() throws Exception {
 
-        sendMessageToSupport(new CaptureSnapshot(3, 1, 2, 1, 2, 1));
+        sendMessageToSupport(new CaptureSnapshot(3, 1, 2, 1, 2, 1, null));
 
         ArgumentCaptor<Procedure> procedure = ArgumentCaptor.forClass(Procedure.class);
         verify(mockSnapshotManager).create(procedure.capture());
index 5062f8f..82ebcd1 100644 (file)
@@ -60,10 +60,14 @@ import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
 import org.opendaylight.controller.cluster.raft.utils.InMemoryJournal;
 import org.opendaylight.controller.cluster.raft.utils.InMemorySnapshotStore;
 import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 import scala.concurrent.duration.FiniteDuration;
 
 public class RaftActorTest extends AbstractActorTest {
 
+    static final Logger TEST_LOG = LoggerFactory.getLogger(RaftActorTest.class);
+
     private TestActorFactory factory;
 
     @Before
@@ -91,6 +95,8 @@ public class RaftActorTest extends AbstractActorTest {
 
     @Test
     public void testRaftActorRecoveryWithPersistenceEnabled() throws Exception {
+        TEST_LOG.info("testRaftActorRecoveryWithPersistenceEnabled starting");
+
         new JavaTestKit(getSystem()) {{
             String persistenceId = factory.generateActorId("follower-");
 
@@ -101,9 +107,9 @@ public class RaftActorTest extends AbstractActorTest {
             // log entry.
             config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
 
+            ImmutableMap<String, String> peerAddresses = ImmutableMap.<String, String>builder().put("member1", "address").build();
             ActorRef followerActor = factory.createActor(MockRaftActor.props(persistenceId,
-                    ImmutableMap.<String, String>builder().put("member1", "address").build(),
-                    Optional.<ConfigParams>of(config)), persistenceId);
+                    peerAddresses, Optional.<ConfigParams>of(config)), persistenceId);
 
             watch(followerActor);
 
@@ -156,8 +162,7 @@ public class RaftActorTest extends AbstractActorTest {
 
             //reinstate the actor
             TestActorRef<MockRaftActor> ref = factory.createTestActor(
-                    MockRaftActor.props(persistenceId, Collections.<String, String>emptyMap(),
-                            Optional.<ConfigParams>of(config)));
+                    MockRaftActor.props(persistenceId, peerAddresses, Optional.<ConfigParams>of(config)));
 
             MockRaftActor mockRaftActor = ref.underlyingActor();
 
@@ -176,6 +181,8 @@ public class RaftActorTest extends AbstractActorTest {
 
             assertEquals("getRaftState", RaftState.Follower, mockRaftActor.getRaftState());
         }};
+
+        TEST_LOG.info("testRaftActorRecoveryWithPersistenceEnabled ending");
     }
 
     @Test
@@ -275,7 +282,7 @@ public class RaftActorTest extends AbstractActorTest {
         doReturn(true).when(mockSupport).handleSnapshotMessage(same(applySnapshot));
         mockRaftActor.handleCommand(applySnapshot);
 
-        CaptureSnapshot captureSnapshot = new CaptureSnapshot(1, 1, 1, 1, 0, 1);
+        CaptureSnapshot captureSnapshot = new CaptureSnapshot(1, 1, 1, 1, 0, 1, null);
         doReturn(true).when(mockSupport).handleSnapshotMessage(same(captureSnapshot));
         mockRaftActor.handleCommand(captureSnapshot);
 
@@ -863,7 +870,7 @@ public class RaftActorTest extends AbstractActorTest {
         }};
     }
 
-    private ByteString fromObject(Object snapshot) throws Exception {
+    public static ByteString fromObject(Object snapshot) throws Exception {
         ByteArrayOutputStream b = null;
         ObjectOutputStream o = null;
         try {
diff --git a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/RecoveryIntegrationTest.java b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/RecoveryIntegrationTest.java
new file mode 100644 (file)
index 0000000..a8f490e
--- /dev/null
@@ -0,0 +1,221 @@
+/*
+ * Copyright (c) 2015 Brocade Communications Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.raft;
+
+import static org.junit.Assert.assertEquals;
+import akka.persistence.SaveSnapshotSuccess;
+import com.google.common.collect.ImmutableMap;
+import java.util.Arrays;
+import org.junit.Before;
+import org.junit.Test;
+import org.opendaylight.controller.cluster.raft.MockRaftActorContext.MockPayload;
+import org.opendaylight.controller.cluster.raft.base.messages.ApplyJournalEntries;
+import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshot;
+import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshotReply;
+import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
+import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor;
+
+/**
+ * Tests raft actor persistence recovery end-to-end using real RaftActors and behavior communication.
+ *
+ * @author Thomas Pantelis
+ */
+public class RecoveryIntegrationTest extends AbstractRaftActorIntegrationTest {
+
+    private MockPayload payload0;
+    private MockPayload payload1;
+
+    @Before
+    public void setup() {
+        follower1Actor = newTestRaftActor(follower1Id, ImmutableMap.of(leaderId, testActorPath(leaderId)),
+                newFollowerConfigParams());
+
+        peerAddresses = ImmutableMap.<String, String>builder().
+                put(follower1Id, follower1Actor.path().toString()).build();
+
+        leaderConfigParams = newLeaderConfigParams();
+        leaderActor = newTestRaftActor(leaderId, peerAddresses, leaderConfigParams);
+
+        follower1CollectorActor = follower1Actor.underlyingActor().collectorActor();
+        leaderCollectorActor = leaderActor.underlyingActor().collectorActor();
+
+        leaderContext = leaderActor.underlyingActor().getRaftActorContext();
+    }
+
+    @Test
+    public void testStatePersistedBetweenSnapshotCaptureAndPersist() {
+
+        send2InitialPayloads();
+
+        // Block these messages initially so we can control the sequence.
+        leaderActor.underlyingActor().startDropMessages(CaptureSnapshot.class);
+        leaderActor.underlyingActor().startDropMessages(CaptureSnapshotReply.class);
+        follower1Actor.underlyingActor().startDropMessages(AppendEntries.class);
+
+        MockPayload payload2 = sendPayloadData(leaderActor, "two");
+
+        // This should trigger a snapshot.
+        MockPayload payload3 = sendPayloadData(leaderActor, "three");
+
+        MessageCollectorActor.expectMatching(follower1CollectorActor, AppendEntries.class, 3);
+
+        CaptureSnapshot captureSnapshot = MessageCollectorActor.expectFirstMatching(
+                leaderCollectorActor, CaptureSnapshot.class);
+
+        // First, deliver the CaptureSnapshot to the leader.
+        leaderActor.underlyingActor().stopDropMessages(CaptureSnapshot.class);
+        leaderActor.tell(captureSnapshot, leaderActor);
+
+        // Send another payload.
+        MockPayload payload4 = sendPayloadData(leaderActor, "four");
+
+        // Now deliver the AppendEntries to the follower
+        follower1Actor.underlyingActor().stopDropMessages(AppendEntries.class);
+
+        MessageCollectorActor.expectMatching(leaderCollectorActor, ApplyJournalEntries.class, 3);
+
+        // Now deliver the CaptureSnapshotReply to the leader.
+        CaptureSnapshotReply captureSnapshotReply = MessageCollectorActor.expectFirstMatching(
+                leaderCollectorActor, CaptureSnapshotReply.class);
+        leaderActor.underlyingActor().stopDropMessages(CaptureSnapshotReply.class);
+        leaderActor.tell(captureSnapshotReply, leaderActor);
+
+        // Wait for snapshot complete.
+        MessageCollectorActor.expectFirstMatching(leaderCollectorActor, SaveSnapshotSuccess.class);
+
+        reinstateLeaderActor();
+
+        assertEquals("Leader snapshot term", currentTerm, leaderContext.getReplicatedLog().getSnapshotTerm());
+        assertEquals("Leader snapshot index", 1, leaderContext.getReplicatedLog().getSnapshotIndex());
+        assertEquals("Leader journal log size", 3, leaderContext.getReplicatedLog().size());
+        assertEquals("Leader journal last index", 4, leaderContext.getReplicatedLog().lastIndex());
+        assertEquals("Leader commit index", 4, leaderContext.getCommitIndex());
+        assertEquals("Leader last applied", 4, leaderContext.getLastApplied());
+
+        assertEquals("Leader state", Arrays.asList(payload0, payload1, payload2, payload3, payload4),
+                leaderActor.underlyingActor().getState());
+    }
+
+    @Test
+    public void testStatePersistedBetweenInitiateSnapshotAndCapture() {
+
+        send2InitialPayloads();
+
+        // Block these messages initially so we can control the sequence.
+        leaderActor.underlyingActor().startDropMessages(CaptureSnapshot.class);
+        follower1Actor.underlyingActor().startDropMessages(AppendEntries.class);
+
+        MockPayload payload2 = sendPayloadData(leaderActor, "two");
+
+        // This should trigger a snapshot.
+        MockPayload payload3 = sendPayloadData(leaderActor, "three");
+
+        // Send another payload.
+        MockPayload payload4 = sendPayloadData(leaderActor, "four");
+
+        MessageCollectorActor.expectMatching(follower1CollectorActor, AppendEntries.class, 3);
+
+        CaptureSnapshot captureSnapshot = MessageCollectorActor.expectFirstMatching(
+                leaderCollectorActor, CaptureSnapshot.class);
+
+        // First, deliver the AppendEntries to the follower
+        follower1Actor.underlyingActor().stopDropMessages(AppendEntries.class);
+
+        MessageCollectorActor.expectMatching(leaderCollectorActor, ApplyJournalEntries.class, 3);
+
+        // Now deliver the CaptureSnapshot to the leader.
+        leaderActor.underlyingActor().stopDropMessages(CaptureSnapshot.class);
+        leaderActor.tell(captureSnapshot, leaderActor);
+
+        // Wait for snapshot complete.
+        MessageCollectorActor.expectFirstMatching(leaderCollectorActor, SaveSnapshotSuccess.class);
+
+        reinstateLeaderActor();
+
+        assertEquals("Leader snapshot term", currentTerm, leaderContext.getReplicatedLog().getSnapshotTerm());
+        assertEquals("Leader snapshot index", 1, leaderContext.getReplicatedLog().getSnapshotIndex());
+        assertEquals("Leader journal log size", 3, leaderContext.getReplicatedLog().size());
+        assertEquals("Leader journal last index", 4, leaderContext.getReplicatedLog().lastIndex());
+        assertEquals("Leader commit index", 4, leaderContext.getCommitIndex());
+        assertEquals("Leader last applied", 4, leaderContext.getLastApplied());
+
+        // payloads 2, 3, and 4 were applied after the snapshot was initiated and before it was captured so
+        // were included in the snapshot. They were also included as unapplied entries in the snapshot as
+        // they weren't yet applied to the state at the time the snapshot was initiated. They were applied to the
+        // state on recovery by the ApplyJournalEntries messages which remained in the persisted log.
+        // This is a side effect of trimming the persisted log to the sequence number captured at the time
+        // the snapshot was initiated.
+        assertEquals("Leader state", Arrays.asList(payload0, payload1, payload2, payload3, payload4, payload2,
+                payload3, payload4), leaderActor.underlyingActor().getState());
+    }
+
+    @Test
+    public void testApplyJournalEntriesPersistedAfterSnapshotPersisted() {
+
+        send2InitialPayloads();
+
+        // Block these messages initially so we can control the sequence.
+        follower1Actor.underlyingActor().startDropMessages(AppendEntries.class);
+
+        MockPayload payload2 = sendPayloadData(leaderActor, "two");
+
+        // This should trigger a snapshot.
+        MockPayload payload3 = sendPayloadData(leaderActor, "three");
+
+        // Send another payload.
+        MockPayload payload4 = sendPayloadData(leaderActor, "four");
+
+        MessageCollectorActor.expectMatching(follower1CollectorActor, AppendEntries.class, 3);
+
+        // Wait for snapshot complete.
+        MessageCollectorActor.expectFirstMatching(leaderCollectorActor, SaveSnapshotSuccess.class);
+
+        // Now deliver the AppendEntries to the follower
+        follower1Actor.underlyingActor().stopDropMessages(AppendEntries.class);
+
+        MessageCollectorActor.expectMatching(leaderCollectorActor, ApplyJournalEntries.class, 3);
+
+        reinstateLeaderActor();
+
+        assertEquals("Leader snapshot term", currentTerm, leaderContext.getReplicatedLog().getSnapshotTerm());
+        assertEquals("Leader snapshot index", 1, leaderContext.getReplicatedLog().getSnapshotIndex());
+        assertEquals("Leader journal log size", 3, leaderContext.getReplicatedLog().size());
+        assertEquals("Leader journal last index", 4, leaderContext.getReplicatedLog().lastIndex());
+        assertEquals("Leader commit index", 4, leaderContext.getCommitIndex());
+        assertEquals("Leader last applied", 4, leaderContext.getLastApplied());
+
+        assertEquals("Leader state", Arrays.asList(payload0, payload1, payload2, payload3, payload4),
+                leaderActor.underlyingActor().getState());
+    }
+
+    private void reinstateLeaderActor() {
+        killActor(leaderActor);
+
+        leaderActor = newTestRaftActor(leaderId, peerAddresses, leaderConfigParams);
+
+        leaderActor.underlyingActor().waitForRecoveryComplete();
+
+        leaderContext = leaderActor.underlyingActor().getRaftActorContext();
+    }
+
+    private void send2InitialPayloads() {
+        waitUntilLeader(leaderActor);
+        currentTerm = leaderContext.getTermInformation().getCurrentTerm();
+
+        payload0 = sendPayloadData(leaderActor, "zero");
+        payload1 = sendPayloadData(leaderActor, "one");
+
+        // Verify the leader applies the states.
+        MessageCollectorActor.expectMatching(leaderCollectorActor, ApplyJournalEntries.class, 2);
+
+        assertEquals("Leader last applied", 1, leaderContext.getLastApplied());
+
+        MessageCollectorActor.clearMessages(leaderCollectorActor);
+        MessageCollectorActor.clearMessages(follower1CollectorActor);
+    }
+}
index bd670fd..c74705d 100644 (file)
@@ -14,7 +14,7 @@ import java.util.List;
 import org.junit.Test;
 import org.opendaylight.controller.cluster.raft.MockRaftActorContext.MockPayload;
 import org.opendaylight.controller.cluster.raft.RaftActor.UpdateElectionTerm;
-import org.opendaylight.controller.cluster.raft.base.messages.ApplyLogEntries;
+import org.opendaylight.controller.cluster.raft.base.messages.ApplyJournalEntries;
 import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
 import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshot;
 import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshotReply;
@@ -36,13 +36,14 @@ public class ReplicationAndSnapshotsIntegrationTest extends AbstractRaftActorInt
     private MockPayload recoveredPayload0;
     private MockPayload recoveredPayload1;
     private MockPayload recoveredPayload2;
+    private MockPayload payload3;
     private MockPayload payload4;
     private MockPayload payload5;
     private MockPayload payload6;
     private MockPayload payload7;
 
     @Test
-    public void runTest() {
+    public void runTest() throws Exception {
         testLog.info("testReplicationAndSnapshots starting");
 
         // Setup the persistent journal for the leader. We'll start up with 3 journal log entries (one less
@@ -55,7 +56,7 @@ public class ReplicationAndSnapshotsIntegrationTest extends AbstractRaftActorInt
         InMemoryJournal.addEntry(leaderId, seqId++, new ReplicatedLogImplEntry(1, initialTerm, recoveredPayload1));
         recoveredPayload2 = new MockPayload("two");
         InMemoryJournal.addEntry(leaderId, seqId++, new ReplicatedLogImplEntry(2, initialTerm, recoveredPayload2));
-        InMemoryJournal.addEntry(leaderId, seqId++, new ApplyLogEntries(2));
+        InMemoryJournal.addEntry(leaderId, seqId++, new ApplyJournalEntries(2));
 
         origLeaderJournal = InMemoryJournal.get(leaderId, ReplicatedLogImplEntry.class);
 
@@ -157,19 +158,21 @@ public class ReplicationAndSnapshotsIntegrationTest extends AbstractRaftActorInt
      * 4 and we already have 3 entries in the journal log, this should initiate a snapshot. In this
      * scenario, the follower consensus and application of state is delayed until after the snapshot
      * completes.
+     * @throws Exception
      */
-    private void testFirstSnapshot() {
+    private void testFirstSnapshot() throws Exception {
         testLog.info("testFirstSnapshot starting");
 
-        byte[] snapshot = new byte[] {1,2,3,4};
-        leaderActor.underlyingActor().setSnapshot(snapshot);
+        expSnapshotState.add(recoveredPayload0);
+        expSnapshotState.add(recoveredPayload1);
+        expSnapshotState.add(recoveredPayload2);
 
         // Delay the consensus by temporarily dropping the AppendEntries to both followers.
         follower1Actor.underlyingActor().startDropMessages(AppendEntries.class);
         follower2Actor.underlyingActor().startDropMessages(AppendEntries.class);
 
         // Send the payload.
-        MockPayload payload3 = sendPayloadData(leaderActor, "three");
+        payload3 = sendPayloadData(leaderActor, "three");
 
         // Wait for snapshot complete.
         MessageCollectorActor.expectFirstMatching(leaderCollectorActor, SaveSnapshotSuccess.class);
@@ -185,7 +188,7 @@ public class ReplicationAndSnapshotsIntegrationTest extends AbstractRaftActorInt
         // the last applied log entry (2) even though the leader hasn't yet advanced its cached snapshot index.
         List<Snapshot> persistedSnapshots = InMemorySnapshotStore.getSnapshots(leaderId, Snapshot.class);
         assertEquals("Persisted snapshots size", 1, persistedSnapshots.size());
-        verifySnapshot("Persisted", persistedSnapshots.get(0), initialTerm, 2, currentTerm, 3, snapshot);
+        verifySnapshot("Persisted", persistedSnapshots.get(0), initialTerm, 2, currentTerm, 3);
         List<ReplicatedLogEntry> unAppliedEntry = persistedSnapshots.get(0).getUnAppliedEntries();
         assertEquals("Persisted Snapshot getUnAppliedEntries size", 1, unAppliedEntry.size());
         verifyReplicatedLogEntry(unAppliedEntry.get(0), currentTerm, 3, payload3);
@@ -286,12 +289,15 @@ public class ReplicationAndSnapshotsIntegrationTest extends AbstractRaftActorInt
     /**
      * Send one more payload to trigger another snapshot. In this scenario, we delay the snapshot until
      * consensus occurs and the leader applies the state.
+     * @throws Exception
      */
-    private void testSecondSnapshot() {
+    private void testSecondSnapshot() throws Exception {
         testLog.info("testSecondSnapshot starting");
 
-        byte[] snapshot = new byte[] {5,6,7,8};
-        leaderActor.underlyingActor().setSnapshot(snapshot);
+        expSnapshotState.add(payload3);
+        expSnapshotState.add(payload4);
+        expSnapshotState.add(payload5);
+        expSnapshotState.add(payload6);
 
         // Delay the CaptureSnapshot message to the leader actor.
         leaderActor.underlyingActor().startDropMessages(CaptureSnapshot.class);
@@ -341,11 +347,14 @@ public class ReplicationAndSnapshotsIntegrationTest extends AbstractRaftActorInt
         assertEquals("Leader journal last index", 7, leaderContext.getReplicatedLog().lastIndex());
         assertEquals("Leader commit index", 7, leaderContext.getCommitIndex());
 
-        // Verify the persisted snapshot. This should reflect the advanced snapshot index as the last applied
-        // log entry (6).
+        expSnapshotState.add(payload7);
+
+        // Verify the persisted snapshot. This should reflect the snapshot index as the last applied
+        // log entry (7) and shouldn't contain any unapplied entries as we capture persisted the snapshot data
+        // when the snapshot is created (ie when the CaptureSnapshot is processed).
         List<Snapshot> persistedSnapshots = InMemorySnapshotStore.getSnapshots(leaderId, Snapshot.class);
         assertEquals("Persisted snapshots size", 1, persistedSnapshots.size());
-        verifySnapshot("Persisted", persistedSnapshots.get(0), currentTerm, 6, currentTerm, 7, snapshot);
+        verifySnapshot("Persisted", persistedSnapshots.get(0), currentTerm, 6, currentTerm, 7);
         List<ReplicatedLogEntry> unAppliedEntry = persistedSnapshots.get(0).getUnAppliedEntries();
         assertEquals("Persisted Snapshot getUnAppliedEntries size", 1, unAppliedEntry.size());
         verifyReplicatedLogEntry(unAppliedEntry.get(0), currentTerm, 7, payload7);
@@ -404,6 +413,8 @@ public class ReplicationAndSnapshotsIntegrationTest extends AbstractRaftActorInt
 
         leaderActor.underlyingActor().waitForRecoveryComplete();
 
+        leaderContext = leaderActor.underlyingActor().getRaftActorContext();
+
         assertEquals("Leader snapshot term", currentTerm, leaderContext.getReplicatedLog().getSnapshotTerm());
         assertEquals("Leader snapshot index", 6, leaderContext.getReplicatedLog().getSnapshotIndex());
         assertEquals("Leader journal log size", 1, leaderContext.getReplicatedLog().size());
index d4a9f77..ff9b8ce 100644 (file)
@@ -7,7 +7,6 @@
  */
 package org.opendaylight.controller.cluster.raft;
 
-import static org.junit.Assert.assertArrayEquals;
 import static org.junit.Assert.assertEquals;
 import akka.persistence.SaveSnapshotSuccess;
 import com.google.common.collect.ImmutableMap;
@@ -185,6 +184,10 @@ public class ReplicationAndSnapshotsWithLaggingFollowerIntegrationTest extends A
         MessageCollectorActor.clearMessages(follower1CollectorActor);
         MessageCollectorActor.clearMessages(follower2CollectorActor);
 
+        expSnapshotState.add(payload0);
+        expSnapshotState.add(payload1);
+        expSnapshotState.add(payload2);
+
         testLog.info("testInitialReplications complete");
     }
 
@@ -198,8 +201,6 @@ public class ReplicationAndSnapshotsWithLaggingFollowerIntegrationTest extends A
         testLog.info("testSubsequentReplicationsAndSnapshots starting: sending first payload, replicatedToAllIndex: {}",
                 leader.getReplicatedToAllIndex());
 
-        leaderActor.underlyingActor().setSnapshot(new byte[] {2});
-
         follower2Actor.underlyingActor().startDropMessages(AppendEntries.class);
 
         // Send the first payload - this should cause the first snapshot.
@@ -207,8 +208,7 @@ public class ReplicationAndSnapshotsWithLaggingFollowerIntegrationTest extends A
 
         MessageCollectorActor.expectFirstMatching(leaderCollectorActor, SaveSnapshotSuccess.class);
 
-        byte[] snapshot = new byte[] {6};
-        leaderActor.underlyingActor().setSnapshot(snapshot);
+        expSnapshotState.add(payload3);
 
         testLog.info("testSubsequentReplicationsAndSnapshots: sending 4 more payloads");
 
@@ -273,7 +273,7 @@ public class ReplicationAndSnapshotsWithLaggingFollowerIntegrationTest extends A
         // Verify the leader's persisted snapshot.
         List<Snapshot> persistedSnapshots = InMemorySnapshotStore.getSnapshots(leaderId, Snapshot.class);
         assertEquals("Persisted snapshots size", 1, persistedSnapshots.size());
-        verifySnapshot("Persisted", persistedSnapshots.get(0), currentTerm, 3, currentTerm, 7, snapshot);
+        verifySnapshot("Persisted", persistedSnapshots.get(0), currentTerm, 3, currentTerm, 7);
         List<ReplicatedLogEntry> unAppliedEntry = persistedSnapshots.get(0).getUnAppliedEntries();
         assertEquals("Persisted Snapshot getUnAppliedEntries size", 4, unAppliedEntry.size());
         verifyReplicatedLogEntry(unAppliedEntry.get(0), currentTerm, 4, payload4);
@@ -313,6 +313,11 @@ public class ReplicationAndSnapshotsWithLaggingFollowerIntegrationTest extends A
         MessageCollectorActor.clearMessages(follower1CollectorActor);
         MessageCollectorActor.clearMessages(follower2CollectorActor);
 
+        expSnapshotState.add(payload4);
+        expSnapshotState.add(payload5);
+        expSnapshotState.add(payload6);
+        expSnapshotState.add(payload7);
+
         testLog.info("testSubsequentReplicationsAndSnapshots complete");
     }
 
@@ -327,8 +332,6 @@ public class ReplicationAndSnapshotsWithLaggingFollowerIntegrationTest extends A
                 leader.getReplicatedToAllIndex());
 
         leaderActor.underlyingActor().setMockTotalMemory(1000);
-        byte[] snapshot = new byte[] {6};
-        leaderActor.underlyingActor().setSnapshot(snapshot);
 
         // We'll expect a ReplicatedLogImplEntry message and an ApplyJournalEntries message added to the journal.
         InMemoryJournal.addWriteMessagesCompleteLatch(leaderId, 2);
@@ -351,6 +354,8 @@ public class ReplicationAndSnapshotsWithLaggingFollowerIntegrationTest extends A
         CaptureSnapshot captureSnapshot = MessageCollectorActor.getFirstMatching(leaderCollectorActor, CaptureSnapshot.class);
         Assert.assertNull("Leader received unexpected CaptureSnapshot", captureSnapshot);
 
+        expSnapshotState.add(payload8);
+
         // Send another payload with a large enough relative size in combination with the last payload
         // that exceeds the memory threshold (70% * 1000 = 700) - this should do a snapshot.
         payload9 = sendPayloadData(leaderActor, "nine", 201);
@@ -383,7 +388,7 @@ public class ReplicationAndSnapshotsWithLaggingFollowerIntegrationTest extends A
         // Verify the leader's persisted snapshot.
         List<Snapshot> persistedSnapshots = InMemorySnapshotStore.getSnapshots(leaderId, Snapshot.class);
         assertEquals("Persisted snapshots size", 1, persistedSnapshots.size());
-        verifySnapshot("Persisted", persistedSnapshots.get(0), currentTerm, 8, currentTerm, 9, snapshot);
+        verifySnapshot("Persisted", persistedSnapshots.get(0), currentTerm, 8, currentTerm, 9);
         List<ReplicatedLogEntry> unAppliedEntry = persistedSnapshots.get(0).getUnAppliedEntries();
         assertEquals("Persisted Snapshot getUnAppliedEntries size", 1, unAppliedEntry.size());
         verifyReplicatedLogEntry(unAppliedEntry.get(0), currentTerm, 9, payload9);
@@ -451,6 +456,8 @@ public class ReplicationAndSnapshotsWithLaggingFollowerIntegrationTest extends A
         MessageCollectorActor.clearMessages(leaderCollectorActor);
         MessageCollectorActor.clearMessages(follower1CollectorActor);
         MessageCollectorActor.clearMessages(follower2CollectorActor);
+
+        expSnapshotState.add(payload10);
     }
 
     /**
@@ -467,8 +474,7 @@ public class ReplicationAndSnapshotsWithLaggingFollowerIntegrationTest extends A
         InstallSnapshot installSnapshot;
         InstallSnapshotReply installSnapshotReply;
 
-        byte[] snapshot = new byte[] {10};
-        leaderActor.underlyingActor().setSnapshot(snapshot);
+        expSnapshotState.add(payload9);
 
         // Now stop dropping AppendEntries in follower 2.
         follower2Actor.underlyingActor().stopDropMessages(AppendEntries.class);
@@ -480,7 +486,7 @@ public class ReplicationAndSnapshotsWithLaggingFollowerIntegrationTest extends A
         assertEquals("InstallSnapshot getTotalChunks", 1, installSnapshot.getTotalChunks());
         assertEquals("InstallSnapshot getLastIncludedTerm", currentTerm, installSnapshot.getLastIncludedTerm());
         assertEquals("InstallSnapshot getLastIncludedIndex", 8, installSnapshot.getLastIncludedIndex());
-        assertArrayEquals("InstallSnapshot getData", snapshot, installSnapshot.getData().toByteArray());
+        //assertArrayEquals("InstallSnapshot getData", snapshot, installSnapshot.getData().toByteArray());
 
         installSnapshotReply = MessageCollectorActor.expectFirstMatching(leaderCollectorActor, InstallSnapshotReply.class);
         assertEquals("InstallSnapshotReply getTerm", currentTerm, installSnapshotReply.getTerm());
@@ -490,7 +496,7 @@ public class ReplicationAndSnapshotsWithLaggingFollowerIntegrationTest extends A
 
         // Verify follower 2 applies the snapshot.
         applySnapshot = MessageCollectorActor.expectFirstMatching(follower2CollectorActor, ApplySnapshot.class);
-        verifySnapshot("Follower 2", applySnapshot.getSnapshot(), currentTerm, 8, currentTerm, 8, snapshot);
+        verifySnapshot("Follower 2", applySnapshot.getSnapshot(), currentTerm, 8, currentTerm, 8);
         assertEquals("Persisted Snapshot getUnAppliedEntries size", 0, applySnapshot.getSnapshot().getUnAppliedEntries().size());
 
         // Verify follower 2 only applies the second log entry (9) as the first one (8) was in the snapshot.
@@ -523,7 +529,7 @@ public class ReplicationAndSnapshotsWithLaggingFollowerIntegrationTest extends A
         persistedSnapshots = InMemorySnapshotStore.getSnapshots(leaderId, Snapshot.class);
         Assert.assertTrue("Expected at least 1 persisted snapshots", persistedSnapshots.size() > 0);
         Snapshot persistedSnapshot = persistedSnapshots.get(persistedSnapshots.size() - 1);
-        verifySnapshot("Persisted", persistedSnapshot, currentTerm, 9, currentTerm, 9, snapshot);
+        verifySnapshot("Persisted", persistedSnapshot, currentTerm, 9, currentTerm, 9);
         unAppliedEntry = persistedSnapshot.getUnAppliedEntries();
         assertEquals("Persisted Snapshot getUnAppliedEntries size", 0, unAppliedEntry.size());
 
@@ -535,16 +541,14 @@ public class ReplicationAndSnapshotsWithLaggingFollowerIntegrationTest extends A
     /**
      * Do another round of payloads and snapshot to verify replicatedToAllIndex gets back on track and
      * snapshots works as expected after doing a follower snapshot. In this step we don't lag a follower.
+     * @throws Exception
      */
-    private void testFinalReplicationsAndSnapshot() {
+    private void testFinalReplicationsAndSnapshot() throws Exception {
         List<ApplyState> applyStates;
         ApplyState applyState;
 
         testLog.info("testFinalReplicationsAndSnapshot starting: replicatedToAllIndex: {}", leader.getReplicatedToAllIndex());
 
-        byte[] snapshot = new byte[] {14};
-        leaderActor.underlyingActor().setSnapshot(snapshot);
-
         // Send another payload - a snapshot should occur.
         payload11 = sendPayloadData(leaderActor, "eleven");
 
@@ -557,7 +561,7 @@ public class ReplicationAndSnapshotsWithLaggingFollowerIntegrationTest extends A
         // Verify the leader's last persisted snapshot (previous ones may not be purged yet).
         List<Snapshot> persistedSnapshots = InMemorySnapshotStore.getSnapshots(leaderId, Snapshot.class);
         Snapshot persistedSnapshot = persistedSnapshots.get(persistedSnapshots.size() - 1);
-        verifySnapshot("Persisted", persistedSnapshot, currentTerm, 10, currentTerm, 11, snapshot);
+        verifySnapshot("Persisted", persistedSnapshot, currentTerm, 10, currentTerm, 11);
         List<ReplicatedLogEntry> unAppliedEntry = persistedSnapshot.getUnAppliedEntries();
         assertEquals("Persisted Snapshot getUnAppliedEntries size", 1, unAppliedEntry.size());
         verifyReplicatedLogEntry(unAppliedEntry.get(0), currentTerm, 11, payload11);
index 5a0d5ae..8ab762f 100644 (file)
@@ -69,6 +69,7 @@ public class SnapshotManagerTest extends AbstractActorTest {
         doReturn(10L).when(mockConfigParams).getSnapshotBatchCount();
         doReturn(mockReplicatedLog).when(mockRaftActorContext).getReplicatedLog();
         doReturn("123").when(mockRaftActorContext).getId();
+        doReturn(mockDataPersistenceProvider).when(mockRaftActorContext).getPersistenceProvider();
         doReturn("123").when(mockRaftActorBehavior).getLeaderId();
 
         ElectionTerm mockElectionTerm = mock(ElectionTerm.class);
@@ -384,6 +385,8 @@ public class SnapshotManagerTest extends AbstractActorTest {
 
     @Test
     public void testCommit(){
+        doReturn(50L).when(mockDataPersistenceProvider).getLastSequenceNumber();
+
         // when replicatedToAllIndex = -1
         snapshotManager.captureToInstall(new MockRaftActorContext.MockReplicatedLogEntry(6, 9,
                 new MockRaftActorContext.MockPayload()), -1, "follower-1");
@@ -397,7 +400,7 @@ public class SnapshotManagerTest extends AbstractActorTest {
 
         verify(mockReplicatedLog).snapshotCommit();
 
-        verify(mockDataPersistenceProvider).deleteMessages(100L);
+        verify(mockDataPersistenceProvider).deleteMessages(50L);
 
         ArgumentCaptor<SnapshotSelectionCriteria> criteriaCaptor = ArgumentCaptor.forClass(SnapshotSelectionCriteria.class);
 
@@ -438,6 +441,8 @@ public class SnapshotManagerTest extends AbstractActorTest {
 
     @Test
     public void testCallingCommitMultipleTimesCausesNoHarm(){
+        doReturn(50L).when(mockDataPersistenceProvider).getLastSequenceNumber();
+
         // when replicatedToAllIndex = -1
         snapshotManager.captureToInstall(new MockRaftActorContext.MockReplicatedLogEntry(6, 9,
                 new MockRaftActorContext.MockPayload()), -1, "follower-1");
@@ -453,7 +458,7 @@ public class SnapshotManagerTest extends AbstractActorTest {
 
         verify(mockReplicatedLog, times(1)).snapshotCommit();
 
-        verify(mockDataPersistenceProvider, times(1)).deleteMessages(100L);
+        verify(mockDataPersistenceProvider, times(1)).deleteMessages(50L);
 
         verify(mockDataPersistenceProvider, times(1)).deleteSnapshots(any(SnapshotSelectionCriteria.class));
     }
index 0737d75..d482e28 100644 (file)
@@ -216,6 +216,7 @@ public class InMemoryJournal extends AsyncWriteJournal {
 
     @Override
     public Future<Void> doAsyncDeleteMessagesTo(String persistenceId, long toSequenceNr, boolean permanent) {
+        LOG.trace("doAsyncDeleteMessagesTo: {}", toSequenceNr);
         Map<Long, Object> journal = journals.get(persistenceId);
         if(journal != null) {
             synchronized (journal) {
index a0987cd..71799c9 100644 (file)
@@ -40,6 +40,7 @@ import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadWriteTransactio
 import org.opendaylight.yangtools.util.concurrent.MappingCheckedFuture;
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.DataValidationFailedException;
 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -219,7 +220,7 @@ public class TransactionProxy extends AbstractDOMStoreTransaction<TransactionIde
                 try {
                     proxyFuture.set(NormalizedNodeAggregator.aggregate(YangInstanceIdentifier.builder().build(),
                             future.get(), actorContext.getSchemaContext()));
-                } catch (InterruptedException | ExecutionException e) {
+                } catch (DataValidationFailedException | InterruptedException | ExecutionException e) {
                     proxyFuture.setException(e);
                 }
             }
index eb13078..a406b9a 100644 (file)
@@ -5,39 +5,30 @@
  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
  * and is available at http://www.eclipse.org/legal/epl-v10.html
  */
-
 package org.opendaylight.controller.cluster.datastore.utils;
 
 import com.google.common.base.Optional;
-import com.google.common.util.concurrent.CheckedFuture;
-import com.google.common.util.concurrent.MoreExecutors;
 import java.util.List;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ExecutorService;
-import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
-import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStore;
-import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadTransaction;
-import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
-import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction;
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidate;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModification;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.DataValidationFailedException;
+import org.opendaylight.yangtools.yang.data.impl.schema.tree.InMemoryDataTreeFactory;
 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
 
 public class NormalizedNodeAggregator {
-
-    private static final ExecutorService executorService = MoreExecutors.newDirectExecutorService();
-
     private final YangInstanceIdentifier rootIdentifier;
     private final List<Optional<NormalizedNode<?, ?>>> nodes;
-    private final InMemoryDOMDataStore dataStore;
-
-    NormalizedNodeAggregator(YangInstanceIdentifier rootIdentifier, List<Optional<NormalizedNode<?, ?>>> nodes,
-                             SchemaContext schemaContext){
+    private final DataTree dataTree;
 
+    private NormalizedNodeAggregator(YangInstanceIdentifier rootIdentifier, List<Optional<NormalizedNode<?, ?>>> nodes,
+                             SchemaContext schemaContext) {
         this.rootIdentifier = rootIdentifier;
         this.nodes = nodes;
-        this.dataStore = new InMemoryDOMDataStore("aggregator", executorService);
-        this.dataStore.onGlobalContextUpdated(schemaContext);
+        this.dataTree = InMemoryDataTreeFactory.getInstance().create();
+        this.dataTree.setSchemaContext(schemaContext);
     }
 
     /**
@@ -46,44 +37,35 @@ public class NormalizedNodeAggregator {
      * @param nodes
      * @param schemaContext
      * @return
-     * @throws ExecutionException
-     * @throws InterruptedException
+     * @throws DataValidationFailedException
      */
     public static Optional<NormalizedNode<?,?>> aggregate(YangInstanceIdentifier rootIdentifier,
                                                           List<Optional<NormalizedNode<?, ?>>> nodes,
-                                                          SchemaContext schemaContext)
-            throws ExecutionException, InterruptedException {
+                                                          SchemaContext schemaContext) throws DataValidationFailedException {
         return new NormalizedNodeAggregator(rootIdentifier, nodes, schemaContext).aggregate();
     }
 
-    private Optional<NormalizedNode<?,?>> aggregate() throws ExecutionException, InterruptedException {
+    private Optional<NormalizedNode<?,?>> aggregate() throws DataValidationFailedException {
         return combine().getRootNode();
     }
 
-    private NormalizedNodeAggregator combine() throws InterruptedException, ExecutionException {
-        DOMStoreWriteTransaction domStoreWriteTransaction = dataStore.newWriteOnlyTransaction();
+    private NormalizedNodeAggregator combine() throws DataValidationFailedException {
+        DataTreeModification mod = dataTree.takeSnapshot().newModification();
 
-        for(Optional<NormalizedNode<?,?>> node : nodes) {
-            if(node.isPresent()) {
-                domStoreWriteTransaction.merge(rootIdentifier, node.get());
+        for (Optional<NormalizedNode<?,?>> node : nodes) {
+            if (node.isPresent()) {
+                mod.merge(rootIdentifier, node.get());
             }
         }
-        DOMStoreThreePhaseCommitCohort ready = domStoreWriteTransaction.ready();
-        ready.canCommit().get();
-        ready.preCommit().get();
-        ready.commit().get();
+
+        dataTree.validate(mod);
+        final DataTreeCandidate candidate = dataTree.prepare(mod);
+        dataTree.commit(candidate);
 
         return this;
     }
 
-    private Optional<NormalizedNode<?, ?>> getRootNode() throws InterruptedException, ExecutionException {
-        DOMStoreReadTransaction readTransaction = dataStore.newReadOnlyTransaction();
-
-        CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException> read =
-                readTransaction.read(rootIdentifier);
-
-        return read.get();
+    private Optional<NormalizedNode<?, ?>> getRootNode() {
+        return dataTree.takeSnapshot().readNode(rootIdentifier);
     }
-
-
 }
index 40d3704..8c86310 100644 (file)
@@ -29,13 +29,14 @@ import org.opendaylight.yangtools.yang.common.QName;
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
 import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode;
 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.DataValidationFailedException;
 import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
 
 public class NormalizedNodeAggregatorTest {
 
     @Test
-    public void testAggregate() throws InterruptedException, ExecutionException, ReadFailedException {
+    public void testAggregate() throws InterruptedException, ExecutionException, ReadFailedException, DataValidationFailedException {
         SchemaContext schemaContext = SchemaContextHelper.full();
         NormalizedNode<?, ?> expectedNode1 = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
         NormalizedNode<?, ?> expectedNode2 = ImmutableNodes.containerNode(CarsModel.CARS_QNAME);