new version 2 code updated..

This commit is contained in:
2025-11-25 14:14:10 +05:30
parent c43f6d41ae
commit 0fac9a2ff8
19 changed files with 3628 additions and 1453 deletions

54
AppCode/Auth.py Normal file
View File

@@ -0,0 +1,54 @@
from flask import Flask, render_template, request, redirect, url_for, send_from_directory, flash, jsonify, json
from flask_login import LoginManager, UserMixin
from logging.handlers import RotatingFileHandler
from ldap3 import Server, Connection, ALL, SUBTREE
from ldap3 import Server, Connection, ALL
from ldap3.core.exceptions import LDAPBindError
class DefaultCredentials:
username = 'admin'
password = 'admin123'
class LoginLDAP:
def __init__(self, request):
self.username = request.form['username'].strip()
self.password = request.form['password']
self.isDefaultCredentials = False
self.isValidLogin = False
self.errorMessage = ""
ldap_user_dn = f"uid={self.username},ou=users,dc=lcepl,dc=org"
ldap_server = 'ldap://localhost:389'
#Need to re-factor further
# Static fallback user
if self.username == DefaultCredentials.username and self.password == DefaultCredentials.password:
self.isDefaultCredentials = True
self.isValidLogin = True
return
try:
# LDAP authentication
conn = Connection(
Server(self.ldap_server, get_info=ALL),
user=self.ldap_user_dn,
password=self.password,
auto_bind=True
)
self.isValidLogin = True
return
except LDAPBindError:
self.errorMessage = "Invalid credentials."
except Exception as e:
self.errorMessage = str(e)
class User(UserMixin):
def __init__(self, id):
self.id = id

108
AppCode/Block.py Normal file
View File

@@ -0,0 +1,108 @@
from flask import Flask, render_template, request, redirect, url_for, send_from_directory, flash, jsonify, json
from flask import current_app
from datetime import datetime
from flask_login import LoginManager, UserMixin, login_user, logout_user, login_required, current_user
from AppCode.Utilities import RegEx, ResponseHandler, HtmlHelper, ItemCRUDType
from AppCode.Log import LogData, LogHelper
import os
import config
import re
import mysql.connector
from mysql.connector import Error
from AppCode.ItemCRUD import ItemCRUD, itemCRUDMapping
class Block:
isSuccess = False
resultMessage = ""
def __init__(self):
self.isSuccess = False
self.resultMessage = ""
# ----------------------------------------------------------
# Add Block
# ----------------------------------------------------------
def AddBlock(self, request):
block = ItemCRUD(itemType=ItemCRUDType.Block)
district_id = request.form.get('district_Id')
block_name = request.form.get('block_Name', '').strip()
block.AddItem(request=request, parentid=district_id, childname=block_name, storedprocfetch="GetVillageByNameAndBlock", storedprocadd="SaveVillage" )
self.isSuccess = block.isSuccess
self.resultMessage = block.resultMessage
return
# ----------------------------------------------------------
# Get All Blocks
# ----------------------------------------------------------
def GetAllBlocks(self):
block = ItemCRUD(itemType=ItemCRUDType.Block)
blocksdata = block.GetAllData(request=request, storedproc="GetAllBlock")
self.isSuccess = block.isSuccess
self.resultMessage = block.resultMessage
return blocksdata
# ----------------------------------------------------------
# Check Block Exists
# ----------------------------------------------------------
def CheckBlock(self, request):
block = ItemCRUD(itemType=ItemCRUDType.Block)
block_name = request.json.get('block_Name', '').strip()
district_id = request.json.get('district_Id')
result = block.CheckItem(request=request, parentid=district_id, childname=block_name, storedprocfetch="GetBlockByNameAndDistrict")
self.isSuccess = block.isSuccess
self.resultMessage = block.resultMessage
return result
# ----------------------------------------------------------
# Get Block By ID
# ----------------------------------------------------------
def GetBlockByID(self, id):
block = ItemCRUD(itemType=ItemCRUDType.Village)
blockdata = block.GetAllData("GetBlockDataByID")
self.isSuccess = block.isSuccess
self.resultMessage = block.resultMessage
return blockdata
# ----------------------------------------------------------
# Update Block
# ----------------------------------------------------------
def EditBlock(self, request, block_id):
block = ItemCRUD(itemType=ItemCRUDType.Block)
district_id = request.form.get('district_Id')
block_name = request.form.get('block_Name', '').strip()
block.EditItem(request=request, childid=block_id, parentid=district_id, childname=block_name, storedprocadd="UpdateBlockById" )
self.isSuccess = block.isSuccess
self.resultMessage = block.resultMessage
return
# ----------------------------------------------------------
# Delete Block
# ----------------------------------------------------------
def DeleteBlock(self, id):
block = ItemCRUD(itemType=ItemCRUDType.Block)
block.DeleteItem(request=request, itemID=id, storedprocDelete="DeleteBlock" )
self.isSuccess = block.isSuccess
self.resultMessage = block.resultMessage
return

