import org.opendaylight.netconf.topology.singleton.impl.utils.NetconfTopologySetup;
import org.opendaylight.netconf.topology.singleton.impl.utils.NetconfTopologyUtils;
import org.opendaylight.netconf.topology.singleton.messages.AskForMasterMountPoint;
+import org.opendaylight.netconf.topology.singleton.messages.RefreshSlaveActor;
import org.opendaylight.netconf.topology.singleton.messages.UnregisterSlaveMountPoint;
import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.NetconfNode;
import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.NetconfNodeConnectionStatus;
handleSlaveMountPoint(rootNode);
break;
case DELETE:
- LOG.debug("{}: Operational for node {} deleted. Trying to remove slave mount point", id, nodeId);
- closeActor();
+ LOG.debug("{}: Operational for node {} deleted.", id, nodeId);
+ unregisterSlaveMountpoint();
break;
default:
LOG.debug("{}: Uknown operation for node: {}", id, nodeId);
@Override
public void close() {
+ unregisterSlaveMountpoint();
closeActor();
-
if (dataChangeListenerRegistration != null) {
dataChangeListenerRegistration.close();
dataChangeListenerRegistration = null;
private void closeActor() {
if (slaveActorRef != null) {
- slaveActorRef.tell(new UnregisterSlaveMountPoint(), ActorRef.noSender());
+ // Permanently stops the slave actor. Mount-point unregistration is no longer piggy-backed
+ // here; it is sent separately by unregisterSlaveMountpoint().
+ LOG.debug("{}: Sending poison pill to {}", id, slaveActorRef);
slaveActorRef.tell(PoisonPill.getInstance(), ActorRef.noSender());
slaveActorRef = null;
}
}
+ // Tells the slave actor (if present) to drop its mount-point registration. Unlike closeActor(),
+ // the actor itself keeps running and can be refreshed/re-registered later.
+ private void unregisterSlaveMountpoint() {
+ if (slaveActorRef != null) {
+ LOG.debug("{}: Sending message to unregister slave mountpoint to {}", id, slaveActorRef);
+ slaveActorRef.tell(new UnregisterSlaveMountPoint(), ActorRef.noSender());
+ }
+ }
+
void registerDataTreeChangeListener(final String topologyId, final NodeKey key) {
LOG.debug("{}: Registering data tree change listener on node {}", id, key);
dataChangeListenerRegistration = setup.getDataBroker().registerDataTreeChangeListener(
final NetconfNode netconfNodeAfter = rootNode.getDataAfter().getAugmentation(NetconfNode.class);
if (NetconfNodeConnectionStatus.ConnectionStatus.Connected.equals(netconfNodeAfter.getConnectionStatus())) {
- createActorRef();
+ createOrUpdateActorRef();
final String masterAddress = netconfNodeAfter.getClusteredConnectionStatus().getNetconfMasterNode();
final String path = NetconfTopologyUtils.createActorPath(masterAddress,
NetconfTopologyUtils.createMasterActorName(id.getName(),
netconfNodeAfter.getClusteredConnectionStatus().getNetconfMasterNode()));
setup.getActorSystem().actorSelection(path).tell(new AskForMasterMountPoint(), slaveActorRef);
} else {
- closeActor();
+ unregisterSlaveMountpoint();
}
}
- private void createActorRef() {
+ // Lazily creates the slave actor on first use; on subsequent calls refreshes the existing actor's
+ // state via a RefreshSlaveActor message instead of spawning a duplicate.
+ private void createOrUpdateActorRef() {
if (slaveActorRef == null) {
slaveActorRef = setup.getActorSystem().actorOf(NetconfNodeActor.props(setup, id, schemaRegistry,
- schemaRepository, actorResponseWaitTime, mountPointService), id.getName());
+ schemaRepository, actorResponseWaitTime, mountPointService));
+ // NOTE(review): the explicit actor name (id.getName()) was dropped, so Akka assigns an
+ // auto-generated name — presumably to avoid name clashes on re-creation; confirm nothing
+ // resolves this actor by path/name.
+ LOG.debug("{}: Slave actor created with name {}", id, slaveActorRef);
+ } else {
+ slaveActorRef
+ .tell(new RefreshSlaveActor(setup, id, schemaRegistry, schemaRepository, actorResponseWaitTime),
+ ActorRef.noSender());
}
}
context.refresh(createSetup(instanceIdentifier, node));
}
+ // ClusterSingletonServiceProvider.registerClusterSingletonService throws a RuntimeException if there are
+ // problems with the registration, and the client has to deal with it. The only thing we can do if this
+ // error occurs is to retry the registration several times and log the error.
+ // TODO change to a specific documented Exception when changed in ClusterSingletonServiceProvider
+ @SuppressWarnings("checkstyle:IllegalCatch")
private void startNetconfDeviceContext(final InstanceIdentifier<Node> instanceIdentifier, final Node node) {
final NetconfNode netconfNode = node.getAugmentation(NetconfNode.class);
Preconditions.checkNotNull(netconfNode);
new NetconfTopologyContext(createSetup(instanceIdentifier, node), serviceGroupIdent,
actorResponseWaitTime, mountPointService);
- final ClusterSingletonServiceRegistration clusterSingletonServiceRegistration =
- clusterSingletonServiceProvider.registerClusterSingletonService(newNetconfTopologyContext);
+ int tries = 3;
+ while (true) {
+ try {
+ final ClusterSingletonServiceRegistration clusterSingletonServiceRegistration =
+ clusterSingletonServiceProvider.registerClusterSingletonService(newNetconfTopologyContext);
+ clusterRegistrations.put(instanceIdentifier, clusterSingletonServiceRegistration);
+ contexts.put(instanceIdentifier, newNetconfTopologyContext);
+ break;
+ } catch (final RuntimeException e) {
+ LOG.warn("Unable to register cluster singleton service {}, trying again", newNetconfTopologyContext, e);
+
+ if (--tries <= 0) {
+ LOG.error("Unable to register cluster singleton service {} - done trying, closing topology context",
+ newNetconfTopologyContext, e);
+ close();
+ break;
+ }
+ }
+ }
- clusterRegistrations.put(instanceIdentifier, clusterSingletonServiceRegistration);
- contexts.put(instanceIdentifier, newNetconfTopologyContext);
}
@SuppressWarnings("checkstyle:IllegalCatch")
import org.opendaylight.netconf.topology.singleton.messages.MasterActorDataInitialized;
import org.opendaylight.netconf.topology.singleton.messages.NormalizedNodeMessage;
import org.opendaylight.netconf.topology.singleton.messages.RefreshSetupMasterActorData;
+import org.opendaylight.netconf.topology.singleton.messages.RefreshSlaveActor;
import org.opendaylight.netconf.topology.singleton.messages.RegisterMountPoint;
import org.opendaylight.netconf.topology.singleton.messages.UnregisterSlaveMountPoint;
import org.opendaylight.netconf.topology.singleton.messages.YangTextSchemaSourceRequest;
private static final Logger LOG = LoggerFactory.getLogger(NetconfNodeActor.class);
- private final SchemaSourceRegistry schemaRegistry;
- private final SchemaRepository schemaRepository;
- private final Timeout actorResponseWaitTime;
private final Duration writeTxIdleTimeout;
private final DOMMountPointService mountPointService;
+ private SchemaSourceRegistry schemaRegistry;
+ private SchemaRepository schemaRepository;
+ private Timeout actorResponseWaitTime;
private RemoteDeviceId id;
private NetconfTopologySetup setup;
private List<SourceIdentifier> sourceIdentifiers;
slaveSalManager.close();
slaveSalManager = null;
}
-
+ } else if (message instanceof RefreshSlaveActor) { //slave
+ actorResponseWaitTime = ((RefreshSlaveActor) message).getActorResponseWaitTime();
+ id = ((RefreshSlaveActor) message).getId();
+ schemaRegistry = ((RefreshSlaveActor) message).getSchemaRegistry();
+ setup = ((RefreshSlaveActor) message).getSetup();
+ schemaRepository = ((RefreshSlaveActor) message).getSchemaRepository();
}
+
}
@Override
--- /dev/null
+/*
+ * Copyright (c) 2017 Pantheon Technologies s.r.o. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.netconf.topology.singleton.messages;
+
+import akka.util.Timeout;
+import java.io.Serializable;
+import org.opendaylight.netconf.sal.connect.util.RemoteDeviceId;
+import org.opendaylight.netconf.topology.singleton.impl.utils.NetconfTopologySetup;
+import org.opendaylight.yangtools.yang.model.repo.api.SchemaRepository;
+import org.opendaylight.yangtools.yang.model.repo.spi.SchemaSourceRegistry;
+
+/**
+ * Message sent to an already-running slave {@code NetconfNodeActor} to refresh the mutable parts of its
+ * configuration (topology setup, device id, schema registry/repository and actor response timeout)
+ * instead of recreating the actor.
+ */
+public class RefreshSlaveActor implements Serializable {
+ private static final long serialVersionUID = 1L;
+
+ // NOTE(review): the carried field types (NetconfTopologySetup, SchemaSourceRegistry, ...) are not
+ // themselves Serializable, so this message is only safe while it is delivered to a local actor —
+ // confirm it is never sent across the wire.
+ private final SchemaRepository schemaRepository;
+ private final RemoteDeviceId id;
+ private final SchemaSourceRegistry schemaRegistry;
+ private final NetconfTopologySetup setup;
+ private final Timeout actorResponseWaitTime;
+
+ public RefreshSlaveActor(final NetconfTopologySetup setup, final RemoteDeviceId id,
+ final SchemaSourceRegistry schemaRegistry, final SchemaRepository schemaRepository,
+ final Timeout actorResponseWaitTime) {
+ this.setup = setup;
+ this.id = id;
+ this.schemaRegistry = schemaRegistry;
+ this.schemaRepository = schemaRepository;
+ this.actorResponseWaitTime = actorResponseWaitTime;
+ }
+
+ public Timeout getActorResponseWaitTime() {
+ return actorResponseWaitTime;
+ }
+
+ public SchemaRepository getSchemaRepository() {
+ return schemaRepository;
+ }
+
+ public RemoteDeviceId getId() {
+ return id;
+ }
+
+ public SchemaSourceRegistry getSchemaRegistry() {
+ return schemaRegistry;
+ }
+
+ public NetconfTopologySetup getSetup() {
+ return setup;
+ }
+}
import static org.mockito.Matchers.any;
import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
@Test
public void testWriteConfiguration() throws Exception {
+ writeConfiguration(false);
+ }
- final ClusterSingletonServiceRegistration clusterRegistration = mock(ClusterSingletonServiceRegistration.class);
+ @Test
+ public void testWriteConfigurationFail() throws Exception {
+ writeConfiguration(true);
+ }
+
+ @Test
+ public void testRegisterDataTreeChangeListener() {
+
+ final WriteTransaction wtx = mock(WriteTransaction.class);
+
+ doReturn(wtx).when(dataBroker).newWriteOnlyTransaction();
+ doNothing().when(wtx).merge(any(), any(), any());
+ doReturn(Futures.immediateCheckedFuture(null)).when(wtx).submit();
+ doReturn(null).when(dataBroker).registerDataChangeListener(any(), any(), any(), any());
+
+ netconfTopologyManager.init();
+
+ // verify if listener is called with right parameters = registered on right path
+
+ verify(dataBroker, times(1)).registerDataTreeChangeListener(
+ new DataTreeIdentifier<>(LogicalDatastoreType.CONFIGURATION, NetconfTopologyUtils
+ .createTopologyListPath(topologyId).child(Node.class)), netconfTopologyManager);
+
+ }
+
+ @Test
+ public void testClose() throws Exception {
final Field fieldContexts = NetconfTopologyManager.class.getDeclaredField("contexts");
fieldContexts.setAccessible(true);
+ @SuppressWarnings("unchecked") final Map<InstanceIdentifier<Node>, NetconfTopologyContext> contexts =
+ (Map<InstanceIdentifier<Node>, NetconfTopologyContext>) fieldContexts.get(netconfTopologyManager);
+
+ final Field fieldClusterRegistrations =
+ NetconfTopologyManager.class.getDeclaredField("clusterRegistrations");
+ fieldClusterRegistrations.setAccessible(true);
@SuppressWarnings("unchecked")
- final Map<InstanceIdentifier<Node>, NetconfTopologyContext> contexts =
+ final Map<InstanceIdentifier<Node>, ClusterSingletonServiceRegistration> clusterRegistrations =
+ (Map<InstanceIdentifier<Node>, ClusterSingletonServiceRegistration>)
+ fieldClusterRegistrations.get(netconfTopologyManager);
+
+ final InstanceIdentifier<Node> instanceIdentifier = NetconfTopologyUtils.createTopologyNodeListPath(
+ new NodeKey(new NodeId("node-id-1")), "topology-1");
+
+
+ final NetconfTopologyContext context = mock(NetconfTopologyContext.class);
+ final ClusterSingletonServiceRegistration clusterRegistration =
+ mock(ClusterSingletonServiceRegistration.class);
+ contexts.put(instanceIdentifier, context);
+ clusterRegistrations.put(instanceIdentifier, clusterRegistration);
+
+ doNothing().when(context).closeFinal();
+ doNothing().when(clusterRegistration).close();
+
+ netconfTopologyManager.close();
+ verify(context, times(1)).closeFinal();
+ verify(clusterRegistration, times(1)).close();
+
+ assertTrue(contexts.isEmpty());
+ assertTrue(clusterRegistrations.isEmpty());
+
+ }
+
+ private void writeConfiguration(final boolean fail) throws Exception {
+
+ final ClusterSingletonServiceRegistration clusterRegistration = mock(ClusterSingletonServiceRegistration.class);
+
+ final Field fieldContexts = NetconfTopologyManager.class.getDeclaredField("contexts");
+ fieldContexts.setAccessible(true);
+ @SuppressWarnings("unchecked") final Map<InstanceIdentifier<Node>, NetconfTopologyContext> contexts =
(Map<InstanceIdentifier<Node>, NetconfTopologyContext>) fieldContexts.get(netconfTopologyManager);
- final Field fieldClusterRegistrations = NetconfTopologyManager.class.getDeclaredField("clusterRegistrations");
+ final Field fieldClusterRegistrations =
+ NetconfTopologyManager.class.getDeclaredField("clusterRegistrations");
fieldClusterRegistrations.setAccessible(true);
@SuppressWarnings("unchecked")
final Map<InstanceIdentifier<Node>, ClusterSingletonServiceRegistration> clusterRegistrations =
final Collection<DataTreeModification<Node>> changes = new ArrayList<>();
final InstanceIdentifier<Node> instanceIdentifier = NetconfTopologyUtils.createTopologyNodeListPath(
- new NodeKey(new NodeId("node-id-1")),"topology-1");
+ new NodeKey(new NodeId("node-id-1")), "topology-1");
final InstanceIdentifier<Node> instanceIdentifierDiferent = NetconfTopologyUtils.createTopologyNodeListPath(
- new NodeKey(new NodeId("node-id-2")),"topology-2");
+ new NodeKey(new NodeId("node-id-2")), "topology-2");
final DataTreeIdentifier<Node> rootIdentifier =
new DataTreeIdentifier<>(LogicalDatastoreType.CONFIGURATION, instanceIdentifier);
final DataTreeIdentifier<Node> rootIdentifierDifferent =
new DataTreeIdentifier<>(LogicalDatastoreType.CONFIGURATION, instanceIdentifierDiferent);
- @SuppressWarnings("unchecked")
- final DataObjectModification<Node> objectModification = mock(DataObjectModification.class);
+ @SuppressWarnings("unchecked") final DataObjectModification<Node> objectModification =
+ mock(DataObjectModification.class);
final NetconfNode netconfNode = new NetconfNodeBuilder()
.setHost(new Host(new IpAddress(new Ipv4Address("127.0.0.1"))))
final Identifier key = new NodeKey(new NodeId("node-id"));
- @SuppressWarnings("unchecked")
- final InstanceIdentifier.IdentifiableItem<Node, NodeKey> pathArgument =
+ @SuppressWarnings("unchecked") final InstanceIdentifier.IdentifiableItem<Node, NodeKey> pathArgument =
new InstanceIdentifier.IdentifiableItem(Node.class, key);
// testing WRITE on two identical rootIdentifiers and one different
-
- changes.add(new CustomTreeModification(rootIdentifier, objectModification));
- changes.add(new CustomTreeModification(rootIdentifier, objectModification));
- changes.add(new CustomTreeModification(rootIdentifierDifferent, objectModification));
-
+ if (fail) {
+ changes.add(new CustomTreeModification(rootIdentifier, objectModification));
+ } else {
+ changes.add(new CustomTreeModification(rootIdentifier, objectModification));
+ changes.add(new CustomTreeModification(rootIdentifier, objectModification));
+ changes.add(new CustomTreeModification(rootIdentifierDifferent, objectModification));
+ }
doReturn(WRITE).when(objectModification).getModificationType();
doReturn(node).when(objectModification).getDataAfter();
doReturn(pathArgument).when(objectModification).getIdentifier();
- doReturn(clusterRegistration).when(clusterSingletonServiceProvider).registerClusterSingletonService(any());
-
- netconfTopologyManager.onDataTreeChanged(changes);
-
- verify(clusterSingletonServiceProvider, times(2)).registerClusterSingletonService(any());
-
- // only two created contexts
- assertEquals(2, contexts.size());
- assertTrue(contexts.containsKey(rootIdentifier.getRootIdentifier()));
- assertTrue(contexts.containsKey(rootIdentifierDifferent.getRootIdentifier()));
-
- // only two created cluster registrations
- assertEquals(2, contexts.size());
- assertTrue(clusterRegistrations.containsKey(rootIdentifier.getRootIdentifier()));
- assertTrue(clusterRegistrations.containsKey(rootIdentifierDifferent.getRootIdentifier()));
-
- // after delete there should be no context and clustered registrations
- doReturn(DELETE).when(objectModification).getModificationType();
-
- doNothing().when(clusterRegistration).close();
+ if (fail) {
+ doThrow(new RuntimeException("error")).when(clusterSingletonServiceProvider)
+ .registerClusterSingletonService(any());
+ } else {
+ doReturn(clusterRegistration).when(clusterSingletonServiceProvider).registerClusterSingletonService(any());
+ }
netconfTopologyManager.onDataTreeChanged(changes);
- verify(clusterRegistration, times(2)).close();
-
- // empty map of contexts
- assertTrue(contexts.isEmpty());
- assertFalse(contexts.containsKey(rootIdentifier.getRootIdentifier()));
- assertFalse(contexts.containsKey(rootIdentifierDifferent.getRootIdentifier()));
-
- // empty map of clustered registrations
- assertTrue(clusterRegistrations.isEmpty());
- assertFalse(clusterRegistrations.containsKey(rootIdentifier.getRootIdentifier()));
- assertFalse(clusterRegistrations.containsKey(rootIdentifierDifferent.getRootIdentifier()));
-
- }
-
- @Test
- public void testRegisterDataTreeChangeListener() {
+ if (fail) {
+ verify(clusterSingletonServiceProvider, times(3))
+ .registerClusterSingletonService(any());
+ assertTrue(contexts.isEmpty());
+ assertTrue(clusterRegistrations.isEmpty());
+ } else {
+ verify(clusterSingletonServiceProvider, times(2))
+ .registerClusterSingletonService(any());
- final WriteTransaction wtx = mock(WriteTransaction.class);
+ // only two created contexts
+ assertEquals(2, contexts.size());
+ assertTrue(contexts.containsKey(rootIdentifier.getRootIdentifier()));
+ assertTrue(contexts.containsKey(rootIdentifierDifferent.getRootIdentifier()));
- doReturn(wtx).when(dataBroker).newWriteOnlyTransaction();
- doNothing().when(wtx).merge(any(), any(), any());
- doReturn(Futures.immediateCheckedFuture(null)).when(wtx).submit();
- doReturn(null).when(dataBroker).registerDataChangeListener(any(), any(), any(), any());
+ // only two created cluster registrations
+ assertEquals(2, clusterRegistrations.size());
+ assertTrue(clusterRegistrations.containsKey(rootIdentifier.getRootIdentifier()));
+ assertTrue(clusterRegistrations.containsKey(rootIdentifierDifferent.getRootIdentifier()));
- netconfTopologyManager.init();
+ // after delete there should be no context and clustered registrations
+ doReturn(DELETE).when(objectModification).getModificationType();
- // verify if listener is called with right parameters = registered on right path
+ doNothing().when(clusterRegistration).close();
- verify(dataBroker, times(1)).registerDataTreeChangeListener(
- new DataTreeIdentifier<>(LogicalDatastoreType.CONFIGURATION, NetconfTopologyUtils
- .createTopologyListPath(topologyId).child(Node.class)), netconfTopologyManager);
+ netconfTopologyManager.onDataTreeChanged(changes);
- }
+ verify(clusterRegistration, times(2)).close();
- @Test
- public void testClose() throws Exception {
-
- final Field fieldContexts = NetconfTopologyManager.class.getDeclaredField("contexts");
- fieldContexts.setAccessible(true);
- @SuppressWarnings("unchecked")
- final Map<InstanceIdentifier<Node>, NetconfTopologyContext> contexts =
- (Map<InstanceIdentifier<Node>, NetconfTopologyContext>) fieldContexts.get(netconfTopologyManager);
-
- final Field fieldClusterRegistrations = NetconfTopologyManager.class.getDeclaredField("clusterRegistrations");
- fieldClusterRegistrations.setAccessible(true);
- @SuppressWarnings("unchecked")
- final Map<InstanceIdentifier<Node>, ClusterSingletonServiceRegistration> clusterRegistrations =
- (Map<InstanceIdentifier<Node>, ClusterSingletonServiceRegistration>)
- fieldClusterRegistrations.get(netconfTopologyManager);
-
- final InstanceIdentifier<Node> instanceIdentifier = NetconfTopologyUtils.createTopologyNodeListPath(
- new NodeKey(new NodeId("node-id-1")),"topology-1");
-
-
- final NetconfTopologyContext context = mock(NetconfTopologyContext.class);
- final ClusterSingletonServiceRegistration clusterRegistration =
- mock(ClusterSingletonServiceRegistration.class);
- contexts.put(instanceIdentifier, context);
- clusterRegistrations.put(instanceIdentifier, clusterRegistration);
-
- doNothing().when(context).closeFinal();
- doNothing().when(clusterRegistration).close();
-
- netconfTopologyManager.close();
- verify(context, times(1)).closeFinal();
- verify(clusterRegistration, times(1)).close();
-
- assertTrue(contexts.isEmpty());
- assertTrue(clusterRegistrations.isEmpty());
+ // empty map of contexts
+ assertTrue(contexts.isEmpty());
+ assertFalse(contexts.containsKey(rootIdentifier.getRootIdentifier()));
+ assertFalse(contexts.containsKey(rootIdentifierDifferent.getRootIdentifier()));
+ // empty map of clustered registrations
+ assertTrue(clusterRegistrations.isEmpty());
+ assertFalse(clusterRegistrations.containsKey(rootIdentifier.getRootIdentifier()));
+ assertFalse(clusterRegistrations.containsKey(rootIdentifierDifferent.getRootIdentifier()));
+ }
}
private class CustomTreeModification implements DataTreeModification<Node> {
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.stream.Collectors;
+import javax.annotation.concurrent.GuardedBy;
import org.opendaylight.controller.md.sal.dom.api.DOMNotification;
import org.opendaylight.controller.md.sal.dom.api.DOMRpcException;
import org.opendaylight.controller.md.sal.dom.api.DOMRpcResult;
public static final Function<QName, SourceIdentifier> QNAME_TO_SOURCE_ID_FUNCTION =
input -> RevisionSourceIdentifier.create(input.getLocalName(),
- Optional.fromNullable(input.getFormattedRevision()));
+ Optional.fromNullable(input.getFormattedRevision()));
protected final RemoteDeviceId id;
private final boolean reconnectOnSchemasChange;
private final NotificationHandler notificationHandler;
protected final List<SchemaSourceRegistration<? extends SchemaSourceRepresentation>> sourceRegistrations =
Lists.newArrayList();
+ @GuardedBy("this")
+ private boolean connected = false;
// Message transformer is constructed once the schemas are available
private MessageTransformer<NetconfMessage> messageTransformer;
// Yang models are being downloaded in this method and it would cause a
// deadlock if we used the netty thread
// http://netty.io/wiki/thread-model.html
+ setConnected(true);
LOG.debug("{}: Session to remote device established with {}", id, remoteSessionCapabilities);
final NetconfDeviceRpc initRpc =
private void registerToBaseNetconfStream(final NetconfDeviceRpc deviceRpc,
final NetconfDeviceCommunicator listener) {
- // TODO check whether the model describing create subscription is present in schema
+ // TODO check whether the model describing create subscription is present in schema
// Perhaps add a default schema context to support create-subscription if the model was not provided
// (same as what we do for base netconf operations in transformer)
final CheckedFuture<DOMRpcResult, DOMRpcException> rpcResultListenableFuture = deviceRpc.invokeRpc(
- NetconfMessageTransformUtil.toPath(NetconfMessageTransformUtil.CREATE_SUBSCRIPTION_RPC_QNAME),
- NetconfMessageTransformUtil.CREATE_SUBSCRIPTION_RPC_CONTENT);
+ NetconfMessageTransformUtil.toPath(NetconfMessageTransformUtil.CREATE_SUBSCRIPTION_RPC_QNAME),
+ NetconfMessageTransformUtil.CREATE_SUBSCRIPTION_RPC_CONTENT);
final NotificationHandler.NotificationFilter filter = new NotificationHandler.NotificationFilter() {
@Override
return remoteSessionCapabilities.isNotificationsSupported() && reconnectOnSchemasChange;
}
- void handleSalInitializationSuccess(final SchemaContext result,
+ private synchronized void handleSalInitializationSuccess(final SchemaContext result,
final NetconfSessionPreferences remoteSessionCapabilities,
final DOMRpcService deviceRpc) {
+ // synchronized: pairs with setConnected(), which flips the @GuardedBy("this") "connected" flag
+ // from onRemoteSessionDown()/onRemoteSessionFailed() on a different thread.
- final BaseSchema baseSchema =
+ //NetconfDevice.SchemaSetup can complete after NetconfDeviceCommunicator was closed. In that case do nothing,
+ //since salFacade.onDeviceDisconnected was already called.
+ if (connected) {
+ final BaseSchema baseSchema =
remoteSessionCapabilities.isNotificationsSupported()
? BaseSchema.BASE_NETCONF_CTX_WITH_NOTIFICATIONS : BaseSchema.BASE_NETCONF_CTX;
- messageTransformer = new NetconfMessageTransformer(result, true, baseSchema);
+ messageTransformer = new NetconfMessageTransformer(result, true, baseSchema);
- updateTransformer(messageTransformer);
- // salFacade.onDeviceConnected has to be called before the notification handler is initialized
- salFacade.onDeviceConnected(result, remoteSessionCapabilities, deviceRpc);
- notificationHandler.onRemoteSchemaUp(messageTransformer);
+ updateTransformer(messageTransformer);
+ // salFacade.onDeviceConnected has to be called before the notification handler is initialized
+ salFacade.onDeviceConnected(result, remoteSessionCapabilities, deviceRpc);
+ notificationHandler.onRemoteSchemaUp(messageTransformer);
- LOG.info("{}: Netconf connector initialized successfully", id);
+ LOG.info("{}: Netconf connector initialized successfully", id);
+ } else {
+ LOG.warn("{}: Device communicator was closed before schema setup finished.", id);
+ }
}
- void handleSalInitializationFailure(final Throwable throwable,
- final RemoteDeviceCommunicator<NetconfMessage> listener) {
+ private void handleSalInitializationFailure(final Throwable throwable,
+ final RemoteDeviceCommunicator<NetconfMessage> listener) {
LOG.error("{}: Initialization in sal failed, disconnecting from device", id, throwable);
listener.close();
onRemoteSessionDown();
messageTransformer = transformer;
}
+ // Records session liveness; synchronized because the @GuardedBy("this") "connected" flag is read
+ // under the same lock during SAL initialization.
+ private synchronized void setConnected(final boolean connected) {
+ this.connected = connected;
+ }
+
private void addProvidedSourcesToSchemaRegistry(final DeviceSources deviceSources) {
final SchemaSourceProvider<YangTextSchemaSource> yangProvider = deviceSources.getSourceProvider();
for (final SourceIdentifier sourceId : deviceSources.getProvidedSources()) {
@Override
public void onRemoteSessionDown() {
+ setConnected(false);
notificationHandler.onRemoteSchemaDown();
salFacade.onDeviceDisconnected();
@Override
public void onRemoteSessionFailed(final Throwable throwable) {
+ setConnected(false);
salFacade.onDeviceFailed(throwable);
}
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyCollectionOf;
import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.after;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.doReturn;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.SettableFuture;
import java.io.IOException;
import java.io.InputStream;
import java.net.InetSocketAddress;
public void testNotificationBeforeSchema() throws Exception {
final RemoteDeviceHandler<NetconfSessionPreferences> facade = getFacade();
final NetconfDeviceCommunicator listener = getListener();
-
- final NetconfDevice.SchemaResourcesDTO schemaResourcesDTO = new NetconfDevice.SchemaResourcesDTO(
- getSchemaRegistry(), getSchemaRepository(), getSchemaFactory(), STATE_SCHEMAS_RESOLVER);
+ final SchemaContextFactory schemaContextProviderFactory = mock(SchemaContextFactory.class);
+ final SettableFuture<SchemaContext> schemaFuture = SettableFuture.create();
+ doReturn(Futures.makeChecked(schemaFuture, e -> new SchemaResolutionException("fail")))
+ .when(schemaContextProviderFactory).createSchemaContext(any(Collection.class));
+ final NetconfDevice.SchemaResourcesDTO schemaResourcesDTO =
+ new NetconfDevice.SchemaResourcesDTO(getSchemaRegistry(), getSchemaRepository(),
+ schemaContextProviderFactory, STATE_SCHEMAS_RESOLVER);
final NetconfDevice device = new NetconfDeviceBuilder()
.setReconnectOnSchemasChange(true)
.setSchemaResourcesDTO(schemaResourcesDTO)
.setSalFacade(facade)
.build();
- device.onNotification(NOTIFICATION);
- device.onNotification(NOTIFICATION);
-
- verify(facade, times(0)).onNotification(any(DOMNotification.class));
-
final NetconfSessionPreferences sessionCaps = getSessionCaps(true,
Lists.newArrayList(TEST_CAPABILITY));
+ device.onRemoteSessionUp(sessionCaps, listener);
- final DOMRpcService deviceRpc = mock(DOMRpcService.class);
-
- device.handleSalInitializationSuccess(
- NetconfToNotificationTest.getNotificationSchemaContext(getClass(), false), sessionCaps, deviceRpc);
+ device.onNotification(NOTIFICATION);
+ device.onNotification(NOTIFICATION);
+ verify(facade, times(0)).onNotification(any(DOMNotification.class));
+ verify(facade, times(0)).onNotification(any(DOMNotification.class));
+ schemaFuture.set(NetconfToNotificationTest.getNotificationSchemaContext(getClass(), false));
verify(facade, timeout(10000).times(2)).onNotification(any(DOMNotification.class));
device.onNotification(NOTIFICATION);
any(SchemaContext.class), any(NetconfSessionPreferences.class), any(DOMRpcService.class));
}
+ // Verifies that a schema resolution which completes only AFTER the session went down does not
+ // re-connect the facade: onDeviceDisconnected must win and onDeviceConnected must never fire.
+ @Test
+ public void testNetconfDeviceDisconnectListenerCallCancellation() throws Exception {
+ final RemoteDeviceHandler<NetconfSessionPreferences> facade = getFacade();
+ final NetconfDeviceCommunicator listener = getListener();
+ final SchemaContextFactory schemaContextProviderFactory = mock(SchemaContextFactory.class);
+ final SettableFuture<SchemaContext> schemaFuture = SettableFuture.create();
+ doReturn(Futures.makeChecked(schemaFuture, e -> new SchemaResolutionException("fail")))
+ .when(schemaContextProviderFactory).createSchemaContext(any(Collection.class));
+ final NetconfDevice.SchemaResourcesDTO schemaResourcesDTO
+ = new NetconfDevice.SchemaResourcesDTO(getSchemaRegistry(), getSchemaRepository(),
+ schemaContextProviderFactory, STATE_SCHEMAS_RESOLVER);
+ final NetconfDevice device = new NetconfDeviceBuilder()
+ .setReconnectOnSchemasChange(true)
+ .setSchemaResourcesDTO(schemaResourcesDTO)
+ .setGlobalProcessingExecutor(getExecutor())
+ .setId(getId())
+ .setSalFacade(facade)
+ .build();
+ final NetconfSessionPreferences sessionCaps = getSessionCaps(true,
+ Lists.newArrayList(TEST_NAMESPACE + "?module=" + TEST_MODULE + "&revision=" + TEST_REVISION));
+ //session up, start schema resolution
+ device.onRemoteSessionUp(sessionCaps, listener);
+ //session closed
+ device.onRemoteSessionDown();
+ verify(facade, timeout(5000)).onDeviceDisconnected();
+ //complete schema setup
+ schemaFuture.set(getSchema());
+ //facade.onDeviceDisconnected() was called, so facade.onDeviceConnected() shouldn't be called anymore
+ // after(1000): give the async schema-setup pipeline time to (incorrectly) invoke the facade before
+ // asserting it never happened.
+ verify(facade, after(1000).never()).onDeviceConnected(any(), any(), any());
+ }
+
@Test
public void testNetconfDeviceAvailableCapabilitiesBuilding() throws Exception {
final RemoteDeviceHandler<NetconfSessionPreferences> facade = getFacade();
.setId(getId())
.setSalFacade(facade)
.build();
- NetconfDevice netconfSpy = Mockito.spy(device);
+ final NetconfDevice netconfSpy = Mockito.spy(device);
final NetconfSessionPreferences sessionCaps = getSessionCaps(true,
Lists.newArrayList(TEST_NAMESPACE + "?module=" + TEST_MODULE + "&revision=" + TEST_REVISION));
- Map<QName, AvailableCapability.CapabilityOrigin> moduleBasedCaps = new HashMap<>();
+ final Map<QName, AvailableCapability.CapabilityOrigin> moduleBasedCaps = new HashMap<>();
moduleBasedCaps.putAll(sessionCaps.getModuleBasedCapsOrigin());
moduleBasedCaps
.put(QName.create("(test:qname:side:loading)test"), AvailableCapability.CapabilityOrigin.UserDefined);
netconfSpy.onRemoteSessionUp(sessionCaps.replaceModuleCaps(moduleBasedCaps), listener);
- ArgumentCaptor argument = ArgumentCaptor.forClass(NetconfSessionPreferences.class);
- verify(netconfSpy, timeout(5000)).handleSalInitializationSuccess(
- any(SchemaContext.class), (NetconfSessionPreferences) argument.capture(), any(DOMRpcService.class));
- NetconfDeviceCapabilities netconfDeviceCaps =
- ((NetconfSessionPreferences) argument.getValue()).getNetconfDeviceCapabilities();
+ final ArgumentCaptor<NetconfSessionPreferences> argument =
+ ArgumentCaptor.forClass(NetconfSessionPreferences.class);
+ verify(facade, timeout(5000))
+ .onDeviceConnected(any(SchemaContext.class), argument.capture(), any(DOMRpcService.class));
+ final NetconfDeviceCapabilities netconfDeviceCaps = argument.getValue().getNetconfDeviceCapabilities();
netconfDeviceCaps.getResolvedCapabilities()
.forEach(entry -> assertEquals("Builded 'AvailableCapability' schemas should match input capabilities.",