updated Payment reconcillation code

This commit is contained in:
2026-03-27 14:17:21 +05:30
commit 1a825ba46c
113 changed files with 15699 additions and 0 deletions

63
model/Auth.py Normal file
View File

@@ -0,0 +1,63 @@
import os
from dotenv import load_dotenv
from flask_login import UserMixin
from ldap3 import Server, Connection, ALL
from ldap3.core.exceptions import LDAPBindError
# Load .env
load_dotenv()
class DefaultCredentials:
username = os.getenv("DEFAULT_USERNAME")
password = os.getenv("DEFAULT_PASSWORD")
class LoginLDAP:
def __init__(self, request):
self.username = request.form.get("username", "").strip()
self.password = request.form.get("password", "")
self.isDefaultCredentials = False
self.isValidLogin = False
self.errorMessage = ""
ldap_server = "ldap://localhost:389"
ldap_user_dn = f"uid={self.username},ou=users,dc=lcepl,dc=org"
# fallback admin login
if (
self.username == DefaultCredentials.username
and self.password == DefaultCredentials.password
):
self.isDefaultCredentials = True
self.isValidLogin = True
return
try:
server = Server(ldap_server, get_info=ALL)
conn = Connection(
server,
user=ldap_user_dn,
password=self.password,
auto_bind=True
)
if conn.bound:
self.isValidLogin = True
except LDAPBindError:
self.errorMessage = "Invalid LDAP credentials"
except Exception as e:
self.errorMessage = str(e)
class User(UserMixin):
def __init__(self, username):
self.id = username

161
model/Block.py Normal file
View File

@@ -0,0 +1,161 @@
from flask import Blueprint, render_template, request, redirect, url_for, jsonify
from model.Utilities import RegEx, ResponseHandler, HtmlHelper, ItemCRUDType
from model.Log import LogData, LogHelper
import os
import config
import re
import mysql.connector
from mysql.connector import Error
from model.ItemCRUD import ItemCRUD, itemCRUDMapping
class Block:
isSuccess = False
resultMessage = ""
def __init__(self):
self.isSuccess = False
self.resultMessage = ""
# ----------------------------------------------------------
# Add Block
# ----------------------------------------------------------
def AddBlock(self, request):
block = ItemCRUD(itemType=ItemCRUDType.Block)
district_id = request.form.get('district_Id')
block_name = request.form.get('block_Name', '').strip()
block.AddItem(request=request, parentid=district_id, childname=block_name, storedprocfetch="GetBlockByNameAndDistricts", storedprocadd="SaveBlock" )
self.isSuccess = block.isSuccess
self.resultMessage = block.resultMessage
return
# ----------------------------------------------------------
# Get All Blocks
# ----------------------------------------------------------
# def GetAllBlocks(self):
# block = ItemCRUD(itemType=ItemCRUDType.Block)
# blocksdata = block.GetAllData(request=request, storedproc="GetAllBlock")
# self.isSuccess = block.isSuccess
# self.resultMessage = block.resultMessage
# return blocksdata
def GetAllBlocks(self, request):
block = ItemCRUD(itemType=ItemCRUDType.Block)
blocksdata = block.GetAllData(request=request, storedproc="GetAllBlock")
self.isSuccess = block.isSuccess
self.resultMessage = block.resultMessage
return blocksdata
# ----------------------------------------------------------
# Check Block Exists
# ----------------------------------------------------------
# def CheckBlock(self, request):
# block = ItemCRUD(itemType=ItemCRUDType.Block)
# block_name = request.json.get('block_Name', '').strip()
# district_id = request.json.get('district_Id')
# result = block.CheckItem(request=request, parentid=district_id, childname=block_name, storedprocfetch="GetBlockByNameAndDistrict")
# self.isSuccess = block.isSuccess
# self.resultMessage = block.resultMessage
# return result
def CheckBlock(self, request):
block = ItemCRUD(itemType=ItemCRUDType.Block)
data = request.get_json(silent=True) or request.form
block_name = (data.get('block_Name') or '').strip()
district_id = data.get('district_Id')
result = block.CheckItem(
request=request,
parentid=district_id,
childname=block_name,
storedprocfetch="GetBlockByNameAndDistrict"
)
self.isSuccess = block.isSuccess
self.resultMessage = block.resultMessage
return result
# ----------------------------------------------------------
# Get Block By ID
# ----------------------------------------------------------
# def GetBlockByID(self, id):
# block = ItemCRUD(itemType=ItemCRUDType.Village)
# blockdata = block.GetAllData(id=id,storedproc="GetBlockDataByID")
# self.isSuccess = block.isSuccess
# self.resultMessage = block.resultMessage
# print("akash"+blockdata)
# return blockdata
# def GetBlockByID(self,request,id):
# block = ItemCRUD(itemType=ItemCRUDType.Block)
# blockdata = block.GetDataByID(request=request,id=id,storedproc="GetBlockDataByID")
# self.isSuccess = block.isSuccess
# self.resultMessage = block.resultMessage
# return blockdata
def GetBlockByID(self, id):
block = ItemCRUD(itemType=ItemCRUDType.Block)
blockdata = block.GetDataByID(
id=id,
storedproc="GetBlockDataByID"
)
self.isSuccess = block.isSuccess
self.resultMessage = block.resultMessage
return blockdata
# ----------------------------------------------------------
# Update Block
# ----------------------------------------------------------
# def EditBlock(self, request, block_id):
# block = ItemCRUD(itemType=ItemCRUDType.Block)
# district_id = request.form.get('district_Id')
# block_name = request.form.get('block_Name', '').strip()
# block.EditItem(request=request, childid=block_id, parentid=district_id, childname=block_name, storedprocadd="UpdateBlockById" )
# self.isSuccess = block.isSuccess
# self.resultMessage = block.resultMessage
# return
def EditBlock(self, request, block_id):
block = ItemCRUD(itemType=ItemCRUDType.Block)
district_id = request.form.get('district_Id')
block_name = request.form.get('block_Name', '').strip()
block.EditItem(
request=request,
childid=block_id,
parentid=district_id,
childname=block_name,
storedprocupdate="UpdateBlockById"
)
self.isSuccess = block.isSuccess
self.resultMessage = block.resultMessage
return
# ----------------------------------------------------------
# Delete Block
# ---------------------------------------------------------
def DeleteBlock(self,request, id):
block = ItemCRUD(itemType=ItemCRUDType.Block)
block.DeleteItem(request=request,itemID=id, storedprocDelete="DeleteBlock")
self.isSuccess = block.isSuccess
self.resultMessage = block.resultMessage
return

65
model/ContractorInfo.py Normal file
View File

@@ -0,0 +1,65 @@
from mysql.connector import Error
import config
from datetime import datetime
class ContractorInfo:
def __init__(self, contractor_id):
self.ID = contractor_id
self.contInfo = None
self.fetchData()
def fetchData(self):
"""Fetch basic contractor info by ID."""
try:
connection = config.get_db_connection()
with connection.cursor(dictionary=True, buffered=True) as cursor:
cursor.callproc('GetContractorInfoById', [self.ID])
# Get the first result set
for result in cursor.stored_results():
self.contInfo = result.fetchone()
except Error as e:
print(f"Error fetching contractor info: {e}")
finally:
if connection.is_connected():
connection.close()
def fetchalldata(self):
"""Fetch hold types and invoices for contractor."""
data = {}
try:
connection = config.get_db_connection()
with connection.cursor(dictionary=True, buffered=True) as cursor:
# Fetch Hold Types
cursor.callproc('GetHoldTypesByContractor', [self.ID])
hold_types = []
for result in cursor.stored_results():
hold_types = result.fetchall()
hold_type_map = {ht['hold_type_id']: ht['hold_type'] for ht in hold_types}
data['hold_types'] = hold_type_map
# Fetch Invoices
cursor.callproc('GetInvoicesByContractor', [self.ID])
invoices = []
for result in cursor.stored_results():
invoices = result.fetchall()
# Remove duplicate invoices
seen_ids = set()
unique_invoices = []
for inv in invoices:
if inv['Invoice_Id'] not in seen_ids:
seen_ids.add(inv['Invoice_Id'])
unique_invoices.append(inv)
data['invoices'] = unique_invoices
except Error as e:
print(f"Error fetching contractor data: {e}")
finally:
if connection.is_connected():
connection.close()
return data

80
model/District.py Normal file
View File

@@ -0,0 +1,80 @@
from model.Utilities import ItemCRUDType
from model.ItemCRUD import ItemCRUD
class District:
isSuccess = False
resultMessage = ""
def __init__(self):
self.isSuccess = False
self.resultMessage = ""
# edit district
def EditDistrict(self, request, district_id):
district = ItemCRUD(itemType=ItemCRUDType.District)
district_name = request.form['district_Name'].strip()
state_id = request.form['state_Id']
district.EditItem(request=request, childid=district_id, parentid=state_id, childname=district_name,storedprocupdate="UpdateDistrict" )
self.isSuccess = district.isSuccess
self.resultMessage = district.resultMessage
return
# add district
def AddDistrict(self, request):
district = ItemCRUD(ItemCRUDType.District)
district_name = request.form['district_Name'].strip()
state_id = request.form['state_Id']
district.AddItem(request=request, parentid=state_id, childname=district_name, storedprocfetch="GetDistrictByNameAndState", storedprocadd="SaveDistrict" )
self.isSuccess = district.isSuccess
self.resultMessage = district.resultMessage
return
# get all district data
def GetAllDistricts(self, request):
district = ItemCRUD(itemType=ItemCRUDType.District)
districtsdata = district.GetAllData(request=request, storedproc="GetAllDistricts")
self.isSuccess = district.isSuccess
self.resultMessage = district.resultMessage
return districtsdata
# check district validation
def CheckDistrict(self, request):
district = ItemCRUD(itemType=ItemCRUDType.District)
district_name = request.json.get('district_Name', '').strip()
state_id = request.json.get('state_Id', '')
result = district.CheckItem(request=request, parentid=state_id, childname=district_name, storedprocfetch="GetDistrictByNameAndState")
self.isSuccess = district.isSuccess
self.resultMessage = district.resultMessage
return result
# get district by district id
def GetDistrictByID(self, request, district_id):
district = ItemCRUD(itemType=ItemCRUDType.District)
districtdata = district.GetDataByID(
id=district_id,
storedproc="GetDistrictDataByID"
)
if districtdata:
self.isSuccess = True
else:
self.isSuccess = False
self.resultMessage = "District not found"
return districtdata
# Delete District by district id
def DeleteDistrict(self, request, district_id):
district = ItemCRUD(itemType=ItemCRUDType.District)
district.DeleteItem(request=request,itemID=district_id,storedprocDelete="DeleteDistrict")
self.isSuccess = district.isSuccess
self.resultMessage = str(district.resultMessage)

53
model/FolderAndFile.py Normal file
View File

