-import com.google.common.base.Function;
-import com.google.common.base.Optional;
-import com.google.common.base.Predicate;
-import com.google.common.collect.FluentIterable;
-import com.google.common.collect.Iterables;
-import com.google.common.util.concurrent.ListenableFuture;
-import io.netty.util.concurrent.EventExecutor;
-
-public class NetconfDevice implements Provider, //
- DataReader<InstanceIdentifier, CompositeNode>, //
- DataCommitHandler<InstanceIdentifier, CompositeNode>, //
- RpcImplementation, //
- AutoCloseable {
-
- InetSocketAddress socketAddress;
-
- MountProvisionInstance mountInstance;
-
- EventExecutor eventExecutor;
-
- ExecutorService processingExecutor;
-
- InstanceIdentifier path;
-
- ReconnectStrategy reconnectStrategy;
-
- AbstractCachingSchemaSourceProvider<String, InputStream> schemaSourceProvider;
-
- private NetconfDeviceSchemaContextProvider deviceContextProvider;
-
- protected Logger logger;
-
- Registration<DataReader<InstanceIdentifier, CompositeNode>> operReaderReg;
- Registration<DataReader<InstanceIdentifier, CompositeNode>> confReaderReg;
- Registration<DataCommitHandler<InstanceIdentifier, CompositeNode>> commitHandlerReg;
- List<RpcRegistration> rpcReg;
-
- String name;
-
- MountProvisionService mountService;
-
- NetconfClientDispatcher dispatcher;
-
- static InstanceIdentifier ROOT_PATH = InstanceIdentifier.builder().toInstance();
-
- SchemaSourceProvider<InputStream> remoteSourceProvider;
-
- DataBrokerService dataBroker;
-
- NetconfDeviceListener listener;
-
- public NetconfDevice(String name) {
- this.name = name;
- this.logger = LoggerFactory.getLogger(NetconfDevice.class + "#" + name);
- this.path = InstanceIdentifier.builder(INVENTORY_PATH)
- .nodeWithKey(INVENTORY_NODE, Collections.<QName, Object>singletonMap(INVENTORY_ID, name)).toInstance();
- }
-
- public void start() {
- checkState(dispatcher != null, "Dispatcher must be set.");
- checkState(schemaSourceProvider != null, "Schema Source Provider must be set.");
- checkState(eventExecutor != null, "Event executor must be set.");
-
- listener = new NetconfDeviceListener(this);
-
- logger.info("Starting NETCONF Client {} for address {}", name, socketAddress);
-
- dispatcher.createClient(socketAddress, listener, reconnectStrategy);
- }
-
- Optional<SchemaContext> getSchemaContext() {
- if (deviceContextProvider == null) {
- return Optional.absent();
- }
- return deviceContextProvider.currentContext;
- }
-
- void bringDown() {
- if (rpcReg != null) {
- for (RpcRegistration reg : rpcReg) {
- reg.close();
- }
- rpcReg = null;
- }
- closeGracefully(confReaderReg);
- confReaderReg = null;
- closeGracefully(operReaderReg);
- operReaderReg = null;
- closeGracefully(commitHandlerReg);
- commitHandlerReg = null;
-
- updateDeviceState(false, Collections.<QName> emptySet());
- }
-
- private void closeGracefully(final AutoCloseable resource) {
- if (resource != null) {
- try {
- resource.close();
- } catch (Exception e) {
- logger.warn("Ignoring exception while closing {}", resource, e);
- }
- }
- }
-
- void bringUp(SchemaSourceProvider<String> delegate, Set<QName> capabilities) {
- remoteSourceProvider = schemaSourceProvider.createInstanceFor(delegate);
- deviceContextProvider = new NetconfDeviceSchemaContextProvider(this, remoteSourceProvider);
- deviceContextProvider.createContextFromCapabilities(capabilities);
- if (mountInstance != null && getSchemaContext().isPresent()) {
- mountInstance.setSchemaContext(getSchemaContext().get());
- }
-
- updateDeviceState(true, capabilities);
-
- if (mountInstance != null) {
- confReaderReg = mountInstance.registerConfigurationReader(ROOT_PATH, this);
- operReaderReg = mountInstance.registerOperationalReader(ROOT_PATH, this);
- commitHandlerReg = mountInstance.registerCommitHandler(ROOT_PATH, this);
-
- List<RpcRegistration> rpcs = new ArrayList<>();
- // TODO same condition twice
- if (mountInstance != null && getSchemaContext().isPresent()) {
- for (RpcDefinition rpc : mountInstance.getSchemaContext().getOperations()) {
- rpcs.add(mountInstance.addRpcImplementation(rpc.getQName(), this));
- }
- }
- rpcReg = rpcs;
- }
- }
-
- private void updateDeviceState(boolean up, Set<QName> capabilities) {
- DataModificationTransaction transaction = dataBroker.beginTransaction();
-
- CompositeNodeBuilder<ImmutableCompositeNode> it = ImmutableCompositeNode.builder();
- it.setQName(INVENTORY_NODE);
- it.addLeaf(INVENTORY_ID, name);
- it.addLeaf(INVENTORY_CONNECTED, up);
-
- logger.debug("Client capabilities {}", capabilities);
- for (QName capability : capabilities) {
- it.addLeaf(NETCONF_INVENTORY_INITIAL_CAPABILITY, capability);
- }
-
- logger.debug("Update device state transaction " + transaction.getIdentifier()
- + " putting operational data started.");
- transaction.removeOperationalData(path);
- transaction.putOperationalData(path, it.toInstance());
- logger.debug("Update device state transaction " + transaction.getIdentifier()
- + " putting operational data ended.");
-
- // FIXME: this has to be asynchronous
- RpcResult<TransactionStatus> transactionStatus = null;
- try {
- transactionStatus = transaction.commit().get();
- } catch (InterruptedException e) {
- throw new RuntimeException("Interrupted while waiting for response", e);
- } catch (ExecutionException e) {
- throw new RuntimeException("Read configuration data " + path + " failed", e);
- }
- // TODO better ex handling
-
- if (transactionStatus.isSuccessful()) {
- logger.debug("Update device state transaction " + transaction.getIdentifier() + " SUCCESSFUL.");
- } else {
- logger.debug("Update device state transaction " + transaction.getIdentifier() + " FAILED!");
- logger.debug("Update device state transaction status " + transaction.getStatus());
- }
- }
-
- @Override
- public CompositeNode readConfigurationData(InstanceIdentifier path) {
- RpcResult<CompositeNode> result = null;
- try {
- result = this.invokeRpc(NETCONF_GET_CONFIG_QNAME,
- wrap(NETCONF_GET_CONFIG_QNAME, CONFIG_SOURCE_RUNNING, toFilterStructure(path))).get();
- } catch (InterruptedException e) {
- throw new RuntimeException("Interrupted while waiting for response", e);
- } catch (ExecutionException e) {
- throw new RuntimeException("Read configuration data " + path + " failed", e);
- }
-
- CompositeNode data = result.getResult().getFirstCompositeByName(NETCONF_DATA_QNAME);
- return data == null ? null : (CompositeNode) findNode(data, path);
- }
-
- @Override
- public CompositeNode readOperationalData(InstanceIdentifier path) {
- RpcResult<CompositeNode> result = null;
- try {
- result = invokeRpc(NETCONF_GET_QNAME, wrap(NETCONF_GET_QNAME, toFilterStructure(path))).get();
- } catch (InterruptedException e) {
- throw new RuntimeException("Interrupted while waiting for response", e);
- } catch (ExecutionException e) {
- throw new RuntimeException("Read configuration data " + path + " failed", e);
- }
-
- CompositeNode data = result.getResult().getFirstCompositeByName(NETCONF_DATA_QNAME);
- return (CompositeNode) findNode(data, path);
- }
-
- @Override
- public Set<QName> getSupportedRpcs() {
- return Collections.emptySet();
- }
-
- @Override
- public ListenableFuture<RpcResult<CompositeNode>> invokeRpc(QName rpc, CompositeNode input) {
- return listener.sendRequest(toRpcMessage(rpc, input, getSchemaContext()));
- }
-
- @Override
- public Collection<ProviderFunctionality> getProviderFunctionality() {
- return Collections.emptySet();
- }