testing code

This commit is contained in:
2026-04-04 14:18:58 +05:30
parent 0ec878d2ec
commit 7981ad0206
12 changed files with 930 additions and 595 deletions

View File

@@ -1,185 +1,185 @@
import openpyxl
from openpyxl.styles import Font, PatternFill
import config
from flask_login import current_user
from model.Log import LogHelper
from model.excel import excel
from model.Report import ReportHelper
from model.FolderAndFile import FolderAndFile
# import openpyxl
# from openpyxl.styles import Font, PatternFill
# import config
# from flask_login import current_user
# from model.Log import LogHelper
# from model.excel import excel
# from model.Report import ReportHelper
# from model.FolderAndFile import FolderAndFile
class PmcReport:
# class PmcReport:
@staticmethod
def get_pmc_report(pmc_no):
# @staticmethod
# def get_pmc_report(pmc_no):
connection = config.get_db_connection()
cursor = connection.cursor(dictionary=True, buffered=True)
# connection = config.get_db_connection()
# cursor = connection.cursor(dictionary=True, buffered=True)
try:
pmc_info = ReportHelper.execute_sp(cursor, 'GetContractorInfoByPmcNo', [pmc_no], True)
# try:
# pmc_info = ReportHelper.execute_sp(cursor, 'GetContractorInfoByPmcNo', [pmc_no], True)
if not pmc_info:
return None
# if not pmc_info:
# return None
cursor.callproc("Get_pmc_hold_types", (pmc_no, pmc_info["Contractor_Id"]))
hold_types = next(cursor.stored_results()).fetchall()
# cursor.callproc("Get_pmc_hold_types", (pmc_no, pmc_info["Contractor_Id"]))
# hold_types = next(cursor.stored_results()).fetchall()
# Extract hold_type_ids
hold_type_ids = [ht['hold_type_id'] for ht in hold_types]
# # Extract hold_type_ids
# hold_type_ids = [ht['hold_type_id'] for ht in hold_types]
invoices = []
hold_amount_total = 0
hold_type_ids_str = ",".join(map(str, hold_type_ids))
cursor.callproc(
'GetInvoices_WithHold',
[pmc_no, pmc_info["Contractor_Id"], hold_type_ids_str]
)
# invoices = []
# hold_amount_total = 0
# hold_type_ids_str = ",".join(map(str, hold_type_ids))
# cursor.callproc(
# 'GetInvoices_WithHold',
# [pmc_no, pmc_info["Contractor_Id"], hold_type_ids_str]
# )
for result in cursor.stored_results():
invoices = result.fetchall()
# for result in cursor.stored_results():
# invoices = result.fetchall()
if hold_type_ids:
hold_amount_total = sum(row.get('hold_amount', 0) or 0 for row in invoices)
# if hold_type_ids:
# hold_amount_total = sum(row.get('hold_amount', 0) or 0 for row in invoices)
total_invo_final = sum(row.get('Final_Amount', 0) or 0 for row in invoices)
# total_invo_final = sum(row.get('Final_Amount', 0) or 0 for row in invoices)
gst_rel = ReportHelper.execute_sp(cursor, 'GetGSTReleaseByPMC', [pmc_no])
total_gst_basic = sum(row.get('basic_amount', 0) or 0 for row in gst_rel)
total_gst_final = sum(row.get('final_amount', 0) or 0 for row in gst_rel)
# gst_rel = ReportHelper.execute_sp(cursor, 'GetGSTReleaseByPMC', [pmc_no])
# total_gst_basic = sum(row.get('basic_amount', 0) or 0 for row in gst_rel)
# total_gst_final = sum(row.get('final_amount', 0) or 0 for row in gst_rel)
hold_release = ReportHelper.execute_sp(cursor, 'GetHoldReleaseByPMC', [pmc_no])
# hold_release = ReportHelper.execute_sp(cursor, 'GetHoldReleaseByPMC', [pmc_no])
credit_note = ReportHelper.execute_sp(cursor, 'GetCreditNoteByPMC', [pmc_no])
# credit_note = ReportHelper.execute_sp(cursor, 'GetCreditNoteByPMC', [pmc_no])
payments = ReportHelper.execute_sp(cursor, 'GetPaymentsByPMC', [pmc_no])
total_pay_amount = sum(row.get('Payment_Amount', 0) or 0 for row in payments)
total_pay_total = sum(row.get('Total_amount', 0) or 0 for row in payments)
# payments = ReportHelper.execute_sp(cursor, 'GetPaymentsByPMC', [pmc_no])
# total_pay_amount = sum(row.get('Payment_Amount', 0) or 0 for row in payments)
# total_pay_total = sum(row.get('Total_amount', 0) or 0 for row in payments)
totals = {
"sum_invo_basic_amt": sum(row.get('Basic_Amount', 0) or 0 for row in invoices),
"sum_invo_debit_amt": sum(row.get('Debit_Amount', 0) or 0 for row in invoices),
"sum_invo_after_debit_amt": sum(row.get('After_Debit_Amount', 0) or 0 for row in invoices),
"sum_invo_amt": sum(row.get('Amount', 0) or 0 for row in invoices),
"sum_invo_gst_amt": sum(row.get('GST_Amount', 0) or 0 for row in invoices),
"sum_invo_tds_amt": sum(row.get('TDS_Amount', 0) or 0 for row in invoices),
"sum_invo_ds_amt": sum(row.get('SD_Amount', 0) or 0 for row in invoices),
"sum_invo_on_commission": sum(row.get('On_Commission', 0) or 0 for row in invoices),
"sum_invo_hydro_test": sum(row.get('Hydro_Testing', 0) or 0 for row in invoices),
"sum_invo_gst_sd_amt": sum(row.get('GST_SD_Amount', 0) or 0 for row in invoices),
"sum_invo_final_amt": total_invo_final,
"sum_invo_hold_amt": hold_amount_total,
"sum_gst_basic_amt": total_gst_basic,
"sum_gst_final_amt": total_gst_final,
"sum_pay_payment_amt": total_pay_amount,
"sum_pay_tds_payment_amt": sum(row.get('TDS_Payment_Amount', 0) or 0 for row in payments),
"sum_pay_total_amt": total_pay_total
}
# totals = {
# "sum_invo_basic_amt": sum(row.get('Basic_Amount', 0) or 0 for row in invoices),
# "sum_invo_debit_amt": sum(row.get('Debit_Amount', 0) or 0 for row in invoices),
# "sum_invo_after_debit_amt": sum(row.get('After_Debit_Amount', 0) or 0 for row in invoices),
# "sum_invo_amt": sum(row.get('Amount', 0) or 0 for row in invoices),
# "sum_invo_gst_amt": sum(row.get('GST_Amount', 0) or 0 for row in invoices),
# "sum_invo_tds_amt": sum(row.get('TDS_Amount', 0) or 0 for row in invoices),
# "sum_invo_ds_amt": sum(row.get('SD_Amount', 0) or 0 for row in invoices),
# "sum_invo_on_commission": sum(row.get('On_Commission', 0) or 0 for row in invoices),
# "sum_invo_hydro_test": sum(row.get('Hydro_Testing', 0) or 0 for row in invoices),
# "sum_invo_gst_sd_amt": sum(row.get('GST_SD_Amount', 0) or 0 for row in invoices),
# "sum_invo_final_amt": total_invo_final,
# "sum_invo_hold_amt": hold_amount_total,
# "sum_gst_basic_amt": total_gst_basic,
# "sum_gst_final_amt": total_gst_final,
# "sum_pay_payment_amt": total_pay_amount,
# "sum_pay_tds_payment_amt": sum(row.get('TDS_Payment_Amount', 0) or 0 for row in payments),
# "sum_pay_total_amt": total_pay_total
# }
return {
"info": pmc_info,
"invoices": invoices,
"hold_types": hold_types,
"gst_rel": gst_rel,
"payments": payments,
"credit_note": credit_note,
"hold_release": hold_release,
"total": totals
}
# return {
# "info": pmc_info,
# "invoices": invoices,
# "hold_types": hold_types,
# "gst_rel": gst_rel,
# "payments": payments,
# "credit_note": credit_note,
# "hold_release": hold_release,
# "total": totals
# }
finally:
cursor.close()
connection.close()
# finally:
# cursor.close()
# connection.close()
@staticmethod
def download_pmc_report(pmc_no):
# @staticmethod
# def download_pmc_report(pmc_no):
connection = config.get_db_connection()
if not connection:
return None
# connection = config.get_db_connection()
# if not connection:
# return None
cursor = connection.cursor(dictionary=True)
# cursor = connection.cursor(dictionary=True)
try:
filename = f"PMC_Report_{pmc_no}.xlsx"
# try:
# filename = f"PMC_Report_{pmc_no}.xlsx"
output_folder = FolderAndFile.get_download_folder()
output_file = FolderAndFile.get_download_path(filename)
# output_folder = FolderAndFile.get_download_folder()
# output_file = FolderAndFile.get_download_path(filename)
# ================= DATA FETCH =================
# # ================= DATA FETCH =================
contractor_info = ReportHelper.execute_sp(
cursor, 'GetContractorInfoByPmcNo', [pmc_no]
)
# contractor_info = ReportHelper.execute_sp(
# cursor, 'GetContractorInfoByPmcNo', [pmc_no]
# )
contractor_info = contractor_info[0] if contractor_info else None
print("contractor_info:::",contractor_info)
# contractor_info = contractor_info[0] if contractor_info else None
# print("contractor_info:::",contractor_info)
if not contractor_info:
return None
# if not contractor_info:
# return None
hold_types = ReportHelper.execute_sp(
cursor, 'GetHoldTypesByContractor',
[contractor_info["Contractor_Id"]]
)
# hold_types = ReportHelper.execute_sp(
# cursor, 'GetHoldTypesByContractor',
# [contractor_info["Contractor_Id"]]
# )
hold_type_map = {
ht['hold_type_id']: ht['hold_type'] for ht in hold_types
}
# hold_type_map = {
# ht['hold_type_id']: ht['hold_type'] for ht in hold_types
# }
invoices = ReportHelper.execute_sp(
cursor, 'GetInvoicesByContractorOrPMCNo', [None,pmc_no]
)
# invoices = ReportHelper.execute_sp(
# cursor, 'GetInvoicesByContractorOrPMCNo', [None,pmc_no]
# )
credit_notes = ReportHelper.execute_sp(
cursor, 'NewGetCreditNotesByPMCNo', [pmc_no]
)
credit_note_map = {}
for cn in credit_notes:
key = (
str(cn['PMC_No']).strip()
)
credit_note_map.setdefault(key, []).append(cn)
# credit_notes = ReportHelper.execute_sp(
# cursor, 'NewGetCreditNotesByPMCNo', [pmc_no]
# )
# credit_note_map = {}
# for cn in credit_notes:
# key = (
# str(cn['PMC_No']).strip()
# )
# credit_note_map.setdefault(key, []).append(cn)
hold_amounts = ReportHelper.execute_sp(
cursor, 'GetHoldAmountsByContractor',
[contractor_info["Contractor_Id"]]
)
# hold_amounts = ReportHelper.execute_sp(
# cursor, 'GetHoldAmountsByContractor',
# [contractor_info["Contractor_Id"]]
# )
gst_releases = ReportHelper.execute_sp(
cursor, 'GetGSTReleaseDetailsByPMC', [pmc_no]
)
gst_release_map = {}
for gr in gst_releases:
# gst_releases = ReportHelper.execute_sp(
# cursor, 'GetGSTReleaseDetailsByPMC', [pmc_no]
# )
# gst_release_map = {}
# for gr in gst_releases:
key = (
str(gr['PMC_No']).strip()
)
gst_release_map.setdefault(key, []).append(gr)
# key = (
# str(gr['PMC_No']).strip()
# )
# gst_release_map.setdefault(key, []).append(gr)
# ================= DATA MAPPING =================
hold_data = {}
for h in hold_amounts:
hold_data.setdefault(h['Invoice_Id'], {})[h['hold_type_id']] = h['hold_amount']
# # ================= DATA MAPPING =================
# hold_data = {}
# for h in hold_amounts:
# hold_data.setdefault(h['Invoice_Id'], {})[h['hold_type_id']] = h['hold_amount']
# ================= LOG =================
LogHelper.log_action(
"Download PMC Report",
f"User {current_user.id} Download PMC Report '{pmc_no}'"
)
# # ================= LOG =================
# LogHelper.log_action(
# "Download PMC Report",
# f"User {current_user.id} Download PMC Report '{pmc_no}'"
# )
excel.generate_excel(
0, contractor_info, invoices, hold_types, hold_data,
credit_note_map,gst_release_map, output_file
)
# excel.generate_excel(
# 0, contractor_info, invoices, hold_types, hold_data,
# credit_note_map,gst_release_map, output_file
# )
return output_folder, filename
# return output_folder, filename
except Exception as e:
print(f"Error generating PMC report: {e}")
return None
# except Exception as e:
# print(f"Error generating PMC report: {e}")
# return None
finally:
cursor.close()
connection.close()
# finally:
# cursor.close()
# connection.close()

