report code changes by pankaj

This commit is contained in:
2026-03-23 11:37:15 +05:30
parent c8d5a9c37d
commit f0a01d026b
198 changed files with 8312 additions and 6173 deletions

9
.env Normal file
View File

@@ -0,0 +1,9 @@
Secret_Key = 9f2a1b8c4d6e7f0123456789abcdef01
MYSQL_HOST=127.0.0.1
MYSQL_USER=root
MYSQL_PASSWORD=root
MYSQL_DB=test
DEFAULT_USERNAME=admin
DEFAULT_PASSWORD=admin123

8
.gitignore vendored
View File

@@ -1,8 +0,0 @@
venv/
*.pyc
__pycache__/
.env
.uploads
static/download/

View File

@@ -1,54 +0,0 @@
from flask import Flask, render_template, request, redirect, url_for, send_from_directory, flash, jsonify, json
from flask_login import LoginManager, UserMixin
from logging.handlers import RotatingFileHandler
from ldap3 import Server, Connection, ALL, SUBTREE
from ldap3 import Server, Connection, ALL
from ldap3.core.exceptions import LDAPBindError
class DefaultCredentials:
username = 'admin'
password = 'admin123'
class LoginLDAP:
def __init__(self, request):
self.username = request.form['username'].strip()
self.password = request.form['password']
self.isDefaultCredentials = False
self.isValidLogin = False
self.errorMessage = ""
ldap_user_dn = f"uid={self.username},ou=users,dc=lcepl,dc=org"
ldap_server = 'ldap://localhost:389'
#Need to re-factor further
# Static fallback user
if self.username == DefaultCredentials.username and self.password == DefaultCredentials.password:
self.isDefaultCredentials = True
self.isValidLogin = True
return
try:
# LDAP authentication
conn = Connection(
Server(self.ldap_server, get_info=ALL),
user=self.ldap_user_dn,
password=self.password,
auto_bind=True
)
self.isValidLogin = True
return
except LDAPBindError:
self.errorMessage = "Invalid credentials."
except Exception as e:
self.errorMessage = str(e)
class User(UserMixin):
def __init__(self, id):
self.id = id

View File

@@ -1,108 +0,0 @@
from flask import Flask, render_template, request, redirect, url_for, send_from_directory, flash, jsonify, json
from flask import current_app
from datetime import datetime
from flask_login import LoginManager, UserMixin, login_user, logout_user, login_required, current_user
from AppCode.Utilities import RegEx, ResponseHandler, HtmlHelper, ItemCRUDType
from AppCode.Log import LogData, LogHelper
import os
import config
import re
import mysql.connector
from mysql.connector import Error
from AppCode.ItemCRUD import ItemCRUD, itemCRUDMapping
class Block:
isSuccess = False
resultMessage = ""
def __init__(self):
self.isSuccess = False
self.resultMessage = ""
# ----------------------------------------------------------
# Add Block
# ----------------------------------------------------------
def AddBlock(self, request):
block = ItemCRUD(itemType=ItemCRUDType.Block)
district_id = request.form.get('district_Id')
block_name = request.form.get('block_Name', '').strip()
block.AddItem(request=request, parentid=district_id, childname=block_name, storedprocfetch="GetVillageByNameAndBlock", storedprocadd="SaveVillage" )
self.isSuccess = block.isSuccess
self.resultMessage = block.resultMessage
return
# ----------------------------------------------------------
# Get All Blocks
# ----------------------------------------------------------
def GetAllBlocks(self):
block = ItemCRUD(itemType=ItemCRUDType.Block)
blocksdata = block.GetAllData(request=request, storedproc="GetAllBlock")
self.isSuccess = block.isSuccess
self.resultMessage = block.resultMessage
return blocksdata
# ----------------------------------------------------------
# Check Block Exists
# ----------------------------------------------------------
def CheckBlock(self, request):
block = ItemCRUD(itemType=ItemCRUDType.Block)
block_name = request.json.get('block_Name', '').strip()
district_id = request.json.get('district_Id')
result = block.CheckItem(request=request, parentid=district_id, childname=block_name, storedprocfetch="GetBlockByNameAndDistrict")
self.isSuccess = block.isSuccess
self.resultMessage = block.resultMessage
return result
# ----------------------------------------------------------
# Get Block By ID
# ----------------------------------------------------------
def GetBlockByID(self, id):
block = ItemCRUD(itemType=ItemCRUDType.Village)
blockdata = block.GetAllData("GetBlockDataByID")
self.isSuccess = block.isSuccess
self.resultMessage = block.resultMessage
return blockdata
# ----------------------------------------------------------
# Update Block
# ----------------------------------------------------------
def EditBlock(self, request, block_id):
block = ItemCRUD(itemType=ItemCRUDType.Block)
district_id = request.form.get('district_Id')
block_name = request.form.get('block_Name', '').strip()
block.EditItem(request=request, childid=block_id, parentid=district_id, childname=block_name, storedprocadd="UpdateBlockById" )
self.isSuccess = block.isSuccess
self.resultMessage = block.resultMessage
return
# ----------------------------------------------------------
# Delete Block
# ----------------------------------------------------------
def DeleteBlock(self, id):
block = ItemCRUD(itemType=ItemCRUDType.Block)
block.DeleteItem(request=request, itemID=id, storedprocDelete="DeleteBlock" )
self.isSuccess = block.isSuccess
self.resultMessage = block.resultMessage
return

View File

@@ -1,248 +0,0 @@
from flask_login import LoginManager, UserMixin, login_user, logout_user, login_required, current_user
from AppCode.Utilities import RegEx, ResponseHandler, HtmlHelper, ItemCRUDType
from AppCode.Log import LogData, LogHelper
import os
import config
import re
import mysql.connector
from mysql.connector import Error
class itemCRUDMapping:
name = ""
def __init__(self, itemType):
if itemType is ItemCRUDType.Village:
self.name = "Village"
elif itemType is ItemCRUDType.Block:
self.name = "Block"
elif itemType is ItemCRUDType.State:
self.name = "State"
class ItemCRUD:
isSuccess = False
resultMessage = ""
itemCRUDType = ItemCRUDType.Village
itemCRUDMapping = ItemCRUDType(itemCRUDType)
#itemCRUDMapping itemCRUDMapping
def __init__(self, itemType):
self.isSuccess = False
self.resultMessage = ""
self.itemCRUDType = itemType
self.itemCRUDMapping = ItemCRUDType(self.itemCRUDType)
def DeleteItem(self, request, itemID, storedprocDelete):
self.isSuccess = False
self.resultMessage = ""
connection = config.get_db_connection()
cursor = connection.cursor()
LogHelper.log_action("Delete Village", f"User {current_user.id} deleted village '{itemID}'")
try:
cursor.callproc(storedprocDelete, (itemID,))
connection.commit()
self.resultMessage = ResponseHandler.delete_success(self.itemCRUDMapping.name) # Simple message, route will handle redirect
self.isSuccess = True
except mysql.connector.Error as e:
print(f"Error deleting village: {e}")
self.isSuccess = False
self.resultMessage = HtmlHelper.json_response(ResponseHandler.delete_failure(self.itemCRUDMapping.name), 500)
finally:
cursor.close()
connection.close()
#return self.resultMessage
def AddItem(self, request, parentid, childname, storedprocfetch, storedprocadd):
connection = config.get_db_connection()
if not connection:
self.isSuccess = False
self.resultMessage = HtmlHelper.json_response(ResponseHandler.db_connection_failure(), 500)
return
cursor = connection.cursor()
LogHelper.log_action(f"Add '{self.itemCRUDMapping.name}'",
f"User {current_user.id} adding '{self.itemCRUDMapping.name}' '{childname}' to block '{parentid}'")
if not parentid:
self.isSuccess = False
self.resultMessage = HtmlHelper.json_response(ResponseHandler.add_failure("block"),
400) # Assuming this is a valid response
return
if not re.match(RegEx.patternAlphabetOnly, childname):
self.isSuccess = False
self.resultMessage = HtmlHelper.json_response(ResponseHandler.invalid_name(self.itemCRUDMapping.name), 400)
return
try:
# Check if the village already exists in the block
cursor.callproc(storedprocfetch, (childname, parentid,))
for rs in cursor.stored_results():
existing_item = rs.fetchone()
if existing_item:
print("Existing ", self.itemCRUDMapping.name)
self.isSuccess = False
self.resultMessage = HtmlHelper.json_response(ResponseHandler.already_exists(self.itemCRUDMapping.name), 409)
return
# Insert new village
cursor.callproc(storedprocadd, (childname, parentid))
connection.commit()
self.isSuccess = True
self.resultMessage = HtmlHelper.json_response(ResponseHandler.add_success(self.itemCRUDMapping.name), 200)
return
except mysql.connector.Error as e:
print(f"Database Error: {e}")
print("DatabaseError")
self.isSuccess = False
self.resultMessage = HtmlHelper.json_response(ResponseHandler.add_failure("village"), 500)
return
finally:
cursor.close()
connection.close()
def EditItem(self, request, childid, parentid, childname, storedprocupdate):
"""Handles the POST logic for updating a district."""
self.isSuccess = False
self.resultMessage = ""
connection = config.get_db_connection()
cursor = connection.cursor()
#district_name = request.form['district_Name'].strip()
#state_id = request.form['state_Id']
LogHelper.log_action("Edit District", f"User {current_user.id} Edited District '{childid}'")
#Need to add validation to see if item exits
# Added validation consistent with your other Edit methods
if not re.match(RegEx.patternAlphabetOnly, childname):
self.isSuccess = False
self.resultMessage = ResponseHandler.invalid_name(self.itemCRUDMapping.name), 400
return self.resultMessage
try:
cursor.callproc(storedprocupdate, (childid, parentid, childname,))
connection.commit()
self.isSuccess = True
self.resultMessage = "Successfully Edited"
except mysql.connector.Error as e:
print(f"Error updating district: {e}")
self.isSuccess = False
self.resultMessage = ResponseHandler.update_failure(self.itemCRUDMapping.name), 500
finally:
cursor.close()
connection.close()
return self.resultMessage
def GetAllData(self, request, storedproc):
data = []
connection = config.get_db_connection()
self.isSuccess = False
self.resultMessage = ""
if not connection:
return []
cursor = connection.cursor()
try:
cursor.callproc(storedproc)
for result in cursor.stored_results():
data = result.fetchall()
self.isSuccess = True
except mysql.connector.Error as e:
print(f"Error fetching villages: {e}")
self.isSuccess = False
self.resultMessage = HtmlHelper.json_response(ResponseHandler.fetch_failure(self.itemCRUDMapping.name), 500)
return []
finally:
cursor.close()
connection.close()
return data
def GetDataByID(self, id, storedproc):
data = None
connection = config.get_db_connection()
cursor = connection.cursor()
try:
cursor.callproc(storedproc, (id,))
for rs in cursor.stored_results():
data = rs.fetchone()
except mysql.connector.Error as e:
print(f"Error fetching block data: {e}")
return None
finally:
cursor.close()
connection.close()
return data
def CheckItem(self, request, parentid, childname, storedprocfetch):
self.isSuccess = False
self.resultMessage = ""
connection = config.get_db_connection()
cursor = connection.cursor()
LogHelper.log_action("Check Block", f"User {current_user.id} Checked block '{childname}'")
if not re.match(RegEx.patternAlphabetOnly, childname):
self.isSuccess = False
self.resultMessage = HtmlHelper.json_response(ResponseHandler.invalid_name(self.itemCRUDMapping.name), 400)
return HtmlHelper.json_response(ResponseHandler.invalid_name(self.itemCRUDMapping.name), 400)
try:
cursor.callproc(storedprocfetch, (childname, parentid))
for rs in cursor.stored_results():
existing_item = rs.fetchone()
if existing_item:
self.isSuccess = False
self.resultMessage = HtmlHelper.json_response(ResponseHandler.already_exists(self.itemCRUDMapping.name), 409)
return HtmlHelper.json_response(ResponseHandler.already_exists(self.itemCRUDMapping.name), 409)
self.isSuccess = True
self.resultMessage = HtmlHelper.json_response(ResponseHandler.is_available(self.itemCRUDMapping.name), 200)
return HtmlHelper.json_response(ResponseHandler.is_available(self.itemCRUDMapping.name), 200)
except mysql.connector.Error as e:
print(f"Error checking block: {e}")
self.isSuccess = False
self.resultMessage = HtmlHelper.json_response(ResponseHandler.fetch_failure(self.itemCRUDMapping.name), 500)
return HtmlHelper.json_response(ResponseHandler.fetch_failure(self.itemCRUDMapping.name), 500)
finally:
cursor.close()
connection.close()

View File

@@ -1,246 +0,0 @@
from flask import Flask, render_template, request, redirect, url_for, send_from_directory, flash, jsonify, json
from flask import current_app
from datetime import datetime
from flask_login import LoginManager, UserMixin, login_user, logout_user, login_required, current_user
from AppCode.Utilities import RegEx, ResponseHandler, HtmlHelper, ItemCRUDType
from AppCode.Log import LogData, LogHelper
import os
import config
import re
import mysql.connector
from mysql.connector import Error
class State:
isSuccess = False
resultMessage = ""
def __init__(self):
self.isSuccess = False
self.resultMessage = ""
def AddState(self, request):
"""Log user actions with timestamp, user, action, and details."""
statedata = []
connection = config.get_db_connection()
if connection:
cursor = connection.cursor()
state_name = request.form['state_Name'].strip()
LogHelper.log_action("Add State", f"User {current_user.id} added state '{state_name}'")
if not re.match(RegEx.patternAlphabetOnly, state_name):
self.isSuccess = False
self.resultMessage = HtmlHelper.json_response(ResponseHandler.invalid_name("state"), 400)
return
try:
cursor.callproc("CheckStateExists", (state_name,))
for data in cursor.stored_results():
existing_state = data.fetchone()
if existing_state:
self.isSuccess = False
self.resultMessage = HtmlHelper.json_response(ResponseHandler.already_exists("state"), 409)
return
# cursor.execute("call SaveState (%s)", (state_name,))
cursor.callproc("SaveState", (state_name,))
connection.commit()
self.isSuccess = True
self.resultMessage = HtmlHelper.json_response(ResponseHandler.add_success("state"), 200)
return
except mysql.connector.Error as e:
print(f"Error inserting state: {e}")
self.isSuccess = False
self.resultMessage = HtmlHelper.json_response(ResponseHandler.add_failure("state"), 500)
return
#Need to make this seperate
def GetAllStates(self, request):
"""Log user actions with timestamp, user, action, and details."""
statedata = []
connection = config.get_db_connection()
self.isSuccess = False
self.resultMessage = ""
if not connection:
return []
cursor = connection.cursor()
try:
cursor.callproc("GetAllStates")
for res in cursor.stored_results():
statedata = res.fetchall()
self.isSuccess = True
except mysql.connector.Error as e:
print(f"Error fetching states: {e}")
self.isSuccess = False
self.resultMessage = HtmlHelper.json_response(ResponseHandler.fetch_failure("state"), 500)
return []
finally:
cursor.close()
connection.close()
return statedata
def CheckState(self, request):
self.isSuccess = False
self.resultMessage = ""
connection = config.get_db_connection()
#connection closing needs to be verified
if connection:
cursor = connection.cursor()
state_name = request.json.get('state_Name', '').strip()
LogHelper.log_action("Check State", f"User {current_user.id} Checked state '{state_name}'")
if not re.match(RegEx.patternAlphabetOnly, state_name):
self.isSuccess = False
self.resultMessage = HtmlHelper.json_response(ResponseHandler.invalid_name("state"), 400)
return HtmlHelper.json_response(ResponseHandler.invalid_name("state"), 400)
try:
# cursor.execute("SELECT * FROM states WHERE State_Name = %s", (state_name,))
# existing_state = cursor.fetchone()
cursor.callproc("CheckStateExists", (state_name,))
for data in cursor.stored_results():
existing_state = data.fetchone()
if existing_state:
self.isSuccess = False
self.resultMessage = HtmlHelper.json_response(ResponseHandler.already_exists("state"), 409)
return HtmlHelper.json_response(ResponseHandler.already_exists("state"), 409)
else:
self.isSuccess = True
self.resultMessage = HtmlHelper.json_response(ResponseHandler.is_available("state"), 200)
return HtmlHelper.json_response(ResponseHandler.is_available("state"), 200)
except mysql.connector.Error as e:
self.isSuccess = False
self.resultMessage = HtmlHelper.json_response(ResponseHandler.add_failure("state"), 500)
print(f"Error checking state: {e}")
return HtmlHelper.json_response(ResponseHandler.add_failure("state"), 500)
finally:
cursor.close()
connection.close()
def DeleteState(self, request, id):
self.isSuccess = False
self.resultMessage = ""
connection = config.get_db_connection()
cursor = connection.cursor()
LogHelper.log_action("Delete State", f"User {current_user.id} Deleted state '{id}'")
try:
cursor.callproc('DeleteState', (id,))
connection.commit()
self.resultMessage = "Successfully Deleted"
self.isSuccess = True
except mysql.connector.Error as e:
print(f"Error deleting data: {e}")
self.isSuccess = False
self.resultMessage = HtmlHelper.json_response(ResponseHandler.delete_failure("state"), 500)
return HtmlHelper.json_response(ResponseHandler.delete_failure("state"), 500)
finally:
cursor.close()
connection.close()
return self.resultMessage
def EditState(self, request, id):
self.isSuccess = False
self.resultMessage = ""
connection = config.get_db_connection()
cursor = connection.cursor()
# str_pattern_reg = r"^[A-Za-z\s]+$"
state_name = request.form['state_Name'].strip()
LogHelper.log_action("Edit State", f"User {current_user.id} Edited state '{state_name}'")
if not re.match(RegEx.patternAlphabetOnly, state_name):
self.isSuccess = False
self.resultMessage = ResponseHandler.invalid_name("state"), 400
return ResponseHandler.invalid_name("state"), 400
try:
# cursor.execute("UPDATE states SET State_Name = %s WHERE State_ID = %s", (state_name, id))
cursor.callproc("UpdateStateById", (id, state_name))
connection.commit()
self.isSuccess = True
self.resultMessage = "Successfully Edited"
return redirect(url_for('add_state'))
except mysql.connector.Error as e:
print(f"Error updating data: {e}")
self.isSuccess = True
self.resultMessage = ResponseHandler.add_failure("state"), 500
return ResponseHandler.add_failure("state"), 500
finally:
cursor.close()
connection.close()
def GetStateByID(self, request, id):
"""Log user actions with timestamp, user, action, and details."""
statedata = []
self.isSuccess = False
self.resultMessage = ""
connection = config.get_db_connection()
if not connection:
return []
cursor = connection.cursor()
try:
cursor.callproc("GetStateByID", (id,))
for res in cursor.stored_results():
statedata = res.fetchone()
if statedata:
self.isSuccess = True
self.resultMessage = "Success in Fetching"
else:
self.isSuccess = False
self.resultMessage = "State Not Found"
except mysql.connector.Error as e:
print(f"Error fetching states: {e}")
self.isSuccess = False
self.resultMessage = HtmlHelper.json_response(ResponseHandler.fetch_failure("state"), 500)
return []
finally:
cursor.close()
connection.close()
return statedata

