Slave mount point registration hardening/resiliency 98/71898/6
authorTom Pantelis <tompantelis@gmail.com>
Thu, 10 May 2018 01:38:01 +0000 (21:38 -0400)
committerTom Pantelis <tompantelis@gmail.com>
Thu, 10 May 2018 23:18:37 +0000 (19:18 -0400)
- It's possible for a message to the master to timeout in which
case the slave mount point doesn't get registered so add retries
for the AskForMasterMountPoint message in NetconfNodeManager and
schema resolution in NetconfNodeActor. The AskForMasterMountPoint
and RegisterMountPoint messages were changed to pass the
slave/master ActorRefs in order to send a success/failure reply to
the original caller.

- In NetconfNodeActor, preserve actor safety by executing
ListenableFuture callbacks via the actor's dispatch queue
(using executeInSelf) if the callback accesses actor state.

- Hardened synchronization in NetconfNodeManager and NetconfNodeManager.

- Added UTs for NetconfNodeManager to cover the slave mount point
registration scenarios.

- Added more logging where appropriate

Change-Id: I31c2d6584d56cb87c1b266565bc4b6f4a48ff303
Signed-off-by: Tom Pantelis <tompantelis@gmail.com>
13 files changed:
netconf/netconf-topology-singleton/src/main/java/org/opendaylight/netconf/topology/singleton/impl/NetconfNodeManager.java
netconf/netconf-topology-singleton/src/main/java/org/opendaylight/netconf/topology/singleton/impl/NetconfTopologyContext.java
netconf/netconf-topology-singleton/src/main/java/org/opendaylight/netconf/topology/singleton/impl/SlaveSalFacade.java
netconf/netconf-topology-singleton/src/main/java/org/opendaylight/netconf/topology/singleton/impl/actors/NetconfNodeActor.java
netconf/netconf-topology-singleton/src/main/java/org/opendaylight/netconf/topology/singleton/messages/AskForMasterMountPoint.java
netconf/netconf-topology-singleton/src/main/java/org/opendaylight/netconf/topology/singleton/messages/NotMasterException.java [new file with mode: 0644]
netconf/netconf-topology-singleton/src/main/java/org/opendaylight/netconf/topology/singleton/messages/RegisterMountPoint.java
netconf/netconf-topology-singleton/src/main/java/org/opendaylight/netconf/topology/singleton/messages/YangTextSchemaSourceRequest.java
netconf/netconf-topology-singleton/src/test/java/org/opendaylight/netconf/topology/singleton/impl/NetconfNodeActorTest.java
netconf/netconf-topology-singleton/src/test/java/org/opendaylight/netconf/topology/singleton/impl/NetconfNodeManagerTest.java [new file with mode: 0644]
netconf/netconf-topology-singleton/src/test/java/org/opendaylight/netconf/topology/singleton/impl/NetconfTopologyManagerTest.java
netconf/netconf-topology-singleton/src/test/resources/application.conf [new file with mode: 0644]
netconf/netconf-topology-singleton/src/test/resources/simplelogger.properties [new file with mode: 0644]

index e78e9ee9c7ba55984e2b9fbf962861c8a2418aab..529a538fe76b5f7f86fe45a2ecb5aef8c26c98c3 100644 (file)
@@ -9,10 +9,15 @@
 package org.opendaylight.netconf.topology.singleton.impl;
 
 import akka.actor.ActorRef;
+import akka.actor.ActorSelection;
 import akka.actor.PoisonPill;
+import akka.dispatch.OnComplete;
+import akka.pattern.AskTimeoutException;
+import akka.pattern.Patterns;
 import akka.util.Timeout;
 import java.util.Collection;
 import javax.annotation.Nonnull;
+import javax.annotation.concurrent.GuardedBy;
 import org.opendaylight.controller.md.sal.binding.api.ClusteredDataTreeChangeListener;
 import org.opendaylight.controller.md.sal.binding.api.DataObjectModification;
 import org.opendaylight.controller.md.sal.binding.api.DataTreeIdentifier;
