BUG-1614: use per-NodeState transaction chains 64/10264/3
authorMilos Fabian <milfabia@cisco.com>
Thu, 28 Aug 2014 08:59:31 +0000 (10:59 +0200)
committerMilos Fabian <milfabia@cisco.com>
Thu, 28 Aug 2014 08:59:31 +0000 (10:59 +0200)
Fixes thread safety issues with the provider. Transaction chaining is
really helpful, as it forces us to properly structure our code and
synchronize hand-offs.

Change-Id: I149f765bcbb1069a1656d67c0dcb70452eda6e03
Signed-off-by: Robert Varga <rovarga@cisco.com>
Signed-off-by: Milos Fabian <milfabia@cisco.com>
pcep/topology-provider/src/main/java/org/opendaylight/bgpcep/pcep/topology/provider/AbstractTopologySessionListener.java
pcep/topology-provider/src/main/java/org/opendaylight/bgpcep/pcep/topology/provider/PCEPTopologyProvider.java
pcep/topology-provider/src/main/java/org/opendaylight/bgpcep/pcep/topology/provider/ServerSessionManager.java
pcep/topology-provider/src/main/java/org/opendaylight/bgpcep/pcep/topology/provider/TopologyNodeState.java
pcep/topology-provider/src/main/java/org/opendaylight/controller/config/yang/pcep/topology/provider/PCEPTopologyProviderModule.java
pcep/topology-provider/src/test/java/org/opendaylight/bgpcep/pcep/topology/provider/AbstractPCEPSessionTest.java

index 3151db0217ee172cb59e8213bdbaf17a3432cbe0..0fc3f2c56f845717c7b01996703fc52250ccb3f9 100644 (file)
@@ -12,7 +12,9 @@ import com.google.common.base.Preconditions;
 import com.google.common.util.concurrent.FutureCallback;
 import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.ListenableFuture;
+
 import io.netty.util.concurrent.FutureListener;
+
 import java.net.InetAddress;
 import java.util.ArrayList;
 import java.util.Collection;
@@ -20,8 +22,9 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
-import java.util.concurrent.ExecutionException;
+
 import javax.annotation.concurrent.GuardedBy;
+
 import org.opendaylight.controller.md.sal.binding.api.ReadWriteTransaction;
 import org.opendaylight.controller.md.sal.binding.api.WriteTransaction;
 import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
@@ -45,11 +48,6 @@ import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.topology
 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.topology.pcep.rev131024.pcep.client.attributes.path.computation.client.ReportedLspBuilder;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.topology.pcep.rev131024.pcep.client.attributes.path.computation.client.ReportedLspKey;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.topology.pcep.rev131024.pcep.client.attributes.path.computation.client.reported.lsp.Path;
-import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.NodeId;
-import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.Topology;
-import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.Node;
-import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.NodeBuilder;
-import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.NodeKey;
 import org.opendaylight.yangtools.yang.binding.DataContainer;
 import org.opendaylight.yangtools.yang.binding.DataObject;
 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
@@ -111,11 +109,8 @@ public abstract class AbstractTopologySessionListener<S, L> implements PCEPSessi
     private final Map<L, String> lsps = new HashMap<>();
 
     private final ServerSessionManager serverSessionManager;
-    private InstanceIdentifier<Node> topologyNode;
-    private InstanceIdentifier<Node1> topologyAugment;
     private InstanceIdentifier<PathComputationClient> pccIdentifier;
     private TopologyNodeState nodeState;
-    private boolean ownsTopology = false;
     private boolean synced = false;
     private PCEPSession session;
 
@@ -123,47 +118,6 @@ public abstract class AbstractTopologySessionListener<S, L> implements PCEPSessi
         this.serverSessionManager = Preconditions.checkNotNull(serverSessionManager);
     }
 