View File

@@ -1,8 +0,0 @@
# Payment reconciliation
http://103.186.132.129:3000/pjpatil12/Version-1-old
file path:C:\Work\lcepl_Projects\Payment reconciliation

View File

@@ -1,8 +0,0 @@
{
"folders": [
{
"path": "."
}
],
"settings": {}
}

View File

@@ -1,8 +0,0 @@
{
"folders": [
{
"path": "."
}
],
"settings": {}
}

Binary file not shown.

Binary file not shown.

File diff suppressed because it is too large Load Diff

View File

@@ -1,12 +1,15 @@
import mysql.connector
import os
from dotenv import load_dotenv
# Load environment variables from .env file
load_dotenv()
# Get MySQL credentials from environment variables
MYSQL_HOST = os.getenv("MYSQL_HOST", "127.0.0.1")
MYSQL_USER = os.getenv("MYSQL_USER", "root")
MYSQL_PASSWORD = os.getenv("MYSQL_PASSWORD", "root")
MYSQL_DB = os.getenv("MYSQL_DB", "test")
MYSQL_HOST = os.getenv("MYSQL_HOST")
MYSQL_USER = os.getenv("MYSQL_USER")
MYSQL_PASSWORD = os.getenv("MYSQL_PASSWORD")
MYSQL_DB = os.getenv("MYSQL_DB")
# Connect to MySQL
def get_db_connection():

Binary file not shown.

Binary file not shown.

View File

@@ -0,0 +1,46 @@
from flask import Blueprint, render_template, request, redirect, url_for, flash, session
from flask_login import login_user, logout_user, login_required, current_user
from model.Auth import LoginLDAP, User
from model.Log import LogHelper
auth_bp = Blueprint('auth', __name__)
@auth_bp.route('/login', methods=['GET', 'POST'])
def login():
if request.method == 'POST':
loginData = LoginLDAP(request)
if loginData.isValidLogin:
if loginData.isDefaultCredentials:
LogHelper.log_action('Login', f"User {loginData.username} logged in (static user)")
else:
LogHelper.log_action('Login', f"User {loginData.username} logged in (LDAP)")
session['username'] = loginData.username
login_user(User(loginData.username))
return redirect(url_for('index'))
else:
flash(loginData.errorMessage, 'danger')
return render_template("login.html")
@auth_bp.route('/logout')
@login_required
def logout():
LogHelper.log_action('Logout', f"User {current_user.id} logged out")
logout_user()
flash('You have been logged out.', 'info')
return redirect(url_for('auth.login'))

View File

@@ -0,0 +1,119 @@
from flask import Blueprint, render_template, request, redirect, url_for, jsonify
from flask_login import login_required
import config
import mysql.connector
from model.Block import Block
from model.Utilities import HtmlHelper
block_bp = Blueprint('block', __name__)
@block_bp.route('/add_block', methods=['GET', 'POST'])
@login_required
def add_block():
block = Block()
if request.method == 'POST':
block.AddBlock(request)
return block.resultMessage
connection = config.get_db_connection()
cursor = connection.cursor()
cursor.callproc("GetAllStates")
for rs in cursor.stored_results():
states = rs.fetchall()
block_data = block.GetAllBlocks(request)
cursor.close()
connection.close()
return render_template(
'add_block.html',
states=states,
block_data=block_data
)
# ✅ NEW ROUTE (FIX FOR DISTRICT FETCH)
@block_bp.route('/get_districts/<int:state_id>')
@login_required
def get_districts(state_id):
connection = config.get_db_connection()
cursor = connection.cursor()
cursor.callproc("GetDistrictsByStateId", (state_id,))
districts = []
for rs in cursor.stored_results():
districts = rs.fetchall()
cursor.close()
connection.close()
district_list = []
for district in districts:
district_list.append({
"id": district[0],
"name": district[1]
})
return jsonify(district_list)
@block_bp.route('/check_block', methods=['POST'])
@login_required
def check_block():
block = Block()
return block.CheckBlock(request)
@block_bp.route('/edit_block/<int:block_id>', methods=['GET', 'POST'])
@login_required
def edit_block(block_id):
block = Block()
if request.method == 'POST':
block.EditBlock(request, block_id)
return block.resultMessage
connection = config.get_db_connection()
cursor = connection.cursor()
cursor.callproc("GetAllStates")
for rs in cursor.stored_results():
states = rs.fetchall()
cursor.callproc("GetAllDistrictsData")
for rs in cursor.stored_results():
districts = rs.fetchall()
block_data = block.GetBlockByID(block_id)
cursor.close()
connection.close()
return render_template(
'edit_block.html',
block_data=block_data,
states=states,
districts=districts
)
@block_bp.route('/delete_block/<int:block_id>')
@login_required
def delete_block(block_id):
block = Block()
block.DeleteBlock(request, block_id)
return redirect(url_for('block.add_block'))

View File

@@ -0,0 +1,84 @@
from flask import Blueprint, render_template, request, redirect, url_for, flash
from flask_login import login_required
from model.District import District
from model.State import State
district_bp = Blueprint('district', __name__)
@district_bp.route('/add_district', methods=['GET', 'POST'])
@login_required
def add_district():
district = District()
if request.method == 'POST':
district.AddDistrict(request=request)
return district.resultMessage
state = State()
states = state.GetAllStates(request=request)
districtdata = district.GetAllDistricts(request=request)
return render_template(
'add_district.html',
districtdata=districtdata,
states=states
)
@district_bp.route('/check_district', methods=['POST'])
@login_required
def check_district():
district = District()
return district.CheckDistrict(request=request)
@district_bp.route('/delete_district/<int:district_id>')
@login_required
def delete_district(district_id):
district = District()
district.DeleteDistrict(request=request, district_id=district_id)
if not district.isSuccess:
return district.resultMessage
else:
return redirect(url_for('district.add_district'))
@district_bp.route('/edit_district/<int:district_id>', methods=['GET', 'POST'])
@login_required
def edit_district(district_id):
district = District()
state = State()
if request.method == 'POST':
district.EditDistrict(request=request, district_id=district_id)
if district.isSuccess:
flash("District updated successfully!", "success")
return redirect(url_for('district.add_district'))
else:
flash(district.resultMessage, "error")
districtdata = district.GetDistrictByID(request=request, district_id=district_id)
if not districtdata:
flash("District not found", "error")
return redirect(url_for('district.add_district'))
states = state.GetAllStates(request=request)
return render_template(
'edit_district.html',
districtdata=districtdata,
states=states
)

View File

@@ -0,0 +1,388 @@
import os
import ast
import re
from flask_login import login_required
import openpyxl
from flask import Blueprint, request, render_template, redirect, url_for, jsonify, current_app
from flask_login import current_user
from model.Log import LogHelper
import config
from model.FolderAndFile import FolderAndFile
excel_bp = Blueprint('excel', __name__)
# Default folder in case config not set
# DEFAULT_UPLOAD_FOLDER = 'uploads'
# def get_upload_folder():
# """Returns the upload folder from Flask config or default, ensures it exists."""
# folder = current_app.config.get('UPLOAD_FOLDER', DEFAULT_UPLOAD_FOLDER)
# if not os.path.exists(folder):
# os.makedirs(folder)
# return folder
# ---------------- Upload Excel File ----------------
@excel_bp.route('/upload_excel_file', methods=['GET', 'POST'])
@login_required
def upload():
if request.method == 'POST':
file = request.files.get('file')
if file and file.filename.endswith('.xlsx'):
# upload_folder = get_upload_folder()
# filepath = os.path.join(upload_folder, file.filename)
filepath =FolderAndFile.get_upload_path(file.filename)
file.save(filepath)
LogHelper.log_action(
"Upload Excel File",
f"User {current_user.id} Upload Excel File '{file.filename}'"
)
return redirect(url_for('excel.show_table', filename=file.filename))
return render_template('uploadExcelFile.html')
# ---------------- Show Excel Table ----------------
@excel_bp.route('/show_table/<filename>')
def show_table(filename):
global data
data = []
# filepath = os.path.join(get_upload_folder(), filename)
filepath = FolderAndFile.get_upload_path(filename)
wb = openpyxl.load_workbook(filepath, data_only=True)
sheet = wb.active
file_info = {
"Subcontractor": sheet.cell(row=1, column=2).value,
"State": sheet.cell(row=2, column=2).value,
"District": sheet.cell(row=3, column=2).value,
"Block": sheet.cell(row=4, column=2).value,
}
errors = []
subcontractor_data = None
state_data = None
district_data = None
block_data = None
connection = config.get_db_connection()
if connection:
try:
cursor = connection.cursor(dictionary=True)
cursor.callproc('GetStateByName', [file_info['State']])
for result in cursor.stored_results():
state_data = result.fetchone()
if not state_data:
errors.append(f"State '{file_info['State']}' is not valid. Please add it.")
if state_data:
cursor.callproc('GetDistrictByNameAndStates', [file_info['District'], state_data['State_ID']])
for result in cursor.stored_results():
district_data = result.fetchone()
if not district_data:
errors.append(f"District '{file_info['District']}' is not valid under state '{file_info['State']}'.")
if district_data:
cursor.callproc('GetBlockByNameAndDistricts', [file_info['Block'], district_data['District_ID']])
for result in cursor.stored_results():
block_data = result.fetchone()
if not block_data:
errors.append(f"Block '{file_info['Block']}' is not valid under district '{file_info['District']}'.")
cursor.callproc('GetSubcontractorByName', [file_info['Subcontractor']])
for result in cursor.stored_results():
subcontractor_data = result.fetchone()
if not subcontractor_data:
cursor.callproc('InsertSubcontractor', [file_info['Subcontractor']])
connection.commit()
cursor.callproc('GetSubcontractorByName', [file_info['Subcontractor']])
for result in cursor.stored_results():
subcontractor_data = result.fetchone()
cursor.callproc("GetAllHoldTypes")
hold_types_data = []
for ht in cursor.stored_results():
hold_types_data = ht.fetchall()
hold_types_lookup = {row['hold_type'].lower(): row['hold_type_id'] for row in hold_types_data if row['hold_type']}
cursor.close()
except Exception as e:
print(f"Database error: {e}")
return f"Database operation failed: {e}", 500
finally:
connection.close()
variables = {}
hold_columns = []
hold_counter = 0
for j in range(1, sheet.max_column + 1):
col_value = sheet.cell(row=5, column=j).value
if col_value:
variables[col_value] = j
if 'hold' in str(col_value).lower():
hold_counter += 1
hold_type_key = str(col_value).lower().strip()
hold_type_id = hold_types_lookup.get(hold_type_key)
hold_columns.append({
'column_name': col_value,
'column_number': j,
'hold_type_id': hold_type_id
})
for i in range(6, sheet.max_row + 1):
row_data = {}
if sheet.cell(row=i, column=1).value:
row_data["Row Number"] = i
for var_name, col_num in variables.items():
row_data[var_name] = sheet.cell(row=i, column=col_num).value
if sum(1 for value in row_data.values() if value) >= 4:
data.append(row_data)
for hold in hold_columns:
if hold['hold_type_id']:
print(f" if Column: {hold['column_name']}, Column Number: {hold['column_number']}, Hold Type ID: {hold['hold_type_id']}")
else:
errors.append(f"Hold Type not added ! Column name '{hold['column_name']}'.")
print(f" else Column: {hold['column_name']}, Column Number: {hold['column_number']}, Hold Type ID: {hold['hold_type_id']}")
return render_template(
'show_excel_file.html',
file_info=file_info,
variables=variables,
data=data,
subcontractor_data=subcontractor_data,
state_data=state_data,
district_data=district_data,
block_data=block_data,
errors=errors,
hold_columns=hold_columns,
hold_counter=hold_counter
)
# save Excel data
@excel_bp.route('/save_data', methods=['POST'])
def save_data():
# Extract form data
subcontractor_id = request.form.get("subcontractor_data")
state_id = request.form.get("state_data")
district_id = request.form.get("district_data")
block_id = request.form.get("block_data")
variables = request.form.getlist('variables[]')
hold_columns = request.form.get("hold_columns")
hold_counter = request.form.get("hold_counter")
if not data:
return jsonify({"error": "No data provided to save"}), 400
if data:
connection = config.get_db_connection()
cursor = connection.cursor()
try:
for entry in data:
save_data = {
"PMC_No": entry.get("PMC_No"),
"Invoice_Details": entry.get("Invoice_Details", ''),
"Work_Type": 'none',
"Invoice_Date": entry.get("Invoice_Date").strftime('%Y-%m-%d') if entry.get(
"Invoice_Date") else None,
"Invoice_No": entry.get("Invoice_No", ''),
"Basic_Amount": entry.get("Basic_Amount", 0.00),
"Debit_Amount": entry.get("Debit_Amount", 0.00),
"After_Debit_Amount": entry.get("After_Debit_Amount", 0.00),
"Amount": entry.get("Amount", 0.00),
"GST_Amount": entry.get("GST_Amount", 0.00),
"TDS_Amount": entry.get("TDS_Amount", 0.00),
"SD_Amount": entry.get("SD_Amount", 0.00),
"On_Commission": entry.get("On_Commission", 0.00),
"Hydro_Testing": entry.get("Hydro_Testing", 0.00),
"Hold_Amount": 0,
"GST_SD_Amount": entry.get("GST_SD_Amount", 0.00),
"Final_Amount": entry.get("Final_Amount", 0.00),
"Payment_Amount": entry.get("Payment_Amount", 0.00),
"Total_Amount": entry.get("Total_Amount", 0.00),
"TDS_Payment_Amount": entry.get("TDS_Payment_Amount", 0.00),
"UTR": entry.get("UTR", ''),
}
village_name, work_type = None, None
village_id = 0
LogHelper.log_action("Data saved", f"User {current_user.id} Data saved'{ village_name}'")
PMC_No = save_data.get('PMC_No')
Invoice_Details = save_data.get('Invoice_Details')
Invoice_Date = save_data.get('Invoice_Date')
Invoice_No = save_data.get('Invoice_No')
Basic_Amount = save_data.get('Basic_Amount')
Debit_Amount = save_data.get('Debit_Amount')
After_Debit_Amount = save_data.get('After_Debit_Amount')
Amount = save_data.get('Amount')
GST_Amount = save_data.get('GST_Amount')
TDS_Amount = save_data.get('TDS_Amount')
SD_Amount = save_data.get('SD_Amount')
On_Commission = save_data.get('On_Commission')
Hydro_Testing = save_data.get('Hydro_Testing')
GST_SD_Amount = save_data.get('GST_SD_Amount')
Final_Amount = save_data.get('Final_Amount')
Payment_Amount = save_data.get('Payment_Amount')
Total_Amount = save_data.get('Total_Amount')
TDS_Payment_Amount = save_data.get('TDS_Payment_Amount')
UTR = save_data.get('UTR')
if Invoice_Details:
words = Invoice_Details.lower().split()
if 'village' in words:
village_pos = words.index('village')
village_name = " ".join(words[:village_pos])
if 'work' in words:
work_pos = words.index('work')
if village_name:
work_type = " ".join(words[village_pos + 1:work_pos + 1])
else:
work_type = " ".join(words[:work_pos + 1])
if Invoice_Details and 'village' in Invoice_Details.lower() and 'work' in Invoice_Details.lower():
print("village_name ::", village_name, "|| work_type ::", work_type)
if block_id and village_name:
village_id = None
cursor.callproc("GetVillageId", (block_id, village_name))
for result in cursor.stored_results():
result = result.fetchone()
village_id = result[0] if result else None
if not village_id:
cursor.callproc("SaveVillage", (village_name, block_id))
cursor.callproc("GetVillageId", (block_id, village_name))
for result in cursor.stored_results():
result = result.fetchone()
village_id = result[0] if result else None
print("village_id :", village_id)
print("block_id :", block_id)
print("invoice :", PMC_No, village_id, work_type, Invoice_Details, Invoice_Date, Invoice_No,
Basic_Amount, Debit_Amount, After_Debit_Amount, Amount, GST_Amount, TDS_Amount,
SD_Amount, On_Commission, Hydro_Testing, GST_SD_Amount, Final_Amount)
args = (
PMC_No, village_id, work_type, Invoice_Details, Invoice_Date, Invoice_No,
Basic_Amount, Debit_Amount, After_Debit_Amount, Amount, GST_Amount, TDS_Amount,
SD_Amount, On_Commission, Hydro_Testing, GST_SD_Amount, Final_Amount,
subcontractor_id, 0
)
print("All invoice Details ",args)
results = cursor.callproc('SaveInvoice', args)
invoice_id = results[-1]
print("invoice id from the excel ", invoice_id)
if isinstance(hold_columns, str):
hold_columns = ast.literal_eval(hold_columns)
if isinstance(hold_columns, list) and all(isinstance(hold, dict) for hold in hold_columns):
for hold in hold_columns:
print(f"Processing hold: {hold}")
hold_column_name = hold.get('column_name') # Get column name
hold_type_id = hold.get('hold_type_id') # Get hold_type_id
if hold_column_name:
hold_amount = entry.get(
hold_column_name) # Get the value for that specific hold column
if hold_amount is not None:
print(f"Processing hold type: {hold_column_name}, Hold Amount: {hold_amount}")
hold_join_data = {
"Contractor_Id": subcontractor_id,
"Invoice_Id": invoice_id,
"hold_type_id": hold_type_id,
"hold_amount": hold_amount
}
cursor.callproc('InsertHoldJoinData', [
hold_join_data['Contractor_Id'], hold_join_data['Invoice_Id'],
hold_join_data['hold_type_id'], hold_join_data['hold_amount']
])
connection.commit()
print(f"Inserted hold join data: {hold_join_data}")
else:
print(f"Invalid hold entry: {hold}")
else:
print("Hold columns data is not a valid list of dictionaries.")
#---------------------------------------------Credit Note---------------------------------------------------------------------------
elif any(keyword in Invoice_Details.lower() for keyword in ['credit note','logging report']):
print("Credit note found:", PMC_No, Invoice_No, Basic_Amount, Debit_Amount, Final_Amount,
After_Debit_Amount, GST_Amount, Amount, Final_Amount, Payment_Amount, Total_Amount, UTR, Invoice_No)
cursor.callproc(
'AddCreditNoteFromExcel',
[
PMC_No, Invoice_Details, Basic_Amount, Debit_Amount, After_Debit_Amount,
GST_Amount, Amount, Final_Amount, Payment_Amount, Total_Amount, UTR,
subcontractor_id, Invoice_No
]
)
#-----------------------------------------------Hold Amount----------------------------------------------------------------------
# Step 1: Normalize Invoice_Details: lowercase, trim, remove extra spaces
normalized_details = re.sub(r'\s+', ' ', Invoice_Details.strip()).lower()
# Step 2: Define lowercase keywords
keywords = [
'excess hold',
'ht',
'hold release amount',
'dpr excess hold amount',
'excess hold amount',
'Multi to Single layer bill',
'hold amount',
'logging report'
]
# Step 3: Matching condition
if any(kw in normalized_details for kw in keywords):
print("✅ Match found. Inserting hold release for:", Invoice_Details)
cursor.callproc(
'AddHoldReleaseFromExcel',
[PMC_No, Invoice_No, Invoice_Details, Basic_Amount, Final_Amount, UTR, subcontractor_id]
)
connection.commit()
print("✅ Hold release inserted for:", PMC_No, Invoice_Details)
#------------------------------------------------------------------------------------------------------------------
elif Invoice_Details and any(
keyword in Invoice_Details.lower() for keyword in ['gst', 'release', 'note']):
print("Gst rels :", PMC_No, Invoice_No, Basic_Amount, Final_Amount,Total_Amount,UTR, subcontractor_id)
cursor.callproc(
'AddGSTReleaseFromExcel',
[PMC_No, Invoice_No, Basic_Amount, Final_Amount, Total_Amount, UTR, subcontractor_id]
)
if PMC_No and Total_Amount and UTR:
print("Payment :", PMC_No, Invoice_No, Payment_Amount, TDS_Payment_Amount, Total_Amount, UTR )
cursor.callproc("SavePayment",(PMC_No, Invoice_No, Payment_Amount, TDS_Payment_Amount, Total_Amount, UTR ))
if not village_id:
village_id = None
cursor.callproc('InsertOrUpdateInPayment', (
PMC_No,
village_id,
work_type,
Invoice_Details,
Invoice_Date,
Invoice_No,
Basic_Amount,
Debit_Amount,
After_Debit_Amount,
Amount,
GST_Amount,
TDS_Amount,
SD_Amount,
On_Commission,
Hydro_Testing,
0,
GST_SD_Amount,
Final_Amount,
Payment_Amount,
TDS_Payment_Amount,
Total_Amount,
UTR,
subcontractor_id
))
connection.commit()
return jsonify({"success": "Data saved successfully!"}), 200
except Exception as e:
connection.rollback()
return jsonify({"error": f"An unexpected error occurred: {e}"}), 500
finally:
cursor.close()
connection.close()
return render_template('index.html')
# ---------------------- Report --------------------------------

