Files
Client-Billing-software/app/routes/main.py
2026-04-15 10:32:46 +05:30

838 lines
31 KiB
Python

from flask import Blueprint, render_template, request, redirect, url_for, flash,jsonify
from flask_login import login_user, logout_user, login_required, current_user
from sqlalchemy import func, cast, Float
from app.models import Task,WorkDetail
import pandas as pd
from werkzeug.security import generate_password_hash
import os
from app import LDAPUser
import numpy as np
from flask import current_app
from datetime import datetime
from app import db
from app.models import User
from ldap3 import Server, Connection, ALL # ✅ make sure this is at the top
main = Blueprint("main", __name__)
# @main.route("/login", methods=["GET", "POST"])
# def login():
# # Redirect if already logged in
# if current_user.is_authenticated:
# return redirect(url_for("main.dashboard"))
# if request.method == "POST":
# username = request.form.get("username", "").strip()
# password = request.form.get("password", "")
# if not username or not password:
# flash("Username and password are required.", "danger")
# return render_template("login.html")
# ldap_user_dn = f"uid={username},ou=users,dc=lcepl,dc=org"
# try:
# # Connect to LDAP server
# # server = Server("openldap", port=389, get_info=ALL)
# server = Server("localhost", port=389, get_info=ALL)
# conn = Connection(server, user=ldap_user_dn, password=password)
# if conn.bind():
# # Pass the required 'data' argument
# user = LDAPUser(dn=ldap_user_dn, username=username, data={})
# login_user(user)
# flash(f"Welcome, {username}!", "success")
# return redirect(url_for("main.dashboard"))
# else:
# flash("Invalid LDAP credentials", "danger")
# except Exception as e:
# flash(f"LDAP connection error: {e}", "danger")
# # GET request or failed login
# return render_template("login.html")
@main.route("/login", methods=["GET", "POST"])
def login():
# Redirect if already logged in
if current_user.is_authenticated:
return redirect(url_for("main.dashboard"))
if request.method == "POST":
username = request.form.get("username", "").strip()
password = request.form.get("password", "")
if not username or not password:
flash("Username and password are required.", "danger")
return render_template("login.html")
ldap_user_dn = f"uid={username},ou=users,dc=lcepl,dc=org"
try:
# Try LDAP authentication first
server = Server("localhost", port=389, get_info=ALL)
conn = Connection(server, user=ldap_user_dn, password=password)
if conn.bind():
user = LDAPUser(dn=ldap_user_dn, username=username, data={})
login_user(user)
flash(f"Welcome, {username}! (LDAP)", "success")
return redirect(url_for("main.dashboard"))
else:
flash("Invalid LDAP credentials", "danger")
except Exception as e:
# Fallback to LOCAL login if LDAP not available
if username == "admin" and password == "admin":
user = LDAPUser(dn=None, username=username, data={})
login_user(user)
flash(f"Welcome, {username}! (Local Login)", "success")
return redirect(url_for("main.dashboard"))
else:
flash("LDAP unavailable and local login failed", "danger")
return render_template("login.html")
@main.route('/logout')
@login_required
def logout():
logout_user()
# flash("You have been logged out.", "info")
return redirect(url_for("main.login"))
# Home route
@main.route('/upload_excel')
@login_required
def upload_excel():
log_activity(current_user.username, "Page Load", "Upload Excel page accessed")
return render_template('upload.html')
# @main.route('/dashboard')
# def upload_file():
# return render_template('dashboard.html')
# # File upload route
# @main.route('/upload', methods=['POST'])
# @login_required
# def upload():
# if 'file' not in request.files:
# return "No file part"
# file = request.files['file']
# if file.filename == '':
# return "No selected file"
# if file:
# filepath = os.path.join(current_app.config['UPLOAD_FOLDER'], file.filename)
# file.save(filepath)
# log_activity(current_user.username, "File Upload", f"Uploaded file: {file.filename}")
# # Read work details (first 11 rows)
# work_details_data = pd.read_excel(filepath, nrows=11, header=None)
# work_details_dict = {
# "name_of_work": work_details_data.iloc[0, 1],
# "cover_agreement_no": work_details_data.iloc[1, 1],
# "name_of_contractor": work_details_data.iloc[2, 1],
# "name_of_tpi_agency": work_details_data.iloc[3, 1],
# "name_of_division": work_details_data.iloc[4, 1],
# "name_of_village": work_details_data.iloc[5, 1],
# "block": work_details_data.iloc[6, 1],
# "scheme_id": work_details_data.iloc[7, 1],
# "date_of_billing": work_details_data.iloc[8, 1],
# "measurement_book": work_details_data.iloc[9, 1],
# "district": work_details_data.iloc[10, 1] # Example: row 11 (index 10), column 2 (index 1)
# }
# work_details_dict = {key: (None if pd.isna(value) else value) for key, value in work_details_dict.items()}
# work_detail = WorkDetail(**work_details_dict)
# db.session.add(work_detail)
# # Read task data starting from row 12
# data = pd.read_excel(filepath, skiprows=10)
# data = data.astype(str).replace({"nan": None, "NaT": None, "None": None})
# expected_columns = [
# "serial_number", "task_name", "unit", "qty", "rate", "boq_amount",
# "previous_billed_qty", "previous_billing_amount",
# "in_this_ra_bill_qty", "in_this_ra_billing_amount",
# "cumulative_billed_qty", "cumulative_billed_amount",
# "variation_qty", "variation_amount", "remark"
# ]
# if data.shape[1] == len(expected_columns):
# data.columns = expected_columns
# else:
# data.columns = expected_columns[:data.shape[1]] # Truncate
# current_main_task_serial = None
# current_main_task_name = None
# for _, row in data.iterrows():
# task_name = str(row["task_name"]) if row["task_name"] else ""
# serial_number = row["serial_number"]
# if serial_number: # Main task
# current_main_task_serial = serial_number
# current_main_task_name = task_name
# parent_id = None
# else: # Subtask
# parent_id = current_main_task_serial
# task = Task(
# district=work_details_dict.get("district"),
# block_name=work_details_dict["block"],
# village_name=work_details_dict["name_of_village"],
# serial_number=serial_number,
# task_name=task_name,
# unit=row["unit"],
# qty=row["qty"],
# rate=row["rate"],
# boq_amount=row["boq_amount"],
# previous_billed_qty=row["previous_billed_qty"],
# previous_billing_amount=row["previous_billing_amount"],
# in_this_ra_bill_qty=row["in_this_ra_bill_qty"],
# in_this_ra_billing_amount=row["in_this_ra_billing_amount"],
# cumulative_billed_qty=row["cumulative_billed_qty"],
# cumulative_billed_amount=row["cumulative_billed_amount"],
# variation_qty=row["variation_qty"],
# variation_amount=row["variation_amount"],
# parent_id=parent_id,
# parent_task_name=current_main_task_name if not serial_number else None,
# remark=row["remark"]
# )
# db.session.add(task)
# db.session.commit()
# log_activity(current_user.username, "Database Insert", f"Inserted work details and tasks from {file.filename}")
# return redirect(url_for('main.display_tasks'))
def to_2_decimal(value):
try:
if value is None or value == "":
return None
return round(float(value), 2)
except (TypeError, ValueError):
return None
@main.route('/upload', methods=['POST'])
@login_required
def upload():
if 'file' not in request.files:
return "No file part"
file = request.files['file']
if file.filename == '':
return "No selected file"
if file:
filepath = os.path.join(current_app.config['UPLOAD_FOLDER'], file.filename)
file.save(filepath)
log_activity(current_user.username, "File Upload", f"Uploaded file: {file.filename}")
# Read work details (first 11 rows)
# work_details_data = pd.read_excel(filepath, nrows=11, header=None)
work_details_data = pd.read_excel(filepath, nrows=11, header=None, dtype=str)
work_details_dict = {
"name_of_work": work_details_data.iloc[0, 1],
"cover_agreement_no": work_details_data.iloc[1, 1],
"name_of_contractor": work_details_data.iloc[2, 1],
"name_of_tpi_agency": work_details_data.iloc[3, 1],
"name_of_division": work_details_data.iloc[4, 1],
"name_of_village": work_details_data.iloc[5, 1],
"block": work_details_data.iloc[6, 1],
"scheme_id": work_details_data.iloc[7, 1],
"date_of_billing": work_details_data.iloc[8, 1],
"measurement_book": work_details_data.iloc[9, 1],
"district": work_details_data.iloc[10, 1]
}
# Clean work details NaN
work_details_dict = {k: (None if pd.isna(v) else v) for k, v in work_details_dict.items()}
work_detail = WorkDetail(**work_details_dict)
db.session.add(work_detail)
# Read task data
# data = pd.read_excel(filepath, skiprows=10)
data = pd.read_excel(filepath, skiprows=10)
# ✅ Convert all NaN → None (critical for MySQL)
data = data.astype(object).where(pd.notna(data), None)
expected_columns = [
"serial_number", "task_name", "unit", "qty", "rate", "boq_amount",
"previous_billed_qty", "previous_billing_amount",
"in_this_ra_bill_qty", "in_this_ra_billing_amount",
"cumulative_billed_qty", "cumulative_billed_amount",
"variation_qty", "variation_amount", "remark"
]
if data.shape[1] == len(expected_columns):
data.columns = expected_columns
else:
data.columns = expected_columns[:data.shape[1]]
current_main_task_serial = None
current_main_task_name = None
for _, row in data.iterrows():
task_name = str(row["task_name"]) if row["task_name"] else ""
serial_number = str(row["serial_number"]) if row["serial_number"] else None
if serial_number:
current_main_task_serial = serial_number
current_main_task_name = task_name
parent_id = None
else:
parent_id = current_main_task_serial
task = Task(
district=work_details_dict.get("district"),
block_name=work_details_dict["block"],
village_name=work_details_dict["name_of_village"],
serial_number=serial_number,
task_name=task_name,
unit=row["unit"],
qty=to_2_decimal(row["qty"]),
rate=to_2_decimal(row["rate"]),
boq_amount=to_2_decimal(row["boq_amount"]),
previous_billed_qty=to_2_decimal(row["previous_billed_qty"]),
previous_billing_amount=to_2_decimal(row["previous_billing_amount"]),
in_this_ra_bill_qty=to_2_decimal(row["in_this_ra_bill_qty"]),
in_this_ra_billing_amount=to_2_decimal(row["in_this_ra_billing_amount"]),
cumulative_billed_qty=to_2_decimal(row["cumulative_billed_qty"]),
cumulative_billed_amount=to_2_decimal(row["cumulative_billed_amount"]),
variation_qty=to_2_decimal(row["variation_qty"]),
variation_amount=to_2_decimal(row["variation_amount"]),
parent_id=parent_id,
parent_task_name=current_main_task_name if not serial_number else None,
remark=row["remark"]
)
db.session.add(task)
db.session.commit()
log_activity(current_user.username, "Database Insert", f"Inserted work details and tasks from {file.filename}")
return redirect(url_for('main.display_tasks'))
# # Update tasks route
# @main.route('/update_tasks', methods=['POST'])
# @login_required
# def update_tasks():
# try:
# updates = request.get_json()
# update_count = 0
# for key, new_value in updates.items():
# if '_' not in key:
# continue
# field_name, task_id_str = key.rsplit('_', 1)
# if not task_id_str.isdigit():
# continue
# task_id = int(task_id_str)
# task = db.session.query(Task).filter_by(id=task_id).first()
# if task:
# current_value = getattr(task, field_name, None)
# if current_value != new_value:
# setattr(task, field_name, new_value)
# update_count += 1
# log_activity(current_user.username, "Task Update", f"Task ID {task.id} - {field_name} changed to {new_value}")
# if update_count > 0:
# db.session.commit()
# log_activity(current_user.username, "Database Commit", f"{update_count} task field(s) updated")
# return jsonify({'message': f'count: {update_count} field(s) updated.'})
# else:
# return jsonify({'message': 'No fields were updated.'})
# except Exception as e:
# log_activity(current_user.username, "Error", f"Update tasks error: {str(e)}")
# return jsonify({'error': 'An error occurred while updating tasks.'}), 500
def recalc_task(task):
rate = float(task.rate or 0)
qty = float(task.qty or 0)
prev_qty = float(task.previous_billed_qty or 0)
ra_qty = float(task.in_this_ra_bill_qty or 0)
# H = G * E
task.previous_billing_amount = round(prev_qty * rate, 2)
# J = I * E
task.in_this_ra_billing_amount = round(ra_qty * rate, 2)
# K = I + G
task.cumulative_billed_qty = round(prev_qty + ra_qty, 2)
# L = K * E
task.cumulative_billed_amount = round(task.cumulative_billed_qty * rate, 2)
# M = IF(K > D , K - D , 0)
if task.cumulative_billed_qty > qty:
task.variation_qty = round(task.cumulative_billed_qty - qty, 2)
else:
task.variation_qty = 0
# N = M * E
task.variation_amount = round(task.variation_qty * rate, 2)
@main.route('/update_tasks', methods=['POST'])
@login_required
def update_tasks():
try:
updates = request.get_json()
update_count = 0
# fields that should NOT be manually edited
formula_fields = [
"previous_billing_amount",
"in_this_ra_billing_amount",
"cumulative_billed_qty",
"cumulative_billed_amount",
"variation_qty",
"variation_amount"
]
for key, new_value in updates.items():
if '_' not in key:
continue
field_name, task_id_str = key.rsplit('_', 1)
if not task_id_str.isdigit():
continue
task_id = int(task_id_str)
task = db.session.query(Task).filter_by(id=task_id).first()
if task:
# ❌ Skip manual update for formula fields
if field_name in formula_fields:
continue
current_value = getattr(task, field_name, None)
if str(current_value) != str(new_value):
setattr(task, field_name, new_value)
# 🔥 auto recalc after any change
recalc_task(task)
update_count += 1
log_activity(
current_user.username,
"Task Update",
f"Task ID {task.id} - {field_name} changed to {new_value}"
)
if update_count > 0:
db.session.commit()
log_activity(
current_user.username,
"Database Commit",
f"{update_count} task field(s) updated"
)
return jsonify({'message': f'count: {update_count} field(s) updated.'})
else:
return jsonify({'message': 'No fields were updated.'})
except Exception as e:
log_activity(current_user.username, "Error", f"Update tasks error: {str(e)}")
return jsonify({'error': 'An error occurred while updating tasks.'}), 500
# Display tasks route
@main.route('/tasks')
@login_required
def display_tasks():
work_details = WorkDetail.query.order_by(WorkDetail.uploaded_at.desc()).first()
if not work_details:
log_activity(current_user.username, "Tasks View", "No work details available")
return "No work details available.", 404
tasks = Task.query.filter_by(
district=work_details.district,
village_name=work_details.name_of_village,
block_name=work_details.block
).order_by(Task.uploaded_at.desc()).all()
grouped_tasks = []
current_main_task = None
for task in tasks:
task_data = {
"id": task.id,
"task_name": task.task_name,
"unit": task.unit,
"qty": task.qty,
"rate": task.rate,
"boq_amount": task.boq_amount,
"previous_billed_qty": task.previous_billed_qty,
"previous_billing_amount": task.previous_billing_amount,
"in_this_ra_bill_qty": task.in_this_ra_bill_qty,
"in_this_ra_billing_amount": task.in_this_ra_billing_amount,
"cumulative_billed_qty": task.cumulative_billed_qty,
"cumulative_billed_amount": task.cumulative_billed_amount,
"variation_qty": task.variation_qty,
"variation_amount": task.variation_amount,
"remark": task.remark,
"district": task.district
}
if task.serial_number:
task_data["subtasks"] = []
grouped_tasks.append(task_data)
current_main_task = task_data
elif current_main_task:
current_main_task["subtasks"].append(task_data)
log_activity(current_user.username, "Tasks View", f"Displayed tasks for {work_details.name_of_village}, {work_details.block}")
return render_template('tasks_display.html', work_details=work_details, grouped_tasks=grouped_tasks)
@main.route('/')
@login_required
def dashboard():
selected_block = request.args.getlist('block[]', None)
rate_col = cast(Task.rate, Float)
qty_col = cast(Task.in_this_ra_bill_qty, Float)
query = db.session.query(
Task.block_name.label("block_name"),
Task.village_name.label("village_name"),
func.sum(cast(Task.boq_amount, Float)).label("total_boq_amount"),
func.sum(cast(Task.previous_billing_amount, Float)).label("prev_billed_amount"),
func.sum(cast(Task.variation_amount, Float)).label("total_variation_amount"),
func.sum(cast(Task.cumulative_billed_qty, Float)).label("cumulative_billed_qty"),
func.sum(cast(Task.cumulative_billed_qty, Float) * cast(Task.rate, Float)).label("cumulative_billed_amount"),
func.sum(qty_col).label("in_this_ra_bill_qty"),
func.sum(rate_col * qty_col).label("to_be_claimed_amount")
)
if selected_block and "All" not in selected_block:
query = query.filter(Task.block_name.in_(selected_block))
query = query.group_by(Task.block_name, Task.village_name)
villages = query.all()
village_data = []
for village in villages:
village_data.append({
"block_name": village.block_name,
"village_name": village.village_name,
"total_boq_amount": village.total_boq_amount or 0,
"rate": "-",
"prev_billed_amount": village.prev_billed_amount or 0,
"total_variation_amount": village.total_variation_amount or 0,
"cumulative_billed_qty": village.cumulative_billed_qty or 0,
"cumulative_billed_amount": village.cumulative_billed_amount or 0,
"in_this_ra_bill_qty": village.in_this_ra_bill_qty or 0,
"to_be_claimed_amount": round(village.to_be_claimed_amount or 0, 2)
})
blocks = db.session.query(Task.block_name).distinct().all()
block_list = ["All"] + [block[0] for block in blocks]
# log_activity(current_user.username, "Dashboard View", "Dashboard accessed")
return render_template('index.html', villages=village_data, blocks=block_list, selected_block=selected_block)
# @main.route('/generate_report_page')
# @login_required
# def generate_report_page():
# blocks = db.session.query(Task.block_name).distinct().all()
# blocks = [block.block_name for block in blocks]
# selected_block = request.args.get('block')
# if selected_block:
# main_tasks = db.session.query(Task.task_name).filter(
# Task.serial_number.isnot(None),
# Task.block_name == selected_block
# ).distinct().all()
# main_tasks = [task.task_name.strip().replace(",", "").replace("(", "").replace(")", "").replace(".", "").replace("&", "").replace("\n", "") for task in main_tasks]
# else:
# main_tasks = db.session.query(Task.task_name).filter(Task.serial_number.isnot(None)).distinct().all()
# main_tasks = [task.task_name.strip().replace(",", "").replace("(", "").replace(")", "").replace(".", "").replace("&", "").replace("\n", "") for task in main_tasks]
# log_activity(current_user.username, "Report Page", f"Report generation page accessed (block={selected_block})")
# return render_template('task_report.html', main_tasks=main_tasks, blocks=blocks)
@main.route('/get_blocks_by_district')
@login_required
def get_blocks_by_district():
district = request.args.get('district')
if not district:
return jsonify({'blocks': []})
blocks = db.session.query(Task.block_name)\
.filter(Task.district == district)\
.distinct().all()
return jsonify({'blocks': [b[0] for b in blocks]})
@main.route('/generate_report_page')
@login_required
def generate_report_page():
selected_district = request.args.get('district')
selected_block = request.args.get('block')
# ✅ Get all districts
districts = [d[0] for d in db.session.query(Task.district).distinct().all()]
# ✅ Get blocks based on district
if selected_district:
blocks = [b[0] for b in db.session.query(Task.block_name)
.filter(Task.district == selected_district)
.distinct().all()]
else:
blocks = []
# ✅ Get main tasks based on block
if selected_block:
main_tasks = db.session.query(Task.task_name).filter(
Task.serial_number.isnot(None),
Task.block_name == selected_block
).distinct().all()
main_tasks = [
task[0].strip()
.replace(",", "")
.replace("(", "")
.replace(")", "")
.replace(".", "")
.replace("&", "")
.replace("\n", "")
for task in main_tasks
]
else:
main_tasks = []
log_activity(
current_user.username,
"Report Page",
f"Report page accessed district={selected_district}, block={selected_block}"
)
return render_template(
'task_report.html',
districts=districts,
blocks=blocks,
main_tasks=main_tasks,
selected_district=selected_district,
selected_block=selected_block
)
@main.route('/get_tasks_by_block')
@login_required
def get_tasks_by_block():
block = request.args.get('block')
if not block:
return jsonify({'tasks': []})
tasks = db.session.query(Task.task_name)\
.filter(Task.block_name == block)\
.distinct()\
.all()
task_list = [task[0].strip()
.replace(",", "")
.replace("(", "")
.replace(")", "")
.replace(".", "")
.replace("&", "")
.replace("\n", "") for task in tasks]
log_activity(current_user.username, "Fetch Tasks", f"Fetched tasks for block {block}")
return jsonify({'tasks': task_list})
def get_villages_for_block(block_name):
villages = (
db.session.query(WorkDetail.name_of_village)
.filter(WorkDetail.block == block_name)
.distinct()
.order_by(WorkDetail.name_of_village)
.all()
)
return [v[0] for v in villages if v[0]]
@main.route('/get_villages_by_block', methods=['GET'])
@login_required
def get_villages_by_block():
block = request.args.get('block')
villages = get_villages_for_block(block)
log_activity(current_user.username, "Fetch Villages", f"Fetched villages for block {block}")
return jsonify({'villages': villages})
@main.route('/filter_tasks', methods=['GET'])
@login_required
def filter_tasks():
district = request.args.get('district')
block = request.args.get('block')
village = request.args.get('village')
# ✅ Fetch distinct districts
districts = [d[0] for d in db.session.query(WorkDetail.district).distinct()]
# ✅ Fetch blocks filtered by district
if district:
blocks = [b[0] for b in db.session.query(WorkDetail.block)
.filter(WorkDetail.district == district).distinct()]
else:
blocks = []
# ✅ Fetch villages filtered by district + block
if district and block:
villages = [v[0] for v in db.session.query(WorkDetail.name_of_village)
.filter(WorkDetail.district == district,
WorkDetail.block == block)
.distinct()]
else:
villages = []
grouped_tasks = []
# ✅ Only fetch tasks if all three (district, block, village) are selected
if district and block and village:
query = (db.session.query(Task)
.join(WorkDetail, Task.village_name == WorkDetail.name_of_village)
.filter(WorkDetail.district == district,
WorkDetail.block == block,
WorkDetail.name_of_village == village))
tasks = query.order_by(Task.uploaded_at.desc()).all()
current_main_task = None
for task in tasks:
task_data = {
"id": task.id,
"task_name": task.task_name,
"unit": task.unit,
"qty": task.qty,
"rate": task.rate,
"boq_amount": task.boq_amount,
"previous_billed_qty": task.previous_billed_qty,
"previous_billing_amount": task.previous_billing_amount,
"in_this_ra_bill_qty": task.in_this_ra_bill_qty,
"in_this_ra_billing_amount": task.in_this_ra_billing_amount,
"cumulative_billed_qty": task.cumulative_billed_qty,
"cumulative_billed_amount": task.cumulative_billed_amount,
"variation_qty": task.variation_qty,
"variation_amount": task.variation_amount,
"remark": task.remark
}
# ✅ Group main tasks (with serial_number) and subtasks
if task.serial_number:
task_data["subtasks"] = []
grouped_tasks.append(task_data)
current_main_task = task_data
elif current_main_task:
current_main_task["subtasks"].append(task_data)
log_activity(current_user.username, "Filter Tasks",
f"Filtered tasks for district={district}, block={block}, village={village}")
# ✅ Render with both filtering + grouped tasks
return render_template(
'filter_tasks.html',
grouped_tasks=grouped_tasks,
districts=districts,
blocks=blocks,
villages=villages,
selected_district=district,
selected_block=block,
selected_village=village
)
# ✅ Helper function for logging user activity
def log_activity(user, action, details=""):
try:
log_file = os.path.join(current_app.root_path, "activity.log")
timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
with open(log_file, "a") as f:
f.write(f"Timestamp: {timestamp} | User: {user} | Action: {action} | Details: {details}\n")
except Exception as e:
print(f"Logging failed: {e}")
from flask import request
from datetime import datetime
@main.route('/activity_log', methods=['GET', 'POST'])
@login_required
def activity_log():
logs = []
log_file = os.path.join(current_app.root_path, 'activity.log')
if os.path.exists(log_file):
with open(log_file, 'r') as f:
for line in f:
parts = line.strip().split(" | ")
if len(parts) == 4:
logs.append({
"timestamp": parts[0].replace("Timestamp:", "").strip(),
"user": parts[1].replace("User:", "").strip(),
"action": parts[2].replace("Action:", "").strip(),
"details": parts[3].replace("Details:", "").strip()
})
# Filters
start_date = request.args.get("start_date")
end_date = request.args.get("end_date")
username = request.args.get("username")
filtered_logs = logs
# Date filter (inclusive)
if start_date or end_date:
try:
if start_date:
start_dt = datetime.strptime(start_date, "%Y-%m-%d")
else:
start_dt = datetime.min
if end_date:
end_dt = datetime.strptime(end_date, "%Y-%m-%d")
else:
end_dt = datetime.max
filtered_logs = [
log for log in filtered_logs
if start_dt <= datetime.strptime(log["timestamp"], "%Y-%m-%d %H:%M:%S") <= end_dt.replace(hour=23, minute=59, second=59)
]
except Exception as e:
print("Date filter error:", e)
# Username filter
if username:
filtered_logs = [log for log in filtered_logs if log["user"].lower() == username.lower()]
return render_template(
"activity_log.html",
logs=filtered_logs,
start_date=start_date,
end_date=end_date,
username=username
)