diff --git a/MAIN2.PY b/MAIN2.PY deleted file mode 100644 index 394fb24..0000000 --- a/MAIN2.PY +++ /dev/null @@ -1,26 +0,0 @@ -from flask import Blueprint, request, jsonify, send_file, current_app, render_template -from datetime import datetime -import ast -import os - -from db import DB # Your existing DB wrapper - - -# ============================================================= -# Base Service (common DB operations) -# ============================================================= -class BaseService: - def __init__(self): - self.db = DB() - - def fetch_all(self, proc, params=None): - return self.db.fetch_all_proc(proc, params) - - def fetch_one(self, proc, params=None): - return self.db.fetch_one_proc(proc, params) - - def execute_proc(self, proc, params=None): - return self.db.exec_proc(proc, params) - - def get_conn(self): - return self.db.get_connection() diff --git a/main1.py b/main1.py deleted file mode 100644 index d8a00ff..0000000 --- a/main1.py +++ /dev/null @@ -1,933 +0,0 @@ -# PART 1 of main.py -# Imports, DB helper, app init, auth/login, logging & small helpers - -from decimal import Decimal -from datetime import datetime -import os -import re -import logging -from logging.handlers import RotatingFileHandler - -from flask import ( - Flask, render_template, request, redirect, url_for, send_from_directory, - flash, jsonify, json, session, current_app -) -from flask_login import LoginManager, UserMixin, login_user, logout_user, login_required, current_user -import mysql.connector -from mysql.connector import Error -import config -import openpyxl -import ast -import pandas as pd -from openpyxl.styles import Font -from ldap3 import Server, Connection, ALL, SUBTREE -from ldap3.core.exceptions import LDAPBindError - -# --------------------------- -# App and Login manager init -# --------------------------- -app = Flask(__name__) -app.secret_key = os.environ.get('FLASK_SECRET_KEY', '9f2a1b8c4d6e7f0123456789abcdef01') - -login_manager = LoginManager() -login_manager.init_app(app) -login_manager.login_view = 'login' - -# --------------------------- -# DB helper (dictionary cursor) -# --------------------------- -class DB: - """ - Lightweight DB helper to call stored procedures with dictionary=True cursors. - Usage: - db = DB() - rows = db.fetch_all_proc('GetAllStates') - single = db.fetch_one_proc('GetStateByID', [id]) - db.exec_proc('SaveState', [state_name]) - db.close() - """ - def __init__(self): - self.conn = None - self.cursor = None - try: - self.conn = config.get_db_connection() - except Exception as e: - # Keep None and let callers handle connection absence - self.conn = None - app.logger.exception("DB connection failed: %s", e) - - def _ensure_cursor(self, dict_mode=True): - if not self.conn: - raise mysql.connector.Error("No DB connection") - self.cursor = self.conn.cursor(dictionary=dict_mode) - - def fetch_all_proc(self, proc_name, params=None): - try: - self._ensure_cursor(dict_mode=True) - self.cursor.callproc(proc_name, params or []) - results = [] - for res in self.cursor.stored_results(): - results = res.fetchall() - return results - finally: - self.close_cursor() - - def fetch_one_proc(self, proc_name, params=None): - rows = self.fetch_all_proc(proc_name, params) - return rows[0] if rows else None - - def exec_proc(self, proc_name, params=None): - try: - self._ensure_cursor(dict_mode=True) - self.cursor.callproc(proc_name, params or []) - # advance through any results to avoid unread result issues - for _ in self.cursor.stored_results(): - pass - if self.conn: - self.conn.commit() - finally: - self.close_cursor() - - def close_cursor(self): - try: - if self.cursor: - self.cursor.close() - except Exception: - pass - self.cursor = None - - def close(self): - self.close_cursor() - try: - if self.conn: - self.conn.close() - except Exception: - pass - self.conn = None - -# --------------------------- -# ResponseHandler (centralized messages) -# --------------------------- -class ResponseHandler: - @staticmethod - def invalid_name(entity): - return {'status': 'error', 'message': f'Invalid {entity} name. Only letters and spaces are allowed!'} - - @staticmethod - def already_exists(entity): - return {'status': 'exists', 'message': f'{entity.capitalize()} already exists!'} - - @staticmethod - def add_success(entity): - return {'status': 'success', 'message': f'{entity.capitalize()} added successfully!'} - - @staticmethod - def add_failure(entity): - return {'status': 'error', 'message': f'Failed to add {entity}.'} - - @staticmethod - def is_available(entity): - return {'status': 'available', 'message': f'{entity.capitalize()} name is available!'} - - @staticmethod - def delete_success(entity): - return {'status': 'success', 'message': f'{entity.capitalize()} deleted successfully!'} - - @staticmethod - def delete_failure(entity): - return {'status': 'error', 'message': f'Failed to delete {entity}.'} - - @staticmethod - def update_success(entity): - return {'status': 'success', 'message': f'{entity.capitalize()} updated successfully!'} - - @staticmethod - def update_failure(entity): - return {'status': 'error', 'message': f'Failed to update {entity}.'} - - @staticmethod - def fetch_failure(entity): - return {'status': 'error', 'message': f'Failed to fetch {entity}.'} - -# JSON response helper -def json_response(message_obj, status_code=200): - return jsonify(message_obj), status_code - -# --------------------------- -# Logging helper -# --------------------------- -if not app.debug: - log_file = os.path.join(app.root_path, 'app.log') - handler = RotatingFileHandler(log_file, maxBytes=5*1024*1024, backupCount=2) - handler.setLevel(logging.INFO) - formatter = logging.Formatter('%(asctime)s %(levelname)s: %(message)s [in %(pathname)s:%(lineno)d]') - handler.setFormatter(formatter) - app.logger.addHandler(handler) - -def log_action(action, details=""): - """Write a user action to activity.log with timestamp and user info.""" - try: - log_file = os.path.join(current_app.root_path, 'activity.log') - except RuntimeError: - # current_app not available (like unit tests). Use app.root_path fallback. - log_file = os.path.join(app.root_path, 'activity.log') - - timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S") - user = "Unknown" - try: - if current_user and hasattr(current_user, "id"): - user = getattr(current_user, "id", "Unknown") - elif session.get('username'): - user = session.get('username') - except Exception: - pass - - try: - with open(log_file, "a", encoding="utf-8") as f: - f.write(f"Timestamp: {timestamp} | User: {user} | Action: {action} | Details: {details}\n") - except Exception: - app.logger.exception("Failed to write activity log.") - -# --------------------------- -# Simple User class for flask-login -# --------------------------- -class User(UserMixin): - def __init__(self, id, cn=None, username=None, sAMAccountName=None): - self.id = id - self.cn = cn - self.username = username - self.sAMAccountName = sAMAccountName - -@login_manager.user_loader -def load_user(user_id): - # Minimal loader: return User object with id. - return User(user_id) - -# --------------------------- -# Constants & simple validators -# --------------------------- -STR_NAME_PATTERN = r"^[A-Za-z\s]+$" - -def is_valid_name(value): - return bool(re.match(STR_NAME_PATTERN, value.strip())) if value else False - -# --------------------------- -# Basic Routes: index, login, logout -# --------------------------- -@app.route('/') -@login_required -def index(): - return render_template('index.html') - -@app.route('/login', methods=['GET', 'POST']) -def login(): - """ - Login supports: - - static admin/admin123 fallback - - LDAP bind if not static - """ - if request.method == 'POST': - username = request.form.get('username', '').strip() - password = request.form.get('password', '') - - # static fallback - if username == 'admin' and password == 'admin123': - session['username'] = username - login_user(User(username)) - log_action('Login', f"Static admin logged in: {username}") - return redirect(url_for('index')) - - ldap_user_dn = f"uid={username},ou=users,dc=lcepl,dc=org" - try: - conn = Connection(Server('ldap://localhost:389', get_info=ALL), user=ldap_user_dn, password=password, auto_bind=True) - session['username'] = username - login_user(User(username)) - log_action('Login', f"LDAP login: {username}") - conn.unbind() - return redirect(url_for('index')) - except LDAPBindError: - flash('Invalid credentials.', 'danger') - except Exception as e: - app.logger.exception("LDAP error during login") - flash(f'LDAP error: {str(e)}', 'danger') - - return render_template('login.html') - -@app.route('/logout') -@login_required -def logout(): - try: - user_id = getattr(current_user, 'id', 'Unknown') - except Exception: - user_id = session.get('username', 'Unknown') - log_action('Logout', f"User {user_id} logged out") - logout_user() - flash('You have been logged out.', 'info') - return redirect(url_for('login')) - -# --------------------------- -# Activity log viewer -# --------------------------- -@app.route('/activity_log', methods=['GET', 'POST']) -@login_required -def activity_log(): - logs = [] - log_file = os.path.join(current_app.root_path, 'activity.log') - if os.path.exists(log_file): - with open(log_file, 'r', encoding='utf-8') as f: - for line in f: - parts = line.strip().split(" | ") - if len(parts) == 4: - logs.append({ - "timestamp": parts[0].replace("Timestamp:", "").strip(), - "user": parts[1].replace("User:", "").strip(), - "action": parts[2].replace("Action:", "").strip(), - "details": parts[3].replace("Details:", "").strip() - }) - - # Filters - start_date = request.values.get("start_date") - end_date = request.values.get("end_date") - username = request.values.get("username") - - filtered_logs = logs - # date filter - if start_date or end_date: - try: - start_dt = datetime.strptime(start_date, "%Y-%m-%d") if start_date else datetime.min - end_dt = datetime.strptime(end_date, "%Y-%m-%d") if end_date else datetime.max - end_dt = end_dt.replace(hour=23, minute=59, second=59) - filtered_logs = [ - log for log in filtered_logs - if start_dt <= datetime.strptime(log["timestamp"], "%Y-%m-%d %H:%M:%S") <= end_dt - ] - except Exception: - app.logger.exception("activity_log date filter parse error") - - if username: - filtered_logs = [log for log in filtered_logs if username.lower() in log["user"].lower()] - - return render_template('activity_log.html', logs=filtered_logs, start_date=start_date, end_date=end_date, username=username) - -# PART 2 of main.py -# -------------------------------------------- -# STATE, DISTRICT, BLOCK, VILLAGE MANAGEMENT -# -------------------------------------------- - -# ---------------- STATE ---------------- -@app.route('/add_state', methods=['GET', 'POST']) -@login_required -def add_state(): - db = DB() - - if request.method == 'POST': - state_name = request.form['state_Name'].strip() - - if not is_valid_name(state_name): - db.close() - return json_response(ResponseHandler.invalid_name("state"), 400) - - existing = db.fetch_one_proc('CheckStateExists', [state_name]) - if existing: - db.close() - return json_response(ResponseHandler.already_exists("state"), 409) - - db.exec_proc('SaveState', [state_name]) - log_action('Add State', f"State added: {state_name}") - db.close() - return json_response(ResponseHandler.add_success("state"), 200) - - statedata = db.fetch_all_proc('GetAllStates') - db.close() - return render_template('add_state.html', statedata=statedata) - - -@app.route('/delete_state/', methods=['POST']) -@login_required -def delete_state(id): - db = DB() - try: - db.exec_proc('DeleteState', [id]) - log_action('Delete State', f"Deleted state id={id}") - return json_response(ResponseHandler.delete_success("state")) - except Exception as e: - app.logger.exception("DeleteState failed: %s", e) - return json_response(ResponseHandler.delete_failure("state"), 500) - finally: - db.close() - - -@app.route('/update_state/', methods=['POST']) -@login_required -def update_state(id): - db = DB() - state_name = request.form.get('state_Name', '').strip() - - if not is_valid_name(state_name): - db.close() - return json_response(ResponseHandler.invalid_name("state"), 400) - - try: - db.exec_proc('UpdateStateById', [id, state_name]) - log_action('Update State', f"Updated state id={id}, name={state_name}") - return json_response(ResponseHandler.update_success("state")) - except Exception as e: - app.logger.exception("UpdateState failed: %s", e) - return json_response(ResponseHandler.update_failure("state"), 500) - finally: - db.close() - - -# ---------------- DISTRICT ---------------- -@app.route('/add_district', methods=['GET', 'POST']) -@login_required -def add_district(): - db = DB() - - if request.method == 'POST': - state_id = request.form.get('state_id') - district_name = request.form.get('district_Name', '').strip() - - if not is_valid_name(district_name): - db.close() - return json_response(ResponseHandler.invalid_name("district"), 400) - - existing = db.fetch_one_proc('CheckDistrictExists', [state_id, district_name]) - if existing: - db.close() - return json_response(ResponseHandler.already_exists("district"), 409) - - db.exec_proc('SaveDistrict', [state_id, district_name]) - log_action('Add District', f"District added: {district_name} (state_id={state_id})") - db.close() - return json_response(ResponseHandler.add_success("district"), 200) - - statedata = db.fetch_all_proc('GetAllStates') - districtdata = db.fetch_all_proc('GetAllDistricts') - db.close() - return render_template('add_district.html', statedata=statedata, districtdata=districtdata) - - -@app.route('/delete_district/', methods=['POST']) -@login_required -def delete_district(id): - db = DB() - try: - db.exec_proc('DeleteDistrict', [id]) - log_action('Delete District', f"Deleted district id={id}") - return json_response(ResponseHandler.delete_success("district")) - except Exception as e: - app.logger.exception("DeleteDistrict failed: %s", e) - return json_response(ResponseHandler.delete_failure("district"), 500) - finally: - db.close() - - -@app.route('/update_district/', methods=['POST']) -@login_required -def update_district(id): - db = DB() - state_id = request.form.get('state_id') - district_name = request.form.get('district_Name', '').strip() - - if not is_valid_name(district_name): - db.close() - return json_response(ResponseHandler.invalid_name("district"), 400) - - try: - db.exec_proc('UpdateDistrictById', [id, state_id, district_name]) - log_action('Update District', f"Updated district id={id}, name={district_name}") - return json_response(ResponseHandler.update_success("district")) - except Exception as e: - app.logger.exception("UpdateDistrict failed: %s", e) - return json_response(ResponseHandler.update_failure("district"), 500) - finally: - db.close() - - -# ---------------- BLOCK ---------------- -@app.route('/add_block', methods=['GET', 'POST']) -@login_required -def add_block(): - db = DB() - - if request.method == 'POST': - district_id = request.form.get('district_id') - block_name = request.form.get('block_Name', '').strip() - - if not is_valid_name(block_name): - db.close() - return json_response(ResponseHandler.invalid_name("block"), 400) - - existing = db.fetch_one_proc('CheckBlockExists', [district_id, block_name]) - if existing: - db.close() - return json_response(ResponseHandler.already_exists("block"), 409) - - db.exec_proc('SaveBlock', [district_id, block_name]) - log_action('Add Block', f"Block added: {block_name} (district_id={district_id})") - db.close() - return json_response(ResponseHandler.add_success("block"), 200) - - districtdata = db.fetch_all_proc('GetAllDistricts') - blockdata = db.fetch_all_proc('GetAllBlocks') - db.close() - return render_template('add_block.html', districtdata=districtdata, blockdata=blockdata) - - -@app.route('/delete_block/', methods=['POST']) -@login_required -def delete_block(id): - db = DB() - try: - db.exec_proc('DeleteBlock', [id]) - log_action('Delete Block', f"Deleted block id={id}") - return json_response(ResponseHandler.delete_success("block")) - except Exception as e: - app.logger.exception("DeleteBlock failed: %s", e) - return json_response(ResponseHandler.delete_failure("block"), 500) - finally: - db.close() - - -@app.route('/update_block/', methods=['POST']) -@login_required -def update_block(id): - db = DB() - district_id = request.form.get('district_id') - block_name = request.form.get('block_Name', '').strip() - - if not is_valid_name(block_name): - db.close() - return json_response(ResponseHandler.invalid_name("block"), 400) - - try: - db.exec_proc('UpdateBlockById', [id, district_id, block_name]) - log_action('Update Block', f"Updated block id={id}, name={block_name}") - return json_response(ResponseHandler.update_success("block")) - except Exception as e: - app.logger.exception("UpdateBlock failed: %s", e) - return json_response(ResponseHandler.update_failure("block"), 500) - finally: - db.close() - - -# ---------------- VILLAGE ---------------- -@app.route('/add_village', methods=['GET', 'POST']) -@login_required -def add_village(): - db = DB() - - if request.method == 'POST': - block_id = request.form.get('block_id') - village_name = request.form.get('village_Name', '').strip() - - if not is_valid_name(village_name): - db.close() - return json_response(ResponseHandler.invalid_name("village"), 400) - - existing = db.fetch_one_proc('CheckVillageExists', [block_id, village_name]) - if existing: - db.close() - return json_response(ResponseHandler.already_exists("village"), 409) - - db.exec_proc('SaveVillage', [block_id, village_name]) - log_action('Add Village', f"Village added: {village_name} (block_id={block_id})") - db.close() - return json_response(ResponseHandler.add_success("village"), 200) - - blockdata = db.fetch_all_proc('GetAllBlocks') - villagedata = db.fetch_all_proc('GetAllVillages') - db.close() - return render_template('add_village.html', blockdata=blockdata, villagedata=villagedata) - - -@app.route('/delete_village/', methods=['POST']) -@login_required -def delete_village(id): - db = DB() - try: - db.exec_proc('DeleteVillage', [id]) - log_action('Delete Village', f"Deleted village id={id}") - return json_response(ResponseHandler.delete_success("village")) - except Exception as e: - app.logger.exception("DeleteVillage failed: %s", e) - return json_response(ResponseHandler.delete_failure("village"), 500) - finally: - db.close() - - -@app.route('/update_village/', methods=['POST']) -@login_required -def update_village(id): - db = DB() - block_id = request.form.get('block_id') - village_name = request.form.get('village_Name', '').strip() - - if not is_valid_name(village_name): - db.close() - return json_response(ResponseHandler.invalid_name("village"), 400) - - try: - db.exec_proc('UpdateVillageById', [id, block_id, village_name]) - log_action('Update Village', f"Updated village id={id}, name={village_name}") - return json_response(ResponseHandler.update_success("village")) - except Exception as e: - app.logger.exception("UpdateVillage failed: %s", e) - return json_response(ResponseHandler.update_failure("village"), 500) - finally: - db.close() - -# ----------------------------------- -# INVOICE ENTRY -# ----------------------------------- - -@app.route('/invoice_entry', methods=['GET', 'POST']) -@login_required -def invoice_entry(): - db = DB() - - if request.method == 'POST': - inv_no = request.form['Invoice_No'].strip() - inv_date = request.form['Invoice_Date'] - state_id = request.form['state'] - district_id = request.form['district'] - block_id = request.form['block'] - village_id = request.form['village'] - subcontractor_id = request.form['subcontractor'] - work_type_id = request.form['work_type'] - final_amount = request.form['Final_Amount'] - - # Check duplicate invoice number - exists = db.fetch_one_proc("CheckInvoiceExists", [inv_no]) - if exists: - db.close() - return json_response(ResponseHandler.already_exists("invoice"), 409) - - db.exec_proc("InsertInvoice", [ - inv_no, inv_date, - state_id, district_id, block_id, village_id, - subcontractor_id, work_type_id, final_amount - ]) - db.close() - - log_action("Add Invoice", f"Invoice: {inv_no}") - return json_response(ResponseHandler.add_success("invoice"), 200) - - # GET REQUEST → Load dropdown data - statedata = db.fetch_all_proc("GetAllStates") - districtdata = db.fetch_all_proc("GetDistrictDetails") - blockdata = db.fetch_all_proc("GetBlockDetails") - villagedata = db.fetch_all_proc("GetVillageDetails") - subcontractordata = db.fetch_all_proc("GetSubcontractorDetails") - worktypedata = db.fetch_all_proc("GetWorkTypeDetails") - invoicedata = db.fetch_all_proc("GetInvoiceDetails") - db.close() - - return render_template('invoice_entry.html', - statedata=statedata, districtdata=districtdata, - blockdata=blockdata, villagedata=villagedata, - subcontractordata=subcontractordata, - worktypedata=worktypedata, - invoicedata=invoicedata) - - -@app.route('/delete_invoice/', methods=['POST']) -@login_required -def delete_invoice(id): - db = DB() - db.exec_proc("DeleteInvoice", [id]) - db.close() - - log_action("Delete Invoice", f"Invoice ID: {id}") - return json_response(ResponseHandler.delete_success("invoice"), 200) - - -@app.route('/update_invoice', methods=['POST']) -@login_required -def update_invoice(): - invoice_id = request.form['invoice_id'] - inv_no = request.form['Invoice_No'].strip() - inv_date = request.form['Invoice_Date'] - state_id = request.form['state'] - district_id = request.form['district'] - block_id = request.form['block'] - village_id = request.form['village'] - subcontractor_id = request.form['subcontractor'] - work_type_id = request.form['work_type'] - final_amount = request.form['Final_Amount'] - - db = DB() - db.exec_proc("UpdateInvoice", [ - invoice_id, inv_no, inv_date, - state_id, district_id, block_id, village_id, - subcontractor_id, work_type_id, final_amount - ]) - db.close() - - log_action("Update Invoice", f"Invoice ID {invoice_id}") - return json_response(ResponseHandler.update_success("invoice"), 200) - - -# ----------------------------------- -# HOLD ENTRY -# ----------------------------------- - -@app.route('/hold_entry', methods=['GET', 'POST']) -@login_required -def hold_entry(): - db = DB() - - if request.method == 'POST': - invoice_id = request.form['invoice'] - hold_type_id = request.form['hold_type'] - remarks = request.form['Remarks'].strip() - - db.exec_proc("InsertHold", [invoice_id, hold_type_id, remarks]) - db.close() - log_action("Add Hold", f"Invoice ID: {invoice_id}") - return json_response(ResponseHandler.add_success("hold"), 200) - - invoicedata = db.fetch_all_proc("GetInvoiceDetails") - holdtypedata = db.fetch_all_proc("GetHoldTypeDetails") - holddata = db.fetch_all_proc("GetHoldDetails") - db.close() - - return render_template('hold_entry.html', - invoicedata=invoicedata, - holdtypedata=holdtypedata, - holddata=holddata) - - -@app.route('/delete_hold/', methods=['POST']) -@login_required -def delete_hold(id): - db = DB() - db.exec_proc("DeleteHold", [id]) - db.close() - - log_action("Delete Hold", f"Hold ID: {id}") - return json_response(ResponseHandler.delete_success("hold"), 200) - - -# ----------------------------------- -# PAYMENT ENTRY -# ----------------------------------- - -@app.route('/payment_entry', methods=['GET', 'POST']) -@login_required -def payment_entry(): - db = DB() - - if request.method == 'POST': - invoice_id = request.form['invoice'] - payment_date = request.form['Payment_Date'] - payment_amt = request.form['Payment_Amount'] - tds_amt = request.form['TDS_Payment_Amount'] - total = request.form['Total_amount'] - utr = request.form['utr'].strip() - - db.exec_proc("InsertPayment", [ - invoice_id, payment_date, payment_amt, tds_amt, total, utr - ]) - db.close() - - log_action("Add Payment", f"Invoice ID: {invoice_id}") - return json_response(ResponseHandler.add_success("payment"), 200) - - invoicedata = db.fetch_all_proc("GetInvoiceDetails") - paymentdata = db.fetch_all_proc("GetPaymentDetails") - db.close() - - return render_template('payment_entry.html', - invoicedata=invoicedata, - paymentdata=paymentdata) - - -@app.route('/delete_payment/', methods=['POST']) -@login_required -def delete_payment(id): - db = DB() - db.exec_proc("DeletePayment", [id]) - db.close() - - log_action("Delete Payment", f"Payment ID: {id}") - return json_response(ResponseHandler.delete_success("payment"), 200) - -# ----------------------------------- -# SUBCONTRACTOR MANAGEMENT -# ----------------------------------- - -@app.route('/add_subcontractor', methods=['GET', 'POST']) -@login_required -def add_subcontractor(): - db = DB() - - if request.method == 'POST': - subcontractor_name = request.form['subcontractor_Name'].strip() - mobile_no = request.form['Mobile_No'].strip() - - if not subcontractor_name: - return json_response(ResponseHandler.invalid_name("subcontractor"), 400) - - exists = db.fetch_one_proc("CheckSubcontractorExists", [subcontractor_name]) - if exists: - db.close() - return json_response(ResponseHandler.already_exists("subcontractor"), 409) - - db.exec_proc("SaveSubcontractor", [subcontractor_name, mobile_no]) - db.close() - log_action("Add Subcontractor", f"Name: {subcontractor_name}") - return json_response(ResponseHandler.add_success("subcontractor"), 200) - - subcontractordata = db.fetch_all_proc("GetSubcontractorDetails") - db.close() - return render_template('add_subcontractor.html', subcontractordata=subcontractordata) - - -@app.route('/delete_subcontractor/', methods=['POST']) -@login_required -def delete_subcontractor(id): - db = DB() - db.exec_proc("DeleteSubcontractor", [id]) - db.close() - log_action("Delete Subcontractor", f"ID: {id}") - return json_response(ResponseHandler.delete_success("subcontractor"), 200) - - -@app.route('/update_subcontractor', methods=['POST']) -@login_required -def update_subcontractor(): - subcontractor_id = request.form['subcontractor_id'] - subcontractor_name = request.form['subcontractor_Name'].strip() - mobile_no = request.form['Mobile_No'].strip() - - db = DB() - db.exec_proc("UpdateSubcontractor", [subcontractor_id, subcontractor_name, mobile_no]) - db.close() - log_action("Update Subcontractor", f"ID {subcontractor_id}") - return json_response(ResponseHandler.update_success("subcontractor"), 200) - - -# ----------------------------------- -# WORK TYPE MANAGEMENT -# ----------------------------------- - -@app.route('/add_work_type', methods=['GET', 'POST']) -@login_required -def add_work_type(): - db = DB() - - if request.method == 'POST': - work_type_name = request.form['worktype_Name'].strip() - - if not is_valid_name(work_type_name): - return json_response(ResponseHandler.invalid_name("work type"), 400) - - exists = db.fetch_one_proc("CheckWorkTypeExists", [work_type_name]) - if exists: - db.close() - return json_response(ResponseHandler.already_exists("work type"), 409) - - db.exec_proc("SaveWorkType", [work_type_name]) - db.close() - - log_action("Add Work Type", f"Name: {work_type_name}") - return json_response(ResponseHandler.add_success("work type"), 200) - - worktypedata = db.fetch_all_proc("GetWorkTypeDetails") - db.close() - return render_template('add_work_type.html', worktypedata=worktypedata) - - -@app.route('/delete_work_type/', methods=['POST']) -@login_required -def delete_work_type(id): - db = DB() - db.exec_proc("DeleteWorkType", [id]) - db.close() - log_action("Delete Work Type", f"ID: {id}") - return json_response(ResponseHandler.delete_success("work type"), 200) - - -@app.route('/update_work_type', methods=['POST']) -@login_required -def update_work_type(): - work_type_id = request.form['worktype_id'] - work_type_name = request.form['worktype_Name'].strip() - - if not is_valid_name(work_type_name): - return json_response(ResponseHandler.invalid_name("work type"), 400) - - db = DB() - db.exec_proc("UpdateWorkTypeById", [work_type_id, work_type_name]) - db.close() - log_action("Update Work Type", f"ID {work_type_id}") - return json_response(ResponseHandler.update_success("work type"), 200) - - -# ----------------------------------- -# EXCEL BULK UPLOAD (INVOICE) -# ----------------------------------- - -@app.route('/upload_excel', methods=['GET', 'POST']) -@login_required -def upload_excel(): - if request.method == 'POST': - file = request.files.get("file") - if not file: - flash("No file selected.", "danger") - return redirect(url_for("upload_excel")) - - filepath = os.path.join(app.root_path, "uploads", file.filename) - os.makedirs(os.path.dirname(filepath), exist_ok=True) - file.save(filepath) - - workbook = openpyxl.load_workbook(filepath) - sheet = workbook.active - - db = DB() - for row in sheet.iter_rows(min_row=2, values_only=True): - inv_no, inv_date, state, district, block, village, subcontractor, work_type, amount = row - db.exec_proc("InsertInvoiceBulk", [ - inv_no, inv_date, state, district, block, village, subcontractor, work_type, amount - ]) - db.close() - - log_action("Bulk Invoice Upload", f"File: {file.filename}") - flash("Excel uploaded successfully!", "success") - return redirect(url_for("invoice_entry")) - - return render_template('uploadExcelFile.html') - - -# ----------------------------------- -# REPORTS -# ----------------------------------- - -@app.route('/report', methods=['GET', 'POST']) -@login_required -def report(): - db = DB() - statedata = db.fetch_all_proc("GetAllStates") - subcontractordata = db.fetch_all_proc( "GetSubcontractorDetails", - ["1=1", "{}"] ) - db.close() - return render_template('report.html', statedata=statedata, subcontractordata=subcontractordata) - - -@app.route('/view_report', methods=['POST']) -@login_required -def view_report(): - state_id = request.form.get("state", "") - subcontractor_id = request.form.get("subcontractor", "") - - db = DB() - reportdata = db.fetch_all_proc("GetReportData", [state_id, subcontractor_id]) - db.close() - - log_action("Generate Report", f"State {state_id}, Subcontractor {subcontractor_id}") - return render_template("report_table.html", reportdata=reportdata) - -if __name__ == "__main__": - app.run(host='0.0.0.0', port=5000, debug=True)