-/**
+/*
* Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved.
*
* This program and the accompanying materials are made available under the
*/
package org.opendaylight.openflowplugin.applications.topology.manager;
-import java.util.concurrent.Callable;
import org.opendaylight.controller.md.sal.binding.api.DataBroker;
import org.opendaylight.controller.md.sal.binding.api.DataTreeChangeListener;
import org.opendaylight.controller.md.sal.binding.api.DataTreeIdentifier;
-import org.opendaylight.controller.md.sal.binding.api.ReadWriteTransaction;
import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
import org.opendaylight.openflowplugin.common.wait.SimpleTaskRetryLooper;
import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node;
/**
* instance identifier to Node in network topology model (yangtools)
*/
- protected static final InstanceIdentifier<Topology> II_TO_TOPOLOGY =
+ static final InstanceIdentifier<Topology> II_TO_TOPOLOGY =
InstanceIdentifier
.create(NetworkTopology.class)
.child(Topology.class, new TopologyKey(new TopologyId(FlowCapableTopologyProvider.TOPOLOGY_ID)));
- public DataTreeChangeListenerImpl(final OperationProcessor operationProcessor,
- final DataBroker dataBroker,
- final InstanceIdentifier<T> ii) {
+ DataTreeChangeListenerImpl(final OperationProcessor operationProcessor,
+ final DataBroker dataBroker,
+ final InstanceIdentifier<T> ii) {
final DataTreeIdentifier<T> identifier = new DataTreeIdentifier(LogicalDatastoreType.OPERATIONAL, ii);
final SimpleTaskRetryLooper looper = new SimpleTaskRetryLooper(STARTUP_LOOP_TICK, STARTUP_LOOP_MAX_RETRIES);
try {
- listenerRegistration = looper.loopUntilNoException(new Callable<ListenerRegistration<DataTreeChangeListener>>() {
- @Override
- public ListenerRegistration<DataTreeChangeListener> call() throws Exception {
- return dataBroker.registerDataTreeChangeListener(identifier, DataTreeChangeListenerImpl.this);
- }
- });
+ listenerRegistration = looper.loopUntilNoException(() ->
+ dataBroker.registerDataTreeChangeListener(identifier, DataTreeChangeListenerImpl.this));
} catch (Exception e) {
LOG.error("Data listener registration failed!");
throw new IllegalStateException("TopologyManager startup fail! TM bundle needs restart.", e);
listenerRegistration.close();
}
- protected <T extends DataObject> void sendToTransactionChain(final T node, final InstanceIdentifier<T> iiToTopologyNode) {
- operationProcessor.enqueueOperation(new TopologyOperation() {
- @Override
- public void applyOperation(ReadWriteTransaction transaction) {
- transaction.merge(LogicalDatastoreType.OPERATIONAL, iiToTopologyNode, node, true);
- }
- });
+ <T extends DataObject> void sendToTransactionChain(final T node, final InstanceIdentifier<T> iiToTopologyNode) {
+ operationProcessor.enqueueOperation(manager -> manager.mergeToTransaction(LogicalDatastoreType.OPERATIONAL, iiToTopologyNode, node, true));
}
- protected InstanceIdentifier<org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.Node> provideIIToTopologyNode(
+ InstanceIdentifier<org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.Node> provideIIToTopologyNode(
final NodeId nodeIdInTopology) {
org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.NodeKey nodeKeyInTopology = new org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.NodeKey(
nodeIdInTopology);
nodeKeyInTopology).build();
}
- protected NodeId provideTopologyNodeId(InstanceIdentifier<T> iiToNodeInInventory) {
- final NodeKey inventoryNodeKey = iiToNodeInInventory.firstKeyOf(Node.class, NodeKey.class);
+ NodeId provideTopologyNodeId(InstanceIdentifier<T> iiToNodeInInventory) {
+ final NodeKey inventoryNodeKey = iiToNodeInInventory.firstKeyOf(Node.class);
if (inventoryNodeKey != null) {
return new NodeId(inventoryNodeKey.getId().getValue());
}
*/
package org.opendaylight.openflowplugin.applications.topology.manager;
-import static org.opendaylight.openflowplugin.applications.topology.manager.FlowCapableNodeMapping.getNodeConnectorKey;
-import static org.opendaylight.openflowplugin.applications.topology.manager.FlowCapableNodeMapping.getNodeKey;
-import static org.opendaylight.openflowplugin.applications.topology.manager.FlowCapableNodeMapping.toTerminationPointId;
import static org.opendaylight.openflowplugin.applications.topology.manager.FlowCapableNodeMapping.toTopologyLink;
-import static org.opendaylight.openflowplugin.applications.topology.manager.FlowCapableNodeMapping.toTopologyNodeId;
import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
-import org.opendaylight.controller.md.sal.binding.api.ReadWriteTransaction;
import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
+import org.opendaylight.openflowplugin.common.txchain.TransactionChainManager;
import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.topology.discovery.rev130819.FlowTopologyDiscoveryListener;
import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.topology.discovery.rev130819.LinkDiscovered;
import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.topology.discovery.rev130819.LinkOverutilized;
import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.topology.discovery.rev130819.LinkRemoved;
import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.topology.discovery.rev130819.LinkUtilizationNormal;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeConnectorRef;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeRef;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.node.NodeConnectorKey;
-import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.NodeId;
-import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.TpId;
import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.Topology;
import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.Link;
-import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.Node;
-import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.NodeKey;
-import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.node.TerminationPoint;
-import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.node.TerminationPointKey;
import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
class FlowCapableTopologyExporter implements FlowTopologyDiscoveryListener {
private static final Logger LOG = LoggerFactory.getLogger(FlowCapableTopologyExporter.class);
- protected final InstanceIdentifier<Topology> iiToTopology;
+ private final InstanceIdentifier<Topology> iiToTopology;
private final OperationProcessor processor;
FlowCapableTopologyExporter(final OperationProcessor processor,
public void onLinkDiscovered(final LinkDiscovered notification) {
processor.enqueueOperation(new TopologyOperation() {
@Override
- public void applyOperation(final ReadWriteTransaction transaction) {
+ public void applyOperation(final TransactionChainManager manager) {
final Link link = toTopologyLink(notification);
final InstanceIdentifier<Link> path = TopologyManagerUtil.linkPath(link, iiToTopology);
- transaction.merge(LogicalDatastoreType.OPERATIONAL, path, link, true);
+ manager.mergeToTransaction(LogicalDatastoreType.OPERATIONAL, path, link, true);
}
@Override
public void onLinkRemoved(final LinkRemoved notification) {
processor.enqueueOperation(new TopologyOperation() {
@Override
- public void applyOperation(final ReadWriteTransaction transaction) {
+ public void applyOperation(final TransactionChainManager manager) {
Optional<Link> linkOptional = Optional.absent();
try {
// read that checks if link exists (if we do not do this we might get an exception on delete)
- linkOptional = transaction.read(LogicalDatastoreType.OPERATIONAL,
+ linkOptional = manager.readFromTransaction(LogicalDatastoreType.OPERATIONAL,
TopologyManagerUtil.linkPath(toTopologyLink(notification), iiToTopology)).checkedGet();
} catch (ReadFailedException e) {
- LOG.warn("Error occured when trying to read Link: {}", e.getMessage());
- LOG.debug("Error occured when trying to read Link.. ", e);
+ LOG.warn("Error occurred when trying to read Link: {}", e.getMessage());
+ LOG.debug("Error occurred when trying to read Link.. ", e);
}
if (linkOptional.isPresent()) {
- transaction.delete(LogicalDatastoreType.OPERATIONAL, TopologyManagerUtil.linkPath(toTopologyLink(notification), iiToTopology));
+ manager.addDeleteOperationTotTxChain(LogicalDatastoreType.OPERATIONAL,
+ TopologyManagerUtil.linkPath(toTopologyLink(notification), iiToTopology));
}
}
// NOOP
}
- private InstanceIdentifier<Node> toNodeIdentifier(final NodeRef ref) {
- org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.NodeKey invNodeKey = getNodeKey(ref);
- NodeKey nodeKey = new NodeKey(toTopologyNodeId(invNodeKey.getId()));
- return iiToTopology.child(Node.class, nodeKey);
- }
-
- private InstanceIdentifier<TerminationPoint> toTerminationPointIdentifier(final NodeConnectorRef ref) {
- org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.NodeKey invNodeKey = getNodeKey(ref);
- NodeConnectorKey invNodeConnectorKey = getNodeConnectorKey(ref);
- return tpPath(toTopologyNodeId(invNodeKey.getId()), toTerminationPointId(invNodeConnectorKey.getId()));
- }
-
- private InstanceIdentifier<Node> getNodePath(final NodeId nodeId) {
- return iiToTopology.child(Node.class, new NodeKey(nodeId));
- }
-
- private InstanceIdentifier<TerminationPoint> tpPath(final NodeId nodeId, final TpId tpId) {
- NodeKey nodeKey = new NodeKey(nodeId);
- TerminationPointKey tpKey = new TerminationPointKey(tpId);
- return iiToTopology.child(Node.class, nodeKey).child(TerminationPoint.class, tpKey);
- }
-
}
package org.opendaylight.openflowplugin.applications.topology.manager;
import com.google.common.base.Optional;
-import java.util.concurrent.ExecutionException;
import org.opendaylight.controller.md.sal.binding.api.DataBroker;
-import org.opendaylight.controller.md.sal.binding.api.ReadTransaction;
-import org.opendaylight.controller.md.sal.binding.api.ReadWriteTransaction;
import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
import org.opendaylight.controller.sal.binding.api.NotificationProviderService;
+import org.opendaylight.openflowplugin.common.txchain.TransactionChainManager;
import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.NetworkTopology;
import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.TopologyId;
import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.Topology;
public class FlowCapableTopologyProvider implements AutoCloseable {
private static final Logger LOG = LoggerFactory.getLogger(FlowCapableTopologyProvider.class);
+ private static final String TOPOLOGY_PROVIDER = "topology-provider";
static final String TOPOLOGY_ID = "flow:1";
+
private final DataBroker dataBroker;
private final NotificationProviderService notificationService;
private final OperationProcessor processor;
+ private TransactionChainManager transactionChainManager;
private ListenerRegistration<NotificationListener> listenerRegistration;
public FlowCapableTopologyProvider(DataBroker dataBroker, NotificationProviderService notificationService,
final FlowCapableTopologyExporter listener = new FlowCapableTopologyExporter(processor, path);
this.listenerRegistration = notificationService.registerNotificationListener(listener);
+ this.transactionChainManager = new TransactionChainManager(dataBroker, TOPOLOGY_PROVIDER);
+ this.transactionChainManager.activateTransactionManager();
+ this.transactionChainManager.initialSubmitWriteTransaction();
- if(!isFlowTopologyExist(dataBroker, path)){
- final ReadWriteTransaction tx = dataBroker.newReadWriteTransaction();
- tx.put(LogicalDatastoreType.OPERATIONAL, path, new TopologyBuilder().setKey(key).build(), true);
- try {
- tx.submit().get();
- } catch (InterruptedException | ExecutionException e) {
- LOG.warn("Initial topology export failed, continuing anyway", e);
- }
+ if(!isFlowTopologyExist(path)){
+ transactionChainManager.writeToTransaction(
+ LogicalDatastoreType.OPERATIONAL,
+ path,
+ new TopologyBuilder().setKey(key).build(),
+ true);
+ transactionChainManager.submitTransaction();
}
LOG.info("FlowCapableTopologyProvider started");
@Override
public void close() {
LOG.info("FlowCapableTopologyProvider stopped.");
+ this.transactionChainManager.close();
if (this.listenerRegistration != null) {
try {
this.listenerRegistration.close();
}
}
- private boolean isFlowTopologyExist(final DataBroker dataBroker,
- final InstanceIdentifier<Topology> path) {
- final ReadTransaction tx = dataBroker.newReadOnlyTransaction();
+ private boolean isFlowTopologyExist(final InstanceIdentifier<Topology> path) {
try {
- Optional<Topology> ofTopology = tx.read(LogicalDatastoreType.OPERATIONAL, path).checkedGet();
+ Optional<Topology> ofTopology = this.transactionChainManager
+ .readFromTransaction(LogicalDatastoreType.OPERATIONAL, path)
+ .checkedGet();
LOG.debug("OpenFlow topology exist in the operational data store at {}",path);
if(ofTopology.isPresent()){
return true;
-/**
+/*
* Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved.
*
* This program and the accompanying materials are made available under the
import javax.annotation.Nonnull;
import org.opendaylight.controller.md.sal.binding.api.DataBroker;
import org.opendaylight.controller.md.sal.binding.api.DataTreeModification;
-import org.opendaylight.controller.md.sal.binding.api.ReadWriteTransaction;
import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableNode;
import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeRef;
final NodeId nodeId = provideTopologyNodeId(iiToNodeInInventory);
final InstanceIdentifier<org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.Node> iiToTopologyRemovedNode = provideIIToTopologyNode(nodeId);
if (iiToTopologyRemovedNode != null) {
- operationProcessor.enqueueOperation(new TopologyOperation() {
- @Override
- public void applyOperation(final ReadWriteTransaction transaction) {
- transaction.delete(LogicalDatastoreType.OPERATIONAL, iiToTopologyRemovedNode);
- TopologyManagerUtil.removeAffectedLinks(nodeId, transaction, II_TO_TOPOLOGY);
- }
+ operationProcessor.enqueueOperation(manager -> {
+ manager.addDeleteOperationTotTxChain(LogicalDatastoreType.OPERATIONAL, iiToTopologyRemovedNode);
+ TopologyManagerUtil.removeAffectedLinks(nodeId, manager, II_TO_TOPOLOGY);
});
} else {
LOG.debug("Instance identifier to inventory wasn't translated to topology while deleting node.");
*/
package org.opendaylight.openflowplugin.applications.topology.manager;
-import com.google.common.base.Preconditions;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
-import org.opendaylight.controller.md.sal.binding.api.BindingTransactionChain;
import org.opendaylight.controller.md.sal.binding.api.DataBroker;
-import org.opendaylight.controller.md.sal.binding.api.ReadWriteTransaction;
-import org.opendaylight.controller.md.sal.common.api.data.AsyncTransaction;
-import org.opendaylight.controller.md.sal.common.api.data.TransactionChain;
-import org.opendaylight.controller.md.sal.common.api.data.TransactionChainListener;
-import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException;
+import org.opendaylight.openflowplugin.common.txchain.TransactionChainManager;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-public final class OperationProcessor implements AutoCloseable, Runnable, TransactionChainListener {
+public final class OperationProcessor implements AutoCloseable, Runnable {
private static final Logger LOG = LoggerFactory.getLogger(OperationProcessor.class);
private static final int MAX_TRANSACTION_OPERATIONS = 100;
private static final int OPERATION_QUEUE_DEPTH = 500;
+ private static final String TOPOLOGY_MANAGER = "topology-manager";
private final BlockingQueue<TopologyOperation> queue = new LinkedBlockingQueue<>(OPERATION_QUEUE_DEPTH);
- private final DataBroker dataBroker;
private final Thread thread;
- private BindingTransactionChain transactionChain;
+ private TransactionChainManager transactionChainManager;
private volatile boolean finishing = false;
public OperationProcessor(final DataBroker dataBroker) {
- this.dataBroker = Preconditions.checkNotNull(dataBroker);
- transactionChain = this.dataBroker.createTransactionChain(this);
+ transactionChainManager = new TransactionChainManager(dataBroker, TOPOLOGY_MANAGER);
+ transactionChainManager.activateTransactionManager();
+ transactionChainManager.initialSubmitWriteTransaction();
thread = new Thread(this);
thread.setDaemon(true);
LOG.debug("New {} operation available, starting transaction", op);
- final ReadWriteTransaction tx = transactionChain.newReadWriteTransaction();
-
int ops = 0;
do {
- op.applyOperation(tx);
+ op.applyOperation(transactionChainManager);
ops++;
if (ops < MAX_TRANSACTION_OPERATIONS) {
} while (op != null);
LOG.debug("Processed {} operations, submitting transaction", ops);
- submitTransaction(tx);
- } catch (final IllegalStateException e) {
- LOG.warn("Stat DataStoreOperation unexpected State!", e);
- transactionChain.close();
- transactionChain = dataBroker.createTransactionChain(this);
- cleanDataStoreOperQueue();
+ if (!transactionChainManager.submitTransaction()) {
+ cleanDataStoreOperQueue();
+ }
} catch (final InterruptedException e) {
// This should mean we're shutting down.
LOG.debug("Stat Manager DS Operation thread interrupted!", e);
finishing = true;
- } catch (final Exception e) {
- LOG.warn("Stat DataStore Operation executor fail!", e);
}
}
// Drain all events, making sure any blocked threads are unblocked
cleanDataStoreOperQueue();
}
- private void submitTransaction(ReadWriteTransaction tx) {
- try {
- tx.submit().checkedGet();
- } catch (final TransactionCommitFailedException e) {
- LOG.warn("Stat DataStoreOperation unexpected State!", e);
- transactionChain.close();
- transactionChain = dataBroker.createTransactionChain(this);
- cleanDataStoreOperQueue();
- }
- }
-
private void cleanDataStoreOperQueue() {
while (!queue.isEmpty()) {
queue.poll();
}
}
- @Override
- public void onTransactionChainFailed(TransactionChain<?, ?> chain, AsyncTransaction<?, ?> transaction, Throwable cause) {
- LOG.warn("Failed to export Topology manager operations, Transaction {} failed: {}", transaction.getIdentifier(), cause.getMessage());
- LOG.debug("Failed to export Topology manager operations.. ", cause);
- transactionChain.close();
- transactionChain = dataBroker.createTransactionChain(this);
- cleanDataStoreOperQueue();
- }
-
- @Override
- public void onTransactionChainSuccessful(TransactionChain<?, ?> chain) {
- //NOOP
- }
-
@Override
public void close() {
thread.interrupt();
LOG.debug("Join of thread {} was interrupted", thread.getName(), e);
}
- if (transactionChain != null) {
- transactionChain.close();
- }
+ transactionChainManager.close();
LOG.debug("OperationProcessor closed");
}
-/**
+/*
* Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved.
*
* This program and the accompanying materials are made available under the
*/
package org.opendaylight.openflowplugin.applications.topology.manager;
-import com.google.common.base.Optional;
import java.util.Collection;
+import java.util.Optional;
import javax.annotation.Nonnull;
import org.opendaylight.controller.md.sal.binding.api.DataBroker;
import org.opendaylight.controller.md.sal.binding.api.DataTreeModification;
-import org.opendaylight.controller.md.sal.binding.api.ReadWriteTransaction;
import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableNodeConnector;
if (iiToTopologyTerminationPoint != null) {
final InstanceIdentifier<org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.Node> node = iiToTopologyTerminationPoint.firstIdentifierOf(org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.Node.class);
- operationProcessor.enqueueOperation(new TopologyOperation() {
- @Override
- public void applyOperation(final ReadWriteTransaction transaction) {
- Optional<org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.Node> nodeOptional = Optional.absent();
- try {
- nodeOptional = transaction.read(LogicalDatastoreType.OPERATIONAL, node).checkedGet();
- } catch (ReadFailedException e) {
- LOG.warn("Error occured when trying to read NodeConnector: {}", e.getMessage());
- LOG.debug("Error occured when trying to read NodeConnector.. ", e);
- }
- if (nodeOptional.isPresent()) {
- TopologyManagerUtil.removeAffectedLinks(terminationPointId, transaction, II_TO_TOPOLOGY);
- transaction.delete(LogicalDatastoreType.OPERATIONAL, iiToTopologyTerminationPoint);
- }
+ operationProcessor.enqueueOperation(manager -> {
+ Optional<org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.Node> nodeOptional = Optional.empty();
+ try {
+ nodeOptional = Optional.ofNullable(
+ manager.readFromTransaction(LogicalDatastoreType.OPERATIONAL, node).checkedGet().orNull());
+ } catch (ReadFailedException e) {
+ LOG.warn("Error occurred when trying to read NodeConnector: {}", e.getMessage());
+ LOG.debug("Error occurred when trying to read NodeConnector.. ", e);
+ }
+ if (nodeOptional.isPresent()) {
+ TopologyManagerUtil.removeAffectedLinks(terminationPointId, manager, II_TO_TOPOLOGY);
+ manager.addDeleteOperationTotTxChain(LogicalDatastoreType.OPERATIONAL, iiToTopologyTerminationPoint);
}
});
} else {
}
private void removeLinks(final FlowCapableNodeConnector flowCapNodeConnector, final TerminationPoint point) {
- operationProcessor.enqueueOperation(new TopologyOperation() {
- @Override
- public void applyOperation(final ReadWriteTransaction transaction) {
- if ((flowCapNodeConnector.getState() != null && flowCapNodeConnector.getState().isLinkDown())
- || (flowCapNodeConnector.getConfiguration() != null && flowCapNodeConnector.getConfiguration().isPORTDOWN())) {
- TopologyManagerUtil.removeAffectedLinks(point.getTpId(), transaction, II_TO_TOPOLOGY);
- }
+ operationProcessor.enqueueOperation(manager -> {
+ if ((flowCapNodeConnector.getState() != null && flowCapNodeConnector.getState().isLinkDown())
+ || (flowCapNodeConnector.getConfiguration() != null && flowCapNodeConnector.getConfiguration().isPORTDOWN())) {
+ TopologyManagerUtil.removeAffectedLinks(point.getTpId(), manager, II_TO_TOPOLOGY);
}
});
}
}
private static TpId provideTopologyTerminationPointId(final InstanceIdentifier<FlowCapableNodeConnector> iiToNodeInInventory) {
- NodeConnectorKey inventoryNodeConnectorKey = iiToNodeInInventory.firstKeyOf(NodeConnector.class,
- NodeConnectorKey.class);
+ NodeConnectorKey inventoryNodeConnectorKey = iiToNodeInInventory.firstKeyOf(NodeConnector.class);
if (inventoryNodeConnectorKey != null) {
return new TpId(inventoryNodeConnectorKey.getId().getValue());
}
import com.google.common.base.Optional;
import java.util.Collections;
import java.util.List;
-import org.opendaylight.controller.md.sal.binding.api.ReadWriteTransaction;
import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
+import org.opendaylight.openflowplugin.common.txchain.TransactionChainManager;
import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.NodeId;
import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.TpId;
import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.Topology;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-public class TopologyManagerUtil {
+class TopologyManagerUtil {
private static final Logger LOG = LoggerFactory.getLogger(TopologyManagerUtil.class);
private TopologyManagerUtil() {}
- static void removeAffectedLinks(final NodeId id, final ReadWriteTransaction transaction, InstanceIdentifier<Topology> topology) {
+ static void removeAffectedLinks(final NodeId id, final TransactionChainManager manager, InstanceIdentifier<Topology> topology) {
Optional<Topology> topologyOptional = Optional.absent();
try {
- topologyOptional = transaction.read(LogicalDatastoreType.OPERATIONAL, topology).checkedGet();
+ topologyOptional = manager.readFromTransaction(LogicalDatastoreType.OPERATIONAL, topology).checkedGet();
} catch (ReadFailedException e) {
LOG.warn("Error reading topology data for topology {}: {}", topology, e.getMessage());
LOG.debug("Error reading topology data for topology.. ", e);
}
if (topologyOptional.isPresent()) {
- removeAffectedLinks(id, topologyOptional, transaction, topology);
+ removeAffectedLinks(id, topologyOptional, manager, topology);
}
}
- static void removeAffectedLinks(final NodeId id, Optional<Topology> topologyOptional, ReadWriteTransaction transaction, final InstanceIdentifier<Topology> topology) {
+ private static void removeAffectedLinks(final NodeId id, Optional<Topology> topologyOptional, TransactionChainManager manager, final InstanceIdentifier<Topology> topology) {
if (!topologyOptional.isPresent()) {
return;
}
List<Link> linkList = topologyOptional.get().getLink() != null ?
- topologyOptional.get().getLink() : Collections.<Link> emptyList();
+ topologyOptional.get().getLink() : Collections.emptyList();
for (Link link : linkList) {
if (id.equals(link.getSource().getSourceNode()) ||
id.equals(link.getDestination().getDestNode())) {
- transaction.delete(LogicalDatastoreType.OPERATIONAL, linkPath(link, topology));
+ manager.addDeleteOperationTotTxChain(LogicalDatastoreType.OPERATIONAL, linkPath(link, topology));
}
}
}
- static void removeAffectedLinks(final TpId id, final ReadWriteTransaction transaction, final InstanceIdentifier<Topology> topology) {
+ static void removeAffectedLinks(final TpId id, final TransactionChainManager manager, final InstanceIdentifier<Topology> topology) {
Optional<Topology> topologyOptional = Optional.absent();
try {
- topologyOptional = transaction.read(LogicalDatastoreType.OPERATIONAL, topology).checkedGet();
+ topologyOptional = manager.readFromTransaction(LogicalDatastoreType.OPERATIONAL, topology).checkedGet();
} catch (ReadFailedException e) {
LOG.warn("Error reading topology data for topology {}: {}", topology, e.getMessage());
LOG.debug("Error reading topology data for topology..", e);
}
if (topologyOptional.isPresent()) {
- removeAffectedLinks(id, topologyOptional, transaction, topology);
+ removeAffectedLinks(id, topologyOptional, manager, topology);
}
}
- static void removeAffectedLinks(final TpId id, Optional<Topology> topologyOptional, ReadWriteTransaction transaction, final InstanceIdentifier<Topology> topology) {
+ private static void removeAffectedLinks(final TpId id, Optional<Topology> topologyOptional, TransactionChainManager manager, final InstanceIdentifier<Topology> topology) {
if (!topologyOptional.isPresent()) {
return;
}
for (Link link : linkList) {
if (id.equals(link.getSource().getSourceTp()) ||
id.equals(link.getDestination().getDestTp())) {
- transaction.delete(LogicalDatastoreType.OPERATIONAL, linkPath(link, topology));
+ manager.addDeleteOperationTotTxChain(LogicalDatastoreType.OPERATIONAL, linkPath(link, topology));
}
}
}
package org.opendaylight.openflowplugin.applications.topology.manager;
import org.opendaylight.controller.md.sal.binding.api.ReadWriteTransaction;
+import org.opendaylight.openflowplugin.common.txchain.TransactionChainManager;
/**
* Internal interface for submitted operations. Implementations of this
/**
* Execute the operation on top of the transaction.
*
- * @param transaction Datastore transaction
+ * @param manager Datastore transaction manager
*/
- void applyOperation(ReadWriteTransaction transaction);
+ void applyOperation(TransactionChainManager manager);
}
\ No newline at end of file
+++ /dev/null
-/**
- * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.
- *
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,
- * and is available at http://www.eclipse.org/legal/epl-v10.html
- */
-
-package org.opendaylight.openflowplugin.applications.topology.manager;
-
-import static org.mockito.Mockito.doReturn;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.when;
-
-import com.google.common.base.Optional;
-import com.google.common.collect.ClassToInstanceMap;
-import com.google.common.util.concurrent.Futures;
-import org.junit.Before;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.mockito.Matchers;
-import org.mockito.Mock;
-import org.mockito.invocation.InvocationOnMock;
-import org.mockito.runners.MockitoJUnitRunner;
-import org.mockito.stubbing.Answer;
-import org.opendaylight.controller.md.sal.binding.api.DataBroker;
-import org.opendaylight.controller.md.sal.binding.api.ReadOnlyTransaction;
-import org.opendaylight.controller.md.sal.binding.api.ReadWriteTransaction;
-import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
-import org.opendaylight.controller.sal.binding.api.BindingAwareBroker;
-import org.opendaylight.controller.sal.binding.api.BindingAwareProvider;
-import org.opendaylight.controller.sal.binding.api.BindingAwareService;
-import org.opendaylight.controller.sal.binding.api.NotificationProviderService;
-import org.opendaylight.controller.sal.binding.api.NotificationProviderService.NotificationInterestListener;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node;
-import org.opendaylight.yangtools.concepts.ListenerRegistration;
-import org.opendaylight.yangtools.yang.binding.DataObject;
-import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
-import org.opendaylight.yangtools.yang.binding.NotificationListener;
-
-/**
- * Test for {@link FlowCapableTopologyProvider}.
- */
-@RunWith(MockitoJUnitRunner.class)
-public class FlowCapableTopologyProviderTest {
-
- private FlowCapableTopologyProvider provider;
-
- @Mock
- private BindingAwareBroker.ProviderContext providerContext;
- @Mock
- private DataBroker dataBroker;
- @Mock
- private NotificationProviderService notificationProviderService;
- @Mock
- private BindingAwareProvider bindingAwareProvider;
- @Mock
- private ClassToInstanceMap<BindingAwareService> serviceProvider;
- @Mock
- private BindingAwareService bindingAwareService;
- @Mock
- private Node mockNode;
- @Mock
- private ReadOnlyTransaction rTx;
- @Mock
- private ReadWriteTransaction wTx;
-
- @Before
- public void setUp() throws Exception {
- when(providerContext.getSALService(Matchers.<Class<? extends BindingAwareService>>any()))
- .thenAnswer(new Answer<BindingAwareService>() {
- @Override
- public BindingAwareService answer(InvocationOnMock invocation) throws Throwable {
- Object[] arguments = invocation.getArguments();
- if (arguments != null && arguments.length > 0 && arguments[0] != null) {
- if(arguments[0].equals(DataBroker.class)) {
- return dataBroker;
- } else if(arguments[0].equals(NotificationProviderService.class)){
- return notificationProviderService;
- }
- }
- return null;
- }
- });
-
- doReturn(rTx).when(dataBroker).newReadOnlyTransaction();
- doReturn(wTx).when(dataBroker).newReadWriteTransaction();
-
- when(wTx.submit()).thenReturn(Futures.immediateCheckedFuture(null));
-
- OperationProcessor operationProcessor = new OperationProcessor(dataBroker);
- provider = new FlowCapableTopologyProvider(dataBroker, notificationProviderService, operationProcessor);
- }
-
- @Test
- public void testRun() throws Exception {
- when(rTx.read(Matchers.<LogicalDatastoreType>any(), Matchers.<InstanceIdentifier<DataObject>>any()))
- .thenReturn(Futures.immediateCheckedFuture(Optional.of(mockNode)));
- provider.start();
- verify(rTx).read(Matchers.<LogicalDatastoreType>any(), Matchers.<InstanceIdentifier<DataObject>>any());
- }
-
- @Test
- public void testRunWithoutTopology() throws Exception {
- when(rTx.read(Matchers.<LogicalDatastoreType>any(), Matchers.<InstanceIdentifier<DataObject>>any()))
- .thenReturn(Futures.immediateCheckedFuture(Optional.absent()));
- provider.start();
- verify(wTx).submit();
- }
-
- @Test
- public void testClose() throws Exception {
- when(rTx.read(Matchers.<LogicalDatastoreType>any(), Matchers.<InstanceIdentifier<DataObject>>any()))
- .thenReturn(Futures.immediateCheckedFuture(Optional.of(mockNode)));
-
- final ListenerRegistration<NotificationInterestListener> listenerRegistration = mock(ListenerRegistration.class);
- doReturn(listenerRegistration).when(notificationProviderService).registerNotificationListener(Matchers.<NotificationListener>any());
-
- provider.start();
- provider.close();
-
- verify(listenerRegistration).close();
- }
-
-}
\ No newline at end of file
+++ /dev/null
-/*
- * Copyright (c) 2016 Pantheon Technologies s.r.o. and others. All rights reserved.
- *
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,
- * and is available at http://www.eclipse.org/legal/epl-v10.html
- */
-package org.opendaylight.openflowplugin.applications.topology.manager;
-
-import static org.mockito.Mockito.times;
-
-import org.junit.Before;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.mockito.Matchers;
-import org.mockito.Mock;
-import org.mockito.Mockito;
-import org.mockito.runners.MockitoJUnitRunner;
-import org.opendaylight.controller.md.sal.binding.api.BindingTransactionChain;
-import org.opendaylight.controller.md.sal.binding.api.DataBroker;
-import org.opendaylight.controller.md.sal.common.api.data.AsyncTransaction;
-import org.opendaylight.controller.md.sal.common.api.data.TransactionChainListener;
-
-@RunWith(MockitoJUnitRunner.class)
-public class OperationProcessorTest {
-
-
- OperationProcessor processor;
-
- @Mock
- DataBroker dataBroker;
- @Mock
- BindingTransactionChain transactionChain;
- @Mock
- TransactionChainListener transactionChainListener;
- @Mock
- AsyncTransaction asyncTransaction;
- @Mock
- Throwable throwable;
-
- @Before
- public void setUp() {
- Mockito.when(dataBroker.createTransactionChain(Matchers.any(OperationProcessor.class)))
- .thenReturn(transactionChain);
- processor = new OperationProcessor(dataBroker);
- }
-
- @Test
- public void onTransactionChainFailedTest() {
- processor.onTransactionChainFailed(transactionChain, asyncTransaction, throwable);
- Mockito.verify(transactionChain).close();
- //dataBroker.createTransactionChain is called 2 time
- // (first time in constructor, second time after old chain has been closed)
- Mockito.verify(dataBroker, times(2)).createTransactionChain(Matchers.any(OperationProcessor.class));
- }
-
- @Test
- public void closeTest() {
- processor.close();
- Mockito.verify(transactionChain).close();
- }
-
-
-}
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
</dependency>
+ <dependency>
+ <groupId>org.opendaylight.controller</groupId>
+ <artifactId>sal-binding-api</artifactId>
+ </dependency>
<dependency>
<groupId>junit</groupId>
* and is available at http://www.eclipse.org/legal/epl-v10.html
*/
-package org.opendaylight.openflowplugin.impl.device;
+package org.opendaylight.openflowplugin.common.txchain;
-import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
+import com.google.common.util.concurrent.CheckedFuture;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import javax.annotation.concurrent.GuardedBy;
import org.opendaylight.controller.md.sal.binding.api.BindingTransactionChain;
import org.opendaylight.controller.md.sal.binding.api.DataBroker;
+import org.opendaylight.controller.md.sal.binding.api.ReadWriteTransaction;
import org.opendaylight.controller.md.sal.binding.api.WriteTransaction;
import org.opendaylight.controller.md.sal.common.api.data.AsyncTransaction;
import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
+import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
import org.opendaylight.controller.md.sal.common.api.data.TransactionChain;
import org.opendaylight.controller.md.sal.common.api.data.TransactionChainClosedException;
import org.opendaylight.controller.md.sal.common.api.data.TransactionChainListener;
import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException;
-import org.opendaylight.openflowplugin.api.openflow.device.DeviceInfo;
import org.opendaylight.yangtools.yang.binding.DataObject;
import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
import org.slf4j.Logger;
* method (wrapped {@link WriteTransaction#put(LogicalDatastoreType, InstanceIdentifier, DataObject)})
* and submitTransaction method (wrapped {@link WriteTransaction#submit()}).
*/
-class TransactionChainManager implements TransactionChainListener, AutoCloseable {
+public class TransactionChainManager implements TransactionChainListener, AutoCloseable {
private static final Logger LOG = LoggerFactory.getLogger(TransactionChainManager.class);
private static final String CANNOT_WRITE_INTO_TRANSACTION = "Cannot write into transaction.";
private final String nodeId;
@GuardedBy("txLock")
- private WriteTransaction writeTx;
+ private ReadWriteTransaction writeTx;
@GuardedBy("txLock")
- private BindingTransactionChain txChainFactory;
+ private BindingTransactionChain transactionChain;
@GuardedBy("txLock")
private boolean submitIsEnabled;
@GuardedBy("txLock")
@GuardedBy("txLock")
private TransactionChainManagerStatus transactionChainManagerStatus = TransactionChainManagerStatus.SLEEPING;
- TransactionChainManager(@Nonnull final DataBroker dataBroker,
- @Nonnull final DeviceInfo deviceInfo) {
+ public TransactionChainManager(@Nonnull final DataBroker dataBroker,
+ @Nonnull final String deviceIdentifier) {
this.dataBroker = dataBroker;
- this.nodeId = deviceInfo.toString();
+ this.nodeId = deviceIdentifier;
this.lastSubmittedFuture = Futures.immediateFuture(null);
}
@GuardedBy("txLock")
private void createTxChain() {
- BindingTransactionChain txChainFactoryTemp = txChainFactory;
- txChainFactory = dataBroker.createTransactionChain(TransactionChainManager.this);
+ BindingTransactionChain txChainFactoryTemp = transactionChain;
+ transactionChain = dataBroker.createTransactionChain(TransactionChainManager.this);
Optional.ofNullable(txChainFactoryTemp).ifPresent(TransactionChain::close);
}
- boolean initialSubmitWriteTransaction() {
+ public boolean initialSubmitWriteTransaction() {
enableSubmit();
- return submitWriteTransaction();
+ return submitTransaction();
}
/**
* registration for this class instance as {@link TransactionChainListener} to provide possibility a make DS
* transactions. Call this method for MASTER role only.
*/
- void activateTransactionManager() {
+ public void activateTransactionManager() {
if (LOG.isDebugEnabled()) {
LOG.debug("activateTransactionManager for node {} transaction submit is set to {}",
this.nodeId, submitIsEnabled);
}
synchronized (txLock) {
if (TransactionChainManagerStatus.SLEEPING == transactionChainManagerStatus) {
- Preconditions.checkState(txChainFactory == null,
+ Preconditions.checkState(transactionChain == null,
"TxChainFactory survive last close.");
Preconditions.checkState(writeTx == null,
"We have some unexpected WriteTransaction.");
* Call this method for SLAVE only.
* @return Future
*/
- ListenableFuture<Void> deactivateTransactionManager() {
+ public ListenableFuture<Void> deactivateTransactionManager() {
if (LOG.isDebugEnabled()) {
LOG.debug("deactivateTransactionManager for node {}", this.nodeId);
}
Futures.addCallback(future, new FutureCallback<Void>() {
@Override
public void onSuccess(final Void result) {
- removeTxChainFactory();
+ closeTransactionChain();
}
@Override
- public void onFailure(final Throwable throwable) {
- removeTxChainFactory();
+ public void onFailure(@Nonnull final Throwable t) {
+ closeTransactionChain();
}
});
} else {
return future;
}
- private void removeTxChainFactory() {
- Optional.ofNullable(txChainFactory).ifPresent(TransactionChain::close);
- txChainFactory = null;
+ private void closeTransactionChain() {
+ if (writeTx != null) {
+ writeTx.cancel();
+ writeTx = null;
+ }
+ Optional.ofNullable(transactionChain).ifPresent(TransactionChain::close);
+ transactionChain = null;
}
- boolean submitWriteTransaction() {
+ public boolean submitTransaction() {
synchronized (txLock) {
if (!submitIsEnabled) {
if (LOG.isTraceEnabled()) {
return true;
}
- <T extends DataObject> void addDeleteOperationTotTxChain(final LogicalDatastoreType store,
- final InstanceIdentifier<T> path) {
+ public <T extends DataObject> void addDeleteOperationTotTxChain(final LogicalDatastoreType store,
+ final InstanceIdentifier<T> path){
synchronized (txLock) {
ensureTransaction();
if (writeTx == null) {
}
}
- <T extends DataObject> void writeToTransaction(final LogicalDatastoreType store,
- final InstanceIdentifier<T> path,
- final T data,
- final boolean createParents) {
+ public <T extends DataObject> void writeToTransaction(final LogicalDatastoreType store,
+ final InstanceIdentifier<T> path,
+ final T data,
+ final boolean createParents){
synchronized (txLock) {
ensureTransaction();
if (writeTx == null) {
}
}
+ public <T extends DataObject> void mergeToTransaction(final LogicalDatastoreType store,
+ final InstanceIdentifier<T> path,
+ final T data,
+ final boolean createParents){
+ synchronized (txLock) {
+ ensureTransaction();
+ if (writeTx == null) {
+ LOG.debug("WriteTx is null for node {}. Merge data for {} was not realized.", this.nodeId, path);
+ throw new TransactionChainClosedException(CANNOT_WRITE_INTO_TRANSACTION);
+ }
+
+ writeTx.merge(store, path, data, createParents);
+ }
+ }
+
+ public <T extends DataObject> CheckedFuture<com.google.common.base.Optional<T>, ReadFailedException>
+ readFromTransaction(final LogicalDatastoreType store,
+ final InstanceIdentifier<T> path){
+ synchronized (txLock) {
+ ensureTransaction();
+ if (writeTx == null) {
+ LOG.debug("WriteTx is null for node {}. Read data for {} was not realized.", this.nodeId, path);
+ throw new TransactionChainClosedException(CANNOT_WRITE_INTO_TRANSACTION);
+ }
+
+ return writeTx.read(store, path);
+ }
+ }
+
@Override
public void onTransactionChainFailed(final TransactionChain<?, ?> chain,
final AsyncTransaction<?, ?> transaction, final Throwable cause) {
synchronized (txLock) {
- if (TransactionChainManagerStatus.WORKING == transactionChainManagerStatus) {
+ if (TransactionChainManagerStatus.WORKING == transactionChainManagerStatus &&
+ chain.equals(this.transactionChain)) {
LOG.warn("Transaction chain failed, recreating chain due to ", cause);
+ closeTransactionChain();
createTxChain();
writeTx = null;
}
}
@GuardedBy("txLock")
- private void ensureTransaction() {
+ private void ensureTransaction() {
if (writeTx == null && TransactionChainManagerStatus.WORKING == transactionChainManagerStatus
- && txChainFactory != null) {
- writeTx = txChainFactory.newWriteOnlyTransaction();
+ && transactionChain != null) {
+ writeTx = transactionChain.newReadWriteTransaction();
}
}
- @VisibleForTesting
- void enableSubmit() {
+ private void enableSubmit() {
synchronized (txLock) {
- /* !!!IMPORTANT: never set true without txChainFactory */
- submitIsEnabled = txChainFactory != null;
+ /* !!!IMPORTANT: never set true without transactionChain */
+ submitIsEnabled = transactionChain != null;
}
}
- ListenableFuture<Void> shuttingDown() {
+ public ListenableFuture<Void> shuttingDown() {
if (LOG.isDebugEnabled()) {
LOG.debug("TxManager is going SHUTTING_DOWN for node {}", this.nodeId);
}
submitIsEnabled = false;
ListenableFuture<Void> future;
- if (!wasSubmitEnabled || txChainFactory == null) {
+ if (!wasSubmitEnabled || transactionChain == null) {
// stay with actual thread
future = Futures.immediateCheckedFuture(null);
LOG.debug("Setting transactionChainManagerStatus to SHUTTING_DOWN for {}", this.nodeId);
}
synchronized (txLock) {
- removeTxChainFactory();
+ closeTransactionChain();
}
}
<groupId>org.osgi</groupId>
<artifactId>org.osgi.compendium</artifactId>
</dependency>
+ <dependency>
+ <groupId>org.opendaylight.openflowplugin</groupId>
+ <artifactId>openflowplugin-common</artifactId>
+ </dependency>
</dependencies>
</project>
import org.opendaylight.openflowplugin.api.openflow.rpc.ItemLifeCycleKeeper;
import org.opendaylight.openflowplugin.api.openflow.rpc.listener.ItemLifecycleListener;
import org.opendaylight.openflowplugin.api.openflow.statistics.ofpspecific.MessageSpy;
+import org.opendaylight.openflowplugin.common.txchain.TransactionChainManager;
import org.opendaylight.openflowplugin.extension.api.ConvertorMessageFromOFJava;
import org.opendaylight.openflowplugin.extension.api.ExtensionConverterProviderKeeper;
import org.opendaylight.openflowplugin.extension.api.core.extension.ExtensionConverterProvider;
@Override
public boolean submitTransaction() {
- return initialized.get() && transactionChainManager.submitWriteTransaction();
+ return initialized.get() && transactionChainManager.submitTransaction();
}
@Override
if (LOG.isDebugEnabled()) {
LOG.debug("Transaction chain manager for node {} created", deviceInfo);
}
- this.transactionChainManager = new TransactionChainManager(dataBroker, deviceInfo);
- this.deviceFlowRegistry = new DeviceFlowRegistryImpl(deviceInfo.getVersion(), dataBroker, deviceInfo
- .getNodeInstanceIdentifier());
+ this.transactionChainManager = new TransactionChainManager(dataBroker, deviceInfo.getNodeId().getValue());
+ this.deviceFlowRegistry = new DeviceFlowRegistryImpl(deviceInfo.getVersion(), dataBroker,
+ deviceInfo.getNodeInstanceIdentifier());
this.deviceGroupRegistry = new DeviceGroupRegistryImpl();
this.deviceMeterRegistry = new DeviceMeterRegistryImpl();
}
import org.opendaylight.controller.md.sal.binding.api.NotificationPublishService;
import org.opendaylight.controller.md.sal.binding.api.ReadOnlyTransaction;
import org.opendaylight.controller.md.sal.binding.api.ReadTransaction;
-import org.opendaylight.controller.md.sal.binding.api.WriteTransaction;
+import org.opendaylight.controller.md.sal.binding.api.ReadWriteTransaction;
import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
import org.opendaylight.openflowjava.protocol.api.connection.ConnectionAdapter;
import org.opendaylight.openflowplugin.api.openflow.rpc.ItemLifeCycleSource;
import org.opendaylight.openflowplugin.api.openflow.rpc.listener.ItemLifecycleListener;
import org.opendaylight.openflowplugin.api.openflow.statistics.ofpspecific.MessageSpy;
+import org.opendaylight.openflowplugin.common.txchain.TransactionChainManager;
import org.opendaylight.openflowplugin.extension.api.ConvertorMessageFromOFJava;
import org.opendaylight.openflowplugin.extension.api.core.extension.ExtensionConverterProvider;
import org.opendaylight.openflowplugin.impl.device.initialization.AbstractDeviceInitializer;
private static final Long DUMMY_XID = 544L;
private static final Long DUMMY_PORT_NUMBER = 159L;
private static final BigInteger DUMMY_DATAPATH_ID = new BigInteger("55");
- Xid xid;
- Xid xidMulti;
+ private Xid xid;
+ private Xid xidMulti;
- DeviceContext deviceContext;
+ private DeviceContext deviceContext;
@Mock
- RequestContext<GetAsyncReply> requestContext;
+ private RequestContext<GetAsyncReply> requestContext;
@Mock
- RequestContext<MultipartReply> requestContextMultiReply;
+ private RequestContext<MultipartReply> requestContextMultiReply;
@Mock
- ConnectionContext connectionContext;
+ private ConnectionContext connectionContext;
@Mock
- GetFeaturesOutput featuresOutput;
+ private GetFeaturesOutput featuresOutput;
@Mock
- DataBroker dataBroker;
+ private DataBroker dataBroker;
@Mock
- WriteTransaction writeTx;
+ private ReadWriteTransaction writeTx;
@Mock
- ReadOnlyTransaction readTx;
+ private ReadOnlyTransaction readTx;
@Mock
- BindingTransactionChain txChainFactory;
+ private BindingTransactionChain txChainFactory;
@Mock
- HashedWheelTimer timer;
+ private HashedWheelTimer timer;
@Mock
- OutboundQueueProvider outboundQueueProvider;
+ private OutboundQueueProvider outboundQueueProvider;
@Mock
- ConnectionAdapter connectionAdapter;
- NodeId nodeId = new NodeId("h2g2:42");
- KeyedInstanceIdentifier<Node, NodeKey> nodeKeyIdent = DeviceStateUtil.createNodeInstanceIdentifier(nodeId);
+ private ConnectionAdapter connectionAdapter;
+ private NodeId nodeId = new NodeId("h2g2:42");
+ private KeyedInstanceIdentifier<Node, NodeKey> nodeKeyIdent = DeviceStateUtil.createNodeInstanceIdentifier(nodeId);
@Mock
- TranslatorLibrary translatorLibrary;
+ private TranslatorLibrary translatorLibrary;
@Mock
MessageTranslator messageTranslatorPacketReceived;
@Mock
- MessageTranslator messageTranslatorFlowCapableNodeConnector;
+ private MessageTranslator messageTranslatorFlowCapableNodeConnector;
@Mock
private MessageTranslator<Object, Object> messageTranslatorFlowRemoved;
@Mock
settableFutureMultiReply.set((RpcResult<MultipartReply>) invocation.getArguments()[0]);
return null;
}).when(requestContextMultiReply).setResult(any(RpcResult.class));
- Mockito.when(txChainFactory.newWriteOnlyTransaction()).thenReturn(writeTx);
+ Mockito.when(txChainFactory.newReadWriteTransaction()).thenReturn(writeTx);
Mockito.when(dataBroker.newReadOnlyTransaction()).thenReturn(readTx);
Mockito.when(connectionContext.getOutboundQueueProvider()).thenReturn(outboundQueueProvider);
Mockito.when(connectionContext.getConnectionAdapter()).thenReturn(connectionAdapter);
Mockito.when(writeTx.submit()).thenReturn(Futures.immediateCheckedFuture(null));
final InstanceIdentifier<Nodes> dummyII = InstanceIdentifier.create(Nodes.class);
((DeviceContextImpl) deviceContext).getTransactionChainManager().activateTransactionManager() ;
- ((DeviceContextImpl) deviceContext).getTransactionChainManager().enableSubmit();
+ ((DeviceContextImpl) deviceContext).getTransactionChainManager().initialSubmitWriteTransaction();
deviceContext.addDeleteToTxChain(LogicalDatastoreType.CONFIGURATION, dummyII);
deviceContext.initialSubmitTransaction();
verify(writeTx).submit();
public void testAddDeleteToTxChain() throws Exception {
final InstanceIdentifier<Nodes> dummyII = InstanceIdentifier.create(Nodes.class);
((DeviceContextImpl) deviceContext).getTransactionChainManager().activateTransactionManager() ;
- ((DeviceContextImpl) deviceContext).getTransactionChainManager().enableSubmit();
+ ((DeviceContextImpl) deviceContext).getTransactionChainManager().initialSubmitWriteTransaction();
deviceContext.addDeleteToTxChain(LogicalDatastoreType.CONFIGURATION, dummyII);
verify(writeTx).delete(eq(LogicalDatastoreType.CONFIGURATION), eq(dummyII));
}
@Test
public void testSubmitTransaction() throws Exception {
((DeviceContextImpl) deviceContext).getTransactionChainManager().activateTransactionManager() ;
- ((DeviceContextImpl) deviceContext).getTransactionChainManager().enableSubmit();
+ ((DeviceContextImpl) deviceContext).getTransactionChainManager().initialSubmitWriteTransaction();
assertTrue(deviceContext.submitTransaction());
}
deviceContext.closeServiceInstance();
}
-}
\ No newline at end of file
+}
import com.google.common.base.Optional;
import com.google.common.util.concurrent.CheckedFuture;
import com.google.common.util.concurrent.Futures;
-import io.netty.util.HashedWheelTimer;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.opendaylight.controller.md.sal.binding.api.BindingTransactionChain;
import org.opendaylight.controller.md.sal.binding.api.DataBroker;
import org.opendaylight.controller.md.sal.binding.api.ReadOnlyTransaction;
-import org.opendaylight.controller.md.sal.binding.api.WriteTransaction;
+import org.opendaylight.controller.md.sal.binding.api.ReadWriteTransaction;
import org.opendaylight.controller.md.sal.common.api.data.AsyncTransaction;
import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
import org.opendaylight.controller.md.sal.common.api.data.TransactionChainListener;
import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException;
import org.opendaylight.openflowplugin.api.openflow.device.DeviceInfo;
-import org.opendaylight.openflowplugin.api.openflow.device.DeviceState;
+import org.opendaylight.openflowplugin.common.txchain.TransactionChainManager;
import org.opendaylight.openflowplugin.impl.util.DeviceStateUtil;
import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeId;
import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.Nodes;
import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node;
import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.NodeBuilder;
import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.NodeKey;
-import org.opendaylight.yangtools.concepts.Registration;
import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
import org.opendaylight.yangtools.yang.binding.KeyedInstanceIdentifier;
@Mock
private BindingTransactionChain txChain;
@Mock
- private WriteTransaction writeTx;
+ private ReadWriteTransaction writeTx;
@Mock
private TransactionChain<?, ?> transactionChain;
@Mock
- HashedWheelTimer timer;
- @Mock
- Registration registration;
- @Mock
- DeviceState deviceState;
- @Mock
DeviceInfo deviceInfo;
@Mock
nodeKeyIdent = DeviceStateUtil.createNodeInstanceIdentifier(nodeId);
Mockito.when(deviceInfo.getNodeInstanceIdentifier()).thenReturn(nodeKeyIdent);
Mockito.when(deviceInfo.getNodeId()).thenReturn(nodeId);
- txChainManager = new TransactionChainManager(dataBroker, deviceInfo);
- Mockito.when(txChain.newWriteOnlyTransaction()).thenReturn(writeTx);
+ txChainManager = new TransactionChainManager(dataBroker, nodeId.getValue());
+ Mockito.when(txChain.newReadWriteTransaction()).thenReturn(writeTx);
path = InstanceIdentifier.create(Nodes.class).child(Node.class, new NodeKey(nodeId));
Mockito.when(writeTx.submit())
final Node data = new NodeBuilder().setId(nodeId).build();
txChainManager.writeToTransaction(LogicalDatastoreType.CONFIGURATION, path, data, false);
- Mockito.verify(txChain).newWriteOnlyTransaction();
+ Mockito.verify(txChain).newReadWriteTransaction();
Mockito.verify(writeTx).put(LogicalDatastoreType.CONFIGURATION, path, data, false);
}
/**
- * Test of {@link TransactionChainManager#submitWriteTransaction()}.
+ * test of {@link TransactionChainManager#submitTransaction()}
+ * @throws Exception
*/
@Test
public void testSubmitTransaction() throws Exception {
final Node data = new NodeBuilder().setId(nodeId).build();
txChainManager.initialSubmitWriteTransaction();
txChainManager.writeToTransaction(LogicalDatastoreType.CONFIGURATION, path, data, false);
- txChainManager.submitWriteTransaction();
+ txChainManager.submitTransaction();
- Mockito.verify(txChain).newWriteOnlyTransaction();
+ Mockito.verify(txChain).newReadWriteTransaction();
Mockito.verify(writeTx).put(LogicalDatastoreType.CONFIGURATION, path, data, false);
Mockito.verify(writeTx).submit();
}
/**
- * Test of {@link TransactionChainManager#submitWriteTransaction()}: no submit, never enabled.
+ * test of {@link TransactionChainManager#submitTransaction()}: no submit, never enabled
+ * @throws Exception
*/
@Test
public void testSubmitTransaction1() throws Exception {
final Node data = new NodeBuilder().setId(nodeId).build();
txChainManager.writeToTransaction(LogicalDatastoreType.CONFIGURATION, path, data, false);
- txChainManager.submitWriteTransaction();
+ txChainManager.submitTransaction();
- Mockito.verify(txChain).newWriteOnlyTransaction();
+ Mockito.verify(txChain).newReadWriteTransaction();
Mockito.verify(writeTx).put(LogicalDatastoreType.CONFIGURATION, path, data, false);
Mockito.verify(writeTx, Mockito.never()).submit();
}
@Test
public void testSubmitTransactionFailed() throws Exception {
- Mockito.when(writeTx.submit())
- .thenReturn(Futures.immediateFailedCheckedFuture(new TransactionCommitFailedException("mock")));
+ Mockito.when(writeTx.submit()).thenReturn(Futures.<Void, TransactionCommitFailedException>immediateFailedCheckedFuture(new TransactionCommitFailedException("mock")));
final Node data = new NodeBuilder().setId(nodeId).build();
txChainManager.initialSubmitWriteTransaction();
txChainManager.writeToTransaction(LogicalDatastoreType.CONFIGURATION, path, data, false);
- txChainManager.submitWriteTransaction();
+ txChainManager.submitTransaction();
- Mockito.verify(txChain).newWriteOnlyTransaction();
+ Mockito.verify(txChain).newReadWriteTransaction();
Mockito.verify(writeTx).put(LogicalDatastoreType.CONFIGURATION, path, data, false);
Mockito.verify(writeTx).submit();
}
- @Test
- public void testSubmitTransactionFailed2() throws Exception {
- final Node data = new NodeBuilder().setId(nodeId).build();
- txChainManager.initialSubmitWriteTransaction();
- txChainManager.writeToTransaction(LogicalDatastoreType.CONFIGURATION, path, data, false);
- txChainManager.submitWriteTransaction();
-
- Mockito.when(writeTx.submit())
- .thenReturn(Futures.immediateFailedCheckedFuture(new TransactionCommitFailedException("mock")));
- txChainManager.writeToTransaction(LogicalDatastoreType.CONFIGURATION, path, data, false);
- txChainManager.submitWriteTransaction();
-
- Mockito.verify(txChain, Mockito.times(2)).newWriteOnlyTransaction();
- Mockito.verify(writeTx, Mockito.times(2)).put(LogicalDatastoreType.CONFIGURATION, path, data, false);
- Mockito.verify(writeTx, Mockito.times(2)).submit();
- }
-
/**
* Test of {@link TransactionChainManager#enableSubmit()}: no submit - counter is not active.
*/
txChainManager.writeToTransaction(LogicalDatastoreType.CONFIGURATION, path, data, false);
txChainManager.writeToTransaction(LogicalDatastoreType.CONFIGURATION, path, data, false);
- Mockito.verify(txChain).newWriteOnlyTransaction();
+ Mockito.verify(txChain).newReadWriteTransaction();
Mockito.verify(writeTx, Mockito.times(2)).put(LogicalDatastoreType.CONFIGURATION, path, data, false);
Mockito.verify(writeTx, Mockito.never()).submit();
}
@Test
public void testOnTransactionChainFailed() throws Exception {
- txChainManager.onTransactionChainFailed(transactionChain,
- Mockito.mock(AsyncTransaction.class), Mockito.mock(Throwable.class));
+ txChainManager.onTransactionChainFailed(txChain, Mockito.mock(AsyncTransaction.class), Mockito.mock(Throwable.class));
Mockito.verify(txChain).close();
Mockito.verify(dataBroker, Mockito.times(2)).createTransactionChain(txChainManager);
}
public void testAddDeleteOperationTotTxChain() throws Exception {
txChainManager.addDeleteOperationTotTxChain(LogicalDatastoreType.CONFIGURATION, path);
- Mockito.verify(txChain).newWriteOnlyTransaction();
+ Mockito.verify(txChain).newReadWriteTransaction();
Mockito.verify(writeTx).delete(LogicalDatastoreType.CONFIGURATION, path);
}
txChainManager.deactivateTransactionManager();
- Mockito.verify(txChain).newWriteOnlyTransaction();
+ Mockito.verify(txChain).newReadWriteTransaction();
Mockito.verify(writeTx).put(LogicalDatastoreType.CONFIGURATION, path, data, false);
Mockito.verify(writeTx, Mockito.never()).submit();
Mockito.verify(writeTx).cancel();
@Test
public void testShuttingDown() throws Exception {
final Node data = new NodeBuilder().setId(nodeId).build();
+ txChainManager.initialSubmitWriteTransaction();
txChainManager.writeToTransaction(LogicalDatastoreType.CONFIGURATION, path, data, false);
- txChainManager.enableSubmit();
txChainManager.shuttingDown();
- Mockito.verify(txChain).newWriteOnlyTransaction();
+ Mockito.verify(txChain).newReadWriteTransaction();
Mockito.verify(writeTx).put(LogicalDatastoreType.CONFIGURATION, path, data, false);
Mockito.verify(writeTx).submit();
}