Change onNodeUpdated to first cleanup previous state 32/31432/3
authorTomas Cere <tcere@cisco.com>
Wed, 16 Dec 2015 14:37:29 +0000 (15:37 +0100)
committerTomas Cere <tcere@cisco.com>
Mon, 21 Dec 2015 10:14:35 +0000 (11:14 +0100)
Change-Id: Ied3a631466753663601905d0b1d7f138e6fc6542
Signed-off-by: Tomas Cere <tcere@cisco.com>
opendaylight/netconf/abstract-topology/src/main/java/org/opendaylight/netconf/topology/util/BaseTopologyManager.java
opendaylight/netconf/netconf-topology/src/test/java/org/opendaylight/netconf/topology/ActorTest.java

index 77417acc2ae18d0eef8cddc027ceddd87a321ccd..682555ef931145313f471e551c1ad6757249aa9c 100644 (file)
@@ -35,9 +35,11 @@ import com.google.common.util.concurrent.ListenableFuture;
 import com.google.common.util.concurrent.SettableFuture;
 import java.util.ArrayList;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Random;
+import java.util.Set;
 import java.util.concurrent.TimeUnit;
 import javax.annotation.Nonnull;
 import org.opendaylight.controller.md.sal.binding.api.DataBroker;
@@ -93,6 +95,7 @@ public final class BaseTopologyManager
     private final NodeWriter naSalNodeWriter;
     private final String topologyId;
     private final TopologyManagerCallback delegateTopologyHandler;
+    private final Set<NodeId> created = new HashSet<>();
 
     private final Map<Address, TopologyManager> peers = new HashMap<>();
     private TopologyManager masterPeer = null;
