Initial commit

This commit is contained in:
Swapnil9693
2025-11-21 12:10:45 +05:30
parent 45388d0b5e
commit 565af3be64
347 changed files with 8865 additions and 3849 deletions

54
AppCode/Auth.py Normal file
View File

@@ -0,0 +1,54 @@
from flask import Flask, render_template, request, redirect, url_for, send_from_directory, flash, jsonify, json
from flask_login import LoginManager, UserMixin
from logging.handlers import RotatingFileHandler
from ldap3 import Server, Connection, ALL, SUBTREE
from ldap3 import Server, Connection, ALL
from ldap3.core.exceptions import LDAPBindError
class DefaultCredentials:
username = 'admin'
password = 'admin123'
class LoginLDAP:
def __init__(self, request):
self.username = request.form['username'].strip()
self.password = request.form['password']
self.isDefaultCredentials = False
self.isValidLogin = False
self.errorMessage = ""
ldap_user_dn = f"uid={self.username},ou=users,dc=lcepl,dc=org"
ldap_server = 'ldap://localhost:389'
#Need to re-factor further
# Static fallback user
if self.username == DefaultCredentials.username and self.password == DefaultCredentials.password:
self.isDefaultCredentials = True
self.isValidLogin = True
return
try:
# LDAP authentication
conn = Connection(
Server(self.ldap_server, get_info=ALL),
user=self.ldap_user_dn,
password=self.password,
auto_bind=True
)
self.isValidLogin = True
return
except LDAPBindError:
self.errorMessage = "Invalid credentials."
except Exception as e:
self.errorMessage = str(e)
class User(UserMixin):
def __init__(self, id):
self.id = id

208
AppCode/Block.py Normal file
View File

