Bump MRI upstreams
[openflowplugin.git] / applications / forwardingrules-sync / src / main / java / org / opendaylight / openflowplugin / applications / frsync / impl / strategy / SyncPlanPushStrategyIncrementalImpl.java
index 2caf0c69f666ce27872f3eb193a898ae57c01649..da2117120f17711f16eee157436f27bfaa3e8c1d 100644 (file)
@@ -1,19 +1,16 @@
-/**
+/*
  * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.
  *
  * This program and the accompanying materials are made available under the
  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
  * and is available at http://www.eclipse.org/legal/epl-v10.html
  */
-
 package org.opendaylight.openflowplugin.applications.frsync.impl.strategy;
 
 import com.google.common.collect.Iterables;
-import com.google.common.util.concurrent.AsyncFunction;
 import com.google.common.util.concurrent.Futures;
-import com.google.common.util.concurrent.JdkFutureAdapters;
-import com.google.common.util.concurrent.MoreExecutors;
 import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.MoreExecutors;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
@@ -46,11 +43,9 @@ import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.service.rev130918.Add
 import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.service.rev130918.RemoveMeterOutput;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.service.rev130918.UpdateMeterOutput;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.table.service.rev131026.UpdateTableOutput;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.table.types.rev131026.table.features.TableFeatures;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.table.types.rev131026.table.features.TableFeaturesKey;
 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
 import org.opendaylight.yangtools.yang.binding.KeyedInstanceIdentifier;
-import org.opendaylight.yangtools.yang.common.RpcError;
+import org.opendaylight.yangtools.yang.common.ErrorType;
 import org.opendaylight.yangtools.yang.common.RpcResult;
 import org.opendaylight.yangtools.yang.common.RpcResultBuilder;
 import org.slf4j.Logger;
@@ -80,75 +75,57 @@ public class SyncPlanPushStrategyIncrementalImpl implements SyncPlanPushStrategy
         // TODO enable table-update when ready
         //resultVehicle = updateTableFeatures(nodeIdent, configTree);
 
-        resultVehicle = Futures.transformAsync(resultVehicle, new AsyncFunction<RpcResult<Void>, RpcResult<Void>>() {
-            @Override
-            public ListenableFuture<RpcResult<Void>> apply(final RpcResult<Void> input) throws Exception {
-                if (!input.isSuccessful()) {
-                    //TODO chain errors but not skip processing on first error return Futures.immediateFuture(input);
-                    //final ListenableFuture<RpcResult<Void>> singleVoidUpdateResult = Futures.transform(
-                    //        Futures.asList Arrays.asList(input, output),
-                    //        ReconcileUtil.<UpdateFlowOutput>createRpcResultCondenser("TODO"));
-                }
-                return addMissingGroups(nodeId, nodeIdent, diffInput.getGroupsToAddOrUpdate(), counters);
-            }
-        });
+        resultVehicle = Futures.transformAsync(resultVehicle, input -> {
+            // if (!input.isSuccessful()) {
+                //TODO chain errors but not skip processing on first error return Futures.immediateFuture(input);
+                //final ListenableFuture<RpcResult<Void>> singleVoidUpdateResult = Futures.transform(
+                //        Futures.asList Arrays.asList(input, output),
+                //        ReconcileUtil.<UpdateFlowOutput>createRpcResultCondenser("TODO"));
+            // }
+            return addMissingGroups(nodeId, nodeIdent, diffInput.getGroupsToAddOrUpdate(), counters);
+        }, MoreExecutors.directExecutor());
         Futures.addCallback(resultVehicle, FxChainUtil.logResultCallback(nodeId, "addMissingGroups"),
                 MoreExecutors.directExecutor());
-        resultVehicle = Futures.transformAsync(resultVehicle, new AsyncFunction<RpcResult<Void>, RpcResult<Void>>() {
-            @Override
-            public ListenableFuture<RpcResult<Void>> apply(final RpcResult<Void> input) throws Exception {
-                if (!input.isSuccessful()) {
-                    //TODO chain errors but not skip processing on first error return Futures.immediateFuture(input);
-                }
-                return addMissingMeters(nodeId, nodeIdent, diffInput.getMetersToAddOrUpdate(), counters);
-            }
-        });
+        resultVehicle = Futures.transformAsync(resultVehicle, input -> {
+            // if (!input.isSuccessful()) {
+                //TODO chain errors but not skip processing on first error return Futures.immediateFuture(input);
+            // }
+            return addMissingMeters(nodeId, nodeIdent, diffInput.getMetersToAddOrUpdate(), counters);
+        }, MoreExecutors.directExecutor());
         Futures.addCallback(resultVehicle, FxChainUtil.logResultCallback(nodeId, "addMissingMeters"),
                 MoreExecutors.directExecutor());
