Remove ServerSessionManager.bind()/unbind()
[bgpcep.git] / pcep / topology / topology-provider / src / main / java / org / opendaylight / bgpcep / pcep / topology / provider / AbstractTopologySessionListener.java
1 /*
2  * Copyright (c) 2013 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.bgpcep.pcep.topology.provider;
9
10 import static java.util.Objects.requireNonNull;
11
12 import com.google.common.collect.Iterables;
13 import com.google.common.collect.Maps;
14 import com.google.common.util.concurrent.FluentFuture;
15 import com.google.common.util.concurrent.FutureCallback;
16 import com.google.common.util.concurrent.ListenableFuture;
17 import com.google.common.util.concurrent.MoreExecutors;
18 import io.netty.util.concurrent.Future;
19 import java.net.InetAddress;
20 import java.util.ArrayList;
21 import java.util.Collection;
22 import java.util.HashMap;
23 import java.util.List;
24 import java.util.Map;
25 import java.util.Map.Entry;
26 import java.util.Objects;
27 import java.util.Optional;
28 import java.util.concurrent.ConcurrentHashMap;
29 import java.util.concurrent.atomic.AtomicBoolean;
30 import org.checkerframework.checker.lock.qual.GuardedBy;
31 import org.checkerframework.checker.lock.qual.Holding;
32 import org.eclipse.jdt.annotation.NonNull;
33 import org.eclipse.jdt.annotation.Nullable;
34 import org.opendaylight.bgpcep.pcep.topology.provider.session.stats.SessionStateImpl;
35 import org.opendaylight.bgpcep.pcep.topology.provider.session.stats.TopologySessionStats;
36 import org.opendaylight.mdsal.binding.api.WriteTransaction;
37 import org.opendaylight.mdsal.common.api.CommitInfo;
38 import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
39 import org.opendaylight.protocol.pcep.PCEPCloseTermination;
40 import org.opendaylight.protocol.pcep.PCEPSession;
41 import org.opendaylight.protocol.pcep.PCEPTerminationReason;
42 import org.opendaylight.protocol.pcep.TerminationReason;
43 import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.inet.types.rev130715.IetfInetUtil;
44 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.pcep.ietf.initiated.rev200720.Stateful1;
45 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.pcep.ietf.stateful.rev200720.LspObject;
46 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.pcep.ietf.stateful.rev200720.Path1;
47 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.pcep.ietf.stateful.rev200720.PlspId;
48 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.pcep.ietf.stateful.rev200720.SrpIdNumber;
49 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.pcep.ietf.stateful.rev200720.StatefulTlv1Builder;
50 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.pcep.ietf.stateful.rev200720.Tlvs1;
51 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.pcep.ietf.stateful.rev200720.lsp.object.Lsp;
52 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.pcep.ietf.stateful.rev200720.stateful.capability.tlv.Stateful;
53 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.pcep.types.rev181109.Message;
54 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.pcep.types.rev181109.Object;
55 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.pcep.types.rev181109.open.object.open.Tlvs;
56 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.topology.pcep.rev220730.LspId;
57 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.topology.pcep.rev220730.Node1;
58 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.topology.pcep.rev220730.Node1Builder;
59 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.topology.pcep.rev220730.OperationResult;
60 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.topology.pcep.rev220730.PccSyncState;
61 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.topology.pcep.rev220730.TearDownSessionInput;
62 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.topology.pcep.rev220730.lsp.metadata.Metadata;
63 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.topology.pcep.rev220730.pcep.client.attributes.PathComputationClient;
64 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.topology.pcep.rev220730.pcep.client.attributes.PathComputationClientBuilder;
65 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.topology.pcep.rev220730.pcep.client.attributes.path.computation.client.ReportedLsp;
66 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.topology.pcep.rev220730.pcep.client.attributes.path.computation.client.ReportedLspBuilder;
67 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.topology.pcep.rev220730.pcep.client.attributes.path.computation.client.ReportedLspKey;
68 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.topology.pcep.rev220730.pcep.client.attributes.path.computation.client.StatefulTlvBuilder;
69 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.topology.pcep.rev220730.pcep.client.attributes.path.computation.client.reported.lsp.Path;
70 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.topology.pcep.rev220730.pcep.client.attributes.path.computation.client.reported.lsp.PathKey;
71 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.Node;
72 import org.opendaylight.yangtools.yang.binding.DataObject;
73 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
74 import org.opendaylight.yangtools.yang.common.RpcResult;
75 import org.opendaylight.yangtools.yang.common.RpcResultBuilder;
76 import org.slf4j.Logger;
77 import org.slf4j.LoggerFactory;
78
79 /**
80  * Base class for PCEP topology providers. It handles the common tasks involved in managing a PCEP server (PCE)
81  * endpoint, and exposing a network topology based on it. It needs to be subclassed to form a fully functional block,
82  * where the subclass provides handling of incoming messages.
83  */
84 public abstract class AbstractTopologySessionListener implements TopologySessionListener, TopologySessionStats {
85     private static final Logger LOG = LoggerFactory.getLogger(AbstractTopologySessionListener.class);
86
87     private final AtomicBoolean statefulCapability = new AtomicBoolean(false);
88     private final AtomicBoolean lspUpdateCapability = new AtomicBoolean(false);
89     private final AtomicBoolean initiationCapability = new AtomicBoolean(false);
90
91     @GuardedBy("this")
92     final Map<PlspId, String> lsps = new HashMap<>();
93     @GuardedBy("this")
94     SessionStateImpl listenerState;
95
96     // FIXME: clarify lifecycle rules of this map, most notably the interaction of multiple SrpIdNumbers
97     @GuardedBy("this")
98     private final Map<SrpIdNumber, PCEPRequest> requests = new HashMap<>();
99     @GuardedBy("this")
100     private final Map<String, ReportedLsp> lspData = new ConcurrentHashMap<>();
101     private final TopologySessionStatsRegistry statsProvider;
102     private final ServerSessionManager serverSessionManager;
103
104     private InstanceIdentifier<PathComputationClient> pccIdentifier;
105     @GuardedBy("this")
106     private TopologyNodeState nodeState;
107     private final AtomicBoolean synced = new AtomicBoolean(false);
108     @GuardedBy("this")
109     private PCEPSession session;
110     @GuardedBy("this")
111     private SyncOptimization syncOptimization;
112     @GuardedBy("this")
113     private boolean triggeredResyncInProcess;
114
115     AbstractTopologySessionListener(final TopologySessionStatsRegistry statsProvider,
116             final ServerSessionManager serverSessionManager) {
117         this.statsProvider = requireNonNull(statsProvider);
118         this.serverSessionManager = requireNonNull(serverSessionManager);
119     }
120
121     @Override
122     public final void onSessionUp(final PCEPSession psession) {
123         synchronized (serverSessionManager) {
124             synchronized (this) {
125                 /*
126                  * The session went up. Look up the router in Inventory model, create it if it
127                  * is not there (marking that fact for later deletion), and mark it as
128                  * synchronizing. Also create it in the topology model, with empty LSP list.
129                  */
130                 final InetAddress peerAddress = psession.getRemoteAddress();
131
132                 syncOptimization = new SyncOptimization(psession);
133                 final boolean haveLspDbVersion = syncOptimization.isDbVersionPresent();
134
135                 final TopologyNodeState state =
136                         serverSessionManager.takeNodeState(peerAddress, this, haveLspDbVersion);
137
138                 // takeNodeState(..) may fail when the server session manager is being restarted
139                 // due to configuration change
140                 if (state == null) {
141                     LOG.error("Unable to fetch topology node state for PCEP session. Closing session {}", psession);
142                     psession.close(TerminationReason.UNKNOWN);
143                     onSessionTerminated(psession, new PCEPCloseTermination(TerminationReason.UNKNOWN));
144                     return;
145                 }
146
147                 if (session != null || nodeState != null) {
148                     LOG.error("PCEP session is already up with {}. Closing session {}", peerAddress, psession);
149                     psession.close(TerminationReason.UNKNOWN);
150                     onSessionTerminated(psession, new PCEPCloseTermination(TerminationReason.UNKNOWN));
151                     return;
152                 }
153                 session = psession;
154                 nodeState = state;
155
156                 LOG.trace("Peer {} resolved to topology node {}", peerAddress, state.getNodeId());
157
158                 // Our augmentation in the topology node
159                 final PathComputationClientBuilder pccBuilder = new PathComputationClientBuilder()
160                     .setIpAddress(IetfInetUtil.INSTANCE.ipAddressNoZoneFor(peerAddress));
161
162                 // Let subclass fill the details
163                 updateStatefulCapabilities(pccBuilder, peerAddress, psession.getRemoteTlvs());
164
165                 synced.set(isSynchronized());
166
167                 final InstanceIdentifier<Node1> topologyAugment = state.getNodeId().augmentation(Node1.class);
168                 pccIdentifier = topologyAugment.child(PathComputationClient.class);
169
170                 if (haveLspDbVersion) {
171                     final Node initialNodeState = state.getInitialNodeState();
172                     if (initialNodeState != null) {
173                         loadLspData(initialNodeState, lspData, lsps, isIncrementalSynchro());
174                         pccBuilder.setReportedLsp(
175                             initialNodeState.augmentation(Node1.class).getPathComputationClient().getReportedLsp());
176                     }
177                 }
178                 state.storeNode(topologyAugment,
179                         new Node1Builder().setPathComputationClient(pccBuilder.build()).build(), psession);
180
181                 // TODO: collapse assignment? needs to be verified through bytecode
182                 final var sessionState = new SessionStateImpl(this, psession);
183                 listenerState = sessionState;
184                 statsProvider.bind(state.getNodeId(), sessionState);
185                 LOG.info("Session with {} attached to topology node {}", peerAddress, state.getNodeId());
186             }
187         }
188     }
189
190     @Holding("this")
191     private void updateStatefulCapabilities(final PathComputationClientBuilder pccBuilder,
192             final InetAddress peerAddress, final @Nullable Tlvs remoteTlvs) {
193         if (remoteTlvs != null) {
194             final Tlvs1 statefulTlvs = remoteTlvs.augmentation(Tlvs1.class);
195             if (statefulTlvs != null) {
196                 final Stateful stateful = statefulTlvs.getStateful();
197                 if (stateful != null) {
198                     statefulCapability.set(true);
199                     final var updateCap = stateful.getLspUpdateCapability();
200                     if (updateCap != null) {
201                         lspUpdateCapability.set(updateCap);
202                     }
203                     final Stateful1 stateful1 = stateful.augmentation(Stateful1.class);
204                     if (stateful1 != null) {
205                         final var initiation = stateful1.getInitiation();
206                         if (initiation != null) {
207                             initiationCapability.set(initiation);
208                         }
209                     }
210
211                     pccBuilder.setReportedLsp(Map.of());
212                     if (isSynchronized()) {
213                         pccBuilder.setStateSync(PccSyncState.Synchronized);
214                     } else if (isTriggeredInitialSynchro()) {
215                         pccBuilder.setStateSync(PccSyncState.TriggeredInitialSync);
216                     } else if (isIncrementalSynchro()) {
217                         pccBuilder.setStateSync(PccSyncState.IncrementalSync);
218                     } else {
219                         pccBuilder.setStateSync(PccSyncState.InitialResync);
220                     }
221                     pccBuilder.setStatefulTlv(new StatefulTlvBuilder()
222                         .addAugmentation(new StatefulTlv1Builder(statefulTlvs).build())
223                         .build());
224                     return;
225                 }
226             }
227         }
228         LOG.debug("Peer {} does not advertise stateful TLV", peerAddress);
229     }
230
231     synchronized void updatePccState(final PccSyncState pccSyncState) {
232         if (nodeState == null) {
233             LOG.info("Server Session Manager is closed.");
234             session.close(TerminationReason.UNKNOWN);
235             return;
236         }
237         final MessageContext ctx = new MessageContext(nodeState.getChain().newWriteOnlyTransaction());
238         updatePccNode(ctx, new PathComputationClientBuilder().setStateSync(pccSyncState).build());
239         if (pccSyncState != PccSyncState.Synchronized) {
240             synced.set(false);
241             triggeredResyncInProcess = true;
242         }
243         // All set, commit the modifications
244         ctx.trans.commit().addCallback(new FutureCallback<CommitInfo>() {
245             @Override
246             public void onSuccess(final CommitInfo result) {
247                 LOG.trace("Pcc Internal state for session {} updated successfully", session);
248             }
249
250             @Override
251             public void onFailure(final Throwable cause) {
252                 LOG.error("Failed to update Pcc internal state for session {}", session, cause);
253                 session.close(TerminationReason.UNKNOWN);
254             }
255         }, MoreExecutors.directExecutor());
256     }
257
258     synchronized boolean isTriggeredSyncInProcess() {
259         return triggeredResyncInProcess;
260     }
261
262     /**
263      * Tear down the given PCEP session. It's OK to call this method even after the session
264      * is already down. It always clear up the current session status.
265      */
266     @SuppressWarnings("checkstyle:IllegalCatch")
267     private void tearDown(final PCEPSession psession) {
268         requireNonNull(psession);
269         synchronized (serverSessionManager) {
270             synchronized (this) {
271                 serverSessionManager.releaseNodeState(nodeState, psession.getRemoteAddress(), isLspDbPersisted());
272                 clearNodeState();
273
274                 try {
275                     if (session != null) {
276                         session.close();
277                     }
278                     psession.close();
279                 } catch (final Exception e) {
280                     LOG.error("Session {} cannot be closed.", psession, e);
281                 }
282                 session = null;
283                 listenerState = null;
284                 syncOptimization = null;
285                 clearRequests();
286             }
287         }
288     }
289
290     @Override
291     public final void onSessionDown(final PCEPSession psession, final Exception exception) {
292         LOG.warn("Session {} went down unexpectedly", psession, exception);
293         tearDown(psession);
294     }
295
296     @Override
297     public final void onSessionTerminated(final PCEPSession psession, final PCEPTerminationReason reason) {
298         LOG.info("Session {} terminated by peer with reason {}", psession, reason);
299         tearDown(psession);
300     }
301
302     @Override
303     public final synchronized void onMessage(final PCEPSession psession, final Message message) {
304         if (nodeState == null) {
305             LOG.warn("Topology node state is null. Unhandled message {} on session {}", message, psession);
306             psession.close(TerminationReason.UNKNOWN);
307             return;
308         }
309         final MessageContext ctx = new MessageContext(nodeState.getChain().newWriteOnlyTransaction());
310
311         if (onMessage(ctx, message)) {
312             LOG.warn("Unhandled message {} on session {}", message, psession);
313             //cancel not supported, submit empty transaction
314             ctx.trans.commit().addCallback(new FutureCallback<CommitInfo>() {
315                 @Override
316                 public void onSuccess(final CommitInfo result) {
317                     LOG.trace("Successful commit");
318                 }
319
320                 @Override
321                 public void onFailure(final Throwable trw) {
322                     LOG.error("Failed commit", trw);
323                 }
324             }, MoreExecutors.directExecutor());
325             return;
326         }
327
328         ctx.trans.commit().addCallback(new FutureCallback<CommitInfo>() {
329             @Override
330             public void onSuccess(final CommitInfo result) {
331                 LOG.trace("Internal state for session {} updated successfully", psession);
332                 ctx.notifyRequests();
333
334             }
335
336             @Override
337             public void onFailure(final Throwable throwable) {
338                 LOG.error("Failed to update internal state for session {}, closing it", psession, throwable);
339                 ctx.notifyRequests();
340                 psession.close(TerminationReason.UNKNOWN);
341             }
342         }, MoreExecutors.directExecutor());
343     }
344
345     /**
346      * Perform revision-specific message processing when a message arrives.
347      *
348      * @param ctx     Message processing context
349      * @param message Protocol message
350      * @return True if the message type is not handle.
351      */
352     protected abstract boolean onMessage(MessageContext ctx, Message message);
353
354     // Non-final for mocking
355     @Override
356     public void close() {
357         synchronized (serverSessionManager) {
358             synchronized (this) {
359                 clearNodeState();
360                 if (session != null) {
361                     LOG.info("Closing session {}", session);
362                     session.close(TerminationReason.UNKNOWN);
363                     session = null;
364                 }
365                 listenerState = null;
366                 syncOptimization = null;
367                 clearRequests();
368             }
369         }
370     }
371
372     @Holding({"this.serverSessionManager", "this"})
373     private void clearNodeState() {
374         if (nodeState != null) {
375             statsProvider.unbind(nodeState.getNodeId());
376             LOG.debug("Clear Node state: {}", nodeState.getNodeId());
377             nodeState = null;
378         }
379     }
380
381     @Holding({"this.serverSessionManager", "this"})
382     private void clearRequests() {
383         // Clear all requests we know about
384         for (final Entry<SrpIdNumber, PCEPRequest> e : requests.entrySet()) {
385             // FIXME: exhaustive when we have JDK17+
386             switch (e.getValue().cancel()) {
387                 case DONE:
388                     // Done is done, nothing to do
389                     LOG.trace("Request {} was done when session went down.", e.getKey());
390                     break;
391                 case UNACKED:
392                     // Peer has not acked: results in failure
393                     LOG.info("Request {} was incomplete when session went down, failing the instruction",
394                             e.getKey());
395                     break;
396                 case UNSENT:
397                     // Peer has not been sent to the peer: results in cancellation
398                     LOG.debug("Request {} was not sent when session went down, cancelling the instruction",
399                             e.getKey());
400                     break;
401                 default:
402                     break;
403             }
404         }
405         requests.clear();
406     }
407
408     final synchronized PCEPRequest removeRequest(final SrpIdNumber id) {
409         final PCEPRequest ret = requests.remove(id);
410         if (ret != null && listenerState != null) {
411             listenerState.processRequestStats(ret.getElapsedMillis());
412         }
413         LOG.trace("Removed request {} object {}", id, ret);
414         return ret;
415     }
416
417     final synchronized ListenableFuture<OperationResult> sendMessage(final Message message, final SrpIdNumber requestId,
418             final Metadata metadata) {
419         final var sendFuture = session.sendMessage(message);
420         listenerState.updateStatefulSentMsg(message);
421
422         // Note: the timeout is held back by us holding the 'this' monitor, which timeoutExpired re-acquires
423         final var timeout = serverSessionManager.newRpcTimeout(this::timeoutExpired, requestId);
424         if (timeout != null) {
425             LOG.trace("Set up response timeout handler for request {}", requestId);
426         }
427
428         final PCEPRequest req = new PCEPRequest(metadata, timeout);
429         requests.put(requestId, req);
430
431         sendFuture.addListener(future -> sendCompleted(future, requestId, req));
432         return req.getFuture();
433     }
434
435     private void sendCompleted(final Future<?> future, final SrpIdNumber requestId, final PCEPRequest req) {
436         if (!future.isSuccess()) {
437             // FIXME: use concurrent operations and re-validate request vs. id
438             synchronized (AbstractTopologySessionListener.this) {
439                 requests.remove(requestId);
440             }
441             req.cancel();
442             LOG.info("Failed to send request {}, instruction cancelled", requestId, future.cause());
443         } else {
444             req.markUnacked();
445             LOG.trace("Request {} sent to peer (object {})", requestId, req);
446         }
447     }
448
449     private void timeoutExpired(final SrpIdNumber requestId) {
450         final PCEPRequest req;
451         synchronized (this) {
452             req = requests.remove(requestId);
453         }
454
455         if (req != null) {
456             LOG.info("Request {} timed-out waiting for response", requestId);
457             req.cancel();
458         }
459     }
460
461     /**
462      * Update an LSP in the data store.
463      *
464      * @param ctx       Message context
465      * @param id        Revision-specific LSP identifier
466      * @param lspName   LSP name
467      * @param rlb       Reported LSP builder
468      * @param solicited True if the update was solicited
469      * @param remove    True if this is an LSP path removal
470      */
471     protected final synchronized void updateLsp(final MessageContext ctx, final PlspId id, final String lspName,
472             final ReportedLspBuilder rlb, final boolean solicited, final boolean remove) {
473
474         final String name;
475         if (lspName == null) {
476             name = lsps.get(id);
477             if (name == null) {
478                 LOG.error("PLSPID {} seen for the first time, not reporting the LSP", id);
479                 return;
480             }
481         } else {
482             name = lspName;
483         }
484
485         LOG.debug("Saved LSP {} with name {}", id, name);
486         lsps.put(id, name);
487
488         final ReportedLsp previous = lspData.get(name);
489         // if no previous report about the lsp exist, just proceed
490         if (previous != null) {
491             final Map<PathKey, Path> updatedPaths = makeBeforeBreak(rlb, previous, name, remove);
492             // if all paths or the last path were deleted, delete whole tunnel
493             if (updatedPaths.isEmpty()) {
494                 LOG.debug("All paths were removed, removing LSP with {}.", id);
495                 removeLsp(ctx, id);
496                 return;
497             }
498             rlb.setPath(updatedPaths);
499         }
500         rlb.withKey(new ReportedLspKey(name));
501         rlb.setName(name);
502
503         // If this is an unsolicited update. We need to make sure we retain the metadata already present
504         if (solicited) {
505             nodeState.setLspMetadata(name, rlb.getMetadata());
506         } else {
507             rlb.setMetadata(nodeState.getLspMetadata(name));
508         }
509
510         final ReportedLsp rl = rlb.build();
511         ctx.trans.put(LogicalDatastoreType.OPERATIONAL, pccIdentifier.child(ReportedLsp.class, rlb.key()), rl);
512         LOG.debug("LSP {} updated to MD-SAL", name);
513
514         lspData.put(name, rl);
515     }
516
517     private static Map<PathKey, Path> makeBeforeBreak(final ReportedLspBuilder rlb, final ReportedLsp previous,
518             final String name, final boolean remove) {
519         // just one path should be reported
520         final Path path = Iterables.getOnlyElement(rlb.getPath().values());
521         final var reportedLspId = path.getLspId();
522         final List<Path> updatedPaths;
523         //lspId = 0 and remove = false -> tunnel is down, still exists but no path is signaled
524         //remove existing tunnel's paths now, as explicit path remove will not come
525         if (!remove && reportedLspId.getValue().toJava() == 0) {
526             updatedPaths = new ArrayList<>();
527             LOG.debug("Remove previous paths {} to this lsp name {}", previous.getPath(), name);
528         } else {
529             // check previous report for existing paths
530             final Collection<Path> prev = previous.nonnullPath().values();
531             updatedPaths = new ArrayList<>(prev);
532             LOG.debug("Found previous paths {} to this lsp name {}", updatedPaths, name);
533             for (final Path prevPath : prev) {
534                 //we found reported path in previous reports
535                 if (prevPath.getLspId().getValue().toJava() == 0 || prevPath.getLspId().equals(reportedLspId)) {
536                     LOG.debug("Match on lsp-id {}", prevPath.getLspId().getValue());
537                     // path that was reported previously and does have the same lsp-id, path will be updated
538                     final boolean r = updatedPaths.remove(prevPath);
539                     LOG.trace("Request removed? {}", r);
540                 }
541             }
542         }
543         // if the path does not exist in previous report, add it to path list, it's a new ERO
544         // only one path will be added
545         //lspId is 0 means confirmation message that shouldn't be added (because we have no means of deleting it later)
546         LOG.trace("Adding new path {} to {}", path, updatedPaths);
547         updatedPaths.add(path);
548         if (remove) {
549             if (reportedLspId.getValue().toJava() == 0) {
550                 // if lsp-id also 0, remove all paths
551                 LOG.debug("Removing all paths.");
552                 updatedPaths.clear();
553             } else {
554                 // path is marked to be removed
555                 LOG.debug("Removing path {} from {}", path, updatedPaths);
556                 final boolean r = updatedPaths.remove(path);
557                 LOG.trace("Request removed? {}", r);
558             }
559         }
560         LOG.debug("Setting new paths {} to lsp {}", updatedPaths, name);
561         return Maps.uniqueIndex(updatedPaths, Path::key);
562     }
563
564     /**
565      * Indicate that the peer has completed state synchronization.
566      *
567      * @param ctx Message context
568      */
569     protected final synchronized void stateSynchronizationAchieved(final MessageContext ctx) {
570         if (synced.getAndSet(true)) {
571             LOG.debug("State synchronization achieved while synchronizing, not updating state");
572             return;
573         }
574
575         triggeredResyncInProcess = false;
576         updatePccNode(ctx, new PathComputationClientBuilder().setStateSync(PccSyncState.Synchronized).build());
577
578         // The node has completed synchronization, cleanup metadata no longer reported back
579         nodeState.cleanupExcept(lsps.values());
580         LOG.debug("Session {} achieved synchronized state", session);
581     }
582
583     protected final synchronized void updatePccNode(final MessageContext ctx, final PathComputationClient pcc) {
584         ctx.trans.merge(LogicalDatastoreType.OPERATIONAL, pccIdentifier, pcc);
585     }
586
587     protected final @NonNull InstanceIdentifier<ReportedLsp> lspIdentifier(final String name) {
588         return pccIdentifier.child(ReportedLsp.class, new ReportedLspKey(name));
589     }
590
591     /**
592      * Remove LSP from the database.
593      *
594      * @param ctx Message Context
595      * @param id  Revision-specific LSP identifier
596      */
597     protected final synchronized void removeLsp(final MessageContext ctx, final PlspId id) {
598         final String name = lsps.remove(id);
599         LOG.debug("LSP {} removed", name);
600         ctx.trans.delete(LogicalDatastoreType.OPERATIONAL, lspIdentifier(name));
601         lspData.remove(name);
602     }
603
604     @Holding("this")
605     final String lookupLspName(final PlspId id) {
606         return lsps.get(requireNonNull(id, "ID parameter null."));
607     }
608
609     /**
610      * Reads operational data on this node. Doesn't attempt to read the data,
611      * if the node does not exist. In this case returns null.
612      *
613      * @param id InstanceIdentifier of the node
614      * @return null if the node does not exists, or operational data
615      */
616     final synchronized <T extends DataObject> FluentFuture<Optional<T>> readOperationalData(
617             final InstanceIdentifier<T> id) {
618         return nodeState == null ? null : nodeState.readOperationalData(id);
619     }
620
621     protected abstract Object validateReportedLsp(Optional<ReportedLsp> rep, LspId input);
622
623     protected abstract void loadLspData(Node node, Map<String, ReportedLsp> lspData, Map<PlspId, String> lsps,
624             boolean incrementalSynchro);
625
626     final boolean isLspDbPersisted() {
627         return syncOptimization != null && syncOptimization.isSyncAvoidanceEnabled();
628     }
629
630     /**
631      * Is Incremental synchronization if LSP-DB-VERSION are included,
632      * LSP-DB-VERSION TLV values doesnt match, and  LSP-SYNC-CAPABILITY is enabled.
633      */
634     final synchronized boolean isIncrementalSynchro() {
635         return syncOptimization != null && syncOptimization.isSyncAvoidanceEnabled()
636                 && syncOptimization.isDeltaSyncEnabled();
637     }
638
639     final synchronized boolean isTriggeredInitialSynchro() {
640         return syncOptimization != null && syncOptimization.isTriggeredInitSyncEnabled();
641     }
642
643     final synchronized boolean isTriggeredReSyncEnabled() {
644         return syncOptimization != null && syncOptimization.isTriggeredReSyncEnabled();
645     }
646
647     protected final synchronized boolean isSynchronized() {
648         return syncOptimization != null && syncOptimization.doesLspDbMatch();
649     }
650
651     @Override
652     public final int getDelegatedLspsCount() {
653         return Math.toIntExact(lspData.values().stream()
654             .map(ReportedLsp::getPath).filter(pathList -> pathList != null && !pathList.isEmpty())
655             // pick the first path, as delegate status should be same in each path
656             .map(pathList -> pathList.values().iterator().next())
657             .map(path -> path.augmentation(Path1.class)).filter(Objects::nonNull)
658             .map(LspObject::getLsp).filter(Objects::nonNull)
659             .filter(Lsp::getDelegate)
660             .count());
661     }
662
663     @Override
664     public final boolean isSessionSynchronized() {
665         return synced.get();
666     }
667
668     @Override
669     public final boolean isInitiationCapability() {
670         return initiationCapability.get();
671     }
672
673     @Override
674     public final boolean isStatefulCapability() {
675         return statefulCapability.get();
676     }
677
678     @Override
679     public final boolean isLspUpdateCapability() {
680         return lspUpdateCapability.get();
681     }
682
683
684     @Override
685     public synchronized ListenableFuture<RpcResult<Void>> tearDownSession(final TearDownSessionInput input) {
686         close();
687         return RpcResultBuilder.<Void>success().buildFuture();
688     }
689
690     static final class MessageContext {
691         private final Collection<PCEPRequest> requests = new ArrayList<>();
692         private final WriteTransaction trans;
693
694         private MessageContext(final WriteTransaction trans) {
695             this.trans = requireNonNull(trans);
696         }
697
698         void resolveRequest(final PCEPRequest req) {
699             requests.add(req);
700         }
701
702         private void notifyRequests() {
703             for (final PCEPRequest r : requests) {
704                 r.finish(OperationResults.SUCCESS);
705             }
706         }
707     }
708 }