from decimal import Decimal
from flask import Flask, render_template, request, redirect, url_for, send_from_directory, flash, jsonify, json
import mysql.connector
from mysql.connector import Error
import config
import openpyxl
import os
import re
import ast
from datetime import datetime
import pandas as pd
from openpyxl.styles import Font
from flask import Flask, render_template, request, redirect, url_for, flash, session
from flask_login import LoginManager, UserMixin, login_user, logout_user, login_required, current_user
from ldap3 import Server, Connection, ALL
import logging
from logging.handlers import RotatingFileHandler
from flask import Flask, request, render_template
from ldap3 import Server, Connection, ALL, SUBTREE
from ldap3.core.exceptions import LDAPBindError
from flask import request, session
from flask import request, render_template, current_app , send_file
from openpyxl import load_workbook
from openpyxl.styles import Font, Alignment
from openpyxl.utils import get_column_letter
from openpyxl import Workbook
import config
from flask import send_from_directory
from decimal import Decimal
from datetime import datetime
from openpyxl.styles import Font, PatternFill


def log_action(action, details=""):
    """Append one user-action record to ``activity.log`` in the app root.

    Each record is a single line of the form
    ``Timestamp: ... | User: ... | Action: ... | Details: ...``.

    Args:
        action: Short name of the action being recorded.
        details: Optional free-form description; often contains user input.
    """
    def _one_line(value):
        # The log is line-oriented; an embedded CR/LF in user-supplied text
        # would otherwise let a caller forge or corrupt records.
        return " ".join(str(value).splitlines())

    log_file = os.path.join(current_app.root_path, 'activity.log')
    timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")

    # Prefer LDAP common name (cn), fallback to username, else Unknown
    user = "Unknown"
    for attr in ("cn", "username", "sAMAccountName"):
        value = getattr(current_user, attr, None)
        if value:
            user = value
            break

    with open(log_file, "a", encoding="utf-8") as f:
        f.write(
            f"Timestamp: {timestamp} | "
            f"User: {_one_line(user)} | "
            f"Action: {_one_line(action)} | "
            f"Details: {_one_line(details)}\n"
        )


# this is server
app = Flask(__name__)

login_manager = LoginManager()
login_manager.init_app(app)
login_manager.login_view = 'login'


class User(UserMixin):
    """Minimal flask-login user that carries only its identifier."""

    def __init__(self, id):
        self.id = id


@login_manager.user_loader
def load_user(user_id):
    """Rebuild the session user from the id stored by flask-login."""
    return User(user_id)


# Allow deployments to supply their own signing key through the environment;
# the previous literal is kept as the fallback so existing setups keep working.
app.secret_key = os.environ.get('FLASK_SECRET_KEY', '9f2a1b8c4d6e7f0123456789abcdef01')

# Names may contain ASCII letters and spaces only.
str_pattern_reg = "^[A-Za-z ]+$"


class ResponseHandler:
    """Factory for the status/message payloads returned by the CRUD routes."""

    @staticmethod
    def invalid_name(entity):
        return {'status': 'error', 'message': f'Invalid {entity} name. Only letters are allowed!'}

    @staticmethod
    def already_exists(entity):
        return {'status': 'exists', 'message': f'{entity.capitalize()} already exists!'}

    @staticmethod
    def add_success(entity):
        return {'status': 'success', 'message': f'{entity.capitalize()} added successfully!'}

    @staticmethod
    def add_failure(entity):
        return {'status': 'error', 'message': f'Failed to add {entity}.'}

    @staticmethod
    def is_available(entity):
        return {'status': 'available', 'message': f'{entity.capitalize()} name is available!'}

    @staticmethod
    def delete_success(entity):
        return {'status': 'success', 'message': f'{entity.capitalize()} deleted successfully!'}

    @staticmethod
    def delete_failure(entity):
        return {'status': 'error', 'message': f'Failed to delete {entity}.'}

    @staticmethod
    def update_success(entity):
        return {'status': 'success', 'message': f'{entity.capitalize()} updated successfully!'}

    @staticmethod
    def update_failure(entity):
        return {'status': 'error', 'message': f'Failed to update {entity}.'}

    @staticmethod
    def fetch_failure(entity):
        # Unlike the other helpers this returns a plain string, not a dict.
        return f"Failed to fetch {entity}"


# Helper: JSON Response Formatter
def json_response(message_obj, status_code):
    """Return ``(jsonify(message_obj), status_code)`` for use as a view result."""
    return jsonify(message_obj), status_code


# this is Index page OR Home page..
@app.route('/')
@login_required
def index():
    """Render the home page (login required)."""
    return render_template('index.html')


# ---------------- LOGIN ROUTE ----------------
@app.route('/login', methods=['GET', 'POST'])
def login():
    """Show the login form and authenticate a POSTed username/password.

    A static fallback account is checked first, then an LDAP simple bind is
    attempted. On success the user is logged in and redirected to the index
    page; on failure a message is flashed and the form is rendered again.
    """
    if request.method == 'POST':
        username = request.form['username'].strip()
        password = request.form['password']

        # Static fallback user
        # NOTE(review): hard-coded credentials are a backdoor; kept only to
        # preserve existing behaviour -- move to configuration or remove.
        if username == 'admin' and password == 'admin123':
            session['username'] = username
            log_action('Login', f"User {username} logged in (static user)")
            login_user(User(username))
            return redirect(url_for('index', login='success'))

        # Never send an empty name or password to the directory.
        if not username or not password:
            flash('Invalid credentials.', 'danger')
            return render_template('login.html')

        try:
            # Escape the user-supplied value so it cannot alter the DN.
            from ldap3.utils.dn import escape_rdn
            ldap_user_dn = f"uid={escape_rdn(username)},ou=users,dc=lcepl,dc=org"

            # LDAP authentication
            conn = Connection(
                Server('ldap://localhost:389', get_info=ALL),
                user=ldap_user_dn,
                password=password,
                auto_bind=True
            )
            # The bind result is all that is needed; release the socket.
            conn.unbind()

            # If bind successful -> set session and log
            session['username'] = username
            log_action('Login', f"User {username} logged in (LDAP)")
            login_user(User(username))
            return redirect(url_for('index', login='success'))
        except LDAPBindError:
            flash('Invalid credentials.', 'danger')
        except Exception as e:
            flash(f'LDAP error: {str(e)}', 'danger')

    return render_template('login.html')


@app.route('/logout')
@login_required
def logout():
    """Log the current user out and redirect to the login page."""
    log_action('Logout', f"User {current_user.id} logged out")  # log the event
    logout_user()
    flash('You have been logged out.', 'info')
    return redirect(url_for('login'))


from flask_login import login_required
from dateutil import parser  # pip install python-dateutil


@app.route('/activity_log', methods=['GET', 'POST'])
@login_required
def activity_log():
    """Render the activity log, optionally filtered by date range and user.

    Filters are read from ``start_date``, ``end_date`` (``YYYY-MM-DD``) and
    ``username`` in the query string or form data.
    """
    logs = []
    log_file = os.path.join(current_app.root_path, 'activity.log')
    fields = (
        ("timestamp", "Timestamp:"),
        ("user", "User:"),
        ("action", "Action:"),
        ("details", "Details:"),
    )

    if os.path.exists(log_file):
        # The log is written as UTF-8, so it must be read as UTF-8 too.
        with open(log_file, 'r', encoding='utf-8', errors='replace') as f:
            for line in f:
                # maxsplit keeps a " | " inside the details from breaking
                # the record into more than four parts.
                parts = line.strip().split(" | ", 3)
                if len(parts) != 4:
                    continue
                entry = {}
                for (key, label), part in zip(fields, parts):
                    # Strip only the leading label, not later occurrences.
                    if part.startswith(label):
                        part = part[len(label):]
                    entry[key] = part.strip()
                logs.append(entry)

    # Filters (GET or POST)
    start_date = request.values.get("start_date")
    end_date = request.values.get("end_date")
    username = request.values.get("username")

    filtered_logs = logs

    # Date filter
    if start_date or end_date:
        try:
            start_dt = datetime.strptime(start_date, "%Y-%m-%d") if start_date else datetime.min
            end_dt = datetime.strptime(end_date, "%Y-%m-%d") if end_date else datetime.max
        except ValueError as e:
            # Malformed filter input: leave the list unfiltered.
            print("Date filter error:", e)
        else:
            end_dt = end_dt.replace(hour=23, minute=59, second=59)
            in_range = []
            for log in filtered_logs:
                try:
                    stamp = datetime.strptime(log["timestamp"], "%Y-%m-%d %H:%M:%S")
                except ValueError:
                    # One unparseable record must not disable the filter.
                    continue
                if start_dt <= stamp <= end_dt:
                    in_range.append(log)
            filtered_logs = in_range

    # Username filter
    if username:
        needle = username.lower()
        filtered_logs = [log for log in filtered_logs if needle in log["user"].lower()]

    return render_template(
        "activity_log.html",
        logs=filtered_logs,
        start_date=start_date,
        end_date=end_date,
        username=username
    )


# ------------------------- State controller ------------------------------------------
@app.route('/add_state', methods=['GET', 'POST'])
@login_required
def add_state():
    """List all states (GET) or add a new state and answer with JSON (POST)."""
    connection = config.get_db_connection()
    statedata = []

    if not connection:
        return render_template('add_state.html', statedata=statedata)

    cursor = connection.cursor()
    # One outer try/finally so every return path releases the DB resources.
    try:
        if request.method == 'POST':
            state_name = request.form['state_Name'].strip()
            log_action("Add State", f"User {current_user.id} added state '{state_name}'")

            if not re.match(str_pattern_reg, state_name):
                return json_response(ResponseHandler.invalid_name("state"), 400)

            try:
                cursor.callproc("CheckStateExists", (state_name,))
                existing_state = None
                for data in cursor.stored_results():
                    existing_state = data.fetchone()
                if existing_state:
                    return json_response(ResponseHandler.already_exists("state"), 409)

                cursor.callproc("SaveState", (state_name,))
                connection.commit()
                return json_response(ResponseHandler.add_success("state"), 200)
            except mysql.connector.Error as e:
                print(f"Error inserting state: {e}")
                return json_response(ResponseHandler.add_failure("state"), 500)

        try:
            cursor.callproc("GetAllStates")
            for res in cursor.stored_results():
                statedata = res.fetchall()
        except mysql.connector.Error as e:
            print(f"Error fetching states: {e}")
            return ResponseHandler.fetch_failure("states"), 500
    finally:
        cursor.close()
        connection.close()

    return render_template('add_state.html', statedata=statedata)


# AJAX route to check state existence
@app.route('/check_state', methods=['POST'])
@login_required
def check_state():
    """Report as JSON whether the state name in the JSON body already exists."""
    connection = config.get_db_connection()
    if not connection:
        # A view must always return a response.
        return json_response(ResponseHandler.add_failure("state"), 500)

    cursor = connection.cursor()
    try:
        state_name = request.json.get('state_Name', '').strip()
        log_action("Check State", f"User {current_user.id} Checked state '{state_name}'")

        if not re.match(str_pattern_reg, state_name):
            return json_response(ResponseHandler.invalid_name("state"), 400)

        cursor.callproc("CheckStateExists", (state_name,))
        existing_state = None
        for data in cursor.stored_results():
            existing_state = data.fetchone()

        if existing_state:
            return json_response(ResponseHandler.already_exists("state"), 409)
        return json_response(ResponseHandler.is_available("state"), 200)
    except mysql.connector.Error as e:
        print(f"Error checking state: {e}")
        return json_response(ResponseHandler.add_failure("state"), 500)
    finally:
        cursor.close()
        connection.close()


# Delete State
# NOTE(review): a state-changing action exposed over GET is CSRF-prone.
@app.route('/delete_state/<int:id>', methods=['GET'])
@login_required
def deleteState(id):
    """Delete the state with the given id and redirect to the state list."""
    connection = config.get_db_connection()
    if not connection:
        return json_response(ResponseHandler.delete_failure("state"), 500)

    cursor = connection.cursor()
    log_action("Delete State", f"User {current_user.id} Deleted state '{id}'")
    try:
        cursor.callproc('DeleteState', (id,))
        connection.commit()
    except mysql.connector.Error as e:
        print(f"Error deleting data: {e}")
        return json_response(ResponseHandler.delete_failure("state"), 500)
    finally:
        cursor.close()
        connection.close()

    return redirect(url_for('add_state'))


# Edit State
@app.route('/edit_state/<int:id>', methods=['GET', 'POST'])
@login_required
def editState(id):
    """Show the edit form for a state (GET) or save its new name (POST)."""
    connection = config.get_db_connection()
    cursor = connection.cursor()

    try:
        if request.method == 'POST':
            state_name = request.form['state_Name'].strip()
            log_action("Edit State", f"User {current_user.id} Edited state '{state_name}'")

            if not re.match(str_pattern_reg, state_name):
                return ResponseHandler.invalid_name("state"), 400

            try:
                cursor.callproc("UpdateStateById", (id, state_name))
                connection.commit()
                return redirect(url_for('add_state'))
            except mysql.connector.Error as e:
                print(f"Error updating data: {e}")
                # This is an update, so report it as one.
                return ResponseHandler.update_failure("state"), 500

        try:
            cursor.callproc("GetStateByID", (id,))
            state = None
            for result in cursor.stored_results():
                state = result.fetchone()
            if state is None:
                return "State not found", 404
        except mysql.connector.Error as e:
            print(f"Error retrieving data: {e}")
            return ResponseHandler.fetch_failure("state"), 500
    finally:
        cursor.close()
        connection.close()

    return render_template('edit_state.html', state=state)


# -------- end State controller -----------

# ------------------------- District controller ------------------------------------------
@app.route('/add_district', methods=['GET', 'POST'])
@login_required
def add_district():
    """List districts and states (GET) or add a district, answering JSON (POST)."""
    connection = config.get_db_connection()
    districtdata = []
    states = []

    if not connection:
        return render_template('add_district.html', districtdata=districtdata, states=states)

    cursor = connection.cursor()
    try:
        try:
            cursor.callproc("GetAllStates")
            for res in cursor.stored_results():
                states = res.fetchall()
        except mysql.connector.Error as e:
            print(f"Error fetching states: {e}")
            return ResponseHandler.fetch_failure("states"), 500

        if request.method == 'POST':
            district_name = request.form['district_Name'].strip()
            state_id = request.form['state_Id']
            log_action("Add District", f"User {current_user.id} Added District '{district_name}'")

            if not re.match(str_pattern_reg, district_name):
                return json_response(ResponseHandler.invalid_name("district"), 400)

            try:
                cursor.callproc("GetDistrictByNameAndState", (district_name, state_id))
                existing_district = None
                for data in cursor.stored_results():
                    existing_district = data.fetchone()
                if existing_district:
                    return json_response(ResponseHandler.already_exists("district"), 409)

                cursor.callproc('SaveDistrict', (district_name, state_id))
                connection.commit()
                return json_response(ResponseHandler.add_success("district"), 200)
            except mysql.connector.Error as e:
                print(f"Error inserting district: {e}")
                return json_response(ResponseHandler.add_failure("district"), 500)

        try:
            cursor.callproc("GetAllDistricts")
            for dis in cursor.stored_results():
                districtdata = dis.fetchall()
        except mysql.connector.Error as e:
            print(f"Error fetching districts: {e}")
            return ResponseHandler.fetch_failure("districts"), 500
    finally:
        cursor.close()
        connection.close()

    return render_template('add_district.html', districtdata=districtdata, states=states)


# AJAX route to check district existence
@app.route('/check_district', methods=['POST'])
@login_required
def check_district():
    """Report as JSON whether the district already exists in the given state."""
    connection = config.get_db_connection()
    if not connection:
        # A view must always return a response.
        return json_response(ResponseHandler.add_failure("district"), 500)

    cursor = connection.cursor()
    try:
        district_name = request.json.get('district_Name', '').strip()
        state_id = request.json.get('state_Id', '')
        log_action("Check District", f"User {current_user.id} Checked District '{district_name}'")

        if not re.match(str_pattern_reg, district_name):
            return json_response(ResponseHandler.invalid_name("district"), 400)

        cursor.callproc("GetDistrictByNameAndState", (district_name, state_id,))
        existing_district = None
        for result in cursor.stored_results():
            existing_district = result.fetchone()

        if existing_district:
            return json_response(ResponseHandler.already_exists("district"), 409)
        return json_response(ResponseHandler.is_available("district"), 200)
    except mysql.connector.Error as e:
        print(f"Error checking district: {e}")
        return json_response(ResponseHandler.add_failure("district"), 500)
    finally:
        cursor.close()
        connection.close()


# this is delete District method by id..
@app.route('/delete_district/<int:district_id>', methods=['GET', 'POST'])
@login_required
def delete_district(district_id):
    """Delete the district with the given id and redirect to the district list."""
    connection = config.get_db_connection()
    if connection:
        cursor = connection.cursor()
        try:
            cursor.callproc("DeleteDistrict", (district_id,))
            connection.commit()
            log_action("Delete District", f"User {current_user.id} Deleted District '{district_id}'")
        except mysql.connector.Error as e:
            print(f"Error deleting district: {e}")
            return json_response(ResponseHandler.delete_failure("district"), 500)
        finally:
            cursor.close()
            connection.close()

    return redirect('/add_district')


# this is update District page by id ..
@app.route('/edit_district/<int:district_id>', methods=['GET', 'POST'])
@login_required
def edit_district(district_id):
    """Show the edit form for a district (GET) or save its changes (POST)."""
    connection = config.get_db_connection()
    districtdata = []
    states = []

    if not connection:
        return render_template('edit_district.html', districtdata=districtdata, states=states)

    cursor = connection.cursor()
    # The outer try/finally releases the DB resources on every return path.
    try:
        # Retrieve all states for dropdown
        try:
            cursor.callproc("GetAllStates")
            for res in cursor.stored_results():
                states = res.fetchall()
        except mysql.connector.Error as e:
            print(f"Error fetching states: {e}")
            return ResponseHandler.fetch_failure("states"), 500

        # Retrieve district info
        try:
            cursor.callproc("GetDistrictDataByID", (district_id,))
            for rs in cursor.stored_results():
                districtdata = rs.fetchone()
            log_action("Edit District", f"User {current_user.id} Edited District '{district_id}'")
        except mysql.connector.Error as e:
            print(f"Error fetching district data: {e}")
            return ResponseHandler.fetch_failure("district"), 500

        # Handle update
        if request.method == 'POST':
            district_name = request.form['district_Name']
            state_id = request.form['state_Id']
            try:
                cursor.callproc("UpdateDistrict", (district_id, state_id, district_name,))
                connection.commit()
            except mysql.connector.Error as e:
                print(f"Error updating district: {e}")
                return ResponseHandler.update_failure("district"), 500
            return redirect('/add_district')
    finally:
        cursor.close()
        connection.close()

    return render_template('edit_district.html', districtdata=districtdata, states=states)


# --------- end District controller -------------

# ------------------------- Block controller ------------------------------------------
@app.route('/add_block', methods=['GET', 'POST'])
@login_required
def add_block():
    """List blocks and states (GET) or add a block, answering JSON (POST)."""
    connection = config.get_db_connection()
    block_data = []
    states = []

    if not connection:
        return render_template('add_block.html', block_data=block_data, states=states)

    cursor = connection.cursor()
    try:
        try:
            cursor.callproc("GetAllStates")
            for res in cursor.stored_results():
                states = res.fetchall()
        except mysql.connector.Error as e:
            print(f"Error fetching states: {e}")
            return json_response(ResponseHandler.fetch_failure("states"), 500)

        if request.method == 'POST':
            block_name = request.form['block_Name'].strip()
            district_id = request.form['district_Id']
            log_action("Add Block", f"User {current_user.id} Added block '{block_name}'")

            if not re.match(str_pattern_reg, block_name):
                return json_response(ResponseHandler.invalid_name("block"), 400)

            try:
                cursor.callproc("GetBlockByNameAndDistrict", (block_name, district_id,))
                existing_block = None
                for rs in cursor.stored_results():
                    existing_block = rs.fetchone()
                if existing_block:
                    return json_response(ResponseHandler.already_exists("block"), 409)

                cursor.callproc('SaveBlock', (block_name, district_id))
                connection.commit()
                return json_response(ResponseHandler.add_success("block"), 200)
            except mysql.connector.Error as e:
                print(f"Error adding block: {e}")
                return json_response(ResponseHandler.add_failure("block"), 500)

        # Fetch all blocks to display
        try:
            cursor.callproc("GetAllBlocks")
            for blocks in cursor.stored_results():
                block_data = blocks.fetchall()
        except mysql.connector.Error as e:
            print(f"Error fetching blocks: {e}")
            return json_response(ResponseHandler.fetch_failure("blocks"), 500)
    finally:
        cursor.close()
        connection.close()

    return render_template('add_block.html', block_data=block_data, states=states)


# check block
@app.route('/check_block', methods=['POST'])
@login_required
def check_block():
    """Report as JSON whether the block already exists in the given district."""
    block_name = request.json.get('block_Name', '').strip()
    district_id = request.json.get('district_Id', '')
    log_action("Check Block", f"User {current_user.id} Checked block '{block_name}'")

    # Validate before opening a connection so a rejected name holds no resources.
    if not re.match(str_pattern_reg, block_name):
        return json_response(ResponseHandler.invalid_name("block"), 400)

    connection = config.get_db_connection()
    if not connection:
        return json_response(ResponseHandler.add_failure("block"), 500)

    cursor = connection.cursor()
    try:
        cursor.callproc("GetBlockByNameAndDistrict", (block_name, district_id))
        existing_block = None
        for rs in cursor.stored_results():
            existing_block = rs.fetchone()
    except mysql.connector.Error as e:
        print(f"Error checking block: {e}")
        return json_response(ResponseHandler.add_failure("block"), 500)
    finally:
        cursor.close()
        connection.close()

    if existing_block:
        return json_response(ResponseHandler.already_exists("block"), 409)
    return json_response(ResponseHandler.is_available("block"), 200)


