Merge "Change handling of netconf cluster transactions"
authorTomas Cere <tcere@cisco.com>
Fri, 21 Apr 2017 08:36:04 +0000 (08:36 +0000)
committerGerrit Code Review <gerrit@opendaylight.org>
Fri, 21 Apr 2017 08:36:04 +0000 (08:36 +0000)
27 files changed:
netconf/netconf-topology-singleton/src/main/java/org/opendaylight/netconf/topology/singleton/api/NetconfDOMTransaction.java [deleted file]
netconf/netconf-topology-singleton/src/main/java/org/opendaylight/netconf/topology/singleton/api/RemoteOperationTxProcessor.java [deleted file]
netconf/netconf-topology-singleton/src/main/java/org/opendaylight/netconf/topology/singleton/impl/MasterSalFacade.java
netconf/netconf-topology-singleton/src/main/java/org/opendaylight/netconf/topology/singleton/impl/NetconfDOMDataBroker.java [deleted file]
netconf/netconf-topology-singleton/src/main/java/org/opendaylight/netconf/topology/singleton/impl/ProxyDOMDataBroker.java [new file with mode: 0644]
netconf/netconf-topology-singleton/src/main/java/org/opendaylight/netconf/topology/singleton/impl/RemoteOperationTxProcessorImpl.java [deleted file]
netconf/netconf-topology-singleton/src/main/java/org/opendaylight/netconf/topology/singleton/impl/SlaveSalFacade.java
netconf/netconf-topology-singleton/src/main/java/org/opendaylight/netconf/topology/singleton/impl/actors/NetconfNodeActor.java
netconf/netconf-topology-singleton/src/main/java/org/opendaylight/netconf/topology/singleton/impl/actors/ReadTransactionActor.java [new file with mode: 0644]
netconf/netconf-topology-singleton/src/main/java/org/opendaylight/netconf/topology/singleton/impl/actors/WriteTransactionActor.java [new file with mode: 0644]
netconf/netconf-topology-singleton/src/main/java/org/opendaylight/netconf/topology/singleton/impl/tx/NetconfMasterDOMTransaction.java [deleted file]
netconf/netconf-topology-singleton/src/main/java/org/opendaylight/netconf/topology/singleton/impl/tx/NetconfProxyDOMTransaction.java [deleted file]
netconf/netconf-topology-singleton/src/main/java/org/opendaylight/netconf/topology/singleton/impl/tx/NetconfReadOnlyTransaction.java [deleted file]
netconf/netconf-topology-singleton/src/main/java/org/opendaylight/netconf/topology/singleton/impl/tx/NetconfWriteOnlyTransaction.java [deleted file]
netconf/netconf-topology-singleton/src/main/java/org/opendaylight/netconf/topology/singleton/impl/tx/ProxyReadTransaction.java [new file with mode: 0644]
netconf/netconf-topology-singleton/src/main/java/org/opendaylight/netconf/topology/singleton/impl/tx/ProxyWriteTransaction.java [new file with mode: 0644]
netconf/netconf-topology-singleton/src/main/java/org/opendaylight/netconf/topology/singleton/impl/utils/NetconfTopologyUtils.java
netconf/netconf-topology-singleton/src/main/java/org/opendaylight/netconf/topology/singleton/messages/transactions/NewReadTransactionReply.java [new file with mode: 0644]
netconf/netconf-topology-singleton/src/main/java/org/opendaylight/netconf/topology/singleton/messages/transactions/NewReadTransactionRequest.java [moved from netconf/netconf-topology-singleton/src/main/java/org/opendaylight/netconf/topology/singleton/messages/transactions/OpenTransaction.java with 54% similarity]
netconf/netconf-topology-singleton/src/main/java/org/opendaylight/netconf/topology/singleton/messages/transactions/NewWriteTransactionReply.java [new file with mode: 0644]
netconf/netconf-topology-singleton/src/main/java/org/opendaylight/netconf/topology/singleton/messages/transactions/NewWriteTransactionRequest.java [new file with mode: 0644]
netconf/netconf-topology-singleton/src/test/java/org/opendaylight/netconf/topology/singleton/impl/actors/ReadTransactionActorTest.java [new file with mode: 0644]
netconf/netconf-topology-singleton/src/test/java/org/opendaylight/netconf/topology/singleton/impl/actors/WriteTransactionActorTest.java [new file with mode: 0644]
netconf/netconf-topology-singleton/src/test/java/org/opendaylight/netconf/topology/singleton/impl/tx/ProxyReadTransactionTest.java [new file with mode: 0644]
netconf/netconf-topology-singleton/src/test/java/org/opendaylight/netconf/topology/singleton/impl/tx/ProxyWriteTransactionTest.java [new file with mode: 0644]
netconf/netconf-topology-singleton/src/test/java/org/opendaylight/netconf/topology/singleton/impl/tx/ReadOnlyTransactionTest.java
netconf/netconf-topology-singleton/src/test/java/org/opendaylight/netconf/topology/singleton/impl/tx/WriteOnlyTransactionTest.java

diff --git a/netconf/netconf-topology-singleton/src/main/java/org/opendaylight/netconf/topology/singleton/api/NetconfDOMTransaction.java b/netconf/netconf-topology-singleton/src/main/java/org/opendaylight/netconf/topology/singleton/api/NetconfDOMTransaction.java
deleted file mode 100644 (file)
index 58fe9b3..0000000
+++ /dev/null
@@ -1,83 +0,0 @@
-/*
- * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.
- *
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,
- * and is available at http://www.eclipse.org/legal/epl-v10.html
- */
-
-package org.opendaylight.netconf.topology.singleton.api;
-
-import com.google.common.base.Optional;
-import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
-import org.opendaylight.netconf.topology.singleton.messages.NormalizedNodeMessage;
-import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
-import scala.concurrent.Future;
-
-/**
- * Provides API for all operations of read and write transactions
- */
-// TODO we should separate between read tx and write tx
-public interface NetconfDOMTransaction {
-
-
-    /**
-     * Opens a new transaction. Transactions have to be opened before applying
-     * any operations on them. Previous transaction has to be either submitted
-     * ({@link #submit()} was invoked) or canceled ({@link #cancel()} was
-     * invoked.
-     *
-     * @throws IllegalStateException
-     *             if the previous transaction was not SUBMITTED or CANCELLED.
-     */
-    void openTransaction();
-
-    /**
-     * Read data from particular data-store
-     * @param store data-store type
-     * @param path unique identifier of a particular node instance in the data tree
-     * @return result as future
-     */
-    Future<Optional<NormalizedNodeMessage>> read(LogicalDatastoreType store, YangInstanceIdentifier path);
-
-    /**
-     * Test existence of node in certain data-store
-     * @param store data-store type
-     * @param path unique identifier of a particular node instance in the data tree
-     * @return result as future
-     */
-    Future<Boolean> exists(LogicalDatastoreType store, YangInstanceIdentifier path);
-
-    /**
-     * Put data to particular data-store
-     * @param store data-store type
-     * @param data data for inserting included in NormalizedNodeMessage object
-     */
-    void put(LogicalDatastoreType store, NormalizedNodeMessage data);
-
-    /**
-     * Merge data with existing node in particular data-store
-     * @param store data-store type
-     * @param data data for merging included in NormalizedNodeMessage object
-     */
-    void merge(LogicalDatastoreType store, NormalizedNodeMessage data);
-
-    /**
-     * Delete node in particular data-store in path
-     * @param store data-store type
-     * @param path unique identifier of a particular node instance in the data tree
-     */
-    void delete(LogicalDatastoreType store, YangInstanceIdentifier path);
-
-    /**
-     * Cancel operation
-     * @return success or not
-     */
-    boolean cancel();
-
-    /**
-     * Commit opened transaction.
-     * @return void or raised exception
-     */
-    Future<Void> submit();
-}
diff --git a/netconf/netconf-topology-singleton/src/main/java/org/opendaylight/netconf/topology/singleton/api/RemoteOperationTxProcessor.java b/netconf/netconf-topology-singleton/src/main/java/org/opendaylight/netconf/topology/singleton/api/RemoteOperationTxProcessor.java
deleted file mode 100644 (file)
index 5d1c893..0000000
+++ /dev/null
@@ -1,80 +0,0 @@
-/*
- * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.
- *
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,
- * and is available at http://www.eclipse.org/legal/epl-v10.html
- */
-
-package org.opendaylight.netconf.topology.singleton.api;
-
-import akka.actor.ActorRef;
-import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
-import org.opendaylight.netconf.topology.singleton.messages.NormalizedNodeMessage;
-import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
-
-/**
- * Provides API for remote calling operations of transactions. Slave sends message of particular
- * operation to master and master performs it.
- */
-public interface RemoteOperationTxProcessor {
-
-    /**
-     * Opens a new transaction.
-     */
-    void doOpenTransaction(ActorRef recipient, ActorRef sender);
-
-    /**
-     * Delete node in particular data-store in path
-     * @param store data-store type
-     * @param path unique identifier of a particular node instance in the data tree
-     */
-    void doDelete(LogicalDatastoreType store, YangInstanceIdentifier path);
-
-    /**
-     * Commit opened transaction.
-     * @param recipient recipient of submit result
-     * @param sender sender of submit result
-     */
-    void doSubmit(ActorRef recipient, ActorRef sender);
-
-    /**
-     * Cancel operation
-     * @param recipient recipient of cancel result
-     * @param sender sender of cancel result
-     */
-    void doCancel(ActorRef recipient, ActorRef sender);
-
-    /**
-     * Put data to particular data-store
-     * @param store data-store type
-     * @param data data for inserting included in NormalizedNodeMessage object
-     */
-    void doPut(LogicalDatastoreType store, NormalizedNodeMessage data);
-
-    /**
-     * Merge data with existing node in particular data-store
-     * @param store data-store type
-     * @param data data for merging included in NormalizedNodeMessage object
-     */
-    void doMerge(LogicalDatastoreType store, NormalizedNodeMessage data);
-
-    /**
-     * Read data from particular data-store
-     * @param store data-store type
-     * @param path unique identifier of a particular node instance in the data tree
-     * @param recipient recipient of read result
-     * @param sender sender of read result
-     */
-    void doRead(LogicalDatastoreType store, YangInstanceIdentifier path, ActorRef recipient, ActorRef sender);
-
-    /**
-     * Test existence of node in certain data-store
-     * @param store data-store type
-     * @param path unique identifier of a particular node instance in the data tree
-     * @param recipient recipient of exists result
-     * @param sender sender of exists result
-     */
-    void doExists(LogicalDatastoreType store, YangInstanceIdentifier path, ActorRef recipient, ActorRef sender);
-
-}
index 96a6cb1d291eb5422fc54f1635946da12e694bdb..1087df67f3b9947845b45c575252e081efc3d2be 100644 (file)
@@ -26,12 +26,10 @@ import org.opendaylight.controller.sal.core.api.Broker;
 import org.opendaylight.netconf.sal.connect.api.RemoteDeviceHandler;
 import org.opendaylight.netconf.sal.connect.netconf.listener.NetconfDeviceCapabilities;
 import org.opendaylight.netconf.sal.connect.netconf.listener.NetconfSessionPreferences;
+import org.opendaylight.netconf.sal.connect.netconf.sal.NetconfDeviceDataBroker;
 import org.opendaylight.netconf.sal.connect.netconf.sal.NetconfDeviceNotificationService;
 import org.opendaylight.netconf.sal.connect.netconf.sal.NetconfDeviceSalProvider;
 import org.opendaylight.netconf.sal.connect.util.RemoteDeviceId;
-import org.opendaylight.netconf.topology.singleton.api.NetconfDOMTransaction;
-import org.opendaylight.netconf.topology.singleton.impl.tx.NetconfMasterDOMTransaction;
-import org.opendaylight.netconf.topology.singleton.impl.tx.NetconfProxyDOMTransaction;
 import org.opendaylight.netconf.topology.singleton.messages.CreateInitialMasterActorData;
 import org.opendaylight.yangtools.yang.common.SimpleDateFormatUtil;
 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
@@ -137,15 +135,11 @@ class MasterSalFacade implements AutoCloseable, RemoteDeviceHandler<NetconfSessi
 
         LOG.info("{}: Creating master data broker for device", id);
 
-        final NetconfDOMTransaction masterDOMTransactions =
-                new NetconfMasterDOMTransaction(id, remoteSchemaContext, deviceRpc, netconfSessionPreferences);
-        deviceDataBroker =
-                new NetconfDOMDataBroker(actorSystem, id, masterDOMTransactions);
-        // We need to create NetconfProxyDOMTransaction so accessing mountpoint
+        deviceDataBroker = new NetconfDeviceDataBroker(id, remoteSchemaContext, deviceRpc, netconfSessionPreferences);
+        // We need to create ProxyDOMDataBroker so accessing mountpoint
         // on leader node would be same as on follower node
-        final NetconfDOMTransaction proxyDOMTransation =
-                new NetconfProxyDOMTransaction(id, actorSystem, masterActorRef, actorResponseWaitTime);
-        final NetconfDOMDataBroker proxyDataBroker = new NetconfDOMDataBroker(actorSystem, id, proxyDOMTransation);
+        final ProxyDOMDataBroker proxyDataBroker =
+                new ProxyDOMDataBroker(actorSystem, id, masterActorRef, actorResponseWaitTime);
         salProvider.getMountInstance()
                 .onTopologyDeviceConnected(remoteSchemaContext, proxyDataBroker, deviceRpc, notificationService);
     }
