Merge "BUG 1082 Migrate sal-rest-connector to Async Data Broker API"
[controller.git] / opendaylight / md-sal / sal-netconf-connector / src / main / java / org / opendaylight / controller / sal / connect / netconf / sal / tx / NetconfDeviceWriteOnlyTx.java
index 43897aef84fb83614e4a7630855ab698b3968af8..87f5477d35d0f986b6f2a0fd8b1044b8b421e58c 100644 (file)
@@ -8,10 +8,11 @@
 
 package org.opendaylight.controller.sal.connect.netconf.sal.tx;
 
+import static org.opendaylight.controller.sal.connect.netconf.util.NetconfMessageTransformUtil.DISCARD_CHANGES_RPC_CONTENT;
 import static org.opendaylight.controller.sal.connect.netconf.util.NetconfMessageTransformUtil.NETCONF_CANDIDATE_QNAME;
-import static org.opendaylight.controller.sal.connect.netconf.util.NetconfMessageTransformUtil.NETCONF_COMMIT_QNAME;
 import static org.opendaylight.controller.sal.connect.netconf.util.NetconfMessageTransformUtil.NETCONF_CONFIG_QNAME;
 import static org.opendaylight.controller.sal.connect.netconf.util.NetconfMessageTransformUtil.NETCONF_DEFAULT_OPERATION_QNAME;
+import static org.opendaylight.controller.sal.connect.netconf.util.NetconfMessageTransformUtil.NETCONF_DISCARD_CHANGES_QNAME;
 import static org.opendaylight.controller.sal.connect.netconf.util.NetconfMessageTransformUtil.NETCONF_EDIT_CONFIG_QNAME;
 import static org.opendaylight.controller.sal.connect.netconf.util.NetconfMessageTransformUtil.NETCONF_ERROR_OPTION_QNAME;
 import static org.opendaylight.controller.sal.connect.netconf.util.NetconfMessageTransformUtil.NETCONF_OPERATION_QNAME;
@@ -26,13 +27,14 @@ import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Iterables;
 import com.google.common.collect.Lists;
 import com.google.common.util.concurrent.CheckedFuture;
+import com.google.common.util.concurrent.FutureCallback;
 import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.ListenableFuture;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ExecutionException;
-import javax.annotation.Nullable;
+import java.util.concurrent.atomic.AtomicBoolean;
 import org.opendaylight.controller.md.sal.common.api.TransactionStatus;
 import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
 import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException;
@@ -46,7 +48,7 @@ import org.opendaylight.yangtools.yang.common.RpcError;
 import org.opendaylight.yangtools.yang.common.RpcResult;
 import org.opendaylight.yangtools.yang.common.RpcResultBuilder;
 import org.opendaylight.yangtools.yang.data.api.CompositeNode;
-import org.opendaylight.yangtools.yang.data.api.InstanceIdentifier;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
 import org.opendaylight.yangtools.yang.data.api.ModifyAction;
 import org.opendaylight.yangtools.yang.data.api.Node;
 import org.opendaylight.yangtools.yang.data.api.SimpleNode;
