Handle nullable lists
[genius.git] / alivenessmonitor / alivenessmonitor-impl / src / main / java / org / opendaylight / genius / alivenessmonitor / internal / AlivenessMonitor.java
1 /*
2  * Copyright (c) 2016 Ericsson India Global Services Pvt Ltd. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.genius.alivenessmonitor.internal;
9
10 import static org.opendaylight.genius.alivenessmonitor.utils.AlivenessMonitorUtil.getInterfaceMonitorMapId;
11 import static org.opendaylight.genius.alivenessmonitor.utils.AlivenessMonitorUtil.getMonitorMapId;
12 import static org.opendaylight.genius.alivenessmonitor.utils.AlivenessMonitorUtil.getMonitorProfileId;
13 import static org.opendaylight.genius.alivenessmonitor.utils.AlivenessMonitorUtil.getMonitorStateId;
14 import static org.opendaylight.genius.alivenessmonitor.utils.AlivenessMonitorUtil.getMonitoringInfoId;
15 import static org.opendaylight.genius.alivenessmonitor.utils.AlivenessMonitorUtil.nullToEmpty;
16 import static org.opendaylight.genius.infra.Datastore.OPERATIONAL;
17
18 import com.google.common.base.Optional;
19 import com.google.common.base.Preconditions;
20 import com.google.common.base.Predicate;
21 import com.google.common.base.Strings;
22 import com.google.common.cache.CacheBuilder;
23 import com.google.common.cache.CacheLoader;
24 import com.google.common.cache.LoadingCache;
25 import com.google.common.util.concurrent.FutureCallback;
26 import com.google.common.util.concurrent.Futures;
27 import com.google.common.util.concurrent.ListenableFuture;
28 import com.google.common.util.concurrent.MoreExecutors;
29 import com.google.common.util.concurrent.SettableFuture;
30 import java.util.ArrayList;
31 import java.util.Collections;
32 import java.util.List;
33 import java.util.concurrent.ConcurrentHashMap;
34 import java.util.concurrent.ConcurrentMap;
35 import java.util.concurrent.ExecutionException;
36 import java.util.concurrent.ExecutorService;
37 import java.util.concurrent.Future;
38 import java.util.concurrent.ScheduledExecutorService;
39 import java.util.concurrent.ScheduledFuture;
40 import java.util.concurrent.Semaphore;
41 import java.util.concurrent.TimeUnit;
42 import javax.annotation.Nonnull;
43 import javax.annotation.PreDestroy;
44 import javax.inject.Inject;
45 import javax.inject.Singleton;
46 import org.opendaylight.controller.md.sal.binding.api.DataBroker;
47 import org.opendaylight.controller.md.sal.binding.api.NotificationPublishService;
48 import org.opendaylight.controller.md.sal.binding.api.ReadOnlyTransaction;
49 import org.opendaylight.controller.md.sal.binding.api.ReadWriteTransaction;
50 import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
51 import org.opendaylight.genius.alivenessmonitor.protocols.AlivenessProtocolHandler;
52 import org.opendaylight.genius.alivenessmonitor.protocols.AlivenessProtocolHandlerRegistry;
53 import org.opendaylight.genius.alivenessmonitor.utils.AlivenessMonitorUtil;
54 import org.opendaylight.genius.datastoreutils.SingleTransactionDataBroker;
55 import org.opendaylight.genius.infra.ManagedNewTransactionRunner;
56 import org.opendaylight.genius.infra.ManagedNewTransactionRunnerImpl;
57 import org.opendaylight.genius.mdsalutil.packet.Ethernet;
58 import org.opendaylight.genius.mdsalutil.packet.utils.PacketUtil;
59 import org.opendaylight.infrautils.utils.concurrent.Executors;
60 import org.opendaylight.openflowplugin.libraries.liblldp.NetUtils;
61 import org.opendaylight.openflowplugin.libraries.liblldp.Packet;
62 import org.opendaylight.openflowplugin.libraries.liblldp.PacketException;
63 import org.opendaylight.serviceutils.tools.mdsal.listener.AbstractClusteredSyncDataTreeChangeListener;
64 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.alivenessmonitor.rev160411.AlivenessMonitorService;
65 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.alivenessmonitor.rev160411.LivenessState;
66 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.alivenessmonitor.rev160411.MonitorEvent;
67 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.alivenessmonitor.rev160411.MonitorEventBuilder;
68 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.alivenessmonitor.rev160411.MonitorPauseInput;
69 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.alivenessmonitor.rev160411.MonitorPauseOutput;
70 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.alivenessmonitor.rev160411.MonitorProfileCreateInput;
71 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.alivenessmonitor.rev160411.MonitorProfileCreateOutput;
72 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.alivenessmonitor.rev160411.MonitorProfileCreateOutputBuilder;
73 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.alivenessmonitor.rev160411.MonitorProfileDeleteInput;
74 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.alivenessmonitor.rev160411.MonitorProfileDeleteOutput;
75 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.alivenessmonitor.rev160411.MonitorProfileGetInput;
76 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.alivenessmonitor.rev160411.MonitorProfileGetOutput;
77 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.alivenessmonitor.rev160411.MonitorProfileGetOutputBuilder;
78 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.alivenessmonitor.rev160411.MonitorProtocolType;
79 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.alivenessmonitor.rev160411.MonitorStartInput;
80 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.alivenessmonitor.rev160411.MonitorStartOutput;
81 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.alivenessmonitor.rev160411.MonitorStartOutputBuilder;
82 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.alivenessmonitor.rev160411.MonitorStatus;
83 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.alivenessmonitor.rev160411.MonitorStopInput;
84 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.alivenessmonitor.rev160411.MonitorStopOutput;
85 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.alivenessmonitor.rev160411.MonitorUnpauseInput;
86 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.alivenessmonitor.rev160411.MonitorUnpauseOutput;
87 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.alivenessmonitor.rev160411.MonitoringMode;
88 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.alivenessmonitor.rev160411.MonitoringStates;
89 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.alivenessmonitor.rev160411._interface.monitor.map.InterfaceMonitorEntry;
90 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.alivenessmonitor.rev160411._interface.monitor.map.InterfaceMonitorEntryBuilder;
91 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.alivenessmonitor.rev160411._interface.monitor.map.InterfaceMonitorEntryKey;
92 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.alivenessmonitor.rev160411.endpoint.EndpointType;
93 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.alivenessmonitor.rev160411.endpoint.endpoint.type.Interface;
94 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.alivenessmonitor.rev160411.monitor.configs.MonitoringInfo;
95 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.alivenessmonitor.rev160411.monitor.configs.MonitoringInfoBuilder;
96 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.alivenessmonitor.rev160411.monitor.event.EventData;
97 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.alivenessmonitor.rev160411.monitor.event.EventDataBuilder;
98 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.alivenessmonitor.rev160411.monitor.profile.create.input.Profile;
99 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.alivenessmonitor.rev160411.monitor.profiles.MonitorProfile;
100 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.alivenessmonitor.rev160411.monitor.profiles.MonitorProfileBuilder;
101 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.alivenessmonitor.rev160411.monitor.start.input.Config;
102 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.alivenessmonitor.rev160411.monitorid.key.map.MonitoridKeyEntry;
103 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.alivenessmonitor.rev160411.monitorid.key.map.MonitoridKeyEntryBuilder;
104 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.alivenessmonitor.rev160411.monitoring.states.MonitoringState;
105 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.alivenessmonitor.rev160411.monitoring.states.MonitoringStateBuilder;
106 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.idmanager.rev160406.AllocateIdInput;
107 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.idmanager.rev160406.AllocateIdInputBuilder;
108 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.idmanager.rev160406.AllocateIdOutput;
109 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.idmanager.rev160406.CreateIdPoolInput;
110 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.idmanager.rev160406.CreateIdPoolInputBuilder;
111 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.idmanager.rev160406.CreateIdPoolOutput;
112 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.idmanager.rev160406.IdManagerService;
113 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.idmanager.rev160406.ReleaseIdInput;
114 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.idmanager.rev160406.ReleaseIdInputBuilder;
115 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.idmanager.rev160406.ReleaseIdOutput;
116 import org.opendaylight.yang.gen.v1.urn.opendaylight.packet.service.rev130709.PacketInReason;
117 import org.opendaylight.yang.gen.v1.urn.opendaylight.packet.service.rev130709.PacketProcessingListener;
118 import org.opendaylight.yang.gen.v1.urn.opendaylight.packet.service.rev130709.PacketReceived;
119 import org.opendaylight.yang.gen.v1.urn.opendaylight.packet.service.rev130709.SendToController;
120 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
121 import org.opendaylight.yangtools.yang.common.RpcError.ErrorType;
122 import org.opendaylight.yangtools.yang.common.RpcResult;
123 import org.opendaylight.yangtools.yang.common.RpcResultBuilder;
124 import org.slf4j.Logger;
125 import org.slf4j.LoggerFactory;
126
127 @Singleton
128 public class AlivenessMonitor extends AbstractClusteredSyncDataTreeChangeListener<MonitoringState>
129         implements AlivenessMonitorService, PacketProcessingListener, InterfaceStateListener {
130
131     private static final Logger LOG = LoggerFactory.getLogger(AlivenessMonitor.class);
132
133     private static final int THREAD_POOL_SIZE = 4;
134     private static final boolean INTERRUPT_TASK = true;
135     private static final int NO_DELAY = 0;
136     private static final Long INITIAL_COUNT = 0L;
137     private static final boolean CREATE_MISSING_PARENT = true;
138     private static final int INVALID_ID = 0;
139
140     private static class FutureCallbackImpl implements FutureCallback<Object> {
141         private final String message;
142
143         FutureCallbackImpl(String message) {
144             this.message = message;
145         }
146
147         @Override
148         public void onFailure(Throwable error) {
149             LOG.warn("Error in Datastore operation - {}", message, error);
150         }
151
152         @Override
153         public void onSuccess(Object result) {
154             LOG.debug("Success in Datastore operation - {}", message);
155         }
156     }
157
158     private class AlivenessMonitorTask implements Runnable {
159         private final MonitoringInfo monitoringInfo;
160
161         AlivenessMonitorTask(MonitoringInfo monitoringInfo) {
162             this.monitoringInfo = monitoringInfo;
163         }
164
165         @Override
166         public void run() {
167             LOG.trace("send monitor packet - {}", monitoringInfo);
168             sendMonitorPacket(monitoringInfo);
169         }
170     }
171
172     private final DataBroker dataBroker;
173     private final ManagedNewTransactionRunner txRunner;
174     private final IdManagerService idManager;
175     private final NotificationPublishService notificationPublishService;
176     private final AlivenessProtocolHandlerRegistry alivenessProtocolHandlerRegistry;
177     private final ConcurrentMap<Long, ScheduledFuture<?>> monitoringTasks = new ConcurrentHashMap<>();
178     private final ScheduledExecutorService monitorService;
179     private final ExecutorService callbackExecutorService;
180     private final LoadingCache<Long, String> monitorIdKeyCache;
181     private final ConcurrentMap<String, Semaphore> lockMap = new ConcurrentHashMap<>();
182
183     @Inject
184     public AlivenessMonitor(final DataBroker dataBroker, final IdManagerService idManager,
185             final NotificationPublishService notificationPublishService,
186             AlivenessProtocolHandlerRegistry alivenessProtocolHandlerRegistry) {
187         super(dataBroker, LogicalDatastoreType.OPERATIONAL,
188                 InstanceIdentifier.create(MonitoringStates.class).child(MonitoringState.class));
189         this.dataBroker = dataBroker;
190         this.txRunner = new ManagedNewTransactionRunnerImpl(dataBroker);
191         this.idManager = idManager;
192         this.notificationPublishService = notificationPublishService;
193         this.alivenessProtocolHandlerRegistry = alivenessProtocolHandlerRegistry;
194
195         monitorService = Executors.newListeningScheduledThreadPool(THREAD_POOL_SIZE, "Aliveness Monitoring Task", LOG);
196         callbackExecutorService = Executors.newFixedThreadPool(THREAD_POOL_SIZE, "Aliveness Callback Handler", LOG);
197
198         createIdPool();
199         monitorIdKeyCache = CacheBuilder.newBuilder().build(new CacheLoader<Long, String>() {
200             @Override
201             public String load(@Nonnull Long monitorId) {
202                 return SingleTransactionDataBroker.syncReadOptionalAndTreatReadFailedExceptionAsAbsentOptional(
203                         dataBroker, LogicalDatastoreType.OPERATIONAL, getMonitorMapId(monitorId))
204                         .toJavaUtil().map(MonitoridKeyEntry::getMonitorKey).orElse(null);
205             }
206         });
207
208         LOG.info("{} started", getClass().getSimpleName());
209     }
210
211     @Override
212     @PreDestroy
213     public void close() {
214         monitorIdKeyCache.cleanUp();
215         monitorService.shutdown();
216         callbackExecutorService.shutdown();
217         LOG.info("{} close", getClass().getSimpleName());
218     }
219
220     Semaphore getLock(String key) {
221         return lockMap.get(key);
222     }
223
224     private void createIdPool() {
225         CreateIdPoolInput createPool = new CreateIdPoolInputBuilder()
226                 .setPoolName(AlivenessMonitorConstants.MONITOR_IDPOOL_NAME)
227                 .setLow(AlivenessMonitorConstants.MONITOR_IDPOOL_START)
228                 .setHigh(AlivenessMonitorConstants.MONITOR_IDPOOL_SIZE).build();
229         Futures.addCallback(idManager.createIdPool(createPool), new FutureCallback<RpcResult<CreateIdPoolOutput>>() {
230             @Override
231             public void onFailure(Throwable error) {
232                 LOG.error("Failed to create idPool for Aliveness Monitor Service", error);
233             }
234
235             @Override
236             public void onSuccess(@Nonnull RpcResult<CreateIdPoolOutput> result) {
237                 if (result.isSuccessful()) {
238                     LOG.debug("Created IdPool for Aliveness Monitor Service");
239                 } else {
240                     LOG.error("RPC to create Idpool failed {}", result.getErrors());
241                 }
242             }
243         }, callbackExecutorService);
244     }
245
246     private int getUniqueId(final String idKey) {
247         AllocateIdInput getIdInput = new AllocateIdInputBuilder()
248                 .setPoolName(AlivenessMonitorConstants.MONITOR_IDPOOL_NAME).setIdKey(idKey).build();
249
250         Future<RpcResult<AllocateIdOutput>> result = idManager.allocateId(getIdInput);
251
252         try {
253             RpcResult<AllocateIdOutput> rpcResult = result.get();
254             if (rpcResult.isSuccessful()) {
255                 return rpcResult.getResult().getIdValue().intValue();
256             } else {
257                 LOG.warn("RPC Call to Get Unique Id returned with Errors {}", rpcResult.getErrors());
258             }
259         } catch (InterruptedException | ExecutionException e) {
260             LOG.warn("Exception when getting Unique Id for key {}", idKey, e);
261         }
262         return INVALID_ID;
263     }
264
265     private void releaseId(String idKey) {
266         ReleaseIdInput idInput = new ReleaseIdInputBuilder().setPoolName(AlivenessMonitorConstants.MONITOR_IDPOOL_NAME)
267                 .setIdKey(idKey).build();
268         try {
269             ListenableFuture<RpcResult<ReleaseIdOutput>> result = idManager.releaseId(idInput);
270             RpcResult<ReleaseIdOutput> rpcResult = result.get();
271             if (!rpcResult.isSuccessful()) {
272                 LOG.warn("RPC Call to release Id {} returned with Errors {}", idKey, rpcResult.getErrors());
273             }
274         } catch (InterruptedException | ExecutionException e) {
275             LOG.warn("Exception when releasing Id for key {}", idKey, e);
276         }
277     }
278
279     @Override
280     public void onPacketReceived(PacketReceived packetReceived) {
281         Class<? extends PacketInReason> pktInReason = packetReceived.getPacketInReason();
282         if (LOG.isTraceEnabled()) {
283             LOG.trace("Packet Received {}", packetReceived);
284         }
285         if (pktInReason != SendToController.class) {
286             return;
287         }
288         byte[] data = packetReceived.getPayload();
289         Packet protocolPacket = null;
290         AlivenessProtocolHandler<Packet> livenessProtocolHandler = null;
291
292         if (!PacketUtil.isIpv6NaPacket(data)) {
293             Packet packetInFormatted;
294             Ethernet res = new Ethernet();
295             try {
296                 packetInFormatted = res.deserialize(data, 0, data.length * NetUtils.NUM_BITS_IN_A_BYTE);
297             } catch (PacketException e) {
298                 LOG.warn("Failed to decode packet: ", e);
299                 return;
300             }
301
302             if (packetInFormatted == null) {
303                 LOG.warn("Failed to deserialize Received Packet from table {}", packetReceived.getTableId().getValue());
304                 return;
305             }
306
307             protocolPacket = packetInFormatted.getPayload();
308             if (protocolPacket == null) {
309                 LOG.trace("Unsupported packet type. Ignoring the packet...");
310                 return;
311             }
312             if (LOG.isTraceEnabled()) {
313                 LOG.trace("onPacketReceived packet: {}, packet class: {}", packetReceived, protocolPacket.getClass());
314             }
315             livenessProtocolHandler = getAlivenessProtocolHandler(protocolPacket.getClass());
316
317         } else if (PacketUtil.isIpv6NaPacket(data)) {
318             livenessProtocolHandler = getAlivenessProtocolHandler(MonitorProtocolType.Ipv6Nd);
319         }
320         if (livenessProtocolHandler == null) {
321             return;
322         }
323
324         String monitorKey = livenessProtocolHandler.handlePacketIn(protocolPacket, packetReceived);
325         if (monitorKey != null) {
326             processReceivedMonitorKey(monitorKey);
327         } else {
328             LOG.debug("No monitorkey associated with received packet");
329         }
330     }
331
332     @SuppressWarnings("unchecked")
333     private AlivenessProtocolHandler<Packet> getAlivenessProtocolHandler(Class<? extends Packet> protocolHandlerClass) {
334         return (AlivenessProtocolHandler<Packet>) alivenessProtocolHandlerRegistry.getOpt(protocolHandlerClass);
335     }
336
337     @SuppressWarnings("unchecked")
338     private AlivenessProtocolHandler<Packet> getAlivenessProtocolHandler(MonitorProtocolType protocolType) {
339         return (AlivenessProtocolHandler<Packet>) alivenessProtocolHandlerRegistry.get(protocolType);
340     }
341
342     private void processReceivedMonitorKey(final String monitorKey) {
343         class Result {
344             final MonitoringState currentState;
345             final MonitoringState state;
346
347             Result(MonitoringState currentState, MonitoringState state) {
348                 this.currentState = currentState;
349                 this.state = state;
350             }
351         }
352
353         Preconditions.checkNotNull(monitorKey, "Monitor Key required to process the state");
354
355         LOG.debug("Processing monitorKey: {} for received packet", monitorKey);
356
357         final Semaphore lock = lockMap.get(monitorKey);
358         LOG.debug("Acquiring lock for monitor key : {} to process monitor packet", monitorKey);
359         acquireLock(lock);
360
361         txRunner.applyWithNewReadWriteTransactionAndSubmit(OPERATIONAL, tx -> {
362             Optional<MonitoringState> optState = tx.read(getMonitorStateId(monitorKey)).get();
363             if (optState.isPresent()) {
364                 final MonitoringState currentState = optState.get();
365
366                 if (LOG.isTraceEnabled()) {
367                     LOG.trace("OnPacketReceived : Monitoring state from ODS : {} ", currentState);
368                 }
369
370                 // Long responsePendingCount = currentState.getResponsePendingCount();
371                 //
372                 // Need to relook at the pending count logic to support N
373                 // out of M scenarios
374                 // if (currentState.getState() != LivenessState.Up) {
375                 // //Reset responsePendingCount when state changes from DOWN
376                 // to UP
377                 // responsePendingCount = INITIAL_COUNT;
378                 // }
379                 //
380                 // if (responsePendingCount > INITIAL_COUNT) {
381                 // responsePendingCount =
382                 // currentState.getResponsePendingCount() - 1;
383                 // }
384                 Long responsePendingCount = INITIAL_COUNT;
385
386                 final MonitoringState state = new MonitoringStateBuilder().setMonitorKey(monitorKey)
387                         .setState(LivenessState.Up).setResponsePendingCount(responsePendingCount).build();
388                 tx.merge(getMonitorStateId(monitorKey), state);
389
390                 return Optional.of(new Result(currentState, state));
391             } else {
392                 if (LOG.isTraceEnabled()) {
393                     LOG.trace("Monitoring State not available for key: {} to process the Packet received",
394                             monitorKey);
395                 }
396                 return Optional.<Result>absent();
397             }
398         }).addCallback(new FutureCallback<Optional<Result>>() {
399             @Override
400             public void onSuccess(Optional<Result> optResult) {
401                 releaseLock(lock);
402                 optResult.toJavaUtil().ifPresent(result -> {
403                     final boolean stateChanged = result.currentState.getState() == LivenessState.Down
404                             || result.currentState.getState() == LivenessState.Unknown;
405                     if (stateChanged) {
406                         // send notifications
407                         if (LOG.isTraceEnabled()) {
408                             LOG.trace("Sending notification for monitor Id : {} with Current State: {}",
409                                     result.currentState.getMonitorId(), LivenessState.Up);
410                         }
411                         publishNotification(result.currentState.getMonitorId(), LivenessState.Up);
412                     } else {
413                         if (LOG.isTraceEnabled()) {
414                             LOG.trace("Successful in writing monitoring state {} to ODS", result.state);
415                         }
416                     }
417                 });
418             }
419
420             @Override
421             public void onFailure(Throwable error) {
422                 releaseLock(lock);
423                 LOG.warn("Error in reading or writing monitoring state : {} to Datastore", monitorKey, error);
424             }
425         }, callbackExecutorService);
426     }
427
428     private String getUniqueKey(String interfaceName, String protocolType, EndpointType source,
429             EndpointType destination) {
430         StringBuilder builder = new StringBuilder().append(interfaceName).append(AlivenessMonitorConstants.SEPERATOR)
431                 .append(protocolType);
432         if (source != null) {
433             builder.append(AlivenessMonitorConstants.SEPERATOR).append(AlivenessMonitorUtil.getIpAddress(source));
434         }
435
436         if (destination != null) {
437             builder.append(AlivenessMonitorConstants.SEPERATOR).append(AlivenessMonitorUtil.getIpAddress(destination));
438         }
439         return builder.toString();
440     }
441
442     @Override
443     public ListenableFuture<RpcResult<MonitorStartOutput>> monitorStart(MonitorStartInput input) {
444         RpcResultBuilder<MonitorStartOutput> rpcResultBuilder;
445         final Config in = input.getConfig();
446         Long profileId = in.getProfileId();
447         LOG.debug("Monitor Start invoked with Config: {}, Profile Id: {}", in, profileId);
448
449         try {
450             if (in.getMode() != MonitoringMode.OneOne) {
451                 throw new UnsupportedConfigException(
452                         "Unsupported Monitoring mode. Currently one-one mode is supported");
453             }
454
455             Optional<MonitorProfile> optProfile =
456                     SingleTransactionDataBroker.syncReadOptionalAndTreatReadFailedExceptionAsAbsentOptional(
457                             dataBroker, LogicalDatastoreType.OPERATIONAL, getMonitorProfileId(profileId));
458             final MonitorProfile profile;
459             if (!optProfile.isPresent()) {
460                 String errMsg = String.format("No monitoring profile associated with Id: %d", profileId);
461                 LOG.error("Monitor start failed. {}", errMsg);
462                 throw new RuntimeException(errMsg);
463             } else {
464                 profile = optProfile.get();
465             }
466
467             final MonitorProtocolType protocolType = profile.getProtocolType();
468
469             String interfaceName = null;
470             EndpointType srcEndpointType = in.getSource().getEndpointType();
471
472             if (srcEndpointType instanceof Interface) {
473                 Interface endPoint = (Interface) srcEndpointType;
474                 interfaceName = endPoint.getInterfaceName();
475             } else {
476                 throw new UnsupportedConfigException(
477                         "Unsupported source Endpoint type. Only Interface Endpoint currently supported for monitoring");
478             }
479
480             if (Strings.isNullOrEmpty(interfaceName)) {
481                 throw new RuntimeException("Interface Name not defined in the source Endpoint");
482             }
483
484             // Initially the support is for one monitoring per interface.
485             // Revisit the retrieving monitor id logic when the multiple
486             // monitoring for same interface is needed.
487             EndpointType destEndpointType = null;
488             if (in.getDestination() != null) {
489                 destEndpointType = in.getDestination().getEndpointType();
490             }
491             String idKey = getUniqueKey(interfaceName, protocolType.toString(), srcEndpointType, destEndpointType);
492             final long monitorId = getUniqueId(idKey);
493             Optional<MonitoringInfo> optKey =
494                     SingleTransactionDataBroker.syncReadOptionalAndTreatReadFailedExceptionAsAbsentOptional(
495                             dataBroker, LogicalDatastoreType.OPERATIONAL, getMonitoringInfoId(monitorId));
496             final AlivenessProtocolHandler<?> handler;
497             if (optKey.isPresent()) {
498                 String message = String.format(
499                         "Monitoring for the interface %s with this configuration " + "is already registered.",
500                         interfaceName);
501                 LOG.warn("Monitoring for the interface {} with this configuration is already registered.",
502                         interfaceName);
503                 MonitorStartOutput output = new MonitorStartOutputBuilder().setMonitorId(monitorId).build();
504                 rpcResultBuilder = RpcResultBuilder.success(output).withWarning(ErrorType.APPLICATION,
505                         "config-exists", message);
506                 return Futures.immediateFuture(rpcResultBuilder.build());
507             } else {
508                 // Construct the monitor key
509                 final MonitoringInfo monitoringInfo = new MonitoringInfoBuilder().setId(monitorId).setMode(in.getMode())
510                         .setProfileId(profileId).setDestination(in.getDestination()).setSource(in.getSource()).build();
511                 // Construct the initial monitor state
512                 handler = alivenessProtocolHandlerRegistry.get(protocolType);
513                 final String monitoringKey = handler.getUniqueMonitoringKey(monitoringInfo);
514
515                 MonitoringStateBuilder monitoringStateBuilder =
516                         new MonitoringStateBuilder().setMonitorKey(monitoringKey).setMonitorId(monitorId)
517                                 .setState(LivenessState.Unknown).setStatus(MonitorStatus.Started);
518                 if (protocolType != MonitorProtocolType.Bfd) {
519                     monitoringStateBuilder.setRequestCount(INITIAL_COUNT).setResponsePendingCount(INITIAL_COUNT);
520                 }
521                 MonitoringState monitoringState = monitoringStateBuilder.build();
522
523                 txRunner.callWithNewWriteOnlyTransactionAndSubmit(OPERATIONAL, operTx -> {
524                     operTx.put(getMonitoringInfoId(monitorId), monitoringInfo, CREATE_MISSING_PARENT);
525                     LOG.debug("adding oper monitoring info {}", monitoringInfo);
526
527                     operTx.put(getMonitorStateId(monitoringKey), monitoringState, CREATE_MISSING_PARENT);
528                     LOG.debug("adding oper monitoring state {}", monitoringState);
529
530                     MonitoridKeyEntry mapEntry = new MonitoridKeyEntryBuilder().setMonitorId(monitorId)
531                             .setMonitorKey(monitoringKey).build();
532                     operTx.put(getMonitorMapId(monitorId), mapEntry, CREATE_MISSING_PARENT);
533                     LOG.debug("adding oper map entry {}", mapEntry);
534                 }).addCallback(new FutureCallback<Void>() {
535                     @Override
536                     public void onFailure(Throwable error) {
537                         String errorMsg = String.format("Adding Monitoring info: %s in Datastore failed",
538                                 monitoringInfo);
539                         LOG.warn("Adding Monitoring info: {} in Datastore failed", monitoringInfo, error);
540                         throw new RuntimeException(errorMsg, error);
541                     }
542
543                     @Override
544                     public void onSuccess(Void ignored) {
545                         lockMap.put(monitoringKey, new Semaphore(1, true));
546                         if (protocolType == MonitorProtocolType.Bfd) {
547                             handler.startMonitoringTask(monitoringInfo);
548                             return;
549                         }
550                         // Schedule task
551                         LOG.debug("Scheduling monitor task for config: {}", in);
552                         scheduleMonitoringTask(monitoringInfo, profile.getMonitorInterval());
553                     }
554                 }, callbackExecutorService);
555             }
556
557             associateMonitorIdWithInterface(monitorId, interfaceName);
558
559             MonitorStartOutput output = new MonitorStartOutputBuilder().setMonitorId(monitorId).build();
560
561             rpcResultBuilder = RpcResultBuilder.success(output);
562         } catch (UnsupportedConfigException e) {
563             LOG.error("Start Monitoring Failed. ", e);
564             rpcResultBuilder = RpcResultBuilder.<MonitorStartOutput>failed().withError(ErrorType.APPLICATION,
565                     e.getMessage(), e);
566         }
567         return Futures.immediateFuture(rpcResultBuilder.build());
568     }
569
570     private void associateMonitorIdWithInterface(final Long monitorId, final String interfaceName) {
571         LOG.debug("associate monitor Id {} with interface {}", monitorId, interfaceName);
572         final ReadWriteTransaction tx = dataBroker.newReadWriteTransaction();
573         ListenableFuture<Optional<InterfaceMonitorEntry>> readFuture = tx.read(LogicalDatastoreType.OPERATIONAL,
574                 getInterfaceMonitorMapId(interfaceName));
575         ListenableFuture<Void> updateFuture = Futures.transformAsync(readFuture, optEntry -> {
576             if (optEntry.isPresent()) {
577                 InterfaceMonitorEntry entry = optEntry.get();
578                 List<Long> monitorIds1 = new ArrayList<>(nullToEmpty(entry.getMonitorIds()));
579                 monitorIds1.add(monitorId);
580                 InterfaceMonitorEntry newEntry1 = new InterfaceMonitorEntryBuilder()
581                          .withKey(new InterfaceMonitorEntryKey(interfaceName)).setMonitorIds(monitorIds1).build();
582                 tx.merge(LogicalDatastoreType.OPERATIONAL, getInterfaceMonitorMapId(interfaceName), newEntry1);
583             } else {
584                     // Create new monitor entry
585                 LOG.debug("Adding new interface-monitor association for interface {} with id {}", interfaceName,
586                             monitorId);
587                 List<Long> monitorIds2 = new ArrayList<>();
588                 monitorIds2.add(monitorId);
589                 InterfaceMonitorEntry newEntry2 = new InterfaceMonitorEntryBuilder()
590                             .setInterfaceName(interfaceName).setMonitorIds(monitorIds2).build();
591                 tx.put(LogicalDatastoreType.OPERATIONAL, getInterfaceMonitorMapId(interfaceName), newEntry2,
592                             CREATE_MISSING_PARENT);
593             }
594             return tx.submit();
595         }, callbackExecutorService);
596
597         Futures.addCallback(updateFuture, new FutureCallbackImpl(
598                 String.format("Association of monitorId %d with Interface %s", monitorId, interfaceName)),
599                 MoreExecutors.directExecutor());
600     }
601
602     private void scheduleMonitoringTask(MonitoringInfo monitoringInfo, long monitorInterval) {
603         AlivenessMonitorTask monitorTask = new AlivenessMonitorTask(monitoringInfo);
604         ScheduledFuture<?> scheduledFutureResult = monitorService.scheduleAtFixedRate(monitorTask, NO_DELAY,
605                 monitorInterval, TimeUnit.MILLISECONDS);
606         monitoringTasks.put(monitoringInfo.getId(), scheduledFutureResult);
607     }
608
609     @Override
610     public ListenableFuture<RpcResult<MonitorPauseOutput>> monitorPause(MonitorPauseInput input) {
611         LOG.debug("Monitor Pause operation invoked for monitor id: {}", input.getMonitorId());
612         SettableFuture<RpcResult<MonitorPauseOutput>> result = SettableFuture.create();
613         final Long monitorId = input.getMonitorId();
614
615         // Set the monitoring status to Paused
616         updateMonitorStatusTo(monitorId, MonitorStatus.Paused, currentStatus -> currentStatus == MonitorStatus.Started);
617
618         if (stopMonitoringTask(monitorId)) {
619             result.set(RpcResultBuilder.<MonitorPauseOutput>success().build());
620         } else {
621             String errorMsg = String.format("No Monitoring Task availble to pause for the given monitor id : %d",
622                     monitorId);
623             LOG.error("Monitor Pause operation failed- {}", errorMsg);
624             result.set(RpcResultBuilder.<MonitorPauseOutput>failed()
625                     .withError(ErrorType.APPLICATION, errorMsg).build());
626         }
627
628         return result;
629     }
630
631     @Override
632     public ListenableFuture<RpcResult<MonitorUnpauseOutput>> monitorUnpause(MonitorUnpauseInput input) {
633         LOG.debug("Monitor Unpause operation invoked for monitor id: {}", input.getMonitorId());
634         final SettableFuture<RpcResult<MonitorUnpauseOutput>> result = SettableFuture.create();
635
636         final Long monitorId = input.getMonitorId();
637         final ReadOnlyTransaction tx = dataBroker.newReadOnlyTransaction();
638         ListenableFuture<Optional<MonitoringInfo>> readInfoResult = tx.read(LogicalDatastoreType.OPERATIONAL,
639                 getMonitoringInfoId(monitorId));
640
641         Futures.addCallback(readInfoResult, new FutureCallback<Optional<MonitoringInfo>>() {
642
643             @Override
644             public void onFailure(Throwable error) {
645                 tx.close();
646                 String msg = String.format("Unable to read monitoring info associated with monitor id %d", monitorId);
647                 LOG.error("Monitor unpause Failed. {}", msg, error);
648                 result.set(RpcResultBuilder.<MonitorUnpauseOutput>failed()
649                         .withError(ErrorType.APPLICATION, msg, error).build());
650             }
651
652             @Override
653             public void onSuccess(@Nonnull Optional<MonitoringInfo> optInfo) {
654                 if (optInfo.isPresent()) {
655                     final MonitoringInfo info = optInfo.get();
656                     ListenableFuture<Optional<MonitorProfile>> readProfile = tx.read(LogicalDatastoreType.OPERATIONAL,
657                             getMonitorProfileId(info.getProfileId()));
658                     Futures.addCallback(readProfile, new FutureCallback<Optional<MonitorProfile>>() {
659
660                         @Override
661                         public void onFailure(Throwable error) {
662                             tx.close();
663                             String msg = String.format("Unable to read Monitoring profile associated with id %d",
664                                     info.getProfileId());
665                             LOG.warn("Monitor unpause Failed. {}", msg, error);
666                             result.set(RpcResultBuilder.<MonitorUnpauseOutput>failed()
667                                     .withError(ErrorType.APPLICATION, msg, error).build());
668                         }
669
670                         @Override
671                         public void onSuccess(@Nonnull Optional<MonitorProfile> optProfile) {
672                             tx.close();
673                             if (optProfile.isPresent()) {
674                                 updateMonitorStatusTo(monitorId, MonitorStatus.Started,
675                                     currentStatus -> (currentStatus == MonitorStatus.Paused
676                                                 || currentStatus == MonitorStatus.Stopped));
677                                 MonitorProfile profile = optProfile.get();
678                                 LOG.debug("Monitor Resume - Scheduling monitoring task with Id: {}", monitorId);
679                                 MonitorProtocolType protocolType = profile.getProtocolType();
680                                 if (protocolType == MonitorProtocolType.Bfd) {
681                                     LOG.debug("disabling bfd for hwvtep tunnel montior id {}", monitorId);
682                                     ((HwVtepTunnelsStateHandler) alivenessProtocolHandlerRegistry.get(protocolType))
683                                             .resetMonitoringTask(true);
684                                 } else {
685                                     scheduleMonitoringTask(info, profile.getMonitorInterval());
686                                 }
687                                 result.set(RpcResultBuilder.<MonitorUnpauseOutput>success().build());
688                             } else {
689                                 String msg = String.format("Monitoring profile associated with id %d is not present",
690                                         info.getProfileId());
691                                 LOG.warn("Monitor unpause Failed. {}", msg);
692                                 result.set(
693                                         RpcResultBuilder.<MonitorUnpauseOutput>failed()
694                                                 .withError(ErrorType.APPLICATION, msg).build());
695                             }
696                         }
697                     }, callbackExecutorService);
698                 } else {
699                     tx.close();
700                     String msg = String.format("Monitoring info associated with id %d is not present", monitorId);
701                     LOG.warn("Monitor unpause Failed. {}", msg);
702                     result.set(RpcResultBuilder.<MonitorUnpauseOutput>failed()
703                             .withError(ErrorType.APPLICATION, msg).build());
704                 }
705             }
706         }, callbackExecutorService);
707
708         return result;
709     }
710
711     private boolean stopMonitoringTask(Long monitorId) {
712         return stopMonitoringTask(monitorId, INTERRUPT_TASK);
713     }
714
715     private boolean stopMonitoringTask(Long monitorId, boolean interruptTask) {
716         Optional<MonitoringInfo> optInfo =
717                 SingleTransactionDataBroker.syncReadOptionalAndTreatReadFailedExceptionAsAbsentOptional(dataBroker,
718                         LogicalDatastoreType.OPERATIONAL, getMonitoringInfoId(monitorId));
719         if (!optInfo.isPresent()) {
720             LOG.warn("There is no monitoring info present for monitor id {}", monitorId);
721             return false;
722         }
723         MonitoringInfo monitoringInfo = optInfo.get();
724         Optional<MonitorProfile> optProfile =
725                 SingleTransactionDataBroker.syncReadOptionalAndTreatReadFailedExceptionAsAbsentOptional(dataBroker,
726                         LogicalDatastoreType.OPERATIONAL, getMonitorProfileId(monitoringInfo.getProfileId()));
727         MonitorProtocolType protocolType = optProfile.get().getProtocolType();
728         if (protocolType == MonitorProtocolType.Bfd) {
729             LOG.debug("disabling bfd for hwvtep tunnel montior id {}", monitorId);
730             ((HwVtepTunnelsStateHandler) alivenessProtocolHandlerRegistry.get(protocolType))
731                     .resetMonitoringTask(false);
732             return true;
733         }
734         ScheduledFuture<?> scheduledFutureResult = monitoringTasks.remove(monitorId);
735         if (scheduledFutureResult != null) {
736             scheduledFutureResult.cancel(interruptTask);
737             return true;
738         }
739         return false;
740     }
741
742     Optional<MonitorProfile> getMonitorProfile(Long profileId) {
743         return SingleTransactionDataBroker.syncReadOptionalAndTreatReadFailedExceptionAsAbsentOptional(
744                 dataBroker, LogicalDatastoreType.OPERATIONAL, getMonitorProfileId(profileId));
745     }
746
747     void acquireLock(Semaphore lock) {
748         if (lock == null) {
749             return;
750         }
751
752         boolean acquiredLock = false;
753         try {
754             acquiredLock = lock.tryAcquire(50, TimeUnit.MILLISECONDS);
755         } catch (InterruptedException e) {
756             LOG.warn("Thread interrupted when waiting to acquire the lock");
757         }
758
759         if (!acquiredLock) {
760             LOG.warn("Previous transaction did not complete in time. Releasing the lock to proceed");
761             lock.release();
762             try {
763                 lock.acquire();
764                 LOG.trace("Lock acquired successfully");
765             } catch (InterruptedException e) {
766                 LOG.warn("Acquire failed");
767             }
768         } else {
769             LOG.trace("Lock acquired successfully");
770         }
771     }
772
773     void releaseLock(Semaphore lock) {
774         if (lock != null) {
775             lock.release();
776         }
777     }
778
779     private void sendMonitorPacket(final MonitoringInfo monitoringInfo) {
780         // TODO: Handle interrupts
781         final Long monitorId = monitoringInfo.getId();
782         final String monitorKey = monitorIdKeyCache.getUnchecked(monitorId);
783         if (monitorKey == null) {
784             LOG.warn("No monitor Key associated with id {} to send the monitor packet", monitorId);
785             return;
786         } else {
787             LOG.debug("Sending monitoring packet for key: {}", monitorKey);
788         }
789
790         final MonitorProfile profile;
791         Optional<MonitorProfile> optProfile = getMonitorProfile(monitoringInfo.getProfileId());
792         if (optProfile.isPresent()) {
793             profile = optProfile.get();
794         } else {
795             LOG.warn("No monitor profile associated with id {}. " + "Could not send Monitor packet for monitor-id {}",
796                     monitoringInfo.getProfileId(), monitorId);
797             return;
798         }
799
800         final Semaphore lock = lockMap.get(monitorKey);
801         LOG.debug("Acquiring lock for monitor key : {} to send monitor packet", monitorKey);
802         acquireLock(lock);
803
804         final ReadWriteTransaction tx = dataBroker.newReadWriteTransaction();
805         ListenableFuture<Optional<MonitoringState>> readResult = tx.read(LogicalDatastoreType.OPERATIONAL,
806                 getMonitorStateId(monitorKey));
807         ListenableFuture<Void> writeResult = Futures.transformAsync(readResult, optState -> {
808             if (optState.isPresent()) {
809                 MonitoringState state = optState.get();
810
811                 // Increase the request count
812                 Long requestCount = state.getRequestCount() + 1;
813
814                 // Check with the monitor window
815                 LivenessState currentLivenessState = state.getState();
816
817                 // Increase the pending response count
818                 long responsePendingCount = state.getResponsePendingCount();
819                 if (responsePendingCount < profile.getMonitorWindow()) {
820                     responsePendingCount = responsePendingCount + 1;
821                 }
822
823                 // Check with the failure threshold
824                 if (responsePendingCount >= profile.getFailureThreshold()) {
825                     // Change the state to down and notify
826                     if (currentLivenessState != LivenessState.Down) {
827                         LOG.debug("Response pending Count: {}, Failure threshold: {} for monitorId {}",
828                                 responsePendingCount, profile.getFailureThreshold(), state.getMonitorId());
829                         LOG.info("Sending notification for monitor Id : {} with State: {}",
830                                 state.getMonitorId(), LivenessState.Down);
831                         publishNotification(monitorId, LivenessState.Down);
832                         currentLivenessState = LivenessState.Down;
833                         // Reset requestCount when state changes
834                         // from UP to DOWN
835                         requestCount = INITIAL_COUNT;
836                     }
837                 }
838
839                 // Update the ODS with state
840                 MonitoringState updatedState = new MonitoringStateBuilder(/* state */)
841                             .setMonitorKey(state.getMonitorKey()).setRequestCount(requestCount)
842                             .setResponsePendingCount(responsePendingCount).setState(currentLivenessState).build();
843                 tx.merge(LogicalDatastoreType.OPERATIONAL, getMonitorStateId(state.getMonitorKey()),
844                             updatedState);
845                 return tx.submit();
846             } else {
847                 // Close the transaction
848                 tx.submit();
849                 String errorMsg = String.format(
850                         "Monitoring State associated with id %d is not present to send packet out.", monitorId);
851                 return Futures.immediateFailedFuture(new RuntimeException(errorMsg));
852             }
853         }, callbackExecutorService);
854
855         Futures.addCallback(writeResult, new FutureCallback<Void>() {
856             @Override
857             public void onSuccess(Void noarg) {
858                 // invoke packetout on protocol handler
859                 AlivenessProtocolHandler<?> handler =
860                         alivenessProtocolHandlerRegistry.getOpt(profile.getProtocolType());
861                 if (handler != null) {
862                     LOG.debug("Sending monitoring packet {}", monitoringInfo);
863                     handler.startMonitoringTask(monitoringInfo);
864                 }
865                 releaseLock(lock);
866             }
867
868             @Override
869             public void onFailure(Throwable error) {
870                 LOG.warn("Updating monitoring state for key: {} failed. Monitoring packet is not sent", monitorKey,
871                         error);
872                 releaseLock(lock);
873             }
874         }, callbackExecutorService);
875     }
876
877     void publishNotification(final Long monitorId, final LivenessState state) {
878         LOG.debug("Sending notification for id {}  - state {}", monitorId, state);
879         EventData data = new EventDataBuilder().setMonitorId(monitorId).setMonitorState(state).build();
880         MonitorEvent event = new MonitorEventBuilder().setEventData(data).build();
881         final ListenableFuture<?> eventFuture = notificationPublishService.offerNotification(event);
882         Futures.addCallback(eventFuture, new FutureCallback<Object>() {
883             @Override
884             public void onFailure(Throwable error) {
885                 LOG.warn("Error in notifying listeners for id {} - state {}", monitorId, state, error);
886             }
887
888             @Override
889             public void onSuccess(Object arg) {
890                 LOG.trace("Successful in notifying listeners for id {} - state {}", monitorId, state);
891             }
892         }, callbackExecutorService);
893     }
894
895     @Override
896     public ListenableFuture<RpcResult<MonitorProfileCreateOutput>> monitorProfileCreate(
897             final MonitorProfileCreateInput input) {
898         LOG.debug("Monitor Profile Create operation - {}", input.getProfile());
899         final SettableFuture<RpcResult<MonitorProfileCreateOutput>> returnFuture = SettableFuture.create();
900         Profile profile = input.getProfile();
901         final Long failureThreshold = profile.getFailureThreshold();
902         final Long monitorInterval = profile.getMonitorInterval();
903         final Long monitorWindow = profile.getMonitorWindow();
904         final MonitorProtocolType protocolType = profile.getProtocolType();
905         String idKey = getUniqueProfileKey(failureThreshold, monitorInterval, monitorWindow, protocolType);
906         final Long profileId = (long) getUniqueId(idKey);
907
908         final ReadWriteTransaction tx = dataBroker.newReadWriteTransaction();
909         ListenableFuture<Optional<MonitorProfile>> readFuture = tx.read(LogicalDatastoreType.OPERATIONAL,
910                 getMonitorProfileId(profileId));
911         ListenableFuture<RpcResult<MonitorProfileCreateOutput>> resultFuture = Futures.transformAsync(readFuture,
912             optProfile -> {
913                 if (optProfile.isPresent()) {
914                     tx.cancel();
915                     MonitorProfileCreateOutput output = new MonitorProfileCreateOutputBuilder()
916                                 .setProfileId(profileId).build();
917                     String msg = String.format("Monitor profile %s already present for the given input", input);
918                     LOG.warn(msg);
919                     returnFuture.set(RpcResultBuilder.success(output)
920                                 .withWarning(ErrorType.PROTOCOL, "profile-exists", msg).build());
921                 } else {
922                     final MonitorProfile monitorProfile = new MonitorProfileBuilder().setId(profileId)
923                                 .setFailureThreshold(failureThreshold).setMonitorInterval(monitorInterval)
924                                 .setMonitorWindow(monitorWindow).setProtocolType(protocolType).build();
925                     tx.put(LogicalDatastoreType.OPERATIONAL, getMonitorProfileId(profileId), monitorProfile,
926                           CREATE_MISSING_PARENT);
927                     Futures.addCallback(tx.submit(), new FutureCallback<Void>() {
928                             @Override
929                             public void onFailure(Throwable error) {
930                                 String msg = String.format("Error when storing monitorprofile %s in datastore",
931                                         monitorProfile);
932                                 LOG.error("Error when storing monitorprofile {} in datastore", monitorProfile, error);
933                                 returnFuture.set(RpcResultBuilder.<MonitorProfileCreateOutput>failed()
934                                         .withError(ErrorType.APPLICATION, msg, error).build());
935                             }
936
937                             @Override
938                             public void onSuccess(Void noarg) {
939                                 MonitorProfileCreateOutput output = new MonitorProfileCreateOutputBuilder()
940                                         .setProfileId(profileId).build();
941                                 returnFuture.set(RpcResultBuilder.success(output).build());
942                             }
943                         }, callbackExecutorService);
944                 }
945                 return returnFuture;
946             }, callbackExecutorService);
947         Futures.addCallback(resultFuture, new FutureCallback<RpcResult<MonitorProfileCreateOutput>>() {
948             @Override
949             public void onFailure(Throwable error) {
950                 // This would happen when any error happens during reading for
951                 // monitoring profile
952                 String msg = String.format("Error in creating monitorprofile - %s", input);
953                 returnFuture.set(RpcResultBuilder.<MonitorProfileCreateOutput>failed()
954                         .withError(ErrorType.APPLICATION, msg, error).build());
955                 LOG.error("Error in creating monitorprofile - {} ", input, error);
956             }
957
958             @Override
959             public void onSuccess(RpcResult<MonitorProfileCreateOutput> result) {
960                 LOG.debug("Successfully created monitor Profile {} ", input);
961             }
962         }, callbackExecutorService);
963         return returnFuture;
964     }
965
966     @Override
967     public ListenableFuture<RpcResult<MonitorProfileGetOutput>> monitorProfileGet(MonitorProfileGetInput input) {
968         LOG.debug("Monitor Profile Get operation for input profile- {}", input.getProfile());
969         RpcResultBuilder<MonitorProfileGetOutput> rpcResultBuilder;
970         final Long profileId = getExistingProfileId(input);
971
972         MonitorProfileGetOutputBuilder output = new MonitorProfileGetOutputBuilder().setProfileId(profileId);
973         rpcResultBuilder = RpcResultBuilder.success();
974         rpcResultBuilder.withResult(output.build());
975         return Futures.immediateFuture(rpcResultBuilder.build());
976     }
977
978     private Long getExistingProfileId(MonitorProfileGetInput input) {
979         org.opendaylight.yang.gen.v1.urn.opendaylight.genius
980             .alivenessmonitor.rev160411.monitor.profile.get.input.Profile profile = input
981                 .getProfile();
982         final Long failureThreshold = profile.getFailureThreshold();
983         final Long monitorInterval = profile.getMonitorInterval();
984         final Long monitorWindow = profile.getMonitorWindow();
985         final MonitorProtocolType protocolType = profile.getProtocolType();
986         LOG.debug("getExistingProfileId for profile : {}", input.getProfile());
987         String idKey = getUniqueProfileKey(failureThreshold, monitorInterval, monitorWindow, protocolType);
988         LOG.debug("Obtained existing profile ID for profile : {}", input.getProfile());
989         return (long) getUniqueId(idKey);
990     }
991
992     private String getUniqueProfileKey(Long failureThreshold, Long monitorInterval, Long monitorWindow,
993             MonitorProtocolType protocolType) {
994         return String.valueOf(failureThreshold) + AlivenessMonitorConstants.SEPERATOR + monitorInterval
995                 + AlivenessMonitorConstants.SEPERATOR + monitorWindow + AlivenessMonitorConstants.SEPERATOR
996                 + protocolType + AlivenessMonitorConstants.SEPERATOR;
997     }
998
999     @Override
1000     public ListenableFuture<RpcResult<MonitorProfileDeleteOutput>> monitorProfileDelete(
1001             final MonitorProfileDeleteInput input) {
1002         LOG.debug("Monitor Profile delete for Id: {}", input.getProfileId());
1003         final SettableFuture<RpcResult<MonitorProfileDeleteOutput>> result = SettableFuture.create();
1004         final Long profileId = input.getProfileId();
1005         final ReadWriteTransaction tx = dataBroker.newReadWriteTransaction();
1006         ListenableFuture<Optional<MonitorProfile>> readFuture = tx.read(LogicalDatastoreType.OPERATIONAL,
1007                 getMonitorProfileId(profileId));
1008         ListenableFuture<RpcResult<MonitorProfileDeleteOutput>> writeFuture =
1009                 Futures.transformAsync(readFuture, optProfile -> {
1010                     if (optProfile.isPresent()) {
1011                         tx.delete(LogicalDatastoreType.OPERATIONAL, getMonitorProfileId(profileId));
1012                         Futures.addCallback(tx.submit(), new FutureCallback<Void>() {
1013                             @Override
1014                             public void onFailure(Throwable error) {
1015                                 String msg = String.format("Error when removing monitor profile %d from datastore",
1016                                         profileId);
1017                                 LOG.error("Error when removing monitor profile {} from datastore", profileId, error);
1018                                 result.set(RpcResultBuilder.<MonitorProfileDeleteOutput>failed()
1019                                         .withError(ErrorType.APPLICATION, msg, error)
1020                                         .build());
1021                             }
1022
1023                             @Override
1024                             public void onSuccess(Void noarg) {
1025                                 MonitorProfile profile = optProfile.get();
1026                                 String id = getUniqueProfileKey(profile.getFailureThreshold(),
1027                                         profile.getMonitorInterval(), profile.getMonitorWindow(),
1028                                         profile.getProtocolType());
1029                                 releaseId(id);
1030                                 result.set(RpcResultBuilder.<MonitorProfileDeleteOutput>success().build());
1031                             }
1032                         }, callbackExecutorService);
1033                     } else {
1034                         String msg = String.format("Monitor profile with Id: %d does not exist", profileId);
1035                         LOG.info(msg);
1036                         result.set(RpcResultBuilder.<MonitorProfileDeleteOutput>success()
1037                                 .withWarning(ErrorType.PROTOCOL, "invalid-value", msg).build());
1038                     }
1039                     return result;
1040                 }, callbackExecutorService);
1041
1042         Futures.addCallback(writeFuture, new FutureCallback<RpcResult<MonitorProfileDeleteOutput>>() {
1043
1044             @Override
1045             public void onFailure(Throwable error) {
1046                 String msg = String.format("Error when removing monitor profile %d from datastore", profileId);
1047                 LOG.error("Error when removing monitor profile {} from datastore", profileId, error);
1048                 result.set(RpcResultBuilder.<MonitorProfileDeleteOutput>failed()
1049                         .withError(ErrorType.APPLICATION, msg, error).build());
1050             }
1051
1052             @Override
1053             public void onSuccess(RpcResult<MonitorProfileDeleteOutput> noarg) {
1054                 LOG.debug("Successfully removed Monitor Profile {}", profileId);
1055             }
1056         }, callbackExecutorService);
1057         return result;
1058     }
1059
1060     @Override
1061     public ListenableFuture<RpcResult<MonitorStopOutput>> monitorStop(MonitorStopInput input) {
1062         LOG.debug("Monitor Stop operation for monitor id - {}", input.getMonitorId());
1063         SettableFuture<RpcResult<MonitorStopOutput>> result = SettableFuture.create();
1064
1065         final Long monitorId = input.getMonitorId();
1066         Optional<MonitoringInfo> optInfo =
1067                 SingleTransactionDataBroker.syncReadOptionalAndTreatReadFailedExceptionAsAbsentOptional(dataBroker,
1068                         LogicalDatastoreType.OPERATIONAL, getMonitoringInfoId(monitorId));
1069         if (optInfo.isPresent()) {
1070             // Stop the monitoring task
1071             stopMonitoringTask(monitorId);
1072
1073             String monitorKey = monitorIdKeyCache.getUnchecked(monitorId);
1074
1075             // Cleanup the Data store
1076             txRunner.callWithNewWriteOnlyTransactionAndSubmit(OPERATIONAL, tx -> {
1077                 if (monitorKey != null) {
1078                     tx.delete(getMonitorStateId(monitorKey));
1079                     monitorIdKeyCache.invalidate(monitorId);
1080                 }
1081
1082                 tx.delete(getMonitoringInfoId(monitorId));
1083
1084                 //Remove monitorid-key-map
1085                 tx.delete(getMonitorMapId(monitorId));
1086             }).addCallback(new FutureCallbackImpl(String.format("Delete monitor state with Id %d", monitorId)),
1087                     MoreExecutors.directExecutor());
1088
1089             MonitoringInfo info = optInfo.get();
1090             String interfaceName = getInterfaceName(info.getSource().getEndpointType());
1091             if (interfaceName != null) {
1092                 removeMonitorIdFromInterfaceAssociation(monitorId, interfaceName);
1093             }
1094             releaseIdForMonitoringInfo(info);
1095
1096             if (monitorKey != null) {
1097                 lockMap.remove(monitorKey);
1098             }
1099
1100             result.set(RpcResultBuilder.<MonitorStopOutput>success().build());
1101         } else {
1102             String errorMsg = String.format("Do not have monitoring information associated with key %d", monitorId);
1103             LOG.error("Delete monitoring operation Failed - {}", errorMsg);
1104             result.set(RpcResultBuilder.<MonitorStopOutput>failed().withError(ErrorType.APPLICATION, errorMsg).build());
1105         }
1106
1107         return result;
1108     }
1109
1110     private void removeMonitorIdFromInterfaceAssociation(final Long monitorId, final String interfaceName) {
1111         LOG.debug("Remove monitorId {} from Interface association {}", monitorId, interfaceName);
1112         final ReadWriteTransaction tx = dataBroker.newReadWriteTransaction();
1113         ListenableFuture<Optional<InterfaceMonitorEntry>> readFuture = tx.read(LogicalDatastoreType.OPERATIONAL,
1114                 getInterfaceMonitorMapId(interfaceName));
1115         ListenableFuture<Void> updateFuture = Futures.transformAsync(readFuture, optEntry -> {
1116             if (optEntry.isPresent()) {
1117                 InterfaceMonitorEntry entry = optEntry.get();
1118                 List<Long> monitorIds = new ArrayList<>(nullToEmpty(entry.getMonitorIds()));
1119                 monitorIds.remove(monitorId);
1120                 if (monitorIds.isEmpty()) {
1121                     tx.delete(LogicalDatastoreType.OPERATIONAL, getInterfaceMonitorMapId(interfaceName));
1122                 } else {
1123                     InterfaceMonitorEntry newEntry = new InterfaceMonitorEntryBuilder(entry)
1124                             .withKey(new InterfaceMonitorEntryKey(interfaceName)).setMonitorIds(monitorIds).build();
1125                     tx.put(LogicalDatastoreType.OPERATIONAL, getInterfaceMonitorMapId(interfaceName), newEntry,
1126                             CREATE_MISSING_PARENT);
1127                 }
1128                 return tx.submit();
1129             } else {
1130                 LOG.warn("No Interface map entry found {} to remove monitorId {}", interfaceName, monitorId);
1131                 tx.cancel();
1132                 return Futures.immediateFuture(null);
1133             }
1134         }, MoreExecutors.directExecutor());
1135
1136         Futures.addCallback(updateFuture, new FutureCallbackImpl(
1137                 String.format("Dis-association of monitorId %d with Interface %s", monitorId, interfaceName)),
1138                 MoreExecutors.directExecutor());
1139     }
1140
1141     private void releaseIdForMonitoringInfo(MonitoringInfo info) {
1142         Long monitorId = info.getId();
1143         EndpointType source = info.getSource().getEndpointType();
1144         String interfaceName = getInterfaceName(source);
1145         if (!Strings.isNullOrEmpty(interfaceName)) {
1146             Optional<MonitorProfile> optProfile =
1147                     SingleTransactionDataBroker.syncReadOptionalAndTreatReadFailedExceptionAsAbsentOptional(dataBroker,
1148                             LogicalDatastoreType.OPERATIONAL, getMonitorProfileId(info.getProfileId()));
1149             if (optProfile.isPresent()) {
1150                 MonitorProtocolType protocolType = optProfile.get().getProtocolType();
1151                 EndpointType destination = info.getDestination() != null ? info.getDestination().getEndpointType()
1152                         : null;
1153                 String idKey = getUniqueKey(interfaceName, protocolType.toString(), source, destination);
1154                 releaseId(idKey);
1155             } else {
1156                 LOG.warn("Could not release monitorId {}. No profile associated with it", monitorId);
1157             }
1158         }
1159     }
1160
1161     private String getInterfaceName(EndpointType endpoint) {
1162         String interfaceName = null;
1163         if (endpoint instanceof Interface) {
1164             interfaceName = ((Interface) endpoint).getInterfaceName();
1165         }
1166         return interfaceName;
1167     }
1168
1169     private void stopMonitoring(long monitorId) {
1170         updateMonitorStatusTo(monitorId, MonitorStatus.Stopped,
1171             currentStatus -> currentStatus != MonitorStatus.Stopped);
1172         if (!stopMonitoringTask(monitorId)) {
1173             LOG.warn("No monitoring task running to perform cancel operation for monitorId {}", monitorId);
1174         }
1175     }
1176
1177     private void updateMonitorStatusTo(final Long monitorId, final MonitorStatus newStatus,
1178             final Predicate<MonitorStatus> isValidStatus) {
1179         final String monitorKey = monitorIdKeyCache.getUnchecked(monitorId);
1180         if (monitorKey == null) {
1181             LOG.warn("No monitor Key associated with id {} to change the monitor status to {}", monitorId, newStatus);
1182             return;
1183         }
1184         final ReadWriteTransaction tx = dataBroker.newReadWriteTransaction();
1185
1186         ListenableFuture<Optional<MonitoringState>> readResult = tx.read(LogicalDatastoreType.OPERATIONAL,
1187                 getMonitorStateId(monitorKey));
1188
1189         ListenableFuture<Void> writeResult = Futures.transformAsync(readResult, optState -> {
1190             if (optState.isPresent()) {
1191                 MonitoringState state = optState.get();
1192                 if (isValidStatus.apply(state.getStatus())) {
1193                     MonitoringState updatedState = new MonitoringStateBuilder().setMonitorKey(monitorKey)
1194                                 .setStatus(newStatus).build();
1195                     tx.merge(LogicalDatastoreType.OPERATIONAL, getMonitorStateId(monitorKey), updatedState);
1196                 } else {
1197                     LOG.warn("Invalid Monitoring status {}, cannot be updated to {} for monitorId {}",
1198                                 state.getStatus(), newStatus, monitorId);
1199                 }
1200             } else {
1201                 LOG.warn("No associated monitoring state data available to update the status to {} for {}",
1202                            newStatus, monitorId);
1203             }
1204             return tx.submit();
1205         }, MoreExecutors.directExecutor());
1206
1207         Futures.addCallback(writeResult, new FutureCallbackImpl(
1208                 String.format("Monitor status update for %d to %s", monitorId, newStatus.toString())),
1209                 MoreExecutors.directExecutor());
1210     }
1211
1212     private void resumeMonitoring(final long monitorId) {
1213         final ReadOnlyTransaction tx = dataBroker.newReadOnlyTransaction();
1214         ListenableFuture<Optional<MonitoringInfo>> readInfoResult = tx.read(LogicalDatastoreType.OPERATIONAL,
1215                 getMonitoringInfoId(monitorId));
1216
1217         Futures.addCallback(readInfoResult, new FutureCallback<Optional<MonitoringInfo>>() {
1218
1219             @Override
1220             public void onFailure(Throwable error) {
1221                 String msg = String.format("Unable to read monitoring info associated with monitor id %d", monitorId);
1222                 LOG.error("Monitor resume Failed. {}", msg, error);
1223                 tx.close();
1224             }
1225
1226             @Override
1227             public void onSuccess(@Nonnull Optional<MonitoringInfo> optInfo) {
1228                 if (optInfo.isPresent()) {
1229                     final MonitoringInfo info = optInfo.get();
1230                     ListenableFuture<Optional<MonitorProfile>> readProfile = tx.read(LogicalDatastoreType.OPERATIONAL,
1231                             getMonitorProfileId(info.getProfileId()));
1232                     Futures.addCallback(readProfile, new FutureCallback<Optional<MonitorProfile>>() {
1233
1234                         @Override
1235                         public void onFailure(Throwable error) {
1236                             String msg = String.format("Unable to read Monitoring profile associated with id %d",
1237                                     info.getProfileId());
1238                             LOG.warn("Monitor resume Failed. {}", msg, error);
1239                             tx.close();
1240                         }
1241
1242                         @Override
1243                         public void onSuccess(@Nonnull Optional<MonitorProfile> optProfile) {
1244                             tx.close();
1245                             if (optProfile.isPresent()) {
1246                                 updateMonitorStatusTo(monitorId, MonitorStatus.Started,
1247                                     currentStatus -> currentStatus != MonitorStatus.Started);
1248                                 MonitorProfile profile = optProfile.get();
1249                                 LOG.debug("Monitor Resume - Scheduling monitoring task for Id: {}", monitorId);
1250                                 scheduleMonitoringTask(info, profile.getMonitorInterval());
1251                             } else {
1252                                 String msg = String.format("Monitoring profile associated with id %d is not present",
1253                                         info.getProfileId());
1254                                 LOG.warn("Monitor resume Failed. {}", msg);
1255                             }
1256                         }
1257                     }, MoreExecutors.directExecutor());
1258                 } else {
1259                     tx.close();
1260                     String msg = String.format("Monitoring info associated with id %d is not present", monitorId);
1261                     LOG.warn("Monitor resume Failed. {}", msg);
1262                 }
1263             }
1264         }, MoreExecutors.directExecutor());
1265     }
1266
1267     @Override
1268     public void onInterfaceStateUp(String interfaceName) {
1269         List<Long> monitorIds = getMonitorIds(interfaceName);
1270         if (monitorIds.isEmpty()) {
1271             LOG.warn("Could not get monitorId for interface: {}", interfaceName);
1272             return;
1273         }
1274         for (Long monitorId : monitorIds) {
1275             LOG.debug("Resume monitoring on interface: {} with monitorId: {}", interfaceName, monitorId);
1276             resumeMonitoring(monitorId);
1277         }
1278     }
1279
1280     @Override
1281     public void onInterfaceStateDown(String interfaceName) {
1282         List<Long> monitorIds = getMonitorIds(interfaceName);
1283         if (monitorIds.isEmpty()) {
1284             LOG.warn("Could not get monitorIds for interface: {}", interfaceName);
1285             return;
1286         }
1287         for (Long monitorId : monitorIds) {
1288             LOG.debug("Suspend monitoring on interface: {} with monitorId: {}", interfaceName, monitorId);
1289             stopMonitoring(monitorId);
1290         }
1291     }
1292
1293     private List<Long> getMonitorIds(String interfaceName) {
1294         return SingleTransactionDataBroker.syncReadOptionalAndTreatReadFailedExceptionAsAbsentOptional(
1295                 dataBroker, LogicalDatastoreType.OPERATIONAL, getInterfaceMonitorMapId(interfaceName))
1296                 .toJavaUtil().map(InterfaceMonitorEntry::getMonitorIds).orElse(Collections.emptyList());
1297     }
1298
1299     //handle monitor stop
1300     @Override
1301     public void remove(@Nonnull InstanceIdentifier<MonitoringState> instanceIdentifier,
1302                        @Nonnull MonitoringState removedDataObject) {
1303         final Long monitorId = removedDataObject.getMonitorId();
1304         LOG.debug("Monitor State remove listener invoked for monitor id: {}", monitorId);
1305
1306         if (removedDataObject.getStatus() != MonitorStatus.Paused) {
1307             ScheduledFuture<?> scheduledFutureResult = monitoringTasks.get(monitorId);
1308             if (scheduledFutureResult != null) {
1309                 LOG.debug("Stopping the task for Monitor id: {}", monitorId);
1310                 stopMonitoringTask(monitorId);
1311             }
1312         }
1313     }
1314
1315     //handle monitor pause
1316     @Override
1317     public void update(@Nonnull InstanceIdentifier<MonitoringState> instanceIdentifier,
1318                        @Nonnull MonitoringState originalDataObject,
1319                        @Nonnull MonitoringState updatedDataObject) {
1320         final Long monitorId = updatedDataObject.getMonitorId();
1321         LOG.debug("Monitor State update listener invoked for monitor id: {}", monitorId);
1322
1323         if (updatedDataObject.getStatus() == MonitorStatus.Paused
1324                 && originalDataObject.getStatus() != MonitorStatus.Paused) {
1325             ScheduledFuture<?> scheduledFutureResult = monitoringTasks.get(monitorId);
1326             if (scheduledFutureResult != null) {
1327                 LOG.debug("Stopping the task for Monitor id: {}", monitorId);
1328                 stopMonitoringTask(monitorId);
1329             }
1330         }
1331     }
1332 }