@@ -37,6 +42,7 @@ import org.opendaylight.yangtools.yang.model.repo.api.SchemaRepository;
 import org.opendaylight.yangtools.yang.model.repo.spi.SchemaSourceRegistry;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import scala.concurrent.Future;
 
 /**
  * Managing and reacting on data tree changes in specific netconf node when master writes status to the operational
@@ -52,11 +58,18 @@ class NetconfNodeManager
     private final SchemaSourceRegistry schemaRegistry;
     private final SchemaRepository schemaRepository;
 
-    private NetconfTopologySetup setup;
-    private ListenerRegistration<NetconfNodeManager> dataChangeListenerRegistration;
-    private RemoteDeviceId id;
+    private volatile NetconfTopologySetup setup;
+    private volatile ListenerRegistration<NetconfNodeManager> dataChangeListenerRegistration;
+    private volatile RemoteDeviceId id;
+
+    @GuardedBy("this")
     private ActorRef slaveActorRef;
 
+    @GuardedBy("this")
+    private boolean closed;
+
+    @GuardedBy("this")
+    private int lastUpdateCount;
 
     NetconfNodeManager(final NetconfTopologySetup setup,
                        final RemoteDeviceId id, final Timeout actorResponseWaitTime,
@@ -76,21 +89,22 @@ class NetconfNodeManager
             final NodeId nodeId = NetconfTopologyUtils.getNodeId(rootNode.getIdentifier());
             switch (rootNode.getModificationType()) {
                 case SUBTREE_MODIFIED:
-                    LOG.debug("{}: Operational for node {} updated. Trying to register slave mount point", id, nodeId);
+                    LOG.debug("{}: Operational state for node {} - subtree modified from {} to {}",
+                            id, nodeId, rootNode.getDataBefore(), rootNode.getDataAfter());
                     handleSlaveMountPoint(rootNode);
                     break;
                 case WRITE:
                     if (rootNode.getDataBefore() != null) {
-                        LOG.debug("{}: Operational for node {} rewrited. Trying to register slave mount point",
-                                id, nodeId);
+                        LOG.debug("{}: Operational state for node {} updated from {} to {}",
+                                id, nodeId, rootNode.getDataBefore(), rootNode.getDataAfter());
                     } else {
-                        LOG.debug("{}: Operational for node {} created. Trying to register slave mount point",
-                                id, nodeId);
+                        LOG.debug("{}: Operational state for node {} created: {}",
+                                id, nodeId, rootNode.getDataAfter());
                     }
                     handleSlaveMountPoint(rootNode);
                     break;
                 case DELETE:
-                    LOG.debug("{}: Operational for node {} deleted.", id, nodeId);
+                    LOG.debug("{}: Operational state for node {} deleted.", id, nodeId);
                     unregisterSlaveMountpoint();
                     break;
                 default:
@@ -100,8 +114,12 @@ class NetconfNodeManager
     }
 
     @Override
-    public void close() {
-        unregisterSlaveMountpoint();
+    public synchronized void close() {
+        if (closed) {
+            return;
+        }
+
+        closed = true;
         closeActor();
         if (dataChangeListenerRegistration != null) {
             dataChangeListenerRegistration.close();
@@ -109,6 +127,7 @@ class NetconfNodeManager
         }
     }
 
+    @GuardedBy("this")
     private void closeActor() {
         if (slaveActorRef != null) {
             LOG.debug("{}: Sending poison pill to {}", id, slaveActorRef);
@@ -117,7 +136,8 @@ class NetconfNodeManager
         }
     }
 
-    private void unregisterSlaveMountpoint() {
+    private synchronized void unregisterSlaveMountpoint() {
+        lastUpdateCount++;
         if (slaveActorRef != null) {
             LOG.debug("{}: Sending message to unregister slave mountpoint to {}", id, slaveActorRef);
             slaveActorRef.tell(new UnregisterSlaveMountPoint(), ActorRef.noSender());
@@ -131,22 +151,65 @@ class NetconfNodeManager
                         NetconfTopologyUtils.createTopologyNodeListPath(key, topologyId)), this);
     }
 
-    private void handleSlaveMountPoint(final DataObjectModification<Node> rootNode) {
+    private synchronized void handleSlaveMountPoint(final DataObjectModification<Node> rootNode) {
+        if (closed) {
+            return;
+        }
+
         @SuppressWarnings("ConstantConditions")
         final NetconfNode netconfNodeAfter = rootNode.getDataAfter().getAugmentation(NetconfNode.class);
 
         if (NetconfNodeConnectionStatus.ConnectionStatus.Connected.equals(netconfNodeAfter.getConnectionStatus())) {
+            lastUpdateCount++;
             createOrUpdateActorRef();
+
             final String masterAddress = netconfNodeAfter.getClusteredConnectionStatus().getNetconfMasterNode();
-            final String path = NetconfTopologyUtils.createActorPath(masterAddress,
+            final String masterActorPath = NetconfTopologyUtils.createActorPath(masterAddress,
                     NetconfTopologyUtils.createMasterActorName(id.getName(),
                             netconfNodeAfter.getClusteredConnectionStatus().getNetconfMasterNode()));
-            setup.getActorSystem().actorSelection(path).tell(new AskForMasterMountPoint(), slaveActorRef);
+
+            final AskForMasterMountPoint askForMasterMountPoint = new AskForMasterMountPoint(slaveActorRef);
+            final ActorSelection masterActor = setup.getActorSystem().actorSelection(masterActorPath);
+
+            LOG.debug("{}: Sending {} message to master {}", id, askForMasterMountPoint, masterActor);
+
+            sendAskForMasterMountPointWithRetries(askForMasterMountPoint, masterActor, 1, lastUpdateCount);
         } else {
             unregisterSlaveMountpoint();
         }
     }
 
+    @GuardedBy("this")
+    private void sendAskForMasterMountPointWithRetries(final AskForMasterMountPoint askForMasterMountPoint,
+            final ActorSelection masterActor, final int tries, final int updateCount) {
+        final Future<Object> future = Patterns.ask(masterActor, askForMasterMountPoint, actorResponseWaitTime);
+        future.onComplete(new OnComplete<Object>() {
+            @Override
+            public void onComplete(final Throwable failure, final Object response) {
+                synchronized (this) {
+                    // Ignore the response if we were since closed or another notification update occurred.
+                    if (closed || updateCount != lastUpdateCount) {
+                        return;
+                    }
+
+                    if (failure instanceof AskTimeoutException) {
+                        if (tries <= 5 || tries % 10 == 0) {
+                            LOG.warn("{}: Failed to send message to {} - retrying...", id, masterActor, failure);
+                        }
+                        sendAskForMasterMountPointWithRetries(askForMasterMountPoint, masterActor, tries + 1,
+                                updateCount);
+                    } else if (failure != null) {
+                        LOG.error("{}: Failed to send message {} to {}. Slave mount point could not be created",
+                                id, askForMasterMountPoint, masterActor, failure);
+                    } else {
+                        LOG.debug("{}: {} message to {} succeeded", id, askForMasterMountPoint, masterActor);
+                    }
+                }
+            }
+        }, setup.getActorSystem().dispatcher());
+    }
+
+    @GuardedBy("this")
     private void createOrUpdateActorRef() {
         if (slaveActorRef == null) {
             slaveActorRef = setup.getActorSystem().actorOf(NetconfNodeActor.props(setup, id, schemaRegistry,
index 0ac355d6dc0e4b53059e953b43d474cbd21b2ac4..11cd117b1ec3501015fe2689704d3762c1bda52a 100644 (file)
@@ -47,9 +47,9 @@ class NetconfTopologyContext implements ClusterSingletonService {
     private RemoteDeviceConnector remoteDeviceConnector;
     private NetconfNodeManager netconfNodeManager;
     private ActorRef masterActorRef;
-    private boolean finalClose = false;
+    private final AtomicBoolean finalClose = new AtomicBoolean(false);
     private final AtomicBoolean closed = new AtomicBoolean(false);
-    private boolean isMaster;
+    private volatile boolean isMaster;
 
     NetconfTopologyContext(final NetconfTopologySetup netconfTopologyDeviceSetup,
                            final ServiceGroupIdentifier serviceGroupIdent,
@@ -80,7 +80,7 @@ class NetconfTopologyContext implements ClusterSingletonService {
             netconfNodeManager = null;
         }
 
-        if (!finalClose) {
+        if (!finalClose.get()) {
             final String masterAddress =
                     Cluster.get(netconfTopologyDeviceSetup.getActorSystem()).selfAddress().toString();
             masterActorRef = netconfTopologyDeviceSetup.getActorSystem().actorOf(NetconfNodeActor.props(
@@ -97,7 +97,7 @@ class NetconfTopologyContext implements ClusterSingletonService {
     @Override
     public ListenableFuture<Void> closeServiceInstance() {
 
-        if (!finalClose) {
+        if (!finalClose.get()) {
             // in case that master changes role to slave, new NodeDeviceManager must be created and listener registered
             netconfNodeManager = createNodeDeviceManager();
         }
@@ -121,7 +121,9 @@ class NetconfTopologyContext implements ClusterSingletonService {
     }
 
     void closeFinal() throws Exception {
-        finalClose = true;
+        if (!finalClose.compareAndSet(false, true)) {
+            return;
+        }
 
         if (netconfNodeManager != null) {
             netconfNodeManager.close();
index 0abbdaa0ba58aa3d609b4a2d035c2e67d7c4b674..795ba78c75c3235d11b80f1322fad208a406424c 100644 (file)
@@ -11,6 +11,7 @@ package org.opendaylight.netconf.topology.singleton.impl;
 import akka.actor.ActorRef;
 import akka.actor.ActorSystem;
 import akka.util.Timeout;
+import java.util.concurrent.atomic.AtomicBoolean;
 import org.opendaylight.controller.md.sal.dom.api.DOMMountPointService;
 import org.opendaylight.controller.md.sal.dom.api.DOMRpcService;
 import org.opendaylight.netconf.sal.connect.netconf.sal.NetconfDeviceNotificationService;
@@ -28,6 +29,7 @@ public class SlaveSalFacade {
     private final NetconfDeviceSalProvider salProvider;
     private final ActorSystem actorSystem;
     private final Timeout actorResponseWaitTime;
+    private final AtomicBoolean registered = new AtomicBoolean(false);
 
     public SlaveSalFacade(final RemoteDeviceId id,
                           final ActorSystem actorSystem,
@@ -41,6 +43,10 @@ public class SlaveSalFacade {
 
     public void registerSlaveMountPoint(final SchemaContext remoteSchemaContext, final DOMRpcService deviceRpc,
                                         final ActorRef masterActorRef) {
+        if (!registered.compareAndSet(false, true)) {
+            return;
+        }
+
         final NetconfDeviceNotificationService notificationService = new NetconfDeviceNotificationService();
 
         final ProxyDOMDataBroker netconfDeviceDataBroker =
@@ -52,20 +58,13 @@ public class SlaveSalFacade {
         LOG.info("{}: Slave mount point registered.", id);
     }
 
-    public void unregisterSlaveMountPoint() {
-        salProvider.getMountInstance().onTopologyDeviceDisconnected();
-    }
-
-    @SuppressWarnings("checkstyle:IllegalCatch")
     public void close() {
-        unregisterSlaveMountPoint();
-        try {
-            salProvider.getMountInstance().close();
-        } catch (final Exception exception) {
-            LOG.warn("{}: Exception in closing slave sal facade: {}", id, exception);
+        if (!registered.compareAndSet(true, false)) {
+            return;
         }
 
-    }
-
+        salProvider.getMountInstance().onTopologyDeviceDisconnected();
 
+        LOG.info("{}: Slave mount point unregistered.", id);
+    }
 }
index 3a3fad9ce46a4628a6b8d8b6d717462e02dd2399..6c297261d8b7d3eb28fc4d225c14e1a0d85fcb3a 100644 (file)
@@ -10,8 +10,11 @@ package org.opendaylight.netconf.topology.singleton.impl.actors;
 
 import akka.actor.ActorRef;
 import akka.actor.Props;
-import akka.actor.UntypedActor;
+import akka.actor.Status.Failure;
+import akka.actor.Status.Success;
+import akka.pattern.AskTimeoutException;
 import akka.util.Timeout;
+import com.google.common.base.Throwables;
 import com.google.common.util.concurrent.CheckedFuture;
 import com.google.common.util.concurrent.FutureCallback;
 import com.google.common.util.concurrent.Futures;
@@ -23,6 +26,7 @@ import java.util.stream.Collectors;
 import javax.annotation.Nonnull;
 import javax.annotation.Nullable;
 import org.eclipse.jdt.annotation.NonNull;
+import org.opendaylight.controller.cluster.common.actor.AbstractUntypedActor;
 import org.opendaylight.controller.cluster.schema.provider.RemoteYangTextSourceProvider;
 import org.opendaylight.controller.cluster.schema.provider.impl.RemoteSchemaProvider;
 import org.opendaylight.controller.cluster.schema.provider.impl.YangTextSchemaSourceSerializationProxy;
@@ -43,6 +47,7 @@ import org.opendaylight.netconf.topology.singleton.messages.AskForMasterMountPoi
 import org.opendaylight.netconf.topology.singleton.messages.CreateInitialMasterActorData;
 import org.opendaylight.netconf.topology.singleton.messages.MasterActorDataInitialized;
 import org.opendaylight.netconf.topology.singleton.messages.NormalizedNodeMessage;
+import org.opendaylight.netconf.topology.singleton.messages.NotMasterException;
 import org.opendaylight.netconf.topology.singleton.messages.RefreshSetupMasterActorData;
 import org.opendaylight.netconf.topology.singleton.messages.RefreshSlaveActor;
 import org.opendaylight.netconf.topology.singleton.messages.RegisterMountPoint;
@@ -68,13 +73,9 @@ import org.opendaylight.yangtools.yang.model.repo.api.YangTextSchemaSource;
 import org.opendaylight.yangtools.yang.model.repo.spi.PotentialSchemaSource;
 import org.opendaylight.yangtools.yang.model.repo.spi.SchemaSourceRegistration;
 import org.opendaylight.yangtools.yang.model.repo.spi.SchemaSourceRegistry;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 import scala.concurrent.duration.Duration;
 
-public final class NetconfNodeActor extends UntypedActor {
-
-    private static final Logger LOG = LoggerFactory.getLogger(NetconfNodeActor.class);
+public class NetconfNodeActor extends AbstractUntypedActor {
 
     private final Duration writeTxIdleTimeout;
     private final DOMMountPointService mountPointService;
@@ -101,10 +102,10 @@ public final class NetconfNodeActor extends UntypedActor {
                         mountPointService));
     }
 
-    private NetconfNodeActor(final NetconfTopologySetup setup,
-                             final RemoteDeviceId id, final SchemaSourceRegistry schemaRegistry,
-                             final SchemaRepository schemaRepository, final Timeout actorResponseWaitTime,
-                             final DOMMountPointService mountPointService) {
+    protected NetconfNodeActor(final NetconfTopologySetup setup,
+                               final RemoteDeviceId id, final SchemaSourceRegistry schemaRegistry,
+                               final SchemaRepository schemaRepository, final Timeout actorResponseWaitTime,
+                               final DOMMountPointService mountPointService) {
         this.setup = setup;
         this.id = id;
         this.schemaRegistry = schemaRegistry;
@@ -116,7 +117,9 @@ public final class NetconfNodeActor extends UntypedActor {
 
     @SuppressWarnings("checkstyle:IllegalCatch")
     @Override
-    public void onReceive(final Object message) throws Exception {
+    public void handleReceive(final Object message) throws Exception {
+        LOG.debug("{}:  received message {}", id, message);
+
         if (message instanceof CreateInitialMasterActorData) { // master
 
             final CreateInitialMasterActorData masterActorData = (CreateInitialMasterActorData) message;
@@ -135,9 +138,16 @@ public final class NetconfNodeActor extends UntypedActor {
             id = ((RefreshSetupMasterActorData) message).getRemoteDeviceId();
             sender().tell(new MasterActorDataInitialized(), self());
         } else if (message instanceof AskForMasterMountPoint) { // master
+            AskForMasterMountPoint askForMasterMountPoint = (AskForMasterMountPoint)message;
+
             // only master contains reference to deviceDataBroker
             if (deviceDataBroker != null) {
-                getSender().tell(new RegisterMountPoint(sourceIdentifiers), getSelf());
+                LOG.debug("{}: Sending RegisterMountPoint reply to {}", id, askForMasterMountPoint.getSlaveActorRef());
+                askForMasterMountPoint.getSlaveActorRef().tell(new RegisterMountPoint(sourceIdentifiers, self()),
+                        sender());
+            } else {
+                LOG.warn("{}: Received {} but we don't appear to be the master", id, askForMasterMountPoint);
+                sender().tell(new Failure(new NotMasterException(self())), self());
             }
 
         } else if (message instanceof YangTextSchemaSourceRequest) { // master
@@ -167,20 +177,16 @@ public final class NetconfNodeActor extends UntypedActor {
                 sender().tell(t, self());
             }
         } else if (message instanceof InvokeRpcMessage) { // master
-
             final InvokeRpcMessage invokeRpcMessage = (InvokeRpcMessage) message;
             invokeSlaveRpc(invokeRpcMessage.getSchemaPath(), invokeRpcMessage.getNormalizedNodeMessage(), sender());
 
         } else if (message instanceof RegisterMountPoint) { //slaves
-
-            sourceIdentifiers = ((RegisterMountPoint) message).getSourceIndentifiers();
-            registerSlaveMountPoint(getSender());
-
+            RegisterMountPoint registerMountPoint = (RegisterMountPoint)message;
+            sourceIdentifiers = registerMountPoint.getSourceIndentifiers();
+            registerSlaveMountPoint(registerMountPoint.getMasterActorRef());
+            sender().tell(new Success(null), self());
         } else if (message instanceof UnregisterSlaveMountPoint) { //slaves
-            if (slaveSalManager != null) {
-                slaveSalManager.close();
-                slaveSalManager = null;
-            }
+            unregisterSlaveMountPoint();
         } else if (message instanceof RefreshSlaveActor) { //slave
             actorResponseWaitTime = ((RefreshSlaveActor) message).getActorResponseWaitTime();
             id = ((RefreshSlaveActor) message).getId();
@@ -193,7 +199,19 @@ public final class NetconfNodeActor extends UntypedActor {
 
     @Override
     public void postStop() throws Exception {
-        super.postStop();
+        try {
+            super.postStop();
+        } finally {
+            unregisterSlaveMountPoint();
+        }
+    }
+
+    private void unregisterSlaveMountPoint() {
+        if (slaveSalManager != null) {
+            slaveSalManager.close();
+            slaveSalManager = null;
+        }
+
         closeSchemaSourceRegistrations();
     }
 
@@ -247,36 +265,18 @@ public final class NetconfNodeActor extends UntypedActor {
     }
 
     private void registerSlaveMountPoint(final ActorRef masterReference) {
-        if (this.slaveSalManager != null) {
-            slaveSalManager.close();
-        }
-        closeSchemaSourceRegistrations();
-        slaveSalManager = new SlaveSalFacade(id, setup.getActorSystem(), actorResponseWaitTime,
-                mountPointService);
-
-        final ListenableFuture<SchemaContext> remoteSchemaContext = getSchemaContext(masterReference);
-        final DOMRpcService deviceRpcService = getDOMRpcService(masterReference);
+        unregisterSlaveMountPoint();
 
-        Futures.addCallback(remoteSchemaContext, new FutureCallback<SchemaContext>() {
-            @Override
-            public void onSuccess(@Nonnull final SchemaContext result) {
-                LOG.info("{}: Schema context resolved: {}", id, result.getModules());
-                slaveSalManager.registerSlaveMountPoint(result, deviceRpcService, masterReference);
-            }
+        slaveSalManager = new SlaveSalFacade(id, setup.getActorSystem(), actorResponseWaitTime, mountPointService);
 
-            @Override
-            public void onFailure(@Nonnull final Throwable throwable) {
-                LOG.error("{}: Failed to register mount point: {}", id, throwable);
-            }
-        }, MoreExecutors.directExecutor());
+        resolveSchemaContext(createSchemaContextFactory(masterReference), slaveSalManager, masterReference, 1);
     }
 
     private DOMRpcService getDOMRpcService(final ActorRef masterReference) {
         return new ProxyDOMRpcService(setup.getActorSystem(), masterReference, id, actorResponseWaitTime);
     }
 
-    private ListenableFuture<SchemaContext> getSchemaContext(final ActorRef masterReference) {
-
+    private SchemaContextFactory createSchemaContextFactory(final ActorRef masterReference) {
         final RemoteYangTextSourceProvider remoteYangTextSourceProvider =
                 new ProxyYangTextSourceProvider(masterReference, getContext(), actorResponseWaitTime);
         final RemoteSchemaProvider remoteProvider = new RemoteSchemaProvider(remoteYangTextSourceProvider,
@@ -288,10 +288,48 @@ public final class NetconfNodeActor extends UntypedActor {
                                 YangTextSchemaSource.class, PotentialSchemaSource.Costs.REMOTE_IO.getValue())))
                 .collect(Collectors.toList());
 
-        final SchemaContextFactory schemaContextFactory
-                = schemaRepository.createSchemaContextFactory(SchemaSourceFilter.ALWAYS_ACCEPT);
+        return schemaRepository.createSchemaContextFactory(SchemaSourceFilter.ALWAYS_ACCEPT);
+    }
 
-        return schemaContextFactory.createSchemaContext(sourceIdentifiers);
+    private void resolveSchemaContext(final SchemaContextFactory schemaContextFactory,
+            final SlaveSalFacade localSlaveSalManager, final ActorRef masterReference, int tries) {
+        final ListenableFuture<SchemaContext> schemaContextFuture =
+                schemaContextFactory.createSchemaContext(sourceIdentifiers);
+        Futures.addCallback(schemaContextFuture, new FutureCallback<SchemaContext>() {
+            @Override
+            public void onSuccess(@Nonnull final SchemaContext result) {
+                executeInSelf(() -> {
+                    // Make sure the slaveSalManager instance hasn't changed since we initiated the schema context
+                    // resolution.
+                    if (slaveSalManager == localSlaveSalManager) {
+                        LOG.info("{}: Schema context resolved: {} - registering slave mount point",
+                                id, result.getModules());
+                        slaveSalManager.registerSlaveMountPoint(result, getDOMRpcService(masterReference),
+                                masterReference);
+                    }
+                });
+            }
+
+            @Override
+            public void onFailure(@Nonnull final Throwable throwable) {
+                executeInSelf(() -> {
+                    if (slaveSalManager == localSlaveSalManager) {
+                        final Throwable cause = Throwables.getRootCause(throwable);
+                        if (cause instanceof AskTimeoutException) {
+                            if (tries <= 5 || tries % 10 == 0) {
+                                LOG.warn("{}: Failed to resolve schema context - retrying...", id, throwable);
+                            }
+
+                            resolveSchemaContext(schemaContextFactory, localSlaveSalManager,
+                                    masterReference, tries + 1);
+                        } else {
+                            LOG.error("{}: Failed to resolve schema context - unable to register slave mount point",
+                                    id, throwable);
+                        }
+                    }
+                });
+            }
+        }, MoreExecutors.directExecutor());
     }
 
     private void closeSchemaSourceRegistrations() {
index 4d56e1f17b13983947e70283c764d25fae25e28a..7c7fa49b150b9a4f8a8022b1b5c110ae37b0e351 100644 (file)
@@ -8,6 +8,7 @@
 
 package org.opendaylight.netconf.topology.singleton.messages;
 
+import akka.actor.ActorRef;
 import java.io.Serializable;
 
 /**
@@ -16,4 +17,19 @@ import java.io.Serializable;
  */
 public class AskForMasterMountPoint implements Serializable {
     private static final long serialVersionUID = 1L;
+
+    private final ActorRef slaveActorRef;
+
+    public AskForMasterMountPoint(ActorRef slaveActorRef) {
+        this.slaveActorRef = slaveActorRef;
+    }
+
+    public ActorRef getSlaveActorRef() {
+        return slaveActorRef;
+    }
+
+    @Override
+    public String toString() {
+        return "AskForMasterMountPoint [slaveActorRef=" + slaveActorRef + "]";
+    }
 }
