#!/usr/bin/env python
"""Crawl the project through the Django test client and report bad responses.

Starting from a set of initial URLs, every same-site link found in HTML
pages (and every ``list_endpoint`` found in JSON API responses) is fetched
once.  One status line is logged per fetched URL; non-OK responses are
tagged FAIL, slow ones SLOW, and HTML that html5lib rejects is tagged WARN.
"""

import os, sys, re, datetime, argparse, traceback, tempfile, json

import html5lib

import debug                            # pyflakes:ignore

# Set up import path to find our own Django
basedir = os.path.abspath(os.path.join(os.path.dirname(__file__), "../"))
if basedir not in sys.path:
    sys.path.insert(0, basedir)

# Parse args now, so we can use custom settings when importing django
parser = argparse.ArgumentParser(
    description="""Perform a test crawl of the project. For each found URL, the HTTP response status is printed. If it's not OK/redirect, FAIL is printed - in case of errors, a stacktrace is also included.""")
parser.add_argument('urls', metavar='URL', nargs='*',
                    help='One or more URLs to start the crawl from')
parser.add_argument('--urls', '-u', dest='url_file',
                    help='file with URLs to start the crawl from')
parser.add_argument('--slow', dest='slow_threshold', type=float, default=1.0,
                    help='responses taking longer than this (in seconds) results in SLOW being printed')
parser.add_argument('--settings', dest='settings', help='custom settings file')
parser.add_argument('--logfile', dest='logfile', help='write to logfile')

args = parser.parse_args()

# Import Django, call setup()
os.environ.setdefault("DJANGO_SETTINGS_MODULE", args.settings or "ietf.settings")

import django
import django.test

django.setup()

# prevent memory from leaking when settings.DEBUG=True
from django.db import connection


class DontSaveQueries(object):
    """Stand-in for ``connection.queries`` that discards everything appended."""

    def append(self, x):
        pass


connection.queries = DontSaveQueries()

from ietf.name.models import DocTypeName

# --- Constants ---

MAX_URL_LENGTH = 500

# Matches an <a ...> or <link ...> tag with a quoted href.  Group 1 is the
# whole tag; the href value is in group 2 (double-quoted) or group 3
# (single-quoted), so the value always ends at its own quote character.
HREF_TAG_RE = re.compile(
    r'(<(?:a|link) [^>]*href=(?:"([^"]+)"|\'([^\']+)\')[^>]*>)')
NOFOLLOW_RE = re.compile(r'rel=["\']?nofollow["\']')

# --- Functions ---


def _as_text(content):
    """Return *content* as text, decoding it when it is a Python 3 ``bytes``.

    On Python 2 ``bytes`` is ``str``, so the value is returned unchanged.
    """
    if isinstance(content, bytes) and not isinstance(content, str):
        return content.decode("utf-8", "replace")
    return content


def strip_url(url):
    """Drop a leading ``http://testserver`` prefix from *url*, if present."""
    if url.startswith("http://testserver"):
        url = url[len("http://testserver"):]
    return url


def extract_html_urls(content):
    """Yield the site-local URLs referenced by <a>/<link> tags in *content*.

    Tags marked rel=nofollow are skipped, as are URLs longer than
    MAX_URL_LENGTH, URLs that do not start with "/", and protocol-relative
    URLs starting with "//".
    """
    for m in HREF_TAG_RE.finditer(_as_text(content)):
        if NOFOLLOW_RE.search(m.group(1)):
            continue

        url = strip_url(m.group(2) or m.group(3))
        if len(url) > MAX_URL_LENGTH:
            continue  # avoid infinite GET parameter appendages

        if not url.startswith("/"):
            continue

        if url.startswith("//"):
            continue

        yield url


def extract_tastypie_urls(content):
    """Yield the URIs to follow from a JSON API response body.

    Every dict-valued top-level entry that has a ``list_endpoint`` key
    contributes that endpoint.  Following ``meta.next`` and per-object
    ``resource_uri`` links is implemented but switched off by the two
    local flags below.
    """
    VISIT_OBJECTS = False
    VISIT_NEXT = False
    data = json.loads(_as_text(content))
    for value in data.values():
        if isinstance(value, dict) and "list_endpoint" in value:
            yield value["list_endpoint"]
    if VISIT_NEXT:
        if "meta" in data and "next" in data["meta"]:
            uri = data["meta"]["next"]
            if uri is not None:
                yield uri
    if VISIT_OBJECTS:
        if "objects" in data:
            for obj in data["objects"]:
                if "resource_uri" in obj:
                    yield obj["resource_uri"]


def check_html_valid(url, response):
    """Run the strict HTML parser over *response*, once per kind of URL.

    The URL is first reduced to a key in which ids, e-mail addresses,
    fragments, query strings, RFC numbers, group acronyms and document names
    are replaced by placeholders, so only one page per URL pattern is
    parsed.  On a parse failure one WARN entry per parser error is appended
    to the module-level ``tags`` list of the URL currently being crawled
    (set by the main loop) and the ``warnings`` counter is incremented.
    """
    global warnings
    # derive a key for urls like this by replacing primary keys
    key = url
    key = re.sub(r"/[0-9.]+/", "/nnnn/", key)
    key = re.sub(r"/.+@.+/", "/x@x.org/", key)
    key = re.sub(r"#.*$", "", key)
    key = re.sub(r"\?.*$", "", key)
    key = re.sub(r"/rfc[0-9]+/", "/rfcnnnn/", key)
    key = re.sub(r"/wg/[a-z0-9-]+/", "/wg/foo/", key)
    key = re.sub(r"/rg/[a-z0-9-]+/", "/rg/foo/", key)
    for slug in doc_types:
        key = re.sub("/%s-.*/" % slug, "/%s-nnnn/" % slug, key)

    if key not in validated_urls:
        if hasattr(response, "content"):
            content = response.content
        else:
            content = response.streaming_content
        try:
            # Mark the key before parsing so a failing pattern is not retried.
            validated_urls[key] = True
            parser.parse(content)
        except Exception:
            for err in parser.errors:
                pos, code, data = err
                tags.append(u"WARN invalid html: Position %s: %s" % (pos, code))
            warnings += 1


