pluto/test/rt-test.py

103 lines
4.4 KiB
Python
Raw Normal View History

2019-10-06 21:52:43 +02:00
import atexit
import queue
import threading
2019-06-30 00:50:44 +02:00
import subprocess
import signal
import re
import sys
import datetime
import os
import importlib.util
2019-10-06 21:52:43 +02:00
# Lines read from the child's stdout are handed from the reader thread to the
# main loop through this queue. A non-positive maxsize makes it unbounded.
msg_queue = queue.Queue(maxsize=-1)

# Handle of the spawned child process; stays None until the script starts it.
proc = None
2019-06-30 00:50:44 +02:00
class TestCase:
    """A named, ordered group of output lines the run is expected to produce.

    Attributes:
        name: Label shown in PASS/FAILURE messages.
        expected: List of regex fragments, one per expected output line.
        prefix: Regex fragment prepended to each entry of ``expected``
            before matching; defaults to the escaped INFO log tag.
    """

    def __init__(self, name, expected, prefix=r"\[INFO\] "):
        self.name, self.expected, self.prefix = name, expected, prefix
2019-06-30 00:50:44 +02:00
2019-10-06 21:52:43 +02:00
def failure(msg):
    """Print ``msg`` as a FAILURE line on stdout and exit with status 1."""
    report = "FAILURE: %s" % (msg)
    print(report)
    sys.exit(1)
2019-10-06 21:52:43 +02:00
def test_failure(case, exp, expected_idx, found):
    """Abort the run, reporting which expectation of ``case`` was not met.

    ``expected_idx`` is zero-based; the message shows it one-based.
    """
    ordinal = expected_idx + 1
    details = (case.name, ordinal, exp, found)
    failure("%s #%d, expected '%s', found '%s'" % details)
2019-06-30 00:50:44 +02:00
def test_pass(case, exp, expected_idx, found):
    """Print a PASS line for one satisfied expectation of ``case``.

    ``expected_idx`` is zero-based; the message shows it one-based.
    """
    ordinal = expected_idx + 1
    details = (case.name, ordinal, exp, found)
    print("PASS: %s #%d, expected '%s', found '%s'" % details)
def get_pre_archinit_cases():
    """Return the cases whose output is expected before arch init is called.

    The order of the list, and of the patterns inside each case, is the order
    in which the lines are expected to appear.
    """
    # Prefixes are raw strings: a backslash-bracket in a normal string literal
    # is an invalid escape sequence and triggers a warning on modern Python.
    return [
        # Serial output carries no log-level tag, hence the empty prefix.
        TestCase("Serial tests", [r"c", r"123"], ""),
        TestCase(
            "Log info tests",
            [
                r"Test INFO level",
                r"Test INFO level with args a, 1",
                r"Test INFO function",
                r"Test INFO function with args a, 1",
            ],
            r"\[INFO\] ",
        ),
        TestCase(
            "Log debug tests",
            [
                r"Test DEBUG level",
                r"Test DEBUG level with args a, 1",
                r"Test DEBUG function",
                r"Test DEBUG function with args a, 1",
            ],
            r"\[DEBUG\] ",
        ),
        TestCase(
            "Log warning tests",
            [
                r"Test WARNING level",
                r"Test WARNING level with args a, 1",
                r"Test WARNING function",
                r"Test WARNING function with args a, 1",
            ],
            r"\[WARNING\] ",
        ),
        TestCase(
            "Log error tests",
            [
                r"Test ERROR level",
                r"Test ERROR level with args a, 1",
                r"Test ERROR function",
                r"Test ERROR function with args a, 1",
            ],
            r"\[ERROR\] ",
        ),
        TestCase("Mem init", [r"Init mem", r"Done"]),
        TestCase("Arch init starts", [r"Init arch \w+"]),
    ]