@@ -0,0 +1,53 @@
import os
from flask import current_app
class FolderAndFile:
# -----------------------------
# BASE FOLDER METHODS
# -----------------------------
@staticmethod
def get_download_folder():
folder = os.path.join(current_app.root_path, "static", "downloads")
if not os.path.exists(folder):
os.makedirs(folder)
os.makedirs(folder, exist_ok=True)
return folder
@staticmethod
def get_upload_folder():
folder = os.path.join(current_app.root_path, "static", "uploads")
if not os.path.exists(folder):
os.makedirs(folder)
os.makedirs(folder, exist_ok=True)
return folder
@staticmethod
def get_logs_folder():
folder = os.path.join(current_app.root_path, "logs")
if not os.path.exists(folder):
os.makedirs(folder)
os.makedirs(folder, exist_ok=True)
return folder
# FILE PATH METHODS - download
@staticmethod
def get_download_path(filename):
return os.path.join(FolderAndFile.get_download_folder(), filename)
# FILE PATH METHODS - upload file
@staticmethod
def get_upload_path(filename):
return os.path.join(FolderAndFile.get_upload_folder(), filename)
# FILE PATH METHODS - activity log file
@staticmethod
def get_activity_log_path(filename):
return os.path.join(FolderAndFile.get_logs_folder(), filename)

51
model/GST.py Normal file
View File

@@ -0,0 +1,51 @@
from model.ItemCRUD import ItemCRUD
from model.Utilities import ItemCRUDType
class GST:
@staticmethod
def get_unreleased_gst():
# Use ItemCRUD for Invoices
invoice_crud = ItemCRUD(itemType=ItemCRUDType.Invoice)
invoices_rows = invoice_crud.GetAllData(storedproc="GetAllInvoicesBasic")
if not invoice_crud.isSuccess:
return [] # Could also log invoice_crud.resultMessage
invoices = [
dict(
Invoice_No=row[1],
GST_SD_Amount=float(row[2]) if row[2] is not None else 0
)
for row in invoices_rows
]
# Use ItemCRUD for GST Releases
gst_crud = ItemCRUD(itemType=ItemCRUDType.GSTRelease)
gst_rows = gst_crud.GetAllData(storedproc="GetAllGSTReleasesBasic")
if not gst_crud.isSuccess:
return [] # Could also log gst_crud.resultMessage
gst_invoice_nos = {
g[2] # Invoice_No is at index 2
for g in gst_rows
if g[2]
}
gst_basic_amounts = {
float(g[3]) # Basic_Amount at index 3
for g in gst_rows
if g[3] is not None
}
# Filter unreleased invoices
unreleased = []
for inv in invoices:
match_by_invoice = inv['Invoice_No'] in gst_invoice_nos
match_by_gst_amount = inv['GST_SD_Amount'] in gst_basic_amounts
if not (match_by_invoice or match_by_gst_amount):
unreleased.append(inv)
return unreleased

90
model/HoldTypes.py Normal file
View File

@@ -0,0 +1,90 @@
from flask import request
from model.ItemCRUD import ItemCRUD
from model.Utilities import ItemCRUDType
class HoldTypes:
"""CRUD operations for Hold Types using ItemCRUD"""
def __init__(self):
self.isSuccess = False
self.resultMessage = ""
# ------------------- Add Hold Type -------------------
def AddHoldType(self, request):
hold = ItemCRUD(itemType=ItemCRUDType.HoldType)
hold_name = request.form.get('hold_type', '').strip()
hold.AddItem(
request=request,
parentid=None,
childname=hold_name,
storedprocfetch="CheckHoldTypeExists",
storedprocadd="SaveHoldType"
)
self.isSuccess = hold.isSuccess
self.resultMessage = hold.resultMessage
# ------------------- Get All Hold Types -------------------
def GetAllHoldTypes(self, request=None):
hold = ItemCRUD(itemType=ItemCRUDType.HoldType)
rows = hold.GetAllData(request=request, storedproc="GetAllHoldTypes")
self.isSuccess = hold.isSuccess
self.resultMessage = hold.resultMessage
# ✅ Convert tuple → dictionary
data = []
for row in rows:
data.append({
"hold_type_id": row[0],
"hold_type": row[1]
})
return data
# ------------------- Get Hold Type By ID -------------------
def GetHoldTypeByID(self, hold_type_id):
hold = ItemCRUD(itemType=ItemCRUDType.HoldType)
row = hold.GetDataByID(hold_type_id, storedproc="GetHoldTypesById")
self.isSuccess = hold.isSuccess
self.resultMessage = hold.resultMessage
# ✅ Convert tuple → dictionary
if row:
return {
"hold_type_id": row[0],
"hold_type": row[1]
}
return None
# ------------------- Update Hold Type -------------------
def EditHoldType(self, request, hold_type_id):
hold = ItemCRUD(itemType=ItemCRUDType.HoldType)
hold_name = request.form.get('hold_type', '').strip()
hold.EditItem(
request=request,
childid=hold_type_id,
parentid=None,
childname=hold_name,
storedprocupdate="UpdateHoldTypeById"
)
self.isSuccess = hold.isSuccess
self.resultMessage = hold.resultMessage
# ------------------- Delete Hold Type -------------------
def DeleteHoldType(self, request, hold_type_id):
hold = ItemCRUD(itemType=ItemCRUDType.HoldType)
hold.DeleteItem(
request=request,
itemID=hold_type_id,
storedprocDelete="DeleteHoldType"
)
self.isSuccess = hold.isSuccess
self.resultMessage = hold.resultMessage

237
model/Invoice.py Normal file
View File

@@ -0,0 +1,237 @@
import config
import mysql.connector
# ------------------- Helper Functions -------------------
def clear_results(cursor):
"""Consume all stored results to prevent cursor issues."""
for r in cursor.stored_results():
r.fetchall()
def fetch_one(cursor):
"""Fetch first row from stored results."""
for result in cursor.stored_results():
return result.fetchone()
return None
def fetch_all(cursor):
"""Fetch all rows from stored results."""
data = []
for result in cursor.stored_results():
data.extend(result.fetchall())
return data
def get_numeric_values(data):
"""Return numeric fields for invoices safely."""
return [
float(data.get('basic_amount') or 0),
float(data.get('debit_amount') or 0),
float(data.get('after_debit_amount') or 0),
float(data.get('amount') or 0),
float(data.get('gst_amount') or 0),
float(data.get('tds_amount') or 0),
float(data.get('sd_amount') or 0),
float(data.get('on_commission') or 0),
float(data.get('hydro_testing') or 0),
float(data.get('gst_sd_amount') or 0),
float(data.get('final_amount') or 0),
]
def execute_db_operation(operation_func):
"""General DB operation wrapper with commit/rollback."""
connection = config.get_db_connection()
cursor = connection.cursor(dictionary=True)
try:
result = operation_func(cursor)
connection.commit()
return result
except Exception:
connection.rollback()
raise
finally:
cursor.close()
connection.close()
# ------------------- Village Functions -------------------
def get_village_id(village_name):
def operation(cursor):
cursor.callproc("GetVillageIdByName", (village_name,))
return fetch_one(cursor)
return execute_db_operation(operation)
def get_all_villages():
def operation(cursor):
cursor.callproc("GetAllVillages")
return fetch_all(cursor)
return execute_db_operation(operation)
# ------------------- Invoice Functions -------------------
def insert_invoice(data, village_id):
def operation(cursor):
# Insert invoice
cursor.callproc('InsertInvoice', [
data.get('pmc_no'),
village_id,
data.get('work_type'),
data.get('invoice_details'),
data.get('invoice_date'),
data.get('invoice_no'),
*get_numeric_values(data),
data.get('subcontractor_id')
])
invoice_row = fetch_one(cursor)
if not invoice_row:
raise Exception("Invoice ID not returned")
invoice_id = invoice_row.get('invoice_id')
# # Insert inpayment
# cursor.callproc('InsertInpayment', [
# data.get('pmc_no'),
# village_id,
# data.get('work_type'),
# data.get('invoice_details'),
# data.get('invoice_date'),
# data.get('invoice_no'),
# *get_numeric_values(data),
# data.get('subcontractor_id')
# ])
# clear_results(cursor)
return invoice_id
return execute_db_operation(operation)
def get_all_invoice_details():
def operation(cursor):
cursor.callproc('GetAllInvoiceDetails')
return fetch_all(cursor)
return execute_db_operation(operation)
def get_invoice_by_id(invoice_id):
def operation(cursor):
cursor.callproc('GetInvoiceDetailsById', [invoice_id])
invoice = fetch_one(cursor)
cursor.callproc('GetHoldAmountsByInvoiceId', [invoice_id])
hold_amounts = fetch_all(cursor)
if invoice:
invoice["hold_amounts"] = hold_amounts
return invoice
return execute_db_operation(operation)
def update_invoice(data, invoice_id):
def operation(cursor):
cursor.callproc("GetVillageIdByName", (data.get('village'),))
village = fetch_one(cursor)
if not village:
raise Exception("Village not found")
village_id = village['Village_Id']
cursor.callproc('UpdateInvoice', [
data.get('pmc_no'),
village_id,
data.get('work_type'),
data.get('invoice_details'),
data.get('invoice_date'),
data.get('invoice_no'),
*get_numeric_values(data),
invoice_id
])
clear_results(cursor)
execute_db_operation(operation)
# def update_inpayment(data):
# def operation(cursor):
# cursor.callproc('UpdateInpayment', [
# data.get('work_type'),
# data.get('invoice_details'),
# data.get('invoice_date'),
# *get_numeric_values(data),
# data.get('pmc_no'),
# data.get('invoice_no')
# ])
# clear_results(cursor)
# execute_db_operation(operation)
def delete_invoice_data(invoice_id, user_id):
def operation(cursor):
# Fetch PMC and Invoice_No from DB
cursor.callproc('GetInvoicePMCById', (invoice_id,))
record = {}
for result in cursor.stored_results():
record = result.fetchone() or {}
if not record:
raise Exception("Invoice not found")
# Use exact DB keys
pmc_no = record['PMC_No']
invoice_no = record['Invoice_No']
# Delete invoice
cursor.callproc("DeleteInvoice", (invoice_id,))
clear_results(cursor)
# Delete inpayment
# cursor.callproc('DeleteInpaymentByPMCInvoice', (pmc_no, invoice_no))
# clear_results(cursor)
execute_db_operation(operation)
# ------------------- Subcontractor Functions -------------------
def assign_subcontractor(data, village_id):
def operation(cursor):
cursor.callproc('AssignSubcontractor', [
data.get('pmc_no'),
data.get('subcontractor_id'),
village_id
])
clear_results(cursor)
execute_db_operation(operation)
# ------------------- Hold Types Functions -------------------
def insert_hold_types(data, invoice_id):
def operation(cursor):
hold_types = data.getlist('hold_type[]')
hold_amounts = data.getlist('hold_amount[]')
for hold_type, hold_amount in zip(hold_types, hold_amounts):
if not hold_type:
continue
cursor.callproc('GetHoldTypeIdByName', [hold_type])
hold_type_result = fetch_one(cursor)
if not hold_type_result:
cursor.callproc('InsertHoldType', [hold_type, 0])
cursor.execute("SELECT @_InsertHoldType_1")
hold_type_id = cursor.fetchone()[0]
else:
hold_type_id = hold_type_result['hold_type_id']
cursor.callproc('InsertInvoiceSubcontractorHold', [
data.get('subcontractor_id'),
invoice_id,
hold_type_id,
float(hold_amount or 0)
])
clear_results(cursor)
execute_db_operation(operation)
def get_all_hold_types():
def operation(cursor):
cursor.callproc("GetAllHoldTypes")
return fetch_all(cursor)
return execute_db_operation(operation)
# ------------------- Contractor Functions -------------------
def search_contractors(sub_query):
def operation(cursor):
cursor.callproc('SearchContractorsByName', [sub_query])
return fetch_all(cursor)
return execute_db_operation(operation)

