Fix transaction manager closing. 02/63402/1
authorJozef Bacigal <jozef.bacigal@pantheon.tech>
Tue, 5 Sep 2017 09:27:57 +0000 (11:27 +0200)
committerTomas Slusny <tomas.slusny@pantheon.tech>
Thu, 21 Sep 2017 12:33:54 +0000 (14:33 +0200)
- the event onTransactionFailed we properly closing chain and creating
  a new chain
- changed writeOnlyTransaction to readWriteTransaction
- moved transaction manager to the common module
- topology manager using transaction manager instead of creating a
  transaction chain for itself
- added control of unfinished transaction on close

See also: Bug-9038

Change-Id: Idadbb4ed0f4c61e7f80da5e2dbedbd80dece118e
Signed-off-by: Jozef Bacigal <jozef.bacigal@pantheon.tech>
16 files changed:
applications/topology-manager/src/main/java/org/opendaylight/openflowplugin/applications/topology/manager/DataTreeChangeListenerImpl.java
applications/topology-manager/src/main/java/org/opendaylight/openflowplugin/applications/topology/manager/FlowCapableTopologyExporter.java
applications/topology-manager/src/main/java/org/opendaylight/openflowplugin/applications/topology/manager/FlowCapableTopologyProvider.java
applications/topology-manager/src/main/java/org/opendaylight/openflowplugin/applications/topology/manager/NodeChangeListenerImpl.java
applications/topology-manager/src/main/java/org/opendaylight/openflowplugin/applications/topology/manager/OperationProcessor.java
applications/topology-manager/src/main/java/org/opendaylight/openflowplugin/applications/topology/manager/TerminationPointChangeListenerImpl.java
applications/topology-manager/src/main/java/org/opendaylight/openflowplugin/applications/topology/manager/TopologyManagerUtil.java
applications/topology-manager/src/main/java/org/opendaylight/openflowplugin/applications/topology/manager/TopologyOperation.java
applications/topology-manager/src/test/java/org/opendaylight/openflowplugin/applications/topology/manager/FlowCapableTopologyProviderTest.java [deleted file]
applications/topology-manager/src/test/java/org/opendaylight/openflowplugin/applications/topology/manager/OperationProcessorTest.java [deleted file]
openflowplugin-common/pom.xml
openflowplugin-common/src/main/java/org/opendaylight/openflowplugin/common/txchain/TransactionChainManager.java [moved from openflowplugin-impl/src/main/java/org/opendaylight/openflowplugin/impl/device/TransactionChainManager.java with 73% similarity]
openflowplugin-impl/pom.xml
openflowplugin-impl/src/main/java/org/opendaylight/openflowplugin/impl/device/DeviceContextImpl.java
openflowplugin-impl/src/test/java/org/opendaylight/openflowplugin/impl/device/DeviceContextImplTest.java
openflowplugin-impl/src/test/java/org/opendaylight/openflowplugin/impl/device/TransactionChainManagerTest.java

index b295bcb8493aae43e258a7ce81c4e042d417fdae..362857368d4f8e379806100397ba87fcd743faac 100644 (file)
@@ -1,4 +1,4 @@
-/**
+/*
  * Copyright (c) 2015 Cisco Systems, Inc. and others.  All rights reserved.
  *
  * This program and the accompanying materials are made available under the
@@ -7,11 +7,9 @@
  */
 package org.opendaylight.openflowplugin.applications.topology.manager;
 
-import java.util.concurrent.Callable;
 import org.opendaylight.controller.md.sal.binding.api.DataBroker;
 import org.opendaylight.controller.md.sal.binding.api.DataTreeChangeListener;
 import org.opendaylight.controller.md.sal.binding.api.DataTreeIdentifier;
-import org.opendaylight.controller.md.sal.binding.api.ReadWriteTransaction;
 import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
 import org.opendaylight.openflowplugin.common.wait.SimpleTaskRetryLooper;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node;
@@ -38,23 +36,19 @@ public abstract class DataTreeChangeListenerImpl<T extends DataObject> implement
     /**
      * instance identifier to Node in network topology model (yangtools)
      */
-    protected static final InstanceIdentifier<Topology> II_TO_TOPOLOGY =
+    static final InstanceIdentifier<Topology> II_TO_TOPOLOGY =
             InstanceIdentifier
             .create(NetworkTopology.class)
             .child(Topology.class, new TopologyKey(new TopologyId(FlowCapableTopologyProvider.TOPOLOGY_ID)));
 