def get_post_archinit_cases():
    """Return the cases whose output is expected after arch init is called.

    The order of the list, and of the patterns inside each case, is the order
    in which the lines are expected to appear.
    """
    return [
        TestCase("Arch init finishes", [r"Arch init done"]),
        TestCase("Panic init", [r"Init panic", r"Done"]),
        TestCase("VGA init", [r"Init vga", r"Done"]),
        TestCase(
            "VGA tests",
            [
                r"VGA: Tested max scan line",
                r"VGA: Tested cursor shape",
                r"VGA: Tested updating cursor",
            ],
        ),
        TestCase("TTY init", [r"Init tty", r"Done"]),
        TestCase("TTY tests", [r"TTY: Tested globals", r"TTY: Tested printing"]),
        TestCase("Init finishes", [r"Init done"]),
        TestCase(
            "Panic tests",
            [
                r"Kernel panic: integer overflow",
                r"c[a-z\d]+: panic",
                r"c[a-z\d]+: panic.runtimeTests",
                r"c[a-z\d]+: kmain",
                r"c[a-z\d]+: start_higher_half",
            ],
            # Raw string: a backslash-bracket in a normal string literal is an
            # invalid escape sequence and triggers a warning on modern Python.
            r"\[ERROR\] ",
        ),
    ]
2019-10-06 21:52:43 +02:00
def read_messages(proc):
    """Forward each line of the child's stdout to ``msg_queue`` until EOF.

    Args:
        proc: Process object whose ``stdout`` is a binary pipe.

    Each line is decoded as UTF-8 and queued with its line terminator intact.
    The function returns once the pipe reaches end-of-file.
    """
    # readline() yields b"" only at end-of-file. Stopping there avoids a busy
    # loop that would flood the unbounded queue with empty strings once the
    # child exits; the consumer's queue timeout then reports missing output.
    for raw_line in iter(proc.stdout.readline, b""):
        # Undecodable bytes become U+FFFD rather than raising, so a stray
        # byte cannot silently kill the thread running this function.
        msg_queue.put(raw_line.decode("utf-8", errors="replace"))
def cleanup():
    """Send SIGTERM to the process group of the spawned child."""
    # Reads the module-level ``proc``. Signalling the group instead of the
    # single pid reaches every process that shares the child's group.
    group_id = os.getpgid(proc.pid)
    os.killpg(group_id, signal.SIGTERM)
2019-06-30 00:50:44 +02:00
if __name__ == "__main__":
    # Usage: rt-test.py <arch> <zig_path>
    if len(sys.argv) < 3:
        failure("usage: %s <arch> <zig_path>" % (sys.argv[0]))
    arch, zig_path = sys.argv[1], sys.argv[2]

    # Load the arch-specific test module from its file path; it must expose
    # get_test_cases(TestCase).
    arch_test_path = os.path.join("test", "kernel", "arch", arch, "rt-test.py")
    spec = importlib.util.spec_from_file_location("arch", arch_test_path)
    arch_module = importlib.util.module_from_spec(spec)
    spec.loader.exec_module(arch_module)

    # All log statements to look for, in order: those expected before arch
    # init is called, the arch-specific ones, then those expected afterwards.
    cases = (
        get_pre_archinit_cases()
        + arch_module.get_test_cases(TestCase)
        + get_post_archinit_cases()
    )

    if cases:
        # An argument list avoids shell parsing of zig_path, and
        # start_new_session=True puts the child in its own session and
        # process group (what preexec_fn=os.setsid did) so cleanup() can
        # signal the whole group.
        proc = subprocess.Popen(
            [zig_path, "build", "run", "-Drt-test=true"],
            stdout=subprocess.PIPE,
            start_new_session=True,
        )
        atexit.register(cleanup)

        # Daemon thread: it must not keep the interpreter alive on exit.
        read_thread = threading.Thread(target=read_messages, args=(proc,), daemon=True)
        read_thread.start()

        # Go through the cases
        for case in cases:
            # Go through the expected log messages
            for expected_idx, expected in enumerate(case.expected):
                e = case.prefix + expected
                try:
                    line = msg_queue.get(block=True, timeout=5)
                except queue.Empty:
                    # failure() exits, so `line` is always bound below.
                    failure("Timed out waiting for '%s'" % (e))
                line = line.strip()
                # Pass if the whole line matches the expected pattern, else fail
                if re.fullmatch(e, line):
                    test_pass(case, e, expected_idx, line)
                else:
                    test_failure(case, e, expected_idx, line)
    sys.exit(0)