@@ -0,0 +1,208 @@
from flask import Flask, render_template, request, redirect, url_for, send_from_directory, flash, jsonify, json
from flask import current_app
from datetime import datetime
from flask_login import LoginManager, UserMixin, login_user, logout_user, login_required, current_user
from AppCode.Utilities import RegEx, ResponseHandler, HtmlHelper
from AppCode.Log import LogData, LogHelper
import os
import config
import re
import mysql.connector
from mysql.connector import Error
class Block:
isSuccess = False
resultMessage = ""
def __init__(self):
self.isSuccess = False
self.resultMessage = ""
# ----------------------------------------------------------
# Add Block
# ----------------------------------------------------------
def AddBlock(self, request):
connection = config.get_db_connection()
if not connection:
self.isSuccess = False
self.resultMessage = HtmlHelper.json_response(ResponseHandler.db_connection_failure(), 500)
return
cursor = connection.cursor()
block_name = request.form['block_Name'].strip()
district_id = request.form['district_Id']
LogHelper.log_action("Add Block", f"User {current_user.id} Added Block '{block_name}'")
if not re.match(RegEx.patternAlphabetOnly, block_name):
self.isSuccess = False
self.resultMessage = HtmlHelper.json_response(ResponseHandler.invalid_name("block"), 400)
return
try:
cursor.callproc("GetBlockByNameAndDistrict", (block_name, district_id))
for rs in cursor.stored_results():
existing_block = rs.fetchone()
if existing_block:
self.isSuccess = False
self.resultMessage = HtmlHelper.json_response(ResponseHandler.already_exists("block"), 409)
return
cursor.callproc("SaveBlock", (block_name, district_id))
connection.commit()
self.isSuccess = True
self.resultMessage = HtmlHelper.json_response(ResponseHandler.add_success("block"), 200)
return
except mysql.connector.Error as e:
print(f"Error inserting block: {e}")
self.isSuccess = False
self.resultMessage = HtmlHelper.json_response(ResponseHandler.add_failure("block"), 500)
finally:
cursor.close()
connection.close()
# ----------------------------------------------------------
# Get All Blocks
# ----------------------------------------------------------
def GetAllBlocks(self):
blockdata = []
connection = config.get_db_connection()
if not connection:
return []
cursor = connection.cursor()
try:
cursor.callproc("GetAllBlocks")
for rs in cursor.stored_results():
blockdata = rs.fetchall()
self.isSuccess = True
except mysql.connector.Error as e:
print(f"Error fetching blocks: {e}")
self.isSuccess = False
self.resultMessage = HtmlHelper.json_response(ResponseHandler.fetch_failure("blocks"), 500)
return []
finally:
cursor.close()
connection.close()
return blockdata
# ----------------------------------------------------------
# Check Block Exists
# ----------------------------------------------------------
def CheckBlock(self, request):
connection = config.get_db_connection()
cursor = connection.cursor()
block_name = request.json.get('block_Name', '').strip()
district_id = request.json.get('district_Id')
LogHelper.log_action("Check Block", f"User {current_user.id} Checked block '{block_name}'")
if not re.match(RegEx.patternAlphabetOnly, block_name):
return HtmlHelper.json_response(ResponseHandler.invalid_name("block"), 400)
try:
cursor.callproc("GetBlockByNameAndDistrict", (block_name, district_id))
for rs in cursor.stored_results():
existing_block = rs.fetchone()
if existing_block:
return HtmlHelper.json_response(ResponseHandler.already_exists("block"), 409)
return HtmlHelper.json_response(ResponseHandler.is_available("block"), 200)
except mysql.connector.Error as e:
print(f"Error checking block: {e}")
return HtmlHelper.json_response(ResponseHandler.fetch_failure("block"), 500)
finally:
cursor.close()
connection.close()
# ----------------------------------------------------------
# Get Block By ID
# ----------------------------------------------------------
def GetBlockByID(self, id):
blockdata = None
connection = config.get_db_connection()
cursor = connection.cursor()
try:
cursor.callproc("GetBlockDataByID", (id,))
for rs in cursor.stored_results():
blockdata = rs.fetchone()
except mysql.connector.Error as e:
print(f"Error fetching block data: {e}")
return None
finally:
cursor.close()
connection.close()
return blockdata
# ----------------------------------------------------------
# Update Block
# ----------------------------------------------------------
def EditBlock(self, request, id):
connection = config.get_db_connection()
cursor = connection.cursor()
block_name = request.form['block_Name'].strip()
district_id = request.form['district_Id']
LogHelper.log_action("Edit Block", f"User {current_user.id} Edited block '{id}'")
if not re.match(RegEx.patternAlphabetOnly, block_name):
return HtmlHelper.json_response(ResponseHandler.invalid_name("block"), 400)
try:
cursor.callproc("UpdateBlockById", (block_name, district_id, id))
connection.commit()
return "Successfully Updated"
except mysql.connector.Error as e:
print(f"Error updating block: {e}")
return HtmlHelper.json_response(ResponseHandler.update_failure("block"), 500)
finally:
cursor.close()
connection.close()
# ----------------------------------------------------------
# Delete Block
# ----------------------------------------------------------
def DeleteBlock(self, id):
connection = config.get_db_connection()
cursor = connection.cursor()
LogHelper.log_action("Delete Block", f"User {current_user.id} Deleted block '{id}'")
try:
cursor.callproc("DeleteBlock", (id,))
connection.commit()
return "Successfully Deleted"
except mysql.connector.Error as e:
print(f"Error deleting block: {e}")
return HtmlHelper.json_response(ResponseHandler.delete_failure("block"), 500)
finally:
cursor.close()
connection.close()

76
AppCode/ContractorInfo.py Normal file
View File

@@ -0,0 +1,76 @@
import mysql.connector
from mysql.connector import Error
import config
import openpyxl
import os
import re
import ast
from datetime import datetime
class ContractorInfo:
ID = ""
contInfo = None
def __init__(self, id):
self.ID = id
print(id)
self.fetchData()
def fetchData(self):
try:
connection = config.get_db_connection()
cursor = connection.cursor(dictionary=True, buffered=True)
print("here", flush=True)
cursor.callproc('GetContractorInfoById', [self.contInfo])
#self.contInfo = next(cursor.stored_results()).fetchone()
self.contInfo = cursor.fetchone()
print(self.contInfo,flush=True)
finally:
cursor.close()
connection.close()
def fetchalldata(self):
try:
connection = config.get_db_connection()
cursor = connection.cursor(dictionary=True, buffered=True)
print("here", flush=True)
# ---------------- Hold Types ----------------
cursor.execute("""
SELECT DISTINCT ht.hold_type_id, ht.hold_type
FROM invoice_subcontractor_hold_join h
JOIN hold_types ht ON h.hold_type_id = ht.hold_type_id
WHERE h.Contractor_Id = %s
""", (self.contractor_id,))
hold_types = cursor.fetchall()
hold_type_map = {ht['hold_type_id']: ht['hold_type'] for ht in hold_types}
# ---------------- Invoices ----------------
cursor.execute("""
SELECT i.*, v.Village_Name
FROM assign_subcontractors asg
INNER JOIN invoice i ON i.PMC_No = asg.PMC_No AND i.Village_Id = asg.Village_Id
LEFT JOIN villages v ON i.Village_Id = v.Village_Id
WHERE asg.Contractor_Id = %s
ORDER BY i.PMC_No, i.Invoice_No
""", (self.contractor_id,))
invoices = cursor.fetchall()
# Remove duplicate invoices
invoice_ids_seen = set()
unique_invoices = []
for inv in invoices:
if inv["Invoice_Id"] not in invoice_ids_seen:
invoice_ids_seen.add(inv["Invoice_Id"])
unique_invoices.append(inv)
invoices = unique_invoices
finally:
cursor.close()
connection.close()