@app.route('/edit_block/<int:block_id>', methods=['GET', 'POST'])
@login_required
def edit_block(block_id):
    """Show the edit form for a block (GET) or save its changes (POST)."""
    connection = config.get_db_connection()
    block_data = []
    states = []
    districts = []

    if not connection:
        return render_template('edit_block.html', block_data=block_data, states=states, districts=districts)

    cursor = connection.cursor()
    try:
        # Retrieve all states
        try:
            cursor.callproc("GetAllStates")
            for rs in cursor.stored_results():
                states = rs.fetchall()
        except mysql.connector.Error as e:
            print(f"Error fetching states: {e}")
            return "Failed to fetch states", 500

        # Retrieve block data
        try:
            cursor.callproc("GetBlockDataByID", (block_id,))
            for rs in cursor.stored_results():
                block_data = rs.fetchone()
            log_action("Edit Block", f"User {current_user.id} Edited block '{block_id}'")
        except mysql.connector.Error as e:
            print(f"Error fetching block data: {e}")
            return "Failed to fetch block data", 500

        # Handle POST request
        if request.method == 'POST':
            block_name = request.form['block_Name']
            district_id = request.form['district_Id']
            try:
                cursor.callproc("UpdateBlockById", (block_name, district_id, block_id,))
                connection.commit()
                flash("Block updated successfully!", "success")
                return redirect(url_for('add_block', block_id=block_id))
            except mysql.connector.Error as e:
                print(f"Error updating blocks: {e}")
                return "Failed to update blocks", 500

        # Retrieve districts for the dropdown
        try:
            cursor.callproc("GetAllDistrictsData")
            for rs in cursor.stored_results():
                districts = rs.fetchall()
        except mysql.connector.Error as e:
            print(f"Error fetching districts: {e}")
            return "Failed to fetch districts", 500
    finally:
        cursor.close()
        connection.close()

    return render_template('edit_block.html', block_data=block_data, states=states, districts=districts)


# delete block by id
@app.route('/delete_block/<int:block_id>', methods=['GET', 'POST'])
@login_required
def delete_block(block_id):
    """Delete the block with the given id and redirect to the block list."""
    connection = config.get_db_connection()
    if connection:
        cursor = connection.cursor()
        try:
            cursor.callproc("DeleteBlock", (block_id,))
            log_action("Delete Block", f"User {current_user.id} Deleted block '{block_id}'")
            connection.commit()
        except mysql.connector.Error as e:
            print(f"Error deleting block: {e}")
            # This is a delete, so report it as one.
            return json_response(ResponseHandler.delete_failure("block"), 500)
        finally:
            cursor.close()
            connection.close()

    return redirect('/add_block')


# this is get district all data by using state id ..
@app.route('/get_districts/<int:state_id>', methods=['GET'])
@login_required
def get_districts(state_id):
    """Return the districts of one state as JSON (``District_id``/``District_Name``)."""
    connection = config.get_db_connection()
    districts = []

    if connection:
        cursor = connection.cursor()
        try:
            cursor.callproc("GetDistrictsByStateId", (state_id,))
            for dis in cursor.stored_results():
                districts = dis.fetchall()
            log_action("Get District", f"User {current_user.id} Get District '{state_id}'")
        except mysql.connector.Error as e:
            print(f"Error fetching districts: {e}")
            return json_response(ResponseHandler.fetch_failure("districts"), 500)
        finally:
            cursor.close()
            connection.close()

    return jsonify({
        "districts": [{"District_id": d[0], "District_Name": d[1]} for d in districts]
    })


# ----------- end Block controller -----------------

# ------------------------- Village controller ------------------------------------------
# Route to add a village
@app.route('/add_village', methods=['GET', 'POST'])
@login_required
def add_village():
    """List villages and states (GET) or add a village, answering JSON (POST)."""
    connection = config.get_db_connection()
    cursor = connection.cursor()
    states = []
    villages = []

    try:
        cursor.callproc("GetAllStates")
        for res in cursor.stored_results():
            states = res.fetchall()

        cursor.callproc("GetAllVillages")
        for result in cursor.stored_results():
            villages = result.fetchall()

        if request.method == 'POST':
            block_id = request.form.get('block_Id')
            village_name = request.form.get('Village_Name', '').strip()
            log_action("Add Villages", f"User {current_user.id} Added Villages '{village_name}'")

            if not block_id:
                return json_response(ResponseHandler.add_failure("block"), 400)

            if not re.match(str_pattern_reg, village_name):
                return json_response(ResponseHandler.invalid_name("village"), 400)

            cursor.callproc("GetVillageByNameAndBlock", (village_name, block_id,))
            # Stays None when the procedure yields no result set.
            existing_village = None
            for rs in cursor.stored_results():
                existing_village = rs.fetchone()
            if existing_village:
                return json_response(ResponseHandler.already_exists("village"), 409)

            # Insert new village
            cursor.callproc('SaveVillage', (village_name, block_id))
            connection.commit()
            return json_response(ResponseHandler.add_success("village"), 200)
    except mysql.connector.Error as e:
        print(f"Database Error: {e}")
        return json_response(ResponseHandler.add_failure("village"), 500)
    finally:
        cursor.close()
        connection.close()

    return render_template('add_village.html', states=states, villages=villages)


# get block by district id
@app.route('/get_blocks/<int:district_id>', methods=['GET'])
@login_required
def get_blocks(district_id):
    """Return the blocks of one district as JSON (``Block_Id``/``Block_Name``)."""
    connection = config.get_db_connection()
    cursor = connection.cursor()
    blocks = []

    try:
        cursor.callproc("GetBlocksByDistrict", (district_id,))
        for rs in cursor.stored_results():
            blocks = rs.fetchall()
        log_action("Get blocks", f"User {current_user.id} Get Blocks '{district_id}'")
    except mysql.connector.Error as e:
        print(f"Error fetching blocks: {e}")
        return json_response({"error": "Failed to fetch blocks"}, 500)
    finally:
        cursor.close()
        connection.close()

    return jsonify({"blocks": [{"Block_Id": block[0], "Block_Name": block[1]} for block in blocks]})


# check village
@app.route('/check_village', methods=['POST'])
@login_required
def check_village():
    """Report as JSON whether the village already exists in the given block."""
    block_id = request.form.get('block_Id')
    village_name = request.form.get('Village_Name', '').strip()
    log_action("Check villages", f"User {current_user.id} Check villages '{village_name}'")

    # Validate before opening a connection so a rejected request holds no resources.
    if not re.match(str_pattern_reg, village_name):
        return json_response(ResponseHandler.invalid_name("village"), 400)

    if not block_id or not village_name:
        return json_response({'status': 'error', 'message': 'Block and Village Name are required!'}, 400)

    connection = config.get_db_connection()
    if not connection:
        return json_response(ResponseHandler.add_failure("village"), 500)

    cursor = connection.cursor()
    try:
        # NOTE(review): add_village calls "GetVillageByNameAndBlock" (no
        # trailing "s"); confirm that this procedure name exists.
        cursor.callproc("GetVillageByNameAndBlocks", (village_name, block_id))
        existing_village = None
        for rs in cursor.stored_results():
            existing_village = rs.fetchone()
    except mysql.connector.Error as e:
        print(f"Error checking village: {e}")
        return json_response(ResponseHandler.add_failure("village"), 500)
    finally:
        cursor.close()
        connection.close()

    if existing_village:
        return json_response(ResponseHandler.already_exists("village"), 409)
    return json_response(ResponseHandler.is_available("village"), 200)


@app.route('/edit_village/<int:village_id>', methods=['GET', 'POST'])
@login_required
def edit_village(village_id):
    """Show the edit form for a village (GET) or save its changes (POST)."""
    connection = config.get_db_connection()
    village_data = None
    blocks = []
    # Bound up front so the finally clause is safe if cursor creation fails.
    cursor = None

    try:
        cursor = connection.cursor()

        cursor.callproc("GetVillageDetailsById", (village_id,))
        for rs in cursor.stored_results():
            village_data = rs.fetchone()

        cursor.callproc('GetAllBlocks')
        for result in cursor.stored_results():
            blocks = result.fetchall()

        if request.method == 'POST':
            village_name = request.form['Village_Name']
            block_id = request.form['block_Id']
            log_action("Edit villages", f"User {current_user.id} Edit villages '{village_name}'")

            if not re.match(str_pattern_reg, village_name):
                flash("Invalid village name! Only letters and spaces allowed.", "error")
                return redirect(url_for('edit_village', village_id=village_id))

            cursor.execute(
                "UPDATE villages SET Village_Name = %s, Block_Id = %s WHERE Village_Id = %s",
                (village_name, block_id, village_id)
            )
            connection.commit()
            flash("Village updated successfully!", "success")
            return redirect(url_for('edit_village', village_id=village_id))
    except mysql.connector.Error as e:
        print(f"Error: {e}")
        return "Failed to process request", 500
    finally:
        if cursor:
            cursor.close()
        if connection:
            connection.close()

    return render_template('edit_village.html', village_data=village_data, blocks=blocks)


# delete village
@app.route('/delete_village/<int:village_id>', methods=['GET', 'POST'])
@login_required
def delete_village(village_id):
    """Delete the village with the given id and redirect to the village list."""
    connection = config.get_db_connection()
    cursor = connection.cursor()

    try:
        cursor.callproc("DeleteVillage", (village_id,))
        log_action("Delete villages", f"User {current_user.id} Deletedvillages '{village_id}'")
        connection.commit()
    except mysql.connector.Error as e:
        print(f"Error: {e}")
        # This is a delete, so report it as one.
        return json_response(ResponseHandler.delete_failure("village"), 500)
    finally:
        if cursor:
            cursor.close()
        if connection:
            connection.close()

    return redirect(url_for('add_village'))


#
# ---- end Village controller ---------------------


# -------------------------------- Invoice controller ------------------------------------------

# Numeric invoice form fields, in the positional order InsertInvoice is called with.
_INVOICE_AMOUNT_FIELDS = (
    'basic_amount', 'debit_amount', 'after_debit_amount', 'amount',
    'gst_amount', 'tds_amount', 'sd_amount', 'on_commission',
    'hydro_testing', 'gst_sd_amount', 'final_amount',
)


@app.route('/add_invoice', methods=['GET', 'POST'])
@login_required
def add_invoice():
    """List invoices (GET) or create an invoice with its hold amounts (POST).

    POST resolves the village name to an id, inserts the invoice, assigns
    the subcontractor and stores one hold row per ``hold_type[]`` /
    ``hold_amount[]`` pair. Everything is committed once at the end, so a
    failure or an unknown hold type leaves nothing half-written. Responses
    are JSON: 201 on success, 400 for bad input, 500 for database errors.

    GET renders ``add_invoice.html`` with all invoices and villages.
    """
    connection = config.get_db_connection()
    if not connection:
        return jsonify({"status": "error", "message": "Database connection failed"}), 500

    cursor = None  # bound up front so the finally clauses are always safe

    if request.method == 'POST':
        try:
            cursor = connection.cursor(dictionary=True)
            form = request.form

            village_name = form.get('village')
            village_result = None
            cursor.callproc("GetVillageIdByName", (village_name,))
            for rs in cursor.stored_results():
                village_result = rs.fetchone()
            if not village_result:
                return jsonify({"status": "error", "message": f"Village '{village_name}' not found"}), 400
            village_id = village_result['Village_Id']

            pmc_no = form.get('pmc_no')
            subcontractor_id = form.get('subcontractor_id')
            # Blank or missing amounts become 0.0; non-numeric text raises
            # ValueError, which is answered with a 400 below.
            amounts = [float(form.get(name) or 0.0) for name in _INVOICE_AMOUNT_FIELDS]

            cursor.callproc('InsertInvoice', [
                pmc_no, village_id, form.get('work_type'), form.get('invoice_details'),
                form.get('invoice_date'), form.get('invoice_no'), *amounts,
            ])
            invoice_id = None
            for result in cursor.stored_results():
                row = result.fetchone()
                if row:
                    invoice_id = row['invoice_id']
            if invoice_id is None:
                connection.rollback()
                return jsonify({"status": "error",
                                "message": "Failed to add invoice: no invoice id returned"}), 500

            cursor.callproc('AssignSubcontractor', [pmc_no, subcontractor_id, village_id])

            # Insert hold amounts into the invoice/subcontractor hold join table.
            hold_types = form.getlist('hold_type[]')
            hold_amounts = form.getlist('hold_amount[]')
            for hold_type, hold_amount in zip(hold_types, hold_amounts):
                hold_type_result = None
                cursor.callproc('GetHoldTypeIdByName', [hold_type])
                for result in cursor.stored_results():
                    hold_type_result = result.fetchone()
                if not hold_type_result:
                    # Undo the invoice inserted above instead of leaving it orphaned.
                    connection.rollback()
                    return jsonify({"status": "error", "message": f"Invalid Hold Type: {hold_type}"}), 400

                cursor.callproc('InsertInvoiceSubcontractorHold', [
                    subcontractor_id, invoice_id, hold_type_result['hold_type_id'], hold_amount
                ])

            # Single commit: the rollback() calls above can undo every step.
            connection.commit()
            log_action("Add invoice", f"User {current_user.id} Added invoice '{ pmc_no}'")
            return jsonify({"status": "success", "message": "Invoice added successfully"}), 201

        except ValueError:
            connection.rollback()
            return jsonify({"status": "error", "message": "Amount fields must be numeric"}), 400
        except mysql.connector.Error as e:
            connection.rollback()
            return jsonify({"status": "error", "message": f"Failed to add invoice: {str(e)}"}), 500
        finally:
            if cursor:
                cursor.close()
            connection.close()

    # GET request: fetch and display all invoices (all fields) along with the form
    invoices = []
    villages = []
    try:
        cursor = connection.cursor(dictionary=True)

        cursor.callproc('GetAllInvoiceDetails')
        for result in cursor.stored_results():
            invoices = result.fetchall()

        cursor.callproc("GetAllVillages")
        for result in cursor.stored_results():
            villages = result.fetchall()
    except mysql.connector.Error as e:
        print(f"Error: {e}")
        invoices = []
    finally:
        if cursor:
            cursor.close()
        connection.close()

    return render_template('add_invoice.html', invoices=invoices, villages=villages)


# search subcontraactor to assing invoice @app.route('/search_subcontractor', methods=['POST']) @login_required def search_subcontractor(): connection = config.get_db_connection() if not connection: return json_response(ResponseHandler.fetch_failure("database connection"), 500) sub_query = request.form.get("query") try: cursor = connection.cursor(dictionary=True) cursor.callproc('SearchContractorsByName', [sub_query]) for result in cursor.stored_results(): results = result.fetchall() print(results) if not results: return "
  • No subcontractor found
  • " output = "".join( f"
  • {row['Contractor_Name']}
  • " for row in results ) print("Ajax Call for subcontractor", output) return output except mysql.connector.Error as e: return json_response(ResponseHandler.fetch_failure(f"Search failed: {str(e)}"), 500) finally: cursor.close() connection.close() # get hold types @app.route('/get_hold_types', methods=['GET']) @login_required def get_hold_types(): connection = config.get_db_connection() try: cursor = connection.cursor(dictionary=True) cursor.callproc("GetAllHoldTypes") for hold in cursor.stored_results(): hold_types = hold.fetchall() log_action("Get hold type", f"User {current_user.id} Get hold type'{ hold_types}'") return jsonify(hold_types) except mysql.connector.Error as e: return ResponseHandler.fetch_failure({str(e)}), 500 # return jsonify({"status": "error", "message": f"Failed to fetch hold types: {str(e)}"}), 500 finally: cursor.close() connection.close() # update invoice by id @app.route('/edit_invoice/', methods=['GET', 'POST']) @login_required def edit_invoice(invoice_id): connection = config.get_db_connection() if not connection: return jsonify({"status": "error", "message": "Database connection failed"}), 500 cursor = connection.cursor(dictionary=True) if request.method == 'POST': try: # Fetch updated form data subcontractor_id = request.form.get('subcontractor_id', '').strip() subcontractor_id = int(subcontractor_id) if subcontractor_id else None village_name = request.form.get('village') # cursor.execute("SELECT Village_Id FROM villages WHERE Village_Name = %s", (village_name,)) # village_result = cursor.fetchone() cursor.callproc("GetVillageIdByName", (village_name,)) for rs in cursor.stored_results(): village_result = rs.fetchone() if not village_result: return jsonify({"status": "error", "message": "Invalid Village Name"}), 400 village_id = village_result['Village_Id'] pmc_no = request.form.get('pmc_no') work_type = request.form.get('work_type') invoice_details = request.form.get('invoice_details') invoice_date = request.form.get('invoice_date') 
invoice_no = request.form.get('invoice_no') log_action("Edit invoice", f"User {current_user.id} Edit invoice'{ invoice_id}'") # Convert numeric fields properly numeric_fields = { "basic_amount": request.form.get('basic_amount'), "debit_amount": request.form.get('debit_amount'), "after_debit_amount": request.form.get('after_debit_amount'), "amount": request.form.get('amount'), "gst_amount": request.form.get('gst_amount'), "tds_amount": request.form.get('tds_amount'), "sd_amount": request.form.get('sd_amount'), "on_commission": request.form.get('on_commission'), "hydro_testing": request.form.get('hydro_testing'), "gst_sd_amount": request.form.get('gst_sd_amount'), "final_amount": request.form.get('final_amount'), } numeric_fields = {k: float(v) if v else 0 for k, v in numeric_fields.items()} cursor.callproc('UpdateInvoice', [ pmc_no, village_id, work_type, invoice_details, invoice_date, invoice_no, *numeric_fields.values(), invoice_id ]) connection.commit() # Handle holds hold_types = request.form.getlist('hold_type[]') hold_amounts = request.form.getlist('hold_amount[]') for hold_type, hold_amount in zip(hold_types, hold_amounts): if not hold_type: continue # skip empty hold types cursor.callproc('GetHoldTypeIdByName', [hold_type]) for result in cursor.stored_results(): hold_type_result = result.fetchone() if not hold_type_result: # Call stored procedure to insert and return new ID cursor.callproc('InsertHoldType', [hold_type, 0]) for result in cursor.stored_results(): pass # advance past any results cursor.execute("SELECT @_InsertHoldType_1") hold_type_id = cursor.fetchone()[0] print("if not hold type result anish:", hold_type_id) else: hold_type_id = hold_type_result['hold_type_id'] print("if hold type result anish:", hold_type_id) hold_amount = float(hold_amount) if hold_amount else 0 cursor.callproc('GetHoldJoinId', [invoice_id, subcontractor_id, hold_type_id]) for result in cursor.stored_results(): join_result = result.fetchone() if join_result: 
cursor.callproc('UpdateHoldAmountByJoinId', [hold_amount, join_result['join_id']]) connection.commit() else: cursor.callproc('InsertInvoiceSubcontractorHold', [ subcontractor_id, invoice_id, hold_type_id, hold_amount ]) connection.commit() connection.commit() return jsonify({"status": "success", "message": "Invoice updated successfully"}), 200 except mysql.connector.Error as e: connection.rollback() return jsonify({"status": "error", "message": f"Failed to update invoice: {str(e)}"}), 500 finally: cursor.close() connection.close() # ------------------ GET Request ------------------ try: # Fetch invoice data cursor.callproc('GetInvoiceDetailsById', [invoice_id]) for result in cursor.stored_results(): invoice = result.fetchone() if not invoice: return jsonify({"status": "error", "message": "Invoice not found"}), 404 # Important! Clear unread result issue while cursor.nextset(): pass cursor.callproc('GetHoldAmountsByInvoiceId', [invoice_id]) for result in cursor.stored_results(): hold_amounts = result.fetchall() invoice["hold_amounts"] = hold_amounts except mysql.connector.Error as e: return jsonify({"status": "error", "message": f"Database error: {str(e)}"}), 500 finally: cursor.close() connection.close() return render_template('edit_invoice.html', invoice=invoice) # delete invoice by id @app.route('/delete_invoice/', methods=['GET']) @login_required def delete_invoice(invoice_id): connection = config.get_db_connection() if not connection: return json_response(ResponseHandler.fetch_failure("invoice"), 500) try: cursor = connection.cursor() # cursor.execute("DELETE FROM invoice WHERE Invoice_Id = %s", (invoice_id,)) cursor.callproc("DeleteInvoice", (invoice_id,)) log_action("Delete invoice", f"User {current_user.id} Delete invoice'{ invoice_id}'") connection.commit() # Check if the invoice was actually deleted if cursor.rowcount == 0: return json_response(ResponseHandler.fetch_failure("invoice"), 404) return redirect(url_for('add_invoice')) except 
mysql.connector.Error as e: print("Error deleting invoice:", e) return json_response(ResponseHandler.delete_failure("invoice"), 500) finally: cursor.close() connection.close() # ---------- end Invoice controller ------------------ @app.route('/add_payment', methods=['GET', 'POST']) @login_required def add_payment(): connection = config.get_db_connection() payments = [] if connection: cursor = connection.cursor() try: cursor.callproc('GetAllPayments') for result in cursor.stored_results(): payments = result.fetchall() except mysql.connector.Error as e: print(f"Error fetching payment history: {e}") return "Failed to fetch payment history", 500 finally: cursor.close() if request.method == 'POST': pmc_no = request.form['PMC_No'] invoice_no = request.form['invoice_No'] amount = request.form['Payment_Amount'] tds_amount = request.form['TDS_Payment_Amount'] total_amount = request.form['total_amount'] utr = request.form['utr'] log_action("Add Payment", f"User {current_user.id} Add Payment'{ pmc_no}'") try: cursor = connection.cursor() cursor.callproc('InsertPayments', [ pmc_no, invoice_no, amount, tds_amount, total_amount, utr ]) connection.commit() return redirect(url_for('add_payment')) except mysql.connector.Error as e: print(f"Error inserting payment: {e}") return "Failed to add payment", 500 finally: cursor.close() connection.close() return render_template('add_payment.html', payments=payments) @app.route('/get_pmc_nos_by_subcontractor/') @login_required def get_pmc_nos_by_subcontractor(subcontractorId): connection = config.get_db_connection() cur = connection.cursor() print(subcontractorId) cur.callproc('GetDistinctPMCNoByContractorId', [subcontractorId]) for result in cur.stored_results(): results = result.fetchall() print(results) pmc_nos = [row[0] for row in results] cur.close() return jsonify({'pmc_nos': pmc_nos}) # Edit Payment Route @app.route('/edit_payment/', methods=['GET', 'POST']) @login_required def edit_payment(payment_id): connection = 
config.get_db_connection() payment_data = {} # To hold the payment data for the given ID if not connection: return json_response(ResponseHandler.fetch_failure("payment"), 500) try: cursor = connection.cursor() cursor.callproc("GetPaymentById", (payment_id,)) for result in cursor.stored_results(): payment_data = result.fetchone() # Handle POST request to update the payment if request.method == 'POST': pmc_no = request.form['PMC_No'] invoice_no = request.form['invoice_No'] amount = request.form['Payment_Amount'] tds_amount = request.form['TDS_Payment_Amount'] total_amount = request.form['total_amount'] utr = request.form['utr'] log_action("Edit Payment", f"User {current_user.id} Edit Payment'{ pmc_no}'") try: cursor.callproc("UpdatePayment", (payment_id, pmc_no, invoice_no, amount, tds_amount, total_amount, utr,)) connection.commit() return redirect(url_for('add_payment')) # Redirect to add_payment page to view the updated list except mysql.connector.Error as e: print(f"Error updating payment: {e}") return json_response(ResponseHandler.update_failure("payment"), 500) except mysql.connector.Error as e: print(f"Error fetching payment data: {e}") return json_response(ResponseHandler.fetch_failure("payment"), 500) finally: cursor.close() connection.close() return render_template('edit_payment.html', payment_data=payment_data) # Delete Payment Route @app.route('/delete_payment/', methods=['GET', 'POST']) @login_required def delete_payment(payment_id): connection = config.get_db_connection() if not connection: return json_response(ResponseHandler.fetch_failure("payment"), 500) try: cursor = connection.cursor() # cursor.execute("DELETE FROM payment WHERE Payment_Id = %s", (payment_id,)) cursor.callproc("DeletePayment", (payment_id,)) log_action("Delete Payment", f"User {current_user.id} Delete Payment'{ payment_id}'") connection.commit() # Check if any rows were deleted if cursor.rowcount == 0: return json_response(ResponseHandler.fetch_failure("payment"), 404) return 
redirect(url_for('add_payment')) # Redirect back to the add_payment page except mysql.connector.Error as e: print(f"Error deleting payment: {e}") return json_response(ResponseHandler.delete_failure("payment"), 500) finally: cursor.close() connection.close()


