RemoteDeviceId remoteDeviceId = new RemoteDeviceId(nodeId.getValue(), address);
RemoteDeviceHandler<NetconfSessionPreferences> salFacade =
- createSalFacade(remoteDeviceId, domBroker, bindingAwareBroker, defaultRequestTimeoutMillis);
+ createSalFacade(remoteDeviceId, domBroker, bindingAwareBroker);
if (keepaliveDelay > 0) {
LOG.warn("Adding keepalive facade, for device {}", nodeId);
- salFacade = new KeepaliveSalFacade(remoteDeviceId, salFacade, keepaliveExecutor.getExecutor(), keepaliveDelay);
+ salFacade = new KeepaliveSalFacade(remoteDeviceId, salFacade, keepaliveExecutor.getExecutor(), keepaliveDelay, defaultRequestTimeoutMillis);
}
final NetconfDevice.SchemaResourcesDTO schemaResourcesDTO = setupSchemaCacheDTO(nodeId, node);
.build();
}
- protected abstract RemoteDeviceHandler<NetconfSessionPreferences> createSalFacade(final RemoteDeviceId id, final Broker domBroker, final BindingAwareBroker bindingBroker, long defaultRequestTimeoutMillis);
+ protected abstract RemoteDeviceHandler<NetconfSessionPreferences> createSalFacade(final RemoteDeviceId id, final Broker domBroker, final BindingAwareBroker bindingBroker);
@Override
public abstract ConnectionStatusListenerRegistration registerConnectionStatusListener(NodeId node, RemoteDeviceHandler<NetconfSessionPreferences> listener);
RemoteDeviceId remoteDeviceId = new RemoteDeviceId(nodeId.getValue(), address);
RemoteDeviceHandler<NetconfSessionPreferences> salFacade =
- createSalFacade(remoteDeviceId, domBroker, bindingAwareBroker, defaultRequestTimeoutMillis);
+ createSalFacade(remoteDeviceId, domBroker, bindingAwareBroker);
if (keepaliveDelay > 0) {
LOG.warn("Adding keepalive facade, for device {}", nodeId);
- salFacade = new KeepaliveSalFacade(remoteDeviceId, salFacade, keepaliveExecutor.getExecutor(), keepaliveDelay);
+ salFacade = new KeepaliveSalFacade(remoteDeviceId, salFacade, keepaliveExecutor.getExecutor(), keepaliveDelay, defaultRequestTimeoutMillis);
}
final NetconfDevice.SchemaResourcesDTO schemaResourcesDTO = setupSchemaCacheDTO(nodeId, node);
}
@Override
- protected RemoteDeviceHandler<NetconfSessionPreferences> createSalFacade(final RemoteDeviceId id, final Broker domBroker, final BindingAwareBroker bindingBroker, long defaultRequestTimeoutMillis) {
- return new TopologyMountPointFacade(topologyId, id, domBroker, bindingBroker, defaultRequestTimeoutMillis);
+ protected RemoteDeviceHandler<NetconfSessionPreferences> createSalFacade(final RemoteDeviceId id, final Broker domBroker, final BindingAwareBroker bindingBroker) {
+ return new TopologyMountPointFacade(topologyId, id, domBroker, bindingBroker);
}
@Override
}
@Override
- protected RemoteDeviceHandler<NetconfSessionPreferences> createSalFacade(RemoteDeviceId id, Broker domBroker, BindingAwareBroker bindingBroker, long defaultRequestTimeoutMillis) {
- return new NetconfDeviceSalFacade(id, domBroker, bindingAwareBroker, defaultRequestTimeoutMillis);
+ protected RemoteDeviceHandler<NetconfSessionPreferences> createSalFacade(RemoteDeviceId id, Broker domBroker, BindingAwareBroker bindingBroker) {
+ return new NetconfDeviceSalFacade(id, domBroker, bindingAwareBroker);
}
@Override
public NetconfDeviceMasterDataBroker(final ActorSystem actorSystem, final RemoteDeviceId id,
final SchemaContext schemaContext, final DOMRpcService rpc,
- final NetconfSessionPreferences netconfSessionPreferences, final long requestTimeoutMillis) {
+ final NetconfSessionPreferences netconfSessionPreferences) {
this.id = id;
- delegateBroker = new NetconfDeviceDataBroker(id, schemaContext, rpc, netconfSessionPreferences, requestTimeoutMillis);
+ delegateBroker = new NetconfDeviceDataBroker(id, schemaContext, rpc, netconfSessionPreferences);
this.actorSystem = actorSystem;
// only ever need 1 readTx since it doesnt need to be closed
private final RemoteDeviceId id;
private final Broker domBroker;
private final BindingAwareBroker bindingBroker;
- private final long defaultRequestTimeoutMillis;
private SchemaContext remoteSchemaContext = null;
private NetconfSessionPreferences netconfSessionPreferences = null;
public TopologyMountPointFacade(final String topologyId,
final RemoteDeviceId id,
final Broker domBroker,
- final BindingAwareBroker bindingBroker,
- long defaultRequestTimeoutMillis) {
+ final BindingAwareBroker bindingBroker) {
this.topologyId = topologyId;
this.id = id;
this.domBroker = domBroker;
this.bindingBroker = bindingBroker;
- this.defaultRequestTimeoutMillis = defaultRequestTimeoutMillis;
this.salProvider = new ClusteredNetconfDeviceMountInstanceProxy(id);
registerToSal(domBroker);
}
deviceDataBroker = TypedActor.get(context).typedActorOf(new TypedProps<>(ProxyNetconfDeviceDataBroker.class, new Creator<NetconfDeviceMasterDataBroker>() {
@Override
public NetconfDeviceMasterDataBroker create() throws Exception {
- return new NetconfDeviceMasterDataBroker(actorSystem, id, remoteSchemaContext, deviceRpc, netconfSessionPreferences, defaultRequestTimeoutMillis);
+ return new NetconfDeviceMasterDataBroker(actorSystem, id, remoteSchemaContext, deviceRpc, netconfSessionPreferences);
}
}), MOUNT_POINT);
LOG.debug("Master data broker registered on path {}", TypedActor.get(actorSystem).getActorRefFor(deviceDataBroker).path());
}
@Override
- protected RemoteDeviceHandler<NetconfSessionPreferences> createSalFacade(RemoteDeviceId id, Broker domBroker, BindingAwareBroker bindingBroker, long defaultRequestTimeoutMillis) {
+ protected RemoteDeviceHandler<NetconfSessionPreferences> createSalFacade(RemoteDeviceId id, Broker domBroker, BindingAwareBroker bindingBroker) {
return salFacade;
}
MockitoAnnotations.initMocks(this);
- mountPointFacade = new TopologyMountPointFacade(TOPOLOGY_ID, REMOTE_DEVICE_ID, domBroker, bindingBroker, 1L);
+ mountPointFacade = new TopologyMountPointFacade(TOPOLOGY_ID, REMOTE_DEVICE_ID, domBroker, bindingBroker);
mountPointFacade.registerConnectionStatusListener(connectionStatusListener1);
mountPointFacade.registerConnectionStatusListener(connectionStatusListener2);
<groupId>org.hamcrest</groupId>
<artifactId>hamcrest-core</artifactId>
</dependency>
- <dependency>
- <groupId>org.powermock</groupId>
- <artifactId>powermock-api-mockito</artifactId>
- <version>1.6.4</version>
- <scope>test</scope>
- </dependency>
- <dependency>
- <groupId>org.powermock</groupId>
- <artifactId>powermock-module-junit4</artifactId>
- <version>1.6.4</version>
- <scope>test</scope>
- </dependency>
</dependencies>
<scm>
final ExecutorService globalProcessingExecutor = processingExecutor.getExecutor();
RemoteDeviceHandler<NetconfSessionPreferences> salFacade
- = new NetconfDeviceSalFacade(id, domRegistry, bindingRegistry, getDefaultRequestTimeoutMillis());
+ = new NetconfDeviceSalFacade(id, domRegistry, bindingRegistry);
final Long keepaliveDelay = getKeepaliveDelay();
if (shouldSendKeepalive()) {
// Keepalive executor is optional for now and a default instance is supported
final ScheduledExecutorService executor = keepaliveExecutor == null ? DEFAULT_KEEPALIVE_EXECUTOR : keepaliveExecutor.getExecutor();
- salFacade = new KeepaliveSalFacade(id, salFacade, executor, keepaliveDelay);
+ salFacade = new KeepaliveSalFacade(id, salFacade, executor, keepaliveDelay, getDefaultRequestTimeoutMillis());
}
// Setup information related to the SchemaRegistry, SchemaResourceFactory, etc.
// 2 minutes keepalive delay by default
private static final long DEFAULT_DELAY = TimeUnit.MINUTES.toSeconds(2);
+ // 1 minute transaction timeout by default
+ private static final long DEFAULT_TRANSACTION_TIMEOUT_MILLI = TimeUnit.MILLISECONDS.toMillis(60000);
+
private final RemoteDeviceId id;
private final RemoteDeviceHandler<NetconfSessionPreferences> salFacade;
private final ScheduledExecutorService executor;
private final long keepaliveDelaySeconds;
private final ResetKeepalive resetKeepaliveTask;
+ private final long defaultRequestTimeoutMillis;
private volatile NetconfDeviceCommunicator listener;
private volatile ScheduledFuture<?> currentKeepalive;
private volatile DOMRpcService currentDeviceRpc;
public KeepaliveSalFacade(final RemoteDeviceId id, final RemoteDeviceHandler<NetconfSessionPreferences> salFacade,
- final ScheduledExecutorService executor, final long keepaliveDelaySeconds) {
+ final ScheduledExecutorService executor, final long keepaliveDelaySeconds, final long defaultRequestTimeoutMillis) {
this.id = id;
this.salFacade = salFacade;
this.executor = executor;
this.keepaliveDelaySeconds = keepaliveDelaySeconds;
+ this.defaultRequestTimeoutMillis = defaultRequestTimeoutMillis;
this.resetKeepaliveTask = new ResetKeepalive();
}
public KeepaliveSalFacade(final RemoteDeviceId id, final RemoteDeviceHandler<NetconfSessionPreferences> salFacade,
final ScheduledExecutorService executor) {
- this(id, salFacade, executor, DEFAULT_DELAY);
+ this(id, salFacade, executor, DEFAULT_DELAY, DEFAULT_TRANSACTION_TIMEOUT_MILLI);
}
/**
@Override
public void onDeviceConnected(final SchemaContext remoteSchemaContext, final NetconfSessionPreferences netconfSessionPreferences, final DOMRpcService deviceRpc) {
this.currentDeviceRpc = deviceRpc;
- final DOMRpcService deviceRpc1 = new KeepaliveDOMRpcService(deviceRpc, resetKeepaliveTask);
+ final DOMRpcService deviceRpc1 = new KeepaliveDOMRpcService(deviceRpc, resetKeepaliveTask, defaultRequestTimeoutMillis, executor);
salFacade.onDeviceConnected(remoteSchemaContext, netconfSessionPreferences, deviceRpc1);
LOG.debug("{}: Netconf session initiated, starting keepalives", id);
/**
* Reset keepalive after each RPC response received
*/
- private class ResetKeepalive implements com.google.common.util.concurrent.FutureCallback<DOMRpcResult> {
+ private class ResetKeepalive implements FutureCallback<DOMRpcResult> {
@Override
public void onSuccess(@Nullable final DOMRpcResult result) {
// No matter what response we got, rpc-reply or rpc-error, we got it from device so the netconf session is OK
@Override
public void onFailure(@Nonnull final Throwable t) {
- // User/Application RPC failed (The RPC did not reach the remote device.
+ // User/Application RPC failed (The RPC did not reach the remote device or .. TODO what other reasons could cause this ?)
// There is no point in keeping this session. Reconnect.
LOG.warn("{}: Rpc failure detected. Reconnecting netconf session", id, t);
reconnect();
}
}
+ /*
+ * Request timeout task is called once the defaultRequestTimeoutMillis is
+ * reached. At this moment, if the request is not yet finished, we cancel
+ * it.
+ */
+ private static final class RequestTimeoutTask implements Runnable {
+
+ private final CheckedFuture<DOMRpcResult, DOMRpcException> rpcResultFuture;
+
+ public RequestTimeoutTask(final CheckedFuture<DOMRpcResult, DOMRpcException> rpcResultFuture) {
+ this.rpcResultFuture = rpcResultFuture;
+ }
+
+ @Override
+ public void run() {
+ if (!rpcResultFuture.isDone()) {
+ rpcResultFuture.cancel(true);
+ }
+ }
+ }
+
/**
- * DOMRpcService proxy that attaches reset-keepalive-task to each RPC invocation.
+ * DOMRpcService proxy that attaches reset-keepalive-task and schedule
+ * request-timeout-task to each RPC invocation.
*/
private static final class KeepaliveDOMRpcService implements DOMRpcService {
private final DOMRpcService deviceRpc;
private ResetKeepalive resetKeepaliveTask;
+ private final long defaultRequestTimeoutMillis;
+ private final ScheduledExecutorService executor;
- public KeepaliveDOMRpcService(final DOMRpcService deviceRpc, final ResetKeepalive resetKeepaliveTask) {
+ public KeepaliveDOMRpcService(final DOMRpcService deviceRpc, final ResetKeepalive resetKeepaliveTask,
+ final long defaultRequestTimeoutMillis, final ScheduledExecutorService executor) {
this.deviceRpc = deviceRpc;
this.resetKeepaliveTask = resetKeepaliveTask;
+ this.defaultRequestTimeoutMillis = defaultRequestTimeoutMillis;
+ this.executor = executor;
}
@Nonnull
public CheckedFuture<DOMRpcResult, DOMRpcException> invokeRpc(@Nonnull final SchemaPath type, final NormalizedNode<?, ?> input) {
final CheckedFuture<DOMRpcResult, DOMRpcException> domRpcResultDOMRpcExceptionCheckedFuture = deviceRpc.invokeRpc(type, input);
Futures.addCallback(domRpcResultDOMRpcExceptionCheckedFuture, resetKeepaliveTask);
+
+ final RequestTimeoutTask timeoutTask = new RequestTimeoutTask(domRpcResultDOMRpcExceptionCheckedFuture);
+ executor.schedule(timeoutTask, defaultRequestTimeoutMillis, TimeUnit.MILLISECONDS);
+
return domRpcResultDOMRpcExceptionCheckedFuture;
}
public final class NetconfDeviceDataBroker implements DOMDataBroker {
private final RemoteDeviceId id;
private final NetconfBaseOps netconfOps;
- private final long requestTimeoutMillis;
private final boolean rollbackSupport;
private boolean candidateSupported;
private boolean runningWritable;
- public NetconfDeviceDataBroker(final RemoteDeviceId id, final SchemaContext schemaContext, final DOMRpcService rpc, final NetconfSessionPreferences netconfSessionPreferences, long requestTimeoutMillis) {
+ public NetconfDeviceDataBroker(final RemoteDeviceId id, final SchemaContext schemaContext, final DOMRpcService rpc, final NetconfSessionPreferences netconfSessionPreferences) {
this.id = id;
this.netconfOps = new NetconfBaseOps(rpc, schemaContext);
- this.requestTimeoutMillis = requestTimeoutMillis;
// get specific attributes from netconf preferences and get rid of it
// no need to keep the entire preferences object, its quite big with all the capability QNames
candidateSupported = netconfSessionPreferences.isCandidateSupported();
@Override
public DOMDataReadOnlyTransaction newReadOnlyTransaction() {
- return new ReadOnlyTx(netconfOps, id, requestTimeoutMillis);
+ return new ReadOnlyTx(netconfOps, id);
}
@Override
public DOMDataWriteTransaction newWriteOnlyTransaction() {
if(candidateSupported) {
if(runningWritable) {
- return new WriteCandidateRunningTx(id, netconfOps, rollbackSupport, requestTimeoutMillis);
+ return new WriteCandidateRunningTx(id, netconfOps, rollbackSupport);
} else {
- return new WriteCandidateTx(id, netconfOps, rollbackSupport, requestTimeoutMillis);
+ return new WriteCandidateTx(id, netconfOps, rollbackSupport);
}
} else {
- return new WriteRunningTx(id, netconfOps, rollbackSupport, requestTimeoutMillis);
+ return new WriteRunningTx(id, netconfOps, rollbackSupport);
}
}
private final RemoteDeviceId id;
private final NetconfDeviceSalProvider salProvider;
- private final long defaultRequestTimeoutMillis;
private final List<AutoCloseable> salRegistrations = Lists.newArrayList();
- public NetconfDeviceSalFacade(final RemoteDeviceId id, final Broker domBroker, final BindingAwareBroker bindingBroker, long defaultRequestTimeoutMillis) {
+ public NetconfDeviceSalFacade(final RemoteDeviceId id, final Broker domBroker, final BindingAwareBroker bindingBroker) {
this.id = id;
this.salProvider = new NetconfDeviceSalProvider(id);
- this.defaultRequestTimeoutMillis = defaultRequestTimeoutMillis;
registerToSal(domBroker, bindingBroker);
}
public synchronized void onDeviceConnected(final SchemaContext schemaContext,
final NetconfSessionPreferences netconfSessionPreferences, final DOMRpcService deviceRpc) {
- final DOMDataBroker domBroker = new NetconfDeviceDataBroker(id, schemaContext, deviceRpc, netconfSessionPreferences, defaultRequestTimeoutMillis);
+ final DOMDataBroker domBroker = new NetconfDeviceDataBroker(id, schemaContext, deviceRpc, netconfSessionPreferences);
final NetconfDeviceNotificationService notificationService = new NetconfDeviceNotificationService();
import com.google.common.base.Function;
import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
-import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import java.util.concurrent.ExecutionException;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
import org.opendaylight.controller.md.sal.common.api.TransactionStatus;
import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
import org.opendaylight.controller.md.sal.dom.api.DOMDataWriteTransaction;
import org.opendaylight.yangtools.yang.data.api.schema.DataContainerChild;
import org.opendaylight.yangtools.yang.data.api.schema.MixinNode;
import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
-import org.opendaylight.yangtools.yang.model.repo.api.SchemaSourceException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
private static final Logger LOG = LoggerFactory.getLogger(AbstractWriteTx.class);
- protected final long defaultRequestTimeoutMillis;
protected final RemoteDeviceId id;
protected final NetconfBaseOps netOps;
protected final boolean rollbackSupport;
// Allow commit to be called only once
protected boolean finished = false;
- public AbstractWriteTx(final long requestTimeoutMillis, final NetconfBaseOps netOps, final RemoteDeviceId id, final boolean rollbackSupport) {
- this.defaultRequestTimeoutMillis = requestTimeoutMillis;
+ public AbstractWriteTx(final NetconfBaseOps netOps, final RemoteDeviceId id, final boolean rollbackSupport) {
this.netOps = netOps;
this.id = id;
this.rollbackSupport = rollbackSupport;
}
protected abstract void editConfig(DataContainerChild<?, ?> editStructure, Optional<ModifyAction> defaultOperation) throws NetconfDocumentedException;
-
-
- protected ListenableFuture<DOMRpcResult> perfomRequestWithTimeout(String operation, ListenableFuture<DOMRpcResult> future) {
- try {
- future.get(defaultRequestTimeoutMillis, TimeUnit.MILLISECONDS);
- } catch (InterruptedException | ExecutionException e) {
- LOG.error("{}: {} failed with error", operation, id, e);
- return Futures.immediateFailedCheckedFuture(new RuntimeException(id + ": " + operation + " failed"));
- } catch (TimeoutException e) {
- LOG.warn("{}: Unable to {} after {} milliseconds", id, operation, defaultRequestTimeoutMillis, e);
- return Futures.immediateFailedCheckedFuture(new SchemaSourceException(e.getMessage()));
- }
- return future;
- }
}
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import java.util.concurrent.ExecutionException;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
-
import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
import org.opendaylight.controller.md.sal.dom.api.DOMDataReadOnlyTransaction;
private final RemoteDeviceId id;
private final FutureCallback<DOMRpcResult> loggingCallback;
- private final long requestTimeoutMillis;
-
- public ReadOnlyTx(final NetconfBaseOps netconfOps, final RemoteDeviceId id, final long requestTimeoutMillis) {
+ public ReadOnlyTx(final NetconfBaseOps netconfOps, final RemoteDeviceId id) {
this.netconfOps = netconfOps;
this.id = id;
- this.requestTimeoutMillis = requestTimeoutMillis;
// Simple logging callback to log result of read operation
loggingCallback = new FutureCallback<DOMRpcResult>() {
}
});
- if(!readWithTimeout("readConfigurationData", configRunning)) {
- return null;
- }
-
return MappingCheckedFuture.create(transformedFuture, ReadFailedException.MAPPER);
}
}
});
- if(!readWithTimeout("readOperationalData", configCandidate)) {
- return null;
- }
-
return MappingCheckedFuture.create(transformedFuture, ReadFailedException.MAPPER);
}
public Object getIdentifier() {
return this;
}
-
- private boolean readWithTimeout(String operation, ListenableFuture<DOMRpcResult> future) {
- try {
- future.get(requestTimeoutMillis, TimeUnit.MILLISECONDS);
- } catch (InterruptedException | ExecutionException e) {
- LOG.error("{}: {} failed with error", id, operation, e);
- throw new RuntimeException(id + ": readOperationalData failed");
- } catch (TimeoutException e) {
- LOG.warn("{}: Unable to {} after {} milliseconds", id, operation, requestTimeoutMillis, e);
- future.cancel(true);
- return false;
- }
- return true;
- }
}
private static final Logger LOG = LoggerFactory.getLogger(WriteCandidateRunningTx.class);
- public WriteCandidateRunningTx(final RemoteDeviceId id, final NetconfBaseOps netOps, final boolean rollbackSupport, final long requestTimeoutMillis) {
- super(id, netOps, rollbackSupport, requestTimeoutMillis);
+ public WriteCandidateRunningTx(final RemoteDeviceId id, final NetconfBaseOps netOps, final boolean rollbackSupport) {
+ super(id, netOps, rollbackSupport);
}
@Override
}
private void lockRunning() {
- final String operation = "Lock Running";
try {
- invokeBlocking(operation, new Function<NetconfBaseOps, ListenableFuture<DOMRpcResult>>() {
+ invokeBlocking("Lock running", new Function<NetconfBaseOps, ListenableFuture<DOMRpcResult>>() {
@Override
public ListenableFuture<DOMRpcResult> apply(final NetconfBaseOps input) {
- return perfomRequestWithTimeout(operation, input.lockRunning(new NetconfRpcFutureCallback(operation, id)));
-
+ return input.lockRunning(new NetconfRpcFutureCallback("Lock running", id));
}
});
} catch (final NetconfDocumentedException e) {
}
};
- public WriteCandidateTx(final RemoteDeviceId id, final NetconfBaseOps rpc, final boolean rollbackSupport, long requestTimeoutMillis) {
- super(requestTimeoutMillis, rpc, id, rollbackSupport);
+ public WriteCandidateTx(final RemoteDeviceId id, final NetconfBaseOps rpc, final boolean rollbackSupport) {
+ super(rpc, id, rollbackSupport);
}
@Override
}
private void lock() throws NetconfDocumentedException {
- final String operation = "Lock candidate";
try {
- invokeBlocking(operation, new Function<NetconfBaseOps, ListenableFuture<DOMRpcResult>>() {
+ invokeBlocking("Lock candidate", new Function<NetconfBaseOps, ListenableFuture<DOMRpcResult>>() {
@Override
public ListenableFuture<DOMRpcResult> apply(final NetconfBaseOps input) {
- return perfomRequestWithTimeout(operation, input.lockCandidate(new NetconfRpcFutureCallback(operation, id)));
+ return input.lockCandidate(new NetconfRpcFutureCallback("Lock candidate", id));
}
});
} catch (final NetconfDocumentedException e) {
@Override
protected void editConfig(final DataContainerChild<?, ?> editStructure, final Optional<ModifyAction> defaultOperation) throws NetconfDocumentedException {
- final String operation = "Edit candidate";
- invokeBlocking(operation, new Function<NetconfBaseOps, ListenableFuture<DOMRpcResult>>() {
+ invokeBlocking("Edit candidate", new Function<NetconfBaseOps, ListenableFuture<DOMRpcResult>>() {
@Override
public ListenableFuture<DOMRpcResult> apply(final NetconfBaseOps input) {
-
- return perfomRequestWithTimeout(operation, defaultOperation.isPresent()
- ? input.editConfigCandidate(new NetconfRpcFutureCallback(operation, id), editStructure, defaultOperation.get(),
- rollbackSupport)
- : input.editConfigCandidate(new NetconfRpcFutureCallback(operation, id), editStructure,
- rollbackSupport));
+ return defaultOperation.isPresent()
+ ? input.editConfigCandidate(new NetconfRpcFutureCallback("Edit candidate", id), editStructure, defaultOperation.get(),
+ rollbackSupport)
+ : input.editConfigCandidate(new NetconfRpcFutureCallback("Edit candidate", id), editStructure,
+ rollbackSupport);
}
});
}
private static final Logger LOG = LoggerFactory.getLogger(WriteRunningTx.class);
public WriteRunningTx(final RemoteDeviceId id, final NetconfBaseOps netOps,
- final boolean rollbackSupport, long requestTimeoutMillis) {
- super(requestTimeoutMillis, netOps, id, rollbackSupport);
+ final boolean rollbackSupport) {
+ super(netOps, id, rollbackSupport);
}
@Override
}
private void lock() {
- final String operation = "Lock running";
try {
- invokeBlocking(operation, new Function<NetconfBaseOps, ListenableFuture<DOMRpcResult>>() {
+ invokeBlocking("Lock running", new Function<NetconfBaseOps, ListenableFuture<DOMRpcResult>>() {
@Override
public ListenableFuture<DOMRpcResult> apply(final NetconfBaseOps input) {
- return perfomRequestWithTimeout(operation, input.lockRunning(new NetconfRpcFutureCallback(operation, id)));
+ return input.lockRunning(new NetconfRpcFutureCallback("Lock running", id));
}
});
} catch (final NetconfDocumentedException e) {
@Override
public synchronized CheckedFuture<Void, TransactionCommitFailedException> submit() {
- final ListenableFuture<Void> commitFutureAsVoid = Futures.transform(commit(), new Function<RpcResult<TransactionStatus>, Void>() {
+ final ListenableFuture<Void> commmitFutureAsVoid = Futures.transform(commit(), new Function<RpcResult<TransactionStatus>, Void>() {
@Override
public Void apply(final RpcResult<TransactionStatus> input) {
return null;
}
});
- return Futures.makeChecked(commitFutureAsVoid, new Function<Exception, TransactionCommitFailedException>() {
+ return Futures.makeChecked(commmitFutureAsVoid, new Function<Exception, TransactionCommitFailedException>() {
@Override
public TransactionCommitFailedException apply(final Exception input) {
return new TransactionCommitFailedException("Submit of transaction " + getIdentifier() + " failed", input);
@Override
protected void editConfig(final DataContainerChild<?, ?> editStructure, final Optional<ModifyAction> defaultOperation) throws NetconfDocumentedException {
- final String operation = "Edit running";
- invokeBlocking(operation, new Function<NetconfBaseOps, ListenableFuture<DOMRpcResult>>() {
+ invokeBlocking("Edit running", new Function<NetconfBaseOps, ListenableFuture<DOMRpcResult>>() {
@Override
public ListenableFuture<DOMRpcResult> apply(final NetconfBaseOps input) {
- return perfomRequestWithTimeout(operation, defaultOperation.isPresent()
- ? input.editConfigRunning(new NetconfRpcFutureCallback(operation, id), editStructure, defaultOperation.get(),
- rollbackSupport)
- : input.editConfigRunning(new NetconfRpcFutureCallback(operation, id), editStructure,
- rollbackSupport));
+ return defaultOperation.isPresent()
+ ? input.editConfigRunning(new NetconfRpcFutureCallback("Edit running", id), editStructure, defaultOperation.get(),
+ rollbackSupport)
+ : input.editConfigRunning(new NetconfRpcFutureCallback("Edit running", id), editStructure,
+ rollbackSupport);
}
});
}
private void unlock() {
- final String operation = "Unlocking running";
try {
- invokeBlocking(operation, new Function<NetconfBaseOps, ListenableFuture<DOMRpcResult>>() {
+ invokeBlocking("Unlocking running", new Function<NetconfBaseOps, ListenableFuture<DOMRpcResult>>() {
@Override
public ListenableFuture<DOMRpcResult> apply(final NetconfBaseOps input) {
- return perfomRequestWithTimeout(operation, input.unlockRunning(new NetconfRpcFutureCallback(operation, id)));
+ return input.unlockRunning(new NetconfRpcFutureCallback("Unlock running", id));
}
});
} catch (final NetconfDocumentedException e) {
doReturn(Futures.immediateCheckedFuture(result)).when(deviceRpc).invokeRpc(any(SchemaPath.class), any(NormalizedNode.class));
final KeepaliveSalFacade keepaliveSalFacade =
- new KeepaliveSalFacade(REMOTE_DEVICE_ID, underlyingSalFacade, executorServiceSpy, 1L);
+ new KeepaliveSalFacade(REMOTE_DEVICE_ID, underlyingSalFacade, executorServiceSpy, 1L, 1L);
keepaliveSalFacade.setListener(listener);
keepaliveSalFacade.onDeviceConnected(null, null, deviceRpc);
.when(deviceRpc).invokeRpc(any(SchemaPath.class), any(NormalizedNode.class));
final KeepaliveSalFacade keepaliveSalFacade =
- new KeepaliveSalFacade(REMOTE_DEVICE_ID, underlyingSalFacade, executorServiceSpy, 1L);
+ new KeepaliveSalFacade(REMOTE_DEVICE_ID, underlyingSalFacade, executorServiceSpy, 1L, 1L);
keepaliveSalFacade.setListener(listener);
keepaliveSalFacade.onDeviceConnected(null, null, deviceRpc);
.when(deviceRpc).invokeRpc(any(SchemaPath.class), any(NormalizedNode.class));
final KeepaliveSalFacade keepaliveSalFacade =
- new KeepaliveSalFacade(REMOTE_DEVICE_ID, underlyingSalFacade, executorServiceSpy, 100L);
+ new KeepaliveSalFacade(REMOTE_DEVICE_ID, underlyingSalFacade, executorServiceSpy, 100L, 1L);
keepaliveSalFacade.setListener(listener);
keepaliveSalFacade.onDeviceConnected(null, null, deviceRpc);
private NetconfDeviceDataBroker getDataBroker(String... caps) {
NetconfSessionPreferences prefs = NetconfSessionPreferences.fromStrings(Arrays.asList(caps));
final RemoteDeviceId id = new RemoteDeviceId("device-1", InetSocketAddress.createUnresolved("localhost", 17830));
- return new NetconfDeviceDataBroker(id, schemaContext, rpcService, prefs, 1000);
+ return new NetconfDeviceDataBroker(id, schemaContext, rpcService, prefs);
}
}
\ No newline at end of file
@Test
public void testIgnoreNonVisibleData() {
final WriteCandidateTx tx = new WriteCandidateTx(id, new NetconfBaseOps(rpc, mock(SchemaContext.class)),
- false, 60000L);
+ false);
final MapNode emptyList = ImmutableNodes.mapNodeBuilder(NETCONF_FILTER_QNAME).build();
tx.merge(LogicalDatastoreType.CONFIGURATION, YangInstanceIdentifier.create(new YangInstanceIdentifier.NodeIdentifier(NETCONF_FILTER_QNAME)), emptyList);
tx.put(LogicalDatastoreType.CONFIGURATION, YangInstanceIdentifier.create(new YangInstanceIdentifier.NodeIdentifier(NETCONF_FILTER_QNAME)), emptyList);
@Test
public void testDiscardChanges() {
final WriteCandidateTx tx = new WriteCandidateTx(id, new NetconfBaseOps(rpc, mock(SchemaContext.class)),
- false, 60000L);
+ false);
final CheckedFuture<Void, TransactionCommitFailedException> submitFuture = tx.submit();
try {
submitFuture.checkedGet();
.doReturn(rpcErrorFuture).when(rpc).invokeRpc(any(SchemaPath.class), any(NormalizedNode.class));
final WriteCandidateTx tx = new WriteCandidateTx(id, new NetconfBaseOps(rpc, mock(SchemaContext.class)),
- false, 60000L);
+ false);
final CheckedFuture<Void, TransactionCommitFailedException> submitFuture = tx.submit();
try {
.when(rpc).invokeRpc(any(SchemaPath.class), any(NormalizedNode.class));
final WriteRunningTx tx = new WriteRunningTx(id, new NetconfBaseOps(rpc, NetconfMessageTransformer.BaseSchema.BASE_NETCONF_CTX_WITH_NOTIFICATIONS.getSchemaContext()),
- false, 60000L);
+ false);
try {
tx.delete(LogicalDatastoreType.CONFIGURATION, yangIId);
} catch (final Exception e) {
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
-import com.google.common.base.Optional;
-import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
-import com.google.common.util.concurrent.ListenableFuture;
import java.net.InetSocketAddress;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
-import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
-import org.junit.runner.RunWith;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.MockitoAnnotations;
-import org.mockito.invocation.InvocationOnMock;
-import org.mockito.stubbing.Answer;
import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
import org.opendaylight.controller.md.sal.common.impl.util.compat.DataNormalizationException;
-import org.opendaylight.controller.md.sal.dom.api.DOMRpcResult;
import org.opendaylight.controller.md.sal.dom.api.DOMRpcService;
import org.opendaylight.controller.md.sal.dom.spi.DefaultDOMRpcResult;
import org.opendaylight.netconf.sal.connect.netconf.util.NetconfBaseOps;
import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
import org.opendaylight.yangtools.yang.model.api.SchemaContext;
import org.opendaylight.yangtools.yang.model.api.SchemaPath;
-import org.powermock.api.mockito.PowerMockito;
-import org.powermock.core.classloader.annotations.PrepareForTest;
-import org.powermock.modules.junit4.PowerMockRunner;
-@PrepareForTest({NetconfBaseOps.class})
-@RunWith(PowerMockRunner.class)
public class ReadOnlyTxTest {
private static final YangInstanceIdentifier path = YangInstanceIdentifier.create();
public void testRead() throws Exception {
final NetconfBaseOps netconfOps = new NetconfBaseOps(rpc, mock(SchemaContext.class));
- final ReadOnlyTx readOnlyTx = new ReadOnlyTx(netconfOps, new RemoteDeviceId("a", new InetSocketAddress("localhost", 196)), 60000L);
+ final ReadOnlyTx readOnlyTx = new ReadOnlyTx(netconfOps, new RemoteDeviceId("a", new InetSocketAddress("localhost", 196)));
readOnlyTx.read(LogicalDatastoreType.CONFIGURATION, YangInstanceIdentifier.create());
verify(rpc).invokeRpc(Mockito.eq(NetconfMessageTransformUtil.toPath(NetconfMessageTransformUtil.NETCONF_GET_CONFIG_QNAME)), any(NormalizedNode.class));
readOnlyTx.read(LogicalDatastoreType.OPERATIONAL, path);
verify(rpc).invokeRpc(Mockito.eq(NetconfMessageTransformUtil.toPath(NetconfMessageTransformUtil.NETCONF_GET_QNAME)), any(NormalizedNode.class));
}
-
- @SuppressWarnings("unchecked")
- @Test
- public void testReadTimeout() throws Exception {
- final ListenableFuture<DOMRpcResult> future = mock(ListenableFuture.class);
-
- Mockito.when(future.get(Mockito.anyLong(), any(TimeUnit.class))).then(new Answer<DOMRpcResult>() {
- @Override
- public DOMRpcResult answer(InvocationOnMock invocation)
- throws Throwable {
- throw new TimeoutException("Processing Timeout");
- }
- });
-
- final NetconfBaseOps netconfOps = PowerMockito.mock(NetconfBaseOps.class);
- Mockito.when(netconfOps.getConfigRunning(any(FutureCallback.class), any(Optional.class))).thenReturn(future);
-
-
- final ReadOnlyTx readOnlyTx = new ReadOnlyTx(netconfOps, new RemoteDeviceId("a", new InetSocketAddress("localhost", 196)), 100L);
- Assert.assertNull("Read operation didn't correctly timeout", readOnlyTx.read(LogicalDatastoreType.CONFIGURATION, YangInstanceIdentifier.create()));
- readOnlyTx.close();
- }
}
\ No newline at end of file