76
AppCode/ContractorInfo.py Normal file
View File

@@ -0,0 +1,76 @@
import mysql.connector
from mysql.connector import Error
import config
import openpyxl
import os
import re
import ast
from datetime import datetime
class ContractorInfo:
ID = ""
contInfo = None
def __init__(self, id):
self.ID = id
print(id)
self.fetchData()
def fetchData(self):
try:
connection = config.get_db_connection()
cursor = connection.cursor(dictionary=True, buffered=True)
print("here", flush=True)
cursor.callproc('GetContractorInfoById', [self.contInfo])
#self.contInfo = next(cursor.stored_results()).fetchone()
self.contInfo = cursor.fetchone()
print(self.contInfo,flush=True)
finally:
cursor.close()
connection.close()
def fetchalldata(self):
try:
connection = config.get_db_connection()
cursor = connection.cursor(dictionary=True, buffered=True)
print("here", flush=True)
# ---------------- Hold Types ----------------
cursor.execute("""
SELECT DISTINCT ht.hold_type_id, ht.hold_type
FROM invoice_subcontractor_hold_join h
JOIN hold_types ht ON h.hold_type_id = ht.hold_type_id
WHERE h.Contractor_Id = %s
""", (self.contractor_id,))
hold_types = cursor.fetchall()
hold_type_map = {ht['hold_type_id']: ht['hold_type'] for ht in hold_types}
# ---------------- Invoices ----------------
cursor.execute("""
SELECT i.*, v.Village_Name
FROM assign_subcontractors asg
INNER JOIN invoice i ON i.PMC_No = asg.PMC_No AND i.Village_Id = asg.Village_Id
LEFT JOIN villages v ON i.Village_Id = v.Village_Id
WHERE asg.Contractor_Id = %s
ORDER BY i.PMC_No, i.Invoice_No
""", (self.contractor_id,))
invoices = cursor.fetchall()
# Remove duplicate invoices
invoice_ids_seen = set()
unique_invoices = []
for inv in invoices:
if inv["Invoice_Id"] not in invoice_ids_seen:
invoice_ids_seen.add(inv["Invoice_Id"])
unique_invoices.append(inv)
invoices = unique_invoices
finally:
cursor.close()
connection.close()

79
AppCode/District.py Normal file
View File

@@ -0,0 +1,79 @@
from flask import Flask, render_template, request, redirect, url_for, send_from_directory, flash, jsonify, json
from flask import current_app
from datetime import datetime
from flask_login import LoginManager, UserMixin, login_user, logout_user, login_required, current_user
from AppCode.Utilities import RegEx, ResponseHandler, HtmlHelper, ItemCRUDType
from AppCode.Log import LogData, LogHelper
import os
import config
import re
import mysql.connector
from mysql.connector import Error
from AppCode.ItemCRUD import ItemCRUD
class District:
isSuccess = False
resultMessage = ""
def __init__(self):
self.isSuccess = False
self.resultMessage = ""
def EditDistrict(self, request, district_id):
district = ItemCRUD(itemType=ItemCRUDType.District)
district_name = request.form['district_Name'].strip()
state_id = request.form['state_Id']
district.EditItem(request=request, childid=district_id, parentid=state_id, childname=district_name, storedprocadd="UpdateBlockById" )
self.isSuccess = district.isSuccess
self.resultMessage = district.resultMessage
return
def AddDistrict(self, request):
district = ItemCRUD(ItemCRUDType.District)
district_name = request.form['district_Name'].strip()
state_id = request.form['state_Id']
district.AddItem(request=request, parentid=state_id, childname=district_name, storedprocfetch="GetDistrictByNameAndState", storedprocadd="SaveDistrict" )
self.isSuccess = district.isSuccess
self.resultMessage = district.resultMessage
return
def GetAllDistricts(self, request):
district = ItemCRUD(itemType=ItemCRUDType.District)
districtsdata = district.GetAllData(request=request, storedproc="GetAllDistricts")
self.isSuccess = district.isSuccess
self.resultMessage = district.resultMessage
return districtsdata
def CheckDistrict(self, request):
district = ItemCRUD(itemType=ItemCRUDType.District)
district_name = request.json.get('district_Name', '').strip()
state_id = request.json.get('state_Id', '')
result = district.CheckItem(request=request, parentid=state_id, childname=district_name, storedprocfetch="GetDistrictByNameAndState")
self.isSuccess = district.isSuccess
self.resultMessage = district.resultMessage
return result
def GetDistrictByID(self, request, id):
district = ItemCRUD(itemType=ItemCRUDType.Village)
districtdata = district.GetAllData("GetDistrictDataByID")
self.isSuccess = district.isSuccess
self.resultMessage = district.resultMessage
return districtdata