397
model/ItemCRUD.py Normal file
View File

@@ -0,0 +1,397 @@
from flask_login import current_user
from model.Utilities import RegEx, ResponseHandler, HtmlHelper, ItemCRUDType
from model.Log import LogHelper
import config
import re
import mysql.connector
# ----------------------------------------------------------
# Mapping Class
# ----------------------------------------------------------
class itemCRUDMapping:
def __init__(self, itemType):
if itemType is ItemCRUDType.Village:
self.name = "Village"
elif itemType is ItemCRUDType.Block:
self.name = "Block"
elif itemType is ItemCRUDType.State:
self.name = "State"
elif itemType is ItemCRUDType.HoldType:
self.name = "Hold Type"
elif itemType is ItemCRUDType.Subcontractor:
self.name = "Subcontractor"
elif itemType.name == "GSTRelease":
self.name = "GSTRelease"
else:
self.name = "Item"
# ----------------------------------------------------------
# Generic CRUD Class
# ----------------------------------------------------------
class ItemCRUD:
def __init__(self, itemType):
self.isSuccess = False
self.resultMessage = ""
self.itemCRUDType = itemType
self.itemCRUDMapping = itemCRUDMapping(itemType)
# ----------------------------------------------------------
# DELETE
# ----------------------------------------------------------
def DeleteItem(self, request, itemID, storedprocDelete):
connection = config.get_db_connection()
cursor = connection.cursor()
LogHelper.log_action(
f"Delete {self.itemCRUDMapping.name}",
f"User {current_user.id} deleted {self.itemCRUDMapping.name} '{itemID}'"
)
try:
cursor.callproc(storedprocDelete, (itemID,))
connection.commit()
self.isSuccess = True
self.resultMessage = HtmlHelper.json_response(
ResponseHandler.delete_success(self.itemCRUDMapping.name), 200
)
except mysql.connector.Error as e:
print(f"Error deleting {self.itemCRUDMapping.name}: {e}")
self.isSuccess = False
self.resultMessage = HtmlHelper.json_response(
ResponseHandler.delete_failure(self.itemCRUDMapping.name), 500
)
finally:
cursor.close()
connection.close()
# ----------------------------------------------------------
# ADD
# ----------------------------------------------------------
def AddItem(self, request, parentid=None, childname=None, storedprocfetch=None, storedprocadd=None, data=None):
connection = config.get_db_connection()
if not connection:
self.isSuccess = False
self.resultMessage = HtmlHelper.json_response(
ResponseHandler.db_connection_failure(), 500
)
return
cursor = connection.cursor()
LogHelper.log_action(
f"Add {self.itemCRUDMapping.name}",
f"User {current_user.id} adding '{childname if childname else (data.get('Contractor_Name') if data else '')}'"
)
try:
# ======================================================
# GSTRelease MULTI-FIELD
# ======================================================
if self.itemCRUDType.name == "GSTRelease" and data:
# Duplicate check (PMC_No + Invoice_No)
if storedprocfetch:
cursor.callproc(storedprocfetch, (data['PMC_No'], data['Invoice_No']))
existing_item = None
for rs in cursor.stored_results():
existing_item = rs.fetchone()
if existing_item:
self.isSuccess = False
self.resultMessage = HtmlHelper.json_response(
ResponseHandler.already_exists(self.itemCRUDMapping.name), 409
)
return
# Insert GSTRelease
cursor.callproc(storedprocadd, (
data['PMC_No'],
data['Invoice_No'],
data['Basic_Amount'],
data['Final_Amount'],
data['Total_Amount'],
data['UTR'],
data['Contractor_ID']
))
connection.commit()
self.isSuccess = True
self.resultMessage = HtmlHelper.json_response(
ResponseHandler.add_success(self.itemCRUDMapping.name), 200
)
return
# ======================================================
# SUBCONTRACTOR MULTI-FIELD
# ======================================================
if self.itemCRUDType.name == "Subcontractor" and data:
cursor.callproc(storedprocfetch, (data['Contractor_Name'],))
existing_item = None
for rs in cursor.stored_results():
existing_item = rs.fetchone()
if existing_item:
self.isSuccess = False
self.resultMessage = HtmlHelper.json_response(
ResponseHandler.already_exists(self.itemCRUDMapping.name), 409
)
return
cursor.callproc(storedprocadd, (
data['Contractor_Name'],
data['Address'],
data['Mobile_No'],
data['PAN_No'],
data['Email'],
data['Gender'],
data['GST_Registration_Type'],
data['GST_No'],
data['Contractor_password']
))
connection.commit()
self.isSuccess = True
self.resultMessage = HtmlHelper.json_response(
ResponseHandler.add_success(self.itemCRUDMapping.name), 200
)
return
# ======================================================
# NORMAL SINGLE-FIELD (Village / Block / State)
# ======================================================
if not re.match(RegEx.allPattern, childname):
self.isSuccess = False
self.resultMessage = HtmlHelper.json_response(
ResponseHandler.invalid_name(self.itemCRUDMapping.name), 400
)
return
if parentid is None:
cursor.callproc(storedprocfetch, (childname,))
else:
cursor.callproc(storedprocfetch, (childname, parentid))
existing_item = None
for rs in cursor.stored_results():
existing_item = rs.fetchone()
if existing_item:
self.isSuccess = False
self.resultMessage = HtmlHelper.json_response(
ResponseHandler.already_exists(self.itemCRUDMapping.name), 409
)
return
if parentid is None:
cursor.callproc(storedprocadd, (childname,))
else:
cursor.callproc(storedprocadd, (childname, parentid))
connection.commit()
self.isSuccess = True
self.resultMessage = HtmlHelper.json_response(
ResponseHandler.add_success(self.itemCRUDMapping.name), 200
)
except mysql.connector.Error as e:
print(f"Database Error: {e}")
self.isSuccess = False
self.resultMessage = HtmlHelper.json_response(
ResponseHandler.add_failure(self.itemCRUDMapping.name), 500
)
finally:
cursor.close()
connection.close()
# ----------------------------------------------------------
# EDIT
# ----------------------------------------------------------
def EditItem(self, request, childid, parentid=None, childname=None, storedprocupdate=None, data=None):
connection = config.get_db_connection()
cursor = connection.cursor()
LogHelper.log_action(
f"Edit {self.itemCRUDMapping.name}",
f"User {current_user.id} edited '{childid}'"
)
try:
# ======================================================
# GSTRelease MULTI-FIELD
# ======================================================
if self.itemCRUDType.name == "GSTRelease" and data:
cursor.callproc(storedprocupdate, (
data['p_pmc_no'], # PMC_No
data['p_invoice_no'], # Invoice_No
data['p_basic_amount'], # Basic_Amount
data['p_final_amount'], # Final_Amount
data['p_total_amount'], # Total_Amount
data['p_utr'], # UTR
data['p_gst_release_id']# GST_Release_Id
))
connection.commit()
self.isSuccess = True
self.resultMessage = HtmlHelper.json_response(
ResponseHandler.update_success(self.itemCRUDMapping.name), 200
)
return
# ======================================================
# SUBCONTRACTOR MULTI-FIELD
# ======================================================
if self.itemCRUDType.name == "Subcontractor" and data:
cursor.callproc(storedprocupdate, (
childid,
data['Contractor_Name'],
data['Address'],
data['Mobile_No'],
data['PAN_No'],
data['Email'],
data['Gender'],
data['GST_Registration_Type'],
data['GST_No'],
data['Contractor_password']
))
connection.commit()
self.isSuccess = True
self.resultMessage = HtmlHelper.json_response(
ResponseHandler.update_success(self.itemCRUDMapping.name), 200
)
return
# ======================================================
# NORMAL SINGLE-FIELD
# ======================================================
if not re.match(RegEx.allPattern, childname):
self.isSuccess = False
self.resultMessage = ResponseHandler.update_failure(self.itemCRUDMapping.name)['message']
return
if parentid is None:
cursor.callproc(storedprocupdate, (childid, childname))
else:
cursor.callproc(storedprocupdate, (childid, parentid, childname))
connection.commit()
self.isSuccess = True
self.resultMessage = ResponseHandler.update_success(self.itemCRUDMapping.name)['message']
except mysql.connector.Error as e:
print(f"Error updating {self.itemCRUDMapping.name}: {e}")
self.isSuccess = False
self.resultMessage = HtmlHelper.json_response(
ResponseHandler.update_failure(self.itemCRUDMapping.name), 500
)
finally:
cursor.close()
connection.close()
# ----------------------------------------------------------
# GET ALL
# ----------------------------------------------------------
def GetAllData(self, request, storedproc):
data = []
connection = config.get_db_connection()
if not connection:
return []
cursor = connection.cursor()
try:
cursor.callproc(storedproc)
for result in cursor.stored_results():
data = result.fetchall()
self.isSuccess = True
except mysql.connector.Error as e:
print(f"Error fetching {self.itemCRUDMapping.name}: {e}")
self.isSuccess = False
self.resultMessage = HtmlHelper.json_response(
ResponseHandler.fetch_failure(self.itemCRUDMapping.name), 500
)
return []
finally:
cursor.close()
connection.close()
return data
# ----------------------------------------------------------
# GET BY ID
# ----------------------------------------------------------
def GetDataByID(self, id, storedproc):
data = None
connection = config.get_db_connection()
cursor = connection.cursor()
try:
cursor.callproc(storedproc, (id,))
for rs in cursor.stored_results():
data = rs.fetchone()
except mysql.connector.Error as e:
print(f"Error fetching {self.itemCRUDMapping.name}: {e}")
finally:
cursor.close()
connection.close()
return data
# ----------------------------------------------------------
# CHECK ITEM
# ----------------------------------------------------------
def CheckItem(self, request, parentid, childname, storedprocfetch):
connection = config.get_db_connection()
cursor = connection.cursor()
LogHelper.log_action(
f"Check {self.itemCRUDMapping.name}",
f"User {current_user.id} checked '{childname}'"
)
if not re.match(RegEx.allPattern, childname):
return HtmlHelper.json_response(
ResponseHandler.invalid_name(self.itemCRUDMapping.name), 400
)
try:
if parentid is None:
cursor.callproc(storedprocfetch, (childname,))
else:
cursor.callproc(storedprocfetch, (childname, parentid))
existing_item = None
for rs in cursor.stored_results():
existing_item = rs.fetchone()
if existing_item:
return HtmlHelper.json_response(
ResponseHandler.already_exists(self.itemCRUDMapping.name), 409
)
return HtmlHelper.json_response(
ResponseHandler.is_available(self.itemCRUDMapping.name), 200
)
except mysql.connector.Error as e:
print(f"Error checking {self.itemCRUDMapping.name}: {e}")
return HtmlHelper.json_response(
ResponseHandler.fetch_failure(self.itemCRUDMapping.name), 500
)
finally:
cursor.close()
connection.close()

