<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
</dependency>
+ <dependency>
+ <groupId>org.opendaylight.controller</groupId>
+ <artifactId>sal-common-api</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+
</dependencies>
<packaging>bundle</packaging>
+/*
+ * Copyright (c) 2013 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
package org.opendaylight.controller.sal.common.util;
public class Arguments {
--- /dev/null
+package org.opendaylight.controller.sal.common.util;
+
+import java.util.Collections;
+
+import org.opendaylight.controller.md.sal.common.api.data.DataModification;
+import org.opendaylight.controller.md.sal.common.api.data.DataCommitHandler.DataCommitTransaction;
+import org.opendaylight.yangtools.concepts.Path;
+import org.opendaylight.yangtools.yang.common.RpcError;
+import org.opendaylight.yangtools.yang.common.RpcResult;
+
+public class CommitHandlerTransactions {
+
+ private static class AllwaysSuccessfulTransaction<P extends Path<P>,D> implements DataCommitTransaction<P, D> {
+
+ private final DataModification<P, D> modification;
+
+ public AllwaysSuccessfulTransaction(DataModification<P, D> modification) {
+ this.modification = modification;
+ }
+ @Override
+ public RpcResult<Void> rollback() throws IllegalStateException {
+ return Rpcs.<Void>getRpcResult(true, null, Collections.<RpcError>emptyList());
+ }
+ @Override
+ public RpcResult<Void> finish() throws IllegalStateException {
+ return Rpcs.<Void>getRpcResult(true, null, Collections.<RpcError>emptyList());
+ }
+
+ @Override
+ public DataModification<P, D> getModification() {
+ return modification;
+ }
+ }
+
+ public static final <P extends Path<P>,D> AllwaysSuccessfulTransaction<P, D> allwaysSuccessfulTransaction(DataModification<P, D> modification) {
+ return new AllwaysSuccessfulTransaction<>(modification);
+ }
+}
+/*
+ * Copyright (c) 2013 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
package org.opendaylight.controller.sal.common.util;
import java.util.concurrent.ExecutionException;
import org.opendaylight.controller.sal.core.api.Broker.RpcRegistration;
import org.opendaylight.yangtools.yang.common.QName;
-public interface RpcProvisionRegistry {
+public interface RpcProvisionRegistry extends BrokerService {
/**
* Registers an implementation of the rpc.
* and is available at http://www.eclipse.org/legal/epl-v10.html
*/
-
package org.opendaylight.controller.sal.core.api.mount;
import java.util.concurrent.Future;
import org.opendaylight.yangtools.yang.data.api.CompositeNode;
import org.opendaylight.yangtools.yang.model.api.SchemaContext;
-public interface MountInstance extends NotificationService, DataBrokerService {
+public interface MountInstance extends //
+ NotificationService, //
+ DataBrokerService {
Future<RpcResult<CompositeNode>> rpc(QName type, CompositeNode input);
-
+
SchemaContext getSchemaContext();
}
package org.opendaylight.controller.sal.core.api.mount;
+import java.util.EventListener;
+
+import org.opendaylight.yangtools.concepts.ListenerRegistration;
import org.opendaylight.yangtools.yang.data.api.InstanceIdentifier;
public interface MountProvisionService extends MountService {
MountProvisionInstance createMountPoint(InstanceIdentifier path);
MountProvisionInstance createOrGetMountPoint(InstanceIdentifier path);
+
+ ListenerRegistration<MountProvisionListener> registerProvisionListener(MountProvisionListener listener);
+
+ public interface MountProvisionListener extends EventListener {
+
+ void onMountPointCreated(InstanceIdentifier path);
+
+ void onMountPointRemoved(InstanceIdentifier path);
+
+ }
}
--- /dev/null
+package org.opendaylight.controller.sal.dom.broker;
+
+public class $ModuleInfo {
+
+
+}
import org.opendaylight.yangtools.yang.data.api.CompositeNode;
import org.opendaylight.yangtools.yang.data.api.InstanceIdentifier;
+import com.google.common.util.concurrent.MoreExecutors;
+
public class DataBrokerImpl extends AbstractDataBroker<InstanceIdentifier, CompositeNode, DataChangeListener> implements
DataProviderService, AutoCloseable {
public DataBrokerImpl() {
setDataReadRouter(new DataReaderRouter());
+ setExecutor(MoreExecutors.sameThreadExecutor());
}
public AtomicLong getCreatedTransactionsCount() {
public class MountPointImpl implements MountProvisionInstance {
private final RpcRouter rpcs;
- private final DataReaderRouter dataReader;
+ private final DataBrokerImpl dataReader;
private final NotificationRouter notificationRouter;
private final DataReader<InstanceIdentifier,CompositeNode> readWrapper;
public MountPointImpl(InstanceIdentifier path) {
this.mountPath = path;
rpcs = new RpcRouterImpl("");
- dataReader = new DataReaderRouter();
+ dataReader = new DataBrokerImpl();
notificationRouter = new NotificationRouterImpl();
readWrapper = new ReadWrapper();
}
@Override
public DataModificationTransaction beginTransaction() {
- // TODO Auto-generated method stub
- return null;
+ return dataReader.beginTransaction();
}
@Override
public ListenerRegistration<DataChangeListener> registerDataChangeListener(InstanceIdentifier path,
DataChangeListener listener) {
- // TODO Auto-generated method stub
- return null;
+ return dataReader.registerDataChangeListener(path, listener);
}
@Override
@Override
public Registration<DataCommitHandler<InstanceIdentifier, CompositeNode>> registerCommitHandler(
InstanceIdentifier path, DataCommitHandler<InstanceIdentifier, CompositeNode> commitHandler) {
- // TODO Auto-generated method stub
- return null;
+ return dataReader.registerCommitHandler(path, commitHandler);
}
@Override
@Override
public ListenerRegistration<RegistrationListener<DataCommitHandlerRegistration<InstanceIdentifier, CompositeNode>>> registerCommitHandlerListener(
RegistrationListener<DataCommitHandlerRegistration<InstanceIdentifier, CompositeNode>> commitHandlerListener) {
- // TODO Auto-generated method stub
- return null;
+ return dataReader.registerCommitHandlerListener(commitHandlerListener);
}
}
import java.util.concurrent.ConcurrentHashMap
import static com.google.common.base.Preconditions.*;
import org.opendaylight.controller.sal.core.api.data.DataProviderService
+import org.opendaylight.controller.sal.core.api.mount.MountProvisionService.MountProvisionListener
+import org.opendaylight.yangtools.concepts.util.ListenerRegistry
class MountPointManagerImpl implements MountProvisionService {
@Property
DataProviderService dataBroker;
+ val ListenerRegistry<MountProvisionListener> listeners = ListenerRegistry.create()
+
ConcurrentMap<InstanceIdentifier,MountPointImpl> mounts = new ConcurrentHashMap();
override createMountPoint(InstanceIdentifier path) {
val mount = new MountPointImpl(path);
registerMountPoint(mount);
mounts.put(path,mount);
+ notifyMountCreated(path);
return mount;
}
+ def notifyMountCreated(InstanceIdentifier identifier) {
+ for(listener : listeners) {
+ listener.instance.onMountPointCreated(identifier);
+ }
+ }
+
def registerMountPoint(MountPointImpl impl) {
dataBroker?.registerConfigurationReader(impl.mountPath,impl.readWrapper);
dataBroker?.registerOperationalReader(impl.mountPath,impl.readWrapper);
}
+ override registerProvisionListener(MountProvisionListener listener) {
+ listeners.register(listener)
+ }
+
override createOrGetMountPoint(InstanceIdentifier path) {
val mount = mounts.get(path);
import org.opendaylight.controller.sal.core.api.data.DataBrokerService;
import org.opendaylight.controller.sal.core.api.mount.MountProvisionInstance;
import org.opendaylight.controller.sal.core.api.mount.MountProvisionService;
+import org.opendaylight.yangtools.concepts.ListenerRegistration;
import org.opendaylight.yangtools.yang.data.api.InstanceIdentifier;
import org.osgi.framework.ServiceReference;
public MountProvisionInstance createOrGetMountPoint(InstanceIdentifier path) {
return getDelegate().createOrGetMountPoint(path);
}
+
+ @Override
+ public ListenerRegistration<MountProvisionListener> registerProvisionListener(MountProvisionListener listener) {
+ return getDelegate().registerProvisionListener(listener);
+ }
}
package org.opendaylight.controller.sal.connect.netconf
-import org.opendaylight.controller.sal.core.api.mount.MountProvisionInstance
-import org.opendaylight.yangtools.yang.data.api.InstanceIdentifier
-import org.opendaylight.controller.md.sal.common.api.data.DataReader
-import org.opendaylight.yangtools.yang.data.api.CompositeNode
-import org.opendaylight.controller.netconf.client.NetconfClient
-import org.opendaylight.controller.sal.core.api.RpcImplementation
-import static extension org.opendaylight.controller.sal.connect.netconf.NetconfMapping.*
+import com.google.common.base.Optional
+import com.google.common.collect.FluentIterable
+import io.netty.util.concurrent.EventExecutor
+import java.io.InputStream
import java.net.InetSocketAddress
-import org.opendaylight.yangtools.yang.data.api.Node
-import org.opendaylight.yangtools.yang.data.api.SimpleNode
-import org.opendaylight.yangtools.yang.common.QName
+import java.net.URI
import java.util.Collections
+import java.util.List
+import java.util.Set
+import java.util.concurrent.ExecutorService
+import java.util.concurrent.Future
+import org.opendaylight.controller.md.sal.common.api.data.DataCommitHandler
+import org.opendaylight.controller.md.sal.common.api.data.DataModification
+import org.opendaylight.controller.md.sal.common.api.data.DataReader
+import org.opendaylight.controller.netconf.api.NetconfMessage
+import org.opendaylight.controller.netconf.client.NetconfClient
import org.opendaylight.controller.netconf.client.NetconfClientDispatcher
-import org.opendaylight.yangtools.concepts.Registration
-import org.opendaylight.controller.sal.core.api.Provider
import org.opendaylight.controller.sal.core.api.Broker.ProviderSession
-import org.opendaylight.controller.sal.core.api.mount.MountProvisionService
-import static org.opendaylight.controller.sal.connect.netconf.InventoryUtils.*;
+import org.opendaylight.controller.sal.core.api.Provider
+import org.opendaylight.controller.sal.core.api.RpcImplementation
import org.opendaylight.controller.sal.core.api.data.DataBrokerService
import org.opendaylight.controller.sal.core.api.data.DataModificationTransaction
-import org.opendaylight.yangtools.yang.data.impl.SimpleNodeTOImpl
-import org.opendaylight.yangtools.yang.data.impl.CompositeNodeTOImpl
+import org.opendaylight.controller.sal.core.api.mount.MountProvisionInstance
+import org.opendaylight.controller.sal.core.api.mount.MountProvisionService
import org.opendaylight.protocol.framework.ReconnectStrategy
-import org.opendaylight.controller.md.sal.common.api.data.DataCommitHandler
-import org.opendaylight.controller.md.sal.common.api.data.DataModification
-import com.google.common.collect.FluentIterable
-import org.opendaylight.yangtools.yang.model.api.SchemaContext
-import org.opendaylight.yangtools.yang.data.impl.ImmutableCompositeNode
import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.netconf.monitoring.rev101004.NetconfState
+import org.opendaylight.yangtools.concepts.Registration
+import org.opendaylight.yangtools.yang.common.QName
+import org.opendaylight.yangtools.yang.data.api.CompositeNode
+import org.opendaylight.yangtools.yang.data.api.InstanceIdentifier
+import org.opendaylight.yangtools.yang.data.api.Node
+import org.opendaylight.yangtools.yang.data.api.SimpleNode
+import org.opendaylight.yangtools.yang.data.impl.CompositeNodeTOImpl
+import org.opendaylight.yangtools.yang.data.impl.ImmutableCompositeNode
+import org.opendaylight.yangtools.yang.data.impl.SimpleNodeTOImpl
+import org.opendaylight.yangtools.yang.model.api.SchemaContext
+import org.opendaylight.yangtools.yang.model.util.repo.AbstractCachingSchemaSourceProvider
+import org.opendaylight.yangtools.yang.model.util.repo.SchemaSourceProvider
+import org.opendaylight.yangtools.yang.model.util.repo.SchemaSourceProviders
+import org.opendaylight.yangtools.yang.model.util.repo.SourceIdentifier
import org.opendaylight.yangtools.yang.parser.impl.YangParserImpl
-import java.io.InputStream
-import org.slf4j.LoggerFactory
+import org.opendaylight.yangtools.yang.parser.impl.util.YangSourceContext
import org.slf4j.Logger
-import org.opendaylight.controller.netconf.client.AbstractNetconfClientNotifySessionListener
-import org.opendaylight.controller.netconf.client.NetconfClientSession
-import org.opendaylight.controller.netconf.api.NetconfMessage
-import io.netty.util.concurrent.EventExecutor
+import org.slf4j.LoggerFactory
-import java.util.Map
-import java.util.Set
-import com.google.common.collect.ImmutableMap
+import static com.google.common.base.Preconditions.*
+import static org.opendaylight.controller.sal.connect.netconf.InventoryUtils.*
-import org.opendaylight.yangtools.yang.model.util.repo.AbstractCachingSchemaSourceProvider
-import org.opendaylight.yangtools.yang.model.util.repo.SchemaSourceProvider
-import com.google.common.base.Optional
-import com.google.common.collect.ImmutableList
-import org.opendaylight.yangtools.yang.model.util.repo.SchemaSourceProviders
-import static com.google.common.base.Preconditions.*;
-import java.util.concurrent.ExecutorService
-import java.util.concurrent.Future
-import org.opendaylight.controller.netconf.client.NetconfClientSessionListener
-import io.netty.util.concurrent.Promise
-import org.opendaylight.controller.netconf.util.xml.XmlElement
-import org.opendaylight.controller.netconf.util.xml.XmlNetconfConstants
-import java.util.concurrent.ExecutionException
-import java.util.concurrent.locks.ReentrantLock
+import static extension org.opendaylight.controller.sal.connect.netconf.NetconfMapping.*
+import org.opendaylight.controller.netconf.util.xml.XmlUtil
class NetconfDevice implements Provider, //
DataReader<InstanceIdentifier, CompositeNode>, //
@Property
var AbstractCachingSchemaSourceProvider<String, InputStream> schemaSourceProvider;
- private NetconfDeviceSchemaContextProvider schemaContextProvider
+ @Property
+ private NetconfDeviceSchemaContextProvider deviceContextProvider
protected val Logger logger
@Property
var NetconfClientDispatcher dispatcher
-
+
static val InstanceIdentifier ROOT_PATH = InstanceIdentifier.builder().toInstance();
+ @Property
+ var SchemaSourceProvider<InputStream> remoteSourceProvider
+
public new(String name) {
this.name = name;
this.logger = LoggerFactory.getLogger(NetconfDevice.name + "#" + name);
checkState(schemaSourceProvider != null, "Schema Source Provider must be set.")
checkState(eventExecutor != null, "Event executor must be set.");
- val listener = new NetconfDeviceListener(this,eventExecutor);
+ val listener = new NetconfDeviceListener(this, eventExecutor);
val task = startClientTask(dispatcher, listener)
- if(mountInstance != null) {
+ if (mountInstance != null) {
confReaderReg = mountInstance.registerConfigurationReader(ROOT_PATH, this);
operReaderReg = mountInstance.registerOperationalReader(ROOT_PATH, this);
+ commitHandlerReg = mountInstance.registerCommitHandler(ROOT_PATH, this)
}
return processingExecutor.submit(task) as Future<Void>;
}
def Optional<SchemaContext> getSchemaContext() {
- if (schemaContextProvider == null) {
+ if (deviceContextProvider == null) {
return Optional.absent();
}
- return schemaContextProvider.currentContext;
+ return deviceContextProvider.currentContext;
}
private def Runnable startClientTask(NetconfClientDispatcher dispatcher, NetconfDeviceListener listener) {
+
return [ |
logger.info("Starting Netconf Client on: {}", socketAddress);
client = NetconfClient.clientFor(name, socketAddress, reconnectStrategy, dispatcher, listener);
logger.debug("Initial capabilities {}", initialCapabilities);
var SchemaSourceProvider<String> delegate;
- if (initialCapabilities.contains(NetconfMapping.IETF_NETCONF_MONITORING_MODULE)) {
- delegate = new NetconfDeviceSchemaSourceProvider(this);
- } else {
- logger.info("Device does not support IETF Netconf Monitoring.", socketAddress);
+ if (NetconfRemoteSchemaSourceProvider.isSupportedFor(initialCapabilities)) {
+ delegate = new NetconfRemoteSchemaSourceProvider(this);
+ } else {
+ logger.info("Netconf server {} does not support IETF Netconf Monitoring", socketAddress);
delegate = SchemaSourceProviders.<String>noopProvider();
}
- val sourceProvider = schemaSourceProvider.createInstanceFor(delegate);
- schemaContextProvider = new NetconfDeviceSchemaContextProvider(this, sourceProvider);
- schemaContextProvider.createContextFromCapabilities(initialCapabilities);
+ remoteSourceProvider = schemaSourceProvider.createInstanceFor(delegate);
+ deviceContextProvider = new NetconfDeviceSchemaContextProvider(this, remoteSourceProvider);
+ deviceContextProvider.createContextFromCapabilities(initialCapabilities);
if (mountInstance != null && schemaContext.isPresent) {
mountInstance.schemaContext = schemaContext.get();
}
override getSupportedRpcs() {
Collections.emptySet;
}
-
+
def createSubscription(String streamName) {
val it = ImmutableCompositeNode.builder()
QName = NETCONF_CREATE_SUBSCRIPTION_QNAME
- addLeaf("stream",streamName);
- invokeRpc(QName,toInstance())
+ addLeaf("stream", streamName);
+ invokeRpc(QName, toInstance())
}
override invokeRpc(QName rpc, CompositeNode input) {
- val message = rpc.toRpcMessage(input);
- val result = client.sendMessage(message, messegeRetryCount, messageTimeoutCount);
- return result.toRpcResult();
+ try {
+ val message = rpc.toRpcMessage(input,schemaContext);
+ val result = sendMessageImpl(message, messegeRetryCount, messageTimeoutCount);
+ return result.toRpcResult(rpc, schemaContext);
+
+ } catch (Exception e) {
+ logger.error("Rpc was not processed correctly.", e)
+ throw e;
+ }
+ }
+
+ def NetconfMessage sendMessageImpl(NetconfMessage message, int retryCount, int timeout) {
+ logger.debug("Send message {}",XmlUtil.toString(message.document))
+ val result = client.sendMessage(message, retryCount, timeout);
+ NetconfMapping.checkValidReply(message, result)
+ return result;
}
override getProviderFunctionality() {
return null === transaction.readOperationalData(path);
}
- def Node<?> findNode(CompositeNode node, InstanceIdentifier identifier) {
+ static def Node<?> findNode(CompositeNode node, InstanceIdentifier identifier) {
var Node<?> current = node;
for (arg : identifier.path) {
return null;
} else if (current instanceof CompositeNode) {
val currentComposite = (current as CompositeNode);
-
- current = currentComposite.getFirstCompositeByName(arg.nodeType.withoutRevision());
- if (current == null) {
- current = currentComposite.getFirstSimpleByName(arg.nodeType.withoutRevision());
+
+ current = currentComposite.getFirstCompositeByName(arg.nodeType);
+ if(current == null) {
+ current = currentComposite.getFirstCompositeByName(arg.nodeType.withoutRevision());
+ }
+ if(current == null) {
+ current = currentComposite.getFirstSimpleByName(arg.nodeType);
}
if (current == null) {
+ current = currentComposite.getFirstSimpleByName(arg.nodeType.withoutRevision());
+ } if (current == null) {
return null;
}
}
}
override requestCommit(DataModification<InstanceIdentifier, CompositeNode> modification) {
- throw new UnsupportedOperationException("TODO: auto-generated method stub")
+ val twoPhaseCommit = new NetconfDeviceTwoPhaseCommitTransaction(this, modification);
+ twoPhaseCommit.prepare()
+ return twoPhaseCommit;
}
def getInitialCapabilities() {
val parts = split("\\?");
val namespace = parts.get(0);
val queryParams = FluentIterable.from(parts.get(1).split("&"));
- val revision = queryParams.findFirst[startsWith("revision=")].replaceAll("revision=", "");
- val moduleName = queryParams.findFirst[startsWith("module=")].replaceAll("module=", "");
+ var revision = queryParams.findFirst[startsWith("revision=")]?.replaceAll("revision=", "");
+ val moduleName = queryParams.findFirst[startsWith("module=")]?.replaceAll("module=", "");
+ if (revision === null) {
+ logger.warn("Netconf device was not reporting revision correctly, trying to get amp;revision=");
+ revision = queryParams.findFirst[startsWith("&revision=")]?.replaceAll("revision=", "");
+ if (revision != null) {
+ logger.warn("Netconf device returned revision incorectly escaped for {}", it)
+ }
+ }
+ if (revision == null) {
+ return QName.create(URI.create(namespace), null, moduleName);
+ }
return QName.create(namespace, revision, moduleName);
].toSet();
}
}
-package class NetconfDeviceListener extends NetconfClientSessionListener {
-
- val NetconfDevice device
- val EventExecutor eventExecutor
-
- new(NetconfDevice device,EventExecutor eventExecutor) {
- this.device = device
- this.eventExecutor = eventExecutor
- }
-
- var Promise<NetconfMessage> messagePromise;
- val promiseLock = new ReentrantLock;
-
- override onMessage(NetconfClientSession session, NetconfMessage message) {
- if (isNotification(message)) {
- onNotification(session, message);
- } else try {
- promiseLock.lock
- if (messagePromise != null) {
- messagePromise.setSuccess(message);
- messagePromise = null;
- }
- } finally {
- promiseLock.unlock
- }
- }
-
- /**
- * Method intended to customize notification processing.
- *
- * @param session
- * {@see
- * NetconfClientSessionListener#onMessage(NetconfClientSession,
- * NetconfMessage)}
- * @param message
- * {@see
- * NetconfClientSessionListener#onMessage(NetconfClientSession,
- * NetconfMessage)}
- */
- def void onNotification(NetconfClientSession session, NetconfMessage message) {
- device.logger.debug("Received NETCONF notification.",message);
- val domNotification = message?.toCompositeNode?.notificationBody;
- if(domNotification != null) {
- device?.mountInstance?.publish(domNotification);
- }
- }
-
- private static def CompositeNode getNotificationBody(CompositeNode node) {
- for(child : node.children) {
- if(child instanceof CompositeNode) {
- return child as CompositeNode;
- }
- }
- }
-
- override getLastMessage(int attempts, int attemptMsDelay) throws InterruptedException {
- val promise = promiseReply();
- val messageAvailable = promise.await(attempts + attemptMsDelay);
- if (messageAvailable) {
- try {
- return promise.get();
- } catch (ExecutionException e) {
- throw new IllegalStateException(e);
- }
- }
-
- throw new IllegalStateException("Unsuccessful after " + attempts + " attempts.");
-
- // throw new TimeoutException("Message was not received on time.");
- }
-
- def Promise<NetconfMessage> promiseReply() {
- promiseLock.lock
- try {
- if (messagePromise == null) {
- messagePromise = eventExecutor.newPromise();
- return messagePromise;
- }
- return messagePromise;
- } finally {
- promiseLock.unlock
- }
- }
-
- def boolean isNotification(NetconfMessage message) {
- val xmle = XmlElement.fromDomDocument(message.getDocument());
- return XmlNetconfConstants.NOTIFICATION_ELEMENT_NAME.equals(xmle.getName());
- }
-}
-
package class NetconfDeviceSchemaContextProvider {
@Property
}
def createContextFromCapabilities(Iterable<QName> capabilities) {
-
- val modelsToParse = ImmutableMap.<QName, InputStream>builder();
- for (cap : capabilities) {
- val source = sourceProvider.getSchemaSource(cap.localName, Optional.fromNullable(cap.formattedRevision));
- if (source.present) {
- modelsToParse.put(cap, source.get());
- }
+ val sourceContext = YangSourceContext.createFrom(capabilities, sourceProvider)
+ if (!sourceContext.missingSources.empty) {
+ device.logger.warn("Sources for following models are missing {}", sourceContext.missingSources);
+ }
+ device.logger.debug("Trying to create schema context from {}", sourceContext.validSources)
+ val modelsToParse = YangSourceContext.getValidInputStreams(sourceContext);
+ if (!sourceContext.validSources.empty) {
+ val schemaContext = tryToCreateContext(modelsToParse);
+ currentContext = Optional.fromNullable(schemaContext);
+ } else {
+ currentContext = Optional.absent();
}
- val context = tryToCreateContext(modelsToParse.build);
- currentContext = Optional.fromNullable(context);
+ if (currentContext.present) {
+ device.logger.debug("Schema context successfully created.");
+ }
+
}
- def SchemaContext tryToCreateContext(Map<QName, InputStream> modelsToParse) {
+ def SchemaContext tryToCreateContext(List<InputStream> modelsToParse) {
val parser = new YangParserImpl();
try {
- val models = parser.parseYangModelsFromStreams(ImmutableList.copyOf(modelsToParse.values));
+
+ val models = parser.parseYangModelsFromStreams(modelsToParse);
val result = parser.resolveSchemaContext(models);
return result;
} catch (Exception e) {
}
}
}
-
-package class NetconfDeviceSchemaSourceProvider implements SchemaSourceProvider<String> {
-
- val NetconfDevice device;
-
- new(NetconfDevice device) {
- this.device = device;
- }
-
- override getSchemaSource(String moduleName, Optional<String> revision) {
- val it = ImmutableCompositeNode.builder() //
- setQName(QName::create(NetconfState.QNAME, "get-schema")) //
- addLeaf("format", "yang")
- addLeaf("identifier", moduleName)
- if (revision.present) {
- addLeaf("version", revision.get())
- }
-
- device.logger.info("Loading YANG schema source for {}:{}", moduleName, revision)
- val schemaReply = device.invokeRpc(getQName(), toInstance());
-
- if (schemaReply.successful) {
- val schemaBody = schemaReply.result.getFirstSimpleByName(
- QName::create(NetconfState.QNAME.namespace, null, "data"))?.value;
- device.logger.info("YANG Schema successfully received for: {}:{}", moduleName, revision);
- return Optional.of(schemaBody as String);
- }
- return Optional.absent();
- }
-}
--- /dev/null
+package org.opendaylight.controller.sal.connect.netconf;
+
+import com.google.common.base.Objects;
+
+import io.netty.util.concurrent.EventExecutor;
+import io.netty.util.concurrent.Promise;
+
+import java.util.List;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.locks.ReentrantLock;
+
+import org.eclipse.xtext.xbase.lib.Exceptions;
+import org.eclipse.xtext.xbase.lib.Functions.Function0;
+import org.opendaylight.controller.netconf.api.NetconfMessage;
+import org.opendaylight.controller.netconf.client.NetconfClientSession;
+import org.opendaylight.controller.netconf.client.NetconfClientSessionListener;
+import org.opendaylight.controller.netconf.util.xml.XmlElement;
+import org.opendaylight.controller.netconf.util.xml.XmlNetconfConstants;
+import org.opendaylight.controller.sal.connect.netconf.NetconfDevice;
+import org.opendaylight.controller.sal.connect.netconf.NetconfMapping;
+import org.opendaylight.controller.sal.core.api.mount.MountProvisionInstance;
+import org.opendaylight.yangtools.yang.data.api.CompositeNode;
+import org.opendaylight.yangtools.yang.data.api.Node;
+import org.w3c.dom.Document;
+
+@SuppressWarnings("all")
+class NetconfDeviceListener extends NetconfClientSessionListener {
+ private final NetconfDevice device;
+ private final EventExecutor eventExecutor;
+
+ public NetconfDeviceListener(final NetconfDevice device, final EventExecutor eventExecutor) {
+ this.device = device;
+ this.eventExecutor = eventExecutor;
+ }
+
+ private Promise<NetconfMessage> messagePromise;
+ private ConcurrentMap<String, Promise<NetconfMessage>> promisedMessages;
+
+ private final ReentrantLock promiseLock = new ReentrantLock();
+
+ public void onMessage(final NetconfClientSession session, final NetconfMessage message) {
+ if (isNotification(message)) {
+ this.onNotification(session, message);
+ } else {
+ try {
+ this.promiseLock.lock();
+ boolean _notEquals = (!Objects.equal(this.messagePromise, null));
+ if (_notEquals) {
+ this.device.logger.debug("Setting promised reply {} with message {}", this.messagePromise, message);
+ this.messagePromise.setSuccess(message);
+ this.messagePromise = null;
+ }
+ } finally {
+ this.promiseLock.unlock();
+ }
+ }
+ }
+
+ /**
+ * Method intended to customize notification processing.
+ *
+ * @param session
+ * {@see
+ * NetconfClientSessionListener#onMessage(NetconfClientSession,
+ * NetconfMessage)}
+ * @param message
+ * {@see
+ * NetconfClientSessionListener#onMessage(NetconfClientSession,
+ * NetconfMessage)}
+ */
+ public void onNotification(final NetconfClientSession session, final NetconfMessage message) {
+ this.device.logger.debug("Received NETCONF notification.", message);
+ CompositeNode _notificationBody = null;
+ CompositeNode _compositeNode = null;
+ if (message != null) {
+ _compositeNode = NetconfMapping.toCompositeNode(message,device.getSchemaContext());
+ }
+ if (_compositeNode != null) {
+ _notificationBody = NetconfDeviceListener.getNotificationBody(_compositeNode);
+ }
+ final CompositeNode domNotification = _notificationBody;
+ boolean _notEquals = (!Objects.equal(domNotification, null));
+ if (_notEquals) {
+ MountProvisionInstance _mountInstance = null;
+ if (this.device != null) {
+ _mountInstance = this.device.getMountInstance();
+ }
+ if (_mountInstance != null) {
+ _mountInstance.publish(domNotification);
+ }
+ }
+ }
+
+ private static CompositeNode getNotificationBody(final CompositeNode node) {
+ List<Node<? extends Object>> _children = node.getChildren();
+ for (final Node<? extends Object> child : _children) {
+ if ((child instanceof CompositeNode)) {
+ return ((CompositeNode) child);
+ }
+ }
+ return null;
+ }
+
+ public NetconfMessage getLastMessage(final int attempts, final int attemptMsDelay) throws InterruptedException {
+ final Promise<NetconfMessage> promise = this.promiseReply();
+ this.device.logger.debug("Waiting for reply {}", promise);
+ int _plus = (attempts * attemptMsDelay);
+ final boolean messageAvailable = promise.await(_plus);
+ if (messageAvailable) {
+ try {
+ try {
+ return promise.get();
+ } catch (Throwable _e) {
+ throw Exceptions.sneakyThrow(_e);
+ }
+ } catch (final Throwable _t) {
+ if (_t instanceof ExecutionException) {
+ final ExecutionException e = (ExecutionException) _t;
+ IllegalStateException _illegalStateException = new IllegalStateException(e);
+ throw _illegalStateException;
+ } else {
+ throw Exceptions.sneakyThrow(_t);
+ }
+ }
+ }
+ String _plus_1 = ("Unsuccessful after " + Integer.valueOf(attempts));
+ String _plus_2 = (_plus_1 + " attempts.");
+ IllegalStateException _illegalStateException_1 = new IllegalStateException(_plus_2);
+ throw _illegalStateException_1;
+ }
+
+ public synchronized Promise<NetconfMessage> promiseReply() {
+ this.device.logger.debug("Promising reply.");
+ this.promiseLock.lock();
+ try {
+ boolean _equals = Objects.equal(this.messagePromise, null);
+ if (_equals) {
+ Promise<NetconfMessage> _newPromise = this.eventExecutor.<NetconfMessage> newPromise();
+ this.messagePromise = _newPromise;
+ return this.messagePromise;
+ }
+ return this.messagePromise;
+ } finally {
+ this.promiseLock.unlock();
+ }
+ }
+
+ public boolean isNotification(final NetconfMessage message) {
+ Document _document = message.getDocument();
+ final XmlElement xmle = XmlElement.fromDomDocument(_document);
+ String _name = xmle.getName();
+ return XmlNetconfConstants.NOTIFICATION_ELEMENT_NAME.equals(_name);
+ }
+}
--- /dev/null
+package org.opendaylight.controller.sal.connect.netconf;
+
+import java.util.List;
+import java.util.Map.Entry;
+import java.util.Set;
+
+import org.eclipse.xtext.xbase.lib.IterableExtensions;
+import org.opendaylight.controller.md.sal.common.api.data.DataModification;
+import org.opendaylight.controller.md.sal.common.api.data.DataCommitHandler.DataCommitTransaction;
+import org.opendaylight.yangtools.yang.common.QName;
+import org.opendaylight.yangtools.yang.common.RpcResult;
+import org.opendaylight.yangtools.yang.data.api.CompositeNode;
+import org.opendaylight.yangtools.yang.data.api.InstanceIdentifier;
+import org.opendaylight.yangtools.yang.data.api.InstanceIdentifier.NodeIdentifierWithPredicates;
+import org.opendaylight.yangtools.yang.data.api.InstanceIdentifier.PathArgument;
+import org.opendaylight.yangtools.yang.data.api.Node;
+import org.opendaylight.yangtools.yang.data.impl.ImmutableCompositeNode;
+import org.opendaylight.yangtools.yang.data.impl.util.CompositeNodeBuilder;
+
+import com.google.common.base.Optional;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Collections2;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Lists;
+
+import static org.opendaylight.controller.sal.connect.netconf.NetconfMapping.*;
+
+public class NetconfDeviceTwoPhaseCommitTransaction implements DataCommitTransaction<InstanceIdentifier, CompositeNode> {
+
+ private NetconfDevice device;
+ private final DataModification<InstanceIdentifier, CompositeNode> modification;
+ private boolean candidateSupported = true;
+
+ public NetconfDeviceTwoPhaseCommitTransaction(NetconfDevice device,
+ DataModification<InstanceIdentifier, CompositeNode> modification) {
+ super();
+ this.device = device;
+ this.modification = modification;
+ }
+
+ public void prepare() {
+ for (InstanceIdentifier toRemove : modification.getRemovedConfigurationData()) {
+ sendRemove(toRemove);
+ }
+ for(Entry<InstanceIdentifier, CompositeNode> toUpdate : modification.getUpdatedConfigurationData().entrySet()) {
+ sendMerge(toUpdate.getKey(),toUpdate.getValue());
+ }
+
+ }
+
+ private void sendMerge(InstanceIdentifier key, CompositeNode value) {
+ sendEditRpc(createEditStructure(key, Optional.<String>absent(), Optional.of(value)));
+ }
+
+ private void sendRemove(InstanceIdentifier toRemove) {
+ sendEditRpc(createEditStructure(toRemove, Optional.of("remove"), Optional.<CompositeNode> absent()));
+ }
+
+ private void sendEditRpc(CompositeNode editStructure) {
+ CompositeNodeBuilder<ImmutableCompositeNode> builder = configurationRpcBuilder();
+ builder.setQName(NETCONF_EDIT_CONFIG_QNAME);
+ builder.add(editStructure);
+
+ RpcResult<CompositeNode> rpcResult = device.invokeRpc(NETCONF_EDIT_CONFIG_QNAME, builder.toInstance());
+ Preconditions.checkState(rpcResult.isSuccessful(),"Rpc Result was unsuccessful");
+
+ }
+
+ private CompositeNodeBuilder<ImmutableCompositeNode> configurationRpcBuilder() {
+ CompositeNodeBuilder<ImmutableCompositeNode> ret = ImmutableCompositeNode.builder();
+
+ Node<?> targetNode;
+ if(candidateSupported) {
+ targetNode = ImmutableCompositeNode.create(NETCONF_CANDIDATE_QNAME, ImmutableList.<Node<?>>of());
+ } else {
+ targetNode = ImmutableCompositeNode.create(NETCONF_RUNNING_QNAME, ImmutableList.<Node<?>>of());
+ }
+ Node<?> targetWrapperNode = ImmutableCompositeNode.create(NETCONF_TARGET_QNAME, ImmutableList.<Node<?>>of(targetNode));
+ ret.add(targetWrapperNode);
+ return ret;
+ }
+
+ private CompositeNode createEditStructure(InstanceIdentifier dataPath, Optional<String> action,
+ Optional<CompositeNode> lastChildOverride) {
+ List<PathArgument> path = dataPath.getPath();
+ List<PathArgument> reversed = Lists.reverse(path);
+ CompositeNode previous = null;
+ boolean isLast = true;
+ for (PathArgument arg : reversed) {
+ CompositeNodeBuilder<ImmutableCompositeNode> builder = ImmutableCompositeNode.builder();
+ builder.setQName(arg.getNodeType());
+
+ if (arg instanceof NodeIdentifierWithPredicates) {
+ for (Entry<QName, Object> entry : ((NodeIdentifierWithPredicates) arg).getKeyValues().entrySet()) {
+ builder.addLeaf(entry.getKey(), entry.getValue());
+ }
+ }
+ if (isLast) {
+ if (action.isPresent()) {
+ builder.setAttribute(NETCONF_ACTION_QNAME, action.get());
+ }
+ if (lastChildOverride.isPresent()) {
+ List<Node<?>> children = lastChildOverride.get().getChildren();
+ builder.addAll(children);
+ }
+ } else {
+ builder.add(previous);
+ }
+ previous = builder.toInstance();
+ isLast = false;
+ }
+ return ImmutableCompositeNode.create(NETCONF_CONFIG_QNAME, ImmutableList.<Node<?>>of(previous));
+ }
+
+ @Override
+ public RpcResult<Void> finish() throws IllegalStateException {
+ CompositeNodeBuilder<ImmutableCompositeNode> commitInput = ImmutableCompositeNode.builder();
+ commitInput.setQName(NETCONF_COMMIT_QNAME);
+ RpcResult<?> rpcResult = device.invokeRpc(NetconfMapping.NETCONF_COMMIT_QNAME, commitInput.toInstance());
+ return (RpcResult<Void>) rpcResult;
+ }
+
+ @Override
+ public DataModification<InstanceIdentifier, CompositeNode> getModification() {
+ return this.modification;
+ }
+
+ @Override
+ public RpcResult<Void> rollback() throws IllegalStateException {
+ // TODO Auto-generated method stub
+ return null;
+ }
+}
import com.google.common.collect.ImmutableList
import org.opendaylight.yangtools.yang.data.api.SimpleNode
import org.opendaylight.yangtools.yang.data.impl.ImmutableCompositeNode
+import com.google.common.base.Preconditions
+import com.google.common.base.Optional
+import org.opendaylight.yangtools.yang.model.api.SchemaContext
+import org.opendaylight.yangtools.yang.data.impl.codec.xml.XmlDocumentUtils
class NetconfMapping {
public static val NETCONF_FILTER_QNAME = QName.create(NETCONF_QNAME, "filter");
public static val NETCONF_TYPE_QNAME = QName.create(NETCONF_QNAME, "type");
public static val NETCONF_GET_CONFIG_QNAME = QName.create(NETCONF_QNAME, "get-config");
+ public static val NETCONF_EDIT_CONFIG_QNAME = QName.create(NETCONF_QNAME, "edit-config");
+ public static val NETCONF_DELETE_CONFIG_QNAME = QName.create(NETCONF_QNAME, "delete-config");
+ public static val NETCONF_ACTION_QNAME = QName.create(NETCONF_QNAME, "action");
+ public static val NETCONF_COMMIT_QNAME = QName.create(NETCONF_QNAME, "commit");
+
+ public static val NETCONF_CONFIG_QNAME = QName.create(NETCONF_QNAME, "config");
public static val NETCONF_SOURCE_QNAME = QName.create(NETCONF_QNAME, "source");
+ public static val NETCONF_TARGET_QNAME = QName.create(NETCONF_QNAME, "target");
+
+ public static val NETCONF_CANDIDATE_QNAME = QName.create(NETCONF_QNAME, "candidate");
public static val NETCONF_RUNNING_QNAME = QName.create(NETCONF_QNAME, "running");
+
+
public static val NETCONF_RPC_REPLY_QNAME = QName.create(NETCONF_QNAME, "rpc-reply");
public static val NETCONF_OK_QNAME = QName.create(NETCONF_QNAME, "ok");
public static val NETCONF_DATA_QNAME = QName.create(NETCONF_QNAME, "data");
}
}
- static def CompositeNode toCompositeNode(NetconfMessage message) {
- return message.toRpcResult().result;
+ static def CompositeNode toCompositeNode(NetconfMessage message,Optional<SchemaContext> ctx) {
+ return null//message.toRpcResult().result;
}
- static def NetconfMessage toRpcMessage(QName rpc, CompositeNode node) {
+ static def NetconfMessage toRpcMessage(QName rpc, CompositeNode node,Optional<SchemaContext> ctx) {
val rpcPayload = wrap(NETCONF_RPC_QNAME, flattenInput(node));
val w3cPayload = NodeUtils.buildShadowDomTree(rpcPayload);
w3cPayload.documentElement.setAttribute("message-id", "m-" + messageId.andIncrement);
}
- static def RpcResult<CompositeNode> toRpcResult(NetconfMessage message) {
- val rawRpc = message.document.toCompositeNode() as CompositeNode;
-
+ static def RpcResult<CompositeNode> toRpcResult(NetconfMessage message,QName rpc,Optional<SchemaContext> context) {
+ var CompositeNode rawRpc;
+ if(context.present) {
+ if(isDataRetrievalReply(rpc)) {
+
+ val xmlData = message.document.dataSubtree
+ val dataNodes = XmlDocumentUtils.toDomNodes(xmlData,Optional.of(context.get.dataDefinitions))
+
+ val it = ImmutableCompositeNode.builder()
+ setQName(NETCONF_RPC_REPLY_QNAME)
+ add(ImmutableCompositeNode.create(NETCONF_DATA_QNAME,dataNodes));
+
+ rawRpc = it.toInstance;
+ //sys(xmlData)
+ } else {
+ val rpcSchema = context.get.operations.findFirst[QName == rpc]
+ rawRpc = message.document.toCompositeNode() as CompositeNode;
+ }
+
+
+
+ } else {
+ rawRpc = message.document.toCompositeNode() as CompositeNode;
+ }
//rawRpc.
return Rpcs.getRpcResult(true, rawRpc, Collections.emptySet());
}
+
+ def static Element getDataSubtree(Document doc) {
+ doc.getElementsByTagNameNS(NETCONF_URI.toString,"data").item(0) as Element
+ }
+
+ def static boolean isDataRetrievalReply(QName it) {
+ return NETCONF_URI == namespace && ( localName == NETCONF_GET_CONFIG_QNAME.localName || localName == NETCONF_GET_QNAME.localName)
+ }
static def wrap(QName name, Node<?> node) {
if (node != null) {
}
public static def Node<?> toCompositeNode(Document document) {
- return XmlDocumentUtils.toNode(document) as Node<?>
+ return XmlDocumentUtils.toDomNode(document) as Node<?>
}
+
+ public static def checkValidReply(NetconfMessage input, NetconfMessage output) {
+ val inputMsgId = input.document.documentElement.getAttribute("message-id")
+ val outputMsgId = output.document.documentElement.getAttribute("message-id")
+ Preconditions.checkState(inputMsgId == outputMsgId,"Rpc request and reply message IDs must be same.");
+
+ }
+
}
--- /dev/null
+package org.opendaylight.controller.sal.connect.netconf;
+
+import java.util.Set;
+
+import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.netconf.monitoring.rev101004.NetconfState;
+import org.opendaylight.yangtools.yang.common.QName;
+import org.opendaylight.yangtools.yang.common.RpcResult;
+import org.opendaylight.yangtools.yang.data.api.CompositeNode;
+import org.opendaylight.yangtools.yang.data.api.SimpleNode;
+import org.opendaylight.yangtools.yang.data.impl.ImmutableCompositeNode;
+import org.opendaylight.yangtools.yang.data.impl.util.CompositeNodeBuilder;
+import org.opendaylight.yangtools.yang.model.util.repo.SchemaSourceProvider;
+
+import com.google.common.base.Optional;
+import com.google.common.base.Preconditions;
+
+class NetconfRemoteSchemaSourceProvider implements SchemaSourceProvider<String> {
+
+ public static final QName IETF_NETCONF_MONITORING = QName.create(
+ "urn:ietf:params:xml:ns:yang:ietf-netconf-monitoring", "2010-10-04", "ietf-netconf-monitoring");
+ public static final QName GET_SCHEMA_QNAME = QName.create(IETF_NETCONF_MONITORING, "get-schema");
+ public static final QName GET_DATA_QNAME = QName.create(IETF_NETCONF_MONITORING, "data");
+
+ NetconfDevice device;
+
+ public NetconfRemoteSchemaSourceProvider(NetconfDevice device) {
+ super();
+ this.device = device;
+ }
+
+ @Override
+ public Optional<String> getSchemaSource(String moduleName, Optional<String> revision) {
+ CompositeNodeBuilder<ImmutableCompositeNode> request = ImmutableCompositeNode.builder(); //
+ request.setQName(GET_SCHEMA_QNAME) //
+ .addLeaf("format", "yang") //
+ .addLeaf("identifier", moduleName); //
+ if (revision.isPresent()) {
+ request.addLeaf("version", revision.get());
+ }
+
+ device.logger.info("Loading YANG schema source for {}:{}", moduleName, revision);
+ RpcResult<CompositeNode> schemaReply = device.invokeRpc(GET_SCHEMA_QNAME, request.toInstance());
+ if (schemaReply.isSuccessful()) {
+ String schemaBody = getSchemaFromRpc(schemaReply.getResult());
+ if (schemaBody != null) {
+ device.logger.info("YANG Schema successfully retrieved from remote for {}:{}", moduleName, revision);
+ return Optional.of(schemaBody);
+ }
+ }
+ device.logger.info("YANG shcema was not successfully retrieved.");
+ return Optional.absent();
+ }
+
+ private String getSchemaFromRpc(CompositeNode result) {
+ if (result == null) {
+ return null;
+ }
+ SimpleNode<?> simpleNode = result.getFirstSimpleByName(GET_DATA_QNAME.withoutRevision());
+ Object potential = simpleNode.getValue();
+ if (potential instanceof String) {
+ return (String) potential;
+ }
+ return null;
+ }
+
+ public static final boolean isSupportedFor(Set<QName> capabilities) {
+ return capabilities.contains(IETF_NETCONF_MONITORING);
+ }
+}
+++ /dev/null
-package org.opendaylight.controller.sal.connect.netconf;
-
-import java.net.URI;
-import java.util.ArrayList;
-import java.util.List;
-
-import org.opendaylight.yangtools.yang.common.QName;
-import org.opendaylight.yangtools.yang.data.api.CompositeNode;
-import org.opendaylight.yangtools.yang.data.api.Node;
-import org.opendaylight.yangtools.yang.data.impl.CompositeNodeTOImpl;
-import org.opendaylight.yangtools.yang.data.impl.SimpleNodeTOImpl;
-import org.w3c.dom.Document;
-import org.w3c.dom.Element;
-import org.w3c.dom.NodeList;
-
-import com.google.common.base.Strings;
-
-public class XmlDocumentUtils {
-
- public static Node<?> toNode(Document doc) {
- return toCompositeNode(doc.getDocumentElement());
- }
-
- private static Node<?> toCompositeNode(Element element) {
- String orgNamespace = element.getNamespaceURI();
- URI biNamespace = null;
- if (orgNamespace != null) {
- biNamespace = URI.create(orgNamespace);
- }
- QName qname = new QName(biNamespace, element.getLocalName());
-
- List<Node<?>> values = new ArrayList<>();
- NodeList nodes = element.getChildNodes();
- boolean isSimpleObject = true;
- String value = null;
- for (int i = 0; i < nodes.getLength(); i++) {
- org.w3c.dom.Node child = nodes.item(i);
- if (child instanceof Element) {
- isSimpleObject = false;
- values.add(toCompositeNode((Element) child));
- }
- if (isSimpleObject && child instanceof org.w3c.dom.Text) {
- value = element.getTextContent();
- if (!Strings.isNullOrEmpty(value)) {
- isSimpleObject = true;
- }
- }
- }
-
- if (isSimpleObject) {
- return new SimpleNodeTOImpl<>(qname, null, value);
- }
- return new CompositeNodeTOImpl(qname, null, values);
- }
-}