@Override
public <E extends TableSchema<E>> TableUpdates monitor(final DatabaseSchema dbSchema,
- final List<MonitorRequest> monitorRequest,
- final MonitorCallBack callback,
- final int timeout) {
-
- final ImmutableMap<String, MonitorRequest> reqMap = Maps.uniqueIndex(monitorRequest,
- MonitorRequest::getTableName);
-
- final MonitorHandle monitorHandle = new MonitorHandle(UUID.randomUUID().toString());
- registerCallback(monitorHandle, callback, dbSchema);
-
- ListenableFuture<JsonNode> monitor = rpc.monitor(
- () -> Arrays.asList(dbSchema.getName(), monitorHandle.getId(), reqMap));
- JsonNode result;
- try {
- if (timeout == NO_TIMEOUT) {
- result = monitor.get();
- } else {
- result = monitor.get(timeout, TimeUnit.SECONDS);
- }
- } catch (InterruptedException | ExecutionException | TimeoutException e) {
- LOG.warn("Failed to monitor {}", dbSchema, e);
- return null;
- }
- return transformingCallback(result, dbSchema);
+ final List<MonitorRequest> monitorRequest,
+ final MonitorCallBack callback,
+ final int timeout) {
+ return monitor(dbSchema, monitorRequest, new MonitorHandle(UUID.randomUUID().toString()), callback, timeout);
}
@Override
final MonitorHandle monitorHandle,
final MonitorCallBack callback,
final int timeout) {
-
final ImmutableMap<String, MonitorRequest> reqMap = Maps.uniqueIndex(monitorRequest,
MonitorRequest::getTableName);
registerCallback(monitorHandle, callback, dbSchema);
- ListenableFuture<JsonNode> monitor = rpc.monitor(
+ final ListenableFuture<JsonNode> monitor = rpc.monitor(
() -> Arrays.asList(dbSchema.getName(), monitorHandle.getId(), reqMap));
- JsonNode result;
+ final JsonNode result;
try {
if (timeout == NO_TIMEOUT) {
result = monitor.get();