236
AppCode/District.py Normal file
View File

@@ -0,0 +1,236 @@
from flask import Flask, render_template, request, redirect, url_for, send_from_directory, flash, jsonify, json
from flask import current_app
from datetime import datetime
from flask_login import LoginManager, UserMixin, login_user, logout_user, login_required, current_user
from AppCode.Utilities import RegEx, ResponseHandler, HtmlHelper
from AppCode.Log import LogData, LogHelper
import os
import config
import re
import mysql.connector
from mysql.connector import Error
class District:
isSuccess = False
resultMessage = ""
def __init__(self):
self.isSuccess = False
self.resultMessage = ""
def AddDistrict(self, request):
"""Handles the logic for adding a new district."""
connection = config.get_db_connection()
if not connection:
self.isSuccess = False
self.resultMessage = HtmlHelper.json_response(ResponseHandler.db_connection_failure(), 500)
return
cursor = connection.cursor()
district_name = request.form['district_Name'].strip()
state_id = request.form['state_Id']
LogHelper.log_action("Add District", f"User {current_user.id} Added District '{district_name}'")
if not re.match(RegEx.patternAlphabetOnly, district_name):
self.isSuccess = False
self.resultMessage = HtmlHelper.json_response(ResponseHandler.invalid_name("district"), 400)
return
try:
cursor.callproc("GetDistrictByNameAndState", (district_name, state_id))
for data in cursor.stored_results():
rs = data.fetchone()
if rs:
self.isSuccess = False
self.resultMessage = HtmlHelper.json_response(ResponseHandler.already_exists("district"), 409)
return
cursor.callproc('SaveDistrict', (district_name, state_id))
connection.commit()
self.isSuccess = True
self.resultMessage = HtmlHelper.json_response(ResponseHandler.add_success("district"), 200)
return
except mysql.connector.Error as e:
print(f"Error inserting district: {e}")
self.isSuccess = False
self.resultMessage = HtmlHelper.json_response(ResponseHandler.add_failure("district"), 500)
return
finally:
cursor.close()
connection.close()
def GetAllDistricts(self, request):
"""Fetches all districts with their state names."""
districtdata = []
connection = config.get_db_connection()
self.isSuccess = False
self.resultMessage = ""
if not connection:
return []
cursor = connection.cursor()
try:
cursor.callproc("GetAllDistricts")
for dis in cursor.stored_results():
districtdata = dis.fetchall()
self.isSuccess = True
except mysql.connector.Error as e:
print(f"Error fetching districts: {e}")
self.isSuccess = False
self.resultMessage = HtmlHelper.json_response(ResponseHandler.fetch_failure("districts"), 500)
return []
finally:
cursor.close()
connection.close()
return districtdata
def CheckDistrict(self, request):
"""Checks if a district name already exists within a specific state."""
self.isSuccess = False
self.resultMessage = ""
connection = config.get_db_connection()
if not connection:
return HtmlHelper.json_response(ResponseHandler.db_connection_failure(), 500)
cursor = connection.cursor()
district_name = request.json.get('district_Name', '').strip()
state_id = request.json.get('state_Id', '')
LogHelper.log_action("Check District", f"User {current_user.id} Checked District '{district_name}'")
if not re.match(RegEx.patternAlphabetOnly, district_name):
self.isSuccess = False
self.resultMessage = HtmlHelper.json_response(ResponseHandler.invalid_name("district"), 400)
return self.resultMessage
try:
cursor.callproc("GetDistrictByNameAndState", (district_name, state_id,))
for result in cursor.stored_results():
existing_district = result.fetchone()
if existing_district:
self.isSuccess = False
self.resultMessage = HtmlHelper.json_response(ResponseHandler.already_exists("district"), 409)
else:
self.isSuccess = True
self.resultMessage = HtmlHelper.json_response(ResponseHandler.is_available("district"), 200)
return self.resultMessage
except mysql.connector.Error as e:
print(f"Error checking district: {e}")
self.isSuccess = False
self.resultMessage = HtmlHelper.json_response(ResponseHandler.add_failure("district"), 500)
return self.resultMessage
finally:
cursor.close()
connection.close()
def DeleteDistrict(self, request, id):
"""Deletes a district by its ID."""
self.isSuccess = False
self.resultMessage = ""
connection = config.get_db_connection()
cursor = connection.cursor()
LogHelper.log_action("Delete District", f"User {current_user.id} Deleted District '{id}'")
try:
cursor.callproc("DeleteDistrict", (id,))
connection.commit()
self.resultMessage = "Successfully Deleted"
self.isSuccess = True
except mysql.connector.Error as e:
print(f"Error deleting district: {e}")
self.isSuccess = False
self.resultMessage = HtmlHelper.json_response(ResponseHandler.delete_failure("district"), 500)
finally:
cursor.close()
connection.close()
return self.resultMessage
def EditDistrict(self, request, id):
"""Handles the POST logic for updating a district."""
self.isSuccess = False
self.resultMessage = ""
connection = config.get_db_connection()
cursor = connection.cursor()
district_name = request.form['district_Name'].strip()
state_id = request.form['state_Id']
LogHelper.log_action("Edit District", f"User {current_user.id} Edited District '{id}'")
# Added validation consistent with your other Edit methods
if not re.match(RegEx.patternAlphabetOnly, district_name):
self.isSuccess = False
self.resultMessage = ResponseHandler.invalid_name("district"), 400
return self.resultMessage
try:
cursor.callproc("UpdateDistrict", (id, state_id, district_name,))
connection.commit()
self.isSuccess = True
self.resultMessage = "Successfully Edited"
except mysql.connector.Error as e:
print(f"Error updating district: {e}")
self.isSuccess = False
self.resultMessage = ResponseHandler.update_failure("district"), 500
finally:
cursor.close()
connection.close()
return self.resultMessage
def GetDistrictByID(self, request, id):
"""Fetches a single district's details by its ID."""
districtdata = None
self.isSuccess = False
self.resultMessage = ""
connection = config.get_db_connection()
if not connection:
return None
cursor = connection.cursor()
try:
cursor.callproc("GetDistrictDataByID", (id,))
for rs in cursor.stored_results():
districtdata = rs.fetchone()
if districtdata:
self.isSuccess = True
self.resultMessage = "Success in Fetching"
else:
self.isSuccess = False
self.resultMessage = "District Not Found"
except mysql.connector.Error as e:
print(f"Error fetching district data: {e}")
self.isSuccess = False
self.resultMessage = HtmlHelper.json_response(ResponseHandler.fetch_failure("district"), 500)
finally:
cursor.close()
connection.close()
return districtdata

