From c103c8f5f021db325586db9c123e6b30eed1385c Mon Sep 17 00:00:00 2001 From: Tomas Cere Date: Wed, 16 Dec 2015 15:37:29 +0100 Subject: [PATCH] Change onNodeUpdated to first cleanup previous state Change-Id: Ied3a631466753663601905d0b1d7f138e6fc6542 Signed-off-by: Tomas Cere --- .../topology/util/BaseTopologyManager.java | 73 ++++++++----------- .../netconf/topology/ActorTest.java | 41 +++++++++++ 2 files changed, 73 insertions(+), 41 deletions(-) diff --git a/opendaylight/netconf/abstract-topology/src/main/java/org/opendaylight/netconf/topology/util/BaseTopologyManager.java b/opendaylight/netconf/abstract-topology/src/main/java/org/opendaylight/netconf/topology/util/BaseTopologyManager.java index 77417acc2a..682555ef93 100644 --- a/opendaylight/netconf/abstract-topology/src/main/java/org/opendaylight/netconf/topology/util/BaseTopologyManager.java +++ b/opendaylight/netconf/abstract-topology/src/main/java/org/opendaylight/netconf/topology/util/BaseTopologyManager.java @@ -35,9 +35,11 @@ import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.SettableFuture; import java.util.ArrayList; import java.util.HashMap; +import java.util.HashSet; import java.util.Map; import java.util.Map.Entry; import java.util.Random; +import java.util.Set; import java.util.concurrent.TimeUnit; import javax.annotation.Nonnull; import org.opendaylight.controller.md.sal.binding.api.DataBroker; @@ -93,6 +95,7 @@ public final class BaseTopologyManager private final NodeWriter naSalNodeWriter; private final String topologyId; private final TopologyManagerCallback delegateTopologyHandler; + private final Set created = new HashSet<>(); private final Map peers = new HashMap<>(); private TopologyManager masterPeer = null; @@ -160,6 +163,11 @@ public final class BaseTopologyManager public ListenableFuture onNodeCreated(final NodeId nodeId, final Node node) { LOG.debug("TopologyManager({}) onNodeCreated received, nodeid: {} , isMaster: {}", id, nodeId.getValue(), isMaster); + if (created.contains(nodeId)) { + LOG.warn("Node{} already exists, triggering update..", nodeId); + return onNodeUpdated(nodeId, node); + } + created.add(nodeId); final ArrayList> futures = new ArrayList<>(); if (isMaster) { @@ -223,55 +231,37 @@ public final class BaseTopologyManager public ListenableFuture onNodeUpdated(final NodeId nodeId, final Node node) { LOG.debug("TopologyManager({}) onNodeUpdated received, nodeid: {}", id, nodeId.getValue()); - final ArrayList> futures = new ArrayList<>(); - // Master needs to trigger onNodeUpdated on peers and combine results if (isMaster) { - futures.add(delegateTopologyHandler.onNodeUpdated(nodeId, node)); - for (TopologyManager topologyManager : peers.values()) { - // convert binding into NormalizedNode for transfer - final Entry> normalizedNodeEntry = codecRegistry.toNormalizedNode(getNodeIid(topologyId), node); - - // add a future into our futures that gets its completion status from the converted scala future - final SettableFuture settableFuture = SettableFuture.create(); - futures.add(settableFuture); - final Future scalaFuture = topologyManager.onRemoteNodeUpdated(new NormalizedNodeMessage(normalizedNodeEntry.getKey(), normalizedNodeEntry.getValue())); - scalaFuture.onComplete(new OnComplete() { - @Override - public void onComplete(Throwable failure, NormalizedNodeMessage success) throws Throwable { - if (failure != null) { - settableFuture.setException(failure); - return; + // first cleanup old node + final ListenableFuture deleteFuture = onNodeDeleted(nodeId); + final SettableFuture createFuture = SettableFuture.create(); + final TopologyManager selfProxy = TypedActor.self(); + final ActorContext context = TypedActor.context(); + Futures.addCallback(deleteFuture, new FutureCallback() { + @Override + public void onSuccess(Void result) { + LOG.warn("Delete part of update succesfull, triggering create"); + // trigger create on all nodes + Futures.addCallback(selfProxy.onNodeCreated(nodeId, node), new FutureCallback() { + @Override + public void onSuccess(Node result) { + createFuture.set(result); } - final Entry, DataObject> fromNormalizedNode = - codecRegistry.fromNormalizedNode(success.getIdentifier(), success.getNode()); - final Node value = (Node) fromNormalizedNode.getValue(); - settableFuture.set(value); - } - }, TypedActor.context().dispatcher()); - } - - final ListenableFuture aggregatedFuture = aggregator.combineUpdateAttempts(futures); - Futures.addCallback(aggregatedFuture, new FutureCallback() { - @Override - public void onSuccess(final Node result) { - // FIXME make this (writing state data for nodes) optional and customizable - // this should be possible with providing your own NodeWriter implementation, maybe rename this interface? - naSalNodeWriter.update(nodeId, result); + @Override + public void onFailure(Throwable t) { + createFuture.setException(t); + } + }, context.dispatcher()); } @Override - public void onFailure(final Throwable t) { - // If the combined connection attempt failed, set the node to connection failed - naSalNodeWriter.update(nodeId, delegateTopologyHandler.getFailedState(nodeId, node)); - // FIXME disconnect those which succeeded - // just issue a delete on delegateTopologyHandler that gets handled on lower level + public void onFailure(Throwable t) { + LOG.warn("Delete part of update failed, {}", t); } - }); - - //combine peer futures - return aggregatedFuture; + }, context.dispatcher()); + return createFuture; } // Trigger update on this slave @@ -286,6 +276,7 @@ public final class BaseTopologyManager @Override public ListenableFuture onNodeDeleted(final NodeId nodeId) { final ArrayList> futures = new ArrayList<>(); + created.remove(nodeId); // Master needs to trigger delete on peers and combine results if (isMaster) { diff --git a/opendaylight/netconf/netconf-topology/src/test/java/org/opendaylight/netconf/topology/ActorTest.java b/opendaylight/netconf/netconf-topology/src/test/java/org/opendaylight/netconf/topology/ActorTest.java index 599acc9247..df1b464ee4 100644 --- a/opendaylight/netconf/netconf-topology/src/test/java/org/opendaylight/netconf/topology/ActorTest.java +++ b/opendaylight/netconf/netconf-topology/src/test/java/org/opendaylight/netconf/topology/ActorTest.java @@ -18,9 +18,11 @@ import akka.actor.TypedActor; import akka.actor.TypedActorExtension; import akka.actor.TypedProps; import akka.japi.Creator; +import com.google.common.base.Function; import com.google.common.base.Optional; import com.google.common.base.Preconditions; import com.google.common.collect.Lists; +import com.google.common.util.concurrent.CheckedFuture; import com.google.common.util.concurrent.FutureCallback; import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; @@ -35,6 +37,7 @@ import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; import javassist.ClassPool; import javax.annotation.Nonnull; +import javax.annotation.Nullable; import org.junit.Before; import org.junit.Ignore; import org.junit.Test; @@ -42,9 +45,11 @@ import org.mockito.Mock; import org.mockito.MockitoAnnotations; import org.opendaylight.controller.md.sal.binding.api.DataBroker; import org.opendaylight.controller.md.sal.binding.api.DataChangeListener; +import org.opendaylight.controller.md.sal.binding.api.ReadOnlyTransaction; import org.opendaylight.controller.md.sal.common.api.clustering.EntityOwnershipService; import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker.DataChangeScope; import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType; +import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException; import org.opendaylight.controller.md.sal.dom.api.DOMNotification; import org.opendaylight.controller.md.sal.dom.api.DOMRpcService; import org.opendaylight.netconf.sal.connect.netconf.listener.NetconfSessionPreferences; @@ -65,10 +70,14 @@ import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev15 import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.NetconfNode; import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.NetconfNodeBuilder; import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.NetconfNodeConnectionStatus.ConnectionStatus; +import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.netconf.node.connection.status.AvailableCapabilitiesBuilder; import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.netconf.node.connection.status.ClusteredConnectionStatusBuilder; +import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.netconf.node.connection.status.UnavailableCapabilitiesBuilder; import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.netconf.node.connection.status.clustered.connection.status.NodeStatus.Status; import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.netconf.node.connection.status.clustered.connection.status.NodeStatusBuilder; +import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.netconf.node.connection.status.unavailable.capabilities.UnavailableCapability; import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.NodeId; +import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.Topology; import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.Node; import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.NodeBuilder; import org.opendaylight.yangtools.binding.data.codec.gen.impl.StreamWriterGenerator; @@ -93,6 +102,9 @@ public class ActorTest { @Mock private DataBroker dataBroker; + @Mock + private ReadOnlyTransaction mockedReadOnlyTx; + private static final BindingNormalizedNodeCodecRegistry CODEC_REGISTRY; static { @@ -126,11 +138,22 @@ public class ActorTest { @Before public void setup() { MockitoAnnotations.initMocks(this); + final SettableFuture> settableFuture = SettableFuture.create(); + final CheckedFuture, ReadFailedException> checkedFuture = Futures.makeChecked(settableFuture, new Function() { + @Nullable + @Override + public ReadFailedException apply(Exception input) { + return new ReadFailedException("Dummy future should never return this"); + } + }); + settableFuture.set(Optional.absent()); + when(mockedReadOnlyTx.read(any(LogicalDatastoreType.class), any(InstanceIdentifier.class))).thenReturn(checkedFuture); when(dataBroker.registerDataChangeListener( any(LogicalDatastoreType.class), any(InstanceIdentifier.class), any(DataChangeListener.class), any(DataChangeScope.class))).thenReturn(null); + when(dataBroker.newReadOnlyTransaction()).thenReturn(mockedReadOnlyTx); } private void setMaster(final TopologyManager manager) { @@ -204,6 +227,8 @@ public class ActorTest { } }); } + LOG.debug("Waiting for updates to finish"); + Futures.allAsList(futures).get(); final List> deleteFutures = new ArrayList<>(); @@ -380,6 +405,8 @@ public class ActorTest { .setHost(netconfNode.getHost()) .setPort(netconfNode.getPort()) .setConnectionStatus(ConnectionStatus.Connecting) + .setAvailableCapabilities(new AvailableCapabilitiesBuilder().setAvailableCapability(new ArrayList()).build()) + .setUnavailableCapabilities(new UnavailableCapabilitiesBuilder().setUnavailableCapability(new ArrayList()).build()) .setClusteredConnectionStatus( new ClusteredConnectionStatusBuilder() .setNodeStatus( @@ -404,6 +431,8 @@ public class ActorTest { .setHost(netconfNode.getHost()) .setPort(netconfNode.getPort()) .setConnectionStatus(ConnectionStatus.UnableToConnect) + .setAvailableCapabilities(new AvailableCapabilitiesBuilder().setAvailableCapability(new ArrayList()).build()) + .setUnavailableCapabilities(new UnavailableCapabilitiesBuilder().setUnavailableCapability(new ArrayList()).build()) .setClusteredConnectionStatus( new ClusteredConnectionStatusBuilder() .setNodeStatus( @@ -429,6 +458,8 @@ public class ActorTest { .setConnectionStatus(ConnectionStatus.Connected) .setHost(augmentation.getHost()) .setPort(augmentation.getPort()) + .setAvailableCapabilities(new AvailableCapabilitiesBuilder().setAvailableCapability(new ArrayList()).build()) + .setUnavailableCapabilities(new UnavailableCapabilitiesBuilder().setUnavailableCapability(new ArrayList()).build()) .setClusteredConnectionStatus( new ClusteredConnectionStatusBuilder() .setNodeStatus( @@ -454,6 +485,8 @@ public class ActorTest { .setConnectionStatus(ConnectionStatus.Connected) .setHost(augmentation.getHost()) .setPort(augmentation.getPort()) + .setAvailableCapabilities(new AvailableCapabilitiesBuilder().setAvailableCapability(new ArrayList()).build()) + .setUnavailableCapabilities(new UnavailableCapabilitiesBuilder().setUnavailableCapability(new ArrayList()).build()) .setClusteredConnectionStatus( new ClusteredConnectionStatusBuilder() .setNodeStatus( @@ -532,6 +565,8 @@ public class ActorTest { .setConnectionStatus(ConnectionStatus.Connected) .setHost(new Host(new IpAddress(new Ipv4Address("127.0.0.1")))) .setPort(new PortNumber(2555)) + .setAvailableCapabilities(new AvailableCapabilitiesBuilder().setAvailableCapability(new ArrayList()).build()) + .setUnavailableCapabilities(new UnavailableCapabilitiesBuilder().setUnavailableCapability(new ArrayList()).build()) .build()) .build()); } @@ -547,6 +582,8 @@ public class ActorTest { .setConnectionStatus(ConnectionStatus.Connected) .setHost(new Host(new IpAddress(new Ipv4Address("127.0.0.1")))) .setPort(new PortNumber(65535)) + .setAvailableCapabilities(new AvailableCapabilitiesBuilder().setAvailableCapability(new ArrayList()).build()) + .setUnavailableCapabilities(new UnavailableCapabilitiesBuilder().setUnavailableCapability(new ArrayList()).build()) .build()) .build()); } @@ -583,6 +620,8 @@ public class ActorTest { .setConnectionStatus(ConnectionStatus.Connecting) .setHost(new Host(new IpAddress(new Ipv4Address("127.0.0.1")))) .setPort(new PortNumber(65535)) + .setAvailableCapabilities(new AvailableCapabilitiesBuilder().setAvailableCapability(new ArrayList()).build()) + .setUnavailableCapabilities(new UnavailableCapabilitiesBuilder().setUnavailableCapability(new ArrayList()).build()) .build()) .build(); } @@ -597,6 +636,8 @@ public class ActorTest { .setConnectionStatus(ConnectionStatus.UnableToConnect) .setHost(new Host(new IpAddress(new Ipv4Address("127.0.0.1")))) .setPort(new PortNumber(65535)) + .setAvailableCapabilities(new AvailableCapabilitiesBuilder().setAvailableCapability(new ArrayList()).build()) + .setUnavailableCapabilities(new UnavailableCapabilitiesBuilder().setUnavailableCapability(new ArrayList()).build()) .build()) .build(); } -- 2.36.6