BUG-6647 Increase code coverage and clean up II
[bgpcep.git] / pcep / topology-provider / src / main / java / org / opendaylight / bgpcep / pcep / topology / provider / AbstractTopologySessionListener.java
old mode 100644 (file)
new mode 100755 (executable)
index d883d26..6cda92f
@@ -9,34 +9,41 @@ package org.opendaylight.bgpcep.pcep.topology.provider;
 
 import com.google.common.base.Optional;
 import com.google.common.base.Preconditions;
-import com.google.common.collect.Lists;
 import com.google.common.util.concurrent.FutureCallback;
 import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.ListenableFuture;
-
 import io.netty.util.concurrent.FutureListener;
-
 import java.net.InetAddress;
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
-import java.util.concurrent.ExecutionException;
+import java.util.Timer;
+import java.util.TimerTask;
+import java.util.concurrent.TimeUnit;
+
 import javax.annotation.concurrent.GuardedBy;
+import org.opendaylight.controller.config.yang.pcep.topology.provider.ListenerStateRuntimeMXBean;
+import org.opendaylight.controller.config.yang.pcep.topology.provider.ListenerStateRuntimeRegistration;
+import org.opendaylight.controller.config.yang.pcep.topology.provider.PeerCapabilities;
+import org.opendaylight.controller.config.yang.pcep.topology.provider.ReplyTime;
+import org.opendaylight.controller.config.yang.pcep.topology.provider.SessionState;
+import org.opendaylight.controller.config.yang.pcep.topology.provider.StatefulMessages;
 import org.opendaylight.controller.md.sal.binding.api.ReadWriteTransaction;
 import org.opendaylight.controller.md.sal.binding.api.WriteTransaction;
-import org.opendaylight.controller.md.sal.common.api.TransactionStatus;
 import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
 import org.opendaylight.protocol.pcep.PCEPSession;
 import org.opendaylight.protocol.pcep.PCEPSessionListener;
 import org.opendaylight.protocol.pcep.PCEPTerminationReason;
 import org.opendaylight.protocol.pcep.TerminationReason;
-import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.inet.types.rev100924.IpAddressBuilder;
+import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.inet.types.rev130715.IpAddressBuilder;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.pcep.types.rev131005.Message;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.pcep.types.rev131005.MessageHeader;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.pcep.types.rev131005.Object;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.pcep.types.rev131005.ProtocolVersion;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.rsvp.rev130820.LspId;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.topology.pcep.rev131024.LspId;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.topology.pcep.rev131024.Node1;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.topology.pcep.rev131024.Node1Builder;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.topology.pcep.rev131024.OperationResult;
@@ -48,16 +55,10 @@ import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.topology
 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.topology.pcep.rev131024.pcep.client.attributes.path.computation.client.ReportedLspBuilder;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.topology.pcep.rev131024.pcep.client.attributes.path.computation.client.ReportedLspKey;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.topology.pcep.rev131024.pcep.client.attributes.path.computation.client.reported.lsp.Path;
-import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.NodeId;
-import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.Topology;
 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.Node;
-import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.NodeBuilder;
-import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.NodeKey;
 import org.opendaylight.yangtools.yang.binding.DataContainer;
 import org.opendaylight.yangtools.yang.binding.DataObject;
 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
-import org.opendaylight.yangtools.yang.binding.InstanceIdentifier.InstanceIdentifierBuilder;
-import org.opendaylight.yangtools.yang.common.RpcResult;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -69,7 +70,26 @@ import org.slf4j.LoggerFactory;
  * @param <S> identifier type of requests
  * @param <L> identifier type for LSPs
  */
