from quart import Blueprint, render_template, request, redirect, url_for, session, current_app import os from functools import wraps from datetime import datetime, timedelta import db_operations as db import json from utils.email_service import send_verification_email, send_password_reset_email # Create a Blueprint for auth routes auth_bp = Blueprint('auth', __name__) # Helper function to require login for protected routes def login_required(f): @wraps(f) async def decorated_function(*args, **kwargs): if 'session_token' not in session: return redirect(url_for('auth.login', next=request.url)) # Validate the session result = db.validate_session(session['session_token']) if not result['success']: # Clear the invalid session session.pop('session_token', None) return redirect(url_for('auth.login', next=request.url)) # Add user data to session session['user_id'] = result['user_id'] session['email'] = result['email'] session['first_name'] = result['first_name'] session['last_name'] = result['last_name'] return await f(*args, **kwargs) return decorated_function @auth_bp.route('/register', methods=['GET', 'POST']) async def register(): error_message = None success_message = None if request.method == 'POST': form_data = await request.form email = form_data.get('email', '').strip().lower() password = form_data.get('password', '') password_confirm = form_data.get('password_confirm', '') first_name = form_data.get('first_name', '').strip() last_name = form_data.get('last_name', '').strip() # Basic validation if not email or not password: error_message = "Email and password are required." elif password != password_confirm: error_message = "Passwords do not match." elif len(password) < 8: error_message = "Password must be at least 8 characters." else: # Create the user result = db.create_user(email, password, first_name, last_name) if 'error' in result: error_message = result['error'] else: # Send verification email verification_url = url_for( 'auth.verify_email', token=result['verification_token'], _external=True ) email_sent = await send_verification_email(email, verification_url) if email_sent: success_message = "Registration successful! Please check your email to verify your account." else: error_message = "Account created but there was a problem sending the verification email. Please contact support." return await render_template( 'auth/register.html', error_message=error_message, success_message=success_message ) @auth_bp.route('/verify-email/') async def verify_email(token): result = db.verify_user(token) if result['success']: success_message = "Your email has been verified successfully! You can now log in." return await render_template('auth/login.html', success_message=success_message) else: error_message = "Invalid or expired verification link." return await render_template('auth/login.html', error_message=error_message) @auth_bp.route('/login', methods=['GET', 'POST']) async def login(): error_message = None success_message = request.args.get('success_message') next_url = request.args.get('next', url_for('main.index')) if request.method == 'POST': form_data = await request.form email = form_data.get('email', '').strip().lower() password = form_data.get('password', '') remember_me = form_data.get('remember_me') == 'on' # Authenticate the user auth_result = db.authenticate_user(email, password) if auth_result['success']: # Create a session user_agent = request.headers.get('User-Agent', '') ip_address = request.remote_addr # Set session duration based on remember_me session_days = 30 if remember_me else 1 session_result = db.create_session( auth_result['user_id'], ip_address=ip_address, user_agent=user_agent, session_duration_days=session_days ) if session_result['success']: # Save session token in the user's browser session['session_token'] = session_result['session_token'] session.permanent = True current_app.permanent_session_lifetime = timedelta(days=session_days) # Redirect to the next URL or dashboard return redirect(next_url) else: error_message = "Error creating session. Please try again." else: error_message = auth_result['error'] return await render_template( 'auth/login.html', error_message=error_message, success_message=success_message ) @auth_bp.route('/logout') async def logout(): if 'session_token' in session: # End the session in the database db.end_session(session['session_token']) # Clear session data session.clear() return redirect(url_for('auth.login', success_message="You have been logged out successfully.")) @auth_bp.route('/forgot-password', methods=['GET', 'POST']) async def forgot_password(): error_message = None success_message = None if request.method == 'POST': form_data = await request.form email = form_data.get('email', '').strip().lower() # Generate reset token result = db.request_password_reset(email) if result['success'] and 'reset_token' in result: # Create password reset URL reset_url = url_for( 'auth.reset_password', token=result['reset_token'], _external=True ) # Send password reset email email_sent = await send_password_reset_email(email, reset_url) if email_sent: success_message = "If your email address exists in our database, you will receive a password recovery link at your email address shortly." else: error_message = "There was a problem sending the password reset email. Please try again later." else: # Don't reveal if email exists or not success_message = "If your email address exists in our database, you will receive a password recovery link at your email address shortly." return await render_template( 'auth/forgot_password.html', error_message=error_message, success_message=success_message ) @auth_bp.route('/reset-password/', methods=['GET', 'POST']) async def reset_password(token): error_message = None success_message = None if request.method == 'POST': form_data = await request.form password = form_data.get('password', '') password_confirm = form_data.get('password_confirm', '') # Basic validation if not password: error_message = "Password is required." elif password != password_confirm: error_message = "Passwords do not match." elif len(password) < 8: error_message = "Password must be at least 8 characters." else: # Reset the password result = db.reset_password(token, password) if result['success']: success_message = "Your password has been reset successfully! You can now log in." return await render_template('auth/login.html', success_message=success_message) else: error_message = result['error'] return await render_template( 'auth/reset_password.html', error_message=error_message, success_message=success_message, token=token ) @auth_bp.route('/profile', methods=['GET', 'POST']) @login_required async def profile(): error_message = None success_message = None user_id = session.get('user_id') # Get user profile data user_data = db.get_user_profile(user_id) if not user_data['success']: return redirect(url_for('auth.logout')) if request.method == 'POST': form_data = await request.form first_name = form_data.get('first_name', '').strip() last_name = form_data.get('last_name', '').strip() # Update profile result = db.update_user_profile(user_id, first_name, last_name) if result['success']: # Update session data session['first_name'] = first_name session['last_name'] = last_name success_message = "Profile updated successfully!" else: error_message = result['error'] # Get history data job_descriptions = db.get_user_job_descriptions(user_id) resumes = db.get_user_resumes(user_id) reports = db.get_user_reports(user_id) return await render_template( 'auth/profile.html', user=user_data, job_descriptions=job_descriptions.get('job_descriptions', []) if job_descriptions['success'] else [], resumes=resumes.get('resumes', []) if resumes['success'] else [], reports=reports.get('reports', []) if reports['success'] else [], error_message=error_message, success_message=success_message ) @auth_bp.route('/change-password', methods=['POST']) @login_required async def change_password(): error_message = None success_message = None user_id = session.get('user_id') form_data = await request.form current_password = form_data.get('current_password', '') new_password = form_data.get('new_password', '') confirm_password = form_data.get('confirm_password', '') # Basic validation if not current_password or not new_password: error_message = "All password fields are required." elif new_password != confirm_password: error_message = "New passwords do not match." elif len(new_password) < 8: error_message = "New password must be at least 8 characters." else: # Verify current password auth_result = db.authenticate_user(session.get('email'), current_password) if auth_result['success']: # Update password user_email = session.get('email') # Request password reset token reset_result = db.request_password_reset(user_email) if reset_result['success'] and 'reset_token' in reset_result: # Use the token to reset the password result = db.reset_password(reset_result['reset_token'], new_password) if result['success']: # Log the user out (their session was already invalidated) session.clear() return redirect(url_for('auth.login', success_message="Password changed successfully. Please log in with your new password.")) else: error_message = result['error'] else: error_message = "Error updating password. Please try again." else: error_message = "Current password is incorrect." # If there was an error, redirect back to profile with error message return redirect(url_for('auth.profile', error_message=error_message))