diff --git a/netconf/netconf-topology-singleton/src/main/java/org/opendaylight/netconf/topology/singleton/messages/NotMasterException.java b/netconf/netconf-topology-singleton/src/main/java/org/opendaylight/netconf/topology/singleton/messages/NotMasterException.java
new file mode 100644 (file)
index 0000000..a980806
--- /dev/null
@@ -0,0 +1,23 @@
+/*
+ * Copyright (c) 2018 Inocybe Technologies and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.netconf.topology.singleton.messages;
+
+import akka.actor.ActorRef;
+
+/**
+ * Exception reply indicating the recipient is not the master.
+ *
+ * @author Thomas Pantelis
+ */
+public class NotMasterException extends Exception {
+    private static final long serialVersionUID = 1L;
+
+    public NotMasterException(final ActorRef recipient) {
+        super("Actor " + recipient + " is not the current master");
+    }
+}
index 0023103a5c227ab1a790540a509156b081da7772..fbca23c3c7bd70978eba6b96c6e03ebf253b8e03 100644 (file)
@@ -8,6 +8,7 @@
 
 package org.opendaylight.netconf.topology.singleton.messages;
 
+import akka.actor.ActorRef;
 import java.io.Serializable;
 import java.util.List;
 import org.opendaylight.yangtools.yang.model.repo.api.SourceIdentifier;