248
AppCode/ItemCRUD.py Normal file
View File

@@ -0,0 +1,248 @@
from flask_login import LoginManager, UserMixin, login_user, logout_user, login_required, current_user
from AppCode.Utilities import RegEx, ResponseHandler, HtmlHelper, ItemCRUDType
from AppCode.Log import LogData, LogHelper
import os
import config
import re
import mysql.connector
from mysql.connector import Error
class itemCRUDMapping:
name = ""
def __init__(self, itemType):
if itemType is ItemCRUDType.Village:
self.name = "Village"
elif itemType is ItemCRUDType.Block:
self.name = "Block"
elif itemType is ItemCRUDType.State:
self.name = "State"
class ItemCRUD:
isSuccess = False
resultMessage = ""
itemCRUDType = ItemCRUDType.Village
itemCRUDMapping = ItemCRUDType(itemCRUDType)
#itemCRUDMapping itemCRUDMapping
def __init__(self, itemType):
self.isSuccess = False
self.resultMessage = ""
self.itemCRUDType = itemType
self.itemCRUDMapping = ItemCRUDType(self.itemCRUDType)
def DeleteItem(self, request, itemID, storedprocDelete):
self.isSuccess = False
self.resultMessage = ""
connection = config.get_db_connection()
cursor = connection.cursor()
LogHelper.log_action("Delete Village", f"User {current_user.id} deleted village '{itemID}'")
try:
cursor.callproc(storedprocDelete, (itemID,))
connection.commit()
self.resultMessage = ResponseHandler.delete_success(self.itemCRUDMapping.name) # Simple message, route will handle redirect
self.isSuccess = True
except mysql.connector.Error as e:
print(f"Error deleting village: {e}")
self.isSuccess = False
self.resultMessage = HtmlHelper.json_response(ResponseHandler.delete_failure(self.itemCRUDMapping.name), 500)
finally:
cursor.close()
connection.close()
#return self.resultMessage
def AddItem(self, request, parentid, childname, storedprocfetch, storedprocadd):
connection = config.get_db_connection()
if not connection:
self.isSuccess = False
self.resultMessage = HtmlHelper.json_response(ResponseHandler.db_connection_failure(), 500)
return
cursor = connection.cursor()
LogHelper.log_action(f"Add '{self.itemCRUDMapping.name}'",
f"User {current_user.id} adding '{self.itemCRUDMapping.name}' '{childname}' to block '{parentid}'")
if not parentid:
self.isSuccess = False
self.resultMessage = HtmlHelper.json_response(ResponseHandler.add_failure("block"),
400) # Assuming this is a valid response
return
if not re.match(RegEx.patternAlphabetOnly, childname):
self.isSuccess = False
self.resultMessage = HtmlHelper.json_response(ResponseHandler.invalid_name(self.itemCRUDMapping.name), 400)
return
try:
# Check if the village already exists in the block
cursor.callproc(storedprocfetch, (childname, parentid,))
for rs in cursor.stored_results():
existing_item = rs.fetchone()
if existing_item:
print("Existing ", self.itemCRUDMapping.name)
self.isSuccess = False
self.resultMessage = HtmlHelper.json_response(ResponseHandler.already_exists(self.itemCRUDMapping.name), 409)
return
# Insert new village
cursor.callproc(storedprocadd, (childname, parentid))
connection.commit()
self.isSuccess = True
self.resultMessage = HtmlHelper.json_response(ResponseHandler.add_success(self.itemCRUDMapping.name), 200)
return
except mysql.connector.Error as e:
print(f"Database Error: {e}")
print("DatabaseError")
self.isSuccess = False
self.resultMessage = HtmlHelper.json_response(ResponseHandler.add_failure("village"), 500)
return
finally:
cursor.close()
connection.close()
def EditItem(self, request, childid, parentid, childname, storedprocupdate):
"""Handles the POST logic for updating a district."""
self.isSuccess = False
self.resultMessage = ""
connection = config.get_db_connection()
cursor = connection.cursor()
#district_name = request.form['district_Name'].strip()
#state_id = request.form['state_Id']
LogHelper.log_action("Edit District", f"User {current_user.id} Edited District '{childid}'")
#Need to add validation to see if item exits
# Added validation consistent with your other Edit methods
if not re.match(RegEx.patternAlphabetOnly, childname):
self.isSuccess = False
self.resultMessage = ResponseHandler.invalid_name(self.itemCRUDMapping.name), 400
return self.resultMessage
try:
cursor.callproc(storedprocupdate, (childid, parentid, childname,))
connection.commit()
self.isSuccess = True
self.resultMessage = "Successfully Edited"
except mysql.connector.Error as e:
print(f"Error updating district: {e}")
self.isSuccess = False
self.resultMessage = ResponseHandler.update_failure(self.itemCRUDMapping.name), 500
finally:
cursor.close()
connection.close()
return self.resultMessage
def GetAllData(self, request, storedproc):
data = []
connection = config.get_db_connection()
self.isSuccess = False
self.resultMessage = ""
if not connection:
return []
cursor = connection.cursor()
try:
cursor.callproc(storedproc)
for result in cursor.stored_results():
data = result.fetchall()
self.isSuccess = True
except mysql.connector.Error as e:
print(f"Error fetching villages: {e}")
self.isSuccess = False
self.resultMessage = HtmlHelper.json_response(ResponseHandler.fetch_failure(self.itemCRUDMapping.name), 500)
return []
finally:
cursor.close()
connection.close()
return data
def GetDataByID(self, id, storedproc):
data = None
connection = config.get_db_connection()
cursor = connection.cursor()
try:
cursor.callproc(storedproc, (id,))
for rs in cursor.stored_results():
data = rs.fetchone()
except mysql.connector.Error as e:
print(f"Error fetching block data: {e}")
return None
finally:
cursor.close()
connection.close()
return data
def CheckItem(self, request, parentid, childname, storedprocfetch):
self.isSuccess = False
self.resultMessage = ""
connection = config.get_db_connection()
cursor = connection.cursor()
LogHelper.log_action("Check Block", f"User {current_user.id} Checked block '{childname}'")
if not re.match(RegEx.patternAlphabetOnly, childname):
self.isSuccess = False
self.resultMessage = HtmlHelper.json_response(ResponseHandler.invalid_name(self.itemCRUDMapping.name), 400)
return HtmlHelper.json_response(ResponseHandler.invalid_name(self.itemCRUDMapping.name), 400)
try:
cursor.callproc(storedprocfetch, (childname, parentid))
for rs in cursor.stored_results():
existing_item = rs.fetchone()
if existing_item:
self.isSuccess = False
self.resultMessage = HtmlHelper.json_response(ResponseHandler.already_exists(self.itemCRUDMapping.name), 409)
return HtmlHelper.json_response(ResponseHandler.already_exists(self.itemCRUDMapping.name), 409)
self.isSuccess = True
self.resultMessage = HtmlHelper.json_response(ResponseHandler.is_available(self.itemCRUDMapping.name), 200)
return HtmlHelper.json_response(ResponseHandler.is_available(self.itemCRUDMapping.name), 200)
except mysql.connector.Error as e:
print(f"Error checking block: {e}")
self.isSuccess = False
self.resultMessage = HtmlHelper.json_response(ResponseHandler.fetch_failure(self.itemCRUDMapping.name), 500)
return HtmlHelper.json_response(ResponseHandler.fetch_failure(self.itemCRUDMapping.name), 500)
finally:
cursor.close()
connection.close()