# --- end Payment controller -----------

# ------------------------- GST Release controller ------------------------------------------
@app.route('/add_gst_release', methods=['GET', 'POST'])
@login_required
def add_gst_release():
    """Show the GST release history (GET) or insert a new release (POST).

    The history is read from the ``gst_release`` table on every request. A
    POST then inserts the submitted row, commits and redirects back here.
    """
    connection = config.get_db_connection()
    gst_releases = []  # List to hold GST Release history
    invoices = []      # List to hold invoices for the dropdown (never filled in this view)

    if not connection:
        return json_response(ResponseHandler.fetch_failure("GST Release"), 500)

    cursor = None  # bound up front so the finally clause is always safe
    try:
        cursor = connection.cursor()

        # Retrieve GST Release history
        cursor.execute("SELECT GST_Release_Id, PMC_No, Invoice_No, Basic_Amount, Final_Amount,Total_Amount,UTR FROM gst_release")
        gst_releases = cursor.fetchall()

        if request.method == 'POST':
            pmc_no = request.form['PMC_No']
            invoice_no = request.form['invoice_No']
            basic_amount = request.form['basic_amount']
            final_amount = request.form['final_amount']
            total_amount = request.form['total_amount']
            utr = request.form['utr']
            contractor_id = request.form['subcontractor_id']
            log_action("Add gst_release", f"User {current_user.id} Add gst_release'{ pmc_no}'")

            cursor.execute("""
                INSERT INTO gst_release (PMC_No, invoice_no, Basic_Amount, Final_Amount, Total_Amount, UTR, Contractor_Id)
                VALUES (%s, %s, %s, %s, %s, %s, %s)
            """, (
                pmc_no, invoice_no, basic_amount, final_amount, total_amount, utr, contractor_id
            ))
            connection.commit()

            return redirect(url_for('add_gst_release'))  # Redirect to add_gst_release page

    except mysql.connector.Error as e:
        print(f"Error: {e}")
        return json_response(ResponseHandler.add_failure("GST Release"), 500)
    finally:
        if cursor:
            cursor.close()
        connection.close()

    return render_template('add_gst_release.html', invoices=invoices, gst_releases=gst_releases)


# update gst Release by id
@app.route('/edit_gst_release/<int:gst_release_id>', methods=['GET', 'POST'])
@login_required
def edit_gst_release(gst_release_id):
    """Render the edit form for a GST release (GET) or apply an update (POST).

    The POST path now commits the UPDATE; without the commit the change was
    discarded when the connection closed.
    """
    connection = config.get_db_connection()
    gst_release_data = {}  # To hold the GST release data for the given ID
    invoices = []          # List to hold invoices for the dropdown (never filled in this view)

    if not connection:
        return json_response(ResponseHandler.fetch_failure("GST Release"), 500)

    cursor = None
    try:
        cursor = connection.cursor()

        # Fetch the existing GST release data for the given gst_release_id
        cursor.execute(
            "SELECT GST_Release_Id, PMC_No, Invoice_No, Basic_Amount, Final_Amount,Total_Amount,UTR FROM gst_release WHERE GST_Release_Id = %s",
            (gst_release_id,)
        )
        gst_release_data = cursor.fetchone()

        if request.method == 'POST':
            pmc_no = request.form['PMC_No']
            invoice_no = request.form['invoice_No']
            basic_amount = request.form['basic_amount']
            final_amount = request.form['final_amount']
            total_amount = request.form['total_amount']
            utr = request.form['utr']
            log_action("Edit gst_release", f"User {current_user.id} Edit gst_release'{ pmc_no}'")

            try:
                cursor.execute("""
                    UPDATE gst_release
                    SET PMC_No = %s, invoice_no = %s, Basic_Amount = %s, Final_Amount = %s, Total_Amount = %s, UTR = %s
                    WHERE GST_Release_Id = %s
                """, (
                    pmc_no, invoice_no, basic_amount, final_amount, total_amount, utr, gst_release_id
                ))
                connection.commit()  # persist the update before redirecting

                return redirect(url_for('add_gst_release'))  # Redirect to the page to view the updated list

            except mysql.connector.Error as e:
                print(f"Error updating GST Release: {e}")
                return json_response(ResponseHandler.update_failure("GST Release"), 500)

    except mysql.connector.Error as e:
        print(f"Error fetching GST Release data: {e}")
        return json_response(ResponseHandler.fetch_failure("GST Release"), 500)
    finally:
        if cursor:
            cursor.close()
        connection.close()

    return render_template('edit_gst_release.html', gst_release_data=gst_release_data, invoices=invoices)


# delete gst release by id
@app.route('/delete_gst_release/<int:gst_release_id>', methods=['GET', 'POST'])
@login_required
def delete_gst_release(gst_release_id):
    """Delete a GST release through ``DeleteGSTRelease`` and redirect to the list.

    Responds 404 when the cursor reports no affected row and 500 on a
    database error.
    """
    connection = config.get_db_connection()
    if not connection:
        return json_response(ResponseHandler.fetch_failure("GST Release"), 500)

    cursor = None
    try:
        cursor = connection.cursor()
        cursor.callproc("DeleteGSTRelease", (gst_release_id,))
        log_action("delete gst_release", f"User {current_user.id} delete gst_release'{ gst_release_id}'")
        connection.commit()

        # Check if any rows were deleted
        # NOTE(review): confirm rowcount reflects the procedure's DELETE for this driver.
        if cursor.rowcount == 0:
            return json_response(ResponseHandler.fetch_failure("GST Release"), 404)

        return redirect(url_for('add_gst_release'))  # Redirect to the add_gst_release page

    except mysql.connector.Error as e:
        print(f"Error deleting GST Release: {e}")
        return json_response(ResponseHandler.delete_failure("GST Release"), 500)
    finally:
        if cursor:
            cursor.close()
        connection.close()


# --- end GST Release controller -----

# ------------------------- Subcontractor controller ------------------------------------------ @app.route('/subcontractor', methods=['GET', 'POST']) @login_required def subcontract(): connection = config.get_db_connection() subcontractor = [] if not connection: return json_response(ResponseHandler.fetch_failure("Subcontractor"), 500) try: cursor = connection.cursor() if request.method == 'GET': try: cursor.callproc('GetAllSubcontractors') for result in cursor.stored_results(): subcontractor = result.fetchall() except Error as e: print(f"Error fetching data: {e}") return json_response(ResponseHandler.fetch_failure("Subcontractor"), 500) if request.method == 'POST': contractor_data = { 'Contractor_Name': request.form['Contractor_Name'], 'Address': request.form['Address'], 'Mobile_No': request.form['Mobile_No'], 'PAN_No': request.form['PAN_No'], 'Email': request.form['Email'], 'Gender': request.form['Gender'], 'GST_Registration_Type': request.form['GST_Registration_Type'], 'GST_No': request.form['GST_No'], 'Contractor_password':
request.form['Contractor_password'], } try: cursor.callproc('SaveContractor', ( contractor_data['Contractor_Name'], contractor_data['Address'], contractor_data['Mobile_No'], contractor_data['PAN_No'], contractor_data['Email'], contractor_data['Gender'], contractor_data['GST_Registration_Type'], contractor_data['GST_No'], contractor_data['Contractor_password'] )) connection.commit() cursor.callproc('GetAllSubcontractors') for result in cursor.stored_results(): subcontractor = result.fetchall() except Error as e: print(f"Error inserting data: {e}") return json_response(ResponseHandler.add_failure("Subcontractor"), 500) except Error as e: print(f"Error handling subcontractor data: {e}") return json_response(ResponseHandler.fetch_failure("Subcontractor"), 500) finally: cursor.close() connection.close() return render_template('add_subcontractor.html', subcontractor=subcontractor) # update subcontractor by id @app.route('/edit_subcontractor/', methods=['GET', 'POST']) @login_required def edit_subcontractor(id): connection = config.get_db_connection() if not connection: return json_response(ResponseHandler.fetch_failure("Subcontractor"), 500) try: cursor = connection.cursor() subcontractor = None cursor.callproc("GetSubcontractorById", (id,)) for contractors in cursor.stored_results(): subcontractor = contractors.fetchone() if not subcontractor: return json_response(ResponseHandler.fetch_failure("Subcontractor"), 404) if request.method == 'POST': updated_data = { 'Contractor_Name': request.form['Contractor_Name'], 'Address': request.form['Address'], 'Mobile_No': request.form['Mobile_No'], 'PAN_No': request.form['PAN_No'], 'Email': request.form['Email'], 'Gender': request.form['Gender'], 'GST_Registration_Type': request.form['GST_Registration_Type'], 'GST_No': request.form['GST_No'], 'Contractor_password': request.form['Contractor_password'], 'id': id } log_action("Edit Subcontractor", f"User {current_user.id}Edit Subcontractor'{ id}'") try: 
cursor.callproc("UpdateSubcontractor", ( id, updated_data['Contractor_Name'], updated_data['Address'], updated_data['Mobile_No'], updated_data['PAN_No'], updated_data['Email'], updated_data['Gender'], updated_data['GST_Registration_Type'], updated_data['GST_No'], updated_data['Contractor_password'] )) connection.commit() return redirect(url_for('subcontract')) except Error as e: print(f"Error updating subcontractor: {e}") return json_response(ResponseHandler.update_failure("Subcontractor"), 500) except Error as e: print(f"Error fetching subcontractor data: {e}") return json_response(ResponseHandler.fetch_failure("Subcontractor"), 500) finally: cursor.close() connection.close() return render_template('edit_subcontractor.html', subcontractor=subcontractor) # return redirect(url_for('subcontract')) @app.route('/deleteSubContractor/', methods=['GET', 'POST']) @login_required def deleteSubContractor(id): connection = config.get_db_connection() if not connection: return json_response(ResponseHandler.fetch_failure("Subcontractor"), 500) try: cursor = connection.cursor() # Optional: check if subcontractor exists before attempting delete cursor.execute("SELECT 1 FROM subcontractors WHERE Contractor_Id = %s", (id,)) if cursor.fetchone() is None: return json_response(ResponseHandler.fetch_failure("Subcontractor not found"), 404) # Call stored procedure to delete subcontractor and related records cursor.callproc("DeleteSubcontractor", (id,)) connection.commit() # Retrieve result from procedure (SELECT ROW_COUNT()) affected_rows = 0 for result in cursor.stored_results(): row = result.fetchone() affected_rows = row[0] if row else 0 log_action("Delete Subcontractor", f"User {current_user.id}Delete Subcontractor'{ id}'") if affected_rows == 0: return json_response(ResponseHandler.fetch_failure("Subcontractor not deleted"), 404) except Error as e: print(f"Error deleting subcontractor: {e}") return json_response(ResponseHandler.delete_failure("Subcontractor"), 500) finally: 
cursor.close() connection.close() return redirect(url_for('subcontract')) # redirect to subcontractor list page # ------------------------------- Show Report Subcontractor --------------------- UPLOAD_FOLDER = 'uploads' app.config['UPLOAD_FOLDER'] = UPLOAD_FOLDER if not os.path.exists(UPLOAD_FOLDER): os.makedirs(UPLOAD_FOLDER) # Upload Excel file html page @app.route('/upload_excel_file', methods=['GET', 'POST']) def upload(): if request.method == 'POST': file = request.files['file'] if file and file.filename.endswith('.xlsx'): filepath = os.path.join(app.config['UPLOAD_FOLDER'], file.filename) file.save(filepath) log_action("Upload Excel File", f"User {current_user.id}Upload Excel File'{file}'") return redirect(url_for('show_table', filename=file.filename)) return render_template('uploadExcelFile.html') @app.route('/show_table/') def show_table(filename): global data data = [] filepath = os.path.join(app.config['UPLOAD_FOLDER'], filename) wb = openpyxl.load_workbook(filepath, data_only=True) sheet = wb.active file_info = { "Subcontractor": sheet.cell(row=1, column=2).value, "State": sheet.cell(row=2, column=2).value, "District": sheet.cell(row=3, column=2).value, "Block": sheet.cell(row=4, column=2).value, } errors = [] subcontractor_data = None state_data = None district_data = None block_data = None connection = config.get_db_connection() if connection: try: cursor = connection.cursor(dictionary=True) print(f"Calling GetStateByName with: {file_info['State']}") cursor.callproc('GetStateByName', [file_info['State']]) for result in cursor.stored_results(): state_data = result.fetchone() if not state_data: errors.append(f"State '{file_info['State']}' is not valid. 
Please add it.") if state_data: print(f"Calling GetDistrictByNameAndStates with: {file_info['District']}, {state_data['State_ID']}") cursor.callproc('GetDistrictByNameAndStates', [file_info['District'], state_data['State_ID']]) for result in cursor.stored_results(): district_data = result.fetchone() if not district_data: errors.append(f"District '{file_info['District']}' is not valid under state '{file_info['State']}'.") if district_data: print(f"Calling GetBlockByNameAndDistricts with: {file_info['Block']}, {district_data['District_ID']}") cursor.callproc('GetBlockByNameAndDistricts', [file_info['Block'], district_data['District_ID']]) for result in cursor.stored_results(): block_data = result.fetchone() if not block_data: errors.append(f"Block '{file_info['Block']}' is not valid under district '{file_info['District']}'.") print(f"Calling GetSubcontractorByName with: {file_info['Subcontractor']}") cursor.callproc('GetSubcontractorByName', [file_info['Subcontractor']]) for result in cursor.stored_results(): subcontractor_data = result.fetchone() if not subcontractor_data: print(f"Inserting subcontractor: {file_info['Subcontractor']}") cursor.callproc('InsertSubcontractor', [file_info['Subcontractor']]) connection.commit() print(f"Calling GetSubcontractorByName again with: {file_info['Subcontractor']}") cursor.callproc('GetSubcontractorByName', [file_info['Subcontractor']]) for result in cursor.stored_results(): subcontractor_data = result.fetchone() print("Calling GetAllHoldTypes") cursor.callproc("GetAllHoldTypes") hold_types_data = [] for ht in cursor.stored_results(): hold_types_data = ht.fetchall() hold_types_lookup = {row['hold_type'].lower(): row['hold_type_id'] for row in hold_types_data if row['hold_type']} cursor.close() except mysql.connector.Error as e: print(f"Database error: {e}") return f"Database operation failed: {e}", 500 finally: connection.close() variables = {} hold_columns = [] hold_counter = 0 for j in range(1, sheet.max_column + 1): col_value 
# Keywords (all lower-case) that mark a row as a hold-release entry. They are
# matched as substrings of the normalised, lower-cased invoice description.
# NOTE(review): 'ht' is a two-letter substring and therefore also matches words
# such as "right" or "eight" - confirm whether a whole-word match was intended.
_HOLD_RELEASE_KEYWORDS = (
    'excess hold',
    'ht',
    'hold release amount',
    'dpr excess hold amount',
    'excess hold amount',
    'multi to single layer bill',
    'hold amount',
    'logging report',
)


def _split_village_and_work(invoice_details):
    """Derive ``(village_name, work_type)`` from an invoice description.

    The description is lower-cased and split on whitespace. The words before
    "village" form the village name. The words up to and including "work"
    (starting right after "village" when a village name was found, otherwise
    from the first word) form the work type. Each element is ``None`` when its
    marker word is absent or the description is empty.
    """
    village_name = work_type = None
    if not invoice_details:
        return village_name, work_type

    words = invoice_details.lower().split()
    village_pos = None
    if 'village' in words:
        village_pos = words.index('village')
        village_name = " ".join(words[:village_pos])
    if 'work' in words:
        work_pos = words.index('work')
        # An empty village name (description starts with "village") is
        # treated like "no village": the work type then starts at word 0.
        start = village_pos + 1 if village_name else 0
        work_type = " ".join(words[start:work_pos + 1])
    return village_name, work_type


def _fetch_village_id(cursor, block_id, village_name):
    """Return the id reported by the ``GetVillageId`` procedure, or ``None``."""
    village_id = None
    cursor.callproc("GetVillageId", (block_id, village_name))
    for result in cursor.stored_results():
        row = result.fetchone()
        village_id = row[0] if row else None
    return village_id


