/*
 * Copyright (c) 2016 Ericsson India Global Services Pvt Ltd. and others. All rights reserved.
 *
 * This program and the accompanying materials are made available under the
 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
 * and is available at http://www.eclipse.org/legal/epl-v10.html
 */
8 package org.opendaylight.genius.alivenessmonitor.internal;
10 import static org.opendaylight.genius.alivenessmonitor.internal.AlivenessMonitorUtil.getInterfaceMonitorMapId;
11 import static org.opendaylight.genius.alivenessmonitor.internal.AlivenessMonitorUtil.getMonitorMapId;
12 import static org.opendaylight.genius.alivenessmonitor.internal.AlivenessMonitorUtil.getMonitorProfileId;
13 import static org.opendaylight.genius.alivenessmonitor.internal.AlivenessMonitorUtil.getMonitorStateId;
14 import static org.opendaylight.genius.alivenessmonitor.internal.AlivenessMonitorUtil.getMonitoringInfoId;
16 import com.google.common.base.Optional;
17 import com.google.common.base.Preconditions;
18 import com.google.common.base.Predicate;
19 import com.google.common.base.Strings;
20 import com.google.common.cache.CacheBuilder;
21 import com.google.common.cache.CacheLoader;
22 import com.google.common.cache.LoadingCache;
23 import com.google.common.util.concurrent.AsyncFunction;
24 import com.google.common.util.concurrent.FutureCallback;
25 import com.google.common.util.concurrent.Futures;
26 import com.google.common.util.concurrent.JdkFutureAdapters;
27 import com.google.common.util.concurrent.ListenableFuture;
28 import com.google.common.util.concurrent.SettableFuture;
29 import com.google.common.util.concurrent.ThreadFactoryBuilder;
30 import java.util.ArrayList;
31 import java.util.Collections;
32 import java.util.EnumMap;
33 import java.util.HashMap;
34 import java.util.List;
36 import java.util.concurrent.ConcurrentHashMap;
37 import java.util.concurrent.ConcurrentMap;
38 import java.util.concurrent.ExecutionException;
39 import java.util.concurrent.ExecutorService;
40 import java.util.concurrent.Executors;
41 import java.util.concurrent.Future;
42 import java.util.concurrent.ScheduledExecutorService;
43 import java.util.concurrent.ScheduledFuture;
44 import java.util.concurrent.Semaphore;
45 import java.util.concurrent.ThreadFactory;
46 import java.util.concurrent.TimeUnit;
47 import javax.annotation.PostConstruct;
48 import javax.annotation.PreDestroy;
49 import javax.inject.Inject;
50 import javax.inject.Singleton;
51 import org.opendaylight.controller.liblldp.NetUtils;
52 import org.opendaylight.controller.liblldp.Packet;
53 import org.opendaylight.controller.md.sal.binding.api.DataBroker;
54 import org.opendaylight.controller.md.sal.binding.api.NotificationPublishService;
55 import org.opendaylight.controller.md.sal.binding.api.NotificationService;
56 import org.opendaylight.controller.md.sal.binding.api.ReadOnlyTransaction;
57 import org.opendaylight.controller.md.sal.binding.api.ReadWriteTransaction;
58 import org.opendaylight.controller.md.sal.binding.api.WriteTransaction;
59 import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
60 import org.opendaylight.genius.mdsalutil.packet.Ethernet;
61 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.alivenessmonitor.rev160411.AlivenessMonitorService;
62 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.alivenessmonitor.rev160411.EtherTypes;
63 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.alivenessmonitor.rev160411.LivenessState;
64 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.alivenessmonitor.rev160411.MonitorEvent;
65 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.alivenessmonitor.rev160411.MonitorEventBuilder;
66 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.alivenessmonitor.rev160411.MonitorPauseInput;
67 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.alivenessmonitor.rev160411.MonitorProfileCreateInput;
68 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.alivenessmonitor.rev160411.MonitorProfileCreateOutput;
69 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.alivenessmonitor.rev160411.MonitorProfileCreateOutputBuilder;
70 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.alivenessmonitor.rev160411.MonitorProfileDeleteInput;
71 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.alivenessmonitor.rev160411.MonitorProfileGetInput;
72 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.alivenessmonitor.rev160411.MonitorProfileGetOutput;
73 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.alivenessmonitor.rev160411.MonitorProfileGetOutputBuilder;
74 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.alivenessmonitor.rev160411.MonitorStartInput;
75 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.alivenessmonitor.rev160411.MonitorStartOutput;
76 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.alivenessmonitor.rev160411.MonitorStartOutputBuilder;
77 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.alivenessmonitor.rev160411.MonitorStatus;
78 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.alivenessmonitor.rev160411.MonitorStopInput;
79 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.alivenessmonitor.rev160411.MonitorUnpauseInput;
80 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.alivenessmonitor.rev160411.MonitoringMode;
81 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.alivenessmonitor.rev160411._interface.monitor.map.InterfaceMonitorEntry;
82 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.alivenessmonitor.rev160411._interface.monitor.map.InterfaceMonitorEntryBuilder;
83 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.alivenessmonitor.rev160411._interface.monitor.map.InterfaceMonitorEntryKey;
84 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.alivenessmonitor.rev160411.endpoint.EndpointType;
85 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.alivenessmonitor.rev160411.endpoint.endpoint.type.Interface;
86 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.alivenessmonitor.rev160411.endpoint.endpoint.type.IpAddress;
87 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.alivenessmonitor.rev160411.monitor.configs.MonitoringInfo;
88 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.alivenessmonitor.rev160411.monitor.configs.MonitoringInfoBuilder;
89 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.alivenessmonitor.rev160411.monitor.event.EventData;
90 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.alivenessmonitor.rev160411.monitor.event.EventDataBuilder;
91 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.alivenessmonitor.rev160411.monitor.profile.create.input.Profile;
92 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.alivenessmonitor.rev160411.monitor.profiles.MonitorProfile;
93 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.alivenessmonitor.rev160411.monitor.profiles.MonitorProfileBuilder;
94 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.alivenessmonitor.rev160411.monitor.start.input.Config;
95 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.alivenessmonitor.rev160411.monitorid.key.map.MonitoridKeyEntry;
96 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.alivenessmonitor.rev160411.monitorid.key.map.MonitoridKeyEntryBuilder;
97 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.alivenessmonitor.rev160411.monitoring.states.MonitoringState;
98 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.alivenessmonitor.rev160411.monitoring.states.MonitoringStateBuilder;
99 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.idmanager.rev160406.AllocateIdInput;
100 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.idmanager.rev160406.AllocateIdInputBuilder;
101 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.idmanager.rev160406.AllocateIdOutput;
102 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.idmanager.rev160406.CreateIdPoolInput;
103 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.idmanager.rev160406.CreateIdPoolInputBuilder;
104 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.idmanager.rev160406.IdManagerService;
105 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.idmanager.rev160406.ReleaseIdInput;
106 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.idmanager.rev160406.ReleaseIdInputBuilder;
107 import org.opendaylight.yang.gen.v1.urn.opendaylight.packet.service.rev130709.PacketInReason;
108 import org.opendaylight.yang.gen.v1.urn.opendaylight.packet.service.rev130709.PacketProcessingListener;
109 import org.opendaylight.yang.gen.v1.urn.opendaylight.packet.service.rev130709.PacketReceived;
110 import org.opendaylight.yang.gen.v1.urn.opendaylight.packet.service.rev130709.SendToController;
111 import org.opendaylight.yangtools.concepts.ListenerRegistration;
112 import org.opendaylight.yangtools.yang.binding.DataObject;
113 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
114 import org.opendaylight.yangtools.yang.common.RpcError.ErrorType;
115 import org.opendaylight.yangtools.yang.common.RpcResult;
116 import org.opendaylight.yangtools.yang.common.RpcResultBuilder;
117 import org.slf4j.Logger;
118 import org.slf4j.LoggerFactory;
121 public class AlivenessMonitor implements AlivenessMonitorService, PacketProcessingListener,
122 InterfaceStateListener, AutoCloseable {
// Class-wide logger.
123 private static final Logger LOG = LoggerFactory.getLogger(AlivenessMonitor.class);
// Datastore entry point; the methods of this class create their transactions from it.
124 private final DataBroker dataBroker;
// Used to create the monitor id pool and to allocate/release ids. Not final: replaceable via setIdManager.
125 private IdManagerService idManager;
// Not final: replaceable via setNotificationPublishService.
126 private NotificationPublishService notificationPublishService;
// start() registers this instance with it as a notification listener.
127 private final NotificationService notificationService;
// Handler lookup by decoded payload class; filled by registerHandler, read by onPacketReceived.
128 private final Map<Class<?>, AlivenessProtocolHandler> packetTypeToProtocolHandler;
// Handler lookup by protocol type; filled by registerHandler.
129 private final Map<EtherTypes, AlivenessProtocolHandler> ethTypeToProtocolHandler;
// Monitor id -> handle of the periodic task created by scheduleMonitoringTask.
130 private final ConcurrentMap<Long, ScheduledFuture<?>> monitoringTasks;
// Scheduled pool that runs the periodic AlivenessMonitorTask instances.
131 private final ScheduledExecutorService monitorService;
// Executor passed to Futures.addCallback for datastore read callbacks.
132 private final ExecutorService callbackExecutorService;
// Monitor id -> monitor key; built by initializeCache(), loaded from the operational datastore.
133 private LoadingCache<Long, String> monitorIdKeyCache;
// Thread count used for both executors created in the constructor.
134 private static final int THREAD_POOL_SIZE = 4;
// Default "may interrupt" flag used when a scheduled task is cancelled.
135 private static final boolean INTERRUPT_TASK = true;
// Initial delay (milliseconds) of a scheduled monitoring task.
136 private static final int NO_DELAY = 0;
// Starting value for the request and response-pending counters.
137 private static final Long INITIAL_COUNT = 0L;
// Passed to tx.put(...) as the create-missing-parents argument.
138 private static final boolean CREATE_MISSING_PARENT = true;
// NOTE(review): presumably the value getUniqueId yields when allocation fails — no use is visible here; confirm.
139 private static final int INVALID_ID = 0;
// Monitor key -> semaphore; an entry is added by monitorStart once the monitor has been written.
140 final ConcurrentMap<String, Semaphore> lockMap = new ConcurrentHashMap<>();
// Registration returned by the notification service in start(); closed and cleared in close().
141 private ListenerRegistration<AlivenessMonitor> listenerRegistration;
143 private class FutureCallbackImpl implements FutureCallback<Void> {
144 private String message;
145 public FutureCallbackImpl(String message) {
146 this.message = message;
150 public void onFailure(Throwable error) {
151 LOG.warn("Error in Datastore operation - {}", message, error);
155 public void onSuccess(Void result) {
156 LOG.debug("Success in Datastore operation - {}", message);
160 private class AlivenessMonitorTask implements Runnable {
161 private MonitoringInfo monitoringInfo;
163 public AlivenessMonitorTask(MonitoringInfo monitoringInfo) {
164 this.monitoringInfo = monitoringInfo;
169 LOG.trace("send monitor packet - {}", monitoringInfo);
170 sendMonitorPacket(monitoringInfo);
175 public AlivenessMonitor(final DataBroker dataBroker, final IdManagerService idManager,
176 final NotificationPublishService notificationPublishService,
177 final NotificationService notificationService) {
178 this.dataBroker = dataBroker;
179 this.idManager = idManager;
180 this.notificationPublishService = notificationPublishService;
181 this.notificationService = notificationService;
182 ethTypeToProtocolHandler = new EnumMap<>(EtherTypes.class);
183 packetTypeToProtocolHandler = new HashMap<>();
184 monitorService = Executors.newScheduledThreadPool(THREAD_POOL_SIZE,
185 getMonitoringThreadFactory("Aliveness Monitoring Task"));
186 callbackExecutorService = Executors.newFixedThreadPool(THREAD_POOL_SIZE,
187 getMonitoringThreadFactory("Aliveness Callback Handler"));
188 monitoringTasks = new ConcurrentHashMap<>();
/**
 * Start-up hook: logs the start and registers this instance with the
 * notification service, keeping the registration so that close() can release it.
 *
 * NOTE(review): no call that creates the id pool or builds monitorIdKeyCache is
 * visible in this method as shown — confirm both happen before the listener is
 * registered, since close() and the cache users rely on them.
 */
192 public void start() {
193 LOG.info("{} start", getClass().getSimpleName());
196 listenerRegistration = notificationService.registerNotificationListener(this);
201 public void close() throws Exception {
202 monitorIdKeyCache.cleanUp();
203 monitorService.shutdown();
204 callbackExecutorService.shutdown();
205 if (listenerRegistration != null) {
206 listenerRegistration.close();
207 listenerRegistration = null;
209 LOG.info("{} close", getClass().getSimpleName());
212 // The following setters should only be used by the test classes.
213 public void setIdManager(IdManagerService idManager) {
214 this.idManager = idManager;
217 public void setNotificationPublishService(NotificationPublishService notificationPublishService) {
218 this.notificationPublishService = notificationPublishService;
221 private ThreadFactory getMonitoringThreadFactory(String threadNameFormat) {
222 ThreadFactoryBuilder builder = new ThreadFactoryBuilder();
223 builder.setNameFormat(threadNameFormat);
224 builder.setUncaughtExceptionHandler(
225 (t, e) -> LOG.error("Received Uncaught Exception event in Thread: {}", t.getName(), e));
226 return builder.build();
229 private void initializeCache() {
230 monitorIdKeyCache = CacheBuilder.newBuilder()
231 .build(new CacheLoader<Long, String>() {
233 public String load(Long monitorId) throws Exception {
234 return read(LogicalDatastoreType.OPERATIONAL, getMonitorMapId(monitorId)).transform(
235 MonitoridKeyEntry::getMonitorKey).orNull();
240 public void registerHandler(EtherTypes etherType, AlivenessProtocolHandler protocolHandler) {
241 ethTypeToProtocolHandler.put(etherType, protocolHandler);
242 packetTypeToProtocolHandler.put(protocolHandler.getPacketClass(), protocolHandler);
245 private void createIdPool() {
246 CreateIdPoolInput createPool = new CreateIdPoolInputBuilder()
247 .setPoolName(AlivenessMonitorConstants.MONITOR_IDPOOL_NAME)
248 .setLow(AlivenessMonitorConstants.MONITOR_IDPOOL_START)
249 .setHigh(AlivenessMonitorConstants.MONITOR_IDPOOL_SIZE)
251 Future<RpcResult<Void>> result = idManager.createIdPool(createPool);
252 Futures.addCallback(JdkFutureAdapters.listenInPoolThread(result), new FutureCallback<RpcResult<Void>>() {
255 public void onFailure(Throwable error) {
256 LOG.error("Failed to create idPool for Aliveness Monitor Service",error);
260 public void onSuccess(RpcResult<Void> result) {
261 if (result.isSuccessful()) {
262 LOG.debug("Created IdPool for Aliveness Monitor Service");
264 LOG.error("RPC to create Idpool failed {}", result.getErrors());
270 private int getUniqueId(final String idKey) {
271 AllocateIdInput getIdInput = new AllocateIdInputBuilder()
272 .setPoolName(AlivenessMonitorConstants.MONITOR_IDPOOL_NAME)
273 .setIdKey(idKey).build();
275 Future<RpcResult<AllocateIdOutput>> result = idManager.allocateId(getIdInput);
278 RpcResult<AllocateIdOutput> rpcResult = result.get();
279 if (rpcResult.isSuccessful()) {
280 return rpcResult.getResult().getIdValue().intValue();
282 LOG.warn("RPC Call to Get Unique Id returned with Errors {}", rpcResult.getErrors());
284 } catch (InterruptedException | ExecutionException e) {
285 LOG.warn("Exception when getting Unique Id for key {}", idKey, e);
290 private void releaseId(String idKey) {
291 ReleaseIdInput idInput = new ReleaseIdInputBuilder()
292 .setPoolName(AlivenessMonitorConstants.MONITOR_IDPOOL_NAME)
293 .setIdKey(idKey).build();
295 Future<RpcResult<Void>> result = idManager.releaseId(idInput);
296 RpcResult<Void> rpcResult = result.get();
297 if (!rpcResult.isSuccessful()) {
298 LOG.warn("RPC Call to release Id {} with Key {} returned with Errors {}",
299 idKey, rpcResult.getErrors());
301 } catch (InterruptedException | ExecutionException e) {
302 LOG.warn("Exception when releasing Id for key {}", idKey, e);
307 public void onPacketReceived(PacketReceived packetReceived) {
308 Class<? extends PacketInReason> pktInReason = packetReceived.getPacketInReason();
309 if (LOG.isTraceEnabled()) {
310 LOG.trace("Packet Received {}", packetReceived );
313 if (pktInReason == SendToController.class) {
314 Packet packetInFormatted;
315 byte[] data = packetReceived.getPayload();
316 Ethernet res = new Ethernet();
318 packetInFormatted = res.deserialize(data, 0, data.length * NetUtils.NumBitsInAByte);
319 } catch (Exception e) {
320 LOG.warn("Failed to decode packet: {}", e.getMessage());
324 if (packetInFormatted == null) {
325 LOG.warn("Failed to deserialize Received Packet from table {}",
326 packetReceived.getTableId().getValue());
330 Object objPayload = packetInFormatted.getPayload();
332 if (objPayload == null) {
333 LOG.trace("Unsupported packet type. Ignoring the packet...");
337 if (LOG.isTraceEnabled()) {
338 LOG.trace("onPacketReceived packet: {}, packet class: {}", packetReceived,
339 objPayload.getClass());
342 AlivenessProtocolHandler livenessProtocolHandler = packetTypeToProtocolHandler.get(objPayload.getClass());
343 if (livenessProtocolHandler == null) {
347 String monitorKey = livenessProtocolHandler.handlePacketIn(packetInFormatted.getPayload(), packetReceived);
349 if (monitorKey != null) {
350 processReceivedMonitorKey(monitorKey);
352 LOG.debug("No monitorkey associated with received packet");
/**
 * Handles a monitor key extracted from a received packet: reads the
 * MonitoringState stored under that key in the operational datastore and, when
 * one exists, merges back a state of LivenessState.Up with the response-pending
 * count reset to INITIAL_COUNT.
 *
 * The read and the write are asynchronous and chained through Guava
 * FutureCallbacks on one read-write transaction.
 *
 * @param monitorKey key identifying the monitor; must not be null
 * @throws NullPointerException if monitorKey is null (Preconditions.checkNotNull)
 */
357 private void processReceivedMonitorKey(final String monitorKey) {
358 Preconditions.checkNotNull(monitorKey, "Monitor Key required to process the state");
360 LOG.debug("Processing monitorKey: {} for received packet", monitorKey);
// Semaphore registered for this key by monitorStart; null when the key has no entry in lockMap.
362 final Semaphore lock = lockMap.get(monitorKey);
363 LOG.debug("Acquiring lock for monitor key : {} to process monitor packet", monitorKey);
// NOTE(review): the acquire/release calls on the semaphore are not visible in this
// excerpt — confirm every callback path below releases it.
366 final ReadWriteTransaction tx = dataBroker.newReadWriteTransaction();
// Asynchronous read of the current state for this key.
368 ListenableFuture<Optional<MonitoringState>> stateResult =
369 tx.read(LogicalDatastoreType.OPERATIONAL, getMonitorStateId(monitorKey));
372 Futures.addCallback(stateResult, new FutureCallback<Optional<MonitoringState>>() {
// Read completed: update the state only when one is stored for the key.
375 public void onSuccess(Optional<MonitoringState> optState) {
377 if (optState.isPresent()) {
378 final MonitoringState currentState = optState.get();
380 if (LOG.isTraceEnabled()) {
381 LOG.trace("OnPacketReceived : Monitoring state from ODS : {} ", currentState);
384 Long responsePendingCount = currentState.getResponsePendingCount();
386 //Need to relook at the pending count logic to support N out of M scenarios
387 // if (currentState.getState() != LivenessState.Up) {
388 // //Reset responsePendingCount when state changes from DOWN to UP
389 // responsePendingCount = INITIAL_COUNT;
392 // if (responsePendingCount > INITIAL_COUNT) {
393 // responsePendingCount = currentState.getResponsePendingCount() - 1;
// The value read above is overwritten unconditionally: any received packet clears the pending count.
395 responsePendingCount = INITIAL_COUNT;
// True when the stored state was Down or Unknown, i.e. this packet moves the monitor to Up.
397 final boolean stateChanged = (currentState.getState() == LivenessState.Down ||
398 currentState.getState() == LivenessState.Unknown);
// New state: Up, with the cleared pending count, merged under the same key.
400 final MonitoringState state =
401 new MonitoringStateBuilder().setMonitorKey(monitorKey).setState(LivenessState.Up)
402 .setResponsePendingCount(responsePendingCount).build();
403 tx.merge(LogicalDatastoreType.OPERATIONAL, getMonitorStateId(monitorKey), state);
404 ListenableFuture<Void> writeResult = tx.submit();
407 Futures.addCallback(writeResult, new FutureCallback<Void>() {
// Commit succeeded.
// NOTE(review): presumably the notification below is published only when stateChanged is
// true — the guard itself is not visible in this excerpt; confirm.
409 public void onSuccess(Void noarg) {
413 if (LOG.isTraceEnabled()) {
414 LOG.trace("Sending notification for monitor Id : {} with Current State: {}",
415 currentState.getMonitorId(), LivenessState.Up);
417 publishNotification(currentState.getMonitorId(), LivenessState.Up);
419 if (LOG.isTraceEnabled()) {
420 LOG.trace("Successful in writing monitoring state {} to ODS", state);
// Commit failed: only logged, no retry is visible here.
426 public void onFailure(Throwable error) {
428 LOG.warn("Error in writing monitoring state : {} to Datastore", monitorKey, error);
429 if (LOG.isTraceEnabled()) {
430 LOG.trace("Error in writing monitoring state: {} to Datastore", state);
// No state stored for the key: nothing to update.
435 if (LOG.isTraceEnabled()) {
436 LOG.trace("Monitoring State not available for key: {} to process the Packet received",
439 //Complete the transaction
// The initial read failed.
446 public void onFailure(Throwable error) {
447 LOG.error("Error when reading Monitoring State for key: {} to process the Packet received", monitorKey, error);
448 //FIXME: Not sure if the transaction status is valid to cancel
455 private String getIpAddress(EndpointType endpoint) {
456 String ipAddress = "";
457 if ( endpoint instanceof IpAddress) {
458 ipAddress = ((IpAddress) endpoint).getIpAddress().getIpv4Address().getValue();
459 } else if (endpoint instanceof Interface) {
460 ipAddress = ((Interface)endpoint).getInterfaceIp().getIpv4Address().getValue();
465 private String getUniqueKey(String interfaceName, String ethType, EndpointType source, EndpointType destination) {
466 StringBuilder builder = new StringBuilder().append(interfaceName).append(AlivenessMonitorConstants.SEPERATOR)
468 if (source != null) {
469 builder.append(AlivenessMonitorConstants.SEPERATOR).append(getIpAddress(source));
472 if (destination != null) {
473 builder.append(AlivenessMonitorConstants.SEPERATOR).append(getIpAddress(destination));
475 return builder.toString();
/**
 * RPC entry point that starts monitoring for the source interface described by
 * the input config.
 *
 * Steps visible here: validate the mode (only OneOne) and the source endpoint
 * (must be an Interface with a non-empty name); look up the monitor profile;
 * allocate a monitor id for the key built by getUniqueKey; when monitoring info
 * already exists under that id, return success with a "config-exists" warning;
 * otherwise write the monitoring info, the initial state (Unknown / Started) and
 * the id-to-key map entry in one write-only transaction.
 *
 * Exceptions thrown synchronously are turned into a failed RpcResult.
 *
 * @param input RPC input carrying the monitoring Config
 * @return an already-completed future holding the RPC result
 */
479 public Future<RpcResult<MonitorStartOutput>> monitorStart(MonitorStartInput input) {
480 RpcResultBuilder<MonitorStartOutput> rpcResultBuilder;
481 final Config in = input.getConfig();
482 Long profileId = in.getProfileId();
483 LOG.debug("Monitor Start invoked with Config: {}, Profile Id: {}", in, profileId);
// Only one-to-one monitoring is accepted.
486 if (in.getMode() != MonitoringMode.OneOne) {
487 throw new UnsupportedConfigException(
488 "Unsupported Monitoring mode. Currently one-one mode is supported");
// The profile supplies the protocol type and the monitoring interval used below.
491 Optional<MonitorProfile> optProfile =
492 read(LogicalDatastoreType.OPERATIONAL, getMonitorProfileId(profileId));
493 final MonitorProfile profile;
494 if (!optProfile.isPresent()) {
495 String errMsg = String.format("No monitoring profile associated with Id: %d", profileId);
496 LOG.error("Monitor start failed. {}", errMsg);
497 throw new RuntimeException(errMsg);
499 profile = optProfile.get();
502 final EtherTypes ethType = profile.getProtocolType();
// The source endpoint must be an Interface; its name identifies the monitored interface.
504 String interfaceName = null;
505 EndpointType srcEndpointType = in.getSource().getEndpointType();
507 if ( srcEndpointType instanceof Interface) {
508 Interface endPoint = (Interface) srcEndpointType;
509 interfaceName = endPoint.getInterfaceName();
511 throw new UnsupportedConfigException(
512 "Unsupported source Endpoint type. Only Interface Endpoint currently supported for monitoring");
515 if (Strings.isNullOrEmpty(interfaceName)) {
516 throw new RuntimeException("Interface Name not defined in the source Endpoint");
519 //Initially the support is for one monitoring per interface.
520 //Revisit the retrieving monitor id logic when the multiple monitoring for same interface is needed.
// The destination is optional.
521 EndpointType destEndpointType = null;
522 if (in.getDestination() != null) {
523 destEndpointType = in.getDestination().getEndpointType();
// NOTE(review): the result of getUniqueId is used as-is; a failed allocation is not
// distinguished here — confirm how that case should be handled.
525 String idKey = getUniqueKey(interfaceName, ethType.toString(), srcEndpointType, destEndpointType);
526 final long monitorId = getUniqueId(idKey);
// Existing monitoring info under this id means the same configuration was already started.
527 Optional<MonitoringInfo> optKey = read(LogicalDatastoreType.OPERATIONAL, getMonitoringInfoId(monitorId));
528 final AlivenessProtocolHandler handler;
529 if (optKey.isPresent()) {
530 String message = String.format("Monitoring for the interface %s with this configuration "
531 + "is already registered.", interfaceName);
533 MonitorStartOutput output = new MonitorStartOutputBuilder().setMonitorId(monitorId).build();
534 rpcResultBuilder = RpcResultBuilder.success(output).withWarning(ErrorType.APPLICATION,
535 "config-exists", message);
536 return Futures.immediateFuture(rpcResultBuilder.build());
538 //Construct the monitor key
539 final MonitoringInfo monitoringInfo = new MonitoringInfoBuilder()
541 .setMode(in.getMode())
542 .setProfileId(profileId)
543 .setDestination(in.getDestination())
544 .setSource(in.getSource()).build();
545 //Construct the initial monitor state
// NOTE(review): the handler is not null-checked — a protocol type without a registered
// handler would fail on the getUniqueMonitoringKey call.
546 handler = ethTypeToProtocolHandler.get(ethType);
547 final String monitoringKey = handler.getUniqueMonitoringKey(monitoringInfo);
// For Bfd the initial state is built without the request / response-pending counters;
// the other builder sets both to INITIAL_COUNT.
549 MonitoringState monitoringState = null;
550 if (ethType == EtherTypes.Bfd) {
552 new MonitoringStateBuilder().setMonitorKey(monitoringKey).setMonitorId(monitorId)
553 .setState(LivenessState.Unknown).setStatus(MonitorStatus.Started).build();
556 new MonitoringStateBuilder().setMonitorKey(monitoringKey).setMonitorId(monitorId)
557 .setState(LivenessState.Unknown).setStatus(MonitorStatus.Started)
558 .setRequestCount(INITIAL_COUNT)
559 .setResponsePendingCount(INITIAL_COUNT).build();
// Reverse mapping from monitor id to monitor key.
562 MonitoridKeyEntry mapEntry = new MonitoridKeyEntryBuilder().setMonitorId(monitorId)
563 .setMonitorKey(monitoringKey).build();
// All three objects are written in a single write-only transaction.
565 WriteTransaction tx = dataBroker.newWriteOnlyTransaction();
567 tx.put(LogicalDatastoreType.OPERATIONAL, getMonitoringInfoId(monitorId), monitoringInfo,
568 CREATE_MISSING_PARENT);
569 LOG.debug("adding oper monitoring info {}", monitoringInfo);
571 tx.put(LogicalDatastoreType.OPERATIONAL, getMonitorStateId(monitoringKey), monitoringState,
572 CREATE_MISSING_PARENT);
573 LOG.debug("adding oper monitoring state {}", monitoringState);
575 tx.put(LogicalDatastoreType.OPERATIONAL, getMonitorMapId(monitorId), mapEntry, CREATE_MISSING_PARENT);
576 LOG.debug("adding oper map entry {}", mapEntry);
578 Futures.addCallback(tx.submit(), new FutureCallback<Void>() {
// NOTE(review): the exception thrown below is raised inside the commit callback, so it
// presumably never reaches the catch block that builds the RPC result — confirm.
580 public void onFailure(Throwable error) {
581 String errorMsg = String.format("Adding Monitoring info: %s in Datastore failed",
583 LOG.warn(errorMsg, error);
584 throw new RuntimeException(errorMsg, error);
// After a successful commit: register the per-key semaphore, then start probing — through
// the handler for Bfd, through the scheduler otherwise (branch structure only partly visible).
588 public void onSuccess(Void noarg) {
589 lockMap.put(monitoringKey, new Semaphore(1, true));
590 if (ethType == EtherTypes.Bfd) {
591 handler.startMonitoringTask(monitoringInfo);
595 LOG.debug("Scheduling monitor task for config: {}", in);
596 scheduleMonitoringTask(monitoringInfo, profile.getMonitorInterval());
601 associateMonitorIdWithInterface(monitorId, interfaceName);
// The success result is built right away; it does not wait for the commit callback above.
603 MonitorStartOutput output = new MonitorStartOutputBuilder()
604 .setMonitorId(monitorId).build();
606 rpcResultBuilder = RpcResultBuilder.success(output);
// Any synchronous failure above becomes a failed RPC result carrying the exception.
607 } catch(Exception e) {
608 LOG.error("Start Monitoring Failed. {}", e.getMessage(), e);
609 rpcResultBuilder = RpcResultBuilder.<MonitorStartOutput>failed().withError(ErrorType.APPLICATION, e.getMessage(), e);
611 return Futures.immediateFuture(rpcResultBuilder.build());
614 private void associateMonitorIdWithInterface(final Long monitorId, final String interfaceName) {
615 LOG.debug("associate monitor Id {} with interface {}", monitorId, interfaceName);
616 final ReadWriteTransaction tx = dataBroker.newReadWriteTransaction();
617 ListenableFuture<Optional<InterfaceMonitorEntry>> readFuture =
618 tx.read(LogicalDatastoreType.OPERATIONAL, getInterfaceMonitorMapId(interfaceName));
619 ListenableFuture<Void> updateFuture =
620 Futures.transform(readFuture, new AsyncFunction<Optional<InterfaceMonitorEntry>, Void>() {
623 public ListenableFuture<Void> apply(Optional<InterfaceMonitorEntry> optEntry) throws Exception {
624 if (optEntry.isPresent()) {
625 InterfaceMonitorEntry entry = optEntry.get();
626 List<Long> monitorIds = entry.getMonitorIds();
627 monitorIds.add(monitorId);
628 InterfaceMonitorEntry newEntry = new InterfaceMonitorEntryBuilder()
629 .setKey(new InterfaceMonitorEntryKey(interfaceName))
630 .setMonitorIds(monitorIds).build();
631 tx.merge(LogicalDatastoreType.OPERATIONAL, getInterfaceMonitorMapId(interfaceName),
634 //Create new monitor entry
635 LOG.debug("Adding new interface-monitor association for interface {} with id {}",
636 interfaceName, monitorId);
637 List<Long> monitorIds = new ArrayList<>();
638 monitorIds.add(monitorId);
639 InterfaceMonitorEntry newEntry =
640 new InterfaceMonitorEntryBuilder().setInterfaceName(interfaceName)
641 .setMonitorIds(monitorIds).build();
642 tx.put(LogicalDatastoreType.OPERATIONAL,
643 getInterfaceMonitorMapId(interfaceName), newEntry, CREATE_MISSING_PARENT);
649 Futures.addCallback(updateFuture, new FutureCallbackImpl(
650 String.format("Association of monitorId %d with Interface %s", monitorId, interfaceName)));
653 private void scheduleMonitoringTask(MonitoringInfo monitoringInfo, long monitorInterval) {
654 AlivenessMonitorTask monitorTask = new AlivenessMonitorTask(monitoringInfo);
655 ScheduledFuture<?> scheduledFutureResult = monitorService.scheduleAtFixedRate(
656 monitorTask, NO_DELAY, monitorInterval, TimeUnit.MILLISECONDS);
657 monitoringTasks.put(monitoringInfo.getId(), scheduledFutureResult);
661 public Future<RpcResult<Void>> monitorPause(MonitorPauseInput input) {
662 LOG.debug("Monitor Pause operation invoked for monitor id: {}", input.getMonitorId());
663 SettableFuture<RpcResult<Void>> result = SettableFuture.create();
664 final Long monitorId = input.getMonitorId();
666 //Set the monitoring status to Paused
667 updateMonitorStatusTo(monitorId, MonitorStatus.Paused, currentStatus -> currentStatus == MonitorStatus.Started);
669 if (stopMonitoringTask(monitorId)) {
670 result.set(RpcResultBuilder.<Void>success().build());
672 String errorMsg = String.format("No Monitoring Task availble to pause for the given monitor id : %d",
674 LOG.error("Monitor Pause operation failed- {}",errorMsg);
675 result.set(RpcResultBuilder.<Void>failed().withError(ErrorType.APPLICATION, errorMsg).build());
/**
 * RPC entry point that resumes a monitor.
 *
 * Reads the MonitoringInfo for the given monitor id and then its MonitorProfile
 * from the operational datastore through one read-only transaction. When both
 * exist, it requests the status change to Started (allowed from Paused or
 * Stopped) and restarts probing; otherwise the returned future is completed
 * with a failed result describing what was missing or could not be read.
 *
 * @param input RPC input carrying the monitor id
 * @return future completed from the read callbacks with the RPC result
 */
682 public Future<RpcResult<Void>> monitorUnpause(MonitorUnpauseInput input) {
683 LOG.debug("Monitor Unpause operation invoked for monitor id: {}", input.getMonitorId());
684 final SettableFuture<RpcResult<Void>> result = SettableFuture.create();
686 final Long monitorId = input.getMonitorId();
// NOTE(review): a read-only transaction has to be closed on every path; the close calls
// are not visible in this excerpt — confirm.
687 final ReadOnlyTransaction tx = dataBroker.newReadOnlyTransaction();
688 ListenableFuture<Optional<MonitoringInfo>> readInfoResult =
689 tx.read(LogicalDatastoreType.OPERATIONAL, getMonitoringInfoId(monitorId));
// This outer callback runs on callbackExecutorService (see the end of the method).
691 Futures.addCallback(readInfoResult, new FutureCallback<Optional<MonitoringInfo>>() {
// Reading the monitoring info failed.
694 public void onFailure(Throwable error) {
695 String msg = String.format("Unable to read monitoring info associated with monitor id %d", monitorId);
696 LOG.error("Monitor unpause Failed. {}", msg, error);
697 result.set(RpcResultBuilder.<Void>failed().withError(ErrorType.APPLICATION, msg, error).build());
// Monitoring info read completed: continue with the profile it references.
701 public void onSuccess(Optional<MonitoringInfo> optInfo) {
702 if (optInfo.isPresent()) {
703 final MonitoringInfo info = optInfo.get();
704 ListenableFuture<Optional<MonitorProfile>> readProfile =
705 tx.read(LogicalDatastoreType.OPERATIONAL, getMonitorProfileId(info.getProfileId()));
706 Futures.addCallback(readProfile, new FutureCallback<Optional<MonitorProfile>>(){
// Reading the profile failed.
709 public void onFailure(Throwable error) {
710 String msg = String.format("Unable to read Monitoring profile associated with id %d",
711 info.getProfileId());
712 LOG.warn("Monitor unpause Failed. {}", msg, error);
713 result.set(RpcResultBuilder.<Void>failed().withError(ErrorType.APPLICATION, msg, error)
718 public void onSuccess(Optional<MonitorProfile> optProfile) {
720 if (optProfile.isPresent()) {
// Status moves to Started only when it is currently Paused or Stopped.
721 updateMonitorStatusTo(monitorId, MonitorStatus.Started,
722 currentStatus -> (currentStatus == MonitorStatus.Paused ||
723 currentStatus == MonitorStatus.Stopped));
724 MonitorProfile profile = optProfile.get();
725 LOG.debug("Monitor Resume - Scheduling monitoring task with Id: {}", monitorId);
726 EtherTypes protocolType = profile.getProtocolType();
// Bfd monitors are driven by the HwVtepTunnelsStateHandler rather than the scheduler.
// NOTE(review): the message below says "disabling" although this is the resume path and
// resetMonitoringTask is called with true — looks copied from the stop path; confirm.
727 if (protocolType == EtherTypes.Bfd) {
728 LOG.debug("disabling bfd for hwvtep tunnel montior id {}", monitorId);
729 ((HwVtepTunnelsStateHandler) ethTypeToProtocolHandler
730 .get(protocolType)).resetMonitoringTask(info, true);
// Periodic probing restarts with the profile's interval (presumably the non-Bfd branch).
732 scheduleMonitoringTask(info, profile.getMonitorInterval());
734 result.set(RpcResultBuilder.<Void>success().build());
// The profile referenced by the monitoring info is missing.
736 String msg = String.format("Monitoring profile associated with id %d is not present",
737 info.getProfileId());
738 LOG.warn("Monitor unpause Failed. {}", msg);
739 result.set(RpcResultBuilder.<Void>failed().withError(ErrorType.APPLICATION, msg).build());
// No monitoring info stored for the id.
745 String msg = String.format("Monitoring info associated with id %d is not present", monitorId);
746 LOG.warn("Monitor unpause Failed. {}", msg);
747 result.set(RpcResultBuilder.<Void>failed().withError(ErrorType.APPLICATION, msg).build());
750 }, callbackExecutorService);
755 private boolean stopMonitoringTask(Long monitorId) {
756 return stopMonitoringTask(monitorId, INTERRUPT_TASK);
759 private boolean stopMonitoringTask(Long monitorId, boolean interruptTask) {
760 Optional<MonitoringInfo> optInfo = read(LogicalDatastoreType.OPERATIONAL, getMonitoringInfoId(monitorId));
761 if (!optInfo.isPresent()) {
762 LOG.warn("There is no monitoring info present for monitor id {}", monitorId);
765 MonitoringInfo monitoringInfo = optInfo.get();
766 Optional<MonitorProfile> optProfile = read(LogicalDatastoreType.OPERATIONAL,
767 getMonitorProfileId(monitoringInfo.getProfileId()));
768 EtherTypes protocolType = optProfile.get().getProtocolType();
769 if (protocolType == EtherTypes.Bfd) {
770 LOG.debug("disabling bfd for hwvtep tunnel montior id {}", monitorId);
771 ((HwVtepTunnelsStateHandler) ethTypeToProtocolHandler.get(protocolType))
772 .resetMonitoringTask(monitoringInfo, false);
775 ScheduledFuture<?> scheduledFutureResult = monitoringTasks.get(monitorId);
776 if (scheduledFutureResult != null) {
777 scheduledFutureResult.cancel(interruptTask);
783 Optional<MonitorProfile> getMonitorProfile(Long profileId) {
784 return read(LogicalDatastoreType.OPERATIONAL, getMonitorProfileId(profileId));
787 void acquireLock(Semaphore lock) {
792 boolean acquiredLock = false;
794 acquiredLock = lock.tryAcquire(50, TimeUnit.MILLISECONDS);
795 } catch (InterruptedException e) {
796 LOG.warn("Thread interrupted when waiting to acquire the lock");
800 LOG.warn("Previous transaction did not complete in time. Releasing the lock to proceed");
804 LOG.trace("Lock acquired successfully");
805 } catch (InterruptedException e) {
806 LOG.warn("Acquire failed");
809 LOG.trace("Lock acquired successfully");
813 void releaseLock(Semaphore lock) {
819 private void sendMonitorPacket(final MonitoringInfo monitoringInfo) {
820 //TODO: Handle interrupts
821 final Long monitorId = monitoringInfo.getId();
822 final String monitorKey = monitorIdKeyCache.getUnchecked(monitorId);
823 if (monitorKey == null) {
824 LOG.warn("No monitor Key associated with id {} to send the monitor packet", monitorId);
827 LOG.debug("Sending monitoring packet for key: {}", monitorKey);
830 final MonitorProfile profile;
831 Optional<MonitorProfile> optProfile = getMonitorProfile(monitoringInfo.getProfileId());
832 if (optProfile.isPresent()) {
833 profile = optProfile.get();
835 LOG.warn("No monitor profile associated with id {}. "
836 + "Could not send Monitor packet for monitor-id {}", monitoringInfo.getProfileId(), monitorId);
840 final Semaphore lock = lockMap.get(monitorKey);
841 LOG.debug("Acquiring lock for monitor key : {} to send monitor packet", monitorKey);
844 final ReadWriteTransaction tx = dataBroker.newReadWriteTransaction();
845 ListenableFuture<Optional<MonitoringState>> readResult =
846 tx.read(LogicalDatastoreType.OPERATIONAL, getMonitorStateId(monitorKey));
847 ListenableFuture<Void> writeResult =
848 Futures.transform(readResult, new AsyncFunction<Optional<MonitoringState>, Void>() {
851 public ListenableFuture<Void> apply(Optional<MonitoringState> optState)
853 if (optState.isPresent()) {
854 MonitoringState state = optState.get();
856 //Increase the request count
857 Long requestCount = state.getRequestCount() + 1;
859 //Check with the monitor window
860 LivenessState currentLivenessState = state.getState();
862 //Increase the pending response count
863 long responsePendingCount = state.getResponsePendingCount();
864 if (responsePendingCount < profile.getMonitorWindow()) {
865 responsePendingCount = responsePendingCount + 1;
868 //Check with the failure thresold
869 if (responsePendingCount >= profile.getFailureThreshold()) {
870 //Change the state to down and notify
871 if (currentLivenessState != LivenessState.Down) {
872 LOG.debug("Response pending Count: {}, Failure threshold: {} for monitorId {}",
873 responsePendingCount, profile.getFailureThreshold(), state.getMonitorId());
874 LOG.info("Sending notification for monitor Id : {} with State: {}",
875 state.getMonitorId(), LivenessState.Down);
876 publishNotification(monitorId, LivenessState.Down);
877 currentLivenessState = LivenessState.Down;
878 //Reset requestCount when state changes from UP to DOWN
879 requestCount = INITIAL_COUNT;
883 //Update the ODS with state
884 MonitoringState updatedState = new MonitoringStateBuilder(/*state*/)
885 .setMonitorKey(state.getMonitorKey())
886 .setRequestCount(requestCount)
887 .setResponsePendingCount(responsePendingCount)
888 .setState(currentLivenessState).build();
889 tx.merge(LogicalDatastoreType.OPERATIONAL, getMonitorStateId(state.getMonitorKey()), updatedState);
892 //Close the transaction
894 String errorMsg = String.format("Monitoring State associated with id %d is not present to send packet out.", monitorId);
895 return Futures.immediateFailedFuture(new RuntimeException(errorMsg));
901 Futures.addCallback(writeResult, new FutureCallback<Void>() {
903 public void onSuccess(Void noarg) {
904 //invoke packetout on protocol handler
905 AlivenessProtocolHandler handler = ethTypeToProtocolHandler.get(profile.getProtocolType());
906 if (handler != null) {
907 LOG.debug("Sending monitoring packet {}", monitoringInfo);
908 handler.startMonitoringTask(monitoringInfo);
914 public void onFailure(Throwable error) {
915 LOG.warn("Updating monitoring state for key: {} failed. Monitoring packet is not sent",
922 void publishNotification(final Long monitorId, final LivenessState state) {
923 LOG.debug("Sending notification for id {} - state {}", monitorId, state);
924 EventData data = new EventDataBuilder().setMonitorId(monitorId)
925 .setMonitorState(state).build();
926 MonitorEvent event = new MonitorEventBuilder().setEventData(data).build();
927 final ListenableFuture<?> eventFuture = notificationPublishService.offerNotification(event);
928 Futures.addCallback(eventFuture, new FutureCallback<Object>() {
930 public void onFailure(Throwable error) {
931 LOG.warn("Error in notifying listeners for id {} - state {}", monitorId, state, error);
935 public void onSuccess(Object arg) {
936 LOG.trace("Successful in notifying listeners for id {} - state {}", monitorId, state);
942 public Future<RpcResult<MonitorProfileCreateOutput>> monitorProfileCreate(final MonitorProfileCreateInput input) {
943 LOG.debug("Monitor Profile Create operation - {}", input.getProfile());
944 final SettableFuture<RpcResult<MonitorProfileCreateOutput>> result = SettableFuture.create();
945 Profile profile = input.getProfile();
946 final Long failureThreshold = profile.getFailureThreshold();
947 final Long monitorInterval = profile.getMonitorInterval();
948 final Long monitorWindow = profile.getMonitorWindow();
949 final EtherTypes ethType = profile.getProtocolType();
950 String idKey = getUniqueProfileKey(failureThreshold, monitorInterval, monitorWindow, ethType);
951 final Long profileId = (long) getUniqueId(idKey);
953 final ReadWriteTransaction tx = dataBroker.newReadWriteTransaction();
954 ListenableFuture<Optional<MonitorProfile>> readFuture =
955 tx.read(LogicalDatastoreType.OPERATIONAL, getMonitorProfileId(profileId));
956 ListenableFuture<RpcResult<MonitorProfileCreateOutput>> resultFuture =
957 Futures.transform(readFuture,
958 new AsyncFunction<Optional<MonitorProfile>, RpcResult<MonitorProfileCreateOutput>>() {
961 public ListenableFuture<RpcResult<MonitorProfileCreateOutput>> apply(
962 Optional<MonitorProfile> optProfile) throws Exception {
963 if (optProfile.isPresent()) {
965 MonitorProfileCreateOutput output = new MonitorProfileCreateOutputBuilder()
966 .setProfileId(profileId).build();
967 String msg = String.format("Monitor profile %s already present for the given input",
970 result.set(RpcResultBuilder.success(output)
971 .withWarning(ErrorType.PROTOCOL, "profile-exists", msg).build());
973 final MonitorProfile monitorProfile = new MonitorProfileBuilder()
975 .setFailureThreshold(failureThreshold)
976 .setMonitorInterval(monitorInterval)
977 .setMonitorWindow(monitorWindow)
978 .setProtocolType(ethType).build();
979 tx.put(LogicalDatastoreType.OPERATIONAL, getMonitorProfileId(profileId),
980 monitorProfile, CREATE_MISSING_PARENT);
981 Futures.addCallback(tx.submit(), new FutureCallback<Void>() {
983 public void onFailure(Throwable error) {
984 String msg = String.format("Error when storing monitorprofile %s in datastore",
986 LOG.error(msg, error);
987 result.set(RpcResultBuilder.<MonitorProfileCreateOutput>failed()
988 .withError(ErrorType.APPLICATION, msg, error).build());
991 public void onSuccess(Void noarg) {
992 MonitorProfileCreateOutput output = new MonitorProfileCreateOutputBuilder()
993 .setProfileId(profileId).build();
994 result.set(RpcResultBuilder.success(output).build());
1000 }, callbackExecutorService);
1001 Futures.addCallback(resultFuture, new FutureCallback<RpcResult<MonitorProfileCreateOutput>>() {
1003 public void onFailure(Throwable error) {
1004 //This would happen when any error happens during reading for monitoring profile
1005 String msg = String.format("Error in creating monitorprofile - %s", input);
1006 result.set(RpcResultBuilder.<MonitorProfileCreateOutput>failed()
1007 .withError(ErrorType.APPLICATION, msg, error).build());
1008 LOG.error(msg, error);
1012 public void onSuccess(RpcResult<MonitorProfileCreateOutput> result) {
1013 LOG.debug("Successfully created monitor Profile {} ", input);
1015 }, callbackExecutorService);
1021 public Future<RpcResult<MonitorProfileGetOutput>> monitorProfileGet(MonitorProfileGetInput input){
1022 LOG.debug("Monitor Profile Get operation for input profile- {}", input.getProfile());
1023 RpcResultBuilder<MonitorProfileGetOutput> rpcResultBuilder;
1025 final Long profileId = getExistingProfileId(input);
1027 MonitorProfileGetOutputBuilder output = new MonitorProfileGetOutputBuilder().setProfileId(profileId);
1028 rpcResultBuilder = RpcResultBuilder.success();
1029 rpcResultBuilder.withResult(output.build());
1030 }catch(Exception e){
1031 LOG.error("Retrieval of monitor profile ID for input {} failed due to {}" , input, e);
1032 rpcResultBuilder = RpcResultBuilder.failed();
1034 return Futures.immediateFuture(rpcResultBuilder.build());
1037 private Long getExistingProfileId(MonitorProfileGetInput input){
1038 org.opendaylight.yang.gen.v1.urn.opendaylight.genius.alivenessmonitor.rev160411.monitor.profile.get.input.Profile profile = input.getProfile();
1039 final Long failureThreshold = profile.getFailureThreshold();
1040 final Long monitorInterval = profile.getMonitorInterval();
1041 final Long monitorWindow = profile.getMonitorWindow();
1042 final EtherTypes ethType = profile.getProtocolType();
1043 LOG.debug("getExistingProfileId for profile : {}", input.getProfile());
1044 String idKey = getUniqueProfileKey(failureThreshold, monitorInterval, monitorWindow, ethType);
1045 LOG.debug("Obtained existing profile ID for profile : {}", input.getProfile());
1046 return ((long) getUniqueId(idKey));
1049 private String getUniqueProfileKey(Long failureThreshold, Long monitorInterval, Long monitorWindow,
1050 EtherTypes ethType) {
1051 return String.valueOf(failureThreshold) + AlivenessMonitorConstants.SEPERATOR +
1052 monitorInterval + AlivenessMonitorConstants.SEPERATOR +
1053 monitorWindow + AlivenessMonitorConstants.SEPERATOR +
1054 ethType + AlivenessMonitorConstants.SEPERATOR;
1058 public Future<RpcResult<Void>> monitorProfileDelete(final MonitorProfileDeleteInput input) {
1059 LOG.debug("Monitor Profile delete for Id: {}", input.getProfileId());
1060 final SettableFuture<RpcResult<Void>> result = SettableFuture.create();
1061 final Long profileId = input.getProfileId();
1062 final ReadWriteTransaction tx = dataBroker.newReadWriteTransaction();
1063 ListenableFuture<Optional<MonitorProfile>> readFuture =
1064 tx.read(LogicalDatastoreType.OPERATIONAL, getMonitorProfileId(profileId));
1065 ListenableFuture<RpcResult<Void>> writeFuture =
1066 Futures.transform(readFuture, new AsyncFunction<Optional<MonitorProfile>, RpcResult<Void>>() {
1068 public ListenableFuture<RpcResult<Void>> apply(final Optional<MonitorProfile> optProfile) throws Exception {
1069 if (optProfile.isPresent()) {
1070 tx.delete(LogicalDatastoreType.OPERATIONAL, getMonitorProfileId(profileId));
1071 Futures.addCallback(tx.submit(), new FutureCallback<Void>() {
1073 public void onFailure(Throwable error) {
1074 String msg = String.format("Error when removing monitor profile %d from datastore", profileId);
1075 LOG.error(msg, error);
1076 result.set(RpcResultBuilder.<Void>failed().withError(ErrorType.APPLICATION, msg, error).build());
1080 public void onSuccess(Void noarg) {
1081 MonitorProfile profile = optProfile.get();
1082 String id = getUniqueProfileKey(profile.getFailureThreshold(), profile.getMonitorInterval(),
1083 profile.getMonitorWindow(), profile.getProtocolType());
1085 result.set(RpcResultBuilder.<Void>success().build());
1089 String msg = String.format("Monitor profile with Id: %d does not exist", profileId);
1091 result.set(RpcResultBuilder.<Void>success().withWarning(ErrorType.PROTOCOL, "invalid-value", msg).build());
1095 }, callbackExecutorService);
1097 Futures.addCallback(writeFuture, new FutureCallback<RpcResult<Void>>() {
1100 public void onFailure(Throwable error) {
1101 String msg = String.format("Error when removing monitor profile %d from datastore", profileId);
1102 LOG.error(msg, error);
1103 result.set(RpcResultBuilder.<Void>failed().withError(ErrorType.APPLICATION, msg, error).build());
1107 public void onSuccess(RpcResult<Void> noarg) {
1108 LOG.debug("Successfully removed Monitor Profile {}", profileId);
1110 }, callbackExecutorService);
1115 public Future<RpcResult<Void>> monitorStop(MonitorStopInput input) {
1116 LOG.debug("Monitor Stop operation for monitor id - {}", input.getMonitorId());
1117 SettableFuture<RpcResult<Void>> result = SettableFuture.create();
1119 final Long monitorId = input.getMonitorId();
1120 Optional<MonitoringInfo> optInfo = read(LogicalDatastoreType.OPERATIONAL, getMonitoringInfoId(monitorId));
1121 if (optInfo.isPresent()) {
1122 //Stop the monitoring task
1123 stopMonitoringTask(monitorId);
1125 //Cleanup the Data store
1126 WriteTransaction tx = dataBroker.newWriteOnlyTransaction();
1127 String monitorKey = monitorIdKeyCache.getUnchecked(monitorId);
1128 if (monitorKey != null) {
1129 tx.delete(LogicalDatastoreType.OPERATIONAL, getMonitorStateId(monitorKey));
1130 monitorIdKeyCache.invalidate(monitorId);
1133 tx.delete(LogicalDatastoreType.OPERATIONAL, getMonitoringInfoId(monitorId));
1134 Futures.addCallback(tx.submit(),
1135 new FutureCallbackImpl(String.format("Delete monitor state with Id %d", monitorId)));
1137 MonitoringInfo info = optInfo.get();
1138 String interfaceName = getInterfaceName(info.getSource().getEndpointType());
1139 if (interfaceName != null) {
1140 removeMonitorIdFromInterfaceAssociation(monitorId, interfaceName);
1142 releaseIdForMonitoringInfo(info);
1144 lockMap.remove(monitorKey);
1146 result.set(RpcResultBuilder.<Void>success().build());
1148 String errorMsg = String.format("Do not have monitoring information associated with key %d", monitorId);
1149 LOG.error("Delete monitoring operation Failed - {}", errorMsg);
1150 result.set(RpcResultBuilder.<Void>failed().withError(ErrorType.APPLICATION, errorMsg).build());
1156 private void removeMonitorIdFromInterfaceAssociation(final Long monitorId, final String interfaceName) {
1157 LOG.debug("Remove monitorId {} from Interface association {}", monitorId, interfaceName);
1158 final ReadWriteTransaction tx = dataBroker.newReadWriteTransaction();
1159 ListenableFuture<Optional<InterfaceMonitorEntry>> readFuture = tx.read(LogicalDatastoreType.OPERATIONAL,
1160 getInterfaceMonitorMapId(interfaceName));
1161 ListenableFuture<Void> updateFuture =
1162 Futures.transform(readFuture, new AsyncFunction<Optional<InterfaceMonitorEntry>, Void>() {
1165 public ListenableFuture<Void> apply(Optional<InterfaceMonitorEntry> optEntry) throws Exception {
1166 if (optEntry.isPresent()) {
1167 InterfaceMonitorEntry entry = optEntry.get();
1168 List<Long> monitorIds = entry.getMonitorIds();
1169 monitorIds.remove(monitorId);
1170 InterfaceMonitorEntry newEntry = new InterfaceMonitorEntryBuilder(entry)
1171 .setKey(new InterfaceMonitorEntryKey(interfaceName)).setMonitorIds(monitorIds).build();
1172 tx.put(LogicalDatastoreType.OPERATIONAL, getInterfaceMonitorMapId(interfaceName), newEntry, CREATE_MISSING_PARENT);
1175 LOG.warn("No Interface map entry found {} to remove monitorId {}", interfaceName, monitorId);
1177 return Futures.immediateFuture(null);
1182 Futures.addCallback(updateFuture, new FutureCallbackImpl(
1183 String.format("Dis-association of monitorId %d with Interface %s", monitorId, interfaceName)));
1187 private void releaseIdForMonitoringInfo(MonitoringInfo info) {
1188 Long monitorId = info.getId();
1189 EndpointType source = info.getSource().getEndpointType();
1190 String interfaceName = getInterfaceName(source);
1191 if (!Strings.isNullOrEmpty(interfaceName)) {
1192 Optional<MonitorProfile> optProfile = read(LogicalDatastoreType.OPERATIONAL, getMonitorProfileId(info.getProfileId()));
1193 if (optProfile.isPresent()) {
1194 EtherTypes ethType = optProfile.get().getProtocolType();
1195 EndpointType destination = (info.getDestination() != null) ? info.getDestination().getEndpointType() : null;
1196 String idKey = getUniqueKey(interfaceName, ethType.toString(), source, destination);
1199 LOG.warn("Could not release monitorId {}. No profile associated with it", monitorId);
1204 private String getInterfaceName(EndpointType endpoint) {
1205 String interfaceName = null;
1206 if (endpoint instanceof Interface) {
1207 interfaceName = ((Interface)endpoint).getInterfaceName();
1209 return interfaceName;
1212 private void stopMonitoring(long monitorId) {
1213 updateMonitorStatusTo(monitorId, MonitorStatus.Stopped, currentStatus -> currentStatus != MonitorStatus.Stopped);
1214 if (!stopMonitoringTask(monitorId)) {
1215 LOG.warn("No monitoring task running to perform cancel operation for monitorId {}", monitorId);
1219 private void updateMonitorStatusTo(final Long monitorId, final MonitorStatus newStatus, final Predicate<MonitorStatus> isValidStatus) {
1220 final String monitorKey = monitorIdKeyCache.getUnchecked(monitorId);
1221 if (monitorKey == null) {
1222 LOG.warn("No monitor Key associated with id {} to change the monitor status to {}", monitorId, newStatus);
1225 final ReadWriteTransaction tx = dataBroker.newReadWriteTransaction();
1227 ListenableFuture<Optional<MonitoringState>> readResult =
1228 tx.read(LogicalDatastoreType.OPERATIONAL, getMonitorStateId(monitorKey));
1230 ListenableFuture<Void> writeResult = Futures.transform(readResult,
1231 new AsyncFunction<Optional<MonitoringState>, Void>() {
1233 public ListenableFuture<Void> apply(Optional<MonitoringState> optState) throws Exception {
1234 if (optState.isPresent()) {
1235 MonitoringState state = optState.get();
1236 if (isValidStatus.apply(state.getStatus())) {
1237 MonitoringState updatedState = new MonitoringStateBuilder().setMonitorKey(monitorKey)
1238 .setStatus(newStatus).build();
1239 tx.merge(LogicalDatastoreType.OPERATIONAL, getMonitorStateId(monitorKey), updatedState);
1241 LOG.warn("Invalid Monitoring status {}, cannot be updated to {} for monitorId {}"
1242 , state.getStatus(), newStatus, monitorId);
1245 LOG.warn("No associated monitoring state data available to update the status to {} for {}", newStatus, monitorId);
1251 Futures.addCallback(writeResult,
1252 new FutureCallbackImpl(String.format("Monitor status update for %d to %s",
1253 monitorId, newStatus.toString())));
1256 private void resumeMonitoring(final long monitorId) {
1257 final ReadOnlyTransaction tx = dataBroker.newReadOnlyTransaction();
1258 ListenableFuture<Optional<MonitoringInfo>> readInfoResult =
1259 tx.read(LogicalDatastoreType.OPERATIONAL, getMonitoringInfoId(monitorId));
1261 Futures.addCallback(readInfoResult, new FutureCallback<Optional<MonitoringInfo>>() {
1264 public void onFailure(Throwable error) {
1265 String msg = String.format("Unable to read monitoring info associated with monitor id %d", monitorId);
1266 LOG.error("Monitor resume Failed. {}", msg, error);
1270 public void onSuccess(Optional<MonitoringInfo> optInfo) {
1271 if (optInfo.isPresent()) {
1272 final MonitoringInfo info = optInfo.get();
1273 ListenableFuture<Optional<MonitorProfile>> readProfile =
1274 tx.read(LogicalDatastoreType.OPERATIONAL, getMonitorProfileId(info.getProfileId()));
1275 Futures.addCallback(readProfile, new FutureCallback<Optional<MonitorProfile>>(){
1278 public void onFailure(Throwable error) {
1279 String msg = String.format("Unable to read Monitoring profile associated with id %d",
1280 info.getProfileId());
1281 LOG.warn("Monitor resume Failed. {}", msg, error);
1285 public void onSuccess(Optional<MonitorProfile> optProfile) {
1287 if (optProfile.isPresent()) {
1288 updateMonitorStatusTo(monitorId, MonitorStatus.Started,
1289 currentStatus -> currentStatus != MonitorStatus.Started);
1290 MonitorProfile profile = optProfile.get();
1291 LOG.debug("Monitor Resume - Scheduling monitoring task for Id: {}", monitorId);
1292 scheduleMonitoringTask(info, profile.getMonitorInterval());
1294 String msg = String.format("Monitoring profile associated with id %d is not present",
1295 info.getProfileId());
1296 LOG.warn("Monitor resume Failed. {}", msg);
1302 String msg = String.format("Monitoring info associated with id %d is not present", monitorId);
1303 LOG.warn("Monitor resume Failed. {}", msg);
1309 //DATA STORE OPERATIONS
1310 <T extends DataObject> Optional<T> read(LogicalDatastoreType datastoreType, InstanceIdentifier<T> path) {
1311 try (ReadOnlyTransaction tx = dataBroker.newReadOnlyTransaction()) {
1312 return tx.read(datastoreType, path).get();
1313 } catch (InterruptedException | ExecutionException e) {
1314 LOG.warn("Error reading data from path {} in datastore {}", path, datastoreType, e);
1317 return Optional.absent();
1321 public void onInterfaceStateUp(String interfaceName) {
1322 List<Long> monitorIds = getMonitorIds(interfaceName);
1323 if (monitorIds.isEmpty()) {
1324 LOG.warn("Could not get monitorId for interface: {}", interfaceName);
1327 for(Long monitorId : monitorIds) {
1328 LOG.debug("Resume monitoring on interface: {} with monitorId: {}", interfaceName, monitorId);
1329 resumeMonitoring(monitorId);
1334 public void onInterfaceStateDown(String interfaceName) {
1335 List<Long> monitorIds = getMonitorIds(interfaceName);
1336 if (monitorIds.isEmpty()) {
1337 LOG.warn("Could not get monitorIds for interface: {}", interfaceName);
1340 for(Long monitorId : monitorIds) {
1341 LOG.debug("Suspend monitoring on interface: {} with monitorId: {}", interfaceName, monitorId);
1342 stopMonitoring(monitorId);
1346 private List<Long> getMonitorIds(String interfaceName) {
1347 return read(LogicalDatastoreType.OPERATIONAL, getInterfaceMonitorMapId(interfaceName)).transform(
1348 InterfaceMonitorEntry::getMonitorIds).or(Collections.emptyList());