Change handling of netconf cluster transactions 97/55797/1
authorAndrej Mak <andrej.mak@pantheon.tech>
Fri, 24 Feb 2017 07:58:01 +0000 (08:58 +0100)
committerJakub Morvay <jmorvay@cisco.com>
Fri, 21 Apr 2017 09:35:32 +0000 (09:35 +0000)
Own actor is created on master for every transaction.
Proxy transactions use this actor for mount point access.
Creation of proxy transaction initiates sending of message
to the master node actor. Master node actor starts
transaciton actor if needed.
Both master and slaves communicate with device via
proxy transactions.

For write transaction, submit and cancel cause actor stop.
There can be single instance of read actor, which can be
reused for all transactions.

Change-Id: Iea39d794e8f2026b76f049409a718b6d94f41649
Signed-off-by: Andrej Mak <andrej.mak@pantheon.tech>
(cherry picked from commit 446738525190ad92736f1a8868e3263737276aff)

27 files changed:
netconf/netconf-topology-singleton/src/main/java/org/opendaylight/netconf/topology/singleton/api/NetconfDOMTransaction.java [deleted file]
netconf/netconf-topology-singleton/src/main/java/org/opendaylight/netconf/topology/singleton/api/RemoteOperationTxProcessor.java [deleted file]
netconf/netconf-topology-singleton/src/main/java/org/opendaylight/netconf/topology/singleton/impl/MasterSalFacade.java
netconf/netconf-topology-singleton/src/main/java/org/opendaylight/netconf/topology/singleton/impl/NetconfDOMDataBroker.java [deleted file]
netconf/netconf-topology-singleton/src/main/java/org/opendaylight/netconf/topology/singleton/impl/ProxyDOMDataBroker.java [new file with mode: 0644]
netconf/netconf-topology-singleton/src/main/java/org/opendaylight/netconf/topology/singleton/impl/RemoteOperationTxProcessorImpl.java [deleted file]
netconf/netconf-topology-singleton/src/main/java/org/opendaylight/netconf/topology/singleton/impl/SlaveSalFacade.java
netconf/netconf-topology-singleton/src/main/java/org/opendaylight/netconf/topology/singleton/impl/actors/NetconfNodeActor.java
netconf/netconf-topology-singleton/src/main/java/org/opendaylight/netconf/topology/singleton/impl/actors/ReadTransactionActor.java [new file with mode: 0644]
netconf/netconf-topology-singleton/src/main/java/org/opendaylight/netconf/topology/singleton/impl/actors/WriteTransactionActor.java [new file with mode: 0644]
netconf/netconf-topology-singleton/src/main/java/org/opendaylight/netconf/topology/singleton/impl/tx/NetconfMasterDOMTransaction.java [deleted file]
netconf/netconf-topology-singleton/src/main/java/org/opendaylight/netconf/topology/singleton/impl/tx/NetconfProxyDOMTransaction.java [deleted file]
netconf/netconf-topology-singleton/src/main/java/org/opendaylight/netconf/topology/singleton/impl/tx/NetconfReadOnlyTransaction.java [deleted file]
netconf/netconf-topology-singleton/src/main/java/org/opendaylight/netconf/topology/singleton/impl/tx/NetconfWriteOnlyTransaction.java [deleted file]
netconf/netconf-topology-singleton/src/main/java/org/opendaylight/netconf/topology/singleton/impl/tx/ProxyReadTransaction.java [new file with mode: 0644]
netconf/netconf-topology-singleton/src/main/java/org/opendaylight/netconf/topology/singleton/impl/tx/ProxyWriteTransaction.java [new file with mode: 0644]
netconf/netconf-topology-singleton/src/main/java/org/opendaylight/netconf/topology/singleton/impl/utils/NetconfTopologyUtils.java
netconf/netconf-topology-singleton/src/main/java/org/opendaylight/netconf/topology/singleton/messages/transactions/NewReadTransactionReply.java [new file with mode: 0644]
netconf/netconf-topology-singleton/src/main/java/org/opendaylight/netconf/topology/singleton/messages/transactions/NewReadTransactionRequest.java [moved from netconf/netconf-topology-singleton/src/main/java/org/opendaylight/netconf/topology/singleton/messages/transactions/OpenTransaction.java with 54% similarity]
netconf/netconf-topology-singleton/src/main/java/org/opendaylight/netconf/topology/singleton/messages/transactions/NewWriteTransactionReply.java [new file with mode: 0644]
netconf/netconf-topology-singleton/src/main/java/org/opendaylight/netconf/topology/singleton/messages/transactions/NewWriteTransactionRequest.java [new file with mode: 0644]
netconf/netconf-topology-singleton/src/test/java/org/opendaylight/netconf/topology/singleton/impl/actors/ReadTransactionActorTest.java [new file with mode: 0644]
netconf/netconf-topology-singleton/src/test/java/org/opendaylight/netconf/topology/singleton/impl/actors/WriteTransactionActorTest.java [new file with mode: 0644]
netconf/netconf-topology-singleton/src/test/java/org/opendaylight/netconf/topology/singleton/impl/tx/ProxyReadTransactionTest.java [new file with mode: 0644]
netconf/netconf-topology-singleton/src/test/java/org/opendaylight/netconf/topology/singleton/impl/tx/ProxyWriteTransactionTest.java [new file with mode: 0644]
netconf/netconf-topology-singleton/src/test/java/org/opendaylight/netconf/topology/singleton/impl/tx/ReadOnlyTransactionTest.java
netconf/netconf-topology-singleton/src/test/java/org/opendaylight/netconf/topology/singleton/impl/tx/WriteOnlyTransactionTest.java

diff --git a/netconf/netconf-topology-singleton/src/main/java/org/opendaylight/netconf/topology/singleton/api/NetconfDOMTransaction.java b/netconf/netconf-topology-singleton/src/main/java/org/opendaylight/netconf/topology/singleton/api/NetconfDOMTransaction.java
deleted file mode 100644 (file)
index 58fe9b3..0000000
+++ /dev/null
@@ -1,83 +0,0 @@
-/*
- * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.
- *
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,
- * and is available at http://www.eclipse.org/legal/epl-v10.html
- */
-
-package org.opendaylight.netconf.topology.singleton.api;
-
-import com.google.common.base.Optional;
-import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
-import org.opendaylight.netconf.topology.singleton.messages.NormalizedNodeMessage;
-import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
-import scala.concurrent.Future;
-
-/**
- * Provides API for all operations of read and write transactions
- */
-// TODO we should separate between read tx and write tx
-public interface NetconfDOMTransaction {
-
-
-    /**
-     * Opens a new transaction. Transactions have to be opened before applying
-     * any operations on them. Previous transaction has to be either submitted
-     * ({@link #submit()} was invoked) or canceled ({@link #cancel()} was
-     * invoked.
-     *
-     * @throws IllegalStateException
-     *             if the previous transaction was not SUBMITTED or CANCELLED.
-     */
-    void openTransaction();
-
-    /**
-     * Read data from particular data-store
-     * @param store data-store type
-     * @param path unique identifier of a particular node instance in the data tree
-     * @return result as future
-     */
-    Future<Optional<NormalizedNodeMessage>> read(LogicalDatastoreType store, YangInstanceIdentifier path);
-
-    /**
-     * Test existence of node in certain data-store
-     * @param store data-store type
-     * @param path unique identifier of a particular node instance in the data tree
-     * @return result as future
-     */
-    Future<Boolean> exists(LogicalDatastoreType store, YangInstanceIdentifier path);
-
-    /**
-     * Put data to particular data-store
-     * @param store data-store type
-     * @param data data for inserting included in NormalizedNodeMessage object
-     */
-    void put(LogicalDatastoreType store, NormalizedNodeMessage data);
-
-    /**
-     * Merge data with existing node in particular data-store
-     * @param store data-store type
-     * @param data data for merging included in NormalizedNodeMessage object
-     */
-    void merge(LogicalDatastoreType store, NormalizedNodeMessage data);
-
-    /**
-     * Delete node in particular data-store in path
-     * @param store data-store type
-     * @param path unique identifier of a particular node instance in the data tree
-     */
-    void delete(LogicalDatastoreType store, YangInstanceIdentifier path);
-
-    /**
-     * Cancel operation
-     * @return success or not
-     */
-    boolean cancel();
-
-    /**
-     * Commit opened transaction.
-     * @return void or raised exception
-     */
-    Future<Void> submit();
-}
diff --git a/netconf/netconf-topology-singleton/src/main/java/org/opendaylight/netconf/topology/singleton/api/RemoteOperationTxProcessor.java b/netconf/netconf-topology-singleton/src/main/java/org/opendaylight/netconf/topology/singleton/api/RemoteOperationTxProcessor.java
deleted file mode 100644 (file)
index 5d1c893..0000000
+++ /dev/null
@@ -1,80 +0,0 @@
-/*
- * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.
- *
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,
- * and is available at http://www.eclipse.org/legal/epl-v10.html
- */
-
-package org.opendaylight.netconf.topology.singleton.api;
-
-import akka.actor.ActorRef;
-import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
-import org.opendaylight.netconf.topology.singleton.messages.NormalizedNodeMessage;
-import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
-
-/**
- * Provides API for remote calling operations of transactions. Slave sends message of particular
- * operation to master and master performs it.
- */
-public interface RemoteOperationTxProcessor {
-
-    /**
-     * Opens a new transaction.
-     */
-    void doOpenTransaction(ActorRef recipient, ActorRef sender);
-
-    /**
-     * Delete node in particular data-store in path
-     * @param store data-store type
-     * @param path unique identifier of a particular node instance in the data tree
-     */
-    void doDelete(LogicalDatastoreType store, YangInstanceIdentifier path);
-
-    /**
-     * Commit opened transaction.
-     * @param recipient recipient of submit result
-     * @param sender sender of submit result
-     */
-    void doSubmit(ActorRef recipient, ActorRef sender);
-
-    /**
-     * Cancel operation
-     * @param recipient recipient of cancel result
-     * @param sender sender of cancel result
-     */
-    void doCancel(ActorRef recipient, ActorRef sender);
-
-    /**
-     * Put data to particular data-store
-     * @param store data-store type
-     * @param data data for inserting included in NormalizedNodeMessage object
-     */
-    void doPut(LogicalDatastoreType store, NormalizedNodeMessage data);
-
-    /**
-     * Merge data with existing node in particular data-store
-     * @param store data-store type
-     * @param data data for merging included in NormalizedNodeMessage object
-     */
-    void doMerge(LogicalDatastoreType store, NormalizedNodeMessage data);
-
-    /**
-     * Read data from particular data-store
-     * @param store data-store type
-     * @param path unique identifier of a particular node instance in the data tree
-     * @param recipient recipient of read result
-     * @param sender sender of read result
-     */
-    void doRead(LogicalDatastoreType store, YangInstanceIdentifier path, ActorRef recipient, ActorRef sender);
-
-    /**
-     * Test existence of node in certain data-store
-     * @param store data-store type
-     * @param path unique identifier of a particular node instance in the data tree
-     * @param recipient recipient of exists result
-     * @param sender sender of exists result
-     */
-    void doExists(LogicalDatastoreType store, YangInstanceIdentifier path, ActorRef recipient, ActorRef sender);
-
-}
index 96a6cb1d291eb5422fc54f1635946da12e694bdb..1087df67f3b9947845b45c575252e081efc3d2be 100644 (file)
@@ -26,12 +26,10 @@ import org.opendaylight.controller.sal.core.api.Broker;
 import org.opendaylight.netconf.sal.connect.api.RemoteDeviceHandler;
 import org.opendaylight.netconf.sal.connect.netconf.listener.NetconfDeviceCapabilities;
 import org.opendaylight.netconf.sal.connect.netconf.listener.NetconfSessionPreferences;
+import org.opendaylight.netconf.sal.connect.netconf.sal.NetconfDeviceDataBroker;
 import org.opendaylight.netconf.sal.connect.netconf.sal.NetconfDeviceNotificationService;
 import org.opendaylight.netconf.sal.connect.netconf.sal.NetconfDeviceSalProvider;
 import org.opendaylight.netconf.sal.connect.util.RemoteDeviceId;
-import org.opendaylight.netconf.topology.singleton.api.NetconfDOMTransaction;
-import org.opendaylight.netconf.topology.singleton.impl.tx.NetconfMasterDOMTransaction;
-import org.opendaylight.netconf.topology.singleton.impl.tx.NetconfProxyDOMTransaction;
 import org.opendaylight.netconf.topology.singleton.messages.CreateInitialMasterActorData;
 import org.opendaylight.yangtools.yang.common.SimpleDateFormatUtil;
 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
@@ -137,15 +135,11 @@ class MasterSalFacade implements AutoCloseable, RemoteDeviceHandler<NetconfSessi
 
         LOG.info("{}: Creating master data broker for device", id);
 
-        final NetconfDOMTransaction masterDOMTransactions =
-                new NetconfMasterDOMTransaction(id, remoteSchemaContext, deviceRpc, netconfSessionPreferences);
-        deviceDataBroker =
-                new NetconfDOMDataBroker(actorSystem, id, masterDOMTransactions);
-        // We need to create NetconfProxyDOMTransaction so accessing mountpoint
+        deviceDataBroker = new NetconfDeviceDataBroker(id, remoteSchemaContext, deviceRpc, netconfSessionPreferences);
+        // We need to create ProxyDOMDataBroker so accessing mountpoint
         // on leader node would be same as on follower node
-        final NetconfDOMTransaction proxyDOMTransation =
-                new NetconfProxyDOMTransaction(id, actorSystem, masterActorRef, actorResponseWaitTime);
-        final NetconfDOMDataBroker proxyDataBroker = new NetconfDOMDataBroker(actorSystem, id, proxyDOMTransation);
+        final ProxyDOMDataBroker proxyDataBroker =
+                new ProxyDOMDataBroker(actorSystem, id, masterActorRef, actorResponseWaitTime);
         salProvider.getMountInstance()
                 .onTopologyDeviceConnected(remoteSchemaContext, proxyDataBroker, deviceRpc, notificationService);
     }
