modify upload documents and filter store-proc commit

This commit is contained in:
2025-12-03 19:08:28 +05:30
parent 21eb55d4b4
commit dd339070ab
5 changed files with 19 additions and 965 deletions

View File

@@ -30,29 +30,20 @@ class DocumentHandler:
return
cursor = connection.cursor(dictionary=True)
# --- FILTER QUERY ---
query = "SELECT * FROM documents WHERE 1=1"
params = []
cursor.callproc("GetDocuments", [year, stage])
if year != "":
query += " AND year = %s"
params.append(year)
if stage != "":
query += " AND stage = %s"
params.append(stage)
cursor.execute(query, params)
self.documents = cursor.fetchall()
# fetch first result set
for result in cursor.stored_results():
self.documents = result.fetchall()
break
# ---- GET YEARS FROM STORED PROCEDURE ----
cursor.callproc("GetYear")
for result in cursor.stored_results():
year_rows = result.fetchall()
break # only first result set
break
self.years = [row['year'] for row in year_rows]
@@ -60,10 +51,8 @@ class DocumentHandler:
connection.close()
self.isSuccess = True
# UPLOAD DOCUMENTS
# Upload Documents
def Upload(self, request):
"""Log user actions with timestamp, user, action, and details."""
dbconfig = DBConfig()
connection = dbconfig.get_db_connection()
@@ -72,25 +61,17 @@ class DocumentHandler:
files = request.files.getlist('documents')
year = request.form['year']
stage = request.form['stage']
for file in files:
if file is not FileHandler.ALLOWED_EXTENSIONS:
continue
for file in files:
extension = file.filename.rsplit('.', 1)[1]
if extension not in FileHandler.ALLOWED_EXTENSIONS:
print("Skip invalid file type : ",extension)
continue
filename = secure_filename(file.filename)
filepath = os.path.join(FileHandler.UPLOAD_FOLDER, filename)
extension = file.filename.rsplit('.', 1)[1]
# Need to Check whetehr all three items are required
file.save(filepath)
# cursor.execute("""
# INSERT INTO documents (filename, filepath, filetype, year, stage)
# VALUES (%s, %s, %s, %s, %s)
# """, (filename, filepath, file.filename.rsplit('.', 1)[1], year, stage))
cursor.callproc('InsertDocument', [ filename, filepath, extension, year, stage ])
connection.commit()
@@ -98,13 +79,11 @@ class DocumentHandler:
connection.close()
# return redirect(url_for('view_documents'))
# Summary report
def Summary_report(self, request):
dbconfig = DBConfig()
connection = dbconfig.get_db_connection()
year = request.args.get('year')
# if not year get all year in list.
@@ -113,11 +92,7 @@ class DocumentHandler:
allYears = yearGetter.get_year_by_model("AllYearsInAllModel")
yearGetter.close()
return render_template(
'summary_reports.html',
years=allYears,
message="Please select a year to download."
)
return render_template('summary_reports.html', years=allYears,message="Please select a year to download.")
# for excel
try:
@@ -132,7 +107,6 @@ class DocumentHandler:
for stage_name, table_name in stages.items():
cursor = connection.cursor(dictionary=True)
cursor.callproc("sp_get_stage_data", [table_name, year])
for result in cursor.stored_results():

707
main.py
View File