# save Excel data
@app.route('/save_data', methods=['POST'])
def save_data():
    """Store the parsed Excel rows in the database.

    The rows are read from the module-level name ``data`` (this function never
    assigns it); the subcontractor id, block id and hold-column description
    come from the submitted form. For every row:

    * a description containing both "village" and "work" is saved through the
      ``SaveInvoice`` procedure, together with one ``InsertHoldJoinData`` call
      per hold column that has a value in the row;
    * otherwise a description containing "credit note" or "logging report" is
      inserted into ``credit_note``;
    * independently, a description matching a hold-release keyword is inserted
      into ``hold_release``; failing that, one containing "gst", "release" or
      "note" is inserted into ``gst_release``;
    * a row with PMC number, total amount and UTR is passed to ``SavePayment``.

    Everything is committed once at the end, so the rollback in the error
    handler really undoes a failed upload.

    Returns:
        A JSON body with HTTP 200 on success, 400 when there is nothing to
        save, or 500 (after a rollback) when any step raises.
    """
    # Extract form data
    subcontractor_id = request.form.get("subcontractor_data")
    block_id = request.form.get("block_data")
    hold_columns = request.form.get("hold_columns")

    if not data:
        return jsonify({"error": "No data provided to save"}), 400

    connection = config.get_db_connection()
    cursor = connection.cursor()
    try:
        # The hold columns arrive as the text form of a list of dicts. It is
        # loop-invariant, so parse it once. literal_eval only accepts Python
        # literals; a malformed value raises and is reported as a 500 below.
        if isinstance(hold_columns, str) and hold_columns.strip():
            hold_columns = ast.literal_eval(hold_columns)
        holds_are_valid = isinstance(hold_columns, list) and all(
            isinstance(hold, dict) for hold in hold_columns
        )

        for entry in data:
            pmc_no = entry.get("PMC_No")
            invoice_details = entry.get("Invoice_Details", '')
            raw_date = entry.get("Invoice_Date")
            invoice_date = raw_date.strftime('%Y-%m-%d') if raw_date else None
            invoice_no = entry.get("Invoice_No", '')
            basic_amount = entry.get("Basic_Amount", 0.00)
            debit_amount = entry.get("Debit_Amount", 0.00)
            after_debit_amount = entry.get("After_Debit_Amount", 0.00)
            amount = entry.get("Amount", 0.00)
            gst_amount = entry.get("GST_Amount", 0.00)
            tds_amount = entry.get("TDS_Amount", 0.00)
            sd_amount = entry.get("SD_Amount", 0.00)
            on_commission = entry.get("On_Commission", 0.00)
            hydro_testing = entry.get("Hydro_Testing", 0.00)
            gst_sd_amount = entry.get("GST_SD_Amount", 0.00)
            final_amount = entry.get("Final_Amount", 0.00)
            payment_amount = entry.get("Payment_Amount", 0.00)
            total_amount = entry.get("Total_Amount", 0.00)
            tds_payment_amount = entry.get("TDS_Payment_Amount", 0.00)
            utr = entry.get("UTR", '')

            village_name, work_type = _split_village_and_work(invoice_details)
            village_id = 0
            # An empty Excel cell yields None; fall back to "" so the keyword
            # tests below cannot fail with AttributeError.
            details_lower = invoice_details.lower() if invoice_details else ""

            if invoice_details and 'village' in details_lower and 'work' in details_lower:
                print("village_name ::", village_name, "|| work_type ::", work_type)
                if block_id and village_name:
                    village_id = _fetch_village_id(cursor, block_id, village_name)
                    if not village_id:
                        # Unknown village: create it, then read the new id back.
                        cursor.callproc("SaveVillage", (village_name, block_id))
                        village_id = _fetch_village_id(cursor, block_id, village_name)
                    print("village_id :", village_id)
                    print("block_id :", block_id)

                # The trailing 0 is the slot whose value is read back from the
                # argument sequence returned by callproc.
                args = (
                    pmc_no, village_id, work_type, invoice_details, invoice_date,
                    invoice_no, basic_amount, debit_amount, after_debit_amount,
                    amount, gst_amount, tds_amount, sd_amount, on_commission,
                    hydro_testing, gst_sd_amount, final_amount, subcontractor_id, 0
                )
                print("All invoice Details ", args)
                results = cursor.callproc('SaveInvoice', args)
                invoice_id = results[-1]
                print("invoice id from the excel ", invoice_id)

                if holds_are_valid:
                    for hold in hold_columns:
                        print(f"Processing hold: {hold}")
                        hold_column_name = hold.get('column_name')
                        hold_type_id = hold.get('hold_type_id')
                        if not hold_column_name:
                            print(f"Invalid hold entry: {hold}")
                            continue
                        hold_amount = entry.get(hold_column_name)
                        if hold_amount is None:
                            continue
                        print(f"Processing hold type: {hold_column_name}, Hold Amount: {hold_amount}")
                        cursor.callproc('InsertHoldJoinData', [
                            subcontractor_id, invoice_id, hold_type_id, hold_amount
                        ])
                else:
                    print("Hold columns data is not a valid list of dictionaries.")

            # ----------------------------- Credit Note -----------------------------
            elif any(keyword in details_lower for keyword in ('credit note', 'logging report')):
                print("Credit note found:", pmc_no, invoice_no, basic_amount, final_amount)
                cursor.execute(
                    """INSERT INTO credit_note
                       (PMC_No, Invoice_Details, Basic_Amount, Debit_Amount, After_Debit_Amount,
                        GST_Amount, Amount, Final_Amount, Payment_Amount, Total_Amount, UTR,
                        Contractor_Id, invoice_no)
                       VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)""",
                    (pmc_no, invoice_details, basic_amount, debit_amount, after_debit_amount,
                     gst_amount, amount, final_amount, payment_amount, total_amount, utr,
                     subcontractor_id, invoice_no))

            # ----------------------------- Hold Amount -----------------------------
            # Normalise: trim, collapse runs of whitespace, lower-case.
            normalized_details = re.sub(r'\s+', ' ', (invoice_details or '').strip()).lower()

            if any(keyword in normalized_details for keyword in _HOLD_RELEASE_KEYWORDS):
                print("Inserting hold release for:", pmc_no, invoice_details)
                # NOTE(review): the Total_Amount column receives Final_Amount,
                # as in the original statement - confirm this is intended.
                cursor.execute(
                    """INSERT INTO hold_release
                       (PMC_No, Invoice_No, Invoice_Details, Basic_Amount, Total_Amount, UTR, Contractor_Id)
                       VALUES (%s, %s, %s, %s, %s, %s, %s)""",
                    (pmc_no, invoice_no, invoice_details, basic_amount, final_amount, utr,
                     subcontractor_id))
            elif invoice_details and any(keyword in details_lower for keyword in ('gst', 'release', 'note')):
                print("Gst rels :", pmc_no, invoice_no, basic_amount, final_amount, total_amount, utr,
                      subcontractor_id)
                cursor.execute(
                    """INSERT INTO gst_release
                       (PMC_No, Invoice_No, Basic_Amount, Final_Amount, Total_Amount, UTR, Contractor_Id)
                       VALUES (%s, %s, %s, %s, %s, %s, %s)""",
                    (pmc_no, invoice_no, basic_amount, final_amount, total_amount, utr,
                     subcontractor_id))

            if pmc_no and total_amount and utr:
                print("Payment :", pmc_no, invoice_no, payment_amount, tds_payment_amount,
                      total_amount, utr)
                cursor.callproc("SavePayment", (pmc_no, invoice_no, payment_amount,
                                                tds_payment_amount, total_amount, utr))

        # Single commit: either the whole upload is stored or none of it.
        connection.commit()
    except Exception as e:
        connection.rollback()
        return jsonify({"error": f"An unexpected error occurred: {e}"}), 500
    finally:
        cursor.close()
        connection.close()

    # Log once, and only after the rows really have been committed.
    log_action("Data saved",
               f"User {getattr(current_user, 'id', 'Unknown')} Data saved ({len(data)} rows)")
    return jsonify({"success": "Data saved successfully!"}), 200
# ---------------------- Report --------------------------------
# call report page
@app.route('/report')
def report_page():
    """Render the report search page."""
    return render_template('report.html')


# Search list multiples input and search reports
@app.route('/search_contractor', methods=['POST'])
def search_contractor():
    """Search contractor records with the filters submitted in the form.

    Reads ``subcontractor_name``, ``pmc_no``, ``state``, ``district``,
    ``block``, ``village``, ``year_from`` and ``year_to`` from the form and
    passes them (empty values as ``None``) to the ``search_contractor_info``
    stored procedure.

    Returns:
        The rows of the last result set as JSON (an empty list when the
        procedure produces none), or a 400 JSON error when no filter is given.
        The two year fields only count as a filter when both are present.
    """
    form = request.form
    subcontractor_name = form.get('subcontractor_name')
    pmc_no = form.get('pmc_no')
    state = form.get('state')
    district = form.get('district')
    block = form.get('block')
    village = form.get('village')
    year_from = form.get('year_from')
    year_to = form.get('year_to')

    log_action("Search contractor", f"User {current_user.id} Search contractor'{ subcontractor_name}'")

    has_filter = any((subcontractor_name, pmc_no, state, district, block, village)) \
        or bool(year_from and year_to)
    if not has_filter:
        # Validate before touching the database so no connection is opened
        # (and left open) for a request that is rejected anyway.
        return jsonify({"error": "At least one field is required for search."}), 400

    connection = config.get_db_connection()
    cursor = connection.cursor(dictionary=True)
    try:
        cursor.callproc("search_contractor_info", [
            subcontractor_name or None,
            pmc_no or None,
            state or None,
            district or None,
            block or None,
            village or None,
            year_from or None,
            year_to or None
        ])
        data = []  # stays empty when the procedure yields no result set
        for result in cursor.stored_results():
            data = result.fetchall()
    finally:
        cursor.close()
        connection.close()

    return jsonify(data)
from flask import render_template
from datetime import datetime
import config


def _contractor_total(rows, column):
    """Return the float sum of ``column`` over ``rows``; NULL values count as 0."""
    return float(sum(row[column] or 0 for row in rows))


# The view takes ``contractor_id`` as an argument, so the rule must carry a URL
# variable for it.
# NOTE(review): the converter (<int:...>) is assumed - confirm against callers.
@app.route('/contractor_report/<int:contractor_id>')
def contractor_report(contractor_id):
    """Render the on-screen report for one subcontractor.

    Loads the contractor's details, the hold types used on its invoices, the
    invoices (joined with their hold amounts), GST releases, hold releases,
    credit notes and payments, then computes column totals for the template.

    Args:
        contractor_id: value matched against ``Contractor_Id`` in every query.

    Returns:
        The rendered ``subcontractor_report.html``; ``404`` when no contractor
        row exists (same response as ``download_report``); ``500`` when a query
        or the totals computation raises.
    """
    connection = config.get_db_connection()
    cursor = connection.cursor(dictionary=True, buffered=True)

    try:
        # Contractor details
        cursor.execute("""
            SELECT DISTINCT s.Contractor_Name, st.State_Name, d.District_Name, b.Block_Name,
                   s.Mobile_No, s.GST_Registration_Type, s.GST_No, s.PAN_No, s.Email, s.Address
            FROM subcontractors s
            LEFT JOIN assign_subcontractors asg ON s.Contractor_Id = asg.Contractor_Id
            LEFT JOIN villages v ON asg.Village_Id = v.Village_Id
            LEFT JOIN blocks b ON v.Block_Id = b.Block_Id
            LEFT JOIN districts d ON b.District_id = d.District_id
            LEFT JOIN states st ON d.State_Id = st.State_Id
            WHERE s.Contractor_Id = %s
        """, (contractor_id,))
        contInfo = cursor.fetchone()
        if not contInfo:
            return "No contractor found", 404

        # Hold types
        cursor.execute("""
            SELECT DISTINCT ht.hold_type_id, ht.hold_type
            FROM invoice_subcontractor_hold_join h
            JOIN hold_types ht ON h.hold_type_id = ht.hold_type_id
            JOIN invoice i ON h.Invoice_Id = i.Invoice_Id
            WHERE h.Contractor_Id = %s
        """, (contractor_id,))
        hold_types = cursor.fetchall()

        # Invoices
        # NOTE(review): the join to the hold table yields one row per
        # (invoice, hold) pair, so an invoice with several holds is counted
        # several times in the invoice totals below - confirm this is intended.
        cursor.execute("""
            SELECT DISTINCT i.PMC_No, v.Village_Name, i.Work_Type, i.Invoice_Details, i.Invoice_Date,
                   i.Invoice_No, i.Basic_Amount, i.Debit_Amount, i.After_Debit_Amount, i.Amount,
                   i.GST_Amount, i.TDS_Amount, i.SD_Amount, i.On_Commission, i.Hydro_Testing,
                   i.GST_SD_Amount, i.Final_Amount, h.hold_amount, ht.hold_type
            FROM assign_subcontractors asg
            INNER JOIN villages v ON asg.Village_Id = v.Village_Id
            INNER JOIN invoice i ON i.Village_Id = v.Village_Id AND i.PMC_No = asg.PMC_No
            LEFT JOIN invoice_subcontractor_hold_join h
                   ON i.Invoice_Id = h.Invoice_Id AND h.Contractor_Id = asg.Contractor_Id
            LEFT JOIN hold_types ht ON h.hold_type_id = ht.hold_type_id
            WHERE asg.Contractor_Id = %s
            ORDER BY i.PMC_No ASC
        """, (contractor_id,))
        invoices = cursor.fetchall()

        # GST Release
        cursor.execute("""
            SELECT gr.pmc_no, gr.invoice_no, gr.basic_amount, gr.final_amount
            FROM gst_release gr
            INNER JOIN (
                SELECT DISTINCT i.PMC_No, i.Invoice_No
                FROM invoice i
                JOIN assign_subcontractors a
                  ON i.PMC_No = a.PMC_No AND i.Village_Id = a.Village_Id
                WHERE a.Contractor_Id = %s
            ) x ON gr.pmc_no = x.PMC_No AND gr.invoice_no = x.Invoice_No
            ORDER BY gr.pmc_no ASC
        """, (contractor_id,))
        gst_rel = cursor.fetchall()

        # Hold Release
        cursor.execute("SELECT * FROM hold_release WHERE Contractor_Id=%s", (contractor_id,))
        hold_release = cursor.fetchall()

        # Credit Note
        cursor.execute("select * from credit_note where Contractor_Id=%s", (contractor_id,))
        credit_note = cursor.fetchall()

        # Payments (include valid matches and payments with pmc_no but invoice_no is NULL)
        cursor.execute("""
            SELECT p.pmc_no, p.invoice_no, p.Payment_Amount, p.TDS_Payment_Amount, p.Total_amount, p.utr
            FROM payment p
            WHERE EXISTS (
                      SELECT 1
                      FROM invoice i
                      JOIN assign_subcontractors a
                        ON i.PMC_No = a.PMC_No AND i.Village_Id = a.Village_Id
                      WHERE a.Contractor_Id = %s
                        AND i.PMC_No = p.pmc_no
                        AND i.Invoice_No = p.invoice_no
                  )
               OR (
                      p.invoice_no IS NULL
                      AND EXISTS (
                          SELECT 1
                          FROM assign_subcontractors a
                          WHERE a.Contractor_Id = %s AND a.PMC_No = p.pmc_no
                      )
                  )
            ORDER BY p.pmc_no ASC
        """, (contractor_id, contractor_id))
        payments = cursor.fetchall()

        # Totals: template key -> (row set, column to add up)
        total_sources = {
            "sum_invo_basic_amt": (invoices, 'Basic_Amount'),
            "sum_invo_debit_amt": (invoices, 'Debit_Amount'),
            "sum_invo_after_debit_amt": (invoices, 'After_Debit_Amount'),
            "sum_invo_amt": (invoices, 'Amount'),
            "sum_invo_gst_amt": (invoices, 'GST_Amount'),
            "sum_invo_tds_amt": (invoices, 'TDS_Amount'),
            "sum_invo_ds_amt": (invoices, 'SD_Amount'),
            "sum_invo_on_commission": (invoices, 'On_Commission'),
            "sum_invo_hydro_test": (invoices, 'Hydro_Testing'),
            "sum_invo_gst_sd_amt": (invoices, 'GST_SD_Amount'),
            "sum_invo_final_amt": (invoices, 'Final_Amount'),
            "sum_invo_hold_amt": (invoices, 'hold_amount'),
            "sum_gst_basic_amt": (gst_rel, 'basic_amount'),
            "sum_gst_final_amt": (gst_rel, 'final_amount'),
            "sum_pay_payment_amt": (payments, 'Payment_Amount'),
            "sum_pay_tds_payment_amt": (payments, 'TDS_Payment_Amount'),
            "sum_pay_total_amt": (payments, 'Total_amount'),
        }
        total = {
            key: _contractor_total(rows, column)
            for key, (rows, column) in total_sources.items()
        }

        current_date = datetime.now().strftime('%Y-%m-%d')

    except Exception as e:
        print(f"Error fetching contractor report: {e}")
        return "An error occurred while fetching contractor report", 500

    finally:
        cursor.close()
        connection.close()

    return render_template(
        'subcontractor_report.html',
        contInfo=contInfo,
        contractor_id=contractor_id,
        invoices=invoices,
        hold_types=hold_types,
        gst_rel=gst_rel,
        payments=payments,
        credit_note=credit_note,
        hold_release=hold_release,
        total=total,
        current_date=current_date
    )
def _excel_number(value):
    """Return ``value`` as a float; empty, missing or non-numeric cells give 0.0."""
    try:
        return float(value or 0)
    except (TypeError, ValueError):
        return 0.0


# The view takes ``contractor_id`` as an argument, so the rule must carry a URL
# variable for it.
# NOTE(review): the converter (<int:...>) is assumed - confirm against callers.
@app.route('/download_report/<int:contractor_id>')
def download_report(contractor_id):
    """Build the contractor report as an Excel workbook and send it.

    Sheet layout: one "field / value" row per contractor detail, a blank row,
    a bold header row, then one row per invoice. An invoice row is followed by
    the payments of its PMC that carry no invoice number (written once per
    PMC), its GST releases and its credit notes. A yellow row separates PMC
    numbers, and a bold totals row closes the sheet.

    Args:
        contractor_id: value matched against ``Contractor_Id``; also used in
            the generated file name.

    Returns:
        The workbook as an attachment, or ``404`` when the contractor does not
        exist. Database and file errors are not caught here.
    """
    connection = config.get_db_connection()
    cursor = connection.cursor(dictionary=True, buffered=True)

    output_folder = "static/download"
    os.makedirs(output_folder, exist_ok=True)
    file_name = f"Contractor_Report_{contractor_id}.xlsx"
    output_file = os.path.join(output_folder, file_name)

    try:
        # ---------------- Contractor Info ----------------
        cursor.execute("""
            SELECT s.Contractor_Name, st.State_Name, d.District_Name, b.Block_Name,
                   s.Mobile_No, s.GST_Registration_Type, s.GST_No, s.PAN_No, s.Email, s.Address
            FROM subcontractors s
            LEFT JOIN assign_subcontractors asg ON s.Contractor_Id = asg.Contractor_Id
            LEFT JOIN villages v ON asg.Village_Id = v.Village_Id
            LEFT JOIN blocks b ON v.Block_Id = b.Block_Id
            LEFT JOIN districts d ON b.District_id = d.District_id
            LEFT JOIN states st ON d.State_Id = st.State_Id
            WHERE s.Contractor_Id = %s
        """, (contractor_id,))
        contInfo = cursor.fetchone()
        if not contInfo:
            return "No contractor found", 404

        # ---------------- Hold Types ----------------
        cursor.execute("""
            SELECT DISTINCT ht.hold_type_id, ht.hold_type
            FROM invoice_subcontractor_hold_join h
            JOIN hold_types ht ON h.hold_type_id = ht.hold_type_id
            WHERE h.Contractor_Id = %s
        """, (contractor_id,))
        hold_types = cursor.fetchall()
        hold_type_map = {ht['hold_type_id']: ht['hold_type'] for ht in hold_types}

        # ---------------- Invoices ----------------
        cursor.execute("""
            SELECT i.*, v.Village_Name
            FROM assign_subcontractors asg
            INNER JOIN invoice i ON i.PMC_No = asg.PMC_No AND i.Village_Id = asg.Village_Id
            LEFT JOIN villages v ON i.Village_Id = v.Village_Id
            WHERE asg.Contractor_Id = %s
            ORDER BY i.PMC_No, i.Invoice_No
        """, (contractor_id,))
        # Remove duplicate invoices, keeping the first row per Invoice_Id.
        invoices = []
        invoice_ids_seen = set()
        for inv in cursor.fetchall():
            if inv["Invoice_Id"] not in invoice_ids_seen:
                invoice_ids_seen.add(inv["Invoice_Id"])
                invoices.append(inv)

        # ---------------- Hold Amounts ----------------
        cursor.execute("""
            SELECT h.Invoice_Id, h.hold_type_id, h.hold_amount
            FROM invoice_subcontractor_hold_join h
            WHERE h.Contractor_Id = %s
        """, (contractor_id,))
        hold_data = {}
        for h in cursor.fetchall():
            hold_data.setdefault(h['Invoice_Id'], {})[h['hold_type_id']] = h['hold_amount']

        # ---------------- Payments ----------------
        cursor.execute("""
            SELECT DISTINCT p.pmc_no, p.invoice_no, p.Payment_Amount, p.TDS_Payment_Amount,
                   p.Total_amount, p.utr
            FROM payment p
            INNER JOIN invoice i ON i.PMC_No = p.pmc_no AND i.invoice_no = p.invoice_no
            INNER JOIN assign_subcontractors asg
                    ON i.PMC_No = asg.PMC_No AND i.Village_Id = asg.Village_Id
            WHERE asg.Contractor_Id = %s
        """, (contractor_id,))
        payments_map = {}
        for pay in cursor.fetchall():
            key = (str(pay['pmc_no']), str(pay['invoice_no']))
            payments_map.setdefault(key, []).append(pay)

        # ---------------- Extra Payments (no invoice_no) ----------------
        # Not filtered by contractor: only the PMC numbers that occur in this
        # contractor's invoices are looked up further down.
        cursor.execute("""
            SELECT pmc_no, Payment_Amount, TDS_Payment_Amount, Total_amount, utr
            FROM payment
            WHERE (invoice_no IS NULL OR invoice_no = '')
              AND Total_amount != 0
              AND pmc_no IS NOT NULL
        """)
        extra_payments_map = {}
        for pay in cursor.fetchall():
            extra_payments_map.setdefault(str(pay['pmc_no']), []).append({
                'Payment_Amount': pay['Payment_Amount'],
                'TDS_Payment_Amount': pay['TDS_Payment_Amount'],
                'Total_amount': pay['Total_amount'],
                'utr': pay['utr']
            })

        # ---------------- Credit Notes ----------------
        cursor.execute("""
            SELECT PMC_No, Invoice_No, Invoice_Details, Basic_Amount, Debit_Amount, After_Debit_Amount,
                   GST_Amount, Amount, Final_Amount, Payment_Amount, Total_Amount, UTR
            FROM credit_note
            WHERE Contractor_Id = %s
        """, (contractor_id,))
        credit_note_map = {}
        for cn in cursor.fetchall():
            key = (str(cn['PMC_No']), str(cn['Invoice_No']))
            credit_note_map.setdefault(key, []).append(cn)

        # ---------------- GST Releases ----------------
        cursor.execute("""
            SELECT PMC_No, Invoice_No, Basic_Amount, Final_Amount, Total_Amount, UTR
            FROM gst_release
            WHERE Contractor_Id = %s
            ORDER BY PMC_No, Invoice_No
        """, (contractor_id,))
        gst_release_map = {}
        for gr in cursor.fetchall():
            key = (str(gr['PMC_No']), str(gr['Invoice_No']))
            gst_release_map.setdefault(key, []).append(gr)

        # ---------------- Excel Workbook ----------------
        workbook = openpyxl.Workbook()
        sheet = workbook.active
        sheet.title = "Contractor Report"

        # Contractor Info
        for field, value in contInfo.items():
            sheet.append([field.replace("_", " "), value])
        sheet.append([])

        # Headers
        base_headers = ["PMC No", "Village", "Work Type", "Invoice Details", "Invoice Date", "Invoice No",
                        "Basic Amount", "Debit", "After Debit Amount", "GST (18%)", "Amount", "TDS (1%)",
                        "SD (5%)", "On Commission", "Hydro Testing", "GST SD Amount"]
        hold_headers = [ht['hold_type'] for ht in hold_types]
        payment_headers = ["Final Amount", "Payment Amount", "TDS Payment", "Total Paid", "UTR"]
        all_headers = base_headers + hold_headers + payment_headers
        sheet.append(all_headers)
        for cell in sheet[sheet.max_row]:
            cell.font = Font(bold=True)
            cell.fill = PatternFill(start_color="ADD8E6", end_color="ADD8E6", fill_type="solid")

        # ---------------- Data Rows ----------------
        yellow_fill = PatternFill(start_color="FFFF99", end_color="FFFF99", fill_type="solid")
        blank_holds = [""] * len(hold_headers)
        data_rows = []  # every row written below the header; feeds the totals

        def write_row(row):
            sheet.append(row)
            data_rows.append(row)

        processed_gst_releases = set()
        appended_credit_keys = set()
        previous_pmc_no = None

        for inv in invoices:
            pmc_no = str(inv["PMC_No"])
            invoice_no = str(inv["invoice_no"])
            key = (pmc_no, invoice_no)

            # Yellow separator if PMC_No changes
            if previous_pmc_no and pmc_no != previous_pmc_no:
                sheet.append([""] * len(all_headers))
                for cell in sheet[sheet.max_row]:
                    cell.fill = yellow_fill
            previous_pmc_no = pmc_no

            # Invoice row
            row = [
                pmc_no, inv["Village_Name"], inv["Work_Type"], inv["Invoice_Details"],
                inv["Invoice_Date"], invoice_no, inv["Basic_Amount"], inv["Debit_Amount"],
                inv["After_Debit_Amount"], inv["GST_Amount"], inv["Amount"], inv["TDS_Amount"],
                inv["SD_Amount"], inv["On_Commission"], inv["Hydro_Testing"], inv["GST_SD_Amount"]
            ]

            # Hold values, in the same order as hold_headers
            invoice_holds = hold_data.get(inv["Invoice_Id"], {})
            row += [invoice_holds.get(ht_id, "") for ht_id in hold_type_map]

            # Payment values: only the first payment recorded for the invoice is shown
            payment = payments_map.get(key, [None])[0]
            row += [
                inv["Final_Amount"],
                payment["Payment_Amount"] if payment else "",
                payment["TDS_Payment_Amount"] if payment else "",
                payment["Total_amount"] if payment else "",
                payment["utr"] if payment and payment.get("utr") else ""
            ]
            write_row(row)

            # ---------------- Extra Payments for this PMC ----------------
            # pop() makes sure they are written once per PMC, after its first invoice.
            for ep in extra_payments_map.pop(pmc_no, []):
                extra_row = [pmc_no] + [""] * (len(base_headers) - 1)
                extra_row += blank_holds
                extra_row += [
                    "", ep["Payment_Amount"], ep["TDS_Payment_Amount"],
                    ep["Total_amount"], ep.get("utr", "")
                ]
                write_row(extra_row)

            # GST Releases
            if key in gst_release_map and key not in processed_gst_releases:
                for gr in gst_release_map[key]:
                    gst_row = [
                        pmc_no, "", "", "GST Release Note", "", gr["Invoice_No"],
                        gr["Basic_Amount"], "", "", "", "", "", "", "", "", ""
                    ]
                    gst_row += blank_holds
                    gst_row += [gr["Final_Amount"], "", "", gr["Total_Amount"], gr.get("UTR", "")]
                    write_row(gst_row)
                processed_gst_releases.add(key)

            # Credit Notes
            if key in credit_note_map and key not in appended_credit_keys:
                for cn in credit_note_map[key]:
                    cn_row = [
                        pmc_no, "", "", cn.get("Invoice_Details", "Credit Note"), "",
                        cn.get("Invoice_No", ""), cn.get("Basic_Amount", ""),
                        cn.get("Debit_Amount", ""), cn.get("After_Debit_Amount", ""),
                        cn.get("GST_Amount", ""), cn.get("Amount", ""), "", "", "", "", ""
                    ]
                    cn_row += blank_holds
                    cn_row += [cn.get("Final_Amount", ""), "", "", cn.get("Total_Amount", ""),
                               cn.get("UTR", "")]
                    write_row(cn_row)
                appended_credit_keys.add(key)

        # ---------------- Totals ----------------
        # Only the data rows are added up (not the contractor info or header
        # rows), and every hold column gets its own total.
        hold_start = len(base_headers)
        final_col = hold_start + len(hold_headers)   # "Final Amount"
        total_paid_col = final_col + 3               # "Total Paid"

        def column_total(index):
            return sum(_excel_number(r[index]) for r in data_rows)

        totals_row = [
            "Total", "", "", "", "", "",
            column_total(6), "", "", "", "",
            column_total(11), column_total(12), column_total(13), "", ""
        ]
        totals_row += [column_total(hold_start + offset) for offset in range(len(hold_headers))]
        totals_row += [column_total(final_col), "", "", column_total(total_paid_col), ""]

        sheet.append([])
        sheet.append(totals_row)
        for cell in sheet[sheet.max_row]:
            cell.font = Font(bold=True)

        # ---------------- Column Width ----------------
        for col in sheet.columns:
            max_length = 0
            col_letter = openpyxl.utils.get_column_letter(col[0].column)
            for cell in col:
                if cell.value:
                    max_length = max(max_length, len(str(cell.value)))
            sheet.column_dimensions[col_letter].width = max_length + 2

        workbook.save(output_file)
        workbook.close()

    finally:
        cursor.close()
        connection.close()

    return send_from_directory(output_folder, file_name, as_attachment=True)
