import os

import pandas as pd
from werkzeug.utils import secure_filename

from app.utils.file_utils import ensure_upload_folder
from app.config import Config
from app import db

# Subcontractor models import
from app.models.trench_excavation_model import TrenchExcavation
from app.models.manhole_excavation_model import ManholeExcavation
from app.models.manhole_domestic_chamber_model import ManholeDomesticChamber
from app.models.laying_model import Laying

# Client models import
from app.models.tr_ex_client_model import TrenchExcavationClient
from app.models.mh_ex_client_model import ManholeExcavationClient
from app.models.mh_dc_client_model import ManholeDomesticChamberClient


class FileService:
    """Save uploaded bill workbooks to disk and import their sheets into the DB.

    Two entry points exist: ``handle_file_upload`` for subcontractor workbooks
    and ``handle_client_file_upload`` for client workbooks. Both return a
    ``(success, message)`` tuple instead of raising on import errors.
    """

    # Stripped cell contents that are stored as NULL instead of a value.
    _BLANK_CELLS = ("", "-", "—", "nan")

    # ---------------- COMMON HELPERS ----------------
    def allowed_file(self, filename):
        """Return True when ``filename`` has an extension listed in the config.

        The comparison is done on the lower-cased text after the last dot
        against ``Config.ALLOWED_EXTENSIONS``.
        """
        return (
            "." in filename
            and filename.rsplit(".", 1)[1].lower() in Config.ALLOWED_EXTENSIONS
        )

    def normalize(self, val):
        """Return ``val`` as a stripped, upper-cased string, or None if blank.

        ``None``, pandas NA values and the placeholders "", "nan", "none",
        "-" and "—" (case-insensitive) all map to ``None``.
        """
        if val is None or pd.isna(val):
            return None
        val = str(val).strip()
        if val.lower() in ["", "nan", "none", "-", "—"]:
            return None
        return val.upper()

    @classmethod
    def _clean_cell(cls, val):
        """Return ``val`` unchanged, or None when the cell is NA or a blank marker."""
        if pd.isna(val) or str(val).strip() in cls._BLANK_CELLS:
            return None
        return val

    def _save_upload(self, file, folder_name):
        """Store ``file`` under ``Config.UPLOAD_FOLDER/folder_name``; return its path."""
        ensure_upload_folder()
        folder = os.path.join(Config.UPLOAD_FOLDER, folder_name)
        os.makedirs(folder, exist_ok=True)
        filepath = os.path.join(folder, secure_filename(file.filename))
        file.save(filepath)
        return filepath

    def _add_records(self, df, model, fixed_fields, skip_unkeyed_rows):
        """Add one ``model`` instance per row of ``df`` to the session (no commit).

        Only columns whose name is also an attribute of ``model`` are copied;
        blank cells are stored as None. ``fixed_fields`` are passed to every
        record. When ``skip_unkeyed_rows`` is true, rows whose "Location" or
        "MH_NO" normalises to None are skipped.
        """
        # The column/attribute match does not depend on the row: compute once.
        mapped_columns = [col for col in df.columns if hasattr(model, col)]

        for _, row in df.iterrows():
            if skip_unkeyed_rows and not (
                self.normalize(row.get("Location"))
                and self.normalize(row.get("MH_NO"))
            ):
                continue

            record_data = {col: self._clean_cell(row[col]) for col in mapped_columns}
            db.session.add(model(**fixed_fields, **record_data))

    def _import_subcontractor_sheet(self, df, model, subcontractor_id, RA_Bill_No, commit):
        """Normalise one subcontractor sheet and stage its rows as ``model`` records.

        Column names are stripped, every non-word character becomes "_",
        repeated underscores collapse and leading/trailing underscores are
        removed. Fully empty rows are dropped and "Location" is forward-filled
        so rows under a merged/blank location cell inherit the value above.
        """
        # Work on a copy so the caller's DataFrame is left untouched and the
        # column assignment below never targets a view.
        df = df.dropna(how="all").copy()
        df.columns = (
            df.columns.astype(str)
            .str.strip()
            .str.replace(r"[^\w]", "_", regex=True)
            .str.replace("__+", "_", regex=True)
            .str.strip("_")
        )

        if "Location" in df.columns:
            df["Location"] = df["Location"].ffill()

        # NOTE(review): rows are keyed on the column "MH_NO", but a sheet header
        # spelled "MH No" (the spelling used for the read_excel dtype) would
        # normalise to "MH_No" and every row would be skipped — confirm the
        # actual header spelling in the workbooks.
        # NOTE(review): duplicate detection (same subcontractor, RA bill,
        # Location and MH_NO) was previously sketched here but disabled; rows
        # are currently always inserted.
        self._add_records(
            df,
            model,
            {"subcontractor_id": subcontractor_id, "RA_Bill_No": RA_Bill_No},
            skip_unkeyed_rows=True,
        )

        if commit:
            db.session.commit()

    # ---------------- SUBCONTRACTOR FILE UPLOAD ----------------
    def handle_file_upload(self, file, subcontractor_id, RA_Bill_No):
        """Validate, store and import a subcontractor workbook.

        The sheets "Tr.Ex.", "MH Ex.", "MH & DC" and "Laying" are imported in
        a single transaction: either all four are committed or, on any error,
        the session is rolled back and nothing is stored.

        Returns:
            ``(True, message)`` on success, ``(False, message)`` on a
            validation or import failure.
        """
        if not subcontractor_id:
            return False, "Please select subcontractor."
        if not RA_Bill_No:
            return False, "Please Enter RA Bill No."
        if not file or file.filename == "":
            return False, "No file selected."
        if not self.allowed_file(file.filename):
            return False, "Invalid file type! Allowed: CSV, XLSX, XLS"

        # The id comes from the request; sanitise it before using it as a
        # directory name so it cannot introduce path separators or "..".
        filepath = self._save_upload(
            file, f"sub_{secure_filename(str(subcontractor_id))}"
        )

        try:
            # NOTE(review): the message above advertises CSV, but only
            # read_excel is used here — a CSV upload ends in "Import failed".
            # Open the workbook once instead of re-parsing it for every sheet.
            with pd.ExcelFile(filepath) as workbook:
                df_tr_ex = pd.read_excel(
                    workbook, sheet_name="Tr.Ex.", header=12, dtype={"MH No": str}
                )
                df_mh_ex = pd.read_excel(
                    workbook, sheet_name="MH Ex.", header=12, dtype={"MH No": str}
                )
                df_mh_dc = pd.read_excel(
                    workbook, sheet_name="MH & DC", header=11, dtype={"MH No": str}
                )
                df_laying = pd.read_excel(
                    workbook, sheet_name="Laying", header=11, dtype={"MH No": str}
                )

            # Stage every sheet without committing so that a failure in a later
            # sheet cannot leave the earlier ones permanently stored.
            self.process_trench_excavation(
                df_tr_ex, subcontractor_id, RA_Bill_No, commit=False
            )
            self.process_manhole_excavation(
                df_mh_ex, subcontractor_id, RA_Bill_No, commit=False
            )
            self.process_manhole_domestic_chamber(
                df_mh_dc, subcontractor_id, RA_Bill_No, commit=False
            )
            self.process_laying(df_laying, subcontractor_id, RA_Bill_No, commit=False)

            db.session.commit()
            return True, "SUBCONTRACTOR File uploaded successfully."

        except Exception as e:
            db.session.rollback()
            return False, f"Import failed: {e}"

    # ---------------- Trench Excavation (Subcontractor) ----------------
    def process_trench_excavation(self, df, subcontractor_id, RA_Bill_No, *, commit=True):
        """Stage the "Tr.Ex." sheet as ``TrenchExcavation`` rows.

        Commits the session unless ``commit`` is False.
        """
        self._import_subcontractor_sheet(
            df, TrenchExcavation, subcontractor_id, RA_Bill_No, commit
        )

    # ---------------- Manhole Excavation (Subcontractor) ----------------
    def process_manhole_excavation(self, df, subcontractor_id, RA_Bill_No, *, commit=True):
        """Stage the "MH Ex." sheet as ``ManholeExcavation`` rows.

        Commits the session unless ``commit`` is False.
        """
        self._import_subcontractor_sheet(
            df, ManholeExcavation, subcontractor_id, RA_Bill_No, commit
        )

    # ---------------- Manhole & Domestic Chamber (Subcontractor) ----------------
    def process_manhole_domestic_chamber(
        self, df, subcontractor_id, RA_Bill_No, *, commit=True
    ):
        """Stage the "MH & DC" sheet as ``ManholeDomesticChamber`` rows.

        Commits the session unless ``commit`` is False.
        """
        self._import_subcontractor_sheet(
            df, ManholeDomesticChamber, subcontractor_id, RA_Bill_No, commit
        )

    # ---------------- Laying (Subcontractor) ----------------
    def process_laying(self, df, subcontractor_id, RA_Bill_No, *, commit=True):
        """Stage the "Laying" sheet as ``Laying`` rows.

        Commits the session unless ``commit`` is False.
        """
        self._import_subcontractor_sheet(
            df, Laying, subcontractor_id, RA_Bill_No, commit
        )

    # ---------------- CLIENT FILE UPLOAD ----------------
    def handle_client_file_upload(self, file, RA_Bill_No):
        """Validate, store and import a client workbook.

        The sheets "Tr.Ex.", "MH Ex." and "MH & DC" are staged and committed
        together; on any error the session is rolled back.

        Returns:
            ``(True, message)`` on success, ``(False, message)`` on a
            validation or import failure.
        """
        if not RA_Bill_No:
            return False, "Please Enter RA Bill No."
        if not file or file.filename == "":
            return False, "No file selected."
        if not self.allowed_file(file.filename):
            return False, "Invalid file type! Allowed: CSV, XLSX, XLS"

        # RA_Bill_No is free text from the user; sanitise it before using it
        # as a directory name so it cannot escape the upload folder.
        filepath = self._save_upload(
            file, f"Client_Bill_{secure_filename(str(RA_Bill_No))}"
        )

        try:
            with pd.ExcelFile(filepath) as workbook:
                df_tr_ex = pd.read_excel(workbook, sheet_name="Tr.Ex.", header=4)
                df_mh_ex = pd.read_excel(workbook, sheet_name="MH Ex.", header=4)
                df_mh_dc = pd.read_excel(workbook, sheet_name="MH & DC", header=3)

            self.save_client_data(df_tr_ex, TrenchExcavationClient, RA_Bill_No)
            self.save_client_data(df_mh_ex, ManholeExcavationClient, RA_Bill_No)
            self.save_client_data(df_mh_dc, ManholeDomesticChamberClient, RA_Bill_No)

            db.session.commit()
            return True, "Client file uploaded successfully."

        except Exception as e:
            db.session.rollback()
            return False, f"Client import failed: {e}"

    # ---------------- CLIENT SAVE METHOD ----------------
    def save_client_data(self, df, model, RA_Bill_No):
        """Stage every row of a client sheet as a ``model`` record (no commit).

        Column names are only stripped of surrounding whitespace. Fully empty
        rows are dropped, then "Location" is forward-filled. The caller is
        responsible for committing or rolling back the session.
        """
        # Drop blank rows BEFORE forward-filling: filling first would give a
        # blank row a Location value, so it would survive dropna(how="all")
        # and be inserted as an otherwise empty record.
        df = df.dropna(how="all").copy()
        df.columns = [str(c).strip() for c in df.columns]

        if "Location" in df.columns:
            df["Location"] = df["Location"].ffill()

        self._add_records(
            df, model, {"RA_Bill_No": RA_Bill_No}, skip_unkeyed_rows=False
        )