2023-02-15 19:20:44 +09:00
import requests , json , os , sys
2023-02-06 04:58:09 +00:00
from os . path import exists
from cryptography . fernet import Fernet
from datetime import datetime , timedelta , date
from dateutil import parser
2023-02-15 19:20:44 +09:00
from flask import Flask
2023-02-06 04:58:09 +00:00
2023-02-15 19:20:44 +09:00
app = Flask ( __name__ )
2023-02-06 04:58:09 +00:00
##################################################################
# Functions related to HEADSCALE and API KEYS
##################################################################
def get_url():
    """Return the Headscale server URL configured in the environment.

    Returns:
        The value of the ``HS_SERVER`` environment variable.

    Raises:
        KeyError: if ``HS_SERVER`` is not set.
    """
    return os.environ['HS_SERVER']
def set_api_key(api_key):
    """Encrypt ``api_key`` and persist it to ``/data/key.txt``.

    The key is encrypted with Fernet, using the encryption key read from the
    ``KEY`` environment variable.

    Args:
        api_key: The plain-text API key (a ``str``) to store.

    Returns:
        True if at least one byte was written to the key file, else False.

    Raises:
        KeyError: if the ``KEY`` environment variable is not set.
    """
    encryption_key = os.environ['KEY']  # User-set encryption key
    fernet = Fernet(encryption_key)  # Preparing the Fernet class with the key
    # Encrypt before touching the file so that a failure here does not leave
    # a truncated key file behind.
    encrypted_key = fernet.encrypt(api_key.encode())
    # The context manager closes (and therefore flushes) the file, so the new
    # key is on disk by the time this function returns.
    with open("/data/key.txt", "wb") as key_file:
        bytes_written = key_file.write(encrypted_key)
    return bool(bytes_written)
def get_api_key():
    """Read and decrypt the API key stored in ``/data/key.txt``.

    Returns:
        False if the key file does not exist, the string ``"NULL"`` if the
        file exists but is empty, otherwise the decrypted API key as a ``str``.

    Raises:
        KeyError: if the key file exists but the ``KEY`` environment variable
            is not set.
    """
    if not exists("/data/key.txt"):
        return False
    encryption_key = os.environ['KEY']  # User-set encryption key
    # Read-only access is sufficient here; the context manager guarantees the
    # handle is released even if reading fails.
    with open("/data/key.txt", "rb") as key_file:
        enc_api_key = key_file.read()  # The encrypted key read from the file
    if not enc_api_key:
        return "NULL"
    fernet = Fernet(encryption_key)  # Preparing the Fernet class with the key
    return fernet.decrypt(enc_api_key).decode()
def test_api_key(url, api_key):
    """Probe the server with ``api_key`` and report the HTTP status code.

    Sends an authenticated ``GET`` to ``<url>/api/v1/apikey``.

    Args:
        url: Base URL of the Headscale server.
        api_key: API key sent as a Bearer token.

    Returns:
        The integer HTTP status code of the response.
    """
    response = requests.get(
        str(url) + "/api/v1/apikey",
        headers={
            'Accept': 'application/json',
            'Authorization': 'Bearer ' + str(api_key),
        },
    )
    return response.status_code
# Expires an API key
def expire_key(url, api_key):
    """Request expiry of ``api_key`` on the server.

    The key is identified to the server by its first ten characters, sent as
    the ``prefix`` field of a JSON body to ``<url>/api/v1/apikey/expire``.
    The same key is used to authenticate the request.

    Args:
        url: Base URL of the Headscale server.
        api_key: The API key to expire.

    Returns:
        Always 0; the server's response is not inspected.
    """
    json_payload = json.dumps({'prefix': str(api_key[0:10])})
    requests.post(
        str(url) + "/api/v1/apikey/expire",
        data=json_payload,
        headers={
            'Accept': 'application/json',
            'Content-Type': 'application/json',
            'Authorization': 'Bearer ' + str(api_key),
        },
    )
    return 0
