BUG-6647 Increase code coverage and clean up II
[bgpcep.git] / pcep / topology-provider / src / main / java / org / opendaylight / bgpcep / pcep / topology / provider / AbstractTopologySessionListener.java
old mode 100644 (file)
new mode 100755 (executable)
index 2424fec..6cda92f
@@ -20,7 +20,17 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
+import java.util.Timer;
+import java.util.TimerTask;
+import java.util.concurrent.TimeUnit;
+
 import javax.annotation.concurrent.GuardedBy;
+import org.opendaylight.controller.config.yang.pcep.topology.provider.ListenerStateRuntimeMXBean;
+import org.opendaylight.controller.config.yang.pcep.topology.provider.ListenerStateRuntimeRegistration;
+import org.opendaylight.controller.config.yang.pcep.topology.provider.PeerCapabilities;
+import org.opendaylight.controller.config.yang.pcep.topology.provider.ReplyTime;
+import org.opendaylight.controller.config.yang.pcep.topology.provider.SessionState;
+import org.opendaylight.controller.config.yang.pcep.topology.provider.StatefulMessages;
 import org.opendaylight.controller.md.sal.binding.api.ReadWriteTransaction;
 import org.opendaylight.controller.md.sal.binding.api.WriteTransaction;
 import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
@@ -28,7 +38,7 @@ import org.opendaylight.protocol.pcep.PCEPSession;
 import org.opendaylight.protocol.pcep.PCEPSessionListener;
 import org.opendaylight.protocol.pcep.PCEPTerminationReason;
 import org.opendaylight.protocol.pcep.TerminationReason;
-import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.inet.types.rev100924.IpAddressBuilder;
+import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.inet.types.rev130715.IpAddressBuilder;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.pcep.types.rev131005.Message;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.pcep.types.rev131005.MessageHeader;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.pcep.types.rev131005.Object;
@@ -45,6 +55,7 @@ import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.topology
 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.topology.pcep.rev131024.pcep.client.attributes.path.computation.client.ReportedLspBuilder;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.topology.pcep.rev131024.pcep.client.attributes.path.computation.client.ReportedLspKey;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.topology.pcep.rev131024.pcep.client.attributes.path.computation.client.reported.lsp.Path;
+import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.Node;
 import org.opendaylight.yangtools.yang.binding.DataContainer;
 import org.opendaylight.yangtools.yang.binding.DataObject;
 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
@@ -59,7 +70,7 @@ import org.slf4j.LoggerFactory;
  * @param <S> identifier type of requests
  * @param <L> identifier type for LSPs
  */
