Commit 3052891a authored by Lyle Zhu's avatar Lyle Zhu Committed by Benjamin Cabé
Browse files

tests: bluetooth: classic: Add test cases for ATTR ID list feature



Add two optional arguments to shell commands `sdp_client sa_discovery`
and `sdp_client ssa_discovery`.

The first argument is the beginning of the attribute ID.
The second argument is the ending of the attribute ID. If the second
argument is omitted, the ending of the attribute ID is 0xffff.

Add test case `test_sdp_ssa_discover_multiple_records_with_range` to
test the ssa request with attribute ID list.

Add test case `test_sdp_sa_discover_multiple_records_with_range` to
test the sa request with attribute ID list.

Signed-off-by: default avatarLyle Zhu <lyle.zhu@nxp.com>
parent 22346b47
Loading
Loading
Loading
Loading
+146 −0
Original line number Diff line number Diff line
@@ -542,6 +542,73 @@ async def sdp_ssa_discover_multiple_records(hci_port, shell, dut, address) -> No
            assert found is True


async def sdp_ssa_discover_multiple_records_with_range(hci_port, shell, dut, address) -> None:
    logger.info('<<< SDP Discovery ...')
    async with await open_transport_or_link(hci_port) as hci_transport:
        device = Device.with_hci(
            'Bumble',
            Address('F0:F1:F2:F3:F4:F5'),
            hci_transport.source,
            hci_transport.sink,
        )
        device.classic_enabled = True
        device.le_enabled = False
        device.sdp_service_records = SDP_SERVICE_MULTIPLE_RECORDS
        with open(f"bumble_hci_{sys._getframe().f_code.co_name}.log", "wb") as snoop_file:
            device.host.snooper = BtSnooper(snoop_file)
            await device_power_on(device)
            await device.send_command(HCI_Write_Page_Timeout_Command(page_timeout=0xFFFF))

            target_address = address.split(" ")[0]
            logger.info(f'=== Connecting to {target_address}...')
            try:
                connection = await device.connect(target_address, transport=BT_BR_EDR_TRANSPORT)
                logger.info(f'=== Connected to {connection.peer_address}!')
            except Exception as e:
                logger.error(f'Fail to connect to {target_address}!')
                raise e

            # Discover SDP Record with range SDP_SERVICE_RECORD_HANDLE_ATTRIBUTE_ID ~
            # SDP_PROTOCOL_DESCRIPTOR_LIST_ATTRIBUTE_ID
            shell.exec_command(
                f"sdp_client ssa_discovery {BT_L2CAP_PROTOCOL_ID.to_hex_str()} "
                f"{SDP_SERVICE_RECORD_HANDLE_ATTRIBUTE_ID} "
                f"{SDP_PROTOCOL_DESCRIPTOR_LIST_ATTRIBUTE_ID}"
            )
            found, lines = await wait_for_shell_response(dut, "SDP Discovery Done")
            logger.info(f'{lines}')
            assert found is True

            # Discover SDP Record with range SDP_SUPPORTED_FEATURES_ATTRIBUTE_ID ~
            # SDP_SUPPORTED_FEATURES_ATTRIBUTE_ID
            shell.exec_command(
                f"sdp_client ssa_discovery {BT_L2CAP_PROTOCOL_ID.to_hex_str()} "
                f"{SDP_SUPPORTED_FEATURES_ATTRIBUTE_ID} "
                f"{SDP_SUPPORTED_FEATURES_ATTRIBUTE_ID}"
            )
            found, lines = await wait_for_shell_response(dut, "SDP Discovery Done")
            logger.info(f'{lines}')
            assert found is True

            # Discover SDP Record with range SDP_PROTOCOL_DESCRIPTOR_LIST_ATTRIBUTE_ID ~
            # 0xffff
            shell.exec_command(
                f"sdp_client ssa_discovery {BT_L2CAP_PROTOCOL_ID.to_hex_str()} "
                f"{SDP_PROTOCOL_DESCRIPTOR_LIST_ATTRIBUTE_ID} 0xffff"
            )
            found, lines = await wait_for_shell_response(dut, "SDP Discovery Done")
            logger.info(f'{lines}')
            assert found is True

            # Discover SDP Record with range 0xff00 ~ 0xffff
            shell.exec_command(
                f"sdp_client ssa_discovery {BT_L2CAP_PROTOCOL_ID.to_hex_str()} 0xff00 0xffff"
            )
            found, lines = await wait_for_shell_response(dut, "No SDP Record")
            logger.info(f'{lines}')
            assert found is True


async def sdp_ss_discover_no_record(hci_port, shell, dut, address) -> None:
    logger.info('<<< SDP Discovery ...')
    async with await open_transport_or_link(hci_port) as hci_transport:
@@ -889,6 +956,69 @@ async def sdp_sa_discover_multiple_records(hci_port, shell, dut, address) -> Non
            assert found is True


