- added configurable timeout (default 30 secs) while processing RPC requests which need response from PCC
- updated unit-test
Change-Id: I18b508564381e41e4fb1fd7f030cf52e314136ea
Signed-off-by: Ajay <ajayl.bro@gmail.com>
}
enum no-ack {
description
- "The request has been sent to the PCC, but the session
- went down before we have received confirmation of the
- request being received by the PCC. PCC's state is
+ "The request has been sent to the PCC, but either the
+ session went down before we have received confirmation
+ of the request being received by the PCC, or the request
+ timed-out waiting for response from PCC. PCC's state is
unknown -- the request may or may not be reflected
in its internal state. The caller should not make
any hard assumptions about PCC state until it reconnects
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
+import java.util.Timer;
+import java.util.TimerTask;
+import java.util.concurrent.TimeUnit;
+
import javax.annotation.concurrent.GuardedBy;
import org.opendaylight.controller.config.yang.pcep.topology.provider.ListenerStateRuntimeMXBean;
import org.opendaylight.controller.config.yang.pcep.topology.provider.ListenerStateRuntimeRegistration;
this.listenerState.updateStatefulSentMsg(message);
final PCEPRequest req = new PCEPRequest(metadata);
this.requests.put(requestId, req);
+ final int rpcTimeout = serverSessionManager.getRpcTimeout();
+ LOG.trace("RPC response timeout value is {} seconds", rpcTimeout);
+ if (rpcTimeout > 0) {
+ setupTimeoutHandler(requestId, req, rpcTimeout);
+ }
f.addListener(new FutureListener<Void>() {
@Override
return req.getFuture();
}
+ private void setupTimeoutHandler(final S requestId, final PCEPRequest req, final int timeout) {
+ final Timer timer = req.getTimer();
+ timer.schedule(new TimerTask() {
+ @Override
+ public void run() {
+ synchronized (AbstractTopologySessionListener.this) {
+ AbstractTopologySessionListener.this.requests.remove(requestId);
+ }
+ req.done();
+ LOG.info("Request {} timed-out waiting for response", requestId);
+ }
+ }, TimeUnit.SECONDS.toMillis(timeout));
+ LOG.trace("Set up response timeout handler for request {}", requestId);
+ }
+
/**
* Update an LSP in the data store
*
import com.google.common.base.Stopwatch;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.SettableFuture;
+import java.util.Timer;
import java.util.concurrent.TimeUnit;
import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.topology.pcep.rev131024.OperationResult;
import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.topology.pcep.rev131024.lsp.metadata.Metadata;
private final Metadata metadata;
private volatile State state;
private final Stopwatch stopwatch;
+ private final Timer timer;
PCEPRequest(final Metadata metadata) {
this.future = SettableFuture.create();
this.metadata = metadata;
this.state = State.UNSENT;
this.stopwatch = Stopwatch.createStarted();
+ this.timer = new Timer();
}
protected ListenableFuture<OperationResult> getFuture() {
return state;
}
+ public Timer getTimer() {
+ return timer;
+ }
+
public synchronized void done(final OperationResult result) {
if (state != State.DONE) {
LOG.debug("Request went from {} to {}", state, State.DONE);
state = State.DONE;
+ timer.cancel();
future.set(result);
}
}
+ public synchronized void done() {
+ OperationResult result = null;
+ switch (state) {
+ case UNSENT:
+ result = OperationResults.UNSENT;
+ break;
+ case UNACKED:
+ result = OperationResults.NOACK;
+ break;
+ case DONE:
+ return;
+ }
+ done(result);
+ }
+
public synchronized void sent() {
if (state == State.UNSENT) {
LOG.debug("Request went from {} to {}", state, State.UNACKED);
public static PCEPTopologyProvider create(final PCEPDispatcher dispatcher, final InetSocketAddress address, final Optional<KeyMapping> keys,
final InstructionScheduler scheduler, final DataBroker dataBroker, final RpcProviderRegistry rpcRegistry,
final InstanceIdentifier<Topology> topology, final TopologySessionListenerFactory listenerFactory,
- final Optional<PCEPTopologyProviderRuntimeRegistrator> runtimeRootRegistrator) throws InterruptedException,
+ final Optional<PCEPTopologyProviderRuntimeRegistrator> runtimeRootRegistrator, final int rpcTimeout) throws InterruptedException,
ExecutionException, ReadFailedException, TransactionCommitFailedException {
- final ServerSessionManager manager = new ServerSessionManager(dataBroker, topology, listenerFactory);
+ final ServerSessionManager manager = new ServerSessionManager(dataBroker, topology, listenerFactory, rpcTimeout);
if (runtimeRootRegistrator.isPresent()) {
manager.registerRuntimeRootRegistartion(runtimeRootRegistrator.get());
}
private final DataBroker broker;
private final PCEPStatefulPeerProposal peerProposal;
private Optional<PCEPTopologyProviderRuntimeRegistration> runtimeRootRegistration = Optional.absent();
+ private final int rpcTimeout;
public ServerSessionManager(final DataBroker broker, final InstanceIdentifier<Topology> topology,
- final TopologySessionListenerFactory listenerFactory) throws ReadFailedException, TransactionCommitFailedException {
+ final TopologySessionListenerFactory listenerFactory, final int rpcTimeout) throws ReadFailedException, TransactionCommitFailedException {
this.broker = Preconditions.checkNotNull(broker);
this.topology = Preconditions.checkNotNull(topology);
this.listenerFactory = Preconditions.checkNotNull(listenerFactory);
this.peerProposal = PCEPStatefulPeerProposal.createStatefulPeerProposal(this.broker, this.topology);
+ this.rpcTimeout = rpcTimeout;
// Now create the base topology
final TopologyKey k = InstanceIdentifier.keyOf(topology);
Preconditions.checkNotNull(address);
peerProposal.setPeerProposal(createNodeId(address.getAddress()), openBuilder);
}
+
+ public int getRpcTimeout() {
+ return this.rpcTimeout;
+ }
}
JmxAttributeValidationException.checkNotNull(getListenAddress(), IS_NOT_SET, listenAddressJmxAttribute);
JmxAttributeValidationException.checkNotNull(getListenPort(), IS_NOT_SET, listenPortJmxAttribute);
JmxAttributeValidationException.checkNotNull(getStatefulPlugin(), IS_NOT_SET, statefulPluginJmxAttribute);
+ JmxAttributeValidationException.checkNotNull(getRpcTimeout(), IS_NOT_SET, rpcTimeoutJmxAttribute);
final Optional<KeyMapping> keys = contructKeys();
if (keys.isPresent()) {
try {
return PCEPTopologyProvider.create(getDispatcherDependency(), address, contructKeys(), getSchedulerDependency(),
getDataProviderDependency(), getRpcRegistryDependency(), topology, getStatefulPluginDependency(),
- Optional.of(getRootRuntimeBeanRegistratorWrapper()));
+ Optional.of(getRootRuntimeBeanRegistratorWrapper()), getRpcTimeout());
} catch (InterruptedException | ExecutionException | TransactionCommitFailedException | ReadFailedException e) {
LOG.error("Failed to instantiate topology provider at {}", address, e);
throw new IllegalStateException("Failed to instantiate provider", e);
description "RFC2385 shared secret";
}
}
+
+ leaf rpc-timeout {
+ type int16;
+ default 30;
+ }
}
}
protected static final String DST_IP_PREFIX = NEW_DESTINATION_ADDRESS + IPV4_MASK;
protected static final short DEAD_TIMER = 30;
protected static final short KEEP_ALIVE = 10;
+ protected static final int RPC_TIMEOUT = 4;
protected List<Notification> receivedMsgs;
doReturn(mock(ChannelFuture.class)).when(this.clientListener).close();
this.listenerFactory = (T) ((Class)((ParameterizedType)this.getClass().getGenericSuperclass()).getActualTypeArguments()[0]).newInstance();
- this.manager = new ServerSessionManager(getDataBroker(), TOPO_IID, this.listenerFactory);
+ this.manager = new ServerSessionManager(getDataBroker(), TOPO_IID, this.listenerFactory, RPC_TIMEOUT);
this.neg = new DefaultPCEPSessionNegotiator(mock(Promise.class), this.clientListener, this.manager.getSessionListener(), (short) 1, 5, this.localPrefs);
this.topologyRpcs = new TopologyRPCs(this.manager);
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
import static org.opendaylight.protocol.pcep.pcc.mock.spi.MsgBuilderUtil.createLspTlvs;
import com.google.common.base.Optional;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
import org.junit.Before;
import org.junit.Test;
import org.opendaylight.controller.config.yang.pcep.topology.provider.SessionState;
assertEquals(PCEPErrors.USED_SYMBOLIC_PATH_NAME, PCEPErrors.forValue(errorObject.getType(), errorObject.getValue()));
}
+ @Test
+ public void testPccResponseTimeout() throws InterruptedException, ExecutionException {
+ this.listener.onSessionUp(this.session);
+ Future<RpcResult<AddLspOutput>> addLspResult = this.topologyRpcs.addLsp(createAddLspInput());
+ try {
+ addLspResult.get(2, TimeUnit.SECONDS);
+ fail();
+ } catch (Exception e) {
+ assertTrue(e instanceof TimeoutException);
+ }
+ Thread.sleep(AbstractPCEPSessionTest.RPC_TIMEOUT);
+ RpcResult<AddLspOutput> rpcResult = addLspResult.get();
+ assertNotNull(rpcResult);
+ assertEquals(rpcResult.getResult().getFailure(), FailureType.Unsent);
+ }
+
@Override
protected Open getLocalPref() {
return new OpenBuilder(super.getLocalPref()).setTlvs(new TlvsBuilder().addAugmentation(Tlvs1.class, new Tlvs1Builder().setStateful(new StatefulBuilder()