import headscale, helper, pytz, os, yaml from flask import Markup, render_template, Flask from datetime import datetime from dateutil import parser from concurrent.futures import wait, ALL_COMPLETED from flask_executor import Executor app = Flask(__name__) executor = Executor(app) def render_overview(): url = headscale.get_url() api_key = headscale.get_api_key() timezone = pytz.timezone(os.environ["TZ"] if os.environ["TZ"] else "UTC") local_time = timezone.localize(datetime.now()) # Overview page will just read static information from the config file and display it # Open the config.yaml and parse it. config_file = "" try: config_file = open("/etc/headscale/config.yml", "r") except: config_file = open("/etc/headscale/config.yaml", "r") config_yaml = yaml.safe_load(config_file) # Get and display the following information: # Overview of the server's machines, users, preauth keys, API key expiration, server version # Get all machines: machines_count = 0 machines = headscale.get_machines(url, api_key) for machine in machines["machines"]: machines_count += 1 # Get all routes: routes = headscale.get_routes(url,api_key) total_routes = len(routes["routes"]) enabled_routes = 0 for route in routes["routes"]: if route["enabled"] and route['advertised']: enabled_routes += 1 # Get a count of all enabled exit routes exits_count = 0 exits_enabled_count = 0 for route in routes["routes"]: if route['advertised']: if route["prefix"] == "0.0.0.0/0" or route["prefix"] == "::/0": exits_count +=1 if route["enabled"]: exits_enabled_count += 1 # Get User and PreAuth Key counts user_count = 0 usable_keys_count = 0 users = headscale.get_users(url, api_key) for user in users["users"]: user_count +=1 preauth_keys = headscale.get_preauth_keys(url, api_key, user["name"]) for key in preauth_keys["preAuthKeys"]: expiration_parse = parser.parse(key["expiration"]) key_expired = True if expiration_parse < local_time else False if key["reusable"] and not key_expired: usable_keys_count += 1 if not key["reusable"] and not key["used"] and not key_expired: usable_keys_count += 1 overview_content = """
Stats

Machines """+ str(machines_count) +"""
Users """+ str(user_count) +"""
Usable PreAuth Keys """+ str(usable_keys_count) +"""
Enabled/Total Routes """+ str(enabled_routes) +"""/"""+str(total_routes) +"""
Enabled/Total Exits """+ str(exits_enabled_count) +"""/"""+str(exits_count) +"""

""" # Overview of general configs from the YAML general_content = """
General

IP Prefixes """; if "ip_prefixes" in config_yaml: general_content += str(config_yaml["ip_prefixes"]) else: general_content += "N/A" general_content +="""
Server URL """; if "server_url" in config_yaml: general_content += str(config_yaml["server_url"]) else: general_content += "N/A" general_content +="""
Updates Disabled? """; if "disable_check_updates" in config_yaml: general_content += str(config_yaml["disable_check_updates"]) else: general_content += "N/A" general_content +="""
Ephemeral Node Timeout """; if "ephemeral_node_inactivity_timeout" in config_yaml: general_content += str(config_yaml["ephemeral_node_inactivity_timeout"]); else: general_content += "N/A" general_content +="""
Node Update Check Interval """; if "node_update_check_interval" in config_yaml: general_content += str(config_yaml["node_update_check_interval"]) else: general_content += "N/A" general_content +="""

""" # Whether OIDC is configured oidc_content = "" if "oidc" in config_yaml: oidc_content = """
OIDC

