Recovered lost local code

This commit is contained in:
2026-03-23 12:31:04 +05:30
parent d1dca531c3
commit 1821f04fe0
501 changed files with 1668 additions and 1842 deletions

View File

@@ -1,72 +1,136 @@
import mysql.connector
# import mysql.connector
# from mysql.connector import Error
# import config
# import openpyxl
# import os
# import re
# import ast
# from datetime import datetime
# class ContractorInfo:
# ID = ""
# contInfo = None
# def __init__(self, id):
# self.ID = id
# print(id)
# self.fetchData()
# def fetchData(self):
# try:
# connection = config.get_db_connection()
# cursor = connection.cursor(dictionary=True, buffered=True)
# print("here", flush=True)
# cursor.callproc('GetContractorInfoById', [self.ID])
# for result in cursor.stored_results():
# self.contInfo = result.fetchone()
# print(self.contInfo,flush=True)
# finally:
# cursor.close()
# connection.close()
# def fetchalldata(self):
# try:
# connection = config.get_db_connection()
# cursor = connection.cursor(dictionary=True, buffered=True)
# print("here", flush=True)
# # ---------------- Hold Types ----------------
# cursor = connection.cursor(dictionary=True)
# cursor.callproc('GetHoldTypesByContractor', [self.ID])
# hold_types = []
# for result in cursor.stored_results():
# hold_types = result.fetchall()
# hold_type_map = {ht['hold_type_id']: ht['hold_type'] for ht in hold_types}
# # ---------------- Invoices ----------------
# cursor = connection.cursor(dictionary=True)
# cursor.callproc('GetInvoicesByContractor', [self.ID])
# invoices = []
# for result in cursor.stored_results():
# invoices = result.fetchall()
# # Remove duplicate invoices
# invoice_ids_seen = set()
# unique_invoices = []
# for inv in invoices:
# if inv["Invoice_Id"] not in invoice_ids_seen:
# invoice_ids_seen.add(inv["Invoice_Id"])
# unique_invoices.append(inv)
# invoices = unique_invoices
# finally:
# cursor.close()
# connection.close()
from mysql.connector import Error
import config
import openpyxl
import os
import re
import ast
from datetime import datetime
class ContractorInfo:
ID = ""
contInfo = None
def __init__(self, id):
self.ID = id
print(id)
def __init__(self, contractor_id):
self.ID = contractor_id
self.contInfo = None
self.fetchData()
def fetchData(self):
"""Fetch basic contractor info by ID."""
try:
connection = config.get_db_connection()
cursor = connection.cursor(dictionary=True, buffered=True)
cursor.callproc('GetContractorInfoById', [self.ID])
for result in cursor.stored_results():
self.contInfo = result.fetchone()
print(self.contInfo,flush=True)
with connection.cursor(dictionary=True, buffered=True) as cursor:
cursor.callproc('GetContractorInfoById', [self.ID])
# Get the first result set
for result in cursor.stored_results():
self.contInfo = result.fetchone()
except Error as e:
print(f"Error fetching contractor info: {e}")
finally:
cursor.close()
connection.close()
if connection.is_connected():
connection.close()
def fetchalldata(self):
"""Fetch hold types and invoices for contractor."""
data = {}
try:
connection = config.get_db_connection()
cursor = connection.cursor(dictionary=True, buffered=True)
print("here", flush=True)
with connection.cursor(dictionary=True, buffered=True) as cursor:
# Fetch Hold Types
cursor.callproc('GetHoldTypesByContractor', [self.ID])
hold_types = []
for result in cursor.stored_results():
hold_types = result.fetchall()
hold_type_map = {ht['hold_type_id']: ht['hold_type'] for ht in hold_types}
data['hold_types'] = hold_type_map
# ---------------- Hold Types ----------------
cursor = connection.cursor(dictionary=True)
# Fetch Invoices
cursor.callproc('GetInvoicesByContractor', [self.ID])
invoices = []
for result in cursor.stored_results():
invoices = result.fetchall()
cursor.callproc('GetHoldTypesByContractor', [self.ID])
hold_types = []
for result in cursor.stored_results():
hold_types = result.fetchall()
hold_type_map = {ht['hold_type_id']: ht['hold_type'] for ht in hold_types}
# ---------------- Invoices ----------------
cursor = connection.cursor(dictionary=True)
cursor.callproc('GetInvoicesByContractor', [self.ID])
invoices = []
for result in cursor.stored_results():
invoices = result.fetchall()
# Remove duplicate invoices
invoice_ids_seen = set()
unique_invoices = []
for inv in invoices:
if inv["Invoice_Id"] not in invoice_ids_seen:
invoice_ids_seen.add(inv["Invoice_Id"])
unique_invoices.append(inv)
invoices = unique_invoices
# Remove duplicate invoices
seen_ids = set()
unique_invoices = []
for inv in invoices:
if inv['Invoice_Id'] not in seen_ids:
seen_ids.add(inv['Invoice_Id'])
unique_invoices.append(inv)
data['invoices'] = unique_invoices
except Error as e:
print(f"Error fetching contractor data: {e}")
finally:
cursor.close()
connection.close()
if connection.is_connected():
connection.close()
return data

View File

