fix(security): hash gateway pairing codes instead of storing plaintext

Pairing codes were stored as plaintext keys in JSON files. Now uses
sha256 + random salt hashing with constant-time comparison.

Fixes #8036

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
Tom Qiao
2026-04-12 09:19:41 +08:00
committed by Teknium
parent 3ac2125140
commit 2e509422ef
2 changed files with 149 additions and 19 deletions

View File

@@ -18,6 +18,7 @@ Security features (based on OWASP + NIST SP 800-63-4 guidance):
Storage: ~/.hermes/pairing/
"""
import hashlib
import json
import os
import secrets
@@ -148,6 +149,11 @@ class PairingStore:
# ----- Pending codes -----
@staticmethod
def _hash_code(code: str, salt: bytes) -> str:
"""Hash a pairing code with the given salt using SHA-256."""
return hashlib.sha256(salt + code.encode("utf-8")).hexdigest()
def generate_code(
self, platform: str, user_id: str, user_name: str = ""
) -> Optional[str]:
@@ -158,6 +164,9 @@ class PairingStore:
- User is rate-limited (too recent request)
- Max pending codes reached for this platform
- User/platform is in lockout due to failed attempts
The code is NOT stored in plaintext. Only a salted SHA-256 hash is
persisted so that reading the pending file does not reveal codes.
"""
with self._lock:
self._cleanup_expired(platform)
@@ -178,8 +187,17 @@ class PairingStore:
# Generate cryptographically random code
code = "".join(secrets.choice(ALPHABET) for _ in range(CODE_LENGTH))
# Store pending request
pending[code] = {
# Hash the code with a random salt before storing
salt = os.urandom(16)
code_hash = self._hash_code(code, salt)
# Use a unique entry id as the key (not the code itself)
entry_id = secrets.token_hex(8)
# Store pending request with hashed code
pending[entry_id] = {
"hash": code_hash,
"salt": salt.hex(),
"user_id": user_id,
"user_name": user_name,
"created_at": time.time(),
@@ -195,10 +213,14 @@ class PairingStore:
"""
Approve a pairing code. Adds the user to the approved list.
Returns {user_id, user_name} on success, None if code is
Returns ``{user_id, user_name}`` on success, ``None`` if the code is
invalid/expired OR the platform is currently locked out after
``MAX_FAILED_ATTEMPTS`` failed approvals (#10195). Callers can
disambiguate with ``_is_locked_out(platform)``.
Verification: the user-provided code is hashed with each stored
entry's salt and compared to the stored hash using constant-time
comparison.
"""
with self._lock:
self._cleanup_expired(platform)
@@ -213,33 +235,51 @@ class PairingStore:
return None
pending = self._load_json(self._pending_path(platform))
if code not in pending:
# Find the entry whose hash matches the provided code
matched_key = None
matched_entry = None
for entry_id, entry in pending.items():
salt = bytes.fromhex(entry["salt"])
candidate_hash = self._hash_code(code, salt)
if secrets.compare_digest(candidate_hash, entry["hash"]):
matched_key = entry_id
matched_entry = entry
break
if matched_key is None:
self._record_failed_attempt(platform)
return None
entry = pending.pop(code)
del pending[matched_key]
self._save_json(self._pending_path(platform), pending)
# Add to approved list
self._approve_user(platform, entry["user_id"], entry.get("user_name", ""))
self._approve_user(platform, matched_entry["user_id"],
matched_entry.get("user_name", ""))
return {
"user_id": entry["user_id"],
"user_name": entry.get("user_name", ""),
"user_id": matched_entry["user_id"],
"user_name": matched_entry.get("user_name", ""),
}
def list_pending(self, platform: str = None) -> list:
"""List pending pairing requests, optionally filtered by platform."""
"""List pending pairing requests, optionally filtered by platform.
Codes are stored hashed — the ``code`` field is replaced with the
first 8 hex characters of the hash so admins can distinguish entries
without revealing the original code.
"""
results = []
platforms = [platform] if platform else self._all_platforms("pending")
for p in platforms:
self._cleanup_expired(p)
pending = self._load_json(self._pending_path(p))
for code, info in pending.items():
for entry_id, info in pending.items():
age_min = int((time.time() - info["created_at"]) / 60)
results.append({
"platform": p,
"code": code,
"code": info["hash"][:8],
"user_id": info["user_id"],
"user_name": info.get("user_name", ""),
"age_minutes": age_min,
@@ -302,12 +342,12 @@ class PairingStore:
pending = self._load_json(path)
now = time.time()
expired = [
code for code, info in pending.items()
entry_id for entry_id, info in pending.items()
if (now - info["created_at"]) > CODE_TTL_SECONDS
]
if expired:
for code in expired:
del pending[code]
for entry_id in expired:
del pending[entry_id]
self._save_json(path, pending)
def _all_platforms(self, suffix: str) -> list: