1

I'm trying to create a custom command for django that can be run with python manage.py cert_transparency and i'm almost there but i'm having a little trouble. The purpose of this one is to create a 24/7 running command in the background which I just run in a docker container.

I'm receiving this error message

certificate_update: 0cert [00:00, ?cert/s]Traceback (most recent call last):
  File "manage.py", line 15, in <module>
    execute_from_command_line(sys.argv)
  File "/usr/local/lib/python3.7/site-packages/django/core/management/__init__.py", line 401, in execute_from_command_line
    utility.execute()
  File "/usr/local/lib/python3.7/site-packages/django/core/management/__init__.py", line 395, in execute
    self.fetch_command(subcommand).run_from_argv(self.argv)
  File "/usr/local/lib/python3.7/site-packages/django/core/management/base.py", line 330, in run_from_argv
    self.execute(*args, **cmd_options)
  File "/usr/local/lib/python3.7/site-packages/django/core/management/base.py", line 371, in execute
    output = self.handle(*args, **options)
  File "/src/scraper/management/commands/cert_transparency.py", line 184, in handle
    certstream.listen_for_events(callback, url=certstream_url)
NameError: name 'callback' is not defined

Basically what i'm trying to do is just import this script as a custom management command in django.

Directory Structure

cert_transparency.py:

from django.core.management.base import BaseCommand, CommandError
import re
import math

import certstream
import tqdm
import yaml
import time
import os
from Levenshtein import distance
from termcolor import colored, cprint
from tld import get_tld
from .confusables import unconfuse