84
model/Log.py Normal file
View File

@@ -0,0 +1,84 @@
import os
from flask import current_app
from flask_login import LoginManager, UserMixin, login_user, logout_user, login_required, current_user
from datetime import datetime
from model.FolderAndFile import FolderAndFile
class LogHelper:
@staticmethod
def log_action(action, details=""):
"""Log user actions with timestamp, user, action, and details."""
logData = LogData()
logData.WriteLog(action, details="")
class LogData:
filepath = ""
timestamp = None
def __init__(self):
self.filepath = FolderAndFile.get_activity_log_path('activity.log')
self.timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
self.user = LogData.get_current_user()
@staticmethod
def get_current_user():
if hasattr(current_user, "cn") and current_user.cn:
return current_user.cn
elif hasattr(current_user, "username") and current_user.username:
return current_user.username
elif hasattr(current_user, "sAMAccountName") and current_user.sAMAccountName:
return current_user.sAMAccountName
return "Unknown"
def WriteLog(self, action, details=""):
"""Log user actions with timestamp, user, action, and details."""
with open(self.filepath, "a", encoding="utf-8") as f:
f.write(
f"Timestamp: {self.timestamp} | "
f"User: {self.user} | "
f"Action: {action} | "
f"Details: {details}\n"
)
def GetActivitiesLog(self):
logs = []
if os.path.exists(self.filepath):
with open(self.filepath, 'r') as f:
for line in f:
parts = line.strip().split(" | ")
if len(parts) == 4:
logs.append({
"timestamp": parts[0].replace("Timestamp:", "").strip(),
"user": parts[1].replace("User:", "").strip(),
"action": parts[2].replace("Action:", "").strip(),
"details": parts[3].replace("Details:", "").strip()
})
return logs
def GetFilteredActivitiesLog(self, startDate, endDate, userName):
filtered_logs = self.GetActivitiesLog()
# Date filter
if startDate or endDate:
try:
start_dt = datetime.strptime(startDate, "%Y-%m-%d") if startDate else datetime.min
end_dt = datetime.strptime(endDate, "%Y-%m-%d") if endDate else datetime.max
filtered_logs = [
log for log in filtered_logs
if start_dt <= datetime.strptime(log["timestamp"], "%Y-%m-%d %H:%M:%S") <= end_dt
]
except Exception as e:
print("Date filter error:", e)
# Username filter
if userName:
filtered_logs = [log for log in filtered_logs if userName.lower() in log["user"].lower()]
return filtered_logs

456
model/PmcReport.py Normal file
View File

@@ -0,0 +1,456 @@
import openpyxl
from openpyxl.styles import Font, PatternFill
import config
from flask_login import current_user
from model.Log import LogHelper
from model.Report import ReportHelper
from model.FolderAndFile import FolderAndFile
class PmcReport:
@staticmethod
def get_pmc_report(pmc_no):
connection = config.get_db_connection()
cursor = connection.cursor(dictionary=True, buffered=True)
try:
# cursor.callproc("GetContractorInfoByPmcNo", (pmc_no,))
# pmc_info = next(cursor.stored_results()).fetchone()
pmc_info = ReportHelper.execute_sp(cursor, 'GetContractorInfoByPmcNo', [pmc_no], True)
if not pmc_info:
return None
cursor.callproc("Get_pmc_hold_types", (pmc_no, pmc_info["Contractor_Id"]))
hold_types = next(cursor.stored_results()).fetchall()
# Extract hold_type_ids
hold_type_ids = [ht['hold_type_id'] for ht in hold_types]
invoices = []
hold_amount_total = 0
if hold_type_ids:
hold_type_ids_str = ",".join(map(str, hold_type_ids))
cursor.callproc(
'GetInvoices_WithHold',
[pmc_no, pmc_info["Contractor_Id"], hold_type_ids_str]
)
else:
cursor.callproc(
'GetInvoices_NoHold',
[pmc_no, pmc_info["Contractor_Id"]]
)
for result in cursor.stored_results():
invoices = result.fetchall()
if hold_type_ids:
hold_amount_total = sum(row.get('hold_amount', 0) or 0 for row in invoices)
total_invo_final = sum(row.get('Final_Amount', 0) or 0 for row in invoices)
# GST RELEASE
# cursor.callproc('GetGSTReleaseByPMC', [pmc_no])
# gst_rel = []
# for result in cursor.stored_results():
# gst_rel = result.fetchall()
gst_rel = ReportHelper.execute_sp(cursor, 'GetGSTReleaseByPMC', [pmc_no])
total_gst_basic = sum(row.get('basic_amount', 0) or 0 for row in gst_rel)
total_gst_final = sum(row.get('final_amount', 0) or 0 for row in gst_rel)
# ---------------- HOLD RELEASE ----------------
# cursor.callproc('GetHoldReleaseByPMC', [pmc_no])
# hold_release = []
# for result in cursor.stored_results():
# hold_release = result.fetchall()
hold_release = ReportHelper.execute_sp(cursor, 'GetHoldReleaseByPMC', [pmc_no])
# ---------------- CREDIT NOTE ----------------
# cursor.callproc('GetCreditNoteByPMC', [pmc_no])
# credit_note = []
# for result in cursor.stored_results():
# credit_note = result.fetchall()
credit_note = ReportHelper.execute_sp(cursor, 'GetCreditNoteByPMC', [pmc_no])
payments = ReportHelper.execute_sp(cursor, 'GetPaymentsByPMC', [pmc_no])
# ---------------- PAYMENTS ----------------
# cursor.callproc('GetPaymentsByPMC', [pmc_no])
# payments = []
# for result in cursor.stored_results():
# payments = result.fetchall()
total_pay_amount = sum(row.get('Payment_Amount', 0) or 0 for row in payments)
total_pay_total = sum(row.get('Total_amount', 0) or 0 for row in payments)
totals = {
"sum_invo_basic_amt": sum(row.get('Basic_Amount', 0) or 0 for row in invoices),
"sum_invo_debit_amt": sum(row.get('Debit_Amount', 0) or 0 for row in invoices),
"sum_invo_after_debit_amt": sum(row.get('After_Debit_Amount', 0) or 0 for row in invoices),
"sum_invo_amt": sum(row.get('Amount', 0) or 0 for row in invoices),
"sum_invo_gst_amt": sum(row.get('GST_Amount', 0) or 0 for row in invoices),
"sum_invo_tds_amt": sum(row.get('TDS_Amount', 0) or 0 for row in invoices),
"sum_invo_ds_amt": sum(row.get('SD_Amount', 0) or 0 for row in invoices),
"sum_invo_on_commission": sum(row.get('On_Commission', 0) or 0 for row in invoices),
"sum_invo_hydro_test": sum(row.get('Hydro_Testing', 0) or 0 for row in invoices),
"sum_invo_gst_sd_amt": sum(row.get('GST_SD_Amount', 0) or 0 for row in invoices),
"sum_invo_final_amt": total_invo_final,
"sum_invo_hold_amt": hold_amount_total,
"sum_gst_basic_amt": total_gst_basic,
"sum_gst_final_amt": total_gst_final,
"sum_pay_payment_amt": total_pay_amount,
"sum_pay_tds_payment_amt": sum(row.get('TDS_Payment_Amount', 0) or 0 for row in payments),
"sum_pay_total_amt": total_pay_total
}
return {
"info": pmc_info,
"invoices": invoices,
"hold_types": hold_types,
"gst_rel": gst_rel,
"payments": payments,
"credit_note": credit_note,
"hold_release": hold_release,
"total": totals
}
finally:
cursor.close()
connection.close()
@staticmethod
def download_pmc_report(pmc_no):
connection = config.get_db_connection()
if not connection:
return None
cursor = connection.cursor(dictionary=True)
try:
# filename
filename = f"PMC_Report_{pmc_no}.xlsx"
output_folder = FolderAndFile.get_download_folder()
output_file = FolderAndFile.get_download_path(filename)
# ================= DATA FETCH =================
contractor_info = ReportHelper.execute_sp(cursor, 'GetContractorDetailsByPMC', [pmc_no], "one")
if not contractor_info:
return None
hold_types = ReportHelper.execute_sp(cursor, 'GetHoldTypesByContractor', [contractor_info["Contractor_Id"]])
hold_type_map = {ht['hold_type_id']: ht['hold_type'] for ht in hold_types}
invoices = ReportHelper.execute_sp(cursor, 'GetInvoicesAndGstReleaseByPmcNo', [pmc_no])
credit_notes = ReportHelper.execute_sp(cursor, 'GetCreditNoteByContractor', [contractor_info["Contractor_Id"]])
hold_amounts = ReportHelper.execute_sp(cursor, 'GetHoldAmountsByContractor', [contractor_info["Contractor_Id"]])
all_payments = ReportHelper.execute_sp(cursor, 'GetAllPaymentsByPMC', [pmc_no])
gst_releases = ReportHelper.execute_sp(cursor, 'GetGSTReleaseDetailsByPMC', [pmc_no])
# ================= DATA MAPPING =================
hold_data = {}
for h in hold_amounts:
hold_data.setdefault(h['Invoice_Id'], {})[h['hold_type_id']] = h['hold_amount']
payments_map = {}
for pay in all_payments:
if pay['invoice_no']:
payments_map.setdefault(pay['invoice_no'], []).append(pay)
# ================= LOG =================
LogHelper.log_action(
"Download PMC Report",
f"User {current_user.id} Download PMC Report '{pmc_no}'"
)
# ================= EXCEL =================
workbook = openpyxl.Workbook()
sheet = workbook.active
sheet.title = "PMC Report"
# HEADER INFO
sheet.append(["", "", "Laxmi Civil Engineering Services PVT. LTD."])
sheet.append(["Contractor Name", contractor_info["Contractor_Name"]])
sheet.append(["State", contractor_info["State_Name"]])
sheet.append(["District", contractor_info["District_Name"]])
sheet.append(["Block", contractor_info["Block_Name"]])
sheet.append([])
base_headers = [
"PMC No","Village","Work Type","Invoice Details","Invoice Date","Invoice No",
"Basic Amount","Debit","After Debit Amount","GST","Amount","TDS",
"SD","On Commission","Hydro Testing","GST SD Amount"
]
hold_headers = [ht['hold_type'] for ht in hold_types]
payment_headers = [
"Final Amount","Payment Amount","TDS Payment","Total Paid","UTR"
]
headers = base_headers + hold_headers + payment_headers
sheet.append(headers)
# STYLE
for cell in sheet[sheet.max_row]:
cell.font = Font(bold=True)
# DATA
seen_invoices = set()
for inv in invoices:
invoice_no = inv["Invoice_No"]
payments = payments_map.get(invoice_no, [])
if invoice_no in seen_invoices:
continue
seen_invoices.add(invoice_no)
first_payment = payments[0] if payments else None
row = [
pmc_no,
inv["Village_Name"],
inv["Work_Type"],
inv["Invoice_Details"],
inv["Invoice_Date"],
invoice_no,
inv["Basic_Amount"],
inv["Debit_Amount"],
inv["After_Debit_Amount"],
inv["GST_Amount"],
inv["Amount"],
inv["TDS_Amount"],
inv["SD_Amount"],
inv["On_Commission"],
inv["Hydro_Testing"],
inv["GST_SD_Amount"]
]
# HOLD DATA
invoice_holds = hold_data.get(inv["Invoice_Id"], {})
for ht_id in hold_type_map.keys():
row.append(invoice_holds.get(ht_id, ""))
# PAYMENT DATA
row += [
inv["Final_Amount"],
first_payment["Payment_Amount"] if first_payment else "",
first_payment["TDS_Payment_Amount"] if first_payment else "",
first_payment["Total_amount"] if first_payment else "",
first_payment["UTR"] if first_payment else ""
]
sheet.append(row)
# AUTO WIDTH
for col in sheet.columns:
max_len = max((len(str(cell.value)) for cell in col if cell.value), default=0)
sheet.column_dimensions[col[0].column_letter].width = max_len + 2
# SAVE
workbook.save(output_file)
workbook.close()
return output_folder, filename
except Exception as e:
print(f"Error generating PMC report: {e}")
return None
finally:
cursor.close()
connection.close()
# @staticmethod
# def download_pmc_report(pmc_no):
# connection = config.get_db_connection()
# cursor = connection.cursor(dictionary=True)
# # output_folder = "static/download"
# # output_file = os.path.join(output_folder, f"PMC_Report_{pmc_no}.xlsx")
# output_folder = FolderAndFile.get_download_folder
# filename = f"PMC_Report_{pmc_no}.xlsx"
# output_file = FolderAndFile.get_download_path(filename)
# try:
# cursor.callproc('GetContractorDetailsByPMC', [pmc_no])
# contractor_info = next(cursor.stored_results()).fetchone()
# if not contractor_info:
# return None
# cursor.callproc('GetHoldTypesByContractor', [contractor_info["Contractor_Id"]])
# hold_types = next(cursor.stored_results()).fetchall()
# hold_type_map = {ht['hold_type_id']: ht['hold_type'] for ht in hold_types}
# cursor.callproc('GetInvoicesAndGstReleaseByPmcNo', [pmc_no])
# invoices = next(cursor.stored_results()).fetchall()
# cursor.callproc('GetCreditNoteByContractor',[contractor_info["Contractor_Id"]])
# credit_notes = []
# for result in cursor.stored_results():
# credit_notes = result.fetchall()
# credit_note_map = {}
# for cn in credit_notes:
# key = (cn["PMC_No"], cn["Invoice_No"])
# credit_note_map.setdefault(key, []).append(cn)
# cursor.callproc('GetHoldAmountsByContractor', [contractor_info["Contractor_Id"]])
# hold_amounts = next(cursor.stored_results()).fetchall()
# hold_data = {}
# for h in hold_amounts:
# hold_data.setdefault(h['Invoice_Id'], {})[h['hold_type_id']] = h['hold_amount']
# cursor.callproc('GetAllPaymentsByPMC', [pmc_no])
# all_payments = next(cursor.stored_results()).fetchall()
# payments_map = {}
# extra_payments = []
# for pay in all_payments:
# if pay['invoice_no']:
# payments_map.setdefault(pay['invoice_no'], []).append(pay)
# else:
# extra_payments.append(pay)
# # ---------------- GST RELEASE DETAILS ----------------
# cursor.callproc('GetGSTReleaseDetailsByPMC', [pmc_no])
# gst_releases = []
# for result in cursor.stored_results():
# gst_releases = result.fetchall()
# gst_release_map = {}
# for gr in gst_releases:
# invoice_nos = []
# if gr['Invoice_No']:
# cleaned = gr['Invoice_No'].replace(' ', '')
# if '&' in cleaned:
# invoice_nos = cleaned.split('&')
# elif ',' in cleaned:
# invoice_nos = cleaned.split(',')
# else:
# invoice_nos = [cleaned]
# for inv_no in invoice_nos:
# gst_release_map.setdefault(inv_no, []).append(gr)
# LogHelper.log_action(
# "Download PMC Report",
# f"User {current_user.id} Download PMC Report '{pmc_no}'"
# )
# workbook = openpyxl.Workbook()
# sheet = workbook.active
# sheet.title = "PMC Report"
# sheet.append(["", "", "Laxmi Civil Engineering Services PVT. LTD."])
# sheet.append(["Contractor Name", contractor_info["Contractor_Name"], "", "GST No", contractor_info["GST_No"], "", "GST Type", contractor_info["GST_Registration_Type"]])
# sheet.append(["State", contractor_info["State_Name"], "", "PAN No", contractor_info["PAN_No"], "", "Address", contractor_info["Address"]])
# sheet.append(["District", contractor_info["District_Name"], "", "Mobile No", contractor_info["Mobile_No"]])
# sheet.append(["Block", contractor_info["Block_Name"], "", "Email", contractor_info["Email"]])
# sheet.append([])
# base_headers = [
# "PMC No","Village","Work Type","Invoice Details","Invoice Date","Invoice No",
# "Basic Amount","Debit","After Debit Amount","GST (18%)","Amount","TDS (1%)",
# "SD (5%)","On Commission","Hydro Testing","GST SD Amount"
# ]
# hold_headers = [ht['hold_type'] for ht in hold_types]
# payment_headers = [
# "Final Amount","Payment Amount","TDS Payment","Total Paid","UTR"
# ]
# sheet.append(base_headers + hold_headers + payment_headers)
# header_fill = PatternFill(start_color="ADD8E6",end_color="ADD8E6",fill_type="solid")
# header_font = Font(bold=True)
# for cell in sheet[sheet.max_row]:
# cell.font = header_font
# cell.fill = header_fill
# seen_invoices = set()
# processed_payments = set()
# for inv in invoices:
# invoice_no = inv["Invoice_No"]
# payments = payments_map.get(invoice_no, [])
# if invoice_no not in seen_invoices:
# seen_invoices.add(invoice_no)
# first_payment = payments[0] if payments else None
# row = [
# pmc_no, inv["Village_Name"], inv["Work_Type"],
# inv["Invoice_Details"], inv["Invoice_Date"], invoice_no,
# inv["Basic_Amount"], inv["Debit_Amount"],
# inv["After_Debit_Amount"], inv["GST_Amount"],
# inv["Amount"], inv["TDS_Amount"], inv["SD_Amount"],
# inv["On_Commission"], inv["Hydro_Testing"], inv["GST_SD_Amount"]
# ]
# invoice_holds = hold_data.get(inv["Invoice_Id"], {})
# for ht_id in hold_type_map.keys():
# row.append(invoice_holds.get(ht_id, ""))
# row += [
# inv["Final_Amount"],
# first_payment["Payment_Amount"] if first_payment else "",
# first_payment["TDS_Payment_Amount"] if first_payment else "",
# first_payment["Total_amount"] if first_payment else "",
# first_payment["UTR"] if first_payment else ""
# ]
# sheet.append(row)
# workbook.save(output_file)
# workbook.close()
# return output_folder, filename
# finally:
# cursor.close()
# connection.close()

