2 * Copyright (c) 2015 - 2016 Ericsson India Global Services Pvt Ltd. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 package org.opendaylight.vpnservice.alivenessmonitor.internal;
10 import java.lang.Thread.UncaughtExceptionHandler;
11 import java.util.ArrayList;
12 import java.util.Collections;
13 import java.util.EnumMap;
14 import java.util.HashMap;
15 import java.util.List;
17 import java.util.concurrent.ConcurrentHashMap;
18 import java.util.concurrent.ConcurrentMap;
19 import java.util.concurrent.ExecutionException;
20 import java.util.concurrent.ExecutorService;
21 import java.util.concurrent.Executors;
22 import java.util.concurrent.Future;
23 import java.util.concurrent.ScheduledFuture;
24 import java.util.concurrent.ScheduledExecutorService;
25 import java.util.concurrent.Semaphore;
26 import java.util.concurrent.ThreadFactory;
27 import java.util.concurrent.TimeUnit;
29 import org.opendaylight.controller.liblldp.NetUtils;
30 import org.opendaylight.controller.liblldp.Packet;
31 import org.opendaylight.controller.md.sal.binding.api.DataBroker;
32 import org.opendaylight.controller.md.sal.binding.api.NotificationPublishService;
33 import org.opendaylight.controller.md.sal.binding.api.ReadOnlyTransaction;
34 import org.opendaylight.controller.md.sal.binding.api.ReadWriteTransaction;
35 import org.opendaylight.controller.md.sal.binding.api.WriteTransaction;
36 import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
37 import org.opendaylight.vpnservice.mdsalutil.packet.Ethernet;
38 import org.opendaylight.yang.gen.v1.urn.opendaylight.vpnservice.alivenessmonitor.rev150629.AlivenessMonitorService;
39 import org.opendaylight.yang.gen.v1.urn.opendaylight.vpnservice.alivenessmonitor.rev150629.EtherTypes;
40 import org.opendaylight.yang.gen.v1.urn.opendaylight.vpnservice.alivenessmonitor.rev150629.LivenessState;
41 import org.opendaylight.yang.gen.v1.urn.opendaylight.vpnservice.alivenessmonitor.rev150629.MonitorEvent;
42 import org.opendaylight.yang.gen.v1.urn.opendaylight.vpnservice.alivenessmonitor.rev150629.MonitorEventBuilder;
43 import org.opendaylight.yang.gen.v1.urn.opendaylight.vpnservice.alivenessmonitor.rev150629.MonitorPauseInput;
44 import org.opendaylight.yang.gen.v1.urn.opendaylight.vpnservice.alivenessmonitor.rev150629.MonitorProfileCreateInput;
45 import org.opendaylight.yang.gen.v1.urn.opendaylight.vpnservice.alivenessmonitor.rev150629.MonitorProfileCreateOutput;
46 import org.opendaylight.yang.gen.v1.urn.opendaylight.vpnservice.alivenessmonitor.rev150629.MonitorProfileCreateOutputBuilder;
47 import org.opendaylight.yang.gen.v1.urn.opendaylight.vpnservice.alivenessmonitor.rev150629.MonitorProfileDeleteInput;
48 import org.opendaylight.yang.gen.v1.urn.opendaylight.vpnservice.alivenessmonitor.rev150629.MonitorStartInput;
49 import org.opendaylight.yang.gen.v1.urn.opendaylight.vpnservice.alivenessmonitor.rev150629.MonitorStartOutput;
50 import org.opendaylight.yang.gen.v1.urn.opendaylight.vpnservice.alivenessmonitor.rev150629.MonitorStartOutputBuilder;
51 import org.opendaylight.yang.gen.v1.urn.opendaylight.vpnservice.alivenessmonitor.rev150629.MonitorStatus;
52 import org.opendaylight.yang.gen.v1.urn.opendaylight.vpnservice.alivenessmonitor.rev150629.MonitorStopInput;
53 import org.opendaylight.yang.gen.v1.urn.opendaylight.vpnservice.alivenessmonitor.rev150629.MonitorUnpauseInput;
54 import org.opendaylight.yang.gen.v1.urn.opendaylight.vpnservice.alivenessmonitor.rev150629.MonitoringMode;
55 import org.opendaylight.yang.gen.v1.urn.opendaylight.vpnservice.alivenessmonitor.rev150629.MonitorProfileGetInput;
56 import org.opendaylight.yang.gen.v1.urn.opendaylight.vpnservice.alivenessmonitor.rev150629.MonitorProfileGetOutput;
57 import org.opendaylight.yang.gen.v1.urn.opendaylight.vpnservice.alivenessmonitor.rev150629.MonitorProfileGetOutputBuilder;
58 import org.opendaylight.yang.gen.v1.urn.opendaylight.vpnservice.alivenessmonitor.rev150629._interface.monitor.map.InterfaceMonitorEntry;
59 import org.opendaylight.yang.gen.v1.urn.opendaylight.vpnservice.alivenessmonitor.rev150629._interface.monitor.map.InterfaceMonitorEntryBuilder;
60 import org.opendaylight.yang.gen.v1.urn.opendaylight.vpnservice.alivenessmonitor.rev150629._interface.monitor.map.InterfaceMonitorEntryKey;
61 import org.opendaylight.yang.gen.v1.urn.opendaylight.vpnservice.alivenessmonitor.rev150629.endpoint.EndpointType;
62 import org.opendaylight.yang.gen.v1.urn.opendaylight.vpnservice.alivenessmonitor.rev150629.endpoint.endpoint.type.Interface;
63 import org.opendaylight.yang.gen.v1.urn.opendaylight.vpnservice.alivenessmonitor.rev150629.endpoint.endpoint.type.IpAddress;
64 import org.opendaylight.yang.gen.v1.urn.opendaylight.vpnservice.alivenessmonitor.rev150629.monitor.configs.MonitoringInfo;
65 import org.opendaylight.yang.gen.v1.urn.opendaylight.vpnservice.alivenessmonitor.rev150629.monitor.configs.MonitoringInfoBuilder;
66 import org.opendaylight.yang.gen.v1.urn.opendaylight.vpnservice.alivenessmonitor.rev150629.monitor.event.EventData;
67 import org.opendaylight.yang.gen.v1.urn.opendaylight.vpnservice.alivenessmonitor.rev150629.monitor.event.EventDataBuilder;
68 import org.opendaylight.yang.gen.v1.urn.opendaylight.vpnservice.alivenessmonitor.rev150629.monitor.profile.create.input.Profile;
69 import org.opendaylight.yang.gen.v1.urn.opendaylight.vpnservice.alivenessmonitor.rev150629.monitor.profiles.MonitorProfile;
70 import org.opendaylight.yang.gen.v1.urn.opendaylight.vpnservice.alivenessmonitor.rev150629.monitor.profiles.MonitorProfileBuilder;
71 import org.opendaylight.yang.gen.v1.urn.opendaylight.vpnservice.alivenessmonitor.rev150629.monitor.start.input.Config;
72 import org.opendaylight.yang.gen.v1.urn.opendaylight.vpnservice.alivenessmonitor.rev150629.monitorid.key.map.MonitoridKeyEntry;
73 import org.opendaylight.yang.gen.v1.urn.opendaylight.vpnservice.alivenessmonitor.rev150629.monitorid.key.map.MonitoridKeyEntryBuilder;
74 import org.opendaylight.yang.gen.v1.urn.opendaylight.vpnservice.alivenessmonitor.rev150629.monitoring.states.MonitoringState;
75 import org.opendaylight.yang.gen.v1.urn.opendaylight.vpnservice.alivenessmonitor.rev150629.monitoring.states.MonitoringStateBuilder;
76 import org.opendaylight.yang.gen.v1.urn.opendaylight.vpnservice.idmanager.rev150403.AllocateIdInput;
77 import org.opendaylight.yang.gen.v1.urn.opendaylight.vpnservice.idmanager.rev150403.AllocateIdInputBuilder;
78 import org.opendaylight.yang.gen.v1.urn.opendaylight.vpnservice.idmanager.rev150403.AllocateIdOutput;
79 import org.opendaylight.yang.gen.v1.urn.opendaylight.vpnservice.idmanager.rev150403.CreateIdPoolInput;
80 import org.opendaylight.yang.gen.v1.urn.opendaylight.vpnservice.idmanager.rev150403.CreateIdPoolInputBuilder;
81 import org.opendaylight.yang.gen.v1.urn.opendaylight.vpnservice.idmanager.rev150403.IdManagerService;
82 import org.opendaylight.yang.gen.v1.urn.opendaylight.vpnservice.idmanager.rev150403.ReleaseIdInput;
83 import org.opendaylight.yang.gen.v1.urn.opendaylight.vpnservice.idmanager.rev150403.ReleaseIdInputBuilder;
84 import org.opendaylight.yang.gen.v1.urn.opendaylight.vpnservice.interfacemgr.rpcs.rev151003.OdlInterfaceRpcService;
85 import org.opendaylight.yang.gen.v1.urn.opendaylight.packet.service.rev130709.PacketInReason;
86 import org.opendaylight.yang.gen.v1.urn.opendaylight.packet.service.rev130709.PacketProcessingListener;
87 import org.opendaylight.yang.gen.v1.urn.opendaylight.packet.service.rev130709.PacketProcessingService;
88 import org.opendaylight.yang.gen.v1.urn.opendaylight.packet.service.rev130709.PacketReceived;
89 import org.opendaylight.yang.gen.v1.urn.opendaylight.packet.service.rev130709.SendToController;
90 import org.opendaylight.yangtools.yang.binding.DataObject;
91 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
92 import org.opendaylight.yangtools.yang.common.RpcError.ErrorType;
93 import org.opendaylight.yangtools.yang.common.RpcResult;
94 import org.opendaylight.yangtools.yang.common.RpcResultBuilder;
95 import org.slf4j.Logger;
96 import org.slf4j.LoggerFactory;
98 import com.google.common.base.Optional;
99 import com.google.common.base.Preconditions;
100 import com.google.common.base.Predicate;
101 import com.google.common.base.Strings;
102 import com.google.common.cache.CacheBuilder;
103 import com.google.common.cache.CacheLoader;
104 import com.google.common.cache.LoadingCache;
105 import com.google.common.util.concurrent.AsyncFunction;
106 import com.google.common.util.concurrent.FutureCallback;
107 import com.google.common.util.concurrent.Futures;
108 import com.google.common.util.concurrent.JdkFutureAdapters;
109 import com.google.common.util.concurrent.ListenableFuture;
110 import com.google.common.util.concurrent.SettableFuture;
111 import com.google.common.util.concurrent.ThreadFactoryBuilder;
113 import static org.opendaylight.vpnservice.alivenessmonitor.internal.AlivenessMonitorUtil.*;
/**
 * Aliveness monitoring service. Implements the aliveness-monitor RPC interface and the
 * packet-in listener, keeps one periodic task per monitor id and tracks monitoring
 * state in the operational datastore.
 */
