2019-10-06 21:52:43 +02:00
import atexit
import queue
import threading
2019-06-30 00:50:44 +02:00
import subprocess
import signal
import re
import sys
import datetime
import os
import importlib . util
2019-10-06 21:52:43 +02:00
msg_queue = queue . Queue ( - 1 )
proc = None
2019-06-30 00:50:44 +02:00
class TestCase :
2019-10-08 01:11:50 +02:00
def __init__ ( self , name , expected , prefix = r " \ [INFO \ ] " ) :
2019-06-30 00:50:44 +02:00
self . name = name
self . expected = expected
2019-10-08 01:11:50 +02:00
self . prefix = prefix
2019-06-30 00:50:44 +02:00
2019-10-06 21:52:43 +02:00
def failure ( msg ) :
print ( " FAILURE: %s " % ( msg ) )
2019-06-30 00:50:44 +02:00
sys . exit ( 1 )
2019-10-06 21:52:43 +02:00
def test_failure ( case , exp , expected_idx , found ) :
failure ( " %s # %d , expected ' %s ' , found ' %s ' " % ( case . name , expected_idx + 1 , exp , found ) )
2019-06-30 00:50:44 +02:00
def test_pass ( case , exp , expected_idx , found ) :
print ( " PASS: %s # %d , expected ' %s ' , found ' %s ' " % ( case . name , expected_idx + 1 , exp , found ) )
2019-09-29 13:55:34 +02:00
def get_pre_archinit_cases ( ) :
return [
2019-10-08 01:11:50 +02:00
TestCase ( " Serial tests " , [ r " c " , r " 123 " ] , " " ) ,
2019-10-08 12:20:37 +02:00
TestCase ( " Log info tests " , [ r " Test INFO level " , r " Test INFO level with args a, 1 " , r " Test INFO function " , r " Test INFO function with args a, 1 " ] , " \ [INFO \ ] " ) ,
TestCase ( " Log debug tests " , [ r " Test DEBUG level " , r " Test DEBUG level with args a, 1 " , r " Test DEBUG function " , r " Test DEBUG function with args a, 1 " ] , " \ [DEBUG \ ] " ) ,
TestCase ( " Log warning tests " , [ r " Test WARNING level " , r " Test WARNING level with args a, 1 " , r " Test WARNING function " , r " Test WARNING function with args a, 1 " ] , " \ [WARNING \ ] " ) ,
TestCase ( " Log error tests " , [ r " Test ERROR level " , r " Test ERROR level with args a, 1 " , r " Test ERROR function " , r " Test ERROR function with args a, 1 " ] , " \ [ERROR \ ] " ) ,
2019-11-02 03:00:49 +01:00
TestCase ( " Mem init " , [ r " Init mem " , r " Done " ] ) ,
2019-10-08 12:20:37 +02:00
TestCase ( " Arch init starts " , [ r " Init arch \ w+ " ] )
2019-09-29 13:55:34 +02:00
]
def get_post_archinit_cases ( ) :
return [
TestCase ( " Arch init finishes " , [ r " Arch init done " ] ) ,
TestCase ( " VGA init " , [ r " Init vga " , r " Done " ] ) ,
TestCase ( " VGA tests " , [ r " VGA: Tested max scan line " , r " VGA: Tested cursor shape " , r " VGA: Tested updating cursor " ] ) ,
TestCase ( " TTY init " , [ r " Init tty " , r " Done " ] ) ,
2019-10-01 12:52:23 +02:00
TestCase ( " TTY tests " , [ r " TTY: Tested globals " , r " TTY: Tested printing " ] ) ,
2019-09-29 13:55:34 +02:00
TestCase ( " Init finishes " , [ r " Init done " ] )
]
2019-10-06 21:52:43 +02:00
def read_messages ( proc ) :
while True :
line = proc . stdout . readline ( ) . decode ( " utf-8 " )
msg_queue . put ( line )
def cleanup ( ) :
global proc
os . killpg ( os . getpgid ( proc . pid ) , signal . SIGTERM )
2019-06-30 00:50:44 +02:00
if __name__ == " __main__ " :
arch = sys . argv [ 1 ]
zig_path = sys . argv [ 2 ]
spec = importlib . util . spec_from_file_location ( " arch " , " test/kernel/arch/ " + arch + " /rt-test.py " )
arch_module = importlib . util . module_from_spec ( spec )
spec . loader . exec_module ( arch_module )
2019-09-29 13:55:34 +02:00
# The list of log statements to look for before arch init is called +
# All log statements to look for, including the arch-specific ones +
2019-06-30 00:50:44 +02:00
# The list of log statements to look for after arch init is called
2019-09-29 13:55:34 +02:00
cases = get_pre_archinit_cases ( ) + arch_module . get_test_cases ( TestCase ) + get_post_archinit_cases ( )
2019-06-30 00:50:44 +02:00
if len ( cases ) > 0 :
2019-10-06 21:52:43 +02:00
proc = subprocess . Popen ( zig_path + " build run -Drt-test=true " , stdout = subprocess . PIPE , shell = True , preexec_fn = os . setsid )
atexit . register ( cleanup )
2019-06-30 00:50:44 +02:00
case_idx = 0
2019-10-06 21:52:43 +02:00
read_thread = threading . Thread ( target = read_messages , args = ( proc , ) )
read_thread . daemon = True
read_thread . start ( )
2019-06-30 00:50:44 +02:00
# Go through the cases
while case_idx < len ( cases ) :
case = cases [ case_idx ]
expected_idx = 0
# Go through the expected log messages
while expected_idx < len ( case . expected ) :
2019-10-08 01:11:50 +02:00
e = case . prefix + case . expected [ expected_idx ]
2019-10-06 21:52:43 +02:00
try :
line = msg_queue . get ( block = True , timeout = 5 )
except queue . Empty :
failure ( " Timed out waiting for ' %s ' " % ( e ) )
2019-06-30 00:50:44 +02:00
line = line . strip ( )
pattern = re . compile ( e )
# Pass if the line matches the expected pattern, else fail
if pattern . fullmatch ( line ) :
test_pass ( case , e , expected_idx , line )
else :
test_failure ( case , e , expected_idx , line )
expected_idx + = 1
case_idx + = 1
sys . exit ( 0 )