View File

@@ -0,0 +1,70 @@
from flask import Blueprint, render_template, request, redirect, url_for, jsonify
from flask_login import login_required, current_user
from model.gst_release import GSTReleasemodel
from model.Log import LogHelper
gst_release_bp = Blueprint('gst_release_bp', __name__)
# ------------------- Add GST Release -------------------
@gst_release_bp.route('/add_gst_release', methods=['GET', 'POST'])
@login_required
def add_gst_release():
gst_releases_dict = GSTReleasemodel.fetch_all_gst_releases()
gst_releases = [
[
g['GST_Release_Id'], g['PMC_No'], g['invoice_no'],
g['Basic_Amount'], g['Final_Amount'], g['Total_Amount'], g['UTR']
] for g in gst_releases_dict
] if gst_releases_dict else []
if request.method == 'POST':
pmc_no = request.form['PMC_No']
invoice_no = request.form['invoice_No']
basic_amount = request.form['basic_amount']
final_amount = request.form['final_amount']
total_amount = request.form['total_amount']
utr = request.form['utr']
contractor_id = request.form['subcontractor_id']
LogHelper.log_action("Add GST Release", f"User {current_user.id} added GST release '{pmc_no}'")
GSTReleasemodel.insert_gst_release(pmc_no, invoice_no, basic_amount, final_amount, total_amount, utr, contractor_id)
return redirect(url_for('gst_release_bp.add_gst_release'))
return render_template('add_gst_release.html', gst_releases=gst_releases, invoices=[])
# ------------------- Edit GST Release -------------------
@gst_release_bp.route('/edit_gst_release/<int:gst_release_id>', methods=['GET', 'POST'])
@login_required
def edit_gst_release(gst_release_id):
gst_release_data = GSTReleasemodel.fetch_gst_release_by_id(gst_release_id)
if not gst_release_data:
return "GST Release not found", 404
if request.method == 'POST':
pmc_no = request.form['PMC_No']
invoice_no = request.form['invoice_No']
basic_amount = request.form['basic_amount']
final_amount = request.form['final_amount']
total_amount = request.form['total_amount']
utr = request.form['utr']
LogHelper.log_action("Edit GST Release", f"User {current_user.id} edited GST release '{pmc_no}'")
GSTReleasemodel.update_gst_release(gst_release_id, pmc_no, invoice_no, basic_amount, final_amount, total_amount, utr)
return redirect(url_for('gst_release_bp.add_gst_release'))
return render_template('edit_gst_release.html', gst_release_data=gst_release_data, invoices=[])
# ------------------- Delete GST Release -------------------
@gst_release_bp.route('/delete_gst_release/<int:gst_release_id>', methods=['GET', 'POST'])
@login_required
def delete_gst_release(gst_release_id):
success, utr = GSTReleasemodel.delete_gst_release(gst_release_id)
if not success:
return jsonify({"message": "GST Release not found or failed to delete", "status": "error"}), 404
LogHelper.log_action("Delete GST Release", f"User {current_user.id} deleted GST release '{gst_release_id}'")
return jsonify({"message": f"GST Release {gst_release_id} deleted successfully.", "status": "success"}), 200

View File

@@ -0,0 +1,77 @@
from flask import Blueprint, render_template, request, jsonify, redirect, url_for
from flask_login import login_required
from model.HoldTypes import HoldTypes
from model.GST import GST
hold_bp = Blueprint("hold_types", __name__)
# ---------------- ADD HOLD TYPE ----------------
@hold_bp.route('/add_hold_type', methods=['GET','POST'])
@login_required
def add_hold_type():
hold = HoldTypes()
if request.method == 'POST':
hold.AddHoldType(request) # ✅
return hold.resultMessage
hold_types = hold.GetAllHoldTypes() # ✅
return render_template(
"add_hold_type.html",
Hold_Types_data=hold_types
)
# ---------------- CHECK HOLD TYPE (OPTIONAL LIKE BLOCK) ----------------
@hold_bp.route('/check_hold_type', methods=['POST'])
@login_required
def check_hold_type():
hold = HoldTypes()
return hold.CheckHoldType(request) # if exists
# ---------------- EDIT HOLD TYPE ----------------
@hold_bp.route('/edit_hold_type/<int:id>', methods=['GET','POST'])
@login_required
def edit_hold_type(id):
hold = HoldTypes()
if request.method == 'POST':
hold.EditHoldType(request, id) # ✅
return hold.resultMessage
hold_data = hold.GetHoldTypeByID(id) # ✅
return render_template(
"edit_hold_type.html",
hold_type=hold_data
)
# ---------------- DELETE HOLD TYPE ----------------
@hold_bp.route('/delete_hold_type/<int:id>')
@login_required
def delete_hold_type(id):
hold = HoldTypes()
hold.DeleteHoldType(request, id) # ✅
return redirect(url_for("hold_types.add_hold_type"))
# ---------------- GST ----------------
@hold_bp.route('/unreleased_gst')
@login_required
def unreleased_gst():
data = GST.get_unreleased_gst()
return render_template(
"unreleased_gst.html",
data=data
)

View File

@@ -0,0 +1,98 @@
# controllers/invoice_controller.py
from flask import Blueprint, request, jsonify, render_template
from flask_login import login_required, current_user
from model.Invoice import *
from model.Log import LogHelper
invoice_bp = Blueprint('invoice', __name__)
# -------------------------------- Add Invoice ---------------------------------
@invoice_bp.route('/add_invoice', methods=['GET', 'POST'])
@login_required
def add_invoice():
if request.method == 'POST':
try:
village_name = request.form.get('village')
village_result = get_village_id(village_name)
if not village_result:
return jsonify({"status": "error", "message": f"Village '{village_name}' not found"}), 400
village_id = village_result['Village_Id']
data = request.form
invoice_id = insert_invoice(data, village_id)
assign_subcontractor(data, village_id)
insert_hold_types(data, invoice_id)
LogHelper.log_action("Add invoice", f"User {current_user.id} Added invoice '{data.get('pmc_no')}'")
return jsonify({"status": "success", "message": "Invoice added successfully"}), 201
except Exception as e:
return jsonify({"status": "error", "message": str(e)}), 500
invoices = get_all_invoice_details()
villages = get_all_villages()
return render_template('add_invoice.html', invoices=invoices, villages=villages)
# ------------------- Search Subcontractor -------------------
@invoice_bp.route('/search_subcontractor', methods=['POST'])
@login_required
def search_subcontractor():
sub_query = request.form.get("query")
results = search_contractors(sub_query)
if not results:
return "<li>No subcontractor found</li>"
output = "".join(
f"<li data-id='{row['Contractor_Id']}'>{row['Contractor_Name']}</li>"
for row in results
)
return output
# ------------------- Get Hold Types -------------------
@invoice_bp.route('/get_hold_types', methods=['GET'])
@login_required
def get_hold_types():
hold_types = get_all_hold_types()
LogHelper.log_action("Get hold type", f"User {current_user.id} Get hold type '{hold_types}'")
return jsonify(hold_types)
# ------------------- Edit Invoice -------------------
@invoice_bp.route('/edit_invoice/<int:invoice_id>', methods=['GET', 'POST'])
@login_required
def edit_invoice(invoice_id):
if request.method == 'POST':
data = request.form
update_invoice(data, invoice_id)
update_inpayment(data)
LogHelper.log_action("Edit invoice", f"User {current_user.id} Edit invoice '{invoice_id}'")
return jsonify({"status": "success", "message": "Invoice updated successfully"}), 200
invoice = get_invoice_by_id(invoice_id)
return render_template('edit_invoice.html', invoice=invoice)
# ------------------- Delete Invoice -------------------
@invoice_bp.route('/delete_invoice/<int:invoice_id>', methods=['GET'])
@login_required
def delete_invoice_route(invoice_id):
try:
delete_invoice_data(invoice_id, current_user.id)
LogHelper.log_action("Delete Invoice", f"User {current_user.id} deleted Invoice '{invoice_id}'")
return jsonify({
"message": f"Invoice {invoice_id} deleted successfully.",
"status": "success"
})
except Exception as e:
return jsonify({
"message": str(e),
"status": "error"
}), 500

View File

@@ -0,0 +1,31 @@
from flask import Blueprint, render_template, request
from flask_login import login_required
from model.Log import LogData
log_bp = Blueprint('log', __name__)
@log_bp.route('/activity_log', methods=['GET', 'POST'])
@login_required
def activity_log():
start_date = request.values.get("start_date")
end_date = request.values.get("end_date")
user_name = request.values.get("username")
logData = LogData()
filtered_logs = logData.GetFilteredActivitiesLog(
start_date,
end_date,
user_name
)
return render_template(
"activity_log.html",
logs=filtered_logs,
start_date=start_date,
end_date=end_date,
username=user_name
)

View File