@@ -1,35 +1,76 @@
import config
import mysql.connector
# ------------------- Helper -------------------
# ------------------- Helper Functions -------------------
def clear_results(cursor):
"""Consume all stored results to prevent cursor issues."""
for r in cursor.stored_results():
r.fetchall()
def fetch_one(cursor):
"""Fetch first row from stored results."""
for result in cursor.stored_results():
return result.fetchone()
return None
# ------------------- Get Village Id -------------------
def get_village_id(village_name):
def fetch_all(cursor):
"""Fetch all rows from stored results."""
data = []
for result in cursor.stored_results():
data.extend(result.fetchall())
return data
def get_numeric_values(data):
"""Return numeric fields for invoices safely."""
return [
float(data.get('basic_amount') or 0),
float(data.get('debit_amount') or 0),
float(data.get('after_debit_amount') or 0),
float(data.get('amount') or 0),
float(data.get('gst_amount') or 0),
float(data.get('tds_amount') or 0),
float(data.get('sd_amount') or 0),
float(data.get('on_commission') or 0),
float(data.get('hydro_testing') or 0),
float(data.get('gst_sd_amount') or 0),
float(data.get('final_amount') or 0),
]
def execute_db_operation(operation_func):
"""General DB operation wrapper with commit/rollback."""
connection = config.get_db_connection()
cursor = connection.cursor(dictionary=True)
cursor.callproc("GetVillageIdByName", (village_name,))
village_result = None
for rs in cursor.stored_results():
village_result = rs.fetchone()
cursor.close()
connection.close()
return village_result
# ------------------- Insert Invoice -------------------
def insert_invoice(data, village_id):
connection = config.get_db_connection()
cursor = connection.cursor(dictionary=True)
try:
# 1. Insert Invoice
result = operation_func(cursor)
connection.commit()
return result
except Exception:
connection.rollback()
raise
finally:
cursor.close()
connection.close()
# ------------------- Village Functions -------------------
def get_village_id(village_name):
def operation(cursor):
cursor.callproc("GetVillageIdByName", (village_name,))
return fetch_one(cursor)
return execute_db_operation(operation)
def get_all_villages():
def operation(cursor):
cursor.callproc("GetAllVillages")
return fetch_all(cursor)
return execute_db_operation(operation)
# ------------------- Invoice Functions -------------------
def insert_invoice(data, village_id):
def operation(cursor):
# Insert invoice
cursor.callproc('InsertInvoice', [
data.get('pmc_no'),
village_id,
@@ -37,29 +78,14 @@ def insert_invoice(data, village_id):
data.get('invoice_details'),
data.get('invoice_date'),
data.get('invoice_no'),
float(data.get('basic_amount') or 0),
float(data.get('debit_amount') or 0),
float(data.get('after_debit_amount') or 0),
float(data.get('amount') or 0),
float(data.get('gst_amount') or 0),
float(data.get('tds_amount') or 0),
float(data.get('sd_amount') or 0),
float(data.get('on_commission') or 0),
float(data.get('hydro_testing') or 0),
float(data.get('gst_sd_amount') or 0),
float(data.get('final_amount') or 0)
*get_numeric_values(data)
])
invoice_id = None
for result in cursor.stored_results():
row = result.fetchone()
if row:
invoice_id = row.get('invoice_id')
if not invoice_id:
invoice_row = fetch_one(cursor)
if not invoice_row:
raise Exception("Invoice ID not returned")
invoice_id = invoice_row.get('invoice_id')
# 2. Insert Inpayment
# Insert inpayment
cursor.callproc('InsertInpayment', [
data.get('pmc_no'),
village_id,
@@ -67,221 +93,41 @@ def insert_invoice(data, village_id):
data.get('invoice_details'),
data.get('invoice_date'),
data.get('invoice_no'),
float(data.get('basic_amount') or 0),
float(data.get('debit_amount') or 0),
float(data.get('after_debit_amount') or 0),
float(data.get('amount') or 0),
float(data.get('gst_amount') or 0),
float(data.get('tds_amount') or 0),
float(data.get('sd_amount') or 0),
float(data.get('on_commission') or 0),
float(data.get('hydro_testing') or 0),
float(data.get('gst_sd_amount') or 0),
float(data.get('final_amount') or 0),
*get_numeric_values(data),
data.get('subcontractor_id')
])
clear_results(cursor)
connection.commit()
return invoice_id
except Exception as e:
connection.rollback()
raise e
return execute_db_operation(operation)
finally:
cursor.close()
connection.close()
# ------------------- Assign Subcontractor -------------------
def assign_subcontractor(data, village_id):
connection = config.get_db_connection()
cursor = connection.cursor(dictionary=True)
try:
cursor.callproc('AssignSubcontractor', [
data.get('pmc_no'),
data.get('subcontractor_id'),
village_id
])
clear_results(cursor)
connection.commit()
except Exception as e:
connection.rollback()
raise e
finally:
cursor.close()
connection.close()
# ------------------- Insert Hold Types -------------------
def insert_hold_types(data, invoice_id):
connection = config.get_db_connection()
cursor = connection.cursor(dictionary=True)
try:
hold_types = data.getlist('hold_type[]')
hold_amounts = data.getlist('hold_amount[]')
for hold_type, hold_amount in zip(hold_types, hold_amounts):
if not hold_type:
continue
cursor.callproc('GetHoldTypeIdByName', [hold_type])
hold_type_result = None
for result in cursor.stored_results():
hold_type_result = result.fetchone()
if not hold_type_result:
cursor.callproc('InsertHoldType', [hold_type, 0])
cursor.execute("SELECT @_InsertHoldType_1")
hold_type_id = cursor.fetchone()[0]
else:
hold_type_id = hold_type_result['hold_type_id']
hold_amount = float(hold_amount or 0)
cursor.callproc('InsertInvoiceSubcontractorHold', [
data.get('subcontractor_id'),
invoice_id,
hold_type_id,
hold_amount
])
clear_results(cursor)
connection.commit()
except Exception as e:
connection.rollback()
raise e
finally:
cursor.close()
connection.close()
# ------------------- Get All Invoices -------------------
def get_all_invoice_details():
connection = config.get_db_connection()
cursor = connection.cursor(dictionary=True)
def operation(cursor):
cursor.callproc('GetAllInvoiceDetails')
return fetch_all(cursor)
return execute_db_operation(operation)
cursor.callproc('GetAllInvoiceDetails')
invoices = []
for result in cursor.stored_results():
invoices = result.fetchall()
cursor.close()
connection.close()
return invoices
# ------------------- Get All Villages -------------------
def get_all_villages():
connection = config.get_db_connection()
cursor = connection.cursor(dictionary=True)
cursor.callproc("GetAllVillages")
villages = []
for result in cursor.stored_results():
villages = result.fetchall()
cursor.close()
connection.close()
return villages
# ------------------- Search Contractors -------------------
def search_contractors(sub_query):
connection = config.get_db_connection()
cursor = connection.cursor(dictionary=True)
cursor.callproc('SearchContractorsByName', [sub_query])
results = []
for result in cursor.stored_results():
results = result.fetchall()
cursor.close()
connection.close()
return results
# ------------------- Get All Hold Types -------------------
def get_all_hold_types():
connection = config.get_db_connection()
cursor = connection.cursor(dictionary=True)
cursor.callproc("GetAllHoldTypes")
hold_types = []
for result in cursor.stored_results():
hold_types = result.fetchall()
cursor.close()
connection.close()
return hold_types
# ------------------- Get Invoice By Id -------------------
def get_invoice_by_id(invoice_id):
connection = config.get_db_connection()
cursor = connection.cursor(dictionary=True)
def operation(cursor):
cursor.callproc('GetInvoiceDetailsById', [invoice_id])
invoice = fetch_one(cursor)
cursor.callproc('GetInvoiceDetailsById', [invoice_id])
invoice = None
cursor.callproc('GetHoldAmountsByInvoiceId', [invoice_id])
hold_amounts = fetch_all(cursor)
for result in cursor.stored_results():
invoice = result.fetchone()
if invoice:
invoice["hold_amounts"] = hold_amounts
return invoice
return execute_db_operation(operation)
cursor.callproc('GetHoldAmountsByInvoiceId', [invoice_id])
hold_amounts = []
for result in cursor.stored_results():
hold_amounts = result.fetchall()
if invoice:
invoice["hold_amounts"] = hold_amounts
cursor.close()
connection.close()
return invoice
# ------------------- Update Invoice -------------------
def update_invoice(data, invoice_id):
connection = config.get_db_connection()
cursor = connection.cursor(dictionary=True)
try:
def operation(cursor):
cursor.callproc("GetVillageIdByName", (data.get('village'),))
village = None
for rs in cursor.stored_results():
village = rs.fetchone()
village = fetch_one(cursor)
if not village:
raise Exception("Village not found")
village_id = village['Village_Id']
numeric = [
float(data.get('basic_amount') or 0),
float(data.get('debit_amount') or 0),
float(data.get('after_debit_amount') or 0),
float(data.get('amount') or 0),
float(data.get('gst_amount') or 0),
float(data.get('tds_amount') or 0),
float(data.get('sd_amount') or 0),
float(data.get('on_commission') or 0),
float(data.get('hydro_testing') or 0),
float(data.get('gst_sd_amount') or 0),
float(data.get('final_amount') or 0),
]
cursor.callproc('UpdateInvoice', [
data.get('pmc_no'),
village_id,
@@ -289,91 +135,101 @@ def update_invoice(data, invoice_id):
data.get('invoice_details'),
data.get('invoice_date'),
data.get('invoice_no'),
*numeric,
*get_numeric_values(data),
invoice_id
])
clear_results(cursor)
execute_db_operation(operation)
connection.commit()
except Exception as e:
connection.rollback()
raise e
finally:
cursor.close()
connection.close()
# ------------------- Update Inpayment -------------------
def update_inpayment(data):
connection = config.get_db_connection()
cursor = connection.cursor(dictionary=True)
try:
numeric = [
float(data.get('basic_amount') or 0),
float(data.get('debit_amount') or 0),
float(data.get('after_debit_amount') or 0),
float(data.get('amount') or 0),
float(data.get('gst_amount') or 0),
float(data.get('tds_amount') or 0),
float(data.get('sd_amount') or 0),
float(data.get('on_commission') or 0),
float(data.get('hydro_testing') or 0),
float(data.get('gst_sd_amount') or 0),
float(data.get('final_amount') or 0),
]
def operation(cursor):
cursor.callproc('UpdateInpayment', [
data.get('work_type'),
data.get('invoice_details'),
data.get('invoice_date'),
*numeric,
*get_numeric_values(data),
data.get('pmc_no'),
data.get('invoice_no')
])
clear_results(cursor)
execute_db_operation(operation)
connection.commit()
except Exception as e:
connection.rollback()
raise e
finally:
cursor.close()
connection.close()
# ------------------- Delete Invoice -------------------
def delete_invoice_data(invoice_id, user_id):
connection = config.get_db_connection()
cursor = connection.cursor(dictionary=True)
try:
cursor.callproc('GetInvoicePMCById', [invoice_id])
def operation(cursor):
# Fetch PMC and Invoice_No from DB
cursor.callproc('GetInvoicePMCById', (invoice_id,))
record = {}
for result in cursor.stored_results():
record = result.fetchone() or {}
if not record:
raise Exception("Invoice not found")
# Use exact DB keys
pmc_no = record['PMC_No']
invoice_no = record['Invoice_No']
# Delete invoice
cursor.callproc("DeleteInvoice", (invoice_id,))
clear_results(cursor)
cursor.callproc(
'DeleteInpaymentByPMCInvoice',
[record['PMC_No'], record['invoice_no']]
)
# Delete inpayment
cursor.callproc('DeleteInpaymentByPMCInvoice', (pmc_no, invoice_no))
clear_results(cursor)
connection.commit()
execute_db_operation(operation)
except Exception as e:
connection.rollback()
raise e
finally:
cursor.close()
connection.close()
# ------------------- Subcontractor Functions -------------------
def assign_subcontractor(data, village_id):
def operation(cursor):
cursor.callproc('AssignSubcontractor', [
data.get('pmc_no'),
data.get('subcontractor_id'),
village_id
])
clear_results(cursor)
execute_db_operation(operation)
# ------------------- Hold Types Functions -------------------
def insert_hold_types(data, invoice_id):
def operation(cursor):
hold_types = data.getlist('hold_type[]')
hold_amounts = data.getlist('hold_amount[]')
for hold_type, hold_amount in zip(hold_types, hold_amounts):
if not hold_type:
continue
cursor.callproc('GetHoldTypeIdByName', [hold_type])
hold_type_result = fetch_one(cursor)
if not hold_type_result:
cursor.callproc('InsertHoldType', [hold_type, 0])
cursor.execute("SELECT @_InsertHoldType_1")
hold_type_id = cursor.fetchone()[0]
else:
hold_type_id = hold_type_result['hold_type_id']
cursor.callproc('InsertInvoiceSubcontractorHold', [
data.get('subcontractor_id'),
invoice_id,
hold_type_id,
float(hold_amount or 0)
])
clear_results(cursor)
execute_db_operation(operation)
def get_all_hold_types():
def operation(cursor):
cursor.callproc("GetAllHoldTypes")
return fetch_all(cursor)
return execute_db_operation(operation)
# ------------------- Contractor Functions -------------------
def search_contractors(sub_query):
def operation(cursor):
cursor.callproc('SearchContractorsByName', [sub_query])
return fetch_all(cursor)
return execute_db_operation(operation)

