package org.opendaylight.genius.alivenessmonitor.protocols.internal;
+import static org.opendaylight.mdsal.binding.util.Datastore.OPERATIONAL;
+
import com.google.common.base.Optional;
import com.google.common.base.Strings;
import com.google.common.primitives.UnsignedBytes;
-import org.opendaylight.controller.md.sal.binding.api.DataBroker;
-import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
-import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
+import java.util.concurrent.ExecutionException;
import org.opendaylight.genius.alivenessmonitor.protocols.AlivenessProtocolHandler;
import org.opendaylight.genius.alivenessmonitor.protocols.AlivenessProtocolHandlerRegistry;
-import org.opendaylight.genius.datastoreutils.SingleTransactionDataBroker;
+import org.opendaylight.mdsal.binding.api.DataBroker;
+import org.opendaylight.mdsal.binding.util.ManagedNewTransactionRunner;
+import org.opendaylight.mdsal.binding.util.ManagedNewTransactionRunnerImpl;
+import org.opendaylight.mdsal.common.api.ReadFailedException;
import org.opendaylight.openflowplugin.libraries.liblldp.Packet;
import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.interfaces.rev140508.InterfacesState;
import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.interfaces.rev140508.interfaces.state.Interface;
// private static final Logger LOG = LoggerFactory.getLogger(AbstractAlivenessProtocolHandler.class);
- private final SingleTransactionDataBroker singleTxDataBroker;
+ private final ManagedNewTransactionRunner txRunner;
AbstractAlivenessProtocolHandler(
final DataBroker dataBroker,
final AlivenessProtocolHandlerRegistry alivenessProtocolHandlerRegistry,
final MonitorProtocolType protocolType) {
- this.singleTxDataBroker = new SingleTransactionDataBroker(dataBroker);
+ this.txRunner = new ManagedNewTransactionRunnerImpl(dataBroker);
alivenessProtocolHandlerRegistry.register(protocolType, this);
}
InstanceIdentifier<org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.interfaces.rev140508.interfaces
.state.Interface> id = idBuilder.build();
- return singleTxDataBroker.syncRead(LogicalDatastoreType.OPERATIONAL, id);
+ try {
+ return txRunner.applyWithNewReadWriteTransactionAndSubmit(OPERATIONAL,
+ input -> input.read(id).get().orElse(null)).get();
+ } catch (ExecutionException | InterruptedException e) {
+ throw new ReadFailedException("Error", e);
+ }
}
// @formatter:on
import javax.inject.Singleton;
import org.apache.aries.blueprint.annotation.service.Reference;
-import org.opendaylight.controller.md.sal.binding.api.DataBroker;
import org.opendaylight.genius.alivenessmonitor.protocols.AlivenessProtocolHandlerRegistry;
import org.opendaylight.genius.alivenessmonitor.utils.AlivenessMonitorUtil;
import org.opendaylight.genius.mdsalutil.MetaDataUtil;
import org.opendaylight.genius.mdsalutil.NWUtil;
import org.opendaylight.genius.mdsalutil.packet.ARP;
+import org.opendaylight.mdsal.binding.api.DataBroker;
import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.inet.types.rev130715.IpAddressBuilder;
import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.yang.types.rev130715.PhysAddress;
import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.alivenessmonitor.rev160411.MonitorProtocolType;
import org.apache.aries.blueprint.annotation.service.Reference;
import org.apache.commons.lang3.StringUtils;
-import org.opendaylight.controller.md.sal.binding.api.DataBroker;
import org.opendaylight.genius.alivenessmonitor.protocols.AlivenessProtocolHandlerRegistry;
import org.opendaylight.genius.alivenessmonitor.utils.AlivenessMonitorUtil;
import org.opendaylight.genius.ipv6util.api.Ipv6Util;
import org.opendaylight.genius.ipv6util.api.decoders.Ipv6NaDecoder;
import org.opendaylight.genius.mdsalutil.MetaDataUtil;
import org.opendaylight.genius.mdsalutil.packet.utils.PacketUtil;
+import org.opendaylight.mdsal.binding.api.DataBroker;
import org.opendaylight.openflowplugin.libraries.liblldp.BufferException;
import org.opendaylight.openflowplugin.libraries.liblldp.Packet;
import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.inet.types.rev130715.Ipv6Address;
import org.apache.aries.blueprint.annotation.service.Reference;
import org.apache.commons.lang3.StringUtils;
-import org.opendaylight.controller.md.sal.binding.api.DataBroker;
-import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
import org.opendaylight.genius.alivenessmonitor.protocols.AlivenessProtocolHandlerRegistry;
import org.opendaylight.genius.interfacemanager.globals.IfmConstants;
import org.opendaylight.genius.mdsalutil.ActionInfo;
import org.opendaylight.genius.mdsalutil.actions.ActionOutput;
import org.opendaylight.genius.mdsalutil.actions.ActionSetFieldTunnelId;
import org.opendaylight.genius.mdsalutil.packet.Ethernet;
+import org.opendaylight.mdsal.binding.api.DataBroker;
+import org.opendaylight.mdsal.common.api.ReadFailedException;
import org.opendaylight.openflowplugin.libraries.liblldp.EtherTypes;
import org.opendaylight.openflowplugin.libraries.liblldp.LLDP;
import org.opendaylight.openflowplugin.libraries.liblldp.LLDPTLV;
* terms of the Eclipse Public License v1.0 which accompanies this distribution,
* and is available at http://www.eclipse.org/legal/epl-v10.html
*/
-package org.opendaylight.controller.alivenessmonitor.protocols.test;
+package org.opendaylight.genius.alivenessmonitor.protocols.test;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.mockito.Matchers.eq;
import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.doReturn;
-import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
-import com.google.common.base.Optional;
-import com.google.common.util.concurrent.CheckedFuture;
+import com.google.common.util.concurrent.FluentFuture;
import com.google.common.util.concurrent.Futures;
import java.util.Arrays;
+import java.util.Optional;
import java.util.concurrent.ExecutionException;
import org.hamcrest.CoreMatchers;
import org.hamcrest.Description;
import org.mockito.Matchers;
import org.mockito.Mock;
import org.mockito.MockitoAnnotations;
-import org.opendaylight.controller.md.sal.binding.api.DataBroker;
import org.opendaylight.controller.md.sal.binding.api.NotificationPublishService;
import org.opendaylight.controller.md.sal.binding.api.NotificationService;
-import org.opendaylight.controller.md.sal.binding.api.ReadOnlyTransaction;
-import org.opendaylight.controller.md.sal.binding.api.ReadWriteTransaction;
-import org.opendaylight.controller.md.sal.binding.api.WriteTransaction;
-import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
-import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
import org.opendaylight.genius.alivenessmonitor.internal.AlivenessMonitor;
import org.opendaylight.genius.alivenessmonitor.protocols.AlivenessProtocolHandler;
import org.opendaylight.genius.alivenessmonitor.protocols.AlivenessProtocolHandlerRegistry;
import org.opendaylight.genius.alivenessmonitor.protocols.impl.AlivenessProtocolHandlerRegistryImpl;
import org.opendaylight.genius.alivenessmonitor.protocols.internal.AlivenessProtocolHandlerARP;
import org.opendaylight.genius.alivenessmonitor.protocols.internal.AlivenessProtocolHandlerLLDP;
+import org.opendaylight.mdsal.binding.api.DataBroker;
+import org.opendaylight.mdsal.binding.api.ReadTransaction;
+import org.opendaylight.mdsal.binding.api.ReadWriteTransaction;
+import org.opendaylight.mdsal.binding.api.WriteTransaction;
+import org.opendaylight.mdsal.common.api.CommitInfo;
+import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.inet.types.rev130715.IpAddressBuilder;
import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.alivenessmonitor.rev160411.MonitorPauseInput;
import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.alivenessmonitor.rev160411.MonitorPauseInputBuilder;
import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.idmanager.rev160406.ReleaseIdOutput;
import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.interfacemanager.rpcs.rev160406.OdlInterfaceRpcService;
import org.opendaylight.yang.gen.v1.urn.opendaylight.packet.service.rev130709.PacketProcessingService;
+import org.opendaylight.yangtools.util.concurrent.FluentFutures;
import org.opendaylight.yangtools.yang.binding.DataObject;
import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
import org.opendaylight.yangtools.yang.common.RpcError;
private AlivenessProtocolHandler lldpHandler;
private long mockId;
@Mock
- private ReadOnlyTransaction readTx;
+ private ReadTransaction readTx;
@Mock
private WriteTransaction writeTx;
@Mock
doReturn(readWriteTx).when(dataBroker).newReadWriteTransaction();
doNothing().when(writeTx).put(eq(LogicalDatastoreType.OPERATIONAL),
any(InstanceIdentifier.class), any(DataObject.class));
- doReturn(Futures.immediateCheckedFuture(null)).when(writeTx).submit();
- doReturn(Futures.immediateCheckedFuture(null)).when(readWriteTx)
- .submit();
+ doReturn(Futures.immediateCheckedFuture(null)).when(writeTx).commit();
+ doReturn(Futures.immediateCheckedFuture(null)).when(readWriteTx).commit();
}
@After
.setMonitorInterval(10000L).setMonitorWindow(10L)
.setProtocolType(MonitorProtocolType.Arp).build())
.build();
- doReturn(Futures.immediateCheckedFuture(Optional.absent()))
+ doReturn(Futures.immediateCheckedFuture(Optional.empty()))
.when(readWriteTx).read(eq(LogicalDatastoreType.OPERATIONAL),
argThat(isType(MonitorProfile.class)));
- doReturn(Futures.immediateCheckedFuture(null)).when(readWriteTx)
- .submit();
+ doReturn(Futures.immediateCheckedFuture(null)).when(readWriteTx).commit();
RpcResult<MonitorProfileCreateOutput> output = alivenessMonitor
.monitorProfileCreate(input).get();
assertTrue("Monitor Profile Create result", output.isSuccessful());
.setMonitorInterval(10000L).setMonitorWindow(10L)
.setProtocolType(MonitorProtocolType.Arp).build())
.build();
- @SuppressWarnings("unchecked")
- Optional<MonitorProfile> optionalProfile = mock(Optional.class);
- CheckedFuture<Optional<MonitorProfile>, ReadFailedException> proFuture = Futures
- .immediateCheckedFuture(optionalProfile);
- doReturn(true).when(optionalProfile).isPresent();
- doReturn(proFuture).when(readWriteTx).read(
+ doReturn(FluentFutures.immediateFluentFuture(Optional.of(input))).when(readWriteTx).read(
eq(LogicalDatastoreType.OPERATIONAL),
argThat(isType(MonitorProfile.class)));
RpcResult<MonitorProfileCreateOutput> output = alivenessMonitor
.setMode(MonitoringMode.OneOne)
.setProfileId(profileId).build())
.build();
- Optional<MonitorProfile> optionalProfile = Optional
- .of(getTestMonitorProfile());
- CheckedFuture<Optional<MonitorProfile>, ReadFailedException> proFuture = Futures
- .immediateCheckedFuture(optionalProfile);
+ Optional<MonitorProfile> optionalProfile = Optional.of(getTestMonitorProfile());
+ FluentFuture<Optional<MonitorProfile>> proFuture =
+ FluentFutures.immediateFluentFuture(optionalProfile);
when(readTx.read(eq(LogicalDatastoreType.OPERATIONAL),
argThat(isType(MonitorProfile.class)))).thenReturn(proFuture);
- CheckedFuture<Optional<MonitoringInfo>, ReadFailedException> outFuture = Futures
- .immediateCheckedFuture(Optional.<MonitoringInfo>absent());
+ FluentFuture<Optional<MonitoringInfo>> outFuture = FluentFutures
+ .immediateFluentFuture(Optional.<MonitoringInfo>empty());
when(readTx.read(eq(LogicalDatastoreType.OPERATIONAL),
argThat(isType(MonitoringInfo.class)))).thenReturn(outFuture);
RpcResult<MonitorStartOutput> output = alivenessMonitor
.of(getTestMonitorProfile());
when(readTx.read(eq(LogicalDatastoreType.OPERATIONAL),
argThat(isType(MonitorProfile.class))))
- .thenReturn(Futures.immediateCheckedFuture(optProfile));
+ .thenReturn(FluentFutures.immediateFluentFuture(optProfile));
Optional<MonitoringInfo> optInfo = Optional.of(
new MonitoringInfoBuilder().setId(2L).setProfileId(1L).build());
when(readTx.read(eq(LogicalDatastoreType.OPERATIONAL),
argThat(isType(MonitoringInfo.class))))
- .thenReturn(Futures.immediateCheckedFuture(optInfo));
+ .thenReturn(FluentFutures.immediateFluentFuture(optInfo));
Optional<MonitoringState> optState = Optional
.of(new MonitoringStateBuilder()
.setStatus(MonitorStatus.Started).build());
when(readWriteTx.read(eq(LogicalDatastoreType.OPERATIONAL),
argThat(isType(MonitoringState.class))))
- .thenReturn(Futures.immediateCheckedFuture(optState));
+ .thenReturn(FluentFutures.immediateFluentFuture(optState));
Optional<MonitoridKeyEntry> optMap = Optional
.of(new MonitoridKeyEntryBuilder().setMonitorId(2L)
.setMonitorKey("Test monitor Key").build());
when(readTx.read(eq(LogicalDatastoreType.OPERATIONAL),
argThat(isType(MonitoridKeyEntry.class))))
- .thenReturn(Futures.immediateCheckedFuture(optMap));
+ .thenReturn(FluentFutures.immediateFluentFuture(optMap));
alivenessMonitor.monitorPause(input).get();
verify(readWriteTx).merge(eq(LogicalDatastoreType.OPERATIONAL),
argThat(isType(MonitoringState.class)), stateCaptor.capture());
.build());
when(readWriteTx.read(eq(LogicalDatastoreType.OPERATIONAL),
argThat(isType(MonitoringState.class))))
- .thenReturn(Futures.immediateCheckedFuture(optState));
+ .thenReturn(FluentFutures.immediateFluentFuture(optState));
Optional<MonitoringInfo> optInfo = Optional.of(
new MonitoringInfoBuilder().setId(2L).setProfileId(1L).build());
when(readTx.read(eq(LogicalDatastoreType.OPERATIONAL),
argThat(isType(MonitoringInfo.class))))
- .thenReturn(Futures.immediateCheckedFuture(optInfo));
+ .thenReturn(FluentFutures.immediateFluentFuture(optInfo));
Optional<MonitorProfile> optProfile = Optional
.of(getTestMonitorProfile());
when(readTx.read(eq(LogicalDatastoreType.OPERATIONAL),
argThat(isType(MonitorProfile.class))))
- .thenReturn(Futures.immediateCheckedFuture(optProfile));
+ .thenReturn(FluentFutures.immediateFluentFuture(optProfile));
Optional<MonitoridKeyEntry> optMap = Optional
.of(new MonitoridKeyEntryBuilder().setMonitorId(2L)
.setMonitorKey("Test monitor Key").build());
when(readTx.read(eq(LogicalDatastoreType.OPERATIONAL),
argThat(isType(MonitoridKeyEntry.class))))
- .thenReturn(Futures.immediateCheckedFuture(optMap));
+ .thenReturn(FluentFutures.immediateFluentFuture(optMap));
RpcResult<MonitorUnpauseOutput> result = alivenessMonitor.monitorUnpause(input).get();
verify(readWriteTx).merge(eq(LogicalDatastoreType.OPERATIONAL),
argThat(isType(MonitoringState.class)), stateCaptor.capture());
.setEndpointType(
getInterface("testInterface", "10.1.1.1"))
.build()).build());
- CheckedFuture<Optional<MonitoringInfo>, ReadFailedException> outFuture = Futures
- .immediateCheckedFuture(optInfo);
+ FluentFuture<Optional<MonitoringInfo>> outFuture = FluentFutures.immediateFluentFuture(optInfo);
when(readTx.read(eq(LogicalDatastoreType.OPERATIONAL),
argThat(isType(MonitoringInfo.class)))).thenReturn(outFuture);
Optional<MonitoridKeyEntry> optMap = Optional
.setMonitorKey("Test monitor Key").build());
when(readTx.read(eq(LogicalDatastoreType.OPERATIONAL),
argThat(isType(MonitoridKeyEntry.class))))
- .thenReturn(Futures.immediateCheckedFuture(optMap));
+ .thenReturn(FluentFutures.immediateFluentFuture(optMap));
Optional<MonitorProfile> optProfile = Optional
.of(getTestMonitorProfile());
when(readTx.read(eq(LogicalDatastoreType.OPERATIONAL),
argThat(isType(MonitorProfile.class))))
- .thenReturn(Futures.immediateCheckedFuture(optProfile));
+ .thenReturn(FluentFutures.immediateFluentFuture(optProfile));
Optional<InterfaceMonitorEntry> optEntry = Optional
.of(getInterfaceMonitorEntry());
when(readWriteTx.read(eq(LogicalDatastoreType.OPERATIONAL),
argThat(isType(InterfaceMonitorEntry.class))))
- .thenReturn(Futures.immediateCheckedFuture(optEntry));
+ .thenReturn(FluentFutures.immediateFluentFuture(optEntry));
RpcResult<MonitorStopOutput> result = alivenessMonitor.monitorStop(input).get();
verify(idManager).releaseId(any(ReleaseIdInput.class));
verify(writeTx, times(3)).delete(eq(LogicalDatastoreType.OPERATIONAL),
.of(getTestMonitorProfile());
when(readWriteTx.read(eq(LogicalDatastoreType.OPERATIONAL),
argThat(isType(MonitorProfile.class))))
- .thenReturn(Futures.immediateCheckedFuture(optProfile));
+ .thenReturn(FluentFutures.immediateFluentFuture(optProfile));
RpcResult<MonitorProfileDeleteOutput> result = alivenessMonitor.monitorProfileDelete(input)
.get();
verify(idManager).releaseId(any(ReleaseIdInput.class));
.setMonitorInterval(10000L).setMonitorWindow(10L)
.setProtocolType(MonitorProtocolType.Arp).build())
.build();
- doReturn(Futures.immediateCheckedFuture(Optional.absent()))
+ doReturn(FluentFutures.immediateFluentFuture(Optional.empty()))
.when(readWriteTx).read(eq(LogicalDatastoreType.OPERATIONAL),
any(InstanceIdentifier.class));
- doReturn(Futures.immediateCheckedFuture(null)).when(readWriteTx)
- .submit();
+ doReturn(CommitInfo.emptyFluentFuture()).when(readWriteTx).commit();
RpcResult<MonitorProfileCreateOutput> output = alivenessMonitor
.monitorProfileCreate(input).get();
return output.getResult().getProfileId();
</dependency>
<dependency>
<groupId>org.opendaylight.serviceutils</groupId>
- <artifactId>tools-api</artifactId>
+ <artifactId>listener-api</artifactId>
</dependency>
<dependency>
<groupId>org.opendaylight.genius</groupId>
<groupId>org.opendaylight.controller.model</groupId>
<artifactId>model-inventory</artifactId>
</dependency>
+ <dependency>
+ <groupId>org.opendaylight.mdsal</groupId>
+ <artifactId>mdsal-binding-util</artifactId>
+ </dependency>
<dependency>
<groupId>org.opendaylight.mdsal.binding.model.ietf</groupId>
<artifactId>rfc7223</artifactId>
<artifactId>ipv6util-api</artifactId>
<version>${project.version}</version>
</dependency>
- <dependency>
- <groupId>org.opendaylight.controller</groupId>
- <artifactId>sal-binding-broker-impl</artifactId>
- </dependency>
<dependency>
<groupId>javax.inject</groupId>
<artifactId>javax.inject</artifactId>
import static org.opendaylight.genius.alivenessmonitor.utils.AlivenessMonitorUtil.getMonitorProfileId;
import static org.opendaylight.genius.alivenessmonitor.utils.AlivenessMonitorUtil.getMonitorStateId;
import static org.opendaylight.genius.alivenessmonitor.utils.AlivenessMonitorUtil.getMonitoringInfoId;
-import static org.opendaylight.genius.infra.Datastore.OPERATIONAL;
+import static org.opendaylight.mdsal.binding.util.Datastore.OPERATIONAL;
-import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
import com.google.common.base.Predicate;
import com.google.common.base.Strings;
import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache;
+import com.google.common.util.concurrent.FluentFuture;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
+import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutionException;
import javax.inject.Singleton;
import org.apache.aries.blueprint.annotation.service.Reference;
-import org.opendaylight.controller.md.sal.binding.api.DataBroker;
import org.opendaylight.controller.md.sal.binding.api.NotificationPublishService;
-import org.opendaylight.controller.md.sal.binding.api.ReadOnlyTransaction;
-import org.opendaylight.controller.md.sal.binding.api.ReadWriteTransaction;
-import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
import org.opendaylight.genius.alivenessmonitor.protocols.AlivenessProtocolHandler;
import org.opendaylight.genius.alivenessmonitor.protocols.AlivenessProtocolHandlerRegistry;
import org.opendaylight.genius.alivenessmonitor.utils.AlivenessMonitorUtil;
-import org.opendaylight.genius.datastoreutils.SingleTransactionDataBroker;
-import org.opendaylight.genius.infra.ManagedNewTransactionRunner;
-import org.opendaylight.genius.infra.ManagedNewTransactionRunnerImpl;
import org.opendaylight.genius.mdsalutil.packet.Ethernet;
import org.opendaylight.genius.mdsalutil.packet.utils.PacketUtil;
import org.opendaylight.infrautils.utils.concurrent.Executors;
+import org.opendaylight.mdsal.binding.api.DataBroker;
+import org.opendaylight.mdsal.binding.api.ReadTransaction;
+import org.opendaylight.mdsal.binding.api.ReadWriteTransaction;
+import org.opendaylight.mdsal.binding.util.Datastore.Operational;
+import org.opendaylight.mdsal.binding.util.ManagedNewTransactionRunner;
+import org.opendaylight.mdsal.binding.util.ManagedNewTransactionRunnerImpl;
+import org.opendaylight.mdsal.common.api.CommitInfo;
+import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
import org.opendaylight.openflowplugin.libraries.liblldp.Packet;
import org.opendaylight.openflowplugin.libraries.liblldp.PacketException;
-import org.opendaylight.serviceutils.tools.mdsal.listener.AbstractClusteredSyncDataTreeChangeListener;
+import org.opendaylight.serviceutils.tools.listener.AbstractClusteredSyncDataTreeChangeListener;
import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.alivenessmonitor.rev160411.AlivenessMonitorService;
import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.alivenessmonitor.rev160411.LivenessState;
import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.alivenessmonitor.rev160411.MonitorEvent;
monitorIdKeyCache = CacheBuilder.newBuilder().build(new CacheLoader<Long, String>() {
@Override
public String load(@Nonnull Long monitorId) {
- return SingleTransactionDataBroker.syncReadOptionalAndTreatReadFailedExceptionAsAbsentOptional(
- dataBroker, LogicalDatastoreType.OPERATIONAL, getMonitorMapId(monitorId))
- .toJavaUtil().map(MonitoridKeyEntry::getMonitorKey).orElse(null);
+ try {
+ return txRunner.<Operational, ExecutionException, Optional<MonitoridKeyEntry>>
+ applyInterruptiblyWithNewReadOnlyTransactionAndClose(OPERATIONAL,
+ tx -> tx.read(getMonitorMapId(monitorId)).get()).map(MonitoridKeyEntry::getMonitorKey)
+ .orElse(null);
+ } catch (InterruptedException | ExecutionException e) {
+ LOG.error("Error reading monitor {}", monitorId, e);
+ return null;
+ }
}
});
LOG.trace("Monitoring State not available for key: {} to process the Packet received",
monitorKey);
}
- return Optional.<Result>absent();
+ return Optional.<Result>empty();
}
}).addCallback(new FutureCallback<Optional<Result>>() {
@Override
public void onSuccess(Optional<Result> optResult) {
releaseLock(lock);
- optResult.toJavaUtil().ifPresent(result -> {
+ optResult.ifPresent(result -> {
final boolean stateChanged = result.currentState.getState() == LivenessState.Down
|| result.currentState.getState() == LivenessState.Unknown;
if (stateChanged) {
}
Optional<MonitorProfile> optProfile =
- SingleTransactionDataBroker.syncReadOptionalAndTreatReadFailedExceptionAsAbsentOptional(
- dataBroker, LogicalDatastoreType.OPERATIONAL, getMonitorProfileId(profileId));
+ txRunner.<Operational, ExecutionException, Optional<MonitorProfile>>
+ applyInterruptiblyWithNewReadOnlyTransactionAndClose(OPERATIONAL,
+ tx -> tx.read(getMonitorProfileId(profileId)).get());
final MonitorProfile profile;
if (!optProfile.isPresent()) {
String errMsg = String.format("No monitoring profile associated with Id: %d", profileId);
String idKey = getUniqueKey(interfaceName, protocolType.toString(), srcEndpointType, destEndpointType);
final long monitorId = getUniqueId(idKey);
Optional<MonitoringInfo> optKey =
- SingleTransactionDataBroker.syncReadOptionalAndTreatReadFailedExceptionAsAbsentOptional(
- dataBroker, LogicalDatastoreType.OPERATIONAL, getMonitoringInfoId(monitorId));
+ txRunner.<Operational, ExecutionException, Optional<MonitoringInfo>>
+ applyInterruptiblyWithNewReadOnlyTransactionAndClose(OPERATIONAL,
+ tx -> tx.read(getMonitoringInfoId(monitorId)).get());
final AlivenessProtocolHandler<?> handler;
if (optKey.isPresent()) {
String message = String.format(
.setMonitorKey(monitoringKey).build();
operTx.put(getMonitorMapId(monitorId), mapEntry, CREATE_MISSING_PARENT);
LOG.debug("adding oper map entry {}", mapEntry);
- }).addCallback(new FutureCallback<Void>() {
+ }).addCallback(new FutureCallback<Object>() {
@Override
public void onFailure(Throwable error) {
String errorMsg = String.format("Adding Monitoring info: %s in Datastore failed",
}
@Override
- public void onSuccess(Void ignored) {
+ public void onSuccess(Object ignored) {
lockMap.put(monitoringKey, new Semaphore(1, true));
if (protocolType == MonitorProtocolType.Bfd) {
handler.startMonitoringTask(monitoringInfo);
MonitorStartOutput output = new MonitorStartOutputBuilder().setMonitorId(monitorId).build();
rpcResultBuilder = RpcResultBuilder.success(output);
- } catch (UnsupportedConfigException e) {
+ } catch (UnsupportedConfigException | ExecutionException | InterruptedException e) {
LOG.error("Start Monitoring Failed. ", e);
rpcResultBuilder = RpcResultBuilder.<MonitorStartOutput>failed().withError(ErrorType.APPLICATION,
e.getMessage(), e);
private void associateMonitorIdWithInterface(final Long monitorId, final String interfaceName) {
LOG.debug("associate monitor Id {} with interface {}", monitorId, interfaceName);
final ReadWriteTransaction tx = dataBroker.newReadWriteTransaction();
- ListenableFuture<Optional<InterfaceMonitorEntry>> readFuture = tx.read(LogicalDatastoreType.OPERATIONAL,
+ FluentFuture<Optional<InterfaceMonitorEntry>> readFuture = tx.read(LogicalDatastoreType.OPERATIONAL,
getInterfaceMonitorMapId(interfaceName));
- ListenableFuture<Void> updateFuture = Futures.transformAsync(readFuture, optEntry -> {
+ FluentFuture<? extends CommitInfo> updateFuture = readFuture.transformAsync(optEntry -> {
if (optEntry.isPresent()) {
InterfaceMonitorEntry entry = optEntry.get();
List<Long> monitorIds1 =
tx.put(LogicalDatastoreType.OPERATIONAL, getInterfaceMonitorMapId(interfaceName), newEntry2,
CREATE_MISSING_PARENT);
}
- return tx.submit();
+ return tx.commit();
}, callbackExecutorService);
Futures.addCallback(updateFuture, new FutureCallbackImpl(
final SettableFuture<RpcResult<MonitorUnpauseOutput>> result = SettableFuture.create();
final Long monitorId = input.getMonitorId();
- final ReadOnlyTransaction tx = dataBroker.newReadOnlyTransaction();
- ListenableFuture<Optional<MonitoringInfo>> readInfoResult = tx.read(LogicalDatastoreType.OPERATIONAL,
+ final ReadTransaction tx = dataBroker.newReadOnlyTransaction();
+ FluentFuture<Optional<MonitoringInfo>> readInfoResult = tx.read(LogicalDatastoreType.OPERATIONAL,
getMonitoringInfoId(monitorId));
- Futures.addCallback(readInfoResult, new FutureCallback<Optional<MonitoringInfo>>() {
+ readInfoResult.addCallback(new FutureCallback<Optional<MonitoringInfo>>() {
@Override
public void onFailure(Throwable error) {
public void onSuccess(@Nonnull Optional<MonitoringInfo> optInfo) {
if (optInfo.isPresent()) {
final MonitoringInfo info = optInfo.get();
- ListenableFuture<Optional<MonitorProfile>> readProfile = tx.read(LogicalDatastoreType.OPERATIONAL,
+ FluentFuture<Optional<MonitorProfile>> readProfile = tx.read(LogicalDatastoreType.OPERATIONAL,
getMonitorProfileId(info.getProfileId()));
- Futures.addCallback(readProfile, new FutureCallback<Optional<MonitorProfile>>() {
+ readProfile.addCallback(new FutureCallback<Optional<MonitorProfile>>() {
@Override
public void onFailure(Throwable error) {
}
private boolean stopMonitoringTask(Long monitorId, boolean interruptTask) {
- Optional<MonitoringInfo> optInfo =
- SingleTransactionDataBroker.syncReadOptionalAndTreatReadFailedExceptionAsAbsentOptional(dataBroker,
- LogicalDatastoreType.OPERATIONAL, getMonitoringInfoId(monitorId));
+ Optional<MonitoringInfo> optInfo;
+ try {
+ optInfo = txRunner.applyInterruptiblyWithNewReadOnlyTransactionAndClose(OPERATIONAL,
+ tx -> tx.read(getMonitoringInfoId(monitorId))).get();
+ } catch (InterruptedException | ExecutionException e) {
+ LOG.error("Error reading monitor {}", monitorId, e);
+ optInfo = Optional.empty();
+ }
if (!optInfo.isPresent()) {
LOG.warn("There is no monitoring info present for monitor id {}", monitorId);
return false;
}
MonitoringInfo monitoringInfo = optInfo.get();
- Optional<MonitorProfile> optProfile =
- SingleTransactionDataBroker.syncReadOptionalAndTreatReadFailedExceptionAsAbsentOptional(dataBroker,
- LogicalDatastoreType.OPERATIONAL, getMonitorProfileId(monitoringInfo.getProfileId()));
+ Optional<MonitorProfile> optProfile;
+ try {
+ optProfile = txRunner.applyInterruptiblyWithNewReadOnlyTransactionAndClose(OPERATIONAL,
+ tx -> tx.read(getMonitorProfileId(monitoringInfo.getProfileId()))).get();
+ } catch (InterruptedException | ExecutionException e) {
+ LOG.error("Error reading monitor profile for {}", monitorId, e);
+ optProfile = Optional.empty();
+ }
MonitorProtocolType protocolType = optProfile.get().getProtocolType();
if (protocolType == MonitorProtocolType.Bfd) {
LOG.debug("disabling bfd for hwvtep tunnel montior id {}", monitorId);
}
Optional<MonitorProfile> getMonitorProfile(Long profileId) {
- return SingleTransactionDataBroker.syncReadOptionalAndTreatReadFailedExceptionAsAbsentOptional(
- dataBroker, LogicalDatastoreType.OPERATIONAL, getMonitorProfileId(profileId));
+ try {
+ return txRunner.applyInterruptiblyWithNewReadOnlyTransactionAndClose(OPERATIONAL,
+ tx -> tx.read(getMonitorProfileId(profileId))).get();
+ } catch (InterruptedException | ExecutionException e) {
+ LOG.error("Error reading monitor profile for {}", profileId, e);
+ return Optional.empty();
+ }
}
void acquireLock(Semaphore lock) {
acquireLock(lock);
final ReadWriteTransaction tx = dataBroker.newReadWriteTransaction();
- ListenableFuture<Optional<MonitoringState>> readResult = tx.read(LogicalDatastoreType.OPERATIONAL,
+ FluentFuture<Optional<MonitoringState>> readResult = tx.read(LogicalDatastoreType.OPERATIONAL,
getMonitorStateId(monitorKey));
- ListenableFuture<Void> writeResult = Futures.transformAsync(readResult, optState -> {
+ FluentFuture<? extends CommitInfo> writeResult = readResult.transformAsync(optState -> {
if (optState.isPresent()) {
MonitoringState state = optState.get();
.setResponsePendingCount(responsePendingCount).setState(currentLivenessState).build();
tx.merge(LogicalDatastoreType.OPERATIONAL, getMonitorStateId(state.getMonitorKey()),
updatedState);
- return tx.submit();
+ return tx.commit();
} else {
// Close the transaction
- tx.submit();
+ tx.commit();
String errorMsg = String.format(
"Monitoring State associated with id %d is not present to send packet out.", monitorId);
return Futures.immediateFailedFuture(new RuntimeException(errorMsg));
}
}, callbackExecutorService);
- Futures.addCallback(writeResult, new FutureCallback<Void>() {
+ writeResult.addCallback(new FutureCallback<CommitInfo>() {
@Override
- public void onSuccess(Void noarg) {
+ public void onSuccess(CommitInfo noarg) {
// invoke packetout on protocol handler
AlivenessProtocolHandler<?> handler =
alivenessProtocolHandlerRegistry.getOpt(profile.getProtocolType());
final Long profileId = (long) getUniqueId(idKey);
final ReadWriteTransaction tx = dataBroker.newReadWriteTransaction();
- ListenableFuture<Optional<MonitorProfile>> readFuture = tx.read(LogicalDatastoreType.OPERATIONAL,
+ FluentFuture<Optional<MonitorProfile>> readFuture = tx.read(LogicalDatastoreType.OPERATIONAL,
getMonitorProfileId(profileId));
- ListenableFuture<RpcResult<MonitorProfileCreateOutput>> resultFuture = Futures.transformAsync(readFuture,
+ FluentFuture<RpcResult<MonitorProfileCreateOutput>> resultFuture = readFuture.transformAsync(
optProfile -> {
if (optProfile.isPresent()) {
tx.cancel();
.setMonitorWindow(monitorWindow).setProtocolType(protocolType).build();
tx.put(LogicalDatastoreType.OPERATIONAL, getMonitorProfileId(profileId), monitorProfile,
CREATE_MISSING_PARENT);
- Futures.addCallback(tx.submit(), new FutureCallback<Void>() {
+ tx.commit().addCallback(new FutureCallback<CommitInfo>() {
@Override
public void onFailure(Throwable error) {
String msg = String.format("Error when storing monitorprofile %s in datastore",
}
@Override
- public void onSuccess(Void noarg) {
+ public void onSuccess(CommitInfo noarg) {
MonitorProfileCreateOutput output = new MonitorProfileCreateOutputBuilder()
.setProfileId(profileId).build();
returnFuture.set(RpcResultBuilder.success(output).build());
}
return returnFuture;
}, callbackExecutorService);
- Futures.addCallback(resultFuture, new FutureCallback<RpcResult<MonitorProfileCreateOutput>>() {
+ resultFuture.addCallback(new FutureCallback<RpcResult<MonitorProfileCreateOutput>>() {
@Override
public void onFailure(Throwable error) {
// This would happen when any error happens during reading for
final SettableFuture<RpcResult<MonitorProfileDeleteOutput>> result = SettableFuture.create();
final Long profileId = input.getProfileId();
final ReadWriteTransaction tx = dataBroker.newReadWriteTransaction();
- ListenableFuture<Optional<MonitorProfile>> readFuture = tx.read(LogicalDatastoreType.OPERATIONAL,
+ FluentFuture<Optional<MonitorProfile>> readFuture = tx.read(LogicalDatastoreType.OPERATIONAL,
getMonitorProfileId(profileId));
- ListenableFuture<RpcResult<MonitorProfileDeleteOutput>> writeFuture =
- Futures.transformAsync(readFuture, optProfile -> {
+ FluentFuture<RpcResult<MonitorProfileDeleteOutput>> writeFuture =
+ readFuture.transformAsync(optProfile -> {
if (optProfile.isPresent()) {
tx.delete(LogicalDatastoreType.OPERATIONAL, getMonitorProfileId(profileId));
- Futures.addCallback(tx.submit(), new FutureCallback<Void>() {
+ tx.commit().addCallback(new FutureCallback<CommitInfo>() {
@Override
public void onFailure(Throwable error) {
String msg = String.format("Error when removing monitor profile %d from datastore",
}
@Override
- public void onSuccess(Void noarg) {
+ public void onSuccess(CommitInfo noarg) {
MonitorProfile profile = optProfile.get();
String id = getUniqueProfileKey(profile.getFailureThreshold(),
profile.getMonitorInterval(), profile.getMonitorWindow(),
return result;
}, callbackExecutorService);
- Futures.addCallback(writeFuture, new FutureCallback<RpcResult<MonitorProfileDeleteOutput>>() {
+ writeFuture.addCallback(new FutureCallback<RpcResult<MonitorProfileDeleteOutput>>() {
@Override
public void onFailure(Throwable error) {
SettableFuture<RpcResult<MonitorStopOutput>> result = SettableFuture.create();
final Long monitorId = input.getMonitorId();
- Optional<MonitoringInfo> optInfo =
- SingleTransactionDataBroker.syncReadOptionalAndTreatReadFailedExceptionAsAbsentOptional(dataBroker,
- LogicalDatastoreType.OPERATIONAL, getMonitoringInfoId(monitorId));
+ Optional<MonitoringInfo> optInfo;
+ try {
+ optInfo = txRunner.applyInterruptiblyWithNewReadOnlyTransactionAndClose(OPERATIONAL,
+ tx -> tx.read(getMonitoringInfoId(monitorId))).get();
+ } catch (InterruptedException | ExecutionException e) {
+ LOG.error("Error reading monitor {}", monitorId, e);
+ optInfo = Optional.empty();
+ }
if (optInfo.isPresent()) {
// Stop the monitoring task
stopMonitoringTask(monitorId);
private void removeMonitorIdFromInterfaceAssociation(final Long monitorId, final String interfaceName) {
LOG.debug("Remove monitorId {} from Interface association {}", monitorId, interfaceName);
final ReadWriteTransaction tx = dataBroker.newReadWriteTransaction();
- ListenableFuture<Optional<InterfaceMonitorEntry>> readFuture = tx.read(LogicalDatastoreType.OPERATIONAL,
+ FluentFuture<Optional<InterfaceMonitorEntry>> readFuture = tx.read(LogicalDatastoreType.OPERATIONAL,
getInterfaceMonitorMapId(interfaceName));
- ListenableFuture<Void> updateFuture = Futures.transformAsync(readFuture, optEntry -> {
+ FluentFuture<? extends CommitInfo> updateFuture = readFuture.transformAsync(optEntry -> {
if (optEntry.isPresent()) {
InterfaceMonitorEntry entry = optEntry.get();
List<Long> monitorIds =
tx.put(LogicalDatastoreType.OPERATIONAL, getInterfaceMonitorMapId(interfaceName), newEntry,
CREATE_MISSING_PARENT);
}
- return tx.submit();
+ return tx.commit();
} else {
LOG.warn("No Interface map entry found {} to remove monitorId {}", interfaceName, monitorId);
tx.cancel();
}
}, MoreExecutors.directExecutor());
- Futures.addCallback(updateFuture, new FutureCallbackImpl(
+ updateFuture.addCallback(new FutureCallbackImpl(
String.format("Dis-association of monitorId %d with Interface %s", monitorId, interfaceName)),
MoreExecutors.directExecutor());
}
EndpointType source = info.getSource().getEndpointType();
String interfaceName = getInterfaceName(source);
if (!Strings.isNullOrEmpty(interfaceName)) {
- Optional<MonitorProfile> optProfile =
- SingleTransactionDataBroker.syncReadOptionalAndTreatReadFailedExceptionAsAbsentOptional(dataBroker,
- LogicalDatastoreType.OPERATIONAL, getMonitorProfileId(info.getProfileId()));
+ Optional<MonitorProfile> optProfile;
+ try {
+ optProfile = txRunner.applyInterruptiblyWithNewReadOnlyTransactionAndClose(OPERATIONAL,
+ tx -> tx.read(getMonitorProfileId(info.getProfileId()))).get();
+ } catch (InterruptedException | ExecutionException e) {
+ LOG.error("Error reading monitor profile for {}", info.getProfileId(), e);
+ optProfile = Optional.empty();
+ }
if (optProfile.isPresent()) {
MonitorProtocolType protocolType = optProfile.get().getProtocolType();
EndpointType destination = info.getDestination() != null ? info.getDestination().getEndpointType()
}
final ReadWriteTransaction tx = dataBroker.newReadWriteTransaction();
- ListenableFuture<Optional<MonitoringState>> readResult = tx.read(LogicalDatastoreType.OPERATIONAL,
+ FluentFuture<Optional<MonitoringState>> readResult = tx.read(LogicalDatastoreType.OPERATIONAL,
getMonitorStateId(monitorKey));
- ListenableFuture<Void> writeResult = Futures.transformAsync(readResult, optState -> {
+ FluentFuture<? extends CommitInfo> writeResult = readResult.transformAsync(optState -> {
if (optState.isPresent()) {
MonitoringState state = optState.get();
if (isValidStatus.apply(state.getStatus())) {
LOG.warn("No associated monitoring state data available to update the status to {} for {}",
newStatus, monitorId);
}
- return tx.submit();
+ return tx.commit();
}, MoreExecutors.directExecutor());
- Futures.addCallback(writeResult, new FutureCallbackImpl(
+ writeResult.addCallback(new FutureCallbackImpl(
String.format("Monitor status update for %d to %s", monitorId, newStatus.toString())),
MoreExecutors.directExecutor());
}
private void resumeMonitoring(final long monitorId) {
- final ReadOnlyTransaction tx = dataBroker.newReadOnlyTransaction();
- ListenableFuture<Optional<MonitoringInfo>> readInfoResult = tx.read(LogicalDatastoreType.OPERATIONAL,
+ final ReadTransaction tx = dataBroker.newReadOnlyTransaction();
+ FluentFuture<Optional<MonitoringInfo>> readInfoResult = tx.read(LogicalDatastoreType.OPERATIONAL,
getMonitoringInfoId(monitorId));
- Futures.addCallback(readInfoResult, new FutureCallback<Optional<MonitoringInfo>>() {
+ readInfoResult.addCallback(new FutureCallback<Optional<MonitoringInfo>>() {
@Override
public void onFailure(Throwable error) {
}
private List<Long> getMonitorIds(String interfaceName) {
- return SingleTransactionDataBroker.syncReadOptionalAndTreatReadFailedExceptionAsAbsentOptional(
- dataBroker, LogicalDatastoreType.OPERATIONAL, getInterfaceMonitorMapId(interfaceName))
- .toJavaUtil().map(InterfaceMonitorEntry::getMonitorIds).orElse(Collections.emptyList());
+ try {
+ return txRunner.applyInterruptiblyWithNewReadOnlyTransactionAndClose(OPERATIONAL,
+ tx -> tx.read(getInterfaceMonitorMapId(interfaceName))).get().map(
+ InterfaceMonitorEntry::getMonitorIds).orElse(Collections.emptyList());
+ } catch (InterruptedException | ExecutionException e) {
+ LOG.error("Error retrieving monitor ids for {}", interfaceName, e);
+ return Collections.emptyList();
+ }
}
//handle monitor stop
package org.opendaylight.genius.alivenessmonitor.internal;
import static org.opendaylight.genius.alivenessmonitor.utils.AlivenessMonitorUtil.getMonitorStateId;
+import static org.opendaylight.mdsal.binding.util.Datastore.CONFIGURATION;
+import static org.opendaylight.mdsal.binding.util.Datastore.OPERATIONAL;
-import com.google.common.base.Optional;
import com.google.common.util.concurrent.FutureCallback;
-import com.google.common.util.concurrent.Futures;
-import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.MoreExecutors;
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
+import java.util.Optional;
+import java.util.concurrent.ExecutionException;
import java.util.concurrent.Semaphore;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
import javax.annotation.Nonnull;
import javax.inject.Inject;
import javax.inject.Singleton;
-
import org.apache.aries.blueprint.annotation.service.Reference;
-import org.opendaylight.controller.md.sal.binding.api.DataBroker;
-import org.opendaylight.controller.md.sal.binding.api.ReadWriteTransaction;
-import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
+import org.checkerframework.checker.nullness.qual.Nullable;
import org.opendaylight.genius.alivenessmonitor.protocols.AlivenessProtocolHandler;
import org.opendaylight.genius.alivenessmonitor.protocols.AlivenessProtocolHandlerRegistry;
-import org.opendaylight.genius.datastoreutils.SingleTransactionDataBroker;
-import org.opendaylight.genius.mdsalutil.MDSALUtil;
+import org.opendaylight.infrautils.utils.concurrent.LoggingFutures;
+import org.opendaylight.mdsal.binding.api.DataBroker;
+import org.opendaylight.mdsal.binding.util.Datastore;
+import org.opendaylight.mdsal.binding.util.InterruptibleCheckedConsumer;
+import org.opendaylight.mdsal.binding.util.ManagedNewTransactionRunner;
+import org.opendaylight.mdsal.binding.util.RetryingManagedNewTransactionRunner;
+import org.opendaylight.mdsal.binding.util.TypedReadWriteTransaction;
+import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
import org.opendaylight.openflowplugin.libraries.liblldp.Packet;
-import org.opendaylight.serviceutils.tools.mdsal.listener.AbstractSyncDataTreeChangeListener;
+import org.opendaylight.serviceutils.tools.listener.AbstractSyncDataTreeChangeListener;
import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.alivenessmonitor.rev160411.LivenessState;
import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.alivenessmonitor.rev160411.MonitorProtocolType;
import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.alivenessmonitor.rev160411.endpoint.EndpointType;
private static final Logger LOG = LoggerFactory.getLogger(HwVtepTunnelsStateHandler.class);
- private final DataBroker dataBroker;
+ private final ManagedNewTransactionRunner txRunner;
private final AlivenessMonitor alivenessMonitor;
@Inject
- public HwVtepTunnelsStateHandler(@Reference final DataBroker dataBroker, final AlivenessMonitor alivenessMonitor,
+ public HwVtepTunnelsStateHandler(@Reference final DataBroker dataBroker,
+ final AlivenessMonitor alivenessMonitor,
final AlivenessProtocolHandlerRegistry alivenessProtocolHandlerRegistry) {
super(dataBroker, LogicalDatastoreType.CONFIGURATION,
InstanceIdentifier.create(NetworkTopology.class).child(Topology.class).child(Node.class)
.augmentation(PhysicalSwitchAugmentation.class).child(Tunnels.class));
- this.dataBroker = dataBroker;
+ this.txRunner = new RetryingManagedNewTransactionRunner(dataBroker);
this.alivenessMonitor = alivenessMonitor;
alivenessProtocolHandlerRegistry.register(MonitorProtocolType.Bfd, this);
}
LOG.debug("Acquiring lock for monitor key : {} to process monitor DCN", monitorKey);
alivenessMonitor.acquireLock(lock);
- final ReadWriteTransaction tx = dataBroker.newReadWriteTransaction();
-
- ListenableFuture<Optional<MonitoringState>> stateResult = tx.read(LogicalDatastoreType.OPERATIONAL,
- getMonitorStateId(monitorKey));
- Futures.addCallback(stateResult, new FutureCallback<Optional<MonitoringState>>() {
-
+ AtomicBoolean stateChanged = new AtomicBoolean();
+ AtomicReference<MonitoringState> currentState = new AtomicReference<>();
+ txRunner.callWithNewReadWriteTransactionAndSubmit(OPERATIONAL, tx -> {
+ Optional<MonitoringState> optState = tx.read(getMonitorStateId(monitorKey)).get();
+ if (optState.isPresent()) {
+ currentState.set(optState.get());
+ if (currentState.get().getState() == newTunnelOpState) {
+ return;
+ }
+ stateChanged.set(true);
+ final MonitoringState state = new MonitoringStateBuilder().setMonitorKey(monitorKey)
+ .setState(newTunnelOpState).build();
+ tx.merge(getMonitorStateId(monitorKey), state);
+ } else {
+ LOG.warn("Monitoring State not available for key: {} to process the Packet received", monitorKey);
+ }
+ }).addCallback(new FutureCallback<Object>() {
@Override
- public void onSuccess(@Nonnull Optional<MonitoringState> optState) {
- if (optState.isPresent()) {
- final MonitoringState currentState = optState.get();
- if (currentState.getState() == newTunnelOpState) {
- return;
- }
- final boolean stateChanged = true;
- final MonitoringState state = new MonitoringStateBuilder().setMonitorKey(monitorKey)
- .setState(newTunnelOpState).build();
- tx.merge(LogicalDatastoreType.OPERATIONAL, getMonitorStateId(monitorKey), state);
- ListenableFuture<Void> writeResult = tx.submit();
- // WRITE Callback
- Futures.addCallback(writeResult, new FutureCallback<Void>() {
-
- @Override
- public void onSuccess(Void arg0) {
- alivenessMonitor.releaseLock(lock);
- if (stateChanged) {
- // send notifications
- LOG.info("Sending notification for monitor Id : {} with Current State: {}",
- currentState.getMonitorId(), newTunnelOpState);
- alivenessMonitor.publishNotification(currentState.getMonitorId(), newTunnelOpState);
- } else {
- if (LOG.isTraceEnabled()) {
- LOG.trace("Successful in writing monitoring state {} to ODS", state);
- }
- }
- }
-
- @Override
- public void onFailure(@Nonnull Throwable error) {
- alivenessMonitor.releaseLock(lock);
- LOG.warn("Error in writing monitoring state : {} to Datastore", monitorKey, error);
- if (LOG.isTraceEnabled()) {
- LOG.trace("Error in writing monitoring state: {} to Datastore", state);
- }
- }
- }, MoreExecutors.directExecutor());
+ public void onSuccess(@Nullable Object result) {
+ alivenessMonitor.releaseLock(lock);
+ if (stateChanged.get()) {
+ // send notifications
+ LOG.info("Sending notification for monitor Id : {} with Current State: {}",
+ currentState.get().getMonitorId(), newTunnelOpState);
+ alivenessMonitor.publishNotification(currentState.get().getMonitorId(), newTunnelOpState);
} else {
- LOG.warn("Monitoring State not available for key: {} to process the Packet received", monitorKey);
- // Complete the transaction
- tx.submit();
- alivenessMonitor.releaseLock(lock);
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("Successful in writing monitoring state {} to ODS", newTunnelOpState);
+ }
}
}
@Override
public void onFailure(@Nonnull Throwable error) {
- LOG.error("Error when reading Monitoring State for key: {} to process the Packet received", monitorKey,
- error);
- // FIXME: Not sure if the transaction status is valid to cancel
- tx.cancel();
alivenessMonitor.releaseLock(lock);
+ LOG.warn("Error in writing monitoring state for {} to Datastore", monitorKey, error);
}
}, MoreExecutors.directExecutor());
}
return null;
}
- // tunnelKey, nodeId, topologyId are initialized to null and immediately passed to getTunnelIdentifier which
- // FindBugs as a "Load of known null value" violation. Not sure sure what the intent...
- @SuppressFBWarnings("NP_LOAD_OF_KNOWN_NULL_VALUE")
void resetMonitoringTask(boolean isEnable) {
// TODO: get the corresponding hwvtep tunnel from the sourceInterface
// once InterfaceMgr implements renderer for HWVTEP VXLAN tunnels
- // tunnelKey, nodeId, topologyId are initialized to null and immediately passed to getTunnelIdentifier which
- // FindBugs flags as a "Load of known null value" violation. Not sure sure what the intent...
- TunnelsKey tunnelKey = null;
- String nodeId = null;
- String topologyId = null;
- Optional<Tunnels> tunnelsOptional =
- SingleTransactionDataBroker.syncReadOptionalAndTreatReadFailedExceptionAsAbsentOptional(dataBroker,
- LogicalDatastoreType.CONFIGURATION, getTunnelIdentifier(topologyId, nodeId, tunnelKey));
- if (!tunnelsOptional.isPresent()) {
- LOG.warn("Tunnel {} is not present on the Node {}. So not disabling the BFD monitoring", tunnelKey, nodeId);
- return;
- }
- Tunnels tunnel = tunnelsOptional.get();
- List<BfdParams> tunnelBfdParams = tunnel.getBfdParams();
- if (tunnelBfdParams == null || tunnelBfdParams.isEmpty()) {
- LOG.debug("there is no bfd params available for the tunnel {}", tunnel);
- return;
- }
+ LoggingFutures.addErrorLogging(txRunner.callWithNewReadWriteTransactionAndSubmit(CONFIGURATION,
+ new InterruptibleCheckedConsumer<TypedReadWriteTransaction<Datastore.Configuration>, ExecutionException>() {
+ @Override
+ // tunnelKey, nodeId, topologyId are initialized to null and immediately passed to
+ // getTunnelIdentifier which FindBugs as a "Load of known null value" violation. Not sure sure what
+ // the intent...
+ @SuppressFBWarnings("NP_LOAD_OF_KNOWN_NULL_VALUE")
+ public void accept(TypedReadWriteTransaction<Datastore.Configuration> tx)
+ throws ExecutionException, InterruptedException {
+ TunnelsKey tunnelKey = null;
+ String nodeId = null;
+ String topologyId = null;
+ Optional<Tunnels> tunnelsOptional =
+ tx.read(getTunnelIdentifier(topologyId, nodeId, tunnelKey)).get();
+ if (!tunnelsOptional.isPresent()) {
+ LOG.warn("Tunnel {} is not present on the Node {}. So not disabling the BFD monitoring",
+ tunnelKey,
+ nodeId);
+ return;
+ }
+ Tunnels tunnel = tunnelsOptional.get();
+ List<BfdParams> tunnelBfdParams = tunnel.getBfdParams();
+ if (tunnelBfdParams == null || tunnelBfdParams.isEmpty()) {
+ LOG.debug("there is no bfd params available for the tunnel {}", tunnel);
+ return;
+ }
- Iterator<BfdParams> tunnelBfdParamsIterator = tunnelBfdParams.iterator();
- while (tunnelBfdParamsIterator.hasNext()) {
- BfdParams bfdParam = tunnelBfdParamsIterator.next();
- if (AlivenessMonitorConstants.BFD_PARAM_ENABLE.equals(bfdParam.getBfdParamKey())) {
- tunnelBfdParamsIterator.remove();
- break;
- }
- }
- setBfdParamForEnable(tunnelBfdParams, isEnable);
- Tunnels tunnelWithBfdReset = new TunnelsBuilder().withKey(tunnelKey).setBfdParams(tunnelBfdParams).build();
- MDSALUtil.syncUpdate(dataBroker, LogicalDatastoreType.CONFIGURATION,
- getTunnelIdentifier(topologyId, nodeId, tunnelKey), tunnelWithBfdReset);
+ Iterator<BfdParams> tunnelBfdParamsIterator = tunnelBfdParams.iterator();
+ while (tunnelBfdParamsIterator.hasNext()) {
+ BfdParams bfdParam = tunnelBfdParamsIterator.next();
+ if (AlivenessMonitorConstants.BFD_PARAM_ENABLE.equals(bfdParam.getBfdParamKey())) {
+ tunnelBfdParamsIterator.remove();
+ break;
+ }
+ }
+ HwVtepTunnelsStateHandler.this.setBfdParamForEnable(tunnelBfdParams, isEnable);
+ Tunnels tunnelWithBfdReset =
+ new TunnelsBuilder().withKey(tunnelKey).setBfdParams(tunnelBfdParams).build();
+ tx.mergeParentStructureMerge(
+ getTunnelIdentifier(topologyId, nodeId, tunnelKey), tunnelWithBfdReset);
+ }
+ }), LOG, "Error resetting monitoring task");
}
@Override
}
MonitorProfile profile;
long profileId = monitorInfo.getProfileId();
- Optional<MonitorProfile> optProfile = alivenessMonitor.getMonitorProfile(profileId);
+ java.util.Optional<MonitorProfile> optProfile = alivenessMonitor.getMonitorProfile(profileId);
if (optProfile.isPresent()) {
profile = optProfile.get();
} else {
// be done as part of tunnel add DCN handling.
String topologyId = "";
String nodeId = "";
- MDSALUtil.syncUpdate(dataBroker, LogicalDatastoreType.CONFIGURATION,
+ LoggingFutures.addErrorLogging(
+ txRunner.callWithNewWriteOnlyTransactionAndSubmit(CONFIGURATION, tx -> tx.mergeParentStructureMerge(
getTunnelIdentifier(topologyId, nodeId, new TunnelsKey(/*localRef*/ null, /*remoteRef*/ null)),
- tunnelWithBfd);
+ tunnelWithBfd)), LOG, "Error starting a monitoring task");
}
private void fillBfdRemoteConfigs(List<BfdRemoteConfigs> bfdRemoteConfigs, String tunnelRemoteMacAddress) {