BGPCEP-710: Create Network Topology Loader
[bgpcep.git] / pcep / topology / topology-provider / src / main / java / org / opendaylight / bgpcep / pcep / topology / provider / AbstractTopologySessionListener.java
index 3cbf2426a33fd33d9820a5ea56c2bfcf1f1d8b96..ffacb7caa5d54e545ee2e0e5962dd535434038a5 100755 (executable)
@@ -28,11 +28,8 @@ import java.util.Timer;
 import java.util.TimerTask;
 import java.util.concurrent.TimeUnit;
 import javax.annotation.concurrent.GuardedBy;
-import org.opendaylight.controller.config.yang.pcep.topology.provider.ListenerStateRuntimeMXBean;
-import org.opendaylight.controller.config.yang.pcep.topology.provider.PeerCapabilities;
-import org.opendaylight.controller.config.yang.pcep.topology.provider.ReplyTime;
-import org.opendaylight.controller.config.yang.pcep.topology.provider.SessionState;
-import org.opendaylight.controller.config.yang.pcep.topology.provider.StatefulMessages;
+import org.opendaylight.bgpcep.pcep.topology.provider.session.stats.SessionStateImpl;
+import org.opendaylight.bgpcep.pcep.topology.provider.session.stats.TopologySessionStats;
 import org.opendaylight.controller.md.sal.binding.api.ReadWriteTransaction;
 import org.opendaylight.controller.md.sal.binding.api.WriteTransaction;
 import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
@@ -78,8 +75,8 @@ import org.slf4j.LoggerFactory;
  * @param <S> identifier type of requests
  * @param <L> identifier type for LSPs
  */
