*/
package org.opendaylight.bgpcep.pcep.topology.provider;
-import com.google.common.base.Optional;
-import com.google.common.base.Preconditions;
+import static com.google.common.base.Preconditions.checkArgument;
+
+import com.google.common.util.concurrent.FluentFuture;
import com.google.common.util.concurrent.FutureCallback;
-import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.MoreExecutors;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
+import java.util.Optional;
import java.util.concurrent.ExecutionException;
-import javax.annotation.Nonnull;
-import javax.annotation.concurrent.GuardedBy;
-import javax.annotation.concurrent.ThreadSafe;
-import org.opendaylight.controller.md.sal.binding.api.BindingTransactionChain;
-import org.opendaylight.controller.md.sal.binding.api.DataBroker;
-import org.opendaylight.controller.md.sal.binding.api.ReadOnlyTransaction;
-import org.opendaylight.controller.md.sal.binding.api.WriteTransaction;
-import org.opendaylight.controller.md.sal.common.api.data.AsyncTransaction;
-import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
-import org.opendaylight.controller.md.sal.common.api.data.TransactionChain;
-import org.opendaylight.controller.md.sal.common.api.data.TransactionChainListener;
+import org.checkerframework.checker.lock.qual.GuardedBy;
+import org.checkerframework.checker.lock.qual.Holding;
+import org.eclipse.jdt.annotation.NonNull;
+import org.opendaylight.mdsal.binding.api.DataBroker;
+import org.opendaylight.mdsal.binding.api.ReadTransaction;
+import org.opendaylight.mdsal.binding.api.Transaction;
+import org.opendaylight.mdsal.binding.api.TransactionChain;
+import org.opendaylight.mdsal.binding.api.TransactionChainListener;
+import org.opendaylight.mdsal.binding.api.WriteTransaction;
import org.opendaylight.mdsal.common.api.CommitInfo;
-import org.opendaylight.protocol.pcep.PCEPSession;
-import org.opendaylight.protocol.pcep.TerminationReason;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.topology.pcep.rev171025.Node1;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.topology.pcep.rev171025.lsp.metadata.Metadata;
+import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.topology.pcep.rev220730.Node1;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.topology.pcep.rev220730.lsp.metadata.Metadata;
import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.NodeId;
import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.Topology;
import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.Node;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-@ThreadSafe
-final class TopologyNodeState implements AutoCloseable, TransactionChainListener {
+// This class is thread-safe
+final class TopologyNodeState implements TransactionChainListener {
private static final Logger LOG = LoggerFactory.getLogger(TopologyNodeState.class);
+
private final Map<String, Metadata> metadata = new HashMap<>();
private final KeyedInstanceIdentifier<Node, NodeKey> nodeId;
- private final BindingTransactionChain chain;
+ private final TransactionChain chain;
private final long holdStateNanos;
private long lastReleased = 0;
//cache initial node state, if any node was persisted
TopologyNodeState(final DataBroker broker, final InstanceIdentifier<Topology> topology, final NodeId id,
final long holdStateNanos) {
- Preconditions.checkArgument(holdStateNanos >= 0);
- this.nodeId = topology.child(Node.class, new NodeKey(id));
+ checkArgument(holdStateNanos >= 0);
+ nodeId = topology.child(Node.class, new NodeKey(id));
this.holdStateNanos = holdStateNanos;
- this.chain = broker.createTransactionChain(this);
+ chain = broker.createMergingTransactionChain(this);
}
- @Nonnull
- KeyedInstanceIdentifier<Node, NodeKey> getNodeId() {
- return this.nodeId;
+ @NonNull KeyedInstanceIdentifier<Node, NodeKey> getNodeId() {
+ return nodeId;
}
synchronized Metadata getLspMetadata(final String name) {
- return this.metadata.get(name);
+ return metadata.get(name);
}
synchronized void setLspMetadata(final String name, final Metadata value) {
if (value == null) {
- this.metadata.remove(name);
+ metadata.remove(name);
} else {
- this.metadata.put(name, value);
+ metadata.put(name, value);
}
}
synchronized void cleanupExcept(final Collection<String> values) {
- this.metadata.keySet().removeIf(s -> !values.contains(s));
+ metadata.keySet().removeIf(s -> !values.contains(s));
}
synchronized void released(final boolean persist) {
// The session went down. Undo all the Topology changes we have done.
// We might want to persist topology node for later re-use.
if (!persist) {
- final WriteTransaction trans = this.chain.newWriteOnlyTransaction();
- trans.delete(LogicalDatastoreType.OPERATIONAL, this.nodeId);
+ final WriteTransaction trans = chain.newWriteOnlyTransaction();
+ trans.delete(LogicalDatastoreType.OPERATIONAL, nodeId);
trans.commit().addCallback(new FutureCallback<CommitInfo>() {
@Override
public void onSuccess(final CommitInfo result) {
- LOG.trace("Internal state for node {} cleaned up successfully", TopologyNodeState.this.nodeId);
+ LOG.trace("Internal state for node {} cleaned up successfully", nodeId);
}
@Override
public void onFailure(final Throwable throwable) {
- LOG.error("Failed to cleanup internal state for session {}",
- TopologyNodeState.this.nodeId, throwable);
+ LOG.error("Failed to cleanup internal state for session {}", nodeId, throwable);
}
}, MoreExecutors.directExecutor());
}
- this.lastReleased = System.nanoTime();
+ close();
+ lastReleased = System.nanoTime();
}
synchronized void taken(final boolean retrieveNode) {
final long now = System.nanoTime();
- if (now - this.lastReleased > this.holdStateNanos) {
- this.metadata.clear();
+ if (now - lastReleased > holdStateNanos) {
+ metadata.clear();
}
//try to get the topology's node
if (retrieveNode) {
try {
- final Optional<Node> prevNode = readOperationalData(this.nodeId).get();
+ // FIXME: we really should not be performing synchronous operations
+ final Optional<Node> prevNode = readOperationalData(nodeId).get();
if (!prevNode.isPresent()) {
putTopologyNode();
} else {
//cache retrieved node
- TopologyNodeState.this.initialNodeState = prevNode.get();
+ initialNodeState = prevNode.get();
}
} catch (final ExecutionException | InterruptedException throwable) {
- LOG.error("Failed to get topology node {}", TopologyNodeState.this.nodeId, throwable);
+ LOG.error("Failed to get topology node {}", nodeId, throwable);
}
} else {
putTopologyNode();
}
synchronized Node getInitialNodeState() {
- return this.initialNodeState;
+ return initialNodeState;
}
- synchronized BindingTransactionChain getChain() {
- return this.chain;
+ synchronized TransactionChain getChain() {
+ return chain;
}
- synchronized <T extends DataObject> ListenableFuture<Optional<T>> readOperationalData(
+ synchronized <T extends DataObject> FluentFuture<Optional<T>> readOperationalData(
final InstanceIdentifier<T> id) {
- try (ReadOnlyTransaction t = this.chain.newReadOnlyTransaction()) {
+ try (ReadTransaction t = chain.newReadOnlyTransaction()) {
return t.read(LogicalDatastoreType.OPERATIONAL, id);
}
}
@Override
- public void onTransactionChainFailed(final TransactionChain<?, ?> pchain, final AsyncTransaction<?, ?> transaction,
+ public void onTransactionChainFailed(final TransactionChain pchain, final Transaction transaction,
final Throwable cause) {
// FIXME: flip internal state, so that the next attempt to update fails, triggering node reconnect
- LOG.error("Unexpected transaction failure in node {} transaction {}",
- this.nodeId, transaction.getIdentifier(), cause);
+ LOG.error("Unexpected transaction failure in node {} transaction {}", nodeId, transaction.getIdentifier(),
+ cause);
+ close();
}
@Override
- public void onTransactionChainSuccessful(final TransactionChain<?, ?> pchain) {
- LOG.info("Node {} shutdown successfully", this.nodeId);
+ public void onTransactionChainSuccessful(final TransactionChain pchain) {
+ LOG.info("Node {} shutdown successfully", nodeId);
}
- @Override
- public synchronized void close() {
- this.chain.close();
+ synchronized void close() {
+ chain.close();
}
- private synchronized void putTopologyNode() {
- final Node node = new NodeBuilder().withKey(this.nodeId.getKey())
- .setNodeId(this.nodeId.getKey().getNodeId()).build();
- final WriteTransaction t = this.chain.newWriteOnlyTransaction();
- LOG.trace("Put topology Node {}, value {}", this.nodeId, node);
- t.merge(LogicalDatastoreType.OPERATIONAL, this.nodeId, node);
- t.commit().addCallback(new FutureCallback<CommitInfo>() {
+ @Holding("this")
+ private void putTopologyNode() {
+ final Node node = new NodeBuilder().withKey(nodeId.getKey()).build();
+ final WriteTransaction tx = chain.newWriteOnlyTransaction();
+ LOG.trace("Put topology Node {}, value {}", nodeId, node);
+ // FIXME: why is this a 'merge' and not a 'put'? This seems to be related to BGPCEP-739, but there is little
+ // evidence as to what exactly was being overwritten
+ tx.merge(LogicalDatastoreType.OPERATIONAL, nodeId, node);
+ tx.commit().addCallback(new FutureCallback<CommitInfo>() {
@Override
public void onSuccess(final CommitInfo result) {
- LOG.trace("Topology Node stored {}, value {}", TopologyNodeState.this.nodeId, node);
+ LOG.trace("Topology Node stored {}, value {}", nodeId, node);
}
@Override
public void onFailure(final Throwable throwable) {
- LOG.trace("Put topology Node failed {}, value {}, {}", TopologyNodeState.this.nodeId, node, throwable);
+ LOG.error("Put topology Node failed {}, value {}", nodeId, node, throwable);
}
}, MoreExecutors.directExecutor());
}
- synchronized void storeNode(final InstanceIdentifier<Node1> topologyAugment, final Node1 ta,
- final PCEPSession session) {
+ @NonNull FluentFuture<? extends @NonNull CommitInfo> storeNode(final InstanceIdentifier<Node1> topologyAugment,
+ final Node1 ta) {
LOG.trace("Peer data {} set to {}", topologyAugment, ta);
- final WriteTransaction trans = this.chain.newWriteOnlyTransaction();
- trans.put(LogicalDatastoreType.OPERATIONAL, topologyAugment, ta);
-
- // All set, commit the modifications
- trans.commit().addCallback(new FutureCallback<CommitInfo>() {
- @Override
- public void onSuccess(final CommitInfo result) {
- LOG.trace("Node stored {} for session {} updated successfully", topologyAugment, session);
- }
-
- @Override
- public void onFailure(final Throwable throwable) {
- LOG.error("Failed to store node {} for session {}, terminating it",
- topologyAugment, session, throwable);
- session.close(TerminationReason.UNKNOWN);
- }
- }, MoreExecutors.directExecutor());
+ synchronized (this) {
+ final WriteTransaction trans = chain.newWriteOnlyTransaction();
+ trans.put(LogicalDatastoreType.OPERATIONAL, topologyAugment, ta);
+ // All set, commit the modifications
+ return trans.commit();
+ }
}
-}
\ No newline at end of file
+}