@@ -154,7 +148,7 @@ class MasterSalFacade implements AutoCloseable, RemoteDeviceHandler<NetconfSessi
         final List<SourceIdentifier> sourceIdentifiers =
                 remoteSchemaContext.getAllModuleIdentifiers().stream().map(mi ->
                         RevisionSourceIdentifier.create(mi.getName(),
-                            (SimpleDateFormatUtil.DEFAULT_DATE_REV == mi.getRevision() ? Optional.<String>absent() :
+                                (SimpleDateFormatUtil.DEFAULT_DATE_REV == mi.getRevision() ? Optional.<String>absent() :
                                     Optional.of(mi.getQNameModule().getFormattedRevision()))))
                         .collect(Collectors.toList());
 
diff --git a/netconf/netconf-topology-singleton/src/main/java/org/opendaylight/netconf/topology/singleton/impl/NetconfDOMDataBroker.java b/netconf/netconf-topology-singleton/src/main/java/org/opendaylight/netconf/topology/singleton/impl/NetconfDOMDataBroker.java
deleted file mode 100644 (file)
index 4fadb08..0000000
+++ /dev/null
@@ -1,78 +0,0 @@
-/*
- * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved.
- *
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,
- * and is available at http://www.eclipse.org/legal/epl-v10.html
- */
-
-package org.opendaylight.netconf.topology.singleton.impl;
-
-import akka.actor.ActorSystem;
-import java.util.Collections;
-import java.util.Map;
-import javax.annotation.Nonnull;
-import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
-import org.opendaylight.controller.md.sal.common.api.data.TransactionChainListener;
-import org.opendaylight.controller.md.sal.dom.api.DOMDataBroker;
-import org.opendaylight.controller.md.sal.dom.api.DOMDataBrokerExtension;
-import org.opendaylight.controller.md.sal.dom.api.DOMDataChangeListener;
-import org.opendaylight.controller.md.sal.dom.api.DOMDataReadOnlyTransaction;
-import org.opendaylight.controller.md.sal.dom.api.DOMDataReadWriteTransaction;
-import org.opendaylight.controller.md.sal.dom.api.DOMDataWriteTransaction;
-import org.opendaylight.controller.md.sal.dom.api.DOMTransactionChain;
-import org.opendaylight.netconf.sal.connect.netconf.sal.tx.ReadWriteTx;
-import org.opendaylight.netconf.sal.connect.util.RemoteDeviceId;
-import org.opendaylight.netconf.topology.singleton.api.NetconfDOMTransaction;
-import org.opendaylight.netconf.topology.singleton.impl.tx.NetconfReadOnlyTransaction;
-import org.opendaylight.netconf.topology.singleton.impl.tx.NetconfWriteOnlyTransaction;
-import org.opendaylight.yangtools.concepts.ListenerRegistration;
-import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
-
-public class NetconfDOMDataBroker implements DOMDataBroker {
-
-    private final RemoteDeviceId id;
-    private final NetconfDOMTransaction masterDataBroker;
-    private final ActorSystem actorSystem;
-
-    public NetconfDOMDataBroker(final ActorSystem actorSystem, final RemoteDeviceId id,
-                         final NetconfDOMTransaction masterDataBroker) {
-        this.id = id;
-        this.masterDataBroker = masterDataBroker;
-        this.actorSystem = actorSystem;
-    }
-
-    @Override
-    public DOMDataReadOnlyTransaction newReadOnlyTransaction() {
-        return new NetconfReadOnlyTransaction(id, actorSystem, masterDataBroker);
-    }
-
-    @Override
-    public DOMDataReadWriteTransaction newReadWriteTransaction() {
-        return new ReadWriteTx(new NetconfReadOnlyTransaction(id, actorSystem, masterDataBroker),
-                new NetconfWriteOnlyTransaction(id, actorSystem, masterDataBroker));
-    }
-
-    @Override
-    public DOMDataWriteTransaction newWriteOnlyTransaction() {
-        return new NetconfWriteOnlyTransaction(id, actorSystem, masterDataBroker);
-    }
-
-    @Override
-    public ListenerRegistration<DOMDataChangeListener> registerDataChangeListener(
-            LogicalDatastoreType store, YangInstanceIdentifier path, DOMDataChangeListener listener,
-            DataChangeScope triggeringScope) {
-        throw new UnsupportedOperationException(id + ": Data change listeners not supported for netconf mount point");
-    }
-
-    @Override
-    public DOMTransactionChain createTransactionChain(TransactionChainListener listener) {
-        throw new UnsupportedOperationException(id + ": Transaction chains not supported for netconf mount point");
-    }
-
-    @Nonnull
-    @Override
-    public Map<Class<? extends DOMDataBrokerExtension>, DOMDataBrokerExtension> getSupportedExtensions() {
-        return Collections.emptyMap();
-    }
-}
diff --git a/netconf/netconf-topology-singleton/src/main/java/org/opendaylight/netconf/topology/singleton/impl/ProxyDOMDataBroker.java b/netconf/netconf-topology-singleton/src/main/java/org/opendaylight/netconf/topology/singleton/impl/ProxyDOMDataBroker.java
new file mode 100644 (file)
index 0000000..416d9bd
--- /dev/null
@@ -0,0 +1,116 @@
+/*
+ * Copyright (c) 2017 Pantheon Technologies s.r.o. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.netconf.topology.singleton.impl;
+
+import akka.actor.ActorRef;
+import akka.actor.ActorSystem;
+import akka.pattern.Patterns;
+import akka.util.Timeout;
+import com.google.common.base.Preconditions;
+import java.util.Collections;
+import java.util.Map;
+import javax.annotation.Nonnull;
+import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
+import org.opendaylight.controller.md.sal.common.api.data.TransactionChainListener;
+import org.opendaylight.controller.md.sal.dom.api.DOMDataBroker;
+import org.opendaylight.controller.md.sal.dom.api.DOMDataBrokerExtension;
+import org.opendaylight.controller.md.sal.dom.api.DOMDataChangeListener;
+import org.opendaylight.controller.md.sal.dom.api.DOMDataReadOnlyTransaction;
+import org.opendaylight.controller.md.sal.dom.api.DOMDataReadWriteTransaction;
+import org.opendaylight.controller.md.sal.dom.api.DOMDataWriteTransaction;
+import org.opendaylight.controller.md.sal.dom.api.DOMTransactionChain;
+import org.opendaylight.netconf.sal.connect.netconf.sal.tx.ReadWriteTx;
+import org.opendaylight.netconf.sal.connect.util.RemoteDeviceId;
+import org.opendaylight.netconf.topology.singleton.impl.tx.ProxyReadTransaction;
+import org.opendaylight.netconf.topology.singleton.impl.tx.ProxyWriteTransaction;
+import org.opendaylight.netconf.topology.singleton.messages.transactions.NewReadTransactionReply;
+import org.opendaylight.netconf.topology.singleton.messages.transactions.NewReadTransactionRequest;
+import org.opendaylight.netconf.topology.singleton.messages.transactions.NewWriteTransactionReply;
+import org.opendaylight.netconf.topology.singleton.messages.transactions.NewWriteTransactionRequest;
+import org.opendaylight.yangtools.concepts.ListenerRegistration;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
+import scala.concurrent.Await;
+import scala.concurrent.Future;
+
+public class ProxyDOMDataBroker implements DOMDataBroker {
+
+    private final Timeout askTimeout;
+    private final RemoteDeviceId id;
+    private final ActorRef masterNode;
+    private final ActorSystem actorSystem;
+
+    /**
+     * @param actorSystem system
+     * @param id          id
+     * @param masterNode  {@link org.opendaylight.netconf.topology.singleton.impl.actors.NetconfNodeActor} ref
+     * @param askTimeout ask timeout
+     */
+    public ProxyDOMDataBroker(final ActorSystem actorSystem, final RemoteDeviceId id,
+                              final ActorRef masterNode, final Timeout askTimeout) {
+        this.id = id;
+        this.masterNode = masterNode;
+        this.actorSystem = actorSystem;
+        this.askTimeout = askTimeout;
+    }
+
+    @Override
+    public DOMDataReadOnlyTransaction newReadOnlyTransaction() {
+        final Future<Object> txActorFuture = Patterns.ask(masterNode, new NewReadTransactionRequest(), askTimeout);
+        try {
+            final Object msg = Await.result(txActorFuture, askTimeout.duration());
+            if (msg instanceof Throwable) {
+                throw (Throwable) msg;
+            }
+            Preconditions.checkState(msg instanceof NewReadTransactionReply);
+            final NewReadTransactionReply reply = (NewReadTransactionReply) msg;
+            return new ProxyReadTransaction(reply.getTxActor(), id, actorSystem, askTimeout);
+        } catch (final Throwable t) {
+            throw new IllegalStateException("Can't create ProxyReadTransaction", t);
+        }
+    }
+
+    @Override
+    public DOMDataReadWriteTransaction newReadWriteTransaction() {
+        return new ReadWriteTx(newReadOnlyTransaction(), newWriteOnlyTransaction());
+    }
+
+    @Override
+    public DOMDataWriteTransaction newWriteOnlyTransaction() {
+        final Future<Object> txActorFuture = Patterns.ask(masterNode, new NewWriteTransactionRequest(), askTimeout);
+        try {
+            final Object msg = Await.result(txActorFuture, askTimeout.duration());
+            if (msg instanceof Throwable) {
+                throw (Throwable) msg;
+            }
+            Preconditions.checkState(msg instanceof NewWriteTransactionReply);
+            final NewWriteTransactionReply reply = (NewWriteTransactionReply) msg;
+            return new ProxyWriteTransaction(reply.getTxActor(), id, actorSystem, askTimeout);
+        } catch (final Throwable t) {
+            throw new IllegalStateException("Can't create ProxyWriteTransaction", t);
+        }
+    }
+
+    @Override
+    public ListenerRegistration<DOMDataChangeListener> registerDataChangeListener(
+            final LogicalDatastoreType store, final YangInstanceIdentifier path, final DOMDataChangeListener listener,
+            final DataChangeScope triggeringScope) {
+        throw new UnsupportedOperationException(id + ": Data change listeners not supported for netconf mount point");
+    }
+
+    @Override
+    public DOMTransactionChain createTransactionChain(final TransactionChainListener listener) {
+        throw new UnsupportedOperationException(id + ": Transaction chains not supported for netconf mount point");
+    }
+
+    @Nonnull
+    @Override
+    public Map<Class<? extends DOMDataBrokerExtension>, DOMDataBrokerExtension> getSupportedExtensions() {
+        return Collections.emptyMap();
+    }
+}
diff --git a/netconf/netconf-topology-singleton/src/main/java/org/opendaylight/netconf/topology/singleton/impl/RemoteOperationTxProcessorImpl.java b/netconf/netconf-topology-singleton/src/main/java/org/opendaylight/netconf/topology/singleton/impl/RemoteOperationTxProcessorImpl.java
deleted file mode 100644 (file)
index f19d0d5..0000000
+++ /dev/null
@@ -1,176 +0,0 @@
-/*
- * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.
- *
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,
- * and is available at http://www.eclipse.org/legal/epl-v10.html
- */
-
-package org.opendaylight.netconf.topology.singleton.impl;
-
-import akka.actor.ActorRef;
-import akka.actor.Status;
-import com.google.common.base.Optional;
-import com.google.common.util.concurrent.CheckedFuture;
-import com.google.common.util.concurrent.FutureCallback;
-import com.google.common.util.concurrent.Futures;
-import javax.annotation.Nonnull;
-import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
-import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
-import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException;
-import org.opendaylight.controller.md.sal.dom.api.DOMDataBroker;
-import org.opendaylight.controller.md.sal.dom.api.DOMDataReadOnlyTransaction;
-import org.opendaylight.controller.md.sal.dom.api.DOMDataWriteTransaction;
-import org.opendaylight.netconf.sal.connect.util.RemoteDeviceId;
-import org.opendaylight.netconf.topology.singleton.api.RemoteOperationTxProcessor;
-import org.opendaylight.netconf.topology.singleton.messages.NormalizedNodeMessage;
-import org.opendaylight.netconf.topology.singleton.messages.transactions.EmptyReadResponse;
-import org.opendaylight.netconf.topology.singleton.messages.transactions.SubmitFailedReply;
-import org.opendaylight.netconf.topology.singleton.messages.transactions.SubmitReply;
-import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
-import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class RemoteOperationTxProcessorImpl implements RemoteOperationTxProcessor, AutoCloseable {
-
-    private static final Logger LOG = LoggerFactory.getLogger(RemoteOperationTxProcessorImpl.class);
-
-    private final DOMDataBroker dataBroker;
-    private final RemoteDeviceId id;
-    private DOMDataWriteTransaction writeTx;
-    private DOMDataReadOnlyTransaction readTx;
-
-    private ActorRef currentUser = null;
-
-    public RemoteOperationTxProcessorImpl(final DOMDataBroker dataBroker, final RemoteDeviceId id) {
-        this.dataBroker = dataBroker;
-        this.id = id;
-        this.readTx = dataBroker.newReadOnlyTransaction();
-    }
-
-    @Override
-    public void doOpenTransaction(ActorRef recipient, ActorRef sender) {
-        if (currentUser != null) {
-            LOG.error("{}: Opening a new transaction for {} failed.", id, recipient);
-            recipient.tell(new Status.Failure(
-                    new IllegalStateException("Transaction is already opened for another user")), recipient);
-            return;
-        }
-
-        LOG.debug("{}: Opening a new transaction for {}", id, recipient);
-        currentUser = recipient;
-        recipient.tell(new Status.Success(null), sender);
-    }
-
-    @Override
-    public void doDelete(final LogicalDatastoreType store, final YangInstanceIdentifier path) {
-        if (writeTx == null) {
-            writeTx = dataBroker.newWriteOnlyTransaction();
-        }
-        writeTx.delete(store, path);
-    }
-
-    @Override
-    public void doSubmit(final ActorRef recipient, final ActorRef sender) {
-        currentUser = null;
-        if (writeTx != null) {
-            CheckedFuture<Void, TransactionCommitFailedException> submitFuture = writeTx.submit();
-            Futures.addCallback(submitFuture, new FutureCallback<Void>() {
-                @Override
-                public void onSuccess(Void result) {
-                    recipient.tell(new SubmitReply(), sender);
-                }
-
-                @Override
-                public void onFailure(@Nonnull Throwable throwable) {
-                    recipient.tell(throwable, sender);
-                }
-            });
-        } else {
-            recipient.tell(new SubmitFailedReply(), sender);
-            LOG.warn("{}: Couldn't submit transaction because it was already closed.", id);
-        }
-    }
-
-    @Override
-    public void doCancel(final ActorRef recipient, final ActorRef sender) {
-        currentUser = null;
-        boolean cancel = false;
-        if (writeTx != null) {
-            cancel = writeTx.cancel();
-        }
-        recipient.tell(cancel, sender);
-
-    }
-
-    @Override
-    public void doPut(final LogicalDatastoreType store, final NormalizedNodeMessage data) {
-        if (writeTx == null) {
-            writeTx = dataBroker.newWriteOnlyTransaction();
-        }
-        writeTx.put(store, data.getIdentifier(), data.getNode());
-    }
-
-    @Override
-    public void doMerge(final LogicalDatastoreType store, final NormalizedNodeMessage data) {
-        if (writeTx == null) {
-            writeTx = dataBroker.newWriteOnlyTransaction();
-        }
-        writeTx.merge(store, data.getIdentifier(), data.getNode());
-    }
-
-    @Override
-    public void doRead(final LogicalDatastoreType store, final YangInstanceIdentifier path, final ActorRef recipient,
-                       final ActorRef sender) {
-        final CheckedFuture<Optional<NormalizedNode<?,?>>, ReadFailedException> readFuture =
-                readTx.read(store, path);
-
-        Futures.addCallback(readFuture, new FutureCallback<Optional<NormalizedNode<?, ?>>>() {
-
-            @Override
-            public void onSuccess(final Optional<NormalizedNode<?, ?>> result) {
-                if (!result.isPresent()) {
-                    recipient.tell(new EmptyReadResponse(), sender);
-                    return;
-                }
-                recipient.tell(new NormalizedNodeMessage(path, result.get()), sender);
-            }
-
-            @Override
-            public void onFailure(@Nonnull final Throwable throwable) {
-                recipient.tell(throwable, sender);
-            }
-        });
-    }
-
-    @Override
-    public void doExists(final LogicalDatastoreType store, final YangInstanceIdentifier path, final ActorRef recipient,
-                         final ActorRef sender) {
-        final CheckedFuture<Boolean, ReadFailedException> readFuture =
-                readTx.exists(store, path);
-        Futures.addCallback(readFuture, new FutureCallback<Boolean>() {
-            @Override
-            public void onSuccess(final Boolean result) {
-                if (result == null) {
-                    recipient.tell(false, sender);
-                } else {
-                    recipient.tell(result, sender);
-                }
-            }
-
-            @Override
-            public void onFailure(@Nonnull final Throwable throwable) {
-                recipient.tell(throwable, sender);
-            }
-        });
-    }
-
-    @Override
-    public void close() throws Exception {
-        currentUser = null;
-        if (readTx != null) {
-            readTx.close();
-        }
-    }
-}
index fd1c28a0ade3aecedd083b572263c0ac943d2ed7..92d7e1a36ac6f8228075935ae6d32d0515c2a366 100644 (file)
@@ -16,8 +16,6 @@ import org.opendaylight.controller.sal.core.api.Broker;
 import org.opendaylight.netconf.sal.connect.netconf.sal.NetconfDeviceNotificationService;
 import org.opendaylight.netconf.sal.connect.netconf.sal.NetconfDeviceSalProvider;
 import org.opendaylight.netconf.sal.connect.util.RemoteDeviceId;