class Command(BaseCommand):
    help = 'Scrapes calidogs websocket for cert renewals and rates them.'
    
    def score_domain(self, domain):
        """Score `domain`.

    The highest score, the most probable `domain` is a phishing site.

    Args:
        domain (str): the domain to check.

    Returns:
        int: the score of `domain`.
        """
        score = 0
        for t in suspicious['tlds']:
            if domain.endswith(t):
                score += 20

    # Remove initial '*.' for wildcard certificates bug
        if domain.startswith('*.'):
            domain = domain[2:]

    # Removing TLD to catch inner TLD in subdomain (ie. paypal.com.domain.com)
        try:
            res = get_tld(domain, as_object=True, fail_silently=True, fix_protocol=True)
            domain = '.'.join([res.subdomain, res.domain])
        except Exception:
            pass

    # Higer entropy is kind of suspicious
    #score += int(round(entropy(domain)*10))

    # Remove lookalike characters using list from http://www.unicode.org/reports/tr39
        domain = unconfuse(domain)

        words_in_domain = re.split("\W+", domain)

    # ie. detect fake .com (ie. *.com-account-management.info)
        if words_in_domain[0] in ['com', 'net', 'org']:
            score += 10

    # Testing keywords
        for word in suspicious['keywords']:
            if word in domain:
                score += suspicious['keywords'][word]

    # Testing Levenshtein distance for strong keywords (>= 70 points) (ie. paypol)
        for key in [k for (k,s) in suspicious['keywords'].items() if s >= 70]:
        # Removing too generic keywords (ie. mail.domain.com)
            for word in [w for w in words_in_domain if w not in ['email', 'mail', 'cloud']]:
                if distance(str(word), str(key)) == 1:
                    score += 70

    # Lots of '-' (ie. www.paypal-datacenter.com-acccount-alert.com)
        if 'xn--' not in domain and domain.count('-') >= 4:
            score += domain.count('-') * 3

    # Deeply nested subdomains (ie. www.paypal.com.security.accountupdate.gq)
        if domain.count('.') >= 3:
            score += domain.count('.') * 3

        return score


    def callback(self, message, context):
        """Callback handler for certstream events."""
        if message['message_type'] == "heartbeat":
            return

        if message['message_type'] == "certificate_update":
            all_domains = message['data']['leaf_cert']['all_domains']

            for domain in all_domains:
                pbar.update(1)
                score = score_domain(self, domain.lower())

            # If issued from a free CA = more suspicious
                if "Let's Encrypt" in message['data']['chain'][0]['subject']['aggregated']:
                    score += 10

                if score >= 100:
                    self.stdout.write(tqdm.tqdm.write(
                    "[!] Suspicious: "
                    "{} (score={})".format(colored(domain, 'red', attrs=['underline', 'bold']), score)))
                elif score >= 90:
                    self.stdout.write(tqdm.tqdm.write(
                    "[!] Suspicious: "
                    "{} (score={})".format(colored(domain, 'red', attrs=['underline']), score)))
                elif score >= 80:
                    self.stdout.write(tqdm.tqdm.write(
                    "[!] Likely    : "
                    "{} (score={})".format(colored(domain, 'yellow', attrs=['underline']), score)))
                elif score >= 65:
                    self.stdout.write(tqdm.tqdm.write(
                    "[+] Potential : "
                    "{} (score={})".format(colored(domain, attrs=['underline']), score)))

                if score >= 75:
                    with open(log_suspicious, 'a') as f:
                        f.write("{}\n".format(domain))

    def callback(self, message, context):
        """Callback handler for certstream events."""
        if message['message_type'] == "heartbeat":
            return

        if message['message_type'] == "certificate_update":
            all_domains = message['data']['leaf_cert']['all_domains']

            for domain in all_domains:
                pbar.update(1)
                score = score_domain(domain.lower())

            # If issued from a free CA = more suspicious
                if "Let's Encrypt" in message['data']['chain'][0]['subject']['aggregated']:
                    score += 10

                if score >= 100:
                    tqdm.tqdm.write(
                    "[!] Suspicious: "
                    "{} (score={})".format(colored(domain, 'red', attrs=['underline', 'bold']), score))
                elif score >= 90:
                    tqdm.tqdm.write(
                    "[!] Suspicious: "
                    "{} (score={})".format(colored(domain, 'red', attrs=['underline']), score))
                elif score >= 80:
                    tqdm.tqdm.write(
                    "[!] Likely    : "
                    "{} (score={})".format(colored(domain, 'yellow', attrs=['underline']), score))
                elif score >= 65:
                    tqdm.tqdm.write(
                    "[+] Potential : "
                    "{} (score={})".format(colored(domain, attrs=['underline']), score))

                if score >= 75:
                    with open(log_suspicious, 'a') as f:
                        f.write("{}\n".format(domain))

                        
    def handle(self, *args, **options):
        certstream_url = 'wss://certstream.calidog.io'

        log_suspicious = os.path.dirname(os.path.realpath(__file__))+'/suspicious_domains_'+time.strftime("%Y-%m-%d")+'.log'

        suspicious_yaml = os.path.dirname(os.path.realpath(__file__))+'/suspicious.yaml'

        external_yaml = os.path.dirname(os.path.realpath(__file__))+'/external.yaml'

        pbar = tqdm.tqdm(desc='certificate_update', unit='cert')



        with open(suspicious_yaml, 'r') as f:
            suspicious = yaml.safe_load(f)

        with open(external_yaml, 'r') as f:
            external = yaml.safe_load(f)

        if external['override_suspicious.yaml'] is True:
            suspicious = external
        else:
            if external['keywords'] is not None:
                suspicious['keywords'].update(external['keywords'])

            if external['tlds'] is not None:
                suspicious['tlds'].update(external['tlds'])

        certstream.listen_for_events(callback, url=certstream_url)
2
  • It should be self.callback, so certstream.listen_for_events(callback, url=certstream_url). You however defined two callback methods? Commented Oct 11, 2020 at 0:04
  • Ooooops, I've copied the code two times. Thanks for pointing it out! I'll fix it and see if it your solution works. Commented Oct 11, 2020 at 0:12

1 Answer 1

1

You defined the .callback(…) method twice, so I think you should remove one of the two. You can pass a reference to the .callback(…) method with self.callback:

def handle(self, *args, **options):
    # …
    certstream.listen_for_events(self.callback, url=certstream_url)
Sign up to request clarification or add additional context in comments.

1 Comment

I forgot to reply back yesterday. Your solution fixed my issue. Thanks for it!

Your Answer

By clicking “Post Your Answer”, you agree to our terms of service and acknowledge you have read our privacy policy.

Start asking to get answers

Find the answer to your question by asking.

Ask question

Explore related questions

See similar questions with these tags.