import io.netty.util.concurrent.Future;
import java.net.InetAddress;
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.pcep.stats.rev141006.PcepSessionState;
import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.pcep.types.rev131005.Message;
import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.pcep.types.rev131005.open.object.open.Tlvs;
*/
Future<Void> sendMessage(Message message);
- void close(TerminationReason reason);
+ /**
+ * Closes PCEP session, cancels all timers, returns to state Idle, sends the Close Message. KeepAlive and DeadTimer
+ * are cancelled if the state of the session changes to IDLE. This method is used to close the PCEP session from
+ * inside the session or from the listener, therefore the parent of this session should be informed.
+ * @param reason The {@link TerminationReason} to be wrapped in a PCEP CLOSE message and sent to the remote peer.
+ * When the reason provided is null, no CLOSE message will be sent.
+ */
+ void close(@Nullable TerminationReason reason);
+
+ /**
+ * Terminate a PCEP session. A CLOSE message with given reason will be sent to remote peer by invoking
+ * {@link #close(TerminationReason)} method.
+ *
+ * It triggers {@link PCEPSessionListener#onSessionTerminated(PCEPSession, PCEPTerminationReason)} after closing.
+ * @param reason The {@link TerminationReason} to be wrapped in a PCEP CLOSE message and sent to the remote peer.
+ * The reason cannot be null.
+ */
+ void terminate(@Nonnull TerminationReason reason);
Tlvs getRemoteTlvs();
void onSessionDown(PCEPSession session, Exception e);
/**
- * Fired when the session is terminated locally. The session has already been closed and transitioned to IDLE state.
+ * Fired when the session is terminated locally or upon a CLOSE message from remote peer is received.
+ * The session has already been closed and transitioned to IDLE state.
* Any outstanding queued messages were not sent. The user should not attempt to make any use of the session.
*
* @param reason the cause why the session went down
*/
package org.opendaylight.protocol.pcep;
-import com.google.common.collect.Maps;
-import java.util.Map;
-
public enum TerminationReason {
UNKNOWN((short) 1), EXP_DEADTIMER((short) 2), MALFORMED_MSG((short) 3), TOO_MANY_UNKNWN_REQS((short) 4), TOO_MANY_UNKNOWN_MSGS((short) 5);
- private short value;
- private static final Map<Short, TerminationReason> VALUE_MAP;
-
- static {
- VALUE_MAP = Maps.newHashMap();
- for (final TerminationReason enumItem : TerminationReason.values()) {
- VALUE_MAP.put(enumItem.value, enumItem);
- }
- }
+ private final short value;
- private TerminationReason(final short value) {
+ TerminationReason(final short value) {
this.value = value;
}
* @return corresponding TerminationReason item
*/
public static TerminationReason forValue(final short valueArg) {
- return VALUE_MAP.get(valueArg);
+ for (final TerminationReason reason : values()) {
+ if (reason.value == valueArg) {
+ return reason;
+ }
+ }
+ return null;
+ }
+
+ public static TerminationReason forValue(final Short valueArg) {
+ if (valueArg == null) {
+ return null;
+ }
+ return forValue(valueArg.shortValue());
}
}
import java.util.LinkedList;
import java.util.Queue;
import java.util.concurrent.TimeUnit;
+import javax.annotation.concurrent.GuardedBy;
import org.opendaylight.protocol.pcep.PCEPCloseTermination;
import org.opendaylight.protocol.pcep.PCEPSession;
import org.opendaylight.protocol.pcep.PCEPSessionListener;
import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.pcep.types.rev131005.Message;
import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.pcep.types.rev131005.OpenMessage;
import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.pcep.types.rev131005.PcerrMessage;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.pcep.types.rev131005.close.message.CCloseMessage;
import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.pcep.types.rev131005.close.message.CCloseMessageBuilder;
import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.pcep.types.rev131005.close.object.CCloseBuilder;
import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.pcep.types.rev131005.keepalive.message.KeepaliveMessageBuilder;
private int maxUnknownMessages;
// True if the listener should not be notified about events
+ @GuardedBy("this")
private boolean closed = false;
private final Channel channel;
* KeepAlive timer to the time at which the message was sent. If the session was closed by the time this method
* starts to execute (the session state will become IDLE), that rescheduling won't occur.
*/
- private void handleKeepaliveTimer() {
+ private void handleKeepaliveTimer() {
final long ct = TICKER.read();
long nextKeepalive = this.lastMessageSentAt + TimeUnit.SECONDS.toNanos(getKeepAliveTimerValue());
* @param msg to be sent
*/
@Override
- public Future<Void> sendMessage(final Message msg) {
+ public synchronized Future<Void> sendMessage(final Message msg) {
final ChannelFuture f = this.channel.writeAndFlush(msg);
this.lastMessageSentAt = TICKER.read();
this.sessionState.updateLastSentMsg();
@VisibleForTesting
ChannelFuture closeChannel() {
- LOG.info("Closing PCEP session: {}", this);
+ LOG.info("Closing PCEP session channel: {}", this.channel);
return this.channel.close();
}
+ @VisibleForTesting
+ public synchronized boolean isClosed() {
+ return this.closed;
+ }
+
/**
* Closes PCEP session without sending a Close message, as the channel is no longer active.
*/
@Override
- public void close() {
- LOG.info("Closing PCEP session: {}", this);
- closeChannel();
+ public synchronized void close() {
+ close(null);
}
- /**
- * Closes PCEP session, cancels all timers, returns to state Idle, sends the Close Message. KeepAlive and DeadTimer
- * are cancelled if the state of the session changes to IDLE. This method is used to close the PCEP session from
- * inside the session or from the listener, therefore the parent of this session should be informed.
- */
@Override
public synchronized void close(final TerminationReason reason) {
- LOG.info("Closing PCEP session: {}", this);
+ if (isClosed()) {
+ return;
+ }
this.closed = true;
- this.sendMessage(new CloseBuilder().setCCloseMessage(
- new CCloseMessageBuilder().setCClose(new CCloseBuilder().setReason(reason.getShortValue()).build()).build()).build());
- this.close();
+ // only send close message when the reason is provided
+ if (reason != null) {
+ LOG.info("Closing PCEP session with reason {}: {}", reason, this);
+ sendMessage(new CloseBuilder().setCCloseMessage(
+ new CCloseMessageBuilder().setCClose(new CCloseBuilder().setReason(reason.getShortValue()).build()).build()).build());
+ } else {
+ LOG.info("Closing PCEP session: {}", this);
+ }
+ closeChannel();
}
@Override
return ((InetSocketAddress) this.channel.remoteAddress()).getAddress();
}
- private synchronized void terminate(final TerminationReason reason) {
- LOG.info("Local PCEP session termination : {}", reason);
+ @Override
+ public synchronized void terminate(final TerminationReason reason) {
+ if (isClosed()) {
+ return;
+ }
+ close(reason);
this.listener.onSessionTerminated(this, new PCEPCloseTermination(reason));
- this.closed = true;
- this.sendMessage(new CloseBuilder().setCCloseMessage(
- new CCloseMessageBuilder().setCClose(new CCloseBuilder().setReason(reason.getShortValue()).build()).build()).build());
- this.close();
}
public synchronized void endOfInput() {
- if (!this.closed) {
- this.listener.onSessionDown(this, new IOException("End of input detected. Close the session."));
+ if (!isClosed()) {
+ this.listener.onSessionDown(this, new IOException("End of input detected. Close the session " + this));
this.closed = true;
}
}
* exception is CLOSE message, which needs to be converted into a
* session DOWN event.
*/
- this.close();
+ final CCloseMessage closeMsg = ((CloseMessage) msg).getCCloseMessage();
+ TerminationReason reason = null;
+ if (closeMsg != null && closeMsg.getCClose() != null) {
+ reason = TerminationReason.forValue(closeMsg.getCClose().getReason());
+ }
+ if (reason == null) {
+ reason = TerminationReason.UNKNOWN;
+ }
+ this.listener.onSessionTerminated(this, new PCEPCloseTermination(reason));
+ close();
} else {
// This message needs to be handled by the user
if (msg instanceof PcerrMessage) {
Assert.assertTrue(this.listener.messages.get(0) instanceof Pcreq);
Assert.assertEquals(2, this.session.getMessages().getReceivedMsgCount().intValue());
+ Assert.assertTrue(this.listener.up);
this.session.handleMessage(new CloseBuilder().build());
Assert.assertEquals(3, this.session.getMessages().getReceivedMsgCount().intValue());
Assert.assertEquals(1, this.listener.messages.size());
Assert.assertTrue(this.channel.isActive());
+ Assert.assertFalse(this.listener.up);
Mockito.verify(this.channel, Mockito.times(1)).close();
this.session.resetStats();
@Override
public void onSessionTerminated(final PCEPSession session, final PCEPTerminationReason cause) {
LOG.debug("Session terminated. Cause : {}", cause.toString());
+ this.up = false;
}
}
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.TimeUnit;
+import javax.annotation.Nullable;
import javax.annotation.concurrent.GuardedBy;
import org.opendaylight.controller.config.yang.pcep.topology.provider.ListenerStateRuntimeMXBean;
import org.opendaylight.controller.config.yang.pcep.topology.provider.ListenerStateRuntimeRegistration;
private SyncOptimization syncOptimization;
private boolean triggeredResyncInProcess;
+ @GuardedBy("this")
private ListenerStateRuntimeRegistration registration;
@GuardedBy("this")
private final SessionListenerState listenerState;
// takeNodeState(..) may fail when the server session manager is being restarted due to configuration change
if (state == null) {
LOG.error("Unable to fetch topology node state for PCEP session. Closing session {}", session);
- this.onSessionDown(session, new RuntimeException("Unable to fetch topology node state for PCEP session with " + session.getRemoteAddress()));
+ session.terminate(TerminationReason.UNKNOWN);
return;
}
if (this.session != null || this.nodeState != null) {
LOG.error("PCEP session is already up. Closing session {}", session);
- this.onSessionDown(session, new IllegalStateException("Session is already up with " + session.getRemoteAddress()));
+ session.terminate(TerminationReason.UNKNOWN);
return;
}
register();
if (this.registration == null) {
LOG.error("PCEP session fails to register. Closing session {}", session);
- this.onSessionDown(session, new RuntimeException("PCEP Session with " + session.getRemoteAddress() + " fails to register."));
+ session.terminate(TerminationReason.UNKNOWN);
return;
}
this.listenerState.init(session);
}
protected void updatePccState(final PccSyncState pccSyncState) {
+ if (this.serverSessionManager.isClosed()) {
+ LOG.debug("Ignore PCC state update for {} as session manager has been closed.", this.session);
+ return;
+ }
final MessageContext ctx = new MessageContext(this.nodeState.beginTransaction());
updatePccNode(ctx, new PathComputationClientBuilder().setStateSync(pccSyncState).build());
if (pccSyncState != PccSyncState.Synchronized) {
private synchronized void tearDown(final PCEPSession session) {
Preconditions.checkNotNull(session);
this.serverSessionManager.releaseNodeState(this.nodeState, session, isLspDbPersisted());
- this.nodeState = null;
- this.session = null;
- this.syncOptimization = null;
- unregister();
-
- // Clear all requests we know about
- for (final Entry<S, PCEPRequest> e : this.requests.entrySet()) {
- final PCEPRequest r = e.getValue();
- switch (r.getState()) {
- case DONE:
- // Done is done, nothing to do
- LOG.trace("Request {} was done when session went down.", e.getKey());
- break;
- case UNACKED:
- // Peer has not acked: results in failure
- LOG.info("Request {} was incomplete when session went down, failing the instruction", e.getKey());
- r.done(OperationResults.NOACK);
- break;
- case UNSENT:
- // Peer has not been sent to the peer: results in cancellation
- LOG.debug("Request {} was not sent when session went down, cancelling the instruction", e.getKey());
- r.done(OperationResults.UNSENT);
- break;
- default:
- break;
- }
- }
- this.requests.clear();
+ // Do not send CLOSE message here.
+ // * In #onSessionDown(..), that channel is already unavailable, thus we won't be able to send out the message.
+ // * In #OnSessionTerminated(..), a CLOSE message should have already been sent, no need to send again.
+ close(null);
}
@Override
@Override
public final synchronized void onSessionTerminated(final PCEPSession session, final PCEPTerminationReason reason) {
- LOG.info("Session {} terminated by peer with reason {}", session, reason);
+ LOG.info("Session {} terminated with reason {}", session, reason);
tearDown(session);
}
@Override
public final synchronized void onMessage(final PCEPSession session, final Message message) {
- if (this.nodeState == null) {
- LOG.warn("Topology node state is null. Unhandled message {} on session {}", message, session);
+ if (this.serverSessionManager.isClosed()) {
+ // we cannot operate on the topology node when the topology is removed by ServerSessionManager
+ LOG.debug("Ignore message from {} as session manager has been closed.", session);
return;
}
final MessageContext ctx = new MessageContext(this.nodeState.beginTransaction());
}
@Override
- public void close() {
+ public synchronized void close() {
+ close(TerminationReason.UNKNOWN);
+ }
+
+ /**
+ * Close this session listener. Reset all session related status
+ *
+ * @param reason The {@link TerminationReason} to be wrapped in a PCEP CLOSE message and sent to the remote peer.
+ * When the reason provided is null, no CLOSE message will be sent.
+ */
+ private final synchronized void close(@Nullable final TerminationReason reason) {
unregister();
if (this.session != null) {
- this.session.close(TerminationReason.UNKNOWN);
+ this.session.close(reason);
+ }
+ this.session = null;
+ this.nodeState = null;
+ this.syncOptimization = null;
+
+ // Clear all requests we know about
+ for (final Entry<S, PCEPRequest> e : this.requests.entrySet()) {
+ final PCEPRequest r = e.getValue();
+ switch (r.getState()) {
+ case DONE:
+ // Done is done, nothing to do
+ LOG.trace("Request {} was done when session went down.", e.getKey());
+ break;
+ case UNACKED:
+ // Peer has not acked: results in failure
+ LOG.info("Request {} was incomplete when session went down, failing the instruction", e.getKey());
+ r.done(OperationResults.NOACK);
+ break;
+ case UNSENT:
+ // Peer has not been sent to the peer: results in cancellation
+ LOG.debug("Request {} was not sent when session went down, cancelling the instruction", e.getKey());
+ r.done(OperationResults.UNSENT);
+ break;
+ default:
+ break;
+ }
}
+ this.requests.clear();
+ this.listenerState.destroy();
}
private final synchronized void unregister() {
if (this.registration != null) {
this.registration.close();
- LOG.trace("PCEP session {} is unregistered successfully.", this.session);
+ LOG.debug("PCEP session {} is unregistered successfully.", this.session);
this.registration = null;
} else {
- LOG.trace("PCEP session {} was not registered.", this.session);
+ LOG.debug("PCEP session {} was not registered.", this.session);
}
}
final PCEPTopologyProviderRuntimeRegistration runtimeReg = this.serverSessionManager.getRuntimeRootRegistration();
if (runtimeReg != null) {
this.registration = runtimeReg.register(this);
- LOG.trace("PCEP session {} is successfully registered.", this.session);
+ LOG.debug("PCEP session {} is successfully registered.", this.session);
}
}
private final InstanceIdentifier<Topology> topology;
private final DataBroker broker;
private final PCEPStatefulPeerProposal peerProposal;
- private final AtomicBoolean isClosed = new AtomicBoolean(false);
+ private final AtomicBoolean isClosed = new AtomicBoolean(true);
private final short rpcTimeout;
private final AtomicReference<PCEPTopologyProviderRuntimeRegistration> runtimeRootRegistration = new AtomicReference<>();
@Override
public void onSuccess(final Void result) {
LOG.debug("PCEP Topology {} created successfully.", topologyId.getValue());
+ ServerSessionManager.this.isClosed.set(false);
}
@Override
return new NodeId("pcc://" + addr.getHostAddress());
}
+ boolean isClosed() {
+ return this.isClosed.get();
+ }
+
synchronized void releaseNodeState(final TopologyNodeState nodeState, final PCEPSession session, final boolean persistNode) {
- this.nodes.remove(createNodeId(session.getRemoteAddress()));
+ final NodeId id = createNodeId(session.getRemoteAddress());
+ this.nodes.remove(id);
+ if (isClosed()) {
+ // Since the whole pcep topology is going to be removed by ServerSessionManager, we do not need to remove each single
+ // node separately. Besides, it could cause Optismic Lock on DataStore when operating on the same topology
+ LOG.trace("Server Session Manager is closed. No need to release topology node {}", id);
+ return;
+ }
if (nodeState != null) {
LOG.debug("Node {} unbound", nodeState.getNodeId());
nodeState.released(persistNode);
synchronized TopologyNodeState takeNodeState(final InetAddress address, final TopologySessionListener sessionListener, final boolean retrieveNode) {
final NodeId id = createNodeId(address);
- if (this.isClosed.get()) {
+ if (isClosed()) {
LOG.error("Server Session Manager is closed. Unable to create topology node {} with listener {}", id, sessionListener);
return null;
}
LOG.debug("Created topology node {} for id {} at {}", ret, id, ret.getNodeId());
this.state.put(id, ret);
}
- // FIXME: else check for conflicting session
+ final TopologySessionListener conflictingSessionListener = this.nodes.get(id);
+ if (conflictingSessionListener != null && !sessionListener.equals(conflictingSessionListener)) {
+ LOG.error("Existing session {} is conflict with new session {} on node {}, closing the existing one.", conflictingSessionListener, sessionListener, id);
+ conflictingSessionListener.close();
+ }
ret.taken(retrieveNode);
this.nodes.put(id, sessionListener);
LOG.debug("Node {} bound to listener {}", id, sessionListener);
return (l != null) ? l.triggerSync(input) : OperationResults.UNSENT.future();
}
- synchronized ListenableFuture<Void> closeServiceInstance() {
+ synchronized CheckedFuture<Void, TransactionCommitFailedException> closeServiceInstance() {
if (this.isClosed.getAndSet(true)) {
LOG.error("Session Manager has already been closed.");
Futures.immediateFuture(null);
return future;
}
+ PCEPTopologyProviderRuntimeRegistration getRuntimeRootRegistration() {
+ return this.runtimeRootRegistration.get();
+ }
+
synchronized void setRuntimeRootRegistrator(final PCEPTopologyProviderRuntimeRegistrator runtimeRootRegistrator) {
if (!this.runtimeRootRegistration.compareAndSet(null, runtimeRootRegistrator.register(this))) {
LOG.error("Runtime root registration has been set before.");
}
}
- PCEPTopologyProviderRuntimeRegistration getRuntimeRootRegistration() {
- return this.runtimeRootRegistration.get();
- }
-
+ @Override
public void setPeerSpecificProposal(final InetSocketAddress address, final TlvsBuilder openBuilder) {
Preconditions.checkNotNull(address);
this.peerProposal.setPeerProposal(createNodeId(address.getAddress()), openBuilder);
this.capa = new PeerCapabilities();
}
+ /**
+ * Initialize the listenerState with a PCEP session
+ * @param session PCEP session that is bind to this listenerState
+ */
public void init(final PCEPSession session) {
Preconditions.checkNotNull(session);
this.localPref = getLocalPref(session.getLocalPref());
this.sessionUpDuration.start();
}
+ /**
+ * Reset this listenerState instance to it's initial state
+ * This isn't necessary in most case, as a PCEP session listener
+ * is tightly bind to a particular PCEP session
+ * However, it is required in unit test. Because we are reusing
+ * PCEP session listener in a single test, the listenerState
+ * will not be killed when the session is down. So if later
+ * we want to {@link #init(PCEPSession)} a session with the same session listener
+ * again, an exception saying "stopWatch cannot be started again"
+ * will be thrown, and all the statistical counter won't be correct
+ */
+ public void destroy() {
+ this.localPref = null;
+ this.peerPref = null;
+ this.sessionUpDuration.reset();
+ this.capa = new PeerCapabilities();
+ resetStats();
+ }
+
public void processRequestStats(final long duration) {
if (this.minReplyTime == 0) {
this.minReplyTime = duration;
return msgs;
}
- public void resetStats(final PCEPSession session) {
- Preconditions.checkNotNull(session);
+ /**
+ * Reset statistic counters
+ */
+ private void resetStats() {
this.receivedRptMsgCount = 0;
this.sentInitMsgCount = 0;
this.sentUpdMsgCount = 0;
this.minReplyTime = 0;
this.totalTime = 0;
this.reqCount = 0;
+ }
+
+ public void resetStats(final PCEPSession session) {
+ Preconditions.checkNotNull(session);
+ resetStats();
session.resetStats();
}
package org.opendaylight.bgpcep.pcep.topology.provider;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.fail;
import static org.mockito.Matchers.any;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
+import com.google.common.util.concurrent.CheckedFuture;
+import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.Futures;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandler;
import java.net.SocketAddress;
import java.util.ArrayList;
import java.util.List;
+import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
+import javax.annotation.Nullable;
import org.junit.After;
import org.junit.Before;
import org.mockito.Mock;
import org.opendaylight.controller.config.yang.pcep.topology.provider.PCEPTopologyProviderRuntimeRegistrator;
import org.opendaylight.controller.md.sal.binding.test.AbstractConcurrentDataBrokerTest;
import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException;
-import org.opendaylight.protocol.pcep.PCEPSession;
import org.opendaylight.protocol.pcep.PCEPSessionListener;
import org.opendaylight.protocol.pcep.impl.DefaultPCEPSessionNegotiator;
+import org.opendaylight.protocol.pcep.impl.PCEPSessionImpl;
import org.opendaylight.protocol.util.InetSocketAddressUtil;
import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.inet.types.rev130715.IpPrefix;
import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.inet.types.rev130715.Ipv4Prefix;
@Mock
ListenerStateRuntimeRegistration listenerReg;
+ @Mock
+ private PCEPTopologyProviderRuntimeRegistrator registrator;
+
private final Open localPrefs = new OpenBuilder().setDeadTimer((short) 30).setKeepalive((short) 10)
.setSessionId((short) 0).build();
ServerSessionManager manager;
+ private PCEPSessionListener sessionListener;
+
NetworkTopologyPcepService topologyRpcs;
private DefaultPCEPSessionNegotiator neg;
doReturn(mock(ChannelFuture.class)).when(this.clientListener).close();
doNothing().when(this.listenerReg).close();
+ doReturn("listenerReg").when(this.listenerReg).toString();
final PCEPTopologyProviderRuntimeRegistration topologyReg = mock(PCEPTopologyProviderRuntimeRegistration.class);
doReturn(this.listenerReg).when(topologyReg).register(any(ListenerStateRuntimeMXBean.class));
doNothing().when(topologyReg).close();
- final PCEPTopologyProviderRuntimeRegistrator registrator = mock(PCEPTopologyProviderRuntimeRegistrator.class);
- doReturn(topologyReg).when(registrator).register(any(PCEPTopologyProviderRuntimeMXBean.class));
+ doReturn(topologyReg).when(this.registrator).register(any(PCEPTopologyProviderRuntimeMXBean.class));
final T listenerFactory = (T) ((Class) ((ParameterizedType) this.getClass().getGenericSuperclass())
.getActualTypeArguments()[0]).newInstance();
this.manager = new ServerSessionManager(getDataBroker(), TOPO_IID, listenerFactory, RPC_TIMEOUT);
- this.manager.setRuntimeRootRegistrator(registrator);
- this.manager.instantiateServiceInstance().checkedGet();
+ startSessionManager();
+ this.sessionListener = this.manager.getSessionListener();
this.neg = new DefaultPCEPSessionNegotiator(mock(Promise.class), this.clientListener,
- this.manager.getSessionListener(), (short) 1, 5, this.localPrefs);
+ this.sessionListener, (short) 1, 5, this.localPrefs);
this.topologyRpcs = new TopologyRPCs(this.manager);
}
@After
public void tearDown() throws TransactionCommitFailedException {
- this.manager.closeServiceInstance();
+ stopSessionManager();
+ }
+
+ protected void startSessionManager() throws TransactionCommitFailedException, InterruptedException {
+ this.manager.setRuntimeRootRegistrator(this.registrator);
+ final CheckedFuture<Void, TransactionCommitFailedException> future = this.manager.instantiateServiceInstance();
+ final CountDownLatch lock = new CountDownLatch(1);
+ Futures.addCallback(future, new FutureCallback<Void>() {
+ @Override
+ public void onSuccess(@Nullable final Void aVoid) {
+ lock.countDown();
+ }
+
+ @Override
+ public void onFailure(final Throwable throwable) {
+ // the test cannot continue
+ fail();
+ }
+ });
+ future.checkedGet();
+ lock.await(5000, TimeUnit.MILLISECONDS);
+ assertFalse(this.manager.isClosed());
+ }
+
+ protected void stopSessionManager() throws TransactionCommitFailedException {
+ this.manager.closeServiceInstance().checkedGet();
}
Ero createEroWithIpPrefixes(final List<String> ipPrefixes) {
}
protected PCEPSessionListener getSessionListener() {
- return this.manager.getSessionListener();
+ return this.sessionListener;
}
- protected final PCEPSession getPCEPSession(final Open localOpen, final Open remoteOpen) {
+ protected final PCEPSessionImpl getPCEPSession(final Open localOpen, final Open remoteOpen) {
return this.neg.createSession(this.clientListener, localOpen, remoteOpen);
}
}
import org.opendaylight.controller.config.yang.pcep.topology.provider.SessionState;
import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException;
-import org.opendaylight.protocol.pcep.PCEPCloseTermination;
-import org.opendaylight.protocol.pcep.PCEPSession;
import org.opendaylight.protocol.pcep.TerminationReason;
+import org.opendaylight.protocol.pcep.impl.PCEPSessionImpl;
import org.opendaylight.protocol.pcep.pcc.mock.spi.MsgBuilderUtil;
import org.opendaylight.protocol.pcep.spi.AbstractMessageParser;
import org.opendaylight.protocol.pcep.spi.PCEPErrors;
import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.pcep.ietf.stateful.rev131222.pcupd.message.pcupd.message.Updates;
import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.pcep.ietf.stateful.rev131222.stateful.capability.tlv.StatefulBuilder;
import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.pcep.ietf.stateful.rev131222.symbolic.path.name.tlv.SymbolicPathNameBuilder;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.pcep.message.rev131007.Close;
import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.pcep.types.rev131005.Message;
import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.pcep.types.rev131005.endpoints.address.family.Ipv4CaseBuilder;
import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.pcep.types.rev131005.endpoints.address.family.ipv4._case.Ipv4Builder;
private Stateful07TopologySessionListener listener;
- private PCEPSession session;
+ private PCEPSessionImpl session;
@Override
@Before
verify(this.listenerReg, times(0)).close();
// send request
final Future<RpcResult<AddLspOutput>> futureOutput = this.topologyRpcs.addLsp(createAddLspInput());
+ assertFalse(this.session.isClosed());
+ // .onSessionDown() invokes tearDown(session), which invokes session.close(null)
this.listener.onSessionDown(this.session, new IllegalArgumentException());
+ assertTrue(this.session.isClosed());
verify(this.listenerReg, times(1)).close();
final AddLspOutput output = futureOutput.get().getResult();
// deal with unsent request after session down
* All the pcep session registration should be closed when the session manager is closed
* @throws InterruptedException
* @throws ExecutionException
- * @throws TransactionCommitFailedException
*/
@Test
public void testOnServerSessionManagerDown() throws InterruptedException, ExecutionException,
- TransactionCommitFailedException {
+ TransactionCommitFailedException {
this.listener.onSessionUp(this.session);
+ // the session should not be closed when session manager is up
+ assertFalse(this.session.isClosed());
verify(this.listenerReg, times(0)).close();
// send request
final Future<RpcResult<AddLspOutput>> futureOutput = this.topologyRpcs.addLsp(createAddLspInput());
- this.manager.closeServiceInstance();
+ stopSessionManager();
verify(this.listenerReg, times(1)).close();
final AddLspOutput output = futureOutput.get().getResult();
// deal with unsent request after session down
assertEquals(FailureType.Unsent, output.getFailure());
+ // verify the session is closed after server session manager is closed
+ assertTrue(this.session.isClosed());
}
/**
* Verify the PCEP session should not be up when server session manager is down,
* otherwise it would be a problem when the session is up while it's not registered with session manager
- * @throws InterruptedException
- * @throws ExecutionException
- * @throws TransactionCommitFailedException
*/
@Test
public void testOnServerSessionManagerUnstarted() throws InterruptedException, ExecutionException,
- TransactionCommitFailedException, ReadFailedException {
- this.manager.closeServiceInstance();
+ TransactionCommitFailedException, ReadFailedException {
+ stopSessionManager();
// the registration should not be closed since it's never initialized
verify(this.listenerReg, times(0)).close();
+ assertFalse(this.session.isClosed());
this.listener.onSessionUp(this.session);
// verify the session was NOT added to topology
checkNotPresentOperational(getDataBroker(), TOPO_IID);
// still, the session should not be registered and thus close() is never called
verify(this.listenerReg, times(0)).close();
+ // verify the session is closed due to server session manager is closed
+ assertTrue(this.session.isClosed());
// send request
final Future<RpcResult<AddLspOutput>> futureOutput = this.topologyRpcs.addLsp(createAddLspInput());
final AddLspOutput output = futureOutput.get().getResult();
return topology;
});
+ assertFalse(this.session.isClosed());
+ this.session.terminate(TerminationReason.UNKNOWN);
+ assertTrue(this.session.isClosed());
+ verify(this.listenerReg, times(1)).close();
// node should be removed after termination
- this.listener.onSessionTerminated(this.session, new PCEPCloseTermination(TerminationReason.UNKNOWN));
+ checkNotPresentOperational(getDataBroker(), this.pathComputationClientIId);
+ assertFalse(this.receivedMsgs.isEmpty());
+ // the last message should be a Close message
+ assertTrue(this.receivedMsgs.get(this.receivedMsgs.size() - 1) instanceof Close);
+ }
+
+ /**
+ * When a session is somehow duplicated in controller, the controller should drop existing session
+ */
+ @Test
+ public void testDuplicatedSession() throws ReadFailedException {
+ this.listener.onSessionUp(this.session);
+ verify(this.listenerReg, times(0)).close();
+
+ // create node
+ this.topologyRpcs.addLsp(createAddLspInput());
+ final Pcinitiate pcinitiate = (Pcinitiate) this.receivedMsgs.get(0);
+ final Requests req = pcinitiate.getPcinitiateMessage().getRequests().get(0);
+ final long srpId = req.getSrp().getOperationId().getValue();
+ final Tlvs tlvs = createLspTlvs(req.getLsp().getPlspId().getValue(), true,
+ this.testAddress, this.testAddress, this.testAddress, Optional.absent());
+ final Pcrpt pcRpt = MsgBuilderUtil.createPcRtpMessage(new LspBuilder(req.getLsp()).setTlvs(tlvs).setSync(true)
+ .setRemove(false).setOperational(OperationalStatus.Active).build(),
+ Optional.of(MsgBuilderUtil.createSrp(srpId)), MsgBuilderUtil.createPath(req.getEro().getSubobject()));
+ this.listener.onMessage(this.session, pcRpt);
+ readDataOperational(getDataBroker(), TOPO_IID, topology -> {
+ assertEquals(1, topology.getNode().size());
+ return topology;
+ });
+
+ // now we do session up again
+ this.listener.onSessionUp(this.session);
+ assertTrue(this.session.isClosed());
verify(this.listenerReg, times(1)).close();
+ // node should be removed after termination
+ checkNotPresentOperational(getDataBroker(), this.pathComputationClientIId);
+ }
+
+ @Test
+ public void testOnServerSessionManagerRestartAndSessionRecovery() throws Exception {
+ // close server session manager first
+ stopSessionManager();
+ // the registration should not be closed since it's never initialized
+ verify(this.listenerReg, times(0)).close();
+ assertFalse(this.session.isClosed());
+ this.listener.onSessionUp(this.session);
+ // verify the session was NOT added to topology
+ checkNotPresentOperational(getDataBroker(), TOPO_IID);
+ // still, the session should not be registered and thus close() is never called
+ verify(this.listenerReg, times(0)).close();
+ // verify the session is closed due to server session manager is closed
+ assertTrue(this.session.isClosed());
+ // send request
+ final Future<RpcResult<AddLspOutput>> futureOutput = this.topologyRpcs.addLsp(createAddLspInput());
+ final AddLspOutput output = futureOutput.get().getResult();
+ // deal with unsent request after session down
+ assertEquals(FailureType.Unsent, output.getFailure());
+ // PCC client is not there
checkNotPresentOperational(getDataBroker(), this.pathComputationClientIId);
+
+ // reset received message queue
+ this.receivedMsgs.clear();
+ // now we restart the session manager
+ startSessionManager();
+ // try to start the session again
+ // notice since the session was terminated before, it is not usable anymore.
+ // we need to get a new session instance. the new session will have the same local / remote preference
+ this.session = getPCEPSession(getLocalPref(), getRemotePref());
+ verify(this.listenerReg, times(0)).close();
+ assertFalse(this.session.isClosed());
+ this.listener.onSessionUp(this.session);
+ assertFalse(this.session.isClosed());
+
+ // create node
+ this.topologyRpcs.addLsp(createAddLspInput());
+ final Pcinitiate pcinitiate = (Pcinitiate) this.receivedMsgs.get(0);
+ final Requests req = pcinitiate.getPcinitiateMessage().getRequests().get(0);
+ final long srpId = req.getSrp().getOperationId().getValue();
+ final Tlvs tlvs = createLspTlvs(req.getLsp().getPlspId().getValue(), true,
+ this.testAddress, this.testAddress, this.testAddress, Optional.absent());
+ final Pcrpt pcRpt = MsgBuilderUtil.createPcRtpMessage(new LspBuilder(req.getLsp()).setTlvs(tlvs).setSync(true)
+ .setRemove(false).setOperational(OperationalStatus.Active).build(),
+ Optional.of(MsgBuilderUtil.createSrp(srpId)), MsgBuilderUtil.createPath(req.getEro().getSubobject()));
+ this.listener.onMessage(this.session, pcRpt);
+ readDataOperational(getDataBroker(), TOPO_IID, topology -> {
+ assertEquals(1, topology.getNode().size());
+ return topology;
+ });
}
@Test