View File

@@ -21,8 +21,6 @@ class itemCRUDMapping:
self.name = "State"
elif itemType is ItemCRUDType.HoldType:
self.name = "Hold Type"
elif itemType is ItemCRUDType.Subcontractor:
self.name = "Subcontractor"
else:
self.name = "Item"
@@ -56,16 +54,12 @@ class ItemCRUD:
connection.commit()
self.isSuccess = True
self.resultMessage = HtmlHelper.json_response(
ResponseHandler.delete_success(self.itemCRUDMapping.name), 200
)
self.resultMessage = ResponseHandler.delete_success(self.itemCRUDMapping.name)['message']
except mysql.connector.Error as e:
print(f"Error deleting {self.itemCRUDMapping.name}: {e}")
self.isSuccess = False
self.resultMessage = HtmlHelper.json_response(
ResponseHandler.delete_failure(self.itemCRUDMapping.name), 500
)
self.resultMessage = ResponseHandler.delete_failure(self.itemCRUDMapping.name)['message']
finally:
cursor.close()
@@ -74,7 +68,7 @@ class ItemCRUD:
# ----------------------------------------------------------
# ADD
# ----------------------------------------------------------
def AddItem(self, request, parentid=None, childname=None, storedprocfetch=None, storedprocadd=None, data=None):
def AddItem(self, request, parentid, childname, storedprocfetch, storedprocadd):
connection = config.get_db_connection()
if not connection:
@@ -88,70 +82,31 @@ class ItemCRUD:
LogHelper.log_action(
f"Add {self.itemCRUDMapping.name}",
f"User {current_user.id} adding '{childname if childname else (data.get('Contractor_Name') if data else '')}'"
f"User {current_user.id} adding '{childname}'"
)
# Validation
if not re.match(RegEx.patternAlphabetOnly, childname):
self.isSuccess = False
self.resultMessage = HtmlHelper.json_response(
ResponseHandler.invalid_name(self.itemCRUDMapping.name), 400
)
return
try:
# ======================================================
# SUBCONTRACTOR (MULTI-FIELD)
# ======================================================
if data:
# Duplicate check
cursor.callproc(storedprocfetch, (data['Contractor_Name'],))
existing_item = None
for rs in cursor.stored_results():
existing_item = rs.fetchone()
if existing_item:
self.isSuccess = False
self.resultMessage = HtmlHelper.json_response(
ResponseHandler.already_exists(self.itemCRUDMapping.name), 409
)
return
# Insert
cursor.callproc(storedprocadd, (
data['Contractor_Name'],
data['Address'],
data['Mobile_No'],
data['PAN_No'],
data['Email'],
data['Gender'],
data['GST_Registration_Type'],
data['GST_No'],
data['Contractor_password']
))
connection.commit()
self.isSuccess = True
self.resultMessage = HtmlHelper.json_response(
ResponseHandler.add_success(self.itemCRUDMapping.name), 200
)
return
# ======================================================
# NORMAL (Village / Block / State)
# ======================================================
if not re.match(RegEx.patternAlphabetOnly, childname):
self.isSuccess = False
self.resultMessage = HtmlHelper.json_response(
ResponseHandler.invalid_name(self.itemCRUDMapping.name), 400
)
return
# Duplicate check
# Call check procedure
if parentid is None:
cursor.callproc(storedprocfetch, (childname,))
else:
cursor.callproc(storedprocfetch, (childname, parentid))
# ✅ FIX: initialize variable
existing_item = None
for rs in cursor.stored_results():
existing_item = rs.fetchone()
# Check duplicate
if existing_item:
self.isSuccess = False
self.resultMessage = HtmlHelper.json_response(
@@ -169,7 +124,6 @@ class ItemCRUD:
self.isSuccess = True
self.resultMessage = HtmlHelper.json_response(
ResponseHandler.add_success(self.itemCRUDMapping.name), 200
)
@@ -187,7 +141,7 @@ class ItemCRUD:
# ----------------------------------------------------------
# EDIT
# ----------------------------------------------------------
def EditItem(self, request, childid, parentid=None, childname=None, storedprocupdate=None, data=None):
def EditItem(self, request, childid, parentid, childname, storedprocupdate):
connection = config.get_db_connection()
cursor = connection.cursor()
@@ -197,40 +151,12 @@ class ItemCRUD:
f"User {current_user.id} edited '{childid}'"
)
if not re.match(RegEx.patternAlphabetOnly, childname):
self.isSuccess = False
self.resultMessage = ResponseHandler.update_failure(self.itemCRUDMapping.name)['message']
return
try:
# ======================================================
# SUBCONTRACTOR (MULTI-FIELD)
# ======================================================
if data:
cursor.callproc(storedprocupdate, (
childid,
data['Contractor_Name'],
data['Address'],
data['Mobile_No'],
data['PAN_No'],
data['Email'],
data['Gender'],
data['GST_Registration_Type'],
data['GST_No'],
data['Contractor_password']
))
connection.commit()
self.isSuccess = True
self.resultMessage = HtmlHelper.json_response(
ResponseHandler.update_success(self.itemCRUDMapping.name), 200
)
return
# ======================================================
# NORMAL
# ======================================================
if not re.match(RegEx.patternAlphabetOnly, childname):
self.isSuccess = False
self.resultMessage = ResponseHandler.update_failure(self.itemCRUDMapping.name)['message']
return
if parentid is None:
cursor.callproc(storedprocupdate, (childid, childname))
else:
@@ -335,7 +261,9 @@ class ItemCRUD:
else:
cursor.callproc(storedprocfetch, (childname, parentid))
# ✅ FIX
existing_item = None
for rs in cursor.stored_results():
existing_item = rs.fetchone()

View File