115 public class AlivenessMonitor implements AlivenessMonitorService, PacketProcessingListener,
116 ServiceProvider, InterfaceStateListener, AutoCloseable {
117 private static final Logger LOG = LoggerFactory.getLogger(AlivenessMonitor.class);
118 private final DataBroker broker;
// Collaborating services; each one is injected through its setter.
119 private IdManagerService idManager;
120 private PacketProcessingService packetProcessingService;
121 private NotificationPublishService notificationPublishService;
122 private OdlInterfaceRpcService interfaceManager;
// Protocol handlers indexed two ways by registerHandler: by decoded payload class
// (looked up in onPacketReceived) and by ether type (looked up in monitorStart).
123 private Map<Class<?>, AlivenessProtocolHandler> packetTypeToProtocolHandler;
124 private Map<EtherTypes, AlivenessProtocolHandler> ethTypeToProtocolHandler;
// Monitor id -> handle of the periodic task scheduled for that monitor.
125 private ConcurrentMap<Long, ScheduledFuture<?>> monitoringTasks;
// Monitor id -> monitor key, loaded on demand from the operational datastore.
126 private LoadingCache<Long, String> monitorIdKeyCache;
// Scheduler that runs the periodic monitor tasks.
127 private ScheduledExecutorService monitorService;
// Executor handed to future callbacks (see monitorUnpause).
128 private ExecutorService callbackExecutorService;
// Size used for both thread pools created in the constructor.
130 private static final int THREAD_POOL_SIZE = 4;
// Passed to ScheduledFuture.cancel when a monitoring task is stopped.
131 private static final boolean INTERRUPT_TASK = true;
// Initial delay used when scheduling a monitoring task.
132 private static final int NO_DELAY = 0;
// Starting value for the request and response-pending counters.
133 private static final Long INITIAL_COUNT = 0L;
// Passed to tx.put so that missing parent nodes are created.
134 private static final boolean CREATE_MISSING_PARENT = true;
135 private static final int INVALID_ID = 0;
// Monitor key -> semaphore guarding datastore updates of that monitor's state.
136 private ConcurrentMap<String, Semaphore> lockMap = new ConcurrentHashMap<>();
138 private class FutureCallbackImpl implements FutureCallback<Void> {
139 private String message;
140 public FutureCallbackImpl(String message) {
141 this.message = message;
145 public void onFailure(Throwable error) {
146 LOG.warn("Error in Datastore operation - {}", message, error);
150 public void onSuccess(Void result) {
151 LOG.debug("Success in Datastore operation - {}", message);
155 private class AlivenessMonitorTask implements Runnable {
156 private MonitoringInfo monitoringInfo;
158 public AlivenessMonitorTask(MonitoringInfo monitoringInfo) {
159 this.monitoringInfo = monitoringInfo;
164 if(LOG.isTraceEnabled()) {
165 LOG.trace("send monitor packet - {}", monitoringInfo);
167 sendMonitorPacket(monitoringInfo);
/**
 * Creates the monitor: sets up the two (initially empty) protocol-handler maps, the
 * scheduler for periodic monitor tasks, the callback executor and the task registry.
 * Both thread pools hold THREAD_POOL_SIZE threads built by getMonitoringThreadFactory.
 *
 * @param dataBroker datastore broker used by this service
 */
171 public AlivenessMonitor(DataBroker dataBroker) {
173 ethTypeToProtocolHandler = new EnumMap<>(EtherTypes.class);
174 packetTypeToProtocolHandler = new HashMap<>();
175 monitorService = Executors.newScheduledThreadPool(THREAD_POOL_SIZE,
176 getMonitoringThreadFactory("Aliveness Monitoring Task"));
177 callbackExecutorService = Executors.newFixedThreadPool(THREAD_POOL_SIZE,
178 getMonitoringThreadFactory("Aliveness Callback Handler"));
179 monitoringTasks = new ConcurrentHashMap<>();
183 private ThreadFactory getMonitoringThreadFactory(String threadNameFormat) {
184 ThreadFactoryBuilder builder = new ThreadFactoryBuilder();
185 builder.setNameFormat(threadNameFormat);
186 builder.setUncaughtExceptionHandler( new UncaughtExceptionHandler() {
188 public void uncaughtException(Thread t, Throwable e) {
189 LOG.error("Received Uncaught Exception event in Thread: {}", t.getName(), e);
192 return builder.build();
195 private void initilizeCache() {
196 monitorIdKeyCache = CacheBuilder.newBuilder()
197 .build(new CacheLoader<Long, String>() {
199 public String load(Long monitorId) throws Exception {
200 String monitorKey = null;
201 Optional<MonitoridKeyEntry> optKey = read(LogicalDatastoreType.OPERATIONAL, getMonitorMapId(monitorId));
202 if(optKey.isPresent()) {
203 monitorKey = optKey.get().getMonitorKey();
211 public void close() throws Exception {
212 monitorIdKeyCache.cleanUp();
213 monitorService.shutdown();
214 callbackExecutorService.shutdown();
218 public DataBroker getDataBroker() {
223 public OdlInterfaceRpcService getInterfaceManager() {
224 return interfaceManager;
227 public void setPacketProcessingService(PacketProcessingService pktProcessingService) {
228 this.packetProcessingService = pktProcessingService;
231 public void setNotificationPublishService(NotificationPublishService notificationPublishService) {
232 this.notificationPublishService = notificationPublishService;
235 public void setInterfaceManager(OdlInterfaceRpcService interfaceManager) {
236 this.interfaceManager = interfaceManager;
/**
 * Injects the id manager service used to create the monitor id pool and to
 * allocate and release monitor ids.
 *
 * @param idManager service to store
 */
239 public void setIdManager(IdManagerService idManager) {
240 this.idManager = idManager;
244 public void registerHandler(EtherTypes etherType, AlivenessProtocolHandler protocolHandler) {
245 ethTypeToProtocolHandler.put(etherType, protocolHandler);
246 packetTypeToProtocolHandler.put(protocolHandler.getPacketClass(), protocolHandler);
249 private void createIdPool() {
250 CreateIdPoolInput createPool = new CreateIdPoolInputBuilder()
251 .setPoolName(AlivenessMonitorConstants.MONITOR_IDPOOL_NAME)
252 .setLow(AlivenessMonitorConstants.MONITOR_IDPOOL_START)
253 .setHigh(AlivenessMonitorConstants.MONITOR_IDPOOL_SIZE)
255 Future<RpcResult<Void>> result = idManager.createIdPool(createPool);
256 Futures.addCallback(JdkFutureAdapters.listenInPoolThread(result), new FutureCallback<RpcResult<Void>>() {
259 public void onFailure(Throwable error) {
260 LOG.error("Failed to create idPool for Aliveness Monitor Service",error);
264 public void onSuccess(RpcResult<Void> result) {
265 if(result.isSuccessful()) {
266 LOG.debug("Created IdPool for Aliveness Monitor Service");
268 LOG.error("RPC to create Idpool failed {}", result.getErrors());
274 private int getUniqueId(final String idKey) {
275 AllocateIdInput getIdInput = new AllocateIdInputBuilder()
276 .setPoolName(AlivenessMonitorConstants.MONITOR_IDPOOL_NAME)
277 .setIdKey(idKey).build();
279 Future<RpcResult<AllocateIdOutput>> result = idManager.allocateId(getIdInput);
282 RpcResult<AllocateIdOutput> rpcResult = result.get();
283 if(rpcResult.isSuccessful()) {
284 return rpcResult.getResult().getIdValue().intValue();
286 LOG.warn("RPC Call to Get Unique Id returned with Errors {}", rpcResult.getErrors());
288 } catch (InterruptedException | ExecutionException e) {
289 LOG.warn("Exception when getting Unique Id for key {}", idKey, e);
294 private void releaseId(String idKey) {
295 ReleaseIdInput idInput = new ReleaseIdInputBuilder()
296 .setPoolName(AlivenessMonitorConstants.MONITOR_IDPOOL_NAME)
297 .setIdKey(idKey).build();
299 Future<RpcResult<Void>> result = idManager.releaseId(idInput);
300 RpcResult<Void> rpcResult = result.get();
301 if(!rpcResult.isSuccessful()) {
302 LOG.warn("RPC Call to release Id {} with Key {} returned with Errors {}",
303 idKey, rpcResult.getErrors());
305 } catch (InterruptedException | ExecutionException e) {
306 LOG.warn("Exception when releasing Id for key {}", idKey, e);
/**
 * Packet-in entry point. Packets whose packet-in reason is SendToController are
 * decoded as Ethernet frames; the decoded payload's class selects the registered
 * protocol handler, and the monitor key that handler extracts (if any) is processed
 * as a received monitor response.
 *
 * @param packetReceived the packet-in notification
 */
311 public void onPacketReceived(PacketReceived packetReceived) {
312 Class<? extends PacketInReason> pktInReason = packetReceived.getPacketInReason();
313 if(LOG.isTraceEnabled()) {
314 LOG.trace("Packet Received {}", packetReceived );
// Only packets with the SendToController reason are examined.
317 if (pktInReason == SendToController.class) {
318 Packet packetInFormatted;
319 byte[] data = packetReceived.getPayload();
320 Ethernet res = new Ethernet();
// deserialize takes the size in bits, hence the byte count times NumBitsInAByte.
322 packetInFormatted = res.deserialize(data, 0, data.length * NetUtils.NumBitsInAByte);
323 } catch (Exception e) {
// NOTE(review): only the exception message is logged here; the stack trace is lost.
324 LOG.warn("Failed to decode packet: {}", e.getMessage());
328 if(packetInFormatted == null) {
329 LOG.warn("Failed to deserialize Received Packet from table {}", packetReceived.getTableId().getValue());
333 Object objPayload = packetInFormatted.getPayload();
335 if(objPayload == null) {
336 LOG.trace("Unsupported packet type. Ignoring the packet...");
340 if (LOG.isTraceEnabled()) {
341 LOG.trace("onPacketReceived packet: {}, packet class: {}", packetReceived,
342 objPayload.getClass());
// Handlers are keyed by the concrete payload class (see registerHandler).
345 AlivenessProtocolHandler livenessProtocolHandler = packetTypeToProtocolHandler.get(objPayload.getClass());
346 if (livenessProtocolHandler == null) {
350 String monitorKey = livenessProtocolHandler.handlePacketIn(packetInFormatted.getPayload(), packetReceived);
352 if(monitorKey != null) {
353 processReceivedMonitorKey(monitorKey);
355 LOG.debug("No monitorkey associated with received packet");
/**
 * Records a received monitor response for the given key: reads the current monitoring
 * state from the operational datastore and, when one exists, writes it back with
 * state Up and the response-pending count reset. After a successful write a
 * notification for the monitor id is published.
 *
 * @param monitorKey key identifying the monitor; must not be null
 */
360 private void processReceivedMonitorKey(final String monitorKey) {
361 Preconditions.checkNotNull(monitorKey, "Monitor Key required to process the state");
363 LOG.debug("Processing monitorKey: {} for received packet", monitorKey);
// Per-key semaphore; lockMap.get returns null when no lock is registered for the key.
365 final Semaphore lock = lockMap.get(monitorKey);
366 LOG.debug("Acquiring lock for monitor key : {} to process monitor packet", monitorKey);
// The read and the follow-up write share one read-write transaction.
369 final ReadWriteTransaction tx = broker.newReadWriteTransaction();
371 ListenableFuture<Optional<MonitoringState>> stateResult = tx.read(LogicalDatastoreType.OPERATIONAL, getMonitorStateId(monitorKey));
374 Futures.addCallback(stateResult, new FutureCallback<Optional<MonitoringState>>() {
377 public void onSuccess(Optional<MonitoringState> optState) {
379 if(optState.isPresent()) {
380 final MonitoringState currentState = optState.get();
382 if(LOG.isTraceEnabled()) {
383 LOG.trace("OnPacketReceived : Monitoring state from ODS : {} ", currentState);
386 Long responsePendingCount = currentState.getResponsePendingCount();
388 //Need to relook at the pending count logic to support N out of M scenarios
389 // if(currentState.getState() != LivenessState.Up) {
390 // //Reset responsePendingCount when state changes from DOWN to UP
391 // responsePendingCount = INITIAL_COUNT;
394 // if(responsePendingCount > INITIAL_COUNT) {
395 // responsePendingCount = currentState.getResponsePendingCount() - 1;
// Any received response resets the pending count, whatever its previous value.
397 responsePendingCount = INITIAL_COUNT;
// True when the monitor was Down or Unknown before this response.
// Presumably this gates the notification below - TODO confirm.
399 final boolean stateChanged = (currentState.getState() == LivenessState.Down ||
400 currentState.getState() == LivenessState.Unknown);
402 final MonitoringState state = new MonitoringStateBuilder().setMonitorKey(monitorKey).setState(LivenessState.Up)
403 .setResponsePendingCount(responsePendingCount).build();
// merge updates only the fields set on the builder above.
404 tx.merge(LogicalDatastoreType.OPERATIONAL, getMonitorStateId(monitorKey), state);
405 ListenableFuture<Void> writeResult = tx.submit();
408 Futures.addCallback(writeResult, new FutureCallback<Void>() {
410 public void onSuccess(Void noarg) {
414 LOG.info("Sending notification for monitor Id : {} with Current State: {}",
415 currentState.getMonitorId(), LivenessState.Up);
416 publishNotification(currentState.getMonitorId(), LivenessState.Up);
418 if(LOG.isTraceEnabled()) {
419 LOG.trace("Successful in writing monitoring state {} to ODS", state);
425 public void onFailure(Throwable error) {
427 LOG.warn("Error in writing monitoring state : {} to Datastore", monitorKey, error);
428 if(LOG.isTraceEnabled()) {
429 LOG.trace("Error in writing monitoring state: {} to Datastore", state);
// No state stored for this key: nothing to update.
434 LOG.warn("Monitoring State not available for key: {} to process the Packet received", monitorKey);
435 //Complete the transaction
442 public void onFailure(Throwable error) {
443 LOG.error("Error when reading Monitoring State for key: {} to process the Packet received", monitorKey, error);
444 //FIXME: Not sure if the transaction status is valid to cancel
452 public PacketProcessingService getPacketProcessingService() {
453 return packetProcessingService;
456 private String getIpAddress(EndpointType endpoint) {
457 String ipAddress = "";
458 if( endpoint instanceof IpAddress) {
459 ipAddress = ((IpAddress) endpoint).getIpAddress().getIpv4Address().getValue();
460 } else if (endpoint instanceof Interface) {
461 ipAddress = ((Interface)endpoint).getInterfaceIp().getIpv4Address().getValue();
/**
 * Builds the id-pool key for a monitoring configuration by joining its parts with
 * AlivenessMonitorConstants.SEPERATOR. The key starts with the interface name;
 * the source and destination IP addresses are appended as shown below.
 *
 * @param interfaceName name of the monitored interface
 * @param ethType protocol type of the monitor profile
 * @param source source endpoint
 * @param destination destination endpoint; skipped when null
 * @return the assembled key
 */
466 private String getUniqueKey(String interfaceName, String ethType, EndpointType source, EndpointType destination) {
467 StringBuilder builder = new StringBuilder().append(interfaceName).append(AlivenessMonitorConstants.SEPERATOR)
470 builder.append(AlivenessMonitorConstants.SEPERATOR).append(getIpAddress(source));
473 if(destination != null) {
474 builder.append(AlivenessMonitorConstants.SEPERATOR).append(getIpAddress(destination));
476 return builder.toString();
/**
 * RPC: starts monitoring for the given configuration. Validates the request,
 * allocates a monitor id, writes the monitoring info, the initial state and the
 * id-to-key mapping to the operational datastore, and schedules the periodic
 * monitoring task once that write succeeds.
 *
 * @param input request carrying the monitoring configuration
 * @return an already-completed future: success with the monitor id (plus a
 *         "config-exists" warning when the monitor is already registered), or a
 *         failure carrying the exception message
 */
480 public Future<RpcResult<MonitorStartOutput>> monitorStart(MonitorStartInput input) {
481 RpcResultBuilder<MonitorStartOutput> rpcResultBuilder;
482 final Config in = input.getConfig();
483 Long profileId = in.getProfileId();
484 LOG.debug("Monitor Start invoked with Config: {}, Profile Id: {}", in, profileId);
// Validation: only one-one mode is accepted.
487 if(in.getMode() != MonitoringMode.OneOne) {
488 throw new UnsupportedConfigException(
489 "Unsupported Monitoring mode. Currently one-one mode is supported");
// The referenced profile must already exist in the operational datastore.
492 Optional<MonitorProfile> optProfile = read(LogicalDatastoreType.OPERATIONAL, getMonitorProfileId(profileId));
493 final MonitorProfile profile;
494 if(!optProfile.isPresent()) {
495 String errMsg = String.format("No monitoring profile associated with Id: %d", profileId);
496 LOG.error("Monitor start failed. {}", errMsg);
497 throw new RuntimeException(errMsg);
499 profile = optProfile.get();
502 EtherTypes ethType = profile.getProtocolType();
504 String interfaceName = null;
505 EndpointType srcEndpointType = in.getSource().getEndpointType();
// The source endpoint must be an Interface with a non-empty name.
507 if( srcEndpointType instanceof Interface) {
508 Interface endPoint = (Interface) srcEndpointType;
509 interfaceName = endPoint.getInterfaceName();
511 throw new UnsupportedConfigException(
512 "Unsupported source Endpoint type. Only Interface Endpoint currently supported for monitoring");
515 if(Strings.isNullOrEmpty(interfaceName)) {
516 throw new RuntimeException("Interface Name not defined in the source Endpoint");
519 //Initially the support is for one monitoring per interface.
520 //Revisit the retrieving monitor id logic when the multiple monitoring for same interface is needed.
521 EndpointType destEndpointType = null;
522 if(in.getDestination() != null) {
523 destEndpointType = in.getDestination().getEndpointType();
// The monitor id is allocated from the id pool under a key derived from the config.
// NOTE(review): the value returned by getUniqueId is used without checking for a
// failed allocation - confirm.
525 String idKey = getUniqueKey(interfaceName, ethType.toString(), srcEndpointType, destEndpointType);
526 final long monitorId = getUniqueId(idKey);
527 Optional<MonitoringInfo> optKey = read(LogicalDatastoreType.OPERATIONAL, getMonitoringInfoId(monitorId));
// Monitoring info already stored under this id: report success with a warning.
529 if(optKey.isPresent()) {
530 String message = String.format("Monitoring for the interface %s with this configuration is already registered.", interfaceName);
532 MonitorStartOutput output = new MonitorStartOutputBuilder().setMonitorId(monitorId).build();
533 rpcResultBuilder = RpcResultBuilder.success(output).withWarning(ErrorType.APPLICATION, "config-exists", message);
534 return Futures.immediateFuture(rpcResultBuilder.build());
536 //Construct the monitor key
537 final MonitoringInfo monitoringInfo = new MonitoringInfoBuilder()
539 .setMode(in.getMode())
540 .setProfileId(profileId)
541 .setDestination(in.getDestination())
542 .setSource(in.getSource()).build();
543 //Construct the initial monitor state
// NOTE(review): ethTypeToProtocolHandler.get returns null when no handler is
// registered for this ether type; the next line would then throw.
544 AlivenessProtocolHandler handler = ethTypeToProtocolHandler.get(ethType);
545 final String monitoringKey = handler.getUniqueMonitoringKey(monitoringInfo);
547 MonitoringState monitoringState = new MonitoringStateBuilder()
548 .setMonitorKey(monitoringKey)
549 .setMonitorId(monitorId)
550 .setState(LivenessState.Unknown)
551 .setStatus(MonitorStatus.Started)
552 .setRequestCount(INITIAL_COUNT)
553 .setResponsePendingCount(INITIAL_COUNT).build();
555 MonitoridKeyEntry mapEntry = new MonitoridKeyEntryBuilder().setMonitorId(monitorId)
556 .setMonitorKey(monitoringKey).build();
// Info, initial state and id-to-key mapping are written in one transaction.
558 WriteTransaction tx = broker.newWriteOnlyTransaction();
560 tx.put(LogicalDatastoreType.OPERATIONAL, getMonitoringInfoId(monitorId), monitoringInfo, CREATE_MISSING_PARENT);
561 LOG.debug("adding oper monitoring info {}", monitoringInfo);
563 tx.put(LogicalDatastoreType.OPERATIONAL, getMonitorStateId(monitoringKey), monitoringState, CREATE_MISSING_PARENT);
564 LOG.debug("adding oper monitoring state {}", monitoringState);
566 tx.put(LogicalDatastoreType.OPERATIONAL, getMonitorMapId(monitorId), mapEntry, CREATE_MISSING_PARENT);
567 LOG.debug("adding oper map entry {}", mapEntry);
569 Futures.addCallback(tx.submit(), new FutureCallback<Void>() {
571 public void onFailure(Throwable error) {
572 String errorMsg = String.format("Adding Monitoring info: %s in Datastore failed", monitoringInfo);
573 LOG.warn(errorMsg, error);
// NOTE(review): this is thrown from inside the future callback, so it presumably
// never reaches the catch block of monitorStart nor the RPC result - confirm intent.
574 throw new RuntimeException(errorMsg, error);
578 public void onSuccess(Void noarg) {
580 LOG.debug("Scheduling monitor task for config: {}", in);
// NOTE(review): the task is scheduled with no initial delay before the semaphore
// is put into lockMap, so its first run may find no lock for the key - confirm.
581 scheduleMonitoringTask(monitoringInfo, profile.getMonitorInterval());
582 lockMap.put(monitoringKey, new Semaphore(1, true));
587 associateMonitorIdWithInterface(monitorId, interfaceName);
589 MonitorStartOutput output = new MonitorStartOutputBuilder()
590 .setMonitorId(monitorId).build();
592 rpcResultBuilder = RpcResultBuilder.success(output);
// Any exception raised above is converted into a failed RPC result.
593 } catch(Exception e) {
594 LOG.error("Start Monitoring Failed. {}", e.getMessage(), e);
595 rpcResultBuilder = RpcResultBuilder.<MonitorStartOutput>failed().withError(ErrorType.APPLICATION, e.getMessage(), e);
597 return Futures.immediateFuture(rpcResultBuilder.build());
/**
 * Adds the monitor id to the list of monitor ids stored for the interface in the
 * operational datastore, creating the interface entry when none exists. The read
 * and the write share one read-write transaction; the outcome is only logged.
 *
 * @param monitorId id of the monitor to associate
 * @param interfaceName name of the interface being monitored
 */
600 private void associateMonitorIdWithInterface(final Long monitorId, final String interfaceName) {
601 LOG.debug("associate monitor Id {} with interface {}", monitorId, interfaceName);
602 final ReadWriteTransaction tx = broker.newReadWriteTransaction();
603 ListenableFuture<Optional<InterfaceMonitorEntry>> readFuture =
604 tx.read(LogicalDatastoreType.OPERATIONAL, getInterfaceMonitorMapId(interfaceName));
605 ListenableFuture<Void> updateFuture =
606 Futures.transform(readFuture, new AsyncFunction<Optional<InterfaceMonitorEntry>, Void>() {
609 public ListenableFuture<Void> apply(Optional<InterfaceMonitorEntry> optEntry) throws Exception {
// Entry exists: extend its id list and merge the updated entry back.
610 if(optEntry.isPresent()) {
611 InterfaceMonitorEntry entry = optEntry.get();
// NOTE(review): this adds to the list returned by the datastore object -
// confirm that list is non-null and mutable.
612 List<Long> monitorIds = entry.getMonitorIds();
613 monitorIds.add(monitorId);
614 InterfaceMonitorEntry newEntry = new InterfaceMonitorEntryBuilder()
615 .setKey(new InterfaceMonitorEntryKey(interfaceName)).setMonitorIds(monitorIds).build();
616 tx.merge(LogicalDatastoreType.OPERATIONAL, getInterfaceMonitorMapId(interfaceName), newEntry);
618 //Create new monitor entry
619 LOG.debug("Adding new interface-monitor association for interface {} with id {}", interfaceName, monitorId);
620 List<Long> monitorIds = new ArrayList<>();
621 monitorIds.add(monitorId);
622 InterfaceMonitorEntry newEntry =
623 new InterfaceMonitorEntryBuilder().setInterfaceName(interfaceName).setMonitorIds(monitorIds).build();
624 tx.put(LogicalDatastoreType.OPERATIONAL,
625 getInterfaceMonitorMapId(interfaceName), newEntry, CREATE_MISSING_PARENT);
// Log-only callback describing the association that was attempted.
631 Futures.addCallback(updateFuture, new FutureCallbackImpl(
632 String.format("Association of monitorId %d with Interface %s", monitorId, interfaceName)));
635 private void scheduleMonitoringTask(MonitoringInfo monitoringInfo, long monitorInterval) {
636 AlivenessMonitorTask monitorTask = new AlivenessMonitorTask(monitoringInfo);
637 ScheduledFuture<?> scheduledFutureResult = monitorService.scheduleAtFixedRate(
638 monitorTask, NO_DELAY, monitorInterval, TimeUnit.MILLISECONDS);
639 monitoringTasks.put(monitoringInfo.getId(), scheduledFutureResult);
643 public Future<RpcResult<Void>> monitorPause(MonitorPauseInput input) {
644 LOG.debug("Monitor Pause operation invoked for monitor id: {}", input.getMonitorId());
645 SettableFuture<RpcResult<Void>> result = SettableFuture.create();
646 final Long monitorId = input.getMonitorId();
648 //Set the monitoring status to Paused
649 updateMonitorStatusTo(monitorId, MonitorStatus.Paused, new Predicate<MonitorStatus>() {
651 public boolean apply(MonitorStatus currentStatus) {
652 return currentStatus == MonitorStatus.Started;
656 if(stopMonitoringTask(monitorId)) {
657 result.set(RpcResultBuilder.<Void>success().build());
659 String errorMsg = String.format("No Monitoring Task availble to pause for the given monitor id : %d", monitorId);
660 LOG.error("Monitor Pause operation failed- {}",errorMsg);
661 result.set(RpcResultBuilder.<Void>failed().withError(ErrorType.APPLICATION, errorMsg).build());
/**
 * RPC: resumes the monitor with the given id. Reads the monitoring info and then its
 * profile from the operational datastore; when both exist, the stored status is moved
 * to Started (from Paused or Stopped) and the periodic task is scheduled again.
 *
 * @param input request carrying the monitor id
 * @return a future completed with success once the task is scheduled, or with an
 *         application error when the info or profile cannot be read or is absent
 */
668 public Future<RpcResult<Void>> monitorUnpause(MonitorUnpauseInput input) {
669 LOG.debug("Monitor Unpause operation invoked for monitor id: {}", input.getMonitorId());
670 final SettableFuture<RpcResult<Void>> result = SettableFuture.create();
672 final Long monitorId = input.getMonitorId();
// Both reads below go through this one read-only transaction.
673 final ReadOnlyTransaction tx = broker.newReadOnlyTransaction();
674 ListenableFuture<Optional<MonitoringInfo>> readInfoResult =
675 tx.read(LogicalDatastoreType.OPERATIONAL, getMonitoringInfoId(monitorId));
677 Futures.addCallback(readInfoResult, new FutureCallback<Optional<MonitoringInfo>>() {
680 public void onFailure(Throwable error) {
681 String msg = String.format("Unable to read monitoring info associated with monitor id %d", monitorId);
682 LOG.error("Monitor unpause Failed. {}", msg, error);
683 result.set(RpcResultBuilder.<Void>failed().withError(ErrorType.APPLICATION, msg, error).build());
687 public void onSuccess(Optional<MonitoringInfo> optInfo) {
688 if(optInfo.isPresent()) {
689 final MonitoringInfo info = optInfo.get();
// The profile supplies the interval needed to reschedule the task.
690 ListenableFuture<Optional<MonitorProfile>> readProfile =
691 tx.read(LogicalDatastoreType.OPERATIONAL, getMonitorProfileId(info.getProfileId()));
692 Futures.addCallback(readProfile, new FutureCallback<Optional<MonitorProfile>>(){
695 public void onFailure(Throwable error) {
696 String msg = String.format("Unable to read Monitoring profile associated with id %d", info.getProfileId());
697 LOG.warn("Monitor unpause Failed. {}", msg, error);
698 result.set(RpcResultBuilder.<Void>failed().withError(ErrorType.APPLICATION, msg, error).build());
702 public void onSuccess(Optional<MonitorProfile> optProfile) {
704 if(optProfile.isPresent()) {
// The status transition to Started is only allowed from Paused or Stopped.
705 updateMonitorStatusTo(monitorId, MonitorStatus.Started, new Predicate<MonitorStatus>() {
707 public boolean apply(MonitorStatus currentStatus) {
708 return (currentStatus == MonitorStatus.Paused ||
709 currentStatus == MonitorStatus.Stopped);
712 MonitorProfile profile = optProfile.get();
713 LOG.debug("Monitor Resume - Scheduling monitoring task with Id: {}", monitorId);
// NOTE(review): the task is scheduled without checking the outcome of the status
// update above; a task already registered for this id would be replaced in the
// task map without being cancelled - confirm.
714 scheduleMonitoringTask(info, profile.getMonitorInterval());
715 result.set(RpcResultBuilder.<Void>success().build());
717 String msg = String.format("Monitoring profile associated with id %d is not present", info.getProfileId());
718 LOG.warn("Monitor unpause Failed. {}", msg);
719 result.set(RpcResultBuilder.<Void>failed().withError(ErrorType.APPLICATION, msg).build());
725 String msg = String.format("Monitoring info associated with id %d is not present", monitorId);
726 LOG.warn("Monitor unpause Failed. {}", msg);
727 result.set(RpcResultBuilder.<Void>failed().withError(ErrorType.APPLICATION, msg).build());
// The outer callback runs on the dedicated callback executor.
730 }, callbackExecutorService);
735 private boolean stopMonitoringTask(Long monitorId) {
736 return stopMonitoringTask(monitorId, INTERRUPT_TASK);
739 private boolean stopMonitoringTask(Long monitorId, boolean interruptTask) {
740 ScheduledFuture<?> scheduledFutureResult = monitoringTasks.get(monitorId);
741 if(scheduledFutureResult != null) {
742 scheduledFutureResult.cancel(interruptTask);
748 private Optional<MonitorProfile> getMonitorProfile(Long profileId) {
749 return read(LogicalDatastoreType.OPERATIONAL, getMonitorProfileId(profileId));
/**
 * Tries to take the given semaphore, waiting at most 50 milliseconds for it.
 *
 * @param lock semaphore guarding a monitor key
 */
// NOTE(review): several short statements of this method (guards, release/acquire calls,
// braces) are not visible in this extract. Judging by the log messages, a timed-out
// attempt is followed by a forced release and a second acquire - confirm against the full file.
752 private void acquireLock(Semaphore lock) {
757 boolean acquiredLock = false;
// Bounded wait, so the caller is not blocked indefinitely by the first attempt
759 acquiredLock = lock.tryAcquire(50, TimeUnit.MILLISECONDS);
760 } catch (InterruptedException e) {
// NOTE(review): the interruption is only logged in the visible code; no re-interrupt is visible
761 LOG.warn("Thread interrupted when waiting to acquire the lock");
// Logged when the first attempt did not obtain the lock
765 LOG.warn("Previous transaction did not complete in time. Releasing the lock to proceed");
769 LOG.trace("Lock acquired successfully");
770 } catch (InterruptedException e) {
771 LOG.warn("Acquire failed");
774 LOG.trace("Lock acquired successfully");
/**
 * Counterpart of acquireLock() for the given semaphore.
 *
 * @param lock semaphore guarding a monitor key
 */
// NOTE(review): the body of this method is not visible in this extract; presumably it
// releases the semaphore - confirm against the full file.
778 private void releaseLock(Semaphore lock) {
/**
 * Updates the stored monitoring state of one monitor and, once that update has succeeded,
 * passes the monitoring info to the protocol handler registered for the profile's protocol
 * type so that the monitoring packet is sent out.
 *
 * The state update increments the request count, increments the pending-response count
 * while it is below the profile's monitor window, and, when the pending count reaches the
 * failure threshold for a monitor that is not already Down, publishes a Down notification
 * and resets the request count to INITIAL_COUNT.
 *
 * @param monitoringInfo monitor to send a packet for; its id and profile id are read here
 */
// NOTE(review): short lines (braces, else/return, lock and submit calls) are not visible
// in this extract; the comments below describe the visible statements only.
784 private void sendMonitorPacket(final MonitoringInfo monitoringInfo) {
785 //TODO: Handle interrupts
786 final Long monitorId = monitoringInfo.getId();
// Resolve the datastore key of this monitor from the id-to-key cache
787 final String monitorKey = monitorIdKeyCache.getUnchecked(monitorId);
788 if(monitorKey == null) {
789 LOG.warn("No monitor Key associated with id {} to send the monitor packet", monitorId);
792 LOG.debug("Sending monitoring packet for key: {}", monitorKey);
// The profile supplies the monitor window, failure threshold and protocol type used below
795 final MonitorProfile profile;
796 Optional<MonitorProfile> optProfile = getMonitorProfile(monitoringInfo.getProfileId());
797 if(optProfile.isPresent()) {
798 profile = optProfile.get();
800 LOG.warn("No monitor profile associated with id {}. "
801 + "Could not send Monitor packet for monitor-id {}", monitoringInfo.getProfileId(), monitorId);
// Per-key semaphore taken from lockMap.
// NOTE(review): the acquire call itself is not visible here - presumably acquireLock(lock); confirm
805 final Semaphore lock = lockMap.get(monitorKey);
806 LOG.debug("Acquiring lock for monitor key : {} to send monitor packet", monitorKey);
// Read and update the monitoring state through one read-write transaction
809 final ReadWriteTransaction tx = broker.newReadWriteTransaction();
810 ListenableFuture<Optional<MonitoringState>> readResult =
811 tx.read(LogicalDatastoreType.OPERATIONAL, getMonitorStateId(monitorKey));
812 ListenableFuture<Void> writeResult = Futures.transform(readResult, new AsyncFunction<Optional<MonitoringState>, Void>() {
815 public ListenableFuture<Void> apply(Optional<MonitoringState> optState)
817 if(optState.isPresent()) {
818 MonitoringState state = optState.get();
820 //Increase the request count
821 Long requestCount = state.getRequestCount() + 1;
823 //Check with the monitor window
824 LivenessState currentLivenessState = state.getState();
826 //Increase the pending response count
827 long responsePendingCount = state.getResponsePendingCount();
// The pending count only grows while it is below the monitor window
828 if(responsePendingCount < profile.getMonitorWindow()) {
829 responsePendingCount = responsePendingCount + 1;
832 //Check with the failure threshold
833 if(responsePendingCount >= profile.getFailureThreshold()) {
834 //Change the state to down and notify
// Notify only on the transition to Down, not while the monitor already is Down
835 if(currentLivenessState != LivenessState.Down) {
836 LOG.debug("Response pending Count: {}, Failure threshold: {} for monitorId {}",
837 responsePendingCount, profile.getFailureThreshold(), state.getMonitorId());
838 LOG.info("Sending notification for monitor Id : {} with State: {}",
839 state.getMonitorId(), LivenessState.Down);
840 publishNotification(monitorId, LivenessState.Down);
841 currentLivenessState = LivenessState.Down;
842 //Reset requestCount when state changes from UP to DOWN
843 requestCount = INITIAL_COUNT;
847 //Update the ODS with state
// Only the key, both counters and the liveness state are set; merge() is used for the write
848 MonitoringState updatedState = new MonitoringStateBuilder(/*state*/).setMonitorKey(state.getMonitorKey())
849 .setRequestCount(requestCount)
850 .setResponsePendingCount(responsePendingCount)
851 .setState(currentLivenessState).build();
852 tx.merge(LogicalDatastoreType.OPERATIONAL, getMonitorStateId(state.getMonitorKey()), updatedState);
855 //Close the transaction
// No state stored for this key: fail the chained future so that no packet is sent
857 String errorMsg = String.format("Monitoring State associated with id %d is not present to send packet out.", monitorId);
858 return Futures.immediateFailedFuture(new RuntimeException(errorMsg));
// The packet is sent only after the state update has completed successfully
864 Futures.addCallback(writeResult, new FutureCallback<Void>() {
866 public void onSuccess(Void noarg) {
867 //invoke packetout on protocol handler
868 AlivenessProtocolHandler handler = ethTypeToProtocolHandler.get(profile.getProtocolType());
// A missing handler for the protocol type is skipped without any visible logging
869 if(handler != null) {
870 LOG.debug("Sending monitoring packet {}", monitoringInfo);
871 handler.sendPacketOut(monitoringInfo);
877 public void onFailure(Throwable error) {
878 LOG.warn("Updating monitoring state for key: {} failed. Monitoring packet is not sent", monitorKey, error);
/**
 * Builds a MonitorEvent carrying the monitor id and liveness state and offers it to the
 * notification publish service. The outcome of the offer is only logged.
 *
 * @param monitorId id of the monitor the event is about
 * @param state liveness state to report
 */
886 private void publishNotification(final Long monitorId, final LivenessState state) {
887 LOG.debug("Sending notification for id {} - state {}", monitorId, state);
888 EventData data = new EventDataBuilder().setMonitorId(monitorId)
889 .setMonitorState(state).build();
890 MonitorEvent event = new MonitorEventBuilder().setEventData(data).build();;
891 final ListenableFuture<? extends Object> eventFuture = notificationPublishService.offerNotification(event);
// The future is observed for logging only; a failed offer is not retried here
892 Futures.addCallback(eventFuture, new FutureCallback<Object>() {
894 public void onFailure(Throwable error) {
895 LOG.warn("Error in notifying listeners for id {} - state {}", monitorId, state, error);
899 public void onSuccess(Object arg) {
900 LOG.trace("Successful in notifying listeners for id {} - state {}", monitorId, state);
/**
 * RPC: creates a monitor profile from the input unless a profile with the derived id is
 * already stored in the OPERATIONAL datastore.
 *
 * The profile id is obtained from getUniqueId() applied to the key that
 * getUniqueProfileKey() builds from failure threshold, monitor interval, monitor window
 * and protocol type. If a profile is already present, the result is a success carrying
 * that id plus a "profile-exists" warning. Otherwise the profile is written and the
 * result is set from the submit callback.
 *
 * @param input RPC input holding the profile parameters
 */
// NOTE(review): the return statements are not visible in this extract; presumably the
// SettableFuture named result is returned - confirm.
906 public Future<RpcResult<MonitorProfileCreateOutput>> monitorProfileCreate(final MonitorProfileCreateInput input) {
907 LOG.debug("Monitor Profile Create operation - {}", input.getProfile());
908 final SettableFuture<RpcResult<MonitorProfileCreateOutput>> result = SettableFuture.create();
909 Profile profile = input.getProfile();
910 final Long failureThreshold = profile.getFailureThreshold();
911 final Long monitorInterval = profile.getMonitorInterval();
912 final Long monitorWindow = profile.getMonitorWindow();
913 final EtherTypes ethType = profile.getProtocolType();
// Profiles with identical parameters map to the same key and therefore to the same id lookup
914 String idKey = getUniqueProfileKey(failureThreshold, monitorInterval, monitorWindow, ethType);
915 final Long profileId = Long.valueOf(getUniqueId(idKey));
// Existence check and write share one read-write transaction
917 final ReadWriteTransaction tx = broker.newReadWriteTransaction();
918 ListenableFuture<Optional<MonitorProfile>> readFuture =
919 tx.read(LogicalDatastoreType.OPERATIONAL, getMonitorProfileId(profileId));
920 ListenableFuture<RpcResult<MonitorProfileCreateOutput>> resultFuture =
921 Futures.transform(readFuture, new AsyncFunction<Optional<MonitorProfile>, RpcResult<MonitorProfileCreateOutput>>() {
924 public ListenableFuture<RpcResult<MonitorProfileCreateOutput>> apply(
925 Optional<MonitorProfile> optProfile) throws Exception {
926 if(optProfile.isPresent()) {
// Already stored: report success with the existing id and attach a warning
928 MonitorProfileCreateOutput output = new MonitorProfileCreateOutputBuilder()
929 .setProfileId(profileId).build();
930 String msg = String.format("Monitor profile %s already present for the given input", input);
932 result.set(RpcResultBuilder.success(output)
933 .withWarning(ErrorType.PROTOCOL, "profile-exists", msg).build());
// Not stored yet: build the profile and write it
935 final MonitorProfile monitorProfile = new MonitorProfileBuilder().setId(profileId)
936 .setFailureThreshold(failureThreshold)
937 .setMonitorInterval(monitorInterval)
938 .setMonitorWindow(monitorWindow)
939 .setProtocolType(ethType).build();
940 tx.put(LogicalDatastoreType.OPERATIONAL, getMonitorProfileId(profileId), monitorProfile, CREATE_MISSING_PARENT);
941 Futures.addCallback(tx.submit(), new FutureCallback<Void>() {
943 public void onFailure(Throwable error) {
945 String.format("Error when storing monitorprofile %s in datastore", monitorProfile);
946 LOG.error(msg, error);
947 result.set(RpcResultBuilder.<MonitorProfileCreateOutput>failed()
948 .withError(ErrorType.APPLICATION, msg, error).build());
951 public void onSuccess(Void noarg) {
952 MonitorProfileCreateOutput output = new MonitorProfileCreateOutputBuilder()
953 .setProfileId(profileId).build();
954 result.set(RpcResultBuilder.success(output).build());
960 }, callbackExecutorService);
// Failures of the read/transform stage are turned into a failed RPC result here
961 Futures.addCallback(resultFuture, new FutureCallback<RpcResult<MonitorProfileCreateOutput>>() {
963 public void onFailure(Throwable error) {
964 //This would happen when any error happens during reading for monitoring profile
965 String msg = String.format("Error in creating monitorprofile - %s", input);
966 result.set(RpcResultBuilder.<MonitorProfileCreateOutput>failed()
967 .withError(ErrorType.APPLICATION, msg, error).build());
968 LOG.error(msg, error);
// Note: the parameter below shadows the outer SettableFuture that is also named result
972 public void onSuccess(RpcResult<MonitorProfileCreateOutput> result) {
973 LOG.debug("Successfully created monitor Profile {} ", input);
975 }, callbackExecutorService);
/**
 * RPC: returns the profile id that getExistingProfileId() derives from the parameters of
 * the input profile, wrapped in an already-completed future.
 *
 * @param input RPC input holding the profile parameters
 * @return an immediate future with either a successful result carrying the profile id or
 *         a failed result
 */
// NOTE(review): the try/catch lines are not visible in this extract; the LOG.error branch
// below is presumably the catch block - confirm.
981 public Future<RpcResult<MonitorProfileGetOutput>> monitorProfileGet(MonitorProfileGetInput input){
982 LOG.debug("Monitor Profile Get operation for input profile- {}", input.getProfile());
983 RpcResultBuilder<MonitorProfileGetOutput> rpcResultBuilder;
985 final Long profileId = getExistingProfileId(input);
987 MonitorProfileGetOutputBuilder output = new MonitorProfileGetOutputBuilder().setProfileId(profileId);
988 rpcResultBuilder = RpcResultBuilder.success();
989 rpcResultBuilder.withResult(output.build());
// Failure path: the failed result is built without an error entry attached
991 LOG.error("Retrieval of monitor profile ID for input {} failed due to {}" , input, e);
992 rpcResultBuilder = RpcResultBuilder.failed();
994 return Futures.immediateFuture(rpcResultBuilder.build());
/**
 * Derives the profile id for the profile parameters in the input: builds the profile key
 * with getUniqueProfileKey() and converts the value returned by getUniqueId() for that key
 * to a Long.
 *
 * @param input RPC input whose profile supplies threshold, interval, window and protocol type
 * @return the id obtained from getUniqueId() for the profile key
 */
997 private Long getExistingProfileId(MonitorProfileGetInput input){
998 org.opendaylight.yang.gen.v1.urn.opendaylight.vpnservice.alivenessmonitor.rev150629.monitor.profile.get.input.Profile profile = input.getProfile();
999 final Long failureThreshold = profile.getFailureThreshold();
1000 final Long monitorInterval = profile.getMonitorInterval();
1001 final Long monitorWindow = profile.getMonitorWindow();
1002 final EtherTypes ethType = profile.getProtocolType();
1003 LOG.debug("getExistingProfileId for profile : {}", input.getProfile());
1004 String idKey = getUniqueProfileKey(failureThreshold, monitorInterval, monitorWindow, ethType);
// NOTE(review): this "Obtained" message is logged before getUniqueId() is actually called
1005 LOG.debug("Obtained existing profile ID for profile : {}", input.getProfile());
1006 return (Long.valueOf(getUniqueId(idKey)));
/**
 * Builds the string key of a profile by concatenating failure threshold, monitor interval,
 * monitor window and protocol type, each followed by AlivenessMonitorConstants.SEPERATOR
 * (so the key also ends with the separator).
 *
 * @param failureThreshold failure threshold of the profile
 * @param monitorInterval monitor interval of the profile
 * @param monitorWindow monitor window of the profile
 * @param ethType protocol type of the profile
 * @return the concatenated key
 */
1009 private String getUniqueProfileKey(Long failureThreshold,Long monitorInterval,Long monitorWindow,EtherTypes ethType) {
1010 return new StringBuilder().append(failureThreshold).append(AlivenessMonitorConstants.SEPERATOR)
1011 .append(monitorInterval).append(AlivenessMonitorConstants.SEPERATOR)
1012 .append(monitorWindow).append(AlivenessMonitorConstants.SEPERATOR)
1013 .append(ethType).append(AlivenessMonitorConstants.SEPERATOR).toString();
/**
 * RPC: deletes the monitor profile with the given id from the OPERATIONAL datastore.
 *
 * If the profile exists it is deleted and the result is set from the submit callback.
 * If it does not exist the result is a success carrying an "invalid-value" warning.
 * A failure of the read/transform stage produces a failed result.
 *
 * @param input RPC input holding the profile id
 */
// NOTE(review): the return statements and a few short statements are not visible in this
// extract; presumably the SettableFuture named result is returned - confirm.
1017 public Future<RpcResult<Void>> monitorProfileDelete(final MonitorProfileDeleteInput input) {
1018 LOG.debug("Monitor Profile delete for Id: {}", input.getProfileId());
1019 final SettableFuture<RpcResult<Void>> result = SettableFuture.create();
1020 final Long profileId = input.getProfileId();
// Existence check and delete share one read-write transaction
1021 final ReadWriteTransaction tx = broker.newReadWriteTransaction();
1022 ListenableFuture<Optional<MonitorProfile>> readFuture =
1023 tx.read(LogicalDatastoreType.OPERATIONAL, getMonitorProfileId(profileId));
1024 ListenableFuture<RpcResult<Void>> writeFuture =
1025 Futures.transform(readFuture, new AsyncFunction<Optional<MonitorProfile>, RpcResult<Void>>() {
1028 public ListenableFuture<RpcResult<Void>> apply(final Optional<MonitorProfile> optProfile) throws Exception {
1029 if(optProfile.isPresent()) {
1030 tx.delete(LogicalDatastoreType.OPERATIONAL, getMonitorProfileId(profileId));
1031 Futures.addCallback(tx.submit(), new FutureCallback<Void>() {
1033 public void onFailure(Throwable error) {
1034 String msg = String.format("Error when removing monitor profile %d from datastore", profileId);
1035 LOG.error(msg, error);
1036 result.set(RpcResultBuilder.<Void>failed().withError(ErrorType.APPLICATION, msg, error).build());
1040 public void onSuccess(Void noarg) {
// Rebuild the key the profile id was derived from.
// NOTE(review): the statement that uses this key is not visible here - presumably it releases the id; confirm
1041 MonitorProfile profile = optProfile.get();
1042 String id = getUniqueProfileKey(profile.getFailureThreshold(), profile.getMonitorInterval(),
1043 profile.getMonitorWindow(), profile.getProtocolType());
1045 result.set(RpcResultBuilder.<Void>success().build());
// Unknown profile id: reported as success with a warning rather than as a failure
1049 String msg = String.format("Monitor profile with Id: %d does not exist", profileId);
1051 result.set(RpcResultBuilder.<Void>success().withWarning(ErrorType.PROTOCOL, "invalid-value", msg).build());
1055 }, callbackExecutorService);
// Failures of the read/transform stage are turned into a failed RPC result here
1057 Futures.addCallback(writeFuture, new FutureCallback<RpcResult<Void>>() {
1060 public void onFailure(Throwable error) {
1061 String msg = String.format("Error when removing monitor profile %d from datastore", profileId);
1062 LOG.error(msg, error);
1063 result.set(RpcResultBuilder.<Void>failed().withError(ErrorType.APPLICATION, msg, error).build());
1067 public void onSuccess(RpcResult<Void> noarg) {
1068 LOG.debug("Successfully removed Monitor Profile {}", profileId);
1070 }, callbackExecutorService);
/**
 * RPC: stops the monitor with the given id and removes its data.
 *
 * When monitoring info exists for the id, the scheduled task is cancelled, the monitoring
 * state and monitoring info are deleted from the OPERATIONAL datastore, the monitor id is
 * dis-associated from its source interface (if the source is an interface), the id is
 * released and the per-key lock entry is removed. When no monitoring info exists, a failed
 * result is produced.
 *
 * @param input RPC input holding the monitor id
 */
// NOTE(review): the return statement is not visible in this extract; presumably the
// SettableFuture named result is returned - confirm.
1075 public Future<RpcResult<Void>> monitorStop(MonitorStopInput input) {
1076 LOG.debug("Monitor Stop operation for monitor id - {}", input.getMonitorId());
1077 SettableFuture<RpcResult<Void>> result = SettableFuture.create();
1079 final Long monitorId = input.getMonitorId();
// read() waits for the datastore read to complete before returning
1080 Optional<MonitoringInfo> optInfo = read(LogicalDatastoreType.OPERATIONAL, getMonitoringInfoId(monitorId));
1081 if(optInfo.isPresent()) {
1082 //Stop the monitoring task
1083 stopMonitoringTask(monitorId);
1085 //Cleanup the Data store
1086 WriteTransaction tx = broker.newWriteOnlyTransaction();
1087 String monitorKey = monitorIdKeyCache.getUnchecked(monitorId);
1088 if(monitorKey != null) {
1089 tx.delete(LogicalDatastoreType.OPERATIONAL, getMonitorStateId(monitorKey));
1090 monitorIdKeyCache.invalidate(monitorId);
1093 tx.delete(LogicalDatastoreType.OPERATIONAL, getMonitoringInfoId(monitorId));
// The submit outcome is handed to FutureCallbackImpl; the RPC result below does not wait for it
1094 Futures.addCallback(tx.submit(),
1095 new FutureCallbackImpl(String.format("Delete monitor state with Id %d", monitorId)));
1097 MonitoringInfo info = optInfo.get();
1098 String interfaceName = getInterfaceName(info.getSource().getEndpointType());
1099 if(interfaceName != null) {
1100 removeMonitorIdFromInterfaceAssociation(monitorId, interfaceName);
1102 releaseIdForMonitoringInfo(info);
1104 lockMap.remove(monitorKey);
1106 result.set(RpcResultBuilder.<Void>success().build());
// No monitoring info stored for this id: fail the RPC
1108 String errorMsg = String.format("Do not have monitoring information associated with key %d", monitorId);
1109 LOG.error("Delete monitoring operation Failed - {}", errorMsg);
1110 result.set(RpcResultBuilder.<Void>failed().withError(ErrorType.APPLICATION, errorMsg).build());
/**
 * Removes the monitor id from the list of monitor ids stored for the given interface in
 * the OPERATIONAL datastore and writes the updated entry back. When no entry exists for
 * the interface, only a warning is logged.
 *
 * @param monitorId id to remove from the interface's monitor id list
 * @param interfaceName name of the interface whose entry is updated
 */
// NOTE(review): short statements (for example any submit/cancel of the transaction) are
// not visible in this extract - confirm how the transaction is completed.
1116 private void removeMonitorIdFromInterfaceAssociation(final Long monitorId, final String interfaceName) {
1117 LOG.debug("Remove monitorId {} from Interface association {}", monitorId, interfaceName);
1118 final ReadWriteTransaction tx = broker.newReadWriteTransaction();
1119 ListenableFuture<Optional<InterfaceMonitorEntry>> readFuture = tx.read(LogicalDatastoreType.OPERATIONAL, getInterfaceMonitorMapId(interfaceName));
1120 ListenableFuture<Void> updateFuture = Futures.transform(readFuture, new AsyncFunction<Optional<InterfaceMonitorEntry>, Void>() {
1123 public ListenableFuture<Void> apply(Optional<InterfaceMonitorEntry> optEntry) throws Exception {
1124 if(optEntry.isPresent()) {
1125 InterfaceMonitorEntry entry = optEntry.get();
// monitorId is a Long, so this is List.remove(Object) (removal by value, not by index).
// The list returned by the entry is modified in place.
// NOTE(review): confirm that list is mutable
1126 List<Long> monitorIds = entry.getMonitorIds();
1127 monitorIds.remove(monitorId);
1128 InterfaceMonitorEntry newEntry = new InterfaceMonitorEntryBuilder(entry)
1129 .setKey(new InterfaceMonitorEntryKey(interfaceName)).setMonitorIds(monitorIds).build();
1130 tx.put(LogicalDatastoreType.OPERATIONAL, getInterfaceMonitorMapId(interfaceName), newEntry, CREATE_MISSING_PARENT);
1133 LOG.warn("No Interface map entry found {} to remove monitorId {}", interfaceName, monitorId);
1135 return Futures.immediateFuture(null);
// The outcome of the update is handed to FutureCallbackImpl
1140 Futures.addCallback(updateFuture, new FutureCallbackImpl(
1141 String.format("Dis-association of monitorId %d with Interface %s", monitorId, interfaceName)));
/**
 * Rebuilds the unique key of a monitor (interface name, protocol type, source and
 * destination endpoint) from its monitoring info and profile. Nothing is done when the
 * source endpoint yields no interface name; a warning is logged when the profile cannot
 * be read.
 *
 * @param info monitoring info whose id is to be released
 */
// NOTE(review): the statement that consumes idKey is not visible in this extract;
// given the method name it presumably releases the id for that key - confirm.
1145 private void releaseIdForMonitoringInfo(MonitoringInfo info) {
1146 Long monitorId = info.getId();
1147 EndpointType source = info.getSource().getEndpointType();
1148 String interfaceName = getInterfaceName(source);
1149 if(!Strings.isNullOrEmpty(interfaceName)) {
// The protocol type that is part of the key comes from the monitor's profile
1150 Optional<MonitorProfile> optProfile = read(LogicalDatastoreType.OPERATIONAL, getMonitorProfileId(info.getProfileId()));
1151 if(optProfile.isPresent()) {
1152 EtherTypes ethType = optProfile.get().getProtocolType();
// The destination is optional; null is passed to getUniqueKey() when it is absent
1153 EndpointType destination = (info.getDestination() != null) ? info.getDestination().getEndpointType() : null;
1154 String idKey = getUniqueKey(interfaceName, ethType.toString(), source, destination);
1157 LOG.warn("Could not release monitorId {}. No profile associated with it", monitorId);
/**
 * Returns the interface name of the endpoint when the endpoint is an Interface.
 *
 * @param endpoint endpoint to inspect
 * @return the interface name, or null when the endpoint is not an Interface
 */
1162 private String getInterfaceName(EndpointType endpoint) {
1163 String interfaceName = null;
1164 if(endpoint instanceof Interface) {
1165 interfaceName = ((Interface)endpoint).getInterfaceName();
1167 return interfaceName;
/**
 * Marks the monitor as Stopped (unless its stored status already is Stopped) and cancels
 * its scheduled monitoring task, logging a warning when stopMonitoringTask() returns false.
 *
 * @param monitorId id of the monitor to stop
 */
1170 private void stopMonitoring(long monitorId) {
// The predicate permits the update from any status other than Stopped
1171 updateMonitorStatusTo(monitorId, MonitorStatus.Stopped, new Predicate<MonitorStatus>() {
1173 public boolean apply(MonitorStatus currentStatus) {
1174 return currentStatus != MonitorStatus.Stopped;
1177 if(!stopMonitoringTask(monitorId)) {
1178 LOG.warn("No monitoring task running to perform cancel operation for monitorId {}", monitorId);
/**
 * Changes the stored status of a monitor to newStatus when the supplied predicate accepts
 * the currently stored status. The stored state is read and merged through one read-write
 * transaction; warnings are logged when the monitor key, the stored state, or a valid
 * current status is missing.
 *
 * @param monitorId id of the monitor to update
 * @param newStatus status to store
 * @param isValidStatus predicate applied to the current status; the update happens only
 *                      when it returns true
 */
// NOTE(review): short statements (returns, transaction submit) are not visible in this
// extract - confirm how the transaction is completed on each path.
1182 private void updateMonitorStatusTo(final Long monitorId, final MonitorStatus newStatus, final Predicate<MonitorStatus> isValidStatus) {
1183 final String monitorKey = monitorIdKeyCache.getUnchecked(monitorId);
1184 if(monitorKey == null) {
1185 LOG.warn("No monitor Key associated with id {} to change the monitor status to {}", monitorId, newStatus);
1188 final ReadWriteTransaction tx = broker.newReadWriteTransaction();
1190 ListenableFuture<Optional<MonitoringState>> readResult =
1191 tx.read(LogicalDatastoreType.OPERATIONAL, getMonitorStateId(monitorKey));
1193 ListenableFuture<Void> writeResult = Futures.transform(readResult, new AsyncFunction<Optional<MonitoringState>, Void>() {
1195 public ListenableFuture<Void> apply(Optional<MonitoringState> optState) throws Exception {
1196 if(optState.isPresent()) {
1197 MonitoringState state = optState.get();
1198 if(isValidStatus.apply(state.getStatus())) {
// Only the key and the new status are set on the merged object
1199 MonitoringState updatedState = new MonitoringStateBuilder().setMonitorKey(monitorKey)
1200 .setStatus(newStatus).build();
1201 tx.merge(LogicalDatastoreType.OPERATIONAL, getMonitorStateId(monitorKey), updatedState);
1203 LOG.warn("Invalid Monitoring status {}, cannot be updated to {} for monitorId {}"
1204 , state.getStatus(), newStatus, monitorId);
1207 LOG.warn("No associated monitoring state data available to update the status to {} for {}", newStatus, monitorId);
// The outcome of the update is handed to FutureCallbackImpl
1213 Futures.addCallback(writeResult,
1214 new FutureCallbackImpl(String.format("Monitor status update for %d to %s", monitorId, newStatus.toString())));
/**
 * Resumes monitoring for the given monitor id: reads the monitoring info and then its
 * profile, sets the monitor status to Started (unless it already is Started) and
 * schedules the monitoring task with the profile's monitor interval. Every failure or
 * missing-data path is only logged.
 *
 * @param monitorId id of the monitor to resume
 */
// NOTE(review): no close of the read-only transaction is visible in this extract -
// confirm that it is closed on every path.
1217 private void resumeMonitoring(final long monitorId) {
1218 final ReadOnlyTransaction tx = broker.newReadOnlyTransaction();
1219 ListenableFuture<Optional<MonitoringInfo>> readInfoResult =
1220 tx.read(LogicalDatastoreType.OPERATIONAL, getMonitoringInfoId(monitorId));
1222 Futures.addCallback(readInfoResult, new FutureCallback<Optional<MonitoringInfo>>() {
1225 public void onFailure(Throwable error) {
1226 String msg = String.format("Unable to read monitoring info associated with monitor id %d", monitorId);
1227 LOG.error("Monitor resume Failed. {}", msg, error);
1231 public void onSuccess(Optional<MonitoringInfo> optInfo) {
1232 if(optInfo.isPresent()) {
1233 final MonitoringInfo info = optInfo.get();
// Second read on the same transaction: the profile referenced by the monitoring info
1234 ListenableFuture<Optional<MonitorProfile>> readProfile =
1235 tx.read(LogicalDatastoreType.OPERATIONAL, getMonitorProfileId(info.getProfileId()));
1236 Futures.addCallback(readProfile, new FutureCallback<Optional<MonitorProfile>>(){
1239 public void onFailure(Throwable error) {
1240 String msg = String.format("Unable to read Monitoring profile associated with id %d", info.getProfileId());
1241 LOG.warn("Monitor resume Failed. {}", msg, error);
1245 public void onSuccess(Optional<MonitorProfile> optProfile) {
1247 if(optProfile.isPresent()) {
// The predicate permits the update from any status other than Started
1248 updateMonitorStatusTo(monitorId, MonitorStatus.Started, new Predicate<MonitorStatus>() {
1250 public boolean apply(MonitorStatus currentStatus) {
1251 return currentStatus != MonitorStatus.Started;
1254 MonitorProfile profile = optProfile.get();
1255 LOG.debug("Monitor Resume - Scheduling monitoring task for Id: {}", monitorId);
1256 scheduleMonitoringTask(info, profile.getMonitorInterval());
1258 String msg = String.format("Monitoring profile associated with id %d is not present", info.getProfileId());
1259 LOG.warn("Monitor resume Failed. {}", msg);
1265 String msg = String.format("Monitoring info associated with id %d is not present", monitorId);
1266 LOG.warn("Monitor resume Failed. {}", msg);
1272 //DATA STORE OPERATIONS
/**
 * Reads the object at the given path from the given datastore using a new read-only
 * transaction, waiting for the read future to complete.
 *
 * @param datastoreType datastore to read from
 * @param path instance identifier of the object to read
 * @param <T> type of the data object at the path
 */
// NOTE(review): the return statement and any transaction close are not visible in this
// extract; presumably the local Optional named result is returned, which stays absent
// when the read throws - confirm.
1273 private <T extends DataObject> Optional<T> read(LogicalDatastoreType datastoreType, InstanceIdentifier<T> path) {
1274 ReadOnlyTransaction tx = broker.newReadOnlyTransaction();
// Starts as absent, so a failed read leaves an absent Optional in this variable
1276 Optional<T> result = Optional.absent();
// get() blocks the calling thread until the datastore read finishes
1278 result = tx.read(datastoreType, path).get();
1279 } catch (InterruptedException | ExecutionException e) {
// NOTE(review): an interruption is only logged in the visible code; no re-interrupt is visible
1280 LOG.warn("Error reading data from path {} in datastore {}", path, datastoreType, e);
/**
 * Handles an interface coming up: looks up the monitor ids associated with the interface
 * and resumes monitoring for each of them. A warning is logged when no ids are found.
 *
 * @param interfaceName name of the interface whose state changed
 */
1289 public void onInterfaceStateUp(String interfaceName) {
1290 List<Long> monitorIds = getMonitorIds(interfaceName);
1291 if(monitorIds.isEmpty()) {
1292 LOG.warn("Could not get monitorId for interface: {}", interfaceName);
1295 for(Long monitorId : monitorIds) {
1296 LOG.debug("Resume monitoring on interface: {} with monitorId: {}", interfaceName, monitorId);
1297 resumeMonitoring(monitorId);
/**
 * Handles an interface going down: looks up the monitor ids associated with the interface
 * and stops monitoring for each of them. A warning is logged when no ids are found.
 *
 * @param interfaceName name of the interface whose state changed
 */
1302 public void onInterfaceStateDown(String interfaceName) {
1303 List<Long> monitorIds = getMonitorIds(interfaceName);
1304 if(monitorIds.isEmpty()) {
1305 LOG.warn("Could not get monitorIds for interface: {}", interfaceName);
1308 for(Long monitorId : monitorIds) {
1309 LOG.debug("Suspend monitoring on interface: {} with monitorId: {}", interfaceName, monitorId);
1310 stopMonitoring(monitorId);
/**
 * Returns the monitor ids stored for the given interface in the OPERATIONAL datastore.
 *
 * @param interfaceName name of the interface to look up
 * @return the monitor id list of the stored entry, or an empty list when no entry exists
 */
1314 private List<Long> getMonitorIds(String interfaceName) {
1315 Optional<InterfaceMonitorEntry> optEntry = read(LogicalDatastoreType.OPERATIONAL,
1316 getInterfaceMonitorMapId(interfaceName));
1317 if(optEntry.isPresent()) {
1318 InterfaceMonitorEntry entry = optEntry.get();
// NOTE(review): callers call isEmpty() on this value; confirm getMonitorIds() on the entry cannot be null
1319 return entry.getMonitorIds();
1321 return Collections.emptyList();