@@ -0,0 +1,103 @@
from flask import Blueprint, render_template, request, redirect, url_for, jsonify
from flask_login import login_required, current_user
from model.payment import Paymentmodel
from model.Log import LogHelper
payment_bp = Blueprint('payment_bp', __name__)
# ------------------- Add Payment -------------------
@payment_bp.route('/add_payment', methods=['GET', 'POST'])
@login_required
def add_payment():
payments_dicts = Paymentmodel.fetch_all_payments()
# Convert to array for template
payments = [
[
p['Payment_Id'], p['PMC_No'], p['Invoice_No'],
p['Payment_Amount'], p['TDS_Payment_Amount'], p['Total_Amount'], p['UTR']
] for p in payments_dicts
] if payments_dicts else []
if request.method == 'POST':
subcontractor_id = request.form.get('subcontractor_id')
pmc_no = request.form['PMC_No']
invoice_no = request.form['invoice_No']
amount = request.form['Payment_Amount']
tds_amount = request.form['TDS_Payment_Amount']
total_amount = request.form['total_amount']
utr = request.form['utr']
LogHelper.log_action("Add Payment", f"User {current_user.id} Add Payment '{pmc_no}'")
Paymentmodel.insert_payment(pmc_no, invoice_no, amount, tds_amount, total_amount, utr)
Paymentmodel.update_inpayment(subcontractor_id, pmc_no, invoice_no, amount, tds_amount, total_amount, utr)
return redirect(url_for('payment_bp.add_payment'))
return render_template('add_payment.html', payments=payments)
# ------------------- Get PMC Nos -------------------
@payment_bp.route('/get_pmc_nos_by_subcontractor/<subcontractorId>')
@login_required
def get_pmc_nos_by_subcontractor(subcontractorId):
connection = Paymentmodel.get_connection()
cur = connection.cursor()
cur.callproc('GetDistinctPMCNoByContractorId', [subcontractorId])
results = []
for result in cur.stored_results():
results = result.fetchall()
cur.close()
pmc_nos = [row[0] for row in results]
return jsonify({'pmc_nos': pmc_nos})
# ------------------- Edit Payment -------------------
@payment_bp.route('/edit_payment/<int:payment_id>', methods=['GET', 'POST'])
@login_required
def edit_payment(payment_id):
payment_data = Paymentmodel.fetch_payment_by_id(payment_id)
if not payment_data:
return "Payment not found", 404
if request.method == 'POST':
pmc_no = request.form['PMC_No']
invoice_no = request.form['invoice_No']
amount = request.form['Payment_Amount']
tds_amount = request.form['TDS_Payment_Amount']
total_amount = request.form['total_amount']
utr = request.form['utr']
LogHelper.log_action("Edit Payment", f"User {current_user.id} Edit Payment '{pmc_no}'")
Paymentmodel.call_update_payment_proc(payment_id, pmc_no, invoice_no, amount, tds_amount, total_amount, utr)
# Update inpayment
connection = Paymentmodel.get_connection()
cursor = connection.cursor()
cursor.callproc(
'UpdateInpaymentByPMCInvoiceUTR',
[amount, tds_amount, total_amount, pmc_no, invoice_no, utr]
)
connection.commit()
cursor.close()
connection.close()
return redirect(url_for('payment_bp.add_payment'))
return render_template('edit_payment.html', payment_data=payment_data)
# ------------------- Delete Payment -------------------
@payment_bp.route('/delete_payment/<int:payment_id>', methods=['GET', 'POST'])
@login_required
def delete_payment(payment_id):
success, pmc_no, invoice_no = Paymentmodel.delete_payment(payment_id)
if not success:
return jsonify({"message": "Payment not found or failed to delete", "status": "error"}), 404
LogHelper.log_action("Delete Payment", f"User {current_user.id} deleted Payment '{payment_id}'")
return jsonify({
"message": f"Payment ID {payment_id} deleted successfully.",
"status": "success"
}), 200

View File

@@ -0,0 +1,37 @@
from flask import Blueprint, render_template, send_from_directory
from flask_login import login_required, current_user
from model.PmcReport import PmcReport
pmc_report_bp = Blueprint("pmc_report", __name__)
@pmc_report_bp.route("/pmc_report/<pmc_no>")
@login_required
def pmc_report(pmc_no):
data = PmcReport.get_pmc_report(pmc_no)
if not data:
return "No PMC found with this number", 404
return render_template(
"pmc_report.html",
info=data["info"],
invoices=data["invoices"],
hold_types=data["hold_types"],
gst_rel=data["gst_rel"],
payments=data["payments"],
credit_note=data["credit_note"],
hold_release=data["hold_release"],
total=data["total"]
)
@pmc_report_bp.route("/download_pmc_report/<pmc_no>")
@login_required
def download_pmc_report(pmc_no):
result = PmcReport.download_pmc_report(pmc_no)
if not result:
return "No contractor found for this PMC No", 404
output_folder, file_name = result
return send_from_directory(output_folder, file_name, as_attachment=True)

View File

@@ -0,0 +1,193 @@
from flask import Blueprint, render_template, request, jsonify, send_file
from flask_login import login_required, current_user
from model.Report import ReportHelper
from model.Log import LogHelper
import config
import os
import openpyxl
from openpyxl.styles import Font
from model.ContractorInfo import ContractorInfo
from model.FolderAndFile import FolderAndFile
report_bp = Blueprint("report", __name__)
# ---------------- Report Page ----------------
@report_bp.route("/report")
@login_required
def report_page():
return render_template("/report.html")
# ---------------- Search Contractor ----------------
@report_bp.route("/search_contractor", methods=["POST"])
@login_required
def search_contractor():
subcontractor_name = request.form.get("subcontractor_name")
LogHelper.log_action(
"Search Contractor",
f"User {current_user.id} searched contractor '{subcontractor_name}'"
)
data = ReportHelper.search_contractor(request)
return jsonify(data)
# ---------------- Contractor Report ----------------
@report_bp.route('/contractor_report/<int:contractor_id>')
@login_required
def contractor_report(contractor_id):
data = ReportHelper.get_contractor_report(contractor_id)
return render_template(
'subcontractor_report.html',
contractor_id=contractor_id,
**data
)
@report_bp.route('/download_report/<int:contractor_id>')
@login_required
def download_report(contractor_id):
return ReportHelper().download_report(contractor_id=contractor_id)
# @report_bp.route('/download_report/<int:contractor_id>')
# @login_required
# def download_report(contractor_id):
# try:
# connection = config.get_db_connection()
# cursor = connection.cursor(dictionary=True)
# # -------- Contractor Info --------
# contractor = ContractorInfo(contractor_id)
# contInfo = contractor.contInfo
# if not contInfo:
# return "No contractor found", 404
# # -------- Invoice Data --------
# cursor.callproc('FetchInvoicesByContractor', [contractor_id])
# invoices = []
# for result in cursor.stored_results():
# invoices.extend(result.fetchall())
# if not invoices:
# return "No invoice data found"
# # -------- Create Workbook --------
# workbook = openpyxl.Workbook()
# sheet = workbook.active
# sheet.title = "Contractor Report"
# # ================= CONTRACTOR DETAILS =================
# sheet.append(["SUB CONTRACTOR DETAILS"])
# sheet.cell(row=sheet.max_row, column=1).font = Font(bold=True)
# sheet.append([])
# sheet.append(["Name", contInfo.get("Contractor_Name") or ""])
# sheet.append(["Mobile No", contInfo.get("Mobile_No") or ""])
# sheet.append(["Email", contInfo.get("Email") or ""])
# sheet.append(["Village", contInfo.get("Village_Name") or ""])
# sheet.append(["Block", contInfo.get("Block_Name") or ""])
# sheet.append(["District", contInfo.get("District_Name") or ""])
# sheet.append(["State", contInfo.get("State_Name") or ""])
# sheet.append(["Address", contInfo.get("Address") or ""])
# sheet.append(["GST No", contInfo.get("GST_No") or ""])
# sheet.append(["PAN No", contInfo.get("PAN_No") or ""])
# sheet.append([])
# sheet.append([])
# # ================= TABLE HEADERS =================
# headers = [
# "PMC No", "Village", "Invoice No", "Invoice Date", "Work Type","Invoice_Details",
# "Basic Amount", "Debit Amount", "After Debit Amount",
# "Amount", "GST Amount", "TDS Amount", "SD Amount",
# "On Commission", "Hydro Testing", "Hold Amount",
# "GST SD Amount", "Final Amount",
# "Payment Amount", "TDS Payment",
# "Total Amount", "UTR"
# ]
# sheet.append(headers)
# for col in range(1, len(headers) + 1):
# sheet.cell(row=sheet.max_row, column=col).font = Font(bold=True)
# # ================= DATA =================
# total_final = 0
# total_payment = 0
# total_amount = 0
# for inv in invoices:
# row = [
# inv.get("PMC_No"),
# inv.get("Village_Name"),
# inv.get("invoice_no"),
# inv.get("Invoice_Date"),
# inv.get("Work_Type"),
# inv.get("Invoice_Details"),
# inv.get("Basic_Amount"),
# inv.get("Debit_Amount"),
# inv.get("After_Debit_Amount"),
# inv.get("Amount"),
# inv.get("GST_Amount"),
# inv.get("TDS_Amount"),
# inv.get("SD_Amount"),
# inv.get("On_Commission"),
# inv.get("Hydro_Testing"),
# inv.get("Hold_Amount"),
# inv.get("GST_SD_Amount"),
# inv.get("Final_Amount"),
# inv.get("Payment_Amount"),
# inv.get("TDS_Payment_Amount"),
# inv.get("Total_Amount"),
# inv.get("UTR")
# ]
# total_final += float(inv.get("Final_Amount") or 0)
# total_payment += float(inv.get("Payment_Amount") or 0)
# total_amount += float(inv.get("Total_Amount") or 0)
# sheet.append(row)
# # ================= TOTAL ROW =================
# sheet.append([])
# sheet.append([
# "", "", "", "", "", "", "", "", "", "", "", "", "", "", "", "",
# "TOTAL",
# total_final,
# total_payment,
# "",
# total_amount,
# ""
# ])
# # ================= AUTO WIDTH =================
# for column in sheet.columns:
# max_length = 0
# column_letter = column[0].column_letter
# for cell in column:
# if cell.value:
# max_length = max(max_length, len(str(cell.value)))
# sheet.column_dimensions[column_letter].width = max_length + 2
# # ================= SAVE FILE =================
# # output_folder = "downloads"
# # os.makedirs(output_folder, exist_ok=True)
# # filename = f"Contractor_Report_{contInfo.get('Contractor_Name')}.xlsx"
# # output_file = os.path.join(output_folder, filename)
# filename = f"Contractor_Report_{contInfo.get('Contractor_Name')}.xlsx"
# output_file = FolderAndFile.get_download_path(filename)
# workbook.save(output_file)
# return send_file(output_file, as_attachment=True)
# except Exception as e:
# return str(e)

View File

@@ -0,0 +1,69 @@
from flask import Blueprint, render_template, request, redirect, url_for
from flask_login import login_required
from model.State import State
state_bp = Blueprint('state', __name__)
@state_bp.route('/add_state', methods=['GET', 'POST'])
@login_required
def add_state():
state = State()
if request.method == 'POST':
state.AddState(request=request)
return state.resultMessage
statedata = state.GetAllStates(request=request)
return render_template('add_state.html', statedata=statedata)
@state_bp.route('/check_state', methods=['POST'])
@login_required
def check_state():
state = State()
return state.CheckState(request=request)
@state_bp.route('/delete_state/<int:id>')
@login_required
def deleteState(id):
state = State()
msg = state.DeleteState(request=request, id=id)
if not state.isSuccess:
return state.resultMessage
else:
return redirect(url_for('state.add_state'))
@state_bp.route('/edit_state/<int:id>', methods=['GET', 'POST'])
@login_required
def editState(id):
state = State()
if request.method == 'POST':
state.EditState(request=request, id=id)
if state.isSuccess:
return redirect(url_for('state.add_state'))
else:
return state.resultMessage
statedata = state.GetStateByID(request=request, id=id)
if not state.isSuccess:
return state.resultMessage
if statedata is None:
statedata = []
return render_template('edit_state.html', state=statedata)

View File

@@ -0,0 +1,117 @@
from flask import Blueprint, render_template, request, redirect, url_for, jsonify
from flask_login import login_required
from model.Subcontractor import Subcontractor
subcontractor_bp = Blueprint('subcontractor', __name__)
# ----------------------------------------------------------
# Helpers (unchanged)
# ----------------------------------------------------------
class HtmlHelper:
@staticmethod
def json_response(data, status=200):
return jsonify(data), status
class ResponseHandler:
@staticmethod
def fetch_failure(entity):
return {"status": "error", "message": f"Failed to fetch {entity}"}
@staticmethod
def add_failure(entity):
return {"status": "error", "message": f"Failed to add {entity}"}
@staticmethod
def update_failure(entity):
return {"status": "error", "message": f"Failed to update {entity}"}
@staticmethod
def delete_failure(entity):
return {"status": "error", "message": f"Failed to delete {entity}"}
# ----------------------------------------------------------
# LIST + ADD
# ----------------------------------------------------------
@subcontractor_bp.route('/subcontractor', methods=['GET', 'POST'])
@login_required
def subcontract():
sub = Subcontractor()
# ---------------- GET ----------------
if request.method == 'GET':
subcontractor = sub.GetAllSubcontractors(request)
if not sub.isSuccess:
return HtmlHelper.json_response(
ResponseHandler.fetch_failure("Subcontractor"), 500
)
return render_template('add_subcontractor.html', subcontractor=subcontractor)
# ---------------- POST (ADD) ----------------
if request.method == 'POST':
sub.AddSubcontractor(request)
if not sub.isSuccess:
return HtmlHelper.json_response(
ResponseHandler.add_failure("Subcontractor"), 500
)
# Reload list after insert
subcontractor = sub.GetAllSubcontractors(request)
return render_template('add_subcontractor.html', subcontractor=subcontractor)
# ----------------------------------------------------------
# EDIT
# ----------------------------------------------------------
@subcontractor_bp.route('/edit_subcontractor/<int:id>', methods=['GET', 'POST'])
@login_required
def edit_subcontractor(id):
sub = Subcontractor()
# Fetch data
subcontractor = sub.GetSubcontractorByID(id)
if not subcontractor:
return HtmlHelper.json_response(
ResponseHandler.fetch_failure("Subcontractor"), 404
)
# ---------------- POST (UPDATE) ----------------
if request.method == 'POST':
sub.EditSubcontractor(request, id)
if not sub.isSuccess:
return HtmlHelper.json_response(
ResponseHandler.update_failure("Subcontractor"), 500
)
return redirect(url_for('subcontractor.subcontract'))
return render_template('edit_subcontractor.html', subcontractor=subcontractor)
# ----------------------------------------------------------
# DELETE
# ----------------------------------------------------------
@subcontractor_bp.route('/deleteSubContractor/<int:id>', methods=['GET', 'POST'])
@login_required
def deleteSubContractor(id):
sub = Subcontractor()
sub.DeleteSubcontractor(request, id)
if not sub.isSuccess:
return HtmlHelper.json_response(
ResponseHandler.delete_failure("Subcontractor"), 500
)
return redirect(url_for('subcontractor.subcontract'))

View File

@@ -0,0 +1,167 @@
from flask import Blueprint, render_template, request, redirect, url_for, flash, jsonify
from flask_login import login_required
import config
from model.Village import Village
from model.State import State
# Create Blueprint
village_bp = Blueprint('village', __name__)
# ------------------------- Add Village -------------------------
@village_bp.route('/add_village', methods=['GET', 'POST'])
@login_required
def add_village():
village = Village()
if request.method == 'POST':
village.AddVillage(request=request)
return village.resultMessage
state = State()
states = state.GetAllStates(request=request)
villages = village.GetAllVillages(request=request)
return render_template(
'add_village.html',
states=states,
villages=villages
)
# ------------------------- Fetch Districts -------------------------
@village_bp.route('/get_districts/<int:state_id>')
@login_required
def get_districts(state_id):
connection = config.get_db_connection()
cursor = connection.cursor()
cursor.callproc("GetDistrictByStateID", [state_id])
districts = []
for rs in cursor.stored_results():
districts = rs.fetchall()
cursor.close()
connection.close()
district_list = []
for d in districts:
district_list.append({
"id": d[0],
"name": d[1]
})
return jsonify(district_list)
# ------------------------- Fetch Blocks -------------------------
@village_bp.route('/get_blocks/<int:district_id>')
@login_required
def get_blocks(district_id):
connection = config.get_db_connection()
cursor = connection.cursor()
cursor.callproc("GetBlocksByDistrictID", [district_id])
blocks = []
for rs in cursor.stored_results():
blocks = rs.fetchall()
cursor.close()
connection.close()
block_list = []
for b in blocks:
block_list.append({
"id": b[0],
"name": b[1]
})
return jsonify(block_list)
# ------------------------- Check Village -------------------------
@village_bp.route('/check_village', methods=['POST'])
@login_required
def check_village():
village = Village()
return village.CheckVillage(request=request)
# ------------------------- Delete Village -------------------------
@village_bp.route('/delete_village/<int:village_id>')
@login_required
def delete_village(village_id):
village = Village()
village.DeleteVillage(request=request, village_id=village_id)
if not village.isSuccess:
flash(village.resultMessage, "error")
else:
flash(village.resultMessage, "success")
return redirect(url_for('village.add_village'))
# ------------------------- Edit Village -------------------------
@village_bp.route('/edit_village/<int:village_id>', methods=['GET', 'POST'])
@login_required
def edit_village(village_id):
village = Village()
if request.method == 'POST':
village.EditVillage(request=request, village_id=village_id)
if village.isSuccess:
flash(village.resultMessage, "success")
return redirect(url_for('village.add_village'))
else:
flash(village.resultMessage, "error")
village_data = village.GetVillageByID(request=request, id=village_id)
blocks = village.GetAllBlocks(request=request)
return render_template(
'edit_village.html',
village_data=village_data,
blocks=blocks
)
else:
village_data = village.GetVillageByID(request=request, id=village_id)
if not village.isSuccess:
flash(village.resultMessage, "error")
return redirect(url_for('village.add_village'))
blocks = village.GetAllBlocks(request=request)
if village_data is None:
village_data = []
if blocks is None:
blocks = []
return render_template(
'edit_village.html',
village_data=village_data,
blocks=blocks
)

5053
main.py

File diff suppressed because it is too large Load Diff

63
model/Auth.py Normal file
View File