@@ -5,22 +5,19 @@ import pymysql
import io
import mysql.connector
from werkzeug.utils import secure_filename
from AppCode.FileHandler import FileHandler
from AppCode.DocumentHandler import DocumentHandler
from config import db_config
from AppCode.Config import DBConfig
from AppCode.FileHandler import FileHandler
from AppCode.DocumentHandler import DocumentHandler
from AppCode.ITRHandler import ITRHandler
from AppCode.AOHandler import AOHandler
from AppCode.CITHandler import CITHandler
from AppCode.ITATHandler import ITATHandler
from AppCode.YearGet import YearGet
app = Flask(__name__)
app.secret_key="secret1234"
app.config['UPLOAD_FOLDER'] = FileHandler.UPLOAD_FOLDER
@@ -49,7 +46,7 @@ def upload_file():
docHandler.Upload(request=request)
return redirect(url_for('view_documents'))
return render_template('upload.html')
# View all documents with filters
@app.route('/documents')
@@ -84,42 +81,6 @@ def uploaded_file(filename):
# @app.route('/itr', methods=['GET', 'POST'])
# def itr_form():
# if request.method == 'POST':
# data = {key: request.form.get(key, 0) for key in request.form}
# conn = mysql.connector.connect(**db_config)
# cursor = conn.cursor()
# query = """
# INSERT INTO itr (
# year, gross_total_income, disallowance_14a, disallowance_37,
# deduction_80ia_business, deduction_80ia_misc, deduction_80ia_other,
# deduction_sec37_disallowance, deduction_80g, net_taxable_income,
# tax_30_percent, tax_book_profit_18_5, tax_payable, surcharge_12,
# edu_cess_3, total_tax_payable, mat_credit, interest_234c,
# total_tax, advance_tax, tds, tcs, tax_on_assessment, refund
# ) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
# """
# values = tuple([
# int(data.get('year', 0))
# ] + [
# float(data.get(col, 0)) for col in [
# 'gross_total_income', 'disallowance_14a', 'disallowance_37',
# 'deduction_80ia_business', 'deduction_80ia_misc', 'deduction_80ia_other',
# 'deduction_sec37_disallowance', 'deduction_80g', 'net_taxable_income',
# 'tax_30_percent', 'tax_book_profit_18_5', 'tax_payable', 'surcharge_12',
# 'edu_cess_3', 'total_tax_payable', 'mat_credit', 'interest_234c',
# 'total_tax', 'advance_tax', 'tds', 'tcs', 'tax_on_assessment', 'refund'
# ]
# ])
# cursor.execute(query, values)
# conn.commit()
# flash("ITR record deleted successfully!", "success")
# cursor.close()
# conn.close()
# return redirect(url_for('index'))
# return render_template('itr_form.html')
## ===============================================
## ITR (Income Tax Return) Routes
## ===============================================
@@ -173,44 +134,6 @@ def update_itr(id):
## 3. UPDATE an existing ITR record
# @app.route('/itr/update/<int:id>', methods=['GET', 'POST'])
# def update_itr(id):
# conn = get_db_connection()
# if request.method == 'POST':
# cursor = conn.cursor()
# columns = [
# 'year', 'gross_total_income', 'disallowance_14a', 'disallowance_37',
# 'deduction_80ia_business', 'deduction_80ia_misc', 'deduction_80ia_other',
# 'deduction_sec37_disallowance', 'deduction_80g', 'net_taxable_income',
# 'tax_30_percent', 'tax_book_profit_18_5', 'tax_payable', 'surcharge_12',
# 'edu_cess_3', 'total_tax_payable', 'mat_credit', 'interest_234c',
# 'total_tax', 'advance_tax', 'tds', 'tcs', 'tax_on_assessment', 'refund'
# ]
# # Create the "SET column = %s" part of the query
# set_clause = ', '.join([f"{col} = %s" for col in columns])
# query = f"UPDATE itr SET {set_clause} WHERE id = %s"
# values = [request.form.get(col, 0) for col in columns]
# values.append(id) # Add the ID for the WHERE clause at the end
# cursor.execute(query, tuple(values))
# conn.commit()
# cursor.close()
# conn.close()
# return redirect(url_for('display_itr'))
# # For a GET request, fetch the existing data and show it in the form
# cursor = conn.cursor(dictionary=True)
# cursor.execute("SELECT * FROM itr WHERE id = %s", (id,))
# record = cursor.fetchone()
# cursor.close()
# conn.close()
# return render_template('update_itr.html', record=record)
## ===============================================
## AO (Assessing Officer) Routes
@@ -267,47 +190,6 @@ def delete_ao(id):
# 3. UPDATE AO record
# @app.route('/ao/update/<int:id>', methods=['GET', 'POST'])
# def update_ao(id):
# conn = get_db_connection()
# cursor = conn.cursor(dictionary=True)
# cursor.execute("SELECT * FROM ao WHERE id = %s", (id,))
# ao_record = cursor.fetchone()
# if not ao_record:
# cursor.close()
# conn.close()
# return "AO record not found", 404
# if request.method == 'POST':
# columns = [
# 'year', 'gross_total_income', 'disallowance_14a', 'disallowance_37',
# 'deduction_80ia_business', 'deduction_sec37_disallowance', 'deduction_80g',
# 'net_taxable_income', 'tax_30_percent', 'tax_book_profit_18_5',
# 'surcharge_12', 'edu_cess_3', 'total_tax_payable', 'mat_credit',
# 'interest_234c', 'total_tax', 'advance_tax', 'tds', 'tcs',
# 'tax_on_assessment', 'refund'
# ]
# values = [request.form.get(col, 0) for col in columns]
# set_clause = ", ".join([f"{col}=%s" for col in columns])
# query = f"UPDATE ao SET {set_clause} WHERE id=%s"
# cursor.execute(query, tuple(values) + (id,))
# conn.commit()
# cursor.close()
# conn.close()
# flash("AO record updated successfully!", "success")
# return redirect(url_for('display_ao'))
# cursor.close()
# conn.close()
# return render_template('update_ao.html', record=ao_record)
## =======================================================
## CIT (Commissioner of Income Tax) Routes
## =======================================================
@@ -360,94 +242,6 @@ def update_cit(id):
return render_template('update_cit.html', record=record)
# @app.route('/cit/update/<int:id>', methods=['GET', 'POST'])
# def update_cit(id):
conn = get_db_connection()
cursor = conn.cursor(dictionary=True)
cursor.execute("SELECT * FROM cit WHERE id=%s", (id,))
record = cursor.fetchone()
if not record:
cursor.close()
conn.close()
return "CIT record not found", 404
if request.method == 'POST':
columns = [
"year", "gross_total_income", "deduction_80ia_business", "deduction_sec37_disallowance",
"deduction_80g", "net_taxable_income", "tax_30_percent", "tax_book_profit_18_5",
"tax_payable", "surcharge_12", "edu_cess_3", "total_tax_payable", "mat_credit",
"interest_234c", "total_tax", "advance_tax", "tds", "tcs", "tax_on_assessment", "refund"
]
values = [request.form.get(col, 0) for col in columns]
set_clause = ", ".join([f"{col}=%s" for col in columns])
query = f"UPDATE cit SET {set_clause} WHERE id=%s"
cursor.execute(query, tuple(values)+(id,))
conn.commit()
cursor.close()
conn.close()
return redirect(url_for('display_cit'))
cursor.close()
conn.close()
return render_template('add_cit.html', record=record)
# DISPLAY all CIT records
# @app.route('/cit_records')
# def display_cit():
# conn = get_db_connection()
# cursor = conn.cursor(dictionary=True)
# cursor.execute("SELECT * FROM cit ORDER BY year DESC, id DESC")
# cit_records = cursor.fetchall()
# cursor.close()
# conn.close()
# return render_template('display_cit.html', cit_records=cit_records)
# ADD a new CIT record
# @app.route('/cit/add', methods=['GET', 'POST'])
# def add_cit():
# if request.method == 'POST':
# conn = get_db_connection()
# cursor = conn.cursor()
# columns = [
# "year", "gross_total_income", "deduction_80ia_business", "deduction_sec37_disallowance",
# "deduction_80g", "net_taxable_income", "tax_30_percent", "tax_book_profit_18_5",
# "tax_payable", "surcharge_12", "edu_cess_3", "total_tax_payable", "mat_credit",
# "interest_234c", "total_tax", "advance_tax", "tds", "tcs", "tax_on_assessment", "refund"
# ]
# values = [request.form.get(col, 0) for col in columns]
# query = f"INSERT INTO cit ({', '.join(columns)}) VALUES ({', '.join(['%s']*len(columns))})"
# cursor.execute(query, tuple(values))
# conn.commit()
# flash("ITAT record added successfully!", "success")
# cursor.close()
# conn.close()
# return redirect(url_for('display_cit'))
# return render_template('add_cit.html')
# @app.route('/cit/delete/<int:id>', methods=['POST'])
# def delete_cit(id):
# try:
# conn = get_db_connection()
# cursor = conn.cursor()
# cursor.execute("DELETE FROM cit WHERE id=%s", (id,))
# conn.commit()
# flash("ITR record deleted successfully!", "success")
# except Exception as err:
# print(f"Error deleting CIT record: {err}")
# finally:
# cursor.close()
# conn.close()
# return redirect(url_for('display_cit'))
## =======================================================
## ITAT (Income Tax Appellate Tribunal) Routes
## =======================================================
@@ -460,32 +254,6 @@ def display_itat():
itat.close()
return render_template('display_itat.html', records=records)
# ADD a new ITAT record
# @app.route('/itat/add', methods=['GET', 'POST'])
# def add_itat():
# cit = CITHandler()
# cit_records = cit.get_all_cit()
# cit.close()
# if request.method == 'POST':
# data = {
# "cit_id": request.form.get("cit_id"),
# "year": request.form.get("year"),
# "mat_tax_credit": request.form.get("mat_tax_credit"),
# "surcharge": request.form.get("surcharge"),
# "cess": request.form.get("cess"),
# "total_credit": request.form.get("total_credit")
# }
# itat = ITATHandler()
# itat.add_itat(data)
# itat.close()
# flash("ITAT Record Added Successfully!", "success")
# return redirect(url_for('display_itat'))
# return render_template('add_itat.html', cit_records=cit_records)
@app.route('/itat/delete/<int:id>', methods=['POST'])
def delete_itat(id):
@@ -523,56 +291,6 @@ def update_itat(id):
return render_template('update_itat.html', record=record)
# DISPLAY all ITAT records
# @app.route('/itat_records')
# def display_itat():
# conn = get_db_connection()
# cursor = conn.cursor(dictionary=True)
# # Querying the 'itat' table
# cursor.execute("SELECT * FROM itat ORDER BY year DESC, id DESC")
# records = cursor.fetchall()
# cursor.close()
# conn.close()
# # Rendering the 'display_itat.html' template
# return render_template('display_itat.html', records=records)
# ADD a new ITAT record
# @app.route('/itat/add', methods=['GET', 'POST'])
# def add_itat():
# conn = get_db_connection()
# cursor = conn.cursor(dictionary=True)
# # Fetch all CIT records to choose from
# cursor.execute("SELECT id, year FROM cit ORDER BY year DESC")
# cit_records = cursor.fetchall()
# if request.method == 'POST':
# cit_id = request.form.get('cit_id') # selected parent CIT id
# columns = ['id', 'year','mat_tax_credit', 'surcharge', 'cess', 'total_credit']
# values = [cit_id,
# request.form.get('year', 0),
# request.form.get('mat_tax_credit', 0),
# request.form.get('surcharge', 0),
# request.form.get('cess', 0),
# request.form.get('total_credit', 0)]
# query = f"INSERT INTO itat ({', '.join(columns)}) VALUES ({', '.join(['%s'] * len(columns))})"
# cursor.execute(query, tuple(values))
# conn.commit()
# cursor.close()
# conn.close()
# flash("ITAT record added successfully!", "success")
# return redirect(url_for('display_itat'))
# cursor.close()
# conn.close()
# return render_template('add_itat.html', cit_records=cit_records)
@app.route('/itat/add', methods=['GET', 'POST'])
def add_itat():
itat = ITATHandler()
@@ -588,129 +306,7 @@ def add_itat():
return render_template('add_itat.html')
# @app.route('/itat/update/<int:id>', methods=['GET', 'POST'])
# def update_itat(id):
# conn = get_db_connection()
# cursor = conn.cursor(dictionary=True)
# # Fetch the existing record
# cursor.execute("SELECT * FROM itat WHERE id=%s", (id,))
# record = cursor.fetchone()
# if not record:
# cursor.close()
# conn.close()
# flash("ITAT record not found!", "danger")
# return redirect(url_for('display_itat'))
# if request.method == 'POST':
# columns = ['year', 'mat_tax_credit', 'surcharge', 'cess', 'total_credit']
# values = [request.form.get(col, 0) for col in columns]
# set_clause = ", ".join([f"{col}=%s" for col in columns])
# query = f"UPDATE itat SET {set_clause} WHERE id=%s"
# cursor.execute(query, tuple(values) + (id,))
# conn.commit()
# cursor.close()
# conn.close()
# flash("ITAT record updated successfully!", "success")
# return redirect(url_for('display_itat'))
# cursor.close()
# conn.close()
# # Render a template with existing values filled in
# return render_template('update_itat.html', record=record)
# @app.route('/itat/delete/<int:id>', methods=['POST'])
# def delete_itat(id):
# try:
# conn = get_db_connection()
# cursor = conn.cursor()
# cursor.execute("DELETE FROM itat WHERE id=%s", (id,))
# conn.commit()
# flash("ITAT record deleted successfully!", "success")
# except Exception as err:
# flash(f"Error deleting ITAT: {err}", "danger")
# finally:
# cursor.close()
# conn.close()
# return redirect(url_for('display_itat'))
# -------------------- i dont use -------------------------
# @app.route('/cit', methods=['GET', 'POST'])
# def cit_form():
if request.method == 'POST':
data = {key: request.form.get(key, 0) for key in request.form}
conn = mysql.connector.connect(**db_config)
cursor = conn.cursor()
query = """
INSERT INTO cit (
year, gross_total_income, deduction_80ia_business, deduction_sec37_disallowance,
deduction_80g, net_taxable_income, tax_30_percent, tax_book_profit_18_5,
tax_payable, surcharge_12, edu_cess_3, total_tax_payable, mat_credit,
interest_234c, total_tax, advance_tax, tds, tcs, tax_on_assessment, refund
) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
"""
values = (
data.get('year'), # Include 'year' as the first value
float(data.get('gross_total_income', 0)),
float(data.get('deduction_80ia_business', 0)),
float(data.get('deduction_sec37_disallowance', 0)),
float(data.get('deduction_80g', 0)),
float(data.get('net_taxable_income', 0)),
float(data.get('tax_30_percent', 0)),
float(data.get('tax_book_profit_18_5', 0)),
float(data.get('tax_payable', 0)),
float(data.get('surcharge_12', 0)),
float(data.get('edu_cess_3', 0)),
float(data.get('total_tax_payable', 0)),
float(data.get('mat_credit', 0)),
float(data.get('interest_234c', 0)),
float(data.get('total_tax', 0)),
float(data.get('advance_tax', 0)),
float(data.get('tds', 0)),
float(data.get('tcs', 0)),
float(data.get('tax_on_assessment', 0)),
float(data.get('refund', 0))
)
cursor.execute(query, values)
conn.commit()
cursor.close()
conn.close()
return redirect(url_for('index'))
return render_template('cit_form.html')
# -------------------- i dont use -------------------------
# @app.route('/itat', methods=['GET', 'POST'])
# def itat_form():
# if request.method == 'POST':
# mat_tax_credit = request.form['mat_tax_credit']
# surcharge = request.form['surcharge']
# cess = request.form['cess']
# total_credit = request.form['total_credit']
# year=request.form['year']
# conn = mysql.connector.connect(**db_config)
# cursor = conn.cursor()
# cursor.execute("""
# INSERT INTO itat (year, mat_tax_credit, surcharge, cess, total_credit)
# VALUES (%s,%s, %s, %s, %s)
# """, (year,mat_tax_credit, surcharge, cess, total_credit))
# conn.commit()
# cursor.close()
# conn.close()
# return redirect(url_for('index'))
# return render_template('itat_form.html')
# def get_db_connection():
# connection = mysql.connector.connect(**db_config)
# return connection
# report form
@app.route('/reports')
def reports():
return render_template("reports.html")
@@ -796,6 +392,7 @@ def cit_report():
return render_template("cit_reports.html", years=years)
# Itat report download by year
@app.route('/itat_report', methods=['GET'])
def itat_report():
@@ -825,300 +422,6 @@ def itat_report():
return render_template("itat_reports.html", years=years)
# @app.route('/itr/reports', methods=['GET', 'POST'])
# def itr_reports():
# yearGetter = YearGet()
# if request.method == "POST":
# selected_year = request.form.get("year")
# itr=ITRHandler()
# yearGetter.close()
# return redirect(url_for("itr_report_result", year=selected_year))
# # GET method → fetch all distinct years through procedure
# years = yearGetter.get_year_by_model("GetITRYears")
# yearGetter.close()
# print("---- year --",years)
# return render_template("itr_reports.html", years=years)
# @app.route('/ao_report', methods=['GET'])
# def ao_report():
# selected_year = request.args.get('year')
# connection = pymysql.connect(**db_config)
# try:
# if selected_year:
# query = "SELECT * FROM ao WHERE year = %s"
# df = pd.read_sql(query, connection, params=[selected_year])
# if df.empty:
# return "No records found for the selected year."
# # Transpose the DataFrame: rows → fields, columns → records
# df_transposed = df.transpose()
# df_transposed.insert(0, 'Field', df_transposed.index)
# # Rename columns to "Record 1", "Record 2", ...
# for i in range(1, df_transposed.shape[1]):
# df_transposed.rename(columns={df_transposed.columns[i]: f"Record {i}"}, inplace=True)
# df_transposed.reset_index(drop=True, inplace=True)
# output = io.BytesIO()
# with pd.ExcelWriter(output, engine='xlsxwriter') as writer:
# df_transposed.to_excel(writer, index=False, sheet_name='AO_Vertical')
# # Optional: Adjust formatting
# workbook = writer.book
# worksheet = writer.sheets['AO_Vertical']
# worksheet.set_column(0, 0, 30) # Widen 'Field' column
# output.seek(0)
# return send_file(
# output,
# mimetype='application/vnd.openxmlformats-officedocument.spreadsheetml.sheet',
# as_attachment=True,
# download_name=f"AO_Report_{selected_year}.xlsx"
# )
# else:
# with connection.cursor() as cursor:
# cursor.execute("SELECT DISTINCT year FROM ao ORDER BY year DESC")
# years = [row[0] for row in cursor.fetchall()]
# return render_template("ao_reports.html", years=years)
# finally:
# connection.close()
# @app.route('/cit_report', methods=['GET'])
# def cit_report():
# selected_year = request.args.get('year')
# connection = pymysql.connect(**db_config)
# try:
# if selected_year:
# # Fetch data from the `cit` table for the selected year
# query = "SELECT * FROM cit WHERE year = %s"
# df = pd.read_sql(query, connection, params=[selected_year])
# output = io.BytesIO()
# with pd.ExcelWriter(output, engine='xlsxwriter') as writer:
# workbook = writer.book
# # Write each row vertically on a separate sheet or below one another
# for i, (_, row) in enumerate(df.iterrows(), start=1):
# # Convert the row to vertical format
# vertical_df = pd.DataFrame(row).reset_index()
# vertical_df.columns = ['Field', 'Value']
# # Write each vertical entry below the previous (e.g., block by block)
# start_row = (i - 1) * (len(vertical_df) + 3) # 3-row gap between entries
# vertical_df.to_excel(writer, sheet_name='CIT_Report', index=False, startrow=start_row)
# output.seek(0)
# return send_file(
# output,
# mimetype='application/vnd.openxmlformats-officedocument.spreadsheetml.sheet',
# as_attachment=True,
# download_name=f"CIT_Report_{selected_year}_Vertical.xlsx"
# )
# else:
# # Render dropdown for year selection
# with connection.cursor() as cursor:
# cursor.execute("SELECT DISTINCT year FROM cit ORDER BY year DESC")
# years = [row[0] for row in cursor.fetchall()]
# return render_template("cit_reports.html", years=years)
# finally:
# connection.close()
# @app.route('/itat_report', methods=['GET'])
# def itat_report():
# selected_year = request.args.get('year')
# connection = pymysql.connect(**db_config)
# try:
# if selected_year:
# query = "SELECT * FROM itat WHERE year = %s"
# df = pd.read_sql(query, connection, params=[selected_year])
# output = io.BytesIO()
# with pd.ExcelWriter(output, engine='xlsxwriter') as writer:
# df.T.to_excel(writer, header=False, sheet_name='ITAT_Report')
# output.seek(0)
# return send_file(
# output,
# mimetype='application/vnd.openxmlformats-officedocument.spreadsheetml.sheet',
# as_attachment=True,
# download_name=f"ITAT_Report_{selected_year}_Vertical.xlsx"
# )
# else:
# with connection.cursor() as cursor:
# cursor.execute("SELECT DISTINCT year FROM itat ORDER BY year DESC")
# years = [row[0] for row in cursor.fetchall()]
# return render_template("itat_reports.html", years=years)
# finally:
# connection.close()
# @app.route('/itr_report_download', methods=['GET'])
# def itr_report_download():
# connection = pymysql.connect(**db_config)
# try:
# selected_year = request.args.get('year')
# if selected_year:
# query = "SELECT * FROM itr WHERE year = %s"
# df = pd.read_sql(query, connection, params=[selected_year])
# output = io.BytesIO()
# with pd.ExcelWriter(output, engine='xlsxwriter') as writer:
# df.to_excel(writer, index=False, sheet_name=f"ITR {selected_year}")
# output.seek(0)
# return send_file(
# output,
# download_name=f"ITR_Report_{selected_year}.xlsx",
# as_attachment=True,
# mimetype='application/vnd.openxmlformats-officedocument.spreadsheetml.sheet'
# )
# else:
# # If no year is selected, show dropdown
# with connection.cursor() as cursor:
# cursor.execute("SELECT DISTINCT year FROM itr ORDER BY year DESC")
# years = [row[0] for row in cursor.fetchall()]
# return render_template('itr_reports.html', years=years)
# finally:
# connection.close()
# -------------------- i dont use -------------------------
# @app.route('/download/<int:doc_id>')
# def download_report(doc_id):
# conn = get_db_connection()
# cursor = conn.cursor(dictionary=True)
# cursor.execute("SELECT * FROM documents WHERE id = %s", (doc_id,))
# document = cursor.fetchone()
# conn.close()
# if not document:
# return "Document not found", 404
# file_path = os.path.join('static', 'uploads', document['filename']) # adjust as per your storage
# return send_from_directory(directory='static/uploads', path=document['filename'], as_attachment=True)
# @app.route('/summary_report', methods=['GET'])
# def summary_report():
# year = request.args.get('year')
# if not year:
# connection = pymysql.connect(**db_config)
# try:
# years = set()
# for table in ['itr', 'ao', 'cit', 'itat']:
# df = pd.read_sql(f"SELECT DISTINCT year FROM {table}", connection)
# years.update(int(y) for y in df['year'].dropna().tolist())
# return render_template('summary_reports.html', years=sorted(years), message="Please select a year to download.")
# finally:
# connection.close()
# connection = pymysql.connect(**db_config)
# try:
# stages = ['itr', 'ao', 'cit', 'itat']
# stage_data = {}
# for stage in stages:
# query = f"SELECT * FROM {stage} WHERE year = %s"
# df = pd.read_sql(query, connection, params=[year])
# stage_data[stage.upper()] = df
# def safe_get(df, col):
# return df[col].values[0] if col in df.columns and not df.empty else '-'
# particulars = [
# "Gross Total Income", "Add: Disallowance u/s 14A", "Add: Disallowance u/s 37", "GTI as per",
# "Less: Deduction u/s 80IA", "Less: Deduction u/s 80G", "Net Taxable Income", "Tax @ 30%",
# "Tax @ 18.5% on Book Profit", "Surcharge @ 12%", "Education Cess @ 3%", "Total Tax Payable",
# "Less: MAT Credit", "Net Tax", "Add: Interest u/s 234C", "Total Tax",
# "Advance Tax", "TDS", "TCS", "SAT", "Tax on Regular Assessment", "Refund"
# ]
# columns = [
# 'gross_total_income', 'disallowance_14a', 'disallowance_37', 'gti',
# 'deduction_80ia', 'deduction_80g', 'net_taxable_income', 'tax_30',
# 'book_profit_tax', 'surcharge_12', 'education_cess', 'total_tax',
# 'mat_credit', 'net_tax', 'interest_234c', 'total_tax_payable',
# 'advance_tax', 'tds', 'tcs', 'sat', 'tax_regular', 'refund'
# ]
# data = {
# "Particulars": particulars,
# "ITR": [safe_get(stage_data['ITR'], col) for col in columns],
# "AO": [safe_get(stage_data['AO'], col) for col in columns],
# "CIT(A)": [safe_get(stage_data['CIT'], col) for col in columns],
# "ITAT": [safe_get(stage_data['ITAT'], col) for col in columns],
# }
# df = pd.DataFrame(data)
# # Export to Excel with formatting
# output = io.BytesIO()
# with pd.ExcelWriter(output, engine='xlsxwriter') as writer:
# df.to_excel(writer, index=False, sheet_name=f'AY {year}')
# workbook = writer.book
# worksheet = writer.sheets[f'AY {year}']
# # Format definitions
# header_format = workbook.add_format({
# 'bold': True,
# 'text_wrap': True,
# 'valign': 'middle',
# 'align': 'center',
# 'bg_color': '#007bff',
# 'font_color': 'white',
# 'border': 1
# })
# cell_format = workbook.add_format({
# 'border': 1,
# 'valign': 'top',
# 'align': 'center',
# })
# # Apply formats
# for col_num, value in enumerate(df.columns):
# worksheet.write(0, col_num, value, header_format)
# # Auto column width
# max_len = max(df[value].astype(str).map(len).max(), len(str(value))) + 2
# worksheet.set_column(col_num, col_num, max_len)
# # Format data rows
# for row_num in range(1, len(df) + 1):
# for col_num in range(len(df.columns)):
# worksheet.write(row_num, col_num, df.iloc[row_num - 1, col_num], cell_format)
# output.seek(0)
# return send_file(
# output,
# mimetype='application/vnd.openxmlformats-officedocument.spreadsheetml.sheet',
# as_attachment=True,
# download_name=f"Summary_Report_{year}.xlsx"
# )
# finally:
# connection.close()
@app.route('/summary_report', methods=['GET'])
def summary_report():
docHandler = DocumentHandler()

