import csv
import logging
import os
import sqlite3
from datetime import datetime, date, timedelta

from flask import Flask, render_template, request, jsonify, session, redirect, url_for, flash
from flask_cors import CORS
from flask_jwt_extended import jwt_required, get_jwt_identity, JWTManager, get_jwt, create_access_token
from werkzeug.security import generate_password_hash, check_password_hash

app = Flask(__name__)
# NOTE(review): every origin is allowed together with credentials; restrict
# the origin list before exposing this service publicly.
CORS(app, supports_credentials=True, resources={r"*": {"origins": "*"}})

# Secrets are read from the environment when available. The previous
# hard-coded values remain only as fallbacks so existing deployments keep
# working; set FLASK_SECRET_KEY / JWT_SECRET_KEY in production.
app.secret_key = os.environ.get('FLASK_SECRET_KEY', 'Password')
# NOTE(review): both flags are disabled, i.e. the session cookie is readable
# from JavaScript and is also sent over plain HTTP.
app.config['SESSION_COOKIE_HTTPONLY'] = False
app.config['SESSION_COOKIE_SECURE'] = False
app.config['PERMANENT_SESSION_LIFETIME'] = 1800  # 30 minutes, adjust as needed
app.config['JWT_SECRET_KEY'] = os.environ.get('JWT_SECRET_KEY', 'AX29X822$2323&223XAPIDN012XDJJHDOO$')
jwt = JWTManager(app)

# Set up logging
logging.basicConfig(level=logging.INFO)


@jwt.expired_token_loader
def my_expired_token_callback(jwt_header, jwt_payload):
    """Return a 401 JSON error when the presented JWT has expired."""
    return jsonify({'error': 'Your token has expired, please log in again.'}), 401


# Database functions
def get_db_connection():
    """Open a connection to the platform database with name-addressable rows.

    The caller owns the connection and must close it.
    """
    conn = sqlite3.connect('Compliance_platform.db')
    conn.row_factory = sqlite3.Row
    return conn


def check_login():
    """Return the role of the JWT-authenticated user, or None.

    The JWT check is applied through an inner function so that this helper
    can be called from inside a view. None is returned when the user is not
    in the database or when the lookup fails with a database error.
    """
    @jwt_required(optional=False)
    def inner():
        username = get_jwt_identity()
        conn = None
        try:
            conn = get_db_connection()
            row = conn.execute('SELECT role FROM users WHERE username = ?', (username,)).fetchone()
        except sqlite3.Error as e:
            logging.error('An error occurred during login check: %s', str(e))
            return None
        finally:
            # "with conn" only commits/rolls back; it never closes, so close here.
            if conn is not None:
                conn.close()
        return row['role'] if row else None

    return inner()  # Call the inner function so the jwt_required decorator runs


# Routes
@app.route('/create-user', methods=['GET', 'POST'])
def create_user():
    """Create an active user with role 'User' (admin only).

    Returns 401 when the caller is not an admin, 405 for non-POST requests,
    400 when the full name is blank and 409 when the username already exists.
    """
    if not (check_login() and session.get('role') == 'Admin'):
        return jsonify({'error': 'Unauthorized'}), 401
    if request.method != 'POST':
        return jsonify({'error': 'Method not allowed'}), 405

    username = request.form['username']
    password = request.form['password']
    name_parts = request.form['FullName'].split()
    if not name_parts:
        # A blank name used to raise IndexError and surface as a 500.
        return jsonify({'error': 'Full name is required'}), 400
    first_name = name_parts[0]
    last_name = name_parts[-1] if len(name_parts) > 1 else ''
    role = 'User'
    status = "Active"
    hashed_password = generate_password_hash(password)

    conn = None  # bound before the try so the finally clause is always safe
    try:
        conn = get_db_connection()
        conn.execute(
            'INSERT INTO users (username, password, role, Status, first_name, last_name) VALUES (?, ?, ?, ?, ?, ?)',
            (username, hashed_password, role, status, first_name, last_name))
        conn.commit()
    except sqlite3.IntegrityError:
        return jsonify({'error': 'Username already exists'}), 409  # Conflict error
    finally:
        if conn is not None:
            conn.close()
    return jsonify({'success': 'User created successfully', 'username': username})


@app.route('/update-account', methods=['POST'])
def update_account():
    """Update the logged-in user's full name and, if supplied, password.

    The previous statement had three placeholders but only two parameters and
    therefore always failed; the row is now selected by the session's
    ``user_id`` and the password is stored hashed.
    """
    user_id = session.get('user_id')
    if not user_id:
        return redirect('/login')

    fullname = request.form.get('fullname')
    password = request.form.get('password')

    connection = get_db_connection()
    try:
        # NOTE(review): other statements in this file write first_name/last_name;
        # confirm that the users table really has a "fullname" column.
        if password:
            connection.execute(
                "UPDATE users SET fullname = ?, password = ? WHERE id = ?",
                (fullname, generate_password_hash(password), user_id))
        else:
            connection.execute("UPDATE users SET fullname = ? WHERE id = ?", (fullname, user_id))
        connection.commit()
    finally:
        connection.close()

    # Redirect the user to the account page
    return redirect('/account')


@app.route('/search-clients')
def search_clients():
    """Serve the client search page."""
    return render_template('search_clients.html')


@app.route('/api/clients', methods=['GET'])
def get_clients():
    """Return all ISO clients followed by all SOC2 clients as one JSON list."""
    conn = get_db_connection()
    try:
        cursor = conn.cursor()
        combined_clients = []
        # ISO rows first, then SOC2 rows, each converted to a plain dict.
        for query in ("SELECT * FROM iso_clients", "SELECT * FROM soc2_clients"):
            cursor.execute(query)
            combined_clients.extend(dict(row) for row in cursor.fetchall())
        return jsonify(combined_clients)
    except Exception as e:
        return jsonify({"error": str(e)}), 500
    finally:
        conn.close()


@app.route('/api/get-chapters-data', methods=['GET'])
def get_chapters_data():
    """Return the distinct ISO27001 chapter names as a JSON list of objects."""
    conn = None
    try:
        conn = get_db_connection()
        chapters = conn.execute("SELECT DISTINCT chapter_name FROM ISO27001 ORDER BY id").fetchall()
        return jsonify([{'chapter_name': row['chapter_name']} for row in chapters])
    except Exception:
        # Keep the generic client message but record the cause server-side.
        logging.exception('Failed to load chapter data')
        return jsonify({'error': 'Internal server error'}), 500
    finally:
        if conn is not None:
            conn.close()


@app.route('/api/compliance-data', methods=['GET'])
def get_compliance_data():
    """Return every ISO27001 control (chapter, id, text, requirements, explanation)."""
    conn = None
    try:
        conn = get_db_connection()
        rows = conn.execute(
            "SELECT chapter_name, control_id, control_text, requirements, explanation FROM ISO27001").fetchall()
        return jsonify([dict(row) for row in rows])
    except Exception:
        logging.exception('Failed to load compliance data')
        return jsonify({'error': 'Internal server error'}), 500
    finally:
        if conn is not None:
            conn.close()