View File

@@ -86,67 +86,7 @@ class ReportHelper:
return data
@staticmethod
def get_contractor_report(contractor_id):
connection = config.get_db_connection()
cursor = connection.cursor(dictionary=True, buffered=True)
try:
# Contractor Info (only one fetch)
contInfo = ReportHelper.execute_sp(cursor, 'GetContractorInfo', [contractor_id], True)
# Hold Types
hold_types = ReportHelper.execute_sp(cursor, 'GetContractorHoldTypes', [contractor_id])
# Invoices
invoices = ReportHelper.execute_sp(cursor, 'GetContractorInvoices', [contractor_id])
# GST Release
gst_rel = ReportHelper.execute_sp(cursor, 'GetGSTRelease', [contractor_id])
# Hold Release
hold_release = ReportHelper.execute_sp(cursor, 'GetHoldRelease', [contractor_id])
# Credit Note
credit_note = ReportHelper.execute_sp(cursor, 'GetCreditNotesByContractor', [contractor_id])
# Payments
payments = ReportHelper.execute_sp(cursor, 'GetPayments', [contractor_id])
# Totals
total = {
"sum_invo_basic_amt": float(sum(row['Basic_Amount'] or 0 for row in invoices)),
"sum_invo_debit_amt": float(sum(row['Debit_Amount'] or 0 for row in invoices)),
"sum_invo_after_debit_amt": float(sum(row['After_Debit_Amount'] or 0 for row in invoices)),
"sum_invo_amt": float(sum(row['Amount'] or 0 for row in invoices)),
"sum_invo_gst_amt": float(sum(row['GST_Amount'] or 0 for row in invoices)),
"sum_invo_tds_amt": float(sum(row['TDS_Amount'] or 0 for row in invoices)),
"sum_invo_ds_amt": float(sum(row['SD_Amount'] or 0 for row in invoices)),
"sum_invo_on_commission": float(sum(row['On_Commission'] or 0 for row in invoices)),
"sum_invo_hydro_test": float(sum(row['Hydro_Testing'] or 0 for row in invoices)),
"sum_invo_gst_sd_amt": float(sum(row['GST_SD_Amount'] or 0 for row in invoices)),
"sum_invo_final_amt": float(sum(row['Final_Amount'] or 0 for row in invoices)),
"sum_invo_hold_amt": float(sum(row['hold_amount'] or 0 for row in invoices)),
"sum_gst_basic_amt": float(sum(row['basic_amount'] or 0 for row in gst_rel)),
"sum_gst_final_amt": float(sum(row['final_amount'] or 0 for row in gst_rel)),
"sum_pay_payment_amt": float(sum(row['Payment_Amount'] or 0 for row in payments)),
"sum_pay_tds_payment_amt": float(sum(row['TDS_Payment_Amount'] or 0 for row in payments)),
"sum_pay_total_amt": float(sum(row['Total_amount'] or 0 for row in payments))
}
current_date = datetime.now().strftime('%Y-%m-%d')
finally:
cursor.close()
connection.close()
return {
"contInfo": contInfo,
"invoices": invoices,
"hold_types": hold_types,
"gst_rel": gst_rel,
"payments": payments,
"credit_note": credit_note,
"hold_release": hold_release,
"total": total,
"current_date": current_date
}
@staticmethod
def get_contractor_info(contractor_id):
from model.ContractorInfo import ContractorInfo
@@ -154,53 +94,4 @@ class ReportHelper:
return contractor.contInfo if contractor.contInfo else None
@staticmethod
def create_contractor_report(contractor_id):
fileName=f"Contractor_Report_{contractor_id}.xlsx"
output_file = FolderAndFile.get_download_path(filename=fileName)
# Fetch Data
contInfo = ReportHelper.get_contractor_info(contractor_id)
if not contInfo:
return None, "No contractor found"
connection = config.get_db_connection()
cursor = connection.cursor(dictionary=True, buffered=True)
hold_types = ReportHelper.execute_sp(cursor, 'HoldTypesByContractorId', [contractor_id])
invoices = ReportHelper.execute_sp(cursor, 'GetInvoicesByContractorOrPMCNo', [contractor_id,None])
hold_amounts = ReportHelper.execute_sp(cursor, 'HoldAmountsByContractorId', [contractor_id])
hold_data = {}
for h in hold_amounts:
hold_data.setdefault(h['Invoice_Id'], {})[h['hold_type_id']] = h['hold_amount']
# -------- Credit Note MAP --------
credit_note_raw = ReportHelper.execute_sp(cursor, 'GetCreditNotesByContractor', [contractor_id])
credit_note_map = {}
for cn in credit_note_raw:
key = (
str(cn['PMC_No']).strip()
)
credit_note_map.setdefault(key, []).append(cn)
# -------- GST MAP --------
gst_release_raw = ReportHelper.execute_sp(cursor, 'GstReleasesByContractorId', [contractor_id])
gst_release_map = {}
for gr in gst_release_raw:
key = (
str(gr['PMC_No']).strip()
)
gst_release_map.setdefault(key, []).append(gr)
excel.generate_excel(
contractor_id, contInfo, invoices, hold_types, hold_data,
credit_note_map, gst_release_map, output_file
)
return output_file, None