-        resultVehicle = Futures.transformAsync(resultVehicle, new AsyncFunction<RpcResult<Void>, RpcResult<Void>>() {
-            @Override
-            public ListenableFuture<RpcResult<Void>> apply(final RpcResult<Void> input) throws Exception {
-                if (!input.isSuccessful()) {
-                    //TODO chain errors but not skip processing on first error return Futures.immediateFuture(input);
-                }
-                return addMissingFlows(nodeId, nodeIdent, diffInput.getFlowsToAddOrUpdate(), counters);
-            }
-        });
+        resultVehicle = Futures.transformAsync(resultVehicle, input -> {
+            // if (!input.isSuccessful()) {
+                //TODO chain errors but not skip processing on first error return Futures.immediateFuture(input);
+            // }
+            return addMissingFlows(nodeId, nodeIdent, diffInput.getFlowsToAddOrUpdate(), counters);
+        }, MoreExecutors.directExecutor());
         Futures.addCallback(resultVehicle, FxChainUtil.logResultCallback(nodeId, "addMissingFlows"),
                 MoreExecutors.directExecutor());
 
 
-        resultVehicle = Futures.transformAsync(resultVehicle, new AsyncFunction<RpcResult<Void>, RpcResult<Void>>() {
-            @Override
-            public ListenableFuture<RpcResult<Void>> apply(final RpcResult<Void> input) throws Exception {
-                if (!input.isSuccessful()) {
-                    //TODO chain errors but not skip processing on first error return Futures.immediateFuture(input);
-                }
-                return removeRedundantFlows(nodeId, nodeIdent, diffInput.getFlowsToRemove(), counters);
-            }
-        });
+        resultVehicle = Futures.transformAsync(resultVehicle, input -> {
+            // if (!input.isSuccessful()) {
+                //TODO chain errors but not skip processing on first error return Futures.immediateFuture(input);
+            // }
+            return removeRedundantFlows(nodeId, nodeIdent, diffInput.getFlowsToRemove(), counters);
+        }, MoreExecutors.directExecutor());
         Futures.addCallback(resultVehicle, FxChainUtil.logResultCallback(nodeId, "removeRedundantFlows"),
                 MoreExecutors.directExecutor());
-        resultVehicle = Futures.transformAsync(resultVehicle, new AsyncFunction<RpcResult<Void>, RpcResult<Void>>() {
-            @Override
-            public ListenableFuture<RpcResult<Void>> apply(final RpcResult<Void> input) throws Exception {
-                if (!input.isSuccessful()) {
-                    //TODO chain errors but not skip processing on first error return Futures.immediateFuture(input);
-                }
-                return removeRedundantMeters(nodeId, nodeIdent, diffInput.getMetersToRemove(), counters);
-            }
-        });
+        resultVehicle = Futures.transformAsync(resultVehicle, input -> {
+            // if (!input.isSuccessful()) {
+                //TODO chain errors but not skip processing on first error return Futures.immediateFuture(input);
+            // }
+            return removeRedundantMeters(nodeId, nodeIdent, diffInput.getMetersToRemove(), counters);
+        }, MoreExecutors.directExecutor());
         Futures.addCallback(resultVehicle, FxChainUtil.logResultCallback(nodeId, "removeRedundantMeters"),
                 MoreExecutors.directExecutor());
-        resultVehicle = Futures.transformAsync(resultVehicle, new AsyncFunction<RpcResult<Void>, RpcResult<Void>>() {
-            @Override
-            public ListenableFuture<RpcResult<Void>> apply(final RpcResult<Void> input) throws Exception {
-                if (!input.isSuccessful()) {
-                    //TODO chain errors but not skip processing on first error return Futures.immediateFuture(input);
-                }
-                return removeRedundantGroups(nodeId, nodeIdent, diffInput.getGroupsToRemove(), counters);
-            }
-        });
+        resultVehicle = Futures.transformAsync(resultVehicle, input -> {
+            // if (!input.isSuccessful()) {
+                //TODO chain errors but not skip processing on first error return Futures.immediateFuture(input);
+            // }
+            return removeRedundantGroups(nodeId, nodeIdent, diffInput.getGroupsToRemove(), counters);
+        }, MoreExecutors.directExecutor());
         Futures.addCallback(resultVehicle, FxChainUtil.logResultCallback(nodeId, "removeRedundantGroups"),
                 MoreExecutors.directExecutor());
         return resultVehicle;
