package org.opendaylight.openflowplugin.openflow.md.core;
+import java.util.ArrayList;
import java.util.Collection;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Executors;
import java.util.concurrent.Future;
+import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.statistics.rev131111.MeterConfigStatsUpdated;
import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.statistics.rev131111.MeterFeaturesUpdated;
import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.statistics.rev131111.MeterStatisticsUpdated;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.node.error.service.rev140410.BadActionErrorNotification;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.node.error.service.rev140410.BadInstructionErrorNotification;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.node.error.service.rev140410.BadMatchErrorNotification;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.node.error.service.rev140410.BadRequestErrorNotification;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.node.error.service.rev140410.ExperimenterErrorNotification;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.node.error.service.rev140410.FlowModErrorNotification;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.node.error.service.rev140410.GroupModErrorNotification;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.node.error.service.rev140410.HelloFailedErrorNotification;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.node.error.service.rev140410.MeterModErrorNotification;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.node.error.service.rev140410.PortModErrorNotification;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.node.error.service.rev140410.QueueOpErrorNotification;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.node.error.service.rev140410.RoleRequestErrorNotification;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.node.error.service.rev140410.SwitchConfigErrorNotification;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.node.error.service.rev140410.TableFeaturesErrorNotification;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.node.error.service.rev140410.TableModErrorNotification;
import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.ErrorMessage;
import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.ExperimenterMessage;
import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.FlowRemovedMessage;
import org.opendaylight.yang.gen.v1.urn.opendaylight.port.statistics.rev131214.NodeConnectorStatisticsUpdate;
import org.opendaylight.yang.gen.v1.urn.opendaylight.queue.statistics.rev131216.QueueStatisticsUpdate;
import org.opendaylight.yang.gen.v1.urn.opendaylight.table.service.rev131026.TableUpdated;
+import org.opendaylight.yangtools.yang.binding.DataContainer;
import org.opendaylight.yangtools.yang.binding.DataObject;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.common.collect.Lists;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.ListeningExecutorService;
+import com.google.common.util.concurrent.MoreExecutors;
/**
- * @author mirehak
*
*/
public class MDController implements IMDController, AutoCloseable {
private static final Logger LOG = LoggerFactory.getLogger(MDController.class);
- private SwitchConnectionProvider switchConnectionProvider;
+ private Collection<SwitchConnectionProvider> switchConnectionProviders;
private ConcurrentMap<TranslatorKey, Collection<IMDMessageTranslator<OfHeader, List<DataObject>>>> messageTranslators;
private Map<Class<? extends DataObject>, Collection<PopListener<DataObject>>> popListeners;
- private MessageSpy<OfHeader, DataObject> messageSpyCounter;
+ private MessageSpy<DataContainer> messageSpyCounter;
final private int OF10 = OFConstants.OFP_VERSION_1_0;
final private int OF13 = OFConstants.OFP_VERSION_1_3;
+ private ErrorHandlerSimpleImpl errorHandler;
/**
* @return translator mapping
addMessageTranslator(MultipartReplyMessage.class,OF13,new MultipartReplyTableFeaturesToTableUpdatedTranslator());
addMessageTranslator(GetFeaturesOutput.class,OF10, new FeaturesV10ToNodeConnectorUpdatedTranslator());
- //TODO: move registration to factory
NotificationPopListener<DataObject> notificationPopListener = new NotificationPopListener<DataObject>();
+ notificationPopListener.setNotificationProviderService(
+ OFSessionUtil.getSessionManager().getNotificationProviderService());
+ notificationPopListener.setMessageSpy(messageSpyCounter);
+
+ //TODO: move registration to factory
addMessagePopListener(NodeErrorNotification.class, notificationPopListener);
+ addMessagePopListener(BadActionErrorNotification.class, notificationPopListener);
+ addMessagePopListener(BadInstructionErrorNotification.class, notificationPopListener);
+ addMessagePopListener(BadMatchErrorNotification.class, notificationPopListener);
+ addMessagePopListener(BadRequestErrorNotification.class, notificationPopListener);
+ addMessagePopListener(ExperimenterErrorNotification.class, notificationPopListener);
+ addMessagePopListener(FlowModErrorNotification.class, notificationPopListener);
+ addMessagePopListener(GroupModErrorNotification.class, notificationPopListener);
+ addMessagePopListener(HelloFailedErrorNotification.class, notificationPopListener);
+ addMessagePopListener(MeterModErrorNotification.class, notificationPopListener);
+ addMessagePopListener(PortModErrorNotification.class, notificationPopListener);
+ addMessagePopListener(QueueOpErrorNotification.class, notificationPopListener);
+ addMessagePopListener(RoleRequestErrorNotification.class, notificationPopListener);
+ addMessagePopListener(SwitchConfigErrorNotification.class, notificationPopListener);
+ addMessagePopListener(TableFeaturesErrorNotification.class, notificationPopListener);
+ addMessagePopListener(TableModErrorNotification.class, notificationPopListener);
addMessagePopListener(NodeConnectorUpdated.class,notificationPopListener);
addMessagePopListener(PacketReceived.class,notificationPopListener);
addMessagePopListener(TransmitPacketInput.class, notificationPopListener);
// Push the updated Listeners to Session Manager which will be then picked up by ConnectionConductor eventually
OFSessionUtil.getSessionManager().setTranslatorMapping(messageTranslators);
OFSessionUtil.getSessionManager().setPopListenerMapping(popListeners);
+ OFSessionUtil.getSessionManager().setMessageSpy(messageSpyCounter);
+
+ // prepare worker pool for rpc
+ // TODO: get size from configSubsystem
+ int rpcThreadLimit = 10;
+ ListeningExecutorService rpcPoolDelegator = createRpcPoolSpyDecorated(rpcThreadLimit, messageSpyCounter);
+ OFSessionUtil.getSessionManager().setRpcPool(rpcPoolDelegator);
+
}
/**
- * @param switchConnectionProvider
- * the switchConnectionProvider to set
+ * @param rpcThreadLimit
+ * @param messageSpy
+ * @return
*/
- public void setSwitchConnectionProvider(SwitchConnectionProvider switchConnectionProvider) {
- this.switchConnectionProvider = switchConnectionProvider;
+ private static ListeningExecutorService createRpcPoolSpyDecorated(int rpcThreadLimit, MessageSpy<DataContainer> messageSpy) {
+ ThreadPoolLoggingExecutor rpcPool = new ThreadPoolLoggingExecutor(rpcThreadLimit, rpcThreadLimit, 0L,
+ TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>());
+ ListeningExecutorService listeningRpcPool = MoreExecutors.listeningDecorator(rpcPool);
+ RpcListeningExecutorService rpcPoolDecorated = new RpcListeningExecutorService(listeningRpcPool);
+ rpcPoolDecorated.setMessageSpy(messageSpy);
+ return rpcPoolDecorated;
}
/**
- * @param switchConnectionProviderToUnset
- * the switchConnectionProvider to unset
+ * @param switchConnectionProviders
+ * the switchConnectionProviders to set
*/
- public void unsetSwitchConnectionProvider(SwitchConnectionProvider switchConnectionProviderToUnset) {
- if (this.switchConnectionProvider == switchConnectionProviderToUnset) {
- this.switchConnectionProvider = null;
- }
+ public void setSwitchConnectionProviders(Collection<SwitchConnectionProvider> switchConnectionProviders) {
+ this.switchConnectionProviders = switchConnectionProviders;
}
/**
*/
public void start() {
LOG.debug("starting ..");
- LOG.debug("switchConnectionProvider: " + switchConnectionProvider);
+ LOG.debug("switchConnectionProvider: " + switchConnectionProviders);
// setup handler
SwitchConnectionHandlerImpl switchConnectionHandler = new SwitchConnectionHandlerImpl();
switchConnectionHandler.setMessageSpy(messageSpyCounter);
+
+ errorHandler = new ErrorHandlerSimpleImpl();
+
+ switchConnectionHandler.setErrorHandler(errorHandler);
switchConnectionHandler.init();
- switchConnectionProvider.setSwitchConnectionHandler(switchConnectionHandler);
-
- // configure and startup library servers
- switchConnectionProvider.configure(getConnectionConfiguration());
- Future<List<Boolean>> srvStarted = switchConnectionProvider.startup();
+ List<ListenableFuture<Boolean>> starterChain = new ArrayList<>();
+ for (SwitchConnectionProvider switchConnectionPrv : switchConnectionProviders) {
+ switchConnectionPrv.setSwitchConnectionHandler(switchConnectionHandler);
+ ListenableFuture<Boolean> isOnlineFuture = switchConnectionPrv.startup();
+ starterChain.add(isOnlineFuture);
+ }
+
+ Future<List<Boolean>> srvStarted = Futures.allAsList(starterChain);
}
/**
* @return wished connections configurations
+ * @deprecated use configSubsystem
*/
+ @Deprecated
private static Collection<ConnectionConfiguration> getConnectionConfiguration() {
// TODO:: get config from state manager
ConnectionConfiguration configuration = ConnectionConfigurationFactory.getDefault();
*/
public void stop() {
LOG.debug("stopping");
- Future<List<Boolean>> srvStopped = switchConnectionProvider.shutdown();
+ List<ListenableFuture<Boolean>> stopChain = new ArrayList<>();
try {
- srvStopped.get(5000, TimeUnit.MILLISECONDS);
+ for (SwitchConnectionProvider switchConnectionPrv : switchConnectionProviders) {
+ ListenableFuture<Boolean> shutdown = switchConnectionPrv.shutdown();
+ stopChain.add(shutdown);
+ }
+ Futures.allAsList(stopChain).get(5000, TimeUnit.MILLISECONDS);
} catch (InterruptedException | ExecutionException | TimeoutException e) {
LOG.error(e.getMessage(), e);
}
* @param messageSpyCounter the messageSpyCounter to set
*/
public void setMessageSpyCounter(
- MessageSpy<OfHeader, DataObject> messageSpyCounter) {
+ MessageSpy<DataContainer> messageSpyCounter) {
this.messageSpyCounter = messageSpyCounter;
}
messageSpyCounter = null;
messageTranslators = null;
popListeners = null;
- switchConnectionProvider = null;
+ for (SwitchConnectionProvider switchConnectionPrv : switchConnectionProviders) {
+ switchConnectionPrv.setSwitchConnectionHandler(null);
+ }
+ switchConnectionProviders = null;
OFSessionUtil.releaseSessionManager();
+ errorHandler = null;
}
}