Skip to content
Open
80 changes: 74 additions & 6 deletions fileglancer/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -865,14 +865,20 @@ async def get_profile(username: str = Depends(get_current_user)):

# SSH Key Management endpoints
@app.get("/api/ssh-keys", response_model=sshkeys.SSHKeyListResponse,
description="List all SSH keys in the user's ~/.ssh directory")
description="List Fileglancer-managed SSH keys")
async def list_ssh_keys(username: str = Depends(get_current_user)):
"""List SSH keys for the authenticated user"""
"""List SSH keys with 'fileglancer' in the comment"""
with _get_user_context(username):
try:
ssh_dir = sshkeys.get_ssh_directory()
keys = sshkeys.list_ssh_keys(ssh_dir)
return sshkeys.SSHKeyListResponse(keys=keys)
exists, unmanaged, missing_pubkey = sshkeys.check_id_ed25519_status(ssh_dir)
return sshkeys.SSHKeyListResponse(
keys=keys,
unmanaged_id_ed25519_exists=unmanaged,
id_ed25519_exists=exists,
id_ed25519_missing_pubkey=missing_pubkey
)
except Exception as e:
logger.error(f"Error listing SSH keys for {username}: {e}")
raise HTTPException(status_code=500, detail=str(e))
Expand Down Expand Up @@ -951,27 +957,89 @@ async def authorize_ssh_key(username: str = Depends(get_current_user)):
logger.error(f"Error authorizing SSH key for {username}: {e}")
raise HTTPException(status_code=500, detail=str(e))

@app.get("/api/ssh-keys/content", response_model=sshkeys.SSHKeyContent,
@app.post("/api/ssh-keys/regenerate-public",
response_model=sshkeys.SSHKeyInfo,
description="Regenerate public key from private key")
async def regenerate_public_key(
request: sshkeys.GenerateKeyRequest = Body(default=sshkeys.GenerateKeyRequest()),
username: str = Depends(get_current_user)
):
"""Regenerate the public key from the private key.

If the private key is encrypted, provide the passphrase in the request body.
"""
with _get_user_context(username):
try:
ssh_dir = sshkeys.get_ssh_directory()
key_info = sshkeys.regenerate_public_key(
ssh_dir,
passphrase=request.passphrase
)
return key_info

except ValueError as e:
raise HTTPException(status_code=404, detail=str(e))
except RuntimeError as e:
# Check for passphrase errors
if "passphrase" in str(e).lower():
raise HTTPException(status_code=401, detail=str(e))
raise HTTPException(status_code=500, detail=str(e))
except Exception as e:
logger.error(f"Error regenerating public key for {username}: {e}")
raise HTTPException(status_code=500, detail=str(e))

@app.get("/api/ssh-keys/content",
description="Get the content of the default SSH key (id_ed25519)")
async def get_ssh_key_content(
key_type: str = Query(..., description="Type of key to fetch: 'public' or 'private'"),
username: str = Depends(get_current_user)
):
"""Get the public or private key content for copying"""
"""Get the public or private key content for copying.

Returns plain text response with secure bytearray handling that wipes
the key content from memory after sending.
"""
if key_type not in ("public", "private"):
raise HTTPException(status_code=400, detail="key_type must be 'public' or 'private'")

with _get_user_context(username):
try:
ssh_dir = sshkeys.get_ssh_directory()
return sshkeys.get_key_content(ssh_dir, "id_ed25519", key_type)
key_buffer = sshkeys.get_key_content(ssh_dir, "id_ed25519", key_type)
return sshkeys.SSHKeyContentResponse(key_buffer)

except ValueError as e:
raise HTTPException(status_code=404, detail=str(e))
except Exception as e:
logger.error(f"Error getting SSH key content for {username}: {e}")
raise HTTPException(status_code=500, detail=str(e))

@app.post("/api/ssh-keys/generate-temp",
description="Generate a temporary SSH key and return private key for one-time copy")
async def generate_temp_ssh_key(
request: sshkeys.GenerateKeyRequest = Body(default=sshkeys.GenerateKeyRequest()),
username: str = Depends(get_current_user)
):
"""Generate a temporary SSH key, add to authorized_keys, return private key.

The private key is streamed securely and the temporary files are deleted
after the response is sent. Key info is included in response headers:
- X-SSH-Key-Filename
- X-SSH-Key-Type
- X-SSH-Key-Fingerprint
- X-SSH-Key-Comment
"""
with _get_user_context(username):
try:
ssh_dir = sshkeys.get_ssh_directory()
return sshkeys.generate_temp_key_and_authorize(ssh_dir, request.passphrase)

except RuntimeError as e:
raise HTTPException(status_code=500, detail=str(e))
except Exception as e:
logger.error(f"Error generating temp SSH key for {username}: {e}")
raise HTTPException(status_code=500, detail=str(e))

# File content endpoint
@app.head("/api/content/{path_name:path}")
async def head_file_content(path_name: str,
Expand Down
Loading