-public abstract class AbstractTopologySessionListener<S, L> implements PCEPSessionListener, TopologySessionListener {
+public abstract class AbstractTopologySessionListener<S, L> implements PCEPSessionListener, TopologySessionListener, ListenerStateRuntimeMXBean {
     protected static final class MessageContext {
         private final Collection<PCEPRequest> requests = new ArrayList<>();
         private final WriteTransaction trans;
@@ -92,6 +103,7 @@ public abstract class AbstractTopologySessionListener<S, L> implements PCEPSessi
             return this.version;
         }
     };
+
     private static final Logger LOG = LoggerFactory.getLogger(AbstractTopologySessionListener.class);
 
     protected static final String MISSING_XML_TAG = "Mandatory XML tags are missing.";
@@ -103,16 +115,22 @@ public abstract class AbstractTopologySessionListener<S, L> implements PCEPSessi
     private final Map<String, ReportedLsp> lspData = new HashMap<>();
 
     @GuardedBy("this")
-    private final Map<L, String> lsps = new HashMap<>();
+    protected final Map<L, String> lsps = new HashMap<>();
 
     private final ServerSessionManager serverSessionManager;
     private InstanceIdentifier<PathComputationClient> pccIdentifier;
     private TopologyNodeState nodeState;
     private boolean synced = false;
     private PCEPSession session;
+    private SyncOptimization syncOptimization;
+    private boolean triggeredResyncInProcess;
+
+    private ListenerStateRuntimeRegistration registration;
+    private final SessionListenerState listenerState;
 
     protected AbstractTopologySessionListener(final ServerSessionManager serverSessionManager) {
         this.serverSessionManager = Preconditions.checkNotNull(serverSessionManager);
+        this.listenerState = new SessionListenerState();
     }
 
     @Override
@@ -125,20 +143,41 @@ public abstract class AbstractTopologySessionListener<S, L> implements PCEPSessi
          */
         final InetAddress peerAddress = session.getRemoteAddress();
 
-        final TopologyNodeState state = this.serverSessionManager.takeNodeState(peerAddress, this);
+        syncOptimization  = new SyncOptimization(session);
+
+        final TopologyNodeState state = this.serverSessionManager.takeNodeState(peerAddress, this, isLspDbRetreived());
+
+        this.session = session;
+        this.nodeState = state;
 
         LOG.trace("Peer {} resolved to topology node {}", peerAddress, state.getNodeId());
-        this.synced = false;
 
         // Our augmentation in the topology node
         final PathComputationClientBuilder pccBuilder = new PathComputationClientBuilder();
-        pccBuilder.setIpAddress(IpAddressBuilder.getDefaultInstance(peerAddress.getHostAddress()));
 
         onSessionUp(session, pccBuilder);
+        this.synced = isSynchronized();
 
-        final Node1 ta = new Node1Builder().setPathComputationClient(pccBuilder.build()).build();
+        pccBuilder.setIpAddress(IpAddressBuilder.getDefaultInstance(peerAddress.getHostAddress()));
         final InstanceIdentifier<Node1> topologyAugment = state.getNodeId().augmentation(Node1.class);
         this.pccIdentifier = topologyAugment.child(PathComputationClient.class);
+        final Node initialNodeState = state.getInitialNodeState();
+        final boolean isNodePresent = isLspDbRetreived() && initialNodeState != null;
+        if (isNodePresent) {
+            loadLspData(initialNodeState, lspData, lsps, isIncrementalSynchro());
+            pccBuilder.setReportedLsp(initialNodeState.getAugmentation(Node1.class).getPathComputationClient().getReportedLsp());
+        }
+        writeNode(pccBuilder, state, topologyAugment);
+        this.listenerState.init(session);
+        if (this.serverSessionManager.getRuntimeRootRegistration().isPresent()) {
+            this.registration = this.serverSessionManager.getRuntimeRootRegistration().get().register(this);
+        }
+        LOG.info("Session with {} attached to topology node {}", session.getRemoteAddress(), state.getNodeId());
+    }
+
+    private void writeNode(final PathComputationClientBuilder pccBuilder, final TopologyNodeState state,
+            final InstanceIdentifier<Node1> topologyAugment) {
+        final Node1 ta = new Node1Builder().setPathComputationClient(pccBuilder.build()).build();
 
         final ReadWriteTransaction trans = state.rwTransaction();
         trans.put(LogicalDatastoreType.OPERATIONAL, topologyAugment, ta);
@@ -154,20 +193,44 @@ public abstract class AbstractTopologySessionListener<S, L> implements PCEPSessi
             @Override
             public void onFailure(final Throwable t) {
                 LOG.error("Failed to update internal state for session {}, terminating it", session, t);
-                session.close(TerminationReason.Unknown);
+                session.close(TerminationReason.UNKNOWN);
             }
         });
+    }
 