87
AppCode/Log.py Normal file
View File

@@ -0,0 +1,87 @@
from flask import Flask, render_template, request, redirect, url_for, send_from_directory, flash, jsonify, json
from flask import current_app
from datetime import datetime
from flask_login import LoginManager, UserMixin, login_user, logout_user, login_required, current_user
import os
class LogHelper:
@staticmethod
def log_action(action, details=""):
"""Log user actions with timestamp, user, action, and details."""
logData = LogData()
logData.WriteLog(action, details="")
class LogData:
filepath = ""
timestamp = None
def __init__(self):
self.filepath = os.path.join(current_app.root_path, 'activity.log')
self.timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
if hasattr(current_user, "cn") and current_user.cn:
self.user = current_user.cn
elif hasattr(current_user, "username") and current_user.username:
self.user = current_user.username
elif hasattr(current_user, "sAMAccountName") and current_user.sAMAccountName:
self.user = current_user.sAMAccountName
else:
self.user = "Unknown"
def WriteLog(self, action, details=""):
"""Log user actions with timestamp, user, action, and details."""
with open(self.filepath, "a", encoding="utf-8") as f:
f.write(
f"Timestamp: {self.timestamp} | "
f"User: {self.user} | "
f"Action: {action} | "
f"Details: {details}\n"
)
def GetActivitiesLog(self):
logs = []
if os.path.exists(self.filepath):
with open(self.filepath, 'r') as f:
for line in f:
parts = line.strip().split(" | ")
if len(parts) == 4:
logs.append({
"timestamp": parts[0].replace("Timestamp:", "").strip(),
"user": parts[1].replace("User:", "").strip(),
"action": parts[2].replace("Action:", "").strip(),
"details": parts[3].replace("Details:", "").strip()
})
return logs
def GetFilteredActivitiesLog(self, startDate, endDate, userName):
filtered_logs = self.GetActivitiesLog()
# Date filter
if startDate or endDate:
try:
start_dt = datetime.strptime(startDate, "%Y-%m-%d") if startDate else datetime.min
end_dt = datetime.strptime(endDate, "%Y-%m-%d") if endDate else datetime.max
filtered_logs = [
log for log in filtered_logs
if start_dt <= datetime.strptime(log["timestamp"], "%Y-%m-%d %H:%M:%S") <= end_dt
]
except Exception as e:
print("Date filter error:", e)
#Why catching all exceptions? Need to handle specific exceptions
# Username filter
if userName:
filtered_logs = [log for log in filtered_logs if userName.lower() in log["user"].lower()]
return filtered_logs

