e838dde355748117863b725d4801ad10d147b5e0
[genius.git] / alivenessmonitor / alivenessmonitor-impl / src / main / java / org / opendaylight / genius / alivenessmonitor / internal / AlivenessMonitor.java
1 /*
2  * Copyright (c) 2016 Ericsson India Global Services Pvt Ltd. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.genius.alivenessmonitor.internal;
9
10 import static org.opendaylight.genius.alivenessmonitor.utils.AlivenessMonitorUtil.getInterfaceMonitorMapId;
11 import static org.opendaylight.genius.alivenessmonitor.utils.AlivenessMonitorUtil.getMonitorMapId;
12 import static org.opendaylight.genius.alivenessmonitor.utils.AlivenessMonitorUtil.getMonitorProfileId;
13 import static org.opendaylight.genius.alivenessmonitor.utils.AlivenessMonitorUtil.getMonitorStateId;
14 import static org.opendaylight.genius.alivenessmonitor.utils.AlivenessMonitorUtil.getMonitoringInfoId;
15 import static org.opendaylight.mdsal.binding.util.Datastore.OPERATIONAL;
16
17 import com.google.common.base.Preconditions;
18 import com.google.common.base.Predicate;
19 import com.google.common.base.Strings;
20 import com.google.common.cache.CacheBuilder;
21 import com.google.common.cache.CacheLoader;
22 import com.google.common.cache.LoadingCache;
23 import com.google.common.util.concurrent.FluentFuture;
24 import com.google.common.util.concurrent.FutureCallback;
25 import com.google.common.util.concurrent.Futures;
26 import com.google.common.util.concurrent.ListenableFuture;
27 import com.google.common.util.concurrent.MoreExecutors;
28 import com.google.common.util.concurrent.SettableFuture;
29 import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
30 import java.util.ArrayList;
31 import java.util.Collections;
32 import java.util.List;
33 import java.util.Optional;
34 import java.util.concurrent.ConcurrentHashMap;
35 import java.util.concurrent.ConcurrentMap;
36 import java.util.concurrent.ExecutionException;
37 import java.util.concurrent.ExecutorService;
38 import java.util.concurrent.Future;
39 import java.util.concurrent.ScheduledExecutorService;
40 import java.util.concurrent.ScheduledFuture;
41 import java.util.concurrent.Semaphore;
42 import java.util.concurrent.TimeUnit;
43 import javax.annotation.PreDestroy;
44 import javax.inject.Inject;
45 import javax.inject.Singleton;
46 import org.apache.aries.blueprint.annotation.service.Reference;
47 import org.eclipse.jdt.annotation.NonNull;
48 import org.opendaylight.genius.alivenessmonitor.protocols.AlivenessProtocolHandler;
49 import org.opendaylight.genius.alivenessmonitor.protocols.AlivenessProtocolHandlerRegistry;
50 import org.opendaylight.genius.alivenessmonitor.utils.AlivenessMonitorUtil;
51 import org.opendaylight.genius.mdsalutil.packet.Ethernet;
52 import org.opendaylight.genius.mdsalutil.packet.utils.PacketUtil;
53 import org.opendaylight.infrautils.utils.concurrent.Executors;
54 import org.opendaylight.mdsal.binding.api.DataBroker;
55 import org.opendaylight.mdsal.binding.api.NotificationPublishService;
56 import org.opendaylight.mdsal.binding.api.ReadTransaction;
57 import org.opendaylight.mdsal.binding.api.ReadWriteTransaction;
58 import org.opendaylight.mdsal.binding.util.Datastore.Operational;
59 import org.opendaylight.mdsal.binding.util.ManagedNewTransactionRunner;
60 import org.opendaylight.mdsal.binding.util.ManagedNewTransactionRunnerImpl;
61 import org.opendaylight.mdsal.common.api.CommitInfo;
62 import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
63 import org.opendaylight.openflowplugin.libraries.liblldp.Packet;
64 import org.opendaylight.openflowplugin.libraries.liblldp.PacketException;
65 import org.opendaylight.serviceutils.tools.listener.AbstractClusteredSyncDataTreeChangeListener;
66 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.alivenessmonitor.rev160411.AlivenessMonitorService;
67 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.alivenessmonitor.rev160411.LivenessState;
68 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.alivenessmonitor.rev160411.MonitorEvent;
69 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.alivenessmonitor.rev160411.MonitorEventBuilder;
70 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.alivenessmonitor.rev160411.MonitorPauseInput;
71 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.alivenessmonitor.rev160411.MonitorPauseOutput;
72 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.alivenessmonitor.rev160411.MonitorProfileCreateInput;
73 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.alivenessmonitor.rev160411.MonitorProfileCreateOutput;
74 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.alivenessmonitor.rev160411.MonitorProfileCreateOutputBuilder;
75 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.alivenessmonitor.rev160411.MonitorProfileDeleteInput;
76 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.alivenessmonitor.rev160411.MonitorProfileDeleteOutput;
77 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.alivenessmonitor.rev160411.MonitorProfileGetInput;
78 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.alivenessmonitor.rev160411.MonitorProfileGetOutput;
79 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.alivenessmonitor.rev160411.MonitorProfileGetOutputBuilder;
80 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.alivenessmonitor.rev160411.MonitorProtocolType;
81 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.alivenessmonitor.rev160411.MonitorStartInput;
82 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.alivenessmonitor.rev160411.MonitorStartOutput;
83 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.alivenessmonitor.rev160411.MonitorStartOutputBuilder;
84 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.alivenessmonitor.rev160411.MonitorStatus;
85 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.alivenessmonitor.rev160411.MonitorStopInput;
86 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.alivenessmonitor.rev160411.MonitorStopOutput;
87 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.alivenessmonitor.rev160411.MonitorUnpauseInput;
88 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.alivenessmonitor.rev160411.MonitorUnpauseOutput;
89 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.alivenessmonitor.rev160411.MonitoringMode;
90 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.alivenessmonitor.rev160411.MonitoringStates;
91 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.alivenessmonitor.rev160411._interface.monitor.map.InterfaceMonitorEntry;
92 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.alivenessmonitor.rev160411._interface.monitor.map.InterfaceMonitorEntryBuilder;
93 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.alivenessmonitor.rev160411._interface.monitor.map.InterfaceMonitorEntryKey;
94 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.alivenessmonitor.rev160411.endpoint.EndpointType;
95 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.alivenessmonitor.rev160411.endpoint.endpoint.type.Interface;
96 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.alivenessmonitor.rev160411.monitor.configs.MonitoringInfo;
97 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.alivenessmonitor.rev160411.monitor.configs.MonitoringInfoBuilder;
98 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.alivenessmonitor.rev160411.monitor.event.EventData;
99 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.alivenessmonitor.rev160411.monitor.event.EventDataBuilder;
100 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.alivenessmonitor.rev160411.monitor.profile.create.input.Profile;
101 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.alivenessmonitor.rev160411.monitor.profiles.MonitorProfile;
102 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.alivenessmonitor.rev160411.monitor.profiles.MonitorProfileBuilder;
103 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.alivenessmonitor.rev160411.monitor.start.input.Config;
104 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.alivenessmonitor.rev160411.monitorid.key.map.MonitoridKeyEntry;
105 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.alivenessmonitor.rev160411.monitorid.key.map.MonitoridKeyEntryBuilder;
106 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.alivenessmonitor.rev160411.monitoring.states.MonitoringState;
107 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.alivenessmonitor.rev160411.monitoring.states.MonitoringStateBuilder;
108 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.idmanager.rev160406.AllocateIdInput;
109 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.idmanager.rev160406.AllocateIdInputBuilder;
110 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.idmanager.rev160406.AllocateIdOutput;
111 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.idmanager.rev160406.CreateIdPoolInput;
112 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.idmanager.rev160406.CreateIdPoolInputBuilder;
113 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.idmanager.rev160406.CreateIdPoolOutput;
114 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.idmanager.rev160406.IdManagerService;
115 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.idmanager.rev160406.ReleaseIdInput;
116 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.idmanager.rev160406.ReleaseIdInputBuilder;
117 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.idmanager.rev160406.ReleaseIdOutput;
118 import org.opendaylight.yang.gen.v1.urn.opendaylight.packet.service.rev130709.PacketInReason;
119 import org.opendaylight.yang.gen.v1.urn.opendaylight.packet.service.rev130709.PacketProcessingListener;
120 import org.opendaylight.yang.gen.v1.urn.opendaylight.packet.service.rev130709.PacketReceived;
121 import org.opendaylight.yang.gen.v1.urn.opendaylight.packet.service.rev130709.SendToController;
122 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
123 import org.opendaylight.yangtools.yang.common.RpcError.ErrorType;
124 import org.opendaylight.yangtools.yang.common.RpcResult;
125 import org.opendaylight.yangtools.yang.common.RpcResultBuilder;
126 import org.opendaylight.yangtools.yang.common.Uint32;
127 import org.slf4j.Logger;
128 import org.slf4j.LoggerFactory;
129
130 @Singleton
131 public class AlivenessMonitor extends AbstractClusteredSyncDataTreeChangeListener<MonitoringState>
132         implements AlivenessMonitorService, PacketProcessingListener, InterfaceStateListener {
133
    // Class-wide SLF4J logger; also handed to the executor factories in the constructor.
    private static final Logger LOG = LoggerFactory.getLogger(AlivenessMonitor.class);

    // Thread count used for both the monitoring scheduler and the callback executor.
    private static final int THREAD_POOL_SIZE = 4;
    // NOTE(review): not referenced in the portion of the file reviewed here; presumably the
    // mayInterruptIfRunning flag used when cancelling a scheduled monitoring task -- confirm at the usage site.
    private static final boolean INTERRUPT_TASK = true;
    // NOTE(review): not referenced in the portion of the file reviewed here; presumably the initial
    // delay used when scheduling a monitoring task -- confirm at the usage site.
    private static final int NO_DELAY = 0;
    // Starting value for the request count and response-pending count of a monitoring state.
    private static final Long INITIAL_COUNT = 0L;
    // Flag passed as the last argument of the write transaction's put() calls when storing monitoring data.
    private static final boolean CREATE_MISSING_PARENT = true;
    // Value returned by getUniqueId() when no id could be allocated.
    private static final int INVALID_ID = 0;
142
143     private static class FutureCallbackImpl implements FutureCallback<Object> {
144         private final String message;
145
146         FutureCallbackImpl(String message) {
147             this.message = message;
148         }
149
150         @Override
151         public void onFailure(Throwable error) {
152             LOG.warn("Error in Datastore operation - {}", message, error);
153         }
154
155         @Override
156         public void onSuccess(Object result) {
157             LOG.debug("Success in Datastore operation - {}", message);
158         }
159     }
160
161     private class AlivenessMonitorTask implements Runnable {
162         private final MonitoringInfo monitoringInfo;
163
164         AlivenessMonitorTask(MonitoringInfo monitoringInfo) {
165             this.monitoringInfo = monitoringInfo;
166         }
167
168         @Override
169         public void run() {
170             LOG.trace("send monitor packet - {}", monitoringInfo);
171             sendMonitorPacket(monitoringInfo);
172         }
173     }
174
    // Injected data broker; also passed to the base listener and wrapped by txRunner.
    private final DataBroker dataBroker;
    // Runs managed datastore transactions on top of dataBroker.
    private final ManagedNewTransactionRunner txRunner;
    // Id manager RPC service used to create the monitor id pool and to allocate/release ids.
    private final IdManagerService idManager;
    // Injected notification publisher.
    // NOTE(review): its usage is outside the portion reviewed here; presumably publishes MonitorEvent -- confirm.
    private final NotificationPublishService notificationPublishService;
    // Registry used to look up the protocol handler by packet class or by monitor protocol type.
    private final AlivenessProtocolHandlerRegistry alivenessProtocolHandlerRegistry;
    // Scheduled task handles keyed by a Uint32.
    // NOTE(review): presumably the key is the monitor id -- population happens outside the portion reviewed.
    private final ConcurrentMap<Uint32, ScheduledFuture<?>> monitoringTasks = new ConcurrentHashMap<>();
    // Scheduled thread pool ("Aliveness Monitoring Task") created in the constructor.
    private final ScheduledExecutorService monitorService;
    // Executor ("Aliveness Callback Handler") on which future callbacks registered by this class run.
    private final ExecutorService callbackExecutorService;
    // Cache from monitor id to monitor key; entries are loaded from the operational datastore.
    private final LoadingCache<Uint32, String> monitorIdKeyCache;
    // Per-monitoring-key semaphores; monitorStart registers a fair, single-permit semaphore per key.
    private final ConcurrentMap<String, Semaphore> lockMap = new ConcurrentHashMap<>();
185
186     @Inject
187     public AlivenessMonitor(@Reference final DataBroker dataBroker, final IdManagerService idManager,
188                             @Reference final NotificationPublishService notificationPublishService,
189                             AlivenessProtocolHandlerRegistry alivenessProtocolHandlerRegistry) {
190         super(dataBroker, LogicalDatastoreType.OPERATIONAL,
191                 InstanceIdentifier.create(MonitoringStates.class).child(MonitoringState.class));
192         this.dataBroker = dataBroker;
193         this.txRunner = new ManagedNewTransactionRunnerImpl(dataBroker);
194         this.idManager = idManager;
195         this.notificationPublishService = notificationPublishService;
196         this.alivenessProtocolHandlerRegistry = alivenessProtocolHandlerRegistry;
197
198         monitorService = Executors.newListeningScheduledThreadPool(THREAD_POOL_SIZE, "Aliveness Monitoring Task", LOG);
199         callbackExecutorService = Executors.newFixedThreadPool(THREAD_POOL_SIZE, "Aliveness Callback Handler", LOG);
200
201         createIdPool();
202         monitorIdKeyCache = CacheBuilder.newBuilder().build(new CacheLoader<Uint32, String>() {
203             @Override
204             public String load(Uint32 monitorId) {
205                 try {
206                     return txRunner.<Operational, ExecutionException, Optional<MonitoridKeyEntry>>
207                         applyInterruptiblyWithNewReadOnlyTransactionAndClose(OPERATIONAL,
208                             tx -> tx.read(getMonitorMapId(monitorId)).get())
209                                         .map(MonitoridKeyEntry::getMonitorKey)
210                         .orElse(null);
211                 } catch (InterruptedException | ExecutionException e) {
212                     LOG.error("Error reading monitor {}", monitorId, e);
213                     return null;
214                 }
215             }
216         });
217
218         LOG.info("{} started", getClass().getSimpleName());
219     }
220
221     @Override
222     @PreDestroy
223     public void close() {
224         monitorIdKeyCache.cleanUp();
225         monitorService.shutdown();
226         callbackExecutorService.shutdown();
227         LOG.info("{} close", getClass().getSimpleName());
228     }
229
230     Semaphore getLock(String key) {
231         return lockMap.get(key);
232     }
233
234     private void createIdPool() {
235         CreateIdPoolInput createPool = new CreateIdPoolInputBuilder()
236                 .setPoolName(AlivenessMonitorConstants.MONITOR_IDPOOL_NAME)
237                 .setLow(AlivenessMonitorConstants.MONITOR_IDPOOL_START)
238                 .setHigh(AlivenessMonitorConstants.MONITOR_IDPOOL_SIZE).build();
239         Futures.addCallback(idManager.createIdPool(createPool), new FutureCallback<RpcResult<CreateIdPoolOutput>>() {
240             @Override
241             public void onFailure(Throwable error) {
242                 LOG.error("Failed to create idPool for Aliveness Monitor Service", error);
243             }
244
245             @Override
246             public void onSuccess(@NonNull RpcResult<CreateIdPoolOutput> result) {
247                 if (result.isSuccessful()) {
248                     LOG.debug("Created IdPool for Aliveness Monitor Service");
249                 } else {
250                     LOG.error("RPC to create Idpool failed {}", result.getErrors());
251                 }
252             }
253         }, callbackExecutorService);
254     }
255
256     private int getUniqueId(final String idKey) {
257         AllocateIdInput getIdInput = new AllocateIdInputBuilder()
258                 .setPoolName(AlivenessMonitorConstants.MONITOR_IDPOOL_NAME).setIdKey(idKey).build();
259
260         Future<RpcResult<AllocateIdOutput>> result = idManager.allocateId(getIdInput);
261
262         try {
263             RpcResult<AllocateIdOutput> rpcResult = result.get();
264             if (rpcResult.isSuccessful()) {
265                 return rpcResult.getResult().getIdValue().intValue();
266             } else {
267                 LOG.warn("RPC Call to Get Unique Id returned with Errors {}", rpcResult.getErrors());
268             }
269         } catch (InterruptedException | ExecutionException e) {
270             LOG.warn("Exception when getting Unique Id for key {}", idKey, e);
271         }
272         return INVALID_ID;
273     }
274
275     private void releaseId(String idKey) {
276         ReleaseIdInput idInput = new ReleaseIdInputBuilder().setPoolName(AlivenessMonitorConstants.MONITOR_IDPOOL_NAME)
277                 .setIdKey(idKey).build();
278         try {
279             ListenableFuture<RpcResult<ReleaseIdOutput>> result = idManager.releaseId(idInput);
280             RpcResult<ReleaseIdOutput> rpcResult = result.get();
281             if (!rpcResult.isSuccessful()) {
282                 LOG.warn("RPC Call to release Id {} returned with Errors {}", idKey, rpcResult.getErrors());
283             }
284         } catch (InterruptedException | ExecutionException e) {
285             LOG.warn("Exception when releasing Id for key {}", idKey, e);
286         }
287     }
288
289     @Override
290     public void onPacketReceived(PacketReceived packetReceived) {
291         Class<? extends PacketInReason> pktInReason = packetReceived.getPacketInReason();
292         if (LOG.isTraceEnabled()) {
293             LOG.trace("Packet Received {}", packetReceived);
294         }
295         if (pktInReason != SendToController.class) {
296             return;
297         }
298         byte[] data = packetReceived.getPayload();
299         Packet protocolPacket = null;
300         AlivenessProtocolHandler<Packet> livenessProtocolHandler = null;
301
302         if (!PacketUtil.isIpv6NaPacket(data)) {
303             Packet packetInFormatted;
304             Ethernet res = new Ethernet();
305             try {
306                 packetInFormatted = res.deserialize(data, 0, data.length * Byte.SIZE);
307             } catch (PacketException e) {
308                 LOG.warn("Failed to decode packet: ", e);
309                 return;
310             }
311
312             if (packetInFormatted == null) {
313                 LOG.warn("Failed to deserialize Received Packet from table {}", packetReceived.getTableId().getValue());
314                 return;
315             }
316
317             protocolPacket = packetInFormatted.getPayload();
318             if (protocolPacket == null) {
319                 LOG.trace("Unsupported packet type. Ignoring the packet...");
320                 return;
321             }
322             if (LOG.isTraceEnabled()) {
323                 LOG.trace("onPacketReceived packet: {}, packet class: {}", packetReceived, protocolPacket.getClass());
324             }
325             livenessProtocolHandler = getAlivenessProtocolHandler(protocolPacket.getClass());
326
327         } else if (PacketUtil.isIpv6NaPacket(data)) {
328             livenessProtocolHandler = getAlivenessProtocolHandler(MonitorProtocolType.Ipv6Nd);
329         }
330         if (livenessProtocolHandler == null) {
331             return;
332         }
333
334         String monitorKey = livenessProtocolHandler.handlePacketIn(protocolPacket, packetReceived);
335         if (monitorKey != null) {
336             processReceivedMonitorKey(monitorKey);
337         } else {
338             LOG.debug("No monitorkey associated with received packet");
339         }
340     }
341
342     @SuppressWarnings("unchecked")
343     private AlivenessProtocolHandler<Packet> getAlivenessProtocolHandler(Class<? extends Packet> protocolHandlerClass) {
344         return (AlivenessProtocolHandler<Packet>) alivenessProtocolHandlerRegistry.getOpt(protocolHandlerClass);
345     }
346
347     @SuppressWarnings("unchecked")
348     private AlivenessProtocolHandler<Packet> getAlivenessProtocolHandler(MonitorProtocolType protocolType) {
349         return (AlivenessProtocolHandler<Packet>) alivenessProtocolHandlerRegistry.get(protocolType);
350     }
351
352     private void processReceivedMonitorKey(final String monitorKey) {
353         class Result {
354             final MonitoringState currentState;
355             final MonitoringState state;
356
357             Result(MonitoringState currentState, MonitoringState state) {
358                 this.currentState = currentState;
359                 this.state = state;
360             }
361         }
362
363         Preconditions.checkNotNull(monitorKey, "Monitor Key required to process the state");
364
365         LOG.debug("Processing monitorKey: {} for received packet", monitorKey);
366
367         final Semaphore lock = lockMap.get(monitorKey);
368         LOG.debug("Acquiring lock for monitor key : {} to process monitor packet", monitorKey);
369         acquireLock(lock);
370
371         txRunner.applyWithNewReadWriteTransactionAndSubmit(OPERATIONAL, tx -> {
372             Optional<MonitoringState> optState = tx.read(getMonitorStateId(monitorKey)).get();
373             if (optState.isPresent()) {
374                 final MonitoringState currentState = optState.get();
375
376                 if (LOG.isTraceEnabled()) {
377                     LOG.trace("OnPacketReceived : Monitoring state from ODS : {} ", currentState);
378                 }
379
380                 // Long responsePendingCount = currentState.getResponsePendingCount();
381                 //
382                 // Need to relook at the pending count logic to support N
383                 // out of M scenarios
384                 // if (currentState.getState() != LivenessState.Up) {
385                 // //Reset responsePendingCount when state changes from DOWN
386                 // to UP
387                 // responsePendingCount = INITIAL_COUNT;
388                 // }
389                 //
390                 // if (responsePendingCount > INITIAL_COUNT) {
391                 // responsePendingCount =
392                 // currentState.getResponsePendingCount() - 1;
393                 // }
394                 Long responsePendingCount = INITIAL_COUNT;
395
396                 final MonitoringState state = new MonitoringStateBuilder().setMonitorKey(monitorKey)
397                         .setState(LivenessState.Up).setResponsePendingCount(responsePendingCount).build();
398                 tx.merge(getMonitorStateId(monitorKey), state);
399
400                 return Optional.of(new Result(currentState, state));
401             } else {
402                 if (LOG.isTraceEnabled()) {
403                     LOG.trace("Monitoring State not available for key: {} to process the Packet received",
404                             monitorKey);
405                 }
406                 return Optional.<Result>empty();
407             }
408         }).addCallback(new FutureCallback<Optional<Result>>() {
409             @Override
410             public void onSuccess(Optional<Result> optResult) {
411                 releaseLock(lock);
412                 optResult.ifPresent(result -> {
413                     final boolean stateChanged = result.currentState.getState() == LivenessState.Down
414                             || result.currentState.getState() == LivenessState.Unknown;
415                     if (stateChanged) {
416                         // send notifications
417                         if (LOG.isTraceEnabled()) {
418                             LOG.trace("Sending notification for monitor Id : {} with Current State: {}",
419                                     result.currentState.getMonitorId(), LivenessState.Up);
420                         }
421                         publishNotification(result.currentState.getMonitorId(), LivenessState.Up);
422                     } else {
423                         if (LOG.isTraceEnabled()) {
424                             LOG.trace("Successful in writing monitoring state {} to ODS", result.state);
425                         }
426                     }
427                 });
428             }
429
430             @Override
431             public void onFailure(Throwable error) {
432                 releaseLock(lock);
433                 LOG.warn("Error in reading or writing monitoring state : {} to Datastore", monitorKey, error);
434             }
435         }, callbackExecutorService);
436     }
437
438     private String getUniqueKey(String interfaceName, String protocolType, EndpointType source,
439             EndpointType destination) {
440         StringBuilder builder = new StringBuilder().append(interfaceName).append(AlivenessMonitorConstants.SEPERATOR)
441                 .append(protocolType);
442         if (source != null) {
443             builder.append(AlivenessMonitorConstants.SEPERATOR).append(AlivenessMonitorUtil.getIpAddress(source));
444         }
445
446         if (destination != null) {
447             builder.append(AlivenessMonitorConstants.SEPERATOR).append(AlivenessMonitorUtil.getIpAddress(destination));
448         }
449         return builder.toString();
450     }
451
452     @Override
453     public ListenableFuture<RpcResult<MonitorStartOutput>> monitorStart(MonitorStartInput input) {
454         RpcResultBuilder<MonitorStartOutput> rpcResultBuilder;
455         final Config in = input.getConfig();
456         Uint32 profileId = in.getProfileId();
457         LOG.debug("Monitor Start invoked with Config: {}, Profile Id: {}", in, profileId);
458
459         try {
460             if (in.getMode() != MonitoringMode.OneOne) {
461                 throw new UnsupportedConfigException(
462                         "Unsupported Monitoring mode. Currently one-one mode is supported");
463             }
464
465             Optional<MonitorProfile> optProfile =
466                 txRunner.<Operational, ExecutionException, Optional<MonitorProfile>>
467                     applyInterruptiblyWithNewReadOnlyTransactionAndClose(OPERATIONAL,
468                         tx -> tx.read(getMonitorProfileId(profileId)).get());
469             final MonitorProfile profile;
470             if (!optProfile.isPresent()) {
471                 String errMsg = "No monitoring profile associated with Id: " + profileId;
472                 LOG.error("Monitor start failed. {}", errMsg);
473                 throw new RuntimeException(errMsg);
474             } else {
475                 profile = optProfile.get();
476             }
477
478             final MonitorProtocolType protocolType = profile.getProtocolType();
479
480             String interfaceName = null;
481             EndpointType srcEndpointType = in.getSource().getEndpointType();
482
483             if (srcEndpointType instanceof Interface) {
484                 Interface endPoint = (Interface) srcEndpointType;
485                 interfaceName = endPoint.getInterfaceName();
486             } else {
487                 throw new UnsupportedConfigException(
488                         "Unsupported source Endpoint type. Only Interface Endpoint currently supported for monitoring");
489             }
490
491             if (Strings.isNullOrEmpty(interfaceName)) {
492                 throw new RuntimeException("Interface Name not defined in the source Endpoint");
493             }
494
495             // Initially the support is for one monitoring per interface.
496             // Revisit the retrieving monitor id logic when the multiple
497             // monitoring for same interface is needed.
498             EndpointType destEndpointType = null;
499             if (in.getDestination() != null) {
500                 destEndpointType = in.getDestination().getEndpointType();
501             }
502             String idKey = getUniqueKey(interfaceName, protocolType.toString(), srcEndpointType, destEndpointType);
503             final Uint32 monitorId = Uint32.valueOf(getUniqueId(idKey));
504             Optional<MonitoringInfo> optKey =
505                 txRunner.<Operational, ExecutionException, Optional<MonitoringInfo>>
506                     applyInterruptiblyWithNewReadOnlyTransactionAndClose(OPERATIONAL,
507                         tx -> tx.read(getMonitoringInfoId(monitorId)).get());
508             final AlivenessProtocolHandler<?> handler;
509             if (optKey.isPresent()) {
510                 String message = String.format(
511                         "Monitoring for the interface %s with this configuration " + "is already registered.",
512                         interfaceName);
513                 LOG.warn("Monitoring for the interface {} with this configuration is already registered.",
514                         interfaceName);
515                 MonitorStartOutput output = new MonitorStartOutputBuilder().setMonitorId(monitorId).build();
516                 rpcResultBuilder = RpcResultBuilder.success(output).withWarning(ErrorType.APPLICATION,
517                         "config-exists", message);
518                 return Futures.immediateFuture(rpcResultBuilder.build());
519             } else {
520                 // Construct the monitor key
521                 final MonitoringInfo monitoringInfo = new MonitoringInfoBuilder().setId(monitorId).setMode(in.getMode())
522                         .setProfileId(profileId).setDestination(in.getDestination()).setSource(in.getSource()).build();
523                 // Construct the initial monitor state
524                 handler = alivenessProtocolHandlerRegistry.get(protocolType);
525                 final String monitoringKey = handler.getUniqueMonitoringKey(monitoringInfo);
526
527                 MonitoringStateBuilder monitoringStateBuilder =
528                         new MonitoringStateBuilder().setMonitorKey(monitoringKey).setMonitorId(monitorId)
529                                 .setState(LivenessState.Unknown).setStatus(MonitorStatus.Started);
530                 if (protocolType != MonitorProtocolType.Bfd) {
531                     monitoringStateBuilder.setRequestCount(INITIAL_COUNT).setResponsePendingCount(INITIAL_COUNT);
532                 }
533                 MonitoringState monitoringState = monitoringStateBuilder.build();
534
535                 txRunner.callWithNewWriteOnlyTransactionAndSubmit(OPERATIONAL, operTx -> {
536                     operTx.put(getMonitoringInfoId(monitorId), monitoringInfo, CREATE_MISSING_PARENT);
537                     LOG.debug("adding oper monitoring info {}", monitoringInfo);
538
539                     operTx.put(getMonitorStateId(monitoringKey), monitoringState, CREATE_MISSING_PARENT);
540                     LOG.debug("adding oper monitoring state {}", monitoringState);
541
542                     MonitoridKeyEntry mapEntry = new MonitoridKeyEntryBuilder().setMonitorId(monitorId)
543                             .setMonitorKey(monitoringKey).build();
544                     operTx.put(getMonitorMapId(monitorId), mapEntry, CREATE_MISSING_PARENT);
545                     LOG.debug("adding oper map entry {}", mapEntry);
546                 }).addCallback(new FutureCallback<Object>() {
547                     @Override
548                     public void onFailure(Throwable error) {
549                         String errorMsg = String.format("Adding Monitoring info: %s in Datastore failed",
550                                 monitoringInfo);
551                         LOG.warn("Adding Monitoring info: {} in Datastore failed", monitoringInfo, error);
552                         throw new RuntimeException(errorMsg, error);
553                     }
554
555                     @Override
556                     public void onSuccess(Object ignored) {
557                         lockMap.put(monitoringKey, new Semaphore(1, true));
558                         if (protocolType == MonitorProtocolType.Bfd) {
559                             handler.startMonitoringTask(monitoringInfo);
560                             return;
561                         }
562                         // Schedule task
563                         LOG.debug("Scheduling monitor task for config: {}", in);
564                         scheduleMonitoringTask(monitoringInfo, profile.getMonitorInterval());
565                     }
566                 }, callbackExecutorService);
567             }
568
569             associateMonitorIdWithInterface(monitorId, interfaceName);
570
571             MonitorStartOutput output = new MonitorStartOutputBuilder().setMonitorId(monitorId).build();
572
573             rpcResultBuilder = RpcResultBuilder.success(output);
574         } catch (UnsupportedConfigException | ExecutionException | InterruptedException e) {
575             LOG.error("Start Monitoring Failed. ", e);
576             rpcResultBuilder = RpcResultBuilder.<MonitorStartOutput>failed().withError(ErrorType.APPLICATION,
577                     e.getMessage(), e);
578         }
579         return Futures.immediateFuture(rpcResultBuilder.build());
580     }
581
582     private void associateMonitorIdWithInterface(final Uint32 monitorId, final String interfaceName) {
583         LOG.debug("associate monitor Id {} with interface {}", monitorId, interfaceName);
584         final ReadWriteTransaction tx = dataBroker.newReadWriteTransaction();
585         FluentFuture<Optional<InterfaceMonitorEntry>> readFuture = tx.read(LogicalDatastoreType.OPERATIONAL,
586                 getInterfaceMonitorMapId(interfaceName));
587         FluentFuture<? extends CommitInfo> updateFuture = readFuture.transformAsync(optEntry -> {
588             if (optEntry.isPresent()) {
589                 InterfaceMonitorEntry entry = optEntry.get();
590                 List<Uint32> monitorIds1 =
591                     entry.getMonitorIds() != null ? new ArrayList<>(entry.getMonitorIds()) : new ArrayList<>();
592                 monitorIds1.add(monitorId);
593                 InterfaceMonitorEntry newEntry1 = new InterfaceMonitorEntryBuilder()
594                          .withKey(new InterfaceMonitorEntryKey(interfaceName)).setMonitorIds(monitorIds1).build();
595                 tx.merge(LogicalDatastoreType.OPERATIONAL, getInterfaceMonitorMapId(interfaceName), newEntry1);
596             } else {
597                     // Create new monitor entry
598                 LOG.debug("Adding new interface-monitor association for interface {} with id {}", interfaceName,
599                             monitorId);
600                 List<Uint32> monitorIds2 = new ArrayList<>();
601                 monitorIds2.add(monitorId);
602                 InterfaceMonitorEntry newEntry2 = new InterfaceMonitorEntryBuilder()
603                             .setInterfaceName(interfaceName).setMonitorIds(monitorIds2).build();
604                 tx.put(LogicalDatastoreType.OPERATIONAL, getInterfaceMonitorMapId(interfaceName), newEntry2,
605                             CREATE_MISSING_PARENT);
606             }
607             return tx.commit();
608         }, callbackExecutorService);
609
610         Futures.addCallback(updateFuture, new FutureCallbackImpl(
611                 "Association of monitorId " + monitorId + " with Interface " + interfaceName),
612                 MoreExecutors.directExecutor());
613     }
614
615     @SuppressFBWarnings(value = "UPM_UNCALLED_PRIVATE_METHOD",
616             justification = "https://github.com/spotbugs/spotbugs/issues/811")
617     private void scheduleMonitoringTask(MonitoringInfo monitoringInfo, Uint32 monitorInterval) {
618         AlivenessMonitorTask monitorTask = new AlivenessMonitorTask(monitoringInfo);
619         ScheduledFuture<?> scheduledFutureResult = monitorService.scheduleAtFixedRate(monitorTask, NO_DELAY,
620                 monitorInterval.toJava(), TimeUnit.MILLISECONDS);
621         monitoringTasks.put(monitoringInfo.getId(), scheduledFutureResult);
622     }
623
624     @Override
625     public ListenableFuture<RpcResult<MonitorPauseOutput>> monitorPause(MonitorPauseInput input) {
626         LOG.debug("Monitor Pause operation invoked for monitor id: {}", input.getMonitorId());
627         SettableFuture<RpcResult<MonitorPauseOutput>> result = SettableFuture.create();
628         final Uint32 monitorId = input.getMonitorId();
629
630         // Set the monitoring status to Paused
631         updateMonitorStatusTo(monitorId, MonitorStatus.Paused, currentStatus -> currentStatus == MonitorStatus.Started);
632
633         if (stopMonitoringTask(monitorId)) {
634             result.set(RpcResultBuilder.<MonitorPauseOutput>success().build());
635         } else {
636             String errorMsg = "No Monitoring Task availble to pause for the given monitor id : " + monitorId;
637             LOG.error("Monitor Pause operation failed- {}", errorMsg);
638             result.set(RpcResultBuilder.<MonitorPauseOutput>failed()
639                     .withError(ErrorType.APPLICATION, errorMsg).build());
640         }
641
642         return result;
643     }
644
645     @Override
646     public ListenableFuture<RpcResult<MonitorUnpauseOutput>> monitorUnpause(MonitorUnpauseInput input) {
647         LOG.debug("Monitor Unpause operation invoked for monitor id: {}", input.getMonitorId());
648         final SettableFuture<RpcResult<MonitorUnpauseOutput>> result = SettableFuture.create();
649
650         final Uint32 monitorId = input.getMonitorId();
651         final ReadTransaction tx = dataBroker.newReadOnlyTransaction();
652         FluentFuture<Optional<MonitoringInfo>> readInfoResult = tx.read(LogicalDatastoreType.OPERATIONAL,
653                 getMonitoringInfoId(monitorId));
654
655         readInfoResult.addCallback(new FutureCallback<Optional<MonitoringInfo>>() {
656
657             @Override
658             public void onFailure(Throwable error) {
659                 tx.close();
660                 String msg = "Unable to read monitoring info associated with monitor id " + monitorId;
661                 LOG.error("Monitor unpause Failed. {}", msg, error);
662                 result.set(RpcResultBuilder.<MonitorUnpauseOutput>failed()
663                         .withError(ErrorType.APPLICATION, msg, error).build());
664             }
665
666             @Override
667             public void onSuccess(@NonNull Optional<MonitoringInfo> optInfo) {
668                 if (optInfo.isPresent()) {
669                     final MonitoringInfo info = optInfo.get();
670                     FluentFuture<Optional<MonitorProfile>> readProfile = tx.read(LogicalDatastoreType.OPERATIONAL,
671                             getMonitorProfileId(info.getProfileId()));
672                     readProfile.addCallback(new FutureCallback<Optional<MonitorProfile>>() {
673
674                         @Override
675                         public void onFailure(Throwable error) {
676                             tx.close();
677                             String msg = "Unable to read Monitoring profile associated with id " + info.getProfileId();
678                             LOG.warn("Monitor unpause Failed. {}", msg, error);
679                             result.set(RpcResultBuilder.<MonitorUnpauseOutput>failed()
680                                     .withError(ErrorType.APPLICATION, msg, error).build());
681                         }
682
683                         @Override
684                         public void onSuccess(@NonNull Optional<MonitorProfile> optProfile) {
685                             tx.close();
686                             if (optProfile.isPresent()) {
687                                 updateMonitorStatusTo(monitorId, MonitorStatus.Started,
688                                     currentStatus -> (currentStatus == MonitorStatus.Paused
689                                                 || currentStatus == MonitorStatus.Stopped));
690                                 MonitorProfile profile = optProfile.get();
691                                 LOG.debug("Monitor Resume - Scheduling monitoring task with Id: {}", monitorId);
692                                 MonitorProtocolType protocolType = profile.getProtocolType();
693                                 if (protocolType == MonitorProtocolType.Bfd) {
694                                     LOG.debug("disabling bfd for hwvtep tunnel montior id {}", monitorId);
695                                     ((HwVtepTunnelsStateHandler) alivenessProtocolHandlerRegistry.get(protocolType))
696                                             .resetMonitoringTask(true);
697                                 } else {
698                                     scheduleMonitoringTask(info, profile.getMonitorInterval());
699                                 }
700                                 result.set(RpcResultBuilder.<MonitorUnpauseOutput>success().build());
701                             } else {
702                                 String msg = String.format("Monitoring profile associated with id %s is not present",
703                                         info.getProfileId());
704                                 LOG.warn("Monitor unpause Failed. {}", msg);
705                                 result.set(
706                                         RpcResultBuilder.<MonitorUnpauseOutput>failed()
707                                                 .withError(ErrorType.APPLICATION, msg).build());
708                             }
709                         }
710                     }, callbackExecutorService);
711                 } else {
712                     tx.close();
713                     String msg = String.format("Monitoring info associated with id %s is not present", monitorId);
714                     LOG.warn("Monitor unpause Failed. {}", msg);
715                     result.set(RpcResultBuilder.<MonitorUnpauseOutput>failed()
716                             .withError(ErrorType.APPLICATION, msg).build());
717                 }
718             }
719         }, callbackExecutorService);
720
721         return result;
722     }
723
724     private boolean stopMonitoringTask(Uint32 monitorId) {
725         return stopMonitoringTask(monitorId, INTERRUPT_TASK);
726     }
727
728     private boolean stopMonitoringTask(Uint32 monitorId, boolean interruptTask) {
729         Optional<MonitoringInfo> optInfo;
730         try {
731             optInfo = txRunner.applyInterruptiblyWithNewReadOnlyTransactionAndClose(OPERATIONAL,
732                 tx -> tx.read(getMonitoringInfoId(monitorId))).get();
733         } catch (InterruptedException | ExecutionException e) {
734             LOG.error("Error reading monitor {}", monitorId, e);
735             optInfo = Optional.empty();
736         }
737         if (!optInfo.isPresent()) {
738             LOG.warn("There is no monitoring info present for monitor id {}", monitorId);
739             return false;
740         }
741         MonitoringInfo monitoringInfo = optInfo.get();
742         Optional<MonitorProfile> optProfile;
743         try {
744             optProfile = txRunner.applyInterruptiblyWithNewReadOnlyTransactionAndClose(OPERATIONAL,
745                 tx -> tx.read(getMonitorProfileId(monitoringInfo.getProfileId()))).get();
746         } catch (InterruptedException | ExecutionException e) {
747             LOG.error("Error reading monitor profile for {}", monitorId, e);
748             optProfile = Optional.empty();
749         }
750         MonitorProtocolType protocolType = optProfile.get().getProtocolType();
751         if (protocolType == MonitorProtocolType.Bfd) {
752             LOG.debug("disabling bfd for hwvtep tunnel montior id {}", monitorId);
753             ((HwVtepTunnelsStateHandler) alivenessProtocolHandlerRegistry.get(protocolType))
754                     .resetMonitoringTask(false);
755             return true;
756         }
757         ScheduledFuture<?> scheduledFutureResult = monitoringTasks.remove(monitorId);
758         if (scheduledFutureResult != null) {
759             scheduledFutureResult.cancel(interruptTask);
760             return true;
761         }
762         return false;
763     }
764
765     Optional<MonitorProfile> getMonitorProfile(Uint32 profileId) {
766         try {
767             return txRunner.applyInterruptiblyWithNewReadOnlyTransactionAndClose(OPERATIONAL,
768                 tx -> tx.read(getMonitorProfileId(profileId))).get();
769         } catch (InterruptedException | ExecutionException e) {
770             LOG.error("Error reading monitor profile for {}", profileId, e);
771             return Optional.empty();
772         }
773     }
774
775     void acquireLock(Semaphore lock) {
776         if (lock == null) {
777             return;
778         }
779
780         boolean acquiredLock = false;
781         try {
782             acquiredLock = lock.tryAcquire(50, TimeUnit.MILLISECONDS);
783         } catch (InterruptedException e) {
784             LOG.warn("Thread interrupted when waiting to acquire the lock");
785         }
786
787         if (!acquiredLock) {
788             LOG.warn("Previous transaction did not complete in time. Releasing the lock to proceed");
789             lock.release();
790             try {
791                 lock.acquire();
792                 LOG.trace("Lock acquired successfully");
793             } catch (InterruptedException e) {
794                 LOG.warn("Acquire failed");
795             }
796         } else {
797             LOG.trace("Lock acquired successfully");
798         }
799     }
800
801     void releaseLock(Semaphore lock) {
802         if (lock != null) {
803             lock.release();
804         }
805     }
806
    /**
     * Updates the monitoring state for one monitoring round and then asks the protocol handler to start
     * the monitoring task for {@code monitoringInfo}.
     *
     * <p>Steps, in order:
     * <ol>
     * <li>Look up the monitor key for the monitor id and the monitor profile; return early (with a
     * warning) if either is unavailable.</li>
     * <li>Acquire the per-key semaphore from {@code lockMap} (a missing entry is tolerated by
     * {@code acquireLock}).</li>
     * <li>In a read-write transaction, read the current {@code MonitoringState}, bump the request and
     * pending-response counters, switch the state to {@code Down} (publishing a notification) once the
     * pending count reaches the failure threshold, and merge the result back.</li>
     * <li>After a successful commit, call {@code startMonitoringTask} on the handler registered for the
     * profile's protocol type. The semaphore is released in both the success and the failure callback.</li>
     * </ol>
     *
     * @param monitoringInfo the monitor for which a packet is to be sent
     */
    @SuppressFBWarnings(value = "UPM_UNCALLED_PRIVATE_METHOD",
            justification = "https://github.com/spotbugs/spotbugs/issues/811")
    private void sendMonitorPacket(final MonitoringInfo monitoringInfo) {
        // TODO: Handle interrupts
        final Uint32 monitorId = monitoringInfo.getId();
        final String monitorKey = monitorIdKeyCache.getUnchecked(monitorId);
        if (monitorKey == null) {
            LOG.warn("No monitor Key associated with id {} to send the monitor packet", monitorId);
            return;
        } else {
            LOG.debug("Sending monitoring packet for key: {}", monitorKey);
        }

        // The profile supplies the monitor window, failure threshold and protocol type used below
        final MonitorProfile profile;
        Optional<MonitorProfile> optProfile = getMonitorProfile(monitoringInfo.getProfileId());
        if (optProfile.isPresent()) {
            profile = optProfile.get();
        } else {
            LOG.warn("No monitor profile associated with id {}. " + "Could not send Monitor packet for monitor-id {}",
                    monitoringInfo.getProfileId(), monitorId);
            return;
        }

        // The lock is released only from the callbacks registered on writeResult at the end of this
        // method. NOTE(review): if anything between here and that registration throws synchronously,
        // the permit is not handed back - confirm whether that can happen in practice.
        final Semaphore lock = lockMap.get(monitorKey);
        LOG.debug("Acquiring lock for monitor key : {} to send monitor packet", monitorKey);
        acquireLock(lock);

        final ReadWriteTransaction tx = dataBroker.newReadWriteTransaction();
        FluentFuture<Optional<MonitoringState>> readResult = tx.read(LogicalDatastoreType.OPERATIONAL,
                getMonitorStateId(monitorKey));
        FluentFuture<? extends CommitInfo> writeResult = readResult.transformAsync(optState -> {
            if (optState.isPresent()) {
                MonitoringState state = optState.get();

                // Increase the request count
                Long requestCount = state.getRequestCount().toJava() + 1;

                // Liveness state currently stored; replaced by Down below if the threshold is reached
                LivenessState currentLivenessState = state.getState();

                // Increase the pending response count, but never beyond the profile's monitor window
                long responsePendingCount = state.getResponsePendingCount().toJava();
                if (responsePendingCount < profile.getMonitorWindow().toJava()) {
                    responsePendingCount = responsePendingCount + 1;
                }

                // Check with the failure threshold
                if (responsePendingCount >= profile.getFailureThreshold().toJava()) {
                    // Change the state to down and notify, unless it is already Down
                    if (currentLivenessState != LivenessState.Down) {
                        LOG.debug("Response pending Count: {}, Failure threshold: {} for monitorId {}",
                                responsePendingCount, profile.getFailureThreshold(), state.getMonitorId());
                        LOG.info("Sending notification for monitor Id : {} with State: {}",
                                state.getMonitorId(), LivenessState.Down);
                        publishNotification(monitorId, LivenessState.Down);
                        currentLivenessState = LivenessState.Down;
                        // Reset requestCount when the state changes to DOWN
                        // from any other state
                        requestCount = INITIAL_COUNT;
                    }
                }

                // Update the ODS with state. The builder starts empty, so only the key, the two
                // counters and the liveness state are set here and merged into the stored state.
                MonitoringState updatedState = new MonitoringStateBuilder(/* state */)
                            .setMonitorKey(state.getMonitorKey()).setRequestCount(requestCount)
                            .setResponsePendingCount(responsePendingCount).setState(currentLivenessState).build();
                tx.merge(LogicalDatastoreType.OPERATIONAL, getMonitorStateId(state.getMonitorKey()),
                            updatedState);
                return tx.commit();
            } else {
                // Close the transaction; nothing was written to it, so this commit carries no changes
                tx.commit();
                String errorMsg = String.format(
                        "Monitoring State associated with id %s is not present to send packet out.", monitorId);
                return Futures.immediateFailedFuture(new RuntimeException(errorMsg));
            }
        }, callbackExecutorService);

        writeResult.addCallback(new FutureCallback<CommitInfo>() {
            @Override
            public void onSuccess(CommitInfo noarg) {
                // invoke packetout on protocol handler; a missing handler is skipped silently
                AlivenessProtocolHandler<?> handler =
                        alivenessProtocolHandlerRegistry.getOpt(profile.getProtocolType());
                if (handler != null) {
                    LOG.debug("Sending monitoring packet {}", monitoringInfo);
                    handler.startMonitoringTask(monitoringInfo);
                }
                releaseLock(lock);
            }

            @Override
            public void onFailure(Throwable error) {
                // Covers both a failed read/commit and the "state not present" failure created above
                LOG.warn("Updating monitoring state for key: {} failed. Monitoring packet is not sent", monitorKey,
                        error);
                releaseLock(lock);
            }
        }, callbackExecutorService);
    }
