211 lines
8.2 KiB
Python
211 lines
8.2 KiB
Python
from flask import Blueprint, request, render_template, send_from_directory, redirect, url_for, current_app, flash
|
|
from app.__init__ import db
|
|
from openpyxl import Workbook
|
|
from openpyxl.styles import Font, Alignment, PatternFill, Border, Side
|
|
import os
|
|
import re
|
|
from datetime import datetime
|
|
from app.models import Task
|
|
|
|
reports = Blueprint('reports', __name__)
|
|
|
|
|
|
|
|
import logging
|
|
|
|
def clean_text(text):
|
|
if not isinstance(text, str):
|
|
return ""
|
|
return text.strip().replace(",", "").replace("(", "").replace(")", "")\
|
|
.replace(".", "").replace("&", "").replace("\n", "").lower()
|
|
|
|
@reports.route('/report_excel', methods=['GET'])
|
|
def generate_report():
|
|
main_task_rexp = r'[\\/*?:"<>|]'
|
|
block = request.args.get('block', '')
|
|
main_task = request.args.get('main_task', '')
|
|
|
|
block_clean = clean_text(block)
|
|
main_task_clean = clean_text(main_task)
|
|
|
|
if not block_clean:
|
|
return "Please select a Block.", 400
|
|
if not main_task_clean:
|
|
return "Please select a Main Task.", 400
|
|
|
|
print(f"Block selected: {block}")
|
|
print(f"Main task selected: {main_task}")
|
|
|
|
# Filter main task records based on cleaned block and task name
|
|
main_task_records = [task for task in Task.query.filter_by(block_name=block).all()
|
|
if clean_text(task.task_name) == main_task_clean]
|
|
|
|
print(f"Found {len(main_task_records)} main task records")
|
|
|
|
# if not main_task_records:
|
|
# return f"Main Task '{main_task}' not found in the selected block '{block}'.", 404
|
|
if not main_task_records:
|
|
flash("No data found for selected Block and Main Task", "error")
|
|
# return redirect(url_for('generate_report_page'))
|
|
return redirect(url_for('main.generate_report_page'))
|
|
|
|
report_data = {}
|
|
|
|
def safe_float(value):
|
|
try:
|
|
return round(float(value), 2)
|
|
except (ValueError, TypeError):
|
|
return 0.0
|
|
|
|
# Process subtasks for each main task
|
|
for main_task_record in main_task_records:
|
|
main_task_serial_number = main_task_record.serial_number
|
|
|
|
|
|
# Get all subtasks under the selected main task (match cleaned names)
|
|
subtasks_query = [
|
|
task for task in Task.query.filter(Task.block_name == block).all()
|
|
if clean_text(task.parent_task_name) == main_task_clean
|
|
]
|
|
|
|
print(f"Found {len(subtasks_query)} subtasks for main task '{main_task}'")
|
|
|
|
for task in subtasks_query:
|
|
key = task.village_name.strip() if task.village_name else ""
|
|
totalElemList = 26
|
|
if key not in report_data:
|
|
report_data[key] = [None] * totalElemList
|
|
report_data[key][0] = task.id
|
|
report_data[key][1] = task.village_name
|
|
|
|
boq_amount = safe_float(task.boq_amount)
|
|
previous_billing_amount = safe_float(task.previous_billing_amount)
|
|
remaining_amount = safe_float(boq_amount - previous_billing_amount)
|
|
tender_amount = safe_float(task.qty) * safe_float(task.rate)
|
|
values = [
|
|
safe_float(task.qty),
|
|
safe_float(task.rate),
|
|
tender_amount,
|
|
safe_float(task.previous_billed_qty),
|
|
previous_billing_amount,
|
|
remaining_amount
|
|
]
|
|
|
|
# Determine task type section
|
|
task_name_clean = task.task_name.lower() if task.task_name else ""
|
|
start, end = (
|
|
(2, 8) if "supply" in task_name_clean else
|
|
(8, 14) if "erection" in task_name_clean else
|
|
(14, 20) if "testing" in task_name_clean else
|
|
(20, 26) if "commissioning" in task_name_clean else
|
|
(None, None)
|
|
)
|
|
|
|
if start is not None and end is not None:
|
|
for i in range(start, end):
|
|
current_value = safe_float(report_data[key][i]) if report_data[key][i] is not None else 0
|
|
report_data[key][i] = current_value + values[i - start]
|
|
|
|
print(f"Number of villages in report data: {len(report_data)}")
|
|
|
|
# if not report_data:
|
|
# return f"No matching data found for the selected block '{block}' and main task '{main_task}'.", 404
|
|
if not report_data:
|
|
flash("Sub task data not found", "error")
|
|
# return redirect(url_for('generate_report_page'))
|
|
return redirect(url_for('main.generate_report_page'))
|
|
|
|
# Generate Excel report
|
|
sanitized_main_task = re.sub(main_task_rexp, "", main_task)
|
|
|
|
max_length = 30
|
|
|
|
if len(sanitized_main_task) > max_length:
|
|
sanitized_main_task = sanitized_main_task[:max_length]
|
|
|
|
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
|
|
|
|
# file_name = f"{sanitized_main_task}_Report.xlsx"
|
|
file_name = f"{sanitized_main_task}_{timestamp}.xlsx"
|
|
file_path = os.path.join(current_app.config['UPLOAD_FOLDER'], file_name)
|
|
|
|
wb = Workbook()
|
|
ws = wb.active
|
|
ws.title = "Subtask Report"
|
|
|
|
# Excel formatting
|
|
thin_border = Border(left=Side(style="thin"), right=Side(style="thin"),
|
|
top=Side(style="thin"), bottom=Side(style="thin"))
|
|
header_fill = PatternFill(start_color="FFC000", end_color="FFC000", fill_type="solid")
|
|
subheader_fill = PatternFill(start_color="92D050", end_color="92D050", fill_type="solid")
|
|
data_row_fill1 = PatternFill(start_color="FFFFFF", end_color="FFFFFF", fill_type="solid")
|
|
data_row_fill2 = PatternFill(start_color="D9EAD3", end_color="D9EAD3", fill_type="solid")
|
|
total_row_fill = PatternFill(start_color="FFF2CC", end_color="FFF2CC", fill_type="solid")
|
|
|
|
ws.merge_cells(start_row=1, start_column=3, end_row=1, end_column=8)
|
|
ws["A1"] = "Task ID"
|
|
ws["B1"] = "Village Name"
|
|
ws["C1"] = "Supply (70%)"
|
|
ws.merge_cells(start_row=1, start_column=9, end_row=1, end_column=14)
|
|
ws["I1"] = "Erection (20%)"
|
|
ws.merge_cells(start_row=1, start_column=15, end_row=1, end_column=20)
|
|
ws["O1"] = "Testing (5%)"
|
|
ws.merge_cells(start_row=1, start_column=21, end_row=1, end_column=26)
|
|
ws["U1"] = "Commissioning (5%)"
|
|
|
|
for start_col in [3, 9, 15, 21]:
|
|
ws.cell(row=2, column=start_col).value = "Tender Qty"
|
|
ws.cell(row=2, column=start_col + 1).value = "Tender Rate"
|
|
ws.cell(row=2, column=start_col + 2).value = "Tender Amount"
|
|
ws.cell(row=2, column=start_col + 3).value = "Previous Bill QTY"
|
|
ws.cell(row=2, column=start_col + 4).value = "Previous Bill Amount"
|
|
ws.cell(row=2, column=start_col + 5).value = "Remaining Amount"
|
|
|
|
# Style header and subheaders
|
|
for col in range(1, 27):
|
|
col_letter = ws.cell(row=2, column=col).column_letter
|
|
ws[f"{col_letter}1"].font = Font(bold=True, size=12)
|
|
ws[f"{col_letter}1"].alignment = Alignment(horizontal="center", vertical="center")
|
|
ws[f"{col_letter}1"].fill = header_fill
|
|
ws[f"{col_letter}1"].border = thin_border
|
|
|
|
ws[f"{col_letter}2"].font = Font(bold=True, size=11)
|
|
ws[f"{col_letter}2"].alignment = Alignment(horizontal="center", vertical="center")
|
|
ws[f"{col_letter}2"].fill = subheader_fill
|
|
ws[f"{col_letter}2"].border = thin_border
|
|
|
|
# Fill data
|
|
row_index = 3
|
|
totals = ["Total", ""] + [0] * 24
|
|
sum_columns = [4, 6, 7, 10, 12, 13, 16, 18, 19, 22, 24, 25]
|
|
|
|
for row_data in report_data.values():
|
|
ws.append(row_data)
|
|
fill = data_row_fill1 if row_index % 2 != 0 else data_row_fill2
|
|
for col in range(1, 27):
|
|
ws.cell(row=row_index, column=col).fill = fill
|
|
ws.cell(row=row_index, column=col).border = thin_border
|
|
for i in sum_columns:
|
|
totals[i] += safe_float(row_data[i])
|
|
row_index += 1
|
|
|
|
# Add totals
|
|
ws.append(totals)
|
|
for col in range(1, 27):
|
|
ws.cell(row=row_index, column=col).font = Font(bold=True)
|
|
ws.cell(row=row_index, column=col).fill = total_row_fill
|
|
ws.cell(row=row_index, column=col).alignment = Alignment(horizontal="center")
|
|
ws.cell(row=row_index, column=col).border = thin_border
|
|
|
|
for i in range(1, 27):
|
|
ws.column_dimensions[ws.cell(row=2, column=i).column_letter].width = 20
|
|
|
|
wb.save(file_path)
|
|
print(f"Report generated: {file_name}")
|
|
return redirect(url_for('reports.download_report', filename=file_name))
|
|
|
|
|
|
@reports.route('/download/<filename>')
|
|
def download_report(filename):
|
|
return send_from_directory(current_app.config['UPLOAD_FOLDER'], filename, as_attachment=True)
|