def _pmc_total(rows, column):
    """Sum ``column`` over ``rows``; missing or NULL values count as 0."""
    return sum(row.get(column, 0) or 0 for row in rows)


# The view takes ``pmc_no`` as an argument, so the rule must carry a URL
# variable for it.
# NOTE(review): the default string converter is assumed - confirm.
@app.route('/pmc_report/<pmc_no>')
def pmc_report(pmc_no):
    """Render the on-screen report for one PMC number.

    Loads the contractor assigned to the PMC (``GetContractorInfoByPmcNo``),
    the hold types in use (``Get_pmc_hold_types``), the invoices (with hold
    amounts when hold types exist), GST releases, hold releases, credit notes
    and payments, and computes the column totals shown by the template.

    Args:
        pmc_no: PMC number taken from the URL.

    Returns:
        The rendered ``pmc_report.html``; ``404`` when the PMC is unknown;
        ``500`` when a query or the totals computation raises.
    """
    connection = config.get_db_connection()
    cursor = connection.cursor(dictionary=True, buffered=True)

    try:
        # 1. Fetch PMC info using stored procedure.
        # next(..., None) keeps "no result set" on the 404 path instead of
        # raising StopIteration into the generic error handler.
        cursor.callproc("GetContractorInfoByPmcNo", (pmc_no,))
        info_result = next(cursor.stored_results(), None)
        pmc_info = info_result.fetchone() if info_result is not None else None
        if not pmc_info:
            return "No PMC found with this number", 404

        # 2. Fetch hold types using stored procedure
        cursor.callproc("Get_pmc_hold_types", (pmc_no, pmc_info["Contractor_Id"]))
        hold_result = next(cursor.stored_results(), None)
        hold_types = hold_result.fetchall() if hold_result is not None else []
        hold_type_ids = [ht['hold_type_id'] for ht in hold_types]

        # 3./4. Build the invoice query; the hold join and filter are only
        # added when the PMC actually has hold types.
        invoice_columns = """i.PMC_No, v.Village_Name, i.Work_Type, i.Invoice_Details, i.Invoice_Date,
                   i.Invoice_No, i.Basic_Amount, i.Debit_Amount, i.After_Debit_Amount, i.Amount,
                   i.GST_Amount, i.TDS_Amount, i.SD_Amount, i.On_Commission, i.Hydro_Testing,
                   i.GST_SD_Amount, i.Final_Amount"""
        params = [pmc_no, pmc_info["Contractor_Id"]]
        if hold_type_ids:
            placeholders = ','.join(['%s'] * len(hold_type_ids))
            query = f"""
                SELECT DISTINCT {invoice_columns}, h.hold_amount, ht.hold_type
                FROM invoice i
                LEFT JOIN villages v ON i.Village_Id = v.Village_Id
                LEFT JOIN assign_subcontractors a ON i.PMC_No = a.PMC_No
                LEFT JOIN invoice_subcontractor_hold_join h ON i.Invoice_Id = h.Invoice_Id
                LEFT JOIN hold_types ht ON h.hold_type_id = ht.hold_type_id
                WHERE a.PMC_No = %s
                  AND a.Contractor_Id = %s
                  AND (ht.hold_type_id IS NULL OR ht.hold_type_id IN ({placeholders}))
                ORDER BY i.Invoice_Date, i.Invoice_No
            """
            params += hold_type_ids
        else:
            query = f"""
                SELECT DISTINCT {invoice_columns}
                FROM invoice i
                LEFT JOIN villages v ON i.Village_Id = v.Village_Id
                LEFT JOIN assign_subcontractors a ON i.PMC_No = a.PMC_No
                WHERE a.PMC_No = %s AND a.Contractor_Id = %s
                ORDER BY i.Invoice_Date, i.Invoice_No
            """

        cursor.execute(query, params)
        invoices = cursor.fetchall()

        # 5. Hold total (the hold_amount column only exists in the hold query)
        hold_amount_total = _pmc_total(invoices, 'hold_amount') if hold_type_ids else 0

        # 6. GST release
        cursor.execute("""
            SELECT pmc_no, invoice_no, basic_amount, final_amount
            FROM gst_release
            WHERE pmc_no = %s
            ORDER BY invoice_no ASC
        """, (pmc_no,))
        gst_rel = cursor.fetchall()

        # Hold Release Amount
        cursor.execute("""select * from hold_release where pmc_no=%s""", (pmc_no,))
        hold_release = cursor.fetchall()

        # Credit Note
        cursor.execute("select * from credit_note where pmc_no=%s", (pmc_no,))
        credit_note = cursor.fetchall()

        # 7. Payments
        cursor.execute("""
            SELECT pmc_no, invoice_no, Payment_Amount, TDS_Payment_Amount, Total_amount, utr
            FROM payment
            WHERE pmc_no = %s
            ORDER BY invoice_no ASC
        """, (pmc_no,))
        payments = cursor.fetchall()

        # 8. Final totals dictionary
        totals = {
            "sum_invo_basic_amt": _pmc_total(invoices, 'Basic_Amount'),
            "sum_invo_debit_amt": _pmc_total(invoices, 'Debit_Amount'),
            "sum_invo_after_debit_amt": _pmc_total(invoices, 'After_Debit_Amount'),
            "sum_invo_amt": _pmc_total(invoices, 'Amount'),
            "sum_invo_gst_amt": _pmc_total(invoices, 'GST_Amount'),
            "sum_invo_tds_amt": _pmc_total(invoices, 'TDS_Amount'),
            "sum_invo_ds_amt": _pmc_total(invoices, 'SD_Amount'),
            "sum_invo_on_commission": _pmc_total(invoices, 'On_Commission'),
            "sum_invo_hydro_test": _pmc_total(invoices, 'Hydro_Testing'),
            "sum_invo_gst_sd_amt": _pmc_total(invoices, 'GST_SD_Amount'),
            "sum_invo_final_amt": _pmc_total(invoices, 'Final_Amount'),
            "sum_invo_hold_amt": hold_amount_total,
            "sum_gst_basic_amt": _pmc_total(gst_rel, 'basic_amount'),
            "sum_gst_final_amt": _pmc_total(gst_rel, 'final_amount'),
            "sum_pay_payment_amt": _pmc_total(payments, 'Payment_Amount'),
            "sum_pay_tds_payment_amt": _pmc_total(payments, 'TDS_Payment_Amount'),
            "sum_pay_total_amt": _pmc_total(payments, 'Total_amount')
        }

    except Exception as e:
        print(f"Error fetching PMC report: {e}")
        return "An error occurred while fetching PMC report", 500

    finally:
        cursor.close()
        connection.close()

    return render_template(
        'pmc_report.html',
        info=pmc_info,
        invoices=invoices,
        hold_types=hold_types,
        gst_rel=gst_rel,
        payments=payments,
        credit_note=credit_note,
        hold_release=hold_release,
        total=totals
    )


