Relocate ted-name
[bgpcep.git] / pcep / topology / topology-provider / src / main / java / org / opendaylight / bgpcep / pcep / topology / provider / TopologyNodeState.java
index c6ceb5a2c0fde8445ffbadb21bf50c78eb27321f..95ab551b473cb8759ec3d2d2664e0e15cb8065c3 100644 (file)
@@ -7,31 +7,29 @@
  */
 package org.opendaylight.bgpcep.pcep.topology.provider;
 
-import com.google.common.base.Optional;
-import com.google.common.base.Preconditions;
+import static com.google.common.base.Preconditions.checkArgument;
+
+import com.google.common.util.concurrent.FluentFuture;
 import com.google.common.util.concurrent.FutureCallback;
-import com.google.common.util.concurrent.ListenableFuture;
 import com.google.common.util.concurrent.MoreExecutors;
 import java.util.Collection;
 import java.util.HashMap;
 import java.util.Map;
+import java.util.Optional;
 import java.util.concurrent.ExecutionException;
-import javax.annotation.Nonnull;
-import javax.annotation.concurrent.GuardedBy;
-import javax.annotation.concurrent.ThreadSafe;
-import org.opendaylight.controller.md.sal.binding.api.BindingTransactionChain;
-import org.opendaylight.controller.md.sal.binding.api.DataBroker;
-import org.opendaylight.controller.md.sal.binding.api.ReadOnlyTransaction;
-import org.opendaylight.controller.md.sal.binding.api.WriteTransaction;
-import org.opendaylight.controller.md.sal.common.api.data.AsyncTransaction;
-import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
-import org.opendaylight.controller.md.sal.common.api.data.TransactionChain;
-import org.opendaylight.controller.md.sal.common.api.data.TransactionChainListener;
+import org.checkerframework.checker.lock.qual.GuardedBy;
+import org.checkerframework.checker.lock.qual.Holding;
+import org.eclipse.jdt.annotation.NonNull;
+import org.opendaylight.mdsal.binding.api.DataBroker;
+import org.opendaylight.mdsal.binding.api.ReadTransaction;
+import org.opendaylight.mdsal.binding.api.Transaction;
+import org.opendaylight.mdsal.binding.api.TransactionChain;
+import org.opendaylight.mdsal.binding.api.TransactionChainListener;
+import org.opendaylight.mdsal.binding.api.WriteTransaction;
 import org.opendaylight.mdsal.common.api.CommitInfo;
-import org.opendaylight.protocol.pcep.PCEPSession;
-import org.opendaylight.protocol.pcep.TerminationReason;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.topology.pcep.rev171025.Node1;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.topology.pcep.rev171025.lsp.metadata.Metadata;
+import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.topology.pcep.rev220730.Node1;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.topology.pcep.rev220730.lsp.metadata.Metadata;
 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.NodeId;
 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.Topology;
 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.Node;