87
AppCode/Log.py Normal file
View File

@@ -0,0 +1,87 @@
from flask import Flask, render_template, request, redirect, url_for, send_from_directory, flash, jsonify, json
from flask import current_app
from datetime import datetime
from flask_login import LoginManager, UserMixin, login_user, logout_user, login_required, current_user
import os
class LogHelper:
@staticmethod
def log_action(action, details=""):
"""Log user actions with timestamp, user, action, and details."""
logData = LogData()
logData.WriteLog(action, details="")
class LogData:
filepath = ""
timestamp = None
def __init__(self):
self.filepath = os.path.join(current_app.root_path, 'activity.log')
self.timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
if hasattr(current_user, "cn") and current_user.cn:
self.user = current_user.cn
elif hasattr(current_user, "username") and current_user.username:
self.user = current_user.username
elif hasattr(current_user, "sAMAccountName") and current_user.sAMAccountName:
self.user = current_user.sAMAccountName
else:
self.user = "Unknown"
def WriteLog(self, action, details=""):
"""Log user actions with timestamp, user, action, and details."""
with open(self.filepath, "a", encoding="utf-8") as f:
f.write(
f"Timestamp: {self.timestamp} | "
f"User: {self.user} | "
f"Action: {action} | "
f"Details: {details}\n"
)
def GetActivitiesLog(self):
logs = []
if os.path.exists(self.filepath):
with open(self.filepath, 'r') as f:
for line in f:
parts = line.strip().split(" | ")
if len(parts) == 4:
logs.append({
"timestamp": parts[0].replace("Timestamp:", "").strip(),
"user": parts[1].replace("User:", "").strip(),
"action": parts[2].replace("Action:", "").strip(),
"details": parts[3].replace("Details:", "").strip()
})
return logs
def GetFilteredActivitiesLog(self, startDate, endDate, userName):
filtered_logs = self.GetActivitiesLog()
# Date filter
if startDate or endDate:
try:
start_dt = datetime.strptime(startDate, "%Y-%m-%d") if startDate else datetime.min
end_dt = datetime.strptime(endDate, "%Y-%m-%d") if endDate else datetime.max
filtered_logs = [
log for log in filtered_logs
if start_dt <= datetime.strptime(log["timestamp"], "%Y-%m-%d %H:%M:%S") <= end_dt
]
except Exception as e:
print("Date filter error:", e)
#Why catching all exceptions? Need to handle specific exceptions
# Username filter
if userName:
filtered_logs = [log for log in filtered_logs if userName.lower() in log["user"].lower()]
return filtered_logs

