- private static ListenableFuture<Collection<MultipartReply>> getNodeStaticInfo(final Xid xid, final ConnectionContext cContext,
- final MultipartType type, final DeviceContext dContext, final InstanceIdentifier<Node> nodeII, final short version) {
- final ListenableFuture<Collection<MultipartReply>> future = cContext.registerMultipartMsg(xid.getValue());
- Futures.addCallback(future, new FutureCallback<Collection<MultipartReply>>() {
- @Override
- public void onSuccess(final Collection<MultipartReply> result) {
- Preconditions.checkArgument(result != null);
- translateAndWriteReply(type, dContext, nodeII, result);
- }
- @Override
- public void onFailure(final Throwable t) {
- LOG.info("Failed to retrieve static node {} info: {}", type, t.getMessage());
- }
- });
- final Future<RpcResult<Void>> rpcFuture = cContext.getConnectionAdapter()
- .multipartRequest(MultipartRequestInputFactory.makeMultipartRequestInput(xid.getValue(), version, type));
- Futures.addCallback(JdkFutureAdapters.listenInPoolThread(rpcFuture), new FutureCallback<RpcResult<Void>>() {
- @Override
- public void onSuccess(final RpcResult<Void> result) {
- // NOOP
- }
- @Override
- public void onFailure(final Throwable t) {
- future.cancel(true);
- }
- });
- return future;
- }
-
- private static void translateAndWriteReply(final MultipartType type, final DeviceContext dContext,
- final InstanceIdentifier<Node> nodeII, final Collection<MultipartReply> result) {
- for (final MultipartReply reply : result) {
- switch (type) {
- case OFPMPDESC:
- Preconditions.checkArgument(reply instanceof MultipartReplyDesc);
- final FlowCapableNode fcNode = NodeStaticReplyTranslatorUtil.nodeDescTranslator((MultipartReplyDesc) reply);
- final InstanceIdentifier<FlowCapableNode> fNodeII = nodeII.augmentation(FlowCapableNode.class);
- dContext.writeToTransaction(LogicalDatastoreType.OPERATIONAL, fNodeII, fcNode);
- break;
-
- case OFPMPTABLEFEATURES:
- Preconditions.checkArgument(reply instanceof MultipartReplyTableFeatures);
- final List<TableFeatures> tables = NodeStaticReplyTranslatorUtil.nodeTableFeatureTranslator((MultipartReplyTableFeatures) reply);
- for (final TableFeatures table : tables) {
- final Short tableId = table.getTableId();
- final InstanceIdentifier<Table> tableII = nodeII.augmentation(FlowCapableNode.class).child(Table.class, new TableKey(tableId));
- dContext.writeToTransaction(LogicalDatastoreType.OPERATIONAL, tableII, new TableBuilder().setId(tableId).setTableFeatures(Collections.singletonList(table)).build());
+ @Override
+ public void deviceConnected(@CheckForNull final ConnectionContext connectionContext) {
+ Preconditions.checkArgument(connectionContext != null);
+ try {
+ initializeDeviceContext(connectionContext);
+ } catch (Exception e) {
+ LOG.warn("Exception during initialization phase.", e);
+ }
+ }
+
+ private void initializeDeviceContext(final ConnectionContext connectionContext) throws Exception{
+ LOG.info("Initializing New Connection DeviceContext for node:{}", connectionContext.getNodeId());
+ // Cache this for clarity
+ final ConnectionAdapter connectionAdapter = connectionContext.getConnectionAdapter();
+
+ //FIXME: as soon as auxiliary connection are fully supported then this is needed only before device context published
+ connectionAdapter.setPacketInFiltering(true);
+
+ final Short version = connectionContext.getFeatures().getVersion();
+ final OutboundQueueProvider outboundQueueProvider = new OutboundQueueProviderImpl(version);
+
+ connectionContext.setOutboundQueueProvider(outboundQueueProvider);
+ final OutboundQueueHandlerRegistration<OutboundQueueProvider> outboundQueueHandlerRegistration =
+ connectionAdapter.registerOutboundQueueHandler(outboundQueueProvider, maxQueueDepth, barrierNanos);
+ connectionContext.setOutboundQueueHandleRegistration(outboundQueueHandlerRegistration);
+
+ final NodeId nodeId = connectionContext.getNodeId();
+ final DeviceState deviceState = new DeviceStateImpl(connectionContext.getFeatures(), nodeId);
+
+ final DeviceContext deviceContext = new DeviceContextImpl(connectionContext, deviceState, dataBroker,
+ hashedWheelTimer, messageIntelligenceAgency, outboundQueueProvider, translatorLibrary);
+
+ deviceContext.addDeviceContextClosedHandler(this);
+ // We would like to crete/register TxChainManager after
+ final DeviceTransactionChainManagerProvider.TransactionChainManagerRegistration txChainManagerReg = deviceTransactionChainManagerProvider
+ .provideTransactionChainManager(connectionContext);
+ if (txChainManagerReg.ownedByInvokingConnectionContext()) {
+ //this actually is new registration for currently processed connection context
+ ((DeviceContextImpl) deviceContext).setTransactionChainManager(txChainManagerReg.getTransactionChainManager());
+ } else {
+ LOG.info("In deviceConnected {}, ownedByInvokingConnectionContext is false", connectionContext.getNodeId());
+ deviceContext.close();
+ return;
+ }
+ ((ExtensionConverterProviderKeeper) deviceContext).setExtensionConverterProvider(extensionConverterProvider);
+ deviceContext.setNotificationService(notificationService);
+ deviceContext.setNotificationPublishService(notificationPublishService);
+
+ deviceContexts.add(deviceContext);
+
+ updatePacketInRateLimiters();
+
+ final OpenflowProtocolListenerFullImpl messageListener = new OpenflowProtocolListenerFullImpl(
+ connectionAdapter, deviceContext);
+ connectionAdapter.setMessageListener(messageListener);
+
+ deviceCtxLevelUp(deviceContext);
+ }
+
+ private void updatePacketInRateLimiters() {
+ synchronized (deviceContexts) {
+ final int deviceContextsSize = deviceContexts.size();
+ if (deviceContextsSize > 0) {
+ long freshNotificationLimit = globalNotificationQuota / deviceContextsSize;
+ if (freshNotificationLimit < 100) {
+ freshNotificationLimit = 100;
+ }
+ LOG.debug("fresh notification limit = {}", freshNotificationLimit);
+ for (DeviceContext deviceContext : deviceContexts) {
+ deviceContext.updatePacketInRateLimit(freshNotificationLimit);