x264 source for verification 2026-05-22
This commit is contained in:
597
tools/digress/testing.py
Normal file
597
tools/digress/testing.py
Normal file
@@ -0,0 +1,597 @@
|
||||
"""
|
||||
Digress testing core.
|
||||
"""
|
||||
|
||||
from digress.errors import SkippedTestError, DisabledTestError, NoSuchTestError, \
|
||||
FailedTestError, AlreadyRunError, SCMError, \
|
||||
ComparisonError
|
||||
from digress.constants import *
|
||||
from digress.cli import dispatchable
|
||||
|
||||
import inspect
|
||||
import operator
|
||||
import os
|
||||
import json
|
||||
|
||||
import textwrap
|
||||
|
||||
from shutil import rmtree
|
||||
|
||||
from time import time
|
||||
from functools import wraps
|
||||
|
||||
from itertools import izip_longest
|
||||
|
||||
from hashlib import sha1
|
||||
|
||||
class depends(object):
|
||||
"""
|
||||
Dependency decorator for a test.
|
||||
"""
|
||||
def __init__(self, *test_names):
|
||||
self.test_names = test_names
|
||||
|
||||
def __call__(self, func):
|
||||
func.digress_depends = self.test_names
|
||||
return func
|
||||
|
||||
class _skipped(object):
|
||||
"""
|
||||
Internal skipped decorator.
|
||||
"""
|
||||
def __init__(self, reason=""):
|
||||
self._reason = reason
|
||||
|
||||
def __call__(self, func):
|
||||
@wraps(func)
|
||||
def _closure(*args):
|
||||
raise SkippedTestError(self._reason)
|
||||
return _closure
|
||||
|
||||
class disabled(object):
|
||||
"""
|
||||
Disable a test, with reason.
|
||||
"""
|
||||
def __init__(self, reason=""):
|
||||
self._reason = reason
|
||||
|
||||
def __call__(self, func):
|
||||
@wraps(func)
|
||||
def _closure(*args):
|
||||
raise DisabledTestError(self._reason)
|
||||
return _closure
|
||||
|
||||
class comparer(object):
|
||||
"""
|
||||
Set the comparer for a test.
|
||||
"""
|
||||
def __init__(self, comparer_):
|
||||
self._comparer = comparer_
|
||||
|
||||
def __call__(self, func):
|
||||
func.digress_comparer = self._comparer
|
||||
return func
|
||||
|
||||
class Fixture(object):
|
||||
cases = []
|
||||
scm = None
|
||||
|
||||
flush_before = False
|
||||
|
||||
def _skip_case(self, case, depend):
|
||||
for name, meth in inspect.getmembers(case):
|
||||
if name[:5] == "test_":
|
||||
setattr(
|
||||
case,
|
||||
name,
|
||||
_skipped("failed dependency: case %s" % depend)(meth)
|
||||
)
|
||||
|
||||
def _run_case(self, case, results):
|
||||
if case.__name__ in results:
|
||||
raise AlreadyRunError
|
||||
|
||||
for depend in case.depends:
|
||||
if depend.__name__ in results and results[depend.__name__]["status"] != CASE_PASS:
|
||||
self._skip_case(case, depend.__name__)
|
||||
|
||||
try:
|
||||
result = self._run_case(depend, results)
|
||||
except AlreadyRunError:
|
||||
continue
|
||||
|
||||
if result["status"] != CASE_PASS:
|
||||
self._skip_case(case, depend.__name__)
|
||||
|
||||
result = case().run()
|
||||
results[case.__name__] = result
|
||||
return result
|
||||
|
||||
@dispatchable
|
||||
def flush(self, revision=None):
|
||||
"""
|
||||
Flush any cached results. Takes a revision for an optional argument.
|
||||
"""
|
||||
if not revision:
|
||||
print "Flushing all cached results...",
|
||||
|
||||
try:
|
||||
rmtree(".digress_%s" % self.__class__.__name__)
|
||||
except Exception, e:
|
||||
print "failed: %s" % e
|
||||
else:
|
||||
print "done."
|
||||
else:
|
||||
try:
|
||||
rev = self.scm.rev_parse(revision)
|
||||
except SCMError, e:
|
||||
print e
|
||||
else:
|
||||
print "Flushing cached results for %s..." % rev,
|
||||
|
||||
try:
|
||||
rmtree(os.path.join(".digress_%s" % self.__class__.__name__, rev))
|
||||
except Exception, e:
|
||||
print "failed: %s" % e
|
||||
else:
|
||||
print "done."
|
||||
|
||||
@dispatchable
|
||||
def run(self, revision=None):
|
||||
"""
|
||||
Run the fixture for a specified revision.
|
||||
|
||||
Takes a revision for an argument.
|
||||
"""
|
||||
oldrev = None
|
||||
oldbranch = None
|
||||
dirty = False
|
||||
|
||||
try:
|
||||
dirty = self.scm.dirty()
|
||||
|
||||
# if the tree is clean, then we don't need to make an exception
|
||||
if not dirty and revision is None: revision = "HEAD"
|
||||
|
||||
if revision:
|
||||
oldrev = self.scm.current_rev()
|
||||
oldbranch = self.scm.current_branch()
|
||||
|
||||
if dirty:
|
||||
self.scm.stash()
|
||||
self.scm.checkout(revision)
|
||||
|
||||
rev = self.scm.current_rev()
|
||||
|
||||
self.datastore = os.path.join(".digress_%s" % self.__class__.__name__, rev)
|
||||
|
||||
if os.path.isdir(self.datastore):
|
||||
if self.flush_before:
|
||||
self.flush(rev)
|
||||
else:
|
||||
os.makedirs(self.datastore)
|
||||
else:
|
||||
rev = "(dirty working tree)"
|
||||
self.datastore = None
|
||||
|
||||
print "Running fixture %s on revision %s...\n" % (self.__class__.__name__, rev)
|
||||
|
||||
results = {}
|
||||
|
||||
for case in self.cases:
|
||||
try:
|
||||
self._run_case(case, results)
|
||||
except AlreadyRunError:
|
||||
continue
|
||||
|
||||
total_time = reduce(operator.add, filter(
|
||||
None,
|
||||
[
|
||||
result["time"] for result in results.values()
|
||||
]
|
||||
), 0)
|
||||
|
||||
overall_status = (
|
||||
CASE_FAIL in [ result["status"] for result in results.values() ]
|
||||
) and FIXTURE_FAIL or FIXTURE_PASS
|
||||
|
||||
print "Fixture %s in %.4f.\n" % (
|
||||
(overall_status == FIXTURE_PASS) and "passed" or "failed",
|
||||
total_time
|
||||
)
|
||||
|
||||
return { "cases" : results, "time" : total_time, "status" : overall_status, "revision" : rev }
|
||||
|
||||
finally:
|
||||
if oldrev:
|
||||
self.scm.checkout(oldrev)
|
||||
if oldbranch:
|
||||
self.scm.checkout(oldbranch)
|
||||
if dirty:
|
||||
self.scm.unstash()
|
||||
|
||||
@dispatchable
|
||||
def bisect(self, good_rev, bad_rev=None):
|
||||
"""
|
||||
Perform a bisection between two revisions.
|
||||
|
||||
First argument is the good revision, second is the bad revision, which
|
||||
defaults to the current revision.
|
||||
"""
|
||||
if not bad_rev: bad_rev = self.scm.current_rev()
|
||||
|
||||
dirty = False
|
||||
|
||||
# get a set of results for the good revision
|
||||
good_result = self.run(good_rev)
|
||||
|
||||
good_rev = good_result["revision"]
|
||||
|
||||
try:
|
||||
dirty = self.scm.dirty()
|
||||
|
||||
if dirty:
|
||||
self.scm.stash()
|
||||
|
||||
self.scm.bisect("start")
|
||||
|
||||
self.scm.bisect("bad", bad_rev)
|
||||
self.scm.bisect("good", good_rev)
|
||||
|
||||
bisecting = True
|
||||
isbad = False
|
||||
|
||||
while bisecting:
|
||||
results = self.run(self.scm.current_rev())
|
||||
revision = results["revision"]
|
||||
|
||||
# perform comparisons
|
||||
# FIXME: this just uses a lot of self.compare
|
||||
for case_name, case_result in good_result["cases"].iteritems():
|
||||
case = filter(lambda case: case.__name__ == case_name, self.cases)[0]
|
||||
|
||||
for test_name, test_result in case_result["tests"].iteritems():
|
||||
test = filter(
|
||||
lambda pair: pair[0] == "test_%s" % test_name,
|
||||
inspect.getmembers(case)
|
||||
)[0][1]
|
||||
|
||||
other_result = results["cases"][case_name]["tests"][test_name]
|
||||
|
||||
if other_result["status"] == TEST_FAIL and case_result["status"] != TEST_FAIL:
|
||||
print "Revision %s failed %s.%s." % (revision, case_name, test_name)
|
||||
isbad = True
|
||||
break
|
||||
|
||||
elif hasattr(test, "digress_comparer"):
|
||||
try:
|
||||
test.digress_comparer(test_result["value"], other_result["value"])
|
||||
except ComparisonError, e:
|
||||
print "%s differs: %s" % (test_name, e)
|
||||
isbad = True
|
||||
break
|
||||
|
||||
if isbad:
|
||||
output = self.scm.bisect("bad", revision)
|
||||
print "Marking revision %s as bad." % revision
|
||||
else:
|
||||
output = self.scm.bisect("good", revision)
|
||||
print "Marking revision %s as good." % revision
|
||||
|
||||
if output.split("\n")[0].endswith("is the first bad commit"):
|
||||
print "\nBisection complete.\n"
|
||||
print output
|
||||
bisecting = False
|
||||
|
||||
print ""
|
||||
except SCMError, e:
|
||||
print e
|
||||
finally:
|
||||
self.scm.bisect("reset")
|
||||
|
||||
if dirty:
|
||||
self.scm.unstash()
|
||||
|
||||
@dispatchable
|
||||
def multicompare(self, rev_a=None, rev_b=None, mode="waterfall"):
|
||||
"""
|
||||
Generate a comparison of tests.
|
||||
|
||||
Takes three optional arguments, from which revision, to which revision,
|
||||
and the method of display (defaults to vertical "waterfall", also
|
||||
accepts "river" for horizontal display)
|
||||
"""
|
||||
if not rev_a: rev_a = self.scm.current_rev()
|
||||
if not rev_b: rev_b = self.scm.current_rev()
|
||||
|
||||
revisions = self.scm.revisions(rev_a, rev_b)
|
||||
|
||||
results = []
|
||||
|
||||
for revision in revisions:
|
||||
results.append(self.run(revision))
|
||||
|
||||
test_names = reduce(operator.add, [
|
||||
[
|
||||
(case_name, test_name)
|
||||
for
|
||||
test_name, test_result
|
||||
in
|
||||
case_result["tests"].iteritems()
|
||||
]
|
||||
for
|
||||
case_name, case_result
|
||||
in
|
||||
results[0]["cases"].iteritems()
|
||||
], [])
|
||||
|
||||
MAXLEN = 20
|
||||
|
||||
colfmt = "| %s "
|
||||
|
||||
table = []
|
||||
|
||||
if mode not in ("waterfall", "river"):
|
||||
mode = "waterfall"
|
||||
|
||||
print "Unknown multicompare mode specified, defaulting to %s." % mode
|
||||
|
||||
if mode == "waterfall":
|
||||
header = [ "Test" ]
|
||||
|
||||
for result in results:
|
||||
header.append(result["revision"])
|
||||
|
||||
table.append(header)
|
||||
|
||||
for test_name in test_names:
|
||||
row_data = [ ".".join(test_name) ]
|
||||
|
||||
for result in results:
|
||||
test_result = result["cases"][test_name[0]]["tests"][test_name[1]]
|
||||
|
||||
if test_result["status"] != TEST_PASS:
|
||||
value = "did not pass: %s" % (test_result["value"])
|
||||
else:
|
||||
value = "%s (%.4f)" % (test_result["value"], test_result["time"])
|
||||
|
||||
row_data.append(value)
|
||||
|
||||
table.append(row_data)
|
||||
|
||||
elif mode == "river":
|
||||
header = [ "Revision" ]
|
||||
|
||||
for test_name in test_names:
|
||||
header.append(".".join(test_name))
|
||||
|
||||
table.append(header)
|
||||
|
||||
for result in results:
|
||||
row_data = [ result["revision"] ]
|
||||
|
||||
for case_name, case_result in result["cases"].iteritems():
|
||||
for test_name, test_result in case_result["tests"].iteritems():
|
||||
|
||||
if test_result["status"] != TEST_PASS:
|
||||
value = "did not pass: %s" % (test_result["value"])
|
||||
else:
|
||||
value = "%s (%.4f)" % (test_result["value"], test_result["time"])
|
||||
|
||||
row_data.append(value)
|
||||
|
||||
table.append(row_data)
|
||||
|
||||
breaker = "=" * (len(colfmt % "".center(MAXLEN)) * len(table[0]) + 1)
|
||||
|
||||
print breaker
|
||||
|
||||
for row in table:
|
||||
for row_stuff in izip_longest(*[
|
||||
textwrap.wrap(col, MAXLEN, break_on_hyphens=False) for col in row
|
||||
], fillvalue=""):
|
||||
row_output = ""
|
||||
|
||||
for col in row_stuff:
|
||||
row_output += colfmt % col.ljust(MAXLEN)
|
||||
|
||||
row_output += "|"
|
||||
|
||||
print row_output
|
||||
print breaker
|
||||
|
||||
@dispatchable
|
||||
def compare(self, rev_a, rev_b=None):
|
||||
"""
|
||||
Compare two revisions directly.
|
||||
|
||||
Takes two arguments, second is optional and implies current revision.
|
||||
"""
|
||||
results_a = self.run(rev_a)
|
||||
results_b = self.run(rev_b)
|
||||
|
||||
for case_name, case_result in results_a["cases"].iteritems():
|
||||
case = filter(lambda case: case.__name__ == case_name, self.cases)[0]
|
||||
|
||||
header = "Comparison of case %s" % case_name
|
||||
print header
|
||||
print "=" * len(header)
|
||||
|
||||
for test_name, test_result in case_result["tests"].iteritems():
|
||||
test = filter(
|
||||
lambda pair: pair[0] == "test_%s" % test_name,
|
||||
inspect.getmembers(case)
|
||||
)[0][1]
|
||||
|
||||
other_result = results_b["cases"][case_name]["tests"][test_name]
|
||||
|
||||
if test_result["status"] != TEST_PASS or other_result["status"] != TEST_PASS:
|
||||
print "%s cannot be compared as one of the revisions have not passed it." % test_name
|
||||
|
||||
elif hasattr(test, "digress_comparer"):
|
||||
try:
|
||||
test.digress_comparer(test_result["value"], other_result["value"])
|
||||
except ComparisonError, e:
|
||||
print "%s differs: %s" % (test_name, e)
|
||||
else:
|
||||
print "%s does not differ." % test_name
|
||||
else:
|
||||
print "%s has no comparer and therefore cannot be compared." % test_name
|
||||
|
||||
print ""
|
||||
|
||||
@dispatchable
|
||||
def list(self):
|
||||
"""
|
||||
List all available test cases, excluding dependencies.
|
||||
"""
|
||||
print "\nAvailable Test Cases"
|
||||
print "===================="
|
||||
for case in self.cases:
|
||||
print case.__name__
|
||||
|
||||
def register_case(self, case):
|
||||
case.fixture = self
|
||||
self.cases.append(case)
|
||||
|
||||
class Case(object):
|
||||
depends = []
|
||||
fixture = None
|
||||
|
||||
def _get_test_by_name(self, test_name):
|
||||
if not hasattr(self, "test_%s" % test_name):
|
||||
raise NoSuchTestError(test_name)
|
||||
return getattr(self, "test_%s" % test_name)
|
||||
|
||||
def _run_test(self, test, results):
|
||||
test_name = test.__name__[5:]
|
||||
|
||||
if test_name in results:
|
||||
raise AlreadyRunError
|
||||
|
||||
if hasattr(test, "digress_depends"):
|
||||
for depend in test.digress_depends:
|
||||
if depend in results and results[depend]["status"] != TEST_PASS:
|
||||
test = _skipped("failed dependency: %s" % depend)(test)
|
||||
|
||||
dependtest = self._get_test_by_name(depend)
|
||||
|
||||
try:
|
||||
result = self._run_test(dependtest, results)
|
||||
except AlreadyRunError:
|
||||
continue
|
||||
|
||||
if result["status"] != TEST_PASS:
|
||||
test = _skipped("failed dependency: %s" % depend)(test)
|
||||
|
||||
start_time = time()
|
||||
run_time = None
|
||||
|
||||
print "Running test %s..." % test_name,
|
||||
|
||||
try:
|
||||
if not self.datastore:
|
||||
# XXX: this smells funny
|
||||
raise IOError
|
||||
|
||||
with open(os.path.join(
|
||||
self.datastore,
|
||||
"%s.json" % sha1(test_name).hexdigest()
|
||||
), "r") as f:
|
||||
result = json.load(f)
|
||||
|
||||
value = str(result["value"])
|
||||
|
||||
if result["status"] == TEST_DISABLED:
|
||||
status = "disabled"
|
||||
elif result["status"] == TEST_SKIPPED:
|
||||
status = "skipped"
|
||||
elif result["status"] == TEST_FAIL:
|
||||
status = "failed"
|
||||
elif result["status"] == TEST_PASS:
|
||||
status = "passed"
|
||||
value = "%s (in %.4f)" % (
|
||||
result["value"] or "(no result)",
|
||||
result["time"]
|
||||
)
|
||||
else:
|
||||
status = "???"
|
||||
|
||||
print "%s (cached): %s" % (status, value)
|
||||
except IOError:
|
||||
try:
|
||||
value = test()
|
||||
except DisabledTestError, e:
|
||||
print "disabled: %s" % e
|
||||
status = TEST_DISABLED
|
||||
value = str(e)
|
||||
except SkippedTestError, e:
|
||||
print "skipped: %s" % e
|
||||
status = TEST_SKIPPED
|
||||
value = str(e)
|
||||
except FailedTestError, e:
|
||||
print "failed: %s" % e
|
||||
status = TEST_FAIL
|
||||
value = str(e)
|
||||
except Exception, e:
|
||||
print "failed with exception: %s" % e
|
||||
status = TEST_FAIL
|
||||
value = str(e)
|
||||
else:
|
||||
run_time = time() - start_time
|
||||
print "passed: %s (in %.4f)" % (
|
||||
value or "(no result)",
|
||||
run_time
|
||||
)
|
||||
status = TEST_PASS
|
||||
|
||||
result = { "status" : status, "value" : value, "time" : run_time }
|
||||
|
||||
if self.datastore:
|
||||
with open(os.path.join(
|
||||
self.datastore,
|
||||
"%s.json" % sha1(test_name).hexdigest()
|
||||
), "w") as f:
|
||||
json.dump(result, f)
|
||||
|
||||
results[test_name] = result
|
||||
return result
|
||||
|
||||
def run(self):
|
||||
print "Running case %s..." % self.__class__.__name__
|
||||
|
||||
if self.fixture.datastore:
|
||||
self.datastore = os.path.join(
|
||||
self.fixture.datastore,
|
||||
sha1(self.__class__.__name__).hexdigest()
|
||||
)
|
||||
if not os.path.isdir(self.datastore):
|
||||
os.makedirs(self.datastore)
|
||||
else:
|
||||
self.datastore = None
|
||||
|
||||
results = {}
|
||||
|
||||
for name, meth in inspect.getmembers(self):
|
||||
if name[:5] == "test_":
|
||||
try:
|
||||
self._run_test(meth, results)
|
||||
except AlreadyRunError:
|
||||
continue
|
||||
|
||||
total_time = reduce(operator.add, filter(
|
||||
None, [
|
||||
result["time"] for result in results.values()
|
||||
]
|
||||
), 0)
|
||||
|
||||
overall_status = (
|
||||
TEST_FAIL in [ result["status"] for result in results.values() ]
|
||||
) and CASE_FAIL or CASE_PASS
|
||||
|
||||
print "Case %s in %.4f.\n" % (
|
||||
(overall_status == FIXTURE_PASS) and "passed" or "failed",
|
||||
total_time
|
||||
)
|
||||
|
||||
return { "tests" : results, "time" : total_time, "status" : overall_status }
|
||||
Reference in New Issue
Block a user