382
model/Report.py Normal file
View File

@@ -0,0 +1,382 @@
import config
from datetime import datetime
from flask import send_file
import os
import openpyxl
from openpyxl.styles import Font, PatternFill
from model.FolderAndFile import FolderAndFile
class ReportHelper:
isSuccess = False
resultMessage = ""
data=[]
def __init__(self):
self.isSuccess = False
self.resultMessage = ""
self.data = []
@staticmethod
def execute_sp(cursor, proc_name, params=[], fetch_one=False):
cursor.callproc(proc_name, params)
return (
ReportHelper.fetch_one_result(cursor)
if fetch_one else
ReportHelper.fetch_all_results(cursor)
)
@staticmethod
def fetch_all_results(cursor):
data = []
for result in cursor.stored_results():
data = result.fetchall()
return data
@staticmethod
def fetch_one_result(cursor):
data = None
for result in cursor.stored_results():
data = result.fetchone()
return data
@staticmethod
def search_contractor(request):
subcontractor_name = request.form.get("subcontractor_name")
pmc_no = request.form.get("pmc_no")
state = request.form.get("state")
district = request.form.get("district")
block = request.form.get("block")
village = request.form.get("village")
year_from = request.form.get("year_from")
year_to = request.form.get("year_to")
connection = config.get_db_connection()
if not connection:
return []
cursor = connection.cursor(dictionary=True)
try:
data = ReportHelper.execute_sp(
cursor,
"search_contractor_info",
[
subcontractor_name or None,
pmc_no or None,
state or None,
district or None,
block or None,
village or None,
year_from or None,
year_to or None
]
)
except Exception as e:
print(f"Error in search_contractor: {e}")
data = []
finally:
cursor.close()
connection.close()
return data
@staticmethod
def get_contractor_report(contractor_id):
connection = config.get_db_connection()
cursor = connection.cursor(dictionary=True, buffered=True)
try:
# Contractor Info (only one fetch)
contInfo = ReportHelper.execute_sp(cursor, 'GetContractorInfo', [contractor_id], True)
# Hold Types
hold_types = ReportHelper.execute_sp(cursor, 'GetContractorHoldTypes', [contractor_id])
# Invoices
invoices = ReportHelper.execute_sp(cursor, 'GetContractorInvoices', [contractor_id])
# GST Release
gst_rel = ReportHelper.execute_sp(cursor, 'GetGSTRelease', [contractor_id])
# Hold Release
hold_release = ReportHelper.execute_sp(cursor, 'GetHoldRelease', [contractor_id])
# Credit Note
credit_note = ReportHelper.execute_sp(cursor, 'GetCreditNote', [contractor_id])
# Payments
payments = ReportHelper.execute_sp(cursor, 'GetPayments', [contractor_id])
# Totals
total = {
"sum_invo_basic_amt": float(sum(row['Basic_Amount'] or 0 for row in invoices)),
"sum_invo_debit_amt": float(sum(row['Debit_Amount'] or 0 for row in invoices)),
"sum_invo_after_debit_amt": float(sum(row['After_Debit_Amount'] or 0 for row in invoices)),
"sum_invo_amt": float(sum(row['Amount'] or 0 for row in invoices)),
"sum_invo_gst_amt": float(sum(row['GST_Amount'] or 0 for row in invoices)),
"sum_invo_tds_amt": float(sum(row['TDS_Amount'] or 0 for row in invoices)),
"sum_invo_ds_amt": float(sum(row['SD_Amount'] or 0 for row in invoices)),
"sum_invo_on_commission": float(sum(row['On_Commission'] or 0 for row in invoices)),
"sum_invo_hydro_test": float(sum(row['Hydro_Testing'] or 0 for row in invoices)),
"sum_invo_gst_sd_amt": float(sum(row['GST_SD_Amount'] or 0 for row in invoices)),
"sum_invo_final_amt": float(sum(row['Final_Amount'] or 0 for row in invoices)),
"sum_invo_hold_amt": float(sum(row['hold_amount'] or 0 for row in invoices)),
"sum_gst_basic_amt": float(sum(row['basic_amount'] or 0 for row in gst_rel)),
"sum_gst_final_amt": float(sum(row['final_amount'] or 0 for row in gst_rel)),
"sum_pay_payment_amt": float(sum(row['Payment_Amount'] or 0 for row in payments)),
"sum_pay_tds_payment_amt": float(sum(row['TDS_Payment_Amount'] or 0 for row in payments)),
"sum_pay_total_amt": float(sum(row['Total_amount'] or 0 for row in payments))
}
current_date = datetime.now().strftime('%Y-%m-%d')
finally:
cursor.close()
connection.close()
return {
"contInfo": contInfo,
"invoices": invoices,
"hold_types": hold_types,
"gst_rel": gst_rel,
"payments": payments,
"credit_note": credit_note,
"hold_release": hold_release,
"total": total,
"current_date": current_date
}
@staticmethod
def get_contractor_info(contractor_id):
from model.ContractorInfo import ContractorInfo
contractor = ContractorInfo(contractor_id)
return contractor.contInfo if contractor.contInfo else None
# @staticmethod
# def generate_excel(contractor_id, contInfo, invoices, hold_types, hold_data,
# extra_payments_map, credit_note_map, gst_release_map, output_file):
@staticmethod
def generate_excel(contractor_id, contInfo, invoices, hold_types, hold_data,
credit_note_map, gst_release_map, output_file):
workbook = openpyxl.Workbook()
sheet = workbook.active
sheet.title = "Contractor Report"
# Contractor Info
for field, value in contInfo.items():
sheet.append([field.replace("_", " "), value])
sheet.append([])
# Headers
base_headers = ["PMC No", "Village", "Work Type", "Invoice Details", "Invoice Date", "Invoice No",
"Basic Amount", "Debit", "After Debit Amount", "GST (18%)", "Amount", "TDS (1%)",
"SD (5%)", "On Commission", "Hydro Testing", "GST SD Amount"]
hold_headers = [ht['hold_type'] for ht in hold_types]
payment_headers = ["Final Amount", "Payment Amount", "TDS Payment", "Total Paid", "UTR"]
all_headers = base_headers + hold_headers + payment_headers
sheet.append(all_headers)
for cell in sheet[sheet.max_row]:
cell.font = Font(bold=True)
cell.fill = PatternFill(start_color="ADD8E6", end_color="ADD8E6", fill_type="solid")
processed_gst_releases = set()
appended_credit_keys = set()
previous_pmc_no = None
for inv in invoices:
pmc_no = str(inv["PMC_No"]).strip()
invoice_no = (
inv["invoice_no"].replace(" ", "") if inv["invoice_no"] else ""
if inv["invoice_no"] not in (None, "", 0)
else ""
)
key = (pmc_no, invoice_no)
# Yellow separator
if previous_pmc_no and pmc_no != previous_pmc_no:
sheet.append([""] * len(all_headers))
yellow_fill = PatternFill(start_color="FFFF99", end_color="FFFF99", fill_type="solid")
for cell in sheet[sheet.max_row]:
cell.fill = yellow_fill
previous_pmc_no = pmc_no
# Invoice Row
row = [
pmc_no,
inv.get("Village_Name", ""),
inv.get("Work_Type", ""),
inv.get("Invoice_Details", ""),
inv.get("Invoice_Date", ""),
invoice_no,
inv.get("Basic_Amount", ""),
inv.get("Debit_Amount", ""),
inv.get("After_Debit_Amount", ""),
inv.get("GST_Amount", ""),
inv.get("Amount", ""),
inv.get("TDS_Amount", ""),
inv.get("SD_Amount", ""),
inv.get("On_Commission", ""),
inv.get("Hydro_Testing", ""),
inv.get("GST_SD_Amount", "")
]
# Hold values
invoice_holds = hold_data.get(inv["Invoice_Id"], {})
for ht_id in [ht['hold_type_id'] for ht in hold_types]:
row.append(invoice_holds.get(ht_id, ""))
# Payment values
row += [
inv.get("Final_Amount", ""),
inv.get("Payment_Amount", ""),
inv.get("TDS_Payment_Amount", ""),
inv.get("Total_Amount", ""),
inv.get("UTR", "")
]
sheet.append(row)
# # Extra Payments
# if pmc_no in extra_payments_map:
# for ep in extra_payments_map[pmc_no]:
# extra_row = [pmc_no] + [""] * (len(base_headers) - 1)
# extra_row += [""] * len(hold_headers)
# extra_row += [
# "",
# ep.get("Payment_Amount", ""),
# ep.get("TDS_Payment_Amount", ""),
# ep.get("Total_Amount", ""),
# ep.get("utr", "")
# ]
# sheet.append(extra_row)
# del extra_payments_map[pmc_no]
# GST Releases
if key in gst_release_map and key not in processed_gst_releases:
for gr in gst_release_map[key]:
gst_row = [
pmc_no, "", "", "GST Release Note", "", gr.get("Invoice_No", ""),
gr.get("Basic_Amount", ""), "", "", "", "", "", "", "", "", ""
]
gst_row += [""] * len(hold_headers)
gst_row += [
gr.get("Final_Amount", ""),
"",
"",
gr.get("Total_Amount", ""),
gr.get("UTR", "")
]
sheet.append(gst_row)
processed_gst_releases.add(key)
# Credit Notes
if key in credit_note_map and key not in appended_credit_keys:
for cn in credit_note_map[key]:
cn_row = [
pmc_no, "", "", cn.get("Invoice_Details", "Credit Note"), "",
cn.get("Invoice_No", ""),
cn.get("Basic_Amount", ""),
cn.get("Debit_Amount", ""),
cn.get("After_Debit_Amount", ""),
cn.get("GST_Amount", ""),
cn.get("Amount", ""),
"", "", "", "", ""
]
cn_row += [""] * len(hold_headers)
cn_row += [
cn.get("Final_Amount", ""),
"",
"",
cn.get("Total_Amount", ""),
cn.get("UTR", "")
]
sheet.append(cn_row)
appended_credit_keys.add(key)
# SAVE ONCE AT END
workbook.save(output_file)
@staticmethod
def create_contractor_report(contractor_id):
DOWNLOAD_FOLDER = os.path.join("static", "download")
os.makedirs(DOWNLOAD_FOLDER, exist_ok=True)
output_file = os.path.join(DOWNLOAD_FOLDER, f"Contractor_Report_{contractor_id}.xlsx")
# Fetch Data
contInfo = ReportHelper.get_contractor_info(contractor_id)
if not contInfo:
return None, "No contractor found"
connection = config.get_db_connection()
cursor = connection.cursor(dictionary=True, buffered=True)
hold_types = ReportHelper.execute_sp(cursor, 'HoldTypesByContractorId', [contractor_id])
invoices = ReportHelper.execute_sp(cursor, 'FetchInvoicesByContractor', [contractor_id])
hold_amounts = ReportHelper.execute_sp(cursor, 'HoldAmountsByContractorId', [contractor_id])
hold_data = {}
for h in hold_amounts:
hold_data.setdefault(h['Invoice_Id'], {})[h['hold_type_id']] = h['hold_amount']
# # # -------- Extra Payments MAP --------
# # extra_payments_raw = ReportHelper.execute_sp(cursor, 'GetExtraPayments')
# # extra_payments_map = {}
# # for ep in extra_payments_raw:
# # pmc = str(ep['pmc_no']).strip()
# # extra_payments_map.setdefault(pmc, []).append(ep)
# -------- Credit Note MAP --------
credit_note_raw = ReportHelper.execute_sp(cursor, 'GetCreditNotesByContractor', [contractor_id])
credit_note_map = {}
for cn in credit_note_raw:
key = (
str(cn['PMC_No']).strip(),
str(cn['Invoice_No']).replace(" ", "") if cn['Invoice_No'] else ""
)
credit_note_map.setdefault(key, []).append(cn)
# -------- GST MAP --------
gst_release_raw = ReportHelper.execute_sp(cursor, 'GstReleasesByContractorId', [contractor_id])
gst_release_map = {}
for gr in gst_release_raw:
key = (
str(gr['PMC_No']).strip(),
str(gr['Invoice_No']).replace(" ", "") if gr['Invoice_No'] else ""
)
gst_release_map.setdefault(key, []).append(gr)
print("GST MAP:", gst_release_map)
# Generate Excel
# ReportHelper.generate_excel(
# contractor_id, contInfo, invoices, hold_types, hold_data,
# extra_payments_map, credit_note_map, gst_release_map, output_file
# )
ReportHelper.generate_excel(
contractor_id, contInfo, invoices, hold_types, hold_data,
credit_note_map, gst_release_map, output_file
)
return output_file, None

