/*
 * Copyright (c) 2016 Ericsson India Global Services Pvt Ltd. and others. All rights reserved.
 *
 * This program and the accompanying materials are made available under the
 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
 * and is available at http://www.eclipse.org/legal/epl-v10.html
 */
8 package org.opendaylight.genius.alivenessmonitor.internal;
10 import static org.opendaylight.genius.alivenessmonitor.utils.AlivenessMonitorUtil.getInterfaceMonitorMapId;
11 import static org.opendaylight.genius.alivenessmonitor.utils.AlivenessMonitorUtil.getMonitorMapId;
12 import static org.opendaylight.genius.alivenessmonitor.utils.AlivenessMonitorUtil.getMonitorProfileId;
13 import static org.opendaylight.genius.alivenessmonitor.utils.AlivenessMonitorUtil.getMonitorStateId;
14 import static org.opendaylight.genius.alivenessmonitor.utils.AlivenessMonitorUtil.getMonitoringInfoId;
15 import static org.opendaylight.genius.alivenessmonitor.utils.AlivenessMonitorUtil.nullToEmpty;
16 import static org.opendaylight.genius.infra.Datastore.OPERATIONAL;
18 import com.google.common.base.Optional;
19 import com.google.common.base.Preconditions;
20 import com.google.common.base.Predicate;
21 import com.google.common.base.Strings;
22 import com.google.common.cache.CacheBuilder;
23 import com.google.common.cache.CacheLoader;
24 import com.google.common.cache.LoadingCache;
25 import com.google.common.util.concurrent.FutureCallback;
26 import com.google.common.util.concurrent.Futures;
27 import com.google.common.util.concurrent.ListenableFuture;
28 import com.google.common.util.concurrent.MoreExecutors;
29 import com.google.common.util.concurrent.SettableFuture;
30 import java.util.ArrayList;
31 import java.util.Collections;
32 import java.util.List;
33 import java.util.concurrent.ConcurrentHashMap;
34 import java.util.concurrent.ConcurrentMap;
35 import java.util.concurrent.ExecutionException;
36 import java.util.concurrent.ExecutorService;
37 import java.util.concurrent.Future;
38 import java.util.concurrent.ScheduledExecutorService;
39 import java.util.concurrent.ScheduledFuture;
40 import java.util.concurrent.Semaphore;
41 import java.util.concurrent.TimeUnit;
42 import javax.annotation.Nonnull;
43 import javax.annotation.PreDestroy;
44 import javax.inject.Inject;
45 import javax.inject.Singleton;
46 import org.opendaylight.controller.md.sal.binding.api.DataBroker;
47 import org.opendaylight.controller.md.sal.binding.api.NotificationPublishService;
48 import org.opendaylight.controller.md.sal.binding.api.ReadOnlyTransaction;
49 import org.opendaylight.controller.md.sal.binding.api.ReadWriteTransaction;
50 import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
51 import org.opendaylight.genius.alivenessmonitor.protocols.AlivenessProtocolHandler;
52 import org.opendaylight.genius.alivenessmonitor.protocols.AlivenessProtocolHandlerRegistry;
53 import org.opendaylight.genius.alivenessmonitor.utils.AlivenessMonitorUtil;
54 import org.opendaylight.genius.datastoreutils.SingleTransactionDataBroker;
55 import org.opendaylight.genius.infra.ManagedNewTransactionRunner;
56 import org.opendaylight.genius.infra.ManagedNewTransactionRunnerImpl;
57 import org.opendaylight.genius.mdsalutil.packet.Ethernet;
58 import org.opendaylight.genius.mdsalutil.packet.utils.PacketUtil;
59 import org.opendaylight.infrautils.utils.concurrent.Executors;
60 import org.opendaylight.openflowplugin.libraries.liblldp.NetUtils;
61 import org.opendaylight.openflowplugin.libraries.liblldp.Packet;
62 import org.opendaylight.openflowplugin.libraries.liblldp.PacketException;
63 import org.opendaylight.serviceutils.tools.mdsal.listener.AbstractClusteredSyncDataTreeChangeListener;
64 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.alivenessmonitor.rev160411.AlivenessMonitorService;
65 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.alivenessmonitor.rev160411.LivenessState;
66 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.alivenessmonitor.rev160411.MonitorEvent;
67 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.alivenessmonitor.rev160411.MonitorEventBuilder;
68 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.alivenessmonitor.rev160411.MonitorPauseInput;
69 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.alivenessmonitor.rev160411.MonitorPauseOutput;
70 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.alivenessmonitor.rev160411.MonitorProfileCreateInput;
71 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.alivenessmonitor.rev160411.MonitorProfileCreateOutput;
72 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.alivenessmonitor.rev160411.MonitorProfileCreateOutputBuilder;
73 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.alivenessmonitor.rev160411.MonitorProfileDeleteInput;
74 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.alivenessmonitor.rev160411.MonitorProfileDeleteOutput;
75 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.alivenessmonitor.rev160411.MonitorProfileGetInput;
76 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.alivenessmonitor.rev160411.MonitorProfileGetOutput;
77 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.alivenessmonitor.rev160411.MonitorProfileGetOutputBuilder;
78 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.alivenessmonitor.rev160411.MonitorProtocolType;
79 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.alivenessmonitor.rev160411.MonitorStartInput;
80 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.alivenessmonitor.rev160411.MonitorStartOutput;
81 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.alivenessmonitor.rev160411.MonitorStartOutputBuilder;
82 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.alivenessmonitor.rev160411.MonitorStatus;
83 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.alivenessmonitor.rev160411.MonitorStopInput;
84 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.alivenessmonitor.rev160411.MonitorStopOutput;
85 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.alivenessmonitor.rev160411.MonitorUnpauseInput;
86 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.alivenessmonitor.rev160411.MonitorUnpauseOutput;
87 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.alivenessmonitor.rev160411.MonitoringMode;
88 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.alivenessmonitor.rev160411.MonitoringStates;
89 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.alivenessmonitor.rev160411._interface.monitor.map.InterfaceMonitorEntry;
90 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.alivenessmonitor.rev160411._interface.monitor.map.InterfaceMonitorEntryBuilder;
91 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.alivenessmonitor.rev160411._interface.monitor.map.InterfaceMonitorEntryKey;
92 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.alivenessmonitor.rev160411.endpoint.EndpointType;
93 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.alivenessmonitor.rev160411.endpoint.endpoint.type.Interface;
94 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.alivenessmonitor.rev160411.monitor.configs.MonitoringInfo;
95 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.alivenessmonitor.rev160411.monitor.configs.MonitoringInfoBuilder;
96 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.alivenessmonitor.rev160411.monitor.event.EventData;
97 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.alivenessmonitor.rev160411.monitor.event.EventDataBuilder;
98 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.alivenessmonitor.rev160411.monitor.profile.create.input.Profile;
99 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.alivenessmonitor.rev160411.monitor.profiles.MonitorProfile;
100 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.alivenessmonitor.rev160411.monitor.profiles.MonitorProfileBuilder;
101 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.alivenessmonitor.rev160411.monitor.start.input.Config;
102 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.alivenessmonitor.rev160411.monitorid.key.map.MonitoridKeyEntry;
103 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.alivenessmonitor.rev160411.monitorid.key.map.MonitoridKeyEntryBuilder;
104 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.alivenessmonitor.rev160411.monitoring.states.MonitoringState;
105 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.alivenessmonitor.rev160411.monitoring.states.MonitoringStateBuilder;
106 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.idmanager.rev160406.AllocateIdInput;
107 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.idmanager.rev160406.AllocateIdInputBuilder;
108 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.idmanager.rev160406.AllocateIdOutput;
109 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.idmanager.rev160406.CreateIdPoolInput;
110 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.idmanager.rev160406.CreateIdPoolInputBuilder;
111 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.idmanager.rev160406.CreateIdPoolOutput;
112 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.idmanager.rev160406.IdManagerService;
113 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.idmanager.rev160406.ReleaseIdInput;
114 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.idmanager.rev160406.ReleaseIdInputBuilder;
115 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.idmanager.rev160406.ReleaseIdOutput;
116 import org.opendaylight.yang.gen.v1.urn.opendaylight.packet.service.rev130709.PacketInReason;
117 import org.opendaylight.yang.gen.v1.urn.opendaylight.packet.service.rev130709.PacketProcessingListener;
118 import org.opendaylight.yang.gen.v1.urn.opendaylight.packet.service.rev130709.PacketReceived;
119 import org.opendaylight.yang.gen.v1.urn.opendaylight.packet.service.rev130709.SendToController;
120 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
121 import org.opendaylight.yangtools.yang.common.RpcError.ErrorType;
122 import org.opendaylight.yangtools.yang.common.RpcResult;
123 import org.opendaylight.yangtools.yang.common.RpcResultBuilder;
124 import org.slf4j.Logger;
125 import org.slf4j.LoggerFactory;
128 public class AlivenessMonitor extends AbstractClusteredSyncDataTreeChangeListener<MonitoringState>
129 implements AlivenessMonitorService, PacketProcessingListener, InterfaceStateListener {
131 private static final Logger LOG = LoggerFactory.getLogger(AlivenessMonitor.class);
133 private static final int THREAD_POOL_SIZE = 4;
134 private static final boolean INTERRUPT_TASK = true;
135 private static final int NO_DELAY = 0;
136 private static final Long INITIAL_COUNT = 0L;
137 private static final boolean CREATE_MISSING_PARENT = true;
138 private static final int INVALID_ID = 0;
140 private static class FutureCallbackImpl implements FutureCallback<Object> {
141 private final String message;
143 FutureCallbackImpl(String message) {
144 this.message = message;
148 public void onFailure(Throwable error) {
149 LOG.warn("Error in Datastore operation - {}", message, error);
153 public void onSuccess(Object result) {
154 LOG.debug("Success in Datastore operation - {}", message);
158 private class AlivenessMonitorTask implements Runnable {
159 private final MonitoringInfo monitoringInfo;
161 AlivenessMonitorTask(MonitoringInfo monitoringInfo) {
162 this.monitoringInfo = monitoringInfo;
167 LOG.trace("send monitor packet - {}", monitoringInfo);
168 sendMonitorPacket(monitoringInfo);
172 private final DataBroker dataBroker;
173 private final ManagedNewTransactionRunner txRunner;
174 private final IdManagerService idManager;
175 private final NotificationPublishService notificationPublishService;
176 private final AlivenessProtocolHandlerRegistry alivenessProtocolHandlerRegistry;
177 private final ConcurrentMap<Long, ScheduledFuture<?>> monitoringTasks = new ConcurrentHashMap<>();
178 private final ScheduledExecutorService monitorService;
179 private final ExecutorService callbackExecutorService;
180 private final LoadingCache<Long, String> monitorIdKeyCache;
181 private final ConcurrentMap<String, Semaphore> lockMap = new ConcurrentHashMap<>();
/**
 * Wires the monitor to its collaborators and prepares its runtime resources:
 * a scheduled pool for monitoring tasks, a fixed pool for callbacks, the
 * monitor id pool, and a cache that resolves a monitor id to its monitor key
 * by reading the operational datastore.
 *
 * <p>NOTE(review): the cache loader returns {@code null} when no map entry is
 * stored for the id; confirm that callers of the cache cope with Guava's
 * handling of a null load result.
 *
 * @param dataBroker broker used for all datastore access
 * @param idManager service used to create the id pool and allocate/release ids
 * @param notificationPublishService service kept for publishing notifications
 * @param alivenessProtocolHandlerRegistry registry of per-protocol handlers
 */
@Inject
public AlivenessMonitor(final DataBroker dataBroker, final IdManagerService idManager,
        final NotificationPublishService notificationPublishService,
        AlivenessProtocolHandlerRegistry alivenessProtocolHandlerRegistry) {
    super(dataBroker, LogicalDatastoreType.OPERATIONAL,
            InstanceIdentifier.create(MonitoringStates.class).child(MonitoringState.class));
    this.dataBroker = dataBroker;
    this.txRunner = new ManagedNewTransactionRunnerImpl(dataBroker);
    this.idManager = idManager;
    this.notificationPublishService = notificationPublishService;
    this.alivenessProtocolHandlerRegistry = alivenessProtocolHandlerRegistry;

    this.monitorService =
            Executors.newListeningScheduledThreadPool(THREAD_POOL_SIZE, "Aliveness Monitoring Task", LOG);
    this.callbackExecutorService =
            Executors.newFixedThreadPool(THREAD_POOL_SIZE, "Aliveness Callback Handler", LOG);

    createIdPool();

    // Loader that looks the monitor key up in the monitor-id/key map of the operational datastore.
    final CacheLoader<Long, String> keyLoader = new CacheLoader<Long, String>() {
        @Override
        public String load(@Nonnull Long monitorId) {
            final Optional<MonitoridKeyEntry> entry =
                    SingleTransactionDataBroker.syncReadOptionalAndTreatReadFailedExceptionAsAbsentOptional(
                            dataBroker, LogicalDatastoreType.OPERATIONAL, getMonitorMapId(monitorId));
            return entry.isPresent() ? entry.get().getMonitorKey() : null;
        }
    };
    this.monitorIdKeyCache = CacheBuilder.newBuilder().build(keyLoader);

    LOG.info("{} started", getClass().getSimpleName());
}
213 public void close() {
214 monitorIdKeyCache.cleanUp();
215 monitorService.shutdown();
216 callbackExecutorService.shutdown();
217 LOG.info("{} close", getClass().getSimpleName());
220 Semaphore getLock(String key) {
221 return lockMap.get(key);
224 private void createIdPool() {
225 CreateIdPoolInput createPool = new CreateIdPoolInputBuilder()
226 .setPoolName(AlivenessMonitorConstants.MONITOR_IDPOOL_NAME)
227 .setLow(AlivenessMonitorConstants.MONITOR_IDPOOL_START)
228 .setHigh(AlivenessMonitorConstants.MONITOR_IDPOOL_SIZE).build();
229 Futures.addCallback(idManager.createIdPool(createPool), new FutureCallback<RpcResult<CreateIdPoolOutput>>() {
231 public void onFailure(Throwable error) {
232 LOG.error("Failed to create idPool for Aliveness Monitor Service", error);
236 public void onSuccess(@Nonnull RpcResult<CreateIdPoolOutput> result) {
237 if (result.isSuccessful()) {
238 LOG.debug("Created IdPool for Aliveness Monitor Service");
240 LOG.error("RPC to create Idpool failed {}", result.getErrors());
243 }, callbackExecutorService);
246 private int getUniqueId(final String idKey) {
247 AllocateIdInput getIdInput = new AllocateIdInputBuilder()
248 .setPoolName(AlivenessMonitorConstants.MONITOR_IDPOOL_NAME).setIdKey(idKey).build();
250 Future<RpcResult<AllocateIdOutput>> result = idManager.allocateId(getIdInput);
253 RpcResult<AllocateIdOutput> rpcResult = result.get();
254 if (rpcResult.isSuccessful()) {
255 return rpcResult.getResult().getIdValue().intValue();
257 LOG.warn("RPC Call to Get Unique Id returned with Errors {}", rpcResult.getErrors());
259 } catch (InterruptedException | ExecutionException e) {
260 LOG.warn("Exception when getting Unique Id for key {}", idKey, e);
265 private void releaseId(String idKey) {
266 ReleaseIdInput idInput = new ReleaseIdInputBuilder().setPoolName(AlivenessMonitorConstants.MONITOR_IDPOOL_NAME)
267 .setIdKey(idKey).build();
269 ListenableFuture<RpcResult<ReleaseIdOutput>> result = idManager.releaseId(idInput);
270 RpcResult<ReleaseIdOutput> rpcResult = result.get();
271 if (!rpcResult.isSuccessful()) {
272 LOG.warn("RPC Call to release Id {} returned with Errors {}", idKey, rpcResult.getErrors());
274 } catch (InterruptedException | ExecutionException e) {
275 LOG.warn("Exception when releasing Id for key {}", idKey, e);
280 public void onPacketReceived(PacketReceived packetReceived) {
281 Class<? extends PacketInReason> pktInReason = packetReceived.getPacketInReason();
282 if (LOG.isTraceEnabled()) {
283 LOG.trace("Packet Received {}", packetReceived);
285 if (pktInReason != SendToController.class) {
288 byte[] data = packetReceived.getPayload();
289 Packet protocolPacket = null;
290 AlivenessProtocolHandler<Packet> livenessProtocolHandler = null;
292 if (!PacketUtil.isIpv6NaPacket(data)) {
293 Packet packetInFormatted;
294 Ethernet res = new Ethernet();
296 packetInFormatted = res.deserialize(data, 0, data.length * NetUtils.NUM_BITS_IN_A_BYTE);
297 } catch (PacketException e) {
298 LOG.warn("Failed to decode packet: ", e);
302 if (packetInFormatted == null) {
303 LOG.warn("Failed to deserialize Received Packet from table {}", packetReceived.getTableId().getValue());
307 protocolPacket = packetInFormatted.getPayload();
308 if (protocolPacket == null) {
309 LOG.trace("Unsupported packet type. Ignoring the packet...");
312 if (LOG.isTraceEnabled()) {
313 LOG.trace("onPacketReceived packet: {}, packet class: {}", packetReceived, protocolPacket.getClass());
315 livenessProtocolHandler = getAlivenessProtocolHandler(protocolPacket.getClass());
317 } else if (PacketUtil.isIpv6NaPacket(data)) {
318 livenessProtocolHandler = getAlivenessProtocolHandler(MonitorProtocolType.Ipv6Nd);
320 if (livenessProtocolHandler == null) {
324 String monitorKey = livenessProtocolHandler.handlePacketIn(protocolPacket, packetReceived);
325 if (monitorKey != null) {
326 processReceivedMonitorKey(monitorKey);
328 LOG.debug("No monitorkey associated with received packet");
332 @SuppressWarnings("unchecked")
333 private AlivenessProtocolHandler<Packet> getAlivenessProtocolHandler(Class<? extends Packet> protocolHandlerClass) {
334 return (AlivenessProtocolHandler<Packet>) alivenessProtocolHandlerRegistry.getOpt(protocolHandlerClass);
337 @SuppressWarnings("unchecked")
338 private AlivenessProtocolHandler<Packet> getAlivenessProtocolHandler(MonitorProtocolType protocolType) {
339 return (AlivenessProtocolHandler<Packet>) alivenessProtocolHandlerRegistry.get(protocolType);
342 private void processReceivedMonitorKey(final String monitorKey) {
344 final MonitoringState currentState;
345 final MonitoringState state;
347 Result(MonitoringState currentState, MonitoringState state) {
348 this.currentState = currentState;
353 Preconditions.checkNotNull(monitorKey, "Monitor Key required to process the state");
355 LOG.debug("Processing monitorKey: {} for received packet", monitorKey);
357 final Semaphore lock = lockMap.get(monitorKey);
358 LOG.debug("Acquiring lock for monitor key : {} to process monitor packet", monitorKey);
361 txRunner.applyWithNewReadWriteTransactionAndSubmit(OPERATIONAL, tx -> {
362 Optional<MonitoringState> optState = tx.read(getMonitorStateId(monitorKey)).get();
363 if (optState.isPresent()) {
364 final MonitoringState currentState = optState.get();
366 if (LOG.isTraceEnabled()) {
367 LOG.trace("OnPacketReceived : Monitoring state from ODS : {} ", currentState);
370 // Long responsePendingCount = currentState.getResponsePendingCount();
372 // Need to relook at the pending count logic to support N
373 // out of M scenarios
374 // if (currentState.getState() != LivenessState.Up) {
375 // //Reset responsePendingCount when state changes from DOWN
377 // responsePendingCount = INITIAL_COUNT;
380 // if (responsePendingCount > INITIAL_COUNT) {
381 // responsePendingCount =
382 // currentState.getResponsePendingCount() - 1;
384 Long responsePendingCount = INITIAL_COUNT;
386 final MonitoringState state = new MonitoringStateBuilder().setMonitorKey(monitorKey)
387 .setState(LivenessState.Up).setResponsePendingCount(responsePendingCount).build();
388 tx.merge(getMonitorStateId(monitorKey), state);
390 return Optional.of(new Result(currentState, state));
392 if (LOG.isTraceEnabled()) {
393 LOG.trace("Monitoring State not available for key: {} to process the Packet received",
396 return Optional.<Result>absent();
398 }).addCallback(new FutureCallback<Optional<Result>>() {
400 public void onSuccess(Optional<Result> optResult) {
402 optResult.toJavaUtil().ifPresent(result -> {
403 final boolean stateChanged = result.currentState.getState() == LivenessState.Down
404 || result.currentState.getState() == LivenessState.Unknown;
406 // send notifications
407 if (LOG.isTraceEnabled()) {
408 LOG.trace("Sending notification for monitor Id : {} with Current State: {}",
409 result.currentState.getMonitorId(), LivenessState.Up);
411 publishNotification(result.currentState.getMonitorId(), LivenessState.Up);
413 if (LOG.isTraceEnabled()) {
414 LOG.trace("Successful in writing monitoring state {} to ODS", result.state);
421 public void onFailure(Throwable error) {
423 LOG.warn("Error in reading or writing monitoring state : {} to Datastore", monitorKey, error);
425 }, callbackExecutorService);
428 private String getUniqueKey(String interfaceName, String protocolType, EndpointType source,
429 EndpointType destination) {
430 StringBuilder builder = new StringBuilder().append(interfaceName).append(AlivenessMonitorConstants.SEPERATOR)
431 .append(protocolType);
432 if (source != null) {
433 builder.append(AlivenessMonitorConstants.SEPERATOR).append(AlivenessMonitorUtil.getIpAddress(source));
436 if (destination != null) {
437 builder.append(AlivenessMonitorConstants.SEPERATOR).append(AlivenessMonitorUtil.getIpAddress(destination));
439 return builder.toString();
443 public ListenableFuture<RpcResult<MonitorStartOutput>> monitorStart(MonitorStartInput input) {
444 RpcResultBuilder<MonitorStartOutput> rpcResultBuilder;
445 final Config in = input.getConfig();
446 Long profileId = in.getProfileId();
447 LOG.debug("Monitor Start invoked with Config: {}, Profile Id: {}", in, profileId);
450 if (in.getMode() != MonitoringMode.OneOne) {
451 throw new UnsupportedConfigException(
452 "Unsupported Monitoring mode. Currently one-one mode is supported");
455 Optional<MonitorProfile> optProfile =
456 SingleTransactionDataBroker.syncReadOptionalAndTreatReadFailedExceptionAsAbsentOptional(
457 dataBroker, LogicalDatastoreType.OPERATIONAL, getMonitorProfileId(profileId));
458 final MonitorProfile profile;
459 if (!optProfile.isPresent()) {
460 String errMsg = String.format("No monitoring profile associated with Id: %d", profileId);
461 LOG.error("Monitor start failed. {}", errMsg);
462 throw new RuntimeException(errMsg);
464 profile = optProfile.get();
467 final MonitorProtocolType protocolType = profile.getProtocolType();
469 String interfaceName = null;
470 EndpointType srcEndpointType = in.getSource().getEndpointType();
472 if (srcEndpointType instanceof Interface) {
473 Interface endPoint = (Interface) srcEndpointType;
474 interfaceName = endPoint.getInterfaceName();
476 throw new UnsupportedConfigException(
477 "Unsupported source Endpoint type. Only Interface Endpoint currently supported for monitoring");
480 if (Strings.isNullOrEmpty(interfaceName)) {
481 throw new RuntimeException("Interface Name not defined in the source Endpoint");
484 // Initially the support is for one monitoring per interface.
485 // Revisit the retrieving monitor id logic when the multiple
486 // monitoring for same interface is needed.
487 EndpointType destEndpointType = null;
488 if (in.getDestination() != null) {
489 destEndpointType = in.getDestination().getEndpointType();
491 String idKey = getUniqueKey(interfaceName, protocolType.toString(), srcEndpointType, destEndpointType);
492 final long monitorId = getUniqueId(idKey);
493 Optional<MonitoringInfo> optKey =
494 SingleTransactionDataBroker.syncReadOptionalAndTreatReadFailedExceptionAsAbsentOptional(
495 dataBroker, LogicalDatastoreType.OPERATIONAL, getMonitoringInfoId(monitorId));
496 final AlivenessProtocolHandler<?> handler;
497 if (optKey.isPresent()) {
498 String message = String.format(
499 "Monitoring for the interface %s with this configuration " + "is already registered.",
501 LOG.warn("Monitoring for the interface {} with this configuration is already registered.",
503 MonitorStartOutput output = new MonitorStartOutputBuilder().setMonitorId(monitorId).build();
504 rpcResultBuilder = RpcResultBuilder.success(output).withWarning(ErrorType.APPLICATION,
505 "config-exists", message);
506 return Futures.immediateFuture(rpcResultBuilder.build());
508 // Construct the monitor key
509 final MonitoringInfo monitoringInfo = new MonitoringInfoBuilder().setId(monitorId).setMode(in.getMode())
510 .setProfileId(profileId).setDestination(in.getDestination()).setSource(in.getSource()).build();
511 // Construct the initial monitor state
512 handler = alivenessProtocolHandlerRegistry.get(protocolType);
513 final String monitoringKey = handler.getUniqueMonitoringKey(monitoringInfo);
515 MonitoringStateBuilder monitoringStateBuilder =
516 new MonitoringStateBuilder().setMonitorKey(monitoringKey).setMonitorId(monitorId)
517 .setState(LivenessState.Unknown).setStatus(MonitorStatus.Started);
518 if (protocolType != MonitorProtocolType.Bfd) {
519 monitoringStateBuilder.setRequestCount(INITIAL_COUNT).setResponsePendingCount(INITIAL_COUNT);
521 MonitoringState monitoringState = monitoringStateBuilder.build();
523 txRunner.callWithNewWriteOnlyTransactionAndSubmit(OPERATIONAL, operTx -> {
524 operTx.put(getMonitoringInfoId(monitorId), monitoringInfo, CREATE_MISSING_PARENT);
525 LOG.debug("adding oper monitoring info {}", monitoringInfo);
527 operTx.put(getMonitorStateId(monitoringKey), monitoringState, CREATE_MISSING_PARENT);
528 LOG.debug("adding oper monitoring state {}", monitoringState);
530 MonitoridKeyEntry mapEntry = new MonitoridKeyEntryBuilder().setMonitorId(monitorId)
531 .setMonitorKey(monitoringKey).build();
532 operTx.put(getMonitorMapId(monitorId), mapEntry, CREATE_MISSING_PARENT);
533 LOG.debug("adding oper map entry {}", mapEntry);
534 }).addCallback(new FutureCallback<Void>() {
536 public void onFailure(Throwable error) {
537 String errorMsg = String.format("Adding Monitoring info: %s in Datastore failed",
539 LOG.warn("Adding Monitoring info: {} in Datastore failed", monitoringInfo, error);
540 throw new RuntimeException(errorMsg, error);
544 public void onSuccess(Void ignored) {
545 lockMap.put(monitoringKey, new Semaphore(1, true));
546 if (protocolType == MonitorProtocolType.Bfd) {
547 handler.startMonitoringTask(monitoringInfo);
551 LOG.debug("Scheduling monitor task for config: {}", in);
552 scheduleMonitoringTask(monitoringInfo, profile.getMonitorInterval());
554 }, callbackExecutorService);
557 associateMonitorIdWithInterface(monitorId, interfaceName);
559 MonitorStartOutput output = new MonitorStartOutputBuilder().setMonitorId(monitorId).build();
561 rpcResultBuilder = RpcResultBuilder.success(output);
562 } catch (UnsupportedConfigException e) {
563 LOG.error("Start Monitoring Failed. ", e);
564 rpcResultBuilder = RpcResultBuilder.<MonitorStartOutput>failed().withError(ErrorType.APPLICATION,
567 return Futures.immediateFuture(rpcResultBuilder.build());
570 private void associateMonitorIdWithInterface(final Long monitorId, final String interfaceName) {
571 LOG.debug("associate monitor Id {} with interface {}", monitorId, interfaceName);
572 final ReadWriteTransaction tx = dataBroker.newReadWriteTransaction();
573 ListenableFuture<Optional<InterfaceMonitorEntry>> readFuture = tx.read(LogicalDatastoreType.OPERATIONAL,
574 getInterfaceMonitorMapId(interfaceName));
575 ListenableFuture<Void> updateFuture = Futures.transformAsync(readFuture, optEntry -> {
576 if (optEntry.isPresent()) {
577 InterfaceMonitorEntry entry = optEntry.get();
578 List<Long> monitorIds1 = new ArrayList<>(nullToEmpty(entry.getMonitorIds()));
579 monitorIds1.add(monitorId);
580 InterfaceMonitorEntry newEntry1 = new InterfaceMonitorEntryBuilder()
581 .withKey(new InterfaceMonitorEntryKey(interfaceName)).setMonitorIds(monitorIds1).build();
582 tx.merge(LogicalDatastoreType.OPERATIONAL, getInterfaceMonitorMapId(interfaceName), newEntry1);
584 // Create new monitor entry
585 LOG.debug("Adding new interface-monitor association for interface {} with id {}", interfaceName,
587 List<Long> monitorIds2 = new ArrayList<>();
588 monitorIds2.add(monitorId);
589 InterfaceMonitorEntry newEntry2 = new InterfaceMonitorEntryBuilder()
590 .setInterfaceName(interfaceName).setMonitorIds(monitorIds2).build();
591 tx.put(LogicalDatastoreType.OPERATIONAL, getInterfaceMonitorMapId(interfaceName), newEntry2,
592 CREATE_MISSING_PARENT);
595 }, callbackExecutorService);
597 Futures.addCallback(updateFuture, new FutureCallbackImpl(
598 String.format("Association of monitorId %d with Interface %s", monitorId, interfaceName)),
599 MoreExecutors.directExecutor());
602 private void scheduleMonitoringTask(MonitoringInfo monitoringInfo, long monitorInterval) {
603 AlivenessMonitorTask monitorTask = new AlivenessMonitorTask(monitoringInfo);
604 ScheduledFuture<?> scheduledFutureResult = monitorService.scheduleAtFixedRate(monitorTask, NO_DELAY,
605 monitorInterval, TimeUnit.MILLISECONDS);
606 monitoringTasks.put(monitoringInfo.getId(), scheduledFutureResult);
610 public ListenableFuture<RpcResult<MonitorPauseOutput>> monitorPause(MonitorPauseInput input) {
611 LOG.debug("Monitor Pause operation invoked for monitor id: {}", input.getMonitorId());
612 SettableFuture<RpcResult<MonitorPauseOutput>> result = SettableFuture.create();
613 final Long monitorId = input.getMonitorId();
615 // Set the monitoring status to Paused
616 updateMonitorStatusTo(monitorId, MonitorStatus.Paused, currentStatus -> currentStatus == MonitorStatus.Started);
618 if (stopMonitoringTask(monitorId)) {
619 result.set(RpcResultBuilder.<MonitorPauseOutput>success().build());
621 String errorMsg = String.format("No Monitoring Task availble to pause for the given monitor id : %d",
623 LOG.error("Monitor Pause operation failed- {}", errorMsg);
624 result.set(RpcResultBuilder.<MonitorPauseOutput>failed()
625 .withError(ErrorType.APPLICATION, errorMsg).build());
/**
 * RPC handler that resumes the monitor identified by {@code input.getMonitorId()}.
 *
 * <p>The monitoring info and then the monitor profile it references are read asynchronously
 * from the operational datastore through one read-only transaction. With both present, the
 * monitor status is moved to {@code Started} (accepted only from {@code Paused} or
 * {@code Stopped}); for the BFD protocol the HW VTEP handler's monitoring task is reset with
 * {@code true}, and {@code scheduleMonitoringTask} is called with the profile's interval.
 * A read failure, or a missing info/profile entry, produces a failed {@code RpcResult}
 * carrying an {@code APPLICATION} error.
 *
 * @param input RPC input carrying the monitor id
 * @return future of the RPC result, completed from the datastore callbacks
 */
632 public ListenableFuture<RpcResult<MonitorUnpauseOutput>> monitorUnpause(MonitorUnpauseInput input) {
633 LOG.debug("Monitor Unpause operation invoked for monitor id: {}", input.getMonitorId());
634 final SettableFuture<RpcResult<MonitorUnpauseOutput>> result = SettableFuture.create();
636 final Long monitorId = input.getMonitorId();
// The same read-only transaction serves the info read here and the profile read in the callback.
637 final ReadOnlyTransaction tx = dataBroker.newReadOnlyTransaction();
638 ListenableFuture<Optional<MonitoringInfo>> readInfoResult = tx.read(LogicalDatastoreType.OPERATIONAL,
639 getMonitoringInfoId(monitorId));
641 Futures.addCallback(readInfoResult, new FutureCallback<Optional<MonitoringInfo>>() {
// Reading the monitoring info failed: report the cause to the RPC caller.
644 public void onFailure(Throwable error) {
646 String msg = String.format("Unable to read monitoring info associated with monitor id %d", monitorId);
647 LOG.error("Monitor unpause Failed. {}", msg, error);
648 result.set(RpcResultBuilder.<MonitorUnpauseOutput>failed()
649 .withError(ErrorType.APPLICATION, msg, error).build());
653 public void onSuccess(@Nonnull Optional<MonitoringInfo> optInfo) {
654 if (optInfo.isPresent()) {
655 final MonitoringInfo info = optInfo.get();
// Second stage: look up the profile referenced by the monitoring info.
656 ListenableFuture<Optional<MonitorProfile>> readProfile = tx.read(LogicalDatastoreType.OPERATIONAL,
657 getMonitorProfileId(info.getProfileId()));
658 Futures.addCallback(readProfile, new FutureCallback<Optional<MonitorProfile>>() {
661 public void onFailure(Throwable error) {
663 String msg = String.format("Unable to read Monitoring profile associated with id %d",
664 info.getProfileId());
665 LOG.warn("Monitor unpause Failed. {}", msg, error);
666 result.set(RpcResultBuilder.<MonitorUnpauseOutput>failed()
667 .withError(ErrorType.APPLICATION, msg, error).build());
671 public void onSuccess(@Nonnull Optional<MonitorProfile> optProfile) {
673 if (optProfile.isPresent()) {
// Only a Paused or Stopped monitor is allowed to transition to Started.
674 updateMonitorStatusTo(monitorId, MonitorStatus.Started,
675 currentStatus -> (currentStatus == MonitorStatus.Paused
676 || currentStatus == MonitorStatus.Stopped));
677 MonitorProfile profile = optProfile.get();
678 LOG.debug("Monitor Resume - Scheduling monitoring task with Id: {}", monitorId);
679 MonitorProtocolType protocolType = profile.getProtocolType();
680 if (protocolType == MonitorProtocolType.Bfd) {
// NOTE(review): the message says "disabling" although resetMonitoringTask receives true here;
// the same text is used where false is passed - confirm the intended wording.
681 LOG.debug("disabling bfd for hwvtep tunnel montior id {}", monitorId);
682 ((HwVtepTunnelsStateHandler) alivenessProtocolHandlerRegistry.get(protocolType))
683 .resetMonitoringTask(true);
// NOTE(review): confirm whether scheduling below is meant for non-BFD protocols only.
685 scheduleMonitoringTask(info, profile.getMonitorInterval());
687 result.set(RpcResultBuilder.<MonitorUnpauseOutput>success().build());
// Profile entry missing: build a failed result describing the absent profile.
689 String msg = String.format("Monitoring profile associated with id %d is not present",
690 info.getProfileId());
691 LOG.warn("Monitor unpause Failed. {}", msg);
693 RpcResultBuilder.<MonitorUnpauseOutput>failed()
694 .withError(ErrorType.APPLICATION, msg).build());
697 }, callbackExecutorService);
// Monitoring info entry missing: nothing to resume.
700 String msg = String.format("Monitoring info associated with id %d is not present", monitorId);
701 LOG.warn("Monitor unpause Failed. {}", msg);
702 result.set(RpcResultBuilder.<MonitorUnpauseOutput>failed()
703 .withError(ErrorType.APPLICATION, msg).build());
706 }, callbackExecutorService);
711 private boolean stopMonitoringTask(Long monitorId) {
712 return stopMonitoringTask(monitorId, INTERRUPT_TASK);
715 private boolean stopMonitoringTask(Long monitorId, boolean interruptTask) {
716 Optional<MonitoringInfo> optInfo =
717 SingleTransactionDataBroker.syncReadOptionalAndTreatReadFailedExceptionAsAbsentOptional(dataBroker,
718 LogicalDatastoreType.OPERATIONAL, getMonitoringInfoId(monitorId));
719 if (!optInfo.isPresent()) {
720 LOG.warn("There is no monitoring info present for monitor id {}", monitorId);
723 MonitoringInfo monitoringInfo = optInfo.get();
724 Optional<MonitorProfile> optProfile =
725 SingleTransactionDataBroker.syncReadOptionalAndTreatReadFailedExceptionAsAbsentOptional(dataBroker,
726 LogicalDatastoreType.OPERATIONAL, getMonitorProfileId(monitoringInfo.getProfileId()));
727 MonitorProtocolType protocolType = optProfile.get().getProtocolType();
728 if (protocolType == MonitorProtocolType.Bfd) {
729 LOG.debug("disabling bfd for hwvtep tunnel montior id {}", monitorId);
730 ((HwVtepTunnelsStateHandler) alivenessProtocolHandlerRegistry.get(protocolType))
731 .resetMonitoringTask(false);
734 ScheduledFuture<?> scheduledFutureResult = monitoringTasks.remove(monitorId);
735 if (scheduledFutureResult != null) {
736 scheduledFutureResult.cancel(interruptTask);
742 Optional<MonitorProfile> getMonitorProfile(Long profileId) {
743 return SingleTransactionDataBroker.syncReadOptionalAndTreatReadFailedExceptionAsAbsentOptional(
744 dataBroker, LogicalDatastoreType.OPERATIONAL, getMonitorProfileId(profileId));
747 void acquireLock(Semaphore lock) {
752 boolean acquiredLock = false;
754 acquiredLock = lock.tryAcquire(50, TimeUnit.MILLISECONDS);
755 } catch (InterruptedException e) {
756 LOG.warn("Thread interrupted when waiting to acquire the lock");
760 LOG.warn("Previous transaction did not complete in time. Releasing the lock to proceed");
764 LOG.trace("Lock acquired successfully");
765 } catch (InterruptedException e) {
766 LOG.warn("Acquire failed");
769 LOG.trace("Lock acquired successfully");
/**
 * Releases the given semaphore.
 * NOTE(review): presumably the counterpart of acquireLock(Semaphore) for the per-monitor
 * locks - confirm, including how a null lock is treated.
 *
 * @param lock the semaphore to release
 */
773 void releaseLock(Semaphore lock) {
/**
 * Records one more monitoring request for the given monitor and, once that bookkeeping has
 * been written, hands the monitor to its protocol handler.
 *
 * <p>Visible steps: resolve the monitor key from {@code monitorIdKeyCache}, load the
 * monitor profile, fetch the semaphore kept for the key in {@code lockMap}, then read the
 * {@code MonitoringState} in a read-write transaction and derive the updated counters
 * (request count, pending-response count bounded by the profile's monitor window). When the
 * pending count reaches the profile's failure threshold and the state is not already
 * {@code Down}, a {@code Down} notification is published. After a successful update the
 * handler registered for the profile's protocol type is asked to start the monitoring task.
 *
 * @param monitoringInfo the monitor for which a packet should be sent
 */
779 private void sendMonitorPacket(final MonitoringInfo monitoringInfo) {
780 // TODO: Handle interrupts
781 final Long monitorId = monitoringInfo.getId();
// NOTE(review): if monitorIdKeyCache is a Guava LoadingCache, getUnchecked does not return null
// (a null load raises an exception instead), so the null check below may be dead code - confirm.
782 final String monitorKey = monitorIdKeyCache.getUnchecked(monitorId);
783 if (monitorKey == null) {
784 LOG.warn("No monitor Key associated with id {} to send the monitor packet", monitorId);
787 LOG.debug("Sending monitoring packet for key: {}", monitorKey);
// The profile supplies the monitor window, failure threshold and protocol type used below.
790 final MonitorProfile profile;
791 Optional<MonitorProfile> optProfile = getMonitorProfile(monitoringInfo.getProfileId());
792 if (optProfile.isPresent()) {
793 profile = optProfile.get();
795 LOG.warn("No monitor profile associated with id {}. " + "Could not send Monitor packet for monitor-id {}",
796 monitoringInfo.getProfileId(), monitorId);
// Semaphore kept per monitor key; may be absent from the map.
800 final Semaphore lock = lockMap.get(monitorKey);
801 LOG.debug("Acquiring lock for monitor key : {} to send monitor packet", monitorKey);
804 final ReadWriteTransaction tx = dataBroker.newReadWriteTransaction();
805 ListenableFuture<Optional<MonitoringState>> readResult = tx.read(LogicalDatastoreType.OPERATIONAL,
806 getMonitorStateId(monitorKey));
807 ListenableFuture<Void> writeResult = Futures.transformAsync(readResult, optState -> {
808 if (optState.isPresent()) {
809 MonitoringState state = optState.get();
811 // Increase the request count
812 Long requestCount = state.getRequestCount() + 1;
814 // Check with the monitor window
815 LivenessState currentLivenessState = state.getState();
817 // Increase the pending response count
818 long responsePendingCount = state.getResponsePendingCount();
// The pending count only grows while it is below the monitor window.
819 if (responsePendingCount < profile.getMonitorWindow()) {
820 responsePendingCount = responsePendingCount + 1;
823 // Check with the failure threshold
824 if (responsePendingCount >= profile.getFailureThreshold()) {
825 // Change the state to down and notify
826 if (currentLivenessState != LivenessState.Down) {
827 LOG.debug("Response pending Count: {}, Failure threshold: {} for monitorId {}",
828 responsePendingCount, profile.getFailureThreshold(), state.getMonitorId());
829 LOG.info("Sending notification for monitor Id : {} with State: {}",
830 state.getMonitorId(), LivenessState.Down);
831 publishNotification(monitorId, LivenessState.Down);
832 currentLivenessState = LivenessState.Down;
833 // Reset requestCount when state changes
835 requestCount = INITIAL_COUNT;
839 // Update the ODS with state
// The builder starts empty: only the key, the two counters and the liveness state are merged.
840 MonitoringState updatedState = new MonitoringStateBuilder(/* state */)
841 .setMonitorKey(state.getMonitorKey()).setRequestCount(requestCount)
842 .setResponsePendingCount(responsePendingCount).setState(currentLivenessState).build();
843 tx.merge(LogicalDatastoreType.OPERATIONAL, getMonitorStateId(state.getMonitorKey()),
847 // Close the transaction
// No stored state for this key: fail the chained future so no packet is sent.
849 String errorMsg = String.format(
850 "Monitoring State associated with id %d is not present to send packet out.", monitorId);
851 return Futures.immediateFailedFuture(new RuntimeException(errorMsg));
853 }, callbackExecutorService);
855 Futures.addCallback(writeResult, new FutureCallback<Void>() {
857 public void onSuccess(Void noarg) {
858 // invoke packetout on protocol handler
859 AlivenessProtocolHandler<?> handler =
860 alivenessProtocolHandlerRegistry.getOpt(profile.getProtocolType());
861 if (handler != null) {
862 LOG.debug("Sending monitoring packet {}", monitoringInfo);
863 handler.startMonitoringTask(monitoringInfo);
// State update failed: the packet is deliberately not sent.
869 public void onFailure(Throwable error) {
870 LOG.warn("Updating monitoring state for key: {} failed. Monitoring packet is not sent", monitorKey,
874 }, callbackExecutorService);
877 void publishNotification(final Long monitorId, final LivenessState state) {
878 LOG.debug("Sending notification for id {} - state {}", monitorId, state);
879 EventData data = new EventDataBuilder().setMonitorId(monitorId).setMonitorState(state).build();
880 MonitorEvent event = new MonitorEventBuilder().setEventData(data).build();
881 final ListenableFuture<?> eventFuture = notificationPublishService.offerNotification(event);
882 Futures.addCallback(eventFuture, new FutureCallback<Object>() {
884 public void onFailure(Throwable error) {
885 LOG.warn("Error in notifying listeners for id {} - state {}", monitorId, state, error);
889 public void onSuccess(Object arg) {
890 LOG.trace("Successful in notifying listeners for id {} - state {}", monitorId, state);
892 }, callbackExecutorService);
/**
 * RPC handler that creates a monitor profile from the values in {@code input.getProfile()}.
 *
 * <p>The profile id is obtained from {@code getUniqueId} for the key built by
 * {@code getUniqueProfileKey} out of failure threshold, monitor interval, monitor window and
 * protocol type. The operational datastore is then read for that id: an existing profile is
 * answered with a successful result carrying the id plus a {@code profile-exists} warning;
 * otherwise a new {@code MonitorProfile} is put and the transaction submitted, and the
 * submit outcome completes the returned future.
 *
 * @param input RPC input holding the profile parameters
 * @return future of the RPC result containing the profile id on success
 */
896 public ListenableFuture<RpcResult<MonitorProfileCreateOutput>> monitorProfileCreate(
897 final MonitorProfileCreateInput input) {
898 LOG.debug("Monitor Profile Create operation - {}", input.getProfile());
899 final SettableFuture<RpcResult<MonitorProfileCreateOutput>> returnFuture = SettableFuture.create();
900 Profile profile = input.getProfile();
901 final Long failureThreshold = profile.getFailureThreshold();
902 final Long monitorInterval = profile.getMonitorInterval();
903 final Long monitorWindow = profile.getMonitorWindow();
904 final MonitorProtocolType protocolType = profile.getProtocolType();
// The id is derived from the profile's parameter values, not supplied by the caller.
905 String idKey = getUniqueProfileKey(failureThreshold, monitorInterval, monitorWindow, protocolType);
906 final Long profileId = (long) getUniqueId(idKey);
908 final ReadWriteTransaction tx = dataBroker.newReadWriteTransaction();
909 ListenableFuture<Optional<MonitorProfile>> readFuture = tx.read(LogicalDatastoreType.OPERATIONAL,
910 getMonitorProfileId(profileId));
911 ListenableFuture<RpcResult<MonitorProfileCreateOutput>> resultFuture = Futures.transformAsync(readFuture,
913 if (optProfile.isPresent()) {
// Already stored: still a success, but flagged with a warning for the caller.
915 MonitorProfileCreateOutput output = new MonitorProfileCreateOutputBuilder()
916 .setProfileId(profileId).build();
917 String msg = String.format("Monitor profile %s already present for the given input", input);
919 returnFuture.set(RpcResultBuilder.success(output)
920 .withWarning(ErrorType.PROTOCOL, "profile-exists", msg).build());
// Not stored yet: write the new profile and complete the future from the submit outcome.
922 final MonitorProfile monitorProfile = new MonitorProfileBuilder().setId(profileId)
923 .setFailureThreshold(failureThreshold).setMonitorInterval(monitorInterval)
924 .setMonitorWindow(monitorWindow).setProtocolType(protocolType).build();
925 tx.put(LogicalDatastoreType.OPERATIONAL, getMonitorProfileId(profileId), monitorProfile,
926 CREATE_MISSING_PARENT);
927 Futures.addCallback(tx.submit(), new FutureCallback<Void>() {
929 public void onFailure(Throwable error) {
930 String msg = String.format("Error when storing monitorprofile %s in datastore",
932 LOG.error("Error when storing monitorprofile {} in datastore", monitorProfile, error);
933 returnFuture.set(RpcResultBuilder.<MonitorProfileCreateOutput>failed()
934 .withError(ErrorType.APPLICATION, msg, error).build());
938 public void onSuccess(Void noarg) {
939 MonitorProfileCreateOutput output = new MonitorProfileCreateOutputBuilder()
940 .setProfileId(profileId).build();
941 returnFuture.set(RpcResultBuilder.success(output).build());
943 }, callbackExecutorService);
946 }, callbackExecutorService);
// Outer callback: covers failures of the chained future, e.g. the initial read.
947 Futures.addCallback(resultFuture, new FutureCallback<RpcResult<MonitorProfileCreateOutput>>() {
949 public void onFailure(Throwable error) {
950 // This would happen when any error happens during reading for
951 // monitoring profile
952 String msg = String.format("Error in creating monitorprofile - %s", input);
953 returnFuture.set(RpcResultBuilder.<MonitorProfileCreateOutput>failed()
954 .withError(ErrorType.APPLICATION, msg, error).build());
955 LOG.error("Error in creating monitorprofile - {} ", input, error);
959 public void onSuccess(RpcResult<MonitorProfileCreateOutput> result) {
960 LOG.debug("Successfully created monitor Profile {} ", input);
962 }, callbackExecutorService);
967 public ListenableFuture<RpcResult<MonitorProfileGetOutput>> monitorProfileGet(MonitorProfileGetInput input) {
968 LOG.debug("Monitor Profile Get operation for input profile- {}", input.getProfile());
969 RpcResultBuilder<MonitorProfileGetOutput> rpcResultBuilder;
970 final Long profileId = getExistingProfileId(input);
972 MonitorProfileGetOutputBuilder output = new MonitorProfileGetOutputBuilder().setProfileId(profileId);
973 rpcResultBuilder = RpcResultBuilder.success();
974 rpcResultBuilder.withResult(output.build());
975 return Futures.immediateFuture(rpcResultBuilder.build());
978 private Long getExistingProfileId(MonitorProfileGetInput input) {
979 org.opendaylight.yang.gen.v1.urn.opendaylight.genius
980 .alivenessmonitor.rev160411.monitor.profile.get.input.Profile profile = input
982 final Long failureThreshold = profile.getFailureThreshold();
983 final Long monitorInterval = profile.getMonitorInterval();
984 final Long monitorWindow = profile.getMonitorWindow();
985 final MonitorProtocolType protocolType = profile.getProtocolType();
986 LOG.debug("getExistingProfileId for profile : {}", input.getProfile());
987 String idKey = getUniqueProfileKey(failureThreshold, monitorInterval, monitorWindow, protocolType);
988 LOG.debug("Obtained existing profile ID for profile : {}", input.getProfile());
989 return (long) getUniqueId(idKey);
992 private String getUniqueProfileKey(Long failureThreshold, Long monitorInterval, Long monitorWindow,
993 MonitorProtocolType protocolType) {
994 return String.valueOf(failureThreshold) + AlivenessMonitorConstants.SEPERATOR + monitorInterval
995 + AlivenessMonitorConstants.SEPERATOR + monitorWindow + AlivenessMonitorConstants.SEPERATOR
996 + protocolType + AlivenessMonitorConstants.SEPERATOR;
/**
 * RPC handler that deletes the monitor profile stored under {@code input.getProfileId()}.
 *
 * <p>The profile is read from the operational datastore first. If it exists it is deleted
 * and the transaction submitted; the submit outcome completes the returned future. If it
 * does not exist the call still succeeds, but with an {@code invalid-value} warning.
 *
 * @param input RPC input carrying the profile id
 * @return future of the RPC result
 */
1000 public ListenableFuture<RpcResult<MonitorProfileDeleteOutput>> monitorProfileDelete(
1001 final MonitorProfileDeleteInput input) {
1002 LOG.debug("Monitor Profile delete for Id: {}", input.getProfileId());
1003 final SettableFuture<RpcResult<MonitorProfileDeleteOutput>> result = SettableFuture.create();
1004 final Long profileId = input.getProfileId();
1005 final ReadWriteTransaction tx = dataBroker.newReadWriteTransaction();
1006 ListenableFuture<Optional<MonitorProfile>> readFuture = tx.read(LogicalDatastoreType.OPERATIONAL,
1007 getMonitorProfileId(profileId));
1008 ListenableFuture<RpcResult<MonitorProfileDeleteOutput>> writeFuture =
1009 Futures.transformAsync(readFuture, optProfile -> {
1010 if (optProfile.isPresent()) {
1011 tx.delete(LogicalDatastoreType.OPERATIONAL, getMonitorProfileId(profileId));
1012 Futures.addCallback(tx.submit(), new FutureCallback<Void>() {
1014 public void onFailure(Throwable error) {
1015 String msg = String.format("Error when removing monitor profile %d from datastore",
1017 LOG.error("Error when removing monitor profile {} from datastore", profileId, error);
1018 result.set(RpcResultBuilder.<MonitorProfileDeleteOutput>failed()
1019 .withError(ErrorType.APPLICATION, msg, error)
1024 public void onSuccess(Void noarg) {
// Rebuild the key the profile id was generated from.
// NOTE(review): presumably this key is used to give the id back to the id pool - confirm.
1025 MonitorProfile profile = optProfile.get();
1026 String id = getUniqueProfileKey(profile.getFailureThreshold(),
1027 profile.getMonitorInterval(), profile.getMonitorWindow(),
1028 profile.getProtocolType());
1030 result.set(RpcResultBuilder.<MonitorProfileDeleteOutput>success().build());
1032 }, callbackExecutorService);
// Unknown profile id: reported as success with a warning rather than as a failure.
1034 String msg = String.format("Monitor profile with Id: %d does not exist", profileId);
1036 result.set(RpcResultBuilder.<MonitorProfileDeleteOutput>success()
1037 .withWarning(ErrorType.PROTOCOL, "invalid-value", msg).build());
1040 }, callbackExecutorService);
// Outer callback: covers failures of the chained future, e.g. the initial read.
1042 Futures.addCallback(writeFuture, new FutureCallback<RpcResult<MonitorProfileDeleteOutput>>() {
1045 public void onFailure(Throwable error) {
1046 String msg = String.format("Error when removing monitor profile %d from datastore", profileId);
1047 LOG.error("Error when removing monitor profile {} from datastore", profileId, error);
1048 result.set(RpcResultBuilder.<MonitorProfileDeleteOutput>failed()
1049 .withError(ErrorType.APPLICATION, msg, error).build());
1053 public void onSuccess(RpcResult<MonitorProfileDeleteOutput> noarg) {
1054 LOG.debug("Successfully removed Monitor Profile {}", profileId);
1056 }, callbackExecutorService);
1061 public ListenableFuture<RpcResult<MonitorStopOutput>> monitorStop(MonitorStopInput input) {
1062 LOG.debug("Monitor Stop operation for monitor id - {}", input.getMonitorId());
1063 SettableFuture<RpcResult<MonitorStopOutput>> result = SettableFuture.create();
1065 final Long monitorId = input.getMonitorId();
1066 Optional<MonitoringInfo> optInfo =
1067 SingleTransactionDataBroker.syncReadOptionalAndTreatReadFailedExceptionAsAbsentOptional(dataBroker,
1068 LogicalDatastoreType.OPERATIONAL, getMonitoringInfoId(monitorId));
1069 if (optInfo.isPresent()) {
1070 // Stop the monitoring task
1071 stopMonitoringTask(monitorId);
1073 String monitorKey = monitorIdKeyCache.getUnchecked(monitorId);
1075 // Cleanup the Data store
1076 txRunner.callWithNewWriteOnlyTransactionAndSubmit(OPERATIONAL, tx -> {
1077 if (monitorKey != null) {
1078 tx.delete(getMonitorStateId(monitorKey));
1079 monitorIdKeyCache.invalidate(monitorId);
1082 tx.delete(getMonitoringInfoId(monitorId));
1084 //Remove monitorid-key-map
1085 tx.delete(getMonitorMapId(monitorId));
1086 }).addCallback(new FutureCallbackImpl(String.format("Delete monitor state with Id %d", monitorId)),
1087 MoreExecutors.directExecutor());
1089 MonitoringInfo info = optInfo.get();
1090 String interfaceName = getInterfaceName(info.getSource().getEndpointType());
1091 if (interfaceName != null) {
1092 removeMonitorIdFromInterfaceAssociation(monitorId, interfaceName);
1094 releaseIdForMonitoringInfo(info);
1096 if (monitorKey != null) {
1097 lockMap.remove(monitorKey);
1100 result.set(RpcResultBuilder.<MonitorStopOutput>success().build());
1102 String errorMsg = String.format("Do not have monitoring information associated with key %d", monitorId);
1103 LOG.error("Delete monitoring operation Failed - {}", errorMsg);
1104 result.set(RpcResultBuilder.<MonitorStopOutput>failed().withError(ErrorType.APPLICATION, errorMsg).build());
/**
 * Removes {@code monitorId} from the list of monitor ids associated with an interface.
 *
 * <p>The interface's {@code InterfaceMonitorEntry} is read in a read-write transaction. If
 * removing the id leaves the list empty the entry is deleted; otherwise the entry is
 * rewritten with the reduced list. A missing entry is only logged.
 *
 * @param monitorId     id to detach
 * @param interfaceName interface whose association is updated
 */
1110 private void removeMonitorIdFromInterfaceAssociation(final Long monitorId, final String interfaceName) {
1111 LOG.debug("Remove monitorId {} from Interface association {}", monitorId, interfaceName);
1112 final ReadWriteTransaction tx = dataBroker.newReadWriteTransaction();
1113 ListenableFuture<Optional<InterfaceMonitorEntry>> readFuture = tx.read(LogicalDatastoreType.OPERATIONAL,
1114 getInterfaceMonitorMapId(interfaceName));
1115 ListenableFuture<Void> updateFuture = Futures.transformAsync(readFuture, optEntry -> {
1116 if (optEntry.isPresent()) {
1117 InterfaceMonitorEntry entry = optEntry.get();
// Work on a copy so the list held by the read entry is not modified.
1118 List<Long> monitorIds = new ArrayList<>(nullToEmpty(entry.getMonitorIds()));
1119 monitorIds.remove(monitorId);
1120 if (monitorIds.isEmpty()) {
1121 tx.delete(LogicalDatastoreType.OPERATIONAL, getInterfaceMonitorMapId(interfaceName));
// Ids remain: store the entry again with the shortened list.
1123 InterfaceMonitorEntry newEntry = new InterfaceMonitorEntryBuilder(entry)
1124 .withKey(new InterfaceMonitorEntryKey(interfaceName)).setMonitorIds(monitorIds).build();
1125 tx.put(LogicalDatastoreType.OPERATIONAL, getInterfaceMonitorMapId(interfaceName), newEntry,
1126 CREATE_MISSING_PARENT);
1130 LOG.warn("No Interface map entry found {} to remove monitorId {}", interfaceName, monitorId);
1132 return Futures.immediateFuture(null);
1134 }, MoreExecutors.directExecutor());
// The outcome of the update is handed to a FutureCallbackImpl labelled with this operation.
1136 Futures.addCallback(updateFuture, new FutureCallbackImpl(
1137 String.format("Dis-association of monitorId %d with Interface %s", monitorId, interfaceName)),
1138 MoreExecutors.directExecutor());
/**
 * Releases the id that was allocated for the given monitoring info.
 *
 * <p>Only monitors whose source endpoint yields an interface name are handled. The
 * monitor's profile is read synchronously to learn the protocol type, and the unique key is
 * rebuilt from interface name, protocol type, source and destination endpoint.
 * NOTE(review): presumably this rebuilt key is what gets released - confirm.
 *
 * @param info monitoring info whose id should be released
 */
1141 private void releaseIdForMonitoringInfo(MonitoringInfo info) {
1142 Long monitorId = info.getId();
1143 EndpointType source = info.getSource().getEndpointType();
1144 String interfaceName = getInterfaceName(source);
1145 if (!Strings.isNullOrEmpty(interfaceName)) {
1146 Optional<MonitorProfile> optProfile =
1147 SingleTransactionDataBroker.syncReadOptionalAndTreatReadFailedExceptionAsAbsentOptional(dataBroker,
1148 LogicalDatastoreType.OPERATIONAL, getMonitorProfileId(info.getProfileId()));
1149 if (optProfile.isPresent()) {
1150 MonitorProtocolType protocolType = optProfile.get().getProtocolType();
// The destination is optional on the monitoring info, hence the null check.
1151 EndpointType destination = info.getDestination() != null ? info.getDestination().getEndpointType()
1153 String idKey = getUniqueKey(interfaceName, protocolType.toString(), source, destination);
// Without the profile the protocol type, and therefore the key, cannot be rebuilt.
1156 LOG.warn("Could not release monitorId {}. No profile associated with it", monitorId);
1161 private String getInterfaceName(EndpointType endpoint) {
1162 String interfaceName = null;
1163 if (endpoint instanceof Interface) {
1164 interfaceName = ((Interface) endpoint).getInterfaceName();
1166 return interfaceName;
1169 private void stopMonitoring(long monitorId) {
1170 updateMonitorStatusTo(monitorId, MonitorStatus.Stopped,
1171 currentStatus -> currentStatus != MonitorStatus.Stopped);
1172 if (!stopMonitoringTask(monitorId)) {
1173 LOG.warn("No monitoring task running to perform cancel operation for monitorId {}", monitorId);
/**
 * Changes the stored status of a monitor to {@code newStatus}, provided the current status
 * is accepted by {@code isValidStatus}.
 *
 * <p>The monitor key is looked up in {@code monitorIdKeyCache}; the {@code MonitoringState}
 * for that key is read in a read-write transaction and, when the predicate accepts its
 * current status, a state carrying only the key and the new status is merged. A rejected
 * status or a missing state is logged as a warning.
 *
 * @param monitorId     id of the monitor to update
 * @param newStatus     status to store
 * @param isValidStatus decides, from the current status, whether the change is allowed
 */
1177 private void updateMonitorStatusTo(final Long monitorId, final MonitorStatus newStatus,
1178 final Predicate<MonitorStatus> isValidStatus) {
1179 final String monitorKey = monitorIdKeyCache.getUnchecked(monitorId);
1180 if (monitorKey == null) {
1181 LOG.warn("No monitor Key associated with id {} to change the monitor status to {}", monitorId, newStatus);
1184 final ReadWriteTransaction tx = dataBroker.newReadWriteTransaction();
1186 ListenableFuture<Optional<MonitoringState>> readResult = tx.read(LogicalDatastoreType.OPERATIONAL,
1187 getMonitorStateId(monitorKey));
1189 ListenableFuture<Void> writeResult = Futures.transformAsync(readResult, optState -> {
1190 if (optState.isPresent()) {
1191 MonitoringState state = optState.get();
1192 if (isValidStatus.apply(state.getStatus())) {
// Merge (not put): only the key and the status are set on the written object.
1193 MonitoringState updatedState = new MonitoringStateBuilder().setMonitorKey(monitorKey)
1194 .setStatus(newStatus).build();
1195 tx.merge(LogicalDatastoreType.OPERATIONAL, getMonitorStateId(monitorKey), updatedState);
1197 LOG.warn("Invalid Monitoring status {}, cannot be updated to {} for monitorId {}",
1198 state.getStatus(), newStatus, monitorId);
1201 LOG.warn("No associated monitoring state data available to update the status to {} for {}",
1202 newStatus, monitorId);
1205 }, MoreExecutors.directExecutor());
// The outcome of the write is handed to a FutureCallbackImpl labelled with this operation.
1207 Futures.addCallback(writeResult, new FutureCallbackImpl(
1208 String.format("Monitor status update for %d to %s", monitorId, newStatus.toString())),
1209 MoreExecutors.directExecutor());
/**
 * Resumes monitoring for the given monitor id.
 *
 * <p>The monitoring info and then its profile are read asynchronously through one read-only
 * transaction. With both present the status is moved to {@code Started} (unless it already
 * is) and a monitoring task is scheduled with the profile's monitor interval. Read failures
 * and missing entries are only logged; nothing is reported back to the caller.
 *
 * @param monitorId id of the monitor to resume
 */
1212 private void resumeMonitoring(final long monitorId) {
1213 final ReadOnlyTransaction tx = dataBroker.newReadOnlyTransaction();
1214 ListenableFuture<Optional<MonitoringInfo>> readInfoResult = tx.read(LogicalDatastoreType.OPERATIONAL,
1215 getMonitoringInfoId(monitorId));
1217 Futures.addCallback(readInfoResult, new FutureCallback<Optional<MonitoringInfo>>() {
1220 public void onFailure(Throwable error) {
1221 String msg = String.format("Unable to read monitoring info associated with monitor id %d", monitorId);
1222 LOG.error("Monitor resume Failed. {}", msg, error);
1227 public void onSuccess(@Nonnull Optional<MonitoringInfo> optInfo) {
1228 if (optInfo.isPresent()) {
1229 final MonitoringInfo info = optInfo.get();
// Second stage: look up the profile referenced by the monitoring info.
1230 ListenableFuture<Optional<MonitorProfile>> readProfile = tx.read(LogicalDatastoreType.OPERATIONAL,
1231 getMonitorProfileId(info.getProfileId()));
1232 Futures.addCallback(readProfile, new FutureCallback<Optional<MonitorProfile>>() {
1235 public void onFailure(Throwable error) {
1236 String msg = String.format("Unable to read Monitoring profile associated with id %d",
1237 info.getProfileId());
1238 LOG.warn("Monitor resume Failed. {}", msg, error);
1243 public void onSuccess(@Nonnull Optional<MonitorProfile> optProfile) {
1245 if (optProfile.isPresent()) {
// Any status other than Started may transition to Started here.
1246 updateMonitorStatusTo(monitorId, MonitorStatus.Started,
1247 currentStatus -> currentStatus != MonitorStatus.Started);
1248 MonitorProfile profile = optProfile.get();
1249 LOG.debug("Monitor Resume - Scheduling monitoring task for Id: {}", monitorId);
1250 scheduleMonitoringTask(info, profile.getMonitorInterval());
1252 String msg = String.format("Monitoring profile associated with id %d is not present",
1253 info.getProfileId());
1254 LOG.warn("Monitor resume Failed. {}", msg);
1257 }, MoreExecutors.directExecutor());
1260 String msg = String.format("Monitoring info associated with id %d is not present", monitorId);
1261 LOG.warn("Monitor resume Failed. {}", msg);
1264 }, MoreExecutors.directExecutor());
1268 public void onInterfaceStateUp(String interfaceName) {
1269 List<Long> monitorIds = getMonitorIds(interfaceName);
1270 if (monitorIds.isEmpty()) {
1271 LOG.warn("Could not get monitorId for interface: {}", interfaceName);
1274 for (Long monitorId : monitorIds) {
1275 LOG.debug("Resume monitoring on interface: {} with monitorId: {}", interfaceName, monitorId);
1276 resumeMonitoring(monitorId);
1281 public void onInterfaceStateDown(String interfaceName) {
1282 List<Long> monitorIds = getMonitorIds(interfaceName);
1283 if (monitorIds.isEmpty()) {
1284 LOG.warn("Could not get monitorIds for interface: {}", interfaceName);
1287 for (Long monitorId : monitorIds) {
1288 LOG.debug("Suspend monitoring on interface: {} with monitorId: {}", interfaceName, monitorId);
1289 stopMonitoring(monitorId);
1293 private List<Long> getMonitorIds(String interfaceName) {
1294 return SingleTransactionDataBroker.syncReadOptionalAndTreatReadFailedExceptionAsAbsentOptional(
1295 dataBroker, LogicalDatastoreType.OPERATIONAL, getInterfaceMonitorMapId(interfaceName))
1296 .toJavaUtil().map(InterfaceMonitorEntry::getMonitorIds).orElse(Collections.emptyList());
1299 //handle monitor stop
1301 public void remove(@Nonnull InstanceIdentifier<MonitoringState> instanceIdentifier,
1302 @Nonnull MonitoringState removedDataObject) {
1303 final Long monitorId = removedDataObject.getMonitorId();
1304 LOG.debug("Monitor State remove listener invoked for monitor id: {}", monitorId);
1306 if (removedDataObject.getStatus() != MonitorStatus.Paused) {
1307 ScheduledFuture<?> scheduledFutureResult = monitoringTasks.get(monitorId);
1308 if (scheduledFutureResult != null) {
1309 LOG.debug("Stopping the task for Monitor id: {}", monitorId);
1310 stopMonitoringTask(monitorId);
1315 //handle monitor pause
1317 public void update(@Nonnull InstanceIdentifier<MonitoringState> instanceIdentifier,
1318 @Nonnull MonitoringState originalDataObject,
1319 @Nonnull MonitoringState updatedDataObject) {
1320 final Long monitorId = updatedDataObject.getMonitorId();
1321 LOG.debug("Monitor State update listener invoked for monitor id: {}", monitorId);
1323 if (updatedDataObject.getStatus() == MonitorStatus.Paused
1324 && originalDataObject.getStatus() != MonitorStatus.Paused) {
1325 ScheduledFuture<?> scheduledFutureResult = monitoringTasks.get(monitorId);
1326 if (scheduledFutureResult != null) {
1327 LOG.debug("Stopping the task for Monitor id: {}", monitorId);
1328 stopMonitoringTask(monitorId);