#!/usr/bin/env python import os, sys, re, datetime, argparse, traceback, tempfile, json, subprocess import html5lib import random # Set up import path to find our own Django basedir = os.path.abspath(os.path.join(os.path.dirname(__file__), "../")) if not basedir in sys.path: sys.path.insert(0, basedir) # Parse args now, so we can use custom settings when importing django parser = argparse.ArgumentParser( description="""Perform a test crawl of the project. For each found URL, the HTTP response status is printed. If it's not OK/redirect, FAIL is printed - in case of errors, a stacktrace is also included.""") parser.add_argument('urls', metavar='URL', nargs='*', help='One or more URLs to start the crawl from') parser.add_argument('--urls', '-u', dest='url_file', help='File with URLs to start the crawl from') parser.add_argument('--slow', dest='slow_threshold', type=float, default=1.0, help='Responses taking longer than this (in seconds) results in SLOW being printed') parser.add_argument('--settings', help='Custom settings file') parser.add_argument('--logfile', help='Write to logfile') parser.add_argument('--user', help='Crawl logged in as this user', default=None) parser.add_argument('--validator-nu', dest='validator_nu', action='store_true', help='Use validator.nu instead of html5lib for HTML validation') parser.add_argument('--pedantic', action='store_true', help='Stop the crawl on the first HTML validation issue') parser.add_argument('--random', action='store_true', help='Crawl URLs randomly') parser.add_argument('--validate-all', dest='validate_all', action='store_true', default=False, help='Run html 5 validation on all pages, without skipping similar urls. ' '(The default is to only run validation on one of /foo/1/, /foo/2/, /foo/3/, etc.)') parser.add_argument('-v', '--verbose', action='store_true', default=False, help='Be more verbose') args = parser.parse_args() # Import Django, call setup() os.environ.setdefault("DJANGO_SETTINGS_MODULE", args.settings or "ietf.settings_testcrawl") import django import django.test import django.core.checks django.setup() # This needs to come after we set up sys path to include the local django import debug # pyflakes:ignore # prevent memory from leaking when settings.DEBUG=True from django.db import connection class DontSaveQueries(object): def append(self, x): pass connection.queries = DontSaveQueries() from ietf.name.models import DocTypeName # --- Constants --- MAX_URL_LENGTH = 500 # --- Functions --- def note(s): if args.verbose: sys.stderr.write(s) sys.stderr.write('\n') def strip_url(url): if url.startswith("http://testserver"): url = url[len("http://testserver"):] return url def extract_html_urls(content): for m in re.finditer(r'(<(?:(?:a|link) [^>]*href|(?:img|script) [^>]*src)=[\'"]([^"]+)[\'"][^>]*>)', content): if re.search(r'rel=["\']?nofollow["\']', m.group(1)): continue url = strip_url(m.group(2)) if len(url) > MAX_URL_LENGTH: continue # avoid infinite GET parameter appendages if not url.startswith("/"): continue if url.startswith("//"): continue yield url def extract_tastypie_urls(content): VISIT_OBJECTS = False VISIT_NEXT = False data = json.loads(content) for item in data: if type(data[item]) is dict: if "list_endpoint" in data[item]: uri = data[item]["list_endpoint"] yield uri if VISIT_NEXT: if "meta" in data and "next" in data["meta"]: uri = data["meta"]["next"] if uri != None: yield uri if VISIT_OBJECTS: if "objects" in data: object_list = data["objects"] for i in range(len(object_list)): if "resource_uri" in object_list[i]: uri = object_list[i]["resource_uri"] yield uri def check_html_valid(url, response, args): global parser, validated_urls, doc_types, warnings key = url if not args.validate_all: # derive a key for urls like this by replacing primary keys key = re.sub("\?.*$", "", key) key = re.sub("#.*$", "", key) key = re.sub("/.+@.+/", "/x@x.org/", key) key = re.sub("/[0-9.]+/", "/nnnn/", key) key = re.sub("/[0-9.]+/", "/mmmm/", key) key = re.sub("/ag/[a-z0-9-]+/", "/ag/foo/", key) key = re.sub("/area/[a-z0-9-]+/", "/area/foo/", key) key = re.sub("/bcp[0-9]+/", "/bcpnnn/", key) key = re.sub("/conflict-review-[a-z0-9-]+/", "/conflrev-foo/", key) key = re.sub("/dir/[a-z0-9-]+/", "/dir/foo/", key) key = re.sub("/draft-[a-z0-9-]+/", "/draft-foo/", key) key = re.sub("/group/[a-z0-9-]+/", "/group/foo/", key) key = re.sub("/ipr/search/.*", "/ipr/search/", key) key = re.sub("/release/[0-9dev.]+/", "/release/n.n.n/", key) key = re.sub("/rfc[0-9]+/", "/rfcnnnn/", key) key = re.sub("/rg/[a-z0-9-]+/", "/rg/foo/", key) key = re.sub("/secr/srec/nnnn/[0-9a-z-]+/", "/secr/sreq/nn/bar/", key) key = re.sub("/state/[a-z0-9-]+/", "/state/foo/", key) key = re.sub("/state/[a-z0-9-]+/[a-z0-9-]+/", "/state/foo/bar/", key) key = re.sub("/status-change-[a-z0-9-]+/", "/statchg-foo/", key) key = re.sub("/std[0-9]+/", "/stdnnn/", key) key = re.sub("/submit/status/nnnn/[0-9a-f]+/", "/submit/status/nnnn/bar/", key) key = re.sub("/team/[a-z0-9-]+/", "/team/foo/", key) key = re.sub("/wg/[a-z0-9-]+/", "/wg/foo/", key) for slug in doc_types: key = re.sub("/%s-.*/"%slug, "/%s-nnnn/"%slug, key) if not key in validated_urls: note('Validate: %-32s: %s' % (url[:32], key)) # These URLs have known issues, skip them until those are fixed if re.search('(/secr|admin/)|/doc/.*/edit/info/', url): log("%s blacklisted; skipping HTML validation" % url) validated_urls[key] = True return if hasattr(response, "content"): content = response.content else: content = response.streaming_content validated_urls[key] = True if args.validator_nu: v = subprocess.Popen(["java", "-jar", basedir + "/bin/vnu.jar", "--format", "json", "-"], stdin=subprocess.PIPE, stderr=subprocess.PIPE) for m in json.loads(v.communicate(content)[1])["messages"]: t = m["subType"] if m["type"] == "info" else m["type"] tags.append("\n%s\tLine %d: %s" % (t.upper(), m["lastLine"], m["message"])) tags.append("\n\t%s" % m["extract"].replace('\n', ' ')) tags.append("\n\t%s%s" % (" " * m["hiliteStart"], "^" * m["hiliteLength"])) # disregard some HTML issues that are (usually) due to invalid # database content if not re.search('Forbidden code point|Bad value|seamless|The first child', m["message"]): warnings += 1 else: try: parser.parse(content) except Exception: e = SyntaxWarning("ParseError") for err in parser.errors: pos, code, data = err tags.append(u"WARN invalid html: Position %s: %s" % (pos, code)) warnings += 1 def skip_url(url): for pattern in ( "^/community/[0-9]+/remove_document/", "^/community/personal/", ): if re.search(pattern, url): return True return False def log(s): print(s) if logfile: if not type(s) is str: s = s.encode('utf-8') logfile.write(s) logfile.write('\n') def get_referrers(url): ref_list = [] while url in referrers: url = referrers[url] if url in ref_list: log("Circular referral list, discovered at %s" % url) break ref_list.append(url) return ref_list # --- GLobals --- slow_threshold = args.slow_threshold visited = set() urls = {} # url -> referrer referrers = {} initial_urls = [] initial_urls.extend(args.urls) if args.url_file: with open(args.url_file) as f: for line in f: line = line.partition("#")[0].strip() if line: initial_urls.append(line) if not initial_urls: initial_urls.append("/") initial_urls.append("/api/v1") for url in initial_urls: urls[url] = "[initial]" parser = html5lib.HTMLParser(strict=True) validated_urls = {} doc_types = [ t.slug for t in DocTypeName.objects.all() ] errors = 0 warnings = 0 count = 0 start_time = datetime.datetime.now() client = django.test.Client(Accept='text/html,text/plain,application/json') logfile = None if args.logfile: logfile = open(args.logfile, "w") # --- Main --- if __name__ == "__main__": if (args.user): # log in as user, to have the respective HTML generated by the templates response = client.post('/accounts/login/', {'username': args.user, 'password': 'password'}, secure=True, follow=True) if (response.status_code != 200): log("Could not log in as %s, HTML response %d" % (args.user, response.status_code)) sys.exit(1) # Run django system checks and checks from ietf.checks: error_list = django.core.checks.run_checks() if error_list: print("") for entry in error_list: print(entry) sys.exit(1) while urls: if args.random: # popitem() is documented to be random, but really isn't url = random.choice(urls.keys()) referrer = urls.pop(url) else: url, referrer = urls.popitem() visited.add(url) if skip_url(url): continue try: timestamp = datetime.datetime.now() r = client.get(url, secure=True, follow=True) elapsed = datetime.datetime.now() - timestamp except KeyboardInterrupt: log(" ... was fetching %s" % url) sys.exit(1) except: log("500 %.3fs %s FAIL (from: [ %s ])" % ((datetime.datetime.now() - timestamp).total_seconds(), url, (",\n\t".join(get_referrers(url))))) log("=============") log(traceback.format_exc()) log("=============") errors += 1 else: tags = [] if r.status_code in (301, 302): u = strip_url(r["Location"]) if u not in visited and u not in urls: urls[u] = referrer # referrer is original referrer, not redirected url referrers[u] = referrer elif r.status_code == 200: ctype = r["Content-Type"] if ";" in ctype: ctype = ctype[:ctype.index(";")] if ctype == "text/html": try: for u in extract_html_urls(r.content): if u not in visited and u not in urls: urls[u] = url referrers[u] = url check_html_valid(url, r, args) except: log("error extracting HTML urls from %s" % url) log("=============") log(traceback.format_exc()) log("=============") elif ctype == "application/json": try: for u in extract_tastypie_urls(r.content): if u not in visited and u not in urls: urls[u] = url referrers[u] = url except: log("error extracting urls from %s" % url) log("=============") log(traceback.format_exc()) log("=============") else: tags.append(u"FAIL (from %s)" % (referrer, )) errors += 1 if elapsed.total_seconds() > slow_threshold: tags.append("SLOW") acc_time = (timestamp - start_time).total_seconds() acc_secs = (timestamp - start_time).total_seconds() hrs = acc_secs // (60*60) min = (acc_secs % (60*60)) // 60 sec = acc_secs % 60 if (len(visited) % 100) == 1: log("\nElapsed Visited Queue Code Time Url ... Notes") log("%2d:%02d:%02d %7d %6d %s %6.3fs %s %s" % (hrs,min,sec, len(visited), len(urls), r.status_code, elapsed.total_seconds(), url, " ".join(tags))) if ((errors or warnings) and args.pedantic): sys.exit(1) if logfile: logfile.close() sys.stderr.write("Output written to %s\n\n" % logfile.name) if errors > 0: sys.stderr.write("Found %s errors, grep output for FAIL for details\n" % errors) sys.exit(1) else: sys.stderr.write("Found no errors.\n") if warnings > 0: sys.stderr.write("Found %s warnings, grep output for WARN for details\n" % warnings) else: sys.stderr.write("Found no warnings.\n")