2 * Copyright (c) 2015 Ericsson India Global Services Pvt Ltd. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 package org.opendaylight.vpnservice.alivenessmonitor.internal;
10 import java.lang.Thread.UncaughtExceptionHandler;
11 import java.util.ArrayList;
12 import java.util.Collections;
13 import java.util.EnumMap;
14 import java.util.HashMap;
15 import java.util.List;
17 import java.util.concurrent.ConcurrentHashMap;
18 import java.util.concurrent.ConcurrentMap;
19 import java.util.concurrent.ExecutionException;
20 import java.util.concurrent.ExecutorService;
21 import java.util.concurrent.Executors;
22 import java.util.concurrent.Future;
23 import java.util.concurrent.ScheduledFuture;
24 import java.util.concurrent.ScheduledExecutorService;
25 import java.util.concurrent.Semaphore;
26 import java.util.concurrent.ThreadFactory;
27 import java.util.concurrent.TimeUnit;
29 import org.opendaylight.controller.liblldp.NetUtils;
30 import org.opendaylight.controller.liblldp.Packet;
31 import org.opendaylight.controller.md.sal.binding.api.DataBroker;
32 import org.opendaylight.controller.md.sal.binding.api.NotificationPublishService;
33 import org.opendaylight.controller.md.sal.binding.api.ReadOnlyTransaction;
34 import org.opendaylight.controller.md.sal.binding.api.ReadWriteTransaction;
35 import org.opendaylight.controller.md.sal.binding.api.WriteTransaction;
36 import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
37 import org.opendaylight.vpnservice.mdsalutil.packet.Ethernet;
38 import org.opendaylight.yang.gen.v1.urn.opendaylight.vpnservice.alivenessmonitor.rev150629.AlivenessMonitorService;
39 import org.opendaylight.yang.gen.v1.urn.opendaylight.vpnservice.alivenessmonitor.rev150629.EtherTypes;
40 import org.opendaylight.yang.gen.v1.urn.opendaylight.vpnservice.alivenessmonitor.rev150629.LivenessState;
41 import org.opendaylight.yang.gen.v1.urn.opendaylight.vpnservice.alivenessmonitor.rev150629.MonitorEvent;
42 import org.opendaylight.yang.gen.v1.urn.opendaylight.vpnservice.alivenessmonitor.rev150629.MonitorEventBuilder;
43 import org.opendaylight.yang.gen.v1.urn.opendaylight.vpnservice.alivenessmonitor.rev150629.MonitorPauseInput;
44 import org.opendaylight.yang.gen.v1.urn.opendaylight.vpnservice.alivenessmonitor.rev150629.MonitorProfileCreateInput;
45 import org.opendaylight.yang.gen.v1.urn.opendaylight.vpnservice.alivenessmonitor.rev150629.MonitorProfileCreateOutput;
46 import org.opendaylight.yang.gen.v1.urn.opendaylight.vpnservice.alivenessmonitor.rev150629.MonitorProfileCreateOutputBuilder;
47 import org.opendaylight.yang.gen.v1.urn.opendaylight.vpnservice.alivenessmonitor.rev150629.MonitorProfileDeleteInput;
48 import org.opendaylight.yang.gen.v1.urn.opendaylight.vpnservice.alivenessmonitor.rev150629.MonitorStartInput;
49 import org.opendaylight.yang.gen.v1.urn.opendaylight.vpnservice.alivenessmonitor.rev150629.MonitorStartOutput;
50 import org.opendaylight.yang.gen.v1.urn.opendaylight.vpnservice.alivenessmonitor.rev150629.MonitorStartOutputBuilder;
51 import org.opendaylight.yang.gen.v1.urn.opendaylight.vpnservice.alivenessmonitor.rev150629.MonitorStatus;
52 import org.opendaylight.yang.gen.v1.urn.opendaylight.vpnservice.alivenessmonitor.rev150629.MonitorStopInput;
53 import org.opendaylight.yang.gen.v1.urn.opendaylight.vpnservice.alivenessmonitor.rev150629.MonitorUnpauseInput;
54 import org.opendaylight.yang.gen.v1.urn.opendaylight.vpnservice.alivenessmonitor.rev150629.MonitoringMode;
55 import org.opendaylight.yang.gen.v1.urn.opendaylight.vpnservice.alivenessmonitor.rev150629._interface.monitor.map.InterfaceMonitorEntry;
56 import org.opendaylight.yang.gen.v1.urn.opendaylight.vpnservice.alivenessmonitor.rev150629._interface.monitor.map.InterfaceMonitorEntryBuilder;
57 import org.opendaylight.yang.gen.v1.urn.opendaylight.vpnservice.alivenessmonitor.rev150629._interface.monitor.map.InterfaceMonitorEntryKey;
58 import org.opendaylight.yang.gen.v1.urn.opendaylight.vpnservice.alivenessmonitor.rev150629.endpoint.EndpointType;
59 import org.opendaylight.yang.gen.v1.urn.opendaylight.vpnservice.alivenessmonitor.rev150629.endpoint.endpoint.type.Interface;
60 import org.opendaylight.yang.gen.v1.urn.opendaylight.vpnservice.alivenessmonitor.rev150629.endpoint.endpoint.type.IpAddress;
61 import org.opendaylight.yang.gen.v1.urn.opendaylight.vpnservice.alivenessmonitor.rev150629.monitor.configs.MonitoringInfo;
62 import org.opendaylight.yang.gen.v1.urn.opendaylight.vpnservice.alivenessmonitor.rev150629.monitor.configs.MonitoringInfoBuilder;
63 import org.opendaylight.yang.gen.v1.urn.opendaylight.vpnservice.alivenessmonitor.rev150629.monitor.event.EventData;
64 import org.opendaylight.yang.gen.v1.urn.opendaylight.vpnservice.alivenessmonitor.rev150629.monitor.event.EventDataBuilder;
65 import org.opendaylight.yang.gen.v1.urn.opendaylight.vpnservice.alivenessmonitor.rev150629.monitor.profile.create.input.Profile;
66 import org.opendaylight.yang.gen.v1.urn.opendaylight.vpnservice.alivenessmonitor.rev150629.monitor.profiles.MonitorProfile;
67 import org.opendaylight.yang.gen.v1.urn.opendaylight.vpnservice.alivenessmonitor.rev150629.monitor.profiles.MonitorProfileBuilder;
68 import org.opendaylight.yang.gen.v1.urn.opendaylight.vpnservice.alivenessmonitor.rev150629.monitor.start.input.Config;
69 import org.opendaylight.yang.gen.v1.urn.opendaylight.vpnservice.alivenessmonitor.rev150629.monitorid.key.map.MonitoridKeyEntry;
70 import org.opendaylight.yang.gen.v1.urn.opendaylight.vpnservice.alivenessmonitor.rev150629.monitorid.key.map.MonitoridKeyEntryBuilder;
71 import org.opendaylight.yang.gen.v1.urn.opendaylight.vpnservice.alivenessmonitor.rev150629.monitoring.states.MonitoringState;
72 import org.opendaylight.yang.gen.v1.urn.opendaylight.vpnservice.alivenessmonitor.rev150629.monitoring.states.MonitoringStateBuilder;
73 import org.opendaylight.yang.gen.v1.urn.opendaylight.vpnservice.idmanager.rev150403.AllocateIdInput;
74 import org.opendaylight.yang.gen.v1.urn.opendaylight.vpnservice.idmanager.rev150403.AllocateIdInputBuilder;
75 import org.opendaylight.yang.gen.v1.urn.opendaylight.vpnservice.idmanager.rev150403.AllocateIdOutput;
76 import org.opendaylight.yang.gen.v1.urn.opendaylight.vpnservice.idmanager.rev150403.CreateIdPoolInput;
77 import org.opendaylight.yang.gen.v1.urn.opendaylight.vpnservice.idmanager.rev150403.CreateIdPoolInputBuilder;
78 import org.opendaylight.yang.gen.v1.urn.opendaylight.vpnservice.idmanager.rev150403.IdManagerService;
79 import org.opendaylight.yang.gen.v1.urn.opendaylight.vpnservice.idmanager.rev150403.ReleaseIdInput;
80 import org.opendaylight.yang.gen.v1.urn.opendaylight.vpnservice.idmanager.rev150403.ReleaseIdInputBuilder;
81 import org.opendaylight.yang.gen.v1.urn.opendaylight.vpnservice.interfacemgr.rpcs.rev151003.OdlInterfaceRpcService;
82 import org.opendaylight.yang.gen.v1.urn.opendaylight.packet.service.rev130709.PacketInReason;
83 import org.opendaylight.yang.gen.v1.urn.opendaylight.packet.service.rev130709.PacketProcessingListener;
84 import org.opendaylight.yang.gen.v1.urn.opendaylight.packet.service.rev130709.PacketProcessingService;
85 import org.opendaylight.yang.gen.v1.urn.opendaylight.packet.service.rev130709.PacketReceived;
86 import org.opendaylight.yang.gen.v1.urn.opendaylight.packet.service.rev130709.SendToController;
87 import org.opendaylight.yangtools.yang.binding.DataObject;
88 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
89 import org.opendaylight.yangtools.yang.common.RpcError.ErrorType;
90 import org.opendaylight.yangtools.yang.common.RpcResult;
91 import org.opendaylight.yangtools.yang.common.RpcResultBuilder;
92 import org.slf4j.Logger;
93 import org.slf4j.LoggerFactory;
95 import com.google.common.base.Optional;
96 import com.google.common.base.Preconditions;
97 import com.google.common.base.Predicate;
98 import com.google.common.base.Strings;
99 import com.google.common.cache.CacheBuilder;
100 import com.google.common.cache.CacheLoader;
101 import com.google.common.cache.LoadingCache;
102 import com.google.common.util.concurrent.AsyncFunction;
103 import com.google.common.util.concurrent.FutureCallback;
104 import com.google.common.util.concurrent.Futures;
105 import com.google.common.util.concurrent.JdkFutureAdapters;
106 import com.google.common.util.concurrent.ListenableFuture;
107 import com.google.common.util.concurrent.SettableFuture;
108 import com.google.common.util.concurrent.ThreadFactoryBuilder;
110 import static org.opendaylight.vpnservice.alivenessmonitor.internal.AlivenessMonitorUtil.*;
112 public class AlivenessMonitor implements AlivenessMonitorService, PacketProcessingListener,
113 ServiceProvider, InterfaceStateListener, AutoCloseable {
114 private static final Logger LOG = LoggerFactory.getLogger(AlivenessMonitor.class);
115 private final DataBroker broker;
116 private IdManagerService idManager;
117 private PacketProcessingService packetProcessingService;
118 private NotificationPublishService notificationPublishService;
119 private OdlInterfaceRpcService interfaceManager;
120 private Map<Class<?>, AlivenessProtocolHandler> packetTypeToProtocolHandler;
121 private Map<EtherTypes, AlivenessProtocolHandler> ethTypeToProtocolHandler;
122 private ConcurrentMap<Long, ScheduledFuture<?>> monitoringTasks;
123 private LoadingCache<Long, String> monitorIdKeyCache;
124 private ScheduledExecutorService monitorService;
125 private ExecutorService callbackExecutorService;
127 private static final int THREAD_POOL_SIZE = 4;
128 private static final boolean INTERRUPT_TASK = true;
129 private static final int NO_DELAY = 0;
130 private static final Long INITIAL_COUNT = 0L;
131 private static final boolean CREATE_MISSING_PARENT = true;
132 private static final int INVALID_ID = 0;
133 private ConcurrentMap<String, Semaphore> lockMap = new ConcurrentHashMap<>();
135 private class FutureCallbackImpl implements FutureCallback<Void> {
136 private String message;
137 public FutureCallbackImpl(String message) {
138 this.message = message;
142 public void onFailure(Throwable error) {
143 LOG.warn("Error in Datastore operation - {}", message, error);
147 public void onSuccess(Void result) {
148 LOG.debug("Success in Datastore operation - {}", message);
152 private class AlivenessMonitorTask implements Runnable {
153 private MonitoringInfo monitoringInfo;
155 public AlivenessMonitorTask(MonitoringInfo monitoringInfo) {
156 this.monitoringInfo = monitoringInfo;
161 if(LOG.isTraceEnabled()) {
162 LOG.trace("send monitor packet - {}", monitoringInfo);
164 sendMonitorPacket(monitoringInfo);
168 public AlivenessMonitor(DataBroker dataBroker) {
170 ethTypeToProtocolHandler = new EnumMap<>(EtherTypes.class);
171 packetTypeToProtocolHandler = new HashMap<>();
172 monitorService = Executors.newScheduledThreadPool(THREAD_POOL_SIZE,
173 getMonitoringThreadFactory("Aliveness Monitoring Task"));
174 callbackExecutorService = Executors.newFixedThreadPool(THREAD_POOL_SIZE,
175 getMonitoringThreadFactory("Aliveness Callback Handler"));
176 monitoringTasks = new ConcurrentHashMap<>();
180 private ThreadFactory getMonitoringThreadFactory(String threadNameFormat) {
181 ThreadFactoryBuilder builder = new ThreadFactoryBuilder();
182 builder.setNameFormat(threadNameFormat);
183 builder.setUncaughtExceptionHandler( new UncaughtExceptionHandler() {
185 public void uncaughtException(Thread t, Throwable e) {
186 LOG.error("Received Uncaught Exception event in Thread: {}", t.getName(), e);
189 return builder.build();
192 private void initilizeCache() {
193 monitorIdKeyCache = CacheBuilder.newBuilder()
194 .build(new CacheLoader<Long, String>() {
196 public String load(Long monitorId) throws Exception {
197 String monitorKey = null;
198 Optional<MonitoridKeyEntry> optKey = read(LogicalDatastoreType.OPERATIONAL, getMonitorMapId(monitorId));
199 if(optKey.isPresent()) {
200 monitorKey = optKey.get().getMonitorKey();
208 public void close() throws Exception {
209 monitorIdKeyCache.cleanUp();
210 monitorService.shutdown();
211 callbackExecutorService.shutdown();
215 public DataBroker getDataBroker() {
220 public OdlInterfaceRpcService getInterfaceManager() {
221 return interfaceManager;
224 public void setPacketProcessingService(PacketProcessingService pktProcessingService) {
225 this.packetProcessingService = pktProcessingService;
228 public void setNotificationPublishService(NotificationPublishService notificationPublishService) {
229 this.notificationPublishService = notificationPublishService;
232 public void setInterfaceManager(OdlInterfaceRpcService interfaceManager) {
233 this.interfaceManager = interfaceManager;
236 public void setIdManager(IdManagerService idManager) {
237 this.idManager = idManager;
241 public void registerHandler(EtherTypes etherType, AlivenessProtocolHandler protocolHandler) {
242 ethTypeToProtocolHandler.put(etherType, protocolHandler);
243 packetTypeToProtocolHandler.put(protocolHandler.getPacketClass(), protocolHandler);
246 private void createIdPool() {
247 CreateIdPoolInput createPool = new CreateIdPoolInputBuilder()
248 .setPoolName(AlivenessMonitorConstants.MONITOR_IDPOOL_NAME)
249 .setLow(AlivenessMonitorConstants.MONITOR_IDPOOL_START)
250 .setHigh(AlivenessMonitorConstants.MONITOR_IDPOOL_SIZE)
252 Future<RpcResult<Void>> result = idManager.createIdPool(createPool);
253 Futures.addCallback(JdkFutureAdapters.listenInPoolThread(result), new FutureCallback<RpcResult<Void>>() {
256 public void onFailure(Throwable error) {
257 LOG.error("Failed to create idPool for Aliveness Monitor Service",error);
261 public void onSuccess(RpcResult<Void> result) {
262 if(result.isSuccessful()) {
263 LOG.debug("Created IdPool for Aliveness Monitor Service");
265 LOG.error("RPC to create Idpool failed {}", result.getErrors());
271 private int getUniqueId(final String idKey) {
272 AllocateIdInput getIdInput = new AllocateIdInputBuilder()
273 .setPoolName(AlivenessMonitorConstants.MONITOR_IDPOOL_NAME)
274 .setIdKey(idKey).build();
276 Future<RpcResult<AllocateIdOutput>> result = idManager.allocateId(getIdInput);
279 RpcResult<AllocateIdOutput> rpcResult = result.get();
280 if(rpcResult.isSuccessful()) {
281 return rpcResult.getResult().getIdValue().intValue();
283 LOG.warn("RPC Call to Get Unique Id returned with Errors {}", rpcResult.getErrors());
285 } catch (InterruptedException | ExecutionException e) {
286 LOG.warn("Exception when getting Unique Id for key {}", idKey, e);
291 private void releaseId(String idKey) {
292 ReleaseIdInput idInput = new ReleaseIdInputBuilder()
293 .setPoolName(AlivenessMonitorConstants.MONITOR_IDPOOL_NAME)
294 .setIdKey(idKey).build();
296 Future<RpcResult<Void>> result = idManager.releaseId(idInput);
297 RpcResult<Void> rpcResult = result.get();
298 if(!rpcResult.isSuccessful()) {
299 LOG.warn("RPC Call to release Id {} with Key {} returned with Errors {}",
300 idKey, rpcResult.getErrors());
302 } catch (InterruptedException | ExecutionException e) {
303 LOG.warn("Exception when releasing Id for key {}", idKey, e);
308 public void onPacketReceived(PacketReceived packetReceived) {
309 Class<? extends PacketInReason> pktInReason = packetReceived.getPacketInReason();
310 if(LOG.isTraceEnabled()) {
311 LOG.trace("Packet Received {}", packetReceived );
314 if (pktInReason == SendToController.class) {
315 Packet packetInFormatted;
316 byte[] data = packetReceived.getPayload();
317 Ethernet res = new Ethernet();
319 packetInFormatted = res.deserialize(data, 0, data.length * NetUtils.NumBitsInAByte);
320 } catch (Exception e) {
321 LOG.warn("Failed to decode packet: {}", e.getMessage());
325 if(packetInFormatted == null) {
326 LOG.warn("Failed to deserialize Received Packet from table {}", packetReceived.getTableId().getValue());
330 Object objPayload = packetInFormatted.getPayload();
332 if(objPayload == null) {
333 LOG.trace("Unsupported packet type. Ignoring the packet...");
337 if (LOG.isTraceEnabled()) {
338 LOG.trace("onPacketReceived packet: {}, packet class: {}", packetReceived,
339 objPayload.getClass());
342 AlivenessProtocolHandler livenessProtocolHandler = packetTypeToProtocolHandler.get(objPayload.getClass());
343 if (livenessProtocolHandler == null) {
347 String monitorKey = livenessProtocolHandler.handlePacketIn(packetInFormatted.getPayload(), packetReceived);
349 if(monitorKey != null) {
350 processReceivedMonitorKey(monitorKey);
352 LOG.debug("No monitorkey associated with received packet");
357 private void processReceivedMonitorKey(final String monitorKey) {
358 Preconditions.checkNotNull(monitorKey, "Monitor Key required to process the state");
360 LOG.debug("Processing monitorKey: {} for received packet", monitorKey);
362 final Semaphore lock = lockMap.get(monitorKey);
363 LOG.debug("Acquiring lock for monitor key : {} to process monitor packet", monitorKey);
366 final ReadWriteTransaction tx = broker.newReadWriteTransaction();
368 ListenableFuture<Optional<MonitoringState>> stateResult = tx.read(LogicalDatastoreType.OPERATIONAL, getMonitorStateId(monitorKey));
371 Futures.addCallback(stateResult, new FutureCallback<Optional<MonitoringState>>() {
374 public void onSuccess(Optional<MonitoringState> optState) {
376 if(optState.isPresent()) {
377 final MonitoringState currentState = optState.get();
379 if(LOG.isTraceEnabled()) {
380 LOG.trace("OnPacketReceived : Monitoring state from ODS : {} ", currentState);
383 Long responsePendingCount = currentState.getResponsePendingCount();
385 //Need to relook at the pending count logic to support N out of M scenarios
386 // if(currentState.getState() != LivenessState.Up) {
387 // //Reset responsePendingCount when state changes from DOWN to UP
388 // responsePendingCount = INITIAL_COUNT;
391 // if(responsePendingCount > INITIAL_COUNT) {
392 // responsePendingCount = currentState.getResponsePendingCount() - 1;
394 responsePendingCount = INITIAL_COUNT;
396 final boolean stateChanged = (currentState.getState() == LivenessState.Down ||
397 currentState.getState() == LivenessState.Unknown);
399 final MonitoringState state = new MonitoringStateBuilder().setMonitorKey(monitorKey).setState(LivenessState.Up)
400 .setResponsePendingCount(responsePendingCount).build();
401 tx.merge(LogicalDatastoreType.OPERATIONAL, getMonitorStateId(monitorKey), state);
402 ListenableFuture<Void> writeResult = tx.submit();
405 Futures.addCallback(writeResult, new FutureCallback<Void>() {
407 public void onSuccess(Void noarg) {
411 LOG.info("Sending notification for monitor Id : {} with Current State: {}",
412 currentState.getMonitorId(), LivenessState.Up);
413 publishNotification(currentState.getMonitorId(), LivenessState.Up);
415 if(LOG.isTraceEnabled()) {
416 LOG.trace("Successful in writing monitoring state {} to ODS", state);
422 public void onFailure(Throwable error) {
424 LOG.warn("Error in writing monitoring state : {} to Datastore", monitorKey, error);
425 if(LOG.isTraceEnabled()) {
426 LOG.trace("Error in writing monitoring state: {} to Datastore", state);
431 LOG.warn("Monitoring State not available for key: {} to process the Packet received", monitorKey);
432 //Complete the transaction
439 public void onFailure(Throwable error) {
440 LOG.error("Error when reading Monitoring State for key: {} to process the Packet received", monitorKey, error);
441 //FIXME: Not sure if the transaction status is valid to cancel
449 public PacketProcessingService getPacketProcessingService() {
450 return packetProcessingService;
453 private String getIpAddress(EndpointType endpoint) {
454 String ipAddress = "";
455 if( endpoint instanceof IpAddress) {
456 ipAddress = ((IpAddress) endpoint).getIpAddress().getIpv4Address().getValue();
457 } else if (endpoint instanceof Interface) {
458 ipAddress = ((Interface)endpoint).getInterfaceIp().getIpv4Address().getValue();
463 private String getUniqueKey(String interfaceName, String ethType, EndpointType source, EndpointType destination) {
464 StringBuilder builder = new StringBuilder().append(interfaceName).append(AlivenessMonitorConstants.SEPERATOR)
467 builder.append(AlivenessMonitorConstants.SEPERATOR).append(getIpAddress(source));
470 if(destination != null) {
471 builder.append(AlivenessMonitorConstants.SEPERATOR).append(getIpAddress(destination));
473 return builder.toString();
477 public Future<RpcResult<MonitorStartOutput>> monitorStart(MonitorStartInput input) {
478 RpcResultBuilder<MonitorStartOutput> rpcResultBuilder;
479 final Config in = input.getConfig();
480 Long profileId = in.getProfileId();
481 LOG.debug("Monitor Start invoked with Config: {}, Profile Id: {}", in, profileId);
484 if(in.getMode() != MonitoringMode.OneOne) {
485 throw new UnsupportedConfigException(
486 "Unsupported Monitoring mode. Currently one-one mode is supported");
489 Optional<MonitorProfile> optProfile = read(LogicalDatastoreType.OPERATIONAL, getMonitorProfileId(profileId));
490 final MonitorProfile profile;
491 if(!optProfile.isPresent()) {
492 String errMsg = String.format("No monitoring profile associated with Id: %d", profileId);
493 LOG.error("Monitor start failed. {}", errMsg);
494 throw new RuntimeException(errMsg);
496 profile = optProfile.get();
499 EtherTypes ethType = profile.getProtocolType();
501 String interfaceName = null;
502 EndpointType srcEndpointType = in.getSource().getEndpointType();
504 if( srcEndpointType instanceof Interface) {
505 Interface endPoint = (Interface) srcEndpointType;
506 interfaceName = endPoint.getInterfaceName();
508 throw new UnsupportedConfigException(
509 "Unsupported source Endpoint type. Only Interface Endpoint currently supported for monitoring");
512 if(Strings.isNullOrEmpty(interfaceName)) {
513 throw new RuntimeException("Interface Name not defined in the source Endpoint");
516 //Initially the support is for one monitoring per interface.
517 //Revisit the retrieving monitor id logic when the multiple monitoring for same interface is needed.
518 EndpointType destEndpointType = null;
519 if(in.getDestination() != null) {
520 destEndpointType = in.getDestination().getEndpointType();
522 String idKey = getUniqueKey(interfaceName, ethType.toString(), srcEndpointType, destEndpointType);
523 final long monitorId = getUniqueId(idKey);
524 Optional<MonitoringInfo> optKey = read(LogicalDatastoreType.OPERATIONAL, getMonitoringInfoId(monitorId));
526 if(optKey.isPresent()) {
527 String message = String.format("Monitoring for the interface %s with this configuration is already registered.", interfaceName);
529 MonitorStartOutput output = new MonitorStartOutputBuilder().setMonitorId(monitorId).build();
530 rpcResultBuilder = RpcResultBuilder.success(output).withWarning(ErrorType.APPLICATION, "config-exists", message);
531 return Futures.immediateFuture(rpcResultBuilder.build());
533 //Construct the monitor key
534 final MonitoringInfo monitoringInfo = new MonitoringInfoBuilder()
536 .setMode(in.getMode())
537 .setProfileId(profileId)
538 .setDestination(in.getDestination())
539 .setSource(in.getSource()).build();
540 //Construct the initial monitor state
541 AlivenessProtocolHandler handler = ethTypeToProtocolHandler.get(ethType);
542 final String monitoringKey = handler.getUniqueMonitoringKey(monitoringInfo);
544 MonitoringState monitoringState = new MonitoringStateBuilder()
545 .setMonitorKey(monitoringKey)
546 .setMonitorId(monitorId)
547 .setState(LivenessState.Unknown)
548 .setStatus(MonitorStatus.Started)
549 .setRequestCount(INITIAL_COUNT)
550 .setResponsePendingCount(INITIAL_COUNT).build();
552 MonitoridKeyEntry mapEntry = new MonitoridKeyEntryBuilder().setMonitorId(monitorId)
553 .setMonitorKey(monitoringKey).build();
555 WriteTransaction tx = broker.newWriteOnlyTransaction();
557 tx.put(LogicalDatastoreType.OPERATIONAL, getMonitoringInfoId(monitorId), monitoringInfo, CREATE_MISSING_PARENT);
559 tx.put(LogicalDatastoreType.OPERATIONAL, getMonitorStateId(monitoringKey), monitoringState, CREATE_MISSING_PARENT);
561 tx.put(LogicalDatastoreType.OPERATIONAL, getMonitorMapId(monitorId), mapEntry, CREATE_MISSING_PARENT);
563 Futures.addCallback(tx.submit(), new FutureCallback<Void>() {
565 public void onFailure(Throwable error) {
566 String errorMsg = String.format("Adding Monitoring info: %s in Datastore failed", monitoringInfo);
567 LOG.warn(errorMsg, error);
568 throw new RuntimeException(errorMsg, error);
572 public void onSuccess(Void noarg) {
574 LOG.debug("Scheduling monitor task for config: {}", in);
575 scheduleMonitoringTask(monitoringInfo, profile.getMonitorInterval());
576 lockMap.put(monitoringKey, new Semaphore(1, true));
581 associateMonitorIdWithInterface(monitorId, interfaceName);
583 MonitorStartOutput output = new MonitorStartOutputBuilder()
584 .setMonitorId(monitorId).build();
586 rpcResultBuilder = RpcResultBuilder.success(output);
587 } catch(Exception e) {
588 LOG.error("Start Monitoring Failed. {}", e.getMessage(), e);
589 rpcResultBuilder = RpcResultBuilder.<MonitorStartOutput>failed().withError(ErrorType.APPLICATION, e.getMessage(), e);
591 return Futures.immediateFuture(rpcResultBuilder.build());
594 private void associateMonitorIdWithInterface(final Long monitorId, final String interfaceName) {
595 LOG.debug("associate monitor Id {} with interface {}", monitorId, interfaceName);
596 final ReadWriteTransaction tx = broker.newReadWriteTransaction();
597 ListenableFuture<Optional<InterfaceMonitorEntry>> readFuture =
598 tx.read(LogicalDatastoreType.OPERATIONAL, getInterfaceMonitorMapId(interfaceName));
599 ListenableFuture<Void> updateFuture =
600 Futures.transform(readFuture, new AsyncFunction<Optional<InterfaceMonitorEntry>, Void>() {
603 public ListenableFuture<Void> apply(Optional<InterfaceMonitorEntry> optEntry) throws Exception {
604 if(optEntry.isPresent()) {
605 InterfaceMonitorEntry entry = optEntry.get();
606 List<Long> monitorIds = entry.getMonitorIds();
607 monitorIds.add(monitorId);
608 InterfaceMonitorEntry newEntry = new InterfaceMonitorEntryBuilder()
609 .setKey(new InterfaceMonitorEntryKey(interfaceName)).setMonitorIds(monitorIds).build();
610 tx.merge(LogicalDatastoreType.OPERATIONAL, getInterfaceMonitorMapId(interfaceName), newEntry);
612 //Create new monitor entry
613 LOG.debug("Adding new interface-monitor association for interface {} with id {}", interfaceName, monitorId);
614 List<Long> monitorIds = new ArrayList<>();
615 monitorIds.add(monitorId);
616 InterfaceMonitorEntry newEntry =
617 new InterfaceMonitorEntryBuilder().setInterfaceName(interfaceName).setMonitorIds(monitorIds).build();
618 tx.put(LogicalDatastoreType.OPERATIONAL,
619 getInterfaceMonitorMapId(interfaceName), newEntry, CREATE_MISSING_PARENT);
625 Futures.addCallback(updateFuture, new FutureCallbackImpl(
626 String.format("Association of monitorId %d with Interface %s", monitorId, interfaceName)));
629 private void scheduleMonitoringTask(MonitoringInfo monitoringInfo, long monitorInterval) {
630 AlivenessMonitorTask monitorTask = new AlivenessMonitorTask(monitoringInfo);
631 ScheduledFuture<?> scheduledFutureResult = monitorService.scheduleAtFixedRate(
632 monitorTask, NO_DELAY, monitorInterval, TimeUnit.MILLISECONDS);
633 monitoringTasks.put(monitoringInfo.getId(), scheduledFutureResult);
637 public Future<RpcResult<Void>> monitorPause(MonitorPauseInput input) {
638 LOG.debug("Monitor Pause operation invoked for monitor id: {}", input.getMonitorId());
639 SettableFuture<RpcResult<Void>> result = SettableFuture.create();
640 final Long monitorId = input.getMonitorId();
642 //Set the monitoring status to Paused
643 updateMonitorStatusTo(monitorId, MonitorStatus.Paused, new Predicate<MonitorStatus>() {
645 public boolean apply(MonitorStatus currentStatus) {
646 return currentStatus == MonitorStatus.Started;
650 if(stopMonitoringTask(monitorId)) {
651 result.set(RpcResultBuilder.<Void>success().build());
653 String errorMsg = String.format("No Monitoring Task availble to pause for the given monitor id : %d", monitorId);
654 LOG.error("Monitor Pause operation failed- {}",errorMsg);
655 result.set(RpcResultBuilder.<Void>failed().withError(ErrorType.APPLICATION, errorMsg).build());
662 public Future<RpcResult<Void>> monitorUnpause(MonitorUnpauseInput input) {
663 LOG.debug("Monitor Unpause operation invoked for monitor id: {}", input.getMonitorId());
664 final SettableFuture<RpcResult<Void>> result = SettableFuture.create();
666 final Long monitorId = input.getMonitorId();
667 final ReadOnlyTransaction tx = broker.newReadOnlyTransaction();
668 ListenableFuture<Optional<MonitoringInfo>> readInfoResult =
669 tx.read(LogicalDatastoreType.OPERATIONAL, getMonitoringInfoId(monitorId));
671 Futures.addCallback(readInfoResult, new FutureCallback<Optional<MonitoringInfo>>() {
674 public void onFailure(Throwable error) {
675 String msg = String.format("Unable to read monitoring info associated with monitor id %d", monitorId);
676 LOG.error("Monitor unpause Failed. {}", msg, error);
677 result.set(RpcResultBuilder.<Void>failed().withError(ErrorType.APPLICATION, msg, error).build());
681 public void onSuccess(Optional<MonitoringInfo> optInfo) {
682 if(optInfo.isPresent()) {
683 final MonitoringInfo info = optInfo.get();
684 ListenableFuture<Optional<MonitorProfile>> readProfile =
685 tx.read(LogicalDatastoreType.OPERATIONAL, getMonitorProfileId(info.getProfileId()));
686 Futures.addCallback(readProfile, new FutureCallback<Optional<MonitorProfile>>(){
689 public void onFailure(Throwable error) {
690 String msg = String.format("Unable to read Monitoring profile associated with id %d", info.getProfileId());
691 LOG.warn("Monitor unpause Failed. {}", msg, error);
692 result.set(RpcResultBuilder.<Void>failed().withError(ErrorType.APPLICATION, msg, error).build());
696 public void onSuccess(Optional<MonitorProfile> optProfile) {
698 if(optProfile.isPresent()) {
699 updateMonitorStatusTo(monitorId, MonitorStatus.Started, new Predicate<MonitorStatus>() {
701 public boolean apply(MonitorStatus currentStatus) {
702 return (currentStatus == MonitorStatus.Paused ||
703 currentStatus == MonitorStatus.Stopped);
706 MonitorProfile profile = optProfile.get();
707 LOG.debug("Monitor Resume - Scheduling monitoring task with Id: {}", monitorId);
708 scheduleMonitoringTask(info, profile.getMonitorInterval());
709 result.set(RpcResultBuilder.<Void>success().build());
711 String msg = String.format("Monitoring profile associated with id %d is not present", info.getProfileId());
712 LOG.warn("Monitor unpause Failed. {}", msg);
713 result.set(RpcResultBuilder.<Void>failed().withError(ErrorType.APPLICATION, msg).build());
719 String msg = String.format("Monitoring info associated with id %d is not present", monitorId);
720 LOG.warn("Monitor unpause Failed. {}", msg);
721 result.set(RpcResultBuilder.<Void>failed().withError(ErrorType.APPLICATION, msg).build());
724 }, callbackExecutorService);
729 private boolean stopMonitoringTask(Long monitorId) {
730 return stopMonitoringTask(monitorId, INTERRUPT_TASK);
733 private boolean stopMonitoringTask(Long monitorId, boolean interruptTask) {
734 ScheduledFuture<?> scheduledFutureResult = monitoringTasks.get(monitorId);
735 if(scheduledFutureResult != null) {
736 scheduledFutureResult.cancel(interruptTask);
742 private Optional<MonitorProfile> getMonitorProfile(Long profileId) {
743 return read(LogicalDatastoreType.OPERATIONAL, getMonitorProfileId(profileId));
746 private void acquireLock(Semaphore lock) {
751 boolean acquiredLock = false;
753 acquiredLock = lock.tryAcquire(50, TimeUnit.MILLISECONDS);
754 } catch (InterruptedException e) {
755 LOG.warn("Thread interrupted when waiting to acquire the lock");
759 LOG.warn("Previous transaction did not complete in time. Releasing the lock to proceed");
763 LOG.trace("Lock acquired successfully");
764 } catch (InterruptedException e) {
765 LOG.warn("Acquire failed");
768 LOG.trace("Lock acquired successfully");
772 private void releaseLock(Semaphore lock) {
778 private void sendMonitorPacket(final MonitoringInfo monitoringInfo) {
779 //TODO: Handle interrupts
780 final Long monitorId = monitoringInfo.getId();
781 final String monitorKey = monitorIdKeyCache.getUnchecked(monitorId);
782 if(monitorKey == null) {
783 LOG.warn("No monitor Key associated with id {} to send the monitor packet", monitorId);
786 LOG.debug("Sending monitoring packet for key: {}", monitorKey);
789 final MonitorProfile profile;
790 Optional<MonitorProfile> optProfile = getMonitorProfile(monitoringInfo.getProfileId());
791 if(optProfile.isPresent()) {
792 profile = optProfile.get();
794 LOG.warn("No monitor profile associated with id {}. "
795 + "Could not send Monitor packet for monitor-id {}", monitoringInfo.getProfileId(), monitorId);
799 final Semaphore lock = lockMap.get(monitorKey);
800 LOG.debug("Acquiring lock for monitor key : {} to send monitor packet", monitorKey);
803 final ReadWriteTransaction tx = broker.newReadWriteTransaction();
804 ListenableFuture<Optional<MonitoringState>> readResult =
805 tx.read(LogicalDatastoreType.OPERATIONAL, getMonitorStateId(monitorKey));
806 ListenableFuture<Void> writeResult = Futures.transform(readResult, new AsyncFunction<Optional<MonitoringState>, Void>() {
809 public ListenableFuture<Void> apply(Optional<MonitoringState> optState)
811 if(optState.isPresent()) {
812 MonitoringState state = optState.get();
814 //Increase the request count
815 Long requestCount = state.getRequestCount() + 1;
817 //Check with the monitor window
818 LivenessState currentLivenessState = state.getState();
820 //Increase the pending response count
821 long responsePendingCount = state.getResponsePendingCount();
822 if(responsePendingCount < profile.getMonitorWindow()) {
823 responsePendingCount = responsePendingCount + 1;
826 //Check with the failure thresold
827 if(responsePendingCount >= profile.getFailureThreshold()) {
828 //Change the state to down and notify
829 if(currentLivenessState != LivenessState.Down) {
830 LOG.debug("Response pending Count: {}, Failure threshold: {} for monitorId {}",
831 responsePendingCount, profile.getFailureThreshold(), state.getMonitorId());
832 LOG.info("Sending notification for monitor Id : {} with State: {}",
833 state.getMonitorId(), LivenessState.Down);
834 publishNotification(monitorId, LivenessState.Down);
835 currentLivenessState = LivenessState.Down;
836 //Reset requestCount when state changes from UP to DOWN
837 requestCount = INITIAL_COUNT;
841 //Update the ODS with state
842 MonitoringState updatedState = new MonitoringStateBuilder(/*state*/).setMonitorKey(state.getMonitorKey())
843 .setRequestCount(requestCount)
844 .setResponsePendingCount(responsePendingCount)
845 .setState(currentLivenessState).build();
846 tx.merge(LogicalDatastoreType.OPERATIONAL, getMonitorStateId(state.getMonitorKey()), updatedState);
849 //Close the transaction
851 String errorMsg = String.format("Monitoring State associated with id %d is not present to send packet out.", monitorId);
852 return Futures.immediateFailedFuture(new RuntimeException(errorMsg));
858 Futures.addCallback(writeResult, new FutureCallback<Void>() {
860 public void onSuccess(Void noarg) {
861 //invoke packetout on protocol handler
862 AlivenessProtocolHandler handler = ethTypeToProtocolHandler.get(profile.getProtocolType());
863 if(handler != null) {
864 LOG.debug("Sending monitoring packet {}", monitoringInfo);
865 handler.sendPacketOut(monitoringInfo);
871 public void onFailure(Throwable error) {
872 LOG.warn("Updating monitoring state for key: {} failed. Monitoring packet is not sent", monitorKey, error);
880 private void publishNotification(final Long monitorId, final LivenessState state) {
881 LOG.debug("Sending notification for id {} - state {}", monitorId, state);
882 EventData data = new EventDataBuilder().setMonitorId(monitorId)
883 .setMonitorState(state).build();
884 MonitorEvent event = new MonitorEventBuilder().setEventData(data).build();;
885 final ListenableFuture<? extends Object> eventFuture = notificationPublishService.offerNotification(event);
886 Futures.addCallback(eventFuture, new FutureCallback<Object>() {
888 public void onFailure(Throwable error) {
889 LOG.warn("Error in notifying listeners for id {} - state {}", monitorId, state, error);
893 public void onSuccess(Object arg) {
894 LOG.trace("Successful in notifying listeners for id {} - state {}", monitorId, state);
900 public Future<RpcResult<MonitorProfileCreateOutput>> monitorProfileCreate(final MonitorProfileCreateInput input) {
901 LOG.debug("Monitor Profile Create operation - {}", input.getProfile());
902 final SettableFuture<RpcResult<MonitorProfileCreateOutput>> result = SettableFuture.create();
903 Profile profile = input.getProfile();
904 final Long failureThreshold = profile.getFailureThreshold();
905 final Long monitorInterval = profile.getMonitorInterval();
906 final Long monitorWindow = profile.getMonitorWindow();
907 final EtherTypes ethType = profile.getProtocolType();
908 String idKey = getUniqueProfileKey(failureThreshold, monitorInterval, monitorWindow, ethType);
909 final Long profileId = Long.valueOf(getUniqueId(idKey));
911 final ReadWriteTransaction tx = broker.newReadWriteTransaction();
912 ListenableFuture<Optional<MonitorProfile>> readFuture =
913 tx.read(LogicalDatastoreType.OPERATIONAL, getMonitorProfileId(profileId));
914 ListenableFuture<RpcResult<MonitorProfileCreateOutput>> resultFuture =
915 Futures.transform(readFuture, new AsyncFunction<Optional<MonitorProfile>, RpcResult<MonitorProfileCreateOutput>>() {
918 public ListenableFuture<RpcResult<MonitorProfileCreateOutput>> apply(
919 Optional<MonitorProfile> optProfile) throws Exception {
920 if(optProfile.isPresent()) {
922 MonitorProfileCreateOutput output = new MonitorProfileCreateOutputBuilder()
923 .setProfileId(profileId).build();
924 String msg = String.format("Monitor profile %s already present for the given input", input);
926 result.set(RpcResultBuilder.success(output)
927 .withWarning(ErrorType.PROTOCOL, "profile-exists", msg).build());
929 final MonitorProfile monitorProfile = new MonitorProfileBuilder().setId(profileId)
930 .setFailureThreshold(failureThreshold)
931 .setMonitorInterval(monitorInterval)
932 .setMonitorWindow(monitorWindow)
933 .setProtocolType(ethType).build();
934 tx.put(LogicalDatastoreType.OPERATIONAL, getMonitorProfileId(profileId), monitorProfile, CREATE_MISSING_PARENT);
935 Futures.addCallback(tx.submit(), new FutureCallback<Void>() {
937 public void onFailure(Throwable error) {
939 String.format("Error when storing monitorprofile %s in datastore", monitorProfile);
940 LOG.error(msg, error);
941 result.set(RpcResultBuilder.<MonitorProfileCreateOutput>failed()
942 .withError(ErrorType.APPLICATION, msg, error).build());
945 public void onSuccess(Void noarg) {
946 MonitorProfileCreateOutput output = new MonitorProfileCreateOutputBuilder()
947 .setProfileId(profileId).build();
948 result.set(RpcResultBuilder.success(output).build());
954 }, callbackExecutorService);
955 Futures.addCallback(resultFuture, new FutureCallback<RpcResult<MonitorProfileCreateOutput>>() {
957 public void onFailure(Throwable error) {
958 //This would happen when any error happens during reading for monitoring profile
959 String msg = String.format("Error in creating monitorprofile - %s", input);
960 result.set(RpcResultBuilder.<MonitorProfileCreateOutput>failed()
961 .withError(ErrorType.APPLICATION, msg, error).build());
962 LOG.error(msg, error);
966 public void onSuccess(RpcResult<MonitorProfileCreateOutput> result) {
967 LOG.debug("Successfully created monitor Profile {} ", input);
969 }, callbackExecutorService);
973 private String getUniqueProfileKey(Long failureThreshold,Long monitorInterval,Long monitorWindow,EtherTypes ethType) {
974 return new StringBuilder().append(failureThreshold).append(AlivenessMonitorConstants.SEPERATOR)
975 .append(monitorInterval).append(AlivenessMonitorConstants.SEPERATOR)
976 .append(monitorWindow).append(AlivenessMonitorConstants.SEPERATOR)
977 .append(ethType).append(AlivenessMonitorConstants.SEPERATOR).toString();
981 public Future<RpcResult<Void>> monitorProfileDelete(final MonitorProfileDeleteInput input) {
982 LOG.debug("Monitor Profile delete for Id: {}", input.getProfileId());
983 final SettableFuture<RpcResult<Void>> result = SettableFuture.create();
984 final Long profileId = input.getProfileId();
985 final ReadWriteTransaction tx = broker.newReadWriteTransaction();
986 ListenableFuture<Optional<MonitorProfile>> readFuture =
987 tx.read(LogicalDatastoreType.OPERATIONAL, getMonitorProfileId(profileId));
988 ListenableFuture<RpcResult<Void>> writeFuture =
989 Futures.transform(readFuture, new AsyncFunction<Optional<MonitorProfile>, RpcResult<Void>>() {
992 public ListenableFuture<RpcResult<Void>> apply(final Optional<MonitorProfile> optProfile) throws Exception {
993 if(optProfile.isPresent()) {
994 tx.delete(LogicalDatastoreType.OPERATIONAL, getMonitorProfileId(profileId));
995 Futures.addCallback(tx.submit(), new FutureCallback<Void>() {
997 public void onFailure(Throwable error) {
998 String msg = String.format("Error when removing monitor profile %d from datastore", profileId);
999 LOG.error(msg, error);
1000 result.set(RpcResultBuilder.<Void>failed().withError(ErrorType.APPLICATION, msg, error).build());
1004 public void onSuccess(Void noarg) {
1005 MonitorProfile profile = optProfile.get();
1006 String id = getUniqueProfileKey(profile.getFailureThreshold(), profile.getMonitorInterval(),
1007 profile.getMonitorWindow(), profile.getProtocolType());
1009 result.set(RpcResultBuilder.<Void>success().build());
1013 String msg = String.format("Monitor profile with Id: %d does not exist", profileId);
1015 result.set(RpcResultBuilder.<Void>success().withWarning(ErrorType.PROTOCOL, "invalid-value", msg).build());
1019 }, callbackExecutorService);
1021 Futures.addCallback(writeFuture, new FutureCallback<RpcResult<Void>>() {
1024 public void onFailure(Throwable error) {
1025 String msg = String.format("Error when removing monitor profile %d from datastore", profileId);
1026 LOG.error(msg, error);
1027 result.set(RpcResultBuilder.<Void>failed().withError(ErrorType.APPLICATION, msg, error).build());
1031 public void onSuccess(RpcResult<Void> noarg) {
1032 LOG.debug("Successfully removed Monitor Profile {}", profileId);
1034 }, callbackExecutorService);
1039 public Future<RpcResult<Void>> monitorStop(MonitorStopInput input) {
1040 LOG.debug("Monitor Stop operation for monitor id - {}", input.getMonitorId());
1041 SettableFuture<RpcResult<Void>> result = SettableFuture.create();
1043 final Long monitorId = input.getMonitorId();
1044 Optional<MonitoringInfo> optInfo = read(LogicalDatastoreType.OPERATIONAL, getMonitoringInfoId(monitorId));
1045 if(optInfo.isPresent()) {
1046 //Stop the monitoring task
1047 stopMonitoringTask(monitorId);
1049 //Cleanup the Data store
1050 WriteTransaction tx = broker.newWriteOnlyTransaction();
1051 String monitorKey = monitorIdKeyCache.getUnchecked(monitorId);
1052 if(monitorKey != null) {
1053 tx.delete(LogicalDatastoreType.OPERATIONAL, getMonitorStateId(monitorKey));
1054 monitorIdKeyCache.invalidate(monitorId);
1057 tx.delete(LogicalDatastoreType.OPERATIONAL, getMonitoringInfoId(monitorId));
1058 Futures.addCallback(tx.submit(),
1059 new FutureCallbackImpl(String.format("Delete monitor state with Id %d", monitorId)));
1061 MonitoringInfo info = optInfo.get();
1062 String interfaceName = getInterfaceName(info.getSource().getEndpointType());
1063 if(interfaceName != null) {
1064 removeMonitorIdFromInterfaceAssociation(monitorId, interfaceName);
1066 releaseIdForMonitoringInfo(info);
1068 lockMap.remove(monitorKey);
1070 result.set(RpcResultBuilder.<Void>success().build());
1072 String errorMsg = String.format("Do not have monitoring information associated with key %d", monitorId);
1073 LOG.error("Delete monitoring operation Failed - {}", errorMsg);
1074 result.set(RpcResultBuilder.<Void>failed().withError(ErrorType.APPLICATION, errorMsg).build());
1080 private void removeMonitorIdFromInterfaceAssociation(final Long monitorId, final String interfaceName) {
1081 LOG.debug("Remove monitorId {} from Interface association {}", monitorId, interfaceName);
1082 final ReadWriteTransaction tx = broker.newReadWriteTransaction();
1083 ListenableFuture<Optional<InterfaceMonitorEntry>> readFuture = tx.read(LogicalDatastoreType.OPERATIONAL, getInterfaceMonitorMapId(interfaceName));
1084 ListenableFuture<Void> updateFuture = Futures.transform(readFuture, new AsyncFunction<Optional<InterfaceMonitorEntry>, Void>() {
1087 public ListenableFuture<Void> apply(Optional<InterfaceMonitorEntry> optEntry) throws Exception {
1088 if(optEntry.isPresent()) {
1089 InterfaceMonitorEntry entry = optEntry.get();
1090 List<Long> monitorIds = entry.getMonitorIds();
1091 monitorIds.remove(monitorId);
1092 InterfaceMonitorEntry newEntry = new InterfaceMonitorEntryBuilder(entry)
1093 .setKey(new InterfaceMonitorEntryKey(interfaceName)).setMonitorIds(monitorIds).build();
1094 tx.put(LogicalDatastoreType.OPERATIONAL, getInterfaceMonitorMapId(interfaceName), newEntry, CREATE_MISSING_PARENT);
1097 LOG.warn("No Interface map entry found {} to remove monitorId {}", interfaceName, monitorId);
1099 return Futures.immediateFuture(null);
1104 Futures.addCallback(updateFuture, new FutureCallbackImpl(
1105 String.format("Dis-association of monitorId %d with Interface %s", monitorId, interfaceName)));
1109 private void releaseIdForMonitoringInfo(MonitoringInfo info) {
1110 Long monitorId = info.getId();
1111 EndpointType source = info.getSource().getEndpointType();
1112 String interfaceName = getInterfaceName(source);
1113 if(!Strings.isNullOrEmpty(interfaceName)) {
1114 Optional<MonitorProfile> optProfile = read(LogicalDatastoreType.OPERATIONAL, getMonitorProfileId(info.getProfileId()));
1115 if(optProfile.isPresent()) {
1116 EtherTypes ethType = optProfile.get().getProtocolType();
1117 EndpointType destination = (info.getDestination() != null) ? info.getDestination().getEndpointType() : null;
1118 String idKey = getUniqueKey(interfaceName, ethType.toString(), source, destination);
1121 LOG.warn("Could not release monitorId {}. No profile associated with it", monitorId);
1126 private String getInterfaceName(EndpointType endpoint) {
1127 String interfaceName = null;
1128 if(endpoint instanceof Interface) {
1129 interfaceName = ((Interface)endpoint).getInterfaceName();
1131 return interfaceName;
1134 private void stopMonitoring(long monitorId) {
1135 updateMonitorStatusTo(monitorId, MonitorStatus.Stopped, new Predicate<MonitorStatus>() {
1137 public boolean apply(MonitorStatus currentStatus) {
1138 return currentStatus != MonitorStatus.Stopped;
1141 if(!stopMonitoringTask(monitorId)) {
1142 LOG.warn("No monitoring task running to perform cancel operation for monitorId {}", monitorId);
1146 private void updateMonitorStatusTo(final Long monitorId, final MonitorStatus newStatus, final Predicate<MonitorStatus> isValidStatus) {
1147 final String monitorKey = monitorIdKeyCache.getUnchecked(monitorId);
1148 if(monitorKey == null) {
1149 LOG.warn("No monitor Key associated with id {} to change the monitor status to {}", monitorId, newStatus);
1152 final ReadWriteTransaction tx = broker.newReadWriteTransaction();
1154 ListenableFuture<Optional<MonitoringState>> readResult =
1155 tx.read(LogicalDatastoreType.OPERATIONAL, getMonitorStateId(monitorKey));
1157 ListenableFuture<Void> writeResult = Futures.transform(readResult, new AsyncFunction<Optional<MonitoringState>, Void>() {
1159 public ListenableFuture<Void> apply(Optional<MonitoringState> optState) throws Exception {
1160 if(optState.isPresent()) {
1161 MonitoringState state = optState.get();
1162 if(isValidStatus.apply(state.getStatus())) {
1163 MonitoringState updatedState = new MonitoringStateBuilder().setMonitorKey(monitorKey)
1164 .setStatus(newStatus).build();
1165 tx.merge(LogicalDatastoreType.OPERATIONAL, getMonitorStateId(monitorKey), updatedState);
1167 LOG.warn("Invalid Monitoring status {}, cannot be updated to {} for monitorId {}"
1168 , state.getStatus(), newStatus, monitorId);
1171 LOG.warn("No associated monitoring state data available to update the status to {} for {}", newStatus, monitorId);
1177 Futures.addCallback(writeResult,
1178 new FutureCallbackImpl(String.format("Monitor status update for %d to %s", monitorId, newStatus.toString())));
1181 private void resumeMonitoring(final long monitorId) {
1182 final ReadOnlyTransaction tx = broker.newReadOnlyTransaction();
1183 ListenableFuture<Optional<MonitoringInfo>> readInfoResult =
1184 tx.read(LogicalDatastoreType.OPERATIONAL, getMonitoringInfoId(monitorId));
1186 Futures.addCallback(readInfoResult, new FutureCallback<Optional<MonitoringInfo>>() {
1189 public void onFailure(Throwable error) {
1190 String msg = String.format("Unable to read monitoring info associated with monitor id %d", monitorId);
1191 LOG.error("Monitor resume Failed. {}", msg, error);
1195 public void onSuccess(Optional<MonitoringInfo> optInfo) {
1196 if(optInfo.isPresent()) {
1197 final MonitoringInfo info = optInfo.get();
1198 ListenableFuture<Optional<MonitorProfile>> readProfile =
1199 tx.read(LogicalDatastoreType.OPERATIONAL, getMonitorProfileId(info.getProfileId()));
1200 Futures.addCallback(readProfile, new FutureCallback<Optional<MonitorProfile>>(){
1203 public void onFailure(Throwable error) {
1204 String msg = String.format("Unable to read Monitoring profile associated with id %d", info.getProfileId());
1205 LOG.warn("Monitor resume Failed. {}", msg, error);
1209 public void onSuccess(Optional<MonitorProfile> optProfile) {
1211 if(optProfile.isPresent()) {
1212 updateMonitorStatusTo(monitorId, MonitorStatus.Started, new Predicate<MonitorStatus>() {
1214 public boolean apply(MonitorStatus currentStatus) {
1215 return currentStatus != MonitorStatus.Started;
1218 MonitorProfile profile = optProfile.get();
1219 LOG.debug("Monitor Resume - Scheduling monitoring task for Id: {}", monitorId);
1220 scheduleMonitoringTask(info, profile.getMonitorInterval());
1222 String msg = String.format("Monitoring profile associated with id %d is not present", info.getProfileId());
1223 LOG.warn("Monitor resume Failed. {}", msg);
1229 String msg = String.format("Monitoring info associated with id %d is not present", monitorId);
1230 LOG.warn("Monitor resume Failed. {}", msg);
1236 //DATA STORE OPERATIONS
1237 private <T extends DataObject> Optional<T> read(LogicalDatastoreType datastoreType, InstanceIdentifier<T> path) {
1238 ReadOnlyTransaction tx = broker.newReadOnlyTransaction();
1240 Optional<T> result = Optional.absent();
1242 result = tx.read(datastoreType, path).get();
1243 } catch (InterruptedException | ExecutionException e) {
1244 LOG.warn("Error reading data from path {} in datastore {}", path, datastoreType, e);
1253 public void onInterfaceStateUp(String interfaceName) {
1254 List<Long> monitorIds = getMonitorIds(interfaceName);
1255 if(monitorIds.isEmpty()) {
1256 LOG.warn("Could not get monitorId for interface: {}", interfaceName);
1259 for(Long monitorId : monitorIds) {
1260 LOG.debug("Resume monitoring on interface: {} with monitorId: {}", interfaceName, monitorId);
1261 resumeMonitoring(monitorId);
1266 public void onInterfaceStateDown(String interfaceName) {
1267 List<Long> monitorIds = getMonitorIds(interfaceName);
1268 if(monitorIds.isEmpty()) {
1269 LOG.warn("Could not get monitorIds for interface: {}", interfaceName);
1272 for(Long monitorId : monitorIds) {
1273 LOG.debug("Suspend monitoring on interface: {} with monitorId: {}", interfaceName, monitorId);
1274 stopMonitoring(monitorId);
1278 private List<Long> getMonitorIds(String interfaceName) {
1279 Optional<InterfaceMonitorEntry> optEntry = read(LogicalDatastoreType.OPERATIONAL,
1280 getInterfaceMonitorMapId(interfaceName));
1281 if(optEntry.isPresent()) {
1282 InterfaceMonitorEntry entry = optEntry.get();
1283 return entry.getMonitorIds();
1285 return Collections.emptyList();