View File

246
AppCode/State.py Normal file
View File

@@ -0,0 +1,246 @@
from flask import Flask, render_template, request, redirect, url_for, send_from_directory, flash, jsonify, json
from flask import current_app
from datetime import datetime
from flask_login import LoginManager, UserMixin, login_user, logout_user, login_required, current_user
from AppCode.Utilities import RegEx, ResponseHandler, HtmlHelper, ItemCRUDType
from AppCode.Log import LogData, LogHelper
import os
import config
import re
import mysql.connector
from mysql.connector import Error
class State:
isSuccess = False
resultMessage = ""
def __init__(self):
self.isSuccess = False
self.resultMessage = ""
def AddState(self, request):
"""Log user actions with timestamp, user, action, and details."""
statedata = []
connection = config.get_db_connection()
if connection:
cursor = connection.cursor()
state_name = request.form['state_Name'].strip()
LogHelper.log_action("Add State", f"User {current_user.id} added state '{state_name}'")
if not re.match(RegEx.patternAlphabetOnly, state_name):
self.isSuccess = False
self.resultMessage = HtmlHelper.json_response(ResponseHandler.invalid_name("state"), 400)
return
try:
cursor.callproc("CheckStateExists", (state_name,))
for data in cursor.stored_results():
existing_state = data.fetchone()
if existing_state:
self.isSuccess = False
self.resultMessage = HtmlHelper.json_response(ResponseHandler.already_exists("state"), 409)
return
# cursor.execute("call SaveState (%s)", (state_name,))
cursor.callproc("SaveState", (state_name,))
connection.commit()
self.isSuccess = True
self.resultMessage = HtmlHelper.json_response(ResponseHandler.add_success("state"), 200)
return
except mysql.connector.Error as e:
print(f"Error inserting state: {e}")
self.isSuccess = False
self.resultMessage = HtmlHelper.json_response(ResponseHandler.add_failure("state"), 500)
return
#Need to make this seperate
def GetAllStates(self, request):
"""Log user actions with timestamp, user, action, and details."""
statedata = []
connection = config.get_db_connection()
self.isSuccess = False
self.resultMessage = ""
if not connection:
return []
cursor = connection.cursor()
try:
cursor.callproc("GetAllStates")
for res in cursor.stored_results():
statedata = res.fetchall()
self.isSuccess = True
except mysql.connector.Error as e:
print(f"Error fetching states: {e}")
self.isSuccess = False
self.resultMessage = HtmlHelper.json_response(ResponseHandler.fetch_failure("state"), 500)
return []
finally:
cursor.close()
connection.close()
return statedata
def CheckState(self, request):
self.isSuccess = False
self.resultMessage = ""
connection = config.get_db_connection()
#connection closing needs to be verified
if connection:
cursor = connection.cursor()
state_name = request.json.get('state_Name', '').strip()
LogHelper.log_action("Check State", f"User {current_user.id} Checked state '{state_name}'")
if not re.match(RegEx.patternAlphabetOnly, state_name):
self.isSuccess = False
self.resultMessage = HtmlHelper.json_response(ResponseHandler.invalid_name("state"), 400)
return HtmlHelper.json_response(ResponseHandler.invalid_name("state"), 400)
try:
# cursor.execute("SELECT * FROM states WHERE State_Name = %s", (state_name,))
# existing_state = cursor.fetchone()
cursor.callproc("CheckStateExists", (state_name,))
for data in cursor.stored_results():
existing_state = data.fetchone()
if existing_state:
self.isSuccess = False
self.resultMessage = HtmlHelper.json_response(ResponseHandler.already_exists("state"), 409)
return HtmlHelper.json_response(ResponseHandler.already_exists("state"), 409)
else:
self.isSuccess = True
self.resultMessage = HtmlHelper.json_response(ResponseHandler.is_available("state"), 200)
return HtmlHelper.json_response(ResponseHandler.is_available("state"), 200)
except mysql.connector.Error as e:
self.isSuccess = False
self.resultMessage = HtmlHelper.json_response(ResponseHandler.add_failure("state"), 500)
print(f"Error checking state: {e}")
return HtmlHelper.json_response(ResponseHandler.add_failure("state"), 500)
finally:
cursor.close()
connection.close()
def DeleteState(self, request, id):
self.isSuccess = False
self.resultMessage = ""
connection = config.get_db_connection()
cursor = connection.cursor()
LogHelper.log_action("Delete State", f"User {current_user.id} Deleted state '{id}'")
try:
cursor.callproc('DeleteState', (id,))
connection.commit()
self.resultMessage = "Successfully Deleted"
self.isSuccess = True
except mysql.connector.Error as e:
print(f"Error deleting data: {e}")
self.isSuccess = False
self.resultMessage = HtmlHelper.json_response(ResponseHandler.delete_failure("state"), 500)
return HtmlHelper.json_response(ResponseHandler.delete_failure("state"), 500)
finally:
cursor.close()
connection.close()
return self.resultMessage
def EditState(self, request, id):
self.isSuccess = False
self.resultMessage = ""
connection = config.get_db_connection()
cursor = connection.cursor()
# str_pattern_reg = r"^[A-Za-z\s]+$"
state_name = request.form['state_Name'].strip()
LogHelper.log_action("Edit State", f"User {current_user.id} Edited state '{state_name}'")
if not re.match(RegEx.patternAlphabetOnly, state_name):
self.isSuccess = False
self.resultMessage = ResponseHandler.invalid_name("state"), 400
return ResponseHandler.invalid_name("state"), 400
try:
# cursor.execute("UPDATE states SET State_Name = %s WHERE State_ID = %s", (state_name, id))
cursor.callproc("UpdateStateById", (id, state_name))
connection.commit()
self.isSuccess = True
self.resultMessage = "Successfully Edited"
return redirect(url_for('add_state'))
except mysql.connector.Error as e:
print(f"Error updating data: {e}")
self.isSuccess = True
self.resultMessage = ResponseHandler.add_failure("state"), 500
return ResponseHandler.add_failure("state"), 500
finally:
cursor.close()
connection.close()
def GetStateByID(self, request, id):
"""Log user actions with timestamp, user, action, and details."""
statedata = []
self.isSuccess = False
self.resultMessage = ""
connection = config.get_db_connection()
if not connection:
return []
cursor = connection.cursor()
try:
cursor.callproc("GetStateByID", (id,))
for res in cursor.stored_results():
statedata = res.fetchone()
if statedata:
self.isSuccess = True
self.resultMessage = "Success in Fetching"
else:
self.isSuccess = False
self.resultMessage = "State Not Found"
except mysql.connector.Error as e:
print(f"Error fetching states: {e}")
self.isSuccess = False
self.resultMessage = HtmlHelper.json_response(ResponseHandler.fetch_failure("state"), 500)
return []
finally:
cursor.close()
connection.close()
return statedata

