updated code of prajakta block,payment, gst
This commit is contained in:
78
model/GST.py
78
model/GST.py
@@ -1,55 +1,51 @@
|
||||
import config
|
||||
from model.ItemCRUD import ItemCRUD
|
||||
from model.Utilities import ItemCRUDType
|
||||
|
||||
class GST:
|
||||
|
||||
@staticmethod
|
||||
def get_unreleased_gst():
|
||||
# Use ItemCRUD for Invoices
|
||||
invoice_crud = ItemCRUD(itemType=ItemCRUDType.Invoice)
|
||||
invoices_rows = invoice_crud.GetAllData(storedproc="GetAllInvoicesBasic")
|
||||
|
||||
connection = config.get_db_connection()
|
||||
cursor = connection.cursor(dictionary=True)
|
||||
if not invoice_crud.isSuccess:
|
||||
return [] # Could also log invoice_crud.resultMessage
|
||||
|
||||
try:
|
||||
# ----------- Invoices -----------
|
||||
cursor.callproc('GetAllInvoicesBasic')
|
||||
invoices = []
|
||||
for result in cursor.stored_results():
|
||||
invoices = result.fetchall()
|
||||
invoices = [
|
||||
dict(
|
||||
Invoice_No=row[1],
|
||||
GST_SD_Amount=float(row[2]) if row[2] is not None else 0
|
||||
)
|
||||
for row in invoices_rows
|
||||
]
|
||||
|
||||
# Use ItemCRUD for GST Releases
|
||||
gst_crud = ItemCRUD(itemType=ItemCRUDType.GSTRelease)
|
||||
gst_rows = gst_crud.GetAllData(storedproc="GetAllGSTReleasesBasic")
|
||||
|
||||
# ----------- GST Releases -----------
|
||||
cursor.callproc('GetAllGSTReleasesBasic')
|
||||
gst_releases = []
|
||||
for result in cursor.stored_results():
|
||||
gst_releases = result.fetchall()
|
||||
if not gst_crud.isSuccess:
|
||||
return [] # Could also log gst_crud.resultMessage
|
||||
|
||||
gst_invoice_nos = {
|
||||
g['Invoice_No']
|
||||
for g in gst_releases
|
||||
if g['Invoice_No']
|
||||
}
|
||||
gst_invoice_nos = {
|
||||
g[2] # Invoice_No is at index 2
|
||||
for g in gst_rows
|
||||
if g[2]
|
||||
}
|
||||
|
||||
gst_basic_amounts = {
|
||||
float(g['Basic_Amount'])
|
||||
for g in gst_releases
|
||||
if g['Basic_Amount'] is not None
|
||||
}
|
||||
gst_basic_amounts = {
|
||||
float(g[3]) # Basic_Amount at index 3
|
||||
for g in gst_rows
|
||||
if g[3] is not None
|
||||
}
|
||||
|
||||
unreleased = []
|
||||
# Filter unreleased invoices
|
||||
unreleased = []
|
||||
for inv in invoices:
|
||||
match_by_invoice = inv['Invoice_No'] in gst_invoice_nos
|
||||
match_by_gst_amount = inv['GST_SD_Amount'] in gst_basic_amounts
|
||||
|
||||
for inv in invoices:
|
||||
if not (match_by_invoice or match_by_gst_amount):
|
||||
unreleased.append(inv)
|
||||
|
||||
match_by_invoice = inv['Invoice_No'] in gst_invoice_nos
|
||||
|
||||
match_by_gst_amount = float(
|
||||
inv.get('GST_SD_Amount') or 0
|
||||
) in gst_basic_amounts
|
||||
|
||||
if not (match_by_invoice or match_by_gst_amount):
|
||||
unreleased.append(inv)
|
||||
|
||||
return unreleased
|
||||
|
||||
finally:
|
||||
cursor.close()
|
||||
connection.close()
|
||||
|
||||
return unreleased
|
||||
123
model/Log.py
123
model/Log.py
@@ -1,39 +1,29 @@
|
||||
from flask import Flask, render_template, request, redirect, url_for, send_from_directory, flash, jsonify, json
|
||||
from flask import current_app
|
||||
|
||||
from datetime import datetime
|
||||
from flask_login import LoginManager, UserMixin, login_user, logout_user, login_required, current_user
|
||||
|
||||
import os
|
||||
from datetime import datetime
|
||||
from flask import current_app
|
||||
from flask_login import current_user
|
||||
|
||||
|
||||
class LogHelper:
|
||||
@staticmethod
|
||||
def log_action(action, details=""):
|
||||
"""Log user actions with timestamp, user, action, and details."""
|
||||
logData = LogData()
|
||||
logData.WriteLog(action, details="")
|
||||
"""Add a log entry."""
|
||||
log_data = LogData()
|
||||
log_data.add_log(action, details)
|
||||
|
||||
|
||||
class LogData:
|
||||
|
||||
filepath = ""
|
||||
timestamp = None
|
||||
|
||||
def __init__(self):
|
||||
self.filepath = os.path.join(current_app.root_path, 'activity.log')
|
||||
self.timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
|
||||
self.user = getattr(current_user, "cn", None) \
|
||||
or getattr(current_user, "username", None) \
|
||||
or getattr(current_user, "sAMAccountName", "Unknown")
|
||||
|
||||
if hasattr(current_user, "cn") and current_user.cn:
|
||||
self.user = current_user.cn
|
||||
elif hasattr(current_user, "username") and current_user.username:
|
||||
self.user = current_user.username
|
||||
elif hasattr(current_user, "sAMAccountName") and current_user.sAMAccountName:
|
||||
self.user = current_user.sAMAccountName
|
||||
else:
|
||||
self.user = "Unknown"
|
||||
|
||||
def WriteLog(self, action, details=""):
|
||||
"""Log user actions with timestamp, user, action, and details."""
|
||||
|
||||
def add_log(self, action, details=""):
|
||||
"""Create/Add a log entry."""
|
||||
with open(self.filepath, "a", encoding="utf-8") as f:
|
||||
f.write(
|
||||
f"Timestamp: {self.timestamp} | "
|
||||
@@ -42,46 +32,73 @@ class LogData:
|
||||
f"Details: {details}\n"
|
||||
)
|
||||
|
||||
|
||||
def GetActivitiesLog(self):
|
||||
def get_all_logs(self):
|
||||
"""Read all logs."""
|
||||
logs = []
|
||||
|
||||
if os.path.exists(self.filepath):
|
||||
with open(self.filepath, 'r') as f:
|
||||
with open(self.filepath, 'r', encoding="utf-8") as f:
|
||||
for line in f:
|
||||
parts = line.strip().split(" | ")
|
||||
if len(parts) == 4:
|
||||
logs.append({
|
||||
"timestamp": parts[0].replace("Timestamp:", "").strip(),
|
||||
"user": parts[1].replace("User:", "").strip(),
|
||||
"action": parts[2].replace("Action:", "").strip(),
|
||||
"details": parts[3].replace("Details:", "").strip()
|
||||
"timestamp": parts[0].split(":", 1)[1].strip(),
|
||||
"user": parts[1].split(":", 1)[1].strip(),
|
||||
"action": parts[2].split(":", 1)[1].strip(),
|
||||
"details": parts[3].split(":", 1)[1].strip()
|
||||
})
|
||||
return logs
|
||||
|
||||
def GetFilteredActivitiesLog(self, startDate, endDate, userName):
|
||||
def get_filtered_logs(self, start_date=None, end_date=None, user_name=None):
|
||||
"""Filter logs by date and/or user."""
|
||||
logs = self.get_all_logs()
|
||||
|
||||
filtered_logs = self.GetActivitiesLog()
|
||||
# Filter by date
|
||||
if start_date or end_date:
|
||||
start_dt = datetime.strptime(start_date, "%Y-%m-%d") if start_date else datetime.min
|
||||
end_dt = datetime.strptime(end_date, "%Y-%m-%d") if end_date else datetime.max
|
||||
logs = [
|
||||
log for log in logs
|
||||
if start_dt <= datetime.strptime(log["timestamp"], "%Y-%m-%d %H:%M:%S") <= end_dt
|
||||
]
|
||||
|
||||
# Date filter
|
||||
if startDate or endDate:
|
||||
try:
|
||||
start_dt = datetime.strptime(startDate, "%Y-%m-%d") if startDate else datetime.min
|
||||
end_dt = datetime.strptime(endDate, "%Y-%m-%d") if endDate else datetime.max
|
||||
# Filter by username
|
||||
if user_name:
|
||||
logs = [log for log in logs if user_name.lower() in log.get("user", "").lower()]
|
||||
|
||||
return logs
|
||||
|
||||
def update_log(self, index, action=None, details=None):
|
||||
"""Update a specific log entry by index (0-based)."""
|
||||
logs = self.get_all_logs()
|
||||
if 0 <= index < len(logs):
|
||||
if action:
|
||||
logs[index]["action"] = action
|
||||
if details:
|
||||
logs[index]["details"] = details
|
||||
self._rewrite_logs_file(logs)
|
||||
return True
|
||||
return False
|
||||
|
||||
def delete_log(self, index):
|
||||
"""Delete a specific log entry by index (0-based)."""
|
||||
logs = self.get_all_logs()
|
||||
if 0 <= index < len(logs):
|
||||
logs.pop(index)
|
||||
self._rewrite_logs_file(logs)
|
||||
return True
|
||||
return False
|
||||
|
||||
# ------------------- INTERNAL HELPER -------------------
|
||||
|
||||
def _rewrite_logs_file(self, logs):
|
||||
"""Overwrite the log file with current logs."""
|
||||
with open(self.filepath, "w", encoding="utf-8") as f:
|
||||
for log in logs:
|
||||
f.write(
|
||||
f"Timestamp: {log['timestamp']} | "
|
||||
f"User: {log['user']} | "
|
||||
f"Action: {log['action']} | "
|
||||
f"Details: {log['details']}\n"
|
||||
)
|
||||
|
||||
|
||||
filtered_logs = [
|
||||
log for log in filtered_logs
|
||||
if start_dt <= datetime.strptime(log["timestamp"], "%Y-%m-%d %H:%M:%S") <= end_dt
|
||||
]
|
||||
|
||||
|
||||
except Exception as e:
|
||||
print("Date filter error:", e)
|
||||
#Why catching all exceptions? Need to handle specific exceptions
|
||||
|
||||
# Username filter
|
||||
if userName:
|
||||
filtered_logs = [log for log in filtered_logs if userName.lower() in log["user"].lower()]
|
||||
|
||||
return filtered_logs
|
||||
@@ -1,150 +1,246 @@
|
||||
import config
|
||||
import mysql.connector
|
||||
# from flask import request
|
||||
# from model.ItemCRUD import ItemCRUD
|
||||
# from model.Utilities import ItemCRUDType
|
||||
|
||||
class GSTReleasemodel:
|
||||
# class GSTRelease:
|
||||
# """CRUD operations for GST Release using ItemCRUD"""
|
||||
|
||||
# def __init__(self):
|
||||
# self.isSuccess = False
|
||||
# self.resultMessage = ""
|
||||
|
||||
# # ------------------- Add GST Release -------------------
|
||||
# def AddGSTRelease(self, request):
|
||||
# pmc_no = request.form.get('PMC_No', '').strip()
|
||||
# invoice_no = request.form.get('invoice_No', '').strip()
|
||||
|
||||
# gst = ItemCRUD(itemType=ItemCRUDType.GSTRelease)
|
||||
# gst.AddItem(
|
||||
# request=request,
|
||||
# parentid=None,
|
||||
# childname=f"{pmc_no}-{invoice_no}",
|
||||
# storedprocfetch="CheckGSTReleaseExists",
|
||||
# storedprocadd="AddGSTReleaseFromExcel" # your stored procedure handles extra fields
|
||||
# )
|
||||
|
||||
# self.isSuccess = gst.isSuccess
|
||||
# self.resultMessage = str(gst.resultMessage)
|
||||
|
||||
# # ------------------- Get All GST Releases -------------------
|
||||
# def GetAllGSTReleases(self):
|
||||
# gst = ItemCRUD(itemType=ItemCRUDType.GSTRelease)
|
||||
# # Pass request=None for fetch
|
||||
# rows = gst.GetAllData(request=None, storedproc="GetAllGSTReleases")
|
||||
|
||||
# self.isSuccess = gst.isSuccess
|
||||
# self.resultMessage = str(gst.resultMessage)
|
||||
|
||||
# data = []
|
||||
# for row in rows:
|
||||
# data.append({
|
||||
# "gst_release_id": row[0],
|
||||
# "pmc_no": row[1],
|
||||
# "invoice_no": row[2],
|
||||
# "basic_amount": row[3],
|
||||
# "final_amount": row[4],
|
||||
# "total_amount": row[5],
|
||||
# "utr": row[6],
|
||||
# "contractor_id": row[7]
|
||||
# })
|
||||
# return data
|
||||
|
||||
# # ------------------- Get GST Release By ID -------------------
|
||||
# def GetGSTReleaseByID(self, gst_release_id):
|
||||
# gst = ItemCRUD(itemType=ItemCRUDType.GSTRelease)
|
||||
# row = gst.GetDataByID(gst_release_id, request=None, storedproc="GetGSTReleaseById")
|
||||
|
||||
# self.isSuccess = gst.isSuccess
|
||||
# self.resultMessage = str(gst.resultMessage)
|
||||
|
||||
# if row:
|
||||
# return {
|
||||
# "gst_release_id": row[0],
|
||||
# "pmc_no": row[1],
|
||||
# "invoice_no": row[2],
|
||||
# "basic_amount": row[3],
|
||||
# "final_amount": row[4],
|
||||
# "total_amount": row[5],
|
||||
# "utr": row[6],
|
||||
# "contractor_id": row[7]
|
||||
# }
|
||||
# return None
|
||||
|
||||
# # ------------------- Edit GST Release -------------------
|
||||
# def EditGSTRelease(self, request, gst_release_id):
|
||||
# pmc_no = request.form.get('PMC_No', '').strip()
|
||||
# invoice_no = request.form.get('invoice_No', '').strip()
|
||||
|
||||
# gst = ItemCRUD(itemType=ItemCRUDType.GSTRelease)
|
||||
# gst.EditItem(
|
||||
# request=request,
|
||||
# childid=gst_release_id,
|
||||
# parentid=None,
|
||||
# childname=f"{pmc_no}-{invoice_no}",
|
||||
# storedprocupdate="UpdateGSTRelease" # stored procedure handles extra fields
|
||||
# )
|
||||
|
||||
# self.isSuccess = gst.isSuccess
|
||||
# self.resultMessage = str(gst.resultMessage)
|
||||
|
||||
# # ------------------- Delete GST Release -------------------
|
||||
# def DeleteGSTRelease(self, gst_release_id):
|
||||
# gst = ItemCRUD(itemType=ItemCRUDType.GSTRelease)
|
||||
# gst.DeleteItem(
|
||||
# itemID=gst_release_id,
|
||||
# request=None,
|
||||
# storedprocDelete="DeleteGSTReleaseById"
|
||||
# )
|
||||
|
||||
# self.isSuccess = gst.isSuccess
|
||||
# self.resultMessage = str(gst.resultMessage)
|
||||
|
||||
from flask import request, jsonify
|
||||
from model.ItemCRUD import ItemCRUD
|
||||
from model.Utilities import ItemCRUDType
|
||||
|
||||
|
||||
class GSTRelease:
|
||||
|
||||
def __init__(self):
|
||||
self.isSuccess = False
|
||||
self.resultMessage = ""
|
||||
|
||||
# ------------------- Add GST Release -------------------
|
||||
def AddGSTRelease(self, request):
|
||||
try:
|
||||
gst = ItemCRUD(itemType=ItemCRUDType.GSTRelease)
|
||||
|
||||
data = {
|
||||
"PMC_No": request.form.get("PMC_No", "").strip(),
|
||||
"Invoice_No": request.form.get("Invoice_No", "").strip(),
|
||||
"Basic_Amount": float(request.form.get("Basic_Amount", 0) or 0),
|
||||
"Final_Amount": float(request.form.get("Final_Amount", 0) or 0),
|
||||
"Total_Amount": float(request.form.get("Total_Amount", 0) or 0),
|
||||
"UTR": request.form.get("UTR", "").strip(),
|
||||
"Contractor_ID": int(request.form.get("Contractor_ID", 0) or 0)
|
||||
}
|
||||
|
||||
gst.AddItem(
|
||||
request=request,
|
||||
data=data,
|
||||
storedprocfetch="CheckGSTReleaseExists",
|
||||
storedprocadd="AddGSTReleaseFromExcel"
|
||||
)
|
||||
|
||||
self.isSuccess = gst.isSuccess
|
||||
self.resultMessage = str(gst.resultMessage)
|
||||
|
||||
except Exception as e:
|
||||
print("ERROR in AddGSTRelease:", e)
|
||||
self.isSuccess = False
|
||||
self.resultMessage = str(e)
|
||||
|
||||
return jsonify({"success": self.isSuccess, "message": self.resultMessage})
|
||||
|
||||
# ------------------- Edit GST Release -------------------
|
||||
def EditGSTRelease(self, request, gst_release_id):
|
||||
try:
|
||||
gst = ItemCRUD(itemType=ItemCRUDType.GSTRelease)
|
||||
|
||||
data = {
|
||||
"PMC_No": request.form.get("PMC_No", "").strip(),
|
||||
"Invoice_No": request.form.get("Invoice_No", "").strip(),
|
||||
"Basic_Amount": float(request.form.get("Basic_Amount", 0) or 0),
|
||||
"Final_Amount": float(request.form.get("Final_Amount", 0) or 0),
|
||||
"Total_Amount": float(request.form.get("Total_Amount", 0) or 0),
|
||||
"UTR": request.form.get("UTR", "").strip()
|
||||
}
|
||||
|
||||
gst.EditItem(
|
||||
request=request,
|
||||
childid=gst_release_id,
|
||||
data=data,
|
||||
storedprocupdate="UpdateGSTRelease"
|
||||
)
|
||||
|
||||
self.isSuccess = gst.isSuccess
|
||||
self.resultMessage = str(gst.resultMessage)
|
||||
|
||||
except Exception as e:
|
||||
print("ERROR in EditGSTRelease:", e)
|
||||
self.isSuccess = False
|
||||
self.resultMessage = str(e)
|
||||
|
||||
return jsonify({"success": self.isSuccess, "message": self.resultMessage})
|
||||
|
||||
# ------------------- Delete GST Release -------------------
|
||||
def DeleteGSTRelease(self, gst_release_id):
|
||||
try:
|
||||
gst = ItemCRUD(itemType=ItemCRUDType.GSTRelease)
|
||||
|
||||
gst.DeleteItem(
|
||||
request=None,
|
||||
itemID=gst_release_id,
|
||||
storedprocDelete="DeleteGSTReleaseById"
|
||||
)
|
||||
|
||||
self.isSuccess = gst.isSuccess
|
||||
self.resultMessage = str(gst.resultMessage)
|
||||
|
||||
except Exception as e:
|
||||
print("ERROR in DeleteGSTRelease:", e)
|
||||
self.isSuccess = False
|
||||
self.resultMessage = str(e)
|
||||
|
||||
return jsonify({"success": self.isSuccess, "message": self.resultMessage})
|
||||
|
||||
# ------------------- Get All GST Releases -------------------
|
||||
def GetAllGSTReleases(self):
|
||||
try:
|
||||
gst = ItemCRUD(itemType=ItemCRUDType.GSTRelease)
|
||||
|
||||
rows = gst.GetAllData(None, "GetAllGSTReleases")
|
||||
|
||||
data = []
|
||||
for row in rows:
|
||||
data.append({
|
||||
"gst_release_id": row[0],
|
||||
"pmc_no": row[1],
|
||||
"invoice_no": row[2],
|
||||
"basic_amount": row[3],
|
||||
"final_amount": row[4],
|
||||
"total_amount": row[5],
|
||||
"utr": row[6],
|
||||
"contractor_id": row[7]
|
||||
})
|
||||
|
||||
return data
|
||||
|
||||
except Exception as e:
|
||||
print("ERROR in GetAllGSTReleases:", e)
|
||||
return []
|
||||
|
||||
# ------------------- Get GST Release By ID -------------------
|
||||
def GetGSTReleaseByID(self, gst_release_id):
|
||||
try:
|
||||
gst = ItemCRUD(itemType=ItemCRUDType.GSTRelease)
|
||||
|
||||
row = gst.GetDataByID(gst_release_id, "GetGSTReleaseById")
|
||||
|
||||
if row:
|
||||
return {
|
||||
"gst_release_id": row[0],
|
||||
"pmc_no": row[1],
|
||||
"invoice_no": row[2],
|
||||
"basic_amount": row[3],
|
||||
"final_amount": row[4],
|
||||
"total_amount": row[5],
|
||||
"utr": row[6],
|
||||
"contractor_id": row[7]
|
||||
}
|
||||
|
||||
@staticmethod
|
||||
def get_connection():
|
||||
connection = config.get_db_connection()
|
||||
if not connection:
|
||||
return None
|
||||
return connection
|
||||
|
||||
@staticmethod
|
||||
def fetch_all_gst_releases():
|
||||
connection = GSTReleasemodel.get_connection()
|
||||
gst_releases = []
|
||||
if connection:
|
||||
cursor = connection.cursor(dictionary=True)
|
||||
|
||||
try:
|
||||
cursor.callproc('GetAllGSTReleases')
|
||||
|
||||
gst_releases = []
|
||||
for result in cursor.stored_results(): # change to procedure
|
||||
gst_releases = result.fetchall()
|
||||
|
||||
except mysql.connector.Error as e:
|
||||
print(f"Error fetching GST releases: {e}")
|
||||
finally:
|
||||
cursor.close()
|
||||
connection.close()
|
||||
return gst_releases
|
||||
|
||||
@staticmethod
|
||||
def insert_gst_release(pmc_no, invoice_no, basic_amount, final_amount, total_amount, utr, contractor_id):
|
||||
connection = GSTReleasemodel.get_connection()
|
||||
if not connection:
|
||||
return False
|
||||
try:
|
||||
cursor = connection.cursor()
|
||||
|
||||
# Insert into gst_release
|
||||
cursor.callproc(
|
||||
'InsertGSTReleaseOnly',
|
||||
[pmc_no, invoice_no, basic_amount, final_amount, total_amount, utr, contractor_id]
|
||||
)
|
||||
|
||||
# Insert into inpayment
|
||||
cursor.callproc(
|
||||
'InsertInpaymentOnly',
|
||||
[pmc_no, invoice_no, basic_amount, final_amount, total_amount, utr, contractor_id]
|
||||
)
|
||||
|
||||
connection.commit()
|
||||
return True
|
||||
except mysql.connector.Error as e:
|
||||
print(f"Error inserting GST release: {e}")
|
||||
return False
|
||||
finally:
|
||||
cursor.close()
|
||||
connection.close()
|
||||
|
||||
@staticmethod
|
||||
def fetch_gst_release_by_id(gst_release_id):
|
||||
connection = GSTReleasemodel.get_connection()
|
||||
if not connection:
|
||||
return None
|
||||
data = {}
|
||||
try:
|
||||
cursor = connection.cursor(dictionary=True)
|
||||
|
||||
cursor.callproc('GetGSTReleaseById', [gst_release_id])
|
||||
|
||||
for result in cursor.stored_results():
|
||||
data = result.fetchone()
|
||||
if data:
|
||||
# Convert to array for template
|
||||
data = [
|
||||
data.get('GST_Release_Id'),
|
||||
data.get('PMC_No'),
|
||||
data.get('Invoice_No'),
|
||||
data.get('Basic_Amount'),
|
||||
data.get('Final_Amount'),
|
||||
data.get('Total_Amount'),
|
||||
data.get('UTR')
|
||||
]
|
||||
except mysql.connector.Error as e:
|
||||
print(f"Error fetching GST release by id: {e}")
|
||||
finally:
|
||||
cursor.close()
|
||||
connection.close()
|
||||
return data
|
||||
|
||||
@staticmethod
|
||||
def update_gst_release(gst_release_id, pmc_no, invoice_no, basic_amount, final_amount, total_amount, utr):
|
||||
connection = GSTReleasemodel.get_connection()
|
||||
if not connection:
|
||||
return False
|
||||
try:
|
||||
cursor = connection.cursor()
|
||||
# Update gst_release
|
||||
cursor.callproc(
|
||||
'UpdateGSTRelease',
|
||||
[pmc_no, invoice_no, basic_amount, final_amount, total_amount, utr, gst_release_id]
|
||||
)
|
||||
# Update inpayment
|
||||
cursor.callproc(
|
||||
'UpdateInpaymentByUTR',
|
||||
[basic_amount, final_amount, total_amount, utr]
|
||||
)
|
||||
connection.commit()
|
||||
return True
|
||||
except mysql.connector.Error as e:
|
||||
print(f"Error updating GST release: {e}")
|
||||
return False
|
||||
finally:
|
||||
cursor.close()
|
||||
connection.close()
|
||||
|
||||
@staticmethod
|
||||
def delete_gst_release(gst_release_id):
|
||||
connection = GSTReleasemodel.get_connection()
|
||||
if not connection:
|
||||
return False, None
|
||||
try:
|
||||
cursor = connection.cursor(dictionary=True)
|
||||
cursor.callproc('GetGSTReleaseUTRById', [gst_release_id])
|
||||
record = None
|
||||
for result in cursor.stored_results():
|
||||
record = result.fetchone()
|
||||
|
||||
if not record:
|
||||
return False, None
|
||||
|
||||
utr = record['UTR']
|
||||
|
||||
# Step 1: Delete gst_release
|
||||
cursor.callproc('DeleteGSTReleaseById', [gst_release_id])
|
||||
|
||||
# Step 2: Reset inpayment using UTR
|
||||
cursor.callproc('ResetInpaymentByUTR', [utr])
|
||||
|
||||
connection.commit()
|
||||
return True, utr
|
||||
except mysql.connector.Error as e:
|
||||
print(f"Error deleting GST release: {e}")
|
||||
return False, None
|
||||
finally:
|
||||
cursor.close()
|
||||
connection.close()
|
||||
except Exception as e:
|
||||
print("ERROR in GetGSTReleaseByID:", e)
|
||||
return None
|
||||
102
model/payment.py
102
model/payment.py
@@ -1,8 +1,13 @@
|
||||
import config
|
||||
import mysql.connector
|
||||
import config
|
||||
import mysql.connector
|
||||
from enum import Enum
|
||||
from model.Utilities import ItemCRUDType
|
||||
|
||||
class Paymentmodel:
|
||||
|
||||
# ---------------- Database Connection ----------------
|
||||
@staticmethod
|
||||
def get_connection():
|
||||
connection = config.get_db_connection()
|
||||
@@ -10,6 +15,7 @@ class Paymentmodel:
|
||||
return None
|
||||
return connection
|
||||
|
||||
# ---------------- Payment Methods ----------------
|
||||
@staticmethod
|
||||
def fetch_all_payments():
|
||||
connection = Paymentmodel.get_connection()
|
||||
@@ -52,14 +58,8 @@ class Paymentmodel:
|
||||
try:
|
||||
cursor = connection.cursor()
|
||||
cursor.callproc('UpdateInpaymentRecord', [
|
||||
subcontractor_id,
|
||||
pmc_no,
|
||||
invoice_no,
|
||||
amount,
|
||||
tds_amount,
|
||||
total_amount,
|
||||
utr
|
||||
])
|
||||
subcontractor_id, pmc_no, invoice_no, amount, tds_amount, total_amount, utr
|
||||
])
|
||||
connection.commit()
|
||||
return True
|
||||
except mysql.connector.Error as e:
|
||||
@@ -80,7 +80,6 @@ class Paymentmodel:
|
||||
cursor.callproc("GetPaymentById", (payment_id,))
|
||||
for result in cursor.stored_results():
|
||||
payment_data = result.fetchone()
|
||||
# Convert to array for template
|
||||
if payment_data:
|
||||
payment_data = [
|
||||
payment_data.get('Payment_Id'),
|
||||
@@ -117,42 +116,99 @@ class Paymentmodel:
|
||||
|
||||
@staticmethod
|
||||
def delete_payment(payment_id):
|
||||
"""
|
||||
Deletes a payment and resets the related inpayment fields in one go.
|
||||
Returns (success, pmc_no, invoice_no)
|
||||
"""
|
||||
connection = Paymentmodel.get_connection()
|
||||
if not connection:
|
||||
return False, None, None
|
||||
|
||||
try:
|
||||
cursor = connection.cursor(dictionary=True)
|
||||
|
||||
# Fetch PMC & Invoice before deleting
|
||||
cursor.callproc('GetPaymentPMCInvoiceById', [payment_id])
|
||||
|
||||
record = {}
|
||||
for result in cursor.stored_results():
|
||||
record = result.fetchone() or {}
|
||||
if not record:
|
||||
return False, None, None
|
||||
|
||||
pmc_no = record['PMC_No']
|
||||
invoice_no = record['Invoice_No']
|
||||
|
||||
# Step 2: Delete the payment using the stored procedure
|
||||
# Delete payment
|
||||
cursor.callproc("DeletePayment", (payment_id,))
|
||||
connection.commit()
|
||||
|
||||
# Step 3: Reset inpayment fields using the stored procedure
|
||||
# Reset inpayment fields
|
||||
cursor.callproc("ResetInpayment", [pmc_no, invoice_no])
|
||||
connection.commit()
|
||||
|
||||
return True, pmc_no, invoice_no
|
||||
|
||||
except mysql.connector.Error as e:
|
||||
print(f"Error deleting payment: {e}")
|
||||
return False, None, None
|
||||
finally:
|
||||
cursor.close()
|
||||
connection.close()
|
||||
|
||||
# ---------------- Item CRUD Methods ----------------
|
||||
@staticmethod
|
||||
def fetch_items(item_type: ItemCRUDType):
|
||||
connection = Paymentmodel.get_connection()
|
||||
items = []
|
||||
if connection:
|
||||
cursor = connection.cursor(dictionary=True)
|
||||
try:
|
||||
cursor.callproc('GetItemsByType', [item_type.value])
|
||||
for result in cursor.stored_results():
|
||||
items = result.fetchall()
|
||||
except mysql.connector.Error as e:
|
||||
print(f"Error fetching {item_type.name}: {e}")
|
||||
finally:
|
||||
cursor.close()
|
||||
connection.close()
|
||||
return items
|
||||
|
||||
@staticmethod
|
||||
def insert_item(item_type: ItemCRUDType, name: str):
|
||||
connection = Paymentmodel.get_connection()
|
||||
if not connection:
|
||||
return False
|
||||
try:
|
||||
cursor = connection.cursor()
|
||||
cursor.callproc('InsertItem', [item_type.value, name])
|
||||
connection.commit()
|
||||
return True
|
||||
except mysql.connector.Error as e:
|
||||
print(f"Error inserting {item_type.name}: {e}")
|
||||
return False
|
||||
finally:
|
||||
cursor.close()
|
||||
connection.close()
|
||||
|
||||
@staticmethod
|
||||
def update_item(item_type: ItemCRUDType, item_id: int, new_name: str):
|
||||
connection = Paymentmodel.get_connection()
|
||||
if not connection:
|
||||
return False
|
||||
try:
|
||||
cursor = connection.cursor()
|
||||
cursor.callproc('UpdateItem', [item_type.value, item_id, new_name])
|
||||
connection.commit()
|
||||
return True
|
||||
except mysql.connector.Error as e:
|
||||
print(f"Error updating {item_type.name}: {e}")
|
||||
return False
|
||||
finally:
|
||||
cursor.close()
|
||||
connection.close()
|
||||
|
||||
@staticmethod
|
||||
def delete_item(item_type: ItemCRUDType, item_id: int):
|
||||
connection = Paymentmodel.get_connection()
|
||||
if not connection:
|
||||
return False
|
||||
try:
|
||||
cursor = connection.cursor()
|
||||
cursor.callproc('DeleteItem', [item_type.value, item_id])
|
||||
connection.commit()
|
||||
return True
|
||||
except mysql.connector.Error as e:
|
||||
print(f"Error deleting {item_type.name}: {e}")
|
||||
return False
|
||||
finally:
|
||||
cursor.close()
|
||||
connection.close()
|
||||
Reference in New Issue
Block a user