@@ -1,12 +1,11 @@
import os
import openpyxl
from openpyxl.styles import Font, PatternFill
from decimal import Decimal
from datetime import datetime
import config
from flask_login import current_user
from model.Log import LogHelper
from model.Report import ReportHelper
from model.FolderAndFile import FolderAndFile
class PmcReport:
@staticmethod
@@ -17,9 +16,8 @@ class PmcReport:
try:
# cursor.callproc("GetContractorInfoByPmcNo", (pmc_no,))
# pmc_info = next(cursor.stored_results()).fetchone()
pmc_info = ReportHelper.execute_sp(cursor, 'GetContractorInfoByPmcNo', [pmc_no], True)
cursor.callproc("GetContractorInfoByPmcNo", (pmc_no,))
pmc_info = next(cursor.stored_results()).fetchone()
if not pmc_info:
return None
@@ -33,12 +31,15 @@ class PmcReport:
invoices = []
hold_amount_total = 0
if hold_type_ids:
hold_type_ids_str = ",".join(map(str, hold_type_ids))
cursor.callproc(
'GetInvoices_WithHold',
[pmc_no, pmc_info["Contractor_Id"], hold_type_ids_str]
)
else:
cursor.callproc(
'GetInvoices_NoHold',
[pmc_no, pmc_info["Contractor_Id"]]
@@ -53,42 +54,33 @@ class PmcReport:
# GST RELEASE
# cursor.callproc('GetGSTReleaseByPMC', [pmc_no])
# gst_rel = []
# for result in cursor.stored_results():
# gst_rel = result.fetchall()
gst_rel = ReportHelper.execute_sp(cursor, 'GetGSTReleaseByPMC', [pmc_no])
cursor.callproc('GetGSTReleaseByPMC', [pmc_no])
gst_rel = []
for result in cursor.stored_results():
gst_rel = result.fetchall()
total_gst_basic = sum(row.get('basic_amount', 0) or 0 for row in gst_rel)
total_gst_final = sum(row.get('final_amount', 0) or 0 for row in gst_rel)
# ---------------- HOLD RELEASE ----------------
# cursor.callproc('GetHoldReleaseByPMC', [pmc_no])
# hold_release = []
# for result in cursor.stored_results():
# hold_release = result.fetchall()
cursor.callproc('GetHoldReleaseByPMC', [pmc_no])
hold_release = []
for result in cursor.stored_results():
hold_release = result.fetchall()
hold_release = ReportHelper.execute_sp(cursor, 'GetHoldReleaseByPMC', [pmc_no])
# ---------------- CREDIT NOTE ----------------
# cursor.callproc('GetCreditNoteByPMC', [pmc_no])
# credit_note = []
# for result in cursor.stored_results():
# credit_note = result.fetchall()
credit_note = ReportHelper.execute_sp(cursor, 'GetCreditNoteByPMC', [pmc_no])
payments = ReportHelper.execute_sp(cursor, 'GetPaymentsByPMC', [pmc_no])
cursor.callproc('GetCreditNoteByPMC', [pmc_no])
credit_note = []
for result in cursor.stored_results():
credit_note = result.fetchall()
# ---------------- PAYMENTS ----------------
# cursor.callproc('GetPaymentsByPMC', [pmc_no])
# payments = []
# for result in cursor.stored_results():
# payments = result.fetchall()
cursor.callproc('GetPaymentsByPMC', [pmc_no])
payments = []
for result in cursor.stored_results():
payments = result.fetchall()
total_pay_amount = sum(row.get('Payment_Amount', 0) or 0 for row in payments)
total_pay_total = sum(row.get('Total_amount', 0) or 0 for row in payments)
@@ -133,73 +125,109 @@ class PmcReport:
def download_pmc_report(pmc_no):
connection = config.get_db_connection()
if not connection:
return None
output_folder = "static/download"
output_file = os.path.join(output_folder, f"PMC_Report_{pmc_no}.xlsx")
if not os.path.exists(output_folder):
os.makedirs(output_folder)
cursor = connection.cursor(dictionary=True)
try:
# filename
filename = f"PMC_Report_{pmc_no}.xlsx"
output_folder = FolderAndFile.get_download_folder()
output_file = FolderAndFile.get_download_path(filename)
# ================= DATA FETCH =================
contractor_info = ReportHelper.execute_sp(cursor, 'GetContractorDetailsByPMC', [pmc_no], "one")
cursor.callproc('GetContractorDetailsByPMC', [pmc_no])
contractor_info = next(cursor.stored_results()).fetchone()
if not contractor_info:
return None
hold_types = ReportHelper.execute_sp(cursor, 'GetHoldTypesByContractor', [contractor_info["Contractor_Id"]])
cursor.callproc('GetHoldTypesByContractor', [contractor_info["Contractor_Id"]])
hold_types = next(cursor.stored_results()).fetchall()
hold_type_map = {ht['hold_type_id']: ht['hold_type'] for ht in hold_types}
invoices = ReportHelper.execute_sp(cursor, 'GetInvoicesAndGstReleaseByPmcNo', [pmc_no])
cursor.callproc('GetInvoicesAndGstReleaseByPmcNo', [pmc_no])
invoices = next(cursor.stored_results()).fetchall()
credit_notes = ReportHelper.execute_sp(cursor, 'GetCreditNoteByContractor', [contractor_info["Contractor_Id"]])
cursor.callproc('GetCreditNoteByContractor',[contractor_info["Contractor_Id"]])
hold_amounts = ReportHelper.execute_sp(cursor, 'GetHoldAmountsByContractor', [contractor_info["Contractor_Id"]])
credit_notes = []
for result in cursor.stored_results():
credit_notes = result.fetchall()
all_payments = ReportHelper.execute_sp(cursor, 'GetAllPaymentsByPMC', [pmc_no])
credit_note_map = {}
for cn in credit_notes:
key = (cn["PMC_No"], cn["Invoice_No"])
credit_note_map.setdefault(key, []).append(cn)
gst_releases = ReportHelper.execute_sp(cursor, 'GetGSTReleaseDetailsByPMC', [pmc_no])
# ================= DATA MAPPING =================
cursor.callproc('GetHoldAmountsByContractor', [contractor_info["Contractor_Id"]])
hold_amounts = next(cursor.stored_results()).fetchall()
hold_data = {}
for h in hold_amounts:
hold_data.setdefault(h['Invoice_Id'], {})[h['hold_type_id']] = h['hold_amount']
cursor.callproc('GetAllPaymentsByPMC', [pmc_no])
all_payments = next(cursor.stored_results()).fetchall()
payments_map = {}
extra_payments = []
for pay in all_payments:
if pay['invoice_no']:
payments_map.setdefault(pay['invoice_no'], []).append(pay)
else:
extra_payments.append(pay)
# ---------------- GST RELEASE DETAILS ----------------
cursor.callproc('GetGSTReleaseDetailsByPMC', [pmc_no])
gst_releases = []
for result in cursor.stored_results():
gst_releases = result.fetchall()
gst_release_map = {}
for gr in gst_releases:
invoice_nos = []
if gr['Invoice_No']:
cleaned = gr['Invoice_No'].replace(' ', '')
if '&' in cleaned:
invoice_nos = cleaned.split('&')
elif ',' in cleaned:
invoice_nos = cleaned.split(',')
else:
invoice_nos = [cleaned]
for inv_no in invoice_nos:
gst_release_map.setdefault(inv_no, []).append(gr)
# ================= LOG =================
LogHelper.log_action(
"Download PMC Report",
f"User {current_user.id} Download PMC Report '{pmc_no}'"
)
# ================= EXCEL =================
workbook = openpyxl.Workbook()
sheet = workbook.active
sheet.title = "PMC Report"
# HEADER INFO
sheet.append(["", "", "Laxmi Civil Engineering Services PVT. LTD."])
sheet.append(["Contractor Name", contractor_info["Contractor_Name"]])
sheet.append(["State", contractor_info["State_Name"]])
sheet.append(["District", contractor_info["District_Name"]])
sheet.append(["Block", contractor_info["Block_Name"]])
sheet.append(["Contractor Name", contractor_info["Contractor_Name"], "", "GST No", contractor_info["GST_No"], "", "GST Type", contractor_info["GST_Registration_Type"]])
sheet.append(["State", contractor_info["State_Name"], "", "PAN No", contractor_info["PAN_No"], "", "Address", contractor_info["Address"]])
sheet.append(["District", contractor_info["District_Name"], "", "Mobile No", contractor_info["Mobile_No"]])
sheet.append(["Block", contractor_info["Block_Name"], "", "Email", contractor_info["Email"]])
sheet.append([])
base_headers = [
"PMC No","Village","Work Type","Invoice Details","Invoice Date","Invoice No",
"Basic Amount","Debit","After Debit Amount","GST","Amount","TDS",
"SD","On Commission","Hydro Testing","GST SD Amount"
"Basic Amount","Debit","After Debit Amount","GST (18%)","Amount","TDS (1%)",
"SD (5%)","On Commission","Hydro Testing","GST SD Amount"
]
hold_headers = [ht['hold_type'] for ht in hold_types]
@@ -208,249 +236,58 @@ class PmcReport:
"Final Amount","Payment Amount","TDS Payment","Total Paid","UTR"
]
headers = base_headers + hold_headers + payment_headers
sheet.append(headers)
sheet.append(base_headers + hold_headers + payment_headers)
header_fill = PatternFill(start_color="ADD8E6",end_color="ADD8E6",fill_type="solid")
header_font = Font(bold=True)
# STYLE
for cell in sheet[sheet.max_row]:
cell.font = Font(bold=True)
cell.font = header_font
cell.fill = header_fill
# DATA
seen_invoices = set()
processed_payments = set()
for inv in invoices:
invoice_no = inv["Invoice_No"]
payments = payments_map.get(invoice_no, [])
if invoice_no in seen_invoices:
continue
if invoice_no not in seen_invoices:
seen_invoices.add(invoice_no)
seen_invoices.add(invoice_no)
first_payment = payments[0] if payments else None
first_payment = payments[0] if payments else None
row = [
pmc_no, inv["Village_Name"], inv["Work_Type"],
inv["Invoice_Details"], inv["Invoice_Date"], invoice_no,
inv["Basic_Amount"], inv["Debit_Amount"],
inv["After_Debit_Amount"], inv["GST_Amount"],
inv["Amount"], inv["TDS_Amount"], inv["SD_Amount"],
inv["On_Commission"], inv["Hydro_Testing"], inv["GST_SD_Amount"]
]
row = [
pmc_no,
inv["Village_Name"],
inv["Work_Type"],
inv["Invoice_Details"],
inv["Invoice_Date"],
invoice_no,
inv["Basic_Amount"],
inv["Debit_Amount"],
inv["After_Debit_Amount"],
inv["GST_Amount"],
inv["Amount"],
inv["TDS_Amount"],
inv["SD_Amount"],
inv["On_Commission"],
inv["Hydro_Testing"],
inv["GST_SD_Amount"]
]
invoice_holds = hold_data.get(inv["Invoice_Id"], {})
# HOLD DATA
invoice_holds = hold_data.get(inv["Invoice_Id"], {})
for ht_id in hold_type_map.keys():
row.append(invoice_holds.get(ht_id, ""))
for ht_id in hold_type_map.keys():
row.append(invoice_holds.get(ht_id, ""))
# PAYMENT DATA
row += [
inv["Final_Amount"],
first_payment["Payment_Amount"] if first_payment else "",
first_payment["TDS_Payment_Amount"] if first_payment else "",
first_payment["Total_amount"] if first_payment else "",
first_payment["UTR"] if first_payment else ""
]
row += [
inv["Final_Amount"],
first_payment["Payment_Amount"] if first_payment else "",
first_payment["TDS_Payment_Amount"] if first_payment else "",
first_payment["Total_amount"] if first_payment else "",
first_payment["UTR"] if first_payment else ""
]
sheet.append(row)
sheet.append(row)
# AUTO WIDTH
for col in sheet.columns:
max_len = max((len(str(cell.value)) for cell in col if cell.value), default=0)
sheet.column_dimensions[col[0].column_letter].width = max_len + 2
# SAVE
workbook.save(output_file)
workbook.close()
return output_folder, filename
except Exception as e:
print(f"Error generating PMC report: {e}")
return None
return output_folder, f"PMC_Report_{pmc_no}.xlsx"
finally:
cursor.close()
connection.close()
# @staticmethod
# def download_pmc_report(pmc_no):
# connection = config.get_db_connection()
# cursor = connection.cursor(dictionary=True)
# # output_folder = "static/download"
# # output_file = os.path.join(output_folder, f"PMC_Report_{pmc_no}.xlsx")
# output_folder = FolderAndFile.get_download_folder
# filename = f"PMC_Report_{pmc_no}.xlsx"
# output_file = FolderAndFile.get_download_path(filename)
# try:
# cursor.callproc('GetContractorDetailsByPMC', [pmc_no])
# contractor_info = next(cursor.stored_results()).fetchone()
# if not contractor_info:
# return None
# cursor.callproc('GetHoldTypesByContractor', [contractor_info["Contractor_Id"]])
# hold_types = next(cursor.stored_results()).fetchall()
# hold_type_map = {ht['hold_type_id']: ht['hold_type'] for ht in hold_types}
# cursor.callproc('GetInvoicesAndGstReleaseByPmcNo', [pmc_no])
# invoices = next(cursor.stored_results()).fetchall()
# cursor.callproc('GetCreditNoteByContractor',[contractor_info["Contractor_Id"]])
# credit_notes = []
# for result in cursor.stored_results():
# credit_notes = result.fetchall()
# credit_note_map = {}
# for cn in credit_notes:
# key = (cn["PMC_No"], cn["Invoice_No"])
# credit_note_map.setdefault(key, []).append(cn)
# cursor.callproc('GetHoldAmountsByContractor', [contractor_info["Contractor_Id"]])
# hold_amounts = next(cursor.stored_results()).fetchall()
# hold_data = {}
# for h in hold_amounts:
# hold_data.setdefault(h['Invoice_Id'], {})[h['hold_type_id']] = h['hold_amount']
# cursor.callproc('GetAllPaymentsByPMC', [pmc_no])
# all_payments = next(cursor.stored_results()).fetchall()
# payments_map = {}
# extra_payments = []
# for pay in all_payments:
# if pay['invoice_no']:
# payments_map.setdefault(pay['invoice_no'], []).append(pay)
# else:
# extra_payments.append(pay)
# # ---------------- GST RELEASE DETAILS ----------------
# cursor.callproc('GetGSTReleaseDetailsByPMC', [pmc_no])
# gst_releases = []
# for result in cursor.stored_results():
# gst_releases = result.fetchall()
# gst_release_map = {}
# for gr in gst_releases:
# invoice_nos = []
# if gr['Invoice_No']:
# cleaned = gr['Invoice_No'].replace(' ', '')
# if '&' in cleaned:
# invoice_nos = cleaned.split('&')
# elif ',' in cleaned:
# invoice_nos = cleaned.split(',')
# else:
# invoice_nos = [cleaned]
# for inv_no in invoice_nos:
# gst_release_map.setdefault(inv_no, []).append(gr)
# LogHelper.log_action(
# "Download PMC Report",
# f"User {current_user.id} Download PMC Report '{pmc_no}'"
# )
# workbook = openpyxl.Workbook()
# sheet = workbook.active
# sheet.title = "PMC Report"
# sheet.append(["", "", "Laxmi Civil Engineering Services PVT. LTD."])
# sheet.append(["Contractor Name", contractor_info["Contractor_Name"], "", "GST No", contractor_info["GST_No"], "", "GST Type", contractor_info["GST_Registration_Type"]])
# sheet.append(["State", contractor_info["State_Name"], "", "PAN No", contractor_info["PAN_No"], "", "Address", contractor_info["Address"]])
# sheet.append(["District", contractor_info["District_Name"], "", "Mobile No", contractor_info["Mobile_No"]])
# sheet.append(["Block", contractor_info["Block_Name"], "", "Email", contractor_info["Email"]])
# sheet.append([])
# base_headers = [
# "PMC No","Village","Work Type","Invoice Details","Invoice Date","Invoice No",
# "Basic Amount","Debit","After Debit Amount","GST (18%)","Amount","TDS (1%)",
# "SD (5%)","On Commission","Hydro Testing","GST SD Amount"
# ]
# hold_headers = [ht['hold_type'] for ht in hold_types]
# payment_headers = [
# "Final Amount","Payment Amount","TDS Payment","Total Paid","UTR"
# ]
# sheet.append(base_headers + hold_headers + payment_headers)
# header_fill = PatternFill(start_color="ADD8E6",end_color="ADD8E6",fill_type="solid")
# header_font = Font(bold=True)
# for cell in sheet[sheet.max_row]:
# cell.font = header_font
# cell.fill = header_fill
# seen_invoices = set()
# processed_payments = set()
# for inv in invoices:
# invoice_no = inv["Invoice_No"]
# payments = payments_map.get(invoice_no, [])
# if invoice_no not in seen_invoices:
# seen_invoices.add(invoice_no)
# first_payment = payments[0] if payments else None
# row = [
# pmc_no, inv["Village_Name"], inv["Work_Type"],
# inv["Invoice_Details"], inv["Invoice_Date"], invoice_no,
# inv["Basic_Amount"], inv["Debit_Amount"],
# inv["After_Debit_Amount"], inv["GST_Amount"],
# inv["Amount"], inv["TDS_Amount"], inv["SD_Amount"],
# inv["On_Commission"], inv["Hydro_Testing"], inv["GST_SD_Amount"]
# ]
# invoice_holds = hold_data.get(inv["Invoice_Id"], {})
# for ht_id in hold_type_map.keys():
# row.append(invoice_holds.get(ht_id, ""))
# row += [
# inv["Final_Amount"],
# first_payment["Payment_Amount"] if first_payment else "",
# first_payment["TDS_Payment_Amount"] if first_payment else "",
# first_payment["Total_amount"] if first_payment else "",
# first_payment["UTR"] if first_payment else ""
# ]
# sheet.append(row)
# workbook.save(output_file)
# workbook.close()
# return output_folder, filename
# finally:
# cursor.close()
# connection.close()
connection.close()