-public abstract class AbstractTopologySessionListener<S, L> implements PCEPSessionListener, TopologySessionListener {
+public abstract class AbstractTopologySessionListener<S, L> implements PCEPSessionListener, TopologySessionListener, ListenerStateRuntimeMXBean {
+    protected static final class MessageContext {
+        private final Collection<PCEPRequest> requests = new ArrayList<>();
+        private final WriteTransaction trans;
+
+        private MessageContext(final WriteTransaction trans) {
+            this.trans = Preconditions.checkNotNull(trans);
+        }
+
+        void resolveRequest(final PCEPRequest req) {
+            this.requests.add(req);
+        }
+
+        private void notifyRequests() {
+            for (final PCEPRequest r : this.requests) {
+                r.done(OperationResults.SUCCESS);
+            }
+        }
+    }
+
     protected static final MessageHeader MESSAGE_HEADER = new MessageHeader() {
         private final ProtocolVersion version = new ProtocolVersion((short) 1);
 
@@ -83,65 +103,34 @@ public abstract class AbstractTopologySessionListener<S, L> implements PCEPSessi
             return this.version;
         }
     };
+
     private static final Logger LOG = LoggerFactory.getLogger(AbstractTopologySessionListener.class);
 
-    private final Map<S, PCEPRequest> waitingRequests = new HashMap<>();
-    private final Map<S, PCEPRequest> sendingRequests = new HashMap<>();
+    protected static final String MISSING_XML_TAG = "Mandatory XML tags are missing.";
+
+    @GuardedBy("this")
+    private final Map<S, PCEPRequest> requests = new HashMap<>();
+
+    @GuardedBy("this")
     private final Map<String, ReportedLsp> lspData = new HashMap<>();
-    private final Map<L, String> lsps = new HashMap<>();
+
+    @GuardedBy("this")
+    protected final Map<L, String> lsps = new HashMap<>();
+
     private final ServerSessionManager serverSessionManager;
-    private InstanceIdentifier<Node> topologyNode;
-    private InstanceIdentifier<Node1> topologyAugment;
-    private PathComputationClientBuilder pccBuilder;
-    private Node1Builder topologyAugmentBuilder;
+    private InstanceIdentifier<PathComputationClient> pccIdentifier;
     private TopologyNodeState nodeState;
-    private boolean ownsTopology = false;
-    private boolean synced = false, dirty;
+    private boolean synced = false;
     private PCEPSession session;
+    private SyncOptimization syncOptimization;
+    private boolean triggeredResyncInProcess;
+
+    private ListenerStateRuntimeRegistration registration;
+    private final SessionListenerState listenerState;
 
     protected AbstractTopologySessionListener(final ServerSessionManager serverSessionManager) {
         this.serverSessionManager = Preconditions.checkNotNull(serverSessionManager);
-    }
-
-    private static String createNodeId(final InetAddress addr) {
-        return "pcc://" + addr.getHostAddress();
-    }
-
-    private Node topologyNode(final ReadWriteTransaction trans, final InetAddress address) {
-        final String pccId = createNodeId(address);
-
-        // FIXME: Futures.transform...
-        try {
-            Optional<Topology> topoMaybe = trans.read(LogicalDatastoreType.OPERATIONAL, this.serverSessionManager.getTopology()).get();
-            Preconditions.checkState(topoMaybe.isPresent(), "Failed to find topology.");
-            final Topology topo = topoMaybe.get();
-            for (final Node n : topo.getNode()) {
-                LOG.debug("Matching topology node {} to id {}", n, pccId);
-                if (n.getNodeId().getValue().equals(pccId)) {
-                    this.topologyNode = this.serverSessionManager.getTopology().child(Node.class, n.getKey());
-                    LOG.debug("Reusing topology node {} for id {} at {}", n, pccId, this.topologyNode);
-                    return n;
-                }
-            }
-        } catch (InterruptedException | ExecutionException e) {
-            throw new IllegalStateException("Failed to ensure topology presence.", e);
-        }
-
-        /*
-         * We failed to find a matching node. Let's create a dynamic one
-         * and note that we are the owner (so we clean it up afterwards).
-         */
-        final NodeId id = new NodeId(pccId);
-        final NodeKey nk = new NodeKey(id);
-        final InstanceIdentifier<Node> nti = this.serverSessionManager.getTopology().child(Node.class, nk);
-
-        final Node ret = new NodeBuilder().setKey(nk).setNodeId(id).build();
-
-        trans.put(LogicalDatastoreType.OPERATIONAL, nti, ret);
-        LOG.debug("Created topology node {} for id {} at {}", ret, pccId, nti);
-        this.ownsTopology = true;
-        this.topologyNode = nti;
-        return ret;
+        this.listenerState = new SessionListenerState();
     }
 
     @Override
@@ -153,82 +142,118 @@ public abstract class AbstractTopologySessionListener<S, L> implements PCEPSessi
          * the topology model, with empty LSP list.
          */
         final InetAddress peerAddress = session.getRemoteAddress();
-        final ReadWriteTransaction trans = this.serverSessionManager.rwTransaction();
 
-        final Node topoNode = topologyNode(trans, peerAddress);
-        LOG.trace("Peer {} resolved to topology node {}", peerAddress, topoNode);
+        syncOptimization  = new SyncOptimization(session);
 
-        // Our augmentation in the topology node
-        this.synced = false;
-        this.pccBuilder = new PathComputationClientBuilder();
-        this.pccBuilder.setIpAddress(IpAddressBuilder.getDefaultInstance(peerAddress.getHostAddress()));
+        final TopologyNodeState state = this.serverSessionManager.takeNodeState(peerAddress, this, isLspDbRetreived());
+
+        this.session = session;
+        this.nodeState = state;
 
-        onSessionUp(session, this.pccBuilder);
+        LOG.trace("Peer {} resolved to topology node {}", peerAddress, state.getNodeId());
+
+        // Our augmentation in the topology node
+        final PathComputationClientBuilder pccBuilder = new PathComputationClientBuilder();
+
+        onSessionUp(session, pccBuilder);
+        this.synced = isSynchronized();
+
+        pccBuilder.setIpAddress(IpAddressBuilder.getDefaultInstance(peerAddress.getHostAddress()));
+        final InstanceIdentifier<Node1> topologyAugment = state.getNodeId().augmentation(Node1.class);
+        this.pccIdentifier = topologyAugment.child(PathComputationClient.class);
+        final Node initialNodeState = state.getInitialNodeState();
+        final boolean isNodePresent = isLspDbRetreived() && initialNodeState != null;
+        if (isNodePresent) {
+            loadLspData(initialNodeState, lspData, lsps, isIncrementalSynchro());
+            pccBuilder.setReportedLsp(initialNodeState.getAugmentation(Node1.class).getPathComputationClient().getReportedLsp());
+        }
+        writeNode(pccBuilder, state, topologyAugment);
+        this.listenerState.init(session);
+        if (this.serverSessionManager.getRuntimeRootRegistration().isPresent()) {
+            this.registration = this.serverSessionManager.getRuntimeRootRegistration().get().register(this);
+        }
+        LOG.info("Session with {} attached to topology node {}", session.getRemoteAddress(), state.getNodeId());
+    }
 