async def sdp_sa_discover_multiple_records_with_range(hci_port, shell, dut, address) -> None:
    logger.info('<<< SDP Discovery ...')
    async with await open_transport_or_link(hci_port) as hci_transport:
        device = Device.with_hci(
            'Bumble',
            Address('F0:F1:F2:F3:F4:F5'),
            hci_transport.source,
            hci_transport.sink,
        )
        device.classic_enabled = True
        device.le_enabled = False
        device.sdp_service_records = SDP_SERVICE_MULTIPLE_RECORDS
        with open(f"bumble_hci_{sys._getframe().f_code.co_name}.log", "wb") as snoop_file:
            device.host.snooper = BtSnooper(snoop_file)
            await device_power_on(device)
            await device.send_command(HCI_Write_Page_Timeout_Command(page_timeout=0xFFFF))

            target_address = address.split(" ")[0]
            logger.info(f'=== Connecting to {target_address}...')
            try:
                connection = await device.connect(target_address, transport=BT_BR_EDR_TRANSPORT)
                logger.info(f'=== Connected to {connection.peer_address}!')
            except Exception as e:
                logger.error(f'Fail to connect to {target_address}!')
                raise e

            # Discover SDP Record with range SDP_SERVICE_RECORD_HANDLE_ATTRIBUTE_ID ~
            # SDP_PROTOCOL_DESCRIPTOR_LIST_ATTRIBUTE_ID
            shell.exec_command(
                f"sdp_client sa_discovery 00010003 {SDP_SERVICE_RECORD_HANDLE_ATTRIBUTE_ID} "
                f"{SDP_PROTOCOL_DESCRIPTOR_LIST_ATTRIBUTE_ID}"
            )
            found, lines = await wait_for_shell_response(dut, "SDP Discovery Done")
            logger.info(f'{lines}')
            assert found is True

            # Discover SDP Record with range SDP_SUPPORTED_FEATURES_ATTRIBUTE_ID ~
            # SDP_SUPPORTED_FEATURES_ATTRIBUTE_ID
            shell.exec_command(
                f"sdp_client sa_discovery 00010003 {SDP_SUPPORTED_FEATURES_ATTRIBUTE_ID} "
                f"{SDP_SUPPORTED_FEATURES_ATTRIBUTE_ID}"
            )
            found, lines = await wait_for_shell_response(dut, "SDP Discovery Done")
            logger.info(f'{lines}')
            assert found is True

            # Discover SDP Record with range SDP_PROTOCOL_DESCRIPTOR_LIST_ATTRIBUTE_ID ~
            # 0xffff
            shell.exec_command(
                "sdp_client sa_discovery 00010003 "
                f"{SDP_PROTOCOL_DESCRIPTOR_LIST_ATTRIBUTE_ID} 0xffff"
            )
            found, lines = await wait_for_shell_response(dut, "SDP Discovery Done")
            logger.info(f'{lines}')
            assert found is True

            # Discover SDP Record with range 0xff00 ~ 0xffff
            shell.exec_command("sdp_client sa_discovery 00010003 0xff00 0xffff")
            found, lines = await wait_for_shell_response(dut, "No SDP Record")
            logger.info(f'{lines}')
            assert found is True


async def sdp_ssa_discover_fail(hci_port, shell, dut, address) -> None:
    def on_app_connection_request(self, request) -> None:
        logger.info('Force L2CAP connection failure')
@@ -969,6 +1099,14 @@ class TestSdpServer:
        hci, iut_address = sdp_client_dut
        asyncio.run(sdp_ssa_discover_multiple_records(hci, shell, dut, iut_address))

    def test_sdp_ssa_discover_multiple_records_with_range(
        self, shell: Shell, dut: DeviceAdapter, sdp_client_dut
    ):
        """Test case to request SDP records with range. Multiple SDP record registered."""
        logger.info(f'test_sdp_ssa_discover_multiple_records_with_range {sdp_client_dut}')
        hci, iut_address = sdp_client_dut
        asyncio.run(sdp_ssa_discover_multiple_records_with_range(hci, shell, dut, iut_address))

    def test_sdp_ss_discover_no_record(self, shell: Shell, dut: DeviceAdapter, sdp_client_dut):
        """Test case to request SDP records. No SDP record registered."""
        logger.info(f'test_sdp_ss_discover_no_record {sdp_client_dut}')
@@ -1021,6 +1159,14 @@ class TestSdpServer:
        hci, iut_address = sdp_client_dut
        asyncio.run(sdp_sa_discover_multiple_records(hci, shell, dut, iut_address))

    def test_sdp_sa_discover_multiple_records_with_range(
        self, shell: Shell, dut: DeviceAdapter, sdp_client_dut
    ):
        """Test case to request SDP records with range. Multiple SDP record registered."""
        logger.info(f'test_sdp_sa_discover_multiple_records_with_range {sdp_client_dut}')
        hci, iut_address = sdp_client_dut
        asyncio.run(sdp_sa_discover_multiple_records_with_range(hci, shell, dut, iut_address))

    def test_sdp_ssa_discover_fail(self, shell: Shell, dut: DeviceAdapter, sdp_client_dut):
        """Test case to request SDP records. but the L2CAP connecting fail."""
        logger.info(f'test_sdp_ssa_discover_fail {sdp_client_dut}')
