import com.google.common.collect.ImmutableMap;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.MoreExecutors;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
public class ReconciliationManagerImpl implements ReconciliationManager, ReconciliationFrameworkEvent {
private static final Logger LOG = LoggerFactory.getLogger(ReconciliationManagerImpl.class);
- private MastershipChangeServiceManager mastershipChangeServiceManager;
- private Map<Integer, List<ReconciliationNotificationListener>> registeredServices = new ConcurrentSkipListMap<>();
- private Map<DeviceInfo, ListenableFuture<ResultState>> futureMap = new ConcurrentHashMap<>();
- private Map<ResultState, Integer> resultStateMap = new ConcurrentHashMap<>();
- private AtomicReference<ResultState> decidedResultState = new AtomicReference<>(ResultState.DONOTHING);
+ private final MastershipChangeServiceManager mastershipChangeServiceManager;
+ private final Map<Integer, List<ReconciliationNotificationListener>> registeredServices =
+ new ConcurrentSkipListMap<>();
+ private final Map<DeviceInfo, ListenableFuture<ResultState>> futureMap = new ConcurrentHashMap<>();
+ private final Map<ResultState, Integer> resultStateMap = new ConcurrentHashMap<>();
+ private final AtomicReference<ResultState> decidedResultState = new AtomicReference<>(ResultState.DONOTHING);
public ReconciliationManagerImpl(MastershipChangeServiceManager mastershipChangeServiceManager) {
this.mastershipChangeServiceManager = Preconditions
List<ReconciliationNotificationListener>
servicesForPriority,
DeviceInfo node) {
- return Futures.transformAsync(prevFuture, prevResult -> {
- return Futures.transform(Futures.allAsList(
- servicesForPriority.stream().map(service -> service.startReconciliation(node))
- .collect(Collectors.toList())), results -> decidedResultState.get());
- });
+ return Futures.transformAsync(prevFuture, prevResult -> Futures.transform(Futures.allAsList(
+ servicesForPriority.stream().map(service -> service.startReconciliation(node))
+ .collect(Collectors.toList())), results -> decidedResultState.get(),
+ MoreExecutors.directExecutor()),
+ MoreExecutors.directExecutor());
}
private ListenableFuture<Void> cancelNodeReconciliation(DeviceInfo node) {
List<ReconciliationNotificationListener>
servicesForPriority,
DeviceInfo node) {
- return Futures.transformAsync(prevFuture, prevResult -> {
- return Futures.transform(Futures.allAsList(
- servicesForPriority.stream().map(service -> service.endReconciliation(node))
- .collect(Collectors.toList())), results -> null);
- });
+ return Futures.transformAsync(prevFuture, prevResult -> Futures.transform(Futures.allAsList(
+ servicesForPriority.stream().map(service -> service.endReconciliation(node))
+ .collect(Collectors.toList())), results -> null, MoreExecutors.directExecutor()),
+ MoreExecutors.directExecutor());
}
}
\ No newline at end of file