-    public DataTreeChangeListenerImpl(final OperationProcessor operationProcessor,
-                                      final DataBroker dataBroker,
-                                      final InstanceIdentifier<T> ii) {
+    DataTreeChangeListenerImpl(final OperationProcessor operationProcessor,
+                               final DataBroker dataBroker,
+                               final InstanceIdentifier<T> ii) {
         final DataTreeIdentifier<T> identifier = new DataTreeIdentifier(LogicalDatastoreType.OPERATIONAL, ii);
         final SimpleTaskRetryLooper looper = new SimpleTaskRetryLooper(STARTUP_LOOP_TICK, STARTUP_LOOP_MAX_RETRIES);
         try {
-            listenerRegistration = looper.loopUntilNoException(new Callable<ListenerRegistration<DataTreeChangeListener>>() {
-                @Override
-                public ListenerRegistration<DataTreeChangeListener> call() throws Exception {
-                    return dataBroker.registerDataTreeChangeListener(identifier, DataTreeChangeListenerImpl.this);
-                }
-            });
+            listenerRegistration = looper.loopUntilNoException(() ->
+                    dataBroker.registerDataTreeChangeListener(identifier, DataTreeChangeListenerImpl.this));
         } catch (Exception e) {
             LOG.error("Data listener registration failed!");
             throw new IllegalStateException("TopologyManager startup fail! TM bundle needs restart.", e);
@@ -67,16 +61,11 @@ public abstract class DataTreeChangeListenerImpl<T extends DataObject> implement
         listenerRegistration.close();
     }
 
-    protected <T extends DataObject> void sendToTransactionChain(final T node, final InstanceIdentifier<T> iiToTopologyNode) {
-        operationProcessor.enqueueOperation(new TopologyOperation() {
-            @Override
-            public void applyOperation(ReadWriteTransaction transaction) {
-                transaction.merge(LogicalDatastoreType.OPERATIONAL, iiToTopologyNode, node, true);
-            }
-        });
+    <T extends DataObject> void sendToTransactionChain(final T node, final InstanceIdentifier<T> iiToTopologyNode) {
+        operationProcessor.enqueueOperation(manager -> manager.mergeToTransaction(LogicalDatastoreType.OPERATIONAL, iiToTopologyNode, node, true));
     }
 
-    protected InstanceIdentifier<org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.Node> provideIIToTopologyNode(
+    InstanceIdentifier<org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.Node> provideIIToTopologyNode(
             final NodeId nodeIdInTopology) {
         org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.NodeKey nodeKeyInTopology = new org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.NodeKey(
                 nodeIdInTopology);
@@ -86,8 +75,8 @@ public abstract class DataTreeChangeListenerImpl<T extends DataObject> implement
                         nodeKeyInTopology).build();
     }
 
-    protected NodeId provideTopologyNodeId(InstanceIdentifier<T> iiToNodeInInventory) {
-        final NodeKey inventoryNodeKey = iiToNodeInInventory.firstKeyOf(Node.class, NodeKey.class);
+    NodeId provideTopologyNodeId(InstanceIdentifier<T> iiToNodeInInventory) {
+        final NodeKey inventoryNodeKey = iiToNodeInInventory.firstKeyOf(Node.class);
         if (inventoryNodeKey != null) {
             return new NodeId(inventoryNodeKey.getId().getValue());
         }
index cb7d973c18793978f57edd1c9f23b3bc7a45396a..ce0f872fd1150d988c542a3a07007ecff628b806 100644 (file)
@@ -7,33 +7,20 @@
  */
 package org.opendaylight.openflowplugin.applications.topology.manager;
 
-import static org.opendaylight.openflowplugin.applications.topology.manager.FlowCapableNodeMapping.getNodeConnectorKey;
-import static org.opendaylight.openflowplugin.applications.topology.manager.FlowCapableNodeMapping.getNodeKey;
-import static org.opendaylight.openflowplugin.applications.topology.manager.FlowCapableNodeMapping.toTerminationPointId;
 import static org.opendaylight.openflowplugin.applications.topology.manager.FlowCapableNodeMapping.toTopologyLink;
-import static org.opendaylight.openflowplugin.applications.topology.manager.FlowCapableNodeMapping.toTopologyNodeId;
 
 import com.google.common.base.Optional;
 import com.google.common.base.Preconditions;
-import org.opendaylight.controller.md.sal.binding.api.ReadWriteTransaction;
 import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
 import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
+import org.opendaylight.openflowplugin.common.txchain.TransactionChainManager;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.topology.discovery.rev130819.FlowTopologyDiscoveryListener;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.topology.discovery.rev130819.LinkDiscovered;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.topology.discovery.rev130819.LinkOverutilized;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.topology.discovery.rev130819.LinkRemoved;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.topology.discovery.rev130819.LinkUtilizationNormal;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeConnectorRef;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeRef;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.node.NodeConnectorKey;
-import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.NodeId;
-import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.TpId;
 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.Topology;
 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.Link;
-import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.Node;
-import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.NodeKey;
-import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.node.TerminationPoint;
-import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.node.TerminationPointKey;
 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -41,7 +28,7 @@ import org.slf4j.LoggerFactory;
 class FlowCapableTopologyExporter implements FlowTopologyDiscoveryListener {
 
     private static final Logger LOG = LoggerFactory.getLogger(FlowCapableTopologyExporter.class);
-    protected final InstanceIdentifier<Topology> iiToTopology;
+    private final InstanceIdentifier<Topology> iiToTopology;
     private final OperationProcessor processor;
 
     FlowCapableTopologyExporter(final OperationProcessor processor,
@@ -54,10 +41,10 @@ class FlowCapableTopologyExporter implements FlowTopologyDiscoveryListener {
     public void onLinkDiscovered(final LinkDiscovered notification) {
         processor.enqueueOperation(new TopologyOperation() {
             @Override
-            public void applyOperation(final ReadWriteTransaction transaction) {
+            public void applyOperation(final TransactionChainManager manager) {
                 final Link link = toTopologyLink(notification);
                 final InstanceIdentifier<Link> path = TopologyManagerUtil.linkPath(link, iiToTopology);
-                transaction.merge(LogicalDatastoreType.OPERATIONAL, path, link, true);
+                manager.mergeToTransaction(LogicalDatastoreType.OPERATIONAL, path, link, true);
             }
 
             @Override
@@ -76,18 +63,19 @@ class FlowCapableTopologyExporter implements FlowTopologyDiscoveryListener {
     public void onLinkRemoved(final LinkRemoved notification) {
         processor.enqueueOperation(new TopologyOperation() {
             @Override
-            public void applyOperation(final ReadWriteTransaction transaction) {
+            public void applyOperation(final TransactionChainManager manager) {
                 Optional<Link> linkOptional = Optional.absent();
                 try {
                     // read that checks if link exists (if we do not do this we might get an exception on delete)
-                    linkOptional = transaction.read(LogicalDatastoreType.OPERATIONAL,
+                    linkOptional = manager.readFromTransaction(LogicalDatastoreType.OPERATIONAL,
                             TopologyManagerUtil.linkPath(toTopologyLink(notification), iiToTopology)).checkedGet();
                 } catch (ReadFailedException e) {
-                    LOG.warn("Error occured when trying to read Link: {}", e.getMessage());
-                    LOG.debug("Error occured when trying to read Link.. ", e);
+                    LOG.warn("Error occurred when trying to read Link: {}", e.getMessage());
+                    LOG.debug("Error occurred when trying to read Link.. ", e);
                 }
                 if (linkOptional.isPresent()) {
-                    transaction.delete(LogicalDatastoreType.OPERATIONAL, TopologyManagerUtil.linkPath(toTopologyLink(notification), iiToTopology));
+                    manager.addDeleteOperationTotTxChain(LogicalDatastoreType.OPERATIONAL,
+                            TopologyManagerUtil.linkPath(toTopologyLink(notification), iiToTopology));
                 }
             }
 
@@ -103,26 +91,4 @@ class FlowCapableTopologyExporter implements FlowTopologyDiscoveryListener {
         // NOOP
     }
 
-    private InstanceIdentifier<Node> toNodeIdentifier(final NodeRef ref) {
-        org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.NodeKey invNodeKey = getNodeKey(ref);
-        NodeKey nodeKey = new NodeKey(toTopologyNodeId(invNodeKey.getId()));
-        return iiToTopology.child(Node.class, nodeKey);
-    }
-
-    private InstanceIdentifier<TerminationPoint> toTerminationPointIdentifier(final NodeConnectorRef ref) {
-        org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.NodeKey invNodeKey = getNodeKey(ref);
-        NodeConnectorKey invNodeConnectorKey = getNodeConnectorKey(ref);
-        return tpPath(toTopologyNodeId(invNodeKey.getId()), toTerminationPointId(invNodeConnectorKey.getId()));
-    }
-
-    private InstanceIdentifier<Node> getNodePath(final NodeId nodeId) {
-        return iiToTopology.child(Node.class, new NodeKey(nodeId));
-    }
-
-    private InstanceIdentifier<TerminationPoint> tpPath(final NodeId nodeId, final TpId tpId) {
-        NodeKey nodeKey = new NodeKey(nodeId);
-        TerminationPointKey tpKey = new TerminationPointKey(tpId);
-        return iiToTopology.child(Node.class, nodeKey).child(TerminationPoint.class, tpKey);
-    }
-
 }
index f49d0f52c63a72d6ef306e5d704d00c023c887ab..83942007b8f9a2503db44d32adf31014509e5ddc 100644 (file)
@@ -8,13 +8,11 @@
 package org.opendaylight.openflowplugin.applications.topology.manager;
 
 import com.google.common.base.Optional;
-import java.util.concurrent.ExecutionException;
 import org.opendaylight.controller.md.sal.binding.api.DataBroker;
-import org.opendaylight.controller.md.sal.binding.api.ReadTransaction;
-import org.opendaylight.controller.md.sal.binding.api.ReadWriteTransaction;
 import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
 import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
 import org.opendaylight.controller.sal.binding.api.NotificationProviderService;
+import org.opendaylight.openflowplugin.common.txchain.TransactionChainManager;
 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.NetworkTopology;
 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.TopologyId;
 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.Topology;
@@ -28,11 +26,14 @@ import org.slf4j.LoggerFactory;
 
 public class FlowCapableTopologyProvider implements AutoCloseable {
     private static final Logger LOG = LoggerFactory.getLogger(FlowCapableTopologyProvider.class);
+    private static final String TOPOLOGY_PROVIDER = "topology-provider";
     static final String TOPOLOGY_ID = "flow:1";
 
+
     private final DataBroker dataBroker;
     private final NotificationProviderService notificationService;
     private final OperationProcessor processor;
+    private TransactionChainManager transactionChainManager;
     private ListenerRegistration<NotificationListener> listenerRegistration;
 
     public FlowCapableTopologyProvider(DataBroker dataBroker, NotificationProviderService notificationService,
@@ -53,15 +54,17 @@ public class FlowCapableTopologyProvider implements AutoCloseable {
 
         final FlowCapableTopologyExporter listener = new FlowCapableTopologyExporter(processor, path);
         this.listenerRegistration = notificationService.registerNotificationListener(listener);
+        this.transactionChainManager = new TransactionChainManager(dataBroker, TOPOLOGY_PROVIDER);
+        this.transactionChainManager.activateTransactionManager();
+        this.transactionChainManager.initialSubmitWriteTransaction();
 
-        if(!isFlowTopologyExist(dataBroker, path)){
-            final ReadWriteTransaction tx = dataBroker.newReadWriteTransaction();
-            tx.put(LogicalDatastoreType.OPERATIONAL, path, new TopologyBuilder().setKey(key).build(), true);
-            try {
-                tx.submit().get();
-            } catch (InterruptedException | ExecutionException e) {
-                LOG.warn("Initial topology export failed, continuing anyway", e);
-            }
+        if(!isFlowTopologyExist(path)){
+            transactionChainManager.writeToTransaction(
+                    LogicalDatastoreType.OPERATIONAL,
+                    path,
+                    new TopologyBuilder().setKey(key).build(),
+                    true);
+            transactionChainManager.submitTransaction();
         }
 
         LOG.info("FlowCapableTopologyProvider started");
@@ -70,6 +73,7 @@ public class FlowCapableTopologyProvider implements AutoCloseable {
     @Override
     public void close() {
         LOG.info("FlowCapableTopologyProvider stopped.");
+        this.transactionChainManager.close();
         if (this.listenerRegistration != null) {
             try {
                 this.listenerRegistration.close();
@@ -81,11 +85,11 @@ public class FlowCapableTopologyProvider implements AutoCloseable {
         }
     }
 
-    private boolean isFlowTopologyExist(final DataBroker dataBroker,
-                                        final InstanceIdentifier<Topology> path) {
-        final ReadTransaction tx = dataBroker.newReadOnlyTransaction();
+    private boolean isFlowTopologyExist(final InstanceIdentifier<Topology> path) {
         try {
-            Optional<Topology> ofTopology = tx.read(LogicalDatastoreType.OPERATIONAL, path).checkedGet();
+            Optional<Topology> ofTopology = this.transactionChainManager
+                    .readFromTransaction(LogicalDatastoreType.OPERATIONAL, path)
+                    .checkedGet();
             LOG.debug("OpenFlow topology exist in the operational data store at {}",path);
             if(ofTopology.isPresent()){
                 return true;
index e9f2683d0ee71aa0e8b1e063c8b27ef1449c3b5a..a2809dfc69b855521eff5ac0193363dacfdcf522 100644 (file)
@@ -1,4 +1,4 @@
-/**
+/*
  * Copyright (c) 2015 Cisco Systems, Inc. and others.  All rights reserved.
  *
  * This program and the accompanying materials are made available under the
@@ -11,7 +11,6 @@ import java.util.Collection;
 import javax.annotation.Nonnull;
 import org.opendaylight.controller.md.sal.binding.api.DataBroker;
 import org.opendaylight.controller.md.sal.binding.api.DataTreeModification;
-import org.opendaylight.controller.md.sal.binding.api.ReadWriteTransaction;
 import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableNode;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeRef;
@@ -59,12 +58,9 @@ public class NodeChangeListenerImpl extends DataTreeChangeListenerImpl<FlowCapab
         final NodeId nodeId = provideTopologyNodeId(iiToNodeInInventory);
         final InstanceIdentifier<org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.Node> iiToTopologyRemovedNode = provideIIToTopologyNode(nodeId);
         if (iiToTopologyRemovedNode != null) {
-            operationProcessor.enqueueOperation(new TopologyOperation() {
-                @Override
-                public void applyOperation(final ReadWriteTransaction transaction) {
-                    transaction.delete(LogicalDatastoreType.OPERATIONAL, iiToTopologyRemovedNode);
-                    TopologyManagerUtil.removeAffectedLinks(nodeId, transaction, II_TO_TOPOLOGY);
-                }
+            operationProcessor.enqueueOperation(manager -> {
+                manager.addDeleteOperationTotTxChain(LogicalDatastoreType.OPERATIONAL, iiToTopologyRemovedNode);
+                TopologyManagerUtil.removeAffectedLinks(nodeId, manager, II_TO_TOPOLOGY);
             });
         } else {
             LOG.debug("Instance identifier to inventory wasn't translated to topology while deleting node.");
index bbfae52b84910b179fe182d9b3fa345a0ef90934..4d9b5019f96faf16549185b3530f00ec8d510bc9 100644 (file)
@@ -7,33 +7,28 @@
  */
 package org.opendaylight.openflowplugin.applications.topology.manager;
 
-import com.google.common.base.Preconditions;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.LinkedBlockingQueue;
-import org.opendaylight.controller.md.sal.binding.api.BindingTransactionChain;
 import org.opendaylight.controller.md.sal.binding.api.DataBroker;
-import org.opendaylight.controller.md.sal.binding.api.ReadWriteTransaction;
-import org.opendaylight.controller.md.sal.common.api.data.AsyncTransaction;
-import org.opendaylight.controller.md.sal.common.api.data.TransactionChain;
-import org.opendaylight.controller.md.sal.common.api.data.TransactionChainListener;
-import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException;
+import org.opendaylight.openflowplugin.common.txchain.TransactionChainManager;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-public final class OperationProcessor implements AutoCloseable, Runnable, TransactionChainListener {
+public final class OperationProcessor implements AutoCloseable, Runnable {
     private static final Logger LOG = LoggerFactory.getLogger(OperationProcessor.class);
     private static final int MAX_TRANSACTION_OPERATIONS = 100;
     private static final int OPERATION_QUEUE_DEPTH = 500;
+    private static final String TOPOLOGY_MANAGER = "topology-manager";
 
     private final BlockingQueue<TopologyOperation> queue = new LinkedBlockingQueue<>(OPERATION_QUEUE_DEPTH);
-    private final DataBroker dataBroker;
     private final Thread thread;
-    private BindingTransactionChain transactionChain;
+    private TransactionChainManager transactionChainManager;
     private volatile boolean finishing = false;
 
     public OperationProcessor(final DataBroker dataBroker) {
-        this.dataBroker = Preconditions.checkNotNull(dataBroker);
-        transactionChain = this.dataBroker.createTransactionChain(this);
+        transactionChainManager = new TransactionChainManager(dataBroker, TOPOLOGY_MANAGER);
+        transactionChainManager.activateTransactionManager();
+        transactionChainManager.initialSubmitWriteTransaction();
 
         thread = new Thread(this);
         thread.setDaemon(true);
@@ -60,11 +55,9 @@ public final class OperationProcessor implements AutoCloseable, Runnable, Transa
 
                     LOG.debug("New {} operation available, starting transaction", op);
 
-                    final ReadWriteTransaction tx = transactionChain.newReadWriteTransaction();
-
                     int ops = 0;
                     do {
-                        op.applyOperation(tx);
+                        op.applyOperation(transactionChainManager);
 
                         ops++;
                         if (ops < MAX_TRANSACTION_OPERATIONS) {
@@ -77,55 +70,25 @@ public final class OperationProcessor implements AutoCloseable, Runnable, Transa
                     } while (op != null);
 
                     LOG.debug("Processed {} operations, submitting transaction", ops);
-                    submitTransaction(tx);
-                } catch (final IllegalStateException e) {
-                    LOG.warn("Stat DataStoreOperation unexpected State!", e);
-                    transactionChain.close();
-                    transactionChain = dataBroker.createTransactionChain(this);
-                    cleanDataStoreOperQueue();
+                    if (!transactionChainManager.submitTransaction()) {
+                        cleanDataStoreOperQueue();
+                    }
                 } catch (final InterruptedException e) {
                     // This should mean we're shutting down.
                     LOG.debug("Stat Manager DS Operation thread interrupted!", e);
                     finishing = true;
-                } catch (final Exception e) {
-                    LOG.warn("Stat DataStore Operation executor fail!", e);
                 }
             }
         // Drain all events, making sure any blocked threads are unblocked
         cleanDataStoreOperQueue();
     }
 
-    private void submitTransaction(ReadWriteTransaction tx) {
-        try {
-            tx.submit().checkedGet();
-        } catch (final TransactionCommitFailedException e) {
-            LOG.warn("Stat DataStoreOperation unexpected State!", e);
-            transactionChain.close();
-            transactionChain = dataBroker.createTransactionChain(this);
-            cleanDataStoreOperQueue();
-        }
-    }
-
     private void cleanDataStoreOperQueue() {
         while (!queue.isEmpty()) {
             queue.poll();
         }
     }
 
-    @Override
-    public void onTransactionChainFailed(TransactionChain<?, ?> chain, AsyncTransaction<?, ?> transaction, Throwable cause) {
-        LOG.warn("Failed to export Topology manager operations, Transaction {} failed: {}", transaction.getIdentifier(), cause.getMessage());
-        LOG.debug("Failed to export Topology manager operations.. ", cause);
-        transactionChain.close();
-        transactionChain = dataBroker.createTransactionChain(this);
-        cleanDataStoreOperQueue();
-    }
-
-    @Override
-    public void onTransactionChainSuccessful(TransactionChain<?, ?> chain) {
-        //NOOP
-    }
-
     @Override
     public void close() {
         thread.interrupt();
@@ -135,9 +98,7 @@ public final class OperationProcessor implements AutoCloseable, Runnable, Transa
             LOG.debug("Join of thread {} was interrupted", thread.getName(), e);
         }
 
-        if (transactionChain != null) {
-            transactionChain.close();
-        }
+        transactionChainManager.close();
 
         LOG.debug("OperationProcessor closed");
     }
index c2c129b125cb7eb3bccf83876a8d6e3158fe1fee..a572b439e2a1aaf984b793d6ab57bb721a8fc893 100644 (file)
@@ -1,4 +1,4 @@
-/**
+/*
  * Copyright (c) 2015 Cisco Systems, Inc. and others.  All rights reserved.
  *
  * This program and the accompanying materials are made available under the
@@ -7,12 +7,11 @@
  */
 package org.opendaylight.openflowplugin.applications.topology.manager;
 
-import com.google.common.base.Optional;
 import java.util.Collection;
+import java.util.Optional;
 import javax.annotation.Nonnull;
 import org.opendaylight.controller.md.sal.binding.api.DataBroker;
 import org.opendaylight.controller.md.sal.binding.api.DataTreeModification;
-import org.opendaylight.controller.md.sal.binding.api.ReadWriteTransaction;
 import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
 import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableNodeConnector;
@@ -69,20 +68,18 @@ public class TerminationPointChangeListenerImpl extends DataTreeChangeListenerIm
 
         if (iiToTopologyTerminationPoint != null) {
             final InstanceIdentifier<org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.Node> node = iiToTopologyTerminationPoint.firstIdentifierOf(org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.Node.class);
-            operationProcessor.enqueueOperation(new TopologyOperation() {
-                @Override
-                public void applyOperation(final ReadWriteTransaction transaction) {
-                    Optional<org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.Node> nodeOptional = Optional.absent();
-                    try {
-                        nodeOptional = transaction.read(LogicalDatastoreType.OPERATIONAL, node).checkedGet();
-                    } catch (ReadFailedException e) {
-                        LOG.warn("Error occured when trying to read NodeConnector: {}", e.getMessage());
-                        LOG.debug("Error occured when trying to read NodeConnector.. ", e);
-                    }
-                    if (nodeOptional.isPresent()) {
-                        TopologyManagerUtil.removeAffectedLinks(terminationPointId, transaction, II_TO_TOPOLOGY);
-                        transaction.delete(LogicalDatastoreType.OPERATIONAL, iiToTopologyTerminationPoint);
-                    }
+            operationProcessor.enqueueOperation(manager -> {
+                Optional<org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.Node> nodeOptional = Optional.empty();
+                try {
+                    nodeOptional = Optional.ofNullable(
+                            manager.readFromTransaction(LogicalDatastoreType.OPERATIONAL, node).checkedGet().orNull());
+                } catch (ReadFailedException e) {
+                    LOG.warn("Error occurred when trying to read NodeConnector: {}", e.getMessage());
+                    LOG.debug("Error occurred when trying to read NodeConnector.. ", e);
+                }
+                if (nodeOptional.isPresent()) {
+                    TopologyManagerUtil.removeAffectedLinks(terminationPointId, manager, II_TO_TOPOLOGY);
+                    manager.addDeleteOperationTotTxChain(LogicalDatastoreType.OPERATIONAL, iiToTopologyTerminationPoint);
                 }
             });
         } else {
@@ -109,13 +106,10 @@ public class TerminationPointChangeListenerImpl extends DataTreeChangeListenerIm
     }
 
     private void removeLinks(final FlowCapableNodeConnector flowCapNodeConnector, final TerminationPoint point) {
-        operationProcessor.enqueueOperation(new TopologyOperation() {
-            @Override
-            public void applyOperation(final ReadWriteTransaction transaction) {
-                if ((flowCapNodeConnector.getState() != null && flowCapNodeConnector.getState().isLinkDown())
-                        || (flowCapNodeConnector.getConfiguration() != null && flowCapNodeConnector.getConfiguration().isPORTDOWN())) {
-                    TopologyManagerUtil.removeAffectedLinks(point.getTpId(), transaction, II_TO_TOPOLOGY);
-                }
+        operationProcessor.enqueueOperation(manager -> {
+            if ((flowCapNodeConnector.getState() != null && flowCapNodeConnector.getState().isLinkDown())
+                    || (flowCapNodeConnector.getConfiguration() != null && flowCapNodeConnector.getConfiguration().isPORTDOWN())) {
+                TopologyManagerUtil.removeAffectedLinks(point.getTpId(), manager, II_TO_TOPOLOGY);
             }
         });
     }
@@ -143,8 +137,7 @@ public class TerminationPointChangeListenerImpl extends DataTreeChangeListenerIm
     }
 
     private static TpId provideTopologyTerminationPointId(final InstanceIdentifier<FlowCapableNodeConnector> iiToNodeInInventory) {
-        NodeConnectorKey inventoryNodeConnectorKey = iiToNodeInInventory.firstKeyOf(NodeConnector.class,
-                NodeConnectorKey.class);
+        NodeConnectorKey inventoryNodeConnectorKey = iiToNodeInInventory.firstKeyOf(NodeConnector.class);
         if (inventoryNodeConnectorKey != null) {
             return new TpId(inventoryNodeConnectorKey.getId().getValue());
         }
index 96dd36e582958688b83e6d2acb4373beb5caab13..b3dfcb278f095650676477d77422630814772c18 100644 (file)
@@ -10,9 +10,9 @@ package org.opendaylight.openflowplugin.applications.topology.manager;
 import com.google.common.base.Optional;
 import java.util.Collections;
 import java.util.List;
-import org.opendaylight.controller.md.sal.binding.api.ReadWriteTransaction;
 import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
 import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
+import org.opendaylight.openflowplugin.common.txchain.TransactionChainManager;
 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.NodeId;
 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.TpId;
 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.Topology;
@@ -21,54 +21,54 @@ import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-public class TopologyManagerUtil {
+class TopologyManagerUtil {
 
     private static final Logger LOG = LoggerFactory.getLogger(TopologyManagerUtil.class);
 
     private TopologyManagerUtil() {}
 
-    static void removeAffectedLinks(final NodeId id, final ReadWriteTransaction transaction, InstanceIdentifier<Topology> topology) {
+    static void removeAffectedLinks(final NodeId id, final TransactionChainManager manager, InstanceIdentifier<Topology> topology) {
         Optional<Topology> topologyOptional = Optional.absent();
         try {
-            topologyOptional = transaction.read(LogicalDatastoreType.OPERATIONAL, topology).checkedGet();
+            topologyOptional = manager.readFromTransaction(LogicalDatastoreType.OPERATIONAL, topology).checkedGet();
         } catch (ReadFailedException e) {
             LOG.warn("Error reading topology data for topology {}: {}", topology, e.getMessage());
             LOG.debug("Error reading topology data for topology.. ", e);
         }
         if (topologyOptional.isPresent()) {
-            removeAffectedLinks(id, topologyOptional, transaction, topology);
+            removeAffectedLinks(id, topologyOptional, manager, topology);
         }
     }
 
-    static void removeAffectedLinks(final NodeId id, Optional<Topology> topologyOptional, ReadWriteTransaction transaction, final InstanceIdentifier<Topology> topology) {
+    private static void removeAffectedLinks(final NodeId id, Optional<Topology> topologyOptional, TransactionChainManager manager, final InstanceIdentifier<Topology> topology) {
         if (!topologyOptional.isPresent()) {
             return;
         }
 
         List<Link> linkList = topologyOptional.get().getLink() != null ?
-                topologyOptional.get().getLink() : Collections.<Link> emptyList();
+                topologyOptional.get().getLink() : Collections.emptyList();
         for (Link link : linkList) {
             if (id.equals(link.getSource().getSourceNode()) ||
                     id.equals(link.getDestination().getDestNode())) {
-                transaction.delete(LogicalDatastoreType.OPERATIONAL, linkPath(link, topology));
+                manager.addDeleteOperationTotTxChain(LogicalDatastoreType.OPERATIONAL, linkPath(link, topology));
             }
         }
     }
 
-    static void removeAffectedLinks(final TpId id, final ReadWriteTransaction transaction, final InstanceIdentifier<Topology> topology) {
+    static void removeAffectedLinks(final TpId id, final TransactionChainManager manager, final InstanceIdentifier<Topology> topology) {
         Optional<Topology> topologyOptional = Optional.absent();
         try {
-            topologyOptional = transaction.read(LogicalDatastoreType.OPERATIONAL, topology).checkedGet();
+            topologyOptional = manager.readFromTransaction(LogicalDatastoreType.OPERATIONAL, topology).checkedGet();
         } catch (ReadFailedException e) {
             LOG.warn("Error reading topology data for topology {}: {}", topology, e.getMessage());
             LOG.debug("Error reading topology data for topology..", e);
         }
         if (topologyOptional.isPresent()) {
-            removeAffectedLinks(id, topologyOptional, transaction, topology);
+            removeAffectedLinks(id, topologyOptional, manager, topology);
         }
     }
 
-    static void removeAffectedLinks(final TpId id, Optional<Topology> topologyOptional, ReadWriteTransaction transaction, final InstanceIdentifier<Topology> topology) {
+    private static void removeAffectedLinks(final TpId id, Optional<Topology> topologyOptional, TransactionChainManager manager, final InstanceIdentifier<Topology> topology) {
         if (!topologyOptional.isPresent()) {
             return;
         }
@@ -78,7 +78,7 @@ public class TopologyManagerUtil {
         for (Link link : linkList) {
             if (id.equals(link.getSource().getSourceTp()) ||
                     id.equals(link.getDestination().getDestTp())) {
-                transaction.delete(LogicalDatastoreType.OPERATIONAL, linkPath(link, topology));
+                manager.addDeleteOperationTotTxChain(LogicalDatastoreType.OPERATIONAL, linkPath(link, topology));
             }
         }
     }
index a49ec9bd958dc284afcdd6f15a8d134f6589ed96..82f3471ad17ba07ee8e574e28c467884729e7d48 100644 (file)
@@ -8,6 +8,7 @@
 package org.opendaylight.openflowplugin.applications.topology.manager;
 
 import org.opendaylight.controller.md.sal.binding.api.ReadWriteTransaction;
+import org.opendaylight.openflowplugin.common.txchain.TransactionChainManager;
 
 /**
  * Internal interface for submitted operations. Implementations of this
@@ -17,7 +18,7 @@ interface TopologyOperation {
     /**
      * Execute the operation on top of the transaction.
      *
-     * @param transaction Datastore transaction
+     * @param manager Datastore transaction manager
      */
-    void applyOperation(ReadWriteTransaction transaction);
+    void applyOperation(TransactionChainManager manager);
 }
\ No newline at end of file
diff --git a/applications/topology-manager/src/test/java/org/opendaylight/openflowplugin/applications/topology/manager/FlowCapableTopologyProviderTest.java b/applications/topology-manager/src/test/java/org/opendaylight/openflowplugin/applications/topology/manager/FlowCapableTopologyProviderTest.java
deleted file mode 100644 (file)
index 164516e..0000000
+++ /dev/null
@@ -1,126 +0,0 @@
-/**
- * Copyright (c) 2016 Cisco Systems, Inc. and others.  All rights reserved.
- *
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,
- * and is available at http://www.eclipse.org/legal/epl-v10.html
- */
-
-package org.opendaylight.openflowplugin.applications.topology.manager;
-
-import static org.mockito.Mockito.doReturn;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.when;
-
-import com.google.common.base.Optional;
-import com.google.common.collect.ClassToInstanceMap;
-import com.google.common.util.concurrent.Futures;
-import org.junit.Before;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.mockito.Matchers;
-import org.mockito.Mock;
-import org.mockito.invocation.InvocationOnMock;
-import org.mockito.runners.MockitoJUnitRunner;
-import org.mockito.stubbing.Answer;
-import org.opendaylight.controller.md.sal.binding.api.DataBroker;
-import org.opendaylight.controller.md.sal.binding.api.ReadOnlyTransaction;
-import org.opendaylight.controller.md.sal.binding.api.ReadWriteTransaction;
-import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
-import org.opendaylight.controller.sal.binding.api.BindingAwareBroker;
-import org.opendaylight.controller.sal.binding.api.BindingAwareProvider;
-import org.opendaylight.controller.sal.binding.api.BindingAwareService;
-import org.opendaylight.controller.sal.binding.api.NotificationProviderService;
-import org.opendaylight.controller.sal.binding.api.NotificationProviderService.NotificationInterestListener;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node;
-import org.opendaylight.yangtools.concepts.ListenerRegistration;
-import org.opendaylight.yangtools.yang.binding.DataObject;
-import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
-import org.opendaylight.yangtools.yang.binding.NotificationListener;
-
-/**
- * Test for {@link FlowCapableTopologyProvider}.
- */
-@RunWith(MockitoJUnitRunner.class)
-public class FlowCapableTopologyProviderTest {
-
-    private FlowCapableTopologyProvider provider;
-
-    @Mock
-    private BindingAwareBroker.ProviderContext providerContext;
-    @Mock
-    private DataBroker dataBroker;
-    @Mock
-    private NotificationProviderService notificationProviderService;
-    @Mock
-    private BindingAwareProvider bindingAwareProvider;
-    @Mock
-    private ClassToInstanceMap<BindingAwareService> serviceProvider;
-    @Mock
-    private BindingAwareService bindingAwareService;
-    @Mock
-    private Node mockNode;
-    @Mock
-    private ReadOnlyTransaction rTx;
-    @Mock
-    private ReadWriteTransaction wTx;
-
-    @Before
-    public void setUp() throws Exception {
-        when(providerContext.getSALService(Matchers.<Class<? extends BindingAwareService>>any()))
-        .thenAnswer(new Answer<BindingAwareService>() {
-            @Override
-            public BindingAwareService answer(InvocationOnMock invocation) throws Throwable {
-                Object[] arguments = invocation.getArguments();
-                if (arguments != null && arguments.length > 0 && arguments[0] != null) {
-                    if(arguments[0].equals(DataBroker.class)) {
-                        return dataBroker;
-                    } else if(arguments[0].equals(NotificationProviderService.class)){
-                        return notificationProviderService;
-                    }
-                }
-                return null;
-            }
-        });
-
-        doReturn(rTx).when(dataBroker).newReadOnlyTransaction();
-        doReturn(wTx).when(dataBroker).newReadWriteTransaction();
-
-        when(wTx.submit()).thenReturn(Futures.immediateCheckedFuture(null));
-
-        OperationProcessor operationProcessor = new OperationProcessor(dataBroker);
-        provider = new FlowCapableTopologyProvider(dataBroker, notificationProviderService, operationProcessor);
-    }
-
-    @Test
-    public void testRun() throws Exception {
-        when(rTx.read(Matchers.<LogicalDatastoreType>any(), Matchers.<InstanceIdentifier<DataObject>>any()))
-                .thenReturn(Futures.immediateCheckedFuture(Optional.of(mockNode)));
-        provider.start();
-        verify(rTx).read(Matchers.<LogicalDatastoreType>any(), Matchers.<InstanceIdentifier<DataObject>>any());
-    }
-
-    @Test
-    public void testRunWithoutTopology() throws Exception {
-        when(rTx.read(Matchers.<LogicalDatastoreType>any(), Matchers.<InstanceIdentifier<DataObject>>any()))
-                .thenReturn(Futures.immediateCheckedFuture(Optional.absent()));
-        provider.start();
-        verify(wTx).submit();
-    }
-
-    @Test
-    public void testClose() throws Exception {
-        when(rTx.read(Matchers.<LogicalDatastoreType>any(), Matchers.<InstanceIdentifier<DataObject>>any()))
-                .thenReturn(Futures.immediateCheckedFuture(Optional.of(mockNode)));
-
-        final ListenerRegistration<NotificationInterestListener> listenerRegistration = mock(ListenerRegistration.class);
-        doReturn(listenerRegistration).when(notificationProviderService).registerNotificationListener(Matchers.<NotificationListener>any());
-
-        provider.start();
-        provider.close();
-
-        verify(listenerRegistration).close();
-    }
-
-}
\ No newline at end of file
diff --git a/applications/topology-manager/src/test/java/org/opendaylight/openflowplugin/applications/topology/manager/OperationProcessorTest.java b/applications/topology-manager/src/test/java/org/opendaylight/openflowplugin/applications/topology/manager/OperationProcessorTest.java
deleted file mode 100644 (file)
index a1eb979..0000000
+++ /dev/null
@@ -1,64 +0,0 @@
-/*
- * Copyright (c) 2016 Pantheon Technologies s.r.o. and others. All rights reserved.
- *
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,
- * and is available at http://www.eclipse.org/legal/epl-v10.html
- */
-package org.opendaylight.openflowplugin.applications.topology.manager;
-
-import static org.mockito.Mockito.times;
-
-import org.junit.Before;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.mockito.Matchers;
-import org.mockito.Mock;
-import org.mockito.Mockito;
-import org.mockito.runners.MockitoJUnitRunner;
-import org.opendaylight.controller.md.sal.binding.api.BindingTransactionChain;
-import org.opendaylight.controller.md.sal.binding.api.DataBroker;
-import org.opendaylight.controller.md.sal.common.api.data.AsyncTransaction;
-import org.opendaylight.controller.md.sal.common.api.data.TransactionChainListener;
-
-@RunWith(MockitoJUnitRunner.class)
-public class OperationProcessorTest {
-
-
-    OperationProcessor processor;
-
-    @Mock
-    DataBroker dataBroker;
-    @Mock
-    BindingTransactionChain transactionChain;
-    @Mock
-    TransactionChainListener transactionChainListener;
-    @Mock
-    AsyncTransaction asyncTransaction;
-    @Mock
-    Throwable throwable;
-
-    @Before
-    public void setUp() {
-        Mockito.when(dataBroker.createTransactionChain(Matchers.any(OperationProcessor.class)))
-                .thenReturn(transactionChain);
-        processor = new OperationProcessor(dataBroker);
-    }
-
-    @Test
-    public void onTransactionChainFailedTest() {
-        processor.onTransactionChainFailed(transactionChain, asyncTransaction, throwable);
-        Mockito.verify(transactionChain).close();
-        //dataBroker.createTransactionChain is called 2 time
-        // (first time in constructor, second time after old chain has been closed)
-        Mockito.verify(dataBroker, times(2)).createTransactionChain(Matchers.any(OperationProcessor.class));
-    }
-
-    @Test
-    public void closeTest() {
-        processor.close();
-        Mockito.verify(transactionChain).close();
-    }
-
-
-}
index fbebb60f7c53a9dbc1c9e86293eff51ebcc0864f..18f70cb6b5247d582434da09594d27d3db3b3e9c 100644 (file)
       <groupId>org.slf4j</groupId>
       <artifactId>slf4j-api</artifactId>
     </dependency>
+        <dependency>
+            <groupId>org.opendaylight.controller</groupId>
+            <artifactId>sal-binding-api</artifactId>
+        </dependency>
 
     <dependency>
       <groupId>junit</groupId>
similarity index 73%
rename from openflowplugin-impl/src/main/java/org/opendaylight/openflowplugin/impl/device/TransactionChainManager.java
rename to openflowplugin-common/src/main/java/org/opendaylight/openflowplugin/common/txchain/TransactionChainManager.java
index e06fbd2f2910ed6224ee8cd3119443282e6a3a15..4a2051cfe263319aca4448e8a43dac9ac33ad4dc 100644 (file)
@@ -6,10 +6,10 @@
  * and is available at http://www.eclipse.org/legal/epl-v10.html
  */
 
-package org.opendaylight.openflowplugin.impl.device;
+package org.opendaylight.openflowplugin.common.txchain;
 
-import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
+import com.google.common.util.concurrent.CheckedFuture;
 import com.google.common.util.concurrent.FutureCallback;
 import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.ListenableFuture;
@@ -24,14 +24,15 @@ import javax.annotation.Nonnull;
 import javax.annotation.concurrent.GuardedBy;
 import org.opendaylight.controller.md.sal.binding.api.BindingTransactionChain;
 import org.opendaylight.controller.md.sal.binding.api.DataBroker;
+import org.opendaylight.controller.md.sal.binding.api.ReadWriteTransaction;
 import org.opendaylight.controller.md.sal.binding.api.WriteTransaction;
 import org.opendaylight.controller.md.sal.common.api.data.AsyncTransaction;
 import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
+import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
 import org.opendaylight.controller.md.sal.common.api.data.TransactionChain;
 import org.opendaylight.controller.md.sal.common.api.data.TransactionChainClosedException;
 import org.opendaylight.controller.md.sal.common.api.data.TransactionChainListener;
 import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException;
-import org.opendaylight.openflowplugin.api.openflow.device.DeviceInfo;
 import org.opendaylight.yangtools.yang.binding.DataObject;
 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
 import org.slf4j.Logger;
@@ -44,7 +45,7 @@ import org.slf4j.LoggerFactory;
  * method (wrapped {@link WriteTransaction#put(LogicalDatastoreType, InstanceIdentifier, DataObject)})
  * and submitTransaction method (wrapped {@link WriteTransaction#submit()}).
  */
-class TransactionChainManager implements TransactionChainListener, AutoCloseable {
+public class TransactionChainManager implements TransactionChainListener, AutoCloseable {
 
     private static final Logger LOG = LoggerFactory.getLogger(TransactionChainManager.class);
     private static final String CANNOT_WRITE_INTO_TRANSACTION = "Cannot write into transaction.";
@@ -54,9 +55,9 @@ class TransactionChainManager implements TransactionChainListener, AutoCloseable
     private final String nodeId;
 
     @GuardedBy("txLock")
-    private WriteTransaction writeTx;
+    private ReadWriteTransaction writeTx;
     @GuardedBy("txLock")
-    private BindingTransactionChain txChainFactory;
+    private BindingTransactionChain transactionChain;
     @GuardedBy("txLock")
     private boolean submitIsEnabled;
     @GuardedBy("txLock")
@@ -67,23 +68,23 @@ class TransactionChainManager implements TransactionChainListener, AutoCloseable
     @GuardedBy("txLock")
     private TransactionChainManagerStatus transactionChainManagerStatus = TransactionChainManagerStatus.SLEEPING;
 
-    TransactionChainManager(@Nonnull final DataBroker dataBroker,
-                            @Nonnull final DeviceInfo deviceInfo) {
+    public TransactionChainManager(@Nonnull final DataBroker dataBroker,
+                                   @Nonnull final String deviceIdentifier) {
         this.dataBroker = dataBroker;
-        this.nodeId = deviceInfo.toString();
+        this.nodeId = deviceIdentifier;
         this.lastSubmittedFuture = Futures.immediateFuture(null);
     }
 
     @GuardedBy("txLock")
     private void createTxChain() {
-        BindingTransactionChain txChainFactoryTemp = txChainFactory;
-        txChainFactory = dataBroker.createTransactionChain(TransactionChainManager.this);
+        BindingTransactionChain txChainFactoryTemp = transactionChain;
+        transactionChain = dataBroker.createTransactionChain(TransactionChainManager.this);
         Optional.ofNullable(txChainFactoryTemp).ifPresent(TransactionChain::close);
     }
 
-    boolean initialSubmitWriteTransaction() {
+    public boolean initialSubmitWriteTransaction() {
         enableSubmit();
-        return submitWriteTransaction();
+        return submitTransaction();
     }
 
     /**
@@ -91,14 +92,14 @@ class TransactionChainManager implements TransactionChainListener, AutoCloseable
      * registration for this class instance as {@link TransactionChainListener} to provide possibility a make DS
      * transactions. Call this method for MASTER role only.
      */
-    void activateTransactionManager() {
+    public void activateTransactionManager() {
         if (LOG.isDebugEnabled()) {
             LOG.debug("activateTransactionManager for node {} transaction submit is set to {}",
                     this.nodeId, submitIsEnabled);
         }
         synchronized (txLock) {
             if (TransactionChainManagerStatus.SLEEPING == transactionChainManagerStatus) {
-                Preconditions.checkState(txChainFactory == null,
+                Preconditions.checkState(transactionChain == null,
                         "TxChainFactory survive last close.");
                 Preconditions.checkState(writeTx == null,
                         "We have some unexpected WriteTransaction.");
@@ -116,7 +117,7 @@ class TransactionChainManager implements TransactionChainListener, AutoCloseable
      * Call this method for SLAVE only.
      * @return Future
      */
-    ListenableFuture<Void> deactivateTransactionManager() {
+    public ListenableFuture<Void> deactivateTransactionManager() {
         if (LOG.isDebugEnabled()) {
             LOG.debug("deactivateTransactionManager for node {}", this.nodeId);
         }
@@ -130,12 +131,12 @@ class TransactionChainManager implements TransactionChainListener, AutoCloseable
                 Futures.addCallback(future, new FutureCallback<Void>() {
                     @Override
                     public void onSuccess(final Void result) {
-                        removeTxChainFactory();
+                        closeTransactionChain();
                     }
 
                     @Override
-                    public void onFailure(final Throwable throwable) {
-                        removeTxChainFactory();
+                    public void onFailure(@Nonnull final Throwable t) {
+                        closeTransactionChain();
                     }
                 });
             } else {
@@ -146,12 +147,16 @@ class TransactionChainManager implements TransactionChainListener, AutoCloseable
         return future;
     }
 
-    private void removeTxChainFactory() {
-        Optional.ofNullable(txChainFactory).ifPresent(TransactionChain::close);
-        txChainFactory = null;
+    private void closeTransactionChain() {
+        if (writeTx != null) {
+            writeTx.cancel();
+            writeTx = null;
+        }
+        Optional.ofNullable(transactionChain).ifPresent(TransactionChain::close);
+        transactionChain = null;
     }
 
-    boolean submitWriteTransaction() {
+    public boolean submitTransaction() {
         synchronized (txLock) {
             if (!submitIsEnabled) {
                 if (LOG.isTraceEnabled()) {
@@ -207,8 +212,8 @@ class TransactionChainManager implements TransactionChainListener, AutoCloseable
         return true;
     }
 
-    <T extends DataObject> void addDeleteOperationTotTxChain(final LogicalDatastoreType store,
-                                                             final InstanceIdentifier<T> path) {
+    public <T extends DataObject> void addDeleteOperationTotTxChain(final LogicalDatastoreType store,
+                                                                    final InstanceIdentifier<T> path){
         synchronized (txLock) {
             ensureTransaction();
             if (writeTx == null) {
@@ -220,10 +225,10 @@ class TransactionChainManager implements TransactionChainListener, AutoCloseable
         }
     }
 
-    <T extends DataObject> void writeToTransaction(final LogicalDatastoreType store,
-                                                   final InstanceIdentifier<T> path,
-                                                   final T data,
-                                                   final boolean createParents) {
+    public <T extends DataObject> void writeToTransaction(final LogicalDatastoreType store,
+                                                          final InstanceIdentifier<T> path,
+                                                          final T data,
+                                                          final boolean createParents){
         synchronized (txLock) {
             ensureTransaction();
             if (writeTx == null) {
@@ -235,12 +240,43 @@ class TransactionChainManager implements TransactionChainListener, AutoCloseable
         }
     }
 
+    public <T extends DataObject> void mergeToTransaction(final LogicalDatastoreType store,
+                                                          final InstanceIdentifier<T> path,
+                                                          final T data,
+                                                          final boolean createParents){
+        synchronized (txLock) {
+            ensureTransaction();
+            if (writeTx == null) {
+                LOG.debug("WriteTx is null for node {}. Merge data for {} was not realized.", this.nodeId, path);
+                throw new TransactionChainClosedException(CANNOT_WRITE_INTO_TRANSACTION);
+            }
+
+            writeTx.merge(store, path, data, createParents);
+        }
+    }
+
+    public <T extends DataObject> CheckedFuture<com.google.common.base.Optional<T>, ReadFailedException>
+    readFromTransaction(final LogicalDatastoreType store,
+                        final InstanceIdentifier<T> path){
+        synchronized (txLock) {
+            ensureTransaction();
+            if (writeTx == null) {
+                LOG.debug("WriteTx is null for node {}. Read data for {} was not realized.", this.nodeId, path);
+                throw new TransactionChainClosedException(CANNOT_WRITE_INTO_TRANSACTION);
+            }
+
+            return writeTx.read(store, path);
+        }
+    }
+
     @Override
     public void onTransactionChainFailed(final TransactionChain<?, ?> chain,
                                          final AsyncTransaction<?, ?> transaction, final Throwable cause) {
         synchronized (txLock) {
-            if (TransactionChainManagerStatus.WORKING == transactionChainManagerStatus) {
+            if (TransactionChainManagerStatus.WORKING == transactionChainManagerStatus &&
+                    chain.equals(this.transactionChain)) {
                 LOG.warn("Transaction chain failed, recreating chain due to ", cause);
+                closeTransactionChain();
                 createTxChain();
                 writeTx = null;
             }
@@ -253,22 +289,21 @@ class TransactionChainManager implements TransactionChainListener, AutoCloseable
     }
 
     @GuardedBy("txLock")
-    private void ensureTransaction() {
+   private void ensureTransaction() {
         if (writeTx == null && TransactionChainManagerStatus.WORKING == transactionChainManagerStatus
-                && txChainFactory != null) {
-            writeTx = txChainFactory.newWriteOnlyTransaction();
+            && transactionChain != null) {
+                writeTx = transactionChain.newReadWriteTransaction();
         }
     }
 
-    @VisibleForTesting
-    void enableSubmit() {
+    private void enableSubmit() {
         synchronized (txLock) {
-            /* !!!IMPORTANT: never set true without txChainFactory */
-            submitIsEnabled = txChainFactory != null;
+            /* !!!IMPORTANT: never set true without transactionChain */
+            submitIsEnabled = transactionChain != null;
         }
     }
 
-    ListenableFuture<Void> shuttingDown() {
+    public ListenableFuture<Void> shuttingDown() {
         if (LOG.isDebugEnabled()) {
             LOG.debug("TxManager is going SHUTTING_DOWN for node {}", this.nodeId);
         }
@@ -284,7 +319,7 @@ class TransactionChainManager implements TransactionChainListener, AutoCloseable
         submitIsEnabled = false;
         ListenableFuture<Void> future;
 
-        if (!wasSubmitEnabled || txChainFactory == null) {
+        if (!wasSubmitEnabled || transactionChain == null) {
             // stay with actual thread
             future = Futures.immediateCheckedFuture(null);
 
@@ -313,7 +348,7 @@ class TransactionChainManager implements TransactionChainListener, AutoCloseable
             LOG.debug("Setting transactionChainManagerStatus to SHUTTING_DOWN for {}", this.nodeId);
         }
         synchronized (txLock) {
-            removeTxChainFactory();
+            closeTransactionChain();
         }
     }
 
index a0dab34d276393443e79c8e604c9fd8788ad53f7..0c786069923ee2642f346d074909d20e06fb119d 100644 (file)
             <groupId>org.osgi</groupId>
             <artifactId>org.osgi.compendium</artifactId>
         </dependency>
+        <dependency>
+            <groupId>org.opendaylight.openflowplugin</groupId>
+            <artifactId>openflowplugin-common</artifactId>
+        </dependency>
     </dependencies>
 </project>
 
index 0630f8bae4858623b496174c2e3a004ef98b0160..a01f2cb936f6d085458ed1b9a13a88547570d1a2 100644 (file)
@@ -56,6 +56,7 @@ import org.opendaylight.openflowplugin.api.openflow.registry.meter.DeviceMeterRe
 import org.opendaylight.openflowplugin.api.openflow.rpc.ItemLifeCycleKeeper;
 import org.opendaylight.openflowplugin.api.openflow.rpc.listener.ItemLifecycleListener;
 import org.opendaylight.openflowplugin.api.openflow.statistics.ofpspecific.MessageSpy;
+import org.opendaylight.openflowplugin.common.txchain.TransactionChainManager;
 import org.opendaylight.openflowplugin.extension.api.ConvertorMessageFromOFJava;
 import org.opendaylight.openflowplugin.extension.api.ExtensionConverterProviderKeeper;
 import org.opendaylight.openflowplugin.extension.api.core.extension.ExtensionConverterProvider;
@@ -262,7 +263,7 @@ public class DeviceContextImpl implements DeviceContext, ExtensionConverterProvi
 
     @Override
     public boolean submitTransaction() {
-        return initialized.get() && transactionChainManager.submitWriteTransaction();
+        return initialized.get() && transactionChainManager.submitTransaction();
     }
 
     @Override
@@ -682,9 +683,9 @@ public class DeviceContextImpl implements DeviceContext, ExtensionConverterProvi
             if (LOG.isDebugEnabled()) {
                 LOG.debug("Transaction chain manager for node {} created", deviceInfo);
             }
-            this.transactionChainManager = new TransactionChainManager(dataBroker, deviceInfo);
-            this.deviceFlowRegistry = new DeviceFlowRegistryImpl(deviceInfo.getVersion(), dataBroker, deviceInfo
-                    .getNodeInstanceIdentifier());
+            this.transactionChainManager = new TransactionChainManager(dataBroker, deviceInfo.getNodeId().getValue());
+            this.deviceFlowRegistry = new DeviceFlowRegistryImpl(deviceInfo.getVersion(), dataBroker,
+                    deviceInfo.getNodeInstanceIdentifier());
             this.deviceGroupRegistry = new DeviceGroupRegistryImpl();
             this.deviceMeterRegistry = new DeviceMeterRegistryImpl();
         }
index 1c15675ee23f5860f2ca1a3790810ceddec62dcb..60265c5d53309424931c26020e6e4a9907c877a6 100644 (file)
@@ -38,7 +38,7 @@ import org.opendaylight.controller.md.sal.binding.api.DataBroker;
 import org.opendaylight.controller.md.sal.binding.api.NotificationPublishService;
 import org.opendaylight.controller.md.sal.binding.api.ReadOnlyTransaction;
 import org.opendaylight.controller.md.sal.binding.api.ReadTransaction;
-import org.opendaylight.controller.md.sal.binding.api.WriteTransaction;
+import org.opendaylight.controller.md.sal.binding.api.ReadWriteTransaction;
 import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
 import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
 import org.opendaylight.openflowjava.protocol.api.connection.ConnectionAdapter;
@@ -61,6 +61,7 @@ import org.opendaylight.openflowplugin.api.openflow.registry.meter.DeviceMeterRe
 import org.opendaylight.openflowplugin.api.openflow.rpc.ItemLifeCycleSource;
 import org.opendaylight.openflowplugin.api.openflow.rpc.listener.ItemLifecycleListener;
 import org.opendaylight.openflowplugin.api.openflow.statistics.ofpspecific.MessageSpy;
+import org.opendaylight.openflowplugin.common.txchain.TransactionChainManager;
 import org.opendaylight.openflowplugin.extension.api.ConvertorMessageFromOFJava;
 import org.opendaylight.openflowplugin.extension.api.core.extension.ExtensionConverterProvider;
 import org.opendaylight.openflowplugin.impl.device.initialization.AbstractDeviceInitializer;
@@ -120,40 +121,40 @@ public class DeviceContextImplTest {
     private static final Long DUMMY_XID = 544L;
     private static final Long DUMMY_PORT_NUMBER = 159L;
     private static final BigInteger DUMMY_DATAPATH_ID = new BigInteger("55");
-    Xid xid;
-    Xid xidMulti;
+    private Xid xid;
+    private Xid xidMulti;
 
-    DeviceContext deviceContext;
+    private DeviceContext deviceContext;
     @Mock
-    RequestContext<GetAsyncReply> requestContext;
+    private RequestContext<GetAsyncReply> requestContext;
     @Mock
-    RequestContext<MultipartReply> requestContextMultiReply;
+    private RequestContext<MultipartReply> requestContextMultiReply;
     @Mock
-    ConnectionContext connectionContext;
+    private ConnectionContext connectionContext;
     @Mock
-    GetFeaturesOutput featuresOutput;
+    private GetFeaturesOutput featuresOutput;
     @Mock
-    DataBroker dataBroker;
+    private DataBroker dataBroker;
     @Mock
-    WriteTransaction writeTx;
+    private ReadWriteTransaction writeTx;
     @Mock
-    ReadOnlyTransaction readTx;
+    private ReadOnlyTransaction readTx;
     @Mock
-    BindingTransactionChain txChainFactory;
+    private BindingTransactionChain txChainFactory;
     @Mock
-    HashedWheelTimer timer;
+    private HashedWheelTimer timer;
     @Mock
-    OutboundQueueProvider outboundQueueProvider;
+    private OutboundQueueProvider outboundQueueProvider;
     @Mock
-    ConnectionAdapter connectionAdapter;
-    NodeId nodeId = new NodeId("h2g2:42");
-    KeyedInstanceIdentifier<Node, NodeKey> nodeKeyIdent = DeviceStateUtil.createNodeInstanceIdentifier(nodeId);
+    private ConnectionAdapter connectionAdapter;
+    private NodeId nodeId = new NodeId("h2g2:42");
+    private KeyedInstanceIdentifier<Node, NodeKey> nodeKeyIdent = DeviceStateUtil.createNodeInstanceIdentifier(nodeId);
     @Mock
-    TranslatorLibrary translatorLibrary;
+    private TranslatorLibrary translatorLibrary;
     @Mock
     MessageTranslator messageTranslatorPacketReceived;
     @Mock
-    MessageTranslator messageTranslatorFlowCapableNodeConnector;
+    private MessageTranslator messageTranslatorFlowCapableNodeConnector;
     @Mock
     private MessageTranslator<Object, Object> messageTranslatorFlowRemoved;
     @Mock
@@ -197,7 +198,7 @@ public class DeviceContextImplTest {
             settableFutureMultiReply.set((RpcResult<MultipartReply>) invocation.getArguments()[0]);
             return null;
         }).when(requestContextMultiReply).setResult(any(RpcResult.class));
-        Mockito.when(txChainFactory.newWriteOnlyTransaction()).thenReturn(writeTx);
+        Mockito.when(txChainFactory.newReadWriteTransaction()).thenReturn(writeTx);
         Mockito.when(dataBroker.newReadOnlyTransaction()).thenReturn(readTx);
         Mockito.when(connectionContext.getOutboundQueueProvider()).thenReturn(outboundQueueProvider);
         Mockito.when(connectionContext.getConnectionAdapter()).thenReturn(connectionAdapter);
@@ -269,7 +270,7 @@ public class DeviceContextImplTest {
         Mockito.when(writeTx.submit()).thenReturn(Futures.immediateCheckedFuture(null));
         final InstanceIdentifier<Nodes> dummyII = InstanceIdentifier.create(Nodes.class);
         ((DeviceContextImpl) deviceContext).getTransactionChainManager().activateTransactionManager() ;
-        ((DeviceContextImpl) deviceContext).getTransactionChainManager().enableSubmit();
+        ((DeviceContextImpl) deviceContext).getTransactionChainManager().initialSubmitWriteTransaction();
         deviceContext.addDeleteToTxChain(LogicalDatastoreType.CONFIGURATION, dummyII);
         deviceContext.initialSubmitTransaction();
         verify(writeTx).submit();
@@ -287,7 +288,7 @@ public class DeviceContextImplTest {
     public void testAddDeleteToTxChain() throws Exception {
         final InstanceIdentifier<Nodes> dummyII = InstanceIdentifier.create(Nodes.class);
         ((DeviceContextImpl) deviceContext).getTransactionChainManager().activateTransactionManager() ;
-        ((DeviceContextImpl) deviceContext).getTransactionChainManager().enableSubmit();
+        ((DeviceContextImpl) deviceContext).getTransactionChainManager().initialSubmitWriteTransaction();
         deviceContext.addDeleteToTxChain(LogicalDatastoreType.CONFIGURATION, dummyII);
         verify(writeTx).delete(eq(LogicalDatastoreType.CONFIGURATION), eq(dummyII));
     }
@@ -295,7 +296,7 @@ public class DeviceContextImplTest {
     @Test
     public void testSubmitTransaction() throws Exception {
         ((DeviceContextImpl) deviceContext).getTransactionChainManager().activateTransactionManager() ;
-        ((DeviceContextImpl) deviceContext).getTransactionChainManager().enableSubmit();
+        ((DeviceContextImpl) deviceContext).getTransactionChainManager().initialSubmitWriteTransaction();
         assertTrue(deviceContext.submitTransaction());
     }
 
@@ -489,4 +490,4 @@ public class DeviceContextImplTest {
         deviceContext.closeServiceInstance();
     }
 
-}
\ No newline at end of file
+}
index 1abf811e9f8403d3121dcbcafdd70a016ed8b572..a117d06d1671f73b3542d6f77f8927ffffd46dd2 100644 (file)
@@ -11,7 +11,6 @@ package org.opendaylight.openflowplugin.impl.device;
 import com.google.common.base.Optional;
 import com.google.common.util.concurrent.CheckedFuture;
 import com.google.common.util.concurrent.Futures;
-import io.netty.util.HashedWheelTimer;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
@@ -23,7 +22,7 @@ import org.mockito.runners.MockitoJUnitRunner;
 import org.opendaylight.controller.md.sal.binding.api.BindingTransactionChain;
 import org.opendaylight.controller.md.sal.binding.api.DataBroker;
 import org.opendaylight.controller.md.sal.binding.api.ReadOnlyTransaction;
-import org.opendaylight.controller.md.sal.binding.api.WriteTransaction;
+import org.opendaylight.controller.md.sal.binding.api.ReadWriteTransaction;
 import org.opendaylight.controller.md.sal.common.api.data.AsyncTransaction;
 import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
 import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
@@ -31,14 +30,13 @@ import org.opendaylight.controller.md.sal.common.api.data.TransactionChain;
 import org.opendaylight.controller.md.sal.common.api.data.TransactionChainListener;
 import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException;
 import org.opendaylight.openflowplugin.api.openflow.device.DeviceInfo;
-import org.opendaylight.openflowplugin.api.openflow.device.DeviceState;
+import org.opendaylight.openflowplugin.common.txchain.TransactionChainManager;
 import org.opendaylight.openflowplugin.impl.util.DeviceStateUtil;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeId;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.Nodes;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.NodeBuilder;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.NodeKey;
-import org.opendaylight.yangtools.concepts.Registration;
 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
 import org.opendaylight.yangtools.yang.binding.KeyedInstanceIdentifier;
 
@@ -50,16 +48,10 @@ public class TransactionChainManagerTest {
     @Mock
     private BindingTransactionChain txChain;
     @Mock
-    private WriteTransaction writeTx;
+    private ReadWriteTransaction writeTx;
     @Mock
     private TransactionChain<?, ?> transactionChain;
     @Mock
-    HashedWheelTimer timer;
-    @Mock
-    Registration registration;
-    @Mock
-    DeviceState deviceState;
-    @Mock
     DeviceInfo deviceInfo;
 
     @Mock
@@ -82,8 +74,8 @@ public class TransactionChainManagerTest {
         nodeKeyIdent = DeviceStateUtil.createNodeInstanceIdentifier(nodeId);
         Mockito.when(deviceInfo.getNodeInstanceIdentifier()).thenReturn(nodeKeyIdent);
         Mockito.when(deviceInfo.getNodeId()).thenReturn(nodeId);
-        txChainManager = new TransactionChainManager(dataBroker, deviceInfo);
-        Mockito.when(txChain.newWriteOnlyTransaction()).thenReturn(writeTx);
+        txChainManager = new TransactionChainManager(dataBroker, nodeId.getValue());
+        Mockito.when(txChain.newReadWriteTransaction()).thenReturn(writeTx);
 
         path = InstanceIdentifier.create(Nodes.class).child(Node.class, new NodeKey(nodeId));
         Mockito.when(writeTx.submit())
@@ -101,70 +93,54 @@ public class TransactionChainManagerTest {
         final Node data = new NodeBuilder().setId(nodeId).build();
         txChainManager.writeToTransaction(LogicalDatastoreType.CONFIGURATION, path, data, false);
 
-        Mockito.verify(txChain).newWriteOnlyTransaction();
+        Mockito.verify(txChain).newReadWriteTransaction();
         Mockito.verify(writeTx).put(LogicalDatastoreType.CONFIGURATION, path, data, false);
     }
 
     /**
-     * Test of {@link TransactionChainManager#submitWriteTransaction()}.
+     * test of {@link TransactionChainManager#submitTransaction()}
+     * @throws Exception
      */
     @Test
     public void testSubmitTransaction() throws Exception {
         final Node data = new NodeBuilder().setId(nodeId).build();
         txChainManager.initialSubmitWriteTransaction();
         txChainManager.writeToTransaction(LogicalDatastoreType.CONFIGURATION, path, data, false);
-        txChainManager.submitWriteTransaction();
+        txChainManager.submitTransaction();
 
-        Mockito.verify(txChain).newWriteOnlyTransaction();
+        Mockito.verify(txChain).newReadWriteTransaction();
         Mockito.verify(writeTx).put(LogicalDatastoreType.CONFIGURATION, path, data, false);
         Mockito.verify(writeTx).submit();
     }
 
     /**
-     * Test of {@link TransactionChainManager#submitWriteTransaction()}: no submit, never enabled.
+     * test of {@link TransactionChainManager#submitTransaction()}: no submit, never enabled
+     * @throws Exception
      */
     @Test
     public void testSubmitTransaction1() throws Exception {
         final Node data = new NodeBuilder().setId(nodeId).build();
         txChainManager.writeToTransaction(LogicalDatastoreType.CONFIGURATION, path, data, false);
-        txChainManager.submitWriteTransaction();
+        txChainManager.submitTransaction();
 
-        Mockito.verify(txChain).newWriteOnlyTransaction();
+        Mockito.verify(txChain).newReadWriteTransaction();
         Mockito.verify(writeTx).put(LogicalDatastoreType.CONFIGURATION, path, data, false);
         Mockito.verify(writeTx, Mockito.never()).submit();
     }
 
     @Test
     public void testSubmitTransactionFailed() throws Exception {
-        Mockito.when(writeTx.submit())
-                .thenReturn(Futures.immediateFailedCheckedFuture(new TransactionCommitFailedException("mock")));
+        Mockito.when(writeTx.submit()).thenReturn(Futures.<Void, TransactionCommitFailedException>immediateFailedCheckedFuture(new TransactionCommitFailedException("mock")));
         final Node data = new NodeBuilder().setId(nodeId).build();
         txChainManager.initialSubmitWriteTransaction();
         txChainManager.writeToTransaction(LogicalDatastoreType.CONFIGURATION, path, data, false);
-        txChainManager.submitWriteTransaction();
+        txChainManager.submitTransaction();
 
-        Mockito.verify(txChain).newWriteOnlyTransaction();
+        Mockito.verify(txChain).newReadWriteTransaction();
         Mockito.verify(writeTx).put(LogicalDatastoreType.CONFIGURATION, path, data, false);
         Mockito.verify(writeTx).submit();
     }
 
-    @Test
-    public void testSubmitTransactionFailed2() throws Exception {
-        final Node data = new NodeBuilder().setId(nodeId).build();
-        txChainManager.initialSubmitWriteTransaction();
-        txChainManager.writeToTransaction(LogicalDatastoreType.CONFIGURATION, path, data, false);
-        txChainManager.submitWriteTransaction();
-
-        Mockito.when(writeTx.submit())
-                .thenReturn(Futures.immediateFailedCheckedFuture(new TransactionCommitFailedException("mock")));
-        txChainManager.writeToTransaction(LogicalDatastoreType.CONFIGURATION, path, data, false);
-        txChainManager.submitWriteTransaction();
-
-        Mockito.verify(txChain, Mockito.times(2)).newWriteOnlyTransaction();
-        Mockito.verify(writeTx, Mockito.times(2)).put(LogicalDatastoreType.CONFIGURATION, path, data, false);
-        Mockito.verify(writeTx, Mockito.times(2)).submit();
-    }
-
     /**
      * Test of {@link TransactionChainManager#enableSubmit()}: no submit - counter is not active.
      */
@@ -174,15 +150,14 @@ public class TransactionChainManagerTest {
         txChainManager.writeToTransaction(LogicalDatastoreType.CONFIGURATION, path, data, false);
         txChainManager.writeToTransaction(LogicalDatastoreType.CONFIGURATION, path, data, false);
 
-        Mockito.verify(txChain).newWriteOnlyTransaction();
+        Mockito.verify(txChain).newReadWriteTransaction();
         Mockito.verify(writeTx, Mockito.times(2)).put(LogicalDatastoreType.CONFIGURATION, path, data, false);
         Mockito.verify(writeTx, Mockito.never()).submit();
     }
 
     @Test
     public void testOnTransactionChainFailed() throws Exception {
-        txChainManager.onTransactionChainFailed(transactionChain,
-                Mockito.mock(AsyncTransaction.class), Mockito.mock(Throwable.class));
+        txChainManager.onTransactionChainFailed(txChain, Mockito.mock(AsyncTransaction.class), Mockito.mock(Throwable.class));
         Mockito.verify(txChain).close();
         Mockito.verify(dataBroker, Mockito.times(2)).createTransactionChain(txChainManager);
     }
@@ -198,7 +173,7 @@ public class TransactionChainManagerTest {
     public void testAddDeleteOperationTotTxChain() throws Exception {
         txChainManager.addDeleteOperationTotTxChain(LogicalDatastoreType.CONFIGURATION, path);
 
-        Mockito.verify(txChain).newWriteOnlyTransaction();
+        Mockito.verify(txChain).newReadWriteTransaction();
         Mockito.verify(writeTx).delete(LogicalDatastoreType.CONFIGURATION, path);
     }
 
@@ -219,7 +194,7 @@ public class TransactionChainManagerTest {
 
         txChainManager.deactivateTransactionManager();
 
-        Mockito.verify(txChain).newWriteOnlyTransaction();
+        Mockito.verify(txChain).newReadWriteTransaction();
         Mockito.verify(writeTx).put(LogicalDatastoreType.CONFIGURATION, path, data, false);
         Mockito.verify(writeTx, Mockito.never()).submit();
         Mockito.verify(writeTx).cancel();
@@ -229,11 +204,11 @@ public class TransactionChainManagerTest {
     @Test
     public void testShuttingDown() throws Exception {
         final Node data = new NodeBuilder().setId(nodeId).build();
+        txChainManager.initialSubmitWriteTransaction();
         txChainManager.writeToTransaction(LogicalDatastoreType.CONFIGURATION, path, data, false);
-        txChainManager.enableSubmit();
         txChainManager.shuttingDown();
 
-        Mockito.verify(txChain).newWriteOnlyTransaction();
+        Mockito.verify(txChain).newReadWriteTransaction();
         Mockito.verify(writeTx).put(LogicalDatastoreType.CONFIGURATION, path, data, false);
         Mockito.verify(writeTx).submit();
     }