import com.google.common.collect.ImmutableList;
import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.common.util.concurrent.SettableFuture;
-import java.util.concurrent.ExecutionException;
+import org.checkerframework.checker.lock.qual.GuardedBy;
import org.eclipse.jdt.annotation.NonNull;
import org.opendaylight.mdsal.binding.api.DataBroker;
import org.opendaylight.mdsal.binding.api.Transaction;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-public final class NetconfDeviceTopologyAdapter implements TransactionChainListener, AutoCloseable {
+public final class NetconfDeviceTopologyAdapter implements TransactionChainListener {
private static final Logger LOG = LoggerFactory.getLogger(NetconfDeviceTopologyAdapter.class);
- private final SettableFuture<Empty> closeFuture = SettableFuture.create();
private final @NonNull KeyedInstanceIdentifier<Topology, TopologyKey> topologyPath;
private final DataBroker dataBroker;
private final RemoteDeviceId id;
+ @GuardedBy("this")
+ private SettableFuture<Empty> closeFuture;
+ @GuardedBy("this")
private TransactionChain txChain;
public NetconfDeviceTopologyAdapter(final DataBroker dataBroker,
this.id = requireNonNull(id);
txChain = dataBroker.createMergingTransactionChain(this);
- final WriteTransaction writeTx = txChain.newWriteOnlyTransaction();
+ final var tx = txChain.newWriteOnlyTransaction();
LOG.trace("{}: Init device state transaction {} putting if absent operational data started.", id,
- writeTx.getIdentifier());
+ tx.getIdentifier());
final var nodePath = nodePath();
- writeTx.put(LogicalDatastoreType.OPERATIONAL, nodePath, new NodeBuilder()
+ tx.put(LogicalDatastoreType.OPERATIONAL, nodePath, new NodeBuilder()
.withKey(nodePath.getKey())
.addAugmentation(new NetconfNodeBuilder()
.setConnectionStatus(ConnectionStatus.Connecting)
.setHost(id.host())
.setPort(new PortNumber(Uint16.valueOf(id.address().getPort()))).build())
.build());
- LOG.trace("{}: Init device state transaction {} putting operational data ended.", id, writeTx.getIdentifier());
-
- commitTransaction(writeTx, "init");
+ LOG.trace("{}: Init device state transaction {} putting operational data ended.", id, tx.getIdentifier());
+ commitTransaction(tx, "init");
}
- private @NonNull KeyedInstanceIdentifier<Node, NodeKey> nodePath() {
- return topologyPath.child(Node.class, new NodeKey(new NodeId(id.name())));
- }
-
- private @NonNull InstanceIdentifier<NetconfNode> netconfNodePath() {
- return nodePath().augmentation(NetconfNode.class);
- }
-
- public void updateDeviceData(final boolean up, final NetconfDeviceCapabilities capabilities,
+ public synchronized void updateDeviceData(final boolean up, final NetconfDeviceCapabilities capabilities,
final SessionIdType sessionId) {
- final WriteTransaction writeTx = txChain.newWriteOnlyTransaction();
- LOG.trace("{}: Update device state transaction {} merging operational data started.",
- id, writeTx.getIdentifier());
+ final var tx = txChain.newWriteOnlyTransaction();
+ LOG.trace("{}: Update device state transaction {} merging operational data started.", id, tx.getIdentifier());
// FIXME: this needs to be tied together with node's operational existence
- writeTx.mergeParentStructurePut(LogicalDatastoreType.OPERATIONAL, netconfNodePath(),
+ tx.mergeParentStructurePut(LogicalDatastoreType.OPERATIONAL, netconfNodePath(),
newNetconfNodeBuilder(up, capabilities, sessionId).build());
- LOG.trace("{}: Update device state transaction {} merging operational data ended.",
- id, writeTx.getIdentifier());
-
- commitTransaction(writeTx, "update");
+ LOG.trace("{}: Update device state transaction {} merging operational data ended.", id, tx.getIdentifier());
+ commitTransaction(tx, "update");
}
- public void updateClusteredDeviceData(final boolean up, final String masterAddress,
+ public synchronized void updateClusteredDeviceData(final boolean up, final String masterAddress,
final NetconfDeviceCapabilities capabilities, final SessionIdType sessionId) {
- final WriteTransaction writeTx = txChain.newWriteOnlyTransaction();
- LOG.trace("{}: Update device state transaction {} merging operational data started.",
- id, writeTx.getIdentifier());
- writeTx.mergeParentStructurePut(LogicalDatastoreType.OPERATIONAL, netconfNodePath(),
+ final var tx = txChain.newWriteOnlyTransaction();
+ LOG.trace("{}: Update device state transaction {} merging operational data started.", id, tx.getIdentifier());
+ tx.mergeParentStructurePut(LogicalDatastoreType.OPERATIONAL, netconfNodePath(),
newNetconfNodeBuilder(up, capabilities, sessionId)
.setClusteredConnectionStatus(new ClusteredConnectionStatusBuilder()
.setNetconfMasterNode(masterAddress)
.build())
.build());
- LOG.trace("{}: Update device state transaction {} merging operational data ended.",
- id, writeTx.getIdentifier());
+ LOG.trace("{}: Update device state transaction {} merging operational data ended.", id, tx.getIdentifier());
- commitTransaction(writeTx, "update");
+ commitTransaction(tx, "update");
}
- @Override
- public void onTransactionChainFailed(final TransactionChain chain, final Transaction transaction,
- final Throwable cause) {
- LOG.warn("{}: TransactionChain({}) {} FAILED!", id, chain, transaction.getIdentifier(), cause);
- chain.close();
-
- txChain = dataBroker.createMergingTransactionChain(this);
- LOG.info("{}: TransactionChain reset to {}", id, txChain);
- // FIXME: restart last update
+ public synchronized void setDeviceAsFailed(final Throwable throwable) {
+ final var data = new NetconfNodeBuilder()
+ .setHost(id.host())
+ .setPort(new PortNumber(Uint16.valueOf(id.address().getPort())))
+ .setConnectionStatus(ConnectionStatus.UnableToConnect)
+ .setConnectedMessage(extractReason(throwable))
+ .build();
+
+ final var tx = txChain.newWriteOnlyTransaction();
+ LOG.trace("{}: Setting device state as failed {} putting operational data started.", id, tx.getIdentifier());
+ tx.mergeParentStructurePut(LogicalDatastoreType.OPERATIONAL, netconfNodePath(), data);
+ LOG.trace("{}: Setting device state as failed {} putting operational data ended.", id, tx.getIdentifier());
+ commitTransaction(tx, "update-failed-device");
}
- @Override
- public void onTransactionChainSuccessful(final TransactionChain chain) {
- LOG.trace("{}: TransactionChain({}) SUCCESSFUL", id, chain);
- closeFuture.set(Empty.value());
+ public synchronized ListenableFuture<Empty> shutdown() {
+ if (closeFuture != null) {
+ return closeFuture;
+ }
+
+ closeFuture = SettableFuture.create();
+
+ final var tx = txChain.newWriteOnlyTransaction();
+ LOG.trace("{}: Close device state transaction {} removing all data started.", id, tx.getIdentifier());
+ tx.delete(LogicalDatastoreType.OPERATIONAL, nodePath());
+ LOG.trace("{}: Close device state transaction {} removing all data ended.", id, tx.getIdentifier());
+ commitTransaction(tx, "close");
+
+ txChain.close();
+ return closeFuture;
}
- public void setDeviceAsFailed(final Throwable throwable) {
- String reason = throwable != null && throwable.getMessage() != null ? throwable.getMessage() : "Unknown reason";
+ private @NonNull KeyedInstanceIdentifier<Node, NodeKey> nodePath() {
+ return topologyPath.child(Node.class, new NodeKey(new NodeId(id.name())));
+ }
- final NetconfNode data = new NetconfNodeBuilder()
- .setHost(id.host())
- .setPort(new PortNumber(Uint16.valueOf(id.address().getPort())))
- .setConnectionStatus(ConnectionStatus.UnableToConnect).setConnectedMessage(reason).build();
-
- final WriteTransaction writeTx = txChain.newWriteOnlyTransaction();
- LOG.trace(
- "{}: Setting device state as failed {} putting operational data started.",
- id, writeTx.getIdentifier());
- writeTx.mergeParentStructurePut(LogicalDatastoreType.OPERATIONAL, netconfNodePath(), data);
- LOG.trace(
- "{}: Setting device state as failed {} putting operational data ended.",
- id, writeTx.getIdentifier());
-
- commitTransaction(writeTx, "update-failed-device");
+ private @NonNull InstanceIdentifier<NetconfNode> netconfNodePath() {
+ return nodePath().augmentation(NetconfNode.class);
}
private NetconfNodeBuilder newNetconfNodeBuilder(final boolean up, final NetconfDeviceCapabilities capabilities,
}
@Override
- public void close() {
- final WriteTransaction writeTx = txChain.newWriteOnlyTransaction();
- LOG.trace("{}: Close device state transaction {} removing all data started.", id, writeTx.getIdentifier());
- writeTx.delete(LogicalDatastoreType.OPERATIONAL, nodePath());
- LOG.trace("{}: Close device state transaction {} removing all data ended.", id, writeTx.getIdentifier());
- commitTransaction(writeTx, "close");
+ public synchronized void onTransactionChainFailed(final TransactionChain chain, final Transaction transaction,
+ final Throwable cause) {
+ LOG.warn("{}: TransactionChain({}) {} FAILED!", id, chain, transaction.getIdentifier(), cause);
+ if (closeFuture != null) {
+ closeFuture.setException(cause);
+ return;
+ }
- txChain.close();
+ // FIXME: move this up once we have MDSAL-838 fixed
+ chain.close();
+
+ txChain = dataBroker.createMergingTransactionChain(this);
+ LOG.info("{}: TransactionChain reset to {}", id, txChain);
+ // FIXME: restart last update
+ }
- try {
- closeFuture.get();
- } catch (InterruptedException | ExecutionException e) {
- LOG.error("{}: Transaction(close) {} FAILED!", id, writeTx.getIdentifier(), e);
- throw new IllegalStateException(id + " Transaction(close) not committed correctly", e);
+ @Override
+ public synchronized void onTransactionChainSuccessful(final TransactionChain chain) {
+ LOG.trace("{}: TransactionChain({}) SUCCESSFUL", id, chain);
+ closeFuture.set(Empty.value());
+ }
+
+ private static @NonNull String extractReason(final Throwable throwable) {
+ if (throwable != null) {
+ final var message = throwable.getMessage();
+ if (message != null) {
+ return message;
+ }
}
+ return "Unknown reason";
}
}
*/
package org.opendaylight.netconf.topology.spi;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertSame;
+import static org.junit.Assert.assertThrows;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
+import com.google.common.util.concurrent.Futures;
import java.net.InetSocketAddress;
+import java.util.concurrent.ExecutionException;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.NodeKey;
import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
import org.opendaylight.yangtools.yang.binding.KeyedInstanceIdentifier;
+import org.opendaylight.yangtools.yang.common.Empty;
import org.opendaylight.yangtools.yang.common.Uint32;
@RunWith(MockitoJUnitRunner.StrictStubs.class)
// FIXME: exact match
doNothing().when(mockTx).delete(eq(LogicalDatastoreType.OPERATIONAL), any(InstanceIdentifier.class));
doNothing().when(mockChain).close();
+
+ final var future = adapter.shutdown();
+ verify(mockChain, times(2)).newWriteOnlyTransaction();
+ verify(mockTx).delete(LogicalDatastoreType.OPERATIONAL,
+ TEST_TOPOLOGY_ID.child(Node.class, new NodeKey(new NodeId(id.name()))));
+ verify(mockTx, times(2)).commit();
+ verify(mockChain).close();
+
+ assertFalse(future.isDone());
+
+ // Idempotent
+ assertSame(future, adapter.shutdown());
+
+ // future completes
listeners.getValue().onTransactionChainSuccessful(mockChain);
- adapter.close();
+ assertSame(Empty.value(), Futures.getDone(future));
+ }
+
+ @Test
+ public void testShutdownCompletion() throws Exception {
+ // FIXME: exact match
+ doNothing().when(mockTx).delete(eq(LogicalDatastoreType.OPERATIONAL), any(InstanceIdentifier.class));
+ doNothing().when(mockChain).close();
+
+ final var future = adapter.shutdown();
verify(mockChain, times(2)).newWriteOnlyTransaction();
verify(mockTx).delete(LogicalDatastoreType.OPERATIONAL,
TEST_TOPOLOGY_ID.child(Node.class, new NodeKey(new NodeId(id.name()))));
verify(mockTx, times(2)).commit();
+ verify(mockChain).close();
+
+ assertFalse(future.isDone());
+
+ // Idempotent
+ assertSame(future, adapter.shutdown());
+
+ // future completes
+ final var cause = new Throwable();
+ listeners.getValue().onTransactionChainFailed(mockChain, mockTx, cause);
+ final var ex = assertThrows(ExecutionException.class, () -> Futures.getDone(future));
+ assertSame(cause, ex.getCause());
}
}