1
0
forked from Mirrors/RGSX

Add CLI support and headless mode for RGSX

- Introduced a command-line interface (CLI) for RGSX, allowing users to list platforms, games, and download ROMs without a graphical interface.
- Added a new README_CLI.md file detailing CLI usage, commands, and examples.
This commit is contained in:
skymike03
2025-09-10 17:17:34 +02:00
parent 626359095e
commit 44bf0eda24
9 changed files with 970 additions and 31 deletions

View File

@@ -9,6 +9,14 @@ The application supports multiple sources like myrient and 1fichier. These sourc
---
## 🧰 Command-line usage (CLI)
RGSX also offers a headless command-line interface to list platforms/games and download ROMs:
- English guide: see `https://github.com/RetroGameSets/RGSX/blob/main/README_CLI.md`
---
## ✨ Features
- **Game downloads** : Support for ZIP files and handling of unsupported extensions based on EmulationStation's `es_systems.cfg` (and custom `es_systems_*.cfg` on Batocera). RGSX reads allowed extensions per system from these configs and will automatically extract archives when a system doesn't support them.

159
README_CLI.md Normal file
View File

@@ -0,0 +1,159 @@
# RGSX CLI — Guide dutilisation
Ce guide couvre toutes les commandes disponibles du CLI et fournit des exemples prêts à copier (Windows PowerShell).
## Prérequis
- Python installé et accessible (le projet utilise un mode headless; aucune fenêtre ne souvrira).
- Exécuter depuis le dossier contenant `rgsx_cli.py`.
## Syntaxe générale
Les options globales peuvent être placées avant ou après la sous-commande.
- Forme 1:
```powershell
python rgsx_cli.py [--verbose] [--force-update|-force-update] <commande> [options]
```
- Forme 2:
```powershell
python rgsx_cli.py <commande> [options] [--verbose] [--force-update|-force-update]
```
- `--verbose` active les logs détaillés (DEBUG) sur la sortie standard derreur.
- `--force-update` (ou `-force-update`) purge les données locales et force le re-téléchargement du pack de données (systems_list, games/*.json, images).
Lorsque les données sources sont manquantes, le CLI télécharge et extrait automatiquement le pack (avec une barre de progression).
## Commandes
### 1) platforms — lister les plateformes
- Options:
- `--json`: sortie JSON (objets `{ name, folder }`).
Exemples:
```powershell
python rgsx_cli.py platforms
python rgsx_cli.py platforms --json
python rgsx_cli.py --verbose platforms
python rgsx_cli.py platforms --verbose
```
Sortie texte: une ligne par plateforme, au format `Nom<TAB>Dossier`.
### 2) games — lister les jeux dune plateforme
- Options:
- `--platform <nom_ou_dossier>` (ex: `n64` ou "Nintendo 64").
- `--search <texte>`: filtre par sous-chaîne dans le nom du jeu.
Exemples:
```powershell
python rgsx_cli.py games --platform n64
python rgsx_cli.py games --platform "Nintendo 64" --search zelda
python rgsx_cli.py games --platform n64 --verbose
```
Remarques:
- La plateforme est résolue par nom affiché (platform_name) ou par dossier (folder), sans tenir compte de la casse.
### 3) download — télécharger un jeu
- Options:
- `--platform <nom_ou_dossier>`
- `--game "<titre exact ou partiel>"`
- `--force`: ignorer lavertissement si lextension du fichier nest pas répertoriée comme supportée pour la plateforme.
Exemples:
```powershell
# Titre exact
python rgsx_cli.py download --platform n64 --game "Legend of Zelda, The - Ocarina of Time (USA) (Beta).zip"
# Correspondance partielle
# Si aucun titre exact nest trouvé, le CLI nautosélectionne plus. Il affiche des correspondances possibles.
python rgsx_cli.py download --platform n64 --game "Ocarina of Time (Beta)"
# ➜ Le CLI proposera une liste de titres potentiels (à relancer ensuite avec le titre exact).
Mode interactif par défaut:
- Si aucun titre exact nest trouvé et que vous êtes dans un terminal interactif (TTY), une liste numérotée saffiche automatiquement pour choisir un match et lancer le téléchargement.
# Forcer si lextension semble non supportée (ex: .rar)
python rgsx_cli.py download --platform snes --game "pack_roms.rar" --force
# Verbose positionné après la sous-commande
python rgsx_cli.py download --platform n64 --game "Legend of Zelda, The - Ocarina of Time (USA) (Beta).zip" --verbose
```
Pendant le téléchargement, une progression en pourcentage, taille (MB) et vitesse (MB/s) saffiche. Le résultat final est également écrit dans lhistorique.
Notes:
- Les ROMs sont enregistrées dans le dossier de la plateforme correspondante (ex: `R:\roms\n64`).
- Si le fichier est une archive (zip/rar) et que la plateforme ne supporte pas lextension, un avertissement est affiché (vous pouvez utiliser `--force`).
### 4) history — afficher lhistorique
- Options:
- `--tail <N>`: n dernières entrées (défaut: 50)
- `--json`: sortie JSON
Exemples:
```powershell
python rgsx_cli.py history
python rgsx_cli.py history --tail 20
python rgsx_cli.py history --json
```
### 5) clear-history — vider lhistorique
Exemple:
```powershell
python rgsx_cli.py clear-history
```
### Option globale: --force-update — purge + re-téléchargement des données
- Supprime `systems_list.json`, le dossier `games/` et `images/`, puis télécharge/extrait à nouveau le pack de données.
Exemples:
```powershell
# Sans sous-commande: purge + re-téléchargement puis sortie
python rgsx_cli.py --force-update
# Placé après une sous-commande (accepté aussi)
python rgsx_cli.py platforms --force-update
```
## Comportements et conseils
- Résolution de plateforme: par nom affiché ou dossier, insensible à la casse. Pour la commande `games` et `download`, une recherche par sous-chaîne est utilisée si la correspondance exacte nest pas trouvée.
- Logs `--verbose`: principalement utiles lors des téléchargements/extractions; émis en DEBUG.
- Téléchargement de données manquantes: automatique avec progression harmonisée (téléchargement puis extraction).
- Codes de sortie (indicatif):
- `0`: succès
- `1`: échec du téléchargement/erreur générique
- `2`: plateforme introuvable
- `3`: jeu introuvable
- `4`: extension non supportée (sans `--force`)
## Exemples rapides (copier-coller)
```powershell
# Lister plateformes (texte)
python rgsx_cli.py platforms
# Lister plateformes (JSON)
python rgsx_cli.py platforms --json
# Lister jeux N64 avec filtre
python rgsx_cli.py games --platform n64 --search zelda
# Télécharger un jeu N64 (titre exact)
python rgsx_cli.py download --platform n64 --game "Legend of Zelda, The - Ocarina of Time (USA) (Beta).zip"
# Télécharger un jeu N64 (titre aproximatif)
python rgsx_cli.py download --platform n64 --game "Ocarina of Time"
Resultat (exemple) :
No exact result found for this game: Ocarina of Time
Select a match to download:
1. Legend of Zelda, The - Ocarina of Time (Europe) (Beta) (2003-02-13) (GameCube).zip
2. Legend of Zelda, The - Ocarina of Time (Europe) (Beta) (2003-02-21) (GameCube) (Debug).zip
...
15. F-Zero X (USA) (Beta) (The Legend of Zelda - Ocarina of Time leftover data).zip
# Voir lhistorique (20 dernières entrées)
python rgsx_cli.py history --tail 20
# Purger et recharger les données de listes des systèmes et des jeux
python rgsx_cli.py --force-update
```

152
README_CLI_EN.md Normal file
View File

@@ -0,0 +1,152 @@
# RGSX CLI — Usage Guide
This guide covers all available CLI commands with copy-ready Windows PowerShell examples.
## Prerequisites
- Python installed and on PATH (the app runs in headless mode; no window will open).
- Run commands from the folder that contains `rgsx_cli.py`.
## General syntax
Global options can be placed before or after the subcommand.
- Form 1:
```powershell
python rgsx_cli.py [--verbose] [--force-update|-force-update] <command> [options]
```
- Form 2:
```powershell
python rgsx_cli.py <command> [options] [--verbose] [--force-update|-force-update]
```
- `--verbose` enables detailed logs (DEBUG) on stderr.
- `--force-update` (or `-force-update`) purges local data and re-downloads the data pack (systems_list, games/*.json, images).
When source data is missing, the CLI will automatically download and extract the data pack (with progress).
## Commands
### 1) platforms — list platforms
- Options:
- `--json`: JSON output (objects `{ name, folder }`).
Examples:
```powershell
python rgsx_cli.py platforms
python rgsx_cli.py platforms --json
python rgsx_cli.py --verbose platforms
python rgsx_cli.py platforms --verbose
```
Text output: one line per platform, formatted as `Name<TAB>Folder`.
### 2) games — list games for a platform
- Options:
- `--platform <name_or_folder>` (e.g., `n64` or "Nintendo 64").
- `--search <text>`: filter by substring in game title.
Examples:
```powershell
python rgsx_cli.py games --platform n64
python rgsx_cli.py games --platform "Nintendo 64" --search zelda
python rgsx_cli.py games --platform n64 --verbose
```
Notes:
- The platform is resolved by display name (platform_name) or folder, case-insensitively.
### 3) download — download a game
- Options:
- `--platform <name_or_folder>`
- `--game "<exact or partial title>"`
- `--force`: ignore unsupported-extension warning for the platform.
Examples:
```powershell
# Exact title
python rgsx_cli.py download --platform n64 --game "Legend of Zelda, The - Ocarina of Time (USA) (Beta).zip"
# Partial match
# If no exact title is found, the CLI no longer auto-selects; it displays suggestions.
python rgsx_cli.py download --platform n64 --game "Ocarina of Time (Beta)"
# ➜ The CLI shows a list of candidates (then run again with the exact title).
Interactive mode by default:
- If no exact title is found and you are in an interactive terminal (TTY), a numbered list is shown automatically so you can pick and start the download.
# Force if the file extension seems unsupported (e.g., .rar)
python rgsx_cli.py download --platform snes --game "pack_roms.rar" --force
# Verbose placed after the subcommand
python rgsx_cli.py download --platform n64 --game "Legend of Zelda, The - Ocarina of Time (USA) (Beta).zip" --verbose
```
During download, progress %, size (MB) and speed (MB/s) are shown. The final result is also written to history.
Notes:
- ROMs are saved into the corresponding platform directory (e.g., `R:\roms\n64`).
- If the file is an archive (zip/rar) and the platform doesnt support that extension, a warning is shown (you can use `--force`).
### 4) history — show history
- Options:
- `--tail <N>`: last N entries (default: 50)
- `--json`: JSON output
Examples:
```powershell
python rgsx_cli.py history
python rgsx_cli.py history --tail 20
python rgsx_cli.py history --json
```
### 5) clear-history — clear history
Example:
```powershell
python rgsx_cli.py clear-history
```
### Global option: --force-update — purge + re-download data
- Removes `systems_list.json`, the `games/` and `images/` folders, then downloads/extracts the data pack again.
Examples:
```powershell
# Without subcommand: purge + re-download then exit
python rgsx_cli.py --force-update
# Placed after a subcommand (also accepted)
python rgsx_cli.py platforms --force-update
```
## Behavior and tips
- Platform resolution: by display name or folder, case-insensitive. For `games` and `download`, if no exact match is found a search-like suggestion list is shown.
- `--verbose` logs: most useful during downloads/extraction; printed at DEBUG level.
- Missing data download: automatic, with consistent progress (download then extraction).
- Exit codes (indicative):
- `0`: success
- `1`: download failure/generic error
- `2`: platform not found
- `3`: game not found
- `4`: unsupported extension (without `--force`)
## Quick examples (copy/paste)
```powershell
# List platforms (text)
python rgsx_cli.py platforms
# List platforms (JSON)
python rgsx_cli.py platforms --json
# List N64 games with filter
python rgsx_cli.py games --platform n64 --search zelda
# Download an N64 game (exact title)
python rgsx_cli.py download --platform n64 --game "Legend of Zelda, The - Ocarina of Time (USA) (Beta).zip"
# Download with approximate title (suggestions + interactive pick)
python rgsx_cli.py download --platform n64 --game "Ocarina of Time"
# View last 20 history entries
python rgsx_cli.py history --tail 20
# Purge and refresh data pack
python rgsx_cli.py --force-update
```

View File

@@ -8,6 +8,14 @@ L'application prend en charge plusieurs sources comme myrient, 1fichier. Ces sou
---
## 🧰 Utilisation en ligne de commande (CLI)
RGSX propose aussi une interface en ligne de commande (sans interface graphique) pour lister les plateformes/jeux et télécharger des ROMs :
- Guide FR: voir `https://github.com/RetroGameSets/RGSX/blob/main/README_CLI.md`
---
## ✨ Fonctionnalités
- **Téléchargement de jeux** : Prise en charge des fichiers ZIP et gestion des extensions non supportées à partir du fichier `es_systems.cfg` d'EmulationStation (et des `es_systems_*.cfg` personnalisés sur Batocera). RGSX lit les extensions autorisées par système depuis ces configurations et extrait automatiquement les archives si le système ne les supporte pas.

View File

@@ -1,16 +1,26 @@
import pygame # type: ignore
import os
import logging
import platform
# Headless mode for CLI: set env RGSX_HEADLESS=1 to avoid pygame and noisy prints
HEADLESS = os.environ.get("RGSX_HEADLESS") == "1"
try:
if not HEADLESS:
import pygame # type: ignore
else:
pygame = None # type: ignore
except Exception:
pygame = None # type: ignore
# Version actuelle de l'application
app_version = "2.2.0.3"
def get_operating_system():
"""Renvoie le nom du système d'exploitation."""
return platform.system()
#log dans la console le système d'exploitation
print(f"Système d'exploitation : {get_operating_system()}")
#log dans la console le système d'exploitation (désactivé en headless)
if not HEADLESS:
print(f"Système d'exploitation : {get_operating_system()}")
def get_application_root():
@@ -20,7 +30,8 @@ def get_application_root():
current_file = os.path.abspath(__file__)
# Remonter au dossier parent de config.py (par exemple, dossier de l'application)
app_root = os.path.dirname(os.path.dirname(current_file))
print(f"Dossier de l'application : {app_root}")
if not HEADLESS:
print(f"Dossier de l'application : {app_root}")
return app_root
except NameError:
# Si __file__ n'est pas défini (par exemple, exécution dans un REPL)
@@ -35,7 +46,8 @@ def get_system_root():
current_path = os.path.abspath(__file__)
drive, _ = os.path.splitdrive(current_path)
system_root = drive + os.sep
print(f"Dossier racine du système : {system_root}")
if not HEADLESS:
print(f"Dossier racine du système : {system_root}")
return system_root
elif OPERATING_SYSTEM == "Linux":
# tester si c'est batocera :
@@ -48,7 +60,8 @@ def get_system_root():
parent_dir = os.path.dirname(current_dir)
if os.path.basename(parent_dir) == "userdata": # Vérifier si le parent est userdata
system_root = parent_dir
print(f"Dossier racine du système : {system_root}")
if not HEADLESS:
print(f"Dossier racine du système : {system_root}")
return system_root
current_dir = parent_dir
# Si userdata n'est pas trouvé, retourner /
@@ -109,25 +122,16 @@ xdvdfs_download_exe = os.path.join(OTA_SERVER_URL, "xdvdfs.exe")
xdvdfs_download_linux = os.path.join(OTA_SERVER_URL, "xdvdfs")
# Print des chemins pour debug
print(f"RETROBAT_DATA_FOLDER: {RETROBAT_DATA_FOLDER}")
print(f"ROMS_FOLDER: {ROMS_FOLDER}")
print(f"SAVE_FOLDER: {SAVE_FOLDER}")
print(f"RGSX APP_FOLDER: {APP_FOLDER}")
print(f"RGSX LOGS_FOLDER: {log_dir}")
print(f"RGSX SETTINGS PATH: {RGSX_SETTINGS_PATH}")
print(f"GAMELISTXML: {GAMELISTXML}")
print(f"GAMELISTXML_WINDOWS: {GAMELISTXML_WINDOWS}")
print(f"UPDATE_FOLDER: {UPDATE_FOLDER}")
print(f"LANGUAGES_FOLDER: {LANGUAGES_FOLDER}")
print(f"JSON_EXTENSIONS: {JSON_EXTENSIONS}")
print(f"MUSIC_FOLDER: {MUSIC_FOLDER}")
print(f"IMAGES_FOLDER: {IMAGES_FOLDER}")
print(f"GAMES_FOLDER: {GAMES_FOLDER}")
print(f"SOURCES_FILE: {SOURCES_FILE}")
print(f"CONTROLS_CONFIG_PATH: {CONTROLS_CONFIG_PATH}")
print(f"HISTORY_PATH: {HISTORY_PATH}")
if not HEADLESS:
# Print des chemins pour debug
print(f"ROMS_FOLDER: {ROMS_FOLDER}")
print(f"SAVE_FOLDER: {SAVE_FOLDER}")
print(f"RGSX LOGS_FOLDER: {log_dir}")
print(f"RGSX SETTINGS PATH: {RGSX_SETTINGS_PATH}")
print(f"JSON_EXTENSIONS: {JSON_EXTENSIONS}")
print(f"IMAGES_FOLDER: {IMAGES_FOLDER}")
print(f"GAMES_FOLDER: {GAMES_FOLDER}")
print(f"SOURCES_FILE: {SOURCES_FILE}")
# Constantes pour la répétition automatique dans pause_menu
@@ -199,7 +203,7 @@ selected_key = (0, 0) # Position du curseur dans le clavier virtuel
redownload_confirm_selection = 0 # Sélection pour la confirmation de redownload
popup_message = "" # Message à afficher dans les popups
popup_timer = 0 # Temps restant pour le popup en millisecondes (0 = inactif)
last_frame_time = pygame.time.get_ticks()
last_frame_time = pygame.time.get_ticks() if pygame is not None else 0
current_music_name = None
music_popup_start_time = 0
selected_games = set() # Indices des jeux sélectionnés pour un téléchargement multiple (menu game)
@@ -282,6 +286,8 @@ update_checked = False
def validate_resolution():
"""Valide la résolution de l'écran par rapport aux capacités de l'écran."""
if pygame is None:
return SCREEN_WIDTH, SCREEN_HEIGHT
display_info = pygame.display.Info()
if SCREEN_WIDTH > display_info.current_w or SCREEN_HEIGHT > display_info.current_h:
logger.warning(f"Résolution {SCREEN_WIDTH}x{SCREEN_HEIGHT} dépasse les limites de l'écran")

View File

@@ -1,8 +1,15 @@
import os
import json
import pygame #type: ignore
import logging
import config
from config import HEADLESS
try:
if not HEADLESS:
import pygame # type: ignore
else:
pygame = None # type: ignore
except Exception:
pygame = None # type: ignore
import subprocess
from rgsx_settings import load_rgsx_settings, save_rgsx_settings

View File

@@ -3,10 +3,17 @@ import subprocess
import os
import sys
import threading
import pygame # type: ignore
import zipfile
import asyncio
import config
from config import HEADLESS
try:
if not HEADLESS:
import pygame # type: ignore
else:
pygame = None # type: ignore
except Exception:
pygame = None # type: ignore
from config import OTA_VERSION_ENDPOINT,APP_FOLDER, UPDATE_FOLDER, OTA_UPDATE_ZIP
from utils import sanitize_filename, extract_zip, extract_rar, load_api_key_1fichier, normalize_platform_name
from history import save_history
@@ -189,7 +196,7 @@ async def check_for_updates():
config.popup_message = config.update_result_message
config.popup_timer = 2000
config.update_result_error = False
config.update_result_start_time = pygame.time.get_ticks()
config.update_result_start_time = pygame.time.get_ticks() if pygame is not None else 0
config.needs_redraw = True
logger.debug(f"Affichage de la popup de mise à jour réussie, redémarrage imminent")
@@ -211,7 +218,7 @@ async def check_for_updates():
config.popup_message = config.update_result_message
config.popup_timer = 5000
config.update_result_error = True
config.update_result_start_time = pygame.time.get_ticks()
config.update_result_start_time = pygame.time.get_ticks() if pygame is not None else 0
config.needs_redraw = True
return False, _("network_check_update_error").format(str(e))
@@ -534,6 +541,24 @@ async def download_rom(url, platform, game_name, is_zip_non_supported=False, tas
logger.error(f"Erreur mise à jour progression: {str(e)}")
thread.join()
# Drain any remaining final message to ensure history is saved
try:
task_queue = progress_queues.get(task_id)
if task_queue:
while not task_queue.empty():
data = task_queue.get()
if isinstance(data[1], bool):
success, message = data[1], data[2]
if isinstance(config.history, list):
for entry in config.history:
if "url" in entry and entry["url"] == url and entry["status"] in ["downloading", "Téléchargement", "Extracting"]:
entry["status"] = "Download_OK" if success else "Erreur"
entry["progress"] = 100 if success else 0
entry["message"] = message
save_history(config.history)
break
except Exception:
pass
# Nettoyer la queue
if task_id in progress_queues:
del progress_queues[task_id]
@@ -801,6 +826,24 @@ async def download_from_1fichier(url, platform, game_name, is_zip_non_supported=
logger.debug(f"Fin boucle de progression, attente fin thread pour task_id={task_id}")
thread.join()
logger.debug(f"Thread terminé, nettoyage queue pour task_id={task_id}")
# Drain any remaining final message to ensure history is saved
try:
task_queue = progress_queues.get(task_id)
if task_queue:
while not task_queue.empty():
data = task_queue.get()
if isinstance(data[1], bool):
success, message = data[1], data[2]
if isinstance(config.history, list):
for entry in config.history:
if "url" in entry and entry["url"] == url and entry["status"] in ["downloading", "Téléchargement", "Extracting"]:
entry["status"] = "Download_OK" if success else "Erreur"
entry["progress"] = 100 if success else 0
entry["message"] = message
save_history(config.history)
break
except Exception:
pass
# Nettoyer la queue
if task_id in progress_queues:
del progress_queues[task_id]

549
ports/RGSX/rgsx_cli.py Normal file
View File

@@ -0,0 +1,549 @@
#!/usr/bin/env python3
import os
# Force headless mode before any project imports
os.environ.setdefault("RGSX_HEADLESS", "1")
import sys
import argparse
import asyncio
import json
import logging
import requests
import time
import zipfile
import shutil
import re
# IMPORTANT: Avoid importing display/pygame modules for headless mode
import config # paths, settings, SAVE_FOLDER, etc.
import network as network_mod # for progress_queues access
from utils import load_sources, load_games, is_extension_supported, load_extensions_json, sanitize_filename, extract_zip_data
from history import load_history, save_history, add_to_history
from network import download_rom, download_from_1fichier, is_1fichier_url
from rgsx_settings import get_sources_zip_url
logger = logging.getLogger("rgsx.cli")
def setup_logging(verbose: bool):
level = logging.DEBUG if verbose else logging.WARNING
logging.basicConfig(level=level, format='%(levelname)s: %(message)s')
def ensure_data_present(verbose: bool = False):
"""Ensure systems list and games data exist; if missing, download OTA data ZIP and extract it."""
# If systems_list exists and games folder has some json files, nothing to do
has_sources = os.path.exists(config.SOURCES_FILE)
has_games = os.path.isdir(config.GAMES_FOLDER) and any(
f.lower().endswith('.json') for f in os.listdir(config.GAMES_FOLDER)
)
if has_sources and has_games:
return True
url = get_sources_zip_url(config.OTA_data_ZIP)
if not url:
print("No sources URL configured; cannot auto-download data.", file=sys.stderr)
return False
zip_path = os.path.join(config.SAVE_FOLDER, "data_download.zip")
os.makedirs(config.SAVE_FOLDER, exist_ok=True)
headers = {"User-Agent": "Mozilla/5.0"}
# Always show progress when we're in the 'missing data' path
show = True or verbose
print("Source data not found, downloading...")
print(f"Downloading data from {url}...")
try:
with requests.get(url, stream=True, headers=headers, timeout=60) as r:
r.raise_for_status()
total = int(r.headers.get('content-length', 0))
downloaded = 0
last_t = time.time()
last_d = 0
last_line = ""
with open(zip_path, 'wb') as f:
for chunk in r.iter_content(chunk_size=8192):
if not chunk:
continue
f.write(chunk)
downloaded += len(chunk)
if show and total:
now = time.time()
dt = max(1e-6, now - last_t)
delta = downloaded - last_d
speed = delta / dt / (1024*1024)
pct = int(downloaded/total*100)
mb = downloaded/(1024*1024)
tot = total/(1024*1024)
line = f"Downloading data: {pct:3d}% ({mb:.1f}/{tot:.1f} MB) @ {speed:.1f} MB/s"
if line != last_line:
print("\r" + line, end="", flush=True)
last_line = line
last_t = now
last_d = downloaded
if show:
print()
except Exception as e:
print(f"Failed to download data: {e}", file=sys.stderr)
return False
# Extract
if show:
print("Extracting data...")
try:
# Custom extraction with progress
total_size = 0
with zipfile.ZipFile(zip_path, 'r') as zip_ref:
total_size = sum(info.file_size for info in zip_ref.infolist() if not info.is_dir())
extracted = 0
chunk = 2048
last_line = ""
with zipfile.ZipFile(zip_path, 'r') as zip_ref:
for info in zip_ref.infolist():
if info.is_dir():
continue
file_path = os.path.join(config.SAVE_FOLDER, info.filename)
os.makedirs(os.path.dirname(file_path), exist_ok=True)
with zip_ref.open(info) as src, open(file_path, 'wb') as dst:
remaining = info.file_size
while remaining > 0:
buf = src.read(min(chunk, remaining))
if not buf:
break
dst.write(buf)
remaining -= len(buf)
extracted += len(buf)
if show and total_size:
pct = int(extracted/total_size*100)
mb = extracted/(1024*1024)
tot = total_size/(1024*1024)
line = f"Extracting data: {pct:3d}% ({mb:.1f}/{tot:.1f} MB)"
if line != last_line:
print("\r" + line, end="", flush=True)
last_line = line
try:
os.chmod(file_path, 0o644)
except Exception:
pass
if show and last_line:
print()
ok, msg = True, "OK"
except Exception as ee:
ok, msg = False, str(ee)
try:
if os.path.exists(zip_path):
os.remove(zip_path)
except Exception:
pass
if not ok:
print(f"Failed to extract data: {msg}", file=sys.stderr)
return False
if show:
print("Data downloaded and extracted.")
return True
def cmd_platforms(args):
ensure_data_present(getattr(args, 'verbose', False))
sources = load_sources()
items = []
for s in sources:
name = s.get("platform_name") or s.get("name") or s.get("platform") or ""
folder = s.get("folder") or s.get("dossier") or ""
if name:
items.append({"name": name, "folder": folder})
if getattr(args, 'json', False):
print(json.dumps(items, ensure_ascii=False, indent=2))
else:
for it in items:
# name TAB folder (folder may be empty for BIOS/virtual)
print(f"{it['name']}\t{it['folder']}")
def _resolve_platform(sources, platform_name: str):
# match by display name or key, case-insensitive
pn = platform_name.strip().lower()
for s in sources:
display = (s.get("platform_name") or s.get("name") or "").lower()
key = (s.get("platform") or s.get("folder") or "").lower()
if pn == display or pn == key:
return s
# fallback: substring
for s in sources:
display = (s.get("platform_name") or s.get("name") or "").lower()
if pn in display:
return s
return None
def cmd_games(args):
ensure_data_present(getattr(args, 'verbose', False))
sources = load_sources()
platform = _resolve_platform(sources, args.platform)
if not platform:
print(f"Platform not found: {args.platform}", file=sys.stderr)
sys.exit(2)
platform_id = (
platform.get('platform_name')
or platform.get('platform')
or platform.get('folder')
or args.platform
)
games = load_games(platform_id)
if args.search:
q = args.search.lower()
games = [g for g in games if q in (g[0] or '').lower()]
for g in games:
# games items can be (name, url) or (name, url, size)
title = g[0] if isinstance(g, (list, tuple)) and g else str(g)
print(title)
def cmd_history(args):
hist = load_history()
if args.json:
print(json.dumps(hist, ensure_ascii=False, indent=2))
else:
for e in hist[-args.tail:]:
print(f"[{e.get('status')}] {e.get('platform')} - {e.get('game_name')} ({e.get('progress','?')}%) {e.get('message','')}")
def cmd_clear_history(args):
save_history([])
print("History cleared")
async def _run_download_with_progress(url: str, platform_name: str, game_name: str, force_extract_zip: bool = False):
"""Run download and display live progress in the terminal."""
task_id = f"cli-{os.getpid()}"
# Start download coroutine
coro = download_from_1fichier(url, platform_name, game_name, force_extract_zip, task_id) if is_1fichier_url(url) else download_rom(url, platform_name, game_name, force_extract_zip, task_id)
task = asyncio.create_task(coro)
last_line = ""
def print_progress(pct: int, speed_mb_s: float | None, downloaded: int, total: int):
nonlocal last_line
# Build a concise one-line status
total_mb = total / (1024*1024) if total else 0
dl_mb = downloaded / (1024*1024)
spd = f" @ {speed_mb_s:.1f} MB/s" if speed_mb_s is not None and speed_mb_s > 0 else ""
line = f"Downloading: {pct:3d}% ({dl_mb:.1f}/{total_mb:.1f} MB){spd}"
# Avoid overly chatty output
if line != last_line:
print("\r" + line, end="", flush=True)
last_line = line
# Poll shared in-memory history for progress (non-intrusive)
while not task.done():
try:
if isinstance(config.history, list):
for e in config.history:
if e.get('url') == url and e.get('status') in ("downloading", "Téléchargement", "Extracting"):
downloaded = int(e.get('downloaded_size') or 0)
total = int(e.get('total_size') or 0)
speed = e.get('speed')
if total > 0:
pct = int(downloaded/total*100)
else:
pct = 0
# speed might be None or 0 when unknown
print_progress(pct, float(speed) if isinstance(speed, (int, float)) else None, downloaded, total)
break
except Exception:
pass
await asyncio.sleep(0.2)
success, message = await task
if last_line:
# End the progress line
print()
if success:
print(message or "Download completed")
return 0
else:
print(message or "Download failed", file=sys.stderr)
return 1
def cmd_download(args):
ensure_data_present(getattr(args, 'verbose', False))
sources = load_sources()
platform = _resolve_platform(sources, args.platform)
if not platform:
print(f"Platform not found: {args.platform}", file=sys.stderr)
sys.exit(2)
platform_id = (
platform.get('platform_name')
or platform.get('platform')
or platform.get('folder')
or args.platform
)
games = load_games(platform_id)
query_raw = args.game.strip()
def _strip_ext(name: str) -> str:
try:
base, _ = os.path.splitext(name)
return base
except Exception:
return name
def _tokens(s: str) -> list[str]:
return re.findall(r"[a-z0-9]+", s.lower())
def _game_title(g) -> str | None:
return g[0] if isinstance(g, (list, tuple)) and g else None
def _game_url(g) -> str | None:
return g[1] if isinstance(g, (list, tuple)) and len(g) > 1 else None
# 1) Exact match (case-insensitive), with and without extension
match = None
q_lower = query_raw.lower()
q_no_ext = _strip_ext(query_raw).lower()
for g in games:
title = _game_title(g)
if not title:
continue
t_lower = title.strip().lower()
if t_lower == q_lower or _strip_ext(t_lower) == q_no_ext:
match = (title, _game_url(g))
break
# Si pas d'exact, ne pas auto-sélectionner; proposer des correspondances possibles
suggestions = [] # (priority, score, title, url)
if not match:
# 2) Sous-chaîne sur le titre (ou sans extension)
for g in games:
title = _game_title(g)
if not title:
continue
t_lower = title.lower()
t_no_ext = _strip_ext(t_lower)
pos_full = t_lower.find(q_lower)
pos_noext = t_no_ext.find(q_no_ext)
if pos_full != -1 or (q_no_ext and pos_noext != -1):
# priorité 0 = sous-chaîne; score = position trouvée (plus petit est mieux)
pos = pos_full if pos_full != -1 else pos_noext
suggestions.append((0, max(0, pos), title, _game_url(g)))
# 3) Tokens en ordre non-contigu, avec score de proximité
def ordered_gap_score(qt: list[str], tt: list[str]):
pos = []
start = 0
for tok in qt:
try:
i = next(i for i in range(start, len(tt)) if tt[i] == tok)
except StopIteration:
return None
pos.append(i)
start = i + 1
gap = (pos[-1] - pos[0]) - (len(qt) - 1)
return max(0, gap)
q_tokens = _tokens(query_raw)
if q_tokens:
for g in games:
title = _game_title(g)
if not title:
continue
tt = _tokens(title)
score = ordered_gap_score(q_tokens, tt)
if score is not None:
suggestions.append((1, score, title, _game_url(g)))
# 4) Tokens présents (ordre libre)
if q_tokens:
for g in games:
title = _game_title(g)
if not title:
continue
t_tokens = set(_tokens(title))
if all(tok in t_tokens for tok in q_tokens):
suggestions.append((2, len(t_tokens), title, _game_url(g)))
# Dédupliquer en gardant la meilleure (priorité/score) pour chaque titre
best_by_title = {}
for prio, score, title, url in suggestions:
key = title.lower()
cur = best_by_title.get(key)
if cur is None or (prio, score) < (cur[0], cur[1]):
best_by_title[key] = (prio, score, title, url)
suggestions = sorted(best_by_title.values(), key=lambda x: (x[0], x[1], x[2].lower()))
if not match:
# Afficher les correspondances possibles, et en mode interactif proposer un choix
print(f"No exact result found for this game: {args.game}")
if suggestions:
limit = 20
shown = suggestions[:limit]
# Mode interactif par défaut si TTY détecté, ou si --interactive explicite
interactive = False
try:
interactive = bool(getattr(args, 'interactive', False) or sys.stdin.isatty())
except Exception:
interactive = bool(getattr(args, 'interactive', False))
if interactive:
print("Select a match to download:")
for i, s in enumerate(shown, start=1):
print(f" {i}. {s[2]}")
if len(suggestions) > limit:
print(f" ... and {len(suggestions) - limit} more not shown")
try:
choice = input("Enter number (or press Enter to cancel): ").strip()
except EOFError:
choice = ""
if choice:
try:
idx = int(choice)
if 1 <= idx <= len(shown):
sel = shown[idx-1]
match = (sel[2], sel[3])
except Exception:
pass
if not match:
print("Here are potential matches (use the exact title with --game):")
for i, s in enumerate(shown, start=1):
print(f" {i}. {s[2]}")
if len(suggestions) > limit:
print(f" ... and {len(suggestions) - limit} more")
print("Tip: list games with: python rgsx_cli.py games --platform \"%s\" --search \"%s\"" % (args.platform, query_raw))
sys.exit(3)
else:
print("No similar titles found.")
print("Tip: list games with: python rgsx_cli.py games --platform \"%s\" --search \"%s\"" % (args.platform, query_raw))
sys.exit(3)
title, url = match
# Determine if we should force ZIP extraction (only when we can safely check extensions)
is_zip_non_supported = False
exts = None
try:
if os.path.exists(config.JSON_EXTENSIONS) and os.path.getsize(config.JSON_EXTENSIONS) > 2:
exts = load_extensions_json()
except Exception:
exts = None
if exts is not None:
# If extension unsupported for this platform, either block or allow with --force
if not is_extension_supported(sanitize_filename(title), platform.get('platform') or '', exts):
import os as _os
ext = _os.path.splitext(title)[1].lower()
is_zip_non_supported = ext in ('.zip', '.rar')
if not args.force and not is_zip_non_supported:
print("Unsupported extension for this platform. Use --force to override.", file=sys.stderr)
sys.exit(4)
# Add entry to history and run
hist = load_history()
hist.append({
"platform": platform.get('platform_name') or platform.get('platform') or args.platform,
"game_name": title,
"status": "downloading",
"url": url,
"progress": 0,
"message": "Téléchargement en cours",
"timestamp": None,
})
save_history(hist)
# Important: share the same list object with network module so it can update history in place
try:
config.history = hist
except Exception:
pass
# Run download with live progress
exit_code = asyncio.run(_run_download_with_progress(url, platform_id, title, is_zip_non_supported))
if exit_code != 0:
sys.exit(exit_code)
def build_parser():
p = argparse.ArgumentParser(prog="rgsx-cli", description="RGSX headless CLI")
p.add_argument("--verbose", action="store_true", help="Verbose logging")
p.add_argument("--force-update", "-force-update", action="store_true", help="Purge data (games/images/systems_list) and redownload")
sub = p.add_subparsers(dest="cmd")
sp = sub.add_parser("platforms", help="List available platforms")
sp.add_argument("--json", action="store_true", help="Output JSON with name and folder")
# Also accept global flags after the subcommand
sp.add_argument("--verbose", action="store_true", help="Verbose logging")
sp.add_argument("--force-update", "-force-update", action="store_true", help="Purge data (games/images/systems_list) and redownload")
sp.set_defaults(func=cmd_platforms)
sg = sub.add_parser("games", help="List games for a platform")
sg.add_argument("--platform", required=True, help="Platform name or key")
sg.add_argument("--search", help="Filter by name contains")
# Also accept global flags after the subcommand
sg.add_argument("--verbose", action="store_true", help="Verbose logging")
sg.add_argument("--force-update", "-force-update", action="store_true", help="Purge data (games/images/systems_list) and redownload")
sg.set_defaults(func=cmd_games)
sd = sub.add_parser("download", help="Download a game by title")
sd.add_argument("--platform", required=True)
sd.add_argument("--game", required=True)
sd.add_argument("--force", action="store_true", help="Override unsupported extension warning")
sd.add_argument("--interactive", "-i", action="store_true", help="Prompt to choose from matches when no exact title is found")
# Also accept global flags after the subcommand
sd.add_argument("--verbose", action="store_true", help="Verbose logging")
sd.add_argument("--force-update", "-force-update", action="store_true", help="Purge data (games/images/systems_list) and redownload")
sd.set_defaults(func=cmd_download)
sh = sub.add_parser("history", help="Show recent history")
sh.add_argument("--tail", type=int, default=50, help="Last N entries")
sh.add_argument("--json", action="store_true")
# Also accept global flags after the subcommand
sh.add_argument("--verbose", action="store_true", help="Verbose logging")
sh.add_argument("--force-update", "-force-update", action="store_true", help="Purge data (games/images/systems_list) and redownload")
sh.set_defaults(func=cmd_history)
sc = sub.add_parser("clear-history", help="Clear history")
# Also accept global flags after the subcommand
sc.add_argument("--verbose", action="store_true", help="Verbose logging")
sc.add_argument("--force-update", "-force-update", action="store_true", help="Purge data (games/images/systems_list) and redownload")
sc.set_defaults(func=cmd_clear_history)
return p
def main(argv=None):
argv = argv or sys.argv[1:]
# Force headless mode for CLI
os.environ.setdefault("RGSX_HEADLESS", "1")
parser = build_parser()
args = parser.parse_args(argv)
setup_logging(args.verbose)
# Ensure SAVE_FOLDER exists (for history/download outputs, etc.)
try:
os.makedirs(config.SAVE_FOLDER, exist_ok=True)
except Exception:
pass
# Handle global force-update (can run without a subcommand)
if getattr(args, 'force_update', False):
# Purge
try:
if os.path.exists(config.SOURCES_FILE):
os.remove(config.SOURCES_FILE)
except Exception:
pass
try:
shutil.rmtree(config.GAMES_FOLDER, ignore_errors=True)
except Exception:
pass
try:
shutil.rmtree(config.IMAGES_FOLDER, ignore_errors=True)
except Exception:
pass
# Redownload
ok = ensure_data_present(verbose=True)
if not ok:
sys.exit(1)
# If no subcommand, exit now
if not getattr(args, 'cmd', None):
return
# If a subcommand is provided, run it
if getattr(args, 'cmd', None):
args.func(args)
if __name__ == "__main__":
main()

View File

@@ -1,5 +1,4 @@
import shutil
import pygame # type: ignore
import re
import json
import os
@@ -7,6 +6,14 @@ import logging
import platform
import subprocess
import config
from config import HEADLESS
try:
if not HEADLESS:
import pygame # type: ignore
else:
pygame = None # type: ignore
except Exception:
pygame = None # type: ignore
import glob
import threading
from rgsx_settings import load_rgsx_settings, save_rgsx_settings