X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=netconf%2Fnetconf-topology-singleton%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fnetconf%2Ftopology%2Fsingleton%2Fimpl%2FNetconfNodeManager.java;h=95c86efcbd3e43abf993450b65029af74ee0b0e1;hb=8e59d67f1b7580c2135cbcc229d4c377c8cc1b09;hp=bb623a92da180ae746c0ec56cb6968fb7cb9d926;hpb=4672d8989262238165119849a5874643276e1160;p=netconf.git diff --git a/netconf/netconf-topology-singleton/src/main/java/org/opendaylight/netconf/topology/singleton/impl/NetconfNodeManager.java b/netconf/netconf-topology-singleton/src/main/java/org/opendaylight/netconf/topology/singleton/impl/NetconfNodeManager.java index bb623a92da..95c86efcbd 100644 --- a/netconf/netconf-topology-singleton/src/main/java/org/opendaylight/netconf/topology/singleton/impl/NetconfNodeManager.java +++ b/netconf/netconf-topology-singleton/src/main/java/org/opendaylight/netconf/topology/singleton/impl/NetconfNodeManager.java @@ -5,26 +5,31 @@ * terms of the Eclipse Public License v1.0 which accompanies this distribution, * and is available at http://www.eclipse.org/legal/epl-v10.html */ - package org.opendaylight.netconf.topology.singleton.impl; import akka.actor.ActorRef; +import akka.actor.ActorSelection; import akka.actor.PoisonPill; +import akka.dispatch.OnComplete; +import akka.pattern.AskTimeoutException; +import akka.pattern.Patterns; import akka.util.Timeout; import java.util.Collection; import javax.annotation.Nonnull; -import org.opendaylight.controller.md.sal.binding.api.ClusteredDataTreeChangeListener; -import org.opendaylight.controller.md.sal.binding.api.DataObjectModification; -import org.opendaylight.controller.md.sal.binding.api.DataTreeIdentifier; -import org.opendaylight.controller.md.sal.binding.api.DataTreeModification; -import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType; -import org.opendaylight.controller.md.sal.dom.api.DOMMountPointService; +import javax.annotation.concurrent.GuardedBy; +import org.opendaylight.mdsal.binding.api.ClusteredDataTreeChangeListener; +import org.opendaylight.mdsal.binding.api.DataObjectModification; +import org.opendaylight.mdsal.binding.api.DataTreeIdentifier; +import org.opendaylight.mdsal.binding.api.DataTreeModification; +import org.opendaylight.mdsal.common.api.LogicalDatastoreType; +import org.opendaylight.mdsal.dom.api.DOMMountPointService; import org.opendaylight.netconf.sal.connect.util.RemoteDeviceId; import org.opendaylight.netconf.topology.singleton.api.NetconfTopologySingletonService; import org.opendaylight.netconf.topology.singleton.impl.actors.NetconfNodeActor; import org.opendaylight.netconf.topology.singleton.impl.utils.NetconfTopologySetup; import org.opendaylight.netconf.topology.singleton.impl.utils.NetconfTopologyUtils; import org.opendaylight.netconf.topology.singleton.messages.AskForMasterMountPoint; +import org.opendaylight.netconf.topology.singleton.messages.RefreshSlaveActor; import org.opendaylight.netconf.topology.singleton.messages.UnregisterSlaveMountPoint; import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.NetconfNode; import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.NetconfNodeConnectionStatus; @@ -32,10 +37,10 @@ import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology. import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.Node; import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.NodeKey; import org.opendaylight.yangtools.concepts.ListenerRegistration; -import org.opendaylight.yangtools.yang.model.repo.api.SchemaRepository; -import org.opendaylight.yangtools.yang.model.repo.spi.SchemaSourceRegistry; +import org.opendaylight.yangtools.yang.binding.InstanceIdentifier; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import scala.concurrent.Future; /** * Managing and reacting on data tree changes in specific netconf node when master writes status to the operational @@ -48,22 +53,25 @@ class NetconfNodeManager private final Timeout actorResponseWaitTime; private final DOMMountPointService mountPointService; - private final SchemaSourceRegistry schemaRegistry; - private final SchemaRepository schemaRepository; - private NetconfTopologySetup setup; - private ListenerRegistration dataChangeListenerRegistration; - private RemoteDeviceId id; + private volatile NetconfTopologySetup setup; + private volatile ListenerRegistration dataChangeListenerRegistration; + private volatile RemoteDeviceId id; + + @GuardedBy("this") private ActorRef slaveActorRef; + @GuardedBy("this") + private boolean closed; + + @GuardedBy("this") + private int lastUpdateCount; NetconfNodeManager(final NetconfTopologySetup setup, final RemoteDeviceId id, final Timeout actorResponseWaitTime, final DOMMountPointService mountPointService) { this.setup = setup; this.id = id; - this.schemaRegistry = setup.getSchemaResourcesDTO().getSchemaRegistry(); - this.schemaRepository = setup.getSchemaResourcesDTO().getSchemaRepository(); this.actorResponseWaitTime = actorResponseWaitTime; this.mountPointService = mountPointService; } @@ -75,22 +83,23 @@ class NetconfNodeManager final NodeId nodeId = NetconfTopologyUtils.getNodeId(rootNode.getIdentifier()); switch (rootNode.getModificationType()) { case SUBTREE_MODIFIED: - LOG.debug("{}: Operational for node {} updated. Trying to register slave mount point", id, nodeId); + LOG.debug("{}: Operational state for node {} - subtree modified from {} to {}", + id, nodeId, rootNode.getDataBefore(), rootNode.getDataAfter()); handleSlaveMountPoint(rootNode); break; case WRITE: if (rootNode.getDataBefore() != null) { - LOG.debug("{}: Operational for node {} rewrited. Trying to register slave mount point", - id, nodeId); + LOG.debug("{}: Operational state for node {} updated from {} to {}", + id, nodeId, rootNode.getDataBefore(), rootNode.getDataAfter()); } else { - LOG.debug("{}: Operational for node {} created. Trying to register slave mount point", - id, nodeId); + LOG.debug("{}: Operational state for node {} created: {}", + id, nodeId, rootNode.getDataAfter()); } handleSlaveMountPoint(rootNode); break; case DELETE: - LOG.debug("{}: Operational for node {} deleted. Trying to remove slave mount point", id, nodeId); - closeActor(); + LOG.debug("{}: Operational state for node {} deleted.", id, nodeId); + unregisterSlaveMountpoint(); break; default: LOG.debug("{}: Uknown operation for node: {}", id, nodeId); @@ -99,50 +108,110 @@ class NetconfNodeManager } @Override - public void close() { - closeActor(); + public synchronized void close() { + if (closed) { + return; + } + closed = true; + closeActor(); if (dataChangeListenerRegistration != null) { dataChangeListenerRegistration.close(); dataChangeListenerRegistration = null; } } + @GuardedBy("this") private void closeActor() { if (slaveActorRef != null) { - slaveActorRef.tell(new UnregisterSlaveMountPoint(), ActorRef.noSender()); + LOG.debug("{}: Sending poison pill to {}", id, slaveActorRef); slaveActorRef.tell(PoisonPill.getInstance(), ActorRef.noSender()); slaveActorRef = null; } } + private synchronized void unregisterSlaveMountpoint() { + lastUpdateCount++; + if (slaveActorRef != null) { + LOG.debug("{}: Sending message to unregister slave mountpoint to {}", id, slaveActorRef); + slaveActorRef.tell(new UnregisterSlaveMountPoint(), ActorRef.noSender()); + } + } + void registerDataTreeChangeListener(final String topologyId, final NodeKey key) { - LOG.debug("{}: Registering data tree change listener on node {}", id, key); + final InstanceIdentifier path = NetconfTopologyUtils.createTopologyNodeListPath(key, topologyId); + LOG.debug("{}: Registering data tree change listener on path {}", id, path); dataChangeListenerRegistration = setup.getDataBroker().registerDataTreeChangeListener( - new DataTreeIdentifier<>(LogicalDatastoreType.OPERATIONAL, - NetconfTopologyUtils.createTopologyNodeListPath(key, topologyId)), this); + DataTreeIdentifier.create(LogicalDatastoreType.OPERATIONAL, path), this); } - private void handleSlaveMountPoint(final DataObjectModification rootNode) { + private synchronized void handleSlaveMountPoint(final DataObjectModification rootNode) { + if (closed) { + return; + } + @SuppressWarnings("ConstantConditions") - final NetconfNode netconfNodeAfter = rootNode.getDataAfter().getAugmentation(NetconfNode.class); + final NetconfNode netconfNodeAfter = rootNode.getDataAfter().augmentation(NetconfNode.class); if (NetconfNodeConnectionStatus.ConnectionStatus.Connected.equals(netconfNodeAfter.getConnectionStatus())) { - createActorRef(); + lastUpdateCount++; + createOrUpdateActorRef(); + final String masterAddress = netconfNodeAfter.getClusteredConnectionStatus().getNetconfMasterNode(); - final String path = NetconfTopologyUtils.createActorPath(masterAddress, + final String masterActorPath = NetconfTopologyUtils.createActorPath(masterAddress, NetconfTopologyUtils.createMasterActorName(id.getName(), netconfNodeAfter.getClusteredConnectionStatus().getNetconfMasterNode())); - setup.getActorSystem().actorSelection(path).tell(new AskForMasterMountPoint(), slaveActorRef); + + final AskForMasterMountPoint askForMasterMountPoint = new AskForMasterMountPoint(slaveActorRef); + final ActorSelection masterActor = setup.getActorSystem().actorSelection(masterActorPath); + + LOG.debug("{}: Sending {} message to master {}", id, askForMasterMountPoint, masterActor); + + sendAskForMasterMountPointWithRetries(askForMasterMountPoint, masterActor, 1, lastUpdateCount); } else { - closeActor(); + unregisterSlaveMountpoint(); } } - private void createActorRef() { + @GuardedBy("this") + private void sendAskForMasterMountPointWithRetries(final AskForMasterMountPoint askForMasterMountPoint, + final ActorSelection masterActor, final int tries, final int updateCount) { + final Future future = Patterns.ask(masterActor, askForMasterMountPoint, actorResponseWaitTime); + future.onComplete(new OnComplete() { + @Override + public void onComplete(final Throwable failure, final Object response) { + synchronized (this) { + // Ignore the response if we were since closed or another notification update occurred. + if (closed || updateCount != lastUpdateCount) { + return; + } + + if (failure instanceof AskTimeoutException) { + if (tries <= 5 || tries % 10 == 0) { + LOG.warn("{}: Failed to send message to {} - retrying...", id, masterActor, failure); + } + sendAskForMasterMountPointWithRetries(askForMasterMountPoint, masterActor, tries + 1, + updateCount); + } else if (failure != null) { + LOG.error("{}: Failed to send message {} to {}. Slave mount point could not be created", + id, askForMasterMountPoint, masterActor, failure); + } else { + LOG.debug("{}: {} message to {} succeeded", id, askForMasterMountPoint, masterActor); + } + } + } + }, setup.getActorSystem().dispatcher()); + } + + @GuardedBy("this") + private void createOrUpdateActorRef() { if (slaveActorRef == null) { - slaveActorRef = setup.getActorSystem().actorOf(NetconfNodeActor.props(setup, id, schemaRegistry, - schemaRepository, actorResponseWaitTime, mountPointService), id.getName()); + slaveActorRef = setup.getActorSystem().actorOf(NetconfNodeActor.props(setup, id, actorResponseWaitTime, + mountPointService)); + LOG.debug("{}: Slave actor created with name {}", id, slaveActorRef); + } else { + slaveActorRef + .tell(new RefreshSlaveActor(setup, id, actorResponseWaitTime), ActorRef.noSender()); } }