View File

@@ -0,0 +1,159 @@
import config
from model.excel import excel
from model.Report import ReportHelper
from model.FolderAndFile import FolderAndFile
from decimal import Decimal
class UnifiedReportService:
# Static variable to cache report data
_cached_report_data = None
_cached_contractor_id = None
_cached_pmc_no = None
@staticmethod
def get_report(contractor_id=None, pmc_no=None):
# If cached and same request, return cached data
if (UnifiedReportService._cached_report_data and
contractor_id == UnifiedReportService._cached_contractor_id and
pmc_no == UnifiedReportService._cached_pmc_no):
return UnifiedReportService._cached_report_data
connection = config.get_db_connection()
cursor = connection.cursor(dictionary=True, buffered=True)
try:
# ================= BASIC INFO =================
if contractor_id:
info = ReportHelper.execute_sp(cursor,'GetContractorInfo',[contractor_id],True)
else:
info = ReportHelper.execute_sp(cursor,'GetContractorInfoByPmcNo',[pmc_no],True)
contractor_id = info["Contractor_Id"] if info else None
if not info:
return None
# ================= HOLD TYPES =================
hold_types = ReportHelper.execute_sp(cursor,'GetHoldTypesByContractor',[contractor_id]) or []
# ================= INVOICES =================
if pmc_no:
invoices = ReportHelper.execute_sp(cursor,'GetInvoicesByContractorOrPMCNo',[None, pmc_no]) or []
credit_notes = ReportHelper.execute_sp(cursor,'NewGetCreditNotesByPMCNo',[pmc_no]) or []
gst_rel = ReportHelper.execute_sp(cursor,'GetGSTReleaseDetailsByPMC',[pmc_no]) or []
payments = ReportHelper.execute_sp(cursor,'GetPaymentsByPMC',[pmc_no]) or []
else:
invoices = ReportHelper.execute_sp(cursor,'GetInvoicesByContractorOrPMCNo',[contractor_id, None]) or []
credit_notes = ReportHelper.execute_sp(cursor,'GetCreditNotesByContractor',[contractor_id]) or []
gst_rel = ReportHelper.execute_sp(cursor,'GstReleasesByContractorId',[contractor_id]) or []
payments = ReportHelper.execute_sp(cursor,'GetPayments',[contractor_id]) or []
# ================= HOLD AMOUNTS =================
hold_amounts = ReportHelper.execute_sp(cursor,'GetHoldAmountsByContractor',[contractor_id]) or []
hold_data = {}
for h in hold_amounts:
hold_data.setdefault(h["Invoice_Id"], {})[h["hold_type_id"]] = h["hold_amount"]
# ================= CREDIT NOTES =================
credit_note_map = {}
for cn in credit_notes:
key = str(cn.get("PMC_No")).strip()
credit_note_map.setdefault(key, []).append(cn)
# ================= GST RELEASE =================
gst_release_map = {}
for gr in gst_rel:
key = str(gr.get("PMC_No")).strip()
gst_release_map.setdefault(key, []).append(gr)
# ================= PAYMENTS =================
total = {
"sum_invo_basic_amt": sum(row.get('Basic_Amount', 0) or 0 for row in invoices),
"sum_invo_debit_amt": sum(row.get('Debit_Amount', 0) or 0 for row in invoices),
"sum_invo_after_debit_amt": sum(row.get('After_Debit_Amount', 0) or 0 for row in invoices),
"sum_invo_amt": sum(row.get('Amount', 0) or 0 for row in invoices),
"sum_invo_gst_amt": sum(row.get('GST_Amount', 0) or 0 for row in invoices),
"sum_invo_tds_amt": sum(row.get('TDS_Amount', 0) or 0 for row in invoices),
"sum_invo_ds_amt": sum(row.get('SD_Amount', 0) or 0 for row in invoices),
"sum_invo_on_commission": sum(row.get('On_Commission', 0) or 0 for row in invoices),
"sum_invo_hydro_test": sum(row.get('Hydro_Testing', 0) or 0 for row in invoices),
"sum_invo_gst_sd_amt": sum(row.get('GST_SD_Amount', 0) or 0 for row in invoices),
"sum_invo_final_amt": sum(r.get("Final_Amount", 0) or 0 for r in invoices),
"sum_invo_hold_amt": sum(r.get("hold_amount", 0) or 0 for r in hold_amounts),
"sum_gst_basic_amt": sum(r.get("Basic_Amount", 0) or 0 for r in gst_rel),
"sum_gst_final_amt": sum(r.get("Final_Amount", 0) or 0 for r in gst_rel),
"sum_gst_total_amt": sum(r.get("Total_Amount", 0) or 0 for r in gst_rel),
"sum_pay_total_amt": sum(Decimal(r.get("Total_amount") or 0) for r in payments),
"sum_pay_payment_amt": sum(Decimal(r.get("Total_amount") or 0) for r in payments), # same as above if you want
"sum_pay_tds_payment_amt": sum(Decimal(r.get("TDS_Payment_Amount") or 0) for r in payments),
}
# Prepare final report
report_data = {
"info": info,
"invoices": invoices,
"hold_types": hold_types,
"payments": payments,
"hold_data": hold_data,
"hold_release": hold_data,
"credit_note_map": credit_note_map,
"credit_note": credit_notes,
"gst_release_map": gst_release_map,
"gst_rel": gst_rel,
"total": total
}
# Cache the report
UnifiedReportService._cached_report_data = report_data
UnifiedReportService._cached_contractor_id = contractor_id
UnifiedReportService._cached_pmc_no = pmc_no
return report_data
finally:
cursor.close()
connection.close()
# ================= DOWNLOAD EXCEL =================
@staticmethod
def download(contractor_id=None, pmc_no=None):
# Use cached report if available
report_data = UnifiedReportService.get_report(contractor_id, pmc_no)
if not report_data:
return None
fileName = (
f"Contractor_Report_{contractor_id}.xlsx"
if contractor_id else f"PMC_Report_{pmc_no}.xlsx"
)
output_file = FolderAndFile.get_download_path(fileName)
excel.generate_excel(
contractor_id or 0,
report_data["info"],
report_data["invoices"],
report_data["hold_types"],
report_data["hold_data"],
report_data["credit_note_map"],
report_data["gst_release_map"],
report_data["payments"],
output_file
)
return output_file

View File

@@ -10,16 +10,26 @@ from model.FolderAndFile import FolderAndFile
class excel:
@staticmethod
def generate_excel(contractor_id, contInfo, invoices, hold_types, hold_data,
credit_note_map, gst_release_map, output_file):
# def generate_excel(contractor_id, contInfo, invoices, hold_types, hold_data,
# credit_note_map, gst_release_map, output_file):
def generate_excel(
contractor_id,
info,
invoices,
hold_types,
hold_data,
credit_note_map,
gst_release_map,
payments,
output_file
):
workbook = openpyxl.Workbook()
sheet = workbook.active
sheet.title = "Contractor Report"
# Contractor Info
for field, value in contInfo.items():
for field, value in info.items():
sheet.append([field.replace("_", " "), value])
sheet.append([])