from flask import Flask, render_template, request, redirect, url_for, send_from_directory, abort, flash,send_file ,jsonify import os from dotenv import load_dotenv load_dotenv() import pandas as pd from werkzeug.utils import secure_filename from datetime import date from AppCode.Config import DBConfig from AppCode.LoginAuth import LoginAuth from AppCode.FileHandler import FileHandler from AppCode.YearGet import YearGet from AppCode.DocumentHandler import DocumentHandler from AppCode.ITRHandler import ITRHandler from AppCode.AOHandler import AOHandler from AppCode.CITHandler import CITHandler from AppCode.ITATHandler import ITATHandler from AppCode.MatCreditHandler import MatCreditHandler import subprocess # Server app = Flask(__name__) app.secret_key=os.getenv("SECRET_KEY") auth = LoginAuth() app.register_blueprint(auth.bp) # welcome page @app.route('/') @auth.login_required def welcome(): return render_template('index.html') # Dashboard page @app.route('/dashboard') @auth.login_required def index(): return render_template('index.html') # Upload File route @app.route('/upload', methods=['GET', 'POST']) @auth.login_required def upload_file(): if request.method == 'POST': FileHandler.CHeckExistingOrCreateNewUploadFolder() docHandler = DocumentHandler() docHandler.Upload(request=request) return redirect(url_for('view_documents')) return render_template('upload.html') # View all documents with filters @app.route('/documents') @auth.login_required def view_documents(): docHandler = DocumentHandler() docHandler.View(request=request) return render_template('view_docs.html', documents=docHandler.documents, years=docHandler.years) # Upload file documents @app.route('/uploads/') @auth.login_required def uploaded_file(filename): mode = request.args.get('mode', 'view') filepath = os.path.join(FileHandler.UPLOAD_FOLDER, secure_filename(filename)) if not os.path.exists(filepath): flash("Unsupported file type for viewing", "warning") return redirect(url_for('view_documents')) file_ext = filename.rsplit('.', 1)[-1].lower() # --- View Mode --- if mode == 'view': # pdf if file_ext == 'pdf': return send_file(filepath, mimetype='application/pdf') # Word elif file_ext in ['doc', 'docx']: return send_file(filepath, as_attachment=True) # Excel elif file_ext in ['xls', 'xlsx']: return send_file(filepath, as_attachment=False, download_name=filename, mimetype='application/vnd.openxmlformats-officedocument.spreadsheetml.sheet') else: flash("Unsupported file type for viewing", "warning") return redirect(url_for('view_documents')) return send_file(filepath, as_attachment=True, download_name=filename) ## =============================================== ## ITR (Income Tax Return) Routes ## =============================================== ## 1. READ/DISPLAY all ITR records @app.route('/itr_records') @auth.login_required def display_itr(): itr = ITRHandler() records = itr.get_all_itr() itr.close() return render_template('display_itr.html', records=records) ## 2. CREATE/ADD a new ITR record @app.route('/itr/add', methods=['GET', 'POST']) @auth.login_required def add_itr(): if request.method == 'POST': itr = ITRHandler() mat = MatCreditHandler() itr.add_itr(request.form) itr.close() if 'documents' in request.files: doc = DocumentHandler() doc.Upload(request) # AUTO SAVE MAT FROM ITR mat.save_from_itr( year=request.form["year"], mat_created=float(request.form.get("mat_credit_created", 0)), mat_utilized=float(request.form.get("mat_credit_utilized", 0)), remarks="Created via ITR" ) # flash("ITR record added successfully!", "success") flash("ITR record and documents uploaded successfully!", "success") return redirect(url_for('display_itr')) return render_template('add_itr.html',current_date=date.today().isoformat()) ## 4. DELETE an ITR record @app.route('/itr/delete/', methods=['POST']) @auth.login_required def delete_itr(id): itr = ITRHandler() itr.delete_itr_by_id(id=id) itr.close() return redirect(url_for('display_itr')) ## 3. UPDATE an existing ITR record @app.route('/itr/update/', methods=['GET', 'POST']) @auth.login_required def update_itr(id): itr = ITRHandler() if request.method == 'POST': data = {k: request.form.get(k, 0) for k in request.form} itr.update(id, data) itr.close() return redirect(url_for('display_itr')) record = itr.get_itr_by_id(id) itr.close() return render_template('update_itr.html', record=record, current_date=date.today().isoformat()) ## =============================================== ## AO (Assessing Officer) Routes ## =============================================== # 1. DISPLAY all AO records @app.route('/ao_records') @auth.login_required def display_ao(): ao = AOHandler() ao_records = ao.get_all_ao() ao.close() return render_template('display_ao.html', ao_records=ao_records) # 2. ADD a new AO record @app.route('/ao/add', methods=['GET', 'POST']) @auth.login_required def add_ao(): if request.method == 'POST': ao = AOHandler() mat = MatCreditHandler() ao.add_ao(request.form) ao.close() if 'documents' in request.files: doc = DocumentHandler() doc.Upload(request) # AUTO SAVE MAT FROM ITR mat.save_from_itr( year=request.form["year"], mat_created=float(request.form.get("mat_credit_created", 0)), mat_utilized=float(request.form.get("mat_credit_utilized", 0)), remarks="Created via ITR" ) flash("AO record added successfully!", "success") return redirect(url_for('display_ao')) return render_template('add_ao.html',current_date=date.today().isoformat()) # 3. UPDATE AO record @app.route('/ao/update/', methods=['GET', 'POST']) @auth.login_required def update_ao(id): ao = AOHandler() record = ao.get_ao_by_id(id) if not record: return "AO record not found", 404 if request.method == 'POST': data = request.form.to_dict() ao.update_ao(id, data) ao.close() flash("AO record updated successfully!", "success") return redirect(url_for('display_ao')) ao.close() return render_template("update_ao.html", record=record) # 4. DELETE AO record safely @app.route('/ao/delete/', methods=['POST']) @auth.login_required def delete_ao(id): ao = AOHandler() ao.delete_ao_by_id(id=id) ao.close() flash("AO deleted successfully!", "success") return redirect(url_for('display_ao')) ## ======================================================= ## CIT (Commissioner of Income Tax) Routes ## ======================================================= # 1 DISPLAY all CIT records @app.route('/cit_records') @auth.login_required def display_cit(): cit = CITHandler() cit_records = cit.get_all_cit() cit.close() return render_template('display_cit.html', cit_records=cit_records) # 2 new CIT records add @app.route('/cit/add', methods=['GET', 'POST']) @auth.login_required def add_cit(): if request.method == 'POST': cit = CITHandler() cit.add_cit(request.form) cit.close() flash("CIT record added successfully!", "success") return redirect(url_for('display_cit')) return render_template('add_cit.html') # 3 delete CIT records by id @app.route('/cit/delete/', methods=['POST']) @auth.login_required def delete_cit(id): cit = CITHandler() cit.delete_cit(id) cit.close() flash("CIT record deleted successfully!", "success") return redirect(url_for('display_cit')) # 4 update CIT records by id @app.route('/cit/update/', methods=['GET', 'POST']) @auth.login_required def update_cit(id): cit = CITHandler() record = cit.get_cit_by_id(id) if not record: cit.close() return "CIT record not found", 404 if request.method == 'POST': data = {k: request.form.get(k, 0) for k in request.form} cit.update_cit(id, data) cit.close() return redirect(url_for('display_cit')) cit.close() return render_template('update_cit.html', record=record) ## ======================================================= ## ITAT (Income Tax Appellate Tribunal) Routes ## ======================================================= # 1.DISPLAY all ITAT records @app.route('/itat_records') @auth.login_required def display_itat(): itat = ITATHandler() records = itat.get_all_itat() itat.close() return render_template('display_itat.html', records=records) # 2.Add new ITAT records @app.route('/itat/add', methods=['GET', 'POST']) @auth.login_required def add_itat(): if request.method == 'POST': itat = ITATHandler() data = {k: request.form.get(k, 0) for k in request.form} itat.add_itat(data) itat.close() flash("ITAT record added successfully!", "success") return redirect(url_for('display_itat')) return render_template('add_itat.html') # 3.Update ITAT records by id @app.route('/itat/update/', methods=['GET', 'POST']) @auth.login_required def update_itat(id): itat = ITATHandler() record = itat.get_itat_by_id(id) if not record: flash("Record Not Found!", "danger") return redirect(url_for('display_itat')) if request.method == 'POST': itat.update_itat(id, request.form) itat.close() flash("ITAT Record Updated!", "success") return redirect(url_for('display_itat')) itat.close() return render_template('update_itat.html', record=record) # 3.delete ITAT records by id @app.route('/itat/delete/', methods=['POST']) @auth.login_required def delete_itat(id): itat = ITATHandler() itat.delete_itat_by_id(id) itat.close() flash("ITAT Record Deleted!", "success") return redirect(url_for('display_itat')) ## ======================================================= ## All Report Routes ## ======================================================= # report page @app.route('/reports') @auth.login_required def reports(): return render_template("reports.html") # Itr report download by year @app.route('/itr_report', methods=['GET']) @auth.login_required def itr_report(): yearGetter = YearGet() selected_year = request.args.get('year') if selected_year: itr = ITRHandler() output = itr.itr_report_download(selected_year) itr.close() if output is None: return "No records found for the selected year." return send_file( output, mimetype='application/vnd.openxmlformats-officedocument.spreadsheetml.sheet', as_attachment=True, download_name=f"ITR_Report_{selected_year}.xlsx" ) else: years = yearGetter.get_year_by_model("GetITRYears") yearGetter.close() return render_template("itr_reports.html", years=years) # Ao report download by year @app.route('/ao_report', methods=['GET']) @auth.login_required def ao_report(): yearGetter = YearGet() selected_year = request.args.get('year') if selected_year: ao = AOHandler() output = ao.ao_report_download(selected_year) ao.close() if output is None: return "No records found for the selected year." return send_file( output, mimetype="application/vnd.openxmlformats-officedocument.spreadsheetml.sheet", as_attachment=True, download_name=f"AO_Report_{selected_year}.xlsx" ) else: years = yearGetter.get_year_by_model("GetAOYears") yearGetter.close() return render_template("ao_reports.html", years=years) # Cit report download by year @app.route('/cit_report', methods=['GET']) @auth.login_required def cit_report(): selected_year = request.args.get('year') yearGetter = YearGet() if selected_year: cit = CITHandler() output = cit.cit_report_download(selected_year) cit.close() if output is None: return "No records found for the selected year." return send_file( output, mimetype='application/vnd.openxmlformats-officedocument.spreadsheetml.sheet', as_attachment=True, download_name=f"CIT_Report_{selected_year}_Vertical.xlsx" ) else: years = yearGetter.get_year_by_model("GetCITYears") yearGetter.close() return render_template("cit_reports.html", years=years) # Itat report download by year @app.route('/itat_report', methods=['GET']) @auth.login_required def itat_report(): selected_year = request.args.get('year') yearGetter = YearGet() if selected_year: itat = ITATHandler() output = itat.itat_report_download(selected_year) itat.close() if output is None: return "No records found for the selected year." return send_file( output, mimetype='application/vnd.openxmlformats-officedocument.spreadsheetml.sheet', as_attachment=True, download_name=f"ITAT_Report_{selected_year}_Vertical.xlsx" ) else: # Use stored procedure for years years = yearGetter.get_year_by_model("GetITATYears") yearGetter.close() return render_template("itat_reports.html", years=years) # summary report @app.route('/summary_report', methods=['GET']) @auth.login_required def summary_report(): docHandler = DocumentHandler() return docHandler.Summary_report(request=request) @app.route('/summary/download', methods=['GET']) @auth.login_required def download_summary(): year_raw = request.args.get('year') if not year_raw: return "Year parameter is required", 400 docHandler = DocumentHandler() # reuse your existing Summary_report method return docHandler.Summary_report(request=request) # check year in table existe or not by using ajax calling. # @app.route('/check_year', methods=['POST']) # @auth.login_required # def check_year(): # data = request.get_json() # table_name = data.get("table") # year = data.get("year") # check_year_obj = YearGet() # result = check_year_obj.CheckYearExists(table_name, year) # check_year_obj.close() # return result @app.route('/check_year', methods=['POST']) def check_year(): table_name = request.json.get("table") year = request.json.get("year") conn = DBConfig.get_db_connection() cursor = conn.cursor() sqlstr = f"SELECT COUNT(*) FROM {table_name} WHERE year = %s" cursor.execute(sqlstr, (year,)) result = cursor.fetchone()[0] cursor.close() conn.close() return {"exists": result > 0} # Mat credit from @app.route("/mat_credit", methods=["GET"]) @auth.login_required def mat_credit(): mat = MatCreditHandler() try: mat_rows, utilization_rows = mat.fetch_all() finally: mat.close() utilization_map = {} all_years = set() for u in utilization_rows: all_years.add(u["utilized_year"]) utilization_map.setdefault( u["mat_credit_id"], {} )[u["utilized_year"]] = u["utilized_amount"] return render_template( "mat_credit.html", mat_rows=mat_rows, utilization_map=utilization_map, added_years=sorted(all_years) ) # save mat credit row data @app.route("/save_mat_row", methods=["POST"]) @auth.login_required def save_mat_row(): mat = MatCreditHandler() try: mat.save_single(request.json) return jsonify({"message": "Row saved successfully"}) except Exception as e: return jsonify({"error": str(e)}), 500 finally: mat.close() @app.route("/summary/preview") def summary_preview_route(): handler = DocumentHandler() return handler.Summary_preview(request) # save mat credit bulk data # @app.route("/save_mat_all", methods=["POST"]) # @auth.login_required # def save_mat_all(): # mat= MatCreditHandler() # try: # skipped = mat.save_bulk(request.json) # return jsonify({"message": "Saved successfully", "skipped": skipped}) # except Exception as e: # return jsonify({"error": str(e)}), 500 # run server if __name__ == '__main__': app.run( host=os.getenv("FLASK_HOST"), port=int(os.getenv("FLASK_PORT")), debug=os.getenv("FLASK_DEBUG") == "true" )