# Checks if the key needs to be renewed
# If it does, renews the key, then expires the old key
def renew_api_key(url, api_key):
    """Renew the API key if it expires in fewer than five days.

    When renewal is needed, a new key valid for 90 days from today is
    requested, tested against the server, written to the key file via
    ``set_api_key`` and, only then, the old key is expired.

    Args:
        url: Base URL of the Headscale server.
        api_key: The API key currently in use.

    Returns:
        True if no renewal was required, or if the key was renewed, validated
        and stored. False if the current key could not be found on the
        server, the server did not return a new key, the new key failed the
        validity test, or writing the new key failed.
    """
    key_info = get_api_key_info(url, api_key)
    # get_api_key_info returns the string "Key not found" when no key on the
    # server matches our prefix; indexing that string would raise TypeError.
    if not isinstance(key_info, dict):
        return False

    # Compare calendar dates only (time of day is ignored).
    today_date = date.today()
    expire_date = parser.parse(key_info["expiration"]).date()
    if expire_date - today_date >= timedelta(days=5):
        return True  # No work is required

    # The key is about to expire: request a new one valid for 90 days.
    new_expiration_date = str(today_date + timedelta(days=90)) + "T00:00:00.000000Z"
    json_payload = json.dumps({'expiration': new_expiration_date})
    response = requests.post(
        str(url) + "/api/v1/apikey",
        data=json_payload,
        headers={
            'Accept': 'application/json',
            'Content-Type': 'application/json',
            'Authorization': 'Bearer ' + str(api_key),
        },
    )
    new_api_key = response.json().get("apiKey")
    if not new_api_key:
        return False  # The server did not hand back a new key

    # Test if the new key works before replacing the stored one:
    if test_api_key(url, new_api_key) != 200:
        return False  # The API key test failed
    if not set_api_key(new_api_key):
        return False  # Key write failed

    # Only expire the old key once the new one is validated and stored.
    expire_key(url, api_key)
    return True  # Key updated and validated
# Gets information about the current API key
def get_api_key_info(url, api_key):
    """Look up the server-side record of ``api_key``.

    Fetches the key list from ``<url>/api/v1/apikey`` and returns the entry
    whose ``prefix`` equals the first ten characters of ``api_key``.

    Args:
        url: Base URL of the Headscale server.
        api_key: The API key to look up (also used to authenticate).

    Returns:
        The matching entry from the response's ``apiKeys`` list, or the string
        ``"Key not found"`` when no entry matches.
    """
    response = requests.get(
        str(url) + "/api/v1/apikey",
        headers={
            'Accept': 'application/json',
            'Authorization': 'Bearer ' + str(api_key),
        },
    )
    json_response = response.json()
    # Find the current key in the array:
    key_prefix = str(api_key[0:10])
    for key in json_response["apiKeys"]:
        if key["prefix"] == key_prefix:
            return key
    return "Key not found"
##################################################################
# Functions related to MACHINES
##################################################################
# register a new machine
def register_machine(url, api_key, machine_key, user):
    """Register a new machine for ``user``.

    POSTs to ``<url>/api/v1/machine/register`` with ``user`` and ``key`` as
    query parameters.

    Args:
        url: Base URL of the Headscale server.
        api_key: API key sent as a Bearer token.
        machine_key: The machine key to register.
        user: Name of the user the machine is registered to.

    Returns:
        The parsed JSON body of the server's response.
    """
    response = requests.post(
        str(url) + "/api/v1/machine/register",
        # Passing the query via ``params`` lets requests URL-encode the
        # values, so characters such as '&', '+' or spaces cannot corrupt
        # the query string.
        params={'user': str(user), 'key': str(machine_key)},
        headers={
            'Accept': 'application/json',
            'Authorization': 'Bearer ' + str(api_key),
        },
    )
    return response.json()
# Sets the machines tags
def set_machine_tags(url, api_key, machine_id, tags_list):
    """Set the tags of machine ``machine_id``.

    POSTs a JSON body to ``<url>/api/v1/machine/<machine_id>/tags``.

    Args:
        url: Base URL of the Headscale server.
        api_key: API key sent as a Bearer token.
        machine_id: Identifier of the machine to tag.
        tags_list: The request payload. A ``str``/``bytes`` value is assumed
            to be JSON already and is sent unchanged; any other object is
            JSON-encoded first.

    Returns:
        The parsed JSON body of the server's response.
    """
    # The request declares Content-Type: application/json, so the body must
    # really be JSON. Handing a dict/list directly to ``data=`` would not
    # produce JSON; an already-serialised string must not be encoded twice.
    if isinstance(tags_list, (str, bytes)):
        json_payload = tags_list
    else:
        json_payload = json.dumps(tags_list)
    response = requests.post(
        str(url) + "/api/v1/machine/" + str(machine_id) + "/tags",
        data=json_payload,
        headers={
            'Accept': 'application/json',
            'Content-Type': 'application/json',
            'Authorization': 'Bearer ' + str(api_key),
        },
    )
    return response.json()
