From 4016b1ab46a0df2c1e46f2907bdc27f273988d92 Mon Sep 17 00:00:00 2001 From: Jozef Bacigal Date: Tue, 5 Sep 2017 11:27:57 +0200 Subject: [PATCH] Fix transaction manager closing. - the event onTransactionFailed we properly closing chain and creating a new chain - changed writeOnlyTransaction to readWriteTransaction - moved transaction manager to the common module - topology manager using transaction manager instead of creating a transaction chain for itself - added control of unfinished transaction on close See also: Bug-9038 Change-Id: Idadbb4ed0f4c61e7f80da5e2dbedbd80dece118e Signed-off-by: Jozef Bacigal --- .../manager/DataTreeChangeListenerImpl.java | 35 ++--- .../manager/FlowCapableTopologyExporter.java | 54 ++------ .../manager/FlowCapableTopologyProvider.java | 34 ++--- .../manager/NodeChangeListenerImpl.java | 12 +- .../topology/manager/OperationProcessor.java | 63 ++------- .../TerminationPointChangeListenerImpl.java | 45 +++---- .../topology/manager/TopologyManagerUtil.java | 26 ++-- .../topology/manager/TopologyOperation.java | 5 +- .../FlowCapableTopologyProviderTest.java | 126 ------------------ .../manager/OperationProcessorTest.java | 64 --------- openflowplugin-common/pom.xml | 4 + .../txchain}/TransactionChainManager.java | 115 ++++++++++------ openflowplugin-impl/pom.xml | 4 + .../impl/device/DeviceContextImpl.java | 9 +- .../impl/device/DeviceContextImplTest.java | 49 +++---- .../device/TransactionChainManagerTest.java | 71 ++++------ 16 files changed, 228 insertions(+), 488 deletions(-) delete mode 100644 applications/topology-manager/src/test/java/org/opendaylight/openflowplugin/applications/topology/manager/FlowCapableTopologyProviderTest.java delete mode 100644 applications/topology-manager/src/test/java/org/opendaylight/openflowplugin/applications/topology/manager/OperationProcessorTest.java rename {openflowplugin-impl/src/main/java/org/opendaylight/openflowplugin/impl/device => openflowplugin-common/src/main/java/org/opendaylight/openflowplugin/common/txchain}/TransactionChainManager.java (73%) diff --git a/applications/topology-manager/src/main/java/org/opendaylight/openflowplugin/applications/topology/manager/DataTreeChangeListenerImpl.java b/applications/topology-manager/src/main/java/org/opendaylight/openflowplugin/applications/topology/manager/DataTreeChangeListenerImpl.java index b295bcb849..362857368d 100644 --- a/applications/topology-manager/src/main/java/org/opendaylight/openflowplugin/applications/topology/manager/DataTreeChangeListenerImpl.java +++ b/applications/topology-manager/src/main/java/org/opendaylight/openflowplugin/applications/topology/manager/DataTreeChangeListenerImpl.java @@ -1,4 +1,4 @@ -/** +/* * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved. * * This program and the accompanying materials are made available under the @@ -7,11 +7,9 @@ */ package org.opendaylight.openflowplugin.applications.topology.manager; -import java.util.concurrent.Callable; import org.opendaylight.controller.md.sal.binding.api.DataBroker; import org.opendaylight.controller.md.sal.binding.api.DataTreeChangeListener; import org.opendaylight.controller.md.sal.binding.api.DataTreeIdentifier; -import org.opendaylight.controller.md.sal.binding.api.ReadWriteTransaction; import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType; import org.opendaylight.openflowplugin.common.wait.SimpleTaskRetryLooper; import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node; @@ -38,23 +36,19 @@ public abstract class DataTreeChangeListenerImpl implement /** * instance identifier to Node in network topology model (yangtools) */ - protected static final InstanceIdentifier II_TO_TOPOLOGY = + static final InstanceIdentifier II_TO_TOPOLOGY = InstanceIdentifier .create(NetworkTopology.class) .child(Topology.class, new TopologyKey(new TopologyId(FlowCapableTopologyProvider.TOPOLOGY_ID))); - public DataTreeChangeListenerImpl(final OperationProcessor operationProcessor, - final DataBroker dataBroker, - final InstanceIdentifier ii) { + DataTreeChangeListenerImpl(final OperationProcessor operationProcessor, + final DataBroker dataBroker, + final InstanceIdentifier ii) { final DataTreeIdentifier identifier = new DataTreeIdentifier(LogicalDatastoreType.OPERATIONAL, ii); final SimpleTaskRetryLooper looper = new SimpleTaskRetryLooper(STARTUP_LOOP_TICK, STARTUP_LOOP_MAX_RETRIES); try { - listenerRegistration = looper.loopUntilNoException(new Callable>() { - @Override - public ListenerRegistration call() throws Exception { - return dataBroker.registerDataTreeChangeListener(identifier, DataTreeChangeListenerImpl.this); - } - }); + listenerRegistration = looper.loopUntilNoException(() -> + dataBroker.registerDataTreeChangeListener(identifier, DataTreeChangeListenerImpl.this)); } catch (Exception e) { LOG.error("Data listener registration failed!"); throw new IllegalStateException("TopologyManager startup fail! TM bundle needs restart.", e); @@ -67,16 +61,11 @@ public abstract class DataTreeChangeListenerImpl implement listenerRegistration.close(); } - protected void sendToTransactionChain(final T node, final InstanceIdentifier iiToTopologyNode) { - operationProcessor.enqueueOperation(new TopologyOperation() { - @Override - public void applyOperation(ReadWriteTransaction transaction) { - transaction.merge(LogicalDatastoreType.OPERATIONAL, iiToTopologyNode, node, true); - } - }); + void sendToTransactionChain(final T node, final InstanceIdentifier iiToTopologyNode) { + operationProcessor.enqueueOperation(manager -> manager.mergeToTransaction(LogicalDatastoreType.OPERATIONAL, iiToTopologyNode, node, true)); } - protected InstanceIdentifier provideIIToTopologyNode( + InstanceIdentifier provideIIToTopologyNode( final NodeId nodeIdInTopology) { org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.NodeKey nodeKeyInTopology = new org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.NodeKey( nodeIdInTopology); @@ -86,8 +75,8 @@ public abstract class DataTreeChangeListenerImpl implement nodeKeyInTopology).build(); } - protected NodeId provideTopologyNodeId(InstanceIdentifier iiToNodeInInventory) { - final NodeKey inventoryNodeKey = iiToNodeInInventory.firstKeyOf(Node.class, NodeKey.class); + NodeId provideTopologyNodeId(InstanceIdentifier iiToNodeInInventory) { + final NodeKey inventoryNodeKey = iiToNodeInInventory.firstKeyOf(Node.class); if (inventoryNodeKey != null) { return new NodeId(inventoryNodeKey.getId().getValue()); } diff --git a/applications/topology-manager/src/main/java/org/opendaylight/openflowplugin/applications/topology/manager/FlowCapableTopologyExporter.java b/applications/topology-manager/src/main/java/org/opendaylight/openflowplugin/applications/topology/manager/FlowCapableTopologyExporter.java index cb7d973c18..ce0f872fd1 100644 --- a/applications/topology-manager/src/main/java/org/opendaylight/openflowplugin/applications/topology/manager/FlowCapableTopologyExporter.java +++ b/applications/topology-manager/src/main/java/org/opendaylight/openflowplugin/applications/topology/manager/FlowCapableTopologyExporter.java @@ -7,33 +7,20 @@ */ package org.opendaylight.openflowplugin.applications.topology.manager; -import static org.opendaylight.openflowplugin.applications.topology.manager.FlowCapableNodeMapping.getNodeConnectorKey; -import static org.opendaylight.openflowplugin.applications.topology.manager.FlowCapableNodeMapping.getNodeKey; -import static org.opendaylight.openflowplugin.applications.topology.manager.FlowCapableNodeMapping.toTerminationPointId; import static org.opendaylight.openflowplugin.applications.topology.manager.FlowCapableNodeMapping.toTopologyLink; -import static org.opendaylight.openflowplugin.applications.topology.manager.FlowCapableNodeMapping.toTopologyNodeId; import com.google.common.base.Optional; import com.google.common.base.Preconditions; -import org.opendaylight.controller.md.sal.binding.api.ReadWriteTransaction; import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType; import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException; +import org.opendaylight.openflowplugin.common.txchain.TransactionChainManager; import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.topology.discovery.rev130819.FlowTopologyDiscoveryListener; import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.topology.discovery.rev130819.LinkDiscovered; import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.topology.discovery.rev130819.LinkOverutilized; import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.topology.discovery.rev130819.LinkRemoved; import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.topology.discovery.rev130819.LinkUtilizationNormal; -import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeConnectorRef; -import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeRef; -import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.node.NodeConnectorKey; -import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.NodeId; -import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.TpId; import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.Topology; import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.Link; -import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.Node; -import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.NodeKey; -import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.node.TerminationPoint; -import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.node.TerminationPointKey; import org.opendaylight.yangtools.yang.binding.InstanceIdentifier; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -41,7 +28,7 @@ import org.slf4j.LoggerFactory; class FlowCapableTopologyExporter implements FlowTopologyDiscoveryListener { private static final Logger LOG = LoggerFactory.getLogger(FlowCapableTopologyExporter.class); - protected final InstanceIdentifier iiToTopology; + private final InstanceIdentifier iiToTopology; private final OperationProcessor processor; FlowCapableTopologyExporter(final OperationProcessor processor, @@ -54,10 +41,10 @@ class FlowCapableTopologyExporter implements FlowTopologyDiscoveryListener { public void onLinkDiscovered(final LinkDiscovered notification) { processor.enqueueOperation(new TopologyOperation() { @Override - public void applyOperation(final ReadWriteTransaction transaction) { + public void applyOperation(final TransactionChainManager manager) { final Link link = toTopologyLink(notification); final InstanceIdentifier path = TopologyManagerUtil.linkPath(link, iiToTopology); - transaction.merge(LogicalDatastoreType.OPERATIONAL, path, link, true); + manager.mergeToTransaction(LogicalDatastoreType.OPERATIONAL, path, link, true); } @Override @@ -76,18 +63,19 @@ class FlowCapableTopologyExporter implements FlowTopologyDiscoveryListener { public void onLinkRemoved(final LinkRemoved notification) { processor.enqueueOperation(new TopologyOperation() { @Override - public void applyOperation(final ReadWriteTransaction transaction) { + public void applyOperation(final TransactionChainManager manager) { Optional linkOptional = Optional.absent(); try { // read that checks if link exists (if we do not do this we might get an exception on delete) - linkOptional = transaction.read(LogicalDatastoreType.OPERATIONAL, + linkOptional = manager.readFromTransaction(LogicalDatastoreType.OPERATIONAL, TopologyManagerUtil.linkPath(toTopologyLink(notification), iiToTopology)).checkedGet(); } catch (ReadFailedException e) { - LOG.warn("Error occured when trying to read Link: {}", e.getMessage()); - LOG.debug("Error occured when trying to read Link.. ", e); + LOG.warn("Error occurred when trying to read Link: {}", e.getMessage()); + LOG.debug("Error occurred when trying to read Link.. ", e); } if (linkOptional.isPresent()) { - transaction.delete(LogicalDatastoreType.OPERATIONAL, TopologyManagerUtil.linkPath(toTopologyLink(notification), iiToTopology)); + manager.addDeleteOperationTotTxChain(LogicalDatastoreType.OPERATIONAL, + TopologyManagerUtil.linkPath(toTopologyLink(notification), iiToTopology)); } } @@ -103,26 +91,4 @@ class FlowCapableTopologyExporter implements FlowTopologyDiscoveryListener { // NOOP } - private InstanceIdentifier toNodeIdentifier(final NodeRef ref) { - org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.NodeKey invNodeKey = getNodeKey(ref); - NodeKey nodeKey = new NodeKey(toTopologyNodeId(invNodeKey.getId())); - return iiToTopology.child(Node.class, nodeKey); - } - - private InstanceIdentifier toTerminationPointIdentifier(final NodeConnectorRef ref) { - org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.NodeKey invNodeKey = getNodeKey(ref); - NodeConnectorKey invNodeConnectorKey = getNodeConnectorKey(ref); - return tpPath(toTopologyNodeId(invNodeKey.getId()), toTerminationPointId(invNodeConnectorKey.getId())); - } - - private InstanceIdentifier getNodePath(final NodeId nodeId) { - return iiToTopology.child(Node.class, new NodeKey(nodeId)); - } - - private InstanceIdentifier tpPath(final NodeId nodeId, final TpId tpId) { - NodeKey nodeKey = new NodeKey(nodeId); - TerminationPointKey tpKey = new TerminationPointKey(tpId); - return iiToTopology.child(Node.class, nodeKey).child(TerminationPoint.class, tpKey); - } - } diff --git a/applications/topology-manager/src/main/java/org/opendaylight/openflowplugin/applications/topology/manager/FlowCapableTopologyProvider.java b/applications/topology-manager/src/main/java/org/opendaylight/openflowplugin/applications/topology/manager/FlowCapableTopologyProvider.java index f49d0f52c6..83942007b8 100644 --- a/applications/topology-manager/src/main/java/org/opendaylight/openflowplugin/applications/topology/manager/FlowCapableTopologyProvider.java +++ b/applications/topology-manager/src/main/java/org/opendaylight/openflowplugin/applications/topology/manager/FlowCapableTopologyProvider.java @@ -8,13 +8,11 @@ package org.opendaylight.openflowplugin.applications.topology.manager; import com.google.common.base.Optional; -import java.util.concurrent.ExecutionException; import org.opendaylight.controller.md.sal.binding.api.DataBroker; -import org.opendaylight.controller.md.sal.binding.api.ReadTransaction; -import org.opendaylight.controller.md.sal.binding.api.ReadWriteTransaction; import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType; import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException; import org.opendaylight.controller.sal.binding.api.NotificationProviderService; +import org.opendaylight.openflowplugin.common.txchain.TransactionChainManager; import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.NetworkTopology; import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.TopologyId; import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.Topology; @@ -28,11 +26,14 @@ import org.slf4j.LoggerFactory; public class FlowCapableTopologyProvider implements AutoCloseable { private static final Logger LOG = LoggerFactory.getLogger(FlowCapableTopologyProvider.class); + private static final String TOPOLOGY_PROVIDER = "topology-provider"; static final String TOPOLOGY_ID = "flow:1"; + private final DataBroker dataBroker; private final NotificationProviderService notificationService; private final OperationProcessor processor; + private TransactionChainManager transactionChainManager; private ListenerRegistration listenerRegistration; public FlowCapableTopologyProvider(DataBroker dataBroker, NotificationProviderService notificationService, @@ -53,15 +54,17 @@ public class FlowCapableTopologyProvider implements AutoCloseable { final FlowCapableTopologyExporter listener = new FlowCapableTopologyExporter(processor, path); this.listenerRegistration = notificationService.registerNotificationListener(listener); + this.transactionChainManager = new TransactionChainManager(dataBroker, TOPOLOGY_PROVIDER); + this.transactionChainManager.activateTransactionManager(); + this.transactionChainManager.initialSubmitWriteTransaction(); - if(!isFlowTopologyExist(dataBroker, path)){ - final ReadWriteTransaction tx = dataBroker.newReadWriteTransaction(); - tx.put(LogicalDatastoreType.OPERATIONAL, path, new TopologyBuilder().setKey(key).build(), true); - try { - tx.submit().get(); - } catch (InterruptedException | ExecutionException e) { - LOG.warn("Initial topology export failed, continuing anyway", e); - } + if(!isFlowTopologyExist(path)){ + transactionChainManager.writeToTransaction( + LogicalDatastoreType.OPERATIONAL, + path, + new TopologyBuilder().setKey(key).build(), + true); + transactionChainManager.submitTransaction(); } LOG.info("FlowCapableTopologyProvider started"); @@ -70,6 +73,7 @@ public class FlowCapableTopologyProvider implements AutoCloseable { @Override public void close() { LOG.info("FlowCapableTopologyProvider stopped."); + this.transactionChainManager.close(); if (this.listenerRegistration != null) { try { this.listenerRegistration.close(); @@ -81,11 +85,11 @@ public class FlowCapableTopologyProvider implements AutoCloseable { } } - private boolean isFlowTopologyExist(final DataBroker dataBroker, - final InstanceIdentifier path) { - final ReadTransaction tx = dataBroker.newReadOnlyTransaction(); + private boolean isFlowTopologyExist(final InstanceIdentifier path) { try { - Optional ofTopology = tx.read(LogicalDatastoreType.OPERATIONAL, path).checkedGet(); + Optional ofTopology = this.transactionChainManager + .readFromTransaction(LogicalDatastoreType.OPERATIONAL, path) + .checkedGet(); LOG.debug("OpenFlow topology exist in the operational data store at {}",path); if(ofTopology.isPresent()){ return true; diff --git a/applications/topology-manager/src/main/java/org/opendaylight/openflowplugin/applications/topology/manager/NodeChangeListenerImpl.java b/applications/topology-manager/src/main/java/org/opendaylight/openflowplugin/applications/topology/manager/NodeChangeListenerImpl.java index e9f2683d0e..a2809dfc69 100644 --- a/applications/topology-manager/src/main/java/org/opendaylight/openflowplugin/applications/topology/manager/NodeChangeListenerImpl.java +++ b/applications/topology-manager/src/main/java/org/opendaylight/openflowplugin/applications/topology/manager/NodeChangeListenerImpl.java @@ -1,4 +1,4 @@ -/** +/* * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved. * * This program and the accompanying materials are made available under the @@ -11,7 +11,6 @@ import java.util.Collection; import javax.annotation.Nonnull; import org.opendaylight.controller.md.sal.binding.api.DataBroker; import org.opendaylight.controller.md.sal.binding.api.DataTreeModification; -import org.opendaylight.controller.md.sal.binding.api.ReadWriteTransaction; import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType; import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableNode; import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeRef; @@ -59,12 +58,9 @@ public class NodeChangeListenerImpl extends DataTreeChangeListenerImpl iiToTopologyRemovedNode = provideIIToTopologyNode(nodeId); if (iiToTopologyRemovedNode != null) { - operationProcessor.enqueueOperation(new TopologyOperation() { - @Override - public void applyOperation(final ReadWriteTransaction transaction) { - transaction.delete(LogicalDatastoreType.OPERATIONAL, iiToTopologyRemovedNode); - TopologyManagerUtil.removeAffectedLinks(nodeId, transaction, II_TO_TOPOLOGY); - } + operationProcessor.enqueueOperation(manager -> { + manager.addDeleteOperationTotTxChain(LogicalDatastoreType.OPERATIONAL, iiToTopologyRemovedNode); + TopologyManagerUtil.removeAffectedLinks(nodeId, manager, II_TO_TOPOLOGY); }); } else { LOG.debug("Instance identifier to inventory wasn't translated to topology while deleting node."); diff --git a/applications/topology-manager/src/main/java/org/opendaylight/openflowplugin/applications/topology/manager/OperationProcessor.java b/applications/topology-manager/src/main/java/org/opendaylight/openflowplugin/applications/topology/manager/OperationProcessor.java index bbfae52b84..4d9b5019f9 100644 --- a/applications/topology-manager/src/main/java/org/opendaylight/openflowplugin/applications/topology/manager/OperationProcessor.java +++ b/applications/topology-manager/src/main/java/org/opendaylight/openflowplugin/applications/topology/manager/OperationProcessor.java @@ -7,33 +7,28 @@ */ package org.opendaylight.openflowplugin.applications.topology.manager; -import com.google.common.base.Preconditions; import java.util.concurrent.BlockingQueue; import java.util.concurrent.LinkedBlockingQueue; -import org.opendaylight.controller.md.sal.binding.api.BindingTransactionChain; import org.opendaylight.controller.md.sal.binding.api.DataBroker; -import org.opendaylight.controller.md.sal.binding.api.ReadWriteTransaction; -import org.opendaylight.controller.md.sal.common.api.data.AsyncTransaction; -import org.opendaylight.controller.md.sal.common.api.data.TransactionChain; -import org.opendaylight.controller.md.sal.common.api.data.TransactionChainListener; -import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException; +import org.opendaylight.openflowplugin.common.txchain.TransactionChainManager; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -public final class OperationProcessor implements AutoCloseable, Runnable, TransactionChainListener { +public final class OperationProcessor implements AutoCloseable, Runnable { private static final Logger LOG = LoggerFactory.getLogger(OperationProcessor.class); private static final int MAX_TRANSACTION_OPERATIONS = 100; private static final int OPERATION_QUEUE_DEPTH = 500; + private static final String TOPOLOGY_MANAGER = "topology-manager"; private final BlockingQueue queue = new LinkedBlockingQueue<>(OPERATION_QUEUE_DEPTH); - private final DataBroker dataBroker; private final Thread thread; - private BindingTransactionChain transactionChain; + private TransactionChainManager transactionChainManager; private volatile boolean finishing = false; public OperationProcessor(final DataBroker dataBroker) { - this.dataBroker = Preconditions.checkNotNull(dataBroker); - transactionChain = this.dataBroker.createTransactionChain(this); + transactionChainManager = new TransactionChainManager(dataBroker, TOPOLOGY_MANAGER); + transactionChainManager.activateTransactionManager(); + transactionChainManager.initialSubmitWriteTransaction(); thread = new Thread(this); thread.setDaemon(true); @@ -60,11 +55,9 @@ public final class OperationProcessor implements AutoCloseable, Runnable, Transa LOG.debug("New {} operation available, starting transaction", op); - final ReadWriteTransaction tx = transactionChain.newReadWriteTransaction(); - int ops = 0; do { - op.applyOperation(tx); + op.applyOperation(transactionChainManager); ops++; if (ops < MAX_TRANSACTION_OPERATIONS) { @@ -77,55 +70,25 @@ public final class OperationProcessor implements AutoCloseable, Runnable, Transa } while (op != null); LOG.debug("Processed {} operations, submitting transaction", ops); - submitTransaction(tx); - } catch (final IllegalStateException e) { - LOG.warn("Stat DataStoreOperation unexpected State!", e); - transactionChain.close(); - transactionChain = dataBroker.createTransactionChain(this); - cleanDataStoreOperQueue(); + if (!transactionChainManager.submitTransaction()) { + cleanDataStoreOperQueue(); + } } catch (final InterruptedException e) { // This should mean we're shutting down. LOG.debug("Stat Manager DS Operation thread interrupted!", e); finishing = true; - } catch (final Exception e) { - LOG.warn("Stat DataStore Operation executor fail!", e); } } // Drain all events, making sure any blocked threads are unblocked cleanDataStoreOperQueue(); } - private void submitTransaction(ReadWriteTransaction tx) { - try { - tx.submit().checkedGet(); - } catch (final TransactionCommitFailedException e) { - LOG.warn("Stat DataStoreOperation unexpected State!", e); - transactionChain.close(); - transactionChain = dataBroker.createTransactionChain(this); - cleanDataStoreOperQueue(); - } - } - private void cleanDataStoreOperQueue() { while (!queue.isEmpty()) { queue.poll(); } } - @Override - public void onTransactionChainFailed(TransactionChain chain, AsyncTransaction transaction, Throwable cause) { - LOG.warn("Failed to export Topology manager operations, Transaction {} failed: {}", transaction.getIdentifier(), cause.getMessage()); - LOG.debug("Failed to export Topology manager operations.. ", cause); - transactionChain.close(); - transactionChain = dataBroker.createTransactionChain(this); - cleanDataStoreOperQueue(); - } - - @Override - public void onTransactionChainSuccessful(TransactionChain chain) { - //NOOP - } - @Override public void close() { thread.interrupt(); @@ -135,9 +98,7 @@ public final class OperationProcessor implements AutoCloseable, Runnable, Transa LOG.debug("Join of thread {} was interrupted", thread.getName(), e); } - if (transactionChain != null) { - transactionChain.close(); - } + transactionChainManager.close(); LOG.debug("OperationProcessor closed"); } diff --git a/applications/topology-manager/src/main/java/org/opendaylight/openflowplugin/applications/topology/manager/TerminationPointChangeListenerImpl.java b/applications/topology-manager/src/main/java/org/opendaylight/openflowplugin/applications/topology/manager/TerminationPointChangeListenerImpl.java index c2c129b125..a572b439e2 100644 --- a/applications/topology-manager/src/main/java/org/opendaylight/openflowplugin/applications/topology/manager/TerminationPointChangeListenerImpl.java +++ b/applications/topology-manager/src/main/java/org/opendaylight/openflowplugin/applications/topology/manager/TerminationPointChangeListenerImpl.java @@ -1,4 +1,4 @@ -/** +/* * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved. * * This program and the accompanying materials are made available under the @@ -7,12 +7,11 @@ */ package org.opendaylight.openflowplugin.applications.topology.manager; -import com.google.common.base.Optional; import java.util.Collection; +import java.util.Optional; import javax.annotation.Nonnull; import org.opendaylight.controller.md.sal.binding.api.DataBroker; import org.opendaylight.controller.md.sal.binding.api.DataTreeModification; -import org.opendaylight.controller.md.sal.binding.api.ReadWriteTransaction; import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType; import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException; import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableNodeConnector; @@ -69,20 +68,18 @@ public class TerminationPointChangeListenerImpl extends DataTreeChangeListenerIm if (iiToTopologyTerminationPoint != null) { final InstanceIdentifier node = iiToTopologyTerminationPoint.firstIdentifierOf(org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.Node.class); - operationProcessor.enqueueOperation(new TopologyOperation() { - @Override - public void applyOperation(final ReadWriteTransaction transaction) { - Optional nodeOptional = Optional.absent(); - try { - nodeOptional = transaction.read(LogicalDatastoreType.OPERATIONAL, node).checkedGet(); - } catch (ReadFailedException e) { - LOG.warn("Error occured when trying to read NodeConnector: {}", e.getMessage()); - LOG.debug("Error occured when trying to read NodeConnector.. ", e); - } - if (nodeOptional.isPresent()) { - TopologyManagerUtil.removeAffectedLinks(terminationPointId, transaction, II_TO_TOPOLOGY); - transaction.delete(LogicalDatastoreType.OPERATIONAL, iiToTopologyTerminationPoint); - } + operationProcessor.enqueueOperation(manager -> { + Optional nodeOptional = Optional.empty(); + try { + nodeOptional = Optional.ofNullable( + manager.readFromTransaction(LogicalDatastoreType.OPERATIONAL, node).checkedGet().orNull()); + } catch (ReadFailedException e) { + LOG.warn("Error occurred when trying to read NodeConnector: {}", e.getMessage()); + LOG.debug("Error occurred when trying to read NodeConnector.. ", e); + } + if (nodeOptional.isPresent()) { + TopologyManagerUtil.removeAffectedLinks(terminationPointId, manager, II_TO_TOPOLOGY); + manager.addDeleteOperationTotTxChain(LogicalDatastoreType.OPERATIONAL, iiToTopologyTerminationPoint); } }); } else { @@ -109,13 +106,10 @@ public class TerminationPointChangeListenerImpl extends DataTreeChangeListenerIm } private void removeLinks(final FlowCapableNodeConnector flowCapNodeConnector, final TerminationPoint point) { - operationProcessor.enqueueOperation(new TopologyOperation() { - @Override - public void applyOperation(final ReadWriteTransaction transaction) { - if ((flowCapNodeConnector.getState() != null && flowCapNodeConnector.getState().isLinkDown()) - || (flowCapNodeConnector.getConfiguration() != null && flowCapNodeConnector.getConfiguration().isPORTDOWN())) { - TopologyManagerUtil.removeAffectedLinks(point.getTpId(), transaction, II_TO_TOPOLOGY); - } + operationProcessor.enqueueOperation(manager -> { + if ((flowCapNodeConnector.getState() != null && flowCapNodeConnector.getState().isLinkDown()) + || (flowCapNodeConnector.getConfiguration() != null && flowCapNodeConnector.getConfiguration().isPORTDOWN())) { + TopologyManagerUtil.removeAffectedLinks(point.getTpId(), manager, II_TO_TOPOLOGY); } }); } @@ -143,8 +137,7 @@ public class TerminationPointChangeListenerImpl extends DataTreeChangeListenerIm } private static TpId provideTopologyTerminationPointId(final InstanceIdentifier iiToNodeInInventory) { - NodeConnectorKey inventoryNodeConnectorKey = iiToNodeInInventory.firstKeyOf(NodeConnector.class, - NodeConnectorKey.class); + NodeConnectorKey inventoryNodeConnectorKey = iiToNodeInInventory.firstKeyOf(NodeConnector.class); if (inventoryNodeConnectorKey != null) { return new TpId(inventoryNodeConnectorKey.getId().getValue()); } diff --git a/applications/topology-manager/src/main/java/org/opendaylight/openflowplugin/applications/topology/manager/TopologyManagerUtil.java b/applications/topology-manager/src/main/java/org/opendaylight/openflowplugin/applications/topology/manager/TopologyManagerUtil.java index 96dd36e582..b3dfcb278f 100644 --- a/applications/topology-manager/src/main/java/org/opendaylight/openflowplugin/applications/topology/manager/TopologyManagerUtil.java +++ b/applications/topology-manager/src/main/java/org/opendaylight/openflowplugin/applications/topology/manager/TopologyManagerUtil.java @@ -10,9 +10,9 @@ package org.opendaylight.openflowplugin.applications.topology.manager; import com.google.common.base.Optional; import java.util.Collections; import java.util.List; -import org.opendaylight.controller.md.sal.binding.api.ReadWriteTransaction; import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType; import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException; +import org.opendaylight.openflowplugin.common.txchain.TransactionChainManager; import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.NodeId; import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.TpId; import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.Topology; @@ -21,54 +21,54 @@ import org.opendaylight.yangtools.yang.binding.InstanceIdentifier; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -public class TopologyManagerUtil { +class TopologyManagerUtil { private static final Logger LOG = LoggerFactory.getLogger(TopologyManagerUtil.class); private TopologyManagerUtil() {} - static void removeAffectedLinks(final NodeId id, final ReadWriteTransaction transaction, InstanceIdentifier topology) { + static void removeAffectedLinks(final NodeId id, final TransactionChainManager manager, InstanceIdentifier topology) { Optional topologyOptional = Optional.absent(); try { - topologyOptional = transaction.read(LogicalDatastoreType.OPERATIONAL, topology).checkedGet(); + topologyOptional = manager.readFromTransaction(LogicalDatastoreType.OPERATIONAL, topology).checkedGet(); } catch (ReadFailedException e) { LOG.warn("Error reading topology data for topology {}: {}", topology, e.getMessage()); LOG.debug("Error reading topology data for topology.. ", e); } if (topologyOptional.isPresent()) { - removeAffectedLinks(id, topologyOptional, transaction, topology); + removeAffectedLinks(id, topologyOptional, manager, topology); } } - static void removeAffectedLinks(final NodeId id, Optional topologyOptional, ReadWriteTransaction transaction, final InstanceIdentifier topology) { + private static void removeAffectedLinks(final NodeId id, Optional topologyOptional, TransactionChainManager manager, final InstanceIdentifier topology) { if (!topologyOptional.isPresent()) { return; } List linkList = topologyOptional.get().getLink() != null ? - topologyOptional.get().getLink() : Collections. emptyList(); + topologyOptional.get().getLink() : Collections.emptyList(); for (Link link : linkList) { if (id.equals(link.getSource().getSourceNode()) || id.equals(link.getDestination().getDestNode())) { - transaction.delete(LogicalDatastoreType.OPERATIONAL, linkPath(link, topology)); + manager.addDeleteOperationTotTxChain(LogicalDatastoreType.OPERATIONAL, linkPath(link, topology)); } } } - static void removeAffectedLinks(final TpId id, final ReadWriteTransaction transaction, final InstanceIdentifier topology) { + static void removeAffectedLinks(final TpId id, final TransactionChainManager manager, final InstanceIdentifier topology) { Optional topologyOptional = Optional.absent(); try { - topologyOptional = transaction.read(LogicalDatastoreType.OPERATIONAL, topology).checkedGet(); + topologyOptional = manager.readFromTransaction(LogicalDatastoreType.OPERATIONAL, topology).checkedGet(); } catch (ReadFailedException e) { LOG.warn("Error reading topology data for topology {}: {}", topology, e.getMessage()); LOG.debug("Error reading topology data for topology..", e); } if (topologyOptional.isPresent()) { - removeAffectedLinks(id, topologyOptional, transaction, topology); + removeAffectedLinks(id, topologyOptional, manager, topology); } } - static void removeAffectedLinks(final TpId id, Optional topologyOptional, ReadWriteTransaction transaction, final InstanceIdentifier topology) { + private static void removeAffectedLinks(final TpId id, Optional topologyOptional, TransactionChainManager manager, final InstanceIdentifier topology) { if (!topologyOptional.isPresent()) { return; } @@ -78,7 +78,7 @@ public class TopologyManagerUtil { for (Link link : linkList) { if (id.equals(link.getSource().getSourceTp()) || id.equals(link.getDestination().getDestTp())) { - transaction.delete(LogicalDatastoreType.OPERATIONAL, linkPath(link, topology)); + manager.addDeleteOperationTotTxChain(LogicalDatastoreType.OPERATIONAL, linkPath(link, topology)); } } } diff --git a/applications/topology-manager/src/main/java/org/opendaylight/openflowplugin/applications/topology/manager/TopologyOperation.java b/applications/topology-manager/src/main/java/org/opendaylight/openflowplugin/applications/topology/manager/TopologyOperation.java index a49ec9bd95..82f3471ad1 100644 --- a/applications/topology-manager/src/main/java/org/opendaylight/openflowplugin/applications/topology/manager/TopologyOperation.java +++ b/applications/topology-manager/src/main/java/org/opendaylight/openflowplugin/applications/topology/manager/TopologyOperation.java @@ -8,6 +8,7 @@ package org.opendaylight.openflowplugin.applications.topology.manager; import org.opendaylight.controller.md.sal.binding.api.ReadWriteTransaction; +import org.opendaylight.openflowplugin.common.txchain.TransactionChainManager; /** * Internal interface for submitted operations. Implementations of this @@ -17,7 +18,7 @@ interface TopologyOperation { /** * Execute the operation on top of the transaction. * - * @param transaction Datastore transaction + * @param manager Datastore transaction manager */ - void applyOperation(ReadWriteTransaction transaction); + void applyOperation(TransactionChainManager manager); } \ No newline at end of file diff --git a/applications/topology-manager/src/test/java/org/opendaylight/openflowplugin/applications/topology/manager/FlowCapableTopologyProviderTest.java b/applications/topology-manager/src/test/java/org/opendaylight/openflowplugin/applications/topology/manager/FlowCapableTopologyProviderTest.java deleted file mode 100644 index 164516e272..0000000000 --- a/applications/topology-manager/src/test/java/org/opendaylight/openflowplugin/applications/topology/manager/FlowCapableTopologyProviderTest.java +++ /dev/null @@ -1,126 +0,0 @@ -/** - * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved. - * - * This program and the accompanying materials are made available under the - * terms of the Eclipse Public License v1.0 which accompanies this distribution, - * and is available at http://www.eclipse.org/legal/epl-v10.html - */ - -package org.opendaylight.openflowplugin.applications.topology.manager; - -import static org.mockito.Mockito.doReturn; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.verify; -import static org.mockito.Mockito.when; - -import com.google.common.base.Optional; -import com.google.common.collect.ClassToInstanceMap; -import com.google.common.util.concurrent.Futures; -import org.junit.Before; -import org.junit.Test; -import org.junit.runner.RunWith; -import org.mockito.Matchers; -import org.mockito.Mock; -import org.mockito.invocation.InvocationOnMock; -import org.mockito.runners.MockitoJUnitRunner; -import org.mockito.stubbing.Answer; -import org.opendaylight.controller.md.sal.binding.api.DataBroker; -import org.opendaylight.controller.md.sal.binding.api.ReadOnlyTransaction; -import org.opendaylight.controller.md.sal.binding.api.ReadWriteTransaction; -import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType; -import org.opendaylight.controller.sal.binding.api.BindingAwareBroker; -import org.opendaylight.controller.sal.binding.api.BindingAwareProvider; -import org.opendaylight.controller.sal.binding.api.BindingAwareService; -import org.opendaylight.controller.sal.binding.api.NotificationProviderService; -import org.opendaylight.controller.sal.binding.api.NotificationProviderService.NotificationInterestListener; -import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node; -import org.opendaylight.yangtools.concepts.ListenerRegistration; -import org.opendaylight.yangtools.yang.binding.DataObject; -import org.opendaylight.yangtools.yang.binding.InstanceIdentifier; -import org.opendaylight.yangtools.yang.binding.NotificationListener; - -/** - * Test for {@link FlowCapableTopologyProvider}. - */ -@RunWith(MockitoJUnitRunner.class) -public class FlowCapableTopologyProviderTest { - - private FlowCapableTopologyProvider provider; - - @Mock - private BindingAwareBroker.ProviderContext providerContext; - @Mock - private DataBroker dataBroker; - @Mock - private NotificationProviderService notificationProviderService; - @Mock - private BindingAwareProvider bindingAwareProvider; - @Mock - private ClassToInstanceMap serviceProvider; - @Mock - private BindingAwareService bindingAwareService; - @Mock - private Node mockNode; - @Mock - private ReadOnlyTransaction rTx; - @Mock - private ReadWriteTransaction wTx; - - @Before - public void setUp() throws Exception { - when(providerContext.getSALService(Matchers.>any())) - .thenAnswer(new Answer() { - @Override - public BindingAwareService answer(InvocationOnMock invocation) throws Throwable { - Object[] arguments = invocation.getArguments(); - if (arguments != null && arguments.length > 0 && arguments[0] != null) { - if(arguments[0].equals(DataBroker.class)) { - return dataBroker; - } else if(arguments[0].equals(NotificationProviderService.class)){ - return notificationProviderService; - } - } - return null; - } - }); - - doReturn(rTx).when(dataBroker).newReadOnlyTransaction(); - doReturn(wTx).when(dataBroker).newReadWriteTransaction(); - - when(wTx.submit()).thenReturn(Futures.immediateCheckedFuture(null)); - - OperationProcessor operationProcessor = new OperationProcessor(dataBroker); - provider = new FlowCapableTopologyProvider(dataBroker, notificationProviderService, operationProcessor); - } - - @Test - public void testRun() throws Exception { - when(rTx.read(Matchers.any(), Matchers.>any())) - .thenReturn(Futures.immediateCheckedFuture(Optional.of(mockNode))); - provider.start(); - verify(rTx).read(Matchers.any(), Matchers.>any()); - } - - @Test - public void testRunWithoutTopology() throws Exception { - when(rTx.read(Matchers.any(), Matchers.>any())) - .thenReturn(Futures.immediateCheckedFuture(Optional.absent())); - provider.start(); - verify(wTx).submit(); - } - - @Test - public void testClose() throws Exception { - when(rTx.read(Matchers.any(), Matchers.>any())) - .thenReturn(Futures.immediateCheckedFuture(Optional.of(mockNode))); - - final ListenerRegistration listenerRegistration = mock(ListenerRegistration.class); - doReturn(listenerRegistration).when(notificationProviderService).registerNotificationListener(Matchers.any()); - - provider.start(); - provider.close(); - - verify(listenerRegistration).close(); - } - -} \ No newline at end of file diff --git a/applications/topology-manager/src/test/java/org/opendaylight/openflowplugin/applications/topology/manager/OperationProcessorTest.java b/applications/topology-manager/src/test/java/org/opendaylight/openflowplugin/applications/topology/manager/OperationProcessorTest.java deleted file mode 100644 index a1eb97979b..0000000000 --- a/applications/topology-manager/src/test/java/org/opendaylight/openflowplugin/applications/topology/manager/OperationProcessorTest.java +++ /dev/null @@ -1,64 +0,0 @@ -/* - * Copyright (c) 2016 Pantheon Technologies s.r.o. and others. All rights reserved. - * - * This program and the accompanying materials are made available under the - * terms of the Eclipse Public License v1.0 which accompanies this distribution, - * and is available at http://www.eclipse.org/legal/epl-v10.html - */ -package org.opendaylight.openflowplugin.applications.topology.manager; - -import static org.mockito.Mockito.times; - -import org.junit.Before; -import org.junit.Test; -import org.junit.runner.RunWith; -import org.mockito.Matchers; -import org.mockito.Mock; -import org.mockito.Mockito; -import org.mockito.runners.MockitoJUnitRunner; -import org.opendaylight.controller.md.sal.binding.api.BindingTransactionChain; -import org.opendaylight.controller.md.sal.binding.api.DataBroker; -import org.opendaylight.controller.md.sal.common.api.data.AsyncTransaction; -import org.opendaylight.controller.md.sal.common.api.data.TransactionChainListener; - -@RunWith(MockitoJUnitRunner.class) -public class OperationProcessorTest { - - - OperationProcessor processor; - - @Mock - DataBroker dataBroker; - @Mock - BindingTransactionChain transactionChain; - @Mock - TransactionChainListener transactionChainListener; - @Mock - AsyncTransaction asyncTransaction; - @Mock - Throwable throwable; - - @Before - public void setUp() { - Mockito.when(dataBroker.createTransactionChain(Matchers.any(OperationProcessor.class))) - .thenReturn(transactionChain); - processor = new OperationProcessor(dataBroker); - } - - @Test - public void onTransactionChainFailedTest() { - processor.onTransactionChainFailed(transactionChain, asyncTransaction, throwable); - Mockito.verify(transactionChain).close(); - //dataBroker.createTransactionChain is called 2 time - // (first time in constructor, second time after old chain has been closed) - Mockito.verify(dataBroker, times(2)).createTransactionChain(Matchers.any(OperationProcessor.class)); - } - - @Test - public void closeTest() { - processor.close(); - Mockito.verify(transactionChain).close(); - } - - -} diff --git a/openflowplugin-common/pom.xml b/openflowplugin-common/pom.xml index fbebb60f7c..18f70cb6b5 100644 --- a/openflowplugin-common/pom.xml +++ b/openflowplugin-common/pom.xml @@ -36,6 +36,10 @@ org.slf4j slf4j-api + + org.opendaylight.controller + sal-binding-api + junit diff --git a/openflowplugin-impl/src/main/java/org/opendaylight/openflowplugin/impl/device/TransactionChainManager.java b/openflowplugin-common/src/main/java/org/opendaylight/openflowplugin/common/txchain/TransactionChainManager.java similarity index 73% rename from openflowplugin-impl/src/main/java/org/opendaylight/openflowplugin/impl/device/TransactionChainManager.java rename to openflowplugin-common/src/main/java/org/opendaylight/openflowplugin/common/txchain/TransactionChainManager.java index e06fbd2f29..4a2051cfe2 100644 --- a/openflowplugin-impl/src/main/java/org/opendaylight/openflowplugin/impl/device/TransactionChainManager.java +++ b/openflowplugin-common/src/main/java/org/opendaylight/openflowplugin/common/txchain/TransactionChainManager.java @@ -6,10 +6,10 @@ * and is available at http://www.eclipse.org/legal/epl-v10.html */ -package org.opendaylight.openflowplugin.impl.device; +package org.opendaylight.openflowplugin.common.txchain; -import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; +import com.google.common.util.concurrent.CheckedFuture; import com.google.common.util.concurrent.FutureCallback; import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; @@ -24,14 +24,15 @@ import javax.annotation.Nonnull; import javax.annotation.concurrent.GuardedBy; import org.opendaylight.controller.md.sal.binding.api.BindingTransactionChain; import org.opendaylight.controller.md.sal.binding.api.DataBroker; +import org.opendaylight.controller.md.sal.binding.api.ReadWriteTransaction; import org.opendaylight.controller.md.sal.binding.api.WriteTransaction; import org.opendaylight.controller.md.sal.common.api.data.AsyncTransaction; import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType; +import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException; import org.opendaylight.controller.md.sal.common.api.data.TransactionChain; import org.opendaylight.controller.md.sal.common.api.data.TransactionChainClosedException; import org.opendaylight.controller.md.sal.common.api.data.TransactionChainListener; import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException; -import org.opendaylight.openflowplugin.api.openflow.device.DeviceInfo; import org.opendaylight.yangtools.yang.binding.DataObject; import org.opendaylight.yangtools.yang.binding.InstanceIdentifier; import org.slf4j.Logger; @@ -44,7 +45,7 @@ import org.slf4j.LoggerFactory; * method (wrapped {@link WriteTransaction#put(LogicalDatastoreType, InstanceIdentifier, DataObject)}) * and submitTransaction method (wrapped {@link WriteTransaction#submit()}). */ -class TransactionChainManager implements TransactionChainListener, AutoCloseable { +public class TransactionChainManager implements TransactionChainListener, AutoCloseable { private static final Logger LOG = LoggerFactory.getLogger(TransactionChainManager.class); private static final String CANNOT_WRITE_INTO_TRANSACTION = "Cannot write into transaction."; @@ -54,9 +55,9 @@ class TransactionChainManager implements TransactionChainListener, AutoCloseable private final String nodeId; @GuardedBy("txLock") - private WriteTransaction writeTx; + private ReadWriteTransaction writeTx; @GuardedBy("txLock") - private BindingTransactionChain txChainFactory; + private BindingTransactionChain transactionChain; @GuardedBy("txLock") private boolean submitIsEnabled; @GuardedBy("txLock") @@ -67,23 +68,23 @@ class TransactionChainManager implements TransactionChainListener, AutoCloseable @GuardedBy("txLock") private TransactionChainManagerStatus transactionChainManagerStatus = TransactionChainManagerStatus.SLEEPING; - TransactionChainManager(@Nonnull final DataBroker dataBroker, - @Nonnull final DeviceInfo deviceInfo) { + public TransactionChainManager(@Nonnull final DataBroker dataBroker, + @Nonnull final String deviceIdentifier) { this.dataBroker = dataBroker; - this.nodeId = deviceInfo.toString(); + this.nodeId = deviceIdentifier; this.lastSubmittedFuture = Futures.immediateFuture(null); } @GuardedBy("txLock") private void createTxChain() { - BindingTransactionChain txChainFactoryTemp = txChainFactory; - txChainFactory = dataBroker.createTransactionChain(TransactionChainManager.this); + BindingTransactionChain txChainFactoryTemp = transactionChain; + transactionChain = dataBroker.createTransactionChain(TransactionChainManager.this); Optional.ofNullable(txChainFactoryTemp).ifPresent(TransactionChain::close); } - boolean initialSubmitWriteTransaction() { + public boolean initialSubmitWriteTransaction() { enableSubmit(); - return submitWriteTransaction(); + return submitTransaction(); } /** @@ -91,14 +92,14 @@ class TransactionChainManager implements TransactionChainListener, AutoCloseable * registration for this class instance as {@link TransactionChainListener} to provide possibility a make DS * transactions. Call this method for MASTER role only. */ - void activateTransactionManager() { + public void activateTransactionManager() { if (LOG.isDebugEnabled()) { LOG.debug("activateTransactionManager for node {} transaction submit is set to {}", this.nodeId, submitIsEnabled); } synchronized (txLock) { if (TransactionChainManagerStatus.SLEEPING == transactionChainManagerStatus) { - Preconditions.checkState(txChainFactory == null, + Preconditions.checkState(transactionChain == null, "TxChainFactory survive last close."); Preconditions.checkState(writeTx == null, "We have some unexpected WriteTransaction."); @@ -116,7 +117,7 @@ class TransactionChainManager implements TransactionChainListener, AutoCloseable * Call this method for SLAVE only. * @return Future */ - ListenableFuture deactivateTransactionManager() { + public ListenableFuture deactivateTransactionManager() { if (LOG.isDebugEnabled()) { LOG.debug("deactivateTransactionManager for node {}", this.nodeId); } @@ -130,12 +131,12 @@ class TransactionChainManager implements TransactionChainListener, AutoCloseable Futures.addCallback(future, new FutureCallback() { @Override public void onSuccess(final Void result) { - removeTxChainFactory(); + closeTransactionChain(); } @Override - public void onFailure(final Throwable throwable) { - removeTxChainFactory(); + public void onFailure(@Nonnull final Throwable t) { + closeTransactionChain(); } }); } else { @@ -146,12 +147,16 @@ class TransactionChainManager implements TransactionChainListener, AutoCloseable return future; } - private void removeTxChainFactory() { - Optional.ofNullable(txChainFactory).ifPresent(TransactionChain::close); - txChainFactory = null; + private void closeTransactionChain() { + if (writeTx != null) { + writeTx.cancel(); + writeTx = null; + } + Optional.ofNullable(transactionChain).ifPresent(TransactionChain::close); + transactionChain = null; } - boolean submitWriteTransaction() { + public boolean submitTransaction() { synchronized (txLock) { if (!submitIsEnabled) { if (LOG.isTraceEnabled()) { @@ -207,8 +212,8 @@ class TransactionChainManager implements TransactionChainListener, AutoCloseable return true; } - void addDeleteOperationTotTxChain(final LogicalDatastoreType store, - final InstanceIdentifier path) { + public void addDeleteOperationTotTxChain(final LogicalDatastoreType store, + final InstanceIdentifier path){ synchronized (txLock) { ensureTransaction(); if (writeTx == null) { @@ -220,10 +225,10 @@ class TransactionChainManager implements TransactionChainListener, AutoCloseable } } - void writeToTransaction(final LogicalDatastoreType store, - final InstanceIdentifier path, - final T data, - final boolean createParents) { + public void writeToTransaction(final LogicalDatastoreType store, + final InstanceIdentifier path, + final T data, + final boolean createParents){ synchronized (txLock) { ensureTransaction(); if (writeTx == null) { @@ -235,12 +240,43 @@ class TransactionChainManager implements TransactionChainListener, AutoCloseable } } + public void mergeToTransaction(final LogicalDatastoreType store, + final InstanceIdentifier path, + final T data, + final boolean createParents){ + synchronized (txLock) { + ensureTransaction(); + if (writeTx == null) { + LOG.debug("WriteTx is null for node {}. Merge data for {} was not realized.", this.nodeId, path); + throw new TransactionChainClosedException(CANNOT_WRITE_INTO_TRANSACTION); + } + + writeTx.merge(store, path, data, createParents); + } + } + + public CheckedFuture, ReadFailedException> + readFromTransaction(final LogicalDatastoreType store, + final InstanceIdentifier path){ + synchronized (txLock) { + ensureTransaction(); + if (writeTx == null) { + LOG.debug("WriteTx is null for node {}. Read data for {} was not realized.", this.nodeId, path); + throw new TransactionChainClosedException(CANNOT_WRITE_INTO_TRANSACTION); + } + + return writeTx.read(store, path); + } + } + @Override public void onTransactionChainFailed(final TransactionChain chain, final AsyncTransaction transaction, final Throwable cause) { synchronized (txLock) { - if (TransactionChainManagerStatus.WORKING == transactionChainManagerStatus) { + if (TransactionChainManagerStatus.WORKING == transactionChainManagerStatus && + chain.equals(this.transactionChain)) { LOG.warn("Transaction chain failed, recreating chain due to ", cause); + closeTransactionChain(); createTxChain(); writeTx = null; } @@ -253,22 +289,21 @@ class TransactionChainManager implements TransactionChainListener, AutoCloseable } @GuardedBy("txLock") - private void ensureTransaction() { + private void ensureTransaction() { if (writeTx == null && TransactionChainManagerStatus.WORKING == transactionChainManagerStatus - && txChainFactory != null) { - writeTx = txChainFactory.newWriteOnlyTransaction(); + && transactionChain != null) { + writeTx = transactionChain.newReadWriteTransaction(); } } - @VisibleForTesting - void enableSubmit() { + private void enableSubmit() { synchronized (txLock) { - /* !!!IMPORTANT: never set true without txChainFactory */ - submitIsEnabled = txChainFactory != null; + /* !!!IMPORTANT: never set true without transactionChain */ + submitIsEnabled = transactionChain != null; } } - ListenableFuture shuttingDown() { + public ListenableFuture shuttingDown() { if (LOG.isDebugEnabled()) { LOG.debug("TxManager is going SHUTTING_DOWN for node {}", this.nodeId); } @@ -284,7 +319,7 @@ class TransactionChainManager implements TransactionChainListener, AutoCloseable submitIsEnabled = false; ListenableFuture future; - if (!wasSubmitEnabled || txChainFactory == null) { + if (!wasSubmitEnabled || transactionChain == null) { // stay with actual thread future = Futures.immediateCheckedFuture(null); @@ -313,7 +348,7 @@ class TransactionChainManager implements TransactionChainListener, AutoCloseable LOG.debug("Setting transactionChainManagerStatus to SHUTTING_DOWN for {}", this.nodeId); } synchronized (txLock) { - removeTxChainFactory(); + closeTransactionChain(); } } diff --git a/openflowplugin-impl/pom.xml b/openflowplugin-impl/pom.xml index a0dab34d27..0c78606992 100644 --- a/openflowplugin-impl/pom.xml +++ b/openflowplugin-impl/pom.xml @@ -122,6 +122,10 @@ org.osgi org.osgi.compendium + + org.opendaylight.openflowplugin + openflowplugin-common + diff --git a/openflowplugin-impl/src/main/java/org/opendaylight/openflowplugin/impl/device/DeviceContextImpl.java b/openflowplugin-impl/src/main/java/org/opendaylight/openflowplugin/impl/device/DeviceContextImpl.java index 0630f8bae4..a01f2cb936 100644 --- a/openflowplugin-impl/src/main/java/org/opendaylight/openflowplugin/impl/device/DeviceContextImpl.java +++ b/openflowplugin-impl/src/main/java/org/opendaylight/openflowplugin/impl/device/DeviceContextImpl.java @@ -56,6 +56,7 @@ import org.opendaylight.openflowplugin.api.openflow.registry.meter.DeviceMeterRe import org.opendaylight.openflowplugin.api.openflow.rpc.ItemLifeCycleKeeper; import org.opendaylight.openflowplugin.api.openflow.rpc.listener.ItemLifecycleListener; import org.opendaylight.openflowplugin.api.openflow.statistics.ofpspecific.MessageSpy; +import org.opendaylight.openflowplugin.common.txchain.TransactionChainManager; import org.opendaylight.openflowplugin.extension.api.ConvertorMessageFromOFJava; import org.opendaylight.openflowplugin.extension.api.ExtensionConverterProviderKeeper; import org.opendaylight.openflowplugin.extension.api.core.extension.ExtensionConverterProvider; @@ -262,7 +263,7 @@ public class DeviceContextImpl implements DeviceContext, ExtensionConverterProvi @Override public boolean submitTransaction() { - return initialized.get() && transactionChainManager.submitWriteTransaction(); + return initialized.get() && transactionChainManager.submitTransaction(); } @Override @@ -682,9 +683,9 @@ public class DeviceContextImpl implements DeviceContext, ExtensionConverterProvi if (LOG.isDebugEnabled()) { LOG.debug("Transaction chain manager for node {} created", deviceInfo); } - this.transactionChainManager = new TransactionChainManager(dataBroker, deviceInfo); - this.deviceFlowRegistry = new DeviceFlowRegistryImpl(deviceInfo.getVersion(), dataBroker, deviceInfo - .getNodeInstanceIdentifier()); + this.transactionChainManager = new TransactionChainManager(dataBroker, deviceInfo.getNodeId().getValue()); + this.deviceFlowRegistry = new DeviceFlowRegistryImpl(deviceInfo.getVersion(), dataBroker, + deviceInfo.getNodeInstanceIdentifier()); this.deviceGroupRegistry = new DeviceGroupRegistryImpl(); this.deviceMeterRegistry = new DeviceMeterRegistryImpl(); } diff --git a/openflowplugin-impl/src/test/java/org/opendaylight/openflowplugin/impl/device/DeviceContextImplTest.java b/openflowplugin-impl/src/test/java/org/opendaylight/openflowplugin/impl/device/DeviceContextImplTest.java index 1c15675ee2..60265c5d53 100644 --- a/openflowplugin-impl/src/test/java/org/opendaylight/openflowplugin/impl/device/DeviceContextImplTest.java +++ b/openflowplugin-impl/src/test/java/org/opendaylight/openflowplugin/impl/device/DeviceContextImplTest.java @@ -38,7 +38,7 @@ import org.opendaylight.controller.md.sal.binding.api.DataBroker; import org.opendaylight.controller.md.sal.binding.api.NotificationPublishService; import org.opendaylight.controller.md.sal.binding.api.ReadOnlyTransaction; import org.opendaylight.controller.md.sal.binding.api.ReadTransaction; -import org.opendaylight.controller.md.sal.binding.api.WriteTransaction; +import org.opendaylight.controller.md.sal.binding.api.ReadWriteTransaction; import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType; import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException; import org.opendaylight.openflowjava.protocol.api.connection.ConnectionAdapter; @@ -61,6 +61,7 @@ import org.opendaylight.openflowplugin.api.openflow.registry.meter.DeviceMeterRe import org.opendaylight.openflowplugin.api.openflow.rpc.ItemLifeCycleSource; import org.opendaylight.openflowplugin.api.openflow.rpc.listener.ItemLifecycleListener; import org.opendaylight.openflowplugin.api.openflow.statistics.ofpspecific.MessageSpy; +import org.opendaylight.openflowplugin.common.txchain.TransactionChainManager; import org.opendaylight.openflowplugin.extension.api.ConvertorMessageFromOFJava; import org.opendaylight.openflowplugin.extension.api.core.extension.ExtensionConverterProvider; import org.opendaylight.openflowplugin.impl.device.initialization.AbstractDeviceInitializer; @@ -120,40 +121,40 @@ public class DeviceContextImplTest { private static final Long DUMMY_XID = 544L; private static final Long DUMMY_PORT_NUMBER = 159L; private static final BigInteger DUMMY_DATAPATH_ID = new BigInteger("55"); - Xid xid; - Xid xidMulti; + private Xid xid; + private Xid xidMulti; - DeviceContext deviceContext; + private DeviceContext deviceContext; @Mock - RequestContext requestContext; + private RequestContext requestContext; @Mock - RequestContext requestContextMultiReply; + private RequestContext requestContextMultiReply; @Mock - ConnectionContext connectionContext; + private ConnectionContext connectionContext; @Mock - GetFeaturesOutput featuresOutput; + private GetFeaturesOutput featuresOutput; @Mock - DataBroker dataBroker; + private DataBroker dataBroker; @Mock - WriteTransaction writeTx; + private ReadWriteTransaction writeTx; @Mock - ReadOnlyTransaction readTx; + private ReadOnlyTransaction readTx; @Mock - BindingTransactionChain txChainFactory; + private BindingTransactionChain txChainFactory; @Mock - HashedWheelTimer timer; + private HashedWheelTimer timer; @Mock - OutboundQueueProvider outboundQueueProvider; + private OutboundQueueProvider outboundQueueProvider; @Mock - ConnectionAdapter connectionAdapter; - NodeId nodeId = new NodeId("h2g2:42"); - KeyedInstanceIdentifier nodeKeyIdent = DeviceStateUtil.createNodeInstanceIdentifier(nodeId); + private ConnectionAdapter connectionAdapter; + private NodeId nodeId = new NodeId("h2g2:42"); + private KeyedInstanceIdentifier nodeKeyIdent = DeviceStateUtil.createNodeInstanceIdentifier(nodeId); @Mock - TranslatorLibrary translatorLibrary; + private TranslatorLibrary translatorLibrary; @Mock MessageTranslator messageTranslatorPacketReceived; @Mock - MessageTranslator messageTranslatorFlowCapableNodeConnector; + private MessageTranslator messageTranslatorFlowCapableNodeConnector; @Mock private MessageTranslator messageTranslatorFlowRemoved; @Mock @@ -197,7 +198,7 @@ public class DeviceContextImplTest { settableFutureMultiReply.set((RpcResult) invocation.getArguments()[0]); return null; }).when(requestContextMultiReply).setResult(any(RpcResult.class)); - Mockito.when(txChainFactory.newWriteOnlyTransaction()).thenReturn(writeTx); + Mockito.when(txChainFactory.newReadWriteTransaction()).thenReturn(writeTx); Mockito.when(dataBroker.newReadOnlyTransaction()).thenReturn(readTx); Mockito.when(connectionContext.getOutboundQueueProvider()).thenReturn(outboundQueueProvider); Mockito.when(connectionContext.getConnectionAdapter()).thenReturn(connectionAdapter); @@ -269,7 +270,7 @@ public class DeviceContextImplTest { Mockito.when(writeTx.submit()).thenReturn(Futures.immediateCheckedFuture(null)); final InstanceIdentifier dummyII = InstanceIdentifier.create(Nodes.class); ((DeviceContextImpl) deviceContext).getTransactionChainManager().activateTransactionManager() ; - ((DeviceContextImpl) deviceContext).getTransactionChainManager().enableSubmit(); + ((DeviceContextImpl) deviceContext).getTransactionChainManager().initialSubmitWriteTransaction(); deviceContext.addDeleteToTxChain(LogicalDatastoreType.CONFIGURATION, dummyII); deviceContext.initialSubmitTransaction(); verify(writeTx).submit(); @@ -287,7 +288,7 @@ public class DeviceContextImplTest { public void testAddDeleteToTxChain() throws Exception { final InstanceIdentifier dummyII = InstanceIdentifier.create(Nodes.class); ((DeviceContextImpl) deviceContext).getTransactionChainManager().activateTransactionManager() ; - ((DeviceContextImpl) deviceContext).getTransactionChainManager().enableSubmit(); + ((DeviceContextImpl) deviceContext).getTransactionChainManager().initialSubmitWriteTransaction(); deviceContext.addDeleteToTxChain(LogicalDatastoreType.CONFIGURATION, dummyII); verify(writeTx).delete(eq(LogicalDatastoreType.CONFIGURATION), eq(dummyII)); } @@ -295,7 +296,7 @@ public class DeviceContextImplTest { @Test public void testSubmitTransaction() throws Exception { ((DeviceContextImpl) deviceContext).getTransactionChainManager().activateTransactionManager() ; - ((DeviceContextImpl) deviceContext).getTransactionChainManager().enableSubmit(); + ((DeviceContextImpl) deviceContext).getTransactionChainManager().initialSubmitWriteTransaction(); assertTrue(deviceContext.submitTransaction()); } @@ -489,4 +490,4 @@ public class DeviceContextImplTest { deviceContext.closeServiceInstance(); } -} \ No newline at end of file +} diff --git a/openflowplugin-impl/src/test/java/org/opendaylight/openflowplugin/impl/device/TransactionChainManagerTest.java b/openflowplugin-impl/src/test/java/org/opendaylight/openflowplugin/impl/device/TransactionChainManagerTest.java index 1abf811e9f..a117d06d16 100644 --- a/openflowplugin-impl/src/test/java/org/opendaylight/openflowplugin/impl/device/TransactionChainManagerTest.java +++ b/openflowplugin-impl/src/test/java/org/opendaylight/openflowplugin/impl/device/TransactionChainManagerTest.java @@ -11,7 +11,6 @@ package org.opendaylight.openflowplugin.impl.device; import com.google.common.base.Optional; import com.google.common.util.concurrent.CheckedFuture; import com.google.common.util.concurrent.Futures; -import io.netty.util.HashedWheelTimer; import org.junit.After; import org.junit.Before; import org.junit.Test; @@ -23,7 +22,7 @@ import org.mockito.runners.MockitoJUnitRunner; import org.opendaylight.controller.md.sal.binding.api.BindingTransactionChain; import org.opendaylight.controller.md.sal.binding.api.DataBroker; import org.opendaylight.controller.md.sal.binding.api.ReadOnlyTransaction; -import org.opendaylight.controller.md.sal.binding.api.WriteTransaction; +import org.opendaylight.controller.md.sal.binding.api.ReadWriteTransaction; import org.opendaylight.controller.md.sal.common.api.data.AsyncTransaction; import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType; import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException; @@ -31,14 +30,13 @@ import org.opendaylight.controller.md.sal.common.api.data.TransactionChain; import org.opendaylight.controller.md.sal.common.api.data.TransactionChainListener; import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException; import org.opendaylight.openflowplugin.api.openflow.device.DeviceInfo; -import org.opendaylight.openflowplugin.api.openflow.device.DeviceState; +import org.opendaylight.openflowplugin.common.txchain.TransactionChainManager; import org.opendaylight.openflowplugin.impl.util.DeviceStateUtil; import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeId; import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.Nodes; import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node; import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.NodeBuilder; import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.NodeKey; -import org.opendaylight.yangtools.concepts.Registration; import org.opendaylight.yangtools.yang.binding.InstanceIdentifier; import org.opendaylight.yangtools.yang.binding.KeyedInstanceIdentifier; @@ -50,16 +48,10 @@ public class TransactionChainManagerTest { @Mock private BindingTransactionChain txChain; @Mock - private WriteTransaction writeTx; + private ReadWriteTransaction writeTx; @Mock private TransactionChain transactionChain; @Mock - HashedWheelTimer timer; - @Mock - Registration registration; - @Mock - DeviceState deviceState; - @Mock DeviceInfo deviceInfo; @Mock @@ -82,8 +74,8 @@ public class TransactionChainManagerTest { nodeKeyIdent = DeviceStateUtil.createNodeInstanceIdentifier(nodeId); Mockito.when(deviceInfo.getNodeInstanceIdentifier()).thenReturn(nodeKeyIdent); Mockito.when(deviceInfo.getNodeId()).thenReturn(nodeId); - txChainManager = new TransactionChainManager(dataBroker, deviceInfo); - Mockito.when(txChain.newWriteOnlyTransaction()).thenReturn(writeTx); + txChainManager = new TransactionChainManager(dataBroker, nodeId.getValue()); + Mockito.when(txChain.newReadWriteTransaction()).thenReturn(writeTx); path = InstanceIdentifier.create(Nodes.class).child(Node.class, new NodeKey(nodeId)); Mockito.when(writeTx.submit()) @@ -101,70 +93,54 @@ public class TransactionChainManagerTest { final Node data = new NodeBuilder().setId(nodeId).build(); txChainManager.writeToTransaction(LogicalDatastoreType.CONFIGURATION, path, data, false); - Mockito.verify(txChain).newWriteOnlyTransaction(); + Mockito.verify(txChain).newReadWriteTransaction(); Mockito.verify(writeTx).put(LogicalDatastoreType.CONFIGURATION, path, data, false); } /** - * Test of {@link TransactionChainManager#submitWriteTransaction()}. + * test of {@link TransactionChainManager#submitTransaction()} + * @throws Exception */ @Test public void testSubmitTransaction() throws Exception { final Node data = new NodeBuilder().setId(nodeId).build(); txChainManager.initialSubmitWriteTransaction(); txChainManager.writeToTransaction(LogicalDatastoreType.CONFIGURATION, path, data, false); - txChainManager.submitWriteTransaction(); + txChainManager.submitTransaction(); - Mockito.verify(txChain).newWriteOnlyTransaction(); + Mockito.verify(txChain).newReadWriteTransaction(); Mockito.verify(writeTx).put(LogicalDatastoreType.CONFIGURATION, path, data, false); Mockito.verify(writeTx).submit(); } /** - * Test of {@link TransactionChainManager#submitWriteTransaction()}: no submit, never enabled. + * test of {@link TransactionChainManager#submitTransaction()}: no submit, never enabled + * @throws Exception */ @Test public void testSubmitTransaction1() throws Exception { final Node data = new NodeBuilder().setId(nodeId).build(); txChainManager.writeToTransaction(LogicalDatastoreType.CONFIGURATION, path, data, false); - txChainManager.submitWriteTransaction(); + txChainManager.submitTransaction(); - Mockito.verify(txChain).newWriteOnlyTransaction(); + Mockito.verify(txChain).newReadWriteTransaction(); Mockito.verify(writeTx).put(LogicalDatastoreType.CONFIGURATION, path, data, false); Mockito.verify(writeTx, Mockito.never()).submit(); } @Test public void testSubmitTransactionFailed() throws Exception { - Mockito.when(writeTx.submit()) - .thenReturn(Futures.immediateFailedCheckedFuture(new TransactionCommitFailedException("mock"))); + Mockito.when(writeTx.submit()).thenReturn(Futures.immediateFailedCheckedFuture(new TransactionCommitFailedException("mock"))); final Node data = new NodeBuilder().setId(nodeId).build(); txChainManager.initialSubmitWriteTransaction(); txChainManager.writeToTransaction(LogicalDatastoreType.CONFIGURATION, path, data, false); - txChainManager.submitWriteTransaction(); + txChainManager.submitTransaction(); - Mockito.verify(txChain).newWriteOnlyTransaction(); + Mockito.verify(txChain).newReadWriteTransaction(); Mockito.verify(writeTx).put(LogicalDatastoreType.CONFIGURATION, path, data, false); Mockito.verify(writeTx).submit(); } - @Test - public void testSubmitTransactionFailed2() throws Exception { - final Node data = new NodeBuilder().setId(nodeId).build(); - txChainManager.initialSubmitWriteTransaction(); - txChainManager.writeToTransaction(LogicalDatastoreType.CONFIGURATION, path, data, false); - txChainManager.submitWriteTransaction(); - - Mockito.when(writeTx.submit()) - .thenReturn(Futures.immediateFailedCheckedFuture(new TransactionCommitFailedException("mock"))); - txChainManager.writeToTransaction(LogicalDatastoreType.CONFIGURATION, path, data, false); - txChainManager.submitWriteTransaction(); - - Mockito.verify(txChain, Mockito.times(2)).newWriteOnlyTransaction(); - Mockito.verify(writeTx, Mockito.times(2)).put(LogicalDatastoreType.CONFIGURATION, path, data, false); - Mockito.verify(writeTx, Mockito.times(2)).submit(); - } - /** * Test of {@link TransactionChainManager#enableSubmit()}: no submit - counter is not active. */ @@ -174,15 +150,14 @@ public class TransactionChainManagerTest { txChainManager.writeToTransaction(LogicalDatastoreType.CONFIGURATION, path, data, false); txChainManager.writeToTransaction(LogicalDatastoreType.CONFIGURATION, path, data, false); - Mockito.verify(txChain).newWriteOnlyTransaction(); + Mockito.verify(txChain).newReadWriteTransaction(); Mockito.verify(writeTx, Mockito.times(2)).put(LogicalDatastoreType.CONFIGURATION, path, data, false); Mockito.verify(writeTx, Mockito.never()).submit(); } @Test public void testOnTransactionChainFailed() throws Exception { - txChainManager.onTransactionChainFailed(transactionChain, - Mockito.mock(AsyncTransaction.class), Mockito.mock(Throwable.class)); + txChainManager.onTransactionChainFailed(txChain, Mockito.mock(AsyncTransaction.class), Mockito.mock(Throwable.class)); Mockito.verify(txChain).close(); Mockito.verify(dataBroker, Mockito.times(2)).createTransactionChain(txChainManager); } @@ -198,7 +173,7 @@ public class TransactionChainManagerTest { public void testAddDeleteOperationTotTxChain() throws Exception { txChainManager.addDeleteOperationTotTxChain(LogicalDatastoreType.CONFIGURATION, path); - Mockito.verify(txChain).newWriteOnlyTransaction(); + Mockito.verify(txChain).newReadWriteTransaction(); Mockito.verify(writeTx).delete(LogicalDatastoreType.CONFIGURATION, path); } @@ -219,7 +194,7 @@ public class TransactionChainManagerTest { txChainManager.deactivateTransactionManager(); - Mockito.verify(txChain).newWriteOnlyTransaction(); + Mockito.verify(txChain).newReadWriteTransaction(); Mockito.verify(writeTx).put(LogicalDatastoreType.CONFIGURATION, path, data, false); Mockito.verify(writeTx, Mockito.never()).submit(); Mockito.verify(writeTx).cancel(); @@ -229,11 +204,11 @@ public class TransactionChainManagerTest { @Test public void testShuttingDown() throws Exception { final Node data = new NodeBuilder().setId(nodeId).build(); + txChainManager.initialSubmitWriteTransaction(); txChainManager.writeToTransaction(LogicalDatastoreType.CONFIGURATION, path, data, false); - txChainManager.enableSubmit(); txChainManager.shuttingDown(); - Mockito.verify(txChain).newWriteOnlyTransaction(); + Mockito.verify(txChain).newReadWriteTransaction(); Mockito.verify(writeTx).put(LogicalDatastoreType.CONFIGURATION, path, data, false); Mockito.verify(writeTx).submit(); } -- 2.36.6