64
AppCode/Utilities.py Normal file
View File

@@ -0,0 +1,64 @@
from flask import flash, jsonify, json
from enum import Enum
class ItemCRUDType(Enum):
Village = 1
Block = 2
District = 3
State = 4
class RegEx:
patternAlphabetOnly = "^[A-Za-z ]+$"
class ResponseHandler:
@staticmethod
def invalid_name(entity):
return {'status': 'error', 'message': f'Invalid {entity} name. Only letters are allowed!'}
@staticmethod
def already_exists(entity):
return {'status': 'exists', 'message': f'{entity.capitalize()} already exists!'}
@staticmethod
def add_success(entity):
return {'status': 'success', 'message': f'{entity.capitalize()} added successfully!'}
@staticmethod
def add_failure(entity):
return {'status': 'error', 'message': f'Failed to add {entity}.'}
@staticmethod
def is_available(entity):
return {'status': 'available', 'message': f'{entity.capitalize()} name is available!'}
@staticmethod
def delete_success(entity):
return {'status': 'success', 'message': f'{entity.capitalize()} deleted successfully!'}
@staticmethod
def delete_failure(entity):
return {'status': 'error', 'message': f'Failed to delete {entity}.'}
@staticmethod
def update_success(entity):
return {'status': 'success', 'message': f'{entity.capitalize()} updated successfully!'}
@staticmethod
def update_failure(entity):
return {'status': 'error', 'message': f'Failed to update {entity}.'}
@staticmethod
def fetch_failure(entity):
return {'status': 'error', 'message': f'Failed to fetch {entity}.'}
class HtmlHelper:
# Helper: JSON Response Formatter
@staticmethod
def json_response(message_obj, status_code):
return jsonify(message_obj), status_code
#May need to refactor further