# Moves machine_id to user "new_user"
def move_user(url, api_key, machine_id, new_user):
    """Move machine ``machine_id`` to the user ``new_user``.

    POSTs to ``<url>/api/v1/machine/<machine_id>/user`` with ``user`` as a
    query parameter.

    Args:
        url: Base URL of the Headscale server.
        api_key: API key sent as a Bearer token.
        machine_id: Identifier of the machine to move.
        new_user: Name of the user that should own the machine.

    Returns:
        The parsed JSON body of the server's response.
    """
    response = requests.post(
        str(url) + "/api/v1/machine/" + str(machine_id) + "/user",
        # ``params`` URL-encodes the user name instead of splicing it raw
        # into the query string.
        params={'user': str(new_user)},
        headers={
            'Accept': 'application/json',
            'Authorization': 'Bearer ' + str(api_key),
        },
    )
    return response.json()
# updates routes for the given machine_id. enable / disable
# The Headscale API expects a list of routes to enable. if we want to toggle 1 out of 3 routes
# we need to pass all currently enabled routes and mask it
# For example, if we have routes:
# 0.0.0.0/0
# ::/0
# 192.168.1.0/24
# available, but only routes
# 0.0.0.0/0
# 192.168.1.0/24
# ENABLED, and we want to disable route 192.168.1.0/24, we need to pass ONLY the routes to KEEP enabled.
# In this case, 0.0.0.0/0
def update_route(url, api_key, route_id, current_state):
    """Toggle the route ``route_id``: disable it if enabled, enable it if not.

    POSTs to ``<url>/api/v1/routes/<route_id>/<action>``.

    Args:
        url: Base URL of the Headscale server.
        api_key: API key sent as a Bearer token.
        route_id: Identifier of the route to toggle.
        current_state: The route's current state as the *string* ``"True"``
            (enabled) or ``"False"`` (disabled). Any other value leaves the
            action empty, so the request goes to ``.../routes/<route_id>/``.

    Returns:
        The parsed JSON body of the server's response.
    """
    if current_state == "True":
        action = "disable"
    elif current_state == "False":
        action = "enable"
    else:
        action = ""
    response = requests.post(
        str(url) + "/api/v1/routes/" + str(route_id) + "/" + str(action),
        headers={
            'Accept': 'application/json',
            'Authorization': 'Bearer ' + str(api_key),
        },
    )
    return response.json()
# First, get all route info:
# routes = get_machine_routes(url, api_key, machine_id)
# to_enable = []
# is_enabled = False
# "route" is what we are toggling. On or off.
# Get a list of all currently enabled routes.
# If route IS currently in enabled routes, remove it
# # TODO: REDO THIS FOR THE NEW API
# for enabled_route in routes["routes"]["enabledRoutes"]:
# if enabled_route == route: is_enabled=True
# else: to_enable.append(enabled_route)
# if not is_enabled: to_enable.append(route)
#
# query = ""
# count = 0
# for route in to_enable:
# count = count+1
# if count == 1: query = query+"routes="+route
# else: query = query+"&routes="+route
#
# response = requests.post(
# str(url)+"/api/v1/machine/"+str(machine_id)+"/routes?"+query,
# headers={
# 'Accept': 'application/json',
# 'Authorization': 'Bearer '+str(api_key)
# }
# )
# return response.json()
# Get all machines on the Headscale network
def get_machines(url, api_key):
    """Fetch all machines from ``<url>/api/v1/machine``.

    Args:
        url: Base URL of the Headscale server.
        api_key: API key sent as a Bearer token.

    Returns:
        The parsed JSON body of the server's response.
    """
    response = requests.get(
        str(url) + "/api/v1/machine",
        headers={
            'Accept': 'application/json',
            'Authorization': 'Bearer ' + str(api_key),
        },
    )
    return response.json()
# Get machine with "machine_id" on the Headscale network
def get_machine_info(url, api_key, machine_id):
    """Fetch a single machine from ``<url>/api/v1/machine/<machine_id>``.

    Args:
        url: Base URL of the Headscale server.
        api_key: API key sent as a Bearer token.
        machine_id: Identifier of the machine to fetch.

    Returns:
        The parsed JSON body of the server's response.
    """
    response = requests.get(
        str(url) + "/api/v1/machine/" + str(machine_id),
        headers={
            'Accept': 'application/json',
            'Authorization': 'Bearer ' + str(api_key),
        },
    )
    return response.json()
# Delete a machine from Headscale
def delete_machine(url, api_key, machine_id):
    """Delete machine ``machine_id`` via ``DELETE <url>/api/v1/machine/<machine_id>``.

    Args:
        url: Base URL of the Headscale server.
        api_key: API key sent as a Bearer token.
        machine_id: Identifier of the machine to delete.

    Returns:
        A dict with ``"status"`` set to the string ``"True"`` when the server
        answered HTTP 200 (``"False"`` otherwise) and ``"body"`` set to the
        parsed JSON body of the response.
    """
    response = requests.delete(
        str(url) + "/api/v1/machine/" + str(machine_id),
        headers={
            'Accept': 'application/json',
            'Authorization': 'Bearer ' + str(api_key),
        },
    )
    # The status is deliberately a string, not a bool.
    status = "True" if response.status_code == 200 else "False"
    return {"status": status, "body": response.json()}