View File

@@ -1,110 +1,78 @@
import config
from datetime import datetime
from flask import send_file
import openpyxl
from openpyxl.styles import Font
from model.FolderAndFile import FolderAndFile
class ReportHelper:
isSuccess = False
resultMessage = ""
data=[]
def __init__(self):
self.isSuccess = False
self.resultMessage = ""
self.data = []
@staticmethod
def execute_sp(cursor, proc_name, params=[], fetch_one=False):
cursor.callproc(proc_name, params)
return (
ReportHelper.fetch_one_result(cursor)
if fetch_one else
ReportHelper.fetch_all_results(cursor)
)
def search_contractor(subcontractor_name, pmc_no, state, district, block, village, year_from, year_to):
connection = config.get_db_connection()
cursor = connection.cursor(dictionary=True)
cursor.callproc("search_contractor_info", [
subcontractor_name or None,
pmc_no or None,
state or None,
district or None,
block or None,
village or None,
year_from or None,
year_to or None
])
@staticmethod
def fetch_all_results(cursor):
data = []
for result in cursor.stored_results():
data = result.fetchall()
return data
@staticmethod
def fetch_one_result(cursor):
data = None
for result in cursor.stored_results():
data = result.fetchone()
return data
@staticmethod
def search_contractor(request):
subcontractor_name = request.form.get("subcontractor_name")
pmc_no = request.form.get("pmc_no")
state = request.form.get("state")
district = request.form.get("district")
block = request.form.get("block")
village = request.form.get("village")
year_from = request.form.get("year_from")
year_to = request.form.get("year_to")
connection = config.get_db_connection()
if not connection:
return []
cursor = connection.cursor(dictionary=True)
try:
data = ReportHelper.execute_sp(
cursor,
"search_contractor_info",
[
subcontractor_name or None,
pmc_no or None,
state or None,
district or None,
block or None,
village or None,
year_from or None,
year_to or None
]
)
except Exception as e:
print(f"Error in search_contractor: {e}")
data = []
finally:
cursor.close()
connection.close()
cursor.close()
connection.close()
return data
@staticmethod
def get_contractor_report(contractor_id):
connection = config.get_db_connection()
cursor = connection.cursor(dictionary=True, buffered=True)
try:
# Contractor Info (only one fetch)
contInfo = ReportHelper.execute_sp(cursor, 'GetContractorInfo', [contractor_id], True)
# Contractor Info
cursor.callproc('GetContractorInfo', [contractor_id])
for result in cursor.stored_results():
contInfo = result.fetchone()
# Hold Types
hold_types = ReportHelper.execute_sp(cursor, 'GetContractorHoldTypes', [contractor_id])
cursor.callproc('GetContractorHoldTypes', [contractor_id])
for result in cursor.stored_results():
hold_types = result.fetchall()
# Invoices
invoices = ReportHelper.execute_sp(cursor, 'GetContractorInvoices', [contractor_id])
cursor.callproc('GetContractorInvoices', [contractor_id])
for result in cursor.stored_results():
invoices = result.fetchall()
# GST Release
gst_rel = ReportHelper.execute_sp(cursor, 'GetGSTRelease', [contractor_id])
cursor.callproc('GetGSTRelease', [contractor_id])
for result in cursor.stored_results():
gst_rel = result.fetchall()
# Hold Release
hold_release = ReportHelper.execute_sp(cursor, 'GetHoldRelease', [contractor_id])
cursor.callproc('GetHoldRelease', [contractor_id])
for result in cursor.stored_results():
hold_release = result.fetchall()
# Credit Note
credit_note = ReportHelper.execute_sp(cursor, 'GetCreditNote', [contractor_id])
cursor.callproc('GetCreditNote', [contractor_id])
for result in cursor.stored_results():
credit_note = result.fetchall()
# Payments
payments = ReportHelper.execute_sp(cursor, 'GetPayments', [contractor_id])
cursor.callproc('GetPayments', [contractor_id])
for result in cursor.stored_results():
payments = result.fetchall()
# Totals
total = {
@@ -146,130 +114,3 @@ class ReportHelper:
"total": total,
"current_date": current_date
}
@staticmethod
def download_report(contractor_id):
try:
connection = config.get_db_connection()
cursor = connection.cursor(dictionary=True)
# -------- Contractor Info --------
contInfo = ReportHelper.execute_sp(cursor, 'GetContractorInfo', [contractor_id], True)
if not contInfo:
return "No contractor found", 404
# -------- Invoice Data --------
cursor.callproc('FetchInvoicesByContractor', [contractor_id])
invoices = []
for result in cursor.stored_results():
invoices.extend(result.fetchall())
if not invoices:
return "No invoice data found"
# -------- Create Workbook --------
workbook = openpyxl.Workbook()
sheet = workbook.active
sheet.title = "Contractor Report"
# ================= CONTRACTOR DETAILS =================
sheet.append(["SUB CONTRACTOR DETAILS"])
sheet.cell(row=sheet.max_row, column=1).font = Font(bold=True)
sheet.append([])
sheet.append(["Name", contInfo.get("Contractor_Name") or ""])
sheet.append(["Mobile No", contInfo.get("Mobile_No") or ""])
sheet.append(["Email", contInfo.get("Email") or ""])
sheet.append(["Village", contInfo.get("Village_Name") or ""])
sheet.append(["Block", contInfo.get("Block_Name") or ""])
sheet.append(["District", contInfo.get("District_Name") or ""])
sheet.append(["State", contInfo.get("State_Name") or ""])
sheet.append(["Address", contInfo.get("Address") or ""])
sheet.append(["GST No", contInfo.get("GST_No") or ""])
sheet.append(["PAN No", contInfo.get("PAN_No") or ""])
sheet.append([])
sheet.append([])
# ================= TABLE HEADERS =================
headers = [
"PMC No", "Village", "Invoice No", "Invoice Date", "Work Type","Invoice_Details",
"Basic Amount", "Debit Amount", "After Debit Amount",
"Amount", "GST Amount", "TDS Amount", "SD Amount",
"On Commission", "Hydro Testing", "Hold Amount",
"GST SD Amount", "Final Amount",
"Payment Amount", "TDS Payment",
"Total Amount", "UTR"
]
sheet.append(headers)
for col in range(1, len(headers) + 1):
sheet.cell(row=sheet.max_row, column=col).font = Font(bold=True)
# ================= DATA =================
total_final = 0
total_payment = 0
total_amount = 0
for inv in invoices:
row = [
inv.get("PMC_No"),
inv.get("Village_Name"),
inv.get("invoice_no"),
inv.get("Invoice_Date"),
inv.get("Work_Type"),
inv.get("Invoice_Details"),
inv.get("Basic_Amount"),
inv.get("Debit_Amount"),
inv.get("After_Debit_Amount"),
inv.get("Amount"),
inv.get("GST_Amount"),
inv.get("TDS_Amount"),
inv.get("SD_Amount"),
inv.get("On_Commission"),
inv.get("Hydro_Testing"),
inv.get("Hold_Amount"),
inv.get("GST_SD_Amount"),
inv.get("Final_Amount"),
inv.get("Payment_Amount"),
inv.get("TDS_Payment_Amount"),
inv.get("Total_Amount"),
inv.get("UTR")
]
total_final += float(inv.get("Final_Amount") or 0)
total_payment += float(inv.get("Payment_Amount") or 0)
total_amount += float(inv.get("Total_Amount") or 0)
sheet.append(row)
# ================= TOTAL ROW =================
sheet.append([])
sheet.append([
"", "", "", "", "", "", "", "", "", "", "", "", "", "", "", "",
"TOTAL",
total_final,
total_payment,
"",
total_amount,
""
])
# ================= AUTO WIDTH =================
for column in sheet.columns:
max_length = 0
column_letter = column[0].column_letter
for cell in column:
if cell.value:
max_length = max(max_length, len(str(cell.value)))
sheet.column_dimensions[column_letter].width = max_length + 2
# ================= SAVE FILE =================
filename = f"Contractor_Report_{contInfo.get('Contractor_Name')}.xlsx"
output_file = FolderAndFile.get_download_path(filename)
workbook.save(output_file)
return send_file(output_file, as_attachment=True)
except Exception as e:
return str(e)

