create project and create model and dashboard

This commit is contained in:
2025-12-11 10:16:43 +05:30
parent 2371202ac3
commit 2e62320167
55 changed files with 1072 additions and 0 deletions

0
app/services/__init__.py Normal file
View File

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

View File

@@ -0,0 +1,19 @@
from flask_sqlalchemy import SQLAlchemy
from flask_migrate import Migrate
db = SQLAlchemy()
migrate = Migrate()
# import mysql.connector
# from app.config import Config
# class DBService:
# def connect(self):
# return mysql.connector.connect(
# host=Config.DB_HOST,
# user=Config.DB_USER,
# password=Config.DB_PASSWORD,
# database=Config.DB_NAME
# )

View File

@@ -0,0 +1,103 @@
# app/services/file_service.py
import os
import pandas as pd
from werkzeug.utils import secure_filename
from app.config import Config
from app import db
from app.models.trench_excavation_model import TrenchExcavation
from app.utils.file_utils import ensure_upload_folder
class FileService:
def allowed_file(self, filename):
return "." in filename and filename.rsplit(".", 1)[1].lower() in Config.ALLOWED_EXTENSIONS
def handle_file_upload(self, file, subcontractor_id, file_type):
if not subcontractor_id:
return False, "Please select subcontractor."
if not file_type:
return False, "Please select file type."
if not file or file.filename == "":
return False, "No file selected."
if not self.allowed_file(file.filename):
return False, "Invalid file type! Allowed: CSV, XLSX, XLS"
ensure_upload_folder()
folder = os.path.join(Config.UPLOAD_FOLDER, f"sub_{subcontractor_id}")
os.makedirs(folder, exist_ok=True)
filename = secure_filename(file.filename)
filepath = os.path.join(folder, filename)
file.save(filepath)
try:
df = pd.read_csv(filepath) if filename.endswith(".csv") else pd.read_excel(filepath)
print("\n=== Uploaded File Preview ===")
print(df.head())
print("=============================\n")
if file_type == "trench_excavation":
return self.process_trench_excavation(df, subcontractor_id)
return True, "File uploaded successfully."
except Exception as e:
return False, f"Processing failed: {e}"
# CLEAN & SAVE TRENCH EXCAVATION DATA
def process_trench_excavation(self, df, subcontractor_id):
# Clean column names (strip whitespace)
df.columns = [str(c).strip() for c in df.columns]
# If the sheet has merged cells -> forward fill Location
if "Location" in df.columns:
df["Location"] = df["Location"].ffill()
# REMOVE empty rows
df = df.dropna(how="all")
# Identify missing location rows before insert
missing_loc = df[df["Location"].isna() | (df["Location"].astype(str).str.strip() == "")]
if not missing_loc.empty:
return False, f"Error: Some rows have empty Location. Rows: {missing_loc.index.tolist()}"
saved_count = 0
try:
for index, row in df.iterrows():
record_data = {}
# Insert only fields that exist in model
for col in df.columns:
if hasattr(TrenchExcavation, col):
value = row[col]
# Normalize empty values
if pd.isna(value) or str(value).strip() in ["", "-", "", "nan", "NaN"]:
value = None
record_data[col] = value
record = TrenchExcavation(
subcontractor_id=subcontractor_id,
**record_data
)
db.session.add(record)
saved_count += 1
db.session.commit()
return True, f"Trench Excavation data saved successfully. Total rows: {saved_count}"
except Exception as e:
db.session.rollback()
return False, f"Trench Excavation Save Failed: {e}"

View File

@@ -0,0 +1,47 @@
# from app.models.user_model import User
# from app.services.db_service import db
# import logging
# class UserService:
# @staticmethod
# def register_user(name, email, password):
# user = User(name=name, email=email)
# user.set_password(password)
# db.session.add(user)
# db.session.commit()
# logging.info(f"New user registered: {email}")
# return user
# @staticmethod
# def validate_login(email, password):
# user = User.query.filter_by(email=email).first()
# if user and user.check_password(password):
# logging.info(f"Login success: {email}")
# return user
# logging.warning(f"Login failed for: {email}")
# return None
# @staticmethod
# def get_all_users():
# return User.query.all()
from app.services.db_service import DBService
from app.models.user_model import User
class UserService:
def get_all_users(self):
db = DBService().connect()
cursor = db.cursor(dictionary=True)
cursor.execute("SELECT id, name, email FROM users")
rows = cursor.fetchall()
users = [User(**row) for row in rows]
cursor.close()
db.close()
return users