Use normal identify messages first 30/31430/2
authorTomas Cere <tcere@cisco.com>
Tue, 15 Dec 2015 09:33:11 +0000 (10:33 +0100)
committerTomas Cere <tcere@cisco.com>
Wed, 16 Dec 2015 15:13:07 +0000 (16:13 +0100)
Prevents Akka from crashing if the bundles containing our messages
have not been loaded on the remote node yet.
Adds a resync of peers when a remote node comes up.

Change-Id: I2c6076c1c09ad4a839f4e5e60e172805bdb45ad6
Signed-off-by: Tomas Cere <tcere@cisco.com>
opendaylight/netconf/abstract-topology/src/main/java/org/opendaylight/netconf/topology/util/BaseTopologyManager.java
opendaylight/netconf/netconf-topology/src/main/java/org/opendaylight/netconf/topology/impl/NetconfNodeManagerCallback.java
opendaylight/netconf/netconf-topology/src/main/java/org/opendaylight/netconf/topology/impl/NetconfTopologyManagerCallback.java
opendaylight/netconf/netconf-topology/src/main/java/org/opendaylight/netconf/topology/pipeline/TopologyMountPointFacade.java

index f3bceab49aa3d0fcb5dd30a10c2f302788f0095d..77417acc2ae18d0eef8cddc027ceddd87a321ccd 100644 (file)
@@ -8,9 +8,12 @@
 
 package org.opendaylight.netconf.topology.util;
 
+import akka.actor.ActorContext;
+import akka.actor.ActorIdentity;
 import akka.actor.ActorRef;
 import akka.actor.ActorSystem;
 import akka.actor.Address;
+import akka.actor.Identify;
 import akka.actor.TypedActor;
 import akka.actor.TypedActorExtension;
 import akka.actor.TypedProps;
@@ -24,6 +27,8 @@ import akka.cluster.ClusterEvent.ReachableMember;
 import akka.cluster.ClusterEvent.UnreachableMember;
 import akka.cluster.Member;
 import akka.dispatch.OnComplete;
+import com.google.common.base.Optional;
+import com.google.common.util.concurrent.CheckedFuture;
 import com.google.common.util.concurrent.FutureCallback;
 import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.ListenableFuture;
@@ -33,8 +38,12 @@ import java.util.HashMap;
 import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Random;
+import java.util.concurrent.TimeUnit;
 import javax.annotation.Nonnull;
 import org.opendaylight.controller.md.sal.binding.api.DataBroker;
+import org.opendaylight.controller.md.sal.binding.api.ReadOnlyTransaction;
+import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
+import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
 import org.opendaylight.netconf.topology.RoleChangeStrategy;
 import org.opendaylight.netconf.topology.StateAggregator;
 import org.opendaylight.netconf.topology.TopologyManager;
@@ -52,17 +61,22 @@ import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.
 import org.opendaylight.yangtools.binding.data.codec.impl.BindingNormalizedNodeCodecRegistry;
 import org.opendaylight.yangtools.yang.binding.DataObject;
 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
+import org.opendaylight.yangtools.yang.binding.KeyedInstanceIdentifier;
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import scala.concurrent.Future;
+import scala.concurrent.duration.FiniteDuration;
 import scala.concurrent.impl.Promise.DefaultPromise;
 
 public final class BaseTopologyManager
         implements TopologyManager {
 
     private static final Logger LOG = LoggerFactory.getLogger(BaseTopologyManager.class);
+    private static final InstanceIdentifier<NetworkTopology> NETWORK_TOPOLOGY_PATH = InstanceIdentifier.builder(NetworkTopology.class).build();
+
+    private final KeyedInstanceIdentifier<Topology, TopologyKey> topologyListPath;
 
     private final ActorSystem system;
     private final TypedActorExtension typedExtension;
@@ -121,6 +135,8 @@ public final class BaseTopologyManager
         // election has not yet happened
         this.isMaster = isMaster;
 
+        this.topologyListPath = NETWORK_TOPOLOGY_PATH.child(Topology.class, new TopologyKey(new TopologyId(topologyId)));
+
         LOG.debug("Base manager started ", +id);
     }
 
@@ -516,7 +532,8 @@ public final class BaseTopologyManager
             final String path = member.address() + PATH + topologyId;
             LOG.debug("Actor at :{} is resolving topology actor for path {}", clusterExtension.selfAddress(), path);
 