@@ -0,0 +1,63 @@
import os
from dotenv import load_dotenv
from flask_login import UserMixin
from ldap3 import Server, Connection, ALL
from ldap3.core.exceptions import LDAPBindError
# Load .env
load_dotenv()
class DefaultCredentials:
username = os.getenv("DEFAULT_USERNAME")
password = os.getenv("DEFAULT_PASSWORD")
class LoginLDAP:
def __init__(self, request):
self.username = request.form.get("username", "").strip()
self.password = request.form.get("password", "")
self.isDefaultCredentials = False
self.isValidLogin = False
self.errorMessage = ""
ldap_server = "ldap://localhost:389"
ldap_user_dn = f"uid={self.username},ou=users,dc=lcepl,dc=org"
# fallback admin login
if (
self.username == DefaultCredentials.username
and self.password == DefaultCredentials.password
):
self.isDefaultCredentials = True
self.isValidLogin = True
return
try:
server = Server(ldap_server, get_info=ALL)
conn = Connection(
server,
user=ldap_user_dn,
password=self.password,
auto_bind=True
)
if conn.bound:
self.isValidLogin = True
except LDAPBindError:
self.errorMessage = "Invalid LDAP credentials"
except Exception as e:
self.errorMessage = str(e)
class User(UserMixin):
def __init__(self, username):
self.id = username

165
model/Block.py Normal file
View File

@@ -0,0 +1,165 @@
from flask import Flask, render_template, request, redirect, url_for, send_from_directory, flash, jsonify, json
from flask import current_app
from datetime import datetime
from flask_login import LoginManager, UserMixin, login_user, logout_user, login_required, current_user
from model.Utilities import RegEx, ResponseHandler, HtmlHelper, ItemCRUDType
from model.Log import LogData, LogHelper
import os
import config
import re
import mysql.connector
from mysql.connector import Error
from model.ItemCRUD import ItemCRUD, itemCRUDMapping
class Block:
isSuccess = False
resultMessage = ""
def __init__(self):
self.isSuccess = False
self.resultMessage = ""
# ----------------------------------------------------------
# Add Block
# ----------------------------------------------------------
def AddBlock(self, request):
block = ItemCRUD(itemType=ItemCRUDType.Block)
district_id = request.form.get('district_Id')
block_name = request.form.get('block_Name', '').strip()
block.AddItem(request=request, parentid=district_id, childname=block_name, storedprocfetch="GetBlockByNameAndDistricts", storedprocadd="SaveBlock" )
self.isSuccess = block.isSuccess
self.resultMessage = block.resultMessage
return
# ----------------------------------------------------------
# Get All Blocks
# ----------------------------------------------------------
# def GetAllBlocks(self):
# block = ItemCRUD(itemType=ItemCRUDType.Block)
# blocksdata = block.GetAllData(request=request, storedproc="GetAllBlock")
# self.isSuccess = block.isSuccess
# self.resultMessage = block.resultMessage
# return blocksdata
def GetAllBlocks(self, request):
block = ItemCRUD(itemType=ItemCRUDType.Block)
blocksdata = block.GetAllData(request=request, storedproc="GetAllBlock")
self.isSuccess = block.isSuccess
self.resultMessage = block.resultMessage
return blocksdata
# ----------------------------------------------------------
# Check Block Exists
# ----------------------------------------------------------
# def CheckBlock(self, request):
# block = ItemCRUD(itemType=ItemCRUDType.Block)
# block_name = request.json.get('block_Name', '').strip()
# district_id = request.json.get('district_Id')
# result = block.CheckItem(request=request, parentid=district_id, childname=block_name, storedprocfetch="GetBlockByNameAndDistrict")
# self.isSuccess = block.isSuccess
# self.resultMessage = block.resultMessage
# return result
def CheckBlock(self, request):
block = ItemCRUD(itemType=ItemCRUDType.Block)
data = request.get_json(silent=True) or request.form
block_name = (data.get('block_Name') or '').strip()
district_id = data.get('district_Id')
result = block.CheckItem(
request=request,
parentid=district_id,
childname=block_name,
storedprocfetch="GetBlockByNameAndDistrict"
)
self.isSuccess = block.isSuccess
self.resultMessage = block.resultMessage
return result
# ----------------------------------------------------------
# Get Block By ID
# ----------------------------------------------------------
# def GetBlockByID(self, id):
# block = ItemCRUD(itemType=ItemCRUDType.Village)
# blockdata = block.GetAllData(id=id,storedproc="GetBlockDataByID")
# self.isSuccess = block.isSuccess
# self.resultMessage = block.resultMessage
# print("akash"+blockdata)
# return blockdata
# def GetBlockByID(self,request,id):
# block = ItemCRUD(itemType=ItemCRUDType.Block)
# blockdata = block.GetDataByID(request=request,id=id,storedproc="GetBlockDataByID")
# self.isSuccess = block.isSuccess
# self.resultMessage = block.resultMessage
# return blockdata
def GetBlockByID(self, id):
block = ItemCRUD(itemType=ItemCRUDType.Block)
blockdata = block.GetDataByID(
id=id,
storedproc="GetBlockDataByID"
)
self.isSuccess = block.isSuccess
self.resultMessage = block.resultMessage
return blockdata
# ----------------------------------------------------------
# Update Block
# ----------------------------------------------------------
# def EditBlock(self, request, block_id):
# block = ItemCRUD(itemType=ItemCRUDType.Block)
# district_id = request.form.get('district_Id')
# block_name = request.form.get('block_Name', '').strip()
# block.EditItem(request=request, childid=block_id, parentid=district_id, childname=block_name, storedprocadd="UpdateBlockById" )
# self.isSuccess = block.isSuccess
# self.resultMessage = block.resultMessage
# return
def EditBlock(self, request, block_id):
block = ItemCRUD(itemType=ItemCRUDType.Block)
district_id = request.form.get('district_Id')
block_name = request.form.get('block_Name', '').strip()
block.EditItem(
request=request,
childid=block_id,
parentid=district_id,
childname=block_name,
storedprocupdate="UpdateBlockById"
)
self.isSuccess = block.isSuccess
self.resultMessage = block.resultMessage
return render_template('add_block.html')
# ----------------------------------------------------------
# Delete Block
# ---------------------------------------------------------
def DeleteBlock(self,request, id):
block = ItemCRUD(itemType=ItemCRUDType.Block)
block.DeleteItem(request=request,itemID=id, storedprocDelete="DeleteBlock")
self.isSuccess = block.isSuccess
self.resultMessage = block.resultMessage
return

View File

@@ -19,12 +19,10 @@ class ContractorInfo:
def fetchData(self):
try:
connection = config.get_db_connection()
cursor = connection.cursor(dictionary=True, buffered=True)
print("here", flush=True)
cursor.callproc('GetContractorInfoById', [self.contInfo])
#self.contInfo = next(cursor.stored_results()).fetchone()
self.contInfo = cursor.fetchone()
cursor = connection.cursor(dictionary=True, buffered=True)
cursor.callproc('GetContractorInfoById', [self.ID])
for result in cursor.stored_results():
self.contInfo = result.fetchone()
print(self.contInfo,flush=True)
finally:
@@ -40,25 +38,23 @@ class ContractorInfo:
# ---------------- Hold Types ----------------
cursor.execute("""
SELECT DISTINCT ht.hold_type_id, ht.hold_type
FROM invoice_subcontractor_hold_join h
JOIN hold_types ht ON h.hold_type_id = ht.hold_type_id
WHERE h.Contractor_Id = %s
""", (self.contractor_id,))
hold_types = cursor.fetchall()
cursor = connection.cursor(dictionary=True)
cursor.callproc('GetHoldTypesByContractor', [self.ID])
hold_types = []
for result in cursor.stored_results():
hold_types = result.fetchall()
hold_type_map = {ht['hold_type_id']: ht['hold_type'] for ht in hold_types}
# ---------------- Invoices ----------------
cursor.execute("""
SELECT i.*, v.Village_Name
FROM assign_subcontractors asg
INNER JOIN invoice i ON i.PMC_No = asg.PMC_No AND i.Village_Id = asg.Village_Id
LEFT JOIN villages v ON i.Village_Id = v.Village_Id
WHERE asg.Contractor_Id = %s
ORDER BY i.PMC_No, i.Invoice_No
""", (self.contractor_id,))
invoices = cursor.fetchall()
cursor = connection.cursor(dictionary=True)
cursor.callproc('GetInvoicesByContractor', [self.ID])
invoices = []
for result in cursor.stored_results():
invoices = result.fetchall()
# Remove duplicate invoices
invoice_ids_seen = set()

View File

@@ -4,8 +4,8 @@ from flask import current_app
from datetime import datetime
from flask_login import LoginManager, UserMixin, login_user, logout_user, login_required, current_user
from AppCode.Utilities import RegEx, ResponseHandler, HtmlHelper, ItemCRUDType
from AppCode.Log import LogData, LogHelper
from model.Utilities import RegEx, ResponseHandler, HtmlHelper, ItemCRUDType
from model.Log import LogData, LogHelper
import os
import config
@@ -14,7 +14,7 @@ import re
import mysql.connector
from mysql.connector import Error
from AppCode.ItemCRUD import ItemCRUD
from model.ItemCRUD import ItemCRUD
class District:
isSuccess = False
@@ -31,13 +31,13 @@ class District:
district_name = request.form['district_Name'].strip()
state_id = request.form['state_Id']
district.EditItem(request=request, childid=district_id, parentid=state_id, childname=district_name, storedprocadd="UpdateBlockById" )
district.EditItem(request=request, childid=district_id, parentid=state_id, childname=district_name,storedprocupdate="UpdateDistrict" )
self.isSuccess = district.isSuccess
self.resultMessage = district.resultMessage
return
def AddDistrict(self, request):
def AddDistrict(self, request):
district = ItemCRUD(ItemCRUDType.District)
@@ -69,11 +69,32 @@ class District:
return result
def GetDistrictByID(self, request, id):
district = ItemCRUD(itemType=ItemCRUDType.Village)
districtdata = district.GetAllData("GetDistrictDataByID")
self.isSuccess = district.isSuccess
self.resultMessage = district.resultMessage
# def GetDistrictByID(self, request,district_id):
# district = ItemCRUD(itemType=ItemCRUDType.District)
# districtdata = district.GetAllData(id=district_id,storedproc="GetDistrictDataByID")
# self.isSuccess = district.isSuccess
# self.resultMessage = district.resultMessage
# return districtdata
def GetDistrictByID(self, request, district_id):
district = ItemCRUD(itemType=ItemCRUDType.District)
districtdata = district.GetDataByID(
id=district_id,
storedproc="GetDistrictDataByID"
)
if districtdata:
self.isSuccess = True
else:
self.isSuccess = False
self.resultMessage = "District not found"
return districtdata
#Delete District
def DeleteDistrict(self, request, district_id):
district = ItemCRUD(itemType=ItemCRUDType.District)
district.DeleteItem(request=request,itemID=district_id,storedprocDelete="DeleteDistrict")
self.isSuccess = district.isSuccess
self.resultMessage = str(district.resultMessage)

39
model/FolderAndFile.py Normal file
View File

@@ -0,0 +1,39 @@
import os
from flask import current_app
class FolderAndFile:
# -----------------------------
# BASE FOLDER METHODS
# -----------------------------
@staticmethod
def get_download_folder():
folder = os.path.join(current_app.root_path, "static", "downloads")
if not os.path.exists(folder):
os.makedirs(folder)
os.makedirs(folder, exist_ok=True)
return folder
@staticmethod
def get_upload_folder():
folder = os.path.join(current_app.root_path, "static", "uploads")
if not os.path.exists(folder):
os.makedirs(folder)
os.makedirs(folder, exist_ok=True)
return folder
# -----------------------------
# FILE PATH METHODS
# -----------------------------
@staticmethod
def get_download_path(filename):
return os.path.join(FolderAndFile.get_download_folder(), filename)
@staticmethod
def get_upload_path(filename):
return os.path.join(FolderAndFile.get_upload_folder(), filename)

55
model/GST.py Normal file
View File

@@ -0,0 +1,55 @@
import config
class GST:
@staticmethod
def get_unreleased_gst():
connection = config.get_db_connection()
cursor = connection.cursor(dictionary=True)
try:
# ----------- Invoices -----------
cursor.callproc('GetAllInvoicesBasic')
invoices = []
for result in cursor.stored_results():
invoices = result.fetchall()
# ----------- GST Releases -----------
cursor.callproc('GetAllGSTReleasesBasic')
gst_releases = []
for result in cursor.stored_results():
gst_releases = result.fetchall()
gst_invoice_nos = {
g['Invoice_No']
for g in gst_releases
if g['Invoice_No']
}
gst_basic_amounts = {
float(g['Basic_Amount'])
for g in gst_releases
if g['Basic_Amount'] is not None
}
unreleased = []
for inv in invoices:
match_by_invoice = inv['Invoice_No'] in gst_invoice_nos
match_by_gst_amount = float(
inv.get('GST_SD_Amount') or 0
) in gst_basic_amounts
if not (match_by_invoice or match_by_gst_amount):
unreleased.append(inv)
return unreleased
finally:
cursor.close()
connection.close()

90
model/HoldTypes.py Normal file
View File

@@ -0,0 +1,90 @@
from flask import request
from model.ItemCRUD import ItemCRUD
from model.Utilities import ItemCRUDType
class HoldTypes:
"""CRUD operations for Hold Types using ItemCRUD"""
def __init__(self):
self.isSuccess = False
self.resultMessage = ""
# ------------------- Add Hold Type -------------------
def AddHoldType(self, request):
hold = ItemCRUD(itemType=ItemCRUDType.HoldType)
hold_name = request.form.get('hold_type', '').strip()
hold.AddItem(
request=request,
parentid=None,
childname=hold_name,
storedprocfetch="CheckHoldTypeExists",
storedprocadd="SaveHoldType"
)
self.isSuccess = hold.isSuccess
self.resultMessage = hold.resultMessage
# ------------------- Get All Hold Types -------------------
def GetAllHoldTypes(self, request=None):
hold = ItemCRUD(itemType=ItemCRUDType.HoldType)
rows = hold.GetAllData(request=request, storedproc="GetAllHoldTypes")
self.isSuccess = hold.isSuccess
self.resultMessage = hold.resultMessage
# ✅ Convert tuple → dictionary
data = []
for row in rows:
data.append({
"hold_type_id": row[0],
"hold_type": row[1]
})
return data
# ------------------- Get Hold Type By ID -------------------
def GetHoldTypeByID(self, hold_type_id):
hold = ItemCRUD(itemType=ItemCRUDType.HoldType)
row = hold.GetDataByID(hold_type_id, storedproc="GetHoldTypesById")
self.isSuccess = hold.isSuccess
self.resultMessage = hold.resultMessage
# ✅ Convert tuple → dictionary
if row:
return {
"hold_type_id": row[0],
"hold_type": row[1]
}
return None
# ------------------- Update Hold Type -------------------
def EditHoldType(self, request, hold_type_id):
hold = ItemCRUD(itemType=ItemCRUDType.HoldType)
hold_name = request.form.get('hold_type', '').strip()
hold.EditItem(
request=request,
childid=hold_type_id,
parentid=None,
childname=hold_name,
storedprocupdate="UpdateHoldTypeById"
)
self.isSuccess = hold.isSuccess
self.resultMessage = hold.resultMessage
# ------------------- Delete Hold Type -------------------
def DeleteHoldType(self, request, hold_type_id):
hold = ItemCRUD(itemType=ItemCRUDType.HoldType)
hold.DeleteItem(
request=request,
itemID=hold_type_id,
storedprocDelete="DeleteHoldType"
)
self.isSuccess = hold.isSuccess
self.resultMessage = hold.resultMessage

379
model/Invoice.py Normal file
View File