156
model/State.py Normal file
View File

@@ -0,0 +1,156 @@
import config
import mysql.connector
from flask import request, redirect, url_for
from model.Utilities import ResponseHandler, HtmlHelper, ItemCRUDType
from model.ItemCRUD import ItemCRUD
class State:
def __init__(self):
self.isSuccess = False
self.resultMessage = ""
# ----------------------------------------------------------
# ADD STATE (USING ITEM CRUD)
# ----------------------------------------------------------
def AddState(self, request):
state_name = request.form['state_Name'].strip()
crud = ItemCRUD(ItemCRUDType.State)
crud.AddItem(
request=request,
childname=state_name,
storedprocfetch="CheckStateExists",
storedprocadd="SaveState"
)
self.isSuccess = crud.isSuccess
self.resultMessage = crud.resultMessage
return self.resultMessage
# ----------------------------------------------------------
# GET ALL STATES (NO CHANGE - THIS IS CORRECT)
# ----------------------------------------------------------
def GetAllStates(self, request):
connection = config.get_db_connection()
data = []
if not connection:
return []
try:
cursor = connection.cursor()
cursor.callproc("GetAllStates")
for res in cursor.stored_results():
data = res.fetchall()
self.isSuccess = True
except mysql.connector.Error as e:
print(f"Error fetching states: {e}")
self.isSuccess = False
self.resultMessage = HtmlHelper.json_response(
ResponseHandler.fetch_failure("state"), 500
)
return []
finally:
cursor.close()
connection.close()
return data
# ----------------------------------------------------------
# CHECK STATE (USING ITEM CRUD)
# ----------------------------------------------------------
def CheckState(self, request):
state_name = request.json.get('state_Name', '').strip()
crud = ItemCRUD(ItemCRUDType.State)
return crud.CheckItem(
request=request,
parentid=None,
childname=state_name,
storedprocfetch="CheckStateExists"
)
# ----------------------------------------------------------
# DELETE STATE (USING ITEM CRUD)
# ----------------------------------------------------------
def DeleteState(self, request, id):
crud = ItemCRUD(ItemCRUDType.State)
crud.DeleteItem(
request=request,
itemID=id,
storedprocDelete="DeleteState"
)
self.isSuccess = crud.isSuccess
self.resultMessage = crud.resultMessage
return self.resultMessage
# ----------------------------------------------------------
# EDIT STATE (USING ITEM CRUD)
# ----------------------------------------------------------
def EditState(self, request, id):
state_name = request.form['state_Name'].strip()
crud = ItemCRUD(ItemCRUDType.State)
crud.EditItem(
request=request,
childid=id,
parentid=None,
childname=state_name,
storedprocupdate="UpdateStateById"
)
self.isSuccess = crud.isSuccess
self.resultMessage = crud.resultMessage
return redirect(url_for('state.add_state'))
# ----------------------------------------------------------
# GET STATE BY ID (KEEP SAME)
# ----------------------------------------------------------
def GetStateByID(self, request, id):
connection = config.get_db_connection()
data = None
if not connection:
return None
try:
cursor = connection.cursor()
cursor.callproc("GetStateByID", (id,))
for res in cursor.stored_results():
data = res.fetchone()
if data:
self.isSuccess = True
self.resultMessage = "Success"
else:
self.isSuccess = False
self.resultMessage = "Not Found"
except mysql.connector.Error as e:
print(f"Error fetching state: {e}")
self.isSuccess = False
finally:
cursor.close()
connection.close()
return data

