import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ExecutionException;
+import java.util.concurrent.atomic.AtomicBoolean;
import org.checkerframework.checker.lock.qual.GuardedBy;
import org.eclipse.jdt.annotation.NonNull;
import org.opendaylight.mdsal.binding.api.DataBroker;
// This class is thread-safe
final class TopologyNodeState implements AutoCloseable, TransactionChainListener {
private static final Logger LOG = LoggerFactory.getLogger(TopologyNodeState.class);
+ private final AtomicBoolean closed = new AtomicBoolean(false);
private final Map<String, Metadata> metadata = new HashMap<>();
private final KeyedInstanceIdentifier<Node, NodeKey> nodeId;
private final TransactionChain chain;
}, MoreExecutors.directExecutor());
}
+ close();
this.lastReleased = System.nanoTime();
}
// FIXME: flip internal state, so that the next attempt to update fails, triggering node reconnect
LOG.error("Unexpected transaction failure in node {} transaction {}",
this.nodeId, transaction.getIdentifier(), cause);
+ if (closed.compareAndSet(false, true)) {
+ pchain.close();
+ }
}
@Override
@Override
public synchronized void close() {
- this.chain.close();
+ if (closed.compareAndSet(false, true)) {
+ this.chain.close();
+ }
}
private synchronized void putTopologyNode() {
@Override
public void onFailure(final Throwable throwable) {
- LOG.trace("Put topology Node failed {}, value {}", TopologyNodeState.this.nodeId, node, throwable);
+ LOG.error("Put topology Node failed {}, value {}", TopologyNodeState.this.nodeId, node, throwable);
}
}, MoreExecutors.directExecutor());
}
@SuppressWarnings("checkstyle:IllegalCatch")
@SuppressFBWarnings(value = "UPM_UNCALLED_PRIVATE_METHOD",
justification = "https://github.com/spotbugs/spotbugs/issues/811")
- private synchronized void updatePcepStats() {
- final WriteTransaction tx = TopologyStatsProviderImpl.this.transactionChain.newWriteOnlyTransaction();
-
- try {
- for (final Map.Entry<KeyedInstanceIdentifier<Node, NodeKey>, PcepSessionState> entry
- : this.statsMap.entrySet()) {
- if (this.statsPendingDelete.contains(entry.getKey())) {
- continue;
+ private void updatePcepStats() {
+ synchronized (this.transactionChain) {
+ WriteTransaction tx = null;
+
+ try {
+ tx = TopologyStatsProviderImpl.this.transactionChain.newWriteOnlyTransaction();
+ for (final Map.Entry<KeyedInstanceIdentifier<Node, NodeKey>, PcepSessionState> entry
+ : this.statsMap.entrySet()) {
+ if (this.statsPendingDelete.contains(entry.getKey())) {
+ continue;
+ }
+ final PcepTopologyNodeStatsAug nodeStatsAug = new PcepTopologyNodeStatsAugBuilder()
+ .setPcepSessionState(new PcepSessionStateBuilder(entry.getValue()).build()).build();
+ final InstanceIdentifier<PcepTopologyNodeStatsAug> statId =
+ entry.getKey().augmentation(PcepTopologyNodeStatsAug.class);
+ tx.put(LogicalDatastoreType.OPERATIONAL, statId, nodeStatsAug);
}
- final PcepTopologyNodeStatsAug nodeStatsAug = new PcepTopologyNodeStatsAugBuilder()
- .setPcepSessionState(new PcepSessionStateBuilder(entry.getValue()).build()).build();
- final InstanceIdentifier<PcepTopologyNodeStatsAug> statId =
- entry.getKey().augmentation(PcepTopologyNodeStatsAug.class);
- tx.put(LogicalDatastoreType.OPERATIONAL, statId, nodeStatsAug);
- }
- tx.commit().addCallback(new FutureCallback<CommitInfo>() {
- @Override
- public void onSuccess(final CommitInfo result) {
- LOG.debug("Successfully committed Topology stats update");
+ tx.commit().addCallback(new FutureCallback<CommitInfo>() {
+ @Override
+ public void onSuccess(final CommitInfo result) {
+ LOG.debug("Successfully committed Topology stats update");
+ }
+
+ @Override
+ public void onFailure(final Throwable ex) {
+ LOG.error("Failed to commit Topology stats update", ex);
+ }
+ }, MoreExecutors.directExecutor());
+ } catch (final Exception e) {
+ if (tx != null) {
+ LOG.warn("Failed to prepare Tx {} for PCEP stats update", tx.getIdentifier(), e);
+ tx.cancel();
+ this.transactionChain.close();
+ recreateTxChain();
}
+ }
+ }
+ }
- @Override
- public void onFailure(final Throwable ex) {
- LOG.error("Failed to commit Topology stats update", ex);
- }
- }, MoreExecutors.directExecutor());
+ @SuppressWarnings("checkstyle:IllegalCatch")
+ private synchronized void recreateTxChain() {
+ try {
+ if (!closed.get()) {
+ transactionChain = dataBroker.createMergingTransactionChain(this);
+ }
} catch (final Exception e) {
- LOG.warn("Failed to prepare Tx for PCEP stats update", e);
- tx.cancel();
+ LOG.error("Failed to recreate transaction chain {}", transactionChain, e);
}
}
if (closed.compareAndSet(false, true)) {
LOG.info("Closing TopologyStatsProvider service.");
this.scheduleTask.cancel(true);
- final WriteTransaction wTx = this.transactionChain.newWriteOnlyTransaction();
+ this.transactionChain.close();
+ final WriteTransaction wTx = this.dataBroker.newWriteOnlyTransaction();
for (final KeyedInstanceIdentifier<Node, NodeKey> statId : this.statsMap.keySet()) {
wTx.delete(LogicalDatastoreType.OPERATIONAL, statId);
}
wTx.commit().get();
this.statsMap.clear();
- this.transactionChain.close();
this.scheduler.shutdown();
}
}
@Override
- public synchronized void onTransactionChainFailed(final TransactionChain chain,
+ public void onTransactionChainFailed(final TransactionChain chain,
final Transaction transaction, final Throwable cause) {
LOG.error("Transaction chain {} failed for tx {}",
chain, transaction != null ? transaction.getIdentifier() : null, cause);
-
- if (!closed.get()) {
- transactionChain.close();
- transactionChain = dataBroker.createMergingTransactionChain(this);
+ chain.close();
+ if (chain == this.transactionChain) {
+ recreateTxChain();
}
}
@Override
- public synchronized void onTransactionChainSuccessful(final TransactionChain chain) {
+ public void onTransactionChainSuccessful(final TransactionChain chain) {
LOG.debug("Transaction chain {} successful.", chain);
}
@Override
- public synchronized void bind(final KeyedInstanceIdentifier<Node, NodeKey> nodeId,
+ public void bind(final KeyedInstanceIdentifier<Node, NodeKey> nodeId,
final PcepSessionState sessionState) {
- this.statsMap.put(nodeId, sessionState);
+ LOG.info("Bind: {}", nodeId.getKey().getNodeId());
+ synchronized (this.transactionChain) {
+ this.statsMap.put(nodeId, sessionState);
+ }
}
+ @SuppressWarnings("checkstyle:IllegalCatch")
@Override
- public synchronized void unbind(final KeyedInstanceIdentifier<Node, NodeKey> nodeId) {
- this.statsMap.remove(nodeId);
- this.statsPendingDelete.add(nodeId);
- final WriteTransaction wTx = this.transactionChain.newWriteOnlyTransaction();
- wTx.delete(LogicalDatastoreType.OPERATIONAL, nodeId);
- wTx.commit().addCallback(new FutureCallback<CommitInfo>() {
- @Override
- public void onSuccess(final CommitInfo result) {
- LOG.debug("Successfully removed Pcep Node stats {}.", nodeId.getKey().getNodeId());
- TopologyStatsProviderImpl.this.statsPendingDelete.remove(nodeId);
- }
-
- @Override
- public void onFailure(final Throwable ex) {
- LOG.warn("Failed to remove Pcep Node stats {}.", nodeId.getKey().getNodeId(), ex);
- TopologyStatsProviderImpl.this.statsPendingDelete.remove(nodeId);
+ public void unbind(final KeyedInstanceIdentifier<Node, NodeKey> nodeId) {
+ synchronized (this.transactionChain) {
+ LOG.info("Unbind: {}", nodeId.getKey().getNodeId());
+ this.statsMap.remove(nodeId);
+ this.statsPendingDelete.add(nodeId);
+ WriteTransaction tx = null;
+ try {
+ tx = this.transactionChain.newWriteOnlyTransaction();
+ tx.delete(LogicalDatastoreType.OPERATIONAL, nodeId);
+ tx.commit().addCallback(new FutureCallback<CommitInfo>() {
+ @Override
+ public void onSuccess(final CommitInfo result) {
+ LOG.info("Successfully removed Pcep Node stats {}.", nodeId.getKey().getNodeId());
+ TopologyStatsProviderImpl.this.statsPendingDelete.remove(nodeId);
+ }
+
+ @Override
+ public void onFailure(final Throwable ex) {
+ LOG.warn("Failed to remove Pcep Node stats {}.", nodeId.getKey().getNodeId(), ex);
+ TopologyStatsProviderImpl.this.statsPendingDelete.remove(nodeId);
+ }
+ }, MoreExecutors.directExecutor());
+ } catch (final Exception e) {
+ if (tx != null) {
+ LOG.warn("Failed to prepare Tx {} for Pcep Node stats delete for node {}.", tx.getIdentifier(),
+ nodeId.getKey().getNodeId(), e);
+ tx.cancel();
+ this.transactionChain.close();
+ recreateTxChain();
+ }
}
- }, MoreExecutors.directExecutor());
+ }
}
}