-            clusterExtension.system().actorSelection(path).tell(new CustomIdentifyMessage(clusterExtension.selfAddress()), TypedActor.context().self());
+            // first send basic identify message in case our messages have not been loaded through osgi yet to prevent crashing akka.
+            clusterExtension.system().actorSelection(path).tell(new Identify(member.address()), TypedActor.context().self());
         } else if (message instanceof MemberExited) {
             // remove peer
             final Member member = ((MemberExited) message).member();
@@ -544,20 +561,71 @@ public final class BaseTopologyManager
             final String path = member.address() + PATH + topologyId;
             LOG.debug("Actor at :{} is resolving topology actor for path {}", clusterExtension.selfAddress(), path);
 
+            clusterExtension.system().actorSelection(path).tell(new Identify(member.address()), TypedActor.context().self());
+        } else if (message instanceof ActorIdentity) {
+            LOG.debug("Received ActorIdentity message", message);
+            final String path = ((ActorIdentity) message).correlationId() + PATH + topologyId;
+            if (((ActorIdentity) message).getRef() == null) {
+                LOG.debug("ActorIdentity has null actor ref, retrying..", message);
+                final ActorRef self = TypedActor.context().self();
+                final ActorContext context = TypedActor.context();
+                system.scheduler().scheduleOnce(new FiniteDuration(5, TimeUnit.SECONDS), new Runnable() {
+                    @Override
+                    public void run() {
+                        LOG.debug("Retrying identify message from master to node {} , full path {}", ((ActorIdentity) message).correlationId(), path);
+                        context.system().actorSelection(path).tell(new Identify(((ActorIdentity) message).correlationId()), self);
+
+                    }
+                }, system.dispatcher());
+                return;
+            }
+            LOG.debug("Actor at :{} is resolving topology actor for path {}, with a custom message", clusterExtension.selfAddress(), path);
+
             clusterExtension.system().actorSelection(path).tell(new CustomIdentifyMessage(clusterExtension.selfAddress()), TypedActor.context().self());
         } else if (message instanceof CustomIdentifyMessageReply) {
-            LOG.debug("Received a custom identify reply message from: {}", ((CustomIdentifyMessageReply) message).getAddress());
+
+            LOG.warn("Received a custom identify reply message from: {}", ((CustomIdentifyMessageReply) message).getAddress());
             if (!peers.containsKey(((CustomIdentifyMessage) message).getAddress())) {
                 final TopologyManager peer = typedExtension.typedActorOf(new TypedProps<>(TopologyManager.class, BaseTopologyManager.class), actorRef);
                 peers.put(((CustomIdentifyMessageReply) message).getAddress(), peer);
+                if (isMaster) {
+                    resyncPeer(peer);
+                }
             }
         } else if (message instanceof CustomIdentifyMessage) {
-            LOG.debug("Received a custom identify message from: {}", ((CustomIdentifyMessage) message).getAddress());
+            LOG.warn("Received a custom identify message from: {}", ((CustomIdentifyMessage) message).getAddress());
             if (!peers.containsKey(((CustomIdentifyMessage) message).getAddress())) {
                 final TopologyManager peer = typedExtension.typedActorOf(new TypedProps<>(TopologyManager.class, BaseTopologyManager.class), actorRef);
                 peers.put(((CustomIdentifyMessage) message).getAddress(), peer);
+                if (isMaster) {
+                    resyncPeer(peer);
+                }
             }
             actorRef.tell(new CustomIdentifyMessageReply(clusterExtension.selfAddress()), TypedActor.context().self());
         }
     }
+
+    private void resyncPeer(final TopologyManager peer) {
+        final ReadOnlyTransaction rTx = dataBroker.newReadOnlyTransaction();
+        final CheckedFuture<Optional<Topology>, ReadFailedException> read = rTx.read(LogicalDatastoreType.CONFIGURATION, topologyListPath);
+
+        Futures.addCallback(read, new FutureCallback<Optional<Topology>>() {
+            @Override
+            public void onSuccess(Optional<Topology> result) {
+                if (result.isPresent()) {
+                    for (final Node node : result.get().getNode()) {
+                        final Entry<YangInstanceIdentifier, NormalizedNode<?, ?>> entry = codecRegistry.toNormalizedNode(getNodeIid(topologyId), node);
+                        peer.onRemoteNodeCreated(new NormalizedNodeMessage(entry.getKey(), entry.getValue()));
+                        // we dont care about the future from now on since we will be notified by the onConnected event
+                    }
+                }
+            }
+
+            @Override
+            public void onFailure(Throwable t) {
+                LOG.error("Unable to read from datastore");
+            }
+        });
+
+    }
 }
index cb74134708409f97355df2041427621e700d61f0..4c5e04b211a33aaeb37a33d2a31ed44db12bfc5d 100644 (file)
@@ -14,7 +14,6 @@ import akka.actor.ActorSystem;
 import akka.actor.TypedActor;
 import akka.actor.TypedProps;
 import akka.cluster.Cluster;
-import akka.cluster.Member;
 import akka.dispatch.OnComplete;
 import com.google.common.base.Function;
 import com.google.common.collect.FluentIterable;