140
model/Subcontractor.py Normal file
View File

@@ -0,0 +1,140 @@
from model.Utilities import ItemCRUDType
from model.ItemCRUD import ItemCRUD
class Subcontractor:
def __init__(self):
self.isSuccess = False
self.resultMessage = ""
# ----------------------------------------------------------
# ADD
# ----------------------------------------------------------
def AddSubcontractor(self, request):
subcontractor = ItemCRUD(itemType=ItemCRUDType.Subcontractor)
data = {
"Contractor_Name": request.form.get('Contractor_Name', '').strip(),
"Address": request.form.get('Address', '').strip(),
"Mobile_No": request.form.get('Mobile_No', '').strip(),
"PAN_No": request.form.get('PAN_No', '').strip(),
"Email": request.form.get('Email', '').strip(),
"Gender": request.form.get('Gender', '').strip(),
"GST_Registration_Type": request.form.get('GST_Registration_Type', '').strip(),
"GST_No": request.form.get('GST_No', '').strip(),
"Contractor_password": request.form.get('Contractor_password', '').strip()
}
subcontractor.AddItem(
request=request,
data=data,
storedprocfetch="GetSubcontractorByName",
storedprocadd="SaveContractor"
)
self.isSuccess = subcontractor.isSuccess
self.resultMessage = subcontractor.resultMessage
return
# ----------------------------------------------------------
# GET ALL
# ----------------------------------------------------------
def GetAllSubcontractors(self, request):
subcontractor = ItemCRUD(itemType=ItemCRUDType.Subcontractor)
data = subcontractor.GetAllData(
request=request,
storedproc="GetAllSubcontractors"
)
self.isSuccess = subcontractor.isSuccess
self.resultMessage = subcontractor.resultMessage
return data
# ----------------------------------------------------------
# GET BY ID
# ----------------------------------------------------------
def GetSubcontractorByID(self, id):
subcontractor = ItemCRUD(itemType=ItemCRUDType.Subcontractor)
data = subcontractor.GetDataByID(
id=id,
storedproc="GetSubcontractorById"
)
if data:
self.isSuccess = True
else:
self.isSuccess = False
self.resultMessage = "Subcontractor not found"
return data
# ----------------------------------------------------------
# CHECK (Duplicate)
# ----------------------------------------------------------
def CheckSubcontractor(self, request):
subcontractor = ItemCRUD(itemType=ItemCRUDType.Subcontractor)
name = request.form.get('Contractor_Name', '').strip()
result = subcontractor.CheckItem(
request=request,
childname=name,
storedprocfetch="GetSubcontractorByName"
)
self.isSuccess = subcontractor.isSuccess
self.resultMessage = subcontractor.resultMessage
return result
# ----------------------------------------------------------
# EDIT
# ----------------------------------------------------------
def EditSubcontractor(self, request, subcontractor_id):
subcontractor = ItemCRUD(itemType=ItemCRUDType.Subcontractor)
data = {
"Contractor_Name": request.form.get('Contractor_Name', '').strip(),
"Address": request.form.get('Address', '').strip(),
"Mobile_No": request.form.get('Mobile_No', '').strip(),
"PAN_No": request.form.get('PAN_No', '').strip(),
"Email": request.form.get('Email', '').strip(),
"Gender": request.form.get('Gender', '').strip(),
"GST_Registration_Type": request.form.get('GST_Registration_Type', '').strip(),
"GST_No": request.form.get('GST_No', '').strip(),
"Contractor_password": request.form.get('Contractor_password', '').strip()
}
subcontractor.EditItem(
request=request,
childid=subcontractor_id,
data=data,
storedprocupdate="UpdateSubcontractor"
)
self.isSuccess = subcontractor.isSuccess
self.resultMessage = subcontractor.resultMessage
return
# ----------------------------------------------------------
# DELETE
# ----------------------------------------------------------
def DeleteSubcontractor(self, request, subcontractor_id):
subcontractor = ItemCRUD(itemType=ItemCRUDType.Subcontractor)
subcontractor.DeleteItem(
request=request,
itemID=subcontractor_id,
storedprocDelete="DeleteSubcontractor"
)
self.isSuccess = subcontractor.isSuccess
self.resultMessage = subcontractor.resultMessage
return

67
model/Utilities.py Normal file
View File

@@ -0,0 +1,67 @@
from flask import flash, jsonify, json
from enum import Enum
class ItemCRUDType(Enum):
Village = 1
Block = 2
District = 3
State = 4
HoldType = 5
Subcontractor = 6
GSTRelease = 7
class RegEx:
patternAlphabetOnly = "^[A-Za-z ]+$"
allPattern = "^(?!\s*$).+"
class ResponseHandler:
@staticmethod
def invalid_name(entity):
return {'status': 'error', 'message': f'Invalid {entity} name. Only letters are allowed!'}
@staticmethod
def already_exists(entity):
return {'status': 'exists', 'message': f'{entity.capitalize()} already exists!'}
@staticmethod
def add_success(entity):
return {'status': 'success', 'message': f'{entity.capitalize()} added successfully!'}
@staticmethod
def add_failure(entity):
return {'status': 'error', 'message': f'Failed to add {entity}.'}
@staticmethod
def is_available(entity):
return {'status': 'available', 'message': f'{entity.capitalize()} name is available!'}
@staticmethod
def delete_success(entity):
return {'status': 'success', 'message': f'{entity.capitalize()} deleted successfully!'}
@staticmethod
def delete_failure(entity):
return {'status': 'error', 'message': f'Failed to delete {entity}.'}
@staticmethod
def update_success(entity):
return {'status': 'success', 'message': f'{entity.capitalize()} updated successfully!'}
@staticmethod
def update_failure(entity):
return {'status': 'error', 'message': f'Failed to update {entity}.'}
@staticmethod
def fetch_failure(entity):
return {'status': 'error', 'message': f'Failed to fetch {entity}.'}
class HtmlHelper:
# Helper: JSON Response Formatter
@staticmethod
def json_response(message_obj, status_code):
return jsonify(message_obj), status_code

186
model/Village.py Normal file
View File

@@ -0,0 +1,186 @@
from model.Utilities import ResponseHandler, HtmlHelper, ItemCRUDType
import config
import mysql.connector
from model.ItemCRUD import ItemCRUD
class Village:
isSuccess = False
resultMessage = ""
def __init__(self):
self.isSuccess = False
self.resultMessage = ""
self.response = {} # ✅ ADDED
self.village = ItemCRUD(itemType=ItemCRUDType.Village)
# 🔹 Helper: sync status
def _set_status(self, village):
self.isSuccess = village.isSuccess
# ✅ UPDATED (safe handling)
if hasattr(village, "response"):
self.response = village.response
self.resultMessage = village.response.get("message", "")
else:
self.resultMessage = village.resultMessage
# 🔹 Helper: get request data
def _get_form_data(self, request):
block_id = request.form.get('block_Id')
village_name = request.form.get('Village_Name', '').strip()
return block_id, village_name
def AddVillage(self, request):
block_id, village_name = self._get_form_data(request)
if not village_name:
self.response = ResponseHandler.invalid_name("village") # ✅ UPDATED
self.resultMessage = self.response["message"]
self.isSuccess = False
return
try:
self.village.AddItem(
request=request,
parentid=block_id,
childname=village_name,
storedprocfetch="GetVillageByNameAndBlock",
storedprocadd="SaveVillage"
)
self._set_status(self.village)
except Exception as e:
self.isSuccess = False
self.resultMessage = str(e)
def GetAllVillages(self, request):
try:
villagesdata = self.village.GetAllData(
request=request,
storedproc="GetAllVillages"
)
self._set_status(self.village)
return villagesdata
except Exception as e:
self.isSuccess = False
self.resultMessage = str(e)
return []
def CheckVillage(self, request):
block_id, village_name = self._get_form_data(request)
if not village_name:
self.response = ResponseHandler.invalid_name("village") # ✅ UPDATED
self.resultMessage = self.response["message"]
self.isSuccess = False
return None
try:
result = self.village.CheckItem(
request=request,
parentid=block_id,
childname=village_name,
storedprocfetch="GetVillageByNameAndBlocks"
)
self._set_status(self.village)
return result
except Exception as e:
self.isSuccess = False
self.resultMessage = str(e)
return None
def DeleteVillage(self, request, village_id):
try:
self.village.DeleteItem(
request=request,
itemID=village_id,
storedprocDelete="DeleteVillage"
)
self._set_status(self.village)
except Exception as e:
self.isSuccess = False
self.resultMessage = str(e)
def EditVillage(self, request, village_id):
block_id, village_name = self._get_form_data(request)
if not village_name:
self.response = ResponseHandler.invalid_name("village") # ✅ UPDATED
self.resultMessage = self.response["message"]
self.isSuccess = False
return
try:
self.village.EditItem(
request=request,
childid=village_id,
parentid=block_id,
childname=village_name,
storedprocupdate="UpdateVillage"
)
self._set_status(self.village)
except Exception as e:
self.isSuccess = False
self.resultMessage = str(e)
def GetVillageByID(self, id):
try:
villagedetailsdata = self.village.GetDataByID(
id=id,
storedproc="GetVillageDetailsById"
)
if villagedetailsdata:
self.isSuccess = True
else:
self.isSuccess = False
self.resultMessage = "Village not found"
return villagedetailsdata
except Exception as e:
self.isSuccess = False
self.resultMessage = str(e)
return None
def GetAllBlocks(self):
blocks = []
self.isSuccess = False
self.resultMessage = ""
connection = config.get_db_connection()
if not connection:
return []
try:
with connection.cursor() as cursor:
cursor.callproc('GetAllBlocks')
for result in cursor.stored_results():
blocks.extend(result.fetchall())
self.isSuccess = True
return blocks
except mysql.connector.Error as e:
print(f"Error fetching blocks: {e}")
self.isSuccess = False
# ✅ FIXED (removed jsonify response)
self.response = ResponseHandler.fetch_failure("block")
self.resultMessage = self.response["message"]
return []
finally:
connection.close()

