Order imports to comply with checkstyle
[genius.git] / alivenessmonitor / alivenessmonitor-impl / src / main / java / org / opendaylight / genius / alivenessmonitor / internal / AlivenessMonitor.java
1 /*
2  * Copyright (c) 2016 Ericsson India Global Services Pvt Ltd. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.genius.alivenessmonitor.internal;
9
10 import static org.opendaylight.genius.alivenessmonitor.internal.AlivenessMonitorUtil.getInterfaceMonitorMapId;
11 import static org.opendaylight.genius.alivenessmonitor.internal.AlivenessMonitorUtil.getMonitorMapId;
12 import static org.opendaylight.genius.alivenessmonitor.internal.AlivenessMonitorUtil.getMonitorProfileId;
13 import static org.opendaylight.genius.alivenessmonitor.internal.AlivenessMonitorUtil.getMonitorStateId;
14 import static org.opendaylight.genius.alivenessmonitor.internal.AlivenessMonitorUtil.getMonitoringInfoId;
15
16 import com.google.common.base.Optional;
17 import com.google.common.base.Preconditions;
18 import com.google.common.base.Predicate;
19 import com.google.common.base.Strings;
20 import com.google.common.cache.CacheBuilder;
21 import com.google.common.cache.CacheLoader;
22 import com.google.common.cache.LoadingCache;
23 import com.google.common.util.concurrent.AsyncFunction;
24 import com.google.common.util.concurrent.FutureCallback;
25 import com.google.common.util.concurrent.Futures;
26 import com.google.common.util.concurrent.JdkFutureAdapters;
27 import com.google.common.util.concurrent.ListenableFuture;
28 import com.google.common.util.concurrent.SettableFuture;
29 import com.google.common.util.concurrent.ThreadFactoryBuilder;
30 import java.util.ArrayList;
31 import java.util.Collections;
32 import java.util.EnumMap;
33 import java.util.HashMap;
34 import java.util.List;
35 import java.util.Map;
36 import java.util.concurrent.ConcurrentHashMap;
37 import java.util.concurrent.ConcurrentMap;
38 import java.util.concurrent.ExecutionException;
39 import java.util.concurrent.ExecutorService;
40 import java.util.concurrent.Executors;
41 import java.util.concurrent.Future;
42 import java.util.concurrent.ScheduledExecutorService;
43 import java.util.concurrent.ScheduledFuture;
44 import java.util.concurrent.Semaphore;
45 import java.util.concurrent.ThreadFactory;
46 import java.util.concurrent.TimeUnit;
47 import javax.annotation.PostConstruct;
48 import javax.annotation.PreDestroy;
49 import javax.inject.Inject;
50 import javax.inject.Singleton;
51 import org.opendaylight.controller.liblldp.NetUtils;
52 import org.opendaylight.controller.liblldp.Packet;
53 import org.opendaylight.controller.md.sal.binding.api.DataBroker;
54 import org.opendaylight.controller.md.sal.binding.api.NotificationPublishService;
55 import org.opendaylight.controller.md.sal.binding.api.NotificationService;
56 import org.opendaylight.controller.md.sal.binding.api.ReadOnlyTransaction;
57 import org.opendaylight.controller.md.sal.binding.api.ReadWriteTransaction;
58 import org.opendaylight.controller.md.sal.binding.api.WriteTransaction;
59 import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
60 import org.opendaylight.genius.mdsalutil.packet.Ethernet;
61 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.alivenessmonitor.rev160411.AlivenessMonitorService;
62 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.alivenessmonitor.rev160411.EtherTypes;
63 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.alivenessmonitor.rev160411.LivenessState;
64 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.alivenessmonitor.rev160411.MonitorEvent;
65 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.alivenessmonitor.rev160411.MonitorEventBuilder;
66 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.alivenessmonitor.rev160411.MonitorPauseInput;
67 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.alivenessmonitor.rev160411.MonitorProfileCreateInput;
68 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.alivenessmonitor.rev160411.MonitorProfileCreateOutput;
69 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.alivenessmonitor.rev160411.MonitorProfileCreateOutputBuilder;
70 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.alivenessmonitor.rev160411.MonitorProfileDeleteInput;
71 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.alivenessmonitor.rev160411.MonitorProfileGetInput;
72 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.alivenessmonitor.rev160411.MonitorProfileGetOutput;
73 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.alivenessmonitor.rev160411.MonitorProfileGetOutputBuilder;
74 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.alivenessmonitor.rev160411.MonitorStartInput;
75 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.alivenessmonitor.rev160411.MonitorStartOutput;
76 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.alivenessmonitor.rev160411.MonitorStartOutputBuilder;
77 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.alivenessmonitor.rev160411.MonitorStatus;
78 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.alivenessmonitor.rev160411.MonitorStopInput;
79 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.alivenessmonitor.rev160411.MonitorUnpauseInput;
80 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.alivenessmonitor.rev160411.MonitoringMode;
81 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.alivenessmonitor.rev160411._interface.monitor.map.InterfaceMonitorEntry;
82 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.alivenessmonitor.rev160411._interface.monitor.map.InterfaceMonitorEntryBuilder;
83 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.alivenessmonitor.rev160411._interface.monitor.map.InterfaceMonitorEntryKey;
84 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.alivenessmonitor.rev160411.endpoint.EndpointType;
85 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.alivenessmonitor.rev160411.endpoint.endpoint.type.Interface;
86 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.alivenessmonitor.rev160411.endpoint.endpoint.type.IpAddress;
87 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.alivenessmonitor.rev160411.monitor.configs.MonitoringInfo;
88 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.alivenessmonitor.rev160411.monitor.configs.MonitoringInfoBuilder;
89 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.alivenessmonitor.rev160411.monitor.event.EventData;
90 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.alivenessmonitor.rev160411.monitor.event.EventDataBuilder;
91 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.alivenessmonitor.rev160411.monitor.profile.create.input.Profile;
92 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.alivenessmonitor.rev160411.monitor.profiles.MonitorProfile;
93 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.alivenessmonitor.rev160411.monitor.profiles.MonitorProfileBuilder;
94 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.alivenessmonitor.rev160411.monitor.start.input.Config;
95 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.alivenessmonitor.rev160411.monitorid.key.map.MonitoridKeyEntry;
96 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.alivenessmonitor.rev160411.monitorid.key.map.MonitoridKeyEntryBuilder;
97 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.alivenessmonitor.rev160411.monitoring.states.MonitoringState;
98 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.alivenessmonitor.rev160411.monitoring.states.MonitoringStateBuilder;
99 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.idmanager.rev160406.AllocateIdInput;
100 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.idmanager.rev160406.AllocateIdInputBuilder;
101 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.idmanager.rev160406.AllocateIdOutput;
102 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.idmanager.rev160406.CreateIdPoolInput;
103 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.idmanager.rev160406.CreateIdPoolInputBuilder;
104 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.idmanager.rev160406.IdManagerService;
105 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.idmanager.rev160406.ReleaseIdInput;
106 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.idmanager.rev160406.ReleaseIdInputBuilder;
107 import org.opendaylight.yang.gen.v1.urn.opendaylight.packet.service.rev130709.PacketInReason;
108 import org.opendaylight.yang.gen.v1.urn.opendaylight.packet.service.rev130709.PacketProcessingListener;
109 import org.opendaylight.yang.gen.v1.urn.opendaylight.packet.service.rev130709.PacketReceived;
110 import org.opendaylight.yang.gen.v1.urn.opendaylight.packet.service.rev130709.SendToController;
111 import org.opendaylight.yangtools.concepts.ListenerRegistration;
112 import org.opendaylight.yangtools.yang.binding.DataObject;
113 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
114 import org.opendaylight.yangtools.yang.common.RpcError.ErrorType;
115 import org.opendaylight.yangtools.yang.common.RpcResult;
116 import org.opendaylight.yangtools.yang.common.RpcResultBuilder;
117 import org.slf4j.Logger;
118 import org.slf4j.LoggerFactory;
119
120 @Singleton
121 public class AlivenessMonitor implements AlivenessMonitorService, PacketProcessingListener,
122         InterfaceStateListener, AutoCloseable {
123     private static final Logger LOG = LoggerFactory.getLogger(AlivenessMonitor.class);
124     private final DataBroker dataBroker;
125     private IdManagerService idManager;
126     private NotificationPublishService notificationPublishService;
127     private final NotificationService notificationService;
128     private final Map<Class<?>, AlivenessProtocolHandler> packetTypeToProtocolHandler;
129     private final Map<EtherTypes, AlivenessProtocolHandler> ethTypeToProtocolHandler;
130     private final ConcurrentMap<Long, ScheduledFuture<?>> monitoringTasks;
131     private final ScheduledExecutorService monitorService;
132     private final ExecutorService callbackExecutorService;
133     private LoadingCache<Long, String> monitorIdKeyCache;
134     private static final int THREAD_POOL_SIZE = 4;
135     private static final boolean INTERRUPT_TASK = true;
136     private static final int NO_DELAY = 0;
137     private static final Long INITIAL_COUNT = 0L;
138     private static final boolean CREATE_MISSING_PARENT = true;
139     private static final int INVALID_ID = 0;
140     final ConcurrentMap<String, Semaphore> lockMap = new ConcurrentHashMap<>();
141     private ListenerRegistration<AlivenessMonitor> listenerRegistration;
142
143     private class FutureCallbackImpl implements FutureCallback<Void> {
144         private String message;
145         public FutureCallbackImpl(String message) {
146             this.message = message;
147         }
148
149         @Override
150         public void onFailure(Throwable error) {
151             LOG.warn("Error in Datastore operation - {}", message, error);
152         }
153
154         @Override
155         public void onSuccess(Void result) {
156             LOG.debug("Success in Datastore operation - {}", message);
157         }
158     }
159
160     private class AlivenessMonitorTask implements Runnable {
161         private MonitoringInfo monitoringInfo;
162
163         public AlivenessMonitorTask(MonitoringInfo monitoringInfo) {
164             this.monitoringInfo = monitoringInfo;
165         }
166
167         @Override
168         public void run() {
169             LOG.trace("send monitor packet - {}", monitoringInfo);
170             sendMonitorPacket(monitoringInfo);
171         }
172     }
173
174     @Inject
175     public AlivenessMonitor(final DataBroker dataBroker, final IdManagerService idManager,
176                             final NotificationPublishService notificationPublishService,
177                             final NotificationService notificationService) {
178         this.dataBroker = dataBroker;
179         this.idManager = idManager;
180         this.notificationPublishService = notificationPublishService;
181         this.notificationService = notificationService;
182         ethTypeToProtocolHandler = new EnumMap<>(EtherTypes.class);
183         packetTypeToProtocolHandler = new HashMap<>();
184         monitorService = Executors.newScheduledThreadPool(THREAD_POOL_SIZE,
185                             getMonitoringThreadFactory("Aliveness Monitoring Task"));
186         callbackExecutorService = Executors.newFixedThreadPool(THREAD_POOL_SIZE,
187                             getMonitoringThreadFactory("Aliveness Callback Handler"));
188         monitoringTasks = new ConcurrentHashMap<>();
189     }
190
191     @PostConstruct
192     public void start() {
193         LOG.info("{} start", getClass().getSimpleName());
194         createIdPool();
195         initializeCache();
196         listenerRegistration = notificationService.registerNotificationListener(this);
197     }
198
199     @Override
200     @PreDestroy
201     public void close() throws Exception {
202         monitorIdKeyCache.cleanUp();
203         monitorService.shutdown();
204         callbackExecutorService.shutdown();
205         if (listenerRegistration != null) {
206             listenerRegistration.close();
207             listenerRegistration = null;
208         }
209         LOG.info("{} close", getClass().getSimpleName());
210     }
211
212     // The following setters should only be used by the test classes.
213     public void setIdManager(IdManagerService idManager) {
214         this.idManager = idManager;
215     }
216
217     public void setNotificationPublishService(NotificationPublishService notificationPublishService) {
218         this.notificationPublishService = notificationPublishService;
219     }
220
221     private ThreadFactory getMonitoringThreadFactory(String threadNameFormat) {
222         ThreadFactoryBuilder builder = new ThreadFactoryBuilder();
223         builder.setNameFormat(threadNameFormat);
224         builder.setUncaughtExceptionHandler(
225                 (t, e) -> LOG.error("Received Uncaught Exception event in Thread: {}", t.getName(), e));
226         return builder.build();
227     }
228
229     private void initializeCache() {
230         monitorIdKeyCache = CacheBuilder.newBuilder()
231                 .build(new CacheLoader<Long, String>() {
232                     @Override
233                     public String load(Long monitorId) throws Exception {
234                         return read(LogicalDatastoreType.OPERATIONAL, getMonitorMapId(monitorId)).transform(
235                                 MonitoridKeyEntry::getMonitorKey).orNull();
236                     }
237                 });
238     }
239
240     public void registerHandler(EtherTypes etherType, AlivenessProtocolHandler protocolHandler) {
241         ethTypeToProtocolHandler.put(etherType, protocolHandler);
242         packetTypeToProtocolHandler.put(protocolHandler.getPacketClass(), protocolHandler);
243     }
244
245     private void createIdPool() {
246         CreateIdPoolInput createPool = new CreateIdPoolInputBuilder()
247                                             .setPoolName(AlivenessMonitorConstants.MONITOR_IDPOOL_NAME)
248                                             .setLow(AlivenessMonitorConstants.MONITOR_IDPOOL_START)
249                                             .setHigh(AlivenessMonitorConstants.MONITOR_IDPOOL_SIZE)
250                                             .build();
251         Future<RpcResult<Void>> result = idManager.createIdPool(createPool);
252         Futures.addCallback(JdkFutureAdapters.listenInPoolThread(result), new FutureCallback<RpcResult<Void>>() {
253
254             @Override
255             public void onFailure(Throwable error) {
256                 LOG.error("Failed to create idPool for Aliveness Monitor Service",error);
257             }
258
259             @Override
260             public void onSuccess(RpcResult<Void> result) {
261                 if (result.isSuccessful()) {
262                     LOG.debug("Created IdPool for Aliveness Monitor Service");
263                 } else {
264                     LOG.error("RPC to create Idpool failed {}", result.getErrors());
265                 }
266             }
267         });
268     }
269
270     private int getUniqueId(final String idKey) {
271         AllocateIdInput getIdInput = new AllocateIdInputBuilder()
272                   .setPoolName(AlivenessMonitorConstants.MONITOR_IDPOOL_NAME)
273                   .setIdKey(idKey).build();
274
275         Future<RpcResult<AllocateIdOutput>> result = idManager.allocateId(getIdInput);
276
277         try {
278             RpcResult<AllocateIdOutput> rpcResult = result.get();
279             if (rpcResult.isSuccessful()) {
280                 return rpcResult.getResult().getIdValue().intValue();
281             } else {
282                 LOG.warn("RPC Call to Get Unique Id returned with Errors {}", rpcResult.getErrors());
283             }
284         } catch (InterruptedException | ExecutionException e) {
285             LOG.warn("Exception when getting Unique Id for key {}", idKey, e);
286         }
287         return INVALID_ID;
288     }
289
290     private void releaseId(String idKey) {
291         ReleaseIdInput idInput = new ReleaseIdInputBuilder()
292                                        .setPoolName(AlivenessMonitorConstants.MONITOR_IDPOOL_NAME)
293                                        .setIdKey(idKey).build();
294         try {
295             Future<RpcResult<Void>> result = idManager.releaseId(idInput);
296             RpcResult<Void> rpcResult = result.get();
297             if (!rpcResult.isSuccessful()) {
298                 LOG.warn("RPC Call to release Id {} with Key {} returned with Errors {}",
299                                                             idKey, rpcResult.getErrors());
300             }
301         } catch (InterruptedException | ExecutionException e) {
302             LOG.warn("Exception when releasing Id for key {}", idKey, e);
303         }
304     }
305
306     @Override
307     public void onPacketReceived(PacketReceived packetReceived) {
308         Class<? extends PacketInReason> pktInReason = packetReceived.getPacketInReason();
309         if (LOG.isTraceEnabled()) {
310             LOG.trace("Packet Received {}", packetReceived );
311         }
312
313         if (pktInReason == SendToController.class) {
314             Packet packetInFormatted;
315             byte[] data = packetReceived.getPayload();
316             Ethernet res = new Ethernet();
317             try {
318                 packetInFormatted = res.deserialize(data, 0, data.length * NetUtils.NumBitsInAByte);
319             } catch (Exception e) {
320                 LOG.warn("Failed to decode packet: {}", e.getMessage());
321                 return;
322             }
323
324             if (packetInFormatted == null) {
325                 LOG.warn("Failed to deserialize Received Packet from table {}",
326                         packetReceived.getTableId().getValue());
327                 return;
328             }
329
330             Object objPayload = packetInFormatted.getPayload();
331
332             if (objPayload == null) {
333                 LOG.trace("Unsupported packet type. Ignoring the packet...");
334                 return;
335             }
336
337             if (LOG.isTraceEnabled()) {
338                 LOG.trace("onPacketReceived packet: {}, packet class: {}", packetReceived,
339                         objPayload.getClass());
340             }
341
342             AlivenessProtocolHandler livenessProtocolHandler = packetTypeToProtocolHandler.get(objPayload.getClass());
343             if (livenessProtocolHandler == null) {
344                     return;
345             }
346
347             String monitorKey = livenessProtocolHandler.handlePacketIn(packetInFormatted.getPayload(), packetReceived);
348
349             if (monitorKey != null) {
350                 processReceivedMonitorKey(monitorKey);
351             } else {
352                 LOG.debug("No monitorkey associated with received packet");
353             }
354         }
355     }
356
357     private void processReceivedMonitorKey(final String monitorKey) {
358         Preconditions.checkNotNull(monitorKey, "Monitor Key required to process the state");
359
360         LOG.debug("Processing monitorKey: {} for received packet", monitorKey);
361
362         final Semaphore lock = lockMap.get(monitorKey);
363         LOG.debug("Acquiring lock for monitor key : {} to process monitor packet", monitorKey);
364         acquireLock(lock);
365
366         final ReadWriteTransaction tx = dataBroker.newReadWriteTransaction();
367
368         ListenableFuture<Optional<MonitoringState>> stateResult =
369                 tx.read(LogicalDatastoreType.OPERATIONAL, getMonitorStateId(monitorKey));
370
371         //READ Callback
372         Futures.addCallback(stateResult, new FutureCallback<Optional<MonitoringState>>() {
373
374             @Override
375             public void onSuccess(Optional<MonitoringState> optState) {
376
377                 if (optState.isPresent()) {
378                     final MonitoringState currentState = optState.get();
379
380                     if (LOG.isTraceEnabled()) {
381                         LOG.trace("OnPacketReceived : Monitoring state from ODS : {} ", currentState);
382                     }
383
384                     Long responsePendingCount = currentState.getResponsePendingCount();
385
386                     //Need to relook at the pending count logic to support N out of M scenarios
387 //                    if (currentState.getState() != LivenessState.Up) {
388 //                        //Reset responsePendingCount when state changes from DOWN to UP
389 //                        responsePendingCount = INITIAL_COUNT;
390 //                    }
391 //
392 //                    if (responsePendingCount > INITIAL_COUNT) {
393 //                        responsePendingCount = currentState.getResponsePendingCount() - 1;
394 //                    }
395                     responsePendingCount = INITIAL_COUNT;
396
397                     final boolean stateChanged =  (currentState.getState() == LivenessState.Down ||
398                                                            currentState.getState() == LivenessState.Unknown);
399
400                     final MonitoringState state =
401                             new MonitoringStateBuilder().setMonitorKey(monitorKey).setState(LivenessState.Up)
402                                     .setResponsePendingCount(responsePendingCount).build();
403                     tx.merge(LogicalDatastoreType.OPERATIONAL, getMonitorStateId(monitorKey), state);
404                     ListenableFuture<Void> writeResult = tx.submit();
405
406                     //WRITE Callback
407                     Futures.addCallback(writeResult, new FutureCallback<Void>() {
408                         @Override
409                         public void onSuccess(Void noarg) {
410                             releaseLock(lock);
411                             if (stateChanged) {
412                                 //send notifications
413                                 if (LOG.isTraceEnabled()) {
414                                     LOG.trace("Sending notification for monitor Id : {} with Current State: {}",
415                                             currentState.getMonitorId(), LivenessState.Up);
416                                 }
417                                 publishNotification(currentState.getMonitorId(), LivenessState.Up);
418                             } else {
419                                 if (LOG.isTraceEnabled()) {
420                                     LOG.trace("Successful in writing monitoring state {} to ODS", state);
421                                 }
422                             }
423                         }
424
425                         @Override
426                         public void onFailure(Throwable error) {
427                             releaseLock(lock);
428                             LOG.warn("Error in writing monitoring state : {} to Datastore", monitorKey, error);
429                             if (LOG.isTraceEnabled()) {
430                                 LOG.trace("Error in writing monitoring state: {} to Datastore", state);
431                             }
432                         }
433                     });
434                 } else {
435                     if (LOG.isTraceEnabled()) {
436                         LOG.trace("Monitoring State not available for key: {} to process the Packet received",
437                                 monitorKey);
438                     }
439                     //Complete the transaction
440                     tx.submit();
441                     releaseLock(lock);
442                 }
443             }
444
445             @Override
446             public void onFailure(Throwable error) {
447                 LOG.error("Error when reading Monitoring State for key: {} to process the Packet received", monitorKey, error);
448                 //FIXME: Not sure if the transaction status is valid to cancel
449                 tx.cancel();
450                 releaseLock(lock);
451             }
452         });
453     }
454
455     private String getIpAddress(EndpointType endpoint) {
456         String ipAddress = "";
457         if ( endpoint instanceof IpAddress) {
458             ipAddress = ((IpAddress) endpoint).getIpAddress().getIpv4Address().getValue();
459         } else if (endpoint instanceof Interface) {
460             ipAddress = ((Interface)endpoint).getInterfaceIp().getIpv4Address().getValue();
461         }
462         return ipAddress;
463     }
464
465     private String getUniqueKey(String interfaceName, String ethType, EndpointType source, EndpointType destination) {
466         StringBuilder builder =  new StringBuilder().append(interfaceName).append(AlivenessMonitorConstants.SEPERATOR)
467                                                     .append(ethType);
468         if (source != null) {
469             builder.append(AlivenessMonitorConstants.SEPERATOR).append(getIpAddress(source));
470         }
471
472         if (destination != null) {
473             builder.append(AlivenessMonitorConstants.SEPERATOR).append(getIpAddress(destination));
474         }
475         return builder.toString();
476     }
477
478     @Override
479     public Future<RpcResult<MonitorStartOutput>> monitorStart(MonitorStartInput input) {
480         RpcResultBuilder<MonitorStartOutput> rpcResultBuilder;
481         final Config in = input.getConfig();
482         Long profileId = in.getProfileId();
483         LOG.debug("Monitor Start invoked with Config: {}, Profile Id: {}", in, profileId);
484
485         try {
486             if (in.getMode() != MonitoringMode.OneOne) {
487                 throw new UnsupportedConfigException(
488                         "Unsupported Monitoring mode. Currently one-one mode is supported");
489             }
490
491             Optional<MonitorProfile> optProfile =
492                     read(LogicalDatastoreType.OPERATIONAL, getMonitorProfileId(profileId));
493             final MonitorProfile profile;
494             if (!optProfile.isPresent()) {
495                 String errMsg =  String.format("No monitoring profile associated with Id: %d", profileId);
496                 LOG.error("Monitor start failed. {}", errMsg);
497                 throw new RuntimeException(errMsg);
498             } else {
499                 profile = optProfile.get();
500             }
501
502             final EtherTypes ethType = profile.getProtocolType();
503
504             String interfaceName = null;
505             EndpointType srcEndpointType = in.getSource().getEndpointType();
506
507             if ( srcEndpointType instanceof Interface) {
508                 Interface endPoint = (Interface) srcEndpointType;
509                 interfaceName = endPoint.getInterfaceName();
510             } else {
511                 throw new UnsupportedConfigException(
512                         "Unsupported source Endpoint type. Only Interface Endpoint currently supported for monitoring");
513             }
514
515             if (Strings.isNullOrEmpty(interfaceName)) {
516                 throw new RuntimeException("Interface Name not defined in the source Endpoint");
517             }
518
519             //Initially the support is for one monitoring per interface.
520             //Revisit the retrieving monitor id logic when the multiple monitoring for same interface is needed.
521             EndpointType destEndpointType = null;
522             if (in.getDestination() != null) {
523                 destEndpointType = in.getDestination().getEndpointType();
524             }
525             String idKey = getUniqueKey(interfaceName, ethType.toString(), srcEndpointType, destEndpointType);
526             final long monitorId = getUniqueId(idKey);
527             Optional<MonitoringInfo> optKey = read(LogicalDatastoreType.OPERATIONAL, getMonitoringInfoId(monitorId));
528             final AlivenessProtocolHandler handler;
529             if (optKey.isPresent()) {
530                 String message = String.format("Monitoring for the interface %s with this configuration "
531                         + "is already registered.", interfaceName);
532                 LOG.warn(message);
533                 MonitorStartOutput output = new MonitorStartOutputBuilder().setMonitorId(monitorId).build();
534                 rpcResultBuilder = RpcResultBuilder.success(output).withWarning(ErrorType.APPLICATION,
535                         "config-exists", message);
536                 return Futures.immediateFuture(rpcResultBuilder.build());
537             } else {
538                 //Construct the monitor key
539                 final MonitoringInfo monitoringInfo = new MonitoringInfoBuilder()
540                                                               .setId(monitorId)
541                                                               .setMode(in.getMode())
542                                                               .setProfileId(profileId)
543                                                               .setDestination(in.getDestination())
544                                                               .setSource(in.getSource()).build();
545                 //Construct the initial monitor state
546                 handler = ethTypeToProtocolHandler.get(ethType);
547                 final String monitoringKey = handler.getUniqueMonitoringKey(monitoringInfo);
548
549                 MonitoringState monitoringState = null;
550                 if (ethType == EtherTypes.Bfd) {
551                     monitoringState =
552                             new MonitoringStateBuilder().setMonitorKey(monitoringKey).setMonitorId(monitorId)
553                                     .setState(LivenessState.Unknown).setStatus(MonitorStatus.Started).build();
554                 } else {
555                     monitoringState =
556                             new MonitoringStateBuilder().setMonitorKey(monitoringKey).setMonitorId(monitorId)
557                                     .setState(LivenessState.Unknown).setStatus(MonitorStatus.Started)
558                                     .setRequestCount(INITIAL_COUNT)
559                                     .setResponsePendingCount(INITIAL_COUNT).build();
560                 }
561
562                 MonitoridKeyEntry mapEntry = new MonitoridKeyEntryBuilder().setMonitorId(monitorId)
563                                                                      .setMonitorKey(monitoringKey).build();
564
565                 WriteTransaction tx = dataBroker.newWriteOnlyTransaction();
566
567                 tx.put(LogicalDatastoreType.OPERATIONAL, getMonitoringInfoId(monitorId), monitoringInfo,
568                         CREATE_MISSING_PARENT);
569                 LOG.debug("adding oper monitoring info {}", monitoringInfo);
570
571                 tx.put(LogicalDatastoreType.OPERATIONAL, getMonitorStateId(monitoringKey), monitoringState,
572                         CREATE_MISSING_PARENT);
573                 LOG.debug("adding oper monitoring state {}", monitoringState);
574
575                 tx.put(LogicalDatastoreType.OPERATIONAL, getMonitorMapId(monitorId), mapEntry, CREATE_MISSING_PARENT);
576                 LOG.debug("adding oper map entry {}", mapEntry);
577
578                 Futures.addCallback(tx.submit(), new FutureCallback<Void>() {
579                     @Override
580                     public void onFailure(Throwable error) {
581                         String errorMsg = String.format("Adding Monitoring info: %s in Datastore failed",
582                                 monitoringInfo);
583                         LOG.warn(errorMsg, error);
584                         throw new RuntimeException(errorMsg, error);
585                     }
586
587                     @Override
588                     public void onSuccess(Void noarg) {
589                         lockMap.put(monitoringKey, new Semaphore(1, true));
590                         if (ethType == EtherTypes.Bfd) {
591                             handler.startMonitoringTask(monitoringInfo);
592                             return;
593                         }
594                         //Schedule task
595                         LOG.debug("Scheduling monitor task for config: {}", in);
596                         scheduleMonitoringTask(monitoringInfo, profile.getMonitorInterval());
597                     }
598                 });
599             }
600
601             associateMonitorIdWithInterface(monitorId, interfaceName);
602
603             MonitorStartOutput output = new MonitorStartOutputBuilder()
604                                             .setMonitorId(monitorId).build();
605
606             rpcResultBuilder = RpcResultBuilder.success(output);
607         } catch(Exception e) {
608             LOG.error("Start Monitoring Failed. {}", e.getMessage(), e);
609             rpcResultBuilder = RpcResultBuilder.<MonitorStartOutput>failed().withError(ErrorType.APPLICATION, e.getMessage(), e);
610         }
611         return Futures.immediateFuture(rpcResultBuilder.build());
612     }
613
614     private void associateMonitorIdWithInterface(final Long monitorId, final String interfaceName) {
615         LOG.debug("associate monitor Id {} with interface {}", monitorId, interfaceName);
616         final ReadWriteTransaction tx = dataBroker.newReadWriteTransaction();
617         ListenableFuture<Optional<InterfaceMonitorEntry>> readFuture =
618                 tx.read(LogicalDatastoreType.OPERATIONAL, getInterfaceMonitorMapId(interfaceName));
619         ListenableFuture<Void> updateFuture =
620                 Futures.transform(readFuture, new AsyncFunction<Optional<InterfaceMonitorEntry>, Void>() {
621
622                     @Override
623                     public ListenableFuture<Void> apply(Optional<InterfaceMonitorEntry> optEntry) throws Exception {
624                         if (optEntry.isPresent()) {
625                             InterfaceMonitorEntry entry = optEntry.get();
626                             List<Long> monitorIds = entry.getMonitorIds();
627                             monitorIds.add(monitorId);
628                             InterfaceMonitorEntry newEntry = new InterfaceMonitorEntryBuilder()
629                                     .setKey(new InterfaceMonitorEntryKey(interfaceName))
630                                     .setMonitorIds(monitorIds).build();
631                             tx.merge(LogicalDatastoreType.OPERATIONAL, getInterfaceMonitorMapId(interfaceName),
632                                     newEntry);
633                         } else {
634                             //Create new monitor entry
635                             LOG.debug("Adding new interface-monitor association for interface {} with id {}",
636                                     interfaceName, monitorId);
637                             List<Long> monitorIds = new ArrayList<>();
638                             monitorIds.add(monitorId);
639                             InterfaceMonitorEntry newEntry =
640                                     new InterfaceMonitorEntryBuilder().setInterfaceName(interfaceName)
641                                             .setMonitorIds(monitorIds).build();
642                             tx.put(LogicalDatastoreType.OPERATIONAL,
643                                     getInterfaceMonitorMapId(interfaceName), newEntry, CREATE_MISSING_PARENT);
644                         }
645                         return tx.submit();
646                     }
647                 });
648
649         Futures.addCallback(updateFuture, new FutureCallbackImpl(
650                 String.format("Association of monitorId %d with Interface %s", monitorId, interfaceName)));
651     }
652
653     private void scheduleMonitoringTask(MonitoringInfo monitoringInfo, long monitorInterval) {
654         AlivenessMonitorTask monitorTask = new AlivenessMonitorTask(monitoringInfo);
655         ScheduledFuture<?> scheduledFutureResult = monitorService.scheduleAtFixedRate(
656                                   monitorTask, NO_DELAY, monitorInterval, TimeUnit.MILLISECONDS);
657         monitoringTasks.put(monitoringInfo.getId(), scheduledFutureResult);
658     }
659
660     @Override
661     public Future<RpcResult<Void>> monitorPause(MonitorPauseInput input) {
662         LOG.debug("Monitor Pause operation invoked for monitor id: {}", input.getMonitorId());
663         SettableFuture<RpcResult<Void>> result = SettableFuture.create();
664         final Long monitorId = input.getMonitorId();
665
666         //Set the monitoring status to Paused
667         updateMonitorStatusTo(monitorId, MonitorStatus.Paused, currentStatus -> currentStatus == MonitorStatus.Started);
668
669         if (stopMonitoringTask(monitorId)) {
670             result.set(RpcResultBuilder.<Void>success().build());
671         } else {
672             String errorMsg = String.format("No Monitoring Task availble to pause for the given monitor id : %d",
673                     monitorId);
674             LOG.error("Monitor Pause operation failed- {}",errorMsg);
675             result.set(RpcResultBuilder.<Void>failed().withError(ErrorType.APPLICATION, errorMsg).build());
676         }
677
678         return result;
679     }
680
681     @Override
682     public Future<RpcResult<Void>> monitorUnpause(MonitorUnpauseInput input) {
683         LOG.debug("Monitor Unpause operation invoked for monitor id: {}", input.getMonitorId());
684         final SettableFuture<RpcResult<Void>> result = SettableFuture.create();
685
686         final Long monitorId = input.getMonitorId();
687         final ReadOnlyTransaction tx = dataBroker.newReadOnlyTransaction();
688         ListenableFuture<Optional<MonitoringInfo>> readInfoResult =
689                 tx.read(LogicalDatastoreType.OPERATIONAL, getMonitoringInfoId(monitorId));
690
691         Futures.addCallback(readInfoResult, new FutureCallback<Optional<MonitoringInfo>>() {
692
693             @Override
694             public void onFailure(Throwable error) {
695                 String msg = String.format("Unable to read monitoring info associated with monitor id %d", monitorId);
696                 LOG.error("Monitor unpause Failed. {}", msg, error);
697                 result.set(RpcResultBuilder.<Void>failed().withError(ErrorType.APPLICATION, msg, error).build());
698             }
699
700             @Override
701             public void onSuccess(Optional<MonitoringInfo> optInfo) {
702                 if (optInfo.isPresent()) {
703                     final MonitoringInfo info = optInfo.get();
704                     ListenableFuture<Optional<MonitorProfile>> readProfile =
705                             tx.read(LogicalDatastoreType.OPERATIONAL, getMonitorProfileId(info.getProfileId()));
706                     Futures.addCallback(readProfile, new FutureCallback<Optional<MonitorProfile>>(){
707
708                         @Override
709                         public void onFailure(Throwable error) {
710                             String msg = String.format("Unable to read Monitoring profile associated with id %d",
711                                     info.getProfileId());
712                             LOG.warn("Monitor unpause Failed. {}", msg, error);
713                             result.set(RpcResultBuilder.<Void>failed().withError(ErrorType.APPLICATION, msg, error)
714                                     .build());
715                         }
716
717                         @Override
718                         public void onSuccess(Optional<MonitorProfile> optProfile) {
719                             tx.close();
720                             if (optProfile.isPresent()) {
721                                 updateMonitorStatusTo(monitorId, MonitorStatus.Started,
722                                         currentStatus -> (currentStatus == MonitorStatus.Paused ||
723                                                     currentStatus == MonitorStatus.Stopped));
724                                 MonitorProfile profile = optProfile.get();
725                                 LOG.debug("Monitor Resume - Scheduling monitoring task with Id: {}", monitorId);
726                                 EtherTypes protocolType = profile.getProtocolType();
727                                 if (protocolType == EtherTypes.Bfd) {
728                                     LOG.debug("disabling bfd for hwvtep tunnel montior id {}", monitorId);
729                                     ((HwVtepTunnelsStateHandler) ethTypeToProtocolHandler
730                                             .get(protocolType)).resetMonitoringTask(info, true);
731                                 } else {
732                                     scheduleMonitoringTask(info, profile.getMonitorInterval());
733                                 }
734                                 result.set(RpcResultBuilder.<Void>success().build());
735                             } else {
736                                 String msg = String.format("Monitoring profile associated with id %d is not present",
737                                         info.getProfileId());
738                                 LOG.warn("Monitor unpause Failed. {}", msg);
739                                 result.set(RpcResultBuilder.<Void>failed().withError(ErrorType.APPLICATION, msg).build());
740                             }
741                         }
742                     });
743                 } else {
744                     tx.close();
745                     String msg = String.format("Monitoring info associated with id %d is not present", monitorId);
746                     LOG.warn("Monitor unpause Failed. {}", msg);
747                     result.set(RpcResultBuilder.<Void>failed().withError(ErrorType.APPLICATION, msg).build());
748                 }
749             }
750         }, callbackExecutorService);
751
752         return result;
753     }
754
755     private boolean stopMonitoringTask(Long monitorId) {
756         return stopMonitoringTask(monitorId, INTERRUPT_TASK);
757     }
758
759     private boolean stopMonitoringTask(Long monitorId, boolean interruptTask) {
760         Optional<MonitoringInfo> optInfo = read(LogicalDatastoreType.OPERATIONAL, getMonitoringInfoId(monitorId));
761         if (!optInfo.isPresent()) {
762             LOG.warn("There is no monitoring info present for monitor id {}", monitorId);
763             return false;
764         }
765         MonitoringInfo monitoringInfo = optInfo.get();
766         Optional<MonitorProfile> optProfile = read(LogicalDatastoreType.OPERATIONAL,
767                 getMonitorProfileId(monitoringInfo.getProfileId()));
768         EtherTypes protocolType = optProfile.get().getProtocolType();
769         if (protocolType == EtherTypes.Bfd) {
770             LOG.debug("disabling bfd for hwvtep tunnel montior id {}", monitorId);
771             ((HwVtepTunnelsStateHandler) ethTypeToProtocolHandler.get(protocolType))
772                     .resetMonitoringTask(monitoringInfo, false);
773             return true;
774         }
775         ScheduledFuture<?> scheduledFutureResult = monitoringTasks.get(monitorId);
776         if (scheduledFutureResult != null) {
777             scheduledFutureResult.cancel(interruptTask);
778             return true;
779         }
780         return false;
781     }
782
783     Optional<MonitorProfile> getMonitorProfile(Long profileId) {
784         return read(LogicalDatastoreType.OPERATIONAL, getMonitorProfileId(profileId));
785     }
786
787     void acquireLock(Semaphore lock) {
788         if (lock == null) {
789             return;
790         }
791
792         boolean acquiredLock = false;
793         try {
794             acquiredLock = lock.tryAcquire(50, TimeUnit.MILLISECONDS);
795         } catch (InterruptedException e) {
796             LOG.warn("Thread interrupted when waiting to acquire the lock");
797         }
798
799         if (!acquiredLock) {
800             LOG.warn("Previous transaction did not complete in time. Releasing the lock to proceed");
801             lock.release();
802             try {
803                 lock.acquire();
804                 LOG.trace("Lock acquired successfully");
805             } catch (InterruptedException e) {
806                 LOG.warn("Acquire failed");
807             }
808         } else {
809             LOG.trace("Lock acquired successfully");
810         }
811     }
812
813     void releaseLock(Semaphore lock) {
814         if (lock != null) {
815             lock.release();
816         }
817     }
818
819     private void sendMonitorPacket(final MonitoringInfo monitoringInfo) {
820         //TODO: Handle interrupts
821         final Long monitorId = monitoringInfo.getId();
822         final String monitorKey = monitorIdKeyCache.getUnchecked(monitorId);
823         if (monitorKey == null) {
824             LOG.warn("No monitor Key associated with id {} to send the monitor packet", monitorId);
825             return;
826         } else {
827             LOG.debug("Sending monitoring packet for key: {}", monitorKey);
828         }
829
830         final MonitorProfile profile;
831         Optional<MonitorProfile> optProfile = getMonitorProfile(monitoringInfo.getProfileId());
832         if (optProfile.isPresent()) {
833             profile = optProfile.get();
834         } else {
835             LOG.warn("No monitor profile associated with id {}. "
836                     + "Could not send Monitor packet for monitor-id {}", monitoringInfo.getProfileId(), monitorId);
837             return;
838         }
839
840         final Semaphore lock = lockMap.get(monitorKey);
841         LOG.debug("Acquiring lock for monitor key : {} to send monitor packet", monitorKey);
842         acquireLock(lock);
843
844         final ReadWriteTransaction tx = dataBroker.newReadWriteTransaction();
845         ListenableFuture<Optional<MonitoringState>> readResult =
846                                   tx.read(LogicalDatastoreType.OPERATIONAL, getMonitorStateId(monitorKey));
847         ListenableFuture<Void> writeResult =
848                 Futures.transform(readResult, new AsyncFunction<Optional<MonitoringState>, Void>() {
849
850             @Override
851             public ListenableFuture<Void> apply(Optional<MonitoringState> optState)
852                     throws Exception {
853                 if (optState.isPresent()) {
854                     MonitoringState state = optState.get();
855
856                     //Increase the request count
857                     Long requestCount = state.getRequestCount() + 1;
858
859                     //Check with the monitor window
860                     LivenessState currentLivenessState = state.getState();
861
862                     //Increase the pending response count
863                     long responsePendingCount = state.getResponsePendingCount();
864                     if (responsePendingCount < profile.getMonitorWindow()) {
865                         responsePendingCount = responsePendingCount + 1;
866                     }
867
868                     //Check with the failure thresold
869                     if (responsePendingCount >= profile.getFailureThreshold()) {
870                         //Change the state to down and notify
871                         if (currentLivenessState != LivenessState.Down) {
872                             LOG.debug("Response pending Count: {}, Failure threshold: {} for monitorId {}",
873                                     responsePendingCount, profile.getFailureThreshold(), state.getMonitorId());
874                             LOG.info("Sending notification for monitor Id : {} with State: {}",
875                                     state.getMonitorId(), LivenessState.Down);
876                             publishNotification(monitorId, LivenessState.Down);
877                             currentLivenessState = LivenessState.Down;
878                             //Reset requestCount when state changes from UP to DOWN
879                             requestCount = INITIAL_COUNT;
880                         }
881                     }
882
883                     //Update the ODS with state
884                     MonitoringState updatedState = new MonitoringStateBuilder(/*state*/)
885                             .setMonitorKey(state.getMonitorKey())
886                             .setRequestCount(requestCount)
887                             .setResponsePendingCount(responsePendingCount)
888                             .setState(currentLivenessState).build();
889                     tx.merge(LogicalDatastoreType.OPERATIONAL, getMonitorStateId(state.getMonitorKey()), updatedState);
890                     return tx.submit();
891                 } else {
892                     //Close the transaction
893                     tx.submit();
894                     String errorMsg = String.format("Monitoring State associated with id %d is not present to send packet out.", monitorId);
895                     return Futures.immediateFailedFuture(new RuntimeException(errorMsg));
896                 }
897             }
898
899         });
900
901         Futures.addCallback(writeResult, new FutureCallback<Void>() {
902             @Override
903             public void onSuccess(Void noarg) {
904                 //invoke packetout on protocol handler
905                 AlivenessProtocolHandler handler = ethTypeToProtocolHandler.get(profile.getProtocolType());
906                 if (handler != null) {
907                     LOG.debug("Sending monitoring packet {}", monitoringInfo);
908                     handler.startMonitoringTask(monitoringInfo);
909                 }
910                 releaseLock(lock);
911             }
912
913             @Override
914             public void onFailure(Throwable error) {
915                 LOG.warn("Updating monitoring state for key: {} failed. Monitoring packet is not sent",
916                         monitorKey, error);
917                 releaseLock(lock);
918             }
919         });
920     }
921
922     void publishNotification(final Long monitorId, final LivenessState state) {
923         LOG.debug("Sending notification for id {}  - state {}", monitorId, state);
924         EventData data = new EventDataBuilder().setMonitorId(monitorId)
925                                                .setMonitorState(state).build();
926         MonitorEvent event = new MonitorEventBuilder().setEventData(data).build();
927         final ListenableFuture<?> eventFuture = notificationPublishService.offerNotification(event);
928         Futures.addCallback(eventFuture, new FutureCallback<Object>() {
929             @Override
930             public void onFailure(Throwable error) {
931                 LOG.warn("Error in notifying listeners for id {} - state {}", monitorId, state, error);
932             }
933
934             @Override
935             public void onSuccess(Object arg) {
936                 LOG.trace("Successful in notifying listeners for id {} - state {}", monitorId, state);
937             }
938         });
939     }
940
941     @Override
942     public Future<RpcResult<MonitorProfileCreateOutput>> monitorProfileCreate(final MonitorProfileCreateInput input) {
943         LOG.debug("Monitor Profile Create operation - {}", input.getProfile());
944         final SettableFuture<RpcResult<MonitorProfileCreateOutput>> result = SettableFuture.create();
945         Profile profile = input.getProfile();
946         final Long failureThreshold = profile.getFailureThreshold();
947         final Long monitorInterval = profile.getMonitorInterval();
948         final Long monitorWindow = profile.getMonitorWindow();
949         final EtherTypes ethType = profile.getProtocolType();
950         String idKey = getUniqueProfileKey(failureThreshold, monitorInterval, monitorWindow, ethType);
951         final Long profileId = (long) getUniqueId(idKey);
952
953         final ReadWriteTransaction tx = dataBroker.newReadWriteTransaction();
954         ListenableFuture<Optional<MonitorProfile>> readFuture =
955                                    tx.read(LogicalDatastoreType.OPERATIONAL, getMonitorProfileId(profileId));
956         ListenableFuture<RpcResult<MonitorProfileCreateOutput>> resultFuture =
957                 Futures.transform(readFuture,
958                         new AsyncFunction<Optional<MonitorProfile>, RpcResult<MonitorProfileCreateOutput>>() {
959
960                     @Override
961                     public ListenableFuture<RpcResult<MonitorProfileCreateOutput>> apply(
962                             Optional<MonitorProfile> optProfile) throws Exception {
963                         if (optProfile.isPresent()) {
964                             tx.cancel();
965                             MonitorProfileCreateOutput output = new MonitorProfileCreateOutputBuilder()
966                                     .setProfileId(profileId).build();
967                             String msg = String.format("Monitor profile %s already present for the given input",
968                                     input);
969                             LOG.warn(msg);
970                             result.set(RpcResultBuilder.success(output)
971                                     .withWarning(ErrorType.PROTOCOL, "profile-exists", msg).build());
972                         } else {
973                             final MonitorProfile monitorProfile = new MonitorProfileBuilder()
974                                     .setId(profileId)
975                                     .setFailureThreshold(failureThreshold)
976                                     .setMonitorInterval(monitorInterval)
977                                     .setMonitorWindow(monitorWindow)
978                                     .setProtocolType(ethType).build();
979                             tx.put(LogicalDatastoreType.OPERATIONAL, getMonitorProfileId(profileId),
980                                     monitorProfile, CREATE_MISSING_PARENT);
981                             Futures.addCallback(tx.submit(), new FutureCallback<Void>() {
982                                 @Override
983                                 public void onFailure(Throwable error) {
984                                     String msg = String.format("Error when storing monitorprofile %s in datastore",
985                                             monitorProfile);
986                                     LOG.error(msg, error);
987                                     result.set(RpcResultBuilder.<MonitorProfileCreateOutput>failed()
988                                                 .withError(ErrorType.APPLICATION, msg, error).build());
989                                 }
990                                 @Override
991                                 public void onSuccess(Void noarg) {
992                                     MonitorProfileCreateOutput output = new MonitorProfileCreateOutputBuilder()
993                                             .setProfileId(profileId).build();
994                                     result.set(RpcResultBuilder.success(output).build());
995                                 }
996                             });
997                         }
998                         return result;
999                     }
1000                 }, callbackExecutorService);
1001         Futures.addCallback(resultFuture, new FutureCallback<RpcResult<MonitorProfileCreateOutput>>() {
1002             @Override
1003             public void onFailure(Throwable error) {
1004                 //This would happen when any error happens during reading for monitoring profile
1005                 String msg = String.format("Error in creating monitorprofile - %s", input);
1006                 result.set(RpcResultBuilder.<MonitorProfileCreateOutput>failed()
1007                             .withError(ErrorType.APPLICATION, msg, error).build());
1008                 LOG.error(msg, error);
1009             }
1010
1011             @Override
1012             public void onSuccess(RpcResult<MonitorProfileCreateOutput> result) {
1013                 LOG.debug("Successfully created monitor Profile {} ", input);
1014             }
1015         }, callbackExecutorService);
1016         return result;
1017     }
1018
1019
1020     @Override
1021     public Future<RpcResult<MonitorProfileGetOutput>> monitorProfileGet(MonitorProfileGetInput input){
1022         LOG.debug("Monitor Profile Get operation for input profile- {}", input.getProfile());
1023         RpcResultBuilder<MonitorProfileGetOutput> rpcResultBuilder;
1024         try{
1025             final Long profileId = getExistingProfileId(input);
1026
1027             MonitorProfileGetOutputBuilder output = new MonitorProfileGetOutputBuilder().setProfileId(profileId);
1028             rpcResultBuilder = RpcResultBuilder.success();
1029             rpcResultBuilder.withResult(output.build());
1030         }catch(Exception e){
1031             LOG.error("Retrieval of monitor profile ID for input {} failed due to {}" , input, e);
1032             rpcResultBuilder = RpcResultBuilder.failed();
1033         }
1034         return Futures.immediateFuture(rpcResultBuilder.build());
1035     }
1036
1037     private Long getExistingProfileId(MonitorProfileGetInput input){
1038         org.opendaylight.yang.gen.v1.urn.opendaylight.genius.alivenessmonitor.rev160411.monitor.profile.get.input.Profile profile = input.getProfile();
1039         final Long failureThreshold = profile.getFailureThreshold();
1040         final Long monitorInterval = profile.getMonitorInterval();
1041         final Long monitorWindow = profile.getMonitorWindow();
1042         final EtherTypes ethType = profile.getProtocolType();
1043         LOG.debug("getExistingProfileId for profile : {}", input.getProfile());
1044         String idKey = getUniqueProfileKey(failureThreshold, monitorInterval, monitorWindow, ethType);
1045         LOG.debug("Obtained existing profile ID for profile : {}", input.getProfile());
1046         return ((long) getUniqueId(idKey));
1047     }
1048
1049     private String getUniqueProfileKey(Long failureThreshold, Long monitorInterval, Long monitorWindow,
1050                                        EtherTypes ethType) {
1051         return String.valueOf(failureThreshold) + AlivenessMonitorConstants.SEPERATOR +
1052                 monitorInterval + AlivenessMonitorConstants.SEPERATOR +
1053                 monitorWindow + AlivenessMonitorConstants.SEPERATOR +
1054                 ethType + AlivenessMonitorConstants.SEPERATOR;
1055     }
1056
1057     @Override
1058     public Future<RpcResult<Void>> monitorProfileDelete(final MonitorProfileDeleteInput input) {
1059         LOG.debug("Monitor Profile delete for Id: {}", input.getProfileId());
1060         final SettableFuture<RpcResult<Void>> result = SettableFuture.create();
1061         final Long profileId = input.getProfileId();
1062         final ReadWriteTransaction tx = dataBroker.newReadWriteTransaction();
1063         ListenableFuture<Optional<MonitorProfile>> readFuture =
1064                 tx.read(LogicalDatastoreType.OPERATIONAL, getMonitorProfileId(profileId));
1065         ListenableFuture<RpcResult<Void>> writeFuture =
1066                 Futures.transform(readFuture, new AsyncFunction<Optional<MonitorProfile>, RpcResult<Void>>() {
1067                     @Override
1068                     public ListenableFuture<RpcResult<Void>> apply(final Optional<MonitorProfile> optProfile) throws Exception {
1069                         if (optProfile.isPresent()) {
1070                             tx.delete(LogicalDatastoreType.OPERATIONAL, getMonitorProfileId(profileId));
1071                             Futures.addCallback(tx.submit(), new FutureCallback<Void>() {
1072                                 @Override
1073                                 public void onFailure(Throwable error) {
1074                                     String msg = String.format("Error when removing monitor profile %d from datastore", profileId);
1075                                     LOG.error(msg, error);
1076                                     result.set(RpcResultBuilder.<Void>failed().withError(ErrorType.APPLICATION, msg, error).build());
1077                                 }
1078
1079                                 @Override
1080                                 public void onSuccess(Void noarg) {
1081                                     MonitorProfile profile = optProfile.get();
1082                                     String id = getUniqueProfileKey(profile.getFailureThreshold(), profile.getMonitorInterval(),
1083                                             profile.getMonitorWindow(), profile.getProtocolType());
1084                                     releaseId(id);
1085                                     result.set(RpcResultBuilder.<Void>success().build());
1086                                 }
1087                             });
1088                         } else {
1089                             String msg = String.format("Monitor profile with Id: %d does not exist", profileId);
1090                             LOG.info(msg);
1091                             result.set(RpcResultBuilder.<Void>success().withWarning(ErrorType.PROTOCOL, "invalid-value", msg).build());
1092                         }
1093                         return result;
1094                     }
1095                 }, callbackExecutorService);
1096
1097         Futures.addCallback(writeFuture, new FutureCallback<RpcResult<Void>>() {
1098
1099             @Override
1100             public void onFailure(Throwable error) {
1101                 String msg = String.format("Error when removing monitor profile %d from datastore", profileId);
1102                 LOG.error(msg, error);
1103                 result.set(RpcResultBuilder.<Void>failed().withError(ErrorType.APPLICATION, msg, error).build());
1104             }
1105
1106             @Override
1107             public void onSuccess(RpcResult<Void> noarg) {
1108                 LOG.debug("Successfully removed Monitor Profile {}", profileId);
1109             }
1110         }, callbackExecutorService);
1111         return result;
1112     }
1113
1114     @Override
1115     public Future<RpcResult<Void>> monitorStop(MonitorStopInput input) {
1116         LOG.debug("Monitor Stop operation for monitor id - {}", input.getMonitorId());
1117         SettableFuture<RpcResult<Void>> result = SettableFuture.create();
1118
1119         final Long monitorId = input.getMonitorId();
1120         Optional<MonitoringInfo> optInfo = read(LogicalDatastoreType.OPERATIONAL, getMonitoringInfoId(monitorId));
1121         if (optInfo.isPresent()) {
1122             //Stop the monitoring task
1123             stopMonitoringTask(monitorId);
1124
1125             //Cleanup the Data store
1126             WriteTransaction tx = dataBroker.newWriteOnlyTransaction();
1127             String monitorKey = monitorIdKeyCache.getUnchecked(monitorId);
1128             if (monitorKey != null) {
1129                 tx.delete(LogicalDatastoreType.OPERATIONAL, getMonitorStateId(monitorKey));
1130                 monitorIdKeyCache.invalidate(monitorId);
1131             }
1132
1133             tx.delete(LogicalDatastoreType.OPERATIONAL, getMonitoringInfoId(monitorId));
1134             Futures.addCallback(tx.submit(),
1135                     new FutureCallbackImpl(String.format("Delete monitor state with Id %d", monitorId)));
1136
1137             MonitoringInfo info = optInfo.get();
1138             String interfaceName = getInterfaceName(info.getSource().getEndpointType());
1139             if (interfaceName != null) {
1140                 removeMonitorIdFromInterfaceAssociation(monitorId, interfaceName);
1141             }
1142             releaseIdForMonitoringInfo(info);
1143
1144             lockMap.remove(monitorKey);
1145
1146             result.set(RpcResultBuilder.<Void>success().build());
1147         } else {
1148             String errorMsg = String.format("Do not have monitoring information associated with key %d", monitorId);
1149             LOG.error("Delete monitoring operation Failed - {}", errorMsg);
1150             result.set(RpcResultBuilder.<Void>failed().withError(ErrorType.APPLICATION, errorMsg).build());
1151         }
1152
1153         return result;
1154     }
1155
1156     private void removeMonitorIdFromInterfaceAssociation(final Long monitorId, final String interfaceName) {
1157         LOG.debug("Remove monitorId {} from Interface association {}", monitorId, interfaceName);
1158         final ReadWriteTransaction tx = dataBroker.newReadWriteTransaction();
1159         ListenableFuture<Optional<InterfaceMonitorEntry>> readFuture = tx.read(LogicalDatastoreType.OPERATIONAL,
1160                 getInterfaceMonitorMapId(interfaceName));
1161         ListenableFuture<Void> updateFuture =
1162                 Futures.transform(readFuture, new AsyncFunction<Optional<InterfaceMonitorEntry>, Void>() {
1163
1164             @Override
1165             public ListenableFuture<Void> apply(Optional<InterfaceMonitorEntry> optEntry) throws Exception {
1166                 if (optEntry.isPresent()) {
1167                     InterfaceMonitorEntry entry = optEntry.get();
1168                     List<Long> monitorIds = entry.getMonitorIds();
1169                     monitorIds.remove(monitorId);
1170                     InterfaceMonitorEntry newEntry = new InterfaceMonitorEntryBuilder(entry)
1171                                        .setKey(new InterfaceMonitorEntryKey(interfaceName)).setMonitorIds(monitorIds).build();
1172                     tx.put(LogicalDatastoreType.OPERATIONAL, getInterfaceMonitorMapId(interfaceName), newEntry, CREATE_MISSING_PARENT);
1173                     return tx.submit();
1174                 } else {
1175                     LOG.warn("No Interface map entry found {} to remove monitorId {}", interfaceName, monitorId);
1176                     tx.cancel();
1177                     return Futures.immediateFuture(null);
1178                 }
1179             }
1180         });
1181
1182         Futures.addCallback(updateFuture, new FutureCallbackImpl(
1183                      String.format("Dis-association of monitorId %d with Interface %s", monitorId, interfaceName)));
1184     }
1185
1186
1187     private void releaseIdForMonitoringInfo(MonitoringInfo info) {
1188         Long monitorId = info.getId();
1189         EndpointType source = info.getSource().getEndpointType();
1190         String interfaceName = getInterfaceName(source);
1191         if (!Strings.isNullOrEmpty(interfaceName)) {
1192             Optional<MonitorProfile> optProfile = read(LogicalDatastoreType.OPERATIONAL, getMonitorProfileId(info.getProfileId()));
1193             if (optProfile.isPresent()) {
1194                 EtherTypes ethType = optProfile.get().getProtocolType();
1195                 EndpointType destination = (info.getDestination() != null) ? info.getDestination().getEndpointType() : null;
1196                 String idKey = getUniqueKey(interfaceName, ethType.toString(), source, destination);
1197                 releaseId(idKey);
1198             } else {
1199                 LOG.warn("Could not release monitorId {}. No profile associated with it", monitorId);
1200             }
1201         }
1202     }
1203
1204     private String getInterfaceName(EndpointType endpoint) {
1205         String interfaceName = null;
1206         if (endpoint instanceof Interface) {
1207             interfaceName = ((Interface)endpoint).getInterfaceName();
1208         }
1209         return interfaceName;
1210     }
1211
1212     private void stopMonitoring(long monitorId) {
1213         updateMonitorStatusTo(monitorId, MonitorStatus.Stopped, currentStatus -> currentStatus != MonitorStatus.Stopped);
1214         if (!stopMonitoringTask(monitorId)) {
1215             LOG.warn("No monitoring task running to perform cancel operation for monitorId {}", monitorId);
1216         }
1217     }
1218
1219     private void updateMonitorStatusTo(final Long monitorId, final MonitorStatus newStatus, final Predicate<MonitorStatus> isValidStatus) {
1220         final String monitorKey = monitorIdKeyCache.getUnchecked(monitorId);
1221         if (monitorKey == null) {
1222             LOG.warn("No monitor Key associated with id {} to change the monitor status to {}", monitorId, newStatus);
1223             return;
1224         }
1225         final ReadWriteTransaction tx = dataBroker.newReadWriteTransaction();
1226
1227         ListenableFuture<Optional<MonitoringState>> readResult =
1228                             tx.read(LogicalDatastoreType.OPERATIONAL, getMonitorStateId(monitorKey));
1229
1230         ListenableFuture<Void> writeResult = Futures.transform(readResult,
1231                 new AsyncFunction<Optional<MonitoringState>, Void>() {
1232             @Override
1233             public ListenableFuture<Void> apply(Optional<MonitoringState> optState) throws Exception {
1234                 if (optState.isPresent()) {
1235                     MonitoringState state = optState.get();
1236                     if (isValidStatus.apply(state.getStatus())) {
1237                         MonitoringState updatedState = new MonitoringStateBuilder().setMonitorKey(monitorKey)
1238                                                                               .setStatus(newStatus).build();
1239                         tx.merge(LogicalDatastoreType.OPERATIONAL, getMonitorStateId(monitorKey), updatedState);
1240                     } else {
1241                         LOG.warn("Invalid Monitoring status {}, cannot be updated to {} for monitorId {}"
1242                                                                     , state.getStatus(), newStatus, monitorId);
1243                     }
1244                 } else {
1245                     LOG.warn("No associated monitoring state data available to update the status to {} for {}", newStatus, monitorId);
1246                 }
1247                 return tx.submit();
1248             }
1249         });
1250
1251         Futures.addCallback(writeResult,
1252                         new FutureCallbackImpl(String.format("Monitor status update for %d to %s",
1253                                 monitorId, newStatus.toString())));
1254     }
1255
1256     private void resumeMonitoring(final long monitorId) {
1257         final ReadOnlyTransaction tx = dataBroker.newReadOnlyTransaction();
1258         ListenableFuture<Optional<MonitoringInfo>> readInfoResult =
1259                 tx.read(LogicalDatastoreType.OPERATIONAL, getMonitoringInfoId(monitorId));
1260
1261         Futures.addCallback(readInfoResult, new FutureCallback<Optional<MonitoringInfo>>() {
1262
1263             @Override
1264             public void onFailure(Throwable error) {
1265                 String msg = String.format("Unable to read monitoring info associated with monitor id %d", monitorId);
1266                 LOG.error("Monitor resume Failed. {}", msg, error);
1267             }
1268
1269             @Override
1270             public void onSuccess(Optional<MonitoringInfo> optInfo) {
1271                 if (optInfo.isPresent()) {
1272                     final MonitoringInfo info = optInfo.get();
1273                     ListenableFuture<Optional<MonitorProfile>> readProfile =
1274                             tx.read(LogicalDatastoreType.OPERATIONAL, getMonitorProfileId(info.getProfileId()));
1275                     Futures.addCallback(readProfile, new FutureCallback<Optional<MonitorProfile>>(){
1276
1277                         @Override
1278                         public void onFailure(Throwable error) {
1279                             String msg = String.format("Unable to read Monitoring profile associated with id %d",
1280                                     info.getProfileId());
1281                             LOG.warn("Monitor resume Failed. {}", msg, error);
1282                         }
1283
1284                         @Override
1285                         public void onSuccess(Optional<MonitorProfile> optProfile) {
1286                             tx.close();
1287                             if (optProfile.isPresent()) {
1288                                 updateMonitorStatusTo(monitorId, MonitorStatus.Started,
1289                                         currentStatus -> currentStatus != MonitorStatus.Started);
1290                                 MonitorProfile profile = optProfile.get();
1291                                 LOG.debug("Monitor Resume - Scheduling monitoring task for Id: {}", monitorId);
1292                                 scheduleMonitoringTask(info, profile.getMonitorInterval());
1293                             } else {
1294                                 String msg = String.format("Monitoring profile associated with id %d is not present",
1295                                         info.getProfileId());
1296                                 LOG.warn("Monitor resume Failed. {}", msg);
1297                             }
1298                         }
1299                     });
1300                 } else {
1301                     tx.close();
1302                     String msg = String.format("Monitoring info associated with id %d is not present", monitorId);
1303                     LOG.warn("Monitor resume Failed. {}", msg);
1304                 }
1305             }
1306         });
1307     }
1308
1309     //DATA STORE OPERATIONS
1310     <T extends DataObject> Optional<T> read(LogicalDatastoreType datastoreType, InstanceIdentifier<T> path) {
1311         try (ReadOnlyTransaction tx = dataBroker.newReadOnlyTransaction()) {
1312             return tx.read(datastoreType, path).get();
1313         } catch (InterruptedException | ExecutionException e) {
1314             LOG.warn("Error reading data from path {} in datastore {}", path, datastoreType, e);
1315         }
1316
1317         return Optional.absent();
1318     }
1319
1320     @Override
1321     public void onInterfaceStateUp(String interfaceName) {
1322         List<Long> monitorIds = getMonitorIds(interfaceName);
1323         if (monitorIds.isEmpty()) {
1324             LOG.warn("Could not get monitorId for interface: {}", interfaceName);
1325             return;
1326         }
1327         for(Long monitorId : monitorIds) {
1328             LOG.debug("Resume monitoring on interface: {} with monitorId: {}", interfaceName, monitorId);
1329             resumeMonitoring(monitorId);
1330         }
1331     }
1332
1333     @Override
1334     public void onInterfaceStateDown(String interfaceName) {
1335         List<Long> monitorIds = getMonitorIds(interfaceName);
1336         if (monitorIds.isEmpty()) {
1337             LOG.warn("Could not get monitorIds for interface: {}", interfaceName);
1338             return;
1339         }
1340         for(Long monitorId : monitorIds) {
1341             LOG.debug("Suspend monitoring on interface: {} with monitorId: {}", interfaceName, monitorId);
1342             stopMonitoring(monitorId);
1343         }
1344     }
1345
1346     private List<Long> getMonitorIds(String interfaceName) {
1347         return read(LogicalDatastoreType.OPERATIONAL, getInterfaceMonitorMapId(interfaceName)).transform(
1348                 InterfaceMonitorEntry::getMonitorIds).or(Collections.emptyList());
1349     }
1350
1351 }