@@ -175,13 +152,12 @@ public class SyncPlanPushStrategyIncrementalImpl implements SyncPlanPushStrategy
             final KeyedInstanceIdentifier<Table, TableKey> tableIdent = nodeIdent.child(Table.class, tableKey);
 
             for (final Flow flow : flowSyncBox.getItemsToPush()) {
-                final KeyedInstanceIdentifier<Flow, FlowKey> flowIdent = tableIdent.child(Flow.class, flow.getKey());
+                final KeyedInstanceIdentifier<Flow, FlowKey> flowIdent = tableIdent.child(Flow.class, flow.key());
 
                 LOG.trace("adding flow {} in table {} - absent on device {} match{}",
                         flow.getId(), tableKey, nodeId, flow.getMatch());
 
-                allResults.add(JdkFutureAdapters.listenInPoolThread(
-                        flowForwarder.add(flowIdent, flow, nodeIdent)));
+                allResults.add(flowForwarder.add(flowIdent, flow, nodeIdent));
                 flowCrudCounts.incAdded();
             }
 
@@ -189,26 +165,29 @@ public class SyncPlanPushStrategyIncrementalImpl implements SyncPlanPushStrategy
                 final Flow existingFlow = flowUpdate.getOriginal();
                 final Flow updatedFlow = flowUpdate.getUpdated();
 
-                final KeyedInstanceIdentifier<Flow, FlowKey> flowIdent = tableIdent.child(Flow.class, updatedFlow.getKey());
+                final KeyedInstanceIdentifier<Flow, FlowKey> flowIdent = tableIdent.child(Flow.class,
+                        updatedFlow.key());
                 LOG.trace("flow {} in table {} - needs update on device {} match{}",
                         updatedFlow.getId(), tableKey, nodeId, updatedFlow.getMatch());
 
-                allUpdateResults.add(JdkFutureAdapters.listenInPoolThread(
-                        flowForwarder.update(flowIdent, existingFlow, updatedFlow, nodeIdent)));
+                allUpdateResults.add(flowForwarder.update(flowIdent, existingFlow, updatedFlow, nodeIdent));
                 flowCrudCounts.incUpdated();
             }
         }
 
         final ListenableFuture<RpcResult<Void>> singleVoidAddResult = Futures.transform(
                 Futures.allAsList(allResults),
-                ReconcileUtil.<AddFlowOutput>createRpcResultCondenser("flow adding"));
+                ReconcileUtil.createRpcResultCondenser("flow adding"),
+                MoreExecutors.directExecutor());
 
         final ListenableFuture<RpcResult<Void>> singleVoidUpdateResult = Futures.transform(
                 Futures.allAsList(allUpdateResults),
-                ReconcileUtil.<UpdateFlowOutput>createRpcResultCondenser("flow updating"));
+                ReconcileUtil.createRpcResultCondenser("flow updating"),
+                MoreExecutors.directExecutor());
 
         return Futures.transform(Futures.allAsList(singleVoidAddResult, singleVoidUpdateResult),
-                ReconcileUtil.<Void>createRpcResultCondenser("flow add/update"));
+                ReconcileUtil.createRpcResultCondenser("flow add/update"),
+                MoreExecutors.directExecutor());
     }
 
     ListenableFuture<RpcResult<Void>> removeRedundantFlows(final NodeId nodeId,
@@ -230,17 +209,20 @@ public class SyncPlanPushStrategyIncrementalImpl implements SyncPlanPushStrategy
             // loop flows on device and check if the are configured
             for (final Flow flow : flowsPerTable.getValue().getItemsToPush()) {
                 final KeyedInstanceIdentifier<Flow, FlowKey> flowIdent =
-                        tableIdent.child(Flow.class, flow.getKey());
-                allResults.add(JdkFutureAdapters.listenInPoolThread(
-                        flowForwarder.remove(flowIdent, flow, nodeIdent)));
+                        tableIdent.child(Flow.class, flow.key());
+                allResults.add(flowForwarder.remove(flowIdent, flow, nodeIdent));
                 flowCrudCounts.incRemoved();
             }
         }
 
         final ListenableFuture<RpcResult<Void>> singleVoidResult = Futures.transform(
-                Futures.allAsList(allResults), ReconcileUtil.<RemoveFlowOutput>createRpcResultCondenser("flow remove"));
+                Futures.allAsList(allResults),
+                ReconcileUtil.createRpcResultCondenser("flow remove"),
+                MoreExecutors.directExecutor());
+
         return Futures.transformAsync(singleVoidResult,
-                ReconcileUtil.chainBarrierFlush(PathUtil.digNodePath(nodeIdent), transactionService));
+                ReconcileUtil.chainBarrierFlush(PathUtil.digNodePath(nodeIdent), transactionService),
+                MoreExecutors.directExecutor());
 
     }
 