160
model/gst_release.py Normal file
View File

@@ -0,0 +1,160 @@
# model/gst_release.py
from flask import request, jsonify
from model.ItemCRUD import ItemCRUD
from model.Utilities import ItemCRUDType
class GSTRelease:
def __init__(self):
self.isSuccess = False
self.resultMessage = ""
# ------------------- Add GST Release -------------------
def AddGSTRelease(self, request):
try:
gst = ItemCRUD(itemType=ItemCRUDType.GSTRelease)
# Print the full form data
print("===== DEBUG: FORM DATA =====")
for key, value in request.form.items():
print(f"{key} : {value}")
print("=============================")
data = {
"PMC_No": request.form.get("PMC_No", "").strip(),
"Invoice_No": request.form.get("Invoice_No", "").strip(),
"Basic_Amount": float(request.form.get("Basic_Amount", 0) or 0),
"Final_Amount": float(request.form.get("Final_Amount", 0) or 0),
"Total_Amount": float(request.form.get("Total_Amount", 0) or 0),
"UTR": request.form.get("UTR", "").strip(),
"Contractor_ID": int(request.form.get("Contractor_ID", 0) or 0)
}
print("===== DEBUG: PARSED DATA =====")
print(data)
print("==============================")
# Add GST Release
gst.AddItem(
request=request,
data=data,
storedprocfetch="CheckGSTReleaseExists",
storedprocadd="AddGSTReleaseFromExcel"
)
print(f"AddItem result: isSuccess={gst.isSuccess}, message={gst.resultMessage}")
self.isSuccess = gst.isSuccess
self.resultMessage = str(gst.resultMessage)
except Exception as e:
print("ERROR in AddGSTRelease:", e)
self.isSuccess = False
self.resultMessage = str(e)
return jsonify({"success": self.isSuccess, "message": self.resultMessage})
def EditGSTRelease(self, request, gst_release_id):
try:
gst = ItemCRUD(itemType=ItemCRUDType.GSTRelease)
# Map form inputs to stored procedure parameters
data = {
"p_pmc_no": request.form.get("PMC_No", "").strip(),
"p_invoice_no": request.form.get("invoice_no", "").strip(),
"p_basic_amount": float(request.form.get("Basic_Amount", 0) or 0),
"p_final_amount": float(request.form.get("Final_Amount", 0) or 0),
"p_total_amount": float(request.form.get("Total_Amount", 0) or 0),
"p_utr": request.form.get("UTR", "").strip(),
"p_gst_release_id": gst_release_id
}
print("===== DEBUG: UPDATE DATA =====")
print(data)
print("==============================")
# Call your stored procedure
gst.EditItem(
request=request,
childid=gst_release_id,
data=data,
storedprocupdate="UpdateGSTRelease"
)
self.isSuccess = gst.isSuccess
self.resultMessage = str(gst.resultMessage)
except Exception as e:
print("ERROR in EditGSTRelease:", e)
self.isSuccess = False
self.resultMessage = str(e)
# ------------------- Delete GST Release -------------------
def DeleteGSTRelease(self, gst_release_id):
try:
gst = ItemCRUD(itemType=ItemCRUDType.GSTRelease)
gst.DeleteItem(
request=None,
itemID=gst_release_id,
storedprocDelete="DeleteGSTReleaseById"
)
self.isSuccess = gst.isSuccess
self.resultMessage = str(gst.resultMessage)
except Exception as e:
print("ERROR in DeleteGSTRelease:", e)
self.isSuccess = False
self.resultMessage = str(e)
return jsonify({"success": self.isSuccess, "message": self.resultMessage})
# ------------------- Get All GST Releases -------------------
def GetAllGSTReleases(self):
try:
gst = ItemCRUD(itemType=ItemCRUDType.GSTRelease)
rows = gst.GetAllData(None, "GetAllGSTReleases")
data = []
for row in rows:
data.append({
"gst_release_id": row[0],
"pmc_no": row[1],
"invoice_no": row[2],
"basic_amount": row[3],
"final_amount": row[4],
"total_amount": row[5],
"utr": row[6],
"contractor_id": row[7]
})
return data
except Exception as e:
print("ERROR in GetAllGSTReleases:", e)
return []
# ------------------- Get GST Release By ID -------------------
def GetGSTReleaseByID(self, gst_release_id):
try:
gst = ItemCRUD(itemType=ItemCRUDType.GSTRelease)
row = gst.GetDataByID(gst_release_id, "GetGSTReleaseById")
if row:
return {
"gst_release_id": row[0],
"pmc_no": row[1],
"invoice_no": row[2],
"basic_amount": row[3],
"final_amount": row[4],
"total_amount": row[5],
"utr": row[6],
"contractor_id": row[7]
}
return None
except Exception as e:
print("ERROR in GetGSTReleaseByID:", e)
return None

236
model/payment.py Normal file
View File

@@ -0,0 +1,236 @@
import config
import mysql.connector
import config
import mysql.connector
from enum import Enum
from model.Utilities import ItemCRUDType
class Paymentmodel:
# ---------------- Database Connection ----------------
@staticmethod
def get_connection():
connection = config.get_db_connection()
if not connection:
return None
return connection
# ---------------- Payment Methods ----------------
@staticmethod
def fetch_all_payments():
connection = Paymentmodel.get_connection()
payments = []
if connection:
cursor = connection.cursor(dictionary=True)
try:
cursor.callproc('GetAllPayments')
for result in cursor.stored_results():
payments = result.fetchall()
except mysql.connector.Error as e:
print(f"Error fetching payment history: {e}")
finally:
cursor.close()
connection.close()
return payments
@staticmethod
def insert_payment(subcontractor_id, pmc_no, invoice_no, amount, tds_amount, total_amount, utr):
connection = Paymentmodel.get_connection()
if not connection:
return False
cursor = None
try:
cursor = connection.cursor()
cursor.callproc('GetInvoiceId', [subcontractor_id, pmc_no, invoice_no])
invoice_id = None
for result in cursor.stored_results():
row = result.fetchone()
if row:
invoice_id = row[0]
if not invoice_id:
return False
cursor.callproc(
'InsertPayments',
[pmc_no, invoice_no, amount, tds_amount, total_amount, utr, invoice_id]
)
connection.commit()
return True
except Exception as e:
print(e)
return False
finally:
if cursor:
cursor.close()
if connection:
connection.close()
# @staticmethod
# def update_inpayment(subcontractor_id, pmc_no, invoice_no, amount, tds_amount, total_amount, utr):
# connection = Paymentmodel.get_connection()
# if not connection:
# return False
# try:
# cursor = connection.cursor()
# cursor.callproc('UpdateInpaymentRecord', [
# subcontractor_id, pmc_no, invoice_no, amount, tds_amount, total_amount, utr
# ])
# connection.commit()
# return True
# except mysql.connector.Error as e:
# print(f"Error updating inpayment: {e}")
# return False
# finally:
# cursor.close()
# connection.close()
@staticmethod
def fetch_payment_by_id(payment_id):
connection = Paymentmodel.get_connection()
if not connection:
return None
payment_data = {}
try:
cursor = connection.cursor(dictionary=True)
cursor.callproc("GetPaymentById", (payment_id,))
for result in cursor.stored_results():
payment_data = result.fetchone()
if payment_data:
payment_data = [
payment_data.get('Payment_Id'),
payment_data.get('PMC_No'),
payment_data.get('Invoice_No'),
payment_data.get('Payment_Amount'),
payment_data.get('TDS_Payment_Amount'),
payment_data.get('Total_Amount'),
payment_data.get('UTR')
]
except mysql.connector.Error as e:
print(f"Error fetching payment data: {e}")
finally:
cursor.close()
connection.close()
return payment_data
@staticmethod
def call_update_payment_proc(payment_id, pmc_no, invoice_no, amount, tds_amount, total_amount, utr):
connection = Paymentmodel.get_connection()
if not connection:
return False
try:
cursor = connection.cursor()
cursor.callproc("UpdatePayment", (payment_id, pmc_no, invoice_no, amount, tds_amount, total_amount, utr))
connection.commit()
return True
except mysql.connector.Error as e:
print(f"Error updating payment: {e}")
return False
finally:
cursor.close()
connection.close()
@staticmethod
def delete_payment(payment_id):
connection = Paymentmodel.get_connection()
if not connection:
return False, None, None
try:
cursor = connection.cursor(dictionary=True)
# Fetch PMC & Invoice before deleting
cursor.callproc('GetPaymentPMCInvoiceById', [payment_id])
record = {}
for result in cursor.stored_results():
record = result.fetchone() or {}
if not record:
return False, None, None
pmc_no = record['PMC_No']
invoice_no = record['Invoice_No']
# Delete payment
cursor.callproc("DeletePayment", (payment_id,))
connection.commit()
# Reset inpayment fields
# cursor.callproc("ResetInpayment", [pmc_no, invoice_no])
# connection.commit()
return True, pmc_no, invoice_no
except mysql.connector.Error as e:
print(f"Error deleting payment: {e}")
return False, None, None
finally:
cursor.close()
connection.close()
# ---------------- Item CRUD Methods ----------------
@staticmethod
def fetch_items(item_type: ItemCRUDType):
connection = Paymentmodel.get_connection()
items = []
if connection:
cursor = connection.cursor(dictionary=True)
try:
cursor.callproc('GetItemsByType', [item_type.value])
for result in cursor.stored_results():
items = result.fetchall()
except mysql.connector.Error as e:
print(f"Error fetching {item_type.name}: {e}")
finally:
cursor.close()
connection.close()
return items
@staticmethod
def insert_item(item_type: ItemCRUDType, name: str):
connection = Paymentmodel.get_connection()
if not connection:
return False
try:
cursor = connection.cursor()
cursor.callproc('InsertItem', [item_type.value, name])
connection.commit()
return True
except mysql.connector.Error as e:
print(f"Error inserting {item_type.name}: {e}")
return False
finally:
cursor.close()
connection.close()
@staticmethod
def update_item(item_type: ItemCRUDType, item_id: int, new_name: str):
connection = Paymentmodel.get_connection()
if not connection:
return False
try:
cursor = connection.cursor()
cursor.callproc('UpdateItem', [item_type.value, item_id, new_name])
connection.commit()
return True
except mysql.connector.Error as e:
print(f"Error updating {item_type.name}: {e}")
return False
finally:
cursor.close()
connection.close()
@staticmethod
def delete_item(item_type: ItemCRUDType, item_id: int):
connection = Paymentmodel.get_connection()
if not connection:
return False
try:
cursor = connection.cursor()
cursor.callproc('DeleteItem', [item_type.value, item_id])
connection.commit()
return True
except mysql.connector.Error as e:
print(f"Error deleting {item_type.name}: {e}")
return False
finally:
cursor.close()
connection.close()