@app.route('/api/save-compliance-data', methods=['POST']) def save_compliance_data(): updated_data = request.json try: conn = get_db_connection() cursor = conn.cursor() for item in updated_data: # Check if a record exists for the given projectId and controlId cursor.execute(''' SELECT status_id FROM customer_compliance_status WHERE projectId = ? AND controlId = ? ''', (item['projectId'], item['controlId'])) status_id = cursor.fetchone() if status_id: # Update existing record cursor.execute(''' UPDATE customer_compliance_status SET control_status = ?, notes = ? WHERE status_id = ? ''', (item['controlStatus'], item['notes'], status_id[0])) else: # Insert new record cursor.execute(''' INSERT INTO customer_compliance_status (project_id, control_id, control_status, notes) VALUES (?, ?, ?, ?) ''', (item['projectId'], item['controlId'], item['controlStatus'], item['notes'])) conn.commit() conn.close() return jsonify({'message': 'Compliance data updated successfully'}) except Exception as e: return jsonify({'error': str(e)}), 500 @app.route('/edit-user/', methods=['GET', 'POST']) def edit_user(user_id): if request.method == 'POST': # Get the form data fullname = request.form['fullname'] username = request.form['username'] password = request.form['password'] role = request.form['role'] department = request.form['department'] # Connect to the SQLite database conn = sqlite3.connect('workflows.db') cursor = conn.cursor() # Update the user's information in the database cursor.execute("UPDATE users SET fullname=?, username=?, password=?, role=?, department=? 
WHERE id=?", (fullname, username, password, role, department, int(user_id))) conn.commit() # Close the database connection conn.close() # Redirect to the user management page return redirect('/user-management') else: # Connect to the SQLite database conn = get_db_connection() cursor = conn.cursor() # Fetch the user data from the database based on the user_id cursor.execute("SELECT * FROM users WHERE id=?", (int(user_id),)) user = cursor.fetchone() # Close the database connection conn.close() if user: # Convert the user tuple to a dictionary user_dict = { 'id': user[0], 'fullname': user[1], 'username': user[2], 'password': user[3], 'role': user[4], } # Render the edit user template with the user data return render_template('edit_user.html', user=user_dict) else: # Handle the case when the user is not found in the database # You can redirect or show an error message return redirect('/user-management') @app.route('/logout') def logout(): session.clear() return redirect('/login') @app.route('/user-management') def user_management(): if check_login() and session['role'] == 'Admin': return render_template('user_management.html') else: return redirect('/login') @app.route('/api/users') def api_users(): if check_login() and session['role'] == 'Admin': # Retrieve user data from the database conn = get_db_connection() cursor = conn.cursor() cursor.execute('SELECT * FROM users') users = cursor.fetchall() conn.close() # Convert fetched data into a JSON-serializable format user_list = [] for user in users: user_list.append({ 'username': user['username'], 'role': user['role'], 'Name': user['first_name'], 'last_login': user['last_login'], 'status': user['status'] }) return jsonify(user_list) else: return jsonify({'error': 'Unauthorized'}), 401 @app.route('/delete-user/', methods=['POST']) def delete_user(user_id): if check_login() and session['role'] == 'Admin': conn = get_db_connection() cursor = conn.cursor() cursor.execute('DELETE FROM users WHERE id = ?', (user_id,)) 
conn.commit() conn.close() return redirect('/user-management') else: return redirect('/login') @app.route('/register', methods=['GET', 'POST']) def register(): if request.method == 'POST': try: username = request.form['username'] password = request.form['password'] hashed_password = generate_password_hash(password) full_name = request.form['FullName'] name_parts = full_name.split() first_name = name_parts[0] last_name = name_parts[-1] if len(name_parts) > 1 else '' role = 'User' status = "Disabled" conn = get_db_connection() cursor = conn.cursor() cursor.execute( 'INSERT INTO users (username, password, role, Status, first_name, last_name) VALUES (?, ?, ?, ?, ?, ?)', (username, hashed_password, role, status, first_name, last_name)) conn.commit() conn.close() # Returning JSON response for AJAX request return jsonify({"success": True, "message": "User Successfully Registered - Ask Admin to Be Enabled"}) except Exception as e: # Handle errors and send error message as JSON return jsonify({"success": False, "message": str(e)}), 500 else: # Serve the registration form template for GET requests return render_template('register.html') @app.route('/user-dashboard') def consultant_dashboard(): if check_login() and session['role'] == 'Consultant': return render_template('core_consultant.html') else: return redirect('/login') @app.route('/admin-dashboard') def admin_dashboard(): if check_login() and session['role'] == 'Admin': return render_template('core_admin.html') else: return redirect('/login') @app.route('/customer-dashboard') def customer_dashboard(): if check_login() and session['role'] == 'Customer': return render_template('core_customer.html') else: return redirect('/login') @app.route('/user-data/') def user_data(user_id): if check_login() and session['role'] == 'Admin': conn = get_db_connection() cursor = conn.cursor() cursor.execute('SELECT * FROM users WHERE id = ?', (user_id,)) user = cursor.fetchone() conn.close() return render_template('user_data.html', 
user=user) else: return redirect('/login') @app.route('/login', methods=['GET', 'POST']) def login(): if request.method == 'POST': username = request.form['username'] password = request.form['password'] conn = get_db_connection() cursor = conn.cursor() try: cursor.execute('SELECT * FROM users WHERE username = ?', (username,)) user = cursor.fetchone() if user and check_password_hash(user['password'], password): # Explicitly check if password change is needed if user['password_change'] is True: session['user_id'] = user['id'] return jsonify({'passwordChangeRequired': True, 'message': 'Password change required.'}) # Update last login time cursor.execute('UPDATE users SET last_login = ? WHERE id = ?', (datetime.now(), user['id'])) conn.commit() if request.form['authType'] == 'jwt': # Generate the JWT access token access_token = create_access_token(identity=username, additional_claims={"role": user['role'], "user_id": user['id']}, expires_delta=timedelta(hours=1)) # return jsonify(access_token=access_token), 200 return jsonify({'passwordChangeRequired': False, 'access_token': access_token, 'user': {'username': user['username'], 'role': user['role'], 'user_id': user['id']}, 'redirect_to': url_for('dashboard')}), 200 session['username'] = user['username'] session['role'] = user['role'] session['user_id'] = user['id'] return jsonify({'passwordChangeRequired': False, 'redirect': url_for('dashboard')}) return jsonify({'error': 'Invalid username or password'}), 401 finally: conn.close() else: return render_template('login.html') @app.route('/update-password', methods=['POST']) def update_password(): user_id = session.get('user_id') if not user_id: return jsonify({'error': 'User not logged in'}), 401 new_password = request.form['newPassword'] hashed_password = generate_password_hash(new_password) try: conn = get_db_connection() cursor = conn.cursor() cursor.execute('UPDATE users SET password = ?, password_change = FALSE WHERE id = ?', (hashed_password, user_id)) conn.commit() 
except Exception as e: conn.close() return jsonify({'error': str(e)}), 500 conn.close() return jsonify({'success': 'Password updated successfully'}) @app.route('/update-profile', methods=['POST']) def update_profile(): if 'user_id' not in session: return redirect('/login') # Redirect to login if not authenticated user_id = session['user_id'] new_password = request.form.get('password') first_name = request.form.get('first_name') last_name = request.form.get('last_name') conn = get_db_connection() cursor = conn.cursor() update_data = {} if new_password: update_data['password'] = generate_password_hash(new_password) if first_name: update_data['first_name'] = first_name if last_name: update_data['last_name'] = last_name # Constructing and executing the update query if update_data: update_query = 'UPDATE users SET ' update_query += ', '.join([f"{key} = ?" for key in update_data.keys()]) update_query += ' WHERE id = ?' cursor.execute(update_query, tuple(update_data.values()) + (user_id,)) conn.commit() conn.close() flash('Profile updated successfully!', 'success') return jsonify({'message': 'Profile updated successfully!'}) @app.route('/update-profile-form') def update_profile_form(): # Ensure the user is logged in if 'user_id' in session: return render_template('update_profile.html') else: return redirect('/login') @app.route('/view-past-tests/', methods=['GET']) def view_past_tests(user_id): # Check if the user is logged in and has the correct user_id if check_login() and session['user_id'] == user_id: # Connect to the database conn = get_db_connection() cursor = conn.cursor() # Execute the query to fetch past test records for the user cursor.execute('SELECT * FROM exams_record WHERE user_id = ? 
ORDER BY exam_date DESC', (user_id,)) past_tests = cursor.fetchall() # Close the database connection conn.close() # Render the past_tests.html template and pass the past_tests data return render_template('past_tests.html', past_tests=past_tests) else: # If not logged in or user_id doesn't match, redirect to the login page return redirect('/login') def calculate_and_update_user_average_score(user_id): conn = get_db_connection() cursor = conn.cursor() # Calculate the average score cursor.execute('SELECT score FROM exams_record WHERE user_id = ?', (user_id,)) scores = cursor.fetchall() if scores: total_score = sum(score['score'] for score in scores) average_score = total_score / len(scores) else: average_score = None # or 0, depending on how you want to handle users with no scores # Update the average score in the users table if average_score is not None: cursor.execute('UPDATE users SET average_score = ? WHERE id = ?', (average_score, user_id)) conn.commit() conn.close() return average_score def update_user_tests_count(user_id): conn = get_db_connection() cursor = conn.cursor() # Count the number of tests taken by the user cursor.execute('SELECT COUNT(*) FROM exams_record WHERE user_id = ?', (user_id,)) count_result = cursor.fetchone() total_tests = count_result[0] if count_result else 0 # Update the total tests taken in the users table cursor.execute('UPDATE users SET total_tests_taken = ? 
WHERE id = ?', (total_tests, user_id)) conn.commit() conn.close() return total_tests @app.route('/api/user-data/', methods=['GET']) def get_user_data(user_id): if check_login(): try: # Connect to the SQLite database conn = get_db_connection() cursor = conn.cursor() # Query the database to retrieve user data cursor.execute( 'SELECT username, first_name, last_name, last_login, customer_id FROM users WHERE id = ?', (user_id,)) user_data = cursor.fetchone() conn.close() if user_data: # Create a JSON response with the required user data response_data = { "username": user_data['username'], "first_name": user_data['first_name'], "last_name": user_data['last_name'], "last_login": user_data['last_login'], "customer_id": user_data['customer_id'], } return jsonify(response_data), 200 else: return jsonify({"error": "User not found"}), 404 except Exception as e: return jsonify({"error": str(e)}), 500 else: return jsonify({"error": "Unauthorized access"}), 401 @app.route('/') def main_page(): return render_template('main.html') @app.route('/compliance-dashboard') def compliance_dashboard(): if not check_login(): return redirect(url_for('login')) customer_id = request.args.get('customerId', None) # Fetch additional data related to customer_id if necessary # Pass customer_id to the template return render_template('compliance_dashboard.html', customer_id=customer_id) @app.route('/start-compliance-project', methods=['POST']) def start_compliance_project(): if 'user_id' in session: # Assuming this check is still relevant for session management data = request.json client_id = data.get('customerId') # Get client_id from the request friendly_name = data.get('friendly_name') backend_name = data.get('backend_name') compliance_type = data.get('compliance_type') if not client_id or not friendly_name or not backend_name or not compliance_type: return jsonify({'error': 'Missing parameters'}), 400 try: conn = get_db_connection() cursor = conn.cursor() # Insert data into the compliance_projects 
table cursor.execute(''' INSERT INTO compliance_projects (project_name, client_id, start_date, status, backend_name, compliance_type) VALUES (?, ?, ?, ?, ?, ?) ''', (friendly_name, client_id, date.today(), 'Active', backend_name, compliance_type)) project_id = cursor.lastrowid conn.commit() conn.close() return jsonify({'message': 'Compliance project started successfully', 'project_id': project_id}), 201 except Exception as e: app.logger.error(f"Error starting compliance project: {str(e)}") return jsonify({'error': str(e)}), 500 else: return jsonify({'error': 'Unauthorized'}), 401 @app.route('/view-compliance-projects') def view_compliance_projects(): if 'user_id' in session: # You can add additional logic here if needed return render_template('view_compliance_projects.html') # Replace with your actual template filename else: return redirect('/login') # Redirect to login if the user is not logged in @app.route('/fetch-customer-projects', methods=['GET']) def fetch_customer_projects(): try: # Connect to your SQLite database conn = get_db_connection() cursor = conn.cursor() # Query the database to fetch project_name and project_id based on customer_id cursor.execute( "SELECT project_name, project_id, customer_name, compliance_type FROM compliance_projects" ) results = cursor.fetchall() conn.close() projects_list = [{'project_name': result[0], 'project_id': result[1], 'customer_name': result[2], 'compliance_type': result[3]} for result in results] # Return the data as JSON return jsonify({'projects': projects_list}), 200 except Exception as e: return jsonify({'error': str(e)}), 500 @app.route('/api/compliance-projects/', methods=['GET']) def get_compliance_projects(): try: conn = get_db_connection() # Ensure you are connecting to the correct database cursor = conn.cursor() # Query to fetch compliance projects for a specific customer cursor.execute(''' SELECT project_id, project_name, customer_name , customer_id, start_date, end_date, status, compliance_type FROM 
compliance_projects ''', ()) projects = cursor.fetchall() # Convert the result to a list of dictionaries projects_data = [] for project in projects: project_data = { 'projectId': project['project_id'], 'projectName': project['project_name'], 'customerName': project['customer_name'], 'customerId': project['customer_id'], 'complianceType': project['compliance_type'], 'startDate': project['start_date'], 'endDate': project['end_date'] if project['end_date'] else None, 'status': project['status'] } projects_data.append(project_data) return jsonify(projects_data) except Exception as e: return jsonify({'error': str(e)}), 500 finally: if conn: conn.close() @app.route('/show-gap-analysis-by-type', methods=['GET']) def show_gap_analysis_by_type(): compliance_type = request.args.get('compliance_type') if not compliance_type: return jsonify({'error': 'Compliance type is required'}), 400 try: conn = get_db_connection() cursor = conn.cursor() # Construct query safely to avoid SQL injection query = 'SELECT id, chapter_name, control_id, control_text, requirements, explanation FROM {}'.format( compliance_type) cursor.execute(query) rows = cursor.fetchall() # Format the rows into a list of dicts data = [dict(row) for row in rows] return jsonify(data) except Exception as e: return jsonify({'error': str(e)}), 500 finally: conn.close() @app.route('/gap-analysis-doc', methods=['POST']) def gap_analysis_doc(): # Retrieve data from form and store in session session['projectId'] = request.form.get('projectId') session['complianceType'] = request.form.get('complianceType') session['last_visited'] = 'gap_analysis_doc' # Track last visited for potential use session.modified = True # Redirect to a GET handler that shows the analysis return redirect(url_for('show_gap_analysis')) @app.route('/show-gap-analysis') def show_gap_analysis(): project_id = session.get('projectId') compliance_type = session.get('complianceType') if not project_id or not compliance_type: # Redirect to main page if 
session data is missing or invalid return redirect(url_for('main_page')) return render_template('Gap_Analysis_Doc.html', project_id=project_id, compliance_type=compliance_type) @app.route('/get-gap-analysis-info') def get_gap_analysis_info(): # Retrieve data from session project_id = session.get('projectId') compliance_type = session.get('complianceType') if not project_id or not compliance_type: return jsonify({'error': 'Session data not set'}), 400 return jsonify({ 'projectId': project_id, 'complianceType': compliance_type }) @app.route('/update-compliance-project/', methods=['POST']) def update_compliance_project(project_id): if 'user_id' not in session: return jsonify({'error': 'Unauthorized'}), 401 try: conn = get_db_connection() data = request.get_json() app.logger.info(f'Received Data: {data}') # Log the entire received JSON data # Check if project exists cursor = conn.cursor() cursor.execute("SELECT 1 FROM compliance_projects WHERE project_id = ?", (project_id,)) result = cursor.fetchone() if not result: return jsonify({'error': 'Project not found'}), 404 # Insert or update data into customer_compliance_status table controlData = data.get('controlData', []) for control in controlData: # Extract control details control_id = control.get('control_id') control_status = control.get('control_status') notes = control.get('notes') links = control.get('links') non_conformity = control.get('nonConformity') opportunity = control.get('opportunity') remediation_plan = control.get('remediation_plan') # Extract remediation_plan from the control app.logger.info(f'Control Data: {control}') # Log control data # Update or insert new control data if control_id and control_status: cursor.execute(""" SELECT * FROM customer_compliance_status WHERE project_id = ? 
AND control_id = ?""", (project_id, control_id)) existing_record = cursor.fetchone() if existing_record: cursor.execute(""" UPDATE customer_compliance_status SET control_status = ?, notes = ?, links = ?, "Non-Conformity" = ?, "Opportunity_for_Improvement" = ?, remediation_plan = ? WHERE project_id = ? AND control_id = ?""", (control_status, notes, links, non_conformity, opportunity, remediation_plan, project_id, control_id)) else: cursor.execute(""" INSERT INTO customer_compliance_status (project_id, control_id, control_status, notes, links, "Non-Conformity", "Opportunity_for_Improvement", remediation_plan) VALUES (?, ?, ?, ?, ?, ?, ?, ?)""", (project_id, control_id, control_status, notes, links, non_conformity, opportunity, remediation_plan)) conn.commit() return jsonify({'message': 'Data saved successfully'}), 200 except Exception as e: app.logger.error(str(e)) return jsonify({'error': str(e)}), 500 @app.route('/get-compliance-info', methods=['GET']) def get_compliance_info(): try: # Retrieve project_id and control_id from query parameters project_id = request.args.get('projectId') control_id = request.args.get('control_id') if not project_id or not control_id: return jsonify({'error': 'Both project_id and control_id are required'}), 400 conn = get_db_connection() cursor = conn.cursor() # Query to retrieve compliance details directly using project_id cursor.execute( "SELECT control_status, notes, links, \"Non-Conformity\", \"Opportunity_for_Improvement\" FROM customer_compliance_status WHERE project_id = ? 
AND control_id = ?", (project_id, control_id)) data = cursor.fetchone() conn.close() if data: control_status, notes, links, non_conformity, opportunity_for_improvement = data return jsonify({ 'control_id': control_id, 'control_status': control_status, 'notes': notes, 'links': links, 'non_conformity': non_conformity, 'opportunity_for_improvement': opportunity_for_improvement }) else: # Return a 'no data' message with a 200 OK status to avoid 404 console log return jsonify({'message': 'No compliance data available for the specified control.'}), 248 except Exception as e: conn.close() # Ensure connection is closed in case of exceptions return jsonify({'error': str(e)}), 500 @app.route('/SOA') def soa(): # Retrieve data from session instead of query parameters compliance_type = session.get('complianceType') project_id = session.get('projectId') if not compliance_type or not project_id: return redirect(url_for('login')) return render_template('SOA.html', compliance_type=compliance_type, projectId=project_id) @app.route('/set-SOA-session', methods=['POST']) def set_soa_session(): compliance_type = request.form.get('complianceType') project_id = request.form.get('projectId') # Set these in session session['complianceType'] = compliance_type session['projectId'] = project_id session.modified = True # Redirect to the SOA page return redirect(url_for('soa')) @app.route('/get-soa-session-data') def get_session_data(): compliance_type = session.get('complianceType') project_id = session.get('projectId') if compliance_type and project_id: return jsonify({ 'complianceType': compliance_type, 'projectId': project_id }) else: return jsonify({'error': 'Session data not found'}), 404 @app.route('/submit-soa', methods=['POST']) def submit_soa(): data = request.get_json() sql = ''' INSERT INTO soa_customers (project_id, compliance_type, control_id, control_text, applicability, remarks, current_control, accountability) VALUES (?, ?, ?, ?, ?, ?, ?, ?) 
ON CONFLICT (project_id, control_id) DO UPDATE SET compliance_type=excluded.compliance_type, control_text=excluded.control_text, applicability=excluded.applicability, remarks=excluded.remarks, current_control=excluded.current_control, accountability=excluded.accountability ''' values = ( data.get('project_id'), data.get('compliance_type'), data.get('control_id'), data.get('control_text'), data.get('applicability'), data.get('remarks'), data.get('current_control'), data.get('accountability') ) try: with get_db_connection() as conn: cursor = conn.cursor() cursor.execute(sql, values) conn.commit() except sqlite3.OperationalError as e: app.logger.error(f'Database Operational Error: {e}') return jsonify({'error': 'Database error', 'message': str(e)}), 500 except Exception as e: app.logger.error(f'Unexpected Error: {e}') return jsonify({'error': 'Unexpected error', 'message': str(e)}), 500 return jsonify({'message': 'Data submitted successfully'}), 200 @app.route('/check-soa-data//', methods=['GET']) def check_soa_data(complianceType, projectId): conn = get_db_connection() cursor = conn.cursor() try: # Prepare SQL query to fetch and sort data based on complianceType and projectId cursor.execute(""" SELECT * FROM soa_customers WHERE compliance_type = ? AND project_id = ? 
ORDER BY id ASC """, (complianceType, projectId)) results = cursor.fetchall() # Check if data is found if results: soa_data = [{ 'id': row[0], # Adding id for clarity and order verification 'control_id': row[3], # control_id is the 4th column 'control_text': row[4], # control_text is the 5th column 'applicability': row[5], # applicability is the 6th column 'remarks': row[6], # remarks is the 7th column 'current_control': row[7], # current_control is the 8th column 'accountability': row[8] # accountability is the 9th column } for row in results] return jsonify(soa_data), 200 else: return jsonify({'message': 'No data found'}), 404 except Exception as e: return jsonify({'error': str(e)}), 500 finally: cursor.close() conn.close() @app.route('/soa-data-default/', methods=['GET']) def get_soa_data(): conn = get_db_connection() # Function to get database connection cursor = conn.cursor() try: # Execute a SELECT query to fetch all data from the soa_default table cursor.execute('SELECT * FROM soa_default') results = cursor.fetchall() # Convert the results into a list of dictionaries soa_data = [] for row in results: soa_data.append({ 'control_id': row[1], 'control_text': row[2], 'current_control': row[3], 'remarks_implementation': row[4], 'accountability': row[5] }) return jsonify(soa_data), 200 except Exception as e: return jsonify({'error': str(e)}), 500 finally: cursor.close() conn.close() @app.route('/project-dashboard-view/', methods=['GET']) def project_dashboard_view(projectID): try: conn = get_db_connection() cursor = conn.cursor() # Query the customer_compliance_status table based on the projectID cursor.execute("SELECT control_id, control_status FROM customer_compliance_status WHERE project_id = ?", (projectID,)) data = cursor.fetchall() conn.close() # Check if data is empty if not data: return jsonify({'error': 'No data found for the projectID'}), 404 # Convert data to a list of dictionaries result = [{'control_id': row[0], 'control_status': row[1]} for row in 
data] return jsonify(result) except Exception as e: return jsonify({'error': str(e)}), 500 @app.route('/finish-compliance-project/', methods=['POST']) def finish_compliance_project(project_id): # Ensure user is authenticated if 'user_id' not in session: return jsonify({'error': 'Unauthorized'}), 401 # Update the project status to 'Finished' conn = get_db_connection() cursor = conn.cursor() cursor.execute("UPDATE compliance_projects SET status = 'Finished' WHERE project_id = ?", (project_id,)) conn.commit() conn.close() return jsonify({'message': 'Project finished successfully'}), 200 @app.route('/manage-compliance-project/') def manage_compliance_project_page(project_id): # Ensure user is authenticated if 'user_id' not in session: return redirect('/login') # Ensure the user has permission to manage this project # (Add any necessary authorization logic here) try: conn = get_db_connection() cursor = conn.cursor() # Fetch project data from the database cursor.execute("SELECT * FROM compliance_projects WHERE project_id = ?", (project_id,)) project = cursor.fetchone() # Fetch related compliance status or other details if needed # ... 
conn.close() if project: # Render the template with project data return render_template('manage_compliance_project.html', project=project) else: return f"No project found with ID {project_id}", 404 except Exception as e: # Handle any exceptions that occur return f"An error occurred: {e}", 500 @app.route('/get-compliance-details/', methods=['GET']) def get_compliance_details(compliance_type): if 'user_id' not in session: return jsonify({'error': 'Unauthorized'}), 401 conn = get_db_connection() cursor = conn.cursor() try: cursor.execute(f"SELECT * FROM {compliance_type} ORDER BY id") controls = cursor.fetchall() chapters = {} for control in controls: control_dict = {key: control[key] for key in control.keys()} # Convert Row object to dictionary chapter_name = control_dict['chapter_name'] if chapter_name not in chapters: chapters[chapter_name] = [] chapters[chapter_name].append(control_dict) return jsonify(chapters) except Exception as e: return jsonify({'error': str(e)}), 500 finally: conn.close() @app.route('/get-compliance-status/', methods=['GET']) def get_compliance_status(project_id): conn = get_db_connection() cursor = conn.cursor() try: # SQL query to select control_id and control_status where project_id matches cursor.execute("SELECT control_id, control_status FROM customer_compliance_status WHERE project_id = ?", (project_id,)) results = cursor.fetchall() # Convert results to a list of dictionaries compliance_status = [{'control_id': row[0], 'control_status': row[1]} for row in results] return jsonify(compliance_status) except Exception as e: return jsonify({'error': str(e)}), 500 finally: conn.close() @app.route('/project-dashboard') def project_dashboard(): projectName = request.args.get('projectName') complianceType = request.args.get('complianceType') # Additional logic to handle projectName and complianceType return render_template('project_dashboard.html', projectName=projectName, complianceType=complianceType) @app.route('/dashboard') @jwt_required() 
def dashboard():
    """Render the dashboard template that matches the caller's role.

    The role is whatever ``check_login()`` returns. A falsy role redirects to
    ``/login``; a role without a dedicated template redirects to ``/logout``.
    """
    # Logging is already configured once at import time, so the per-request
    # ``logging.basicConfig`` call the original made here was a no-op.
    role = check_login()
    if not role:
        logging.info('No login found - Redirecting to login')
        return redirect('/login')

    # role -> (label used in the log line, template to render)
    views = {
        'Admin': ('admin', 'core_admin.html'),
        'Consultant': ('consultant', 'core_consultant.html'),
        'Customer': ('customer', 'core_customer.html'),
    }
    view = views.get(role)
    if view is None:
        logging.info('No valid role found - Redirecting to logout')
        return redirect('/logout')

    label, template = view
    logging.info('Serving %s dashboard', label)
    return render_template(template)


# import Functions
def _run_csv_import(csv_file_path, sql, label, encoding=None, prepare=None,
                    log_headers=False):
    """Insert every row of a CSV file with one SQL statement, atomically.

    Args:
        csv_file_path: Path of the CSV file (first line is the header row).
        sql: Statement with named placeholders, executed once per row.
        label: Human-readable name of the import, used in log messages.
        encoding: Text encoding handed to ``open``; ``None`` keeps the
            platform default.
        prepare: Optional callable mapping the ``csv.DictReader`` row to the
            parameter mapping passed to ``execute``; the raw row is used when
            omitted.
        log_headers: When true, log the CSV header names before importing.

    Raises:
        Exception: Whatever opening the file or executing the statement
            raised. The transaction is rolled back first, so callers can
            report the failure instead of a false success.
    """
    conn = None
    try:
        conn = get_db_connection()
        cursor = conn.cursor()
        # newline='' lets the csv module handle line endings inside quoted
        # fields itself, as the csv documentation requires.
        with open(csv_file_path, 'r', encoding=encoding, newline='') as csv_file:
            csv_reader = csv.DictReader(csv_file)
            if log_headers:
                logging.info("CSV Headers: %s", csv_reader.fieldnames)
            for row in csv_reader:
                cursor.execute(sql, prepare(row) if prepare else row)
        conn.commit()
    except Exception:
        logging.exception("An error occurred during the %s CSV import", label)
        if conn is not None:
            conn.rollback()
        raise
    else:
        logging.info("%s CSV data import process has been completed.", label)
    finally:
        if conn is not None:
            conn.close()


def _pick_columns(columns):
    """Return a row-preparer that keeps only ``columns``, None when absent."""
    def prepare(row):
        return {column: row.get(column) for column in columns}
    return prepare


@app.route('/import-iso27001-csv')
def import_iso27001_csv():
    """Import the ISO 27001 controls CSV; requires a logged-in user."""
    if not check_login():
        return redirect('/login')
    try:
        # NOTE(review): hard-coded, machine-specific path.
        import_iso27001_from_csv("C:\\Users\\Yigal\\Desktop\\iso_controls.csv")
        return "ISO 27001 CSV data imported successfully."
    except Exception as e:
        return f"An error occurred during ISO 27001 CSV import: {e}", 500


def import_iso27001_from_csv(csv_file_path):
    """Insert every row of ``csv_file_path`` into the ``ISO27001`` table.

    The CSV header must provide the names used as placeholders below.
    Raises whatever the file or database layer raised, after rolling back.
    """
    sql = '''
        INSERT INTO ISO27001 (id, part, chapter_name, control_id, control_text, requirements, Explanation)
        VALUES (:id, :part, :chapter_name, :control_id, :control_text, :requirements, :Explanation);
    '''
    _run_csv_import(csv_file_path, sql, "ISO 27001")


@app.route('/import-nc-ofi-data-csv')
def import_nc_ofi_data_csv():
    """Import the non-conformity / OFI options CSV."""
    # NOTE(review): unlike /import-iso27001-csv this route performs no login
    # check — confirm that is intended.
    try:
        import_nc_ofi_data_from_csv("C:\\a\\csv.csv")
        return "NC OFI data CSV imported successfully."
    except Exception as e:
        return f"An error occurred during NC OFI data CSV import: {e}", 500


def import_nc_ofi_data_from_csv(csv_file_path):
    """Upsert every row of ``csv_file_path`` into ``nc_ofi_data``.

    The file is read as ``utf-8-sig`` so a leading BOM does not end up in the
    first header name. Rows whose ``control_id`` already exists overwrite the
    stored options. Raises on failure, after rolling back.
    """
    def prepare(row):
        # Ensure control_id is treated as text
        row['control_id'] = str(row['control_id'])
        return row

    sql = '''
        INSERT INTO nc_ofi_data (control_id, NC_Option1, NC_Option2, NC_Option3, NC_Option4,
                                 OFI_Option1, OFI_Option2, OFI_Option3, OFI_Option4)
        VALUES (:control_id, :NC_Option1, :NC_Option2, :NC_Option3, :NC_Option4,
                :OFI_Option1, :OFI_Option2, :OFI_Option3, :OFI_Option4)
        ON CONFLICT(control_id) DO UPDATE SET
            NC_Option1=excluded.NC_Option1,
            NC_Option2=excluded.NC_Option2,
            NC_Option3=excluded.NC_Option3,
            NC_Option4=excluded.NC_Option4,
            OFI_Option1=excluded.OFI_Option1,
            OFI_Option2=excluded.OFI_Option2,
            OFI_Option3=excluded.OFI_Option3,
            OFI_Option4=excluded.OFI_Option4;
    '''
    _run_csv_import(csv_file_path, sql, "NC OFI data", encoding='utf-8-sig',
                    prepare=prepare, log_headers=True)


@app.route('/import-soa-csv')
def import_soa_csv():
    """Import the default Statement of Applicability CSV."""
    # NOTE(review): no login check on this route — confirm that is intended.
    try:
        import_soa_default_from_csv("C:\\a\\soa.csv")
        return "CSV data imported successfully."
    except Exception as e:
        return f"An error occurred during CSV import: {e}", 500


def import_soa_default_from_csv(csv_file_path):
    """Insert every row of ``csv_file_path`` into ``soa_default``.

    Columns missing from the CSV are stored as NULL. Raises on failure, after
    rolling back.
    """
    columns = ('control_id', 'control_text', 'remarks', 'current_control',
               'remarks_implementation', 'accountability')
    sql = '''
        INSERT INTO soa_default (control_id, control_text, remarks, current_control, remarks_implementation, accountability)
        VALUES (:control_id, :control_text, :remarks, :current_control, :remarks_implementation, :accountability);
    '''
    _run_csv_import(csv_file_path, sql, "SOA Default", encoding='utf-8',
                    prepare=_pick_columns(columns))


@app.route('/import-goals-csv')
def import_goals_csv():
    """Import the security goals CSV."""
    # NOTE(review): no login check on this route — confirm that is intended.
    try:
        import_goals_from_csv("C:\\a\\goals.csv")
        return "Goals CSV data imported successfully."
    except Exception as e:
        return f"An error occurred during Goals CSV import: {e}", 500


def import_goals_from_csv(csv_file_path):
    """Insert every row of ``csv_file_path`` into ``goals``.

    Columns missing from the CSV are stored as NULL. Raises on failure, after
    rolling back.
    """
    columns = ('security_goal', 'baseline', 'target', 'description',
               'due_date', 'internal_time', 'owner', 'status')
    sql = '''
        INSERT INTO goals (security_goal, baseline, target, description, due_date, internal_time, owner, status)
        VALUES (:security_goal, :baseline, :target, :description, :due_date, :internal_time, :owner, :status);
    '''
    _run_csv_import(csv_file_path, sql, "Goals", encoding='utf-8',
                    prepare=_pick_columns(columns))


@app.route('/import-kpis-csv')
def import_kpis_csv():
    """Import the KPIs CSV."""
    # NOTE(review): no login check on this route — confirm that is intended.
    try:
        import_kpis_from_csv("C:\\a\\kpis.csv")
        return "KPIs CSV data imported successfully."
    except Exception as e:
        return f"An error occurred during KPIs CSV import: {e}", 500


def import_kpis_from_csv(csv_file_path):
    """Insert every row of ``csv_file_path`` into ``kpis``.

    Columns missing from the CSV are stored as NULL. Raises on failure, after
    rolling back.
    """
    columns = ('objective', 'audit_mechanisms', 'periodical_review', 'target',
               'future_target', 'owner', 'due_date', 'results')
    sql = '''
        INSERT INTO kpis (objective, audit_mechanisms, periodical_review, target, future_target, owner, due_date, results)
        VALUES (:objective, :audit_mechanisms, :periodical_review, :target, :future_target, :owner, :due_date, :results);
    '''
    _run_csv_import(csv_file_path, sql, "KPIs", encoding='utf-8',
                    prepare=_pick_columns(columns))


@app.route('/get-user-id', methods=['GET'])
def get_user_id():
    """Return the session's ``user_id`` as JSON, or 401 when it is absent."""
    user_id = session.get('user_id')
    if user_id is None:
        return 'User is not authenticated or session data is missing.', 401
    return jsonify({'user_id': user_id}), 200


@app.route('/internal-audit')
def internal_audit():
    """Render the internal audit page for a session-authenticated user."""
    if 'user_id' not in session:
        return redirect(url_for('login'))
    return render_template('internal_audit.html')


@app.route('/get-audit-data')
def get_audit_data():
    """Return the audit findings of one project as a JSON list.

    Query parameters:
        project_id: Required; rows of ``customer_compliance_status`` are
            filtered by it.

    Responds 401 without a session user, 400 without ``project_id`` and 500
    (generic message, details logged) on any database error.
    """
    if 'user_id' not in session:
        return jsonify({'error': 'Unauthorized'}), 401

    project_id = request.args.get('project_id')
    if not project_id:
        return jsonify({'error': 'Project ID is required'}), 400

    conn = None  # stays None if connecting fails, so ``finally`` is safe
    try:
        conn = get_db_connection()
        cursor = conn.cursor()
        cursor.execute(
            "SELECT * FROM customer_compliance_status WHERE project_id = ?",
            (project_id,))
        data_list = [
            {
                'control_id': row['control_id'],
                'non_conformity': row['Non-Conformity'],
                'opportunity': row['Opportunity_for_Improvement'],
            }
            for row in cursor.fetchall()
        ]
        return jsonify(data_list)
    except Exception as e:
        app.logger.error(str(e))
        return jsonify({'error': 'Internal Server Error'}), 500
    finally:
        if conn is not None:
            conn.close()


@app.route('/get-ia-records')
def fetch_ia_records():
    """Return internal audit records filtered by project id and/or name.

    At least one of the ``project_id`` / ``project_name`` query parameters is
    required (400 otherwise). Responds with the matching rows as a JSON list,
    or ``{'recordFound': False}`` when nothing matches.
    """
    project_id = request.args.get('project_id')
    project_name = request.args.get('project_name')
    if not project_id and not project_name:
        return jsonify({'error': 'Either project_id or project_name is required'}), 400

    # Only fixed fragments are concatenated; user values stay bound parameters.
    conditions = []
    params = []
    if project_id:
        conditions.append("project_id = ?")
        params.append(project_id)
    if project_name:
        conditions.append("project_name = ?")
        params.append(project_name)
    query = "SELECT * FROM internal_audit_records WHERE " + " AND ".join(conditions)

    conn = get_db_connection()
    try:
        records = conn.cursor().execute(query, tuple(params)).fetchall()
    finally:
        # Close even when the query raises, so the connection is not leaked.
        conn.close()

    if records:
        return jsonify([dict(record) for record in records])
    return jsonify({'recordFound': False})


@app.route('/add-ia-record')
def add_ia_record():
    """Render the internal audit form for the project named in the query."""
    project_id = request.args.get('project_id')
    project_name = request.args.get('project_name')
    if not project_id or not project_name:
        return "Project ID or name is missing", 400
    return render_template('ia_form.html', project_id=project_id, project_name=project_name)


@app.route('/submit-ia-form', methods=['POST'])
def submit_ia_form():
    """Store a new internal audit record from the posted form.

    All of ``project_id``, ``project_name``, ``customer_name``,
    ``creator_name``, ``starting_month`` and ``finished_month`` are required
    (400 otherwise). The submission date is today's date as ``dd/mm/YYYY``.
    Database errors roll back and respond 500 with the error text.
    """
    app.logger.debug("Internal audit form payload: %s", request.form)
    project_id = request.form.get('project_id')
    project_name = request.form.get('project_name')
    customer_name = request.form.get('customer_name')
    created_by = request.form.get('creator_name')
    month_started = request.form.get('starting_month')
    month_finished = request.form.get('finished_month')
    date_of_submission = datetime.now().strftime("%d/%m/%Y")

    if not all([project_id, project_name, customer_name, created_by, month_started, month_finished]):
        return jsonify({"error": "Required field is missing"}), 400

    conn = None
    try:
        conn = get_db_connection()
        cursor = conn.cursor()
        cursor.execute("""
            INSERT INTO internal_audit_records
                (project_id, project_name, customer_name, date_of_submission,
                 created_by, month_started, month_finished)
            VALUES (?, ?, ?, ?, ?, ?, ?)""",
                       (project_id, project_name, customer_name, date_of_submission,
                        created_by, month_started, month_finished))
        conn.commit()
    except Exception as e:
        if conn is not None:
            conn.rollback()
        return jsonify({"error": str(e)}), 500
    finally:
        if conn is not None:
            conn.close()

    return jsonify({"message": "Internal Audit Record added successfully"}), 200


@app.route('/fetch-ia-data', methods=['GET'])
def fetch_ia_data():
    """Return the first internal audit record of a project as JSON.

    Query parameters:
        projectId: Required (400 otherwise).

    Responds 404 when no record exists and 500 with the error text on a
    database error.
    """
    project_id = request.args.get('projectId')
    if not project_id:
        return jsonify({'error': 'Project ID Is required'}), 400

    conn = None
    try:
        conn = get_db_connection()
        cursor = conn.cursor()
        # Parameters must be a sequence: a bare string is bound character by
        # character, which fails for any id longer than one character.
        cursor.execute(
            "SELECT * FROM internal_audit_records WHERE project_id = ?",
            (project_id,))
        record = cursor.fetchone()
        if not record:
            return jsonify({'message': 'No record found'}), 404

        # NOTE(review): positional mapping assumes this column order in the
        # table — confirm against the schema.
        ia_data = {
            'id': record[0],
            'project_id': record[1],
            'project_name': record[2],
            'customer_name': record[3],
            'date_of_submission': record[4],
            'created_by': record[5],
            'month_started': record[6],
            'month_finished': record[7],
        }
        return jsonify(ia_data), 200
    except Exception as e:
        return jsonify({'error': str(e)}), 500
    finally:
        if conn is not None:
            conn.close()


@app.route('/get-audit-findings', methods=['GET'])
def get_audit_findings():
    """Return control id, notes, non-conformity and OFI of a project as JSON.

    Rows of ``customer_compliance_status`` are filtered by the ``project_id``
    query parameter.
    """
    project_id = request.args.get('project_id')
    conn = get_db_connection()
    try:
        cursor = conn.cursor()
        cursor.execute("""
            SELECT control_id, notes, "Non-Conformity", "Opportunity_for_Improvement"
            FROM customer_compliance_status
            WHERE project_id = ?
        """, (project_id,))
        findings_list = [
            {
                'control_id': row['control_id'],
                'notes': row['notes'],
                'non_conformity': row['Non-Conformity'],
                'opportunity_for_improvement': row['Opportunity_for_Improvement'],
            }
            for row in cursor.fetchall()
        ]
    finally:
        # The original never closed this connection.
        conn.close()
    return jsonify(findings_list)


# The view needs ``control_id`` from the URL; the route as found had no
# variable part, so every request failed with a TypeError.
# NOTE(review): the default (string) converter is assumed — confirm.
@app.route('/get-control-text/<control_id>', methods=['GET'])
def get_control_text(control_id):
    """Return the ``control_text`` of one ISO27001 control, or 404."""
    conn = get_db_connection()
    cursor = conn.cursor()
    try:
        cursor.execute("SELECT control_text FROM ISO27001 WHERE control_id = ?", (control_id,))
        result = cursor.fetchone()
        if result:
            return jsonify({'control_text': result[0]})
        return jsonify({'error': 'Control ID not found'}), 404
    except Exception as e:
        return jsonify({'error': str(e)}), 500
    finally:
        conn.close()


# URL variable restored so Flask can supply ``controlId``.
# NOTE(review): the default (string) converter is assumed — confirm.
@app.route('/get-gap-analysis-options/<controlId>', methods=['GET'])
def get_gap_analysis_options(controlId):
    """Return the non-empty NC and OFI options stored for one control.

    Responds with two lists, both empty when the control has no row in
    ``nc_ofi_data``; 500 with the error text on a database error.
    """
    conn = None
    try:
        conn = get_db_connection()
        cursor = conn.cursor()
        cursor.execute(
            "SELECT NC_Option1, NC_Option2, NC_Option3, NC_Option4, "
            "OFI_Option1, OFI_Option2, OFI_Option3, OFI_Option4 "
            "FROM nc_ofi_data WHERE control_id = ?",
            (controlId,))
        row = cursor.fetchone()
        # First four columns are NC options, last four OFI options; None and
        # empty strings are dropped.
        nonConformityOptions = [opt for opt in row[:4] if opt] if row else []
        opportunityOptions = [opt for opt in row[4:] if opt] if row else []
        return jsonify({
            'nonConformityOptions': nonConformityOptions,
            'opportunityOptions': opportunityOptions
        }), 200
    except Exception as e:
        return jsonify({'error': str(e)}), 500
    finally:
        # The original never closed this connection.
        if conn is not None:
            conn.close()


# URL variable restored so Flask can supply ``project_id``.
# NOTE(review): the default (string) converter is assumed — confirm.
@app.route('/api/non-conformities/<project_id>', methods=['GET'])
def get_non_conformities(project_id):
    """Return the rows of a project that have a non-NULL non-conformity."""
    conn = get_db_connection()
    cursor = conn.cursor()
    try:
        query = """
            SELECT control_id, notes, "Non-Conformity"
            FROM customer_compliance_status
            WHERE project_id = ? AND "Non-Conformity" IS NOT NULL
        """
        cursor.execute(query, (project_id,))
        return jsonify([dict(row) for row in cursor.fetchall()])
    except Exception as e:
        return jsonify({"error": str(e)}), 500
    finally:
        conn.close()


# URL variable restored so Flask can supply ``project_id``.
@app.route('/risk-assessment-treatment/<project_id>')
def risk_assessment_treatment(project_id):
    """Render the risk assessment page; the template gets only the id."""
    return render_template('Riskassessment&Treatment.html', project_id=project_id)


# URL variable restored so Flask can supply ``project_id``.
@app.route('/goals-kpis/<project_id>')
def goals_kpis(project_id):
    """Render the goals & KPIs page; the template gets only the id."""
    return render_template('Goals&kpis.html', project_id=project_id)


@app.route('/submit-ra-rtp', methods=['POST'])
def submit_ra_rtp_form():
    """Store a new RA-RTP record from a JSON body.

    ``project_id``, ``project_name``, ``customer_name``, ``created_by`` and
    ``version_number`` are all required (400 otherwise, including when the
    body is not a JSON object). The submission date is today's date as
    ``dd/mm/YYYY``. Database errors roll back and respond 500.
    """
    data = request.get_json(silent=True)
    if not isinstance(data, dict):
        # A missing, malformed or non-object body has no fields at all.
        return jsonify({"error": "Required field is missing"}), 400

    project_id = data.get('project_id')
    project_name = data.get('project_name')
    customer_name = data.get('customer_name')
    created_by = data.get('created_by')
    version_number = data.get('version_number')
    date_of_submission = datetime.now().strftime("%d/%m/%Y")

    if not all([project_id, project_name, customer_name, created_by, version_number]):
        return jsonify({"error": "Required field is missing"}), 400

    conn = None
    try:
        conn = get_db_connection()
        cursor = conn.cursor()
        cursor.execute("""
            INSERT INTO RA_RTP_records
                (project_id, project_name, customer_name, date_of_submission,
                 created_by, Version_Number)
            VALUES (?, ?, ?, ?, ?, ?)""",
                       (project_id, project_name, customer_name, date_of_submission,
                        created_by, version_number))
        conn.commit()
    except Exception as e:
        if conn is not None:
            conn.rollback()
        return jsonify({"error": str(e)}), 500
    finally:
        if conn is not None:
            conn.close()

    return jsonify({"message": "RA-RTP Record added successfully"}), 200


@app.route('/fetch-ra-rtp-data', methods=['GET'])
def fetch_ra_rtp_data():
    """Return the first RA-RTP record of a project as JSON.

    Query parameters:
        projectId: Required (400 otherwise).

    Responds 404 when no record exists and 500 with the error text on a
    database error.
    """
    project_id = request.args.get('projectId')
    if not project_id:
        return jsonify({'error': 'Project ID is required'}), 400

    conn = None
    try:
        conn = get_db_connection()
        cursor = conn.cursor()
        cursor.execute("SELECT * FROM RA_RTP_records WHERE project_id = ?", (project_id,))
        record = cursor.fetchone()
        if not record:
            return jsonify({'message': 'No RA-RTP record found'}), 404

        # NOTE(review): positional mapping assumes this column order in the
        # table — it differs from the INSERT column order, confirm against
        # the schema.
        ra_rtp_data = {
            'id': record[0],
            'project_id': record[1],
            'project_name': record[2],
            'date_of_submission': record[3],
            'created_by': record[4],
            'version_number': record[5],
            'customer_name': record[6],
        }
        return jsonify(ra_rtp_data), 200
    except Exception as e:
        return jsonify({'error': str(e)}), 500
    finally:
        if conn is not None:
            conn.close()


# (JSON key sent by the client, column of risk_assessment_table), in the
# order both SQL statements of save_risk_assessment expect their values.
_RISK_FIELDS = (
    ('assetName', 'asset_name'),
    ('riskOwner', 'risk_owner'),
    ('threat', 'threat'),
    ('vulnerability', 'vulnerability'),
    ('impact', 'impact'),
    ('probability', 'probability'),
    ('risk', 'risk'),
    ('treatmentOptions', 'treatment_options'),
    ('securityControls', 'security_controls'),
    ('implementationTechnique', 'implementation_technique'),
    ('dueDate', 'due_date'),
    ('impactAfterTreatment', 'impact_after_treatment'),
    ('probabilityAfterTreatment', 'probability_after_treatment'),
    ('targetResidualRisk', 'target_residual_risk'),
    ('hours', 'hours'),
)


@app.route('/api/save-risk-assessment', methods=['POST'])
def save_risk_assessment():
    """Upsert the risk records of a project from a JSON body.

    Expected body: ``{"projectId": ..., "risks": [ {...}, ... ]}``. Each risk
    is matched on (``projectId``, ``no``): an existing row is updated,
    otherwise a new row is inserted. All records are committed together; any
    error rolls the whole batch back and responds 500.
    """
    if not request.is_json:
        app.logger.error("Request content type is not JSON")
        return jsonify({'error': 'Content type must be application/json'}), 400

    data = request.get_json()
    app.logger.info("Data received for saving risk assessment: %s", data)

    # Also reject non-object bodies (null, list, ...), which the original
    # membership test either crashed on or mis-handled.
    if not isinstance(data, dict) or not isinstance(data.get('risks'), list):
        app.logger.error("Invalid data structure: %s", data)
        return jsonify({'error': 'Expected data structure: {"risks": [list of risks]}'}), 400

    project_id = data.get('projectId')
    conn = None
    try:
        conn = get_db_connection()
        cursor = conn.cursor()

        for record in data['risks']:
            values = tuple(record.get(json_key) for json_key, _ in _RISK_FIELDS)
            record_no = record.get('no')

            # Attempt to update an existing record
            cursor.execute('''
                UPDATE risk_assessment_table
                SET asset_name = ?, risk_owner = ?, threat = ?, vulnerability = ?,
                    impact = ?, probability = ?, risk = ?, treatment_options = ?,
                    security_controls = ?, implementation_technique = ?, due_date = ?,
                    impact_after_treatment = ?, probability_after_treatment = ?,
                    target_residual_risk = ?, hours = ?
                WHERE project_id = ? AND record_no = ?
            ''', values + (project_id, record_no))

            if cursor.rowcount == 0:
                # If no rows were updated, insert a new record
                cursor.execute('''
                    INSERT INTO risk_assessment_table
                        (project_id, record_no, asset_name, risk_owner, threat, vulnerability,
                         impact, probability, risk, treatment_options, security_controls,
                         implementation_technique, due_date, impact_after_treatment,
                         probability_after_treatment, target_residual_risk, hours)
                    VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
                ''', (project_id, record_no) + values)

        conn.commit()
    except Exception as e:
        if conn is not None:
            conn.rollback()
        app.logger.error("Error saving risk assessment data: %s", e, exc_info=True)
        return jsonify({'error': str(e)}), 500
    finally:
        if conn is not None:
            conn.close()

    return jsonify({'message': 'Risk assessment data saved successfully'}), 200


@app.route('/api/fetch-risk-assessment-data', methods=['GET'])
def fetch_risk_assessment_data():
    """Return all risk assessment rows of a project as a JSON list.

    Query parameters:
        projectId: Required (400 otherwise).
    """
    project_id = request.args.get('projectId', default=None, type=str)
    if project_id is None:
        return jsonify({'error': 'Project ID is required'}), 400

    columns = ('id', 'record_no', 'project_id') + tuple(column for _, column in _RISK_FIELDS)
    conn = None
    try:
        conn = get_db_connection()
        cursor = conn.cursor()
        cursor.execute(
            "SELECT * FROM risk_assessment_table WHERE project_id = ?",
            (project_id,))
        risk_assessments = [
            {column: row[column] for column in columns}
            for row in cursor.fetchall()
        ]
        return jsonify(risk_assessments), 200
    except Exception as e:
        app.logger.error("Error fetching risk assessment data: %s", e, exc_info=True)
        return jsonify({'error': str(e)}), 500
    finally:
        if conn is not None:
            conn.close()


# URL variable restored so Flask can supply ``project_id``.
# NOTE(review): the default (string) converter is assumed — confirm.
@app.route('/management-review/<project_id>')
def management_review(project_id):
    """Render the management review page of one compliance project.

    Redirects to the login view when the session has no ``user_id``; responds
    404 when ``compliance_projects`` has no row for ``project_id``.
    """
    if 'user_id' not in session:
        return redirect(url_for('login'))
    user_id = session['user_id']

    conn = get_db_connection()
    try:
        cursor = conn.cursor()
        cursor.execute('SELECT * FROM compliance_projects WHERE project_id = ?', (project_id,))
        project = cursor.fetchone()
    finally:
        # Close even when the query raises, so the connection is not leaked.
        conn.close()

    if not project:
        return 'Project not found', 404

    # sqlite3.Row converts directly to a plain dict for the template.
    return render_template('management_review.html', project=dict(project), user_id=user_id)


def main():
    """Start the Flask development server on port 5050, all interfaces."""
    logging.info("Starting the application...")
    # NOTE(review): debug=True together with host='0.0.0.0' exposes the
    # Werkzeug debugger (arbitrary code execution) to the network — do not
    # run this configuration outside a trusted development machine.
    app.run(host='0.0.0.0', port=5050, debug=True)


if __name__ == '__main__':
    main()