View File

246
AppCode/State.py Normal file
View File

@@ -0,0 +1,246 @@
from flask import Flask, render_template, request, redirect, url_for, send_from_directory, flash, jsonify, json
from flask import current_app
from datetime import datetime
from flask_login import LoginManager, UserMixin, login_user, logout_user, login_required, current_user
from AppCode.Utilities import RegEx, ResponseHandler, HtmlHelper
from AppCode.Log import LogData, LogHelper
import os
import config
import re
import mysql.connector
from mysql.connector import Error
class State:
isSuccess = False
resultMessage = ""
def __init__(self):
self.isSuccess = False
self.resultMessage = ""
def AddState(self, request):
"""Log user actions with timestamp, user, action, and details."""
statedata = []
connection = config.get_db_connection()
if connection:
cursor = connection.cursor()
state_name = request.form['state_Name'].strip()
LogHelper.log_action("Add State", f"User {current_user.id} added state '{state_name}'")
if not re.match(RegEx.patternAlphabetOnly, state_name):
self.isSuccess = False
self.resultMessage = HtmlHelper.json_response(ResponseHandler.invalid_name("state"), 400)
return
try:
cursor.callproc("CheckStateExists", (state_name,))
for data in cursor.stored_results():
existing_state = data.fetchone()
if existing_state:
self.isSuccess = False
self.resultMessage = HtmlHelper.json_response(ResponseHandler.already_exists("state"), 409)
return
# cursor.execute("call SaveState (%s)", (state_name,))
cursor.callproc("SaveState", (state_name,))
connection.commit()
self.isSuccess = True
self.resultMessage = HtmlHelper.json_response(ResponseHandler.add_success("state"), 200)
return
except mysql.connector.Error as e:
print(f"Error inserting state: {e}")
self.isSuccess = False
self.resultMessage = HtmlHelper.json_response(ResponseHandler.add_failure("state"), 500)
return
#Need to make this seperate
def GetAllStates(self, request):
"""Log user actions with timestamp, user, action, and details."""
statedata = []
connection = config.get_db_connection()
self.isSuccess = False
self.resultMessage = ""
if not connection:
return []
cursor = connection.cursor()
try:
cursor.callproc("GetAllStates")
for res in cursor.stored_results():
statedata = res.fetchall()
self.isSuccess = True
except mysql.connector.Error as e:
print(f"Error fetching states: {e}")
self.isSuccess = False
self.resultMessage = HtmlHelper.json_response(ResponseHandler.fetch_failure("state"), 500)
return []
finally:
cursor.close()
connection.close()
return statedata
def CheckState(self, request):
self.isSuccess = False
self.resultMessage = ""
connection = config.get_db_connection()
#connection closing needs to be verified
if connection:
cursor = connection.cursor()
state_name = request.json.get('state_Name', '').strip()
LogHelper.log_action("Check State", f"User {current_user.id} Checked state '{state_name}'")
if not re.match(RegEx.patternAlphabetOnly, state_name):
self.isSuccess = False
self.resultMessage = HtmlHelper.json_response(ResponseHandler.invalid_name("state"), 400)
return HtmlHelper.json_response(ResponseHandler.invalid_name("state"), 400)
try:
# cursor.execute("SELECT * FROM states WHERE State_Name = %s", (state_name,))
# existing_state = cursor.fetchone()
cursor.callproc("CheckStateExists", (state_name,))
for data in cursor.stored_results():
existing_state = data.fetchone()
if existing_state:
self.isSuccess = False
self.resultMessage = HtmlHelper.json_response(ResponseHandler.already_exists("state"), 409)
return HtmlHelper.json_response(ResponseHandler.already_exists("state"), 409)
else:
self.isSuccess = True
self.resultMessage = HtmlHelper.json_response(ResponseHandler.is_available("state"), 200)
return HtmlHelper.json_response(ResponseHandler.is_available("state"), 200)
except mysql.connector.Error as e:
self.isSuccess = False
self.resultMessage = HtmlHelper.json_response(ResponseHandler.add_failure("state"), 500)
print(f"Error checking state: {e}")
return HtmlHelper.json_response(ResponseHandler.add_failure("state"), 500)
finally:
cursor.close()
connection.close()
def DeleteState(self, request, id):
self.isSuccess = False
self.resultMessage = ""
connection = config.get_db_connection()
cursor = connection.cursor()
LogHelper.log_action("Delete State", f"User {current_user.id} Deleted state '{id}'")
try:
cursor.callproc('DeleteState', (id,))
connection.commit()
self.resultMessage = "Successfully Deleted"
self.isSuccess = True
except mysql.connector.Error as e:
print(f"Error deleting data: {e}")
self.isSuccess = False
self.resultMessage = HtmlHelper.json_response(ResponseHandler.delete_failure("state"), 500)
return HtmlHelper.json_response(ResponseHandler.delete_failure("state"), 500)
finally:
cursor.close()
connection.close()
return self.resultMessage
def EditState(self, request, id):
self.isSuccess = False
self.resultMessage = ""
connection = config.get_db_connection()
cursor = connection.cursor()
# str_pattern_reg = r"^[A-Za-z\s]+$"
state_name = request.form['state_Name'].strip()
LogHelper.log_action("Edit State", f"User {current_user.id} Edited state '{state_name}'")
if not re.match(RegEx.patternAlphabetOnly, state_name):
self.isSuccess = False
self.resultMessage = ResponseHandler.invalid_name("state"), 400
return ResponseHandler.invalid_name("state"), 400
try:
# cursor.execute("UPDATE states SET State_Name = %s WHERE State_ID = %s", (state_name, id))
cursor.callproc("UpdateStateById", (id, state_name))
connection.commit()
self.isSuccess = True
self.resultMessage = "Successfully Edited"
return redirect(url_for('add_state'))
except mysql.connector.Error as e:
print(f"Error updating data: {e}")
self.isSuccess = True
self.resultMessage = ResponseHandler.add_failure("state"), 500
return ResponseHandler.add_failure("state"), 500
finally:
cursor.close()
connection.close()
def GetStateByID(self, request, id):
"""Log user actions with timestamp, user, action, and details."""
statedata = []
self.isSuccess = False
self.resultMessage = ""
connection = config.get_db_connection()
if not connection:
return []
cursor = connection.cursor()
try:
cursor.callproc("GetStateByID", (id,))
for res in cursor.stored_results():
statedata = res.fetchone()
if statedata:
self.isSuccess = True
self.resultMessage = "Success in Fetching"
else:
self.isSuccess = False
self.resultMessage = "State Not Found"
except mysql.connector.Error as e:
print(f"Error fetching states: {e}")
self.isSuccess = False
self.resultMessage = HtmlHelper.json_response(ResponseHandler.fetch_failure("state"), 500)
return []
finally:
cursor.close()
connection.close()
return statedata