@@ -43,12 +41,13 @@ import org.opendaylight.yangtools.yang.binding.KeyedInstanceIdentifier;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-@ThreadSafe
-final class TopologyNodeState implements AutoCloseable, TransactionChainListener {
+// This class is thread-safe
+final class TopologyNodeState implements TransactionChainListener {
     private static final Logger LOG = LoggerFactory.getLogger(TopologyNodeState.class);
+
     private final Map<String, Metadata> metadata = new HashMap<>();
     private final KeyedInstanceIdentifier<Node, NodeKey> nodeId;
-    private final BindingTransactionChain chain;
+    private final TransactionChain chain;
     private final long holdStateNanos;
     private long lastReleased = 0;
     //cache initial node state, if any node was persisted
@@ -57,75 +56,75 @@ final class TopologyNodeState implements AutoCloseable, TransactionChainListener
 
     TopologyNodeState(final DataBroker broker, final InstanceIdentifier<Topology> topology, final NodeId id,
             final long holdStateNanos) {
-        Preconditions.checkArgument(holdStateNanos >= 0);
-        this.nodeId = topology.child(Node.class, new NodeKey(id));
+        checkArgument(holdStateNanos >= 0);
+        nodeId = topology.child(Node.class, new NodeKey(id));
         this.holdStateNanos = holdStateNanos;
-        this.chain = broker.createTransactionChain(this);
+        chain = broker.createMergingTransactionChain(this);
     }
 
-    @Nonnull
-    KeyedInstanceIdentifier<Node, NodeKey> getNodeId() {
-        return this.nodeId;
+    @NonNull KeyedInstanceIdentifier<Node, NodeKey> getNodeId() {
+        return nodeId;
     }
 
     synchronized Metadata getLspMetadata(final String name) {
-        return this.metadata.get(name);
+        return metadata.get(name);
     }
 
     synchronized void setLspMetadata(final String name, final Metadata value) {
         if (value == null) {
-            this.metadata.remove(name);
+            metadata.remove(name);
         } else {
-            this.metadata.put(name, value);
+            metadata.put(name, value);
         }
     }
 
     synchronized void cleanupExcept(final Collection<String> values) {
-        this.metadata.keySet().removeIf(s -> !values.contains(s));
+        metadata.keySet().removeIf(s -> !values.contains(s));
     }
 
     synchronized void released(final boolean persist) {
         // The session went down. Undo all the Topology changes we have done.
         // We might want to persist topology node for later re-use.
         if (!persist) {
-            final WriteTransaction trans = this.chain.newWriteOnlyTransaction();
-            trans.delete(LogicalDatastoreType.OPERATIONAL, this.nodeId);
+            final WriteTransaction trans = chain.newWriteOnlyTransaction();
+            trans.delete(LogicalDatastoreType.OPERATIONAL, nodeId);
             trans.commit().addCallback(new FutureCallback<CommitInfo>() {
                 @Override
                 public void onSuccess(final CommitInfo result) {
-                    LOG.trace("Internal state for node {} cleaned up successfully", TopologyNodeState.this.nodeId);
+                    LOG.trace("Internal state for node {} cleaned up successfully", nodeId);
                 }
 
                 @Override
                 public void onFailure(final Throwable throwable) {
-                    LOG.error("Failed to cleanup internal state for session {}",
-                            TopologyNodeState.this.nodeId, throwable);
+                    LOG.error("Failed to cleanup internal state for session {}", nodeId, throwable);
                 }
             }, MoreExecutors.directExecutor());
         }
 
-        this.lastReleased = System.nanoTime();
+        close();
+        lastReleased = System.nanoTime();
     }
 
     synchronized void taken(final boolean retrieveNode) {
         final long now = System.nanoTime();
 
-        if (now - this.lastReleased > this.holdStateNanos) {
-            this.metadata.clear();
+        if (now - lastReleased > holdStateNanos) {
+            metadata.clear();
         }
 
         //try to get the topology's node
         if (retrieveNode) {
             try {
-                final Optional<Node> prevNode = readOperationalData(this.nodeId).get();
+                // FIXME: we really should not be performing synchronous operations
+                final Optional<Node> prevNode = readOperationalData(nodeId).get();
                 if (!prevNode.isPresent()) {
                     putTopologyNode();
                 } else {
                     //cache retrieved node
-                    TopologyNodeState.this.initialNodeState = prevNode.get();
+                    initialNodeState = prevNode.get();
                 }
             } catch (final ExecutionException | InterruptedException throwable) {
-                LOG.error("Failed to get topology node {}", TopologyNodeState.this.nodeId, throwable);
+                LOG.error("Failed to get topology node {}", nodeId, throwable);
             }
         } else {
             putTopologyNode();
@@ -133,76 +132,67 @@ final class TopologyNodeState implements AutoCloseable, TransactionChainListener
     }
 
     synchronized Node getInitialNodeState() {
-        return this.initialNodeState;
+        return initialNodeState;
     }
 
-    synchronized BindingTransactionChain getChain() {
-        return this.chain;
+    synchronized TransactionChain getChain() {
+        return chain;
     }
 
-    synchronized <T extends DataObject> ListenableFuture<Optional<T>> readOperationalData(
+    synchronized <T extends DataObject> FluentFuture<Optional<T>> readOperationalData(
             final InstanceIdentifier<T> id) {
-        try (ReadOnlyTransaction t = this.chain.newReadOnlyTransaction()) {
+        try (ReadTransaction t = chain.newReadOnlyTransaction()) {
             return t.read(LogicalDatastoreType.OPERATIONAL, id);
         }
     }
 
     @Override
-    public void onTransactionChainFailed(final TransactionChain<?, ?> pchain, final AsyncTransaction<?, ?> transaction,
+    public void onTransactionChainFailed(final TransactionChain pchain, final Transaction transaction,
             final Throwable cause) {
         // FIXME: flip internal state, so that the next attempt to update fails, triggering node reconnect
-        LOG.error("Unexpected transaction failure in node {} transaction {}",
-                this.nodeId, transaction.getIdentifier(), cause);
+        LOG.error("Unexpected transaction failure in node {} transaction {}", nodeId, transaction.getIdentifier(),
+            cause);
+        close();
     }
 
     @Override
-    public void onTransactionChainSuccessful(final TransactionChain<?, ?> pchain) {
-        LOG.info("Node {} shutdown successfully", this.nodeId);
+    public void onTransactionChainSuccessful(final TransactionChain pchain) {
+        LOG.info("Node {} shutdown successfully", nodeId);
     }
 
-    @Override
-    public synchronized void close() {
-        this.chain.close();
+    synchronized void close() {
+        chain.close();
     }
 
-    private synchronized void putTopologyNode() {
-        final Node node = new NodeBuilder().withKey(this.nodeId.getKey())
-                .setNodeId(this.nodeId.getKey().getNodeId()).build();
-        final WriteTransaction t = this.chain.newWriteOnlyTransaction();
-        LOG.trace("Put topology Node {}, value {}", this.nodeId, node);
-        t.merge(LogicalDatastoreType.OPERATIONAL, this.nodeId, node);
-        t.commit().addCallback(new FutureCallback<CommitInfo>() {
+    @Holding("this")
+    private void putTopologyNode() {
+        final Node node = new NodeBuilder().withKey(nodeId.getKey()).build();
+        final WriteTransaction tx = chain.newWriteOnlyTransaction();
+        LOG.trace("Put topology Node {}, value {}", nodeId, node);
+        // FIXME: why is this a 'merge' and not a 'put'? This seems to be related to BGPCEP-739, but there is little
+        //        evidence as to what exactly was being overwritten
+        tx.merge(LogicalDatastoreType.OPERATIONAL, nodeId, node);
+        tx.commit().addCallback(new FutureCallback<CommitInfo>() {
             @Override
             public void onSuccess(final CommitInfo result) {
-                LOG.trace("Topology Node stored {}, value {}", TopologyNodeState.this.nodeId, node);
+                LOG.trace("Topology Node stored {}, value {}", nodeId, node);
             }
 
             @Override
             public void onFailure(final Throwable throwable) {
-                LOG.trace("Put topology Node failed {}, value {}, {}", TopologyNodeState.this.nodeId, node, throwable);
+                LOG.error("Put topology Node failed {}, value {}", nodeId, node, throwable);
             }
         }, MoreExecutors.directExecutor());
     }
 
-    synchronized void storeNode(final InstanceIdentifier<Node1> topologyAugment, final Node1 ta,
-            final PCEPSession session) {
+    @NonNull FluentFuture<? extends @NonNull CommitInfo> storeNode(final InstanceIdentifier<Node1> topologyAugment,
+            final Node1 ta) {
         LOG.trace("Peer data {} set to {}", topologyAugment, ta);
-        final WriteTransaction trans = this.chain.newWriteOnlyTransaction();
-        trans.put(LogicalDatastoreType.OPERATIONAL, topologyAugment, ta);
-
-        // All set, commit the modifications
-        trans.commit().addCallback(new FutureCallback<CommitInfo>() {
-            @Override
-            public void onSuccess(final CommitInfo result) {
-                LOG.trace("Node stored {} for session {} updated successfully", topologyAugment, session);
-            }
-
-            @Override
-            public void onFailure(final Throwable throwable) {
-                LOG.error("Failed to store node {} for session {}, terminating it",
-                        topologyAugment, session, throwable);
-                session.close(TerminationReason.UNKNOWN);
-            }
-        }, MoreExecutors.directExecutor());
+        synchronized (this) {
+            final WriteTransaction trans = chain.newWriteOnlyTransaction();
+            trans.put(LogicalDatastoreType.OPERATIONAL, topologyAugment, ta);
+            // All set, commit the modifications
+            return trans.commit();
+        }
     }
-}
\ No newline at end of file
+}