Initial Alivenessmonitor code
[vpnservice.git] / alivenessmonitor / alivenessmonitor-impl / src / main / java / org / opendaylight / vpnservice / alivenessmonitor / internal / AlivenessMonitor.java
1 /*
2  * Copyright (c) 2015 Ericsson India Global Services Pvt Ltd. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.vpnservice.alivenessmonitor.internal;
9
10 import java.lang.Thread.UncaughtExceptionHandler;
11 import java.util.ArrayList;
12 import java.util.Collections;
13 import java.util.EnumMap;
14 import java.util.HashMap;
15 import java.util.List;
16 import java.util.Map;
17 import java.util.concurrent.ConcurrentHashMap;
18 import java.util.concurrent.ConcurrentMap;
19 import java.util.concurrent.ExecutionException;
20 import java.util.concurrent.ExecutorService;
21 import java.util.concurrent.Executors;
22 import java.util.concurrent.Future;
23 import java.util.concurrent.ScheduledFuture;
24 import java.util.concurrent.ScheduledExecutorService;
25 import java.util.concurrent.Semaphore;
26 import java.util.concurrent.ThreadFactory;
27 import java.util.concurrent.TimeUnit;
28
29 import org.opendaylight.controller.liblldp.NetUtils;
30 import org.opendaylight.controller.liblldp.Packet;
31 import org.opendaylight.controller.md.sal.binding.api.DataBroker;
32 import org.opendaylight.controller.md.sal.binding.api.NotificationPublishService;
33 import org.opendaylight.controller.md.sal.binding.api.ReadOnlyTransaction;
34 import org.opendaylight.controller.md.sal.binding.api.ReadWriteTransaction;
35 import org.opendaylight.controller.md.sal.binding.api.WriteTransaction;
36 import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
37 import org.opendaylight.vpnservice.mdsalutil.packet.Ethernet;
38 import org.opendaylight.yang.gen.v1.urn.opendaylight.vpnservice.alivenessmonitor.rev150629.AlivenessMonitorService;
39 import org.opendaylight.yang.gen.v1.urn.opendaylight.vpnservice.alivenessmonitor.rev150629.EtherTypes;
40 import org.opendaylight.yang.gen.v1.urn.opendaylight.vpnservice.alivenessmonitor.rev150629.LivenessState;
41 import org.opendaylight.yang.gen.v1.urn.opendaylight.vpnservice.alivenessmonitor.rev150629.MonitorEvent;
42 import org.opendaylight.yang.gen.v1.urn.opendaylight.vpnservice.alivenessmonitor.rev150629.MonitorEventBuilder;
43 import org.opendaylight.yang.gen.v1.urn.opendaylight.vpnservice.alivenessmonitor.rev150629.MonitorPauseInput;
44 import org.opendaylight.yang.gen.v1.urn.opendaylight.vpnservice.alivenessmonitor.rev150629.MonitorProfileCreateInput;
45 import org.opendaylight.yang.gen.v1.urn.opendaylight.vpnservice.alivenessmonitor.rev150629.MonitorProfileCreateOutput;
46 import org.opendaylight.yang.gen.v1.urn.opendaylight.vpnservice.alivenessmonitor.rev150629.MonitorProfileCreateOutputBuilder;
47 import org.opendaylight.yang.gen.v1.urn.opendaylight.vpnservice.alivenessmonitor.rev150629.MonitorProfileDeleteInput;
48 import org.opendaylight.yang.gen.v1.urn.opendaylight.vpnservice.alivenessmonitor.rev150629.MonitorStartInput;
49 import org.opendaylight.yang.gen.v1.urn.opendaylight.vpnservice.alivenessmonitor.rev150629.MonitorStartOutput;
50 import org.opendaylight.yang.gen.v1.urn.opendaylight.vpnservice.alivenessmonitor.rev150629.MonitorStartOutputBuilder;
51 import org.opendaylight.yang.gen.v1.urn.opendaylight.vpnservice.alivenessmonitor.rev150629.MonitorStatus;
52 import org.opendaylight.yang.gen.v1.urn.opendaylight.vpnservice.alivenessmonitor.rev150629.MonitorStopInput;
53 import org.opendaylight.yang.gen.v1.urn.opendaylight.vpnservice.alivenessmonitor.rev150629.MonitorUnpauseInput;
54 import org.opendaylight.yang.gen.v1.urn.opendaylight.vpnservice.alivenessmonitor.rev150629.MonitoringMode;
55 import org.opendaylight.yang.gen.v1.urn.opendaylight.vpnservice.alivenessmonitor.rev150629._interface.monitor.map.InterfaceMonitorEntry;
56 import org.opendaylight.yang.gen.v1.urn.opendaylight.vpnservice.alivenessmonitor.rev150629._interface.monitor.map.InterfaceMonitorEntryBuilder;
57 import org.opendaylight.yang.gen.v1.urn.opendaylight.vpnservice.alivenessmonitor.rev150629._interface.monitor.map.InterfaceMonitorEntryKey;
58 import org.opendaylight.yang.gen.v1.urn.opendaylight.vpnservice.alivenessmonitor.rev150629.endpoint.EndpointType;
59 import org.opendaylight.yang.gen.v1.urn.opendaylight.vpnservice.alivenessmonitor.rev150629.endpoint.endpoint.type.Interface;
60 import org.opendaylight.yang.gen.v1.urn.opendaylight.vpnservice.alivenessmonitor.rev150629.endpoint.endpoint.type.IpAddress;
61 import org.opendaylight.yang.gen.v1.urn.opendaylight.vpnservice.alivenessmonitor.rev150629.monitor.configs.MonitoringInfo;
62 import org.opendaylight.yang.gen.v1.urn.opendaylight.vpnservice.alivenessmonitor.rev150629.monitor.configs.MonitoringInfoBuilder;
63 import org.opendaylight.yang.gen.v1.urn.opendaylight.vpnservice.alivenessmonitor.rev150629.monitor.event.EventData;
64 import org.opendaylight.yang.gen.v1.urn.opendaylight.vpnservice.alivenessmonitor.rev150629.monitor.event.EventDataBuilder;
65 import org.opendaylight.yang.gen.v1.urn.opendaylight.vpnservice.alivenessmonitor.rev150629.monitor.profile.create.input.Profile;
66 import org.opendaylight.yang.gen.v1.urn.opendaylight.vpnservice.alivenessmonitor.rev150629.monitor.profiles.MonitorProfile;
67 import org.opendaylight.yang.gen.v1.urn.opendaylight.vpnservice.alivenessmonitor.rev150629.monitor.profiles.MonitorProfileBuilder;
68 import org.opendaylight.yang.gen.v1.urn.opendaylight.vpnservice.alivenessmonitor.rev150629.monitor.start.input.Config;
69 import org.opendaylight.yang.gen.v1.urn.opendaylight.vpnservice.alivenessmonitor.rev150629.monitorid.key.map.MonitoridKeyEntry;
70 import org.opendaylight.yang.gen.v1.urn.opendaylight.vpnservice.alivenessmonitor.rev150629.monitorid.key.map.MonitoridKeyEntryBuilder;
71 import org.opendaylight.yang.gen.v1.urn.opendaylight.vpnservice.alivenessmonitor.rev150629.monitoring.states.MonitoringState;
72 import org.opendaylight.yang.gen.v1.urn.opendaylight.vpnservice.alivenessmonitor.rev150629.monitoring.states.MonitoringStateBuilder;
73 import org.opendaylight.yang.gen.v1.urn.opendaylight.vpnservice.idmanager.rev150403.AllocateIdInput;
74 import org.opendaylight.yang.gen.v1.urn.opendaylight.vpnservice.idmanager.rev150403.AllocateIdInputBuilder;
75 import org.opendaylight.yang.gen.v1.urn.opendaylight.vpnservice.idmanager.rev150403.AllocateIdOutput;
76 import org.opendaylight.yang.gen.v1.urn.opendaylight.vpnservice.idmanager.rev150403.CreateIdPoolInput;
77 import org.opendaylight.yang.gen.v1.urn.opendaylight.vpnservice.idmanager.rev150403.CreateIdPoolInputBuilder;
78 import org.opendaylight.yang.gen.v1.urn.opendaylight.vpnservice.idmanager.rev150403.IdManagerService;
79 import org.opendaylight.yang.gen.v1.urn.opendaylight.vpnservice.idmanager.rev150403.ReleaseIdInput;
80 import org.opendaylight.yang.gen.v1.urn.opendaylight.vpnservice.idmanager.rev150403.ReleaseIdInputBuilder;
81 import org.opendaylight.yang.gen.v1.urn.opendaylight.vpnservice.interfacemgr.rpcs.rev151003.OdlInterfaceRpcService;
82 import org.opendaylight.yang.gen.v1.urn.opendaylight.packet.service.rev130709.PacketInReason;
83 import org.opendaylight.yang.gen.v1.urn.opendaylight.packet.service.rev130709.PacketProcessingListener;
84 import org.opendaylight.yang.gen.v1.urn.opendaylight.packet.service.rev130709.PacketProcessingService;
85 import org.opendaylight.yang.gen.v1.urn.opendaylight.packet.service.rev130709.PacketReceived;
86 import org.opendaylight.yang.gen.v1.urn.opendaylight.packet.service.rev130709.SendToController;
87 import org.opendaylight.yangtools.yang.binding.DataObject;
88 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
89 import org.opendaylight.yangtools.yang.common.RpcError.ErrorType;
90 import org.opendaylight.yangtools.yang.common.RpcResult;
91 import org.opendaylight.yangtools.yang.common.RpcResultBuilder;
92 import org.slf4j.Logger;
93 import org.slf4j.LoggerFactory;
94
95 import com.google.common.base.Optional;
96 import com.google.common.base.Preconditions;
97 import com.google.common.base.Predicate;
98 import com.google.common.base.Strings;
99 import com.google.common.cache.CacheBuilder;
100 import com.google.common.cache.CacheLoader;
101 import com.google.common.cache.LoadingCache;
102 import com.google.common.util.concurrent.AsyncFunction;
103 import com.google.common.util.concurrent.FutureCallback;
104 import com.google.common.util.concurrent.Futures;
105 import com.google.common.util.concurrent.JdkFutureAdapters;
106 import com.google.common.util.concurrent.ListenableFuture;
107 import com.google.common.util.concurrent.SettableFuture;
108 import com.google.common.util.concurrent.ThreadFactoryBuilder;
109
110 import static org.opendaylight.vpnservice.alivenessmonitor.internal.AlivenessMonitorUtil.*;
111
112 public class AlivenessMonitor implements AlivenessMonitorService, PacketProcessingListener,
113                                          ServiceProvider, InterfaceStateListener, AutoCloseable {
114     private static final Logger LOG = LoggerFactory.getLogger(AlivenessMonitor.class);
115     private final DataBroker broker;
116     private IdManagerService idManager;
117     private PacketProcessingService packetProcessingService;
118     private NotificationPublishService notificationPublishService;
119     private OdlInterfaceRpcService interfaceManager;
120     private Map<Class<?>, AlivenessProtocolHandler> packetTypeToProtocolHandler;
121     private Map<EtherTypes, AlivenessProtocolHandler> ethTypeToProtocolHandler;
122     private ConcurrentMap<Long, ScheduledFuture<?>> monitoringTasks;
123     private LoadingCache<Long, String> monitorIdKeyCache;
124     private ScheduledExecutorService monitorService;
125     private ExecutorService callbackExecutorService;
126
127     private static final int THREAD_POOL_SIZE = 4;
128     private static final boolean INTERRUPT_TASK = true;
129     private static final int NO_DELAY = 0;
130     private static final Long INITIAL_COUNT = 0L;
131     private static final boolean CREATE_MISSING_PARENT = true;
132     private static final int INVALID_ID = 0;
133     private ConcurrentMap<String, Semaphore> lockMap = new ConcurrentHashMap<>();
134
135     private class FutureCallbackImpl implements FutureCallback<Void> {
136         private String message;
137         public FutureCallbackImpl(String message) {
138             this.message = message;
139         }
140
141         @Override
142         public void onFailure(Throwable error) {
143             LOG.warn("Error in Datastore operation - {}", message, error);
144         }
145
146         @Override
147         public void onSuccess(Void result) {
148             LOG.debug("Success in Datastore operation - {}", message);
149         }
150     }
151
152     private class AlivenessMonitorTask implements Runnable {
153         private MonitoringInfo monitoringInfo;
154
155         public AlivenessMonitorTask(MonitoringInfo monitoringInfo) {
156             this.monitoringInfo = monitoringInfo;
157         }
158
159         @Override
160         public void run() {
161             if(LOG.isTraceEnabled()) {
162                 LOG.trace("send monitor packet - {}", monitoringInfo);
163             }
164             sendMonitorPacket(monitoringInfo);
165         }
166     }
167
168     public AlivenessMonitor(DataBroker dataBroker) {
169         broker = dataBroker;
170         ethTypeToProtocolHandler = new EnumMap<>(EtherTypes.class);
171         packetTypeToProtocolHandler = new HashMap<>();
172         monitorService = Executors.newScheduledThreadPool(THREAD_POOL_SIZE,
173                             getMonitoringThreadFactory("Aliveness Monitoring Task"));
174         callbackExecutorService = Executors.newFixedThreadPool(THREAD_POOL_SIZE,
175                             getMonitoringThreadFactory("Aliveness Callback Handler"));
176         monitoringTasks = new ConcurrentHashMap<>();
177         initilizeCache();
178     }
179
180     private ThreadFactory getMonitoringThreadFactory(String threadNameFormat) {
181         ThreadFactoryBuilder builder = new ThreadFactoryBuilder();
182         builder.setNameFormat(threadNameFormat);
183         builder.setUncaughtExceptionHandler( new UncaughtExceptionHandler() {
184             @Override
185             public void uncaughtException(Thread t, Throwable e) {
186                 LOG.error("Received Uncaught Exception event in Thread: {}", t.getName(), e);
187             }
188         });
189         return builder.build();
190     }
191
192     private void initilizeCache() {
193         monitorIdKeyCache = CacheBuilder.newBuilder()
194                 .build(new CacheLoader<Long, String>() {
195                     @Override
196                     public String load(Long monitorId) throws Exception {
197                         String monitorKey = null;
198                         Optional<MonitoridKeyEntry> optKey = read(LogicalDatastoreType.OPERATIONAL, getMonitorMapId(monitorId));
199                         if(optKey.isPresent()) {
200                             monitorKey = optKey.get().getMonitorKey();
201                         }
202                         return monitorKey;
203                     }
204                 });
205     }
206
207     @Override
208     public void close() throws Exception {
209         monitorIdKeyCache.cleanUp();
210         monitorService.shutdown();
211         callbackExecutorService.shutdown();
212     }
213
214     @Override
215     public DataBroker getDataBroker() {
216         return broker;
217     }
218
219     @Override
220     public OdlInterfaceRpcService getInterfaceManager() {
221         return interfaceManager;
222     }
223
224     public void setPacketProcessingService(PacketProcessingService pktProcessingService) {
225         this.packetProcessingService = pktProcessingService;
226     }
227
228     public void setNotificationPublishService(NotificationPublishService notificationPublishService) {
229         this.notificationPublishService = notificationPublishService;
230     }
231
232     public void setInterfaceManager(OdlInterfaceRpcService interfaceManager) {
233         this.interfaceManager = interfaceManager;
234     }
235
236     public void setIdManager(IdManagerService idManager) {
237         this.idManager = idManager;
238         createIdPool();
239     }
240
241     public void registerHandler(EtherTypes etherType, AlivenessProtocolHandler protocolHandler) {
242         ethTypeToProtocolHandler.put(etherType, protocolHandler);
243         packetTypeToProtocolHandler.put(protocolHandler.getPacketClass(), protocolHandler);
244     }
245
246     private void createIdPool() {
247         CreateIdPoolInput createPool = new CreateIdPoolInputBuilder()
248                                             .setPoolName(AlivenessMonitorConstants.MONITOR_IDPOOL_NAME)
249                                             .setLow(AlivenessMonitorConstants.MONITOR_IDPOOL_START)
250                                             .setHigh(AlivenessMonitorConstants.MONITOR_IDPOOL_SIZE)
251                                             .build();
252         Future<RpcResult<Void>> result = idManager.createIdPool(createPool);
253         Futures.addCallback(JdkFutureAdapters.listenInPoolThread(result), new FutureCallback<RpcResult<Void>>() {
254
255             @Override
256             public void onFailure(Throwable error) {
257                 LOG.error("Failed to create idPool for Aliveness Monitor Service",error);
258             }
259
260             @Override
261             public void onSuccess(RpcResult<Void> result) {
262                 if(result.isSuccessful()) {
263                     LOG.debug("Created IdPool for Aliveness Monitor Service");
264                 } else {
265                     LOG.error("RPC to create Idpool failed {}", result.getErrors());
266                 }
267             }
268         });
269     }
270
271     private int getUniqueId(final String idKey) {
272         AllocateIdInput getIdInput = new AllocateIdInputBuilder()
273                   .setPoolName(AlivenessMonitorConstants.MONITOR_IDPOOL_NAME)
274                   .setIdKey(idKey).build();
275
276         Future<RpcResult<AllocateIdOutput>> result = idManager.allocateId(getIdInput);
277
278         try {
279             RpcResult<AllocateIdOutput> rpcResult = result.get();
280             if(rpcResult.isSuccessful()) {
281                 return rpcResult.getResult().getIdValue().intValue();
282             } else {
283                 LOG.warn("RPC Call to Get Unique Id returned with Errors {}", rpcResult.getErrors());
284             }
285         } catch (InterruptedException | ExecutionException e) {
286             LOG.warn("Exception when getting Unique Id for key {}", idKey, e);
287         }
288         return INVALID_ID;
289     }
290
291     private void releaseId(String idKey) {
292         ReleaseIdInput idInput = new ReleaseIdInputBuilder()
293                                        .setPoolName(AlivenessMonitorConstants.MONITOR_IDPOOL_NAME)
294                                        .setIdKey(idKey).build();
295         try {
296             Future<RpcResult<Void>> result = idManager.releaseId(idInput);
297             RpcResult<Void> rpcResult = result.get();
298             if(!rpcResult.isSuccessful()) {
299                 LOG.warn("RPC Call to release Id {} with Key {} returned with Errors {}",
300                                                             idKey, rpcResult.getErrors());
301             }
302         } catch (InterruptedException | ExecutionException e) {
303             LOG.warn("Exception when releasing Id for key {}", idKey, e);
304         }
305     }
306
307     @Override
308     public void onPacketReceived(PacketReceived packetReceived) {
309         Class<? extends PacketInReason> pktInReason = packetReceived.getPacketInReason();
310         if(LOG.isTraceEnabled()) {
311             LOG.trace("Packet Received {}", packetReceived );
312         }
313
314         if (pktInReason == SendToController.class) {
315             Packet packetInFormatted;
316             byte[] data = packetReceived.getPayload();
317             Ethernet res = new Ethernet();
318             try {
319                 packetInFormatted = res.deserialize(data, 0, data.length * NetUtils.NumBitsInAByte);
320             } catch (Exception e) {
321                 LOG.warn("Failed to decode packet: {}", e.getMessage());
322                 return;
323             }
324
325             if(packetInFormatted == null) {
326                 LOG.warn("Failed to deserialize Received Packet from table {}", packetReceived.getTableId().getValue());
327                 return;
328             }
329
330             Object objPayload = packetInFormatted.getPayload();
331
332             if(objPayload == null) {
333                 LOG.trace("Unsupported packet type. Ignoring the packet...");
334                 return;
335             }
336
337             if (LOG.isTraceEnabled()) {
338                 LOG.trace("onPacketReceived packet: {}, packet class: {}", packetReceived,
339                         objPayload.getClass());
340             }
341
342             AlivenessProtocolHandler livenessProtocolHandler = packetTypeToProtocolHandler.get(objPayload.getClass());
343             if (livenessProtocolHandler == null) {
344                     return;
345             }
346
347             String monitorKey = livenessProtocolHandler.handlePacketIn(packetInFormatted.getPayload(), packetReceived);
348
349             if(monitorKey != null) {
350                 processReceivedMonitorKey(monitorKey);
351             } else {
352                 LOG.debug("No monitorkey associated with received packet");
353             }
354         }
355     }
356
357     private void processReceivedMonitorKey(final String monitorKey) {
358         Preconditions.checkNotNull(monitorKey, "Monitor Key required to process the state");
359
360         LOG.debug("Processing monitorKey: {} for received packet", monitorKey);
361
362         final Semaphore lock = lockMap.get(monitorKey);
363         LOG.debug("Acquiring lock for monitor key : {} to process monitor packet", monitorKey);
364         acquireLock(lock);
365
366         final ReadWriteTransaction tx = broker.newReadWriteTransaction();
367
368         ListenableFuture<Optional<MonitoringState>> stateResult = tx.read(LogicalDatastoreType.OPERATIONAL, getMonitorStateId(monitorKey));
369
370         //READ Callback
371         Futures.addCallback(stateResult, new FutureCallback<Optional<MonitoringState>>() {
372
373             @Override
374             public void onSuccess(Optional<MonitoringState> optState) {
375
376                 if(optState.isPresent()) {
377                     final MonitoringState currentState = optState.get();
378
379                     if(LOG.isTraceEnabled()) {
380                         LOG.trace("OnPacketReceived : Monitoring state from ODS : {} ", currentState);
381                     }
382
383                     Long responsePendingCount = currentState.getResponsePendingCount();
384
385                     //Need to relook at the pending count logic to support N out of M scenarios
386 //                    if(currentState.getState() != LivenessState.Up) {
387 //                        //Reset responsePendingCount when state changes from DOWN to UP
388 //                        responsePendingCount = INITIAL_COUNT;
389 //                    }
390 //
391 //                    if(responsePendingCount > INITIAL_COUNT) {
392 //                        responsePendingCount = currentState.getResponsePendingCount() - 1;
393 //                    }
394                     responsePendingCount = INITIAL_COUNT;
395
396                     final boolean stateChanged =  (currentState.getState() == LivenessState.Down ||
397                                                            currentState.getState() == LivenessState.Unknown);
398
399                     final MonitoringState state = new MonitoringStateBuilder().setMonitorKey(monitorKey).setState(LivenessState.Up)
400                                                            .setResponsePendingCount(responsePendingCount).build();
401                     tx.merge(LogicalDatastoreType.OPERATIONAL, getMonitorStateId(monitorKey), state);
402                     ListenableFuture<Void> writeResult = tx.submit();
403
404                     //WRITE Callback
405                     Futures.addCallback(writeResult, new FutureCallback<Void>() {
406                         @Override
407                         public void onSuccess(Void noarg) {
408                             releaseLock(lock);
409                             if(stateChanged) {
410                                 //send notifications
411                                 LOG.info("Sending notification for monitor Id : {} with Current State: {}",
412                                         currentState.getMonitorId(), LivenessState.Up);
413                                 publishNotification(currentState.getMonitorId(), LivenessState.Up);
414                             } else {
415                                 if(LOG.isTraceEnabled()) {
416                                     LOG.trace("Successful in writing monitoring state {} to ODS", state);
417                                 }
418                             }
419                         }
420
421                         @Override
422                         public void onFailure(Throwable error) {
423                             releaseLock(lock);
424                             LOG.warn("Error in writing monitoring state : {} to Datastore", monitorKey, error);
425                             if(LOG.isTraceEnabled()) {
426                                 LOG.trace("Error in writing monitoring state: {} to Datastore", state);
427                             }
428                         }
429                     });
430                 } else {
431                     LOG.warn("Monitoring State not available for key: {} to process the Packet received", monitorKey);
432                     //Complete the transaction
433                     tx.submit();
434                     releaseLock(lock);
435                 }
436             }
437
438             @Override
439             public void onFailure(Throwable error) {
440                 LOG.error("Error when reading Monitoring State for key: {} to process the Packet received", monitorKey, error);
441                 //FIXME: Not sure if the transaction status is valid to cancel
442                 tx.cancel();
443                 releaseLock(lock);
444             }
445         });
446     }
447
448     @Override
449     public PacketProcessingService getPacketProcessingService() {
450         return packetProcessingService;
451     }
452
453     private String getIpAddress(EndpointType endpoint) {
454         String ipAddress = "";
455         if( endpoint instanceof IpAddress) {
456             ipAddress = ((IpAddress) endpoint).getIpAddress().getIpv4Address().getValue();
457         } else if (endpoint instanceof Interface) {
458             ipAddress = ((Interface)endpoint).getInterfaceIp().getIpv4Address().getValue();
459         }
460         return ipAddress;
461     }
462
463     private String getUniqueKey(String interfaceName, String ethType, EndpointType source, EndpointType destination) {
464         StringBuilder builder =  new StringBuilder().append(interfaceName).append(AlivenessMonitorConstants.SEPERATOR)
465                                                     .append(ethType);
466         if(source != null) {
467             builder.append(AlivenessMonitorConstants.SEPERATOR).append(getIpAddress(source));
468         }
469
470         if(destination != null) {
471             builder.append(AlivenessMonitorConstants.SEPERATOR).append(getIpAddress(destination));
472         }
473         return builder.toString();
474     }
475
476     @Override
477     public Future<RpcResult<MonitorStartOutput>> monitorStart(MonitorStartInput input) {
478         RpcResultBuilder<MonitorStartOutput> rpcResultBuilder;
479         final Config in = input.getConfig();
480         Long profileId = in.getProfileId();
481         LOG.debug("Monitor Start invoked with Config: {}, Profile Id: {}", in, profileId);
482
483         try {
484             if(in.getMode() != MonitoringMode.OneOne) {
485                 throw new UnsupportedConfigException(
486                         "Unsupported Monitoring mode. Currently one-one mode is supported");
487             }
488
489             Optional<MonitorProfile> optProfile = read(LogicalDatastoreType.OPERATIONAL, getMonitorProfileId(profileId));
490             final MonitorProfile profile;
491             if(!optProfile.isPresent()) {
492                 String errMsg =  String.format("No monitoring profile associated with Id: %d", profileId);
493                 LOG.error("Monitor start failed. {}", errMsg);
494                 throw new RuntimeException(errMsg);
495             } else {
496                 profile = optProfile.get();
497             }
498
499             EtherTypes ethType = profile.getProtocolType();
500
501             String interfaceName = null;
502             EndpointType srcEndpointType = in.getSource().getEndpointType();
503
504             if( srcEndpointType instanceof Interface) {
505                 Interface endPoint = (Interface) srcEndpointType;
506                 interfaceName = endPoint.getInterfaceName();
507             } else {
508                 throw new UnsupportedConfigException(
509                         "Unsupported source Endpoint type. Only Interface Endpoint currently supported for monitoring");
510             }
511
512             if(Strings.isNullOrEmpty(interfaceName)) {
513                 throw new RuntimeException("Interface Name not defined in the source Endpoint");
514             }
515
516             //Initially the support is for one monitoring per interface. 
517             //Revisit the retrieving monitor id logic when the multiple monitoring for same interface is needed.
518             EndpointType destEndpointType = null;
519             if(in.getDestination() != null) {
520                 destEndpointType = in.getDestination().getEndpointType();
521             }
522             String idKey = getUniqueKey(interfaceName, ethType.toString(), srcEndpointType, destEndpointType);
523             final long monitorId = getUniqueId(idKey);
524             Optional<MonitoringInfo> optKey = read(LogicalDatastoreType.OPERATIONAL, getMonitoringInfoId(monitorId));
525
526             if(optKey.isPresent()) {
527                 String message = String.format("Monitoring for the interface %s with this configuration is already registered.", interfaceName);
528                 LOG.warn(message);
529                 MonitorStartOutput output = new MonitorStartOutputBuilder().setMonitorId(monitorId).build();
530                 rpcResultBuilder = RpcResultBuilder.success(output).withWarning(ErrorType.APPLICATION, "config-exists", message);
531                 return Futures.immediateFuture(rpcResultBuilder.build());
532             } else {
533                 //Construct the monitor key
534                 final MonitoringInfo monitoringInfo = new MonitoringInfoBuilder()
535                                                               .setId(monitorId)
536                                                               .setMode(in.getMode())
537                                                               .setProfileId(profileId)
538                                                               .setDestination(in.getDestination())
539                                                               .setSource(in.getSource()).build();
540                 //Construct the initial monitor state
541                 AlivenessProtocolHandler handler = ethTypeToProtocolHandler.get(ethType);
542                 final String monitoringKey = handler.getUniqueMonitoringKey(monitoringInfo);
543
544                 MonitoringState monitoringState = new MonitoringStateBuilder()
545                                                            .setMonitorKey(monitoringKey)
546                                                            .setMonitorId(monitorId)
547                                                            .setState(LivenessState.Unknown)
548                                                            .setStatus(MonitorStatus.Started)
549                                                            .setRequestCount(INITIAL_COUNT)
550                                                            .setResponsePendingCount(INITIAL_COUNT).build();
551
552                 MonitoridKeyEntry mapEntry = new MonitoridKeyEntryBuilder().setMonitorId(monitorId)
553                                                                      .setMonitorKey(monitoringKey).build();
554
555                 WriteTransaction tx = broker.newWriteOnlyTransaction();
556
557                 tx.put(LogicalDatastoreType.OPERATIONAL, getMonitoringInfoId(monitorId), monitoringInfo, CREATE_MISSING_PARENT);
558
559                 tx.put(LogicalDatastoreType.OPERATIONAL, getMonitorStateId(monitoringKey), monitoringState, CREATE_MISSING_PARENT);
560
561                 tx.put(LogicalDatastoreType.OPERATIONAL, getMonitorMapId(monitorId), mapEntry, CREATE_MISSING_PARENT);
562
563                 Futures.addCallback(tx.submit(), new FutureCallback<Void>() {
564                     @Override
565                     public void onFailure(Throwable error) {
566                         String errorMsg = String.format("Adding Monitoring info: %s in Datastore failed", monitoringInfo);
567                         LOG.warn(errorMsg, error);
568                         throw new RuntimeException(errorMsg, error);
569                     }
570
571                     @Override
572                     public void onSuccess(Void noarg) {
573                         //Schedule task
574                         LOG.debug("Scheduling monitor task for config: {}", in);
575                         scheduleMonitoringTask(monitoringInfo, profile.getMonitorInterval());
576                         lockMap.put(monitoringKey, new Semaphore(1, true));
577                     }
578                 });
579             }
580
581             associateMonitorIdWithInterface(monitorId, interfaceName);
582
583             MonitorStartOutput output = new MonitorStartOutputBuilder()
584                                             .setMonitorId(monitorId).build();
585
586             rpcResultBuilder = RpcResultBuilder.success(output);
587         } catch(Exception e) {
588             LOG.error("Start Monitoring Failed. {}", e.getMessage(), e);
589             rpcResultBuilder = RpcResultBuilder.<MonitorStartOutput>failed().withError(ErrorType.APPLICATION, e.getMessage(), e);
590         }
591         return Futures.immediateFuture(rpcResultBuilder.build());
592     }
593
594     private void associateMonitorIdWithInterface(final Long monitorId, final String interfaceName) {
595         LOG.debug("associate monitor Id {} with interface {}", monitorId, interfaceName);
596         final ReadWriteTransaction tx = broker.newReadWriteTransaction();
597         ListenableFuture<Optional<InterfaceMonitorEntry>> readFuture =
598                                         tx.read(LogicalDatastoreType.OPERATIONAL, getInterfaceMonitorMapId(interfaceName));
599         ListenableFuture<Void> updateFuture =
600                 Futures.transform(readFuture, new AsyncFunction<Optional<InterfaceMonitorEntry>, Void>() {
601
602                     @Override
603                     public ListenableFuture<Void> apply(Optional<InterfaceMonitorEntry> optEntry) throws Exception {
604                         if(optEntry.isPresent()) {
605                             InterfaceMonitorEntry entry = optEntry.get();
606                             List<Long> monitorIds = entry.getMonitorIds();
607                             monitorIds.add(monitorId);
608                             InterfaceMonitorEntry newEntry = new InterfaceMonitorEntryBuilder()
609                                         .setKey(new InterfaceMonitorEntryKey(interfaceName)).setMonitorIds(monitorIds).build();
610                             tx.merge(LogicalDatastoreType.OPERATIONAL, getInterfaceMonitorMapId(interfaceName), newEntry);
611                         } else {
612                             //Create new monitor entry
613                             LOG.debug("Adding new interface-monitor association for interface {} with id {}", interfaceName, monitorId);
614                             List<Long> monitorIds = new ArrayList<>();
615                             monitorIds.add(monitorId);
616                             InterfaceMonitorEntry newEntry =
617                                     new InterfaceMonitorEntryBuilder().setInterfaceName(interfaceName).setMonitorIds(monitorIds).build();
618                             tx.put(LogicalDatastoreType.OPERATIONAL,
619                                                   getInterfaceMonitorMapId(interfaceName), newEntry, CREATE_MISSING_PARENT);
620                         }
621                         return tx.submit();
622                     }
623                 });
624
625         Futures.addCallback(updateFuture, new FutureCallbackImpl(
626                      String.format("Association of monitorId %d with Interface %s", monitorId, interfaceName)));
627     }
628
629     private void scheduleMonitoringTask(MonitoringInfo monitoringInfo, long monitorInterval) {
630         AlivenessMonitorTask monitorTask = new AlivenessMonitorTask(monitoringInfo);
631         ScheduledFuture<?> scheduledFutureResult = monitorService.scheduleAtFixedRate(
632                                   monitorTask, NO_DELAY, monitorInterval, TimeUnit.MILLISECONDS);
633         monitoringTasks.put(monitoringInfo.getId(), scheduledFutureResult);
634     }
635
636     @Override
637     public Future<RpcResult<Void>> monitorPause(MonitorPauseInput input) {
638         LOG.debug("Monitor Pause operation invoked for monitor id: {}", input.getMonitorId());
639         SettableFuture<RpcResult<Void>> result = SettableFuture.create();
640         final Long monitorId = input.getMonitorId();
641
642         //Set the monitoring status to Paused
643         updateMonitorStatusTo(monitorId, MonitorStatus.Paused, new Predicate<MonitorStatus>() {
644             @Override
645             public boolean apply(MonitorStatus currentStatus) {
646                 return currentStatus == MonitorStatus.Started;
647             }
648         });
649
650         if(stopMonitoringTask(monitorId)) {
651             result.set(RpcResultBuilder.<Void>success().build());
652         } else {
653             String errorMsg = String.format("No Monitoring Task availble to pause for the given monitor id : %d", monitorId);
654             LOG.error("Monitor Pause operation failed- {}",errorMsg);
655             result.set(RpcResultBuilder.<Void>failed().withError(ErrorType.APPLICATION, errorMsg).build());
656         }
657
658         return result;
659     }
660
661     @Override
662     public Future<RpcResult<Void>> monitorUnpause(MonitorUnpauseInput input) {
663         LOG.debug("Monitor Unpause operation invoked for monitor id: {}", input.getMonitorId());
664         final SettableFuture<RpcResult<Void>> result = SettableFuture.create();
665
666         final Long monitorId = input.getMonitorId();
667         final ReadOnlyTransaction tx = broker.newReadOnlyTransaction();
668         ListenableFuture<Optional<MonitoringInfo>> readInfoResult =
669                 tx.read(LogicalDatastoreType.OPERATIONAL, getMonitoringInfoId(monitorId));
670
671         Futures.addCallback(readInfoResult, new FutureCallback<Optional<MonitoringInfo>>() {
672
673             @Override
674             public void onFailure(Throwable error) {
675                 String msg = String.format("Unable to read monitoring info associated with monitor id %d", monitorId);
676                 LOG.error("Monitor unpause Failed. {}", msg, error);
677                 result.set(RpcResultBuilder.<Void>failed().withError(ErrorType.APPLICATION, msg, error).build());
678             }
679
680             @Override
681             public void onSuccess(Optional<MonitoringInfo> optInfo) {
682                 if(optInfo.isPresent()) {
683                     final MonitoringInfo info = optInfo.get();
684                     ListenableFuture<Optional<MonitorProfile>> readProfile =
685                             tx.read(LogicalDatastoreType.OPERATIONAL, getMonitorProfileId(info.getProfileId()));
686                     Futures.addCallback(readProfile, new FutureCallback<Optional<MonitorProfile>>(){
687
688                         @Override
689                         public void onFailure(Throwable error) {
690                             String msg = String.format("Unable to read Monitoring profile associated with id %d", info.getProfileId());
691                             LOG.warn("Monitor unpause Failed. {}", msg, error);
692                             result.set(RpcResultBuilder.<Void>failed().withError(ErrorType.APPLICATION, msg, error).build());
693                         }
694
695                         @Override
696                         public void onSuccess(Optional<MonitorProfile> optProfile) {
697                             tx.close();
698                             if(optProfile.isPresent()) {
699                                 updateMonitorStatusTo(monitorId, MonitorStatus.Started, new Predicate<MonitorStatus>() {
700                                     @Override
701                                     public boolean apply(MonitorStatus currentStatus) {
702                                         return (currentStatus == MonitorStatus.Paused ||
703                                                     currentStatus == MonitorStatus.Stopped);
704                                     }
705                                 });
706                                 MonitorProfile profile = optProfile.get();
707                                 LOG.debug("Monitor Resume - Scheduling monitoring task with Id: {}", monitorId);
708                                 scheduleMonitoringTask(info, profile.getMonitorInterval());
709                                 result.set(RpcResultBuilder.<Void>success().build());
710                             } else {
711                                 String msg = String.format("Monitoring profile associated with id %d is not present", info.getProfileId());
712                                 LOG.warn("Monitor unpause Failed. {}", msg);
713                                 result.set(RpcResultBuilder.<Void>failed().withError(ErrorType.APPLICATION, msg).build());
714                             }
715                         }
716                     });
717                 } else {
718                     tx.close();
719                     String msg = String.format("Monitoring info associated with id %d is not present", monitorId);
720                     LOG.warn("Monitor unpause Failed. {}", msg);
721                     result.set(RpcResultBuilder.<Void>failed().withError(ErrorType.APPLICATION, msg).build());
722                 }
723             }
724         }, callbackExecutorService);
725
726         return result;
727     }
728
729     private boolean stopMonitoringTask(Long monitorId) {
730         return stopMonitoringTask(monitorId, INTERRUPT_TASK);
731     }
732
733     private boolean stopMonitoringTask(Long monitorId, boolean interruptTask) {
734         ScheduledFuture<?> scheduledFutureResult = monitoringTasks.get(monitorId);
735         if(scheduledFutureResult != null) {
736             scheduledFutureResult.cancel(interruptTask);
737             return true;
738         }
739         return false;
740     }
741
742     private Optional<MonitorProfile> getMonitorProfile(Long profileId) {
743         return read(LogicalDatastoreType.OPERATIONAL, getMonitorProfileId(profileId));
744     }
745
746     private void acquireLock(Semaphore lock) {
747         if(lock == null) {
748             return;
749         }
750
751         boolean acquiredLock = false;
752         try {
753             acquiredLock = lock.tryAcquire(50, TimeUnit.MILLISECONDS);
754         } catch (InterruptedException e) {
755             LOG.warn("Thread interrupted when waiting to acquire the lock");
756         }
757
758         if(!acquiredLock) {
759             LOG.warn("Previous transaction did not complete in time. Releasing the lock to proceed");
760             lock.release();
761             try {
762                 lock.acquire();
763                 LOG.trace("Lock acquired successfully");
764             } catch (InterruptedException e) {
765                 LOG.warn("Acquire failed");
766             }
767         } else {
768             LOG.trace("Lock acquired successfully");
769         }
770     }
771
772     private void releaseLock(Semaphore lock) {
773         if(lock != null) {
774             lock.release();
775         }
776     }
777
778     private void sendMonitorPacket(final MonitoringInfo monitoringInfo) {
779         //TODO: Handle interrupts
780         final Long monitorId = monitoringInfo.getId();
781         final String monitorKey = monitorIdKeyCache.getUnchecked(monitorId);
782         if(monitorKey == null) {
783             LOG.warn("No monitor Key associated with id {} to send the monitor packet", monitorId);
784             return;
785         } else {
786             LOG.debug("Sending monitoring packet for key: {}", monitorKey);
787         }
788
789         final MonitorProfile profile;
790         Optional<MonitorProfile> optProfile = getMonitorProfile(monitoringInfo.getProfileId());
791         if(optProfile.isPresent()) {
792             profile = optProfile.get();
793         } else {
794             LOG.warn("No monitor profile associated with id {}. "
795                     + "Could not send Monitor packet for monitor-id {}", monitoringInfo.getProfileId(), monitorId);
796             return;
797         }
798
799         final Semaphore lock = lockMap.get(monitorKey);
800         LOG.debug("Acquiring lock for monitor key : {} to send monitor packet", monitorKey);
801         acquireLock(lock);
802
803         final ReadWriteTransaction tx = broker.newReadWriteTransaction();
804         ListenableFuture<Optional<MonitoringState>> readResult =
805                                   tx.read(LogicalDatastoreType.OPERATIONAL, getMonitorStateId(monitorKey));
806         ListenableFuture<Void> writeResult = Futures.transform(readResult, new AsyncFunction<Optional<MonitoringState>, Void>() {
807
808             @Override
809             public ListenableFuture<Void> apply(Optional<MonitoringState> optState)
810                     throws Exception {
811                 if(optState.isPresent()) {
812                     MonitoringState state = optState.get();
813
814                     //Increase the request count
815                     Long requestCount = state.getRequestCount() + 1;
816
817                     //Check with the monitor window
818                     LivenessState currentLivenessState = state.getState();
819
820                     //Increase the pending response count
821                     long responsePendingCount = state.getResponsePendingCount();
822                     if(responsePendingCount < profile.getMonitorWindow()) {
823                         responsePendingCount = responsePendingCount + 1;
824                     }
825
826                     //Check with the failure thresold
827                     if(responsePendingCount >= profile.getFailureThreshold()) {
828                         //Change the state to down and notify
829                         if(currentLivenessState != LivenessState.Down) {
830                             LOG.debug("Response pending Count: {}, Failure threshold: {} for monitorId {}",
831                                     responsePendingCount, profile.getFailureThreshold(), state.getMonitorId());
832                             LOG.info("Sending notification for monitor Id : {} with State: {}",  
833                                     state.getMonitorId(), LivenessState.Down);
834                             publishNotification(monitorId, LivenessState.Down);
835                             currentLivenessState = LivenessState.Down;
836                             //Reset requestCount when state changes from UP to DOWN
837                             requestCount = INITIAL_COUNT;
838                         }
839                     }
840
841                     //Update the ODS with state
842                     MonitoringState updatedState = new MonitoringStateBuilder(/*state*/).setMonitorKey(state.getMonitorKey())
843                                                                                     .setRequestCount(requestCount)
844                                                                                     .setResponsePendingCount(responsePendingCount)
845                                                                                     .setState(currentLivenessState).build();
846                     tx.merge(LogicalDatastoreType.OPERATIONAL, getMonitorStateId(state.getMonitorKey()), updatedState);
847                     return tx.submit();
848                 } else {
849                     //Close the transaction
850                     tx.submit();
851                     String errorMsg = String.format("Monitoring State associated with id %d is not present to send packet out.", monitorId);
852                     return Futures.immediateFailedFuture(new RuntimeException(errorMsg));
853                 }
854             }
855
856         });
857
858         Futures.addCallback(writeResult, new FutureCallback<Void>() {
859             @Override
860             public void onSuccess(Void noarg) {
861                 //invoke packetout on protocol handler
862                 AlivenessProtocolHandler handler = ethTypeToProtocolHandler.get(profile.getProtocolType());
863                 if(handler != null) {
864                     LOG.debug("Sending monitoring packet {}", monitoringInfo);
865                     handler.sendPacketOut(monitoringInfo);
866                 }
867                 releaseLock(lock);
868             }
869
870             @Override
871             public void onFailure(Throwable error) {
872                 LOG.warn("Updating monitoring state for key: {} failed. Monitoring packet is not sent", monitorKey, error);
873                 releaseLock(lock);
874             }
875
876         });
877
878     }
879
880     private void publishNotification(final Long monitorId, final LivenessState state) {
881         LOG.debug("Sending notification for id {}  - state {}", monitorId, state);
882         EventData data = new EventDataBuilder().setMonitorId(monitorId)
883                                                .setMonitorState(state).build();
884         MonitorEvent event = new MonitorEventBuilder().setEventData(data).build();;
885         final ListenableFuture<? extends Object> eventFuture = notificationPublishService.offerNotification(event);
886         Futures.addCallback(eventFuture, new FutureCallback<Object>() {
887             @Override
888             public void onFailure(Throwable error) {
889                 LOG.warn("Error in notifying listeners for id {} - state {}", monitorId, state, error);
890             }
891
892             @Override
893             public void onSuccess(Object arg) {
894                 LOG.trace("Successful in notifying listeners for id {} - state {}", monitorId, state);
895             }
896         });
897     }
898
899     @Override
900     public Future<RpcResult<MonitorProfileCreateOutput>> monitorProfileCreate(final MonitorProfileCreateInput input) {
901         LOG.debug("Monitor Profile Create operation - {}", input.getProfile());
902         final SettableFuture<RpcResult<MonitorProfileCreateOutput>> result = SettableFuture.create();
903         Profile profile = input.getProfile();
904         final Long failureThreshold = profile.getFailureThreshold();
905         final Long monitorInterval = profile.getMonitorInterval();
906         final Long monitorWindow = profile.getMonitorWindow();
907         final EtherTypes ethType = profile.getProtocolType();
908         String idKey = getUniqueProfileKey(failureThreshold, monitorInterval, monitorWindow, ethType);
909         final Long profileId = Long.valueOf(getUniqueId(idKey));
910
911         final ReadWriteTransaction tx = broker.newReadWriteTransaction();
912         ListenableFuture<Optional<MonitorProfile>> readFuture =
913                                    tx.read(LogicalDatastoreType.OPERATIONAL, getMonitorProfileId(profileId));
914         ListenableFuture<RpcResult<MonitorProfileCreateOutput>> resultFuture =
915                 Futures.transform(readFuture, new AsyncFunction<Optional<MonitorProfile>, RpcResult<MonitorProfileCreateOutput>>() {
916
917                     @Override
918                     public ListenableFuture<RpcResult<MonitorProfileCreateOutput>> apply(
919                             Optional<MonitorProfile> optProfile) throws Exception {
920                         if(optProfile.isPresent()) {
921                             tx.cancel();
922                             MonitorProfileCreateOutput output = new MonitorProfileCreateOutputBuilder()
923                                                                             .setProfileId(profileId).build();
924                             String msg = String.format("Monitor profile %s already present for the given input", input);
925                             LOG.warn(msg);
926                             result.set(RpcResultBuilder.success(output)
927                                          .withWarning(ErrorType.PROTOCOL, "profile-exists", msg).build());
928                         } else {
929                             final MonitorProfile monitorProfile = new MonitorProfileBuilder().setId(profileId)
930                                                                                        .setFailureThreshold(failureThreshold)
931                                                                                        .setMonitorInterval(monitorInterval)
932                                                                                        .setMonitorWindow(monitorWindow)
933                                                                                        .setProtocolType(ethType).build();
934                             tx.put(LogicalDatastoreType.OPERATIONAL, getMonitorProfileId(profileId), monitorProfile, CREATE_MISSING_PARENT);
935                             Futures.addCallback(tx.submit(), new FutureCallback<Void>() {
936                                 @Override
937                                 public void onFailure(Throwable error) {
938                                     String msg =
939                                             String.format("Error when storing monitorprofile %s in datastore", monitorProfile);
940                                     LOG.error(msg, error);
941                                     result.set(RpcResultBuilder.<MonitorProfileCreateOutput>failed()
942                                                 .withError(ErrorType.APPLICATION, msg, error).build());
943                                 }
944                                 @Override
945                                 public void onSuccess(Void noarg) {
946                                     MonitorProfileCreateOutput output = new MonitorProfileCreateOutputBuilder()
947                                                                               .setProfileId(profileId).build();
948                                     result.set(RpcResultBuilder.success(output).build());
949                                 }
950                             });
951                         }
952                         return result;
953                     }
954                 }, callbackExecutorService);
955         Futures.addCallback(resultFuture, new FutureCallback<RpcResult<MonitorProfileCreateOutput>>() {
956             @Override
957             public void onFailure(Throwable error) {
958                 //This would happen when any error happens during reading for monitoring profile
959                 String msg = String.format("Error in creating monitorprofile - %s", input);
960                 result.set(RpcResultBuilder.<MonitorProfileCreateOutput>failed()
961                             .withError(ErrorType.APPLICATION, msg, error).build());
962                 LOG.error(msg, error);
963             }
964
965             @Override
966             public void onSuccess(RpcResult<MonitorProfileCreateOutput> result) {
967                 LOG.debug("Successfully created monitor Profile {} ", input);
968             }
969         }, callbackExecutorService);
970         return result;
971     }
972
973     private String getUniqueProfileKey(Long failureThreshold,Long monitorInterval,Long monitorWindow,EtherTypes ethType) {
974         return new StringBuilder().append(failureThreshold).append(AlivenessMonitorConstants.SEPERATOR)
975                                   .append(monitorInterval).append(AlivenessMonitorConstants.SEPERATOR)
976                                   .append(monitorWindow).append(AlivenessMonitorConstants.SEPERATOR)
977                                   .append(ethType).append(AlivenessMonitorConstants.SEPERATOR).toString();
978     }
979
980     @Override
981     public Future<RpcResult<Void>> monitorProfileDelete(final MonitorProfileDeleteInput input) {
982         LOG.debug("Monitor Profile delete for Id: {}", input.getProfileId());
983         final SettableFuture<RpcResult<Void>> result = SettableFuture.create();
984         final Long profileId = input.getProfileId();
985         final ReadWriteTransaction tx = broker.newReadWriteTransaction();
986         ListenableFuture<Optional<MonitorProfile>> readFuture =
987                                    tx.read(LogicalDatastoreType.OPERATIONAL, getMonitorProfileId(profileId));
988         ListenableFuture<RpcResult<Void>> writeFuture =
989                    Futures.transform(readFuture, new AsyncFunction<Optional<MonitorProfile>, RpcResult<Void>>() {
990
991                         @Override
992                         public ListenableFuture<RpcResult<Void>> apply(final Optional<MonitorProfile> optProfile) throws Exception {
993                             if(optProfile.isPresent()) {
994                                 tx.delete(LogicalDatastoreType.OPERATIONAL, getMonitorProfileId(profileId));
995                                 Futures.addCallback(tx.submit(), new FutureCallback<Void>() {
996                                     @Override
997                                     public void onFailure(Throwable error) {
998                                         String msg = String.format("Error when removing monitor profile %d from datastore", profileId);
999                                         LOG.error(msg, error);
1000                                         result.set(RpcResultBuilder.<Void>failed().withError(ErrorType.APPLICATION, msg, error).build());
1001                                     }
1002
1003                                     @Override
1004                                     public void onSuccess(Void noarg) {
1005                                         MonitorProfile profile = optProfile.get();
1006                                         String id = getUniqueProfileKey(profile.getFailureThreshold(), profile.getMonitorInterval(), 
1007                                                                                  profile.getMonitorWindow(), profile.getProtocolType());
1008                                         releaseId(id);
1009                                         result.set(RpcResultBuilder.<Void>success().build());
1010                                     }
1011                                 });
1012                             } else {
1013                                 String msg = String.format("Monitor profile with Id: %d does not exist", profileId);
1014                                 LOG.info(msg);
1015                                 result.set(RpcResultBuilder.<Void>success().withWarning(ErrorType.PROTOCOL, "invalid-value", msg).build());
1016                             }
1017                             return result;
1018                         }
1019                     }, callbackExecutorService);
1020
1021         Futures.addCallback(writeFuture, new FutureCallback<RpcResult<Void>>() {
1022
1023             @Override
1024             public void onFailure(Throwable error) {
1025                 String msg = String.format("Error when removing monitor profile %d from datastore", profileId);
1026                 LOG.error(msg, error);
1027                 result.set(RpcResultBuilder.<Void>failed().withError(ErrorType.APPLICATION, msg, error).build());
1028             }
1029
1030             @Override
1031             public void onSuccess(RpcResult<Void> noarg) {
1032                 LOG.debug("Successfully removed Monitor Profile {}", profileId);
1033             }
1034         }, callbackExecutorService);
1035         return result;
1036     }
1037
1038     @Override
1039     public Future<RpcResult<Void>> monitorStop(MonitorStopInput input) {
1040         LOG.debug("Monitor Stop operation for monitor id - {}", input.getMonitorId());
1041         SettableFuture<RpcResult<Void>> result = SettableFuture.create();
1042
1043         final Long monitorId = input.getMonitorId();
1044         Optional<MonitoringInfo> optInfo = read(LogicalDatastoreType.OPERATIONAL, getMonitoringInfoId(monitorId));
1045         if(optInfo.isPresent()) {
1046             //Stop the monitoring task
1047             stopMonitoringTask(monitorId);
1048
1049             //Cleanup the Data store
1050             WriteTransaction tx = broker.newWriteOnlyTransaction();
1051             String monitorKey = monitorIdKeyCache.getUnchecked(monitorId);
1052             if(monitorKey != null) {
1053                 tx.delete(LogicalDatastoreType.OPERATIONAL, getMonitorStateId(monitorKey));
1054                 monitorIdKeyCache.invalidate(monitorId);
1055             }
1056
1057             tx.delete(LogicalDatastoreType.OPERATIONAL, getMonitoringInfoId(monitorId));
1058             Futures.addCallback(tx.submit(),
1059                     new FutureCallbackImpl(String.format("Delete monitor state with Id %d", monitorId)));
1060
1061             MonitoringInfo info = optInfo.get();
1062             String interfaceName = getInterfaceName(info.getSource().getEndpointType());
1063             if(interfaceName != null) {
1064                 removeMonitorIdFromInterfaceAssociation(monitorId, interfaceName);
1065             }
1066             releaseIdForMonitoringInfo(info);
1067
1068             lockMap.remove(monitorKey);
1069
1070             result.set(RpcResultBuilder.<Void>success().build());
1071         } else {
1072             String errorMsg = String.format("Do not have monitoring information associated with key %d", monitorId);
1073             LOG.error("Delete monitoring operation Failed - {}", errorMsg);
1074             result.set(RpcResultBuilder.<Void>failed().withError(ErrorType.APPLICATION, errorMsg).build());
1075         }
1076
1077         return result;
1078     }
1079
1080     private void removeMonitorIdFromInterfaceAssociation(final Long monitorId, final String interfaceName) {
1081         LOG.debug("Remove monitorId {} from Interface association {}", monitorId, interfaceName);
1082         final ReadWriteTransaction tx = broker.newReadWriteTransaction();
1083         ListenableFuture<Optional<InterfaceMonitorEntry>> readFuture = tx.read(LogicalDatastoreType.OPERATIONAL, getInterfaceMonitorMapId(interfaceName));
1084         ListenableFuture<Void> updateFuture = Futures.transform(readFuture, new AsyncFunction<Optional<InterfaceMonitorEntry>, Void>() {
1085
1086             @Override
1087             public ListenableFuture<Void> apply(Optional<InterfaceMonitorEntry> optEntry) throws Exception {
1088                 if(optEntry.isPresent()) {
1089                     InterfaceMonitorEntry entry = optEntry.get();
1090                     List<Long> monitorIds = entry.getMonitorIds();
1091                     monitorIds.remove(monitorId);
1092                     InterfaceMonitorEntry newEntry = new InterfaceMonitorEntryBuilder(entry)
1093                                        .setKey(new InterfaceMonitorEntryKey(interfaceName)).setMonitorIds(monitorIds).build();
1094                     tx.put(LogicalDatastoreType.OPERATIONAL, getInterfaceMonitorMapId(interfaceName), newEntry, CREATE_MISSING_PARENT);
1095                     return tx.submit();
1096                 } else {
1097                     LOG.warn("No Interface map entry found {} to remove monitorId {}", interfaceName, monitorId);
1098                     tx.cancel();
1099                     return Futures.immediateFuture(null);
1100                 }
1101             }
1102         });
1103
1104         Futures.addCallback(updateFuture, new FutureCallbackImpl(
1105                      String.format("Dis-association of monitorId %d with Interface %s", monitorId, interfaceName)));
1106     }
1107
1108
1109     private void releaseIdForMonitoringInfo(MonitoringInfo info) {
1110         Long monitorId = info.getId();
1111         EndpointType source = info.getSource().getEndpointType();
1112         String interfaceName = getInterfaceName(source);
1113         if(!Strings.isNullOrEmpty(interfaceName)) {
1114             Optional<MonitorProfile> optProfile = read(LogicalDatastoreType.OPERATIONAL, getMonitorProfileId(info.getProfileId()));
1115             if(optProfile.isPresent()) {
1116                 EtherTypes ethType = optProfile.get().getProtocolType();
1117                 EndpointType destination = (info.getDestination() != null) ? info.getDestination().getEndpointType() : null; 
1118                 String idKey = getUniqueKey(interfaceName, ethType.toString(), source, destination);
1119                 releaseId(idKey);
1120             } else {
1121                 LOG.warn("Could not release monitorId {}. No profile associated with it", monitorId);
1122             }
1123         }
1124     }
1125
1126     private String getInterfaceName(EndpointType endpoint) {
1127         String interfaceName = null;
1128         if(endpoint instanceof Interface) {
1129             interfaceName = ((Interface)endpoint).getInterfaceName();
1130         }
1131         return interfaceName;
1132     }
1133
1134     private void stopMonitoring(long monitorId) {
1135         updateMonitorStatusTo(monitorId, MonitorStatus.Stopped, new Predicate<MonitorStatus>() {
1136             @Override
1137             public boolean apply(MonitorStatus currentStatus) {
1138                 return currentStatus != MonitorStatus.Stopped;
1139             }
1140         });
1141         if(!stopMonitoringTask(monitorId)) {
1142             LOG.warn("No monitoring task running to perform cancel operation for monitorId {}", monitorId);
1143         }
1144     }
1145
1146     private void updateMonitorStatusTo(final Long monitorId, final MonitorStatus newStatus, final Predicate<MonitorStatus> isValidStatus) {
1147         final String monitorKey = monitorIdKeyCache.getUnchecked(monitorId);
1148         if(monitorKey == null) {
1149             LOG.warn("No monitor Key associated with id {} to change the monitor status to {}", monitorId, newStatus);
1150             return;
1151         }
1152         final ReadWriteTransaction tx = broker.newReadWriteTransaction();
1153
1154         ListenableFuture<Optional<MonitoringState>> readResult =
1155                             tx.read(LogicalDatastoreType.OPERATIONAL, getMonitorStateId(monitorKey));
1156
1157         ListenableFuture<Void> writeResult = Futures.transform(readResult, new AsyncFunction<Optional<MonitoringState>, Void>() {
1158             @Override
1159             public ListenableFuture<Void> apply(Optional<MonitoringState> optState) throws Exception {
1160                 if(optState.isPresent()) {
1161                     MonitoringState state = optState.get();
1162                     if(isValidStatus.apply(state.getStatus())) {
1163                         MonitoringState updatedState = new MonitoringStateBuilder().setMonitorKey(monitorKey)
1164                                                                               .setStatus(newStatus).build();
1165                         tx.merge(LogicalDatastoreType.OPERATIONAL, getMonitorStateId(monitorKey), updatedState);
1166                     } else {
1167                         LOG.warn("Invalid Monitoring status {}, cannot be updated to {} for monitorId {}"
1168                                                                     , state.getStatus(), newStatus, monitorId);
1169                     }
1170                 } else {
1171                     LOG.warn("No associated monitoring state data available to update the status to {} for {}", newStatus, monitorId);
1172                 }
1173                 return tx.submit();
1174             }
1175         });
1176
1177         Futures.addCallback(writeResult,
1178                         new FutureCallbackImpl(String.format("Monitor status update for %d to %s", monitorId, newStatus.toString())));
1179     }
1180
1181     private void resumeMonitoring(final long monitorId) {
1182         final ReadOnlyTransaction tx = broker.newReadOnlyTransaction();
1183         ListenableFuture<Optional<MonitoringInfo>> readInfoResult =
1184                 tx.read(LogicalDatastoreType.OPERATIONAL, getMonitoringInfoId(monitorId));
1185
1186         Futures.addCallback(readInfoResult, new FutureCallback<Optional<MonitoringInfo>>() {
1187
1188             @Override
1189             public void onFailure(Throwable error) {
1190                 String msg = String.format("Unable to read monitoring info associated with monitor id %d", monitorId);
1191                 LOG.error("Monitor resume Failed. {}", msg, error);
1192             }
1193
1194             @Override
1195             public void onSuccess(Optional<MonitoringInfo> optInfo) {
1196                 if(optInfo.isPresent()) {
1197                     final MonitoringInfo info = optInfo.get();
1198                     ListenableFuture<Optional<MonitorProfile>> readProfile =
1199                             tx.read(LogicalDatastoreType.OPERATIONAL, getMonitorProfileId(info.getProfileId()));
1200                     Futures.addCallback(readProfile, new FutureCallback<Optional<MonitorProfile>>(){
1201
1202                         @Override
1203                         public void onFailure(Throwable error) {
1204                             String msg = String.format("Unable to read Monitoring profile associated with id %d", info.getProfileId());
1205                             LOG.warn("Monitor resume Failed. {}", msg, error);
1206                         }
1207
1208                         @Override
1209                         public void onSuccess(Optional<MonitorProfile> optProfile) {
1210                             tx.close();
1211                             if(optProfile.isPresent()) {
1212                                 updateMonitorStatusTo(monitorId, MonitorStatus.Started, new Predicate<MonitorStatus>() {
1213                                     @Override
1214                                     public boolean apply(MonitorStatus currentStatus) {
1215                                         return currentStatus != MonitorStatus.Started;
1216                                     }
1217                                 });
1218                                 MonitorProfile profile = optProfile.get();
1219                                 LOG.debug("Monitor Resume - Scheduling monitoring task for Id: {}", monitorId);
1220                                 scheduleMonitoringTask(info, profile.getMonitorInterval());
1221                             } else {
1222                                 String msg = String.format("Monitoring profile associated with id %d is not present", info.getProfileId());
1223                                 LOG.warn("Monitor resume Failed. {}", msg);
1224                             }
1225                         }
1226                     });
1227                 } else {
1228                     tx.close();
1229                     String msg = String.format("Monitoring info associated with id %d is not present", monitorId);
1230                     LOG.warn("Monitor resume Failed. {}", msg);
1231                 }
1232             }
1233         });
1234     }
1235
1236     //DATA STORE OPERATIONS
1237     private <T extends DataObject> Optional<T> read(LogicalDatastoreType datastoreType, InstanceIdentifier<T> path) {
1238         ReadOnlyTransaction tx = broker.newReadOnlyTransaction();
1239
1240         Optional<T> result = Optional.absent();
1241         try {
1242             result = tx.read(datastoreType, path).get();
1243         } catch (InterruptedException | ExecutionException e) {
1244             LOG.warn("Error reading data from path {} in datastore {}", path, datastoreType, e);
1245         } finally {
1246             tx.close();
1247         }
1248
1249         return result;
1250     }
1251
1252     @Override
1253     public void onInterfaceStateUp(String interfaceName) {
1254         List<Long> monitorIds = getMonitorIds(interfaceName);
1255         if(monitorIds.isEmpty()) {
1256             LOG.warn("Could not get monitorId for interface: {}", interfaceName);
1257             return;
1258         }
1259         for(Long monitorId : monitorIds) {
1260             LOG.debug("Resume monitoring on interface: {} with monitorId: {}", interfaceName, monitorId);
1261             resumeMonitoring(monitorId);
1262         }
1263     }
1264
1265     @Override
1266     public void onInterfaceStateDown(String interfaceName) {
1267         List<Long> monitorIds = getMonitorIds(interfaceName);
1268         if(monitorIds.isEmpty()) {
1269             LOG.warn("Could not get monitorIds for interface: {}", interfaceName);
1270             return;
1271         }
1272         for(Long monitorId : monitorIds) {
1273             LOG.debug("Suspend monitoring on interface: {} with monitorId: {}", interfaceName, monitorId);
1274             stopMonitoring(monitorId);
1275         }
1276     }
1277
1278     private List<Long> getMonitorIds(String interfaceName) {
1279         Optional<InterfaceMonitorEntry> optEntry = read(LogicalDatastoreType.OPERATIONAL,
1280                                                            getInterfaceMonitorMapId(interfaceName));
1281         if(optEntry.isPresent()) {
1282             InterfaceMonitorEntry entry = optEntry.get();
1283             return entry.getMonitorIds();
1284         }
1285         return Collections.emptyList();
1286     }
1287
1288 }