@@ -0,0 +1,379 @@
import config
import mysql.connector
# ------------------- Helper -------------------
def clear_results(cursor):
for r in cursor.stored_results():
r.fetchall()
# ------------------- Get Village Id -------------------
def get_village_id(village_name):
connection = config.get_db_connection()
cursor = connection.cursor(dictionary=True)
cursor.callproc("GetVillageIdByName", (village_name,))
village_result = None
for rs in cursor.stored_results():
village_result = rs.fetchone()
cursor.close()
connection.close()
return village_result
# ------------------- Insert Invoice -------------------
def insert_invoice(data, village_id):
connection = config.get_db_connection()
cursor = connection.cursor(dictionary=True)
try:
# 1. Insert Invoice
cursor.callproc('InsertInvoice', [
data.get('pmc_no'),
village_id,
data.get('work_type'),
data.get('invoice_details'),
data.get('invoice_date'),
data.get('invoice_no'),
float(data.get('basic_amount') or 0),
float(data.get('debit_amount') or 0),
float(data.get('after_debit_amount') or 0),
float(data.get('amount') or 0),
float(data.get('gst_amount') or 0),
float(data.get('tds_amount') or 0),
float(data.get('sd_amount') or 0),
float(data.get('on_commission') or 0),
float(data.get('hydro_testing') or 0),
float(data.get('gst_sd_amount') or 0),
float(data.get('final_amount') or 0)
])
invoice_id = None
for result in cursor.stored_results():
row = result.fetchone()
if row:
invoice_id = row.get('invoice_id')
if not invoice_id:
raise Exception("Invoice ID not returned")
# 2. Insert Inpayment
cursor.callproc('InsertInpayment', [
data.get('pmc_no'),
village_id,
data.get('work_type'),
data.get('invoice_details'),
data.get('invoice_date'),
data.get('invoice_no'),
float(data.get('basic_amount') or 0),
float(data.get('debit_amount') or 0),
float(data.get('after_debit_amount') or 0),
float(data.get('amount') or 0),
float(data.get('gst_amount') or 0),
float(data.get('tds_amount') or 0),
float(data.get('sd_amount') or 0),
float(data.get('on_commission') or 0),
float(data.get('hydro_testing') or 0),
float(data.get('gst_sd_amount') or 0),
float(data.get('final_amount') or 0),
data.get('subcontractor_id')
])
clear_results(cursor)
connection.commit()
return invoice_id
except Exception as e:
connection.rollback()
raise e
finally:
cursor.close()
connection.close()
# ------------------- Assign Subcontractor -------------------
def assign_subcontractor(data, village_id):
connection = config.get_db_connection()
cursor = connection.cursor(dictionary=True)
try:
cursor.callproc('AssignSubcontractor', [
data.get('pmc_no'),
data.get('subcontractor_id'),
village_id
])
clear_results(cursor)
connection.commit()
except Exception as e:
connection.rollback()
raise e
finally:
cursor.close()
connection.close()
# ------------------- Insert Hold Types -------------------
def insert_hold_types(data, invoice_id):
connection = config.get_db_connection()
cursor = connection.cursor(dictionary=True)
try:
hold_types = data.getlist('hold_type[]')
hold_amounts = data.getlist('hold_amount[]')
for hold_type, hold_amount in zip(hold_types, hold_amounts):
if not hold_type:
continue
cursor.callproc('GetHoldTypeIdByName', [hold_type])
hold_type_result = None
for result in cursor.stored_results():
hold_type_result = result.fetchone()
if not hold_type_result:
cursor.callproc('InsertHoldType', [hold_type, 0])
cursor.execute("SELECT @_InsertHoldType_1")
hold_type_id = cursor.fetchone()[0]
else:
hold_type_id = hold_type_result['hold_type_id']
hold_amount = float(hold_amount or 0)
cursor.callproc('InsertInvoiceSubcontractorHold', [
data.get('subcontractor_id'),
invoice_id,
hold_type_id,
hold_amount
])
clear_results(cursor)
connection.commit()
except Exception as e:
connection.rollback()
raise e
finally:
cursor.close()
connection.close()
# ------------------- Get All Invoices -------------------
def get_all_invoice_details():
connection = config.get_db_connection()
cursor = connection.cursor(dictionary=True)
cursor.callproc('GetAllInvoiceDetails')
invoices = []
for result in cursor.stored_results():
invoices = result.fetchall()
cursor.close()
connection.close()
return invoices
# ------------------- Get All Villages -------------------
def get_all_villages():
connection = config.get_db_connection()
cursor = connection.cursor(dictionary=True)
cursor.callproc("GetAllVillages")
villages = []
for result in cursor.stored_results():
villages = result.fetchall()
cursor.close()
connection.close()
return villages
# ------------------- Search Contractors -------------------
def search_contractors(sub_query):
connection = config.get_db_connection()
cursor = connection.cursor(dictionary=True)
cursor.callproc('SearchContractorsByName', [sub_query])
results = []
for result in cursor.stored_results():
results = result.fetchall()
cursor.close()
connection.close()
return results
# ------------------- Get All Hold Types -------------------
def get_all_hold_types():
connection = config.get_db_connection()
cursor = connection.cursor(dictionary=True)
cursor.callproc("GetAllHoldTypes")
hold_types = []
for result in cursor.stored_results():
hold_types = result.fetchall()
cursor.close()
connection.close()
return hold_types
# ------------------- Get Invoice By Id -------------------
def get_invoice_by_id(invoice_id):
connection = config.get_db_connection()
cursor = connection.cursor(dictionary=True)
cursor.callproc('GetInvoiceDetailsById', [invoice_id])
invoice = None
for result in cursor.stored_results():
invoice = result.fetchone()
cursor.callproc('GetHoldAmountsByInvoiceId', [invoice_id])
hold_amounts = []
for result in cursor.stored_results():
hold_amounts = result.fetchall()
if invoice:
invoice["hold_amounts"] = hold_amounts
cursor.close()
connection.close()
return invoice
# ------------------- Update Invoice -------------------
def update_invoice(data, invoice_id):
connection = config.get_db_connection()
cursor = connection.cursor(dictionary=True)
try:
cursor.callproc("GetVillageIdByName", (data.get('village'),))
village = None
for rs in cursor.stored_results():
village = rs.fetchone()
village_id = village['Village_Id']
numeric = [
float(data.get('basic_amount') or 0),
float(data.get('debit_amount') or 0),
float(data.get('after_debit_amount') or 0),
float(data.get('amount') or 0),
float(data.get('gst_amount') or 0),
float(data.get('tds_amount') or 0),
float(data.get('sd_amount') or 0),
float(data.get('on_commission') or 0),
float(data.get('hydro_testing') or 0),
float(data.get('gst_sd_amount') or 0),
float(data.get('final_amount') or 0),
]
cursor.callproc('UpdateInvoice', [
data.get('pmc_no'),
village_id,
data.get('work_type'),
data.get('invoice_details'),
data.get('invoice_date'),
data.get('invoice_no'),
*numeric,
invoice_id
])
clear_results(cursor)
connection.commit()
except Exception as e:
connection.rollback()
raise e
finally:
cursor.close()
connection.close()
# ------------------- Update Inpayment -------------------
def update_inpayment(data):
connection = config.get_db_connection()
cursor = connection.cursor(dictionary=True)
try:
numeric = [
float(data.get('basic_amount') or 0),
float(data.get('debit_amount') or 0),
float(data.get('after_debit_amount') or 0),
float(data.get('amount') or 0),
float(data.get('gst_amount') or 0),
float(data.get('tds_amount') or 0),
float(data.get('sd_amount') or 0),
float(data.get('on_commission') or 0),
float(data.get('hydro_testing') or 0),
float(data.get('gst_sd_amount') or 0),
float(data.get('final_amount') or 0),
]
cursor.callproc('UpdateInpayment', [
data.get('work_type'),
data.get('invoice_details'),
data.get('invoice_date'),
*numeric,
data.get('pmc_no'),
data.get('invoice_no')
])
clear_results(cursor)
connection.commit()
except Exception as e:
connection.rollback()
raise e
finally:
cursor.close()
connection.close()
# ------------------- Delete Invoice -------------------
def delete_invoice_data(invoice_id, user_id):
connection = config.get_db_connection()
cursor = connection.cursor(dictionary=True)
try:
cursor.callproc('GetInvoicePMCById', [invoice_id])
record = {}
for result in cursor.stored_results():
record = result.fetchone() or {}
if not record:
raise Exception("Invoice not found")
cursor.callproc("DeleteInvoice", (invoice_id,))
clear_results(cursor)
cursor.callproc(
'DeleteInpaymentByPMCInvoice',
[record['PMC_No'], record['invoice_no']]
)
connection.commit()
except Exception as e:
connection.rollback()
raise e
finally:
cursor.close()
connection.close()

359
model/ItemCRUD.py Normal file
View File

@@ -0,0 +1,359 @@
from flask_login import current_user
from model.Utilities import RegEx, ResponseHandler, HtmlHelper, ItemCRUDType
from model.Log import LogHelper
import config
import re
import mysql.connector
# ----------------------------------------------------------
# Mapping Class
# ----------------------------------------------------------
class itemCRUDMapping:
def __init__(self, itemType):
if itemType is ItemCRUDType.Village:
self.name = "Village"
elif itemType is ItemCRUDType.Block:
self.name = "Block"
elif itemType is ItemCRUDType.State:
self.name = "State"
elif itemType is ItemCRUDType.HoldType:
self.name = "Hold Type"
elif itemType is ItemCRUDType.Subcontractor:
self.name = "Subcontractor"
else:
self.name = "Item"
# ----------------------------------------------------------
# Generic CRUD Class
# ----------------------------------------------------------
class ItemCRUD:
def __init__(self, itemType):
self.isSuccess = False
self.resultMessage = ""
self.itemCRUDType = itemType
self.itemCRUDMapping = itemCRUDMapping(itemType)
# ----------------------------------------------------------
# DELETE
# ----------------------------------------------------------
def DeleteItem(self, request, itemID, storedprocDelete):
connection = config.get_db_connection()
cursor = connection.cursor()
LogHelper.log_action(
f"Delete {self.itemCRUDMapping.name}",
f"User {current_user.id} deleted {self.itemCRUDMapping.name} '{itemID}'"
)
try:
cursor.callproc(storedprocDelete, (itemID,))
connection.commit()
self.isSuccess = True
self.resultMessage = HtmlHelper.json_response(
ResponseHandler.delete_success(self.itemCRUDMapping.name), 200
)
except mysql.connector.Error as e:
print(f"Error deleting {self.itemCRUDMapping.name}: {e}")
self.isSuccess = False
self.resultMessage = HtmlHelper.json_response(
ResponseHandler.delete_failure(self.itemCRUDMapping.name), 500
)
finally:
cursor.close()
connection.close()
# ----------------------------------------------------------
# ADD
# ----------------------------------------------------------
def AddItem(self, request, parentid=None, childname=None, storedprocfetch=None, storedprocadd=None, data=None):
connection = config.get_db_connection()
if not connection:
self.isSuccess = False
self.resultMessage = HtmlHelper.json_response(
ResponseHandler.db_connection_failure(), 500
)
return
cursor = connection.cursor()
LogHelper.log_action(
f"Add {self.itemCRUDMapping.name}",
f"User {current_user.id} adding '{childname if childname else (data.get('Contractor_Name') if data else '')}'"
)
try:
# ======================================================
# SUBCONTRACTOR (MULTI-FIELD)
# ======================================================
if data:
# Duplicate check
cursor.callproc(storedprocfetch, (data['Contractor_Name'],))
existing_item = None
for rs in cursor.stored_results():
existing_item = rs.fetchone()
if existing_item:
self.isSuccess = False
self.resultMessage = HtmlHelper.json_response(
ResponseHandler.already_exists(self.itemCRUDMapping.name), 409
)
return
# Insert
cursor.callproc(storedprocadd, (
data['Contractor_Name'],
data['Address'],
data['Mobile_No'],
data['PAN_No'],
data['Email'],
data['Gender'],
data['GST_Registration_Type'],
data['GST_No'],
data['Contractor_password']
))
connection.commit()
self.isSuccess = True
self.resultMessage = HtmlHelper.json_response(
ResponseHandler.add_success(self.itemCRUDMapping.name), 200
)
return
# ======================================================
# NORMAL (Village / Block / State)
# ======================================================
if not re.match(RegEx.patternAlphabetOnly, childname):
self.isSuccess = False
self.resultMessage = HtmlHelper.json_response(
ResponseHandler.invalid_name(self.itemCRUDMapping.name), 400
)
return
# Duplicate check
if parentid is None:
cursor.callproc(storedprocfetch, (childname,))
else:
cursor.callproc(storedprocfetch, (childname, parentid))
existing_item = None
for rs in cursor.stored_results():
existing_item = rs.fetchone()
if existing_item:
self.isSuccess = False
self.resultMessage = HtmlHelper.json_response(
ResponseHandler.already_exists(self.itemCRUDMapping.name), 409
)
return
# Insert
if parentid is None:
cursor.callproc(storedprocadd, (childname,))
else:
cursor.callproc(storedprocadd, (childname, parentid))
connection.commit()
self.isSuccess = True
self.resultMessage = HtmlHelper.json_response(
ResponseHandler.add_success(self.itemCRUDMapping.name), 200
)
except mysql.connector.Error as e:
print(f"Database Error: {e}")
self.isSuccess = False
self.resultMessage = HtmlHelper.json_response(
ResponseHandler.add_failure(self.itemCRUDMapping.name), 500
)
finally:
cursor.close()
connection.close()
# ----------------------------------------------------------
# EDIT
# ----------------------------------------------------------
def EditItem(self, request, childid, parentid=None, childname=None, storedprocupdate=None, data=None):
connection = config.get_db_connection()
cursor = connection.cursor()
LogHelper.log_action(
f"Edit {self.itemCRUDMapping.name}",
f"User {current_user.id} edited '{childid}'"
)
try:
# ======================================================
# SUBCONTRACTOR (MULTI-FIELD)
# ======================================================
if data:
cursor.callproc(storedprocupdate, (
childid,
data['Contractor_Name'],
data['Address'],
data['Mobile_No'],
data['PAN_No'],
data['Email'],
data['Gender'],
data['GST_Registration_Type'],
data['GST_No'],
data['Contractor_password']
))
connection.commit()
self.isSuccess = True
self.resultMessage = HtmlHelper.json_response(
ResponseHandler.update_success(self.itemCRUDMapping.name), 200
)
return
# ======================================================
# NORMAL
# ======================================================
if not re.match(RegEx.patternAlphabetOnly, childname):
self.isSuccess = False
self.resultMessage = ResponseHandler.update_failure(self.itemCRUDMapping.name)['message']
return
if parentid is None:
cursor.callproc(storedprocupdate, (childid, childname))
else:
cursor.callproc(storedprocupdate, (childid, parentid, childname))
connection.commit()
self.isSuccess = True
self.resultMessage = ResponseHandler.update_success(self.itemCRUDMapping.name)['message']
except mysql.connector.Error as e:
print(f"Error updating {self.itemCRUDMapping.name}: {e}")
self.isSuccess = False
self.resultMessage = HtmlHelper.json_response(
ResponseHandler.update_failure(self.itemCRUDMapping.name), 500
)
finally:
cursor.close()
connection.close()
# ----------------------------------------------------------
# GET ALL
# ----------------------------------------------------------
def GetAllData(self, request, storedproc):
data = []
connection = config.get_db_connection()
if not connection:
return []
cursor = connection.cursor()
try:
cursor.callproc(storedproc)
for result in cursor.stored_results():
data = result.fetchall()
self.isSuccess = True
except mysql.connector.Error as e:
print(f"Error fetching {self.itemCRUDMapping.name}: {e}")
self.isSuccess = False
self.resultMessage = HtmlHelper.json_response(
ResponseHandler.fetch_failure(self.itemCRUDMapping.name), 500
)
return []
finally:
cursor.close()
connection.close()
return data
# ----------------------------------------------------------
# GET BY ID
# ----------------------------------------------------------
def GetDataByID(self, id, storedproc):
data = None
connection = config.get_db_connection()
cursor = connection.cursor()
try:
cursor.callproc(storedproc, (id,))
for rs in cursor.stored_results():
data = rs.fetchone()
except mysql.connector.Error as e:
print(f"Error fetching {self.itemCRUDMapping.name}: {e}")
finally:
cursor.close()
connection.close()
return data
# ----------------------------------------------------------
# CHECK ITEM
# ----------------------------------------------------------
def CheckItem(self, request, parentid, childname, storedprocfetch):
connection = config.get_db_connection()
cursor = connection.cursor()
LogHelper.log_action(
f"Check {self.itemCRUDMapping.name}",
f"User {current_user.id} checked '{childname}'"
)
if not re.match(RegEx.patternAlphabetOnly, childname):
return HtmlHelper.json_response(
ResponseHandler.invalid_name(self.itemCRUDMapping.name), 400
)
try:
if parentid is None:
cursor.callproc(storedprocfetch, (childname,))
else:
cursor.callproc(storedprocfetch, (childname, parentid))
existing_item = None
for rs in cursor.stored_results():
existing_item = rs.fetchone()
if existing_item:
return HtmlHelper.json_response(
ResponseHandler.already_exists(self.itemCRUDMapping.name), 409
)
return HtmlHelper.json_response(
ResponseHandler.is_available(self.itemCRUDMapping.name), 200
)
except mysql.connector.Error as e:
print(f"Error checking {self.itemCRUDMapping.name}: {e}")
return HtmlHelper.json_response(
ResponseHandler.fetch_failure(self.itemCRUDMapping.name), 500
)
finally:
cursor.close()
connection.close()

456
model/PmcReport.py Normal file
View File

