import org.opendaylight.controller.sal.connect.api.RemoteDeviceCommunicator;
import org.opendaylight.controller.sal.connect.api.RemoteDeviceHandler;
import org.opendaylight.controller.sal.connect.netconf.listener.NetconfDeviceCapabilities;
+import org.opendaylight.controller.sal.connect.netconf.listener.NetconfDeviceCommunicator;
import org.opendaylight.controller.sal.connect.netconf.listener.NetconfSessionPreferences;
import org.opendaylight.controller.sal.connect.netconf.sal.NetconfDeviceRpc;
import org.opendaylight.controller.sal.connect.netconf.schema.NetconfRemoteSchemaYangSourceProvider;
+import org.opendaylight.controller.sal.connect.netconf.util.NetconfMessageTransformUtil;
import org.opendaylight.controller.sal.connect.util.RemoteDeviceId;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.netconf.node.fields.unavailable.capabilities.UnavailableCapability.FailureReason;
+import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.netconf.notifications.rev120206.NetconfCapabilityChange;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.netconf.node.fields.unavailable.capabilities.UnavailableCapability;
import org.opendaylight.yangtools.yang.common.QName;
+import org.opendaylight.yangtools.yang.common.RpcResult;
+import org.opendaylight.yangtools.yang.data.api.CompositeNode;
import org.opendaylight.yangtools.yang.model.api.SchemaContext;
import org.opendaylight.yangtools.yang.model.repo.api.MissingSchemaSourceException;
import org.opendaylight.yangtools.yang.model.repo.api.SchemaContextFactory;
/**
* This is a mediator between NetconfDeviceCommunicator and NetconfDeviceSalFacade
*/
-public final class NetconfDevice implements RemoteDevice<NetconfSessionPreferences, NetconfMessage> {
+public final class NetconfDevice implements RemoteDevice<NetconfSessionPreferences, NetconfMessage, NetconfDeviceCommunicator> {
private static final Logger logger = LoggerFactory.getLogger(NetconfDevice.class);
};
private final RemoteDeviceId id;
+ private final boolean reconnectOnSchemasChange;
private final SchemaContextFactory schemaContextFactory;
private final RemoteDeviceHandler<NetconfSessionPreferences> salFacade;
public NetconfDevice(final SchemaResourcesDTO schemaResourcesDTO, final RemoteDeviceId id, final RemoteDeviceHandler<NetconfSessionPreferences> salFacade,
final ExecutorService globalProcessingExecutor, final MessageTransformer<NetconfMessage> messageTransformer) {
+ this(schemaResourcesDTO, id, salFacade, globalProcessingExecutor, messageTransformer, false);
+ }
+
+ // FIXME reduce parameters
+ public NetconfDevice(final SchemaResourcesDTO schemaResourcesDTO, final RemoteDeviceId id, final RemoteDeviceHandler<NetconfSessionPreferences> salFacade,
+ final ExecutorService globalProcessingExecutor, final MessageTransformer<NetconfMessage> messageTransformer, final boolean reconnectOnSchemasChange) {
this.id = id;
+ this.reconnectOnSchemasChange = reconnectOnSchemasChange;
this.schemaRegistry = schemaResourcesDTO.getSchemaRegistry();
this.messageTransformer = messageTransformer;
this.schemaContextFactory = schemaResourcesDTO.getSchemaContextFactory();
@Override
public void onRemoteSessionUp(final NetconfSessionPreferences remoteSessionCapabilities,
- final RemoteDeviceCommunicator<NetconfMessage> listener) {
+ final NetconfDeviceCommunicator listener) {
// SchemaContext setup has to be performed in a dedicated thread since
// we are in a netty thread in this method
// Yang models are being downloaded in this method and it would cause a
final DeviceSourcesResolver task = new DeviceSourcesResolver(deviceRpc, remoteSessionCapabilities, id, stateSchemasResolver);
final ListenableFuture<DeviceSources> sourceResolverFuture = processingExecutor.submit(task);
+ if(shouldListenOnSchemaChange(remoteSessionCapabilities)) {
+ registerToBaseNetconfStream(deviceRpc, listener);
+ }
+
final FutureCallback<DeviceSources> resolvedSourceCallback = new FutureCallback<DeviceSources>() {
@Override
public void onSuccess(final DeviceSources result) {
}
+ private void registerToBaseNetconfStream(final NetconfDeviceRpc deviceRpc, final NetconfDeviceCommunicator listener) {
+ final ListenableFuture<RpcResult<CompositeNode>> rpcResultListenableFuture =
+ deviceRpc.invokeRpc(NetconfMessageTransformUtil.CREATE_SUBSCRIPTION_RPC_QNAME, NetconfMessageTransformUtil.CREATE_SUBSCRIPTION_RPC_CONTENT);
+
+ final NotificationHandler.NotificationFilter filter = new NotificationHandler.NotificationFilter() {
+ @Override
+ public Optional<CompositeNode> filterNotification(final CompositeNode notification) {
+ if (isCapabilityChanged(notification)) {
+ logger.info("{}: Schemas change detected, reconnecting", id);
+ // Only disconnect is enough, the reconnecting nature of the connector will take care of reconnecting
+ listener.disconnect();
+ return Optional.absent();
+ }
+ return Optional.of(notification);
+ }
+
+ private boolean isCapabilityChanged(final CompositeNode notification) {
+ return notification.getNodeType().equals(NetconfCapabilityChange.QNAME);
+ }
+ };
+
+ Futures.addCallback(rpcResultListenableFuture, new FutureCallback<RpcResult<CompositeNode>>() {
+ @Override
+ public void onSuccess(final RpcResult<CompositeNode> result) {
+ notificationHandler.addNotificationFilter(filter);
+ }
+
+ @Override
+ public void onFailure(final Throwable t) {
+ logger.warn("Unable to subscribe to base notification stream. Schemas will not be reloaded on the fly", t);
+ }
+ });
+ }
+
+ private boolean shouldListenOnSchemaChange(final NetconfSessionPreferences remoteSessionCapabilities) {
+ return remoteSessionCapabilities.isNotificationsSupported() && reconnectOnSchemasChange;
+ }
+
private void handleSalInitializationSuccess(final SchemaContext result, final NetconfSessionPreferences remoteSessionCapabilities, final NetconfDeviceRpc deviceRpc) {
updateMessageTransformer(result);
salFacade.onDeviceConnected(result, remoteSessionCapabilities, deviceRpc);
notificationHandler.onRemoteSchemaUp();
- logger.debug("{}: Initialization in sal successful", id);
logger.info("{}: Netconf connector initialized successfully", id);
}
/**
* Update initial message transformer to use retrieved schema
- * @param currentSchemaContext
*/
private void updateMessageTransformer(final SchemaContext currentSchemaContext) {
messageTransformer.onGlobalContextUpdated(currentSchemaContext);
if (t instanceof MissingSchemaSourceException) {
final SourceIdentifier missingSource = ((MissingSchemaSourceException) t).getSourceId();
logger.warn("{}: Unable to build schema context, missing source {}, will reattempt without it", id, missingSource);
- capabilities.addUnresolvedCapabilities(getQNameFromSourceIdentifiers(Sets.newHashSet(missingSource)), FailureReason.MissingSource);
+ capabilities.addUnresolvedCapabilities(getQNameFromSourceIdentifiers(Sets.newHashSet(missingSource)), UnavailableCapability.FailureReason.MissingSource);
setUpSchema(stripMissingSource(requiredSources, missingSource));
// In case resolution error, try only with resolved sources
// TODO check for infinite loop
final SchemaResolutionException resolutionException = (SchemaResolutionException) t;
final Set<SourceIdentifier> unresolvedSources = resolutionException.getUnsatisfiedImports().keySet();
- capabilities.addUnresolvedCapabilities(getQNameFromSourceIdentifiers(unresolvedSources), FailureReason.UnableToResolve);
+ capabilities.addUnresolvedCapabilities(getQNameFromSourceIdentifiers(unresolvedSources), UnavailableCapability.FailureReason.UnableToResolve);
logger.warn("{}: Unable to build schema context, unsatisfied imports {}, will reattempt with resolved only", id, resolutionException.getUnsatisfiedImports());
setUpSchema(resolutionException.getResolvedSources());
// unknown error, fail