0
model/ReportGenerator.py Normal file
View File

View File

@@ -1,168 +1,246 @@
from flask import request, redirect, url_for
from flask_login import current_user
from flask import Flask, render_template, request, redirect, url_for, send_from_directory, flash, jsonify, json
from flask import current_app
from datetime import datetime
from flask_login import LoginManager, UserMixin, login_user, logout_user, login_required, current_user
from model.Utilities import RegEx, ResponseHandler, HtmlHelper, ItemCRUDType
from model.Log import LogHelper
from model.ItemCRUD import ItemCRUD
from model.Log import LogData, LogHelper
import os
import config
import re
import mysql.connector
import mysql.connector
from mysql.connector import Error
class State:
isSuccess = False
resultMessage = ""
def __init__(self):
self.isSuccess = False
self.resultMessage = ""
# ----------------------------------------------------------
# ADD STATE (USING ITEM CRUD)
# ----------------------------------------------------------
def AddState(self, request):
state_name = request.form['state_Name'].strip()
crud = ItemCRUD(ItemCRUDType.State)
crud.AddItem(
request=request,
childname=state_name,
storedprocfetch="CheckStateExists",
storedprocadd="SaveState"
)
self.isSuccess = crud.isSuccess
self.resultMessage = crud.resultMessage
return self.resultMessage
# ----------------------------------------------------------
# GET ALL STATES (NO CHANGE - THIS IS CORRECT)
# ----------------------------------------------------------
def GetAllStates(self, request):
"""Log user actions with timestamp, user, action, and details."""
statedata = []
connection = config.get_db_connection()
data = []
if connection:
cursor = connection.cursor()
state_name = request.form['state_Name'].strip()
LogHelper.log_action("Add State", f"User {current_user.id} added state '{state_name}'")
if not re.match(RegEx.patternAlphabetOnly, state_name):
self.isSuccess = False
self.resultMessage = HtmlHelper.json_response(ResponseHandler.invalid_name("state"), 400)
return
try:
cursor.callproc("CheckStateExists", (state_name,))
for data in cursor.stored_results():
existing_state = data.fetchone()
if existing_state:
self.isSuccess = False
self.resultMessage = HtmlHelper.json_response(ResponseHandler.already_exists("state"), 409)
return
# cursor.execute("call SaveState (%s)", (state_name,))
cursor.callproc("SaveState", (state_name,))
connection.commit()
self.isSuccess = True
self.resultMessage = HtmlHelper.json_response(ResponseHandler.add_success("state"), 200)
return
except mysql.connector.Error as e:
print(f"Error inserting state: {e}")
self.isSuccess = False
self.resultMessage = HtmlHelper.json_response(ResponseHandler.add_failure("state"), 500)
return
#Need to make this seperate
def GetAllStates(self, request):
"""Log user actions with timestamp, user, action, and details."""
statedata = []
connection = config.get_db_connection()
self.isSuccess = False
self.resultMessage = ""
if not connection:
return []
cursor = connection.cursor()
try:
cursor.callproc("GetAllStates")
for res in cursor.stored_results():
data = res.fetchall()
statedata = res.fetchall()
self.isSuccess = True
except mysql.connector.Error as e:
print(f"Error fetching states: {e}")
self.isSuccess = False
self.resultMessage = HtmlHelper.json_response(
ResponseHandler.fetch_failure("state"), 500
)
return []
self.resultMessage = HtmlHelper.json_response(ResponseHandler.fetch_failure("state"), 500)
return []
finally:
cursor.close()
connection.close()
return data
return statedata
# ----------------------------------------------------------
# CHECK STATE (USING ITEM CRUD)
# ----------------------------------------------------------
def CheckState(self, request):
state_name = request.json.get('state_Name', '').strip()
crud = ItemCRUD(ItemCRUDType.State)
return crud.CheckItem(
request=request,
parentid=None,
childname=state_name,
storedprocfetch="CheckStateExists"
)
# ----------------------------------------------------------
# DELETE STATE (USING ITEM CRUD)
# ----------------------------------------------------------
def DeleteState(self, request, id):
crud = ItemCRUD(ItemCRUDType.State)
crud.DeleteItem(
request=request,
itemID=id,
storedprocDelete="DeleteState"
)
self.isSuccess = crud.isSuccess
self.resultMessage = crud.resultMessage
return self.resultMessage
# ----------------------------------------------------------
# EDIT STATE (USING ITEM CRUD)
# ----------------------------------------------------------
def EditState(self, request, id):
state_name = request.form['state_Name'].strip()
crud = ItemCRUD(ItemCRUDType.State)
crud.EditItem(
request=request,
childid=id,
parentid=None,
childname=state_name,
storedprocupdate="UpdateStateById"
)
self.isSuccess = crud.isSuccess
self.resultMessage = crud.resultMessage
return redirect(url_for('state.add_state'))
# ----------------------------------------------------------
# GET STATE BY ID (KEEP SAME)
# ----------------------------------------------------------
def GetStateByID(self, request, id):
self.isSuccess = False
self.resultMessage = ""
connection = config.get_db_connection()
data = None
#connection closing needs to be verified
if connection:
cursor = connection.cursor()
state_name = request.json.get('state_Name', '').strip()
LogHelper.log_action("Check State", f"User {current_user.id} Checked state '{state_name}'")
if not re.match(RegEx.patternAlphabetOnly, state_name):
self.isSuccess = False
self.resultMessage = HtmlHelper.json_response(ResponseHandler.invalid_name("state"), 400)
return HtmlHelper.json_response(ResponseHandler.invalid_name("state"), 400)
try:
# cursor.execute("SELECT * FROM states WHERE State_Name = %s", (state_name,))
# existing_state = cursor.fetchone()
cursor.callproc("CheckStateExists", (state_name,))
for data in cursor.stored_results():
existing_state = data.fetchone()
if existing_state:
self.isSuccess = False
self.resultMessage = HtmlHelper.json_response(ResponseHandler.already_exists("state"), 409)
return HtmlHelper.json_response(ResponseHandler.already_exists("state"), 409)
else:
self.isSuccess = True
self.resultMessage = HtmlHelper.json_response(ResponseHandler.is_available("state"), 200)
return HtmlHelper.json_response(ResponseHandler.is_available("state"), 200)
except mysql.connector.Error as e:
self.isSuccess = False
self.resultMessage = HtmlHelper.json_response(ResponseHandler.add_failure("state"), 500)
print(f"Error checking state: {e}")
return HtmlHelper.json_response(ResponseHandler.add_failure("state"), 500)
finally:
cursor.close()
connection.close()
def DeleteState(self, request, id):
self.isSuccess = False
self.resultMessage = ""
connection = config.get_db_connection()
cursor = connection.cursor()
LogHelper.log_action("Delete State", f"User {current_user.id} Deleted state '{id}'")
try:
cursor.callproc('DeleteState', (id,))
connection.commit()
self.resultMessage = "Successfully Deleted"
self.isSuccess = True
except mysql.connector.Error as e:
print(f"Error deleting data: {e}")
self.isSuccess = False
self.resultMessage = HtmlHelper.json_response(ResponseHandler.delete_failure("state"), 500)
return HtmlHelper.json_response(ResponseHandler.delete_failure("state"), 500)
finally:
cursor.close()
connection.close()
return self.resultMessage
def EditState(self, request, id):
self.isSuccess = False
self.resultMessage = ""
connection = config.get_db_connection()
cursor = connection.cursor()
# str_pattern_reg = r"^[A-Za-z\s]+$"
state_name = request.form['state_Name'].strip()
LogHelper.log_action("Edit State", f"User {current_user.id} Edited state '{state_name}'")
if not re.match(RegEx.patternAlphabetOnly, state_name):
self.isSuccess = False
self.resultMessage = ResponseHandler.invalid_name("state"), 400
return ResponseHandler.invalid_name("state"), 400
try:
# cursor.execute("UPDATE states SET State_Name = %s WHERE State_ID = %s", (state_name, id))
cursor.callproc("UpdateStateById", (id, state_name))
connection.commit()
self.isSuccess = True
self.resultMessage = "Successfully Edited"
return redirect(url_for('state.add_state'))
except mysql.connector.Error as e:
print(f"Error updating data: {e}")
self.isSuccess = True
self.resultMessage = ResponseHandler.add_failure("state"), 500
return ResponseHandler.add_failure("state"), 500
finally:
cursor.close()
connection.close()
def GetStateByID(self, request, id):
"""Log user actions with timestamp, user, action, and details."""
statedata = []
self.isSuccess = False
self.resultMessage = ""
connection = config.get_db_connection()
if not connection:
return None
return []
cursor = connection.cursor()
try:
cursor.callproc("GetStateByID", (id,))
for res in cursor.stored_results():
data = res.fetchone()
statedata = res.fetchone()
if data:
if statedata:
self.isSuccess = True
self.resultMessage = "Success"
self.resultMessage = "Success in Fetching"
else:
self.isSuccess = False
self.resultMessage = "Not Found"
self.resultMessage = "State Not Found"
except mysql.connector.Error as e:
print(f"Error fetching state: {e}")
print(f"Error fetching states: {e}")
self.isSuccess = False
self.resultMessage = HtmlHelper.json_response(ResponseHandler.fetch_failure("state"), 500)
return []
finally:
cursor.close()
connection.close()
return data
return statedata

