
dpdk l2fwd (2)


Back to l2fwd's main() function:

int
MAIN(int argc, char **argv)
{
    struct lcore_queue_conf *qconf;
    struct rte_eth_dev_info dev_info;
    int ret;
    uint8_t nb_ports;
    uint8_t nb_ports_available;
    uint8_t portid, last_port;
    unsigned lcore_id, rx_lcore_id;
    unsigned nb_ports_in_mask = 0;

    /* init EAL */
    ret = rte_eal_init(argc, argv);
    if (ret < 0)
        rte_exit(EXIT_FAILURE, "Invalid EAL arguments\n");
    argc -= ret;
    argv += ret;

    /* parse application arguments (after the EAL ones) */
    ret = l2fwd_parse_args(argc, argv);
    if (ret < 0)
        rte_exit(EXIT_FAILURE, "Invalid L2FWD arguments\n");

    /* create the mbuf pool */
    l2fwd_pktmbuf_pool =
        rte_mempool_create("mbuf_pool", NB_MBUF,
                   MBUF_SIZE, 32,
                   sizeof(struct rte_pktmbuf_pool_private),
                   rte_pktmbuf_pool_init, NULL,
                   rte_pktmbuf_init, NULL,
                   rte_socket_id(), 0);
    if (l2fwd_pktmbuf_pool == NULL)
        rte_exit(EXIT_FAILURE, "Cannot init mbuf pool\n");

    /* init driver(s) */
    if (rte_pmd_init_all() < 0)
        rte_exit(EXIT_FAILURE, "Cannot init pmd\n");

    if (rte_eal_pci_probe() < 0)
        rte_exit(EXIT_FAILURE, "Cannot probe PCI\n");

    nb_ports = rte_eth_dev_count();
    if (nb_ports == 0)
        rte_exit(EXIT_FAILURE, "No Ethernet ports - bye\n");

    if (nb_ports > RTE_MAX_ETHPORTS)
        nb_ports = RTE_MAX_ETHPORTS;

    /* reset l2fwd_dst_ports */
    for (portid = 0; portid < RTE_MAX_ETHPORTS; portid++)
        l2fwd_dst_ports[portid] = 0;
    last_port = 0;

    /* port0 sends to port1 and port1 sends to port0: two ports form a pair
     * and forward to each other */
    /*
     * Each logical core is assigned a dedicated TX queue on each port.
     */
    for (portid = 0; portid < nb_ports; portid++) {
        /* skip ports that are not enabled */
        if ((l2fwd_enabled_port_mask & (1 << portid)) == 0)
            continue;

        if (nb_ports_in_mask % 2) {
            l2fwd_dst_ports[portid] = last_port;
            l2fwd_dst_ports[last_port] = portid;
        }
        else
            last_port = portid;

        nb_ports_in_mask++;

        rte_eth_dev_info_get(portid, &dev_info);
    }
    if (nb_ports_in_mask % 2) {
        printf("Notice: odd number of ports in portmask.\n");
        l2fwd_dst_ports[last_port] = last_port;
    }

    rx_lcore_id = 0;
    qconf = NULL;

    /* Each lcore polls up to l2fwd_rx_queue_per_lcore ports; each port
     * (really each queue, since a port has only one queue here) is polled
     * by exactly one lcore */
    /* Initialize the port/queue configuration of each logical core */
    for (portid = 0; portid < nb_ports; portid++) {
        /* skip ports that are not enabled */
        if ((l2fwd_enabled_port_mask & (1 << portid)) == 0)
            continue;

        /* get the lcore_id for this port */
        while (rte_lcore_is_enabled(rx_lcore_id) == 0 ||
               lcore_queue_conf[rx_lcore_id].n_rx_port ==
               l2fwd_rx_queue_per_lcore) {
            rx_lcore_id++;
            if (rx_lcore_id >= RTE_MAX_LCORE)
                rte_exit(EXIT_FAILURE, "Not enough cores\n");
        }

        if (qconf != &lcore_queue_conf[rx_lcore_id])
            /* Assigned a new logical core in the loop above. */
            qconf = &lcore_queue_conf[rx_lcore_id];

        qconf->rx_port_list[qconf->n_rx_port] = portid;
        qconf->n_rx_port++;
        printf("Lcore %u: RX port %u\n", rx_lcore_id, (unsigned) portid);
    }

    nb_ports_available = nb_ports;

    /* Per-port RX/TX queue initialization */
    /* Initialise each port */
    for (portid = 0; portid < nb_ports; portid++) {
        /* skip ports that are not enabled */
        if ((l2fwd_enabled_port_mask & (1 << portid)) == 0) {
            printf("Skipping disabled port %u\n", (unsigned) portid);
            nb_ports_available--;
            continue;
        }

        /* init port */
        printf("Initializing port %u... ", (unsigned) portid);
        fflush(stdout);
        ret = rte_eth_dev_configure(portid, 1, 1, &port_conf);
        if (ret < 0)
            rte_exit(EXIT_FAILURE, "Cannot configure device: err=%d, port=%u\n",
                  ret, (unsigned) portid);

        rte_eth_macaddr_get(portid, &l2fwd_ports_eth_addr[portid]);

        /* init one RX queue */
        fflush(stdout);
        ret = rte_eth_rx_queue_setup(portid, 0, nb_rxd,
                         rte_eth_dev_socket_id(portid), &rx_conf,
                         l2fwd_pktmbuf_pool);
        if (ret < 0)
            rte_exit(EXIT_FAILURE, "rte_eth_rx_queue_setup:err=%d, port=%u\n",
                  ret, (unsigned) portid);

        /* init one TX queue on each port */
        fflush(stdout);
        ret = rte_eth_tx_queue_setup(portid, 0, nb_txd,
                rte_eth_dev_socket_id(portid), &tx_conf);
        if (ret < 0)
            rte_exit(EXIT_FAILURE, "rte_eth_tx_queue_setup:err=%d, port=%u\n",
                ret, (unsigned) portid);

        /* Start device */
        ret = rte_eth_dev_start(portid);
        if (ret < 0)
            rte_exit(EXIT_FAILURE, "rte_eth_dev_start:err=%d, port=%u\n",
                  ret, (unsigned) portid);

        printf("done: \n");

        rte_eth_promiscuous_enable(portid);

        printf("Port %u, MAC address: %02X:%02X:%02X:%02X:%02X:%02X\n\n",
                (unsigned) portid,
                l2fwd_ports_eth_addr[portid].addr_bytes[0],
                l2fwd_ports_eth_addr[portid].addr_bytes[1],
                l2fwd_ports_eth_addr[portid].addr_bytes[2],
                l2fwd_ports_eth_addr[portid].addr_bytes[3],
                l2fwd_ports_eth_addr[portid].addr_bytes[4],
                l2fwd_ports_eth_addr[portid].addr_bytes[5]);

        /* initialize port stats */
        memset(&port_statistics, 0, sizeof(port_statistics));
    }

    if (!nb_ports_available) {
        rte_exit(EXIT_FAILURE,
            "All available ports are disabled. Please set portmask.\n");
    }

    check_all_ports_link_status(nb_ports, l2fwd_enabled_port_mask);

    /* Launch the l2fwd processing loop on each lcore */
    /* launch per-lcore init on every lcore */
    rte_eal_mp_remote_launch(l2fwd_launch_one_lcore, NULL, CALL_MASTER);
    RTE_LCORE_FOREACH_SLAVE(lcore_id) {
        if (rte_eal_wait_lcore(lcore_id) < 0)
            return -1;
    }

    return 0;
}
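As the comment in main() notes, enabled ports are paired: the first and second enabled ports forward to each other, the third and fourth form the next pair, and an odd port at the end forwards to itself. A minimal standalone sketch of that same pairing logic, using a hypothetical portmask of 0xf (4 ports, all enabled), shows the resulting l2fwd_dst_ports mapping:

#include <stdio.h>
#include <stdint.h>

/* Hypothetical example: 4 ports, all enabled (portmask 0xf). */
int main(void)
{
    uint32_t enabled_port_mask = 0xf;
    uint8_t dst_ports[4] = {0}, last_port = 0;
    unsigned nb_ports_in_mask = 0;

    for (uint8_t portid = 0; portid < 4; portid++) {
        if ((enabled_port_mask & (1 << portid)) == 0)
            continue;
        if (nb_ports_in_mask % 2) {        /* second port of a pair */
            dst_ports[portid] = last_port;
            dst_ports[last_port] = portid;
        } else                             /* first port of a pair */
            last_port = portid;
        nb_ports_in_mask++;
    }
    if (nb_ports_in_mask % 2)              /* odd port forwards to itself */
        dst_ports[last_port] = last_port;

    for (int i = 0; i < 4; i++)
        printf("port %d -> port %d\n", i, dst_ports[i]);  /* 0->1 1->0 2->3 3->2 */
    return 0;
}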

The port initialization process is analyzed in detail below. For each port, rte_eth_dev_configure is called first to set the number of RX/TX queues and to initialize the RX/TX queue control blocks.

int
rte_eth_dev_configure(uint8_t port_id, uint16_t nb_rx_q, uint16_t nb_tx_q,
              const struct rte_eth_conf *dev_conf)
{
    struct rte_eth_dev *dev;
    struct rte_eth_dev_info dev_info;
    int diag;

    /* Only the primary process may configure the port */
    /* This function is only safe when called from the primary process
     * in a multi-process setup*/
    PROC_PRIMARY_OR_ERR_RET(-E_RTE_SECONDARY);

    if (port_id >= nb_ports || port_id >= RTE_MAX_ETHPORTS) {
        PMD_DEBUG_TRACE("Invalid port_id=%d\n", port_id);
        return (-EINVAL);
    }
    dev = &rte_eth_devices[port_id];

    /* During PMD init the E1000 ops were registered as eth_em_ops */
    FUNC_PTR_OR_ERR_RET(*dev->dev_ops->dev_infos_get, -ENOTSUP);
    FUNC_PTR_OR_ERR_RET(*dev->dev_ops->dev_configure, -ENOTSUP);

    /* rte_eth_dev_start sets this flag to 1 */
    if (dev->data->dev_started) {
        PMD_DEBUG_TRACE(
            "port %d must be stopped to allow configuration\n", port_id);
        return (-EBUSY);
    }

    /* eth_em_infos_get returns the RX/TX queue limits; in this example
     * max_rx_queues = 1 and max_tx_queues = 1 */
    /*
     * Check that the numbers of RX and TX queues are not greater
     * than the maximum number of RX and TX queues supported by the
     * configured device.
     */
    (*dev->dev_ops->dev_infos_get)(dev, &dev_info);
    if (nb_rx_q > dev_info.max_rx_queues) {
        PMD_DEBUG_TRACE("ethdev port_id=%d nb_rx_queues=%d > %d\n",
                port_id, nb_rx_q, dev_info.max_rx_queues);
        return (-EINVAL);
    }
    if (nb_rx_q == 0) {
        PMD_DEBUG_TRACE("ethdev port_id=%d nb_rx_q == 0\n", port_id);
        return (-EINVAL);
    }

    if (nb_tx_q > dev_info.max_tx_queues) {
        PMD_DEBUG_TRACE("ethdev port_id=%d nb_tx_queues=%d > %d\n",
                port_id, nb_tx_q, dev_info.max_tx_queues);
        return (-EINVAL);
    }
    if (nb_tx_q == 0) {
        PMD_DEBUG_TRACE("ethdev port_id=%d nb_tx_q == 0\n", port_id);
        return (-EINVAL);
    }

    /* dev_conf carries the RX/TX mode configuration */
    /* Copy the dev_conf parameter into the dev structure */
    memcpy(&dev->data->dev_conf, dev_conf, sizeof(dev->data->dev_conf));

    /* Jumbo frame reception; usually not needed */
    /*
     * If jumbo frames are enabled, check that the maximum RX packet
     * length is supported by the configured device.
     */
    if (dev_conf->rxmode.jumbo_frame == 1) {
        if (dev_conf->rxmode.max_rx_pkt_len >
            dev_info.max_rx_pktlen) {
            PMD_DEBUG_TRACE("ethdev port_id=%d max_rx_pkt_len %u"
                " > max valid value %u\n",
                port_id,
                (unsigned)dev_conf->rxmode.max_rx_pkt_len,
                (unsigned)dev_info.max_rx_pktlen);
            return (-EINVAL);
        }
        else if (dev_conf->rxmode.max_rx_pkt_len < ETHER_MIN_LEN) {
            PMD_DEBUG_TRACE("ethdev port_id=%d max_rx_pkt_len %u"
                " < min valid value %u\n",
                port_id,
                (unsigned)dev_conf->rxmode.max_rx_pkt_len,
                (unsigned)ETHER_MIN_LEN);
            return (-EINVAL);
        }
    } else
        /* Use default value */
        dev->data->dev_conf.rxmode.max_rx_pkt_len = ETHER_MAX_LEN;

    /* Multi-queue mode check (DCB/RSS and the other modes) */
    /* multiple queue mode checking */
    diag = rte_eth_dev_check_mq_mode(port_id, nb_rx_q, nb_tx_q, dev_conf);
    if (diag != 0) {
        PMD_DEBUG_TRACE("port%d rte_eth_dev_check_mq_mode = %d\n",
                port_id, diag);
        return diag;
    }

    /*
     * Setup new number of RX/TX queues and reconfigure device.
     */
    /* Allocate the RX queue control blocks */
    diag = rte_eth_dev_rx_queue_config(dev, nb_rx_q);
    if (diag != 0) {
        PMD_DEBUG_TRACE("port%d rte_eth_dev_rx_queue_config = %d\n",
                port_id, diag);
        return diag;
    }

    /* Allocate the TX queue control blocks */
    diag = rte_eth_dev_tx_queue_config(dev, nb_tx_q);
    if (diag != 0) {
        PMD_DEBUG_TRACE("port%d rte_eth_dev_tx_queue_config = %d\n",
                port_id, diag);
        rte_eth_dev_rx_queue_config(dev, 0);
        return diag;
    }

    /* eth_em_configure: sets intr->flags |= E1000_FLAG_NEED_LINK_UPDATE */
    diag = (*dev->dev_ops->dev_configure)(dev);
    if (diag != 0) {
        PMD_DEBUG_TRACE("port%d dev_configure = %d\n",
                port_id, diag);
        rte_eth_dev_rx_queue_config(dev, 0);
        rte_eth_dev_tx_queue_config(dev, 0);
        return diag;
    }

    return 0;
}
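For reference, the port_conf that main() passes in (and that gets memcpy'd into dev->data->dev_conf above) is a static rte_eth_conf in l2fwd. In this DPDK generation it looks roughly like the sketch below; the field values are reproduced from memory of the era's example code, so treat them as an approximation rather than the exact upstream source:

/* Approximate sketch of l2fwd's port_conf: all RX offloads disabled, no
 * jumbo frames, CRC not stripped, plain single-queue TX mode. */
static const struct rte_eth_conf port_conf = {
    .rxmode = {
        .split_hdr_size = 0,
        .header_split   = 0,   /* header split disabled */
        .hw_ip_checksum = 0,   /* IP checksum offload disabled */
        .hw_vlan_filter = 0,   /* VLAN filtering disabled */
        .jumbo_frame    = 0,   /* jumbo frame support disabled */
        .hw_strip_crc   = 0,   /* CRC kept (not stripped by hardware) */
    },
    .txmode = {
        .mq_mode = ETH_MQ_TX_NONE,
    },
};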

RX queue setup

int
rte_eth_rx_queue_setup(uint8_t port_id, uint16_t rx_queue_id,
               uint16_t nb_rx_desc, unsigned int socket_id,
               const struct rte_eth_rxconf *rx_conf,
               struct rte_mempool *mp)
{
    struct rte_eth_dev *dev;
    struct rte_pktmbuf_pool_private *mbp_priv;
    struct rte_eth_dev_info dev_info;

    /* This function is only safe when called from the primary process
     * in a multi-process setup*/
    PROC_PRIMARY_OR_ERR_RET(-E_RTE_SECONDARY);

    if (port_id >= nb_ports) {
        PMD_DEBUG_TRACE("Invalid port_id=%d\n", port_id);
        return (-EINVAL);
    }
    dev = &rte_eth_devices[port_id];
    if (rx_queue_id >= dev->data->nb_rx_queues) {
        PMD_DEBUG_TRACE("Invalid RX queue_id=%d\n", rx_queue_id);
        return (-EINVAL);
    }

    if (dev->data->dev_started) {
        PMD_DEBUG_TRACE(
            "port %d must be stopped to allow configuration\n", port_id);
        return -EBUSY;
    }

    FUNC_PTR_OR_ERR_RET(*dev->dev_ops->dev_infos_get, -ENOTSUP);
    FUNC_PTR_OR_ERR_RET(*dev->dev_ops->rx_queue_setup, -ENOTSUP);

    /*
     * Check the size of the mbuf data buffer.
     * This value must be provided in the private data of the memory pool.
     * First check that the memory pool has a valid private data.
     */
    (*dev->dev_ops->dev_infos_get)(dev, &dev_info);
    if (mp->private_data_size < sizeof(struct rte_pktmbuf_pool_private)) {
        PMD_DEBUG_TRACE("%s private_data_size %d < %d\n",
                mp->name, (int) mp->private_data_size,
                (int) sizeof(struct rte_pktmbuf_pool_private));
        return (-ENOSPC);
    }

    /* The mbuf data room (2048 here) must exceed min_rx_bufsize (256) */
    mbp_priv = rte_mempool_get_priv(mp);
    if ((uint32_t) (mbp_priv->mbuf_data_room_size - RTE_PKTMBUF_HEADROOM) <
        dev_info.min_rx_bufsize) {
        PMD_DEBUG_TRACE("%s mbuf_data_room_size %d < %d "
                "(RTE_PKTMBUF_HEADROOM=%d + min_rx_bufsize(dev)"
                "=%d)\n",
                mp->name,
                (int)mbp_priv->mbuf_data_room_size,
                (int)(RTE_PKTMBUF_HEADROOM +
                      dev_info.min_rx_bufsize),
                (int)RTE_PKTMBUF_HEADROOM,
                (int)dev_info.min_rx_bufsize);
        return (-EINVAL);
    }

    /* eth_em_rx_queue_setup: initialize the RX descriptors */
    return (*dev->dev_ops->rx_queue_setup)(dev, rx_queue_id, nb_rx_desc,
                           socket_id, rx_conf, mp);
}
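The data-room check above is what the "2048 > 256" remark refers to: l2fwd sizes its mempool elements so that, after subtracting the mbuf header and the headroom, 2048 bytes of packet data remain, comfortably above the em device's minimum RX buffer size. A rough sketch of the arithmetic, using the MBUF_SIZE convention typically found in l2fwd of this era (the concrete values are illustrative assumptions):

#include <stdio.h>

/* Illustrative values: RTE_PKTMBUF_HEADROOM defaults to 128, and l2fwd
 * defines MBUF_SIZE as (2048 + sizeof(struct rte_mbuf) + RTE_PKTMBUF_HEADROOM),
 * which gives a data room of 2048 + 128 after the mbuf header is removed. */
#define RTE_PKTMBUF_HEADROOM 128
#define MBUF_DATA_ROOM_SIZE  (2048 + RTE_PKTMBUF_HEADROOM)
#define MIN_RX_BUFSIZE       256   /* dev_info.min_rx_bufsize for the em PMD */

int main(void)
{
    unsigned usable = MBUF_DATA_ROOM_SIZE - RTE_PKTMBUF_HEADROOM;
    printf("usable data room = %u, min_rx_bufsize = %u -> %s\n",
           usable, MIN_RX_BUFSIZE,
           usable >= MIN_RX_BUFSIZE ? "check passes" : "check fails");
    return 0;
}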

int
eth_em_rx_queue_setup(struct rte_eth_dev *dev,
        uint16_t queue_idx,
        uint16_t nb_desc,
        unsigned int socket_id,
        const struct rte_eth_rxconf *rx_conf,
        struct rte_mempool *mp)
{
    const struct rte_memzone *rz;
    struct em_rx_queue *rxq;
    struct e1000_hw *hw;
    uint32_t rsize;

    hw = E1000_DEV_PRIVATE_TO_HW(dev->data->dev_private);

    /*
     * Validate number of receive descriptors.
     * It must not exceed hardware maximum, and must be multiple
     * of EM_ALIGN.
     */
    if (((nb_desc * sizeof(rxq->rx_ring[0])) % EM_ALIGN) != 0 ||
            (nb_desc > EM_MAX_RING_DESC) ||
            (nb_desc < EM_MIN_RING_DESC)) {
        return (-EINVAL);
    }

    /*
     * EM devices don't support drop_en functionality
     */
    if (rx_conf->rx_drop_en) {
        RTE_LOG(ERR, PMD, "drop_en functionality not supported by device\n");
        return (-EINVAL);
    }

    /* If this queue was set up before, release the old resources */
    /* Free memory prior to re-allocation if needed. */
    if (dev->data->rx_queues[queue_idx] != NULL) {
        em_rx_queue_release(dev->data->rx_queues[queue_idx]);
        dev->data->rx_queues[queue_idx] = NULL;
    }

    /* Reserve the memzone named rte_em_pmd_rx_ring_0_1 for the RX descriptors */
    /* Allocate RX ring for max possible number of hardware descriptors. */
    rsize = sizeof (rxq->rx_ring[0]) * EM_MAX_RING_DESC;
    if ((rz = ring_dma_zone_reserve(dev, "rx_ring", queue_idx, rsize,
            socket_id)) == NULL)
        return (-ENOMEM);

    /* Allocate the RX queue control block */
    /* Allocate the RX queue data structure. */
    if ((rxq = rte_zmalloc("ethdev RX queue", sizeof(*rxq),
            CACHE_LINE_SIZE)) == NULL)
        return (-ENOMEM);

    /* mbuf pointers managed alongside the RX descriptors */
    /* Allocate software ring. */
    if ((rxq->sw_ring = rte_zmalloc("rxq->sw_ring",
            sizeof (rxq->sw_ring[0]) * nb_desc,
            CACHE_LINE_SIZE)) == NULL) {
        em_rx_queue_release(rxq);
        return (-ENOMEM);
    }

    rxq->mb_pool = mp;
    rxq->nb_rx_desc = nb_desc;
    rxq->pthresh = rx_conf->rx_thresh.pthresh;
    rxq->hthresh = rx_conf->rx_thresh.hthresh;
    rxq->wthresh = rx_conf->rx_thresh.wthresh;
    rxq->rx_free_thresh = rx_conf->rx_free_thresh;
    rxq->queue_id = queue_idx;
    rxq->port_id = dev->data->port_id;
    rxq->crc_len = (uint8_t) ((dev->data->dev_conf.rxmode.hw_strip_crc) ?
                0 : ETHER_CRC_LEN);

    rxq->rdt_reg_addr = E1000_PCI_REG_ADDR(hw, E1000_RDT(queue_idx));
    rxq->rdh_reg_addr = E1000_PCI_REG_ADDR(hw, E1000_RDH(queue_idx));

#ifndef RTE_LIBRTE_XEN_DOM0
    rxq->rx_ring_phys_addr = (uint64_t) rz->phys_addr;
#else
    rxq->rx_ring_phys_addr = rte_mem_phy2mch(rz->memseg_id, rz->phys_addr);
#endif
    rxq->rx_ring = (struct e1000_rx_desc *) rz->addr;

    PMD_INIT_LOG(DEBUG, "sw_ring=%p hw_ring=%p dma_addr=0x%"PRIx64"\n",
        rxq->sw_ring, rxq->rx_ring, rxq->rx_ring_phys_addr);

    dev->data->rx_queues[queue_idx] = rxq;
    em_reset_rx_queue(rxq);

    return (0);
}

TX queue setup

int
rte_eth_tx_queue_setup(uint8_t port_id, uint16_t tx_queue_id,
               uint16_t nb_tx_desc, unsigned int socket_id,
               const struct rte_eth_txconf *tx_conf)
{
    struct rte_eth_dev *dev;

    /* This function is only safe when called from the primary process
     * in a multi-process setup*/
    PROC_PRIMARY_OR_ERR_RET(-E_RTE_SECONDARY);

    if (port_id >= RTE_MAX_ETHPORTS || port_id >= nb_ports) {
        PMD_DEBUG_TRACE("Invalid port_id=%d\n", port_id);
        return (-EINVAL);
    }
    dev = &rte_eth_devices[port_id];
    if (tx_queue_id >= dev->data->nb_tx_queues) {
        PMD_DEBUG_TRACE("Invalid TX queue_id=%d\n", tx_queue_id);
        return (-EINVAL);
    }

    /* Must be set up before the device is started */
    if (dev->data->dev_started) {
        PMD_DEBUG_TRACE(
            "port %d must be stopped to allow configuration\n", port_id);
        return -EBUSY;
    }

    /* Call the PMD's tx_queue_setup */
    FUNC_PTR_OR_ERR_RET(*dev->dev_ops->tx_queue_setup, -ENOTSUP);
    return (*dev->dev_ops->tx_queue_setup)(dev, tx_queue_id, nb_tx_desc,
                           socket_id, tx_conf);
}

int
eth_em_tx_queue_setup(struct rte_eth_dev *dev,
             uint16_t queue_idx,
             uint16_t nb_desc,
             unsigned int socket_id,
             const struct rte_eth_txconf *tx_conf)
{
    const struct rte_memzone *tz;
    struct em_tx_queue *txq;
    struct e1000_hw *hw;
    uint32_t tsize;
    uint16_t tx_rs_thresh, tx_free_thresh;

    hw = E1000_DEV_PRIVATE_TO_HW(dev->data->dev_private);

    /* The TX descriptor ring must be a multiple of EM_ALIGN (cache-line aligned) */
    /*
     * Validate number of transmit descriptors.
     * It must not exceed hardware maximum, and must be multiple
     * of EM_ALIGN.
     */
    if (((nb_desc * sizeof(*txq->tx_ring)) % EM_ALIGN) != 0 ||
            (nb_desc > EM_MAX_RING_DESC) ||
            (nb_desc < EM_MIN_RING_DESC)) {
        return -(EINVAL);
    }

    /* Threshold configuration */
    tx_free_thresh = tx_conf->tx_free_thresh;
    if (tx_free_thresh == 0)
        tx_free_thresh = (uint16_t)RTE_MIN(nb_desc / 4,
                    DEFAULT_TX_FREE_THRESH);

    tx_rs_thresh = tx_conf->tx_rs_thresh;
    if (tx_rs_thresh == 0)
        tx_rs_thresh = (uint16_t)RTE_MIN(tx_free_thresh,
                    DEFAULT_TX_RS_THRESH);

    if (tx_free_thresh >= (nb_desc - 3)) {
        RTE_LOG(ERR, PMD, "tx_free_thresh must be less than the "
            "number of TX descriptors minus 3. (tx_free_thresh=%u "
            "port=%d queue=%d)\n", (unsigned int)tx_free_thresh,
                (int)dev->data->port_id, (int)queue_idx);
        return -(EINVAL);
    }
    if (tx_rs_thresh > tx_free_thresh) {
        RTE_LOG(ERR, PMD, "tx_rs_thresh must be less than or equal to "
            "tx_free_thresh. (tx_free_thresh=%u tx_rs_thresh=%u "
            "port=%d queue=%d)\n", (unsigned int)tx_free_thresh,
            (unsigned int)tx_rs_thresh, (int)dev->data->port_id,
                            (int)queue_idx);
        return -(EINVAL);
    }

    /*
     * If rs_bit_thresh is greater than 1, then TX WTHRESH should be
     * set to 0. If WTHRESH is greater than zero, the RS bit is ignored
     * by the NIC and all descriptors are written back after the NIC
     * accumulates WTHRESH descriptors.
     */
    if (tx_conf->tx_thresh.wthresh != 0 && tx_rs_thresh != 1) {
        RTE_LOG(ERR, PMD, "TX WTHRESH must be set to 0 if "
            "tx_rs_thresh is greater than 1. (tx_rs_thresh=%u "
            "port=%d queue=%d)\n", (unsigned int)tx_rs_thresh,
                (int)dev->data->port_id, (int)queue_idx);
        return -(EINVAL);
    }

    /* If txq already exists, free the old queue's mbufs and the txq itself */
    /* Free memory prior to re-allocation if needed... */
    if (dev->data->tx_queues[queue_idx] != NULL) {
        em_tx_queue_release(dev->data->tx_queues[queue_idx]);
        dev->data->tx_queues[queue_idx] = NULL;
    }

    /* Reserve the memzone named rte_em_pmd_tx_ring_p_q (p = port, q = queue)
     * to hold EM_MAX_RING_DESC TX descriptors */
    /*
     * Allocate TX ring hardware descriptors. A memzone large enough to
     * handle the maximum ring size is allocated in order to allow for
     * resizing in later calls to the queue setup function.
     */
    tsize = sizeof (txq->tx_ring[0]) * EM_MAX_RING_DESC;
    if ((tz = ring_dma_zone_reserve(dev, "tx_ring", queue_idx, tsize,
            socket_id)) == NULL)
        return (-ENOMEM);

    /* Allocate the TX queue control block */
    /* Allocate the tx queue data structure. */
    if ((txq = rte_zmalloc("ethdev TX queue", sizeof(*txq),
            CACHE_LINE_SIZE)) == NULL)
        return (-ENOMEM);

    /* Allocate the TX software ring */
    /* Allocate software ring */
    if ((txq->sw_ring = rte_zmalloc("txq->sw_ring",
            sizeof(txq->sw_ring[0]) * nb_desc,
            CACHE_LINE_SIZE)) == NULL) {
        em_tx_queue_release(txq);
        return (-ENOMEM);
    }

    txq->nb_tx_desc = nb_desc;
    txq->tx_free_thresh = tx_free_thresh;
    txq->tx_rs_thresh = tx_rs_thresh;
    txq->pthresh = tx_conf->tx_thresh.pthresh;
    txq->hthresh = tx_conf->tx_thresh.hthresh;
    txq->wthresh = tx_conf->tx_thresh.wthresh;
    txq->queue_id = queue_idx;
    txq->port_id = dev->data->port_id;

    txq->tdt_reg_addr = E1000_PCI_REG_ADDR(hw, E1000_TDT(queue_idx));

    /* Physical address of the TX ring */
#ifndef RTE_LIBRTE_XEN_DOM0
    txq->tx_ring_phys_addr = (uint64_t) tz->phys_addr;
#else
    txq->tx_ring_phys_addr = rte_mem_phy2mch(tz->memseg_id, tz->phys_addr);
#endif
    /* Virtual address of the TX ring */
    txq->tx_ring = (struct e1000_data_desc *) tz->addr;

    PMD_INIT_LOG(DEBUG, "sw_ring=%p hw_ring=%p dma_addr=0x%"PRIx64"\n",
        txq->sw_ring, txq->tx_ring, txq->tx_ring_phys_addr);

    /* Ring initialization: each entry's next points to the following one,
     * and the last entry points back to the first */
    em_reset_tx_queue(txq);

    dev->data->tx_queues[queue_idx] = txq;
    return (0);
}
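em_reset_tx_queue is what the last comment describes: the software ring is linked into a circle, where every entry's next_id points to the following entry and the last entry points back to entry 0. A minimal standalone sketch of that linking step (the structure here is simplified; the real em_tx_entry also tracks last_id and an mbuf pointer):

#include <stdint.h>
#include <stdio.h>

/* Simplified software-ring entry. */
struct tx_entry {
    uint16_t next_id;
};

/* Link nb_desc entries into a ring: i -> i+1, last -> 0. */
static void link_tx_ring(struct tx_entry *ring, uint16_t nb_desc)
{
    for (uint16_t i = 0; i < nb_desc; i++) {
        uint16_t next = (uint16_t)(i + 1);
        if (next == nb_desc)
            next = 0;                  /* wrap the last entry back to the first */
        ring[i].next_id = next;
    }
}

int main(void)
{
    struct tx_entry ring[4];
    link_tx_ring(ring, 4);
    for (int i = 0; i < 4; i++)
        printf("entry %d -> %u\n", i, ring[i].next_id);   /* 0->1 1->2 2->3 3->0 */
    return 0;
}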

The last step of port initialization is enabling RX/TX on the port (rte_eth_dev_start). The main work is programming the addresses of the TX ring and RX ring into the E1000 hardware; the lower-level details are not traced further here.

void
eth_em_tx_init(struct rte_eth_dev *dev)
{
    struct e1000_hw *hw;
    struct em_tx_queue *txq;
    uint32_t tctl;
    uint32_t txdctl;
    uint16_t i;

    hw = E1000_DEV_PRIVATE_TO_HW(dev->data->dev_private);

    /* Program the physical address of every queue's TX ring into the E1000 */
    /* Setup the Base and Length of the Tx Descriptor Rings. */
    for (i = 0; i < dev->data->nb_tx_queues; i++) {
        uint64_t bus_addr;

        txq = dev->data->tx_queues[i];
        bus_addr = txq->tx_ring_phys_addr;
        E1000_WRITE_REG(hw, E1000_TDLEN(i),
                txq->nb_tx_desc *
                sizeof(*txq->tx_ring));
        E1000_WRITE_REG(hw, E1000_TDBAH(i),
                (uint32_t)(bus_addr >> 32));
        E1000_WRITE_REG(hw, E1000_TDBAL(i), (uint32_t)bus_addr);

        /* Setup the HW Tx Head and Tail descriptor pointers. */
        E1000_WRITE_REG(hw, E1000_TDT(i), 0);
        E1000_WRITE_REG(hw, E1000_TDH(i), 0);

        /* Setup Transmit threshold registers. */
        txdctl = E1000_READ_REG(hw, E1000_TXDCTL(i));
        /*
         * bit 22 is reserved, on some models should always be 0,
         * on others  - always 1.
         */
        txdctl &= E1000_TXDCTL_COUNT_DESC;
        txdctl |= txq->pthresh & 0x3F;
        txdctl |= (txq->hthresh & 0x3F) << 8;
        txdctl |= (txq->wthresh & 0x3F) << 16;
        txdctl |= E1000_TXDCTL_GRAN;
        E1000_WRITE_REG(hw, E1000_TXDCTL(i), txdctl);
    }

    /* Program the Transmit Control Register. */
    tctl = E1000_READ_REG(hw, E1000_TCTL);
    tctl &= ~E1000_TCTL_CT;
    tctl |= (E1000_TCTL_PSP | E1000_TCTL_RTLC | E1000_TCTL_EN |
         (E1000_COLLISION_THRESHOLD << E1000_CT_SHIFT));

    /* This write will effectively turn on the transmit unit. */
    E1000_WRITE_REG(hw, E1000_TCTL, tctl);
}

int
eth_em_rx_init(struct rte_eth_dev *dev)
{
    struct e1000_hw *hw;
    struct em_rx_queue *rxq;
    uint32_t rctl;
    uint32_t rfctl;
    uint32_t rxcsum;
    uint32_t rctl_bsize;
    uint16_t i;
    int ret;

    hw = E1000_DEV_PRIVATE_TO_HW(dev->data->dev_private);

    /*
     * Make sure receives are disabled while setting
     * up the descriptor ring.
     */
    rctl = E1000_READ_REG(hw, E1000_RCTL);
    E1000_WRITE_REG(hw, E1000_RCTL, rctl & ~E1000_RCTL_EN);

    rfctl = E1000_READ_REG(hw, E1000_RFCTL);

    /* Disable extended descriptor type. */
    rfctl &= ~E1000_RFCTL_EXTEN;
    /* Disable accelerated acknowledge */
    if (hw->mac.type == e1000_82574)
        rfctl |= E1000_RFCTL_ACK_DIS;

    E1000_WRITE_REG(hw, E1000_RFCTL, rfctl);

    /*
     * XXX TEMPORARY WORKAROUND: on some systems with 82573
     * long latencies are observed, like Lenovo X60. This
     * change eliminates the problem, but since having positive
     * values in RDTR is a known source of problems on other
     * platforms another solution is being sought.
     */
    if (hw->mac.type == e1000_82573)
        E1000_WRITE_REG(hw, E1000_RDTR, 0x20);

    dev->rx_pkt_burst = (eth_rx_burst_t)eth_em_recv_pkts;

    /* Compute the packet buffer size */
    /* Determine RX bufsize. */
    rctl_bsize = EM_MAX_BUF_SIZE;
    for (i = 0; i < dev->data->nb_rx_queues; i++) {
        struct rte_pktmbuf_pool_private *mbp_priv;
        uint32_t buf_size;

        rxq = dev->data->rx_queues[i];
        mbp_priv = rte_mempool_get_priv(rxq->mb_pool);
        buf_size = mbp_priv->mbuf_data_room_size - RTE_PKTMBUF_HEADROOM;
        rctl_bsize = RTE_MIN(rctl_bsize, buf_size);
    }

    rctl |= em_rctl_bsize(hw->mac.type, &rctl_bsize);

    /* Configure and enable each RX queue. */
    for (i = 0; i < dev->data->nb_rx_queues; i++) {
        uint64_t bus_addr;
        uint32_t rxdctl;

        rxq = dev->data->rx_queues[i];

        /* Allocate mbufs from the pool, fill rxq->sw_ring, and record each
         * packet buffer's physical address in rxq->rx_ring */
        /* Allocate buffers for descriptor rings and setup queue */
        ret = em_alloc_rx_queue_mbufs(rxq);
        if (ret)
            return ret;

        /* Program the physical address of the RX ring into the E1000 */
        /*
         * Reset crc_len in case it was changed after queue setup by a
         *  call to configure
         */
        rxq->crc_len =
            (uint8_t)(dev->data->dev_conf.rxmode.hw_strip_crc ?
                            0 : ETHER_CRC_LEN);

        bus_addr = rxq->rx_ring_phys_addr;
        E1000_WRITE_REG(hw, E1000_RDLEN(i),
                rxq->nb_rx_desc *
                sizeof(*rxq->rx_ring));
        E1000_WRITE_REG(hw, E1000_RDBAH(i),
                (uint32_t)(bus_addr >> 32));
        E1000_WRITE_REG(hw, E1000_RDBAL(i), (uint32_t)bus_addr);

        E1000_WRITE_REG(hw, E1000_RDH(i), 0);
        E1000_WRITE_REG(hw, E1000_RDT(i), rxq->nb_rx_desc - 1);

        rxdctl = E1000_READ_REG(hw, E1000_RXDCTL(0));
        rxdctl &= 0xFE000000;
        rxdctl |= rxq->pthresh & 0x3F;
        rxdctl |= (rxq->hthresh & 0x3F) << 8;
        rxdctl |= (rxq->wthresh & 0x3F) << 16;
        rxdctl |= E1000_RXDCTL_GRAN;
        E1000_WRITE_REG(hw, E1000_RXDCTL(i), rxdctl);

        /* Receive function used for large (scattered) packets */
        /*
         * Due to EM devices not having any sort of hardware
         * limit for packet length, jumbo frame of any size
         * can be accepted, thus we have to enable scattered
         * rx if jumbo frames are enabled (or if buffer size
         * is too small to accommodate non-jumbo packets)
         * to avoid splitting packets that don't fit into
         * one buffer.
         */
        if (dev->data->dev_conf.rxmode.jumbo_frame ||
                rctl_bsize < ETHER_MAX_LEN) {
            dev->rx_pkt_burst =
                (eth_rx_burst_t)eth_em_recv_scattered_pkts;
            dev->data->scattered_rx = 1;
        }
    }

    /* The rest is omitted */
    ...

    return 0;
}
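em_alloc_rx_queue_mbufs, called above, is what populates the descriptor ring before RX is enabled: for every descriptor it allocates an mbuf from rxq->mb_pool, keeps the mbuf pointer in sw_ring, and writes the physical (DMA) address of the mbuf's data buffer into the hardware descriptor. A hedged sketch of that loop is shown below; it is close to, but not verbatim, the em PMD code, and RTE_MBUF_DATA_DMA_ADDR_DEFAULT is assumed to be the driver-internal helper macro of that era:

/* Hedged sketch of em_alloc_rx_queue_mbufs (not the verbatim driver code). */
static int
em_alloc_rx_queue_mbufs_sketch(struct em_rx_queue *rxq)
{
    unsigned i;

    for (i = 0; i < rxq->nb_rx_desc; i++) {
        struct rte_mbuf *mbuf = rte_pktmbuf_alloc(rxq->mb_pool);

        if (mbuf == NULL)
            return -ENOMEM;                 /* ring cannot be fully populated */

        /* remember the mbuf so the RX path can hand it to the application later */
        rxq->sw_ring[i].mbuf = mbuf;

        /* the hardware only sees the physical address of the data buffer */
        rxq->rx_ring[i].buffer_addr =
            rte_cpu_to_le_64(RTE_MBUF_DATA_DMA_ADDR_DEFAULT(mbuf));
    }
    return 0;
}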

At this point port initialization is complete and the ports have been started. Back in main(), the packet-processing loop is launched on every lcore:

/* launch per-lcore init on every lcore */
rte_eal_mp_remote_launch(l2fwd_launch_one_lcore, NULL, CALL_MASTER);
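l2fwd_launch_one_lcore is the function that rte_eal_mp_remote_launch runs on every enabled lcore (including the master lcore, because of CALL_MASTER). In l2fwd it is essentially a thin wrapper around the main loop, roughly:

/* Sketch of the launch wrapper: each lcore simply enters the forwarding loop. */
static int
l2fwd_launch_one_lcore(__attribute__((unused)) void *dummy)
{
    l2fwd_main_loop();
    return 0;
}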

Each lcore's main processing loop is as follows:

/* main processing loop */
static void
l2fwd_main_loop(void)
{
    struct rte_mbuf *pkts_burst[MAX_PKT_BURST];
    struct rte_mbuf *m;
    unsigned lcore_id;
    uint64_t prev_tsc, diff_tsc, cur_tsc, timer_tsc;
    unsigned i, j, portid, nb_rx;
    struct lcore_queue_conf *qconf;
    const uint64_t drain_tsc = (rte_get_tsc_hz() + US_PER_S - 1) / US_PER_S * BURST_TX_DRAIN_US;

    prev_tsc = 0;
    timer_tsc = 0;

    lcore_id = rte_lcore_id();
    qconf = &lcore_queue_conf[lcore_id];

    if (qconf->n_rx_port == 0) {
        RTE_LOG(INFO, L2FWD, "lcore %u has nothing to do\n", lcore_id);
        return;
    }

    RTE_LOG(INFO, L2FWD, "entering main loop on lcore %u\n", lcore_id);

    /* Which ports (queues) this lcore handles */
    for (i = 0; i < qconf->n_rx_port; i++) {
        portid = qconf->rx_port_list[i];
        RTE_LOG(INFO, L2FWD, " -- lcoreid=%u portid=%u\n", lcore_id,
            portid);
    }

    while (1) {

        cur_tsc = rte_rdtsc();

        /*
         * TX burst queue drain
         */
        diff_tsc = cur_tsc - prev_tsc;
        /* Only periodically: flush all pending TX packets and print statistics */
        if (unlikely(diff_tsc > drain_tsc)) {

            for (portid = 0; portid < RTE_MAX_ETHPORTS; portid++) {
                /* Nothing pending on this port */
                if (qconf->tx_mbufs[portid].len == 0)
                    continue;
                /* Call the device TX function and count the transmitted packets */
                l2fwd_send_burst(&lcore_queue_conf[lcore_id],
                         qconf->tx_mbufs[portid].len,
                         (uint8_t) portid);
                /* All pending packets for this port have been sent, so reset len */
                qconf->tx_mbufs[portid].len = 0;
            }

            /* if timer is enabled */
            if (timer_period > 0) {

                /* advance the timer */
                timer_tsc += diff_tsc;

                /* if timer has reached its timeout */
                if (unlikely(timer_tsc >= (uint64_t) timer_period)) {

                    /* do this only on master core */
                    if (lcore_id == rte_get_master_lcore()) {
                        print_stats();
                        /* reset the timer */
                        timer_tsc = 0;
                    }
                }
            }

            prev_tsc = cur_tsc;
        }

        /* Queues this lcore has to poll */
        /*
         * Read packet from RX queues
         */
        for (i = 0; i < qconf->n_rx_port; i++) {

            portid = qconf->rx_port_list[i];
            /* This port only has queue 0 */
            nb_rx = rte_eth_rx_burst((uint8_t) portid, 0,
                         pkts_burst, MAX_PKT_BURST);

            /* Update RX statistics */
            port_statistics[portid].rx += nb_rx;

            /* Rewrite the destination MAC of every received packet and append
             * it to the TX queue of the paired port */
            for (j = 0; j < nb_rx; j++) {
                m = pkts_burst[j];
                /* Prefetch the packet data into cache (the RX path seems to
                 * have prefetched it already) */
                rte_prefetch0(rte_pktmbuf_mtod(m, void *));
                /* forward */
                l2fwd_simple_forward(m, portid);
            }
        }
    }
}
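l2fwd_simple_forward, called for every received packet, looks up the paired destination port, rewrites the Ethernet destination MAC to 02:00:00:00:00:&lt;dst_port&gt; and the source MAC to the destination port's own address, then buffers the packet; buffered packets are sent either when MAX_PKT_BURST of them accumulate or on the periodic drain shown above. A sketch of the forwarding helper (close to the example code, but reproduced from memory rather than copied verbatim):

/* Approximate sketch of l2fwd's forwarding helper. */
static void
l2fwd_simple_forward(struct rte_mbuf *m, unsigned portid)
{
    struct ether_hdr *eth;
    void *tmp;
    unsigned dst_port = l2fwd_dst_ports[portid];   /* paired destination port */

    eth = rte_pktmbuf_mtod(m, struct ether_hdr *);

    /* dst MAC: 02:00:00:00:00:xx where xx is the destination port id */
    tmp = &eth->d_addr.addr_bytes[0];
    *((uint64_t *)tmp) = 0x000000000002 + ((uint64_t)dst_port << 40);

    /* src MAC: the destination port's own MAC address */
    ether_addr_copy(&l2fwd_ports_eth_addr[dst_port], &eth->s_addr);

    /* queue the packet; it is flushed by l2fwd_send_burst when the per-port
     * buffer fills up or on the periodic TX drain */
    l2fwd_send_packet(m, (uint8_t) dst_port);
}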

First, let's look at how packets are received. rte_eth_rx_burst simply calls the device's rx_pkt_burst:

static inline uint16_t
rte_eth_rx_burst(uint8_t port_id, uint16_t queue_id,
         struct rte_mbuf **rx_pkts, uint16_t nb_pkts)
{
    struct rte_eth_dev *dev;

    dev = &rte_eth_devices[port_id];
    return (*dev->rx_pkt_burst)(dev->data->rx_queues[queue_id], rx_pkts, nb_pkts);
}

The PMD receive function (eth_em_recv_pkts for the E1000 PMD) is as follows:

uint16_t
eth_em_recv_pkts(void *rx_queue, struct rte_mbuf **rx_pkts,
        uint16_t nb_pkts)
{
    /* volatile prevents compiler optimization: every access reloads from
     * memory instead of reusing a cached register value */
    volatile struct e1000_rx_desc *rx_ring;
    volatile struct e1000_rx_desc *rxdp;
    struct em_rx_queue *rxq;
    struct em_rx_entry *sw_ring;
    struct em_rx_entry *rxe;
    struct rte_mbuf *rxm;
    struct rte_mbuf *nmb;
    struct e1000_rx_desc rxd;
    uint64_t dma_addr;
    uint16_t pkt_len;
    uint16_t rx_id;
    uint16_t nb_rx;
    uint16_t nb_hold;
    uint8_t status;

    rxq = rx_queue;

    nb_rx = 0;
    nb_hold = 0;
    rx_id = rxq->rx_tail;       /* current RX position */
    rx_ring = rxq->rx_ring;     /* RX descriptors */
    sw_ring = rxq->sw_ring;     /* mbufs */
    /* receive up to nb_pkts (32 in l2fwd) packets in one burst */
    while (nb_rx < nb_pkts) {
        /*
         * The order of operations here is important as the DD status
         * bit must not be read after any other descriptor fields.
         * rx_ring and rxdp are pointing to volatile data so the order
         * of accesses cannot be reordered by the compiler. If they were
         * not volatile, they could be reordered which could lead to
         * using invalid descriptor fields when read from rxd.
         */
        /* descriptor of the current packet */
        rxdp = &rx_ring[rx_id];
        /* done (DD) flag; must be read first */
        status = rxdp->status;