@@ -154,7 +148,7 @@ class MasterSalFacade implements AutoCloseable, RemoteDeviceHandler<NetconfSessi
         final List<SourceIdentifier> sourceIdentifiers =
                 remoteSchemaContext.getAllModuleIdentifiers().stream().map(mi ->
                         RevisionSourceIdentifier.create(mi.getName(),
-                            (SimpleDateFormatUtil.DEFAULT_DATE_REV == mi.getRevision() ? Optional.<String>absent() :
+                                (SimpleDateFormatUtil.DEFAULT_DATE_REV == mi.getRevision() ? Optional.<String>absent() :
                                     Optional.of(mi.getQNameModule().getFormattedRevision()))))
                         .collect(Collectors.toList());
 
diff --git a/netconf/netconf-topology-singleton/src/main/java/org/opendaylight/netconf/topology/singleton/impl/NetconfDOMDataBroker.java b/netconf/netconf-topology-singleton/src/main/java/org/opendaylight/netconf/topology/singleton/impl/NetconfDOMDataBroker.java
deleted file mode 100644 (file)
index 4fadb08..0000000
+++ /dev/null
@@ -1,78 +0,0 @@
-/*
- * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved.
- *
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,
- * and is available at http://www.eclipse.org/legal/epl-v10.html
- */
-
-package org.opendaylight.netconf.topology.singleton.impl;
-
-import akka.actor.ActorSystem;
-import java.util.Collections;
-import java.util.Map;
-import javax.annotation.Nonnull;
-import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
-import org.opendaylight.controller.md.sal.common.api.data.TransactionChainListener;
-import org.opendaylight.controller.md.sal.dom.api.DOMDataBroker;
-import org.opendaylight.controller.md.sal.dom.api.DOMDataBrokerExtension;
-import org.opendaylight.controller.md.sal.dom.api.DOMDataChangeListener;
-import org.opendaylight.controller.md.sal.dom.api.DOMDataReadOnlyTransaction;
-import org.opendaylight.controller.md.sal.dom.api.DOMDataReadWriteTransaction;
-import org.opendaylight.controller.md.sal.dom.api.DOMDataWriteTransaction;
-import org.opendaylight.controller.md.sal.dom.api.DOMTransactionChain;
-import org.opendaylight.netconf.sal.connect.netconf.sal.tx.ReadWriteTx;
-import org.opendaylight.netconf.sal.connect.util.RemoteDeviceId;
-import org.opendaylight.netconf.topology.singleton.api.NetconfDOMTransaction;
-import org.opendaylight.netconf.topology.singleton.impl.tx.NetconfReadOnlyTransaction;
-import org.opendaylight.netconf.topology.singleton.impl.tx.NetconfWriteOnlyTransaction;
-import org.opendaylight.yangtools.concepts.ListenerRegistration;
-import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
-
-public class NetconfDOMDataBroker implements DOMDataBroker {
-
-    private final RemoteDeviceId id;
-    private final NetconfDOMTransaction masterDataBroker;
-    private final ActorSystem actorSystem;
-
-    public NetconfDOMDataBroker(final ActorSystem actorSystem, final RemoteDeviceId id,
-                         final NetconfDOMTransaction masterDataBroker) {
-        this.id = id;
-        this.masterDataBroker = masterDataBroker;
-        this.actorSystem = actorSystem;
-    }
-
-    @Override
-    public DOMDataReadOnlyTransaction newReadOnlyTransaction() {
-        return new NetconfReadOnlyTransaction(id, actorSystem, masterDataBroker);
-    }
-
-    @Override
-    public DOMDataReadWriteTransaction newReadWriteTransaction() {
-        return new ReadWriteTx(new NetconfReadOnlyTransaction(id, actorSystem, masterDataBroker),
-                new NetconfWriteOnlyTransaction(id, actorSystem, masterDataBroker));
-    }
-
-    @Override
-    public DOMDataWriteTransaction newWriteOnlyTransaction() {
-        return new NetconfWriteOnlyTransaction(id, actorSystem, masterDataBroker);
-    }
-
-    @Override
-    public ListenerRegistration<DOMDataChangeListener> registerDataChangeListener(
-            LogicalDatastoreType store, YangInstanceIdentifier path, DOMDataChangeListener listener,
-            DataChangeScope triggeringScope) {
-        throw new UnsupportedOperationException(id + ": Data change listeners not supported for netconf mount point");
-    }
-
-    @Override
-    public DOMTransactionChain createTransactionChain(TransactionChainListener listener) {
-        throw new UnsupportedOperationException(id + ": Transaction chains not supported for netconf mount point");
-    }
-
-    @Nonnull
-    @Override
-    public Map<Class<? extends DOMDataBrokerExtension>, DOMDataBrokerExtension> getSupportedExtensions() {
-        return Collections.emptyMap();
-    }
-}
diff --git a/netconf/netconf-topology-singleton/src/main/java/org/opendaylight/netconf/topology/singleton/impl/ProxyDOMDataBroker.java b/netconf/netconf-topology-singleton/src/main/java/org/opendaylight/netconf/topology/singleton/impl/ProxyDOMDataBroker.java
new file mode 100644 (file)
index 0000000..416d9bd
--- /dev/null
@@ -0,0 +1,116 @@
+/*
+ * Copyright (c) 2017 Pantheon Technologies s.r.o. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.netconf.topology.singleton.impl;
+
+import akka.actor.ActorRef;
+import akka.actor.ActorSystem;
+import akka.pattern.Patterns;
+import akka.util.Timeout;
+import com.google.common.base.Preconditions;
+import java.util.Collections;
+import java.util.Map;
+import javax.annotation.Nonnull;
+import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
+import org.opendaylight.controller.md.sal.common.api.data.TransactionChainListener;
+import org.opendaylight.controller.md.sal.dom.api.DOMDataBroker;
+import org.opendaylight.controller.md.sal.dom.api.DOMDataBrokerExtension;
+import org.opendaylight.controller.md.sal.dom.api.DOMDataChangeListener;
+import org.opendaylight.controller.md.sal.dom.api.DOMDataReadOnlyTransaction;
+import org.opendaylight.controller.md.sal.dom.api.DOMDataReadWriteTransaction;
+import org.opendaylight.controller.md.sal.dom.api.DOMDataWriteTransaction;
+import org.opendaylight.controller.md.sal.dom.api.DOMTransactionChain;
+import org.opendaylight.netconf.sal.connect.netconf.sal.tx.ReadWriteTx;
+import org.opendaylight.netconf.sal.connect.util.RemoteDeviceId;
+import org.opendaylight.netconf.topology.singleton.impl.tx.ProxyReadTransaction;
+import org.opendaylight.netconf.topology.singleton.impl.tx.ProxyWriteTransaction;
+import org.opendaylight.netconf.topology.singleton.messages.transactions.NewReadTransactionReply;
+import org.opendaylight.netconf.topology.singleton.messages.transactions.NewReadTransactionRequest;
+import org.opendaylight.netconf.topology.singleton.messages.transactions.NewWriteTransactionReply;
+import org.opendaylight.netconf.topology.singleton.messages.transactions.NewWriteTransactionRequest;
+import org.opendaylight.yangtools.concepts.ListenerRegistration;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
+import scala.concurrent.Await;
+import scala.concurrent.Future;
+
+public class ProxyDOMDataBroker implements DOMDataBroker {
+
+    private final Timeout askTimeout;
+    private final RemoteDeviceId id;
+    private final ActorRef masterNode;
+    private final ActorSystem actorSystem;
+
+    /**
+     * @param actorSystem system
+     * @param id          id
+     * @param masterNode  {@link org.opendaylight.netconf.topology.singleton.impl.actors.NetconfNodeActor} ref
+     * @param askTimeout ask timeout
+     */
+    public ProxyDOMDataBroker(final ActorSystem actorSystem, final RemoteDeviceId id,
+                              final ActorRef masterNode, final Timeout askTimeout) {
+        this.id = id;
+        this.masterNode = masterNode;
+        this.actorSystem = actorSystem;
+        this.askTimeout = askTimeout;
+    }
+
+    @Override
+    public DOMDataReadOnlyTransaction newReadOnlyTransaction() {
+        final Future<Object> txActorFuture = Patterns.ask(masterNode, new NewReadTransactionRequest(), askTimeout);
+        try {
+            final Object msg = Await.result(txActorFuture, askTimeout.duration());
+            if (msg instanceof Throwable) {
+                throw (Throwable) msg;
+            }
+            Preconditions.checkState(msg instanceof NewReadTransactionReply);
+            final NewReadTransactionReply reply = (NewReadTransactionReply) msg;
+            return new ProxyReadTransaction(reply.getTxActor(), id, actorSystem, askTimeout);
+        } catch (final Throwable t) {
+            throw new IllegalStateException("Can't create ProxyReadTransaction", t);
+        }
+    }
+
+    @Override
+    public DOMDataReadWriteTransaction newReadWriteTransaction() {
+        return new ReadWriteTx(newReadOnlyTransaction(), newWriteOnlyTransaction());
+    }
+
+    @Override
+    public DOMDataWriteTransaction newWriteOnlyTransaction() {
+        final Future<Object> txActorFuture = Patterns.ask(masterNode, new NewWriteTransactionRequest(), askTimeout);
+        try {
+            final Object msg = Await.result(txActorFuture, askTimeout.duration());
+            if (msg instanceof Throwable) {
+                throw (Throwable) msg;
+            }
+            Preconditions.checkState(msg instanceof NewWriteTransactionReply);
+            final NewWriteTransactionReply reply = (NewWriteTransactionReply) msg;
+            return new ProxyWriteTransaction(reply.getTxActor(), id, actorSystem, askTimeout);
+        } catch (final Throwable t) {
+            throw new IllegalStateException("Can't create ProxyWriteTransaction", t);
+        }
+    }
+
+    @Override
+    public ListenerRegistration<DOMDataChangeListener> registerDataChangeListener(
+            final LogicalDatastoreType store, final YangInstanceIdentifier path, final DOMDataChangeListener listener,
+            final DataChangeScope triggeringScope) {
+        throw new UnsupportedOperationException(id + ": Data change listeners not supported for netconf mount point");
+    }
+
+    @Override
+    public DOMTransactionChain createTransactionChain(final TransactionChainListener listener) {
+        throw new UnsupportedOperationException(id + ": Transaction chains not supported for netconf mount point");
+    }
+
+    @Nonnull
+    @Override
+    public Map<Class<? extends DOMDataBrokerExtension>, DOMDataBrokerExtension> getSupportedExtensions() {
+        return Collections.emptyMap();
+    }
+}
diff --git a/netconf/netconf-topology-singleton/src/main/java/org/opendaylight/netconf/topology/singleton/impl/RemoteOperationTxProcessorImpl.java b/netconf/netconf-topology-singleton/src/main/java/org/opendaylight/netconf/topology/singleton/impl/RemoteOperationTxProcessorImpl.java
deleted file mode 100644 (file)
index f19d0d5..0000000
+++ /dev/null
@@ -1,176 +0,0 @@
-/*
- * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.
- *
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,
- * and is available at http://www.eclipse.org/legal/epl-v10.html
- */
-
-package org.opendaylight.netconf.topology.singleton.impl;
-
-import akka.actor.ActorRef;
-import akka.actor.Status;
-import com.google.common.base.Optional;
-import com.google.common.util.concurrent.CheckedFuture;
-import com.google.common.util.concurrent.FutureCallback;
-import com.google.common.util.concurrent.Futures;
-import javax.annotation.Nonnull;
-import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
-import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
-import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException;
-import org.opendaylight.controller.md.sal.dom.api.DOMDataBroker;
-import org.opendaylight.controller.md.sal.dom.api.DOMDataReadOnlyTransaction;
-import org.opendaylight.controller.md.sal.dom.api.DOMDataWriteTransaction;
-import org.opendaylight.netconf.sal.connect.util.RemoteDeviceId;
-import org.opendaylight.netconf.topology.singleton.api.RemoteOperationTxProcessor;
-import org.opendaylight.netconf.topology.singleton.messages.NormalizedNodeMessage;
-import org.opendaylight.netconf.topology.singleton.messages.transactions.EmptyReadResponse;
-import org.opendaylight.netconf.topology.singleton.messages.transactions.SubmitFailedReply;
-import org.opendaylight.netconf.topology.singleton.messages.transactions.SubmitReply;
-import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
-import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class RemoteOperationTxProcessorImpl implements RemoteOperationTxProcessor, AutoCloseable {
-
-    private static final Logger LOG = LoggerFactory.getLogger(RemoteOperationTxProcessorImpl.class);
-
-    private final DOMDataBroker dataBroker;
-    private final RemoteDeviceId id;
-    private DOMDataWriteTransaction writeTx;
-    private DOMDataReadOnlyTransaction readTx;
-
-    private ActorRef currentUser = null;
-
-    public RemoteOperationTxProcessorImpl(final DOMDataBroker dataBroker, final RemoteDeviceId id) {
-        this.dataBroker = dataBroker;
-        this.id = id;
-        this.readTx = dataBroker.newReadOnlyTransaction();
-    }
-
-    @Override
-    public void doOpenTransaction(ActorRef recipient, ActorRef sender) {
-        if (currentUser != null) {
-            LOG.error("{}: Opening a new transaction for {} failed.", id, recipient);
-            recipient.tell(new Status.Failure(
-                    new IllegalStateException("Transaction is already opened for another user")), recipient);
-            return;
-        }
-
-        LOG.debug("{}: Opening a new transaction for {}", id, recipient);
-        currentUser = recipient;
-        recipient.tell(new Status.Success(null), sender);
-    }
-
-    @Override
-    public void doDelete(final LogicalDatastoreType store, final YangInstanceIdentifier path) {
-        if (writeTx == null) {
-            writeTx = dataBroker.newWriteOnlyTransaction();
-        }
-        writeTx.delete(store, path);
-    }
-
-    @Override
-    public void doSubmit(final ActorRef recipient, final ActorRef sender) {
-        currentUser = null;
-        if (writeTx != null) {
-            CheckedFuture<Void, TransactionCommitFailedException> submitFuture = writeTx.submit();
-            Futures.addCallback(submitFuture, new FutureCallback<Void>() {
-                @Override
-                public void onSuccess(Void result) {
-                    recipient.tell(new SubmitReply(), sender);
-                }
-
-                @Override
-                public void onFailure(@Nonnull Throwable throwable) {
-                    recipient.tell(throwable, sender);
-                }
-            });
-        } else {
-            recipient.tell(new SubmitFailedReply(), sender);
-            LOG.warn("{}: Couldn't submit transaction because it was already closed.", id);
-        }
-    }
-
-    @Override
-    public void doCancel(final ActorRef recipient, final ActorRef sender) {
-        currentUser = null;
-        boolean cancel = false;
-        if (writeTx != null) {
-            cancel = writeTx.cancel();
-        }
-        recipient.tell(cancel, sender);
-
-    }
-
-    @Override
-    public void doPut(final LogicalDatastoreType store, final NormalizedNodeMessage data) {
-        if (writeTx == null) {
-            writeTx = dataBroker.newWriteOnlyTransaction();
-        }
-        writeTx.put(store, data.getIdentifier(), data.getNode());
-    }
-
-    @Override
-    public void doMerge(final LogicalDatastoreType store, final NormalizedNodeMessage data) {
-        if (writeTx == null) {
-            writeTx = dataBroker.newWriteOnlyTransaction();
-        }
-        writeTx.merge(store, data.getIdentifier(), data.getNode());
-    }
-
-    @Override
-    public void doRead(final LogicalDatastoreType store, final YangInstanceIdentifier path, final ActorRef recipient,
-                       final ActorRef sender) {
-        final CheckedFuture<Optional<NormalizedNode<?,?>>, ReadFailedException> readFuture =
-                readTx.read(store, path);
-
-        Futures.addCallback(readFuture, new FutureCallback<Optional<NormalizedNode<?, ?>>>() {
-
-            @Override
-            public void onSuccess(final Optional<NormalizedNode<?, ?>> result) {
-                if (!result.isPresent()) {
-                    recipient.tell(new EmptyReadResponse(), sender);
-                    return;
-                }
-                recipient.tell(new NormalizedNodeMessage(path, result.get()), sender);
-            }
-
-            @Override
-            public void onFailure(@Nonnull final Throwable throwable) {
-                recipient.tell(throwable, sender);
-            }
-        });
-    }
-
-    @Override
-    public void doExists(final LogicalDatastoreType store, final YangInstanceIdentifier path, final ActorRef recipient,
-                         final ActorRef sender) {
-        final CheckedFuture<Boolean, ReadFailedException> readFuture =
-                readTx.exists(store, path);
-        Futures.addCallback(readFuture, new FutureCallback<Boolean>() {
-            @Override
-            public void onSuccess(final Boolean result) {
-                if (result == null) {
-                    recipient.tell(false, sender);
-                } else {
-                    recipient.tell(result, sender);
-                }
-            }
-
-            @Override
-            public void onFailure(@Nonnull final Throwable throwable) {
-                recipient.tell(throwable, sender);
-            }
-        });
-    }
-
-    @Override
-    public void close() throws Exception {
-        currentUser = null;
-        if (readTx != null) {
-            readTx.close();
-        }
-    }
-}
index fd1c28a0ade3aecedd083b572263c0ac943d2ed7..92d7e1a36ac6f8228075935ae6d32d0515c2a366 100644 (file)
@@ -16,8 +16,6 @@ import org.opendaylight.controller.sal.core.api.Broker;
 import org.opendaylight.netconf.sal.connect.netconf.sal.NetconfDeviceNotificationService;
 import org.opendaylight.netconf.sal.connect.netconf.sal.NetconfDeviceSalProvider;
 import org.opendaylight.netconf.sal.connect.util.RemoteDeviceId;
