2 * Copyright (c) 2016 Ericsson India Global Services Pvt Ltd. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 package org.opendaylight.genius.alivenessmonitor.internal;
10 import static org.opendaylight.genius.alivenessmonitor.utils.AlivenessMonitorUtil.getInterfaceMonitorMapId;
11 import static org.opendaylight.genius.alivenessmonitor.utils.AlivenessMonitorUtil.getMonitorMapId;
12 import static org.opendaylight.genius.alivenessmonitor.utils.AlivenessMonitorUtil.getMonitorProfileId;
13 import static org.opendaylight.genius.alivenessmonitor.utils.AlivenessMonitorUtil.getMonitorStateId;
14 import static org.opendaylight.genius.alivenessmonitor.utils.AlivenessMonitorUtil.getMonitoringInfoId;
15 import static org.opendaylight.mdsal.binding.util.Datastore.OPERATIONAL;
17 import com.google.common.base.Preconditions;
18 import com.google.common.base.Predicate;
19 import com.google.common.base.Strings;
20 import com.google.common.cache.CacheBuilder;
21 import com.google.common.cache.CacheLoader;
22 import com.google.common.cache.LoadingCache;
23 import com.google.common.util.concurrent.FluentFuture;
24 import com.google.common.util.concurrent.FutureCallback;
25 import com.google.common.util.concurrent.Futures;
26 import com.google.common.util.concurrent.ListenableFuture;
27 import com.google.common.util.concurrent.MoreExecutors;
28 import com.google.common.util.concurrent.SettableFuture;
29 import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
30 import java.util.ArrayList;
31 import java.util.Collections;
32 import java.util.List;
33 import java.util.Optional;
34 import java.util.concurrent.ConcurrentHashMap;
35 import java.util.concurrent.ConcurrentMap;
36 import java.util.concurrent.ExecutionException;
37 import java.util.concurrent.ExecutorService;
38 import java.util.concurrent.Future;
39 import java.util.concurrent.ScheduledExecutorService;
40 import java.util.concurrent.ScheduledFuture;
41 import java.util.concurrent.Semaphore;
42 import java.util.concurrent.TimeUnit;
43 import javax.annotation.PreDestroy;
44 import javax.inject.Inject;
45 import javax.inject.Singleton;
46 import org.apache.aries.blueprint.annotation.service.Reference;
47 import org.eclipse.jdt.annotation.NonNull;
48 import org.opendaylight.genius.alivenessmonitor.protocols.AlivenessProtocolHandler;
49 import org.opendaylight.genius.alivenessmonitor.protocols.AlivenessProtocolHandlerRegistry;
50 import org.opendaylight.genius.alivenessmonitor.utils.AlivenessMonitorUtil;
51 import org.opendaylight.genius.mdsalutil.packet.Ethernet;
52 import org.opendaylight.genius.mdsalutil.packet.utils.PacketUtil;
53 import org.opendaylight.infrautils.utils.concurrent.Executors;
54 import org.opendaylight.mdsal.binding.api.DataBroker;
55 import org.opendaylight.mdsal.binding.api.NotificationPublishService;
56 import org.opendaylight.mdsal.binding.api.ReadTransaction;
57 import org.opendaylight.mdsal.binding.api.ReadWriteTransaction;
58 import org.opendaylight.mdsal.binding.util.Datastore.Operational;
59 import org.opendaylight.mdsal.binding.util.ManagedNewTransactionRunner;
60 import org.opendaylight.mdsal.binding.util.ManagedNewTransactionRunnerImpl;
61 import org.opendaylight.mdsal.common.api.CommitInfo;
62 import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
63 import org.opendaylight.openflowplugin.libraries.liblldp.Packet;
64 import org.opendaylight.openflowplugin.libraries.liblldp.PacketException;
65 import org.opendaylight.serviceutils.tools.listener.AbstractClusteredSyncDataTreeChangeListener;
66 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.alivenessmonitor.rev160411.AlivenessMonitorService;
67 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.alivenessmonitor.rev160411.LivenessState;
68 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.alivenessmonitor.rev160411.MonitorEvent;
69 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.alivenessmonitor.rev160411.MonitorEventBuilder;
70 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.alivenessmonitor.rev160411.MonitorPauseInput;
71 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.alivenessmonitor.rev160411.MonitorPauseOutput;
72 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.alivenessmonitor.rev160411.MonitorProfileCreateInput;
73 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.alivenessmonitor.rev160411.MonitorProfileCreateOutput;
74 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.alivenessmonitor.rev160411.MonitorProfileCreateOutputBuilder;
75 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.alivenessmonitor.rev160411.MonitorProfileDeleteInput;
76 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.alivenessmonitor.rev160411.MonitorProfileDeleteOutput;
77 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.alivenessmonitor.rev160411.MonitorProfileGetInput;
78 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.alivenessmonitor.rev160411.MonitorProfileGetOutput;
79 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.alivenessmonitor.rev160411.MonitorProfileGetOutputBuilder;
80 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.alivenessmonitor.rev160411.MonitorProtocolType;
81 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.alivenessmonitor.rev160411.MonitorStartInput;
82 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.alivenessmonitor.rev160411.MonitorStartOutput;
83 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.alivenessmonitor.rev160411.MonitorStartOutputBuilder;
84 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.alivenessmonitor.rev160411.MonitorStatus;
85 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.alivenessmonitor.rev160411.MonitorStopInput;
86 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.alivenessmonitor.rev160411.MonitorStopOutput;
87 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.alivenessmonitor.rev160411.MonitorUnpauseInput;
88 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.alivenessmonitor.rev160411.MonitorUnpauseOutput;
89 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.alivenessmonitor.rev160411.MonitoringMode;
90 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.alivenessmonitor.rev160411.MonitoringStates;
91 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.alivenessmonitor.rev160411._interface.monitor.map.InterfaceMonitorEntry;
92 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.alivenessmonitor.rev160411._interface.monitor.map.InterfaceMonitorEntryBuilder;
93 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.alivenessmonitor.rev160411._interface.monitor.map.InterfaceMonitorEntryKey;
94 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.alivenessmonitor.rev160411.endpoint.EndpointType;
95 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.alivenessmonitor.rev160411.endpoint.endpoint.type.Interface;
96 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.alivenessmonitor.rev160411.monitor.configs.MonitoringInfo;
97 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.alivenessmonitor.rev160411.monitor.configs.MonitoringInfoBuilder;
98 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.alivenessmonitor.rev160411.monitor.event.EventData;
99 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.alivenessmonitor.rev160411.monitor.event.EventDataBuilder;
100 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.alivenessmonitor.rev160411.monitor.profile.create.input.Profile;
101 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.alivenessmonitor.rev160411.monitor.profiles.MonitorProfile;
102 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.alivenessmonitor.rev160411.monitor.profiles.MonitorProfileBuilder;
103 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.alivenessmonitor.rev160411.monitor.start.input.Config;
104 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.alivenessmonitor.rev160411.monitorid.key.map.MonitoridKeyEntry;
105 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.alivenessmonitor.rev160411.monitorid.key.map.MonitoridKeyEntryBuilder;
106 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.alivenessmonitor.rev160411.monitoring.states.MonitoringState;
107 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.alivenessmonitor.rev160411.monitoring.states.MonitoringStateBuilder;
108 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.idmanager.rev160406.AllocateIdInput;
109 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.idmanager.rev160406.AllocateIdInputBuilder;
110 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.idmanager.rev160406.AllocateIdOutput;
111 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.idmanager.rev160406.CreateIdPoolInput;
112 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.idmanager.rev160406.CreateIdPoolInputBuilder;
113 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.idmanager.rev160406.CreateIdPoolOutput;
114 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.idmanager.rev160406.IdManagerService;
115 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.idmanager.rev160406.ReleaseIdInput;
116 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.idmanager.rev160406.ReleaseIdInputBuilder;
117 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.idmanager.rev160406.ReleaseIdOutput;
118 import org.opendaylight.yang.gen.v1.urn.opendaylight.packet.service.rev130709.PacketInReason;
119 import org.opendaylight.yang.gen.v1.urn.opendaylight.packet.service.rev130709.PacketProcessingListener;
120 import org.opendaylight.yang.gen.v1.urn.opendaylight.packet.service.rev130709.PacketReceived;
121 import org.opendaylight.yang.gen.v1.urn.opendaylight.packet.service.rev130709.SendToController;
122 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
123 import org.opendaylight.yangtools.yang.common.RpcError.ErrorType;
124 import org.opendaylight.yangtools.yang.common.RpcResult;
125 import org.opendaylight.yangtools.yang.common.RpcResultBuilder;
126 import org.opendaylight.yangtools.yang.common.Uint32;
127 import org.slf4j.Logger;
128 import org.slf4j.LoggerFactory;
131 public class AlivenessMonitor extends AbstractClusteredSyncDataTreeChangeListener<MonitoringState>
132 implements AlivenessMonitorService, PacketProcessingListener, InterfaceStateListener {
134 private static final Logger LOG = LoggerFactory.getLogger(AlivenessMonitor.class);
136 private static final int THREAD_POOL_SIZE = 4;
137 private static final boolean INTERRUPT_TASK = true;
138 private static final int NO_DELAY = 0;
139 private static final Long INITIAL_COUNT = 0L;
140 private static final boolean CREATE_MISSING_PARENT = true;
141 private static final int INVALID_ID = 0;
143 private static class FutureCallbackImpl implements FutureCallback<Object> {
144 private final String message;
146 FutureCallbackImpl(String message) {
147 this.message = message;
151 public void onFailure(Throwable error) {
152 LOG.warn("Error in Datastore operation - {}", message, error);
156 public void onSuccess(Object result) {
157 LOG.debug("Success in Datastore operation - {}", message);
161 private class AlivenessMonitorTask implements Runnable {
162 private final MonitoringInfo monitoringInfo;
164 AlivenessMonitorTask(MonitoringInfo monitoringInfo) {
165 this.monitoringInfo = monitoringInfo;
170 LOG.trace("send monitor packet - {}", monitoringInfo);
171 sendMonitorPacket(monitoringInfo);
175 private final DataBroker dataBroker;
176 private final ManagedNewTransactionRunner txRunner;
177 private final IdManagerService idManager;
178 private final NotificationPublishService notificationPublishService;
179 private final AlivenessProtocolHandlerRegistry alivenessProtocolHandlerRegistry;
180 private final ConcurrentMap<Uint32, ScheduledFuture<?>> monitoringTasks = new ConcurrentHashMap<>();
181 private final ScheduledExecutorService monitorService;
182 private final ExecutorService callbackExecutorService;
183 private final LoadingCache<Uint32, String> monitorIdKeyCache;
184 private final ConcurrentMap<String, Semaphore> lockMap = new ConcurrentHashMap<>();
187 public AlivenessMonitor(@Reference final DataBroker dataBroker, final IdManagerService idManager,
188 @Reference final NotificationPublishService notificationPublishService,
189 AlivenessProtocolHandlerRegistry alivenessProtocolHandlerRegistry) {
190 super(dataBroker, LogicalDatastoreType.OPERATIONAL,
191 InstanceIdentifier.create(MonitoringStates.class).child(MonitoringState.class));
192 this.dataBroker = dataBroker;
193 this.txRunner = new ManagedNewTransactionRunnerImpl(dataBroker);
194 this.idManager = idManager;
195 this.notificationPublishService = notificationPublishService;
196 this.alivenessProtocolHandlerRegistry = alivenessProtocolHandlerRegistry;
198 monitorService = Executors.newListeningScheduledThreadPool(THREAD_POOL_SIZE, "Aliveness Monitoring Task", LOG);
199 callbackExecutorService = Executors.newFixedThreadPool(THREAD_POOL_SIZE, "Aliveness Callback Handler", LOG);
202 monitorIdKeyCache = CacheBuilder.newBuilder().build(new CacheLoader<Uint32, String>() {
204 public String load(Uint32 monitorId) {
206 return txRunner.<Operational, ExecutionException, Optional<MonitoridKeyEntry>>
207 applyInterruptiblyWithNewReadOnlyTransactionAndClose(OPERATIONAL,
208 tx -> tx.read(getMonitorMapId(monitorId)).get())
209 .map(MonitoridKeyEntry::getMonitorKey)
211 } catch (InterruptedException | ExecutionException e) {
212 LOG.error("Error reading monitor {}", monitorId, e);
218 LOG.info("{} started", getClass().getSimpleName());
223 public void close() {
224 monitorIdKeyCache.cleanUp();
225 monitorService.shutdown();
226 callbackExecutorService.shutdown();
227 LOG.info("{} close", getClass().getSimpleName());
230 Semaphore getLock(String key) {
231 return lockMap.get(key);
234 private void createIdPool() {
235 CreateIdPoolInput createPool = new CreateIdPoolInputBuilder()
236 .setPoolName(AlivenessMonitorConstants.MONITOR_IDPOOL_NAME)
237 .setLow(AlivenessMonitorConstants.MONITOR_IDPOOL_START)
238 .setHigh(AlivenessMonitorConstants.MONITOR_IDPOOL_SIZE).build();
239 Futures.addCallback(idManager.createIdPool(createPool), new FutureCallback<RpcResult<CreateIdPoolOutput>>() {
241 public void onFailure(Throwable error) {
242 LOG.error("Failed to create idPool for Aliveness Monitor Service", error);
246 public void onSuccess(@NonNull RpcResult<CreateIdPoolOutput> result) {
247 if (result.isSuccessful()) {
248 LOG.debug("Created IdPool for Aliveness Monitor Service");
250 LOG.error("RPC to create Idpool failed {}", result.getErrors());
253 }, callbackExecutorService);
256 private int getUniqueId(final String idKey) {
257 AllocateIdInput getIdInput = new AllocateIdInputBuilder()
258 .setPoolName(AlivenessMonitorConstants.MONITOR_IDPOOL_NAME).setIdKey(idKey).build();
260 Future<RpcResult<AllocateIdOutput>> result = idManager.allocateId(getIdInput);
263 RpcResult<AllocateIdOutput> rpcResult = result.get();
264 if (rpcResult.isSuccessful()) {
265 return rpcResult.getResult().getIdValue().intValue();
267 LOG.warn("RPC Call to Get Unique Id returned with Errors {}", rpcResult.getErrors());
269 } catch (InterruptedException | ExecutionException e) {
270 LOG.warn("Exception when getting Unique Id for key {}", idKey, e);
275 private void releaseId(String idKey) {
276 ReleaseIdInput idInput = new ReleaseIdInputBuilder().setPoolName(AlivenessMonitorConstants.MONITOR_IDPOOL_NAME)
277 .setIdKey(idKey).build();
279 ListenableFuture<RpcResult<ReleaseIdOutput>> result = idManager.releaseId(idInput);
280 RpcResult<ReleaseIdOutput> rpcResult = result.get();
281 if (!rpcResult.isSuccessful()) {
282 LOG.warn("RPC Call to release Id {} returned with Errors {}", idKey, rpcResult.getErrors());
284 } catch (InterruptedException | ExecutionException e) {
285 LOG.warn("Exception when releasing Id for key {}", idKey, e);
290 public void onPacketReceived(PacketReceived packetReceived) {
291 Class<? extends PacketInReason> pktInReason = packetReceived.getPacketInReason();
292 if (LOG.isTraceEnabled()) {
293 LOG.trace("Packet Received {}", packetReceived);
295 if (pktInReason != SendToController.class) {
298 byte[] data = packetReceived.getPayload();
299 Packet protocolPacket = null;
300 AlivenessProtocolHandler<Packet> livenessProtocolHandler = null;
302 if (!PacketUtil.isIpv6NaPacket(data)) {
303 Packet packetInFormatted;
304 Ethernet res = new Ethernet();
306 packetInFormatted = res.deserialize(data, 0, data.length * Byte.SIZE);
307 } catch (PacketException e) {
308 LOG.warn("Failed to decode packet: ", e);
312 if (packetInFormatted == null) {
313 LOG.warn("Failed to deserialize Received Packet from table {}", packetReceived.getTableId().getValue());
317 protocolPacket = packetInFormatted.getPayload();
318 if (protocolPacket == null) {
319 LOG.trace("Unsupported packet type. Ignoring the packet...");
322 if (LOG.isTraceEnabled()) {
323 LOG.trace("onPacketReceived packet: {}, packet class: {}", packetReceived, protocolPacket.getClass());
325 livenessProtocolHandler = getAlivenessProtocolHandler(protocolPacket.getClass());
327 } else if (PacketUtil.isIpv6NaPacket(data)) {
328 livenessProtocolHandler = getAlivenessProtocolHandler(MonitorProtocolType.Ipv6Nd);
330 if (livenessProtocolHandler == null) {
334 String monitorKey = livenessProtocolHandler.handlePacketIn(protocolPacket, packetReceived);
335 if (monitorKey != null) {
336 processReceivedMonitorKey(monitorKey);
338 LOG.debug("No monitorkey associated with received packet");
342 @SuppressWarnings("unchecked")
343 private AlivenessProtocolHandler<Packet> getAlivenessProtocolHandler(Class<? extends Packet> protocolHandlerClass) {
344 return (AlivenessProtocolHandler<Packet>) alivenessProtocolHandlerRegistry.getOpt(protocolHandlerClass);
347 @SuppressWarnings("unchecked")
348 private AlivenessProtocolHandler<Packet> getAlivenessProtocolHandler(MonitorProtocolType protocolType) {
349 return (AlivenessProtocolHandler<Packet>) alivenessProtocolHandlerRegistry.get(protocolType);
352 private void processReceivedMonitorKey(final String monitorKey) {
354 final MonitoringState currentState;
355 final MonitoringState state;
357 Result(MonitoringState currentState, MonitoringState state) {
358 this.currentState = currentState;
363 Preconditions.checkNotNull(monitorKey, "Monitor Key required to process the state");
365 LOG.debug("Processing monitorKey: {} for received packet", monitorKey);
367 final Semaphore lock = lockMap.get(monitorKey);
368 LOG.debug("Acquiring lock for monitor key : {} to process monitor packet", monitorKey);
371 txRunner.applyWithNewReadWriteTransactionAndSubmit(OPERATIONAL, tx -> {
372 Optional<MonitoringState> optState = tx.read(getMonitorStateId(monitorKey)).get();
373 if (optState.isPresent()) {
374 final MonitoringState currentState = optState.get();
376 if (LOG.isTraceEnabled()) {
377 LOG.trace("OnPacketReceived : Monitoring state from ODS : {} ", currentState);
380 // Long responsePendingCount = currentState.getResponsePendingCount();
382 // Need to relook at the pending count logic to support N
383 // out of M scenarios
384 // if (currentState.getState() != LivenessState.Up) {
385 // //Reset responsePendingCount when state changes from DOWN
387 // responsePendingCount = INITIAL_COUNT;
390 // if (responsePendingCount > INITIAL_COUNT) {
391 // responsePendingCount =
392 // currentState.getResponsePendingCount() - 1;
394 Long responsePendingCount = INITIAL_COUNT;
396 final MonitoringState state = new MonitoringStateBuilder().setMonitorKey(monitorKey)
397 .setState(LivenessState.Up).setResponsePendingCount(responsePendingCount).build();
398 tx.merge(getMonitorStateId(monitorKey), state);
400 return Optional.of(new Result(currentState, state));
402 if (LOG.isTraceEnabled()) {
403 LOG.trace("Monitoring State not available for key: {} to process the Packet received",
406 return Optional.<Result>empty();
408 }).addCallback(new FutureCallback<Optional<Result>>() {
410 public void onSuccess(Optional<Result> optResult) {
412 optResult.ifPresent(result -> {
413 final boolean stateChanged = result.currentState.getState() == LivenessState.Down
414 || result.currentState.getState() == LivenessState.Unknown;
416 // send notifications
417 if (LOG.isTraceEnabled()) {
418 LOG.trace("Sending notification for monitor Id : {} with Current State: {}",
419 result.currentState.getMonitorId(), LivenessState.Up);
421 publishNotification(result.currentState.getMonitorId(), LivenessState.Up);
423 if (LOG.isTraceEnabled()) {
424 LOG.trace("Successful in writing monitoring state {} to ODS", result.state);
431 public void onFailure(Throwable error) {
433 LOG.warn("Error in reading or writing monitoring state : {} to Datastore", monitorKey, error);
435 }, callbackExecutorService);
438 private String getUniqueKey(String interfaceName, String protocolType, EndpointType source,
439 EndpointType destination) {
440 StringBuilder builder = new StringBuilder().append(interfaceName).append(AlivenessMonitorConstants.SEPERATOR)
441 .append(protocolType);
442 if (source != null) {
443 builder.append(AlivenessMonitorConstants.SEPERATOR).append(AlivenessMonitorUtil.getIpAddress(source));
446 if (destination != null) {
447 builder.append(AlivenessMonitorConstants.SEPERATOR).append(AlivenessMonitorUtil.getIpAddress(destination));
449 return builder.toString();
453 public ListenableFuture<RpcResult<MonitorStartOutput>> monitorStart(MonitorStartInput input) {
454 RpcResultBuilder<MonitorStartOutput> rpcResultBuilder;
455 final Config in = input.getConfig();
456 Uint32 profileId = in.getProfileId();
457 LOG.debug("Monitor Start invoked with Config: {}, Profile Id: {}", in, profileId);
460 if (in.getMode() != MonitoringMode.OneOne) {
461 throw new UnsupportedConfigException(
462 "Unsupported Monitoring mode. Currently one-one mode is supported");
465 Optional<MonitorProfile> optProfile =
466 txRunner.<Operational, ExecutionException, Optional<MonitorProfile>>
467 applyInterruptiblyWithNewReadOnlyTransactionAndClose(OPERATIONAL,
468 tx -> tx.read(getMonitorProfileId(profileId)).get());
469 final MonitorProfile profile;
470 if (!optProfile.isPresent()) {
471 String errMsg = "No monitoring profile associated with Id: " + profileId;
472 LOG.error("Monitor start failed. {}", errMsg);
473 throw new RuntimeException(errMsg);
475 profile = optProfile.get();
478 final MonitorProtocolType protocolType = profile.getProtocolType();
480 String interfaceName = null;
481 EndpointType srcEndpointType = in.getSource().getEndpointType();
483 if (srcEndpointType instanceof Interface) {
484 Interface endPoint = (Interface) srcEndpointType;
485 interfaceName = endPoint.getInterfaceName();
487 throw new UnsupportedConfigException(
488 "Unsupported source Endpoint type. Only Interface Endpoint currently supported for monitoring");
491 if (Strings.isNullOrEmpty(interfaceName)) {
492 throw new RuntimeException("Interface Name not defined in the source Endpoint");
495 // Initially the support is for one monitoring per interface.
496 // Revisit the retrieving monitor id logic when the multiple
497 // monitoring for same interface is needed.
498 EndpointType destEndpointType = null;
499 if (in.getDestination() != null) {
500 destEndpointType = in.getDestination().getEndpointType();
502 String idKey = getUniqueKey(interfaceName, protocolType.toString(), srcEndpointType, destEndpointType);
503 final Uint32 monitorId = Uint32.valueOf(getUniqueId(idKey));
504 Optional<MonitoringInfo> optKey =
505 txRunner.<Operational, ExecutionException, Optional<MonitoringInfo>>
506 applyInterruptiblyWithNewReadOnlyTransactionAndClose(OPERATIONAL,
507 tx -> tx.read(getMonitoringInfoId(monitorId)).get());
508 final AlivenessProtocolHandler<?> handler;
509 if (optKey.isPresent()) {
510 String message = String.format(
511 "Monitoring for the interface %s with this configuration " + "is already registered.",
513 LOG.warn("Monitoring for the interface {} with this configuration is already registered.",
515 MonitorStartOutput output = new MonitorStartOutputBuilder().setMonitorId(monitorId).build();
516 rpcResultBuilder = RpcResultBuilder.success(output).withWarning(ErrorType.APPLICATION,
517 "config-exists", message);
518 return Futures.immediateFuture(rpcResultBuilder.build());
520 // Construct the monitor key
521 final MonitoringInfo monitoringInfo = new MonitoringInfoBuilder().setId(monitorId).setMode(in.getMode())
522 .setProfileId(profileId).setDestination(in.getDestination()).setSource(in.getSource()).build();
523 // Construct the initial monitor state
524 handler = alivenessProtocolHandlerRegistry.get(protocolType);
525 final String monitoringKey = handler.getUniqueMonitoringKey(monitoringInfo);
527 MonitoringStateBuilder monitoringStateBuilder =
528 new MonitoringStateBuilder().setMonitorKey(monitoringKey).setMonitorId(monitorId)
529 .setState(LivenessState.Unknown).setStatus(MonitorStatus.Started);
530 if (protocolType != MonitorProtocolType.Bfd) {
531 monitoringStateBuilder.setRequestCount(INITIAL_COUNT).setResponsePendingCount(INITIAL_COUNT);
533 MonitoringState monitoringState = monitoringStateBuilder.build();
535 txRunner.callWithNewWriteOnlyTransactionAndSubmit(OPERATIONAL, operTx -> {
536 operTx.put(getMonitoringInfoId(monitorId), monitoringInfo, CREATE_MISSING_PARENT);
537 LOG.debug("adding oper monitoring info {}", monitoringInfo);
539 operTx.put(getMonitorStateId(monitoringKey), monitoringState, CREATE_MISSING_PARENT);
540 LOG.debug("adding oper monitoring state {}", monitoringState);
542 MonitoridKeyEntry mapEntry = new MonitoridKeyEntryBuilder().setMonitorId(monitorId)
543 .setMonitorKey(monitoringKey).build();
544 operTx.put(getMonitorMapId(monitorId), mapEntry, CREATE_MISSING_PARENT);
545 LOG.debug("adding oper map entry {}", mapEntry);
546 }).addCallback(new FutureCallback<Object>() {
548 public void onFailure(Throwable error) {
549 String errorMsg = String.format("Adding Monitoring info: %s in Datastore failed",
551 LOG.warn("Adding Monitoring info: {} in Datastore failed", monitoringInfo, error);
552 throw new RuntimeException(errorMsg, error);
556 public void onSuccess(Object ignored) {
557 lockMap.put(monitoringKey, new Semaphore(1, true));
558 if (protocolType == MonitorProtocolType.Bfd) {
559 handler.startMonitoringTask(monitoringInfo);
563 LOG.debug("Scheduling monitor task for config: {}", in);
564 scheduleMonitoringTask(monitoringInfo, profile.getMonitorInterval());
566 }, callbackExecutorService);
569 associateMonitorIdWithInterface(monitorId, interfaceName);
571 MonitorStartOutput output = new MonitorStartOutputBuilder().setMonitorId(monitorId).build();
573 rpcResultBuilder = RpcResultBuilder.success(output);
574 } catch (UnsupportedConfigException | ExecutionException | InterruptedException e) {
575 LOG.error("Start Monitoring Failed. ", e);
576 rpcResultBuilder = RpcResultBuilder.<MonitorStartOutput>failed().withError(ErrorType.APPLICATION,
579 return Futures.immediateFuture(rpcResultBuilder.build());
582 private void associateMonitorIdWithInterface(final Uint32 monitorId, final String interfaceName) {
583 LOG.debug("associate monitor Id {} with interface {}", monitorId, interfaceName);
584 final ReadWriteTransaction tx = dataBroker.newReadWriteTransaction();
585 FluentFuture<Optional<InterfaceMonitorEntry>> readFuture = tx.read(LogicalDatastoreType.OPERATIONAL,
586 getInterfaceMonitorMapId(interfaceName));
587 FluentFuture<? extends CommitInfo> updateFuture = readFuture.transformAsync(optEntry -> {
588 if (optEntry.isPresent()) {
589 InterfaceMonitorEntry entry = optEntry.get();
590 List<Uint32> monitorIds1 =
591 entry.getMonitorIds() != null ? new ArrayList<>(entry.getMonitorIds()) : new ArrayList<>();
592 monitorIds1.add(monitorId);
593 InterfaceMonitorEntry newEntry1 = new InterfaceMonitorEntryBuilder()
594 .withKey(new InterfaceMonitorEntryKey(interfaceName)).setMonitorIds(monitorIds1).build();
595 tx.merge(LogicalDatastoreType.OPERATIONAL, getInterfaceMonitorMapId(interfaceName), newEntry1);
597 // Create new monitor entry
598 LOG.debug("Adding new interface-monitor association for interface {} with id {}", interfaceName,
600 List<Uint32> monitorIds2 = new ArrayList<>();
601 monitorIds2.add(monitorId);
602 InterfaceMonitorEntry newEntry2 = new InterfaceMonitorEntryBuilder()
603 .setInterfaceName(interfaceName).setMonitorIds(monitorIds2).build();
604 tx.put(LogicalDatastoreType.OPERATIONAL, getInterfaceMonitorMapId(interfaceName), newEntry2,
605 CREATE_MISSING_PARENT);
608 }, callbackExecutorService);
610 Futures.addCallback(updateFuture, new FutureCallbackImpl(
611 "Association of monitorId " + monitorId + " with Interface " + interfaceName),
612 MoreExecutors.directExecutor());
615 @SuppressFBWarnings(value = "UPM_UNCALLED_PRIVATE_METHOD",
616 justification = "https://github.com/spotbugs/spotbugs/issues/811")
617 private void scheduleMonitoringTask(MonitoringInfo monitoringInfo, Uint32 monitorInterval) {
618 AlivenessMonitorTask monitorTask = new AlivenessMonitorTask(monitoringInfo);
619 ScheduledFuture<?> scheduledFutureResult = monitorService.scheduleAtFixedRate(monitorTask, NO_DELAY,
620 monitorInterval.toJava(), TimeUnit.MILLISECONDS);
621 monitoringTasks.put(monitoringInfo.getId(), scheduledFutureResult);
625 public ListenableFuture<RpcResult<MonitorPauseOutput>> monitorPause(MonitorPauseInput input) {
626 LOG.debug("Monitor Pause operation invoked for monitor id: {}", input.getMonitorId());
627 SettableFuture<RpcResult<MonitorPauseOutput>> result = SettableFuture.create();
628 final Uint32 monitorId = input.getMonitorId();
630 // Set the monitoring status to Paused
631 updateMonitorStatusTo(monitorId, MonitorStatus.Paused, currentStatus -> currentStatus == MonitorStatus.Started);
633 if (stopMonitoringTask(monitorId)) {
634 result.set(RpcResultBuilder.<MonitorPauseOutput>success().build());
636 String errorMsg = "No Monitoring Task availble to pause for the given monitor id : " + monitorId;
637 LOG.error("Monitor Pause operation failed- {}", errorMsg);
638 result.set(RpcResultBuilder.<MonitorPauseOutput>failed()
639 .withError(ErrorType.APPLICATION, errorMsg).build());
646 public ListenableFuture<RpcResult<MonitorUnpauseOutput>> monitorUnpause(MonitorUnpauseInput input) {
647 LOG.debug("Monitor Unpause operation invoked for monitor id: {}", input.getMonitorId());
648 final SettableFuture<RpcResult<MonitorUnpauseOutput>> result = SettableFuture.create();
650 final Uint32 monitorId = input.getMonitorId();
651 final ReadTransaction tx = dataBroker.newReadOnlyTransaction();
652 FluentFuture<Optional<MonitoringInfo>> readInfoResult = tx.read(LogicalDatastoreType.OPERATIONAL,
653 getMonitoringInfoId(monitorId));
655 readInfoResult.addCallback(new FutureCallback<Optional<MonitoringInfo>>() {
658 public void onFailure(Throwable error) {
660 String msg = "Unable to read monitoring info associated with monitor id " + monitorId;
661 LOG.error("Monitor unpause Failed. {}", msg, error);
662 result.set(RpcResultBuilder.<MonitorUnpauseOutput>failed()
663 .withError(ErrorType.APPLICATION, msg, error).build());
667 public void onSuccess(@NonNull Optional<MonitoringInfo> optInfo) {
668 if (optInfo.isPresent()) {
669 final MonitoringInfo info = optInfo.get();
670 FluentFuture<Optional<MonitorProfile>> readProfile = tx.read(LogicalDatastoreType.OPERATIONAL,
671 getMonitorProfileId(info.getProfileId()));
672 readProfile.addCallback(new FutureCallback<Optional<MonitorProfile>>() {
675 public void onFailure(Throwable error) {
677 String msg = "Unable to read Monitoring profile associated with id " + info.getProfileId();
678 LOG.warn("Monitor unpause Failed. {}", msg, error);
679 result.set(RpcResultBuilder.<MonitorUnpauseOutput>failed()
680 .withError(ErrorType.APPLICATION, msg, error).build());
684 public void onSuccess(@NonNull Optional<MonitorProfile> optProfile) {
686 if (optProfile.isPresent()) {
687 updateMonitorStatusTo(monitorId, MonitorStatus.Started,
688 currentStatus -> (currentStatus == MonitorStatus.Paused
689 || currentStatus == MonitorStatus.Stopped));
690 MonitorProfile profile = optProfile.get();
691 LOG.debug("Monitor Resume - Scheduling monitoring task with Id: {}", monitorId);
692 MonitorProtocolType protocolType = profile.getProtocolType();
693 if (protocolType == MonitorProtocolType.Bfd) {
694 LOG.debug("disabling bfd for hwvtep tunnel montior id {}", monitorId);
695 ((HwVtepTunnelsStateHandler) alivenessProtocolHandlerRegistry.get(protocolType))
696 .resetMonitoringTask(true);
698 scheduleMonitoringTask(info, profile.getMonitorInterval());
700 result.set(RpcResultBuilder.<MonitorUnpauseOutput>success().build());
702 String msg = String.format("Monitoring profile associated with id %s is not present",
703 info.getProfileId());
704 LOG.warn("Monitor unpause Failed. {}", msg);
706 RpcResultBuilder.<MonitorUnpauseOutput>failed()
707 .withError(ErrorType.APPLICATION, msg).build());
710 }, callbackExecutorService);
713 String msg = String.format("Monitoring info associated with id %s is not present", monitorId);
714 LOG.warn("Monitor unpause Failed. {}", msg);
715 result.set(RpcResultBuilder.<MonitorUnpauseOutput>failed()
716 .withError(ErrorType.APPLICATION, msg).build());
719 }, callbackExecutorService);
724 private boolean stopMonitoringTask(Uint32 monitorId) {
725 return stopMonitoringTask(monitorId, INTERRUPT_TASK);
728 private boolean stopMonitoringTask(Uint32 monitorId, boolean interruptTask) {
729 Optional<MonitoringInfo> optInfo;
731 optInfo = txRunner.applyInterruptiblyWithNewReadOnlyTransactionAndClose(OPERATIONAL,
732 tx -> tx.read(getMonitoringInfoId(monitorId))).get();
733 } catch (InterruptedException | ExecutionException e) {
734 LOG.error("Error reading monitor {}", monitorId, e);
735 optInfo = Optional.empty();
737 if (!optInfo.isPresent()) {
738 LOG.warn("There is no monitoring info present for monitor id {}", monitorId);
741 MonitoringInfo monitoringInfo = optInfo.get();
742 Optional<MonitorProfile> optProfile;
744 optProfile = txRunner.applyInterruptiblyWithNewReadOnlyTransactionAndClose(OPERATIONAL,
745 tx -> tx.read(getMonitorProfileId(monitoringInfo.getProfileId()))).get();
746 } catch (InterruptedException | ExecutionException e) {
747 LOG.error("Error reading monitor profile for {}", monitorId, e);
748 optProfile = Optional.empty();
750 MonitorProtocolType protocolType = optProfile.get().getProtocolType();
751 if (protocolType == MonitorProtocolType.Bfd) {
752 LOG.debug("disabling bfd for hwvtep tunnel montior id {}", monitorId);
753 ((HwVtepTunnelsStateHandler) alivenessProtocolHandlerRegistry.get(protocolType))
754 .resetMonitoringTask(false);
757 ScheduledFuture<?> scheduledFutureResult = monitoringTasks.remove(monitorId);
758 if (scheduledFutureResult != null) {
759 scheduledFutureResult.cancel(interruptTask);
765 Optional<MonitorProfile> getMonitorProfile(Uint32 profileId) {
767 return txRunner.applyInterruptiblyWithNewReadOnlyTransactionAndClose(OPERATIONAL,
768 tx -> tx.read(getMonitorProfileId(profileId))).get();
769 } catch (InterruptedException | ExecutionException e) {
770 LOG.error("Error reading monitor profile for {}", profileId, e);
771 return Optional.empty();
775 void acquireLock(Semaphore lock) {
780 boolean acquiredLock = false;
782 acquiredLock = lock.tryAcquire(50, TimeUnit.MILLISECONDS);
783 } catch (InterruptedException e) {
784 LOG.warn("Thread interrupted when waiting to acquire the lock");
788 LOG.warn("Previous transaction did not complete in time. Releasing the lock to proceed");
792 LOG.trace("Lock acquired successfully");
793 } catch (InterruptedException e) {
794 LOG.warn("Acquire failed");
797 LOG.trace("Lock acquired successfully");
801 void releaseLock(Semaphore lock) {
807 @SuppressFBWarnings(value = "UPM_UNCALLED_PRIVATE_METHOD",
808 justification = "https://github.com/spotbugs/spotbugs/issues/811")
809 private void sendMonitorPacket(final MonitoringInfo monitoringInfo) {
810 // TODO: Handle interrupts
811 final Uint32 monitorId = monitoringInfo.getId();
812 final String monitorKey = monitorIdKeyCache.getUnchecked(monitorId);
813 if (monitorKey == null) {
814 LOG.warn("No monitor Key associated with id {} to send the monitor packet", monitorId);
817 LOG.debug("Sending monitoring packet for key: {}", monitorKey);
820 final MonitorProfile profile;
821 Optional<MonitorProfile> optProfile = getMonitorProfile(monitoringInfo.getProfileId());
822 if (optProfile.isPresent()) {
823 profile = optProfile.get();
825 LOG.warn("No monitor profile associated with id {}. " + "Could not send Monitor packet for monitor-id {}",
826 monitoringInfo.getProfileId(), monitorId);
830 final Semaphore lock = lockMap.get(monitorKey);
831 LOG.debug("Acquiring lock for monitor key : {} to send monitor packet", monitorKey);
834 final ReadWriteTransaction tx = dataBroker.newReadWriteTransaction();
835 FluentFuture<Optional<MonitoringState>> readResult = tx.read(LogicalDatastoreType.OPERATIONAL,
836 getMonitorStateId(monitorKey));
837 FluentFuture<? extends CommitInfo> writeResult = readResult.transformAsync(optState -> {
838 if (optState.isPresent()) {
839 MonitoringState state = optState.get();
841 // Increase the request count
842 Long requestCount = state.getRequestCount().toJava() + 1;
844 // Check with the monitor window
845 LivenessState currentLivenessState = state.getState();
847 // Increase the pending response count
848 long responsePendingCount = state.getResponsePendingCount().toJava();
849 if (responsePendingCount < profile.getMonitorWindow().toJava()) {
850 responsePendingCount = responsePendingCount + 1;
853 // Check with the failure threshold
854 if (responsePendingCount >= profile.getFailureThreshold().toJava()) {
855 // Change the state to down and notify
856 if (currentLivenessState != LivenessState.Down) {
857 LOG.debug("Response pending Count: {}, Failure threshold: {} for monitorId {}",
858 responsePendingCount, profile.getFailureThreshold(), state.getMonitorId());
859 LOG.info("Sending notification for monitor Id : {} with State: {}",
860 state.getMonitorId(), LivenessState.Down);
861 publishNotification(monitorId, LivenessState.Down);
862 currentLivenessState = LivenessState.Down;
863 // Reset requestCount when state changes
865 requestCount = INITIAL_COUNT;
869 // Update the ODS with state
870 MonitoringState updatedState = new MonitoringStateBuilder(/* state */)
871 .setMonitorKey(state.getMonitorKey()).setRequestCount(requestCount)
872 .setResponsePendingCount(responsePendingCount).setState(currentLivenessState).build();
873 tx.merge(LogicalDatastoreType.OPERATIONAL, getMonitorStateId(state.getMonitorKey()),
877 // Close the transaction
879 String errorMsg = String.format(
880 "Monitoring State associated with id %s is not present to send packet out.", monitorId);
881 return Futures.immediateFailedFuture(new RuntimeException(errorMsg));
883 }, callbackExecutorService);
885 writeResult.addCallback(new FutureCallback<CommitInfo>() {
887 public void onSuccess(CommitInfo noarg) {
888 // invoke packetout on protocol handler
889 AlivenessProtocolHandler<?> handler =
890 alivenessProtocolHandlerRegistry.getOpt(profile.getProtocolType());
891 if (handler != null) {
892 LOG.debug("Sending monitoring packet {}", monitoringInfo);
893 handler.startMonitoringTask(monitoringInfo);
899 public void onFailure(Throwable error) {
900 LOG.warn("Updating monitoring state for key: {} failed. Monitoring packet is not sent", monitorKey,
904 }, callbackExecutorService);
907 void publishNotification(final Uint32 monitorId, final LivenessState state) {
908 LOG.debug("Sending notification for id {} - state {}", monitorId, state);
909 EventData data = new EventDataBuilder().setMonitorId(monitorId).setMonitorState(state).build();
910 MonitorEvent event = new MonitorEventBuilder().setEventData(data).build();
911 final ListenableFuture<?> eventFuture = notificationPublishService.offerNotification(event);
912 Futures.addCallback(eventFuture, new FutureCallback<Object>() {
914 public void onFailure(Throwable error) {
915 LOG.warn("Error in notifying listeners for id {} - state {}", monitorId, state, error);
919 public void onSuccess(Object arg) {
920 LOG.trace("Successful in notifying listeners for id {} - state {}", monitorId, state);
922 }, callbackExecutorService);
926 public ListenableFuture<RpcResult<MonitorProfileCreateOutput>> monitorProfileCreate(
927 final MonitorProfileCreateInput input) {
928 LOG.debug("Monitor Profile Create operation - {}", input.getProfile());
929 final SettableFuture<RpcResult<MonitorProfileCreateOutput>> returnFuture = SettableFuture.create();
930 Profile profile = input.getProfile();
931 final Uint32 failureThreshold = profile.getFailureThreshold();
932 final Uint32 monitorInterval = profile.getMonitorInterval();
933 final Uint32 monitorWindow = profile.getMonitorWindow();
934 final MonitorProtocolType protocolType = profile.getProtocolType();
935 String idKey = getUniqueProfileKey(failureThreshold, monitorInterval, monitorWindow, protocolType);
936 final Uint32 profileId = Uint32.valueOf(getUniqueId(idKey));
938 final ReadWriteTransaction tx = dataBroker.newReadWriteTransaction();
939 FluentFuture<Optional<MonitorProfile>> readFuture = tx.read(LogicalDatastoreType.OPERATIONAL,
940 getMonitorProfileId(profileId));
941 FluentFuture<RpcResult<MonitorProfileCreateOutput>> resultFuture = readFuture.transformAsync(
943 if (optProfile.isPresent()) {
945 MonitorProfileCreateOutput output = new MonitorProfileCreateOutputBuilder()
946 .setProfileId(profileId).build();
947 String msg = String.format("Monitor profile %s already present for the given input", input);
949 returnFuture.set(RpcResultBuilder.success(output)
950 .withWarning(ErrorType.PROTOCOL, "profile-exists", msg).build());
952 final MonitorProfile monitorProfile = new MonitorProfileBuilder().setId(profileId)
953 .setFailureThreshold(failureThreshold).setMonitorInterval(monitorInterval)
954 .setMonitorWindow(monitorWindow).setProtocolType(protocolType).build();
955 tx.put(LogicalDatastoreType.OPERATIONAL, getMonitorProfileId(profileId), monitorProfile,
956 CREATE_MISSING_PARENT);
957 tx.commit().addCallback(new FutureCallback<CommitInfo>() {
959 public void onFailure(Throwable error) {
960 String msg = String.format("Error when storing monitorprofile %s in datastore",
962 LOG.error("Error when storing monitorprofile {} in datastore", monitorProfile, error);
963 returnFuture.set(RpcResultBuilder.<MonitorProfileCreateOutput>failed()
964 .withError(ErrorType.APPLICATION, msg, error).build());
968 public void onSuccess(CommitInfo noarg) {
969 MonitorProfileCreateOutput output = new MonitorProfileCreateOutputBuilder()
970 .setProfileId(profileId).build();
971 returnFuture.set(RpcResultBuilder.success(output).build());
973 }, callbackExecutorService);
976 }, callbackExecutorService);
977 resultFuture.addCallback(new FutureCallback<RpcResult<MonitorProfileCreateOutput>>() {
979 public void onFailure(Throwable error) {
980 // This would happen when any error happens during reading for
981 // monitoring profile
982 String msg = String.format("Error in creating monitorprofile - %s", input);
983 returnFuture.set(RpcResultBuilder.<MonitorProfileCreateOutput>failed()
984 .withError(ErrorType.APPLICATION, msg, error).build());
985 LOG.error("Error in creating monitorprofile - {} ", input, error);
989 public void onSuccess(RpcResult<MonitorProfileCreateOutput> result) {
990 LOG.debug("Successfully created monitor Profile {} ", input);
992 }, callbackExecutorService);
997 public ListenableFuture<RpcResult<MonitorProfileGetOutput>> monitorProfileGet(MonitorProfileGetInput input) {
998 LOG.debug("Monitor Profile Get operation for input profile- {}", input.getProfile());
999 RpcResultBuilder<MonitorProfileGetOutput> rpcResultBuilder;
1000 final Long profileId = getExistingProfileId(input);
1002 MonitorProfileGetOutputBuilder output = new MonitorProfileGetOutputBuilder().setProfileId(profileId);
1003 rpcResultBuilder = RpcResultBuilder.success();
1004 rpcResultBuilder.withResult(output.build());
1005 return Futures.immediateFuture(rpcResultBuilder.build());
1008 private Long getExistingProfileId(MonitorProfileGetInput input) {
1009 org.opendaylight.yang.gen.v1.urn.opendaylight.genius
1010 .alivenessmonitor.rev160411.monitor.profile.get.input.Profile profile = input
1012 final Uint32 failureThreshold = profile.getFailureThreshold();
1013 final Uint32 monitorInterval = profile.getMonitorInterval();
1014 final Uint32 monitorWindow = profile.getMonitorWindow();
1015 final MonitorProtocolType protocolType = profile.getProtocolType();
1016 LOG.debug("getExistingProfileId for profile : {}", input.getProfile());
1017 String idKey = getUniqueProfileKey(failureThreshold, monitorInterval, monitorWindow, protocolType);
1018 LOG.debug("Obtained existing profile ID for profile : {}", input.getProfile());
1019 return (long) getUniqueId(idKey);
1022 private String getUniqueProfileKey(Uint32 failureThreshold, Uint32 monitorInterval, Uint32 monitorWindow,
1023 MonitorProtocolType protocolType) {
1024 return String.valueOf(failureThreshold) + AlivenessMonitorConstants.SEPERATOR + monitorInterval
1025 + AlivenessMonitorConstants.SEPERATOR + monitorWindow + AlivenessMonitorConstants.SEPERATOR
1026 + protocolType + AlivenessMonitorConstants.SEPERATOR;
1030 public ListenableFuture<RpcResult<MonitorProfileDeleteOutput>> monitorProfileDelete(
1031 final MonitorProfileDeleteInput input) {
1032 LOG.debug("Monitor Profile delete for Id: {}", input.getProfileId());
1033 final SettableFuture<RpcResult<MonitorProfileDeleteOutput>> result = SettableFuture.create();
1034 final Uint32 profileId = input.getProfileId();
1035 final ReadWriteTransaction tx = dataBroker.newReadWriteTransaction();
1036 FluentFuture<Optional<MonitorProfile>> readFuture = tx.read(LogicalDatastoreType.OPERATIONAL,
1037 getMonitorProfileId(profileId));
1038 FluentFuture<RpcResult<MonitorProfileDeleteOutput>> writeFuture =
1039 readFuture.transformAsync(optProfile -> {
1040 if (optProfile.isPresent()) {
1041 tx.delete(LogicalDatastoreType.OPERATIONAL, getMonitorProfileId(profileId));
1042 tx.commit().addCallback(new FutureCallback<CommitInfo>() {
1044 public void onFailure(Throwable error) {
1045 String msg = String.format("Error when removing monitor profile %s from datastore",
1047 LOG.error("Error when removing monitor profile {} from datastore", profileId, error);
1048 result.set(RpcResultBuilder.<MonitorProfileDeleteOutput>failed()
1049 .withError(ErrorType.APPLICATION, msg, error)
1054 public void onSuccess(CommitInfo noarg) {
1055 MonitorProfile profile = optProfile.get();
1056 String id = getUniqueProfileKey(profile.getFailureThreshold(),
1057 profile.getMonitorInterval(), profile.getMonitorWindow(),
1058 profile.getProtocolType());
1060 result.set(RpcResultBuilder.<MonitorProfileDeleteOutput>success().build());
1062 }, callbackExecutorService);
1064 String msg = String.format("Monitor profile with Id: %s does not exist", profileId);
1066 result.set(RpcResultBuilder.<MonitorProfileDeleteOutput>success()
1067 .withWarning(ErrorType.PROTOCOL, "invalid-value", msg).build());
1070 }, callbackExecutorService);
1072 writeFuture.addCallback(new FutureCallback<RpcResult<MonitorProfileDeleteOutput>>() {
1075 public void onFailure(Throwable error) {
1076 String msg = String.format("Error when removing monitor profile %s from datastore", profileId);
1077 LOG.error("Error when removing monitor profile {} from datastore", profileId, error);
1078 result.set(RpcResultBuilder.<MonitorProfileDeleteOutput>failed()
1079 .withError(ErrorType.APPLICATION, msg, error).build());
1083 public void onSuccess(RpcResult<MonitorProfileDeleteOutput> noarg) {
1084 LOG.debug("Successfully removed Monitor Profile {}", profileId);
1086 }, callbackExecutorService);
1091 public ListenableFuture<RpcResult<MonitorStopOutput>> monitorStop(MonitorStopInput input) {
1092 LOG.debug("Monitor Stop operation for monitor id - {}", input.getMonitorId());
1093 SettableFuture<RpcResult<MonitorStopOutput>> result = SettableFuture.create();
1095 final Uint32 monitorId = input.getMonitorId();
1096 Optional<MonitoringInfo> optInfo;
1098 optInfo = txRunner.applyInterruptiblyWithNewReadOnlyTransactionAndClose(OPERATIONAL,
1099 tx -> tx.read(getMonitoringInfoId(monitorId))).get();
1100 } catch (InterruptedException | ExecutionException e) {
1101 LOG.error("Error reading monitor {}", monitorId, e);
1102 optInfo = Optional.empty();
1104 if (optInfo.isPresent()) {
1105 // Stop the monitoring task
1106 stopMonitoringTask(monitorId);
1108 String monitorKey = monitorIdKeyCache.getUnchecked(monitorId);
1110 // Cleanup the Data store
1111 txRunner.callWithNewWriteOnlyTransactionAndSubmit(OPERATIONAL, tx -> {
1112 if (monitorKey != null) {
1113 tx.delete(getMonitorStateId(monitorKey));
1114 monitorIdKeyCache.invalidate(monitorId.toJava());
1117 tx.delete(getMonitoringInfoId(monitorId));
1119 //Remove monitorid-key-map
1120 tx.delete(getMonitorMapId(monitorId));
1121 }).addCallback(new FutureCallbackImpl("Delete monitor state with Id " + monitorId),
1122 MoreExecutors.directExecutor());
1124 MonitoringInfo info = optInfo.get();
1125 String interfaceName = getInterfaceName(info.getSource().getEndpointType());
1126 if (interfaceName != null) {
1127 removeMonitorIdFromInterfaceAssociation(monitorId, interfaceName);
1129 releaseIdForMonitoringInfo(info);
1131 if (monitorKey != null) {
1132 lockMap.remove(monitorKey);
1135 result.set(RpcResultBuilder.<MonitorStopOutput>success().build());
1137 String errorMsg = "Do not have monitoring information associated with key " + monitorId;
1138 LOG.error("Delete monitoring operation Failed - {}", errorMsg);
1139 result.set(RpcResultBuilder.<MonitorStopOutput>failed().withError(ErrorType.APPLICATION, errorMsg).build());
1145 private void removeMonitorIdFromInterfaceAssociation(final Uint32 monitorId, final String interfaceName) {
1146 LOG.debug("Remove monitorId {} from Interface association {}", monitorId, interfaceName);
1147 final ReadWriteTransaction tx = dataBroker.newReadWriteTransaction();
1148 FluentFuture<Optional<InterfaceMonitorEntry>> readFuture = tx.read(LogicalDatastoreType.OPERATIONAL,
1149 getInterfaceMonitorMapId(interfaceName));
1150 FluentFuture<? extends CommitInfo> updateFuture = readFuture.transformAsync(optEntry -> {
1151 if (optEntry.isPresent()) {
1152 InterfaceMonitorEntry entry = optEntry.get();
1153 List<Uint32> monitorIds =
1154 entry.getMonitorIds() != null ? new ArrayList<>(entry.getMonitorIds()) : new ArrayList<>();
1155 monitorIds.remove(monitorId);
1156 if (monitorIds.isEmpty()) {
1157 tx.delete(LogicalDatastoreType.OPERATIONAL, getInterfaceMonitorMapId(interfaceName));
1159 InterfaceMonitorEntry newEntry = new InterfaceMonitorEntryBuilder(entry)
1160 .withKey(new InterfaceMonitorEntryKey(interfaceName)).setMonitorIds(monitorIds).build();
1161 tx.put(LogicalDatastoreType.OPERATIONAL, getInterfaceMonitorMapId(interfaceName), newEntry,
1162 CREATE_MISSING_PARENT);
1166 LOG.warn("No Interface map entry found {} to remove monitorId {}", interfaceName, monitorId);
1168 return Futures.immediateFuture(null);
1170 }, MoreExecutors.directExecutor());
1172 updateFuture.addCallback(new FutureCallbackImpl(
1173 String.format("Dis-association of monitorId %s with Interface %s", monitorId, interfaceName)),
1174 MoreExecutors.directExecutor());
1177 private void releaseIdForMonitoringInfo(MonitoringInfo info) {
1178 Uint32 monitorId = info.getId();
1179 EndpointType source = info.getSource().getEndpointType();
1180 String interfaceName = getInterfaceName(source);
1181 if (!Strings.isNullOrEmpty(interfaceName)) {
1182 Optional<MonitorProfile> optProfile;
1184 optProfile = txRunner.applyInterruptiblyWithNewReadOnlyTransactionAndClose(OPERATIONAL,
1185 tx -> tx.read(getMonitorProfileId(info.getProfileId()))).get();
1186 } catch (InterruptedException | ExecutionException e) {
1187 LOG.error("Error reading monitor profile for {}", info.getProfileId(), e);
1188 optProfile = Optional.empty();
1190 if (optProfile.isPresent()) {
1191 MonitorProtocolType protocolType = optProfile.get().getProtocolType();
1192 EndpointType destination = info.getDestination() != null ? info.getDestination().getEndpointType()
1194 String idKey = getUniqueKey(interfaceName, protocolType.toString(), source, destination);
1197 LOG.warn("Could not release monitorId {}. No profile associated with it", monitorId);
1202 private String getInterfaceName(EndpointType endpoint) {
1203 String interfaceName = null;
1204 if (endpoint instanceof Interface) {
1205 interfaceName = ((Interface) endpoint).getInterfaceName();
1207 return interfaceName;
1210 private void stopMonitoring(Uint32 monitorId) {
1211 updateMonitorStatusTo(monitorId, MonitorStatus.Stopped,
1212 currentStatus -> currentStatus != MonitorStatus.Stopped);
1213 if (!stopMonitoringTask(monitorId)) {
1214 LOG.warn("No monitoring task running to perform cancel operation for monitorId {}", monitorId);
1218 private void updateMonitorStatusTo(final Uint32 monitorId, final MonitorStatus newStatus,
1219 final Predicate<MonitorStatus> isValidStatus) {
1220 final String monitorKey = monitorIdKeyCache.getUnchecked(monitorId);
1221 if (monitorKey == null) {
1222 LOG.warn("No monitor Key associated with id {} to change the monitor status to {}", monitorId, newStatus);
1225 final ReadWriteTransaction tx = dataBroker.newReadWriteTransaction();
1227 FluentFuture<Optional<MonitoringState>> readResult = tx.read(LogicalDatastoreType.OPERATIONAL,
1228 getMonitorStateId(monitorKey));
1230 FluentFuture<? extends CommitInfo> writeResult = readResult.transformAsync(optState -> {
1231 if (optState.isPresent()) {
1232 MonitoringState state = optState.get();
1233 if (isValidStatus.apply(state.getStatus())) {
1234 MonitoringState updatedState = new MonitoringStateBuilder().setMonitorKey(monitorKey)
1235 .setStatus(newStatus).build();
1236 tx.merge(LogicalDatastoreType.OPERATIONAL, getMonitorStateId(monitorKey), updatedState);
1238 LOG.warn("Invalid Monitoring status {}, cannot be updated to {} for monitorId {}",
1239 state.getStatus(), newStatus, monitorId);
1242 LOG.warn("No associated monitoring state data available to update the status to {} for {}",
1243 newStatus, monitorId);
1246 }, MoreExecutors.directExecutor());
1248 writeResult.addCallback(new FutureCallbackImpl("Monitor status update for " + monitorId + " to " + newStatus),
1249 MoreExecutors.directExecutor());
1252 private void resumeMonitoring(final Uint32 monitorId) {
1253 final ReadTransaction tx = dataBroker.newReadOnlyTransaction();
1254 FluentFuture<Optional<MonitoringInfo>> readInfoResult = tx.read(LogicalDatastoreType.OPERATIONAL,
1255 getMonitoringInfoId(monitorId));
1257 readInfoResult.addCallback(new FutureCallback<Optional<MonitoringInfo>>() {
1260 public void onFailure(Throwable error) {
1261 String msg = "Unable to read monitoring info associated with monitor id " + monitorId;
1262 LOG.error("Monitor resume Failed. {}", msg, error);
1267 public void onSuccess(@NonNull Optional<MonitoringInfo> optInfo) {
1268 if (optInfo.isPresent()) {
1269 final MonitoringInfo info = optInfo.get();
1270 ListenableFuture<Optional<MonitorProfile>> readProfile = tx.read(LogicalDatastoreType.OPERATIONAL,
1271 getMonitorProfileId(info.getProfileId()));
1272 Futures.addCallback(readProfile, new FutureCallback<Optional<MonitorProfile>>() {
1275 public void onFailure(Throwable error) {
1276 String msg = "Unable to read Monitoring profile associated with id " + info.getProfileId();
1277 LOG.warn("Monitor resume Failed. {}", msg, error);
1282 public void onSuccess(@NonNull Optional<MonitorProfile> optProfile) {
1284 if (optProfile.isPresent()) {
1285 updateMonitorStatusTo(monitorId, MonitorStatus.Started,
1286 currentStatus -> currentStatus != MonitorStatus.Started);
1287 MonitorProfile profile = optProfile.get();
1288 LOG.debug("Monitor Resume - Scheduling monitoring task for Id: {}", monitorId);
1289 scheduleMonitoringTask(info, profile.getMonitorInterval());
1291 String msg = String.format("Monitoring profile associated with id %s is not present",
1292 info.getProfileId());
1293 LOG.warn("Monitor resume Failed. {}", msg);
1296 }, MoreExecutors.directExecutor());
1299 String msg = String.format("Monitoring info associated with id %s is not present", monitorId);
1300 LOG.warn("Monitor resume Failed. {}", msg);
1303 }, MoreExecutors.directExecutor());
1307 public void onInterfaceStateUp(String interfaceName) {
1308 List<Uint32> monitorIds = getMonitorIds(interfaceName);
1309 if (monitorIds.isEmpty()) {
1310 LOG.warn("Could not get monitorId for interface: {}", interfaceName);
1313 for (Uint32 monitorId : monitorIds) {
1314 LOG.debug("Resume monitoring on interface: {} with monitorId: {}", interfaceName, monitorId);
1315 resumeMonitoring(monitorId);
1320 public void onInterfaceStateDown(String interfaceName) {
1321 List<Uint32> monitorIds = getMonitorIds(interfaceName);
1322 if (monitorIds.isEmpty()) {
1323 LOG.warn("Could not get monitorIds for interface: {}", interfaceName);
1326 for (Uint32 monitorId : monitorIds) {
1327 LOG.debug("Suspend monitoring on interface: {} with monitorId: {}", interfaceName, monitorId);
1328 stopMonitoring(monitorId);
1332 private List<Uint32> getMonitorIds(String interfaceName) {
1334 return txRunner.applyInterruptiblyWithNewReadOnlyTransactionAndClose(OPERATIONAL,
1335 tx -> tx.read(getInterfaceMonitorMapId(interfaceName))).get().map(
1336 InterfaceMonitorEntry::getMonitorIds).orElse(Collections.emptyList());
1337 } catch (InterruptedException | ExecutionException e) {
1338 LOG.error("Error retrieving monitor ids for {}", interfaceName, e);
1339 return Collections.emptyList();
1343 //handle monitor stop
1345 public void remove(@NonNull InstanceIdentifier<MonitoringState> instanceIdentifier,
1346 @NonNull MonitoringState removedDataObject) {
1347 final Uint32 monitorId = removedDataObject.getMonitorId();
1348 LOG.debug("Monitor State remove listener invoked for monitor id: {}", monitorId);
1350 if (removedDataObject.getStatus() != MonitorStatus.Paused) {
1351 ScheduledFuture<?> scheduledFutureResult = monitoringTasks.get(monitorId);
1352 if (scheduledFutureResult != null) {
1353 LOG.debug("Stopping the task for Monitor id: {}", monitorId);
1354 stopMonitoringTask(monitorId);
1359 //handle monitor pause
1361 public void update(@NonNull InstanceIdentifier<MonitoringState> instanceIdentifier,
1362 @NonNull MonitoringState originalDataObject,
1363 @NonNull MonitoringState updatedDataObject) {
1364 final Uint32 monitorId = updatedDataObject.getMonitorId();
1365 LOG.debug("Monitor State update listener invoked for monitor id: {}", monitorId);
1367 if (updatedDataObject.getStatus() == MonitorStatus.Paused
1368 && originalDataObject.getStatus() != MonitorStatus.Paused) {
1369 ScheduledFuture<?> scheduledFutureResult = monitoringTasks.get(monitorId);
1370 if (scheduledFutureResult != null) {
1371 LOG.debug("Stopping the task for Monitor id: {}", monitorId);
1372 stopMonitoringTask(monitorId);