Issuer """ if "issuer" in config_yaml["oidc"] : oidc_content += str(config_yaml["oidc"]["issuer"]) else: oidc_content += "N/A" oidc_content += """
Client ID """ if "client_id" in config_yaml["oidc"] : oidc_content += str(config_yaml["oidc"]["client_id"]) else: oidc_content += "N/A" oidc_content += """
Scope """ if "scope" in config_yaml["oidc"] : oidc_content += str(config_yaml["oidc"]["scope"]) else: oidc_content += "N/A" oidc_content += """
Token Expiry """ if "use_expiry_from_token" in config_yaml["oidc"] : oidc_content += str(config_yaml["oidc"]["use_expiry_from_token"]) else: oidc_content += "N/A" oidc_content += """
Expiry """ if "expiry" in config_yaml["oidc"] : oidc_content += str(config_yaml["oidc"]["expiry"]) else: oidc_content += "N/A" oidc_content += """

""" derp_content = "" if "derp" in config_yaml: if "server" in config_yaml["derp"]: derp_content = """
Built-in DERP

Enabled """ if "enabled" in config_yaml["derp"]["server"] : derp_content+= str(config_yaml["derp"]["server"]["enabled"]) else: derp_content+= "N/A" derp_content+= """
Region ID """ if "region_id" in config_yaml["derp"]["server"] : derp_content+= str(config_yaml["derp"]["server"]["region_id"]) else: derp_content+= "N/A" derp_content+= """
Region Code """ if "region_code" in config_yaml["derp"]["server"] : derp_content+= str(config_yaml["derp"]["server"]["region_code"]) else: derp_content+= "N/A" derp_content+= """
Region Name """ if "region_name" in config_yaml["derp"]["server"] : derp_content+= str(config_yaml["derp"]["server"]["region_name"]) else: derp_content+= "N/A" derp_content+= """
STUN Address """ if "stun_listen_addr" in config_yaml["derp"]["server"]: derp_content+= str(config_yaml["derp"]["server"]["stun_listen_addr"]) else: derp_content+= "N/A" derp_content+= """

""" # TODO: # Whether there are custom DERP servers # If there are custom DERP servers, get the file location from the config file. Assume mapping is the same. # Whether the built-in DERP server is enabled # The IP prefixes # The DNS config if "dns_config" in config_yaml: dns_content = """
DNS

Nameservers """ if "nameservers" in config_yaml["dns_config"]: dns_content += str(config_yaml["dns_config"]["nameservers"]) else: dns_content += "N/A" dns_content += """
MagicDNS """ if "magic_dns" in config_yaml["dns_config"]: dns_content += str(config_yaml["dns_config"]["magic_dns"]) else: dns_content += "N/A" dns_content += """
Domains """ if "domains" in config_yaml["dns_config"]: dns_content += str(config_yaml["dns_config"]["domains"]) else: dns_content += "N/A" dns_content += """
Base Domain """ if "base_domain" in config_yaml["dns_config"]: dns_content += str(config_yaml["dns_config"]["base_domain"]) else: dns_content += "N/A" dns_content += """