-    private static String createNodeId(final InetAddress addr) {
-        return "pcc://" + addr.getHostAddress();
-    }
-
-    private Node topologyNode(final ReadWriteTransaction trans, final InetAddress address) {
-        final String pccId = createNodeId(address);
-
-        // FIXME: Futures.transform...
-        try {
-            final Optional<Topology> topoMaybe = trans.read(LogicalDatastoreType.OPERATIONAL, this.serverSessionManager.getTopology()).get();
-            Preconditions.checkState(topoMaybe.isPresent(), "Failed to find topology.");
-            final Topology topo = topoMaybe.get();
-            for (final Node n : topo.getNode()) {
-                LOG.debug("Matching topology node {} to id {}", n, pccId);
-                if (n.getNodeId().getValue().equals(pccId)) {
-                    this.topologyNode = this.serverSessionManager.getTopology().child(Node.class, n.getKey());
-                    LOG.debug("Reusing topology node {} for id {} at {}", n, pccId, this.topologyNode);
-                    return n;
-                }
-            }
-        } catch (InterruptedException | ExecutionException e) {
-            throw new IllegalStateException("Failed to ensure topology presence.", e);
-        }
-
-        /*
-         * We failed to find a matching node. Let's create a dynamic one
-         * and note that we are the owner (so we clean it up afterwards).
-         */
-        final NodeId id = new NodeId(pccId);
-        final NodeKey nk = new NodeKey(id);
-        final InstanceIdentifier<Node> nti = this.serverSessionManager.getTopology().child(Node.class, nk);
-
-        final Node ret = new NodeBuilder().setKey(nk).setNodeId(id).build();
-
-        trans.put(LogicalDatastoreType.OPERATIONAL, nti, ret);
-        LOG.debug("Created topology node {} for id {} at {}", ret, pccId, nti);
-        this.ownsTopology = true;
-        this.topologyNode = nti;
-        return ret;
-    }
-
     @Override
     public final synchronized void onSessionUp(final PCEPSession session) {
         /*
@@ -173,27 +127,29 @@ public abstract class AbstractTopologySessionListener<S, L> implements PCEPSessi
          * the topology model, with empty LSP list.
          */
         final InetAddress peerAddress = session.getRemoteAddress();
-        final ReadWriteTransaction trans = this.serverSessionManager.rwTransaction();
 
-        final Node topoNode = topologyNode(trans, peerAddress);
-        LOG.trace("Peer {} resolved to topology node {}", peerAddress, topoNode);
+        final TopologyNodeState state = serverSessionManager.takeNodeState(peerAddress, this);
+        if (state == null) {
+            session.close(TerminationReason.Unknown);
+            return;
+        }
 
-        // Our augmentation in the topology node
+        LOG.trace("Peer {} resolved to topology node {}", peerAddress, state.getNodeId());
         this.synced = false;
 
-        final PathComputationClientBuilder pccBuilder;
-        pccBuilder = new PathComputationClientBuilder();
+        // Our augmentation in the topology node
+        final PathComputationClientBuilder pccBuilder = new PathComputationClientBuilder();
         pccBuilder.setIpAddress(IpAddressBuilder.getDefaultInstance(peerAddress.getHostAddress()));
 
         onSessionUp(session, pccBuilder);
 
         final Node1 ta = new Node1Builder().setPathComputationClient(pccBuilder.build()).build();
+        final InstanceIdentifier<Node1> topologyAugment = state.getNodeId().augmentation(Node1.class);
+        this.pccIdentifier = topologyAugment.child(PathComputationClient.class);
 
-        this.topologyAugment = this.topologyNode.augmentation(Node1.class);
-        this.pccIdentifier = this.topologyAugment.child(PathComputationClient.class);
-
-        trans.put(LogicalDatastoreType.OPERATIONAL, this.topologyAugment, ta);
-        LOG.trace("Peer data {} set to {}", this.topologyAugment, ta);
+        final ReadWriteTransaction trans = state.rwTransaction();
+        trans.put(LogicalDatastoreType.OPERATIONAL, topologyAugment, ta);
+        LOG.trace("Peer data {} set to {}", topologyAugment, ta);
 
         // All set, commit the modifications
         Futures.addCallback(trans.submit(), new FutureCallback<Void>() {
@@ -209,9 +165,9 @@ public abstract class AbstractTopologySessionListener<S, L> implements PCEPSessi
             }
         });
 
-        this.nodeState = this.serverSessionManager.takeNodeState(topoNode.getNodeId(), this);
         this.session = session;
-        LOG.info("Session with {} attached to topology node {}", session.getRemoteAddress(), topoNode.getNodeId());
+        this.nodeState = state;
+        LOG.info("Session with {} attached to topology node {}", session.getRemoteAddress(), state.getNodeId());
     }
 
     @GuardedBy("this")