110
AppCode/Village.py Normal file
View File

@@ -0,0 +1,110 @@
from flask_login import LoginManager, UserMixin, login_user, logout_user, login_required, current_user
from AppCode.Utilities import RegEx, ResponseHandler, HtmlHelper, ItemCRUDType
from AppCode.Log import LogData, LogHelper
import config
import mysql.connector
from mysql.connector import Error
from AppCode.ItemCRUD import ItemCRUD, itemCRUDMapping
class Village:
isSuccess = False
resultMessage = ""
def __init__(self):
self.isSuccess = False
self.resultMessage = ""
def AddVillage(self, request):
village = ItemCRUD(itemType=ItemCRUDType.Village)
block_id = request.form.get('block_Id')
village_name = request.form.get('Village_Name', '').strip()
village.AddItem(request=request, parentid=block_id, childname=village_name, storedprocfetch="GetVillageByNameAndBlock", storedprocadd="SaveVillage" )
self.isSuccess = village.isSuccess
self.resultMessage = village.resultMessage
return
#self.isSuccess = False
def GetAllVillages(self, request):
village = ItemCRUD(itemType=ItemCRUDType.Village)
villagesdata = village.GetAllData(request=request, storedproc="GetAllVillages")
self.isSuccess = village.isSuccess
self.resultMessage = village.resultMessage
return villagesdata
def CheckVillage(self, request):
village = ItemCRUD(itemType=ItemCRUDType.Village)
block_id = request.form.get('block_Id')
village_name = request.form.get('Village_Name', '').strip()
result = village.CheckItem(request=request, parentid=block_id, childname=village_name, storedprocfetch="GetVillageByNameAndBlocks")
self.isSuccess = village.isSuccess
self.resultMessage = village.resultMessage
return result
def DeleteVillage(self, request, village_id):
village = ItemCRUD(itemType=ItemCRUDType.Village)
village.DeleteItem(request=request, itemID=village_id, storedprocDelete="DeleteVillage" )
self.isSuccess = village.isSuccess
self.resultMessage = village.resultMessage
return
def EditVillage(self, request, village_id):
village = ItemCRUD(itemType=ItemCRUDType.Village)
block_id = request.form.get('block_Id')
village_name = request.form.get('Village_Name', '').strip()
village.EditItem(request=request, childid=village_id, parentid=block_id, childname=village_name, storedprocadd="UpdateVillage" )
self.isSuccess = village.isSuccess
self.resultMessage = village.resultMessage
return
def GetVillageByID(self, request, id):
village = ItemCRUD(itemType=ItemCRUDType.Village)
villagedetailsdata = village.GetAllData(request=request, storedproc="GetVillageDetailsById")
self.isSuccess = village.isSuccess
self.resultMessage = village.resultMessage
return villagedetailsdata
def GetAllBlocks(self, request):
blocks = []
self.isSuccess = False
self.resultMessage = ""
connection = config.get_db_connection()
if not connection:
return []
cursor = connection.cursor()
try:
cursor.callproc('GetAllBlocks')
for result in cursor.stored_results():
blocks = result.fetchall()
self.isSuccess = True
except mysql.connector.Error as e:
print(f"Error fetching blocks: {e}")
self.isSuccess = False
self.resultMessage = HtmlHelper.json_response(ResponseHandler.fetch_failure("block"), 500)
finally:
cursor.close()
connection.close()
return blocks

