package org.opendaylight.netconf.topology.singleton.impl;
import akka.actor.ActorRef;
+import akka.actor.ActorSelection;
import akka.actor.PoisonPill;
+import akka.dispatch.OnComplete;
+import akka.pattern.AskTimeoutException;
+import akka.pattern.Patterns;
import akka.util.Timeout;
import java.util.Collection;
import javax.annotation.Nonnull;
+import javax.annotation.concurrent.GuardedBy;
import org.opendaylight.controller.md.sal.binding.api.ClusteredDataTreeChangeListener;
import org.opendaylight.controller.md.sal.binding.api.DataObjectModification;
import org.opendaylight.controller.md.sal.binding.api.DataTreeIdentifier;
import org.opendaylight.yangtools.yang.model.repo.spi.SchemaSourceRegistry;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import scala.concurrent.Future;
/**
* Managing and reacting on data tree changes in specific netconf node when master writes status to the operational
private final SchemaSourceRegistry schemaRegistry;
private final SchemaRepository schemaRepository;
- private NetconfTopologySetup setup;
- private ListenerRegistration<NetconfNodeManager> dataChangeListenerRegistration;
- private RemoteDeviceId id;
+ private volatile NetconfTopologySetup setup;
+ private volatile ListenerRegistration<NetconfNodeManager> dataChangeListenerRegistration;
+ private volatile RemoteDeviceId id;
+
+ @GuardedBy("this")
private ActorRef slaveActorRef;
+ @GuardedBy("this")
+ private boolean closed;
+
+ @GuardedBy("this")
+ private int lastUpdateCount;
NetconfNodeManager(final NetconfTopologySetup setup,
final RemoteDeviceId id, final Timeout actorResponseWaitTime,
final NodeId nodeId = NetconfTopologyUtils.getNodeId(rootNode.getIdentifier());
switch (rootNode.getModificationType()) {
case SUBTREE_MODIFIED:
- LOG.debug("{}: Operational for node {} updated. Trying to register slave mount point", id, nodeId);
+ LOG.debug("{}: Operational state for node {} - subtree modified from {} to {}",
+ id, nodeId, rootNode.getDataBefore(), rootNode.getDataAfter());
handleSlaveMountPoint(rootNode);
break;
case WRITE:
if (rootNode.getDataBefore() != null) {
- LOG.debug("{}: Operational for node {} rewrited. Trying to register slave mount point",
- id, nodeId);
+ LOG.debug("{}: Operational state for node {} updated from {} to {}",
+ id, nodeId, rootNode.getDataBefore(), rootNode.getDataAfter());
} else {
- LOG.debug("{}: Operational for node {} created. Trying to register slave mount point",
- id, nodeId);
+ LOG.debug("{}: Operational state for node {} created: {}",
+ id, nodeId, rootNode.getDataAfter());
}
handleSlaveMountPoint(rootNode);
break;
case DELETE:
- LOG.debug("{}: Operational for node {} deleted.", id, nodeId);
+ LOG.debug("{}: Operational state for node {} deleted.", id, nodeId);
unregisterSlaveMountpoint();
break;
default:
}
@Override
- public void close() {
- unregisterSlaveMountpoint();
+ public synchronized void close() {
+ if (closed) {
+ return;
+ }
+
+ closed = true;
closeActor();
if (dataChangeListenerRegistration != null) {
dataChangeListenerRegistration.close();
}
}
+ @GuardedBy("this")
private void closeActor() {
if (slaveActorRef != null) {
LOG.debug("{}: Sending poison pill to {}", id, slaveActorRef);
}
}
- private void unregisterSlaveMountpoint() {
+ private synchronized void unregisterSlaveMountpoint() {
+ lastUpdateCount++;
if (slaveActorRef != null) {
LOG.debug("{}: Sending message to unregister slave mountpoint to {}", id, slaveActorRef);
slaveActorRef.tell(new UnregisterSlaveMountPoint(), ActorRef.noSender());
NetconfTopologyUtils.createTopologyNodeListPath(key, topologyId)), this);
}
- private void handleSlaveMountPoint(final DataObjectModification<Node> rootNode) {
+ private synchronized void handleSlaveMountPoint(final DataObjectModification<Node> rootNode) {
+ if (closed) {
+ return;
+ }
+
@SuppressWarnings("ConstantConditions")
final NetconfNode netconfNodeAfter = rootNode.getDataAfter().getAugmentation(NetconfNode.class);
if (NetconfNodeConnectionStatus.ConnectionStatus.Connected.equals(netconfNodeAfter.getConnectionStatus())) {
+ lastUpdateCount++;
createOrUpdateActorRef();
+
final String masterAddress = netconfNodeAfter.getClusteredConnectionStatus().getNetconfMasterNode();
- final String path = NetconfTopologyUtils.createActorPath(masterAddress,
+ final String masterActorPath = NetconfTopologyUtils.createActorPath(masterAddress,
NetconfTopologyUtils.createMasterActorName(id.getName(),
netconfNodeAfter.getClusteredConnectionStatus().getNetconfMasterNode()));
- setup.getActorSystem().actorSelection(path).tell(new AskForMasterMountPoint(), slaveActorRef);
+
+ final AskForMasterMountPoint askForMasterMountPoint = new AskForMasterMountPoint(slaveActorRef);
+ final ActorSelection masterActor = setup.getActorSystem().actorSelection(masterActorPath);
+
+ LOG.debug("{}: Sending {} message to master {}", id, askForMasterMountPoint, masterActor);
+
+ sendAskForMasterMountPointWithRetries(askForMasterMountPoint, masterActor, 1, lastUpdateCount);
} else {
unregisterSlaveMountpoint();
}
}
+ @GuardedBy("this")
+ private void sendAskForMasterMountPointWithRetries(final AskForMasterMountPoint askForMasterMountPoint,
+ final ActorSelection masterActor, final int tries, final int updateCount) {
+ final Future<Object> future = Patterns.ask(masterActor, askForMasterMountPoint, actorResponseWaitTime);
+ future.onComplete(new OnComplete<Object>() {
+ @Override
+ public void onComplete(final Throwable failure, final Object response) {
+ synchronized (this) {
+ // Ignore the response if we were since closed or another notification update occurred.
+ if (closed || updateCount != lastUpdateCount) {
+ return;
+ }
+
+ if (failure instanceof AskTimeoutException) {
+ if (tries <= 5 || tries % 10 == 0) {
+ LOG.warn("{}: Failed to send message to {} - retrying...", id, masterActor, failure);
+ }
+ sendAskForMasterMountPointWithRetries(askForMasterMountPoint, masterActor, tries + 1,
+ updateCount);
+ } else if (failure != null) {
+ LOG.error("{}: Failed to send message {} to {}. Slave mount point could not be created",
+ id, askForMasterMountPoint, masterActor, failure);
+ } else {
+ LOG.debug("{}: {} message to {} succeeded", id, askForMasterMountPoint, masterActor);
+ }
+ }
+ }
+ }, setup.getActorSystem().dispatcher());
+ }
+
+ @GuardedBy("this")
private void createOrUpdateActorRef() {
if (slaveActorRef == null) {
slaveActorRef = setup.getActorSystem().actorOf(NetconfNodeActor.props(setup, id, schemaRegistry,
private RemoteDeviceConnector remoteDeviceConnector;
private NetconfNodeManager netconfNodeManager;
private ActorRef masterActorRef;
- private boolean finalClose = false;
+ private final AtomicBoolean finalClose = new AtomicBoolean(false);
private final AtomicBoolean closed = new AtomicBoolean(false);
- private boolean isMaster;
+ private volatile boolean isMaster;
NetconfTopologyContext(final NetconfTopologySetup netconfTopologyDeviceSetup,
final ServiceGroupIdentifier serviceGroupIdent,
netconfNodeManager = null;
}
- if (!finalClose) {
+ if (!finalClose.get()) {
final String masterAddress =
Cluster.get(netconfTopologyDeviceSetup.getActorSystem()).selfAddress().toString();
masterActorRef = netconfTopologyDeviceSetup.getActorSystem().actorOf(NetconfNodeActor.props(
@Override
public ListenableFuture<Void> closeServiceInstance() {
- if (!finalClose) {
+ if (!finalClose.get()) {
// in case that master changes role to slave, new NodeDeviceManager must be created and listener registered
netconfNodeManager = createNodeDeviceManager();
}
}
void closeFinal() throws Exception {
- finalClose = true;
+ if (!finalClose.compareAndSet(false, true)) {
+ return;
+ }
if (netconfNodeManager != null) {
netconfNodeManager.close();
import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import akka.util.Timeout;
+import java.util.concurrent.atomic.AtomicBoolean;
import org.opendaylight.controller.md.sal.dom.api.DOMMountPointService;
import org.opendaylight.controller.md.sal.dom.api.DOMRpcService;
import org.opendaylight.netconf.sal.connect.netconf.sal.NetconfDeviceNotificationService;
private final NetconfDeviceSalProvider salProvider;
private final ActorSystem actorSystem;
private final Timeout actorResponseWaitTime;
+ private final AtomicBoolean registered = new AtomicBoolean(false);
public SlaveSalFacade(final RemoteDeviceId id,
final ActorSystem actorSystem,
public void registerSlaveMountPoint(final SchemaContext remoteSchemaContext, final DOMRpcService deviceRpc,
final ActorRef masterActorRef) {
+ if (!registered.compareAndSet(false, true)) {
+ return;
+ }
+
final NetconfDeviceNotificationService notificationService = new NetconfDeviceNotificationService();
final ProxyDOMDataBroker netconfDeviceDataBroker =
LOG.info("{}: Slave mount point registered.", id);
}
- public void unregisterSlaveMountPoint() {
- salProvider.getMountInstance().onTopologyDeviceDisconnected();
- }
-
- @SuppressWarnings("checkstyle:IllegalCatch")
public void close() {
- unregisterSlaveMountPoint();
- try {
- salProvider.getMountInstance().close();
- } catch (final Exception exception) {
- LOG.warn("{}: Exception in closing slave sal facade: {}", id, exception);
+ if (!registered.compareAndSet(true, false)) {
+ return;
}
- }
-
+ salProvider.getMountInstance().onTopologyDeviceDisconnected();
+ LOG.info("{}: Slave mount point unregistered.", id);
+ }
}
import akka.actor.ActorRef;
import akka.actor.Props;
-import akka.actor.UntypedActor;
+import akka.actor.Status.Failure;
+import akka.actor.Status.Success;
+import akka.pattern.AskTimeoutException;
import akka.util.Timeout;
+import com.google.common.base.Throwables;
import com.google.common.util.concurrent.CheckedFuture;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import org.eclipse.jdt.annotation.NonNull;
+import org.opendaylight.controller.cluster.common.actor.AbstractUntypedActor;
import org.opendaylight.controller.cluster.schema.provider.RemoteYangTextSourceProvider;
import org.opendaylight.controller.cluster.schema.provider.impl.RemoteSchemaProvider;
import org.opendaylight.controller.cluster.schema.provider.impl.YangTextSchemaSourceSerializationProxy;
import org.opendaylight.netconf.topology.singleton.messages.CreateInitialMasterActorData;
import org.opendaylight.netconf.topology.singleton.messages.MasterActorDataInitialized;
import org.opendaylight.netconf.topology.singleton.messages.NormalizedNodeMessage;
+import org.opendaylight.netconf.topology.singleton.messages.NotMasterException;
import org.opendaylight.netconf.topology.singleton.messages.RefreshSetupMasterActorData;
import org.opendaylight.netconf.topology.singleton.messages.RefreshSlaveActor;
import org.opendaylight.netconf.topology.singleton.messages.RegisterMountPoint;
import org.opendaylight.yangtools.yang.model.repo.spi.PotentialSchemaSource;
import org.opendaylight.yangtools.yang.model.repo.spi.SchemaSourceRegistration;
import org.opendaylight.yangtools.yang.model.repo.spi.SchemaSourceRegistry;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import scala.concurrent.duration.Duration;
-public final class NetconfNodeActor extends UntypedActor {
-
- private static final Logger LOG = LoggerFactory.getLogger(NetconfNodeActor.class);
+public class NetconfNodeActor extends AbstractUntypedActor {
private final Duration writeTxIdleTimeout;
private final DOMMountPointService mountPointService;
mountPointService));
}
- private NetconfNodeActor(final NetconfTopologySetup setup,
- final RemoteDeviceId id, final SchemaSourceRegistry schemaRegistry,
- final SchemaRepository schemaRepository, final Timeout actorResponseWaitTime,
- final DOMMountPointService mountPointService) {
+ protected NetconfNodeActor(final NetconfTopologySetup setup,
+ final RemoteDeviceId id, final SchemaSourceRegistry schemaRegistry,
+ final SchemaRepository schemaRepository, final Timeout actorResponseWaitTime,
+ final DOMMountPointService mountPointService) {
this.setup = setup;
this.id = id;
this.schemaRegistry = schemaRegistry;
@SuppressWarnings("checkstyle:IllegalCatch")
@Override
- public void onReceive(final Object message) throws Exception {
+ public void handleReceive(final Object message) throws Exception {
+ LOG.debug("{}: received message {}", id, message);
+
if (message instanceof CreateInitialMasterActorData) { // master
final CreateInitialMasterActorData masterActorData = (CreateInitialMasterActorData) message;
id = ((RefreshSetupMasterActorData) message).getRemoteDeviceId();
sender().tell(new MasterActorDataInitialized(), self());
} else if (message instanceof AskForMasterMountPoint) { // master
+ AskForMasterMountPoint askForMasterMountPoint = (AskForMasterMountPoint)message;
+
// only master contains reference to deviceDataBroker
if (deviceDataBroker != null) {
- getSender().tell(new RegisterMountPoint(sourceIdentifiers), getSelf());
+ LOG.debug("{}: Sending RegisterMountPoint reply to {}", id, askForMasterMountPoint.getSlaveActorRef());
+ askForMasterMountPoint.getSlaveActorRef().tell(new RegisterMountPoint(sourceIdentifiers, self()),
+ sender());
+ } else {
+ LOG.warn("{}: Received {} but we don't appear to be the master", id, askForMasterMountPoint);
+ sender().tell(new Failure(new NotMasterException(self())), self());
}
} else if (message instanceof YangTextSchemaSourceRequest) { // master
sender().tell(t, self());
}
} else if (message instanceof InvokeRpcMessage) { // master
-
final InvokeRpcMessage invokeRpcMessage = (InvokeRpcMessage) message;
invokeSlaveRpc(invokeRpcMessage.getSchemaPath(), invokeRpcMessage.getNormalizedNodeMessage(), sender());
} else if (message instanceof RegisterMountPoint) { //slaves
-
- sourceIdentifiers = ((RegisterMountPoint) message).getSourceIndentifiers();
- registerSlaveMountPoint(getSender());
-
+ RegisterMountPoint registerMountPoint = (RegisterMountPoint)message;
+ sourceIdentifiers = registerMountPoint.getSourceIndentifiers();
+ registerSlaveMountPoint(registerMountPoint.getMasterActorRef());
+ sender().tell(new Success(null), self());
} else if (message instanceof UnregisterSlaveMountPoint) { //slaves
- if (slaveSalManager != null) {
- slaveSalManager.close();
- slaveSalManager = null;
- }
+ unregisterSlaveMountPoint();
} else if (message instanceof RefreshSlaveActor) { //slave
actorResponseWaitTime = ((RefreshSlaveActor) message).getActorResponseWaitTime();
id = ((RefreshSlaveActor) message).getId();
@Override
public void postStop() throws Exception {
- super.postStop();
+ try {
+ super.postStop();
+ } finally {
+ unregisterSlaveMountPoint();
+ }
+ }
+
+ private void unregisterSlaveMountPoint() {
+ if (slaveSalManager != null) {
+ slaveSalManager.close();
+ slaveSalManager = null;
+ }
+
closeSchemaSourceRegistrations();
}
}
private void registerSlaveMountPoint(final ActorRef masterReference) {
- if (this.slaveSalManager != null) {
- slaveSalManager.close();
- }
- closeSchemaSourceRegistrations();
- slaveSalManager = new SlaveSalFacade(id, setup.getActorSystem(), actorResponseWaitTime,
- mountPointService);
-
- final ListenableFuture<SchemaContext> remoteSchemaContext = getSchemaContext(masterReference);
- final DOMRpcService deviceRpcService = getDOMRpcService(masterReference);
+ unregisterSlaveMountPoint();
- Futures.addCallback(remoteSchemaContext, new FutureCallback<SchemaContext>() {
- @Override
- public void onSuccess(@Nonnull final SchemaContext result) {
- LOG.info("{}: Schema context resolved: {}", id, result.getModules());
- slaveSalManager.registerSlaveMountPoint(result, deviceRpcService, masterReference);
- }
+ slaveSalManager = new SlaveSalFacade(id, setup.getActorSystem(), actorResponseWaitTime, mountPointService);
- @Override
- public void onFailure(@Nonnull final Throwable throwable) {
- LOG.error("{}: Failed to register mount point: {}", id, throwable);
- }
- }, MoreExecutors.directExecutor());
+ resolveSchemaContext(createSchemaContextFactory(masterReference), slaveSalManager, masterReference, 1);
}
private DOMRpcService getDOMRpcService(final ActorRef masterReference) {
return new ProxyDOMRpcService(setup.getActorSystem(), masterReference, id, actorResponseWaitTime);
}
- private ListenableFuture<SchemaContext> getSchemaContext(final ActorRef masterReference) {
-
+ private SchemaContextFactory createSchemaContextFactory(final ActorRef masterReference) {
final RemoteYangTextSourceProvider remoteYangTextSourceProvider =
new ProxyYangTextSourceProvider(masterReference, getContext(), actorResponseWaitTime);
final RemoteSchemaProvider remoteProvider = new RemoteSchemaProvider(remoteYangTextSourceProvider,
YangTextSchemaSource.class, PotentialSchemaSource.Costs.REMOTE_IO.getValue())))
.collect(Collectors.toList());
- final SchemaContextFactory schemaContextFactory
- = schemaRepository.createSchemaContextFactory(SchemaSourceFilter.ALWAYS_ACCEPT);
+ return schemaRepository.createSchemaContextFactory(SchemaSourceFilter.ALWAYS_ACCEPT);
+ }
- return schemaContextFactory.createSchemaContext(sourceIdentifiers);
+ private void resolveSchemaContext(final SchemaContextFactory schemaContextFactory,
+ final SlaveSalFacade localSlaveSalManager, final ActorRef masterReference, int tries) {
+ final ListenableFuture<SchemaContext> schemaContextFuture =
+ schemaContextFactory.createSchemaContext(sourceIdentifiers);
+ Futures.addCallback(schemaContextFuture, new FutureCallback<SchemaContext>() {
+ @Override
+ public void onSuccess(@Nonnull final SchemaContext result) {
+ executeInSelf(() -> {
+ // Make sure the slaveSalManager instance hasn't changed since we initiated the schema context
+ // resolution.
+ if (slaveSalManager == localSlaveSalManager) {
+ LOG.info("{}: Schema context resolved: {} - registering slave mount point",
+ id, result.getModules());
+ slaveSalManager.registerSlaveMountPoint(result, getDOMRpcService(masterReference),
+ masterReference);
+ }
+ });
+ }
+
+ @Override
+ public void onFailure(@Nonnull final Throwable throwable) {
+ executeInSelf(() -> {
+ if (slaveSalManager == localSlaveSalManager) {
+ final Throwable cause = Throwables.getRootCause(throwable);
+ if (cause instanceof AskTimeoutException) {
+ if (tries <= 5 || tries % 10 == 0) {
+ LOG.warn("{}: Failed to resolve schema context - retrying...", id, throwable);
+ }
+
+ resolveSchemaContext(schemaContextFactory, localSlaveSalManager,
+ masterReference, tries + 1);
+ } else {
+ LOG.error("{}: Failed to resolve schema context - unable to register slave mount point",
+ id, throwable);
+ }
+ }
+ });
+ }
+ }, MoreExecutors.directExecutor());
}
private void closeSchemaSourceRegistrations() {
package org.opendaylight.netconf.topology.singleton.messages;
+import akka.actor.ActorRef;
import java.io.Serializable;
/**
*/
public class AskForMasterMountPoint implements Serializable {
private static final long serialVersionUID = 1L;
+
+ private final ActorRef slaveActorRef;
+
+ public AskForMasterMountPoint(ActorRef slaveActorRef) {
+ this.slaveActorRef = slaveActorRef;
+ }
+
+ public ActorRef getSlaveActorRef() {
+ return slaveActorRef;
+ }
+
+ @Override
+ public String toString() {
+ return "AskForMasterMountPoint [slaveActorRef=" + slaveActorRef + "]";
+ }
}
--- /dev/null
+/*
+ * Copyright (c) 2018 Inocybe Technologies and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.netconf.topology.singleton.messages;
+
+import akka.actor.ActorRef;
+
+/**
+ * Exception reply indicating the recipient is not the master.
+ *
+ * @author Thomas Pantelis
+ */
+public class NotMasterException extends Exception {
+ private static final long serialVersionUID = 1L;
+
+ public NotMasterException(final ActorRef recipient) {
+ super("Actor " + recipient + " is not the current master");
+ }
+}
package org.opendaylight.netconf.topology.singleton.messages;
+import akka.actor.ActorRef;
import java.io.Serializable;
import java.util.List;
import org.opendaylight.yangtools.yang.model.repo.api.SourceIdentifier;
private static final long serialVersionUID = 1L;
private final List<SourceIdentifier> allSourceIdentifiers;
+ private final ActorRef masterActorRef;
- public RegisterMountPoint(final List<SourceIdentifier> allSourceIdentifiers) {
+ public RegisterMountPoint(final List<SourceIdentifier> allSourceIdentifiers, ActorRef masterActorRef) {
this.allSourceIdentifiers = allSourceIdentifiers;
+ this.masterActorRef = masterActorRef;
}
public List<SourceIdentifier> getSourceIndentifiers() {
return allSourceIdentifiers;
}
+ public ActorRef getMasterActorRef() {
+ return masterActorRef;
+ }
+
+ @Override
+ public String toString() {
+ return "RegisterMountPoint [allSourceIdentifiers=" + allSourceIdentifiers + ", masterActorRef=" + masterActorRef
+ + "]";
+ }
}
public SourceIdentifier getSourceIdentifier() {
return sourceIdentifier;
}
+
+ @Override
+ public String toString() {
+ return "YangTextSchemaSourceRequest [sourceIdentifier=" + sourceIdentifier + "]";
+ }
}
import akka.pattern.Patterns;
import akka.testkit.JavaTestKit;
import akka.testkit.TestActorRef;
+import akka.testkit.javadsl.TestKit;
import akka.util.Timeout;
import com.google.common.base.MoreObjects;
import com.google.common.collect.ImmutableList;
// test if slave get right identifiers from master
- final Future<Object> registerMountPointFuture =
- Patterns.ask(masterRef, new AskForMasterMountPoint(),
- TIMEOUT);
+ final TestKit kit = new TestKit(system);
+
+ masterRef.tell(new AskForMasterMountPoint(kit.getRef()), ActorRef.noSender());
- final RegisterMountPoint success =
- (RegisterMountPoint) Await.result(registerMountPointFuture, TIMEOUT.duration());
+ final RegisterMountPoint registerMountPoint = kit.expectMsgClass(RegisterMountPoint.class);
- assertEquals(sourceIdentifiers, success.getSourceIndentifiers());
+ assertEquals(sourceIdentifiers, registerMountPoint.getSourceIndentifiers());
}
system.actorOf(NetconfNodeActor.props(setup, remoteDeviceId, registry, schemaRepository, TIMEOUT,
mountPointService));
final List<SourceIdentifier> sources = ImmutableList.of(yang1, yang2);
- slaveRef.tell(new RegisterMountPoint(sources), masterRef);
+ slaveRef.tell(new RegisterMountPoint(sources, masterRef), masterRef);
verify(registry, timeout(1000)).registerSchemaSource(any(), withSourceId(yang1));
verify(registry, timeout(1000)).registerSchemaSource(any(), withSourceId(yang2));
--- /dev/null
+/*
+ * Copyright (c) 2018 Inocybe Technologies and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.netconf.topology.singleton.impl;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.after;
+import static org.mockito.Mockito.doNothing;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.reset;
+import static org.mockito.Mockito.timeout;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.verifyNoMoreInteractions;
+import static org.mockito.MockitoAnnotations.initMocks;
+import static org.opendaylight.controller.md.sal.binding.api.DataObjectModification.ModificationType.DELETE;
+import static org.opendaylight.controller.md.sal.binding.api.DataObjectModification.ModificationType.SUBTREE_MODIFIED;
+import static org.opendaylight.controller.md.sal.binding.api.DataObjectModification.ModificationType.WRITE;
+
+import akka.actor.ActorSystem;
+import akka.actor.Props;
+import akka.cluster.Cluster;
+import akka.dispatch.Dispatchers;
+import akka.testkit.TestActorRef;
+import akka.testkit.javadsl.TestKit;
+import akka.util.Timeout;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Iterables;
+import com.google.common.io.ByteSource;
+import com.google.common.util.concurrent.Futures;
+import com.typesafe.config.ConfigFactory;
+import java.net.InetSocketAddress;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.stream.Collectors;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.Mock;
+import org.opendaylight.controller.md.sal.binding.api.DataBroker;
+import org.opendaylight.controller.md.sal.binding.api.DataObjectModification;
+import org.opendaylight.controller.md.sal.binding.api.DataTreeIdentifier;
+import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
+import org.opendaylight.controller.md.sal.dom.api.DOMDataBroker;
+import org.opendaylight.controller.md.sal.dom.api.DOMMountPoint;
+import org.opendaylight.controller.md.sal.dom.api.DOMMountPointService;
+import org.opendaylight.controller.md.sal.dom.api.DOMNotificationService;
+import org.opendaylight.controller.md.sal.dom.api.DOMRpcService;
+import org.opendaylight.netconf.sal.connect.api.NetconfDeviceSchemasResolver;
+import org.opendaylight.netconf.sal.connect.netconf.NetconfDevice;
+import org.opendaylight.netconf.sal.connect.util.RemoteDeviceId;
+import org.opendaylight.netconf.topology.singleton.impl.actors.NetconfNodeActor;
+import org.opendaylight.netconf.topology.singleton.impl.utils.NetconfTopologySetup;
+import org.opendaylight.netconf.topology.singleton.impl.utils.NetconfTopologyUtils;
+import org.opendaylight.netconf.topology.singleton.messages.AskForMasterMountPoint;
+import org.opendaylight.netconf.topology.singleton.messages.CreateInitialMasterActorData;
+import org.opendaylight.netconf.topology.singleton.messages.MasterActorDataInitialized;
+import org.opendaylight.netconf.topology.singleton.messages.YangTextSchemaSourceRequest;
+import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.inet.types.rev130715.Host;
+import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.inet.types.rev130715.IpAddress;
+import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.inet.types.rev130715.Ipv4Address;
+import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.inet.types.rev130715.PortNumber;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.NetconfNode;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.NetconfNodeBuilder;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.NetconfNodeConnectionStatus;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.netconf.node.connection.status.ClusteredConnectionStatusBuilder;
+import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.NodeId;
+import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.Node;
+import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.NodeBuilder;
+import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.NodeKey;
+import org.opendaylight.yangtools.concepts.ListenerRegistration;
+import org.opendaylight.yangtools.concepts.ObjectRegistration;
+import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
+import org.opendaylight.yangtools.yang.model.repo.api.RevisionSourceIdentifier;
+import org.opendaylight.yangtools.yang.model.repo.api.SchemaContextFactory;
+import org.opendaylight.yangtools.yang.model.repo.api.SourceIdentifier;
+import org.opendaylight.yangtools.yang.model.repo.api.YangTextSchemaSource;
+import org.opendaylight.yangtools.yang.model.repo.spi.PotentialSchemaSource;
+import org.opendaylight.yangtools.yang.parser.repo.SharedSchemaRepository;
+import org.opendaylight.yangtools.yang.parser.rfc7950.repo.TextToASTTransformer;
+
+/**
+ * Unit tests for NetconfNodeManager.
+ *
+ * @author Thomas Pantelis
+ */
+public class NetconfNodeManagerTest {
+ private static final String ACTOR_SYSTEM_NAME = "test";
+ private static final RemoteDeviceId DEVICE_ID = new RemoteDeviceId("device", new InetSocketAddress(65535));
+ private static final List<SourceIdentifier> SOURCE_IDENTIFIERS =
+ ImmutableList.of(RevisionSourceIdentifier.create("testID"));
+
+ @Mock
+ private DOMMountPointService mockMountPointService;
+
+ @Mock
+ private DOMMountPointService.DOMMountPointBuilder mockMountPointBuilder;
+
+ @Mock
+ private ObjectRegistration<DOMMountPoint> mockMountPointReg;
+
+ @Mock
+ private DataBroker mockDataBroker;
+
+ @Mock
+ private DOMDataBroker mockDeviceDataBroker;
+
+ @Mock
+ private DOMRpcService mockRpcService;
+
+ @Mock
+ private NetconfDeviceSchemasResolver mockSchemasResolver;
+
+ @Mock
+ private SchemaContextFactory mockSchemaContextFactory;
+
+ private ActorSystem slaveSystem;
+ private ActorSystem masterSystem;
+ private TestActorRef<TestMasterActor> testMasterActorRef;
+ private NetconfNodeManager netconfNodeManager;
+ private String masterAddress;
+
+ @Before
+ public void setup() {
+ initMocks(this);
+
+ final Timeout responseTimeout = Timeout.apply(1, TimeUnit.SECONDS);
+
+ slaveSystem = ActorSystem.create(ACTOR_SYSTEM_NAME, ConfigFactory.load().getConfig("Slave"));
+ masterSystem = ActorSystem.create(ACTOR_SYSTEM_NAME, ConfigFactory.load().getConfig("Master"));
+
+ masterAddress = Cluster.get(masterSystem).selfAddress().toString();
+
+ SharedSchemaRepository masterSchemaRepository = new SharedSchemaRepository("master");
+ masterSchemaRepository.registerSchemaSourceListener(
+ TextToASTTransformer.create(masterSchemaRepository, masterSchemaRepository));
+
+ String yangTemplate =
+ "module ID {"
+ + " namespace \"ID\";"
+ + " prefix ID;"
+ + "}";
+
+ SOURCE_IDENTIFIERS.stream().map(
+ sourceId -> masterSchemaRepository.registerSchemaSource(
+ id -> Futures.immediateFuture(YangTextSchemaSource.delegateForByteSource(id,
+ ByteSource.wrap(yangTemplate.replaceAll("ID", id.getName()).getBytes(UTF_8)))),
+ PotentialSchemaSource.create(sourceId, YangTextSchemaSource.class, 1)))
+ .collect(Collectors.toList());
+
+ NetconfTopologySetup masterSetup = new NetconfTopologySetup.NetconfTopologySetupBuilder()
+ .setActorSystem(masterSystem).setDataBroker(mockDataBroker).setSchemaResourceDTO(
+ new NetconfDevice.SchemaResourcesDTO(masterSchemaRepository, masterSchemaRepository,
+ mockSchemaContextFactory, mockSchemasResolver)).build();
+
+ testMasterActorRef = TestActorRef.create(masterSystem, Props.create(TestMasterActor.class, masterSetup,
+ DEVICE_ID, responseTimeout, mockMountPointService).withDispatcher(Dispatchers.DefaultDispatcherId()),
+ NetconfTopologyUtils.createMasterActorName(DEVICE_ID.getName(), masterAddress));
+
+ SharedSchemaRepository slaveSchemaRepository = new SharedSchemaRepository("slave");
+ slaveSchemaRepository.registerSchemaSourceListener(
+ TextToASTTransformer.create(slaveSchemaRepository, slaveSchemaRepository));
+
+ NetconfTopologySetup slaveSetup = new NetconfTopologySetup.NetconfTopologySetupBuilder()
+ .setActorSystem(slaveSystem).setDataBroker(mockDataBroker).setSchemaResourceDTO(
+ new NetconfDevice.SchemaResourcesDTO(slaveSchemaRepository, slaveSchemaRepository,
+ mockSchemaContextFactory, mockSchemasResolver)).build();
+
+ netconfNodeManager = new NetconfNodeManager(slaveSetup, DEVICE_ID, responseTimeout,
+ mockMountPointService);
+
+ setupMountPointMocks();
+ }
+
+ @After
+ public void teardown() {
+ TestKit.shutdownActorSystem(slaveSystem, Boolean.TRUE);
+ TestKit.shutdownActorSystem(masterSystem, Boolean.TRUE);
+ }
+
+ @SuppressWarnings("unchecked")
+ @Test
+ public void testSlaveMountPointRegistration() throws InterruptedException, ExecutionException, TimeoutException {
+ initializeMaster();
+
+ ListenerRegistration<?> mockListenerReg = mock(ListenerRegistration.class);
+ doReturn(mockListenerReg).when(mockDataBroker).registerDataTreeChangeListener(any(), any());
+
+ final NodeId nodeId = new NodeId("device");
+ final NodeKey nodeKey = new NodeKey(nodeId);
+ final String topologyId = "topology-netconf";
+ final InstanceIdentifier<Node> nodeListPath = NetconfTopologyUtils.createTopologyNodeListPath(
+ nodeKey, topologyId);
+
+ netconfNodeManager.registerDataTreeChangeListener(topologyId, nodeKey);
+ verify(mockDataBroker).registerDataTreeChangeListener(any(), eq(netconfNodeManager));
+
+ // Invoke onDataTreeChanged with a NetconfNode WRITE to simulate the master writing the operational state to
+ // Connected. Expect the slave mount point created and registered.
+
+ final NetconfNode netconfNode = newNetconfNode();
+ final Node node = new NodeBuilder().setNodeId(nodeId).addAugmentation(NetconfNode.class, netconfNode).build();
+
+ DataObjectModification<Node> mockDataObjModification = mock(DataObjectModification.class);
+ doReturn(Iterables.getLast(nodeListPath.getPathArguments())).when(mockDataObjModification).getIdentifier();
+ doReturn(WRITE).when(mockDataObjModification).getModificationType();
+ doReturn(node).when(mockDataObjModification).getDataAfter();
+
+ netconfNodeManager.onDataTreeChanged(Collections.singletonList(
+ new NetconfTopologyManagerTest.CustomTreeModification(new DataTreeIdentifier<>(
+ LogicalDatastoreType.OPERATIONAL, nodeListPath), mockDataObjModification)));
+
+ verify(mockMountPointBuilder, timeout(5000)).register();
+ verify(mockMountPointBuilder).addService(eq(DOMDataBroker.class), any());
+ verify(mockMountPointBuilder).addService(eq(DOMRpcService.class), any());
+ verify(mockMountPointBuilder).addService(eq(DOMNotificationService.class), any());
+ verify(mockMountPointService).createMountPoint(DEVICE_ID.getTopologyPath());
+
+ // Notify that the NetconfNode operational state was deleted. Expect the slave mount point closed.
+
+ doReturn(DELETE).when(mockDataObjModification).getModificationType();
+ doReturn(node).when(mockDataObjModification).getDataBefore();
+ doReturn(null).when(mockDataObjModification).getDataAfter();
+
+ netconfNodeManager.onDataTreeChanged(Collections.singletonList(
+ new NetconfTopologyManagerTest.CustomTreeModification(new DataTreeIdentifier<>(
+ LogicalDatastoreType.OPERATIONAL, nodeListPath), mockDataObjModification)));
+
+ verify(mockMountPointReg, timeout(5000)).close();
+
+ // Notify with a NetconfNode operational state WRITE. Expect the slave mount point re-created.
+
+ setupMountPointMocks();
+
+ doReturn(WRITE).when(mockDataObjModification).getModificationType();
+ doReturn(null).when(mockDataObjModification).getDataBefore();
+ doReturn(node).when(mockDataObjModification).getDataAfter();
+
+ netconfNodeManager.onDataTreeChanged(Collections.singletonList(
+ new NetconfTopologyManagerTest.CustomTreeModification(new DataTreeIdentifier<>(
+ LogicalDatastoreType.OPERATIONAL, nodeListPath), mockDataObjModification)));
+
+ verify(mockMountPointBuilder, timeout(5000)).register();
+
+ // Notify again with a NetconfNode operational state WRITE. Expect the prior slave mount point closed and
+ // and a new one registered.
+
+ setupMountPointMocks();
+
+ doReturn(node).when(mockDataObjModification).getDataBefore();
+
+ netconfNodeManager.onDataTreeChanged(Collections.singletonList(
+ new NetconfTopologyManagerTest.CustomTreeModification(new DataTreeIdentifier<>(
+ LogicalDatastoreType.OPERATIONAL, nodeListPath), mockDataObjModification)));
+
+ verify(mockMountPointReg, timeout(5000)).close();
+ verify(mockMountPointBuilder, timeout(5000)).register();
+
+ // Notify that the NetconfNode operational state was changed to UnableToConnect. Expect the slave mount point
+ // closed.
+
+ setupMountPointMocks();
+
+ final Node updatedNode = new NodeBuilder().setNodeId(nodeId)
+ .addAugmentation(NetconfNode.class, new NetconfNodeBuilder(netconfNode)
+ .setConnectionStatus(NetconfNodeConnectionStatus.ConnectionStatus.UnableToConnect)
+ .build()).build();
+
+ doReturn(SUBTREE_MODIFIED).when(mockDataObjModification).getModificationType();
+ doReturn(node).when(mockDataObjModification).getDataBefore();
+ doReturn(updatedNode).when(mockDataObjModification).getDataAfter();
+
+ netconfNodeManager.onDataTreeChanged(Collections.singletonList(
+ new NetconfTopologyManagerTest.CustomTreeModification(new DataTreeIdentifier<>(
+ LogicalDatastoreType.OPERATIONAL, nodeListPath), mockDataObjModification)));
+
+ verify(mockMountPointReg, timeout(5000)).close();
+
+ netconfNodeManager.close();
+ verifyNoMoreInteractions(mockMountPointReg);
+ }
+
+ @SuppressWarnings("unchecked")
+ @Test
+ public void testSlaveMountPointRegistrationFailuresAndRetries()
+ throws InterruptedException, ExecutionException, TimeoutException {
+ final NodeId nodeId = new NodeId("device");
+ final NodeKey nodeKey = new NodeKey(nodeId);
+ final String topologyId = "topology-netconf";
+ final InstanceIdentifier<Node> nodeListPath = NetconfTopologyUtils.createTopologyNodeListPath(
+ nodeKey, topologyId);
+
+ final NetconfNode netconfNode = newNetconfNode();
+ final Node node = new NodeBuilder().setNodeId(nodeId).addAugmentation(NetconfNode.class, netconfNode).build();
+
+ DataObjectModification<Node> mockDataObjModification = mock(DataObjectModification.class);
+ doReturn(Iterables.getLast(nodeListPath.getPathArguments())).when(mockDataObjModification).getIdentifier();
+ doReturn(WRITE).when(mockDataObjModification).getModificationType();
+ doReturn(node).when(mockDataObjModification).getDataAfter();
+
+ // First try the registration where the perceived master hasn't been initialized as the master.
+
+ netconfNodeManager.onDataTreeChanged(Collections.singletonList(
+ new NetconfTopologyManagerTest.CustomTreeModification(new DataTreeIdentifier<>(
+ LogicalDatastoreType.OPERATIONAL, nodeListPath), mockDataObjModification)));
+
+ verify(mockMountPointBuilder, after(1000).never()).register();
+
+ // Initialize the master but drop the initial YangTextSchemaSourceRequest message sent to the master so
+ // it retries.
+
+ initializeMaster();
+
+ CompletableFuture<AskForMasterMountPoint> yangTextSchemaSourceRequestFuture = new CompletableFuture<>();
+ testMasterActorRef.underlyingActor().messagesToDrop.put(YangTextSchemaSourceRequest.class,
+ yangTextSchemaSourceRequestFuture);
+
+ netconfNodeManager.onDataTreeChanged(Collections.singletonList(
+ new NetconfTopologyManagerTest.CustomTreeModification(new DataTreeIdentifier<>(
+ LogicalDatastoreType.OPERATIONAL, nodeListPath), mockDataObjModification)));
+
+ yangTextSchemaSourceRequestFuture.get(5, TimeUnit.SECONDS);
+ verify(mockMountPointBuilder, timeout(5000)).register();
+
+ // Initiate another registration but drop the initial AskForMasterMountPoint message sent to the master so
+ // it retries.
+
+ setupMountPointMocks();
+
+ CompletableFuture<AskForMasterMountPoint> askForMasterMountPointFuture = new CompletableFuture<>();
+ testMasterActorRef.underlyingActor().messagesToDrop.put(AskForMasterMountPoint.class,
+ askForMasterMountPointFuture);
+
+ netconfNodeManager.onDataTreeChanged(Collections.singletonList(
+ new NetconfTopologyManagerTest.CustomTreeModification(new DataTreeIdentifier<>(
+ LogicalDatastoreType.OPERATIONAL, nodeListPath), mockDataObjModification)));
+
+ askForMasterMountPointFuture.get(5, TimeUnit.SECONDS);
+ verify(mockMountPointReg, timeout(5000)).close();
+ verify(mockMountPointBuilder, timeout(5000)).register();
+
+ setupMountPointMocks();
+ netconfNodeManager.close();
+ verify(mockMountPointReg, timeout(5000)).close();
+ }
+
+ private NetconfNode newNetconfNode() {
+ return new NetconfNodeBuilder()
+ .setHost(new Host(new IpAddress(new Ipv4Address("127.0.0.1"))))
+ .setPort(new PortNumber(9999))
+ .setConnectionStatus(NetconfNodeConnectionStatus.ConnectionStatus.Connected)
+ .setClusteredConnectionStatus(new ClusteredConnectionStatusBuilder()
+ .setNetconfMasterNode(masterAddress).build())
+ .build();
+ }
+
+ private void setupMountPointMocks() {
+ reset(mockMountPointService, mockMountPointBuilder, mockMountPointReg);
+
+ doNothing().when(mockMountPointReg).close();
+
+ doReturn(mockMountPointBuilder).when(mockMountPointBuilder).addInitialSchemaContext(any());
+ doReturn(mockMountPointBuilder).when(mockMountPointBuilder).addService(any(), any());
+ doReturn(mockMountPointReg).when(mockMountPointBuilder).register();
+
+ doReturn(mockMountPointBuilder).when(mockMountPointService).createMountPoint(any());
+ }
+
+ private void initializeMaster() {
+ TestKit kit = new TestKit(masterSystem);
+
+ testMasterActorRef.tell(new CreateInitialMasterActorData(mockDeviceDataBroker, SOURCE_IDENTIFIERS,
+ mockRpcService), kit.getRef());
+
+ kit.expectMsgClass(MasterActorDataInitialized.class);
+ }
+
+ private static class TestMasterActor extends NetconfNodeActor {
+ final Map<Class<?>, CompletableFuture<? extends Object>> messagesToDrop = new ConcurrentHashMap<>();
+
+ TestMasterActor(NetconfTopologySetup setup, RemoteDeviceId deviceId, Timeout actorResponseWaitTime,
+ DOMMountPointService mountPointService) {
+ super(setup, deviceId, setup.getSchemaResourcesDTO().getSchemaRegistry(),
+ setup.getSchemaResourcesDTO().getSchemaRepository(), actorResponseWaitTime, mountPointService);
+ }
+
+ @SuppressWarnings({ "rawtypes", "unchecked" })
+ @Override
+ public void handleReceive(Object message) throws Exception {
+ CompletableFuture dropFuture = messagesToDrop.remove(message.getClass());
+ if (dropFuture != null) {
+ dropFuture.complete(message);
+ } else {
+ super.handleReceive(message);
+ }
+ }
+ }
+}
}
}
- private class CustomTreeModification implements DataTreeModification<Node> {
+ static class CustomTreeModification implements DataTreeModification<Node> {
private final DataTreeIdentifier<Node> rootPath;
private final DataObjectModification<Node> rootNode;
--- /dev/null
+Slave {
+ akka {
+ actor {
+ provider = "akka.cluster.ClusterActorRefProvider"
+ warn-about-java-serializer-usage = false
+ }
+ remote {
+ netty.tcp {
+ hostname = "127.0.0.1"
+ port = 2550
+ }
+ }
+
+ cluster {
+ roles = [
+ "slave"
+ ]
+ }
+ }
+}
+
+Master {
+ akka {
+ actor {
+ provider = "akka.cluster.ClusterActorRefProvider"
+ warn-about-java-serializer-usage = false
+ }
+ remote {
+ netty.tcp {
+ hostname = "127.0.0.1"
+ port = 2552
+ }
+ }
+
+ cluster {
+ roles = [
+ "master"
+ ]
+ }
+ }
+}
--- /dev/null
+org.slf4j.simpleLogger.showDateTime=true
+org.slf4j.simpleLogger.dateTimeFormat=hh:mm:ss,S a
+org.slf4j.simpleLogger.logFile=System.out
+org.slf4j.simpleLogger.showShortLogName=true
+org.slf4j.simpleLogger.levelInBrackets=true
+org.slf4j.simpleLogger.log.org.opendaylight.netconf.topology.singleton.impl=debug