import java.util.Optional;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.opendaylight.yang.gen.v1.urn.opendaylight.openflowplugin.extension.onf.bundle.service.rev170124.AddBundleMessagesInputBuilder;
import org.opendaylight.yang.gen.v1.urn.opendaylight.openflowplugin.extension.onf.bundle.service.rev170124.AddBundleMessagesOutput;
import org.opendaylight.yang.gen.v1.urn.opendaylight.openflowplugin.extension.onf.bundle.service.rev170124.ControlBundleInputBuilder;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.openflowplugin.extension.onf.bundle.service.rev170124.SalBundleService;
import org.opendaylight.yang.gen.v1.urn.opendaylight.openflowplugin.extension.onf.bundle.service.rev170124.add.bundle.messages.input.Messages;
import org.opendaylight.yang.gen.v1.urn.opendaylight.openflowplugin.extension.onf.bundle.service.rev170124.add.bundle.messages.input.MessagesBuilder;
import org.opendaylight.yang.gen.v1.urn.opendaylight.openflowplugin.extension.onf.bundle.service.rev170124.add.bundle.messages.input.messages.Message;
private static final Logger LOG = LoggerFactory.getLogger(FlowNodeReconciliationImpl.class);
private static final Logger OF_EVENT_LOG = LoggerFactory.getLogger("OfEventLog");
- // The number of nanoseconds to wait for a single group to be added.
- private static final long ADD_GROUP_TIMEOUT = TimeUnit.SECONDS.toNanos(3);
-
- // The maximum number of nanoseconds to wait for completion of add-group RPCs.
- private static final long MAX_ADD_GROUP_TIMEOUT = TimeUnit.SECONDS.toNanos(20);
private static final String SEPARATOR = ":";
- private static final int THREAD_POOL_SIZE = 4;
private static final ThreadFactory THREAD_FACTORY = new ThreadFactoryBuilder()
.setNameFormat("BundleResync-%d")
.setDaemon(false)
.setUncaughtExceptionHandler((thread, ex) -> LOG.error("Uncaught exception {}", thread, ex))
.build();
+
+ // FIXME: these three should be configurable
+ private static final int THREAD_POOL_SIZE = 4;
+ // The number of nanoseconds to wait for a single group to be added.
+ private static final long ADD_GROUP_TIMEOUT = TimeUnit.SECONDS.toNanos(3);
+ // The maximum number of nanoseconds to wait for completion of add-group RPCs.
+ private static final long MAX_ADD_GROUP_TIMEOUT = TimeUnit.SECONDS.toNanos(20);
+
+ private final ConcurrentMap<DeviceInfo, ListenableFuture<Boolean>> futureMap = new ConcurrentHashMap<>();
+ private final ExecutorService executor = Executors.newFixedThreadPool(THREAD_POOL_SIZE);
+ private static final BundleFlags BUNDLE_FLAGS = new BundleFlags(true, true);
+ private static final AtomicLong BUNDLE_ID = new AtomicLong();
+ private final Map<String, ReconciliationState> reconciliationStates;
private final DataBroker dataBroker;
private final ForwardingRulesManager provider;
private final String serviceName;
private final int priority;
private final ResultState resultState;
- private final Map<DeviceInfo, ListenableFuture<Boolean>> futureMap = new ConcurrentHashMap<>();
-
- private final ExecutorService executor = Executors.newFixedThreadPool(THREAD_POOL_SIZE);
-
- private final SalBundleService salBundleService;
-
- private static final AtomicLong BUNDLE_ID = new AtomicLong();
- private static final BundleFlags BUNDLE_FLAGS = new BundleFlags(true, true);
- private final Map<String, ReconciliationState> reconciliationStates;
public FlowNodeReconciliationImpl(final ForwardingRulesManager manager, final DataBroker db,
final String serviceName, final int priority, final ResultState resultState,
this.serviceName = serviceName;
this.priority = priority;
this.resultState = resultState;
- salBundleService = requireNonNull(manager.getSalBundleService(), "salBundleService can not be null!");
reconciliationStates = flowGroupCacheManager.getReconciliationStates();
}
LOG.debug("Closing openflow bundle for device {}", dpnId);
/* Close previously opened bundle on the openflow switch if any */
- final var closeBundle = salBundleService.controlBundle(closeBundleInput);
+ final var closeBundle = provider.controlBundle().invoke(closeBundleInput);
/* Open a new bundle on the switch */
final var openBundle = Futures.transformAsync(closeBundle, rpcResult -> {
if (rpcResult.isSuccessful()) {
LOG.debug("Existing bundle is successfully closed for device {}", dpnId);
}
- return salBundleService.controlBundle(openBundleInput);
+ return provider.controlBundle().invoke(openBundleInput);
}, service);
/* Push groups and flows via bundle add messages */
final var deleteAllFlowGroupsFuture = Futures.transformAsync(openBundle, rpcResult -> {
if (rpcResult.isSuccessful()) {
LOG.debug("Open bundle is successful for device {}", dpnId);
- return salBundleService.addBundleMessages(deleteAllFlowGroupsInput);
+ return provider.addBundleMessages().invoke(deleteAllFlowGroupsInput);
}
return Futures.immediateFuture(null);
}, service);
/* Commit the bundle on the openflow switch */
final var commitBundleFuture = Futures.transformAsync(addbundlesFuture, rpcResult -> {
LOG.debug("Adding bundle messages completed for device {}", dpnId);
- return salBundleService.controlBundle(commitBundleInput);
+ return provider.controlBundle().invoke(commitBundleInput);
}, service);
/* Bundles not supported for meters */
* The group to add.
*/
private void addGroup(final Map<Uint32, ListenableFuture<?>> map, final Group group) {
- KeyedInstanceIdentifier<Group, GroupKey> groupIdent = nodeIdentity.child(Group.class, group.key());
- final Uint32 groupId = group.getGroupId().getValue();
+ final var groupIdent = nodeIdentity.child(Group.class, group.key());
+ final var groupId = group.getGroupId().getValue();
final var future = provider.getGroupCommiter().add(groupIdent, group, nodeIdentity);
Futures.addCallback(future, new FutureCallback<RpcResult<?>>() {