Commit 6ebf5866 authored by Felix Guo's avatar Felix Guo Committed by Shuah Khan
Browse files

kunit: tool: add Python wrappers for running KUnit tests



The ultimate goal is to create minimal isolated test binaries; in the
meantime we are using UML to provide the infrastructure to run tests, so
define an abstract way to configure and run tests that allow us to
change the context in which tests are built without affecting the user.
This also makes pretty and dynamic error reporting, and a lot of other
nice features easier.

kunit_config.py:
  - parse .config and Kconfig files.

kunit_kernel.py: provides helper functions to:
  - configure the kernel using kunitconfig.
  - build the kernel with the appropriate configuration.
  - provide function to invoke the kernel and stream the output back.

kunit_parser.py: parses raw logs returned out by kunit_kernel and
displays them in a user friendly way.

test_data/*: samples of test data for testing kunit.py, kunit_config.py,
etc.

Signed-off-by: default avatarFelix Guo <felixguoxiuping@gmail.com>
Signed-off-by: default avatarBrendan Higgins <brendanhiggins@google.com>
Reviewed-by: default avatarGreg Kroah-Hartman <gregkh@linuxfoundation.org>
Reviewed-by: default avatarLogan Gunthorpe <logang@deltatee.com>
Reviewed-by: default avatarStephen Boyd <sboyd@kernel.org>
Signed-off-by: default avatarShuah Khan <skhan@linuxfoundation.org>
parent 73ba5aaf
Loading
Loading
Loading
Loading
+3 −0
Original line number Diff line number Diff line
# Byte-compiled / optimized / DLL files
__pycache__/
*.py[cod]
 No newline at end of file
+116 −0
Original line number Diff line number Diff line
#!/usr/bin/python3
# SPDX-License-Identifier: GPL-2.0
#
# A thin wrapper on top of the KUnit Kernel
#
# Copyright (C) 2019, Google LLC.
# Author: Felix Guo <felixguoxiuping@gmail.com>
# Author: Brendan Higgins <brendanhiggins@google.com>

import argparse
import sys
import os
import time

from collections import namedtuple
from enum import Enum, auto

import kunit_config
import kunit_kernel
import kunit_parser

KunitResult = namedtuple('KunitResult', ['status','result'])

KunitRequest = namedtuple('KunitRequest', ['raw_output','timeout', 'jobs', 'build_dir'])

class KunitStatus(Enum):
	SUCCESS = auto()
	CONFIG_FAILURE = auto()
	BUILD_FAILURE = auto()
	TEST_FAILURE = auto()

def run_tests(linux: kunit_kernel.LinuxSourceTree,
	      request: KunitRequest) -> KunitResult:
	config_start = time.time()
	success = linux.build_reconfig(request.build_dir)
	config_end = time.time()
	if not success:
		return KunitResult(KunitStatus.CONFIG_FAILURE, 'could not configure kernel')

	kunit_parser.print_with_timestamp('Building KUnit Kernel ...')

	build_start = time.time()
	success = linux.build_um_kernel(request.jobs, request.build_dir)
	build_end = time.time()
	if not success:
		return KunitResult(KunitStatus.BUILD_FAILURE, 'could not build kernel')

	kunit_parser.print_with_timestamp('Starting KUnit Kernel ...')
	test_start = time.time()

	test_result = kunit_parser.TestResult(kunit_parser.TestStatus.SUCCESS,
					      [],
					      'Tests not Parsed.')
	if request.raw_output:
		kunit_parser.raw_output(
			linux.run_kernel(timeout=request.timeout))
	else:
		kunit_output = linux.run_kernel(timeout=request.timeout)
		test_result = kunit_parser.parse_run_tests(kunit_output)
	test_end = time.time()

	kunit_parser.print_with_timestamp((
		'Elapsed time: %.3fs total, %.3fs configuring, %.3fs ' +
		'building, %.3fs running\n') % (
				test_end - config_start,
				config_end - config_start,
				build_end - build_start,
				test_end - test_start))

	if test_result.status != kunit_parser.TestStatus.SUCCESS:
		return KunitResult(KunitStatus.TEST_FAILURE, test_result)
	else:
		return KunitResult(KunitStatus.SUCCESS, test_result)

def main(argv, linux):
	parser = argparse.ArgumentParser(
			description='Helps writing and running KUnit tests.')
	subparser = parser.add_subparsers(dest='subcommand')

	run_parser = subparser.add_parser('run', help='Runs KUnit tests.')
	run_parser.add_argument('--raw_output', help='don\'t format output from kernel',
				action='store_true')

	run_parser.add_argument('--timeout',
				help='maximum number of seconds to allow for all tests '
				'to run. This does not include time taken to build the '
				'tests.',
				type=int,
				default=300,
				metavar='timeout')

	run_parser.add_argument('--jobs',
				help='As in the make command, "Specifies  the number of '
				'jobs (commands) to run simultaneously."',
				type=int, default=8, metavar='jobs')

	run_parser.add_argument('--build_dir',
				help='As in the make command, it specifies the build '
				'directory.',
				type=str, default=None, metavar='build_dir')

	cli_args = parser.parse_args(argv)

	if cli_args.subcommand == 'run':
		request = KunitRequest(cli_args.raw_output,
				       cli_args.timeout,
				       cli_args.jobs,
				       cli_args.build_dir)
		result = run_tests(linux, request)
		if result.status != KunitStatus.SUCCESS:
			sys.exit(1)
	else:
		parser.print_help()

if __name__ == '__main__':
	main(sys.argv[1:], kunit_kernel.LinuxSourceTree())
+66 −0
Original line number Diff line number Diff line
# SPDX-License-Identifier: GPL-2.0
#
# Builds a .config from a kunitconfig.
#
# Copyright (C) 2019, Google LLC.
# Author: Felix Guo <felixguoxiuping@gmail.com>
# Author: Brendan Higgins <brendanhiggins@google.com>

import collections
import re

CONFIG_IS_NOT_SET_PATTERN = r'^# CONFIG_\w+ is not set$'
CONFIG_PATTERN = r'^CONFIG_\w+=\S+$'

KconfigEntryBase = collections.namedtuple('KconfigEntry', ['raw_entry'])


class KconfigEntry(KconfigEntryBase):

	def __str__(self) -> str:
		return self.raw_entry


class KconfigParseError(Exception):
	"""Error parsing Kconfig defconfig or .config."""


class Kconfig(object):
	"""Represents defconfig or .config specified using the Kconfig language."""

	def __init__(self):
		self._entries = []

	def entries(self):
		return set(self._entries)

	def add_entry(self, entry: KconfigEntry) -> None:
		self._entries.append(entry)

	def is_subset_of(self, other: 'Kconfig') -> bool:
		return self.entries().issubset(other.entries())

	def write_to_file(self, path: str) -> None:
		with open(path, 'w') as f:
			for entry in self.entries():
				f.write(str(entry) + '\n')

	def parse_from_string(self, blob: str) -> None:
		"""Parses a string containing KconfigEntrys and populates this Kconfig."""
		self._entries = []
		is_not_set_matcher = re.compile(CONFIG_IS_NOT_SET_PATTERN)
		config_matcher = re.compile(CONFIG_PATTERN)
		for line in blob.split('\n'):
			line = line.strip()
			if not line:
				continue
			elif config_matcher.match(line) or is_not_set_matcher.match(line):
				self._entries.append(KconfigEntry(line))
			elif line[0] == '#':
				continue
			else:
				raise KconfigParseError('Failed to parse: ' + line)

	def read_from_file(self, path: str) -> None:
		with open(path, 'r') as f:
			self.parse_from_string(f.read())
+148 −0
Original line number Diff line number Diff line
# SPDX-License-Identifier: GPL-2.0
#
# Runs UML kernel, collects output, and handles errors.
#
# Copyright (C) 2019, Google LLC.
# Author: Felix Guo <felixguoxiuping@gmail.com>
# Author: Brendan Higgins <brendanhiggins@google.com>


import logging
import subprocess
import os

import kunit_config

KCONFIG_PATH = '.config'

class ConfigError(Exception):
	"""Represents an error trying to configure the Linux kernel."""


class BuildError(Exception):
	"""Represents an error trying to build the Linux kernel."""


class LinuxSourceTreeOperations(object):
	"""An abstraction over command line operations performed on a source tree."""

	def make_mrproper(self):
		try:
			subprocess.check_output(['make', 'mrproper'])
		except OSError as e:
			raise ConfigError('Could not call make command: ' + e)
		except subprocess.CalledProcessError as e:
			raise ConfigError(e.output)

	def make_olddefconfig(self, build_dir):
		command = ['make', 'ARCH=um', 'olddefconfig']
		if build_dir:
			command += ['O=' + build_dir]
		try:
			subprocess.check_output(command)
		except OSError as e:
			raise ConfigError('Could not call make command: ' + e)
		except subprocess.CalledProcessError as e:
			raise ConfigError(e.output)

	def make(self, jobs, build_dir):
		command = ['make', 'ARCH=um', '--jobs=' + str(jobs)]
		if build_dir:
			command += ['O=' + build_dir]
		try:
			subprocess.check_output(command)
		except OSError as e:
			raise BuildError('Could not call execute make: ' + e)
		except subprocess.CalledProcessError as e:
			raise BuildError(e.output)

	def linux_bin(self, params, timeout, build_dir):
		"""Runs the Linux UML binary. Must be named 'linux'."""
		linux_bin = './linux'
		if build_dir:
			linux_bin = os.path.join(build_dir, 'linux')
		process = subprocess.Popen(
			[linux_bin] + params,
			stdin=subprocess.PIPE,
			stdout=subprocess.PIPE,
			stderr=subprocess.PIPE)
		process.wait(timeout=timeout)
		return process


def get_kconfig_path(build_dir):
	kconfig_path = KCONFIG_PATH
	if build_dir:
		kconfig_path = os.path.join(build_dir, KCONFIG_PATH)
	return kconfig_path

class LinuxSourceTree(object):
	"""Represents a Linux kernel source tree with KUnit tests."""

	def __init__(self):
		self._kconfig = kunit_config.Kconfig()
		self._kconfig.read_from_file('kunitconfig')
		self._ops = LinuxSourceTreeOperations()

	def clean(self):
		try:
			self._ops.make_mrproper()
		except ConfigError as e:
			logging.error(e)
			return False
		return True

	def build_config(self, build_dir):
		kconfig_path = get_kconfig_path(build_dir)
		if build_dir and not os.path.exists(build_dir):
			os.mkdir(build_dir)
		self._kconfig.write_to_file(kconfig_path)
		try:
			self._ops.make_olddefconfig(build_dir)
		except ConfigError as e:
			logging.error(e)
			return False
		validated_kconfig = kunit_config.Kconfig()
		validated_kconfig.read_from_file(kconfig_path)
		if not self._kconfig.is_subset_of(validated_kconfig):
			logging.error('Provided Kconfig is not contained in validated .config!')
			return False
		return True

	def build_reconfig(self, build_dir):
		"""Creates a new .config if it is not a subset of the kunitconfig."""
		kconfig_path = get_kconfig_path(build_dir)
		if os.path.exists(kconfig_path):
			existing_kconfig = kunit_config.Kconfig()
			existing_kconfig.read_from_file(kconfig_path)
			if not self._kconfig.is_subset_of(existing_kconfig):
				print('Regenerating .config ...')
				os.remove(kconfig_path)
				return self.build_config(build_dir)
			else:
				return True
		else:
			print('Generating .config ...')
			return self.build_config(build_dir)

	def build_um_kernel(self, jobs, build_dir):
		try:
			self._ops.make_olddefconfig(build_dir)
			self._ops.make(jobs, build_dir)
		except (ConfigError, BuildError) as e:
			logging.error(e)
			return False
		used_kconfig = kunit_config.Kconfig()
		used_kconfig.read_from_file(get_kconfig_path(build_dir))
		if not self._kconfig.is_subset_of(used_kconfig):
			logging.error('Provided Kconfig is not contained in final config!')
			return False
		return True

	def run_kernel(self, args=[], timeout=None, build_dir=None):
		args.extend(['mem=256M'])
		process = self._ops.linux_bin(args, timeout, build_dir)
		with open('test.log', 'w') as f:
			for line in process.stdout:
				f.write(line.rstrip().decode('ascii') + '\n')
				yield line.rstrip().decode('ascii')
+310 −0
Original line number Diff line number Diff line
# SPDX-License-Identifier: GPL-2.0
#
# Parses test results from a kernel dmesg log.
#
# Copyright (C) 2019, Google LLC.
# Author: Felix Guo <felixguoxiuping@gmail.com>
# Author: Brendan Higgins <brendanhiggins@google.com>

import re

from collections import namedtuple
from datetime import datetime
from enum import Enum, auto
from functools import reduce
from typing import List

TestResult = namedtuple('TestResult', ['status','suites','log'])

class TestSuite(object):
	def __init__(self):
		self.status = None
		self.name = None
		self.cases = []

	def __str__(self):
		return 'TestSuite(' + self.status + ',' + self.name + ',' + str(self.cases) + ')'

	def __repr__(self):
		return str(self)

class TestCase(object):
	def __init__(self):
		self.status = None
		self.name = ''
		self.log = []

	def __str__(self):
		return 'TestCase(' + self.status + ',' + self.name + ',' + str(self.log) + ')'

	def __repr__(self):
		return str(self)

class TestStatus(Enum):
	SUCCESS = auto()
	FAILURE = auto()
	TEST_CRASHED = auto()
	NO_TESTS = auto()

kunit_start_re = re.compile(r'^TAP version [0-9]+$')
kunit_end_re = re.compile('List of all partitions:')

def isolate_kunit_output(kernel_output):
	started = False
	for line in kernel_output:
		if kunit_start_re.match(line):
			started = True
			yield line
		elif kunit_end_re.match(line):
			break
		elif started:
			yield line

def raw_output(kernel_output):
	for line in kernel_output:
		print(line)

DIVIDER = '=' * 60

RESET = '\033[0;0m'

def red(text):
	return '\033[1;31m' + text + RESET

def yellow(text):
	return '\033[1;33m' + text + RESET

def green(text):
	return '\033[1;32m' + text + RESET

def print_with_timestamp(message):
	print('[%s] %s' % (datetime.now().strftime('%H:%M:%S'), message))

def format_suite_divider(message):
	return '======== ' + message + ' ========'

def print_suite_divider(message):
	print_with_timestamp(DIVIDER)
	print_with_timestamp(format_suite_divider(message))

def print_log(log):
	for m in log:
		print_with_timestamp(m)

TAP_ENTRIES = re.compile(r'^(TAP|\t?ok|\t?not ok|\t?[0-9]+\.\.[0-9]+|\t?#).*$')

def consume_non_diagnositic(lines: List[str]) -> None:
	while lines and not TAP_ENTRIES.match(lines[0]):
		lines.pop(0)

def save_non_diagnositic(lines: List[str], test_case: TestCase) -> None:
	while lines and not TAP_ENTRIES.match(lines[0]):
		test_case.log.append(lines[0])
		lines.pop(0)

OkNotOkResult = namedtuple('OkNotOkResult', ['is_ok','description', 'text'])

OK_NOT_OK_SUBTEST = re.compile(r'^\t(ok|not ok) [0-9]+ - (.*)$')

OK_NOT_OK_MODULE = re.compile(r'^(ok|not ok) [0-9]+ - (.*)$')

def parse_ok_not_ok_test_case(lines: List[str],
			      test_case: TestCase,
			      expecting_test_case: bool) -> bool:
	save_non_diagnositic(lines, test_case)
	if not lines:
		if expecting_test_case:
			test_case.status = TestStatus.TEST_CRASHED
			return True
		else:
			return False
	line = lines[0]
	match = OK_NOT_OK_SUBTEST.match(line)
	if match:
		test_case.log.append(lines.pop(0))
		test_case.name = match.group(2)
		if test_case.status == TestStatus.TEST_CRASHED:
			return True
		if match.group(1) == 'ok':
			test_case.status = TestStatus.SUCCESS
		else:
			test_case.status = TestStatus.FAILURE
		return True
	else:
		return False

SUBTEST_DIAGNOSTIC = re.compile(r'^\t# .*?: (.*)$')
DIAGNOSTIC_CRASH_MESSAGE = 'kunit test case crashed!'

def parse_diagnostic(lines: List[str], test_case: TestCase) -> bool:
	save_non_diagnositic(lines, test_case)
	if not lines:
		return False
	line = lines[0]
	match = SUBTEST_DIAGNOSTIC.match(line)
	if match:
		test_case.log.append(lines.pop(0))
		if match.group(1) == DIAGNOSTIC_CRASH_MESSAGE:
			test_case.status = TestStatus.TEST_CRASHED
		return True
	else:
		return False

def parse_test_case(lines: List[str], expecting_test_case: bool) -> TestCase:
	test_case = TestCase()
	save_non_diagnositic(lines, test_case)
	while parse_diagnostic(lines, test_case):
		pass
	if parse_ok_not_ok_test_case(lines, test_case, expecting_test_case):
		return test_case
	else:
		return None

SUBTEST_HEADER = re.compile(r'^\t# Subtest: (.*)$')

def parse_subtest_header(lines: List[str]) -> str:
	consume_non_diagnositic(lines)
	if not lines:
		return None
	match = SUBTEST_HEADER.match(lines[0])
	if match:
		lines.pop(0)
		return match.group(1)
	else:
		return None

SUBTEST_PLAN = re.compile(r'\t[0-9]+\.\.([0-9]+)')

def parse_subtest_plan(lines: List[str]) -> int:
	consume_non_diagnositic(lines)
	match = SUBTEST_PLAN.match(lines[0])
	if match:
		lines.pop(0)
		return int(match.group(1))
	else:
		return None

def max_status(left: TestStatus, right: TestStatus) -> TestStatus:
	if left == TestStatus.TEST_CRASHED or right == TestStatus.TEST_CRASHED:
		return TestStatus.TEST_CRASHED
	elif left == TestStatus.FAILURE or right == TestStatus.FAILURE:
		return TestStatus.FAILURE
	elif left != TestStatus.SUCCESS:
		return left
	elif right != TestStatus.SUCCESS:
		return right
	else:
		return TestStatus.SUCCESS

def parse_ok_not_ok_test_suite(lines: List[str], test_suite: TestSuite) -> bool:
	consume_non_diagnositic(lines)
	if not lines:
		test_suite.status = TestStatus.TEST_CRASHED
		return False
	line = lines[0]
	match = OK_NOT_OK_MODULE.match(line)
	if match:
		lines.pop(0)
		if match.group(1) == 'ok':
			test_suite.status = TestStatus.SUCCESS
		else:
			test_suite.status = TestStatus.FAILURE
		return True
	else:
		return False

def bubble_up_errors(to_status, status_container_list) -> TestStatus:
	status_list = map(to_status, status_container_list)
	return reduce(max_status, status_list, TestStatus.SUCCESS)

def bubble_up_test_case_errors(test_suite: TestSuite) -> TestStatus:
	max_test_case_status = bubble_up_errors(lambda x: x.status, test_suite.cases)
	return max_status(max_test_case_status, test_suite.status)

def parse_test_suite(lines: List[str]) -> TestSuite:
	if not lines:
		return None
	consume_non_diagnositic(lines)
	test_suite = TestSuite()
	test_suite.status = TestStatus.SUCCESS
	name = parse_subtest_header(lines)
	if not name:
		return None
	test_suite.name = name
	expected_test_case_num = parse_subtest_plan(lines)
	if not expected_test_case_num:
		return None
	test_case = parse_test_case(lines, expected_test_case_num > 0)
	expected_test_case_num -= 1
	while test_case:
		test_suite.cases.append(test_case)
		test_case = parse_test_case(lines, expected_test_case_num > 0)
		expected_test_case_num -= 1
	if parse_ok_not_ok_test_suite(lines, test_suite):
		test_suite.status = bubble_up_test_case_errors(test_suite)
		return test_suite
	elif not lines:
		print_with_timestamp(red('[ERROR] ') + 'ran out of lines before end token')
		return test_suite
	else:
		print('failed to parse end of suite' + lines[0])
		return None

TAP_HEADER = re.compile(r'^TAP version 14$')

def parse_tap_header(lines: List[str]) -> bool:
	consume_non_diagnositic(lines)
	if TAP_HEADER.match(lines[0]):
		lines.pop(0)
		return True
	else:
		return False

def bubble_up_suite_errors(test_suite_list: List[TestSuite]) -> TestStatus:
	return bubble_up_errors(lambda x: x.status, test_suite_list)

def parse_test_result(lines: List[str]) -> TestResult:
	if not lines:
		return TestResult(TestStatus.NO_TESTS, [], lines)
	consume_non_diagnositic(lines)
	if not parse_tap_header(lines):
		return None
	test_suites = []
	test_suite = parse_test_suite(lines)
	while test_suite:
		test_suites.append(test_suite)
		test_suite = parse_test_suite(lines)
	return TestResult(bubble_up_suite_errors(test_suites), test_suites, lines)

def parse_run_tests(kernel_output) -> TestResult:
	total_tests = 0
	failed_tests = 0
	crashed_tests = 0
	test_result = parse_test_result(list(isolate_kunit_output(kernel_output)))
	for test_suite in test_result.suites:
		if test_suite.status == TestStatus.SUCCESS:
			print_suite_divider(green('[PASSED] ') + test_suite.name)
		elif test_suite.status == TestStatus.TEST_CRASHED:
			print_suite_divider(red('[CRASHED] ' + test_suite.name))
		else:
			print_suite_divider(red('[FAILED] ') + test_suite.name)
		for test_case in test_suite.cases:
			total_tests += 1
			if test_case.status == TestStatus.SUCCESS:
				print_with_timestamp(green('[PASSED] ') + test_case.name)
			elif test_case.status == TestStatus.TEST_CRASHED:
				crashed_tests += 1
				print_with_timestamp(red('[CRASHED] ' + test_case.name))
				print_log(map(yellow, test_case.log))
				print_with_timestamp('')
			else:
				failed_tests += 1
				print_with_timestamp(red('[FAILED] ') + test_case.name)
				print_log(map(yellow, test_case.log))
				print_with_timestamp('')
	print_with_timestamp(DIVIDER)
	fmt = green if test_result.status == TestStatus.SUCCESS else red
	print_with_timestamp(
		fmt('Testing complete. %d tests run. %d failed. %d crashed.' %
		    (total_tests, failed_tests, crashed_tests)))
	return test_result
Loading