@@ -220,25 +176,6 @@ public abstract class AbstractTopologySessionListener<S, L> implements PCEPSessi
         this.nodeState = null;
         this.session = null;
 
-        // The session went down. Undo all the Topology changes we have done.
-        final WriteTransaction trans = this.serverSessionManager.beginTransaction();
-        trans.delete(LogicalDatastoreType.OPERATIONAL, this.topologyAugment);
-        if (this.ownsTopology) {
-            trans.delete(LogicalDatastoreType.OPERATIONAL, this.topologyNode);
-        }
-
-        Futures.addCallback(trans.submit(), new FutureCallback<Void>() {
-            @Override
-            public void onSuccess(final Void result) {
-                LOG.trace("Internal state for session {} cleaned up successfully", session);
-            }
-
-            @Override
-            public void onFailure(final Throwable t) {
-                LOG.error("Failed to cleanup internal state for session {}", session, t);
-            }
-        });
-
         // Clear all requests we know about
         for (final Entry<S, PCEPRequest> e : this.requests.entrySet()) {
             final PCEPRequest r = e.getValue();
@@ -275,7 +212,7 @@ public abstract class AbstractTopologySessionListener<S, L> implements PCEPSessi
 
     @Override
     public final synchronized void onMessage(final PCEPSession session, final Message message) {
-        final MessageContext ctx = new MessageContext(this.serverSessionManager.beginTransaction());
+        final MessageContext ctx = new MessageContext(this.nodeState.beginTransaction());
 
         if (onMessage(ctx, message)) {
             LOG.info("Unhandled message {} on session {}", message, session);
@@ -478,7 +415,7 @@ public abstract class AbstractTopologySessionListener<S, L> implements PCEPSessi
         return this.lsps.get(id);
     }
 
-    protected final <T extends DataObject> ListenableFuture<Optional<T>> readOperationalData(final InstanceIdentifier<T> id) {
-        return this.serverSessionManager.readOperationalData(id);
+    protected synchronized final <T extends DataObject> ListenableFuture<Optional<T>> readOperationalData(final InstanceIdentifier<T> id) {
+        return this.nodeState.readOperationalData(id);
     }
 }
index 661f481b28ea6b2b4adb2c0c00a57fc5636dc207..0b2699e443b47bb8a695900298a7c93c8a0987fd 100644 (file)
@@ -8,17 +8,16 @@
 package org.opendaylight.bgpcep.pcep.topology.provider;
 
 import com.google.common.base.Preconditions;
-
 import io.netty.channel.Channel;
 import io.netty.channel.ChannelFuture;
 import io.netty.channel.ChannelFutureListener;
-
 import java.net.InetSocketAddress;
 import java.util.concurrent.ExecutionException;
-
 import org.opendaylight.bgpcep.programming.spi.InstructionScheduler;
 import org.opendaylight.bgpcep.topology.DefaultTopologyReference;
 import org.opendaylight.controller.md.sal.binding.api.DataBroker;
+import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
+import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException;
 import org.opendaylight.controller.sal.binding.api.BindingAwareBroker;
 import org.opendaylight.controller.sal.binding.api.RpcProviderRegistry;
 import org.opendaylight.protocol.pcep.PCEPDispatcher;
@@ -52,7 +51,7 @@ public final class PCEPTopologyProvider extends DefaultTopologyReference impleme
     public static PCEPTopologyProvider create(final PCEPDispatcher dispatcher, final InetSocketAddress address, final KeyMapping keys,
             final InstructionScheduler scheduler, final DataBroker dataBroker, final RpcProviderRegistry rpcRegistry,
             final InstanceIdentifier<Topology> topology, final TopologySessionListenerFactory listenerFactory) throws InterruptedException,
-            ExecutionException {
+            ExecutionException, ReadFailedException, TransactionCommitFailedException {
 
         final ServerSessionManager manager = new ServerSessionManager(dataBroker, topology, listenerFactory);
         final ChannelFuture f = dispatcher.createServer(address, keys, manager);
@@ -90,7 +89,7 @@ public final class PCEPTopologyProvider extends DefaultTopologyReference impleme
                 }
                 try {
                     manager.close();
-                } catch (InterruptedException | ExecutionException e) {
+                } catch (Exception e) {
                     LOG.error("Failed to shutdown session manager", e);
                 }
             }
index a86458bb7b7ddee4ddcf58561b33198531aa0d61..9897ba3f4289c928f4a4d2d6d67b62a2f9871a1f 100644 (file)
@@ -9,25 +9,18 @@ package org.opendaylight.bgpcep.pcep.topology.provider;
 
 import com.google.common.base.Optional;
 import com.google.common.base.Preconditions;
-import com.google.common.util.concurrent.FutureCallback;
-import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.ListenableFuture;
-
+import java.net.InetAddress;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.Map;
-import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
-
-import org.opendaylight.controller.md.sal.binding.api.BindingTransactionChain;
 import org.opendaylight.controller.md.sal.binding.api.DataBroker;
-import org.opendaylight.controller.md.sal.binding.api.ReadTransaction;
 import org.opendaylight.controller.md.sal.binding.api.ReadWriteTransaction;
 import org.opendaylight.controller.md.sal.binding.api.WriteTransaction;
-import org.opendaylight.controller.md.sal.common.api.data.AsyncTransaction;
 import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
-import org.opendaylight.controller.md.sal.common.api.data.TransactionChain;
-import org.opendaylight.controller.md.sal.common.api.data.TransactionChainListener;
+import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
+import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException;
 import org.opendaylight.protocol.framework.SessionListenerFactory;
 import org.opendaylight.protocol.pcep.PCEPSessionListener;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.topology.pcep.rev131024.AddLspArgs;
@@ -46,7 +39,6 @@ import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.
 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.TopologyKey;
 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.Node;
 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.TopologyTypesBuilder;
-import org.opendaylight.yangtools.yang.binding.DataObject;
 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -54,7 +46,7 @@ import org.slf4j.LoggerFactory;
 /**
  *
  */
-final class ServerSessionManager implements SessionListenerFactory<PCEPSessionListener>, AutoCloseable, TopologySessionRPCs, TransactionChainListener {
+final class ServerSessionManager implements SessionListenerFactory<PCEPSessionListener>, AutoCloseable, TopologySessionRPCs {
     private static final Logger LOG = LoggerFactory.getLogger(ServerSessionManager.class);
     private static final long DEFAULT_HOLD_STATE_NANOS = TimeUnit.MINUTES.toNanos(5);
 
@@ -62,28 +54,22 @@ final class ServerSessionManager implements SessionListenerFactory<PCEPSessionLi
     private final Map<NodeId, TopologyNodeState> state = new HashMap<>();
     private final TopologySessionListenerFactory listenerFactory;
     private final InstanceIdentifier<Topology> topology;
-    private final BindingTransactionChain chain;
+    private final DataBroker broker;
 
-    public ServerSessionManager(final DataBroker dataProvider, final InstanceIdentifier<Topology> topology,
-            final TopologySessionListenerFactory listenerFactory) {
-        this.chain = dataProvider.createTransactionChain(this);
+    public ServerSessionManager(final DataBroker broker, final InstanceIdentifier<Topology> topology,
+            final TopologySessionListenerFactory listenerFactory) throws ReadFailedException, TransactionCommitFailedException {
+        this.broker = Preconditions.checkNotNull(broker);
         this.topology = Preconditions.checkNotNull(topology);
         this.listenerFactory = Preconditions.checkNotNull(listenerFactory);
 
-        // FIXME: should migrated to transaction chain
-        final ReadWriteTransaction tx = chain.newReadWriteTransaction();
 
         // Make sure the topology does not exist
-        final Optional<?> c;
-        try {
-            c = tx.read(LogicalDatastoreType.OPERATIONAL, topology).get();
-        } catch (InterruptedException | ExecutionException e) {
-            throw new IllegalStateException("Failed to ensure topology presence", e);
-        }
+        final ReadWriteTransaction tx = broker.newReadWriteTransaction();
+        final Optional<?> c = tx.read(LogicalDatastoreType.OPERATIONAL, topology).checkedGet();
         Preconditions.checkArgument(!c.isPresent(), "Topology %s already exists", topology);
 
-        // create empty network-topology is not exists
-        tx.put(LogicalDatastoreType.OPERATIONAL, InstanceIdentifier.builder(NetworkTopology.class).build(), new NetworkTopologyBuilder().build());
+        // create empty network-topology if not exists
+        tx.merge(LogicalDatastoreType.OPERATIONAL, InstanceIdentifier.builder(NetworkTopology.class).build(), new NetworkTopologyBuilder().build());
         // Now create the base topology
         final TopologyKey k = InstanceIdentifier.keyOf(topology);
         tx.put(LogicalDatastoreType.OPERATIONAL, topology, new TopologyBuilder().setKey(k).setTopologyId(k.getTopologyId()).setTopologyTypes(
@@ -91,36 +77,36 @@ final class ServerSessionManager implements SessionListenerFactory<PCEPSessionLi
                         new TopologyTypes1Builder().setTopologyPcep(new TopologyPcepBuilder().build()).build()).build()).setNode(
                                 new ArrayList<Node>()).build());
 
-        Futures.addCallback(tx.submit(), new FutureCallback<Void>() {
-            @Override
-            public void onSuccess(final Void result) {
-                LOG.trace("Topology {} created successfully", topology);
-            }
-
-            @Override
-            public void onFailure(final Throwable t) {
-                LOG.error("Failed to create topology {}", topology, t);
-            }
-        });
+        tx.submit().checkedGet();
     }
 
-    public void releaseNodeState(final TopologyNodeState nodeState) {
+    private static NodeId createNodeId(final InetAddress addr) {
+        return new NodeId("pcc://" + addr.getHostAddress());
+    }
+
+    synchronized void releaseNodeState(final TopologyNodeState nodeState) {
         LOG.debug("Node {} unbound", nodeState.getNodeId());
         this.nodes.remove(nodeState.getNodeId());
         nodeState.released();
     }
 
-    synchronized TopologyNodeState takeNodeState(final NodeId id, final TopologySessionListener sessionListener) {
-        LOG.debug("Node {} bound to listener {}", id, sessionListener);
+    synchronized TopologyNodeState takeNodeState(final InetAddress address, final TopologySessionListener sessionListener) {
+        final NodeId id = createNodeId(address);
 
+        LOG.debug("Node {} requested by listener {}", id, sessionListener);
         TopologyNodeState ret = this.state.get(id);
+
         if (ret == null) {
-            ret = new TopologyNodeState(id, DEFAULT_HOLD_STATE_NANOS);
+            ret = new TopologyNodeState(broker, topology, id, DEFAULT_HOLD_STATE_NANOS);
+            LOG.debug("Created topology node {} for id {} at {}", ret, id, ret.getNodeId());
             this.state.put(id, ret);
+        } else {
+            // FIXME: check for conflicting session
         }
 
-        this.nodes.put(id, sessionListener);
         ret.taken();
+        this.nodes.put(id, sessionListener);
+        LOG.debug("Node {} bound to listener {}", id, sessionListener);
         return ret;
     }
 
@@ -177,38 +163,16 @@ final class ServerSessionManager implements SessionListenerFactory<PCEPSessionLi
         return l.ensureLspOperational(input);
     }
 
-    InstanceIdentifier<Topology> getTopology() {
-        return topology;
-    }
-
-    WriteTransaction beginTransaction() {
-        return chain.newWriteOnlyTransaction();
-    }
-
-    ReadWriteTransaction rwTransaction() {
-        return chain.newReadWriteTransaction();
-    }
-
-    <T extends DataObject> ListenableFuture<Optional<T>> readOperationalData(final InstanceIdentifier<T> id) {
-        final ReadTransaction t = chain.newReadOnlyTransaction();
-        return t.read(LogicalDatastoreType.OPERATIONAL, id);
-    }
-
     @Override
-    public void close() throws InterruptedException, ExecutionException {
-        final WriteTransaction t = this.chain.newWriteOnlyTransaction();
+    public void close() throws TransactionCommitFailedException {
+        for (final TopologySessionListener sessionListener : nodes.values()) {
+            sessionListener.close();
+        }
+        for (final TopologyNodeState nodeState : state.values()) {
+            nodeState.close();
+        }
+        final WriteTransaction t = this.broker.newWriteOnlyTransaction();
         t.delete(LogicalDatastoreType.OPERATIONAL, this.topology);
-        t.submit().get();
-        chain.close();
-    }
-
-    @Override
-    public void onTransactionChainFailed(final TransactionChain<?, ?> chain, final AsyncTransaction<?, ?> transaction, final Throwable cause) {
-        LOG.error("Unexpected transaction failure in topology {} transaction {}", getTopology(), transaction.getIdentifier(), cause);
-    }
-
-    @Override
-    public void onTransactionChainSuccessful(final TransactionChain<?, ?> chain) {
-        LOG.info("Topology {} shutdown successfully", getTopology());
+        t.submit().checkedGet();
     }
 }
index 3a5f064b3012b59deec63aa443cb7128bb465149..0415338a8a276eff720e2a5c41f8a4814f5dbbec 100644 (file)
@@ -7,7 +7,11 @@
  */
 package org.opendaylight.bgpcep.pcep.topology.provider;
 
+import com.google.common.base.Optional;
 import com.google.common.base.Preconditions;
+import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
 
 import java.util.Collection;
 import java.util.HashMap;
@@ -16,23 +20,44 @@ import java.util.Map;
 
 import javax.annotation.concurrent.ThreadSafe;
 
+import org.opendaylight.controller.md.sal.binding.api.BindingTransactionChain;
+import org.opendaylight.controller.md.sal.binding.api.DataBroker;
+import org.opendaylight.controller.md.sal.binding.api.ReadTransaction;
+import org.opendaylight.controller.md.sal.binding.api.ReadWriteTransaction;
+import org.opendaylight.controller.md.sal.binding.api.WriteTransaction;
+import org.opendaylight.controller.md.sal.common.api.data.AsyncTransaction;
+import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
+import org.opendaylight.controller.md.sal.common.api.data.TransactionChain;
+import org.opendaylight.controller.md.sal.common.api.data.TransactionChainListener;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.topology.pcep.rev131024.lsp.metadata.Metadata;
 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.NodeId;
+import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.Topology;
+import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.Node;
+import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.NodeBuilder;
+import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.NodeKey;
+import org.opendaylight.yangtools.yang.binding.DataObject;
+import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
+import org.opendaylight.yangtools.yang.binding.KeyedInstanceIdentifier;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 @ThreadSafe
-final class TopologyNodeState {
+final class TopologyNodeState implements AutoCloseable, TransactionChainListener {
+    private static final Logger LOG = LoggerFactory.getLogger(TopologyNodeState.class);
     private final Map<String, Metadata> metadata = new HashMap<>();
+    private final KeyedInstanceIdentifier<Node, NodeKey> nodeId;
+    private final BindingTransactionChain chain;
     private final long holdStateNanos;
-    private final NodeId nodeId;
     private long lastReleased = 0;
 
-    public TopologyNodeState(final NodeId nodeId, final long holdStateNanos) {
+    public TopologyNodeState(final DataBroker broker, final InstanceIdentifier<Topology> topology, final NodeId id, final long holdStateNanos) {
         Preconditions.checkArgument(holdStateNanos >= 0);
-        this.nodeId = Preconditions.checkNotNull(nodeId);
+        this.nodeId = topology.child(Node.class, new NodeKey(id));
         this.holdStateNanos = holdStateNanos;
+        this.chain = broker.createTransactionChain(this);
     }
 
-    public NodeId getNodeId() {
+    public KeyedInstanceIdentifier<Node, NodeKey> getNodeId() {
         return nodeId;
     }
 
@@ -62,6 +87,23 @@ final class TopologyNodeState {
     }
 
     public synchronized void released() {
+        // The session went down. Undo all the Topology changes we have done.
+        final WriteTransaction trans = beginTransaction();
+
+        trans.delete(LogicalDatastoreType.OPERATIONAL, this.nodeId);
+
+        Futures.addCallback(trans.submit(), new FutureCallback<Void>() {
+            @Override
+            public void onSuccess(final Void result) {
+                LOG.trace("Internal state for node {} cleaned up successfully", nodeId);
+            }
+
+            @Override
+            public void onFailure(final Throwable t) {
+                LOG.error("Failed to cleanup internal state for session {}", nodeId, t);
+            }
+        });
+
         lastReleased = System.nanoTime();
     }
 
@@ -71,5 +113,40 @@ final class TopologyNodeState {
         if (now - lastReleased > holdStateNanos) {
             metadata.clear();
         }
+
+        final Node node = new NodeBuilder().setKey(nodeId.getKey()).setNodeId(nodeId.getKey().getNodeId()).build();
+        final WriteTransaction t = chain.newWriteOnlyTransaction();
+        t.put(LogicalDatastoreType.OPERATIONAL, nodeId, node);
+        t.submit();
+    }
+
+    WriteTransaction beginTransaction() {
+        return chain.newWriteOnlyTransaction();
+    }
+
+    ReadWriteTransaction rwTransaction() {
+        return chain.newReadWriteTransaction();
+    }
+
+    <T extends DataObject> ListenableFuture<Optional<T>> readOperationalData(final InstanceIdentifier<T> id) {
+        final ReadTransaction t = chain.newReadOnlyTransaction();
+        return t.read(LogicalDatastoreType.OPERATIONAL, id);
+    }
+
+    @Override
+    public void onTransactionChainFailed(final TransactionChain<?, ?> chain, final AsyncTransaction<?, ?> transaction, final Throwable cause) {
+        // FIXME: flip internal state, so that the next attempt to update fails, triggering node reconnect
+        LOG.error("Unexpected transaction failure in node {} transaction {}", nodeId, transaction.getIdentifier(), cause);
     }
+
+    @Override
+    public void onTransactionChainSuccessful(final TransactionChain<?, ?> chain) {
+        LOG.info("Node {} shutdown successfully", nodeId);
+    }
+
+    @Override
+    public void close() {
+        chain.close();
+    }
+
 }
\ No newline at end of file
index 21d00e788f93c6b5458bee1192237ebe417db4c0..433e975f5c5244f0c285b9615ab800deb18dfd05 100644 (file)
@@ -18,12 +18,16 @@ package org.opendaylight.controller.config.yang.pcep.topology.provider;
 
 import com.google.common.base.Charsets;
 import com.google.common.net.InetAddresses;
+
 import java.net.InetAddress;
 import java.net.InetSocketAddress;
 import java.util.concurrent.ExecutionException;
+
 import org.opendaylight.bgpcep.pcep.topology.provider.PCEPTopologyProvider;
 import org.opendaylight.controller.config.api.JmxAttributeValidationException;
 import org.opendaylight.controller.config.yang.pcep.impl.PCEPDispatcherImplModuleMXBean;
+import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
+import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException;
 import org.opendaylight.tcpmd5.api.KeyMapping;
 import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.inet.types.rev100924.IpAddress;
 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.NetworkTopology;
@@ -131,7 +135,7 @@ public final class PCEPTopologyProviderModule extends
         try {
             return PCEPTopologyProvider.create(getDispatcherDependency(), address, keys.isEmpty() ? null : keys, getSchedulerDependency(),
                     getDataProviderDependency(), getRpcRegistryDependency(), topology, getStatefulPluginDependency());
-        } catch (InterruptedException | ExecutionException e) {
+        } catch (InterruptedException | ExecutionException | TransactionCommitFailedException | ReadFailedException e) {
             LOG.error("Failed to instantiate topology provider at {}", address, e);
             throw new IllegalStateException("Failed to instantiate provider", e);
         }
index 66af75cd9a7cfcbdd5edfa862ae62fb7cce492e2..5ff30ecf78ae1978926e31202a4b5772ed4b05c6 100644 (file)
@@ -38,6 +38,7 @@ import org.mockito.invocation.InvocationOnMock;
 import org.mockito.stubbing.Answer;
 import org.opendaylight.controller.md.sal.binding.test.AbstractDataBrokerTest;
 import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
+import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException;
 import org.opendaylight.protocol.pcep.impl.DefaultPCEPSessionNegotiator;
 import org.opendaylight.protocol.pcep.impl.PCEPSessionImpl;
 import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.inet.types.rev100924.IpPrefix;
@@ -137,7 +138,7 @@ public abstract class AbstractPCEPSessionTest<T extends TopologySessionListenerF
     }
 
     @After
-    public void tearDown() throws InterruptedException, ExecutionException {
+    public void tearDown() throws TransactionCommitFailedException {
         this.manager.close();
     }