223
test.py
View File

@@ -1,223 +0,0 @@
import os
from AppCode.Config import DBConfig
from AppCode.FileHandler import FileHandler
from werkzeug.utils import secure_filename
class DocumentHandler:
years = ""
documents = ""
isSuccess = False
resultMessage = ""
def View(self,request):
year = request.args.get('year')
stage = request.args.get('stage')
dbconfig = DBConfig()
connection = dbconfig.get_db_connection()
if not connection:
self.isSuccess = False
return
cursor = connection.cursor()
params = []
query = "SELECT * FROM documents WHERE 1=1"
if year:
query += " AND year = %s"
params.append(year)
if stage:
query += " AND stage = %s"
params.append(stage)
cursor.execute(query, params)
documentsdata = cursor.fetchall()
print("*************")
print(documentsdata)
cursor.callproc("GetYear")
# records = []
# for result in cursor.stored_results():
# records = result.fetchall()
yearsdata = ""
for res in cursor.stored_results():
yearsdata = res.fetchall()
print(yearsdata)
self.years = yearsdata
self.documents = documentsdata
self.isSuccess = True
print("document --",documentsdata)
cursor.close()
connection.close()
def Upload(self, request):
"""Log user actions with timestamp, user, action, and details."""
dbconfig = DBConfig()
connection = dbconfig.get_db_connection()
if connection:
cursor = connection.cursor()
files = request.files.getlist('documents')
year = request.form['year']
stage = request.form['stage']
for file in files:
if file is not FileHandler.ALLOWED_EXTENSIONS:
continue
filename = secure_filename(file.filename)
filepath = os.path.join(FileHandler.UPLOAD_FOLDER, filename)
extension = file.filename.rsplit('.', 1)[1]
# Need to Check whetehr all three items are required
file.save(filepath)
# cursor.execute("""
# INSERT INTO documents (filename, filepath, filetype, year, stage)
# VALUES (%s, %s, %s, %s, %s)
# """, (filename, filepath, file.filename.rsplit('.', 1)[1], year, stage))
cursor.callproc('InsertDocument', [
filename,
filepath,
extension,
year,
stage
])
connection.commit()
cursor.close()
connection.close()
# return redirect(url_for('view_documents'))
#return render_template('upload.html')
# old itr report code
@app.route('/itr_report', methods=['GET'])
def itr_report():
yearGetter = YearGet()
connection = pymysql.connect(**db_config)
try:
selected_year = request.args.get('year')
if selected_year:
# Fetch ITR data for the selected year
query = "SELECT * FROM itr WHERE year = %s"
df = pd.read_sql(query, connection, params=[selected_year])
if df.empty:
return "No records found for the selected year."
# Transpose DataFrame: vertical fields, horizontal records
df_transposed = df.transpose()
df_transposed.insert(0, 'Field', df_transposed.index)
# Rename columns as Record 1, Record 2, etc.
record_cols = {i: f'Record {i}' for i in df_transposed.columns if isinstance(i, int)}
df_transposed.rename(columns=record_cols, inplace=True)
df_transposed.reset_index(drop=True, inplace=True)
output = io.BytesIO()
with pd.ExcelWriter(output, engine='xlsxwriter') as writer:
df_transposed.to_excel(writer, index=False, sheet_name='ITR_Vertical')
# Format for better readability (optional)
workbook = writer.book
worksheet = writer.sheets['ITR_Vertical']
worksheet.set_column(0, 0, 30) # Field column wider
output.seek(0)
return send_file(
output,
mimetype='application/vnd.openxmlformats-officedocument.spreadsheetml.sheet',
as_attachment=True,
download_name=f"ITR_Report_{selected_year}.xlsx"
)
else:
# Render dropdown form with available years
with connection.cursor() as cursor:
cursor.execute("SELECT DISTINCT year FROM itr ORDER BY year DESC")
years = [row[0] for row in cursor.fetchall()]
return render_template("itr_reports.html", years=years)
finally:
connection.close()
# --------------\
@app.route('/itr_report', methods=['GET'])
def itr_report():
yearGetter = YearGet()
selected_year = request.args.get('year')
if selected_year:
# Fetch ITR data for the selected year
query = "SELECT * FROM itr WHERE year = %s"
df = pd.read_sql(query, connection, params=[selected_year])
if df.empty:
return "No records found for the selected year."
# Transpose DataFrame: vertical fields, horizontal records
df_transposed = df.transpose()
df_transposed.insert(0, 'Field', df_transposed.index)
# Rename columns as Record 1, Record 2, etc.
record_cols = {i: f'Record {i}' for i in df_transposed.columns if isinstance(i, int)}
df_transposed.rename(columns=record_cols, inplace=True)
df_transposed.reset_index(drop=True, inplace=True)
output = io.BytesIO()
with pd.ExcelWriter(output, engine='xlsxwriter') as writer:
df_transposed.to_excel(writer, index=False, sheet_name='ITR_Vertical')
# Format for better readability (optional)
workbook = writer.book
worksheet = writer.sheets['ITR_Vertical']
worksheet.set_column(0, 0, 30) # Field column wider
output.seek(0)
return send_file(
output,
mimetype='application/vnd.openxmlformats-officedocument.spreadsheetml.sheet',
as_attachment=True,
download_name=f"ITR_Report_{selected_year}.xlsx"
)
else:
# Render dropdown form with available years
with connection.cursor() as cursor:
# cursor.execute("SELECT DISTINCT year FROM itr ORDER BY year DESC")
# years = [row[0] for row in cursor.fetchall()]
years = yearGetter.get_year_by_model("GetITRYears")
yearGetter.close()
print("---- year --",years)
return render_template("itr_reports.html", years=years)