blob: 2e90759164d38d9e9f67965a0d729dc880537d69 [file] [log] [blame]
# backends
#
# Copyright (C) 2011 Carlos Garcia Campos <carlosgc@gnome.org>
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
from hashlib import md5
import os
from Config import Config
__all__ = [ 'register_backend',
'get_backend',
'get_all_backends',
'UnknownBackendError',
'Backend' ]
class UnknownBackendError(Exception):
'''Unknown backend type'''
class Backend:
def __init__(self, name):
self._name = name
self._utilsdir = Config().utils_dir
def get_name(self):
return self._name
def __should_have_checksum(self, entry):
if not entry.startswith(self._name):
return False
name, ext = os.path.splitext(entry)
return ext not in ('.md5', '.crashed', '.failed', '.stderr');
def create_checksums(self, refs_path, delete_refs = False):
path = os.path.join(refs_path, self._name)
md5_file = open(path + '.md5', 'w')
for entry in os.listdir(refs_path):
if not self.__should_have_checksum(entry):
continue
ref_path = os.path.join(refs_path, entry)
f = open(ref_path, 'rb')
md5_file.write("%s %s\n" % (md5(f.read()).hexdigest(), ref_path))
f.close()
if delete_refs:
os.remove(ref_path)
md5_file.close()
def compare_checksums(self, refs_path, out_path, remove_results = True, create_diffs = True):
retval = True
md5_path = os.path.join(refs_path, self._name)
md5_file = open(md5_path + '.md5', 'r')
tests = os.listdir(out_path)
for line in md5_file.readlines():
md5sum, ref_path = line.strip('\n').split(' ', 1)
basename = os.path.basename(ref_path)
if not self.__should_have_checksum(basename):
continue
if not basename in tests:
retval = False
print("%s found in md5 ref file but missing in output dir %s" % (basename, out_path))
continue
result_path = os.path.join(out_path, basename)
f = open(result_path, 'rb')
matched = md5sum == md5(f.read()).hexdigest()
f.close()
if matched:
if remove_results:
os.remove(result_path)
else:
print("Differences found in %s" % (basename))
if create_diffs:
if not os.path.exists(ref_path):
print("Reference file %s not found, skipping diff for %s" % (ref_path, result_path))
else:
try:
self._create_diff(ref_path, result_path)
except NotImplementedError:
# Diff not supported by backend
pass
retval = False
md5_file.close()
return retval
def has_md5(self, test_path):
return os.path.exists(os.path.join(test_path, self._name + '.md5'))
def is_crashed(self, test_path):
return os.path.exists(os.path.join(test_path, self._name + '.crashed'))
def is_failed(self, test_path):
failed_path = os.path.join(test_path, self._name + '.failed')
if not os.path.exists(failed_path):
return 0
f = open(failed_path, 'r')
status = int(f.read())
f.close()
return status
def has_stderr(self, test_path):
return os.path.exists(os.path.join(test_path, self._name + '.stderr'))
def __create_stderr_file(self, stderr, out_path):
if not stderr:
return
stderr_file = open(out_path + '.stderr', 'wb')
stderr_file.write(stderr)
stderr_file.close()
def __create_failed_file_if_needed(self, status, out_path):
if os.WIFEXITED(status) or os.WEXITSTATUS(status) == 0:
return False
failed_file = open(out_path + '.failed', 'w')
failed_file.write("%d" % (os.WEXITSTATUS(status)))
failed_file.close()
return True
def _check_exit_status(self, p, out_path):
status = p.wait()
stderr = p.stderr.read()
self.__create_stderr_file(stderr, out_path)
if not os.WIFEXITED(status):
open(out_path + '.crashed', 'w').close()
return False
if self.__create_failed_file_if_needed(status, out_path):
return False
return True
def _check_exit_status2(self, p1, p2, out_path):
status1 = p1.wait()
status2 = p2.wait()
p1_stderr = p1.stderr.read()
p2_stderr = p2.stderr.read()
if p1_stderr or p2_stderr:
self.__create_stderr_file(p1_stderr + p2_stderr, out_path)
if not os.WIFEXITED(status1) or not os.WIFEXITED(status2):
open(out_path + '.crashed', 'w').close()
return False
if self.__create_failed_file_if_needed(status1, out_path):
return False
if self.__create_failed_file_if_needed(status2, out_path):
return False
return True
def _diff_png(self, ref_path, result_path):
try:
import Image, ImageChops
except ImportError:
raise NotImplementedError
ref = Image.open(ref_path)
result = Image.open(result_path)
diff = ImageChops.difference(ref, result)
diff.save(result_path + '.diff', 'png')
def _create_diff(self, ref_path, result_path):
raise NotImplementedError
def create_refs(self, doc_path, refs_path):
raise NotImplementedError
_backends = {}
def register_backend(backend_name, backend_class):
_backends[backend_name] = backend_class
def _get_backend(backend_name):
if backend_name not in _backends:
try:
__import__('backends.%s' % backend_name)
except ImportError:
pass
if backend_name not in _backends:
raise UnknownBackendError('Backend %s does not exist' % backend_name)
return _backends[backend_name]
def get_backend(backend_name):
backend_class = _get_backend(backend_name)
return backend_class(backend_name)
def get_all_backends():
backends = []
thisdir = os.path.abspath(os.path.dirname(__file__))
for fname in os.listdir(os.path.join(thisdir)):
name, ext = os.path.splitext(fname)
if not ext == '.py':
continue
try:
__import__('backends.%s' % name)
except ImportError:
continue
if name in _backends:
backends.append(_backends[name](name))
return backends