# # Download report by PMC No
# @app.route('/download_pmc_report/')
# def download_pmc_report(pmc_no):
#     connection = config.get_db_connection()
#     output_folder = "static/download"
#     output_file = os.path.join(output_folder, f"PMC_Report_{pmc_no}.xlsx")
#     if not os.path.exists(output_folder):
#         os.makedirs(output_folder)
#     cursor = connection.cursor(dictionary=True)
#     try:
#         # # Fetch Contractor Details using PMC No
#         # cursor.execute("""
#         # SELECT DISTINCT
s.Contractor_Id, s.Contractor_Name, st.State_Name, d.District_Name, b.Block_Name, # # s.Mobile_No, s.GST_Registration_Type, s.GST_No, s.PAN_No, s.Email, s.Address # # FROM subcontractors s # # LEFT JOIN assign_subcontractors asg ON s.Contractor_Id = asg.Contractor_Id # # LEFT JOIN villages v ON asg.Village_Id = v.Village_Id # # LEFT JOIN blocks b ON v.Block_Id = b.Block_Id # # LEFT JOIN districts d ON b.District_id = d.District_id # # LEFT JOIN states st ON d.State_Id = st.State_Id # # WHERE asg.PMC_No = %s # # """, (pmc_no,)) # # contractor_info = cursor.fetchone() # cursor.callproc('GetContractorDetailsByPMC', [pmc_no]) # # Now fetch the result: # for result in cursor.stored_results(): # contractor_info = result.fetchone() # if not contractor_info: # return "No contractor found for this PMC No", 404 # # # Fetch distinct hold types present for the contractor # # cursor.execute(""" # # SELECT DISTINCT ht.hold_type_id, ht.hold_type # # FROM invoice_subcontractor_hold_join h # # JOIN hold_types ht ON h.hold_type_id = ht.hold_type_id # # WHERE h.Contractor_Id = %s # # """, (contractor_info["Contractor_Id"],)) # # hold_types = cursor.fetchall() # cursor.callproc('GetHoldTypesByContractor', [contractor_info["Contractor_Id"]]) # for result in cursor.stored_results(): # hold_types = result.fetchall() # hold_type_map = {ht['hold_type_id']: ht['hold_type'] for ht in hold_types} # # # # Fetch Invoices & GST Releases # # cursor.execute(""" # # SELECT DISTINCT i.Invoice_Id, i.PMC_No, v.Village_Name, i.Work_Type, i.Invoice_Details, # # i.Invoice_Date, i.Invoice_No, i.Basic_Amount, i.Debit_Amount, # # i.After_Debit_Amount, i.GST_Amount, i.Amount, i.TDS_Amount, i.SD_Amount, # # i.On_Commission, i.Hydro_Testing, i.GST_SD_Amount, i.Final_Amount, # # g.pmc_no AS gst_pmc_no, g.invoice_no AS gst_invoice_no, # # g.basic_amount AS gst_basic_amount, g.final_amount AS gst_final_amount # # FROM invoice i # # LEFT JOIN assign_subcontractors asg ON i.PMC_No = asg.PMC_No # # LEFT JOIN 
villages v ON i.Village_Id = v.Village_Id # # LEFT JOIN gst_release g ON i.PMC_No = g.pmc_no AND i.Invoice_No = g.invoice_no # # WHERE asg.PMC_No = %s # # ORDER BY i.Invoice_Date, i.Invoice_No # # """, (pmc_no,)) # # invoices = cursor.fetchall() # cursor.callproc('GetInvoicesAndGstReleaseByPmcNo', [pmc_no]) # for result in cursor.stored_results(): # invoices = result.fetchall() # print("pmc_report invoice data:",invoices) # # cursor.callproc('GetInvoicesAndGSTReleasesByPMC', [pmc_no]) # # for result in cursor.stored_results(): # # invoices = result.fetchall() # # # Fetch Hold Amounts separately # # cursor.execute(""" # # SELECT h.Invoice_Id, ht.hold_type_id, h.hold_amount # # FROM invoice_subcontractor_hold_join h # # JOIN hold_types ht ON h.hold_type_id = ht.hold_type_id # # WHERE h.Contractor_Id = %s # # """, (contractor_info["Contractor_Id"],)) # # hold_amounts = cursor.fetchall() # cursor.callproc('GetHoldAmountsByContractor', [contractor_info["Contractor_Id"]]) # for result in cursor.stored_results(): # hold_amounts = result.fetchall() # # Create a mapping of invoice_id to hold amounts by type # hold_data = {} # for h in hold_amounts: # hold_data.setdefault(h['Invoice_Id'], {})[h['hold_type_id']] = h['hold_amount'] # # # Fetch all Payments for the PMC number # # cursor.execute(""" # # SELECT pmc_no, invoice_no, Payment_Amount, TDS_Payment_Amount, Total_amount, UTR # # FROM payment # # WHERE pmc_no = %s # # ORDER BY invoice_no # # """, (pmc_no,)) # # all_payments = cursor.fetchall() # cursor.callproc('GetAllPaymentsByPMC', [pmc_no]) # for result in cursor.stored_results(): # all_payments = result.fetchall() # # Organize payments by Invoice No (both regular and GST release notes) # payments_map = {} # extra_payments = [] # for pay in all_payments: # if pay['invoice_no']: # key = pay['invoice_no'] # if key not in payments_map: # payments_map[key] = [] # payments_map[key].append(pay) # else: # extra_payments.append(pay) # # Create Excel workbook # workbook = 
openpyxl.Workbook() # sheet = workbook.active # sheet.title = "PMC Report" # # Write Contractor Details # sheet.append(["", "", "Laxmi Civil Engineering Services PVT. LTD.", "", ""]) # sheet.append( # ["Contractor Name", contractor_info["Contractor_Name"], " ", "GST No", contractor_info["GST_No"], " ", # "GST Type", contractor_info["GST_Registration_Type"]]) # sheet.append(["State", contractor_info["State_Name"], " ", "PAN No", contractor_info["PAN_No"], " ", "Address", # contractor_info["Address"]]) # sheet.append(["District", contractor_info["District_Name"], " ", "Mobile No", contractor_info["Mobile_No"]]) # sheet.append(["Block", contractor_info["Block_Name"], " ", "Email", contractor_info["Email"]]) # sheet.append([]) # # Table Headers - include all hold types as separate columns # base_headers = ["PMC No", "Village", "Work Type", "Invoice Details", "Invoice Date", "Invoice No", # "Basic Amount", "Debit", "After Debit Amount", "GST (18%)", "Amount", "TDS (1%)", # "SD (5%)", "On Commission", "Hydro Testing", "GST SD Amount"] # hold_headers = [ht['hold_type'] for ht in hold_types] # payment_headers = ["Final Amount", "Payment Amount", "TDS Payment", "Total Paid", "UTR"] # sheet.append(base_headers + hold_headers + payment_headers) # seen_invoices = set() # seen_gst_notes = set() # processed_payments = set() # # Process invoices # for inv in invoices: # invoice_no = inv["Invoice_No"] # payments = payments_map.get(invoice_no, []) # # Process invoice row with first payment (if exists) # if invoice_no not in seen_invoices: # seen_invoices.add(invoice_no) # first_payment = payments[0] if len(payments) > 0 else None # # Base invoice data # row = [ # pmc_no, inv["Village_Name"], inv["Work_Type"], inv["Invoice_Details"], # inv["Invoice_Date"], invoice_no, inv["Basic_Amount"], inv["Debit_Amount"], # inv["After_Debit_Amount"], inv["GST_Amount"], inv["Amount"], inv["TDS_Amount"], # inv["SD_Amount"], inv["On_Commission"], inv["Hydro_Testing"], inv["GST_SD_Amount"] # ] # # 
Add hold amounts for each hold type # invoice_holds = hold_data.get(inv["Invoice_Id"], {}) # for ht_id in hold_type_map.keys(): # row.append(invoice_holds.get(ht_id, "")) # # Add payment information # row += [ # inv["Final_Amount"], # first_payment["Payment_Amount"] if first_payment else "", # first_payment["TDS_Payment_Amount"] if first_payment else "", # first_payment["Total_amount"] if first_payment else "", # first_payment["UTR"] if first_payment else "" # ] # sheet.append(row) # if first_payment: # payment_id = f"{invoice_no}-{first_payment['Payment_Amount']}-{first_payment.get('UTR', '')}" # processed_payments.add(payment_id) # # Process GST release if exists (only if we have a matching GST record) # if inv["gst_pmc_no"] and inv["gst_invoice_no"] and inv["gst_invoice_no"] not in seen_gst_notes: # seen_gst_notes.add(inv["gst_invoice_no"]) # # Find the payment that matches this GST release # gst_payment = None # for payment in payments[1:]: # Skip first payment (already used for invoice) # if payment['invoice_no'] == inv["gst_invoice_no"]: # gst_payment = payment # break # # If no payment found in the invoice's payments, check all payments # if not gst_payment: # gst_payments = payments_map.get(inv["gst_invoice_no"], []) # if gst_payments: # gst_payment = gst_payments[0] # # GST release row # row = [ # pmc_no, "", "", "GST Release Note", "", inv["gst_invoice_no"], # inv["gst_basic_amount"], "", "", "", "", "", "", "", "", "" # Empty GST SD Amount # ] # # Empty holds for GST release # row += ["" for _ in hold_headers] # # Add payment information # row += [ # inv["gst_final_amount"], # gst_payment["Payment_Amount"] if gst_payment else "", # gst_payment["TDS_Payment_Amount"] if gst_payment else "", # gst_payment["Total_amount"] if gst_payment else "", # gst_payment["UTR"] if gst_payment else "" # ] # sheet.append(row) # if gst_payment: # payment_id = f"{inv['gst_invoice_no']}-{gst_payment['Payment_Amount']}-{gst_payment.get('UTR', '')}" # 
processed_payments.add(payment_id) # # Process remaining payments as extra payments # for payment in payments[1:]: # payment_id = f"{payment['invoice_no']}-{payment['Payment_Amount']}-{payment.get('UTR', '')}" # if payment_id not in processed_payments: # row = [ # pmc_no, "", "", "", "", payment['invoice_no'], # "", "", "", "", "", "", "", "", "", "" # Empty GST SD Amount # ] # # Empty holds for extra payments # row += ["" for _ in hold_headers] # # Add payment information # row += [ # "", # payment["Payment_Amount"], # payment["TDS_Payment_Amount"], # payment["Total_amount"], # payment["UTR"] # ] # sheet.append(row) # processed_payments.add(payment_id) # # Process extra payments (null invoice_no) # for payment in extra_payments: # payment_id = f"null-{payment['Payment_Amount']}-{payment.get('UTR', '')}" # if payment_id not in processed_payments: # row = [ # pmc_no, "", "", "", "", "", # "", "", "", "", "", "", "", "", "", "" # Empty GST SD Amount # ] # # Empty holds for null invoice payments # row += ["" for _ in hold_headers] # # Add payment information # row += [ # "", # payment["Payment_Amount"], # payment["TDS_Payment_Amount"], # payment["Total_amount"], # payment["UTR"] # ] # sheet.append(row) # processed_payments.add(payment_id) # # Calculate totals # total_basic_amount = 0 # total_tds_amount = 0 # total_sd_amount = 0 # total_on_commission = 0 # total_hold_amount = 0 # total_final_amount = 0 # total_payment_amount = 0 # total_tds_payment_amount = 0 # total_total_paid = 0 # for row in sheet.iter_rows(min_row=2, max_row=sheet.max_row, values_only=True): # try: # total_basic_amount += float(row[6] or 0) # Basic_Amount # total_tds_amount += float(row[11] or 0) # TDS_Amount # total_sd_amount += float(row[12] or 0) # SD_Amount # total_on_commission += float(row[13] or 0) # On_Commission # total_final_amount += float(row[-5] or 0) # Final_Amount # total_payment_amount += float(row[-4] or 0) # Payment_Amount # total_tds_payment_amount += float(row[-3] or 0) # 
TDS_Payment # total_total_paid += float(row[-2] or 0) # Total_Paid # # Sum of hold amounts # hold_start_col = len(base_headers) # hold_end_col = hold_start_col + len(hold_headers) # total_hold_amount += sum(float(row[i] or 0) for i in range(hold_start_col, hold_end_col)) # except (ValueError, IndexError, TypeError): # continue # # Append totals row # totals_row = [ # "TOTAL", "", "", "", "", "", # total_basic_amount, "", "", "", "", total_tds_amount, total_sd_amount, # total_on_commission, "", "", # Empty GST SD Amount # ] # # Add hold totals # totals_row += [total_hold_amount] + [""] * (len(hold_headers) - 1) # # Add payment totals # totals_row += [ # total_final_amount, # total_payment_amount, # total_tds_payment_amount, # total_total_paid, # "" # UTR column remains empty # ] # sheet.append([]) # sheet.append(totals_row) # # Make totals row bold # for cell in sheet[sheet.max_row]: # cell.font = Font(bold=True) # # Save Excel file # workbook.save(output_file) # workbook.close() # finally: # cursor.close() # connection.close() # return send_from_directory(output_folder, f"PMC_Report_{pmc_no}.xlsx", as_attachment=True) # @app.route('/download_pmc_report/') # def download_pmc_report(pmc_no): # connection = config.get_db_connection() # output_folder = "static/download" # output_file = os.path.join(output_folder, f"PMC_Report_{pmc_no}.xlsx") # # if not os.path.exists(output_folder): # os.makedirs(output_folder) # # cursor = connection.cursor(dictionary=True) # # try: # cursor.callproc('GetContractorDetailsByPMC', [pmc_no]) # # for result in cursor.stored_results(): # contractor_info = result.fetchone() # # if not contractor_info: # return "No contractor found for this PMC No", 404 # # cursor.callproc('GetHoldTypesByContractor', [contractor_info["Contractor_Id"]]) # # for result in cursor.stored_results(): # hold_types = result.fetchall() # # hold_type_map = {ht['hold_type_id']: ht['hold_type'] for ht in hold_types} # # cursor.callproc('GetInvoicesAndGstReleaseByPmcNo', 
[pmc_no]) # # for result in cursor.stored_results(): # invoices = result.fetchall() # total_tds=Decimal('0.00') # final_amount=Decimal('0.00') # # total_hold_amount=Decimal('0.00') # for data in invoices: # total_tds=total_tds+data.get('TDS_Amount',Decimal('0.00')) # final_amount=final_amount+data.get('Final_Amount',Decimal('0.00')) # # cursor.callproc('GetHoldAmountsByContractor', [contractor_info["Contractor_Id"]]) # # for result in cursor.stored_results(): # hold_amounts = result.fetchall() # # hold_data = {} # for h in hold_amounts: # hold_data.setdefault(h['Invoice_Id'], {})[h['hold_type_id']] = h['hold_amount'] # # cursor.callproc('GetAllPaymentsByPMC', [pmc_no]) # # for result in cursor.stored_results(): # all_payments = result.fetchall() # total_amount=Decimal('0.00') # for d in all_payments: # total_amount=total_amount+ d.get('Total_Amount',Decimal('0.00')) # total_amount_paid= final_amount- total_amount; # payments_map = {} # extra_payments = [] # for pay in all_payments: # if pay['invoice_no']: # key = pay['invoice_no'] # if key not in payments_map: # payments_map[key] = [] # payments_map[key].append(pay) # else: # extra_payments.append(pay) # # workbook = openpyxl.Workbook() # sheet = workbook.active # sheet.title = "PMC Report" # # # Write Contractor Details # sheet.append(["", "", "Laxmi Civil Engineering Services PVT. 
LTD.", "", ""]) # sheet.append( # ["Contractor Name", contractor_info["Contractor_Name"], " ", "GST No", contractor_info["GST_No"], " ", # "GST Type", contractor_info["GST_Registration_Type"]]) # sheet.append(["State", contractor_info["State_Name"], " ", "PAN No", contractor_info["PAN_No"], " ", "Address", # contractor_info["Address"]]) # sheet.append(["District", contractor_info["District_Name"], " ", "Mobile No", contractor_info["Mobile_No"]]) # sheet.append(["Block", contractor_info["Block_Name"], " ", "Email", contractor_info["Email"]]) # sheet.append([]) # # # Table Headers - include all hold types as separate columns # base_headers = ["PMC No", "Village", "Work Type", "Invoice Details", "Invoice Date", "Invoice No", # "Basic Amount", "Debit", "After Debit Amount", "GST (18%)", "Amount", "TDS (1%)", # "SD (5%)", "On Commission", "Hydro Testing", "GST SD Amount"] # # hold_headers = [ht['hold_type'] for ht in hold_types] # # payment_headers = ["Final Amount", "Payment Amount", "TDS Payment", "Total Paid", "UTR"] # # sheet.append(base_headers + hold_headers + payment_headers) # # seen_invoices = set() # seen_gst_notes = set() # processed_payments = set() # # # Process invoices # for inv in invoices: # invoice_no = inv["Invoice_No"] # payments = payments_map.get(invoice_no, []) # # # Process invoice row with first payment (if exists) # if invoice_no not in seen_invoices: # seen_invoices.add(invoice_no) # first_payment = payments[0] if len(payments) > 0 else None # # # Base invoice data # row = [ # pmc_no, inv["Village_Name"], inv["Work_Type"], inv["Invoice_Details"], # inv["Invoice_Date"], invoice_no, inv["Basic_Amount"], inv["Debit_Amount"], # inv["After_Debit_Amount"], inv["GST_Amount"], inv["Amount"], inv["TDS_Amount"], # inv["SD_Amount"], inv["On_Commission"], inv["Hydro_Testing"], inv["GST_SD_Amount"] # ] # # # Add hold amounts for each hold type # invoice_holds = hold_data.get(inv["Invoice_Id"], {}) # for ht_id in hold_type_map.keys(): # 
row.append(invoice_holds.get(ht_id, "")) # # # Add payment information # row += [ # inv["Final_Amount"], # first_payment["Payment_Amount"] if first_payment else "", # first_payment["TDS_Payment_Amount"] if first_payment else "", # first_payment["Total_amount"] if first_payment else "", # first_payment["UTR"] if first_payment else "" # ] # # sheet.append(row) # # if first_payment: # payment_id = f"{invoice_no}-{first_payment['Payment_Amount']}-{first_payment.get('UTR', '')}" # processed_payments.add(payment_id) # # # Process GST release if exists (only if we have a matching GST record) # if inv["gst_pmc_no"] and inv["gst_invoice_no"] and inv["gst_invoice_no"] not in seen_gst_notes: # seen_gst_notes.add(inv["gst_invoice_no"]) # # # Find the payment that matches this GST release # gst_payment = None # for payment in payments[1:]: # Skip first payment (already used for invoice) # if payment['invoice_no'] == inv["gst_invoice_no"]: # gst_payment = payment # break # # # If no payment found in the invoice's payments, check all payments # if not gst_payment: # gst_payments = payments_map.get(inv["gst_invoice_no"], []) # if gst_payments: # gst_payment = gst_payments[0] # # # GST release row (this will be in the same row, after the invoice information) # gst_row = [ # pmc_no, "", "", "GST Release Note", "", inv["gst_invoice_no"], # inv["gst_basic_amount"], "", "", "", "", "", "", "", "", "" # Empty GST SD Amount # ] # # # Empty holds for GST release # gst_row += ["" for _ in hold_headers] # # # Add GST payment information (same columns as invoice payment information) # gst_row += [ # inv["gst_final_amount"], # gst_payment["Payment_Amount"] if gst_payment else "", # gst_payment["TDS_Payment_Amount"] if gst_payment else "", # gst_payment["Total_amount"] if gst_payment else "", # gst_payment["UTR"] if gst_payment else "" # ] # # sheet.append(gst_row) # # if gst_payment: # payment_id = f"{inv['gst_invoice_no']}-{gst_payment['Payment_Amount']}-{gst_payment.get('UTR', '')}" # 
processed_payments.add(payment_id) # # # Process remaining payments as extra payments (if any) # for payment in payments[1:]: # payment_id = f"{payment['invoice_no']}-{payment['Payment_Amount']}-{payment.get('UTR', '')}" # if payment_id not in processed_payments: # row = [ # pmc_no, "", "", "", "", payment['invoice_no'], # "", "", "", "", "", "", "", "", "", "" # Empty GST SD Amount # ] # # # Empty holds for extra payments # row += ["" for _ in hold_headers] # # # Add payment information # row += [ # "", # payment["Payment_Amount"], # payment["TDS_Payment_Amount"], # payment["Total_amount"], # payment["UTR"] # ] # # sheet.append(row) # processed_payments.add(payment_id) # # # Process extra payments (null invoice_no) # for payment in extra_payments: # payment_id = f"null-{payment['Payment_Amount']}-{payment.get('UTR', '')}" # if payment_id not in processed_payments: # row = [ # pmc_no, "", "", "", "", "", # "", "", "", "", "", "", "", "", "", "" # Empty GST SD Amount # ] # # # Empty holds for null invoice payments # row += ["" for _ in hold_headers] # # # Add payment information # row += [ # "", # payment["Payment_Amount"], # payment["TDS_Payment_Amount"], # payment["Total_amount"], # payment["UTR"] # ] # # sheet.append(row) # processed_payments.add(payment_id) # # # Calculate totals # total_basic_amount = 0 # total_tds_amount = 0 # total_sd_amount = 0 # total_on_commission = 0 # total_hold_amount = 0 # total_final_amount = 0 # total_payment_amount = 0 # total_tds_payment_amount = 0 # total_total_paid = 0 # # for row in sheet.iter_rows(min_row=2, max_row=sheet.max_row, values_only=True): # try: # total_basic_amount += float(row[6] or 0) # Basic_Amount # total_tds_amount += float(row[11] or 0) # TDS_Amount # total_sd_amount += float(row[12] or 0) # SD_Amount # total_on_commission += float(row[13] or 0) # On_Commission # total_final_amount += float(row[-5] or 0) # Final_Amount # total_payment_amount += float(row[-4] or 0) # Payment_Amount # total_tds_payment_amount += 
float(row[-3] or 0) # TDS_Payment # total_total_paid += float(row[-2] or 0) # Total_Paid # # # Sum of hold amounts # hold_start_col = len(base_headers) # hold_end_col = hold_start_col + len(hold_headers) # total_hold_amount += sum(float(row[i] or 0) for i in range(hold_start_col, hold_end_col)) # except (ValueError, IndexError, TypeError): # continue # # # Append totals row # totals_row = [ # "TOTAL", "", "", "", "", "", # total_basic_amount, "", "", "", "", total_tds_amount, total_sd_amount, # total_on_commission, "", "", # Empty GST SD Amount # ] # if hold_headers: # totals_row += [total_hold_amount] + [""] * (len(hold_headers) - 1) # # # Add payment totals # totals_row += [ # total_final_amount, # total_payment_amount, # total_tds_payment_amount, # total_total_paid, # "" # UTR column remains empty # ] # # sheet.append([]) # sheet.append(totals_row) # #new code added for small chart---summary # total_hold_amount=Decimal('0.00') # for d in invoices: # total_hold_amount = total_hold_amount + d.get('SD_Amount', Decimal('0.00')) + d.get('On_Commission', # Decimal( # '0.00')) + d.get( # 'Hydro_Testing', Decimal('0.00')) # for data in hold_amounts: # total_hold_amount = total_hold_amount + data.get('hold_amount', Decimal('0.00')) # print("Total Hold Amount after adding the hold amount ", total_hold_amount) # # # Add payment information # # Get today's date # today_date = datetime.today().strftime('%A,%Y-%m-%d') # # Add headers (optional) # sheet.append(["Contractor Name", contractor_info["Contractor_Name"]]) # sheet.append(["Date", today_date]) # sheet.append(["Description", "Amount"]) # # Add your values # sheet.append(["Advance/Surplus", str(total_final_amount-total_payment_amount)]) # sheet.append(["Total Hold Amount", str(total_hold_amount)]) # sheet.append(["Amount With TDS", str(total_tds_payment_amount)]) # # new coded ended here for summary chart # # Make totals row bold # for cell in sheet[sheet.max_row]: # cell.font = Font(bold=True) # # # Save Excel file # 
workbook.save(output_file) # workbook.close() # # finally: # cursor.close() # connection.close() # # return send_from_directory(output_folder, f"PMC_Report_{pmc_no}.xlsx", as_attachment=True) # @app.route('/download_pmc_report/') # def download_pmc_report(pmc_no): # connection = config.get_db_connection() # output_folder = "static/download" # output_file = os.path.join(output_folder, f"PMC_Report_{pmc_no}.xlsx") # # if not os.path.exists(output_folder): # os.makedirs(output_folder) # # cursor = connection.cursor(dictionary=True) # # try: # cursor.callproc('GetContractorDetailsByPMC', [pmc_no]) # contractor_info = next(cursor.stored_results()).fetchone() # # if not contractor_info: # return "No contractor found for this PMC No", 404 # # cursor.callproc('GetHoldTypesByContractor', [contractor_info["Contractor_Id"]]) # hold_types = next(cursor.stored_results()).fetchall() # hold_type_map = {ht['hold_type_id']: ht['hold_type'] for ht in hold_types} # # cursor.callproc('GetInvoicesAndGstReleaseByPmcNo', [pmc_no]) # invoices = next(cursor.stored_results()).fetchall() # # cursor.callproc('GetHoldAmountsByContractor', [contractor_info["Contractor_Id"]]) # hold_amounts = next(cursor.stored_results()).fetchall() # hold_data = {} # for h in hold_amounts: # hold_data.setdefault(h['Invoice_Id'], {})[h['hold_type_id']] = h['hold_amount'] # # cursor.callproc('GetAllPaymentsByPMC', [pmc_no]) # all_payments = next(cursor.stored_results()).fetchall() # # payments_map = {} # extra_payments = [] # for pay in all_payments: # if pay['invoice_no']: # payments_map.setdefault(pay['invoice_no'], []).append(pay) # else: # extra_payments.append(pay) # # workbook = openpyxl.Workbook() # sheet = workbook.active # sheet.title = "PMC Report" # # # Write contractor header # sheet.append(["", "", "Laxmi Civil Engineering Services PVT. 
LTD."]) # sheet.append(["Contractor Name", contractor_info["Contractor_Name"], "", "GST No", contractor_info["GST_No"], "", "GST Type", contractor_info["GST_Registration_Type"]]) # sheet.append(["State", contractor_info["State_Name"], "", "PAN No", contractor_info["PAN_No"], "", "Address", contractor_info["Address"]]) # sheet.append(["District", contractor_info["District_Name"], "", "Mobile No", contractor_info["Mobile_No"]]) # sheet.append(["Block", contractor_info["Block_Name"], "", "Email", contractor_info["Email"]]) # sheet.append([]) # # base_headers = ["PMC No", "Village", "Work Type", "Invoice Details", "Invoice Date", "Invoice No", # "Basic Amount", "Debit", "After Debit Amount", "GST (18%)", "Amount", "TDS (1%)", # "SD (5%)", "On Commission", "Hydro Testing", "GST SD Amount"] # # hold_headers = [ht['hold_type'] for ht in hold_types] # payment_headers = ["Final Amount", "Payment Amount", "TDS Payment", "Total Paid", "UTR"] # sheet.append(base_headers + hold_headers + payment_headers) # # Style the headers # header_fill=PatternFill(start_color="ADD8E6",end_color="ADD8E6",fill_type="solid") # header_font=Font(bold=True) # for cell in sheet[sheet.max_row]: # cell.font=header_font # cell.fill=header_fill # # seen_invoices = set() # seen_gst_notes = set() # processed_payments = set() # # for inv in invoices: # invoice_no = inv["Invoice_No"] # payments = payments_map.get(invoice_no, []) # # if invoice_no not in seen_invoices: # seen_invoices.add(invoice_no) # first_payment = payments[0] if payments else None # # row = [ # pmc_no, inv["Village_Name"], inv["Work_Type"], inv["Invoice_Details"], # inv["Invoice_Date"], invoice_no, inv["Basic_Amount"], inv["Debit_Amount"], # inv["After_Debit_Amount"], inv["GST_Amount"], inv["Amount"], inv["TDS_Amount"], # inv["SD_Amount"], inv["On_Commission"], inv["Hydro_Testing"], inv["GST_SD_Amount"] # ] # # invoice_holds = hold_data.get(inv["Invoice_Id"], {}) # for ht_id in hold_type_map.keys(): # 
row.append(invoice_holds.get(ht_id, "")) # # row += [ # inv["Final_Amount"], # first_payment["Payment_Amount"] if first_payment else "", # first_payment["TDS_Payment_Amount"] if first_payment else "", # first_payment["Total_amount"] if first_payment else "", # first_payment["UTR"] if first_payment else "" # ] # # sheet.append(row) # # if first_payment: # processed_payments.add(f"{invoice_no}-{first_payment['Payment_Amount']}-{first_payment.get('UTR', '')}") # # if inv["gst_pmc_no"] and inv["gst_invoice_no"] and inv["gst_invoice_no"] not in seen_gst_notes: # seen_gst_notes.add(inv["gst_invoice_no"]) # gst_payment = None # for payment in payments[1:]: # if payment['invoice_no'] == inv["gst_invoice_no"]: # gst_payment = payment # break # if not gst_payment: # gst_payment = payments_map.get(inv["gst_invoice_no"], [None])[0] # # gst_row = [ # pmc_no, "", "", "GST Release Note", "", inv["gst_invoice_no"], # inv["gst_basic_amount"], "", "", "", "", "", "", "", "", "" # ] # gst_row += ["" for _ in hold_headers] # gst_row += [ # inv["gst_final_amount"], # gst_payment["Payment_Amount"] if gst_payment else "", # gst_payment["TDS_Payment_Amount"] if gst_payment else "", # gst_payment["Total_amount"] if gst_payment else "", # gst_payment["UTR"] if gst_payment else "" # ] # sheet.append(gst_row) # if gst_payment: # processed_payments.add(f"{inv['gst_invoice_no']}-{gst_payment['Payment_Amount']}-{gst_payment.get('UTR', '')}") # # for payment in payments[1:]: # payment_id = f"{payment['invoice_no']}-{payment['Payment_Amount']}-{payment.get('UTR', '')}" # if payment_id not in processed_payments: # row = [pmc_no, "", "", "", "", payment['invoice_no']] + [""] * 10 # row += ["" for _ in hold_headers] # row += [ # "", payment["Payment_Amount"], payment["TDS_Payment_Amount"], # payment["Total_amount"], payment["UTR"] # ] # sheet.append(row) # processed_payments.add(payment_id) # # for payment in extra_payments: # row = [pmc_no, "", "", "", "", ""] + [""] * 10 # row += ["" for _ in 
hold_headers] # row += [ # "", payment["Payment_Amount"], payment["TDS_Payment_Amount"], # payment["Total_amount"], payment["UTR"] # ] # sheet.append(row) # # # Totals # total_basic_amount = Decimal('0.00') # total_tds_amount = Decimal('0.00') # total_sd_amount = Decimal('0.00') # total_on_commission = Decimal('0.00') # total_final_amount = Decimal('0.00') # total_payment_amount = Decimal('0.00') # total_tds_payment_amount = Decimal('0.00') # total_total_paid = Decimal('0.00') # total_hold_amount_dynamic = Decimal('0.00') # # for row in sheet.iter_rows(min_row=8, max_row=sheet.max_row, values_only=True): # try: # total_basic_amount += Decimal(str(row[6] or 0)) # total_tds_amount += Decimal(str(row[11] or 0)) # total_sd_amount += Decimal(str(row[12] or 0)) # total_on_commission += Decimal(str(row[13] or 0)) # total_final_amount += Decimal(str(row[-5] or 0)) # total_payment_amount += Decimal(str(row[-4] or 0)) # total_tds_payment_amount += Decimal(str(row[-3] or 0)) # total_total_paid += Decimal(str(row[-2] or 0)) # # for i in range(len(base_headers), len(base_headers) + len(hold_headers)): # total_hold_amount_dynamic += Decimal(str(row[i] or 0)) # except: # continue # # totals_row = [ # "TOTAL", "", "", "", "", "", # total_basic_amount, "", "", "", "", total_tds_amount, total_sd_amount, # total_on_commission, "", "" # ] # totals_row += [total_hold_amount_dynamic] + [""] * (len(hold_headers) - 1) # totals_row += [ # total_final_amount, # total_payment_amount, # total_tds_payment_amount, # total_total_paid, # "" # ] # # sheet.append([]) # sheet.append(totals_row) # # # Summary # summary_hold = Decimal('0.00') # for d in invoices: # summary_hold += Decimal(str(d.get('SD_Amount', 0.00))) + Decimal(str(d.get('On_Commission', 0.00))) + Decimal(str(d.get('Hydro_Testing', 0.00))) # for h in hold_amounts: # summary_hold += Decimal(str(h.get('hold_amount', 0.00))) # # sheet.append([]) # today = datetime.today().strftime('%A, %Y-%m-%d') # sheet.append(["Contractor Name", 
#                       contractor_info["Contractor_Name"]])
#         sheet.append(["Date", today])
#         sheet.append(["Description", "Amount"])
#         sheet.append(["Advance/Surplus", str(total_final_amount - total_payment_amount)])
#         sheet.append(["Total Hold Amount", str(summary_hold)])
#         sheet.append(["Amount With TDS", str(total_payment_amount + total_tds_payment_amount)])
#
#         for cell in sheet[sheet.max_row]:
#             cell.font = Font(bold=True)
#
#         workbook.save(output_file)
#         workbook.close()
#
#     finally:
#         cursor.close()
#         connection.close()
#
#     return send_from_directory(output_folder, f"PMC_Report_{pmc_no}.xlsx", as_attachment=True)