# Rename "machine_id" with name "new_name"
def rename_machine(url, api_key, machine_id, new_name):
    """Rename machine ``machine_id`` to ``new_name``.

    POSTs to ``<url>/api/v1/machine/<machine_id>/rename/<new_name>``.

    Args:
        url: Base URL of the Headscale server.
        api_key: API key sent as a Bearer token.
        machine_id: Identifier of the machine to rename.
        new_name: The new machine name; it is placed in the URL path as-is.

    Returns:
        A dict with ``"status"`` set to the string ``"True"`` when the server
        answered HTTP 200 (``"False"`` otherwise) and ``"body"`` set to the
        parsed JSON body of the response.
    """
    response = requests.post(
        str(url) + "/api/v1/machine/" + str(machine_id) + "/rename/" + str(new_name),
        headers={
            'Accept': 'application/json',
            'Authorization': 'Bearer ' + str(api_key),
        },
    )
    # The status is deliberately a string, not a bool.
    status = "True" if response.status_code == 200 else "False"
    return {"status": status, "body": response.json()}
# Gets routes for the passed machine_id
def get_machine_routes(url, api_key, machine_id):
    """Fetch the routes of one machine from ``<url>/api/v1/machine/<machine_id>/routes``.

    Args:
        url: Base URL of the Headscale server.
        api_key: API key sent as a Bearer token.
        machine_id: Identifier of the machine whose routes are requested.

    Returns:
        The parsed JSON body of the server's response.
    """
    response = requests.get(
        str(url) + "/api/v1/machine/" + str(machine_id) + "/routes",
        headers={
            'Accept': 'application/json',
            'Authorization': 'Bearer ' + str(api_key),
        },
    )
    return response.json()
# Gets routes for the entire tailnet
def get_routes(url, api_key):
    """Fetch all routes from ``<url>/api/v1/routes``.

    Args:
        url: Base URL of the Headscale server.
        api_key: API key sent as a Bearer token.

    Returns:
        The parsed JSON body of the server's response.
    """
    response = requests.get(
        str(url) + "/api/v1/routes",
        headers={
            'Accept': 'application/json',
            'Authorization': 'Bearer ' + str(api_key),
        },
    )
    return response.json()
##################################################################
# Functions related to NAMESPACES
##################################################################
# Get all users in use
def get_users(url, api_key):
    """Fetch all users from ``<url>/api/v1/user``.

    Args:
        url: Base URL of the Headscale server.
        api_key: API key sent as a Bearer token.

    Returns:
        The parsed JSON body of the server's response.
    """
    response = requests.get(
        str(url) + "/api/v1/user",
        headers={
            'Accept': 'application/json',
            'Authorization': 'Bearer ' + str(api_key),
        },
    )
    return response.json()
# Rename "old_name" with name "new_name"
def rename_user(url, api_key, old_name, new_name):
    """Rename the user ``old_name`` to ``new_name``.

    POSTs to ``<url>/api/v1/user/<old_name>/rename/<new_name>``.

    Args:
        url: Base URL of the Headscale server.
        api_key: API key sent as a Bearer token.
        old_name: Current user name; placed in the URL path as-is.
        new_name: New user name; placed in the URL path as-is.

    Returns:
        A dict with ``"status"`` set to the string ``"True"`` when the server
        answered HTTP 200 (``"False"`` otherwise) and ``"body"`` set to the
        parsed JSON body of the response.
    """
    response = requests.post(
        str(url) + "/api/v1/user/" + str(old_name) + "/rename/" + str(new_name),
        headers={
            'Accept': 'application/json',
            'Authorization': 'Bearer ' + str(api_key),
        },
    )
    # The status is deliberately a string, not a bool.
    status = "True" if response.status_code == 200 else "False"
    return {"status": status, "body": response.json()}