""" if config_yaml["derp"]["paths"]: pass # # open the path: # derp_file = # config_file = open("/etc/headscale/config.yaml", "r") # config_yaml = yaml.safe_load(config_file) # The ACME config, if not empty # Whether updates are running # Whether metrics are enabled (and their listen addr) # The log level # What kind of Database is being used to drive headscale content = "
" + overview_content + general_content + derp_content + oidc_content + dns_content + "
" return Markup(content) def thread_machine_content(machine, machine_content, idx): # machine = passed in machine information # content = place to write the content url = headscale.get_url() api_key = headscale.get_api_key() # Set the current timezone and local time timezone = pytz.timezone(os.environ["TZ"] if os.environ["TZ"] else "UTC") local_time = timezone.localize(datetime.now()) # Get the machines routes pulled_routes = headscale.get_machine_routes(url, api_key, machine["id"]) routes = "" # Test if the machine is an exit node: exit_node = False # If the LENGTH of "routes" is NULL/0, there are no routes, enabled or disabled: if len(pulled_routes["routes"]) > 0: advertised_and_enabled = False advertised_route = False # First, check if there are any routes that are both enabled and advertised for route in pulled_routes["routes"]: if route ["advertised"] and route["enabled"]: advertised_and_enabled = True if route["advertised"]: advertised_route = True if advertised_and_enabled or advertised_route: routes = """
  • directions Routes

    """ for route in pulled_routes["routes"]: # app.logger.warning("Route: ["+str(route['machine']['name'])+"] id: "+str(route['id'])+" / prefix: "+str(route['prefix'])+" enabled?: "+str(route['enabled'])) # Check if the route is enabled: route_enabled = "red" route_tooltip = 'enable' if route["enabled"]: route_enabled = "green" route_tooltip = 'disable' if route["prefix"] == "0.0.0.0/0" or route["prefix"] == "::/0" and str(route["enabled"]) == "True": exit_node = True routes = routes+"""

    """+route['prefix']+"""

    """ routes = routes+"

  • " # Get machine tags tag_array = "" for tag in machine["forcedTags"]: tag_array = tag_array+"{tag: '"+tag[4:]+"'}, " tags = """
  • label Tags

  • """ # Get the machine IP's machine_ips = "" # Format the dates for easy readability last_seen_parse = parser.parse(machine["lastSeen"]) last_seen_local = last_seen_parse.astimezone(timezone) last_seen_delta = local_time - last_seen_local last_seen_print = helper.pretty_print_duration(last_seen_delta) last_seen_time = str(last_seen_local.strftime('%A %m/%d/%Y, %H:%M:%S'))+" "+str(timezone)+" ("+str(last_seen_print)+")" last_update_parse = local_time if machine["lastSuccessfulUpdate"] is None else parser.parse(machine["lastSuccessfulUpdate"]) last_update_local = last_update_parse.astimezone(timezone) last_update_delta = local_time - last_update_local last_update_print = helper.pretty_print_duration(last_update_delta) last_update_time = str(last_update_local.strftime('%A %m/%d/%Y, %H:%M:%S'))+" "+str(timezone)+" ("+str(last_update_print)+")" created_parse = parser.parse(machine["createdAt"]) created_local = created_parse.astimezone(timezone) created_delta = local_time - created_local created_print = helper.pretty_print_duration(created_delta) created_time = str(created_local.strftime('%A %m/%d/%Y, %H:%M:%S'))+" "+str(timezone)+" ("+str(created_print)+")" # Get the first 10 characters of the PreAuth Key: if machine["preAuthKey"]: preauth_key = str(machine["preAuthKey"]["key"])[0:10] else: preauth_key = "None" # Set the status badge color: text_color = helper.text_color_duration(last_seen_delta) # Set the user badge color: user_color = helper.get_color(int(machine["user"]["id"])) # Generate the various badges: status_badge = "fiber_manual_record" user_badge = ""+machine["user"]["name"]+"" exit_node_badge = "" if not exit_node else "Exit Node" machine_content[idx] = (str(render_template( 'machines_card.html', given_name = machine["givenName"], machine_id = machine["id"], hostname = machine["name"], ns_name = machine["user"]["name"], ns_id = machine["user"]["id"], ns_created = machine["user"]["createdAt"], last_seen = str(last_seen_print), last_update = str(last_update_print), machine_ips = Markup(machine_ips), advertised_routes = Markup(routes), exit_node_badge = Markup(exit_node_badge), status_badge = Markup(status_badge), user_badge = Markup(user_badge), last_update_time = str(last_update_time), last_seen_time = str(last_seen_time), created_time = str(created_time), preauth_key = str(preauth_key), machine_tags = Markup(tags), ))) app.logger.warning("Finished thread for machine "+machine["givenName"]+" index "+str(idx)) # Render the cards for the machines page: def render_machines_cards(): url = headscale.get_url() api_key = headscale.get_api_key() machines_list = headscale.get_machines(url, api_key) ######################################### # Thread this entire thing. numThreads = len(machines_list["machines"]) iterable = [] machine_content = {} for i in range (0, numThreads): app.logger.debug("Appending iterable: "+str(i)) iterable.append(i) # Flask-Executor Method: app.logger.warning("Starting futures") futures = [executor.submit(thread_machine_content, machines_list["machines"][idx], machine_content, idx) for idx in iterable] # Wait for the executor to finish all jobs: wait(futures, return_when=ALL_COMPLETED) app.logger.warning("Finished futures") # DEBUG: Do in a forloop: # for idx in iterable: thread_machine_content(machines_list["machines"][idx], machine_content, idx) # Sort the content by machine_id: sorted_machines = {key: val for key, val in sorted(machine_content.items(), key = lambda ele: ele[0])} content = "
    " # Print the content for index in range(0, numThreads): content = content+str(sorted_machines[index]) # content = content+str(sorted_machines[index]) content = content+"
    " return Markup(content) # Render the cards for the Users page: def render_users_cards(): url = headscale.get_url() api_key = headscale.get_api_key() user_list = headscale.get_users(url, api_key) content = "
    " for user in user_list["users"]: # Get all preAuth Keys in the user, only display if one exists: preauth_keys_collection = build_preauth_key_table(user["name"]) # Set the user badge color: user_color = helper.get_color(int(user["id"]), "text") # Generate the various badges: status_badge = "fiber_manual_record" content = content + render_template( 'users_card.html', status_badge = Markup(status_badge), user_name = user["name"], user_id = user["id"], preauth_keys_collection = Markup(preauth_keys_collection) ) content = content+"
    " return Markup(content) # Builds the preauth key table for the User page def build_preauth_key_table(user_name): url = headscale.get_url() api_key = headscale.get_api_key() preauth_keys = headscale.get_preauth_keys(url, api_key, user_name) preauth_keys_collection = """
  • Toggle Expired Add PreAuth Key vpn_key PreAuth Keys """ if len(preauth_keys["preAuthKeys"]) == 0: preauth_keys_collection += "

    No keys defined for this user

    " if len(preauth_keys["preAuthKeys"]) > 0: preauth_keys_collection += """ """ for key in preauth_keys["preAuthKeys"]: # Get the key expiration date and compare it to now to check if it's expired: # Set the current timezone and local time timezone = pytz.timezone(os.environ["TZ"] if os.environ["TZ"] else "UTC") local_time = timezone.localize(datetime.now()) expiration_parse = parser.parse(key["expiration"]) key_expired = True if expiration_parse < local_time else False expiration_time = str(expiration_parse.strftime('%A %m/%d/%Y, %H:%M:%S'))+" "+str(timezone) key_usable = False if key["reusable"] and not key_expired: key_usable = True if not key["reusable"] and not key["used"] and not key_expired: key_usable = True # Class for the javascript function to look for to toggle the hide function hide_expired = "expired-row" if not key_usable else "" btn_reusable = "fiber_manual_record" if key["reusable"] else "" btn_ephemeral = "fiber_manual_record" if key["ephemeral"] else "" btn_used = "fiber_manual_record" if key["used"] else "" btn_usable = "fiber_manual_record" if key_usable else "" # Other buttons: btn_delete = "Expire" if key_usable else "" tooltip_data = "Expiration: "+expiration_time # TR ID will look like "1-albert-tr" preauth_keys_collection = preauth_keys_collection+""" """ preauth_keys_collection = preauth_keys_collection+"""
    ID Key Prefix
    Reusable
    Used
    Ephemeral
    Usable
    Actions
    """+str(key["id"])+""" """+str(key["key"])[0:10]+"""
    """+btn_reusable+"""
    """+btn_used+"""
    """+btn_ephemeral+"""
    """+btn_usable+"""
    """+btn_delete+"""
  • """ return preauth_keys_collection def render_oidc_nav_dropdown(): htmlPayload = """
  • account_circle
  • """ return Markup(htmlPayload)