@@ -0,0 +1,456 @@
import openpyxl
from openpyxl.styles import Font, PatternFill
import config
from flask_login import current_user
from model.Log import LogHelper
from model.Report import ReportHelper
from model.FolderAndFile import FolderAndFile
class PmcReport:
@staticmethod
def get_pmc_report(pmc_no):
connection = config.get_db_connection()
cursor = connection.cursor(dictionary=True, buffered=True)
try:
# cursor.callproc("GetContractorInfoByPmcNo", (pmc_no,))
# pmc_info = next(cursor.stored_results()).fetchone()
pmc_info = ReportHelper.execute_sp(cursor, 'GetContractorInfoByPmcNo', [pmc_no], True)
if not pmc_info:
return None
cursor.callproc("Get_pmc_hold_types", (pmc_no, pmc_info["Contractor_Id"]))
hold_types = next(cursor.stored_results()).fetchall()
# Extract hold_type_ids
hold_type_ids = [ht['hold_type_id'] for ht in hold_types]
invoices = []
hold_amount_total = 0
if hold_type_ids:
hold_type_ids_str = ",".join(map(str, hold_type_ids))
cursor.callproc(
'GetInvoices_WithHold',
[pmc_no, pmc_info["Contractor_Id"], hold_type_ids_str]
)
else:
cursor.callproc(
'GetInvoices_NoHold',
[pmc_no, pmc_info["Contractor_Id"]]
)
for result in cursor.stored_results():
invoices = result.fetchall()
if hold_type_ids:
hold_amount_total = sum(row.get('hold_amount', 0) or 0 for row in invoices)
total_invo_final = sum(row.get('Final_Amount', 0) or 0 for row in invoices)
# GST RELEASE
# cursor.callproc('GetGSTReleaseByPMC', [pmc_no])
# gst_rel = []
# for result in cursor.stored_results():
# gst_rel = result.fetchall()
gst_rel = ReportHelper.execute_sp(cursor, 'GetGSTReleaseByPMC', [pmc_no])
total_gst_basic = sum(row.get('basic_amount', 0) or 0 for row in gst_rel)
total_gst_final = sum(row.get('final_amount', 0) or 0 for row in gst_rel)
# ---------------- HOLD RELEASE ----------------
# cursor.callproc('GetHoldReleaseByPMC', [pmc_no])
# hold_release = []
# for result in cursor.stored_results():
# hold_release = result.fetchall()
hold_release = ReportHelper.execute_sp(cursor, 'GetHoldReleaseByPMC', [pmc_no])
# ---------------- CREDIT NOTE ----------------
# cursor.callproc('GetCreditNoteByPMC', [pmc_no])
# credit_note = []
# for result in cursor.stored_results():
# credit_note = result.fetchall()
credit_note = ReportHelper.execute_sp(cursor, 'GetCreditNoteByPMC', [pmc_no])
payments = ReportHelper.execute_sp(cursor, 'GetPaymentsByPMC', [pmc_no])
# ---------------- PAYMENTS ----------------
# cursor.callproc('GetPaymentsByPMC', [pmc_no])
# payments = []
# for result in cursor.stored_results():
# payments = result.fetchall()
total_pay_amount = sum(row.get('Payment_Amount', 0) or 0 for row in payments)
total_pay_total = sum(row.get('Total_amount', 0) or 0 for row in payments)
totals = {
"sum_invo_basic_amt": sum(row.get('Basic_Amount', 0) or 0 for row in invoices),
"sum_invo_debit_amt": sum(row.get('Debit_Amount', 0) or 0 for row in invoices),
"sum_invo_after_debit_amt": sum(row.get('After_Debit_Amount', 0) or 0 for row in invoices),
"sum_invo_amt": sum(row.get('Amount', 0) or 0 for row in invoices),
"sum_invo_gst_amt": sum(row.get('GST_Amount', 0) or 0 for row in invoices),
"sum_invo_tds_amt": sum(row.get('TDS_Amount', 0) or 0 for row in invoices),
"sum_invo_ds_amt": sum(row.get('SD_Amount', 0) or 0 for row in invoices),
"sum_invo_on_commission": sum(row.get('On_Commission', 0) or 0 for row in invoices),
"sum_invo_hydro_test": sum(row.get('Hydro_Testing', 0) or 0 for row in invoices),
"sum_invo_gst_sd_amt": sum(row.get('GST_SD_Amount', 0) or 0 for row in invoices),
"sum_invo_final_amt": total_invo_final,
"sum_invo_hold_amt": hold_amount_total,
"sum_gst_basic_amt": total_gst_basic,
"sum_gst_final_amt": total_gst_final,
"sum_pay_payment_amt": total_pay_amount,
"sum_pay_tds_payment_amt": sum(row.get('TDS_Payment_Amount', 0) or 0 for row in payments),
"sum_pay_total_amt": total_pay_total
}
return {
"info": pmc_info,
"invoices": invoices,
"hold_types": hold_types,
"gst_rel": gst_rel,
"payments": payments,
"credit_note": credit_note,
"hold_release": hold_release,
"total": totals
}
finally:
cursor.close()
connection.close()
@staticmethod
def download_pmc_report(pmc_no):
connection = config.get_db_connection()
if not connection:
return None
cursor = connection.cursor(dictionary=True)
try:
# filename
filename = f"PMC_Report_{pmc_no}.xlsx"
output_folder = FolderAndFile.get_download_folder()
output_file = FolderAndFile.get_download_path(filename)
# ================= DATA FETCH =================
contractor_info = ReportHelper.execute_sp(cursor, 'GetContractorDetailsByPMC', [pmc_no], "one")
if not contractor_info:
return None
hold_types = ReportHelper.execute_sp(cursor, 'GetHoldTypesByContractor', [contractor_info["Contractor_Id"]])
hold_type_map = {ht['hold_type_id']: ht['hold_type'] for ht in hold_types}
invoices = ReportHelper.execute_sp(cursor, 'GetInvoicesAndGstReleaseByPmcNo', [pmc_no])
credit_notes = ReportHelper.execute_sp(cursor, 'GetCreditNoteByContractor', [contractor_info["Contractor_Id"]])
hold_amounts = ReportHelper.execute_sp(cursor, 'GetHoldAmountsByContractor', [contractor_info["Contractor_Id"]])
all_payments = ReportHelper.execute_sp(cursor, 'GetAllPaymentsByPMC', [pmc_no])
gst_releases = ReportHelper.execute_sp(cursor, 'GetGSTReleaseDetailsByPMC', [pmc_no])
# ================= DATA MAPPING =================
hold_data = {}
for h in hold_amounts:
hold_data.setdefault(h['Invoice_Id'], {})[h['hold_type_id']] = h['hold_amount']
payments_map = {}
for pay in all_payments:
if pay['invoice_no']:
payments_map.setdefault(pay['invoice_no'], []).append(pay)
# ================= LOG =================
LogHelper.log_action(
"Download PMC Report",
f"User {current_user.id} Download PMC Report '{pmc_no}'"
)
# ================= EXCEL =================
workbook = openpyxl.Workbook()
sheet = workbook.active
sheet.title = "PMC Report"
# HEADER INFO
sheet.append(["", "", "Laxmi Civil Engineering Services PVT. LTD."])
sheet.append(["Contractor Name", contractor_info["Contractor_Name"]])
sheet.append(["State", contractor_info["State_Name"]])
sheet.append(["District", contractor_info["District_Name"]])
sheet.append(["Block", contractor_info["Block_Name"]])
sheet.append([])
base_headers = [
"PMC No","Village","Work Type","Invoice Details","Invoice Date","Invoice No",
"Basic Amount","Debit","After Debit Amount","GST","Amount","TDS",
"SD","On Commission","Hydro Testing","GST SD Amount"
]
hold_headers = [ht['hold_type'] for ht in hold_types]
payment_headers = [
"Final Amount","Payment Amount","TDS Payment","Total Paid","UTR"
]
headers = base_headers + hold_headers + payment_headers
sheet.append(headers)
# STYLE
for cell in sheet[sheet.max_row]:
cell.font = Font(bold=True)
# DATA
seen_invoices = set()
for inv in invoices:
invoice_no = inv["Invoice_No"]
payments = payments_map.get(invoice_no, [])
if invoice_no in seen_invoices:
continue
seen_invoices.add(invoice_no)
first_payment = payments[0] if payments else None
row = [
pmc_no,
inv["Village_Name"],
inv["Work_Type"],
inv["Invoice_Details"],
inv["Invoice_Date"],
invoice_no,
inv["Basic_Amount"],
inv["Debit_Amount"],
inv["After_Debit_Amount"],
inv["GST_Amount"],
inv["Amount"],
inv["TDS_Amount"],
inv["SD_Amount"],
inv["On_Commission"],
inv["Hydro_Testing"],
inv["GST_SD_Amount"]
]
# HOLD DATA
invoice_holds = hold_data.get(inv["Invoice_Id"], {})
for ht_id in hold_type_map.keys():
row.append(invoice_holds.get(ht_id, ""))
# PAYMENT DATA
row += [
inv["Final_Amount"],
first_payment["Payment_Amount"] if first_payment else "",
first_payment["TDS_Payment_Amount"] if first_payment else "",
first_payment["Total_amount"] if first_payment else "",
first_payment["UTR"] if first_payment else ""
]
sheet.append(row)
# AUTO WIDTH
for col in sheet.columns:
max_len = max((len(str(cell.value)) for cell in col if cell.value), default=0)
sheet.column_dimensions[col[0].column_letter].width = max_len + 2
# SAVE
workbook.save(output_file)
workbook.close()
return output_folder, filename
except Exception as e:
print(f"Error generating PMC report: {e}")
return None
finally:
cursor.close()
connection.close()
# @staticmethod
# def download_pmc_report(pmc_no):
# connection = config.get_db_connection()
# cursor = connection.cursor(dictionary=True)
# # output_folder = "static/download"
# # output_file = os.path.join(output_folder, f"PMC_Report_{pmc_no}.xlsx")
# output_folder = FolderAndFile.get_download_folder
# filename = f"PMC_Report_{pmc_no}.xlsx"
# output_file = FolderAndFile.get_download_path(filename)
# try:
# cursor.callproc('GetContractorDetailsByPMC', [pmc_no])
# contractor_info = next(cursor.stored_results()).fetchone()
# if not contractor_info:
# return None
# cursor.callproc('GetHoldTypesByContractor', [contractor_info["Contractor_Id"]])
# hold_types = next(cursor.stored_results()).fetchall()
# hold_type_map = {ht['hold_type_id']: ht['hold_type'] for ht in hold_types}
# cursor.callproc('GetInvoicesAndGstReleaseByPmcNo', [pmc_no])
# invoices = next(cursor.stored_results()).fetchall()
# cursor.callproc('GetCreditNoteByContractor',[contractor_info["Contractor_Id"]])
# credit_notes = []
# for result in cursor.stored_results():
# credit_notes = result.fetchall()
# credit_note_map = {}
# for cn in credit_notes:
# key = (cn["PMC_No"], cn["Invoice_No"])
# credit_note_map.setdefault(key, []).append(cn)
# cursor.callproc('GetHoldAmountsByContractor', [contractor_info["Contractor_Id"]])
# hold_amounts = next(cursor.stored_results()).fetchall()
# hold_data = {}
# for h in hold_amounts:
# hold_data.setdefault(h['Invoice_Id'], {})[h['hold_type_id']] = h['hold_amount']
# cursor.callproc('GetAllPaymentsByPMC', [pmc_no])
# all_payments = next(cursor.stored_results()).fetchall()
# payments_map = {}
# extra_payments = []
# for pay in all_payments:
# if pay['invoice_no']:
# payments_map.setdefault(pay['invoice_no'], []).append(pay)
# else:
# extra_payments.append(pay)
# # ---------------- GST RELEASE DETAILS ----------------
# cursor.callproc('GetGSTReleaseDetailsByPMC', [pmc_no])
# gst_releases = []
# for result in cursor.stored_results():
# gst_releases = result.fetchall()
# gst_release_map = {}
# for gr in gst_releases:
# invoice_nos = []
# if gr['Invoice_No']:
# cleaned = gr['Invoice_No'].replace(' ', '')
# if '&' in cleaned:
# invoice_nos = cleaned.split('&')
# elif ',' in cleaned:
# invoice_nos = cleaned.split(',')
# else:
# invoice_nos = [cleaned]
# for inv_no in invoice_nos:
# gst_release_map.setdefault(inv_no, []).append(gr)
# LogHelper.log_action(
# "Download PMC Report",
# f"User {current_user.id} Download PMC Report '{pmc_no}'"
# )
# workbook = openpyxl.Workbook()
# sheet = workbook.active
# sheet.title = "PMC Report"
# sheet.append(["", "", "Laxmi Civil Engineering Services PVT. LTD."])
# sheet.append(["Contractor Name", contractor_info["Contractor_Name"], "", "GST No", contractor_info["GST_No"], "", "GST Type", contractor_info["GST_Registration_Type"]])
# sheet.append(["State", contractor_info["State_Name"], "", "PAN No", contractor_info["PAN_No"], "", "Address", contractor_info["Address"]])
# sheet.append(["District", contractor_info["District_Name"], "", "Mobile No", contractor_info["Mobile_No"]])
# sheet.append(["Block", contractor_info["Block_Name"], "", "Email", contractor_info["Email"]])
# sheet.append([])
# base_headers = [
# "PMC No","Village","Work Type","Invoice Details","Invoice Date","Invoice No",
# "Basic Amount","Debit","After Debit Amount","GST (18%)","Amount","TDS (1%)",
# "SD (5%)","On Commission","Hydro Testing","GST SD Amount"
# ]
# hold_headers = [ht['hold_type'] for ht in hold_types]
# payment_headers = [
# "Final Amount","Payment Amount","TDS Payment","Total Paid","UTR"
# ]
# sheet.append(base_headers + hold_headers + payment_headers)
# header_fill = PatternFill(start_color="ADD8E6",end_color="ADD8E6",fill_type="solid")
# header_font = Font(bold=True)
# for cell in sheet[sheet.max_row]:
# cell.font = header_font
# cell.fill = header_fill
# seen_invoices = set()
# processed_payments = set()
# for inv in invoices:
# invoice_no = inv["Invoice_No"]
# payments = payments_map.get(invoice_no, [])
# if invoice_no not in seen_invoices:
# seen_invoices.add(invoice_no)
# first_payment = payments[0] if payments else None
# row = [
# pmc_no, inv["Village_Name"], inv["Work_Type"],
# inv["Invoice_Details"], inv["Invoice_Date"], invoice_no,
# inv["Basic_Amount"], inv["Debit_Amount"],
# inv["After_Debit_Amount"], inv["GST_Amount"],
# inv["Amount"], inv["TDS_Amount"], inv["SD_Amount"],
# inv["On_Commission"], inv["Hydro_Testing"], inv["GST_SD_Amount"]
# ]
# invoice_holds = hold_data.get(inv["Invoice_Id"], {})
# for ht_id in hold_type_map.keys():
# row.append(invoice_holds.get(ht_id, ""))
# row += [
# inv["Final_Amount"],
# first_payment["Payment_Amount"] if first_payment else "",
# first_payment["TDS_Payment_Amount"] if first_payment else "",
# first_payment["Total_amount"] if first_payment else "",
# first_payment["UTR"] if first_payment else ""
# ]
# sheet.append(row)
# workbook.save(output_file)
# workbook.close()
# return output_folder, filename
# finally:
# cursor.close()
# connection.close()

275
model/Report.py Normal file
View File

