import java.util.TimerTask;
import java.util.concurrent.TimeUnit;
import javax.annotation.concurrent.GuardedBy;
-import org.opendaylight.controller.config.yang.pcep.topology.provider.ListenerStateRuntimeMXBean;
-import org.opendaylight.controller.config.yang.pcep.topology.provider.PeerCapabilities;
-import org.opendaylight.controller.config.yang.pcep.topology.provider.ReplyTime;
-import org.opendaylight.controller.config.yang.pcep.topology.provider.SessionState;
-import org.opendaylight.controller.config.yang.pcep.topology.provider.StatefulMessages;
+import org.opendaylight.bgpcep.pcep.topology.provider.session.stats.SessionStateImpl;
+import org.opendaylight.bgpcep.pcep.topology.provider.session.stats.TopologySessionStats;
import org.opendaylight.controller.md.sal.binding.api.ReadWriteTransaction;
import org.opendaylight.controller.md.sal.binding.api.WriteTransaction;
import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
* @param <S> identifier type of requests
* @param <L> identifier type for LSPs
*/
-public abstract class AbstractTopologySessionListener<S, L> implements TopologySessionListener, ListenerStateRuntimeMXBean {
- protected static final MessageHeader MESSAGE_HEADER = new MessageHeader() {
+public abstract class AbstractTopologySessionListener<S, L> implements TopologySessionListener, TopologySessionStats {
+ static final MessageHeader MESSAGE_HEADER = new MessageHeader() {
private final ProtocolVersion version = new ProtocolVersion((short) 1);
@Override
return this.version;
}
};
- protected static final String MISSING_XML_TAG = "Mandatory XML tags are missing.";
+ static final String MISSING_XML_TAG = "Mandatory XML tags are missing.";
private static final Logger LOG = LoggerFactory.getLogger(AbstractTopologySessionListener.class);
@GuardedBy("this")
- protected final Map<L, String> lsps = new HashMap<>();
+ final Map<L, String> lsps = new HashMap<>();
+ @GuardedBy("this")
+ final SessionStateImpl listenerState;
@GuardedBy("this")
private final Map<S, PCEPRequest> requests = new HashMap<>();
-
@GuardedBy("this")
private final Map<String, ReportedLsp> lspData = new HashMap<>();
private final ServerSessionManager serverSessionManager;
- @GuardedBy("this")
- private final SessionListenerState listenerState;
private InstanceIdentifier<PathComputationClient> pccIdentifier;
private TopologyNodeState nodeState;
+ @GuardedBy("this")
private boolean synced = false;
private PCEPSession session;
private SyncOptimization syncOptimization;
private boolean triggeredResyncInProcess;
- protected AbstractTopologySessionListener(final ServerSessionManager serverSessionManager) {
+ AbstractTopologySessionListener(final ServerSessionManager serverSessionManager) {
this.serverSessionManager = requireNonNull(serverSessionManager);
- this.listenerState = new SessionListenerState();
+ this.listenerState = new SessionStateImpl(this);
}
@Override
this.syncOptimization = new SyncOptimization(session);
- final TopologyNodeState state = this.serverSessionManager.takeNodeState(peerAddress, this, isLspDbRetreived());
+ final TopologyNodeState state = this.serverSessionManager.takeNodeState(peerAddress,
+ this, isLspDbRetreived());
+
// takeNodeState(..) may fail when the server session manager is being restarted due to configuration change
if (state == null) {
LOG.error("Unable to fetch topology node state for PCEP session. Closing session {}", session);
this.onSessionTerminated(session, new PCEPCloseTermination(TerminationReason.UNKNOWN));
return;
}
-
this.session = session;
this.nodeState = state;
+ this.serverSessionManager.bind(this.nodeState.getNodeId(), this.listenerState);
LOG.trace("Peer {} resolved to topology node {}", peerAddress, state.getNodeId());
final boolean isNodePresent = isLspDbRetreived() && initialNodeState != null;
if (isNodePresent) {
loadLspData(initialNodeState, this.lspData, this.lsps, isIncrementalSynchro());
- pccBuilder.setReportedLsp(initialNodeState.getAugmentation(Node1.class).getPathComputationClient().getReportedLsp());
+ pccBuilder.setReportedLsp(initialNodeState.getAugmentation(Node1.class)
+ .getPathComputationClient().getReportedLsp());
}
writeNode(pccBuilder, state, topologyAugment);
this.listenerState.init(session);
Futures.addCallback(trans.submit(), new FutureCallback<Void>() {
@Override
public void onSuccess(final Void result) {
- LOG.trace("Internal state for session {} updated successfully", AbstractTopologySessionListener.this.session);
+ LOG.trace("Internal state for session {} updated successfully",
+ AbstractTopologySessionListener.this.session);
}
@Override
public void onFailure(final Throwable t) {
- LOG.error("Failed to update internal state for session {}, terminating it", AbstractTopologySessionListener.this.session, t);
+ LOG.error("Failed to update internal state for session {}, terminating it",
+ AbstractTopologySessionListener.this.session, t);
AbstractTopologySessionListener.this.session.close(TerminationReason.UNKNOWN);
}
}, MoreExecutors.directExecutor());
}
- protected void updatePccState(final PccSyncState pccSyncState) {
+ void updatePccState(final PccSyncState pccSyncState) {
if (this.nodeState == null) {
LOG.info("Server Session Manager is closed.");
AbstractTopologySessionListener.this.session.close(TerminationReason.UNKNOWN);
}, MoreExecutors.directExecutor());
}
- protected boolean isTriggeredSyncInProcess() {
+ boolean isTriggeredSyncInProcess() {
return this.triggeredResyncInProcess;
}
*/
@GuardedBy("this")
private synchronized void tearDown(final PCEPSession session) {
+
requireNonNull(session);
this.serverSessionManager.releaseNodeState(this.nodeState, session, isLspDbPersisted());
- this.nodeState = null;
+ if (this.nodeState != null) {
+ this.serverSessionManager.unbind(this.nodeState.getNodeId());
+ this.nodeState = null;
+ }
+
try {
if (this.session != null) {
this.session.close();
}
session.close();
- } catch (Exception e) {
+ } catch (final Exception e) {
LOG.error("Session {} cannot be closed.", session, e);
}
this.session = null;
}
}
- protected final synchronized PCEPRequest removeRequest(final S id) {
+ final synchronized PCEPRequest removeRequest(final S id) {
final PCEPRequest ret = this.requests.remove(id);
if (ret != null) {
this.listenerState.processRequestStats(ret.getElapsedMillis());
return ret;
}
- protected final synchronized ListenableFuture<OperationResult> sendMessage(final Message message, final S requestId,
+ final synchronized ListenableFuture<OperationResult> sendMessage(final Message message, final S requestId,
final Metadata metadata) {
final io.netty.util.concurrent.Future<Void> f = this.session.sendMessage(message);
this.listenerState.updateStatefulSentMsg(message);
*/
protected abstract boolean onMessage(MessageContext ctx, Message message);
- protected final String lookupLspName(final L id) {
+ final String lookupLspName(final L id) {
requireNonNull(id, "ID parameter null.");
return this.lsps.get(id);
}
* @param id InstanceIdentifier of the node
* @return null if the node does not exists, or operational data
*/
- protected final synchronized <T extends DataObject> ListenableFuture<Optional<T>> readOperationalData(final InstanceIdentifier<T> id) {
+ final synchronized <T extends DataObject> ListenableFuture<Optional<T>>
+ readOperationalData(final InstanceIdentifier<T> id) {
if (this.nodeState == null) {
return null;
}
protected abstract Object validateReportedLsp(final Optional<ReportedLsp> rep, final LspId input);
- protected abstract void loadLspData(final Node node, final Map<String, ReportedLsp> lspData, final Map<L, String> lsps, final boolean incrementalSynchro);
+ protected abstract void loadLspData(final Node node, final Map<String, ReportedLsp> lspData,
+ final Map<L, String> lsps, final boolean incrementalSynchro);
- protected final boolean isLspDbPersisted() {
+ final boolean isLspDbPersisted() {
if (this.syncOptimization != null) {
return this.syncOptimization.isSyncAvoidanceEnabled();
}
return false;
}
- protected final boolean isLspDbRetreived() {
+ final boolean isLspDbRetreived() {
if (this.syncOptimization != null) {
return this.syncOptimization.isDbVersionPresent();
}
*
* @return
*/
- protected final boolean isIncrementalSynchro() {
+ final boolean isIncrementalSynchro() {
if (this.syncOptimization != null) {
return this.syncOptimization.isSyncAvoidanceEnabled() && this.syncOptimization.isDeltaSyncEnabled();
}
return false;
}
- protected final boolean isTriggeredInitialSynchro() {
+ final boolean isTriggeredInitialSynchro() {
if (this.syncOptimization != null) {
return this.syncOptimization.isTriggeredInitSyncEnabled();
}
return false;
}
- protected final boolean isTriggeredReSyncEnabled() {
+ final boolean isTriggeredReSyncEnabled() {
if (this.syncOptimization != null) {
return this.syncOptimization.isTriggeredReSyncEnabled();
}
return false;
}
- protected synchronized SessionListenerState getSessionListenerState() {
- return this.listenerState;
- }
-
@Override
- public synchronized Integer getDelegatedLspsCount() {
+ public int getDelegatedLspsCount() {
return Math.toIntExact(this.lspData.values().stream()
.map(ReportedLsp::getPath).filter(Objects::nonNull).filter(pathList -> !pathList.isEmpty())
// pick the first path, as delegate status should be same in each path
}
@Override
- public Boolean getSynchronized() {
+ public synchronized boolean isSessionSynchronized() {
return this.synced;
}
- @Override
- public StatefulMessages getStatefulMessages() {
- return this.listenerState.getStatefulMessages();
- }
-
- @Override
- public synchronized ReplyTime getReplyTime() {
- return this.listenerState.getReplyTime();
- }
-
- @Override
- public synchronized PeerCapabilities getPeerCapabilities() {
- return this.listenerState.getPeerCapabilities();
- }
-
@Override
public synchronized ListenableFuture<RpcResult<Void>> tearDownSession(final TearDownSessionInput input) {
close();
return Futures.immediateFuture(RpcResultBuilder.<Void>success().build());
}
- @Override
- public synchronized SessionState getSessionState() {
- return this.listenerState.getSessionState(this.session);
- }
-
- @Override
- public synchronized String getPeerId() {
- return this.session.getPeerPref().getIpAddress();
- }
-
- protected static final class MessageContext {
+ static final class MessageContext {
private final Collection<PCEPRequest> requests = new ArrayList<>();
private final WriteTransaction trans;