import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.common.util.concurrent.SettableFuture;
-import com.google.common.util.concurrent.ThreadFactoryBuilder;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.Semaphore;
-import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import javax.annotation.Nonnull;
import javax.annotation.PreDestroy;
import org.opendaylight.controller.liblldp.PacketException;
import org.opendaylight.controller.md.sal.binding.api.DataBroker;
import org.opendaylight.controller.md.sal.binding.api.NotificationPublishService;
-import org.opendaylight.controller.md.sal.binding.api.NotificationService;
import org.opendaylight.controller.md.sal.binding.api.ReadOnlyTransaction;
import org.opendaylight.controller.md.sal.binding.api.ReadWriteTransaction;
import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
import org.opendaylight.yang.gen.v1.urn.opendaylight.packet.service.rev130709.PacketProcessingListener;
import org.opendaylight.yang.gen.v1.urn.opendaylight.packet.service.rev130709.PacketReceived;
import org.opendaylight.yang.gen.v1.urn.opendaylight.packet.service.rev130709.SendToController;
-import org.opendaylight.yangtools.concepts.ListenerRegistration;
import org.opendaylight.yangtools.yang.binding.DataObject;
import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
import org.opendaylight.yangtools.yang.common.RpcError.ErrorType;
private static final boolean CREATE_MISSING_PARENT = true;
private static final int INVALID_ID = 0;
- private final DataBroker dataBroker;
- private final ManagedNewTransactionRunner txRunner;
- private final IdManagerService idManager;
- private final NotificationPublishService notificationPublishService;
- private final NotificationService notificationService;
- private final AlivenessProtocolHandlerRegistry alivenessProtocolHandlerRegistry;
- private final ConcurrentMap<Long, ScheduledFuture<?>> monitoringTasks;
- private final ScheduledExecutorService monitorService;
- private final ExecutorService callbackExecutorService;
- private final LoadingCache<Long, String> monitorIdKeyCache;
- private final ListenerRegistration<AlivenessMonitor> listenerRegistration;
- // TODO clean up: visibility package local instead of private because accessed in HwVtepTunnelsStateHandler
- final ConcurrentMap<String, Semaphore> lockMap = new ConcurrentHashMap<>();
-
private static class FutureCallbackImpl implements FutureCallback<Void> {
private final String message;
}
}
+ private final DataBroker dataBroker;
+ private final ManagedNewTransactionRunner txRunner;
+ private final IdManagerService idManager;
+ private final NotificationPublishService notificationPublishService;
+ private final AlivenessProtocolHandlerRegistry alivenessProtocolHandlerRegistry;
+ private final ConcurrentMap<Long, ScheduledFuture<?>> monitoringTasks = new ConcurrentHashMap<>();
+ private final ScheduledExecutorService monitorService;
+ private final ExecutorService callbackExecutorService;
+ private final LoadingCache<Long, String> monitorIdKeyCache;
+ private final ConcurrentMap<String, Semaphore> lockMap = new ConcurrentHashMap<>();
+
@Inject
public AlivenessMonitor(final DataBroker dataBroker, final IdManagerService idManager,
final NotificationPublishService notificationPublishService,
- final NotificationService notificationService,
AlivenessProtocolHandlerRegistry alivenessProtocolHandlerRegistry) {
this.dataBroker = dataBroker;
this.txRunner = new ManagedNewTransactionRunnerImpl(dataBroker);
this.idManager = idManager;
this.notificationPublishService = notificationPublishService;
- this.notificationService = notificationService;
this.alivenessProtocolHandlerRegistry = alivenessProtocolHandlerRegistry;
monitorService = Executors.newScheduledThreadPool(THREAD_POOL_SIZE,
ThreadFactoryProvider.builder().namePrefix("Aliveness Monitoring Task").logger(LOG).build().get());
callbackExecutorService = Executors.newFixedThreadPool(THREAD_POOL_SIZE,
ThreadFactoryProvider.builder().namePrefix("Aliveness Callback Handler").logger(LOG).build().get());
- monitoringTasks = new ConcurrentHashMap<>();
createIdPool();
monitorIdKeyCache = CacheBuilder.newBuilder().build(new CacheLoader<Long, String>() {
}
});
- listenerRegistration = notificationService.registerNotificationListener(this);
LOG.info("{} started", getClass().getSimpleName());
}
monitorIdKeyCache.cleanUp();
monitorService.shutdown();
callbackExecutorService.shutdown();
- if (listenerRegistration != null) {
- listenerRegistration.close();
- }
LOG.info("{} close", getClass().getSimpleName());
}
- private ThreadFactory getMonitoringThreadFactory(String threadNameFormat) {
- ThreadFactoryBuilder builder = new ThreadFactoryBuilder();
- builder.setNameFormat(threadNameFormat);
- builder.setUncaughtExceptionHandler(
- (thread, ex) -> LOG.error("Received Uncaught Exception event in Thread: {}", thread.getName(), ex));
- return builder.build();
+ Semaphore getLock(String key) {
+ return lockMap.get(key);
}
private void createIdPool() {
.setPoolName(AlivenessMonitorConstants.MONITOR_IDPOOL_NAME)
.setLow(AlivenessMonitorConstants.MONITOR_IDPOOL_START)
.setHigh(AlivenessMonitorConstants.MONITOR_IDPOOL_SIZE).build();
- Future<RpcResult<Void>> result = idManager.createIdPool(createPool);
- Futures.addCallback(JdkFutureAdapters.listenInPoolThread(result), new FutureCallback<RpcResult<Void>>() {
+ Future<RpcResult<Void>> resultFuture = idManager.createIdPool(createPool);
+ Futures.addCallback(JdkFutureAdapters.listenInPoolThread(resultFuture), new FutureCallback<RpcResult<Void>>() {
@Override
public void onFailure(Throwable error) {
}
@Override
- public void onSuccess(RpcResult<Void> result) {
+ public void onSuccess(@Nonnull RpcResult<Void> result) {
if (result.isSuccessful()) {
LOG.debug("Created IdPool for Aliveness Monitor Service");
} else {
LOG.error("RPC to create Idpool failed {}", result.getErrors());
}
}
- });
+ }, callbackExecutorService);
}
private int getUniqueId(final String idKey) {
return;
}
- Object objPayload = packetInFormatted.getPayload();
+ Packet objPayload = packetInFormatted.getPayload();
if (objPayload == null) {
LOG.trace("Unsupported packet type. Ignoring the packet...");
LOG.trace("onPacketReceived packet: {}, packet class: {}", packetReceived, objPayload.getClass());
}
- AlivenessProtocolHandler livenessProtocolHandler = alivenessProtocolHandlerRegistry
- .getOpt(objPayload.getClass());
+ AlivenessProtocolHandler<Packet> livenessProtocolHandler = alivenessProtocolHandlerRegistry
+ .getOpt(Packet.class);
if (livenessProtocolHandler == null) {
return;
}
Futures.addCallback(stateResult, new FutureCallback<Optional<MonitoringState>>() {
@Override
- public void onSuccess(Optional<MonitoringState> optState) {
+ public void onSuccess(@Nonnull Optional<MonitoringState> optState) {
if (optState.isPresent()) {
final MonitoringState currentState = optState.get();
LOG.trace("OnPacketReceived : Monitoring state from ODS : {} ", currentState);
}
- Long responsePendingCount = currentState.getResponsePendingCount();
-
+ // Long responsePendingCount = currentState.getResponsePendingCount();
+ //
// Need to relook at the pending count logic to support N
// out of M scenarios
// if (currentState.getState() != LivenessState.Up) {
// responsePendingCount =
// currentState.getResponsePendingCount() - 1;
// }
- responsePendingCount = INITIAL_COUNT;
+ Long responsePendingCount = INITIAL_COUNT;
final boolean stateChanged = currentState.getState() == LivenessState.Down
|| currentState.getState() == LivenessState.Unknown;
LOG.trace("Error in writing monitoring state: {} to Datastore", state);
}
}
- });
+ }, callbackExecutorService);
} else {
if (LOG.isTraceEnabled()) {
LOG.trace("Monitoring State not available for key: {} to process the Packet received",
tx.cancel();
releaseLock(lock);
}
- });
+ }, callbackExecutorService);
}
private String getIpAddress(EndpointType endpoint) {
String idKey = getUniqueKey(interfaceName, ethType.toString(), srcEndpointType, destEndpointType);
final long monitorId = getUniqueId(idKey);
Optional<MonitoringInfo> optKey = read(LogicalDatastoreType.OPERATIONAL, getMonitoringInfoId(monitorId));
- final AlivenessProtocolHandler handler;
+ final AlivenessProtocolHandler<?> handler;
if (optKey.isPresent()) {
String message = String.format(
"Monitoring for the interface %s with this configuration " + "is already registered.",
LOG.debug("Scheduling monitor task for config: {}", in);
scheduleMonitoringTask(monitoringInfo, profile.getMonitorInterval());
}
- }, MoreExecutors.directExecutor());
+ }, callbackExecutorService);
}
associateMonitorIdWithInterface(monitorId, interfaceName);
CREATE_MISSING_PARENT);
}
return tx.submit();
- });
+ }, callbackExecutorService);
Futures.addCallback(updateFuture, new FutureCallbackImpl(
- String.format("Association of monitorId %d with Interface %s", monitorId, interfaceName)));
+ String.format("Association of monitorId %d with Interface %s", monitorId, interfaceName)),
+ MoreExecutors.directExecutor());
}
private void scheduleMonitoringTask(MonitoringInfo monitoringInfo, long monitorInterval) {
}
@Override
- @SuppressWarnings("resource") // tx.close() in Future's callback(s)
public Future<RpcResult<Void>> monitorUnpause(MonitorUnpauseInput input) {
LOG.debug("Monitor Unpause operation invoked for monitor id: {}", input.getMonitorId());
final SettableFuture<RpcResult<Void>> result = SettableFuture.create();
}
@Override
- public void onSuccess(Optional<MonitoringInfo> optInfo) {
+ public void onSuccess(@Nonnull Optional<MonitoringInfo> optInfo) {
if (optInfo.isPresent()) {
final MonitoringInfo info = optInfo.get();
ListenableFuture<Optional<MonitorProfile>> readProfile = tx.read(LogicalDatastoreType.OPERATIONAL,
}
@Override
- public void onSuccess(Optional<MonitorProfile> optProfile) {
+ public void onSuccess(@Nonnull Optional<MonitorProfile> optProfile) {
tx.close();
if (optProfile.isPresent()) {
updateMonitorStatusTo(monitorId, MonitorStatus.Started,
RpcResultBuilder.<Void>failed().withError(ErrorType.APPLICATION, msg).build());
}
}
- });
+ }, callbackExecutorService);
} else {
tx.close();
String msg = String.format("Monitoring info associated with id %d is not present", monitorId);
"Monitoring State associated with id %d is not present to send packet out.", monitorId);
return Futures.immediateFailedFuture(new RuntimeException(errorMsg));
}
- });
+ }, callbackExecutorService);
Futures.addCallback(writeResult, new FutureCallback<Void>() {
@Override
public void onSuccess(Void noarg) {
// invoke packetout on protocol handler
- AlivenessProtocolHandler handler = alivenessProtocolHandlerRegistry.getOpt(profile.getProtocolType());
+ AlivenessProtocolHandler<?> handler =
+ alivenessProtocolHandlerRegistry.getOpt(profile.getProtocolType());
if (handler != null) {
LOG.debug("Sending monitoring packet {}", monitoringInfo);
handler.startMonitoringTask(monitoringInfo);
error);
releaseLock(lock);
}
- });
+ }, callbackExecutorService);
}
void publishNotification(final Long monitorId, final LivenessState state) {
public void onSuccess(Object arg) {
LOG.trace("Successful in notifying listeners for id {} - state {}", monitorId, state);
}
- });
+ }, callbackExecutorService);
}
@Override
public Future<RpcResult<MonitorProfileCreateOutput>> monitorProfileCreate(final MonitorProfileCreateInput input) {
LOG.debug("Monitor Profile Create operation - {}", input.getProfile());
- final SettableFuture<RpcResult<MonitorProfileCreateOutput>> result = SettableFuture.create();
+ final SettableFuture<RpcResult<MonitorProfileCreateOutput>> returnFuture = SettableFuture.create();
Profile profile = input.getProfile();
final Long failureThreshold = profile.getFailureThreshold();
final Long monitorInterval = profile.getMonitorInterval();
.setProfileId(profileId).build();
String msg = String.format("Monitor profile %s already present for the given input", input);
LOG.warn(msg);
- result.set(RpcResultBuilder.success(output)
+ returnFuture.set(RpcResultBuilder.success(output)
.withWarning(ErrorType.PROTOCOL, "profile-exists", msg).build());
} else {
final MonitorProfile monitorProfile = new MonitorProfileBuilder().setId(profileId)
String msg = String.format("Error when storing monitorprofile %s in datastore",
monitorProfile);
LOG.error(msg, error);
- result.set(RpcResultBuilder.<MonitorProfileCreateOutput>failed()
+ returnFuture.set(RpcResultBuilder.<MonitorProfileCreateOutput>failed()
.withError(ErrorType.APPLICATION, msg, error).build());
}
public void onSuccess(Void noarg) {
MonitorProfileCreateOutput output = new MonitorProfileCreateOutputBuilder()
.setProfileId(profileId).build();
- result.set(RpcResultBuilder.success(output).build());
+ returnFuture.set(RpcResultBuilder.success(output).build());
}
- });
+ }, callbackExecutorService);
}
- return result;
+ return returnFuture;
}, callbackExecutorService);
Futures.addCallback(resultFuture, new FutureCallback<RpcResult<MonitorProfileCreateOutput>>() {
@Override
// This would happen when any error happens during reading for
// monitoring profile
String msg = String.format("Error in creating monitorprofile - %s", input);
- result.set(RpcResultBuilder.<MonitorProfileCreateOutput>failed()
+ returnFuture.set(RpcResultBuilder.<MonitorProfileCreateOutput>failed()
.withError(ErrorType.APPLICATION, msg, error).build());
LOG.error(msg, error);
}
LOG.debug("Successfully created monitor Profile {} ", input);
}
}, callbackExecutorService);
- return result;
+ return returnFuture;
}
@Override
releaseId(id);
result.set(RpcResultBuilder.<Void>success().build());
}
- });
+ }, callbackExecutorService);
} else {
String msg = String.format("Monitor profile with Id: %d does not exist", profileId);
LOG.info(msg);
}
tx.delete(LogicalDatastoreType.OPERATIONAL, getMonitoringInfoId(monitorId));
- }), new FutureCallbackImpl(String.format("Delete monitor state with Id %d", monitorId)));
+ }), new FutureCallbackImpl(String.format("Delete monitor state with Id %d", monitorId)),
+ MoreExecutors.directExecutor());
MonitoringInfo info = optInfo.get();
String interfaceName = getInterfaceName(info.getSource().getEndpointType());
tx.cancel();
return Futures.immediateFuture(null);
}
- });
+ }, MoreExecutors.directExecutor());
Futures.addCallback(updateFuture, new FutureCallbackImpl(
- String.format("Dis-association of monitorId %d with Interface %s", monitorId, interfaceName)));
+ String.format("Dis-association of monitorId %d with Interface %s", monitorId, interfaceName)),
+ MoreExecutors.directExecutor());
}
private void releaseIdForMonitoringInfo(MonitoringInfo info) {
newStatus, monitorId);
}
return tx.submit();
- });
+ }, MoreExecutors.directExecutor());
Futures.addCallback(writeResult, new FutureCallbackImpl(
- String.format("Monitor status update for %d to %s", monitorId, newStatus.toString())));
+ String.format("Monitor status update for %d to %s", monitorId, newStatus.toString())),
+ MoreExecutors.directExecutor());
}
- @SuppressWarnings("resource") // tx.close() in Future's callback(s)
private void resumeMonitoring(final long monitorId) {
final ReadOnlyTransaction tx = dataBroker.newReadOnlyTransaction();
ListenableFuture<Optional<MonitoringInfo>> readInfoResult = tx.read(LogicalDatastoreType.OPERATIONAL,
}
@Override
- public void onSuccess(Optional<MonitoringInfo> optInfo) {
+ public void onSuccess(@Nonnull Optional<MonitoringInfo> optInfo) {
if (optInfo.isPresent()) {
final MonitoringInfo info = optInfo.get();
ListenableFuture<Optional<MonitorProfile>> readProfile = tx.read(LogicalDatastoreType.OPERATIONAL,
}
@Override
- public void onSuccess(Optional<MonitorProfile> optProfile) {
+ public void onSuccess(@Nonnull Optional<MonitorProfile> optProfile) {
tx.close();
if (optProfile.isPresent()) {
updateMonitorStatusTo(monitorId, MonitorStatus.Started,
LOG.warn("Monitor resume Failed. {}", msg);
}
}
- });
+ }, MoreExecutors.directExecutor());
} else {
tx.close();
String msg = String.format("Monitoring info associated with id %d is not present", monitorId);
LOG.warn("Monitor resume Failed. {}", msg);
}
}
- });
+ }, MoreExecutors.directExecutor());
}
// DATA STORE OPERATIONS
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.MoreExecutors;
+import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.Semaphore;
+import javax.annotation.Nonnull;
import javax.inject.Inject;
import javax.inject.Singleton;
import org.opendaylight.controller.liblldp.Packet;
import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.alivenessmonitor.rev160411.monitoring.states.MonitoringState;
import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.alivenessmonitor.rev160411.monitoring.states.MonitoringStateBuilder;
import org.opendaylight.yang.gen.v1.urn.opendaylight.packet.service.rev130709.PacketReceived;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.ovsdb.hwvtep.rev150901.HwvtepPhysicalLocatorRef;
import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.ovsdb.hwvtep.rev150901.PhysicalSwitchAugmentation;
import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.ovsdb.hwvtep.rev150901.hwvtep.physical._switch.attributes.Tunnels;
import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.ovsdb.hwvtep.rev150901.hwvtep.physical._switch.attributes.TunnelsBuilder;
@Singleton
public class HwVtepTunnelsStateHandler extends HwvtepAbstractDataTreeChangeListener<Tunnels, HwVtepTunnelsStateHandler>
- implements AlivenessProtocolHandler, AutoCloseable {
+ implements AlivenessProtocolHandler<Packet> {
private static final Logger LOG = LoggerFactory.getLogger(HwVtepTunnelsStateHandler.class);
private final DataBroker dataBroker;
final String monitorKey = getBfdMonitorKey(interfaceName);
LOG.debug("Processing monitorKey: {} for received Tunnels update DCN", monitorKey);
- final Semaphore lock = alivenessMonitor.lockMap.get(monitorKey);
+ final Semaphore lock = alivenessMonitor.getLock(monitorKey);
LOG.debug("Acquiring lock for monitor key : {} to process monitor DCN", monitorKey);
alivenessMonitor.acquireLock(lock);
Futures.addCallback(stateResult, new FutureCallback<Optional<MonitoringState>>() {
@Override
- public void onSuccess(Optional<MonitoringState> optState) {
+ public void onSuccess(@Nonnull Optional<MonitoringState> optState) {
if (optState.isPresent()) {
final MonitoringState currentState = optState.get();
if (currentState.getState() == newTunnelOpState) {
LOG.trace("Error in writing monitoring state: {} to Datastore", state);
}
}
- });
+ }, MoreExecutors.directExecutor());
} else {
LOG.warn("Monitoring State not available for key: {} to process the Packet received", monitorKey);
// Complete the transaction
tx.cancel();
alivenessMonitor.releaseLock(lock);
}
- });
+ }, MoreExecutors.directExecutor());
}
private LivenessState getTunnelOpState(List<BfdStatus> tunnelBfdStatus) {
}
@Override
- public Class<?> getPacketClass() {
- return null;
+ public Class<Packet> getPacketClass() {
+ return Packet.class;
}
@Override
return null;
}
+ // tunnelKey, nodeId, topologyId are initialized to null and immediately passed to getTunnelIdentifier which
+ // FindBugs as a "Load of known null value" violation. Not sure sure what the intent...
+ @SuppressFBWarnings("NP_LOAD_OF_KNOWN_NULL_VALUE")
void resetMonitoringTask(boolean isEnable) {
// TODO: get the corresponding hwvtep tunnel from the sourceInterface
// once InterfaceMgr
// implments renderer for hwvtep vxlan tunnels
+
+ // tunnelKey, nodeId, topologyId are initialized to null and immediately passed to getTunnelIdentifier which
+ // FindBugs flags as a "Load of known null value" violation. Not sure sure what the intent...
TunnelsKey tunnelKey = null;
String nodeId = null;
String topologyId = null;
List<BfdParams> tunnelBfdParams = tunnel.getBfdParams();
if (tunnelBfdParams == null || tunnelBfdParams.isEmpty()) {
LOG.debug("there is no bfd params available for the tunnel {}", tunnel);
+ return;
}
+
Iterator<BfdParams> tunnelBfdParamsInterator = tunnelBfdParams.iterator();
while (tunnelBfdParamsInterator.hasNext()) {
BfdParams bfdParam = tunnelBfdParamsInterator.next();
String tunnelLocalMacAddress = "<TODO>";
String tunnelLocalIpAddress = "<TODO>";
String tunnelRemoteMacAddress = "<TODO>";
- String tunnelRemoteIpAddress = "<TODO>";
List<BfdParams> bfdParams = new ArrayList<>();
fillBfdParams(bfdParams, profile);
List<BfdLocalConfigs> bfdLocalConfigs = new ArrayList<>();
fillBfdLocalConfigs(bfdLocalConfigs, tunnelLocalMacAddress, tunnelLocalIpAddress);
List<BfdRemoteConfigs> bfdRemoteConfigs = new ArrayList<>();
fillBfdRemoteConfigs(bfdRemoteConfigs, tunnelRemoteMacAddress);
- TunnelsKey tunnelKey = null;
- Tunnels tunnelWithBfd = new TunnelsBuilder().setKey(tunnelKey).setBfdParams(bfdParams)
+ // tunnelKey is initialized to null and passed to setKey which FindBugs flags as a
+ // "Load of known null value" violation. Not sure sure what the intent is...
+ //TunnelsKey tunnelKey = null;
+ Tunnels tunnelWithBfd = new TunnelsBuilder().setKey(/*tunnelKey*/ null).setBfdParams(bfdParams)
.setBfdLocalConfigs(bfdLocalConfigs).setBfdRemoteConfigs(bfdRemoteConfigs).build();
// TODO: get the following parameters from the interface and use it to
// update hwvtep datastore
// into hwvtep datastore. if tunnels are not created during that time,
// then start monitoring has to
// be done as part of tunnel add DCN handling.
- HwvtepPhysicalLocatorRef remoteRef = null;
- HwvtepPhysicalLocatorRef localRef = null;
+// HwvtepPhysicalLocatorRef remoteRef = null;
+// HwvtepPhysicalLocatorRef localRef = null;
String topologyId = "";
String nodeId = "";
MDSALUtil.syncUpdate(dataBroker, LogicalDatastoreType.CONFIGURATION,
- getTunnelIdentifier(topologyId, nodeId, new TunnelsKey(localRef, remoteRef)), tunnelWithBfd);
+ getTunnelIdentifier(topologyId, nodeId, new TunnelsKey(/*localRef*/ null, /*remoteRef*/ null)),
+ tunnelWithBfd);
}
private void fillBfdRemoteConfigs(List<BfdRemoteConfigs> bfdRemoteConfigs, String tunnelRemoteMacAddress) {