*/
package org.opendaylight.netconf.sal.connect.netconf;
-import com.google.common.base.Function;
import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
import com.google.common.base.Predicates;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
+import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.LinkedList;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.stream.Collectors;
+import javax.annotation.concurrent.GuardedBy;
import org.opendaylight.controller.md.sal.dom.api.DOMNotification;
import org.opendaylight.controller.md.sal.dom.api.DOMRpcException;
import org.opendaylight.controller.md.sal.dom.api.DOMRpcResult;
private static final Logger LOG = LoggerFactory.getLogger(NetconfDevice.class);
- public static final Function<QName, SourceIdentifier> QNAME_TO_SOURCE_ID_FUNCTION =
- input -> RevisionSourceIdentifier.create(input.getLocalName(),
- Optional.fromNullable(input.getFormattedRevision()));
-
protected final RemoteDeviceId id;
private final boolean reconnectOnSchemasChange;
private final NetconfDeviceSchemasResolver stateSchemasResolver;
private final NotificationHandler notificationHandler;
protected final List<SchemaSourceRegistration<? extends SchemaSourceRepresentation>> sourceRegistrations =
- Lists.newArrayList();
+ new ArrayList<>();
+ @GuardedBy("this")
+ private boolean connected = false;
// Message transformer is constructed once the schemas are available
private MessageTransformer<NetconfMessage> messageTransformer;
// Yang models are being downloaded in this method and it would cause a
// deadlock if we used the netty thread
// http://netty.io/wiki/thread-model.html
+ setConnected(true);
LOG.debug("{}: Session to remote device established with {}", id, remoteSessionCapabilities);
final NetconfDeviceRpc initRpc =
}
};
- Futures.addCallback(sourceResolverFuture, resolvedSourceCallback);
+ Futures.addCallback(sourceResolverFuture, resolvedSourceCallback, MoreExecutors.directExecutor());
}
private void registerToBaseNetconfStream(final NetconfDeviceRpc deviceRpc,
final NetconfDeviceCommunicator listener) {
- // TODO check whether the model describing create subscription is present in schema
+ // TODO check whether the model describing create subscription is present in schema
// Perhaps add a default schema context to support create-subscription if the model was not provided
// (same as what we do for base netconf operations in transformer)
final CheckedFuture<DOMRpcResult, DOMRpcException> rpcResultListenableFuture = deviceRpc.invokeRpc(
- NetconfMessageTransformUtil.toPath(NetconfMessageTransformUtil.CREATE_SUBSCRIPTION_RPC_QNAME),
- NetconfMessageTransformUtil.CREATE_SUBSCRIPTION_RPC_CONTENT);
+ NetconfMessageTransformUtil.toPath(NetconfMessageTransformUtil.CREATE_SUBSCRIPTION_RPC_QNAME),
+ NetconfMessageTransformUtil.CREATE_SUBSCRIPTION_RPC_CONTENT);
final NotificationHandler.NotificationFilter filter = new NotificationHandler.NotificationFilter() {
@Override
LOG.warn("Unable to subscribe to base notification stream. Schemas will not be reloaded on the fly",
throwable);
}
- });
+ }, MoreExecutors.directExecutor());
}
private boolean shouldListenOnSchemaChange(final NetconfSessionPreferences remoteSessionCapabilities) {
return remoteSessionCapabilities.isNotificationsSupported() && reconnectOnSchemasChange;
}
- void handleSalInitializationSuccess(final SchemaContext result,
+ private synchronized void handleSalInitializationSuccess(final SchemaContext result,
final NetconfSessionPreferences remoteSessionCapabilities,
final DOMRpcService deviceRpc) {
- final BaseSchema baseSchema =
+ //NetconfDevice.SchemaSetup can complete after NetconfDeviceCommunicator was closed. In that case do nothing,
+ //since salFacade.onDeviceDisconnected was already called.
+ if (connected) {
+ final BaseSchema baseSchema =
remoteSessionCapabilities.isNotificationsSupported()
? BaseSchema.BASE_NETCONF_CTX_WITH_NOTIFICATIONS : BaseSchema.BASE_NETCONF_CTX;
- messageTransformer = new NetconfMessageTransformer(result, true, baseSchema);
+ messageTransformer = new NetconfMessageTransformer(result, true, baseSchema);
- updateTransformer(messageTransformer);
- // salFacade.onDeviceConnected has to be called before the notification handler is initialized
- salFacade.onDeviceConnected(result, remoteSessionCapabilities, deviceRpc);
- notificationHandler.onRemoteSchemaUp(messageTransformer);
+ updateTransformer(messageTransformer);
+ // salFacade.onDeviceConnected has to be called before the notification handler is initialized
+ salFacade.onDeviceConnected(result, remoteSessionCapabilities, deviceRpc);
+ notificationHandler.onRemoteSchemaUp(messageTransformer);
- LOG.info("{}: Netconf connector initialized successfully", id);
+ LOG.info("{}: Netconf connector initialized successfully", id);
+ } else {
+ LOG.warn("{}: Device communicator was closed before schema setup finished.", id);
+ }
}
- void handleSalInitializationFailure(final Throwable throwable,
- final RemoteDeviceCommunicator<NetconfMessage> listener) {
+ private void handleSalInitializationFailure(final Throwable throwable,
+ final RemoteDeviceCommunicator<NetconfMessage> listener) {
LOG.error("{}: Initialization in sal failed, disconnecting from device", id, throwable);
listener.close();
onRemoteSessionDown();
messageTransformer = transformer;
}
+ private synchronized void setConnected(final boolean connected) {
+ this.connected = connected;
+ }
+
private void addProvidedSourcesToSchemaRegistry(final DeviceSources deviceSources) {
final SchemaSourceProvider<YangTextSchemaSource> yangProvider = deviceSources.getSourceProvider();
for (final SourceIdentifier sourceId : deviceSources.getProvidedSources()) {
@Override
public void onRemoteSessionDown() {
+ setConnected(false);
notificationHandler.onRemoteSchemaDown();
salFacade.onDeviceDisconnected();
@Override
public void onRemoteSessionFailed(final Throwable throwable) {
+ setConnected(false);
salFacade.onDeviceFailed(throwable);
}
}
public Collection<SourceIdentifier> getRequiredSources() {
- return Collections2.transform(requiredSources, QNAME_TO_SOURCE_ID_FUNCTION);
+ return Collections2.transform(requiredSources, DeviceSources::toSourceId);
}
public Collection<SourceIdentifier> getProvidedSources() {
- return Collections2.transform(providedSources, QNAME_TO_SOURCE_ID_FUNCTION);
+ return Collections2.transform(providedSources, DeviceSources::toSourceId);
}
public SchemaSourceProvider<YangTextSchemaSource> getSourceProvider() {
return sourceProvider;
}
+
+ private static SourceIdentifier toSourceId(final QName input) {
+ return RevisionSourceIdentifier.create(input.getLocalName(),
+ Optional.fromNullable(input.getFormattedRevision()));
+ }
}
/**
capabilities.addNonModuleBasedCapabilities(remoteSessionCapabilities
.getNonModuleCaps().stream().map(entry -> new AvailableCapabilityBuilder()
- .setCapability(entry).setCapabilityOrigin(
+ .setCapability(entry).setCapabilityOrigin(
remoteSessionCapabilities.getNonModuleBasedCapsOrigin().get(entry)).build())
.collect(Collectors.toList()));
handleSalInitializationSuccess(result, remoteSessionCapabilities, getDeviceSpecificRpc(result));
return;
- } catch (final Throwable t) {
- if (t instanceof MissingSchemaSourceException) {
- requiredSources =
- handleMissingSchemaSourceException(requiredSources, (MissingSchemaSourceException) t);
- } else if (t instanceof SchemaResolutionException) {
- // schemaBuilderFuture.checkedGet() throws only SchemaResolutionException
- // that might be wrapping a MissingSchemaSourceException so we need to look
- // at the cause of the exception to make sure we don't misinterpret it.
- if (t.getCause() instanceof MissingSchemaSourceException) {
- requiredSources = handleMissingSchemaSourceException(
- requiredSources, (MissingSchemaSourceException) t.getCause());
- continue;
- }
- requiredSources =
- handleSchemaResolutionException(requiredSources, (SchemaResolutionException) t);
- } else {
- // unknown error, fail
- handleSalInitializationFailure(t, listener);
- return;
+ } catch (final SchemaResolutionException e) {
+ // schemaBuilderFuture.checkedGet() throws only SchemaResolutionException
+ // that might be wrapping a MissingSchemaSourceException so we need to look
+ // at the cause of the exception to make sure we don't misinterpret it.
+ if (e.getCause() instanceof MissingSchemaSourceException) {
+ requiredSources = handleMissingSchemaSourceException(
+ requiredSources, (MissingSchemaSourceException) e.getCause());
+ continue;
}
+ requiredSources = handleSchemaResolutionException(requiredSources, e);
+ } catch (final Exception e) {
+ // unknown error, fail
+ handleSalInitializationFailure(e, listener);
+ return;
}
}
// No more sources, fail