import org.opendaylight.netconf.sal.connect.netconf.listener.NetconfSessionPreferences;
import org.opendaylight.netconf.sal.connect.netconf.sal.tx.ReadOnlyTx;
import org.opendaylight.netconf.sal.connect.netconf.sal.tx.ReadWriteTx;
+import org.opendaylight.netconf.sal.connect.netconf.sal.tx.TxChain;
import org.opendaylight.netconf.sal.connect.netconf.sal.tx.WriteCandidateRunningTx;
import org.opendaylight.netconf.sal.connect.netconf.sal.tx.WriteCandidateTx;
import org.opendaylight.netconf.sal.connect.netconf.sal.tx.WriteRunningTx;
private final NetconfBaseOps netconfOps;
private final boolean rollbackSupport;
- private boolean candidateSupported;
- private boolean runningWritable;
+ private final boolean candidateSupported;
+ private final boolean runningWritable;
public NetconfDeviceDataBroker(final RemoteDeviceId id, final SchemaContext schemaContext, final DOMRpcService rpc, final NetconfSessionPreferences netconfSessionPreferences) {
this.id = id;
@Override
public DOMTransactionChain createTransactionChain(final TransactionChainListener listener) {
- throw new UnsupportedOperationException(id + ": Transaction chains not supported for netconf mount point");
+ return new TxChain(this, listener);
}
@Override
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.SettableFuture;
import java.util.List;
+import java.util.concurrent.CopyOnWriteArrayList;
+import javax.annotation.Nullable;
import org.opendaylight.controller.config.util.xml.DocumentedException;
import org.opendaylight.controller.md.sal.common.api.TransactionStatus;
import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
+import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException;
import org.opendaylight.controller.md.sal.dom.api.DOMDataWriteTransaction;
import org.opendaylight.controller.md.sal.dom.api.DOMRpcResult;
import org.opendaylight.netconf.api.NetconfDocumentedException;
import org.opendaylight.netconf.sal.connect.netconf.util.NetconfBaseOps;
import org.opendaylight.netconf.sal.connect.util.RemoteDeviceId;
+import org.opendaylight.yangtools.yang.common.RpcError;
import org.opendaylight.yangtools.yang.common.RpcResult;
import org.opendaylight.yangtools.yang.common.RpcResultBuilder;
import org.opendaylight.yangtools.yang.data.api.ModifyAction;
protected final NetconfBaseOps netOps;
protected final boolean rollbackSupport;
protected final List<ListenableFuture<DOMRpcResult>> resultsFutures;
+ private final List<TxListener> listeners = new CopyOnWriteArrayList<>();
// Allow commit to be called only once
protected boolean finished = false;
if(isFinished()) {
return false;
}
-
+ listeners.forEach(listener -> listener.onTransactionCancelled(this));
finished = true;
cleanup();
return true;
@Override
public final ListenableFuture<RpcResult<TransactionStatus>> commit() {
+ listeners.forEach(listener -> listener.onTransactionSubmitted(this));
checkNotFinished();
finished = true;
+ final ListenableFuture<RpcResult<TransactionStatus>> result = performCommit();
+ Futures.addCallback(result, new FutureCallback<RpcResult<TransactionStatus>>() {
+ @Override
+ public void onSuccess(@Nullable final RpcResult<TransactionStatus> result) {
+ if (result != null && result.isSuccessful()) {
+ listeners.forEach(txListener -> txListener.onTransactionSuccessful(AbstractWriteTx.this));
+ } else {
+ final TransactionCommitFailedException cause = new TransactionCommitFailedException("Transaction failed", result.getErrors().toArray(new RpcError[result.getErrors().size()]));
+ listeners.forEach(listener -> listener.onTransactionFailed(AbstractWriteTx.this, cause));
+ }
+ }
- return performCommit();
+ @Override
+ public void onFailure(final Throwable t) {
+ listeners.forEach(listener -> listener.onTransactionFailed(AbstractWriteTx.this, t));
+ }
+ });
+ return result;
}
protected abstract ListenableFuture<RpcResult<TransactionStatus>> performCommit();
public void onFailure(final Throwable throwable) {
final NetconfDocumentedException exception =
new NetconfDocumentedException(
- new DocumentedException(id + ":RPC during tx returned an exception",
- new Exception(throwable),
- DocumentedException.ErrorType.APPLICATION,
- DocumentedException.ErrorTag.OPERATION_FAILED,
- DocumentedException.ErrorSeverity.ERROR));
+ id + ":RPC during tx returned an exception",
+ new Exception(throwable),
+ DocumentedException.ErrorType.APPLICATION,
+ DocumentedException.ErrorTag.OPERATION_FAILED,
+ DocumentedException.ErrorSeverity.ERROR);
transformed.setException(exception);
}
});
return transformed;
}
+
+ AutoCloseable addListener(final TxListener listener) {
+ listeners.add(listener);
+ return () -> listeners.remove(listener);
+ }
}
--- /dev/null
+/*
+ * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.netconf.sal.connect.netconf.sal.tx;
+
+import com.google.common.base.Preconditions;
+import java.util.HashMap;
+import java.util.Map;
+import org.opendaylight.controller.md.sal.common.api.data.TransactionChainClosedException;
+import org.opendaylight.controller.md.sal.common.api.data.TransactionChainListener;
+import org.opendaylight.controller.md.sal.dom.api.DOMDataBroker;
+import org.opendaylight.controller.md.sal.dom.api.DOMDataReadOnlyTransaction;
+import org.opendaylight.controller.md.sal.dom.api.DOMDataReadWriteTransaction;
+import org.opendaylight.controller.md.sal.dom.api.DOMDataWriteTransaction;
+import org.opendaylight.controller.md.sal.dom.api.DOMTransactionChain;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * {@link DOMTransactionChain} implementation for Netconf connector.
+ */
+public class TxChain implements DOMTransactionChain, TxListener {
+
+ private static final Logger LOG = LoggerFactory.getLogger(TxChain.class);
+
+ private final DOMDataBroker dataBroker;
+ private final TransactionChainListener listener;
+ /**
+ * Submitted transactions that haven't completed yet.
+ */
+ private final Map<DOMDataWriteTransaction, AutoCloseable> pendingTransactions = new HashMap<>();
+
+ /**
+ * Transaction created by this chain that hasn't been submitted or cancelled yet.
+ */
+ private AbstractWriteTx currentTransaction = null;
+ private boolean closed = false;
+ private boolean successful = true;
+
+ public TxChain(final DOMDataBroker dataBroker, final TransactionChainListener listener) {
+ this.dataBroker = dataBroker;
+ this.listener = listener;
+ }
+
+ @Override
+ public synchronized DOMDataReadOnlyTransaction newReadOnlyTransaction() {
+ checkOperationPermitted();
+ return dataBroker.newReadOnlyTransaction();
+ }
+
+ @Override
+ public synchronized AbstractWriteTx newWriteOnlyTransaction() {
+ checkOperationPermitted();
+ final DOMDataWriteTransaction writeTransaction = dataBroker.newWriteOnlyTransaction();
+ Preconditions.checkState(writeTransaction instanceof AbstractWriteTx);
+ final AbstractWriteTx pendingWriteTx = (AbstractWriteTx) writeTransaction;
+ pendingTransactions.put(pendingWriteTx, pendingWriteTx.addListener(this));
+ currentTransaction = pendingWriteTx;
+ return pendingWriteTx;
+ }
+
+ @Override
+ public synchronized DOMDataReadWriteTransaction newReadWriteTransaction() {
+ return new ReadWriteTx(dataBroker.newReadOnlyTransaction(), newWriteOnlyTransaction());
+ }
+
+ @Override
+ public synchronized void close() {
+ if (!closed) {
+ closed = true;
+ notifyChainListenerSuccess();
+ }
+ }
+
+ @Override
+ public synchronized void onTransactionSuccessful(final AbstractWriteTx transaction) {
+ removePendingTx(transaction);
+ notifyChainListenerSuccess();
+ }
+
+ @Override
+ public synchronized void onTransactionFailed(final AbstractWriteTx transaction, final Throwable cause) {
+ removePendingTx(transaction);
+ successful = false;
+ if (currentTransaction != null) {
+ currentTransaction.cancel();
+ }
+ listener.onTransactionChainFailed(this, transaction, cause);
+ }
+
+ @Override
+ public synchronized void onTransactionSubmitted(final AbstractWriteTx transaction) {
+ currentTransaction = null;
+ }
+
+ @Override
+ public synchronized void onTransactionCancelled(final AbstractWriteTx transaction) {
+ removePendingTx(transaction);
+ currentTransaction = null;
+ }
+
+ private void removePendingTx(final AbstractWriteTx transaction) {
+ try {
+ pendingTransactions.remove(transaction).close();
+ } catch (final Exception e) {
+ LOG.error("Can't remove transaction listener registration", e);
+ }
+ }
+
+ /**
+ * Checks, if chain isn't closed and if there is no not submitted write transaction waiting.
+ */
+ private void checkOperationPermitted() {
+ if (closed) {
+ throw new TransactionChainClosedException("Transaction chain was closed");
+ }
+ Preconditions.checkState(currentTransaction == null, "Last write transaction has not finished yet");
+ }
+
+ private void notifyChainListenerSuccess() {
+ if (closed && pendingTransactions.isEmpty() && successful) {
+ listener.onTransactionChainSuccessful(this);
+ }
+ }
+
+}
--- /dev/null
+/*
+ * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.netconf.sal.connect.netconf.sal.tx;
+
+interface TxListener {
+
+ /**
+ * Invoked, when transaction completes successfully.
+ * @param transaction transaction
+ */
+ void onTransactionSuccessful(AbstractWriteTx transaction);
+
+ /**
+ * Invoked, when transaction fails.
+ *
+ * @param transaction transaction
+ * @param cause cause
+ */
+ void onTransactionFailed(AbstractWriteTx transaction, Throwable cause);
+
+ /**
+ * Invoked, when transaction is cancelled.
+ * @param transaction transaction
+ */
+ void onTransactionCancelled(AbstractWriteTx transaction);
+
+ /**
+ * Invoked, when transaction is submitted.
+ * @param transaction transaction
+ */
+ void onTransactionSubmitted(AbstractWriteTx transaction);
+
+
+}
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.inOrder;
import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
import static org.mockito.Mockito.verify;
import static org.opendaylight.netconf.sal.connect.netconf.util.NetconfMessageTransformUtil.NETCONF_CANDIDATE_QNAME;
import static org.opendaylight.netconf.sal.connect.netconf.util.NetconfMessageTransformUtil.NETCONF_FILTER_QNAME;
import com.google.common.util.concurrent.CheckedFuture;
import com.google.common.util.concurrent.Futures;
import java.net.InetSocketAddress;
+import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
+import org.mockito.ArgumentCaptor;
import org.mockito.InOrder;
import org.mockito.Mock;
import org.mockito.MockitoAnnotations;
inOrder.verify(rpc).invokeRpc(toPath(NetconfMessageTransformUtil.NETCONF_UNLOCK_QNAME), NetconfBaseOps.getUnLockContent(NETCONF_RUNNING_QNAME));
}
+ @Test
+ public void testListenerSuccess() throws Exception {
+ doReturn(Futures.immediateCheckedFuture(new DefaultDOMRpcResult((NormalizedNode<?, ?>) null)))
+ .when(rpc).invokeRpc(any(SchemaPath.class), any(NormalizedNode.class));
+ final WriteCandidateTx tx = new WriteCandidateTx(id, new NetconfBaseOps(rpc, BaseSchema.BASE_NETCONF_CTX.getSchemaContext()), false);
+ final TxListener listener = mock(TxListener.class);
+ tx.addListener(listener);
+ tx.delete(LogicalDatastoreType.CONFIGURATION, yangIId);
+ tx.submit();
+ verify(listener).onTransactionSubmitted(tx);
+ verify(listener).onTransactionSuccessful(tx);
+ verify(listener, never()).onTransactionFailed(eq(tx), any());
+ verify(listener, never()).onTransactionCancelled(tx);
+ }
+
+ @Test
+ public void testListenerCancellation() throws Exception {
+ doReturn(Futures.immediateCheckedFuture(new DefaultDOMRpcResult((NormalizedNode<?, ?>) null)))
+ .when(rpc).invokeRpc(any(SchemaPath.class), any(NormalizedNode.class));
+ final WriteCandidateTx tx = new WriteCandidateTx(id, new NetconfBaseOps(rpc, BaseSchema.BASE_NETCONF_CTX.getSchemaContext()), false);
+ final TxListener listener = mock(TxListener.class);
+ tx.addListener(listener);
+ tx.delete(LogicalDatastoreType.CONFIGURATION, yangIId);
+ tx.cancel();
+ verify(listener).onTransactionCancelled(tx);
+ verify(listener, never()).onTransactionSubmitted(tx);
+ verify(listener, never()).onTransactionSuccessful(tx);
+ verify(listener, never()).onTransactionFailed(eq(tx), any());
+ }
+
+ @Test
+ public void testListenerFailure() throws Exception {
+ final IllegalStateException cause = new IllegalStateException("Failed tx");
+ doReturn(Futures.immediateFailedCheckedFuture(cause))
+ .when(rpc).invokeRpc(any(SchemaPath.class), any(NormalizedNode.class));
+ final WriteCandidateTx tx = new WriteCandidateTx(id, new NetconfBaseOps(rpc, BaseSchema.BASE_NETCONF_CTX.getSchemaContext()), false);
+ final TxListener listener = mock(TxListener.class);
+ tx.addListener(listener);
+ tx.delete(LogicalDatastoreType.CONFIGURATION, yangIId);
+ tx.submit();
+ final ArgumentCaptor<Exception> excCaptor = ArgumentCaptor.forClass(Exception.class);
+ verify(listener).onTransactionSubmitted(tx);
+ verify(listener).onTransactionFailed(eq(tx), excCaptor.capture());
+ Assert.assertEquals(cause, excCaptor.getValue().getCause().getCause());
+ verify(listener, never()).onTransactionSuccessful(tx);
+ verify(listener, never()).onTransactionCancelled(tx);
+ }
}
--- /dev/null
+/*
+ * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.netconf.sal.connect.netconf.sal.tx;
+
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.ArgumentCaptor;
+import org.mockito.Mock;
+import org.mockito.MockitoAnnotations;
+import org.opendaylight.controller.md.sal.common.api.data.TransactionChainClosedException;
+import org.opendaylight.controller.md.sal.common.api.data.TransactionChainListener;
+import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException;
+import org.opendaylight.controller.md.sal.dom.api.DOMDataBroker;
+import org.opendaylight.controller.md.sal.dom.api.DOMDataReadOnlyTransaction;
+
+public class TxChainTest {
+
+ @Mock
+ private DOMDataBroker broker;
+ @Mock
+ private TransactionChainListener listener;
+ @Mock
+ private DOMDataReadOnlyTransaction readOnlyTx;
+ @Mock
+ private AbstractWriteTx writeOnlyTx1;
+ @Mock
+ private AbstractWriteTx writeOnlyTx2;
+ @Mock
+ private AbstractWriteTx writeOnlyTx3;
+ @Mock
+ private AutoCloseable registration1;
+ @Mock
+ private AutoCloseable registration2;
+ @Mock
+ private AutoCloseable registration3;
+ private final ArgumentCaptor<TxListener> captor = ArgumentCaptor.forClass(TxListener.class);
+ private TxChain chain;
+
+ @Before
+ public void setUp() throws Exception {
+ MockitoAnnotations.initMocks(this);
+ when(broker.newReadOnlyTransaction()).thenReturn(readOnlyTx);
+ when(broker.newWriteOnlyTransaction()).thenReturn(writeOnlyTx1).thenReturn(writeOnlyTx2).thenReturn(writeOnlyTx3);
+ when(writeOnlyTx1.addListener(any())).thenReturn(registration1);
+ when(writeOnlyTx2.addListener(any())).thenReturn(registration2);
+ when(writeOnlyTx3.addListener(any())).thenReturn(registration3);
+ chain = new TxChain(broker, listener);
+ }
+
+ @Test()
+ public void testNewReadOnlyTransactionPrevSubmitted() throws Exception {
+ chain.newWriteOnlyTransaction();
+ verify(writeOnlyTx1).addListener(captor.capture());
+ captor.getValue().onTransactionSubmitted(writeOnlyTx1);
+ chain.newReadOnlyTransaction();
+ }
+
+ @Test(expected = IllegalStateException.class)
+ public void testNewReadOnlyTransactionPrevNotSubmitted() throws Exception {
+ chain.newWriteOnlyTransaction();
+ chain.newReadOnlyTransaction();
+ }
+
+ @Test
+ public void testNewReadWriteTransactionPrevSubmitted() throws Exception {
+ chain.newReadWriteTransaction();
+ verify(writeOnlyTx1).addListener(captor.capture());
+ captor.getValue().onTransactionSubmitted(writeOnlyTx1);
+ chain.newReadWriteTransaction();
+ }
+
+ @Test(expected = IllegalStateException.class)
+ public void testNewReadWriteTransactionPrevNotSubmitted() throws Exception {
+ chain.newReadWriteTransaction();
+ chain.newReadWriteTransaction();
+ }
+
+ @Test
+ public void testNewWriteOnlyTransactionPrevSubmitted() throws Exception {
+ chain.newWriteOnlyTransaction();
+ verify(writeOnlyTx1).addListener(captor.capture());
+ captor.getValue().onTransactionSubmitted(writeOnlyTx1);
+ chain.newWriteOnlyTransaction();
+ }
+
+ @Test(expected = IllegalStateException.class)
+ public void testNewWriteOnlyTransactionPrevNotSubmitted() throws Exception {
+ chain.newWriteOnlyTransaction();
+ chain.newWriteOnlyTransaction();
+ }
+
+ @Test(expected = TransactionChainClosedException.class)
+ public void testCloseAfterFinished() throws Exception {
+ chain.close();
+ verify(listener).onTransactionChainSuccessful(chain);
+ chain.newReadOnlyTransaction();
+ }
+
+ @Test
+ public void testChainFail() throws Exception {
+ final AbstractWriteTx writeTx = chain.newWriteOnlyTransaction();
+ final ArgumentCaptor<TxListener> captor = ArgumentCaptor.forClass(TxListener.class);
+ verify(writeOnlyTx1).addListener(captor.capture());
+ writeTx.submit();
+ final TransactionCommitFailedException cause = new TransactionCommitFailedException("fail");
+ captor.getValue().onTransactionFailed(writeOnlyTx1, cause);
+ verify(registration1).close();
+ verify(listener).onTransactionChainFailed(chain, writeOnlyTx1, cause);
+ }
+
+ @Test
+ public void testChainSuccess() throws Exception {
+ final AbstractWriteTx writeTx = chain.newWriteOnlyTransaction();
+ chain.close();
+ verify(writeOnlyTx1).addListener(captor.capture());
+ writeTx.submit();
+ captor.getValue().onTransactionSuccessful(writeOnlyTx1);
+ verify(registration1).close();
+ verify(listener).onTransactionChainSuccessful(chain);
+ }
+
+ @Test
+ public void testCancel() throws Exception {
+ final AbstractWriteTx writeTx = chain.newWriteOnlyTransaction();
+ verify(writeOnlyTx1).addListener(captor.capture());
+ writeTx.cancel();
+ captor.getValue().onTransactionCancelled(writeOnlyTx1);
+ chain.newWriteOnlyTransaction();
+ }
+
+ @Test
+ public void testMultiplePendingTransactions() throws Exception {
+ //create 1st tx
+ final AbstractWriteTx writeTx1 = chain.newWriteOnlyTransaction();
+ final ArgumentCaptor<TxListener> captor1 = ArgumentCaptor.forClass(TxListener.class);
+ verify(writeOnlyTx1).addListener(captor1.capture());
+ //submit 1st tx
+ writeTx1.submit();
+ captor1.getValue().onTransactionSubmitted(writeOnlyTx1);
+
+ //create 2nd tx
+ final AbstractWriteTx writeTx2 = chain.newWriteOnlyTransaction();
+ final ArgumentCaptor<TxListener> captor2 = ArgumentCaptor.forClass(TxListener.class);
+ verify(writeTx2).addListener(captor2.capture());
+ //submit 2nd tx
+ writeTx2.submit();
+ captor2.getValue().onTransactionSubmitted(writeOnlyTx2);
+
+ //create 3rd tx
+ final AbstractWriteTx writeTx3 = chain.newWriteOnlyTransaction();
+ final ArgumentCaptor<TxListener> captor3 = ArgumentCaptor.forClass(TxListener.class);
+ verify(writeTx3).addListener(captor3.capture());
+ //cancel 3rd tx
+ writeTx3.cancel();
+ captor3.getValue().onTransactionCancelled(writeOnlyTx3);
+
+ //close chain
+ chain.close();
+
+ //complete first two transactions successfully
+ captor1.getValue().onTransactionSuccessful(writeOnlyTx1);
+ captor2.getValue().onTransactionSuccessful(writeOnlyTx2);
+
+ verify(registration1).close();
+ verify(registration2).close();
+ verify(registration3).close();
+ verify(listener).onTransactionChainSuccessful(chain);
+ }
+
+ @Test
+ public void testMultiplePendingTransactionsFail() throws Exception {
+ //create 1st tx
+ final AbstractWriteTx writeTx1 = chain.newWriteOnlyTransaction();
+ final ArgumentCaptor<TxListener> captor1 = ArgumentCaptor.forClass(TxListener.class);
+ verify(writeOnlyTx1).addListener(captor1.capture());
+ //submit 1st tx
+ writeTx1.submit();
+ captor1.getValue().onTransactionSubmitted(writeOnlyTx1);
+
+ //create 2nd tx
+ final AbstractWriteTx writeTx2 = chain.newWriteOnlyTransaction();
+ final ArgumentCaptor<TxListener> captor2 = ArgumentCaptor.forClass(TxListener.class);
+ verify(writeTx2).addListener(captor2.capture());
+ //submit 2nd tx
+ writeTx2.submit();
+ captor2.getValue().onTransactionSubmitted(writeOnlyTx2);
+
+ //create 3rd tx
+ final AbstractWriteTx writeTx3 = chain.newWriteOnlyTransaction();
+ final ArgumentCaptor<TxListener> captor3 = ArgumentCaptor.forClass(TxListener.class);
+ verify(writeTx3).addListener(captor3.capture());
+
+ chain.close();
+
+ //fail 1st transaction
+ final Exception cause1 = new Exception("fail");
+ captor1.getValue().onTransactionFailed(writeOnlyTx1, cause1);
+ //current unsubmitted transaction should be cancelled
+ verify(writeTx3).cancel();
+ captor3.getValue().onTransactionCancelled(writeTx3);
+ //2nd transaction success
+ captor2.getValue().onTransactionSuccessful(writeOnlyTx2);
+
+ verify(registration1).close();
+ verify(registration2).close();
+ verify(registration3).close();
+ verify(listener).onTransactionChainFailed(chain, writeOnlyTx1, cause1);
+ // 1 transaction failed, onTransactionChainSuccessful must not be called
+ verify(listener, never()).onTransactionChainSuccessful(chain);
+ }
+}
\ No newline at end of file
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.util.HashedWheelTimer;
import java.io.Closeable;
-import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.net.BindException;
import org.opendaylight.netconf.ssh.SshProxyServer;
import org.opendaylight.netconf.ssh.SshProxyServerConfiguration;
import org.opendaylight.netconf.ssh.SshProxyServerConfigurationBuilder;
+import org.opendaylight.netconf.test.tool.customrpc.SettableOperationProvider;
import org.opendaylight.yangtools.yang.common.SimpleDateFormatUtil;
import org.opendaylight.yangtools.yang.model.api.Module;
import org.opendaylight.yangtools.yang.model.api.SchemaContext;
this.nioExecutor = nioExecutor;
}
- private NetconfServerDispatcherImpl createDispatcher(final Set<Capability> capabilities, final boolean exi, final int generateConfigsTimeout,
- final Optional<File> notificationsFile, final boolean mdSal, final Optional<File> initialConfigXMLFile,
- final SchemaSourceProvider<YangTextSchemaSource> sourceProvider) {
+ private NetconfServerDispatcherImpl createDispatcher(final Set<Capability> capabilities,
+ final SchemaSourceProvider<YangTextSchemaSource> sourceProvider,
+ final TesttoolParameters params) {
final Set<Capability> transformedCapabilities = Sets.newHashSet(Collections2.transform(capabilities, new Function<Capability, Capability>() {
@Override
}
}
}));
-
- final SessionIdProvider idProvider = new SessionIdProvider();
-
- final AggregatedNetconfOperationServiceFactory aggregatedNetconfOperationServiceFactory = new AggregatedNetconfOperationServiceFactory();
- final NetconfOperationServiceFactory operationProvider = mdSal ? new MdsalOperationProvider(idProvider, transformedCapabilities, schemaContext, sourceProvider) :
- new SimulatedOperationProvider(idProvider, transformedCapabilities, notificationsFile, initialConfigXMLFile);
-
transformedCapabilities.add(new BasicCapability("urn:ietf:params:netconf:capability:candidate:1.0"));
-
final NetconfMonitoringService monitoringService1 = new DummyMonitoringService(transformedCapabilities);
+ final SessionIdProvider idProvider = new SessionIdProvider();
- final NetconfMonitoringActivator.NetconfMonitoringOperationServiceFactory monitoringService =
- new NetconfMonitoringActivator.NetconfMonitoringOperationServiceFactory(
- new NetconfMonitoringOperationService(monitoringService1));
- aggregatedNetconfOperationServiceFactory.onAddNetconfOperationServiceFactory(operationProvider);
- aggregatedNetconfOperationServiceFactory.onAddNetconfOperationServiceFactory(monitoringService);
+ final NetconfOperationServiceFactory aggregatedNetconfOperationServiceFactory = createOperationServiceFactory(sourceProvider, params, transformedCapabilities, monitoringService1, idProvider);
- final Set<String> serverCapabilities = exi
+ final Set<String> serverCapabilities = params.exi
? NetconfServerSessionNegotiatorFactory.DEFAULT_BASE_CAPABILITIES
: Sets.newHashSet(XmlNetconfConstants.URN_IETF_PARAMS_NETCONF_BASE_1_0, XmlNetconfConstants.URN_IETF_PARAMS_NETCONF_BASE_1_1);
final NetconfServerSessionNegotiatorFactory serverNegotiatorFactory = new TesttoolNegotiationFactory(
- hashedWheelTimer, aggregatedNetconfOperationServiceFactory, idProvider, generateConfigsTimeout, monitoringService1, serverCapabilities);
+ hashedWheelTimer, aggregatedNetconfOperationServiceFactory, idProvider, params.generateConfigsTimeout, monitoringService1, serverCapabilities);
final NetconfServerDispatcherImpl.ServerChannelInitializer serverChannelInitializer = new NetconfServerDispatcherImpl.ServerChannelInitializer(
serverNegotiatorFactory);
return new NetconfServerDispatcherImpl(serverChannelInitializer, nettyThreadgroup, nettyThreadgroup);
}
+ private NetconfOperationServiceFactory createOperationServiceFactory(final SchemaSourceProvider<YangTextSchemaSource> sourceProvider,
+ final TesttoolParameters params,
+ final Set<Capability> transformedCapabilities,
+ final NetconfMonitoringService monitoringService1,
+ final SessionIdProvider idProvider) {
+ final AggregatedNetconfOperationServiceFactory aggregatedNetconfOperationServiceFactory = new AggregatedNetconfOperationServiceFactory();
+
+ final NetconfOperationServiceFactory operationProvider;
+ if (params.mdSal) {
+ operationProvider = new MdsalOperationProvider(idProvider, transformedCapabilities, schemaContext, sourceProvider);
+ } else {
+ operationProvider = new SimulatedOperationProvider(idProvider, transformedCapabilities,
+ Optional.fromNullable(params.notificationFile),
+ Optional.fromNullable(params.initialConfigXMLFile));
+ }
+
+
+ final NetconfMonitoringActivator.NetconfMonitoringOperationServiceFactory monitoringService =
+ new NetconfMonitoringActivator.NetconfMonitoringOperationServiceFactory(
+ new NetconfMonitoringOperationService(monitoringService1));
+ aggregatedNetconfOperationServiceFactory.onAddNetconfOperationServiceFactory(operationProvider);
+ aggregatedNetconfOperationServiceFactory.onAddNetconfOperationServiceFactory(monitoringService);
+ if (params.rpcConfig != null) {
+ final SettableOperationProvider settableService = new SettableOperationProvider(params.rpcConfig);
+ aggregatedNetconfOperationServiceFactory.onAddNetconfOperationServiceFactory(settableService);
+ }
+ return aggregatedNetconfOperationServiceFactory;
+ }
+
public List<Integer> start(final TesttoolParameters params) {
LOG.info("Starting {}, {} simulated devices starting on port {}", params.deviceCount, params.ssh ? "SSH" : "TCP", params.startingPort);
final SharedSchemaRepository schemaRepo = new SharedSchemaRepository("netconf-simulator");
final Set<Capability> capabilities = parseSchemasToModuleCapabilities(params, schemaRepo);
- final NetconfServerDispatcherImpl dispatcher = createDispatcher(capabilities, params.exi, params.generateConfigsTimeout,
- Optional.fromNullable(params.notificationFile), params.mdSal, Optional.fromNullable(params.initialConfigXMLFile),
+ final NetconfServerDispatcherImpl dispatcher = createDispatcher(capabilities,
new SchemaSourceProvider<YangTextSchemaSource>() {
@Override
public CheckedFuture<? extends YangTextSchemaSource, SchemaSourceException> getSource(final SourceIdentifier sourceIdentifier) {
return schemaRepo.getSchemaSource(sourceIdentifier, YangTextSchemaSource.class);
}
- });
+ }, params);
int currentPort = params.startingPort;
@Arg(dest = "thread-pool-size")
public int threadPoolSize;
+ @Arg(dest = "rpc-config")
+ public File rpcConfig;
static ArgumentParser getParser() {
final ArgumentParser parser = ArgumentParsers.newArgumentParser("netconf testtool");
.setDefault(8)
.help("The number of threads to keep in the pool, when creating a device simulator. Even if they are idle.")
.dest("thread-pool-size");
+ parser.addArgument("--rpc-config")
+ .type(File.class)
+ .help("Rpc config file. It can be used to define custom rpc behavior, or override the default one." +
+ "Usable for testing buggy device behavior.")
+ .dest("rpc-config");
return parser;
}
}
}
}
+ if (rpcConfig != null) {
+ checkArgument(rpcConfig.exists(), "Rpc config file has to exist");
+ checkArgument(!rpcConfig.isDirectory(), "Rpc config file can't be a directory");
+ checkArgument(rpcConfig.canRead(), "Rpc config file to be readable");
+ }
}
public ArrayList<ArrayList<Execution.DestToPayload>> getThreadsPayloads(final List<Integer> openDevices) {
--- /dev/null
+/*
+ * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.netconf.test.tool.customrpc;
+
+import java.util.List;
+import javax.xml.bind.annotation.XmlElement;
+
+class Rpc {
+
+ @XmlElement(name = "input")
+ private XmlData input;
+
+ @XmlElement(name = "output")
+ private List<XmlData> output;
+
+ XmlData getInput() {
+ return input;
+ }
+
+ List<XmlData> getOutput() {
+ return output;
+ }
+}
--- /dev/null
+/*
+ * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.netconf.test.tool.customrpc;
+
+import com.google.common.collect.ArrayListMultimap;
+import com.google.common.collect.Multimap;
+import java.io.File;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.stream.Stream;
+import javax.xml.bind.JAXB;
+import org.opendaylight.controller.config.util.xml.DocumentedException;
+import org.opendaylight.controller.config.util.xml.XmlElement;
+import org.opendaylight.controller.config.util.xml.XmlUtil;
+import org.w3c.dom.Attr;
+import org.w3c.dom.Document;
+
+/**
+ * Mapping between RPCs and responses.
+ */
+class RpcMapping {
+
+ private final Multimap<Request, Document> requestToResponseMap = ArrayListMultimap.create();
+
+ /**
+ * Creates new mapping from file.
+ *
+ * @param config config file
+ */
+ RpcMapping(final File config) {
+ final Rpcs rpcs = JAXB.unmarshal(config, Rpcs.class);
+ for (final Rpc rpc : rpcs.getRpcs()) {
+ final Stream<Document> stream = rpc.getOutput().stream()
+ .map(XmlData::getData);
+ final XmlElement element = XmlElement.fromDomDocument(rpc.getInput().getData());
+ requestToResponseMap.putAll(new Request(element), stream::iterator);
+ }
+ }
+
+ /**
+ * Returns response matching given input. If multiple responses are configured for the same
+ * request, every invocation of this method returns next response as they are defined in the config file.
+ * Last configured response is used, when number of invocations is higher than number of configured responses
+ * for rpc.
+ *
+ * @param input request
+ * @return response document, or absent if mapping is not defined
+ */
+ Optional<Document> getResponse(final XmlElement input) {
+ final Collection<Document> responses = requestToResponseMap.get(new Request(input));
+ final Iterator<Document> iterator = responses.iterator();
+ if (iterator.hasNext()) {
+ final Document response = iterator.next();
+ if (iterator.hasNext()) {
+ iterator.remove();
+ }
+ return Optional.of(response);
+ }
+ return Optional.empty();
+ }
+
+ /**
+ * Rpc input wrapper. Needed because of custom {@link Request#equals(Object)}
+ * and {@link Request#hashCode()} implementations.
+ */
+ private static class Request {
+ private final XmlElement xmlElement;
+ private final int hashCode;
+
+ private Request(final XmlElement element) {
+ this.xmlElement = element;
+ hashCode = XmlUtil.toString(element)
+ .replaceAll("message-id=.*(>| )", "") //message id is variable, remove it
+ .replaceAll("\\s+", "") //remove whitespaces
+ .hashCode();
+ }
+
+ @Override
+ public boolean equals(final Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ final Request request = (Request) o;
+ return documentEquals(this.xmlElement, request.xmlElement);
+ }
+
+ @Override
+ public int hashCode() {
+ return hashCode;
+ }
+
+ private static boolean documentEquals(final XmlElement e1, final XmlElement e2) {
+ if (!e1.getNamespaceOptionally().equals(e2.getNamespaceOptionally())) {
+ return false;
+ }
+ if (!e1.getName().equals(e2.getName())) {
+ return false;
+ }
+ if (attributesNotEquals(e1, e2)) {
+ return false;
+ }
+
+ final List<XmlElement> e1Children = e1.getChildElements();
+ final List<XmlElement> e2Children = e2.getChildElements();
+ if (e1Children.size() != e2Children.size()) {
+ return false;
+ }
+ final Iterator<XmlElement> e1Iterator = e1Children.iterator();
+ final Iterator<XmlElement> e2Iterator = e2Children.iterator();
+ while (e1Iterator.hasNext() && e2Iterator.hasNext()) {
+ if (!documentEquals(e1Iterator.next(), e2Iterator.next())) {
+ return false;
+ }
+ }
+
+ if (e1Children.isEmpty() && e1Children.isEmpty()) {
+ try {
+ return e1.getTextContent().equals(e2.getTextContent());
+ } catch (final DocumentedException e) {
+ return false;
+ }
+ }
+ return true;
+ }
+
+ private static boolean attributesNotEquals(final XmlElement e1, final XmlElement e2) {
+ final Map<String, Attr> e1Attrs = e1.getAttributes();
+ final Map<String, Attr> e2Attrs = e2.getAttributes();
+ final Iterator<Map.Entry<String, Attr>> e1AttrIt = e1Attrs.entrySet().iterator();
+ final Iterator<Map.Entry<String, Attr>> e2AttrIt = e2Attrs.entrySet().iterator();
+ while (e1AttrIt.hasNext() && e2AttrIt.hasNext()) {
+ final Map.Entry<String, Attr> e1Next = e1AttrIt.next();
+ final Map.Entry<String, Attr> e2Next = e2AttrIt.next();
+ if (e1Next.getKey().equals("message-id") && e2Next.getKey().equals("message-id")) {
+ continue;
+ }
+ if (e1Next.getKey().equals("xmlns") && e2Next.getKey().equals("xmlns")) {
+ continue;
+ }
+ if (!e1Next.getKey().equals(e2Next.getKey())) {
+ return true;
+ }
+ if (!e1Next.getValue().getValue().equals(e2Next.getValue().getValue())) {
+ return true;
+ }
+ }
+ return false;
+ }
+
+ }
+}
--- /dev/null
+/*
+ * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.netconf.test.tool.customrpc;
+
+import java.util.List;
+import javax.xml.bind.annotation.XmlAccessType;
+import javax.xml.bind.annotation.XmlAccessorType;
+import javax.xml.bind.annotation.XmlElement;
+import javax.xml.bind.annotation.XmlRootElement;
+
+@XmlRootElement(name = "rpcs")
+@XmlAccessorType(XmlAccessType.FIELD)
+class Rpcs {
+
+ @XmlElement(name = "rpc")
+ private List<Rpc> rpcs;
+
+ List<Rpc> getRpcs() {
+ return rpcs;
+ }
+
+}
--- /dev/null
+/*
+ * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.netconf.test.tool.customrpc;
+
+import java.io.File;
+import java.util.Collections;
+import java.util.Set;
+import org.opendaylight.controller.config.util.capability.Capability;
+import org.opendaylight.netconf.api.monitoring.CapabilityListener;
+import org.opendaylight.netconf.mapping.api.NetconfOperation;
+import org.opendaylight.netconf.mapping.api.NetconfOperationService;
+import org.opendaylight.netconf.mapping.api.NetconfOperationServiceFactory;
+
+public class SettableOperationProvider implements NetconfOperationServiceFactory {
+
+ private final File rpcConfig;
+
+ public SettableOperationProvider(final File rpcConfig) {
+ this.rpcConfig = rpcConfig;
+ }
+
+ @Override
+ public Set<Capability> getCapabilities() {
+ return Collections.emptySet();
+ }
+
+ @Override
+ public AutoCloseable registerCapabilityListener(final CapabilityListener listener) {
+ return () -> {
+ //no op
+ };
+ }
+
+ @Override
+ public NetconfOperationService createService(final String netconfSessionIdForReporting) {
+ return new SettableOperationService(rpcConfig);
+ }
+
+ private static class SettableOperationService implements NetconfOperationService {
+
+ private final SettableRpc rpc;
+
+ private SettableOperationService(final File rpcConfig) {
+ this.rpc = new SettableRpc(rpcConfig);
+ }
+
+ @Override
+ public Set<NetconfOperation> getNetconfOperations() {
+ return Collections.singleton(rpc);
+ }
+
+ @Override
+ public void close() {
+ // no op
+ }
+ }
+}
--- /dev/null
+/*
+ * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.netconf.test.tool.customrpc;
+
+import java.io.File;
+import java.util.Optional;
+import org.opendaylight.controller.config.util.xml.DocumentedException;
+import org.opendaylight.controller.config.util.xml.XmlElement;
+import org.opendaylight.controller.config.util.xml.XmlUtil;
+import org.opendaylight.netconf.api.xml.XmlNetconfConstants;
+import org.opendaylight.netconf.mapping.api.HandlingPriority;
+import org.opendaylight.netconf.mapping.api.NetconfOperation;
+import org.opendaylight.netconf.mapping.api.NetconfOperationChainedExecution;
+import org.w3c.dom.Document;
+
+/**
+ * {@link NetconfOperation} implementation. It can be configured to intercept rpcs with defined input
+ * and reply with defined output. If input isn't defined, rpc handling is delegated to the subsequent
+ * {@link NetconfOperation} which is able to handle it.
+ */
+class SettableRpc implements NetconfOperation {
+
+ private final RpcMapping mapping;
+
+ SettableRpc(final File rpcConfig) {
+ mapping = new RpcMapping(rpcConfig);
+ }
+
+ @Override
+ public HandlingPriority canHandle(final Document message) throws DocumentedException {
+ return HandlingPriority.HANDLE_WITH_DEFAULT_PRIORITY.increasePriority(1000);
+ }
+
+ @Override
+ public Document handle(final Document requestMessage, final NetconfOperationChainedExecution subsequentOperation)
+ throws DocumentedException {
+ final XmlElement requestElement = XmlElement.fromDomDocument(requestMessage);
+ final XmlElement rpcElement = requestElement.getOnlyChildElement();
+ final String msgId = requestElement.getAttribute(XmlNetconfConstants.MESSAGE_ID);
+ final Optional<Document> response = mapping.getResponse(rpcElement);
+ if (response.isPresent()) {
+ final Document document = response.get();
+ checkForError(document);
+ document.getDocumentElement().setAttribute(XmlNetconfConstants.MESSAGE_ID, msgId);
+ return document;
+ } else if (subsequentOperation.isExecutionTermination()) {
+ throw new DocumentedException("Mapping not found " + XmlUtil.toString(requestMessage),
+ DocumentedException.ErrorType.APPLICATION, DocumentedException.ErrorTag.OPERATION_NOT_SUPPORTED,
+ DocumentedException.ErrorSeverity.ERROR);
+ } else {
+ return subsequentOperation.execute(requestMessage);
+ }
+ }
+
+ private void checkForError(final Document document) throws DocumentedException {
+ final XmlElement rpcReply = XmlElement.fromDomDocument(document);
+ if (rpcReply.getOnlyChildElementOptionally("rpc-error").isPresent()) {
+ throw DocumentedException.fromXMLDocument(document);
+ }
+ }
+
+}
--- /dev/null
+/*
+ * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.netconf.test.tool.customrpc;
+
+import javax.xml.bind.annotation.XmlAnyElement;
+import org.w3c.dom.Document;
+import org.w3c.dom.Element;
+
+class XmlData {
+ @XmlAnyElement
+ private Element data;
+
+ Document getData() {
+ return data.getOwnerDocument();
+ }
+
+}