-import org.opendaylight.netconf.topology.singleton.api.NetconfDOMTransaction;
-import org.opendaylight.netconf.topology.singleton.impl.tx.NetconfProxyDOMTransaction;
 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -53,11 +51,8 @@ public class SlaveSalFacade {
                                         final ActorRef masterActorRef) {
         final NetconfDeviceNotificationService notificationService = new NetconfDeviceNotificationService();
 
-        final NetconfDOMTransaction proxyDOMTransactions =
-                new NetconfProxyDOMTransaction(id, actorSystem, masterActorRef, actorResponseWaitTime);
-
-        final NetconfDOMDataBroker netconfDeviceDataBroker =
-                new NetconfDOMDataBroker(actorSystem, id, proxyDOMTransactions);
+        final ProxyDOMDataBroker netconfDeviceDataBroker =
+                new ProxyDOMDataBroker(actorSystem, id, masterActorRef, actorResponseWaitTime);
 
         salProvider.getMountInstance().onTopologyDeviceConnected(remoteSchemaContext, netconfDeviceDataBroker,
                 deviceRpc, notificationService);
index ba8f11fdc07d69fb006d0db31de6ebf5f6fb9f4a..9828e2476fcf7931295fbad29943b067e623548f 100644 (file)
@@ -22,14 +22,15 @@ import javax.annotation.Nullable;
 import org.opendaylight.controller.cluster.schema.provider.RemoteYangTextSourceProvider;
 import org.opendaylight.controller.cluster.schema.provider.impl.RemoteSchemaProvider;
 import org.opendaylight.controller.cluster.schema.provider.impl.YangTextSchemaSourceSerializationProxy;
+import org.opendaylight.controller.md.sal.dom.api.DOMDataBroker;
+import org.opendaylight.controller.md.sal.dom.api.DOMDataReadOnlyTransaction;
+import org.opendaylight.controller.md.sal.dom.api.DOMDataWriteTransaction;
 import org.opendaylight.controller.md.sal.dom.api.DOMRpcException;
 import org.opendaylight.controller.md.sal.dom.api.DOMRpcResult;
 import org.opendaylight.controller.md.sal.dom.api.DOMRpcService;
 import org.opendaylight.netconf.sal.connect.util.RemoteDeviceId;
-import org.opendaylight.netconf.topology.singleton.api.RemoteOperationTxProcessor;
 import org.opendaylight.netconf.topology.singleton.impl.ProxyDOMRpcService;
 import org.opendaylight.netconf.topology.singleton.impl.ProxyYangTextSourceProvider;
-import org.opendaylight.netconf.topology.singleton.impl.RemoteOperationTxProcessorImpl;
 import org.opendaylight.netconf.topology.singleton.impl.SlaveSalFacade;
 import org.opendaylight.netconf.topology.singleton.impl.utils.NetconfTopologySetup;
 import org.opendaylight.netconf.topology.singleton.messages.AskForMasterMountPoint;
@@ -42,16 +43,11 @@ import org.opendaylight.netconf.topology.singleton.messages.UnregisterSlaveMount
 import org.opendaylight.netconf.topology.singleton.messages.YangTextSchemaSourceRequest;
 import org.opendaylight.netconf.topology.singleton.messages.rpc.InvokeRpcMessage;
 import org.opendaylight.netconf.topology.singleton.messages.rpc.InvokeRpcMessageReply;
-import org.opendaylight.netconf.topology.singleton.messages.transactions.CancelRequest;
-import org.opendaylight.netconf.topology.singleton.messages.transactions.DeleteRequest;
 import org.opendaylight.netconf.topology.singleton.messages.transactions.EmptyResultResponse;
-import org.opendaylight.netconf.topology.singleton.messages.transactions.ExistsRequest;
-import org.opendaylight.netconf.topology.singleton.messages.transactions.MergeRequest;
-import org.opendaylight.netconf.topology.singleton.messages.transactions.OpenTransaction;
-import org.opendaylight.netconf.topology.singleton.messages.transactions.PutRequest;
-import org.opendaylight.netconf.topology.singleton.messages.transactions.ReadRequest;
-import org.opendaylight.netconf.topology.singleton.messages.transactions.SubmitRequest;
-import org.opendaylight.netconf.topology.singleton.messages.transactions.TransactionRequest;
+import org.opendaylight.netconf.topology.singleton.messages.transactions.NewReadTransactionReply;
+import org.opendaylight.netconf.topology.singleton.messages.transactions.NewReadTransactionRequest;
+import org.opendaylight.netconf.topology.singleton.messages.transactions.NewWriteTransactionReply;
+import org.opendaylight.netconf.topology.singleton.messages.transactions.NewWriteTransactionRequest;
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
 import org.opendaylight.yangtools.yang.model.api.SchemaPath;
@@ -76,11 +72,13 @@ public class NetconfNodeActor extends UntypedActor {
     private final SchemaSourceRegistry schemaRegistry;
     private final SchemaRepository schemaRepository;
 
-    private RemoteOperationTxProcessor operationsProcessor;
     private List<SourceIdentifier> sourceIdentifiers;
     private DOMRpcService deviceRpc;
     private SlaveSalFacade slaveSalManager;
     private final Timeout actorResponseWaitTime;
+    private DOMDataBroker deviceDataBroker;
+    //readTxActor can be shared
+    private ActorRef readTxActor;
 
     public static Props props(final NetconfTopologySetup setup,
                               final RemoteDeviceId id, final SchemaSourceRegistry schemaRegistry,
@@ -103,11 +101,12 @@ public class NetconfNodeActor extends UntypedActor {
     public void onReceive(final Object message) throws Exception {
         if (message instanceof CreateInitialMasterActorData) { // master
 
-            sourceIdentifiers = ((CreateInitialMasterActorData) message).getSourceIndentifiers();
-            operationsProcessor =
-                    new RemoteOperationTxProcessorImpl(((CreateInitialMasterActorData) message).getDeviceDataBroker(),
-                            id);
-            this.deviceRpc = ((CreateInitialMasterActorData) message).getDeviceRpc();
+            final CreateInitialMasterActorData masterActorData = (CreateInitialMasterActorData) message;
+            sourceIdentifiers = masterActorData.getSourceIndentifiers();
+            this.deviceDataBroker = masterActorData.getDeviceDataBroker();
+            final DOMDataReadOnlyTransaction tx = deviceDataBroker.newReadOnlyTransaction();
+            readTxActor = context().actorOf(ReadTransactionActor.props(tx));
+            this.deviceRpc = masterActorData.getDeviceRpc();
 
             sender().tell(new MasterActorDataInitialized(), self());
 
@@ -118,21 +117,30 @@ public class NetconfNodeActor extends UntypedActor {
             id = ((RefreshSetupMasterActorData) message).getRemoteDeviceId();
             sender().tell(new MasterActorDataInitialized(), self());
         } else if (message instanceof AskForMasterMountPoint) { // master
-            // only master contains reference to operations processor
-            if (operationsProcessor != null) {
+            // only master contains reference to deviceDataBroker
+            if (deviceDataBroker != null) {
                 getSender().tell(new RegisterMountPoint(sourceIdentifiers), getSelf());
             }
 
-        } else if (message instanceof TransactionRequest) { // master
-
-            resolveProxyCalls(message, sender(), getSelf());
-
         } else if (message instanceof YangTextSchemaSourceRequest) { // master
 
             final YangTextSchemaSourceRequest yangTextSchemaSourceRequest = (YangTextSchemaSourceRequest) message;
             sendYangTextSchemaSourceProxy(yangTextSchemaSourceRequest.getSourceIdentifier(), sender());
 
-        } else if (message instanceof InvokeRpcMessage) {
+        } else if (message instanceof NewReadTransactionRequest) { // master
+
+            sender().tell(new NewReadTransactionReply(readTxActor), self());
+
+        } else if (message instanceof NewWriteTransactionRequest) { // master
+            try {
+                final DOMDataWriteTransaction tx = deviceDataBroker.newWriteOnlyTransaction();
+                final ActorRef txActor = context().actorOf(WriteTransactionActor.props(tx));
+                sender().tell(new NewWriteTransactionReply(txActor), self());
+            } catch (final Throwable t) {
+                sender().tell(t, self());
+            }
+
+        } else if (message instanceof InvokeRpcMessage) { // master
 
             final InvokeRpcMessage invokeRpcMessage = ((InvokeRpcMessage) message);
             invokeSlaveRpc(invokeRpcMessage.getSchemaPath(), invokeRpcMessage.getNormalizedNodeMessage(), sender());
@@ -151,43 +159,6 @@ public class NetconfNodeActor extends UntypedActor {
         }
     }
 
-    private void resolveProxyCalls(final Object message, final ActorRef recipient, final ActorRef futureSender) {
-        if (message instanceof OpenTransaction) {
-            operationsProcessor.doOpenTransaction(recipient, futureSender);
-        } else if (message instanceof ReadRequest) {
-
-            final ReadRequest readRequest = (ReadRequest) message;
-            operationsProcessor.doRead(readRequest.getStore(), readRequest.getPath(), recipient, futureSender);
-
-        } else if (message instanceof ExistsRequest) {
-
-            final ExistsRequest readRequest = (ExistsRequest) message;
-            operationsProcessor.doExists(readRequest.getStore(), readRequest.getPath(), recipient, futureSender);
-
-        } else if (message instanceof MergeRequest) {
-
-            final MergeRequest mergeRequest = (MergeRequest) message;
-            operationsProcessor.doMerge(mergeRequest.getStore(), mergeRequest.getNormalizedNodeMessage());
-
-        } else if (message instanceof PutRequest) {
-
-            final PutRequest putRequest = (PutRequest) message;
-            operationsProcessor.doPut(putRequest.getStore(), putRequest.getNormalizedNodeMessage());
-
-        } else if (message instanceof DeleteRequest) {
-
-            final DeleteRequest deleteRequest = (DeleteRequest) message;
-            operationsProcessor.doDelete(deleteRequest.getStore(), deleteRequest.getPath());
-
-        } else if (message instanceof CancelRequest) {
-
-            operationsProcessor.doCancel(recipient, futureSender);
-
-        } else if (message instanceof SubmitRequest) {
-
-            operationsProcessor.doSubmit(recipient, futureSender);
-        }
-    }
 
     private void sendYangTextSchemaSourceProxy(final SourceIdentifier sourceIdentifier, final ActorRef sender) {
         final CheckedFuture<YangTextSchemaSource, SchemaSourceException> yangTextSchemaSource =
diff --git a/netconf/netconf-topology-singleton/src/main/java/org/opendaylight/netconf/topology/singleton/impl/actors/ReadTransactionActor.java b/netconf/netconf-topology-singleton/src/main/java/org/opendaylight/netconf/topology/singleton/impl/actors/ReadTransactionActor.java
new file mode 100644 (file)
index 0000000..b6bf651
--- /dev/null
@@ -0,0 +1,110 @@
+/*
+ * Copyright (c) 2017 Pantheon Technologies s.r.o. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.netconf.topology.singleton.impl.actors;
+
+import akka.actor.ActorRef;
+import akka.actor.Props;
+import akka.actor.UntypedActor;
+import com.google.common.base.Optional;
+import com.google.common.util.concurrent.CheckedFuture;
+import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.Futures;
+import javax.annotation.Nonnull;
+import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
+import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
+import org.opendaylight.controller.md.sal.dom.api.DOMDataReadOnlyTransaction;
+import org.opendaylight.netconf.topology.singleton.messages.NormalizedNodeMessage;
+import org.opendaylight.netconf.topology.singleton.messages.transactions.EmptyReadResponse;
+import org.opendaylight.netconf.topology.singleton.messages.transactions.ExistsRequest;
+import org.opendaylight.netconf.topology.singleton.messages.transactions.ReadRequest;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
+import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
+
+/**
+ * ReadTransactionActor is an interface to device's {@link DOMDataReadOnlyTransaction} for cluster nodes.
+ */
+public class ReadTransactionActor extends UntypedActor {
+
+    private final DOMDataReadOnlyTransaction tx;
+
+    /**
+     * Creates new actor Props.
+     *
+     * @param tx delegate device read transaction
+     * @return props
+     */
+    static Props props(final DOMDataReadOnlyTransaction tx) {
+        return Props.create(ReadTransactionActor.class, () -> new ReadTransactionActor(tx));
+    }
+
+    private ReadTransactionActor(final DOMDataReadOnlyTransaction tx) {
+        this.tx = tx;
+    }
+
+    @Override
+    public void onReceive(final Object message) throws Throwable {
+        if (message instanceof ReadRequest) {
+
+            final ReadRequest readRequest = (ReadRequest) message;
+            final YangInstanceIdentifier path = readRequest.getPath();
+            final LogicalDatastoreType store = readRequest.getStore();
+            read(path, store, sender(), self());
+
+        } else if (message instanceof ExistsRequest) {
+            final ExistsRequest readRequest = (ExistsRequest) message;
+            final YangInstanceIdentifier path = readRequest.getPath();
+            final LogicalDatastoreType store = readRequest.getStore();
+            exists(path, store, sender(), self());
+
+        } else {
+            unhandled(message);
+        }
+    }
+
+    private void read(final YangInstanceIdentifier path, final LogicalDatastoreType store, final ActorRef sender,
+                      final ActorRef self) {
+        final CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException> read = tx.read(store, path);
+        Futures.addCallback(read, new FutureCallback<Optional<NormalizedNode<?, ?>>>() {
+
+            @Override
+            public void onSuccess(final Optional<NormalizedNode<?, ?>> result) {
+                if (!result.isPresent()) {
+                    sender.tell(new EmptyReadResponse(), self);
+                    return;
+                }
+                sender.tell(new NormalizedNodeMessage(path, result.get()), self);
+            }
+
+            @Override
+            public void onFailure(@Nonnull final Throwable throwable) {
+                sender.tell(throwable, self);
+            }
+        });
+    }
+
+    private void exists(final YangInstanceIdentifier path, final LogicalDatastoreType store, final ActorRef sender,
+                        final ActorRef self) {
+        final CheckedFuture<Boolean, ReadFailedException> readFuture = tx.exists(store, path);
+        Futures.addCallback(readFuture, new FutureCallback<Boolean>() {
+            @Override
+            public void onSuccess(final Boolean result) {
+                if (result == null) {
+                    sender.tell(false, self);
+                } else {
+                    sender.tell(result, self);
+                }
+            }
+
+            @Override
+            public void onFailure(@Nonnull final Throwable throwable) {
+                sender.tell(throwable, self);
+            }
+        });
+    }
+}
diff --git a/netconf/netconf-topology-singleton/src/main/java/org/opendaylight/netconf/topology/singleton/impl/actors/WriteTransactionActor.java b/netconf/netconf-topology-singleton/src/main/java/org/opendaylight/netconf/topology/singleton/impl/actors/WriteTransactionActor.java
new file mode 100644 (file)
index 0000000..5350b64
--- /dev/null
@@ -0,0 +1,93 @@
+/*
+ * Copyright (c) 2017 Pantheon Technologies s.r.o. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.netconf.topology.singleton.impl.actors;
+
+import akka.actor.ActorRef;
+import akka.actor.Props;
+import akka.actor.UntypedActor;
+import com.google.common.util.concurrent.CheckedFuture;
+import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.Futures;
+import javax.annotation.Nonnull;
+import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException;
+import org.opendaylight.controller.md.sal.dom.api.DOMDataReadOnlyTransaction;
+import org.opendaylight.controller.md.sal.dom.api.DOMDataWriteTransaction;
+import org.opendaylight.netconf.topology.singleton.messages.NormalizedNodeMessage;
+import org.opendaylight.netconf.topology.singleton.messages.transactions.CancelRequest;
+import org.opendaylight.netconf.topology.singleton.messages.transactions.DeleteRequest;
+import org.opendaylight.netconf.topology.singleton.messages.transactions.MergeRequest;
+import org.opendaylight.netconf.topology.singleton.messages.transactions.PutRequest;
+import org.opendaylight.netconf.topology.singleton.messages.transactions.SubmitReply;
+import org.opendaylight.netconf.topology.singleton.messages.transactions.SubmitRequest;
+
+/**
+ * WriteTransactionActor is an interface to device's {@link DOMDataReadOnlyTransaction} for cluster nodes.
+ */
+public class WriteTransactionActor extends UntypedActor {
+
+    private final DOMDataWriteTransaction tx;
+
+    /**
+     * Creates new actor Props.
+     *
+     * @param tx delegate device write transaction
+     * @return props
+     */
+    static Props props(final DOMDataWriteTransaction tx) {
+        return Props.create(WriteTransactionActor.class, () -> new WriteTransactionActor(tx));
+    }
+
+    private WriteTransactionActor(final DOMDataWriteTransaction tx) {
+        this.tx = tx;
+    }
+
+    @Override
+    public void onReceive(final Object message) throws Throwable {
+        if (message instanceof MergeRequest) {
+            final MergeRequest mergeRequest = (MergeRequest) message;
+            final NormalizedNodeMessage data = mergeRequest.getNormalizedNodeMessage();
+            tx.merge(mergeRequest.getStore(), data.getIdentifier(), data.getNode());
+        } else if (message instanceof PutRequest) {
+            final PutRequest putRequest = (PutRequest) message;
+            final NormalizedNodeMessage data = putRequest.getNormalizedNodeMessage();
+            tx.put(putRequest.getStore(), data.getIdentifier(), data.getNode());
+        } else if (message instanceof DeleteRequest) {
+            final DeleteRequest deleteRequest = (DeleteRequest) message;
+            tx.delete(deleteRequest.getStore(), deleteRequest.getPath());
+        } else if (message instanceof CancelRequest) {
+            cancel();
+        } else if (message instanceof SubmitRequest) {
+            submit(sender(), self());
+        } else {
+            unhandled(message);
+        }
+    }
+
+    private void cancel() {
+        final boolean cancelled = tx.cancel();
+        sender().tell(cancelled, self());
+        context().stop(self());
+    }
+
+    private void submit(final ActorRef requester, final ActorRef self) {
+        final CheckedFuture<Void, TransactionCommitFailedException> submitFuture = tx.submit();
+        context().stop(self);
+        Futures.addCallback(submitFuture, new FutureCallback<Void>() {
+            @Override
+            public void onSuccess(final Void result) {
+                requester.tell(new SubmitReply(), self);
+            }
+
+            @Override
+            public void onFailure(@Nonnull final Throwable throwable) {
+                requester.tell(throwable, self);
+            }
+        });
+    }
+}
diff --git a/netconf/netconf-topology-singleton/src/main/java/org/opendaylight/netconf/topology/singleton/impl/tx/NetconfMasterDOMTransaction.java b/netconf/netconf-topology-singleton/src/main/java/org/opendaylight/netconf/topology/singleton/impl/tx/NetconfMasterDOMTransaction.java
deleted file mode 100644 (file)
index 269ee2f..0000000
+++ /dev/null
@@ -1,184 +0,0 @@
-/*
- * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.
- *
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,
- * and is available at http://www.eclipse.org/legal/epl-v10.html
- */
-
-package org.opendaylight.netconf.topology.singleton.impl.tx;
-
-import com.google.common.base.Optional;
-import com.google.common.base.Preconditions;
-import com.google.common.util.concurrent.CheckedFuture;
-import com.google.common.util.concurrent.FutureCallback;
-import com.google.common.util.concurrent.Futures;
-import javax.annotation.Nonnull;
-import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
-import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
-import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException;
-import org.opendaylight.controller.md.sal.dom.api.DOMDataBroker;
-import org.opendaylight.controller.md.sal.dom.api.DOMDataReadOnlyTransaction;
-import org.opendaylight.controller.md.sal.dom.api.DOMDataWriteTransaction;
-import org.opendaylight.controller.md.sal.dom.api.DOMRpcService;
-import org.opendaylight.netconf.sal.connect.netconf.listener.NetconfSessionPreferences;
-import org.opendaylight.netconf.sal.connect.netconf.sal.NetconfDeviceDataBroker;
-import org.opendaylight.netconf.sal.connect.util.RemoteDeviceId;
-import org.opendaylight.netconf.topology.singleton.api.NetconfDOMTransaction;
-import org.opendaylight.netconf.topology.singleton.messages.NormalizedNodeMessage;
-import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
-import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
-import org.opendaylight.yangtools.yang.model.api.SchemaContext;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import scala.concurrent.Future;
-import scala.concurrent.impl.Promise.DefaultPromise;
-
-public class NetconfMasterDOMTransaction implements NetconfDOMTransaction {
-
-    private static final Logger LOG = LoggerFactory.getLogger(NetconfMasterDOMTransaction.class);
-
-    private final RemoteDeviceId id;
-    private final DOMDataBroker delegateBroker;
-
-    private DOMDataReadOnlyTransaction readTx;
-    private DOMDataWriteTransaction writeTx;
-
-    public NetconfMasterDOMTransaction(final RemoteDeviceId id,
-                                       final SchemaContext schemaContext,
-                                       final DOMRpcService rpc,
-                                       final NetconfSessionPreferences netconfSessionPreferences) {
-        this(id, new NetconfDeviceDataBroker(id, schemaContext, rpc, netconfSessionPreferences));
-    }
-
-    public NetconfMasterDOMTransaction(final RemoteDeviceId id, final DOMDataBroker delegateBroker) {
-        this.id = id;
-        this.delegateBroker = delegateBroker;
-
-        // only ever need 1 readTx since it doesnt need to be closed
-        readTx = delegateBroker.newReadOnlyTransaction();
-    }
-
-    @Override
-    public void openTransaction() {
-        // TODO We don't have to do anything here since
-        // NetconfProxyDOMTransactions and RemoteOperationTxProcessor do all
-        // the work regarding opening transactions. But maybe we should check
-        // for open transaction here instead in RemoteOperationTxProcessor
-    }
-
-    @Override
-    public Future<Optional<NormalizedNodeMessage>> read(final LogicalDatastoreType store,
-                                                        final YangInstanceIdentifier path) {
-        LOG.trace("{}: Read[{}] {} via NETCONF: {}", id, readTx.getIdentifier(), store, path);
-
-        final CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException> readFuture = readTx.read(store, path);
-
-        final DefaultPromise<Optional<NormalizedNodeMessage>> promise = new DefaultPromise<>();
-        Futures.addCallback(readFuture, new FutureCallback<Optional<NormalizedNode<?, ?>>>() {
-            @Override
-            public void onSuccess(final Optional<NormalizedNode<?, ?>> result) {
-                if (!result.isPresent()) {
-                    promise.success(Optional.absent());
-                } else {
-                    promise.success(Optional.of(new NormalizedNodeMessage(path, result.get())));
-                }
-            }
-
-            @Override
-            public void onFailure(@Nonnull final Throwable throwable) {
-                promise.failure(throwable);
-            }
-        });
-        return promise.future();
-    }
-
-    @Override
-    public Future<Boolean> exists(final LogicalDatastoreType store, final YangInstanceIdentifier path) {
-        LOG.trace("{}: Exists[{}] {} via NETCONF: {}", id, readTx.getIdentifier(), store, path);
-
-        final CheckedFuture<Boolean, ReadFailedException> existsFuture = readTx.exists(store, path);
-
-        final DefaultPromise<Boolean> promise = new DefaultPromise<>();
-        Futures.addCallback(existsFuture, new FutureCallback<Boolean>() {
-            @Override
-            public void onSuccess(final Boolean result) {
-                promise.success(result);
-            }
-
-            @Override
-            public void onFailure(@Nonnull final Throwable throwable) {
-                promise.failure(throwable);
-            }
-        });
-        return promise.future();
-    }
-
-    @Override
-    public void put(final LogicalDatastoreType store, final NormalizedNodeMessage data) {
-        if (writeTx == null) {
-            writeTx = delegateBroker.newWriteOnlyTransaction();
-        }
-
-        LOG.trace("{}: Write[{}] {} via NETCONF: {} with payload {}", id, writeTx.getIdentifier(), store,
-                data.getIdentifier(), data.getNode());
-
-        writeTx.put(store, data.getIdentifier(), data.getNode());
-    }
-
-    @Override
-    public void merge(final LogicalDatastoreType store, final NormalizedNodeMessage data) {
-        if (writeTx == null) {
-            writeTx = delegateBroker.newWriteOnlyTransaction();
-        }
-
-        LOG.trace("{}: Merge[{}] {} via NETCONF: {} with payload {}", id, writeTx.getIdentifier(),store,
-                data.getIdentifier(), data.getNode());
-
-        writeTx.merge(store, data.getIdentifier(), data.getNode());
-    }
-
-    @Override
-    public void delete(final LogicalDatastoreType store, final YangInstanceIdentifier path) {
-        if (writeTx == null) {
-            writeTx = delegateBroker.newWriteOnlyTransaction();
-        }
-
-        LOG.trace("{}: Delete[{}} {} via NETCONF: {}", id, writeTx.getIdentifier(), store, path);
-
-        writeTx.delete(store, path);
-    }
-
-    @Override
-    public boolean cancel() {
-        LOG.trace("{}: Cancel[{}} via NETCONF", id, writeTx.getIdentifier());
-        try {
-            return writeTx.cancel();
-        } finally {
-            writeTx = null;
-        }
-    }
-
-    @Override
-    public Future<Void> submit() {
-        LOG.trace("{}: Submit[{}} via NETCONF", id, writeTx.getIdentifier());
-
-        final CheckedFuture<Void, TransactionCommitFailedException> submitFuture = writeTx.submit();
-        writeTx = null;
-
-        final DefaultPromise<Void> promise = new DefaultPromise<>();
-        Futures.addCallback(submitFuture, new FutureCallback<Void>() {
-            @Override
-            public void onSuccess(final Void result) {
-                promise.success(result);
-            }
-
-            @Override
-            public void onFailure(@Nonnull final Throwable throwable) {
-                promise.failure(throwable);
-            }
-        });
-        return promise.future();
-    }
-
-}
diff --git a/netconf/netconf-topology-singleton/src/main/java/org/opendaylight/netconf/topology/singleton/impl/tx/NetconfProxyDOMTransaction.java b/netconf/netconf-topology-singleton/src/main/java/org/opendaylight/netconf/topology/singleton/impl/tx/NetconfProxyDOMTransaction.java
deleted file mode 100644 (file)
index 535123b..0000000
+++ /dev/null
@@ -1,217 +0,0 @@
-/*
- * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.
- *
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,
- * and is available at http://www.eclipse.org/legal/epl-v10.html
- */
-
-package org.opendaylight.netconf.topology.singleton.impl.tx;
-
-import akka.actor.ActorRef;
-import akka.actor.ActorSystem;
-import akka.dispatch.OnComplete;
-import akka.pattern.Patterns;
-import akka.util.Timeout;
-import com.google.common.base.Optional;
-import com.google.common.base.Throwables;
-import org.opendaylight.controller.config.util.xml.DocumentedException;
-import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
-import org.opendaylight.netconf.sal.connect.util.RemoteDeviceId;
-import org.opendaylight.netconf.topology.singleton.api.NetconfDOMTransaction;
-import org.opendaylight.netconf.topology.singleton.messages.NormalizedNodeMessage;
-import org.opendaylight.netconf.topology.singleton.messages.transactions.CancelRequest;
-import org.opendaylight.netconf.topology.singleton.messages.transactions.DeleteRequest;
-import org.opendaylight.netconf.topology.singleton.messages.transactions.EmptyReadResponse;
-import org.opendaylight.netconf.topology.singleton.messages.transactions.ExistsRequest;
-import org.opendaylight.netconf.topology.singleton.messages.transactions.MergeRequest;
-import org.opendaylight.netconf.topology.singleton.messages.transactions.OpenTransaction;
-import org.opendaylight.netconf.topology.singleton.messages.transactions.PutRequest;
-import org.opendaylight.netconf.topology.singleton.messages.transactions.ReadRequest;
-import org.opendaylight.netconf.topology.singleton.messages.transactions.SubmitFailedReply;
-import org.opendaylight.netconf.topology.singleton.messages.transactions.SubmitRequest;
-import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import scala.concurrent.Await;
-import scala.concurrent.Future;
-import scala.concurrent.impl.Promise.DefaultPromise;
-
-
-public class NetconfProxyDOMTransaction implements NetconfDOMTransaction {
-
-    private static final Logger LOG = LoggerFactory.getLogger(NetconfProxyDOMTransaction.class);
-
-    private final RemoteDeviceId id;
-    private final ActorSystem actorSystem;
-    private final ActorRef masterContextRef;
-    private final Timeout actorResponseWaitTime;
-
-    public NetconfProxyDOMTransaction(final RemoteDeviceId id,
-                                      final ActorSystem actorSystem,
-                                      final ActorRef masterContextRef,
-                                      final Timeout actorResponseWaitTime) {
-        this.id = id;
-        this.actorSystem = actorSystem;
-        this.masterContextRef = masterContextRef;
-        this.actorResponseWaitTime = actorResponseWaitTime;
-    }
-
-    @Override
-    public void openTransaction() {
-        // TODO we can do some checking for already opened transaction also
-        // here in this class. We can track open transaction at least for this
-        // node.
-        LOG.debug("{}: Requesting leader {} to open new transaction", id, masterContextRef);
-        final Future<Object> openTxFuture =
-                Patterns.ask(masterContextRef, new OpenTransaction(), actorResponseWaitTime);
-        try {
-            // we have to wait here so we can see if tx can be opened
-            Await.result(openTxFuture, actorResponseWaitTime.duration());
-            LOG.debug("{}: New transaction opened successfully", id);
-        } catch (final Exception e) {
-            LOG.error("{}: Failed to open new transaction", id, e);
-            Throwables.propagate(e);
-        }
-    }
-
-    @Override
-    public Future<Optional<NormalizedNodeMessage>> read(final LogicalDatastoreType store,
-                                                        final YangInstanceIdentifier path) {
-
-        final Future<Object> readScalaFuture =
-                Patterns.ask(masterContextRef, new ReadRequest(store, path), actorResponseWaitTime);
-
-        LOG.trace("{}: Read {} via NETCONF: {}", id, store, path);
-
-        final DefaultPromise<Optional<NormalizedNodeMessage>> promise = new DefaultPromise<>();
-
-        readScalaFuture.onComplete(new OnComplete<Object>() {
-            @Override
-            public void onComplete(final Throwable failure, final Object success) throws Throwable {
-                if (failure != null) { // ask timeout
-                    final Exception exception = new DocumentedException(
-                            id + ":Master is down. Please try again.",
-                            DocumentedException.ErrorType.TRANSPORT,
-                            DocumentedException.ErrorTag.RESOURCE_DENIED,
-                            DocumentedException.ErrorSeverity.ERROR);
-                    promise.failure(exception);
-                    return;
-                }
-                if (success instanceof Throwable) { // Error sended by master
-                    promise.failure((Throwable) success);
-                    return;
-                }
-                if (success instanceof EmptyReadResponse) {
-                    promise.success(Optional.absent());
-                    return;
-                }
-                promise.success(Optional.of((NormalizedNodeMessage) success));
-            }
-        }, actorSystem.dispatcher());
-
-        return promise.future();
-    }
-
-    @Override
-    public Future<Boolean> exists(final LogicalDatastoreType store, final YangInstanceIdentifier path) {
-        final Future<Object> existsScalaFuture =
-                Patterns.ask(masterContextRef, new ExistsRequest(store, path), actorResponseWaitTime);
-
-        LOG.trace("{}: Exists {} via NETCONF: {}", id, store, path);
-
-        final DefaultPromise<Boolean> promise = new DefaultPromise<>();
-        existsScalaFuture.onComplete(new OnComplete<Object>() {
-            @Override
-            public void onComplete(final Throwable failure, final Object success) throws Throwable {
-                if (failure != null) { // ask timeout
-                    final Exception exception = new DocumentedException(
-                            id + ":Master is down. Please try again.",
-                            DocumentedException.ErrorType.TRANSPORT,
-                            DocumentedException.ErrorTag.RESOURCE_DENIED,
-                            DocumentedException.ErrorSeverity.ERROR);
-                    promise.failure(exception);
-                    return;
-                }
-                if (success instanceof Throwable) {
-                    promise.failure((Throwable) success);
-                    return;
-                }
-                promise.success((Boolean) success);
-            }
-        }, actorSystem.dispatcher());
-        return promise.future();
-    }
-
-    @Override
-    public void put(final LogicalDatastoreType store, final NormalizedNodeMessage data) {
-        LOG.trace("{}: Write {} via NETCONF: {} with payload {}", id, store, data.getIdentifier(), data.getNode());
-
-        masterContextRef.tell(new PutRequest(store, data), ActorRef.noSender());
-    }
-
-    @Override
-    public void merge(final LogicalDatastoreType store, final NormalizedNodeMessage data) {
-        LOG.trace("{}: Merge {} via NETCONF: {} with payload {}", id, store, data.getIdentifier(), data.getNode());
-
-        masterContextRef.tell(new MergeRequest(store, data), ActorRef.noSender());
-    }
-
-    @Override
-    public void delete(final LogicalDatastoreType store, final YangInstanceIdentifier path) {
-        LOG.trace("{}: Delete {} via NETCONF: {}", id, store, path);
-
-        masterContextRef.tell(new DeleteRequest(store, path), ActorRef.noSender());
-    }
-
-    @Override
-    public boolean cancel() {
-        final Future<Object> cancelScalaFuture =
-                Patterns.ask(masterContextRef, new CancelRequest(), actorResponseWaitTime);
-
-        LOG.trace("{}: Cancel {} via NETCONF", id);
-
-        try {
-            // here must be Await because AsyncWriteTransaction do not return future
-            return (boolean) Await.result(cancelScalaFuture, actorResponseWaitTime.duration());
-        } catch (Exception e) {
-            return false;
-        }
-    }
-
-    @Override
-    public Future<Void> submit() {
-        final Future<Object> submitScalaFuture =
-                Patterns.ask(masterContextRef, new SubmitRequest(), actorResponseWaitTime);
-
-        LOG.trace("{}: Submit {} via NETCONF", id);
-
-        final DefaultPromise<Void> promise = new DefaultPromise<>();
-
-        submitScalaFuture.onComplete(new OnComplete<Object>() {
-            @Override
-            public void onComplete(final Throwable failure, final Object success) throws Throwable {
-                if (failure != null) { // ask timeout
-                    final Exception exception = new DocumentedException(
-                            id + ":Master is down. Please try again.",
-                            DocumentedException.ErrorType.TRANSPORT,
-                            DocumentedException.ErrorTag.RESOURCE_DENIED,
-                            DocumentedException.ErrorSeverity.ERROR);
-                    promise.failure(exception);
-                    return;
-                }
-                if (success instanceof Throwable) {
-                    promise.failure((Throwable) success);
-                } else {
-                    if (success instanceof SubmitFailedReply) {
-                        LOG.error("{}: Transaction was not submitted because already closed.", id);
-                    }
-                    promise.success(null);
-                }
-            }
-        }, actorSystem.dispatcher());
-
-        return promise.future();
-    }
-
-}
diff --git a/netconf/netconf-topology-singleton/src/main/java/org/opendaylight/netconf/topology/singleton/impl/tx/NetconfReadOnlyTransaction.java b/netconf/netconf-topology-singleton/src/main/java/org/opendaylight/netconf/topology/singleton/impl/tx/NetconfReadOnlyTransaction.java
deleted file mode 100644 (file)
index 3048337..0000000
+++ /dev/null
@@ -1,142 +0,0 @@
-/*
- * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.
- *
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,
- * and is available at http://www.eclipse.org/legal/epl-v10.html
- */
-
-package org.opendaylight.netconf.topology.singleton.impl.tx;
-
-import akka.actor.ActorSystem;
-import akka.dispatch.OnComplete;
-import com.google.common.base.Function;
-import com.google.common.base.Optional;
-import com.google.common.util.concurrent.CheckedFuture;
-import com.google.common.util.concurrent.Futures;
-import com.google.common.util.concurrent.SettableFuture;
-import javax.annotation.Nullable;
-import org.opendaylight.controller.config.util.xml.DocumentedException;
-import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
-import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
-import org.opendaylight.controller.md.sal.dom.api.DOMDataReadOnlyTransaction;
-import org.opendaylight.netconf.sal.connect.util.RemoteDeviceId;
-import org.opendaylight.netconf.topology.singleton.api.NetconfDOMTransaction;
-import org.opendaylight.netconf.topology.singleton.messages.NormalizedNodeMessage;
-import org.opendaylight.yangtools.yang.common.RpcError;
-import org.opendaylight.yangtools.yang.common.RpcResultBuilder;
-import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
-import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import scala.concurrent.Future;
-
-public class NetconfReadOnlyTransaction implements DOMDataReadOnlyTransaction {
-
-    private static final Logger LOG = LoggerFactory.getLogger(NetconfReadOnlyTransaction.class);
-
-    private final RemoteDeviceId id;
-    private final NetconfDOMTransaction delegate;
-    private final ActorSystem actorSystem;
-
-    public NetconfReadOnlyTransaction(final RemoteDeviceId id,
-                                      final ActorSystem actorSystem,
-                                      final NetconfDOMTransaction delegate) {
-        this.id = id;
-        this.delegate = delegate;
-        this.actorSystem = actorSystem;
-    }
-
-    @Override
-    public void close() {
-        //NOOP
-    }
-
-    @Override
-    public CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException> read(final LogicalDatastoreType store,
-                                                                                   final YangInstanceIdentifier path) {
-
-        LOG.trace("{}: Read {} via NETCONF: {}", id, store, path);
-
-        final Future<Optional<NormalizedNodeMessage>> future = delegate.read(store, path);
-        final SettableFuture<Optional<NormalizedNode<?, ?>>> settableFuture = SettableFuture.create();
-        final CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException> checkedFuture;
-        checkedFuture = Futures.makeChecked(settableFuture, new Function<Exception, ReadFailedException>() {
-            @Nullable
-            @Override
-            public ReadFailedException apply(Exception cause) {
-                if (cause.getCause() instanceof DocumentedException) {
-                    final DocumentedException exception = (DocumentedException) cause.getCause();
-                    if (exception.getErrorSeverity() == DocumentedException.ErrorSeverity.ERROR &&
-                            exception.getErrorType() == DocumentedException.ErrorType.TRANSPORT &&
-                            exception.getErrorTag() == DocumentedException.ErrorTag.RESOURCE_DENIED) {
-                        final RpcError error = RpcResultBuilder.newError(
-                                RpcError.ErrorType.TRANSPORT,
-                                exception.getErrorTag().getTagValue(),
-                                exception.getMessage());
-                        return new ReadFailedException("Read from transaction failed", error);
-                    }
-                }
-                return new ReadFailedException("Read from transaction failed", cause);
-            }
-        });
-        future.onComplete(new OnComplete<Optional<NormalizedNodeMessage>>() {
-            @Override
-            public void onComplete(final Throwable throwable,
-                                   final Optional<NormalizedNodeMessage> normalizedNodeMessage) throws Throwable {
-                if (throwable == null) {
-                    if (normalizedNodeMessage.isPresent()) {
-                        settableFuture.set(normalizedNodeMessage.transform(new Function<NormalizedNodeMessage,
-                                NormalizedNode<?, ?>>() {
-
-                            @Nullable
-                            @Override
-                            public NormalizedNode<?, ?> apply(final NormalizedNodeMessage input) {
-                                return input.getNode();
-                            }
-                        }));
-                    } else {
-                        settableFuture.set(Optional.absent());
-                    }
-                } else {
-                    settableFuture.setException(throwable);
-                }
-            }
-        }, actorSystem.dispatcher());
-        return checkedFuture;
-    }
-
-    @Override
-    public CheckedFuture<Boolean, ReadFailedException> exists(final LogicalDatastoreType store,
-                                                              final YangInstanceIdentifier path) {
-
-        LOG.trace("{}: Exists {} via NETCONF: {}", id, store, path);
-
-        final Future<Boolean> existsFuture = delegate.exists(store, path);
-        final SettableFuture<Boolean> settableFuture = SettableFuture.create();
-        final CheckedFuture<Boolean, ReadFailedException> checkedFuture;
-        checkedFuture = Futures.makeChecked(settableFuture, new Function<Exception, ReadFailedException>() {
-            @Nullable
-            @Override
-            public ReadFailedException apply(Exception cause) {
-                return new ReadFailedException("Read from transaction failed", cause);
-            }
-        });
-        existsFuture.onComplete(new OnComplete<Boolean>() {
-            @Override
-            public void onComplete(final Throwable throwable, final Boolean result) throws Throwable {
-                if (throwable == null) {
-                    settableFuture.set(result);
-                } else {
-                    settableFuture.setException(throwable);
-                }
-            }
-        }, actorSystem.dispatcher());
-        return checkedFuture;
-    }
-
-    @Override
-    public Object getIdentifier() {
-        return this;
-    }
-}
diff --git a/netconf/netconf-topology-singleton/src/main/java/org/opendaylight/netconf/topology/singleton/impl/tx/NetconfWriteOnlyTransaction.java b/netconf/netconf-topology-singleton/src/main/java/org/opendaylight/netconf/topology/singleton/impl/tx/NetconfWriteOnlyTransaction.java
deleted file mode 100644 (file)
index 422cf7a..0000000
+++ /dev/null
@@ -1,134 +0,0 @@
-/*
- * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.
- *
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,
- * and is available at http://www.eclipse.org/legal/epl-v10.html
- */
-
-package org.opendaylight.netconf.topology.singleton.impl.tx;
-
-import akka.actor.ActorSystem;
-import akka.dispatch.OnComplete;
-import com.google.common.base.Function;
-import com.google.common.util.concurrent.CheckedFuture;
-import com.google.common.util.concurrent.Futures;
-import com.google.common.util.concurrent.ListenableFuture;
-import com.google.common.util.concurrent.SettableFuture;
-import javax.annotation.Nullable;
-import org.opendaylight.controller.md.sal.common.api.TransactionStatus;
-import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
-import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException;
-import org.opendaylight.controller.md.sal.dom.api.DOMDataWriteTransaction;
-import org.opendaylight.netconf.sal.connect.util.RemoteDeviceId;
-import org.opendaylight.netconf.topology.singleton.api.NetconfDOMTransaction;
-import org.opendaylight.netconf.topology.singleton.messages.NormalizedNodeMessage;
-import org.opendaylight.yangtools.yang.common.RpcResult;
-import org.opendaylight.yangtools.yang.common.RpcResultBuilder;
-import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
-import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import scala.concurrent.Future;
-
-public class NetconfWriteOnlyTransaction implements DOMDataWriteTransaction {
-
-    private static final Logger LOG = LoggerFactory.getLogger(NetconfWriteOnlyTransaction.class);
-
-    private final RemoteDeviceId id;
-    private final NetconfDOMTransaction delegate;
-    private final ActorSystem actorSystem;
-
-    public NetconfWriteOnlyTransaction(final RemoteDeviceId id,
-                                       final ActorSystem actorSystem,
-                                       final NetconfDOMTransaction delegate) {
-        this.id = id;
-        this.delegate = delegate;
-        this.actorSystem = actorSystem;
-
-        this.delegate.openTransaction();
-    }
-
-    @Override
-    public void put(final LogicalDatastoreType store, final YangInstanceIdentifier path,
-                    final NormalizedNode<?,?> data) {
-        LOG.trace("{}: Write {} via NETCONF: {} with payload {}", id, store, path, data);
-
-        delegate.put(store, new NormalizedNodeMessage(path, data));
-    }
-
-    @Override
-    public void merge(final LogicalDatastoreType store, final YangInstanceIdentifier path,
-                      final NormalizedNode<?,?> data) {
-        LOG.trace("{}: Merge {} via NETCONF: {} with payload {}", id, store, path, data);
-
-        delegate.merge(store, new NormalizedNodeMessage(path, data));
-    }
-
-    @Override
-    public boolean cancel() {
-        LOG.trace("{}: Cancel", id);
-
-        return delegate.cancel();
-    }
-
-    @Override
-    public void delete(final LogicalDatastoreType store, final YangInstanceIdentifier path) {
-        LOG.trace("{}: Delete {} via NETCONF: {}", id, store, path);
-
-        delegate.delete(store, path);
-    }
-
-    @Override
-    public CheckedFuture<Void, TransactionCommitFailedException> submit() {
-        LOG.trace("{}: Submit", id);
-
-        final Future<Void> submit = delegate.submit();
-        final SettableFuture<Void> settFuture = SettableFuture.create();
-        final CheckedFuture<Void, TransactionCommitFailedException> checkedFuture;
-        checkedFuture = Futures.makeChecked(settFuture, new Function<Exception, TransactionCommitFailedException>() {
-            @Nullable
-            @Override
-            public TransactionCommitFailedException apply(Exception input) {
-                return new TransactionCommitFailedException("Transaction commit failed", input);
-            }
-        });
-        submit.onComplete(new OnComplete<Void>() {
-            @Override
-            public void onComplete(Throwable throwable, Void object) throws Throwable {
-                if (throwable == null) {
-                    settFuture.set(object);
-                } else {
-                    settFuture.setException(throwable);
-                }
-            }
-        }, actorSystem.dispatcher());
-        return checkedFuture;
-    }
-
-    @Override
-    public ListenableFuture<RpcResult<TransactionStatus>> commit() {
-        LOG.trace("{}: Commit", id);
-
-        final Future<Void> commit = delegate.submit();
-        final SettableFuture<RpcResult<TransactionStatus>> settFuture = SettableFuture.create();
-        commit.onComplete(new OnComplete<Void>() {
-            @Override
-            public void onComplete(final Throwable throwable, final Void result) throws Throwable {
-                if (throwable == null) {
-                    TransactionStatus status = TransactionStatus.SUBMITED;
-                    RpcResult<TransactionStatus> rpcResult = RpcResultBuilder.success(status).build();
-                    settFuture.set(rpcResult);
-                } else {
-                    settFuture.setException(throwable);
-                }
-            }
-        }, actorSystem.dispatcher());
-        return settFuture;
-    }
-
-    @Override
-    public Object getIdentifier() {
-        return this;
-    }
-}
diff --git a/netconf/netconf-topology-singleton/src/main/java/org/opendaylight/netconf/topology/singleton/impl/tx/ProxyReadTransaction.java b/netconf/netconf-topology-singleton/src/main/java/org/opendaylight/netconf/topology/singleton/impl/tx/ProxyReadTransaction.java
new file mode 100644 (file)
index 0000000..0756f33
--- /dev/null
@@ -0,0 +1,132 @@
+/*
+ * Copyright (c) 2017 Pantheon Technologies s.r.o. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.netconf.topology.singleton.impl.tx;
+
+import akka.actor.ActorRef;
+import akka.actor.ActorSystem;
+import akka.dispatch.OnComplete;
+import akka.pattern.Patterns;
+import akka.util.Timeout;
+import com.google.common.base.Optional;
+import com.google.common.util.concurrent.CheckedFuture;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.SettableFuture;
+import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
+import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
+import org.opendaylight.controller.md.sal.dom.api.DOMDataReadOnlyTransaction;
+import org.opendaylight.netconf.sal.connect.util.RemoteDeviceId;
+import org.opendaylight.netconf.topology.singleton.impl.utils.NetconfTopologyUtils;
+import org.opendaylight.netconf.topology.singleton.messages.NormalizedNodeMessage;
+import org.opendaylight.netconf.topology.singleton.messages.transactions.EmptyReadResponse;
+import org.opendaylight.netconf.topology.singleton.messages.transactions.ExistsRequest;
+import org.opendaylight.netconf.topology.singleton.messages.transactions.ReadRequest;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
+import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.concurrent.Future;
+
+/**
+ * ProxyReadTransaction uses provided {@link ActorRef} to delegate method calls to master
+ * {@link org.opendaylight.netconf.topology.singleton.impl.actors.ReadTransactionActor}.
+ */
+public class ProxyReadTransaction implements DOMDataReadOnlyTransaction {
+
+    private static final Logger LOG = LoggerFactory.getLogger(ProxyReadTransaction.class);
+
+    private final ActorRef masterTxActor;
+    private final RemoteDeviceId id;
+    private final ActorSystem actorSystem;
+    private final Timeout askTimeout;
+
+    /**
+     * @param masterTxActor {@link org.opendaylight.netconf.topology.singleton.impl.actors.ReadTransactionActor} ref
+     * @param id            device id
+     * @param actorSystem   system
+     * @param askTimeout
+     */
+    public ProxyReadTransaction(final ActorRef masterTxActor, final RemoteDeviceId id, final ActorSystem actorSystem,
+                                final Timeout askTimeout) {
+        this.masterTxActor = masterTxActor;
+        this.id = id;
+        this.actorSystem = actorSystem;
+        this.askTimeout = askTimeout;
+    }
+
+    @Override
+    public void close() {
+        //noop
+    }
+
+    @Override
+    public CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException> read(final LogicalDatastoreType store,
+                                                                                   final YangInstanceIdentifier path) {
+        LOG.trace("{}: Read {} via NETCONF: {}", id, store, path);
+
+        final Future<Object> future = Patterns.ask(masterTxActor, new ReadRequest(store, path), askTimeout);
+        final SettableFuture<Optional<NormalizedNode<?, ?>>> settableFuture = SettableFuture.create();
+        future.onComplete(new OnComplete<Object>() {
+            @Override
+            public void onComplete(final Throwable failure,
+                                   final Object success) throws Throwable {
+                if (failure != null) { // ask timeout
+                    final Exception exception = NetconfTopologyUtils.createMasterIsDownException(id);
+                    settableFuture.setException(exception);
+                    return;
+                }
+                if (success instanceof Throwable) { // Error sended by master
+                    settableFuture.setException((Throwable) success);
+                    return;
+                }
+                if (success instanceof EmptyReadResponse) {
+                    settableFuture.set(Optional.absent());
+                    return;
+                }
+                if (success instanceof NormalizedNodeMessage) {
+                    final NormalizedNodeMessage data = (NormalizedNodeMessage) success;
+                    settableFuture.set(Optional.of(data.getNode()));
+                }
+            }
+        }, actorSystem.dispatcher());
+        return Futures.makeChecked(settableFuture, ReadFailedException.MAPPER);
+    }
+
+    @Override
+    public CheckedFuture<Boolean, ReadFailedException> exists(final LogicalDatastoreType store,
+                                                              final YangInstanceIdentifier path) {
+        final Future<Object> existsScalaFuture =
+                Patterns.ask(masterTxActor, new ExistsRequest(store, path), askTimeout);
+
+        LOG.trace("{}: Exists {} via NETCONF: {}", id, store, path);
+
+        final SettableFuture<Boolean> settableFuture = SettableFuture.create();
+        existsScalaFuture.onComplete(new OnComplete<Object>() {
+            @Override
+            public void onComplete(final Throwable failure, final Object success) throws Throwable {
+                if (failure != null) { // ask timeout
+                    final Exception exception = NetconfTopologyUtils.createMasterIsDownException(id);
+                    settableFuture.setException(exception);
+                    return;
+                }
+                if (success instanceof Throwable) {
+                    settableFuture.setException((Throwable) success);
+                    return;
+                }
+                settableFuture.set((Boolean) success);
+            }
+        }, actorSystem.dispatcher());
+        return Futures.makeChecked(settableFuture, ReadFailedException.MAPPER);
+    }
+
+
+    @Override
+    public Object getIdentifier() {
+        return this;
+    }
+}
diff --git a/netconf/netconf-topology-singleton/src/main/java/org/opendaylight/netconf/topology/singleton/impl/tx/ProxyWriteTransaction.java b/netconf/netconf-topology-singleton/src/main/java/org/opendaylight/netconf/topology/singleton/impl/tx/ProxyWriteTransaction.java
new file mode 100644 (file)
index 0000000..072f622
--- /dev/null
@@ -0,0 +1,175 @@
+/*
+ * Copyright (c) 2017 Pantheon Technologies s.r.o. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.netconf.topology.singleton.impl.tx;
+
+import akka.actor.ActorRef;
+import akka.actor.ActorSystem;
+import akka.dispatch.OnComplete;
+import akka.pattern.Patterns;
+import akka.util.Timeout;
+import com.google.common.base.Function;
+import com.google.common.base.Preconditions;
+import com.google.common.util.concurrent.CheckedFuture;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.SettableFuture;
+import java.util.concurrent.atomic.AtomicBoolean;
+import javax.annotation.Nullable;
+import org.opendaylight.controller.md.sal.common.api.TransactionStatus;
+import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
+import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException;
+import org.opendaylight.controller.md.sal.dom.api.DOMDataWriteTransaction;
+import org.opendaylight.netconf.sal.connect.util.RemoteDeviceId;
+import org.opendaylight.netconf.topology.singleton.impl.utils.NetconfTopologyUtils;
+import org.opendaylight.netconf.topology.singleton.messages.NormalizedNodeMessage;
+import org.opendaylight.netconf.topology.singleton.messages.transactions.CancelRequest;
+import org.opendaylight.netconf.topology.singleton.messages.transactions.DeleteRequest;
+import org.opendaylight.netconf.topology.singleton.messages.transactions.MergeRequest;
+import org.opendaylight.netconf.topology.singleton.messages.transactions.PutRequest;
+import org.opendaylight.netconf.topology.singleton.messages.transactions.SubmitFailedReply;
+import org.opendaylight.netconf.topology.singleton.messages.transactions.SubmitRequest;
+import org.opendaylight.yangtools.yang.common.RpcResult;
+import org.opendaylight.yangtools.yang.common.RpcResultBuilder;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
+import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.concurrent.Await;
+import scala.concurrent.Future;
+
+/**
+ * ProxyWriteTransaction uses provided {@link ActorRef} to delegate method calls to master
+ * {@link org.opendaylight.netconf.topology.singleton.impl.actors.WriteTransactionActor}.
+ */
+public class ProxyWriteTransaction implements DOMDataWriteTransaction {
+
+    private static final Logger LOG = LoggerFactory.getLogger(ProxyWriteTransaction.class);
+
+    private final ActorRef masterTxActor;
+    private final RemoteDeviceId id;
+    private final ActorSystem actorSystem;
+    private final AtomicBoolean opened = new AtomicBoolean(true);
+    private final Timeout askTimeout;
+
+    /**
+     * @param masterTxActor {@link org.opendaylight.netconf.topology.singleton.impl.actors.WriteTransactionActor} ref
+     * @param id            device id
+     * @param actorSystem   system
+     * @param askTimeout
+     */
+    public ProxyWriteTransaction(final ActorRef masterTxActor, final RemoteDeviceId id, final ActorSystem actorSystem,
+                                 final Timeout askTimeout) {
+        this.masterTxActor = masterTxActor;
+        this.id = id;
+        this.actorSystem = actorSystem;
+        this.askTimeout = askTimeout;
+    }
+
+    @Override
+    public boolean cancel() {
+        if (!opened.compareAndSet(true, false)) {
+            return false;
+        }
+        final Future<Object> cancelScalaFuture =
+                Patterns.ask(masterTxActor, new CancelRequest(), askTimeout);
+
+        LOG.trace("{}: Cancel {} via NETCONF", id);
+
+        try {
+            // here must be Await because AsyncWriteTransaction do not return future
+            return (boolean) Await.result(cancelScalaFuture, askTimeout.duration());
+        } catch (final Exception e) {
+            return false;
+        }
+    }
+
+    @Override
+    public CheckedFuture<Void, TransactionCommitFailedException> submit() {
+        if (!opened.compareAndSet(true, false)) {
+            throw new IllegalStateException(id + ": Transaction" + getIdentifier() + " is closed");
+        }
+        final Future<Object> submitScalaFuture =
+                Patterns.ask(masterTxActor, new SubmitRequest(), askTimeout);
+
+        LOG.trace("{}: Submit {} via NETCONF", id);
+
+        final SettableFuture<Void> settableFuture = SettableFuture.create();
+        submitScalaFuture.onComplete(new OnComplete<Object>() {
+            @Override
+            public void onComplete(final Throwable failure, final Object success) throws Throwable {
+                if (failure != null) { // ask timeout
+                    final Exception exception = NetconfTopologyUtils.createMasterIsDownException(id);
+                    settableFuture.setException(exception);
+                    return;
+                }
+                if (success instanceof Throwable) {
+                    settableFuture.setException((Throwable) success);
+                } else {
+                    if (success instanceof SubmitFailedReply) {
+                        LOG.error("{}: Transaction was not submitted because already closed.", id);
+                    }
+                    settableFuture.set(null);
+                }
+            }
+        }, actorSystem.dispatcher());
+
+        return Futures.makeChecked(settableFuture, new Function<Exception, TransactionCommitFailedException>() {
+            @Nullable
+            @Override
+            public TransactionCommitFailedException apply(@Nullable final Exception input) {
+                final String message = "Submit of transaction " + getIdentifier() + " failed";
+                return new TransactionCommitFailedException(message, input);
+            }
+        });
+    }
+
+    @Override
+    public ListenableFuture<RpcResult<TransactionStatus>> commit() {
+        LOG.trace("{}: Commit", id);
+
+        final CheckedFuture<Void, TransactionCommitFailedException> submit = submit();
+        return Futures.transform(submit, new Function<Void, RpcResult<TransactionStatus>>() {
+            @Nullable
+            @Override
+            public RpcResult<TransactionStatus> apply(@Nullable final Void input) {
+                return RpcResultBuilder.success(TransactionStatus.SUBMITED).build();
+            }
+        });
+    }
+
+    @Override
+    public void delete(final LogicalDatastoreType store, final YangInstanceIdentifier identifier) {
+        Preconditions.checkState(opened.get(), "%s: Transaction was closed %s", id, getIdentifier());
+        LOG.trace("{}: Delete {} via NETCONF: {}", id, store, identifier);
+        masterTxActor.tell(new DeleteRequest(store, identifier), ActorRef.noSender());
+    }
+
+    @Override
+    public void put(final LogicalDatastoreType store, final YangInstanceIdentifier identifier,
+                    final NormalizedNode<?, ?> data) {
+        Preconditions.checkState(opened.get(), "%s: Transaction was closed %s", id, getIdentifier());
+        final NormalizedNodeMessage msg = new NormalizedNodeMessage(identifier, data);
+        LOG.trace("{}: Put {} via NETCONF: {} with payload {}", id, store, identifier, data);
+        masterTxActor.tell(new PutRequest(store, msg), ActorRef.noSender());
+    }
+
+    @Override
+    public void merge(final LogicalDatastoreType store, final YangInstanceIdentifier identifier,
+                      final NormalizedNode<?, ?> data) {
+        Preconditions.checkState(opened.get(), "%s: Transaction was closed %s", id, getIdentifier());
+        final NormalizedNodeMessage msg = new NormalizedNodeMessage(identifier, data);
+        LOG.trace("{}: Merge {} via NETCONF: {} with payload {}", id, store, identifier, data);
+        masterTxActor.tell(new MergeRequest(store, msg), ActorRef.noSender());
+    }
+
+    @Override
+    public Object getIdentifier() {
+        return this;
+    }
+}
index 1b59e883989c8b6d3c21e2c3529e194b9741bbe1..39ac4dd718fd377019a6944de020d7ff882f5c93 100644 (file)
@@ -13,6 +13,7 @@ import java.io.File;
 import java.math.BigDecimal;
 import java.net.InetSocketAddress;
 import java.util.HashMap;
+import org.opendaylight.controller.config.util.xml.DocumentedException;
 import java.util.Map;
 import org.opendaylight.netconf.sal.connect.netconf.NetconfDevice;
 import org.opendaylight.netconf.sal.connect.netconf.NetconfStateSchemasResolverImpl;
@@ -218,4 +219,10 @@ public class NetconfTopologyUtils {
     public static InstanceIdentifier<Node> createTopologyNodePath(final String topologyId) {
         return createTopologyListPath(topologyId).child(Node.class);
     }
+
+    public static DocumentedException createMasterIsDownException(final RemoteDeviceId id) {
+        return new DocumentedException(id + ":Master is down. Please try again.",
+                DocumentedException.ErrorType.APPLICATION, DocumentedException.ErrorTag.OPERATION_FAILED,
+                DocumentedException.ErrorSeverity.WARNING);
+    }
 }
diff --git a/netconf/netconf-topology-singleton/src/main/java/org/opendaylight/netconf/topology/singleton/messages/transactions/NewReadTransactionReply.java b/netconf/netconf-topology-singleton/src/main/java/org/opendaylight/netconf/topology/singleton/messages/transactions/NewReadTransactionReply.java
new file mode 100644 (file)
index 0000000..a787a9f
--- /dev/null
@@ -0,0 +1,25 @@
+/*
+ * Copyright (c) 2017 Pantheon Technologies s.r.o. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.netconf.topology.singleton.messages.transactions;
+
+import akka.actor.ActorRef;
+import java.io.Serializable;
+
+public class NewReadTransactionReply implements Serializable {
+
+    private final ActorRef txActor;
+
+    public NewReadTransactionReply(final ActorRef txActor) {
+        this.txActor = txActor;
+    }
+
+    public ActorRef getTxActor() {
+        return txActor;
+    }
+}
@@ -1,5 +1,5 @@
 /*
- * Copyright (c) 2017 Cisco Systems, Inc. and others. All rights reserved.
+ * Copyright (c) 2017 Pantheon Technologies s.r.o. and others.  All rights reserved.
  *
  * This program and the accompanying materials are made available under the
  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
@@ -8,9 +8,8 @@
 
 package org.opendaylight.netconf.topology.singleton.messages.transactions;
 
-/**
- * A message sent to MountPoint leader to open new transaction
- */
-public class OpenTransaction implements TransactionRequest {
-    private static final long serialVersionUID = 1L;
+import java.io.Serializable;
+
+public class NewReadTransactionRequest implements Serializable {
+
 }
diff --git a/netconf/netconf-topology-singleton/src/main/java/org/opendaylight/netconf/topology/singleton/messages/transactions/NewWriteTransactionReply.java b/netconf/netconf-topology-singleton/src/main/java/org/opendaylight/netconf/topology/singleton/messages/transactions/NewWriteTransactionReply.java
new file mode 100644 (file)
index 0000000..8e356a8
--- /dev/null
@@ -0,0 +1,25 @@
+/*
+ * Copyright (c) 2017 Pantheon Technologies s.r.o. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.netconf.topology.singleton.messages.transactions;
+
+import akka.actor.ActorRef;
+import java.io.Serializable;
+
+public class NewWriteTransactionReply implements Serializable {
+
+    private final ActorRef txActor;
+
+    public NewWriteTransactionReply(final ActorRef txActor) {
+        this.txActor = txActor;
+    }
+
+    public ActorRef getTxActor() {
+        return txActor;
+    }
+}
diff --git a/netconf/netconf-topology-singleton/src/main/java/org/opendaylight/netconf/topology/singleton/messages/transactions/NewWriteTransactionRequest.java b/netconf/netconf-topology-singleton/src/main/java/org/opendaylight/netconf/topology/singleton/messages/transactions/NewWriteTransactionRequest.java
new file mode 100644 (file)
index 0000000..8b7e914
--- /dev/null
@@ -0,0 +1,15 @@
+/*
+ * Copyright (c) 2017 Pantheon Technologies s.r.o. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.netconf.topology.singleton.messages.transactions;
+
+import java.io.Serializable;
+
+public class NewWriteTransactionRequest implements Serializable {
+
+}
diff --git a/netconf/netconf-topology-singleton/src/test/java/org/opendaylight/netconf/topology/singleton/impl/actors/ReadTransactionActorTest.java b/netconf/netconf-topology-singleton/src/test/java/org/opendaylight/netconf/topology/singleton/impl/actors/ReadTransactionActorTest.java
new file mode 100644 (file)
index 0000000..3f52fd7
--- /dev/null
@@ -0,0 +1,105 @@
+/*
+ * Copyright (c) 2017 Pantheon Technologies s.r.o. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.netconf.topology.singleton.impl.actors;
+
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import akka.actor.ActorSystem;
+import akka.testkit.JavaTestKit;
+import akka.testkit.TestActorRef;
+import akka.testkit.TestProbe;
+import com.google.common.base.Optional;
+import com.google.common.util.concurrent.Futures;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.Mock;
+import org.mockito.MockitoAnnotations;
+import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
+import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
+import org.opendaylight.controller.md.sal.dom.api.DOMDataReadOnlyTransaction;
+import org.opendaylight.netconf.topology.singleton.messages.NormalizedNodeMessage;
+import org.opendaylight.netconf.topology.singleton.messages.transactions.EmptyReadResponse;
+import org.opendaylight.netconf.topology.singleton.messages.transactions.ExistsRequest;
+import org.opendaylight.netconf.topology.singleton.messages.transactions.ReadRequest;
+import org.opendaylight.yangtools.yang.common.QName;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
+import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode;
+import org.opendaylight.yangtools.yang.data.impl.schema.Builders;
+
+public class ReadTransactionActorTest {
+
+    private static final YangInstanceIdentifier path = YangInstanceIdentifier.EMPTY;
+    private static final LogicalDatastoreType store = LogicalDatastoreType.CONFIGURATION;
+
+    @Mock
+    private DOMDataReadOnlyTransaction deviceReadTx;
+    private TestProbe probe;
+    private ActorSystem system;
+    private TestActorRef<ReadTransactionActor> actorRef;
+
+    @Before
+    public void setUp() throws Exception {
+        MockitoAnnotations.initMocks(this);
+        system = ActorSystem.apply();
+        probe = TestProbe.apply(system);
+        actorRef = TestActorRef.create(system, ReadTransactionActor.props(deviceReadTx), "testA");
+    }
+
+    @After
+    public void tearDown() throws Exception {
+        JavaTestKit.shutdownActorSystem(system, null, true);
+    }
+
+    @Test
+    public void testRead() throws Exception {
+        final ContainerNode node = Builders.containerBuilder()
+                .withNodeIdentifier(new YangInstanceIdentifier.NodeIdentifier(QName.create("cont")))
+                .build();
+        when(deviceReadTx.read(store, path)).thenReturn(Futures.immediateCheckedFuture(Optional.of(node)));
+        actorRef.tell(new ReadRequest(store, path), probe.ref());
+        verify(deviceReadTx).read(store, path);
+        probe.expectMsgClass(NormalizedNodeMessage.class);
+    }
+
+    @Test
+    public void testReadEmpty() throws Exception {
+        when(deviceReadTx.read(store, path)).thenReturn(Futures.immediateCheckedFuture(Optional.absent()));
+        actorRef.tell(new ReadRequest(store, path), probe.ref());
+        verify(deviceReadTx).read(store, path);
+        probe.expectMsgClass(EmptyReadResponse.class);
+    }
+
+    @Test
+    public void testReadFailure() throws Exception {
+        final ReadFailedException cause = new ReadFailedException("fail");
+        when(deviceReadTx.read(store, path)).thenReturn(Futures.immediateFailedCheckedFuture(cause));
+        actorRef.tell(new ReadRequest(store, path), probe.ref());
+        verify(deviceReadTx).read(store, path);
+        probe.expectMsg(cause);
+    }
+
+    @Test
+    public void testExists() throws Exception {
+        when(deviceReadTx.exists(store, path)).thenReturn(Futures.immediateCheckedFuture(true));
+        actorRef.tell(new ExistsRequest(store, path), probe.ref());
+        verify(deviceReadTx).exists(store, path);
+        probe.expectMsg(true);
+    }
+
+    @Test
+    public void testExistsFailure() throws Exception {
+        final ReadFailedException cause = new ReadFailedException("fail");
+        when(deviceReadTx.exists(store, path)).thenReturn(Futures.immediateFailedCheckedFuture(cause));
+        actorRef.tell(new ExistsRequest(store, path), probe.ref());
+        verify(deviceReadTx).exists(store, path);
+        probe.expectMsg(cause);
+    }
+}
\ No newline at end of file
diff --git a/netconf/netconf-topology-singleton/src/test/java/org/opendaylight/netconf/topology/singleton/impl/actors/WriteTransactionActorTest.java b/netconf/netconf-topology-singleton/src/test/java/org/opendaylight/netconf/topology/singleton/impl/actors/WriteTransactionActorTest.java
new file mode 100644 (file)
index 0000000..d95d158
--- /dev/null
@@ -0,0 +1,127 @@
+/*
+ * Copyright (c) 2017 Pantheon Technologies s.r.o. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.netconf.topology.singleton.impl.actors;
+
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import akka.actor.ActorSystem;
+import akka.pattern.Patterns;
+import akka.testkit.JavaTestKit;
+import akka.testkit.TestActorRef;
+import akka.testkit.TestProbe;
+import akka.util.Timeout;
+import com.google.common.base.Preconditions;
+import com.google.common.util.concurrent.Futures;
+import java.util.concurrent.TimeUnit;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.Mock;
+import org.mockito.MockitoAnnotations;
+import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
+import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException;
+import org.opendaylight.controller.md.sal.dom.api.DOMDataWriteTransaction;
+import org.opendaylight.netconf.topology.singleton.messages.NormalizedNodeMessage;
+import org.opendaylight.netconf.topology.singleton.messages.transactions.CancelRequest;
+import org.opendaylight.netconf.topology.singleton.messages.transactions.DeleteRequest;
+import org.opendaylight.netconf.topology.singleton.messages.transactions.MergeRequest;
+import org.opendaylight.netconf.topology.singleton.messages.transactions.PutRequest;
+import org.opendaylight.netconf.topology.singleton.messages.transactions.SubmitReply;
+import org.opendaylight.netconf.topology.singleton.messages.transactions.SubmitRequest;
+import org.opendaylight.yangtools.yang.common.QName;
+import org.opendaylight.yangtools.yang.common.RpcError;
+import org.opendaylight.yangtools.yang.common.RpcResultBuilder;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
+import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
+import org.opendaylight.yangtools.yang.data.impl.schema.Builders;
+import scala.concurrent.Await;
+import scala.concurrent.Future;
+
+public class WriteTransactionActorTest {
+    private static final YangInstanceIdentifier PATH = YangInstanceIdentifier.EMPTY;
+    private static final LogicalDatastoreType STORE = LogicalDatastoreType.CONFIGURATION;
+    private static final Timeout TIMEOUT = Timeout.apply(5, TimeUnit.SECONDS);
+
+    @Mock
+    private DOMDataWriteTransaction deviceWriteTx;
+    private TestProbe probe;
+    private ActorSystem system;
+    private TestActorRef<WriteTransactionActor> actorRef;
+    private NormalizedNode<?, ?> node;
+
+    @Before
+    public void setUp() throws Exception {
+        MockitoAnnotations.initMocks(this);
+        system = ActorSystem.apply();
+        probe = TestProbe.apply(system);
+        node = Builders.containerBuilder()
+                .withNodeIdentifier(new YangInstanceIdentifier.NodeIdentifier(QName.create("cont")))
+                .build();
+        actorRef = TestActorRef.create(system, WriteTransactionActor.props(deviceWriteTx), "testA");
+    }
+
+    @After
+    public void tearDown() throws Exception {
+        JavaTestKit.shutdownActorSystem(system, null, true);
+    }
+
+    @Test
+    public void testPut() throws Exception {
+        final NormalizedNodeMessage normalizedNodeMessage = new NormalizedNodeMessage(PATH, node);
+        actorRef.tell(new PutRequest(STORE, normalizedNodeMessage), probe.ref());
+        verify(deviceWriteTx).put(STORE, PATH, node);
+    }
+
+    @Test
+    public void testMerge() throws Exception {
+        final NormalizedNodeMessage normalizedNodeMessage = new NormalizedNodeMessage(PATH, node);
+        actorRef.tell(new MergeRequest(STORE, normalizedNodeMessage), probe.ref());
+        verify(deviceWriteTx).merge(STORE, PATH, node);
+    }
+
+    @Test
+    public void testDelete() throws Exception {
+        actorRef.tell(new DeleteRequest(STORE, PATH), probe.ref());
+        verify(deviceWriteTx).delete(STORE, PATH);
+    }
+
+    @Test
+    public void testCancel() throws Exception {
+        when(deviceWriteTx.cancel()).thenReturn(true);
+        final Future<Object> cancelFuture = Patterns.ask(actorRef, new CancelRequest(), TIMEOUT);
+        final Object result = Await.result(cancelFuture, TIMEOUT.duration());
+        Preconditions.checkState(result instanceof Boolean);
+        verify(deviceWriteTx).cancel();
+        Assert.assertTrue((Boolean) result);
+    }
+
+    @Test
+    public void testSubmit() throws Exception {
+        when(deviceWriteTx.submit()).thenReturn(Futures.immediateCheckedFuture(null));
+        final Future<Object> submitFuture = Patterns.ask(actorRef, new SubmitRequest(), TIMEOUT);
+        final Object result = Await.result(submitFuture, TIMEOUT.duration());
+        Assert.assertTrue(result instanceof SubmitReply);
+        verify(deviceWriteTx).submit();
+    }
+
+    @Test
+    public void testSubmitFail() throws Exception {
+        final RpcError rpcError =
+                RpcResultBuilder.newError(RpcError.ErrorType.APPLICATION, "fail", "fail");
+        final TransactionCommitFailedException cause = new TransactionCommitFailedException("fail", rpcError);
+        when(deviceWriteTx.submit()).thenReturn(Futures.immediateFailedCheckedFuture(cause));
+        final Future<Object> submitFuture = Patterns.ask(actorRef, new SubmitRequest(), TIMEOUT);
+        final Object result = Await.result(submitFuture, TIMEOUT.duration());
+        Assert.assertEquals(cause, result);
+        verify(deviceWriteTx).submit();
+    }
+
+}
\ No newline at end of file
diff --git a/netconf/netconf-topology-singleton/src/test/java/org/opendaylight/netconf/topology/singleton/impl/tx/ProxyReadTransactionTest.java b/netconf/netconf-topology-singleton/src/test/java/org/opendaylight/netconf/topology/singleton/impl/tx/ProxyReadTransactionTest.java
new file mode 100644 (file)
index 0000000..df4be27
--- /dev/null
@@ -0,0 +1,124 @@
+/*
+ * Copyright (c) 2017 Pantheon Technologies s.r.o. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.netconf.topology.singleton.impl.tx;
+
+import akka.actor.ActorSystem;
+import akka.testkit.JavaTestKit;
+import akka.testkit.TestProbe;
+import akka.util.Timeout;
+import com.google.common.base.Optional;
+import com.google.common.util.concurrent.CheckedFuture;
+import java.net.InetSocketAddress;
+import java.util.concurrent.TimeUnit;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.opendaylight.controller.config.util.xml.DocumentedException;
+import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
+import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
+import org.opendaylight.netconf.sal.connect.util.RemoteDeviceId;
+import org.opendaylight.netconf.topology.singleton.messages.NormalizedNodeMessage;
+import org.opendaylight.netconf.topology.singleton.messages.transactions.EmptyReadResponse;
+import org.opendaylight.netconf.topology.singleton.messages.transactions.ExistsRequest;
+import org.opendaylight.netconf.topology.singleton.messages.transactions.ReadRequest;
+import org.opendaylight.yangtools.yang.common.QName;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
+import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode;
+import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
+import org.opendaylight.yangtools.yang.data.impl.schema.Builders;
+
+public class ProxyReadTransactionTest {
+    private static final YangInstanceIdentifier PATH = YangInstanceIdentifier.EMPTY;
+    private static final LogicalDatastoreType STORE = LogicalDatastoreType.CONFIGURATION;
+
+    private ActorSystem system;
+    private TestProbe masterActor;
+    private ContainerNode node;
+    private ProxyReadTransaction tx;
+
+    @Before
+    public void setUp() throws Exception {
+        system = ActorSystem.apply();
+        masterActor = new TestProbe(system);
+        final RemoteDeviceId id = new RemoteDeviceId("dev1", InetSocketAddress.createUnresolved("localhost", 17830));
+        node = Builders.containerBuilder()
+                .withNodeIdentifier(new YangInstanceIdentifier.NodeIdentifier(QName.create("cont")))
+                .build();
+        tx = new ProxyReadTransaction(masterActor.ref(), id, system, Timeout.apply(5, TimeUnit.SECONDS));
+    }
+
+    @After
+    public void tearDown() throws Exception {
+        JavaTestKit.shutdownActorSystem(system, null, true);
+    }
+
+    @Test
+    public void testRead() throws Exception {
+        final CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException> read = tx.read(STORE, PATH);
+        masterActor.expectMsgClass(ReadRequest.class);
+        masterActor.reply(new NormalizedNodeMessage(PATH, node));
+        final Optional<NormalizedNode<?, ?>> result = read.checkedGet();
+        Assert.assertTrue(result.isPresent());
+        Assert.assertEquals(node, result.get());
+    }
+
+    @Test
+    public void testReadEmpty() throws Exception {
+        final CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException> read = tx.read(STORE, PATH);
+        masterActor.expectMsgClass(ReadRequest.class);
+        masterActor.reply(new EmptyReadResponse());
+        final Optional<NormalizedNode<?, ?>> result = read.checkedGet();
+        Assert.assertFalse(result.isPresent());
+    }
+
+    @Test(expected = ReadFailedException.class)
+    public void testReadFail() throws Exception {
+        final CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException> read = tx.read(STORE, PATH);
+        masterActor.expectMsgClass(ReadRequest.class);
+        masterActor.reply(new RuntimeException("fail"));
+        read.checkedGet();
+    }
+
+    @Test
+    public void testExists() throws Exception {
+        final CheckedFuture<Boolean, ReadFailedException> read = tx.exists(STORE, PATH);
+        masterActor.expectMsgClass(ExistsRequest.class);
+        masterActor.reply(true);
+        final Boolean result = read.checkedGet();
+        Assert.assertTrue(result);
+    }
+
+    @Test(expected = ReadFailedException.class)
+    public void testExistsFail() throws Exception {
+        final CheckedFuture<Boolean, ReadFailedException> read = tx.exists(STORE, PATH);
+        masterActor.expectMsgClass(ExistsRequest.class);
+        masterActor.reply(new RuntimeException("fail"));
+        read.checkedGet();
+    }
+
+    @Test
+    public void testMasterDownRead() throws Exception {
+        final CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException> read = tx.read(STORE, PATH);
+        masterActor.expectMsgClass(ReadRequest.class);
+        //master doesn't reply
+        try {
+            read.checkedGet();
+            Assert.fail("Exception should be thrown");
+        } catch (final ReadFailedException e) {
+            final Throwable cause = e.getCause();
+            Assert.assertTrue(cause instanceof DocumentedException);
+            final DocumentedException de = (DocumentedException) cause;
+            Assert.assertEquals(DocumentedException.ErrorSeverity.WARNING, de.getErrorSeverity());
+            Assert.assertEquals(DocumentedException.ErrorTag.OPERATION_FAILED, de.getErrorTag());
+            Assert.assertEquals(DocumentedException.ErrorType.APPLICATION, de.getErrorType());
+        }
+    }
+
+}
\ No newline at end of file
diff --git a/netconf/netconf-topology-singleton/src/test/java/org/opendaylight/netconf/topology/singleton/impl/tx/ProxyWriteTransactionTest.java b/netconf/netconf-topology-singleton/src/test/java/org/opendaylight/netconf/topology/singleton/impl/tx/ProxyWriteTransactionTest.java
new file mode 100644 (file)
index 0000000..05ad6be
--- /dev/null
@@ -0,0 +1,178 @@
+/*
+ * Copyright (c) 2017 Pantheon Technologies s.r.o. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.netconf.topology.singleton.impl.tx;
+
+import akka.actor.ActorSystem;
+import akka.testkit.JavaTestKit;
+import akka.testkit.TestProbe;
+import akka.util.Timeout;
+import com.google.common.util.concurrent.CheckedFuture;
+import com.google.common.util.concurrent.ListenableFuture;
+import java.net.InetSocketAddress;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.opendaylight.controller.md.sal.common.api.TransactionStatus;
+import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
+import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException;
+import org.opendaylight.netconf.sal.connect.util.RemoteDeviceId;
+import org.opendaylight.netconf.topology.singleton.messages.transactions.CancelRequest;
+import org.opendaylight.netconf.topology.singleton.messages.transactions.DeleteRequest;
+import org.opendaylight.netconf.topology.singleton.messages.transactions.MergeRequest;
+import org.opendaylight.netconf.topology.singleton.messages.transactions.PutRequest;
+import org.opendaylight.netconf.topology.singleton.messages.transactions.SubmitReply;
+import org.opendaylight.netconf.topology.singleton.messages.transactions.SubmitRequest;
+import org.opendaylight.yangtools.yang.common.QName;
+import org.opendaylight.yangtools.yang.common.RpcResult;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
+import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode;
+import org.opendaylight.yangtools.yang.data.impl.schema.Builders;
+
+public class ProxyWriteTransactionTest {
+    private static final YangInstanceIdentifier PATH = YangInstanceIdentifier.EMPTY;
+    private static final LogicalDatastoreType STORE = LogicalDatastoreType.CONFIGURATION;
+
+    private ActorSystem system;
+    private TestProbe masterActor;
+    private ContainerNode node;
+    private ProxyWriteTransaction tx;
+
+    @Before
+    public void setUp() throws Exception {
+        system = ActorSystem.apply();
+        masterActor = new TestProbe(system);
+        final RemoteDeviceId id = new RemoteDeviceId("dev1", InetSocketAddress.createUnresolved("localhost", 17830));
+        node = Builders.containerBuilder()
+                .withNodeIdentifier(new YangInstanceIdentifier.NodeIdentifier(QName.create("cont")))
+                .build();
+        tx = new ProxyWriteTransaction(masterActor.ref(), id, system, Timeout.apply(5, TimeUnit.SECONDS));
+    }
+
+    @After
+    public void tearDown() throws Exception {
+        JavaTestKit.shutdownActorSystem(system, null, true);
+    }
+
+    @Test
+    public void testCancel() throws Exception {
+        final Future<Boolean> submit = Executors.newSingleThreadExecutor().submit(() -> tx.cancel());
+        masterActor.expectMsgClass(CancelRequest.class);
+        masterActor.reply(true);
+        Assert.assertTrue(submit.get());
+    }
+
+    @Test
+    public void testCancelSubmitted() throws Exception {
+        final CheckedFuture<Void, TransactionCommitFailedException> submitFuture = tx.submit();
+        masterActor.expectMsgClass(SubmitRequest.class);
+        masterActor.reply(new SubmitReply());
+        submitFuture.checkedGet();
+        final Future<Boolean> submit = Executors.newSingleThreadExecutor().submit(() -> tx.cancel());
+        masterActor.expectNoMsg();
+        Assert.assertFalse(submit.get());
+    }
+
+    @Test
+    public void testSubmit() throws Exception {
+        final CheckedFuture<Void, TransactionCommitFailedException> submitFuture = tx.submit();
+        masterActor.expectMsgClass(SubmitRequest.class);
+        masterActor.reply(new SubmitReply());
+        submitFuture.checkedGet();
+    }
+
+    @Test
+    public void testDoubleSubmit() throws Exception {
+        final CheckedFuture<Void, TransactionCommitFailedException> submitFuture = tx.submit();
+        masterActor.expectMsgClass(SubmitRequest.class);
+        masterActor.reply(new SubmitReply());
+        submitFuture.checkedGet();
+        try {
+            tx.submit().checkedGet();
+            Assert.fail("Should throw IllegalStateException");
+        } catch (final IllegalStateException e) {
+            masterActor.expectNoMsg();
+        }
+    }
+
+    @Test
+    public void testCommit() throws Exception {
+        final ListenableFuture<RpcResult<TransactionStatus>> submitFuture = tx.commit();
+        masterActor.expectMsgClass(SubmitRequest.class);
+        masterActor.reply(new SubmitReply());
+        Assert.assertEquals(TransactionStatus.SUBMITED, submitFuture.get().getResult());
+    }
+
+    @Test
+    public void testDelete() throws Exception {
+        tx.delete(STORE, PATH);
+        masterActor.expectMsgClass(DeleteRequest.class);
+    }
+
+    @Test
+    public void testDeleteClosed() throws Exception {
+        submit();
+        try {
+            tx.delete(STORE, PATH);
+            Assert.fail("Should throw IllegalStateException");
+        } catch (final IllegalStateException e) {
+            masterActor.expectNoMsg();
+        }
+    }
+
+    @Test
+    public void testPut() throws Exception {
+        tx.put(STORE, PATH, node);
+        masterActor.expectMsgClass(PutRequest.class);
+    }
+
+    @Test
+    public void testPutClosed() throws Exception {
+        submit();
+        try {
+            tx.put(STORE, PATH, node);
+            Assert.fail("Should throw IllegalStateException");
+        } catch (final IllegalStateException e) {
+            masterActor.expectNoMsg();
+        }
+    }
+
+    @Test
+    public void testMerge() throws Exception {
+        tx.merge(STORE, PATH, node);
+        masterActor.expectMsgClass(MergeRequest.class);
+    }
+
+    @Test
+    public void testMergeClosed() throws Exception {
+        submit();
+        try {
+            tx.merge(STORE, PATH, node);
+            Assert.fail("Should throw IllegalStateException");
+        } catch (final IllegalStateException e) {
+            masterActor.expectNoMsg();
+        }
+    }
+
+    @Test
+    public void testGetIdentifier() throws Exception {
+        Assert.assertEquals(tx, tx.getIdentifier());
+    }
+
+    private void submit() throws TransactionCommitFailedException {
+        final CheckedFuture<Void, TransactionCommitFailedException> submit = tx.submit();
+        masterActor.expectMsgClass(SubmitRequest.class);
+        masterActor.reply(new SubmitReply());
+        submit.checkedGet();
+    }
+
+}
\ No newline at end of file
index 58422776f55b2a3329fef144d980581dc4b05858..7ff9869dac6402b49ada1f9862352ea025cdb054 100644 (file)
@@ -43,8 +43,7 @@ import org.opendaylight.controller.md.sal.dom.api.DOMDataBroker;
 import org.opendaylight.controller.md.sal.dom.api.DOMDataReadOnlyTransaction;
 import org.opendaylight.controller.md.sal.dom.api.DOMRpcService;
 import org.opendaylight.netconf.sal.connect.util.RemoteDeviceId;
-import org.opendaylight.netconf.topology.singleton.api.NetconfDOMTransaction;
-import org.opendaylight.netconf.topology.singleton.impl.NetconfDOMDataBroker;
+import org.opendaylight.netconf.topology.singleton.impl.ProxyDOMDataBroker;
 import org.opendaylight.netconf.topology.singleton.impl.actors.NetconfNodeActor;
 import org.opendaylight.netconf.topology.singleton.impl.utils.NetconfTopologySetup;
 import org.opendaylight.netconf.topology.singleton.messages.CreateInitialMasterActorData;
@@ -68,13 +67,12 @@ public class ReadOnlyTransactionTest {
     public final ExpectedException exception = ExpectedException.none();
 
     private ActorRef masterRef;
-    private NetconfDOMDataBroker slaveDataBroker;
-    private DOMDataBroker masterDataBroker;
+    private ProxyDOMDataBroker slaveDataBroker;
     private List<SourceIdentifier> sourceIdentifiers;
-
+    @Mock
+    private DOMDataBroker deviceDataBroker;
     @Mock
     private DOMDataReadOnlyTransaction readTx;
-
     @Mock
     private DOMRpcService domRpcService;
 
@@ -95,32 +93,18 @@ public class ReadOnlyTransactionTest {
 
         sourceIdentifiers = Lists.newArrayList();
 
-        // Create master data broker
-
-        final DOMDataBroker delegateDataBroker = mock(DOMDataBroker.class);
+        //device read tx
         readTx = mock(DOMDataReadOnlyTransaction.class);
-
-        doReturn(readTx).when(delegateDataBroker).newReadOnlyTransaction();
-
-        final NetconfDOMTransaction masterDOMTransactions =
-                new NetconfMasterDOMTransaction(remoteDeviceId, delegateDataBroker);
-
-        masterDataBroker =
-                new NetconfDOMDataBroker(system, remoteDeviceId, masterDOMTransactions);
+        doReturn(readTx).when(deviceDataBroker).newReadOnlyTransaction();
 
         // Create slave data broker for testing proxy
-
-        final NetconfDOMTransaction proxyDOMTransactions =
-                new NetconfProxyDOMTransaction(remoteDeviceId, system, masterRef, TIMEOUT);
-
-        slaveDataBroker = new NetconfDOMDataBroker(system, remoteDeviceId, proxyDOMTransactions);
-
-
+        slaveDataBroker =
+                new ProxyDOMDataBroker(system, remoteDeviceId, masterRef, Timeout.apply(5, TimeUnit.SECONDS));
     }
 
     @After
     public void teardown() {
-        JavaTestKit.shutdownActorSystem(system);
+        JavaTestKit.shutdownActorSystem(system, null, true);
         system = null;
     }
 
@@ -256,7 +240,7 @@ public class ReadOnlyTransactionTest {
 
     private void initializeDataTest() throws Exception {
         final Future<Object> initialDataToActor =
-                Patterns.ask(masterRef, new CreateInitialMasterActorData(masterDataBroker, sourceIdentifiers,
+                Patterns.ask(masterRef, new CreateInitialMasterActorData(deviceDataBroker, sourceIdentifiers,
                                 domRpcService), TIMEOUT);
 
         final Object success = Await.result(initialDataToActor, TIMEOUT.duration());
index 2dc446118c97714c3ab6f5a3fffd706f38fe4118..3e86bd2572da3208bb965464d1dea8813a36637d 100644 (file)
@@ -15,7 +15,7 @@ import static org.mockito.Matchers.any;
 import static org.mockito.Mockito.doNothing;
 import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.timeout;
 import static org.mockito.Mockito.verify;
 import static org.mockito.MockitoAnnotations.initMocks;
 import static org.opendaylight.netconf.topology.singleton.impl.utils.NetconfTopologyUtils.DEFAULT_SCHEMA_REPOSITORY;
@@ -48,8 +48,7 @@ import org.opendaylight.controller.md.sal.dom.api.DOMDataReadOnlyTransaction;
 import org.opendaylight.controller.md.sal.dom.api.DOMDataWriteTransaction;
 import org.opendaylight.controller.md.sal.dom.api.DOMRpcService;
 import org.opendaylight.netconf.sal.connect.util.RemoteDeviceId;
-import org.opendaylight.netconf.topology.singleton.api.NetconfDOMTransaction;
-import org.opendaylight.netconf.topology.singleton.impl.NetconfDOMDataBroker;
+import org.opendaylight.netconf.topology.singleton.impl.ProxyDOMDataBroker;
 import org.opendaylight.netconf.topology.singleton.impl.actors.NetconfNodeActor;
 import org.opendaylight.netconf.topology.singleton.impl.utils.NetconfTopologySetup;
 import org.opendaylight.netconf.topology.singleton.messages.CreateInitialMasterActorData;
@@ -73,13 +72,12 @@ public class WriteOnlyTransactionTest {
     public final ExpectedException exception = ExpectedException.none();
 
     private ActorRef masterRef;
-    private NetconfDOMDataBroker slaveDataBroker;
-    private DOMDataBroker masterDataBroker;
+    private ProxyDOMDataBroker slaveDataBroker;
     private List<SourceIdentifier> sourceIdentifiers;
-
+    @Mock
+    private DOMDataBroker deviceDataBroker;
     @Mock
     private DOMDataWriteTransaction writeTx;
-
     @Mock
     private DOMRpcService domRpcService;
 
@@ -100,40 +98,25 @@ public class WriteOnlyTransactionTest {
 
         sourceIdentifiers = Lists.newArrayList();
 
-        // Create master data broker
-
-        final DOMDataBroker delegateDataBroker = mock(DOMDataBroker.class);
         writeTx = mock(DOMDataWriteTransaction.class);
         final DOMDataReadOnlyTransaction readTx = mock(DOMDataReadOnlyTransaction.class);
 
-        doReturn(writeTx).when(delegateDataBroker).newWriteOnlyTransaction();
-        doReturn(readTx).when(delegateDataBroker).newReadOnlyTransaction();
-
-        final NetconfDOMTransaction masterDOMTransactions =
-                new NetconfMasterDOMTransaction(remoteDeviceId, delegateDataBroker);
-
-        masterDataBroker =
-                new NetconfDOMDataBroker(system, remoteDeviceId, masterDOMTransactions);
+        doReturn(writeTx).when(deviceDataBroker).newWriteOnlyTransaction();
+        doReturn(readTx).when(deviceDataBroker).newReadOnlyTransaction();
 
         // Create slave data broker for testing proxy
-
-        final NetconfDOMTransaction proxyDOMTransactions =
-                new NetconfProxyDOMTransaction(remoteDeviceId, system, masterRef, TIMEOUT);
-
-        slaveDataBroker = new NetconfDOMDataBroker(system, remoteDeviceId, proxyDOMTransactions);
-
-
+        slaveDataBroker =
+                new ProxyDOMDataBroker(system, remoteDeviceId, masterRef, Timeout.apply(5, TimeUnit.SECONDS));
     }
 
     @After
     public void teardown() {
-        JavaTestKit.shutdownActorSystem(system);
+        JavaTestKit.shutdownActorSystem(system, null, true);
         system = null;
     }
 
     @Test
-    public void testPutMergeDeleteCalls() throws Exception {
-
+    public void testPut() throws Exception {
         /* Initialize data on master */
 
         initializeDataTest();
@@ -148,29 +131,56 @@ public class WriteOnlyTransactionTest {
 
         doNothing().when(writeTx).put(storeType, instanceIdentifier, testNode);
 
-        DOMDataWriteTransaction wTx = slaveDataBroker.newWriteOnlyTransaction();
+        final DOMDataWriteTransaction wTx = slaveDataBroker.newWriteOnlyTransaction();
         wTx.put(storeType, instanceIdentifier, testNode);
 
-        verify(writeTx, times(1)).put(storeType, instanceIdentifier, testNode);
+        verify(writeTx, timeout(2000)).put(storeType, instanceIdentifier, testNode);
 
         wTx.cancel();
+
+    }
+
+    @Test
+    public void testMerge() throws Exception {
+
+        /* Initialize data on master */
+
+        initializeDataTest();
+
+        final YangInstanceIdentifier instanceIdentifier = YangInstanceIdentifier.EMPTY;
+        final LogicalDatastoreType storeType = LogicalDatastoreType.CONFIGURATION;
+        final NormalizedNode<?, ?> testNode = ImmutableContainerNodeBuilder.create()
+                .withNodeIdentifier(new YangInstanceIdentifier.NodeIdentifier(QName.create("TestQname")))
+                .withChild(ImmutableNodes.leafNode(QName.create("NodeQname"), "foo")).build();
         // Test of invoking merge on master through slave proxy
 
         doNothing().when(writeTx).merge(storeType, instanceIdentifier, testNode);
-        wTx = slaveDataBroker.newWriteOnlyTransaction();
+        final DOMDataWriteTransaction wTx = slaveDataBroker.newWriteOnlyTransaction();
         wTx.merge(storeType, instanceIdentifier, testNode);
 
-        verify(writeTx, times(1)).merge(storeType, instanceIdentifier, testNode);
+        verify(writeTx, timeout(2000)).merge(storeType, instanceIdentifier, testNode);
 
         wTx.cancel();
+
+    }
+
+    @Test
+    public void testDelete() throws Exception {
+
+        /* Initialize data on master */
+
+        initializeDataTest();
+
+        final YangInstanceIdentifier instanceIdentifier = YangInstanceIdentifier.EMPTY;
+        final LogicalDatastoreType storeType = LogicalDatastoreType.CONFIGURATION;
         // Test of invoking delete on master through slave proxy
 
         doNothing().when(writeTx).delete(storeType, instanceIdentifier);
-        wTx = slaveDataBroker.newWriteOnlyTransaction();
+        final DOMDataWriteTransaction wTx = slaveDataBroker.newWriteOnlyTransaction();
         wTx.delete(storeType, instanceIdentifier);
         wTx.cancel();
 
-        verify(writeTx, times(1)).delete(storeType, instanceIdentifier);
+        verify(writeTx, timeout(2000)).delete(storeType, instanceIdentifier);
 
     }
 
@@ -183,8 +193,8 @@ public class WriteOnlyTransactionTest {
 
         // Without Tx
 
-        DOMDataWriteTransaction wTx = slaveDataBroker.newWriteOnlyTransaction();
-        final CheckedFuture<Void,TransactionCommitFailedException> resultSubmit = Futures.immediateCheckedFuture(null);
+        final DOMDataWriteTransaction wTx = slaveDataBroker.newWriteOnlyTransaction();
+        final CheckedFuture<Void, TransactionCommitFailedException> resultSubmit = Futures.immediateCheckedFuture(null);
         doReturn(resultSubmit).when(writeTx).submit();
 
         final CheckedFuture<Void, TransactionCommitFailedException> resultSubmitResponse = wTx.submit();
@@ -192,14 +202,21 @@ public class WriteOnlyTransactionTest {
         final Object result = resultSubmitResponse.checkedGet(TIMEOUT_SEC, TimeUnit.SECONDS);
 
         assertNull(result);
+    }
 
+    @Test
+    public void testSubmitWithOperation() throws Exception {
+
+        /* Initialize data on master */
+
+        initializeDataTest();
         // With Tx
-        wTx = slaveDataBroker.newWriteOnlyTransaction();
+        final DOMDataWriteTransaction wTx = slaveDataBroker.newWriteOnlyTransaction();
         doNothing().when(writeTx).delete(any(), any());
         wTx.delete(LogicalDatastoreType.CONFIGURATION,
                 YangInstanceIdentifier.EMPTY);
 
-        final CheckedFuture<Void,TransactionCommitFailedException> resultSubmitTx = Futures.immediateCheckedFuture(null);
+        final CheckedFuture<Void, TransactionCommitFailedException> resultSubmitTx = Futures.immediateCheckedFuture(null);
         doReturn(resultSubmitTx).when(writeTx).submit();
 
         final CheckedFuture<Void, TransactionCommitFailedException> resultSubmitTxResponse = wTx.submit();
@@ -207,8 +224,15 @@ public class WriteOnlyTransactionTest {
         final Object resultTx = resultSubmitTxResponse.checkedGet(TIMEOUT_SEC, TimeUnit.SECONDS);
 
         assertNull(resultTx);
+    }
 
-        wTx = slaveDataBroker.newWriteOnlyTransaction();
+    @Test
+    public void testSubmitFail() throws Exception {
+
+        /* Initialize data on master */
+
+        initializeDataTest();
+        final DOMDataWriteTransaction wTx = slaveDataBroker.newWriteOnlyTransaction();
         wTx.delete(LogicalDatastoreType.CONFIGURATION,
                 YangInstanceIdentifier.EMPTY);
 
@@ -233,14 +257,22 @@ public class WriteOnlyTransactionTest {
         initializeDataTest();
 
         // Without Tx
-
-        DOMDataWriteTransaction wTx = slaveDataBroker.newWriteOnlyTransaction();
+        doReturn(true).when(writeTx).cancel();
+        final DOMDataWriteTransaction wTx = slaveDataBroker.newWriteOnlyTransaction();
         final Boolean resultFalseNoTx = wTx.cancel();
-        assertEquals(false, resultFalseNoTx);
+        assertEquals(true, resultFalseNoTx);
+    }
+
+    @Test
+    public void testCancelWithOperation() throws Exception {
+
+        /* Initialize data on master */
+
+        initializeDataTest();
 
         // With Tx, readWriteTx test
 
-        wTx = slaveDataBroker.newWriteOnlyTransaction();
+        final DOMDataWriteTransaction wTx = slaveDataBroker.newWriteOnlyTransaction();
         doNothing().when(writeTx).delete(any(), any());
         wTx.delete(LogicalDatastoreType.CONFIGURATION,
                 YangInstanceIdentifier.EMPTY);
@@ -249,8 +281,6 @@ public class WriteOnlyTransactionTest {
         final Boolean resultTrue = wTx.cancel();
         assertEquals(true, resultTrue);
 
-        doReturn(false).when(writeTx).cancel();
-
         final Boolean resultFalse = wTx.cancel();
         assertEquals(false, resultFalse);
 
@@ -258,7 +288,7 @@ public class WriteOnlyTransactionTest {
 
     private void initializeDataTest() throws Exception {
         final Future<Object> initialDataToActor =
-                Patterns.ask(masterRef, new CreateInitialMasterActorData(masterDataBroker, sourceIdentifiers,
+                Patterns.ask(masterRef, new CreateInitialMasterActorData(deviceDataBroker, sourceIdentifiers,
                                 domRpcService), TIMEOUT);
 
         final Object success = Await.result(initialDataToActor, TIMEOUT.duration());