def _to_decimal(value):
    """Convert a worksheet cell value to ``Decimal``.

    Blank cells (``None``, ``""``, ``0``) and values that are not valid
    decimal literals count as zero, so one stray text cell cannot discard
    the remaining columns of its row from the report totals.
    """
    from decimal import InvalidOperation

    if not value:
        return Decimal("0")
    try:
        return Decimal(str(value))
    except InvalidOperation:
        return Decimal("0")


@app.route('/download_pmc_report/<pmc_no>')
def download_pmc_report(pmc_no):
    """Build the "PMC Report" workbook for one PMC number and send it as a download.

    The sheet contains a contractor header, one row per invoice (with its
    hold amounts and first payment), followed by that invoice's GST release
    notes and additional payments, then payments without an invoice number,
    credit notes, a totals row and a short summary.

    Args:
        pmc_no: PMC number taken from the URL.

    Returns:
        The generated ``PMC_Report_<pmc_no>.xlsx`` as an attachment, or a
        404 response when no contractor is found for the PMC number.
    """
    # Save and serve from the same absolute folder: openpyxl resolves relative
    # paths against the working directory, Flask against the application root.
    output_folder = os.path.join(current_app.root_path, "static", "download")
    output_name = f"PMC_Report_{pmc_no}.xlsx"
    output_file = os.path.join(output_folder, output_name)
    os.makedirs(output_folder, exist_ok=True)

    connection = config.get_db_connection()
    cursor = connection.cursor(dictionary=True)

    try:
        cursor.callproc('GetContractorDetailsByPMC', [pmc_no])
        contractor_info = next(cursor.stored_results()).fetchone()

        if not contractor_info:
            return "No contractor found for this PMC No", 404

        cursor.callproc('GetHoldTypesByContractor', [contractor_info["Contractor_Id"]])
        hold_types = next(cursor.stored_results()).fetchall()
        hold_type_map = {ht['hold_type_id']: ht['hold_type'] for ht in hold_types}

        cursor.callproc('GetInvoicesAndGstReleaseByPmcNo', [pmc_no])
        invoices = next(cursor.stored_results()).fetchall()

        # Credit notes for this PMC. The previous query compared Contractor_Id
        # against the PMC number; rows are only ever used when their PMC_No
        # matches, so filter on that column directly.
        cursor.execute("""
            SELECT PMC_No, Invoice_Details, Basic_Amount, Debit_Amount, After_Debit_Amount,
                   GST_Amount, Amount, Final_Amount, Payment_Amount, Total_Amount, UTR, Invoice_No
            FROM credit_note
            WHERE PMC_No = %s
        """, (pmc_no,))
        credit_note_map = {}
        for cn in cursor.fetchall():
            credit_note_map.setdefault(cn["Invoice_No"], []).append(cn)

        cursor.callproc('GetHoldAmountsByContractor', [contractor_info["Contractor_Id"]])
        hold_amounts = next(cursor.stored_results()).fetchall()
        hold_data = {}
        for h in hold_amounts:
            hold_data.setdefault(h['Invoice_Id'], {})[h['hold_type_id']] = h['hold_amount']

        cursor.callproc('GetAllPaymentsByPMC', [pmc_no])
        all_payments = next(cursor.stored_results()).fetchall()
        payments_map = {}
        extra_payments = []  # payments that carry no invoice number
        for pay in all_payments:
            if pay['invoice_no']:
                payments_map.setdefault(pay['invoice_no'], []).append(pay)
            else:
                extra_payments.append(pay)

        # GST releases; one release may name several invoices ("A&B" or "A,B").
        cursor.execute("""
            SELECT PMC_No, Invoice_No, Basic_Amount, Final_Amount, Total_Amount, UTR
            FROM gst_release
            WHERE PMC_No = %s
        """, (pmc_no,))
        gst_release_map = {}
        for gr in cursor.fetchall():
            invoice_nos = []
            if gr['Invoice_No']:
                cleaned = gr['Invoice_No'].replace(' ', '')
                if '&' in cleaned:
                    invoice_nos = cleaned.split('&')
                elif ',' in cleaned:
                    invoice_nos = cleaned.split(',')
                else:
                    invoice_nos = [cleaned]
            for inv_no in invoice_nos:
                gst_release_map.setdefault(inv_no, []).append(gr)

        log_action("Download PMC Report", f"User {current_user.id} Download PMC Report'{ pmc_no}'")

        workbook = openpyxl.Workbook()
        sheet = workbook.active
        sheet.title = "PMC Report"

        # Contractor header
        sheet.append(["", "", "Laxmi Civil Engineering Services PVT. LTD."])
        sheet.append(["Contractor Name", contractor_info["Contractor_Name"], "",
                      "GST No", contractor_info["GST_No"], "",
                      "GST Type", contractor_info["GST_Registration_Type"]])
        sheet.append(["State", contractor_info["State_Name"], "",
                      "PAN No", contractor_info["PAN_No"], "",
                      "Address", contractor_info["Address"]])
        sheet.append(["District", contractor_info["District_Name"], "",
                      "Mobile No", contractor_info["Mobile_No"]])
        sheet.append(["Block", contractor_info["Block_Name"], "",
                      "Email", contractor_info["Email"]])
        sheet.append([])

        base_headers = ["PMC No", "Village", "Work Type", "Invoice Details", "Invoice Date", "Invoice No",
                        "Basic Amount", "Debit", "After Debit Amount", "GST (18%)", "Amount", "TDS (1%)",
                        "SD (5%)", "On Commission", "Hydro Testing", "GST SD Amount"]
        hold_headers = [ht['hold_type'] for ht in hold_types]
        payment_headers = ["Final Amount", "Payment Amount", "TDS Payment", "Total Paid", "UTR"]
        sheet.append(base_headers + hold_headers + payment_headers)

        header_fill = PatternFill(start_color="ADD8E6", end_color="ADD8E6", fill_type="solid")
        header_font = Font(bold=True)
        for cell in sheet[sheet.max_row]:
            cell.font = header_font
            cell.fill = header_fill

        first_data_row = sheet.max_row + 1
        blank_holds = [""] * len(hold_headers)
        seen_invoices = set()
        processed_payments = set()

        for inv in invoices:
            invoice_no = inv["Invoice_No"]
            # The invoice procedure may return the same invoice more than
            # once; write each invoice and its dependent rows a single time.
            if invoice_no in seen_invoices:
                continue
            seen_invoices.add(invoice_no)

            payments = payments_map.get(invoice_no, [])
            first_payment = payments[0] if payments else None

            row = [
                pmc_no, inv["Village_Name"], inv["Work_Type"], inv["Invoice_Details"],
                inv["Invoice_Date"], invoice_no, inv["Basic_Amount"], inv["Debit_Amount"],
                inv["After_Debit_Amount"], inv["GST_Amount"], inv["Amount"], inv["TDS_Amount"],
                inv["SD_Amount"], inv["On_Commission"], inv["Hydro_Testing"], inv["GST_SD_Amount"],
            ]
            invoice_holds = hold_data.get(inv["Invoice_Id"], {})
            row += [invoice_holds.get(ht_id, "") for ht_id in hold_type_map]
            row += [
                inv["Final_Amount"],
                first_payment["Payment_Amount"] if first_payment else "",
                first_payment["TDS_Payment_Amount"] if first_payment else "",
                first_payment["Total_amount"] if first_payment else "",
                first_payment["UTR"] if first_payment else "",
            ]
            sheet.append(row)

            if first_payment:
                processed_payments.add(
                    f"{invoice_no}-{first_payment['Payment_Amount']}-{first_payment.get('UTR', '')}")

            # GST release note(s) attached to this invoice
            for gr in gst_release_map.get(invoice_no, []):
                gst_row = [pmc_no, "", "", "GST Release Note", "", gr["Invoice_No"],
                           gr["Basic_Amount"]] + [""] * 9
                gst_row += blank_holds
                gst_row += [gr["Final_Amount"], "", "", gr["Total_Amount"],
                            gr["UTR"] if gr["UTR"] else ""]
                sheet.append(gst_row)

            # Remaining payments of this invoice, one row each
            for payment in payments[1:]:
                payment_id = f"{payment['invoice_no']}-{payment['Payment_Amount']}-{payment.get('UTR', '')}"
                if payment_id in processed_payments:
                    continue
                row = [pmc_no, "", "", "", "", payment['invoice_no']] + [""] * 10
                row += blank_holds
                row += ["", payment["Payment_Amount"], payment["TDS_Payment_Amount"],
                        payment["Total_amount"], payment["UTR"]]
                sheet.append(row)
                processed_payments.add(payment_id)

        for payment in extra_payments:
            row = [pmc_no, "", "", "", "", ""] + [""] * 10
            row += blank_holds
            row += ["", payment["Payment_Amount"], payment["TDS_Payment_Amount"],
                    payment["Total_amount"], payment["UTR"]]
            sheet.append(row)

        # Credit notes for every invoice on the report. Previously only the
        # last invoice of the loop above was looked up here.
        # NOTE(review): assumes this section followed the loops in the
        # original layout — confirm the intended row order.
        for invoice_no in seen_invoices and [inv["Invoice_No"] for inv in invoices]:
            for cn in credit_note_map.pop(invoice_no, []):
                credit_row = [
                    pmc_no, "", "", cn.get("Invoice_Details", "Credit Note"), "",
                    cn.get("Invoice_No", ""), cn.get("Basic_Amount", ""), cn.get("Debit_Amount", ""),
                    cn.get("After_Debit_Amount", ""), cn.get("GST_Amount", ""), cn.get("Amount", ""),
                    "", "", "", "", "",
                ]
                credit_row += blank_holds
                # Five cells so Total_Amount and UTR land under "Total Paid"
                # and "UTR" rather than "Payment Amount" and "TDS Payment".
                credit_row += [cn.get("Final_Amount", ""), "", "",
                               cn.get("Total_Amount", ""), cn.get("UTR", "")]
                sheet.append(credit_row)

        # Totals over the data rows just written
        hold_start = len(base_headers)
        pay_start = hold_start + len(hold_headers)

        total_basic_amount = Decimal('0.00')
        total_tds_amount = Decimal('0.00')
        total_sd_amount = Decimal('0.00')
        total_on_commission = Decimal('0.00')
        total_final_amount = Decimal('0.00')
        total_payment_amount = Decimal('0.00')
        total_tds_payment_amount = Decimal('0.00')
        total_total_paid = Decimal('0.00')
        total_hold_amount_dynamic = Decimal('0.00')

        if sheet.max_row >= first_data_row:
            for row in sheet.iter_rows(min_row=first_data_row, max_row=sheet.max_row, values_only=True):
                total_basic_amount += _to_decimal(row[6])
                total_tds_amount += _to_decimal(row[11])
                total_sd_amount += _to_decimal(row[12])
                total_on_commission += _to_decimal(row[13])
                total_final_amount += _to_decimal(row[pay_start])
                total_payment_amount += _to_decimal(row[pay_start + 1])
                total_tds_payment_amount += _to_decimal(row[pay_start + 2])
                total_total_paid += _to_decimal(row[pay_start + 3])
                for value in row[hold_start:pay_start]:
                    total_hold_amount_dynamic += _to_decimal(value)

        totals_row = [""] * len(base_headers)
        totals_row[0] = "TOTAL"
        totals_row[6] = total_basic_amount
        totals_row[11] = total_tds_amount
        totals_row[12] = total_sd_amount
        totals_row[13] = total_on_commission

        # The combined hold total is shown once, under the first hold column.
        hold_values = [""] * len(hold_headers)
        if total_hold_amount_dynamic and hold_values:
            hold_values[0] = total_hold_amount_dynamic
        totals_row += hold_values

        totals_row += [
            total_final_amount,
            total_payment_amount,
            total_tds_payment_amount,
            total_total_paid,
            "",  # UTR
        ]

        sheet.append([])
        sheet.append(totals_row)

        # Summary
        sheet.append([])
        today = datetime.today().strftime('%A, %Y-%m-%d')
        sheet.append(["Contractor Name", contractor_info["Contractor_Name"]])
        sheet.append(["Date", today])
        sheet.append(["Description", "Amount"])
        sheet.append(["Advance/Surplus", str(total_final_amount - total_total_paid)])
        sheet.append(["Total Hold Amount", str(total_hold_amount_dynamic)])
        sheet.append(["Amount With TDS", str(total_tds_amount)])

        # Only the last summary row is emphasised.
        for cell in sheet[sheet.max_row]:
            cell.font = Font(bold=True)

        workbook.save(output_file)
        workbook.close()
    finally:
        cursor.close()
        connection.close()

    return send_from_directory(output_folder, output_name, as_attachment=True)


# --------- Hold Types Controller --------------------------------------------
# Route to Add a New Hold Type
@app.route('/add_hold_type', methods=['POST', 'GET'])
def add_hold_type():
    """List hold types (GET) or create a new one (POST).

    GET renders ``add_hold_type.html`` with the rows returned by
    ``GetAllHoldTypes``. POST validates the ``hold_type`` form field and
    saves it through ``SaveHoldType``.

    Returns:
        The rendered page for GET; for POST a JSON payload with status 201
        on success, 400 on validation failure and 500 on a database error.
    """
    connection = config.get_db_connection()
    cursor = connection.cursor(dictionary=True)

    try:
        cursor.callproc("GetAllHoldTypes")
        hold_types = []
        for hold in cursor.stored_results():
            hold_types = hold.fetchall()

        if request.method == 'POST':
            hold_type = request.form.get('hold_type', '').strip()

            # Validation: must start with a letter
            if not hold_type or not hold_type[0].isalpha():
                return jsonify({"status": "error", "message": "Hold Type must start with a letter."}), 400

            # Validation: reject case-insensitive duplicates using the list
            # fetched above.
            # NOTE(review): assumes GetAllHoldTypes rows expose a 'hold_type'
            # column, as GetHoldTypesByContractor rows do — confirm.
            existing = {(ht.get('hold_type') or '').strip().lower() for ht in hold_types}
            if hold_type.lower() in existing:
                return jsonify({"status": "error", "message": "This Hold Type already exists."}), 400

            # NOTE(review): the result set of CheckHoldTypeExists is not read
            # here; its shape is not visible from this file.
            cursor.callproc('CheckHoldTypeExists', [hold_type])

            try:
                cursor.callproc('SaveHoldType', [hold_type])
                connection.commit()
                return jsonify({"status": "success", "message": "Hold Type added successfully!"}), 201
            except mysql.connector.Error as e:
                connection.rollback()
                return jsonify({"status": "error", "message": f"Database error: {str(e)}"}), 500

    except mysql.connector.Error as e:
        return jsonify({"status": "error", "message": f"Database error: {str(e)}"}), 500
    finally:
        cursor.close()
        connection.close()

    return render_template('add_hold_type.html', Hold_Types_data=hold_types)


# Route to Update Hold Type
@app.route('/update_hold_type/<int:id>', methods=['GET', 'POST'])
def update_hold_type(id):
    """Show the edit form for a hold type (GET) or rename it (POST).

    Args:
        id: Hold type id taken from the URL.

    Returns:
        The rendered ``edit_hold_type.html`` form for GET, otherwise a
        redirect to ``add_hold_type`` with a flashed status message.
    """
    connection = config.get_db_connection()
    cursor = connection.cursor()
    try:
        if request.method != 'POST':
            hold_type = None  # stays None when the procedure yields no result set
            cursor.callproc("GetHoldTypesById", (id,))
            for result in cursor.stored_results():
                hold_type = result.fetchone()

            if not hold_type:
                flash('Hold Type not found.', 'error')
                return redirect(url_for('add_hold_type'))
            return render_template('edit_hold_type.html', hold_type=hold_type)

        new_hold_type = request.form.get('hold_type', '').strip()
        if not new_hold_type or not new_hold_type[0].isalpha():
            flash('Invalid hold type name. Must start with a letter.', 'error')
            return redirect(url_for('add_hold_type'))

        try:
            hold_type = None
            cursor.callproc("GetHoldTypesById", (id,))
            for result in cursor.stored_results():
                hold_type = result.fetchone()

            if not hold_type:
                flash('Hold Type not found.', 'error')
                return redirect(url_for('add_hold_type'))

            cursor.callproc("UpdateHoldTypeById", (id, new_hold_type))
            connection.commit()
            flash('Hold Type updated successfully!', 'success')
        except mysql.connector.Error:
            connection.rollback()
            flash('Failed to update Hold Type.', 'error')

        return redirect(url_for('add_hold_type'))
    finally:
        # Single cleanup point: every early return above releases the
        # cursor and connection.
        cursor.close()
        connection.close()


# Route to Delete Hold Type
@app.route('/delete_hold_type/<int:id>', methods=['POST'])
def delete_hold_type(id):
    """Delete a hold type by id and report the outcome as JSON.

    Args:
        id: Hold type id taken from the URL.

    Returns:
        A JSON payload; status 404 when the hold type does not exist and
        500 on a database error.
    """
    connection = config.get_db_connection()
    cursor = connection.cursor()

    try:
        hold_type = None  # stays None when the procedure yields no result set
        cursor.callproc("GetHoldTypesById", (id,))
        for result in cursor.stored_results():
            hold_type = result.fetchone()

        log_action("Delete hold type", f"User {current_user.id} Delete hold type'{ hold_type}'")

        if not hold_type:
            return jsonify({'status': 'error', 'message': 'Hold Type not found.'}), 404

        cursor.callproc("DeleteHoldType", (id,))
        connection.commit()
        return jsonify(ResponseHandler.delete_success('Hold Type'))
    except mysql.connector.Error:
        connection.rollback()
        return jsonify(ResponseHandler.delete_failure('Hold Type')), 500
    finally:
        cursor.close()
        connection.close()


@app.route('/unreleased_gst')
def unreleased_gst():
    """List invoices that have no matching GST release.

    An invoice counts as released when its invoice number appears in
    ``gst_release.Invoice_No`` or when its ``GST_SD_Amount`` equals the
    ``Basic_Amount`` of some GST release. All other invoices are passed to
    ``unreleased_gst.html`` as ``data``.
    """
    connection = config.get_db_connection()
    cursor = connection.cursor(dictionary=True)

    try:
        # Step 1: Fetch invoices
        cursor.execute("""
            SELECT i.PMC_No, i.Invoice_No, i.GST_SD_Amount, i.Invoice_Details,
                   i.Basic_Amount, i.Final_Amount
            FROM `invoice` i
        """)
        invoices = cursor.fetchall()

        # Step 2: Fetch GST releases
        cursor.execute("""
            SELECT Invoice_No, Basic_Amount, Final_Amount
            FROM gst_release
        """)
        gst_releases = cursor.fetchall()

        # Step 3: Lookup sets. A release may name several invoices joined by
        # '&' or ',' (download_pmc_report splits them the same way), so index
        # the individual numbers as well as the raw value.
        gst_invoice_nos = set()
        for release in gst_releases:
            raw_invoice_no = release['Invoice_No']
            if not raw_invoice_no:
                continue
            gst_invoice_nos.add(raw_invoice_no)
            parts = re.split(r'[&,]', raw_invoice_no.replace(' ', ''))
            gst_invoice_nos.update(part for part in parts if part)

        gst_basic_amounts = {
            float(release['Basic_Amount'])
            for release in gst_releases
            if release['Basic_Amount'] is not None
        }

        # Step 4: Filter
        unreleased = []
        for inv in invoices:
            match_by_invoice = inv['Invoice_No'] in gst_invoice_nos
            match_by_gst_amount = float(inv.get('GST_SD_Amount') or 0.0) in gst_basic_amounts
            if not (match_by_invoice or match_by_gst_amount):
                unreleased.append(inv)

        return render_template("unreleased_gst.html", data=unreleased)
    finally:
        cursor.close()
        connection.close()


# -- end hold types controller --------------------


# Route to display the HTML form
@app.route('/add_work_order', methods=['GET'])
def add_work_order():
    """Render the work order form with the subcontractor list for its dropdown."""
    connection = config.get_db_connection()
    cursor = connection.cursor(dictionary=True)
    try:
        cursor.execute("SELECT Contractor_id, Contractor_Name FROM subcontractors")
        subcontractor = cursor.fetchall()
    finally:
        cursor.close()
        connection.close()

    return render_template('add_work_order.html', subcontractor=subcontractor)


