Bump versions by 0.1.0 for next dev cycle
[vpnservice.git] / alivenessmonitor / alivenessmonitor-impl / src / main / java / org / opendaylight / vpnservice / alivenessmonitor / internal / AlivenessMonitor.java
1 /*
2  * Copyright (c) 2015 - 2016 Ericsson India Global Services Pvt Ltd. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.vpnservice.alivenessmonitor.internal;
9
10 import java.lang.Thread.UncaughtExceptionHandler;
11 import java.util.ArrayList;
12 import java.util.Collections;
13 import java.util.EnumMap;
14 import java.util.HashMap;
15 import java.util.List;
16 import java.util.Map;
17 import java.util.concurrent.ConcurrentHashMap;
18 import java.util.concurrent.ConcurrentMap;
19 import java.util.concurrent.ExecutionException;
20 import java.util.concurrent.ExecutorService;
21 import java.util.concurrent.Executors;
22 import java.util.concurrent.Future;
23 import java.util.concurrent.ScheduledFuture;
24 import java.util.concurrent.ScheduledExecutorService;
25 import java.util.concurrent.Semaphore;
26 import java.util.concurrent.ThreadFactory;
27 import java.util.concurrent.TimeUnit;
28
29 import org.opendaylight.controller.liblldp.NetUtils;
30 import org.opendaylight.controller.liblldp.Packet;
31 import org.opendaylight.controller.md.sal.binding.api.DataBroker;
32 import org.opendaylight.controller.md.sal.binding.api.NotificationPublishService;
33 import org.opendaylight.controller.md.sal.binding.api.ReadOnlyTransaction;
34 import org.opendaylight.controller.md.sal.binding.api.ReadWriteTransaction;
35 import org.opendaylight.controller.md.sal.binding.api.WriteTransaction;
36 import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
37 import org.opendaylight.vpnservice.mdsalutil.packet.Ethernet;
38 import org.opendaylight.yang.gen.v1.urn.opendaylight.vpnservice.alivenessmonitor.rev150629.AlivenessMonitorService;
39 import org.opendaylight.yang.gen.v1.urn.opendaylight.vpnservice.alivenessmonitor.rev150629.EtherTypes;
40 import org.opendaylight.yang.gen.v1.urn.opendaylight.vpnservice.alivenessmonitor.rev150629.LivenessState;
41 import org.opendaylight.yang.gen.v1.urn.opendaylight.vpnservice.alivenessmonitor.rev150629.MonitorEvent;
42 import org.opendaylight.yang.gen.v1.urn.opendaylight.vpnservice.alivenessmonitor.rev150629.MonitorEventBuilder;
43 import org.opendaylight.yang.gen.v1.urn.opendaylight.vpnservice.alivenessmonitor.rev150629.MonitorPauseInput;
44 import org.opendaylight.yang.gen.v1.urn.opendaylight.vpnservice.alivenessmonitor.rev150629.MonitorProfileCreateInput;
45 import org.opendaylight.yang.gen.v1.urn.opendaylight.vpnservice.alivenessmonitor.rev150629.MonitorProfileCreateOutput;
46 import org.opendaylight.yang.gen.v1.urn.opendaylight.vpnservice.alivenessmonitor.rev150629.MonitorProfileCreateOutputBuilder;
47 import org.opendaylight.yang.gen.v1.urn.opendaylight.vpnservice.alivenessmonitor.rev150629.MonitorProfileDeleteInput;
48 import org.opendaylight.yang.gen.v1.urn.opendaylight.vpnservice.alivenessmonitor.rev150629.MonitorStartInput;
49 import org.opendaylight.yang.gen.v1.urn.opendaylight.vpnservice.alivenessmonitor.rev150629.MonitorStartOutput;
50 import org.opendaylight.yang.gen.v1.urn.opendaylight.vpnservice.alivenessmonitor.rev150629.MonitorStartOutputBuilder;
51 import org.opendaylight.yang.gen.v1.urn.opendaylight.vpnservice.alivenessmonitor.rev150629.MonitorStatus;
52 import org.opendaylight.yang.gen.v1.urn.opendaylight.vpnservice.alivenessmonitor.rev150629.MonitorStopInput;
53 import org.opendaylight.yang.gen.v1.urn.opendaylight.vpnservice.alivenessmonitor.rev150629.MonitorUnpauseInput;
54 import org.opendaylight.yang.gen.v1.urn.opendaylight.vpnservice.alivenessmonitor.rev150629.MonitoringMode;
55 import org.opendaylight.yang.gen.v1.urn.opendaylight.vpnservice.alivenessmonitor.rev150629.MonitorProfileGetInput;
56 import org.opendaylight.yang.gen.v1.urn.opendaylight.vpnservice.alivenessmonitor.rev150629.MonitorProfileGetOutput;
57 import org.opendaylight.yang.gen.v1.urn.opendaylight.vpnservice.alivenessmonitor.rev150629.MonitorProfileGetOutputBuilder;
58 import org.opendaylight.yang.gen.v1.urn.opendaylight.vpnservice.alivenessmonitor.rev150629._interface.monitor.map.InterfaceMonitorEntry;
59 import org.opendaylight.yang.gen.v1.urn.opendaylight.vpnservice.alivenessmonitor.rev150629._interface.monitor.map.InterfaceMonitorEntryBuilder;
60 import org.opendaylight.yang.gen.v1.urn.opendaylight.vpnservice.alivenessmonitor.rev150629._interface.monitor.map.InterfaceMonitorEntryKey;
61 import org.opendaylight.yang.gen.v1.urn.opendaylight.vpnservice.alivenessmonitor.rev150629.endpoint.EndpointType;
62 import org.opendaylight.yang.gen.v1.urn.opendaylight.vpnservice.alivenessmonitor.rev150629.endpoint.endpoint.type.Interface;
63 import org.opendaylight.yang.gen.v1.urn.opendaylight.vpnservice.alivenessmonitor.rev150629.endpoint.endpoint.type.IpAddress;
64 import org.opendaylight.yang.gen.v1.urn.opendaylight.vpnservice.alivenessmonitor.rev150629.monitor.configs.MonitoringInfo;
65 import org.opendaylight.yang.gen.v1.urn.opendaylight.vpnservice.alivenessmonitor.rev150629.monitor.configs.MonitoringInfoBuilder;
66 import org.opendaylight.yang.gen.v1.urn.opendaylight.vpnservice.alivenessmonitor.rev150629.monitor.event.EventData;
67 import org.opendaylight.yang.gen.v1.urn.opendaylight.vpnservice.alivenessmonitor.rev150629.monitor.event.EventDataBuilder;
68 import org.opendaylight.yang.gen.v1.urn.opendaylight.vpnservice.alivenessmonitor.rev150629.monitor.profile.create.input.Profile;
69 import org.opendaylight.yang.gen.v1.urn.opendaylight.vpnservice.alivenessmonitor.rev150629.monitor.profiles.MonitorProfile;
70 import org.opendaylight.yang.gen.v1.urn.opendaylight.vpnservice.alivenessmonitor.rev150629.monitor.profiles.MonitorProfileBuilder;
71 import org.opendaylight.yang.gen.v1.urn.opendaylight.vpnservice.alivenessmonitor.rev150629.monitor.start.input.Config;
72 import org.opendaylight.yang.gen.v1.urn.opendaylight.vpnservice.alivenessmonitor.rev150629.monitorid.key.map.MonitoridKeyEntry;
73 import org.opendaylight.yang.gen.v1.urn.opendaylight.vpnservice.alivenessmonitor.rev150629.monitorid.key.map.MonitoridKeyEntryBuilder;
74 import org.opendaylight.yang.gen.v1.urn.opendaylight.vpnservice.alivenessmonitor.rev150629.monitoring.states.MonitoringState;
75 import org.opendaylight.yang.gen.v1.urn.opendaylight.vpnservice.alivenessmonitor.rev150629.monitoring.states.MonitoringStateBuilder;
76 import org.opendaylight.yang.gen.v1.urn.opendaylight.vpnservice.idmanager.rev150403.AllocateIdInput;
77 import org.opendaylight.yang.gen.v1.urn.opendaylight.vpnservice.idmanager.rev150403.AllocateIdInputBuilder;
78 import org.opendaylight.yang.gen.v1.urn.opendaylight.vpnservice.idmanager.rev150403.AllocateIdOutput;
79 import org.opendaylight.yang.gen.v1.urn.opendaylight.vpnservice.idmanager.rev150403.CreateIdPoolInput;
80 import org.opendaylight.yang.gen.v1.urn.opendaylight.vpnservice.idmanager.rev150403.CreateIdPoolInputBuilder;
81 import org.opendaylight.yang.gen.v1.urn.opendaylight.vpnservice.idmanager.rev150403.IdManagerService;
82 import org.opendaylight.yang.gen.v1.urn.opendaylight.vpnservice.idmanager.rev150403.ReleaseIdInput;
83 import org.opendaylight.yang.gen.v1.urn.opendaylight.vpnservice.idmanager.rev150403.ReleaseIdInputBuilder;
84 import org.opendaylight.yang.gen.v1.urn.opendaylight.vpnservice.interfacemgr.rpcs.rev151003.OdlInterfaceRpcService;
85 import org.opendaylight.yang.gen.v1.urn.opendaylight.packet.service.rev130709.PacketInReason;
86 import org.opendaylight.yang.gen.v1.urn.opendaylight.packet.service.rev130709.PacketProcessingListener;
87 import org.opendaylight.yang.gen.v1.urn.opendaylight.packet.service.rev130709.PacketProcessingService;
88 import org.opendaylight.yang.gen.v1.urn.opendaylight.packet.service.rev130709.PacketReceived;
89 import org.opendaylight.yang.gen.v1.urn.opendaylight.packet.service.rev130709.SendToController;
90 import org.opendaylight.yangtools.yang.binding.DataObject;
91 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
92 import org.opendaylight.yangtools.yang.common.RpcError.ErrorType;
93 import org.opendaylight.yangtools.yang.common.RpcResult;
94 import org.opendaylight.yangtools.yang.common.RpcResultBuilder;
95 import org.slf4j.Logger;
96 import org.slf4j.LoggerFactory;
97
98 import com.google.common.base.Optional;
99 import com.google.common.base.Preconditions;
100 import com.google.common.base.Predicate;
101 import com.google.common.base.Strings;
102 import com.google.common.cache.CacheBuilder;
103 import com.google.common.cache.CacheLoader;
104 import com.google.common.cache.LoadingCache;
105 import com.google.common.util.concurrent.AsyncFunction;
106 import com.google.common.util.concurrent.FutureCallback;
107 import com.google.common.util.concurrent.Futures;
108 import com.google.common.util.concurrent.JdkFutureAdapters;
109 import com.google.common.util.concurrent.ListenableFuture;
110 import com.google.common.util.concurrent.SettableFuture;
111 import com.google.common.util.concurrent.ThreadFactoryBuilder;
112
113 import static org.opendaylight.vpnservice.alivenessmonitor.internal.AlivenessMonitorUtil.*;
114
115 public class AlivenessMonitor implements AlivenessMonitorService, PacketProcessingListener,
116                                          ServiceProvider, InterfaceStateListener, AutoCloseable {
    private static final Logger LOG = LoggerFactory.getLogger(AlivenessMonitor.class);
    // Datastore handle supplied to the constructor; used to open transactions.
    private final DataBroker broker;
    // Collaborating services; each one is injected through its setter.
    private IdManagerService idManager;
    private PacketProcessingService packetProcessingService;
    private NotificationPublishService notificationPublishService;
    private OdlInterfaceRpcService interfaceManager;
    // Protocol handlers indexed by payload class and by ether type; both are filled by registerHandler().
    private Map<Class<?>, AlivenessProtocolHandler> packetTypeToProtocolHandler;
    private Map<EtherTypes, AlivenessProtocolHandler> ethTypeToProtocolHandler;
    // presumably monitor id -> scheduled monitoring task; confirm against the scheduling code
    private ConcurrentMap<Long, ScheduledFuture<?>> monitoringTasks;
    // Monitor id -> monitor key, loaded on demand from the operational datastore (see initilizeCache()).
    private LoadingCache<Long, String> monitorIdKeyCache;
    // Executors created in the constructor and shut down in close().
    private ScheduledExecutorService monitorService;
    private ExecutorService callbackExecutorService;

    // Number of threads given to each of the two executors above.
    private static final int THREAD_POOL_SIZE = 4;
    private static final boolean INTERRUPT_TASK = true;
    private static final int NO_DELAY = 0;
    // Starting value for the request and response-pending counters of a monitoring state.
    private static final Long INITIAL_COUNT = 0L;
    // Passed as the last argument of tx.put() when monitoring data is written.
    private static final boolean CREATE_MISSING_PARENT = true;
    // Value returned by getUniqueId() when no id could be allocated.
    private static final int INVALID_ID = 0;
    // Monitor key -> semaphore; an entry is added once the datastore write in monitorStart() succeeds.
    private ConcurrentMap<String, Semaphore> lockMap = new ConcurrentHashMap<>();
137
138     private class FutureCallbackImpl implements FutureCallback<Void> {
139         private String message;
140         public FutureCallbackImpl(String message) {
141             this.message = message;
142         }
143
144         @Override
145         public void onFailure(Throwable error) {
146             LOG.warn("Error in Datastore operation - {}", message, error);
147         }
148
149         @Override
150         public void onSuccess(Void result) {
151             LOG.debug("Success in Datastore operation - {}", message);
152         }
153     }
154
155     private class AlivenessMonitorTask implements Runnable {
156         private MonitoringInfo monitoringInfo;
157
158         public AlivenessMonitorTask(MonitoringInfo monitoringInfo) {
159             this.monitoringInfo = monitoringInfo;
160         }
161
162         @Override
163         public void run() {
164             if(LOG.isTraceEnabled()) {
165                 LOG.trace("send monitor packet - {}", monitoringInfo);
166             }
167             sendMonitorPacket(monitoringInfo);
168         }
169     }
170
171     public AlivenessMonitor(DataBroker dataBroker) {
172         broker = dataBroker;
173         ethTypeToProtocolHandler = new EnumMap<>(EtherTypes.class);
174         packetTypeToProtocolHandler = new HashMap<>();
175         monitorService = Executors.newScheduledThreadPool(THREAD_POOL_SIZE,
176                             getMonitoringThreadFactory("Aliveness Monitoring Task"));
177         callbackExecutorService = Executors.newFixedThreadPool(THREAD_POOL_SIZE,
178                             getMonitoringThreadFactory("Aliveness Callback Handler"));
179         monitoringTasks = new ConcurrentHashMap<>();
180         initilizeCache();
181     }
182
183     private ThreadFactory getMonitoringThreadFactory(String threadNameFormat) {
184         ThreadFactoryBuilder builder = new ThreadFactoryBuilder();
185         builder.setNameFormat(threadNameFormat);
186         builder.setUncaughtExceptionHandler( new UncaughtExceptionHandler() {
187             @Override
188             public void uncaughtException(Thread t, Throwable e) {
189                 LOG.error("Received Uncaught Exception event in Thread: {}", t.getName(), e);
190             }
191         });
192         return builder.build();
193     }
194
195     private void initilizeCache() {
196         monitorIdKeyCache = CacheBuilder.newBuilder()
197                 .build(new CacheLoader<Long, String>() {
198                     @Override
199                     public String load(Long monitorId) throws Exception {
200                         String monitorKey = null;
201                         Optional<MonitoridKeyEntry> optKey = read(LogicalDatastoreType.OPERATIONAL, getMonitorMapId(monitorId));
202                         if(optKey.isPresent()) {
203                             monitorKey = optKey.get().getMonitorKey();
204                         }
205                         return monitorKey;
206                     }
207                 });
208     }
209
210     @Override
211     public void close() throws Exception {
212         monitorIdKeyCache.cleanUp();
213         monitorService.shutdown();
214         callbackExecutorService.shutdown();
215     }
216
217     @Override
218     public DataBroker getDataBroker() {
219         return broker;
220     }
221
222     @Override
223     public OdlInterfaceRpcService getInterfaceManager() {
224         return interfaceManager;
225     }
226
227     public void setPacketProcessingService(PacketProcessingService pktProcessingService) {
228         this.packetProcessingService = pktProcessingService;
229     }
230
231     public void setNotificationPublishService(NotificationPublishService notificationPublishService) {
232         this.notificationPublishService = notificationPublishService;
233     }
234
235     public void setInterfaceManager(OdlInterfaceRpcService interfaceManager) {
236         this.interfaceManager = interfaceManager;
237     }
238
239     public void setIdManager(IdManagerService idManager) {
240         this.idManager = idManager;
241         createIdPool();
242     }
243
244     public void registerHandler(EtherTypes etherType, AlivenessProtocolHandler protocolHandler) {
245         ethTypeToProtocolHandler.put(etherType, protocolHandler);
246         packetTypeToProtocolHandler.put(protocolHandler.getPacketClass(), protocolHandler);
247     }
248
249     private void createIdPool() {
250         CreateIdPoolInput createPool = new CreateIdPoolInputBuilder()
251                                             .setPoolName(AlivenessMonitorConstants.MONITOR_IDPOOL_NAME)
252                                             .setLow(AlivenessMonitorConstants.MONITOR_IDPOOL_START)
253                                             .setHigh(AlivenessMonitorConstants.MONITOR_IDPOOL_SIZE)
254                                             .build();
255         Future<RpcResult<Void>> result = idManager.createIdPool(createPool);
256         Futures.addCallback(JdkFutureAdapters.listenInPoolThread(result), new FutureCallback<RpcResult<Void>>() {
257
258             @Override
259             public void onFailure(Throwable error) {
260                 LOG.error("Failed to create idPool for Aliveness Monitor Service",error);
261             }
262
263             @Override
264             public void onSuccess(RpcResult<Void> result) {
265                 if(result.isSuccessful()) {
266                     LOG.debug("Created IdPool for Aliveness Monitor Service");
267                 } else {
268                     LOG.error("RPC to create Idpool failed {}", result.getErrors());
269                 }
270             }
271         });
272     }
273
274     private int getUniqueId(final String idKey) {
275         AllocateIdInput getIdInput = new AllocateIdInputBuilder()
276                   .setPoolName(AlivenessMonitorConstants.MONITOR_IDPOOL_NAME)
277                   .setIdKey(idKey).build();
278
279         Future<RpcResult<AllocateIdOutput>> result = idManager.allocateId(getIdInput);
280
281         try {
282             RpcResult<AllocateIdOutput> rpcResult = result.get();
283             if(rpcResult.isSuccessful()) {
284                 return rpcResult.getResult().getIdValue().intValue();
285             } else {
286                 LOG.warn("RPC Call to Get Unique Id returned with Errors {}", rpcResult.getErrors());
287             }
288         } catch (InterruptedException | ExecutionException e) {
289             LOG.warn("Exception when getting Unique Id for key {}", idKey, e);
290         }
291         return INVALID_ID;
292     }
293
294     private void releaseId(String idKey) {
295         ReleaseIdInput idInput = new ReleaseIdInputBuilder()
296                                        .setPoolName(AlivenessMonitorConstants.MONITOR_IDPOOL_NAME)
297                                        .setIdKey(idKey).build();
298         try {
299             Future<RpcResult<Void>> result = idManager.releaseId(idInput);
300             RpcResult<Void> rpcResult = result.get();
301             if(!rpcResult.isSuccessful()) {
302                 LOG.warn("RPC Call to release Id {} with Key {} returned with Errors {}",
303                                                             idKey, rpcResult.getErrors());
304             }
305         } catch (InterruptedException | ExecutionException e) {
306             LOG.warn("Exception when releasing Id for key {}", idKey, e);
307         }
308     }
309
310     @Override
311     public void onPacketReceived(PacketReceived packetReceived) {
312         Class<? extends PacketInReason> pktInReason = packetReceived.getPacketInReason();
313         if(LOG.isTraceEnabled()) {
314             LOG.trace("Packet Received {}", packetReceived );
315         }
316
317         if (pktInReason == SendToController.class) {
318             Packet packetInFormatted;
319             byte[] data = packetReceived.getPayload();
320             Ethernet res = new Ethernet();
321             try {
322                 packetInFormatted = res.deserialize(data, 0, data.length * NetUtils.NumBitsInAByte);
323             } catch (Exception e) {
324                 LOG.warn("Failed to decode packet: {}", e.getMessage());
325                 return;
326             }
327
328             if(packetInFormatted == null) {
329                 LOG.warn("Failed to deserialize Received Packet from table {}", packetReceived.getTableId().getValue());
330                 return;
331             }
332
333             Object objPayload = packetInFormatted.getPayload();
334
335             if(objPayload == null) {
336                 LOG.trace("Unsupported packet type. Ignoring the packet...");
337                 return;
338             }
339
340             if (LOG.isTraceEnabled()) {
341                 LOG.trace("onPacketReceived packet: {}, packet class: {}", packetReceived,
342                         objPayload.getClass());
343             }
344
345             AlivenessProtocolHandler livenessProtocolHandler = packetTypeToProtocolHandler.get(objPayload.getClass());
346             if (livenessProtocolHandler == null) {
347                     return;
348             }
349
350             String monitorKey = livenessProtocolHandler.handlePacketIn(packetInFormatted.getPayload(), packetReceived);
351
352             if(monitorKey != null) {
353                 processReceivedMonitorKey(monitorKey);
354             } else {
355                 LOG.debug("No monitorkey associated with received packet");
356             }
357         }
358     }
359
    /**
     * Records a received monitor response for the given key: reads the current
     * monitoring state from the operational datastore, marks it Up with the
     * response-pending count reset, and publishes a notification when the
     * previous state was Down or Unknown.
     *
     * The per-key semaphore is acquired before the transaction is opened and
     * released on every terminal path of the asynchronous read/write chain.
     *
     * @param monitorKey key identifying the monitoring state; must not be null
     */
    private void processReceivedMonitorKey(final String monitorKey) {
        Preconditions.checkNotNull(monitorKey, "Monitor Key required to process the state");

        LOG.debug("Processing monitorKey: {} for received packet", monitorKey);

        // NOTE(review): lockMap.get() yields null when no entry exists for the key (entries are
        // only added after a successful monitorStart write); how acquireLock() treats null is not
        // visible here - confirm.
        final Semaphore lock = lockMap.get(monitorKey);
        LOG.debug("Acquiring lock for monitor key : {} to process monitor packet", monitorKey);
        acquireLock(lock);

        final ReadWriteTransaction tx = broker.newReadWriteTransaction();

        ListenableFuture<Optional<MonitoringState>> stateResult = tx.read(LogicalDatastoreType.OPERATIONAL, getMonitorStateId(monitorKey));

        //READ Callback
        Futures.addCallback(stateResult, new FutureCallback<Optional<MonitoringState>>() {

            @Override
            public void onSuccess(Optional<MonitoringState> optState) {

                if(optState.isPresent()) {
                    final MonitoringState currentState = optState.get();

                    if(LOG.isTraceEnabled()) {
                        LOG.trace("OnPacketReceived : Monitoring state from ODS : {} ", currentState);
                    }

                    Long responsePendingCount = currentState.getResponsePendingCount();

                    //Need to relook at the pending count logic to support N out of M scenarios
//                    if(currentState.getState() != LivenessState.Up) {
//                        //Reset responsePendingCount when state changes from DOWN to UP
//                        responsePendingCount = INITIAL_COUNT;
//                    }
//
//                    if(responsePendingCount > INITIAL_COUNT) {
//                        responsePendingCount = currentState.getResponsePendingCount() - 1;
//                    }
                    // Any response resets the pending count, whatever value was read above.
                    responsePendingCount = INITIAL_COUNT;

                    // A notification is only due when the monitor was not already Up.
                    final boolean stateChanged =  (currentState.getState() == LivenessState.Down ||
                                                           currentState.getState() == LivenessState.Unknown);

                    final MonitoringState state = new MonitoringStateBuilder().setMonitorKey(monitorKey).setState(LivenessState.Up)
                                                           .setResponsePendingCount(responsePendingCount).build();
                    // merge() is given only key, state and pending count; the remaining fields are not set on this object.
                    tx.merge(LogicalDatastoreType.OPERATIONAL, getMonitorStateId(monitorKey), state);
                    ListenableFuture<Void> writeResult = tx.submit();

                    //WRITE Callback
                    Futures.addCallback(writeResult, new FutureCallback<Void>() {
                        @Override
                        public void onSuccess(Void noarg) {
                            // Release before notifying so the notification is not sent under the lock.
                            releaseLock(lock);
                            if(stateChanged) {
                                //send notifications
                                LOG.info("Sending notification for monitor Id : {} with Current State: {}",
                                        currentState.getMonitorId(), LivenessState.Up);
                                publishNotification(currentState.getMonitorId(), LivenessState.Up);
                            } else {
                                if(LOG.isTraceEnabled()) {
                                    LOG.trace("Successful in writing monitoring state {} to ODS", state);
                                }
                            }
                        }

                        @Override
                        public void onFailure(Throwable error) {
                            releaseLock(lock);
                            LOG.warn("Error in writing monitoring state : {} to Datastore", monitorKey, error);
                            if(LOG.isTraceEnabled()) {
                                LOG.trace("Error in writing monitoring state: {} to Datastore", state);
                            }
                        }
                    });
                } else {
                    LOG.warn("Monitoring State not available for key: {} to process the Packet received", monitorKey);
                    //Complete the transaction
                    tx.submit();
                    releaseLock(lock);
                }
            }

            @Override
            public void onFailure(Throwable error) {
                LOG.error("Error when reading Monitoring State for key: {} to process the Packet received", monitorKey, error);
                //FIXME: Not sure if the transaction status is valid to cancel
                tx.cancel();
                releaseLock(lock);
            }
        });
    }
