import java.io.InputStream
import java.net.InetSocketAddress
import java.net.URI
+import java.util.ArrayList
+import java.util.Collection
import java.util.Collections
import java.util.List
import java.util.Set
import java.util.concurrent.ExecutorService
-import java.util.concurrent.Future
import org.opendaylight.controller.md.sal.common.api.data.DataCommitHandler
import org.opendaylight.controller.md.sal.common.api.data.DataModification
import org.opendaylight.controller.md.sal.common.api.data.DataReader
-import org.opendaylight.controller.netconf.api.NetconfMessage
-import org.opendaylight.controller.netconf.client.NetconfClient
import org.opendaylight.controller.netconf.client.NetconfClientDispatcher
-import org.opendaylight.controller.netconf.util.xml.XmlUtil
import org.opendaylight.controller.sal.core.api.Broker.ProviderSession
+import org.opendaylight.controller.sal.core.api.Broker.RpcRegistration
import org.opendaylight.controller.sal.core.api.Provider
import org.opendaylight.controller.sal.core.api.RpcImplementation
import org.opendaylight.controller.sal.core.api.data.DataBrokerService
import org.opendaylight.yangtools.yang.model.api.SchemaContext
import org.opendaylight.yangtools.yang.model.util.repo.AbstractCachingSchemaSourceProvider
import org.opendaylight.yangtools.yang.model.util.repo.SchemaSourceProvider
-import org.opendaylight.yangtools.yang.model.util.repo.SchemaSourceProviders
import org.opendaylight.yangtools.yang.parser.impl.YangParserImpl
import org.opendaylight.yangtools.yang.parser.impl.util.YangSourceContext
import org.slf4j.Logger
import static org.opendaylight.controller.sal.connect.netconf.InventoryUtils.*
import static extension org.opendaylight.controller.sal.connect.netconf.NetconfMapping.*
-import com.google.common.util.concurrent.Futures
-class NetconfDevice implements Provider, //
+class NetconfDevice implements Provider, //
DataReader<InstanceIdentifier, CompositeNode>, //
DataCommitHandler<InstanceIdentifier, CompositeNode>, //
RpcImplementation, //
AutoCloseable {
- var NetconfClient client;
-
@Property
var InetSocketAddress socketAddress;
Registration<DataReader<InstanceIdentifier, CompositeNode>> operReaderReg
Registration<DataReader<InstanceIdentifier, CompositeNode>> confReaderReg
Registration<DataCommitHandler<InstanceIdentifier, CompositeNode>> commitHandlerReg
+ List<RpcRegistration> rpcReg
+ @Property
val String name
- MountProvisionService mountService
-
- int messegeRetryCount = 5;
-
- int messageTimeoutCount = 5 * 1000;
- Set<QName> cachedCapabilities
+ MountProvisionService mountService
@Property
var NetconfClientDispatcher dispatcher
@Property
var SchemaSourceProvider<InputStream> remoteSourceProvider
-
+
DataBrokerService dataBroker
+ var NetconfDeviceListener listener;
+
public new(String name) {
- this.name = name;
+ this._name = name;
this.logger = LoggerFactory.getLogger(NetconfDevice.name + "#" + name);
this.path = InstanceIdentifier.builder(INVENTORY_PATH).nodeWithKey(INVENTORY_NODE,
Collections.singletonMap(INVENTORY_ID, name)).toInstance;
checkState(schemaSourceProvider != null, "Schema Source Provider must be set.")
checkState(eventExecutor != null, "Event executor must be set.");
- val listener = new NetconfDeviceListener(this);
- val task = startClientTask(dispatcher, listener)
- return processingExecutor.submit(task) as Future<Void>;
+ listener = new NetconfDeviceListener(this);
+ logger.info("Starting NETCONF Client {} for address {}", name, socketAddress);
+
+ dispatcher.createClient(socketAddress, listener, reconnectStrategy);
}
def Optional<SchemaContext> getSchemaContext() {
return deviceContextProvider.currentContext;
}
- private def Runnable startClientTask(NetconfClientDispatcher dispatcher, NetconfDeviceListener listener) {
- return [ |
- try {
- logger.info("Starting Netconf Client on: {}", socketAddress);
- client = NetconfClient.clientFor(name, socketAddress, reconnectStrategy, dispatcher, listener);
- logger.debug("Initial capabilities {}", initialCapabilities);
- var SchemaSourceProvider<String> delegate;
- if (NetconfRemoteSchemaSourceProvider.isSupportedFor(initialCapabilities)) {
- delegate = new NetconfRemoteSchemaSourceProvider(this);
- } else if(client.capabilities.contains(NetconfRemoteSchemaSourceProvider.IETF_NETCONF_MONITORING.namespace.toString)) {
- delegate = new NetconfRemoteSchemaSourceProvider(this);
- } else {
- logger.info("Netconf server {} does not support IETF Netconf Monitoring", socketAddress);
- delegate = SchemaSourceProviders.<String>noopProvider();
- }
- remoteSourceProvider = schemaSourceProvider.createInstanceFor(delegate);
- deviceContextProvider = new NetconfDeviceSchemaContextProvider(this, remoteSourceProvider);
- deviceContextProvider.createContextFromCapabilities(initialCapabilities);
- if (mountInstance != null && schemaContext.isPresent) {
- mountInstance.schemaContext = schemaContext.get();
- val operations = schemaContext.get().operations;
- for (rpc : operations) {
- mountInstance.addRpcImplementation(rpc.QName, this);
- }
- }
- updateDeviceState()
- if (mountInstance != null && confReaderReg == null && operReaderReg == null) {
- confReaderReg = mountInstance.registerConfigurationReader(ROOT_PATH, this);
- operReaderReg = mountInstance.registerOperationalReader(ROOT_PATH, this);
- commitHandlerReg = mountInstance.registerCommitHandler(ROOT_PATH, this);
- }
- } catch (Exception e) {
- logger.error("Netconf client NOT started. ", e)
+ def bringDown() {
+ if (rpcReg != null) {
+ for (reg : rpcReg) {
+ reg.close()
+ }
+ rpcReg = null
+ }
+ confReaderReg?.close()
+ confReaderReg = null
+ operReaderReg?.close()
+ operReaderReg = null
+ commitHandlerReg?.close()
+ commitHandlerReg = null
+
+ updateDeviceState(false, Collections.emptySet())
+ }
+
+ def bringUp(SchemaSourceProvider<String> delegate, Set<QName> capabilities) {
+ remoteSourceProvider = schemaSourceProvider.createInstanceFor(delegate);
+ deviceContextProvider = new NetconfDeviceSchemaContextProvider(this, remoteSourceProvider);
+ deviceContextProvider.createContextFromCapabilities(capabilities);
+ if (mountInstance != null && schemaContext.isPresent) {
+ mountInstance.schemaContext = schemaContext.get();
+ }
+
+ updateDeviceState(true, capabilities)
+
+ if (mountInstance != null) {
+ confReaderReg = mountInstance.registerConfigurationReader(ROOT_PATH, this);
+ operReaderReg = mountInstance.registerOperationalReader(ROOT_PATH, this);
+ commitHandlerReg = mountInstance.registerCommitHandler(ROOT_PATH, this);
+
+ val rpcs = new ArrayList<RpcRegistration>();
+ for (rpc : mountInstance.schemaContext.operations) {
+ rpcs.add(mountInstance.addRpcImplementation(rpc.QName, this));
}
- ]
+ rpcReg = rpcs
+ }
}
- private def updateDeviceState() {
+ private def updateDeviceState(boolean up, Set<QName> capabilities) {
val transaction = dataBroker.beginTransaction
val it = ImmutableCompositeNode.builder
setQName(INVENTORY_NODE)
addLeaf(INVENTORY_ID, name)
- addLeaf(INVENTORY_CONNECTED, client.clientSession.up)
+ addLeaf(INVENTORY_CONNECTED, up)
- logger.debug("Client capabilities {}", client.capabilities)
- for (capability : client.capabilities) {
+ logger.debug("Client capabilities {}", capabilities)
+ for (capability : capabilities) {
addLeaf(NETCONF_INVENTORY_INITIAL_CAPABILITY, capability)
}
logger.debug("Update device state transaction " + transaction.identifier + " putting operational data started.")
+ transaction.removeOperationalData(path)
transaction.putOperationalData(path, it.toInstance)
logger.debug("Update device state transaction " + transaction.identifier + " putting operational data ended.")
+
+ // FIXME: this has to be asynchronous
val transactionStatus = transaction.commit.get;
if (transactionStatus.successful) {
Collections.emptySet;
}
-// def createSubscription(String streamName) {
-// val it = ImmutableCompositeNode.builder()
-// QName = NETCONF_CREATE_SUBSCRIPTION_QNAME
-// addLeaf("stream", streamName);
-// invokeRpc(QName, toInstance())
-// }
-
override invokeRpc(QName rpc, CompositeNode input) {
- try {
- val message = rpc.toRpcMessage(input,schemaContext);
- val result = sendMessageImpl(message, messegeRetryCount, messageTimeoutCount);
- return Futures.immediateFuture(result.toRpcResult(rpc, schemaContext));
- } catch (Exception e) {
- logger.error("Rpc was not processed correctly.", e)
- throw e;
- }
- }
-
- def NetconfMessage sendMessageImpl(NetconfMessage message, int retryCount, int timeout) {
- logger.debug("Send message {}",XmlUtil.toString(message.document))
- val result = client.sendMessage(message, retryCount, timeout);
- NetconfMapping.checkValidReply(message, result)
- return result;
+ return listener.sendRequest(rpc.toRpcMessage(input,schemaContext));
}
override getProviderFunctionality() {
return null;
} else if (current instanceof CompositeNode) {
val currentComposite = (current as CompositeNode);
-
+
current = currentComposite.getFirstCompositeByName(arg.nodeType);
if(current == null) {
current = currentComposite.getFirstCompositeByName(arg.nodeType.withoutRevision());
}
override requestCommit(DataModification<InstanceIdentifier, CompositeNode> modification) {
- val twoPhaseCommit = new NetconfDeviceTwoPhaseCommitTransaction(this, modification);
+ val twoPhaseCommit = new NetconfDeviceTwoPhaseCommitTransaction(this, modification, true);
twoPhaseCommit.prepare()
return twoPhaseCommit;
}
- def getInitialCapabilities() {
- val capabilities = client?.capabilities;
- if (capabilities == null) {
- return null;
- }
- if (cachedCapabilities == null) {
- cachedCapabilities = FluentIterable.from(capabilities).filter[
+ def getCapabilities(Collection<String> capabilities) {
+ return FluentIterable.from(capabilities).filter[
contains("?") && contains("module=") && contains("revision=")].transform [
val parts = split("\\?");
val namespace = parts.get(0);
}
return QName.create(namespace, revision, moduleName);
].toSet();
- }
- return cachedCapabilities;
}
override close() {
- confReaderReg?.close()
- operReaderReg?.close()
- client?.close()
+ bringDown()
}
}
*/
package org.opendaylight.controller.sal.connect.netconf;
+import io.netty.util.concurrent.Future;
+import io.netty.util.concurrent.FutureListener;
+
+import java.util.ArrayDeque;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.Queue;
+import java.util.Set;
+
import org.opendaylight.controller.netconf.api.NetconfMessage;
-import org.opendaylight.controller.netconf.client.AbstractNetconfClientNotifySessionListener;
+import org.opendaylight.controller.netconf.api.NetconfTerminationReason;
import org.opendaylight.controller.netconf.client.NetconfClientSession;
+import org.opendaylight.controller.netconf.client.NetconfClientSessionListener;
+import org.opendaylight.controller.netconf.util.xml.XmlElement;
+import org.opendaylight.controller.netconf.util.xml.XmlNetconfConstants;
+import org.opendaylight.controller.sal.common.util.Rpcs;
import org.opendaylight.controller.sal.core.api.mount.MountProvisionInstance;
+import org.opendaylight.yangtools.yang.common.QName;
+import org.opendaylight.yangtools.yang.common.RpcError;
+import org.opendaylight.yangtools.yang.common.RpcResult;
import org.opendaylight.yangtools.yang.data.api.CompositeNode;
+import org.opendaylight.yangtools.yang.model.util.repo.SchemaSourceProvider;
+import org.opendaylight.yangtools.yang.model.util.repo.SchemaSourceProviders;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import com.google.common.base.Preconditions;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+
+class NetconfDeviceListener implements NetconfClientSessionListener {
+ private static final class Request {
+ final UncancellableFuture<RpcResult<CompositeNode>> future;
+ final NetconfMessage request;
+
+ private Request(UncancellableFuture<RpcResult<CompositeNode>> future, NetconfMessage request) {
+ this.future = future;
+ this.request = request;
+ }
+ }
-class NetconfDeviceListener extends AbstractNetconfClientNotifySessionListener {
+ private static final Logger LOG = LoggerFactory.getLogger(NetconfDeviceListener.class);
+ private final Queue<Request> requests = new ArrayDeque<>();
private final NetconfDevice device;
+ private NetconfClientSession session;
public NetconfDeviceListener(final NetconfDevice device) {
this.device = Preconditions.checkNotNull(device);
}
- /**
- * Method intended to customize notification processing.
- *
- * @param session
- * {@see
- * NetconfClientSessionListener#onMessage(NetconfClientSession,
- * NetconfMessage)}
- * @param message
- * {@see
- * NetconfClientSessionListener#onMessage(NetconfClientSession,
- * NetconfMessage)}
- */
@Override
- public void onNotification(final NetconfClientSession session, final NetconfMessage message) {
- this.device.logger.debug("Received NETCONF notification.", message);
- CompositeNode domNotification = null;
- if (message != null) {
- domNotification = NetconfMapping.toNotificationNode(message, device.getSchemaContext());
- }
- if (domNotification != null) {
- MountProvisionInstance _mountInstance = null;
- if (this.device != null) {
- _mountInstance = this.device.getMountInstance();
+ public synchronized void onSessionUp(final NetconfClientSession session) {
+ LOG.debug("Session with {} established as address {} session-id {}",
+ device.getName(), device.getSocketAddress(), session.getSessionId());
+
+ final Set<QName> caps = device.getCapabilities(session.getServerCapabilities());
+ LOG.trace("Server {} advertized capabilities {}", device.getName(), caps);
+
+ // Select the appropriate provider
+ final SchemaSourceProvider<String> delegate;
+ if (NetconfRemoteSchemaSourceProvider.isSupportedFor(caps)) {
+ delegate = new NetconfRemoteSchemaSourceProvider(device);
+ } else if(caps.contains(NetconfRemoteSchemaSourceProvider.IETF_NETCONF_MONITORING.getNamespace().toString())) {
+ delegate = new NetconfRemoteSchemaSourceProvider(device);
+ } else {
+ LOG.info("Netconf server {} does not support IETF Netconf Monitoring", device.getName());
+ delegate = SchemaSourceProviders.<String>noopProvider();
+ }
+
+ device.bringUp(delegate, caps);
+
+ this.session = session;
+ }
+
+ private synchronized void tearDown(final Exception e) {
+ session = null;
+
+ /*
+ * Walk all requests, check if they have been executing
+ * or cancelled and remove them from the queue.
+ */
+ final Iterator<Request> it = requests.iterator();
+ while (it.hasNext()) {
+ final Request r = it.next();
+ if (r.future.isUncancellable()) {
+ // FIXME: add a RpcResult instead?
+ r.future.setException(e);
+ it.remove();
+ } else if (r.future.isCancelled()) {
+ // This just does some house-cleaning
+ it.remove();
}
- if (_mountInstance != null) {
- _mountInstance.publish(domNotification);
+ }
+
+ device.bringDown();
+ }
+
+ @Override
+ public void onSessionDown(final NetconfClientSession session, final Exception e) {
+ LOG.debug("Session with {} went down", device.getName(), e);
+ tearDown(e);
+ }
+
+ @Override
+ public void onSessionTerminated(final NetconfClientSession session, final NetconfTerminationReason reason) {
+ LOG.debug("Session with {} terminated {}", session, reason);
+ tearDown(new RuntimeException(reason.getErrorMessage()));
+ }
+
+ @Override
+ public void onMessage(final NetconfClientSession session, final NetconfMessage message) {
+ /*
+ * Dispatch between notifications and messages. Messages need to be processed
+ * with lock held, notifications do not.
+ */
+ if (isNotification(message)) {
+ processNotification(message);
+ } else {
+ processMessage(message);
+ }
+ }
+
+ private synchronized void processMessage(final NetconfMessage message) {
+ final Request r = requests.peek();
+ if (r.future.isUncancellable()) {
+ requests.poll();
+ LOG.debug("Matched {} to {}", r.request, message);
+
+ // FIXME: this can throw exceptions, which should result
+ // in the future failing
+ NetconfMapping.checkValidReply(r.request, message);
+ r.future.set(Rpcs.getRpcResult(true, NetconfMapping.toNotificationNode(message, device.getSchemaContext()),
+ Collections.<RpcError>emptyList()));
+ } else {
+ LOG.warn("Ignoring unsolicited message", message);
+ }
+ }
+
+ synchronized ListenableFuture<RpcResult<CompositeNode>> sendRequest(final NetconfMessage message) {
+ if (session == null) {
+ LOG.debug("Session to {} is disconnected, failing RPC request {}", device.getName(), message);
+ return Futures.<RpcResult<CompositeNode>>immediateFuture(new RpcResult<CompositeNode>() {
+ @Override
+ public boolean isSuccessful() {
+ return false;
+ }
+
+ @Override
+ public CompositeNode getResult() {
+ return null;
+ }
+
+ @Override
+ public Collection<RpcError> getErrors() {
+ // FIXME: indicate that the session is down
+ return Collections.emptySet();
+ }
+ });
+ }
+
+ final Request req = new Request(new UncancellableFuture<RpcResult<CompositeNode>>(true), message);
+ requests.add(req);
+
+ session.sendMessage(req.request).addListener(new FutureListener<Void>() {
+ @Override
+ public void operationComplete(final Future<Void> future) throws Exception {
+ if (!future.isSuccess()) {
+ // We expect that a session down will occur at this point
+ LOG.debug("Failed to send request {}", req.request, future.cause());
+ req.future.setException(future.cause());
+ } else {
+ LOG.trace("Finished sending request {}", req.request);
+ }
}
+ });
+
+ return req.future;
+ }
+
+ /**
+ * Process an incoming notification.
+ *
+ * @param notification Notification message
+ */
+ private void processNotification(final NetconfMessage notification) {
+ this.device.logger.debug("Received NETCONF notification.", notification);
+ CompositeNode domNotification = NetconfMapping.toNotificationNode(notification, device.getSchemaContext());
+ if (domNotification == null) {
+ return;
+ }
+
+ MountProvisionInstance mountInstance = this.device.getMountInstance();
+ if (mountInstance != null) {
+ mountInstance.publish(domNotification);
}
}
+
+ private static boolean isNotification(final NetconfMessage message) {
+ final XmlElement xmle = XmlElement.fromDomDocument(message.getDocument());
+ return XmlNetconfConstants.NOTIFICATION_ELEMENT_NAME.equals(xmle.getName()) ;
+ }
}
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists;
-public class NetconfDeviceTwoPhaseCommitTransaction implements DataCommitTransaction<InstanceIdentifier, CompositeNode> {
+class NetconfDeviceTwoPhaseCommitTransaction implements DataCommitTransaction<InstanceIdentifier, CompositeNode> {
private static final Logger LOG = LoggerFactory.getLogger(NetconfDeviceTwoPhaseCommitTransaction.class);
- private final NetconfDevice device;
private final DataModification<InstanceIdentifier, CompositeNode> modification;
- private final boolean candidateSupported = true;
+ private final NetconfDevice device;
+ private final boolean candidateSupported;
public NetconfDeviceTwoPhaseCommitTransaction(NetconfDevice device,
- DataModification<InstanceIdentifier, CompositeNode> modification) {
- super();
- this.device = device;
- this.modification = modification;
+ DataModification<InstanceIdentifier, CompositeNode> modification,
+ boolean candidateSupported) {
+ this.device = Preconditions.checkNotNull(device);
+ this.modification = Preconditions.checkNotNull(modification);
+ this.candidateSupported = candidateSupported;
}
void prepare() throws InterruptedException, ExecutionException {
for(Entry<InstanceIdentifier, CompositeNode> toUpdate : modification.getUpdatedConfigurationData().entrySet()) {
sendMerge(toUpdate.getKey(),toUpdate.getValue());
}
-
}
private void sendMerge(InstanceIdentifier key, CompositeNode value) throws InterruptedException, ExecutionException {
RpcResult<CompositeNode> rpcResult = device.invokeRpc(NETCONF_EDIT_CONFIG_QNAME, builder.toInstance()).get();
Preconditions.checkState(rpcResult.isSuccessful(),"Rpc Result was unsuccessful");
-
}
private CompositeNodeBuilder<ImmutableCompositeNode> configurationRpcBuilder() {
public static val NETCONF_URI = URI.create("urn:ietf:params:xml:ns:netconf:base:1.0")
public static val NETCONF_MONITORING_URI = "urn:ietf:params:xml:ns:yang:ietf-netconf-monitoring"
public static val NETCONF_NOTIFICATION_URI = URI.create("urn:ietf:params:xml:ns:netconf:notification:1.0")
-
-
+
+
public static val NETCONF_QNAME = QName.create(NETCONF_URI, null, "netconf");
public static val NETCONF_RPC_QNAME = QName.create(NETCONF_QNAME, "rpc");
public static val NETCONF_GET_QNAME = QName.create(NETCONF_QNAME, "get");
public static val NETCONF_DELETE_CONFIG_QNAME = QName.create(NETCONF_QNAME, "delete-config");
public static val NETCONF_OPERATION_QNAME = QName.create(NETCONF_QNAME, "operation");
public static val NETCONF_COMMIT_QNAME = QName.create(NETCONF_QNAME, "commit");
-
+
public static val NETCONF_CONFIG_QNAME = QName.create(NETCONF_QNAME, "config");
public static val NETCONF_SOURCE_QNAME = QName.create(NETCONF_QNAME, "source");
public static val NETCONF_TARGET_QNAME = QName.create(NETCONF_QNAME, "target");
-
+
public static val NETCONF_CANDIDATE_QNAME = QName.create(NETCONF_QNAME, "candidate");
public static val NETCONF_RUNNING_QNAME = QName.create(NETCONF_QNAME, "running");
-
-
+
+
public static val NETCONF_RPC_REPLY_QNAME = QName.create(NETCONF_QNAME, "rpc-reply");
public static val NETCONF_OK_QNAME = QName.create(NETCONF_QNAME, "ok");
public static val NETCONF_DATA_QNAME = QName.create(NETCONF_QNAME, "data");
if(identifier.path.empty) {
return null;
}
-
+
for (component : identifier.path.reverseView) {
val Node<?> current = component.toNode(previous);
previous = current;
}
static def CompositeNode toCompositeNode(NetconfMessage message,Optional<SchemaContext> ctx) {
- //TODO: implement general normalization to normalize incoming Netconf Message
+ //TODO: implement general normalization to normalize incoming Netconf Message
// for Schema Context counterpart
return null
}
-
+
static def CompositeNode toNotificationNode(NetconfMessage message,Optional<SchemaContext> ctx) {
if (ctx.present) {
val schemaContext = ctx.get
w3cPayload.documentElement.setAttribute("message-id", "m-" + messageId.andIncrement)
return new NetconfMessage(w3cPayload);
}
-
+
def static flattenInput(CompositeNode node) {
val inputQName = QName.create(node.nodeType,"input");
val input = node.getFirstCompositeByName(inputQName);
if(input == null) return node;
if(input instanceof CompositeNode) {
-
+
val nodes = ImmutableList.builder() //
.addAll(input.children) //
.addAll(node.children.filter[nodeType != inputQName]) //
.build()
return ImmutableCompositeNode.create(node.nodeType,nodes);
- }
-
+ }
+
}
static def RpcResult<CompositeNode> toRpcResult(NetconfMessage message,QName rpc,Optional<SchemaContext> context) {
var CompositeNode rawRpc;
if(context.present) {
if(isDataRetrievalReply(rpc)) {
-
+
val xmlData = message.document.dataSubtree
val dataNodes = XmlDocumentUtils.toDomNodes(xmlData, Optional.of(context.get.dataDefinitions))
-
+
val it = ImmutableCompositeNode.builder()
setQName(NETCONF_RPC_REPLY_QNAME)
add(ImmutableCompositeNode.create(NETCONF_DATA_QNAME, dataNodes));
-
+
rawRpc = it.toInstance;
//sys(xmlData)
} else {
val rpcSchema = context.get.operations.findFirst[QName == rpc]
rawRpc = message.document.toCompositeNode() as CompositeNode;
}
-
-
-
} else {
rawRpc = message.document.toCompositeNode() as CompositeNode;
}
//rawRpc.
return Rpcs.getRpcResult(true, rawRpc, Collections.emptySet());
}
-
+
def static Element getDataSubtree(Document doc) {
doc.getElementsByTagNameNS(NETCONF_URI.toString,"data").item(0) as Element
}
-
+
def static boolean isDataRetrievalReply(QName it) {
- return NETCONF_URI == namespace && ( localName == NETCONF_GET_CONFIG_QNAME.localName || localName == NETCONF_GET_QNAME.localName)
+ return NETCONF_URI == namespace && ( localName == NETCONF_GET_CONFIG_QNAME.localName || localName == NETCONF_GET_QNAME.localName)
}
static def wrap(QName name, Node<?> node) {
public static def Node<?> toCompositeNode(Document document) {
return XmlDocumentUtils.toDomNode(document) as Node<?>
}
-
+
public static def checkValidReply(NetconfMessage input, NetconfMessage output) {
val inputMsgId = input.document.documentElement.getAttribute("message-id")
val outputMsgId = output.document.documentElement.getAttribute("message-id")
Preconditions.checkState(inputMsgId == outputMsgId,"Rpc request and reply message IDs must be same.");
-
+
}
-
+
}
*/
package org.opendaylight.controller.sal.connect.netconf;
-import java.util.Set;
+import java.util.Collection;
import java.util.concurrent.ExecutionException;
import org.opendaylight.yangtools.yang.common.QName;
import org.opendaylight.yangtools.yang.model.util.repo.SchemaSourceProvider;
import com.google.common.base.Optional;
+import com.google.common.base.Preconditions;
class NetconfRemoteSchemaSourceProvider implements SchemaSourceProvider<String> {
public static final QName GET_SCHEMA_QNAME = QName.create(IETF_NETCONF_MONITORING, "get-schema");
public static final QName GET_DATA_QNAME = QName.create(IETF_NETCONF_MONITORING, "data");
- NetconfDevice device;
+ private final NetconfDevice device;
public NetconfRemoteSchemaSourceProvider(NetconfDevice device) {
- super();
- this.device = device;
+ this.device = Preconditions.checkNotNull(device);
}
@Override
return null;
}
- public static final boolean isSupportedFor(Set<QName> capabilities) {
+ public static final boolean isSupportedFor(Collection<QName> capabilities) {
return capabilities.contains(IETF_NETCONF_MONITORING);
}
}
--- /dev/null
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.sal.connect.netconf;
+
+import javax.annotation.Nullable;
+import javax.annotation.concurrent.GuardedBy;
+
+import com.google.common.base.Preconditions;
+import com.google.common.util.concurrent.AbstractFuture;
+
+final class UncancellableFuture<V> extends AbstractFuture<V> {
+ @GuardedBy("this")
+ private boolean uncancellable = false;
+
+ public UncancellableFuture(boolean uncancellable) {
+ this.uncancellable = uncancellable;
+ }
+
+ public synchronized boolean setUncancellable() {
+ if (isCancelled()) {
+ return false;
+ }
+
+ uncancellable = true;
+ return true;
+ }
+
+ public synchronized boolean isUncancellable() {
+ return uncancellable;
+ }
+
+ @Override
+ public synchronized boolean cancel(boolean mayInterruptIfRunning) {
+ if (uncancellable) {
+ return false;
+ }
+
+ return super.cancel(mayInterruptIfRunning);
+ }
+
+ @Override
+ public synchronized boolean set(@Nullable V value) {
+ Preconditions.checkState(uncancellable == true);
+ return super.set(value);
+ }
+
+ @Override
+ protected boolean setException(Throwable throwable) {
+ Preconditions.checkState(uncancellable == true);
+ return super.setException(throwable);
+ }
+}
+++ /dev/null
-/*
- * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
- *
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,
- * and is available at http://www.eclipse.org/legal/epl-v10.html
- */
-package org.opendaylight.controller.sal.connect.netconf;
-
-import java.io.ByteArrayInputStream;
-import java.io.IOException;
-import java.io.InputStream;
-
-import org.opendaylight.yangtools.concepts.Delegator;
-import org.opendaylight.yangtools.yang.common.QName;
-
-import com.google.common.base.Charsets;
-
-/**
- *
- *
- */
-public class YangModelInputStreamAdapter extends InputStream implements Delegator<InputStream> {
-
- final String source;
- final QName moduleIdentifier;
- final InputStream delegate;
-
- private YangModelInputStreamAdapter(String source, QName moduleIdentifier, InputStream delegate) {
- super();
- this.source = source;
- this.moduleIdentifier = moduleIdentifier;
- this.delegate = delegate;
- }
-
- @Override
- public int read() throws IOException {
- return delegate.read();
- }
-
- @Override
- public int hashCode() {
- return delegate.hashCode();
- }
-
- @Override
- public int read(byte[] b) throws IOException {
- return delegate.read(b);
- }
-
- @Override
- public boolean equals(Object obj) {
- return delegate.equals(obj);
- }
-
- @Override
- public int read(byte[] b, int off, int len) throws IOException {
- return delegate.read(b, off, len);
- }
-
- @Override
- public long skip(long n) throws IOException {
- return delegate.skip(n);
- }
-
- @Override
- public int available() throws IOException {
- return delegate.available();
- }
-
- @Override
- public void close() throws IOException {
- delegate.close();
- }
-
- @Override
- public void mark(int readlimit) {
- delegate.mark(readlimit);
- }
-
- @Override
- public void reset() throws IOException {
- delegate.reset();
- }
-
- @Override
- public boolean markSupported() {
- return delegate.markSupported();
- }
-
- @Override
- public InputStream getDelegate() {
- return delegate;
- }
-
- @Override
- public String toString() {
- return "YangModelInputStreamAdapter [moduleIdentifier=" + moduleIdentifier + ", delegate=" + delegate + "]";
- }
-
- public static YangModelInputStreamAdapter create(QName name, String module) {
- return new YangModelInputStreamAdapter(null, name, new ByteArrayInputStream(module.getBytes(Charsets.UTF_8)));
- }
-}
public String getErrorMessage() {
return reason;
}
+
+ @Override
+ public String toString() {
+ return reason;
+ }
}