57
AppCode/Utilities.py Normal file
View File

@@ -0,0 +1,57 @@
from flask import flash, jsonify, json
class RegEx:
patternAlphabetOnly = "^[A-Za-z ]+$"
class ResponseHandler:
@staticmethod
def invalid_name(entity):
return {'status': 'error', 'message': f'Invalid {entity} name. Only letters are allowed!'}
@staticmethod
def already_exists(entity):
return {'status': 'exists', 'message': f'{entity.capitalize()} already exists!'}
@staticmethod
def add_success(entity):
return {'status': 'success', 'message': f'{entity.capitalize()} added successfully!'}
@staticmethod
def add_failure(entity):
return {'status': 'error', 'message': f'Failed to add {entity}.'}
@staticmethod
def is_available(entity):
return {'status': 'available', 'message': f'{entity.capitalize()} name is available!'}
@staticmethod
def delete_success(entity):
return {'status': 'success', 'message': f'{entity.capitalize()} deleted successfully!'}
@staticmethod
def delete_failure(entity):
return {'status': 'error', 'message': f'Failed to delete {entity}.'}
@staticmethod
def update_success(entity):
return {'status': 'success', 'message': f'{entity.capitalize()} updated successfully!'}
@staticmethod
def update_failure(entity):
return {'status': 'error', 'message': f'Failed to update {entity}.'}
@staticmethod
def fetch_failure(entity):
return {'status': 'error', 'message': f'Failed to fetch {entity}.'}
class HtmlHelper:
# Helper: JSON Response Formatter
@staticmethod
def json_response(message_obj, status_code):
return jsonify(message_obj), status_code
#May need to refactor further