-        this.topologyAugmentBuilder = new Node1Builder().setPathComputationClient(this.pccBuilder.build());
-        this.topologyAugment = this.topologyNode.augmentation(Node1.class);
-        final Node1 ta = this.topologyAugmentBuilder.build();
+    private void writeNode(final PathComputationClientBuilder pccBuilder, final TopologyNodeState state,
+            final InstanceIdentifier<Node1> topologyAugment) {
+        final Node1 ta = new Node1Builder().setPathComputationClient(pccBuilder.build()).build();
 
-        trans.put(LogicalDatastoreType.OPERATIONAL, this.topologyAugment, ta);
-        LOG.trace("Peer data {} set to {}", this.topologyAugment, ta);
+        final ReadWriteTransaction trans = state.rwTransaction();
+        trans.put(LogicalDatastoreType.OPERATIONAL, topologyAugment, ta);
+        LOG.trace("Peer data {} set to {}", topologyAugment, ta);
 
         // All set, commit the modifications
-        Futures.addCallback(trans.commit(), new FutureCallback<RpcResult<TransactionStatus>>() {
+        Futures.addCallback(trans.submit(), new FutureCallback<Void>() {
             @Override
-            public void onSuccess(final RpcResult<TransactionStatus> result) {
+            public void onSuccess(final Void result) {
                 LOG.trace("Internal state for session {} updated successfully", session);
             }
 
             @Override
             public void onFailure(final Throwable t) {
                 LOG.error("Failed to update internal state for session {}, terminating it", session, t);
-                session.close(TerminationReason.Unknown);
+                session.close(TerminationReason.UNKNOWN);
             }
         });
-
-        this.nodeState = this.serverSessionManager.takeNodeState(topoNode.getNodeId(), this);
-        this.session = session;
-        LOG.info("Session with {} attached to topology node {}", session.getRemoteAddress(), topoNode.getNodeId());
     }
 
-    @GuardedBy("this")
-    private void tearDown(final PCEPSession session) {
-        this.serverSessionManager.releaseNodeState(this.nodeState);
-        this.nodeState = null;
-        this.session = null;
-
-        // The session went down. Undo all the Topology changes we have done.
-        final WriteTransaction trans = this.serverSessionManager.beginTransaction();
-        trans.delete(LogicalDatastoreType.OPERATIONAL, this.topologyAugment);
-        if (this.ownsTopology) {
-            trans.delete(LogicalDatastoreType.OPERATIONAL, this.topologyNode);
+    protected void updatePccState(final PccSyncState pccSyncState) {
+        final MessageContext ctx = new MessageContext(this.nodeState.beginTransaction());
+        updatePccNode(ctx, new PathComputationClientBuilder().setStateSync(pccSyncState).build());
+        if (pccSyncState != PccSyncState.Synchronized) {
+            this.synced = false;
+            this.triggeredResyncInProcess = true;
         }
-
-        Futures.addCallback(trans.commit(), new FutureCallback<RpcResult<TransactionStatus>>() {
+        // All set, commit the modifications
+        Futures.addCallback(ctx.trans.submit(), new FutureCallback<Void>() {
             @Override
-            public void onSuccess(final RpcResult<TransactionStatus> result) {
-                LOG.trace("Internal state for session {} cleaned up successfully", session);
+            public void onSuccess(final Void result) {
+                LOG.trace("Internal state for session {} updated successfully", session);
             }
 
             @Override
             public void onFailure(final Throwable t) {
-                LOG.error("Failed to cleanup internal state for session {}", session, t);
+                LOG.error("Failed to update internal state for session {}", session, t);
+                session.close(TerminationReason.UNKNOWN);
             }
         });
+    }
 
-        // Clear all requests which have not been sent to the peer: they result in cancellation
-        for (final Entry<S, PCEPRequest> e : this.sendingRequests.entrySet()) {
-            LOG.debug("Request {} was not sent when session went down, cancelling the instruction", e.getKey());
-            e.getValue().setResult(OperationResults.UNSENT);
-        }
-        this.sendingRequests.clear();
+    protected boolean isTriggeredSyncInProcess() {
+        return this.triggeredResyncInProcess;
+    }
 
-        // CLear all requests which have not been acked by the peer: they result in failure
-        for (final Entry<S, PCEPRequest> e : this.waitingRequests.entrySet()) {
-            LOG.info("Request {} was incomplete when session went down, failing the instruction", e.getKey());
-            e.getValue().setResult(OperationResults.NOACK);
+    @GuardedBy("this")
+    private void tearDown(final PCEPSession session) {
+        this.serverSessionManager.releaseNodeState(this.nodeState, session, isLspDbPersisted());
+        this.nodeState = null;
+        this.session = null;
+        this.syncOptimization = null;
+        unregister();
+
+        // Clear all requests we know about
+        for (final Entry<S, PCEPRequest> e : this.requests.entrySet()) {
+            final PCEPRequest r = e.getValue();
+            switch (r.getState()) {
+            case DONE:
+                // Done is done, nothing to do
+                break;
+            case UNACKED:
+                // Peer has not acked: results in failure
+                LOG.info("Request {} was incomplete when session went down, failing the instruction", e.getKey());
+                r.done(OperationResults.NOACK);
+                break;
+            case UNSENT:
+                // Peer has not been sent to the peer: results in cancellation
+                LOG.debug("Request {} was not sent when session went down, cancelling the instruction", e.getKey());
+                r.done(OperationResults.UNSENT);
+                break;
+            default:
+                break;
+            }
         }
-        this.waitingRequests.clear();
+        this.requests.clear();
     }
 
     @Override
@@ -245,90 +270,113 @@ public abstract class AbstractTopologySessionListener<S, L> implements PCEPSessi
 
     @Override
     public final synchronized void onMessage(final PCEPSession session, final Message message) {
-        final WriteTransaction trans = this.serverSessionManager.beginTransaction();
-
-        this.dirty = false;
+        final MessageContext ctx = new MessageContext(this.nodeState.beginTransaction());
 
-        if (onMessage(trans, message)) {
+        if (onMessage(ctx, message)) {
             LOG.info("Unhandled message {} on session {}", message, session);
+            //cancel not supported, submit empty transaction
+            ctx.trans.submit();
             return;
         }
 
-        if (this.dirty) {
-            LOG.debug("Internal state changed, forcing sync");
-            this.pccBuilder.setReportedLsp(Lists.newArrayList(this.lspData.values()));
-            this.topologyAugmentBuilder.setPathComputationClient(this.pccBuilder.build());
-            final Node1 ta = this.topologyAugmentBuilder.build();
-
-            trans.put(LogicalDatastoreType.OPERATIONAL, this.topologyAugment, ta);
-            LOG.trace("Peer data {} set to {}", this.topologyAugment, ta);
-            this.dirty = false;
-        } else {
-            LOG.debug("State has not changed, skipping sync");
-        }
-
-        Futures.addCallback(trans.commit(), new FutureCallback<RpcResult<TransactionStatus>>() {
+        Futures.addCallback(ctx.trans.submit(), new FutureCallback<Void>() {
             @Override
-            public void onSuccess(final RpcResult<TransactionStatus> result) {
+            public void onSuccess(final Void result) {
                 LOG.trace("Internal state for session {} updated successfully", session);
+                ctx.notifyRequests();
             }
 
             @Override
             public void onFailure(final Throwable t) {
                 LOG.error("Failed to update internal state for session {}, closing it", session, t);
-                session.close(TerminationReason.Unknown);
+                ctx.notifyRequests();
+                session.close(TerminationReason.UNKNOWN);
             }
         });
     }
 
     @Override
     public void close() {
+        unregister();
         if (this.session != null) {
-            this.session.close(TerminationReason.Unknown);
+            this.session.close(TerminationReason.UNKNOWN);
         }
     }
 
-    protected InstanceIdentifierBuilder<PathComputationClient> pccIdentifier() {
-        return this.topologyAugment.builder().child(PathComputationClient.class);
+    private synchronized void unregister() {
+        if (this.registration != null) {
+            this.registration.close();
+            this.registration = null;
+        }
     }
 
     protected final synchronized PCEPRequest removeRequest(final S id) {
-        final PCEPRequest ret = this.waitingRequests.remove(id);
+        final PCEPRequest ret = this.requests.remove(id);
+        if (ret != null) {
+            this.listenerState.processRequestStats(ret.getElapsedMillis());
+        }
         LOG.trace("Removed request {} object {}", id, ret);
         return ret;
     }
 
-    private synchronized void messageSendingComplete(final S requestId, final io.netty.util.concurrent.Future<Void> future) {
-        final PCEPRequest req = this.sendingRequests.remove(requestId);
-
-        if (future.isSuccess()) {
-            this.waitingRequests.put(requestId, req);
-            LOG.trace("Request {} sent to peer (object {})", requestId, req);
-        } else {
-            LOG.info("Failed to send request {}, instruction cancelled", requestId, future.cause());
-            req.setResult(OperationResults.UNSENT);
-        }
-    }
-
     protected final synchronized ListenableFuture<OperationResult> sendMessage(final Message message, final S requestId,
-            final Metadata metadata) {
+        final Metadata metadata) {
         final io.netty.util.concurrent.Future<Void> f = this.session.sendMessage(message);
+        this.listenerState.updateStatefulSentMsg(message);
         final PCEPRequest req = new PCEPRequest(metadata);
-
-        this.sendingRequests.put(requestId, req);
+        this.requests.put(requestId, req);
+        final int rpcTimeout = serverSessionManager.getRpcTimeout();
+        LOG.trace("RPC response timeout value is {} seconds", rpcTimeout);
+        if (rpcTimeout > 0) {
+            setupTimeoutHandler(requestId, req, rpcTimeout);
+        }
 
         f.addListener(new FutureListener<Void>() {
             @Override
             public void operationComplete(final io.netty.util.concurrent.Future<Void> future) {
-                messageSendingComplete(requestId, future);
+                if (!future.isSuccess()) {
+                    synchronized (AbstractTopologySessionListener.this) {
+                        AbstractTopologySessionListener.this.requests.remove(requestId);
+                    }
+                    req.done(OperationResults.UNSENT);
+                    LOG.info("Failed to send request {}, instruction cancelled", requestId, future.cause());
+                } else {
+                    req.sent();
+                    LOG.trace("Request {} sent to peer (object {})", requestId, req);
+                }
             }
         });
 
         return req.getFuture();
     }
 
-    protected final synchronized void updateLsp(final WriteTransaction trans, final L id, final String lspName,
-            final ReportedLspBuilder rlb, final boolean solicited, final boolean remove) {
+    private void setupTimeoutHandler(final S requestId, final PCEPRequest req, final int timeout) {
+        final Timer timer = req.getTimer();
+        timer.schedule(new TimerTask() {
+            @Override
+            public void run() {
+                synchronized (AbstractTopologySessionListener.this) {
+                    AbstractTopologySessionListener.this.requests.remove(requestId);
+                }
+                req.done();
+                LOG.info("Request {} timed-out waiting for response", requestId);
+            }
+        }, TimeUnit.SECONDS.toMillis(timeout));
+        LOG.trace("Set up response timeout handler for request {}", requestId);
+    }
+
+    /**
+     * Update an LSP in the data store
+     *
+     * @param ctx Message context
+     * @param id Revision-specific LSP identifier
+     * @param lspName LSP name
+     * @param rlb Reported LSP builder
+     * @param solicited True if the update was solicited
+     * @param remove True if this is an LSP path removal
+     */
+    protected final synchronized void updateLsp(final MessageContext ctx, final L id, final String lspName,
+        final ReportedLspBuilder rlb, final boolean solicited, final boolean remove) {
 
         final String name;
         if (lspName == null) {
@@ -344,51 +392,19 @@ public abstract class AbstractTopologySessionListener<S, L> implements PCEPSessi
         LOG.debug("Saved LSP {} with name {}", id, name);
         this.lsps.put(id, name);
 
-        // just one path should be reported
-        Preconditions.checkState(rlb.getPath().size() == 1);
-        LspId reportedLspId = rlb.getPath().get(0).getLspId();
-        // check previous report for existing paths
-        ReportedLsp previous = this.lspData.get(name);
+
+        final ReportedLsp previous = this.lspData.get(name);
         // if no previous report about the lsp exist, just proceed
         if (previous != null) {
-            List<Path> updatedPaths = new ArrayList<>(previous.getPath());
-            LOG.debug("Found previous paths {} to this lsp name {}", updatedPaths, name);
-            for (Path path : previous.getPath()) {
-                //we found reported path in previous reports
-                if (path.getLspId().getValue() == 0 || path.getLspId().equals(reportedLspId)) {
-                    LOG.debug("Match on lsp-id {}", path.getLspId().getValue() );
-                    // path that was reported previously and does have the same lsp-id, path will be updated
-                    final boolean r = updatedPaths.remove(path);
-                    LOG.trace("Request removed? {}", r);
-                }
-            }
-            // if the path does not exist in previous report, add it to path list, it's a new ERO
-            // only one path will be added
-            //lspId is 0 means confirmation message that shouldn't be added (because we have no means of deleting it later)
-            LOG.trace("Adding new path {} to {}", rlb.getPath(), updatedPaths);
-            updatedPaths.addAll(rlb.getPath());
-            if (remove) {
-                if (reportedLspId.getValue() == 0) {
-                    // if lsp-id also 0, remove all paths
-                    LOG.debug("Removing all paths.");
-                    updatedPaths.clear();
-                } else {
-                    // path is marked to be removed
-                    LOG.debug("Removing path {} from {}", rlb.getPath(), updatedPaths);
-                    final boolean r = updatedPaths.removeAll(rlb.getPath());
-                    LOG.trace("Request removed? {}", r);
-                }
-            }
+            final List<Path> updatedPaths = makeBeforeBreak(rlb, previous, name, remove);
             // if all paths or the last path were deleted, delete whole tunnel
-            if (updatedPaths.isEmpty()) {
+            if (updatedPaths == null || updatedPaths.isEmpty()) {
                 LOG.debug("All paths were removed, removing LSP with {}.", id);
-                removeLsp(trans, id);
+                removeLsp(ctx, id);
                 return;
             }
-            LOG.debug("Setting new paths {} to lsp {}", updatedPaths, name);
             rlb.setPath(updatedPaths);
         }
-        Preconditions.checkState(name != null);
         rlb.setKey(new ReportedLspKey(name));
         rlb.setName(name);
 
@@ -399,12 +415,64 @@ public abstract class AbstractTopologySessionListener<S, L> implements PCEPSessi
             rlb.setMetadata(this.nodeState.getLspMetadata(name));
         }
 
-        LOG.debug("LSP {} forcing update to MD-SAL", name);
-        this.dirty = true;
-        this.lspData.put(name, rlb.build());
+        final ReportedLsp rl = rlb.build();
+        ctx.trans.put(LogicalDatastoreType.OPERATIONAL, this.pccIdentifier.child(ReportedLsp.class, rlb.getKey()), rl);
+        LOG.debug("LSP {} updated to MD-SAL", name);
+
+        this.lspData.put(name, rl);
+    }
+
+    private List<Path> makeBeforeBreak(final ReportedLspBuilder rlb, final ReportedLsp previous, final String name, final boolean remove) {
+        // just one path should be reported
+        Preconditions.checkState(rlb.getPath().size() == 1);
+        final org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.rsvp.rev150820.LspId reportedLspId = rlb.getPath().get(0).getLspId();
+        final List<Path> updatedPaths;
+        //lspId = 0 and remove = false -> tunnel is down, still exists but no path is signaled
+        //remove existing tunnel's paths now, as explicit path remove will not come
+        if (!remove && reportedLspId.getValue() == 0) {
+            updatedPaths = new ArrayList<>();
+            LOG.debug("Remove previous paths {} to this lsp name {}", previous.getPath(), name);
+        } else {
+            // check previous report for existing paths
+            updatedPaths = new ArrayList<>(previous.getPath());
+            LOG.debug("Found previous paths {} to this lsp name {}", updatedPaths, name);
+            for (final Path path : previous.getPath()) {
+                //we found reported path in previous reports
+                if (path.getLspId().getValue() == 0 || path.getLspId().equals(reportedLspId)) {
+                    LOG.debug("Match on lsp-id {}", path.getLspId().getValue() );
+                    // path that was reported previously and does have the same lsp-id, path will be updated
+                    final boolean r = updatedPaths.remove(path);
+                    LOG.trace("Request removed? {}", r);
+                }
+            }
+        }
+        // if the path does not exist in previous report, add it to path list, it's a new ERO
+        // only one path will be added
+        //lspId is 0 means confirmation message that shouldn't be added (because we have no means of deleting it later)
+        LOG.trace("Adding new path {} to {}", rlb.getPath(), updatedPaths);
+        updatedPaths.addAll(rlb.getPath());
+        if (remove) {
+            if (reportedLspId.getValue() == 0) {
+                // if lsp-id also 0, remove all paths
+                LOG.debug("Removing all paths.");
+                updatedPaths.clear();
+            } else {
+                // path is marked to be removed
+                LOG.debug("Removing path {} from {}", rlb.getPath(), updatedPaths);
+                final boolean r = updatedPaths.removeAll(rlb.getPath());
+                LOG.trace("Request removed? {}", r);
+            }
+        }
+        LOG.debug("Setting new paths {} to lsp {}", updatedPaths, name);
+        return updatedPaths;
     }
 
-    protected final synchronized void stateSynchronizationAchieved(final WriteTransaction trans) {
+    /**
+     * Indicate that the peer has completed state synchronization.
+     *
+     * @param ctx Message context
+     */
+    protected final synchronized void stateSynchronizationAchieved(final MessageContext ctx) {
         if (this.synced) {
             LOG.debug("State synchronization achieved while synchronized, not updating state");
             return;
@@ -412,35 +480,164 @@ public abstract class AbstractTopologySessionListener<S, L> implements PCEPSessi
 
         // Update synchronization flag
         this.synced = true;
-        this.pccBuilder.setStateSync(PccSyncState.Synchronized).build();
-        this.dirty = true;
+        if(this.triggeredResyncInProcess) {
+            this.triggeredResyncInProcess = false;
+        }
+        updatePccNode(ctx, new PathComputationClientBuilder().setStateSync(PccSyncState.Synchronized).build());
 
         // The node has completed synchronization, cleanup metadata no longer reported back
         this.nodeState.cleanupExcept(this.lsps.values());
         LOG.debug("Session {} achieved synchronized state", this.session);
     }
 
-    protected final InstanceIdentifierBuilder<ReportedLsp> lspIdentifier(final String name) {
-        return pccIdentifier().child(ReportedLsp.class, new ReportedLspKey(name));
+    protected final synchronized void updatePccNode(final MessageContext ctx, final PathComputationClient pcc) {
+        ctx.trans.merge(LogicalDatastoreType.OPERATIONAL, this.pccIdentifier, pcc);
     }
 
-    protected final synchronized void removeLsp(final WriteTransaction trans, final L id) {
+    protected final InstanceIdentifier<ReportedLsp> lspIdentifier(final String name) {
+        return this.pccIdentifier.child(ReportedLsp.class, new ReportedLspKey(name));
+    }
+
+    /**
+     * Remove LSP from the database.
+     *
+     * @param ctx Message Context
+     * @param id Revision-specific LSP identifier
+     */
+    protected final synchronized void removeLsp(final MessageContext ctx, final L id) {
         final String name = this.lsps.remove(id);
-        this.dirty = true;
         LOG.debug("LSP {} removed", name);
+        ctx.trans.delete(LogicalDatastoreType.OPERATIONAL, lspIdentifier(name));
         this.lspData.remove(name);
     }
 
     protected abstract void onSessionUp(PCEPSession session, PathComputationClientBuilder pccBuilder);
 
-    protected abstract boolean onMessage(WriteTransaction trans, Message message);
+    /**
+     * Perform revision-specific message processing when a message arrives.
+     *
+     * @param ctx Message processing context
+     * @param message Protocol message
+     * @return True if the message type is not handle.
+     */
+    protected abstract boolean onMessage(MessageContext ctx, Message message);
 
-    protected String lookupLspName(final L id) {
+    protected final String lookupLspName(final L id) {
         Preconditions.checkNotNull(id, "ID parameter null.");
         return this.lsps.get(id);
     }
 
-    protected final <T extends DataObject> ListenableFuture<Optional<T>> readOperationalData(final InstanceIdentifier<T> id) {
-        return this.serverSessionManager.readOperationalData(id);
+    /**
+     * Reads operational data on this node. Doesn't attempt to read the data,
+     * if the node does not exist. In this case returns null.
+     *
+     * @param id InstanceIdentifier of the node
+     * @return null if the node does not exists, or operational data
+     */
+    protected final synchronized <T extends DataObject> ListenableFuture<Optional<T>> readOperationalData(final InstanceIdentifier<T> id) {
+        if (this.nodeState == null) {
+            return null;
+        }
+        return this.nodeState.readOperationalData(id);
+    }
+
+    protected abstract Object validateReportedLsp(final Optional<ReportedLsp> rep, final LspId input);
+
+    protected abstract void loadLspData(final Node node, final Map<String, ReportedLsp> lspData, final Map<L, String> lsps, final boolean incrementalSynchro);
+
+    protected final boolean isLspDbPersisted() {
+        if (syncOptimization != null) {
+            return syncOptimization.isSyncAvoidanceEnabled();
+        }
+        return false;
+    }
+
+    protected final boolean isLspDbRetreived() {
+        if (syncOptimization != null) {
+            return syncOptimization.isDbVersionPresent();
+        }
+        return false;
+    }
+
+    /**
+     * Is Incremental synchronization if LSP-DB-VERSION are included,
+     * LSP-DB-VERSION TLV values doesnt match, and  LSP-SYNC-CAPABILITY is enabled
+     * @return
+     */
+    protected final boolean isIncrementalSynchro() {
+        if (syncOptimization != null) {
+            return syncOptimization.isSyncAvoidanceEnabled() && syncOptimization.isDeltaSyncEnabled();
+        }
+        return false;
+    }
+
+    protected final boolean isTriggeredInitialSynchro() {
+        if (syncOptimization != null) {
+            return syncOptimization.isTriggeredInitSyncEnabled();
+        }
+        return false;
+    }
+
+    protected final boolean isTriggeredReSyncEnabled() {
+        if (syncOptimization != null) {
+            return syncOptimization.isTriggeredReSyncEnabled();
+        }
+        return false;
+    }
+
+    protected final boolean isSynchronized() {
+        if (syncOptimization != null) {
+            return syncOptimization.doesLspDbMatch();
+        }
+        return false;
+    }
+
+    protected SessionListenerState getSessionListenerState() {
+        return this.listenerState;
+    }
+
+    @Override
+    public Integer getDelegatedLspsCount() {
+        return this.lsps.size();
+    }
+
+    @Override
+    public Boolean getSynchronized() {
+        return this.synced;
+    }
+
+    @Override
+    public StatefulMessages getStatefulMessages() {
+        return this.listenerState.getStatefulMessages();
+    }
+
+    @Override
+    public synchronized void resetStats() {
+        this.listenerState.resetStats(this.session);
+    }
+
+    @Override
+    public ReplyTime getReplyTime() {
+        return this.listenerState.getReplyTime();
+    }
+
+    @Override
+    public PeerCapabilities getPeerCapabilities() {
+        return this.listenerState.getPeerCapabilities();
+    }
+
+    @Override
+    public void tearDownSession() {
+        this.close();
+    }
+
+    @Override
+    public synchronized SessionState getSessionState() {
+        return this.listenerState.getSessionState(this.session);
+    }
+
+    @Override
+    public synchronized String getPeerId() {
+        return this.session.getPeerPref().getIpAddress();
     }
 }