# Delete a user from Headscale
def delete_user(url, api_key, user_name):
    """Delete the user ``user_name`` via ``DELETE <url>/api/v1/user/<user_name>``.

    Args:
        url: Base URL of the Headscale server.
        api_key: API key sent as a Bearer token.
        user_name: Name of the user to delete; placed in the URL path as-is.

    Returns:
        A dict with ``"status"`` set to the string ``"True"`` when the server
        answered HTTP 200 (``"False"`` otherwise) and ``"body"`` set to the
        parsed JSON body of the response.
    """
    response = requests.delete(
        str(url) + "/api/v1/user/" + str(user_name),
        headers={
            'Accept': 'application/json',
            'Authorization': 'Bearer ' + str(api_key),
        },
    )
    # The status is deliberately a string, not a bool.
    status = "True" if response.status_code == 200 else "False"
    return {"status": status, "body": response.json()}
# Add a user to Headscale
def add_user(url, api_key, data):
    """Create a user by POSTing ``data`` to ``<url>/api/v1/user``.

    Args:
        url: Base URL of the Headscale server.
        api_key: API key sent as a Bearer token.
        data: Request body, forwarded unchanged with a JSON content type.
            Presumably an already JSON-encoded string; verify against callers.

    Returns:
        A dict with ``"status"`` set to the string ``"True"`` when the server
        answered HTTP 200 (``"False"`` otherwise) and ``"body"`` set to the
        parsed JSON body of the response.
    """
    response = requests.post(
        str(url) + "/api/v1/user",
        data=data,
        headers={
            'Accept': 'application/json',
            'Content-Type': 'application/json',
            'Authorization': 'Bearer ' + str(api_key),
        },
    )
    # The status is deliberately a string, not a bool.
    status = "True" if response.status_code == 200 else "False"
    return {"status": status, "body": response.json()}
##################################################################
# Functions related to PREAUTH KEYS in NAMESPACES
##################################################################
# Get all PreAuth keys associated with a user "user_name"
def get_preauth_keys(url, api_key, user_name):
    """Fetch the pre-auth keys of ``user_name`` from ``<url>/api/v1/preauthkey``.

    Args:
        url: Base URL of the Headscale server.
        api_key: API key sent as a Bearer token.
        user_name: Name of the user whose pre-auth keys are requested; sent
            as the ``user`` query parameter.

    Returns:
        The parsed JSON body of the server's response.
    """
    response = requests.get(
        str(url) + "/api/v1/preauthkey",
        # ``params`` URL-encodes the user name instead of splicing it raw
        # into the query string.
        params={'user': str(user_name)},
        headers={
            'Accept': 'application/json',
            'Authorization': 'Bearer ' + str(api_key),
        },
    )
    return response.json()
# Add a preauth key to the user "user_name" given the booleans "ephemeral" and "reusable" with the expiration date "date" contained in the JSON payload "data"
def add_preauth_key(url, api_key, data):
    """Create a pre-auth key by POSTing ``data`` to ``<url>/api/v1/preauthkey``.

    Args:
        url: Base URL of the Headscale server.
        api_key: API key sent as a Bearer token.
        data: Request body, forwarded unchanged with a JSON content type.
            Per the comment preceding this function it carries the user name,
            the ``ephemeral`` and ``reusable`` booleans and the expiration
            date; presumably already JSON-encoded. Verify against callers.

    Returns:
        A dict with ``"status"`` set to the string ``"True"`` when the server
        answered HTTP 200 (``"False"`` otherwise) and ``"body"`` set to the
        parsed JSON body of the response.
    """
    response = requests.post(
        str(url) + "/api/v1/preauthkey",
        data=data,
        headers={
            'Accept': 'application/json',
            'Content-Type': 'application/json',
            'Authorization': 'Bearer ' + str(api_key),
        },
    )
    # The status is deliberately a string, not a bool.
    status = "True" if response.status_code == 200 else "False"
    return {"status": status, "body": response.json()}
# Expire a pre-auth key. data is {"user": "string", "key": "string"}
def expire_preauth_key(url, api_key, data):
    """Expire a pre-auth key by POSTing ``data`` to ``<url>/api/v1/preauthkey/expire``.

    Args:
        url: Base URL of the Headscale server.
        api_key: API key sent as a Bearer token.
        data: Request body, forwarded unchanged with a JSON content type.
            Expected shape is ``{"user": "string", "key": "string"}``;
            presumably already JSON-encoded. Verify against callers.

    Returns:
        A dict with ``"status"`` set to the string ``"True"`` when the server
        answered HTTP 200 (``"False"`` otherwise) and ``"body"`` set to the
        parsed JSON body of the response.
    """
    response = requests.post(
        str(url) + "/api/v1/preauthkey/expire",
        data=data,
        headers={
            'Accept': 'application/json',
            'Content-Type': 'application/json',
            'Authorization': 'Bearer ' + str(api_key),
        },
    )
    # The status is deliberately a string, not a bool.
    status = "True" if response.status_code == 200 else "False"
    return {"status": status, "body": response.json()}