import com.google.common.base.Preconditions;
import com.google.common.util.concurrent.ListeningExecutorService;
+import com.google.common.util.concurrent.MoreExecutors;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
-import java.lang.Thread.UncaughtExceptionHandler;
-import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
import org.opendaylight.controller.md.sal.binding.api.DataBroker;
import org.opendaylight.controller.md.sal.binding.api.DataTreeIdentifier;
import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
import org.opendaylight.openflowplugin.applications.frsync.impl.strategy.SyncPlanPushStrategyFlatBatchImpl;
import org.opendaylight.openflowplugin.applications.frsync.util.RetryRegistry;
import org.opendaylight.openflowplugin.applications.frsync.util.SemaphoreKeeperGuavaImpl;
-import org.opendaylight.openflowplugin.common.wait.SimpleTaskRetryLooper;
import org.opendaylight.yang.gen.v1.urn.opendaylight.flat.batch.service.rev160321.SalFlatBatchService;
import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableNode;
import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.Nodes;
public class ForwardingRulesSyncProvider implements AutoCloseable, BindingAwareProvider {
private static final Logger LOG = LoggerFactory.getLogger(ForwardingRulesSyncProvider.class);
- private static final int STARTUP_LOOP_TICK = 500;
- private static final int STARTUP_LOOP_MAX_RETRIES = 8;
private final DataBroker dataService;
private final SalTableService salTableService;
private static final InstanceIdentifier<Node> NODE_WC_PATH =
InstanceIdentifier.create(Nodes.class).child(Node.class);
-
private final DataTreeIdentifier<FlowCapableNode> nodeConfigDataTreePath;
private final DataTreeIdentifier<Node> nodeOperationalDataTreePath;
private ListenerRegistration<NodeListener> dataTreeConfigChangeListener;
private ListenerRegistration<NodeListener> dataTreeOperationalChangeListener;
+ private final ListeningExecutorService syncThreadPool;
+
public ForwardingRulesSyncProvider(final BindingAwareBroker broker,
final DataBroker dataBroker,
final RpcConsumerRegistry rpcRegistry) {
nodeConfigDataTreePath = new DataTreeIdentifier<>(LogicalDatastoreType.CONFIGURATION, FLOW_CAPABLE_NODE_WC_PATH);
nodeOperationalDataTreePath = new DataTreeIdentifier<>(LogicalDatastoreType.OPERATIONAL, NODE_WC_PATH);
+ final ExecutorService executorService= Executors.newCachedThreadPool(new ThreadFactoryBuilder()
+ .setNameFormat(SyncReactorFutureDecorator.FRM_RPC_CLIENT_PREFIX + "%d")
+ .setDaemon(false)
+ .setUncaughtExceptionHandler((thread, e) -> LOG.error("Uncaught exception {}", thread, e))
+ .build());
+ syncThreadPool = MoreExecutors.listeningDecorator(executorService);
+
broker.registerProvider(this);
}
- private final ListeningExecutorService syncThreadPool = FrmExecutors.instance()
- // TODO improve log in ThreadPoolExecutor.afterExecute
- // TODO max bloking queue size
- // TODO core/min pool size
- .newFixedThreadPool(6, new ThreadFactoryBuilder()
- .setNameFormat(SyncReactorFutureDecorator.FRM_RPC_CLIENT_PREFIX + "%d")
- .setDaemon(false)
- .setUncaughtExceptionHandler(new UncaughtExceptionHandler() {
- @Override
- public void uncaughtException(Thread thread, Throwable e) {
- LOG.error("uncaught exception {}", thread, e);
- }
- })
- .build());
-
@Override
public void onSessionInitiated(final BindingAwareBroker.ProviderContext providerContext) {
final TableForwarder tableForwarder = new TableForwarder(salTableService);
final NodeListener<Node> nodeListenerOperational =
new SimplifiedOperationalRetryListener(reactor, operationalSnapshot, configDao, retryRegistry);
- try {
- SimpleTaskRetryLooper looper1 = new SimpleTaskRetryLooper(STARTUP_LOOP_TICK, STARTUP_LOOP_MAX_RETRIES);
- dataTreeConfigChangeListener = looper1.loopUntilNoException(
- new Callable<ListenerRegistration<NodeListener>>() {
- @Override
- public ListenerRegistration<NodeListener> call() throws Exception {
- return dataService.registerDataTreeChangeListener(
- nodeConfigDataTreePath, nodeListenerConfig);
- }
- });
-
- SimpleTaskRetryLooper looper2 = new SimpleTaskRetryLooper(STARTUP_LOOP_TICK, STARTUP_LOOP_MAX_RETRIES);
- dataTreeOperationalChangeListener = looper2.loopUntilNoException(
- new Callable<ListenerRegistration<NodeListener>>() {
- @Override
- public ListenerRegistration<NodeListener> call() throws Exception {
- return dataService.registerDataTreeChangeListener(
- nodeOperationalDataTreePath, nodeListenerOperational);
- }
- });
- } catch (final Exception e) {
- LOG.warn("FR-Sync node DataChange listener registration fail!", e);
- throw new IllegalStateException("FR-Sync startup fail!", e);
- }
+ dataTreeConfigChangeListener =
+ dataService.registerDataTreeChangeListener(nodeConfigDataTreePath, nodeListenerConfig);
+ dataTreeOperationalChangeListener =
+ dataService.registerDataTreeChangeListener(nodeOperationalDataTreePath, nodeListenerOperational);
+
LOG.info("ForwardingRulesSync has started.");
}
- public void close() throws Exception {
+ public void close() throws InterruptedException {
if (dataTreeConfigChangeListener != null) {
dataTreeConfigChangeListener.close();
dataTreeConfigChangeListener = null;
}
+
if (dataTreeOperationalChangeListener != null) {
dataTreeOperationalChangeListener.close();
dataTreeOperationalChangeListener = null;
+++ /dev/null
-/**
- * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.
- *
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,
- * and is available at http://www.eclipse.org/legal/epl-v10.html
- */
-
-package org.opendaylight.openflowplugin.applications.frsync.impl;
-
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.util.concurrent.ListeningExecutorService;
-import com.google.common.util.concurrent.MoreExecutors;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ThreadFactory;
-
-/**
- * Static Factory for creating ExecutorServices (because there is no dependency injection but
- * static getInstance).
- */
-public final class FrmExecutors {
- public static PceExecutorsFactory instance() {
- return DEFAULT_EXECUTORS;
- }
-
- public interface PceExecutorsFactory {
-
- ListeningExecutorService newFixedThreadPool(int nThreads, ThreadFactory factory);
- }
-
- /**
- * This will be rewritten in JUnits using SynchronousExecutorService.
- */
- @VisibleForTesting // should not be private and final
- static PceExecutorsFactory DEFAULT_EXECUTORS = new PceExecutorsFactory() {
-
- public ListeningExecutorService newFixedThreadPool(int nThreads, ThreadFactory factory) {
- final ExecutorService executorService = Executors.newFixedThreadPool(nThreads, factory);
- return MoreExecutors.listeningDecorator(executorService);
- }
- };
-}
LOG.trace("syncup guard {}", nodeId.getValue());
final long stampBeforeGuard = System.nanoTime();
- final Semaphore guard = summonGuardAndAcquire(flowcapableNodePath);//TODO handle InteruptedException
+ final Semaphore guard = summonGuardAndAcquire(flowcapableNodePath);
+ if (guard == null) {
+ return Futures.immediateFuture(false);
+ }
+ final long stampAfterGuard = System.nanoTime();
try {
- final long stampAfterGuard = System.nanoTime();
if (LOG.isDebugEnabled()) {
LOG.debug("syncup start {} waiting:{} guard:{} thread:{}", nodeId.getValue(),
formatNanos(stampAfterGuard - stampBeforeGuard),
}
final ListenableFuture<Boolean> endResult =
- delegate.syncup(flowcapableNodePath, configTree, operationalTree, dsType);//TODO handle InteruptedException
+ delegate.syncup(flowcapableNodePath, configTree, operationalTree, dsType);
Futures.addCallback(endResult, new FutureCallback<Boolean>() {
@Override
}
});
return endResult;
- } catch(InterruptedException e) {
+ } catch (InterruptedException e) {
releaseGuardForNodeId(guard);
throw e;
}
* @param flowcapableNodePath II of node for which guard should be acquired
* @return semaphore guard
*/
- private Semaphore summonGuardAndAcquire(final InstanceIdentifier<FlowCapableNode> flowcapableNodePath)
- throws InterruptedException {
+ private Semaphore summonGuardAndAcquire(final InstanceIdentifier<FlowCapableNode> flowcapableNodePath) {
+ final NodeId nodeId = PathUtil.digNodeId(flowcapableNodePath);
final Semaphore guard = Preconditions.checkNotNull(semaphoreKeeper.summonGuard(flowcapableNodePath),
- "no guard for " + flowcapableNodePath);
-
- if (LOG.isDebugEnabled()) {
- final NodeId nodeId = PathUtil.digNodeId(flowcapableNodePath);
- try {
- LOG.debug("syncup summon {} guard:{} thread:{}", nodeId.getValue(), guard, threadName());
- } catch (Exception e) {
- LOG.error("error logging guard after summon before aquiring {}", nodeId);
- }
+ "no guard for " + nodeId.getValue());
+ try {
+ guard.acquire();
+ } catch (InterruptedException e) {
+ LOG.error("syncup summon {} failed {}", nodeId.getValue(), e);
+ return null;
}
-
- guard.acquire();
+ LOG.trace("syncup summon {} guard:{} thread:{}", nodeId.getValue(), guard, threadName());
return guard;
}
* @param guard semaphore guard which should be unlocked
*/
private void releaseGuardForNodeId(final Semaphore guard) {
- if (guard == null) {
- return;
+ if (guard != null) {
+ guard.release();
+ LOG.trace("syncup release guard:{} thread:{}", guard, threadName());
}
- guard.release();
}
private static String threadName() {
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
+import com.google.common.util.concurrent.MoreExecutors;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import org.junit.After;
import org.junit.Before;
@Before
public void setUp() {
- syncThreadPool = FrmExecutors.instance()
- .newFixedThreadPool(1, new ThreadFactoryBuilder()
- .setNameFormat(SyncReactorFutureDecorator.FRM_RPC_CLIENT_PREFIX + "%d")
- .setDaemon(false)
- .build());
-
+ final ExecutorService executorService = Executors.newSingleThreadExecutor(new ThreadFactoryBuilder()
+ .setDaemon(false)
+ .setNameFormat("frsync-test%d")
+ .setUncaughtExceptionHandler((thread, e) -> LOG.error("Uncaught exception {}", thread, e))
+ .build());
+ syncThreadPool = MoreExecutors.listeningDecorator(executorService);
reactor = new SyncReactorFutureZipDecorator(delegate, syncThreadPool);
fcNodePath = InstanceIdentifier.create(Nodes.class).child(Node.class, new NodeKey(NODE_ID))
.augmentation(FlowCapableNode.class);
}
@VisibleForTesting
- Future<RpcResult<ProcessFlatBatchOutput>> executeBatchPlan(final List<AsyncFunction<RpcResult<ProcessFlatBatchOutput>, RpcResult<ProcessFlatBatchOutput>>> batchChainElements) {
+ Future<RpcResult<ProcessFlatBatchOutput>> executeBatchPlan(final List<AsyncFunction<RpcResult<ProcessFlatBatchOutput>,
+ RpcResult<ProcessFlatBatchOutput>>> batchChainElements) {
ListenableFuture<RpcResult<ProcessFlatBatchOutput>> chainSummaryResult =
RpcResultBuilder.success(new ProcessFlatBatchOutputBuilder().build()).buildFuture();
}
return chainSummaryResult;
-
}
@VisibleForTesting
final int currentOffset = stepOffset;
chainJobs.add(new AsyncFunction<RpcResult<ProcessFlatBatchOutput>, RpcResult<ProcessFlatBatchOutput>>() {
@Override
- public ListenableFuture<RpcResult<ProcessFlatBatchOutput>> apply(final RpcResult<ProcessFlatBatchOutput> chainInput) throws Exception {
+ public ListenableFuture<RpcResult<ProcessFlatBatchOutput>> apply(final RpcResult<ProcessFlatBatchOutput> chainInput) {
if (exitOnFirstError && !chainInput.isSuccessful()) {
LOG.debug("error on flat batch chain occurred -> skipping step {}", planStep.getStepType());
return Futures.immediateFuture(chainInput);
final ListenableFuture<RpcResult<ProcessFlatBatchOutput>> chainOutput;
switch (planStep.getStepType()) {
case FLOW_ADD:
- final AddFlowsBatchInput addFlowsBatchInput = FlatBatchFlowAdapters.adaptFlatBatchAddFlow(
- planStep, node);
+ final AddFlowsBatchInput addFlowsBatchInput = FlatBatchFlowAdapters.adaptFlatBatchAddFlow(planStep, node);
final Future<RpcResult<AddFlowsBatchOutput>> resultAddFlowFuture = salFlowService.addFlowsBatch(addFlowsBatchInput);
chainOutput = FlatBatchFlowAdapters.adaptFlowBatchFutureForChain(chainInput, resultAddFlowFuture, currentOffset);
break;
case FLOW_REMOVE:
- final RemoveFlowsBatchInput removeFlowsBatchInput = FlatBatchFlowAdapters.adaptFlatBatchRemoveFlow(
- planStep, node);
+ final RemoveFlowsBatchInput removeFlowsBatchInput = FlatBatchFlowAdapters.adaptFlatBatchRemoveFlow(planStep, node);
final Future<RpcResult<RemoveFlowsBatchOutput>> resultRemoveFlowFuture = salFlowService.removeFlowsBatch(removeFlowsBatchInput);
chainOutput = FlatBatchFlowAdapters.adaptFlowBatchFutureForChain(chainInput, resultRemoveFlowFuture, currentOffset);
break;
case FLOW_UPDATE:
- final UpdateFlowsBatchInput updateFlowsBatchInput = FlatBatchFlowAdapters.adaptFlatBatchUpdateFlow(
- planStep, node);
+ final UpdateFlowsBatchInput updateFlowsBatchInput = FlatBatchFlowAdapters.adaptFlatBatchUpdateFlow(planStep, node);
final Future<RpcResult<UpdateFlowsBatchOutput>> resultUpdateFlowFuture = salFlowService.updateFlowsBatch(updateFlowsBatchInput);
chainOutput = FlatBatchFlowAdapters.adaptFlowBatchFutureForChain(chainInput, resultUpdateFlowFuture, currentOffset);
break;
case GROUP_ADD:
- final AddGroupsBatchInput addGroupsBatchInput = FlatBatchGroupAdapters.adaptFlatBatchAddGroup(
- planStep, node);
+ final AddGroupsBatchInput addGroupsBatchInput = FlatBatchGroupAdapters.adaptFlatBatchAddGroup(planStep, node);
final Future<RpcResult<AddGroupsBatchOutput>> resultAddGroupFuture = salGroupService.addGroupsBatch(addGroupsBatchInput);
chainOutput = FlatBatchGroupAdapters.adaptGroupBatchFutureForChain(chainInput, resultAddGroupFuture, currentOffset);
break;
case GROUP_REMOVE:
- final RemoveGroupsBatchInput removeGroupsBatchInput = FlatBatchGroupAdapters.adaptFlatBatchRemoveGroup(
- planStep, node);
+ final RemoveGroupsBatchInput removeGroupsBatchInput = FlatBatchGroupAdapters.adaptFlatBatchRemoveGroup(planStep, node);
final Future<RpcResult<RemoveGroupsBatchOutput>> resultRemoveGroupFuture = salGroupService.removeGroupsBatch(removeGroupsBatchInput);
chainOutput = FlatBatchGroupAdapters.adaptGroupBatchFutureForChain(chainInput, resultRemoveGroupFuture, currentOffset);
break;
case GROUP_UPDATE:
- final UpdateGroupsBatchInput updateGroupsBatchInput = FlatBatchGroupAdapters.adaptFlatBatchUpdateGroup(
- planStep, node);
+ final UpdateGroupsBatchInput updateGroupsBatchInput = FlatBatchGroupAdapters.adaptFlatBatchUpdateGroup(planStep, node);
final Future<RpcResult<UpdateGroupsBatchOutput>> resultUpdateGroupFuture = salGroupService.updateGroupsBatch(updateGroupsBatchInput);
chainOutput = FlatBatchGroupAdapters.adaptGroupBatchFutureForChain(chainInput, resultUpdateGroupFuture, currentOffset);
break;
case METER_ADD:
- final AddMetersBatchInput addMetersBatchInput = FlatBatchMeterAdapters.adaptFlatBatchAddMeter(
- planStep, node);
+ final AddMetersBatchInput addMetersBatchInput = FlatBatchMeterAdapters.adaptFlatBatchAddMeter(planStep, node);
final Future<RpcResult<AddMetersBatchOutput>> resultAddMeterFuture = salMeterService.addMetersBatch(addMetersBatchInput);
chainOutput = FlatBatchMeterAdapters.adaptMeterBatchFutureForChain(chainInput, resultAddMeterFuture, currentOffset);
break;
case METER_REMOVE:
- final RemoveMetersBatchInput removeMetersBatchInput = FlatBatchMeterAdapters.adaptFlatBatchRemoveMeter(
- planStep, node);
+ final RemoveMetersBatchInput removeMetersBatchInput = FlatBatchMeterAdapters.adaptFlatBatchRemoveMeter(planStep, node);
final Future<RpcResult<RemoveMetersBatchOutput>> resultRemoveMeterFuture = salMeterService.removeMetersBatch(removeMetersBatchInput);
chainOutput = FlatBatchMeterAdapters.adaptMeterBatchFutureForChain(chainInput, resultRemoveMeterFuture, currentOffset);
break;
case METER_UPDATE:
- final UpdateMetersBatchInput updateMetersBatchInput = FlatBatchMeterAdapters.adaptFlatBatchUpdateMeter(
- planStep, node);
+ final UpdateMetersBatchInput updateMetersBatchInput = FlatBatchMeterAdapters.adaptFlatBatchUpdateMeter(planStep, node);
final Future<RpcResult<UpdateMetersBatchOutput>> resultUpdateMeterFuture = salMeterService.updateMetersBatch(updateMetersBatchInput);
chainOutput = FlatBatchMeterAdapters.adaptMeterBatchFutureForChain(chainInput, resultUpdateMeterFuture, currentOffset);
break;