import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
+import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
import org.opendaylight.controller.cluster.ActorSystemProvider;
import org.opendaylight.controller.cluster.databroker.actors.dds.ClientLocalHistory;
import org.opendaylight.controller.cluster.databroker.actors.dds.ClientTransaction;
private final DOMDataTreeChangeService domDataTreeChangeService;
private final ActorSystem actorSystem;
- private Map<InstanceIdentifier<?>, DOMRpcImplementationRegistration<RoutedGetConstantService>> routedRegistrations =
- new HashMap<>();
+ private final Map<InstanceIdentifier<?>, DOMRpcImplementationRegistration<RoutedGetConstantService>>
+ routedRegistrations = new HashMap<>();
- private Map<String, ListenerRegistration<YnlListener>> ynlRegistrations = new HashMap<>();
+ private final Map<String, ListenerRegistration<YnlListener>> ynlRegistrations = new HashMap<>();
private DOMRpcImplementationRegistration<GetConstantService> globalGetConstantRegistration = null;
private ClusterSingletonServiceRegistration getSingletonConstantRegistration;
private FlappingSingletonService flappingSingletonService;
private ListenerRegistration<DOMDataTreeChangeListener> dtclReg;
private IdIntsListener idIntsListener;
- private Map<String, PublishNotificationsTask> publishNotificationsTasks = new HashMap<>();
+ private final Map<String, PublishNotificationsTask> publishNotificationsTasks = new HashMap<>();
private ListenerRegistration<IdIntsDOMDataTreeLIstener> ddtlReg;
private IdIntsDOMDataTreeLIstener idIntsDdtl;
}
@Override
+ @SuppressWarnings("checkstyle:IllegalCatch")
public Future<RpcResult<Void>> unregisterSingletonConstant() {
LOG.debug("unregister-singleton-constant");
if (getSingletonConstantRegistration == null) {
LOG.debug("No get-singleton-constant registration present.");
- final RpcError rpcError = RpcResultBuilder
- .newError(ErrorType.APPLICATION, "missing-registration", "No get-singleton-constant rpc registration present.");
+ final RpcError rpcError = RpcResultBuilder.newError(ErrorType.APPLICATION, "missing-registration",
+ "No get-singleton-constant rpc registration present.");
final RpcResult<Void> result = RpcResultBuilder.<Void>failed().withRpcError(rpcError).build();
return Futures.immediateFuture(result);
}
getSingletonConstantRegistration = null;
return Futures.immediateFuture(RpcResultBuilder.<Void>success().build());
- } catch (final Exception e) {
+ } catch (Exception e) {
LOG.debug("There was a problem closing the singleton constant service", e);
- final RpcError rpcError = RpcResultBuilder
- .newError(ErrorType.APPLICATION, "error-closing", "There was a problem closing get-singleton-constant");
+ final RpcError rpcError = RpcResultBuilder.newError(ErrorType.APPLICATION, "error-closing",
+ "There was a problem closing get-singleton-constant");
final RpcResult<Void> result = RpcResultBuilder.<Void>failed().withRpcError(rpcError).build();
return Futures.immediateFuture(result);
}
@Override
public Future<RpcResult<WriteTransactionsOutput>> writeTransactions(final WriteTransactionsInput input) {
LOG.debug("write-transactions, input: {}", input);
-
- final WriteTransactionsHandler writeTransactionsHandler = new WriteTransactionsHandler(domDataBroker, input);
-
- final SettableFuture<RpcResult<WriteTransactionsOutput>> settableFuture = SettableFuture.create();
- writeTransactionsHandler.start(settableFuture);
-
- return settableFuture;
+ return WriteTransactionsHandler.start(domDataBroker, input);
}
@Override
}
@Override
- public Future<RpcResult<Void>> removeShardReplica(RemoveShardReplicaInput input) {
+ public Future<RpcResult<Void>> removeShardReplica(final RemoveShardReplicaInput input) {
return null;
}
public Future<RpcResult<Void>> unregisterBoundConstant(final UnregisterBoundConstantInput input) {
LOG.debug("unregister-bound-constant, {}", input);
- final DOMRpcImplementationRegistration<RoutedGetConstantService> registration =
+ final DOMRpcImplementationRegistration<RoutedGetConstantService> rpcRegistration =
routedRegistrations.remove(input.getContext());
- if (registration == null) {
+ if (rpcRegistration == null) {
LOG.debug("No get-contexted-constant registration for context: {}", input.getContext());
- final RpcError rpcError = RpcResultBuilder
- .newError(ErrorType.APPLICATION, "missing-registration", "No get-constant rpc registration present.");
+ final RpcError rpcError = RpcResultBuilder.newError(ErrorType.APPLICATION, "missing-registration",
+ "No get-constant rpc registration present.");
final RpcResult<Void> result = RpcResultBuilder.<Void>failed().withRpcError(rpcError).build();
return Futures.immediateFuture(result);
}
- registration.close();
+ rpcRegistration.close();
return Futures.immediateFuture(RpcResultBuilder.<Void>success().build());
}
}
@Override
- public Future<RpcResult<Void>> registerDefaultConstant(RegisterDefaultConstantInput input) {
+ public Future<RpcResult<Void>> registerDefaultConstant(final RegisterDefaultConstantInput input) {
return null;
}
public Future<RpcResult<Void>> unregisterConstant() {
if (globalGetConstantRegistration == null) {
- final RpcError rpcError = RpcResultBuilder
- .newError(ErrorType.APPLICATION, "missing-registration", "No get-constant rpc registration present.");
+ final RpcError rpcError = RpcResultBuilder.newError(ErrorType.APPLICATION, "missing-registration",
+ "No get-constant rpc registration present.");
final RpcResult<Void> result = RpcResultBuilder.<Void>failed().withRpcError(rpcError).build();
return Futures.immediateFuture(result);
}
LOG.debug("unregister-flapping-singleton received.");
if (flappingSingletonService == null) {
- final RpcError rpcError = RpcResultBuilder
- .newError(ErrorType.APPLICATION, "missing-registration", "No flapping-singleton registration present.");
+ final RpcError rpcError = RpcResultBuilder.newError(ErrorType.APPLICATION, "missing-registration",
+ "No flapping-singleton registration present.");
final RpcResult<UnregisterFlappingSingletonOutput> result =
RpcResultBuilder.<UnregisterFlappingSingletonOutput>failed().withRpcError(rpcError).build();
return Futures.immediateFuture(result);
}
@Override
- public Future<RpcResult<Void>> addShardReplica(AddShardReplicaInput input) {
+ public Future<RpcResult<Void>> addShardReplica(final AddShardReplicaInput input) {
return null;
}
idIntsDdtl = new IdIntsDOMDataTreeLIstener();
try {
- ddtlReg =
- domDataTreeService.registerListener(idIntsDdtl,
- Collections.singleton(new DOMDataTreeIdentifier(LogicalDatastoreType.CONFIGURATION,
- ProduceTransactionsHandler.ID_INT_YID))
- , true, Collections.emptyList());
+ ddtlReg = domDataTreeService.registerListener(idIntsDdtl,
+ Collections.singleton(new DOMDataTreeIdentifier(LogicalDatastoreType.CONFIGURATION,
+ ProduceTransactionsHandler.ID_INT_YID)),
+ true, Collections.emptyList());
} catch (DOMDataTreeLoopException e) {
LOG.error("Failed to register DOMDataTreeListener.", e);
return Futures.immediateFuture(RpcResultBuilder.<Void>failed().withRpcError(error).build());
}
- final DOMRpcImplementationRegistration<RoutedGetConstantService> registration =
+ final DOMRpcImplementationRegistration<RoutedGetConstantService> rpcRegistration =
RoutedGetConstantService.registerNew(bindingNormalizedNodeSerializer, domRpcService,
input.getConstant(), input.getContext());
- routedRegistrations.put(input.getContext(), registration);
+ routedRegistrations.put(input.getContext(), rpcRegistration);
return Futures.immediateFuture(RpcResultBuilder.<Void>success().build());
}
if (idIntsListener == null || dtclReg == null) {
final RpcError error = RpcResultBuilder.newError(
ErrorType.RPC, "Dtcl missing.", "No DataTreeChangeListener registered.");
- return Futures.immediateFuture(RpcResultBuilder.<UnsubscribeDtclOutput>failed().withRpcError(error).build());
+ return Futures.immediateFuture(RpcResultBuilder.<UnsubscribeDtclOutput>failed()
+ .withRpcError(error).build());
+ }
+
+ try {
+ idIntsListener.tryFinishProcessing().get(120, TimeUnit.SECONDS);
+ } catch (InterruptedException | ExecutionException | TimeoutException e) {
+ final RpcError error = RpcResultBuilder.newError(ErrorType.RPC, "resource-denied-transport",
+ "Unable to finish notification processing in 120 seconds.", "clustering-it", "clustering-it", e);
+ return Futures.immediateFuture(RpcResultBuilder.<UnsubscribeDtclOutput>failed()
+ .withRpcError(error).build());
}
dtclReg.close();
if (!idIntsListener.hasTriggered()) {
final RpcError error = RpcResultBuilder.newError(
- ErrorType.APPLICATION, "No notification received.", "id-ints listener has not received" +
- "any notifications.");
+ ErrorType.APPLICATION, "No notification received.", "id-ints listener has not received"
+ + "any notifications.");
return Futures.immediateFuture(RpcResultBuilder.<UnsubscribeDtclOutput>failed()
.withRpcError(error).build());
}
LOG.debug("Received unsubscribe-ynl, input: {}", input);
if (!ynlRegistrations.containsKey(input.getId())) {
- final RpcError rpcError = RpcResultBuilder
- .newError(ErrorType.APPLICATION, "missing-registration", "No ynl listener with this id registered.");
+ final RpcError rpcError = RpcResultBuilder.newError(ErrorType.APPLICATION, "missing-registration",
+ "No ynl listener with this id registered.");
final RpcResult<UnsubscribeYnlOutput> result =
RpcResultBuilder.<UnsubscribeYnlOutput>failed().withRpcError(rpcError).build();
return Futures.immediateFuture(result);
}
- final ListenerRegistration<YnlListener> registration = ynlRegistrations.remove(input.getId());
- final UnsubscribeYnlOutput output = registration.getInstance().getOutput();
+ final ListenerRegistration<YnlListener> reg = ynlRegistrations.remove(input.getId());
+ final UnsubscribeYnlOutput output = reg.getInstance().getOutput();
- registration.close();
+ reg.close();
return Futures.immediateFuture(RpcResultBuilder.<UnsubscribeYnlOutput>success().withResult(output).build());
}
if (task.getLastError() != null) {
final StringWriter sw = new StringWriter();
final PrintWriter pw = new PrintWriter(sw);
- task.getLastError().printStackTrace(pw);
+ LOG.error("Last error for {}", task, task.getLastError());
checkPublishNotificationsOutputBuilder.setLastError(task.getLastError().toString() + sw.toString());
}
@Override
public Future<RpcResult<ProduceTransactionsOutput>> produceTransactions(final ProduceTransactionsInput input) {
LOG.debug("producer-transactions, input: {}", input);
-
- final ProduceTransactionsHandler handler =
- new ProduceTransactionsHandler(domDataTreeService, input);
-
- final SettableFuture<RpcResult<ProduceTransactionsOutput>> settableFuture = SettableFuture.create();
- handler.start(settableFuture);
-
- return settableFuture;
+ return ProduceTransactionsHandler.start(domDataTreeService, input);
}
@Override
public Future<RpcResult<Void>> shutdownPrefixShardReplica(final ShutdownPrefixShardReplicaInput input) {
LOG.debug("Received shutdown-prefix-shard-replica rpc, input: {}", input);
- final InstanceIdentifier shardPrefix = input.getPrefix();
+ final InstanceIdentifier<?> shardPrefix = input.getPrefix();
if (shardPrefix == null) {
final RpcError rpcError = RpcResultBuilder.newError(ErrorType.APPLICATION, "bad-element",
}
@Override
+ @SuppressWarnings("checkstyle:IllegalCatch")
public Future<RpcResult<UnsubscribeDdtlOutput>> unsubscribeDdtl() {
LOG.debug("Received unsubscribe-ddtl.");
if (idIntsDdtl == null || ddtlReg == null) {
final RpcError error = RpcResultBuilder.newError(
ErrorType.RPC, "Ddtl missing.", "No DOMDataTreeListener registered.");
- return Futures.immediateFuture(RpcResultBuilder.<UnsubscribeDdtlOutput>failed().withRpcError(error).build());
+ return Futures.immediateFuture(RpcResultBuilder.<UnsubscribeDdtlOutput>failed()
+ .withRpcError(error).build());
+ }
+
+ try {
+ idIntsDdtl.tryFinishProcessing().get(120, TimeUnit.SECONDS);
+ } catch (InterruptedException | ExecutionException | TimeoutException e) {
+ final RpcError error = RpcResultBuilder.newError(ErrorType.RPC, "resource-denied-transport",
+ "Unable to finish notification processing in 120 seconds.", "clustering-it", "clustering-it", e);
+ return Futures.immediateFuture(RpcResultBuilder.<UnsubscribeDdtlOutput>failed()
+ .withRpcError(error).build());
}
ddtlReg.close();
if (!idIntsDdtl.hasTriggered()) {
final RpcError error = RpcResultBuilder.newError(
- ErrorType.APPLICATION, "No notification received.", "id-ints listener has not received" +
- "any notifications.");
+ ErrorType.APPLICATION, "No notification received.", "id-ints listener has not received"
+ + "any notifications.");
return Futures.immediateFuture(RpcResultBuilder.<UnsubscribeDdtlOutput>failed()
.withRpcError(error).build());
}
try {
distributedDataStoreClient = SimpleDataStoreClientActor
.getDistributedDataStoreClient(clientActor, 30, TimeUnit.SECONDS);
- } catch (final Exception e) {
+ } catch (RuntimeException e) {
LOG.error("Failed to get actor for {}", distributedDataStoreClientProps, e);
clientActor.tell(PoisonPill.getInstance(), noSender());
final RpcError error = RpcResultBuilder.newError(