More FRM convertions
[openflowplugin.git] / applications / forwardingrules-manager / src / main / java / org / opendaylight / openflowplugin / applications / frm / impl / FlowNodeReconciliationImpl.java
index 300072ccf5d95a55d870579f76da5e11127bbdc3..6cba63687e524af8a8bec403abf21ce9444fc219 100644 (file)
@@ -30,6 +30,7 @@ import java.util.Map;
 import java.util.Optional;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
@@ -80,7 +81,6 @@ import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.types.rev130918.Meter
 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflowplugin.extension.onf.bundle.service.rev170124.AddBundleMessagesInputBuilder;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflowplugin.extension.onf.bundle.service.rev170124.AddBundleMessagesOutput;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflowplugin.extension.onf.bundle.service.rev170124.ControlBundleInputBuilder;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.openflowplugin.extension.onf.bundle.service.rev170124.SalBundleService;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflowplugin.extension.onf.bundle.service.rev170124.add.bundle.messages.input.Messages;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflowplugin.extension.onf.bundle.service.rev170124.add.bundle.messages.input.MessagesBuilder;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflowplugin.extension.onf.bundle.service.rev170124.add.bundle.messages.input.messages.Message;
@@ -111,32 +111,30 @@ public class FlowNodeReconciliationImpl implements FlowNodeReconciliation {
     private static final Logger LOG = LoggerFactory.getLogger(FlowNodeReconciliationImpl.class);
     private static final Logger OF_EVENT_LOG = LoggerFactory.getLogger("OfEventLog");
 
-    // The number of nanoseconds to wait for a single group to be added.
-    private static final long ADD_GROUP_TIMEOUT = TimeUnit.SECONDS.toNanos(3);
-
-    // The maximum number of nanoseconds to wait for completion of add-group RPCs.
-    private static final long MAX_ADD_GROUP_TIMEOUT = TimeUnit.SECONDS.toNanos(20);
     private static final String SEPARATOR = ":";
-    private static final int THREAD_POOL_SIZE = 4;
     private static final ThreadFactory THREAD_FACTORY = new ThreadFactoryBuilder()
             .setNameFormat("BundleResync-%d")
             .setDaemon(false)
             .setUncaughtExceptionHandler((thread, ex) -> LOG.error("Uncaught exception {}", thread, ex))
             .build();
+
+    // FIXME: these three should be configurable
+    private static final int THREAD_POOL_SIZE = 4;
+    // The number of nanoseconds to wait for a single group to be added.
+    private static final long ADD_GROUP_TIMEOUT = TimeUnit.SECONDS.toNanos(3);
+    // The maximum number of nanoseconds to wait for completion of add-group RPCs.
+    private static final long MAX_ADD_GROUP_TIMEOUT = TimeUnit.SECONDS.toNanos(20);
+
+    private final ConcurrentMap<DeviceInfo, ListenableFuture<Boolean>> futureMap = new ConcurrentHashMap<>();
+    private final ExecutorService executor = Executors.newFixedThreadPool(THREAD_POOL_SIZE);
+    private static final BundleFlags BUNDLE_FLAGS = new BundleFlags(true, true);
+    private static final AtomicLong BUNDLE_ID = new AtomicLong();
+    private final Map<String, ReconciliationState> reconciliationStates;
     private final DataBroker dataBroker;
     private final ForwardingRulesManager provider;
     private final String serviceName;
     private final int priority;
     private final ResultState resultState;
-    private final Map<DeviceInfo, ListenableFuture<Boolean>> futureMap = new ConcurrentHashMap<>();
-
-    private final ExecutorService executor = Executors.newFixedThreadPool(THREAD_POOL_SIZE);
-
-    private final SalBundleService salBundleService;
-
-    private static final AtomicLong BUNDLE_ID = new AtomicLong();
-    private static final BundleFlags BUNDLE_FLAGS = new BundleFlags(true, true);
-    private final Map<String, ReconciliationState> reconciliationStates;
 
     public FlowNodeReconciliationImpl(final ForwardingRulesManager manager, final DataBroker db,
                                       final String serviceName, final int priority, final ResultState resultState,
@@ -146,7 +144,6 @@ public class FlowNodeReconciliationImpl implements FlowNodeReconciliation {
         this.serviceName = serviceName;
         this.priority = priority;
         this.resultState = resultState;
-        salBundleService = requireNonNull(manager.getSalBundleService(), "salBundleService can not be null!");
         reconciliationStates = flowGroupCacheManager.getReconciliationStates();
     }
 
@@ -228,21 +225,21 @@ public class FlowNodeReconciliationImpl implements FlowNodeReconciliation {
 
                 LOG.debug("Closing openflow bundle for device {}", dpnId);
                 /* Close previously opened bundle on the openflow switch if any */
-                final var closeBundle = salBundleService.controlBundle(closeBundleInput);
+                final var closeBundle = provider.controlBundle().invoke(closeBundleInput);
 
                 /* Open a new bundle on the switch */
                 final var openBundle = Futures.transformAsync(closeBundle, rpcResult -> {
                     if (rpcResult.isSuccessful()) {
                         LOG.debug("Existing bundle is successfully closed for device {}", dpnId);
                     }
-                    return salBundleService.controlBundle(openBundleInput);
+                    return provider.controlBundle().invoke(openBundleInput);
                 }, service);
 
                 /* Push groups and flows via bundle add messages */
                 final var deleteAllFlowGroupsFuture = Futures.transformAsync(openBundle, rpcResult -> {
                     if (rpcResult.isSuccessful()) {
                         LOG.debug("Open bundle is successful for device {}", dpnId);
-                        return salBundleService.addBundleMessages(deleteAllFlowGroupsInput);
+                        return provider.addBundleMessages().invoke(deleteAllFlowGroupsInput);
                     }
                     return Futures.immediateFuture(null);
                 }, service);
@@ -261,7 +258,7 @@ public class FlowNodeReconciliationImpl implements FlowNodeReconciliation {
                 /* Commit the bundle on the openflow switch */
                 final var commitBundleFuture = Futures.transformAsync(addbundlesFuture, rpcResult -> {
                     LOG.debug("Adding bundle messages completed for device {}", dpnId);
-                    return salBundleService.controlBundle(commitBundleInput);
+                    return provider.controlBundle().invoke(commitBundleInput);
                 }, service);
 
                 /* Bundles not supported for meters */
@@ -517,8 +514,8 @@ public class FlowNodeReconciliationImpl implements FlowNodeReconciliation {
          *            The group to add.
          */
         private void addGroup(final Map<Uint32, ListenableFuture<?>> map, final Group group) {
-            KeyedInstanceIdentifier<Group, GroupKey> groupIdent = nodeIdentity.child(Group.class, group.key());
-            final Uint32 groupId = group.getGroupId().getValue();
+            final var groupIdent = nodeIdentity.child(Group.class, group.key());
+            final var groupId = group.getGroupId().getValue();
             final var future = provider.getGroupCommiter().add(groupIdent, group, nodeIdentity);
 
             Futures.addCallback(future, new FutureCallback<RpcResult<?>>() {