Removed checkstyle warnings.
[bgpcep.git] / pcep / topology-provider / src / main / java / org / opendaylight / bgpcep / pcep / topology / provider / AbstractTopologySessionListener.java
1 /*
2  * Copyright (c) 2013 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.bgpcep.pcep.topology.provider;
9
10 import com.google.common.base.Preconditions;
11 import com.google.common.collect.Lists;
12 import com.google.common.util.concurrent.FutureCallback;
13 import com.google.common.util.concurrent.Futures;
14 import com.google.common.util.concurrent.JdkFutureAdapters;
15 import com.google.common.util.concurrent.ListenableFuture;
16
17 import io.netty.util.concurrent.FutureListener;
18
19 import java.net.InetAddress;
20 import java.util.HashMap;
21 import java.util.Map;
22 import java.util.Map.Entry;
23
24 import javax.annotation.concurrent.GuardedBy;
25
26 import org.opendaylight.controller.md.sal.common.api.TransactionStatus;
27 import org.opendaylight.controller.sal.binding.api.data.DataModificationTransaction;
28 import org.opendaylight.protocol.pcep.PCEPSession;
29 import org.opendaylight.protocol.pcep.PCEPSessionListener;
30 import org.opendaylight.protocol.pcep.PCEPTerminationReason;
31 import org.opendaylight.protocol.pcep.TerminationReason;
32 import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.inet.types.rev100924.IpAddressBuilder;
33 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.pcep.types.rev131005.Message;
34 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.pcep.types.rev131005.MessageHeader;
35 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.pcep.types.rev131005.ProtocolVersion;
36 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.topology.pcep.rev131024.Node1;
37 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.topology.pcep.rev131024.Node1Builder;
38 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.topology.pcep.rev131024.OperationResult;
39 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.topology.pcep.rev131024.PccSyncState;
40 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.topology.pcep.rev131024.lsp.metadata.Metadata;
41 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.topology.pcep.rev131024.pcep.client.attributes.PathComputationClient;
42 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.topology.pcep.rev131024.pcep.client.attributes.PathComputationClientBuilder;
43 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.topology.pcep.rev131024.pcep.client.attributes.path.computation.client.ReportedLsp;
44 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.topology.pcep.rev131024.pcep.client.attributes.path.computation.client.ReportedLspBuilder;
45 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.topology.pcep.rev131024.pcep.client.attributes.path.computation.client.ReportedLspKey;
46 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.NodeId;
47 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.Topology;
48 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.Node;
49 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.NodeBuilder;
50 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.NodeKey;
51 import org.opendaylight.yangtools.yang.binding.DataContainer;
52 import org.opendaylight.yangtools.yang.binding.DataObject;
53 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
54 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier.InstanceIdentifierBuilder;
55 import org.opendaylight.yangtools.yang.common.RpcResult;
56 import org.slf4j.Logger;
57 import org.slf4j.LoggerFactory;
58
59 /**
60  * Base class for PCEP topology providers. It handles the common tasks involved in managing a PCEP server (PCE)
61  * endpoint, and exposing a network topology based on it. It needs to be subclassed to form a fully functional block,
62  * where the subclass provides handling of incoming messages.
63  *
64  * @param <S> identifier type of requests
65  * @param <L> identifier type for LSPs
66  */
67 public abstract class AbstractTopologySessionListener<S, L> implements PCEPSessionListener, TopologySessionListener {
68     protected static final MessageHeader MESSAGE_HEADER = new MessageHeader() {
69         private final ProtocolVersion version = new ProtocolVersion((short) 1);
70
71         @Override
72         public Class<? extends DataContainer> getImplementedInterface() {
73             return MessageHeader.class;
74         }
75
76         @Override
77         public ProtocolVersion getVersion() {
78             return this.version;
79         }
80     };
81     private static final Logger LOG = LoggerFactory.getLogger(AbstractTopologySessionListener.class);
82
83     private final Map<S, PCEPRequest> waitingRequests = new HashMap<>();
84     private final Map<S, PCEPRequest> sendingRequests = new HashMap<>();
85     private final Map<String, ReportedLsp> lspData = new HashMap<>();
86     private final Map<L, String> lsps = new HashMap<>();
87     private final ServerSessionManager serverSessionManager;
88     private InstanceIdentifier<Node> topologyNode;
89     private InstanceIdentifier<Node1> topologyAugment;
90     private PathComputationClientBuilder pccBuilder;
91     private Node1Builder topologyAugmentBuilder;
92     private TopologyNodeState nodeState;
93     private boolean ownsTopology = false;
94     private boolean synced = false, dirty;
95     private PCEPSession session;
96
97     protected AbstractTopologySessionListener(final ServerSessionManager serverSessionManager) {
98         this.serverSessionManager = Preconditions.checkNotNull(serverSessionManager);
99     }
100
101     private static String createNodeId(final InetAddress addr) {
102         return "pcc://" + addr.getHostAddress();
103     }
104
105     private Node topologyNode(final DataModificationTransaction trans, final InetAddress address) {
106         final String pccId = createNodeId(address);
107         final Topology topo = (Topology) trans.readOperationalData(this.serverSessionManager.getTopology());
108
109         for (final Node n : topo.getNode()) {
110             LOG.debug("Matching topology node {} to id {}", n, pccId);
111             if (n.getNodeId().getValue().equals(pccId)) {
112                 this.topologyNode = this.serverSessionManager.getTopology().child(Node.class, n.getKey());
113                 LOG.debug("Reusing topology node {} for id {} at {}", n, pccId, this.topologyNode);
114                 return n;
115             }
116         }
117
118         /*
119          * We failed to find a matching node. Let's create a dynamic one
120          * and note that we are the owner (so we clean it up afterwards).
121          */
122         final NodeId id = new NodeId(pccId);
123         final NodeKey nk = new NodeKey(id);
124         final InstanceIdentifier<Node> nti = this.serverSessionManager.getTopology().child(Node.class, nk);
125
126         final Node ret = new NodeBuilder().setKey(nk).setNodeId(id).build();
127
128         trans.putOperationalData(nti, ret);
129         LOG.debug("Created topology node {} for id {} at {}", ret, pccId, nti);
130         this.ownsTopology = true;
131         this.topologyNode = nti;
132         return ret;
133     }
134
135     @Override
136     public final synchronized void onSessionUp(final PCEPSession session) {
137         /*
138          * The session went up. Look up the router in Inventory model,
139          * create it if it is not there (marking that fact for later
140          * deletion), and mark it as synchronizing. Also create it in
141          * the topology model, with empty LSP list.
142          */
143         final InetAddress peerAddress = session.getRemoteAddress();
144         final DataModificationTransaction trans = this.serverSessionManager.beginTransaction();
145
146         final Node topoNode = topologyNode(trans, peerAddress);
147         LOG.debug("Peer {} resolved to topology node {}", peerAddress, topoNode);
148
149         // Our augmentation in the topology node
150         this.synced = false;
151         this.pccBuilder = new PathComputationClientBuilder();
152         this.pccBuilder.setIpAddress(IpAddressBuilder.getDefaultInstance(peerAddress.getHostAddress()));
153
154         onSessionUp(session, this.pccBuilder);
155
156         this.topologyAugmentBuilder = new Node1Builder().setPathComputationClient(this.pccBuilder.build());
157         this.topologyAugment = this.topologyNode.augmentation(Node1.class);
158         final Node1 ta = this.topologyAugmentBuilder.build();
159
160         trans.putOperationalData(this.topologyAugment, ta);
161         LOG.debug("Peer data {} set to {}", this.topologyAugment, ta);
162
163         // All set, commit the modifications
164         final ListenableFuture<RpcResult<TransactionStatus>> f = JdkFutureAdapters.listenInPoolThread(trans.commit());
165         Futures.addCallback(f, new FutureCallback<RpcResult<TransactionStatus>>() {
166             @Override
167             public void onSuccess(final RpcResult<TransactionStatus> result) {
168                 LOG.trace("Internal state for session {} updated successfully", session);
169             }
170
171             @Override
172             public void onFailure(final Throwable t) {
173                 LOG.error("Failed to update internal state for session {}, terminating it", session, t);
174                 session.close(TerminationReason.Unknown);
175             }
176         });
177
178         this.nodeState = this.serverSessionManager.takeNodeState(topoNode.getNodeId(), this);
179         this.session = session;
180         LOG.info("Session with {} attached to topology node {}", session.getRemoteAddress(), topoNode.getNodeId());
181     }
182
183     @GuardedBy("this")
184     private void tearDown(final PCEPSession session) {
185         this.serverSessionManager.releaseNodeState(this.nodeState);
186         this.nodeState = null;
187         this.session = null;
188
189         final DataModificationTransaction trans = this.serverSessionManager.beginTransaction();
190
191         // The session went down. Undo all the Topology changes we have done.
192         trans.removeOperationalData(this.topologyAugment);
193         if (this.ownsTopology) {
194             trans.removeOperationalData(this.topologyNode);
195         }
196
197         Futures.addCallback(JdkFutureAdapters.listenInPoolThread(trans.commit()), new FutureCallback<RpcResult<TransactionStatus>>() {
198             @Override
199             public void onSuccess(final RpcResult<TransactionStatus> result) {
200                 LOG.trace("Internal state for session {} cleaned up successfully", session);
201             }
202
203             @Override
204             public void onFailure(final Throwable t) {
205                 LOG.error("Failed to cleanup internal state for session {}", session, t);
206             }
207         });
208
209         // Clear all requests which have not been sent to the peer: they result in cancellation
210         for (final Entry<S, PCEPRequest> e : this.sendingRequests.entrySet()) {
211             LOG.debug("Request {} was not sent when session went down, cancelling the instruction", e.getKey());
212             e.getValue().setResult(OperationResults.UNSENT);
213         }
214         this.sendingRequests.clear();
215
216         // CLear all requests which have not been acked by the peer: they result in failure
217         for (final Entry<S, PCEPRequest> e : this.waitingRequests.entrySet()) {
218             LOG.info("Request {} was incomplete when session went down, failing the instruction", e.getKey());
219             e.getValue().setResult(OperationResults.NOACK);
220         }
221         this.waitingRequests.clear();
222     }
223
224     @Override
225     public final synchronized void onSessionDown(final PCEPSession session, final Exception e) {
226         LOG.warn("Session {} went down unexpectedly", session, e);
227         tearDown(session);
228     }
229
230     @Override
231     public final synchronized void onSessionTerminated(final PCEPSession session, final PCEPTerminationReason reason) {
232         LOG.info("Session {} terminated by peer with reason {}", session, reason);
233         tearDown(session);
234     }
235
236     @Override
237     public final synchronized void onMessage(final PCEPSession session, final Message message) {
238         final DataModificationTransaction trans = this.serverSessionManager.beginTransaction();
239
240         this.dirty = false;
241
242         if (onMessage(trans, message)) {
243             LOG.info("Unhandled message {} on session {}", message, session);
244             return;
245         }
246
247         if (this.dirty) {
248             LOG.debug("Internal state changed, forcing sync");
249             this.pccBuilder.setReportedLsp(Lists.newArrayList(this.lspData.values()));
250             this.topologyAugmentBuilder.setPathComputationClient(this.pccBuilder.build());
251             final Node1 ta = this.topologyAugmentBuilder.build();
252
253             trans.removeOperationalData(this.topologyAugment);
254             trans.putOperationalData(this.topologyAugment, ta);
255             LOG.debug("Peer data {} set to {}", this.topologyAugment, ta);
256             this.dirty = false;
257         } else {
258             LOG.debug("State has not changed, skipping sync");
259         }
260
261         Futures.addCallback(JdkFutureAdapters.listenInPoolThread(trans.commit()), new FutureCallback<RpcResult<TransactionStatus>>() {
262             @Override
263             public void onSuccess(final RpcResult<TransactionStatus> result) {
264                 LOG.trace("Internal state for session {} updated successfully", session);
265             }
266
267             @Override
268             public void onFailure(final Throwable t) {
269                 LOG.error("Failed to update internal state for session {}, closing it", session, t);
270                 session.close(TerminationReason.Unknown);
271             }
272         });
273     }
274
275     @Override
276     public void close() {
277         if (this.session != null) {
278             this.session.close(TerminationReason.Unknown);
279         }
280     }
281
282     protected InstanceIdentifierBuilder<PathComputationClient> pccIdentifier() {
283         return this.topologyAugment.builder().child(PathComputationClient.class);
284     }
285
286     protected final synchronized PCEPRequest removeRequest(final S id) {
287         return this.waitingRequests.remove(id);
288     }
289
290     private synchronized void messageSendingComplete(final S requestId, final io.netty.util.concurrent.Future<Void> future) {
291         final PCEPRequest req = this.sendingRequests.remove(requestId);
292
293         if (future.isSuccess()) {
294             this.waitingRequests.put(requestId, req);
295         } else {
296             LOG.info("Failed to send request {}, instruction cancelled", requestId, future.cause());
297             req.setResult(OperationResults.UNSENT);
298         }
299     }
300
301     protected final synchronized ListenableFuture<OperationResult> sendMessage(final Message message, final S requestId,
302             final Metadata metadata) {
303         final io.netty.util.concurrent.Future<Void> f = this.session.sendMessage(message);
304         final PCEPRequest req = new PCEPRequest(metadata);
305
306         this.sendingRequests.put(requestId, req);
307
308         f.addListener(new FutureListener<Void>() {
309             @Override
310             public void operationComplete(final io.netty.util.concurrent.Future<Void> future) {
311                 messageSendingComplete(requestId, future);
312             }
313         });
314
315         return req.getFuture();
316     }
317
318     protected final synchronized void updateLsp(final DataModificationTransaction trans, final L id, final String lspName,
319             final ReportedLspBuilder rlb, final boolean solicited) {
320
321         final String name;
322         if (lspName == null) {
323             name = this.lsps.get(id);
324             if (name == null) {
325                 LOG.error("PLSPID {} seen for the first time, not reporting the LSP", id);
326                 return;
327             }
328         } else {
329             name = lspName;
330         }
331
332         LOG.debug("Saved LSP {} with name {}", id, name);
333         this.lsps.put(id, name);
334
335         Preconditions.checkState(name != null);
336         rlb.setKey(new ReportedLspKey(name));
337         rlb.setName(name);
338
339         // If this is an unsolicited update. We need to make sure we retain the metadata already present
340         if (solicited) {
341             this.nodeState.setLspMetadata(name, rlb.getMetadata());
342         } else {
343             rlb.setMetadata(this.nodeState.getLspMetadata(name));
344         }
345
346         LOG.debug("LSP {} forcing update to MD-SAL", name);
347         this.dirty = true;
348         this.lspData.put(name, rlb.build());
349     }
350
351     protected final synchronized void stateSynchronizationAchieved(final DataModificationTransaction trans) {
352         if (this.synced) {
353             LOG.debug("State synchronization achieved while synchronized, not updating state");
354             return;
355         }
356
357         // Update synchronization flag
358         this.synced = true;
359         this.pccBuilder.setStateSync(PccSyncState.Synchronized).build();
360         this.dirty = true;
361
362         // The node has completed synchronization, cleanup metadata no longer reported back
363         this.nodeState.cleanupExcept(this.lsps.values());
364         LOG.debug("Session {} achieved synchronized state", this.session);
365     }
366
367     protected final InstanceIdentifierBuilder<ReportedLsp> lspIdentifier(final String name) {
368         return pccIdentifier().child(ReportedLsp.class, new ReportedLspKey(name));
369     }
370
371     protected final synchronized void removeLsp(final DataModificationTransaction trans, final L id) {
372         final String name = this.lsps.remove(id);
373         this.dirty = true;
374         LOG.debug("LSP {} removed", name);
375         this.lspData.remove(name);
376     }
377
378     protected abstract void onSessionUp(PCEPSession session, PathComputationClientBuilder pccBuilder);
379
380     protected abstract boolean onMessage(DataModificationTransaction trans, Message message);
381
382     protected String lookupLspName(final L id) {
383         Preconditions.checkNotNull(id, "ID parameter null.");
384         return this.lsps.get(id);
385     }
386
387     protected final <T extends DataObject> T readOperationalData(final InstanceIdentifier<T> id) {
388         return this.serverSessionManager.readOperationalData(id);
389     }
390 }