def log(s):
    """Print *s* and, when a logfile is open, append it there as a line."""
    print(s)
    if logfile:
        logfile.write(s)
        logfile.write('\n')


def get_referrers(url):
    """Return the chain of referrers leading to *url*, nearest first.

    The walk stops when a URL has no recorded referrer or when a cycle is
    detected (which is logged).
    """
    ref_list = []
    while url in referrers:
        url = referrers[url]
        if url in ref_list:
            log("Circular referral list, discovered at %s" % url)
            break
        ref_list.append(url)
    return ref_list


# --- Globals ---

slow_threshold = args.slow_threshold

visited = set()
urls = {}  # url -> referrer
referrers = {}

initial_urls = []
initial_urls.extend(args.urls)

if args.url_file:
    with open(args.url_file) as f:
        for line in f:
            # Everything after a "#" on a line is treated as a comment.
            line = line.partition("#")[0].strip()
            if line:
                initial_urls.append(line)

if not initial_urls:
    initial_urls.append("/")
    initial_urls.append("/api/v1")

for url in initial_urls:
    urls[url] = "[initial]"

# Rebinds the name used for the argument parser above; from here on
# ``parser`` is the HTML parser used by check_html_valid().
parser = html5lib.HTMLParser(strict=True)

validated_urls = {}

doc_types = [t.slug for t in DocTypeName.objects.all()]

errors = 0
warnings = 0
count = 0

start_time = datetime.datetime.now()

client = django.test.Client(Accept='text/html,text/plain,application/json')

logfile = None
if args.logfile:
    logfile = open(args.logfile, "w")

# --- Main ---

if __name__ == "__main__":

    # NOTE: this loop deliberately runs at module level; check_html_valid()
    # reads the per-URL ``tags`` list as a module global.
    while urls:
        url, referrer = urls.popitem()

        visited.add(url)

        try:
            timestamp = datetime.datetime.now()
            r = client.get(url)
            elapsed = datetime.datetime.now() - timestamp
        except KeyboardInterrupt:
            log(" ... was fetching %s" % url)
            sys.exit(1)
        except Exception:
            log("500 %.3fs %s FAIL (from: [ %s ])" % ((datetime.datetime.now() - timestamp).total_seconds(), url, (",\n\t".join(get_referrers(url)))))
            log("=============")
            log(traceback.format_exc())
            log("=============")
            errors += 1
        else:
            tags = []

            if r.status_code in (301, 302):
                u = strip_url(r["Location"])
                if u not in visited and u not in urls:
                    urls[u] = referrer  # referrer is original referrer, not redirected url
                    referrers[u] = referrer

            elif r.status_code == 200:
                ctype = r["Content-Type"]
                if ";" in ctype:
                    ctype = ctype[:ctype.index(";")]

                if ctype == "text/html":
                    try:
                        for u in extract_html_urls(r.content):
                            if u not in visited and u not in urls:
                                urls[u] = url
                                referrers[u] = url

                        check_html_valid(url, r)

                    except Exception:
                        log("error extracting HTML urls from %s" % url)
                        log("=============")
                        log(traceback.format_exc())
                        log("=============")

                elif ctype == "application/json":
                    try:
                        for u in extract_tastypie_urls(r.content):
                            if u not in visited and u not in urls:
                                urls[u] = url
                                referrers[u] = url
                    except Exception:
                        log("error extracting urls from %s" % url)
                        log("=============")
                        log(traceback.format_exc())
                        log("=============")

            else:
                tags.append(u"FAIL for %s\n (from %s)" % (url, referrer))
                errors += 1

            if elapsed.total_seconds() > slow_threshold:
                tags.append("SLOW")

            # Wall-clock time since the crawl started, split into h:m:s.
            acc_secs = (timestamp - start_time).total_seconds()
            hrs = acc_secs // (60*60)
            mins = (acc_secs % (60*60)) // 60
            sec = acc_secs % 60

            # Repeat the column header every 100 visited URLs.
            if (len(visited) % 100) == 1:
                log("\nElapsed Visited Queue Code Time Url ... Notes")
            log("%2d:%02d:%02d %7d %6d %s %6.3fs %s %s" % (hrs, mins, sec, len(visited), len(urls), r.status_code, elapsed.total_seconds(), url, " ".join(tags)))

    if logfile:
        logfile.close()
        sys.stderr.write("Output written to %s\n\n" % logfile.name)

    if errors > 0:
        sys.stderr.write("Found %s errors, grep output for FAIL for details\n" % errors)
        sys.exit(1)
    else:
        sys.stderr.write("Found no errors.\n")
    if warnings > 0:
        sys.stderr.write("Found %s warnings, grep output for WARN for details\n" % warnings)
    else:
        sys.stderr.write("Found no warnings.\n")