-        this.session = session;
-        this.nodeState = state;
-        LOG.info("Session with {} attached to topology node {}", session.getRemoteAddress(), state.getNodeId());
+    protected void updatePccState(final PccSyncState pccSyncState) {
+        final MessageContext ctx = new MessageContext(this.nodeState.beginTransaction());
+        updatePccNode(ctx, new PathComputationClientBuilder().setStateSync(pccSyncState).build());
+        if (pccSyncState != PccSyncState.Synchronized) {
+            this.synced = false;
+            this.triggeredResyncInProcess = true;
+        }
+        // All set, commit the modifications
+        Futures.addCallback(ctx.trans.submit(), new FutureCallback<Void>() {
+            @Override
+            public void onSuccess(final Void result) {
+                LOG.trace("Internal state for session {} updated successfully", session);
+            }
+
+            @Override
+            public void onFailure(final Throwable t) {
+                LOG.error("Failed to update internal state for session {}", session, t);
+                session.close(TerminationReason.UNKNOWN);
+            }
+        });
+    }
+
+    protected boolean isTriggeredSyncInProcess() {
+        return this.triggeredResyncInProcess;
     }
 
     @GuardedBy("this")
     private void tearDown(final PCEPSession session) {
-        this.serverSessionManager.releaseNodeState(this.nodeState, session);
+        this.serverSessionManager.releaseNodeState(this.nodeState, session, isLspDbPersisted());
         this.nodeState = null;
         this.session = null;
+        this.syncOptimization = null;
+        unregister();
 
         // Clear all requests we know about
         for (final Entry<S, PCEPRequest> e : this.requests.entrySet()) {
@@ -186,6 +249,8 @@ public abstract class AbstractTopologySessionListener<S, L> implements PCEPSessi
                 LOG.debug("Request {} was not sent when session went down, cancelling the instruction", e.getKey());
                 r.done(OperationResults.UNSENT);
                 break;
+            default:
+                break;
             }
         }
         this.requests.clear();
@@ -209,6 +274,8 @@ public abstract class AbstractTopologySessionListener<S, L> implements PCEPSessi
 
         if (onMessage(ctx, message)) {
             LOG.info("Unhandled message {} on session {}", message, session);
+            //cancel not supported, submit empty transaction
+            ctx.trans.submit();
             return;
         }
 
@@ -223,20 +290,31 @@ public abstract class AbstractTopologySessionListener<S, L> implements PCEPSessi
             public void onFailure(final Throwable t) {
                 LOG.error("Failed to update internal state for session {}, closing it", session, t);
                 ctx.notifyRequests();
-                session.close(TerminationReason.Unknown);
+                session.close(TerminationReason.UNKNOWN);
             }
         });
     }
 
     @Override
     public void close() {
+        unregister();
         if (this.session != null) {
-            this.session.close(TerminationReason.Unknown);
+            this.session.close(TerminationReason.UNKNOWN);
+        }
+    }
+
+    private synchronized void unregister() {
+        if (this.registration != null) {
+            this.registration.close();
+            this.registration = null;
         }
     }
 
     protected final synchronized PCEPRequest removeRequest(final S id) {
         final PCEPRequest ret = this.requests.remove(id);
+        if (ret != null) {
+            this.listenerState.processRequestStats(ret.getElapsedMillis());
+        }
         LOG.trace("Removed request {} object {}", id, ret);
         return ret;
     }
