import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
-import com.google.common.collect.Lists;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
-
import io.netty.util.concurrent.FutureListener;
-
import java.net.InetAddress;
import java.util.ArrayList;
+import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
-import java.util.concurrent.ExecutionException;
+import java.util.Timer;
+import java.util.TimerTask;
+import java.util.concurrent.TimeUnit;
+
import javax.annotation.concurrent.GuardedBy;
+import org.opendaylight.controller.config.yang.pcep.topology.provider.ListenerStateRuntimeMXBean;
+import org.opendaylight.controller.config.yang.pcep.topology.provider.ListenerStateRuntimeRegistration;
+import org.opendaylight.controller.config.yang.pcep.topology.provider.PeerCapabilities;
+import org.opendaylight.controller.config.yang.pcep.topology.provider.ReplyTime;
+import org.opendaylight.controller.config.yang.pcep.topology.provider.SessionState;
+import org.opendaylight.controller.config.yang.pcep.topology.provider.StatefulMessages;
import org.opendaylight.controller.md.sal.binding.api.ReadWriteTransaction;
import org.opendaylight.controller.md.sal.binding.api.WriteTransaction;
-import org.opendaylight.controller.md.sal.common.api.TransactionStatus;
import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
import org.opendaylight.protocol.pcep.PCEPSession;
import org.opendaylight.protocol.pcep.PCEPSessionListener;
import org.opendaylight.protocol.pcep.PCEPTerminationReason;
import org.opendaylight.protocol.pcep.TerminationReason;
-import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.inet.types.rev100924.IpAddressBuilder;
+import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.inet.types.rev130715.IpAddressBuilder;
import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.pcep.types.rev131005.Message;
import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.pcep.types.rev131005.MessageHeader;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.pcep.types.rev131005.Object;
import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.pcep.types.rev131005.ProtocolVersion;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.rsvp.rev130820.LspId;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.topology.pcep.rev131024.LspId;
import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.topology.pcep.rev131024.Node1;
import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.topology.pcep.rev131024.Node1Builder;
import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.topology.pcep.rev131024.OperationResult;
import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.topology.pcep.rev131024.pcep.client.attributes.path.computation.client.ReportedLspBuilder;
import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.topology.pcep.rev131024.pcep.client.attributes.path.computation.client.ReportedLspKey;
import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.topology.pcep.rev131024.pcep.client.attributes.path.computation.client.reported.lsp.Path;
-import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.NodeId;
-import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.Topology;
import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.Node;
-import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.NodeBuilder;
-import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.NodeKey;
import org.opendaylight.yangtools.yang.binding.DataContainer;
import org.opendaylight.yangtools.yang.binding.DataObject;
import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
-import org.opendaylight.yangtools.yang.binding.InstanceIdentifier.InstanceIdentifierBuilder;
-import org.opendaylight.yangtools.yang.common.RpcResult;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
* @param <S> identifier type of requests
* @param <L> identifier type for LSPs
*/
-public abstract class AbstractTopologySessionListener<S, L> implements PCEPSessionListener, TopologySessionListener {
+public abstract class AbstractTopologySessionListener<S, L> implements PCEPSessionListener, TopologySessionListener, ListenerStateRuntimeMXBean {
+ protected static final class MessageContext {
+ private final Collection<PCEPRequest> requests = new ArrayList<>();
+ private final WriteTransaction trans;
+
+ private MessageContext(final WriteTransaction trans) {
+ this.trans = Preconditions.checkNotNull(trans);
+ }
+
+ void resolveRequest(final PCEPRequest req) {
+ this.requests.add(req);
+ }
+
+ private void notifyRequests() {
+ for (final PCEPRequest r : this.requests) {
+ r.done(OperationResults.SUCCESS);
+ }
+ }
+ }
+
protected static final MessageHeader MESSAGE_HEADER = new MessageHeader() {
private final ProtocolVersion version = new ProtocolVersion((short) 1);
return this.version;
}
};
+
private static final Logger LOG = LoggerFactory.getLogger(AbstractTopologySessionListener.class);
- private final Map<S, PCEPRequest> waitingRequests = new HashMap<>();
- private final Map<S, PCEPRequest> sendingRequests = new HashMap<>();
+ protected static final String MISSING_XML_TAG = "Mandatory XML tags are missing.";
+
+ @GuardedBy("this")
+ private final Map<S, PCEPRequest> requests = new HashMap<>();
+
+ @GuardedBy("this")
private final Map<String, ReportedLsp> lspData = new HashMap<>();
- private final Map<L, String> lsps = new HashMap<>();
+
+ @GuardedBy("this")
+ protected final Map<L, String> lsps = new HashMap<>();
+
private final ServerSessionManager serverSessionManager;
- private InstanceIdentifier<Node> topologyNode;
- private InstanceIdentifier<Node1> topologyAugment;
- private PathComputationClientBuilder pccBuilder;
- private Node1Builder topologyAugmentBuilder;
+ private InstanceIdentifier<PathComputationClient> pccIdentifier;
private TopologyNodeState nodeState;
- private boolean ownsTopology = false;
- private boolean synced = false, dirty;
+ private boolean synced = false;
private PCEPSession session;
+ private SyncOptimization syncOptimization;
+ private boolean triggeredResyncInProcess;
+
+ private ListenerStateRuntimeRegistration registration;
+ private final SessionListenerState listenerState;
protected AbstractTopologySessionListener(final ServerSessionManager serverSessionManager) {
this.serverSessionManager = Preconditions.checkNotNull(serverSessionManager);
- }
-
- private static String createNodeId(final InetAddress addr) {
- return "pcc://" + addr.getHostAddress();
- }
-
- private Node topologyNode(final ReadWriteTransaction trans, final InetAddress address) {
- final String pccId = createNodeId(address);
-
- // FIXME: Futures.transform...
- try {
- Optional<Topology> topoMaybe = trans.read(LogicalDatastoreType.OPERATIONAL, this.serverSessionManager.getTopology()).get();
- Preconditions.checkState(topoMaybe.isPresent(), "Failed to find topology.");
- final Topology topo = topoMaybe.get();
- for (final Node n : topo.getNode()) {
- LOG.debug("Matching topology node {} to id {}", n, pccId);
- if (n.getNodeId().getValue().equals(pccId)) {
- this.topologyNode = this.serverSessionManager.getTopology().child(Node.class, n.getKey());
- LOG.debug("Reusing topology node {} for id {} at {}", n, pccId, this.topologyNode);
- return n;
- }
- }
- } catch (InterruptedException | ExecutionException e) {
- throw new IllegalStateException("Failed to ensure topology presence.", e);
- }
-
- /*
- * We failed to find a matching node. Let's create a dynamic one
- * and note that we are the owner (so we clean it up afterwards).
- */
- final NodeId id = new NodeId(pccId);
- final NodeKey nk = new NodeKey(id);
- final InstanceIdentifier<Node> nti = this.serverSessionManager.getTopology().child(Node.class, nk);
-
- final Node ret = new NodeBuilder().setKey(nk).setNodeId(id).build();
-
- trans.put(LogicalDatastoreType.OPERATIONAL, nti, ret);
- LOG.debug("Created topology node {} for id {} at {}", ret, pccId, nti);
- this.ownsTopology = true;
- this.topologyNode = nti;
- return ret;
+ this.listenerState = new SessionListenerState();
}
@Override
* the topology model, with empty LSP list.
*/
final InetAddress peerAddress = session.getRemoteAddress();
- final ReadWriteTransaction trans = this.serverSessionManager.rwTransaction();
- final Node topoNode = topologyNode(trans, peerAddress);
- LOG.trace("Peer {} resolved to topology node {}", peerAddress, topoNode);
+ syncOptimization = new SyncOptimization(session);
- // Our augmentation in the topology node
- this.synced = false;
- this.pccBuilder = new PathComputationClientBuilder();
- this.pccBuilder.setIpAddress(IpAddressBuilder.getDefaultInstance(peerAddress.getHostAddress()));
+ final TopologyNodeState state = this.serverSessionManager.takeNodeState(peerAddress, this, isLspDbRetreived());
+
+ this.session = session;
+ this.nodeState = state;
- onSessionUp(session, this.pccBuilder);
+ LOG.trace("Peer {} resolved to topology node {}", peerAddress, state.getNodeId());
+
+ // Our augmentation in the topology node
+ final PathComputationClientBuilder pccBuilder = new PathComputationClientBuilder();
+
+ onSessionUp(session, pccBuilder);
+ this.synced = isSynchronized();
+
+ pccBuilder.setIpAddress(IpAddressBuilder.getDefaultInstance(peerAddress.getHostAddress()));
+ final InstanceIdentifier<Node1> topologyAugment = state.getNodeId().augmentation(Node1.class);
+ this.pccIdentifier = topologyAugment.child(PathComputationClient.class);
+ final Node initialNodeState = state.getInitialNodeState();
+ final boolean isNodePresent = isLspDbRetreived() && initialNodeState != null;
+ if (isNodePresent) {
+ loadLspData(initialNodeState, lspData, lsps, isIncrementalSynchro());
+ pccBuilder.setReportedLsp(initialNodeState.getAugmentation(Node1.class).getPathComputationClient().getReportedLsp());
+ }
+ writeNode(pccBuilder, state, topologyAugment);
+ this.listenerState.init(session);
+ if (this.serverSessionManager.getRuntimeRootRegistration().isPresent()) {
+ this.registration = this.serverSessionManager.getRuntimeRootRegistration().get().register(this);
+ }
+ LOG.info("Session with {} attached to topology node {}", session.getRemoteAddress(), state.getNodeId());
+ }
- this.topologyAugmentBuilder = new Node1Builder().setPathComputationClient(this.pccBuilder.build());
- this.topologyAugment = this.topologyNode.augmentation(Node1.class);
- final Node1 ta = this.topologyAugmentBuilder.build();
+ private void writeNode(final PathComputationClientBuilder pccBuilder, final TopologyNodeState state,
+ final InstanceIdentifier<Node1> topologyAugment) {
+ final Node1 ta = new Node1Builder().setPathComputationClient(pccBuilder.build()).build();
- trans.put(LogicalDatastoreType.OPERATIONAL, this.topologyAugment, ta);
- LOG.trace("Peer data {} set to {}", this.topologyAugment, ta);
+ final ReadWriteTransaction trans = state.rwTransaction();
+ trans.put(LogicalDatastoreType.OPERATIONAL, topologyAugment, ta);
+ LOG.trace("Peer data {} set to {}", topologyAugment, ta);
// All set, commit the modifications
- Futures.addCallback(trans.commit(), new FutureCallback<RpcResult<TransactionStatus>>() {
+ Futures.addCallback(trans.submit(), new FutureCallback<Void>() {
@Override
- public void onSuccess(final RpcResult<TransactionStatus> result) {
+ public void onSuccess(final Void result) {
LOG.trace("Internal state for session {} updated successfully", session);
}
@Override
public void onFailure(final Throwable t) {
LOG.error("Failed to update internal state for session {}, terminating it", session, t);
- session.close(TerminationReason.Unknown);
+ session.close(TerminationReason.UNKNOWN);
}
});
-
- this.nodeState = this.serverSessionManager.takeNodeState(topoNode.getNodeId(), this);
- this.session = session;
- LOG.info("Session with {} attached to topology node {}", session.getRemoteAddress(), topoNode.getNodeId());
}
- @GuardedBy("this")
- private void tearDown(final PCEPSession session) {
- this.serverSessionManager.releaseNodeState(this.nodeState);
- this.nodeState = null;
- this.session = null;
-
- // The session went down. Undo all the Topology changes we have done.
- final WriteTransaction trans = this.serverSessionManager.beginTransaction();
- trans.delete(LogicalDatastoreType.OPERATIONAL, this.topologyAugment);
- if (this.ownsTopology) {
- trans.delete(LogicalDatastoreType.OPERATIONAL, this.topologyNode);
+ protected void updatePccState(final PccSyncState pccSyncState) {
+ final MessageContext ctx = new MessageContext(this.nodeState.beginTransaction());
+ updatePccNode(ctx, new PathComputationClientBuilder().setStateSync(pccSyncState).build());
+ if (pccSyncState != PccSyncState.Synchronized) {
+ this.synced = false;
+ this.triggeredResyncInProcess = true;
}
-
- Futures.addCallback(trans.commit(), new FutureCallback<RpcResult<TransactionStatus>>() {
+ // All set, commit the modifications
+ Futures.addCallback(ctx.trans.submit(), new FutureCallback<Void>() {
@Override
- public void onSuccess(final RpcResult<TransactionStatus> result) {
- LOG.trace("Internal state for session {} cleaned up successfully", session);
+ public void onSuccess(final Void result) {
+ LOG.trace("Internal state for session {} updated successfully", session);
}
@Override
public void onFailure(final Throwable t) {
- LOG.error("Failed to cleanup internal state for session {}", session, t);
+ LOG.error("Failed to update internal state for session {}", session, t);
+ session.close(TerminationReason.UNKNOWN);
}
});
+ }
- // Clear all requests which have not been sent to the peer: they result in cancellation
- for (final Entry<S, PCEPRequest> e : this.sendingRequests.entrySet()) {
- LOG.debug("Request {} was not sent when session went down, cancelling the instruction", e.getKey());
- e.getValue().setResult(OperationResults.UNSENT);
- }
- this.sendingRequests.clear();
+ protected boolean isTriggeredSyncInProcess() {
+ return this.triggeredResyncInProcess;
+ }
- // CLear all requests which have not been acked by the peer: they result in failure
- for (final Entry<S, PCEPRequest> e : this.waitingRequests.entrySet()) {
- LOG.info("Request {} was incomplete when session went down, failing the instruction", e.getKey());
- e.getValue().setResult(OperationResults.NOACK);
+ @GuardedBy("this")
+ private void tearDown(final PCEPSession session) {
+ this.serverSessionManager.releaseNodeState(this.nodeState, session, isLspDbPersisted());
+ this.nodeState = null;
+ this.session = null;
+ this.syncOptimization = null;
+ unregister();
+
+ // Clear all requests we know about
+ for (final Entry<S, PCEPRequest> e : this.requests.entrySet()) {
+ final PCEPRequest r = e.getValue();
+ switch (r.getState()) {
+ case DONE:
+ // Done is done, nothing to do
+ break;
+ case UNACKED:
+ // Peer has not acked: results in failure
+ LOG.info("Request {} was incomplete when session went down, failing the instruction", e.getKey());
+ r.done(OperationResults.NOACK);
+ break;
+ case UNSENT:
+ // Peer has not been sent to the peer: results in cancellation
+ LOG.debug("Request {} was not sent when session went down, cancelling the instruction", e.getKey());
+ r.done(OperationResults.UNSENT);
+ break;
+ default:
+ break;
+ }
}
- this.waitingRequests.clear();
+ this.requests.clear();
}
@Override
@Override
public final synchronized void onMessage(final PCEPSession session, final Message message) {
- final WriteTransaction trans = this.serverSessionManager.beginTransaction();
-
- this.dirty = false;
+ final MessageContext ctx = new MessageContext(this.nodeState.beginTransaction());
- if (onMessage(trans, message)) {
+ if (onMessage(ctx, message)) {
LOG.info("Unhandled message {} on session {}", message, session);
+ //cancel not supported, submit empty transaction
+ ctx.trans.submit();
return;
}
- if (this.dirty) {
- LOG.debug("Internal state changed, forcing sync");
- this.pccBuilder.setReportedLsp(Lists.newArrayList(this.lspData.values()));
- this.topologyAugmentBuilder.setPathComputationClient(this.pccBuilder.build());
- final Node1 ta = this.topologyAugmentBuilder.build();
-
- trans.put(LogicalDatastoreType.OPERATIONAL, this.topologyAugment, ta);
- LOG.trace("Peer data {} set to {}", this.topologyAugment, ta);
- this.dirty = false;
- } else {
- LOG.debug("State has not changed, skipping sync");
- }
-
- Futures.addCallback(trans.commit(), new FutureCallback<RpcResult<TransactionStatus>>() {
+ Futures.addCallback(ctx.trans.submit(), new FutureCallback<Void>() {
@Override
- public void onSuccess(final RpcResult<TransactionStatus> result) {
+ public void onSuccess(final Void result) {
LOG.trace("Internal state for session {} updated successfully", session);
+ ctx.notifyRequests();
}
@Override
public void onFailure(final Throwable t) {
LOG.error("Failed to update internal state for session {}, closing it", session, t);
- session.close(TerminationReason.Unknown);
+ ctx.notifyRequests();
+ session.close(TerminationReason.UNKNOWN);
}
});
}
@Override
public void close() {
+ unregister();
if (this.session != null) {
- this.session.close(TerminationReason.Unknown);
+ this.session.close(TerminationReason.UNKNOWN);
}
}
- protected InstanceIdentifierBuilder<PathComputationClient> pccIdentifier() {
- return this.topologyAugment.builder().child(PathComputationClient.class);
+ private synchronized void unregister() {
+ if (this.registration != null) {
+ this.registration.close();
+ this.registration = null;
+ }
}
protected final synchronized PCEPRequest removeRequest(final S id) {
- final PCEPRequest ret = this.waitingRequests.remove(id);
+ final PCEPRequest ret = this.requests.remove(id);
+ if (ret != null) {
+ this.listenerState.processRequestStats(ret.getElapsedMillis());
+ }
LOG.trace("Removed request {} object {}", id, ret);
return ret;
}
- private synchronized void messageSendingComplete(final S requestId, final io.netty.util.concurrent.Future<Void> future) {
- final PCEPRequest req = this.sendingRequests.remove(requestId);
-
- if (future.isSuccess()) {
- this.waitingRequests.put(requestId, req);
- LOG.trace("Request {} sent to peer (object {})", requestId, req);
- } else {
- LOG.info("Failed to send request {}, instruction cancelled", requestId, future.cause());
- req.setResult(OperationResults.UNSENT);
- }
- }
-
protected final synchronized ListenableFuture<OperationResult> sendMessage(final Message message, final S requestId,
- final Metadata metadata) {
+ final Metadata metadata) {
final io.netty.util.concurrent.Future<Void> f = this.session.sendMessage(message);
+ this.listenerState.updateStatefulSentMsg(message);
final PCEPRequest req = new PCEPRequest(metadata);
-
- this.sendingRequests.put(requestId, req);
+ this.requests.put(requestId, req);
+ final int rpcTimeout = serverSessionManager.getRpcTimeout();
+ LOG.trace("RPC response timeout value is {} seconds", rpcTimeout);
+ if (rpcTimeout > 0) {
+ setupTimeoutHandler(requestId, req, rpcTimeout);
+ }
f.addListener(new FutureListener<Void>() {
@Override
public void operationComplete(final io.netty.util.concurrent.Future<Void> future) {
- messageSendingComplete(requestId, future);
+ if (!future.isSuccess()) {
+ synchronized (AbstractTopologySessionListener.this) {
+ AbstractTopologySessionListener.this.requests.remove(requestId);
+ }
+ req.done(OperationResults.UNSENT);
+ LOG.info("Failed to send request {}, instruction cancelled", requestId, future.cause());
+ } else {
+ req.sent();
+ LOG.trace("Request {} sent to peer (object {})", requestId, req);
+ }
}
});
return req.getFuture();
}
- protected final synchronized void updateLsp(final WriteTransaction trans, final L id, final String lspName,
- final ReportedLspBuilder rlb, final boolean solicited, final boolean remove) {
+ private void setupTimeoutHandler(final S requestId, final PCEPRequest req, final int timeout) {
+ final Timer timer = req.getTimer();
+ timer.schedule(new TimerTask() {
+ @Override
+ public void run() {
+ synchronized (AbstractTopologySessionListener.this) {
+ AbstractTopologySessionListener.this.requests.remove(requestId);
+ }
+ req.done();
+ LOG.info("Request {} timed-out waiting for response", requestId);
+ }
+ }, TimeUnit.SECONDS.toMillis(timeout));
+ LOG.trace("Set up response timeout handler for request {}", requestId);
+ }
+
+ /**
+ * Update an LSP in the data store
+ *
+ * @param ctx Message context
+ * @param id Revision-specific LSP identifier
+ * @param lspName LSP name
+ * @param rlb Reported LSP builder
+ * @param solicited True if the update was solicited
+ * @param remove True if this is an LSP path removal
+ */
+ protected final synchronized void updateLsp(final MessageContext ctx, final L id, final String lspName,
+ final ReportedLspBuilder rlb, final boolean solicited, final boolean remove) {
final String name;
if (lspName == null) {
LOG.debug("Saved LSP {} with name {}", id, name);
this.lsps.put(id, name);
- // just one path should be reported
- Preconditions.checkState(rlb.getPath().size() == 1);
- LspId reportedLspId = rlb.getPath().get(0).getLspId();
- // check previous report for existing paths
- ReportedLsp previous = this.lspData.get(name);
+
+ final ReportedLsp previous = this.lspData.get(name);
// if no previous report about the lsp exist, just proceed
if (previous != null) {
- List<Path> updatedPaths = new ArrayList<>(previous.getPath());
- LOG.debug("Found previous paths {} to this lsp name {}", updatedPaths, name);
- for (Path path : previous.getPath()) {
- //we found reported path in previous reports
- if (path.getLspId().getValue() == 0 || path.getLspId().equals(reportedLspId)) {
- LOG.debug("Match on lsp-id {}", path.getLspId().getValue() );
- // path that was reported previously and does have the same lsp-id, path will be updated
- final boolean r = updatedPaths.remove(path);
- LOG.trace("Request removed? {}", r);
- }
- }
- // if the path does not exist in previous report, add it to path list, it's a new ERO
- // only one path will be added
- //lspId is 0 means confirmation message that shouldn't be added (because we have no means of deleting it later)
- LOG.trace("Adding new path {} to {}", rlb.getPath(), updatedPaths);
- updatedPaths.addAll(rlb.getPath());
- if (remove) {
- if (reportedLspId.getValue() == 0) {
- // if lsp-id also 0, remove all paths
- LOG.debug("Removing all paths.");
- updatedPaths.clear();
- } else {
- // path is marked to be removed
- LOG.debug("Removing path {} from {}", rlb.getPath(), updatedPaths);
- final boolean r = updatedPaths.removeAll(rlb.getPath());
- LOG.trace("Request removed? {}", r);
- }
- }
+ final List<Path> updatedPaths = makeBeforeBreak(rlb, previous, name, remove);
// if all paths or the last path were deleted, delete whole tunnel
- if (updatedPaths.isEmpty()) {
+ if (updatedPaths == null || updatedPaths.isEmpty()) {
LOG.debug("All paths were removed, removing LSP with {}.", id);
- removeLsp(trans, id);
+ removeLsp(ctx, id);
return;
}
- LOG.debug("Setting new paths {} to lsp {}", updatedPaths, name);
rlb.setPath(updatedPaths);
}
- Preconditions.checkState(name != null);
rlb.setKey(new ReportedLspKey(name));
rlb.setName(name);
rlb.setMetadata(this.nodeState.getLspMetadata(name));
}
- LOG.debug("LSP {} forcing update to MD-SAL", name);
- this.dirty = true;
- this.lspData.put(name, rlb.build());
+ final ReportedLsp rl = rlb.build();
+ ctx.trans.put(LogicalDatastoreType.OPERATIONAL, this.pccIdentifier.child(ReportedLsp.class, rlb.getKey()), rl);
+ LOG.debug("LSP {} updated to MD-SAL", name);
+
+ this.lspData.put(name, rl);
+ }
+
+ private List<Path> makeBeforeBreak(final ReportedLspBuilder rlb, final ReportedLsp previous, final String name, final boolean remove) {
+ // just one path should be reported
+ Preconditions.checkState(rlb.getPath().size() == 1);
+ final org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.rsvp.rev150820.LspId reportedLspId = rlb.getPath().get(0).getLspId();
+ final List<Path> updatedPaths;
+ //lspId = 0 and remove = false -> tunnel is down, still exists but no path is signaled
+ //remove existing tunnel's paths now, as explicit path remove will not come
+ if (!remove && reportedLspId.getValue() == 0) {
+ updatedPaths = new ArrayList<>();
+ LOG.debug("Remove previous paths {} to this lsp name {}", previous.getPath(), name);
+ } else {
+ // check previous report for existing paths
+ updatedPaths = new ArrayList<>(previous.getPath());
+ LOG.debug("Found previous paths {} to this lsp name {}", updatedPaths, name);
+ for (final Path path : previous.getPath()) {
+ //we found reported path in previous reports
+ if (path.getLspId().getValue() == 0 || path.getLspId().equals(reportedLspId)) {
+ LOG.debug("Match on lsp-id {}", path.getLspId().getValue() );
+ // path that was reported previously and does have the same lsp-id, path will be updated
+ final boolean r = updatedPaths.remove(path);
+ LOG.trace("Request removed? {}", r);
+ }
+ }
+ }
+ // if the path does not exist in previous report, add it to path list, it's a new ERO
+ // only one path will be added
+ //lspId is 0 means confirmation message that shouldn't be added (because we have no means of deleting it later)
+ LOG.trace("Adding new path {} to {}", rlb.getPath(), updatedPaths);
+ updatedPaths.addAll(rlb.getPath());
+ if (remove) {
+ if (reportedLspId.getValue() == 0) {
+ // if lsp-id also 0, remove all paths
+ LOG.debug("Removing all paths.");
+ updatedPaths.clear();
+ } else {
+ // path is marked to be removed
+ LOG.debug("Removing path {} from {}", rlb.getPath(), updatedPaths);
+ final boolean r = updatedPaths.removeAll(rlb.getPath());
+ LOG.trace("Request removed? {}", r);
+ }
+ }
+ LOG.debug("Setting new paths {} to lsp {}", updatedPaths, name);
+ return updatedPaths;
}
- protected final synchronized void stateSynchronizationAchieved(final WriteTransaction trans) {
+ /**
+ * Indicate that the peer has completed state synchronization.
+ *
+ * @param ctx Message context
+ */
+ protected final synchronized void stateSynchronizationAchieved(final MessageContext ctx) {
if (this.synced) {
LOG.debug("State synchronization achieved while synchronized, not updating state");
return;
// Update synchronization flag
this.synced = true;
- this.pccBuilder.setStateSync(PccSyncState.Synchronized).build();
- this.dirty = true;
+ if(this.triggeredResyncInProcess) {
+ this.triggeredResyncInProcess = false;
+ }
+ updatePccNode(ctx, new PathComputationClientBuilder().setStateSync(PccSyncState.Synchronized).build());
// The node has completed synchronization, cleanup metadata no longer reported back
this.nodeState.cleanupExcept(this.lsps.values());
LOG.debug("Session {} achieved synchronized state", this.session);
}
- protected final InstanceIdentifierBuilder<ReportedLsp> lspIdentifier(final String name) {
- return pccIdentifier().child(ReportedLsp.class, new ReportedLspKey(name));
+ protected final synchronized void updatePccNode(final MessageContext ctx, final PathComputationClient pcc) {
+ ctx.trans.merge(LogicalDatastoreType.OPERATIONAL, this.pccIdentifier, pcc);
}
- protected final synchronized void removeLsp(final WriteTransaction trans, final L id) {
+ protected final InstanceIdentifier<ReportedLsp> lspIdentifier(final String name) {
+ return this.pccIdentifier.child(ReportedLsp.class, new ReportedLspKey(name));
+ }
+
+ /**
+ * Remove LSP from the database.
+ *
+ * @param ctx Message Context
+ * @param id Revision-specific LSP identifier
+ */
+ protected final synchronized void removeLsp(final MessageContext ctx, final L id) {
final String name = this.lsps.remove(id);
- this.dirty = true;
LOG.debug("LSP {} removed", name);
+ ctx.trans.delete(LogicalDatastoreType.OPERATIONAL, lspIdentifier(name));
this.lspData.remove(name);
}
protected abstract void onSessionUp(PCEPSession session, PathComputationClientBuilder pccBuilder);
- protected abstract boolean onMessage(WriteTransaction trans, Message message);
+ /**
+ * Perform revision-specific message processing when a message arrives.
+ *
+ * @param ctx Message processing context
+ * @param message Protocol message
+ * @return True if the message type is not handle.
+ */
+ protected abstract boolean onMessage(MessageContext ctx, Message message);
- protected String lookupLspName(final L id) {
+ protected final String lookupLspName(final L id) {
Preconditions.checkNotNull(id, "ID parameter null.");
return this.lsps.get(id);
}
- protected final <T extends DataObject> ListenableFuture<Optional<T>> readOperationalData(final InstanceIdentifier<T> id) {
- return this.serverSessionManager.readOperationalData(id);
+ /**
+ * Reads operational data on this node. Doesn't attempt to read the data,
+ * if the node does not exist. In this case returns null.
+ *
+ * @param id InstanceIdentifier of the node
+ * @return null if the node does not exists, or operational data
+ */
+ protected final synchronized <T extends DataObject> ListenableFuture<Optional<T>> readOperationalData(final InstanceIdentifier<T> id) {
+ if (this.nodeState == null) {
+ return null;
+ }
+ return this.nodeState.readOperationalData(id);
+ }
+
+ protected abstract Object validateReportedLsp(final Optional<ReportedLsp> rep, final LspId input);
+
+ protected abstract void loadLspData(final Node node, final Map<String, ReportedLsp> lspData, final Map<L, String> lsps, final boolean incrementalSynchro);
+
+ protected final boolean isLspDbPersisted() {
+ if (syncOptimization != null) {
+ return syncOptimization.isSyncAvoidanceEnabled();
+ }
+ return false;
+ }
+
+ protected final boolean isLspDbRetreived() {
+ if (syncOptimization != null) {
+ return syncOptimization.isDbVersionPresent();
+ }
+ return false;
+ }
+
+ /**
+ * Is Incremental synchronization if LSP-DB-VERSION are included,
+ * LSP-DB-VERSION TLV values doesnt match, and LSP-SYNC-CAPABILITY is enabled
+ * @return
+ */
+ protected final boolean isIncrementalSynchro() {
+ if (syncOptimization != null) {
+ return syncOptimization.isSyncAvoidanceEnabled() && syncOptimization.isDeltaSyncEnabled();
+ }
+ return false;
+ }
+
+ protected final boolean isTriggeredInitialSynchro() {
+ if (syncOptimization != null) {
+ return syncOptimization.isTriggeredInitSyncEnabled();
+ }
+ return false;
+ }
+
+ protected final boolean isTriggeredReSyncEnabled() {
+ if (syncOptimization != null) {
+ return syncOptimization.isTriggeredReSyncEnabled();
+ }
+ return false;
+ }
+
+ protected final boolean isSynchronized() {
+ if (syncOptimization != null) {
+ return syncOptimization.doesLspDbMatch();
+ }
+ return false;
+ }
+
+ protected SessionListenerState getSessionListenerState() {
+ return this.listenerState;
+ }
+
+ @Override
+ public Integer getDelegatedLspsCount() {
+ return this.lsps.size();
+ }
+
+ @Override
+ public Boolean getSynchronized() {
+ return this.synced;
+ }
+
+ @Override
+ public StatefulMessages getStatefulMessages() {
+ return this.listenerState.getStatefulMessages();
+ }
+
+ @Override
+ public synchronized void resetStats() {
+ this.listenerState.resetStats(this.session);
+ }
+
+ @Override
+ public ReplyTime getReplyTime() {
+ return this.listenerState.getReplyTime();
+ }
+
+ @Override
+ public PeerCapabilities getPeerCapabilities() {
+ return this.listenerState.getPeerCapabilities();
+ }
+
+ @Override
+ public void tearDownSession() {
+ this.close();
+ }
+
+ @Override
+ public synchronized SessionState getSessionState() {
+ return this.listenerState.getSessionState(this.session);
+ }
+
+ @Override
+ public synchronized String getPeerId() {
+ return this.session.getPeerPref().getIpAddress();
}
}