@@ -160,6 +163,11 @@ public final class BaseTopologyManager
     public ListenableFuture<Node> onNodeCreated(final NodeId nodeId, final Node node) {
         LOG.debug("TopologyManager({}) onNodeCreated received, nodeid: {} , isMaster: {}", id, nodeId.getValue(), isMaster);
 
+        if (created.contains(nodeId)) {
+            LOG.warn("Node{} already exists, triggering update..", nodeId);
+            return onNodeUpdated(nodeId, node);
+        }
+        created.add(nodeId);
         final ArrayList<ListenableFuture<Node>> futures = new ArrayList<>();
 
         if (isMaster) {
@@ -223,55 +231,37 @@ public final class BaseTopologyManager
     public ListenableFuture<Node> onNodeUpdated(final NodeId nodeId, final Node node) {
         LOG.debug("TopologyManager({}) onNodeUpdated received, nodeid: {}", id, nodeId.getValue());
 
-        final ArrayList<ListenableFuture<Node>> futures = new ArrayList<>();
-
         // Master needs to trigger onNodeUpdated on peers and combine results
         if (isMaster) {
-            futures.add(delegateTopologyHandler.onNodeUpdated(nodeId, node));
-            for (TopologyManager topologyManager : peers.values()) {
-                // convert binding into NormalizedNode for transfer
-                final Entry<YangInstanceIdentifier, NormalizedNode<?, ?>> normalizedNodeEntry = codecRegistry.toNormalizedNode(getNodeIid(topologyId), node);
-
-                // add a future into our futures that gets its completion status from the converted scala future
-                final SettableFuture<Node> settableFuture = SettableFuture.create();
-                futures.add(settableFuture);
-                final Future<NormalizedNodeMessage> scalaFuture = topologyManager.onRemoteNodeUpdated(new NormalizedNodeMessage(normalizedNodeEntry.getKey(), normalizedNodeEntry.getValue()));
-                scalaFuture.onComplete(new OnComplete<NormalizedNodeMessage>() {
-                    @Override
-                    public void onComplete(Throwable failure, NormalizedNodeMessage success) throws Throwable {
-                        if (failure != null) {
-                            settableFuture.setException(failure);
-                            return;
+            // first cleanup old node
+            final ListenableFuture<Void> deleteFuture = onNodeDeleted(nodeId);
+            final SettableFuture<Node> createFuture = SettableFuture.create();
+            final TopologyManager selfProxy = TypedActor.self();
+            final ActorContext context = TypedActor.context();
+            Futures.addCallback(deleteFuture, new FutureCallback<Void>() {
+                @Override
+                public void onSuccess(Void result) {
+                    LOG.warn("Delete part of update succesfull, triggering create");
+                    // trigger create on all nodes
+                    Futures.addCallback(selfProxy.onNodeCreated(nodeId, node), new FutureCallback<Node>() {
+                        @Override
+                        public void onSuccess(Node result) {
+                            createFuture.set(result);
                         }
-                        final Entry<InstanceIdentifier<?>, DataObject> fromNormalizedNode =
-                                codecRegistry.fromNormalizedNode(success.getIdentifier(), success.getNode());
-                        final Node value = (Node) fromNormalizedNode.getValue();
 
-                        settableFuture.set(value);
-                    }
-                }, TypedActor.context().dispatcher());
-            }
-
-            final ListenableFuture<Node> aggregatedFuture = aggregator.combineUpdateAttempts(futures);
-            Futures.addCallback(aggregatedFuture, new FutureCallback<Node>() {
-                @Override
-                public void onSuccess(final Node result) {
-                    // FIXME make this (writing state data for nodes) optional and customizable
-                    // this should be possible with providing your own NodeWriter implementation, maybe rename this interface?
-                    naSalNodeWriter.update(nodeId, result);
+                        @Override
+                        public void onFailure(Throwable t) {
+                            createFuture.setException(t);
+                        }
+                    }, context.dispatcher());
                 }
 
                 @Override
-                public void onFailure(final Throwable t) {
-                    // If the combined connection attempt failed, set the node to connection failed
-                    naSalNodeWriter.update(nodeId, delegateTopologyHandler.getFailedState(nodeId, node));
-                    // FIXME disconnect those which succeeded
-                    // just issue a delete on delegateTopologyHandler that gets handled on lower level
+                public void onFailure(Throwable t) {
+                    LOG.warn("Delete part of update failed, {}", t);
                 }
-            });
-
-            //combine peer futures
-            return aggregatedFuture;
+            }, context.dispatcher());
+            return createFuture;
         }
 
         // Trigger update on this slave
@@ -286,6 +276,7 @@ public final class BaseTopologyManager
     @Override
     public ListenableFuture<Void> onNodeDeleted(final NodeId nodeId) {
         final ArrayList<ListenableFuture<Void>> futures = new ArrayList<>();
+        created.remove(nodeId);
 
         // Master needs to trigger delete on peers and combine results
         if (isMaster) {
index 599acc92473dc7c1289f9f4aa1c60503cb1f3120..df1b464ee4ecb9731e34d1db38420ec62b22c91f 100644 (file)
@@ -18,9 +18,11 @@ import akka.actor.TypedActor;
 import akka.actor.TypedActorExtension;
 import akka.actor.TypedProps;
 import akka.japi.Creator;
+import com.google.common.base.Function;
 import com.google.common.base.Optional;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
+import com.google.common.util.concurrent.CheckedFuture;
 import com.google.common.util.concurrent.FutureCallback;
 import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.ListenableFuture;
@@ -35,6 +37,7 @@ import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
 import javassist.ClassPool;
 import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
 import org.junit.Before;
 import org.junit.Ignore;
 import org.junit.Test;
@@ -42,9 +45,11 @@ import org.mockito.Mock;
 import org.mockito.MockitoAnnotations;
 import org.opendaylight.controller.md.sal.binding.api.DataBroker;
 import org.opendaylight.controller.md.sal.binding.api.DataChangeListener;
+import org.opendaylight.controller.md.sal.binding.api.ReadOnlyTransaction;
 import org.opendaylight.controller.md.sal.common.api.clustering.EntityOwnershipService;
 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker.DataChangeScope;
 import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
+import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
 import org.opendaylight.controller.md.sal.dom.api.DOMNotification;
 import org.opendaylight.controller.md.sal.dom.api.DOMRpcService;
 import org.opendaylight.netconf.sal.connect.netconf.listener.NetconfSessionPreferences;
@@ -65,10 +70,14 @@ import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev15
 import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.NetconfNode;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.NetconfNodeBuilder;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.NetconfNodeConnectionStatus.ConnectionStatus;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.netconf.node.connection.status.AvailableCapabilitiesBuilder;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.netconf.node.connection.status.ClusteredConnectionStatusBuilder;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.netconf.node.connection.status.UnavailableCapabilitiesBuilder;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.netconf.node.connection.status.clustered.connection.status.NodeStatus.Status;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.netconf.node.connection.status.clustered.connection.status.NodeStatusBuilder;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.netconf.node.connection.status.unavailable.capabilities.UnavailableCapability;
 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.NodeId;
+import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.Topology;
 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.Node;
 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.NodeBuilder;
 import org.opendaylight.yangtools.binding.data.codec.gen.impl.StreamWriterGenerator;
@@ -93,6 +102,9 @@ public class ActorTest {
     @Mock
     private DataBroker dataBroker;
 
+    @Mock
+    private ReadOnlyTransaction mockedReadOnlyTx;
+
     private static final BindingNormalizedNodeCodecRegistry CODEC_REGISTRY;
 
     static {
@@ -126,11 +138,22 @@ public class ActorTest {
     @Before
     public void setup() {
         MockitoAnnotations.initMocks(this);
+        final SettableFuture<Optional<Topology>> settableFuture = SettableFuture.create();
+        final CheckedFuture<Optional<Topology>, ReadFailedException> checkedFuture = Futures.makeChecked(settableFuture, new Function<Exception, ReadFailedException>() {
+            @Nullable
+            @Override
+            public ReadFailedException apply(Exception input) {
+                return new ReadFailedException("Dummy future should never return this");
+            }
+        });
+        settableFuture.set(Optional.<Topology>absent());
+        when(mockedReadOnlyTx.read(any(LogicalDatastoreType.class), any(InstanceIdentifier.class))).thenReturn(checkedFuture);
         when(dataBroker.registerDataChangeListener(
                 any(LogicalDatastoreType.class),
                 any(InstanceIdentifier.class),
                 any(DataChangeListener.class),
                 any(DataChangeScope.class))).thenReturn(null);
+        when(dataBroker.newReadOnlyTransaction()).thenReturn(mockedReadOnlyTx);
     }
 
     private void setMaster(final TopologyManager manager) {
@@ -204,6 +227,8 @@ public class ActorTest {
                 }
             });
         }
+        LOG.debug("Waiting for updates to finish");
+        Futures.allAsList(futures).get();
 
 
         final List<ListenableFuture<Void>> deleteFutures = new ArrayList<>();
@@ -380,6 +405,8 @@ public class ActorTest {
                                     .setHost(netconfNode.getHost())
                                     .setPort(netconfNode.getPort())
                                     .setConnectionStatus(ConnectionStatus.Connecting)
+                                    .setAvailableCapabilities(new AvailableCapabilitiesBuilder().setAvailableCapability(new ArrayList<String>()).build())
+                                    .setUnavailableCapabilities(new UnavailableCapabilitiesBuilder().setUnavailableCapability(new ArrayList<UnavailableCapability>()).build())
                                     .setClusteredConnectionStatus(
                                             new ClusteredConnectionStatusBuilder()
                                                     .setNodeStatus(
@@ -404,6 +431,8 @@ public class ActorTest {
                                     .setHost(netconfNode.getHost())
                                     .setPort(netconfNode.getPort())
                                     .setConnectionStatus(ConnectionStatus.UnableToConnect)
+                                    .setAvailableCapabilities(new AvailableCapabilitiesBuilder().setAvailableCapability(new ArrayList<String>()).build())
+                                    .setUnavailableCapabilities(new UnavailableCapabilitiesBuilder().setUnavailableCapability(new ArrayList<UnavailableCapability>()).build())
                                     .setClusteredConnectionStatus(
                                             new ClusteredConnectionStatusBuilder()
                                                     .setNodeStatus(
@@ -429,6 +458,8 @@ public class ActorTest {
                                     .setConnectionStatus(ConnectionStatus.Connected)
                                     .setHost(augmentation.getHost())
                                     .setPort(augmentation.getPort())
+                                    .setAvailableCapabilities(new AvailableCapabilitiesBuilder().setAvailableCapability(new ArrayList<String>()).build())
+                                    .setUnavailableCapabilities(new UnavailableCapabilitiesBuilder().setUnavailableCapability(new ArrayList<UnavailableCapability>()).build())
                                     .setClusteredConnectionStatus(
                                             new ClusteredConnectionStatusBuilder()
                                                     .setNodeStatus(
@@ -454,6 +485,8 @@ public class ActorTest {
                                     .setConnectionStatus(ConnectionStatus.Connected)
                                     .setHost(augmentation.getHost())
                                     .setPort(augmentation.getPort())
+                                    .setAvailableCapabilities(new AvailableCapabilitiesBuilder().setAvailableCapability(new ArrayList<String>()).build())
+                                    .setUnavailableCapabilities(new UnavailableCapabilitiesBuilder().setUnavailableCapability(new ArrayList<UnavailableCapability>()).build())
                                     .setClusteredConnectionStatus(
                                             new ClusteredConnectionStatusBuilder()
                                                     .setNodeStatus(
@@ -532,6 +565,8 @@ public class ActorTest {
                                     .setConnectionStatus(ConnectionStatus.Connected)
                                     .setHost(new Host(new IpAddress(new Ipv4Address("127.0.0.1"))))
                                     .setPort(new PortNumber(2555))
+                                    .setAvailableCapabilities(new AvailableCapabilitiesBuilder().setAvailableCapability(new ArrayList<String>()).build())
+                                    .setUnavailableCapabilities(new UnavailableCapabilitiesBuilder().setUnavailableCapability(new ArrayList<UnavailableCapability>()).build())
                                     .build())
                     .build());
         }
@@ -547,6 +582,8 @@ public class ActorTest {
                                     .setConnectionStatus(ConnectionStatus.Connected)
                                     .setHost(new Host(new IpAddress(new Ipv4Address("127.0.0.1"))))
                                     .setPort(new PortNumber(65535))
+                                    .setAvailableCapabilities(new AvailableCapabilitiesBuilder().setAvailableCapability(new ArrayList<String>()).build())
+                                    .setUnavailableCapabilities(new UnavailableCapabilitiesBuilder().setUnavailableCapability(new ArrayList<UnavailableCapability>()).build())
                                     .build())
                     .build());
         }
@@ -583,6 +620,8 @@ public class ActorTest {
                                     .setConnectionStatus(ConnectionStatus.Connecting)
                                     .setHost(new Host(new IpAddress(new Ipv4Address("127.0.0.1"))))
                                     .setPort(new PortNumber(65535))
+                                    .setAvailableCapabilities(new AvailableCapabilitiesBuilder().setAvailableCapability(new ArrayList<String>()).build())
+                                    .setUnavailableCapabilities(new UnavailableCapabilitiesBuilder().setUnavailableCapability(new ArrayList<UnavailableCapability>()).build())
                                     .build())
                     .build();
         }
@@ -597,6 +636,8 @@ public class ActorTest {
                                     .setConnectionStatus(ConnectionStatus.UnableToConnect)
                                     .setHost(new Host(new IpAddress(new Ipv4Address("127.0.0.1"))))
                                     .setPort(new PortNumber(65535))
+                                    .setAvailableCapabilities(new AvailableCapabilitiesBuilder().setAvailableCapability(new ArrayList<String>()).build())
+                                    .setUnavailableCapabilities(new UnavailableCapabilitiesBuilder().setUnavailableCapability(new ArrayList<UnavailableCapability>()).build())
                                     .build())
                     .build();
         }