X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=netconf%2Fnetconf-topology-singleton%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fnetconf%2Ftopology%2Fsingleton%2Fimpl%2FNetconfNodeManager.java;h=95c86efcbd3e43abf993450b65029af74ee0b0e1;hb=8e59d67f1b7580c2135cbcc229d4c377c8cc1b09;hp=7cc3d29b86bacd555d37c669a4f1fba41fa52865;hpb=e7149587aacbb9a2afae4f89db6976574588ac47;p=netconf.git diff --git a/netconf/netconf-topology-singleton/src/main/java/org/opendaylight/netconf/topology/singleton/impl/NetconfNodeManager.java b/netconf/netconf-topology-singleton/src/main/java/org/opendaylight/netconf/topology/singleton/impl/NetconfNodeManager.java index 7cc3d29b86..95c86efcbd 100644 --- a/netconf/netconf-topology-singleton/src/main/java/org/opendaylight/netconf/topology/singleton/impl/NetconfNodeManager.java +++ b/netconf/netconf-topology-singleton/src/main/java/org/opendaylight/netconf/topology/singleton/impl/NetconfNodeManager.java @@ -5,24 +5,31 @@ * terms of the Eclipse Public License v1.0 which accompanies this distribution, * and is available at http://www.eclipse.org/legal/epl-v10.html */ - package org.opendaylight.netconf.topology.singleton.impl; import akka.actor.ActorRef; +import akka.actor.ActorSelection; import akka.actor.PoisonPill; +import akka.dispatch.OnComplete; +import akka.pattern.AskTimeoutException; +import akka.pattern.Patterns; +import akka.util.Timeout; import java.util.Collection; import javax.annotation.Nonnull; -import org.opendaylight.controller.md.sal.binding.api.ClusteredDataTreeChangeListener; -import org.opendaylight.controller.md.sal.binding.api.DataObjectModification; -import org.opendaylight.controller.md.sal.binding.api.DataTreeIdentifier; -import org.opendaylight.controller.md.sal.binding.api.DataTreeModification; -import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType; +import javax.annotation.concurrent.GuardedBy; +import org.opendaylight.mdsal.binding.api.ClusteredDataTreeChangeListener; +import org.opendaylight.mdsal.binding.api.DataObjectModification; +import org.opendaylight.mdsal.binding.api.DataTreeIdentifier; +import org.opendaylight.mdsal.binding.api.DataTreeModification; +import org.opendaylight.mdsal.common.api.LogicalDatastoreType; +import org.opendaylight.mdsal.dom.api.DOMMountPointService; import org.opendaylight.netconf.sal.connect.util.RemoteDeviceId; import org.opendaylight.netconf.topology.singleton.api.NetconfTopologySingletonService; import org.opendaylight.netconf.topology.singleton.impl.actors.NetconfNodeActor; import org.opendaylight.netconf.topology.singleton.impl.utils.NetconfTopologySetup; import org.opendaylight.netconf.topology.singleton.impl.utils.NetconfTopologyUtils; import org.opendaylight.netconf.topology.singleton.messages.AskForMasterMountPoint; +import org.opendaylight.netconf.topology.singleton.messages.RefreshSlaveActor; import org.opendaylight.netconf.topology.singleton.messages.UnregisterSlaveMountPoint; import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.NetconfNode; import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.NetconfNodeConnectionStatus; @@ -30,10 +37,10 @@ import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology. import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.Node; import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.NodeKey; import org.opendaylight.yangtools.concepts.ListenerRegistration; -import org.opendaylight.yangtools.yang.model.repo.api.SchemaRepository; -import org.opendaylight.yangtools.yang.model.repo.spi.SchemaSourceRegistry; +import org.opendaylight.yangtools.yang.binding.InstanceIdentifier; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import scala.concurrent.Future; /** * Managing and reacting on data tree changes in specific netconf node when master writes status to the operational @@ -44,20 +51,29 @@ class NetconfNodeManager private static final Logger LOG = LoggerFactory.getLogger(NetconfNodeManager.class); - private NetconfTopologySetup setup; - private ListenerRegistration dataChangeListenerRegistration; - private RemoteDeviceId id; - private final SchemaSourceRegistry schemaRegistry; - private final SchemaRepository schemaRepository; + private final Timeout actorResponseWaitTime; + private final DOMMountPointService mountPointService; + + private volatile NetconfTopologySetup setup; + private volatile ListenerRegistration dataChangeListenerRegistration; + private volatile RemoteDeviceId id; + + @GuardedBy("this") private ActorRef slaveActorRef; + @GuardedBy("this") + private boolean closed; + + @GuardedBy("this") + private int lastUpdateCount; + NetconfNodeManager(final NetconfTopologySetup setup, - final RemoteDeviceId id, final SchemaSourceRegistry schemaRegistry, - final SchemaRepository schemaRepository) { + final RemoteDeviceId id, final Timeout actorResponseWaitTime, + final DOMMountPointService mountPointService) { this.setup = setup; this.id = id; - this.schemaRegistry = schemaRegistry; - this.schemaRepository = schemaRepository; + this.actorResponseWaitTime = actorResponseWaitTime; + this.mountPointService = mountPointService; } @Override @@ -67,20 +83,23 @@ class NetconfNodeManager final NodeId nodeId = NetconfTopologyUtils.getNodeId(rootNode.getIdentifier()); switch (rootNode.getModificationType()) { case SUBTREE_MODIFIED: - LOG.debug("{}: Operational for node {} updated. Trying to register slave mount point", id, nodeId); + LOG.debug("{}: Operational state for node {} - subtree modified from {} to {}", + id, nodeId, rootNode.getDataBefore(), rootNode.getDataAfter()); handleSlaveMountPoint(rootNode); break; case WRITE: if (rootNode.getDataBefore() != null) { - LOG.debug("{}: Operational for node {} rewrited. Trying to register slave mount point", id, nodeId); + LOG.debug("{}: Operational state for node {} updated from {} to {}", + id, nodeId, rootNode.getDataBefore(), rootNode.getDataAfter()); } else { - LOG.debug("{}: Operational for node {} created. Trying to register slave mount point", id, nodeId); + LOG.debug("{}: Operational state for node {} created: {}", + id, nodeId, rootNode.getDataAfter()); } handleSlaveMountPoint(rootNode); break; case DELETE: - LOG.debug("{}: Operational for node {} deleted. Trying to remove slave mount point", id, nodeId); - closeActor(); + LOG.debug("{}: Operational state for node {} deleted.", id, nodeId); + unregisterSlaveMountpoint(); break; default: LOG.debug("{}: Uknown operation for node: {}", id, nodeId); @@ -89,50 +108,110 @@ class NetconfNodeManager } @Override - public void close() { - closeActor(); + public synchronized void close() { + if (closed) { + return; + } + closed = true; + closeActor(); if (dataChangeListenerRegistration != null) { dataChangeListenerRegistration.close(); dataChangeListenerRegistration = null; } } + @GuardedBy("this") private void closeActor() { if (slaveActorRef != null) { - slaveActorRef.tell(new UnregisterSlaveMountPoint(), ActorRef.noSender()); + LOG.debug("{}: Sending poison pill to {}", id, slaveActorRef); slaveActorRef.tell(PoisonPill.getInstance(), ActorRef.noSender()); slaveActorRef = null; } } + private synchronized void unregisterSlaveMountpoint() { + lastUpdateCount++; + if (slaveActorRef != null) { + LOG.debug("{}: Sending message to unregister slave mountpoint to {}", id, slaveActorRef); + slaveActorRef.tell(new UnregisterSlaveMountPoint(), ActorRef.noSender()); + } + } + void registerDataTreeChangeListener(final String topologyId, final NodeKey key) { - LOG.debug("{}: Registering data tree change listener on node {}", id, key); + final InstanceIdentifier path = NetconfTopologyUtils.createTopologyNodeListPath(key, topologyId); + LOG.debug("{}: Registering data tree change listener on path {}", id, path); dataChangeListenerRegistration = setup.getDataBroker().registerDataTreeChangeListener( - new DataTreeIdentifier<>(LogicalDatastoreType.OPERATIONAL, - NetconfTopologyUtils.createTopologyNodeListPath(key, topologyId)), this); + DataTreeIdentifier.create(LogicalDatastoreType.OPERATIONAL, path), this); } - private void handleSlaveMountPoint(final DataObjectModification rootNode) { + private synchronized void handleSlaveMountPoint(final DataObjectModification rootNode) { + if (closed) { + return; + } + @SuppressWarnings("ConstantConditions") - final NetconfNode netconfNodeAfter = rootNode.getDataAfter().getAugmentation(NetconfNode.class); + final NetconfNode netconfNodeAfter = rootNode.getDataAfter().augmentation(NetconfNode.class); if (NetconfNodeConnectionStatus.ConnectionStatus.Connected.equals(netconfNodeAfter.getConnectionStatus())) { - createActorRef(); + lastUpdateCount++; + createOrUpdateActorRef(); + final String masterAddress = netconfNodeAfter.getClusteredConnectionStatus().getNetconfMasterNode(); - final String path = NetconfTopologyUtils.createActorPath(masterAddress, + final String masterActorPath = NetconfTopologyUtils.createActorPath(masterAddress, NetconfTopologyUtils.createMasterActorName(id.getName(), netconfNodeAfter.getClusteredConnectionStatus().getNetconfMasterNode())); - setup.getActorSystem().actorSelection(path).tell(new AskForMasterMountPoint(), slaveActorRef); - } else { ; - closeActor(); + + final AskForMasterMountPoint askForMasterMountPoint = new AskForMasterMountPoint(slaveActorRef); + final ActorSelection masterActor = setup.getActorSystem().actorSelection(masterActorPath); + + LOG.debug("{}: Sending {} message to master {}", id, askForMasterMountPoint, masterActor); + + sendAskForMasterMountPointWithRetries(askForMasterMountPoint, masterActor, 1, lastUpdateCount); + } else { + unregisterSlaveMountpoint(); } } - private void createActorRef() { + @GuardedBy("this") + private void sendAskForMasterMountPointWithRetries(final AskForMasterMountPoint askForMasterMountPoint, + final ActorSelection masterActor, final int tries, final int updateCount) { + final Future future = Patterns.ask(masterActor, askForMasterMountPoint, actorResponseWaitTime); + future.onComplete(new OnComplete() { + @Override + public void onComplete(final Throwable failure, final Object response) { + synchronized (this) { + // Ignore the response if we were since closed or another notification update occurred. + if (closed || updateCount != lastUpdateCount) { + return; + } + + if (failure instanceof AskTimeoutException) { + if (tries <= 5 || tries % 10 == 0) { + LOG.warn("{}: Failed to send message to {} - retrying...", id, masterActor, failure); + } + sendAskForMasterMountPointWithRetries(askForMasterMountPoint, masterActor, tries + 1, + updateCount); + } else if (failure != null) { + LOG.error("{}: Failed to send message {} to {}. Slave mount point could not be created", + id, askForMasterMountPoint, masterActor, failure); + } else { + LOG.debug("{}: {} message to {} succeeded", id, askForMasterMountPoint, masterActor); + } + } + } + }, setup.getActorSystem().dispatcher()); + } + + @GuardedBy("this") + private void createOrUpdateActorRef() { if (slaveActorRef == null) { - slaveActorRef = setup.getActorSystem().actorOf(NetconfNodeActor.props(setup, id, schemaRegistry, - schemaRepository), id.getName()); + slaveActorRef = setup.getActorSystem().actorOf(NetconfNodeActor.props(setup, id, actorResponseWaitTime, + mountPointService)); + LOG.debug("{}: Slave actor created with name {}", id, slaveActorRef); + } else { + slaveActorRef + .tell(new RefreshSlaveActor(setup, id, actorResponseWaitTime), ActorRef.noSender()); } }