@@ -260,14 +242,14 @@ public class SyncPlanPushStrategyIncrementalImpl implements SyncPlanPushStrategy
             LOG.trace("removing meter {} - absent in config {}",
                     meter.getMeterId(), nodeId);
             final KeyedInstanceIdentifier<Meter, MeterKey> meterIdent =
-                    nodeIdent.child(Meter.class, meter.getKey());
-            allResults.add(JdkFutureAdapters.listenInPoolThread(
-                    meterForwarder.remove(meterIdent, meter, nodeIdent)));
+                    nodeIdent.child(Meter.class, meter.key());
+            allResults.add(meterForwarder.remove(meterIdent, meter, nodeIdent));
             meterCrudCounts.incRemoved();
         }
 
         return Futures.transform(Futures.allAsList(allResults),
-                ReconcileUtil.<RemoveMeterOutput>createRpcResultCondenser("meter remove"));
+                ReconcileUtil.createRpcResultCondenser("meter remove"),
+                MoreExecutors.directExecutor());
     }
 
     ListenableFuture<RpcResult<Void>> removeRedundantGroups(final NodeId nodeId,
@@ -290,26 +272,21 @@ public class SyncPlanPushStrategyIncrementalImpl implements SyncPlanPushStrategy
             }
             Collections.reverse(groupsRemovalPlan);
             for (final ItemSyncBox<Group> groupsPortion : groupsRemovalPlan) {
-                chainedResult =
-                        Futures.transformAsync(chainedResult, new AsyncFunction<RpcResult<Void>, RpcResult<Void>>() {
-                            @Override
-                            public ListenableFuture<RpcResult<Void>> apply(final RpcResult<Void> input)
-                                    throws Exception {
-                                final ListenableFuture<RpcResult<Void>> result;
-                                if (input.isSuccessful()) {
-                                    result = flushRemoveGroupPortionAndBarrier(nodeIdent, groupsPortion);
-                                } else {
-                                    // pass through original unsuccessful rpcResult
-                                    result = Futures.immediateFuture(input);
-                                }
-
-                                return result;
-                            }
-                        });
+                chainedResult = Futures.transformAsync(chainedResult, input -> {
+                    final ListenableFuture<RpcResult<Void>> result;
+                    if (input.isSuccessful()) {
+                        result = flushRemoveGroupPortionAndBarrier(nodeIdent, groupsPortion);
+                    } else {
+                        // pass through original unsuccessful rpcResult
+                        result = Futures.immediateFuture(input);
+                    }
+
+                    return result;
+                }, MoreExecutors.directExecutor());
             }
         } catch (IllegalStateException e) {
             chainedResult = RpcResultBuilder.<Void>failed()
-                    .withError(RpcError.ErrorType.APPLICATION, "failed to add missing groups", e)
+                    .withError(ErrorType.APPLICATION, "failed to add missing groups", e)
                     .buildFuture();
         }
 