906
907     void publishNotification(final Uint32 monitorId, final LivenessState state) {
908         LOG.debug("Sending notification for id {}  - state {}", monitorId, state);
909         EventData data = new EventDataBuilder().setMonitorId(monitorId).setMonitorState(state).build();
910         MonitorEvent event = new MonitorEventBuilder().setEventData(data).build();
911         final ListenableFuture<?> eventFuture = notificationPublishService.offerNotification(event);
912         Futures.addCallback(eventFuture, new FutureCallback<Object>() {
913             @Override
914             public void onFailure(Throwable error) {
915                 LOG.warn("Error in notifying listeners for id {} - state {}", monitorId, state, error);
916             }
917
918             @Override
919             public void onSuccess(Object arg) {
920                 LOG.trace("Successful in notifying listeners for id {} - state {}", monitorId, state);
921             }
922         }, callbackExecutorService);
923     }
924
925     @Override
926     public ListenableFuture<RpcResult<MonitorProfileCreateOutput>> monitorProfileCreate(
927             final MonitorProfileCreateInput input) {
928         LOG.debug("Monitor Profile Create operation - {}", input.getProfile());
929         final SettableFuture<RpcResult<MonitorProfileCreateOutput>> returnFuture = SettableFuture.create();
930         Profile profile = input.getProfile();
931         final Uint32 failureThreshold = profile.getFailureThreshold();
932         final Uint32 monitorInterval = profile.getMonitorInterval();
933         final Uint32 monitorWindow = profile.getMonitorWindow();
934         final MonitorProtocolType protocolType = profile.getProtocolType();
935         String idKey = getUniqueProfileKey(failureThreshold, monitorInterval, monitorWindow, protocolType);
936         final Uint32 profileId = Uint32.valueOf(getUniqueId(idKey));
937
938         final ReadWriteTransaction tx = dataBroker.newReadWriteTransaction();
939         FluentFuture<Optional<MonitorProfile>> readFuture = tx.read(LogicalDatastoreType.OPERATIONAL,
940                 getMonitorProfileId(profileId));
941         FluentFuture<RpcResult<MonitorProfileCreateOutput>> resultFuture = readFuture.transformAsync(
942             optProfile -> {
943                 if (optProfile.isPresent()) {
944                     tx.cancel();
945                     MonitorProfileCreateOutput output = new MonitorProfileCreateOutputBuilder()
946                                 .setProfileId(profileId).build();
947                     String msg = String.format("Monitor profile %s already present for the given input", input);
948                     LOG.warn(msg);
949                     returnFuture.set(RpcResultBuilder.success(output)
950                                 .withWarning(ErrorType.PROTOCOL, "profile-exists", msg).build());
951                 } else {
952                     final MonitorProfile monitorProfile = new MonitorProfileBuilder().setId(profileId)
953                                 .setFailureThreshold(failureThreshold).setMonitorInterval(monitorInterval)
954                                 .setMonitorWindow(monitorWindow).setProtocolType(protocolType).build();
955                     tx.put(LogicalDatastoreType.OPERATIONAL, getMonitorProfileId(profileId), monitorProfile,
956                           CREATE_MISSING_PARENT);
957                     tx.commit().addCallback(new FutureCallback<CommitInfo>() {
958                             @Override
959                             public void onFailure(Throwable error) {
960                                 String msg = String.format("Error when storing monitorprofile %s in datastore",
961                                         monitorProfile);
962                                 LOG.error("Error when storing monitorprofile {} in datastore", monitorProfile, error);
963                                 returnFuture.set(RpcResultBuilder.<MonitorProfileCreateOutput>failed()
964                                         .withError(ErrorType.APPLICATION, msg, error).build());
965                             }
966
967                             @Override
968                             public void onSuccess(CommitInfo noarg) {
969                                 MonitorProfileCreateOutput output = new MonitorProfileCreateOutputBuilder()
970                                         .setProfileId(profileId).build();
971                                 returnFuture.set(RpcResultBuilder.success(output).build());
972                             }
973                         }, callbackExecutorService);
974                 }
975                 return returnFuture;
976             }, callbackExecutorService);
977         resultFuture.addCallback(new FutureCallback<RpcResult<MonitorProfileCreateOutput>>() {
978             @Override
979             public void onFailure(Throwable error) {
980                 // This would happen when any error happens during reading for
981                 // monitoring profile
982                 String msg = String.format("Error in creating monitorprofile - %s", input);
983                 returnFuture.set(RpcResultBuilder.<MonitorProfileCreateOutput>failed()
984                         .withError(ErrorType.APPLICATION, msg, error).build());
985                 LOG.error("Error in creating monitorprofile - {} ", input, error);
986             }
987
988             @Override
989             public void onSuccess(RpcResult<MonitorProfileCreateOutput> result) {
990                 LOG.debug("Successfully created monitor Profile {} ", input);
991             }
992         }, callbackExecutorService);
993         return returnFuture;
994     }
995
996     @Override
997     public ListenableFuture<RpcResult<MonitorProfileGetOutput>> monitorProfileGet(MonitorProfileGetInput input) {
998         LOG.debug("Monitor Profile Get operation for input profile- {}", input.getProfile());
999         RpcResultBuilder<MonitorProfileGetOutput> rpcResultBuilder;
1000         final Long profileId = getExistingProfileId(input);
1001
1002         MonitorProfileGetOutputBuilder output = new MonitorProfileGetOutputBuilder().setProfileId(profileId);
1003         rpcResultBuilder = RpcResultBuilder.success();
1004         rpcResultBuilder.withResult(output.build());
1005         return Futures.immediateFuture(rpcResultBuilder.build());
1006     }
1007
1008     private Long getExistingProfileId(MonitorProfileGetInput input) {
1009         org.opendaylight.yang.gen.v1.urn.opendaylight.genius
1010             .alivenessmonitor.rev160411.monitor.profile.get.input.Profile profile = input
1011                 .getProfile();
1012         final Uint32 failureThreshold = profile.getFailureThreshold();
1013         final Uint32 monitorInterval = profile.getMonitorInterval();
1014         final Uint32 monitorWindow = profile.getMonitorWindow();
1015         final MonitorProtocolType protocolType = profile.getProtocolType();
1016         LOG.debug("getExistingProfileId for profile : {}", input.getProfile());
1017         String idKey = getUniqueProfileKey(failureThreshold, monitorInterval, monitorWindow, protocolType);
1018         LOG.debug("Obtained existing profile ID for profile : {}", input.getProfile());
1019         return (long) getUniqueId(idKey);
1020     }
1021
1022     private String getUniqueProfileKey(Uint32 failureThreshold, Uint32 monitorInterval, Uint32 monitorWindow,
1023             MonitorProtocolType protocolType) {
1024         return String.valueOf(failureThreshold) + AlivenessMonitorConstants.SEPERATOR + monitorInterval
1025                 + AlivenessMonitorConstants.SEPERATOR + monitorWindow + AlivenessMonitorConstants.SEPERATOR
1026                 + protocolType + AlivenessMonitorConstants.SEPERATOR;
1027     }
1028
1029     @Override
1030     public ListenableFuture<RpcResult<MonitorProfileDeleteOutput>> monitorProfileDelete(
1031             final MonitorProfileDeleteInput input) {
1032         LOG.debug("Monitor Profile delete for Id: {}", input.getProfileId());
1033         final SettableFuture<RpcResult<MonitorProfileDeleteOutput>> result = SettableFuture.create();
1034         final Uint32 profileId = input.getProfileId();
1035         final ReadWriteTransaction tx = dataBroker.newReadWriteTransaction();
1036         FluentFuture<Optional<MonitorProfile>> readFuture = tx.read(LogicalDatastoreType.OPERATIONAL,
1037                 getMonitorProfileId(profileId));
1038         FluentFuture<RpcResult<MonitorProfileDeleteOutput>> writeFuture =
1039                 readFuture.transformAsync(optProfile -> {
1040                     if (optProfile.isPresent()) {
1041                         tx.delete(LogicalDatastoreType.OPERATIONAL, getMonitorProfileId(profileId));
1042                         tx.commit().addCallback(new FutureCallback<CommitInfo>() {
1043                             @Override
1044                             public void onFailure(Throwable error) {
1045                                 String msg = String.format("Error when removing monitor profile %s from datastore",
1046                                         profileId);
1047                                 LOG.error("Error when removing monitor profile {} from datastore", profileId, error);
1048                                 result.set(RpcResultBuilder.<MonitorProfileDeleteOutput>failed()
1049                                         .withError(ErrorType.APPLICATION, msg, error)
1050                                         .build());
1051                             }
1052
1053                             @Override
1054                             public void onSuccess(CommitInfo noarg) {
1055                                 MonitorProfile profile = optProfile.get();
1056                                 String id = getUniqueProfileKey(profile.getFailureThreshold(),
1057                                         profile.getMonitorInterval(), profile.getMonitorWindow(),
1058                                         profile.getProtocolType());
1059                                 releaseId(id);
1060                                 result.set(RpcResultBuilder.<MonitorProfileDeleteOutput>success().build());
1061                             }
1062                         }, callbackExecutorService);
1063                     } else {
1064                         String msg = String.format("Monitor profile with Id: %s does not exist", profileId);
1065                         LOG.info(msg);
1066                         result.set(RpcResultBuilder.<MonitorProfileDeleteOutput>success()
1067                                 .withWarning(ErrorType.PROTOCOL, "invalid-value", msg).build());
1068                     }
1069                     return result;
1070                 }, callbackExecutorService);
1071
1072         writeFuture.addCallback(new FutureCallback<RpcResult<MonitorProfileDeleteOutput>>() {
1073
1074             @Override
1075             public void onFailure(Throwable error) {
1076                 String msg = String.format("Error when removing monitor profile %s from datastore", profileId);
1077                 LOG.error("Error when removing monitor profile {} from datastore", profileId, error);
1078                 result.set(RpcResultBuilder.<MonitorProfileDeleteOutput>failed()
1079                         .withError(ErrorType.APPLICATION, msg, error).build());
1080             }
1081
1082             @Override
1083             public void onSuccess(RpcResult<MonitorProfileDeleteOutput> noarg) {
1084                 LOG.debug("Successfully removed Monitor Profile {}", profileId);
1085             }
1086         }, callbackExecutorService);
1087         return result;
1088     }
1089
1090     @Override
1091     public ListenableFuture<RpcResult<MonitorStopOutput>> monitorStop(MonitorStopInput input) {
1092         LOG.debug("Monitor Stop operation for monitor id - {}", input.getMonitorId());
1093         SettableFuture<RpcResult<MonitorStopOutput>> result = SettableFuture.create();
1094
1095         final Uint32 monitorId = input.getMonitorId();
1096         Optional<MonitoringInfo> optInfo;
1097         try {
1098             optInfo = txRunner.applyInterruptiblyWithNewReadOnlyTransactionAndClose(OPERATIONAL,
1099                 tx -> tx.read(getMonitoringInfoId(monitorId))).get();
1100         } catch (InterruptedException | ExecutionException e) {
1101             LOG.error("Error reading monitor {}", monitorId, e);
1102             optInfo = Optional.empty();
1103         }
1104         if (optInfo.isPresent()) {
1105             // Stop the monitoring task
1106             stopMonitoringTask(monitorId);
1107
1108             String monitorKey = monitorIdKeyCache.getUnchecked(monitorId);
1109
1110             // Cleanup the Data store
1111             txRunner.callWithNewWriteOnlyTransactionAndSubmit(OPERATIONAL, tx -> {
1112                 if (monitorKey != null) {
1113                     tx.delete(getMonitorStateId(monitorKey));
1114                     monitorIdKeyCache.invalidate(monitorId.toJava());
1115                 }
1116
1117                 tx.delete(getMonitoringInfoId(monitorId));
1118
1119                 //Remove monitorid-key-map
1120                 tx.delete(getMonitorMapId(monitorId));
1121             }).addCallback(new FutureCallbackImpl("Delete monitor state with Id " + monitorId),
1122                     MoreExecutors.directExecutor());
1123
1124             MonitoringInfo info = optInfo.get();
1125             String interfaceName = getInterfaceName(info.getSource().getEndpointType());
1126             if (interfaceName != null) {
1127                 removeMonitorIdFromInterfaceAssociation(monitorId, interfaceName);
1128             }
1129             releaseIdForMonitoringInfo(info);
1130
1131             if (monitorKey != null) {
1132                 lockMap.remove(monitorKey);
1133             }
1134
1135             result.set(RpcResultBuilder.<MonitorStopOutput>success().build());
1136         } else {
1137             String errorMsg = "Do not have monitoring information associated with key " + monitorId;
1138             LOG.error("Delete monitoring operation Failed - {}", errorMsg);
1139             result.set(RpcResultBuilder.<MonitorStopOutput>failed().withError(ErrorType.APPLICATION, errorMsg).build());
1140         }
1141
1142         return result;
1143     }
1144
1145     private void removeMonitorIdFromInterfaceAssociation(final Uint32 monitorId, final String interfaceName) {
1146         LOG.debug("Remove monitorId {} from Interface association {}", monitorId, interfaceName);
1147         final ReadWriteTransaction tx = dataBroker.newReadWriteTransaction();
1148         FluentFuture<Optional<InterfaceMonitorEntry>> readFuture = tx.read(LogicalDatastoreType.OPERATIONAL,
1149                 getInterfaceMonitorMapId(interfaceName));
1150         FluentFuture<? extends CommitInfo> updateFuture = readFuture.transformAsync(optEntry -> {
1151             if (optEntry.isPresent()) {
1152                 InterfaceMonitorEntry entry = optEntry.get();
1153                 List<Uint32> monitorIds =
1154                     entry.getMonitorIds() != null ? new ArrayList<>(entry.getMonitorIds()) : new ArrayList<>();
1155                 monitorIds.remove(monitorId);
1156                 if (monitorIds.isEmpty()) {
1157                     tx.delete(LogicalDatastoreType.OPERATIONAL, getInterfaceMonitorMapId(interfaceName));
1158                 } else {
1159                     InterfaceMonitorEntry newEntry = new InterfaceMonitorEntryBuilder(entry)
1160                             .withKey(new InterfaceMonitorEntryKey(interfaceName)).setMonitorIds(monitorIds).build();
1161                     tx.put(LogicalDatastoreType.OPERATIONAL, getInterfaceMonitorMapId(interfaceName), newEntry,
1162                             CREATE_MISSING_PARENT);
1163                 }
1164                 return tx.commit();
1165             } else {
1166                 LOG.warn("No Interface map entry found {} to remove monitorId {}", interfaceName, monitorId);
1167                 tx.cancel();
1168                 return Futures.immediateFuture(null);
1169             }
1170         }, MoreExecutors.directExecutor());
1171
1172         updateFuture.addCallback(new FutureCallbackImpl(
1173                 String.format("Dis-association of monitorId %s with Interface %s", monitorId, interfaceName)),
1174                 MoreExecutors.directExecutor());
1175     }
1176
1177     private void releaseIdForMonitoringInfo(MonitoringInfo info) {
1178         Uint32 monitorId = info.getId();
1179         EndpointType source = info.getSource().getEndpointType();
1180         String interfaceName = getInterfaceName(source);
1181         if (!Strings.isNullOrEmpty(interfaceName)) {
1182             Optional<MonitorProfile> optProfile;
1183             try {
1184                 optProfile = txRunner.applyInterruptiblyWithNewReadOnlyTransactionAndClose(OPERATIONAL,
1185                     tx -> tx.read(getMonitorProfileId(info.getProfileId()))).get();
1186             } catch (InterruptedException | ExecutionException e) {
1187                 LOG.error("Error reading monitor profile for {}", info.getProfileId(), e);
1188                 optProfile = Optional.empty();
1189             }
1190             if (optProfile.isPresent()) {
1191                 MonitorProtocolType protocolType = optProfile.get().getProtocolType();
1192                 EndpointType destination = info.getDestination() != null ? info.getDestination().getEndpointType()
1193                         : null;
1194                 String idKey = getUniqueKey(interfaceName, protocolType.toString(), source, destination);
1195                 releaseId(idKey);
1196             } else {
1197                 LOG.warn("Could not release monitorId {}. No profile associated with it", monitorId);
1198             }
1199         }
1200     }
1201
1202     private String getInterfaceName(EndpointType endpoint) {
1203         String interfaceName = null;
1204         if (endpoint instanceof Interface) {
1205             interfaceName = ((Interface) endpoint).getInterfaceName();
1206         }
1207         return interfaceName;
1208     }
1209
1210     private void stopMonitoring(Uint32 monitorId) {
1211         updateMonitorStatusTo(monitorId, MonitorStatus.Stopped,
1212             currentStatus -> currentStatus != MonitorStatus.Stopped);
1213         if (!stopMonitoringTask(monitorId)) {
1214             LOG.warn("No monitoring task running to perform cancel operation for monitorId {}", monitorId);
1215         }
1216     }
1217
1218     private void updateMonitorStatusTo(final Uint32 monitorId, final MonitorStatus newStatus,
1219             final Predicate<MonitorStatus> isValidStatus) {
1220         final String monitorKey = monitorIdKeyCache.getUnchecked(monitorId);
1221         if (monitorKey == null) {
1222             LOG.warn("No monitor Key associated with id {} to change the monitor status to {}", monitorId, newStatus);
1223             return;
1224         }
1225         final ReadWriteTransaction tx = dataBroker.newReadWriteTransaction();
1226
1227         FluentFuture<Optional<MonitoringState>> readResult = tx.read(LogicalDatastoreType.OPERATIONAL,
1228                 getMonitorStateId(monitorKey));
1229
1230         FluentFuture<? extends CommitInfo> writeResult = readResult.transformAsync(optState -> {
1231             if (optState.isPresent()) {
1232                 MonitoringState state = optState.get();
1233                 if (isValidStatus.apply(state.getStatus())) {
1234                     MonitoringState updatedState = new MonitoringStateBuilder().setMonitorKey(monitorKey)
1235                                 .setStatus(newStatus).build();
1236                     tx.merge(LogicalDatastoreType.OPERATIONAL, getMonitorStateId(monitorKey), updatedState);
1237                 } else {
1238                     LOG.warn("Invalid Monitoring status {}, cannot be updated to {} for monitorId {}",
1239                                 state.getStatus(), newStatus, monitorId);
1240                 }
1241             } else {
1242                 LOG.warn("No associated monitoring state data available to update the status to {} for {}",
1243                            newStatus, monitorId);
1244             }
1245             return tx.commit();
1246         }, MoreExecutors.directExecutor());
1247
1248         writeResult.addCallback(new FutureCallbackImpl("Monitor status update for " + monitorId + " to " + newStatus),
1249                 MoreExecutors.directExecutor());
1250     }
1251
1252     private void resumeMonitoring(final Uint32 monitorId) {
1253         final ReadTransaction tx = dataBroker.newReadOnlyTransaction();
1254         FluentFuture<Optional<MonitoringInfo>> readInfoResult = tx.read(LogicalDatastoreType.OPERATIONAL,
1255                 getMonitoringInfoId(monitorId));
1256
1257         readInfoResult.addCallback(new FutureCallback<Optional<MonitoringInfo>>() {
1258
1259             @Override
1260             public void onFailure(Throwable error) {
1261                 String msg = "Unable to read monitoring info associated with monitor id " + monitorId;
1262                 LOG.error("Monitor resume Failed. {}", msg, error);
1263                 tx.close();
1264             }
1265
1266             @Override
1267             public void onSuccess(@NonNull Optional<MonitoringInfo> optInfo) {
1268                 if (optInfo.isPresent()) {
1269                     final MonitoringInfo info = optInfo.get();
1270                     ListenableFuture<Optional<MonitorProfile>> readProfile = tx.read(LogicalDatastoreType.OPERATIONAL,
1271                             getMonitorProfileId(info.getProfileId()));
1272                     Futures.addCallback(readProfile, new FutureCallback<Optional<MonitorProfile>>() {
1273
1274                         @Override
1275                         public void onFailure(Throwable error) {
1276                             String msg = "Unable to read Monitoring profile associated with id " + info.getProfileId();
1277                             LOG.warn("Monitor resume Failed. {}", msg, error);
1278                             tx.close();
1279                         }
1280
1281                         @Override
1282                         public void onSuccess(@NonNull Optional<MonitorProfile> optProfile) {
1283                             tx.close();
1284                             if (optProfile.isPresent()) {
1285                                 updateMonitorStatusTo(monitorId, MonitorStatus.Started,
1286                                     currentStatus -> currentStatus != MonitorStatus.Started);
1287                                 MonitorProfile profile = optProfile.get();
1288                                 LOG.debug("Monitor Resume - Scheduling monitoring task for Id: {}", monitorId);
1289                                 scheduleMonitoringTask(info, profile.getMonitorInterval());
1290                             } else {
1291                                 String msg = String.format("Monitoring profile associated with id %s is not present",
1292                                         info.getProfileId());
1293                                 LOG.warn("Monitor resume Failed. {}", msg);
1294                             }
1295                         }
1296                     }, MoreExecutors.directExecutor());
1297                 } else {
1298                     tx.close();
1299                     String msg = String.format("Monitoring info associated with id %s is not present", monitorId);
1300                     LOG.warn("Monitor resume Failed. {}", msg);
1301                 }
1302             }
1303         }, MoreExecutors.directExecutor());
1304     }
1305
1306     @Override
1307     public void onInterfaceStateUp(String interfaceName) {
1308         List<Uint32> monitorIds = getMonitorIds(interfaceName);
1309         if (monitorIds.isEmpty()) {
1310             LOG.warn("Could not get monitorId for interface: {}", interfaceName);
1311             return;
1312         }
1313         for (Uint32 monitorId : monitorIds) {
1314             LOG.debug("Resume monitoring on interface: {} with monitorId: {}", interfaceName, monitorId);
1315             resumeMonitoring(monitorId);
1316         }
1317     }
1318
1319     @Override
1320     public void onInterfaceStateDown(String interfaceName) {
1321         List<Uint32> monitorIds = getMonitorIds(interfaceName);
1322         if (monitorIds.isEmpty()) {
1323             LOG.warn("Could not get monitorIds for interface: {}", interfaceName);
1324             return;
1325         }
1326         for (Uint32 monitorId : monitorIds) {
1327             LOG.debug("Suspend monitoring on interface: {} with monitorId: {}", interfaceName, monitorId);
1328             stopMonitoring(monitorId);
1329         }
1330     }
1331
1332     private List<Uint32> getMonitorIds(String interfaceName) {
1333         try {
1334             return txRunner.applyInterruptiblyWithNewReadOnlyTransactionAndClose(OPERATIONAL,
1335                 tx -> tx.read(getInterfaceMonitorMapId(interfaceName))).get().map(
1336                 InterfaceMonitorEntry::getMonitorIds).orElse(Collections.emptyList());
1337         } catch (InterruptedException | ExecutionException e) {
1338             LOG.error("Error retrieving monitor ids for {}", interfaceName, e);
1339             return Collections.emptyList();
1340         }
1341     }
1342
1343     //handle monitor stop
1344     @Override
1345     public void remove(@NonNull InstanceIdentifier<MonitoringState> instanceIdentifier,
1346                        @NonNull MonitoringState removedDataObject) {
1347         final Uint32 monitorId = removedDataObject.getMonitorId();
1348         LOG.debug("Monitor State remove listener invoked for monitor id: {}", monitorId);
1349
1350         if (removedDataObject.getStatus() != MonitorStatus.Paused) {
1351             ScheduledFuture<?> scheduledFutureResult = monitoringTasks.get(monitorId);
1352             if (scheduledFutureResult != null) {
1353                 LOG.debug("Stopping the task for Monitor id: {}", monitorId);
1354                 stopMonitoringTask(monitorId);
1355             }
1356         }
1357     }
1358
1359     //handle monitor pause
1360     @Override
1361     public void update(@NonNull InstanceIdentifier<MonitoringState> instanceIdentifier,
1362                        @NonNull MonitoringState originalDataObject,
1363                        @NonNull MonitoringState updatedDataObject) {
1364         final Uint32 monitorId = updatedDataObject.getMonitorId();
1365         LOG.debug("Monitor State update listener invoked for monitor id: {}", monitorId);
1366
1367         if (updatedDataObject.getStatus() == MonitorStatus.Paused
1368                 && originalDataObject.getStatus() != MonitorStatus.Paused) {
1369             ScheduledFuture<?> scheduledFutureResult = monitoringTasks.get(monitorId);
1370             if (scheduledFutureResult != null) {
1371                 LOG.debug("Stopping the task for Monitor id: {}", monitorId);
1372                 stopMonitoringTask(monitorId);
1373             }
1374         }
1375     }
1376 }