Merge "Bug 1309 - Cannot publish LinkDiscovered event"
authorTony Tkacik <ttkacik@cisco.com>
Mon, 22 Sep 2014 18:29:28 +0000 (18:29 +0000)
committerGerrit Code Review <gerrit@opendaylight.org>
Mon, 22 Sep 2014 18:29:28 +0000 (18:29 +0000)
13 files changed:
opendaylight/config/config-persister-feature-adapter/src/main/java/org/opendaylight/controller/configpusherfeature/internal/FeatureConfigPusher.java
opendaylight/distribution/opendaylight-karaf-resources/src/main/resources/bin/setenv.bat [new file with mode: 0644]
opendaylight/distribution/opendaylight-karaf-resources/src/main/resources/etc/custom.properties
opendaylight/distribution/opendaylight/src/main/resources/configuration/config.ini
opendaylight/md-sal/sal-dom-broker/src/main/java/org/opendaylight/controller/md/sal/dom/broker/impl/DOMDataBrokerImpl.java
opendaylight/md-sal/sal-dom-broker/src/main/java/org/opendaylight/controller/md/sal/dom/broker/impl/DOMDataBrokerTransactionChainImpl.java
opendaylight/md-sal/sal-dom-broker/src/main/java/org/opendaylight/controller/md/sal/dom/broker/impl/DOMDataCommitCoordinatorImpl.java
opendaylight/md-sal/sal-dom-broker/src/main/java/org/opendaylight/controller/md/sal/dom/broker/impl/DOMDataCommitErrorInvoker.java [deleted file]
opendaylight/md-sal/sal-dom-broker/src/main/java/org/opendaylight/controller/md/sal/dom/broker/impl/DOMDataCommitErrorListener.java [deleted file]
opendaylight/md-sal/sal-dom-broker/src/main/java/org/opendaylight/controller/md/sal/dom/broker/impl/DOMDataCommitExecutor.java
opendaylight/md-sal/topology-manager/src/main/java/org/opendaylight/md/controller/topology/manager/FlowCapableTopologyExporter.java
opendaylight/md-sal/topology-manager/src/main/java/org/opendaylight/md/controller/topology/manager/OperationProcessor.java
opendaylight/md-sal/topology-manager/src/test/java/org/opendaylight/md/controller/topology/manager/FlowCapableTopologyExporterTest.java