# Route to handle form submission (from action="/submit_work_order")
@app.route('/submit_work_order', methods=['POST', 'GET'])
def submit_work_order():
    """Insert a work order from the submitted form and re-render the form page.

    A non-POST request carries no form data, so it is redirected to the
    form instead of failing on the first missing field.

    Returns:
        ``add_work_order.html`` rendered with all work orders (``wo``), the
        subcontractor list and the submitted ``work_order_type``.
    """
    if request.method != 'POST':
        return redirect(url_for('add_work_order'))

    form = request.form
    vendor_name = form['vendor_name']
    work_order_type = form['work_order_type']
    work_order_amount = form['work_order_amount']
    boq_amount = form['boq_amount']
    work_done_percentage = form['work_done_percentage']
    work_order_number = form['work_order_number']
    gst_amount = form['gst_amount']
    tds_amount = form['tds_amount']
    security_deposite = form['security_deposite']
    sd_against_gst = form['sd_against_gst']
    final_total = form['final_total']
    tds_of_gst = form['tds_of_gst']

    log_action("Submit Work Order", f"User {current_user.id} Submit Work Order'{ work_order_type}'")

    connection = config.get_db_connection()
    cursor = connection.cursor(dictionary=True)
    try:
        # NOTE(review): this INSERT writes `security_deposit` / `final_total`
        # while update_work_order writes `security_deposite` / `final_amount`;
        # one of the two disagrees with the table — confirm the schema.
        insert_query = """
            INSERT INTO work_order
                (vendor_name, work_order_type, work_order_amount, boq_amount, work_done_percentage,
                 work_order_number, gst_amount, tds_amount, security_deposit, sd_against_gst,
                 final_total, tds_of_gst)
            VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
        """
        cursor.execute(insert_query, (
            vendor_name, work_order_type, work_order_amount, boq_amount, work_done_percentage,
            work_order_number, gst_amount, tds_amount, security_deposite, sd_against_gst,
            final_total, tds_of_gst,
        ))
        connection.commit()

        # Fetch all data after insert
        cursor.execute("SELECT * FROM work_order")
        wo = cursor.fetchall()

        cursor.execute("SELECT Contractor_id, Contractor_Name FROM subcontractors")
        subcontractor = cursor.fetchall()
    finally:
        cursor.close()
        connection.close()

    return render_template('add_work_order.html', work_order_type=work_order_type, wo=wo,
                           subcontractor=subcontractor)


@app.route('/delete_work_order/<int:id>', methods=['POST'])
def delete_work_order(id):
    """Delete the work order with the given id and return ``{"success": true}``."""
    connection = config.get_db_connection()
    cursor = connection.cursor()
    try:
        cursor.execute("DELETE FROM work_order WHERE work_order_id = %s", (id,))
        connection.commit()
    finally:
        cursor.close()
        connection.close()

    log_action("delete Work Order", f"User {current_user.id} delete Work Order'{ id}'")
    return jsonify({'success': True})


@app.route('/update_work_order/<int:id>', methods=['GET', 'POST'])
def update_work_order(id):
    """Show a work order in the edit form, applying the submitted changes first on POST.

    Args:
        id: Work order id taken from the URL.

    Returns:
        ``update_work_order.html`` rendered with the current record.
    """
    connection = config.get_db_connection()
    cursor = connection.cursor(dictionary=True)
    try:
        if request.method == 'POST':
            form = request.form
            # NOTE(review): column names here (`security_deposite`,
            # `final_amount`) differ from the INSERT in submit_work_order
            # (`security_deposit`, `final_total`) — confirm the schema.
            update_query = """
                UPDATE work_order
                SET work_order_type = %s,
                    work_order_amount = %s,
                    boq_amount = %s,
                    work_done_percentage = %s,
                    work_order_number = %s,
                    gst_amount = %s,
                    tds_amount = %s,
                    security_deposite = %s,
                    sd_against_gst = %s,
                    final_amount = %s,
                    tds_of_gst = %s
                WHERE work_order_id = %s
            """
            cursor.execute(update_query, (
                form['work_order_type'], form['work_order_amount'], form['boq_amount'],
                form['work_done_percentage'], form['work_order_number'], form['gst_amount'],
                form['tds_amount'], form['security_deposite'], form['sd_against_gst'],
                form['final_amount'], form['tds_of_gst'], id,
            ))
            connection.commit()

        # The cursor must stay open until here: the record is re-read for
        # both GET and POST before the form is rendered.
        cursor.execute("SELECT * FROM work_order WHERE work_order_id = %s", (id,))
        work_order = cursor.fetchone()
    finally:
        cursor.close()
        connection.close()

    return render_template('update_work_order.html', work_order=work_order)


# Optional: Route to show a success message
@app.route('/success')
def success():
    """Return a plain-text confirmation message."""
    return "Work Order Submitted Successfully!"
import logging

logging.basicConfig(level=logging.DEBUG)


@app.route('/add_purchase_order', methods=['GET', 'POST'])
def add_purchase_order():
    """Render the purchase order page, inserting the submitted order first on POST."""
    connection = config.get_db_connection()
    cursor = connection.cursor(dictionary=True)
    try:
        if request.method == 'POST':
            form = request.form
            purchase_order_no = form.get('purchase_order_no')

            log_action("Add purchase order", f"User {current_user.id} Added puirchase Order'{ purchase_order_no}'")

            insert_query = """
                INSERT INTO purchase_order (
                    purchase_date, supplier_name, purchase_order_no, item_name, quantity, unit,
                    rate, amount, GST_Amount, TDS, final_amount
                ) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
            """
            cursor.execute(insert_query, (
                form.get('purchase_date'), form.get('supplier_name'), purchase_order_no,
                form.get('item_name'), form.get('quantity'), form.get('unit'), form.get('rate'),
                form.get('amount'), form.get('GST_Amount'), form.get('TDS'), form.get('final_amount'),
            ))
            connection.commit()

        # Always fetch updated data
        cursor.execute("SELECT * FROM purchase_order")
        purchases = cursor.fetchall()
    finally:
        cursor.close()
        connection.close()

    return render_template('add_purchase_order.html', purchases=purchases)


# Show all purchases
@app.route('/purchase_orders')
def show_purchase_orders():
    """Render the purchase order page with every stored purchase order."""
    connection = config.get_db_connection()
    cursor = connection.cursor(dictionary=True)
    try:
        cursor.execute("SELECT * FROM purchase_order")
        purchases = cursor.fetchall()
    finally:
        cursor.close()
        connection.close()

    return render_template('add_purchase_order.html', purchases=purchases)


# Delete purchase order
@app.route('/delete_purchase/<int:id>', methods=['POST'])
def delete_purchase(id):
    """Delete a purchase order and go back to the purchase order list.

    The list view is reached by redirect so the page is rendered with the
    current rows rather than with no ``purchases`` data at all.
    """
    connection = config.get_db_connection()
    cursor = connection.cursor()
    try:
        cursor.execute("DELETE FROM purchase_order WHERE purchase_id = %s", (id,))
        connection.commit()
    finally:
        cursor.close()
        connection.close()

    log_action("Delete purchase order", f"User {current_user.id} Deleted puirchase Order'{ id}'")
    return redirect(url_for('show_purchase_orders'))


# Edit purchase order (form + update logic)
@app.route('/update_purchase/<int:id>', methods=['GET', 'POST'])
def update_purchase(id):
    """Show the edit form for a purchase order (GET) or save the changes (POST).

    Returns:
        ``edit_purchase.html`` for GET; a redirect to the purchase order
        list after a successful POST.
    """
    connection = config.get_db_connection()
    cursor = connection.cursor(dictionary=True)
    try:
        if request.method == 'POST':
            data = request.form
            cursor.execute("""
                UPDATE purchase_order
                SET purchase_date = %s,
                    supplier_name = %s,
                    purchase_order_no = %s,
                    item_name = %s,
                    quantity = %s,
                    unit = %s,
                    rate = %s,
                    amount = %s,
                    GST_Amount = %s,
                    TDS = %s,
                    final_amount = %s
                WHERE purchase_id = %s
            """, (
                data['purchase_date'], data['supplier_name'], data['purchase_order_no'],
                data['item_name'], data['quantity'], data['unit'], data['rate'], data['amount'],
                data['GST_Amount'], data['TDS'], data['final_amount'], id,
            ))
            connection.commit()
            # This is an update; the activity log previously recorded it as a delete.
            log_action("Update purchase order", f"User {current_user.id} Updated purchase Order'{ id}'")
            return redirect(url_for('show_purchase_orders'))

        # Show edit form
        cursor.execute("SELECT * FROM purchase_order WHERE purchase_id = %s", (id,))
        purchase = cursor.fetchone()
    finally:
        cursor.close()
        connection.close()

    return render_template('edit_purchase.html', purchase=purchase)


# SHOW all GRNs + ADD form
@app.route('/grn', methods=['GET'])
def grn_page():
    """Render the GRN page with all goods receive notes and the purchase order dropdown."""
    connection = config.get_db_connection()
    cursor = connection.cursor(dictionary=True)
    try:
        # Purchase orders for the dropdown
        cursor.execute("SELECT purchase_id, supplier_name FROM purchase_order")
        purchase_orders = cursor.fetchall()

        # All GRNs to display
        cursor.execute("SELECT * FROM goods_receive_note")
        grns = cursor.fetchall()
    finally:
        cursor.close()
        connection.close()

    return render_template('grn_form.html', purchase_orders=purchase_orders, grns=grns)


# ADD new GRN
@app.route('/add_grn', methods=['POST', 'GET'])
def add_grn():
    """Insert a goods receive note from the submitted form and re-render the GRN page.

    A non-POST request has no form data and would insert an empty row, so
    it is redirected to the GRN page instead.
    """
    if request.method != 'POST':
        return redirect(url_for('grn_page'))

    data = request.form
    connection = config.get_db_connection()
    # Dictionary rows, like grn_page, because both feed the same template.
    cursor = connection.cursor(dictionary=True)
    try:
        query = """
            INSERT INTO goods_receive_note
                (grn_date, purchase_id, supplier_name, item_description, received_quantity,
                 unit, rate, amount, remarks)
            VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s)
        """
        cursor.execute(query, (
            data.get('grn_date'), data.get('purchase_id'), data.get('supplier_name'),
            data.get('item_description'), data.get('received_quantity'), data.get('unit'),
            data.get('rate'), data.get('amount'), data.get('remarks'),
        ))
        connection.commit()

        cursor.execute("SELECT * FROM goods_receive_note")
        grns = cursor.fetchall()

        cursor.execute("select * from purchase_order")
        purchase_orders = cursor.fetchall()
    finally:
        cursor.close()
        connection.close()

    return render_template('grn_form.html', purchase_orders=purchase_orders, grns=grns)


# UPDATE GRN
@app.route('/update_grn/<int:grn_id>', methods=['GET', 'POST'])
def update_grn(grn_id):
    """Show the edit form for a goods receive note (GET) or save the changes (POST).

    Returns:
        ``edit_grn.html`` for GET; a redirect to the GRN page after a
        successful POST.
    """
    connection = config.get_db_connection()
    cursor = connection.cursor(dictionary=True)
    try:
        if request.method == 'POST':
            data = request.form
            query = """
                UPDATE goods_receive_note
                SET grn_date=%s,
                    purchase_id=%s,
                    supplier_name=%s,
                    item_description=%s,
                    received_quantity=%s,
                    unit=%s,
                    rate=%s,
                    amount=%s,
                    remarks=%s
                WHERE grn_id=%s
            """
            cursor.execute(query, (
                data['grn_date'], data['purchase_id'], data['supplier_name'],
                data['item_description'], data['received_quantity'], data['unit'],
                data['rate'], data['amount'], data['remarks'], grn_id,
            ))
            connection.commit()
            # `grn_page` is the view registered for '/grn' in this file.
            return redirect(url_for('grn_page'))

        cursor.execute("SELECT * FROM goods_receive_note WHERE grn_id = %s", (grn_id,))
        grn = cursor.fetchone()
    finally:
        cursor.close()
        connection.close()

    return render_template("edit_grn.html", grn=grn)


# DELETE GRN
@app.route('/delete_grn/<int:grn_id>', methods=['POST'])
def delete_grn(grn_id):
    """Delete a goods receive note and go back to the GRN page.

    The page is reached by redirect so it is rendered with the current GRNs
    and purchase orders rather than with no data.
    """
    connection = config.get_db_connection()
    cursor = connection.cursor()
    try:
        cursor.execute("DELETE FROM goods_receive_note WHERE grn_id = %s", (grn_id,))
        connection.commit()
    finally:
        cursor.close()
        connection.close()

    return redirect(url_for('grn_page'))


@app.route('/work_order_report', methods=['GET'])
def work_order_report():
    """Render the work order report page."""
    return render_template('work_order_report.html')


from flask import request, jsonify, send_file
import pandas as pd
import os
import config  # make sure your DB connection is imported correctly


# Vendor Name Search (for Select2)
@app.route('/get_vendor_names')
def get_vendor_names():
    """Return, as JSON, the distinct vendor names containing the ``q`` query parameter."""
    query = request.args.get('q', '')
    connection = config.get_db_connection()
    cursor = connection.cursor()
    try:
        cursor.execute("SELECT DISTINCT vendor_name FROM work_order WHERE vendor_name LIKE %s",
                       (f"%{query}%",))
        vendors = [row[0] for row in cursor.fetchall()]
    finally:
        cursor.close()
        connection.close()
    return jsonify(vendors)


# Work Order Number Search (with or without vendor)
@app.route('/get_work_order_numbers')
def get_work_order_numbers():
    """Return, as JSON, distinct work order numbers containing ``q``.

    When ``vendor_name`` is given the search is limited to that vendor.
    """
    vendor = request.args.get('vendor_name', '')
    query = request.args.get('q', '')
    connection = config.get_db_connection()
    cursor = connection.cursor()
    try:
        if vendor:
            cursor.execute(
                "SELECT DISTINCT work_order_number FROM work_order WHERE vendor_name = %s AND work_order_number LIKE %s",
                (vendor, f"%{query}%"))
        else:
            cursor.execute(
                "SELECT DISTINCT work_order_number FROM work_order WHERE work_order_number LIKE %s",
                (f"%{query}%",))
        orders = [row[0] for row in cursor.fetchall()]
    finally:
        cursor.close()
        connection.close()
    return jsonify(orders)


# Get Work Order Data (Filtered)
@app.route('/get_work_order_data')
def get_work_order_data():
    """Return, as JSON, work orders filtered by the optional ``vendor_name`` and ``work_order_number``."""
    vendor = request.args.get('vendor_name')
    order_number = request.args.get('work_order_number')

    query = "SELECT * FROM work_order WHERE 1=1"
    params = []
    if vendor:
        query += " AND vendor_name = %s"
        params.append(vendor)
    if order_number:
        query += " AND work_order_number = %s"
        params.append(order_number)

    connection = config.get_db_connection()
    cursor = connection.cursor(dictionary=True)
    try:
        cursor.execute(query, tuple(params))
        data = cursor.fetchall()
    finally:
        cursor.close()
        connection.close()
    return jsonify(data)


@app.route('/download_work_order_report')
def download_work_order_report():
    """Export the filtered work orders to a styled Excel file and send it.

    Query parameters ``vendor_name`` and ``work_order_number`` are optional
    filters; the literal string ``"null"`` for the order number means
    "no filter".

    Returns:
        The workbook as an attachment, or a 404 response when the filters
        match no rows.
    """
    vendor_name = request.args.get('vendor_name')
    work_order_number = request.args.get('work_order_number')
    if work_order_number == "null":
        work_order_number = None

    query = "SELECT * FROM work_order WHERE 1=1"
    params = []
    if vendor_name:
        query += " AND vendor_name = %s"
        params.append(vendor_name)
    if work_order_number:
        query += " AND work_order_number = %s"
        params.append(work_order_number)

    conn = config.get_db_connection()
    cursor = conn.cursor(dictionary=True)
    try:
        cursor.execute(query, tuple(params))
        data = cursor.fetchall()
    finally:
        cursor.close()
        conn.close()

    if not data:
        return "No data found for the selected filters", 404

    df = pd.DataFrame(data)

    # Absolute path under the application root: pandas resolves relative
    # paths against the working directory, Flask's send_file against the
    # application root, so a relative path can point at two different files.
    output_path = os.path.join(current_app.root_path, 'static', 'downloads', 'work_order_report.xlsx')
    os.makedirs(os.path.dirname(output_path), exist_ok=True)
    df.to_excel(output_path, index=False, startrow=2)  # leave space for the heading

    # Re-open the workbook for styling
    wb = load_workbook(output_path)
    ws = wb.active

    # Merged title across all columns
    ws.merge_cells(start_row=1, start_column=1, end_row=1, end_column=len(df.columns))
    title_cell = ws.cell(row=1, column=1)
    title_cell.value = "Work Order Report"
    title_cell.font = Font(size=14, bold=True, color="1F4E78")
    title_cell.alignment = Alignment(horizontal="center", vertical="center")

    # The column header is on sheet row 3 (startrow=2 is zero-based)
    header_font = Font(bold=True)
    for col_num, column_name in enumerate(df.columns, 1):
        cell = ws.cell(row=3, column=col_num)
        cell.font = header_font
        cell.alignment = Alignment(horizontal="center", vertical="center")
        # Column width follows the header text, with a minimum of 12
        max_length = max(len(str(column_name)), 12)
        ws.column_dimensions[get_column_letter(col_num)].width = max_length + 2

    wb.save(output_path)
    return send_file(output_path, as_attachment=True)
@app.route('/purchase_order_report', methods=['GET'])
def purchase_order_report():
    """Render the purchase order report page."""
    return render_template('purchase_order_report.html')


@app.route('/get_supplier_names')
def get_supplier_names():
    """Return, as JSON, the distinct supplier names containing the ``q`` query parameter."""
    query = request.args.get('q', '')  # search term from Select2
    connection = config.get_db_connection()
    cursor = connection.cursor()
    try:
        # DISTINCT so a supplier with several orders is listed once, as the
        # vendor-name lookup for work orders already does.
        cursor.execute(
            "SELECT DISTINCT supplier_name FROM purchase_order WHERE supplier_name LIKE %s",
            (f"%{query}%",)
        )
        suppliers = [row[0] for row in cursor.fetchall()]
    finally:
        cursor.close()
        connection.close()

    return jsonify(suppliers)


@app.route('/get_purchase_order_numbers')
def get_purchase_order_numbers():
    """Return, as JSON, distinct purchase order numbers containing ``q``.

    When ``supplier_name`` is given the search is limited to that supplier.
    """
    supplier = request.args.get('supplier_name', '')
    query = request.args.get('q', '')

    connection = config.get_db_connection()
    cursor = connection.cursor()
    try:
        if supplier:
            cursor.execute("""
                SELECT DISTINCT purchase_order_no
                FROM purchase_order
                WHERE supplier_name = %s
                  AND purchase_order_no LIKE %s
            """, (supplier, f"%{query}%"))
        else:
            cursor.execute("""
                SELECT DISTINCT purchase_order_no
                FROM purchase_order
                WHERE purchase_order_no LIKE %s
            """, (f"%{query}%",))
        orders = [row[0] for row in cursor.fetchall()]
    finally:
        cursor.close()
        connection.close()

    return jsonify(orders)


@app.route('/get_purchase_order_data')
def get_purchase_order_data():
    """Return, as JSON, purchase orders filtered by the optional ``supplier_name`` and ``purchase_order_no``."""
    supplier = request.args.get('supplier_name')
    order_no = request.args.get('purchase_order_no')

    query = "SELECT * FROM purchase_order WHERE 1=1"
    params = []
    if supplier:
        query += " AND supplier_name = %s"
        params.append(supplier)
    if order_no:
        query += " AND purchase_order_no = %s"
        params.append(order_no)

    connection = config.get_db_connection()
    cursor = connection.cursor(dictionary=True)
    try:
        cursor.execute(query, tuple(params))
        data = cursor.fetchall()
    finally:
        cursor.close()
        connection.close()

    return jsonify(data)


@app.route('/download_purchase_order_report')
def download_purchase_order_report():
    """Export the filtered purchase orders to a styled Excel file and send it.

    Query parameters ``supplier_name`` and ``purchase_order_no`` are
    optional filters; the literal string ``"null"`` for the order number
    means "no filter".

    Returns:
        The workbook as an attachment, or a 404 response when the filters
        match no rows.
    """
    supplier_name = request.args.get('supplier_name')
    purchase_order_no = request.args.get('purchase_order_no')

    if purchase_order_no == "null":
        purchase_order_no = None

    log_action("Download purchase order", f"User {current_user.id} Download puirchase Order'{ purchase_order_no}'")

    query = "SELECT * FROM purchase_order WHERE 1=1"
    params = []
    if supplier_name:
        query += " AND supplier_name = %s"
        params.append(supplier_name)
    if purchase_order_no:
        query += " AND purchase_order_no = %s"
        params.append(purchase_order_no)

    conn = config.get_db_connection()
    cursor = conn.cursor(dictionary=True)
    try:
        cursor.execute(query, tuple(params))
        data = cursor.fetchall()
    finally:
        cursor.close()
        conn.close()

    if not data:
        return "No data found for the selected filters", 404

    df = pd.DataFrame(data)

    # Absolute path under the application root: pandas resolves relative
    # paths against the working directory, Flask's send_file against the
    # application root, so a relative path can point at two different files.
    output_path = os.path.join(current_app.root_path, 'static', 'downloads', 'purchase_order_report.xlsx')
    os.makedirs(os.path.dirname(output_path), exist_ok=True)
    df.to_excel(output_path, index=False, startrow=2)  # reserve space for the heading

    # Re-open the workbook for styling
    wb = load_workbook(output_path)
    ws = wb.active

    # Merged title across all columns
    ws.merge_cells(start_row=1, start_column=1, end_row=1, end_column=len(df.columns))
    title_cell = ws.cell(row=1, column=1)
    title_cell.value = "Purchase Order Report"
    title_cell.font = Font(size=14, bold=True, color="1F4E78")
    title_cell.alignment = Alignment(horizontal="center", vertical="center")

    # The column header is on sheet row 3 (startrow=2 is zero-based)
    header_font = Font(bold=True)
    for col_num, column_name in enumerate(df.columns, 1):
        cell = ws.cell(row=3, column=col_num)
        cell.font = header_font
        cell.alignment = Alignment(horizontal="center", vertical="center")
        # Column width follows the header text, with a minimum of 12
        max_length = max(len(str(column_name)), 12)
        ws.column_dimensions[get_column_letter(col_num)].width = max_length + 2

    wb.save(output_path)
    return send_file(output_path, as_attachment=True)


if __name__ == '__main__':
    # NOTE(review): debug=True enables the interactive Werkzeug debugger, and
    # host='0.0.0.0' listens on every interface — confirm this entry point is
    # never used for a deployed instance.
    app.run(host='0.0.0.0', port=5000, debug=True)