@@ -184,7 +183,7 @@ public class NetconfNodeManagerCallback implements NodeManagerCallback{
                                                   @Nullable final Node configNode) {
         final NetconfNode netconfNode = configNode == null ? currentOperationalNode.getAugmentation(NetconfNode.class) : configNode.getAugmentation(NetconfNode.class);
 
-        return new NodeBuilder()
+        final Node failedNode = new NodeBuilder()
                 .setNodeId(nodeId)
                 .addAugmentation(NetconfNode.class,
                         new NetconfNodeBuilder()
@@ -202,6 +201,12 @@ public class NetconfNodeManagerCallback implements NodeManagerCallback{
                                                 .build())
                                 .build())
                 .build();
+
+        if (currentOperationalNode == null) {
+            currentOperationalNode = failedNode;
+        }
+
+        return failedNode;
     }
 
     @Nonnull @Override public ListenableFuture<Node> onNodeCreated(@Nonnull final NodeId nodeId,
@@ -264,7 +269,9 @@ public class NetconfNodeManagerCallback implements NodeManagerCallback{
                                                 @Nonnull final Node configNode) {
         // first disconnect this node
         topologyDispatcher.unregisterMountPoint(nodeId);
-        registration.close();
+        if (registration != null) {
+            registration.close();
+        }
         topologyDispatcher.disconnectNode(nodeId);
 
         // now reinit this connection with new settings
@@ -273,7 +280,7 @@ public class NetconfNodeManagerCallback implements NodeManagerCallback{
         Futures.addCallback(connectionFuture, new FutureCallback<NetconfDeviceCapabilities>() {
             @Override
             public void onSuccess(@Nullable NetconfDeviceCapabilities result) {
-                registration = topologyDispatcher.registerConnectionStatusListener(nodeId, NetconfNodeManagerCallback.this);
+                registration = topologyDispatcher.registerConnectionStatusListener(nodeId, nodeManager);
             }
 
             @Override
@@ -316,7 +323,9 @@ public class NetconfNodeManagerCallback implements NodeManagerCallback{
     @Nonnull @Override public ListenableFuture<Void> onNodeDeleted(@Nonnull final NodeId nodeId) {
         // cleanup and disconnect
         topologyDispatcher.unregisterMountPoint(nodeId);
-        registration.close();
+        if (registration != null) {
+            registration.close();
+        }
         roleChangeStrategy.unregisterRoleCandidate();
         return topologyDispatcher.disconnectNode(nodeId);
     }
@@ -330,7 +339,7 @@ public class NetconfNodeManagerCallback implements NodeManagerCallback{
 
     @Override
     public void onRoleChanged(final RoleChangeDTO roleChangeDTO) {
-        topologyDispatcher.unregisterMountPoint(currentOperationalNode.getNodeId());
+        topologyDispatcher.unregisterMountPoint(new NodeId(nodeId));
 
         isMaster = roleChangeDTO.isOwner();
         if (isMaster) {
@@ -398,9 +407,9 @@ public class NetconfNodeManagerCallback implements NodeManagerCallback{
         connected = false;
         if (isMaster) {
             // announce that master mount point is going down
-            for (final Member member : clusterExtension.state().getMembers()) {
-                actorSystem.actorSelection(member.address() + "/user/" + topologyId + "/" + nodeId).tell(new AnnounceMasterMountPointDown(), null);
-            }
+//            for (final Member member : clusterExtension.state().getMembers()) {
+//                actorSystem.actorSelection(member.address() + "/user/" + topologyId + "/" + nodeId).tell(new AnnounceMasterMountPointDown(), null);
+//            }
             // set master to false since we are unregistering, the ownershipChanged callback can sometimes lag behind causing multiple nodes behaving as masters
             isMaster = false;
             // onRoleChanged() callback can sometimes lag behind, so unregister the mount right when it disconnects
index 739aa66eb2df6e452d9ce0291d04fcde614a9450..3a9c8db73201749ebcb01425e91ae6e250806579 100644 (file)
@@ -127,6 +127,9 @@ public class NetconfTopologyManagerCallback implements TopologyManagerCallback {
     @Nonnull
     @Override
     public ListenableFuture<Node> getCurrentStatusForNode(@Nonnull NodeId nodeId) {
+        if (!nodes.containsKey(nodeId)) {
+            nodes.put(nodeId, createNodeManager(nodeId));
+        }
         return nodes.get(nodeId).getCurrentStatusForNode(nodeId);
     }
 
index ec294b268f4528e94f5b9e495980a818a6a29165..33ebb89f559478e3dda9be4c01f40e5807f947f7 100644 (file)
@@ -28,6 +28,7 @@ import org.opendaylight.netconf.sal.connect.netconf.listener.NetconfSessionPrefe
 import org.opendaylight.netconf.sal.connect.netconf.sal.NetconfDeviceNotificationService;
 import org.opendaylight.netconf.sal.connect.util.RemoteDeviceId;
 import org.opendaylight.netconf.topology.util.messages.AnnounceMasterMountPoint;
+import org.opendaylight.netconf.topology.util.messages.AnnounceMasterMountPointDown;
 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -151,6 +152,12 @@ public class TopologyMountPointFacade implements AutoCloseable, RemoteDeviceHand
         salProvider.getMountInstance().onTopologyDeviceDisconnected();
         if (deviceDataBroker != null) {
             LOG.warn("Stopping master data broker for device {}", id.getName());
+            for (final Member member : Cluster.get(actorSystem).state().getMembers()) {
+                if (member.address().equals(Cluster.get(actorSystem).selfAddress())) {
+                    continue;
+                }
+                actorSystem.actorSelection(member.address() + "/user/" + topologyId + "/" + id.getName()).tell(new AnnounceMasterMountPointDown(), null);
+            }
             TypedActor.get(actorSystem).stop(deviceDataBroker);
             deviceDataBroker = null;
         }