450
451     @Override
452     public PacketProcessingService getPacketProcessingService() {
453         return packetProcessingService;
454     }
455
456     private String getIpAddress(EndpointType endpoint) {
457         String ipAddress = "";
458         if( endpoint instanceof IpAddress) {
459             ipAddress = ((IpAddress) endpoint).getIpAddress().getIpv4Address().getValue();
460         } else if (endpoint instanceof Interface) {
461             ipAddress = ((Interface)endpoint).getInterfaceIp().getIpv4Address().getValue();
462         }
463         return ipAddress;
464     }
465
466     private String getUniqueKey(String interfaceName, String ethType, EndpointType source, EndpointType destination) {
467         StringBuilder builder =  new StringBuilder().append(interfaceName).append(AlivenessMonitorConstants.SEPERATOR)
468                                                     .append(ethType);
469         if(source != null) {
470             builder.append(AlivenessMonitorConstants.SEPERATOR).append(getIpAddress(source));
471         }
472
473         if(destination != null) {
474             builder.append(AlivenessMonitorConstants.SEPERATOR).append(getIpAddress(destination));
475         }
476         return builder.toString();
477     }
478
479     @Override
480     public Future<RpcResult<MonitorStartOutput>> monitorStart(MonitorStartInput input) {
481         RpcResultBuilder<MonitorStartOutput> rpcResultBuilder;
482         final Config in = input.getConfig();
483         Long profileId = in.getProfileId();
484         LOG.debug("Monitor Start invoked with Config: {}, Profile Id: {}", in, profileId);
485
486         try {
487             if(in.getMode() != MonitoringMode.OneOne) {
488                 throw new UnsupportedConfigException(
489                         "Unsupported Monitoring mode. Currently one-one mode is supported");
490             }
491
492             Optional<MonitorProfile> optProfile = read(LogicalDatastoreType.OPERATIONAL, getMonitorProfileId(profileId));
493             final MonitorProfile profile;
494             if(!optProfile.isPresent()) {
495                 String errMsg =  String.format("No monitoring profile associated with Id: %d", profileId);
496                 LOG.error("Monitor start failed. {}", errMsg);
497                 throw new RuntimeException(errMsg);
498             } else {
499                 profile = optProfile.get();
500             }
501
502             EtherTypes ethType = profile.getProtocolType();
503
504             String interfaceName = null;
505             EndpointType srcEndpointType = in.getSource().getEndpointType();
506
507             if( srcEndpointType instanceof Interface) {
508                 Interface endPoint = (Interface) srcEndpointType;
509                 interfaceName = endPoint.getInterfaceName();
510             } else {
511                 throw new UnsupportedConfigException(
512                         "Unsupported source Endpoint type. Only Interface Endpoint currently supported for monitoring");
513             }
514
515             if(Strings.isNullOrEmpty(interfaceName)) {
516                 throw new RuntimeException("Interface Name not defined in the source Endpoint");
517             }
518
519             //Initially the support is for one monitoring per interface. 
520             //Revisit the retrieving monitor id logic when the multiple monitoring for same interface is needed.
521             EndpointType destEndpointType = null;
522             if(in.getDestination() != null) {
523                 destEndpointType = in.getDestination().getEndpointType();
524             }
525             String idKey = getUniqueKey(interfaceName, ethType.toString(), srcEndpointType, destEndpointType);
526             final long monitorId = getUniqueId(idKey);
527             Optional<MonitoringInfo> optKey = read(LogicalDatastoreType.OPERATIONAL, getMonitoringInfoId(monitorId));
528
529             if(optKey.isPresent()) {
530                 String message = String.format("Monitoring for the interface %s with this configuration is already registered.", interfaceName);
531                 LOG.warn(message);
532                 MonitorStartOutput output = new MonitorStartOutputBuilder().setMonitorId(monitorId).build();
533                 rpcResultBuilder = RpcResultBuilder.success(output).withWarning(ErrorType.APPLICATION, "config-exists", message);
534                 return Futures.immediateFuture(rpcResultBuilder.build());
535             } else {
536                 //Construct the monitor key
537                 final MonitoringInfo monitoringInfo = new MonitoringInfoBuilder()
538                                                               .setId(monitorId)
539                                                               .setMode(in.getMode())
540                                                               .setProfileId(profileId)
541                                                               .setDestination(in.getDestination())
542                                                               .setSource(in.getSource()).build();
543                 //Construct the initial monitor state
544                 AlivenessProtocolHandler handler = ethTypeToProtocolHandler.get(ethType);
545                 final String monitoringKey = handler.getUniqueMonitoringKey(monitoringInfo);
546
547                 MonitoringState monitoringState = new MonitoringStateBuilder()
548                                                            .setMonitorKey(monitoringKey)
549                                                            .setMonitorId(monitorId)
550                                                            .setState(LivenessState.Unknown)
551                                                            .setStatus(MonitorStatus.Started)
552                                                            .setRequestCount(INITIAL_COUNT)
553                                                            .setResponsePendingCount(INITIAL_COUNT).build();
554
555                 MonitoridKeyEntry mapEntry = new MonitoridKeyEntryBuilder().setMonitorId(monitorId)
556                                                                      .setMonitorKey(monitoringKey).build();
557
558                 WriteTransaction tx = broker.newWriteOnlyTransaction();
559
560                 tx.put(LogicalDatastoreType.OPERATIONAL, getMonitoringInfoId(monitorId), monitoringInfo, CREATE_MISSING_PARENT);
561                 LOG.debug("adding oper monitoring info {}", monitoringInfo);
562
563                 tx.put(LogicalDatastoreType.OPERATIONAL, getMonitorStateId(monitoringKey), monitoringState, CREATE_MISSING_PARENT);
564                 LOG.debug("adding oper monitoring state {}", monitoringState);
565
566                 tx.put(LogicalDatastoreType.OPERATIONAL, getMonitorMapId(monitorId), mapEntry, CREATE_MISSING_PARENT);
567                 LOG.debug("adding oper map entry {}", mapEntry);
568
569                 Futures.addCallback(tx.submit(), new FutureCallback<Void>() {
570                     @Override
571                     public void onFailure(Throwable error) {
572                         String errorMsg = String.format("Adding Monitoring info: %s in Datastore failed", monitoringInfo);
573                         LOG.warn(errorMsg, error);
574                         throw new RuntimeException(errorMsg, error);
575                     }
576
577                     @Override
578                     public void onSuccess(Void noarg) {
579                         //Schedule task
580                         LOG.debug("Scheduling monitor task for config: {}", in);
581                         scheduleMonitoringTask(monitoringInfo, profile.getMonitorInterval());
582                         lockMap.put(monitoringKey, new Semaphore(1, true));
583                     }
584                 });
585             }
586
587             associateMonitorIdWithInterface(monitorId, interfaceName);
588
589             MonitorStartOutput output = new MonitorStartOutputBuilder()
590                                             .setMonitorId(monitorId).build();
591
592             rpcResultBuilder = RpcResultBuilder.success(output);
593         } catch(Exception e) {
594             LOG.error("Start Monitoring Failed. {}", e.getMessage(), e);
595             rpcResultBuilder = RpcResultBuilder.<MonitorStartOutput>failed().withError(ErrorType.APPLICATION, e.getMessage(), e);
596         }
597         return Futures.immediateFuture(rpcResultBuilder.build());
598     }
599
600     private void associateMonitorIdWithInterface(final Long monitorId, final String interfaceName) {
601         LOG.debug("associate monitor Id {} with interface {}", monitorId, interfaceName);
602         final ReadWriteTransaction tx = broker.newReadWriteTransaction();
603         ListenableFuture<Optional<InterfaceMonitorEntry>> readFuture =
604                                         tx.read(LogicalDatastoreType.OPERATIONAL, getInterfaceMonitorMapId(interfaceName));
605         ListenableFuture<Void> updateFuture =
606                 Futures.transform(readFuture, new AsyncFunction<Optional<InterfaceMonitorEntry>, Void>() {
607
608                     @Override
609                     public ListenableFuture<Void> apply(Optional<InterfaceMonitorEntry> optEntry) throws Exception {
610                         if(optEntry.isPresent()) {
611                             InterfaceMonitorEntry entry = optEntry.get();
612                             List<Long> monitorIds = entry.getMonitorIds();
613                             monitorIds.add(monitorId);
614                             InterfaceMonitorEntry newEntry = new InterfaceMonitorEntryBuilder()
615                                         .setKey(new InterfaceMonitorEntryKey(interfaceName)).setMonitorIds(monitorIds).build();
616                             tx.merge(LogicalDatastoreType.OPERATIONAL, getInterfaceMonitorMapId(interfaceName), newEntry);
617                         } else {
618                             //Create new monitor entry
619                             LOG.debug("Adding new interface-monitor association for interface {} with id {}", interfaceName, monitorId);
620                             List<Long> monitorIds = new ArrayList<>();
621                             monitorIds.add(monitorId);
622                             InterfaceMonitorEntry newEntry =
623                                     new InterfaceMonitorEntryBuilder().setInterfaceName(interfaceName).setMonitorIds(monitorIds).build();
624                             tx.put(LogicalDatastoreType.OPERATIONAL,
625                                                   getInterfaceMonitorMapId(interfaceName), newEntry, CREATE_MISSING_PARENT);
626                         }
627                         return tx.submit();
628                     }
629                 });
630
631         Futures.addCallback(updateFuture, new FutureCallbackImpl(
632                      String.format("Association of monitorId %d with Interface %s", monitorId, interfaceName)));
633     }
634
635     private void scheduleMonitoringTask(MonitoringInfo monitoringInfo, long monitorInterval) {
636         AlivenessMonitorTask monitorTask = new AlivenessMonitorTask(monitoringInfo);
637         ScheduledFuture<?> scheduledFutureResult = monitorService.scheduleAtFixedRate(
638                                   monitorTask, NO_DELAY, monitorInterval, TimeUnit.MILLISECONDS);
639         monitoringTasks.put(monitoringInfo.getId(), scheduledFutureResult);
640     }
641
642     @Override
643     public Future<RpcResult<Void>> monitorPause(MonitorPauseInput input) {
644         LOG.debug("Monitor Pause operation invoked for monitor id: {}", input.getMonitorId());
645         SettableFuture<RpcResult<Void>> result = SettableFuture.create();
646         final Long monitorId = input.getMonitorId();
647
648         //Set the monitoring status to Paused
649         updateMonitorStatusTo(monitorId, MonitorStatus.Paused, new Predicate<MonitorStatus>() {
650             @Override
651             public boolean apply(MonitorStatus currentStatus) {
652                 return currentStatus == MonitorStatus.Started;
653             }
654         });
655
656         if(stopMonitoringTask(monitorId)) {
657             result.set(RpcResultBuilder.<Void>success().build());
658         } else {
659             String errorMsg = String.format("No Monitoring Task availble to pause for the given monitor id : %d", monitorId);
660             LOG.error("Monitor Pause operation failed- {}",errorMsg);
661             result.set(RpcResultBuilder.<Void>failed().withError(ErrorType.APPLICATION, errorMsg).build());
662         }
663
664         return result;
665     }
666
667     @Override
668     public Future<RpcResult<Void>> monitorUnpause(MonitorUnpauseInput input) {
669         LOG.debug("Monitor Unpause operation invoked for monitor id: {}", input.getMonitorId());
670         final SettableFuture<RpcResult<Void>> result = SettableFuture.create();
671
672         final Long monitorId = input.getMonitorId();
673         final ReadOnlyTransaction tx = broker.newReadOnlyTransaction();
674         ListenableFuture<Optional<MonitoringInfo>> readInfoResult =
675                 tx.read(LogicalDatastoreType.OPERATIONAL, getMonitoringInfoId(monitorId));
676
677         Futures.addCallback(readInfoResult, new FutureCallback<Optional<MonitoringInfo>>() {
678
679             @Override
680             public void onFailure(Throwable error) {
681                 String msg = String.format("Unable to read monitoring info associated with monitor id %d", monitorId);
682                 LOG.error("Monitor unpause Failed. {}", msg, error);
683                 result.set(RpcResultBuilder.<Void>failed().withError(ErrorType.APPLICATION, msg, error).build());
684             }
685
686             @Override
687             public void onSuccess(Optional<MonitoringInfo> optInfo) {
688                 if(optInfo.isPresent()) {
689                     final MonitoringInfo info = optInfo.get();
690                     ListenableFuture<Optional<MonitorProfile>> readProfile =
691                             tx.read(LogicalDatastoreType.OPERATIONAL, getMonitorProfileId(info.getProfileId()));
692                     Futures.addCallback(readProfile, new FutureCallback<Optional<MonitorProfile>>(){
693
694                         @Override
695                         public void onFailure(Throwable error) {
696                             String msg = String.format("Unable to read Monitoring profile associated with id %d", info.getProfileId());
697                             LOG.warn("Monitor unpause Failed. {}", msg, error);
698                             result.set(RpcResultBuilder.<Void>failed().withError(ErrorType.APPLICATION, msg, error).build());
699                         }
700
701                         @Override
702                         public void onSuccess(Optional<MonitorProfile> optProfile) {
703                             tx.close();
704                             if(optProfile.isPresent()) {
705                                 updateMonitorStatusTo(monitorId, MonitorStatus.Started, new Predicate<MonitorStatus>() {
706                                     @Override
707                                     public boolean apply(MonitorStatus currentStatus) {
708                                         return (currentStatus == MonitorStatus.Paused ||
709                                                     currentStatus == MonitorStatus.Stopped);
710                                     }
711                                 });
712                                 MonitorProfile profile = optProfile.get();
713                                 LOG.debug("Monitor Resume - Scheduling monitoring task with Id: {}", monitorId);
714                                 scheduleMonitoringTask(info, profile.getMonitorInterval());
715                                 result.set(RpcResultBuilder.<Void>success().build());
716                             } else {
717                                 String msg = String.format("Monitoring profile associated with id %d is not present", info.getProfileId());
718                                 LOG.warn("Monitor unpause Failed. {}", msg);
719                                 result.set(RpcResultBuilder.<Void>failed().withError(ErrorType.APPLICATION, msg).build());
720                             }
721                         }
722                     });
723                 } else {
724                     tx.close();
725                     String msg = String.format("Monitoring info associated with id %d is not present", monitorId);
726                     LOG.warn("Monitor unpause Failed. {}", msg);
727                     result.set(RpcResultBuilder.<Void>failed().withError(ErrorType.APPLICATION, msg).build());
728                 }
729             }
730         }, callbackExecutorService);
731
732         return result;
733     }
734
735     private boolean stopMonitoringTask(Long monitorId) {
736         return stopMonitoringTask(monitorId, INTERRUPT_TASK);
737     }
738
739     private boolean stopMonitoringTask(Long monitorId, boolean interruptTask) {
740         ScheduledFuture<?> scheduledFutureResult = monitoringTasks.get(monitorId);
741         if(scheduledFutureResult != null) {
742             scheduledFutureResult.cancel(interruptTask);
743             return true;
744         }
745         return false;
746     }
747
748     private Optional<MonitorProfile> getMonitorProfile(Long profileId) {
749         return read(LogicalDatastoreType.OPERATIONAL, getMonitorProfileId(profileId));
750     }
751
752     private void acquireLock(Semaphore lock) {
753         if(lock == null) {
754             return;
755         }
756
757         boolean acquiredLock = false;
758         try {
759             acquiredLock = lock.tryAcquire(50, TimeUnit.MILLISECONDS);
760         } catch (InterruptedException e) {
761             LOG.warn("Thread interrupted when waiting to acquire the lock");
762         }
763
764         if(!acquiredLock) {
765             LOG.warn("Previous transaction did not complete in time. Releasing the lock to proceed");
766             lock.release();
767             try {
768                 lock.acquire();
769                 LOG.trace("Lock acquired successfully");
770             } catch (InterruptedException e) {
771                 LOG.warn("Acquire failed");
772             }
773         } else {
774             LOG.trace("Lock acquired successfully");
775         }
776     }
777
778     private void releaseLock(Semaphore lock) {
779         if(lock != null) {
780             lock.release();
781         }
782     }
783
784     private void sendMonitorPacket(final MonitoringInfo monitoringInfo) {
785         //TODO: Handle interrupts
786         final Long monitorId = monitoringInfo.getId();
787         final String monitorKey = monitorIdKeyCache.getUnchecked(monitorId);
788         if(monitorKey == null) {
789             LOG.warn("No monitor Key associated with id {} to send the monitor packet", monitorId);
790             return;
791         } else {
792             LOG.debug("Sending monitoring packet for key: {}", monitorKey);
793         }
794
795         final MonitorProfile profile;
796         Optional<MonitorProfile> optProfile = getMonitorProfile(monitoringInfo.getProfileId());
797         if(optProfile.isPresent()) {
798             profile = optProfile.get();
799         } else {
800             LOG.warn("No monitor profile associated with id {}. "
801                     + "Could not send Monitor packet for monitor-id {}", monitoringInfo.getProfileId(), monitorId);
802             return;
803         }
804
805         final Semaphore lock = lockMap.get(monitorKey);
806         LOG.debug("Acquiring lock for monitor key : {} to send monitor packet", monitorKey);
807         acquireLock(lock);
808
809         final ReadWriteTransaction tx = broker.newReadWriteTransaction();
810         ListenableFuture<Optional<MonitoringState>> readResult =
811                                   tx.read(LogicalDatastoreType.OPERATIONAL, getMonitorStateId(monitorKey));
812         ListenableFuture<Void> writeResult = Futures.transform(readResult, new AsyncFunction<Optional<MonitoringState>, Void>() {
813
814             @Override
815             public ListenableFuture<Void> apply(Optional<MonitoringState> optState)
816                     throws Exception {
817                 if(optState.isPresent()) {
818                     MonitoringState state = optState.get();
819
820                     //Increase the request count
821                     Long requestCount = state.getRequestCount() + 1;
822
823                     //Check with the monitor window
824                     LivenessState currentLivenessState = state.getState();
825
826                     //Increase the pending response count
827                     long responsePendingCount = state.getResponsePendingCount();
828                     if(responsePendingCount < profile.getMonitorWindow()) {
829                         responsePendingCount = responsePendingCount + 1;
830                     }
831
832                     //Check with the failure thresold
833                     if(responsePendingCount >= profile.getFailureThreshold()) {
834                         //Change the state to down and notify
835                         if(currentLivenessState != LivenessState.Down) {
836                             LOG.debug("Response pending Count: {}, Failure threshold: {} for monitorId {}",
837                                     responsePendingCount, profile.getFailureThreshold(), state.getMonitorId());
838                             LOG.info("Sending notification for monitor Id : {} with State: {}",  
839                                     state.getMonitorId(), LivenessState.Down);
840                             publishNotification(monitorId, LivenessState.Down);
841                             currentLivenessState = LivenessState.Down;
842                             //Reset requestCount when state changes from UP to DOWN
843                             requestCount = INITIAL_COUNT;
844                         }
845                     }
846
847                     //Update the ODS with state
848                     MonitoringState updatedState = new MonitoringStateBuilder(/*state*/).setMonitorKey(state.getMonitorKey())
849                                                                                     .setRequestCount(requestCount)
850                                                                                     .setResponsePendingCount(responsePendingCount)
851                                                                                     .setState(currentLivenessState).build();
852                     tx.merge(LogicalDatastoreType.OPERATIONAL, getMonitorStateId(state.getMonitorKey()), updatedState);
853                     return tx.submit();
854                 } else {
855                     //Close the transaction
856                     tx.submit();
857                     String errorMsg = String.format("Monitoring State associated with id %d is not present to send packet out.", monitorId);
858                     return Futures.immediateFailedFuture(new RuntimeException(errorMsg));
859                 }
860             }
861
862         });
863
864         Futures.addCallback(writeResult, new FutureCallback<Void>() {
865             @Override
866             public void onSuccess(Void noarg) {
867                 //invoke packetout on protocol handler
868                 AlivenessProtocolHandler handler = ethTypeToProtocolHandler.get(profile.getProtocolType());
869                 if(handler != null) {
870                     LOG.debug("Sending monitoring packet {}", monitoringInfo);
871                     handler.sendPacketOut(monitoringInfo);
872                 }
873                 releaseLock(lock);
874             }
875
876             @Override
877             public void onFailure(Throwable error) {
878                 LOG.warn("Updating monitoring state for key: {} failed. Monitoring packet is not sent", monitorKey, error);
879                 releaseLock(lock);
880             }
881
882         });
883
884     }
885
886     private void publishNotification(final Long monitorId, final LivenessState state) {
887         LOG.debug("Sending notification for id {}  - state {}", monitorId, state);
888         EventData data = new EventDataBuilder().setMonitorId(monitorId)
889                                                .setMonitorState(state).build();
890         MonitorEvent event = new MonitorEventBuilder().setEventData(data).build();;
891         final ListenableFuture<? extends Object> eventFuture = notificationPublishService.offerNotification(event);
892         Futures.addCallback(eventFuture, new FutureCallback<Object>() {
893             @Override
894             public void onFailure(Throwable error) {
895                 LOG.warn("Error in notifying listeners for id {} - state {}", monitorId, state, error);
896             }
897
898             @Override
899             public void onSuccess(Object arg) {
900                 LOG.trace("Successful in notifying listeners for id {} - state {}", monitorId, state);
901             }
902         });
903     }
904
905     @Override
906     public Future<RpcResult<MonitorProfileCreateOutput>> monitorProfileCreate(final MonitorProfileCreateInput input) {
907         LOG.debug("Monitor Profile Create operation - {}", input.getProfile());
908         final SettableFuture<RpcResult<MonitorProfileCreateOutput>> result = SettableFuture.create();
909         Profile profile = input.getProfile();
910         final Long failureThreshold = profile.getFailureThreshold();
911         final Long monitorInterval = profile.getMonitorInterval();
912         final Long monitorWindow = profile.getMonitorWindow();
913         final EtherTypes ethType = profile.getProtocolType();
914         String idKey = getUniqueProfileKey(failureThreshold, monitorInterval, monitorWindow, ethType);
915         final Long profileId = Long.valueOf(getUniqueId(idKey));
916
917         final ReadWriteTransaction tx = broker.newReadWriteTransaction();
918         ListenableFuture<Optional<MonitorProfile>> readFuture =
919                                    tx.read(LogicalDatastoreType.OPERATIONAL, getMonitorProfileId(profileId));
920         ListenableFuture<RpcResult<MonitorProfileCreateOutput>> resultFuture =
921                 Futures.transform(readFuture, new AsyncFunction<Optional<MonitorProfile>, RpcResult<MonitorProfileCreateOutput>>() {
922
923                     @Override
924                     public ListenableFuture<RpcResult<MonitorProfileCreateOutput>> apply(
925                             Optional<MonitorProfile> optProfile) throws Exception {
926                         if(optProfile.isPresent()) {
927                             tx.cancel();
928                             MonitorProfileCreateOutput output = new MonitorProfileCreateOutputBuilder()
929                                                                             .setProfileId(profileId).build();
930                             String msg = String.format("Monitor profile %s already present for the given input", input);
931                             LOG.warn(msg);
932                             result.set(RpcResultBuilder.success(output)
933                                          .withWarning(ErrorType.PROTOCOL, "profile-exists", msg).build());
934                         } else {
935                             final MonitorProfile monitorProfile = new MonitorProfileBuilder().setId(profileId)
936                                                                                        .setFailureThreshold(failureThreshold)
937                                                                                        .setMonitorInterval(monitorInterval)
938                                                                                        .setMonitorWindow(monitorWindow)
939                                                                                        .setProtocolType(ethType).build();
940                             tx.put(LogicalDatastoreType.OPERATIONAL, getMonitorProfileId(profileId), monitorProfile, CREATE_MISSING_PARENT);
941                             Futures.addCallback(tx.submit(), new FutureCallback<Void>() {
942                                 @Override
943                                 public void onFailure(Throwable error) {
944                                     String msg =
945                                             String.format("Error when storing monitorprofile %s in datastore", monitorProfile);
946                                     LOG.error(msg, error);
947                                     result.set(RpcResultBuilder.<MonitorProfileCreateOutput>failed()
948                                                 .withError(ErrorType.APPLICATION, msg, error).build());
949                                 }
950                                 @Override
951                                 public void onSuccess(Void noarg) {
952                                     MonitorProfileCreateOutput output = new MonitorProfileCreateOutputBuilder()
953                                                                               .setProfileId(profileId).build();
954                                     result.set(RpcResultBuilder.success(output).build());
955                                 }
956                             });
957                         }
958                         return result;
959                     }
960                 }, callbackExecutorService);
961         Futures.addCallback(resultFuture, new FutureCallback<RpcResult<MonitorProfileCreateOutput>>() {
962             @Override
963             public void onFailure(Throwable error) {
964                 //This would happen when any error happens during reading for monitoring profile
965                 String msg = String.format("Error in creating monitorprofile - %s", input);
966                 result.set(RpcResultBuilder.<MonitorProfileCreateOutput>failed()
967                             .withError(ErrorType.APPLICATION, msg, error).build());
968                 LOG.error(msg, error);
969             }
970
971             @Override
972             public void onSuccess(RpcResult<MonitorProfileCreateOutput> result) {
973                 LOG.debug("Successfully created monitor Profile {} ", input);
974             }
975         }, callbackExecutorService);
976         return result;
977     }
978
979
980     @Override
981     public Future<RpcResult<MonitorProfileGetOutput>> monitorProfileGet(MonitorProfileGetInput input){
982         LOG.debug("Monitor Profile Get operation for input profile- {}", input.getProfile());
983         RpcResultBuilder<MonitorProfileGetOutput> rpcResultBuilder;
984         try{
985             final Long profileId = getExistingProfileId(input);
986
987             MonitorProfileGetOutputBuilder output = new MonitorProfileGetOutputBuilder().setProfileId(profileId);
988             rpcResultBuilder = RpcResultBuilder.success();
989             rpcResultBuilder.withResult(output.build());
990         }catch(Exception e){
991             LOG.error("Retrieval of monitor profile ID for input {} failed due to {}" , input, e);
992             rpcResultBuilder = RpcResultBuilder.failed();
993         }
994         return Futures.immediateFuture(rpcResultBuilder.build());
995     }
996
997     private Long getExistingProfileId(MonitorProfileGetInput input){
998         org.opendaylight.yang.gen.v1.urn.opendaylight.vpnservice.alivenessmonitor.rev150629.monitor.profile.get.input.Profile profile = input.getProfile();
999         final Long failureThreshold = profile.getFailureThreshold();
1000         final Long monitorInterval = profile.getMonitorInterval();
1001         final Long monitorWindow = profile.getMonitorWindow();
1002         final EtherTypes ethType = profile.getProtocolType();
1003         LOG.debug("getExistingProfileId for profile : {}", input.getProfile());
1004         String idKey = getUniqueProfileKey(failureThreshold, monitorInterval, monitorWindow, ethType);
1005         LOG.debug("Obtained existing profile ID for profile : {}", input.getProfile());
1006         return (Long.valueOf(getUniqueId(idKey)));
1007     }
1008
1009     private String getUniqueProfileKey(Long failureThreshold,Long monitorInterval,Long monitorWindow,EtherTypes ethType) {
1010         return new StringBuilder().append(failureThreshold).append(AlivenessMonitorConstants.SEPERATOR)
1011                                   .append(monitorInterval).append(AlivenessMonitorConstants.SEPERATOR)
1012                                   .append(monitorWindow).append(AlivenessMonitorConstants.SEPERATOR)
1013                                   .append(ethType).append(AlivenessMonitorConstants.SEPERATOR).toString();
1014     }
1015
1016     @Override
1017     public Future<RpcResult<Void>> monitorProfileDelete(final MonitorProfileDeleteInput input) {
1018         LOG.debug("Monitor Profile delete for Id: {}", input.getProfileId());
1019         final SettableFuture<RpcResult<Void>> result = SettableFuture.create();
1020         final Long profileId = input.getProfileId();
1021         final ReadWriteTransaction tx = broker.newReadWriteTransaction();
1022         ListenableFuture<Optional<MonitorProfile>> readFuture =
1023                                    tx.read(LogicalDatastoreType.OPERATIONAL, getMonitorProfileId(profileId));
1024         ListenableFuture<RpcResult<Void>> writeFuture =
1025                    Futures.transform(readFuture, new AsyncFunction<Optional<MonitorProfile>, RpcResult<Void>>() {
1026
1027                         @Override
1028                         public ListenableFuture<RpcResult<Void>> apply(final Optional<MonitorProfile> optProfile) throws Exception {
1029                             if(optProfile.isPresent()) {
1030                                 tx.delete(LogicalDatastoreType.OPERATIONAL, getMonitorProfileId(profileId));
1031                                 Futures.addCallback(tx.submit(), new FutureCallback<Void>() {
1032                                     @Override
1033                                     public void onFailure(Throwable error) {
1034                                         String msg = String.format("Error when removing monitor profile %d from datastore", profileId);
1035                                         LOG.error(msg, error);
1036                                         result.set(RpcResultBuilder.<Void>failed().withError(ErrorType.APPLICATION, msg, error).build());
1037                                     }
1038
1039                                     @Override
1040                                     public void onSuccess(Void noarg) {
1041                                         MonitorProfile profile = optProfile.get();
1042                                         String id = getUniqueProfileKey(profile.getFailureThreshold(), profile.getMonitorInterval(), 
1043                                                                                  profile.getMonitorWindow(), profile.getProtocolType());
1044                                         releaseId(id);
1045                                         result.set(RpcResultBuilder.<Void>success().build());
1046                                     }
1047                                 });
1048                             } else {
1049                                 String msg = String.format("Monitor profile with Id: %d does not exist", profileId);
1050                                 LOG.info(msg);
1051                                 result.set(RpcResultBuilder.<Void>success().withWarning(ErrorType.PROTOCOL, "invalid-value", msg).build());
1052                             }
1053                             return result;
1054                         }
1055                     }, callbackExecutorService);
1056
1057         Futures.addCallback(writeFuture, new FutureCallback<RpcResult<Void>>() {
1058
1059             @Override
1060             public void onFailure(Throwable error) {
1061                 String msg = String.format("Error when removing monitor profile %d from datastore", profileId);
1062                 LOG.error(msg, error);
1063                 result.set(RpcResultBuilder.<Void>failed().withError(ErrorType.APPLICATION, msg, error).build());
1064             }
1065
1066             @Override
1067             public void onSuccess(RpcResult<Void> noarg) {
1068                 LOG.debug("Successfully removed Monitor Profile {}", profileId);
1069             }
1070         }, callbackExecutorService);
1071         return result;
1072     }
1073
1074     @Override
1075     public Future<RpcResult<Void>> monitorStop(MonitorStopInput input) {
1076         LOG.debug("Monitor Stop operation for monitor id - {}", input.getMonitorId());
1077         SettableFuture<RpcResult<Void>> result = SettableFuture.create();
1078
1079         final Long monitorId = input.getMonitorId();
1080         Optional<MonitoringInfo> optInfo = read(LogicalDatastoreType.OPERATIONAL, getMonitoringInfoId(monitorId));
1081         if(optInfo.isPresent()) {
1082             //Stop the monitoring task
1083             stopMonitoringTask(monitorId);
1084
1085             //Cleanup the Data store
1086             WriteTransaction tx = broker.newWriteOnlyTransaction();
1087             String monitorKey = monitorIdKeyCache.getUnchecked(monitorId);
1088             if(monitorKey != null) {
1089                 tx.delete(LogicalDatastoreType.OPERATIONAL, getMonitorStateId(monitorKey));
1090                 monitorIdKeyCache.invalidate(monitorId);
1091             }
1092
1093             tx.delete(LogicalDatastoreType.OPERATIONAL, getMonitoringInfoId(monitorId));
1094             Futures.addCallback(tx.submit(),
1095                     new FutureCallbackImpl(String.format("Delete monitor state with Id %d", monitorId)));
1096
1097             MonitoringInfo info = optInfo.get();
1098             String interfaceName = getInterfaceName(info.getSource().getEndpointType());
1099             if(interfaceName != null) {
1100                 removeMonitorIdFromInterfaceAssociation(monitorId, interfaceName);
1101             }
1102             releaseIdForMonitoringInfo(info);
1103
1104             lockMap.remove(monitorKey);
1105
1106             result.set(RpcResultBuilder.<Void>success().build());
1107         } else {
1108             String errorMsg = String.format("Do not have monitoring information associated with key %d", monitorId);
1109             LOG.error("Delete monitoring operation Failed - {}", errorMsg);
1110             result.set(RpcResultBuilder.<Void>failed().withError(ErrorType.APPLICATION, errorMsg).build());
1111         }
1112
1113         return result;
1114     }
1115
1116     private void removeMonitorIdFromInterfaceAssociation(final Long monitorId, final String interfaceName) {
1117         LOG.debug("Remove monitorId {} from Interface association {}", monitorId, interfaceName);
1118         final ReadWriteTransaction tx = broker.newReadWriteTransaction();
1119         ListenableFuture<Optional<InterfaceMonitorEntry>> readFuture = tx.read(LogicalDatastoreType.OPERATIONAL, getInterfaceMonitorMapId(interfaceName));
1120         ListenableFuture<Void> updateFuture = Futures.transform(readFuture, new AsyncFunction<Optional<InterfaceMonitorEntry>, Void>() {
1121
1122             @Override
1123             public ListenableFuture<Void> apply(Optional<InterfaceMonitorEntry> optEntry) throws Exception {
1124                 if(optEntry.isPresent()) {
1125                     InterfaceMonitorEntry entry = optEntry.get();
1126                     List<Long> monitorIds = entry.getMonitorIds();
1127                     monitorIds.remove(monitorId);
1128                     InterfaceMonitorEntry newEntry = new InterfaceMonitorEntryBuilder(entry)
1129                                        .setKey(new InterfaceMonitorEntryKey(interfaceName)).setMonitorIds(monitorIds).build();
1130                     tx.put(LogicalDatastoreType.OPERATIONAL, getInterfaceMonitorMapId(interfaceName), newEntry, CREATE_MISSING_PARENT);
1131                     return tx.submit();
1132                 } else {
1133                     LOG.warn("No Interface map entry found {} to remove monitorId {}", interfaceName, monitorId);
1134                     tx.cancel();
1135                     return Futures.immediateFuture(null);
1136                 }
1137             }
1138         });
1139
1140         Futures.addCallback(updateFuture, new FutureCallbackImpl(
1141                      String.format("Dis-association of monitorId %d with Interface %s", monitorId, interfaceName)));
1142     }
1143
1144
1145     private void releaseIdForMonitoringInfo(MonitoringInfo info) {
1146         Long monitorId = info.getId();
1147         EndpointType source = info.getSource().getEndpointType();
1148         String interfaceName = getInterfaceName(source);
1149         if(!Strings.isNullOrEmpty(interfaceName)) {
1150             Optional<MonitorProfile> optProfile = read(LogicalDatastoreType.OPERATIONAL, getMonitorProfileId(info.getProfileId()));
1151             if(optProfile.isPresent()) {
1152                 EtherTypes ethType = optProfile.get().getProtocolType();
1153                 EndpointType destination = (info.getDestination() != null) ? info.getDestination().getEndpointType() : null; 
1154                 String idKey = getUniqueKey(interfaceName, ethType.toString(), source, destination);
1155                 releaseId(idKey);
1156             } else {
1157                 LOG.warn("Could not release monitorId {}. No profile associated with it", monitorId);
1158             }
1159         }
1160     }
1161
1162     private String getInterfaceName(EndpointType endpoint) {
1163         String interfaceName = null;
1164         if(endpoint instanceof Interface) {
1165             interfaceName = ((Interface)endpoint).getInterfaceName();
1166         }
1167         return interfaceName;
1168     }
1169
1170     private void stopMonitoring(long monitorId) {
1171         updateMonitorStatusTo(monitorId, MonitorStatus.Stopped, new Predicate<MonitorStatus>() {
1172             @Override
1173             public boolean apply(MonitorStatus currentStatus) {
1174                 return currentStatus != MonitorStatus.Stopped;
1175             }
1176         });
1177         if(!stopMonitoringTask(monitorId)) {
1178             LOG.warn("No monitoring task running to perform cancel operation for monitorId {}", monitorId);
1179         }
1180     }
1181
1182     private void updateMonitorStatusTo(final Long monitorId, final MonitorStatus newStatus, final Predicate<MonitorStatus> isValidStatus) {
1183         final String monitorKey = monitorIdKeyCache.getUnchecked(monitorId);
1184         if(monitorKey == null) {
1185             LOG.warn("No monitor Key associated with id {} to change the monitor status to {}", monitorId, newStatus);
1186             return;
1187         }
1188         final ReadWriteTransaction tx = broker.newReadWriteTransaction();
1189
1190         ListenableFuture<Optional<MonitoringState>> readResult =
1191                             tx.read(LogicalDatastoreType.OPERATIONAL, getMonitorStateId(monitorKey));
1192
1193         ListenableFuture<Void> writeResult = Futures.transform(readResult, new AsyncFunction<Optional<MonitoringState>, Void>() {
1194             @Override
1195             public ListenableFuture<Void> apply(Optional<MonitoringState> optState) throws Exception {
1196                 if(optState.isPresent()) {
1197                     MonitoringState state = optState.get();
1198                     if(isValidStatus.apply(state.getStatus())) {
1199                         MonitoringState updatedState = new MonitoringStateBuilder().setMonitorKey(monitorKey)
1200                                                                               .setStatus(newStatus).build();
1201                         tx.merge(LogicalDatastoreType.OPERATIONAL, getMonitorStateId(monitorKey), updatedState);
1202                     } else {
1203                         LOG.warn("Invalid Monitoring status {}, cannot be updated to {} for monitorId {}"
1204                                                                     , state.getStatus(), newStatus, monitorId);
1205                     }
1206                 } else {
1207                     LOG.warn("No associated monitoring state data available to update the status to {} for {}", newStatus, monitorId);
1208                 }
1209                 return tx.submit();
1210             }
1211         });
1212
1213         Futures.addCallback(writeResult,
1214                         new FutureCallbackImpl(String.format("Monitor status update for %d to %s", monitorId, newStatus.toString())));
1215     }
1216
1217     private void resumeMonitoring(final long monitorId) {
1218         final ReadOnlyTransaction tx = broker.newReadOnlyTransaction();
1219         ListenableFuture<Optional<MonitoringInfo>> readInfoResult =
1220                 tx.read(LogicalDatastoreType.OPERATIONAL, getMonitoringInfoId(monitorId));
1221
1222         Futures.addCallback(readInfoResult, new FutureCallback<Optional<MonitoringInfo>>() {
1223
1224             @Override
1225             public void onFailure(Throwable error) {
1226                 String msg = String.format("Unable to read monitoring info associated with monitor id %d", monitorId);
1227                 LOG.error("Monitor resume Failed. {}", msg, error);
1228             }
1229
1230             @Override
1231             public void onSuccess(Optional<MonitoringInfo> optInfo) {
1232                 if(optInfo.isPresent()) {
1233                     final MonitoringInfo info = optInfo.get();
1234                     ListenableFuture<Optional<MonitorProfile>> readProfile =
1235                             tx.read(LogicalDatastoreType.OPERATIONAL, getMonitorProfileId(info.getProfileId()));
1236                     Futures.addCallback(readProfile, new FutureCallback<Optional<MonitorProfile>>(){
1237
1238                         @Override
1239                         public void onFailure(Throwable error) {
1240                             String msg = String.format("Unable to read Monitoring profile associated with id %d", info.getProfileId());
1241                             LOG.warn("Monitor resume Failed. {}", msg, error);
1242                         }
1243
1244                         @Override
1245                         public void onSuccess(Optional<MonitorProfile> optProfile) {
1246                             tx.close();
1247                             if(optProfile.isPresent()) {
1248                                 updateMonitorStatusTo(monitorId, MonitorStatus.Started, new Predicate<MonitorStatus>() {
1249                                     @Override
1250                                     public boolean apply(MonitorStatus currentStatus) {
1251                                         return currentStatus != MonitorStatus.Started;
1252                                     }
1253                                 });
1254                                 MonitorProfile profile = optProfile.get();
1255                                 LOG.debug("Monitor Resume - Scheduling monitoring task for Id: {}", monitorId);
1256                                 scheduleMonitoringTask(info, profile.getMonitorInterval());
1257                             } else {
1258                                 String msg = String.format("Monitoring profile associated with id %d is not present", info.getProfileId());
1259                                 LOG.warn("Monitor resume Failed. {}", msg);
1260                             }
1261                         }
1262                     });
1263                 } else {
1264                     tx.close();
1265                     String msg = String.format("Monitoring info associated with id %d is not present", monitorId);
1266                     LOG.warn("Monitor resume Failed. {}", msg);
1267                 }
1268             }
1269         });
1270     }
1271
1272     //DATA STORE OPERATIONS
1273     private <T extends DataObject> Optional<T> read(LogicalDatastoreType datastoreType, InstanceIdentifier<T> path) {
1274         ReadOnlyTransaction tx = broker.newReadOnlyTransaction();
1275
1276         Optional<T> result = Optional.absent();
1277         try {
1278             result = tx.read(datastoreType, path).get();
1279         } catch (InterruptedException | ExecutionException e) {
1280             LOG.warn("Error reading data from path {} in datastore {}", path, datastoreType, e);
1281         } finally {
1282             tx.close();
1283         }
1284
1285         return result;
1286     }
1287
1288     @Override
1289     public void onInterfaceStateUp(String interfaceName) {
1290         List<Long> monitorIds = getMonitorIds(interfaceName);
1291         if(monitorIds.isEmpty()) {
1292             LOG.warn("Could not get monitorId for interface: {}", interfaceName);
1293             return;
1294         }
1295         for(Long monitorId : monitorIds) {
1296             LOG.debug("Resume monitoring on interface: {} with monitorId: {}", interfaceName, monitorId);
1297             resumeMonitoring(monitorId);
1298         }
1299     }
1300
1301     @Override
1302     public void onInterfaceStateDown(String interfaceName) {
1303         List<Long> monitorIds = getMonitorIds(interfaceName);
1304         if(monitorIds.isEmpty()) {
1305             LOG.warn("Could not get monitorIds for interface: {}", interfaceName);
1306             return;
1307         }
1308         for(Long monitorId : monitorIds) {
1309             LOG.debug("Suspend monitoring on interface: {} with monitorId: {}", interfaceName, monitorId);
1310             stopMonitoring(monitorId);
1311         }
1312     }
1313
1314     private List<Long> getMonitorIds(String interfaceName) {
1315         Optional<InterfaceMonitorEntry> optEntry = read(LogicalDatastoreType.OPERATIONAL,
1316                                                            getInterfaceMonitorMapId(interfaceName));
1317         if(optEntry.isPresent()) {
1318             InterfaceMonitorEntry entry = optEntry.get();
1319             return entry.getMonitorIds();
1320         }
1321         return Collections.emptyList();
1322     }
1323
1324 }