*/
package org.opendaylight.controller.md.inventory.manager;
-import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import org.opendaylight.controller.md.sal.common.api.TransactionStatus;
import org.slf4j.LoggerFactory;
import com.google.common.base.Objects;
+import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.JdkFutureAdapters;
public class NodeChangeCommiter implements OpendaylightInventoryListener {
- private final static Logger LOG = LoggerFactory.getLogger(NodeChangeCommiter.class);
+ protected final static Logger LOG = LoggerFactory.getLogger(NodeChangeCommiter.class);
private final FlowCapableInventoryProvider manager;
final NodeConnectorRef ref = connector.getNodeConnectorRef();
final DataModificationTransaction it = this.getManager().startChange();
- NodeChangeCommiter.LOG.debug("removing node connector {} ", ref.getValue());
+ LOG.debug("removing node connector {} ", ref.getValue());
it.removeOperationalData(ref.getValue());
Future<RpcResult<TransactionStatus>> commitResult = it.commit();
- try {
- commitResult.get();
- } catch (InterruptedException | ExecutionException e) {
- LOG.error("Node Connector {} not removed.", ref.getValue(), e);
- }
-
+ listenOnTransactionState(it.getIdentifier(), commitResult, "nodeConnector removal", ref.getValue());
}
@Override
data.addAugmentation(FlowCapableNodeConnector.class, augment);
}
InstanceIdentifier<? extends Object> value = ref.getValue();
- NodeChangeCommiter.LOG.debug("updating node connector : {}.", value);
+ LOG.debug("updating node connector : {}.", value);
NodeConnector build = data.build();
it.putOperationalData((value), build);
Future<RpcResult<TransactionStatus>> commitResult = it.commit();
- try {
- commitResult.get();
- } catch (InterruptedException | ExecutionException e) {
- LOG.error("Node Connector {} not updated.", ref.getValue(), e);
- }
-
+ listenOnTransactionState(it.getIdentifier(), commitResult, "nodeConnector update", ref.getValue());
}
@Override
final NodeRef ref = node.getNodeRef();
final DataModificationTransaction it = this.manager.startChange();
- NodeChangeCommiter.LOG.debug("removing node : {}", ref.getValue());
+ LOG.debug("removing node : {}", ref.getValue());
it.removeOperationalData((ref.getValue()));
Future<RpcResult<TransactionStatus>> commitResult = it.commit();
- try {
- commitResult.get();
- } catch (InterruptedException | ExecutionException e) {
- LOG.error("Node {} not removed.", ref.getValue(), e);
- }
+ listenOnTransactionState(it.getIdentifier(), commitResult, "node removal", ref.getValue());
}
@Override
final FlowCapableNode augment = InventoryMapping.toInventoryAugment(flowNode);
nodeBuilder.addAugmentation(FlowCapableNode.class, augment);
InstanceIdentifier<? extends Object> value = ref.getValue();
- InstanceIdentifierBuilder<Node> builder = InstanceIdentifier.<Node> builder(((InstanceIdentifier<Node>) value));
+ InstanceIdentifierBuilder<Node> builder = ((InstanceIdentifier<Node>) value).builder();
InstanceIdentifierBuilder<FlowCapableNode> augmentation = builder
.<FlowCapableNode> augmentation(FlowCapableNode.class);
final InstanceIdentifier<FlowCapableNode> path = augmentation.build();
- NodeChangeCommiter.LOG.debug("updating node :{} ", path);
+ LOG.debug("updating node :{} ", path);
it.putOperationalData(path, augment);
Future<RpcResult<TransactionStatus>> commitResult = it.commit();
- try {
- commitResult.get();
- } catch (InterruptedException | ExecutionException e) {
- LOG.error("Node {} not updated.", ref.getValue(), e);
- }
-
+ listenOnTransactionState(it.getIdentifier(), commitResult, "node update", ref.getValue());
+ }
+
+ /**
+ * @param txId transaction identificator
+ * @param future transaction result
+ * @param action performed by transaction
+ * @param nodeConnectorPath target value
+ */
+ private static void listenOnTransactionState(final Object txId, Future<RpcResult<TransactionStatus>> future,
+ final String action, final InstanceIdentifier<?> nodeConnectorPath) {
+ Futures.addCallback(JdkFutureAdapters.listenInPoolThread(future),new FutureCallback<RpcResult<TransactionStatus>>() {
+
+ @Override
+ public void onFailure(Throwable t) {
+ LOG.error("Action {} [{}] failed for Tx:{}", action, nodeConnectorPath, txId, t);
+
+ }
+
+ @Override
+ public void onSuccess(RpcResult<TransactionStatus> result) {
+ if(!result.isSuccessful()) {
+ LOG.error("Action {} [{}] failed for Tx:{}", action, nodeConnectorPath, txId);
+ }
+ }
+ });
}
}
import static org.opendaylight.md.controller.topology.manager.FlowCapableNodeMapping.toTopologyNode;
import static org.opendaylight.md.controller.topology.manager.FlowCapableNodeMapping.toTopologyNodeId;
-import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
import org.opendaylight.controller.md.sal.binding.util.TypeSafeDataReader;
+import org.opendaylight.controller.md.sal.common.api.TransactionStatus;
import org.opendaylight.controller.sal.binding.api.data.DataModificationTransaction;
import org.opendaylight.controller.sal.binding.api.data.DataProviderService;
import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableNodeConnectorUpdated;
import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.node.TerminationPoint;
import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.node.TerminationPointKey;
import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
+import org.opendaylight.yangtools.yang.common.RpcResult;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.JdkFutureAdapters;
+
class FlowCapableTopologyExporter implements //
FlowTopologyDiscoveryListener, //
OpendaylightInventoryListener //
{
- private final static Logger LOG = LoggerFactory.getLogger(FlowCapableTopologyExporter.class);
+ protected final static Logger LOG = LoggerFactory.getLogger(FlowCapableTopologyExporter.class);
public static TopologyKey topology = new TopologyKey(new TopologyId("flow:1"));
// FIXME: Flow capable topology exporter should use transaction chaining API
Topology top = tb.build();
DataModificationTransaction tx = dataService.beginTransaction();
tx.putOperationalData(topologyPath, top);
- tx.commit();
+ listenOnTransactionState(tx.getIdentifier(),tx.commit());
}
@Override
DataModificationTransaction tx = dataService.beginTransaction();
tx.removeOperationalData(nodeInstance);
removeAffectedLinks(tx, nodeId);
- try {
- tx.commit().get();
- } catch (InterruptedException | ExecutionException e) {
- LOG.error("Topology state export not successful. ",e);
- }
+ listenOnTransactionState(tx.getIdentifier(),tx.commit());
}
@Override
InstanceIdentifier<Node> path = getNodePath(toTopologyNodeId(notification.getId()));
DataModificationTransaction tx = dataService.beginTransaction();
tx.putOperationalData(path, node);
- try {
- tx.commit().get();
- } catch (InterruptedException | ExecutionException e) {
- LOG.error("Topology state export not successful. ",e);
- }
+ listenOnTransactionState(tx.getIdentifier(),tx.commit());
}
}
DataModificationTransaction tx = dataService.beginTransaction();
tx.removeOperationalData(tpInstance);
removeAffectedLinks(tx, tpId);
- try {
- tx.commit().get();
- } catch (InterruptedException | ExecutionException e) {
- LOG.error("Topology state export not successful. ",e);
- }
+ listenOnTransactionState(tx.getIdentifier(),tx.commit());
}
|| (fcncu.getConfiguration() != null && fcncu.getConfiguration().isPORTDOWN())) {
removeAffectedLinks(tx, point.getTpId());
}
- try {
- tx.commit().get();
- } catch (InterruptedException | ExecutionException e) {
- LOG.error("Topology state export not successful. ",e);
- }
+ listenOnTransactionState(tx.getIdentifier(),tx.commit());
}
}
InstanceIdentifier<Link> path = linkPath(link);
DataModificationTransaction tx = dataService.beginTransaction();
tx.putOperationalData(path, link);
- try {
- tx.commit().get();
- } catch (InterruptedException | ExecutionException e) {
- LOG.error("Topology state export not successful. ",e);
- }
+ listenOnTransactionState(tx.getIdentifier(),tx.commit());
+
}
@Override
InstanceIdentifier<Link> path = linkPath(toTopologyLink(notification));
DataModificationTransaction tx = dataService.beginTransaction();
tx.removeOperationalData(path);
- ;
+ listenOnTransactionState(tx.getIdentifier(),tx.commit());
}
@Override
.child(Topology.class, topology).child(Link.class, link.getKey()).build();
return linkInstanceId;
}
+
+ /**
+ * @param txId transaction identificator
+ * @param future transaction result
+ */
+ private static void listenOnTransactionState(final Object txId, Future<RpcResult<TransactionStatus>> future) {
+ Futures.addCallback(JdkFutureAdapters.listenInPoolThread(future),new FutureCallback<RpcResult<TransactionStatus>>() {
+
+ @Override
+ public void onFailure(Throwable t) {
+ LOG.error("Topology export failed for Tx:{}", txId, t);
+
+ }
+
+ @Override
+ public void onSuccess(RpcResult<TransactionStatus> result) {
+ if(!result.isSuccessful()) {
+ LOG.error("Topology export failed for Tx:{}", txId);
+ }
+ }
+ });
+ }
}