@@ -244,8 +322,14 @@ public abstract class AbstractTopologySessionListener<S, L> implements PCEPSessi
     protected final synchronized ListenableFuture<OperationResult> sendMessage(final Message message, final S requestId,
         final Metadata metadata) {
         final io.netty.util.concurrent.Future<Void> f = this.session.sendMessage(message);
+        this.listenerState.updateStatefulSentMsg(message);
         final PCEPRequest req = new PCEPRequest(metadata);
         this.requests.put(requestId, req);
+        final int rpcTimeout = serverSessionManager.getRpcTimeout();
+        LOG.trace("RPC response timeout value is {} seconds", rpcTimeout);
+        if (rpcTimeout > 0) {
+            setupTimeoutHandler(requestId, req, rpcTimeout);
+        }
 
         f.addListener(new FutureListener<Void>() {
             @Override
@@ -266,6 +350,21 @@ public abstract class AbstractTopologySessionListener<S, L> implements PCEPSessi
         return req.getFuture();
     }
 
+    private void setupTimeoutHandler(final S requestId, final PCEPRequest req, final int timeout) {
+        final Timer timer = req.getTimer();
+        timer.schedule(new TimerTask() {
+            @Override
+            public void run() {
+                synchronized (AbstractTopologySessionListener.this) {
+                    AbstractTopologySessionListener.this.requests.remove(requestId);
+                }
+                req.done();
+                LOG.info("Request {} timed-out waiting for response", requestId);
+            }
+        }, TimeUnit.SECONDS.toMillis(timeout));
+        LOG.trace("Set up response timeout handler for request {}", requestId);
+    }
+
     /**
      * Update an LSP in the data store
      *
@@ -326,17 +425,25 @@ public abstract class AbstractTopologySessionListener<S, L> implements PCEPSessi
     private List<Path> makeBeforeBreak(final ReportedLspBuilder rlb, final ReportedLsp previous, final String name, final boolean remove) {
         // just one path should be reported
         Preconditions.checkState(rlb.getPath().size() == 1);
-        final org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.rsvp.rev130820.LspId reportedLspId = rlb.getPath().get(0).getLspId();
-        // check previous report for existing paths
-        final List<Path> updatedPaths = new ArrayList<>(previous.getPath());
-        LOG.debug("Found previous paths {} to this lsp name {}", updatedPaths, name);
-        for (final Path path : previous.getPath()) {
-            //we found reported path in previous reports
-            if (path.getLspId().getValue() == 0 || path.getLspId().equals(reportedLspId)) {
-                LOG.debug("Match on lsp-id {}", path.getLspId().getValue() );
-                // path that was reported previously and does have the same lsp-id, path will be updated
-                final boolean r = updatedPaths.remove(path);
-                LOG.trace("Request removed? {}", r);
+        final org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.rsvp.rev150820.LspId reportedLspId = rlb.getPath().get(0).getLspId();
+        final List<Path> updatedPaths;
+        //lspId = 0 and remove = false -> tunnel is down, still exists but no path is signaled
+        //remove existing tunnel's paths now, as explicit path remove will not come
+        if (!remove && reportedLspId.getValue() == 0) {
+            updatedPaths = new ArrayList<>();
+            LOG.debug("Remove previous paths {} to this lsp name {}", previous.getPath(), name);
+        } else {
+            // check previous report for existing paths
+            updatedPaths = new ArrayList<>(previous.getPath());
+            LOG.debug("Found previous paths {} to this lsp name {}", updatedPaths, name);
+            for (final Path path : previous.getPath()) {
+                //we found reported path in previous reports
+                if (path.getLspId().getValue() == 0 || path.getLspId().equals(reportedLspId)) {
+                    LOG.debug("Match on lsp-id {}", path.getLspId().getValue() );
+                    // path that was reported previously and does have the same lsp-id, path will be updated
+                    final boolean r = updatedPaths.remove(path);
+                    LOG.trace("Request removed? {}", r);
+                }
             }
         }
         // if the path does not exist in previous report, add it to path list, it's a new ERO
@@ -373,13 +480,20 @@ public abstract class AbstractTopologySessionListener<S, L> implements PCEPSessi
 
         // Update synchronization flag
         this.synced = true;
-        ctx.trans.merge(LogicalDatastoreType.OPERATIONAL, this.pccIdentifier, new PathComputationClientBuilder().setStateSync(PccSyncState.Synchronized).build());
+        if(this.triggeredResyncInProcess) {
+            this.triggeredResyncInProcess = false;
+        }
+        updatePccNode(ctx, new PathComputationClientBuilder().setStateSync(PccSyncState.Synchronized).build());
 
         // The node has completed synchronization, cleanup metadata no longer reported back
         this.nodeState.cleanupExcept(this.lsps.values());
         LOG.debug("Session {} achieved synchronized state", this.session);
     }
 
+    protected final synchronized void updatePccNode(final MessageContext ctx, final PathComputationClient pcc) {
+        ctx.trans.merge(LogicalDatastoreType.OPERATIONAL, this.pccIdentifier, pcc);
+    }
+
     protected final InstanceIdentifier<ReportedLsp> lspIdentifier(final String name) {
         return this.pccIdentifier.child(ReportedLsp.class, new ReportedLspKey(name));
     }
@@ -413,9 +527,117 @@ public abstract class AbstractTopologySessionListener<S, L> implements PCEPSessi
         return this.lsps.get(id);
     }
 
+    /**
+     * Reads operational data on this node. Doesn't attempt to read the data,
+     * if the node does not exist. In this case returns null.
+     *
+     * @param id InstanceIdentifier of the node
+     * @return null if the node does not exists, or operational data
+     */
     protected final synchronized <T extends DataObject> ListenableFuture<Optional<T>> readOperationalData(final InstanceIdentifier<T> id) {
+        if (this.nodeState == null) {
+            return null;
+        }
         return this.nodeState.readOperationalData(id);
     }
 
     protected abstract Object validateReportedLsp(final Optional<ReportedLsp> rep, final LspId input);
+
+    protected abstract void loadLspData(final Node node, final Map<String, ReportedLsp> lspData, final Map<L, String> lsps, final boolean incrementalSynchro);
+
+    protected final boolean isLspDbPersisted() {
+        if (syncOptimization != null) {
+            return syncOptimization.isSyncAvoidanceEnabled();
+        }
+        return false;
+    }
+
+    protected final boolean isLspDbRetreived() {
+        if (syncOptimization != null) {
+            return syncOptimization.isDbVersionPresent();
+        }
+        return false;
+    }
+
+    /**
+     * Is Incremental synchronization if LSP-DB-VERSION are included,
+     * LSP-DB-VERSION TLV values doesnt match, and  LSP-SYNC-CAPABILITY is enabled
+     * @return
+     */
+    protected final boolean isIncrementalSynchro() {
+        if (syncOptimization != null) {
+            return syncOptimization.isSyncAvoidanceEnabled() && syncOptimization.isDeltaSyncEnabled();
+        }
+        return false;
+    }
+
+    protected final boolean isTriggeredInitialSynchro() {
+        if (syncOptimization != null) {
+            return syncOptimization.isTriggeredInitSyncEnabled();
+        }
+        return false;
+    }
+
+    protected final boolean isTriggeredReSyncEnabled() {
+        if (syncOptimization != null) {
+            return syncOptimization.isTriggeredReSyncEnabled();
+        }
+        return false;
+    }
+
+    protected final boolean isSynchronized() {
+        if (syncOptimization != null) {
+            return syncOptimization.doesLspDbMatch();
+        }
+        return false;
+    }
+
+    protected SessionListenerState getSessionListenerState() {
+        return this.listenerState;
+    }
+
+    @Override
+    public Integer getDelegatedLspsCount() {
+        return this.lsps.size();
+    }
+
+    @Override
+    public Boolean getSynchronized() {
+        return this.synced;
+    }
+
+    @Override
+    public StatefulMessages getStatefulMessages() {
+        return this.listenerState.getStatefulMessages();
+    }
+
+    @Override
+    public synchronized void resetStats() {
+        this.listenerState.resetStats(this.session);
+    }
+
+    @Override
+    public ReplyTime getReplyTime() {
+        return this.listenerState.getReplyTime();
+    }
+
+    @Override
+    public PeerCapabilities getPeerCapabilities() {
+        return this.listenerState.getPeerCapabilities();
+    }
+
+    @Override
+    public void tearDownSession() {
+        this.close();
+    }
+
+    @Override
+    public synchronized SessionState getSessionState() {
+        return this.listenerState.getSessionState(this.session);
+    }
+
+    @Override
+    public synchronized String getPeerId() {
+        return this.session.getPeerPref().getIpAddress();
+    }
 }