added full project files

This commit is contained in:
2025-11-25 13:33:49 +05:30
parent 211a0a970e
commit 9b71ab6716
538 changed files with 17470 additions and 0 deletions

933
main1.py Normal file
View File

@@ -0,0 +1,933 @@
# PART 1 of main.py
# Imports, DB helper, app init, auth/login, logging & small helpers
from decimal import Decimal
from datetime import datetime
import os
import re
import logging
from logging.handlers import RotatingFileHandler
from flask import (
Flask, render_template, request, redirect, url_for, send_from_directory,
flash, jsonify, json, session, current_app
)
from flask_login import LoginManager, UserMixin, login_user, logout_user, login_required, current_user
import mysql.connector
from mysql.connector import Error
import config
import openpyxl
import ast
import pandas as pd
from openpyxl.styles import Font
from ldap3 import Server, Connection, ALL, SUBTREE
from ldap3.core.exceptions import LDAPBindError
# ---------------------------
# App and Login manager init
# ---------------------------
app = Flask(__name__)
app.secret_key = os.environ.get('FLASK_SECRET_KEY', '9f2a1b8c4d6e7f0123456789abcdef01')
login_manager = LoginManager()
login_manager.init_app(app)
login_manager.login_view = 'login'
# ---------------------------
# DB helper (dictionary cursor)
# ---------------------------
class DB:
"""
Lightweight DB helper to call stored procedures with dictionary=True cursors.
Usage:
db = DB()
rows = db.fetch_all_proc('GetAllStates')
single = db.fetch_one_proc('GetStateByID', [id])
db.exec_proc('SaveState', [state_name])
db.close()
"""
def __init__(self):
self.conn = None
self.cursor = None
try:
self.conn = config.get_db_connection()
except Exception as e:
# Keep None and let callers handle connection absence
self.conn = None
app.logger.exception("DB connection failed: %s", e)
def _ensure_cursor(self, dict_mode=True):
if not self.conn:
raise mysql.connector.Error("No DB connection")
self.cursor = self.conn.cursor(dictionary=dict_mode)
def fetch_all_proc(self, proc_name, params=None):
try:
self._ensure_cursor(dict_mode=True)
self.cursor.callproc(proc_name, params or [])
results = []
for res in self.cursor.stored_results():
results = res.fetchall()
return results
finally:
self.close_cursor()
def fetch_one_proc(self, proc_name, params=None):
rows = self.fetch_all_proc(proc_name, params)
return rows[0] if rows else None
def exec_proc(self, proc_name, params=None):
try:
self._ensure_cursor(dict_mode=True)
self.cursor.callproc(proc_name, params or [])
# advance through any results to avoid unread result issues
for _ in self.cursor.stored_results():
pass
if self.conn:
self.conn.commit()
finally:
self.close_cursor()
def close_cursor(self):
try:
if self.cursor:
self.cursor.close()
except Exception:
pass
self.cursor = None
def close(self):
self.close_cursor()
try:
if self.conn:
self.conn.close()
except Exception:
pass
self.conn = None
# ---------------------------
# ResponseHandler (centralized messages)
# ---------------------------
class ResponseHandler:
@staticmethod
def invalid_name(entity):
return {'status': 'error', 'message': f'Invalid {entity} name. Only letters and spaces are allowed!'}
@staticmethod
def already_exists(entity):
return {'status': 'exists', 'message': f'{entity.capitalize()} already exists!'}
@staticmethod
def add_success(entity):
return {'status': 'success', 'message': f'{entity.capitalize()} added successfully!'}
@staticmethod
def add_failure(entity):
return {'status': 'error', 'message': f'Failed to add {entity}.'}
@staticmethod
def is_available(entity):
return {'status': 'available', 'message': f'{entity.capitalize()} name is available!'}
@staticmethod
def delete_success(entity):
return {'status': 'success', 'message': f'{entity.capitalize()} deleted successfully!'}
@staticmethod
def delete_failure(entity):
return {'status': 'error', 'message': f'Failed to delete {entity}.'}
@staticmethod
def update_success(entity):
return {'status': 'success', 'message': f'{entity.capitalize()} updated successfully!'}
@staticmethod
def update_failure(entity):
return {'status': 'error', 'message': f'Failed to update {entity}.'}
@staticmethod
def fetch_failure(entity):
return {'status': 'error', 'message': f'Failed to fetch {entity}.'}
# JSON response helper
def json_response(message_obj, status_code=200):
return jsonify(message_obj), status_code
# ---------------------------
# Logging helper
# ---------------------------
if not app.debug:
log_file = os.path.join(app.root_path, 'app.log')
handler = RotatingFileHandler(log_file, maxBytes=5*1024*1024, backupCount=2)
handler.setLevel(logging.INFO)
formatter = logging.Formatter('%(asctime)s %(levelname)s: %(message)s [in %(pathname)s:%(lineno)d]')
handler.setFormatter(formatter)
app.logger.addHandler(handler)
def log_action(action, details=""):
"""Write a user action to activity.log with timestamp and user info."""
try:
log_file = os.path.join(current_app.root_path, 'activity.log')
except RuntimeError:
# current_app not available (like unit tests). Use app.root_path fallback.
log_file = os.path.join(app.root_path, 'activity.log')
timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
user = "Unknown"
try:
if current_user and hasattr(current_user, "id"):
user = getattr(current_user, "id", "Unknown")
elif session.get('username'):
user = session.get('username')
except Exception:
pass
try:
with open(log_file, "a", encoding="utf-8") as f:
f.write(f"Timestamp: {timestamp} | User: {user} | Action: {action} | Details: {details}\n")
except Exception:
app.logger.exception("Failed to write activity log.")
# ---------------------------
# Simple User class for flask-login
# ---------------------------
class User(UserMixin):
def __init__(self, id, cn=None, username=None, sAMAccountName=None):
self.id = id
self.cn = cn
self.username = username
self.sAMAccountName = sAMAccountName
@login_manager.user_loader
def load_user(user_id):
# Minimal loader: return User object with id.
return User(user_id)
# ---------------------------
# Constants & simple validators
# ---------------------------
STR_NAME_PATTERN = r"^[A-Za-z\s]+$"
def is_valid_name(value):
return bool(re.match(STR_NAME_PATTERN, value.strip())) if value else False
# ---------------------------
# Basic Routes: index, login, logout
# ---------------------------
@app.route('/')
@login_required
def index():
return render_template('index.html')
@app.route('/login', methods=['GET', 'POST'])
def login():
"""
Login supports:
- static admin/admin123 fallback
- LDAP bind if not static
"""
if request.method == 'POST':
username = request.form.get('username', '').strip()
password = request.form.get('password', '')
# static fallback
if username == 'admin' and password == 'admin123':
session['username'] = username
login_user(User(username))
log_action('Login', f"Static admin logged in: {username}")
return redirect(url_for('index'))
ldap_user_dn = f"uid={username},ou=users,dc=lcepl,dc=org"
try:
conn = Connection(Server('ldap://localhost:389', get_info=ALL), user=ldap_user_dn, password=password, auto_bind=True)
session['username'] = username
login_user(User(username))
log_action('Login', f"LDAP login: {username}")
conn.unbind()
return redirect(url_for('index'))
except LDAPBindError:
flash('Invalid credentials.', 'danger')
except Exception as e:
app.logger.exception("LDAP error during login")
flash(f'LDAP error: {str(e)}', 'danger')
return render_template('login.html')
@app.route('/logout')
@login_required
def logout():
try:
user_id = getattr(current_user, 'id', 'Unknown')
except Exception:
user_id = session.get('username', 'Unknown')
log_action('Logout', f"User {user_id} logged out")
logout_user()
flash('You have been logged out.', 'info')
return redirect(url_for('login'))
# ---------------------------
# Activity log viewer
# ---------------------------
@app.route('/activity_log', methods=['GET', 'POST'])
@login_required
def activity_log():
logs = []
log_file = os.path.join(current_app.root_path, 'activity.log')
if os.path.exists(log_file):
with open(log_file, 'r', encoding='utf-8') as f:
for line in f:
parts = line.strip().split(" | ")
if len(parts) == 4:
logs.append({
"timestamp": parts[0].replace("Timestamp:", "").strip(),
"user": parts[1].replace("User:", "").strip(),
"action": parts[2].replace("Action:", "").strip(),
"details": parts[3].replace("Details:", "").strip()
})
# Filters
start_date = request.values.get("start_date")
end_date = request.values.get("end_date")
username = request.values.get("username")
filtered_logs = logs
# date filter
if start_date or end_date:
try:
start_dt = datetime.strptime(start_date, "%Y-%m-%d") if start_date else datetime.min
end_dt = datetime.strptime(end_date, "%Y-%m-%d") if end_date else datetime.max
end_dt = end_dt.replace(hour=23, minute=59, second=59)
filtered_logs = [
log for log in filtered_logs
if start_dt <= datetime.strptime(log["timestamp"], "%Y-%m-%d %H:%M:%S") <= end_dt
]
except Exception:
app.logger.exception("activity_log date filter parse error")
if username:
filtered_logs = [log for log in filtered_logs if username.lower() in log["user"].lower()]
return render_template('activity_log.html', logs=filtered_logs, start_date=start_date, end_date=end_date, username=username)
# PART 2 of main.py
# --------------------------------------------
# STATE, DISTRICT, BLOCK, VILLAGE MANAGEMENT
# --------------------------------------------
# ---------------- STATE ----------------
@app.route('/add_state', methods=['GET', 'POST'])
@login_required
def add_state():
db = DB()
if request.method == 'POST':
state_name = request.form['state_Name'].strip()
if not is_valid_name(state_name):
db.close()
return json_response(ResponseHandler.invalid_name("state"), 400)
existing = db.fetch_one_proc('CheckStateExists', [state_name])
if existing:
db.close()
return json_response(ResponseHandler.already_exists("state"), 409)
db.exec_proc('SaveState', [state_name])
log_action('Add State', f"State added: {state_name}")
db.close()
return json_response(ResponseHandler.add_success("state"), 200)
statedata = db.fetch_all_proc('GetAllStates')
db.close()
return render_template('add_state.html', statedata=statedata)
@app.route('/delete_state/<int:id>', methods=['POST'])
@login_required
def delete_state(id):
db = DB()
try:
db.exec_proc('DeleteState', [id])
log_action('Delete State', f"Deleted state id={id}")
return json_response(ResponseHandler.delete_success("state"))
except Exception as e:
app.logger.exception("DeleteState failed: %s", e)
return json_response(ResponseHandler.delete_failure("state"), 500)
finally:
db.close()
@app.route('/update_state/<int:id>', methods=['POST'])
@login_required
def update_state(id):
db = DB()
state_name = request.form.get('state_Name', '').strip()
if not is_valid_name(state_name):
db.close()
return json_response(ResponseHandler.invalid_name("state"), 400)
try:
db.exec_proc('UpdateStateById', [id, state_name])
log_action('Update State', f"Updated state id={id}, name={state_name}")
return json_response(ResponseHandler.update_success("state"))
except Exception as e:
app.logger.exception("UpdateState failed: %s", e)
return json_response(ResponseHandler.update_failure("state"), 500)
finally:
db.close()
# ---------------- DISTRICT ----------------
@app.route('/add_district', methods=['GET', 'POST'])
@login_required
def add_district():
db = DB()
if request.method == 'POST':
state_id = request.form.get('state_id')
district_name = request.form.get('district_Name', '').strip()
if not is_valid_name(district_name):
db.close()
return json_response(ResponseHandler.invalid_name("district"), 400)
existing = db.fetch_one_proc('CheckDistrictExists', [state_id, district_name])
if existing:
db.close()
return json_response(ResponseHandler.already_exists("district"), 409)
db.exec_proc('SaveDistrict', [state_id, district_name])
log_action('Add District', f"District added: {district_name} (state_id={state_id})")
db.close()
return json_response(ResponseHandler.add_success("district"), 200)
statedata = db.fetch_all_proc('GetAllStates')
districtdata = db.fetch_all_proc('GetAllDistricts')
db.close()
return render_template('add_district.html', statedata=statedata, districtdata=districtdata)
@app.route('/delete_district/<int:id>', methods=['POST'])
@login_required
def delete_district(id):
db = DB()
try:
db.exec_proc('DeleteDistrict', [id])
log_action('Delete District', f"Deleted district id={id}")
return json_response(ResponseHandler.delete_success("district"))
except Exception as e:
app.logger.exception("DeleteDistrict failed: %s", e)
return json_response(ResponseHandler.delete_failure("district"), 500)
finally:
db.close()
@app.route('/update_district/<int:id>', methods=['POST'])
@login_required
def update_district(id):
db = DB()
state_id = request.form.get('state_id')
district_name = request.form.get('district_Name', '').strip()
if not is_valid_name(district_name):
db.close()
return json_response(ResponseHandler.invalid_name("district"), 400)
try:
db.exec_proc('UpdateDistrictById', [id, state_id, district_name])
log_action('Update District', f"Updated district id={id}, name={district_name}")
return json_response(ResponseHandler.update_success("district"))
except Exception as e:
app.logger.exception("UpdateDistrict failed: %s", e)
return json_response(ResponseHandler.update_failure("district"), 500)
finally:
db.close()
# ---------------- BLOCK ----------------
@app.route('/add_block', methods=['GET', 'POST'])
@login_required
def add_block():
db = DB()
if request.method == 'POST':
district_id = request.form.get('district_id')
block_name = request.form.get('block_Name', '').strip()
if not is_valid_name(block_name):
db.close()
return json_response(ResponseHandler.invalid_name("block"), 400)
existing = db.fetch_one_proc('CheckBlockExists', [district_id, block_name])
if existing:
db.close()
return json_response(ResponseHandler.already_exists("block"), 409)
db.exec_proc('SaveBlock', [district_id, block_name])
log_action('Add Block', f"Block added: {block_name} (district_id={district_id})")
db.close()
return json_response(ResponseHandler.add_success("block"), 200)
districtdata = db.fetch_all_proc('GetAllDistricts')
blockdata = db.fetch_all_proc('GetAllBlocks')
db.close()
return render_template('add_block.html', districtdata=districtdata, blockdata=blockdata)
@app.route('/delete_block/<int:id>', methods=['POST'])
@login_required
def delete_block(id):
db = DB()
try:
db.exec_proc('DeleteBlock', [id])
log_action('Delete Block', f"Deleted block id={id}")
return json_response(ResponseHandler.delete_success("block"))
except Exception as e:
app.logger.exception("DeleteBlock failed: %s", e)
return json_response(ResponseHandler.delete_failure("block"), 500)
finally:
db.close()
@app.route('/update_block/<int:id>', methods=['POST'])
@login_required
def update_block(id):
db = DB()
district_id = request.form.get('district_id')
block_name = request.form.get('block_Name', '').strip()
if not is_valid_name(block_name):
db.close()
return json_response(ResponseHandler.invalid_name("block"), 400)
try:
db.exec_proc('UpdateBlockById', [id, district_id, block_name])
log_action('Update Block', f"Updated block id={id}, name={block_name}")
return json_response(ResponseHandler.update_success("block"))
except Exception as e:
app.logger.exception("UpdateBlock failed: %s", e)
return json_response(ResponseHandler.update_failure("block"), 500)
finally:
db.close()
# ---------------- VILLAGE ----------------
@app.route('/add_village', methods=['GET', 'POST'])
@login_required
def add_village():
db = DB()
if request.method == 'POST':
block_id = request.form.get('block_id')
village_name = request.form.get('village_Name', '').strip()
if not is_valid_name(village_name):
db.close()
return json_response(ResponseHandler.invalid_name("village"), 400)
existing = db.fetch_one_proc('CheckVillageExists', [block_id, village_name])
if existing:
db.close()
return json_response(ResponseHandler.already_exists("village"), 409)
db.exec_proc('SaveVillage', [block_id, village_name])
log_action('Add Village', f"Village added: {village_name} (block_id={block_id})")
db.close()
return json_response(ResponseHandler.add_success("village"), 200)
blockdata = db.fetch_all_proc('GetAllBlocks')
villagedata = db.fetch_all_proc('GetAllVillages')
db.close()
return render_template('add_village.html', blockdata=blockdata, villagedata=villagedata)
@app.route('/delete_village/<int:id>', methods=['POST'])
@login_required
def delete_village(id):
db = DB()
try:
db.exec_proc('DeleteVillage', [id])
log_action('Delete Village', f"Deleted village id={id}")
return json_response(ResponseHandler.delete_success("village"))
except Exception as e:
app.logger.exception("DeleteVillage failed: %s", e)
return json_response(ResponseHandler.delete_failure("village"), 500)
finally:
db.close()
@app.route('/update_village/<int:id>', methods=['POST'])
@login_required
def update_village(id):
db = DB()
block_id = request.form.get('block_id')
village_name = request.form.get('village_Name', '').strip()
if not is_valid_name(village_name):
db.close()
return json_response(ResponseHandler.invalid_name("village"), 400)
try:
db.exec_proc('UpdateVillageById', [id, block_id, village_name])
log_action('Update Village', f"Updated village id={id}, name={village_name}")
return json_response(ResponseHandler.update_success("village"))
except Exception as e:
app.logger.exception("UpdateVillage failed: %s", e)
return json_response(ResponseHandler.update_failure("village"), 500)
finally:
db.close()
# -----------------------------------
# INVOICE ENTRY
# -----------------------------------
@app.route('/invoice_entry', methods=['GET', 'POST'])
@login_required
def invoice_entry():
db = DB()
if request.method == 'POST':
inv_no = request.form['Invoice_No'].strip()
inv_date = request.form['Invoice_Date']
state_id = request.form['state']
district_id = request.form['district']
block_id = request.form['block']
village_id = request.form['village']
subcontractor_id = request.form['subcontractor']
work_type_id = request.form['work_type']
final_amount = request.form['Final_Amount']
# Check duplicate invoice number
exists = db.fetch_one_proc("CheckInvoiceExists", [inv_no])
if exists:
db.close()
return json_response(ResponseHandler.already_exists("invoice"), 409)
db.exec_proc("InsertInvoice", [
inv_no, inv_date,
state_id, district_id, block_id, village_id,
subcontractor_id, work_type_id, final_amount
])
db.close()
log_action("Add Invoice", f"Invoice: {inv_no}")
return json_response(ResponseHandler.add_success("invoice"), 200)
# GET REQUEST → Load dropdown data
statedata = db.fetch_all_proc("GetAllStates")
districtdata = db.fetch_all_proc("GetDistrictDetails")
blockdata = db.fetch_all_proc("GetBlockDetails")
villagedata = db.fetch_all_proc("GetVillageDetails")
subcontractordata = db.fetch_all_proc("GetSubcontractorDetails")
worktypedata = db.fetch_all_proc("GetWorkTypeDetails")
invoicedata = db.fetch_all_proc("GetInvoiceDetails")
db.close()
return render_template('invoice_entry.html',
statedata=statedata, districtdata=districtdata,
blockdata=blockdata, villagedata=villagedata,
subcontractordata=subcontractordata,
worktypedata=worktypedata,
invoicedata=invoicedata)
@app.route('/delete_invoice/<int:id>', methods=['POST'])
@login_required
def delete_invoice(id):
db = DB()
db.exec_proc("DeleteInvoice", [id])
db.close()
log_action("Delete Invoice", f"Invoice ID: {id}")
return json_response(ResponseHandler.delete_success("invoice"), 200)
@app.route('/update_invoice', methods=['POST'])
@login_required
def update_invoice():
invoice_id = request.form['invoice_id']
inv_no = request.form['Invoice_No'].strip()
inv_date = request.form['Invoice_Date']
state_id = request.form['state']
district_id = request.form['district']
block_id = request.form['block']
village_id = request.form['village']
subcontractor_id = request.form['subcontractor']
work_type_id = request.form['work_type']
final_amount = request.form['Final_Amount']
db = DB()
db.exec_proc("UpdateInvoice", [
invoice_id, inv_no, inv_date,
state_id, district_id, block_id, village_id,
subcontractor_id, work_type_id, final_amount
])
db.close()
log_action("Update Invoice", f"Invoice ID {invoice_id}")
return json_response(ResponseHandler.update_success("invoice"), 200)
# -----------------------------------
# HOLD ENTRY
# -----------------------------------
@app.route('/hold_entry', methods=['GET', 'POST'])
@login_required
def hold_entry():
db = DB()
if request.method == 'POST':
invoice_id = request.form['invoice']
hold_type_id = request.form['hold_type']
remarks = request.form['Remarks'].strip()
db.exec_proc("InsertHold", [invoice_id, hold_type_id, remarks])
db.close()
log_action("Add Hold", f"Invoice ID: {invoice_id}")
return json_response(ResponseHandler.add_success("hold"), 200)
invoicedata = db.fetch_all_proc("GetInvoiceDetails")
holdtypedata = db.fetch_all_proc("GetHoldTypeDetails")
holddata = db.fetch_all_proc("GetHoldDetails")
db.close()
return render_template('hold_entry.html',
invoicedata=invoicedata,
holdtypedata=holdtypedata,
holddata=holddata)
@app.route('/delete_hold/<int:id>', methods=['POST'])
@login_required
def delete_hold(id):
db = DB()
db.exec_proc("DeleteHold", [id])
db.close()
log_action("Delete Hold", f"Hold ID: {id}")
return json_response(ResponseHandler.delete_success("hold"), 200)
# -----------------------------------
# PAYMENT ENTRY
# -----------------------------------
@app.route('/payment_entry', methods=['GET', 'POST'])
@login_required
def payment_entry():
db = DB()
if request.method == 'POST':
invoice_id = request.form['invoice']
payment_date = request.form['Payment_Date']
payment_amt = request.form['Payment_Amount']
tds_amt = request.form['TDS_Payment_Amount']
total = request.form['Total_amount']
utr = request.form['utr'].strip()
db.exec_proc("InsertPayment", [
invoice_id, payment_date, payment_amt, tds_amt, total, utr
])
db.close()
log_action("Add Payment", f"Invoice ID: {invoice_id}")
return json_response(ResponseHandler.add_success("payment"), 200)
invoicedata = db.fetch_all_proc("GetInvoiceDetails")
paymentdata = db.fetch_all_proc("GetPaymentDetails")
db.close()
return render_template('payment_entry.html',
invoicedata=invoicedata,
paymentdata=paymentdata)
@app.route('/delete_payment/<int:id>', methods=['POST'])
@login_required
def delete_payment(id):
db = DB()
db.exec_proc("DeletePayment", [id])
db.close()
log_action("Delete Payment", f"Payment ID: {id}")
return json_response(ResponseHandler.delete_success("payment"), 200)
# -----------------------------------
# SUBCONTRACTOR MANAGEMENT
# -----------------------------------
@app.route('/add_subcontractor', methods=['GET', 'POST'])
@login_required
def add_subcontractor():
db = DB()
if request.method == 'POST':
subcontractor_name = request.form['subcontractor_Name'].strip()
mobile_no = request.form['Mobile_No'].strip()
if not subcontractor_name:
return json_response(ResponseHandler.invalid_name("subcontractor"), 400)
exists = db.fetch_one_proc("CheckSubcontractorExists", [subcontractor_name])
if exists:
db.close()
return json_response(ResponseHandler.already_exists("subcontractor"), 409)
db.exec_proc("SaveSubcontractor", [subcontractor_name, mobile_no])
db.close()
log_action("Add Subcontractor", f"Name: {subcontractor_name}")
return json_response(ResponseHandler.add_success("subcontractor"), 200)
subcontractordata = db.fetch_all_proc("GetSubcontractorDetails")
db.close()
return render_template('add_subcontractor.html', subcontractordata=subcontractordata)
@app.route('/delete_subcontractor/<int:id>', methods=['POST'])
@login_required
def delete_subcontractor(id):
db = DB()
db.exec_proc("DeleteSubcontractor", [id])
db.close()
log_action("Delete Subcontractor", f"ID: {id}")
return json_response(ResponseHandler.delete_success("subcontractor"), 200)
@app.route('/update_subcontractor', methods=['POST'])
@login_required
def update_subcontractor():
subcontractor_id = request.form['subcontractor_id']
subcontractor_name = request.form['subcontractor_Name'].strip()
mobile_no = request.form['Mobile_No'].strip()
db = DB()
db.exec_proc("UpdateSubcontractor", [subcontractor_id, subcontractor_name, mobile_no])
db.close()
log_action("Update Subcontractor", f"ID {subcontractor_id}")
return json_response(ResponseHandler.update_success("subcontractor"), 200)
# -----------------------------------
# WORK TYPE MANAGEMENT
# -----------------------------------
@app.route('/add_work_type', methods=['GET', 'POST'])
@login_required
def add_work_type():
db = DB()
if request.method == 'POST':
work_type_name = request.form['worktype_Name'].strip()
if not is_valid_name(work_type_name):
return json_response(ResponseHandler.invalid_name("work type"), 400)
exists = db.fetch_one_proc("CheckWorkTypeExists", [work_type_name])
if exists:
db.close()
return json_response(ResponseHandler.already_exists("work type"), 409)
db.exec_proc("SaveWorkType", [work_type_name])
db.close()
log_action("Add Work Type", f"Name: {work_type_name}")
return json_response(ResponseHandler.add_success("work type"), 200)
worktypedata = db.fetch_all_proc("GetWorkTypeDetails")
db.close()
return render_template('add_work_type.html', worktypedata=worktypedata)
@app.route('/delete_work_type/<int:id>', methods=['POST'])
@login_required
def delete_work_type(id):
db = DB()
db.exec_proc("DeleteWorkType", [id])
db.close()
log_action("Delete Work Type", f"ID: {id}")
return json_response(ResponseHandler.delete_success("work type"), 200)
@app.route('/update_work_type', methods=['POST'])
@login_required
def update_work_type():
work_type_id = request.form['worktype_id']
work_type_name = request.form['worktype_Name'].strip()
if not is_valid_name(work_type_name):
return json_response(ResponseHandler.invalid_name("work type"), 400)
db = DB()
db.exec_proc("UpdateWorkTypeById", [work_type_id, work_type_name])
db.close()
log_action("Update Work Type", f"ID {work_type_id}")
return json_response(ResponseHandler.update_success("work type"), 200)
# -----------------------------------
# EXCEL BULK UPLOAD (INVOICE)
# -----------------------------------
@app.route('/upload_excel', methods=['GET', 'POST'])
@login_required
def upload_excel():
if request.method == 'POST':
file = request.files.get("file")
if not file:
flash("No file selected.", "danger")
return redirect(url_for("upload_excel"))
filepath = os.path.join(app.root_path, "uploads", file.filename)
os.makedirs(os.path.dirname(filepath), exist_ok=True)
file.save(filepath)
workbook = openpyxl.load_workbook(filepath)
sheet = workbook.active
db = DB()
for row in sheet.iter_rows(min_row=2, values_only=True):
inv_no, inv_date, state, district, block, village, subcontractor, work_type, amount = row
db.exec_proc("InsertInvoiceBulk", [
inv_no, inv_date, state, district, block, village, subcontractor, work_type, amount
])
db.close()
log_action("Bulk Invoice Upload", f"File: {file.filename}")
flash("Excel uploaded successfully!", "success")
return redirect(url_for("invoice_entry"))
return render_template('uploadExcelFile.html')
# -----------------------------------
# REPORTS
# -----------------------------------
@app.route('/report', methods=['GET', 'POST'])
@login_required
def report():
db = DB()
statedata = db.fetch_all_proc("GetAllStates")
subcontractordata = db.fetch_all_proc( "GetSubcontractorDetails",
["1=1", "{}"] )
db.close()
return render_template('report.html', statedata=statedata, subcontractordata=subcontractordata)
@app.route('/view_report', methods=['POST'])
@login_required
def view_report():
state_id = request.form.get("state", "")
subcontractor_id = request.form.get("subcontractor", "")
db = DB()
reportdata = db.fetch_all_proc("GetReportData", [state_id, subcontractor_id])
db.close()
log_action("Generate Report", f"State {state_id}, Subcontractor {subcontractor_id}")
return render_template("report_table.html", reportdata=reportdata)
if __name__ == "__main__":
app.run(host='0.0.0.0', port=5000, debug=True)