import java.util.LinkedList;
import java.util.Queue;
import java.util.concurrent.TimeUnit;
+import javax.annotation.concurrent.GuardedBy;
import org.opendaylight.protocol.pcep.PCEPCloseTermination;
import org.opendaylight.protocol.pcep.PCEPSession;
import org.opendaylight.protocol.pcep.PCEPSessionListener;
private int maxUnknownMessages;
// True if the listener should not be notified about events
+ @GuardedBy("this")
private boolean closed = false;
private final Channel channel;
@VisibleForTesting
ChannelFuture closeChannel() {
- LOG.info("Closing PCEP session: {}", this);
+ LOG.info("Closing PCEP session channel: {}", this.channel);
return this.channel.close();
}
+ @VisibleForTesting
+ public synchronized boolean isClosed() {
+ return this.closed;
+ }
+
/**
* Closes PCEP session without sending a Close message, as the channel is no longer active.
*/
@Override
- public void close() {
- LOG.info("Closing PCEP session: {}", this);
- closeChannel();
+ public synchronized void close() {
+ close(null);
}
/**
*/
@Override
public synchronized void close(final TerminationReason reason) {
- LOG.info("Closing PCEP session: {}", this);
+ if (this.closed) {
+ LOG.debug("Session is already closed.");
+ return;
+ }
this.closed = true;
- this.sendMessage(new CloseBuilder().setCCloseMessage(
- new CCloseMessageBuilder().setCClose(new CCloseBuilder().setReason(reason.getShortValue()).build()).build()).build());
- this.close();
+ // only send close message when the reason is provided
+ if (reason != null) {
+ LOG.info("Closing PCEP session with reason {}: {}", reason, this);
+ sendMessage(new CloseBuilder().setCCloseMessage(
+ new CCloseMessageBuilder().setCClose(new CCloseBuilder().setReason(reason.getShortValue()).build()).build()).build());
+ } else {
+ LOG.info("Closing PCEP session: {}", this);
+ }
+ closeChannel();
}
@Override
return ((InetSocketAddress) this.channel.remoteAddress()).getAddress();
}
- private synchronized void closeWithoutMessage(final TerminationReason reason) {
- LOG.info("Closing PCEP session without sending msg: {}", reason);
- this.listener.onSessionTerminated(this, new PCEPCloseTermination(reason));
- this.closed = true;
- this.close();
- }
-
private synchronized void terminate(final TerminationReason reason) {
- LOG.info("Local PCEP session termination : {}", reason);
+ if (this.closed) {
+ LOG.debug("Session {} is already closed.", this);
+ return;
+ }
+ close(reason);
this.listener.onSessionTerminated(this, new PCEPCloseTermination(reason));
- this.closed = true;
- this.sendMessage(new CloseBuilder().setCCloseMessage(
- new CCloseMessageBuilder().setCClose(new CCloseBuilder().setReason(reason.getShortValue()).build()).build()).build());
- this.close();
}
public synchronized void endOfInput() {
* @param msg incoming message
*/
public synchronized void handleMessage(final Message msg) {
+ if (this.closed) {
+ LOG.debug("PCEP Session {} is already closed, skip handling incoming message {}", this, msg);
+ return;
+ }
// Update last reception time
this.lastMessageReceivedAt = TICKER.read();
this.sessionState.updateLastReceivedMsg();
* exception is CLOSE message, which needs to be converted into a
* session DOWN event.
*/
- this.closeWithoutMessage(TerminationReason.forValue(((CloseMessage) msg).getCCloseMessage().getCClose().getReason()));
+ close();
+ this.listener.onSessionTerminated(this, new PCEPCloseTermination(TerminationReason.forValue(((CloseMessage) msg).getCCloseMessage().getCClose().getReason())));
} else {
// This message needs to be handled by the user
if (msg instanceof PcerrMessage) {
import org.opendaylight.controller.md.sal.binding.api.ReadWriteTransaction;
import org.opendaylight.controller.md.sal.binding.api.WriteTransaction;
import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
+import org.opendaylight.protocol.pcep.PCEPCloseTermination;
import org.opendaylight.protocol.pcep.PCEPSession;
-import org.opendaylight.protocol.pcep.PCEPSessionListener;
import org.opendaylight.protocol.pcep.PCEPTerminationReason;
import org.opendaylight.protocol.pcep.TerminationReason;
import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.inet.types.rev130715.IpAddressBuilder;
* @param <S> identifier type of requests
* @param <L> identifier type for LSPs
*/
-public abstract class AbstractTopologySessionListener<S, L> implements PCEPSessionListener, TopologySessionListener, ListenerStateRuntimeMXBean {
+public abstract class AbstractTopologySessionListener<S, L> implements TopologySessionListener, ListenerStateRuntimeMXBean {
protected static final class MessageContext {
private final Collection<PCEPRequest> requests = new ArrayList<>();
private final WriteTransaction trans;
// takeNodeState(..) may fail when the server session manager is being restarted due to configuration change
if (state == null) {
LOG.error("Unable to fetch topology node state for PCEP session. Closing session {}", session);
- this.onSessionDown(session, new RuntimeException("Unable to fetch topology node state for PCEP session with " + session.getRemoteAddress()));
+ session.close(TerminationReason.UNKNOWN);
+ this.onSessionTerminated(session, new PCEPCloseTermination(TerminationReason.UNKNOWN));
return;
}
if (this.session != null || this.nodeState != null) {
- LOG.error("PCEP session is already up. Closing session {}", session);
- this.onSessionDown(session, new IllegalStateException("Session is already up with " + session.getRemoteAddress()));
+ LOG.error("PCEP session is already up with {}. Closing session {}", session.getRemoteAddress(), session);
+ session.close(TerminationReason.UNKNOWN);
+ this.onSessionTerminated(session, new PCEPCloseTermination(TerminationReason.UNKNOWN));
return;
}
register();
if (this.registration == null) {
LOG.error("PCEP session fails to register. Closing session {}", session);
- this.onSessionDown(session, new RuntimeException("PCEP Session with " + session.getRemoteAddress() + " fails to register."));
+ session.close(TerminationReason.UNKNOWN);
+ this.onSessionTerminated(session, new PCEPCloseTermination(TerminationReason.UNKNOWN));
return;
}
this.listenerState.init(session);
}
protected void updatePccState(final PccSyncState pccSyncState) {
+ if (this.nodeState == null) {
+ LOG.info("Server Session Manager is closed.");
+ AbstractTopologySessionListener.this.session.close(TerminationReason.UNKNOWN);
+ return;
+ }
final MessageContext ctx = new MessageContext(this.nodeState.beginTransaction());
updatePccNode(ctx, new PathComputationClientBuilder().setStateSync(pccSyncState).build());
if (pccSyncState != PccSyncState.Synchronized) {
Preconditions.checkNotNull(session);
this.serverSessionManager.releaseNodeState(this.nodeState, session, isLspDbPersisted());
this.nodeState = null;
+ try {
+ if (this.session != null) {
+ this.session.close();
+ }
+ session.close();
+ } catch (Exception e) {
+ LOG.error("Session {} cannot be closed.", session, e);
+ }
this.session = null;
this.syncOptimization = null;
unregister();
public final synchronized void onMessage(final PCEPSession session, final Message message) {
if (this.nodeState == null) {
LOG.warn("Topology node state is null. Unhandled message {} on session {}", message, session);
+ session.close(TerminationReason.UNKNOWN);
return;
}
final MessageContext ctx = new MessageContext(this.nodeState.beginTransaction());
import org.opendaylight.controller.config.yang.pcep.topology.provider.PCEPTopologyProviderRuntimeRegistrator;
import org.opendaylight.controller.md.sal.binding.test.AbstractConcurrentDataBrokerTest;
import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException;
-import org.opendaylight.protocol.pcep.PCEPSession;
import org.opendaylight.protocol.pcep.PCEPSessionListener;
import org.opendaylight.protocol.pcep.impl.DefaultPCEPSessionNegotiator;
+import org.opendaylight.protocol.pcep.impl.PCEPSessionImpl;
import org.opendaylight.protocol.util.InetSocketAddressUtil;
import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.inet.types.rev130715.IpPrefix;
import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.inet.types.rev130715.Ipv4Prefix;
return this.manager.getSessionListener();
}
- protected final PCEPSession getPCEPSession(final Open localOpen, final Open remoteOpen) {
+ protected final PCEPSessionImpl getPCEPSession(final Open localOpen, final Open remoteOpen) {
return this.neg.createSession(this.clientListener, localOpen, remoteOpen);
}
}
import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException;
import org.opendaylight.protocol.pcep.PCEPCloseTermination;
-import org.opendaylight.protocol.pcep.PCEPSession;
import org.opendaylight.protocol.pcep.TerminationReason;
+import org.opendaylight.protocol.pcep.impl.PCEPSessionImpl;
import org.opendaylight.protocol.pcep.pcc.mock.spi.MsgBuilderUtil;
import org.opendaylight.protocol.pcep.spi.AbstractMessageParser;
import org.opendaylight.protocol.pcep.spi.PCEPErrors;
import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.pcep.ietf.stateful.rev131222.pcupd.message.pcupd.message.Updates;
import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.pcep.ietf.stateful.rev131222.stateful.capability.tlv.StatefulBuilder;
import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.pcep.ietf.stateful.rev131222.symbolic.path.name.tlv.SymbolicPathNameBuilder;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.pcep.message.rev131007.Close;
import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.pcep.types.rev131005.Message;
import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.pcep.types.rev131005.endpoints.address.family.Ipv4CaseBuilder;
import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.pcep.types.rev131005.endpoints.address.family.ipv4._case.Ipv4Builder;
private Stateful07TopologySessionListener listener;
- private PCEPSession session;
+ private PCEPSessionImpl session;
@Override
@Before
assertEquals(FailureType.Unsent, output.getFailure());
}
+ /**
+ * When a session is somehow duplicated in controller, the controller should drop existing session
+ */
+ @Test
+ public void testDuplicatedSession() throws ReadFailedException {
+ this.listener.onSessionUp(this.session);
+ verify(this.listenerReg, times(0)).close();
+
+ // create node
+ this.topologyRpcs.addLsp(createAddLspInput());
+ final Pcinitiate pcinitiate = (Pcinitiate) this.receivedMsgs.get(0);
+ final Requests req = pcinitiate.getPcinitiateMessage().getRequests().get(0);
+ final long srpId = req.getSrp().getOperationId().getValue();
+ final Tlvs tlvs = createLspTlvs(req.getLsp().getPlspId().getValue(), true,
+ this.testAddress, this.testAddress, this.testAddress, Optional.absent());
+ final Pcrpt pcRpt = MsgBuilderUtil.createPcRtpMessage(new LspBuilder(req.getLsp()).setTlvs(tlvs).setSync(true)
+ .setRemove(false).setOperational(OperationalStatus.Active).build(),
+ Optional.of(MsgBuilderUtil.createSrp(srpId)), MsgBuilderUtil.createPath(req.getEro().getSubobject()));
+ this.listener.onMessage(this.session, pcRpt);
+ readDataOperational(getDataBroker(), TOPO_IID, topology -> {
+ assertEquals(1, topology.getNode().size());
+ return topology;
+ });
+
+ // now we do session up again
+ this.listener.onSessionUp(this.session);
+ assertTrue(this.session.isClosed());
+ verify(this.listenerReg, times(1)).close();
+ // node should be removed after termination
+ checkNotPresentOperational(getDataBroker(), this.pathComputationClientIId);
+ assertFalse(this.receivedMsgs.isEmpty());
+ // the last message should be a Close message
+ assertTrue(this.receivedMsgs.get(this.receivedMsgs.size() - 1) instanceof Close);
+ }
+
@Test
public void testOnSessionTermination() throws Exception {
this.listener.onSessionUp(this.session);