@@ -0,0 +1,275 @@
import config
from datetime import datetime
from flask import send_file
import openpyxl
from openpyxl.styles import Font
from model.FolderAndFile import FolderAndFile
class ReportHelper:
isSuccess = False
resultMessage = ""
data=[]
def __init__(self):
self.isSuccess = False
self.resultMessage = ""
self.data = []
@staticmethod
def execute_sp(cursor, proc_name, params=[], fetch_one=False):
cursor.callproc(proc_name, params)
return (
ReportHelper.fetch_one_result(cursor)
if fetch_one else
ReportHelper.fetch_all_results(cursor)
)
@staticmethod
def fetch_all_results(cursor):
data = []
for result in cursor.stored_results():
data = result.fetchall()
return data
@staticmethod
def fetch_one_result(cursor):
data = None
for result in cursor.stored_results():
data = result.fetchone()
return data
@staticmethod
def search_contractor(request):
subcontractor_name = request.form.get("subcontractor_name")
pmc_no = request.form.get("pmc_no")
state = request.form.get("state")
district = request.form.get("district")
block = request.form.get("block")
village = request.form.get("village")
year_from = request.form.get("year_from")
year_to = request.form.get("year_to")
connection = config.get_db_connection()
if not connection:
return []
cursor = connection.cursor(dictionary=True)
try:
data = ReportHelper.execute_sp(
cursor,
"search_contractor_info",
[
subcontractor_name or None,
pmc_no or None,
state or None,
district or None,
block or None,
village or None,
year_from or None,
year_to or None
]
)
except Exception as e:
print(f"Error in search_contractor: {e}")
data = []
finally:
cursor.close()
connection.close()
return data
@staticmethod
def get_contractor_report(contractor_id):
connection = config.get_db_connection()
cursor = connection.cursor(dictionary=True, buffered=True)
try:
# Contractor Info (only one fetch)
contInfo = ReportHelper.execute_sp(cursor, 'GetContractorInfo', [contractor_id], True)
# Hold Types
hold_types = ReportHelper.execute_sp(cursor, 'GetContractorHoldTypes', [contractor_id])
# Invoices
invoices = ReportHelper.execute_sp(cursor, 'GetContractorInvoices', [contractor_id])
# GST Release
gst_rel = ReportHelper.execute_sp(cursor, 'GetGSTRelease', [contractor_id])
# Hold Release
hold_release = ReportHelper.execute_sp(cursor, 'GetHoldRelease', [contractor_id])
# Credit Note
credit_note = ReportHelper.execute_sp(cursor, 'GetCreditNote', [contractor_id])
# Payments
payments = ReportHelper.execute_sp(cursor, 'GetPayments', [contractor_id])
# Totals
total = {
"sum_invo_basic_amt": float(sum(row['Basic_Amount'] or 0 for row in invoices)),
"sum_invo_debit_amt": float(sum(row['Debit_Amount'] or 0 for row in invoices)),
"sum_invo_after_debit_amt": float(sum(row['After_Debit_Amount'] or 0 for row in invoices)),
"sum_invo_amt": float(sum(row['Amount'] or 0 for row in invoices)),
"sum_invo_gst_amt": float(sum(row['GST_Amount'] or 0 for row in invoices)),
"sum_invo_tds_amt": float(sum(row['TDS_Amount'] or 0 for row in invoices)),
"sum_invo_ds_amt": float(sum(row['SD_Amount'] or 0 for row in invoices)),
"sum_invo_on_commission": float(sum(row['On_Commission'] or 0 for row in invoices)),
"sum_invo_hydro_test": float(sum(row['Hydro_Testing'] or 0 for row in invoices)),
"sum_invo_gst_sd_amt": float(sum(row['GST_SD_Amount'] or 0 for row in invoices)),
"sum_invo_final_amt": float(sum(row['Final_Amount'] or 0 for row in invoices)),
"sum_invo_hold_amt": float(sum(row['hold_amount'] or 0 for row in invoices)),
"sum_gst_basic_amt": float(sum(row['basic_amount'] or 0 for row in gst_rel)),
"sum_gst_final_amt": float(sum(row['final_amount'] or 0 for row in gst_rel)),
"sum_pay_payment_amt": float(sum(row['Payment_Amount'] or 0 for row in payments)),
"sum_pay_tds_payment_amt": float(sum(row['TDS_Payment_Amount'] or 0 for row in payments)),
"sum_pay_total_amt": float(sum(row['Total_amount'] or 0 for row in payments))
}
current_date = datetime.now().strftime('%Y-%m-%d')
finally:
cursor.close()
connection.close()
return {
"contInfo": contInfo,
"invoices": invoices,
"hold_types": hold_types,
"gst_rel": gst_rel,
"payments": payments,
"credit_note": credit_note,
"hold_release": hold_release,
"total": total,
"current_date": current_date
}
@staticmethod
def download_report(contractor_id):
try:
connection = config.get_db_connection()
cursor = connection.cursor(dictionary=True)
# -------- Contractor Info --------
contInfo = ReportHelper.execute_sp(cursor, 'GetContractorInfo', [contractor_id], True)
if not contInfo:
return "No contractor found", 404
# -------- Invoice Data --------
cursor.callproc('FetchInvoicesByContractor', [contractor_id])
invoices = []
for result in cursor.stored_results():
invoices.extend(result.fetchall())
if not invoices:
return "No invoice data found"
# -------- Create Workbook --------
workbook = openpyxl.Workbook()
sheet = workbook.active
sheet.title = "Contractor Report"
# ================= CONTRACTOR DETAILS =================
sheet.append(["SUB CONTRACTOR DETAILS"])
sheet.cell(row=sheet.max_row, column=1).font = Font(bold=True)
sheet.append([])
sheet.append(["Name", contInfo.get("Contractor_Name") or ""])
sheet.append(["Mobile No", contInfo.get("Mobile_No") or ""])
sheet.append(["Email", contInfo.get("Email") or ""])
sheet.append(["Village", contInfo.get("Village_Name") or ""])
sheet.append(["Block", contInfo.get("Block_Name") or ""])
sheet.append(["District", contInfo.get("District_Name") or ""])
sheet.append(["State", contInfo.get("State_Name") or ""])
sheet.append(["Address", contInfo.get("Address") or ""])
sheet.append(["GST No", contInfo.get("GST_No") or ""])
sheet.append(["PAN No", contInfo.get("PAN_No") or ""])
sheet.append([])
sheet.append([])
# ================= TABLE HEADERS =================
headers = [
"PMC No", "Village", "Invoice No", "Invoice Date", "Work Type","Invoice_Details",
"Basic Amount", "Debit Amount", "After Debit Amount",
"Amount", "GST Amount", "TDS Amount", "SD Amount",
"On Commission", "Hydro Testing", "Hold Amount",
"GST SD Amount", "Final Amount",
"Payment Amount", "TDS Payment",
"Total Amount", "UTR"
]
sheet.append(headers)
for col in range(1, len(headers) + 1):
sheet.cell(row=sheet.max_row, column=col).font = Font(bold=True)
# ================= DATA =================
total_final = 0
total_payment = 0
total_amount = 0
for inv in invoices:
row = [
inv.get("PMC_No"),
inv.get("Village_Name"),
inv.get("invoice_no"),
inv.get("Invoice_Date"),
inv.get("Work_Type"),
inv.get("Invoice_Details"),
inv.get("Basic_Amount"),
inv.get("Debit_Amount"),
inv.get("After_Debit_Amount"),
inv.get("Amount"),
inv.get("GST_Amount"),
inv.get("TDS_Amount"),
inv.get("SD_Amount"),
inv.get("On_Commission"),
inv.get("Hydro_Testing"),
inv.get("Hold_Amount"),
inv.get("GST_SD_Amount"),
inv.get("Final_Amount"),
inv.get("Payment_Amount"),
inv.get("TDS_Payment_Amount"),
inv.get("Total_Amount"),
inv.get("UTR")
]
total_final += float(inv.get("Final_Amount") or 0)
total_payment += float(inv.get("Payment_Amount") or 0)
total_amount += float(inv.get("Total_Amount") or 0)
sheet.append(row)
# ================= TOTAL ROW =================
sheet.append([])
sheet.append([
"", "", "", "", "", "", "", "", "", "", "", "", "", "", "", "",
"TOTAL",
total_final,
total_payment,
"",
total_amount,
""
])
# ================= AUTO WIDTH =================
for column in sheet.columns:
max_length = 0
column_letter = column[0].column_letter
for cell in column:
if cell.value:
max_length = max(max_length, len(str(cell.value)))
sheet.column_dimensions[column_letter].width = max_length + 2
# ================= SAVE FILE =================
filename = f"Contractor_Report_{contInfo.get('Contractor_Name')}.xlsx"
output_file = FolderAndFile.get_download_path(filename)
workbook.save(output_file)
return send_file(output_file, as_attachment=True)
except Exception as e:
return str(e)

168
model/State.py Normal file
View File

@@ -0,0 +1,168 @@
from flask import request, redirect, url_for
from flask_login import current_user
from model.Utilities import RegEx, ResponseHandler, HtmlHelper, ItemCRUDType
from model.Log import LogHelper
from model.ItemCRUD import ItemCRUD
import config
import re
import mysql.connector
class State:
def __init__(self):
self.isSuccess = False
self.resultMessage = ""
# ----------------------------------------------------------
# ADD STATE (USING ITEM CRUD)
# ----------------------------------------------------------
def AddState(self, request):
state_name = request.form['state_Name'].strip()
crud = ItemCRUD(ItemCRUDType.State)
crud.AddItem(
request=request,
childname=state_name,
storedprocfetch="CheckStateExists",
storedprocadd="SaveState"
)
self.isSuccess = crud.isSuccess
self.resultMessage = crud.resultMessage
return self.resultMessage
# ----------------------------------------------------------
# GET ALL STATES (NO CHANGE - THIS IS CORRECT)
# ----------------------------------------------------------
def GetAllStates(self, request):
connection = config.get_db_connection()
data = []
if not connection:
return []
cursor = connection.cursor()
try:
cursor.callproc("GetAllStates")
for res in cursor.stored_results():
data = res.fetchall()
self.isSuccess = True
except mysql.connector.Error as e:
print(f"Error fetching states: {e}")
self.isSuccess = False
self.resultMessage = HtmlHelper.json_response(
ResponseHandler.fetch_failure("state"), 500
)
return []
finally:
cursor.close()
connection.close()
return data
# ----------------------------------------------------------
# CHECK STATE (USING ITEM CRUD)
# ----------------------------------------------------------
def CheckState(self, request):
state_name = request.json.get('state_Name', '').strip()
crud = ItemCRUD(ItemCRUDType.State)
return crud.CheckItem(
request=request,
parentid=None,
childname=state_name,
storedprocfetch="CheckStateExists"
)
# ----------------------------------------------------------
# DELETE STATE (USING ITEM CRUD)
# ----------------------------------------------------------
def DeleteState(self, request, id):
crud = ItemCRUD(ItemCRUDType.State)
crud.DeleteItem(
request=request,
itemID=id,
storedprocDelete="DeleteState"
)
self.isSuccess = crud.isSuccess
self.resultMessage = crud.resultMessage
return self.resultMessage
# ----------------------------------------------------------
# EDIT STATE (USING ITEM CRUD)
# ----------------------------------------------------------
def EditState(self, request, id):
state_name = request.form['state_Name'].strip()
crud = ItemCRUD(ItemCRUDType.State)
crud.EditItem(
request=request,
childid=id,
parentid=None,
childname=state_name,
storedprocupdate="UpdateStateById"
)
self.isSuccess = crud.isSuccess
self.resultMessage = crud.resultMessage
return redirect(url_for('state.add_state'))
# ----------------------------------------------------------
# GET STATE BY ID (KEEP SAME)
# ----------------------------------------------------------
def GetStateByID(self, request, id):
connection = config.get_db_connection()
data = None
if not connection:
return None
cursor = connection.cursor()
try:
cursor.callproc("GetStateByID", (id,))
for res in cursor.stored_results():
data = res.fetchone()
if data:
self.isSuccess = True
self.resultMessage = "Success"
else:
self.isSuccess = False
self.resultMessage = "Not Found"
except mysql.connector.Error as e:
print(f"Error fetching state: {e}")
self.isSuccess = False
finally:
cursor.close()
connection.close()
return data

140
model/Subcontractor.py Normal file
View File

@@ -0,0 +1,140 @@
from model.Utilities import ItemCRUDType
from model.ItemCRUD import ItemCRUD
class Subcontractor:
def __init__(self):
self.isSuccess = False
self.resultMessage = ""
# ----------------------------------------------------------
# ADD
# ----------------------------------------------------------
def AddSubcontractor(self, request):
subcontractor = ItemCRUD(itemType=ItemCRUDType.Subcontractor)
data = {
"Contractor_Name": request.form.get('Contractor_Name', '').strip(),
"Address": request.form.get('Address', '').strip(),
"Mobile_No": request.form.get('Mobile_No', '').strip(),
"PAN_No": request.form.get('PAN_No', '').strip(),
"Email": request.form.get('Email', '').strip(),
"Gender": request.form.get('Gender', '').strip(),
"GST_Registration_Type": request.form.get('GST_Registration_Type', '').strip(),
"GST_No": request.form.get('GST_No', '').strip(),
"Contractor_password": request.form.get('Contractor_password', '').strip()
}
subcontractor.AddItem(
request=request,
data=data,
storedprocfetch="GetSubcontractorByName",
storedprocadd="SaveContractor"
)
self.isSuccess = subcontractor.isSuccess
self.resultMessage = subcontractor.resultMessage
return
# ----------------------------------------------------------
# GET ALL
# ----------------------------------------------------------
def GetAllSubcontractors(self, request):
subcontractor = ItemCRUD(itemType=ItemCRUDType.Subcontractor)
data = subcontractor.GetAllData(
request=request,
storedproc="GetAllSubcontractors"
)
self.isSuccess = subcontractor.isSuccess
self.resultMessage = subcontractor.resultMessage
return data
# ----------------------------------------------------------
# GET BY ID
# ----------------------------------------------------------
def GetSubcontractorByID(self, id):
subcontractor = ItemCRUD(itemType=ItemCRUDType.Subcontractor)
data = subcontractor.GetDataByID(
id=id,
storedproc="GetSubcontractorById"
)
if data:
self.isSuccess = True
else:
self.isSuccess = False
self.resultMessage = "Subcontractor not found"
return data
# ----------------------------------------------------------
# CHECK (Duplicate)
# ----------------------------------------------------------
def CheckSubcontractor(self, request):
subcontractor = ItemCRUD(itemType=ItemCRUDType.Subcontractor)
name = request.form.get('Contractor_Name', '').strip()
result = subcontractor.CheckItem(
request=request,
childname=name,
storedprocfetch="GetSubcontractorByName"
)
self.isSuccess = subcontractor.isSuccess
self.resultMessage = subcontractor.resultMessage
return result
# ----------------------------------------------------------
# EDIT
# ----------------------------------------------------------
def EditSubcontractor(self, request, subcontractor_id):
subcontractor = ItemCRUD(itemType=ItemCRUDType.Subcontractor)
data = {
"Contractor_Name": request.form.get('Contractor_Name', '').strip(),
"Address": request.form.get('Address', '').strip(),
"Mobile_No": request.form.get('Mobile_No', '').strip(),
"PAN_No": request.form.get('PAN_No', '').strip(),
"Email": request.form.get('Email', '').strip(),
"Gender": request.form.get('Gender', '').strip(),
"GST_Registration_Type": request.form.get('GST_Registration_Type', '').strip(),
"GST_No": request.form.get('GST_No', '').strip(),
"Contractor_password": request.form.get('Contractor_password', '').strip()
}
subcontractor.EditItem(
request=request,
childid=subcontractor_id,
data=data,
storedprocupdate="UpdateSubcontractor"
)
self.isSuccess = subcontractor.isSuccess
self.resultMessage = subcontractor.resultMessage
return
# ----------------------------------------------------------
# DELETE
# ----------------------------------------------------------
def DeleteSubcontractor(self, request, subcontractor_id):
subcontractor = ItemCRUD(itemType=ItemCRUDType.Subcontractor)
subcontractor.DeleteItem(
request=request,
itemID=subcontractor_id,
storedprocDelete="DeleteSubcontractor"
)
self.isSuccess = subcontractor.isSuccess
self.resultMessage = subcontractor.resultMessage
return

View File

@@ -6,7 +6,9 @@ class ItemCRUDType(Enum):
Block = 2
District = 3
State = 4
HoldType = 5
Subcontractor = 6
class RegEx:
patternAlphabetOnly = "^[A-Za-z ]+$"

View File

@@ -1,15 +1,15 @@
from flask_login import LoginManager, UserMixin, login_user, logout_user, login_required, current_user
from AppCode.Utilities import RegEx, ResponseHandler, HtmlHelper, ItemCRUDType
from AppCode.Log import LogData, LogHelper
from model.Utilities import RegEx, ResponseHandler, HtmlHelper, ItemCRUDType
from model.Log import LogData, LogHelper
import config
import mysql.connector
from mysql.connector import Error
from AppCode.ItemCRUD import ItemCRUD, itemCRUDMapping
from model.ItemCRUD import ItemCRUD
class Village:
@@ -60,25 +60,36 @@ class Village:
return
def EditVillage(self, request, village_id):
corsor=None
village = ItemCRUD(itemType=ItemCRUDType.Village)
block_id = request.form.get('block_Id')
village_name = request.form.get('Village_Name', '').strip()
village.EditItem(request=request, childid=village_id, parentid=block_id, childname=village_name, storedprocadd="UpdateVillage" )
village.EditItem(request=request,childid=village_id,parentid=block_id,childname=village_name,storedprocupdate="UpdateVillage" )
self.isSuccess = village.isSuccess
self.resultMessage = village.resultMessage
return
# def GetVillageByID(self, request, id):
# village = ItemCRUD(itemType=ItemCRUDType.Village)
# villagedetailsdata = village.GetAllData(request=request, storedproc="GetVillageDetailsById")
# self.isSuccess = village.isSuccess
# self.resultMessage = village.resultMessage
# return villagedetailsdata
def GetVillageByID(self, request, id):
village = ItemCRUD(itemType=ItemCRUDType.Village)
villagedetailsdata = village.GetAllData(request=request, storedproc="GetVillageDetailsById")
self.isSuccess = village.isSuccess
self.resultMessage = village.resultMessage
return villagedetailsdata
villagedetailsdata = village.GetDataByID(id=id,storedproc="GetVillageDetailsById")
if villagedetailsdata:
self.isSuccess = True
else:
self.isSuccess = False
self.resultMessage = "Village not found"
return villagedetailsdata
def GetAllBlocks(self, request):

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Some files were not shown because too many files have changed in this diff Show More