package org.opendaylight.controller.clustering.it.provider;
import static akka.actor.ActorRef.noSender;
+import static org.opendaylight.yangtools.yang.common.RpcResultBuilder.newError;
import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import com.google.common.base.Strings;
import com.google.common.util.concurrent.CheckedFuture;
import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.SettableFuture;
import java.io.PrintWriter;
import java.io.StringWriter;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
-import java.util.concurrent.ExecutionException;
+import java.util.Objects;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
import org.opendaylight.controller.cluster.ActorSystemProvider;
import org.opendaylight.controller.cluster.databroker.actors.dds.ClientLocalHistory;
import org.opendaylight.controller.cluster.databroker.actors.dds.ClientTransaction;
import org.opendaylight.controller.cluster.datastore.utils.ClusterUtils;
import org.opendaylight.controller.cluster.raft.client.messages.Shutdown;
import org.opendaylight.controller.cluster.sharding.DistributedShardFactory;
+import org.opendaylight.controller.clustering.it.provider.impl.DataListenerState;
import org.opendaylight.controller.clustering.it.provider.impl.FlappingSingletonService;
import org.opendaylight.controller.clustering.it.provider.impl.GetConstantService;
import org.opendaylight.controller.clustering.it.provider.impl.IdIntsDOMDataTreeLIstener;
private static final org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType CONTROLLER_CONFIG =
org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType.CONFIGURATION;
+ private static final ListenableFuture<RpcResult<Void>> VOID_SUCCESS = success(null);
+
private final RpcProviderRegistry rpcRegistry;
private final BindingAwareBroker.RpcRegistration<OdlMdsalLowlevelControlService> registration;
private final DistributedShardFactory distributedShardFactory;
if (getSingletonConstantRegistration == null) {
LOG.debug("No get-singleton-constant registration present.");
- final RpcError rpcError = RpcResultBuilder
- .newError(ErrorType.APPLICATION, "missing-registration", "No get-singleton-constant rpc registration present.");
- final RpcResult<Void> result = RpcResultBuilder.<Void>failed().withRpcError(rpcError).build();
- return Futures.immediateFuture(result);
+ return failure(newError(ErrorType.APPLICATION, "missing-registration",
+ "No get-singleton-constant rpc registration present."));
}
try {
getSingletonConstantRegistration.close();
- getSingletonConstantRegistration = null;
-
- return Futures.immediateFuture(RpcResultBuilder.<Void>success().build());
} catch (final Exception e) {
LOG.debug("There was a problem closing the singleton constant service", e);
- final RpcError rpcError = RpcResultBuilder
- .newError(ErrorType.APPLICATION, "error-closing", "There was a problem closing get-singleton-constant");
- final RpcResult<Void> result = RpcResultBuilder.<Void>failed().withRpcError(rpcError).build();
- return Futures.immediateFuture(result);
+ return failure(newError(ErrorType.APPLICATION, "error-closing",
+ "There was a problem closing get-singleton-constant"));
+ } finally {
+ getSingletonConstantRegistration = null;
}
+
+ return VOID_SUCCESS;
}
@Override
task.start();
- return Futures.immediateFuture(RpcResultBuilder.<Void>success().build());
+ return VOID_SUCCESS;
}
@Override
public Future<RpcResult<Void>> subscribeDtcl() {
-
if (dtclReg != null) {
- final RpcError error = RpcResultBuilder.newError(ErrorType.RPC, "Registration present.",
- "There is already dataTreeChangeListener registered on id-ints list.");
- return Futures.immediateFuture(RpcResultBuilder.<Void>failed().withRpcError(error).build());
+ return failure(newError(ErrorType.RPC, "Registration present.",
+ "There is already dataTreeChangeListener registered on id-ints list."));
}
idIntsListener = new IdIntsListener();
CONTROLLER_CONFIG, WriteTransactionsHandler.ID_INT_YID),
idIntsListener);
- return Futures.immediateFuture(RpcResultBuilder.<Void>success().build());
+ LOG.debug("ClusteredDOMDataTreeChangeListener registered");
+ return VOID_SUCCESS;
}
@Override
@Override
public Future<RpcResult<IsClientAbortedOutput>> isClientAborted() {
+ // FIXME: implement this
return null;
}
@Override
public Future<RpcResult<Void>> removeShardReplica(final RemoveShardReplicaInput input) {
+ // FIXME: implement this
return null;
}
@Override
public Future<RpcResult<Void>> subscribeYnl(final SubscribeYnlInput input) {
-
LOG.debug("subscribe-ynl, input: {}", input);
if (ynlRegistrations.containsKey(input.getId())) {
- final RpcError error = RpcResultBuilder.newError(ErrorType.RPC, "Registration present.",
- "There is already ynl listener registered for this id: " + input.getId());
- return Futures.immediateFuture(RpcResultBuilder.<Void>failed().withRpcError(error).build());
+ return failure(newError(ErrorType.RPC, "Registration present.",
+ "There is already ynl listener registered for this id: " + input.getId()));
}
ynlRegistrations.put(input.getId(),
notificationService.registerNotificationListener(new YnlListener(input.getId())));
- return Futures.immediateFuture(RpcResultBuilder.<Void>success().build());
+ return success(null);
}
@Override
if (registration == null) {
LOG.debug("No get-contexted-constant registration for context: {}", input.getContext());
- final RpcError rpcError = RpcResultBuilder
- .newError(ErrorType.APPLICATION, "missing-registration", "No get-constant rpc registration present.");
- final RpcResult<Void> result = RpcResultBuilder.<Void>failed().withRpcError(rpcError).build();
- return Futures.immediateFuture(result);
+ return failure(newError(ErrorType.APPLICATION, "missing-registration",
+ "No get-constant rpc registration present."));
}
registration.close();
- return Futures.immediateFuture(RpcResultBuilder.<Void>success().build());
+ return VOID_SUCCESS;
}
@Override
public Future<RpcResult<Void>> registerSingletonConstant(final RegisterSingletonConstantInput input) {
-
LOG.debug("Received register-singleton-constant rpc, input: {}", input);
if (input.getConstant() == null) {
- final RpcError error = RpcResultBuilder.newError(
- ErrorType.RPC, "Invalid input.", "Constant value is null");
- return Futures.immediateFuture(RpcResultBuilder.<Void>failed().withRpcError(error).build());
+ return failure(newError(ErrorType.RPC, "Invalid input.", "Constant value is null"));
}
getSingletonConstantRegistration =
SingletonGetConstantService.registerNew(singletonService, domRpcService, input.getConstant());
- return Futures.immediateFuture(RpcResultBuilder.<Void>success().build());
+ return VOID_SUCCESS;
}
@Override
public Future<RpcResult<Void>> registerDefaultConstant(final RegisterDefaultConstantInput input) {
+ // FIXME: implement this
return null;
}
public Future<RpcResult<Void>> unregisterConstant() {
if (globalGetConstantRegistration == null) {
- final RpcError rpcError = RpcResultBuilder
- .newError(ErrorType.APPLICATION, "missing-registration", "No get-constant rpc registration present.");
- final RpcResult<Void> result = RpcResultBuilder.<Void>failed().withRpcError(rpcError).build();
- return Futures.immediateFuture(result);
+ return failure(newError(ErrorType.APPLICATION, "missing-registration",
+ "No get-constant rpc registration present."));
}
globalGetConstantRegistration.close();
globalGetConstantRegistration = null;
- return Futures.immediateFuture(RpcResultBuilder.<Void>success().build());
+ return VOID_SUCCESS;
}
@Override
LOG.debug("unregister-flapping-singleton received.");
if (flappingSingletonService == null) {
- final RpcError rpcError = RpcResultBuilder
- .newError(ErrorType.APPLICATION, "missing-registration", "No flapping-singleton registration present.");
- final RpcResult<UnregisterFlappingSingletonOutput> result =
- RpcResultBuilder.<UnregisterFlappingSingletonOutput>failed().withRpcError(rpcError).build();
- return Futures.immediateFuture(result);
+ return failure(newError(ErrorType.APPLICATION, "missing-registration",
+ "No flapping-singleton registration present."));
}
final long flapCount = flappingSingletonService.setInactive();
flappingSingletonService = null;
- final UnregisterFlappingSingletonOutput output =
- new UnregisterFlappingSingletonOutputBuilder().setFlapCount(flapCount).build();
-
- return Futures.immediateFuture(RpcResultBuilder.success(output).build());
+ return success(new UnregisterFlappingSingletonOutputBuilder().setFlapCount(flapCount).build());
}
@Override
public Future<RpcResult<Void>> addShardReplica(final AddShardReplicaInput input) {
+ // FIXME: implement this
return null;
}
@Override
public Future<RpcResult<Void>> subscribeDdtl() {
-
if (ddtlReg != null) {
- final RpcError error = RpcResultBuilder.newError(ErrorType.RPC, "Registration present.",
- "There is already dataTreeChangeListener registered on id-ints list.");
- return Futures.immediateFuture(RpcResultBuilder.<Void>failed().withRpcError(error).build());
+ return failure(newError(ErrorType.RPC, "Registration present.",
+ "There is already dataTreeChangeListener registered on id-ints list."));
}
idIntsDdtl = new IdIntsDOMDataTreeLIstener();
try {
- ddtlReg =
- domDataTreeService.registerListener(idIntsDdtl,
- Collections.singleton(new DOMDataTreeIdentifier(LogicalDatastoreType.CONFIGURATION,
- ProduceTransactionsHandler.ID_INT_YID))
- , true, Collections.emptyList());
+ ddtlReg = domDataTreeService.registerListener(idIntsDdtl,
+ Collections.singleton(new DOMDataTreeIdentifier(LogicalDatastoreType.CONFIGURATION,
+ ProduceTransactionsHandler.ID_INT_YID)), true, Collections.emptyList());
} catch (DOMDataTreeLoopException e) {
LOG.error("Failed to register DOMDataTreeListener.", e);
-
+ return failure(newError(ErrorType.APPLICATION, "register-failed", e.getMessage()));
}
- return Futures.immediateFuture(RpcResultBuilder.<Void>success().build());
+ LOG.debug("DOMDataTreeListener registered");
+ return VOID_SUCCESS;
}
@Override
LOG.debug("register-bound-constant: {}", input);
if (input.getContext() == null) {
- final RpcError error = RpcResultBuilder.newError(
- ErrorType.RPC, "Invalid input.", "Context value is null");
- return Futures.immediateFuture(RpcResultBuilder.<Void>failed().withRpcError(error).build());
+ return failure(newError(ErrorType.RPC, "Invalid input.", "Context value is null"));
}
if (input.getConstant() == null) {
- final RpcError error = RpcResultBuilder.newError(
- ErrorType.RPC, "Invalid input.", "Constant value is null");
- return Futures.immediateFuture(RpcResultBuilder.<Void>failed().withRpcError(error).build());
+ return failure(newError(ErrorType.RPC, "Invalid input.", "Constant value is null"));
}
if (routedRegistrations.containsKey(input.getContext())) {
- final RpcError error = RpcResultBuilder.newError(ErrorType.RPC, "Registration present.",
- "There is already a rpc registered for context: " + input.getContext());
- return Futures.immediateFuture(RpcResultBuilder.<Void>failed().withRpcError(error).build());
+ return failure(newError(ErrorType.RPC, "Registration present.",
+ "There is already a rpc registered for context: " + input.getContext()));
}
final DOMRpcImplementationRegistration<RoutedGetConstantService> registration =
input.getConstant(), input.getContext());
routedRegistrations.put(input.getContext(), registration);
- return Futures.immediateFuture(RpcResultBuilder.<Void>success().build());
+ return VOID_SUCCESS;
}
@Override
LOG.debug("Received register-flapping-singleton.");
if (flappingSingletonService != null) {
- final RpcError error = RpcResultBuilder.newError(
- ErrorType.RPC, "Registration present.", "flapping-singleton already registered");
- return Futures.immediateFuture(RpcResultBuilder.<Void>failed().withRpcError(error).build());
+ return failure(RpcResultBuilder.newError(ErrorType.RPC, "Registration present.",
+ "flapping-singleton already registered"));
}
flappingSingletonService = new FlappingSingletonService(singletonService);
-
- return Futures.immediateFuture(RpcResultBuilder.<Void>success().build());
+ return VOID_SUCCESS;
}
@Override
LOG.debug("Received unsubscribe-dtcl");
if (idIntsListener == null || dtclReg == null) {
- final RpcError error = RpcResultBuilder.newError(
- ErrorType.RPC, "Dtcl missing.", "No DataTreeChangeListener registered.");
- return Futures.immediateFuture(RpcResultBuilder.<UnsubscribeDtclOutput>failed().withRpcError(error).build());
+ return failure(newError(ErrorType.RPC, "Dtcl missing.", "No DataTreeChangeListener registered."));
}
- try {
- idIntsListener.tryFinishProcessing().get(120, TimeUnit.SECONDS);
- } catch (InterruptedException | ExecutionException | TimeoutException e) {
- final RpcError error = RpcResultBuilder.newError(
- ErrorType.RPC, "resource-denied-transport", "Unable to finish notification processing in 120 seconds.",
- "clustering-it", "clustering-it", e);
- return Futures.immediateFuture(RpcResultBuilder.<UnsubscribeDtclOutput>failed()
- .withRpcError(error).build());
- }
-
- dtclReg.close();
+ final ListenableFuture<DataListenerState> future = idIntsListener.tryFinishProcessing(dtclReg);
dtclReg = null;
- if (!idIntsListener.hasTriggered()) {
- final RpcError error = RpcResultBuilder.newError(
- ErrorType.APPLICATION, "No notification received.", "id-ints listener has not received" +
- "any notifications.");
- return Futures.immediateFuture(RpcResultBuilder.<UnsubscribeDtclOutput>failed()
- .withRpcError(error).build());
+ return Futures.withFallback(Futures.transform(future, this::dtclOutput),
+ t -> failure(newError(ErrorType.RPC, "resource-denied-transport", "Failed to finish processing",
+ "clustering-it", "clustering-it", t)));
+ }
+
+ private RpcResult<UnsubscribeDtclOutput> dtclOutput(final DataListenerState state) {
+ if (state.changeCount() == 0) {
+ return failed(newError(ErrorType.APPLICATION, "No notification received.",
+ "id-ints listener has not received any notifications."));
}
final DOMDataReadOnlyTransaction rTx = domDataBroker.newReadOnlyTransaction();
- try {
- final Optional<NormalizedNode<?, ?>> readResult =
- rTx.read(CONTROLLER_CONFIG, WriteTransactionsHandler.ID_INT_YID).checkedGet();
-
- if (!readResult.isPresent()) {
- final RpcError error = RpcResultBuilder.newError(
- ErrorType.APPLICATION, "Final read empty.", "No data read from id-ints list.");
- return Futures.immediateFuture(RpcResultBuilder.<UnsubscribeDtclOutput>failed()
- .withRpcError(error).build());
- }
-
- return Futures.immediateFuture(
- RpcResultBuilder.success(new UnsubscribeDtclOutputBuilder()
- .setCopyMatches(idIntsListener.checkEqual(readResult.get()))).build());
+ final Optional<NormalizedNode<?, ?>> readResult;
+ try {
+ readResult = rTx.read(CONTROLLER_CONFIG, WriteTransactionsHandler.ID_INT_YID).checkedGet();
} catch (final ReadFailedException e) {
- final RpcError error = RpcResultBuilder.newError(
- ErrorType.APPLICATION, "Read failed.", "Final read from id-ints failed.");
- return Futures.immediateFuture(RpcResultBuilder.<UnsubscribeDtclOutput>failed()
- .withRpcError(error).build());
+ return failed(newError(ErrorType.APPLICATION, "Read failed.", "Final read from id-ints failed."));
+ } finally {
+ rTx.close();
+ }
+ final NormalizedNode<?, ?> expected = state.lastData().orNull();
+ final NormalizedNode<?, ?> actual = readResult.orNull();
+ final boolean equal = Objects.equals(expected, actual);
+ if (!equal) {
+ LOG.debug("Expected result {} read resulted in {}", expected, actual);
}
+
+ final RpcResultBuilder<UnsubscribeDtclOutput> b = RpcResultBuilder.success(new UnsubscribeDtclOutputBuilder()
+ .setCopyMatches(equal).build());
+
+// for (DataListenerViolation violation : state.violations()) {
+// final Optional<NormalizedNodeDiff> diff = violation.toDiff();
+// if (diff.isPresent()) {
+// b.withWarning(ErrorType.APPLICATION, "Sequence mismatch", diff.get().toString());
+// }
+// }
+
+ return b.build();
}
@Override
@Override
public Future<RpcResult<Void>> deconfigureIdIntsShard() {
+ // FIXME: implement this
return null;
}
LOG.debug("Received unsubscribe-ynl, input: {}", input);
if (!ynlRegistrations.containsKey(input.getId())) {
- final RpcError rpcError = RpcResultBuilder
- .newError(ErrorType.APPLICATION, "missing-registration", "No ynl listener with this id registered.");
- final RpcResult<UnsubscribeYnlOutput> result =
- RpcResultBuilder.<UnsubscribeYnlOutput>failed().withRpcError(rpcError).build();
- return Futures.immediateFuture(result);
+ return failure(newError(ErrorType.APPLICATION, "missing-registration",
+ "No ynl listener with this id registered."));
}
final ListenerRegistration<YnlListener> registration = ynlRegistrations.remove(input.getId());
registration.close();
- return Futures.immediateFuture(RpcResultBuilder.<UnsubscribeYnlOutput>success().withResult(output).build());
+ return success(output);
}
@Override
final CheckPublishNotificationsInput input) {
final PublishNotificationsTask task = publishNotificationsTasks.get(input.getId());
-
if (task == null) {
- return Futures.immediateFuture(RpcResultBuilder.success(
- new CheckPublishNotificationsOutputBuilder().setActive(false)).build());
+ return success(new CheckPublishNotificationsOutputBuilder().setActive(Boolean.FALSE).build());
}
final CheckPublishNotificationsOutputBuilder checkPublishNotificationsOutputBuilder =
checkPublishNotificationsOutputBuilder.setLastError(task.getLastError().toString() + sw.toString());
}
- final CheckPublishNotificationsOutput output =
- checkPublishNotificationsOutputBuilder.setPublishCount(task.getCurrentNotif()).build();
-
- return Futures.immediateFuture(RpcResultBuilder.success(output).build());
+ return success(checkPublishNotificationsOutputBuilder.setPublishCount(task.getCurrentNotif()).build());
}
@Override
final String shardName = input.getShardName();
if (Strings.isNullOrEmpty(shardName)) {
- final RpcError rpcError = RpcResultBuilder.newError(ErrorType.APPLICATION, "bad-element",
- "A valid shard name must be specified");
- return Futures.immediateFuture(RpcResultBuilder.<Void>failed().withRpcError(rpcError).build());
+ return failure(newError(ErrorType.APPLICATION, "bad-element", "A valid shard name must be specified"));
}
return shutdownShardGracefully(shardName);
LOG.debug("Received shutdown-prefix-shard-replica rpc, input: {}", input);
final InstanceIdentifier<?> shardPrefix = input.getPrefix();
-
if (shardPrefix == null) {
- final RpcError rpcError = RpcResultBuilder.newError(ErrorType.APPLICATION, "bad-element",
- "A valid shard prefix must be specified");
- return Futures.immediateFuture(RpcResultBuilder.<Void>failed().withRpcError(rpcError).build());
+ return failure(newError(ErrorType.APPLICATION, "bad-element", "A valid shard prefix must be specified"));
}
final YangInstanceIdentifier shardPath = bindingNormalizedNodeSerializer.toYangInstanceIdentifier(shardPrefix);
shutdownShardAsk.future().onComplete(new OnComplete<Boolean>() {
@Override
- public void onComplete(final Throwable throwable, final Boolean gracefulStopResult) throws Throwable {
+ public void onComplete(final Throwable throwable, final Boolean gracefulStopResult) {
if (throwable != null) {
- final RpcResult<Void> failedResult = RpcResultBuilder.<Void>failed()
- .withError(ErrorType.APPLICATION, "Failed to gracefully shutdown shard", throwable).build();
- rpcResult.set(failedResult);
+ rpcResult.set(RpcResultBuilder.<Void>failed().withError(ErrorType.APPLICATION,
+ "Failed to gracefully shutdown shard", throwable).build());
} else {
// according to Patterns.gracefulStop API, we don't have to
// check value of gracefulStopResult
LOG.debug("Received register-constant rpc, input: {}", input);
if (input.getConstant() == null) {
- final RpcError error = RpcResultBuilder.newError(
- ErrorType.RPC, "Invalid input.", "Constant value is null");
- return Futures.immediateFuture(RpcResultBuilder.<Void>failed().withRpcError(error).build());
+ return failure(newError(ErrorType.RPC, "Invalid input.", "Constant value is null"));
}
if (globalGetConstantRegistration != null) {
- final RpcError error = RpcResultBuilder.newError(ErrorType.RPC, "Registration present.",
- "There is already a get-constant rpc registered.");
- return Futures.immediateFuture(RpcResultBuilder.<Void>failed().withRpcError(error).build());
+ return failure(newError(ErrorType.RPC, "Registration present.",
+ "There is already a get-constant rpc registered."));
}
globalGetConstantRegistration = GetConstantService.registerNew(domRpcService, input.getConstant());
- return Futures.immediateFuture(RpcResultBuilder.<Void>success().build());
+ return VOID_SUCCESS;
}
@Override
public Future<RpcResult<Void>> unregisterDefaultConstant() {
+ // FIXME: implement this
return null;
}
LOG.debug("Received unsubscribe-ddtl.");
if (idIntsDdtl == null || ddtlReg == null) {
- final RpcError error = RpcResultBuilder.newError(
- ErrorType.RPC, "Ddtl missing.", "No DOMDataTreeListener registered.");
- return Futures.immediateFuture(RpcResultBuilder.<UnsubscribeDdtlOutput>failed().withRpcError(error).build());
- }
-
- try {
- idIntsDdtl.tryFinishProcessing().get(120, TimeUnit.SECONDS);
- } catch (InterruptedException | ExecutionException | TimeoutException e) {
- final RpcError error = RpcResultBuilder.newError(
- ErrorType.RPC, "resource-denied-transport", "Unable to finish notification processing in 120 seconds.",
- "clustering-it", "clustering-it", e);
- return Futures.immediateFuture(RpcResultBuilder.<UnsubscribeDdtlOutput>failed()
- .withRpcError(error).build());
+ return failure(newError(ErrorType.RPC, "Ddtl missing.", "No DOMDataTreeListener registered."));
}
- ddtlReg.close();
+ final ListenableFuture<DataListenerState> future = idIntsDdtl.tryFinishProcessing(ddtlReg);
ddtlReg = null;
- if (!idIntsDdtl.hasTriggered()) {
- final RpcError error = RpcResultBuilder.newError(
- ErrorType.APPLICATION, "No notification received.", "id-ints listener has not received" +
- "any notifications.");
- return Futures.immediateFuture(RpcResultBuilder.<UnsubscribeDdtlOutput>failed()
- .withRpcError(error).build());
+ return Futures.withFallback(Futures.transform(future, this::ddtlOutput),
+ t -> failure(newError(ErrorType.RPC, "resource-denied-transport", "Failed to finish processing",
+ "clustering-it", "clustering-it", t)));
+ }
+
+ private RpcResult<UnsubscribeDdtlOutput> ddtlOutput(final DataListenerState state) {
+ if (state.changeCount() == 0) {
+ return failed(newError(ErrorType.APPLICATION, "No notification received.",
+ "id-ints listener has not received any notifications."));
}
final String shardName = ClusterUtils.getCleanShardName(ProduceTransactionsHandler.ID_INTS_YID);
LOG.debug("Creating distributed datastore client for shard {}", shardName);
final ActorContext actorContext = configDataStore.getActorContext();
- final Props distributedDataStoreClientProps =
- SimpleDataStoreClientActor.props(actorContext.getCurrentMemberName(),
- "Shard-" + shardName, actorContext, shardName);
+ final Props distributedDataStoreClientProps = SimpleDataStoreClientActor.props(
+ actorContext.getCurrentMemberName(), "Shard-" + shardName, actorContext, shardName);
final ActorRef clientActor = actorSystem.actorOf(distributedDataStoreClientProps);
final DataStoreClient distributedDataStoreClient;
try {
- distributedDataStoreClient = SimpleDataStoreClientActor
- .getDistributedDataStoreClient(clientActor, 30, TimeUnit.SECONDS);
+ distributedDataStoreClient = SimpleDataStoreClientActor.getDistributedDataStoreClient(clientActor, 30,
+ TimeUnit.SECONDS);
} catch (final Exception e) {
LOG.error("Failed to get actor for {}", distributedDataStoreClientProps, e);
clientActor.tell(PoisonPill.getInstance(), noSender());
- final RpcError error = RpcResultBuilder.newError(
- ErrorType.APPLICATION, "Unable to create ds client for read.",
- "Unable to create ds client for read.");
- return Futures.immediateFuture(RpcResultBuilder.<UnsubscribeDdtlOutput>failed()
- .withRpcError(error).build());
+ return failed(newError(ErrorType.APPLICATION, "Unable to create ds client for read.",
+ "Unable to create ds client for read."));
}
final ClientLocalHistory localHistory = distributedDataStoreClient.createLocalHistory();
tx.abort();
localHistory.close();
- try {
- final Optional<NormalizedNode<?, ?>> optional = read.checkedGet();
- if (!optional.isPresent()) {
- LOG.warn("Final read from client is empty.");
- final RpcError error = RpcResultBuilder.newError(
- ErrorType.APPLICATION, "Read failed.", "Final read from id-ints is empty.");
- return Futures.immediateFuture(RpcResultBuilder.<UnsubscribeDdtlOutput>failed()
- .withRpcError(error).build());
- }
-
- return Futures.immediateFuture(
- RpcResultBuilder.success(new UnsubscribeDdtlOutputBuilder()
- .setCopyMatches(idIntsDdtl.checkEqual(optional.get()))).build());
+ final Optional<NormalizedNode<?, ?>> readResult;
+ try {
+ readResult = read.checkedGet();
} catch (org.opendaylight.mdsal.common.api.ReadFailedException e) {
LOG.error("Unable to read data to verify ddtl data.", e);
- final RpcError error = RpcResultBuilder.newError(
- ErrorType.APPLICATION, "Read failed.", "Final read from id-ints failed.");
- return Futures.immediateFuture(RpcResultBuilder.<UnsubscribeDdtlOutput>failed()
- .withRpcError(error).build());
+ return failed(newError( ErrorType.APPLICATION, "Read failed.", "Final read from id-ints failed."));
} finally {
distributedDataStoreClient.close();
clientActor.tell(PoisonPill.getInstance(), noSender());
}
+
+ // FIXME run a diff
+ final NormalizedNode<?, ?> expected = state.lastData().orNull();
+ final NormalizedNode<?, ?> actual = readResult.orNull();
+ final boolean equal = Objects.equals(expected, actual);
+ if (!equal) {
+ LOG.debug("Expected result {} read resulted in {}", expected, actual);
+ }
+ final RpcResultBuilder<UnsubscribeDdtlOutput> b = RpcResultBuilder.success(new UnsubscribeDdtlOutputBuilder()
+ .setCopyMatches(equal).build());
+
+// for (DataListenerViolation violation : state.violations()) {
+// final Optional<NormalizedNodeDiff> diff = violation.toDiff();
+// if (diff.isPresent()) {
+// b.withWarning(ErrorType.APPLICATION, "Sequence mismatch", diff.get().toString());
+// }
+// }
+
+ return b.build();
+ }
+
+ private static <T> RpcResult<T> failed(final RpcError error) {
+ return RpcResultBuilder.<T>failed().withRpcError(error).build();
+ }
+
+ private static <T> ListenableFuture<RpcResult<T>> failure(final RpcError error) {
+ return Futures.immediateFuture(failed(error));
+ }
+
+ private static <T> ListenableFuture<RpcResult<T>> success(final T result) {
+ return Futures.immediateFuture(RpcResultBuilder.success(result).build());
}
}