View File

@@ -1,140 +1,131 @@
from model.Utilities import ItemCRUDType
from model.ItemCRUD import ItemCRUD
# model/Subcontractor.py
import config
from model.Log import LogHelper
from mysql.connector import Error
class Subcontractor:
def __init__(self):
self.isSuccess = False
self.resultMessage = ""
# ----------------------------------------------------------
# ADD
# ----------------------------------------------------------
def AddSubcontractor(self, request):
@staticmethod
def get_connection():
return config.get_db_connection()
subcontractor = ItemCRUD(itemType=ItemCRUDType.Subcontractor)
@staticmethod
def get_all_subcontractors():
connection = Subcontractor.get_connection()
subcontractors = []
if not connection:
return None, "Database connection failed"
try:
cursor = connection.cursor()
cursor.callproc('GetAllSubcontractors')
for result in cursor.stored_results():
subcontractors = result.fetchall()
except Error as e:
print(f"Error fetching subcontractors: {e}")
return None, str(e)
finally:
cursor.close()
connection.close()
return subcontractors, None
data = {
"Contractor_Name": request.form.get('Contractor_Name', '').strip(),
"Address": request.form.get('Address', '').strip(),
"Mobile_No": request.form.get('Mobile_No', '').strip(),
"PAN_No": request.form.get('PAN_No', '').strip(),
"Email": request.form.get('Email', '').strip(),
"Gender": request.form.get('Gender', '').strip(),
"GST_Registration_Type": request.form.get('GST_Registration_Type', '').strip(),
"GST_No": request.form.get('GST_No', '').strip(),
"Contractor_password": request.form.get('Contractor_password', '').strip()
}
@staticmethod
def get_subcontractor_by_id(id):
connection = Subcontractor.get_connection()
subcontractor = None
if not connection:
return None, "Database connection failed"
try:
cursor = connection.cursor()
cursor.callproc("GetSubcontractorById", (id,))
for result in cursor.stored_results():
subcontractor = result.fetchone()
except Error as e:
print(f"Error fetching subcontractor: {e}")
return None, str(e)
finally:
cursor.close()
connection.close()
return subcontractor, None
subcontractor.AddItem(
request=request,
data=data,
storedprocfetch="GetSubcontractorByName",
storedprocadd="SaveContractor"
)
@staticmethod
def save_subcontractor(data):
connection = Subcontractor.get_connection()
if not connection:
return "Database connection failed"
try:
cursor = connection.cursor()
cursor.callproc('SaveContractor', (
data['Contractor_Name'],
data['Address'],
data['Mobile_No'],
data['PAN_No'],
data['Email'],
data['Gender'],
data['GST_Registration_Type'],
data['GST_No'],
data['Contractor_password']
))
connection.commit()
# Active log
LogHelper.log_action("Add Subcontractor", f"Added subcontractor '{data['Contractor_Name']}'")
except Error as e:
print(f"Error inserting subcontractor: {e}")
return str(e)
finally:
cursor.close()
connection.close()
return None
self.isSuccess = subcontractor.isSuccess
self.resultMessage = subcontractor.resultMessage
return
@staticmethod
def update_subcontractor(id, data):
connection = Subcontractor.get_connection()
if not connection:
return "Database connection failed"
try:
cursor = connection.cursor()
cursor.callproc('UpdateSubcontractor', (
id,
data['Contractor_Name'],
data['Address'],
data['Mobile_No'],
data['PAN_No'],
data['Email'],
data['Gender'],
data['GST_Registration_Type'],
data['GST_No'],
data['Contractor_password']
))
connection.commit()
# Active log
LogHelper.log_action("Edit Subcontractor", f"Edited subcontractor '{id}'")
except Error as e:
print(f"Error updating subcontractor: {e}")
return str(e)
finally:
cursor.close()
connection.close()
return None
# ----------------------------------------------------------
# GET ALL
# ----------------------------------------------------------
def GetAllSubcontractors(self, request):
subcontractor = ItemCRUD(itemType=ItemCRUDType.Subcontractor)
data = subcontractor.GetAllData(
request=request,
storedproc="GetAllSubcontractors"
)
self.isSuccess = subcontractor.isSuccess
self.resultMessage = subcontractor.resultMessage
return data
# ----------------------------------------------------------
# GET BY ID
# ----------------------------------------------------------
def GetSubcontractorByID(self, id):
subcontractor = ItemCRUD(itemType=ItemCRUDType.Subcontractor)
data = subcontractor.GetDataByID(
id=id,
storedproc="GetSubcontractorById"
)
if data:
self.isSuccess = True
else:
self.isSuccess = False
self.resultMessage = "Subcontractor not found"
return data
# ----------------------------------------------------------
# CHECK (Duplicate)
# ----------------------------------------------------------
def CheckSubcontractor(self, request):
subcontractor = ItemCRUD(itemType=ItemCRUDType.Subcontractor)
name = request.form.get('Contractor_Name', '').strip()
result = subcontractor.CheckItem(
request=request,
childname=name,
storedprocfetch="GetSubcontractorByName"
)
self.isSuccess = subcontractor.isSuccess
self.resultMessage = subcontractor.resultMessage
return result
# ----------------------------------------------------------
# EDIT
# ----------------------------------------------------------
def EditSubcontractor(self, request, subcontractor_id):
subcontractor = ItemCRUD(itemType=ItemCRUDType.Subcontractor)
data = {
"Contractor_Name": request.form.get('Contractor_Name', '').strip(),
"Address": request.form.get('Address', '').strip(),
"Mobile_No": request.form.get('Mobile_No', '').strip(),
"PAN_No": request.form.get('PAN_No', '').strip(),
"Email": request.form.get('Email', '').strip(),
"Gender": request.form.get('Gender', '').strip(),
"GST_Registration_Type": request.form.get('GST_Registration_Type', '').strip(),
"GST_No": request.form.get('GST_No', '').strip(),
"Contractor_password": request.form.get('Contractor_password', '').strip()
}
subcontractor.EditItem(
request=request,
childid=subcontractor_id,
data=data,
storedprocupdate="UpdateSubcontractor"
)
self.isSuccess = subcontractor.isSuccess
self.resultMessage = subcontractor.resultMessage
return
# ----------------------------------------------------------
# DELETE
# ----------------------------------------------------------
def DeleteSubcontractor(self, request, subcontractor_id):
subcontractor = ItemCRUD(itemType=ItemCRUDType.Subcontractor)
subcontractor.DeleteItem(
request=request,
itemID=subcontractor_id,
storedprocDelete="DeleteSubcontractor"
)
self.isSuccess = subcontractor.isSuccess
self.resultMessage = subcontractor.resultMessage
return
@staticmethod
def delete_subcontractor(id):
connection = Subcontractor.get_connection()
if not connection:
return "Database connection failed", 0
affected_rows = 0
try:
cursor = connection.cursor()
cursor.callproc('DeleteSubcontractor', (id,))
connection.commit()
for result in cursor.stored_results():
row = result.fetchone()
affected_rows = row[0] if row else 0
# Active log
LogHelper.log_action("Delete Subcontractor", f"Deleted subcontractor '{id}'")
except Error as e:
print(f"Error deleting subcontractor: {e}")
return str(e), 0
finally:
cursor.close()
connection.close()
return None, affected_rows