274
AppCode/Village.py Normal file
View File

@@ -0,0 +1,274 @@
from flask_login import LoginManager, UserMixin, login_user, logout_user, login_required, current_user
from AppCode.Utilities import RegEx, ResponseHandler, HtmlHelper
from AppCode.Log import LogData, LogHelper
import os
import config
import re
import mysql.connector
from mysql.connector import Error
class Village:
isSuccess = False
resultMessage = ""
def __init__(self):
self.isSuccess = False
self.resultMessage = ""
def AddVillage(self, request):
connection = config.get_db_connection()
if not connection:
self.isSuccess = False
self.resultMessage = HtmlHelper.json_response(ResponseHandler.db_connection_failure(), 500)
return
cursor = connection.cursor()
block_id = request.form.get('block_Id')
village_name = request.form.get('Village_Name', '').strip()
LogHelper.log_action("Add Village",
f"User {current_user.id} adding village '{village_name}' to block '{block_id}'")
if not block_id:
self.isSuccess = False
self.resultMessage = HtmlHelper.json_response(ResponseHandler.add_failure("block"),
400) # Assuming this is a valid response
return
if not re.match(RegEx.patternAlphabetOnly, village_name):
self.isSuccess = False
self.resultMessage = HtmlHelper.json_response(ResponseHandler.invalid_name("village"), 400)
return
try:
# Check if the village already exists in the block
cursor.callproc("GetVillageByNameAndBlock", (village_name, block_id,))
for rs in cursor.stored_results():
existing_village = rs.fetchone()
if existing_village:
self.isSuccess = False
self.resultMessage = HtmlHelper.json_response(ResponseHandler.already_exists("village"), 409)
return
# Insert new village
cursor.callproc('SaveVillage', (village_name, block_id))
connection.commit()
self.isSuccess = True
self.resultMessage = HtmlHelper.json_response(ResponseHandler.add_success("village"), 200)
return
except mysql.connector.Error as e:
print(f"Database Error: {e}")
self.isSuccess = False
self.resultMessage = HtmlHelper.json_response(ResponseHandler.add_failure("village"), 500)
return
finally:
cursor.close()
connection.close()
def GetAllVillages(self, request):
villages = []
connection = config.get_db_connection()
self.isSuccess = False
self.resultMessage = ""
if not connection:
return []
cursor = connection.cursor()
try:
cursor.callproc("GetAllVillages")
for result in cursor.stored_results():
villages = result.fetchall()
self.isSuccess = True
except mysql.connector.Error as e:
print(f"Error fetching villages: {e}")
self.isSuccess = False
self.resultMessage = HtmlHelper.json_response(ResponseHandler.fetch_failure("village"), 500)
return []
finally:
cursor.close()
connection.close()
return villages
def CheckVillage(self, request):
self.isSuccess = False
self.resultMessage = ""
connection = config.get_db_connection()
if not connection:
return HtmlHelper.json_response(ResponseHandler.db_connection_failure(), 500)
cursor = connection.cursor()
block_id = request.form.get('block_Id')
village_name = request.form.get('Village_Name', '').strip()
LogHelper.log_action("Check Village",
f"User {current_user.id} checked village '{village_name}' in block '{block_id}'")
if not re.match(RegEx.patternAlphabetOnly, village_name):
self.isSuccess = False
self.resultMessage = HtmlHelper.json_response(ResponseHandler.invalid_name("village"), 400)
return self.resultMessage
if not block_id or not village_name:
self.isSuccess = False
self.resultMessage = HtmlHelper.json_response(
{'status': 'error', 'message': 'Block and Village Name are required!'}, 400)
return self.resultMessage
try:
cursor.callproc("GetVillageByNameAndBlocks", (village_name, block_id))
for rs in cursor.stored_results():
existing_village = rs.fetchone()
if existing_village:
self.isSuccess = False
self.resultMessage = HtmlHelper.json_response(ResponseHandler.already_exists("village"), 409)
else:
self.isSuccess = True
self.resultMessage = HtmlHelper.json_response(ResponseHandler.is_available("village"), 200)
return self.resultMessage
except mysql.connector.Error as e:
print(f"Error checking village: {e}")
self.isSuccess = False
self.resultMessage = HtmlHelper.json_response(ResponseHandler.add_failure("village"), 500)
return self.resultMessage
finally:
cursor.close()
connection.close()
def DeleteVillage(self, request, village_id):
self.isSuccess = False
self.resultMessage = ""
connection = config.get_db_connection()
cursor = connection.cursor()
LogHelper.log_action("Delete Village", f"User {current_user.id} deleted village '{village_id}'")
try:
cursor.callproc("DeleteVillage", (village_id,))
connection.commit()
self.resultMessage = ResponseHandler.delete_success("village") # Simple message, route will handle redirect
self.isSuccess = True
except mysql.connector.Error as e:
print(f"Error deleting village: {e}")
self.isSuccess = False
self.resultMessage = HtmlHelper.json_response(ResponseHandler.delete_failure("village"), 500)
finally:
cursor.close()
connection.close()
return self.resultMessage
def EditVillage(self, request, village_id):
self.isSuccess = False
self.resultMessage = ""
connection = config.get_db_connection()
cursor = connection.cursor()
village_name = request.form['Village_Name'].strip()
block_id = request.form['block_Id']
LogHelper.log_action("Edit Village",
f"User {current_user.id} edited village '{village_id}' to name '{village_name}'")
if not re.match(RegEx.patternAlphabetOnly, village_name):
self.isSuccess = False
self.resultMessage = HtmlHelper.json_response(ResponseHandler.invalid_name("village"), 400)
return self.resultMessage
try:
# Using the stored procedure pattern from your State class
cursor.callproc("UpdateVillage", (village_id, block_id, village_name,))
connection.commit()
self.isSuccess = True
self.resultMessage = ResponseHandler.update_success("village")
except mysql.connector.Error as e:
print(f"Error updating data: {e}")
self.isSuccess = False
self.resultMessage = HtmlHelper.json_response(ResponseHandler.update_failure("village"), 500)
finally:
cursor.close()
connection.close()
return self.resultMessage
def GetVillageByID(self, request, id):
village_data = None
self.isSuccess = False
self.resultMessage = ""
connection = config.get_db_connection()
if not connection:
return None
cursor = connection.cursor()
try:
cursor.callproc("GetVillageDetailsById", (id,))
for rs in cursor.stored_results():
village_data = rs.fetchone()
if village_data:
self.isSuccess = True
self.resultMessage = "Success in Fetching"
else:
self.isSuccess = False
self.resultMessage = "Village Not Found"
except mysql.connector.Error as e:
print(f"Error fetching village: {e}")
self.isSuccess = False
self.resultMessage = HtmlHelper.json_response(ResponseHandler.fetch_failure("village"), 500)
finally:
cursor.close()
connection.close()
return village_data
def GetAllBlocks(self, request):
blocks = []
self.isSuccess = False
self.resultMessage = ""
connection = config.get_db_connection()
if not connection:
return []
cursor = connection.cursor()
try:
cursor.callproc('GetAllBlocks')
for result in cursor.stored_results():
blocks = result.fetchall()
self.isSuccess = True
except mysql.connector.Error as e:
print(f"Error fetching blocks: {e}")
self.isSuccess = False
self.resultMessage = HtmlHelper.json_response(ResponseHandler.fetch_failure("block"), 500)
finally:
cursor.close()
connection.close()
return blocks