@@ -57,97 +59,114 @@ import org.opendaylight.yangtools.yang.data.impl.util.CompositeNodeBuilder;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-public class NetconfDeviceWriteOnlyTx implements DOMDataWriteTransaction {
+public class NetconfDeviceWriteOnlyTx implements DOMDataWriteTransaction, FutureCallback<RpcResult<TransactionStatus>> {
 
     private static final Logger LOG  = LoggerFactory.getLogger(NetconfDeviceWriteOnlyTx.class);
 
     private final RemoteDeviceId id;
     private final RpcImplementation rpc;
     private final DataNormalizer normalizer;
+
     private final boolean rollbackSupported;
+    private final boolean candidateSupported;
     private final CompositeNode targetNode;
 
+    // Allow commit to be called only once
+    private final AtomicBoolean finished = new AtomicBoolean(false);
+
     public NetconfDeviceWriteOnlyTx(final RemoteDeviceId id, final RpcImplementation rpc, final DataNormalizer normalizer, final boolean candidateSupported, final boolean rollbackOnErrorSupported) {
         this.id = id;
         this.rpc = rpc;
         this.normalizer = normalizer;
-        this.targetNode = getTargetNode(candidateSupported);
+
+        this.candidateSupported = candidateSupported;
+        this.targetNode = getTargetNode(this.candidateSupported);
         this.rollbackSupported = rollbackOnErrorSupported;
     }
 
-    // FIXME add logging
-
     @Override
     public boolean cancel() {
-        if(isCommitted()) {
+        if(isFinished()) {
             return false;
         }
 
         return discardChanges();
     }
 
-    private boolean isCommitted() {
-        // TODO 732
-        return true;
+    private boolean isFinished() {
+        return finished.get();
     }
 
     private boolean discardChanges() {
-        // TODO 732
+        finished.set(true);
+
+        if(candidateSupported) {
+            sendDiscardChanges();
+        }
         return true;
     }
 
     // TODO should the edit operations be blocking ?
+    // TODO should the discard-changes operations be blocking ?
 
     @Override
-    public void put(final LogicalDatastoreType store, final InstanceIdentifier path, final NormalizedNode<?, ?> data) {
+    public void put(final LogicalDatastoreType store, final YangInstanceIdentifier path, final NormalizedNode<?, ?> data) {
+        checkNotFinished();
         Preconditions.checkArgument(store == LogicalDatastoreType.CONFIGURATION, "Can merge only configuration, not %s", store);
 
         try {
-            final InstanceIdentifier legacyPath = NetconfDeviceReadOnlyTx.toLegacyPath(normalizer, path);
+            final YangInstanceIdentifier legacyPath = NetconfDeviceReadOnlyTx.toLegacyPath(normalizer, path, id);
             final CompositeNode legacyData = normalizer.toLegacy(path, data);
-            sendEditRpc(createEditConfigStructure(legacyPath, Optional.of(ModifyAction.REPLACE), Optional.fromNullable(legacyData)), Optional.of(ModifyAction.NONE));
+            sendEditRpc(
+                    createEditConfigStructure(legacyPath, Optional.of(ModifyAction.REPLACE), Optional.fromNullable(legacyData)), Optional.of(ModifyAction.NONE));
         } catch (final ExecutionException e) {
-            LOG.warn("Error putting data to {}, data: {}, discarding changes", path, data, e);
+            LOG.warn("{}: Error putting data to {}, data: {}, discarding changes", id, path, data, e);
             discardChanges();
-            throw new RuntimeException("Error while replacing " + path, e);
+            throw new RuntimeException(id + ": Error while replacing " + path, e);
         }
     }
 
+    private void checkNotFinished() {
+        Preconditions.checkState(isFinished() == false, "%s: Transaction %s already finished", id, getIdentifier());
+    }
+
     @Override
-    public void merge(final LogicalDatastoreType store, final InstanceIdentifier path, final NormalizedNode<?, ?> data) {
-        Preconditions.checkArgument(store == LogicalDatastoreType.CONFIGURATION, "Can merge only configuration, not %s", store);
+    public void merge(final LogicalDatastoreType store, final YangInstanceIdentifier path, final NormalizedNode<?, ?> data) {
+        checkNotFinished();
+        Preconditions.checkArgument(store == LogicalDatastoreType.CONFIGURATION, "%s: Can merge only configuration, not %s", id, store);
 
         try {
-            final InstanceIdentifier legacyPath = NetconfDeviceReadOnlyTx.toLegacyPath(normalizer, path);
+            final YangInstanceIdentifier legacyPath = NetconfDeviceReadOnlyTx.toLegacyPath(normalizer, path, id);
             final CompositeNode legacyData = normalizer.toLegacy(path, data);
             sendEditRpc(
                     createEditConfigStructure(legacyPath, Optional.<ModifyAction> absent(), Optional.fromNullable(legacyData)), Optional.<ModifyAction> absent());
         } catch (final ExecutionException e) {
-            LOG.warn("Error merging data to {}, data: {}, discarding changes", path, data, e);
+            LOG.warn("{}: Error merging data to {}, data: {}, discarding changes", id, path, data, e);
             discardChanges();
-            throw new RuntimeException("Error while merging " + path, e);
+            throw new RuntimeException(id + ": Error while merging " + path, e);
         }
     }
 
     @Override
-    public void delete(final LogicalDatastoreType store, final InstanceIdentifier path) {
-        Preconditions.checkArgument(store == LogicalDatastoreType.CONFIGURATION, "Can merge only configuration, not %s", store);
+    public void delete(final LogicalDatastoreType store, final YangInstanceIdentifier path) {
+        checkNotFinished();
+        Preconditions.checkArgument(store == LogicalDatastoreType.CONFIGURATION, "%s: Can merge only configuration, not %s", id, store);
 
         try {
-            sendEditRpc(createEditConfigStructure(NetconfDeviceReadOnlyTx.toLegacyPath(normalizer, path), Optional.of(ModifyAction.DELETE), Optional.<CompositeNode>absent()), Optional.of(ModifyAction.NONE));
+            sendEditRpc(
+                    createEditConfigStructure(NetconfDeviceReadOnlyTx.toLegacyPath(normalizer, path, id), Optional.of(ModifyAction.DELETE), Optional.<CompositeNode>absent()), Optional.of(ModifyAction.NONE));
         } catch (final ExecutionException e) {
-            LOG.warn("Error deleting data {}, discarding changes", path, e);
+            LOG.warn("{}: Error deleting data {}, discarding changes", id, path, e);
             discardChanges();
-            throw new RuntimeException("Error while deleting " + path, e);
+            throw new RuntimeException(id + ": Error while deleting " + path, e);
         }
     }
 
     @Override
     public CheckedFuture<Void, TransactionCommitFailedException> submit() {
         final ListenableFuture<Void> commmitFutureAsVoid = Futures.transform(commit(), new Function<RpcResult<TransactionStatus>, Void>() {
-            @Nullable
             @Override
-            public Void apply(@Nullable final RpcResult<TransactionStatus> input) {
+            public Void apply(final RpcResult<TransactionStatus> input) {
                 return null;
             }
         });
@@ -162,25 +181,46 @@ public class NetconfDeviceWriteOnlyTx implements DOMDataWriteTransaction {
 
     @Override
     public ListenableFuture<RpcResult<TransactionStatus>> commit() {
-        // FIXME do not allow commit if closed or failed
+        checkNotFinished();
+        finished.set(true);
 
-        final ListenableFuture<RpcResult<CompositeNode>> rpcResult = rpc.invokeRpc(NetconfMessageTransformUtil.NETCONF_COMMIT_QNAME, getCommitRequest());
-        return Futures.transform(rpcResult, new Function<RpcResult<CompositeNode>, RpcResult<TransactionStatus>>() {
-            @Override
-            public RpcResult<TransactionStatus> apply(@Nullable final RpcResult<CompositeNode> input) {
-                if(input.isSuccessful()) {
-                    return RpcResultBuilder.success(TransactionStatus.COMMITED).build();
-                } else {
-                    final RpcResultBuilder<TransactionStatus> failed = RpcResultBuilder.failed();
-                    for (final RpcError rpcError : input.getErrors()) {
-                        failed.withError(rpcError.getErrorType(), rpcError.getTag(), rpcError.getMessage(), rpcError.getApplicationTag(), rpcError.getInfo(), rpcError.getCause());
+        if(candidateSupported == false) {
+            return Futures.immediateFuture(RpcResultBuilder.success(TransactionStatus.COMMITED).build());
+        }
+
+        final ListenableFuture<RpcResult<CompositeNode>> rpcResult = rpc.invokeRpc(
+                NetconfMessageTransformUtil.NETCONF_COMMIT_QNAME, NetconfMessageTransformUtil.COMMIT_RPC_CONTENT);
+
+        final ListenableFuture<RpcResult<TransactionStatus>> transformed = Futures.transform(rpcResult,
+                new Function<RpcResult<CompositeNode>, RpcResult<TransactionStatus>>() {
+                    @Override
+                    public RpcResult<TransactionStatus> apply(final RpcResult<CompositeNode> input) {
+                        if (input.isSuccessful()) {
+                            return RpcResultBuilder.success(TransactionStatus.COMMITED).build();
+                        } else {
+                            final RpcResultBuilder<TransactionStatus> failed = RpcResultBuilder.failed();
+                            for (final RpcError rpcError : input.getErrors()) {
+                                failed.withError(rpcError.getErrorType(), rpcError.getTag(), rpcError.getMessage(),
+                                        rpcError.getApplicationTag(), rpcError.getInfo(), rpcError.getCause());
+                            }
+                            return failed.build();
+                        }
                     }
-                    return failed.build();
-                }
-            }
-        });
+                });
 
-        // FIXME 732 detect commit failure
+        Futures.addCallback(transformed, this);
+        return transformed;
+    }
+
+    @Override
+    public void onSuccess(final RpcResult<TransactionStatus> result) {
+        LOG.debug("{}: Write successful, transaction: {}", id, getIdentifier());
+    }
+
+    @Override
+    public void onFailure(final Throwable t) {
+        LOG.warn("{}: Write failed, transaction {}, discarding changes", id, getIdentifier(), t);
+        discardChanges();
     }
 
     private void sendEditRpc(final CompositeNode editStructure, final Optional<ModifyAction> defaultOperation) throws ExecutionException {
@@ -200,11 +240,27 @@ public class NetconfDeviceWriteOnlyTx implements DOMDataWriteTransaction {
         }
     }
 
-    private CompositeNode createEditConfigStructure(final InstanceIdentifier dataPath, final Optional<ModifyAction> operation,
+    private void sendDiscardChanges() {
+        final ListenableFuture<RpcResult<CompositeNode>> discardFuture = rpc.invokeRpc(NETCONF_DISCARD_CHANGES_QNAME, DISCARD_CHANGES_RPC_CONTENT);
+        Futures.addCallback(discardFuture, new FutureCallback<RpcResult<CompositeNode>>() {
+            @Override
+            public void onSuccess(final RpcResult<CompositeNode> result) {
+                LOG.debug("{}: Discarding transaction: {}", id, getIdentifier());
+            }
+
+            @Override
+            public void onFailure(final Throwable t) {
+                LOG.error("{}: Discarding changes failed, transaction: {}. Device configuration might be corrupted", id, getIdentifier(), t);
+                throw new RuntimeException(id + ": Discarding changes failed, transaction " + getIdentifier(), t);
+            }
+        });
+    }
+
+    private CompositeNode createEditConfigStructure(final YangInstanceIdentifier dataPath, final Optional<ModifyAction> operation,
                                                     final Optional<CompositeNode> lastChildOverride) {
         Preconditions.checkArgument(Iterables.isEmpty(dataPath.getPathArguments()) == false, "Instance identifier with empty path %s", dataPath);
 
-        List<InstanceIdentifier.PathArgument> reversedPath = Lists.reverse(dataPath.getPath());
+        List<YangInstanceIdentifier.PathArgument> reversedPath = Lists.reverse(dataPath.getPath());
 
         // Create deepest edit element with expected edit operation
         CompositeNode previous = getDeepestEditElement(reversedPath.get(0), operation, lastChildOverride);
@@ -214,7 +270,7 @@ public class NetconfDeviceWriteOnlyTx implements DOMDataWriteTransaction {
         reversedPath.remove(0);
 
         // Create edit structure in reversed order
-        for (final InstanceIdentifier.PathArgument arg : reversedPath) {
+        for (final YangInstanceIdentifier.PathArgument arg : reversedPath) {
             final CompositeNodeBuilder<ImmutableCompositeNode> builder = ImmutableCompositeNode.builder();
             builder.setQName(arg.getNodeType());
 
@@ -232,15 +288,15 @@ public class NetconfDeviceWriteOnlyTx implements DOMDataWriteTransaction {
         }
     }
 
-    private Map<QName, Object> getPredicates(final InstanceIdentifier.PathArgument arg) {
+    private Map<QName, Object> getPredicates(final YangInstanceIdentifier.PathArgument arg) {
         Map<QName, Object> predicates = Collections.emptyMap();
-        if (arg instanceof InstanceIdentifier.NodeIdentifierWithPredicates) {
-            predicates = ((InstanceIdentifier.NodeIdentifierWithPredicates) arg).getKeyValues();
+        if (arg instanceof YangInstanceIdentifier.NodeIdentifierWithPredicates) {
+            predicates = ((YangInstanceIdentifier.NodeIdentifierWithPredicates) arg).getKeyValues();
         }
         return predicates;
     }
 
-    private CompositeNode getDeepestEditElement(final InstanceIdentifier.PathArgument arg, final Optional<ModifyAction> operation, final Optional<CompositeNode> lastChildOverride) {
+    private CompositeNode getDeepestEditElement(final YangInstanceIdentifier.PathArgument arg, final Optional<ModifyAction> operation, final Optional<CompositeNode> lastChildOverride) {
         final CompositeNodeBuilder<ImmutableCompositeNode> builder = ImmutableCompositeNode.builder();
         builder.setQName(arg.getNodeType());
 
@@ -298,13 +354,6 @@ public class NetconfDeviceWriteOnlyTx implements DOMDataWriteTransaction {
         }
     }
 
-    private ImmutableCompositeNode getCommitRequest() {
-        final CompositeNodeBuilder<ImmutableCompositeNode> commitInput = ImmutableCompositeNode.builder();
-        commitInput.setQName(NETCONF_COMMIT_QNAME);
-        return commitInput.toInstance();
-    }
-
-
     @Override
     public Object getIdentifier() {
         return this;