index 1c094ad..57052f9 100644 (file)
@@ -26,6 +26,7 @@ import com.google.common.collect.LinkedHashMultimap;
  */
 public class FeatureConfigPusher {
     private static final Logger logger = LoggerFactory.getLogger(FeatureConfigPusher.class);
+    private static final int MAX_RETRIES=100;
     private FeaturesService featuresService = null;
     private ConfigPusher pusher = null;
     /*
@@ -82,7 +83,29 @@ public class FeatureConfigPusher {
     }
 
     private boolean isInstalled(Feature feature) {
-        List<Feature> installedFeatures = Arrays.asList(featuresService.listInstalledFeatures());
+        List<Feature> installedFeatures= null;
+        boolean cont = true;
+        int retries = 0;
+        while(cont) {
+            try {
+                installedFeatures = Arrays.asList(featuresService.listInstalledFeatures());
+                break;
+            } catch (Exception e) {
+                if(retries < MAX_RETRIES) {
+                    logger.warn("Karaf featuresService.listInstalledFeatures() has thrown an exception, retry {}, Exception {}", retries,e);
+                    try {
+                        Thread.sleep(1);
+                    } catch (InterruptedException e1) {
+                        throw new IllegalStateException(e1);
+                    }
+                    retries++;
+                    continue;
+                } else {
+                    logger.error("Giving up on Karaf featuresService.listInstalledFeatures() which has thrown an exception, retry {}, Exception {}", retries,e);
+                    throw e;
+                }
+            }
+        }
         return installedFeatures.contains(feature);
     }
 
diff --git a/opendaylight/distribution/opendaylight-karaf-resources/src/main/resources/bin/setenv.bat b/opendaylight/distribution/opendaylight-karaf-resources/src/main/resources/bin/setenv.bat
new file mode 100644 (file)
index 0000000..7c61920
--- /dev/null
@@ -0,0 +1,64 @@
+@echo off
+rem
+rem
+rem    Licensed to the Apache Software Foundation (ASF) under one or more
+rem    contributor license agreements.  See the NOTICE file distributed with
+rem    this work for additional information regarding copyright ownership.
+rem    The ASF licenses this file to You under the Apache License, Version 2.0
+rem    (the "License"); you may not use this file except in compliance with
+rem    the License.  You may obtain a copy of the License at
+rem
+rem       http://www.apache.org/licenses/LICENSE-2.0
+rem
+rem    Unless required by applicable law or agreed to in writing, software
+rem    distributed under the License is distributed on an "AS IS" BASIS,
+rem    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+rem    See the License for the specific language governing permissions and
+rem    limitations under the License.
+rem
+
+rem
+rem handle specific scripts; the SCRIPT_NAME is exactly the name of the Karaf
+rem script; for example karaf.bat, start.bat, stop.bat, admin.bat, client.bat, ...
+rem
+rem if "%KARAF_SCRIPT%" == "SCRIPT_NAME" (
+rem   Actions go here...
+rem )
+
+rem
+rem general settings which should be applied for all scripts go here; please keep
+rem in mind that it is possible that scripts might be executed more than once, e.g.
+rem in example of the start script where the start script is executed first and the
+rem karaf script afterwards.
+rem
+
+rem
+rem The following section shows the possible configuration options for the default
+rem karaf scripts
+rem
+rem Window name of the windows console
+rem SET KARAF_TITLE
+rem Location of Java installation
+rem SET JAVA_HOME
+rem Minimum memory for the JVM
+rem SET JAVA_MIN_MEM
+rem Maximum memory for the JVM
+rem SET JAVA_MAX_MEM
+rem Minimum perm memory for the JVM
+rem SET JAVA_PERM_MEM
+rem Maximum perm memory for the JVM
+rem SET JAVA_MAX_PERM_MEM
+rem Karaf home folder
+rem SET KARAF_HOME
+rem Karaf data folder
+rem SET KARAF_DATA
+rem Karaf base folder
+rem SET KARAF_BASE
+rem Karaf etc folder
+rem SET KARAF_ETC
+rem Additional available Karaf options
+rem SET KARAF_OPTS
+rem Enable debug mode
+rem SET KARAF_DEBUG
+IF "%JAVA_MAX_PERM_MEM%"=="" SET JAVA_MAX_PERM_MEM=512m
+IF "%JAVA_MAX_MEM%"=="" SET JAVA_MAX_MEM=2048m
index cdb6542..4a8f5ae 100644 (file)
@@ -94,6 +94,10 @@ ovsdb.listenPort=6640
 # default Openflow version = 1.0, we also support 1.3.
 # ovsdb.of.version=1.3
 
+# ovsdb can be configured with ml2 to perform l3 forwarding. The config below enables that functionality, which is
+# disabled by default.
+# ovsdb.l3.fwd.enabled=yes
+
 # ovsdb can be configured with ml2 to perform l3 forwarding. When used in that scenario, the mac address of the default
 # gateway --on the external subnet-- is expected to be resolved from its inet address. The config below overrides that
 # specific arp/neighDiscovery lookup.
index 530e46e..691d83d 100644 (file)
@@ -116,6 +116,10 @@ ovsdb.listenPort=6640
 # default Openflow version = 1.3, we also support 1.0.
 ovsdb.of.version=1.3
 
+# ovsdb can be configured with ml2 to perform l3 forwarding. The config below enables that functionality, which is
+# disabled by default.
+# ovsdb.l3.fwd.enabled=yes
+
 # ovsdb can be configured with ml2 to perform l3 forwarding. When used in that scenario, the mac address of the default
 # gateway --on the external subnet-- is expected to be resolved from its inet address. The config below overrides that
 # specific arp/neighDiscovery lookup.
index 8ed5206..5fbf127 100644 (file)
@@ -8,7 +8,6 @@
 package org.opendaylight.controller.md.sal.dom.broker.impl;
 
 import static com.google.common.base.Preconditions.checkState;
-import com.google.common.base.Optional;
 import com.google.common.util.concurrent.CheckedFuture;
 import com.google.common.util.concurrent.ListeningExecutorService;
 import java.util.EnumMap;
@@ -102,6 +101,6 @@ public class DOMDataBrokerImpl extends AbstractDOMForwardedTransactionFactory<DO
     public CheckedFuture<Void,TransactionCommitFailedException> submit(final DOMDataWriteTransaction transaction,
             final Iterable<DOMStoreThreePhaseCommitCohort> cohorts) {
         LOG.debug("Transaction: {} submitted with cohorts {}.", transaction.getIdentifier(), cohorts);
-        return coordinator.submit(transaction, cohorts, Optional.<DOMDataCommitErrorListener> absent());
+        return coordinator.submit(transaction, cohorts);
     }
 }
index 7cd6afa..0b1dd1c 100644 (file)
@@ -6,11 +6,14 @@
  */
 package org.opendaylight.controller.md.sal.dom.broker.impl;
 
-import com.google.common.base.Optional;
 import com.google.common.base.Preconditions;
 import com.google.common.util.concurrent.CheckedFuture;
+import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.Futures;
 import java.util.Map;
+import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
 import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
 import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
 import org.opendaylight.controller.md.sal.common.api.data.TransactionChainListener;
 import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException;
@@ -27,16 +30,27 @@ import org.slf4j.LoggerFactory;
  * {@link org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType} type.
  *
  */
-public class DOMDataBrokerTransactionChainImpl extends AbstractDOMForwardedTransactionFactory<DOMStoreTransactionChain>
-        implements DOMTransactionChain, DOMDataCommitErrorListener {
+final class DOMDataBrokerTransactionChainImpl extends AbstractDOMForwardedTransactionFactory<DOMStoreTransactionChain>
+        implements DOMTransactionChain {
+    private static enum State {
+        RUNNING,
+        CLOSING,
+        CLOSED,
+        FAILED,
+    }
 
+    private static final AtomicIntegerFieldUpdater<DOMDataBrokerTransactionChainImpl> COUNTER_UPDATER =
+            AtomicIntegerFieldUpdater.newUpdater(DOMDataBrokerTransactionChainImpl.class, "counter");
+    private static final AtomicReferenceFieldUpdater<DOMDataBrokerTransactionChainImpl, State> STATE_UPDATER =
+            AtomicReferenceFieldUpdater.newUpdater(DOMDataBrokerTransactionChainImpl.class, State.class, "state");
     private static final Logger LOG = LoggerFactory.getLogger(DOMDataBrokerTransactionChainImpl.class);
     private final AtomicLong txNum = new AtomicLong();
     private final DOMDataCommitExecutor coordinator;
     private final TransactionChainListener listener;
     private final long chainId;
 
-    private volatile boolean failed = false;
+    private volatile State state = State.RUNNING;
+    private volatile int counter = 0;
 
     /**
      *
@@ -62,37 +76,70 @@ public class DOMDataBrokerTransactionChainImpl extends AbstractDOMForwardedTrans
         this.listener = Preconditions.checkNotNull(listener);
     }
 
+    private void checkNotFailed() {
+        Preconditions.checkState(state != State.FAILED, "Transaction chain has failed");
+    }
+
     @Override
     protected Object newTransactionIdentifier() {
         return "DOM-CHAIN-" + chainId + "-" + txNum.getAndIncrement();
     }
 
     @Override
-    public CheckedFuture<Void,TransactionCommitFailedException> submit(
+    public CheckedFuture<Void, TransactionCommitFailedException> submit(
             final DOMDataWriteTransaction transaction, final Iterable<DOMStoreThreePhaseCommitCohort> cohorts) {
+        checkNotFailed();
         checkNotClosed();
 
-        return coordinator.submit(transaction, cohorts, Optional.<DOMDataCommitErrorListener> of(this));
+        final CheckedFuture<Void, TransactionCommitFailedException> ret = coordinator.submit(transaction, cohorts);
+
+        COUNTER_UPDATER.incrementAndGet(this);
+        Futures.addCallback(ret, new FutureCallback<Void>() {
+            @Override
+            public void onSuccess(final Void result) {
+                transactionCompleted();
+            }
+
+            @Override
+            public void onFailure(final Throwable t) {
+                transactionFailed(transaction, t);
+            }
+        });
+
+        return ret;
     }
 
     @Override
     public void close() {
-        super.close();
+        final boolean success = STATE_UPDATER.compareAndSet(this, State.RUNNING, State.CLOSING);
+        if (!success) {
+            LOG.debug("Chain {} is no longer running", this);
+            return;
+        }
 
+        super.close();
         for (DOMStoreTransactionChain subChain : getTxFactories().values()) {
             subChain.close();
         }
 
-        if (!failed) {
-            LOG.debug("Transaction chain {}¬†successfully finished.", this);
-            // FIXME: this event should be emitted once all operations complete
-            listener.onTransactionChainSuccessful(this);
+        if (counter == 0) {
+            finishClose();
         }
     }
 
-    @Override
-    public void onCommitFailed(final DOMDataWriteTransaction tx, final Throwable cause) {
-        failed = true;
+    private void finishClose() {
+        state = State.CLOSED;
+        listener.onTransactionChainSuccessful(this);
+    }
+
+    private void transactionCompleted() {
+        if (COUNTER_UPDATER.decrementAndGet(this) == 0 && state == State.CLOSING) {
+            finishClose();
+        }
+    }
+
+    private void transactionFailed(final DOMDataWriteTransaction tx, final Throwable cause) {
+        state = State.FAILED;
         LOG.debug("Transaction chain {}¬†failed.", this, cause);
         listener.onTransactionChainFailed(this, tx, cause);
     }
index 77cf105..15d7b1d 100644 (file)
@@ -6,7 +6,6 @@
  */
 package org.opendaylight.controller.md.sal.dom.broker.impl;
 
-import com.google.common.base.Optional;
 import com.google.common.base.Preconditions;
 import com.google.common.base.Throwables;
 import com.google.common.collect.Iterables;
@@ -63,16 +62,15 @@ public class DOMDataCommitCoordinatorImpl implements DOMDataCommitExecutor {
 
     @Override
     public CheckedFuture<Void,TransactionCommitFailedException> submit(final DOMDataWriteTransaction transaction,
-            final Iterable<DOMStoreThreePhaseCommitCohort> cohorts, final Optional<DOMDataCommitErrorListener> listener) {
+            final Iterable<DOMStoreThreePhaseCommitCohort> cohorts) {
         Preconditions.checkArgument(transaction != null, "Transaction must not be null.");
         Preconditions.checkArgument(cohorts != null, "Cohorts must not be null.");
-        Preconditions.checkArgument(listener != null, "Listener must not be null");
         LOG.debug("Tx: {} is submitted for execution.", transaction.getIdentifier());
 
         ListenableFuture<Void> commitFuture = null;
         try {
             commitFuture = executor.submit(new CommitCoordinationTask(transaction, cohorts,
-                    listener, commitStatsTracker));
+                    commitStatsTracker));
         } catch(RejectedExecutionException e) {
             LOG.error("The commit executor's queue is full - submit task was rejected. \n" +
                       executor, e);
@@ -81,10 +79,6 @@ public class DOMDataCommitCoordinatorImpl implements DOMDataCommitExecutor {
                         "Could not submit the commit task - the commit queue capacity has been exceeded.", e));
         }
 
-        if (listener.isPresent()) {
-            Futures.addCallback(commitFuture, new DOMDataCommitErrorInvoker(transaction, listener.get()));
-        }
-
         return MappingCheckedFuture.create(commitFuture,
                 TransactionCommitFailedExceptionMapper.COMMIT_ERROR_MAPPER);
     }
@@ -141,7 +135,6 @@ public class DOMDataCommitCoordinatorImpl implements DOMDataCommitExecutor {
 
         public CommitCoordinationTask(final DOMDataWriteTransaction transaction,
                 final Iterable<DOMStoreThreePhaseCommitCohort> cohorts,
-                final Optional<DOMDataCommitErrorListener> listener,
                 final DurationStatsTracker commitStatTracker) {
             this.tx = Preconditions.checkNotNull(transaction, "transaction must not be null");
             this.cohorts = Preconditions.checkNotNull(cohorts, "cohorts must not be null");
diff --git a/opendaylight/md-sal/sal-dom-broker/src/main/java/org/opendaylight/controller/md/sal/dom/broker/impl/DOMDataCommitErrorInvoker.java b/opendaylight/md-sal/sal-dom-broker/src/main/java/org/opendaylight/controller/md/sal/dom/broker/impl/DOMDataCommitErrorInvoker.java
deleted file mode 100644 (file)
index 5ce9241..0000000
+++ /dev/null
@@ -1,49 +0,0 @@
-/*
- * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,
- * and is available at http://www.eclipse.org/legal/epl-v10.html
- */
-package org.opendaylight.controller.md.sal.dom.broker.impl;
-
-import org.opendaylight.controller.md.sal.dom.api.DOMDataWriteTransaction;
-import com.google.common.base.Preconditions;
-import com.google.common.util.concurrent.FutureCallback;
-
-/**
- *
- * Utility implemetation of {@link FutureCallback} which is responsible
- * for invoking {@link DOMDataCommitErrorListener} on TransactionCommit failed.
- *
- * When {@link #onFailure(Throwable)} is invoked, supplied {@link DOMDataCommitErrorListener}
- * callback is invoked with associated transaction and throwable is invoked on listener.
- *
- */
-class DOMDataCommitErrorInvoker implements FutureCallback<Void> {
-
-    private final DOMDataWriteTransaction tx;
-    private final DOMDataCommitErrorListener listener;
-
-
-    /**
-     *
-     * Construct new DOMDataCommitErrorInvoker.
-     *
-     * @param transaction Transaction which should be passed as argument to {@link DOMDataCommitErrorListener#onCommitFailed(DOMDataWriteTransaction, Throwable)}
-     * @param listener Listener which should be invoked on error.
-     */
-    public DOMDataCommitErrorInvoker(DOMDataWriteTransaction transaction, DOMDataCommitErrorListener listener) {
-        this.tx = Preconditions.checkNotNull(transaction, "Transaction must not be null");
-        this.listener = Preconditions.checkNotNull(listener, "Listener must not be null");
-    }
-
-    @Override
-    public void onFailure(Throwable t) {
-        listener.onCommitFailed(tx, t);
-    }
-
-    @Override
-    public void onSuccess(Void result) {
-        // NOOP
-    }
-}
\ No newline at end of file
diff --git a/opendaylight/md-sal/sal-dom-broker/src/main/java/org/opendaylight/controller/md/sal/dom/broker/impl/DOMDataCommitErrorListener.java b/opendaylight/md-sal/sal-dom-broker/src/main/java/org/opendaylight/controller/md/sal/dom/broker/impl/DOMDataCommitErrorListener.java
deleted file mode 100644 (file)
index 80bc669..0000000
+++ /dev/null
@@ -1,45 +0,0 @@
-/*
- * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
- *
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,
- * and is available at http://www.eclipse.org/legal/epl-v10.html
- */
-package org.opendaylight.controller.md.sal.dom.broker.impl;
-
-import java.util.EventListener;
-
-import org.opendaylight.controller.md.sal.dom.api.DOMDataWriteTransaction;
-
-/**
- *
- * Listener on transaction failure which may be passed to
- * {@link DOMDataCommitExecutor}. This listener is notified during transaction
- * processing, before result is delivered to other client code outside MD-SAL.
- * This allows implementors to update their internal state before transaction
- * failure is visible to client code.
- *
- * This is internal API for MD-SAL implementations, for consumer facing error
- * listeners see {@link org.opendaylight.controller.md.sal.common.api.data.TransactionChainListener}.
- *
- */
-interface DOMDataCommitErrorListener extends EventListener {
-
-    /**
-     *
-     * Callback which is invoked on transaction failure during three phase
-     * commit in {@link DOMDataCommitExecutor}.
-     *
-     *
-     * Implementation of this callback MUST NOT do any blocking calls or any
-     * calls to MD-SAL, since this callback is invoked synchronously on MD-SAL
-     * Broker coordination thread.
-     *
-     * @param tx
-     *            Transaction which failed
-     * @param cause
-     *            Failure reason
-     */
-    void onCommitFailed(DOMDataWriteTransaction tx, Throwable cause);
-
-}
index 234758c..8aa97e7 100644 (file)
@@ -7,11 +7,10 @@
  */
 package org.opendaylight.controller.md.sal.dom.broker.impl;
 
+import com.google.common.util.concurrent.CheckedFuture;
 import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException;
 import org.opendaylight.controller.md.sal.dom.api.DOMDataWriteTransaction;
 import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
-import com.google.common.base.Optional;
-import com.google.common.util.concurrent.CheckedFuture;
 
 /**
  * Executor of Three Phase Commit coordination for
@@ -35,15 +34,13 @@ interface DOMDataCommitExecutor {
      *            Transaction to be used as context for reporting
      * @param cohort
      *            DOM Store cohorts representing provided transaction, its
-     *            subtransactoins.
-     * @param listener
-     *            Error listener which should be notified if transaction failed.
+     *            subtransactions.
      * @return a CheckedFuture. if commit coordination on cohorts finished successfully,
      *         nothing is returned from the Future, On failure,
      *         the Future fails with a {@link TransactionCommitFailedException}.
      *
      */
     CheckedFuture<Void,TransactionCommitFailedException> submit(DOMDataWriteTransaction tx,
-            Iterable<DOMStoreThreePhaseCommitCohort> cohort, Optional<DOMDataCommitErrorListener> listener);
+            Iterable<DOMStoreThreePhaseCommitCohort> cohort);
 
 }
index 361373d..d8602c2 100644 (file)
@@ -7,17 +7,8 @@
  */
 package org.opendaylight.md.controller.topology.manager;
 
-import static org.opendaylight.md.controller.topology.manager.FlowCapableNodeMapping.getNodeConnectorKey;
-import static org.opendaylight.md.controller.topology.manager.FlowCapableNodeMapping.getNodeKey;
-import static org.opendaylight.md.controller.topology.manager.FlowCapableNodeMapping.toTerminationPoint;
-import static org.opendaylight.md.controller.topology.manager.FlowCapableNodeMapping.toTerminationPointId;
-import static org.opendaylight.md.controller.topology.manager.FlowCapableNodeMapping.toTopologyLink;
-import static org.opendaylight.md.controller.topology.manager.FlowCapableNodeMapping.toTopologyNode;
-import static org.opendaylight.md.controller.topology.manager.FlowCapableNodeMapping.toTopologyNodeId;
-
-import java.util.Collection;
-import java.util.Collections;
-import java.util.List;
+import com.google.common.base.Optional;
+import com.google.common.base.Preconditions;
 import org.opendaylight.controller.md.sal.binding.api.ReadWriteTransaction;
 import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
 import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
@@ -48,12 +39,16 @@ import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import com.google.common.base.Optional;
-import com.google.common.base.Preconditions;
-import com.google.common.collect.Lists;
-import com.google.common.util.concurrent.CheckedFuture;
-import com.google.common.util.concurrent.FutureCallback;
-import com.google.common.util.concurrent.Futures;
+import java.util.Collections;
+import java.util.List;
+
+import static org.opendaylight.md.controller.topology.manager.FlowCapableNodeMapping.getNodeConnectorKey;
+import static org.opendaylight.md.controller.topology.manager.FlowCapableNodeMapping.getNodeKey;
+import static org.opendaylight.md.controller.topology.manager.FlowCapableNodeMapping.toTerminationPoint;
+import static org.opendaylight.md.controller.topology.manager.FlowCapableNodeMapping.toTerminationPointId;
+import static org.opendaylight.md.controller.topology.manager.FlowCapableNodeMapping.toTopologyLink;
+import static org.opendaylight.md.controller.topology.manager.FlowCapableNodeMapping.toTopologyNode;
+import static org.opendaylight.md.controller.topology.manager.FlowCapableNodeMapping.toTopologyNodeId;
 
 class FlowCapableTopologyExporter implements FlowTopologyDiscoveryListener, OpendaylightInventoryListener {
 
@@ -73,11 +68,20 @@ class FlowCapableTopologyExporter implements FlowTopologyDiscoveryListener, Open
         final NodeId nodeId = toTopologyNodeId(getNodeKey(notification.getNodeRef()).getId());
         final InstanceIdentifier<Node> nodeInstance = toNodeIdentifier(notification.getNodeRef());
 
+
         processor.enqueueOperation(new TopologyOperation() {
             @Override
             public void applyOperation(ReadWriteTransaction transaction) {
-                removeAffectedLinks(nodeId, transaction);
-                transaction.delete(LogicalDatastoreType.OPERATIONAL, nodeInstance);
+                Optional<Node> nodeOptional = Optional.absent();
+                try {
+                    nodeOptional = transaction.read(LogicalDatastoreType.OPERATIONAL, nodeInstance).checkedGet();
+                } catch (ReadFailedException e) {
+                    LOG.error("Error occured when trying to read Node ", e);
+                }
+                if (nodeOptional.isPresent()) {
+                    removeAffectedLinks(nodeId, transaction);
+                    transaction.delete(LogicalDatastoreType.OPERATIONAL, nodeInstance);
+                }
             }
 
             @Override
@@ -119,8 +123,16 @@ class FlowCapableTopologyExporter implements FlowTopologyDiscoveryListener, Open
         processor.enqueueOperation(new TopologyOperation() {
             @Override
             public void applyOperation(ReadWriteTransaction transaction) {
-                removeAffectedLinks(tpId, transaction);
-                transaction.delete(LogicalDatastoreType.OPERATIONAL, tpInstance);
+                Optional<TerminationPoint> terminationPointOptional = Optional.absent();
+                try {
+                    terminationPointOptional = transaction.read(LogicalDatastoreType.OPERATIONAL, tpInstance).checkedGet();
+                } catch (ReadFailedException e) {
+                    LOG.error("Error occured when trying to read NodeConnector ", e);
+                }
+                if (terminationPointOptional.isPresent()) {
+                    removeAffectedLinks(tpId, transaction);
+                    transaction.delete(LogicalDatastoreType.OPERATIONAL, tpInstance);
+                }
             }
 
             @Override
@@ -164,7 +176,7 @@ class FlowCapableTopologyExporter implements FlowTopologyDiscoveryListener, Open
             public void applyOperation(final ReadWriteTransaction transaction) {
                 final Link link = toTopologyLink(notification);
                 final InstanceIdentifier<Link> path = linkPath(link);
-                transaction.merge(LogicalDatastoreType.OPERATIONAL, path, link, true);
+                transaction.put(LogicalDatastoreType.OPERATIONAL, path, link, true);
             }
 
             @Override
@@ -184,7 +196,17 @@ class FlowCapableTopologyExporter implements FlowTopologyDiscoveryListener, Open
         processor.enqueueOperation(new TopologyOperation() {
             @Override
             public void applyOperation(final ReadWriteTransaction transaction) {
-                transaction.delete(LogicalDatastoreType.OPERATIONAL, linkPath(toTopologyLink(notification)));
+                Optional<Link> linkOptional = Optional.absent();
+                try {
+                    // read that checks if link exists (if we do not do this we might get an exception on delete)
+                    linkOptional = transaction.read(LogicalDatastoreType.OPERATIONAL,
+                            linkPath(toTopologyLink(notification))).checkedGet();
+                } catch (ReadFailedException e) {
+                    LOG.error("Error occured when trying to read Link ", e);
+                }
+                if (linkOptional.isPresent()) {
+                    transaction.delete(LogicalDatastoreType.OPERATIONAL, linkPath(toTopologyLink(notification)));
+                }
             }
 
             @Override
@@ -194,6 +216,7 @@ class FlowCapableTopologyExporter implements FlowTopologyDiscoveryListener, Open
         });
     }
 
+
     @Override
     public void onLinkUtilizationNormal(final LinkUtilizationNormal notification) {
         // NOOP
@@ -212,89 +235,57 @@ class FlowCapableTopologyExporter implements FlowTopologyDiscoveryListener, Open
     }
 
     private void removeAffectedLinks(final NodeId id, final ReadWriteTransaction transaction) {
-        CheckedFuture<Optional<Topology>, ReadFailedException> topologyDataFuture =
-                transaction.read(LogicalDatastoreType.OPERATIONAL, topology);
-        Futures.addCallback(topologyDataFuture, new FutureCallback<Optional<Topology>>() {
-            @Override
-            public void onSuccess(Optional<Topology> topologyOptional) {
-                removeAffectedLinks(id, topologyOptional);
-            }
-
-            @Override
-            public void onFailure(Throwable throwable) {
-                LOG.error("Error reading topology data for topology {}", topology, throwable);
-            }
-        });
+        Optional<Topology> topologyOptional = Optional.absent();
+        try {
+            topologyOptional = transaction.read(LogicalDatastoreType.OPERATIONAL, topology).checkedGet();
+        } catch (ReadFailedException e) {
+            LOG.error("Error reading topology data for topology {}", topology, e);
+        }
+        if (topologyOptional.isPresent()) {
+            removeAffectedLinks(id, topologyOptional, transaction);
+        }
     }
 
-    private void removeAffectedLinks(final NodeId id, Optional<Topology> topologyOptional) {
+    private void removeAffectedLinks(final NodeId id, Optional<Topology> topologyOptional, ReadWriteTransaction transaction) {
         if (!topologyOptional.isPresent()) {
             return;
         }
 
         List<Link> linkList = topologyOptional.get().getLink() != null ?
                 topologyOptional.get().getLink() : Collections.<Link> emptyList();
-        final List<InstanceIdentifier<Link>> linkIDsToDelete = Lists.newArrayList();
         for (Link link : linkList) {
             if (id.equals(link.getSource().getSourceNode()) ||
                     id.equals(link.getDestination().getDestNode())) {
-                linkIDsToDelete.add(linkPath(link));
+                transaction.delete(LogicalDatastoreType.OPERATIONAL, linkPath(link));
             }
         }
-
-        enqueueLinkDeletes(linkIDsToDelete);
-    }
-
-    private void enqueueLinkDeletes(final Collection<InstanceIdentifier<Link>> linkIDsToDelete) {
-        if(!linkIDsToDelete.isEmpty()) {
-            processor.enqueueOperation(new TopologyOperation() {
-                @Override
-                public void applyOperation(ReadWriteTransaction transaction) {
-                    for(InstanceIdentifier<Link> linkID: linkIDsToDelete) {
-                        transaction.delete(LogicalDatastoreType.OPERATIONAL, linkID);
-                    }
-                }
-
-                @Override
-                public String toString() {
-                    return "Delete Links " + linkIDsToDelete.size();
-                }
-            });
-        }
     }
 
     private void removeAffectedLinks(final TpId id, final ReadWriteTransaction transaction) {
-        CheckedFuture<Optional<Topology>, ReadFailedException> topologyDataFuture =
-                transaction.read(LogicalDatastoreType.OPERATIONAL, topology);
-        Futures.addCallback(topologyDataFuture, new FutureCallback<Optional<Topology>>() {
-            @Override
-            public void onSuccess(Optional<Topology> topologyOptional) {
-                removeAffectedLinks(id, topologyOptional);
-            }
-
-            @Override
-            public void onFailure(Throwable throwable) {
-                LOG.error("Error reading topology data for topology {}", topology, throwable);
-            }
-        });
+        Optional<Topology> topologyOptional = Optional.absent();
+        try {
+            topologyOptional = transaction.read(LogicalDatastoreType.OPERATIONAL, topology).checkedGet();
+        } catch (ReadFailedException e) {
+            LOG.error("Error reading topology data for topology {}", topology, e);
+        }
+        if (topologyOptional.isPresent()) {
+            removeAffectedLinks(id, topologyOptional, transaction);
+        }
     }
 
-    private void removeAffectedLinks(final TpId id, Optional<Topology> topologyOptional) {
+    private void removeAffectedLinks(final TpId id, Optional<Topology> topologyOptional, ReadWriteTransaction transaction) {
         if (!topologyOptional.isPresent()) {
             return;
         }
 
         List<Link> linkList = topologyOptional.get().getLink() != null
                 ? topologyOptional.get().getLink() : Collections.<Link> emptyList();
-        final List<InstanceIdentifier<Link>> linkIDsToDelete = Lists.newArrayList();
         for (Link link : linkList) {
             if (id.equals(link.getSource().getSourceTp()) ||
                     id.equals(link.getDestination().getDestTp())) {
-                linkIDsToDelete.add(linkPath(link));
+                transaction.delete(LogicalDatastoreType.OPERATIONAL, linkPath(link));
             }
         }
-
-        enqueueLinkDeletes(linkIDsToDelete);
     }
 
     private InstanceIdentifier<Node> getNodePath(final NodeId nodeId) {
index f09da00..41162d3 100644 (file)
@@ -8,13 +8,6 @@
 package org.opendaylight.md.controller.topology.manager;
 
 import com.google.common.base.Preconditions;
-import com.google.common.util.concurrent.CheckedFuture;
-import com.google.common.util.concurrent.FutureCallback;
-import com.google.common.util.concurrent.Futures;
-
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.LinkedBlockingQueue;
-
 import org.opendaylight.controller.md.sal.binding.api.BindingTransactionChain;
 import org.opendaylight.controller.md.sal.binding.api.DataBroker;
 import org.opendaylight.controller.md.sal.binding.api.ReadWriteTransaction;
@@ -25,6 +18,9 @@ import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFaile
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+
 final class OperationProcessor implements AutoCloseable, Runnable, TransactionChainListener {
     private static final Logger LOG = LoggerFactory.getLogger(OperationProcessor.class);
     private static final int MAX_TRANSACTION_OPERATIONS = 100;
@@ -32,7 +28,7 @@ final class OperationProcessor implements AutoCloseable, Runnable, TransactionCh
 
     private final BlockingQueue<TopologyOperation> queue = new LinkedBlockingQueue<>(OPERATION_QUEUE_DEPTH);
     private final DataBroker dataBroker;
-    private final BindingTransactionChain transactionChain;
+    private BindingTransactionChain transactionChain;
 
     OperationProcessor(final DataBroker dataBroker) {
         this.dataBroker = Preconditions.checkNotNull(dataBroker);
@@ -73,23 +69,32 @@ final class OperationProcessor implements AutoCloseable, Runnable, TransactionCh
 
                 LOG.debug("Processed {} operations, submitting transaction", ops);
 
-                CheckedFuture<Void, TransactionCommitFailedException> txResultFuture = tx.submit();
-                Futures.addCallback(txResultFuture, new FutureCallback<Void>() {
-                    @Override
-                    public void onSuccess(Void notUsed) {
-                        LOG.debug("Topology export successful for tx :{}", tx.getIdentifier());
-                    }
-
-                    @Override
-                    public void onFailure(Throwable throwable) {
-                        LOG.error("Topology export transaction {} failed", tx.getIdentifier(), throwable.getCause());
-                    }
-                });
+                try {
+                    tx.submit().checkedGet();
+                } catch (final TransactionCommitFailedException e) {
+                    LOG.warn("Stat DataStoreOperation unexpected State!", e);
+                    transactionChain.close();
+                    transactionChain = dataBroker.createTransactionChain(this);
+                    cleanDataStoreOperQueue();
+                }
             }
-        } catch (InterruptedException e) {
-            LOG.info("Interrupted processing, terminating", e);
+        } catch (final IllegalStateException e) {
+            LOG.warn("Stat DataStoreOperation unexpected State!", e);
+            transactionChain.close();
+            transactionChain = dataBroker.createTransactionChain(this);
+            cleanDataStoreOperQueue();
+        } catch (final InterruptedException e) {
+            LOG.warn("Stat Manager DS Operation thread interupted!", e);
+        } catch (final Exception e) {
+            LOG.warn("Stat DataStore Operation executor fail!", e);
         }
 
+        // Drain all events, making sure any blocked threads are unblocked
+        cleanDataStoreOperQueue();
+
+    }
+
+    private void cleanDataStoreOperQueue() {
         // Drain all events, making sure any blocked threads are unblocked
         while (!queue.isEmpty()) {
             queue.poll();
index b7a56a4..d07d42f 100644 (file)
@@ -8,29 +8,11 @@
 
 package org.opendaylight.md.controller.topology.manager;
 
-import static org.junit.Assert.fail;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.assertNotNull;
-import static org.mockito.Mockito.doReturn;
-import static org.mockito.Mockito.doAnswer;
-import static org.mockito.Mockito.any;
-import static org.mockito.Mockito.eq;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.inOrder;
-import static org.mockito.Mockito.atLeast;
-import static org.mockito.Mockito.never;
-import static org.mockito.Mockito.verify;
-
-import java.util.Arrays;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Set;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.TimeUnit;
-
+import com.google.common.base.Optional;
+import com.google.common.util.concurrent.CheckedFuture;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.SettableFuture;
+import com.google.common.util.concurrent.Uninterruptibles;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
@@ -82,17 +64,36 @@ import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.
 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.LinkBuilder;
 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.LinkKey;
 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.Node;
+import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.NodeBuilder;
 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.NodeKey;
 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.node.TerminationPoint;
+import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.node.TerminationPointBuilder;
 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.node.TerminationPointKey;
 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
 import org.opendaylight.yangtools.yang.binding.KeyedInstanceIdentifier;
 
-import com.google.common.base.Optional;
-import com.google.common.util.concurrent.CheckedFuture;
-import com.google.common.util.concurrent.Futures;
-import com.google.common.util.concurrent.SettableFuture;
-import com.google.common.util.concurrent.Uninterruptibles;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+import static org.mockito.Mockito.any;
+import static org.mockito.Mockito.atLeast;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.eq;
+import static org.mockito.Mockito.inOrder;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.verify;
 
 public class FlowCapableTopologyExporterTest {
 
@@ -135,8 +136,12 @@ public class FlowCapableTopologyExporterTest {
     @Test
     public void testOnNodeRemoved() {
 
+        NodeKey topoNodeKey = new NodeKey(new NodeId("node1"));
+        InstanceIdentifier<Node> topoNodeII = topologyIID.child(Node.class, topoNodeKey);
+        Node topoNode = new NodeBuilder().setKey(topoNodeKey).build();
+
         org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.NodeKey
-                                                                nodeKey = newInvNodeKey("node1");
+                                                                nodeKey = newInvNodeKey(topoNodeKey.getNodeId().getValue());
         InstanceIdentifier<?> invNodeID = InstanceIdentifier.create(Nodes.class).child(
                 org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node.class,
                 nodeKey);
@@ -154,10 +159,16 @@ public class FlowCapableTopologyExporterTest {
             };
 
         SettableFuture<Optional<Topology>> readFuture = SettableFuture.create();
+        readFuture.set(Optional.of(topology));
         ReadWriteTransaction mockTx1 = mock(ReadWriteTransaction.class);
         doReturn(Futures.makeChecked(readFuture, ReadFailedException.MAPPER)).when(mockTx1)
                 .read(LogicalDatastoreType.OPERATIONAL, topologyIID);
 
+        SettableFuture<Optional<Node>> readFutureNode = SettableFuture.create();
+        readFutureNode.set(Optional.of(topoNode));
+        doReturn(Futures.makeChecked(readFutureNode, ReadFailedException.MAPPER)).when(mockTx1)
+                .read(LogicalDatastoreType.OPERATIONAL, topoNodeII);
+
         CountDownLatch submitLatch1 = setupStubbedSubmit(mockTx1);
 
         int expDeleteCalls = expDeletedIIDs.length;
@@ -166,11 +177,7 @@ public class FlowCapableTopologyExporterTest {
                 ArgumentCaptor.forClass(InstanceIdentifier.class);
         setupStubbedDeletes(mockTx1, deletedLinkIDs, deleteLatch);
 
-        ReadWriteTransaction mockTx2 = mock(ReadWriteTransaction.class);
-        setupStubbedDeletes(mockTx2, deletedLinkIDs, deleteLatch);
-        CountDownLatch submitLatch2 = setupStubbedSubmit(mockTx2);
-
-        doReturn(mockTx1).doReturn(mockTx2).when(mockTxChain).newReadWriteTransaction();
+        doReturn(mockTx1).when(mockTxChain).newReadWriteTransaction();
 
         exporter.onNodeRemoved(new NodeRemovedBuilder().setNodeRef(new NodeRef(invNodeID)).build());
 
@@ -180,20 +187,21 @@ public class FlowCapableTopologyExporterTest {
 
         waitForDeletes(expDeleteCalls, deleteLatch);
 
-        waitForSubmit(submitLatch2);
-
         assertDeletedIDs(expDeletedIIDs, deletedLinkIDs);
 
         verifyMockTx(mockTx1);
-        verifyMockTx(mockTx2);
     }
 
     @SuppressWarnings({ "rawtypes" })
     @Test
     public void testOnNodeRemovedWithNoTopology() {
 
+        NodeKey topoNodeKey = new NodeKey(new NodeId("node1"));
+        InstanceIdentifier<Node> topoNodeII = topologyIID.child(Node.class, topoNodeKey);
+        Node topoNode = new NodeBuilder().setKey(topoNodeKey).build();
+
         org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.NodeKey
-                                                                nodeKey = newInvNodeKey("node1");
+                nodeKey = newInvNodeKey(topoNodeKey.getNodeId().getValue());
         InstanceIdentifier<?> invNodeID = InstanceIdentifier.create(Nodes.class).child(
                 org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node.class,
                 nodeKey);
@@ -207,6 +215,11 @@ public class FlowCapableTopologyExporterTest {
                 .read(LogicalDatastoreType.OPERATIONAL, topologyIID);
         CountDownLatch submitLatch = setupStubbedSubmit(mockTx);
 
+        SettableFuture<Optional<Node>> readFutureNode = SettableFuture.create();
+        readFutureNode.set(Optional.of(topoNode));
+        doReturn(Futures.makeChecked(readFutureNode, ReadFailedException.MAPPER)).when(mockTx)
+                .read(LogicalDatastoreType.OPERATIONAL, topoNodeII);
+
         CountDownLatch deleteLatch = new CountDownLatch(1);
         ArgumentCaptor<InstanceIdentifier> deletedLinkIDs =
                 ArgumentCaptor.forClass(InstanceIdentifier.class);
@@ -227,11 +240,18 @@ public class FlowCapableTopologyExporterTest {
     @Test
     public void testOnNodeConnectorRemoved() {
 
+        NodeKey topoNodeKey = new NodeKey(new NodeId("node1"));
+        TerminationPointKey terminationPointKey = new TerminationPointKey(new TpId("tp1"));
+
+        InstanceIdentifier<TerminationPoint> topoTermPointII = topologyIID.child(Node.class, topoNodeKey)
+                .child(TerminationPoint.class, terminationPointKey);
+        TerminationPoint topoTermPoint = new TerminationPointBuilder().setKey(terminationPointKey).build();
+
         org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.NodeKey
-                                                                  nodeKey = newInvNodeKey("node1");
+                                                                  nodeKey = newInvNodeKey(topoNodeKey.getNodeId().getValue());
 
         org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.node.NodeConnectorKey ncKey =
-                newInvNodeConnKey("tp1");
+                newInvNodeConnKey(terminationPointKey.getTpId().getValue());
 
         InstanceIdentifier<?> invNodeConnID = newNodeConnID(nodeKey, ncKey);
 
@@ -249,10 +269,16 @@ public class FlowCapableTopologyExporterTest {
             };
 
         final SettableFuture<Optional<Topology>> readFuture = SettableFuture.create();
+        readFuture.set(Optional.of(topology));
         ReadWriteTransaction mockTx1 = mock(ReadWriteTransaction.class);
         doReturn(Futures.makeChecked(readFuture, ReadFailedException.MAPPER)).when(mockTx1)
                 .read(LogicalDatastoreType.OPERATIONAL, topologyIID);
 
+        SettableFuture<Optional<TerminationPoint>> readFutureNode = SettableFuture.create();
+        readFutureNode.set(Optional.of(topoTermPoint));
+        doReturn(Futures.makeChecked(readFutureNode, ReadFailedException.MAPPER)).when(mockTx1)
+                .read(LogicalDatastoreType.OPERATIONAL, topoTermPointII);
+
         CountDownLatch submitLatch1 = setupStubbedSubmit(mockTx1);
 
         int expDeleteCalls = expDeletedIIDs.length;
@@ -261,11 +287,8 @@ public class FlowCapableTopologyExporterTest {
                 ArgumentCaptor.forClass(InstanceIdentifier.class);
         setupStubbedDeletes(mockTx1, deletedLinkIDs, deleteLatch);
 
-        ReadWriteTransaction mockTx2 = mock(ReadWriteTransaction.class);
-        setupStubbedDeletes(mockTx2, deletedLinkIDs, deleteLatch);
-        CountDownLatch submitLatch2 = setupStubbedSubmit(mockTx2);
 
-        doReturn(mockTx1).doReturn(mockTx2).when(mockTxChain).newReadWriteTransaction();
+        doReturn(mockTx1).when(mockTxChain).newReadWriteTransaction();
 
         exporter.onNodeConnectorRemoved(new NodeConnectorRemovedBuilder().setNodeConnectorRef(
                 new NodeConnectorRef(invNodeConnID)).build());
@@ -276,23 +299,27 @@ public class FlowCapableTopologyExporterTest {
 
         waitForDeletes(expDeleteCalls, deleteLatch);
 
-        waitForSubmit(submitLatch2);
-
         assertDeletedIDs(expDeletedIIDs, deletedLinkIDs);
 
         verifyMockTx(mockTx1);
-        verifyMockTx(mockTx2);
     }
 
     @SuppressWarnings("rawtypes")
     @Test
     public void testOnNodeConnectorRemovedWithNoTopology() {
 
+        NodeKey topoNodeKey = new NodeKey(new NodeId("node1"));
+        TerminationPointKey terminationPointKey = new TerminationPointKey(new TpId("tp1"));
+
+        InstanceIdentifier<TerminationPoint> topoTermPointII = topologyIID.child(Node.class, topoNodeKey)
+                .child(TerminationPoint.class, terminationPointKey);
+        TerminationPoint topoTermPoint = new TerminationPointBuilder().setKey(terminationPointKey).build();
+
         org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.NodeKey
-                                                                  nodeKey = newInvNodeKey("node1");
+                nodeKey = newInvNodeKey(topoNodeKey.getNodeId().getValue());
 
         org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.node.NodeConnectorKey ncKey =
-                newInvNodeConnKey("tp1");
+                newInvNodeConnKey(terminationPointKey.getTpId().getValue());
 
         InstanceIdentifier<?> invNodeConnID = newNodeConnID(nodeKey, ncKey);
 
@@ -306,6 +333,11 @@ public class FlowCapableTopologyExporterTest {
                 .read(LogicalDatastoreType.OPERATIONAL, topologyIID);
         CountDownLatch submitLatch = setupStubbedSubmit(mockTx);
 
+        SettableFuture<Optional<TerminationPoint>> readFutureNode = SettableFuture.create();
+        readFutureNode.set(Optional.of(topoTermPoint));
+        doReturn(Futures.makeChecked(readFutureNode, ReadFailedException.MAPPER)).when(mockTx)
+                .read(LogicalDatastoreType.OPERATIONAL, topoTermPointII);
+
         CountDownLatch deleteLatch = new CountDownLatch(1);
         ArgumentCaptor<InstanceIdentifier> deletedLinkIDs =
                 ArgumentCaptor.forClass(InstanceIdentifier.class);
@@ -510,8 +542,8 @@ public class FlowCapableTopologyExporterTest {
         waitForSubmit(submitLatch);
 
         ArgumentCaptor<Link> mergedNode = ArgumentCaptor.forClass(Link.class);
-        verify(mockTx).merge(eq(LogicalDatastoreType.OPERATIONAL), eq(topologyIID.child(
-                Link.class, new LinkKey(new LinkId(sourceNodeConnKey.getId())))),
+        verify(mockTx).put(eq(LogicalDatastoreType.OPERATIONAL), eq(topologyIID.child(
+                        Link.class, new LinkKey(new LinkId(sourceNodeConnKey.getId())))),
                 mergedNode.capture(), eq(true));
         assertEquals("Source node ID", "sourceNode",
                 mergedNode.getValue().getSource().getSourceNode().getValue());
@@ -524,7 +556,7 @@ public class FlowCapableTopologyExporterTest {
     }
 
     @Test
-    public void testOnLinkRemoved() {
+    public void testOnLinkRemovedLinkExists() {
 
         org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.NodeKey
                 sourceNodeKey = newInvNodeKey("sourceNode");
@@ -538,13 +570,18 @@ public class FlowCapableTopologyExporterTest {
                 destNodeConnKey = newInvNodeConnKey("destTP");
         InstanceIdentifier<?> destConnID = newNodeConnID(destNodeKey, destNodeConnKey);
 
+        Link link = newLink(sourceNodeConnKey.getId().getValue(), newSourceTp(sourceNodeConnKey.getId().getValue()),
+                newDestTp(destNodeConnKey.getId().getValue()));
+
         ReadWriteTransaction mockTx = mock(ReadWriteTransaction.class);
         CountDownLatch submitLatch = setupStubbedSubmit(mockTx);
         doReturn(mockTx).when(mockTxChain).newReadWriteTransaction();
+        doReturn(Futures.immediateCheckedFuture(Optional.of(link))).when(mockTx).read(LogicalDatastoreType.OPERATIONAL, topologyIID.child(
+                Link.class, new LinkKey(new LinkId(sourceNodeConnKey.getId()))));
 
         exporter.onLinkRemoved(new LinkRemovedBuilder().setSource(
                 new NodeConnectorRef(sourceConnID)).setDestination(
-                        new NodeConnectorRef(destConnID)).build());
+                new NodeConnectorRef(destConnID)).build());
 
         waitForSubmit(submitLatch);
 
@@ -552,6 +589,37 @@ public class FlowCapableTopologyExporterTest {
                 Link.class, new LinkKey(new LinkId(sourceNodeConnKey.getId()))));
     }
 
+    @Test
+    public void testOnLinkRemovedLinkDoesNotExist() {
+
+        org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.NodeKey
+                sourceNodeKey = newInvNodeKey("sourceNode");
+        org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.node.NodeConnectorKey
+                sourceNodeConnKey = newInvNodeConnKey("sourceTP");
+        InstanceIdentifier<?> sourceConnID = newNodeConnID(sourceNodeKey, sourceNodeConnKey);
+
+        org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.NodeKey
+                destNodeKey = newInvNodeKey("destNode");
+        org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.node.NodeConnectorKey
+                destNodeConnKey = newInvNodeConnKey("destTP");
+        InstanceIdentifier<?> destConnID = newNodeConnID(destNodeKey, destNodeConnKey);
+
+        ReadWriteTransaction mockTx = mock(ReadWriteTransaction.class);
+        CountDownLatch submitLatch = setupStubbedSubmit(mockTx);
+        doReturn(mockTx).when(mockTxChain).newReadWriteTransaction();
+        doReturn(Futures.immediateCheckedFuture(Optional.<Link>absent())).when(mockTx).read(LogicalDatastoreType.OPERATIONAL, topologyIID.child(
+                Link.class, new LinkKey(new LinkId(sourceNodeConnKey.getId()))));
+
+        exporter.onLinkRemoved(new LinkRemovedBuilder().setSource(
+                new NodeConnectorRef(sourceConnID)).setDestination(
+                new NodeConnectorRef(destConnID)).build());
+
+        waitForSubmit(submitLatch);
+
+        verify(mockTx, never()).delete(LogicalDatastoreType.OPERATIONAL, topologyIID.child(
+                Link.class, new LinkKey(new LinkId(sourceNodeConnKey.getId()))));
+    }
+
     private void verifyMockTx(ReadWriteTransaction mockTx) {
         InOrder inOrder = inOrder(mockTx);
         inOrder.verify(mockTx, atLeast(0)).submit();