# pylint: disable=line-too-long, wrong-import-order import headscale, helper, pytz, os, yaml from flask import Markup, render_template, Flask, logging from datetime import datetime from dateutil import parser from concurrent.futures import ALL_COMPLETED, wait from flask_executor import Executor app = Flask(__name__) LOG = logging.create_logger(app) executor = Executor(app) def render_overview(): url = headscale.get_url() api_key = headscale.get_api_key() timezone = pytz.timezone(os.environ["TZ"] if os.environ["TZ"] else "UTC") local_time = timezone.localize(datetime.now()) # Overview page will just read static information from the config file and display it # Open the config.yaml and parse it. config_file = "" try: config_file = open("/etc/headscale/config.yml", "r") except: config_file = open("/etc/headscale/config.yaml", "r") config_yaml = yaml.safe_load(config_file) # Get and display the following information: # Overview of the server's machines, users, preauth keys, API key expiration, server version # Get all machines: machines = headscale.get_machines(url, api_key) machines_count = len(machines["machines"]) # Get all routes: routes = headscale.get_routes(url,api_key) total_routes = len(routes["routes"]) enabled_routes = 0 for route in routes["routes"]: if route["enabled"] and route['advertised']: enabled_routes += 1 # Get a count of all enabled exit routes exits_count = 0 exits_enabled_count = 0 for route in routes["routes"]: if route['advertised']: if route["prefix"] == "0.0.0.0/0" or route["prefix"] == "::/0": exits_count +=1 if route["enabled"]: exits_enabled_count += 1 # Get User and PreAuth Key counts user_count = 0 usable_keys_count = 0 users = headscale.get_users(url, api_key) for user in users["users"]: user_count +=1 preauth_keys = headscale.get_preauth_keys(url, api_key, user["name"]) for key in preauth_keys["preAuthKeys"]: expiration_parse = parser.parse(key["expiration"]) key_expired = True if expiration_parse < local_time else False if key["reusable"] and not key_expired: usable_keys_count += 1 if not key["reusable"] and not key["used"] and not key_expired: usable_keys_count += 1 # General Content variables: ip_prefixes, server_url, disable_check_updates, ephemeral_node_inactivity_timeout, node_update_check_interval = "N/A" if "ip_prefixes" in config_yaml: ip_prefixes = str(config_yaml["ip_prefixes"]) if "server_url" in config_yaml: server_url = str(config_yaml["server_url"]) if "disable_check_updates" in config_yaml: disable_check_updates = str(config_yaml["disable_check_updates"]) if "ephemeral_node_inactivity_timeout" in config_yaml: ephemeral_node_inactivity_timeout = str(config_yaml["ephemeral_node_inactivity_timeout"]) if "node_update_check_interval" in config_yaml: node_update_check_interval = str(config_yaml["node_update_check_interval"]) # OIDC Content variables: issuer, client_id, scope, use_expiry_from_token, expiry = "N/A" if "oidc" in config_yaml: if "issuer" in config_yaml["oidc"] : issuer = str(config_yaml["oidc"]["issuer"]) if "client_id" in config_yaml["oidc"] : client_id = str(config_yaml["oidc"]["client_id"]) if "scope" in config_yaml["oidc"] : scope = str(config_yaml["oidc"]["scope"]) if "use_expiry_from_token" in config_yaml["oidc"] : use_expiry_from_token = str(config_yaml["oidc"]["use_expiry_from_token"]) if "expiry" in config_yaml["oidc"] : expiry = str(config_yaml["oidc"]["expiry"]) # Embedded DERP server information. if "derp" in config_yaml: if "server" in config_yaml["derp"] and config_yaml["derp"]["server"]["enabled"] == "True": if "enabled" in config_yaml["derp"]["server"]: enabled = str(config_yaml["derp"]["server"]["enabled"]) if "region_id" in config_yaml["derp"]["server"]: region_id = str(config_yaml["derp"]["server"]["region_id"]) if "region_code" in config_yaml["derp"]["server"]: region_code = str(config_yaml["derp"]["server"]["region_code"]) if "region_name" in config_yaml["derp"]["server"]: region_name = str(config_yaml["derp"]["server"]["region_name"]) if "stun_listen_addr" in config_yaml["derp"]["server"]: stun_listen_addr = str(config_yaml["derp"]["server"]["stun_listen_addr"]) if "dns_config" in config_yaml: if "nameservers" in config_yaml["dns_config"]: nameservers = str(config_yaml["dns_config"]["nameservers"]) if "magic_dns" in config_yaml["dns_config"]: magic_dns = str(config_yaml["dns_config"]["magic_dns"]) if "domains" in config_yaml["dns_config"]: domains = str(config_yaml["dns_config"]["domains"]) if "base_domain" in config_yaml["dns_config"]: base_domain = str(config_yaml["dns_config"]["base_domain"]) # Start putting the content together overview_content = """ """ general_content = """ """ oidc_content = """ """ derp_content = """ """ oidc_content = """ """ dns_content = """ """ # Remove content that isn't needed: # Remove OIDC if it isn't available: if "oidc" not in config_yaml: oidc_content = "" # Remove DERP if it isn't available or isn't enabled if "derp" not in config_yaml: oidc_content = "" if "derp" in config_yaml: if config_file["derp"]["enabled"] == "False": oidc_content = "" # TODO: # Whether there are custom DERP servers # If there are custom DERP servers, get the file location from the config file. Assume mapping is the same. # Whether the built-in DERP server is enabled # The IP prefixes # The DNS config if config_yaml["derp"]["paths"]: pass # # open the path: # derp_file = # config_file = open("/etc/headscale/config.yaml", "r") # config_yaml = yaml.safe_load(config_file) # The ACME config, if not empty # Whether updates are running # Whether metrics are enabled (and their listen addr) # The log level # What kind of Database is being used to drive headscale content = "
" + overview_content + general_content + derp_content + oidc_content + dns_content + "
" return Markup(content) def thread_machine_content(machine, machine_content, idx): # machine = passed in machine information # content = place to write the content url = headscale.get_url() api_key = headscale.get_api_key() # Set the current timezone and local time timezone = pytz.timezone(os.environ["TZ"] if os.environ["TZ"] else "UTC") local_time = timezone.localize(datetime.now()) # Get the machines routes pulled_routes = headscale.get_machine_routes(url, api_key, machine["id"]) routes = "" # Test if the machine is an exit node: exit_node = False # If the LENGTH of "routes" is NULL/0, there are no routes, enabled or disabled: if len(pulled_routes["routes"]) > 0: advertised_and_enabled = False advertised_route = False # First, check if there are any routes that are both enabled and advertised for route in pulled_routes["routes"]: if route ["advertised"] and route["enabled"]: advertised_and_enabled = True if route["advertised"]: advertised_route = True if advertised_and_enabled or advertised_route: routes = """
  • directions Routes

    """ for route in pulled_routes["routes"]: # LOG.warning("Route: ["+str(route['machine']['name'])+"] id: "+str(route['id'])+" / prefix: "+str(route['prefix'])+" enabled?: "+str(route['enabled'])) # Check if the route is enabled: route_enabled = "red" route_tooltip = 'enable' if route["enabled"]: route_enabled = "green" route_tooltip = 'disable' if route["prefix"] == "0.0.0.0/0" or route["prefix"] == "::/0" and str(route["enabled"]) == "True": exit_node = True routes = routes+"""

    """+route['prefix']+"""

    """ routes = routes+"

  • " # Get machine tags tag_array = "" for tag in machine["forcedTags"]: tag_array = tag_array+"{tag: '"+tag[4:]+"'}, " tags = """
  • label Tags

  • """ # Get the machine IP's machine_ips = "" # Format the dates for easy readability last_seen_parse = parser.parse(machine["lastSeen"]) last_seen_local = last_seen_parse.astimezone(timezone) last_seen_delta = local_time - last_seen_local last_seen_print = helper.pretty_print_duration(last_seen_delta) last_seen_time = str(last_seen_local.strftime('%A %m/%d/%Y, %H:%M:%S'))+" "+str(timezone)+" ("+str(last_seen_print)+")" last_update_parse = local_time if machine["lastSuccessfulUpdate"] is None else parser.parse(machine["lastSuccessfulUpdate"]) last_update_local = last_update_parse.astimezone(timezone) last_update_delta = local_time - last_update_local last_update_print = helper.pretty_print_duration(last_update_delta) last_update_time = str(last_update_local.strftime('%A %m/%d/%Y, %H:%M:%S'))+" "+str(timezone)+" ("+str(last_update_print)+")" created_parse = parser.parse(machine["createdAt"]) created_local = created_parse.astimezone(timezone) created_delta = local_time - created_local created_print = helper.pretty_print_duration(created_delta) created_time = str(created_local.strftime('%A %m/%d/%Y, %H:%M:%S'))+" "+str(timezone)+" ("+str(created_print)+")" expiry_parse = parser.parse(machine["expiry"]) expiry_local = expiry_parse.astimezone(timezone) expiry_delta = expiry_local - local_time expiry_print = helper.pretty_print_duration(expiry_delta, "expiry") if str(expiry_local.strftime('%Y')) in ("0001", "9999", "0000"): expiry_time = "No expiration date." elif int(expiry_local.strftime('%Y')) > int(expiry_local.strftime('%Y'))+2: expiry_time = str(expiry_local.strftime('%m/%Y'))+" "+str(timezone)+" ("+str(expiry_print)+")" else: expiry_time = str(expiry_local.strftime('%A %m/%d/%Y, %H:%M:%S'))+" "+str(timezone)+" ("+str(expiry_print)+")" LOG.error("Machine: "+machine["name"]+" expires: "+str(expiry_local.strftime('%Y'))+" / "+str(expiry_delta.days)) expiring_soon = True if int(expiry_delta.days) < 14 and int(expiry_delta.days) > 0 else False # Get the first 10 characters of the PreAuth Key: if machine["preAuthKey"]: preauth_key = str(machine["preAuthKey"]["key"])[0:10] else: preauth_key = "None" # Set the status badge color: text_color = helper.text_color_duration(last_seen_delta) # Set the user badge color: user_color = helper.get_color(int(machine["user"]["id"])) # Generate the various badges: status_badge = "fiber_manual_record" user_badge = ""+machine["user"]["name"]+"" exit_node_badge = "" if not exit_node else "Exit Node" expiration_badge = "" if not expiring_soon else "Expiring!" machine_content[idx] = (str(render_template( 'machines_card.html', given_name = machine["givenName"], machine_id = machine["id"], hostname = machine["name"], ns_name = machine["user"]["name"], ns_id = machine["user"]["id"], ns_created = machine["user"]["createdAt"], last_seen = str(last_seen_print), last_update = str(last_update_print), machine_ips = Markup(machine_ips), advertised_routes = Markup(routes), exit_node_badge = Markup(exit_node_badge), status_badge = Markup(status_badge), user_badge = Markup(user_badge), last_update_time = str(last_update_time), last_seen_time = str(last_seen_time), created_time = str(created_time), expiry_time = str(expiry_time), preauth_key = str(preauth_key), expiration_badge = Markup(expiration_badge), machine_tags = Markup(tags), ))) LOG.warning("Finished thread for machine "+machine["givenName"]+" index "+str(idx)) # Render the cards for the machines page: def render_machines_cards(): url = headscale.get_url() api_key = headscale.get_api_key() machines_list = headscale.get_machines(url, api_key) ######################################### # Thread this entire thing. num_threads = len(machines_list["machines"]) iterable = [] machine_content = {} for i in range (0, num_threads): LOG.error("Appending iterable: "+str(i)) iterable.append(i) # Flask-Executor Method: LOG.warning("Starting futures") futures = [executor.submit(thread_machine_content, machines_list["machines"][idx], machine_content, idx) for idx in iterable] # Wait for the executor to finish all jobs: wait(futures, return_when=ALL_COMPLETED) LOG.warning("Finished futures") # DEBUG: Do in a forloop: # for idx in iterable: thread_machine_content(machines_list["machines"][idx], machine_content, idx) # Sort the content by machine_id: sorted_machines = {key: val for key, val in sorted(machine_content.items(), key = lambda ele: ele[0])} content = "
    " # Print the content for index in range(0, num_threads): content = content+str(sorted_machines[index]) # content = content+str(sorted_machines[index]) content = content+"
    " return Markup(content) # Render the cards for the Users page: def render_users_cards(): url = headscale.get_url() api_key = headscale.get_api_key() user_list = headscale.get_users(url, api_key) content = "
    " for user in user_list["users"]: # Get all preAuth Keys in the user, only display if one exists: preauth_keys_collection = build_preauth_key_table(user["name"]) # Set the user badge color: user_color = helper.get_color(int(user["id"]), "text") # Generate the various badges: status_badge = "fiber_manual_record" content = content + render_template( 'users_card.html', status_badge = Markup(status_badge), user_name = user["name"], user_id = user["id"], preauth_keys_collection = Markup(preauth_keys_collection) ) content = content+"
    " return Markup(content) # Builds the preauth key table for the User page def build_preauth_key_table(user_name): url = headscale.get_url() api_key = headscale.get_api_key() preauth_keys = headscale.get_preauth_keys(url, api_key, user_name) preauth_keys_collection = """
  • Toggle Expired Add PreAuth Key vpn_key PreAuth Keys """ if len(preauth_keys["preAuthKeys"]) == 0: preauth_keys_collection += "

    No keys defined for this user

    " if len(preauth_keys["preAuthKeys"]) > 0: preauth_keys_collection += """ """ for key in preauth_keys["preAuthKeys"]: # Get the key expiration date and compare it to now to check if it's expired: # Set the current timezone and local time timezone = pytz.timezone(os.environ["TZ"] if os.environ["TZ"] else "UTC") local_time = timezone.localize(datetime.now()) expiration_parse = parser.parse(key["expiration"]) key_expired = True if expiration_parse < local_time else False expiration_time = str(expiration_parse.strftime('%A %m/%d/%Y, %H:%M:%S'))+" "+str(timezone) key_usable = False if key["reusable"] and not key_expired: key_usable = True if not key["reusable"] and not key["used"] and not key_expired: key_usable = True # Class for the javascript function to look for to toggle the hide function hide_expired = "expired-row" if not key_usable else "" btn_reusable = "fiber_manual_record" if key["reusable"] else "" btn_ephemeral = "fiber_manual_record" if key["ephemeral"] else "" btn_used = "fiber_manual_record" if key["used"] else "" btn_usable = "fiber_manual_record" if key_usable else "" # Other buttons: btn_delete = "Expire" if key_usable else "" tooltip_data = "Expiration: "+expiration_time # TR ID will look like "1-albert-tr" preauth_keys_collection = preauth_keys_collection+""" """ preauth_keys_collection = preauth_keys_collection+"""
    ID Key Prefix
    Reusable
    Used
    Ephemeral
    Usable
    Actions
    """+str(key["id"])+""" """+str(key["key"])[0:10]+"""
    """+btn_reusable+"""
    """+btn_used+"""
    """+btn_ephemeral+"""
    """+btn_usable+"""
    """+btn_delete+"""
  • """ return preauth_keys_collection def oidc_nav_dropdown(user_name, email_address, name): html_payload = """
  • """+name+""" account_circle
  • """ return Markup(html_payload) def oidc_nav_mobile(user_name, email_address, name): # https://materializecss.github.io/materialize/sidenav.html html_payload = """

  • exit_to_appLogout
  • """ return Markup(html_payload)