import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
+import java.util.Timer;
+import java.util.TimerTask;
+import java.util.concurrent.TimeUnit;
+
import javax.annotation.concurrent.GuardedBy;
import org.opendaylight.controller.config.yang.pcep.topology.provider.ListenerStateRuntimeMXBean;
import org.opendaylight.controller.config.yang.pcep.topology.provider.ListenerStateRuntimeRegistration;
import org.opendaylight.protocol.pcep.PCEPSessionListener;
import org.opendaylight.protocol.pcep.PCEPTerminationReason;
import org.opendaylight.protocol.pcep.TerminationReason;
-import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.inet.types.rev100924.IpAddressBuilder;
+import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.inet.types.rev130715.IpAddressBuilder;
import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.pcep.types.rev131005.Message;
import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.pcep.types.rev131005.MessageHeader;
import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.pcep.types.rev131005.Object;
import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.topology.pcep.rev131024.pcep.client.attributes.path.computation.client.ReportedLspBuilder;
import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.topology.pcep.rev131024.pcep.client.attributes.path.computation.client.ReportedLspKey;
import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.topology.pcep.rev131024.pcep.client.attributes.path.computation.client.reported.lsp.Path;
+import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.Node;
import org.opendaylight.yangtools.yang.binding.DataContainer;
import org.opendaylight.yangtools.yang.binding.DataObject;
import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
private final Map<String, ReportedLsp> lspData = new HashMap<>();
@GuardedBy("this")
- private final Map<L, String> lsps = new HashMap<>();
+ protected final Map<L, String> lsps = new HashMap<>();
private final ServerSessionManager serverSessionManager;
private InstanceIdentifier<PathComputationClient> pccIdentifier;
private TopologyNodeState nodeState;
private boolean synced = false;
private PCEPSession session;
+ private SyncOptimization syncOptimization;
+ private boolean triggeredResyncInProcess;
private ListenerStateRuntimeRegistration registration;
private final SessionListenerState listenerState;
*/
final InetAddress peerAddress = session.getRemoteAddress();
- final TopologyNodeState state = this.serverSessionManager.takeNodeState(peerAddress, this);
+ syncOptimization = new SyncOptimization(session);
+
+ final TopologyNodeState state = this.serverSessionManager.takeNodeState(peerAddress, this, isLspDbRetreived());
+
+ this.session = session;
+ this.nodeState = state;
LOG.trace("Peer {} resolved to topology node {}", peerAddress, state.getNodeId());
- this.synced = false;
// Our augmentation in the topology node
final PathComputationClientBuilder pccBuilder = new PathComputationClientBuilder();
- pccBuilder.setIpAddress(IpAddressBuilder.getDefaultInstance(peerAddress.getHostAddress()));
onSessionUp(session, pccBuilder);
+ this.synced = isSynchronized();
- final Node1 ta = new Node1Builder().setPathComputationClient(pccBuilder.build()).build();
+ pccBuilder.setIpAddress(IpAddressBuilder.getDefaultInstance(peerAddress.getHostAddress()));
final InstanceIdentifier<Node1> topologyAugment = state.getNodeId().augmentation(Node1.class);
this.pccIdentifier = topologyAugment.child(PathComputationClient.class);
+ final Node initialNodeState = state.getInitialNodeState();
+ final boolean isNodePresent = isLspDbRetreived() && initialNodeState != null;
+ if (isNodePresent) {
+ loadLspData(initialNodeState, lspData, lsps, isIncrementalSynchro());
+ pccBuilder.setReportedLsp(initialNodeState.getAugmentation(Node1.class).getPathComputationClient().getReportedLsp());
+ }
+ writeNode(pccBuilder, state, topologyAugment);
+ this.listenerState.init(session);
+ if (this.serverSessionManager.getRuntimeRootRegistration().isPresent()) {
+ this.registration = this.serverSessionManager.getRuntimeRootRegistration().get().register(this);
+ }
+ LOG.info("Session with {} attached to topology node {}", session.getRemoteAddress(), state.getNodeId());
+ }
+
+ private void writeNode(final PathComputationClientBuilder pccBuilder, final TopologyNodeState state,
+ final InstanceIdentifier<Node1> topologyAugment) {
+ final Node1 ta = new Node1Builder().setPathComputationClient(pccBuilder.build()).build();
final ReadWriteTransaction trans = state.rwTransaction();
trans.put(LogicalDatastoreType.OPERATIONAL, topologyAugment, ta);
@Override
public void onFailure(final Throwable t) {
LOG.error("Failed to update internal state for session {}, terminating it", session, t);
- session.close(TerminationReason.Unknown);
+ session.close(TerminationReason.UNKNOWN);
}
});
+ }
- this.session = session;
- this.nodeState = state;
- this.listenerState.init(session);
- if (this.serverSessionManager.getRuntimeRootRegistration().isPresent()) {
- this.registration = this.serverSessionManager.getRuntimeRootRegistration().get().register(this);
+ protected void updatePccState(final PccSyncState pccSyncState) {
+ final MessageContext ctx = new MessageContext(this.nodeState.beginTransaction());
+ updatePccNode(ctx, new PathComputationClientBuilder().setStateSync(pccSyncState).build());
+ if (pccSyncState != PccSyncState.Synchronized) {
+ this.synced = false;
+ this.triggeredResyncInProcess = true;
}
- LOG.info("Session with {} attached to topology node {}", session.getRemoteAddress(), state.getNodeId());
+ // All set, commit the modifications
+ Futures.addCallback(ctx.trans.submit(), new FutureCallback<Void>() {
+ @Override
+ public void onSuccess(final Void result) {
+ LOG.trace("Internal state for session {} updated successfully", session);
+ }
+
+ @Override
+ public void onFailure(final Throwable t) {
+ LOG.error("Failed to update internal state for session {}", session, t);
+ session.close(TerminationReason.UNKNOWN);
+ }
+ });
+ }
+
+ protected boolean isTriggeredSyncInProcess() {
+ return this.triggeredResyncInProcess;
}
@GuardedBy("this")
private void tearDown(final PCEPSession session) {
- this.serverSessionManager.releaseNodeState(this.nodeState, session);
+ this.serverSessionManager.releaseNodeState(this.nodeState, session, isLspDbPersisted());
this.nodeState = null;
this.session = null;
+ this.syncOptimization = null;
+ unregister();
// Clear all requests we know about
for (final Entry<S, PCEPRequest> e : this.requests.entrySet()) {
LOG.debug("Request {} was not sent when session went down, cancelling the instruction", e.getKey());
r.done(OperationResults.UNSENT);
break;
+ default:
+ break;
}
}
this.requests.clear();
if (onMessage(ctx, message)) {
LOG.info("Unhandled message {} on session {}", message, session);
+ //cancel not supported, submit empty transaction
+ ctx.trans.submit();
return;
}
public void onFailure(final Throwable t) {
LOG.error("Failed to update internal state for session {}, closing it", session, t);
ctx.notifyRequests();
- session.close(TerminationReason.Unknown);
+ session.close(TerminationReason.UNKNOWN);
}
});
}
@Override
public void close() {
+ unregister();
+ if (this.session != null) {
+ this.session.close(TerminationReason.UNKNOWN);
+ }
+ }
+
+ private synchronized void unregister() {
if (this.registration != null) {
this.registration.close();
- }
- if (this.session != null) {
- this.session.close(TerminationReason.Unknown);
+ this.registration = null;
}
}
this.listenerState.updateStatefulSentMsg(message);
final PCEPRequest req = new PCEPRequest(metadata);
this.requests.put(requestId, req);
+ final int rpcTimeout = serverSessionManager.getRpcTimeout();
+ LOG.trace("RPC response timeout value is {} seconds", rpcTimeout);
+ if (rpcTimeout > 0) {
+ setupTimeoutHandler(requestId, req, rpcTimeout);
+ }
f.addListener(new FutureListener<Void>() {
@Override
return req.getFuture();
}
+ private void setupTimeoutHandler(final S requestId, final PCEPRequest req, final int timeout) {
+ final Timer timer = req.getTimer();
+ timer.schedule(new TimerTask() {
+ @Override
+ public void run() {
+ synchronized (AbstractTopologySessionListener.this) {
+ AbstractTopologySessionListener.this.requests.remove(requestId);
+ }
+ req.done();
+ LOG.info("Request {} timed-out waiting for response", requestId);
+ }
+ }, TimeUnit.SECONDS.toMillis(timeout));
+ LOG.trace("Set up response timeout handler for request {}", requestId);
+ }
+
/**
* Update an LSP in the data store
*
private List<Path> makeBeforeBreak(final ReportedLspBuilder rlb, final ReportedLsp previous, final String name, final boolean remove) {
// just one path should be reported
Preconditions.checkState(rlb.getPath().size() == 1);
- final org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.rsvp.rev130820.LspId reportedLspId = rlb.getPath().get(0).getLspId();
- // check previous report for existing paths
- final List<Path> updatedPaths = new ArrayList<>(previous.getPath());
- LOG.debug("Found previous paths {} to this lsp name {}", updatedPaths, name);
- for (final Path path : previous.getPath()) {
- //we found reported path in previous reports
- if (path.getLspId().getValue() == 0 || path.getLspId().equals(reportedLspId)) {
- LOG.debug("Match on lsp-id {}", path.getLspId().getValue() );
- // path that was reported previously and does have the same lsp-id, path will be updated
- final boolean r = updatedPaths.remove(path);
- LOG.trace("Request removed? {}", r);
+ final org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.rsvp.rev150820.LspId reportedLspId = rlb.getPath().get(0).getLspId();
+ final List<Path> updatedPaths;
+ //lspId = 0 and remove = false -> tunnel is down, still exists but no path is signaled
+ //remove existing tunnel's paths now, as explicit path remove will not come
+ if (!remove && reportedLspId.getValue() == 0) {
+ updatedPaths = new ArrayList<>();
+ LOG.debug("Remove previous paths {} to this lsp name {}", previous.getPath(), name);
+ } else {
+ // check previous report for existing paths
+ updatedPaths = new ArrayList<>(previous.getPath());
+ LOG.debug("Found previous paths {} to this lsp name {}", updatedPaths, name);
+ for (final Path path : previous.getPath()) {
+ //we found reported path in previous reports
+ if (path.getLspId().getValue() == 0 || path.getLspId().equals(reportedLspId)) {
+ LOG.debug("Match on lsp-id {}", path.getLspId().getValue() );
+ // path that was reported previously and does have the same lsp-id, path will be updated
+ final boolean r = updatedPaths.remove(path);
+ LOG.trace("Request removed? {}", r);
+ }
}
}
// if the path does not exist in previous report, add it to path list, it's a new ERO
// Update synchronization flag
this.synced = true;
- ctx.trans.merge(LogicalDatastoreType.OPERATIONAL, this.pccIdentifier, new PathComputationClientBuilder().setStateSync(PccSyncState.Synchronized).build());
+ if(this.triggeredResyncInProcess) {
+ this.triggeredResyncInProcess = false;
+ }
+ updatePccNode(ctx, new PathComputationClientBuilder().setStateSync(PccSyncState.Synchronized).build());
// The node has completed synchronization, cleanup metadata no longer reported back
this.nodeState.cleanupExcept(this.lsps.values());
LOG.debug("Session {} achieved synchronized state", this.session);
}
+ protected final synchronized void updatePccNode(final MessageContext ctx, final PathComputationClient pcc) {
+ ctx.trans.merge(LogicalDatastoreType.OPERATIONAL, this.pccIdentifier, pcc);
+ }
+
protected final InstanceIdentifier<ReportedLsp> lspIdentifier(final String name) {
return this.pccIdentifier.child(ReportedLsp.class, new ReportedLspKey(name));
}
return this.lsps.get(id);
}
+ /**
+ * Reads operational data on this node. Doesn't attempt to read the data,
+ * if the node does not exist. In this case returns null.
+ *
+ * @param id InstanceIdentifier of the node
+ * @return null if the node does not exists, or operational data
+ */
protected final synchronized <T extends DataObject> ListenableFuture<Optional<T>> readOperationalData(final InstanceIdentifier<T> id) {
+ if (this.nodeState == null) {
+ return null;
+ }
return this.nodeState.readOperationalData(id);
}
protected abstract Object validateReportedLsp(final Optional<ReportedLsp> rep, final LspId input);
+ protected abstract void loadLspData(final Node node, final Map<String, ReportedLsp> lspData, final Map<L, String> lsps, final boolean incrementalSynchro);
+
+ protected final boolean isLspDbPersisted() {
+ if (syncOptimization != null) {
+ return syncOptimization.isSyncAvoidanceEnabled();
+ }
+ return false;
+ }
+
+ protected final boolean isLspDbRetreived() {
+ if (syncOptimization != null) {
+ return syncOptimization.isDbVersionPresent();
+ }
+ return false;
+ }
+
+ /**
+ * Is Incremental synchronization if LSP-DB-VERSION are included,
+ * LSP-DB-VERSION TLV values doesnt match, and LSP-SYNC-CAPABILITY is enabled
+ * @return
+ */
+ protected final boolean isIncrementalSynchro() {
+ if (syncOptimization != null) {
+ return syncOptimization.isSyncAvoidanceEnabled() && syncOptimization.isDeltaSyncEnabled();
+ }
+ return false;
+ }
+
+ protected final boolean isTriggeredInitialSynchro() {
+ if (syncOptimization != null) {
+ return syncOptimization.isTriggeredInitSyncEnabled();
+ }
+ return false;
+ }
+
+ protected final boolean isTriggeredReSyncEnabled() {
+ if (syncOptimization != null) {
+ return syncOptimization.isTriggeredReSyncEnabled();
+ }
+ return false;
+ }
+
+ protected final boolean isSynchronized() {
+ if (syncOptimization != null) {
+ return syncOptimization.doesLspDbMatch();
+ }
+ return false;
+ }
+
protected SessionListenerState getSessionListenerState() {
return this.listenerState;
}
}
@Override
- public SessionState getSessionState() {
+ public synchronized SessionState getSessionState() {
return this.listenerState.getSessionState(this.session);
}
@Override
- public String getPeerId() {
+ public synchronized String getPeerId() {
return this.session.getPeerPref().getIpAddress();
}
}