View File

@@ -7,8 +7,6 @@ class ItemCRUDType(Enum):
District = 3
State = 4
HoldType = 5
Subcontractor = 6
class RegEx:
patternAlphabetOnly = "^[A-Za-z ]+$"

View File

@@ -1,14 +1,7 @@
from flask_login import LoginManager, UserMixin, login_user, logout_user, login_required, current_user
from model.Utilities import RegEx, ResponseHandler, HtmlHelper, ItemCRUDType
from model.Log import LogData, LogHelper
# return blocks
from model.Utilities import ResponseHandler, HtmlHelper, ItemCRUDType
import config
import mysql.connector
from mysql.connector import Error
from model.ItemCRUD import ItemCRUD
@@ -19,103 +12,162 @@ class Village:
def __init__(self):
self.isSuccess = False
self.resultMessage = ""
self.village = ItemCRUD(itemType=ItemCRUDType.Village)
# 🔹 Helper: sync status
def _set_status(self, village):
self.isSuccess = village.isSuccess
self.resultMessage = village.resultMessage
# 🔹 Helper: get request data
def _get_form_data(self, request):
block_id = request.form.get('block_Id')
village_name = request.form.get('Village_Name', '').strip()
return block_id, village_name
def AddVillage(self, request):
village = ItemCRUD(itemType=ItemCRUDType.Village)
block_id, village_name = self._get_form_data(request)
block_id = request.form.get('block_Id')
village_name = request.form.get('Village_Name', '').strip()
if not village_name:
self.isSuccess = False
self.resultMessage = "Village name cannot be empty"
return
village.AddItem(request=request, parentid=block_id, childname=village_name, storedprocfetch="GetVillageByNameAndBlock", storedprocadd="SaveVillage" )
self.isSuccess = village.isSuccess
self.resultMessage = village.resultMessage
return
#self.isSuccess = False
try:
self.village.AddItem(
request=request,
parentid=block_id,
childname=village_name,
storedprocfetch="GetVillageByNameAndBlock",
storedprocadd="SaveVillage"
)
self._set_status(self.village)
except Exception as e:
self.isSuccess = False
self.resultMessage = str(e)
def GetAllVillages(self, request):
village = ItemCRUD(itemType=ItemCRUDType.Village)
villagesdata = village.GetAllData(request=request, storedproc="GetAllVillages")
self.isSuccess = village.isSuccess
self.resultMessage = village.resultMessage
return villagesdata
try:
villagesdata = self.village.GetAllData(
request=request,
storedproc="GetAllVillages"
)
self._set_status(self.village)
return villagesdata
except Exception as e:
self.isSuccess = False
self.resultMessage = str(e)
return []
def CheckVillage(self, request):
village = ItemCRUD(itemType=ItemCRUDType.Village)
block_id = request.form.get('block_Id')
village_name = request.form.get('Village_Name', '').strip()
result = village.CheckItem(request=request, parentid=block_id, childname=village_name, storedprocfetch="GetVillageByNameAndBlocks")
self.isSuccess = village.isSuccess
self.resultMessage = village.resultMessage
return result
block_id, village_name = self._get_form_data(request)
if not village_name:
self.isSuccess = False
self.resultMessage = "Village name cannot be empty"
return None
try:
result = self.village.CheckItem(
request=request,
parentid=block_id,
childname=village_name,
storedprocfetch="GetVillageByNameAndBlocks"
)
self._set_status(self.village)
return result
except Exception as e:
self.isSuccess = False
self.resultMessage = str(e)
return None
def DeleteVillage(self, request, village_id):
village = ItemCRUD(itemType=ItemCRUDType.Village)
try:
self.village.DeleteItem(
request=request,
itemID=village_id,
storedprocDelete="DeleteVillage"
)
self._set_status(self.village)
village.DeleteItem(request=request, itemID=village_id, storedprocDelete="DeleteVillage" )
self.isSuccess = village.isSuccess
self.resultMessage = village.resultMessage
return
except Exception as e:
self.isSuccess = False
self.resultMessage = str(e)
def EditVillage(self, request, village_id):
corsor=None
village = ItemCRUD(itemType=ItemCRUDType.Village)
block_id, village_name = self._get_form_data(request)
block_id = request.form.get('block_Id')
village_name = request.form.get('Village_Name', '').strip()
village.EditItem(request=request,childid=village_id,parentid=block_id,childname=village_name,storedprocupdate="UpdateVillage" )
self.isSuccess = village.isSuccess
self.resultMessage = village.resultMessage
return
# def GetVillageByID(self, request, id):
# village = ItemCRUD(itemType=ItemCRUDType.Village)
# villagedetailsdata = village.GetAllData(request=request, storedproc="GetVillageDetailsById")
# self.isSuccess = village.isSuccess
# self.resultMessage = village.resultMessage
# return villagedetailsdata
def GetVillageByID(self, request, id):
village = ItemCRUD(itemType=ItemCRUDType.Village)
villagedetailsdata = village.GetDataByID(id=id,storedproc="GetVillageDetailsById")
if villagedetailsdata:
self.isSuccess = True
else:
if not village_name:
self.isSuccess = False
self.resultMessage = "Village not found"
self.resultMessage = "Village name cannot be empty"
return
return villagedetailsdata
try:
self.village.EditItem(
request=request,
childid=village_id,
parentid=block_id,
childname=village_name,
storedprocupdate="UpdateVillage"
)
self._set_status(self.village)
except Exception as e:
self.isSuccess = False
self.resultMessage = str(e)
def GetAllBlocks(self, request):
def GetVillageByID(self, id):
try:
villagedetailsdata = self.village.GetDataByID(
id=id,
storedproc="GetVillageDetailsById"
)
if villagedetailsdata:
self.isSuccess = True
else:
self.isSuccess = False
self.resultMessage = "Village not found"
return villagedetailsdata
except Exception as e:
self.isSuccess = False
self.resultMessage = str(e)
return None
def GetAllBlocks(self):
blocks = []
self.isSuccess = False
self.resultMessage = ""
connection = config.get_db_connection()
connection = config.get_db_connection()
if not connection:
return []
cursor = connection.cursor()
try:
cursor.callproc('GetAllBlocks')
for result in cursor.stored_results():
blocks = result.fetchall()
with connection.cursor() as cursor:
cursor.callproc('GetAllBlocks')
for result in cursor.stored_results():
blocks.extend(result.fetchall())
self.isSuccess = True
return blocks
except mysql.connector.Error as e:
print(f"Error fetching blocks: {e}")
self.isSuccess = False
self.resultMessage = HtmlHelper.json_response(ResponseHandler.fetch_failure("block"), 500)
finally:
cursor.close()
connection.close()
self.resultMessage = HtmlHelper.json_response(
ResponseHandler.fetch_failure("block"), 500
)
return []
return blocks
finally:
connection.close()