Change onNodeUpdated to first cleanup previous state
[netconf.git] / opendaylight / netconf / abstract-topology / src / main / java / org / opendaylight / netconf / topology / util / BaseTopologyManager.java
index 77417acc2ae18d0eef8cddc027ceddd87a321ccd..682555ef931145313f471e551c1ad6757249aa9c 100644 (file)
@@ -35,9 +35,11 @@ import com.google.common.util.concurrent.ListenableFuture;
 import com.google.common.util.concurrent.SettableFuture;
 import java.util.ArrayList;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Random;
+import java.util.Set;
 import java.util.concurrent.TimeUnit;
 import javax.annotation.Nonnull;
 import org.opendaylight.controller.md.sal.binding.api.DataBroker;
@@ -93,6 +95,7 @@ public final class BaseTopologyManager
     private final NodeWriter naSalNodeWriter;
     private final String topologyId;
     private final TopologyManagerCallback delegateTopologyHandler;
+    private final Set<NodeId> created = new HashSet<>();
 
     private final Map<Address, TopologyManager> peers = new HashMap<>();
     private TopologyManager masterPeer = null;
@@ -160,6 +163,11 @@ public final class BaseTopologyManager
     public ListenableFuture<Node> onNodeCreated(final NodeId nodeId, final Node node) {
         LOG.debug("TopologyManager({}) onNodeCreated received, nodeid: {} , isMaster: {}", id, nodeId.getValue(), isMaster);
 
+        if (created.contains(nodeId)) {
+            LOG.warn("Node{} already exists, triggering update..", nodeId);
+            return onNodeUpdated(nodeId, node);
+        }
+        created.add(nodeId);
         final ArrayList<ListenableFuture<Node>> futures = new ArrayList<>();
 
         if (isMaster) {
@@ -223,55 +231,37 @@ public final class BaseTopologyManager
     public ListenableFuture<Node> onNodeUpdated(final NodeId nodeId, final Node node) {
         LOG.debug("TopologyManager({}) onNodeUpdated received, nodeid: {}", id, nodeId.getValue());
 
-        final ArrayList<ListenableFuture<Node>> futures = new ArrayList<>();
-
         // Master needs to trigger onNodeUpdated on peers and combine results
         if (isMaster) {
-            futures.add(delegateTopologyHandler.onNodeUpdated(nodeId, node));
-            for (TopologyManager topologyManager : peers.values()) {
-                // convert binding into NormalizedNode for transfer
-                final Entry<YangInstanceIdentifier, NormalizedNode<?, ?>> normalizedNodeEntry = codecRegistry.toNormalizedNode(getNodeIid(topologyId), node);
-
-                // add a future into our futures that gets its completion status from the converted scala future
-                final SettableFuture<Node> settableFuture = SettableFuture.create();
-                futures.add(settableFuture);
-                final Future<NormalizedNodeMessage> scalaFuture = topologyManager.onRemoteNodeUpdated(new NormalizedNodeMessage(normalizedNodeEntry.getKey(), normalizedNodeEntry.getValue()));
-                scalaFuture.onComplete(new OnComplete<NormalizedNodeMessage>() {
-                    @Override
-                    public void onComplete(Throwable failure, NormalizedNodeMessage success) throws Throwable {
-                        if (failure != null) {
-                            settableFuture.setException(failure);
-                            return;
+            // first cleanup old node
+            final ListenableFuture<Void> deleteFuture = onNodeDeleted(nodeId);
+            final SettableFuture<Node> createFuture = SettableFuture.create();
+            final TopologyManager selfProxy = TypedActor.self();
+            final ActorContext context = TypedActor.context();
+            Futures.addCallback(deleteFuture, new FutureCallback<Void>() {
+                @Override
+                public void onSuccess(Void result) {
+                    LOG.warn("Delete part of update succesfull, triggering create");
+                    // trigger create on all nodes
+                    Futures.addCallback(selfProxy.onNodeCreated(nodeId, node), new FutureCallback<Node>() {
+                        @Override
+                        public void onSuccess(Node result) {
+                            createFuture.set(result);
                         }
-                        final Entry<InstanceIdentifier<?>, DataObject> fromNormalizedNode =
-                                codecRegistry.fromNormalizedNode(success.getIdentifier(), success.getNode());
-                        final Node value = (Node) fromNormalizedNode.getValue();
 
-                        settableFuture.set(value);
-                    }
-                }, TypedActor.context().dispatcher());
-            }
-
-            final ListenableFuture<Node> aggregatedFuture = aggregator.combineUpdateAttempts(futures);
-            Futures.addCallback(aggregatedFuture, new FutureCallback<Node>() {
-                @Override
-                public void onSuccess(final Node result) {
-                    // FIXME make this (writing state data for nodes) optional and customizable
-                    // this should be possible with providing your own NodeWriter implementation, maybe rename this interface?
-                    naSalNodeWriter.update(nodeId, result);
+                        @Override
+                        public void onFailure(Throwable t) {
+                            createFuture.setException(t);
+                        }
+                    }, context.dispatcher());
                 }
 
                 @Override
-                public void onFailure(final Throwable t) {
-                    // If the combined connection attempt failed, set the node to connection failed
-                    naSalNodeWriter.update(nodeId, delegateTopologyHandler.getFailedState(nodeId, node));
-                    // FIXME disconnect those which succeeded
-                    // just issue a delete on delegateTopologyHandler that gets handled on lower level
+                public void onFailure(Throwable t) {
+                    LOG.warn("Delete part of update failed, {}", t);
                 }
-            });
-
-            //combine peer futures
-            return aggregatedFuture;
+            }, context.dispatcher());
+            return createFuture;
         }
 
         // Trigger update on this slave
@@ -286,6 +276,7 @@ public final class BaseTopologyManager
     @Override
     public ListenableFuture<Void> onNodeDeleted(final NodeId nodeId) {
         final ArrayList<ListenableFuture<Void>> futures = new ArrayList<>();
+        created.remove(nodeId);
 
         // Master needs to trigger delete on peers and combine results
         if (isMaster) {