0
AppRoutes/StateRoute.py Normal file
View File

8
Version-1.code-workspace Normal file
View File

@@ -0,0 +1,8 @@
{
"folders": [
{
"path": "."
}
],
"settings": {}
}

File diff suppressed because it is too large Load Diff

View File

@@ -5,7 +5,7 @@ import os
# Get MySQL credentials from environment variables # Get MySQL credentials from environment variables
MYSQL_HOST = os.getenv("MYSQL_HOST", "127.0.0.1") MYSQL_HOST = os.getenv("MYSQL_HOST", "127.0.0.1")
MYSQL_USER = os.getenv("MYSQL_USER", "root") MYSQL_USER = os.getenv("MYSQL_USER", "root")
MYSQL_PASSWORD = os.getenv("MYSQL_PASSWORD", "tiger") MYSQL_PASSWORD = os.getenv("MYSQL_PASSWORD", "root")
MYSQL_DB = os.getenv("MYSQL_DB", "test") MYSQL_DB = os.getenv("MYSQL_DB", "test")
# Connect to MySQL # Connect to MySQL

2820
main.py

File diff suppressed because it is too large Load Diff

View File

@@ -29,6 +29,9 @@
<h2>Display States</h2> <h2>Display States</h2>
<input type="text" id="searchBar" placeholder="Searching..." onkeyup="searchTable()"> <input type="text" id="searchBar" placeholder="Searching..." onkeyup="searchTable()">
</div> </div>
<script>
console.log(statedata)
</script>
<table id="sortableTable" border="1"> <table id="sortableTable" border="1">
<tr> <tr>
<th>State ID</th> <th>State ID</th>

View File

@@ -24,7 +24,7 @@
</a> </a>
</li> </li>
<li class="nav-item"> <li class="nav-item">
<a href="/upload_excel" class="nav-link"> <a href="/upload_excel_file" class="nav-link">
<i class="fas fa-book"></i> Import Excel <i class="fas fa-book"></i> Import Excel
</a> </a>
</li> </li>

View File

@@ -41,7 +41,7 @@
</a> </a>
</li> </li>
<li class="nav-item"> <li class="nav-item">
<a href="/upload_excel" class="nav-link"> <a href="/upload_excel_file" class="nav-link">
<i class="fas fa-book"></i> Import Excel <i class="fas fa-book"></i> Import Excel
</a> </a>
</li> </li>

View File

@@ -11,7 +11,7 @@
<h2>Upload Excel File</h2> <h2>Upload Excel File</h2>
<div class="container"> <div class="container">
<form action="/upload_excel" method="post" enctype="multipart/form-data" <form action="/upload_excel_file" method="post" enctype="multipart/form-data"
onsubmit="return validateFileInput()"> onsubmit="return validateFileInput()">
<input type="file" name="file" accept=".xlsx,.xls"> <input type="file" name="file" accept=".xlsx,.xls">
<br><br> <br><br>