import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.stream.Collectors;
+import javax.annotation.concurrent.GuardedBy;
import org.opendaylight.controller.md.sal.dom.api.DOMNotification;
import org.opendaylight.controller.md.sal.dom.api.DOMRpcException;
import org.opendaylight.controller.md.sal.dom.api.DOMRpcResult;
import org.slf4j.LoggerFactory;
/**
- * This is a mediator between NetconfDeviceCommunicator and NetconfDeviceSalFacade
+ * This is a mediator between NetconfDeviceCommunicator and NetconfDeviceSalFacade
*/
public class NetconfDevice implements RemoteDevice<NetconfSessionPreferences, NetconfMessage, NetconfDeviceCommunicator> {
public static final Function<QName, SourceIdentifier> QNAME_TO_SOURCE_ID_FUNCTION =
input -> RevisionSourceIdentifier.create(input.getLocalName(),
- Optional.fromNullable(input.getFormattedRevision()));
+ Optional.fromNullable(input.getFormattedRevision()));
protected final RemoteDeviceId id;
private final boolean reconnectOnSchemasChange;
private final NetconfDeviceSchemasResolver stateSchemasResolver;
private final NotificationHandler notificationHandler;
protected final List<SchemaSourceRegistration<? extends SchemaSourceRepresentation>> sourceRegistrations = Lists.newArrayList();
+ @GuardedBy("this")
+ private boolean connected = false;
// Message transformer is constructed once the schemas are available
private MessageTransformer<NetconfMessage> messageTransformer;
// Yang models are being downloaded in this method and it would cause a
// deadlock if we used the netty thread
// http://netty.io/wiki/thread-model.html
+ setConnected(true);
LOG.debug("{}: Session to remote device established with {}", id, remoteSessionCapabilities);
final NetconfDeviceRpc initRpc = getRpcForInitialization(listener, remoteSessionCapabilities.isNotificationsSupported());
}
private void registerToBaseNetconfStream(final NetconfDeviceRpc deviceRpc, final NetconfDeviceCommunicator listener) {
- // TODO check whether the model describing create subscription is present in schema
+ // TODO check whether the model describing create subscription is present in schema
// Perhaps add a default schema context to support create-subscription if the model was not provided (same as what we do for base netconf operations in transformer)
final CheckedFuture<DOMRpcResult, DOMRpcException> rpcResultListenableFuture = deviceRpc.invokeRpc(
- NetconfMessageTransformUtil.toPath(NetconfMessageTransformUtil.CREATE_SUBSCRIPTION_RPC_QNAME),
- NetconfMessageTransformUtil.CREATE_SUBSCRIPTION_RPC_CONTENT);
+ NetconfMessageTransformUtil.toPath(NetconfMessageTransformUtil.CREATE_SUBSCRIPTION_RPC_QNAME),
+ NetconfMessageTransformUtil.CREATE_SUBSCRIPTION_RPC_CONTENT);
final NotificationHandler.NotificationFilter filter = new NotificationHandler.NotificationFilter() {
@Override
return remoteSessionCapabilities.isNotificationsSupported() && reconnectOnSchemasChange;
}
- void handleSalInitializationSuccess(final SchemaContext result, final NetconfSessionPreferences remoteSessionCapabilities, final DOMRpcService deviceRpc) {
- final BaseSchema baseSchema =
- remoteSessionCapabilities.isNotificationsSupported() ?
- BaseSchema.BASE_NETCONF_CTX_WITH_NOTIFICATIONS :
- BaseSchema.BASE_NETCONF_CTX;
- messageTransformer = new NetconfMessageTransformer(result, true, baseSchema);
-
- updateTransformer(messageTransformer);
- // salFacade.onDeviceConnected has to be called before the notification handler is initialized
- salFacade.onDeviceConnected(result, remoteSessionCapabilities, deviceRpc);
- notificationHandler.onRemoteSchemaUp(messageTransformer);
-
- LOG.info("{}: Netconf connector initialized successfully", id);
+ private synchronized void handleSalInitializationSuccess(final SchemaContext result,
+ final NetconfSessionPreferences remoteSessionCapabilities,
+ final DOMRpcService deviceRpc) {
+ //NetconfDevice.SchemaSetup can complete after NetconfDeviceCommunicator was closed. In that case do nothing,
+ //since salFacade.onDeviceDisconnected was already called.
+ if (connected) {
+ final BaseSchema baseSchema =
+ remoteSessionCapabilities.isNotificationsSupported() ?
+ BaseSchema.BASE_NETCONF_CTX_WITH_NOTIFICATIONS :
+ BaseSchema.BASE_NETCONF_CTX;
+ messageTransformer = new NetconfMessageTransformer(result, true, baseSchema);
+
+ updateTransformer(messageTransformer);
+ // salFacade.onDeviceConnected has to be called before the notification handler is initialized
+ salFacade.onDeviceConnected(result, remoteSessionCapabilities, deviceRpc);
+ notificationHandler.onRemoteSchemaUp(messageTransformer);
+
+ LOG.info("{}: Netconf connector initialized successfully", id);
+ } else {
+ LOG.warn("{}: Device communicator was closed before schema setup finished.", id);
+ }
}
- void handleSalInitializationFailure(final Throwable t, final RemoteDeviceCommunicator<NetconfMessage> listener) {
+ private void handleSalInitializationFailure(final Throwable t,
+ final RemoteDeviceCommunicator<NetconfMessage> listener) {
LOG.error("{}: Initialization in sal failed, disconnecting from device", id, t);
listener.close();
onRemoteSessionDown();
messageTransformer = transformer;
}
+ private synchronized void setConnected(final boolean connected) {
+ this.connected = connected;
+ }
+
private void addProvidedSourcesToSchemaRegistry(final DeviceSources deviceSources) {
final SchemaSourceProvider<YangTextSchemaSource> yangProvider = deviceSources.getSourceProvider();
for (final SourceIdentifier sourceId : deviceSources.getProvidedSources()) {
@Override
public void onRemoteSessionDown() {
+ setConnected(false);
notificationHandler.onRemoteSchemaDown();
salFacade.onDeviceDisconnected();
@Override
public void onRemoteSessionFailed(final Throwable throwable) {
+ setConnected(false);
salFacade.onDeviceFailed(throwable);
}
private final NetconfDeviceSchemasResolver stateSchemasResolver;
DeviceSourcesResolver(final NetconfDeviceRpc deviceRpc, final NetconfSessionPreferences remoteSessionCapabilities,
- final RemoteDeviceId id, final NetconfDeviceSchemasResolver stateSchemasResolver) {
+ final RemoteDeviceId id, final NetconfDeviceSchemasResolver stateSchemasResolver) {
this.deviceRpc = deviceRpc;
this.remoteSessionCapabilities = remoteSessionCapabilities;
this.id = id;
}
final SchemaSourceProvider<YangTextSchemaSource> sourceProvider;
- if(availableSchemas instanceof LibraryModulesSchemas) {
+ if (availableSchemas instanceof LibraryModulesSchemas) {
sourceProvider = new YangLibrarySchemaYangSourceProvider(id,
((LibraryModulesSchemas) availableSchemas).getAvailableModels());
} else {
private Collection<SourceIdentifier> filterMissingSources(final Collection<SourceIdentifier> requiredSources) {
return requiredSources.parallelStream().filter(sourceIdentifier -> {
- boolean remove = false;
- try {
- schemaRepository.getSchemaSource(sourceIdentifier, ASTSchemaSource.class).checkedGet();
- } catch (SchemaSourceException e) {
- remove = true;
- }
- return remove;
- }).collect(Collectors.toList());
+ boolean remove = false;
+ try {
+ schemaRepository.getSchemaSource(sourceIdentifier, ASTSchemaSource.class).checkedGet();
+ } catch (SchemaSourceException e) {
+ remove = true;
+ }
+ return remove;
+ }).collect(Collectors.toList());
}
/**
handleSalInitializationSuccess(result, remoteSessionCapabilities, getDeviceSpecificRpc(result));
return;
} catch (final Throwable t) {
- if (t instanceof MissingSchemaSourceException){
+ if (t instanceof MissingSchemaSourceException) {
requiredSources = handleMissingSchemaSourceException(requiredSources, (MissingSchemaSourceException) t);
} else if (t instanceof SchemaResolutionException) {
// schemaBuilderFuture.checkedGet() throws only SchemaResolutionException
return qname;
}
}
- LOG.warn("Unable to map identifier to a devices reported capability: {} Available: {}",identifier, deviceSources.getRequiredSourcesQName());
+ LOG.warn("Unable to map identifier to a devices reported capability: {} Available: {}", identifier, deviceSources.getRequiredSourcesQName());
// return null since we cannot find the QName, this capability will be removed from required sources and not reported as unresolved-capability
return null;
}
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyCollectionOf;
import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.after;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.doReturn;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.SettableFuture;
import java.io.InputStream;
import java.net.InetSocketAddress;
import java.util.ArrayList;
}
try {
notification = new NetconfMessage(XmlUtil.readXmlToDocument(NetconfDeviceTest.class.getResourceAsStream("/notification-payload.xml")));
- } catch (Exception e) {
+ } catch (final Exception e) {
throw new ExceptionInInitializerError(e);
}
}
public void testNotificationBeforeSchema() throws Exception {
final RemoteDeviceHandler<NetconfSessionPreferences> facade = getFacade();
final NetconfDeviceCommunicator listener = getListener();
-
+ final SchemaContextFactory schemaContextProviderFactory = mock(SchemaContextFactory.class);
+ final SettableFuture<SchemaContext> schemaFuture = SettableFuture.create();
+ doReturn(Futures.makeChecked(schemaFuture, e -> new SchemaResolutionException("fail")))
+ .when(schemaContextProviderFactory).createSchemaContext(any(Collection.class));
final NetconfDevice.SchemaResourcesDTO schemaResourcesDTO
- = new NetconfDevice.SchemaResourcesDTO(getSchemaRegistry(), getSchemaRepository(), getSchemaFactory(), stateSchemasResolver);
+ = new NetconfDevice.SchemaResourcesDTO(getSchemaRegistry(), getSchemaRepository(), schemaContextProviderFactory, stateSchemasResolver);
final NetconfDevice device = new NetconfDeviceBuilder()
.setReconnectOnSchemasChange(true)
.setSchemaResourcesDTO(schemaResourcesDTO)
.setSalFacade(facade)
.build();
+ final NetconfSessionPreferences sessionCaps = getSessionCaps(true,
+ Lists.newArrayList(TEST_CAPABILITY));
+ device.onRemoteSessionUp(sessionCaps, listener);
device.onNotification(notification);
device.onNotification(notification);
verify(facade, times(0)).onNotification(any(DOMNotification.class));
-
- final NetconfSessionPreferences sessionCaps = getSessionCaps(true,
- Lists.newArrayList(TEST_CAPABILITY));
-
- final DOMRpcService deviceRpc = mock(DOMRpcService.class);
-
- device.handleSalInitializationSuccess(NetconfToNotificationTest.getNotificationSchemaContext(getClass(), false), sessionCaps, deviceRpc);
-
+ schemaFuture.set(NetconfToNotificationTest.getNotificationSchemaContext(getClass(), false));
verify(facade, timeout(10000).times(2)).onNotification(any(DOMNotification.class));
device.onNotification(notification);
verify(facade, timeout(5000).times(2)).onDeviceConnected(any(SchemaContext.class), any(NetconfSessionPreferences.class), any(DOMRpcService.class));
}
+ @Test
+ public void testNetconfDeviceDisconnectListenerCallCancellation() throws Exception {
+ final RemoteDeviceHandler<NetconfSessionPreferences> facade = getFacade();
+ final NetconfDeviceCommunicator listener = getListener();
+ final SchemaContextFactory schemaContextProviderFactory = mock(SchemaContextFactory.class);
+ final SettableFuture<SchemaContext> schemaFuture = SettableFuture.create();
+ doReturn(Futures.makeChecked(schemaFuture, e -> new SchemaResolutionException("fail")))
+ .when(schemaContextProviderFactory).createSchemaContext(any(Collection.class));
+ final NetconfDevice.SchemaResourcesDTO schemaResourcesDTO
+ = new NetconfDevice.SchemaResourcesDTO(getSchemaRegistry(), getSchemaRepository(),
+ schemaContextProviderFactory, stateSchemasResolver);
+ final NetconfDevice device = new NetconfDeviceBuilder()
+ .setReconnectOnSchemasChange(true)
+ .setSchemaResourcesDTO(schemaResourcesDTO)
+ .setGlobalProcessingExecutor(getExecutor())
+ .setId(getId())
+ .setSalFacade(facade)
+ .build();
+ final NetconfSessionPreferences sessionCaps = getSessionCaps(true,
+ Lists.newArrayList(TEST_NAMESPACE + "?module=" + TEST_MODULE + "&revision=" + TEST_REVISION));
+ //session up, start schema resolution
+ device.onRemoteSessionUp(sessionCaps, listener);
+ //session closed
+ device.onRemoteSessionDown();
+ verify(facade, timeout(5000)).onDeviceDisconnected();
+ //complete schema setup
+ schemaFuture.set(getSchema());
+ //facade.onDeviceDisconnected() was called, so facade.onDeviceConnected() shouldn't be called anymore
+ verify(facade, after(1000).never()).onDeviceConnected(any(), any(), any());
+ }
+
@Test
public void testNetconfDeviceAvailableCapabilitiesBuilding() throws Exception {
final RemoteDeviceHandler<NetconfSessionPreferences> facade = getFacade();
.setId(getId())
.setSalFacade(facade)
.build();
- NetconfDevice netconfSpy = Mockito.spy(device);
+ final NetconfDevice netconfSpy = Mockito.spy(device);
final NetconfSessionPreferences sessionCaps = getSessionCaps(true,
Lists.newArrayList(TEST_NAMESPACE + "?module=" + TEST_MODULE + "&revision=" + TEST_REVISION));
- Map<QName, AvailableCapability.CapabilityOrigin> moduleBasedCaps = new HashMap<>();
+ final Map<QName, AvailableCapability.CapabilityOrigin> moduleBasedCaps = new HashMap<>();
moduleBasedCaps.putAll(sessionCaps.getModuleBasedCapsOrigin());
moduleBasedCaps.put(QName.create("(test:qname:side:loading)test"), AvailableCapability.CapabilityOrigin.UserDefined);
netconfSpy.onRemoteSessionUp(sessionCaps.replaceModuleCaps(moduleBasedCaps), listener);
- ArgumentCaptor argument = ArgumentCaptor.forClass(NetconfSessionPreferences.class);
- verify(netconfSpy, timeout(5000)).handleSalInitializationSuccess(any(SchemaContext.class), (NetconfSessionPreferences) argument.capture(), any(DOMRpcService.class));
- NetconfDeviceCapabilities netconfDeviceCaps = ((NetconfSessionPreferences) argument.getValue()).getNetconfDeviceCapabilities();
+ final ArgumentCaptor argument = ArgumentCaptor.forClass(NetconfSessionPreferences.class);
+ verify(facade, timeout(5000)).onDeviceConnected(any(SchemaContext.class), (NetconfSessionPreferences) argument.capture(), any(DOMRpcService.class));
+ final NetconfDeviceCapabilities netconfDeviceCaps = ((NetconfSessionPreferences) argument.getValue()).getNetconfDeviceCapabilities();
netconfDeviceCaps.getResolvedCapabilities().forEach(entry -> assertEquals("Builded 'AvailableCapability' schemas should match input capabilities.",
moduleBasedCaps.get(QName.create(entry.getCapability())).getName(), entry.getCapabilityOrigin().getName()));