@@ -321,46 +298,48 @@ public class SyncPlanPushStrategyIncrementalImpl implements SyncPlanPushStrategy
             final ItemSyncBox<Group> groupsPortion) {
         List<ListenableFuture<RpcResult<RemoveGroupOutput>>> allResults = new ArrayList<>();
         for (Group group : groupsPortion.getItemsToPush()) {
-            final KeyedInstanceIdentifier<Group, GroupKey> groupIdent = nodeIdent.child(Group.class, group.getKey());
-            allResults.add(JdkFutureAdapters.listenInPoolThread(groupForwarder.remove(groupIdent, group, nodeIdent)));
+            final KeyedInstanceIdentifier<Group, GroupKey> groupIdent = nodeIdent.child(Group.class, group.key());
+            allResults.add(groupForwarder.remove(groupIdent, group, nodeIdent));
         }
 
         final ListenableFuture<RpcResult<Void>> singleVoidResult = Futures.transform(
                 Futures.allAsList(allResults),
-                ReconcileUtil.<RemoveGroupOutput>createRpcResultCondenser("group remove"));
+                ReconcileUtil.createRpcResultCondenser("group remove"),
+                MoreExecutors.directExecutor());
 
         return Futures.transformAsync(singleVoidResult,
-                ReconcileUtil.chainBarrierFlush(PathUtil.digNodePath(nodeIdent), transactionService));
+                ReconcileUtil.chainBarrierFlush(PathUtil.digNodePath(nodeIdent), transactionService),
+                MoreExecutors.directExecutor());
     }
 
     ListenableFuture<RpcResult<Void>> updateTableFeatures(final InstanceIdentifier<FlowCapableNode> nodeIdent,
                                                           final FlowCapableNode flowCapableNodeConfigured) {
         // CHECK if while pushing the update, updateTableInput can be null to emulate a table add
-        final List<Table> tableList = ReconcileUtil.safeTables(flowCapableNodeConfigured);
+        //final List<Table> tableList = ReconcileUtil.safeTables(flowCapableNodeConfigured);
 
         final List<ListenableFuture<RpcResult<UpdateTableOutput>>> allResults = new ArrayList<>();
-        for (Table table : tableList) {
-            TableKey tableKey = table.getKey();
-            KeyedInstanceIdentifier<TableFeatures, TableFeaturesKey> tableFeaturesII = nodeIdent
-                    .child(TableFeatures.class, new TableFeaturesKey(tableKey.getId()));
-            List<TableFeatures> tableFeatures = flowCapableNodeConfigured.getTableFeatures();
-            if (tableFeatures != null) {
-                for (TableFeatures tableFeaturesItem : tableFeatures) {
-                    // TODO uncomment java.lang.NullPointerException
-                    // at
-                    // org.opendaylight.openflowjava.protocol.impl.serialization.match.AbstractOxmMatchEntrySerializer.serializeHeader(AbstractOxmMatchEntrySerializer.java:31
-                    // allResults.add(JdkFutureAdapters.listenInPoolThread(
-                    // tableForwarder.update(tableFeaturesII, null, tableFeaturesItem, nodeIdent)));
-                }
-            }
-        }
+//        for (Table table : tableList) {
+//            List<TableFeatures> tableFeatures = flowCapableNodeConfigured.getTableFeatures();
+//            if (tableFeatures != null) {
+//                for (TableFeatures tableFeaturesItem : tableFeatures) {
+//                    // TODO uncomment java.lang.NullPointerException
+//                    // at
+//                    // org.opendaylight.openflowjava.protocol.impl.serialization.match.AbstractOxmMatchEntrySerializer
+//                    //    .serializeHeader(AbstractOxmMatchEntrySerializer.java:31
+//                    // allResults.add(JdkFutureAdapters.listenInPoolThread(
+//                    // tableForwarder.update(tableFeaturesII, null, tableFeaturesItem, nodeIdent)));
+//                }
+//            }
+//        }
 
         final ListenableFuture<RpcResult<Void>> singleVoidResult = Futures.transform(
                 Futures.allAsList(allResults),
-                ReconcileUtil.<UpdateTableOutput>createRpcResultCondenser("table update"));
+                ReconcileUtil.createRpcResultCondenser("table update"),
+                MoreExecutors.directExecutor());
 
         return Futures.transformAsync(singleVoidResult,
-                ReconcileUtil.chainBarrierFlush(PathUtil.digNodePath(nodeIdent), transactionService));
+                ReconcileUtil.chainBarrierFlush(PathUtil.digNodePath(nodeIdent), transactionService),
+                MoreExecutors.directExecutor());
     }
 
     private ListenableFuture<RpcResult<Void>> flushAddGroupPortionAndBarrier(
@@ -370,8 +349,8 @@ public class SyncPlanPushStrategyIncrementalImpl implements SyncPlanPushStrategy
         final List<ListenableFuture<RpcResult<UpdateGroupOutput>>> allUpdateResults = new ArrayList<>();
 
         for (Group group : groupsPortion.getItemsToPush()) {
-            final KeyedInstanceIdentifier<Group, GroupKey> groupIdent = nodeIdent.child(Group.class, group.getKey());
-            allResults.add(JdkFutureAdapters.listenInPoolThread(groupForwarder.add(groupIdent, group, nodeIdent)));
+            final KeyedInstanceIdentifier<Group, GroupKey> groupIdent = nodeIdent.child(Group.class, group.key());
+            allResults.add(groupForwarder.add(groupIdent, group, nodeIdent));
 
         }
 
@@ -379,25 +358,28 @@ public class SyncPlanPushStrategyIncrementalImpl implements SyncPlanPushStrategy
             final Group existingGroup = groupTuple.getOriginal();
             final Group group = groupTuple.getUpdated();
 
-            final KeyedInstanceIdentifier<Group, GroupKey> groupIdent = nodeIdent.child(Group.class, group.getKey());
-            allUpdateResults.add(JdkFutureAdapters.listenInPoolThread(
-                    groupForwarder.update(groupIdent, existingGroup, group, nodeIdent)));
+            final KeyedInstanceIdentifier<Group, GroupKey> groupIdent = nodeIdent.child(Group.class, group.key());
+            allUpdateResults.add(groupForwarder.update(groupIdent, existingGroup, group, nodeIdent));
         }
 
         final ListenableFuture<RpcResult<Void>> singleVoidAddResult = Futures.transform(
-                Futures.allAsList(allResults), ReconcileUtil.<AddGroupOutput>createRpcResultCondenser("group add"));
+                Futures.allAsList(allResults),
+                ReconcileUtil.createRpcResultCondenser("group add"),
+                MoreExecutors.directExecutor());
 
         final ListenableFuture<RpcResult<Void>> singleVoidUpdateResult = Futures.transform(
                 Futures.allAsList(allUpdateResults),
-                ReconcileUtil.<UpdateGroupOutput>createRpcResultCondenser("group update"));
+                ReconcileUtil.createRpcResultCondenser("group update"),
+                MoreExecutors.directExecutor());
 
         final ListenableFuture<RpcResult<Void>> summaryResult = Futures.transform(
                 Futures.allAsList(singleVoidAddResult, singleVoidUpdateResult),
-                ReconcileUtil.<Void>createRpcResultCondenser("group add/update"));
+                ReconcileUtil.createRpcResultCondenser("group add/update"),
+                MoreExecutors.directExecutor());
 
 
         return Futures.transformAsync(summaryResult, ReconcileUtil.chainBarrierFlush(
-                PathUtil.digNodePath(nodeIdent), transactionService));
+                PathUtil.digNodePath(nodeIdent), transactionService), MoreExecutors.directExecutor());
     }
 
     ListenableFuture<RpcResult<Void>> addMissingMeters(final NodeId nodeId,
@@ -414,33 +396,35 @@ public class SyncPlanPushStrategyIncrementalImpl implements SyncPlanPushStrategy
         final List<ListenableFuture<RpcResult<AddMeterOutput>>> allResults = new ArrayList<>();
         final List<ListenableFuture<RpcResult<UpdateMeterOutput>>> allUpdateResults = new ArrayList<>();
         for (Meter meter : syncBox.getItemsToPush()) {
-            final KeyedInstanceIdentifier<Meter, MeterKey> meterIdent = nodeIdent.child(Meter.class, meter.getKey());
+            final KeyedInstanceIdentifier<Meter, MeterKey> meterIdent = nodeIdent.child(Meter.class, meter.key());
             LOG.debug("adding meter {} - absent on device {}",
                     meter.getMeterId(), nodeId);
-            allResults.add(JdkFutureAdapters.listenInPoolThread(
-                    meterForwarder.add(meterIdent, meter, nodeIdent)));
+            allResults.add(meterForwarder.add(meterIdent, meter, nodeIdent));
             meterCrudCounts.incAdded();
         }
 
         for (ItemSyncBox.ItemUpdateTuple<Meter> meterTuple : syncBox.getItemsToUpdate()) {
             final Meter existingMeter = meterTuple.getOriginal();
             final Meter updated = meterTuple.getUpdated();
-            final KeyedInstanceIdentifier<Meter, MeterKey> meterIdent = nodeIdent.child(Meter.class, updated.getKey());
+            final KeyedInstanceIdentifier<Meter, MeterKey> meterIdent = nodeIdent.child(Meter.class, updated.key());
             LOG.trace("meter {} - needs update on device {}", updated.getMeterId(), nodeId);
-            allUpdateResults.add(JdkFutureAdapters.listenInPoolThread(
-                    meterForwarder.update(meterIdent, existingMeter, updated, nodeIdent)));
+            allUpdateResults.add(meterForwarder.update(meterIdent, existingMeter, updated, nodeIdent));
             meterCrudCounts.incUpdated();
         }
 
         final ListenableFuture<RpcResult<Void>> singleVoidAddResult = Futures.transform(
-                Futures.allAsList(allResults), ReconcileUtil.<AddMeterOutput>createRpcResultCondenser("meter add"));
+                Futures.allAsList(allResults),
+                ReconcileUtil.createRpcResultCondenser("meter add"),
+                MoreExecutors.directExecutor());
 
         final ListenableFuture<RpcResult<Void>> singleVoidUpdateResult = Futures.transform(
                 Futures.allAsList(allUpdateResults),
-                ReconcileUtil.<UpdateMeterOutput>createRpcResultCondenser("meter update"));
+                ReconcileUtil.createRpcResultCondenser("meter update"),
+                MoreExecutors.directExecutor());
 
         return Futures.transform(Futures.allAsList(singleVoidUpdateResult, singleVoidAddResult),
-                ReconcileUtil.<Void>createRpcResultCondenser("meter add/update"));
+                ReconcileUtil.createRpcResultCondenser("meter add/update"),
+                MoreExecutors.directExecutor());
     }
 
     ListenableFuture<RpcResult<Void>> addMissingGroups(final NodeId nodeId,
@@ -469,28 +453,24 @@ public class SyncPlanPushStrategyIncrementalImpl implements SyncPlanPushStrategy
                 chainedResult = flushAddGroupPortionAndBarrier(nodeIdent, groupsAddPlan.get(0));
                 for (final ItemSyncBox<Group> groupsPortion : Iterables.skip(groupsAddPlan, 1)) {
                     chainedResult =
-                            Futures.transformAsync(chainedResult, new AsyncFunction<RpcResult<Void>, RpcResult<Void>>() {
-                                @Override
-                                public ListenableFuture<RpcResult<Void>> apply(final RpcResult<Void> input)
-                                        throws Exception {
-                                    final ListenableFuture<RpcResult<Void>> result;
-                                    if (input.isSuccessful()) {
-                                        result = flushAddGroupPortionAndBarrier(nodeIdent, groupsPortion);
-                                    } else {
-                                        // pass through original unsuccessful rpcResult
-                                        result = Futures.immediateFuture(input);
-                                    }
-
-                                    return result;
-                                }
-                            });
+                        Futures.transformAsync(chainedResult, input -> {
+                            final ListenableFuture<RpcResult<Void>> result;
+                            if (input.isSuccessful()) {
+                                result = flushAddGroupPortionAndBarrier(nodeIdent, groupsPortion);
+                            } else {
+                                // pass through original unsuccessful rpcResult
+                                result = Futures.immediateFuture(input);
+                            }
+
+                            return result;
+                        }, MoreExecutors.directExecutor());
                 }
             } else {
                 chainedResult = RpcResultBuilder.<Void>success().buildFuture();
             }
         } catch (IllegalStateException e) {
             chainedResult = RpcResultBuilder.<Void>failed()
-                    .withError(RpcError.ErrorType.APPLICATION, "failed to add missing groups", e)
+                    .withError(ErrorType.APPLICATION, "failed to add missing groups", e)
                     .buildFuture();
         }
 
@@ -518,7 +498,8 @@ public class SyncPlanPushStrategyIncrementalImpl implements SyncPlanPushStrategy
         return this;
     }
 
-    public SyncPlanPushStrategyIncrementalImpl setTransactionService(final FlowCapableTransactionService transactionService) {
+    public SyncPlanPushStrategyIncrementalImpl setTransactionService(
+            final FlowCapableTransactionService transactionService) {
         this.transactionService = transactionService;
         return this;
     }