+ }
+
+ private void registerToBaseNetconfStream(final NetconfDeviceRpc deviceRpc, final NetconfDeviceCommunicator listener) {
+ // TODO check whether the model describing create subscription is present in schema
+ // Perhaps add a default schema context to support create-subscription if the model was not provided (same as what we do for base netconf operations in transformer)
+ final CheckedFuture<DOMRpcResult, DOMRpcException> rpcResultListenableFuture =
+ deviceRpc.invokeRpc(NetconfMessageTransformUtil.toPath(NetconfMessageTransformUtil.CREATE_SUBSCRIPTION_RPC_QNAME), NetconfMessageTransformUtil.CREATE_SUBSCRIPTION_RPC_CONTENT);
+
+ final NotificationHandler.NotificationFilter filter = new NotificationHandler.NotificationFilter() {
+ @Override
+ public Optional<DOMNotification> filterNotification(final DOMNotification notification) {
+ if (isCapabilityChanged(notification)) {
+ logger.info("{}: Schemas change detected, reconnecting", id);
+ // Only disconnect is enough, the reconnecting nature of the connector will take care of reconnecting
+ listener.disconnect();
+ return Optional.absent();
+ }
+ return Optional.of(notification);
+ }
+
+ private boolean isCapabilityChanged(final DOMNotification notification) {
+ return notification.getBody().getNodeType().equals(NetconfCapabilityChange.QNAME);
+ }
+ };
+
+ Futures.addCallback(rpcResultListenableFuture, new FutureCallback<DOMRpcResult>() {
+ @Override
+ public void onSuccess(final DOMRpcResult domRpcResult) {
+ notificationHandler.addNotificationFilter(filter);
+ }
+
+ @Override
+ public void onFailure(final Throwable t) {
+ logger.warn("Unable to subscribe to base notification stream. Schemas will not be reloaded on the fly", t);
+ }
+ });
+ }