307 lines
9.9 KiB
Python
307 lines
9.9 KiB
Python
import os
|
|
import pandas as pd
|
|
from werkzeug.utils import secure_filename
|
|
|
|
from app.config import Config
|
|
from app import db
|
|
|
|
from app.models.trench_excavation_model import TrenchExcavation
|
|
from app.models.manhole_excavation_model import ManholeExcavation
|
|
from app.models.manhole_domestic_chamber_model import ManholeDomesticChamber
|
|
|
|
from app.models.tr_ex_client_model import TrenchExcavationClient
|
|
from app.models.mh_ex_client_model import ManholeExcavationClient
|
|
from app.models.mh_dc_client_model import ManholeDomesticChamberClient
|
|
|
|
from app.utils.file_utils import ensure_upload_folder
|
|
|
|
|
|
class FileService:
    """Import subcontractor and client RA-bill workbooks into the database.

    Each upload is validated, saved below ``Config.UPLOAD_FOLDER``, read
    with pandas (three sheets: "Tr.Ex.", "MH Ex." and "MH & DC") and turned
    into model rows. An upload is committed once, after every sheet has been
    processed, so a failure in any sheet leaves the database unchanged.
    """

    # Cell texts (after stripping) that are stored as NULL instead of data.
    _BLANK_TOKENS = ("", "-", "—", "nan")

    # ---------------- COMMON HELPERS ----------------
    def allowed_file(self, filename):
        """Return True when *filename* has an extension listed in
        ``Config.ALLOWED_EXTENSIONS`` (compared in lower case)."""
        return ("." in filename and filename.rsplit(".", 1)[1].lower() in Config.ALLOWED_EXTENSIONS)

    def normalize(self, val):
        """Return *val* as a stripped, upper-cased string, or None if blank.

        None, NaN and the placeholder texts "", "nan", "none", "-" and "—"
        (case-insensitive) all count as blank.
        """
        if val is None or pd.isna(val):
            return None

        val = str(val).strip()
        if val.lower() in ["", "nan", "none", "-", "—"]:
            return None

        return val.upper()

    @staticmethod
    def _clean_cell(val):
        """Return None for a blank-like cell, otherwise *val* unchanged."""
        if pd.isna(val) or str(val).strip() in FileService._BLANK_TOKENS:
            return None
        return val

    @staticmethod
    def _sanitize_columns(columns):
        """Turn a pandas Index of sheet headers into identifier-like names.

        Headers are stripped, every non-word character becomes "_", runs of
        "_" collapse to one, and leading/trailing "_" are removed. Letter
        case is left as it is.
        """
        return (
            columns.astype(str)
            .str.strip()
            .str.replace(r"[^\w]", "_", regex=True)
            .str.replace("__+", "_", regex=True)
            .str.strip("_")
        )

    @staticmethod
    def _prepare_frame(df):
        """Return a copy of *df* without blank rows and with Location filled.

        Blank rows are dropped BEFORE the forward-fill: filling first would
        give every blank row a Location, so it would no longer be blank and
        would be imported as an almost empty record.
        """
        df = df.dropna(how="all").copy()
        if "Location" in df.columns:
            df["Location"] = df["Location"].ffill()
        return df

    @staticmethod
    def _mapped_columns(df, model, reserved):
        """List the columns of *df* that are attributes of *model*.

        Names in *reserved* are skipped because the caller passes them to the
        model constructor explicitly; a sheet column with the same name would
        otherwise raise "got multiple values for keyword argument".
        """
        return [col for col in df.columns if col not in reserved and hasattr(model, col)]

    def _store_upload(self, file, folder_name):
        """Validate *file* and save it under the upload folder.

        Returns ``(filepath, None)`` on success or ``(None, message)`` when
        the file is missing or has a disallowed extension.
        """
        if not file or file.filename == "":
            return None, "No file selected."

        # NOTE(review): CSV passes this check (per the message below) but the
        # callers only read the file with the Excel readers - confirm intent.
        if not self.allowed_file(file.filename):
            return None, "Invalid file type! Allowed: CSV, XLSX, XLS"

        ensure_upload_folder()

        # The folder name embeds user-supplied values (subcontractor id /
        # RA bill number); sanitize it so "../" cannot escape the upload root.
        folder = os.path.join(Config.UPLOAD_FOLDER, secure_filename(folder_name))
        os.makedirs(folder, exist_ok=True)

        filepath = os.path.join(folder, secure_filename(file.filename))
        file.save(filepath)
        return filepath, None

    # ---------------- SUBCONTRACTOR FILE UPLOAD ----------------
    def handle_file_upload(self, file, subcontractor_id, RA_Bill_No):
        """Store a subcontractor workbook and import its three sheets.

        Returns ``(True, message)`` on success and ``(False, message)`` on a
        validation or import failure. All sheets are committed together; on
        any exception the session is rolled back and nothing is stored.
        """
        if not subcontractor_id:
            return False, "Please select subcontractor."

        if not RA_Bill_No:
            return False, "Please Enter RA Bill No."

        filepath, error = self._store_upload(file, f"sub_{subcontractor_id}")
        if error:
            return False, error

        try:
            # Open the workbook once instead of re-parsing it for every sheet.
            # NOTE(review): the dtype key is "MH No" while rows are later read
            # with the key "MH_NO"; column sanitizing keeps letter case, so a
            # header spelled "MH No" would become "MH_No" and never match.
            # Confirm the header spelling used in the sheets.
            with pd.ExcelFile(filepath) as workbook:
                df_tr_ex = pd.read_excel(workbook, sheet_name="Tr.Ex.", header=12, dtype={"MH No": str})
                df_mh_ex = pd.read_excel(workbook, sheet_name="MH Ex.", header=12, dtype={"MH No": str})
                df_mh_dc = pd.read_excel(workbook, sheet_name="MH & DC", header=11, dtype={"MH No": str})

            # commit=False: keep all three sheets in one transaction so a
            # failure in a later sheet cannot leave earlier sheets committed.
            self.process_trench_excavation(df_tr_ex, subcontractor_id, RA_Bill_No, commit=False)
            self.process_manhole_excavation(df_mh_ex, subcontractor_id, RA_Bill_No, commit=False)
            self.process_manhole_domestic_chamber(df_mh_dc, subcontractor_id, RA_Bill_No, commit=False)

            db.session.commit()
            return True, "SUBCONTRACTOR File uploaded successfully."

        except Exception as e:
            db.session.rollback()
            return False, f"Import failed: {e}"

    def _import_subcontractor_sheet(self, df, model, label, subcontractor_id, RA_Bill_No, commit):
        """Add one subcontractor sheet to the session as *model* rows.

        Rows without a Location or MH_NO are skipped. A row whose
        (subcontractor_id, RA_Bill_No, Location, MH_NO) already exists is
        reported as a duplicate; if any duplicate was found a ValueError
        listing all of them is raised and nothing is committed here (the
        caller is expected to roll the session back).
        """
        df.columns = self._sanitize_columns(df.columns)
        df = self._prepare_frame(df)
        mapped = self._mapped_columns(df, model, ("subcontractor_id", "RA_Bill_No"))

        errors = []
        for idx, row in df.iterrows():
            location = self.normalize(row.get("Location"))
            mh_no = self.normalize(row.get("MH_NO"))

            if not location or not mh_no:
                continue

            # NOTE(review): the lookup uses the upper-cased values while the
            # raw cell values are what gets stored below - on a case-sensitive
            # database mixed-case data would never be seen as a duplicate.
            exists = model.query.filter_by(
                subcontractor_id=subcontractor_id,
                RA_Bill_No=RA_Bill_No,
                Location=location,
                MH_NO=mh_no,
            ).first()

            if exists:
                errors.append(
                    f"Model-{label} (Row {idx+1}): Duplicate → Location={location}, MH_NO={mh_no}"
                )
                continue

            record_data = {col: self._clean_cell(row[col]) for col in mapped}
            db.session.add(
                model(
                    subcontractor_id=subcontractor_id,
                    RA_Bill_No=RA_Bill_No,
                    **record_data,
                )
            )

        if errors:
            raise ValueError(" | ".join(errors))

        if commit:
            db.session.commit()

    # ---------------- Trench Excavation (Subcontractor) ----------------
    def process_trench_excavation(self, df, subcontractor_id, RA_Bill_No, commit=True):
        """Import the "Tr.Ex." sheet as TrenchExcavation rows.

        Raises ValueError when duplicate rows are found. Pass
        ``commit=False`` to leave committing to the caller.
        """
        self._import_subcontractor_sheet(
            df, TrenchExcavation, "Tr.Ex.", subcontractor_id, RA_Bill_No, commit
        )

    # ---------------- Manhole Excavation (Subcontractor) ----------------
    def process_manhole_excavation(self, df, subcontractor_id, RA_Bill_No, commit=True):
        """Import the "MH Ex." sheet as ManholeExcavation rows.

        Raises ValueError when duplicate rows are found. Pass
        ``commit=False`` to leave committing to the caller.
        """
        self._import_subcontractor_sheet(
            df, ManholeExcavation, "MH Ex.", subcontractor_id, RA_Bill_No, commit
        )

    # ---------------- Manhole & Domestic Chamber (Subcontractor) ----------------
    def process_manhole_domestic_chamber(self, df, subcontractor_id, RA_Bill_No, commit=True):
        """Import the "MH & DC" sheet as ManholeDomesticChamber rows.

        Raises ValueError when duplicate rows are found. Pass
        ``commit=False`` to leave committing to the caller.
        """
        self._import_subcontractor_sheet(
            df, ManholeDomesticChamber, "MH & DC", subcontractor_id, RA_Bill_No, commit
        )

    # ---------------- CLIENT FILE UPLOAD ----------------
    def handle_client_file_upload(self, file, RA_Bill_No):
        """Store a client workbook and import its three sheets.

        Returns ``(True, message)`` on success and ``(False, message)`` on a
        validation or import failure. The three sheets are committed
        together; on any exception the session is rolled back.
        """
        if not RA_Bill_No:
            return False, "Please Enter RA Bill No."

        filepath, error = self._store_upload(file, f"Client_Bill_{RA_Bill_No}")
        if error:
            return False, error

        try:
            # Open the workbook once instead of re-parsing it for every sheet.
            with pd.ExcelFile(filepath) as workbook:
                df_tr_ex = pd.read_excel(workbook, sheet_name="Tr.Ex.", header=4)
                df_mh_ex = pd.read_excel(workbook, sheet_name="MH Ex.", header=4)
                df_mh_dc = pd.read_excel(workbook, sheet_name="MH & DC", header=3)

            self.save_client_data(df_tr_ex, TrenchExcavationClient, RA_Bill_No)
            self.save_client_data(df_mh_ex, ManholeExcavationClient, RA_Bill_No)
            self.save_client_data(df_mh_dc, ManholeDomesticChamberClient, RA_Bill_No)

            db.session.commit()
            return True, "Client file uploaded successfully."

        except Exception as e:
            db.session.rollback()
            return False, f"Client import failed: {e}"

    # ---------------- CLIENT SAVE METHOD ----------------
    def save_client_data(self, df, model, RA_Bill_No):
        """Add every non-blank row of *df* to the session as a *model* row.

        Only columns whose (stripped) header is an attribute of *model* are
        copied; blank-like cells become None. Nothing is committed here.
        """
        df.columns = [str(c).strip() for c in df.columns]
        df = self._prepare_frame(df)
        mapped = self._mapped_columns(df, model, ("RA_Bill_No",))

        for _, row in df.iterrows():
            record_data = {col: self._clean_cell(row[col]) for col in mapped}
            db.session.add(model(RA_Bill_No=RA_Bill_No, **record_data))