I'm trying to create a custom command for django that can be run with python manage.py cert_transparency and i'm almost there but i'm having a little trouble. The purpose of this one is to create a 24/7 running command in the background which I just run in a docker container.
I'm receiving this error message
certificate_update: 0cert [00:00, ?cert/s]Traceback (most recent call last):
File "manage.py", line 15, in <module>
execute_from_command_line(sys.argv)
File "/usr/local/lib/python3.7/site-packages/django/core/management/__init__.py", line 401, in execute_from_command_line
utility.execute()
File "/usr/local/lib/python3.7/site-packages/django/core/management/__init__.py", line 395, in execute
self.fetch_command(subcommand).run_from_argv(self.argv)
File "/usr/local/lib/python3.7/site-packages/django/core/management/base.py", line 330, in run_from_argv
self.execute(*args, **cmd_options)
File "/usr/local/lib/python3.7/site-packages/django/core/management/base.py", line 371, in execute
output = self.handle(*args, **options)
File "/src/scraper/management/commands/cert_transparency.py", line 184, in handle
certstream.listen_for_events(callback, url=certstream_url)
NameError: name 'callback' is not defined
Basically what i'm trying to do is just import this script as a custom management command in django.
cert_transparency.py:
from django.core.management.base import BaseCommand, CommandError
import re
import math
import certstream
import tqdm
import yaml
import time
import os
from Levenshtein import distance
from termcolor import colored, cprint
from tld import get_tld
from .confusables import unconfuse
class Command(BaseCommand):
help = 'Scrapes calidogs websocket for cert renewals and rates them.'
def score_domain(self, domain):
"""Score `domain`.
The highest score, the most probable `domain` is a phishing site.
Args:
domain (str): the domain to check.
Returns:
int: the score of `domain`.
"""
score = 0
for t in suspicious['tlds']:
if domain.endswith(t):
score += 20
# Remove initial '*.' for wildcard certificates bug
if domain.startswith('*.'):
domain = domain[2:]
# Removing TLD to catch inner TLD in subdomain (ie. paypal.com.domain.com)
try:
res = get_tld(domain, as_object=True, fail_silently=True, fix_protocol=True)
domain = '.'.join([res.subdomain, res.domain])
except Exception:
pass
# Higer entropy is kind of suspicious
#score += int(round(entropy(domain)*10))
# Remove lookalike characters using list from http://www.unicode.org/reports/tr39
domain = unconfuse(domain)
words_in_domain = re.split("\W+", domain)
# ie. detect fake .com (ie. *.com-account-management.info)
if words_in_domain[0] in ['com', 'net', 'org']:
score += 10
# Testing keywords
for word in suspicious['keywords']:
if word in domain:
score += suspicious['keywords'][word]
# Testing Levenshtein distance for strong keywords (>= 70 points) (ie. paypol)
for key in [k for (k,s) in suspicious['keywords'].items() if s >= 70]:
# Removing too generic keywords (ie. mail.domain.com)
for word in [w for w in words_in_domain if w not in ['email', 'mail', 'cloud']]:
if distance(str(word), str(key)) == 1:
score += 70
# Lots of '-' (ie. www.paypal-datacenter.com-acccount-alert.com)
if 'xn--' not in domain and domain.count('-') >= 4:
score += domain.count('-') * 3
# Deeply nested subdomains (ie. www.paypal.com.security.accountupdate.gq)
if domain.count('.') >= 3:
score += domain.count('.') * 3
return score
def callback(self, message, context):
"""Callback handler for certstream events."""
if message['message_type'] == "heartbeat":
return
if message['message_type'] == "certificate_update":
all_domains = message['data']['leaf_cert']['all_domains']
for domain in all_domains:
pbar.update(1)
score = score_domain(self, domain.lower())
# If issued from a free CA = more suspicious
if "Let's Encrypt" in message['data']['chain'][0]['subject']['aggregated']:
score += 10
if score >= 100:
self.stdout.write(tqdm.tqdm.write(
"[!] Suspicious: "
"{} (score={})".format(colored(domain, 'red', attrs=['underline', 'bold']), score)))
elif score >= 90:
self.stdout.write(tqdm.tqdm.write(
"[!] Suspicious: "
"{} (score={})".format(colored(domain, 'red', attrs=['underline']), score)))
elif score >= 80:
self.stdout.write(tqdm.tqdm.write(
"[!] Likely : "
"{} (score={})".format(colored(domain, 'yellow', attrs=['underline']), score)))
elif score >= 65:
self.stdout.write(tqdm.tqdm.write(
"[+] Potential : "
"{} (score={})".format(colored(domain, attrs=['underline']), score)))
if score >= 75:
with open(log_suspicious, 'a') as f:
f.write("{}\n".format(domain))
def callback(self, message, context):
"""Callback handler for certstream events."""
if message['message_type'] == "heartbeat":
return
if message['message_type'] == "certificate_update":
all_domains = message['data']['leaf_cert']['all_domains']
for domain in all_domains:
pbar.update(1)
score = score_domain(domain.lower())
# If issued from a free CA = more suspicious
if "Let's Encrypt" in message['data']['chain'][0]['subject']['aggregated']:
score += 10
if score >= 100:
tqdm.tqdm.write(
"[!] Suspicious: "
"{} (score={})".format(colored(domain, 'red', attrs=['underline', 'bold']), score))
elif score >= 90:
tqdm.tqdm.write(
"[!] Suspicious: "
"{} (score={})".format(colored(domain, 'red', attrs=['underline']), score))
elif score >= 80:
tqdm.tqdm.write(
"[!] Likely : "
"{} (score={})".format(colored(domain, 'yellow', attrs=['underline']), score))
elif score >= 65:
tqdm.tqdm.write(
"[+] Potential : "
"{} (score={})".format(colored(domain, attrs=['underline']), score))
if score >= 75:
with open(log_suspicious, 'a') as f:
f.write("{}\n".format(domain))
def handle(self, *args, **options):
certstream_url = 'wss://certstream.calidog.io'
log_suspicious = os.path.dirname(os.path.realpath(__file__))+'/suspicious_domains_'+time.strftime("%Y-%m-%d")+'.log'
suspicious_yaml = os.path.dirname(os.path.realpath(__file__))+'/suspicious.yaml'
external_yaml = os.path.dirname(os.path.realpath(__file__))+'/external.yaml'
pbar = tqdm.tqdm(desc='certificate_update', unit='cert')
with open(suspicious_yaml, 'r') as f:
suspicious = yaml.safe_load(f)
with open(external_yaml, 'r') as f:
external = yaml.safe_load(f)
if external['override_suspicious.yaml'] is True:
suspicious = external
else:
if external['keywords'] is not None:
suspicious['keywords'].update(external['keywords'])
if external['tlds'] is not None:
suspicious['tlds'].update(external['tlds'])
certstream.listen_for_events(callback, url=certstream_url)

self.callback, socertstream.listen_for_events(callback, url=certstream_url). You however defined twocallbackmethods?