*/
package org.opendaylight.bgpcep.pcep.topology.provider;
+import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
private final InstanceIdentifier<Topology> topology;
private final DataBroker broker;
private final PCEPStatefulPeerProposal peerProposal;
- private final AtomicBoolean isClosed = new AtomicBoolean(false);
private final short rpcTimeout;
private final AtomicReference<PCEPTopologyProviderRuntimeRegistration> runtimeRootRegistration = new AtomicReference<>();
+ @VisibleForTesting
+ public final AtomicBoolean isClosed = new AtomicBoolean(false);
+
public ServerSessionManager(final DataBroker broker, final InstanceIdentifier<Topology> topology,
final TopologySessionListenerFactory listenerFactory, final short rpcTimeout) {
this.broker = Preconditions.checkNotNull(broker);
@Override
public void onSuccess(final Void result) {
LOG.debug("PCEP Topology {} created successfully.", topologyId.getValue());
+ ServerSessionManager.this.isClosed.set(false);
}
@Override
public void onFailure(final Throwable t) {
LOG.error("Failed to create PCEP Topology {}.", topologyId.getValue(), t);
+ ServerSessionManager.this.isClosed.set(true);
}
}, MoreExecutors.directExecutor());
return future;
}
synchronized void releaseNodeState(final TopologyNodeState nodeState, final PCEPSession session, final boolean persistNode) {
+ if (this.isClosed.get()) {
+ LOG.error("Session Manager has already been closed.");
+ return;
+ }
this.nodes.remove(createNodeId(session.getRemoteAddress()));
if (nodeState != null) {
LOG.debug("Node {} unbound", nodeState.getNodeId());
package org.opendaylight.bgpcep.pcep.topology.provider;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.fail;
import static org.mockito.Matchers.any;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
+import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.MoreExecutors;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandler;
import java.net.SocketAddress;
import java.util.ArrayList;
import java.util.List;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
+import javax.annotation.Nullable;
import org.junit.After;
import org.junit.Before;
import org.mockito.Mock;
import org.opendaylight.controller.config.yang.pcep.topology.provider.PCEPTopologyProviderRuntimeRegistration;
import org.opendaylight.controller.config.yang.pcep.topology.provider.PCEPTopologyProviderRuntimeRegistrator;
import org.opendaylight.controller.md.sal.binding.test.AbstractConcurrentDataBrokerTest;
-import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException;
import org.opendaylight.protocol.pcep.PCEPSessionListener;
import org.opendaylight.protocol.pcep.impl.DefaultPCEPSessionNegotiator;
import org.opendaylight.protocol.pcep.impl.PCEPSessionImpl;
@Mock
ListenerStateRuntimeRegistration listenerReg;
+ @Mock
+ private PCEPTopologyProviderRuntimeRegistrator registrator;
+
private final Open localPrefs = new OpenBuilder().setDeadTimer((short) 30).setKeepalive((short) 10)
.setSessionId((short) 0).build();
doReturn(mock(ChannelFuture.class)).when(this.clientListener).close();
doNothing().when(this.listenerReg).close();
+ doReturn("listenerReg").when(this.listenerReg).toString();
final PCEPTopologyProviderRuntimeRegistration topologyReg = mock(PCEPTopologyProviderRuntimeRegistration.class);
doReturn(this.listenerReg).when(topologyReg).register(any(ListenerStateRuntimeMXBean.class));
doNothing().when(topologyReg).close();
- final PCEPTopologyProviderRuntimeRegistrator registrator = mock(PCEPTopologyProviderRuntimeRegistrator.class);
- doReturn(topologyReg).when(registrator).register(any(PCEPTopologyProviderRuntimeMXBean.class));
+ doReturn(topologyReg).when(this.registrator).register(any(PCEPTopologyProviderRuntimeMXBean.class));
final T listenerFactory = (T) ((Class) ((ParameterizedType) this.getClass().getGenericSuperclass())
.getActualTypeArguments()[0]).newInstance();
this.manager = new ServerSessionManager(getDataBroker(), TOPO_IID, listenerFactory, RPC_TIMEOUT);
- this.manager.setRuntimeRootRegistrator(registrator);
- this.manager.instantiateServiceInstance().get();
+ startSessionManager();
this.neg = new DefaultPCEPSessionNegotiator(mock(Promise.class), this.clientListener,
this.manager.getSessionListener(), (short) 1, 5, this.localPrefs);
this.topologyRpcs = new TopologyRPCs(this.manager);
}
- @After
- public void tearDown() throws TransactionCommitFailedException {
+ protected void startSessionManager() throws ExecutionException, InterruptedException {
+ this.manager.setRuntimeRootRegistrator(this.registrator);
+ final ListenableFuture<Void> future = this.manager.instantiateServiceInstance();
+ final CountDownLatch lock = new CountDownLatch(1);
+ Futures.addCallback(future, new FutureCallback<Void>() {
+ @Override
+ public void onSuccess(@Nullable final Void aVoid) {
+ lock.countDown();
+ }
+
+ @Override
+ public void onFailure(final Throwable throwable) {
+ // the test cannot continue
+ fail();
+ }
+ }, MoreExecutors.directExecutor());
+ future.get();
+ lock.await(5000, TimeUnit.MILLISECONDS);
+ assertFalse(this.manager.isClosed.get());
+ }
+
+ protected void stopSessionManager() {
this.manager.closeServiceInstance();
}
+ @After
+ public void tearDown() {
+ stopSessionManager();
+ }
+
Ero createEroWithIpPrefixes(final List<String> ipPrefixes) {
final List<Subobject> subobjs = new ArrayList<>(ipPrefixes.size());
final SubobjectBuilder subobjBuilder = new SubobjectBuilder();
import static org.opendaylight.protocol.util.CheckUtil.checkEquals;
import static org.opendaylight.protocol.util.CheckUtil.checkNotPresentOperational;
import static org.opendaylight.protocol.util.CheckUtil.readDataOperational;
-
import com.google.common.base.Optional;
import com.google.common.collect.Lists;
import java.net.UnknownHostException;
verify(this.listenerReg, times(0)).close();
// send request
final Future<RpcResult<AddLspOutput>> futureOutput = this.topologyRpcs.addLsp(createAddLspInput());
+ assertFalse(this.session.isClosed());
this.listener.onSessionDown(this.session, new IllegalArgumentException());
+ assertTrue(this.session.isClosed());
verify(this.listenerReg, times(1)).close();
final AddLspOutput output = futureOutput.get().getResult();
// deal with unsent request after session down
public void testOnServerSessionManagerDown() throws InterruptedException, ExecutionException,
TransactionCommitFailedException {
this.listener.onSessionUp(this.session);
+ // the session should not be closed when session manager is up
+ assertFalse(this.session.isClosed());
verify(this.listenerReg, times(0)).close();
// send request
final Future<RpcResult<AddLspOutput>> futureOutput = this.topologyRpcs.addLsp(createAddLspInput());
- this.manager.closeServiceInstance();
+ stopSessionManager();
verify(this.listenerReg, times(1)).close();
final AddLspOutput output = futureOutput.get().getResult();
// deal with unsent request after session down
assertEquals(FailureType.Unsent, output.getFailure());
+ // verify the session is closed after server session manager is closed
+ assertTrue(this.session.isClosed());
}
/**
@Test
public void testOnServerSessionManagerUnstarted() throws InterruptedException, ExecutionException,
TransactionCommitFailedException, ReadFailedException {
- this.manager.closeServiceInstance();
+ stopSessionManager();
// the registration should not be closed since it's never initialized
verify(this.listenerReg, times(0)).close();
+ assertFalse(this.session.isClosed());
this.listener.onSessionUp(this.session);
// verify the session was NOT added to topology
checkNotPresentOperational(getDataBroker(), TOPO_IID);
// still, the session should not be registered and thus close() is never called
verify(this.listenerReg, times(0)).close();
+ // verify the session is closed due to server session manager is closed
+ assertTrue(this.session.isClosed());
// send request
final Future<RpcResult<AddLspOutput>> futureOutput = this.topologyRpcs.addLsp(createAddLspInput());
final AddLspOutput output = futureOutput.get().getResult();
assertEquals(FailureType.Unsent, output.getFailure());
}
+ @Test
+ public void testOnServerSessionManagerRestartAndSessionRecovery() throws Exception {
+ // close server session manager first
+ stopSessionManager();
+ // the registration should not be closed since it's never initialized
+ verify(this.listenerReg, times(0)).close();
+ assertFalse(this.session.isClosed());
+ this.listener.onSessionUp(this.session);
+ // verify the session was NOT added to topology
+ checkNotPresentOperational(getDataBroker(), TOPO_IID);
+ // still, the session should not be registered and thus close() is never called
+ verify(this.listenerReg, times(0)).close();
+ // verify the session is closed due to server session manager is closed
+ assertTrue(this.session.isClosed());
+ // send request
+ final Future<RpcResult<AddLspOutput>> futureOutput = this.topologyRpcs.addLsp(createAddLspInput());
+ final AddLspOutput output = futureOutput.get().getResult();
+ // deal with unsent request after session down
+ assertEquals(FailureType.Unsent, output.getFailure());
+ // PCC client is not there
+ checkNotPresentOperational(getDataBroker(), this.pathComputationClientIId);
+
+ // reset received message queue
+ this.receivedMsgs.clear();
+ // now we restart the session manager
+ startSessionManager();
+ // try to start the session again
+ // notice since the session was terminated before, it is not usable anymore.
+ // we need to get a new session instance. the new session will have the same local / remote preference
+ this.session = getPCEPSession(getLocalPref(), getRemotePref());
+ verify(this.listenerReg, times(0)).close();
+ assertFalse(this.session.isClosed());
+ this.listener.onSessionUp(this.session);
+ assertFalse(this.session.isClosed());
+
+ // create node
+ this.topologyRpcs.addLsp(createAddLspInput());
+ final Pcinitiate pcinitiate = (Pcinitiate) this.receivedMsgs.get(0);
+ final Requests req = pcinitiate.getPcinitiateMessage().getRequests().get(0);
+ final long srpId = req.getSrp().getOperationId().getValue();
+ final Tlvs tlvs = createLspTlvs(req.getLsp().getPlspId().getValue(), true,
+ this.testAddress, this.testAddress, this.testAddress, Optional.absent());
+ final Pcrpt pcRpt = MsgBuilderUtil.createPcRtpMessage(new LspBuilder(req.getLsp()).setTlvs(tlvs).setSync(true)
+ .setRemove(false).setOperational(OperationalStatus.Active).build(),
+ Optional.of(MsgBuilderUtil.createSrp(srpId)), MsgBuilderUtil.createPath(req.getEro().getSubobject()));
+ this.listener.onMessage(this.session, pcRpt);
+ readDataOperational(getDataBroker(), TOPO_IID, topology -> {
+ assertEquals(1, topology.getNode().size());
+ return topology;
+ });
+ }
+
/**
* When a session is somehow duplicated in controller, the controller should drop existing session
*/
return topology;
});
+ assertFalse(this.session.isClosed());
// node should be removed after termination
this.listener.onSessionTerminated(this.session, new PCEPCloseTermination(TerminationReason.UNKNOWN));
+ assertTrue(this.session.isClosed());
verify(this.listenerReg, times(1)).close();
checkNotPresentOperational(getDataBroker(), this.pathComputationClientIId);
}