回到l2fwd的main函数中
intMAIN(int argc, char **argv){ struct lcore_queue_conf *qconf; struct rte_eth_dev_info dev_info; int ret; uint8_t nb_ports; uint8_t nb_ports_available; uint8_t portid, last_port; unsigned lcore_id, rx_lcore_id; unsigned nb_ports_in_mask = 0; /* init EAL */ ret = rte_eal_init(argc, argv); if (ret < 0) rte_exit(EXIT_FAILURE, "Invalid EAL arguments/n"); argc -= ret; argv += ret; /* parse application arguments (after the EAL ones) */ ret = l2fwd_parse_args(argc, argv); if (ret < 0) rte_exit(EXIT_FAILURE, "Invalid L2FWD arguments/n"); /* create the mbuf pool */ l2fwd_pktmbuf_pool = rte_mempool_create("mbuf_pool", NB_MBUF, MBUF_SIZE, 32, sizeof(struct rte_pktmbuf_pool_PRivate), rte_pktmbuf_pool_init, NULL, rte_pktmbuf_init, NULL, rte_socket_id(), 0); if (l2fwd_pktmbuf_pool == NULL) rte_exit(EXIT_FAILURE, "Cannot init mbuf pool/n"); /* init driver(s) */ if (rte_pmd_init_all() < 0) rte_exit(EXIT_FAILURE, "Cannot init pmd/n"); if (rte_eal_pci_probe() < 0) rte_exit(EXIT_FAILURE, "Cannot probe PCI/n"); nb_ports = rte_eth_dev_count(); if (nb_ports == 0) rte_exit(EXIT_FAILURE, "No Ethernet ports - bye/n"); if (nb_ports > RTE_MAX_ETHPORTS) nb_ports = RTE_MAX_ETHPORTS; /* reset l2fwd_dst_ports */ for (portid = 0; portid < RTE_MAX_ETHPORTS; portid++) l2fwd_dst_ports[portid] = 0; last_port = 0; /* port0发给port1, port1发给port0. 两个端口为一对,互相发包 */ /* * Each logical core is assigned a dedicated TX queue on each port. 
*/ for (portid = 0; portid < nb_ports; portid++) { /* skip ports that are not enabled */ if ((l2fwd_enabled_port_mask & (1 << portid)) == 0) continue; if (nb_ports_in_mask % 2) { l2fwd_dst_ports[portid] = last_port; l2fwd_dst_ports[last_port] = portid; } else last_port = portid; nb_ports_in_mask++; rte_eth_dev_info_get(portid, &dev_info); } if (nb_ports_in_mask % 2) { printf("Notice: odd number of ports in portmask./n"); l2fwd_dst_ports[last_port] = last_port; } rx_lcore_id = 0; qconf = NULL; /* 每个core负责收l2fwd_rx_queue_per_lcore个端口, 每个端口(其实应该是QUEUE,因为这里一个port只有一个QUEUE)只能由一个lcore进行收包 */ /* Initialize the port/queue configuration of each logical core */ for (portid = 0; portid < nb_ports; portid++) { /* skip ports that are not enabled */ if ((l2fwd_enabled_port_mask & (1 << portid)) == 0) continue; /* get the lcore_id for this port */ while (rte_lcore_is_enabled(rx_lcore_id) == 0 || lcore_queue_conf[rx_lcore_id].n_rx_port == l2fwd_rx_queue_per_lcore) { rx_lcore_id++; if (rx_lcore_id >= RTE_MAX_LCORE) rte_exit(EXIT_FAILURE, "Not enough cores/n"); } if (qconf != &lcore_queue_conf[rx_lcore_id]) /* Assigned a new logical core in the loop above. */ qconf = &lcore_queue_conf[rx_lcore_id]; qconf->rx_port_list[qconf->n_rx_port] = portid; qconf->n_rx_port++; printf("Lcore %u: RX port %u/n", rx_lcore_id, (unsigned) portid); } nb_ports_available = nb_ports; /* 每个port收发包队列的初始化 */ /* Initialise each port */ for (portid = 0; portid < nb_ports; portid++) { /* skip ports that are not enabled */ if ((l2fwd_enabled_port_mask & (1 << portid)) == 0) { printf("Skipping disabled port %u/n", (unsigned) portid); nb_ports_available--; continue; } /* init port */ printf("Initializing port %u... 
", (unsigned) portid); fflush(stdout); ret = rte_eth_dev_configure(portid, 1, 1, &port_conf); if (ret < 0) rte_exit(EXIT_FAILURE, "Cannot configure device: err=%d, port=%u/n", ret, (unsigned) portid); rte_eth_macaddr_get(portid,&l2fwd_ports_eth_addr[portid]); /* init one RX queue */ fflush(stdout); ret = rte_eth_rx_queue_setup(portid, 0, nb_rxd, rte_eth_dev_socket_id(portid), &rx_conf, l2fwd_pktmbuf_pool); if (ret < 0) rte_exit(EXIT_FAILURE, "rte_eth_rx_queue_setup:err=%d, port=%u/n", ret, (unsigned) portid); /* init one TX queue on each port */ fflush(stdout); ret = rte_eth_tx_queue_setup(portid, 0, nb_txd, rte_eth_dev_socket_id(portid), &tx_conf); if (ret < 0) rte_exit(EXIT_FAILURE, "rte_eth_tx_queue_setup:err=%d, port=%u/n", ret, (unsigned) portid); /* Start device */ ret = rte_eth_dev_start(portid); if (ret < 0) rte_exit(EXIT_FAILURE, "rte_eth_dev_start:err=%d, port=%u/n", ret, (unsigned) portid); printf("done: /n"); rte_eth_promiscuous_enable(portid); printf("Port %u, MAC address: %02X:%02X:%02X:%02X:%02X:%02X/n/n", (unsigned) portid, l2fwd_ports_eth_addr[portid].addr_bytes[0], l2fwd_ports_eth_addr[portid].addr_bytes[1], l2fwd_ports_eth_addr[portid].addr_bytes[2], l2fwd_ports_eth_addr[portid].addr_bytes[3], l2fwd_ports_eth_addr[portid].addr_bytes[4], l2fwd_ports_eth_addr[portid].addr_bytes[5]); /* initialize port stats */ memset(&port_statistics, 0, sizeof(port_statistics)); } if (!nb_ports_available) { rte_exit(EXIT_FAILURE, "All available ports are disabled. Please set portmask./n"); } check_all_ports_link_status(nb_ports, l2fwd_enabled_port_mask); /* 启动l2fwd线程 */ /* launch per-lcore init on every lcore */ rte_eal_mp_remote_launch(l2fwd_launch_one_lcore, NULL, CALL_MASTER); RTE_LCORE_FOREACH_SLAVE(lcore_id) { if (rte_eal_wait_lcore(lcore_id) < 0) return -1; } return 0;}
以下详细分析端口初始化过程; 对于每个port, 首先调用rte_eth_dev_configure配置端口的收发包队列个数,并初始化收发包队列控制块;
intrte_eth_dev_configure(uint8_t port_id, uint16_t nb_rx_q, uint16_t nb_tx_q, const struct rte_eth_conf *dev_conf){ struct rte_eth_dev *dev; struct rte_eth_dev_info dev_info; int diag; /* 只能由primary进程初始化 */ /* This function is only safe when called from the primary process * in a multi-process setup*/ PROC_PRIMARY_OR_ERR_RET(-E_RTE_SECONDARY); if (port_id >= nb_ports || port_id >= RTE_MAX_ETHPORTS) { PMD_DEBUG_TRACE("Invalid port_id=%d/n", port_id); return (-EINVAL); } dev = &rte_eth_devices[port_id]; /* 在PMD驱动初始化过程中,E1000的ops注册为eth_em_ops */ FUNC_PTR_OR_ERR_RET(*dev->dev_ops->dev_infos_get, -ENOTSUP); FUNC_PTR_OR_ERR_RET(*dev->dev_ops->dev_configure, -ENOTSUP); /* rte_eth_dev_start会把该标记为置为1 */ if (dev->data->dev_started) { PMD_DEBUG_TRACE( "port %d must be stopped to allow configuration/n", port_id); return (-EBUSY); } /* eth_em_infos_get会返回tx,rx队列数; 本例子max_rx_queues = 1 max_tx_queues = 1 */ /* * Check that the numbers of RX and TX queues are not greater * than the maximum number of RX and TX queues supported by the * configured device. */ (*dev->dev_ops->dev_infos_get)(dev, &dev_info); if (nb_rx_q > dev_info.max_rx_queues) { PMD_DEBUG_TRACE("ethdev port_id=%d nb_rx_queues=%d > %d/n", port_id, nb_rx_q, dev_info.max_rx_queues); return (-EINVAL); } if (nb_rx_q == 0) { PMD_DEBUG_TRACE("ethdev port_id=%d nb_rx_q == 0/n", port_id); return (-EINVAL); } if (nb_tx_q > dev_info.max_tx_queues) { PMD_DEBUG_TRACE("ethdev port_id=%d nb_tx_queues=%d > %d/n", port_id, nb_tx_q, dev_info.max_tx_queues); return (-EINVAL); } if (nb_tx_q == 0) { PMD_DEBUG_TRACE("ethdev port_id=%d nb_tx_q == 0/n", port_id); return (-EINVAL); } /* dev_conf里面是tx,rx模式的配置 */ /* Copy the dev_conf parameter into the dev structure */ memcpy(&dev->data->dev_conf, dev_conf, sizeof(dev->data->dev_conf)); /* 是否收大报文 一般不需要 */ /* * If jumbo frames are enabled, check that the maximum RX packet * length is supported by the configured device. 
*/ if (dev_conf->rxmode.jumbo_frame == 1) { if (dev_conf->rxmode.max_rx_pkt_len > dev_info.max_rx_pktlen) { PMD_DEBUG_TRACE("ethdev port_id=%d max_rx_pkt_len %u" " > max valid value %u/n", port_id, (unsigned)dev_conf->rxmode.max_rx_pkt_len, (unsigned)dev_info.max_rx_pktlen); return (-EINVAL); } else if (dev_conf->rxmode.max_rx_pkt_len < ETHER_MIN_LEN) { PMD_DEBUG_TRACE("ethdev port_id=%d max_rx_pkt_len %u" " < min valid value %u/n", port_id, (unsigned)dev_conf->rxmode.max_rx_pkt_len, (unsigned)ETHER_MIN_LEN); return (-EINVAL); } } else /* Use default value */ dev->data->dev_conf.rxmode.max_rx_pkt_len = ETHER_MAX_LEN; /* 多队列的检查, 其中各种模式DCB/rss表示什么意思? */ /* multipe queue mode checking */ diag = rte_eth_dev_check_mq_mode(port_id, nb_rx_q, nb_tx_q, dev_conf); if (diag != 0) { PMD_DEBUG_TRACE("port%d rte_eth_dev_check_mq_mode = %d/n", port_id, diag); return diag; } /* * Setup new number of RX/TX queues and reconfigure device. */ /* RX队列控制块内存分配 */ diag = rte_eth_dev_rx_queue_config(dev, nb_rx_q); if (diag != 0) { PMD_DEBUG_TRACE("port%d rte_eth_dev_rx_queue_config = %d/n", port_id, diag); return diag; } /* TX队列控制块内存分配 */ diag = rte_eth_dev_tx_queue_config(dev, nb_tx_q); if (diag != 0) { PMD_DEBUG_TRACE("port%d rte_eth_dev_tx_queue_config = %d/n", port_id, diag); rte_eth_dev_rx_queue_config(dev, 0); return diag; } /* eth_em_configure, 标记intr->flags |= E1000_FLAG_NEED_LINK_UPDATE; */ diag = (*dev->dev_ops->dev_configure)(dev); if (diag != 0) { PMD_DEBUG_TRACE("port%d dev_configure = %d/n", port_id, diag); rte_eth_dev_rx_queue_config(dev, 0); rte_eth_dev_tx_queue_config(dev, 0); return diag; } return 0;}
RX queue setup
intrte_eth_rx_queue_setup(uint8_t port_id, uint16_t rx_queue_id, uint16_t nb_rx_desc, unsigned int socket_id, const struct rte_eth_rxconf *rx_conf, struct rte_mempool *mp){ struct rte_eth_dev *dev; struct rte_pktmbuf_pool_private *mbp_priv; struct rte_eth_dev_info dev_info; /* This function is only safe when called from the primary process * in a multi-process setup*/ PROC_PRIMARY_OR_ERR_RET(-E_RTE_SECONDARY); if (port_id >= nb_ports) { PMD_DEBUG_TRACE("Invalid port_id=%d/n", port_id); return (-EINVAL); } dev = &rte_eth_devices[port_id]; if (rx_queue_id >= dev->data->nb_rx_queues) { PMD_DEBUG_TRACE("Invalid RX queue_id=%d/n", rx_queue_id); return (-EINVAL); } if (dev->data->dev_started) { PMD_DEBUG_TRACE( "port %d must be stopped to allow configuration/n", port_id); return -EBUSY; } FUNC_PTR_OR_ERR_RET(*dev->dev_ops->dev_infos_get, -ENOTSUP); FUNC_PTR_OR_ERR_RET(*dev->dev_ops->rx_queue_setup, -ENOTSUP); /* * Check the size of the mbuf data buffer. * This value must be provided in the private data of the memory pool. * First check that the memory pool has a valid private data. */ (*dev->dev_ops->dev_infos_get)(dev, &dev_info); if (mp->private_data_size < sizeof(struct rte_pktmbuf_pool_private)) { PMD_DEBUG_TRACE("%s private_data_size %d < %d/n", mp->name, (int) mp->private_data_size, (int) sizeof(struct rte_pktmbuf_pool_private)); return (-ENOSPC); } /* mbuf data部分大小(2048) > 256 */ mbp_priv = rte_mempool_get_priv(mp); if ((uint32_t) (mbp_priv->mbuf_data_room_size - RTE_PKTMBUF_HEADROOM) < dev_info.min_rx_bufsize) { PMD_DEBUG_TRACE("%s mbuf_data_room_size %d < %d " "(RTE_PKTMBUF_HEADROOM=%d + min_rx_bufsize(dev)" "=%d)/n", mp->name, (int)mbp_priv->mbuf_data_room_size, (int)(RTE_PKTMBUF_HEADROOM + dev_info.min_rx_bufsize), (int)RTE_PKTMBUF_HEADROOM, (int)dev_info.min_rx_bufsize); return (-EINVAL); } /* eth_em_rx_queue_setup, 初始化收包描述符 */ return (*dev->dev_ops->rx_queue_setup)(dev, rx_queue_id, nb_rx_desc, socket_id, rx_conf, mp);}
inteth_em_rx_queue_setup(struct rte_eth_dev *dev, uint16_t queue_idx, uint16_t nb_desc, unsigned int socket_id, const struct rte_eth_rxconf *rx_conf, struct rte_mempool *mp){ const struct rte_memzone *rz; struct em_rx_queue *rxq; struct e1000_hw *hw; uint32_t rsize; hw = E1000_DEV_PRIVATE_TO_HW(dev->data->dev_private); /* * Validate number of receive descriptors. * It must not exceed hardware maximum, and must be multiple * of EM_ALIGN. */ if (((nb_desc * sizeof(rxq->rx_ring[0])) % EM_ALIGN) != 0 || (nb_desc > EM_MAX_RING_DESC) || (nb_desc < EM_MIN_RING_DESC)) { return (-EINVAL); } /* * EM devices don't support drop_en functionality */ if (rx_conf->rx_drop_en) { RTE_LOG(ERR, PMD, "drop_en functionality not supported by device/n"); return (-EINVAL); } /* 之前setup过, 释放资源 */ /* Free memory prior to re-allocation if needed. */ if (dev->data->rx_queues[queue_idx] != NULL) { em_rx_queue_release(dev->data->rx_queues[queue_idx]); dev->data->rx_queues[queue_idx] = NULL; } /* 名为rte_em_pmd_rx_ring_0_1的memzone分配,用于收包描述符 */ /* Allocate RX ring for max possible mumber of hardware descriptors. */ rsize = sizeof (rxq->rx_ring[0]) * EM_MAX_RING_DESC; if ((rz = ring_dma_zone_reserve(dev, "rx_ring", queue_idx, rsize, socket_id)) == NULL) return (-ENOMEM); /* rx队列控制块内存分配 */ /* Allocate the RX queue data structure. */ if ((rxq = rte_zmalloc("ethdev RX queue", sizeof(*rxq), CACHE_LINE_SIZE)) == NULL) return (-ENOMEM); /* 与rx描述符管理的mbuf指针 */ /* Allocate software ring. */ if ((rxq->sw_ring = rte_zmalloc("rxq->sw_ring", sizeof (rxq->sw_ring[0]) * nb_desc, CACHE_LINE_SIZE)) == NULL) { em_rx_queue_release(rxq); return (-ENOMEM); } rxq->mb_pool = mp; rxq->nb_rx_desc = nb_desc; rxq->pthresh = rx_conf->rx_thresh.pthresh; rxq->hthresh = rx_conf->rx_thresh.hthresh; rxq->wthresh = rx_conf->rx_thresh.wthresh; rxq->rx_free_thresh = rx_conf->rx_free_thresh; rxq->queue_id = queue_idx; rxq->port_id = dev->data->port_id; rxq->crc_len = (uint8_t) ((dev->data->dev_conf.rxmode.hw_strip_crc) ? 
0 : ETHER_CRC_LEN); rxq->rdt_reg_addr = E1000_PCI_REG_ADDR(hw, E1000_RDT(queue_idx)); rxq->rdh_reg_addr = E1000_PCI_REG_ADDR(hw, E1000_RDH(queue_idx));#ifndef RTE_LIBRTE_XEN_DOM0 rxq->rx_ring_phys_addr = (uint64_t) rz->phys_addr;#else rxq->rx_ring_phys_addr = rte_mem_phy2mch(rz->memseg_id, rz->phys_addr); #endif rxq->rx_ring = (struct e1000_rx_desc *) rz->addr; PMD_INIT_LOG(DEBUG, "sw_ring=%p hw_ring=%p dma_addr=0x%"PRIx64"/n", rxq->sw_ring, rxq->rx_ring, rxq->rx_ring_phys_addr); dev->data->rx_queues[queue_idx] = rxq; em_reset_rx_queue(rxq); return (0);}
TX queue setup
intrte_eth_tx_queue_setup(uint8_t port_id, uint16_t tx_queue_id, uint16_t nb_tx_desc, unsigned int socket_id, const struct rte_eth_txconf *tx_conf){ struct rte_eth_dev *dev; /* This function is only safe when called from the primary process * in a multi-process setup*/ PROC_PRIMARY_OR_ERR_RET(-E_RTE_SECONDARY); if (port_id >= RTE_MAX_ETHPORTS || port_id >= nb_ports) { PMD_DEBUG_TRACE("Invalid port_id=%d/n", port_id); return (-EINVAL); } dev = &rte_eth_devices[port_id]; if (tx_queue_id >= dev->data->nb_tx_queues) { PMD_DEBUG_TRACE("Invalid TX queue_id=%d/n", tx_queue_id); return (-EINVAL); } /* 必须在设备启动前做初始化操作 */ if (dev->data->dev_started) { PMD_DEBUG_TRACE( "port %d must be stopped to allow configuration/n", port_id); return -EBUSY; } /* 调用PMD驱动的tx_queue_setup */ FUNC_PTR_OR_ERR_RET(*dev->dev_ops->tx_queue_setup, -ENOTSUP); return (*dev->dev_ops->tx_queue_setup)(dev, tx_queue_id, nb_tx_desc, socket_id, tx_conf);}
inteth_em_tx_queue_setup(struct rte_eth_dev *dev, uint16_t queue_idx, uint16_t nb_desc, unsigned int socket_id, const struct rte_eth_txconf *tx_conf){ const struct rte_memzone *tz; struct em_tx_queue *txq; struct e1000_hw *hw; uint32_t tsize; uint16_t tx_rs_thresh, tx_free_thresh; hw = E1000_DEV_PRIVATE_TO_HW(dev->data->dev_private); /* tx descriptor必须是cache line对齐的 */ /* * Validate number of transmit descriptors. * It must not exceed hardware maximum, and must be multiple * of EM_ALIGN. */ if (((nb_desc * sizeof(*txq->tx_ring)) % EM_ALIGN) != 0 || (nb_desc > EM_MAX_RING_DESC) || (nb_desc < EM_MIN_RING_DESC)) { return -(EINVAL); } /* threshold 配置 */ tx_free_thresh = tx_conf->tx_free_thresh; if (tx_free_thresh == 0) tx_free_thresh = (uint16_t)RTE_MIN(nb_desc / 4, DEFAULT_TX_FREE_THRESH); tx_rs_thresh = tx_conf->tx_rs_thresh; if (tx_rs_thresh == 0) tx_rs_thresh = (uint16_t)RTE_MIN(tx_free_thresh, DEFAULT_TX_RS_THRESH); if (tx_free_thresh >= (nb_desc - 3)) { RTE_LOG(ERR, PMD, "tx_free_thresh must be less than the " "number of TX descriptors minus 3. (tx_free_thresh=%u " "port=%d queue=%d)/n", (unsigned int)tx_free_thresh, (int)dev->data->port_id, (int)queue_idx); return -(EINVAL); } if (tx_rs_thresh > tx_free_thresh) { RTE_LOG(ERR, PMD, "tx_rs_thresh must be less than or equal to " "tx_free_thresh. (tx_free_thresh=%u tx_rs_thresh=%u " "port=%d queue=%d)/n", (unsigned int)tx_free_thresh, (unsigned int)tx_rs_thresh, (int)dev->data->port_id, (int)queue_idx); return -(EINVAL); } /* * If rs_bit_thresh is greater than 1, then TX WTHRESH should be * set to 0. If WTHRESH is greater than zero, the RS bit is ignored * by the NIC and all descriptors are written back after the NIC * accumulates WTHRESH descriptors. */ if (tx_conf->tx_thresh.wthresh != 0 && tx_rs_thresh != 1) { RTE_LOG(ERR, PMD, "TX WTHRESH must be set to 0 if " "tx_rs_thresh is greater than 1. 
(tx_rs_thresh=%u " "port=%d queue=%d)/n", (unsigned int)tx_rs_thresh, (int)dev->data->port_id, (int)queue_idx); return -(EINVAL); } /* txq不为空,释放原先的队列中的mbuf和txq */ /* Free memory prior to re-allocation if needed... */ if (dev->data->tx_queues[queue_idx] != NULL) { em_tx_queue_release(dev->data->tx_queues[queue_idx]); dev->data->tx_queues[queue_idx] = NULL; } /* 分配名为rte_em_pmd_tx_ring_p_q的memzone, 用于存放EM_MAX_RING_DESC个tx descriptor */ /* * Allocate TX ring hardware descriptors. A memzone large enough to * handle the maximum ring size is allocated in order to allow for * resizing in later calls to the queue setup function. */ tsize = sizeof (txq->tx_ring[0]) * EM_MAX_RING_DESC; if ((tz = ring_dma_zone_reserve(dev, "tx_ring", queue_idx, tsize, socket_id)) == NULL) return (-ENOMEM); /* txq内存分配 */ /* Allocate the tx queue data structure. */ if ((txq = rte_zmalloc("ethdev TX queue", sizeof(*txq), CACHE_LINE_SIZE)) == NULL) return (-ENOMEM); /* txq sw_ring内存分配 */ /* Allocate software ring */ if ((txq->sw_ring = rte_zmalloc("txq->sw_ring", sizeof(txq->sw_ring[0]) * nb_desc, CACHE_LINE_SIZE)) == NULL) { em_tx_queue_release(txq); return (-ENOMEM); } txq->nb_tx_desc = nb_desc; txq->tx_free_thresh = tx_free_thresh; txq->tx_rs_thresh = tx_rs_thresh; txq->pthresh = tx_conf->tx_thresh.pthresh; txq->hthresh = tx_conf->tx_thresh.hthresh; txq->wthresh = tx_conf->tx_thresh.wthresh; txq->queue_id = queue_idx; txq->port_id = dev->data->port_id; txq->tdt_reg_addr = E1000_PCI_REG_ADDR(hw, E1000_TDT(queue_idx)); /* tx_ring的物理地址 */#ifndef RTE_LIBRTE_XEN_DOM0 txq->tx_ring_phys_addr = (uint64_t) tz->phys_addr;#else txq->tx_ring_phys_addr = rte_mem_phy2mch(tz->memseg_id, tz->phys_addr);#endif /* tx_ring的虚拟地址 */ txq->tx_ring = (struct e1000_data_desc *) tz->addr; PMD_INIT_LOG(DEBUG, "sw_ring=%p hw_ring=%p dma_addr=0x%"PRIx64"/n", txq->sw_ring, txq->tx_ring, txq->tx_ring_phys_addr); /* 环状队列初始化,每个entry的next指向下一个,最后一个指向第一个 */ em_reset_tx_queue(txq); dev->data->tx_queues[queue_idx] = txq; return 
(0);}
端口初始化的最后一步是使能端口收发包功能,其中主要是通知E1000驱动tx ring和rx ring的地址, 细节就不再跟进
voideth_em_tx_init(struct rte_eth_dev *dev){ struct e1000_hw *hw; struct em_tx_queue *txq; uint32_t tctl; uint32_t txdctl; uint16_t i; hw = E1000_DEV_PRIVATE_TO_HW(dev->data->dev_private); /* 把每一个queue的tx ring的物理地址通告给E1000驱动 */ /* Setup the Base and Length of the Tx Descriptor Rings. */ for (i = 0; i < dev->data->nb_tx_queues; i++) { uint64_t bus_addr; txq = dev->data->tx_queues[i]; bus_addr = txq->tx_ring_phys_addr; E1000_WRITE_REG(hw, E1000_TDLEN(i), txq->nb_tx_desc * sizeof(*txq->tx_ring)); E1000_WRITE_REG(hw, E1000_TDBAH(i), (uint32_t)(bus_addr >> 32)); E1000_WRITE_REG(hw, E1000_TDBAL(i), (uint32_t)bus_addr); /* Setup the HW Tx Head and Tail descriptor pointers. */ E1000_WRITE_REG(hw, E1000_TDT(i), 0); E1000_WRITE_REG(hw, E1000_TDH(i), 0); /* Setup Transmit threshold registers. */ txdctl = E1000_READ_REG(hw, E1000_TXDCTL(i)); /* * bit 22 is reserved, on some models should always be 0, * on others - always 1. */ txdctl &= E1000_TXDCTL_COUNT_DESC; txdctl |= txq->pthresh & 0x3F; txdctl |= (txq->hthresh & 0x3F) << 8; txdctl |= (txq->wthresh & 0x3F) << 16; txdctl |= E1000_TXDCTL_GRAN; E1000_WRITE_REG(hw, E1000_TXDCTL(i), txdctl); } /* Program the Transmit Control Register. */ tctl = E1000_READ_REG(hw, E1000_TCTL); tctl &= ~E1000_TCTL_CT; tctl |= (E1000_TCTL_PSP | E1000_TCTL_RTLC | E1000_TCTL_EN | (E1000_COLLISION_THRESHOLD << E1000_CT_SHIFT)); /* This write will effectively turn on the transmit unit. */ E1000_WRITE_REG(hw, E1000_TCTL, tctl);}
inteth_em_rx_init(struct rte_eth_dev *dev){ struct e1000_hw *hw; struct em_rx_queue *rxq; uint32_t rctl; uint32_t rfctl; uint32_t rxcsum; uint32_t rctl_bsize; uint16_t i; int ret; hw = E1000_DEV_PRIVATE_TO_HW(dev->data->dev_private); /* * Make sure receives are disabled while setting * up the descriptor ring. */ rctl = E1000_READ_REG(hw, E1000_RCTL); E1000_WRITE_REG(hw, E1000_RCTL, rctl & ~E1000_RCTL_EN); rfctl = E1000_READ_REG(hw, E1000_RFCTL); /* Disable extended descriptor type. */ rfctl &= ~E1000_RFCTL_EXTEN; /* Disable accelerated acknowledge */ if (hw->mac.type == e1000_82574) rfctl |= E1000_RFCTL_ACK_DIS; E1000_WRITE_REG(hw, E1000_RFCTL, rfctl); /* * XXX TEMPORARY WORKAROUND: on some systems with 82573 * long latencies are observed, like Lenovo X60. This * change eliminates the problem, but since having positive * values in RDTR is a known source of problems on other * platforms another solution is being sought. */ if (hw->mac.type == e1000_82573) E1000_WRITE_REG(hw, E1000_RDTR, 0x20); dev->rx_pkt_burst = (eth_rx_burst_t)eth_em_recv_pkts; /* 计算pkt buf的大小 */ /* Determine RX bufsize. */ rctl_bsize = EM_MAX_BUF_SIZE; for (i = 0; i < dev->data->nb_rx_queues; i++) { struct rte_pktmbuf_pool_private *mbp_priv; uint32_t buf_size; rxq = dev->data->rx_queues[i]; mbp_priv = rte_mempool_get_priv(rxq->mb_pool); buf_size = mbp_priv->mbuf_data_room_size - RTE_PKTMBUF_HEADROOM; rctl_bsize = RTE_MIN(rctl_bsize, buf_size); } rctl |= em_rctl_bsize(hw->mac.type, &rctl_bsize); /* Configure and enable each RX queue. 
*/ for (i = 0; i < dev->data->nb_rx_queues; i++) { uint64_t bus_addr; uint32_t rxdctl; rxq = dev->data->rx_queues[i]; /* 从mbuf pool中分配mbuf, 填写到rxq->sw_ring,记录每个pkt buf的物理地址到rxq->rx_ring */ /* Allocate buffers for descriptor rings and setup queue */ ret = em_alloc_rx_queue_mbufs(rxq); if (ret) return ret; /* 把rx ring的物理地址通告给E1000驱动 */ /* * Reset crc_len in case it was changed after queue setup by a * call to configure */ rxq->crc_len = (uint8_t)(dev->data->dev_conf.rxmode.hw_strip_crc ? 0 : ETHER_CRC_LEN); bus_addr = rxq->rx_ring_phys_addr; E1000_WRITE_REG(hw, E1000_RDLEN(i), rxq->nb_rx_desc * sizeof(*rxq->rx_ring)); E1000_WRITE_REG(hw, E1000_RDBAH(i), (uint32_t)(bus_addr >> 32)); E1000_WRITE_REG(hw, E1000_RDBAL(i), (uint32_t)bus_addr); E1000_WRITE_REG(hw, E1000_RDH(i), 0); E1000_WRITE_REG(hw, E1000_RDT(i), rxq->nb_rx_desc - 1); rxdctl = E1000_READ_REG(hw, E1000_RXDCTL(0)); rxdctl &= 0xFE000000; rxdctl |= rxq->pthresh & 0x3F; rxdctl |= (rxq->hthresh & 0x3F) << 8; rxdctl |= (rxq->wthresh & 0x3F) << 16; rxdctl |= E1000_RXDCTL_GRAN; E1000_WRITE_REG(hw, E1000_RXDCTL(i), rxdctl); /* 收大报文用的收包函数 */ /* * Due to EM devices not having any sort of hardware * limit for packet length, jumbo frame of any size * can be accepted, thus we have to enable scattered * rx if jumbo frames are enabled (or if buffer size * is too small to accomodate non-jumbo packets) * to avoid splitting packets that don't fit into * one buffer. */ if (dev->data->dev_conf.rxmode.jumbo_frame || rctl_bsize < ETHER_MAX_LEN) { dev->rx_pkt_burst = (eth_rx_burst_t)eth_em_recv_scattered_pkts; dev->data->scattered_rx = 1; } } /* 以下省略 */ ... return 0;}
到此端口初始化完成并启动。回到main函数中,在每个lcore上启动循环收包函数。
/* launch per-lcore init on every lcore */
rte_eal_mp_remote_launch(l2fwd_launch_one_lcore, NULL, CALL_MASTER);
lcore的主线程处理如下
/* main processing loop */static voidl2fwd_main_loop(void){ struct rte_mbuf *pkts_burst[MAX_PKT_BURST]; struct rte_mbuf *m; unsigned lcore_id; uint64_t prev_tsc, diff_tsc, cur_tsc, timer_tsc; unsigned i, j, portid, nb_rx; struct lcore_queue_conf *qconf; const uint64_t drain_tsc = (rte_get_tsc_hz() + US_PER_S - 1) / US_PER_S * BURST_TX_DRAIN_US; prev_tsc = 0; timer_tsc = 0; lcore_id = rte_lcore_id(); qconf = &lcore_queue_conf[lcore_id]; if (qconf->n_rx_port == 0) { RTE_LOG(INFO, L2FWD, "lcore %u has nothing to do/n", lcore_id); return; } RTE_LOG(INFO, L2FWD, "entering main loop on lcore %u/n", lcore_id); /* 当前lcore需要处理哪些port(queue) */ for (i = 0; i < qconf->n_rx_port; i++) { portid = qconf->rx_port_list[i]; RTE_LOG(INFO, L2FWD, " -- lcoreid=%u portid=%u/n", lcore_id, portid); } while (1) { cur_tsc = rte_rdtsc(); /* * TX burst queue drain */ diff_tsc = cur_tsc - prev_tsc; /* 隔一段时间才把所有要发送的报文发送出去并打印统计信息 */ if (unlikely(diff_tsc > drain_tsc)) { for (portid = 0; portid < RTE_MAX_ETHPORTS; portid++) { /* 当前port没有需要发送的报文 */ if (qconf->tx_mbufs[portid].len == 0) continue; /* 调用device的发包函数并统计发送的报文个数 */ l2fwd_send_burst(&lcore_queue_conf[lcore_id], qconf->tx_mbufs[portid].len, (uint8_t) portid); /* 到此应该当前端口需要发送的报文全部发送,因此len置为0 */ qconf->tx_mbufs[portid].len = 0; } /* if timer is enabled */ if (timer_period > 0) { /* advance the timer */ timer_tsc += diff_tsc; /* if timer has reached its timeout */ if (unlikely(timer_tsc >= (uint64_t) timer_period)) { /* do this only on master core */ if (lcore_id == rte_get_master_lcore()) { print_stats(); /* reset the timer */ timer_tsc = 0; } } } prev_tsc = cur_tsc; } /* 当前lcore需要处理的queue */ /* * Read packet from RX queues */ for (i = 0; i < qconf->n_rx_port; i++) { portid = qconf->rx_port_list[i]; /* 当前port只有queue0 */ nb_rx = rte_eth_rx_burst((uint8_t) portid, 0, pkts_burst, MAX_PKT_BURST); /* 更新收包统计 */ port_statistics[portid].rx += nb_rx; /* 把所有收上来的报文修改目的MAC后加入到发包队列 */ for (j = 0; j < nb_rx; j++) { m = pkts_burst[j]; /* PKT 
DATA部分载入cache,这个好像收包部分已经prefetch过了 */ rte_prefetch0(rte_pktmbuf_mtod(m, void *)); /* forWord */ l2fwd_simple_forward(m, portid); } } }}
首先看报文是如何收上来的, 调用device的rx_pkt_burst
/*
 * Poll up to nb_pkts packets from one RX queue of a port by dispatching to
 * the PMD's rx_pkt_burst handler (eth_em_recv_pkts for the e1000/em driver);
 * returns the number of packets actually stored in rx_pkts.
 */
static inline uint16_t
rte_eth_rx_burst(uint8_t port_id, uint16_t queue_id,
		 struct rte_mbuf **rx_pkts, uint16_t nb_pkts)
{
	struct rte_eth_dev *dev;

	dev = &rte_eth_devices[port_id];
	return (*dev->rx_pkt_burst)(dev->data->rx_queues[queue_id],
				    rx_pkts, nb_pkts);
}
PMD的收包函数如下:
uint16_teth_em_recv_pkts(void *rx_queue, struct rte_mbuf **rx_pkts, uint16_t nb_pkts){ /* volatile防止编译器优化,每次使用必须重新从memory中取而不是用寄存器的值 */ volatile struct e1000_rx_desc *rx_ring; volatile struct e1000_rx_desc *rxdp; struct em_rx_queue *rxq; struct em_rx_entry *sw_ring; struct em_rx_entry *rxe; struct rte_mbuf *rxm; struct rte_mbuf *nmb; struct e1000_rx_desc rxd; uint64_t dma_addr; uint16_t pkt_len; uint16_t rx_id; uint16_t nb_rx; uint16_t nb_hold; uint8_t status; rxq = rx_queue; nb_rx = 0; nb_hold = 0; rx_id = rxq->rx_tail; /* 当前收包位置 */ rx_ring = rxq->rx_ring; /* rx descriptor */ sw_ring = rxq->sw_ring; /* mbuf */ /* 一次性收32个报文 */ while (nb_rx < nb_pkts) { /* * The order of Operations here is important as the DD status * bit must not be read after any other descriptor fields. * rx_ring and rxdp are pointing to volatile data so the order * of accesses cannot be reordered by the compiler. If they were * not volatile, they could be reordered which could lead to * using invalid descriptor fields when read from rxd. */ /* 当前报文的descriptor */ rxdp = &rx_ring[rx_id]; /* 结束标记,必须首先读取 */ status = rxdp->status;
新闻热点
疑难解答