--- /dev/null
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Copyright (c) 2015 Ericsson India Global Services Pvt Ltd. and others. All rights reserved.
+
+This program and the accompanying materials are made available under the
+terms of the Eclipse Public License v1.0 which accompanies this distribution,
+and is available at http://www.eclipse.org/legal/epl-v10.html
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <parent>
+ <groupId>org.opendaylight.vpnservice</groupId>
+ <artifactId>config-parent</artifactId>
+ <version>0.2.0-SNAPSHOT</version>
+ <relativePath>../../commons/config-parent</relativePath>
+ </parent>
+
+ <modelVersion>4.0.0</modelVersion>
+ <groupId>org.opendaylight.vpnservice</groupId>
+ <artifactId>alivenessmonitor-api</artifactId>
+ <version>${vpnservices.version}</version>
+ <packaging>bundle</packaging>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.opendaylight.controller</groupId>
+ <artifactId>liblldp</artifactId>
+ <version>${liblldp.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.opendaylight.mdsal</groupId>
+ <artifactId>yang-binding</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.opendaylight.yangtools</groupId>
+ <artifactId>yang-common</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.opendaylight.mdsal.model</groupId>
+ <artifactId>ietf-inet-types</artifactId>
+ </dependency>
+ </dependencies>
+</project>
--- /dev/null
+module aliveness-monitor {
+ namespace "urn:opendaylight:vpnservice:alivenessmonitor";
+ prefix alivenessmon;
+
+ import ietf-inet-types {
+ prefix inet;
+ }
+
+ revision "2015-06-29" {
+ description "YANG model describes methods for monitoring endpoints.";
+ }
+
+ typedef ether-types {
+ type enumeration {
+ enum arp;
+ enum lldp;
+ }
+ }
+
+ typedef monitoring-mode {
+ type enumeration {
+ enum one-one;
+ }
+ }
+
+ grouping endpoint {
+ choice endpoint-type {
+ case ip-address {
+ leaf ip-address { type inet:ip-address; }
+ }
+ case interface {
+ leaf interface-ip { type inet:ip-address; }
+ leaf interface-name { type string; }
+ }
+ case host-name {
+ leaf host-name { type string; }
+ }
+ }
+ }
+
+ grouping monitor-profile-params {
+ leaf monitor-interval { type uint32; } //Monitoring interval in milli-seconds
+ leaf monitor-window { type uint32; } //Number M of consecutive intervals to consider for monitoring
+ leaf failure-threshold { type uint32; } //Number N of missing messages in window to detect failure ("N out of M")
+ leaf protocol-type { type ether-types; }
+ }
+
+ grouping monitor-params {
+ leaf mode { type monitoring-mode; }
+ container source { uses endpoint; }
+ container destination { uses endpoint; }
+ leaf profile-id { type uint32; }
+ }
+
+ // RPC Methods
+ rpc monitor-profile-create {
+ input {
+ container profile {
+ uses monitor-profile-params;
+ }
+ }
+ output {
+ leaf profile-id { type uint32; }
+ }
+ }
+
+ rpc monitor-start {
+ input {
+ container config {
+ uses monitor-params;
+ }
+ }
+ output {
+ leaf monitor-id { type uint32; }
+ }
+ }
+
+ rpc monitor-pause {
+ input {
+ leaf monitor-id { type uint32; }
+ }
+ }
+
+ rpc monitor-unpause {
+ input {
+ leaf monitor-id { type uint32; }
+ }
+ }
+
+
+ rpc monitor-stop {
+ input {
+ leaf monitor-id { type uint32; }
+ }
+ }
+
+ rpc monitor-profile-delete {
+ input {
+ leaf profile-id { type uint32; }
+ }
+ }
+
+ // YANG Notifications
+ typedef liveness-state {
+ type enumeration {
+ enum up;
+ enum down;
+ enum unknown;
+ }
+ }
+
+ grouping liveness-event-state {
+ leaf monitor-id { type uint32; }
+ leaf monitor-state { type liveness-state; }
+ }
+
+ notification monitor-event {
+ container event-data {
+ uses liveness-event-state;
+ }
+ }
+
+ //Operational Model
+ container monitor-profiles {
+ config false;
+ list monitor-profile {
+ key "id";
+ leaf id { type uint32; }
+ uses monitor-profile-params;
+ }
+ }
+
+ container monitor-configs {
+ config false;
+ list monitoring-info {
+ key "id";
+ leaf id { type uint32; }
+ uses monitor-params;
+ }
+ }
+
+ typedef monitor-status {
+ type enumeration {
+ enum started;
+ enum paused;
+ enum stopped;
+ }
+ }
+
+ container monitoring-states {
+ config false;
+ list monitoring-state {
+ key "monitor-key";
+ leaf monitor-key { type string; } //Key to identify monitor-id from packet-in
+ leaf monitor-id { type uint32; }
+ leaf response-pending-count { type uint32; }
+ leaf request-count { type uint32; }
+ leaf state { type liveness-state; }
+ leaf status { type monitor-status; }
+ }
+ }
+
+ container monitorid-key-map {
+ config false;
+ list monitorid-key-entry {
+ key "monitor-id";
+ leaf monitor-id { type uint32; }
+ leaf monitor-key { type string; }
+ }
+ }
+
+ container interface-monitor-map {
+ config false;
+ list interface-monitor-entry {
+ key "interface-name";
+ leaf interface-name { type string; }
+ leaf-list monitor-ids { type uint32; }
+ }
+ }
+}
\ No newline at end of file
--- /dev/null
+<?xml version="1.0" encoding="UTF-8"?>
+<!-- vi: set et smarttab sw=4 tabstop=4: --><!--
+Copyright (c) 2015 Ericsson India Global Services Pvt Ltd. and others. All rights reserved.
+
+This program and the accompanying materials are made available under the
+terms of the Eclipse Public License v1.0 which accompanies this distribution,
+and is available at http://www.eclipse.org/legal/epl-v10.html
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+
+ <parent>
+ <groupId>org.opendaylight.vpnservice</groupId>
+ <artifactId>config-parent</artifactId>
+ <version>0.2.0-SNAPSHOT</version>
+ <relativePath>../../commons/config-parent</relativePath>
+ </parent>
+
+ <modelVersion>4.0.0</modelVersion>
+ <groupId>org.opendaylight.vpnservice</groupId>
+ <artifactId>alivenessmonitor-impl</artifactId>
+ <version>${vpnservices.version}</version>
+ <packaging>bundle</packaging>
+ <dependencies>
+ <dependency>
+ <groupId>org.opendaylight.vpnservice</groupId>
+ <artifactId>alivenessmonitor-api</artifactId>
+ <version>${vpnservices.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.opendaylight.controller</groupId>
+ <artifactId>liblldp</artifactId>
+ <version>${liblldp.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.opendaylight.vpnservice</groupId>
+ <artifactId>mdsalutil-api</artifactId>
+ <version>${vpnservices.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.opendaylight.openflowplugin.model</groupId>
+ <artifactId>model-flow-service</artifactId>
+ <version>${openflowplugin.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.opendaylight.controller.model</groupId>
+ <artifactId>model-inventory</artifactId>
+ <version>${controller.mdsal.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.opendaylight.mdsal.model</groupId>
+ <artifactId>ietf-interfaces</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.opendaylight.vpnservice</groupId>
+ <artifactId>idmanager-api</artifactId>
+ <version>${vpnservices.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.opendaylight.vpnservice</groupId>
+ <artifactId>interfacemgr-api</artifactId>
+ <version>${vpnservices.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.opendaylight.controller</groupId>
+ <artifactId>sal-binding-broker-impl</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.opendaylight.vpnservice</groupId>
+ <artifactId>arputil-api</artifactId>
+ <version>${vpnservices.version}</version>
+ </dependency>
+ </dependencies>
+
+</project>
--- /dev/null
+<?xml version="1.0" encoding="UTF-8"?>
+<!-- vi: set et smarttab sw=4 tabstop=4: -->
+<!--
+Copyright (c) 2015 Ericsson India Global Services Pvt Ltd. and others. All rights reserved.
+
+This program and the accompanying materials are made available under the
+terms of the Eclipse Public License v1.0 which accompanies this distribution,
+and is available at http://www.eclipse.org/legal/epl-v10.html
+-->
+<snapshot>
+ <required-capabilities>
+ <capability>urn:opendaylight:params:xml:ns:yang:alivenessmonitor:impl?module=alivenessmonitor-impl&revision=2015-07-06</capability>
+ <capability>urn:opendaylight:params:xml:ns:yang:controller:md:sal:binding?module=opendaylight-md-sal-binding&revision=2013-10-28</capability>
+ <capability>urn:opendaylight:params:xml:ns:yang:controller:md:sal:binding:impl?module=opendaylight-sal-binding-broker-impl&revision=2013-10-28</capability>
+ <capability>urn:opendaylight:vpnservice:interfacemgr?module=odl-interface&revision=2015-03-31</capability>
+ </required-capabilities>
+ <configuration>
+
+ <data xmlns="urn:ietf:params:xml:ns:netconf:base:1.0">
+ <modules xmlns="urn:opendaylight:params:xml:ns:yang:controller:config">
+ <module>
+ <type xmlns:prefix="urn:opendaylight:params:xml:ns:yang:alivenessmonitor:impl">prefix:alivenessmonitor-impl</type>
+ <name>alivenessmonitor-default</name>
+ <broker>
+ <type xmlns:binding="urn:opendaylight:params:xml:ns:yang:controller:md:sal:binding">binding:binding-broker-osgi-registry</type>
+ <name>binding-osgi-broker</name>
+ </broker>
+ <rpc-registry>
+ <type xmlns:binding="urn:opendaylight:params:xml:ns:yang:controller:md:sal:binding">binding:binding-rpc-registry</type>
+ <name>binding-rpc-broker</name>
+ </rpc-registry>
+ <notification-publish-service>
+ <type xmlns:bindingimpl="urn:opendaylight:params:xml:ns:yang:controller:md:sal:binding:impl">bindingimpl:binding-new-notification-publish-service</type>
+ <name>binding-notification-publish-adapter</name>
+ </notification-publish-service>
+ <notification-service>
+ <type xmlns:bindingimpl="urn:opendaylight:params:xml:ns:yang:controller:md:sal:binding:impl">bindingimpl:binding-new-notification-service</type>
+ <name>binding-notification-adapter</name>
+ </notification-service>
+ </module>
+ </modules>
+ </data>
+ </configuration>
+</snapshot>
\ No newline at end of file
--- /dev/null
+/*
+ * Copyright (c) 2015 Ericsson India Global Services Pvt Ltd. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.vpnservice.alivenessmonitor.internal;
+
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+
+import org.opendaylight.controller.md.sal.binding.api.ReadOnlyTransaction;
+import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
+import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.interfaces.rev140508.Interfaces;
+import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.interfaces.rev140508.interfaces.Interface;
+import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.interfaces.rev140508.interfaces.InterfaceKey;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.vpnservice.interfacemgr.rpcs.rev151003.GetNodeconnectorIdFromInterfaceInputBuilder;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.vpnservice.interfacemgr.rpcs.rev151003.GetNodeconnectorIdFromInterfaceInput;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.vpnservice.interfacemgr.rpcs.rev151003.GetNodeconnectorIdFromInterfaceOutput;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeConnectorId;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeId;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.Nodes;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.node.NodeConnector;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.node.NodeConnectorKey;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.NodeKey;
+import org.opendaylight.yangtools.yang.binding.DataObject;
+import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
+import org.opendaylight.yangtools.yang.common.RpcResult;
+
+import com.google.common.base.Optional;
+import com.google.common.base.Strings;
+
+abstract class AbstractAlivenessProtocolHandler implements AlivenessProtocolHandler {
+
+ protected ServiceProvider serviceProvider;
+ private InventoryReader inventoryReader;
+
+ public AbstractAlivenessProtocolHandler(ServiceProvider serviceProvider) {
+ this.serviceProvider = serviceProvider;
+ inventoryReader = new InventoryReader(serviceProvider.getDataBroker());
+ }
+
+ private InstanceIdentifier<NodeConnector> getNodeConnectorId(String interfaceName) {
+ InstanceIdentifier<Interface> id = InstanceIdentifier.builder(Interfaces.class)
+ .child(Interface.class, new InterfaceKey(interfaceName)).build();
+
+ Optional<Interface> port = read(LogicalDatastoreType.CONFIGURATION, id);
+ if(port.isPresent()) {
+ NodeConnectorId ncId = getNodeConnectorIdFromInterface(interfaceName);
+ NodeId nodeId = getNodeIdFromNodeConnectorId(ncId);
+
+ InstanceIdentifier<NodeConnector> ncIdentifier =
+ InstanceIdentifier.builder(Nodes.class)
+ .child(Node.class, new NodeKey(nodeId))
+ .child(NodeConnector.class, new NodeConnectorKey(ncId)).build();
+ return ncIdentifier;
+ }
+ return null;
+ }
+
+ private NodeConnectorId getNodeConnectorIdFromInterface(String interfaceName) {
+ GetNodeconnectorIdFromInterfaceInput input = new GetNodeconnectorIdFromInterfaceInputBuilder().setIntfName(interfaceName).build();
+ Future<RpcResult<GetNodeconnectorIdFromInterfaceOutput>> output = serviceProvider.getInterfaceManager().getNodeconnectorIdFromInterface(input);
+ RpcResult<GetNodeconnectorIdFromInterfaceOutput> result = null;
+ try {
+ result = output.get();
+ if(result.isSuccessful()) {
+ GetNodeconnectorIdFromInterfaceOutput ncIdOutput = result.getResult();
+ return ncIdOutput.getNodeconnectorId();
+ }
+ } catch(ExecutionException | InterruptedException e) {
+ //TODO: Handle exception
+ }
+
+ return null;
+ }
+
+ private NodeId getNodeIdFromNodeConnectorId(NodeConnectorId ncId) {
+ return new NodeId(ncId.getValue().substring(0,ncId.getValue().lastIndexOf(":")));
+ }
+
+ protected byte[] getMacAddress(String interfaceName) {
+ InstanceIdentifier<NodeConnector> ncId = getNodeConnectorId(interfaceName);
+ if(ncId != null) {
+ String macAddress = inventoryReader.getMacAddress(ncId);
+ if(!Strings.isNullOrEmpty(macAddress)) {
+ return AlivenessMonitorUtil.parseMacAddress(macAddress);
+ }
+ }
+ return null;
+ }
+
+ private InstanceIdentifier<Interface> getInterfaceIdentifier(InterfaceKey interfaceKey) {
+ InstanceIdentifier.InstanceIdentifierBuilder<Interface> interfaceInstanceIdentifierBuilder =
+ InstanceIdentifier.builder(Interfaces.class).child(Interface.class, interfaceKey);
+ return interfaceInstanceIdentifierBuilder.build();
+ }
+
+ protected Interface getInterfaceFromConfigDS(String interfaceName) {
+ InterfaceKey interfaceKey = new InterfaceKey(interfaceName);
+ InstanceIdentifier<Interface> interfaceId = getInterfaceIdentifier(interfaceKey);
+ Optional<Interface> interfaceOptional = read(LogicalDatastoreType.CONFIGURATION, interfaceId);
+ if (!interfaceOptional.isPresent()) {
+ return null;
+ }
+
+ return interfaceOptional.get();
+ }
+
+
+ private <T extends DataObject> Optional<T> read(LogicalDatastoreType datastoreType,
+ InstanceIdentifier<T> path) {
+
+ ReadOnlyTransaction tx = serviceProvider.getDataBroker().newReadOnlyTransaction();
+
+ Optional<T> result = Optional.absent();
+ try {
+ result = tx.read(datastoreType, path).get();
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ } finally {
+ tx.close();
+ }
+
+ return result;
+ }
+
+}
--- /dev/null
+/*
+ * Copyright (c) 2015 Ericsson India Global Services Pvt Ltd. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.vpnservice.alivenessmonitor.internal;
+
+import java.lang.Thread.UncaughtExceptionHandler;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.EnumMap;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.TimeUnit;
+
+import org.opendaylight.controller.liblldp.NetUtils;
+import org.opendaylight.controller.liblldp.Packet;
+import org.opendaylight.controller.md.sal.binding.api.DataBroker;
+import org.opendaylight.controller.md.sal.binding.api.NotificationPublishService;
+import org.opendaylight.controller.md.sal.binding.api.ReadOnlyTransaction;
+import org.opendaylight.controller.md.sal.binding.api.ReadWriteTransaction;
+import org.opendaylight.controller.md.sal.binding.api.WriteTransaction;
+import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
+import org.opendaylight.vpnservice.mdsalutil.packet.Ethernet;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.vpnservice.alivenessmonitor.rev150629.AlivenessMonitorService;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.vpnservice.alivenessmonitor.rev150629.EtherTypes;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.vpnservice.alivenessmonitor.rev150629.LivenessState;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.vpnservice.alivenessmonitor.rev150629.MonitorEvent;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.vpnservice.alivenessmonitor.rev150629.MonitorEventBuilder;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.vpnservice.alivenessmonitor.rev150629.MonitorPauseInput;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.vpnservice.alivenessmonitor.rev150629.MonitorProfileCreateInput;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.vpnservice.alivenessmonitor.rev150629.MonitorProfileCreateOutput;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.vpnservice.alivenessmonitor.rev150629.MonitorProfileCreateOutputBuilder;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.vpnservice.alivenessmonitor.rev150629.MonitorProfileDeleteInput;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.vpnservice.alivenessmonitor.rev150629.MonitorStartInput;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.vpnservice.alivenessmonitor.rev150629.MonitorStartOutput;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.vpnservice.alivenessmonitor.rev150629.MonitorStartOutputBuilder;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.vpnservice.alivenessmonitor.rev150629.MonitorStatus;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.vpnservice.alivenessmonitor.rev150629.MonitorStopInput;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.vpnservice.alivenessmonitor.rev150629.MonitorUnpauseInput;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.vpnservice.alivenessmonitor.rev150629.MonitoringMode;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.vpnservice.alivenessmonitor.rev150629._interface.monitor.map.InterfaceMonitorEntry;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.vpnservice.alivenessmonitor.rev150629._interface.monitor.map.InterfaceMonitorEntryBuilder;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.vpnservice.alivenessmonitor.rev150629._interface.monitor.map.InterfaceMonitorEntryKey;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.vpnservice.alivenessmonitor.rev150629.endpoint.EndpointType;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.vpnservice.alivenessmonitor.rev150629.endpoint.endpoint.type.Interface;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.vpnservice.alivenessmonitor.rev150629.endpoint.endpoint.type.IpAddress;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.vpnservice.alivenessmonitor.rev150629.monitor.configs.MonitoringInfo;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.vpnservice.alivenessmonitor.rev150629.monitor.configs.MonitoringInfoBuilder;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.vpnservice.alivenessmonitor.rev150629.monitor.event.EventData;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.vpnservice.alivenessmonitor.rev150629.monitor.event.EventDataBuilder;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.vpnservice.alivenessmonitor.rev150629.monitor.profile.create.input.Profile;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.vpnservice.alivenessmonitor.rev150629.monitor.profiles.MonitorProfile;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.vpnservice.alivenessmonitor.rev150629.monitor.profiles.MonitorProfileBuilder;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.vpnservice.alivenessmonitor.rev150629.monitor.start.input.Config;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.vpnservice.alivenessmonitor.rev150629.monitorid.key.map.MonitoridKeyEntry;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.vpnservice.alivenessmonitor.rev150629.monitorid.key.map.MonitoridKeyEntryBuilder;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.vpnservice.alivenessmonitor.rev150629.monitoring.states.MonitoringState;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.vpnservice.alivenessmonitor.rev150629.monitoring.states.MonitoringStateBuilder;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.vpnservice.idmanager.rev150403.AllocateIdInput;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.vpnservice.idmanager.rev150403.AllocateIdInputBuilder;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.vpnservice.idmanager.rev150403.AllocateIdOutput;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.vpnservice.idmanager.rev150403.CreateIdPoolInput;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.vpnservice.idmanager.rev150403.CreateIdPoolInputBuilder;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.vpnservice.idmanager.rev150403.IdManagerService;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.vpnservice.idmanager.rev150403.ReleaseIdInput;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.vpnservice.idmanager.rev150403.ReleaseIdInputBuilder;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.vpnservice.interfacemgr.rpcs.rev151003.OdlInterfaceRpcService;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.packet.service.rev130709.PacketInReason;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.packet.service.rev130709.PacketProcessingListener;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.packet.service.rev130709.PacketProcessingService;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.packet.service.rev130709.PacketReceived;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.packet.service.rev130709.SendToController;
+import org.opendaylight.yangtools.yang.binding.DataObject;
+import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
+import org.opendaylight.yangtools.yang.common.RpcError.ErrorType;
+import org.opendaylight.yangtools.yang.common.RpcResult;
+import org.opendaylight.yangtools.yang.common.RpcResultBuilder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Optional;
+import com.google.common.base.Preconditions;
+import com.google.common.base.Predicate;
+import com.google.common.base.Strings;
+import com.google.common.cache.CacheBuilder;
+import com.google.common.cache.CacheLoader;
+import com.google.common.cache.LoadingCache;
+import com.google.common.util.concurrent.AsyncFunction;
+import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.JdkFutureAdapters;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.SettableFuture;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+
+import static org.opendaylight.vpnservice.alivenessmonitor.internal.AlivenessMonitorUtil.*;
+
+public class AlivenessMonitor implements AlivenessMonitorService, PacketProcessingListener,
+ ServiceProvider, InterfaceStateListener, AutoCloseable {
+ private static final Logger LOG = LoggerFactory.getLogger(AlivenessMonitor.class);
+ private final DataBroker broker;
+ private IdManagerService idManager;
+ private PacketProcessingService packetProcessingService;
+ private NotificationPublishService notificationPublishService;
+ private OdlInterfaceRpcService interfaceManager;
+ private Map<Class<?>, AlivenessProtocolHandler> packetTypeToProtocolHandler;
+ private Map<EtherTypes, AlivenessProtocolHandler> ethTypeToProtocolHandler;
+ private ConcurrentMap<Long, ScheduledFuture<?>> monitoringTasks;
+ private LoadingCache<Long, String> monitorIdKeyCache;
+ private ScheduledExecutorService monitorService;
+ private ExecutorService callbackExecutorService;
+
+ private static final int THREAD_POOL_SIZE = 4;
+ private static final boolean INTERRUPT_TASK = true;
+ private static final int NO_DELAY = 0;
+ private static final Long INITIAL_COUNT = 0L;
+ private static final boolean CREATE_MISSING_PARENT = true;
+ private static final int INVALID_ID = 0;
+ private ConcurrentMap<String, Semaphore> lockMap = new ConcurrentHashMap<>();
+
+ private class FutureCallbackImpl implements FutureCallback<Void> {
+ private String message;
+ public FutureCallbackImpl(String message) {
+ this.message = message;
+ }
+
+ @Override
+ public void onFailure(Throwable error) {
+ LOG.warn("Error in Datastore operation - {}", message, error);
+ }
+
+ @Override
+ public void onSuccess(Void result) {
+ LOG.debug("Success in Datastore operation - {}", message);
+ }
+ }
+
+ private class AlivenessMonitorTask implements Runnable {
+ private MonitoringInfo monitoringInfo;
+
+ public AlivenessMonitorTask(MonitoringInfo monitoringInfo) {
+ this.monitoringInfo = monitoringInfo;
+ }
+
+ @Override
+ public void run() {
+ if(LOG.isTraceEnabled()) {
+ LOG.trace("send monitor packet - {}", monitoringInfo);
+ }
+ sendMonitorPacket(monitoringInfo);
+ }
+ }
+
+ public AlivenessMonitor(DataBroker dataBroker) {
+ broker = dataBroker;
+ ethTypeToProtocolHandler = new EnumMap<>(EtherTypes.class);
+ packetTypeToProtocolHandler = new HashMap<>();
+ monitorService = Executors.newScheduledThreadPool(THREAD_POOL_SIZE,
+ getMonitoringThreadFactory("Aliveness Monitoring Task"));
+ callbackExecutorService = Executors.newFixedThreadPool(THREAD_POOL_SIZE,
+ getMonitoringThreadFactory("Aliveness Callback Handler"));
+ monitoringTasks = new ConcurrentHashMap<>();
+ initilizeCache();
+ }
+
+ private ThreadFactory getMonitoringThreadFactory(String threadNameFormat) {
+ ThreadFactoryBuilder builder = new ThreadFactoryBuilder();
+ builder.setNameFormat(threadNameFormat);
+ builder.setUncaughtExceptionHandler( new UncaughtExceptionHandler() {
+ @Override
+ public void uncaughtException(Thread t, Throwable e) {
+ LOG.error("Received Uncaught Exception event in Thread: {}", t.getName(), e);
+ }
+ });
+ return builder.build();
+ }
+
+ private void initilizeCache() {
+ monitorIdKeyCache = CacheBuilder.newBuilder()
+ .build(new CacheLoader<Long, String>() {
+ @Override
+ public String load(Long monitorId) throws Exception {
+ String monitorKey = null;
+ Optional<MonitoridKeyEntry> optKey = read(LogicalDatastoreType.OPERATIONAL, getMonitorMapId(monitorId));
+ if(optKey.isPresent()) {
+ monitorKey = optKey.get().getMonitorKey();
+ }
+ return monitorKey;
+ }
+ });
+ }
+
+ @Override
+ public void close() throws Exception {
+ monitorIdKeyCache.cleanUp();
+ monitorService.shutdown();
+ callbackExecutorService.shutdown();
+ }
+
+ @Override
+ public DataBroker getDataBroker() {
+ return broker;
+ }
+
+ @Override
+ public OdlInterfaceRpcService getInterfaceManager() {
+ return interfaceManager;
+ }
+
+ public void setPacketProcessingService(PacketProcessingService pktProcessingService) {
+ this.packetProcessingService = pktProcessingService;
+ }
+
+ public void setNotificationPublishService(NotificationPublishService notificationPublishService) {
+ this.notificationPublishService = notificationPublishService;
+ }
+
+ public void setInterfaceManager(OdlInterfaceRpcService interfaceManager) {
+ this.interfaceManager = interfaceManager;
+ }
+
+ public void setIdManager(IdManagerService idManager) {
+ this.idManager = idManager;
+ createIdPool();
+ }
+
+ public void registerHandler(EtherTypes etherType, AlivenessProtocolHandler protocolHandler) {
+ ethTypeToProtocolHandler.put(etherType, protocolHandler);
+ packetTypeToProtocolHandler.put(protocolHandler.getPacketClass(), protocolHandler);
+ }
+
+ private void createIdPool() {
+ CreateIdPoolInput createPool = new CreateIdPoolInputBuilder()
+ .setPoolName(AlivenessMonitorConstants.MONITOR_IDPOOL_NAME)
+ .setLow(AlivenessMonitorConstants.MONITOR_IDPOOL_START)
+ .setHigh(AlivenessMonitorConstants.MONITOR_IDPOOL_SIZE)
+ .build();
+ Future<RpcResult<Void>> result = idManager.createIdPool(createPool);
+ Futures.addCallback(JdkFutureAdapters.listenInPoolThread(result), new FutureCallback<RpcResult<Void>>() {
+
+ @Override
+ public void onFailure(Throwable error) {
+ LOG.error("Failed to create idPool for Aliveness Monitor Service",error);
+ }
+
+ @Override
+ public void onSuccess(RpcResult<Void> result) {
+ if(result.isSuccessful()) {
+ LOG.debug("Created IdPool for Aliveness Monitor Service");
+ } else {
+ LOG.error("RPC to create Idpool failed {}", result.getErrors());
+ }
+ }
+ });
+ }
+
+ private int getUniqueId(final String idKey) {
+ AllocateIdInput getIdInput = new AllocateIdInputBuilder()
+ .setPoolName(AlivenessMonitorConstants.MONITOR_IDPOOL_NAME)
+ .setIdKey(idKey).build();
+
+ Future<RpcResult<AllocateIdOutput>> result = idManager.allocateId(getIdInput);
+
+ try {
+ RpcResult<AllocateIdOutput> rpcResult = result.get();
+ if(rpcResult.isSuccessful()) {
+ return rpcResult.getResult().getIdValue().intValue();
+ } else {
+ LOG.warn("RPC Call to Get Unique Id returned with Errors {}", rpcResult.getErrors());
+ }
+ } catch (InterruptedException | ExecutionException e) {
+ LOG.warn("Exception when getting Unique Id for key {}", idKey, e);
+ }
+ return INVALID_ID;
+ }
+
+ private void releaseId(String idKey) {
+ ReleaseIdInput idInput = new ReleaseIdInputBuilder()
+ .setPoolName(AlivenessMonitorConstants.MONITOR_IDPOOL_NAME)
+ .setIdKey(idKey).build();
+ try {
+ Future<RpcResult<Void>> result = idManager.releaseId(idInput);
+ RpcResult<Void> rpcResult = result.get();
+ if(!rpcResult.isSuccessful()) {
+ LOG.warn("RPC Call to release Id {} with Key {} returned with Errors {}",
+ idKey, rpcResult.getErrors());
+ }
+ } catch (InterruptedException | ExecutionException e) {
+ LOG.warn("Exception when releasing Id for key {}", idKey, e);
+ }
+ }
+
+ @Override
+ public void onPacketReceived(PacketReceived packetReceived) {
+ Class<? extends PacketInReason> pktInReason = packetReceived.getPacketInReason();
+ if(LOG.isTraceEnabled()) {
+ LOG.trace("Packet Received {}", packetReceived );
+ }
+
+ if (pktInReason == SendToController.class) {
+ Packet packetInFormatted;
+ byte[] data = packetReceived.getPayload();
+ Ethernet res = new Ethernet();
+ try {
+ packetInFormatted = res.deserialize(data, 0, data.length * NetUtils.NumBitsInAByte);
+ } catch (Exception e) {
+ LOG.warn("Failed to decode packet: {}", e.getMessage());
+ return;
+ }
+
+ if(packetInFormatted == null) {
+ LOG.warn("Failed to deserialize Received Packet from table {}", packetReceived.getTableId().getValue());
+ return;
+ }
+
+ Object objPayload = packetInFormatted.getPayload();
+
+ if(objPayload == null) {
+ LOG.trace("Unsupported packet type. Ignoring the packet...");
+ return;
+ }
+
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("onPacketReceived packet: {}, packet class: {}", packetReceived,
+ objPayload.getClass());
+ }
+
+ AlivenessProtocolHandler livenessProtocolHandler = packetTypeToProtocolHandler.get(objPayload.getClass());
+ if (livenessProtocolHandler == null) {
+ return;
+ }
+
+ String monitorKey = livenessProtocolHandler.handlePacketIn(packetInFormatted.getPayload(), packetReceived);
+
+ if(monitorKey != null) {
+ processReceivedMonitorKey(monitorKey);
+ } else {
+ LOG.debug("No monitorkey associated with received packet");
+ }
+ }
+ }
+
+ private void processReceivedMonitorKey(final String monitorKey) {
+ Preconditions.checkNotNull(monitorKey, "Monitor Key required to process the state");
+
+ LOG.debug("Processing monitorKey: {} for received packet", monitorKey);
+
+ final Semaphore lock = lockMap.get(monitorKey);
+ LOG.debug("Acquiring lock for monitor key : {} to process monitor packet", monitorKey);
+ acquireLock(lock);
+
+ final ReadWriteTransaction tx = broker.newReadWriteTransaction();
+
+ ListenableFuture<Optional<MonitoringState>> stateResult = tx.read(LogicalDatastoreType.OPERATIONAL, getMonitorStateId(monitorKey));
+
+ //READ Callback
+ Futures.addCallback(stateResult, new FutureCallback<Optional<MonitoringState>>() {
+
+ @Override
+ public void onSuccess(Optional<MonitoringState> optState) {
+
+ if(optState.isPresent()) {
+ final MonitoringState currentState = optState.get();
+
+ if(LOG.isTraceEnabled()) {
+ LOG.trace("OnPacketReceived : Monitoring state from ODS : {} ", currentState);
+ }
+
+ Long responsePendingCount = currentState.getResponsePendingCount();
+
+ //Need to relook at the pending count logic to support N out of M scenarios
+// if(currentState.getState() != LivenessState.Up) {
+// //Reset responsePendingCount when state changes from DOWN to UP
+// responsePendingCount = INITIAL_COUNT;
+// }
+//
+// if(responsePendingCount > INITIAL_COUNT) {
+// responsePendingCount = currentState.getResponsePendingCount() - 1;
+// }
+ responsePendingCount = INITIAL_COUNT;
+
+ final boolean stateChanged = (currentState.getState() == LivenessState.Down ||
+ currentState.getState() == LivenessState.Unknown);
+
+ final MonitoringState state = new MonitoringStateBuilder().setMonitorKey(monitorKey).setState(LivenessState.Up)
+ .setResponsePendingCount(responsePendingCount).build();
+ tx.merge(LogicalDatastoreType.OPERATIONAL, getMonitorStateId(monitorKey), state);
+ ListenableFuture<Void> writeResult = tx.submit();
+
+ //WRITE Callback
+ Futures.addCallback(writeResult, new FutureCallback<Void>() {
+ @Override
+ public void onSuccess(Void noarg) {
+ releaseLock(lock);
+ if(stateChanged) {
+ //send notifications
+ LOG.info("Sending notification for monitor Id : {} with Current State: {}",
+ currentState.getMonitorId(), LivenessState.Up);
+ publishNotification(currentState.getMonitorId(), LivenessState.Up);
+ } else {
+ if(LOG.isTraceEnabled()) {
+ LOG.trace("Successful in writing monitoring state {} to ODS", state);
+ }
+ }
+ }
+
+ @Override
+ public void onFailure(Throwable error) {
+ releaseLock(lock);
+ LOG.warn("Error in writing monitoring state : {} to Datastore", monitorKey, error);
+ if(LOG.isTraceEnabled()) {
+ LOG.trace("Error in writing monitoring state: {} to Datastore", state);
+ }
+ }
+ });
+ } else {
+ LOG.warn("Monitoring State not available for key: {} to process the Packet received", monitorKey);
+ //Complete the transaction
+ tx.submit();
+ releaseLock(lock);
+ }
+ }
+
+ @Override
+ public void onFailure(Throwable error) {
+ LOG.error("Error when reading Monitoring State for key: {} to process the Packet received", monitorKey, error);
+ //FIXME: Not sure if the transaction status is valid to cancel
+ tx.cancel();
+ releaseLock(lock);
+ }
+ });
+ }
+
+ @Override
+ public PacketProcessingService getPacketProcessingService() {
+ return packetProcessingService;
+ }
+
+ private String getIpAddress(EndpointType endpoint) {
+ String ipAddress = "";
+ if( endpoint instanceof IpAddress) {
+ ipAddress = ((IpAddress) endpoint).getIpAddress().getIpv4Address().getValue();
+ } else if (endpoint instanceof Interface) {
+ ipAddress = ((Interface)endpoint).getInterfaceIp().getIpv4Address().getValue();
+ }
+ return ipAddress;
+ }
+
+ private String getUniqueKey(String interfaceName, String ethType, EndpointType source, EndpointType destination) {
+ StringBuilder builder = new StringBuilder().append(interfaceName).append(AlivenessMonitorConstants.SEPERATOR)
+ .append(ethType);
+ if(source != null) {
+ builder.append(AlivenessMonitorConstants.SEPERATOR).append(getIpAddress(source));
+ }
+
+ if(destination != null) {
+ builder.append(AlivenessMonitorConstants.SEPERATOR).append(getIpAddress(destination));
+ }
+ return builder.toString();
+ }
+
+ @Override
+ public Future<RpcResult<MonitorStartOutput>> monitorStart(MonitorStartInput input) {
+ RpcResultBuilder<MonitorStartOutput> rpcResultBuilder;
+ final Config in = input.getConfig();
+ Long profileId = in.getProfileId();
+ LOG.debug("Monitor Start invoked with Config: {}, Profile Id: {}", in, profileId);
+
+ try {
+ if(in.getMode() != MonitoringMode.OneOne) {
+ throw new UnsupportedConfigException(
+ "Unsupported Monitoring mode. Currently one-one mode is supported");
+ }
+
+ Optional<MonitorProfile> optProfile = read(LogicalDatastoreType.OPERATIONAL, getMonitorProfileId(profileId));
+ final MonitorProfile profile;
+ if(!optProfile.isPresent()) {
+ String errMsg = String.format("No monitoring profile associated with Id: %d", profileId);
+ LOG.error("Monitor start failed. {}", errMsg);
+ throw new RuntimeException(errMsg);
+ } else {
+ profile = optProfile.get();
+ }
+
+ EtherTypes ethType = profile.getProtocolType();
+
+ String interfaceName = null;
+ EndpointType srcEndpointType = in.getSource().getEndpointType();
+
+ if( srcEndpointType instanceof Interface) {
+ Interface endPoint = (Interface) srcEndpointType;
+ interfaceName = endPoint.getInterfaceName();
+ } else {
+ throw new UnsupportedConfigException(
+ "Unsupported source Endpoint type. Only Interface Endpoint currently supported for monitoring");
+ }
+
+ if(Strings.isNullOrEmpty(interfaceName)) {
+ throw new RuntimeException("Interface Name not defined in the source Endpoint");
+ }
+
+ //Initially the support is for one monitoring per interface.
+ //Revisit the retrieving monitor id logic when the multiple monitoring for same interface is needed.
+ EndpointType destEndpointType = null;
+ if(in.getDestination() != null) {
+ destEndpointType = in.getDestination().getEndpointType();
+ }
+ String idKey = getUniqueKey(interfaceName, ethType.toString(), srcEndpointType, destEndpointType);
+ final long monitorId = getUniqueId(idKey);
+ Optional<MonitoringInfo> optKey = read(LogicalDatastoreType.OPERATIONAL, getMonitoringInfoId(monitorId));
+
+ if(optKey.isPresent()) {
+ String message = String.format("Monitoring for the interface %s with this configuration is already registered.", interfaceName);
+ LOG.warn(message);
+ MonitorStartOutput output = new MonitorStartOutputBuilder().setMonitorId(monitorId).build();
+ rpcResultBuilder = RpcResultBuilder.success(output).withWarning(ErrorType.APPLICATION, "config-exists", message);
+ return Futures.immediateFuture(rpcResultBuilder.build());
+ } else {
+ //Construct the monitor key
+ final MonitoringInfo monitoringInfo = new MonitoringInfoBuilder()
+ .setId(monitorId)
+ .setMode(in.getMode())
+ .setProfileId(profileId)
+ .setDestination(in.getDestination())
+ .setSource(in.getSource()).build();
+ //Construct the initial monitor state
+ AlivenessProtocolHandler handler = ethTypeToProtocolHandler.get(ethType);
+ final String monitoringKey = handler.getUniqueMonitoringKey(monitoringInfo);
+
+ MonitoringState monitoringState = new MonitoringStateBuilder()
+ .setMonitorKey(monitoringKey)
+ .setMonitorId(monitorId)
+ .setState(LivenessState.Unknown)
+ .setStatus(MonitorStatus.Started)
+ .setRequestCount(INITIAL_COUNT)
+ .setResponsePendingCount(INITIAL_COUNT).build();
+
+ MonitoridKeyEntry mapEntry = new MonitoridKeyEntryBuilder().setMonitorId(monitorId)
+ .setMonitorKey(monitoringKey).build();
+
+ WriteTransaction tx = broker.newWriteOnlyTransaction();
+
+ tx.put(LogicalDatastoreType.OPERATIONAL, getMonitoringInfoId(monitorId), monitoringInfo, CREATE_MISSING_PARENT);
+
+ tx.put(LogicalDatastoreType.OPERATIONAL, getMonitorStateId(monitoringKey), monitoringState, CREATE_MISSING_PARENT);
+
+ tx.put(LogicalDatastoreType.OPERATIONAL, getMonitorMapId(monitorId), mapEntry, CREATE_MISSING_PARENT);
+
+ Futures.addCallback(tx.submit(), new FutureCallback<Void>() {
+ @Override
+ public void onFailure(Throwable error) {
+ String errorMsg = String.format("Adding Monitoring info: %s in Datastore failed", monitoringInfo);
+ LOG.warn(errorMsg, error);
+ throw new RuntimeException(errorMsg, error);
+ }
+
+ @Override
+ public void onSuccess(Void noarg) {
+ //Schedule task
+ LOG.debug("Scheduling monitor task for config: {}", in);
+ scheduleMonitoringTask(monitoringInfo, profile.getMonitorInterval());
+ lockMap.put(monitoringKey, new Semaphore(1, true));
+ }
+ });
+ }
+
+ associateMonitorIdWithInterface(monitorId, interfaceName);
+
+ MonitorStartOutput output = new MonitorStartOutputBuilder()
+ .setMonitorId(monitorId).build();
+
+ rpcResultBuilder = RpcResultBuilder.success(output);
+ } catch(Exception e) {
+ LOG.error("Start Monitoring Failed. {}", e.getMessage(), e);
+ rpcResultBuilder = RpcResultBuilder.<MonitorStartOutput>failed().withError(ErrorType.APPLICATION, e.getMessage(), e);
+ }
+ return Futures.immediateFuture(rpcResultBuilder.build());
+ }
+
+ private void associateMonitorIdWithInterface(final Long monitorId, final String interfaceName) {
+ LOG.debug("associate monitor Id {} with interface {}", monitorId, interfaceName);
+ final ReadWriteTransaction tx = broker.newReadWriteTransaction();
+ ListenableFuture<Optional<InterfaceMonitorEntry>> readFuture =
+ tx.read(LogicalDatastoreType.OPERATIONAL, getInterfaceMonitorMapId(interfaceName));
+ ListenableFuture<Void> updateFuture =
+ Futures.transform(readFuture, new AsyncFunction<Optional<InterfaceMonitorEntry>, Void>() {
+
+ @Override
+ public ListenableFuture<Void> apply(Optional<InterfaceMonitorEntry> optEntry) throws Exception {
+ if(optEntry.isPresent()) {
+ InterfaceMonitorEntry entry = optEntry.get();
+ List<Long> monitorIds = entry.getMonitorIds();
+ monitorIds.add(monitorId);
+ InterfaceMonitorEntry newEntry = new InterfaceMonitorEntryBuilder()
+ .setKey(new InterfaceMonitorEntryKey(interfaceName)).setMonitorIds(monitorIds).build();
+ tx.merge(LogicalDatastoreType.OPERATIONAL, getInterfaceMonitorMapId(interfaceName), newEntry);
+ } else {
+ //Create new monitor entry
+ LOG.debug("Adding new interface-monitor association for interface {} with id {}", interfaceName, monitorId);
+ List<Long> monitorIds = new ArrayList<>();
+ monitorIds.add(monitorId);
+ InterfaceMonitorEntry newEntry =
+ new InterfaceMonitorEntryBuilder().setInterfaceName(interfaceName).setMonitorIds(monitorIds).build();
+ tx.put(LogicalDatastoreType.OPERATIONAL,
+ getInterfaceMonitorMapId(interfaceName), newEntry, CREATE_MISSING_PARENT);
+ }
+ return tx.submit();
+ }
+ });
+
+ Futures.addCallback(updateFuture, new FutureCallbackImpl(
+ String.format("Association of monitorId %d with Interface %s", monitorId, interfaceName)));
+ }
+
+ private void scheduleMonitoringTask(MonitoringInfo monitoringInfo, long monitorInterval) {
+ AlivenessMonitorTask monitorTask = new AlivenessMonitorTask(monitoringInfo);
+ ScheduledFuture<?> scheduledFutureResult = monitorService.scheduleAtFixedRate(
+ monitorTask, NO_DELAY, monitorInterval, TimeUnit.MILLISECONDS);
+ monitoringTasks.put(monitoringInfo.getId(), scheduledFutureResult);
+ }
+
+ @Override
+ public Future<RpcResult<Void>> monitorPause(MonitorPauseInput input) {
+ LOG.debug("Monitor Pause operation invoked for monitor id: {}", input.getMonitorId());
+ SettableFuture<RpcResult<Void>> result = SettableFuture.create();
+ final Long monitorId = input.getMonitorId();
+
+ //Set the monitoring status to Paused
+ updateMonitorStatusTo(monitorId, MonitorStatus.Paused, new Predicate<MonitorStatus>() {
+ @Override
+ public boolean apply(MonitorStatus currentStatus) {
+ return currentStatus == MonitorStatus.Started;
+ }
+ });
+
+ if(stopMonitoringTask(monitorId)) {
+ result.set(RpcResultBuilder.<Void>success().build());
+ } else {
+ String errorMsg = String.format("No Monitoring Task availble to pause for the given monitor id : %d", monitorId);
+ LOG.error("Monitor Pause operation failed- {}",errorMsg);
+ result.set(RpcResultBuilder.<Void>failed().withError(ErrorType.APPLICATION, errorMsg).build());
+ }
+
+ return result;
+ }
+
+ @Override
+ public Future<RpcResult<Void>> monitorUnpause(MonitorUnpauseInput input) {
+ LOG.debug("Monitor Unpause operation invoked for monitor id: {}", input.getMonitorId());
+ final SettableFuture<RpcResult<Void>> result = SettableFuture.create();
+
+ final Long monitorId = input.getMonitorId();
+ final ReadOnlyTransaction tx = broker.newReadOnlyTransaction();
+ ListenableFuture<Optional<MonitoringInfo>> readInfoResult =
+ tx.read(LogicalDatastoreType.OPERATIONAL, getMonitoringInfoId(monitorId));
+
+ Futures.addCallback(readInfoResult, new FutureCallback<Optional<MonitoringInfo>>() {
+
+ @Override
+ public void onFailure(Throwable error) {
+ String msg = String.format("Unable to read monitoring info associated with monitor id %d", monitorId);
+ LOG.error("Monitor unpause Failed. {}", msg, error);
+ result.set(RpcResultBuilder.<Void>failed().withError(ErrorType.APPLICATION, msg, error).build());
+ }
+
+ @Override
+ public void onSuccess(Optional<MonitoringInfo> optInfo) {
+ if(optInfo.isPresent()) {
+ final MonitoringInfo info = optInfo.get();
+ ListenableFuture<Optional<MonitorProfile>> readProfile =
+ tx.read(LogicalDatastoreType.OPERATIONAL, getMonitorProfileId(info.getProfileId()));
+ Futures.addCallback(readProfile, new FutureCallback<Optional<MonitorProfile>>(){
+
+ @Override
+ public void onFailure(Throwable error) {
+ String msg = String.format("Unable to read Monitoring profile associated with id %d", info.getProfileId());
+ LOG.warn("Monitor unpause Failed. {}", msg, error);
+ result.set(RpcResultBuilder.<Void>failed().withError(ErrorType.APPLICATION, msg, error).build());
+ }
+
+ @Override
+ public void onSuccess(Optional<MonitorProfile> optProfile) {
+ tx.close();
+ if(optProfile.isPresent()) {
+ updateMonitorStatusTo(monitorId, MonitorStatus.Started, new Predicate<MonitorStatus>() {
+ @Override
+ public boolean apply(MonitorStatus currentStatus) {
+ return (currentStatus == MonitorStatus.Paused ||
+ currentStatus == MonitorStatus.Stopped);
+ }
+ });
+ MonitorProfile profile = optProfile.get();
+ LOG.debug("Monitor Resume - Scheduling monitoring task with Id: {}", monitorId);
+ scheduleMonitoringTask(info, profile.getMonitorInterval());
+ result.set(RpcResultBuilder.<Void>success().build());
+ } else {
+ String msg = String.format("Monitoring profile associated with id %d is not present", info.getProfileId());
+ LOG.warn("Monitor unpause Failed. {}", msg);
+ result.set(RpcResultBuilder.<Void>failed().withError(ErrorType.APPLICATION, msg).build());
+ }
+ }
+ });
+ } else {
+ tx.close();
+ String msg = String.format("Monitoring info associated with id %d is not present", monitorId);
+ LOG.warn("Monitor unpause Failed. {}", msg);
+ result.set(RpcResultBuilder.<Void>failed().withError(ErrorType.APPLICATION, msg).build());
+ }
+ }
+ }, callbackExecutorService);
+
+ return result;
+ }
+
+ private boolean stopMonitoringTask(Long monitorId) {
+ return stopMonitoringTask(monitorId, INTERRUPT_TASK);
+ }
+
+ private boolean stopMonitoringTask(Long monitorId, boolean interruptTask) {
+ ScheduledFuture<?> scheduledFutureResult = monitoringTasks.get(monitorId);
+ if(scheduledFutureResult != null) {
+ scheduledFutureResult.cancel(interruptTask);
+ return true;
+ }
+ return false;
+ }
+
+ private Optional<MonitorProfile> getMonitorProfile(Long profileId) {
+ return read(LogicalDatastoreType.OPERATIONAL, getMonitorProfileId(profileId));
+ }
+
+ private void acquireLock(Semaphore lock) {
+ if(lock == null) {
+ return;
+ }
+
+ boolean acquiredLock = false;
+ try {
+ acquiredLock = lock.tryAcquire(50, TimeUnit.MILLISECONDS);
+ } catch (InterruptedException e) {
+ LOG.warn("Thread interrupted when waiting to acquire the lock");
+ }
+
+ if(!acquiredLock) {
+ LOG.warn("Previous transaction did not complete in time. Releasing the lock to proceed");
+ lock.release();
+ try {
+ lock.acquire();
+ LOG.trace("Lock acquired successfully");
+ } catch (InterruptedException e) {
+ LOG.warn("Acquire failed");
+ }
+ } else {
+ LOG.trace("Lock acquired successfully");
+ }
+ }
+
+ private void releaseLock(Semaphore lock) {
+ if(lock != null) {
+ lock.release();
+ }
+ }
+
+ private void sendMonitorPacket(final MonitoringInfo monitoringInfo) {
+ //TODO: Handle interrupts
+ final Long monitorId = monitoringInfo.getId();
+ final String monitorKey = monitorIdKeyCache.getUnchecked(monitorId);
+ if(monitorKey == null) {
+ LOG.warn("No monitor Key associated with id {} to send the monitor packet", monitorId);
+ return;
+ } else {
+ LOG.debug("Sending monitoring packet for key: {}", monitorKey);
+ }
+
+ final MonitorProfile profile;
+ Optional<MonitorProfile> optProfile = getMonitorProfile(monitoringInfo.getProfileId());
+ if(optProfile.isPresent()) {
+ profile = optProfile.get();
+ } else {
+ LOG.warn("No monitor profile associated with id {}. "
+ + "Could not send Monitor packet for monitor-id {}", monitoringInfo.getProfileId(), monitorId);
+ return;
+ }
+
+ final Semaphore lock = lockMap.get(monitorKey);
+ LOG.debug("Acquiring lock for monitor key : {} to send monitor packet", monitorKey);
+ acquireLock(lock);
+
+ final ReadWriteTransaction tx = broker.newReadWriteTransaction();
+ ListenableFuture<Optional<MonitoringState>> readResult =
+ tx.read(LogicalDatastoreType.OPERATIONAL, getMonitorStateId(monitorKey));
+ ListenableFuture<Void> writeResult = Futures.transform(readResult, new AsyncFunction<Optional<MonitoringState>, Void>() {
+
+ @Override
+ public ListenableFuture<Void> apply(Optional<MonitoringState> optState)
+ throws Exception {
+ if(optState.isPresent()) {
+ MonitoringState state = optState.get();
+
+ //Increase the request count
+ Long requestCount = state.getRequestCount() + 1;
+
+ //Check with the monitor window
+ LivenessState currentLivenessState = state.getState();
+
+ //Increase the pending response count
+ long responsePendingCount = state.getResponsePendingCount();
+ if(responsePendingCount < profile.getMonitorWindow()) {
+ responsePendingCount = responsePendingCount + 1;
+ }
+
+ //Check with the failure thresold
+ if(responsePendingCount >= profile.getFailureThreshold()) {
+ //Change the state to down and notify
+ if(currentLivenessState != LivenessState.Down) {
+ LOG.debug("Response pending Count: {}, Failure threshold: {} for monitorId {}",
+ responsePendingCount, profile.getFailureThreshold(), state.getMonitorId());
+ LOG.info("Sending notification for monitor Id : {} with State: {}",
+ state.getMonitorId(), LivenessState.Down);
+ publishNotification(monitorId, LivenessState.Down);
+ currentLivenessState = LivenessState.Down;
+ //Reset requestCount when state changes from UP to DOWN
+ requestCount = INITIAL_COUNT;
+ }
+ }
+
+ //Update the ODS with state
+ MonitoringState updatedState = new MonitoringStateBuilder(/*state*/).setMonitorKey(state.getMonitorKey())
+ .setRequestCount(requestCount)
+ .setResponsePendingCount(responsePendingCount)
+ .setState(currentLivenessState).build();
+ tx.merge(LogicalDatastoreType.OPERATIONAL, getMonitorStateId(state.getMonitorKey()), updatedState);
+ return tx.submit();
+ } else {
+ //Close the transaction
+ tx.submit();
+ String errorMsg = String.format("Monitoring State associated with id %d is not present to send packet out.", monitorId);
+ return Futures.immediateFailedFuture(new RuntimeException(errorMsg));
+ }
+ }
+
+ });
+
+ Futures.addCallback(writeResult, new FutureCallback<Void>() {
+ @Override
+ public void onSuccess(Void noarg) {
+ //invoke packetout on protocol handler
+ AlivenessProtocolHandler handler = ethTypeToProtocolHandler.get(profile.getProtocolType());
+ if(handler != null) {
+ LOG.debug("Sending monitoring packet {}", monitoringInfo);
+ handler.sendPacketOut(monitoringInfo);
+ }
+ releaseLock(lock);
+ }
+
+ @Override
+ public void onFailure(Throwable error) {
+ LOG.warn("Updating monitoring state for key: {} failed. Monitoring packet is not sent", monitorKey, error);
+ releaseLock(lock);
+ }
+
+ });
+
+ }
+
+ private void publishNotification(final Long monitorId, final LivenessState state) {
+ LOG.debug("Sending notification for id {} - state {}", monitorId, state);
+ EventData data = new EventDataBuilder().setMonitorId(monitorId)
+ .setMonitorState(state).build();
+ MonitorEvent event = new MonitorEventBuilder().setEventData(data).build();;
+ final ListenableFuture<? extends Object> eventFuture = notificationPublishService.offerNotification(event);
+ Futures.addCallback(eventFuture, new FutureCallback<Object>() {
+ @Override
+ public void onFailure(Throwable error) {
+ LOG.warn("Error in notifying listeners for id {} - state {}", monitorId, state, error);
+ }
+
+ @Override
+ public void onSuccess(Object arg) {
+ LOG.trace("Successful in notifying listeners for id {} - state {}", monitorId, state);
+ }
+ });
+ }
+
+ @Override
+ public Future<RpcResult<MonitorProfileCreateOutput>> monitorProfileCreate(final MonitorProfileCreateInput input) {
+ LOG.debug("Monitor Profile Create operation - {}", input.getProfile());
+ final SettableFuture<RpcResult<MonitorProfileCreateOutput>> result = SettableFuture.create();
+ Profile profile = input.getProfile();
+ final Long failureThreshold = profile.getFailureThreshold();
+ final Long monitorInterval = profile.getMonitorInterval();
+ final Long monitorWindow = profile.getMonitorWindow();
+ final EtherTypes ethType = profile.getProtocolType();
+ String idKey = getUniqueProfileKey(failureThreshold, monitorInterval, monitorWindow, ethType);
+ final Long profileId = Long.valueOf(getUniqueId(idKey));
+
+ final ReadWriteTransaction tx = broker.newReadWriteTransaction();
+ ListenableFuture<Optional<MonitorProfile>> readFuture =
+ tx.read(LogicalDatastoreType.OPERATIONAL, getMonitorProfileId(profileId));
+ ListenableFuture<RpcResult<MonitorProfileCreateOutput>> resultFuture =
+ Futures.transform(readFuture, new AsyncFunction<Optional<MonitorProfile>, RpcResult<MonitorProfileCreateOutput>>() {
+
+ @Override
+ public ListenableFuture<RpcResult<MonitorProfileCreateOutput>> apply(
+ Optional<MonitorProfile> optProfile) throws Exception {
+ if(optProfile.isPresent()) {
+ tx.cancel();
+ MonitorProfileCreateOutput output = new MonitorProfileCreateOutputBuilder()
+ .setProfileId(profileId).build();
+ String msg = String.format("Monitor profile %s already present for the given input", input);
+ LOG.warn(msg);
+ result.set(RpcResultBuilder.success(output)
+ .withWarning(ErrorType.PROTOCOL, "profile-exists", msg).build());
+ } else {
+ final MonitorProfile monitorProfile = new MonitorProfileBuilder().setId(profileId)
+ .setFailureThreshold(failureThreshold)
+ .setMonitorInterval(monitorInterval)
+ .setMonitorWindow(monitorWindow)
+ .setProtocolType(ethType).build();
+ tx.put(LogicalDatastoreType.OPERATIONAL, getMonitorProfileId(profileId), monitorProfile, CREATE_MISSING_PARENT);
+ Futures.addCallback(tx.submit(), new FutureCallback<Void>() {
+ @Override
+ public void onFailure(Throwable error) {
+ String msg =
+ String.format("Error when storing monitorprofile %s in datastore", monitorProfile);
+ LOG.error(msg, error);
+ result.set(RpcResultBuilder.<MonitorProfileCreateOutput>failed()
+ .withError(ErrorType.APPLICATION, msg, error).build());
+ }
+ @Override
+ public void onSuccess(Void noarg) {
+ MonitorProfileCreateOutput output = new MonitorProfileCreateOutputBuilder()
+ .setProfileId(profileId).build();
+ result.set(RpcResultBuilder.success(output).build());
+ }
+ });
+ }
+ return result;
+ }
+ }, callbackExecutorService);
+ Futures.addCallback(resultFuture, new FutureCallback<RpcResult<MonitorProfileCreateOutput>>() {
+ @Override
+ public void onFailure(Throwable error) {
+ //This would happen when any error happens during reading for monitoring profile
+ String msg = String.format("Error in creating monitorprofile - %s", input);
+ result.set(RpcResultBuilder.<MonitorProfileCreateOutput>failed()
+ .withError(ErrorType.APPLICATION, msg, error).build());
+ LOG.error(msg, error);
+ }
+
+ @Override
+ public void onSuccess(RpcResult<MonitorProfileCreateOutput> result) {
+ LOG.debug("Successfully created monitor Profile {} ", input);
+ }
+ }, callbackExecutorService);
+ return result;
+ }
+
+ private String getUniqueProfileKey(Long failureThreshold,Long monitorInterval,Long monitorWindow,EtherTypes ethType) {
+ return new StringBuilder().append(failureThreshold).append(AlivenessMonitorConstants.SEPERATOR)
+ .append(monitorInterval).append(AlivenessMonitorConstants.SEPERATOR)
+ .append(monitorWindow).append(AlivenessMonitorConstants.SEPERATOR)
+ .append(ethType).append(AlivenessMonitorConstants.SEPERATOR).toString();
+ }
+
+ @Override
+ public Future<RpcResult<Void>> monitorProfileDelete(final MonitorProfileDeleteInput input) {
+ LOG.debug("Monitor Profile delete for Id: {}", input.getProfileId());
+ final SettableFuture<RpcResult<Void>> result = SettableFuture.create();
+ final Long profileId = input.getProfileId();
+ final ReadWriteTransaction tx = broker.newReadWriteTransaction();
+ ListenableFuture<Optional<MonitorProfile>> readFuture =
+ tx.read(LogicalDatastoreType.OPERATIONAL, getMonitorProfileId(profileId));
+ ListenableFuture<RpcResult<Void>> writeFuture =
+ Futures.transform(readFuture, new AsyncFunction<Optional<MonitorProfile>, RpcResult<Void>>() {
+
+ @Override
+ public ListenableFuture<RpcResult<Void>> apply(final Optional<MonitorProfile> optProfile) throws Exception {
+ if(optProfile.isPresent()) {
+ tx.delete(LogicalDatastoreType.OPERATIONAL, getMonitorProfileId(profileId));
+ Futures.addCallback(tx.submit(), new FutureCallback<Void>() {
+ @Override
+ public void onFailure(Throwable error) {
+ String msg = String.format("Error when removing monitor profile %d from datastore", profileId);
+ LOG.error(msg, error);
+ result.set(RpcResultBuilder.<Void>failed().withError(ErrorType.APPLICATION, msg, error).build());
+ }
+
+ @Override
+ public void onSuccess(Void noarg) {
+ MonitorProfile profile = optProfile.get();
+ String id = getUniqueProfileKey(profile.getFailureThreshold(), profile.getMonitorInterval(),
+ profile.getMonitorWindow(), profile.getProtocolType());
+ releaseId(id);
+ result.set(RpcResultBuilder.<Void>success().build());
+ }
+ });
+ } else {
+ String msg = String.format("Monitor profile with Id: %d does not exist", profileId);
+ LOG.info(msg);
+ result.set(RpcResultBuilder.<Void>success().withWarning(ErrorType.PROTOCOL, "invalid-value", msg).build());
+ }
+ return result;
+ }
+ }, callbackExecutorService);
+
+ Futures.addCallback(writeFuture, new FutureCallback<RpcResult<Void>>() {
+
+ @Override
+ public void onFailure(Throwable error) {
+ String msg = String.format("Error when removing monitor profile %d from datastore", profileId);
+ LOG.error(msg, error);
+ result.set(RpcResultBuilder.<Void>failed().withError(ErrorType.APPLICATION, msg, error).build());
+ }
+
+ @Override
+ public void onSuccess(RpcResult<Void> noarg) {
+ LOG.debug("Successfully removed Monitor Profile {}", profileId);
+ }
+ }, callbackExecutorService);
+ return result;
+ }
+
+ @Override
+ public Future<RpcResult<Void>> monitorStop(MonitorStopInput input) {
+ LOG.debug("Monitor Stop operation for monitor id - {}", input.getMonitorId());
+ SettableFuture<RpcResult<Void>> result = SettableFuture.create();
+
+ final Long monitorId = input.getMonitorId();
+ Optional<MonitoringInfo> optInfo = read(LogicalDatastoreType.OPERATIONAL, getMonitoringInfoId(monitorId));
+ if(optInfo.isPresent()) {
+ //Stop the monitoring task
+ stopMonitoringTask(monitorId);
+
+ //Cleanup the Data store
+ WriteTransaction tx = broker.newWriteOnlyTransaction();
+ String monitorKey = monitorIdKeyCache.getUnchecked(monitorId);
+ if(monitorKey != null) {
+ tx.delete(LogicalDatastoreType.OPERATIONAL, getMonitorStateId(monitorKey));
+ monitorIdKeyCache.invalidate(monitorId);
+ }
+
+ tx.delete(LogicalDatastoreType.OPERATIONAL, getMonitoringInfoId(monitorId));
+ Futures.addCallback(tx.submit(),
+ new FutureCallbackImpl(String.format("Delete monitor state with Id %d", monitorId)));
+
+ MonitoringInfo info = optInfo.get();
+ String interfaceName = getInterfaceName(info.getSource().getEndpointType());
+ if(interfaceName != null) {
+ removeMonitorIdFromInterfaceAssociation(monitorId, interfaceName);
+ }
+ releaseIdForMonitoringInfo(info);
+
+ lockMap.remove(monitorKey);
+
+ result.set(RpcResultBuilder.<Void>success().build());
+ } else {
+ String errorMsg = String.format("Do not have monitoring information associated with key %d", monitorId);
+ LOG.error("Delete monitoring operation Failed - {}", errorMsg);
+ result.set(RpcResultBuilder.<Void>failed().withError(ErrorType.APPLICATION, errorMsg).build());
+ }
+
+ return result;
+ }
+
+ private void removeMonitorIdFromInterfaceAssociation(final Long monitorId, final String interfaceName) {
+ LOG.debug("Remove monitorId {} from Interface association {}", monitorId, interfaceName);
+ final ReadWriteTransaction tx = broker.newReadWriteTransaction();
+ ListenableFuture<Optional<InterfaceMonitorEntry>> readFuture = tx.read(LogicalDatastoreType.OPERATIONAL, getInterfaceMonitorMapId(interfaceName));
+ ListenableFuture<Void> updateFuture = Futures.transform(readFuture, new AsyncFunction<Optional<InterfaceMonitorEntry>, Void>() {
+
+ @Override
+ public ListenableFuture<Void> apply(Optional<InterfaceMonitorEntry> optEntry) throws Exception {
+ if(optEntry.isPresent()) {
+ InterfaceMonitorEntry entry = optEntry.get();
+ List<Long> monitorIds = entry.getMonitorIds();
+ monitorIds.remove(monitorId);
+ InterfaceMonitorEntry newEntry = new InterfaceMonitorEntryBuilder(entry)
+ .setKey(new InterfaceMonitorEntryKey(interfaceName)).setMonitorIds(monitorIds).build();
+ tx.put(LogicalDatastoreType.OPERATIONAL, getInterfaceMonitorMapId(interfaceName), newEntry, CREATE_MISSING_PARENT);
+ return tx.submit();
+ } else {
+ LOG.warn("No Interface map entry found {} to remove monitorId {}", interfaceName, monitorId);
+ tx.cancel();
+ return Futures.immediateFuture(null);
+ }
+ }
+ });
+
+ Futures.addCallback(updateFuture, new FutureCallbackImpl(
+ String.format("Dis-association of monitorId %d with Interface %s", monitorId, interfaceName)));
+ }
+
+
+ private void releaseIdForMonitoringInfo(MonitoringInfo info) {
+ Long monitorId = info.getId();
+ EndpointType source = info.getSource().getEndpointType();
+ String interfaceName = getInterfaceName(source);
+ if(!Strings.isNullOrEmpty(interfaceName)) {
+ Optional<MonitorProfile> optProfile = read(LogicalDatastoreType.OPERATIONAL, getMonitorProfileId(info.getProfileId()));
+ if(optProfile.isPresent()) {
+ EtherTypes ethType = optProfile.get().getProtocolType();
+ EndpointType destination = (info.getDestination() != null) ? info.getDestination().getEndpointType() : null;
+ String idKey = getUniqueKey(interfaceName, ethType.toString(), source, destination);
+ releaseId(idKey);
+ } else {
+ LOG.warn("Could not release monitorId {}. No profile associated with it", monitorId);
+ }
+ }
+ }
+
+ private String getInterfaceName(EndpointType endpoint) {
+ String interfaceName = null;
+ if(endpoint instanceof Interface) {
+ interfaceName = ((Interface)endpoint).getInterfaceName();
+ }
+ return interfaceName;
+ }
+
+ private void stopMonitoring(long monitorId) {
+ updateMonitorStatusTo(monitorId, MonitorStatus.Stopped, new Predicate<MonitorStatus>() {
+ @Override
+ public boolean apply(MonitorStatus currentStatus) {
+ return currentStatus != MonitorStatus.Stopped;
+ }
+ });
+ if(!stopMonitoringTask(monitorId)) {
+ LOG.warn("No monitoring task running to perform cancel operation for monitorId {}", monitorId);
+ }
+ }
+
+ private void updateMonitorStatusTo(final Long monitorId, final MonitorStatus newStatus, final Predicate<MonitorStatus> isValidStatus) {
+ final String monitorKey = monitorIdKeyCache.getUnchecked(monitorId);
+ if(monitorKey == null) {
+ LOG.warn("No monitor Key associated with id {} to change the monitor status to {}", monitorId, newStatus);
+ return;
+ }
+ final ReadWriteTransaction tx = broker.newReadWriteTransaction();
+
+ ListenableFuture<Optional<MonitoringState>> readResult =
+ tx.read(LogicalDatastoreType.OPERATIONAL, getMonitorStateId(monitorKey));
+
+ ListenableFuture<Void> writeResult = Futures.transform(readResult, new AsyncFunction<Optional<MonitoringState>, Void>() {
+ @Override
+ public ListenableFuture<Void> apply(Optional<MonitoringState> optState) throws Exception {
+ if(optState.isPresent()) {
+ MonitoringState state = optState.get();
+ if(isValidStatus.apply(state.getStatus())) {
+ MonitoringState updatedState = new MonitoringStateBuilder().setMonitorKey(monitorKey)
+ .setStatus(newStatus).build();
+ tx.merge(LogicalDatastoreType.OPERATIONAL, getMonitorStateId(monitorKey), updatedState);
+ } else {
+ LOG.warn("Invalid Monitoring status {}, cannot be updated to {} for monitorId {}"
+ , state.getStatus(), newStatus, monitorId);
+ }
+ } else {
+ LOG.warn("No associated monitoring state data available to update the status to {} for {}", newStatus, monitorId);
+ }
+ return tx.submit();
+ }
+ });
+
+ Futures.addCallback(writeResult,
+ new FutureCallbackImpl(String.format("Monitor status update for %d to %s", monitorId, newStatus.toString())));
+ }
+
+ private void resumeMonitoring(final long monitorId) {
+ final ReadOnlyTransaction tx = broker.newReadOnlyTransaction();
+ ListenableFuture<Optional<MonitoringInfo>> readInfoResult =
+ tx.read(LogicalDatastoreType.OPERATIONAL, getMonitoringInfoId(monitorId));
+
+ Futures.addCallback(readInfoResult, new FutureCallback<Optional<MonitoringInfo>>() {
+
+ @Override
+ public void onFailure(Throwable error) {
+ String msg = String.format("Unable to read monitoring info associated with monitor id %d", monitorId);
+ LOG.error("Monitor resume Failed. {}", msg, error);
+ }
+
+ @Override
+ public void onSuccess(Optional<MonitoringInfo> optInfo) {
+ if(optInfo.isPresent()) {
+ final MonitoringInfo info = optInfo.get();
+ ListenableFuture<Optional<MonitorProfile>> readProfile =
+ tx.read(LogicalDatastoreType.OPERATIONAL, getMonitorProfileId(info.getProfileId()));
+ Futures.addCallback(readProfile, new FutureCallback<Optional<MonitorProfile>>(){
+
+ @Override
+ public void onFailure(Throwable error) {
+ String msg = String.format("Unable to read Monitoring profile associated with id %d", info.getProfileId());
+ LOG.warn("Monitor resume Failed. {}", msg, error);
+ }
+
+ @Override
+ public void onSuccess(Optional<MonitorProfile> optProfile) {
+ tx.close();
+ if(optProfile.isPresent()) {
+ updateMonitorStatusTo(monitorId, MonitorStatus.Started, new Predicate<MonitorStatus>() {
+ @Override
+ public boolean apply(MonitorStatus currentStatus) {
+ return currentStatus != MonitorStatus.Started;
+ }
+ });
+ MonitorProfile profile = optProfile.get();
+ LOG.debug("Monitor Resume - Scheduling monitoring task for Id: {}", monitorId);
+ scheduleMonitoringTask(info, profile.getMonitorInterval());
+ } else {
+ String msg = String.format("Monitoring profile associated with id %d is not present", info.getProfileId());
+ LOG.warn("Monitor resume Failed. {}", msg);
+ }
+ }
+ });
+ } else {
+ tx.close();
+ String msg = String.format("Monitoring info associated with id %d is not present", monitorId);
+ LOG.warn("Monitor resume Failed. {}", msg);
+ }
+ }
+ });
+ }
+
+ //DATA STORE OPERATIONS
+ private <T extends DataObject> Optional<T> read(LogicalDatastoreType datastoreType, InstanceIdentifier<T> path) {
+ ReadOnlyTransaction tx = broker.newReadOnlyTransaction();
+
+ Optional<T> result = Optional.absent();
+ try {
+ result = tx.read(datastoreType, path).get();
+ } catch (InterruptedException | ExecutionException e) {
+ LOG.warn("Error reading data from path {} in datastore {}", path, datastoreType, e);
+ } finally {
+ tx.close();
+ }
+
+ return result;
+ }
+
+ @Override
+ public void onInterfaceStateUp(String interfaceName) {
+ List<Long> monitorIds = getMonitorIds(interfaceName);
+ if(monitorIds.isEmpty()) {
+ LOG.warn("Could not get monitorId for interface: {}", interfaceName);
+ return;
+ }
+ for(Long monitorId : monitorIds) {
+ LOG.debug("Resume monitoring on interface: {} with monitorId: {}", interfaceName, monitorId);
+ resumeMonitoring(monitorId);
+ }
+ }
+
+ @Override
+ public void onInterfaceStateDown(String interfaceName) {
+ List<Long> monitorIds = getMonitorIds(interfaceName);
+ if(monitorIds.isEmpty()) {
+ LOG.warn("Could not get monitorIds for interface: {}", interfaceName);
+ return;
+ }
+ for(Long monitorId : monitorIds) {
+ LOG.debug("Suspend monitoring on interface: {} with monitorId: {}", interfaceName, monitorId);
+ stopMonitoring(monitorId);
+ }
+ }
+
+ private List<Long> getMonitorIds(String interfaceName) {
+ Optional<InterfaceMonitorEntry> optEntry = read(LogicalDatastoreType.OPERATIONAL,
+ getInterfaceMonitorMapId(interfaceName));
+ if(optEntry.isPresent()) {
+ InterfaceMonitorEntry entry = optEntry.get();
+ return entry.getMonitorIds();
+ }
+ return Collections.emptyList();
+ }
+
+}
--- /dev/null
+/*
+ * Copyright (c) 2015 Ericsson India Global Services Pvt Ltd. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.vpnservice.alivenessmonitor.internal;
+
+public class AlivenessMonitorConstants {
+ static final String MONITOR_IDPOOL_NAME = "aliveness-monitor";
+ static final long MONITOR_IDPOOL_START = 1L;
+ static final long MONITOR_IDPOOL_SIZE = 65535;
+ static final short L3_INTERFACE_TABLE = 80;
+ static final String SEPERATOR = ".";
+}
--- /dev/null
+/*
+ * Copyright (c) 2015 Ericsson India Global Services Pvt Ltd. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.vpnservice.alivenessmonitor.internal;
+
+import org.opendaylight.controller.md.sal.binding.api.DataBroker;
+import org.opendaylight.controller.md.sal.binding.api.NotificationPublishService;
+import org.opendaylight.controller.sal.binding.api.BindingAwareBroker.ProviderContext;
+import org.opendaylight.controller.sal.binding.api.BindingAwareBroker.RpcRegistration;
+import org.opendaylight.controller.sal.binding.api.BindingAwareProvider;
+import org.opendaylight.controller.sal.binding.api.RpcProviderRegistry;
+import org.opendaylight.controller.md.sal.binding.api.NotificationService;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.vpnservice.alivenessmonitor.rev150629.AlivenessMonitorService;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.vpnservice.alivenessmonitor.rev150629.EtherTypes;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.arputil.rev151126.OdlArputilService;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.vpnservice.idmanager.rev150403.IdManagerService;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.vpnservice.interfacemgr.rpcs.rev151003.OdlInterfaceRpcService;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.packet.service.rev130709.PacketProcessingService;
+import org.opendaylight.yangtools.concepts.ListenerRegistration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class AlivenessMonitorProvider implements BindingAwareProvider,
+ AutoCloseable {
+ private static final Logger LOG = LoggerFactory.getLogger(AlivenessMonitorProvider.class);
+ private AlivenessMonitor alivenessMonitor;
+ private RpcProviderRegistry rpcProviderRegistry;
+ private OdlInterfaceRpcService interfaceManager;
+ private RpcRegistration<AlivenessMonitorService> rpcRegistration;
+ private ListenerRegistration<AlivenessMonitor> listenerRegistration;
+ private NotificationService notificationService;
+ private NotificationPublishService notificationPublishService;
+
+ public AlivenessMonitorProvider(RpcProviderRegistry rpcProviderRegistry) {
+ this.rpcProviderRegistry = rpcProviderRegistry;
+ }
+
+ public RpcProviderRegistry getRpcProviderRegistry() {
+ return rpcProviderRegistry;
+ }
+
+ public void setNotificationService(NotificationService notificationService) {
+ this.notificationService = notificationService;
+ }
+
+ public void setNotificationPublishService(NotificationPublishService notificationPublishService) {
+ this.notificationPublishService = notificationPublishService;
+ }
+
+ @Override
+ public void close() throws Exception {
+ rpcRegistration.close();
+ listenerRegistration.close();
+ alivenessMonitor.close();
+ }
+
+ @Override
+ public void onSessionInitiated(ProviderContext session) {
+ LOG.info("AlivenessMonitorProvider Session Initiated");
+ try {
+ final DataBroker dataBroker = session.getSALService(DataBroker.class);
+ PacketProcessingService pktProcessingService = session.getRpcService(PacketProcessingService.class);
+ IdManagerService idManager = rpcProviderRegistry.getRpcService(IdManagerService.class);
+ OdlInterfaceRpcService interfaceService = rpcProviderRegistry.getRpcService(OdlInterfaceRpcService.class);
+ alivenessMonitor = new AlivenessMonitor(dataBroker);
+ alivenessMonitor.setPacketProcessingService(pktProcessingService);
+ alivenessMonitor.setNotificationPublishService(notificationPublishService);
+ alivenessMonitor.setIdManager(idManager);
+ alivenessMonitor.setInterfaceManager(interfaceService);
+ rpcRegistration = getRpcProviderRegistry().addRpcImplementation(AlivenessMonitorService.class, alivenessMonitor);
+ listenerRegistration = notificationService.registerNotificationListener(alivenessMonitor);
+
+ //ARP Handler
+ AlivenessProtocolHandler arpHandler = new AlivenessProtocolHandlerARP(alivenessMonitor);
+ OdlArputilService arpService = rpcProviderRegistry.getRpcService(OdlArputilService.class);
+ ((AlivenessProtocolHandlerARP) arpHandler).setArpManagerService(arpService);
+ alivenessMonitor.registerHandler(EtherTypes.Arp, arpHandler);
+
+ //LLDP Handler
+ AlivenessProtocolHandler lldpHandler = new AlivenessProtocolHandlerLLDP(alivenessMonitor);
+ alivenessMonitor.registerHandler(EtherTypes.Lldp, lldpHandler);
+
+ //TODO: Enable Interface Event Listener
+ //DelegatingInterfaceEventListener listener = new DelegatingInterfaceEventListener(alivenessMonitor);
+ //interfaceListenerRegistration = notificationService.registerNotificationListener(listener);
+ } catch (Exception e) {
+ LOG.error("Error initializing AlivenessMonitor service", e);
+ }
+ }
+
+}
--- /dev/null
+/*
+ * Copyright (c) 2015 Ericsson India Global Services Pvt Ltd. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.vpnservice.alivenessmonitor.internal;
+
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+
+import org.opendaylight.yang.gen.v1.urn.opendaylight.vpnservice.alivenessmonitor.rev150629.InterfaceMonitorMap;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.vpnservice.alivenessmonitor.rev150629.MonitorConfigs;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.vpnservice.alivenessmonitor.rev150629.MonitorProfiles;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.vpnservice.alivenessmonitor.rev150629.MonitoridKeyMap;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.vpnservice.alivenessmonitor.rev150629.MonitoringStates;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.vpnservice.alivenessmonitor.rev150629._interface.monitor.map.InterfaceMonitorEntry;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.vpnservice.alivenessmonitor.rev150629._interface.monitor.map.InterfaceMonitorEntryKey;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.vpnservice.alivenessmonitor.rev150629.monitor.configs.MonitoringInfo;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.vpnservice.alivenessmonitor.rev150629.monitor.configs.MonitoringInfoKey;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.vpnservice.alivenessmonitor.rev150629.monitor.profiles.MonitorProfile;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.vpnservice.alivenessmonitor.rev150629.monitor.profiles.MonitorProfileKey;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.vpnservice.alivenessmonitor.rev150629.monitorid.key.map.MonitoridKeyEntry;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.vpnservice.alivenessmonitor.rev150629.monitorid.key.map.MonitoridKeyEntryKey;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.vpnservice.alivenessmonitor.rev150629.monitoring.states.MonitoringState;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.vpnservice.alivenessmonitor.rev150629.monitoring.states.MonitoringStateKey;
+import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
+
+import com.google.common.primitives.UnsignedBytes;
+
+public class AlivenessMonitorUtil {
+
+ static InstanceIdentifier<MonitoringState> getMonitorStateId(String keyId) {
+ return InstanceIdentifier.builder(MonitoringStates.class)
+ .child(MonitoringState.class, new MonitoringStateKey(keyId)).build();
+ }
+
+ static InstanceIdentifier<MonitoringInfo> getMonitoringInfoId(Long monitorId) {
+ return InstanceIdentifier.builder(MonitorConfigs.class)
+ .child(MonitoringInfo.class, new MonitoringInfoKey(monitorId)).build();
+ }
+
+ static InstanceIdentifier<MonitorProfile> getMonitorProfileId(Long profileId) {
+ return InstanceIdentifier.builder(MonitorProfiles.class)
+ .child(MonitorProfile.class, new MonitorProfileKey(profileId)).build();
+ }
+
+ static InstanceIdentifier<MonitoridKeyEntry> getMonitorMapId(Long keyId) {
+ return InstanceIdentifier.builder(MonitoridKeyMap.class)
+ .child(MonitoridKeyEntry.class, new MonitoridKeyEntryKey(keyId)).build();
+ }
+
+ static InstanceIdentifier<InterfaceMonitorEntry> getInterfaceMonitorMapId(String interfaceName) {
+ return InstanceIdentifier.builder(InterfaceMonitorMap.class)
+ .child(InterfaceMonitorEntry.class, new InterfaceMonitorEntryKey(interfaceName)).build();
+ }
+
+ public static String toStringIpAddress(byte[] ipAddress)
+ {
+ String ip = "";
+ if (ipAddress == null) {
+ return ip;
+ }
+
+ try {
+ ip = InetAddress.getByAddress(ipAddress).getHostAddress();
+ } catch(UnknownHostException e) { }
+
+ return ip;
+ }
+
+ public static String toStringMacAddress(byte[] macAddress)
+ {
+ if (macAddress == null) {
+ return "";
+ }
+
+ StringBuilder sb = new StringBuilder(18);
+
+ for (int i = 0; i < macAddress.length; i++) {
+ sb.append(UnsignedBytes.toString(macAddress[i], 16).toUpperCase());
+ sb.append(":");
+ }
+
+ sb.setLength(17);
+ return sb.toString();
+ }
+
+ public static byte[] parseIpAddress(String ipAddress) {
+ byte cur;
+
+ String[] addressPart = ipAddress.split(".");
+ int size = addressPart.length;
+
+ byte[] part = new byte[size];
+ for (int i = 0; i < size; i++) {
+ cur = UnsignedBytes.parseUnsignedByte(addressPart[i], 16);
+ part[i] = cur;
+ }
+
+ return part;
+ }
+
+ public static byte[] parseMacAddress(String macAddress) {
+ byte cur;
+
+ String[] addressPart = macAddress.split(":");
+ int size = addressPart.length;
+
+ byte[] part = new byte[size];
+ for (int i = 0; i < size; i++) {
+ cur = UnsignedBytes.parseUnsignedByte(addressPart[i], 16);
+ part[i] = cur;
+ }
+
+ return part;
+ }
+}
--- /dev/null
+/*
+ * Copyright (c) 2015 Ericsson India Global Services Pvt Ltd. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.vpnservice.alivenessmonitor.internal;
+
+import org.opendaylight.controller.liblldp.Packet;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.vpnservice.alivenessmonitor.rev150629.monitor.configs.MonitoringInfo;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.packet.service.rev130709.PacketReceived;
+
+/**
+ * Protocol specific Handler interface defined by the Aliveness monitor service
+ * Handler will be registered with Alivnessmonitor service along with the protocol type
+ * it supports.
+ *
+ */
+public interface AlivenessProtocolHandler {
+
+ Class<?> getPacketClass();
+
+ String handlePacketIn(Packet protocolPacket, PacketReceived packetReceived);
+
+ void sendPacketOut(MonitoringInfo monitorInfo);
+
+ String getUniqueMonitoringKey(MonitoringInfo monitorInfo);
+}
--- /dev/null
+/*
+ * Copyright (c) 2015 Ericsson India Global Services Pvt Ltd. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.vpnservice.alivenessmonitor.internal;
+
+import static org.opendaylight.vpnservice.alivenessmonitor.internal.AlivenessMonitorUtil.toStringIpAddress;
+import static org.opendaylight.vpnservice.alivenessmonitor.internal.AlivenessMonitorConstants.*;
+
+import java.math.BigInteger;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.Future;
+
+import org.opendaylight.controller.liblldp.Packet;
+import org.opendaylight.vpnservice.mdsalutil.MetaDataUtil;
+import org.opendaylight.vpnservice.mdsalutil.packet.ARP;
+import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.inet.types.rev100924.IpAddressBuilder;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeConnectorId;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.packet.service.rev130709.PacketReceived;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.vpnservice.alivenessmonitor.rev150629.EtherTypes;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.vpnservice.alivenessmonitor.rev150629.endpoint.EndpointType;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.vpnservice.alivenessmonitor.rev150629.endpoint.endpoint.type.Interface;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.vpnservice.alivenessmonitor.rev150629.endpoint.endpoint.type.IpAddress;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.vpnservice.alivenessmonitor.rev150629.monitor.configs.MonitoringInfo;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.arputil.rev151126.OdlArputilService;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.arputil.rev151126.SendArpRequestInput;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.arputil.rev151126.SendArpRequestInputBuilder;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.arputil.rev151126.interfaces.InterfaceAddress;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.arputil.rev151126.interfaces.InterfaceAddressBuilder;
+import org.opendaylight.yangtools.yang.common.RpcError;
+import org.opendaylight.yangtools.yang.common.RpcResult;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Preconditions;
+import com.google.common.base.Strings;
+import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.JdkFutureAdapters;
+
+public class AlivenessProtocolHandlerARP extends AbstractAlivenessProtocolHandler {
+ private static final Logger LOG = LoggerFactory.getLogger(AlivenessProtocolHandlerARP.class);
+ private OdlArputilService arpService;
+
+ public AlivenessProtocolHandlerARP(ServiceProvider serviceProvider) {
+ super(serviceProvider);
+ }
+
+ void setArpManagerService(OdlArputilService arpService) {
+ this.arpService = arpService;
+ }
+
+ @Override
+ public Class<?> getPacketClass() {
+ return ARP.class;
+ }
+
+ @Override
+ public String handlePacketIn(Packet protocolPacket, PacketReceived packetReceived) {
+ ARP packet = (ARP) protocolPacket;
+ short tableId = packetReceived.getTableId().getValue();
+ int arpType = packet.getOpCode();
+
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("packet: {}, tableId {}, arpType {}", packetReceived, tableId, arpType);
+ }
+
+ if (tableId == AlivenessMonitorConstants.L3_INTERFACE_TABLE) {
+ if (arpType == ARP.REPLY) {
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("packet: {}, monitorKey {}", packetReceived);
+ }
+
+ BigInteger metadata = packetReceived.getMatch().getMetadata().getMetadata();
+ int portTag = MetaDataUtil.getLportFromMetadata(metadata).intValue();
+ String interfaceName = null;
+ NodeConnectorId connId = packetReceived.getMatch().getInPort();
+// try {
+// interfaceName = serviceProvider.getInterfaceManager().getInterfaceNameForInterfaceTag(portTag);
+// } catch(InterfaceNotFoundException e) {
+// LOG.warn("Error retrieving interface Name for tag {}", portTag, e);
+// }
+ if(!Strings.isNullOrEmpty(interfaceName)) {
+ String sourceIp = toStringIpAddress(packet.getSenderProtocolAddress());
+ String targetIp = toStringIpAddress(packet.getTargetProtocolAddress());
+ return getMonitoringKey(interfaceName, targetIp, sourceIp);
+ } else {
+ LOG.debug("No interface associated with tag {} to interpret the received ARP Reply", portTag);
+ }
+ }
+ }
+ return null;
+ }
+
+ @Override
+ public void sendPacketOut(MonitoringInfo monitorInfo) {
+ if(arpService == null) {
+ LOG.debug("ARP Service not available to send the packet");
+ return;
+ }
+ EndpointType source = monitorInfo.getSource().getEndpointType();
+ final String sourceInterface = Preconditions.checkNotNull(getInterfaceName(source),
+ "Source interface is required to send ARP Packet for monitoring");
+
+ final String srcIp = Preconditions.checkNotNull(getIpAddress(source),
+ "Source Ip address is required to send ARP Packet for monitoring");
+
+ EndpointType target = monitorInfo.getDestination().getEndpointType();
+ final String targetIp = Preconditions.checkNotNull(getIpAddress(target),
+ "Target Ip address is required to send ARP Packet for monitoring");
+
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("sendArpRequest interface {}, senderIPAddress {}, targetAddress {}", sourceInterface, srcIp, targetIp);
+ }
+
+ List<InterfaceAddress> addresses = Collections.singletonList(
+ new InterfaceAddressBuilder().setInterface(sourceInterface)
+ .setIpAddress(IpAddressBuilder.getDefaultInstance(srcIp)).build());
+ SendArpRequestInput input = new SendArpRequestInputBuilder().setInterfaceAddress(addresses)
+ .setIpaddress(IpAddressBuilder.getDefaultInstance(targetIp)).build();
+ Future<RpcResult<Void>> future = arpService.sendArpRequest(input);
+
+ final String msgFormat = String.format("Send ARP Request on interface %s to destination %s", sourceInterface, targetIp);
+ Futures.addCallback(JdkFutureAdapters.listenInPoolThread(future), new FutureCallback<RpcResult<Void>>() {
+ @Override
+ public void onFailure(Throwable error) {
+ LOG.error("Error - {}", msgFormat, error);
+ }
+
+ @Override
+ public void onSuccess(RpcResult<Void> result) {
+ if(!result.isSuccessful()) {
+ LOG.warn("Rpc call to {} failed {}", msgFormat, getErrorText(result.getErrors()));
+ } else {
+ LOG.debug("Successful RPC Result - {}", msgFormat);
+ }
+ }
+ });
+ }
+
+ private String getErrorText(Collection<RpcError> errors) {
+ StringBuilder errorText = new StringBuilder();
+ for(RpcError error : errors) {
+ errorText.append(",").append(error.getErrorType()).append("-")
+ .append(error.getMessage());
+ }
+ return errorText.toString();
+ }
+
+ @Override
+ public String getUniqueMonitoringKey(MonitoringInfo monitorInfo) {
+ String interfaceName = getInterfaceName(monitorInfo.getSource().getEndpointType());
+ String sourceIp = getIpAddress(monitorInfo.getSource().getEndpointType());
+ String targetIp = getIpAddress(monitorInfo.getDestination().getEndpointType());
+ return getMonitoringKey(interfaceName, sourceIp, targetIp);
+ }
+
+ private String getMonitoringKey(String interfaceName, String sourceIp, String targetIp) {
+ return new StringBuilder().append(interfaceName).append(SEPERATOR).append(sourceIp)
+ .append(SEPERATOR).append(targetIp).append(SEPERATOR).append(EtherTypes.Arp).toString();
+ }
+
+ private String getIpAddress(EndpointType source) {
+ String ipAddress = null;
+ if( source instanceof IpAddress) {
+ ipAddress = ((IpAddress) source).getIpAddress().getIpv4Address().getValue();
+ } else if (source instanceof Interface) {
+ ipAddress = ((Interface)source).getInterfaceIp().getIpv4Address().getValue();
+ }
+ return ipAddress;
+ }
+
+ private String getInterfaceName(EndpointType endpoint) {
+ String interfaceName = null;
+ if(endpoint instanceof Interface) {
+ interfaceName = ((Interface)endpoint).getInterfaceName();
+ }
+ return interfaceName;
+ }
+
+}
--- /dev/null
+/*
+ * Copyright (c) 2015 Ericsson India Global Services Pvt Ltd. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.vpnservice.alivenessmonitor.internal;
+
+import java.math.BigInteger;
+import java.nio.charset.Charset;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.Future;
+
+import org.apache.commons.lang3.StringUtils;
+import org.opendaylight.controller.liblldp.EtherTypes;
+import org.opendaylight.controller.liblldp.LLDP;
+import org.opendaylight.controller.liblldp.LLDPTLV;
+import org.opendaylight.controller.liblldp.LLDPTLV.TLVType;
+import org.opendaylight.controller.liblldp.Packet;
+import org.opendaylight.controller.liblldp.PacketException;
+import org.opendaylight.vpnservice.mdsalutil.ActionInfo;
+import org.opendaylight.vpnservice.mdsalutil.ActionType;
+import org.opendaylight.vpnservice.mdsalutil.MDSALUtil;
+import org.opendaylight.vpnservice.mdsalutil.MetaDataUtil;
+import org.opendaylight.vpnservice.mdsalutil.packet.Ethernet;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.action.types.rev131112.action.list.Action;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeConnectorId;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.packet.service.rev130709.PacketReceived;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.packet.service.rev130709.TransmitPacketInput;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.vpnservice.alivenessmonitor.rev150629.endpoint.EndpointType;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.vpnservice.alivenessmonitor.rev150629.endpoint.endpoint.type.Interface;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.vpnservice.alivenessmonitor.rev150629.monitor.configs.MonitoringInfo;
+import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.iana._if.type.rev140508.Tunnel;
+//import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.interfaces.rev140508.interfaces.Interface;
+import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.interfaces.rev140508.InterfaceType;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.vpnservice.interfacemgr.rpcs.rev151003.GetDpidFromInterfaceInputBuilder;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.vpnservice.interfacemgr.rpcs.rev151003.GetDpidFromInterfaceInput;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.vpnservice.interfacemgr.rpcs.rev151003.GetDpidFromInterfaceOutput;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.vpnservice.interfacemgr.rpcs.rev151003.GetPortFromInterfaceInputBuilder;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.vpnservice.interfacemgr.rpcs.rev151003.GetPortFromInterfaceInput;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.vpnservice.interfacemgr.rpcs.rev151003.GetPortFromInterfaceOutput;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.vpnservice.interfacemgr.rpcs.rev151003.OdlInterfaceRpcService;
+import org.opendaylight.yangtools.yang.common.RpcResult;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Strings;
+
+public class AlivenessProtocolHandlerLLDP extends AbstractAlivenessProtocolHandler {
+ private static final Logger LOG = LoggerFactory.getLogger(AlivenessProtocolHandlerLLDP.class);
+ private AtomicInteger packetId = new AtomicInteger(0);
+
+ public AlivenessProtocolHandlerLLDP(ServiceProvider serviceProvider) {
+ super(serviceProvider);
+ }
+
+ @Override
+ public Class<?> getPacketClass() {
+ return LLDP.class;
+ }
+
+ @Override
+ public String handlePacketIn(Packet protocolPacket, PacketReceived packetReceived) {
+ String sSourceDpnId = null;
+ String sPortNumber = null;
+ int nServiceId = -1;
+ int packetId = 0;
+
+ String sTmp = null;
+
+ byte lldpTlvTypeCur;
+
+ LLDP lldpPacket = (LLDP)protocolPacket;
+
+ LLDPTLV lldpTlvCur = lldpPacket.getSystemNameId();
+ if (lldpTlvCur != null) {
+ sSourceDpnId = new String(lldpTlvCur.getValue(), Charset.defaultCharset());
+ }
+
+ lldpTlvCur = lldpPacket.getPortId();
+ if (lldpTlvCur != null) {
+ sPortNumber = new String(lldpTlvCur.getValue(), Charset.defaultCharset());
+ }
+
+ for (LLDPTLV lldpTlv : lldpPacket.getOptionalTLVList()) {
+ lldpTlvTypeCur = lldpTlv.getType();
+
+ if (lldpTlvTypeCur == LLDPTLV.TLVType.SystemName.getValue()) {
+ sSourceDpnId = new String(lldpTlvCur.getValue(), Charset.defaultCharset());
+ }
+ }
+
+ for (LLDPTLV lldpTlv : lldpPacket.getCustomTlvList()) {
+ lldpTlvTypeCur = lldpTlv.getType();
+
+ if (lldpTlvTypeCur == LLDPTLV.TLVType.Custom.getValue()) {
+ sTmp = new String(lldpTlv.getValue());
+ nServiceId = 0;
+ }
+ }
+
+ String interfaceName = null;
+
+ //TODO: Check if the below fields are required
+ if (!Strings.isNullOrEmpty(sTmp) && sTmp.contains("#")) {
+ String[] asTmp = sTmp.split("#");
+ interfaceName = asTmp[0];
+ packetId = Integer.parseInt(asTmp[1]);
+ LOG.debug("Custom LLDP Value on received packet: " + sTmp);
+ }
+
+ BigInteger metadata = packetReceived.getMatch().getMetadata().getMetadata();
+ int portTag = MetaDataUtil.getLportFromMetadata(metadata).intValue();
+ if(portTag == 0) {
+ LOG.trace("Ignoring Packet-in for interface tag 0");
+ return null;
+ }
+
+ String destInterfaceName = null;
+ //TODO: Use latest interface RPC
+// try {
+// destInterfaceName = serviceProvider.getInterfaceManager().getInterfaceNameForInterfaceTag(portTag);
+// } catch(InterfaceNotFoundException e) {
+// LOG.warn("Error retrieving interface Name for tag {}", portTag, e);
+// }
+
+ if(!Strings.isNullOrEmpty(interfaceName)) {
+ String monitorKey = new StringBuilder().append(interfaceName).append(EtherTypes.LLDP).toString();
+ return monitorKey;
+ } else {
+ LOG.debug("No interface associated with tag {} to handle received LLDP Packet", portTag);
+ }
+ return null;
+ }
+
+ @Override
+ public void sendPacketOut(MonitoringInfo monitorInfo) {
+ String sourceInterface;
+
+ EndpointType source = monitorInfo.getSource().getEndpointType();
+ if( source instanceof Interface) {
+ Interface intf = (Interface)source;
+ sourceInterface = intf.getInterfaceName();
+ } else {
+ LOG.warn("Invalid source endpoint. Could not retrieve source interface to send LLDP Packet");
+ return;
+ }
+
+ //Get Mac Address for the source interface
+ byte[] sourceMac = getMacAddress(sourceInterface);
+ if(sourceMac == null) {
+ LOG.error("Could not read mac address for the source interface {} from the Inventory. "
+ + "LLDP packet cannot be send.", sourceInterface);
+ return;
+ }
+
+ OdlInterfaceRpcService interfaceService = serviceProvider.getInterfaceManager();
+
+ long nodeId = -1, portNum = -1;
+ try {
+ GetDpidFromInterfaceInput dpIdInput = new GetDpidFromInterfaceInputBuilder().setIntfName(sourceInterface).build();
+ Future<RpcResult<GetDpidFromInterfaceOutput>> dpIdOutput = interfaceService.getDpidFromInterface(dpIdInput);
+ RpcResult<GetDpidFromInterfaceOutput> dpIdResult = dpIdOutput.get();
+ if(dpIdResult.isSuccessful()) {
+ GetDpidFromInterfaceOutput output = dpIdResult.getResult();
+ nodeId = output.getDpid().longValue();
+ } else {
+ LOG.error("Could not retrieve DPN Id for interface {}", sourceInterface);
+ return;
+ }
+
+
+ GetPortFromInterfaceInput input = new GetPortFromInterfaceInputBuilder().setIntfName(sourceInterface).build();
+ Future<RpcResult<GetPortFromInterfaceOutput>> portOutput = interfaceService.getPortFromInterface(input);
+ RpcResult<GetPortFromInterfaceOutput> result = portOutput.get();
+ if(result.isSuccessful()) {
+ GetPortFromInterfaceOutput output = result.getResult();
+ portNum = output.getPortno();
+ } else {
+ LOG.error("Could not retrieve port number for interface {}", sourceInterface);
+ return;
+ }
+ }catch(InterruptedException | ExecutionException e) {
+ LOG.error("Failed to retrieve RPC Result ", e);
+ return;
+ }
+
+ Ethernet ethenetLLDPPacket = makeLLDPPacket(Long.toString(nodeId), portNum, 0, sourceMac, sourceInterface);
+
+ try {
+ List<ActionInfo> actions = getInterfaceActions(sourceInterface);
+ if(actions.isEmpty()) {
+ LOG.error("No interface actions to send packet out over interface {}", sourceInterface);
+ return;
+ }
+ TransmitPacketInput transmitPacketInput = MDSALUtil.getPacketOut(actions,
+ ethenetLLDPPacket.serialize(), nodeId, MDSALUtil.getNodeConnRef(BigInteger.valueOf(nodeId), "0xfffffffd"));
+ serviceProvider.getPacketProcessingService().transmitPacket(transmitPacketInput);
+ } catch (Exception e) {
+ LOG.error("Error while sending LLDP Packet", e);
+ }
+ }
+
+ private List<ActionInfo> getInterfaceActions(String interfaceName) throws InterruptedException, ExecutionException {
+
+ OdlInterfaceRpcService interfaceService = serviceProvider.getInterfaceManager();
+
+ long portNum = -1;
+ GetPortFromInterfaceInput input = new GetPortFromInterfaceInputBuilder().setIntfName(interfaceName).build();
+ Future<RpcResult<GetPortFromInterfaceOutput>> portOutput = interfaceService.getPortFromInterface(input);
+ RpcResult<GetPortFromInterfaceOutput> result = portOutput.get();
+ if(result.isSuccessful()) {
+ GetPortFromInterfaceOutput output = result.getResult();
+ portNum = output.getPortno();
+ } else {
+ LOG.error("Could not retrieve port number for interface {} to construct actions", interfaceName);
+ return Collections.emptyList();
+ }
+
+ Class<? extends InterfaceType> intfType = null;
+ org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.interfaces.rev140508.interfaces.Interface interfaceInfo =
+ getInterfaceFromConfigDS(interfaceName);
+ if(interfaceInfo != null) {
+ intfType = interfaceInfo.getType();
+ } else {
+ LOG.error("Could not retrieve port type for interface {} to construct actions", interfaceName);
+ return Collections.emptyList();
+ }
+
+ List<ActionInfo> actionInfos = new ArrayList<ActionInfo>();
+
+ if(Tunnel.class.equals(intfType)) {
+ actionInfos.add(new ActionInfo(ActionType.set_field_tunnel_id, new BigInteger[] {
+ MetaDataUtil.getTunnelIdWithValidVniBitAndVniSet(0x08000000),
+ MetaDataUtil.METADA_MASK_VALID_TUNNEL_ID_BIT_AND_TUNNEL_ID}));
+ }
+ actionInfos.add(new ActionInfo(ActionType.output, new String[] { Long.toString(portNum) }));
+ return actionInfos;
+ }
+
+ private static LLDPTLV buildLLDTLV(LLDPTLV.TLVType tlvType,byte[] abyTLV) {
+ return new LLDPTLV().setType(tlvType.getValue()).setLength((short) abyTLV.length).setValue(abyTLV);
+ }
+
+ private int getPacketId() {
+ int id = packetId.incrementAndGet();
+ if(id > 16000) {
+ LOG.debug("Resetting the LLDP Packet Id counter");
+ packetId.set(0);
+ }
+
+ return id;
+ }
+
+ public Ethernet makeLLDPPacket(String nodeId,
+ long portNum, int serviceId, byte[] srcMac, String sourceInterface) {
+
+ // Create LLDP TTL TLV
+ LLDPTLV lldpTlvTTL = buildLLDTLV(LLDPTLV.TLVType.TTL, new byte[] { (byte) 0, (byte) 120 });
+
+ LLDPTLV lldpTlvChassisId = buildLLDTLV(LLDPTLV.TLVType.ChassisID, LLDPTLV.createChassisIDTLVValue(colonize(StringUtils
+ .leftPad(Long.toHexString(MDSALUtil.getDpnIdFromNodeName(nodeId).longValue()), 16,
+ "0"))));
+ LLDPTLV lldpTlvSystemName = buildLLDTLV(TLVType.SystemName, LLDPTLV.createSystemNameTLVValue(nodeId));
+
+ LLDPTLV lldpTlvPortId = buildLLDTLV(TLVType.PortID, LLDPTLV.createPortIDTLVValue(
+ Long.toHexString(portNum)));
+
+ String customValue = sourceInterface + "#" + getPacketId();
+
+ LOG.debug("Sending LLDP packet, custome value " + customValue);
+
+ LLDPTLV lldpTlvCustom = buildLLDTLV(TLVType.Custom, customValue.getBytes());
+
+ List<LLDPTLV> lstLLDPTLVCustom = new ArrayList<>();
+ lstLLDPTLVCustom.add(lldpTlvCustom);
+
+ LLDP lldpDiscoveryPacket = new LLDP();
+ lldpDiscoveryPacket.setChassisId(lldpTlvChassisId)
+ .setPortId(lldpTlvPortId)
+ .setTtl(lldpTlvTTL)
+ .setSystemNameId(lldpTlvSystemName)
+ .setOptionalTLVList(lstLLDPTLVCustom);
+
+ byte[] destMac = LLDP.LLDPMulticastMac;
+
+ Ethernet ethernetPacket = new Ethernet();
+ ethernetPacket.setSourceMACAddress(srcMac)
+ .setDestinationMACAddress(destMac)
+ .setEtherType(EtherTypes.LLDP.shortValue())
+ .setPayload(lldpDiscoveryPacket);
+
+ return ethernetPacket;
+ }
+
+ private String colonize(String orig) {
+ return orig.replaceAll("(?<=..)(..)", ":$1");
+ }
+
+ @Override
+ public String getUniqueMonitoringKey(MonitoringInfo monitorInfo) {
+ String interfaceName = getInterfaceName(monitorInfo.getSource().getEndpointType());
+ return new StringBuilder().append(interfaceName).append(EtherTypes.LLDP).toString();
+ }
+
+ private String getInterfaceName(EndpointType endpoint) {
+ String interfaceName = null;
+ if(endpoint instanceof Interface) {
+ interfaceName = ((Interface)endpoint).getInterfaceName();
+ }
+ return interfaceName;
+ }
+
+}
--- /dev/null
+/*
+ * Copyright (c) 2015 Ericsson India Global Services Pvt Ltd. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.vpnservice.alivenessmonitor.internal;
+
+interface InterfaceStateListener {
+
+ void onInterfaceStateUp(String interfaceName);
+
+ void onInterfaceStateDown(String interfaceName);
+
+}
--- /dev/null
+/*
+ * Copyright (c) 2015 Ericsson India Global Services Pvt Ltd. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.vpnservice.alivenessmonitor.internal;
+
+import org.opendaylight.controller.md.sal.binding.api.DataBroker;
+import org.opendaylight.controller.md.sal.binding.api.ReadOnlyTransaction;
+import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
+import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.yang.types.rev100924.MacAddress;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableNodeConnector;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.node.NodeConnector;
+import org.opendaylight.yangtools.yang.binding.DataObject;
+import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Optional;
+import com.google.common.primitives.UnsignedBytes;
+
+class InventoryReader {
+ private Logger LOG = LoggerFactory.getLogger(InventoryReader.class);
+ private DataBroker dataService;
+
+ public InventoryReader(DataBroker dataService) {
+ this.dataService = dataService;
+ }
+
+ public String getMacAddress(InstanceIdentifier<NodeConnector> nodeConnectorId) {
+ //TODO: Use mdsal apis to read
+ Optional<NodeConnector> optNc = read(LogicalDatastoreType.OPERATIONAL, nodeConnectorId);
+ if(optNc.isPresent()) {
+ NodeConnector nc = optNc.get();
+ FlowCapableNodeConnector fcnc = nc.getAugmentation(FlowCapableNodeConnector.class);
+ MacAddress macAddress = fcnc.getHardwareAddress();
+ return macAddress.getValue();
+ }
+ return null;
+ }
+
+ private <T extends DataObject> Optional<T> read(LogicalDatastoreType datastoreType,
+ InstanceIdentifier<T> path) {
+
+ ReadOnlyTransaction tx = dataService.newReadOnlyTransaction();
+
+ Optional<T> result = Optional.absent();
+ try {
+ result = tx.read(datastoreType, path).get();
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ tx.close();
+
+ return result;
+ }
+}
--- /dev/null
+/*
+ * Copyright (c) 2015 Ericsson India Global Services Pvt Ltd. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.vpnservice.alivenessmonitor.internal;
+
+import org.opendaylight.controller.md.sal.binding.api.DataBroker;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.packet.service.rev130709.PacketProcessingService;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.vpnservice.interfacemgr.rpcs.rev151003.OdlInterfaceRpcService;
+
+/**
+ * Provides access methods to retrieve the reference to the dependent service
+ *
+ */
+interface ServiceProvider {
+
+ DataBroker getDataBroker();
+
+ PacketProcessingService getPacketProcessingService();
+
+ OdlInterfaceRpcService getInterfaceManager();
+
+}
--- /dev/null
+/*
+ * Copyright (c) 2015 Ericsson India Global Services Pvt Ltd. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.vpnservice.alivenessmonitor.internal;
+
+/**
+ * Exception indicating the config provided is not supported currently
+ *
+ *
+ */
+public class UnsupportedConfigException extends Exception {
+ private static final long serialVersionUID = 1L;
+
+ public UnsupportedConfigException(String message){
+ super(message);
+ }
+}
--- /dev/null
+/*
+ * Copyright (c) 2015 Ericsson India Global Services Pvt Ltd. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.alivenessmonitor.impl.rev150706;
+
+import org.opendaylight.vpnservice.alivenessmonitor.internal.AlivenessMonitorProvider;
+
+public class AlivenessMonitorModule extends org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.alivenessmonitor.impl.rev150706.AbstractAlivenessMonitorModule {
+ public AlivenessMonitorModule(org.opendaylight.controller.config.api.ModuleIdentifier identifier, org.opendaylight.controller.config.api.DependencyResolver dependencyResolver) {
+ super(identifier, dependencyResolver);
+ }
+
+ public AlivenessMonitorModule(org.opendaylight.controller.config.api.ModuleIdentifier identifier, org.opendaylight.controller.config.api.DependencyResolver dependencyResolver, org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.alivenessmonitor.impl.rev150706.AlivenessMonitorModule oldModule, java.lang.AutoCloseable oldInstance) {
+ super(identifier, dependencyResolver, oldModule, oldInstance);
+ }
+
+ @Override
+ public void customValidation() {
+ // add custom validation form module attributes here.
+ }
+
+ @Override
+ public java.lang.AutoCloseable createInstance() {
+ AlivenessMonitorProvider provider = new AlivenessMonitorProvider(getRpcRegistryDependency());
+ provider.setNotificationPublishService(getNotificationPublishServiceDependency());
+ provider.setNotificationService(getNotificationServiceDependency());
+ getBrokerDependency().registerProvider(provider);
+ return provider;
+ }
+}
--- /dev/null
+/*
+ * Copyright (c) 2015 Ericsson India Global Services Pvt Ltd. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+/*
+* Generated file
+*
+* Generated from: yang module name: alivenessmonitor-impl yang module local name: alivenessmonitor-impl
+* Generated by: org.opendaylight.controller.config.yangjmxgenerator.plugin.JMXGenerator
+* Generated at: Mon Jul 06 11:45:33 IST 2015
+*
+* Do not modify this file unless it is present under src/main directory
+*/
+package org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.alivenessmonitor.impl.rev150706;
+public class AlivenessMonitorModuleFactory extends org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.alivenessmonitor.impl.rev150706.AbstractAlivenessMonitorModuleFactory {
+
+}
--- /dev/null
+module alivenessmonitor-impl {
+ yang-version 1;
+ namespace "urn:opendaylight:params:xml:ns:yang:alivenessmonitor:impl";
+ prefix "alivenessmonitor-impl";
+
+ import config { prefix config; revision-date 2013-04-05; }
+ import opendaylight-md-sal-binding { prefix md-sal-binding; revision-date 2013-10-28;}
+ import opendaylight-sal-binding-broker-impl { prefix md-sal-binding-impl; revision-date 2013-10-28;}
+ import odl-interface {prefix odlif; revision-date 2015-03-31;}
+ //import aliveness-monitor { prefix aliveness-mon; revision-date 2015-06-29; }
+
+ description
+ "Service definition for aliveness monitor module";
+
+ revision "2015-07-06" {
+ description
+ "Initial revision";
+ }
+
+ identity alivenessmonitor-impl {
+ base config:module-type;
+ config:java-name-prefix AlivenessMonitor;
+ }
+
+ augment "/config:modules/config:module/config:configuration" {
+ case alivenessmonitor-impl {
+ when "/config:modules/config:module/config:type = 'alivenessmonitor-impl'";
+ container broker {
+ uses config:service-ref {
+ refine type {
+ mandatory true;
+ config:required-identity md-sal-binding:binding-broker-osgi-registry;
+ }
+ }
+ }
+ container rpc-registry {
+ uses config:service-ref {
+ refine type {
+ mandatory true;
+ config:required-identity md-sal-binding:binding-rpc-registry;
+ }
+ }
+ }
+ container notification-publish-service {
+ uses config:service-ref {
+ refine type {
+ mandatory true;
+ config:required-identity md-sal-binding-impl:binding-new-notification-publish-service;
+ }
+ }
+ }
+ container notification-service {
+ uses config:service-ref {
+ refine type {
+ mandatory true;
+ config:required-identity md-sal-binding-impl:binding-new-notification-service;
+ }
+ }
+ }
+ }
+ }
+}
--- /dev/null
+/*
+ * Copyright (c) 2015 Ericsson India Global Services Pvt Ltd. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.alivenessmonitor.test;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.assertNotNull;
+import static org.mockito.Mockito.any;
+import static org.mockito.Mockito.eq;
+import static org.mockito.Mockito.when;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.doNothing;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.times;
+import static org.mockito.Matchers.argThat;
+
+import java.util.Arrays;
+
+import org.hamcrest.CoreMatchers;
+import org.hamcrest.Description;
+import org.hamcrest.Matcher;
+import org.hamcrest.TypeSafeMatcher;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.ArgumentCaptor;
+import org.mockito.Captor;
+import org.mockito.Matchers;
+import org.mockito.Mock;
+import org.mockito.MockitoAnnotations;
+import org.opendaylight.controller.md.sal.binding.api.DataBroker;
+import org.opendaylight.controller.md.sal.binding.api.NotificationPublishService;
+import org.opendaylight.controller.md.sal.binding.api.ReadOnlyTransaction;
+import org.opendaylight.controller.md.sal.binding.api.ReadWriteTransaction;
+import org.opendaylight.controller.md.sal.binding.api.WriteTransaction;
+import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
+import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
+import org.opendaylight.vpnservice.alivenessmonitor.internal.AlivenessMonitor;
+import org.opendaylight.vpnservice.alivenessmonitor.internal.AlivenessProtocolHandler;
+import org.opendaylight.vpnservice.alivenessmonitor.internal.AlivenessProtocolHandlerARP;
+import org.opendaylight.vpnservice.alivenessmonitor.internal.AlivenessProtocolHandlerLLDP;
+import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.inet.types.rev100924.IpAddressBuilder;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.vpnservice.alivenessmonitor.rev150629.EtherTypes;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.vpnservice.alivenessmonitor.rev150629.MonitorPauseInput;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.vpnservice.alivenessmonitor.rev150629.MonitorPauseInputBuilder;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.vpnservice.alivenessmonitor.rev150629.MonitorProfileCreateInput;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.vpnservice.alivenessmonitor.rev150629.MonitorProfileCreateInputBuilder;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.vpnservice.alivenessmonitor.rev150629.MonitorProfileCreateOutput;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.vpnservice.alivenessmonitor.rev150629.MonitorProfileDeleteInput;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.vpnservice.alivenessmonitor.rev150629.MonitorProfileDeleteInputBuilder;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.vpnservice.alivenessmonitor.rev150629.MonitorStartInput;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.vpnservice.alivenessmonitor.rev150629.MonitorStartInputBuilder;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.vpnservice.alivenessmonitor.rev150629.MonitorStartOutput;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.vpnservice.alivenessmonitor.rev150629.MonitorStatus;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.vpnservice.alivenessmonitor.rev150629.MonitorStopInput;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.vpnservice.alivenessmonitor.rev150629.MonitorStopInputBuilder;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.vpnservice.alivenessmonitor.rev150629.MonitorUnpauseInput;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.vpnservice.alivenessmonitor.rev150629.MonitorUnpauseInputBuilder;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.vpnservice.alivenessmonitor.rev150629.MonitoringMode;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.vpnservice.alivenessmonitor.rev150629._interface.monitor.map.InterfaceMonitorEntry;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.vpnservice.alivenessmonitor.rev150629._interface.monitor.map.InterfaceMonitorEntryBuilder;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.vpnservice.alivenessmonitor.rev150629.endpoint.endpoint.type.Interface;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.vpnservice.alivenessmonitor.rev150629.endpoint.endpoint.type.InterfaceBuilder;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.vpnservice.alivenessmonitor.rev150629.monitor.configs.MonitoringInfo;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.vpnservice.alivenessmonitor.rev150629.monitor.configs.MonitoringInfoBuilder;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.vpnservice.alivenessmonitor.rev150629.monitor.params.DestinationBuilder;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.vpnservice.alivenessmonitor.rev150629.monitor.params.SourceBuilder;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.vpnservice.alivenessmonitor.rev150629.monitor.profile.create.input.ProfileBuilder;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.vpnservice.alivenessmonitor.rev150629.monitor.profiles.MonitorProfile;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.vpnservice.alivenessmonitor.rev150629.monitor.profiles.MonitorProfileBuilder;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.vpnservice.alivenessmonitor.rev150629.monitor.start.input.ConfigBuilder;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.vpnservice.alivenessmonitor.rev150629.monitorid.key.map.MonitoridKeyEntry;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.vpnservice.alivenessmonitor.rev150629.monitorid.key.map.MonitoridKeyEntryBuilder;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.vpnservice.alivenessmonitor.rev150629.monitoring.states.MonitoringState;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.vpnservice.alivenessmonitor.rev150629.monitoring.states.MonitoringStateBuilder;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.vpnservice.idmanager.rev150403.AllocateIdInput;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.vpnservice.idmanager.rev150403.AllocateIdOutputBuilder;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.vpnservice.idmanager.rev150403.CreateIdPoolInput;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.vpnservice.idmanager.rev150403.IdManagerService;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.vpnservice.idmanager.rev150403.ReleaseIdInput;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.packet.service.rev130709.PacketProcessingService;
+import org.opendaylight.yangtools.yang.binding.DataObject;
+import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
+import org.opendaylight.yangtools.yang.common.RpcError;
+import org.opendaylight.yangtools.yang.common.RpcError.ErrorType;
+import org.opendaylight.yangtools.yang.common.RpcResult;
+import org.opendaylight.yangtools.yang.common.RpcResultBuilder;
+
+import com.google.common.base.Optional;
+import com.google.common.util.concurrent.CheckedFuture;
+import com.google.common.util.concurrent.Futures;
+
+public class AlivenessMonitorTest {
+
+ @Mock private DataBroker dataBroker;
+ @Mock private IdManagerService idManager;
+ @Mock private PacketProcessingService packetProcessingService;
+ @Mock private NotificationPublishService notificationPublishService;
+ private AlivenessMonitor alivenessMonitor;
+ private AlivenessProtocolHandler arpHandler;
+ private AlivenessProtocolHandler lldpHandler;
+ private long mockId;
+ @Mock private ReadOnlyTransaction readTx;
+ @Mock private WriteTransaction writeTx;
+ @Mock private ReadWriteTransaction readWriteTx;
+ @Captor ArgumentCaptor<MonitoringState> stateCaptor;
+
+ private <T extends DataObject> Matcher<InstanceIdentifier<T>> isType(final Class<T> klass) {
+ return new TypeSafeMatcher<InstanceIdentifier<T>>() {
+ @Override
+ public void describeTo(Description desc) {
+ desc.appendText("Instance Identifier should have Target Type " + klass);
+ }
+
+ @Override
+ protected boolean matchesSafely(InstanceIdentifier<T> id) {
+ return id.getTargetType().equals(klass);
+ }
+ };
+ }
+
+ private Matcher<RpcError> hasErrorType(final ErrorType errorType) {
+ return new TypeSafeMatcher<RpcError>() {
+ @Override
+ public void describeTo(Description desc) {
+ desc.appendText("Error type do not match " + errorType);
+ }
+
+ @Override
+ protected boolean matchesSafely(RpcError error) {
+ return error.getErrorType().equals(errorType);
+ }
+ };
+ }
+
+ @SuppressWarnings("unchecked")
+ @Before
+ public void setUp() {
+ MockitoAnnotations.initMocks(this);
+ alivenessMonitor = new AlivenessMonitor(dataBroker);
+ when(idManager.createIdPool(any(CreateIdPoolInput.class)))
+ .thenReturn(Futures.immediateFuture(RpcResultBuilder.<Void>success().build()));
+ alivenessMonitor.setIdManager(idManager);
+ alivenessMonitor.setNotificationPublishService(notificationPublishService);
+ alivenessMonitor.setPacketProcessingService(packetProcessingService);
+
+ arpHandler = new AlivenessProtocolHandlerARP(alivenessMonitor);
+ alivenessMonitor.registerHandler(EtherTypes.Arp, arpHandler);
+
+ lldpHandler = new AlivenessProtocolHandlerLLDP(alivenessMonitor);
+ alivenessMonitor.registerHandler(EtherTypes.Lldp, lldpHandler);
+ mockId = 1L;
+ when(idManager.allocateId(any(AllocateIdInput.class)))
+ .thenReturn(Futures.immediateFuture(RpcResultBuilder.success(new AllocateIdOutputBuilder().setIdValue(mockId++).build()).build()));
+ when(idManager.releaseId(any(ReleaseIdInput.class))).thenReturn(Futures.immediateFuture(RpcResultBuilder.<Void>success().build()));
+ doReturn(readTx).when(dataBroker).newReadOnlyTransaction();
+ doReturn(writeTx).when(dataBroker).newWriteOnlyTransaction();
+ doReturn(readWriteTx).when(dataBroker).newReadWriteTransaction();
+ doNothing().when(writeTx).put(eq(LogicalDatastoreType.OPERATIONAL), any(InstanceIdentifier.class), any(DataObject.class));
+ doReturn(Futures.immediateCheckedFuture(null)).when(writeTx).submit();
+ doReturn(Futures.immediateCheckedFuture(null)).when(readWriteTx).submit();
+ }
+
+ @After
+ public void tearDown() throws Exception {
+ alivenessMonitor.close();
+ }
+
+ @Test
+ public void testMonitorProfileCreate() throws Throwable {
+ MonitorProfileCreateInput input = new MonitorProfileCreateInputBuilder().setProfile(new ProfileBuilder().setFailureThreshold(10L)
+ .setMonitorInterval(10000L).setMonitorWindow(10L).setProtocolType(EtherTypes.Arp).build()).build();
+ doReturn(Futures.immediateCheckedFuture(Optional.absent())).when(readWriteTx).read(eq(LogicalDatastoreType.OPERATIONAL), argThat(isType(MonitorProfile.class)));
+ doReturn(Futures.immediateCheckedFuture(null)).when(readWriteTx).submit();
+ RpcResult<MonitorProfileCreateOutput> output = alivenessMonitor.monitorProfileCreate(input).get();
+ assertTrue("Monitor Profile Create result", output.isSuccessful());
+ assertNotNull("Monitor Profile Output", output.getResult().getProfileId());
+ }
+
+ @Test
+ public void testMonitorProfileCreateAlreadyExist() throws Throwable {
+ MonitorProfileCreateInput input = new MonitorProfileCreateInputBuilder().setProfile(new ProfileBuilder().setFailureThreshold(10L)
+ .setMonitorInterval(10000L).setMonitorWindow(10L).setProtocolType(EtherTypes.Arp).build()).build();
+ @SuppressWarnings("unchecked")
+ Optional<MonitorProfile> optionalProfile = (Optional<MonitorProfile>)mock(Optional.class);
+ CheckedFuture<Optional<MonitorProfile>, ReadFailedException> proFuture = Futures.immediateCheckedFuture(optionalProfile);
+ doReturn(true).when(optionalProfile).isPresent();
+ doReturn(proFuture).when(readWriteTx).read(eq(LogicalDatastoreType.OPERATIONAL), argThat(isType(MonitorProfile.class)));
+ RpcResult<MonitorProfileCreateOutput> output = alivenessMonitor.monitorProfileCreate(input).get();
+ assertTrue("Monitor Profile Create result", output.isSuccessful());
+ assertThat(output.getErrors(), CoreMatchers.hasItem(hasErrorType(ErrorType.PROTOCOL)));
+ }
+
+ @Test
+ public void testMonitorStart() throws Throwable {
+ Long profileId = createProfile();
+ MonitorStartInput input = new MonitorStartInputBuilder().setConfig(new ConfigBuilder()
+ .setDestination(new DestinationBuilder().setEndpointType(getInterface("10.0.0.1")).build())
+ .setSource(new SourceBuilder().setEndpointType(getInterface("testInterface", "10.1.1.1")).build())
+ .setMode(MonitoringMode.OneOne)
+ .setProfileId(profileId).build()).build();
+ @SuppressWarnings("unchecked")
+ Optional<MonitorProfile> optionalProfile = (Optional<MonitorProfile>)mock(Optional.class);
+ CheckedFuture<Optional<MonitorProfile>, ReadFailedException> proFuture = Futures.immediateCheckedFuture(optionalProfile);
+ when(readTx.read(eq(LogicalDatastoreType.OPERATIONAL), argThat(isType(MonitorProfile.class)))).thenReturn(proFuture);
+ doReturn(true).when(optionalProfile).isPresent();
+ doReturn(getTestMonitorProfile()).when(optionalProfile).get();
+ CheckedFuture<Optional<MonitoringInfo>, ReadFailedException> outFuture = Futures.immediateCheckedFuture(Optional.<MonitoringInfo>absent());
+ when(readTx.read(eq(LogicalDatastoreType.OPERATIONAL), argThat(isType(MonitoringInfo.class)))).thenReturn(outFuture);
+ RpcResult<MonitorStartOutput> output = alivenessMonitor.monitorStart(input).get();
+ verify(idManager, times(2)).allocateId(any(AllocateIdInput.class));
+ assertTrue("Monitor start output result", output.isSuccessful());
+ assertNotNull("Monitor start output", output.getResult().getMonitorId());
+ }
+
+ @Test
+ public void testMonitorPause() throws Throwable {
+ MonitorPauseInput input = new MonitorPauseInputBuilder().setMonitorId(2L).build();
+ Optional<MonitoringState> optState = Optional.of(new MonitoringStateBuilder().setStatus(MonitorStatus.Started).build());
+ when(readWriteTx.read(eq(LogicalDatastoreType.OPERATIONAL), argThat(isType(MonitoringState.class)))).
+ thenReturn(Futures.<Optional<MonitoringState>, ReadFailedException>immediateCheckedFuture(optState));
+ Optional<MonitoridKeyEntry> optMap = Optional.of(new MonitoridKeyEntryBuilder().setMonitorId(2L).setMonitorKey("Test monitor Key").build());
+ when(readTx.read(eq(LogicalDatastoreType.OPERATIONAL), argThat(isType(MonitoridKeyEntry.class)))).
+ thenReturn(Futures.<Optional<MonitoridKeyEntry>, ReadFailedException>immediateCheckedFuture(optMap));
+ alivenessMonitor.monitorPause(input).get();
+ verify(readWriteTx).merge(eq(LogicalDatastoreType.OPERATIONAL), argThat(isType(MonitoringState.class)), stateCaptor.capture());
+ assertEquals(MonitorStatus.Paused, stateCaptor.getValue().getStatus());
+ }
+
+ @Test
+ public void testMonitorUnpause() throws Throwable {
+ MonitorUnpauseInput input = new MonitorUnpauseInputBuilder().setMonitorId(2L).build();
+ Optional<MonitoringState> optState = Optional.of(new MonitoringStateBuilder().setStatus(MonitorStatus.Paused).build());
+ when(readWriteTx.read(eq(LogicalDatastoreType.OPERATIONAL), argThat(isType(MonitoringState.class)))).
+ thenReturn(Futures.<Optional<MonitoringState>, ReadFailedException>immediateCheckedFuture(optState));
+ Optional<MonitoringInfo> optInfo = Optional.of(new MonitoringInfoBuilder().setId(2L).setProfileId(1L).build());
+ when(readTx.read(eq(LogicalDatastoreType.OPERATIONAL), argThat(isType(MonitoringInfo.class)))).
+ thenReturn(Futures.<Optional<MonitoringInfo>, ReadFailedException>immediateCheckedFuture(optInfo));
+ Optional<MonitorProfile> optProfile = Optional.of(getTestMonitorProfile());
+ when(readTx.read(eq(LogicalDatastoreType.OPERATIONAL), argThat(isType(MonitorProfile.class)))).
+ thenReturn(Futures.<Optional<MonitorProfile>, ReadFailedException>immediateCheckedFuture(optProfile));
+ Optional<MonitoridKeyEntry> optMap = Optional.of(new MonitoridKeyEntryBuilder().setMonitorId(2L).setMonitorKey("Test monitor Key").build());
+ when(readTx.read(eq(LogicalDatastoreType.OPERATIONAL), argThat(isType(MonitoridKeyEntry.class)))).
+ thenReturn(Futures.<Optional<MonitoridKeyEntry>, ReadFailedException>immediateCheckedFuture(optMap));
+ RpcResult<Void> result = alivenessMonitor.monitorUnpause(input).get();
+ verify(readWriteTx).merge(eq(LogicalDatastoreType.OPERATIONAL), argThat(isType(MonitoringState.class)), stateCaptor.capture());
+ assertEquals(MonitorStatus.Started, stateCaptor.getValue().getStatus());
+ assertTrue("Monitor unpause rpc result", result.isSuccessful());
+ }
+
+ @Test
+ public void testMonitorStop() throws Throwable {
+ MonitorStopInput input = new MonitorStopInputBuilder().setMonitorId(2L).build();
+ Optional<MonitoringInfo> optInfo = Optional.of(
+ new MonitoringInfoBuilder().setSource(new SourceBuilder().setEndpointType(getInterface("testInterface", "10.1.1.1")).build()).build());
+ CheckedFuture<Optional<MonitoringInfo>, ReadFailedException> outFuture = Futures.immediateCheckedFuture(optInfo);
+ when(readTx.read(eq(LogicalDatastoreType.OPERATIONAL), argThat(isType(MonitoringInfo.class)))).thenReturn(outFuture);
+ Optional<MonitoridKeyEntry> optMap = Optional.of(new MonitoridKeyEntryBuilder().setMonitorId(2L).setMonitorKey("Test monitor Key").build());
+ when(readTx.read(eq(LogicalDatastoreType.OPERATIONAL), argThat(isType(MonitoridKeyEntry.class)))).
+ thenReturn(Futures.<Optional<MonitoridKeyEntry>, ReadFailedException>immediateCheckedFuture(optMap));
+ Optional<MonitorProfile> optProfile = Optional.of(getTestMonitorProfile());
+ when(readTx.read(eq(LogicalDatastoreType.OPERATIONAL), argThat(isType(MonitorProfile.class)))).
+ thenReturn(Futures.<Optional<MonitorProfile>, ReadFailedException>immediateCheckedFuture(optProfile));
+ Optional<InterfaceMonitorEntry> optEntry = Optional.of(getInterfaceMonitorEntry());
+ when(readWriteTx.read(eq(LogicalDatastoreType.OPERATIONAL), argThat(isType(InterfaceMonitorEntry.class)))).
+ thenReturn(Futures.<Optional<InterfaceMonitorEntry>, ReadFailedException>immediateCheckedFuture(optEntry));
+ RpcResult<Void> result = alivenessMonitor.monitorStop(input).get();
+ verify(idManager).releaseId(any(ReleaseIdInput.class));
+ verify(writeTx, times(2)).delete(eq(LogicalDatastoreType.OPERATIONAL), any(InstanceIdentifier.class));
+ assertTrue("Monitor stop rpc result", result.isSuccessful());
+ }
+
+ @Test
+ public void testMonitorProfileDelete() throws Throwable {
+ MonitorProfileDeleteInput input = new MonitorProfileDeleteInputBuilder().setProfileId(1L).build();
+ Optional<MonitorProfile> optProfile = Optional.of(getTestMonitorProfile());
+ when(readWriteTx.read(eq(LogicalDatastoreType.OPERATIONAL), argThat(isType(MonitorProfile.class)))).
+ thenReturn(Futures.<Optional<MonitorProfile>, ReadFailedException>immediateCheckedFuture(optProfile));
+ RpcResult<Void> result = alivenessMonitor.monitorProfileDelete(input).get();
+ verify(idManager).releaseId(any(ReleaseIdInput.class));
+ verify(readWriteTx).delete(eq(LogicalDatastoreType.OPERATIONAL), Matchers.<InstanceIdentifier<MonitorProfile>>any());
+ assertTrue("Monitor profile delete result", result.isSuccessful());
+ }
+
+ @SuppressWarnings("unchecked")
+ private long createProfile() throws Throwable{
+ MonitorProfileCreateInput input = new MonitorProfileCreateInputBuilder().setProfile(new ProfileBuilder().setFailureThreshold(10L)
+ .setMonitorInterval(10000L).setMonitorWindow(10L).setProtocolType(EtherTypes.Arp).build()).build();
+ doReturn(Futures.immediateCheckedFuture(Optional.absent())).when(readWriteTx).read(eq(LogicalDatastoreType.OPERATIONAL), any(InstanceIdentifier.class));
+ doReturn(Futures.immediateCheckedFuture(null)).when(readWriteTx).submit();
+ RpcResult<MonitorProfileCreateOutput> output = alivenessMonitor.monitorProfileCreate(input).get();
+ return output.getResult().getProfileId();
+ }
+
+ private MonitorProfile getTestMonitorProfile() {
+ return new MonitorProfileBuilder().setFailureThreshold(10L).setMonitorInterval(10000L)
+ .setMonitorWindow(10L).setProtocolType(EtherTypes.Arp).build();
+ }
+
+ private InterfaceMonitorEntry getInterfaceMonitorEntry() {
+ return new InterfaceMonitorEntryBuilder().setInterfaceName("test-interface").setMonitorIds(Arrays.asList(1L, 2L)).build();
+ }
+
+ private Interface getInterface(String ipAddress) {
+ return new InterfaceBuilder().setInterfaceIp(IpAddressBuilder.getDefaultInstance(ipAddress)).build();
+ }
+
+ private Interface getInterface(String interfaceName, String ipAddress) {
+ return new InterfaceBuilder().setInterfaceIp(IpAddressBuilder.getDefaultInstance(ipAddress)).setInterfaceName(interfaceName).build();
+ }
+}
--- /dev/null
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Copyright (c) 2015 Ericsson India Global Services Pvt Ltd. and others. All rights reserved.
+This program and the accompanying materials are made available under the
+terms of the Eclipse Public License v1.0 which accompanies this distribution,
+and is available at http://www.eclipse.org/legal/epl-v10.html INTERNAL
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+
+ <parent>
+ <groupId>org.opendaylight.odlparent</groupId>
+ <artifactId>odlparent</artifactId>
+ <version>1.6.0-SNAPSHOT</version>
+ <relativePath/>
+ </parent>
+
+ <groupId>org.opendaylight.controller</groupId>
+ <artifactId>alivenessmonitor-aggregator</artifactId>
+ <version>0.2.0-SNAPSHOT</version>
+ <name>alivenessmonitor</name>
+ <packaging>pom</packaging>
+ <modelVersion>4.0.0</modelVersion>
+ <prerequisites>
+ <maven>3.1.1</maven>
+ </prerequisites>
+ <modules>
+ <module>alivenessmonitor-api</module>
+ <module>alivenessmonitor-impl</module>
+ </modules>
+ <!-- DO NOT install or deploy the repo root pom as it's only needed to initiate a build -->
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-deploy-plugin</artifactId>
+ <configuration>
+ <skip>true</skip>
+ </configuration>
+ </plugin>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-install-plugin</artifactId>
+ <configuration>
+ <skip>true</skip>
+ </configuration>
+ </plugin>
+ </plugins>
+ </build>
+</project>
<mdsal.model.version>0.8.0-SNAPSHOT</mdsal.model.version>
<vpnservices.version>0.2.0-SNAPSHOT</vpnservices.version>
<ovsdb.version>1.2.1-SNAPSHOT</ovsdb.version>
+ <liblldp.version>0.10.0-SNAPSHOT</liblldp.version>
<arputil.version>${vpnservices.version}</arputil.version>
<mdsalutil.version>${vpnservices.version}</mdsalutil.version>
<vpnmanager.version>${vpnservices.version}</vpnmanager.version>
<artifactId>interfacemgr-api</artifactId>
<version>${interfacemgr.version}</version>
</dependency>
+ <dependency>
+ <groupId>org.opendaylight.controller</groupId>
+ <artifactId>liblldp</artifactId>
+ <version>${liblldp.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>${project.groupId}</groupId>
+ <artifactId>alivenessmonitor-impl</artifactId>
+ <version>${vpnservices.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>${project.groupId}</groupId>
+ <artifactId>alivenessmonitor-impl</artifactId>
+ <version>${vpnservices.version}</version>
+ <classifier>config</classifier>
+ <type>xml</type>
+ </dependency>
+ <dependency>
+ <groupId>${project.groupId}</groupId>
+ <artifactId>alivenessmonitor-api</artifactId>
+ <version>${vpnservices.version}</version>
+ </dependency>
<dependency>
<groupId>${project.groupId}</groupId>
<artifactId>vpnmanager-api</artifactId>
<feature version='${mdsal.version}'>odl-mdsal-broker</feature>
<feature version='${mdsal.model.version}'>odl-mdsal-models</feature>
<feature version='${openflowplugin.version}'>odl-openflowplugin-nsf-model</feature>
+ <bundle>mvn:org.opendaylight.controller/liblldp/${liblldp.version}</bundle>
<bundle>mvn:org.opendaylight.vpnservice/model-bgp/{{VERSION}}</bundle>
<bundle>mvn:org.opendaylight.vpnservice/lockmanager-api/${lockmanager.version}</bundle>
<bundle>mvn:org.opendaylight.vpnservice/idmanager-api/${idmanager.version}</bundle>
<bundle>mvn:org.opendaylight.vpnservice/arputil-api/${arputil.version}</bundle>
+ <bundle>mvn:org.opendaylight.vpnservice/alivenessmonitor-api/${vpnservices.version}</bundle>
<bundle>mvn:org.opendaylight.vpnservice/vpnmanager-api/${vpnmanager.version}</bundle>
<bundle>mvn:org.opendaylight.vpnservice/nexthopmgr-api/${nexthopmgr.version}</bundle>
<bundle>mvn:org.opendaylight.vpnservice/fibmanager-api/${fibmanager.version}</bundle>
<bundle>mvn:org.opendaylight.vpnservice/bgpmanager-impl/${vpnservices.version}</bundle>
<bundle>mvn:org.opendaylight.vpnservice/mdsalutil-api/${interfacemgr.version}</bundle>
<bundle>mvn:org.opendaylight.vpnservice/arputil-impl/${arputil.version}</bundle>
+ <bundle>mvn:org.opendaylight.vpnservice/alivenessmonitor-impl/${vpnservices.version}</bundle>
<bundle>mvn:org.opendaylight.vpnservice/mdsalutil-impl/${interfacemgr.version}</bundle>
<bundle>mvn:org.opendaylight.vpnservice/interfacemgr-api/${interfacemgr.version}</bundle>
<bundle>mvn:org.opendaylight.vpnservice/interfacemgr-impl/${interfacemgr.version}</bundle>
<configfile finalname="mdsalutil-impl-default-config.xml">mvn:org.opendaylight.vpnservice/mdsalutil-impl/${interfacemgr.version}/xml/config</configfile>
<configfile finalname="interfacemgr-impl-default-config.xml">mvn:org.opendaylight.vpnservice/interfacemgr-impl/${interfacemgr.version}/xml/config</configfile>
<configfile finalname="arputil-impl-default-config.xml">mvn:org.opendaylight.vpnservice/arputil-impl/${arputil.version}/xml/config</configfile>
+ <configfile finalname="alivenessmonitor-impl-default-config.xml">mvn:org.opendaylight.vpnservice/alivenessmonitor-impl/${vpnservices.version}/xml/config</configfile>
<configfile finalname="vpnmanager-impl-default-config.xml">mvn:org.opendaylight.vpnservice/vpnmanager-impl/${vpnmanager.version}/xml/config</configfile>
<configfile finalname="nexthopmgr-impl-default-config.xml">mvn:org.opendaylight.vpnservice/nexthopmgr-impl/${nexthopmgr.version}/xml/config</configfile>
<configfile finalname="fibmanager-impl-default-config.xml">mvn:org.opendaylight.vpnservice/fibmanager-impl/${fibmanager.version}/xml/config</configfile>
}
@Override
- public Long getNextHopId() {
+ public String getNextHopIp() {
return null;
}
<module>arputil</module>
<module>vpnmanager</module>
<module>interfacemgr</module>
+ <module>alivenessmonitor</module>
<module>nexthopmgr</module>
<module>fibmanager</module>
<module>bgpmanager</module>
grouping adjacency-list{
list adjacency{
key "ip_address";
- leaf nextHopId { type uint32;}
+ leaf nextHopIp { type string; }
leaf ip_address {type string;}
- leaf label {type uint32;} /* optional */
+ leaf label { type uint32; config "false"; } /* optional */
leaf mac_address {type string;} /* optional */
}
}
-
- grouping vpn-route-list{
- leaf-list route-entry-id{
- type uint32;
- }
- }
+
+ grouping vpn-route-list{
+ leaf-list route-entry-id{
+ type uint32;
+ }
+ }
augment "/l3vpn:vpn-interfaces/l3vpn:vpn-interface" {
ext:augment-identifier "adjacencies";
augment "/l3vpn:vpn-instances/l3vpn:vpn-instance" {
leaf vpn-id { type uint32;}
- uses vpn-route-list;
+ uses vpn-route-list;
+ }
+
+ /* Operational DS containers for reverse lookups*/
+ container prefix-to-interface {
+ config false;
+ list vpn-ids {
+ key vpn-id;
+ leaf vpn-id {type uint32;}
+ list prefixes {
+ key ip_address;
+ leaf ip_address {type string;}
+ leaf dpnId {
+ type uint64;
+ }
+ leaf vpn-interface-name {
+ type string;
+ }
+ }
+ }
+ }
+
+ /* Data models to adhere to restart requirements */
+ container vpn-instance-to-vpn-id {
+ list vpn-instance {
+ key vpn-instance-name;
+ leaf vpn-instance-name {
+ type string;
+ }
+ leaf vpn-id {
+ type uint32;
+ }
+ leaf vrf-id {
+ description
+ "The vrf-id command configures a route distinguisher (RD)
+ for the IPv4 or IPv6 address family of a VPN instance or
+ vpn instance name for internal vpn case.";
+ type string;
+ }
+ }
+ }
+
+ container vpn-instance-op-data {
+ config false;
+ list vpn-instance-op-data-entry {
+ key vrf-id;
+ leaf vpn-id { type uint32;}
+ leaf vrf-id {
+ description
+ "The vrf-id command configures a route distinguisher (RD)
+ for the IPv4 or IPv6 address family of a VPN instance or
+ vpn instance name for internal vpn case.";
+ type string;
+ }
+
+ uses vpn-route-list;
+ list vpn-to-dpn-list {
+ key dpnId;
+ leaf dpnId {
+ type uint64;
+ }
+ list vpn-interfaces {
+ key interface-name;
+ leaf interface-name {
+ type string;
+ }
+ }
+ }
+ }
}
}
\ No newline at end of file