-import org.opendaylight.netconf.topology.singleton.api.NetconfDOMTransaction;
-import org.opendaylight.netconf.topology.singleton.impl.tx.NetconfProxyDOMTransaction;
 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -53,11 +51,8 @@ public class SlaveSalFacade {
                                         final ActorRef masterActorRef) {
         final NetconfDeviceNotificationService notificationService = new NetconfDeviceNotificationService();
 
-        final NetconfDOMTransaction proxyDOMTransactions =
-                new NetconfProxyDOMTransaction(id, actorSystem, masterActorRef, actorResponseWaitTime);
-
-        final NetconfDOMDataBroker netconfDeviceDataBroker =
-                new NetconfDOMDataBroker(actorSystem, id, proxyDOMTransactions);
+        final ProxyDOMDataBroker netconfDeviceDataBroker =
+                new ProxyDOMDataBroker(actorSystem, id, masterActorRef, actorResponseWaitTime);
 
         salProvider.getMountInstance().onTopologyDeviceConnected(remoteSchemaContext, netconfDeviceDataBroker,
                 deviceRpc, notificationService);
index ba8f11fdc07d69fb006d0db31de6ebf5f6fb9f4a..9828e2476fcf7931295fbad29943b067e623548f 100644 (file)
@@ -22,14 +22,15 @@ import javax.annotation.Nullable;
 import org.opendaylight.controller.cluster.schema.provider.RemoteYangTextSourceProvider;
 import org.opendaylight.controller.cluster.schema.provider.impl.RemoteSchemaProvider;
 import org.opendaylight.controller.cluster.schema.provider.impl.YangTextSchemaSourceSerializationProxy;
+import org.opendaylight.controller.md.sal.dom.api.DOMDataBroker;
+import org.opendaylight.controller.md.sal.dom.api.DOMDataReadOnlyTransaction;
+import org.opendaylight.controller.md.sal.dom.api.DOMDataWriteTransaction;
 import org.opendaylight.controller.md.sal.dom.api.DOMRpcException;
 import org.opendaylight.controller.md.sal.dom.api.DOMRpcResult;
 import org.opendaylight.controller.md.sal.dom.api.DOMRpcService;
 import org.opendaylight.netconf.sal.connect.util.RemoteDeviceId;
-import org.opendaylight.netconf.topology.singleton.api.RemoteOperationTxProcessor;
 import org.opendaylight.netconf.topology.singleton.impl.ProxyDOMRpcService;
 import org.opendaylight.netconf.topology.singleton.impl.ProxyYangTextSourceProvider;
-import org.opendaylight.netconf.topology.singleton.impl.RemoteOperationTxProcessorImpl;
 import org.opendaylight.netconf.topology.singleton.impl.SlaveSalFacade;
 import org.opendaylight.netconf.topology.singleton.impl.utils.NetconfTopologySetup;
 import org.opendaylight.netconf.topology.singleton.messages.AskForMasterMountPoint;
@@ -42,16 +43,11 @@ import org.opendaylight.netconf.topology.singleton.messages.UnregisterSlaveMount
 import org.opendaylight.netconf.topology.singleton.messages.YangTextSchemaSourceRequest;
 import org.opendaylight.netconf.topology.singleton.messages.rpc.InvokeRpcMessage;
 import org.opendaylight.netconf.topology.singleton.messages.rpc.InvokeRpcMessageReply;
-import org.opendaylight.netconf.topology.singleton.messages.transactions.CancelRequest;
-import org.opendaylight.netconf.topology.singleton.messages.transactions.DeleteRequest;
 import org.opendaylight.netconf.topology.singleton.messages.transactions.EmptyResultResponse;
-import org.opendaylight.netconf.topology.singleton.messages.transactions.ExistsRequest;
-import org.opendaylight.netconf.topology.singleton.messages.transactions.MergeRequest;
-import org.opendaylight.netconf.topology.singleton.messages.transactions.OpenTransaction;
-import org.opendaylight.netconf.topology.singleton.messages.transactions.PutRequest;
-import org.opendaylight.netconf.topology.singleton.messages.transactions.ReadRequest;
-import org.opendaylight.netconf.topology.singleton.messages.transactions.SubmitRequest;
-import org.opendaylight.netconf.topology.singleton.messages.transactions.TransactionRequest;
+import org.opendaylight.netconf.topology.singleton.messages.transactions.NewReadTransactionReply;
+import org.opendaylight.netconf.topology.singleton.messages.transactions.NewReadTransactionRequest;
+import org.opendaylight.netconf.topology.singleton.messages.transactions.NewWriteTransactionReply;
+import org.opendaylight.netconf.topology.singleton.messages.transactions.NewWriteTransactionRequest;
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
 import org.opendaylight.yangtools.yang.model.api.SchemaPath;
@@ -76,11 +72,13 @@ public class NetconfNodeActor extends UntypedActor {
     private final SchemaSourceRegistry schemaRegistry;
     private final SchemaRepository schemaRepository;
 
-    private RemoteOperationTxProcessor operationsProcessor;
     private List<SourceIdentifier> sourceIdentifiers;
     private DOMRpcService deviceRpc;
     private SlaveSalFacade slaveSalManager;
     private final Timeout actorResponseWaitTime;
+    private DOMDataBroker deviceDataBroker;
+    //readTxActor can be shared
+    private ActorRef readTxActor;
 
     public static Props props(final NetconfTopologySetup setup,
                               final RemoteDeviceId id, final SchemaSourceRegistry schemaRegistry,
@@ -103,11 +101,12 @@ public class NetconfNodeActor extends UntypedActor {
     public void onReceive(final Object message) throws Exception {
         if (message instanceof CreateInitialMasterActorData) { // master
 
-            sourceIdentifiers = ((CreateInitialMasterActorData) message).getSourceIndentifiers();
-            operationsProcessor =
-                    new RemoteOperationTxProcessorImpl(((CreateInitialMasterActorData) message).getDeviceDataBroker(),
-                            id);
-            this.deviceRpc = ((CreateInitialMasterActorData) message).getDeviceRpc();
+            final CreateInitialMasterActorData masterActorData = (CreateInitialMasterActorData) message;
+            sourceIdentifiers = masterActorData.getSourceIndentifiers();
+            this.deviceDataBroker = masterActorData.getDeviceDataBroker();
+            final DOMDataReadOnlyTransaction tx = deviceDataBroker.newReadOnlyTransaction();
+            readTxActor = context().actorOf(ReadTransactionActor.props(tx));
+            this.deviceRpc = masterActorData.getDeviceRpc();
 
             sender().tell(new MasterActorDataInitialized(), self());
 
@@ -118,21 +117,30 @@ public class NetconfNodeActor extends UntypedActor {
             id = ((RefreshSetupMasterActorData) message).getRemoteDeviceId();
             sender().tell(new MasterActorDataInitialized(), self());
         } else if (message instanceof AskForMasterMountPoint) { // master
-            // only master contains reference to operations processor
-            if (operationsProcessor != null) {
+            // only master contains reference to deviceDataBroker
+            if (deviceDataBroker != null) {
                 getSender().tell(new RegisterMountPoint(sourceIdentifiers), getSelf());
             }
 
-        } else if (message instanceof TransactionRequest) { // master
-
-            resolveProxyCalls(message, sender(), getSelf());
-
         } else if (message instanceof YangTextSchemaSourceRequest) { // master
 
             final YangTextSchemaSourceRequest yangTextSchemaSourceRequest = (YangTextSchemaSourceRequest) message;
             sendYangTextSchemaSourceProxy(yangTextSchemaSourceRequest.getSourceIdentifier(), sender());
 
-        } else if (message instanceof InvokeRpcMessage) {
+        } else if (message instanceof NewReadTransactionRequest) { // master
+
+            sender().tell(new NewReadTransactionReply(readTxActor), self());
+
+        } else if (message instanceof NewWriteTransactionRequest) { // master
+            try {
+                final DOMDataWriteTransaction tx = deviceDataBroker.newWriteOnlyTransaction();
+                final ActorRef txActor = context().actorOf(WriteTransactionActor.props(tx));
+                sender().tell(new NewWriteTransactionReply(txActor), self());
+            } catch (final Throwable t) {
+                sender().tell(t, self());
+            }
+
+        } else if (message instanceof InvokeRpcMessage) { // master
 
             final InvokeRpcMessage invokeRpcMessage = ((InvokeRpcMessage) message);
             invokeSlaveRpc(invokeRpcMessage.getSchemaPath(), invokeRpcMessage.getNormalizedNodeMessage(), sender());
@@ -151,43 +159,6 @@ public class NetconfNodeActor extends UntypedActor {
         }
     }
 
-    private void resolveProxyCalls(final Object message, final ActorRef recipient, final ActorRef futureSender) {
-        if (message instanceof OpenTransaction) {
-            operationsProcessor.doOpenTransaction(recipient, futureSender);
-        } else if (message instanceof ReadRequest) {
-
-            final ReadRequest readRequest = (ReadRequest) message;
-            operationsProcessor.doRead(readRequest.getStore(), readRequest.getPath(), recipient, futureSender);
-
-        } else if (message instanceof ExistsRequest) {
-
-            final ExistsRequest readRequest = (ExistsRequest) message;
-            operationsProcessor.doExists(readRequest.getStore(), readRequest.getPath(), recipient, futureSender);
-
-        } else if (message instanceof MergeRequest) {
-
-            final MergeRequest mergeRequest = (MergeRequest) message;
-            operationsProcessor.doMerge(mergeRequest.getStore(), mergeRequest.getNormalizedNodeMessage());
-
-        } else if (message instanceof PutRequest) {
-
-            final PutRequest putRequest = (PutRequest) message;
-            operationsProcessor.doPut(putRequest.getStore(), putRequest.getNormalizedNodeMessage());
-
-        } else if (message instanceof DeleteRequest) {
-
-            final DeleteRequest deleteRequest = (DeleteRequest) message;
-            operationsProcessor.doDelete(deleteRequest.getStore(), deleteRequest.getPath());
-
-        } else if (message instanceof CancelRequest) {
-
-            operationsProcessor.doCancel(recipient, futureSender);
-
-        } else if (message instanceof SubmitRequest) {
-
-            operationsProcessor.doSubmit(recipient, futureSender);
-        }
-    }
 
     private void sendYangTextSchemaSourceProxy(final SourceIdentifier sourceIdentifier, final ActorRef sender) {
         final CheckedFuture<YangTextSchemaSource, SchemaSourceException> yangTextSchemaSource =
diff --git a/netconf/netconf-topology-singleton/src/main/java/org/opendaylight/netconf/topology/singleton/impl/actors/ReadTransactionActor.java b/netconf/netconf-topology-singleton/src/main/java/org/opendaylight/netconf/topology/singleton/impl/actors/ReadTransactionActor.java
new file mode 100644 (file)
index 0000000..b6bf651
--- /dev/null
@@ -0,0 +1,110 @@
+/*
+ * Copyright (c) 2017 Pantheon Technologies s.r.o. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.netconf.topology.singleton.impl.actors;
+
+import akka.actor.ActorRef;
+import akka.actor.Props;
+import akka.actor.UntypedActor;
+import com.google.common.base.Optional;
+import com.google.common.util.concurrent.CheckedFuture;
+import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.Futures;
+import javax.annotation.Nonnull;
+import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
+import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
+import org.opendaylight.controller.md.sal.dom.api.DOMDataReadOnlyTransaction;
+import org.opendaylight.netconf.topology.singleton.messages.NormalizedNodeMessage;
+import org.opendaylight.netconf.topology.singleton.messages.transactions.EmptyReadResponse;
+import org.opendaylight.netconf.topology.singleton.messages.transactions.ExistsRequest;
+import org.opendaylight.netconf.topology.singleton.messages.transactions.ReadRequest;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
+import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
+
+/**
+ * ReadTransactionActor is an interface to device's {@link DOMDataReadOnlyTransaction} for cluster nodes.
+ */
+public class ReadTransactionActor extends UntypedActor {
+
+    private final DOMDataReadOnlyTransaction tx;
+
+    /**
+     * Creates new actor Props.
+     *
+     * @param tx delegate device read transaction
+     * @return props
+     */
+    static Props props(final DOMDataReadOnlyTransaction tx) {
+        return Props.create(ReadTransactionActor.class, () -> new ReadTransactionActor(tx));
+    }
+
+    private ReadTransactionActor(final DOMDataReadOnlyTransaction tx) {
+        this.tx = tx;
+    }
+
+    @Override
+    public void onReceive(final Object message) throws Throwable {
+        if (message instanceof ReadRequest) {
+
+            final ReadRequest readRequest = (ReadRequest) message;
+            final YangInstanceIdentifier path = readRequest.getPath();
+            final LogicalDatastoreType store = readRequest.getStore();
+            read(path, store, sender(), self());
+
+        } else if (message instanceof ExistsRequest) {
+            final ExistsRequest readRequest = (ExistsRequest) message;
+            final YangInstanceIdentifier path = readRequest.getPath();
+            final LogicalDatastoreType store = readRequest.getStore();
+            exists(path, store, sender(), self());
+
+        } else {
+            unhandled(message);
+        }
+    }
+
+    private void read(final YangInstanceIdentifier path, final LogicalDatastoreType store, final ActorRef sender,
+                      final ActorRef self) {
+        final CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException> read = tx.read(store, path);
+        Futures.addCallback(read, new FutureCallback<Optional<NormalizedNode<?, ?>>>() {
+
+            @Override
+            public void onSuccess(final Optional<NormalizedNode<?, ?>> result) {
+                if (!result.isPresent()) {
+                    sender.tell(new EmptyReadResponse(), self);
+                    return;
+                }
+                sender.tell(new NormalizedNodeMessage(path, result.get()), self);
+            }
+
+            @Override
+            public void onFailure(@Nonnull final Throwable throwable) {
+                sender.tell(throwable, self);
+            }
+        });
+    }
+
+    private void exists(final YangInstanceIdentifier path, final LogicalDatastoreType store, final ActorRef sender,
+                        final ActorRef self) {
+        final CheckedFuture<Boolean, ReadFailedException> readFuture = tx.exists(store, path);
+        Futures.addCallback(readFuture, new FutureCallback<Boolean>() {
+            @Override
+            public void onSuccess(final Boolean result) {
+                if (result == null) {
+                    sender.tell(false, self);
+                } else {
+                    sender.tell(result, self);
+                }
+            }
+
+            @Override
+            public void onFailure(@Nonnull final Throwable throwable) {
+                sender.tell(throwable, self);
+            }
+        });
+    }
+}
diff --git a/netconf/netconf-topology-singleton/src/main/java/org/opendaylight/netconf/topology/singleton/impl/actors/WriteTransactionActor.java b/netconf/netconf-topology-singleton/src/main/java/org/opendaylight/netconf/topology/singleton/impl/actors/WriteTransactionActor.java
new file mode 100644 (file)
index 0000000..5350b64
--- /dev/null
@@ -0,0 +1,93 @@
+/*
+ * Copyright (c) 2017 Pantheon Technologies s.r.o. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.netconf.topology.singleton.impl.actors;
+
+import akka.actor.ActorRef;
+import akka.actor.Props;
+import akka.actor.UntypedActor;
+import com.google.common.util.concurrent.CheckedFuture;
+import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.Futures;
+import javax.annotation.Nonnull;
+import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException;
+import org.opendaylight.controller.md.sal.dom.api.DOMDataReadOnlyTransaction;
+import org.opendaylight.controller.md.sal.dom.api.DOMDataWriteTransaction;
+import org.opendaylight.netconf.topology.singleton.messages.NormalizedNodeMessage;
+import org.opendaylight.netconf.topology.singleton.messages.transactions.CancelRequest;
+import org.opendaylight.netconf.topology.singleton.messages.transactions.DeleteRequest;
+import org.opendaylight.netconf.topology.singleton.messages.transactions.MergeRequest;
+import org.opendaylight.netconf.topology.singleton.messages.transactions.PutRequest;
+import org.opendaylight.netconf.topology.singleton.messages.transactions.SubmitReply;
+import org.opendaylight.netconf.topology.singleton.messages.transactions.SubmitRequest;
+
+/**
+ * WriteTransactionActor is an interface to device's {@link DOMDataReadOnlyTransaction} for cluster nodes.
+ */
+public class WriteTransactionActor extends UntypedActor {
+
+    private final DOMDataWriteTransaction tx;
+
+    /**
+     * Creates new actor Props.
+     *
+     * @param tx delegate device write transaction
+     * @return props
+     */
+    static Props props(final DOMDataWriteTransaction tx) {
+        return Props.create(WriteTransactionActor.class, () -> new WriteTransactionActor(tx));
+    }
+
+    private WriteTransactionActor(final DOMDataWriteTransaction tx) {
+        this.tx = tx;
+    }
+
+    @Override
+    public void onReceive(final Object message) throws Throwable {
+        if (message instanceof MergeRequest) {
+            final MergeRequest mergeRequest = (MergeRequest) message;
+            final NormalizedNodeMessage data = mergeRequest.getNormalizedNodeMessage();
+            tx.merge(mergeRequest.getStore(), data.getIdentifier(), data.getNode());
+        } else if (message instanceof PutRequest) {
+            final PutRequest putRequest = (PutRequest) message;
+            final NormalizedNodeMessage data = putRequest.getNormalizedNodeMessage();
+            tx.put(putRequest.getStore(), data.getIdentifier(), data.getNode());
+        } else if (message instanceof DeleteRequest) {
+            final DeleteRequest deleteRequest = (DeleteRequest) message;
+            tx.delete(deleteRequest.getStore(), deleteRequest.getPath());
+        } else if (message instanceof CancelRequest) {
+            cancel();
+        } else if (message instanceof SubmitRequest) {
+            submit(sender(), self());
+        } else {
+            unhandled(message);
+        }
+    }
+
+    private void cancel() {
+        final boolean cancelled = tx.cancel();
+        sender().tell(cancelled, self());
+        context().stop(self());
+    }
+
+    private void submit(final ActorRef requester, final ActorRef self) {
+        final CheckedFuture<Void, TransactionCommitFailedException> submitFuture = tx.submit();
+        context().stop(self);
+        Futures.addCallback(submitFuture, new FutureCallback<Void>() {
+            @Override
+            public void onSuccess(final Void result) {
+                requester.tell(new SubmitReply(), self);
+            }
+
+            @Override
+            public void onFailure(@Nonnull final Throwable throwable) {
+                requester.tell(throwable, self);
+            }
+        });
+    }
+}
diff --git a/netconf/netconf-topology-singleton/src/main/java/org/opendaylight/netconf/topology/singleton/impl/tx/NetconfMasterDOMTransaction.java b/netconf/netconf-topology-singleton/src/main/java/org/opendaylight/netconf/topology/singleton/impl/tx/NetconfMasterDOMTransaction.java
deleted file mode 100644 (file)
index 269ee2f..0000000
+++ /dev/null
@@ -1,184 +0,0 @@
-/*
- * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.
- *
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,
- * and is available at http://www.eclipse.org/legal/epl-v10.html
- */
-
-package org.opendaylight.netconf.topology.singleton.impl.tx;
-
-import com.google.common.base.Optional;
-import com.google.common.base.Preconditions;
-import com.google.common.util.concurrent.CheckedFuture;
-import com.google.common.util.concurrent.FutureCallback;
-import com.google.common.util.concurrent.Futures;
-import javax.annotation.Nonnull;
-import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
-import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
-import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException;
-import org.opendaylight.controller.md.sal.dom.api.DOMDataBroker;
-import org.opendaylight.controller.md.sal.dom.api.DOMDataReadOnlyTransaction;
-import org.opendaylight.controller.md.sal.dom.api.DOMDataWriteTransaction;
-import org.opendaylight.controller.md.sal.dom.api.DOMRpcService;
-import org.opendaylight.netconf.sal.connect.netconf.listener.NetconfSessionPreferences;
-import org.opendaylight.netconf.sal.connect.netconf.sal.NetconfDeviceDataBroker;
-import org.opendaylight.netconf.sal.connect.util.RemoteDeviceId;
-import org.opendaylight.netconf.topology.singleton.api.NetconfDOMTransaction;
-import org.opendaylight.netconf.topology.singleton.messages.NormalizedNodeMessage;
-import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
-import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
-import org.opendaylight.yangtools.yang.model.api.SchemaContext;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import scala.concurrent.Future;
-import scala.concurrent.impl.Promise.DefaultPromise;
-
-public class NetconfMasterDOMTransaction implements NetconfDOMTransaction {
-
-    private static final Logger LOG = LoggerFactory.getLogger(NetconfMasterDOMTransaction.class);
-
-    private final RemoteDeviceId id;
-    private final DOMDataBroker delegateBroker;
-
-    private DOMDataReadOnlyTransaction readTx;
-    private DOMDataWriteTransaction writeTx;
-
-    public NetconfMasterDOMTransaction(final RemoteDeviceId id,
-                                       final SchemaContext schemaContext,
-                                       final DOMRpcService rpc,
-                                       final NetconfSessionPreferences netconfSessionPreferences) {
-        this(id, new NetconfDeviceDataBroker(id, schemaContext, rpc, netconfSessionPreferences));
-    }
-
-    public NetconfMasterDOMTransaction(final RemoteDeviceId id, final DOMDataBroker delegateBroker) {
-        this.id = id;
-        this.delegateBroker = delegateBroker;
-
-        // only ever need 1 readTx since it doesnt need to be closed
-        readTx = delegateBroker.newReadOnlyTransaction();
-    }
-
-    @Override
-    public void openTransaction() {
-        // TODO We don't have to do anything here since
-        // NetconfProxyDOMTransactions and RemoteOperationTxProcessor do all
-        // the work regarding opening transactions. But maybe we should check
-        // for open transaction here instead in RemoteOperationTxProcessor
-    }
-
-    @Override
-    public Future<Optional<NormalizedNodeMessage>> read(final LogicalDatastoreType store,
-                                                        final YangInstanceIdentifier path) {
-        LOG.trace("{}: Read[{}] {} via NETCONF: {}", id, readTx.getIdentifier(), store, path);
-
-        final CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException> readFuture = readTx.read(store, path);
-
-        final DefaultPromise<Optional<NormalizedNodeMessage>> promise = new DefaultPromise<>();
-        Futures.addCallback(readFuture, new FutureCallback<Optional<NormalizedNode<?, ?>>>() {
-            @Override
-            public void onSuccess(final Optional<NormalizedNode<?, ?>> result) {
-                if (!result.isPresent()) {
-                    promise.success(Optional.absent());
-                } else {
-                    promise.success(Optional.of(new NormalizedNodeMessage(path, result.get())));
-                }
-            }
-
-            @Override
-            public void onFailure(@Nonnull final Throwable throwable) {
-                promise.failure(throwable);
-            }
-        });
-        return promise.future();
-    }
-
-    @Override
-    public Future<Boolean> exists(final LogicalDatastoreType store, final YangInstanceIdentifier path) {
-        LOG.trace("{}: Exists[{}] {} via NETCONF: {}", id, readTx.getIdentifier(), store, path);
-
-        final CheckedFuture<Boolean, ReadFailedException> existsFuture = readTx.exists(store, path);
-
-        final DefaultPromise<Boolean> promise = new DefaultPromise<>();
-        Futures.addCallback(existsFuture, new FutureCallback<Boolean>() {
-            @Override
-            public void onSuccess(final Boolean result) {
-                promise.success(result);
-            }
-
-            @Override
-            public void onFailure(@Nonnull final Throwable throwable) {
-                promise.failure(throwable);
-            }
-        });
-        return promise.future();
-    }
-
-    @Override
-    public void put(final LogicalDatastoreType store, final NormalizedNodeMessage data) {
-        if (writeTx == null) {
-            writeTx = delegateBroker.newWriteOnlyTransaction();
-        }
-
-        LOG.trace("{}: Write[{}] {} via NETCONF: {} with payload {}", id, writeTx.getIdentifier(), store,
-                data.getIdentifier(), data.getNode());
-
-        writeTx.put(store, data.getIdentifier(), data.getNode());
-    }
-
-    @Override
-    public void merge(final LogicalDatastoreType store, final NormalizedNodeMessage data) {
-        if (writeTx == null) {
-            writeTx = delegateBroker.newWriteOnlyTransaction();
-        }
-
-        LOG.trace("{}: Merge[{}] {} via NETCONF: {} with payload {}", id, writeTx.getIdentifier(),store,
-                data.getIdentifier(), data.getNode());
-
-        writeTx.merge(store, data.getIdentifier(), data.getNode());
-    }
-
-    @Override
-    public void delete(final LogicalDatastoreType store, final YangInstanceIdentifier path) {
-        if (writeTx == null) {
-            writeTx = delegateBroker.newWriteOnlyTransaction();
-        }
-
-        LOG.trace("{}: Delete[{}} {} via NETCONF: {}", id, writeTx.getIdentifier(), store, path);
-
-        writeTx.delete(store, path);
-    }
-
-    @Override
-    public boolean cancel() {
-        LOG.trace("{}: Cancel[{}} via NETCONF", id, writeTx.getIdentifier());
-        try {
-            return writeTx.cancel();
-        } finally {
-            writeTx = null;
-        }
-    }
-
-    @Override
-    public Future<Void> submit() {
-        LOG.trace("{}: Submit[{}} via NETCONF", id, writeTx.getIdentifier());
-
-        final CheckedFuture<Void, TransactionCommitFailedException> submitFuture = writeTx.submit();
-        writeTx = null;
-
-        final DefaultPromise<Void> promise = new DefaultPromise<>();
-        Futures.addCallback(submitFuture, new FutureCallback<Void>() {
-            @Override
-            public void onSuccess(final Void result) {
-                promise.success(result);
-            }
-
-            @Override
-            public void onFailure(@Nonnull final Throwable throwable) {
-                promise.failure(throwable);
-            }
-        });
-        return promise.future();
-    }
-
-}
diff --git a/netconf/netconf-topology-singleton/src/main/java/org/opendaylight/netconf/topology/singleton/impl/tx/NetconfProxyDOMTransaction.java b/netconf/netconf-topology-singleton/src/main/java/org/opendaylight/netconf/topology/singleton/impl/tx/NetconfProxyDOMTransaction.java
deleted file mode 100644 (file)
index 535123b..0000000
+++ /dev/null
@@ -1,217 +0,0 @@
-/*
- * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.
- *
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,
- * and is available at http://www.eclipse.org/legal/epl-v10.html
- */
-
-package org.opendaylight.netconf.topology.singleton.impl.tx;
-
-import akka.actor.ActorRef;
-import akka.actor.ActorSystem;
-import akka.dispatch.OnComplete;
-import akka.pattern.Patterns;
-import akka.util.Timeout;
-import com.google.common.base.Optional;
-import com.google.common.base.Throwables;
-import org.opendaylight.controller.config.util.xml.DocumentedException;
-import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
-import org.opendaylight.netconf.sal.connect.util.RemoteDeviceId;
-import org.opendaylight.netconf.topology.singleton.api.NetconfDOMTransaction;
-import org.opendaylight.netconf.topology.singleton.messages.NormalizedNodeMessage;
-import org.opendaylight.netconf.topology.singleton.messages.transactions.CancelRequest;
-import org.opendaylight.netconf.topology.singleton.messages.transactions.DeleteRequest;
-import org.opendaylight.netconf.topology.singleton.messages.transactions.EmptyReadResponse;
-import org.opendaylight.netconf.topology.singleton.messages.transactions.ExistsRequest;
-import org.opendaylight.netconf.topology.singleton.messages.transactions.MergeRequest;
-import org.opendaylight.netconf.topology.singleton.messages.transactions.OpenTransaction;
-import org.opendaylight.netconf.topology.singleton.messages.transactions.PutRequest;
-import org.opendaylight.netconf.topology.singleton.messages.transactions.ReadRequest;
-import org.opendaylight.netconf.topology.singleton.messages.transactions.SubmitFailedReply;
-import org.opendaylight.netconf.topology.singleton.messages.transactions.SubmitRequest;
-import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import scala.concurrent.Await;
-import scala.concurrent.Future;
-import scala.concurrent.impl.Promise.DefaultPromise;
-
-
-public class NetconfProxyDOMTransaction implements NetconfDOMTransaction {
-
-    private static final Logger LOG = LoggerFactory.getLogger(NetconfProxyDOMTransaction.class);
-
-    private final RemoteDeviceId id;
-    private final ActorSystem actorSystem;
-    private final ActorRef masterContextRef;
-    private final Timeout actorResponseWaitTime;
-
-    public NetconfProxyDOMTransaction(final RemoteDeviceId id,
-                                      final ActorSystem actorSystem,
-                                      final ActorRef masterContextRef,
-                                      final Timeout actorResponseWaitTime) {
-        this.id = id;
-        this.actorSystem = actorSystem;
-        this.masterContextRef = masterContextRef;
-        this.actorResponseWaitTime = actorResponseWaitTime;
-    }
-
-    @Override
-    public void openTransaction() {
-        // TODO we can do some checking for already opened transaction also
-        // here in this class. We can track open transaction at least for this
-        // node.
-        LOG.debug("{}: Requesting leader {} to open new transaction", id, masterContextRef);
-        final Future<Object> openTxFuture =
-                Patterns.ask(masterContextRef, new OpenTransaction(), actorResponseWaitTime);
-        try {
-            // we have to wait here so we can see if tx can be opened
-            Await.result(openTxFuture, actorResponseWaitTime.duration());
-            LOG.debug("{}: New transaction opened successfully", id);
-        } catch (final Exception e) {
-            LOG.error("{}: Failed to open new transaction", id, e);
-            Throwables.propagate(e);
-        }
-    }
-
-    @Override
-    public Future<Optional<NormalizedNodeMessage>> read(final LogicalDatastoreType store,
-                                                        final YangInstanceIdentifier path) {
-
-        final Future<Object> readScalaFuture =
-                Patterns.ask(masterContextRef, new ReadRequest(store, path), actorResponseWaitTime);
-
-        LOG.trace("{}: Read {} via NETCONF: {}", id, store, path);
-
-        final DefaultPromise<Optional<NormalizedNodeMessage>> promise = new DefaultPromise<>();
-
-        readScalaFuture.onComplete(new OnComplete<Object>() {
-            @Override
-            public void onComplete(final Throwable failure, final Object success) throws Throwable {
-                if (failure != null) { // ask timeout
-                    final Exception exception = new DocumentedException(
-                            id + ":Master is down. Please try again.",
-                            DocumentedException.ErrorType.TRANSPORT,
-                            DocumentedException.ErrorTag.RESOURCE_DENIED,
-                            DocumentedException.ErrorSeverity.ERROR);
-                    promise.failure(exception);
-                    return;
-                }
-                if (success instanceof Throwable) { // Error sended by master
-                    promise.failure((Throwable) success);
-                    return;
-                }
-                if (success instanceof EmptyReadResponse) {
-                    promise.success(Optional.absent());
-                    return;
-                }
-                promise.success(Optional.of((NormalizedNodeMessage) success));
-            }
-        }, actorSystem.dispatcher());
-
-        return promise.future();
-    }
-
-    @Override
-    public Future<Boolean> exists(final LogicalDatastoreType store, final YangInstanceIdentifier path) {
-        final Future<Object> existsScalaFuture =
-                Patterns.ask(masterContextRef, new ExistsRequest(store, path), actorResponseWaitTime);
-
-        LOG.trace("{}: Exists {} via NETCONF: {}", id, store, path);
-
-        final DefaultPromise<Boolean> promise = new DefaultPromise<>();
-        existsScalaFuture.onComplete(new OnComplete<Object>() {
-            @Override
-            public void onComplete(final Throwable failure, final Object success) throws Throwable {
-                if (failure != null) { // ask timeout
-                    final Exception exception = new DocumentedException(
-                            id + ":Master is down. Please try again.",
-                            DocumentedException.ErrorType.TRANSPORT,
-                            DocumentedException.ErrorTag.RESOURCE_DENIED,
-                            DocumentedException.ErrorSeverity.ERROR);
-                    promise.failure(exception);
-                    return;
-                }
-                if (success instanceof Throwable) {
-                    promise.failure((Throwable) success);
-                    return;
-                }
-                promise.success((Boolean) success);
-            }
-        }, actorSystem.dispatcher());
-        return promise.future();
-    }
-
-    @Override
-    public void put(final LogicalDatastoreType store, final NormalizedNodeMessage data) {
-        LOG.trace("{}: Write {} via NETCONF: {} with payload {}", id, store, data.getIdentifier(), data.getNode());
-
-        masterContextRef.tell(new PutRequest(store, data), ActorRef.noSender());
-    }
-
-    @Override
-    public void merge(final LogicalDatastoreType store, final NormalizedNodeMessage data) {
-        LOG.trace("{}: Merge {} via NETCONF: {} with payload {}", id, store, data.getIdentifier(), data.getNode());
-
-        masterContextRef.tell(new MergeRequest(store, data), ActorRef.noSender());
-    }
-
-    @Override
-    public void delete(final LogicalDatastoreType store, final YangInstanceIdentifier path) {
-        LOG.trace("{}: Delete {} via NETCONF: {}", id, store, path);
-
-        masterContextRef.tell(new DeleteRequest(store, path), ActorRef.noSender());
-    }
-
-    @Override
-    public boolean cancel() {
-        final Future<Object> cancelScalaFuture =
-                Patterns.ask(masterContextRef, new CancelRequest(), actorResponseWaitTime);
-
-        LOG.trace("{}: Cancel {} via NETCONF", id);
-
-        try {
-            // here must be Await because AsyncWriteTransaction do not return future
-            return (boolean) Await.result(cancelScalaFuture, actorResponseWaitTime.duration());
-        } catch (Exception e) {
-            return false;
-        }
-    }
-
-    @Override
-    public Future<Void> submit() {
-        final Future<Object> submitScalaFuture =
-                Patterns.ask(masterContextRef, new SubmitRequest(), actorResponseWaitTime);
-
-        LOG.trace("{}: Submit {} via NETCONF", id);
-
-        final DefaultPromise<Void> promise = new DefaultPromise<>();
-
-        submitScalaFuture.onComplete(new OnComplete<Object>() {
-            @Override
-            public void onComplete(final Throwable failure, final Object success) throws Throwable {
-                if (failure != null) { // ask timeout
-                    final Exception exception = new DocumentedException(
-                            id + ":Master is down. Please try again.",
-                            DocumentedException.ErrorType.TRANSPORT,
-                            DocumentedException.ErrorTag.RESOURCE_DENIED,
-                            DocumentedException.ErrorSeverity.ERROR);
-                    promise.failure(exception);
-                    return;
-                }
-                if (success instanceof Throwable) {
-                    promise.failure((Throwable) success);
-                } else {
-                    if (success instanceof SubmitFailedReply) {
-                        LOG.error("{}: Transaction was not submitted because already closed.", id);
-                    }
-                    promise.success(null);
-                }
-            }
-        }, actorSystem.dispatcher());
-
-        return promise.future();
-    }
-
-}
diff --git a/netconf/netconf-topology-singleton/src/main/java/org/opendaylight/netconf/topology/singleton/impl/tx/NetconfReadOnlyTransaction.java b/netconf/netconf-topology-singleton/src/main/java/org/opendaylight/netconf/topology/singleton/impl/tx/NetconfReadOnlyTransaction.java
deleted file mode 100644 (file)
index 3048337..0000000
+++ /dev/null
@@ -1,142 +0,0 @@
-/*
- * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.
- *
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,
- * and is available at http://www.eclipse.org/legal/epl-v10.html
- */
-
-package org.opendaylight.netconf.topology.singleton.impl.tx;
-
-import akka.actor.ActorSystem;
-import akka.dispatch.OnComplete;
-import com.google.common.base.Function;
-import com.google.common.base.Optional;
-import com.google.common.util.concurrent.CheckedFuture;
-import com.google.common.util.concurrent.Futures;
-import com.google.common.util.concurrent.SettableFuture;
-import javax.annotation.Nullable;
-import org.opendaylight.controller.config.util.xml.DocumentedException;
-import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
-import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
-import org.opendaylight.controller.md.sal.dom.api.DOMDataReadOnlyTransaction;
-import org.opendaylight.netconf.sal.connect.util.RemoteDeviceId;
-import org.opendaylight.netconf.topology.singleton.api.NetconfDOMTransaction;
-import org.opendaylight.netconf.topology.singleton.messages.NormalizedNodeMessage;
-import org.opendaylight.yangtools.yang.common.RpcError;
-import org.opendaylight.yangtools.yang.common.RpcResultBuilder;
-import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
-import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import scala.concurrent.Future;
-
-public class NetconfReadOnlyTransaction implements DOMDataReadOnlyTransaction {
-
-    private static final Logger LOG = LoggerFactory.getLogger(NetconfReadOnlyTransaction.class);
-
-    private final RemoteDeviceId id;
-    private final NetconfDOMTransaction delegate;
-    private final ActorSystem actorSystem;
-
-    public NetconfReadOnlyTransaction(final RemoteDeviceId id,
-                                      final ActorSystem actorSystem,
-                                      final NetconfDOMTransaction delegate) {
-        this.id = id;
-        this.delegate = delegate;
-        this.actorSystem = actorSystem;
-    }
-
-    @Override
-    public void close() {
-        //NOOP
-    }
-
-    @Override
-    public CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException> read(final LogicalDatastoreType store,
-                                                                                   final YangInstanceIdentifier path) {
-
-        LOG.trace("{}: Read {} via NETCONF: {}", id, store, path);
-
-        final Future<Optional<NormalizedNodeMessage>> future = delegate.read(store, path);
-        final SettableFuture<Optional<NormalizedNode<?, ?>>> settableFuture = SettableFuture.create();
-        final CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException> checkedFuture;
-        checkedFuture = Futures.makeChecked(settableFuture, new Function<Exception, ReadFailedException>() {
-            @Nullable
-            @Override
-            public ReadFailedException apply(Exception cause) {
-                if (cause.getCause() instanceof DocumentedException) {
-                    final DocumentedException exception = (DocumentedException) cause.getCause();
-                    if (exception.getErrorSeverity() == DocumentedException.ErrorSeverity.ERROR &&
-                            exception.getErrorType() == DocumentedException.ErrorType.TRANSPORT &&
-                            exception.getErrorTag() == DocumentedException.ErrorTag.RESOURCE_DENIED) {
-                        final RpcError error = RpcResultBuilder.newError(
-                                RpcError.ErrorType.TRANSPORT,
-                                exception.getErrorTag().getTagValue(),
-                                exception.getMessage());
-                        return new ReadFailedException("Read from transaction failed", error);
-                    }
-                }
-                return new ReadFailedException("Read from transaction failed", cause);
-            }
-        });
-        future.onComplete(new OnComplete<Optional<NormalizedNodeMessage>>() {
-            @Override
-            public void onComplete(final Throwable throwable,
-                                   final Optional<NormalizedNodeMessage> normalizedNodeMessage) throws Throwable {
-                if (throwable == null) {
-                    if (normalizedNodeMessage.isPresent()) {
-                        settableFuture.set(normalizedNodeMessage.transform(new Function<NormalizedNodeMessage,
-                                NormalizedNode<?, ?>>() {
-
-                            @Nullable
-                            @Override
-                            public NormalizedNode<?, ?> apply(final NormalizedNodeMessage input) {
-                                return input.getNode();
-                            }
-                        }));
-                    } else {
-                        settableFuture.set(Optional.absent());
-                    }
-                } else {
-                    settableFuture.setException(throwable);
-                }
-            }
-        }, actorSystem.dispatcher());
-        return checkedFuture;
-    }
-
-    @Override
-    public CheckedFuture<Boolean, ReadFailedException> exists(final LogicalDatastoreType store,
-                                                              final YangInstanceIdentifier path) {
-
-        LOG.trace("{}: Exists {} via NETCONF: {}", id, store, path);
-
-        final Future<Boolean> existsFuture = delegate.exists(store, path);
-        final SettableFuture<Boolean> settableFuture = SettableFuture.create();
-        final CheckedFuture<Boolean, ReadFailedException> checkedFuture;
-        checkedFuture = Futures.makeChecked(settableFuture, new Function<Exception, ReadFailedException>() {
-            @Nullable
-            @Override
-            public ReadFailedException apply(Exception cause) {
-                return new ReadFailedException("Read from transaction failed", cause);
-            }
-        });
-        existsFuture.onComplete(new OnComplete<Boolean>() {
-            @Override
-            public void onComplete(final Throwable throwable, final Boolean result) throws Throwable {
-                if (throwable == null) {
-                    settableFuture.set(result);
-                } else {
-                    settableFuture.setException(throwable);
-                }
-            }
-        }, actorSystem.dispatcher());
-        return checkedFuture;
-    }
-
-    @Override
-    public Object getIdentifier() {
-        return this;
-    }
-}
diff --git a/netconf/netconf-topology-singleton/src/main/java/org/opendaylight/netconf/topology/singleton/impl/tx/NetconfWriteOnlyTransaction.java b/netconf/netconf-topology-singleton/src/main/java/org/opendaylight/netconf/topology/singleton/impl/tx/NetconfWriteOnlyTransaction.java
deleted file mode 100644 (file)
index 422cf7a..0000000
+++ /dev/null
@@ -1,134 +0,0 @@
-/*
- * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.
- *
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,
- * and is available at http://www.eclipse.org/legal/epl-v10.html
- */
-
-package org.opendaylight.netconf.topology.singleton.impl.tx;
-
-import akka.actor.ActorSystem;
-import akka.dispatch.OnComplete;
-import com.google.common.base.Function;
-import com.google.common.util.concurrent.CheckedFuture;
-import com.google.common.util.concurrent.Futures;
-import com.google.common.util.concurrent.ListenableFuture;
-import com.google.common.util.concurrent.SettableFuture;
-import javax.annotation.Nullable;
-import org.opendaylight.controller.md.sal.common.api.TransactionStatus;
-import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
-import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException;
-import org.opendaylight.controller.md.sal.dom.api.DOMDataWriteTransaction;
-import org.opendaylight.netconf.sal.connect.util.RemoteDeviceId;
-import org.opendaylight.netconf.topology.singleton.api.NetconfDOMTransaction;
-import org.opendaylight.netconf.topology.singleton.messages.NormalizedNodeMessage;
-import org.opendaylight.yangtools.yang.common.RpcResult;
-import org.opendaylight.yangtools.yang.common.RpcResultBuilder;
-import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
-import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import scala.concurrent.Future;
-
-public class NetconfWriteOnlyTransaction implements DOMDataWriteTransaction {
-
-    private static final Logger LOG = LoggerFactory.getLogger(NetconfWriteOnlyTransaction.class);
-
-    private final RemoteDeviceId id;
-    private final NetconfDOMTransaction delegate;
-    private final ActorSystem actorSystem;
-
-    public NetconfWriteOnlyTransaction(final RemoteDeviceId id,
-                                       final ActorSystem actorSystem,
-                                       final NetconfDOMTransaction delegate) {
-        this.id = id;
-        this.delegate = delegate;
-        this.actorSystem = actorSystem;
-
-        this.delegate.openTransaction();
-    }
-
-    @Override
-    public void put(final LogicalDatastoreType store, final YangInstanceIdentifier path,
-                    final NormalizedNode<?,?> data) {
-        LOG.trace("{}: Write {} via NETCONF: {} with payload {}", id, store, path, data);
-
-        delegate.put(store, new NormalizedNodeMessage(path, data));
-    }
-
-    @Override
-    public void merge(final LogicalDatastoreType store, final YangInstanceIdentifier path,
-                      final NormalizedNode<?,?> data) {
-        LOG.trace("{}: Merge {} via NETCONF: {} with payload {}", id, store, path, data);
-
-        delegate.merge(store, new NormalizedNodeMessage(path, data));
-    }
-
-    @Override
-    public boolean cancel() {
-        LOG.trace("{}: Cancel", id);
-
-        return delegate.cancel();
-    }
-
-    @Override
-    public void delete(final LogicalDatastoreType store, final YangInstanceIdentifier path) {
-        LOG.trace("{}: Delete {} via NETCONF: {}", id, store, path);
-
-        delegate.delete(store, path);
-    }
-
-    @Override
-    public CheckedFuture<Void, TransactionCommitFailedException> submit() {
-        LOG.trace("{}: Submit", id);
-
-        final Future<Void> submit = delegate.submit();
-        final SettableFuture<Void> settFuture = SettableFuture.create();
-        final CheckedFuture<Void, TransactionCommitFailedException> checkedFuture;
-        checkedFuture = Futures.makeChecked(settFuture, new Function<Exception, TransactionCommitFailedException>() {
-            @Nullable
-            @Override
-            public TransactionCommitFailedException apply(Exception input) {
-                return new TransactionCommitFailedException("Transaction commit failed", input);
-            }
-        });
-        submit.onComplete(new OnComplete<Void>() {
-            @Override
-            public void onComplete(Throwable throwable, Void object) throws Throwable {
-                if (throwable == null) {
-                    settFuture.set(object);
-                } else {
-                    settFuture.setException(throwable);
-                }
-            }
-        }, actorSystem.dispatcher());
-        return checkedFuture;
-    }
-
-    @Override
-    public ListenableFuture<RpcResult<TransactionStatus>> commit() {
-        LOG.trace("{}: Commit", id);
-
-        final Future<Void> commit = delegate.submit();
-        final SettableFuture<RpcResult<TransactionStatus>> settFuture = SettableFuture.create();
-        commit.onComplete(new OnComplete<Void>() {
-            @Override
-            public void onComplete(final Throwable throwable, final Void result) throws Throwable {
-                if (throwable == null) {
-                    TransactionStatus status = TransactionStatus.SUBMITED;
-                    RpcResult<TransactionStatus> rpcResult = RpcResultBuilder.success(status).build();
-                    settFuture.set(rpcResult);
-                } else {
-                    settFuture.setException(throwable);
-                }
-            }
-        }, actorSystem.dispatcher());
-        return settFuture;
-    }
-
-    @Override
-    public Object getIdentifier() {
-        return this;
-    }
-}
diff --git a/netconf/netconf-topology-singleton/src/main/java/org/opendaylight/netconf/topology/singleton/impl/tx/ProxyReadTransaction.java b/netconf/netconf-topology-singleton/src/main/java/org/opendaylight/netconf/topology/singleton/impl/tx/ProxyReadTransaction.java
new file mode 100644 (file)
index 0000000..0756f33
--- /dev/null
@@ -0,0 +1,132 @@
+/*
+ * Copyright (c) 2017 Pantheon Technologies s.r.o. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.netconf.topology.singleton.impl.tx;
+
+import akka.actor.ActorRef;
+import akka.actor.ActorSystem;
+import akka.dispatch.OnComplete;
+import akka.pattern.Patterns;
+import akka.util.Timeout;
+import com.google.common.base.Optional;
+import com.google.common.util.concurrent.CheckedFuture;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.SettableFuture;
+import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
+import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
+import org.opendaylight.controller.md.sal.dom.api.DOMDataReadOnlyTransaction;
+import org.opendaylight.netconf.sal.connect.util.RemoteDeviceId;
+import org.opendaylight.netconf.topology.singleton.impl.utils.NetconfTopologyUtils;
+import org.opendaylight.netconf.topology.singleton.messages.NormalizedNodeMessage;
+import org.opendaylight.netconf.topology.singleton.messages.transactions.EmptyReadResponse;
+import org.opendaylight.netconf.topology.singleton.messages.transactions.ExistsRequest;
+import org.opendaylight.netconf.topology.singleton.messages.transactions.ReadRequest;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
+import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.concurrent.Future;
+
+/**
+ * ProxyReadTransaction uses provided {@link ActorRef} to delegate method calls to master
+ * {@link org.opendaylight.netconf.topology.singleton.impl.actors.ReadTransactionActor}.
+ */
+public class ProxyReadTransaction implements DOMDataReadOnlyTransaction {
+
+    private static final Logger LOG = LoggerFactory.getLogger(ProxyReadTransaction.class);
+
+    private final ActorRef masterTxActor;
+    private final RemoteDeviceId id;
+    private final ActorSystem actorSystem;
+    private final Timeout askTimeout;
+
+    /**
+     * @param masterTxActor {@link org.opendaylight.netconf.topology.singleton.impl.actors.ReadTransactionActor} ref
+     * @param id            device id
+     * @param actorSystem   system
+     * @param askTimeout
+     */
+    public ProxyReadTransaction(final ActorRef masterTxActor, final RemoteDeviceId id, final ActorSystem actorSystem,
+                                final Timeout askTimeout) {
+        this.masterTxActor = masterTxActor;
+        this.id = id;
+        this.actorSystem = actorSystem;
+        this.askTimeout = askTimeout;
+    }
+
+    @Override
+    public void close() {
+        //noop
+    }
+
+    @Override
+    public CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException> read(final LogicalDatastoreType store,
+                                                                                   final YangInstanceIdentifier path) {
+        LOG.trace("{}: Read {} via NETCONF: {}", id, store, path);
+
+        final Future<Object> future = Patterns.ask(masterTxActor, new ReadRequest(store, path), askTimeout);
+        final SettableFuture<Optional<NormalizedNode<?, ?>>> settableFuture = SettableFuture.create();
+        future.onComplete(new OnComplete<Object>() {
+            @Override
+            public void onComplete(final Throwable failure,
+                                   final Object success) throws Throwable {
+                if (failure != null) { // ask timeout
+                    final Exception exception = NetconfTopologyUtils.createMasterIsDownException(id);
+                    settableFuture.setException(exception);
+                    return;
+                }
+                if (success instanceof Throwable) { // Error sended by master
+                    settableFuture.setException((Throwable) success);
+                    return;
+                }
+                if (success instanceof EmptyReadResponse) {
+                    settableFuture.set(Optional.absent());
+                    return;
+                }
+                if (success instanceof NormalizedNodeMessage) {
+                    final NormalizedNodeMessage data = (NormalizedNodeMessage) success;
+                    settableFuture.set(Optional.of(data.getNode()));
+                }
+            }
+        }, actorSystem.dispatcher());
+        return Futures.makeChecked(settableFuture, ReadFailedException.MAPPER);
+    }
+
+    @Override
+    public CheckedFuture<Boolean, ReadFailedException> exists(final LogicalDatastoreType store,
+                                                              final YangInstanceIdentifier path) {
+        final Future<Object> existsScalaFuture =
+                Patterns.ask(masterTxActor, new ExistsRequest(store, path), askTimeout);
+
+        LOG.trace("{}: Exists {} via NETCONF: {}", id, store, path);
+
+        final SettableFuture<Boolean> settableFuture = SettableFuture.create();
+        existsScalaFuture.onComplete(new OnComplete<Object>() {
+            @Override
+            public void onComplete(final Throwable failure, final Object success) throws Throwable {
+                if (failure != null) { // ask timeout
+                    final Exception exception = NetconfTopologyUtils.createMasterIsDownException(id);
+                    settableFuture.setException(exception);
+                    return;
+                }
+                if (success instanceof Throwable) {
+                    settableFuture.setException((Throwable) success);
+                    return;
+                }
+                settableFuture.set((Boolean) success);
+            }
+        }, actorSystem.dispatcher());
+        return Futures.makeChecked(settableFuture, ReadFailedException.MAPPER);
+    }
+
+
+    @Override
+    public Object getIdentifier() {
+        return this;
+    }
+}
diff --git a/netconf/netconf-topology-singleton/src/main/java/org/opendaylight/netconf/topology/singleton/impl/tx/ProxyWriteTransaction.java b/netconf/netconf-topology-singleton/src/main/java/org/opendaylight/netconf/topology/singleton/impl/tx/ProxyWriteTransaction.java
new file mode 100644 (file)
index 0000000..072f622
--- /dev/null
@@ -0,0 +1,175 @@
+/*
+ * Copyright (c) 2017 Pantheon Technologies s.r.o. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.netconf.topology.singleton.impl.tx;
+
+import akka.actor.ActorRef;
+import akka.actor.ActorSystem;
+import akka.dispatch.OnComplete;
+import akka.pattern.Patterns;
+import akka.util.Timeout;
+import com.google.common.base.Function;
+import com.google.common.base.Preconditions;
+import com.google.common.util.concurrent.CheckedFuture;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.SettableFuture;
+import java.util.concurrent.atomic.AtomicBoolean;
+import javax.annotation.Nullable;
+import org.opendaylight.controller.md.sal.common.api.TransactionStatus;
+import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
+import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException;
+import org.opendaylight.controller.md.sal.dom.api.DOMDataWriteTransaction;
+import org.opendaylight.netconf.sal.connect.util.RemoteDeviceId;
+import org.opendaylight.netconf.topology.singleton.impl.utils.NetconfTopologyUtils;
+import org.opendaylight.netconf.topology.singleton.messages.NormalizedNodeMessage;
+import org.opendaylight.netconf.topology.singleton.messages.transactions.CancelRequest;
+import org.opendaylight.netconf.topology.singleton.messages.transactions.DeleteRequest;
+import org.opendaylight.netconf.topology.singleton.messages.transactions.MergeRequest;
+import org.opendaylight.netconf.topology.singleton.messages.transactions.PutRequest;
+import org.opendaylight.netconf.topology.singleton.messages.transactions.SubmitFailedReply;
+import org.opendaylight.netconf.topology.singleton.messages.transactions.SubmitRequest;
+import org.opendaylight.yangtools.yang.common.RpcResult;
+import org.opendaylight.yangtools.yang.common.RpcResultBuilder;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
+import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.concurrent.Await;
+import scala.concurrent.Future;
+
+/**
+ * ProxyWriteTransaction uses provided {@link ActorRef} to delegate method calls to master
+ * {@link org.opendaylight.netconf.topology.singleton.impl.actors.WriteTransactionActor}.
+ */
+public class ProxyWriteTransaction implements DOMDataWriteTransaction {
+
+    private static final Logger LOG = LoggerFactory.getLogger(ProxyWriteTransaction.class);
+
+    private final ActorRef masterTxActor;
+    private final RemoteDeviceId id;
+    private final ActorSystem actorSystem;
+    private final AtomicBoolean opened = new AtomicBoolean(true);
+    private final Timeout askTimeout;
+
+    /**
+     * @param masterTxActor {@link org.opendaylight.netconf.topology.singleton.impl.actors.WriteTransactionActor} ref
+     * @param id            device id
+     * @param actorSystem   system
+     * @param askTimeout
+     */
+    public ProxyWriteTransaction(final ActorRef masterTxActor, final RemoteDeviceId id, final ActorSystem actorSystem,
+                                 final Timeout askTimeout) {
+        this.masterTxActor = masterTxActor;
+        this.id = id;
+        this.actorSystem = actorSystem;
+        this.askTimeout = askTimeout;
+    }
+
+    @Override
+    public boolean cancel() {
+        if (!opened.compareAndSet(true, false)) {
+            return false;
+        }
+        final Future<Object> cancelScalaFuture =
+                Patterns.ask(masterTxActor, new CancelRequest(), askTimeout);
+
+        LOG.trace("{}: Cancel {} via NETCONF", id);
+
+        try {
+            // here must be Await because AsyncWriteTransaction do not return future
+            return (boolean) Await.result(cancelScalaFuture, askTimeout.duration());
+        } catch (final Exception e) {
+            return false;
+        }
+    }
+
+    @Override
+    public CheckedFuture<Void, TransactionCommitFailedException> submit() {
+        if (!opened.compareAndSet(true, false)) {
+            throw new IllegalStateException(id + ": Transaction" + getIdentifier() + " is closed");
+        }
+        final Future<Object> submitScalaFuture =
+                Patterns.ask(masterTxActor, new SubmitRequest(), askTimeout);
+
+        LOG.trace("{}: Submit {} via NETCONF", id);
+
+        final SettableFuture<Void> settableFuture = SettableFuture.create();
+        submitScalaFuture.onComplete(new OnComplete<Object>() {
+            @Override
+            public void onComplete(final Throwable failure, final Object success) throws Throwable {
+                if (failure != null) { // ask timeout
+                    final Exception exception = NetconfTopologyUtils.createMasterIsDownException(id);
+                    settableFuture.setException(exception);
+                    return;
+                }
+                if (success instanceof Throwable) {
+                    settableFuture.setException((Throwable) success);
+                } else {
+                    if (success instanceof SubmitFailedReply) {
+                        LOG.error("{}: Transaction was not submitted because already closed.", id);
+                    }
+                    settableFuture.set(null);
+                }
+            }
+        }, actorSystem.dispatcher());
+
+        return Futures.makeChecked(settableFuture, new Function<Exception, TransactionCommitFailedException>() {
+            @Nullable
+            @Override
+            public TransactionCommitFailedException apply(@Nullable final Exception input) {
+                final String message = "Submit of transaction " + getIdentifier() + " failed";
+                return new TransactionCommitFailedException(message, input);
+            }
+        });
+    }
+
+    @Override
+    public ListenableFuture<RpcResult<TransactionStatus>> commit() {
+        LOG.trace("{}: Commit", id);
+
+        final CheckedFuture<Void, TransactionCommitFailedException> submit = submit();
+        return Futures.transform(submit, new Function<Void, RpcResult<TransactionStatus>>() {
+            @Nullable
+            @Override
+            public RpcResult<TransactionStatus> apply(@Nullable final Void input) {
+                return RpcResultBuilder.success(TransactionStatus.SUBMITED).build();
+            }
+        });
+    }
+
+    @Override
+    public void delete(final LogicalDatastoreType store, final YangInstanceIdentifier identifier) {
+        Preconditions.checkState(opened.get(), "%s: Transaction was closed %s", id, getIdentifier());
+        LOG.trace("{}: Delete {} via NETCONF: {}", id, store, identifier);
+        masterTxActor.tell(new DeleteRequest(store, identifier), ActorRef.noSender());
+    }
+
+    @Override
+    public void put(final LogicalDatastoreType store, final YangInstanceIdentifier identifier,
+                    final NormalizedNode<?, ?> data) {
+        Preconditions.checkState(opened.get(), "%s: Transaction was closed %s", id, getIdentifier());
+        final NormalizedNodeMessage msg = new NormalizedNodeMessage(identifier, data);
+        LOG.trace("{}: Put {} via NETCONF: {} with payload {}", id, store, identifier, data);
+        masterTxActor.tell(new PutRequest(store, msg), ActorRef.noSender());
+    }
+
+    @Override
+    public void merge(final LogicalDatastoreType store, final YangInstanceIdentifier identifier,
+                      final NormalizedNode<?, ?> data) {
+        Preconditions.checkState(opened.get(), "%s: Transaction was closed %s", id, getIdentifier());
+        final NormalizedNodeMessage msg = new NormalizedNodeMessage(identifier, data);
+        LOG.trace("{}: Merge {} via NETCONF: {} with payload {}", id, store, identifier, data);
+        masterTxActor.tell(new MergeRequest(store, msg), ActorRef.noSender());
+    }
+
+    @Override
+    public Object getIdentifier() {
+        return this;
+    }
+}
index 1b59e883989c8b6d3c21e2c3529e194b9741bbe1..39ac4dd718fd377019a6944de020d7ff882f5c93 100644 (file)
@@ -13,6 +13,7 @@ import java.io.File;
 import java.math.BigDecimal;
 import java.net.InetSocketAddress;
 import java.util.HashMap;
+import org.opendaylight.controller.config.util.xml.DocumentedException;
 import java.util.Map;
 import org.opendaylight.netconf.sal.connect.netconf.NetconfDevice;
 import org.opendaylight.netconf.sal.connect.netconf.NetconfStateSchemasResolverImpl;
@@ -218,4 +219,10 @@ public class NetconfTopologyUtils {
     public static InstanceIdentifier<Node> createTopologyNodePath(final String topologyId) {
         return createTopologyListPath(topologyId).child(Node.class);
     }
+
+    public static DocumentedException createMasterIsDownException(final RemoteDeviceId id) {
+        return new DocumentedException(id + ":Master is down. Please try again.",
+                DocumentedException.ErrorType.APPLICATION, DocumentedException.ErrorTag.OPERATION_FAILED,
+                DocumentedException.ErrorSeverity.WARNING);
+    }
 }
diff --git a/netconf/netconf-topology-singleton/src/main/java/org/opendaylight/netconf/topology/singleton/messages/transactions/NewReadTransactionReply.java b/netconf/netconf-topology-singleton/src/main/java/org/opendaylight/netconf/topology/singleton/messages/transactions/NewReadTransactionReply.java
new file mode 100644 (file)
index 0000000..a787a9f
--- /dev/null
@@ -0,0 +1,25 @@
+/*
+ * Copyright (c) 2017 Pantheon Technologies s.r.o. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.netconf.topology.singleton.messages.transactions;
+
+import akka.actor.ActorRef;
+import java.io.Serializable;
+
+public class NewReadTransactionReply implements Serializable {
+
+    private final ActorRef txActor;
+
+    public NewReadTransactionReply(final ActorRef txActor) {
+        this.txActor = txActor;
+    }
+
+    public ActorRef getTxActor() {
+        return txActor;
+    }
+}
@@ -1,5 +1,5 @@
 /*
- * Copyright (c) 2017 Cisco Systems, Inc. and others. All rights reserved.
+ * Copyright (c) 2017 Pantheon Technologies s.r.o. and others.  All rights reserved.
  *
  * This program and the accompanying materials are made available under the
  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
@@ -8,9 +8,8 @@
 
 package org.opendaylight.netconf.topology.singleton.messages.transactions;
 
-/**
- * A message sent to MountPoint leader to open new transaction
- */
-public class OpenTransaction implements TransactionRequest {
-    private static final long serialVersionUID = 1L;
+import java.io.Serializable;
+
+public class NewReadTransactionRequest implements Serializable {
+
 }
diff --git a/netconf/netconf-topology-singleton/src/main/java/org/opendaylight/netconf/topology/singleton/messages/transactions/NewWriteTransactionReply.java b/netconf/netconf-topology-singleton/src/main/java/org/opendaylight/netconf/topology/singleton/messages/transactions/NewWriteTransactionReply.java
new file mode 100644 (file)
index 0000000..8e356a8
--- /dev/null
@@ -0,0 +1,25 @@
+/*
+ * Copyright (c) 2017 Pantheon Technologies s.r.o. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.netconf.topology.singleton.messages.transactions;
+
+import akka.actor.ActorRef;
+import java.io.Serializable;
+
+public class NewWriteTransactionReply implements Serializable {
+
+    private final ActorRef txActor;
+
+    public NewWriteTransactionReply(final ActorRef txActor) {
+        this.txActor = txActor;
+    }
+
+    public ActorRef getTxActor() {
+        return txActor;
+    }
+}
diff --git a/netconf/netconf-topology-singleton/src/main/java/org/opendaylight/netconf/topology/singleton/messages/transactions/NewWriteTransactionRequest.java b/netconf/netconf-topology-singleton/src/main/java/org/opendaylight/netconf/topology/singleton/messages/transactions/NewWriteTransactionRequest.java
new file mode 100644 (file)
index 0000000..8b7e914
--- /dev/null
@@ -0,0 +1,15 @@
+/*
+ * Copyright (c) 2017 Pantheon Technologies s.r.o. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.netconf.topology.singleton.messages.transactions;
+
+import java.io.Serializable;
+
+public class NewWriteTransactionRequest implements Serializable {
+
+}
diff --git a/netconf/netconf-topology-singleton/src/test/java/org/opendaylight/netconf/topology/singleton/impl/actors/ReadTransactionActorTest.java b/netconf/netconf-topology-singleton/src/test/java/org/opendaylight/netconf/topology/singleton/impl/actors/ReadTransactionActorTest.java
new file mode 100644 (file)
index 0000000..3f52fd7
--- /dev/null
@@ -0,0 +1,105 @@
+/*
+ * Copyright (c) 2017 Pantheon Technologies s.r.o. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.netconf.topology.singleton.impl.actors;
+
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import akka.actor.ActorSystem;
+import akka.testkit.JavaTestKit;
+import akka.testkit.TestActorRef;
+import akka.testkit.TestProbe;
+import com.google.common.base.Optional;
+import com.google.common.util.concurrent.Futures;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.Mock;
+import org.mockito.MockitoAnnotations;
+import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
+import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
+import org.opendaylight.controller.md.sal.dom.api.DOMDataReadOnlyTransaction;
+import org.opendaylight.netconf.topology.singleton.messages.NormalizedNodeMessage;
+import org.opendaylight.netconf.topology.singleton.messages.transactions.EmptyReadResponse;
+import org.opendaylight.netconf.topology.singleton.messages.transactions.ExistsRequest;
+import org.opendaylight.netconf.topology.singleton.messages.transactions.ReadRequest;
+import org.opendaylight.yangtools.yang.common.QName;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
+import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode;
+import org.opendaylight.yangtools.yang.data.impl.schema.Builders;
+
+public class ReadTransactionActorTest {
+
+    private static final YangInstanceIdentifier path = YangInstanceIdentifier.EMPTY;
+    private static final LogicalDatastoreType store = LogicalDatastoreType.CONFIGURATION;
+
+    @Mock
+    private DOMDataReadOnlyTransaction deviceReadTx;
+    private TestProbe probe;
+    private ActorSystem system;
+    private TestActorRef<ReadTransactionActor> actorRef;
+
+    @Before
+    public void setUp() throws Exception {
+        MockitoAnnotations.initMocks(this);
+        system = ActorSystem.apply();
+        probe = TestProbe.apply(system);
+        actorRef = TestActorRef.create(system, ReadTransactionActor.props(deviceReadTx), "testA");
+    }
+
+    @After
+    public void tearDown() throws Exception {
+        JavaTestKit.shutdownActorSystem(system, null, true);
+    }
+
+    @Test
+    public void testRead() throws Exception {
+        final ContainerNode node = Builders.containerBuilder()
+                .withNodeIdentifier(new YangInstanceIdentifier.NodeIdentifier(QName.create("cont")))
+                .build();
+        when(deviceReadTx.read(store, path)).thenReturn(Futures.immediateCheckedFuture(Optional.of(node)));
+        actorRef.tell(new ReadRequest(store, path), probe.ref());
+        verify(deviceReadTx).read(store, path);
+        probe.expectMsgClass(NormalizedNodeMessage.class);
+    }
+
+    @Test
+    public void testReadEmpty() throws Exception {
+        when(deviceReadTx.read(store, path)).thenReturn(Futures.immediateCheckedFuture(Optional.absent()));
+        actorRef.tell(new ReadRequest(store, path), probe.ref());
+        verify(deviceReadTx).read(store, path);
+        probe.expectMsgClass(EmptyReadResponse.class);
+    }
+
+    @Test
+    public void testReadFailure() throws Exception {
+        final ReadFailedException cause = new ReadFailedException("fail");
+        when(deviceReadTx.read(store, path)).thenReturn(Futures.immediateFailedCheckedFuture(cause));
+        actorRef.tell(new ReadRequest(store, path), probe.ref());
+        verify(deviceReadTx).read(store, path);
+        probe.expectMsg(cause);
+    }
+
+    @Test
+    public void testExists() throws Exception {
+        when(deviceReadTx.exists(store, path)).thenReturn(Futures.immediateCheckedFuture(true));
+        actorRef.tell(new ExistsRequest(store, path), probe.ref());
+        verify(deviceReadTx).exists(store, path);
+        probe.expectMsg(true);
+    }
+
+    @Test
+    public void testExistsFailure() throws Exception {
+        final ReadFailedException cause = new ReadFailedException("fail");
+        when(deviceReadTx.exists(store, path)).thenReturn(Futures.immediateFailedCheckedFuture(cause));
+        actorRef.tell(new ExistsRequest(store, path), probe.ref());
+        verify(deviceReadTx).exists(store, path);
+        probe.expectMsg(cause);
+    }
+}
\ No newline at end of file
diff --git a/netconf/netconf-topology-singleton/src/test/java/org/opendaylight/netconf/topology/singleton/impl/actors/WriteTransactionActorTest.java b/netconf/netconf-topology-singleton/src/test/java/org/opendaylight/netconf/topology/singleton/impl/actors/WriteTransactionActorTest.java
new file mode 100644 (file)
index 0000000..d95d158
--- /dev/null
@@ -0,0 +1,127 @@
+/*
+ * Copyright (c) 2017 Pantheon Technologies s.r.o. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.netconf.topology.singleton.impl.actors;
+
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import akka.actor.ActorSystem;
+import akka.pattern.Patterns;
+import akka.testkit.JavaTestKit;
+import akka.testkit.TestActorRef;
+import akka.testkit.TestProbe;
+import akka.util.Timeout;
+import com.google.common.base.Preconditions;
+import com.google.common.util.concurrent.Futures;
+import java.util.concurrent.TimeUnit;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.Mock;
+import org.mockito.MockitoAnnotations;
+import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
+import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException;
+import org.opendaylight.controller.md.sal.dom.api.DOMDataWriteTransaction;
+import org.opendaylight.netconf.topology.singleton.messages.NormalizedNodeMessage;
+import org.opendaylight.netconf.topology.singleton.messages.transactions.CancelRequest;
+import org.opendaylight.netconf.topology.singleton.messages.transactions.DeleteRequest;
+import org.opendaylight.netconf.topology.singleton.messages.transactions.MergeRequest;
+import org.opendaylight.netconf.topology.singleton.messages.transactions.PutRequest;
+import org.opendaylight.netconf.topology.singleton.messages.transactions.SubmitReply;
+import org.opendaylight.netconf.topology.singleton.messages.transactions.SubmitRequest;
+import org.opendaylight.yangtools.yang.common.QName;
+import org.opendaylight.yangtools.yang.common.RpcError;
+import org.opendaylight.yangtools.yang.common.RpcResultBuilder;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
+import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
+import org.opendaylight.yangtools.yang.data.impl.schema.Builders;
+import scala.concurrent.Await;
+import scala.concurrent.Future;
+
+public class WriteTransactionActorTest {
+    private static final YangInstanceIdentifier PATH = YangInstanceIdentifier.EMPTY;
+    private static final LogicalDatastoreType STORE = LogicalDatastoreType.CONFIGURATION;
+    private static final Timeout TIMEOUT = Timeout.apply(5, TimeUnit.SECONDS);
+
+    @Mock
+    private DOMDataWriteTransaction deviceWriteTx;
+    private TestProbe probe;
+    private ActorSystem system;
+    private TestActorRef<WriteTransactionActor> actorRef;
+    private NormalizedNode<?, ?> node;
+
+    @Before
+    public void setUp() throws Exception {
+        MockitoAnnotations.initMocks(this);
+        system = ActorSystem.apply();
+        probe = TestProbe.apply(system);
+        node = Builders.containerBuilder()
+                .withNodeIdentifier(new YangInstanceIdentifier.NodeIdentifier(QName.create("cont")))
+                .build();
+        actorRef = TestActorRef.create(system, WriteTransactionActor.props(deviceWriteTx), "testA");
+    }
+
+    @After
+    public void tearDown() throws Exception {
+        JavaTestKit.shutdownActorSystem(system, null, true);
+    }
+
+    @Test
+    public void testPut() throws Exception {
+        final NormalizedNodeMessage normalizedNodeMessage = new NormalizedNodeMessage(PATH, node);
+        actorRef.tell(new PutRequest(STORE, normalizedNodeMessage), probe.ref());
+        verify(deviceWriteTx).put(STORE, PATH, node);
+    }
+
+    @Test
+    public void testMerge() throws Exception {
+        final NormalizedNodeMessage normalizedNodeMessage = new NormalizedNodeMessage(PATH, node);
+        actorRef.tell(new MergeRequest(STORE, normalizedNodeMessage), probe.ref());
+        verify(deviceWriteTx).merge(STORE, PATH, node);
+    }
+
+    @Test
+    public void testDelete() throws Exception {
+        actorRef.tell(new DeleteRequest(STORE, PATH), probe.ref());
+        verify(deviceWriteTx).delete(STORE, PATH);
+    }
+
+    @Test
+    public void testCancel() throws Exception {
+        when(deviceWriteTx.cancel()).thenReturn(true);
+        final Future<Object> cancelFuture = Patterns.ask(actorRef, new CancelRequest(), TIMEOUT);
+        final Object result = Await.result(cancelFuture, TIMEOUT.duration());
+        Preconditions.checkState(result instanceof Boolean);
+        verify(deviceWriteTx).cancel();
+        Assert.assertTrue((Boolean) result);
+    }
+
+    @Test
+    public void testSubmit() throws Exception {
+        when(deviceWriteTx.submit()).thenReturn(Futures.immediateCheckedFuture(null));
+        final Future<Object> submitFuture = Patterns.ask(actorRef, new SubmitRequest(), TIMEOUT);
+        final Object result = Await.result(submitFuture, TIMEOUT.duration());
+        Assert.assertTrue(result instanceof SubmitReply);
+        verify(deviceWriteTx).submit();
+    }
+
+    @Test
+    public void testSubmitFail() throws Exception {
+        final RpcError rpcError =
+                RpcResultBuilder.newError(RpcError.ErrorType.APPLICATION, "fail", "fail");
+        final TransactionCommitFailedException cause = new TransactionCommitFailedException("fail", rpcError);
+        when(deviceWriteTx.submit()).thenReturn(Futures.immediateFailedCheckedFuture(cause));
+        final Future<Object> submitFuture = Patterns.ask(actorRef, new SubmitRequest(), TIMEOUT);
+        final Object result = Await.result(submitFuture, TIMEOUT.duration());
+        Assert.assertEquals(cause, result);
+        verify(deviceWriteTx).submit();
+    }
+
+}
\ No newline at end of file
diff --git a/netconf/netconf-topology-singleton/src/test/java/org/opendaylight/netconf/topology/singleton/impl/tx/ProxyReadTransactionTest.java b/netconf/netconf-topology-singleton/src/test/java/org/opendaylight/netconf/topology/singleton/impl/tx/ProxyReadTransactionTest.java
new file mode 100644 (file)
index 0000000..df4be27
--- /dev/null
@@ -0,0 +1,124 @@
+/*
+ * Copyright (c) 2017 Pantheon Technologies s.r.o. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.netconf.topology.singleton.impl.tx;
+
+import akka.actor.ActorSystem;
+import akka.testkit.JavaTestKit;
+import akka.testkit.TestProbe;
+import akka.util.Timeout;
+import com.google.common.base.Optional;
+import com.google.common.util.concurrent.CheckedFuture;
+import java.net.InetSocketAddress;
+import java.util.concurrent.TimeUnit;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.opendaylight.controller.config.util.xml.DocumentedException;
+import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
+import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
+import org.opendaylight.netconf.sal.connect.util.RemoteDeviceId;
+import org.opendaylight.netconf.topology.singleton.messages.NormalizedNodeMessage;
+import org.opendaylight.netconf.topology.singleton.messages.transactions.EmptyReadResponse;
+import org.opendaylight.netconf.topology.singleton.messages.transactions.ExistsRequest;
+import org.opendaylight.netconf.topology.singleton.messages.transactions.ReadRequest;
+import org.opendaylight.yangtools.yang.common.QName;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
+import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode;
+import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
+import org.opendaylight.yangtools.yang.data.impl.schema.Builders;
+
+public class ProxyReadTransactionTest {
+    private static final YangInstanceIdentifier PATH = YangInstanceIdentifier.EMPTY;
+    private static final LogicalDatastoreType STORE = LogicalDatastoreType.CONFIGURATION;
+
+    private ActorSystem system;
+    private TestProbe masterActor;
+    private ContainerNode node;
+    private ProxyReadTransaction tx;
+
+    @Before
+    public void setUp() throws Exception {
+        system = ActorSystem.apply();
+        masterActor = new TestProbe(system);
+        final RemoteDeviceId id = new RemoteDeviceId("dev1", InetSocketAddress.createUnresolved("localhost", 17830));
+        node = Builders.containerBuilder()
+                .withNodeIdentifier(new YangInstanceIdentifier.NodeIdentifier(QName.create("cont")))
+                .build();
+        tx = new ProxyReadTransaction(masterActor.ref(), id, system, Timeout.apply(5, TimeUnit.SECONDS));
+    }
+
+    @After
+    public void tearDown() throws Exception {
+        JavaTestKit.shutdownActorSystem(system, null, true);
+    }
+
+    @Test
+    public void testRead() throws Exception {
+        final CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException> read = tx.read(STORE, PATH);
+        masterActor.expectMsgClass(ReadRequest.class);
+        masterActor.reply(new NormalizedNodeMessage(PATH, node));
+        final Optional<NormalizedNode<?, ?>> result = read.checkedGet();
+        Assert.assertTrue(result.isPresent());
+        Assert.assertEquals(node, result.get());
+    }
+
+    @Test
+    public void testReadEmpty() throws Exception {
+        final CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException> read = tx.read(STORE, PATH);
+        masterActor.expectMsgClass(ReadRequest.class);
+        masterActor.reply(new EmptyReadResponse());
+        final Optional<NormalizedNode<?, ?>> result = read.checkedGet();
+        Assert.assertFalse(result.isPresent());
+    }
+
+    @Test(expected = ReadFailedException.class)
+    public void testReadFail() throws Exception {
+        final CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException> read = tx.read(STORE, PATH);
+        masterActor.expectMsgClass(ReadRequest.class);
+        masterActor.reply(new RuntimeException("fail"));
+        read.checkedGet();
+    }
+
+    @Test
+    public void testExists() throws Exception {
+        final CheckedFuture<Boolean, ReadFailedException> read = tx.exists(STORE, PATH);
+        masterActor.expectMsgClass(ExistsRequest.class);
+        masterActor.reply(true);
+        final Boolean result = read.checkedGet();
+        Assert.assertTrue(result);
+    }
+
+    @Test(expected = ReadFailedException.class)
+    public void testExistsFail() throws Exception {
+        final CheckedFuture<Boolean, ReadFailedException> read = tx.exists(STORE, PATH);
+        masterActor.expectMsgClass(ExistsRequest.class);
+        masterActor.reply(new RuntimeException("fail"));
+        read.checkedGet();
+    }
+
+    @Test
+    public void testMasterDownRead() throws Exception {
+        final CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException> read = tx.read(STORE, PATH);
+        masterActor.expectMsgClass(ReadRequest.class);
+        //master doesn't reply
+        try {
+            read.checkedGet();
+            Assert.fail("Exception should be thrown");
+        } catch (final ReadFailedException e) {
+            final Throwable cause = e.getCause();
+            Assert.assertTrue(cause instanceof DocumentedException);
+            final DocumentedException de = (DocumentedException) cause;
+            Assert.assertEquals(DocumentedException.ErrorSeverity.WARNING, de.getErrorSeverity());
+            Assert.assertEquals(DocumentedException.ErrorTag.OPERATION_FAILED, de.getErrorTag());
+            Assert.assertEquals(DocumentedException.ErrorType.APPLICATION, de.getErrorType());
+        }
+    }
+
+}
\ No newline at end of file
diff --git a/netconf/netconf-topology-singleton/src/test/java/org/opendaylight/netconf/topology/singleton/impl/tx/ProxyWriteTransactionTest.java b/netconf/netconf-topology-singleton/src/test/java/org/opendaylight/netconf/topology/singleton/impl/tx/ProxyWriteTransactionTest.java
new file mode 100644 (file)
index 0000000..05ad6be
--- /dev/null
@@ -0,0 +1,178 @@
+/*
+ * Copyright (c) 2017 Pantheon Technologies s.r.o. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.netconf.topology.singleton.impl.tx;
+
+import akka.actor.ActorSystem;
+import akka.testkit.JavaTestKit;
+import akka.testkit.TestProbe;
+import akka.util.Timeout;
+import com.google.common.util.concurrent.CheckedFuture;
+import com.google.common.util.concurrent.ListenableFuture;
+import java.net.InetSocketAddress;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.opendaylight.controller.md.sal.common.api.TransactionStatus;
+import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
+import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException;
+import org.opendaylight.netconf.sal.connect.util.RemoteDeviceId;
+import org.opendaylight.netconf.topology.singleton.messages.transactions.CancelRequest;
+import org.opendaylight.netconf.topology.singleton.messages.transactions.DeleteRequest;
+import org.opendaylight.netconf.topology.singleton.messages.transactions.MergeRequest;
+import org.opendaylight.netconf.topology.singleton.messages.transactions.PutRequest;
+import org.opendaylight.netconf.topology.singleton.messages.transactions.SubmitReply;
+import org.opendaylight.netconf.topology.singleton.messages.transactions.SubmitRequest;
+import org.opendaylight.yangtools.yang.common.QName;
+import org.opendaylight.yangtools.yang.common.RpcResult;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
+import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode;
+import org.opendaylight.yangtools.yang.data.impl.schema.Builders;
+
+public class ProxyWriteTransactionTest {
+    private static final YangInstanceIdentifier PATH = YangInstanceIdentifier.EMPTY;
+    private static final LogicalDatastoreType STORE = LogicalDatastoreType.CONFIGURATION;
+
+    private ActorSystem system;
+    private TestProbe masterActor;
+    private ContainerNode node;
+    private ProxyWriteTransaction tx;
+
+    @Before
+    public void setUp() throws Exception {
+        system = ActorSystem.apply();
+        masterActor = new TestProbe(system);
+        final RemoteDeviceId id = new RemoteDeviceId("dev1", InetSocketAddress.createUnresolved("localhost", 17830));
+        node = Builders.containerBuilder()
+                .withNodeIdentifier(new YangInstanceIdentifier.NodeIdentifier(QName.create("cont")))
+                .build();
+        tx = new ProxyWriteTransaction(masterActor.ref(), id, system, Timeout.apply(5, TimeUnit.SECONDS));
+    }
+
+    @After
+    public void tearDown() throws Exception {
+        JavaTestKit.shutdownActorSystem(system, null, true);
+    }
+
+    @Test
+    public void testCancel() throws Exception {
+        final Future<Boolean> submit = Executors.newSingleThreadExecutor().submit(() -> tx.cancel());
+        masterActor.expectMsgClass(CancelRequest.class);
+        masterActor.reply(true);
+        Assert.assertTrue(submit.get());
+    }
+
+    @Test
+    public void testCancelSubmitted() throws Exception {
+        final CheckedFuture<Void, TransactionCommitFailedException> submitFuture = tx.submit();
+        masterActor.expectMsgClass(SubmitRequest.class);
+        masterActor.reply(new SubmitReply());
+        submitFuture.checkedGet();
+        final Future<Boolean> submit = Executors.newSingleThreadExecutor().submit(() -> tx.cancel());
+        masterActor.expectNoMsg();
+        Assert.assertFalse(submit.get());
+    }
+
+    @Test
+    public void testSubmit() throws Exception {
+        final CheckedFuture<Void, TransactionCommitFailedException> submitFuture = tx.submit();
+        masterActor.expectMsgClass(SubmitRequest.class);
+        masterActor.reply(new SubmitReply());
+        submitFuture.checkedGet();
+    }
+
+    @Test
+    public void testDoubleSubmit() throws Exception {
+        final CheckedFuture<Void, TransactionCommitFailedException> submitFuture = tx.submit();
+        masterActor.expectMsgClass(SubmitRequest.class);
+        masterActor.reply(new SubmitReply());
+        submitFuture.checkedGet();
+        try {
+            tx.submit().checkedGet();
+            Assert.fail("Should throw IllegalStateException");
+        } catch (final IllegalStateException e) {
+            masterActor.expectNoMsg();
+        }
+    }
+
+    @Test
+    public void testCommit() throws Exception {
+        final ListenableFuture<RpcResult<TransactionStatus>> submitFuture = tx.commit();
+        masterActor.expectMsgClass(SubmitRequest.class);
+        masterActor.reply(new SubmitReply());
+        Assert.assertEquals(TransactionStatus.SUBMITED, submitFuture.get().getResult());
+    }
+
+    @Test
+    public void testDelete() throws Exception {
+        tx.delete(STORE, PATH);
+        masterActor.expectMsgClass(DeleteRequest.class);
+    }
+
+    @Test
+    public void testDeleteClosed() throws Exception {
+        submit();
+        try {
+            tx.delete(STORE, PATH);
+            Assert.fail("Should throw IllegalStateException");
+        } catch (final IllegalStateException e) {
+            masterActor.expectNoMsg();
+        }
+    }
+
+    @Test
+    public void testPut() throws Exception {
+        tx.put(STORE, PATH, node);
+        masterActor.expectMsgClass(PutRequest.class);
+    }
+
+    @Test
+    public void testPutClosed() throws Exception {
+        submit();
+        try {
+            tx.put(STORE, PATH, node);
+            Assert.fail("Should throw IllegalStateException");
+        } catch (final IllegalStateException e) {
+            masterActor.expectNoMsg();
+        }
+    }
+
+    @Test
+    public void testMerge() throws Exception {
+        tx.merge(STORE, PATH, node);
+        masterActor.expectMsgClass(MergeRequest.class);
+    }
+
+    @Test
+    public void testMergeClosed() throws Exception {
+        submit();
+        try {
+            tx.merge(STORE, PATH, node);
+            Assert.fail("Should throw IllegalStateException");
+        } catch (final IllegalStateException e) {
+            masterActor.expectNoMsg();
+        }
+    }
+
+    @Test
+    public void testGetIdentifier() throws Exception {
+        Assert.assertEquals(tx, tx.getIdentifier());
+    }
+
+    private void submit() throws TransactionCommitFailedException {
+        final CheckedFuture<Void, TransactionCommitFailedException> submit = tx.submit();
+        masterActor.expectMsgClass(SubmitRequest.class);
+        masterActor.reply(new SubmitReply());
+        submit.checkedGet();
+    }
+
+}
\ No newline at end of file
index 58422776f55b2a3329fef144d980581dc4b05858..7ff9869dac6402b49ada1f9862352ea025cdb054 100644 (file)
@@ -43,8 +43,7 @@ import org.opendaylight.controller.md.sal.dom.api.DOMDataBroker;
 import org.opendaylight.controller.md.sal.dom.api.DOMDataReadOnlyTransaction;
 import org.opendaylight.controller.md.sal.dom.api.DOMRpcService;
 import org.opendaylight.netconf.sal.connect.util.RemoteDeviceId;
-import org.opendaylight.netconf.topology.singleton.api.NetconfDOMTransaction;
-import org.opendaylight.netconf.topology.singleton.impl.NetconfDOMDataBroker;
+import org.opendaylight.netconf.topology.singleton.impl.ProxyDOMDataBroker;
 import org.opendaylight.netconf.topology.singleton.impl.actors.NetconfNodeActor;
 import org.opendaylight.netconf.topology.singleton.impl.utils.NetconfTopologySetup;
 import org.opendaylight.netconf.topology.singleton.messages.CreateInitialMasterActorData;
@@ -68,13 +67,12 @@ public class ReadOnlyTransactionTest {
     public final ExpectedException exception = ExpectedException.none();
 
     private ActorRef masterRef;
-    private NetconfDOMDataBroker slaveDataBroker;
-    private DOMDataBroker masterDataBroker;
+    private ProxyDOMDataBroker slaveDataBroker;
     private List<SourceIdentifier> sourceIdentifiers;
-
+    @Mock
+    private DOMDataBroker deviceDataBroker;
     @Mock
     private DOMDataReadOnlyTransaction readTx;
-
     @Mock
     private DOMRpcService domRpcService;
 
@@ -95,32 +93,18 @@ public class ReadOnlyTransactionTest {
 
         sourceIdentifiers = Lists.newArrayList();
 
-        // Create master data broker
-
-        final DOMDataBroker delegateDataBroker = mock(DOMDataBroker.class);
+        //device read tx
         readTx = mock(DOMDataReadOnlyTransaction.class);
-
-        doReturn(readTx).when(delegateDataBroker).newReadOnlyTransaction();
-
-        final NetconfDOMTransaction masterDOMTransactions =
-                new NetconfMasterDOMTransaction(remoteDeviceId, delegateDataBroker);
-
-        masterDataBroker =
-                new NetconfDOMDataBroker(system, remoteDeviceId, masterDOMTransactions);
+        doReturn(readTx).when(deviceDataBroker).newReadOnlyTransaction();
 
         // Create slave data broker for testing proxy
-
-        final NetconfDOMTransaction proxyDOMTransactions =
-                new NetconfProxyDOMTransaction(remoteDeviceId, system, masterRef, TIMEOUT);
-
-        slaveDataBroker = new NetconfDOMDataBroker(system, remoteDeviceId, proxyDOMTransactions);
-
-
+        slaveDataBroker =
+                new ProxyDOMDataBroker(system, remoteDeviceId, masterRef, Timeout.apply(5, TimeUnit.SECONDS));
     }
 
     @After
     public void teardown() {
-        JavaTestKit.shutdownActorSystem(system);
+        JavaTestKit.shutdownActorSystem(system, null, true);
         system = null;
     }
 
@@ -256,7 +240,7 @@ public class ReadOnlyTransactionTest {
 
     private void initializeDataTest() throws Exception {
         final Future<Object> initialDataToActor =
-                Patterns.ask(masterRef, new CreateInitialMasterActorData(masterDataBroker, sourceIdentifiers,
+                Patterns.ask(masterRef, new CreateInitialMasterActorData(deviceDataBroker, sourceIdentifiers,
                                 domRpcService), TIMEOUT);
 
         final Object success = Await.result(initialDataToActor, TIMEOUT.duration());
index 2dc446118c97714c3ab6f5a3fffd706f38fe4118..3e86bd2572da3208bb965464d1dea8813a36637d 100644 (file)
@@ -15,7 +15,7 @@ import static org.mockito.Matchers.any;
 import static org.mockito.Mockito.doNothing;
 import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.timeout;
 import static org.mockito.Mockito.verify;
 import static org.mockito.MockitoAnnotations.initMocks;
 import static org.opendaylight.netconf.topology.singleton.impl.utils.NetconfTopologyUtils.DEFAULT_SCHEMA_REPOSITORY;
@@ -48,8 +48,7 @@ import org.opendaylight.controller.md.sal.dom.api.DOMDataReadOnlyTransaction;
 import org.opendaylight.controller.md.sal.dom.api.DOMDataWriteTransaction;
 import org.opendaylight.controller.md.sal.dom.api.DOMRpcService;
 import org.opendaylight.netconf.sal.connect.util.RemoteDeviceId;
-import org.opendaylight.netconf.topology.singleton.api.NetconfDOMTransaction;
-import org.opendaylight.netconf.topology.singleton.impl.NetconfDOMDataBroker;
+import org.opendaylight.netconf.topology.singleton.impl.ProxyDOMDataBroker;
 import org.opendaylight.netconf.topology.singleton.impl.actors.NetconfNodeActor;
 import org.opendaylight.netconf.topology.singleton.impl.utils.NetconfTopologySetup;
 import org.opendaylight.netconf.topology.singleton.messages.CreateInitialMasterActorData;
@@ -73,13 +72,12 @@ public class WriteOnlyTransactionTest {
     public final ExpectedException exception = ExpectedException.none();
 
     private ActorRef masterRef;
-    private NetconfDOMDataBroker slaveDataBroker;
-    private DOMDataBroker masterDataBroker;
+    private ProxyDOMDataBroker slaveDataBroker;
     private List<SourceIdentifier> sourceIdentifiers;
-
+    @Mock
+    private DOMDataBroker deviceDataBroker;
     @Mock
     private DOMDataWriteTransaction writeTx;
-
     @Mock
     private DOMRpcService domRpcService;
 
@@ -100,40 +98,25 @@ public class WriteOnlyTransactionTest {
 
         sourceIdentifiers = Lists.newArrayList();
 
-        // Create master data broker
-
-        final DOMDataBroker delegateDataBroker = mock(DOMDataBroker.class);
         writeTx = mock(DOMDataWriteTransaction.class);
         final DOMDataReadOnlyTransaction readTx = mock(DOMDataReadOnlyTransaction.class);
 
-        doReturn(writeTx).when(delegateDataBroker).newWriteOnlyTransaction();
-        doReturn(readTx).when(delegateDataBroker).newReadOnlyTransaction();
-
-        final NetconfDOMTransaction masterDOMTransactions =
-                new NetconfMasterDOMTransaction(remoteDeviceId, delegateDataBroker);
-
-        masterDataBroker =
-                new NetconfDOMDataBroker(system, remoteDeviceId, masterDOMTransactions);
+        doReturn(writeTx).when(deviceDataBroker).newWriteOnlyTransaction();
+        doReturn(readTx).when(deviceDataBroker).newReadOnlyTransaction();
 
         // Create slave data broker for testing proxy
-
-        final NetconfDOMTransaction proxyDOMTransactions =
-                new NetconfProxyDOMTransaction(remoteDeviceId, system, masterRef, TIMEOUT);
-
-        slaveDataBroker = new NetconfDOMDataBroker(system, remoteDeviceId, proxyDOMTransactions);
-
-
+        slaveDataBroker =
+                new ProxyDOMDataBroker(system, remoteDeviceId, masterRef, Timeout.apply(5, TimeUnit.SECONDS));
     }
 
     @After
     public void teardown() {
-        JavaTestKit.shutdownActorSystem(system);
+        JavaTestKit.shutdownActorSystem(system, null, true);
         system = null;
     }
 
     @Test
-    public void testPutMergeDeleteCalls() throws Exception {
-
+    public void testPut() throws Exception {
         /* Initialize data on master */
 
         initializeDataTest();
@@ -148,29 +131,56 @@ public class WriteOnlyTransactionTest {
 
         doNothing().when(writeTx).put(storeType, instanceIdentifier, testNode);
 
-        DOMDataWriteTransaction wTx = slaveDataBroker.newWriteOnlyTransaction();
+        final DOMDataWriteTransaction wTx = slaveDataBroker.newWriteOnlyTransaction();
         wTx.put(storeType, instanceIdentifier, testNode);
 
-        verify(writeTx, times(1)).put(storeType, instanceIdentifier, testNode);
+        verify(writeTx, timeout(2000)).put(storeType, instanceIdentifier, testNode);
 
         wTx.cancel();
+
+    }
+
+    @Test
+    public void testMerge() throws Exception {
+
+        /* Initialize data on master */
+
+        initializeDataTest();
+
+        final YangInstanceIdentifier instanceIdentifier = YangInstanceIdentifier.EMPTY;
+        final LogicalDatastoreType storeType = LogicalDatastoreType.CONFIGURATION;
+        final NormalizedNode<?, ?> testNode = ImmutableContainerNodeBuilder.create()
+                .withNodeIdentifier(new YangInstanceIdentifier.NodeIdentifier(QName.create("TestQname")))
+                .withChild(ImmutableNodes.leafNode(QName.create("NodeQname"), "foo")).build();
         // Test of invoking merge on master through slave proxy
 
         doNothing().when(writeTx).merge(storeType, instanceIdentifier, testNode);
-        wTx = slaveDataBroker.newWriteOnlyTransaction();
+        final DOMDataWriteTransaction wTx = slaveDataBroker.newWriteOnlyTransaction();
         wTx.merge(storeType, instanceIdentifier, testNode);
 
-        verify(writeTx, times(1)).merge(storeType, instanceIdentifier, testNode);
+        verify(writeTx, timeout(2000)).merge(storeType, instanceIdentifier, testNode);
 
         wTx.cancel();
+
+    }
+
+    @Test
+    public void testDelete() throws Exception {
+
+        /* Initialize data on master */
+
+        initializeDataTest();
+
+        final YangInstanceIdentifier instanceIdentifier = YangInstanceIdentifier.EMPTY;
+        final LogicalDatastoreType storeType = LogicalDatastoreType.CONFIGURATION;
         // Test of invoking delete on master through slave proxy
 
         doNothing().when(writeTx).delete(storeType, instanceIdentifier);
-        wTx = slaveDataBroker.newWriteOnlyTransaction();
+        final DOMDataWriteTransaction wTx = slaveDataBroker.newWriteOnlyTransaction();
         wTx.delete(storeType, instanceIdentifier);
         wTx.cancel();
 
-        verify(writeTx, times(1)).delete(storeType, instanceIdentifier);
+        verify(writeTx, timeout(2000)).delete(storeType, instanceIdentifier);
 
     }
 
@@ -183,8 +193,8 @@ public class WriteOnlyTransactionTest {
 
         // Without Tx
 
-        DOMDataWriteTransaction wTx = slaveDataBroker.newWriteOnlyTransaction();
-        final CheckedFuture<Void,TransactionCommitFailedException> resultSubmit = Futures.immediateCheckedFuture(null);
+        final DOMDataWriteTransaction wTx = slaveDataBroker.newWriteOnlyTransaction();
+        final CheckedFuture<Void, TransactionCommitFailedException> resultSubmit = Futures.immediateCheckedFuture(null);
         doReturn(resultSubmit).when(writeTx).submit();
 
         final CheckedFuture<Void, TransactionCommitFailedException> resultSubmitResponse = wTx.submit();
@@ -192,14 +202,21 @@ public class WriteOnlyTransactionTest {
         final Object result = resultSubmitResponse.checkedGet(TIMEOUT_SEC, TimeUnit.SECONDS);
 
         assertNull(result);
+    }
 
+    @Test
+    public void testSubmitWithOperation() throws Exception {
+
+        /* Initialize data on master */
+
+        initializeDataTest();
         // With Tx
-        wTx = slaveDataBroker.newWriteOnlyTransaction();
+        final DOMDataWriteTransaction wTx = slaveDataBroker.newWriteOnlyTransaction();
         doNothing().when(writeTx).delete(any(), any());
         wTx.delete(LogicalDatastoreType.CONFIGURATION,
                 YangInstanceIdentifier.EMPTY);
 
-        final CheckedFuture<Void,TransactionCommitFailedException> resultSubmitTx = Futures.immediateCheckedFuture(null);
+        final CheckedFuture<Void, TransactionCommitFailedException> resultSubmitTx = Futures.immediateCheckedFuture(null);
         doReturn(resultSubmitTx).when(writeTx).submit();
 
         final CheckedFuture<Void, TransactionCommitFailedException> resultSubmitTxResponse = wTx.submit();
@@ -207,8 +224,15 @@ public class WriteOnlyTransactionTest {
         final Object resultTx = resultSubmitTxResponse.checkedGet(TIMEOUT_SEC, TimeUnit.SECONDS);
 
         assertNull(resultTx);
+    }
 
-        wTx = slaveDataBroker.newWriteOnlyTransaction();
+    @Test
+    public void testSubmitFail() throws Exception {
+
+        /* Initialize data on master */
+
+        initializeDataTest();
+        final DOMDataWriteTransaction wTx = slaveDataBroker.newWriteOnlyTransaction();
         wTx.delete(LogicalDatastoreType.CONFIGURATION,
                 YangInstanceIdentifier.EMPTY);
 
@@ -233,14 +257,22 @@ public class WriteOnlyTransactionTest {
         initializeDataTest();
 
         // Without Tx
-
-        DOMDataWriteTransaction wTx = slaveDataBroker.newWriteOnlyTransaction();
+        doReturn(true).when(writeTx).cancel();
+        final DOMDataWriteTransaction wTx = slaveDataBroker.newWriteOnlyTransaction();
         final Boolean resultFalseNoTx = wTx.cancel();
-        assertEquals(false, resultFalseNoTx);
+        assertEquals(true, resultFalseNoTx);
+    }
+
+    @Test
+    public void testCancelWithOperation() throws Exception {
+
+        /* Initialize data on master */
+
+        initializeDataTest();
 
         // With Tx, readWriteTx test
 
-        wTx = slaveDataBroker.newWriteOnlyTransaction();
+        final DOMDataWriteTransaction wTx = slaveDataBroker.newWriteOnlyTransaction();
         doNothing().when(writeTx).delete(any(), any());
         wTx.delete(LogicalDatastoreType.CONFIGURATION,
                 YangInstanceIdentifier.EMPTY);
@@ -249,8 +281,6 @@ public class WriteOnlyTransactionTest {
         final Boolean resultTrue = wTx.cancel();
         assertEquals(true, resultTrue);
 
-        doReturn(false).when(writeTx).cancel();
-
         final Boolean resultFalse = wTx.cancel();
         assertEquals(false, resultFalse);
 
@@ -258,7 +288,7 @@ public class WriteOnlyTransactionTest {
 
     private void initializeDataTest() throws Exception {
         final Future<Object> initialDataToActor =
-                Patterns.ask(masterRef, new CreateInitialMasterActorData(masterDataBroker, sourceIdentifiers,
+                Patterns.ask(masterRef, new CreateInitialMasterActorData(deviceDataBroker, sourceIdentifiers,
                                 domRpcService), TIMEOUT);
 
         final Object success = Await.result(initialDataToActor, TIMEOUT.duration());