Fixed small bugs in PCEP. Adjusted configuration.
[bgpcep.git] / pcep / topology-provider / src / main / java / org / opendaylight / bgpcep / pcep / topology / provider / AbstractTopologySessionListener.java
1 /*
2  * Copyright (c) 2013 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.bgpcep.pcep.topology.provider;
9
10 import io.netty.util.concurrent.FutureListener;
11
12 import java.net.InetAddress;
13 import java.util.HashMap;
14 import java.util.Map;
15 import java.util.Map.Entry;
16
17 import javax.annotation.concurrent.GuardedBy;
18
19 import org.opendaylight.controller.md.sal.common.api.TransactionStatus;
20 import org.opendaylight.controller.sal.binding.api.data.DataModificationTransaction;
21 import org.opendaylight.protocol.pcep.PCEPSession;
22 import org.opendaylight.protocol.pcep.PCEPSessionListener;
23 import org.opendaylight.protocol.pcep.PCEPTerminationReason;
24 import org.opendaylight.protocol.pcep.TerminationReason;
25 import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.inet.types.rev100924.IpAddressBuilder;
26 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.pcep.message.rev131007.Pcerr;
27 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.pcep.message.rev131007.PcerrBuilder;
28 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.pcep.types.rev131005.Message;
29 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.pcep.types.rev131005.MessageHeader;
30 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.pcep.types.rev131005.ProtocolVersion;
31 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.pcep.types.rev131005.pcerr.message.PcerrMessageBuilder;
32 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.topology.pcep.rev131024.Node1;
33 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.topology.pcep.rev131024.Node1Builder;
34 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.topology.pcep.rev131024.OperationResult;
35 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.topology.pcep.rev131024.PccSyncState;
36 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.topology.pcep.rev131024.lsp.metadata.Metadata;
37 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.topology.pcep.rev131024.pcep.client.attributes.PathComputationClient;
38 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.topology.pcep.rev131024.pcep.client.attributes.PathComputationClientBuilder;
39 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.topology.pcep.rev131024.pcep.client.attributes.path.computation.client.ReportedLsp;
40 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.topology.pcep.rev131024.pcep.client.attributes.path.computation.client.ReportedLspBuilder;
41 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.topology.pcep.rev131024.pcep.client.attributes.path.computation.client.ReportedLspKey;
42 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.NodeId;
43 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.Topology;
44 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.Node;
45 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.NodeBuilder;
46 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.NodeKey;
47 import org.opendaylight.yangtools.yang.binding.DataContainer;
48 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
49 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier.InstanceIdentifierBuilder;
50 import org.opendaylight.yangtools.yang.common.RpcResult;
51 import org.slf4j.Logger;
52 import org.slf4j.LoggerFactory;
53
54 import com.google.common.base.Preconditions;
55 import com.google.common.util.concurrent.FutureCallback;
56 import com.google.common.util.concurrent.Futures;
57 import com.google.common.util.concurrent.JdkFutureAdapters;
58 import com.google.common.util.concurrent.ListenableFuture;
59
60 public abstract class AbstractTopologySessionListener<SRPID, PLSPID> implements PCEPSessionListener, TopologySessionListener {
61         protected static final MessageHeader MESSAGE_HEADER = new MessageHeader() {
62                 private final ProtocolVersion version = new ProtocolVersion((short) 1);
63
64                 @Override
65                 public Class<? extends DataContainer> getImplementedInterface() {
66                         return MessageHeader.class;
67                 }
68
69                 @Override
70                 public ProtocolVersion getVersion() {
71                         return this.version;
72                 }
73         };
74         private static final Logger LOG = LoggerFactory.getLogger(AbstractTopologySessionListener.class);
75         private static final Pcerr UNHANDLED_MESSAGE_ERROR = new PcerrBuilder().setPcerrMessage(
76                         new PcerrMessageBuilder().setErrorType(null).build()).build();
77
78         // FIXME: make this private
79         protected final ServerSessionManager serverSessionManager;
80
81         private final Map<SRPID, PCEPRequest> waitingRequests = new HashMap<>();
82         private final Map<SRPID, PCEPRequest> sendingRequests = new HashMap<>();
83         private final Map<PLSPID, String> lsps = new HashMap<>();
84         private InstanceIdentifier<Node> topologyNode;
85         private InstanceIdentifier<Node1> topologyAugment;
86         private PathComputationClientBuilder pccBuilder;
87         private Node1Builder topologyAugmentBuilder;
88         private TopologyNodeState nodeState;
89         private boolean ownsTopology = false;
90         private boolean synced = false;
91         private PCEPSession session;
92
93         protected AbstractTopologySessionListener(final ServerSessionManager serverSessionManager) {
94                 this.serverSessionManager = Preconditions.checkNotNull(serverSessionManager);
95         }
96
97         private static String createNodeId(final InetAddress addr) {
98                 return "pcc://" + addr.getHostAddress();
99         }
100
101         private Node topologyNode(final DataModificationTransaction trans, final InetAddress address) {
102                 final String pccId = createNodeId(address);
103                 final Topology topo = (Topology) trans.readOperationalData(this.serverSessionManager.getTopology());
104
105                 for (final Node n : topo.getNode()) {
106                         LOG.debug("Matching topology node {} to id {}", n, pccId);
107                         if (n.getNodeId().getValue().equals(pccId)) {
108                                 this.topologyNode = InstanceIdentifier.builder(this.serverSessionManager.getTopology()).child(Node.class, n.getKey()).toInstance();
109                                 LOG.debug("Reusing topology node {} for id {} at {}", n, pccId, this.topologyNode);
110                                 return n;
111                         }
112                 }
113
114                 /*
115                  * We failed to find a matching node. Let's create a dynamic one
116                  * and note that we are the owner (so we clean it up afterwards).
117                  */
118                 final NodeId id = new NodeId(pccId);
119                 final NodeKey nk = new NodeKey(id);
120                 final InstanceIdentifier<Node> nti = InstanceIdentifier.builder(this.serverSessionManager.getTopology()).child(Node.class, nk).toInstance();
121
122                 final Node ret = new NodeBuilder().setKey(nk).setNodeId(id).build();
123
124                 trans.putOperationalData(nti, ret);
125                 LOG.debug("Created topology node {} for id {} at {}", ret, pccId, nti);
126                 this.ownsTopology = true;
127                 this.topologyNode = nti;
128                 return ret;
129         }
130
131         @Override
132         public final synchronized void onSessionUp(final PCEPSession session) {
133                 /*
134                  * The session went up. Look up the router in Inventory model,
135                  * create it if it is not there (marking that fact for later
136                  * deletion), and mark it as synchronizing. Also create it in
137                  * the topology model, with empty LSP list.
138                  */
139                 final InetAddress peerAddress = session.getRemoteAddress();
140                 final DataModificationTransaction trans = this.serverSessionManager.beginTransaction();
141
142                 final Node topoNode = topologyNode(trans, peerAddress);
143                 LOG.debug("Peer {} resolved to topology node {}", peerAddress, topoNode);
144
145                 // Our augmentation in the topology node
146                 this.synced = false;
147                 this.pccBuilder = new PathComputationClientBuilder();
148                 this.pccBuilder.setIpAddress(IpAddressBuilder.getDefaultInstance(peerAddress.getHostAddress()));
149
150                 onSessionUp(session, this.pccBuilder);
151
152                 this.topologyAugmentBuilder = new Node1Builder().setPathComputationClient(this.pccBuilder.build());
153                 this.topologyAugment = InstanceIdentifier.builder(this.topologyNode).augmentation(Node1.class).toInstance();
154                 final Node1 ta = this.topologyAugmentBuilder.build();
155
156                 trans.putOperationalData(this.topologyAugment, ta);
157                 LOG.debug("Peer data {} set to {}", this.topologyAugment, ta);
158
159                 // All set, commit the modifications
160                 final ListenableFuture<RpcResult<TransactionStatus>> f = JdkFutureAdapters.listenInPoolThread(trans.commit());
161                 Futures.addCallback(f, new FutureCallback<RpcResult<TransactionStatus>>() {
162                         @Override
163                         public void onSuccess(final RpcResult<TransactionStatus> result) {
164                                 LOG.trace("Internal state for session {} updated successfully", session);
165                         }
166
167                         @Override
168                         public void onFailure(final Throwable t) {
169                                 LOG.error("Failed to update internal state for session {}, terminating it", session, t);
170                                 session.close(TerminationReason.Unknown);
171                         }
172                 });
173
174                 this.nodeState = this.serverSessionManager.takeNodeState(topoNode.getNodeId(), this);
175                 this.session = session;
176                 LOG.info("Session with {} attached to topology node {}", session.getRemoteAddress(), topoNode.getNodeId());
177         }
178
179         @GuardedBy("this")
180         private void tearDown(final PCEPSession session) {
181                 this.serverSessionManager.releaseNodeState(this.nodeState);
182                 this.nodeState = null;
183                 this.session = null;
184
185                 final DataModificationTransaction trans = this.serverSessionManager.beginTransaction();
186
187                 // The session went down. Undo all the Topology changes we have done.
188                 trans.removeOperationalData(this.topologyAugment);
189                 if (this.ownsTopology) {
190                         trans.removeOperationalData(this.topologyNode);
191                 }
192
193                 Futures.addCallback(JdkFutureAdapters.listenInPoolThread(trans.commit()), new FutureCallback<RpcResult<TransactionStatus>>() {
194                         @Override
195                         public void onSuccess(final RpcResult<TransactionStatus> result) {
196                                 LOG.trace("Internal state for session {} cleaned up successfully", session);
197                         }
198
199                         @Override
200                         public void onFailure(final Throwable t) {
201                                 LOG.error("Failed to cleanup internal state for session {}", session, t);
202                         }
203                 });
204
205                 // Clear all requests which have not been sent to the peer: they result in cancellation
206                 for (final Entry<SRPID, PCEPRequest> e : this.sendingRequests.entrySet()) {
207                         LOG.debug("Request {} was not sent when session went down, cancelling the instruction", e.getKey());
208                         e.getValue().setResult(OperationResults.UNSENT);
209                 }
210                 this.sendingRequests.clear();
211
212                 // CLear all requests which have not been acked by the peer: they result in failure
213                 for (final Entry<SRPID, PCEPRequest> e : this.waitingRequests.entrySet()) {
214                         LOG.info("Request {} was incomplete when session went down, failing the instruction", e.getKey());
215                         e.getValue().setResult(OperationResults.NOACK);
216                 }
217                 this.waitingRequests.clear();
218         }
219
220         @Override
221         public final synchronized void onSessionDown(final PCEPSession session, final Exception e) {
222                 LOG.warn("Session {} went down unexpectedly", e);
223                 tearDown(session);
224         }
225
226         @Override
227         public final synchronized void onSessionTerminated(final PCEPSession session, final PCEPTerminationReason reason) {
228                 LOG.info("Session {} terminated by peer with reason {}", session, reason);
229                 tearDown(session);
230         }
231
232         @Override
233         public final synchronized void onMessage(final PCEPSession session, final Message message) {
234                 final DataModificationTransaction trans = this.serverSessionManager.beginTransaction();
235
236                 if (onMessage(trans, message)) {
237                         LOG.info("Unhandled message {} on session {}", message, session);
238                         session.sendMessage(UNHANDLED_MESSAGE_ERROR);
239                         return;
240                 }
241
242                 Futures.addCallback(JdkFutureAdapters.listenInPoolThread(trans.commit()), new FutureCallback<RpcResult<TransactionStatus>>() {
243                         @Override
244                         public void onSuccess(final RpcResult<TransactionStatus> result) {
245                                 LOG.trace("Internal state for session {} updated successfully", session);
246                         }
247
248                         @Override
249                         public void onFailure(final Throwable t) {
250                                 LOG.error("Failed to update internal state for session {}, closing it", session, t);
251                                 session.close(TerminationReason.Unknown);
252                         }
253                 });
254         }
255
256         @Override
257         public void close() {
258                 if (this.session != null) {
259                         this.session.close(TerminationReason.Unknown);
260                 }
261         }
262
263         protected InstanceIdentifierBuilder<PathComputationClient> pccIdentifier() {
264                 return InstanceIdentifier.builder(this.topologyAugment).child(PathComputationClient.class);
265         }
266
267         protected final synchronized PCEPRequest removeRequest(final SRPID id) {
268                 return this.waitingRequests.remove(id);
269         }
270
271         private synchronized void messageSendingComplete(final SRPID requestId, final io.netty.util.concurrent.Future<Void> future) {
272                 final PCEPRequest req = this.sendingRequests.remove(requestId);
273
274                 if (future.isSuccess()) {
275                         this.waitingRequests.put(requestId, req);
276                 } else {
277                         LOG.info("Failed to send request {}, instruction cancelled", requestId, future.cause());
278                         req.setResult(OperationResults.UNSENT);
279                 }
280         }
281
282         protected final synchronized ListenableFuture<OperationResult> sendMessage(final Message message, final SRPID requestId,
283                         final Metadata metadata) {
284                 final io.netty.util.concurrent.Future<Void> f = this.session.sendMessage(message);
285                 final PCEPRequest req = new PCEPRequest(metadata);
286
287                 this.sendingRequests.put(requestId, req);
288
289                 f.addListener(new FutureListener<Void>() {
290                         @Override
291                         public void operationComplete(final io.netty.util.concurrent.Future<Void> future) {
292                                 messageSendingComplete(requestId, future);
293                         }
294                 });
295
296                 return req.getFuture();
297         }
298
299         protected final synchronized void updateLsp(final DataModificationTransaction trans, final PLSPID id, String name,
300                         final ReportedLspBuilder rlb, final boolean solicited) {
301                 if (name == null) {
302                         name = this.lsps.get(id);
303                         if (name == null) {
304                                 LOG.error("PLSPID {} seen for the first time, not reporting the LSP", id);
305                                 return;
306                         }
307                 }
308                 this.lsps.put(id, name);
309
310                 Preconditions.checkState(name != null);
311                 rlb.setKey(new ReportedLspKey(name));
312
313                 // If this is an unsolicited update. We need to make sure we retain the metadata already present
314                 if (solicited) {
315                         this.nodeState.setLspMetadata(name, rlb.getMetadata());
316                 } else {
317                         rlb.setMetadata(this.nodeState.getLspMetadata(name));
318                 }
319
320                 trans.putOperationalData(lspIdentifier(name).build(), rlb.build());
321         }
322
323         protected final synchronized void stateSynchronizationAchieved(final DataModificationTransaction trans) {
324                 if (this.synced) {
325                         LOG.debug("State synchronization achieved while synchronized, not updating state");
326                         return;
327                 }
328
329                 // Update synchronization flag
330                 this.synced = true;
331                 this.topologyAugmentBuilder.setPathComputationClient(this.pccBuilder.setStateSync(PccSyncState.Synchronized).build());
332                 final Node1 ta = this.topologyAugmentBuilder.build();
333                 trans.putOperationalData(this.topologyAugment, ta);
334                 LOG.debug("Peer data {} set to {}", this.topologyAugment, ta);
335
336                 // The node has completed synchronization, cleanup metadata no longer reported back
337                 this.nodeState.cleanupExcept(this.lsps.values());
338                 LOG.debug("Session {} achieved synchronized state", this.session);
339         }
340
341         protected final InstanceIdentifierBuilder<ReportedLsp> lspIdentifier(final String name) {
342                 return pccIdentifier().child(ReportedLsp.class, new ReportedLspKey(name));
343         }
344
345         protected final synchronized void removeLsp(final DataModificationTransaction trans, final PLSPID id) {
346                 final String name = this.lsps.remove(id);
347                 if (name != null) {
348                         trans.removeOperationalData(lspIdentifier(name).build());
349                 }
350
351                 LOG.debug("LSP {} removed", name);
352         }
353
354         abstract protected void onSessionUp(PCEPSession session, PathComputationClientBuilder pccBuilder);
355
356         abstract protected boolean onMessage(DataModificationTransaction trans, Message message);
357 }