final DOMDataBroker domBroker = getServiceWithCheck(mount, DOMDataBroker.class);
this.schemaContext = mount.getSchemaContext();
+
dataReader = new BackwardsCompatibleDataBroker(domBroker, this);
+
+ // Set schema context to provide it for BackwardsCompatibleDataBroker
+ if(schemaContext != null) {
+ setSchemaContext(schemaContext);
+ }
+
readWrapper = new ReadWrapper();
notificationPublishService = getServiceWithCheck(mount, NotificationPublishService.class);
<dependencies>
<dependency>
-
<groupId>${project.groupId}</groupId>
<artifactId>netconf-client</artifactId>
<version>${netconf.version}</version>
<groupId>org.opendaylight.controller.model</groupId>
<artifactId>model-inventory</artifactId>
</dependency>
+ <dependency>
+ <groupId>org.opendaylight.controller</groupId>
+ <artifactId>sal-broker-impl</artifactId>
+ </dependency>
<dependency>
<groupId>org.opendaylight.yangtools</groupId>
<artifactId>yang-data-impl</artifactId>
<type>test-jar</type>
<scope>test</scope>
</dependency>
- <dependency>
- <groupId>org.opendaylight.controller</groupId>
- <artifactId>sal-broker-impl</artifactId>
- <type>jar</type>
- <scope>test</scope>
- </dependency>
<dependency>
<groupId>org.opendaylight.yangtools</groupId>
<artifactId>mockito-configuration</artifactId>
+++ /dev/null
-/*
- * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
- *
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,
- * and is available at http://www.eclipse.org/legal/epl-v10.html
- */
-package org.opendaylight.controller.sal.connect.netconf.sal;
-
-import java.util.concurrent.ExecutionException;
-
-import org.opendaylight.controller.md.sal.common.api.data.DataCommitHandler;
-import org.opendaylight.controller.md.sal.common.api.data.DataModification;
-import org.opendaylight.controller.sal.connect.util.RemoteDeviceId;
-import org.opendaylight.controller.sal.core.api.RpcImplementation;
-import org.opendaylight.yangtools.yang.common.RpcError;
-import org.opendaylight.yangtools.yang.common.RpcResult;
-import org.opendaylight.yangtools.yang.common.RpcResultBuilder;
-import org.opendaylight.yangtools.yang.data.api.CompositeNode;
-import org.opendaylight.yangtools.yang.data.api.InstanceIdentifier;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public final class NetconfDeviceCommitHandler implements DataCommitHandler<InstanceIdentifier,CompositeNode> {
-
- private static final Logger logger= LoggerFactory.getLogger(NetconfDeviceCommitHandler.class);
-
- private final RemoteDeviceId id;
- private final RpcImplementation rpc;
- private final boolean rollbackSupported;
-
- public NetconfDeviceCommitHandler(final RemoteDeviceId id, final RpcImplementation rpc, final boolean rollbackSupported) {
- this.id = id;
- this.rpc = rpc;
- this.rollbackSupported = rollbackSupported;
- }
-
- @Override
- public DataCommitTransaction<InstanceIdentifier, CompositeNode> requestCommit(
- final DataModification<InstanceIdentifier, CompositeNode> modification) {
-
- final NetconfDeviceTwoPhaseCommitTransaction twoPhaseCommit = new NetconfDeviceTwoPhaseCommitTransaction(id, rpc,
- modification, true, rollbackSupported);
- try {
- twoPhaseCommit.prepare();
- } catch (final InterruptedException e) {
- Thread.currentThread().interrupt();
- throw new RuntimeException(id + ": Interrupted while waiting for response", e);
- } catch (final ExecutionException e) {
- logger.warn("{}: Error executing pre commit operation on remote device", id, e);
- return new FailingTransaction(twoPhaseCommit, e);
- }
-
- return twoPhaseCommit;
- }
-
- /**
- * Always fail commit transaction that rolls back delegate transaction afterwards
- */
- private class FailingTransaction implements DataCommitTransaction<InstanceIdentifier, CompositeNode> {
- private final NetconfDeviceTwoPhaseCommitTransaction twoPhaseCommit;
- private final ExecutionException e;
-
- public FailingTransaction(final NetconfDeviceTwoPhaseCommitTransaction twoPhaseCommit, final ExecutionException e) {
- this.twoPhaseCommit = twoPhaseCommit;
- this.e = e;
- }
-
- @Override
- public DataModification<InstanceIdentifier, CompositeNode> getModification() {
- return twoPhaseCommit.getModification();
- }
-
- @Override
- public RpcResult<Void> finish() throws IllegalStateException {
- return RpcResultBuilder.<Void>failed().withError( RpcError.ErrorType.APPLICATION,
- id + ": Unexpected operation error during pre-commit operations", e ).build();
- }
-
- @Override
- public RpcResult<Void> rollback() throws IllegalStateException {
- return twoPhaseCommit.rollback();
- }
- }
-}
--- /dev/null
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html.
+ */
+
+package org.opendaylight.controller.sal.connect.netconf.sal;
+
+import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
+import org.opendaylight.controller.md.sal.common.api.data.TransactionChainListener;
+import org.opendaylight.controller.md.sal.common.impl.util.compat.DataNormalizer;
+import org.opendaylight.controller.md.sal.dom.api.DOMDataBroker;
+import org.opendaylight.controller.md.sal.dom.api.DOMDataChangeListener;
+import org.opendaylight.controller.md.sal.dom.api.DOMDataReadOnlyTransaction;
+import org.opendaylight.controller.md.sal.dom.api.DOMDataReadWriteTransaction;
+import org.opendaylight.controller.md.sal.dom.api.DOMDataWriteTransaction;
+import org.opendaylight.controller.md.sal.dom.api.DOMTransactionChain;
+import org.opendaylight.controller.sal.connect.netconf.listener.NetconfSessionCapabilities;
+import org.opendaylight.controller.sal.connect.netconf.sal.tx.NetconfDeviceReadOnlyTx;
+import org.opendaylight.controller.sal.connect.netconf.sal.tx.NetconfDeviceReadWriteTx;
+import org.opendaylight.controller.sal.connect.netconf.sal.tx.NetconfDeviceWriteOnlyTx;
+import org.opendaylight.controller.sal.connect.util.RemoteDeviceId;
+import org.opendaylight.controller.sal.core.api.RpcImplementation;
+import org.opendaylight.yangtools.concepts.ListenerRegistration;
+import org.opendaylight.yangtools.yang.data.api.InstanceIdentifier;
+import org.opendaylight.yangtools.yang.model.api.SchemaContext;
+
+final class NetconfDeviceDataBroker implements DOMDataBroker {
+ private final RemoteDeviceId id;
+ private final RpcImplementation rpc;
+ private final NetconfSessionCapabilities netconfSessionPreferences;
+ private final DataNormalizer normalizer;
+
+ public NetconfDeviceDataBroker(final RemoteDeviceId id, final RpcImplementation rpc, final SchemaContext schemaContext, NetconfSessionCapabilities netconfSessionPreferences) {
+ this.id = id;
+ this.rpc = rpc;
+ this.netconfSessionPreferences = netconfSessionPreferences;
+ normalizer = new DataNormalizer(schemaContext);
+ }
+
+ @Override
+ public DOMDataReadOnlyTransaction newReadOnlyTransaction() {
+ return new NetconfDeviceReadOnlyTx(rpc, normalizer);
+ }
+
+ @Override
+ public DOMDataReadWriteTransaction newReadWriteTransaction() {
+ return new NetconfDeviceReadWriteTx(newReadOnlyTransaction(), newWriteOnlyTransaction());
+ }
+
+ @Override
+ public DOMDataWriteTransaction newWriteOnlyTransaction() {
+ // FIXME detect if candidate is supported, true is hardcoded
+ return new NetconfDeviceWriteOnlyTx(id, rpc, normalizer, true, netconfSessionPreferences.isRollbackSupported());
+ }
+
+ @Override
+ public ListenerRegistration<DOMDataChangeListener> registerDataChangeListener(final LogicalDatastoreType store, final InstanceIdentifier path, final DOMDataChangeListener listener, final DataChangeScope triggeringScope) {
+ throw new UnsupportedOperationException("Data change listeners not supported for netconf mount point");
+ }
+
+ @Override
+ public DOMTransactionChain createTransactionChain(final TransactionChainListener listener) {
+ // TODO implement
+ throw new UnsupportedOperationException("Transaction chains not supported for netconf mount point");
+ }
+}
+++ /dev/null
-/*
- * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
- *
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,
- * and is available at http://www.eclipse.org/legal/epl-v10.html
- */
-package org.opendaylight.controller.sal.connect.netconf.sal;
-
-import static org.opendaylight.controller.sal.connect.netconf.util.NetconfMessageTransformUtil.CONFIG_SOURCE_RUNNING;
-import static org.opendaylight.controller.sal.connect.netconf.util.NetconfMessageTransformUtil.NETCONF_DATA_QNAME;
-import static org.opendaylight.controller.sal.connect.netconf.util.NetconfMessageTransformUtil.NETCONF_GET_CONFIG_QNAME;
-import static org.opendaylight.controller.sal.connect.netconf.util.NetconfMessageTransformUtil.NETCONF_GET_QNAME;
-import static org.opendaylight.controller.sal.connect.netconf.util.NetconfMessageTransformUtil.toFilterStructure;
-
-import java.util.concurrent.ExecutionException;
-
-import org.opendaylight.controller.md.sal.common.api.data.DataReader;
-import org.opendaylight.controller.sal.connect.netconf.util.NetconfMessageTransformUtil;
-import org.opendaylight.controller.sal.connect.util.RemoteDeviceId;
-import org.opendaylight.controller.sal.core.api.RpcImplementation;
-import org.opendaylight.yangtools.yang.common.RpcResult;
-import org.opendaylight.yangtools.yang.data.api.CompositeNode;
-import org.opendaylight.yangtools.yang.data.api.InstanceIdentifier;
-import org.opendaylight.yangtools.yang.data.api.Node;
-import org.opendaylight.yangtools.yang.data.api.SimpleNode;
-
-public final class NetconfDeviceDataReader implements DataReader<InstanceIdentifier,CompositeNode> {
-
- private final RpcImplementation rpc;
- private final RemoteDeviceId id;
-
- public NetconfDeviceDataReader(final RemoteDeviceId id, final RpcImplementation rpc) {
- this.id = id;
- this.rpc = rpc;
- }
-
- @Override
- public CompositeNode readConfigurationData(final InstanceIdentifier path) {
- final RpcResult<CompositeNode> result;
- try {
- result = rpc.invokeRpc(NETCONF_GET_CONFIG_QNAME,
- NetconfMessageTransformUtil.wrap(NETCONF_GET_CONFIG_QNAME, CONFIG_SOURCE_RUNNING, toFilterStructure(path))).get();
- } catch (final InterruptedException e) {
- throw onInterruptedException(e);
- } catch (final ExecutionException e) {
- throw new RuntimeException(id + ": Read configuration data " + path + " failed", e);
- }
-
- final CompositeNode data = result.getResult().getFirstCompositeByName(NETCONF_DATA_QNAME);
- return data == null ? null : (CompositeNode) findNode(data, path);
- }
-
- private RuntimeException onInterruptedException(final InterruptedException e) {
- Thread.currentThread().interrupt();
- return new RuntimeException(id + ": Interrupted while waiting for response", e);
- }
-
- @Override
- public CompositeNode readOperationalData(final InstanceIdentifier path) {
- final RpcResult<CompositeNode> result;
- try {
- result = rpc.invokeRpc(NETCONF_GET_QNAME, NetconfMessageTransformUtil.wrap(NETCONF_GET_QNAME, toFilterStructure(path))).get();
- } catch (final InterruptedException e) {
- throw onInterruptedException(e);
- } catch (final ExecutionException e) {
- throw new RuntimeException(id + ": Read operational data " + path + " failed", e);
- }
-
- final CompositeNode data = result.getResult().getFirstCompositeByName(NETCONF_DATA_QNAME);
- return (CompositeNode) findNode(data, path);
- }
-
- private static Node<?> findNode(final CompositeNode node, final InstanceIdentifier identifier) {
-
- Node<?> current = node;
- for (final InstanceIdentifier.PathArgument arg : identifier.getPathArguments()) {
- if (current instanceof SimpleNode<?>) {
- return null;
- } else if (current instanceof CompositeNode) {
- final CompositeNode currentComposite = (CompositeNode) current;
-
- current = currentComposite.getFirstCompositeByName(arg.getNodeType());
- if (current == null) {
- current = currentComposite.getFirstCompositeByName(arg.getNodeType().withoutRevision());
- }
- if (current == null) {
- current = currentComposite.getFirstSimpleByName(arg.getNodeType());
- }
- if (current == null) {
- current = currentComposite.getFirstSimpleByName(arg.getNodeType().withoutRevision());
- }
- if (current == null) {
- return null;
- }
- }
- }
- return current;
- }
-}
package org.opendaylight.controller.sal.connect.netconf.sal;
import com.google.common.base.Function;
+import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
import com.google.common.collect.FluentIterable;
+import com.google.common.util.concurrent.CheckedFuture;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
-import com.google.common.util.concurrent.ListeningExecutorService;
-import com.google.common.util.concurrent.MoreExecutors;
import java.util.Set;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ExecutorService;
-import org.opendaylight.controller.md.sal.common.api.TransactionStatus;
-import org.opendaylight.controller.sal.binding.api.data.DataModificationTransaction;
-import org.opendaylight.controller.sal.binding.api.data.DataProviderService;
+import org.opendaylight.controller.md.sal.binding.api.DataBroker;
+import org.opendaylight.controller.md.sal.binding.api.ReadWriteTransaction;
+import org.opendaylight.controller.md.sal.binding.api.WriteTransaction;
+import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
+import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException;
import org.opendaylight.controller.sal.connect.util.RemoteDeviceId;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.Nodes;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodesBuilder;
import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node;
import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.NodeBuilder;
import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.inventory.rev140108.NetconfNode;
import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.inventory.rev140108.NetconfNodeBuilder;
import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
import org.opendaylight.yangtools.yang.common.QName;
-import org.opendaylight.yangtools.yang.common.RpcResult;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
private static final Logger logger = LoggerFactory.getLogger(NetconfDeviceDatastoreAdapter.class);
private final RemoteDeviceId id;
- private final DataProviderService dataService;
- private final ListeningExecutorService executor;
+ private final DataBroker dataService;
- NetconfDeviceDatastoreAdapter(final RemoteDeviceId deviceId, final DataProviderService dataService,
- final ExecutorService executor) {
+ NetconfDeviceDatastoreAdapter(final RemoteDeviceId deviceId, final DataBroker dataService) {
this.id = Preconditions.checkNotNull(deviceId);
this.dataService = Preconditions.checkNotNull(dataService);
- this.executor = MoreExecutors.listeningDecorator(Preconditions.checkNotNull(executor));
- // Initial data change scheduled
- submitDataChangeToExecutor(this.executor, new Runnable() {
- @Override
- public void run() {
- initDeviceData();
- }
- }, deviceId);
+ initDeviceData();
}
public void updateDeviceState(final boolean up, final Set<QName> capabilities) {
- submitDataChangeToExecutor(this.executor, new Runnable() {
- @Override
- public void run() {
- updateDeviceStateInternal(up, capabilities);
- }
- }, id);
- }
-
- private void updateDeviceStateInternal(final boolean up, final Set<QName> capabilities) {
final org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node data = buildDataForDeviceState(
up, capabilities, id);
- final DataModificationTransaction transaction = dataService.beginTransaction();
- logger.trace("{}: Update device state transaction {} putting operational data started.", id, transaction.getIdentifier());
- transaction.removeOperationalData(id.getBindingPath());
- transaction.putOperationalData(id.getBindingPath(), data);
- logger.trace("{}: Update device state transaction {} putting operational data ended.", id, transaction.getIdentifier());
+ final ReadWriteTransaction transaction = dataService.newReadWriteTransaction();
+ logger.trace("{}: Update device state transaction {} merging operational data started.", id, transaction.getIdentifier());
+ transaction.merge(LogicalDatastoreType.OPERATIONAL, id.getBindingPath(), data);
+ logger.trace("{}: Update device state transaction {} merging operational data ended.", id, transaction.getIdentifier());
commitTransaction(transaction, "update");
}
private void removeDeviceConfigAndState() {
- final DataModificationTransaction transaction = dataService.beginTransaction();
+ final WriteTransaction transaction = dataService.newWriteOnlyTransaction();
logger.trace("{}: Close device state transaction {} removing all data started.", id, transaction.getIdentifier());
- transaction.removeConfigurationData(id.getBindingPath());
- transaction.removeOperationalData(id.getBindingPath());
+ transaction.delete(LogicalDatastoreType.CONFIGURATION, id.getBindingPath());
+ transaction.delete(LogicalDatastoreType.OPERATIONAL, id.getBindingPath());
logger.trace("{}: Close device state transaction {} removing all data ended.", id, transaction.getIdentifier());
commitTransaction(transaction, "close");
}
private void initDeviceData() {
- final DataModificationTransaction transaction = dataService.beginTransaction();
+ final WriteTransaction transaction = dataService.newWriteOnlyTransaction();
- final InstanceIdentifier<Node> path = id.getBindingPath();
+ createNodesListIfNotPresent(transaction);
+ final InstanceIdentifier<Node> path = id.getBindingPath();
final Node nodeWithId = getNodeWithId(id);
- if (operationalNodeNotExisting(transaction, path)) {
- transaction.putOperationalData(path, nodeWithId);
- }
- if (configurationNodeNotExisting(transaction, path)) {
- transaction.putConfigurationData(path, nodeWithId);
- }
- commitTransaction(transaction, "init");
- }
+ logger.trace("{}: Init device state transaction {} putting if absent operational data started.", id, transaction.getIdentifier());
+ transaction.merge(LogicalDatastoreType.OPERATIONAL, path, nodeWithId);
+ logger.trace("{}: Init device state transaction {} putting operational data ended.", id, transaction.getIdentifier());
- private void commitTransaction(final DataModificationTransaction transaction, final String txType) {
- // attempt commit
- final RpcResult<TransactionStatus> result;
- try {
- result = transaction.commit().get();
- } catch (InterruptedException | ExecutionException e) {
- logger.error("{}: Transaction({}) failed", id, txType, e);
- throw new IllegalStateException(id + " Transaction(" + txType + ") not committed correctly", e);
- }
-
- // verify success result + committed state
- if (isUpdateSuccessful(result)) {
- logger.trace("{}: Transaction({}) {} SUCCESSFUL", id, txType, transaction.getIdentifier());
- } else {
- logger.error("{}: Transaction({}) {} FAILED!", id, txType, transaction.getIdentifier());
- throw new IllegalStateException(id + " Transaction(" + txType + ") not committed correctly, " +
- "Errors: " + result.getErrors());
- }
- }
+ logger.trace("{}: Init device state transaction {} putting if absent config data started.", id, transaction.getIdentifier());
+ transaction.merge(LogicalDatastoreType.CONFIGURATION, path, nodeWithId);
+ logger.trace("{}: Init device state transaction {} putting config data ended.", id, transaction.getIdentifier());
- @Override
- public void close() throws Exception {
- // Remove device data from datastore
- submitDataChangeToExecutor(executor, new Runnable() {
- @Override
- public void run() {
- removeDeviceConfigAndState();
- }
- }, id);
+ commitTransaction(transaction, "init");
}
- private static boolean isUpdateSuccessful(final RpcResult<TransactionStatus> result) {
- return result.getResult() == TransactionStatus.COMMITED && result.isSuccessful();
+ private void createNodesListIfNotPresent(final WriteTransaction writeTx) {
+ final Nodes nodes = new NodesBuilder().build();
+ final InstanceIdentifier<Nodes> path = InstanceIdentifier.builder(Nodes.class).build();
+ logger.trace("{}: Merging {} container to ensure its presence", id, Nodes.QNAME, writeTx.getIdentifier());
+ writeTx.merge(LogicalDatastoreType.CONFIGURATION, path, nodes);
+ writeTx.merge(LogicalDatastoreType.OPERATIONAL, path, nodes);
}
- private static void submitDataChangeToExecutor(final ListeningExecutorService executor, final Runnable r,
- final RemoteDeviceId id) {
- // Submit data change
- final ListenableFuture<?> f = executor.submit(r);
- // Verify update execution
- Futures.addCallback(f, new FutureCallback<Object>() {
+ private void commitTransaction(final WriteTransaction transaction, final String txType) {
+ logger.trace("{}: Committing Transaction {}:{}", id, txType, transaction.getIdentifier());
+ final CheckedFuture<Void, TransactionCommitFailedException> result = transaction.submit();
+
+ Futures.addCallback(result, new FutureCallback<Void>() {
@Override
- public void onSuccess(final Object result) {
- logger.debug("{}: Device data updated successfully", id);
+ public void onSuccess(final Void result) {
+ logger.trace("{}: Transaction({}) {} SUCCESSFUL", id, txType, transaction.getIdentifier());
}
@Override
public void onFailure(final Throwable t) {
- logger.warn("{}: Device data update failed", id, t);
+ logger.error("{}: Transaction({}) {} FAILED!", id, txType, transaction.getIdentifier(), t);
+ throw new IllegalStateException(id + " Transaction(" + txType + ") not committed correctly", t);
}
});
+
+ }
+
+ @Override
+ public void close() throws Exception {
+ removeDeviceConfigAndState();
}
public static org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node buildDataForDeviceState(
return nodeBuilder.build();
}
- private static boolean configurationNodeNotExisting(final DataModificationTransaction transaction,
- final InstanceIdentifier<Node> path) {
- return null == transaction.readConfigurationData(path);
- }
-
- private static boolean operationalNodeNotExisting(final DataModificationTransaction transaction,
+ private static ListenableFuture<Optional<Node>> readNodeData(
+ final LogicalDatastoreType store,
+ final ReadWriteTransaction transaction,
final InstanceIdentifier<Node> path) {
- return null == transaction.readOperationalData(path);
+ return transaction.read(store, path);
}
private static Node getNodeWithId(final RemoteDeviceId id) {
return Collections.emptySet();
}
+ // TODO change this to work with NormalizedNode api. Then we can loose DataNormalizer from Transactions
+
@Override
public ListenableFuture<RpcResult<CompositeNode>> invokeRpc(final QName rpc, final CompositeNode input) {
final NetconfMessage message = transformRequest(rpc, input);
*/
package org.opendaylight.controller.sal.connect.netconf.sal;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutorService;
+
+import org.opendaylight.controller.md.sal.dom.api.DOMDataBroker;
import org.opendaylight.controller.sal.binding.api.BindingAwareBroker;
import org.opendaylight.controller.sal.connect.api.RemoteDeviceHandler;
import org.opendaylight.controller.sal.connect.netconf.listener.NetconfSessionCapabilities;
import org.opendaylight.controller.sal.connect.util.RemoteDeviceId;
import org.opendaylight.controller.sal.core.api.Broker;
import org.opendaylight.controller.sal.core.api.RpcImplementation;
-import org.opendaylight.controller.sal.core.api.mount.MountProvisionInstance;
+import org.opendaylight.controller.sal.core.api.RpcProvisionRegistry;
+import org.opendaylight.controller.sal.core.api.notify.NotificationListener;
+import org.opendaylight.controller.sal.core.api.notify.NotificationPublishService;
+import org.opendaylight.controller.sal.dom.broker.impl.NotificationRouterImpl;
+import org.opendaylight.controller.sal.dom.broker.impl.SchemaAwareRpcBroker;
+import org.opendaylight.controller.sal.dom.broker.spi.NotificationRouter;
+import org.opendaylight.yangtools.concepts.ListenerRegistration;
import org.opendaylight.yangtools.yang.common.QName;
import org.opendaylight.yangtools.yang.data.api.CompositeNode;
-import org.opendaylight.yangtools.yang.data.api.InstanceIdentifier;
import org.opendaylight.yangtools.yang.model.api.RpcDefinition;
+import org.opendaylight.yangtools.yang.model.api.SchemaContext;
import org.opendaylight.yangtools.yang.model.api.SchemaContextProvider;
import org.osgi.framework.BundleContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+
public final class NetconfDeviceSalFacade implements AutoCloseable, RemoteDeviceHandler<NetconfSessionCapabilities> {
private static final Logger logger= LoggerFactory.getLogger(NetconfDeviceSalFacade.class);
- private static final InstanceIdentifier ROOT_PATH = InstanceIdentifier.builder().toInstance();
private final RemoteDeviceId id;
private final NetconfDeviceSalProvider salProvider;
@Override
public synchronized void onDeviceConnected(final SchemaContextProvider remoteSchemaContextProvider,
final NetconfSessionCapabilities netconfSessionPreferences, final RpcImplementation deviceRpc) {
- salProvider.getMountInstance().setSchemaContext(remoteSchemaContextProvider.getSchemaContext());
+ final SchemaContext schemaContext = remoteSchemaContextProvider.getSchemaContext();
+
+ // TODO remove deprecated SchemaContextProvider from SchemaAwareRpcBroker
+ // TODO move SchemaAwareRpcBroker from sal-broker-impl, now we have depend on the whole sal-broker-impl
+ final RpcProvisionRegistry rpcRegistry = new SchemaAwareRpcBroker(id.getPath().toString(), new org.opendaylight.controller.sal.dom.broker.impl.SchemaContextProvider() {
+ @Override
+ public SchemaContext getSchemaContext() {
+ return schemaContext;
+ }
+ });
+ registerRpcsToSal(schemaContext, rpcRegistry, deviceRpc);
+ final DOMDataBroker domBroker = new NetconfDeviceDataBroker(id, deviceRpc, schemaContext, netconfSessionPreferences);
+
+ // TODO NotificationPublishService and NotificationRouter have the same interface
+ final NotificationPublishService notificationService = new NotificationPublishService() {
+
+ private final NotificationRouter innerRouter = new NotificationRouterImpl();
+
+ @Override
+ public void publish(final CompositeNode notification) {
+ innerRouter.publish(notification);
+ }
+
+ @Override
+ public ListenerRegistration<NotificationListener> addNotificationListener(final QName notification, final NotificationListener listener) {
+ return innerRouter.addNotificationListener(notification, listener);
+ }
+ };
+
+ salProvider.getMountInstance().onDeviceConnected(schemaContext, domBroker, rpcRegistry, notificationService);
salProvider.getDatastoreAdapter().updateDeviceState(true, netconfSessionPreferences.getModuleBasedCaps());
- registerDataHandlersToSal(deviceRpc, netconfSessionPreferences);
- registerRpcsToSal(deviceRpc);
}
@Override
public void onDeviceDisconnected() {
salProvider.getDatastoreAdapter().updateDeviceState(false, Collections.<QName>emptySet());
+ salProvider.getMountInstance().onDeviceDisconnected();
}
- private void registerRpcsToSal(final RpcImplementation deviceRpc) {
- final MountProvisionInstance mountInstance = salProvider.getMountInstance();
-
+ private void registerRpcsToSal(final SchemaContext schemaContext, final RpcProvisionRegistry rpcRegistry, final RpcImplementation deviceRpc) {
final Map<QName, String> failedRpcs = Maps.newHashMap();
- for (final RpcDefinition rpcDef : mountInstance.getSchemaContext().getOperations()) {
+ for (final RpcDefinition rpcDef : schemaContext.getOperations()) {
try {
- salRegistrations.add(mountInstance.addRpcImplementation(rpcDef.getQName(), deviceRpc));
+ salRegistrations.add(rpcRegistry.addRpcImplementation(rpcDef.getQName(), deviceRpc));
logger.debug("{}: Rpc {} from netconf registered successfully", id, rpcDef.getQName());
} catch (final Exception e) {
// Only debug per rpc, warn for all of them at the end to pollute log a little less (e.g. routed rpcs)
}
}
- private void registerDataHandlersToSal(final RpcImplementation deviceRpc,
- final NetconfSessionCapabilities netconfSessionPreferences) {
- final NetconfDeviceDataReader dataReader = new NetconfDeviceDataReader(id, deviceRpc);
- final NetconfDeviceCommitHandler commitHandler = new NetconfDeviceCommitHandler(id, deviceRpc,
- netconfSessionPreferences.isRollbackSupported());
-
- final MountProvisionInstance mountInstance = salProvider.getMountInstance();
- salRegistrations.add(mountInstance.registerConfigurationReader(ROOT_PATH, dataReader));
- salRegistrations.add(mountInstance.registerOperationalReader(ROOT_PATH, dataReader));
- salRegistrations.add(mountInstance.registerCommitHandler(ROOT_PATH, commitHandler));
- }
-
@Override
public void close() {
for (final AutoCloseable reg : Lists.reverse(salRegistrations)) {
import java.util.Collection;
import java.util.Collections;
import java.util.concurrent.ExecutorService;
+import org.opendaylight.controller.md.sal.binding.api.DataBroker;
+import org.opendaylight.controller.md.sal.dom.api.DOMDataBroker;
+import org.opendaylight.controller.md.sal.dom.api.DOMMountPoint;
+import org.opendaylight.controller.md.sal.dom.api.DOMMountPointService;
import org.opendaylight.controller.sal.binding.api.BindingAwareBroker;
import org.opendaylight.controller.sal.binding.api.BindingAwareProvider;
-import org.opendaylight.controller.sal.binding.api.data.DataProviderService;
import org.opendaylight.controller.sal.connect.util.RemoteDeviceId;
import org.opendaylight.controller.sal.core.api.Broker;
import org.opendaylight.controller.sal.core.api.Provider;
-import org.opendaylight.controller.sal.core.api.mount.MountProvisionInstance;
-import org.opendaylight.controller.sal.core.api.mount.MountProvisionService;
+import org.opendaylight.controller.sal.core.api.RpcProvisionRegistry;
+import org.opendaylight.controller.sal.core.api.notify.NotificationPublishService;
+import org.opendaylight.yangtools.concepts.ObjectRegistration;
import org.opendaylight.yangtools.yang.binding.RpcService;
+import org.opendaylight.yangtools.yang.data.api.CompositeNode;
+import org.opendaylight.yangtools.yang.model.api.SchemaContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
private final RemoteDeviceId id;
private final ExecutorService executor;
- private volatile MountProvisionInstance mountInstance;
private volatile NetconfDeviceDatastoreAdapter datastoreAdapter;
+ private MountInstance mountInstance;
public NetconfDeviceSalProvider(final RemoteDeviceId deviceId, final ExecutorService executor) {
this.id = deviceId;
this.executor = executor;
}
- public MountProvisionInstance getMountInstance() {
+ public MountInstance getMountInstance() {
Preconditions.checkState(mountInstance != null,
- "%s: Sal provider was not initialized by sal. Cannot get mount instance", id);
+ "%s: Mount instance was not initialized by sal. Cannot get mount instance", id);
return mountInstance;
}
@Override
public void onSessionInitiated(final Broker.ProviderSession session) {
- final MountProvisionService mountService = session.getService(MountProvisionService.class);
+ logger.debug("{}: (BI)Session with sal established {}", id, session);
+
+ final DOMMountPointService mountService = session.getService(DOMMountPointService.class);
if (mountService != null) {
- mountInstance = mountService.createOrGetMountPoint(id.getPath());
+ mountInstance = new MountInstance(mountService, id);
}
-
- logger.debug("{}: (BI)Session with sal established {}", id, session);
}
@Override
@Override
public void onSessionInitiated(final BindingAwareBroker.ProviderContext session) {
- final DataProviderService dataBroker = session.getSALService(DataProviderService.class);
- datastoreAdapter = new NetconfDeviceDatastoreAdapter(id, dataBroker, executor);
-
logger.debug("{}: Session with sal established {}", id, session);
+
+ final DataBroker dataBroker = session.getSALService(DataBroker.class);
+ datastoreAdapter = new NetconfDeviceDatastoreAdapter(id, dataBroker);
}
@Override
- public void onSessionInitialized(final BindingAwareBroker.ConsumerContext session) {
- }
+ public void onSessionInitialized(final BindingAwareBroker.ConsumerContext session) {}
public void close() throws Exception {
- mountInstance = null;
+ mountInstance.close();
datastoreAdapter.close();
datastoreAdapter = null;
}
+ static final class MountInstance implements AutoCloseable {
+
+ private DOMMountPointService mountService;
+ private final RemoteDeviceId id;
+ private ObjectRegistration<DOMMountPoint> registration;
+ private NotificationPublishService notificationSerivce;
+
+ MountInstance(final DOMMountPointService mountService, final RemoteDeviceId id) {
+ this.mountService = Preconditions.checkNotNull(mountService);
+ this.id = Preconditions.checkNotNull(id);
+ }
+
+ synchronized void onDeviceConnected(final SchemaContext initialCtx,
+ final DOMDataBroker broker, final RpcProvisionRegistry rpc,
+ final NotificationPublishService notificationSerivce) {
+
+ Preconditions.checkNotNull(mountService, "Closed");
+ Preconditions.checkState(registration == null, "Already initialized");
+
+ final DOMMountPointService.DOMMountPointBuilder mountBuilder = mountService.createMountPoint(id.getPath());
+ mountBuilder.addInitialSchemaContext(initialCtx);
+
+ mountBuilder.addService(DOMDataBroker.class, broker);
+ mountBuilder.addService(RpcProvisionRegistry.class, rpc);
+ this.notificationSerivce = notificationSerivce;
+ mountBuilder.addService(NotificationPublishService.class, notificationSerivce);
+
+ registration = mountBuilder.register();
+ }
+
+ synchronized void onDeviceDisconnected() {
+ if(registration == null) {
+ return;
+ }
+
+ try {
+ registration.close();
+ } catch (final Exception e) {
+ // Only log and ignore
+ logger.warn("Unable to unregister mount instance for {}. Ignoring exception", id.getPath(), e);
+ } finally {
+ registration = null;
+ }
+ }
+
+ @Override
+ synchronized public void close() throws Exception {
+ if(registration != null) {
+ onDeviceDisconnected();
+ }
+ mountService = null;
+ }
+
+ public synchronized void publish(final CompositeNode domNotification) {
+ Preconditions.checkNotNull(notificationSerivce, "Device not set up yet, cannot handle notification {}", domNotification);
+ notificationSerivce.publish(domNotification);
+ }
+ }
+
}
--- /dev/null
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html.
+ */
+package org.opendaylight.controller.sal.connect.netconf.sal.tx;
+
+import static org.opendaylight.controller.sal.connect.netconf.util.NetconfMessageTransformUtil.CONFIG_SOURCE_RUNNING;
+import static org.opendaylight.controller.sal.connect.netconf.util.NetconfMessageTransformUtil.NETCONF_DATA_QNAME;
+import static org.opendaylight.controller.sal.connect.netconf.util.NetconfMessageTransformUtil.NETCONF_GET_CONFIG_QNAME;
+import static org.opendaylight.controller.sal.connect.netconf.util.NetconfMessageTransformUtil.NETCONF_GET_QNAME;
+import static org.opendaylight.controller.sal.connect.netconf.util.NetconfMessageTransformUtil.toFilterStructure;
+
+import com.google.common.base.Function;
+import com.google.common.base.Optional;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
+import org.opendaylight.controller.md.sal.common.impl.util.compat.DataNormalizationException;
+import org.opendaylight.controller.md.sal.common.impl.util.compat.DataNormalizer;
+import org.opendaylight.controller.md.sal.dom.api.DOMDataReadOnlyTransaction;
+import org.opendaylight.controller.sal.connect.netconf.util.NetconfMessageTransformUtil;
+import org.opendaylight.controller.sal.core.api.RpcImplementation;
+import org.opendaylight.yangtools.yang.common.RpcResult;
+import org.opendaylight.yangtools.yang.data.api.CompositeNode;
+import org.opendaylight.yangtools.yang.data.api.InstanceIdentifier;
+import org.opendaylight.yangtools.yang.data.api.Node;
+import org.opendaylight.yangtools.yang.data.api.SimpleNode;
+import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public final class NetconfDeviceReadOnlyTx implements DOMDataReadOnlyTransaction {
+
+ private static final Logger LOG = LoggerFactory.getLogger(NetconfDeviceReadOnlyTx.class);
+
+ private final RpcImplementation rpc;
+ private final DataNormalizer normalizer;
+
+ public NetconfDeviceReadOnlyTx(final RpcImplementation rpc, final DataNormalizer normalizer) {
+ this.rpc = rpc;
+ this.normalizer = normalizer;
+ }
+
+ public ListenableFuture<Optional<NormalizedNode<?, ?>>> readConfigurationData(final InstanceIdentifier path) {
+ final ListenableFuture<RpcResult<CompositeNode>> future = rpc.invokeRpc(NETCONF_GET_CONFIG_QNAME,
+ NetconfMessageTransformUtil.wrap(NETCONF_GET_CONFIG_QNAME, CONFIG_SOURCE_RUNNING, toFilterStructure(path)));
+
+ return Futures.transform(future, new Function<RpcResult<CompositeNode>, Optional<NormalizedNode<?, ?>>>() {
+ @Override
+ public Optional<NormalizedNode<?, ?>> apply(final RpcResult<CompositeNode> result) {
+ final CompositeNode data = result.getResult().getFirstCompositeByName(NETCONF_DATA_QNAME);
+ final CompositeNode node = (CompositeNode) findNode(data, path);
+
+ return data == null ?
+ Optional.<NormalizedNode<?, ?>>absent() :
+ transform(path, node);
+ }
+ });
+ }
+
+ private Optional<NormalizedNode<?, ?>> transform(final InstanceIdentifier path, final CompositeNode node) {
+ if(node == null) {
+ return Optional.absent();
+ }
+ try {
+ return Optional.<NormalizedNode<?, ?>>of(normalizer.toNormalized(path, node).getValue());
+ } catch (final Exception e) {
+ LOG.error("Unable to normalize data for {}, data: {}", path, node, e);
+ throw e;
+ }
+ }
+
+ public ListenableFuture<Optional<NormalizedNode<?, ?>>> readOperationalData(final InstanceIdentifier path) {
+ final ListenableFuture<RpcResult<CompositeNode>> future = rpc.invokeRpc(NETCONF_GET_QNAME, NetconfMessageTransformUtil.wrap(NETCONF_GET_QNAME, toFilterStructure(path)));
+
+ return Futures.transform(future, new Function<RpcResult<CompositeNode>, Optional<NormalizedNode<?, ?>>>() {
+ @Override
+ public Optional<NormalizedNode<?, ?>> apply(final RpcResult<CompositeNode> result) {
+ final CompositeNode data = result.getResult().getFirstCompositeByName(NETCONF_DATA_QNAME);
+ final CompositeNode node = (CompositeNode) findNode(data, path);
+
+ return data == null ?
+ Optional.<NormalizedNode<?, ?>>absent() :
+ transform(path, node);
+ }
+ });
+ }
+
+ private static Node<?> findNode(final CompositeNode node, final InstanceIdentifier identifier) {
+
+ Node<?> current = node;
+ for (final InstanceIdentifier.PathArgument arg : identifier.getPathArguments()) {
+ if (current instanceof SimpleNode<?>) {
+ return null;
+ } else if (current instanceof CompositeNode) {
+ final CompositeNode currentComposite = (CompositeNode) current;
+
+ current = currentComposite.getFirstCompositeByName(arg.getNodeType());
+ if (current == null) {
+ current = currentComposite.getFirstCompositeByName(arg.getNodeType().withoutRevision());
+ }
+ if (current == null) {
+ current = currentComposite.getFirstSimpleByName(arg.getNodeType());
+ }
+ if (current == null) {
+ current = currentComposite.getFirstSimpleByName(arg.getNodeType().withoutRevision());
+ }
+ if (current == null) {
+ return null;
+ }
+ }
+ }
+ return current;
+ }
+
+ @Override
+ public void close() {
+ // NOOP
+ }
+
+ @Override
+ public ListenableFuture<Optional<NormalizedNode<?, ?>>> read(final LogicalDatastoreType store, final InstanceIdentifier path) {
+ final InstanceIdentifier legacyPath = toLegacyPath(normalizer, path);
+
+ switch (store) {
+ case CONFIGURATION : {
+ return readConfigurationData(legacyPath);
+ }
+ case OPERATIONAL : {
+ return readOperationalData(legacyPath);
+ }
+ }
+
+ throw new IllegalArgumentException(String.format("Cannot read data %s for %s datastore, unknown datastore type", path, store));
+ }
+
+ static InstanceIdentifier toLegacyPath(final DataNormalizer normalizer, final InstanceIdentifier path) {
+ try {
+ return normalizer.toLegacy(path);
+ } catch (final DataNormalizationException e) {
+ throw new IllegalArgumentException("Cannot normalize path " + path, e);
+ }
+ }
+
+ @Override
+ public Object getIdentifier() {
+ return this;
+ }
+}
--- /dev/null
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html.
+ */
+
+package org.opendaylight.controller.sal.connect.netconf.sal.tx;
+
+import com.google.common.base.Optional;
+import com.google.common.util.concurrent.CheckedFuture;
+import com.google.common.util.concurrent.ListenableFuture;
+import org.opendaylight.controller.md.sal.common.api.TransactionStatus;
+import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
+import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException;
+import org.opendaylight.controller.md.sal.dom.api.DOMDataReadTransaction;
+import org.opendaylight.controller.md.sal.dom.api.DOMDataReadWriteTransaction;
+import org.opendaylight.controller.md.sal.dom.api.DOMDataWriteTransaction;
+import org.opendaylight.yangtools.yang.common.RpcResult;
+import org.opendaylight.yangtools.yang.data.api.InstanceIdentifier;
+import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
+
+public class NetconfDeviceReadWriteTx implements DOMDataReadWriteTransaction {
+
+ private final DOMDataReadTransaction delegateReadTx;
+ private final DOMDataWriteTransaction delegateWriteTx;
+
+ public NetconfDeviceReadWriteTx(final DOMDataReadTransaction delegateReadTx, final DOMDataWriteTransaction delegateWriteTx) {
+ this.delegateReadTx = delegateReadTx;
+ this.delegateWriteTx = delegateWriteTx;
+ }
+
+ @Override
+ public boolean cancel() {
+ return delegateWriteTx.cancel();
+ }
+
+ @Override
+ public void put(final LogicalDatastoreType store, final InstanceIdentifier path, final NormalizedNode<?, ?> data) {
+ delegateWriteTx.put(store, path, data);
+ }
+
+ @Override
+ public void merge(final LogicalDatastoreType store, final InstanceIdentifier path, final NormalizedNode<?, ?> data) {
+ delegateWriteTx.merge(store, path, data);
+ }
+
+ @Override
+ public void delete(final LogicalDatastoreType store, final InstanceIdentifier path) {
+ delegateWriteTx.delete(store, path);
+ }
+
+ @Override
+ public CheckedFuture<Void, TransactionCommitFailedException> submit() {
+ return delegateWriteTx.submit();
+ }
+
+ @Override
+ public ListenableFuture<RpcResult<TransactionStatus>> commit() {
+ return delegateWriteTx.commit();
+ }
+
+ @Override
+ public ListenableFuture<Optional<NormalizedNode<?, ?>>> read(final LogicalDatastoreType store, final InstanceIdentifier path) {
+ return delegateReadTx.read(store, path);
+ }
+
+ @Override
+ public Object getIdentifier() {
+ return this;
+ }
+}
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License v1.0 which accompanies this distribution,
- * and is available at http://www.eclipse.org/legal/epl-v10.html
+ * and is available at http://www.eclipse.org/legal/epl-v10.html.
*/
-package org.opendaylight.controller.sal.connect.netconf.sal;
+
+package org.opendaylight.controller.sal.connect.netconf.sal.tx;
import static org.opendaylight.controller.sal.connect.netconf.util.NetconfMessageTransformUtil.NETCONF_CANDIDATE_QNAME;
import static org.opendaylight.controller.sal.connect.netconf.util.NetconfMessageTransformUtil.NETCONF_COMMIT_QNAME;
import static org.opendaylight.controller.sal.connect.netconf.util.NetconfMessageTransformUtil.NETCONF_TARGET_QNAME;
import static org.opendaylight.controller.sal.connect.netconf.util.NetconfMessageTransformUtil.ROLLBACK_ON_ERROR_OPTION;
+import com.google.common.base.Function;
import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
-
-import java.util.Collection;
+import com.google.common.util.concurrent.CheckedFuture;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
import java.util.Collections;
import java.util.List;
import java.util.Map;
-import java.util.Map.Entry;
import java.util.concurrent.ExecutionException;
-
-import org.opendaylight.controller.md.sal.common.api.data.DataCommitHandler.DataCommitTransaction;
-import org.opendaylight.controller.md.sal.common.api.data.DataModification;
+import javax.annotation.Nullable;
+import org.opendaylight.controller.md.sal.common.api.TransactionStatus;
+import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
+import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException;
+import org.opendaylight.controller.md.sal.common.impl.util.compat.DataNormalizer;
+import org.opendaylight.controller.md.sal.dom.api.DOMDataWriteTransaction;
import org.opendaylight.controller.sal.connect.netconf.util.NetconfMessageTransformUtil;
import org.opendaylight.controller.sal.connect.util.RemoteDeviceId;
import org.opendaylight.controller.sal.core.api.RpcImplementation;
import org.opendaylight.yangtools.yang.common.RpcResultBuilder;
import org.opendaylight.yangtools.yang.data.api.CompositeNode;
import org.opendaylight.yangtools.yang.data.api.InstanceIdentifier;
-import org.opendaylight.yangtools.yang.data.api.InstanceIdentifier.NodeIdentifierWithPredicates;
-import org.opendaylight.yangtools.yang.data.api.InstanceIdentifier.PathArgument;
import org.opendaylight.yangtools.yang.data.api.ModifyAction;
import org.opendaylight.yangtools.yang.data.api.Node;
import org.opendaylight.yangtools.yang.data.api.SimpleNode;
+import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
import org.opendaylight.yangtools.yang.data.impl.ImmutableCompositeNode;
import org.opendaylight.yangtools.yang.data.impl.NodeFactory;
import org.opendaylight.yangtools.yang.data.impl.util.CompositeNodeBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-/**
- * Remote transaction that delegates data change to remote device using netconf messages.
- */
-final class NetconfDeviceTwoPhaseCommitTransaction implements DataCommitTransaction<InstanceIdentifier, CompositeNode> {
+public class NetconfDeviceWriteOnlyTx implements DOMDataWriteTransaction {
- private static final Logger LOG = LoggerFactory.getLogger(NetconfDeviceTwoPhaseCommitTransaction.class);
+ private static final Logger LOG = LoggerFactory.getLogger(NetconfDeviceWriteOnlyTx.class);
- private final DataModification<InstanceIdentifier, CompositeNode> modification;
+ private final RemoteDeviceId id;
private final RpcImplementation rpc;
+ private final DataNormalizer normalizer;
private final boolean rollbackSupported;
- private final RemoteDeviceId id;
private final CompositeNode targetNode;
- public NetconfDeviceTwoPhaseCommitTransaction(final RemoteDeviceId id, final RpcImplementation rpc,
- final DataModification<InstanceIdentifier, CompositeNode> modification,
- final boolean candidateSupported, final boolean rollbackOnErrorSupported) {
+ public NetconfDeviceWriteOnlyTx(final RemoteDeviceId id, final RpcImplementation rpc, final DataNormalizer normalizer, final boolean candidateSupported, final boolean rollbackOnErrorSupported) {
this.id = id;
- this.rpc = Preconditions.checkNotNull(rpc);
- this.modification = Preconditions.checkNotNull(modification);
+ this.rpc = rpc;
+ this.normalizer = normalizer;
this.targetNode = getTargetNode(candidateSupported);
this.rollbackSupported = rollbackOnErrorSupported;
}
- /**
- * Prepare phase, sends 1 or more netconf edit config operations to modify the data
- *
- * In case of failure or unexpected error response, ExecutionException is thrown
- */
- void prepare() throws InterruptedException, ExecutionException {
- for (final InstanceIdentifier toRemove : modification.getRemovedConfigurationData()) {
- sendDelete(toRemove);
- }
- for(final Entry<InstanceIdentifier, CompositeNode> toUpdate : modification.getUpdatedConfigurationData().entrySet()) {
- sendMerge(toUpdate.getKey(),toUpdate.getValue());
+ // FIXME add logging
+
+ @Override
+ public boolean cancel() {
+ if(isCommitted()) {
+ return false;
}
+
+ return discardChanges();
}
- private void sendMerge(final InstanceIdentifier key, final CompositeNode value) throws InterruptedException, ExecutionException {
- sendEditRpc(createEditConfigStructure(key, Optional.<ModifyAction>absent(), Optional.of(value)), Optional.<ModifyAction>absent());
+ private boolean isCommitted() {
+ // TODO 732
+ return true;
}
- private void sendDelete(final InstanceIdentifier toDelete) throws InterruptedException, ExecutionException {
- sendEditRpc(createEditConfigStructure(toDelete, Optional.of(ModifyAction.DELETE), Optional.<CompositeNode>absent()), Optional.of(ModifyAction.NONE));
+ private boolean discardChanges() {
+ // TODO 732
+ return true;
}
- private void sendEditRpc(final CompositeNode editStructure, final Optional<ModifyAction> defaultOperation) throws InterruptedException, ExecutionException {
- final ImmutableCompositeNode editConfigRequest = createEditConfigRequest(editStructure, defaultOperation);
- final RpcResult<CompositeNode> rpcResult = rpc.invokeRpc(NETCONF_EDIT_CONFIG_QNAME, editConfigRequest).get();
+ // TODO should the edit operations be blocking ?
- // Check result
- if(rpcResult.isSuccessful() == false) {
- throw new ExecutionException(
- String.format("%s: Pre-commit rpc failed, request: %s, errors: %s", id, editConfigRequest, rpcResult.getErrors()), null);
+ @Override
+ public void put(final LogicalDatastoreType store, final InstanceIdentifier path, final NormalizedNode<?, ?> data) {
+ Preconditions.checkArgument(store == LogicalDatastoreType.CONFIGURATION, "Can merge only configuration, not %s", store);
+
+ try {
+ final InstanceIdentifier legacyPath = NetconfDeviceReadOnlyTx.toLegacyPath(normalizer, path);
+ final CompositeNode legacyData = normalizer.toLegacy(path, data);
+ sendEditRpc(createEditConfigStructure(legacyPath, Optional.of(ModifyAction.REPLACE), Optional.fromNullable(legacyData)), Optional.of(ModifyAction.NONE));
+ } catch (final ExecutionException e) {
+ LOG.warn("Error putting data to {}, data: {}, discarding changes", path, data, e);
+ discardChanges();
+ throw new RuntimeException("Error while replacing " + path, e);
}
}
- private ImmutableCompositeNode createEditConfigRequest(final CompositeNode editStructure, final Optional<ModifyAction> defaultOperation) {
- final CompositeNodeBuilder<ImmutableCompositeNode> ret = ImmutableCompositeNode.builder();
+ @Override
+ public void merge(final LogicalDatastoreType store, final InstanceIdentifier path, final NormalizedNode<?, ?> data) {
+ Preconditions.checkArgument(store == LogicalDatastoreType.CONFIGURATION, "Can merge only configuration, not %s", store);
- // Target
- final Node<?> targetWrapperNode = ImmutableCompositeNode.create(NETCONF_TARGET_QNAME, ImmutableList.<Node<?>>of(targetNode));
- ret.add(targetWrapperNode);
+ try {
+ final InstanceIdentifier legacyPath = NetconfDeviceReadOnlyTx.toLegacyPath(normalizer, path);
+ final CompositeNode legacyData = normalizer.toLegacy(path, data);
+ sendEditRpc(
+ createEditConfigStructure(legacyPath, Optional.<ModifyAction> absent(), Optional.fromNullable(legacyData)), Optional.<ModifyAction> absent());
+ } catch (final ExecutionException e) {
+ LOG.warn("Error merging data to {}, data: {}, discarding changes", path, data, e);
+ discardChanges();
+ throw new RuntimeException("Error while merging " + path, e);
+ }
+ }
- // Default operation
- if(defaultOperation.isPresent()) {
- final SimpleNode<String> defOp = NodeFactory.createImmutableSimpleNode(NETCONF_DEFAULT_OPERATION_QNAME, null, modifyOperationToXmlString(defaultOperation.get()));
- ret.add(defOp);
+ @Override
+ public void delete(final LogicalDatastoreType store, final InstanceIdentifier path) {
+ Preconditions.checkArgument(store == LogicalDatastoreType.CONFIGURATION, "Can merge only configuration, not %s", store);
+
+ try {
+ sendEditRpc(createEditConfigStructure(NetconfDeviceReadOnlyTx.toLegacyPath(normalizer, path), Optional.of(ModifyAction.DELETE), Optional.<CompositeNode>absent()), Optional.of(ModifyAction.NONE));
+ } catch (final ExecutionException e) {
+ LOG.warn("Error deleting data {}, discarding changes", path, e);
+ discardChanges();
+ throw new RuntimeException("Error while deleting " + path, e);
}
+ }
- // Error option
- if(rollbackSupported) {
- ret.addLeaf(NETCONF_ERROR_OPTION_QNAME, ROLLBACK_ON_ERROR_OPTION);
+ @Override
+ public CheckedFuture<Void, TransactionCommitFailedException> submit() {
+ final ListenableFuture<Void> commmitFutureAsVoid = Futures.transform(commit(), new Function<RpcResult<TransactionStatus>, Void>() {
+ @Nullable
+ @Override
+ public Void apply(@Nullable final RpcResult<TransactionStatus> input) {
+ return null;
+ }
+ });
+
+ return Futures.makeChecked(commmitFutureAsVoid, new Function<Exception, TransactionCommitFailedException>() {
+ @Override
+ public TransactionCommitFailedException apply(final Exception input) {
+ return new TransactionCommitFailedException("Submit of transaction " + getIdentifier() + " failed", input);
+ }
+ });
+ }
+
+ @Override
+ public ListenableFuture<RpcResult<TransactionStatus>> commit() {
+ // FIXME do not allow commit if closed or failed
+
+ final ListenableFuture<RpcResult<CompositeNode>> rpcResult = rpc.invokeRpc(NetconfMessageTransformUtil.NETCONF_COMMIT_QNAME, getCommitRequest());
+ return Futures.transform(rpcResult, new Function<RpcResult<CompositeNode>, RpcResult<TransactionStatus>>() {
+ @Override
+ public RpcResult<TransactionStatus> apply(@Nullable final RpcResult<CompositeNode> input) {
+ if(input.isSuccessful()) {
+ return RpcResultBuilder.success(TransactionStatus.COMMITED).build();
+ } else {
+ final RpcResultBuilder<TransactionStatus> failed = RpcResultBuilder.failed();
+ for (final RpcError rpcError : input.getErrors()) {
+ failed.withError(rpcError.getErrorType(), rpcError.getTag(), rpcError.getMessage(), rpcError.getApplicationTag(), rpcError.getInfo(), rpcError.getCause());
+ }
+ return failed.build();
+ }
+ }
+ });
+
+ // FIXME 732 detect commit failure
+ }
+
+ private void sendEditRpc(final CompositeNode editStructure, final Optional<ModifyAction> defaultOperation) throws ExecutionException {
+ final CompositeNode editConfigRequest = createEditConfigRequest(editStructure, defaultOperation);
+ final RpcResult<CompositeNode> rpcResult;
+ try {
+ rpcResult = rpc.invokeRpc(NETCONF_EDIT_CONFIG_QNAME, editConfigRequest).get();
+ } catch (final InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new RuntimeException(id + ": Interrupted while waiting for response", e);
}
- ret.setQName(NETCONF_EDIT_CONFIG_QNAME);
- // Edit content
- ret.add(editStructure);
- return ret.toInstance();
+ // Check result
+ if(rpcResult.isSuccessful() == false) {
+ throw new ExecutionException(
+ String.format("%s: Pre-commit rpc failed, request: %s, errors: %s", id, editConfigRequest, rpcResult.getErrors()), null);
+ }
}
private CompositeNode createEditConfigStructure(final InstanceIdentifier dataPath, final Optional<ModifyAction> operation,
- final Optional<CompositeNode> lastChildOverride) {
+ final Optional<CompositeNode> lastChildOverride) {
Preconditions.checkArgument(Iterables.isEmpty(dataPath.getPathArguments()) == false, "Instance identifier with empty path %s", dataPath);
- List<PathArgument> reversedPath = Lists.reverse(dataPath.getPath());
+ List<InstanceIdentifier.PathArgument> reversedPath = Lists.reverse(dataPath.getPath());
// Create deepest edit element with expected edit operation
CompositeNode previous = getDeepestEditElement(reversedPath.get(0), operation, lastChildOverride);
reversedPath.remove(0);
// Create edit structure in reversed order
- for (final PathArgument arg : reversedPath) {
+ for (final InstanceIdentifier.PathArgument arg : reversedPath) {
final CompositeNodeBuilder<ImmutableCompositeNode> builder = ImmutableCompositeNode.builder();
builder.setQName(arg.getNodeType());
}
private void addPredicatesToCompositeNodeBuilder(final Map<QName, Object> predicates, final CompositeNodeBuilder<ImmutableCompositeNode> builder) {
- for (final Entry<QName, Object> entry : predicates.entrySet()) {
+ for (final Map.Entry<QName, Object> entry : predicates.entrySet()) {
builder.addLeaf(entry.getKey(), entry.getValue());
}
}
- private Map<QName, Object> getPredicates(final PathArgument arg) {
+ private Map<QName, Object> getPredicates(final InstanceIdentifier.PathArgument arg) {
Map<QName, Object> predicates = Collections.emptyMap();
- if (arg instanceof NodeIdentifierWithPredicates) {
- predicates = ((NodeIdentifierWithPredicates) arg).getKeyValues();
+ if (arg instanceof InstanceIdentifier.NodeIdentifierWithPredicates) {
+ predicates = ((InstanceIdentifier.NodeIdentifierWithPredicates) arg).getKeyValues();
}
return predicates;
}
- private CompositeNode getDeepestEditElement(final PathArgument arg, final Optional<ModifyAction> operation, final Optional<CompositeNode> lastChildOverride) {
+ private CompositeNode getDeepestEditElement(final InstanceIdentifier.PathArgument arg, final Optional<ModifyAction> operation, final Optional<CompositeNode> lastChildOverride) {
final CompositeNodeBuilder<ImmutableCompositeNode> builder = ImmutableCompositeNode.builder();
builder.setQName(arg.getNodeType());
return builder.toInstance();
}
- private String modifyOperationToXmlString(final ModifyAction operation) {
- return operation.name().toLowerCase();
- }
+ private CompositeNode createEditConfigRequest(final CompositeNode editStructure, final Optional<ModifyAction> defaultOperation) {
+ final CompositeNodeBuilder<ImmutableCompositeNode> ret = ImmutableCompositeNode.builder();
- /**
- * Send commit rpc to finish the transaction
- * In case of failure or unexpected error response, ExecutionException is thrown
- */
- @Override
- public RpcResult<Void> finish() {
- try {
- final RpcResult<?> rpcResult = rpc.invokeRpc(NetconfMessageTransformUtil.NETCONF_COMMIT_QNAME, getCommitRequest()).get();
- return new RpcResultVoidWrapper(rpcResult);
- } catch (final InterruptedException e) {
- Thread.currentThread().interrupt();
- throw new RuntimeException(id + ": Interrupted while waiting for response", e);
- } catch (final ExecutionException e) {
- LOG.warn("{}: Failed to finish commit operation", id, e);
- return RpcResultBuilder.<Void>failed().withError( RpcError.ErrorType.APPLICATION,
- id + ": Unexpected operation error during commit operation", e ).build();
+ // Target
+ final Node<?> targetWrapperNode = ImmutableCompositeNode.create(NETCONF_TARGET_QNAME, ImmutableList.<Node<?>>of(targetNode));
+ ret.add(targetWrapperNode);
+
+ // Default operation
+ if(defaultOperation.isPresent()) {
+ final SimpleNode<String> defOp = NodeFactory.createImmutableSimpleNode(NETCONF_DEFAULT_OPERATION_QNAME, null, modifyOperationToXmlString(defaultOperation.get()));
+ ret.add(defOp);
}
- }
- private ImmutableCompositeNode getCommitRequest() {
- final CompositeNodeBuilder<ImmutableCompositeNode> commitInput = ImmutableCompositeNode.builder();
- commitInput.setQName(NETCONF_COMMIT_QNAME);
- return commitInput.toInstance();
- }
+ // Error option
+ if(rollbackSupported) {
+ ret.addLeaf(NETCONF_ERROR_OPTION_QNAME, ROLLBACK_ON_ERROR_OPTION);
+ }
- @Override
- public DataModification<InstanceIdentifier, CompositeNode> getModification() {
- return this.modification;
+ ret.setQName(NETCONF_EDIT_CONFIG_QNAME);
+ // Edit content
+ ret.add(editStructure);
+ return ret.toInstance();
}
- @Override
- public RpcResult<Void> rollback() throws IllegalStateException {
- // TODO BUG-732 implement rollback by sending discard changes
- return null;
+ private String modifyOperationToXmlString(final ModifyAction operation) {
+ return operation.name().toLowerCase();
}
public CompositeNode getTargetNode(final boolean candidateSupported) {
}
}
- private static final class RpcResultVoidWrapper implements RpcResult<Void> {
-
- private final RpcResult<?> rpcResult;
-
- public RpcResultVoidWrapper(final RpcResult<?> rpcResult) {
- this.rpcResult = rpcResult;
- }
-
- @Override
- public boolean isSuccessful() {
- return rpcResult.isSuccessful();
- }
+ private ImmutableCompositeNode getCommitRequest() {
+ final CompositeNodeBuilder<ImmutableCompositeNode> commitInput = ImmutableCompositeNode.builder();
+ commitInput.setQName(NETCONF_COMMIT_QNAME);
+ return commitInput.toInstance();
+ }
- @Override
- public Void getResult() {
- return null;
- }
- @Override
- public Collection<RpcError> getErrors() {
- return rpcResult.getErrors();
- }
+ @Override
+ public Object getIdentifier() {
+ return this;
}
}