BUG-592: use new InstanceIdentifier APIs
[bgpcep.git] / pcep / topology-provider / src / main / java / org / opendaylight / bgpcep / pcep / topology / provider / AbstractTopologySessionListener.java
1 /*
2  * Copyright (c) 2013 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.bgpcep.pcep.topology.provider;
9
10 import io.netty.util.concurrent.FutureListener;
11
12 import java.net.InetAddress;
13 import java.util.HashMap;
14 import java.util.Map;
15 import java.util.Map.Entry;
16
17 import javax.annotation.concurrent.GuardedBy;
18
19 import org.opendaylight.controller.md.sal.common.api.TransactionStatus;
20 import org.opendaylight.controller.sal.binding.api.data.DataModificationTransaction;
21 import org.opendaylight.protocol.pcep.PCEPSession;
22 import org.opendaylight.protocol.pcep.PCEPSessionListener;
23 import org.opendaylight.protocol.pcep.PCEPTerminationReason;
24 import org.opendaylight.protocol.pcep.TerminationReason;
25 import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.inet.types.rev100924.IpAddressBuilder;
26 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.pcep.types.rev131005.Message;
27 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.pcep.types.rev131005.MessageHeader;
28 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.pcep.types.rev131005.ProtocolVersion;
29 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.topology.pcep.rev131024.Node1;
30 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.topology.pcep.rev131024.Node1Builder;
31 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.topology.pcep.rev131024.OperationResult;
32 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.topology.pcep.rev131024.PccSyncState;
33 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.topology.pcep.rev131024.lsp.metadata.Metadata;
34 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.topology.pcep.rev131024.pcep.client.attributes.PathComputationClient;
35 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.topology.pcep.rev131024.pcep.client.attributes.PathComputationClientBuilder;
36 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.topology.pcep.rev131024.pcep.client.attributes.path.computation.client.ReportedLsp;
37 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.topology.pcep.rev131024.pcep.client.attributes.path.computation.client.ReportedLspBuilder;
38 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.topology.pcep.rev131024.pcep.client.attributes.path.computation.client.ReportedLspKey;
39 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.NodeId;
40 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.Topology;
41 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.Node;
42 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.NodeBuilder;
43 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.NodeKey;
44 import org.opendaylight.yangtools.yang.binding.DataContainer;
45 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
46 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier.InstanceIdentifierBuilder;
47 import org.opendaylight.yangtools.yang.common.RpcResult;
48 import org.slf4j.Logger;
49 import org.slf4j.LoggerFactory;
50
51 import com.google.common.base.Preconditions;
52 import com.google.common.collect.Lists;
53 import com.google.common.util.concurrent.FutureCallback;
54 import com.google.common.util.concurrent.Futures;
55 import com.google.common.util.concurrent.JdkFutureAdapters;
56 import com.google.common.util.concurrent.ListenableFuture;
57
58 public abstract class AbstractTopologySessionListener<SRPID, PLSPID> implements PCEPSessionListener, TopologySessionListener {
59         protected static final MessageHeader MESSAGE_HEADER = new MessageHeader() {
60                 private final ProtocolVersion version = new ProtocolVersion((short) 1);
61
62                 @Override
63                 public Class<? extends DataContainer> getImplementedInterface() {
64                         return MessageHeader.class;
65                 }
66
67                 @Override
68                 public ProtocolVersion getVersion() {
69                         return this.version;
70                 }
71         };
72         private static final Logger LOG = LoggerFactory.getLogger(AbstractTopologySessionListener.class);
73
74         // FIXME: make this private
75         protected final ServerSessionManager serverSessionManager;
76
77         private final Map<SRPID, PCEPRequest> waitingRequests = new HashMap<>();
78         private final Map<SRPID, PCEPRequest> sendingRequests = new HashMap<>();
79         private final Map<String, ReportedLsp> lspData = new HashMap<>();
80         private final Map<PLSPID, String> lsps = new HashMap<>();
81         private InstanceIdentifier<Node> topologyNode;
82         private InstanceIdentifier<Node1> topologyAugment;
83         private PathComputationClientBuilder pccBuilder;
84         private Node1Builder topologyAugmentBuilder;
85         private TopologyNodeState nodeState;
86         private boolean ownsTopology = false;
87         private boolean synced = false, dirty;
88         private PCEPSession session;
89
90         protected AbstractTopologySessionListener(final ServerSessionManager serverSessionManager) {
91                 this.serverSessionManager = Preconditions.checkNotNull(serverSessionManager);
92         }
93
94         private static String createNodeId(final InetAddress addr) {
95                 return "pcc://" + addr.getHostAddress();
96         }
97
98         private Node topologyNode(final DataModificationTransaction trans, final InetAddress address) {
99                 final String pccId = createNodeId(address);
100                 final Topology topo = (Topology) trans.readOperationalData(this.serverSessionManager.getTopology());
101
102                 for (final Node n : topo.getNode()) {
103                         LOG.debug("Matching topology node {} to id {}", n, pccId);
104                         if (n.getNodeId().getValue().equals(pccId)) {
105                                 this.topologyNode = this.serverSessionManager.getTopology().child(Node.class, n.getKey());
106                                 LOG.debug("Reusing topology node {} for id {} at {}", n, pccId, this.topologyNode);
107                                 return n;
108                         }
109                 }
110
111                 /*
112                  * We failed to find a matching node. Let's create a dynamic one
113                  * and note that we are the owner (so we clean it up afterwards).
114                  */
115                 final NodeId id = new NodeId(pccId);
116                 final NodeKey nk = new NodeKey(id);
117                 final InstanceIdentifier<Node> nti = this.serverSessionManager.getTopology().child(Node.class, nk);
118
119                 final Node ret = new NodeBuilder().setKey(nk).setNodeId(id).build();
120
121                 trans.putOperationalData(nti, ret);
122                 LOG.debug("Created topology node {} for id {} at {}", ret, pccId, nti);
123                 this.ownsTopology = true;
124                 this.topologyNode = nti;
125                 return ret;
126         }
127
128         @Override
129         public final synchronized void onSessionUp(final PCEPSession session) {
130                 /*
131                  * The session went up. Look up the router in Inventory model,
132                  * create it if it is not there (marking that fact for later
133                  * deletion), and mark it as synchronizing. Also create it in
134                  * the topology model, with empty LSP list.
135                  */
136                 final InetAddress peerAddress = session.getRemoteAddress();
137                 final DataModificationTransaction trans = this.serverSessionManager.beginTransaction();
138
139                 final Node topoNode = topologyNode(trans, peerAddress);
140                 LOG.debug("Peer {} resolved to topology node {}", peerAddress, topoNode);
141
142                 // Our augmentation in the topology node
143                 this.synced = false;
144                 this.pccBuilder = new PathComputationClientBuilder();
145                 this.pccBuilder.setIpAddress(IpAddressBuilder.getDefaultInstance(peerAddress.getHostAddress()));
146
147                 onSessionUp(session, this.pccBuilder);
148
149                 this.topologyAugmentBuilder = new Node1Builder().setPathComputationClient(this.pccBuilder.build());
150                 this.topologyAugment = this.topologyNode.augmentation(Node1.class);
151                 final Node1 ta = this.topologyAugmentBuilder.build();
152
153                 trans.putOperationalData(this.topologyAugment, ta);
154                 LOG.debug("Peer data {} set to {}", this.topologyAugment, ta);
155
156                 // All set, commit the modifications
157                 final ListenableFuture<RpcResult<TransactionStatus>> f = JdkFutureAdapters.listenInPoolThread(trans.commit());
158                 Futures.addCallback(f, new FutureCallback<RpcResult<TransactionStatus>>() {
159                         @Override
160                         public void onSuccess(final RpcResult<TransactionStatus> result) {
161                                 LOG.trace("Internal state for session {} updated successfully", session);
162                         }
163
164                         @Override
165                         public void onFailure(final Throwable t) {
166                                 LOG.error("Failed to update internal state for session {}, terminating it", session, t);
167                                 session.close(TerminationReason.Unknown);
168                         }
169                 });
170
171                 this.nodeState = this.serverSessionManager.takeNodeState(topoNode.getNodeId(), this);
172                 this.session = session;
173                 LOG.info("Session with {} attached to topology node {}", session.getRemoteAddress(), topoNode.getNodeId());
174         }
175
176         @GuardedBy("this")
177         private void tearDown(final PCEPSession session) {
178                 this.serverSessionManager.releaseNodeState(this.nodeState);
179                 this.nodeState = null;
180                 this.session = null;
181
182                 final DataModificationTransaction trans = this.serverSessionManager.beginTransaction();
183
184                 // The session went down. Undo all the Topology changes we have done.
185                 trans.removeOperationalData(this.topologyAugment);
186                 if (this.ownsTopology) {
187                         trans.removeOperationalData(this.topologyNode);
188                 }
189
190                 Futures.addCallback(JdkFutureAdapters.listenInPoolThread(trans.commit()), new FutureCallback<RpcResult<TransactionStatus>>() {
191                         @Override
192                         public void onSuccess(final RpcResult<TransactionStatus> result) {
193                                 LOG.trace("Internal state for session {} cleaned up successfully", session);
194                         }
195
196                         @Override
197                         public void onFailure(final Throwable t) {
198                                 LOG.error("Failed to cleanup internal state for session {}", session, t);
199                         }
200                 });
201
202                 // Clear all requests which have not been sent to the peer: they result in cancellation
203                 for (final Entry<SRPID, PCEPRequest> e : this.sendingRequests.entrySet()) {
204                         LOG.debug("Request {} was not sent when session went down, cancelling the instruction", e.getKey());
205                         e.getValue().setResult(OperationResults.UNSENT);
206                 }
207                 this.sendingRequests.clear();
208
209                 // CLear all requests which have not been acked by the peer: they result in failure
210                 for (final Entry<SRPID, PCEPRequest> e : this.waitingRequests.entrySet()) {
211                         LOG.info("Request {} was incomplete when session went down, failing the instruction", e.getKey());
212                         e.getValue().setResult(OperationResults.NOACK);
213                 }
214                 this.waitingRequests.clear();
215         }
216
217         @Override
218         public final synchronized void onSessionDown(final PCEPSession session, final Exception e) {
219                 LOG.warn("Session {} went down unexpectedly", e);
220                 tearDown(session);
221         }
222
223         @Override
224         public final synchronized void onSessionTerminated(final PCEPSession session, final PCEPTerminationReason reason) {
225                 LOG.info("Session {} terminated by peer with reason {}", session, reason);
226                 tearDown(session);
227         }
228
229         @Override
230         public final synchronized void onMessage(final PCEPSession session, final Message message) {
231                 final DataModificationTransaction trans = this.serverSessionManager.beginTransaction();
232
233                 dirty = false;
234
235                 if (onMessage(trans, message)) {
236                         LOG.info("Unhandled message {} on session {}", message, session);
237                         return;
238                 }
239
240                 if (dirty) {
241                         LOG.debug("Internal state changed, forcing sync");
242                         this.pccBuilder.setReportedLsp(Lists.newArrayList(lspData.values()));
243                         this.topologyAugmentBuilder.setPathComputationClient(this.pccBuilder.build());
244                         final Node1 ta = this.topologyAugmentBuilder.build();
245
246                         //trans.removeOperationalData(this.topologyAugment);
247                         trans.putOperationalData(this.topologyAugment, ta);
248                         LOG.debug("Peer data {} set to {}", this.topologyAugment, ta);
249                         dirty = false;
250                 } else {
251                         LOG.debug("State has not changed, skipping sync");
252                 }
253
254                 Futures.addCallback(JdkFutureAdapters.listenInPoolThread(trans.commit()), new FutureCallback<RpcResult<TransactionStatus>>() {
255                         @Override
256                         public void onSuccess(final RpcResult<TransactionStatus> result) {
257                                 LOG.trace("Internal state for session {} updated successfully", session);
258                         }
259
260                         @Override
261                         public void onFailure(final Throwable t) {
262                                 LOG.error("Failed to update internal state for session {}, closing it", session, t);
263                                 session.close(TerminationReason.Unknown);
264                         }
265                 });
266         }
267
268         @Override
269         public void close() {
270                 if (this.session != null) {
271                         this.session.close(TerminationReason.Unknown);
272                 }
273         }
274
275         protected InstanceIdentifierBuilder<PathComputationClient> pccIdentifier() {
276                 return this.topologyAugment.builder().child(PathComputationClient.class);
277         }
278
279         protected final synchronized PCEPRequest removeRequest(final SRPID id) {
280                 return this.waitingRequests.remove(id);
281         }
282
283         private synchronized void messageSendingComplete(final SRPID requestId, final io.netty.util.concurrent.Future<Void> future) {
284                 final PCEPRequest req = this.sendingRequests.remove(requestId);
285
286                 if (future.isSuccess()) {
287                         this.waitingRequests.put(requestId, req);
288                 } else {
289                         LOG.info("Failed to send request {}, instruction cancelled", requestId, future.cause());
290                         req.setResult(OperationResults.UNSENT);
291                 }
292         }
293
294         protected final synchronized ListenableFuture<OperationResult> sendMessage(final Message message, final SRPID requestId,
295                         final Metadata metadata) {
296                 final io.netty.util.concurrent.Future<Void> f = this.session.sendMessage(message);
297                 final PCEPRequest req = new PCEPRequest(metadata);
298
299                 this.sendingRequests.put(requestId, req);
300
301                 f.addListener(new FutureListener<Void>() {
302                         @Override
303                         public void operationComplete(final io.netty.util.concurrent.Future<Void> future) {
304                                 messageSendingComplete(requestId, future);
305                         }
306                 });
307
308                 return req.getFuture();
309         }
310
311         protected final synchronized void updateLsp(final DataModificationTransaction trans, final PLSPID id, String name,
312                         final ReportedLspBuilder rlb, final boolean solicited) {
313                 if (name == null) {
314                         name = this.lsps.get(id);
315                         if (name == null) {
316                                 LOG.error("PLSPID {} seen for the first time, not reporting the LSP", id);
317                                 return;
318                         }
319                 }
320                 LOG.debug("Saved LSP {} with name {}", id, name);
321                 this.lsps.put(id, name);
322
323                 Preconditions.checkState(name != null);
324                 rlb.setKey(new ReportedLspKey(name));
325                 rlb.setName(name);
326
327                 // If this is an unsolicited update. We need to make sure we retain the metadata already present
328                 if (solicited) {
329                         this.nodeState.setLspMetadata(name, rlb.getMetadata());
330                 } else {
331                         rlb.setMetadata(this.nodeState.getLspMetadata(name));
332                 }
333
334                 LOG.debug("LSP {} forcing update to MD-SAL", name);
335                 dirty = true;
336                 lspData .put(name, rlb.build());
337         }
338
339         protected final synchronized void stateSynchronizationAchieved(final DataModificationTransaction trans) {
340                 if (this.synced) {
341                         LOG.debug("State synchronization achieved while synchronized, not updating state");
342                         return;
343                 }
344
345                 // Update synchronization flag
346                 this.synced = true;
347                 this.pccBuilder.setStateSync(PccSyncState.Synchronized).build();
348                 this.dirty = true;
349
350                 // The node has completed synchronization, cleanup metadata no longer reported back
351                 this.nodeState.cleanupExcept(this.lsps.values());
352                 LOG.debug("Session {} achieved synchronized state", this.session);
353         }
354
355         protected final InstanceIdentifierBuilder<ReportedLsp> lspIdentifier(final String name) {
356                 return pccIdentifier().child(ReportedLsp.class, new ReportedLspKey(name));
357         }
358
359         protected final synchronized void removeLsp(final DataModificationTransaction trans, final PLSPID id) {
360                 final String name = this.lsps.remove(id);
361                 dirty = true;
362                 LOG.debug("LSP {} removed", name);
363                 lspData.remove(name);
364         }
365
366         abstract protected void onSessionUp(PCEPSession session, PathComputationClientBuilder pccBuilder);
367
368         abstract protected boolean onMessage(DataModificationTransaction trans, Message message);
369
370         protected String lookupLspName(final PLSPID id) {
371                 Preconditions.checkNotNull(id, "ID parameter null.");
372                 return this.lsps.get(id);
373         }
374 }