From a393412402efbe796658af79272204cda6a8c409 Mon Sep 17 00:00:00 2001 From: Tomas Cere Date: Tue, 15 Dec 2015 10:33:11 +0100 Subject: [PATCH] Use normal identify messages first Prevents crashing akka if bundles containing our messages weren't loaded on remote node yet. Add resyncs on remote node's coming up. Change-Id: I2c6076c1c09ad4a839f4e5e60e172805bdb45ad6 Signed-off-by: Tomas Cere --- .../topology/util/BaseTopologyManager.java | 74 ++++++++++++++++++- .../impl/NetconfNodeManagerCallback.java | 27 ++++--- .../impl/NetconfTopologyManagerCallback.java | 3 + .../pipeline/TopologyMountPointFacade.java | 7 ++ 4 files changed, 99 insertions(+), 12 deletions(-) diff --git a/opendaylight/netconf/abstract-topology/src/main/java/org/opendaylight/netconf/topology/util/BaseTopologyManager.java b/opendaylight/netconf/abstract-topology/src/main/java/org/opendaylight/netconf/topology/util/BaseTopologyManager.java index f3bceab49a..77417acc2a 100644 --- a/opendaylight/netconf/abstract-topology/src/main/java/org/opendaylight/netconf/topology/util/BaseTopologyManager.java +++ b/opendaylight/netconf/abstract-topology/src/main/java/org/opendaylight/netconf/topology/util/BaseTopologyManager.java @@ -8,9 +8,12 @@ package org.opendaylight.netconf.topology.util; +import akka.actor.ActorContext; +import akka.actor.ActorIdentity; import akka.actor.ActorRef; import akka.actor.ActorSystem; import akka.actor.Address; +import akka.actor.Identify; import akka.actor.TypedActor; import akka.actor.TypedActorExtension; import akka.actor.TypedProps; @@ -24,6 +27,8 @@ import akka.cluster.ClusterEvent.ReachableMember; import akka.cluster.ClusterEvent.UnreachableMember; import akka.cluster.Member; import akka.dispatch.OnComplete; +import com.google.common.base.Optional; +import com.google.common.util.concurrent.CheckedFuture; import com.google.common.util.concurrent.FutureCallback; import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; @@ -33,8 +38,12 @@ import java.util.HashMap; import java.util.Map; import java.util.Map.Entry; import java.util.Random; +import java.util.concurrent.TimeUnit; import javax.annotation.Nonnull; import org.opendaylight.controller.md.sal.binding.api.DataBroker; +import org.opendaylight.controller.md.sal.binding.api.ReadOnlyTransaction; +import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType; +import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException; import org.opendaylight.netconf.topology.RoleChangeStrategy; import org.opendaylight.netconf.topology.StateAggregator; import org.opendaylight.netconf.topology.TopologyManager; @@ -52,17 +61,22 @@ import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology. import org.opendaylight.yangtools.binding.data.codec.impl.BindingNormalizedNodeCodecRegistry; import org.opendaylight.yangtools.yang.binding.DataObject; import org.opendaylight.yangtools.yang.binding.InstanceIdentifier; +import org.opendaylight.yangtools.yang.binding.KeyedInstanceIdentifier; import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import scala.concurrent.Future; +import scala.concurrent.duration.FiniteDuration; import scala.concurrent.impl.Promise.DefaultPromise; public final class BaseTopologyManager implements TopologyManager { private static final Logger LOG = LoggerFactory.getLogger(BaseTopologyManager.class); + private static final InstanceIdentifier NETWORK_TOPOLOGY_PATH = InstanceIdentifier.builder(NetworkTopology.class).build(); + + private final KeyedInstanceIdentifier topologyListPath; private final ActorSystem system; private final TypedActorExtension typedExtension; @@ -121,6 +135,8 @@ public final class BaseTopologyManager // election has not yet happened this.isMaster = isMaster; + this.topologyListPath = NETWORK_TOPOLOGY_PATH.child(Topology.class, new TopologyKey(new TopologyId(topologyId))); + LOG.debug("Base manager started ", +id); } @@ -516,7 +532,8 @@ public final class BaseTopologyManager final String path = member.address() + PATH + topologyId; LOG.debug("Actor at :{} is resolving topology actor for path {}", clusterExtension.selfAddress(), path); - clusterExtension.system().actorSelection(path).tell(new CustomIdentifyMessage(clusterExtension.selfAddress()), TypedActor.context().self()); + // first send basic identify message in case our messages have not been loaded through osgi yet to prevent crashing akka. + clusterExtension.system().actorSelection(path).tell(new Identify(member.address()), TypedActor.context().self()); } else if (message instanceof MemberExited) { // remove peer final Member member = ((MemberExited) message).member(); @@ -544,20 +561,71 @@ public final class BaseTopologyManager final String path = member.address() + PATH + topologyId; LOG.debug("Actor at :{} is resolving topology actor for path {}", clusterExtension.selfAddress(), path); + clusterExtension.system().actorSelection(path).tell(new Identify(member.address()), TypedActor.context().self()); + } else if (message instanceof ActorIdentity) { + LOG.debug("Received ActorIdentity message", message); + final String path = ((ActorIdentity) message).correlationId() + PATH + topologyId; + if (((ActorIdentity) message).getRef() == null) { + LOG.debug("ActorIdentity has null actor ref, retrying..", message); + final ActorRef self = TypedActor.context().self(); + final ActorContext context = TypedActor.context(); + system.scheduler().scheduleOnce(new FiniteDuration(5, TimeUnit.SECONDS), new Runnable() { + @Override + public void run() { + LOG.debug("Retrying identify message from master to node {} , full path {}", ((ActorIdentity) message).correlationId(), path); + context.system().actorSelection(path).tell(new Identify(((ActorIdentity) message).correlationId()), self); + + } + }, system.dispatcher()); + return; + } + LOG.debug("Actor at :{} is resolving topology actor for path {}, with a custom message", clusterExtension.selfAddress(), path); + clusterExtension.system().actorSelection(path).tell(new CustomIdentifyMessage(clusterExtension.selfAddress()), TypedActor.context().self()); } else if (message instanceof CustomIdentifyMessageReply) { - LOG.debug("Received a custom identify reply message from: {}", ((CustomIdentifyMessageReply) message).getAddress()); + + LOG.warn("Received a custom identify reply message from: {}", ((CustomIdentifyMessageReply) message).getAddress()); if (!peers.containsKey(((CustomIdentifyMessage) message).getAddress())) { final TopologyManager peer = typedExtension.typedActorOf(new TypedProps<>(TopologyManager.class, BaseTopologyManager.class), actorRef); peers.put(((CustomIdentifyMessageReply) message).getAddress(), peer); + if (isMaster) { + resyncPeer(peer); + } } } else if (message instanceof CustomIdentifyMessage) { - LOG.debug("Received a custom identify message from: {}", ((CustomIdentifyMessage) message).getAddress()); + LOG.warn("Received a custom identify message from: {}", ((CustomIdentifyMessage) message).getAddress()); if (!peers.containsKey(((CustomIdentifyMessage) message).getAddress())) { final TopologyManager peer = typedExtension.typedActorOf(new TypedProps<>(TopologyManager.class, BaseTopologyManager.class), actorRef); peers.put(((CustomIdentifyMessage) message).getAddress(), peer); + if (isMaster) { + resyncPeer(peer); + } } actorRef.tell(new CustomIdentifyMessageReply(clusterExtension.selfAddress()), TypedActor.context().self()); } } + + private void resyncPeer(final TopologyManager peer) { + final ReadOnlyTransaction rTx = dataBroker.newReadOnlyTransaction(); + final CheckedFuture, ReadFailedException> read = rTx.read(LogicalDatastoreType.CONFIGURATION, topologyListPath); + + Futures.addCallback(read, new FutureCallback>() { + @Override + public void onSuccess(Optional result) { + if (result.isPresent()) { + for (final Node node : result.get().getNode()) { + final Entry> entry = codecRegistry.toNormalizedNode(getNodeIid(topologyId), node); + peer.onRemoteNodeCreated(new NormalizedNodeMessage(entry.getKey(), entry.getValue())); + // we dont care about the future from now on since we will be notified by the onConnected event + } + } + } + + @Override + public void onFailure(Throwable t) { + LOG.error("Unable to read from datastore"); + } + }); + + } } diff --git a/opendaylight/netconf/netconf-topology/src/main/java/org/opendaylight/netconf/topology/impl/NetconfNodeManagerCallback.java b/opendaylight/netconf/netconf-topology/src/main/java/org/opendaylight/netconf/topology/impl/NetconfNodeManagerCallback.java index cb74134708..4c5e04b211 100644 --- a/opendaylight/netconf/netconf-topology/src/main/java/org/opendaylight/netconf/topology/impl/NetconfNodeManagerCallback.java +++ b/opendaylight/netconf/netconf-topology/src/main/java/org/opendaylight/netconf/topology/impl/NetconfNodeManagerCallback.java @@ -14,7 +14,6 @@ import akka.actor.ActorSystem; import akka.actor.TypedActor; import akka.actor.TypedProps; import akka.cluster.Cluster; -import akka.cluster.Member; import akka.dispatch.OnComplete; import com.google.common.base.Function; import com.google.common.collect.FluentIterable; @@ -184,7 +183,7 @@ public class NetconfNodeManagerCallback implements NodeManagerCallback{ @Nullable final Node configNode) { final NetconfNode netconfNode = configNode == null ? currentOperationalNode.getAugmentation(NetconfNode.class) : configNode.getAugmentation(NetconfNode.class); - return new NodeBuilder() + final Node failedNode = new NodeBuilder() .setNodeId(nodeId) .addAugmentation(NetconfNode.class, new NetconfNodeBuilder() @@ -202,6 +201,12 @@ public class NetconfNodeManagerCallback implements NodeManagerCallback{ .build()) .build()) .build(); + + if (currentOperationalNode == null) { + currentOperationalNode = failedNode; + } + + return failedNode; } @Nonnull @Override public ListenableFuture onNodeCreated(@Nonnull final NodeId nodeId, @@ -264,7 +269,9 @@ public class NetconfNodeManagerCallback implements NodeManagerCallback{ @Nonnull final Node configNode) { // first disconnect this node topologyDispatcher.unregisterMountPoint(nodeId); - registration.close(); + if (registration != null) { + registration.close(); + } topologyDispatcher.disconnectNode(nodeId); // now reinit this connection with new settings @@ -273,7 +280,7 @@ public class NetconfNodeManagerCallback implements NodeManagerCallback{ Futures.addCallback(connectionFuture, new FutureCallback() { @Override public void onSuccess(@Nullable NetconfDeviceCapabilities result) { - registration = topologyDispatcher.registerConnectionStatusListener(nodeId, NetconfNodeManagerCallback.this); + registration = topologyDispatcher.registerConnectionStatusListener(nodeId, nodeManager); } @Override @@ -316,7 +323,9 @@ public class NetconfNodeManagerCallback implements NodeManagerCallback{ @Nonnull @Override public ListenableFuture onNodeDeleted(@Nonnull final NodeId nodeId) { // cleanup and disconnect topologyDispatcher.unregisterMountPoint(nodeId); - registration.close(); + if (registration != null) { + registration.close(); + } roleChangeStrategy.unregisterRoleCandidate(); return topologyDispatcher.disconnectNode(nodeId); } @@ -330,7 +339,7 @@ public class NetconfNodeManagerCallback implements NodeManagerCallback{ @Override public void onRoleChanged(final RoleChangeDTO roleChangeDTO) { - topologyDispatcher.unregisterMountPoint(currentOperationalNode.getNodeId()); + topologyDispatcher.unregisterMountPoint(new NodeId(nodeId)); isMaster = roleChangeDTO.isOwner(); if (isMaster) { @@ -398,9 +407,9 @@ public class NetconfNodeManagerCallback implements NodeManagerCallback{ connected = false; if (isMaster) { // announce that master mount point is going down - for (final Member member : clusterExtension.state().getMembers()) { - actorSystem.actorSelection(member.address() + "/user/" + topologyId + "/" + nodeId).tell(new AnnounceMasterMountPointDown(), null); - } +// for (final Member member : clusterExtension.state().getMembers()) { +// actorSystem.actorSelection(member.address() + "/user/" + topologyId + "/" + nodeId).tell(new AnnounceMasterMountPointDown(), null); +// } // set master to false since we are unregistering, the ownershipChanged callback can sometimes lag behind causing multiple nodes behaving as masters isMaster = false; // onRoleChanged() callback can sometimes lag behind, so unregister the mount right when it disconnects diff --git a/opendaylight/netconf/netconf-topology/src/main/java/org/opendaylight/netconf/topology/impl/NetconfTopologyManagerCallback.java b/opendaylight/netconf/netconf-topology/src/main/java/org/opendaylight/netconf/topology/impl/NetconfTopologyManagerCallback.java index 739aa66eb2..3a9c8db732 100644 --- a/opendaylight/netconf/netconf-topology/src/main/java/org/opendaylight/netconf/topology/impl/NetconfTopologyManagerCallback.java +++ b/opendaylight/netconf/netconf-topology/src/main/java/org/opendaylight/netconf/topology/impl/NetconfTopologyManagerCallback.java @@ -127,6 +127,9 @@ public class NetconfTopologyManagerCallback implements TopologyManagerCallback { @Nonnull @Override public ListenableFuture getCurrentStatusForNode(@Nonnull NodeId nodeId) { + if (!nodes.containsKey(nodeId)) { + nodes.put(nodeId, createNodeManager(nodeId)); + } return nodes.get(nodeId).getCurrentStatusForNode(nodeId); } diff --git a/opendaylight/netconf/netconf-topology/src/main/java/org/opendaylight/netconf/topology/pipeline/TopologyMountPointFacade.java b/opendaylight/netconf/netconf-topology/src/main/java/org/opendaylight/netconf/topology/pipeline/TopologyMountPointFacade.java index ec294b268f..33ebb89f55 100644 --- a/opendaylight/netconf/netconf-topology/src/main/java/org/opendaylight/netconf/topology/pipeline/TopologyMountPointFacade.java +++ b/opendaylight/netconf/netconf-topology/src/main/java/org/opendaylight/netconf/topology/pipeline/TopologyMountPointFacade.java @@ -28,6 +28,7 @@ import org.opendaylight.netconf.sal.connect.netconf.listener.NetconfSessionPrefe import org.opendaylight.netconf.sal.connect.netconf.sal.NetconfDeviceNotificationService; import org.opendaylight.netconf.sal.connect.util.RemoteDeviceId; import org.opendaylight.netconf.topology.util.messages.AnnounceMasterMountPoint; +import org.opendaylight.netconf.topology.util.messages.AnnounceMasterMountPointDown; import org.opendaylight.yangtools.yang.model.api.SchemaContext; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -151,6 +152,12 @@ public class TopologyMountPointFacade implements AutoCloseable, RemoteDeviceHand salProvider.getMountInstance().onTopologyDeviceDisconnected(); if (deviceDataBroker != null) { LOG.warn("Stopping master data broker for device {}", id.getName()); + for (final Member member : Cluster.get(actorSystem).state().getMembers()) { + if (member.address().equals(Cluster.get(actorSystem).selfAddress())) { + continue; + } + actorSystem.actorSelection(member.address() + "/user/" + topologyId + "/" + id.getName()).tell(new AnnounceMasterMountPointDown(), null); + } TypedActor.get(actorSystem).stop(deviceDataBroker); deviceDataBroker = null; } -- 2.36.6