final NodeId nodeId = toTopologyNodeId(getNodeKey(notification.getNodeRef()).getId());
final InstanceIdentifier<Node> nodeInstance = toNodeIdentifier(notification.getNodeRef());
-
processor.enqueueOperation(new TopologyOperation() {
@Override
public void applyOperation(ReadWriteTransaction transaction) {
- Optional<Node> nodeOptional = Optional.absent();
- try {
- nodeOptional = transaction.read(LogicalDatastoreType.OPERATIONAL, nodeInstance).checkedGet();
- } catch (ReadFailedException e) {
- LOG.error("Error occured when trying to read Node ", e);
- }
- if (nodeOptional.isPresent()) {
removeAffectedLinks(nodeId, transaction);
transaction.delete(LogicalDatastoreType.OPERATIONAL, nodeInstance);
- }
}
@Override
final InstanceIdentifier<TerminationPoint> tpInstance = toTerminationPointIdentifier(
notification.getNodeConnectorRef());
+ final InstanceIdentifier<Node> node = tpInstance.firstIdentifierOf(Node.class);
+
final TpId tpId = toTerminationPointId(getNodeConnectorKey(
notification.getNodeConnectorRef()).getId());
processor.enqueueOperation(new TopologyOperation() {
@Override
public void applyOperation(ReadWriteTransaction transaction) {
- Optional<TerminationPoint> terminationPointOptional = Optional.absent();
+ Optional<Node> nodeOptional = Optional.absent();
try {
- terminationPointOptional = transaction.read(LogicalDatastoreType.OPERATIONAL, tpInstance).checkedGet();
+ nodeOptional = transaction.read(LogicalDatastoreType.OPERATIONAL, node).checkedGet();
} catch (ReadFailedException e) {
LOG.error("Error occured when trying to read NodeConnector ", e);
}
- if (terminationPointOptional.isPresent()) {
+ if (nodeOptional.isPresent()) {
removeAffectedLinks(tpId, transaction);
transaction.delete(LogicalDatastoreType.OPERATIONAL, tpInstance);
}
});
}
-
@Override
public void onLinkUtilizationNormal(final LinkUtilizationNormal notification) {
// NOOP
private final BlockingQueue<TopologyOperation> queue = new LinkedBlockingQueue<>(OPERATION_QUEUE_DEPTH);
private final DataBroker dataBroker;
private BindingTransactionChain transactionChain;
+ private volatile boolean finishing = false;
OperationProcessor(final DataBroker dataBroker) {
this.dataBroker = Preconditions.checkNotNull(dataBroker);
@Override
public void run() {
- try {
- for (; ; ) {
- TopologyOperation op = queue.take();
+ while (!finishing) {
+ try {
+ TopologyOperation op = queue.take();
- LOG.debug("New {} operation available, starting transaction", op);
+ LOG.debug("New {} operation available, starting transaction", op);
- final ReadWriteTransaction tx = transactionChain.newReadWriteTransaction();
+ final ReadWriteTransaction tx = transactionChain.newReadWriteTransaction();
- int ops = 0;
- do {
- op.applyOperation(tx);
+ int ops = 0;
+ do {
+ op.applyOperation(tx);
- ops++;
- if (ops < MAX_TRANSACTION_OPERATIONS) {
- op = queue.poll();
- } else {
- op = null;
- }
+ ops++;
+ if (ops < MAX_TRANSACTION_OPERATIONS) {
+ op = queue.poll();
+ } else {
+ op = null;
+ }
- LOG.debug("Next operation {}", op);
- } while (op != null);
+ LOG.debug("Next operation {}", op);
+ } while (op != null);
- LOG.debug("Processed {} operations, submitting transaction", ops);
+ LOG.debug("Processed {} operations, submitting transaction", ops);
- try {
- tx.submit().checkedGet();
- } catch (final TransactionCommitFailedException e) {
+ try {
+ tx.submit().checkedGet();
+ } catch (final TransactionCommitFailedException e) {
+ LOG.warn("Stat DataStoreOperation unexpected State!", e);
+ transactionChain.close();
+ transactionChain = dataBroker.createTransactionChain(this);
+ cleanDataStoreOperQueue();
+ }
+
+ } catch (final IllegalStateException e) {
LOG.warn("Stat DataStoreOperation unexpected State!", e);
transactionChain.close();
transactionChain = dataBroker.createTransactionChain(this);
cleanDataStoreOperQueue();
+ } catch (final InterruptedException e) {
+ LOG.warn("Stat Manager DS Operation thread interupted!", e);
+ finishing = true;
+ } catch (final Exception e) {
+ LOG.warn("Stat DataStore Operation executor fail!", e);
}
}
- } catch (final IllegalStateException e) {
- LOG.warn("Stat DataStoreOperation unexpected State!", e);
- transactionChain.close();
- transactionChain = dataBroker.createTransactionChain(this);
- cleanDataStoreOperQueue();
- } catch (final InterruptedException e) {
- LOG.warn("Stat Manager DS Operation thread interupted!", e);
- } catch (final Exception e) {
- LOG.warn("Stat DataStore Operation executor fail!", e);
- }
-
// Drain all events, making sure any blocked threads are unblocked
cleanDataStoreOperQueue();
-
}
private void cleanDataStoreOperQueue() {
- // Drain all events, making sure any blocked threads are unblocked
while (!queue.isEmpty()) {
queue.poll();
}
@Override
public void onTransactionChainFailed(TransactionChain<?, ?> chain, AsyncTransaction<?, ?> transaction, Throwable cause) {
LOG.error("Failed to export Topology manager operations, Transaction {} failed.", transaction.getIdentifier(), cause);
+ transactionChain.close();
+ transactionChain = dataBroker.createTransactionChain(this);
+ cleanDataStoreOperQueue();
}
@Override
import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.NodeBuilder;
import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.NodeKey;
import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.node.TerminationPoint;
-import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.node.TerminationPointBuilder;
import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.node.TerminationPointKey;
import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
import org.opendaylight.yangtools.yang.binding.KeyedInstanceIdentifier;
NodeKey topoNodeKey = new NodeKey(new NodeId("node1"));
TerminationPointKey terminationPointKey = new TerminationPointKey(new TpId("tp1"));
- InstanceIdentifier<TerminationPoint> topoTermPointII = topologyIID.child(Node.class, topoNodeKey)
- .child(TerminationPoint.class, terminationPointKey);
- TerminationPoint topoTermPoint = new TerminationPointBuilder().setKey(terminationPointKey).build();
+ InstanceIdentifier<Node> topoNodeII = topologyIID.child(Node.class, topoNodeKey);
+ Node topoNode = new NodeBuilder().setKey(topoNodeKey).build();
org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.NodeKey
nodeKey = newInvNodeKey(topoNodeKey.getNodeId().getValue());
doReturn(Futures.makeChecked(readFuture, ReadFailedException.MAPPER)).when(mockTx1)
.read(LogicalDatastoreType.OPERATIONAL, topologyIID);
- SettableFuture<Optional<TerminationPoint>> readFutureNode = SettableFuture.create();
- readFutureNode.set(Optional.of(topoTermPoint));
+ SettableFuture<Optional<Node>> readFutureNode = SettableFuture.create();
+ readFutureNode.set(Optional.of(topoNode));
doReturn(Futures.makeChecked(readFutureNode, ReadFailedException.MAPPER)).when(mockTx1)
- .read(LogicalDatastoreType.OPERATIONAL, topoTermPointII);
+ .read(LogicalDatastoreType.OPERATIONAL, topoNodeII);
CountDownLatch submitLatch1 = setupStubbedSubmit(mockTx1);
ArgumentCaptor.forClass(InstanceIdentifier.class);
setupStubbedDeletes(mockTx1, deletedLinkIDs, deleteLatch);
-
doReturn(mockTx1).when(mockTxChain).newReadWriteTransaction();
exporter.onNodeConnectorRemoved(new NodeConnectorRemovedBuilder().setNodeConnectorRef(
NodeKey topoNodeKey = new NodeKey(new NodeId("node1"));
TerminationPointKey terminationPointKey = new TerminationPointKey(new TpId("tp1"));
- InstanceIdentifier<TerminationPoint> topoTermPointII = topologyIID.child(Node.class, topoNodeKey)
- .child(TerminationPoint.class, terminationPointKey);
- TerminationPoint topoTermPoint = new TerminationPointBuilder().setKey(terminationPointKey).build();
+ InstanceIdentifier<Node> topoNodeII = topologyIID.child(Node.class, topoNodeKey);
+ Node topoNode = new NodeBuilder().setKey(topoNodeKey).build();
org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.NodeKey
nodeKey = newInvNodeKey(topoNodeKey.getNodeId().getValue());
.read(LogicalDatastoreType.OPERATIONAL, topologyIID);
CountDownLatch submitLatch = setupStubbedSubmit(mockTx);
- SettableFuture<Optional<TerminationPoint>> readFutureNode = SettableFuture.create();
- readFutureNode.set(Optional.of(topoTermPoint));
+ SettableFuture<Optional<Node>> readFutureNode = SettableFuture.create();
+ readFutureNode.set(Optional.of(topoNode));
doReturn(Futures.makeChecked(readFutureNode, ReadFailedException.MAPPER)).when(mockTx)
- .read(LogicalDatastoreType.OPERATIONAL, topoTermPointII);
+ .read(LogicalDatastoreType.OPERATIONAL, topoNodeII);
CountDownLatch deleteLatch = new CountDownLatch(1);
ArgumentCaptor<InstanceIdentifier> deletedLinkIDs =
}
@Test
- public void testOnLinkRemovedLinkExists() {
+ public void testOnLinkRemoved() {
org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.NodeKey
sourceNodeKey = newInvNodeKey("sourceNode");