*/
public class FeatureConfigPusher {
private static final Logger logger = LoggerFactory.getLogger(FeatureConfigPusher.class);
+ private static final int MAX_RETRIES=100;
private FeaturesService featuresService = null;
private ConfigPusher pusher = null;
/*
}
private boolean isInstalled(Feature feature) {
- List<Feature> installedFeatures = Arrays.asList(featuresService.listInstalledFeatures());
+ List<Feature> installedFeatures= null;
+ boolean cont = true;
+ int retries = 0;
+ while(cont) {
+ try {
+ installedFeatures = Arrays.asList(featuresService.listInstalledFeatures());
+ break;
+ } catch (Exception e) {
+ if(retries < MAX_RETRIES) {
+ logger.warn("Karaf featuresService.listInstalledFeatures() has thrown an exception, retry {}, Exception {}", retries,e);
+ try {
+ Thread.sleep(1);
+ } catch (InterruptedException e1) {
+ throw new IllegalStateException(e1);
+ }
+ retries++;
+ continue;
+ } else {
+ logger.error("Giving up on Karaf featuresService.listInstalledFeatures() which has thrown an exception, retry {}, Exception {}", retries,e);
+ throw e;
+ }
+ }
+ }
return installedFeatures.contains(feature);
}
--- /dev/null
+@echo off
+rem
+rem
+rem Licensed to the Apache Software Foundation (ASF) under one or more
+rem contributor license agreements. See the NOTICE file distributed with
+rem this work for additional information regarding copyright ownership.
+rem The ASF licenses this file to You under the Apache License, Version 2.0
+rem (the "License"); you may not use this file except in compliance with
+rem the License. You may obtain a copy of the License at
+rem
+rem http://www.apache.org/licenses/LICENSE-2.0
+rem
+rem Unless required by applicable law or agreed to in writing, software
+rem distributed under the License is distributed on an "AS IS" BASIS,
+rem WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+rem See the License for the specific language governing permissions and
+rem limitations under the License.
+rem
+
+rem
+rem handle specific scripts; the SCRIPT_NAME is exactly the name of the Karaf
+rem script; for example karaf.bat, start.bat, stop.bat, admin.bat, client.bat, ...
+rem
+rem if "%KARAF_SCRIPT%" == "SCRIPT_NAME" (
+rem Actions go here...
+rem )
+
+rem
+rem general settings which should be applied for all scripts go here; please keep
+rem in mind that it is possible that scripts might be executed more than once, e.g.
+rem in example of the start script where the start script is executed first and the
+rem karaf script afterwards.
+rem
+
+rem
+rem The following section shows the possible configuration options for the default
+rem karaf scripts
+rem
+rem Window name of the windows console
+rem SET KARAF_TITLE
+rem Location of Java installation
+rem SET JAVA_HOME
+rem Minimum memory for the JVM
+rem SET JAVA_MIN_MEM
+rem Maximum memory for the JVM
+rem SET JAVA_MAX_MEM
+rem Minimum perm memory for the JVM
+rem SET JAVA_PERM_MEM
+rem Maximum perm memory for the JVM
+rem SET JAVA_MAX_PERM_MEM
+rem Karaf home folder
+rem SET KARAF_HOME
+rem Karaf data folder
+rem SET KARAF_DATA
+rem Karaf base folder
+rem SET KARAF_BASE
+rem Karaf etc folder
+rem SET KARAF_ETC
+rem Additional available Karaf options
+rem SET KARAF_OPTS
+rem Enable debug mode
+rem SET KARAF_DEBUG
+IF "%JAVA_MAX_PERM_MEM%"=="" SET JAVA_MAX_PERM_MEM=512m
+IF "%JAVA_MAX_MEM%"=="" SET JAVA_MAX_MEM=2048m
# default Openflow version = 1.0, we also support 1.3.
# ovsdb.of.version=1.3
+# ovsdb can be configured with ml2 to perform l3 forwarding. The config below enables that functionality, which is
+# disabled by default.
+# ovsdb.l3.fwd.enabled=yes
+
# ovsdb can be configured with ml2 to perform l3 forwarding. When used in that scenario, the mac address of the default
# gateway --on the external subnet-- is expected to be resolved from its inet address. The config below overrides that
# specific arp/neighDiscovery lookup.
# default Openflow version = 1.3, we also support 1.0.
ovsdb.of.version=1.3
+# ovsdb can be configured with ml2 to perform l3 forwarding. The config below enables that functionality, which is
+# disabled by default.
+# ovsdb.l3.fwd.enabled=yes
+
# ovsdb can be configured with ml2 to perform l3 forwarding. When used in that scenario, the mac address of the default
# gateway --on the external subnet-- is expected to be resolved from its inet address. The config below overrides that
# specific arp/neighDiscovery lookup.
package org.opendaylight.controller.md.sal.dom.broker.impl;
import static com.google.common.base.Preconditions.checkState;
-import com.google.common.base.Optional;
import com.google.common.util.concurrent.CheckedFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
import java.util.EnumMap;
public CheckedFuture<Void,TransactionCommitFailedException> submit(final DOMDataWriteTransaction transaction,
final Iterable<DOMStoreThreePhaseCommitCohort> cohorts) {
LOG.debug("Transaction: {} submitted with cohorts {}.", transaction.getIdentifier(), cohorts);
- return coordinator.submit(transaction, cohorts, Optional.<DOMDataCommitErrorListener> absent());
+ return coordinator.submit(transaction, cohorts);
}
}
*/
package org.opendaylight.controller.md.sal.dom.broker.impl;
-import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
import com.google.common.util.concurrent.CheckedFuture;
+import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.Futures;
import java.util.Map;
+import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
import org.opendaylight.controller.md.sal.common.api.data.TransactionChainListener;
import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException;
* {@link org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType} type.
*
*/
-public class DOMDataBrokerTransactionChainImpl extends AbstractDOMForwardedTransactionFactory<DOMStoreTransactionChain>
- implements DOMTransactionChain, DOMDataCommitErrorListener {
+final class DOMDataBrokerTransactionChainImpl extends AbstractDOMForwardedTransactionFactory<DOMStoreTransactionChain>
+ implements DOMTransactionChain {
+ private static enum State {
+ RUNNING,
+ CLOSING,
+ CLOSED,
+ FAILED,
+ }
+ private static final AtomicIntegerFieldUpdater<DOMDataBrokerTransactionChainImpl> COUNTER_UPDATER =
+ AtomicIntegerFieldUpdater.newUpdater(DOMDataBrokerTransactionChainImpl.class, "counter");
+ private static final AtomicReferenceFieldUpdater<DOMDataBrokerTransactionChainImpl, State> STATE_UPDATER =
+ AtomicReferenceFieldUpdater.newUpdater(DOMDataBrokerTransactionChainImpl.class, State.class, "state");
private static final Logger LOG = LoggerFactory.getLogger(DOMDataBrokerTransactionChainImpl.class);
private final AtomicLong txNum = new AtomicLong();
private final DOMDataCommitExecutor coordinator;
private final TransactionChainListener listener;
private final long chainId;
- private volatile boolean failed = false;
+ private volatile State state = State.RUNNING;
+ private volatile int counter = 0;
/**
*
this.listener = Preconditions.checkNotNull(listener);
}
+ private void checkNotFailed() {
+ Preconditions.checkState(state != State.FAILED, "Transaction chain has failed");
+ }
+
@Override
protected Object newTransactionIdentifier() {
return "DOM-CHAIN-" + chainId + "-" + txNum.getAndIncrement();
}
@Override
- public CheckedFuture<Void,TransactionCommitFailedException> submit(
+ public CheckedFuture<Void, TransactionCommitFailedException> submit(
final DOMDataWriteTransaction transaction, final Iterable<DOMStoreThreePhaseCommitCohort> cohorts) {
+ checkNotFailed();
checkNotClosed();
- return coordinator.submit(transaction, cohorts, Optional.<DOMDataCommitErrorListener> of(this));
+ final CheckedFuture<Void, TransactionCommitFailedException> ret = coordinator.submit(transaction, cohorts);
+
+ COUNTER_UPDATER.incrementAndGet(this);
+ Futures.addCallback(ret, new FutureCallback<Void>() {
+ @Override
+ public void onSuccess(final Void result) {
+ transactionCompleted();
+ }
+
+ @Override
+ public void onFailure(final Throwable t) {
+ transactionFailed(transaction, t);
+ }
+ });
+
+ return ret;
}
@Override
public void close() {
- super.close();
+ final boolean success = STATE_UPDATER.compareAndSet(this, State.RUNNING, State.CLOSING);
+ if (!success) {
+ LOG.debug("Chain {} is no longer running", this);
+ return;
+ }
+ super.close();
for (DOMStoreTransactionChain subChain : getTxFactories().values()) {
subChain.close();
}
- if (!failed) {
- LOG.debug("Transaction chain {} successfully finished.", this);
- // FIXME: this event should be emitted once all operations complete
- listener.onTransactionChainSuccessful(this);
+ if (counter == 0) {
+ finishClose();
}
}
- @Override
- public void onCommitFailed(final DOMDataWriteTransaction tx, final Throwable cause) {
- failed = true;
+ private void finishClose() {
+ state = State.CLOSED;
+ listener.onTransactionChainSuccessful(this);
+ }
+
+ private void transactionCompleted() {
+ if (COUNTER_UPDATER.decrementAndGet(this) == 0 && state == State.CLOSING) {
+ finishClose();
+ }
+ }
+
+ private void transactionFailed(final DOMDataWriteTransaction tx, final Throwable cause) {
+ state = State.FAILED;
LOG.debug("Transaction chain {} failed.", this, cause);
listener.onTransactionChainFailed(this, tx, cause);
}
*/
package org.opendaylight.controller.md.sal.dom.broker.impl;
-import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
import com.google.common.base.Throwables;
import com.google.common.collect.Iterables;
@Override
public CheckedFuture<Void,TransactionCommitFailedException> submit(final DOMDataWriteTransaction transaction,
- final Iterable<DOMStoreThreePhaseCommitCohort> cohorts, final Optional<DOMDataCommitErrorListener> listener) {
+ final Iterable<DOMStoreThreePhaseCommitCohort> cohorts) {
Preconditions.checkArgument(transaction != null, "Transaction must not be null.");
Preconditions.checkArgument(cohorts != null, "Cohorts must not be null.");
- Preconditions.checkArgument(listener != null, "Listener must not be null");
LOG.debug("Tx: {} is submitted for execution.", transaction.getIdentifier());
ListenableFuture<Void> commitFuture = null;
try {
commitFuture = executor.submit(new CommitCoordinationTask(transaction, cohorts,
- listener, commitStatsTracker));
+ commitStatsTracker));
} catch(RejectedExecutionException e) {
LOG.error("The commit executor's queue is full - submit task was rejected. \n" +
executor, e);
"Could not submit the commit task - the commit queue capacity has been exceeded.", e));
}
- if (listener.isPresent()) {
- Futures.addCallback(commitFuture, new DOMDataCommitErrorInvoker(transaction, listener.get()));
- }
-
return MappingCheckedFuture.create(commitFuture,
TransactionCommitFailedExceptionMapper.COMMIT_ERROR_MAPPER);
}
public CommitCoordinationTask(final DOMDataWriteTransaction transaction,
final Iterable<DOMStoreThreePhaseCommitCohort> cohorts,
- final Optional<DOMDataCommitErrorListener> listener,
final DurationStatsTracker commitStatTracker) {
this.tx = Preconditions.checkNotNull(transaction, "transaction must not be null");
this.cohorts = Preconditions.checkNotNull(cohorts, "cohorts must not be null");
+++ /dev/null
-/*
- * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,
- * and is available at http://www.eclipse.org/legal/epl-v10.html
- */
-package org.opendaylight.controller.md.sal.dom.broker.impl;
-
-import org.opendaylight.controller.md.sal.dom.api.DOMDataWriteTransaction;
-import com.google.common.base.Preconditions;
-import com.google.common.util.concurrent.FutureCallback;
-
-/**
- *
- * Utility implemetation of {@link FutureCallback} which is responsible
- * for invoking {@link DOMDataCommitErrorListener} on TransactionCommit failed.
- *
- * When {@link #onFailure(Throwable)} is invoked, supplied {@link DOMDataCommitErrorListener}
- * callback is invoked with associated transaction and throwable is invoked on listener.
- *
- */
-class DOMDataCommitErrorInvoker implements FutureCallback<Void> {
-
- private final DOMDataWriteTransaction tx;
- private final DOMDataCommitErrorListener listener;
-
-
- /**
- *
- * Construct new DOMDataCommitErrorInvoker.
- *
- * @param transaction Transaction which should be passed as argument to {@link DOMDataCommitErrorListener#onCommitFailed(DOMDataWriteTransaction, Throwable)}
- * @param listener Listener which should be invoked on error.
- */
- public DOMDataCommitErrorInvoker(DOMDataWriteTransaction transaction, DOMDataCommitErrorListener listener) {
- this.tx = Preconditions.checkNotNull(transaction, "Transaction must not be null");
- this.listener = Preconditions.checkNotNull(listener, "Listener must not be null");
- }
-
- @Override
- public void onFailure(Throwable t) {
- listener.onCommitFailed(tx, t);
- }
-
- @Override
- public void onSuccess(Void result) {
- // NOOP
- }
-}
\ No newline at end of file
+++ /dev/null
-/*
- * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
- *
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,
- * and is available at http://www.eclipse.org/legal/epl-v10.html
- */
-package org.opendaylight.controller.md.sal.dom.broker.impl;
-
-import java.util.EventListener;
-
-import org.opendaylight.controller.md.sal.dom.api.DOMDataWriteTransaction;
-
-/**
- *
- * Listener on transaction failure which may be passed to
- * {@link DOMDataCommitExecutor}. This listener is notified during transaction
- * processing, before result is delivered to other client code outside MD-SAL.
- * This allows implementors to update their internal state before transaction
- * failure is visible to client code.
- *
- * This is internal API for MD-SAL implementations, for consumer facing error
- * listeners see {@link org.opendaylight.controller.md.sal.common.api.data.TransactionChainListener}.
- *
- */
-interface DOMDataCommitErrorListener extends EventListener {
-
- /**
- *
- * Callback which is invoked on transaction failure during three phase
- * commit in {@link DOMDataCommitExecutor}.
- *
- *
- * Implementation of this callback MUST NOT do any blocking calls or any
- * calls to MD-SAL, since this callback is invoked synchronously on MD-SAL
- * Broker coordination thread.
- *
- * @param tx
- * Transaction which failed
- * @param cause
- * Failure reason
- */
- void onCommitFailed(DOMDataWriteTransaction tx, Throwable cause);
-
-}
*/
package org.opendaylight.controller.md.sal.dom.broker.impl;
+import com.google.common.util.concurrent.CheckedFuture;
import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException;
import org.opendaylight.controller.md.sal.dom.api.DOMDataWriteTransaction;
import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
-import com.google.common.base.Optional;
-import com.google.common.util.concurrent.CheckedFuture;
/**
* Executor of Three Phase Commit coordination for
* Transaction to be used as context for reporting
* @param cohort
* DOM Store cohorts representing provided transaction, its
- * subtransactoins.
- * @param listener
- * Error listener which should be notified if transaction failed.
+ * subtransactions.
* @return a CheckedFuture. if commit coordination on cohorts finished successfully,
* nothing is returned from the Future, On failure,
* the Future fails with a {@link TransactionCommitFailedException}.
*
*/
CheckedFuture<Void,TransactionCommitFailedException> submit(DOMDataWriteTransaction tx,
- Iterable<DOMStoreThreePhaseCommitCohort> cohort, Optional<DOMDataCommitErrorListener> listener);
+ Iterable<DOMStoreThreePhaseCommitCohort> cohort);
}
*/
package org.opendaylight.md.controller.topology.manager;
-import static org.opendaylight.md.controller.topology.manager.FlowCapableNodeMapping.getNodeConnectorKey;
-import static org.opendaylight.md.controller.topology.manager.FlowCapableNodeMapping.getNodeKey;
-import static org.opendaylight.md.controller.topology.manager.FlowCapableNodeMapping.toTerminationPoint;
-import static org.opendaylight.md.controller.topology.manager.FlowCapableNodeMapping.toTerminationPointId;
-import static org.opendaylight.md.controller.topology.manager.FlowCapableNodeMapping.toTopologyLink;
-import static org.opendaylight.md.controller.topology.manager.FlowCapableNodeMapping.toTopologyNode;
-import static org.opendaylight.md.controller.topology.manager.FlowCapableNodeMapping.toTopologyNodeId;
-
-import java.util.Collection;
-import java.util.Collections;
-import java.util.List;
+import com.google.common.base.Optional;
+import com.google.common.base.Preconditions;
import org.opendaylight.controller.md.sal.binding.api.ReadWriteTransaction;
import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import com.google.common.base.Optional;
-import com.google.common.base.Preconditions;
-import com.google.common.collect.Lists;
-import com.google.common.util.concurrent.CheckedFuture;
-import com.google.common.util.concurrent.FutureCallback;
-import com.google.common.util.concurrent.Futures;
+import java.util.Collections;
+import java.util.List;
+
+import static org.opendaylight.md.controller.topology.manager.FlowCapableNodeMapping.getNodeConnectorKey;
+import static org.opendaylight.md.controller.topology.manager.FlowCapableNodeMapping.getNodeKey;
+import static org.opendaylight.md.controller.topology.manager.FlowCapableNodeMapping.toTerminationPoint;
+import static org.opendaylight.md.controller.topology.manager.FlowCapableNodeMapping.toTerminationPointId;
+import static org.opendaylight.md.controller.topology.manager.FlowCapableNodeMapping.toTopologyLink;
+import static org.opendaylight.md.controller.topology.manager.FlowCapableNodeMapping.toTopologyNode;
+import static org.opendaylight.md.controller.topology.manager.FlowCapableNodeMapping.toTopologyNodeId;
class FlowCapableTopologyExporter implements FlowTopologyDiscoveryListener, OpendaylightInventoryListener {
final NodeId nodeId = toTopologyNodeId(getNodeKey(notification.getNodeRef()).getId());
final InstanceIdentifier<Node> nodeInstance = toNodeIdentifier(notification.getNodeRef());
+
processor.enqueueOperation(new TopologyOperation() {
@Override
public void applyOperation(ReadWriteTransaction transaction) {
- removeAffectedLinks(nodeId, transaction);
- transaction.delete(LogicalDatastoreType.OPERATIONAL, nodeInstance);
+ Optional<Node> nodeOptional = Optional.absent();
+ try {
+ nodeOptional = transaction.read(LogicalDatastoreType.OPERATIONAL, nodeInstance).checkedGet();
+ } catch (ReadFailedException e) {
+ LOG.error("Error occured when trying to read Node ", e);
+ }
+ if (nodeOptional.isPresent()) {
+ removeAffectedLinks(nodeId, transaction);
+ transaction.delete(LogicalDatastoreType.OPERATIONAL, nodeInstance);
+ }
}
@Override
processor.enqueueOperation(new TopologyOperation() {
@Override
public void applyOperation(ReadWriteTransaction transaction) {
- removeAffectedLinks(tpId, transaction);
- transaction.delete(LogicalDatastoreType.OPERATIONAL, tpInstance);
+ Optional<TerminationPoint> terminationPointOptional = Optional.absent();
+ try {
+ terminationPointOptional = transaction.read(LogicalDatastoreType.OPERATIONAL, tpInstance).checkedGet();
+ } catch (ReadFailedException e) {
+ LOG.error("Error occured when trying to read NodeConnector ", e);
+ }
+ if (terminationPointOptional.isPresent()) {
+ removeAffectedLinks(tpId, transaction);
+ transaction.delete(LogicalDatastoreType.OPERATIONAL, tpInstance);
+ }
}
@Override
public void applyOperation(final ReadWriteTransaction transaction) {
final Link link = toTopologyLink(notification);
final InstanceIdentifier<Link> path = linkPath(link);
- transaction.merge(LogicalDatastoreType.OPERATIONAL, path, link, true);
+ transaction.put(LogicalDatastoreType.OPERATIONAL, path, link, true);
}
@Override
processor.enqueueOperation(new TopologyOperation() {
@Override
public void applyOperation(final ReadWriteTransaction transaction) {
- transaction.delete(LogicalDatastoreType.OPERATIONAL, linkPath(toTopologyLink(notification)));
+ Optional<Link> linkOptional = Optional.absent();
+ try {
+ // read that checks if link exists (if we do not do this we might get an exception on delete)
+ linkOptional = transaction.read(LogicalDatastoreType.OPERATIONAL,
+ linkPath(toTopologyLink(notification))).checkedGet();
+ } catch (ReadFailedException e) {
+ LOG.error("Error occured when trying to read Link ", e);
+ }
+ if (linkOptional.isPresent()) {
+ transaction.delete(LogicalDatastoreType.OPERATIONAL, linkPath(toTopologyLink(notification)));
+ }
}
@Override
});
}
+
@Override
public void onLinkUtilizationNormal(final LinkUtilizationNormal notification) {
// NOOP
}
private void removeAffectedLinks(final NodeId id, final ReadWriteTransaction transaction) {
- CheckedFuture<Optional<Topology>, ReadFailedException> topologyDataFuture =
- transaction.read(LogicalDatastoreType.OPERATIONAL, topology);
- Futures.addCallback(topologyDataFuture, new FutureCallback<Optional<Topology>>() {
- @Override
- public void onSuccess(Optional<Topology> topologyOptional) {
- removeAffectedLinks(id, topologyOptional);
- }
-
- @Override
- public void onFailure(Throwable throwable) {
- LOG.error("Error reading topology data for topology {}", topology, throwable);
- }
- });
+ Optional<Topology> topologyOptional = Optional.absent();
+ try {
+ topologyOptional = transaction.read(LogicalDatastoreType.OPERATIONAL, topology).checkedGet();
+ } catch (ReadFailedException e) {
+ LOG.error("Error reading topology data for topology {}", topology, e);
+ }
+ if (topologyOptional.isPresent()) {
+ removeAffectedLinks(id, topologyOptional, transaction);
+ }
}
- private void removeAffectedLinks(final NodeId id, Optional<Topology> topologyOptional) {
+ private void removeAffectedLinks(final NodeId id, Optional<Topology> topologyOptional, ReadWriteTransaction transaction) {
if (!topologyOptional.isPresent()) {
return;
}
List<Link> linkList = topologyOptional.get().getLink() != null ?
topologyOptional.get().getLink() : Collections.<Link> emptyList();
- final List<InstanceIdentifier<Link>> linkIDsToDelete = Lists.newArrayList();
for (Link link : linkList) {
if (id.equals(link.getSource().getSourceNode()) ||
id.equals(link.getDestination().getDestNode())) {
- linkIDsToDelete.add(linkPath(link));
+ transaction.delete(LogicalDatastoreType.OPERATIONAL, linkPath(link));
}
}
-
- enqueueLinkDeletes(linkIDsToDelete);
- }
-
- private void enqueueLinkDeletes(final Collection<InstanceIdentifier<Link>> linkIDsToDelete) {
- if(!linkIDsToDelete.isEmpty()) {
- processor.enqueueOperation(new TopologyOperation() {
- @Override
- public void applyOperation(ReadWriteTransaction transaction) {
- for(InstanceIdentifier<Link> linkID: linkIDsToDelete) {
- transaction.delete(LogicalDatastoreType.OPERATIONAL, linkID);
- }
- }
-
- @Override
- public String toString() {
- return "Delete Links " + linkIDsToDelete.size();
- }
- });
- }
}
private void removeAffectedLinks(final TpId id, final ReadWriteTransaction transaction) {
- CheckedFuture<Optional<Topology>, ReadFailedException> topologyDataFuture =
- transaction.read(LogicalDatastoreType.OPERATIONAL, topology);
- Futures.addCallback(topologyDataFuture, new FutureCallback<Optional<Topology>>() {
- @Override
- public void onSuccess(Optional<Topology> topologyOptional) {
- removeAffectedLinks(id, topologyOptional);
- }
-
- @Override
- public void onFailure(Throwable throwable) {
- LOG.error("Error reading topology data for topology {}", topology, throwable);
- }
- });
+ Optional<Topology> topologyOptional = Optional.absent();
+ try {
+ topologyOptional = transaction.read(LogicalDatastoreType.OPERATIONAL, topology).checkedGet();
+ } catch (ReadFailedException e) {
+ LOG.error("Error reading topology data for topology {}", topology, e);
+ }
+ if (topologyOptional.isPresent()) {
+ removeAffectedLinks(id, topologyOptional, transaction);
+ }
}
- private void removeAffectedLinks(final TpId id, Optional<Topology> topologyOptional) {
+ private void removeAffectedLinks(final TpId id, Optional<Topology> topologyOptional, ReadWriteTransaction transaction) {
if (!topologyOptional.isPresent()) {
return;
}
List<Link> linkList = topologyOptional.get().getLink() != null
? topologyOptional.get().getLink() : Collections.<Link> emptyList();
- final List<InstanceIdentifier<Link>> linkIDsToDelete = Lists.newArrayList();
for (Link link : linkList) {
if (id.equals(link.getSource().getSourceTp()) ||
id.equals(link.getDestination().getDestTp())) {
- linkIDsToDelete.add(linkPath(link));
+ transaction.delete(LogicalDatastoreType.OPERATIONAL, linkPath(link));
}
}
-
- enqueueLinkDeletes(linkIDsToDelete);
}
private InstanceIdentifier<Node> getNodePath(final NodeId nodeId) {
package org.opendaylight.md.controller.topology.manager;
import com.google.common.base.Preconditions;
-import com.google.common.util.concurrent.CheckedFuture;
-import com.google.common.util.concurrent.FutureCallback;
-import com.google.common.util.concurrent.Futures;
-
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.LinkedBlockingQueue;
-
import org.opendaylight.controller.md.sal.binding.api.BindingTransactionChain;
import org.opendaylight.controller.md.sal.binding.api.DataBroker;
import org.opendaylight.controller.md.sal.binding.api.ReadWriteTransaction;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+
final class OperationProcessor implements AutoCloseable, Runnable, TransactionChainListener {
private static final Logger LOG = LoggerFactory.getLogger(OperationProcessor.class);
private static final int MAX_TRANSACTION_OPERATIONS = 100;
private final BlockingQueue<TopologyOperation> queue = new LinkedBlockingQueue<>(OPERATION_QUEUE_DEPTH);
private final DataBroker dataBroker;
- private final BindingTransactionChain transactionChain;
+ private BindingTransactionChain transactionChain;
OperationProcessor(final DataBroker dataBroker) {
this.dataBroker = Preconditions.checkNotNull(dataBroker);
LOG.debug("Processed {} operations, submitting transaction", ops);
- CheckedFuture<Void, TransactionCommitFailedException> txResultFuture = tx.submit();
- Futures.addCallback(txResultFuture, new FutureCallback<Void>() {
- @Override
- public void onSuccess(Void notUsed) {
- LOG.debug("Topology export successful for tx :{}", tx.getIdentifier());
- }
-
- @Override
- public void onFailure(Throwable throwable) {
- LOG.error("Topology export transaction {} failed", tx.getIdentifier(), throwable.getCause());
- }
- });
+ try {
+ tx.submit().checkedGet();
+ } catch (final TransactionCommitFailedException e) {
+ LOG.warn("Stat DataStoreOperation unexpected State!", e);
+ transactionChain.close();
+ transactionChain = dataBroker.createTransactionChain(this);
+ cleanDataStoreOperQueue();
+ }
}
- } catch (InterruptedException e) {
- LOG.info("Interrupted processing, terminating", e);
+ } catch (final IllegalStateException e) {
+ LOG.warn("Stat DataStoreOperation unexpected State!", e);
+ transactionChain.close();
+ transactionChain = dataBroker.createTransactionChain(this);
+ cleanDataStoreOperQueue();
+ } catch (final InterruptedException e) {
+ LOG.warn("Stat Manager DS Operation thread interupted!", e);
+ } catch (final Exception e) {
+ LOG.warn("Stat DataStore Operation executor fail!", e);
}
+ // Drain all events, making sure any blocked threads are unblocked
+ cleanDataStoreOperQueue();
+
+ }
+
+ private void cleanDataStoreOperQueue() {
// Drain all events, making sure any blocked threads are unblocked
while (!queue.isEmpty()) {
queue.poll();
package org.opendaylight.md.controller.topology.manager;
-import static org.junit.Assert.fail;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.assertNotNull;
-import static org.mockito.Mockito.doReturn;
-import static org.mockito.Mockito.doAnswer;
-import static org.mockito.Mockito.any;
-import static org.mockito.Mockito.eq;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.inOrder;
-import static org.mockito.Mockito.atLeast;
-import static org.mockito.Mockito.never;
-import static org.mockito.Mockito.verify;
-
-import java.util.Arrays;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Set;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.TimeUnit;
-
+import com.google.common.base.Optional;
+import com.google.common.util.concurrent.CheckedFuture;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.SettableFuture;
+import com.google.common.util.concurrent.Uninterruptibles;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.LinkBuilder;
import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.LinkKey;
import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.Node;
+import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.NodeBuilder;
import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.NodeKey;
import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.node.TerminationPoint;
+import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.node.TerminationPointBuilder;
import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.node.TerminationPointKey;
import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
import org.opendaylight.yangtools.yang.binding.KeyedInstanceIdentifier;
-import com.google.common.base.Optional;
-import com.google.common.util.concurrent.CheckedFuture;
-import com.google.common.util.concurrent.Futures;
-import com.google.common.util.concurrent.SettableFuture;
-import com.google.common.util.concurrent.Uninterruptibles;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+import static org.mockito.Mockito.any;
+import static org.mockito.Mockito.atLeast;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.eq;
+import static org.mockito.Mockito.inOrder;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.verify;
public class FlowCapableTopologyExporterTest {
@Test
public void testOnNodeRemoved() {
+ NodeKey topoNodeKey = new NodeKey(new NodeId("node1"));
+ InstanceIdentifier<Node> topoNodeII = topologyIID.child(Node.class, topoNodeKey);
+ Node topoNode = new NodeBuilder().setKey(topoNodeKey).build();
+
org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.NodeKey
- nodeKey = newInvNodeKey("node1");
+ nodeKey = newInvNodeKey(topoNodeKey.getNodeId().getValue());
InstanceIdentifier<?> invNodeID = InstanceIdentifier.create(Nodes.class).child(
org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node.class,
nodeKey);
};
SettableFuture<Optional<Topology>> readFuture = SettableFuture.create();
+ readFuture.set(Optional.of(topology));
ReadWriteTransaction mockTx1 = mock(ReadWriteTransaction.class);
doReturn(Futures.makeChecked(readFuture, ReadFailedException.MAPPER)).when(mockTx1)
.read(LogicalDatastoreType.OPERATIONAL, topologyIID);
+ SettableFuture<Optional<Node>> readFutureNode = SettableFuture.create();
+ readFutureNode.set(Optional.of(topoNode));
+ doReturn(Futures.makeChecked(readFutureNode, ReadFailedException.MAPPER)).when(mockTx1)
+ .read(LogicalDatastoreType.OPERATIONAL, topoNodeII);
+
CountDownLatch submitLatch1 = setupStubbedSubmit(mockTx1);
int expDeleteCalls = expDeletedIIDs.length;
ArgumentCaptor.forClass(InstanceIdentifier.class);
setupStubbedDeletes(mockTx1, deletedLinkIDs, deleteLatch);
- ReadWriteTransaction mockTx2 = mock(ReadWriteTransaction.class);
- setupStubbedDeletes(mockTx2, deletedLinkIDs, deleteLatch);
- CountDownLatch submitLatch2 = setupStubbedSubmit(mockTx2);
-
- doReturn(mockTx1).doReturn(mockTx2).when(mockTxChain).newReadWriteTransaction();
+ doReturn(mockTx1).when(mockTxChain).newReadWriteTransaction();
exporter.onNodeRemoved(new NodeRemovedBuilder().setNodeRef(new NodeRef(invNodeID)).build());
waitForDeletes(expDeleteCalls, deleteLatch);
- waitForSubmit(submitLatch2);
-
assertDeletedIDs(expDeletedIIDs, deletedLinkIDs);
verifyMockTx(mockTx1);
- verifyMockTx(mockTx2);
}
@SuppressWarnings({ "rawtypes" })
@Test
public void testOnNodeRemovedWithNoTopology() {
+ NodeKey topoNodeKey = new NodeKey(new NodeId("node1"));
+ InstanceIdentifier<Node> topoNodeII = topologyIID.child(Node.class, topoNodeKey);
+ Node topoNode = new NodeBuilder().setKey(topoNodeKey).build();
+
org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.NodeKey
- nodeKey = newInvNodeKey("node1");
+ nodeKey = newInvNodeKey(topoNodeKey.getNodeId().getValue());
InstanceIdentifier<?> invNodeID = InstanceIdentifier.create(Nodes.class).child(
org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node.class,
nodeKey);
.read(LogicalDatastoreType.OPERATIONAL, topologyIID);
CountDownLatch submitLatch = setupStubbedSubmit(mockTx);
+ SettableFuture<Optional<Node>> readFutureNode = SettableFuture.create();
+ readFutureNode.set(Optional.of(topoNode));
+ doReturn(Futures.makeChecked(readFutureNode, ReadFailedException.MAPPER)).when(mockTx)
+ .read(LogicalDatastoreType.OPERATIONAL, topoNodeII);
+
CountDownLatch deleteLatch = new CountDownLatch(1);
ArgumentCaptor<InstanceIdentifier> deletedLinkIDs =
ArgumentCaptor.forClass(InstanceIdentifier.class);
@Test
public void testOnNodeConnectorRemoved() {
+ NodeKey topoNodeKey = new NodeKey(new NodeId("node1"));
+ TerminationPointKey terminationPointKey = new TerminationPointKey(new TpId("tp1"));
+
+ InstanceIdentifier<TerminationPoint> topoTermPointII = topologyIID.child(Node.class, topoNodeKey)
+ .child(TerminationPoint.class, terminationPointKey);
+ TerminationPoint topoTermPoint = new TerminationPointBuilder().setKey(terminationPointKey).build();
+
org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.NodeKey
- nodeKey = newInvNodeKey("node1");
+ nodeKey = newInvNodeKey(topoNodeKey.getNodeId().getValue());
org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.node.NodeConnectorKey ncKey =
- newInvNodeConnKey("tp1");
+ newInvNodeConnKey(terminationPointKey.getTpId().getValue());
InstanceIdentifier<?> invNodeConnID = newNodeConnID(nodeKey, ncKey);
};
final SettableFuture<Optional<Topology>> readFuture = SettableFuture.create();
+ readFuture.set(Optional.of(topology));
ReadWriteTransaction mockTx1 = mock(ReadWriteTransaction.class);
doReturn(Futures.makeChecked(readFuture, ReadFailedException.MAPPER)).when(mockTx1)
.read(LogicalDatastoreType.OPERATIONAL, topologyIID);
+ SettableFuture<Optional<TerminationPoint>> readFutureNode = SettableFuture.create();
+ readFutureNode.set(Optional.of(topoTermPoint));
+ doReturn(Futures.makeChecked(readFutureNode, ReadFailedException.MAPPER)).when(mockTx1)
+ .read(LogicalDatastoreType.OPERATIONAL, topoTermPointII);
+
CountDownLatch submitLatch1 = setupStubbedSubmit(mockTx1);
int expDeleteCalls = expDeletedIIDs.length;
ArgumentCaptor.forClass(InstanceIdentifier.class);
setupStubbedDeletes(mockTx1, deletedLinkIDs, deleteLatch);
- ReadWriteTransaction mockTx2 = mock(ReadWriteTransaction.class);
- setupStubbedDeletes(mockTx2, deletedLinkIDs, deleteLatch);
- CountDownLatch submitLatch2 = setupStubbedSubmit(mockTx2);
- doReturn(mockTx1).doReturn(mockTx2).when(mockTxChain).newReadWriteTransaction();
+ doReturn(mockTx1).when(mockTxChain).newReadWriteTransaction();
exporter.onNodeConnectorRemoved(new NodeConnectorRemovedBuilder().setNodeConnectorRef(
new NodeConnectorRef(invNodeConnID)).build());
waitForDeletes(expDeleteCalls, deleteLatch);
- waitForSubmit(submitLatch2);
-
assertDeletedIDs(expDeletedIIDs, deletedLinkIDs);
verifyMockTx(mockTx1);
- verifyMockTx(mockTx2);
}
@SuppressWarnings("rawtypes")
@Test
public void testOnNodeConnectorRemovedWithNoTopology() {
+ NodeKey topoNodeKey = new NodeKey(new NodeId("node1"));
+ TerminationPointKey terminationPointKey = new TerminationPointKey(new TpId("tp1"));
+
+ InstanceIdentifier<TerminationPoint> topoTermPointII = topologyIID.child(Node.class, topoNodeKey)
+ .child(TerminationPoint.class, terminationPointKey);
+ TerminationPoint topoTermPoint = new TerminationPointBuilder().setKey(terminationPointKey).build();
+
org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.NodeKey
- nodeKey = newInvNodeKey("node1");
+ nodeKey = newInvNodeKey(topoNodeKey.getNodeId().getValue());
org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.node.NodeConnectorKey ncKey =
- newInvNodeConnKey("tp1");
+ newInvNodeConnKey(terminationPointKey.getTpId().getValue());
InstanceIdentifier<?> invNodeConnID = newNodeConnID(nodeKey, ncKey);
.read(LogicalDatastoreType.OPERATIONAL, topologyIID);
CountDownLatch submitLatch = setupStubbedSubmit(mockTx);
+ SettableFuture<Optional<TerminationPoint>> readFutureNode = SettableFuture.create();
+ readFutureNode.set(Optional.of(topoTermPoint));
+ doReturn(Futures.makeChecked(readFutureNode, ReadFailedException.MAPPER)).when(mockTx)
+ .read(LogicalDatastoreType.OPERATIONAL, topoTermPointII);
+
CountDownLatch deleteLatch = new CountDownLatch(1);
ArgumentCaptor<InstanceIdentifier> deletedLinkIDs =
ArgumentCaptor.forClass(InstanceIdentifier.class);
waitForSubmit(submitLatch);
ArgumentCaptor<Link> mergedNode = ArgumentCaptor.forClass(Link.class);
- verify(mockTx).merge(eq(LogicalDatastoreType.OPERATIONAL), eq(topologyIID.child(
- Link.class, new LinkKey(new LinkId(sourceNodeConnKey.getId())))),
+ verify(mockTx).put(eq(LogicalDatastoreType.OPERATIONAL), eq(topologyIID.child(
+ Link.class, new LinkKey(new LinkId(sourceNodeConnKey.getId())))),
mergedNode.capture(), eq(true));
assertEquals("Source node ID", "sourceNode",
mergedNode.getValue().getSource().getSourceNode().getValue());
}
@Test
- public void testOnLinkRemoved() {
+ public void testOnLinkRemovedLinkExists() {
org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.NodeKey
sourceNodeKey = newInvNodeKey("sourceNode");
destNodeConnKey = newInvNodeConnKey("destTP");
InstanceIdentifier<?> destConnID = newNodeConnID(destNodeKey, destNodeConnKey);
+ Link link = newLink(sourceNodeConnKey.getId().getValue(), newSourceTp(sourceNodeConnKey.getId().getValue()),
+ newDestTp(destNodeConnKey.getId().getValue()));
+
ReadWriteTransaction mockTx = mock(ReadWriteTransaction.class);
CountDownLatch submitLatch = setupStubbedSubmit(mockTx);
doReturn(mockTx).when(mockTxChain).newReadWriteTransaction();
+ doReturn(Futures.immediateCheckedFuture(Optional.of(link))).when(mockTx).read(LogicalDatastoreType.OPERATIONAL, topologyIID.child(
+ Link.class, new LinkKey(new LinkId(sourceNodeConnKey.getId()))));
exporter.onLinkRemoved(new LinkRemovedBuilder().setSource(
new NodeConnectorRef(sourceConnID)).setDestination(
- new NodeConnectorRef(destConnID)).build());
+ new NodeConnectorRef(destConnID)).build());
waitForSubmit(submitLatch);
Link.class, new LinkKey(new LinkId(sourceNodeConnKey.getId()))));
}
+ @Test
+ public void testOnLinkRemovedLinkDoesNotExist() {
+
+ org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.NodeKey
+ sourceNodeKey = newInvNodeKey("sourceNode");
+ org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.node.NodeConnectorKey
+ sourceNodeConnKey = newInvNodeConnKey("sourceTP");
+ InstanceIdentifier<?> sourceConnID = newNodeConnID(sourceNodeKey, sourceNodeConnKey);
+
+ org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.NodeKey
+ destNodeKey = newInvNodeKey("destNode");
+ org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.node.NodeConnectorKey
+ destNodeConnKey = newInvNodeConnKey("destTP");
+ InstanceIdentifier<?> destConnID = newNodeConnID(destNodeKey, destNodeConnKey);
+
+ ReadWriteTransaction mockTx = mock(ReadWriteTransaction.class);
+ CountDownLatch submitLatch = setupStubbedSubmit(mockTx);
+ doReturn(mockTx).when(mockTxChain).newReadWriteTransaction();
+ doReturn(Futures.immediateCheckedFuture(Optional.<Link>absent())).when(mockTx).read(LogicalDatastoreType.OPERATIONAL, topologyIID.child(
+ Link.class, new LinkKey(new LinkId(sourceNodeConnKey.getId()))));
+
+ exporter.onLinkRemoved(new LinkRemovedBuilder().setSource(
+ new NodeConnectorRef(sourceConnID)).setDestination(
+ new NodeConnectorRef(destConnID)).build());
+
+ waitForSubmit(submitLatch);
+
+ verify(mockTx, never()).delete(LogicalDatastoreType.OPERATIONAL, topologyIID.child(
+ Link.class, new LinkKey(new LinkId(sourceNodeConnKey.getId()))));
+ }
+
private void verifyMockTx(ReadWriteTransaction mockTx) {
InOrder inOrder = inOrder(mockTx);
inOrder.verify(mockTx, atLeast(0)).submit();