# pylint: disable=line-too-long, wrong-import-order
import headscale, helper, pytz, os, yaml, logging
from flask import Flask, Markup, render_template
from datetime import datetime
from dateutil import parser
from concurrent.futures import ALL_COMPLETED, wait
from flask_executor import Executor
LOG_LEVEL = os.environ["LOG_LEVEL"].replace('"', '').upper()
# Initiate the Flask application and logging:
app = Flask(__name__, static_url_path="/static")
match LOG_LEVEL:
case "DEBUG" : app.logger.setLevel(logging.DEBUG)
case "INFO" : app.logger.setLevel(logging.INFO)
case "WARNING" : app.logger.setLevel(logging.WARNING)
case "ERROR" : app.logger.setLevel(logging.ERROR)
case "CRITICAL": app.logger.setLevel(logging.CRITICAL)
executor = Executor(app)
def render_overview():
app.logger.info("Rendering the Overview page")
url = headscale.get_url()
api_key = headscale.get_api_key()
timezone = pytz.timezone(os.environ["TZ"] if os.environ["TZ"] else "UTC")
local_time = timezone.localize(datetime.now())
# Overview page will just read static information from the config file and display it
# Open the config.yaml and parse it.
config_file = ""
try:
config_file = open("/etc/headscale/config.yml", "r")
app.logger.info("Opening /etc/headscale/config.yml")
except:
config_file = open("/etc/headscale/config.yaml", "r")
app.logger.info("Opening /etc/headscale/config.yaml")
config_yaml = yaml.safe_load(config_file)
# Get and display the following information:
# Overview of the server's machines, users, preauth keys, API key expiration, server version
# Get all machines:
machines = headscale.get_machines(url, api_key)
machines_count = len(machines["machines"])
# Need to check if routes are attached to an active machine:
# ISSUE: https://github.com/iFargle/headscale-webui/issues/36
# ISSUE: https://github.com/juanfont/headscale/issues/1228
# Get all routes:
routes = headscale.get_routes(url,api_key)
total_routes = 0
for route in routes["routes"]:
if int(route['machine']['id']) != 0:
total_routes += 1
enabled_routes = 0
for route in routes["routes"]:
if route["enabled"] and route['advertised'] and int(route['machine']['id']) != 0:
enabled_routes += 1
# Get a count of all enabled exit routes
exits_count = 0
exits_enabled_count = 0
for route in routes["routes"]:
if route['advertised'] and int(route['machine']['id']) != 0:
if route["prefix"] == "0.0.0.0/0" or route["prefix"] == "::/0":
exits_count +=1
if route["enabled"]:
exits_enabled_count += 1
# Get User and PreAuth Key counts
user_count = 0
usable_keys_count = 0
users = headscale.get_users(url, api_key)
for user in users["users"]:
user_count +=1
preauth_keys = headscale.get_preauth_keys(url, api_key, user["name"])
for key in preauth_keys["preAuthKeys"]:
expiration_parse = parser.parse(key["expiration"])
key_expired = True if expiration_parse < local_time else False
if key["reusable"] and not key_expired: usable_keys_count += 1
if not key["reusable"] and not key["used"] and not key_expired: usable_keys_count += 1
# General Content variables:
ip_prefixes, server_url, disable_check_updates, ephemeral_node_inactivity_timeout, node_update_check_interval = "N/A", "N/A", "N/A", "N/A", "N/A"
if "ip_prefixes" in config_yaml: ip_prefixes = str(config_yaml["ip_prefixes"])
if "server_url" in config_yaml: server_url = str(config_yaml["server_url"])
if "disable_check_updates" in config_yaml: disable_check_updates = str(config_yaml["disable_check_updates"])
if "ephemeral_node_inactivity_timeout" in config_yaml: ephemeral_node_inactivity_timeout = str(config_yaml["ephemeral_node_inactivity_timeout"])
if "node_update_check_interval" in config_yaml: node_update_check_interval = str(config_yaml["node_update_check_interval"])
# OIDC Content variables:
issuer, client_id, scope, use_expiry_from_token, expiry = "N/A", "N/A", "N/A", "N/A", "N/A"
if "oidc" in config_yaml:
if "issuer" in config_yaml["oidc"] : issuer = str(config_yaml["oidc"]["issuer"])
if "client_id" in config_yaml["oidc"] : client_id = str(config_yaml["oidc"]["client_id"])
if "scope" in config_yaml["oidc"] : scope = str(config_yaml["oidc"]["scope"])
if "use_expiry_from_token" in config_yaml["oidc"] : use_expiry_from_token = str(config_yaml["oidc"]["use_expiry_from_token"])
if "expiry" in config_yaml["oidc"] : expiry = str(config_yaml["oidc"]["expiry"])
# Embedded DERP server information.
enabled, region_id, region_code, region_name, stun_listen_addr = "N/A", "N/A", "N/A", "N/A", "N/A"
if "derp" in config_yaml:
if "server" in config_yaml["derp"] and config_yaml["derp"]["server"]["enabled"]:
if "enabled" in config_yaml["derp"]["server"]: enabled = str(config_yaml["derp"]["server"]["enabled"])
if "region_id" in config_yaml["derp"]["server"]: region_id = str(config_yaml["derp"]["server"]["region_id"])
if "region_code" in config_yaml["derp"]["server"]: region_code = str(config_yaml["derp"]["server"]["region_code"])
if "region_name" in config_yaml["derp"]["server"]: region_name = str(config_yaml["derp"]["server"]["region_name"])
if "stun_listen_addr" in config_yaml["derp"]["server"]: stun_listen_addr = str(config_yaml["derp"]["server"]["stun_listen_addr"])
nameservers, magic_dns, domains, base_domain = "N/A", "N/A", "N/A", "N/A"
if "dns_config" in config_yaml:
if "nameservers" in config_yaml["dns_config"]: nameservers = str(config_yaml["dns_config"]["nameservers"])
if "magic_dns" in config_yaml["dns_config"]: magic_dns = str(config_yaml["dns_config"]["magic_dns"])
if "domains" in config_yaml["dns_config"]: domains = str(config_yaml["dns_config"]["domains"])
if "base_domain" in config_yaml["dns_config"]: base_domain = str(config_yaml["dns_config"]["base_domain"])
# Start putting the content together
overview_content = """
"""
# Remove content that isn't needed:
# Remove OIDC if it isn't available:
if "oidc" not in config_yaml: oidc_content = ""
# Remove DERP if it isn't available or isn't enabled
if "derp" not in config_yaml:
derp_content = ""
if "derp" in config_yaml:
if "server" in config_yaml["derp"]:
if str(config_yaml["derp"]["server"]["enabled"]) == "False":
derp_content = ""
# TODO:
# Whether there are custom DERP servers
# If there are custom DERP servers, get the file location from the config file. Assume mapping is the same.
# Whether the built-in DERP server is enabled
# The IP prefixes
# The DNS config
if config_yaml["derp"]["paths"]: pass
# # open the path:
# derp_file =
# config_file = open("/etc/headscale/config.yaml", "r")
# config_yaml = yaml.safe_load(config_file)
# The ACME config, if not empty
# Whether updates are running
# Whether metrics are enabled (and their listen addr)
# The log level
# What kind of Database is being used to drive headscale
content = " " + overview_content + general_content + derp_content + oidc_content + dns_content + ""
return Markup(content)
def thread_machine_content(machine, machine_content, idx):
# machine = passed in machine information
# content = place to write the content
app.logger.debug("Machine Information")
app.logger.debug(str(machine))
url = headscale.get_url()
api_key = headscale.get_api_key()
# Set the current timezone and local time
timezone = pytz.timezone(os.environ["TZ"] if os.environ["TZ"] else "UTC")
local_time = timezone.localize(datetime.now())
# Get the machines routes
pulled_routes = headscale.get_machine_routes(url, api_key, machine["id"])
routes = ""
# Test if the machine is an exit node:
exit_node = False
# If the LENGTH of "routes" is NULL/0, there are no routes, enabled or disabled:
if len(pulled_routes["routes"]) > 0:
advertised_and_enabled = False
advertised_route = False
# First, check if there are any routes that are both enabled and advertised
for route in pulled_routes["routes"]:
if route ["advertised"] and route["enabled"]:
advertised_and_enabled = True
if route["advertised"]:
advertised_route = True
if advertised_and_enabled or advertised_route:
routes = """
directionsRoutes
"""
for route in pulled_routes["routes"]:
app.logger.debug("Route: ["+str(route['machine']['name'])+"] id: "+str(route['id'])+" / prefix: "+str(route['prefix'])+" enabled?: "+str(route['enabled']))
# Check if the route is enabled:
route_enabled = "red"
route_tooltip = 'enable'
if route["enabled"]:
route_enabled = "green"
route_tooltip = 'disable'
if route["prefix"] == "0.0.0.0/0" or route["prefix"] == "::/0" and str(route["enabled"]) == "True":
exit_node = True
routes = routes+"""
"""+route['prefix']+"""
"""
routes = routes+"
"
# Get machine tags
tag_array = ""
for tag in machine["forcedTags"]: tag_array = tag_array+"{tag: '"+tag[4:]+"'}, "
tags = """
labelTags
"""
# Get the machine IP's
machine_ips = "
"
for ip_address in machine["ipAddresses"]:
machine_ips = machine_ips+"
"+ip_address+"
"
machine_ips = machine_ips+"
"
# Format the dates for easy readability
last_seen_parse = parser.parse(machine["lastSeen"])
last_seen_local = last_seen_parse.astimezone(timezone)
last_seen_delta = local_time - last_seen_local
last_seen_print = helper.pretty_print_duration(last_seen_delta)
last_seen_time = str(last_seen_local.strftime('%A %m/%d/%Y, %H:%M:%S'))+" "+str(timezone)+" ("+str(last_seen_print)+")"
last_update_parse = local_time if machine["lastSuccessfulUpdate"] is None else parser.parse(machine["lastSuccessfulUpdate"])
last_update_local = last_update_parse.astimezone(timezone)
last_update_delta = local_time - last_update_local
last_update_print = helper.pretty_print_duration(last_update_delta)
last_update_time = str(last_update_local.strftime('%A %m/%d/%Y, %H:%M:%S'))+" "+str(timezone)+" ("+str(last_update_print)+")"
created_parse = parser.parse(machine["createdAt"])
created_local = created_parse.astimezone(timezone)
created_delta = local_time - created_local
created_print = helper.pretty_print_duration(created_delta)
created_time = str(created_local.strftime('%A %m/%d/%Y, %H:%M:%S'))+" "+str(timezone)+" ("+str(created_print)+")"
# If there is no expiration date, we don't need to do any calculations:
if machine["expiry"] != "0001-01-01T00:00:00Z":
expiry_parse = parser.parse(machine["expiry"])
expiry_local = expiry_parse.astimezone(timezone)
expiry_delta = expiry_local - local_time
expiry_print = helper.pretty_print_duration(expiry_delta, "expiry")
if str(expiry_local.strftime('%Y')) in ("0001", "9999", "0000"):
expiry_time = "No expiration date."
elif int(expiry_local.strftime('%Y')) > int(expiry_local.strftime('%Y'))+2:
expiry_time = str(expiry_local.strftime('%m/%Y'))+" "+str(timezone)+" ("+str(expiry_print)+")"
else:
expiry_time = str(expiry_local.strftime('%A %m/%d/%Y, %H:%M:%S'))+" "+str(timezone)+" ("+str(expiry_print)+")"
expiring_soon = True if int(expiry_delta.days) < 14 and int(expiry_delta.days) > 0 else False
app.logger.debug("Machine: "+machine["name"]+" expires: "+str(expiry_local.strftime('%Y'))+" / "+str(expiry_delta.days))
else:
expiry_time = "No expiration date."
expiring_soon = False
app.logger.debug("Machine: "+machine["name"]+" has no expiration date")
# Get the first 10 characters of the PreAuth Key:
if machine["preAuthKey"]:
preauth_key = str(machine["preAuthKey"]["key"])[0:10]
else: preauth_key = "None"
# Set the status badge color:
text_color = helper.text_color_duration(last_seen_delta)
# Set the user badge color:
user_color = helper.get_color(int(machine["user"]["id"]))
# Generate the various badges:
status_badge = "fiber_manual_record"
user_badge = ""+machine["user"]["name"]+""
exit_node_badge = "" if not exit_node else "Exit Node"
expiration_badge = "" if not expiring_soon else "Expiring!"
machine_content[idx] = (str(render_template(
'machines_card.html',
given_name = machine["givenName"],
machine_id = machine["id"],
hostname = machine["name"],
ns_name = machine["user"]["name"],
ns_id = machine["user"]["id"],
ns_created = machine["user"]["createdAt"],
last_seen = str(last_seen_print),
last_update = str(last_update_print),
machine_ips = Markup(machine_ips),
advertised_routes = Markup(routes),
exit_node_badge = Markup(exit_node_badge),
status_badge = Markup(status_badge),
user_badge = Markup(user_badge),
last_update_time = str(last_update_time),
last_seen_time = str(last_seen_time),
created_time = str(created_time),
expiry_time = str(expiry_time),
preauth_key = str(preauth_key),
expiration_badge = Markup(expiration_badge),
machine_tags = Markup(tags),
)))
app.logger.info("Finished thread for machine "+machine["givenName"]+" index "+str(idx))
# Render the cards for the machines page:
def render_machines_cards():
app.logger.info("Rendering machine cards")
url = headscale.get_url()
api_key = headscale.get_api_key()
machines_list = headscale.get_machines(url, api_key)
#########################################
# Thread this entire thing.
num_threads = len(machines_list["machines"])
iterable = []
machine_content = {}
for i in range (0, num_threads):
app.logger.debug("Appending iterable: "+str(i))
iterable.append(i)
# Flask-Executor Method:
if LOG_LEVEL == "DEBUG":
# DEBUG: Do in a forloop:
for idx in iterable: thread_machine_content(machines_list["machines"][idx], machine_content, idx)
else:
app.logger.info("Starting futures")
futures = [executor.submit(thread_machine_content, machines_list["machines"][idx], machine_content, idx) for idx in iterable]
# Wait for the executor to finish all jobs:
wait(futures, return_when=ALL_COMPLETED)
app.logger.info("Finished futures")
# Sort the content by machine_id:
sorted_machines = {key: val for key, val in sorted(machine_content.items(), key = lambda ele: ele[0])}
content = "
"
# Print the content
for index in range(0, num_threads):
content = content+str(sorted_machines[index])
content = content+"
"
return Markup(content)
# Render the cards for the Users page:
def render_users_cards():
app.logger.info("Rendering Users cards")
url = headscale.get_url()
api_key = headscale.get_api_key()
user_list = headscale.get_users(url, api_key)
content = "
"
for user in user_list["users"]:
# Get all preAuth Keys in the user, only display if one exists:
preauth_keys_collection = build_preauth_key_table(user["name"])
# Set the user badge color:
user_color = helper.get_color(int(user["id"]), "text")
# Generate the various badges:
status_badge = "fiber_manual_record"
content = content + render_template(
'users_card.html',
status_badge = Markup(status_badge),
user_name = user["name"],
user_id = user["id"],
preauth_keys_collection = Markup(preauth_keys_collection)
)
content = content+"
"
return Markup(content)
# Builds the preauth key table for the User page
def build_preauth_key_table(user_name):
app.logger.info("Building the PreAuth key table for User: %s", str(user_name))
url = headscale.get_url()
api_key = headscale.get_api_key()
preauth_keys = headscale.get_preauth_keys(url, api_key, user_name)
preauth_keys_collection = """
"
if len(preauth_keys["preAuthKeys"]) > 0:
preauth_keys_collection += """
ID
Key Prefix
Reusable
Used
Ephemeral
Usable
Actions
"""
for key in preauth_keys["preAuthKeys"]:
# Get the key expiration date and compare it to now to check if it's expired:
# Set the current timezone and local time
timezone = pytz.timezone(os.environ["TZ"] if os.environ["TZ"] else "UTC")
local_time = timezone.localize(datetime.now())
expiration_parse = parser.parse(key["expiration"])
key_expired = True if expiration_parse < local_time else False
expiration_time = str(expiration_parse.strftime('%A %m/%d/%Y, %H:%M:%S'))+" "+str(timezone)
key_usable = False
if key["reusable"] and not key_expired: key_usable = True
if not key["reusable"] and not key["used"] and not key_expired: key_usable = True
# Class for the javascript function to look for to toggle the hide function
hide_expired = "expired-row" if not key_usable else ""
btn_reusable = "fiber_manual_record" if key["reusable"] else ""
btn_ephemeral = "fiber_manual_record" if key["ephemeral"] else ""
btn_used = "fiber_manual_record" if key["used"] else ""
btn_usable = "fiber_manual_record" if key_usable else ""
# Other buttons:
btn_delete = "Expire" if key_usable else ""
tooltip_data = "Expiration: "+expiration_time
# TR ID will look like "1-albert-tr"
preauth_keys_collection = preauth_keys_collection+"""
"""
return preauth_keys_collection
def oidc_nav_dropdown(user_name, email_address, name):
app.logger.info("OIDC is enabled. Building the OIDC nav dropdown")
html_payload = """