+59 −4
Original line number Diff line number Diff line
@@ -166,9 +166,12 @@ static uint8_t sdp_discover_func(struct bt_conn *conn, struct bt_sdp_client_resu
	return BT_SDP_DISCOVER_UUID_CONTINUE;
}

static struct bt_sdp_attribute_id_list attr_ids;
static struct bt_sdp_attribute_id_range attr_id_ranges[1];

static int cmd_ssa_discovery(const struct shell *sh, size_t argc, char *argv[])
{
	int err;
	int err = 0;
	size_t len;
	uint8_t uuid128[BT_UUID_SIZE_128];

@@ -198,9 +201,33 @@ static int cmd_ssa_discovery(const struct shell *sh, size_t argc, char *argv[])
		return -ENOEXEC;
	}

	attr_ids.count = 0;

	if (argc > 2) {
		attr_ids.count = ARRAY_SIZE(attr_id_ranges);
		attr_ids.ranges = attr_id_ranges;
		attr_id_ranges[0].beginning = (uint16_t)shell_strtol(argv[2], 0, &err);
		if (err < 0) {
			shell_error(sh, "Invalid beginning ATTR ID");
			return -ENOEXEC;
		}
		attr_id_ranges[0].ending = 0xffff;
	}

	if (argc > 3) {
		attr_id_ranges[0].ending = (uint16_t)shell_strtol(argv[3], 0, &err);
		if (err < 0) {
			shell_error(sh, "Invalid ending ATTR ID");
			return -ENOEXEC;
		}
	}

	sdp_discover.func = sdp_discover_func;
	sdp_discover.pool = &sdp_client_pool;
	sdp_discover.type = BT_SDP_DISCOVER_SERVICE_SEARCH_ATTR;
	if (attr_ids.count != 0) {
		sdp_discover.ids = &attr_ids;
	}

	err = bt_sdp_discover(default_conn, &sdp_discover);
	if (err) {
@@ -256,7 +283,7 @@ static int cmd_ss_discovery(const struct shell *sh, size_t argc, char *argv[])

static int cmd_sa_discovery(const struct shell *sh, size_t argc, char *argv[])
{
	int err;
	int err = 0;
	size_t len;
	uint32_t handle;

@@ -270,9 +297,33 @@ static int cmd_sa_discovery(const struct shell *sh, size_t argc, char *argv[])
		return -ENOEXEC;
	}

	attr_ids.count = 0;

	if (argc > 2) {
		attr_ids.count = ARRAY_SIZE(attr_id_ranges);
		attr_ids.ranges = attr_id_ranges;
		attr_id_ranges[0].beginning = (uint16_t)shell_strtol(argv[2], 0, &err);
		if (err < 0) {
			shell_error(sh, "Invalid beginning ATTR ID");
			return -ENOEXEC;
		}
		attr_id_ranges[0].ending = 0xffff;
	}

	if (argc > 3) {
		attr_id_ranges[0].ending = (uint16_t)shell_strtol(argv[3], 0, &err);
		if (err < 0) {
			shell_error(sh, "Invalid ending ATTR ID");
			return -ENOEXEC;
		}
	}

	sdp_discover.func = sdp_discover_func;
	sdp_discover.pool = &sdp_client_pool;
	sdp_discover.type = BT_SDP_DISCOVER_SERVICE_ATTR;
	if (attr_ids.count != 0) {
		sdp_discover.ids = &attr_ids;
	}

	err = bt_sdp_discover(default_conn, &sdp_discover);
	if (err) {
@@ -311,10 +362,14 @@ static int cmd_ssa_discovery_fail(const struct shell *sh, size_t argc, char *arg
	return 0;
}

#define HELP_ATTR_ID_LIST " [start] [end]"

SHELL_STATIC_SUBCMD_SET_CREATE(sdp_client_cmds,
	SHELL_CMD_ARG(ss_discovery, NULL, "<Big endian UUID>", cmd_ss_discovery, 2, 0),
	SHELL_CMD_ARG(sa_discovery, NULL, "<Service Record Handle>", cmd_sa_discovery, 2, 0),
	SHELL_CMD_ARG(ssa_discovery, NULL, "<Big endian UUID>", cmd_ssa_discovery, 2, 0),
	SHELL_CMD_ARG(sa_discovery, NULL, "<Service Record Handle>" HELP_ATTR_ID_LIST,
		      cmd_sa_discovery, 2, 2),
	SHELL_CMD_ARG(ssa_discovery, NULL, "<Big endian UUID>" HELP_ATTR_ID_LIST,
		      cmd_ssa_discovery, 2, 2),
	SHELL_CMD_ARG(ssa_discovery_fail, NULL, "", cmd_ssa_discovery_fail, 1, 0),
	SHELL_SUBCMD_SET_END
);