-public abstract class AbstractTopologySessionListener<S, L> implements TopologySessionListener, ListenerStateRuntimeMXBean {
-    protected static final MessageHeader MESSAGE_HEADER = new MessageHeader() {
+public abstract class AbstractTopologySessionListener<S, L> implements TopologySessionListener, TopologySessionStats {
+    static final MessageHeader MESSAGE_HEADER = new MessageHeader() {
         private final ProtocolVersion version = new ProtocolVersion((short) 1);
 
         @Override
@@ -92,28 +89,28 @@ public abstract class AbstractTopologySessionListener<S, L> implements TopologyS
             return this.version;
         }
     };
-    protected static final String MISSING_XML_TAG = "Mandatory XML tags are missing.";
+    static final String MISSING_XML_TAG = "Mandatory XML tags are missing.";
     private static final Logger LOG = LoggerFactory.getLogger(AbstractTopologySessionListener.class);
     @GuardedBy("this")
-    protected final Map<L, String> lsps = new HashMap<>();
+    final Map<L, String> lsps = new HashMap<>();
+    @GuardedBy("this")
+    final SessionStateImpl listenerState;
     @GuardedBy("this")
     private final Map<S, PCEPRequest> requests = new HashMap<>();
-
     @GuardedBy("this")
     private final Map<String, ReportedLsp> lspData = new HashMap<>();
     private final ServerSessionManager serverSessionManager;
-    @GuardedBy("this")
-    private final SessionListenerState listenerState;
     private InstanceIdentifier<PathComputationClient> pccIdentifier;
     private TopologyNodeState nodeState;
+    @GuardedBy("this")
     private boolean synced = false;
     private PCEPSession session;
     private SyncOptimization syncOptimization;
     private boolean triggeredResyncInProcess;
 
-    protected AbstractTopologySessionListener(final ServerSessionManager serverSessionManager) {
+    AbstractTopologySessionListener(final ServerSessionManager serverSessionManager) {
         this.serverSessionManager = requireNonNull(serverSessionManager);
-        this.listenerState = new SessionListenerState();
+        this.listenerState = new SessionStateImpl(this);
     }
 
     @Override
@@ -128,7 +125,9 @@ public abstract class AbstractTopologySessionListener<S, L> implements TopologyS
 
         this.syncOptimization = new SyncOptimization(session);
 
-        final TopologyNodeState state = this.serverSessionManager.takeNodeState(peerAddress, this, isLspDbRetreived());
+        final TopologyNodeState state = this.serverSessionManager.takeNodeState(peerAddress,
+                this, isLspDbRetreived());
+
         // takeNodeState(..) may fail when the server session manager is being restarted due to configuration change
         if (state == null) {
             LOG.error("Unable to fetch topology node state for PCEP session. Closing session {}", session);
@@ -143,9 +142,9 @@ public abstract class AbstractTopologySessionListener<S, L> implements TopologyS
             this.onSessionTerminated(session, new PCEPCloseTermination(TerminationReason.UNKNOWN));
             return;
         }
-
         this.session = session;
         this.nodeState = state;
+        this.serverSessionManager.bind(this.nodeState.getNodeId(), this.listenerState);
 
         LOG.trace("Peer {} resolved to topology node {}", peerAddress, state.getNodeId());
 
@@ -162,7 +161,8 @@ public abstract class AbstractTopologySessionListener<S, L> implements TopologyS
         final boolean isNodePresent = isLspDbRetreived() && initialNodeState != null;
         if (isNodePresent) {
             loadLspData(initialNodeState, this.lspData, this.lsps, isIncrementalSynchro());
-            pccBuilder.setReportedLsp(initialNodeState.getAugmentation(Node1.class).getPathComputationClient().getReportedLsp());
+            pccBuilder.setReportedLsp(initialNodeState.getAugmentation(Node1.class)
+                    .getPathComputationClient().getReportedLsp());
         }
         writeNode(pccBuilder, state, topologyAugment);
         this.listenerState.init(session);
@@ -181,18 +181,20 @@ public abstract class AbstractTopologySessionListener<S, L> implements TopologyS
         Futures.addCallback(trans.submit(), new FutureCallback<Void>() {
             @Override
             public void onSuccess(final Void result) {
-                LOG.trace("Internal state for session {} updated successfully", AbstractTopologySessionListener.this.session);
+                LOG.trace("Internal state for session {} updated successfully",
+                        AbstractTopologySessionListener.this.session);
             }
 
             @Override
             public void onFailure(final Throwable t) {
-                LOG.error("Failed to update internal state for session {}, terminating it", AbstractTopologySessionListener.this.session, t);
+                LOG.error("Failed to update internal state for session {}, terminating it",
+                        AbstractTopologySessionListener.this.session, t);
                 AbstractTopologySessionListener.this.session.close(TerminationReason.UNKNOWN);
             }
         }, MoreExecutors.directExecutor());
     }
 
-    protected void updatePccState(final PccSyncState pccSyncState) {
+    void updatePccState(final PccSyncState pccSyncState) {
         if (this.nodeState == null) {
             LOG.info("Server Session Manager is closed.");
             AbstractTopologySessionListener.this.session.close(TerminationReason.UNKNOWN);
@@ -219,7 +221,7 @@ public abstract class AbstractTopologySessionListener<S, L> implements TopologyS
         }, MoreExecutors.directExecutor());
     }
 
-    protected boolean isTriggeredSyncInProcess() {
+    boolean isTriggeredSyncInProcess() {
         return this.triggeredResyncInProcess;
     }
 
@@ -231,15 +233,20 @@ public abstract class AbstractTopologySessionListener<S, L> implements TopologyS
      */
     @GuardedBy("this")
     private synchronized void tearDown(final PCEPSession session) {
+
         requireNonNull(session);
         this.serverSessionManager.releaseNodeState(this.nodeState, session, isLspDbPersisted());
-        this.nodeState = null;
+        if (this.nodeState != null) {
+            this.serverSessionManager.unbind(this.nodeState.getNodeId());
+            this.nodeState = null;
+        }
+
         try {
             if (this.session != null) {
                 this.session.close();
             }
             session.close();
-        } catch (Exception e) {
+        } catch (final Exception e) {
             LOG.error("Session {} cannot be closed.", session, e);
         }
         this.session = null;
@@ -321,7 +328,7 @@ public abstract class AbstractTopologySessionListener<S, L> implements TopologyS
         }
     }
 
-    protected final synchronized PCEPRequest removeRequest(final S id) {
+    final synchronized PCEPRequest removeRequest(final S id) {
         final PCEPRequest ret = this.requests.remove(id);
         if (ret != null) {
             this.listenerState.processRequestStats(ret.getElapsedMillis());
@@ -330,7 +337,7 @@ public abstract class AbstractTopologySessionListener<S, L> implements TopologyS
         return ret;
     }
 
-    protected final synchronized ListenableFuture<OperationResult> sendMessage(final Message message, final S requestId,
+    final synchronized ListenableFuture<OperationResult> sendMessage(final Message message, final S requestId,
             final Metadata metadata) {
         final io.netty.util.concurrent.Future<Void> f = this.session.sendMessage(message);
         this.listenerState.updateStatefulSentMsg(message);
@@ -530,7 +537,7 @@ public abstract class AbstractTopologySessionListener<S, L> implements TopologyS
      */
     protected abstract boolean onMessage(MessageContext ctx, Message message);
 
-    protected final String lookupLspName(final L id) {
+    final String lookupLspName(final L id) {
         requireNonNull(id, "ID parameter null.");
         return this.lsps.get(id);
     }
@@ -542,7 +549,8 @@ public abstract class AbstractTopologySessionListener<S, L> implements TopologyS
      * @param id InstanceIdentifier of the node
      * @return null if the node does not exists, or operational data
      */
-    protected final synchronized <T extends DataObject> ListenableFuture<Optional<T>> readOperationalData(final InstanceIdentifier<T> id) {
+    final synchronized <T extends DataObject> ListenableFuture<Optional<T>>
+    readOperationalData(final InstanceIdentifier<T> id) {
         if (this.nodeState == null) {
             return null;
         }
@@ -551,16 +559,17 @@ public abstract class AbstractTopologySessionListener<S, L> implements TopologyS
 
     protected abstract Object validateReportedLsp(final Optional<ReportedLsp> rep, final LspId input);
 
-    protected abstract void loadLspData(final Node node, final Map<String, ReportedLsp> lspData, final Map<L, String> lsps, final boolean incrementalSynchro);
+    protected abstract void loadLspData(final Node node, final Map<String, ReportedLsp> lspData,
+            final Map<L, String> lsps, final boolean incrementalSynchro);
 
-    protected final boolean isLspDbPersisted() {
+    final boolean isLspDbPersisted() {
         if (this.syncOptimization != null) {
             return this.syncOptimization.isSyncAvoidanceEnabled();
         }
         return false;
     }
 
-    protected final boolean isLspDbRetreived() {
+    final boolean isLspDbRetreived() {
         if (this.syncOptimization != null) {
             return this.syncOptimization.isDbVersionPresent();
         }
@@ -573,21 +582,21 @@ public abstract class AbstractTopologySessionListener<S, L> implements TopologyS
      *
      * @return
      */
-    protected final boolean isIncrementalSynchro() {
+    final boolean isIncrementalSynchro() {
         if (this.syncOptimization != null) {
             return this.syncOptimization.isSyncAvoidanceEnabled() && this.syncOptimization.isDeltaSyncEnabled();
         }
         return false;
     }
 
-    protected final boolean isTriggeredInitialSynchro() {
+    final boolean isTriggeredInitialSynchro() {
         if (this.syncOptimization != null) {
             return this.syncOptimization.isTriggeredInitSyncEnabled();
         }
         return false;
     }
 
-    protected final boolean isTriggeredReSyncEnabled() {
+    final boolean isTriggeredReSyncEnabled() {
         if (this.syncOptimization != null) {
             return this.syncOptimization.isTriggeredReSyncEnabled();
         }
@@ -601,12 +610,8 @@ public abstract class AbstractTopologySessionListener<S, L> implements TopologyS
         return false;
     }
 
-    protected synchronized SessionListenerState getSessionListenerState() {
-        return this.listenerState;
-    }
-
     @Override
-    public synchronized Integer getDelegatedLspsCount() {
+    public int getDelegatedLspsCount() {
         return Math.toIntExact(this.lspData.values().stream()
                 .map(ReportedLsp::getPath).filter(Objects::nonNull).filter(pathList -> !pathList.isEmpty())
                 // pick the first path, as delegate status should be same in each path
@@ -618,42 +623,17 @@ public abstract class AbstractTopologySessionListener<S, L> implements TopologyS
     }
 
     @Override
-    public Boolean getSynchronized() {
+    public synchronized boolean isSessionSynchronized() {
         return this.synced;
     }
 
-    @Override
-    public StatefulMessages getStatefulMessages() {
-        return this.listenerState.getStatefulMessages();
-    }
-
-    @Override
-    public synchronized ReplyTime getReplyTime() {
-        return this.listenerState.getReplyTime();
-    }
-
-    @Override
-    public synchronized PeerCapabilities getPeerCapabilities() {
-        return this.listenerState.getPeerCapabilities();
-    }
-
     @Override
     public synchronized ListenableFuture<RpcResult<Void>> tearDownSession(final TearDownSessionInput input) {
         close();
         return Futures.immediateFuture(RpcResultBuilder.<Void>success().build());
     }
 
-    @Override
-    public synchronized SessionState getSessionState() {
-        return this.listenerState.getSessionState(this.session);
-    }
-
-    @Override
-    public synchronized String getPeerId() {
-        return this.session.getPeerPref().getIpAddress();
-    }
-
-    protected static final class MessageContext {
+    static final class MessageContext {
         private final Collection<PCEPRequest> requests = new ArrayList<>();
         private final WriteTransaction trans;