X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=netconf%2Fnetconf-topology-singleton%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fnetconf%2Ftopology%2Fsingleton%2Fimpl%2FNetconfNodeManager.java;h=1fdce0c0b85a2bece422b51303757c8e74f87b43;hb=4685af5ad0d723e1aaaacba29b577d37a59a1043;hp=e78e9ee9c7ba55984e2b9fbf962861c8a2418aab;hpb=ddfdd7b56162e9c45639388b10407d45f5f0ba13;p=netconf.git diff --git a/netconf/netconf-topology-singleton/src/main/java/org/opendaylight/netconf/topology/singleton/impl/NetconfNodeManager.java b/netconf/netconf-topology-singleton/src/main/java/org/opendaylight/netconf/topology/singleton/impl/NetconfNodeManager.java index e78e9ee9c7..1fdce0c0b8 100644 --- a/netconf/netconf-topology-singleton/src/main/java/org/opendaylight/netconf/topology/singleton/impl/NetconfNodeManager.java +++ b/netconf/netconf-topology-singleton/src/main/java/org/opendaylight/netconf/topology/singleton/impl/NetconfNodeManager.java @@ -9,10 +9,15 @@ package org.opendaylight.netconf.topology.singleton.impl; import akka.actor.ActorRef; +import akka.actor.ActorSelection; import akka.actor.PoisonPill; +import akka.dispatch.OnComplete; +import akka.pattern.AskTimeoutException; +import akka.pattern.Patterns; import akka.util.Timeout; import java.util.Collection; import javax.annotation.Nonnull; +import javax.annotation.concurrent.GuardedBy; import org.opendaylight.controller.md.sal.binding.api.ClusteredDataTreeChangeListener; import org.opendaylight.controller.md.sal.binding.api.DataObjectModification; import org.opendaylight.controller.md.sal.binding.api.DataTreeIdentifier; @@ -33,10 +38,12 @@ import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology. import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.Node; import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.NodeKey; import org.opendaylight.yangtools.concepts.ListenerRegistration; +import org.opendaylight.yangtools.yang.binding.InstanceIdentifier; import org.opendaylight.yangtools.yang.model.repo.api.SchemaRepository; import org.opendaylight.yangtools.yang.model.repo.spi.SchemaSourceRegistry; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import scala.concurrent.Future; /** * Managing and reacting on data tree changes in specific netconf node when master writes status to the operational @@ -52,11 +59,18 @@ class NetconfNodeManager private final SchemaSourceRegistry schemaRegistry; private final SchemaRepository schemaRepository; - private NetconfTopologySetup setup; - private ListenerRegistration dataChangeListenerRegistration; - private RemoteDeviceId id; + private volatile NetconfTopologySetup setup; + private volatile ListenerRegistration dataChangeListenerRegistration; + private volatile RemoteDeviceId id; + + @GuardedBy("this") private ActorRef slaveActorRef; + @GuardedBy("this") + private boolean closed; + + @GuardedBy("this") + private int lastUpdateCount; NetconfNodeManager(final NetconfTopologySetup setup, final RemoteDeviceId id, final Timeout actorResponseWaitTime, @@ -76,21 +90,22 @@ class NetconfNodeManager final NodeId nodeId = NetconfTopologyUtils.getNodeId(rootNode.getIdentifier()); switch (rootNode.getModificationType()) { case SUBTREE_MODIFIED: - LOG.debug("{}: Operational for node {} updated. Trying to register slave mount point", id, nodeId); + LOG.debug("{}: Operational state for node {} - subtree modified from {} to {}", + id, nodeId, rootNode.getDataBefore(), rootNode.getDataAfter()); handleSlaveMountPoint(rootNode); break; case WRITE: if (rootNode.getDataBefore() != null) { - LOG.debug("{}: Operational for node {} rewrited. Trying to register slave mount point", - id, nodeId); + LOG.debug("{}: Operational state for node {} updated from {} to {}", + id, nodeId, rootNode.getDataBefore(), rootNode.getDataAfter()); } else { - LOG.debug("{}: Operational for node {} created. Trying to register slave mount point", - id, nodeId); + LOG.debug("{}: Operational state for node {} created: {}", + id, nodeId, rootNode.getDataAfter()); } handleSlaveMountPoint(rootNode); break; case DELETE: - LOG.debug("{}: Operational for node {} deleted.", id, nodeId); + LOG.debug("{}: Operational state for node {} deleted.", id, nodeId); unregisterSlaveMountpoint(); break; default: @@ -100,8 +115,12 @@ class NetconfNodeManager } @Override - public void close() { - unregisterSlaveMountpoint(); + public synchronized void close() { + if (closed) { + return; + } + + closed = true; closeActor(); if (dataChangeListenerRegistration != null) { dataChangeListenerRegistration.close(); @@ -109,6 +128,7 @@ class NetconfNodeManager } } + @GuardedBy("this") private void closeActor() { if (slaveActorRef != null) { LOG.debug("{}: Sending poison pill to {}", id, slaveActorRef); @@ -117,7 +137,8 @@ class NetconfNodeManager } } - private void unregisterSlaveMountpoint() { + private synchronized void unregisterSlaveMountpoint() { + lastUpdateCount++; if (slaveActorRef != null) { LOG.debug("{}: Sending message to unregister slave mountpoint to {}", id, slaveActorRef); slaveActorRef.tell(new UnregisterSlaveMountPoint(), ActorRef.noSender()); @@ -125,28 +146,71 @@ class NetconfNodeManager } void registerDataTreeChangeListener(final String topologyId, final NodeKey key) { - LOG.debug("{}: Registering data tree change listener on node {}", id, key); + final InstanceIdentifier path = NetconfTopologyUtils.createTopologyNodeListPath(key, topologyId); + LOG.debug("{}: Registering data tree change listener on path {}", id, path); dataChangeListenerRegistration = setup.getDataBroker().registerDataTreeChangeListener( - new DataTreeIdentifier<>(LogicalDatastoreType.OPERATIONAL, - NetconfTopologyUtils.createTopologyNodeListPath(key, topologyId)), this); + new DataTreeIdentifier<>(LogicalDatastoreType.OPERATIONAL, path), this); } - private void handleSlaveMountPoint(final DataObjectModification rootNode) { + private synchronized void handleSlaveMountPoint(final DataObjectModification rootNode) { + if (closed) { + return; + } + @SuppressWarnings("ConstantConditions") - final NetconfNode netconfNodeAfter = rootNode.getDataAfter().getAugmentation(NetconfNode.class); + final NetconfNode netconfNodeAfter = rootNode.getDataAfter().augmentation(NetconfNode.class); if (NetconfNodeConnectionStatus.ConnectionStatus.Connected.equals(netconfNodeAfter.getConnectionStatus())) { + lastUpdateCount++; createOrUpdateActorRef(); + final String masterAddress = netconfNodeAfter.getClusteredConnectionStatus().getNetconfMasterNode(); - final String path = NetconfTopologyUtils.createActorPath(masterAddress, + final String masterActorPath = NetconfTopologyUtils.createActorPath(masterAddress, NetconfTopologyUtils.createMasterActorName(id.getName(), netconfNodeAfter.getClusteredConnectionStatus().getNetconfMasterNode())); - setup.getActorSystem().actorSelection(path).tell(new AskForMasterMountPoint(), slaveActorRef); + + final AskForMasterMountPoint askForMasterMountPoint = new AskForMasterMountPoint(slaveActorRef); + final ActorSelection masterActor = setup.getActorSystem().actorSelection(masterActorPath); + + LOG.debug("{}: Sending {} message to master {}", id, askForMasterMountPoint, masterActor); + + sendAskForMasterMountPointWithRetries(askForMasterMountPoint, masterActor, 1, lastUpdateCount); } else { unregisterSlaveMountpoint(); } } + @GuardedBy("this") + private void sendAskForMasterMountPointWithRetries(final AskForMasterMountPoint askForMasterMountPoint, + final ActorSelection masterActor, final int tries, final int updateCount) { + final Future future = Patterns.ask(masterActor, askForMasterMountPoint, actorResponseWaitTime); + future.onComplete(new OnComplete() { + @Override + public void onComplete(final Throwable failure, final Object response) { + synchronized (this) { + // Ignore the response if we were since closed or another notification update occurred. + if (closed || updateCount != lastUpdateCount) { + return; + } + + if (failure instanceof AskTimeoutException) { + if (tries <= 5 || tries % 10 == 0) { + LOG.warn("{}: Failed to send message to {} - retrying...", id, masterActor, failure); + } + sendAskForMasterMountPointWithRetries(askForMasterMountPoint, masterActor, tries + 1, + updateCount); + } else if (failure != null) { + LOG.error("{}: Failed to send message {} to {}. Slave mount point could not be created", + id, askForMasterMountPoint, masterActor, failure); + } else { + LOG.debug("{}: {} message to {} succeeded", id, askForMasterMountPoint, masterActor); + } + } + } + }, setup.getActorSystem().dispatcher()); + } + + @GuardedBy("this") private void createOrUpdateActorRef() { if (slaveActorRef == null) { slaveActorRef = setup.getActorSystem().actorOf(NetconfNodeActor.props(setup, id, schemaRegistry,