From d682d29c57e64070078813dd1e70883a4ba258ee Mon Sep 17 00:00:00 2001 From: Tom Pantelis Date: Wed, 9 May 2018 21:38:01 -0400 Subject: [PATCH] Slave mount point registration hardening/resiliency - It's possible for a message to the master to timeout in which case the slave mount point doesn't get registered so add retries for the AskForMasterMountPoint message in NetconfNodeManager and schema resolution in NetconfNodeActor. The AskForMasterMountPoint and RegisterMountPoint messages were changed to pass the slave/master ActorRefs in order to send a success/failure reply to the original caller. - In NetconfNodeActor, preserve actor safety by executing ListenableFuture callbacks via the actor's dispatch queue (using executeInSelf) if the callback accesses actor state. - Hardened synchronization in NetconfNodeManager and NetconfNodeManager. - Added UTs for NetconfNodeManager to cover the slave mount point registration scenarios. - Added more logging where appropriate Change-Id: I31c2d6584d56cb87c1b266565bc4b6f4a48ff303 Signed-off-by: Tom Pantelis --- .../singleton/impl/NetconfNodeManager.java | 93 +++- .../impl/NetconfTopologyContext.java | 12 +- .../singleton/impl/SlaveSalFacade.java | 23 +- .../impl/actors/NetconfNodeActor.java | 132 ++++-- .../messages/AskForMasterMountPoint.java | 16 + .../messages/NotMasterException.java | 23 + .../messages/RegisterMountPoint.java | 14 +- .../messages/YangTextSchemaSourceRequest.java | 5 + .../singleton/impl/NetconfNodeActorTest.java | 14 +- .../impl/NetconfNodeManagerTest.java | 410 ++++++++++++++++++ .../impl/NetconfTopologyManagerTest.java | 2 +- .../src/test/resources/application.conf | 41 ++ .../test/resources/simplelogger.properties | 6 + 13 files changed, 703 insertions(+), 88 deletions(-) create mode 100644 netconf/netconf-topology-singleton/src/main/java/org/opendaylight/netconf/topology/singleton/messages/NotMasterException.java create mode 100644 netconf/netconf-topology-singleton/src/test/java/org/opendaylight/netconf/topology/singleton/impl/NetconfNodeManagerTest.java create mode 100644 netconf/netconf-topology-singleton/src/test/resources/application.conf create mode 100644 netconf/netconf-topology-singleton/src/test/resources/simplelogger.properties diff --git a/netconf/netconf-topology-singleton/src/main/java/org/opendaylight/netconf/topology/singleton/impl/NetconfNodeManager.java b/netconf/netconf-topology-singleton/src/main/java/org/opendaylight/netconf/topology/singleton/impl/NetconfNodeManager.java index e78e9ee9c7..529a538fe7 100644 --- a/netconf/netconf-topology-singleton/src/main/java/org/opendaylight/netconf/topology/singleton/impl/NetconfNodeManager.java +++ b/netconf/netconf-topology-singleton/src/main/java/org/opendaylight/netconf/topology/singleton/impl/NetconfNodeManager.java @@ -9,10 +9,15 @@ package org.opendaylight.netconf.topology.singleton.impl; import akka.actor.ActorRef; +import akka.actor.ActorSelection; import akka.actor.PoisonPill; +import akka.dispatch.OnComplete; +import akka.pattern.AskTimeoutException; +import akka.pattern.Patterns; import akka.util.Timeout; import java.util.Collection; import javax.annotation.Nonnull; +import javax.annotation.concurrent.GuardedBy; import org.opendaylight.controller.md.sal.binding.api.ClusteredDataTreeChangeListener; import org.opendaylight.controller.md.sal.binding.api.DataObjectModification; import org.opendaylight.controller.md.sal.binding.api.DataTreeIdentifier; @@ -37,6 +42,7 @@ import org.opendaylight.yangtools.yang.model.repo.api.SchemaRepository; import org.opendaylight.yangtools.yang.model.repo.spi.SchemaSourceRegistry; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import scala.concurrent.Future; /** * Managing and reacting on data tree changes in specific netconf node when master writes status to the operational @@ -52,11 +58,18 @@ class NetconfNodeManager private final SchemaSourceRegistry schemaRegistry; private final SchemaRepository schemaRepository; - private NetconfTopologySetup setup; - private ListenerRegistration dataChangeListenerRegistration; - private RemoteDeviceId id; + private volatile NetconfTopologySetup setup; + private volatile ListenerRegistration dataChangeListenerRegistration; + private volatile RemoteDeviceId id; + + @GuardedBy("this") private ActorRef slaveActorRef; + @GuardedBy("this") + private boolean closed; + + @GuardedBy("this") + private int lastUpdateCount; NetconfNodeManager(final NetconfTopologySetup setup, final RemoteDeviceId id, final Timeout actorResponseWaitTime, @@ -76,21 +89,22 @@ class NetconfNodeManager final NodeId nodeId = NetconfTopologyUtils.getNodeId(rootNode.getIdentifier()); switch (rootNode.getModificationType()) { case SUBTREE_MODIFIED: - LOG.debug("{}: Operational for node {} updated. Trying to register slave mount point", id, nodeId); + LOG.debug("{}: Operational state for node {} - subtree modified from {} to {}", + id, nodeId, rootNode.getDataBefore(), rootNode.getDataAfter()); handleSlaveMountPoint(rootNode); break; case WRITE: if (rootNode.getDataBefore() != null) { - LOG.debug("{}: Operational for node {} rewrited. Trying to register slave mount point", - id, nodeId); + LOG.debug("{}: Operational state for node {} updated from {} to {}", + id, nodeId, rootNode.getDataBefore(), rootNode.getDataAfter()); } else { - LOG.debug("{}: Operational for node {} created. Trying to register slave mount point", - id, nodeId); + LOG.debug("{}: Operational state for node {} created: {}", + id, nodeId, rootNode.getDataAfter()); } handleSlaveMountPoint(rootNode); break; case DELETE: - LOG.debug("{}: Operational for node {} deleted.", id, nodeId); + LOG.debug("{}: Operational state for node {} deleted.", id, nodeId); unregisterSlaveMountpoint(); break; default: @@ -100,8 +114,12 @@ class NetconfNodeManager } @Override - public void close() { - unregisterSlaveMountpoint(); + public synchronized void close() { + if (closed) { + return; + } + + closed = true; closeActor(); if (dataChangeListenerRegistration != null) { dataChangeListenerRegistration.close(); @@ -109,6 +127,7 @@ class NetconfNodeManager } } + @GuardedBy("this") private void closeActor() { if (slaveActorRef != null) { LOG.debug("{}: Sending poison pill to {}", id, slaveActorRef); @@ -117,7 +136,8 @@ class NetconfNodeManager } } - private void unregisterSlaveMountpoint() { + private synchronized void unregisterSlaveMountpoint() { + lastUpdateCount++; if (slaveActorRef != null) { LOG.debug("{}: Sending message to unregister slave mountpoint to {}", id, slaveActorRef); slaveActorRef.tell(new UnregisterSlaveMountPoint(), ActorRef.noSender()); @@ -131,22 +151,65 @@ class NetconfNodeManager NetconfTopologyUtils.createTopologyNodeListPath(key, topologyId)), this); } - private void handleSlaveMountPoint(final DataObjectModification rootNode) { + private synchronized void handleSlaveMountPoint(final DataObjectModification rootNode) { + if (closed) { + return; + } + @SuppressWarnings("ConstantConditions") final NetconfNode netconfNodeAfter = rootNode.getDataAfter().getAugmentation(NetconfNode.class); if (NetconfNodeConnectionStatus.ConnectionStatus.Connected.equals(netconfNodeAfter.getConnectionStatus())) { + lastUpdateCount++; createOrUpdateActorRef(); + final String masterAddress = netconfNodeAfter.getClusteredConnectionStatus().getNetconfMasterNode(); - final String path = NetconfTopologyUtils.createActorPath(masterAddress, + final String masterActorPath = NetconfTopologyUtils.createActorPath(masterAddress, NetconfTopologyUtils.createMasterActorName(id.getName(), netconfNodeAfter.getClusteredConnectionStatus().getNetconfMasterNode())); - setup.getActorSystem().actorSelection(path).tell(new AskForMasterMountPoint(), slaveActorRef); + + final AskForMasterMountPoint askForMasterMountPoint = new AskForMasterMountPoint(slaveActorRef); + final ActorSelection masterActor = setup.getActorSystem().actorSelection(masterActorPath); + + LOG.debug("{}: Sending {} message to master {}", id, askForMasterMountPoint, masterActor); + + sendAskForMasterMountPointWithRetries(askForMasterMountPoint, masterActor, 1, lastUpdateCount); } else { unregisterSlaveMountpoint(); } } + @GuardedBy("this") + private void sendAskForMasterMountPointWithRetries(final AskForMasterMountPoint askForMasterMountPoint, + final ActorSelection masterActor, final int tries, final int updateCount) { + final Future future = Patterns.ask(masterActor, askForMasterMountPoint, actorResponseWaitTime); + future.onComplete(new OnComplete() { + @Override + public void onComplete(final Throwable failure, final Object response) { + synchronized (this) { + // Ignore the response if we were since closed or another notification update occurred. + if (closed || updateCount != lastUpdateCount) { + return; + } + + if (failure instanceof AskTimeoutException) { + if (tries <= 5 || tries % 10 == 0) { + LOG.warn("{}: Failed to send message to {} - retrying...", id, masterActor, failure); + } + sendAskForMasterMountPointWithRetries(askForMasterMountPoint, masterActor, tries + 1, + updateCount); + } else if (failure != null) { + LOG.error("{}: Failed to send message {} to {}. Slave mount point could not be created", + id, askForMasterMountPoint, masterActor, failure); + } else { + LOG.debug("{}: {} message to {} succeeded", id, askForMasterMountPoint, masterActor); + } + } + } + }, setup.getActorSystem().dispatcher()); + } + + @GuardedBy("this") private void createOrUpdateActorRef() { if (slaveActorRef == null) { slaveActorRef = setup.getActorSystem().actorOf(NetconfNodeActor.props(setup, id, schemaRegistry, diff --git a/netconf/netconf-topology-singleton/src/main/java/org/opendaylight/netconf/topology/singleton/impl/NetconfTopologyContext.java b/netconf/netconf-topology-singleton/src/main/java/org/opendaylight/netconf/topology/singleton/impl/NetconfTopologyContext.java index 0ac355d6dc..11cd117b1e 100644 --- a/netconf/netconf-topology-singleton/src/main/java/org/opendaylight/netconf/topology/singleton/impl/NetconfTopologyContext.java +++ b/netconf/netconf-topology-singleton/src/main/java/org/opendaylight/netconf/topology/singleton/impl/NetconfTopologyContext.java @@ -47,9 +47,9 @@ class NetconfTopologyContext implements ClusterSingletonService { private RemoteDeviceConnector remoteDeviceConnector; private NetconfNodeManager netconfNodeManager; private ActorRef masterActorRef; - private boolean finalClose = false; + private final AtomicBoolean finalClose = new AtomicBoolean(false); private final AtomicBoolean closed = new AtomicBoolean(false); - private boolean isMaster; + private volatile boolean isMaster; NetconfTopologyContext(final NetconfTopologySetup netconfTopologyDeviceSetup, final ServiceGroupIdentifier serviceGroupIdent, @@ -80,7 +80,7 @@ class NetconfTopologyContext implements ClusterSingletonService { netconfNodeManager = null; } - if (!finalClose) { + if (!finalClose.get()) { final String masterAddress = Cluster.get(netconfTopologyDeviceSetup.getActorSystem()).selfAddress().toString(); masterActorRef = netconfTopologyDeviceSetup.getActorSystem().actorOf(NetconfNodeActor.props( @@ -97,7 +97,7 @@ class NetconfTopologyContext implements ClusterSingletonService { @Override public ListenableFuture closeServiceInstance() { - if (!finalClose) { + if (!finalClose.get()) { // in case that master changes role to slave, new NodeDeviceManager must be created and listener registered netconfNodeManager = createNodeDeviceManager(); } @@ -121,7 +121,9 @@ class NetconfTopologyContext implements ClusterSingletonService { } void closeFinal() throws Exception { - finalClose = true; + if (!finalClose.compareAndSet(false, true)) { + return; + } if (netconfNodeManager != null) { netconfNodeManager.close(); diff --git a/netconf/netconf-topology-singleton/src/main/java/org/opendaylight/netconf/topology/singleton/impl/SlaveSalFacade.java b/netconf/netconf-topology-singleton/src/main/java/org/opendaylight/netconf/topology/singleton/impl/SlaveSalFacade.java index 0abbdaa0ba..795ba78c75 100644 --- a/netconf/netconf-topology-singleton/src/main/java/org/opendaylight/netconf/topology/singleton/impl/SlaveSalFacade.java +++ b/netconf/netconf-topology-singleton/src/main/java/org/opendaylight/netconf/topology/singleton/impl/SlaveSalFacade.java @@ -11,6 +11,7 @@ package org.opendaylight.netconf.topology.singleton.impl; import akka.actor.ActorRef; import akka.actor.ActorSystem; import akka.util.Timeout; +import java.util.concurrent.atomic.AtomicBoolean; import org.opendaylight.controller.md.sal.dom.api.DOMMountPointService; import org.opendaylight.controller.md.sal.dom.api.DOMRpcService; import org.opendaylight.netconf.sal.connect.netconf.sal.NetconfDeviceNotificationService; @@ -28,6 +29,7 @@ public class SlaveSalFacade { private final NetconfDeviceSalProvider salProvider; private final ActorSystem actorSystem; private final Timeout actorResponseWaitTime; + private final AtomicBoolean registered = new AtomicBoolean(false); public SlaveSalFacade(final RemoteDeviceId id, final ActorSystem actorSystem, @@ -41,6 +43,10 @@ public class SlaveSalFacade { public void registerSlaveMountPoint(final SchemaContext remoteSchemaContext, final DOMRpcService deviceRpc, final ActorRef masterActorRef) { + if (!registered.compareAndSet(false, true)) { + return; + } + final NetconfDeviceNotificationService notificationService = new NetconfDeviceNotificationService(); final ProxyDOMDataBroker netconfDeviceDataBroker = @@ -52,20 +58,13 @@ public class SlaveSalFacade { LOG.info("{}: Slave mount point registered.", id); } - public void unregisterSlaveMountPoint() { - salProvider.getMountInstance().onTopologyDeviceDisconnected(); - } - - @SuppressWarnings("checkstyle:IllegalCatch") public void close() { - unregisterSlaveMountPoint(); - try { - salProvider.getMountInstance().close(); - } catch (final Exception exception) { - LOG.warn("{}: Exception in closing slave sal facade: {}", id, exception); + if (!registered.compareAndSet(true, false)) { + return; } - } - + salProvider.getMountInstance().onTopologyDeviceDisconnected(); + LOG.info("{}: Slave mount point unregistered.", id); + } } diff --git a/netconf/netconf-topology-singleton/src/main/java/org/opendaylight/netconf/topology/singleton/impl/actors/NetconfNodeActor.java b/netconf/netconf-topology-singleton/src/main/java/org/opendaylight/netconf/topology/singleton/impl/actors/NetconfNodeActor.java index 3a3fad9ce4..6c297261d8 100644 --- a/netconf/netconf-topology-singleton/src/main/java/org/opendaylight/netconf/topology/singleton/impl/actors/NetconfNodeActor.java +++ b/netconf/netconf-topology-singleton/src/main/java/org/opendaylight/netconf/topology/singleton/impl/actors/NetconfNodeActor.java @@ -10,8 +10,11 @@ package org.opendaylight.netconf.topology.singleton.impl.actors; import akka.actor.ActorRef; import akka.actor.Props; -import akka.actor.UntypedActor; +import akka.actor.Status.Failure; +import akka.actor.Status.Success; +import akka.pattern.AskTimeoutException; import akka.util.Timeout; +import com.google.common.base.Throwables; import com.google.common.util.concurrent.CheckedFuture; import com.google.common.util.concurrent.FutureCallback; import com.google.common.util.concurrent.Futures; @@ -23,6 +26,7 @@ import java.util.stream.Collectors; import javax.annotation.Nonnull; import javax.annotation.Nullable; import org.eclipse.jdt.annotation.NonNull; +import org.opendaylight.controller.cluster.common.actor.AbstractUntypedActor; import org.opendaylight.controller.cluster.schema.provider.RemoteYangTextSourceProvider; import org.opendaylight.controller.cluster.schema.provider.impl.RemoteSchemaProvider; import org.opendaylight.controller.cluster.schema.provider.impl.YangTextSchemaSourceSerializationProxy; @@ -43,6 +47,7 @@ import org.opendaylight.netconf.topology.singleton.messages.AskForMasterMountPoi import org.opendaylight.netconf.topology.singleton.messages.CreateInitialMasterActorData; import org.opendaylight.netconf.topology.singleton.messages.MasterActorDataInitialized; import org.opendaylight.netconf.topology.singleton.messages.NormalizedNodeMessage; +import org.opendaylight.netconf.topology.singleton.messages.NotMasterException; import org.opendaylight.netconf.topology.singleton.messages.RefreshSetupMasterActorData; import org.opendaylight.netconf.topology.singleton.messages.RefreshSlaveActor; import org.opendaylight.netconf.topology.singleton.messages.RegisterMountPoint; @@ -68,13 +73,9 @@ import org.opendaylight.yangtools.yang.model.repo.api.YangTextSchemaSource; import org.opendaylight.yangtools.yang.model.repo.spi.PotentialSchemaSource; import org.opendaylight.yangtools.yang.model.repo.spi.SchemaSourceRegistration; import org.opendaylight.yangtools.yang.model.repo.spi.SchemaSourceRegistry; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import scala.concurrent.duration.Duration; -public final class NetconfNodeActor extends UntypedActor { - - private static final Logger LOG = LoggerFactory.getLogger(NetconfNodeActor.class); +public class NetconfNodeActor extends AbstractUntypedActor { private final Duration writeTxIdleTimeout; private final DOMMountPointService mountPointService; @@ -101,10 +102,10 @@ public final class NetconfNodeActor extends UntypedActor { mountPointService)); } - private NetconfNodeActor(final NetconfTopologySetup setup, - final RemoteDeviceId id, final SchemaSourceRegistry schemaRegistry, - final SchemaRepository schemaRepository, final Timeout actorResponseWaitTime, - final DOMMountPointService mountPointService) { + protected NetconfNodeActor(final NetconfTopologySetup setup, + final RemoteDeviceId id, final SchemaSourceRegistry schemaRegistry, + final SchemaRepository schemaRepository, final Timeout actorResponseWaitTime, + final DOMMountPointService mountPointService) { this.setup = setup; this.id = id; this.schemaRegistry = schemaRegistry; @@ -116,7 +117,9 @@ public final class NetconfNodeActor extends UntypedActor { @SuppressWarnings("checkstyle:IllegalCatch") @Override - public void onReceive(final Object message) throws Exception { + public void handleReceive(final Object message) throws Exception { + LOG.debug("{}: received message {}", id, message); + if (message instanceof CreateInitialMasterActorData) { // master final CreateInitialMasterActorData masterActorData = (CreateInitialMasterActorData) message; @@ -135,9 +138,16 @@ public final class NetconfNodeActor extends UntypedActor { id = ((RefreshSetupMasterActorData) message).getRemoteDeviceId(); sender().tell(new MasterActorDataInitialized(), self()); } else if (message instanceof AskForMasterMountPoint) { // master + AskForMasterMountPoint askForMasterMountPoint = (AskForMasterMountPoint)message; + // only master contains reference to deviceDataBroker if (deviceDataBroker != null) { - getSender().tell(new RegisterMountPoint(sourceIdentifiers), getSelf()); + LOG.debug("{}: Sending RegisterMountPoint reply to {}", id, askForMasterMountPoint.getSlaveActorRef()); + askForMasterMountPoint.getSlaveActorRef().tell(new RegisterMountPoint(sourceIdentifiers, self()), + sender()); + } else { + LOG.warn("{}: Received {} but we don't appear to be the master", id, askForMasterMountPoint); + sender().tell(new Failure(new NotMasterException(self())), self()); } } else if (message instanceof YangTextSchemaSourceRequest) { // master @@ -167,20 +177,16 @@ public final class NetconfNodeActor extends UntypedActor { sender().tell(t, self()); } } else if (message instanceof InvokeRpcMessage) { // master - final InvokeRpcMessage invokeRpcMessage = (InvokeRpcMessage) message; invokeSlaveRpc(invokeRpcMessage.getSchemaPath(), invokeRpcMessage.getNormalizedNodeMessage(), sender()); } else if (message instanceof RegisterMountPoint) { //slaves - - sourceIdentifiers = ((RegisterMountPoint) message).getSourceIndentifiers(); - registerSlaveMountPoint(getSender()); - + RegisterMountPoint registerMountPoint = (RegisterMountPoint)message; + sourceIdentifiers = registerMountPoint.getSourceIndentifiers(); + registerSlaveMountPoint(registerMountPoint.getMasterActorRef()); + sender().tell(new Success(null), self()); } else if (message instanceof UnregisterSlaveMountPoint) { //slaves - if (slaveSalManager != null) { - slaveSalManager.close(); - slaveSalManager = null; - } + unregisterSlaveMountPoint(); } else if (message instanceof RefreshSlaveActor) { //slave actorResponseWaitTime = ((RefreshSlaveActor) message).getActorResponseWaitTime(); id = ((RefreshSlaveActor) message).getId(); @@ -193,7 +199,19 @@ public final class NetconfNodeActor extends UntypedActor { @Override public void postStop() throws Exception { - super.postStop(); + try { + super.postStop(); + } finally { + unregisterSlaveMountPoint(); + } + } + + private void unregisterSlaveMountPoint() { + if (slaveSalManager != null) { + slaveSalManager.close(); + slaveSalManager = null; + } + closeSchemaSourceRegistrations(); } @@ -247,36 +265,18 @@ public final class NetconfNodeActor extends UntypedActor { } private void registerSlaveMountPoint(final ActorRef masterReference) { - if (this.slaveSalManager != null) { - slaveSalManager.close(); - } - closeSchemaSourceRegistrations(); - slaveSalManager = new SlaveSalFacade(id, setup.getActorSystem(), actorResponseWaitTime, - mountPointService); - - final ListenableFuture remoteSchemaContext = getSchemaContext(masterReference); - final DOMRpcService deviceRpcService = getDOMRpcService(masterReference); + unregisterSlaveMountPoint(); - Futures.addCallback(remoteSchemaContext, new FutureCallback() { - @Override - public void onSuccess(@Nonnull final SchemaContext result) { - LOG.info("{}: Schema context resolved: {}", id, result.getModules()); - slaveSalManager.registerSlaveMountPoint(result, deviceRpcService, masterReference); - } + slaveSalManager = new SlaveSalFacade(id, setup.getActorSystem(), actorResponseWaitTime, mountPointService); - @Override - public void onFailure(@Nonnull final Throwable throwable) { - LOG.error("{}: Failed to register mount point: {}", id, throwable); - } - }, MoreExecutors.directExecutor()); + resolveSchemaContext(createSchemaContextFactory(masterReference), slaveSalManager, masterReference, 1); } private DOMRpcService getDOMRpcService(final ActorRef masterReference) { return new ProxyDOMRpcService(setup.getActorSystem(), masterReference, id, actorResponseWaitTime); } - private ListenableFuture getSchemaContext(final ActorRef masterReference) { - + private SchemaContextFactory createSchemaContextFactory(final ActorRef masterReference) { final RemoteYangTextSourceProvider remoteYangTextSourceProvider = new ProxyYangTextSourceProvider(masterReference, getContext(), actorResponseWaitTime); final RemoteSchemaProvider remoteProvider = new RemoteSchemaProvider(remoteYangTextSourceProvider, @@ -288,10 +288,48 @@ public final class NetconfNodeActor extends UntypedActor { YangTextSchemaSource.class, PotentialSchemaSource.Costs.REMOTE_IO.getValue()))) .collect(Collectors.toList()); - final SchemaContextFactory schemaContextFactory - = schemaRepository.createSchemaContextFactory(SchemaSourceFilter.ALWAYS_ACCEPT); + return schemaRepository.createSchemaContextFactory(SchemaSourceFilter.ALWAYS_ACCEPT); + } - return schemaContextFactory.createSchemaContext(sourceIdentifiers); + private void resolveSchemaContext(final SchemaContextFactory schemaContextFactory, + final SlaveSalFacade localSlaveSalManager, final ActorRef masterReference, int tries) { + final ListenableFuture schemaContextFuture = + schemaContextFactory.createSchemaContext(sourceIdentifiers); + Futures.addCallback(schemaContextFuture, new FutureCallback() { + @Override + public void onSuccess(@Nonnull final SchemaContext result) { + executeInSelf(() -> { + // Make sure the slaveSalManager instance hasn't changed since we initiated the schema context + // resolution. + if (slaveSalManager == localSlaveSalManager) { + LOG.info("{}: Schema context resolved: {} - registering slave mount point", + id, result.getModules()); + slaveSalManager.registerSlaveMountPoint(result, getDOMRpcService(masterReference), + masterReference); + } + }); + } + + @Override + public void onFailure(@Nonnull final Throwable throwable) { + executeInSelf(() -> { + if (slaveSalManager == localSlaveSalManager) { + final Throwable cause = Throwables.getRootCause(throwable); + if (cause instanceof AskTimeoutException) { + if (tries <= 5 || tries % 10 == 0) { + LOG.warn("{}: Failed to resolve schema context - retrying...", id, throwable); + } + + resolveSchemaContext(schemaContextFactory, localSlaveSalManager, + masterReference, tries + 1); + } else { + LOG.error("{}: Failed to resolve schema context - unable to register slave mount point", + id, throwable); + } + } + }); + } + }, MoreExecutors.directExecutor()); } private void closeSchemaSourceRegistrations() { diff --git a/netconf/netconf-topology-singleton/src/main/java/org/opendaylight/netconf/topology/singleton/messages/AskForMasterMountPoint.java b/netconf/netconf-topology-singleton/src/main/java/org/opendaylight/netconf/topology/singleton/messages/AskForMasterMountPoint.java index 4d56e1f17b..7c7fa49b15 100644 --- a/netconf/netconf-topology-singleton/src/main/java/org/opendaylight/netconf/topology/singleton/messages/AskForMasterMountPoint.java +++ b/netconf/netconf-topology-singleton/src/main/java/org/opendaylight/netconf/topology/singleton/messages/AskForMasterMountPoint.java @@ -8,6 +8,7 @@ package org.opendaylight.netconf.topology.singleton.messages; +import akka.actor.ActorRef; import java.io.Serializable; /** @@ -16,4 +17,19 @@ import java.io.Serializable; */ public class AskForMasterMountPoint implements Serializable { private static final long serialVersionUID = 1L; + + private final ActorRef slaveActorRef; + + public AskForMasterMountPoint(ActorRef slaveActorRef) { + this.slaveActorRef = slaveActorRef; + } + + public ActorRef getSlaveActorRef() { + return slaveActorRef; + } + + @Override + public String toString() { + return "AskForMasterMountPoint [slaveActorRef=" + slaveActorRef + "]"; + } } diff --git a/netconf/netconf-topology-singleton/src/main/java/org/opendaylight/netconf/topology/singleton/messages/NotMasterException.java b/netconf/netconf-topology-singleton/src/main/java/org/opendaylight/netconf/topology/singleton/messages/NotMasterException.java new file mode 100644 index 0000000000..a9808063be --- /dev/null +++ b/netconf/netconf-topology-singleton/src/main/java/org/opendaylight/netconf/topology/singleton/messages/NotMasterException.java @@ -0,0 +1,23 @@ +/* + * Copyright (c) 2018 Inocybe Technologies and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.netconf.topology.singleton.messages; + +import akka.actor.ActorRef; + +/** + * Exception reply indicating the recipient is not the master. + * + * @author Thomas Pantelis + */ +public class NotMasterException extends Exception { + private static final long serialVersionUID = 1L; + + public NotMasterException(final ActorRef recipient) { + super("Actor " + recipient + " is not the current master"); + } +} diff --git a/netconf/netconf-topology-singleton/src/main/java/org/opendaylight/netconf/topology/singleton/messages/RegisterMountPoint.java b/netconf/netconf-topology-singleton/src/main/java/org/opendaylight/netconf/topology/singleton/messages/RegisterMountPoint.java index 0023103a5c..fbca23c3c7 100644 --- a/netconf/netconf-topology-singleton/src/main/java/org/opendaylight/netconf/topology/singleton/messages/RegisterMountPoint.java +++ b/netconf/netconf-topology-singleton/src/main/java/org/opendaylight/netconf/topology/singleton/messages/RegisterMountPoint.java @@ -8,6 +8,7 @@ package org.opendaylight.netconf.topology.singleton.messages; +import akka.actor.ActorRef; import java.io.Serializable; import java.util.List; import org.opendaylight.yangtools.yang.model.repo.api.SourceIdentifier; @@ -19,13 +20,24 @@ public class RegisterMountPoint implements Serializable { private static final long serialVersionUID = 1L; private final List allSourceIdentifiers; + private final ActorRef masterActorRef; - public RegisterMountPoint(final List allSourceIdentifiers) { + public RegisterMountPoint(final List allSourceIdentifiers, ActorRef masterActorRef) { this.allSourceIdentifiers = allSourceIdentifiers; + this.masterActorRef = masterActorRef; } public List getSourceIndentifiers() { return allSourceIdentifiers; } + public ActorRef getMasterActorRef() { + return masterActorRef; + } + + @Override + public String toString() { + return "RegisterMountPoint [allSourceIdentifiers=" + allSourceIdentifiers + ", masterActorRef=" + masterActorRef + + "]"; + } } diff --git a/netconf/netconf-topology-singleton/src/main/java/org/opendaylight/netconf/topology/singleton/messages/YangTextSchemaSourceRequest.java b/netconf/netconf-topology-singleton/src/main/java/org/opendaylight/netconf/topology/singleton/messages/YangTextSchemaSourceRequest.java index ffc3d1334b..5f3bb14a71 100644 --- a/netconf/netconf-topology-singleton/src/main/java/org/opendaylight/netconf/topology/singleton/messages/YangTextSchemaSourceRequest.java +++ b/netconf/netconf-topology-singleton/src/main/java/org/opendaylight/netconf/topology/singleton/messages/YangTextSchemaSourceRequest.java @@ -27,4 +27,9 @@ public class YangTextSchemaSourceRequest implements Serializable { public SourceIdentifier getSourceIdentifier() { return sourceIdentifier; } + + @Override + public String toString() { + return "YangTextSchemaSourceRequest [sourceIdentifier=" + sourceIdentifier + "]"; + } } diff --git a/netconf/netconf-topology-singleton/src/test/java/org/opendaylight/netconf/topology/singleton/impl/NetconfNodeActorTest.java b/netconf/netconf-topology-singleton/src/test/java/org/opendaylight/netconf/topology/singleton/impl/NetconfNodeActorTest.java index 485f3c506b..1daa70be8b 100644 --- a/netconf/netconf-topology-singleton/src/test/java/org/opendaylight/netconf/topology/singleton/impl/NetconfNodeActorTest.java +++ b/netconf/netconf-topology-singleton/src/test/java/org/opendaylight/netconf/topology/singleton/impl/NetconfNodeActorTest.java @@ -27,6 +27,7 @@ import akka.actor.Props; import akka.pattern.Patterns; import akka.testkit.JavaTestKit; import akka.testkit.TestActorRef; +import akka.testkit.javadsl.TestKit; import akka.util.Timeout; import com.google.common.base.MoreObjects; import com.google.common.collect.ImmutableList; @@ -181,14 +182,13 @@ public class NetconfNodeActorTest { // test if slave get right identifiers from master - final Future registerMountPointFuture = - Patterns.ask(masterRef, new AskForMasterMountPoint(), - TIMEOUT); + final TestKit kit = new TestKit(system); + + masterRef.tell(new AskForMasterMountPoint(kit.getRef()), ActorRef.noSender()); - final RegisterMountPoint success = - (RegisterMountPoint) Await.result(registerMountPointFuture, TIMEOUT.duration()); + final RegisterMountPoint registerMountPoint = kit.expectMsgClass(RegisterMountPoint.class); - assertEquals(sourceIdentifiers, success.getSourceIndentifiers()); + assertEquals(sourceIdentifiers, registerMountPoint.getSourceIndentifiers()); } @@ -213,7 +213,7 @@ public class NetconfNodeActorTest { system.actorOf(NetconfNodeActor.props(setup, remoteDeviceId, registry, schemaRepository, TIMEOUT, mountPointService)); final List sources = ImmutableList.of(yang1, yang2); - slaveRef.tell(new RegisterMountPoint(sources), masterRef); + slaveRef.tell(new RegisterMountPoint(sources, masterRef), masterRef); verify(registry, timeout(1000)).registerSchemaSource(any(), withSourceId(yang1)); verify(registry, timeout(1000)).registerSchemaSource(any(), withSourceId(yang2)); diff --git a/netconf/netconf-topology-singleton/src/test/java/org/opendaylight/netconf/topology/singleton/impl/NetconfNodeManagerTest.java b/netconf/netconf-topology-singleton/src/test/java/org/opendaylight/netconf/topology/singleton/impl/NetconfNodeManagerTest.java new file mode 100644 index 0000000000..873038de33 --- /dev/null +++ b/netconf/netconf-topology-singleton/src/test/java/org/opendaylight/netconf/topology/singleton/impl/NetconfNodeManagerTest.java @@ -0,0 +1,410 @@ +/* + * Copyright (c) 2018 Inocybe Technologies and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.netconf.topology.singleton.impl; + +import static java.nio.charset.StandardCharsets.UTF_8; +import static org.mockito.Matchers.any; +import static org.mockito.Matchers.eq; +import static org.mockito.Mockito.after; +import static org.mockito.Mockito.doNothing; +import static org.mockito.Mockito.doReturn; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.reset; +import static org.mockito.Mockito.timeout; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.verifyNoMoreInteractions; +import static org.mockito.MockitoAnnotations.initMocks; +import static org.opendaylight.controller.md.sal.binding.api.DataObjectModification.ModificationType.DELETE; +import static org.opendaylight.controller.md.sal.binding.api.DataObjectModification.ModificationType.SUBTREE_MODIFIED; +import static org.opendaylight.controller.md.sal.binding.api.DataObjectModification.ModificationType.WRITE; + +import akka.actor.ActorSystem; +import akka.actor.Props; +import akka.cluster.Cluster; +import akka.dispatch.Dispatchers; +import akka.testkit.TestActorRef; +import akka.testkit.javadsl.TestKit; +import akka.util.Timeout; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.Iterables; +import com.google.common.io.ByteSource; +import com.google.common.util.concurrent.Futures; +import com.typesafe.config.ConfigFactory; +import java.net.InetSocketAddress; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import java.util.stream.Collectors; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import org.mockito.Mock; +import org.opendaylight.controller.md.sal.binding.api.DataBroker; +import org.opendaylight.controller.md.sal.binding.api.DataObjectModification; +import org.opendaylight.controller.md.sal.binding.api.DataTreeIdentifier; +import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType; +import org.opendaylight.controller.md.sal.dom.api.DOMDataBroker; +import org.opendaylight.controller.md.sal.dom.api.DOMMountPoint; +import org.opendaylight.controller.md.sal.dom.api.DOMMountPointService; +import org.opendaylight.controller.md.sal.dom.api.DOMNotificationService; +import org.opendaylight.controller.md.sal.dom.api.DOMRpcService; +import org.opendaylight.netconf.sal.connect.api.NetconfDeviceSchemasResolver; +import org.opendaylight.netconf.sal.connect.netconf.NetconfDevice; +import org.opendaylight.netconf.sal.connect.util.RemoteDeviceId; +import org.opendaylight.netconf.topology.singleton.impl.actors.NetconfNodeActor; +import org.opendaylight.netconf.topology.singleton.impl.utils.NetconfTopologySetup; +import org.opendaylight.netconf.topology.singleton.impl.utils.NetconfTopologyUtils; +import org.opendaylight.netconf.topology.singleton.messages.AskForMasterMountPoint; +import org.opendaylight.netconf.topology.singleton.messages.CreateInitialMasterActorData; +import org.opendaylight.netconf.topology.singleton.messages.MasterActorDataInitialized; +import org.opendaylight.netconf.topology.singleton.messages.YangTextSchemaSourceRequest; +import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.inet.types.rev130715.Host; +import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.inet.types.rev130715.IpAddress; +import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.inet.types.rev130715.Ipv4Address; +import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.inet.types.rev130715.PortNumber; +import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.NetconfNode; +import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.NetconfNodeBuilder; +import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.NetconfNodeConnectionStatus; +import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.netconf.node.connection.status.ClusteredConnectionStatusBuilder; +import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.NodeId; +import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.Node; +import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.NodeBuilder; +import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.NodeKey; +import org.opendaylight.yangtools.concepts.ListenerRegistration; +import org.opendaylight.yangtools.concepts.ObjectRegistration; +import org.opendaylight.yangtools.yang.binding.InstanceIdentifier; +import org.opendaylight.yangtools.yang.model.repo.api.RevisionSourceIdentifier; +import org.opendaylight.yangtools.yang.model.repo.api.SchemaContextFactory; +import org.opendaylight.yangtools.yang.model.repo.api.SourceIdentifier; +import org.opendaylight.yangtools.yang.model.repo.api.YangTextSchemaSource; +import org.opendaylight.yangtools.yang.model.repo.spi.PotentialSchemaSource; +import org.opendaylight.yangtools.yang.parser.repo.SharedSchemaRepository; +import org.opendaylight.yangtools.yang.parser.rfc7950.repo.TextToASTTransformer; + +/** + * Unit tests for NetconfNodeManager. + * + * @author Thomas Pantelis + */ +public class NetconfNodeManagerTest { + private static final String ACTOR_SYSTEM_NAME = "test"; + private static final RemoteDeviceId DEVICE_ID = new RemoteDeviceId("device", new InetSocketAddress(65535)); + private static final List SOURCE_IDENTIFIERS = + ImmutableList.of(RevisionSourceIdentifier.create("testID")); + + @Mock + private DOMMountPointService mockMountPointService; + + @Mock + private DOMMountPointService.DOMMountPointBuilder mockMountPointBuilder; + + @Mock + private ObjectRegistration mockMountPointReg; + + @Mock + private DataBroker mockDataBroker; + + @Mock + private DOMDataBroker mockDeviceDataBroker; + + @Mock + private DOMRpcService mockRpcService; + + @Mock + private NetconfDeviceSchemasResolver mockSchemasResolver; + + @Mock + private SchemaContextFactory mockSchemaContextFactory; + + private ActorSystem slaveSystem; + private ActorSystem masterSystem; + private TestActorRef testMasterActorRef; + private NetconfNodeManager netconfNodeManager; + private String masterAddress; + + @Before + public void setup() { + initMocks(this); + + final Timeout responseTimeout = Timeout.apply(1, TimeUnit.SECONDS); + + slaveSystem = ActorSystem.create(ACTOR_SYSTEM_NAME, ConfigFactory.load().getConfig("Slave")); + masterSystem = ActorSystem.create(ACTOR_SYSTEM_NAME, ConfigFactory.load().getConfig("Master")); + + masterAddress = Cluster.get(masterSystem).selfAddress().toString(); + + SharedSchemaRepository masterSchemaRepository = new SharedSchemaRepository("master"); + masterSchemaRepository.registerSchemaSourceListener( + TextToASTTransformer.create(masterSchemaRepository, masterSchemaRepository)); + + String yangTemplate = + "module ID {" + + " namespace \"ID\";" + + " prefix ID;" + + "}"; + + SOURCE_IDENTIFIERS.stream().map( + sourceId -> masterSchemaRepository.registerSchemaSource( + id -> Futures.immediateFuture(YangTextSchemaSource.delegateForByteSource(id, + ByteSource.wrap(yangTemplate.replaceAll("ID", id.getName()).getBytes(UTF_8)))), + PotentialSchemaSource.create(sourceId, YangTextSchemaSource.class, 1))) + .collect(Collectors.toList()); + + NetconfTopologySetup masterSetup = new NetconfTopologySetup.NetconfTopologySetupBuilder() + .setActorSystem(masterSystem).setDataBroker(mockDataBroker).setSchemaResourceDTO( + new NetconfDevice.SchemaResourcesDTO(masterSchemaRepository, masterSchemaRepository, + mockSchemaContextFactory, mockSchemasResolver)).build(); + + testMasterActorRef = TestActorRef.create(masterSystem, Props.create(TestMasterActor.class, masterSetup, + DEVICE_ID, responseTimeout, mockMountPointService).withDispatcher(Dispatchers.DefaultDispatcherId()), + NetconfTopologyUtils.createMasterActorName(DEVICE_ID.getName(), masterAddress)); + + SharedSchemaRepository slaveSchemaRepository = new SharedSchemaRepository("slave"); + slaveSchemaRepository.registerSchemaSourceListener( + TextToASTTransformer.create(slaveSchemaRepository, slaveSchemaRepository)); + + NetconfTopologySetup slaveSetup = new NetconfTopologySetup.NetconfTopologySetupBuilder() + .setActorSystem(slaveSystem).setDataBroker(mockDataBroker).setSchemaResourceDTO( + new NetconfDevice.SchemaResourcesDTO(slaveSchemaRepository, slaveSchemaRepository, + mockSchemaContextFactory, mockSchemasResolver)).build(); + + netconfNodeManager = new NetconfNodeManager(slaveSetup, DEVICE_ID, responseTimeout, + mockMountPointService); + + setupMountPointMocks(); + } + + @After + public void teardown() { + TestKit.shutdownActorSystem(slaveSystem, Boolean.TRUE); + TestKit.shutdownActorSystem(masterSystem, Boolean.TRUE); + } + + @SuppressWarnings("unchecked") + @Test + public void testSlaveMountPointRegistration() throws InterruptedException, ExecutionException, TimeoutException { + initializeMaster(); + + ListenerRegistration mockListenerReg = mock(ListenerRegistration.class); + doReturn(mockListenerReg).when(mockDataBroker).registerDataTreeChangeListener(any(), any()); + + final NodeId nodeId = new NodeId("device"); + final NodeKey nodeKey = new NodeKey(nodeId); + final String topologyId = "topology-netconf"; + final InstanceIdentifier nodeListPath = NetconfTopologyUtils.createTopologyNodeListPath( + nodeKey, topologyId); + + netconfNodeManager.registerDataTreeChangeListener(topologyId, nodeKey); + verify(mockDataBroker).registerDataTreeChangeListener(any(), eq(netconfNodeManager)); + + // Invoke onDataTreeChanged with a NetconfNode WRITE to simulate the master writing the operational state to + // Connected. Expect the slave mount point created and registered. + + final NetconfNode netconfNode = newNetconfNode(); + final Node node = new NodeBuilder().setNodeId(nodeId).addAugmentation(NetconfNode.class, netconfNode).build(); + + DataObjectModification mockDataObjModification = mock(DataObjectModification.class); + doReturn(Iterables.getLast(nodeListPath.getPathArguments())).when(mockDataObjModification).getIdentifier(); + doReturn(WRITE).when(mockDataObjModification).getModificationType(); + doReturn(node).when(mockDataObjModification).getDataAfter(); + + netconfNodeManager.onDataTreeChanged(Collections.singletonList( + new NetconfTopologyManagerTest.CustomTreeModification(new DataTreeIdentifier<>( + LogicalDatastoreType.OPERATIONAL, nodeListPath), mockDataObjModification))); + + verify(mockMountPointBuilder, timeout(5000)).register(); + verify(mockMountPointBuilder).addService(eq(DOMDataBroker.class), any()); + verify(mockMountPointBuilder).addService(eq(DOMRpcService.class), any()); + verify(mockMountPointBuilder).addService(eq(DOMNotificationService.class), any()); + verify(mockMountPointService).createMountPoint(DEVICE_ID.getTopologyPath()); + + // Notify that the NetconfNode operational state was deleted. Expect the slave mount point closed. + + doReturn(DELETE).when(mockDataObjModification).getModificationType(); + doReturn(node).when(mockDataObjModification).getDataBefore(); + doReturn(null).when(mockDataObjModification).getDataAfter(); + + netconfNodeManager.onDataTreeChanged(Collections.singletonList( + new NetconfTopologyManagerTest.CustomTreeModification(new DataTreeIdentifier<>( + LogicalDatastoreType.OPERATIONAL, nodeListPath), mockDataObjModification))); + + verify(mockMountPointReg, timeout(5000)).close(); + + // Notify with a NetconfNode operational state WRITE. Expect the slave mount point re-created. + + setupMountPointMocks(); + + doReturn(WRITE).when(mockDataObjModification).getModificationType(); + doReturn(null).when(mockDataObjModification).getDataBefore(); + doReturn(node).when(mockDataObjModification).getDataAfter(); + + netconfNodeManager.onDataTreeChanged(Collections.singletonList( + new NetconfTopologyManagerTest.CustomTreeModification(new DataTreeIdentifier<>( + LogicalDatastoreType.OPERATIONAL, nodeListPath), mockDataObjModification))); + + verify(mockMountPointBuilder, timeout(5000)).register(); + + // Notify again with a NetconfNode operational state WRITE. Expect the prior slave mount point closed and + // and a new one registered. + + setupMountPointMocks(); + + doReturn(node).when(mockDataObjModification).getDataBefore(); + + netconfNodeManager.onDataTreeChanged(Collections.singletonList( + new NetconfTopologyManagerTest.CustomTreeModification(new DataTreeIdentifier<>( + LogicalDatastoreType.OPERATIONAL, nodeListPath), mockDataObjModification))); + + verify(mockMountPointReg, timeout(5000)).close(); + verify(mockMountPointBuilder, timeout(5000)).register(); + + // Notify that the NetconfNode operational state was changed to UnableToConnect. Expect the slave mount point + // closed. + + setupMountPointMocks(); + + final Node updatedNode = new NodeBuilder().setNodeId(nodeId) + .addAugmentation(NetconfNode.class, new NetconfNodeBuilder(netconfNode) + .setConnectionStatus(NetconfNodeConnectionStatus.ConnectionStatus.UnableToConnect) + .build()).build(); + + doReturn(SUBTREE_MODIFIED).when(mockDataObjModification).getModificationType(); + doReturn(node).when(mockDataObjModification).getDataBefore(); + doReturn(updatedNode).when(mockDataObjModification).getDataAfter(); + + netconfNodeManager.onDataTreeChanged(Collections.singletonList( + new NetconfTopologyManagerTest.CustomTreeModification(new DataTreeIdentifier<>( + LogicalDatastoreType.OPERATIONAL, nodeListPath), mockDataObjModification))); + + verify(mockMountPointReg, timeout(5000)).close(); + + netconfNodeManager.close(); + verifyNoMoreInteractions(mockMountPointReg); + } + + @SuppressWarnings("unchecked") + @Test + public void testSlaveMountPointRegistrationFailuresAndRetries() + throws InterruptedException, ExecutionException, TimeoutException { + final NodeId nodeId = new NodeId("device"); + final NodeKey nodeKey = new NodeKey(nodeId); + final String topologyId = "topology-netconf"; + final InstanceIdentifier nodeListPath = NetconfTopologyUtils.createTopologyNodeListPath( + nodeKey, topologyId); + + final NetconfNode netconfNode = newNetconfNode(); + final Node node = new NodeBuilder().setNodeId(nodeId).addAugmentation(NetconfNode.class, netconfNode).build(); + + DataObjectModification mockDataObjModification = mock(DataObjectModification.class); + doReturn(Iterables.getLast(nodeListPath.getPathArguments())).when(mockDataObjModification).getIdentifier(); + doReturn(WRITE).when(mockDataObjModification).getModificationType(); + doReturn(node).when(mockDataObjModification).getDataAfter(); + + // First try the registration where the perceived master hasn't been initialized as the master. + + netconfNodeManager.onDataTreeChanged(Collections.singletonList( + new NetconfTopologyManagerTest.CustomTreeModification(new DataTreeIdentifier<>( + LogicalDatastoreType.OPERATIONAL, nodeListPath), mockDataObjModification))); + + verify(mockMountPointBuilder, after(1000).never()).register(); + + // Initialize the master but drop the initial YangTextSchemaSourceRequest message sent to the master so + // it retries. + + initializeMaster(); + + CompletableFuture yangTextSchemaSourceRequestFuture = new CompletableFuture<>(); + testMasterActorRef.underlyingActor().messagesToDrop.put(YangTextSchemaSourceRequest.class, + yangTextSchemaSourceRequestFuture); + + netconfNodeManager.onDataTreeChanged(Collections.singletonList( + new NetconfTopologyManagerTest.CustomTreeModification(new DataTreeIdentifier<>( + LogicalDatastoreType.OPERATIONAL, nodeListPath), mockDataObjModification))); + + yangTextSchemaSourceRequestFuture.get(5, TimeUnit.SECONDS); + verify(mockMountPointBuilder, timeout(5000)).register(); + + // Initiate another registration but drop the initial AskForMasterMountPoint message sent to the master so + // it retries. + + setupMountPointMocks(); + + CompletableFuture askForMasterMountPointFuture = new CompletableFuture<>(); + testMasterActorRef.underlyingActor().messagesToDrop.put(AskForMasterMountPoint.class, + askForMasterMountPointFuture); + + netconfNodeManager.onDataTreeChanged(Collections.singletonList( + new NetconfTopologyManagerTest.CustomTreeModification(new DataTreeIdentifier<>( + LogicalDatastoreType.OPERATIONAL, nodeListPath), mockDataObjModification))); + + askForMasterMountPointFuture.get(5, TimeUnit.SECONDS); + verify(mockMountPointReg, timeout(5000)).close(); + verify(mockMountPointBuilder, timeout(5000)).register(); + + setupMountPointMocks(); + netconfNodeManager.close(); + verify(mockMountPointReg, timeout(5000)).close(); + } + + private NetconfNode newNetconfNode() { + return new NetconfNodeBuilder() + .setHost(new Host(new IpAddress(new Ipv4Address("127.0.0.1")))) + .setPort(new PortNumber(9999)) + .setConnectionStatus(NetconfNodeConnectionStatus.ConnectionStatus.Connected) + .setClusteredConnectionStatus(new ClusteredConnectionStatusBuilder() + .setNetconfMasterNode(masterAddress).build()) + .build(); + } + + private void setupMountPointMocks() { + reset(mockMountPointService, mockMountPointBuilder, mockMountPointReg); + + doNothing().when(mockMountPointReg).close(); + + doReturn(mockMountPointBuilder).when(mockMountPointBuilder).addInitialSchemaContext(any()); + doReturn(mockMountPointBuilder).when(mockMountPointBuilder).addService(any(), any()); + doReturn(mockMountPointReg).when(mockMountPointBuilder).register(); + + doReturn(mockMountPointBuilder).when(mockMountPointService).createMountPoint(any()); + } + + private void initializeMaster() { + TestKit kit = new TestKit(masterSystem); + + testMasterActorRef.tell(new CreateInitialMasterActorData(mockDeviceDataBroker, SOURCE_IDENTIFIERS, + mockRpcService), kit.getRef()); + + kit.expectMsgClass(MasterActorDataInitialized.class); + } + + private static class TestMasterActor extends NetconfNodeActor { + final Map, CompletableFuture> messagesToDrop = new ConcurrentHashMap<>(); + + TestMasterActor(NetconfTopologySetup setup, RemoteDeviceId deviceId, Timeout actorResponseWaitTime, + DOMMountPointService mountPointService) { + super(setup, deviceId, setup.getSchemaResourcesDTO().getSchemaRegistry(), + setup.getSchemaResourcesDTO().getSchemaRepository(), actorResponseWaitTime, mountPointService); + } + + @SuppressWarnings({ "rawtypes", "unchecked" }) + @Override + public void handleReceive(Object message) throws Exception { + CompletableFuture dropFuture = messagesToDrop.remove(message.getClass()); + if (dropFuture != null) { + dropFuture.complete(message); + } else { + super.handleReceive(message); + } + } + } +} diff --git a/netconf/netconf-topology-singleton/src/test/java/org/opendaylight/netconf/topology/singleton/impl/NetconfTopologyManagerTest.java b/netconf/netconf-topology-singleton/src/test/java/org/opendaylight/netconf/topology/singleton/impl/NetconfTopologyManagerTest.java index 6a0e02e678..b3d4be1cde 100644 --- a/netconf/netconf-topology-singleton/src/test/java/org/opendaylight/netconf/topology/singleton/impl/NetconfTopologyManagerTest.java +++ b/netconf/netconf-topology-singleton/src/test/java/org/opendaylight/netconf/topology/singleton/impl/NetconfTopologyManagerTest.java @@ -274,7 +274,7 @@ public class NetconfTopologyManagerTest { } } - private class CustomTreeModification implements DataTreeModification { + static class CustomTreeModification implements DataTreeModification { private final DataTreeIdentifier rootPath; private final DataObjectModification rootNode; diff --git a/netconf/netconf-topology-singleton/src/test/resources/application.conf b/netconf/netconf-topology-singleton/src/test/resources/application.conf new file mode 100644 index 0000000000..86b5453e86 --- /dev/null +++ b/netconf/netconf-topology-singleton/src/test/resources/application.conf @@ -0,0 +1,41 @@ +Slave { + akka { + actor { + provider = "akka.cluster.ClusterActorRefProvider" + warn-about-java-serializer-usage = false + } + remote { + netty.tcp { + hostname = "127.0.0.1" + port = 2550 + } + } + + cluster { + roles = [ + "slave" + ] + } + } +} + +Master { + akka { + actor { + provider = "akka.cluster.ClusterActorRefProvider" + warn-about-java-serializer-usage = false + } + remote { + netty.tcp { + hostname = "127.0.0.1" + port = 2552 + } + } + + cluster { + roles = [ + "master" + ] + } + } +} diff --git a/netconf/netconf-topology-singleton/src/test/resources/simplelogger.properties b/netconf/netconf-topology-singleton/src/test/resources/simplelogger.properties new file mode 100644 index 0000000000..b4f3775146 --- /dev/null +++ b/netconf/netconf-topology-singleton/src/test/resources/simplelogger.properties @@ -0,0 +1,6 @@ +org.slf4j.simpleLogger.showDateTime=true +org.slf4j.simpleLogger.dateTimeFormat=hh:mm:ss,S a +org.slf4j.simpleLogger.logFile=System.out +org.slf4j.simpleLogger.showShortLogName=true +org.slf4j.simpleLogger.levelInBrackets=true +org.slf4j.simpleLogger.log.org.opendaylight.netconf.topology.singleton.impl=debug -- 2.36.6