Port alivenessmonitor to mdsal 3 39/77339/10
authorStephen Kitt <skitt@redhat.com>
Tue, 30 Oct 2018 08:26:42 +0000 (09:26 +0100)
committerFaseela K <faseela.k@ericsson.com>
Fri, 11 Jan 2019 09:56:24 +0000 (09:56 +0000)
Change-Id: I936f986dabbac2d7c917d1086dcf645ff2067984
Signed-off-by: Stephen Kitt <skitt@redhat.com>
alivenessmonitor/alivenessmonitor-impl-protocols/src/main/java/org/opendaylight/genius/alivenessmonitor/protocols/internal/AbstractAlivenessProtocolHandler.java
alivenessmonitor/alivenessmonitor-impl-protocols/src/main/java/org/opendaylight/genius/alivenessmonitor/protocols/internal/AlivenessProtocolHandlerARP.java
alivenessmonitor/alivenessmonitor-impl-protocols/src/main/java/org/opendaylight/genius/alivenessmonitor/protocols/internal/AlivenessProtocolHandlerIPv6ND.java
alivenessmonitor/alivenessmonitor-impl-protocols/src/main/java/org/opendaylight/genius/alivenessmonitor/protocols/internal/AlivenessProtocolHandlerLLDP.java
alivenessmonitor/alivenessmonitor-impl-protocols/src/test/java/org/opendaylight/genius/alivenessmonitor/protocols/test/AlivenessMonitorTest.java [moved from alivenessmonitor/alivenessmonitor-impl-protocols/src/test/java/org/opendaylight/controller/alivenessmonitor/protocols/test/AlivenessMonitorTest.java with 88% similarity]
alivenessmonitor/alivenessmonitor-impl/pom.xml
alivenessmonitor/alivenessmonitor-impl/src/main/java/org/opendaylight/genius/alivenessmonitor/internal/AlivenessMonitor.java
alivenessmonitor/alivenessmonitor-impl/src/main/java/org/opendaylight/genius/alivenessmonitor/internal/HwVtepTunnelsStateHandler.java

index e6d6982d8dd37b2babb56cb3d09482ca65363a98..169ae06eac1261ec2cafea9c172cfc68cf8f6320 100644 (file)
@@ -8,15 +8,18 @@
 
 package org.opendaylight.genius.alivenessmonitor.protocols.internal;
 
+import static org.opendaylight.mdsal.binding.util.Datastore.OPERATIONAL;
+
 import com.google.common.base.Optional;
 import com.google.common.base.Strings;
 import com.google.common.primitives.UnsignedBytes;
-import org.opendaylight.controller.md.sal.binding.api.DataBroker;
-import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
-import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
+import java.util.concurrent.ExecutionException;
 import org.opendaylight.genius.alivenessmonitor.protocols.AlivenessProtocolHandler;
 import org.opendaylight.genius.alivenessmonitor.protocols.AlivenessProtocolHandlerRegistry;
-import org.opendaylight.genius.datastoreutils.SingleTransactionDataBroker;
+import org.opendaylight.mdsal.binding.api.DataBroker;
+import org.opendaylight.mdsal.binding.util.ManagedNewTransactionRunner;
+import org.opendaylight.mdsal.binding.util.ManagedNewTransactionRunnerImpl;
+import org.opendaylight.mdsal.common.api.ReadFailedException;
 import org.opendaylight.openflowplugin.libraries.liblldp.Packet;
 import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.interfaces.rev140508.InterfacesState;
 import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.interfaces.rev140508.interfaces.state.Interface;
@@ -27,13 +30,13 @@ abstract class AbstractAlivenessProtocolHandler<P extends Packet> implements Ali
 
     // private static final Logger LOG = LoggerFactory.getLogger(AbstractAlivenessProtocolHandler.class);
 
-    private final SingleTransactionDataBroker singleTxDataBroker;
+    private final ManagedNewTransactionRunner txRunner;
 
     AbstractAlivenessProtocolHandler(
             final DataBroker dataBroker,
             final AlivenessProtocolHandlerRegistry alivenessProtocolHandlerRegistry,
             final MonitorProtocolType protocolType) {
-        this.singleTxDataBroker = new SingleTransactionDataBroker(dataBroker);
+        this.txRunner = new ManagedNewTransactionRunnerImpl(dataBroker);
         alivenessProtocolHandlerRegistry.register(protocolType, this);
     }
 
@@ -50,7 +53,12 @@ abstract class AbstractAlivenessProtocolHandler<P extends Packet> implements Ali
         InstanceIdentifier<org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.interfaces.rev140508.interfaces
             .state.Interface> id = idBuilder.build();
 
-        return singleTxDataBroker.syncRead(LogicalDatastoreType.OPERATIONAL, id);
+        try {
+            return txRunner.applyWithNewReadWriteTransactionAndSubmit(OPERATIONAL,
+                input -> input.read(id).get().orElse(null)).get();
+        } catch (ExecutionException | InterruptedException e) {
+            throw new ReadFailedException("Error", e);
+        }
     }
     // @formatter:on
 
index 50d3de1c94058aa4a3688f9d59c5b18cf48d8d26..cd7cf4f477927ec15bf3ad5decbf7d8a235b0400 100644 (file)
@@ -25,12 +25,12 @@ import javax.inject.Inject;
 import javax.inject.Singleton;
 
 import org.apache.aries.blueprint.annotation.service.Reference;
-import org.opendaylight.controller.md.sal.binding.api.DataBroker;
 import org.opendaylight.genius.alivenessmonitor.protocols.AlivenessProtocolHandlerRegistry;
 import org.opendaylight.genius.alivenessmonitor.utils.AlivenessMonitorUtil;
 import org.opendaylight.genius.mdsalutil.MetaDataUtil;
 import org.opendaylight.genius.mdsalutil.NWUtil;
 import org.opendaylight.genius.mdsalutil.packet.ARP;
+import org.opendaylight.mdsal.binding.api.DataBroker;
 import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.inet.types.rev130715.IpAddressBuilder;
 import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.yang.types.rev130715.PhysAddress;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.alivenessmonitor.rev160411.MonitorProtocolType;
index 6aa075b6d58c594232d23624853166e2555e8535..5443502f3963bc6a052228dffdb3207b9aa20b61 100644 (file)
@@ -26,13 +26,13 @@ import javax.inject.Singleton;
 
 import org.apache.aries.blueprint.annotation.service.Reference;
 import org.apache.commons.lang3.StringUtils;
-import org.opendaylight.controller.md.sal.binding.api.DataBroker;
 import org.opendaylight.genius.alivenessmonitor.protocols.AlivenessProtocolHandlerRegistry;
 import org.opendaylight.genius.alivenessmonitor.utils.AlivenessMonitorUtil;
 import org.opendaylight.genius.ipv6util.api.Ipv6Util;
 import org.opendaylight.genius.ipv6util.api.decoders.Ipv6NaDecoder;
 import org.opendaylight.genius.mdsalutil.MetaDataUtil;
 import org.opendaylight.genius.mdsalutil.packet.utils.PacketUtil;
+import org.opendaylight.mdsal.binding.api.DataBroker;
 import org.opendaylight.openflowplugin.libraries.liblldp.BufferException;
 import org.opendaylight.openflowplugin.libraries.liblldp.Packet;
 import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.inet.types.rev130715.Ipv6Address;
index d18c5de86f3f4b36cf474b5e3a9ebff3248bf534..3331d0261d777ff25257218011113df1b7c065a9 100644 (file)
@@ -25,8 +25,6 @@ import javax.inject.Singleton;
 
 import org.apache.aries.blueprint.annotation.service.Reference;
 import org.apache.commons.lang3.StringUtils;
-import org.opendaylight.controller.md.sal.binding.api.DataBroker;
-import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
 import org.opendaylight.genius.alivenessmonitor.protocols.AlivenessProtocolHandlerRegistry;
 import org.opendaylight.genius.interfacemanager.globals.IfmConstants;
 import org.opendaylight.genius.mdsalutil.ActionInfo;
@@ -34,6 +32,8 @@ import org.opendaylight.genius.mdsalutil.MDSALUtil;
 import org.opendaylight.genius.mdsalutil.actions.ActionOutput;
 import org.opendaylight.genius.mdsalutil.actions.ActionSetFieldTunnelId;
 import org.opendaylight.genius.mdsalutil.packet.Ethernet;
+import org.opendaylight.mdsal.binding.api.DataBroker;
+import org.opendaylight.mdsal.common.api.ReadFailedException;
 import org.opendaylight.openflowplugin.libraries.liblldp.EtherTypes;
 import org.opendaylight.openflowplugin.libraries.liblldp.LLDP;
 import org.opendaylight.openflowplugin.libraries.liblldp.LLDPTLV;
@@ -5,7 +5,7 @@
  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
  * and is available at http://www.eclipse.org/legal/epl-v10.html
  */
