Implement TopologySessionStats in AbstractTopologySessionListener
[bgpcep.git] / pcep / topology / topology-provider / src / main / java / org / opendaylight / bgpcep / pcep / topology / provider / AbstractTopologySessionListener.java
1 /*
2  * Copyright (c) 2013 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.bgpcep.pcep.topology.provider;
9
10 import static java.util.Objects.requireNonNull;
11
12 import com.google.common.collect.Iterables;
13 import com.google.common.collect.Maps;
14 import com.google.common.util.concurrent.FluentFuture;
15 import com.google.common.util.concurrent.FutureCallback;
16 import com.google.common.util.concurrent.Futures;
17 import com.google.common.util.concurrent.ListenableFuture;
18 import com.google.common.util.concurrent.MoreExecutors;
19 import io.netty.util.concurrent.FutureListener;
20 import java.net.InetAddress;
21 import java.util.ArrayList;
22 import java.util.Collection;
23 import java.util.HashMap;
24 import java.util.List;
25 import java.util.Map;
26 import java.util.Map.Entry;
27 import java.util.Objects;
28 import java.util.Optional;
29 import java.util.Timer;
30 import java.util.TimerTask;
31 import java.util.concurrent.ConcurrentHashMap;
32 import java.util.concurrent.TimeUnit;
33 import java.util.concurrent.atomic.AtomicBoolean;
34 import org.checkerframework.checker.lock.qual.GuardedBy;
35 import org.checkerframework.checker.lock.qual.Holding;
36 import org.eclipse.jdt.annotation.NonNull;
37 import org.eclipse.jdt.annotation.Nullable;
38 import org.opendaylight.bgpcep.pcep.topology.provider.session.stats.SessionStateImpl;
39 import org.opendaylight.bgpcep.pcep.topology.provider.session.stats.TopologySessionStats;
40 import org.opendaylight.mdsal.binding.api.WriteTransaction;
41 import org.opendaylight.mdsal.common.api.CommitInfo;
42 import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
43 import org.opendaylight.protocol.pcep.PCEPCloseTermination;
44 import org.opendaylight.protocol.pcep.PCEPSession;
45 import org.opendaylight.protocol.pcep.PCEPTerminationReason;
46 import org.opendaylight.protocol.pcep.TerminationReason;
47 import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.inet.types.rev130715.IetfInetUtil;
48 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.pcep.ietf.initiated.rev200720.Stateful1;
49 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.pcep.ietf.stateful.rev200720.LspObject;
50 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.pcep.ietf.stateful.rev200720.Path1;
51 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.pcep.ietf.stateful.rev200720.PlspId;
52 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.pcep.ietf.stateful.rev200720.SrpIdNumber;
53 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.pcep.ietf.stateful.rev200720.StatefulTlv1Builder;
54 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.pcep.ietf.stateful.rev200720.Tlvs1;
55 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.pcep.ietf.stateful.rev200720.lsp.object.Lsp;
56 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.pcep.ietf.stateful.rev200720.stateful.capability.tlv.Stateful;
57 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.pcep.types.rev181109.Message;
58 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.pcep.types.rev181109.MessageHeader;
59 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.pcep.types.rev181109.Object;
60 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.pcep.types.rev181109.ProtocolVersion;
61 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.pcep.types.rev181109.open.object.open.Tlvs;
62 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.topology.pcep.rev200120.LspId;
63 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.topology.pcep.rev200120.Node1;
64 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.topology.pcep.rev200120.Node1Builder;
65 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.topology.pcep.rev200120.OperationResult;
66 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.topology.pcep.rev200120.PccSyncState;
67 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.topology.pcep.rev200120.TearDownSessionInput;
68 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.topology.pcep.rev200120.lsp.metadata.Metadata;
69 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.topology.pcep.rev200120.pcep.client.attributes.PathComputationClient;
70 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.topology.pcep.rev200120.pcep.client.attributes.PathComputationClientBuilder;
71 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.topology.pcep.rev200120.pcep.client.attributes.path.computation.client.ReportedLsp;
72 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.topology.pcep.rev200120.pcep.client.attributes.path.computation.client.ReportedLspBuilder;
73 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.topology.pcep.rev200120.pcep.client.attributes.path.computation.client.ReportedLspKey;
74 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.topology.pcep.rev200120.pcep.client.attributes.path.computation.client.StatefulTlvBuilder;
75 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.topology.pcep.rev200120.pcep.client.attributes.path.computation.client.reported.lsp.Path;
76 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.topology.pcep.rev200120.pcep.client.attributes.path.computation.client.reported.lsp.PathKey;
77 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.Node;
78 import org.opendaylight.yangtools.yang.binding.DataObject;
79 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
80 import org.opendaylight.yangtools.yang.common.RpcResult;
81 import org.opendaylight.yangtools.yang.common.RpcResultBuilder;
82 import org.opendaylight.yangtools.yang.common.Uint8;
83 import org.slf4j.Logger;
84 import org.slf4j.LoggerFactory;
85
86 /**
87  * Base class for PCEP topology providers. It handles the common tasks involved in managing a PCEP server (PCE)
88  * endpoint, and exposing a network topology based on it. It needs to be subclassed to form a fully functional block,
89  * where the subclass provides handling of incoming messages.
90  */
91 public abstract class AbstractTopologySessionListener implements TopologySessionListener, TopologySessionStats {
92     private static final Logger LOG = LoggerFactory.getLogger(AbstractTopologySessionListener.class);
93
94     static final String MISSING_XML_TAG = "Mandatory XML tags are missing.";
95     static final MessageHeader MESSAGE_HEADER = new MessageHeader() {
96         private final ProtocolVersion version = new ProtocolVersion(Uint8.ONE);
97
98         @Override
99         public Class<MessageHeader> implementedInterface() {
100             return MessageHeader.class;
101         }
102
103         @Override
104         public ProtocolVersion getVersion() {
105             return version;
106         }
107     };
108
109     private final AtomicBoolean statefulCapability = new AtomicBoolean(false);
110     private final AtomicBoolean lspUpdateCapability = new AtomicBoolean(false);
111     private final AtomicBoolean initiationCapability = new AtomicBoolean(false);
112
113     @GuardedBy("this")
114     final Map<PlspId, String> lsps = new HashMap<>();
115     @GuardedBy("this")
116     SessionStateImpl listenerState;
117
118     @GuardedBy("this")
119     private final Map<SrpIdNumber, PCEPRequest> requests = new HashMap<>();
120     @GuardedBy("this")
121     private final Map<String, ReportedLsp> lspData = new ConcurrentHashMap<>();
122     private final ServerSessionManager serverSessionManager;
123     private InstanceIdentifier<PathComputationClient> pccIdentifier;
124     @GuardedBy("this")
125     private TopologyNodeState nodeState;
126     private final AtomicBoolean synced = new AtomicBoolean(false);
127     @GuardedBy("this")
128     private PCEPSession session;
129     @GuardedBy("this")
130     private SyncOptimization syncOptimization;
131     @GuardedBy("this")
132     private boolean triggeredResyncInProcess;
133
134     AbstractTopologySessionListener(final ServerSessionManager serverSessionManager) {
135         this.serverSessionManager = requireNonNull(serverSessionManager);
136     }
137
138     @Override
139     public final void onSessionUp(final PCEPSession psession) {
140         synchronized (serverSessionManager) {
141             synchronized (this) {
142                 /*
143                  * The session went up. Look up the router in Inventory model, create it if it
144                  * is not there (marking that fact for later deletion), and mark it as
145                  * synchronizing. Also create it in the topology model, with empty LSP list.
146                  */
147                 final InetAddress peerAddress = psession.getRemoteAddress();
148
149                 syncOptimization = new SyncOptimization(psession);
150                 final boolean haveLspDbVersion = syncOptimization.isDbVersionPresent();
151
152                 final TopologyNodeState state =
153                         serverSessionManager.takeNodeState(peerAddress, this, haveLspDbVersion);
154
155                 // takeNodeState(..) may fail when the server session manager is being restarted
156                 // due to configuration change
157                 if (state == null) {
158                     LOG.error("Unable to fetch topology node state for PCEP session. Closing session {}", psession);
159                     psession.close(TerminationReason.UNKNOWN);
160                     onSessionTerminated(psession, new PCEPCloseTermination(TerminationReason.UNKNOWN));
161                     return;
162                 }
163
164                 if (session != null || nodeState != null) {
165                     LOG.error("PCEP session is already up with {}. Closing session {}", peerAddress, psession);
166                     psession.close(TerminationReason.UNKNOWN);
167                     onSessionTerminated(psession, new PCEPCloseTermination(TerminationReason.UNKNOWN));
168                     return;
169                 }
170                 session = psession;
171                 nodeState = state;
172
173                 LOG.trace("Peer {} resolved to topology node {}", peerAddress, state.getNodeId());
174
175                 // Our augmentation in the topology node
176                 final PathComputationClientBuilder pccBuilder = new PathComputationClientBuilder()
177                     .setIpAddress(IetfInetUtil.INSTANCE.ipAddressNoZoneFor(peerAddress));
178
179                 // Let subclass fill the details
180                 updateStatefulCapabilities(pccBuilder, peerAddress, psession.getRemoteTlvs());
181
182                 synced.set(isSynchronized());
183
184                 final InstanceIdentifier<Node1> topologyAugment = state.getNodeId().augmentation(Node1.class);
185                 pccIdentifier = topologyAugment.child(PathComputationClient.class);
186
187                 if (haveLspDbVersion) {
188                     final Node initialNodeState = state.getInitialNodeState();
189                     if (initialNodeState != null) {
190                         loadLspData(initialNodeState, lspData, lsps, isIncrementalSynchro());
191                         pccBuilder.setReportedLsp(
192                             initialNodeState.augmentation(Node1.class).getPathComputationClient().getReportedLsp());
193                     }
194                 }
195                 state.storeNode(topologyAugment,
196                         new Node1Builder().setPathComputationClient(pccBuilder.build()).build(), psession);
197
198                 listenerState = new SessionStateImpl(this, psession);
199                 serverSessionManager.bind(state.getNodeId(), listenerState);
200                 LOG.info("Session with {} attached to topology node {}", peerAddress, state.getNodeId());
201             }
202         }
203     }
204
205     @Holding("this")
206     private void updateStatefulCapabilities(final PathComputationClientBuilder pccBuilder,
207             final InetAddress peerAddress, final @Nullable Tlvs remoteTlvs) {
208         if (remoteTlvs != null) {
209             final Tlvs1 statefulTlvs = remoteTlvs.augmentation(Tlvs1.class);
210             if (statefulTlvs != null) {
211                 final Stateful stateful = statefulTlvs.getStateful();
212                 if (stateful != null) {
213                     statefulCapability.set(true);
214                     final var updateCap = stateful.getLspUpdateCapability();
215                     if (updateCap != null) {
216                         lspUpdateCapability.set(updateCap);
217                     }
218                     final Stateful1 stateful1 = stateful.augmentation(Stateful1.class);
219                     if (stateful1 != null) {
220                         final var initiation = stateful1.getInitiation();
221                         if (initiation != null) {
222                             initiationCapability.set(initiation);
223                         }
224                     }
225
226                     pccBuilder.setReportedLsp(Map.of());
227                     if (isSynchronized()) {
228                         pccBuilder.setStateSync(PccSyncState.Synchronized);
229                     } else if (isTriggeredInitialSynchro()) {
230                         pccBuilder.setStateSync(PccSyncState.TriggeredInitialSync);
231                     } else if (isIncrementalSynchro()) {
232                         pccBuilder.setStateSync(PccSyncState.IncrementalSync);
233                     } else {
234                         pccBuilder.setStateSync(PccSyncState.InitialResync);
235                     }
236                     pccBuilder.setStatefulTlv(new StatefulTlvBuilder()
237                         .addAugmentation(new StatefulTlv1Builder(statefulTlvs).build())
238                         .build());
239                     return;
240                 }
241             }
242         }
243         LOG.debug("Peer {} does not advertise stateful TLV", peerAddress);
244     }
245
246     synchronized void updatePccState(final PccSyncState pccSyncState) {
247         if (nodeState == null) {
248             LOG.info("Server Session Manager is closed.");
249             session.close(TerminationReason.UNKNOWN);
250             return;
251         }
252         final MessageContext ctx = new MessageContext(nodeState.getChain().newWriteOnlyTransaction());
253         updatePccNode(ctx, new PathComputationClientBuilder().setStateSync(pccSyncState).build());
254         if (pccSyncState != PccSyncState.Synchronized) {
255             synced.set(false);
256             triggeredResyncInProcess = true;
257         }
258         // All set, commit the modifications
259         ctx.trans.commit().addCallback(new FutureCallback<CommitInfo>() {
260             @Override
261             public void onSuccess(final CommitInfo result) {
262                 LOG.trace("Pcc Internal state for session {} updated successfully", session);
263             }
264
265             @Override
266             public void onFailure(final Throwable cause) {
267                 LOG.error("Failed to update Pcc internal state for session {}", session, cause);
268                 session.close(TerminationReason.UNKNOWN);
269             }
270         }, MoreExecutors.directExecutor());
271     }
272
273     synchronized boolean isTriggeredSyncInProcess() {
274         return triggeredResyncInProcess;
275     }
276
277     /**
278      * Tear down the given PCEP session. It's OK to call this method even after the session
279      * is already down. It always clear up the current session status.
280      */
281     @SuppressWarnings("checkstyle:IllegalCatch")
282     private void tearDown(final PCEPSession psession) {
283         requireNonNull(psession);
284         synchronized (serverSessionManager) {
285             synchronized (this) {
286                 serverSessionManager.releaseNodeState(nodeState, psession.getRemoteAddress(), isLspDbPersisted());
287                 clearNodeState();
288
289                 try {
290                     if (session != null) {
291                         session.close();
292                     }
293                     psession.close();
294                 } catch (final Exception e) {
295                     LOG.error("Session {} cannot be closed.", psession, e);
296                 }
297                 session = null;
298                 listenerState = null;
299                 syncOptimization = null;
300
301                 // Clear all requests we know about
302                 for (final Entry<SrpIdNumber, PCEPRequest> e : requests.entrySet()) {
303                     final PCEPRequest r = e.getValue();
304                     switch (r.getState()) {
305                         case DONE:
306                             // Done is done, nothing to do
307                             LOG.trace("Request {} was done when session went down.", e.getKey());
308                             break;
309                         case UNACKED:
310                             // Peer has not acked: results in failure
311                             LOG.info("Request {} was incomplete when session went down, failing the instruction",
312                                     e.getKey());
313                             r.done(OperationResults.NOACK);
314                             break;
315                         case UNSENT:
316                             // Peer has not been sent to the peer: results in cancellation
317                             LOG.debug("Request {} was not sent when session went down, cancelling the instruction",
318                                     e.getKey());
319                             r.done(OperationResults.UNSENT);
320                             break;
321                         default:
322                             break;
323                     }
324                 }
325                 requests.clear();
326             }
327         }
328     }
329
330     @Override
331     public final void onSessionDown(final PCEPSession psession, final Exception exception) {
332         LOG.warn("Session {} went down unexpectedly", psession, exception);
333         tearDown(psession);
334     }
335
336     @Override
337     public final void onSessionTerminated(final PCEPSession psession, final PCEPTerminationReason reason) {
338         LOG.info("Session {} terminated by peer with reason {}", psession, reason);
339         tearDown(psession);
340     }
341
342     @Override
343     public final synchronized void onMessage(final PCEPSession psession, final Message message) {
344         if (nodeState == null) {
345             LOG.warn("Topology node state is null. Unhandled message {} on session {}", message, psession);
346             psession.close(TerminationReason.UNKNOWN);
347             return;
348         }
349         final MessageContext ctx = new MessageContext(nodeState.getChain().newWriteOnlyTransaction());
350
351         if (onMessage(ctx, message)) {
352             LOG.warn("Unhandled message {} on session {}", message, psession);
353             //cancel not supported, submit empty transaction
354             ctx.trans.commit().addCallback(new FutureCallback<CommitInfo>() {
355                 @Override
356                 public void onSuccess(final CommitInfo result) {
357                     LOG.trace("Successful commit");
358                 }
359
360                 @Override
361                 public void onFailure(final Throwable trw) {
362                     LOG.error("Failed commit", trw);
363                 }
364             }, MoreExecutors.directExecutor());
365             return;
366         }
367
368         ctx.trans.commit().addCallback(new FutureCallback<CommitInfo>() {
369             @Override
370             public void onSuccess(final CommitInfo result) {
371                 LOG.trace("Internal state for session {} updated successfully", psession);
372                 ctx.notifyRequests();
373
374             }
375
376             @Override
377             public void onFailure(final Throwable throwable) {
378                 LOG.error("Failed to update internal state for session {}, closing it", psession, throwable);
379                 ctx.notifyRequests();
380                 psession.close(TerminationReason.UNKNOWN);
381             }
382         }, MoreExecutors.directExecutor());
383     }
384
385     /**
386      * Perform revision-specific message processing when a message arrives.
387      *
388      * @param ctx     Message processing context
389      * @param message Protocol message
390      * @return True if the message type is not handle.
391      */
392     protected abstract boolean onMessage(MessageContext ctx, Message message);
393
394     @Override
395     public void close() {
396         synchronized (serverSessionManager) {
397             synchronized (this) {
398                 clearNodeState();
399                 if (session != null) {
400                     LOG.info("Closing session {}", session);
401                     session.close(TerminationReason.UNKNOWN);
402                 }
403             }
404         }
405     }
406
407     @Holding({"this.serverSessionManager", "this"})
408     private void clearNodeState() {
409         if (nodeState != null) {
410             serverSessionManager.unbind(nodeState.getNodeId());
411             LOG.debug("Clear Node state: {}", nodeState.getNodeId());
412             nodeState = null;
413         }
414     }
415
416     final synchronized PCEPRequest removeRequest(final SrpIdNumber id) {
417         final PCEPRequest ret = requests.remove(id);
418         if (ret != null && listenerState != null) {
419             listenerState.processRequestStats(ret.getElapsedMillis());
420         }
421         LOG.trace("Removed request {} object {}", id, ret);
422         return ret;
423     }
424
425     final synchronized ListenableFuture<OperationResult> sendMessage(final Message message, final SrpIdNumber requestId,
426             final Metadata metadata) {
427         final var sendFuture = session.sendMessage(message);
428         listenerState.updateStatefulSentMsg(message);
429         final PCEPRequest req = new PCEPRequest(metadata);
430         requests.put(requestId, req);
431         final short rpcTimeout = serverSessionManager.getRpcTimeout();
432         LOG.trace("RPC response timeout value is {} seconds", rpcTimeout);
433         if (rpcTimeout > 0) {
434             setupTimeoutHandler(requestId, req, rpcTimeout);
435         }
436
437         sendFuture.addListener((FutureListener<Void>) future -> {
438             if (!future.isSuccess()) {
439                 synchronized (AbstractTopologySessionListener.this) {
440                     requests.remove(requestId);
441                 }
442                 req.done(OperationResults.UNSENT);
443                 LOG.info("Failed to send request {}, instruction cancelled", requestId, future.cause());
444             } else {
445                 req.sent();
446                 LOG.trace("Request {} sent to peer (object {})", requestId, req);
447             }
448         });
449
450         return req.getFuture();
451     }
452
453     private void setupTimeoutHandler(final SrpIdNumber requestId, final PCEPRequest req, final short timeout) {
454         final Timer timer = req.getTimer();
455         timer.schedule(new TimerTask() {
456             @Override
457             public void run() {
458                 synchronized (AbstractTopologySessionListener.this) {
459                     requests.remove(requestId);
460                 }
461                 req.done();
462                 LOG.info("Request {} timed-out waiting for response", requestId);
463             }
464         }, TimeUnit.SECONDS.toMillis(timeout));
465         LOG.trace("Set up response timeout handler for request {}", requestId);
466     }
467
468     /**
469      * Update an LSP in the data store.
470      *
471      * @param ctx       Message context
472      * @param id        Revision-specific LSP identifier
473      * @param lspName   LSP name
474      * @param rlb       Reported LSP builder
475      * @param solicited True if the update was solicited
476      * @param remove    True if this is an LSP path removal
477      */
478     protected final synchronized void updateLsp(final MessageContext ctx, final PlspId id, final String lspName,
479             final ReportedLspBuilder rlb, final boolean solicited, final boolean remove) {
480
481         final String name;
482         if (lspName == null) {
483             name = lsps.get(id);
484             if (name == null) {
485                 LOG.error("PLSPID {} seen for the first time, not reporting the LSP", id);
486                 return;
487             }
488         } else {
489             name = lspName;
490         }
491
492         LOG.debug("Saved LSP {} with name {}", id, name);
493         lsps.put(id, name);
494
495         final ReportedLsp previous = lspData.get(name);
496         // if no previous report about the lsp exist, just proceed
497         if (previous != null) {
498             final Map<PathKey, Path> updatedPaths = makeBeforeBreak(rlb, previous, name, remove);
499             // if all paths or the last path were deleted, delete whole tunnel
500             if (updatedPaths.isEmpty()) {
501                 LOG.debug("All paths were removed, removing LSP with {}.", id);
502                 removeLsp(ctx, id);
503                 return;
504             }
505             rlb.setPath(updatedPaths);
506         }
507         rlb.withKey(new ReportedLspKey(name));
508         rlb.setName(name);
509
510         // If this is an unsolicited update. We need to make sure we retain the metadata already present
511         if (solicited) {
512             nodeState.setLspMetadata(name, rlb.getMetadata());
513         } else {
514             rlb.setMetadata(nodeState.getLspMetadata(name));
515         }
516
517         final ReportedLsp rl = rlb.build();
518         ctx.trans.put(LogicalDatastoreType.OPERATIONAL, pccIdentifier.child(ReportedLsp.class, rlb.key()), rl);
519         LOG.debug("LSP {} updated to MD-SAL", name);
520
521         lspData.put(name, rl);
522     }
523
524     private static Map<PathKey, Path> makeBeforeBreak(final ReportedLspBuilder rlb, final ReportedLsp previous,
525             final String name, final boolean remove) {
526         // just one path should be reported
527         final Path path = Iterables.getOnlyElement(rlb.getPath().values());
528         final var reportedLspId = path.getLspId();
529         final List<Path> updatedPaths;
530         //lspId = 0 and remove = false -> tunnel is down, still exists but no path is signaled
531         //remove existing tunnel's paths now, as explicit path remove will not come
532         if (!remove && reportedLspId.getValue().toJava() == 0) {
533             updatedPaths = new ArrayList<>();
534             LOG.debug("Remove previous paths {} to this lsp name {}", previous.getPath(), name);
535         } else {
536             // check previous report for existing paths
537             final Collection<Path> prev = previous.nonnullPath().values();
538             updatedPaths = new ArrayList<>(prev);
539             LOG.debug("Found previous paths {} to this lsp name {}", updatedPaths, name);
540             for (final Path prevPath : prev) {
541                 //we found reported path in previous reports
542                 if (prevPath.getLspId().getValue().toJava() == 0 || prevPath.getLspId().equals(reportedLspId)) {
543                     LOG.debug("Match on lsp-id {}", prevPath.getLspId().getValue());
544                     // path that was reported previously and does have the same lsp-id, path will be updated
545                     final boolean r = updatedPaths.remove(prevPath);
546                     LOG.trace("Request removed? {}", r);
547                 }
548             }
549         }
550         // if the path does not exist in previous report, add it to path list, it's a new ERO
551         // only one path will be added
552         //lspId is 0 means confirmation message that shouldn't be added (because we have no means of deleting it later)
553         LOG.trace("Adding new path {} to {}", path, updatedPaths);
554         updatedPaths.add(path);
555         if (remove) {
556             if (reportedLspId.getValue().toJava() == 0) {
557                 // if lsp-id also 0, remove all paths
558                 LOG.debug("Removing all paths.");
559                 updatedPaths.clear();
560             } else {
561                 // path is marked to be removed
562                 LOG.debug("Removing path {} from {}", path, updatedPaths);
563                 final boolean r = updatedPaths.remove(path);
564                 LOG.trace("Request removed? {}", r);
565             }
566         }
567         LOG.debug("Setting new paths {} to lsp {}", updatedPaths, name);
568         return Maps.uniqueIndex(updatedPaths, Path::key);
569     }
570
571     /**
572      * Indicate that the peer has completed state synchronization.
573      *
574      * @param ctx Message context
575      */
576     protected final synchronized void stateSynchronizationAchieved(final MessageContext ctx) {
577         if (synced.getAndSet(true)) {
578             LOG.debug("State synchronization achieved while synchronizing, not updating state");
579             return;
580         }
581
582         triggeredResyncInProcess = false;
583         updatePccNode(ctx, new PathComputationClientBuilder().setStateSync(PccSyncState.Synchronized).build());
584
585         // The node has completed synchronization, cleanup metadata no longer reported back
586         nodeState.cleanupExcept(lsps.values());
587         LOG.debug("Session {} achieved synchronized state", session);
588     }
589
590     protected final synchronized void updatePccNode(final MessageContext ctx, final PathComputationClient pcc) {
591         ctx.trans.merge(LogicalDatastoreType.OPERATIONAL, pccIdentifier, pcc);
592     }
593
594     protected final @NonNull InstanceIdentifier<ReportedLsp> lspIdentifier(final String name) {
595         return pccIdentifier.child(ReportedLsp.class, new ReportedLspKey(name));
596     }
597
598     /**
599      * Remove LSP from the database.
600      *
601      * @param ctx Message Context
602      * @param id  Revision-specific LSP identifier
603      */
604     protected final synchronized void removeLsp(final MessageContext ctx, final PlspId id) {
605         final String name = lsps.remove(id);
606         LOG.debug("LSP {} removed", name);
607         ctx.trans.delete(LogicalDatastoreType.OPERATIONAL, lspIdentifier(name));
608         lspData.remove(name);
609     }
610
611     @Holding("this")
612     final String lookupLspName(final PlspId id) {
613         return lsps.get(requireNonNull(id, "ID parameter null."));
614     }
615
616     /**
617      * Reads operational data on this node. Doesn't attempt to read the data,
618      * if the node does not exist. In this case returns null.
619      *
620      * @param id InstanceIdentifier of the node
621      * @return null if the node does not exists, or operational data
622      */
623     final synchronized <T extends DataObject> FluentFuture<Optional<T>> readOperationalData(
624             final InstanceIdentifier<T> id) {
625         return nodeState == null ? null : nodeState.readOperationalData(id);
626     }
627
628     protected abstract Object validateReportedLsp(Optional<ReportedLsp> rep, LspId input);
629
630     protected abstract void loadLspData(Node node, Map<String, ReportedLsp> lspData, Map<PlspId, String> lsps,
631             boolean incrementalSynchro);
632
633     final boolean isLspDbPersisted() {
634         return syncOptimization != null && syncOptimization.isSyncAvoidanceEnabled();
635     }
636
637     /**
638      * Is Incremental synchronization if LSP-DB-VERSION are included,
639      * LSP-DB-VERSION TLV values doesnt match, and  LSP-SYNC-CAPABILITY is enabled.
640      */
641     final synchronized boolean isIncrementalSynchro() {
642         return syncOptimization != null && syncOptimization.isSyncAvoidanceEnabled()
643                 && syncOptimization.isDeltaSyncEnabled();
644     }
645
646     final synchronized boolean isTriggeredInitialSynchro() {
647         return syncOptimization != null && syncOptimization.isTriggeredInitSyncEnabled();
648     }
649
650     final synchronized boolean isTriggeredReSyncEnabled() {
651         return syncOptimization != null && syncOptimization.isTriggeredReSyncEnabled();
652     }
653
654     protected final synchronized boolean isSynchronized() {
655         return syncOptimization != null && syncOptimization.doesLspDbMatch();
656     }
657
658     @Override
659     public final int getDelegatedLspsCount() {
660         return Math.toIntExact(lspData.values().stream()
661             .map(ReportedLsp::getPath).filter(pathList -> pathList != null && !pathList.isEmpty())
662             // pick the first path, as delegate status should be same in each path
663             .map(pathList -> pathList.values().iterator().next())
664             .map(path -> path.augmentation(Path1.class)).filter(Objects::nonNull)
665             .map(LspObject::getLsp).filter(Objects::nonNull)
666             .filter(Lsp::getDelegate)
667             .count());
668     }
669
670     @Override
671     public final boolean isSessionSynchronized() {
672         return synced.get();
673     }
674
675     @Override
676     public final boolean isInitiationCapability() {
677         return initiationCapability.get();
678     }
679
680     @Override
681     public final boolean isStatefulCapability() {
682         return statefulCapability.get();
683     }
684
685     @Override
686     public final boolean isLspUpdateCapability() {
687         return lspUpdateCapability.get();
688     }
689
690
691     @Override
692     public synchronized ListenableFuture<RpcResult<Void>> tearDownSession(final TearDownSessionInput input) {
693         close();
694         return Futures.immediateFuture(RpcResultBuilder.<Void>success().build());
695     }
696
697     static final class MessageContext {
698         private final Collection<PCEPRequest> requests = new ArrayList<>();
699         private final WriteTransaction trans;
700
701         private MessageContext(final WriteTransaction trans) {
702             this.trans = requireNonNull(trans);
703         }
704
705         void resolveRequest(final PCEPRequest req) {
706             requests.add(req);
707         }
708
709         private void notifyRequests() {
710             for (final PCEPRequest r : requests) {
711                 r.done(OperationResults.SUCCESS);
712             }
713         }
714     }
715 }