Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
62 changes: 62 additions & 0 deletions app/auth/auth_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
from datetime import datetime, timezone
from typing import Optional

from bson import ObjectId
from flask_login import current_user
from werkzeug.security import generate_password_hash

Expand Down Expand Up @@ -231,6 +232,67 @@ async def delete_user(self, user_id: str) -> tuple[bool, str]:
logger.error(f"Error deleting user: {str(e)}")
return False, "An internal error has occurred."

@with_mongodb_retry(retries=3, delay=2)
async def change_password(self, user_id: str, current_password: str, new_password: str) -> tuple[bool, str]:
"""Change user password after verifying current password"""

try:
# Get user data
user_data = self.db.users.find_one({"_id": ObjectId(user_id)})
if not user_data:
return False, "User not found"

# Check for account lockout due to too many failed attempts
failed_attempts = user_data.get('failed_password_change_attempts', 0)
last_failed = user_data.get('last_failed_password_change')

if failed_attempts >= 5 and last_failed:
# Lock for 15 minutes after 5 failed attempts
time_since = (datetime.now(timezone.utc) - last_failed).total_seconds()
if time_since < 900: # 15 minutes
mins_left = int((900 - time_since) / 60) + 1
return False, f"Too many failed attempts. Try again in {mins_left} minutes"

# Create user object to verify current password
user = User.create_from_db(user_data)
if not user.check_password(current_password):
# Track failed attempt
self.db.users.update_one(
{"_id": ObjectId(user_id)},
{
"$set": {"last_failed_password_change": datetime.now(timezone.utc)},
"$inc": {"failed_password_change_attempts": 1}
}
)
return False, "Current password is incorrect"

# Check new password strength
password_valid, message = await check_password_strength(new_password)
if not password_valid:
return False, message

# Update password and reset failed attempts
result = self.db.users.update_one(
{"_id": ObjectId(user_id)},
{
"$set": {
"password_hash": generate_password_hash(new_password),
"password_changed_at": datetime.now(timezone.utc),
"failed_password_change_attempts": 0
},
"$unset": {"last_failed_password_change": ""}
}
)

if result.modified_count > 0:
logger.info(f"Password changed for user: {user.username}")
return True, "Password changed successfully"
return False, "Failed to update password"

except Exception as e:
logger.error(f"Error changing password: {str(e)}")
return False, "An internal error has occurred."

@with_mongodb_retry(retries=3, delay=2)
async def update_user_settings(self, user_id: str, form_data: dict, profile_picture=None) -> tuple[bool, Optional[str]]:
"""Update user settings including profile picture"""
Expand Down
48 changes: 48 additions & 0 deletions app/auth/routes.py
Original file line number Diff line number Diff line change
Expand Up @@ -281,6 +281,54 @@ async def check_username():
}), 500


@auth_bp.route("/change_password", methods=["POST"])
@limiter.limit("5 per minute")
@login_required
@async_route
async def change_password():
"""Change user password"""
try:
if request.headers.get('X-Requested-With') != 'XMLHttpRequest':
return jsonify({"success": False, "message": "Invalid request"}), 403

data = request.get_json()
current_password = data.get('current_password', '').strip()
new_password = data.get('new_password', '').strip()
confirm_password = data.get('confirm_password', '').strip()

# Validate input
if not all([current_password, new_password, confirm_password]):
return jsonify({"success": False, "message": "All fields are required"}), 400

if new_password != confirm_password:
return jsonify({"success": False, "message": "New passwords do not match"}), 400

# Additional security: Prevent reusing current password
if current_password == new_password:
return jsonify({"success": False, "message": "New password must be different from current password"}), 400

# Change password
success, message = await user_manager.change_password(
current_user.get_id(),
current_password,
new_password
)

if success:
current_app.logger.info(f"Password changed for user {current_user.username}")
# Log out user to invalidate session
logout_user()
return jsonify({"success": True, "message": "Password changed successfully. Please log in again.", "redirect": url_for("auth.login")})
else:
# Don't reveal whether username exists or password was wrong (timing attack mitigation)
current_app.logger.warning(f"Failed password change attempt for user {current_user.username}")
return jsonify({"success": False, "message": message}), 400

except Exception as e:
current_app.logger.error(f"Error changing password for user {current_user.username}: {str(e)}", exc_info=True)
return jsonify({"success": False, "message": "An internal error has occurred."}), 500


@auth_bp.route("/delete_account", methods=["POST"])
@login_required
@async_route
Expand Down
Loading
Loading