-package org.opendaylight.controller.alivenessmonitor.protocols.test;
+package org.opendaylight.genius.alivenessmonitor.protocols.test;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
@@ -16,15 +16,14 @@ import static org.mockito.Matchers.argThat;
 import static org.mockito.Matchers.eq;
 import static org.mockito.Mockito.doNothing;
 import static org.mockito.Mockito.doReturn;
-import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
-import com.google.common.base.Optional;
-import com.google.common.util.concurrent.CheckedFuture;
+import com.google.common.util.concurrent.FluentFuture;
 import com.google.common.util.concurrent.Futures;
 import java.util.Arrays;
+import java.util.Optional;
 import java.util.concurrent.ExecutionException;
 import org.hamcrest.CoreMatchers;
 import org.hamcrest.Description;
@@ -38,20 +37,20 @@ import org.mockito.Captor;
 import org.mockito.Matchers;
 import org.mockito.Mock;
 import org.mockito.MockitoAnnotations;
-import org.opendaylight.controller.md.sal.binding.api.DataBroker;
 import org.opendaylight.controller.md.sal.binding.api.NotificationPublishService;
 import org.opendaylight.controller.md.sal.binding.api.NotificationService;
-import org.opendaylight.controller.md.sal.binding.api.ReadOnlyTransaction;
-import org.opendaylight.controller.md.sal.binding.api.ReadWriteTransaction;
-import org.opendaylight.controller.md.sal.binding.api.WriteTransaction;
-import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
-import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
 import org.opendaylight.genius.alivenessmonitor.internal.AlivenessMonitor;
 import org.opendaylight.genius.alivenessmonitor.protocols.AlivenessProtocolHandler;
 import org.opendaylight.genius.alivenessmonitor.protocols.AlivenessProtocolHandlerRegistry;
 import org.opendaylight.genius.alivenessmonitor.protocols.impl.AlivenessProtocolHandlerRegistryImpl;
 import org.opendaylight.genius.alivenessmonitor.protocols.internal.AlivenessProtocolHandlerARP;
 import org.opendaylight.genius.alivenessmonitor.protocols.internal.AlivenessProtocolHandlerLLDP;
+import org.opendaylight.mdsal.binding.api.DataBroker;
+import org.opendaylight.mdsal.binding.api.ReadTransaction;
+import org.opendaylight.mdsal.binding.api.ReadWriteTransaction;
+import org.opendaylight.mdsal.binding.api.WriteTransaction;
+import org.opendaylight.mdsal.common.api.CommitInfo;
+import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
 import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.inet.types.rev130715.IpAddressBuilder;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.alivenessmonitor.rev160411.MonitorPauseInput;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.alivenessmonitor.rev160411.MonitorPauseInputBuilder;
@@ -99,6 +98,7 @@ import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.idmanager.rev160406.
 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.idmanager.rev160406.ReleaseIdOutput;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.interfacemanager.rpcs.rev160406.OdlInterfaceRpcService;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.packet.service.rev130709.PacketProcessingService;
+import org.opendaylight.yangtools.util.concurrent.FluentFutures;
 import org.opendaylight.yangtools.yang.binding.DataObject;
 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
 import org.opendaylight.yangtools.yang.common.RpcError;
@@ -127,7 +127,7 @@ public class AlivenessMonitorTest {
     private AlivenessProtocolHandler lldpHandler;
     private long mockId;
     @Mock
-    private ReadOnlyTransaction readTx;
+    private ReadTransaction readTx;
     @Mock
     private WriteTransaction writeTx;
     @Mock
@@ -194,9 +194,8 @@ public class AlivenessMonitorTest {
         doReturn(readWriteTx).when(dataBroker).newReadWriteTransaction();
         doNothing().when(writeTx).put(eq(LogicalDatastoreType.OPERATIONAL),
                 any(InstanceIdentifier.class), any(DataObject.class));
-        doReturn(Futures.immediateCheckedFuture(null)).when(writeTx).submit();
-        doReturn(Futures.immediateCheckedFuture(null)).when(readWriteTx)
-                .submit();
+        doReturn(Futures.immediateCheckedFuture(null)).when(writeTx).commit();
+        doReturn(Futures.immediateCheckedFuture(null)).when(readWriteTx).commit();
     }
 
     @After
@@ -212,11 +211,10 @@ public class AlivenessMonitorTest {
                         .setMonitorInterval(10000L).setMonitorWindow(10L)
                         .setProtocolType(MonitorProtocolType.Arp).build())
                 .build();
-        doReturn(Futures.immediateCheckedFuture(Optional.absent()))
+        doReturn(Futures.immediateCheckedFuture(Optional.empty()))
                 .when(readWriteTx).read(eq(LogicalDatastoreType.OPERATIONAL),
                         argThat(isType(MonitorProfile.class)));
-        doReturn(Futures.immediateCheckedFuture(null)).when(readWriteTx)
-                .submit();
+        doReturn(Futures.immediateCheckedFuture(null)).when(readWriteTx).commit();
         RpcResult<MonitorProfileCreateOutput> output = alivenessMonitor
                 .monitorProfileCreate(input).get();
         assertTrue("Monitor Profile Create result", output.isSuccessful());
@@ -232,12 +230,7 @@ public class AlivenessMonitorTest {
                         .setMonitorInterval(10000L).setMonitorWindow(10L)
                         .setProtocolType(MonitorProtocolType.Arp).build())
                 .build();
-        @SuppressWarnings("unchecked")
-        Optional<MonitorProfile> optionalProfile = mock(Optional.class);
-        CheckedFuture<Optional<MonitorProfile>, ReadFailedException> proFuture = Futures
-                .immediateCheckedFuture(optionalProfile);
-        doReturn(true).when(optionalProfile).isPresent();
-        doReturn(proFuture).when(readWriteTx).read(
+        doReturn(FluentFutures.immediateFluentFuture(Optional.of(input))).when(readWriteTx).read(
                 eq(LogicalDatastoreType.OPERATIONAL),
                 argThat(isType(MonitorProfile.class)));
         RpcResult<MonitorProfileCreateOutput> output = alivenessMonitor
@@ -265,14 +258,13 @@ public class AlivenessMonitorTest {
                                 .setMode(MonitoringMode.OneOne)
                                 .setProfileId(profileId).build())
                 .build();
-        Optional<MonitorProfile> optionalProfile = Optional
-                .of(getTestMonitorProfile());
-        CheckedFuture<Optional<MonitorProfile>, ReadFailedException> proFuture = Futures
-                .immediateCheckedFuture(optionalProfile);
+        Optional<MonitorProfile> optionalProfile = Optional.of(getTestMonitorProfile());
+        FluentFuture<Optional<MonitorProfile>> proFuture =
+            FluentFutures.immediateFluentFuture(optionalProfile);
         when(readTx.read(eq(LogicalDatastoreType.OPERATIONAL),
                 argThat(isType(MonitorProfile.class)))).thenReturn(proFuture);
-        CheckedFuture<Optional<MonitoringInfo>, ReadFailedException> outFuture = Futures
-                .immediateCheckedFuture(Optional.<MonitoringInfo>absent());
+        FluentFuture<Optional<MonitoringInfo>> outFuture = FluentFutures
+                .immediateFluentFuture(Optional.<MonitoringInfo>empty());
         when(readTx.read(eq(LogicalDatastoreType.OPERATIONAL),
                 argThat(isType(MonitoringInfo.class)))).thenReturn(outFuture);
         RpcResult<MonitorStartOutput> output = alivenessMonitor
@@ -292,24 +284,24 @@ public class AlivenessMonitorTest {
                 .of(getTestMonitorProfile());
         when(readTx.read(eq(LogicalDatastoreType.OPERATIONAL),
                 argThat(isType(MonitorProfile.class))))
-                        .thenReturn(Futures.immediateCheckedFuture(optProfile));
+                        .thenReturn(FluentFutures.immediateFluentFuture(optProfile));
         Optional<MonitoringInfo> optInfo = Optional.of(
                 new MonitoringInfoBuilder().setId(2L).setProfileId(1L).build());
         when(readTx.read(eq(LogicalDatastoreType.OPERATIONAL),
                 argThat(isType(MonitoringInfo.class))))
-                        .thenReturn(Futures.immediateCheckedFuture(optInfo));
+                        .thenReturn(FluentFutures.immediateFluentFuture(optInfo));
         Optional<MonitoringState> optState = Optional
                 .of(new MonitoringStateBuilder()
                         .setStatus(MonitorStatus.Started).build());
         when(readWriteTx.read(eq(LogicalDatastoreType.OPERATIONAL),
                 argThat(isType(MonitoringState.class))))
-                        .thenReturn(Futures.immediateCheckedFuture(optState));
+                        .thenReturn(FluentFutures.immediateFluentFuture(optState));
         Optional<MonitoridKeyEntry> optMap = Optional
                 .of(new MonitoridKeyEntryBuilder().setMonitorId(2L)
                         .setMonitorKey("Test monitor Key").build());
         when(readTx.read(eq(LogicalDatastoreType.OPERATIONAL),
                 argThat(isType(MonitoridKeyEntry.class))))
-                        .thenReturn(Futures.immediateCheckedFuture(optMap));
+                        .thenReturn(FluentFutures.immediateFluentFuture(optMap));
         alivenessMonitor.monitorPause(input).get();
         verify(readWriteTx).merge(eq(LogicalDatastoreType.OPERATIONAL),
                 argThat(isType(MonitoringState.class)), stateCaptor.capture());
@@ -326,23 +318,23 @@ public class AlivenessMonitorTest {
                         .build());
         when(readWriteTx.read(eq(LogicalDatastoreType.OPERATIONAL),
                 argThat(isType(MonitoringState.class))))
-                        .thenReturn(Futures.immediateCheckedFuture(optState));
+                        .thenReturn(FluentFutures.immediateFluentFuture(optState));
         Optional<MonitoringInfo> optInfo = Optional.of(
                 new MonitoringInfoBuilder().setId(2L).setProfileId(1L).build());
         when(readTx.read(eq(LogicalDatastoreType.OPERATIONAL),
                 argThat(isType(MonitoringInfo.class))))
-                        .thenReturn(Futures.immediateCheckedFuture(optInfo));
+                        .thenReturn(FluentFutures.immediateFluentFuture(optInfo));
         Optional<MonitorProfile> optProfile = Optional
                 .of(getTestMonitorProfile());
         when(readTx.read(eq(LogicalDatastoreType.OPERATIONAL),
                 argThat(isType(MonitorProfile.class))))
-                        .thenReturn(Futures.immediateCheckedFuture(optProfile));
+                        .thenReturn(FluentFutures.immediateFluentFuture(optProfile));
         Optional<MonitoridKeyEntry> optMap = Optional
                 .of(new MonitoridKeyEntryBuilder().setMonitorId(2L)
                         .setMonitorKey("Test monitor Key").build());
         when(readTx.read(eq(LogicalDatastoreType.OPERATIONAL),
                 argThat(isType(MonitoridKeyEntry.class))))
-                        .thenReturn(Futures.immediateCheckedFuture(optMap));
+                        .thenReturn(FluentFutures.immediateFluentFuture(optMap));
         RpcResult<MonitorUnpauseOutput> result = alivenessMonitor.monitorUnpause(input).get();
         verify(readWriteTx).merge(eq(LogicalDatastoreType.OPERATIONAL),
                 argThat(isType(MonitoringState.class)), stateCaptor.capture());
@@ -360,8 +352,7 @@ public class AlivenessMonitorTest {
                         .setEndpointType(
                                 getInterface("testInterface", "10.1.1.1"))
                         .build()).build());
-        CheckedFuture<Optional<MonitoringInfo>, ReadFailedException> outFuture = Futures
-                .immediateCheckedFuture(optInfo);
+        FluentFuture<Optional<MonitoringInfo>> outFuture = FluentFutures.immediateFluentFuture(optInfo);
         when(readTx.read(eq(LogicalDatastoreType.OPERATIONAL),
                 argThat(isType(MonitoringInfo.class)))).thenReturn(outFuture);
         Optional<MonitoridKeyEntry> optMap = Optional
@@ -369,17 +360,17 @@ public class AlivenessMonitorTest {
                         .setMonitorKey("Test monitor Key").build());
         when(readTx.read(eq(LogicalDatastoreType.OPERATIONAL),
                 argThat(isType(MonitoridKeyEntry.class))))
-                        .thenReturn(Futures.immediateCheckedFuture(optMap));
+                        .thenReturn(FluentFutures.immediateFluentFuture(optMap));
         Optional<MonitorProfile> optProfile = Optional
                 .of(getTestMonitorProfile());
         when(readTx.read(eq(LogicalDatastoreType.OPERATIONAL),
                 argThat(isType(MonitorProfile.class))))
-                        .thenReturn(Futures.immediateCheckedFuture(optProfile));
+                        .thenReturn(FluentFutures.immediateFluentFuture(optProfile));
         Optional<InterfaceMonitorEntry> optEntry = Optional
                 .of(getInterfaceMonitorEntry());
         when(readWriteTx.read(eq(LogicalDatastoreType.OPERATIONAL),
                 argThat(isType(InterfaceMonitorEntry.class))))
-                        .thenReturn(Futures.immediateCheckedFuture(optEntry));
+                        .thenReturn(FluentFutures.immediateFluentFuture(optEntry));
         RpcResult<MonitorStopOutput> result = alivenessMonitor.monitorStop(input).get();
         verify(idManager).releaseId(any(ReleaseIdInput.class));
         verify(writeTx, times(3)).delete(eq(LogicalDatastoreType.OPERATIONAL),
@@ -396,7 +387,7 @@ public class AlivenessMonitorTest {
                 .of(getTestMonitorProfile());
         when(readWriteTx.read(eq(LogicalDatastoreType.OPERATIONAL),
                 argThat(isType(MonitorProfile.class))))
-                        .thenReturn(Futures.immediateCheckedFuture(optProfile));
+                        .thenReturn(FluentFutures.immediateFluentFuture(optProfile));
         RpcResult<MonitorProfileDeleteOutput> result = alivenessMonitor.monitorProfileDelete(input)
                 .get();
         verify(idManager).releaseId(any(ReleaseIdInput.class));
@@ -413,11 +404,10 @@ public class AlivenessMonitorTest {
                         .setMonitorInterval(10000L).setMonitorWindow(10L)
                         .setProtocolType(MonitorProtocolType.Arp).build())
                 .build();
-        doReturn(Futures.immediateCheckedFuture(Optional.absent()))
+        doReturn(FluentFutures.immediateFluentFuture(Optional.empty()))
                 .when(readWriteTx).read(eq(LogicalDatastoreType.OPERATIONAL),
                         any(InstanceIdentifier.class));
-        doReturn(Futures.immediateCheckedFuture(null)).when(readWriteTx)
-                .submit();
+        doReturn(CommitInfo.emptyFluentFuture()).when(readWriteTx).commit();
         RpcResult<MonitorProfileCreateOutput> output = alivenessMonitor
                 .monitorProfileCreate(input).get();
         return output.getResult().getProfileId();
index ca20ee8ed4996a64bcb19dfccd9c327aa485c211..a7d6f2c8767f5aac1f8cd608e8db6d07c6aec15b 100644 (file)
@@ -58,7 +58,7 @@ and is available at http://www.eclipse.org/legal/epl-v10.html
     </dependency>
     <dependency>
       <groupId>org.opendaylight.serviceutils</groupId>
-      <artifactId>tools-api</artifactId>
+      <artifactId>listener-api</artifactId>
     </dependency>
     <dependency>
       <groupId>org.opendaylight.genius</groupId>
@@ -77,6 +77,10 @@ and is available at http://www.eclipse.org/legal/epl-v10.html
       <groupId>org.opendaylight.controller.model</groupId>
       <artifactId>model-inventory</artifactId>
     </dependency>
+    <dependency>
+      <groupId>org.opendaylight.mdsal</groupId>
+      <artifactId>mdsal-binding-util</artifactId>
+    </dependency>
     <dependency>
       <groupId>org.opendaylight.mdsal.binding.model.ietf</groupId>
       <artifactId>rfc7223</artifactId>
@@ -91,10 +95,6 @@ and is available at http://www.eclipse.org/legal/epl-v10.html
       <artifactId>ipv6util-api</artifactId>
       <version>${project.version}</version>
     </dependency>
-    <dependency>
-      <groupId>org.opendaylight.controller</groupId>
-      <artifactId>sal-binding-broker-impl</artifactId>
-    </dependency>
     <dependency>
       <groupId>javax.inject</groupId>
       <artifactId>javax.inject</artifactId>
index 514a849ec684b7532826147be41dcb477730a945..bbedd30c06a77e484ec7eba64abf5944f45280d6 100644 (file)
@@ -12,15 +12,15 @@ import static org.opendaylight.genius.alivenessmonitor.utils.AlivenessMonitorUti
 import static org.opendaylight.genius.alivenessmonitor.utils.AlivenessMonitorUtil.getMonitorProfileId;
 import static org.opendaylight.genius.alivenessmonitor.utils.AlivenessMonitorUtil.getMonitorStateId;
 import static org.opendaylight.genius.alivenessmonitor.utils.AlivenessMonitorUtil.getMonitoringInfoId;
-import static org.opendaylight.genius.infra.Datastore.OPERATIONAL;
+import static org.opendaylight.mdsal.binding.util.Datastore.OPERATIONAL;
 
-import com.google.common.base.Optional;
 import com.google.common.base.Preconditions;
 import com.google.common.base.Predicate;
 import com.google.common.base.Strings;
 import com.google.common.cache.CacheBuilder;
 import com.google.common.cache.CacheLoader;
 import com.google.common.cache.LoadingCache;
+import com.google.common.util.concurrent.FluentFuture;
 import com.google.common.util.concurrent.FutureCallback;
 import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.ListenableFuture;
@@ -29,6 +29,7 @@ import com.google.common.util.concurrent.SettableFuture;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
+import java.util.Optional;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.ExecutionException;
@@ -44,23 +45,24 @@ import javax.inject.Inject;
 import javax.inject.Singleton;
 
 import org.apache.aries.blueprint.annotation.service.Reference;
-import org.opendaylight.controller.md.sal.binding.api.DataBroker;
 import org.opendaylight.controller.md.sal.binding.api.NotificationPublishService;
-import org.opendaylight.controller.md.sal.binding.api.ReadOnlyTransaction;
-import org.opendaylight.controller.md.sal.binding.api.ReadWriteTransaction;
-import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
 import org.opendaylight.genius.alivenessmonitor.protocols.AlivenessProtocolHandler;
 import org.opendaylight.genius.alivenessmonitor.protocols.AlivenessProtocolHandlerRegistry;
 import org.opendaylight.genius.alivenessmonitor.utils.AlivenessMonitorUtil;
-import org.opendaylight.genius.datastoreutils.SingleTransactionDataBroker;
-import org.opendaylight.genius.infra.ManagedNewTransactionRunner;
-import org.opendaylight.genius.infra.ManagedNewTransactionRunnerImpl;
 import org.opendaylight.genius.mdsalutil.packet.Ethernet;
 import org.opendaylight.genius.mdsalutil.packet.utils.PacketUtil;
 import org.opendaylight.infrautils.utils.concurrent.Executors;
+import org.opendaylight.mdsal.binding.api.DataBroker;
+import org.opendaylight.mdsal.binding.api.ReadTransaction;
+import org.opendaylight.mdsal.binding.api.ReadWriteTransaction;
+import org.opendaylight.mdsal.binding.util.Datastore.Operational;
+import org.opendaylight.mdsal.binding.util.ManagedNewTransactionRunner;
+import org.opendaylight.mdsal.binding.util.ManagedNewTransactionRunnerImpl;
+import org.opendaylight.mdsal.common.api.CommitInfo;
+import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
 import org.opendaylight.openflowplugin.libraries.liblldp.Packet;
 import org.opendaylight.openflowplugin.libraries.liblldp.PacketException;
-import org.opendaylight.serviceutils.tools.mdsal.listener.AbstractClusteredSyncDataTreeChangeListener;
+import org.opendaylight.serviceutils.tools.listener.AbstractClusteredSyncDataTreeChangeListener;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.alivenessmonitor.rev160411.AlivenessMonitorService;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.alivenessmonitor.rev160411.LivenessState;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.alivenessmonitor.rev160411.MonitorEvent;
@@ -199,9 +201,15 @@ public class AlivenessMonitor extends AbstractClusteredSyncDataTreeChangeListene
         monitorIdKeyCache = CacheBuilder.newBuilder().build(new CacheLoader<Long, String>() {
             @Override
             public String load(@Nonnull Long monitorId) {
-                return SingleTransactionDataBroker.syncReadOptionalAndTreatReadFailedExceptionAsAbsentOptional(
-                        dataBroker, LogicalDatastoreType.OPERATIONAL, getMonitorMapId(monitorId))
-                        .toJavaUtil().map(MonitoridKeyEntry::getMonitorKey).orElse(null);
+                try {
+                    return txRunner.<Operational, ExecutionException, Optional<MonitoridKeyEntry>>
+                        applyInterruptiblyWithNewReadOnlyTransactionAndClose(OPERATIONAL,
+                            tx -> tx.read(getMonitorMapId(monitorId)).get()).map(MonitoridKeyEntry::getMonitorKey)
+                        .orElse(null);
+                } catch (InterruptedException | ExecutionException e) {
+                    LOG.error("Error reading monitor {}", monitorId, e);
+                    return null;
+                }
             }
         });
 
@@ -393,13 +401,13 @@ public class AlivenessMonitor extends AbstractClusteredSyncDataTreeChangeListene
                     LOG.trace("Monitoring State not available for key: {} to process the Packet received",
                             monitorKey);
                 }
-                return Optional.<Result>absent();
+                return Optional.<Result>empty();
             }
         }).addCallback(new FutureCallback<Optional<Result>>() {
             @Override
             public void onSuccess(Optional<Result> optResult) {
                 releaseLock(lock);
-                optResult.toJavaUtil().ifPresent(result -> {
+                optResult.ifPresent(result -> {
                     final boolean stateChanged = result.currentState.getState() == LivenessState.Down
                             || result.currentState.getState() == LivenessState.Unknown;
                     if (stateChanged) {
@@ -453,8 +461,9 @@ public class AlivenessMonitor extends AbstractClusteredSyncDataTreeChangeListene
             }
 
             Optional<MonitorProfile> optProfile =
-                    SingleTransactionDataBroker.syncReadOptionalAndTreatReadFailedExceptionAsAbsentOptional(
-                            dataBroker, LogicalDatastoreType.OPERATIONAL, getMonitorProfileId(profileId));
+                txRunner.<Operational, ExecutionException, Optional<MonitorProfile>>
+                    applyInterruptiblyWithNewReadOnlyTransactionAndClose(OPERATIONAL,
+                        tx -> tx.read(getMonitorProfileId(profileId)).get());
             final MonitorProfile profile;
             if (!optProfile.isPresent()) {
                 String errMsg = String.format("No monitoring profile associated with Id: %d", profileId);
@@ -491,8 +500,9 @@ public class AlivenessMonitor extends AbstractClusteredSyncDataTreeChangeListene
             String idKey = getUniqueKey(interfaceName, protocolType.toString(), srcEndpointType, destEndpointType);
             final long monitorId = getUniqueId(idKey);
             Optional<MonitoringInfo> optKey =
-                    SingleTransactionDataBroker.syncReadOptionalAndTreatReadFailedExceptionAsAbsentOptional(
-                            dataBroker, LogicalDatastoreType.OPERATIONAL, getMonitoringInfoId(monitorId));
+                txRunner.<Operational, ExecutionException, Optional<MonitoringInfo>>
+                    applyInterruptiblyWithNewReadOnlyTransactionAndClose(OPERATIONAL,
+                        tx -> tx.read(getMonitoringInfoId(monitorId)).get());
             final AlivenessProtocolHandler<?> handler;
             if (optKey.isPresent()) {
                 String message = String.format(
@@ -531,7 +541,7 @@ public class AlivenessMonitor extends AbstractClusteredSyncDataTreeChangeListene
                             .setMonitorKey(monitoringKey).build();
                     operTx.put(getMonitorMapId(monitorId), mapEntry, CREATE_MISSING_PARENT);
                     LOG.debug("adding oper map entry {}", mapEntry);
-                }).addCallback(new FutureCallback<Void>() {
+                }).addCallback(new FutureCallback<Object>() {
                     @Override
                     public void onFailure(Throwable error) {
                         String errorMsg = String.format("Adding Monitoring info: %s in Datastore failed",
@@ -541,7 +551,7 @@ public class AlivenessMonitor extends AbstractClusteredSyncDataTreeChangeListene
                     }
 
                     @Override
-                    public void onSuccess(Void ignored) {
+                    public void onSuccess(Object ignored) {
                         lockMap.put(monitoringKey, new Semaphore(1, true));
                         if (protocolType == MonitorProtocolType.Bfd) {
                             handler.startMonitoringTask(monitoringInfo);
@@ -559,7 +569,7 @@ public class AlivenessMonitor extends AbstractClusteredSyncDataTreeChangeListene
             MonitorStartOutput output = new MonitorStartOutputBuilder().setMonitorId(monitorId).build();
 
             rpcResultBuilder = RpcResultBuilder.success(output);
-        } catch (UnsupportedConfigException e) {
+        } catch (UnsupportedConfigException | ExecutionException | InterruptedException e) {
             LOG.error("Start Monitoring Failed. ", e);
             rpcResultBuilder = RpcResultBuilder.<MonitorStartOutput>failed().withError(ErrorType.APPLICATION,
                     e.getMessage(), e);
@@ -570,9 +580,9 @@ public class AlivenessMonitor extends AbstractClusteredSyncDataTreeChangeListene
     private void associateMonitorIdWithInterface(final Long monitorId, final String interfaceName) {
         LOG.debug("associate monitor Id {} with interface {}", monitorId, interfaceName);
         final ReadWriteTransaction tx = dataBroker.newReadWriteTransaction();
-        ListenableFuture<Optional<InterfaceMonitorEntry>> readFuture = tx.read(LogicalDatastoreType.OPERATIONAL,
+        FluentFuture<Optional<InterfaceMonitorEntry>> readFuture = tx.read(LogicalDatastoreType.OPERATIONAL,
                 getInterfaceMonitorMapId(interfaceName));
-        ListenableFuture<Void> updateFuture = Futures.transformAsync(readFuture, optEntry -> {
+        FluentFuture<? extends CommitInfo> updateFuture = readFuture.transformAsync(optEntry -> {
             if (optEntry.isPresent()) {
                 InterfaceMonitorEntry entry = optEntry.get();
                 List<Long> monitorIds1 =
@@ -592,7 +602,7 @@ public class AlivenessMonitor extends AbstractClusteredSyncDataTreeChangeListene
                 tx.put(LogicalDatastoreType.OPERATIONAL, getInterfaceMonitorMapId(interfaceName), newEntry2,
                             CREATE_MISSING_PARENT);
             }
-            return tx.submit();
+            return tx.commit();
         }, callbackExecutorService);
 
         Futures.addCallback(updateFuture, new FutureCallbackImpl(
@@ -635,11 +645,11 @@ public class AlivenessMonitor extends AbstractClusteredSyncDataTreeChangeListene
         final SettableFuture<RpcResult<MonitorUnpauseOutput>> result = SettableFuture.create();
 
         final Long monitorId = input.getMonitorId();
-        final ReadOnlyTransaction tx = dataBroker.newReadOnlyTransaction();
-        ListenableFuture<Optional<MonitoringInfo>> readInfoResult = tx.read(LogicalDatastoreType.OPERATIONAL,
+        final ReadTransaction tx = dataBroker.newReadOnlyTransaction();
+        FluentFuture<Optional<MonitoringInfo>> readInfoResult = tx.read(LogicalDatastoreType.OPERATIONAL,
                 getMonitoringInfoId(monitorId));
 
-        Futures.addCallback(readInfoResult, new FutureCallback<Optional<MonitoringInfo>>() {
+        readInfoResult.addCallback(new FutureCallback<Optional<MonitoringInfo>>() {
 
             @Override
             public void onFailure(Throwable error) {
@@ -654,9 +664,9 @@ public class AlivenessMonitor extends AbstractClusteredSyncDataTreeChangeListene
             public void onSuccess(@Nonnull Optional<MonitoringInfo> optInfo) {
                 if (optInfo.isPresent()) {
                     final MonitoringInfo info = optInfo.get();
-                    ListenableFuture<Optional<MonitorProfile>> readProfile = tx.read(LogicalDatastoreType.OPERATIONAL,
+                    FluentFuture<Optional<MonitorProfile>> readProfile = tx.read(LogicalDatastoreType.OPERATIONAL,
                             getMonitorProfileId(info.getProfileId()));
-                    Futures.addCallback(readProfile, new FutureCallback<Optional<MonitorProfile>>() {
+                    readProfile.addCallback(new FutureCallback<Optional<MonitorProfile>>() {
 
                         @Override
                         public void onFailure(Throwable error) {
@@ -714,17 +724,27 @@ public class AlivenessMonitor extends AbstractClusteredSyncDataTreeChangeListene
     }
 
     private boolean stopMonitoringTask(Long monitorId, boolean interruptTask) {
-        Optional<MonitoringInfo> optInfo =
-                SingleTransactionDataBroker.syncReadOptionalAndTreatReadFailedExceptionAsAbsentOptional(dataBroker,
-                        LogicalDatastoreType.OPERATIONAL, getMonitoringInfoId(monitorId));
+        Optional<MonitoringInfo> optInfo;
+        try {
+            optInfo = txRunner.applyInterruptiblyWithNewReadOnlyTransactionAndClose(OPERATIONAL,
+                tx -> tx.read(getMonitoringInfoId(monitorId))).get();
+        } catch (InterruptedException | ExecutionException e) {
+            LOG.error("Error reading monitor {}", monitorId, e);
+            optInfo = Optional.empty();
+        }
         if (!optInfo.isPresent()) {
             LOG.warn("There is no monitoring info present for monitor id {}", monitorId);
             return false;
         }
         MonitoringInfo monitoringInfo = optInfo.get();
-        Optional<MonitorProfile> optProfile =
-                SingleTransactionDataBroker.syncReadOptionalAndTreatReadFailedExceptionAsAbsentOptional(dataBroker,
-                        LogicalDatastoreType.OPERATIONAL, getMonitorProfileId(monitoringInfo.getProfileId()));
+        Optional<MonitorProfile> optProfile;
+        try {
+            optProfile = txRunner.applyInterruptiblyWithNewReadOnlyTransactionAndClose(OPERATIONAL,
+                tx -> tx.read(getMonitorProfileId(monitoringInfo.getProfileId()))).get();
+        } catch (InterruptedException | ExecutionException e) {
+            LOG.error("Error reading monitor profile for {}", monitorId, e);
+            optProfile = Optional.empty();
+        }
         MonitorProtocolType protocolType = optProfile.get().getProtocolType();
         if (protocolType == MonitorProtocolType.Bfd) {
             LOG.debug("disabling bfd for hwvtep tunnel montior id {}", monitorId);
@@ -741,8 +761,13 @@ public class AlivenessMonitor extends AbstractClusteredSyncDataTreeChangeListene
     }
 
     Optional<MonitorProfile> getMonitorProfile(Long profileId) {
-        return SingleTransactionDataBroker.syncReadOptionalAndTreatReadFailedExceptionAsAbsentOptional(
-                dataBroker, LogicalDatastoreType.OPERATIONAL, getMonitorProfileId(profileId));
+        try {
+            return txRunner.applyInterruptiblyWithNewReadOnlyTransactionAndClose(OPERATIONAL,
+                tx -> tx.read(getMonitorProfileId(profileId))).get();
+        } catch (InterruptedException | ExecutionException e) {
+            LOG.error("Error reading monitor profile for {}", profileId, e);
+            return Optional.empty();
+        }
     }
 
     void acquireLock(Semaphore lock) {
@@ -803,9 +828,9 @@ public class AlivenessMonitor extends AbstractClusteredSyncDataTreeChangeListene
         acquireLock(lock);
 
         final ReadWriteTransaction tx = dataBroker.newReadWriteTransaction();
-        ListenableFuture<Optional<MonitoringState>> readResult = tx.read(LogicalDatastoreType.OPERATIONAL,
+        FluentFuture<Optional<MonitoringState>> readResult = tx.read(LogicalDatastoreType.OPERATIONAL,
                 getMonitorStateId(monitorKey));
-        ListenableFuture<Void> writeResult = Futures.transformAsync(readResult, optState -> {
+        FluentFuture<? extends CommitInfo> writeResult = readResult.transformAsync(optState -> {
             if (optState.isPresent()) {
                 MonitoringState state = optState.get();
 
@@ -843,19 +868,19 @@ public class AlivenessMonitor extends AbstractClusteredSyncDataTreeChangeListene
                             .setResponsePendingCount(responsePendingCount).setState(currentLivenessState).build();
                 tx.merge(LogicalDatastoreType.OPERATIONAL, getMonitorStateId(state.getMonitorKey()),
                             updatedState);
-                return tx.submit();
+                return tx.commit();
             } else {
                 // Close the transaction
-                tx.submit();
+                tx.commit();
                 String errorMsg = String.format(
                         "Monitoring State associated with id %d is not present to send packet out.", monitorId);
                 return Futures.immediateFailedFuture(new RuntimeException(errorMsg));
             }
         }, callbackExecutorService);
 
-        Futures.addCallback(writeResult, new FutureCallback<Void>() {
+        writeResult.addCallback(new FutureCallback<CommitInfo>() {
             @Override
-            public void onSuccess(Void noarg) {
+            public void onSuccess(CommitInfo noarg) {
                 // invoke packetout on protocol handler
                 AlivenessProtocolHandler<?> handler =
                         alivenessProtocolHandlerRegistry.getOpt(profile.getProtocolType());
@@ -907,9 +932,9 @@ public class AlivenessMonitor extends AbstractClusteredSyncDataTreeChangeListene
         final Long profileId = (long) getUniqueId(idKey);
 
         final ReadWriteTransaction tx = dataBroker.newReadWriteTransaction();
-        ListenableFuture<Optional<MonitorProfile>> readFuture = tx.read(LogicalDatastoreType.OPERATIONAL,
+        FluentFuture<Optional<MonitorProfile>> readFuture = tx.read(LogicalDatastoreType.OPERATIONAL,
                 getMonitorProfileId(profileId));
-        ListenableFuture<RpcResult<MonitorProfileCreateOutput>> resultFuture = Futures.transformAsync(readFuture,
+        FluentFuture<RpcResult<MonitorProfileCreateOutput>> resultFuture = readFuture.transformAsync(
             optProfile -> {
                 if (optProfile.isPresent()) {
                     tx.cancel();
@@ -925,7 +950,7 @@ public class AlivenessMonitor extends AbstractClusteredSyncDataTreeChangeListene
                                 .setMonitorWindow(monitorWindow).setProtocolType(protocolType).build();
                     tx.put(LogicalDatastoreType.OPERATIONAL, getMonitorProfileId(profileId), monitorProfile,
                           CREATE_MISSING_PARENT);
-                    Futures.addCallback(tx.submit(), new FutureCallback<Void>() {
+                    tx.commit().addCallback(new FutureCallback<CommitInfo>() {
                             @Override
                             public void onFailure(Throwable error) {
                                 String msg = String.format("Error when storing monitorprofile %s in datastore",
@@ -936,7 +961,7 @@ public class AlivenessMonitor extends AbstractClusteredSyncDataTreeChangeListene
                             }
 
                             @Override
-                            public void onSuccess(Void noarg) {
+                            public void onSuccess(CommitInfo noarg) {
                                 MonitorProfileCreateOutput output = new MonitorProfileCreateOutputBuilder()
                                         .setProfileId(profileId).build();
                                 returnFuture.set(RpcResultBuilder.success(output).build());
@@ -945,7 +970,7 @@ public class AlivenessMonitor extends AbstractClusteredSyncDataTreeChangeListene
                 }
                 return returnFuture;
             }, callbackExecutorService);
-        Futures.addCallback(resultFuture, new FutureCallback<RpcResult<MonitorProfileCreateOutput>>() {
+        resultFuture.addCallback(new FutureCallback<RpcResult<MonitorProfileCreateOutput>>() {
             @Override
             public void onFailure(Throwable error) {
                 // This would happen when any error happens during reading for
@@ -1004,13 +1029,13 @@ public class AlivenessMonitor extends AbstractClusteredSyncDataTreeChangeListene
         final SettableFuture<RpcResult<MonitorProfileDeleteOutput>> result = SettableFuture.create();
         final Long profileId = input.getProfileId();
         final ReadWriteTransaction tx = dataBroker.newReadWriteTransaction();
-        ListenableFuture<Optional<MonitorProfile>> readFuture = tx.read(LogicalDatastoreType.OPERATIONAL,
+        FluentFuture<Optional<MonitorProfile>> readFuture = tx.read(LogicalDatastoreType.OPERATIONAL,
                 getMonitorProfileId(profileId));
-        ListenableFuture<RpcResult<MonitorProfileDeleteOutput>> writeFuture =
-                Futures.transformAsync(readFuture, optProfile -> {
+        FluentFuture<RpcResult<MonitorProfileDeleteOutput>> writeFuture =
+                readFuture.transformAsync(optProfile -> {
                     if (optProfile.isPresent()) {
                         tx.delete(LogicalDatastoreType.OPERATIONAL, getMonitorProfileId(profileId));
-                        Futures.addCallback(tx.submit(), new FutureCallback<Void>() {
+                        tx.commit().addCallback(new FutureCallback<CommitInfo>() {
                             @Override
                             public void onFailure(Throwable error) {
                                 String msg = String.format("Error when removing monitor profile %d from datastore",
@@ -1022,7 +1047,7 @@ public class AlivenessMonitor extends AbstractClusteredSyncDataTreeChangeListene
                             }
 
                             @Override
-                            public void onSuccess(Void noarg) {
+                            public void onSuccess(CommitInfo noarg) {
                                 MonitorProfile profile = optProfile.get();
                                 String id = getUniqueProfileKey(profile.getFailureThreshold(),
                                         profile.getMonitorInterval(), profile.getMonitorWindow(),
@@ -1040,7 +1065,7 @@ public class AlivenessMonitor extends AbstractClusteredSyncDataTreeChangeListene
                     return result;
                 }, callbackExecutorService);
 
-        Futures.addCallback(writeFuture, new FutureCallback<RpcResult<MonitorProfileDeleteOutput>>() {
+        writeFuture.addCallback(new FutureCallback<RpcResult<MonitorProfileDeleteOutput>>() {
 
             @Override
             public void onFailure(Throwable error) {
@@ -1064,9 +1089,14 @@ public class AlivenessMonitor extends AbstractClusteredSyncDataTreeChangeListene
         SettableFuture<RpcResult<MonitorStopOutput>> result = SettableFuture.create();
 
         final Long monitorId = input.getMonitorId();
-        Optional<MonitoringInfo> optInfo =
-                SingleTransactionDataBroker.syncReadOptionalAndTreatReadFailedExceptionAsAbsentOptional(dataBroker,
-                        LogicalDatastoreType.OPERATIONAL, getMonitoringInfoId(monitorId));
+        Optional<MonitoringInfo> optInfo;
+        try {
+            optInfo = txRunner.applyInterruptiblyWithNewReadOnlyTransactionAndClose(OPERATIONAL,
+                tx -> tx.read(getMonitoringInfoId(monitorId))).get();
+        } catch (InterruptedException | ExecutionException e) {
+            LOG.error("Error reading monitor {}", monitorId, e);
+            optInfo = Optional.empty();
+        }
         if (optInfo.isPresent()) {
             // Stop the monitoring task
             stopMonitoringTask(monitorId);
@@ -1111,9 +1141,9 @@ public class AlivenessMonitor extends AbstractClusteredSyncDataTreeChangeListene
     private void removeMonitorIdFromInterfaceAssociation(final Long monitorId, final String interfaceName) {
         LOG.debug("Remove monitorId {} from Interface association {}", monitorId, interfaceName);
         final ReadWriteTransaction tx = dataBroker.newReadWriteTransaction();
-        ListenableFuture<Optional<InterfaceMonitorEntry>> readFuture = tx.read(LogicalDatastoreType.OPERATIONAL,
+        FluentFuture<Optional<InterfaceMonitorEntry>> readFuture = tx.read(LogicalDatastoreType.OPERATIONAL,
                 getInterfaceMonitorMapId(interfaceName));
-        ListenableFuture<Void> updateFuture = Futures.transformAsync(readFuture, optEntry -> {
+        FluentFuture<? extends CommitInfo> updateFuture = readFuture.transformAsync(optEntry -> {
             if (optEntry.isPresent()) {
                 InterfaceMonitorEntry entry = optEntry.get();
                 List<Long> monitorIds =
@@ -1127,7 +1157,7 @@ public class AlivenessMonitor extends AbstractClusteredSyncDataTreeChangeListene
                     tx.put(LogicalDatastoreType.OPERATIONAL, getInterfaceMonitorMapId(interfaceName), newEntry,
                             CREATE_MISSING_PARENT);
                 }
-                return tx.submit();
+                return tx.commit();
             } else {
                 LOG.warn("No Interface map entry found {} to remove monitorId {}", interfaceName, monitorId);
                 tx.cancel();
@@ -1135,7 +1165,7 @@ public class AlivenessMonitor extends AbstractClusteredSyncDataTreeChangeListene
             }
         }, MoreExecutors.directExecutor());
 
-        Futures.addCallback(updateFuture, new FutureCallbackImpl(
+        updateFuture.addCallback(new FutureCallbackImpl(
                 String.format("Dis-association of monitorId %d with Interface %s", monitorId, interfaceName)),
                 MoreExecutors.directExecutor());
     }
@@ -1145,9 +1175,14 @@ public class AlivenessMonitor extends AbstractClusteredSyncDataTreeChangeListene
         EndpointType source = info.getSource().getEndpointType();
         String interfaceName = getInterfaceName(source);
         if (!Strings.isNullOrEmpty(interfaceName)) {
-            Optional<MonitorProfile> optProfile =
-                    SingleTransactionDataBroker.syncReadOptionalAndTreatReadFailedExceptionAsAbsentOptional(dataBroker,
-                            LogicalDatastoreType.OPERATIONAL, getMonitorProfileId(info.getProfileId()));
+            Optional<MonitorProfile> optProfile;
+            try {
+                optProfile = txRunner.applyInterruptiblyWithNewReadOnlyTransactionAndClose(OPERATIONAL,
+                    tx -> tx.read(getMonitorProfileId(info.getProfileId()))).get();
+            } catch (InterruptedException | ExecutionException e) {
+                LOG.error("Error reading monitor profile for {}", info.getProfileId(), e);
+                optProfile = Optional.empty();
+            }
             if (optProfile.isPresent()) {
                 MonitorProtocolType protocolType = optProfile.get().getProtocolType();
                 EndpointType destination = info.getDestination() != null ? info.getDestination().getEndpointType()
@@ -1185,10 +1220,10 @@ public class AlivenessMonitor extends AbstractClusteredSyncDataTreeChangeListene
         }
         final ReadWriteTransaction tx = dataBroker.newReadWriteTransaction();
 
-        ListenableFuture<Optional<MonitoringState>> readResult = tx.read(LogicalDatastoreType.OPERATIONAL,
+        FluentFuture<Optional<MonitoringState>> readResult = tx.read(LogicalDatastoreType.OPERATIONAL,
                 getMonitorStateId(monitorKey));
 
-        ListenableFuture<Void> writeResult = Futures.transformAsync(readResult, optState -> {
+        FluentFuture<? extends CommitInfo> writeResult = readResult.transformAsync(optState -> {
             if (optState.isPresent()) {
                 MonitoringState state = optState.get();
                 if (isValidStatus.apply(state.getStatus())) {
@@ -1203,20 +1238,20 @@ public class AlivenessMonitor extends AbstractClusteredSyncDataTreeChangeListene
                 LOG.warn("No associated monitoring state data available to update the status to {} for {}",
                            newStatus, monitorId);
             }
-            return tx.submit();
+            return tx.commit();
         }, MoreExecutors.directExecutor());
 
-        Futures.addCallback(writeResult, new FutureCallbackImpl(
+        writeResult.addCallback(new FutureCallbackImpl(
                 String.format("Monitor status update for %d to %s", monitorId, newStatus.toString())),
                 MoreExecutors.directExecutor());
     }
 
     private void resumeMonitoring(final long monitorId) {
-        final ReadOnlyTransaction tx = dataBroker.newReadOnlyTransaction();
-        ListenableFuture<Optional<MonitoringInfo>> readInfoResult = tx.read(LogicalDatastoreType.OPERATIONAL,
+        final ReadTransaction tx = dataBroker.newReadOnlyTransaction();
+        FluentFuture<Optional<MonitoringInfo>> readInfoResult = tx.read(LogicalDatastoreType.OPERATIONAL,
                 getMonitoringInfoId(monitorId));
 
-        Futures.addCallback(readInfoResult, new FutureCallback<Optional<MonitoringInfo>>() {
+        readInfoResult.addCallback(new FutureCallback<Optional<MonitoringInfo>>() {
 
             @Override
             public void onFailure(Throwable error) {
@@ -1293,9 +1328,14 @@ public class AlivenessMonitor extends AbstractClusteredSyncDataTreeChangeListene
     }
 
     private List<Long> getMonitorIds(String interfaceName) {
-        return SingleTransactionDataBroker.syncReadOptionalAndTreatReadFailedExceptionAsAbsentOptional(
-                dataBroker, LogicalDatastoreType.OPERATIONAL, getInterfaceMonitorMapId(interfaceName))
-                .toJavaUtil().map(InterfaceMonitorEntry::getMonitorIds).orElse(Collections.emptyList());
+        try {
+            return txRunner.applyInterruptiblyWithNewReadOnlyTransactionAndClose(OPERATIONAL,
+                tx -> tx.read(getInterfaceMonitorMapId(interfaceName))).get().map(
+                InterfaceMonitorEntry::getMonitorIds).orElse(Collections.emptyList());
+        } catch (InterruptedException | ExecutionException e) {
+            LOG.error("Error retrieving monitor ids for {}", interfaceName, e);
+            return Collections.emptyList();
+        }
     }
 
     //handle monitor stop
index ee7e9234b938288b492e6206f5c9ddb92a30aa46..435a22636aa5e0959410b92e234db73d29471d95 100644 (file)
@@ -8,31 +8,37 @@
 package org.opendaylight.genius.alivenessmonitor.internal;
 
 import static org.opendaylight.genius.alivenessmonitor.utils.AlivenessMonitorUtil.getMonitorStateId;
+import static org.opendaylight.mdsal.binding.util.Datastore.CONFIGURATION;
+import static org.opendaylight.mdsal.binding.util.Datastore.OPERATIONAL;
 
-import com.google.common.base.Optional;
 import com.google.common.util.concurrent.FutureCallback;
-import com.google.common.util.concurrent.Futures;
-import com.google.common.util.concurrent.ListenableFuture;
 import com.google.common.util.concurrent.MoreExecutors;
 import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
 import java.util.ArrayList;
 import java.util.Iterator;
 import java.util.List;
+import java.util.Optional;
+import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Semaphore;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
 import javax.annotation.Nonnull;
 import javax.inject.Inject;
 import javax.inject.Singleton;
-
 import org.apache.aries.blueprint.annotation.service.Reference;
-import org.opendaylight.controller.md.sal.binding.api.DataBroker;
-import org.opendaylight.controller.md.sal.binding.api.ReadWriteTransaction;
-import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
+import org.checkerframework.checker.nullness.qual.Nullable;
 import org.opendaylight.genius.alivenessmonitor.protocols.AlivenessProtocolHandler;
 import org.opendaylight.genius.alivenessmonitor.protocols.AlivenessProtocolHandlerRegistry;
-import org.opendaylight.genius.datastoreutils.SingleTransactionDataBroker;
-import org.opendaylight.genius.mdsalutil.MDSALUtil;
+import org.opendaylight.infrautils.utils.concurrent.LoggingFutures;
+import org.opendaylight.mdsal.binding.api.DataBroker;
+import org.opendaylight.mdsal.binding.util.Datastore;
+import org.opendaylight.mdsal.binding.util.InterruptibleCheckedConsumer;
+import org.opendaylight.mdsal.binding.util.ManagedNewTransactionRunner;
+import org.opendaylight.mdsal.binding.util.RetryingManagedNewTransactionRunner;
+import org.opendaylight.mdsal.binding.util.TypedReadWriteTransaction;
+import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
 import org.opendaylight.openflowplugin.libraries.liblldp.Packet;
-import org.opendaylight.serviceutils.tools.mdsal.listener.AbstractSyncDataTreeChangeListener;
+import org.opendaylight.serviceutils.tools.listener.AbstractSyncDataTreeChangeListener;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.alivenessmonitor.rev160411.LivenessState;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.alivenessmonitor.rev160411.MonitorProtocolType;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.alivenessmonitor.rev160411.endpoint.EndpointType;
@@ -73,16 +79,17 @@ public class HwVtepTunnelsStateHandler extends AbstractSyncDataTreeChangeListene
 
     private static final Logger LOG = LoggerFactory.getLogger(HwVtepTunnelsStateHandler.class);
 
-    private final DataBroker dataBroker;
+    private final ManagedNewTransactionRunner txRunner;
     private final AlivenessMonitor alivenessMonitor;
 
     @Inject
-    public HwVtepTunnelsStateHandler(@Reference final DataBroker dataBroker, final AlivenessMonitor alivenessMonitor,
+    public HwVtepTunnelsStateHandler(@Reference final DataBroker dataBroker,
+                                     final AlivenessMonitor alivenessMonitor,
                                      final AlivenessProtocolHandlerRegistry alivenessProtocolHandlerRegistry) {
         super(dataBroker, LogicalDatastoreType.CONFIGURATION,
               InstanceIdentifier.create(NetworkTopology.class).child(Topology.class).child(Node.class)
                       .augmentation(PhysicalSwitchAugmentation.class).child(Tunnels.class));
-        this.dataBroker = dataBroker;
+        this.txRunner = new RetryingManagedNewTransactionRunner(dataBroker);
         this.alivenessMonitor = alivenessMonitor;
         alivenessProtocolHandlerRegistry.register(MonitorProtocolType.Bfd, this);
     }
@@ -114,66 +121,42 @@ public class HwVtepTunnelsStateHandler extends AbstractSyncDataTreeChangeListene
         LOG.debug("Acquiring lock for monitor key : {} to process monitor DCN", monitorKey);
         alivenessMonitor.acquireLock(lock);
 
-        final ReadWriteTransaction tx = dataBroker.newReadWriteTransaction();
-
-        ListenableFuture<Optional<MonitoringState>> stateResult = tx.read(LogicalDatastoreType.OPERATIONAL,
-                getMonitorStateId(monitorKey));
-        Futures.addCallback(stateResult, new FutureCallback<Optional<MonitoringState>>() {
-
+        AtomicBoolean stateChanged = new AtomicBoolean();
+        AtomicReference<MonitoringState> currentState = new AtomicReference<>();
+        txRunner.callWithNewReadWriteTransactionAndSubmit(OPERATIONAL, tx -> {
+            Optional<MonitoringState> optState = tx.read(getMonitorStateId(monitorKey)).get();
+            if (optState.isPresent()) {
+                currentState.set(optState.get());
+                if (currentState.get().getState() == newTunnelOpState) {
+                    return;
+                }
+                stateChanged.set(true);
+                final MonitoringState state = new MonitoringStateBuilder().setMonitorKey(monitorKey)
+                    .setState(newTunnelOpState).build();
+                tx.merge(getMonitorStateId(monitorKey), state);
+            } else {
+                LOG.warn("Monitoring State not available for key: {} to process the Packet received", monitorKey);
+            }
+        }).addCallback(new FutureCallback<Object>() {
             @Override
-            public void onSuccess(@Nonnull Optional<MonitoringState> optState) {
-                if (optState.isPresent()) {
-                    final MonitoringState currentState = optState.get();
-                    if (currentState.getState() == newTunnelOpState) {
-                        return;
-                    }
-                    final boolean stateChanged = true;
-                    final MonitoringState state = new MonitoringStateBuilder().setMonitorKey(monitorKey)
-                            .setState(newTunnelOpState).build();
-                    tx.merge(LogicalDatastoreType.OPERATIONAL, getMonitorStateId(monitorKey), state);
-                    ListenableFuture<Void> writeResult = tx.submit();
-                    // WRITE Callback
-                    Futures.addCallback(writeResult, new FutureCallback<Void>() {
-
-                        @Override
-                        public void onSuccess(Void arg0) {
-                            alivenessMonitor.releaseLock(lock);
-                            if (stateChanged) {
-                                // send notifications
-                                LOG.info("Sending notification for monitor Id : {} with Current State: {}",
-                                        currentState.getMonitorId(), newTunnelOpState);
-                                alivenessMonitor.publishNotification(currentState.getMonitorId(), newTunnelOpState);
-                            } else {
-                                if (LOG.isTraceEnabled()) {
-                                    LOG.trace("Successful in writing monitoring state {} to ODS", state);
-                                }
-                            }
-                        }
-
-                        @Override
-                        public void onFailure(@Nonnull Throwable error) {
-                            alivenessMonitor.releaseLock(lock);
-                            LOG.warn("Error in writing monitoring state : {} to Datastore", monitorKey, error);
-                            if (LOG.isTraceEnabled()) {
-                                LOG.trace("Error in writing monitoring state: {} to Datastore", state);
-                            }
-                        }
-                    }, MoreExecutors.directExecutor());
+            public void onSuccess(@Nullable Object result) {
+                alivenessMonitor.releaseLock(lock);
+                if (stateChanged.get()) {
+                    // send notifications
+                    LOG.info("Sending notification for monitor Id : {} with Current State: {}",
+                        currentState.get().getMonitorId(), newTunnelOpState);
+                    alivenessMonitor.publishNotification(currentState.get().getMonitorId(), newTunnelOpState);
                 } else {
-                    LOG.warn("Monitoring State not available for key: {} to process the Packet received", monitorKey);
-                    // Complete the transaction
-                    tx.submit();
-                    alivenessMonitor.releaseLock(lock);
+                    if (LOG.isTraceEnabled()) {
+                        LOG.trace("Successful in writing monitoring state {} to ODS", newTunnelOpState);
+                    }
                 }
             }
 
             @Override
             public void onFailure(@Nonnull Throwable error) {
-                LOG.error("Error when reading Monitoring State for key: {} to process the Packet received", monitorKey,
-                        error);
-                // FIXME: Not sure if the transaction status is valid to cancel
-                tx.cancel();
                 alivenessMonitor.releaseLock(lock);
+                LOG.warn("Error in writing monitoring state for {} to Datastore", monitorKey, error);
             }
         }, MoreExecutors.directExecutor());
     }
@@ -213,44 +196,52 @@ public class HwVtepTunnelsStateHandler extends AbstractSyncDataTreeChangeListene
         return null;
     }
 
-    // tunnelKey, nodeId, topologyId are initialized to null and immediately passed to getTunnelIdentifier which
-    // FindBugs as a "Load of known null value" violation. Not sure sure what the intent...
-    @SuppressFBWarnings("NP_LOAD_OF_KNOWN_NULL_VALUE")
     void resetMonitoringTask(boolean isEnable) {
         // TODO: get the corresponding hwvtep tunnel from the sourceInterface
         // once InterfaceMgr implements renderer for HWVTEP VXLAN tunnels
 
-        // tunnelKey, nodeId, topologyId are initialized to null and immediately passed to getTunnelIdentifier which
-        // FindBugs flags as a "Load of known null value" violation. Not sure sure what the intent...
-        TunnelsKey tunnelKey = null;
-        String nodeId = null;
-        String topologyId = null;
-        Optional<Tunnels> tunnelsOptional =
-                SingleTransactionDataBroker.syncReadOptionalAndTreatReadFailedExceptionAsAbsentOptional(dataBroker,
-                        LogicalDatastoreType.CONFIGURATION, getTunnelIdentifier(topologyId, nodeId, tunnelKey));
-        if (!tunnelsOptional.isPresent()) {
-            LOG.warn("Tunnel {} is not present on the Node {}. So not disabling the BFD monitoring", tunnelKey, nodeId);
-            return;
-        }
-        Tunnels tunnel = tunnelsOptional.get();
-        List<BfdParams> tunnelBfdParams = tunnel.getBfdParams();
-        if (tunnelBfdParams == null || tunnelBfdParams.isEmpty()) {
-            LOG.debug("there is no bfd params available for the tunnel {}", tunnel);
-            return;
-        }
+        LoggingFutures.addErrorLogging(txRunner.callWithNewReadWriteTransactionAndSubmit(CONFIGURATION,
+            new InterruptibleCheckedConsumer<TypedReadWriteTransaction<Datastore.Configuration>, ExecutionException>() {
+                @Override
+                // tunnelKey, nodeId, topologyId are initialized to null and immediately passed to
+                // getTunnelIdentifier which FindBugs as a "Load of known null value" violation. Not sure sure what
+                // the intent...
+                @SuppressFBWarnings("NP_LOAD_OF_KNOWN_NULL_VALUE")
+                public void accept(TypedReadWriteTransaction<Datastore.Configuration> tx)
+                    throws ExecutionException, InterruptedException {
+                    TunnelsKey tunnelKey = null;
+                    String nodeId = null;
+                    String topologyId = null;
+                    Optional<Tunnels> tunnelsOptional =
+                        tx.read(getTunnelIdentifier(topologyId, nodeId, tunnelKey)).get();
+                    if (!tunnelsOptional.isPresent()) {
+                        LOG.warn("Tunnel {} is not present on the Node {}. So not disabling the BFD monitoring",
+                            tunnelKey,
+                            nodeId);
+                        return;
+                    }
+                    Tunnels tunnel = tunnelsOptional.get();
+                    List<BfdParams> tunnelBfdParams = tunnel.getBfdParams();
+                    if (tunnelBfdParams == null || tunnelBfdParams.isEmpty()) {
+                        LOG.debug("there is no bfd params available for the tunnel {}", tunnel);
+                        return;
+                    }
 
-        Iterator<BfdParams> tunnelBfdParamsIterator = tunnelBfdParams.iterator();
-        while (tunnelBfdParamsIterator.hasNext()) {
-            BfdParams bfdParam = tunnelBfdParamsIterator.next();
-            if (AlivenessMonitorConstants.BFD_PARAM_ENABLE.equals(bfdParam.getBfdParamKey())) {
-                tunnelBfdParamsIterator.remove();
-                break;
-            }
-        }
-        setBfdParamForEnable(tunnelBfdParams, isEnable);
-        Tunnels tunnelWithBfdReset = new TunnelsBuilder().withKey(tunnelKey).setBfdParams(tunnelBfdParams).build();
-        MDSALUtil.syncUpdate(dataBroker, LogicalDatastoreType.CONFIGURATION,
-                getTunnelIdentifier(topologyId, nodeId, tunnelKey), tunnelWithBfdReset);
+                    Iterator<BfdParams> tunnelBfdParamsIterator = tunnelBfdParams.iterator();
+                    while (tunnelBfdParamsIterator.hasNext()) {
+                        BfdParams bfdParam = tunnelBfdParamsIterator.next();
+                        if (AlivenessMonitorConstants.BFD_PARAM_ENABLE.equals(bfdParam.getBfdParamKey())) {
+                            tunnelBfdParamsIterator.remove();
+                            break;
+                        }
+                    }
+                    HwVtepTunnelsStateHandler.this.setBfdParamForEnable(tunnelBfdParams, isEnable);
+                    Tunnels tunnelWithBfdReset =
+                        new TunnelsBuilder().withKey(tunnelKey).setBfdParams(tunnelBfdParams).build();
+                    tx.mergeParentStructureMerge(
+                        getTunnelIdentifier(topologyId, nodeId, tunnelKey), tunnelWithBfdReset);
+                }
+            }), LOG, "Error resetting monitoring task");
     }
 
     @Override
@@ -265,7 +256,7 @@ public class HwVtepTunnelsStateHandler extends AbstractSyncDataTreeChangeListene
         }
         MonitorProfile profile;
         long profileId = monitorInfo.getProfileId();
-        Optional<MonitorProfile> optProfile = alivenessMonitor.getMonitorProfile(profileId);
+        java.util.Optional<MonitorProfile> optProfile = alivenessMonitor.getMonitorProfile(profileId);
         if (optProfile.isPresent()) {
             profile = optProfile.get();
         } else {
@@ -299,9 +290,10 @@ public class HwVtepTunnelsStateHandler extends AbstractSyncDataTreeChangeListene
         // be done as part of tunnel add DCN handling.
         String topologyId = "";
         String nodeId = "";
-        MDSALUtil.syncUpdate(dataBroker, LogicalDatastoreType.CONFIGURATION,
+        LoggingFutures.addErrorLogging(
+            txRunner.callWithNewWriteOnlyTransactionAndSubmit(CONFIGURATION, tx -> tx.mergeParentStructureMerge(
                 getTunnelIdentifier(topologyId, nodeId, new TunnelsKey(/*localRef*/ null, /*remoteRef*/ null)),
-                tunnelWithBfd);
+                tunnelWithBfd)), LOG, "Error starting a monitoring task");
     }
 
     private void fillBfdRemoteConfigs(List<BfdRemoteConfigs> bfdRemoteConfigs, String tunnelRemoteMacAddress) {