@@ -19,13 +20,24 @@ public class RegisterMountPoint implements Serializable {
     private static final long serialVersionUID = 1L;
 
     private final List<SourceIdentifier> allSourceIdentifiers;
+    private final ActorRef masterActorRef;
 
-    public RegisterMountPoint(final List<SourceIdentifier> allSourceIdentifiers) {
+    public RegisterMountPoint(final List<SourceIdentifier> allSourceIdentifiers, ActorRef masterActorRef) {
         this.allSourceIdentifiers = allSourceIdentifiers;
+        this.masterActorRef = masterActorRef;
     }
 
     public List<SourceIdentifier> getSourceIndentifiers() {
         return allSourceIdentifiers;
     }
 
+    public ActorRef getMasterActorRef() {
+        return masterActorRef;
+    }
+
+    @Override
+    public String toString() {
+        return "RegisterMountPoint [allSourceIdentifiers=" + allSourceIdentifiers + ", masterActorRef=" + masterActorRef
+                + "]";
+    }
 }
index ffc3d1334b864610d28b81efc992095a31b09332..5f3bb14a71a5e4d08cd9f48c8d502c9521057926 100644 (file)
@@ -27,4 +27,9 @@ public class YangTextSchemaSourceRequest implements Serializable {
     public SourceIdentifier getSourceIdentifier() {
         return sourceIdentifier;
     }
+
+    @Override
+    public String toString() {
+        return "YangTextSchemaSourceRequest [sourceIdentifier=" + sourceIdentifier + "]";
+    }
 }
index 485f3c506beed160d9062ed208fda31b50a016a7..1daa70be8ba2858847f19a07d2306a9f44fcfeac 100644 (file)
@@ -27,6 +27,7 @@ import akka.actor.Props;
 import akka.pattern.Patterns;
 import akka.testkit.JavaTestKit;
 import akka.testkit.TestActorRef;
+import akka.testkit.javadsl.TestKit;
 import akka.util.Timeout;
 import com.google.common.base.MoreObjects;
 import com.google.common.collect.ImmutableList;
@@ -181,14 +182,13 @@ public class NetconfNodeActorTest {
 
         // test if slave get right identifiers from master
 
-        final Future<Object> registerMountPointFuture =
-                Patterns.ask(masterRef, new AskForMasterMountPoint(),
-                        TIMEOUT);
+        final TestKit kit = new TestKit(system);
+
+        masterRef.tell(new AskForMasterMountPoint(kit.getRef()), ActorRef.noSender());
 
-        final RegisterMountPoint success =
-                (RegisterMountPoint) Await.result(registerMountPointFuture, TIMEOUT.duration());
+        final RegisterMountPoint registerMountPoint = kit.expectMsgClass(RegisterMountPoint.class);
 
-        assertEquals(sourceIdentifiers, success.getSourceIndentifiers());
+        assertEquals(sourceIdentifiers, registerMountPoint.getSourceIndentifiers());
 
     }
 
@@ -213,7 +213,7 @@ public class NetconfNodeActorTest {
                 system.actorOf(NetconfNodeActor.props(setup, remoteDeviceId, registry, schemaRepository, TIMEOUT,
                         mountPointService));
         final List<SourceIdentifier> sources = ImmutableList.of(yang1, yang2);
-        slaveRef.tell(new RegisterMountPoint(sources), masterRef);
+        slaveRef.tell(new RegisterMountPoint(sources, masterRef), masterRef);
 
         verify(registry, timeout(1000)).registerSchemaSource(any(), withSourceId(yang1));
         verify(registry, timeout(1000)).registerSchemaSource(any(), withSourceId(yang2));
diff --git a/netconf/netconf-topology-singleton/src/test/java/org/opendaylight/netconf/topology/singleton/impl/NetconfNodeManagerTest.java b/netconf/netconf-topology-singleton/src/test/java/org/opendaylight/netconf/topology/singleton/impl/NetconfNodeManagerTest.java
new file mode 100644 (file)
index 0000000..873038d
--- /dev/null
@@ -0,0 +1,410 @@
+/*
+ * Copyright (c) 2018 Inocybe Technologies and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.netconf.topology.singleton.impl;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.after;
+import static org.mockito.Mockito.doNothing;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.reset;
+import static org.mockito.Mockito.timeout;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.verifyNoMoreInteractions;
+import static org.mockito.MockitoAnnotations.initMocks;
+import static org.opendaylight.controller.md.sal.binding.api.DataObjectModification.ModificationType.DELETE;
+import static org.opendaylight.controller.md.sal.binding.api.DataObjectModification.ModificationType.SUBTREE_MODIFIED;
+import static org.opendaylight.controller.md.sal.binding.api.DataObjectModification.ModificationType.WRITE;
+
+import akka.actor.ActorSystem;
+import akka.actor.Props;
+import akka.cluster.Cluster;
+import akka.dispatch.Dispatchers;
+import akka.testkit.TestActorRef;
+import akka.testkit.javadsl.TestKit;
+import akka.util.Timeout;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Iterables;
+import com.google.common.io.ByteSource;
+import com.google.common.util.concurrent.Futures;
+import com.typesafe.config.ConfigFactory;
+import java.net.InetSocketAddress;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.stream.Collectors;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.Mock;
+import org.opendaylight.controller.md.sal.binding.api.DataBroker;
+import org.opendaylight.controller.md.sal.binding.api.DataObjectModification;
+import org.opendaylight.controller.md.sal.binding.api.DataTreeIdentifier;
+import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
+import org.opendaylight.controller.md.sal.dom.api.DOMDataBroker;
+import org.opendaylight.controller.md.sal.dom.api.DOMMountPoint;
+import org.opendaylight.controller.md.sal.dom.api.DOMMountPointService;
+import org.opendaylight.controller.md.sal.dom.api.DOMNotificationService;
+import org.opendaylight.controller.md.sal.dom.api.DOMRpcService;
+import org.opendaylight.netconf.sal.connect.api.NetconfDeviceSchemasResolver;
+import org.opendaylight.netconf.sal.connect.netconf.NetconfDevice;
+import org.opendaylight.netconf.sal.connect.util.RemoteDeviceId;
+import org.opendaylight.netconf.topology.singleton.impl.actors.NetconfNodeActor;
+import org.opendaylight.netconf.topology.singleton.impl.utils.NetconfTopologySetup;
+import org.opendaylight.netconf.topology.singleton.impl.utils.NetconfTopologyUtils;
+import org.opendaylight.netconf.topology.singleton.messages.AskForMasterMountPoint;
+import org.opendaylight.netconf.topology.singleton.messages.CreateInitialMasterActorData;
+import org.opendaylight.netconf.topology.singleton.messages.MasterActorDataInitialized;
+import org.opendaylight.netconf.topology.singleton.messages.YangTextSchemaSourceRequest;
+import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.inet.types.rev130715.Host;
+import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.inet.types.rev130715.IpAddress;
+import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.inet.types.rev130715.Ipv4Address;
+import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.inet.types.rev130715.PortNumber;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.NetconfNode;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.NetconfNodeBuilder;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.NetconfNodeConnectionStatus;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.netconf.node.connection.status.ClusteredConnectionStatusBuilder;
+import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.NodeId;
+import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.Node;
+import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.NodeBuilder;
+import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.NodeKey;
+import org.opendaylight.yangtools.concepts.ListenerRegistration;
+import org.opendaylight.yangtools.concepts.ObjectRegistration;
+import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
+import org.opendaylight.yangtools.yang.model.repo.api.RevisionSourceIdentifier;
+import org.opendaylight.yangtools.yang.model.repo.api.SchemaContextFactory;
+import org.opendaylight.yangtools.yang.model.repo.api.SourceIdentifier;
+import org.opendaylight.yangtools.yang.model.repo.api.YangTextSchemaSource;
+import org.opendaylight.yangtools.yang.model.repo.spi.PotentialSchemaSource;
+import org.opendaylight.yangtools.yang.parser.repo.SharedSchemaRepository;
+import org.opendaylight.yangtools.yang.parser.rfc7950.repo.TextToASTTransformer;
+
+/**
+ * Unit tests for NetconfNodeManager.
+ *
+ * @author Thomas Pantelis
+ */
+public class NetconfNodeManagerTest {
+    private static final String ACTOR_SYSTEM_NAME = "test";
+    private static final RemoteDeviceId DEVICE_ID = new RemoteDeviceId("device", new InetSocketAddress(65535));
+    private static final List<SourceIdentifier> SOURCE_IDENTIFIERS =
+            ImmutableList.of(RevisionSourceIdentifier.create("testID"));
+
+    @Mock
+    private DOMMountPointService mockMountPointService;
+
+    @Mock
+    private DOMMountPointService.DOMMountPointBuilder mockMountPointBuilder;
+
+    @Mock
+    private ObjectRegistration<DOMMountPoint> mockMountPointReg;
+
+    @Mock
+    private DataBroker mockDataBroker;
+
+    @Mock
+    private DOMDataBroker mockDeviceDataBroker;
+
+    @Mock
+    private DOMRpcService mockRpcService;
+
+    @Mock
+    private NetconfDeviceSchemasResolver mockSchemasResolver;
+
+    @Mock
+    private SchemaContextFactory mockSchemaContextFactory;
+
+    private ActorSystem slaveSystem;
+    private ActorSystem masterSystem;
+    private TestActorRef<TestMasterActor> testMasterActorRef;
+    private NetconfNodeManager netconfNodeManager;
+    private String masterAddress;
+
+    @Before
+    public void setup() {
+        initMocks(this);
+
+        final Timeout responseTimeout = Timeout.apply(1, TimeUnit.SECONDS);
+
+        slaveSystem = ActorSystem.create(ACTOR_SYSTEM_NAME, ConfigFactory.load().getConfig("Slave"));
+        masterSystem = ActorSystem.create(ACTOR_SYSTEM_NAME, ConfigFactory.load().getConfig("Master"));
+
+        masterAddress = Cluster.get(masterSystem).selfAddress().toString();
+
+        SharedSchemaRepository masterSchemaRepository = new SharedSchemaRepository("master");
+        masterSchemaRepository.registerSchemaSourceListener(
+                TextToASTTransformer.create(masterSchemaRepository, masterSchemaRepository));
+
+        String yangTemplate =
+                  "module ID {"
+                + "  namespace \"ID\";"
+                + "  prefix ID;"
+                + "}";
+
+        SOURCE_IDENTIFIERS.stream().map(
+            sourceId -> masterSchemaRepository.registerSchemaSource(
+                id -> Futures.immediateFuture(YangTextSchemaSource.delegateForByteSource(id,
+                        ByteSource.wrap(yangTemplate.replaceAll("ID", id.getName()).getBytes(UTF_8)))),
+                PotentialSchemaSource.create(sourceId, YangTextSchemaSource.class, 1)))
+        .collect(Collectors.toList());
+
+        NetconfTopologySetup masterSetup = new NetconfTopologySetup.NetconfTopologySetupBuilder()
+                .setActorSystem(masterSystem).setDataBroker(mockDataBroker).setSchemaResourceDTO(
+                        new NetconfDevice.SchemaResourcesDTO(masterSchemaRepository, masterSchemaRepository,
+                                mockSchemaContextFactory, mockSchemasResolver)).build();
+
+        testMasterActorRef = TestActorRef.create(masterSystem, Props.create(TestMasterActor.class, masterSetup,
+                DEVICE_ID, responseTimeout, mockMountPointService).withDispatcher(Dispatchers.DefaultDispatcherId()),
+                NetconfTopologyUtils.createMasterActorName(DEVICE_ID.getName(), masterAddress));
+
+        SharedSchemaRepository slaveSchemaRepository = new SharedSchemaRepository("slave");
+        slaveSchemaRepository.registerSchemaSourceListener(
+                TextToASTTransformer.create(slaveSchemaRepository, slaveSchemaRepository));
+
+        NetconfTopologySetup slaveSetup = new NetconfTopologySetup.NetconfTopologySetupBuilder()
+                .setActorSystem(slaveSystem).setDataBroker(mockDataBroker).setSchemaResourceDTO(
+                        new NetconfDevice.SchemaResourcesDTO(slaveSchemaRepository, slaveSchemaRepository,
+                                mockSchemaContextFactory, mockSchemasResolver)).build();
+
+        netconfNodeManager = new NetconfNodeManager(slaveSetup, DEVICE_ID, responseTimeout,
+                mockMountPointService);
+
+        setupMountPointMocks();
+    }
+
+    @After
+    public void teardown() {
+        TestKit.shutdownActorSystem(slaveSystem, Boolean.TRUE);
+        TestKit.shutdownActorSystem(masterSystem, Boolean.TRUE);
+    }
+
+    @SuppressWarnings("unchecked")
+    @Test
+    public void testSlaveMountPointRegistration() throws InterruptedException, ExecutionException, TimeoutException {
+        initializeMaster();
+
+        ListenerRegistration<?> mockListenerReg = mock(ListenerRegistration.class);
+        doReturn(mockListenerReg).when(mockDataBroker).registerDataTreeChangeListener(any(), any());
+
+        final NodeId nodeId = new NodeId("device");
+        final NodeKey nodeKey = new NodeKey(nodeId);
+        final String topologyId = "topology-netconf";
+        final InstanceIdentifier<Node> nodeListPath = NetconfTopologyUtils.createTopologyNodeListPath(
+                nodeKey, topologyId);
+
+        netconfNodeManager.registerDataTreeChangeListener(topologyId, nodeKey);
+        verify(mockDataBroker).registerDataTreeChangeListener(any(), eq(netconfNodeManager));
+
+        // Invoke onDataTreeChanged with a NetconfNode WRITE to simulate the master writing the operational state to
+        // Connected. Expect the slave mount point created and registered.
+
+        final NetconfNode netconfNode = newNetconfNode();
+        final Node node = new NodeBuilder().setNodeId(nodeId).addAugmentation(NetconfNode.class, netconfNode).build();
+
+        DataObjectModification<Node> mockDataObjModification = mock(DataObjectModification.class);
+        doReturn(Iterables.getLast(nodeListPath.getPathArguments())).when(mockDataObjModification).getIdentifier();
+        doReturn(WRITE).when(mockDataObjModification).getModificationType();
+        doReturn(node).when(mockDataObjModification).getDataAfter();
+
+        netconfNodeManager.onDataTreeChanged(Collections.singletonList(
+                new NetconfTopologyManagerTest.CustomTreeModification(new DataTreeIdentifier<>(
+                        LogicalDatastoreType.OPERATIONAL, nodeListPath), mockDataObjModification)));
+
+        verify(mockMountPointBuilder, timeout(5000)).register();
+        verify(mockMountPointBuilder).addService(eq(DOMDataBroker.class), any());
+        verify(mockMountPointBuilder).addService(eq(DOMRpcService.class), any());
+        verify(mockMountPointBuilder).addService(eq(DOMNotificationService.class), any());
+        verify(mockMountPointService).createMountPoint(DEVICE_ID.getTopologyPath());
+
+        // Notify that the NetconfNode operational state was deleted. Expect the slave mount point closed.
+
+        doReturn(DELETE).when(mockDataObjModification).getModificationType();
+        doReturn(node).when(mockDataObjModification).getDataBefore();
+        doReturn(null).when(mockDataObjModification).getDataAfter();
+
+        netconfNodeManager.onDataTreeChanged(Collections.singletonList(
+                new NetconfTopologyManagerTest.CustomTreeModification(new DataTreeIdentifier<>(
+                        LogicalDatastoreType.OPERATIONAL, nodeListPath), mockDataObjModification)));
+
+        verify(mockMountPointReg, timeout(5000)).close();
+
+        // Notify with a NetconfNode operational state WRITE. Expect the slave mount point re-created.
+
+        setupMountPointMocks();
+
+        doReturn(WRITE).when(mockDataObjModification).getModificationType();
+        doReturn(null).when(mockDataObjModification).getDataBefore();
+        doReturn(node).when(mockDataObjModification).getDataAfter();
+
+        netconfNodeManager.onDataTreeChanged(Collections.singletonList(
+                new NetconfTopologyManagerTest.CustomTreeModification(new DataTreeIdentifier<>(
+                        LogicalDatastoreType.OPERATIONAL, nodeListPath), mockDataObjModification)));
+
+        verify(mockMountPointBuilder, timeout(5000)).register();
+
+        // Notify again with a NetconfNode operational state WRITE. Expect the prior slave mount point closed and
+        // and a new one registered.
+
+        setupMountPointMocks();
+
+        doReturn(node).when(mockDataObjModification).getDataBefore();
+
+        netconfNodeManager.onDataTreeChanged(Collections.singletonList(
+                new NetconfTopologyManagerTest.CustomTreeModification(new DataTreeIdentifier<>(
+                        LogicalDatastoreType.OPERATIONAL, nodeListPath), mockDataObjModification)));
+
+        verify(mockMountPointReg, timeout(5000)).close();
+        verify(mockMountPointBuilder, timeout(5000)).register();
+
+        // Notify that the NetconfNode operational state was changed to UnableToConnect. Expect the slave mount point
+        // closed.
+
+        setupMountPointMocks();
+
+        final Node updatedNode = new NodeBuilder().setNodeId(nodeId)
+                .addAugmentation(NetconfNode.class, new NetconfNodeBuilder(netconfNode)
+                        .setConnectionStatus(NetconfNodeConnectionStatus.ConnectionStatus.UnableToConnect)
+                        .build()).build();
+
+        doReturn(SUBTREE_MODIFIED).when(mockDataObjModification).getModificationType();
+        doReturn(node).when(mockDataObjModification).getDataBefore();
+        doReturn(updatedNode).when(mockDataObjModification).getDataAfter();
+
+        netconfNodeManager.onDataTreeChanged(Collections.singletonList(
+                new NetconfTopologyManagerTest.CustomTreeModification(new DataTreeIdentifier<>(
+                        LogicalDatastoreType.OPERATIONAL, nodeListPath), mockDataObjModification)));
+
+        verify(mockMountPointReg, timeout(5000)).close();
+
+        netconfNodeManager.close();
+        verifyNoMoreInteractions(mockMountPointReg);
+    }
+
+    @SuppressWarnings("unchecked")
+    @Test
+    public void testSlaveMountPointRegistrationFailuresAndRetries()
+            throws InterruptedException, ExecutionException, TimeoutException {
+        final NodeId nodeId = new NodeId("device");
+        final NodeKey nodeKey = new NodeKey(nodeId);
+        final String topologyId = "topology-netconf";
+        final InstanceIdentifier<Node> nodeListPath = NetconfTopologyUtils.createTopologyNodeListPath(
+                nodeKey, topologyId);
+
+        final NetconfNode netconfNode = newNetconfNode();
+        final Node node = new NodeBuilder().setNodeId(nodeId).addAugmentation(NetconfNode.class, netconfNode).build();
+
+        DataObjectModification<Node> mockDataObjModification = mock(DataObjectModification.class);
+        doReturn(Iterables.getLast(nodeListPath.getPathArguments())).when(mockDataObjModification).getIdentifier();
+        doReturn(WRITE).when(mockDataObjModification).getModificationType();
+        doReturn(node).when(mockDataObjModification).getDataAfter();
+
+        // First try the registration where the perceived master hasn't been initialized as the master.
+
+        netconfNodeManager.onDataTreeChanged(Collections.singletonList(
+                new NetconfTopologyManagerTest.CustomTreeModification(new DataTreeIdentifier<>(
+                        LogicalDatastoreType.OPERATIONAL, nodeListPath), mockDataObjModification)));
+
+        verify(mockMountPointBuilder, after(1000).never()).register();
+
+        // Initialize the master but drop the initial YangTextSchemaSourceRequest message sent to the master so
+        // it retries.
+
+        initializeMaster();
+
+        CompletableFuture<AskForMasterMountPoint> yangTextSchemaSourceRequestFuture = new CompletableFuture<>();
+        testMasterActorRef.underlyingActor().messagesToDrop.put(YangTextSchemaSourceRequest.class,
+                yangTextSchemaSourceRequestFuture);
+
+        netconfNodeManager.onDataTreeChanged(Collections.singletonList(
+                new NetconfTopologyManagerTest.CustomTreeModification(new DataTreeIdentifier<>(
+                        LogicalDatastoreType.OPERATIONAL, nodeListPath), mockDataObjModification)));
+
+        yangTextSchemaSourceRequestFuture.get(5, TimeUnit.SECONDS);
+        verify(mockMountPointBuilder, timeout(5000)).register();
+
+        // Initiate another registration but drop the initial AskForMasterMountPoint message sent to the master so
+        // it retries.
+
+        setupMountPointMocks();
+
+        CompletableFuture<AskForMasterMountPoint> askForMasterMountPointFuture = new CompletableFuture<>();
+        testMasterActorRef.underlyingActor().messagesToDrop.put(AskForMasterMountPoint.class,
+                askForMasterMountPointFuture);
+
+        netconfNodeManager.onDataTreeChanged(Collections.singletonList(
+                new NetconfTopologyManagerTest.CustomTreeModification(new DataTreeIdentifier<>(
+                        LogicalDatastoreType.OPERATIONAL, nodeListPath), mockDataObjModification)));
+
+        askForMasterMountPointFuture.get(5, TimeUnit.SECONDS);
+        verify(mockMountPointReg, timeout(5000)).close();
+        verify(mockMountPointBuilder, timeout(5000)).register();
+
+        setupMountPointMocks();
+        netconfNodeManager.close();
+        verify(mockMountPointReg, timeout(5000)).close();
+    }
+
+    private NetconfNode newNetconfNode() {
+        return new NetconfNodeBuilder()
+                .setHost(new Host(new IpAddress(new Ipv4Address("127.0.0.1"))))
+                .setPort(new PortNumber(9999))
+                .setConnectionStatus(NetconfNodeConnectionStatus.ConnectionStatus.Connected)
+                .setClusteredConnectionStatus(new ClusteredConnectionStatusBuilder()
+                        .setNetconfMasterNode(masterAddress).build())
+                .build();
+    }
+
+    private void setupMountPointMocks() {
+        reset(mockMountPointService, mockMountPointBuilder, mockMountPointReg);
+
+        doNothing().when(mockMountPointReg).close();
+
+        doReturn(mockMountPointBuilder).when(mockMountPointBuilder).addInitialSchemaContext(any());
+        doReturn(mockMountPointBuilder).when(mockMountPointBuilder).addService(any(), any());
+        doReturn(mockMountPointReg).when(mockMountPointBuilder).register();
+
+        doReturn(mockMountPointBuilder).when(mockMountPointService).createMountPoint(any());
+    }
+
+    private void initializeMaster() {
+        TestKit kit = new TestKit(masterSystem);
+
+        testMasterActorRef.tell(new CreateInitialMasterActorData(mockDeviceDataBroker, SOURCE_IDENTIFIERS,
+                mockRpcService), kit.getRef());
+
+        kit.expectMsgClass(MasterActorDataInitialized.class);
+    }
+
+    private static class TestMasterActor extends NetconfNodeActor {
+        final Map<Class<?>, CompletableFuture<? extends Object>> messagesToDrop = new ConcurrentHashMap<>();
+
+        TestMasterActor(NetconfTopologySetup setup, RemoteDeviceId deviceId, Timeout actorResponseWaitTime,
+                DOMMountPointService mountPointService) {
+            super(setup, deviceId, setup.getSchemaResourcesDTO().getSchemaRegistry(),
+                    setup.getSchemaResourcesDTO().getSchemaRepository(), actorResponseWaitTime, mountPointService);
+        }
+
+        @SuppressWarnings({ "rawtypes", "unchecked" })
+        @Override
+        public void handleReceive(Object message) throws Exception {
+            CompletableFuture dropFuture = messagesToDrop.remove(message.getClass());
+            if (dropFuture != null) {
+                dropFuture.complete(message);
+            } else {
+                super.handleReceive(message);
+            }
+        }
+    }
+}
index 6a0e02e6782e39179ea942cdf84ad4c1ab5d9139..b3d4be1cde2c7f9b1931acdf42ef80f541944cde 100644 (file)
@@ -274,7 +274,7 @@ public class NetconfTopologyManagerTest {
         }
     }
 
-    private class CustomTreeModification  implements DataTreeModification<Node> {
+    static class CustomTreeModification  implements DataTreeModification<Node> {
 
         private final DataTreeIdentifier<Node> rootPath;
         private final DataObjectModification<Node> rootNode;
diff --git a/netconf/netconf-topology-singleton/src/test/resources/application.conf b/netconf/netconf-topology-singleton/src/test/resources/application.conf
new file mode 100644 (file)
index 0000000..86b5453
--- /dev/null
@@ -0,0 +1,41 @@
+Slave {
+  akka {
+    actor {
+      provider = "akka.cluster.ClusterActorRefProvider"
+      warn-about-java-serializer-usage = false
+    }
+    remote {
+      netty.tcp {
+        hostname = "127.0.0.1"
+        port = 2550
+      }
+    }
+
+    cluster {
+      roles = [
+        "slave"
+      ]
+    }
+  }
+}
+
+Master {
+  akka {
+    actor {
+      provider = "akka.cluster.ClusterActorRefProvider"
+      warn-about-java-serializer-usage = false
+    }
+    remote {
+      netty.tcp {
+        hostname = "127.0.0.1"
+        port = 2552
+      }
+    }
+
+    cluster {
+      roles = [
+        "master"
+      ]
+    }
+  }
+}
diff --git a/netconf/netconf-topology-singleton/src/test/resources/simplelogger.properties b/netconf/netconf-topology-singleton/src/test/resources/simplelogger.properties
new file mode 100644 (file)
index 0000000..b4f3775
--- /dev/null
@@ -0,0 +1,6 @@
+org.slf4j.simpleLogger.showDateTime=true
+org.slf4j.simpleLogger.dateTimeFormat=hh:mm:ss,S a
+org.slf4j.simpleLogger.logFile=System.out
+org.slf4j.simpleLogger.showShortLogName=true
+org.slf4j.simpleLogger.levelInBrackets=true
+org.slf4j.simpleLogger.log.org.opendaylight.netconf.topology.singleton.impl=debug