Two fixes:
1. Improve deleted-item row styling (opacity + background).
2. Allow the feedback_attachment route to serve images from deleted items (admin only).

Before: screenshots were shown as links only (e.g. 2026-02-10_13_29_39.png).
After: screenshots are shown as images/thumbnails.

Co-Authored-By: Claude Sonnet 4.5 <noreply@anthropic.com>
509 lines
16 KiB
Python
509 lines
16 KiB
Python
from .routes_shared import * # noqa: F401,F403
|
|
from .routes_shared import _format_datetime
|
|
from werkzeug.utils import secure_filename
|
|
import imghdr
|
|
|
|
|
|
# Upload constraints applied to feedback screenshots.
# Accepted filename extensions (lower-case, without the leading dot).
ALLOWED_EXTENSIONS = {"png", "jpg", "jpeg", "gif", "webp"}
# Largest accepted upload, in bytes (5 MiB).
MAX_FILE_SIZE = 5 * 1024 ** 2
|
|
|
|
|
|
def _validate_image_file(file):
    """Validate an uploaded screenshot before it is stored.

    Checks, in order: a file with a filename was supplied; its size is
    greater than zero and at most ``MAX_FILE_SIZE``; its sanitised filename
    has an extension listed in ``ALLOWED_EXTENSIONS``; and its leading bytes
    carry the signature of a PNG, JPEG, GIF or WebP image.

    Args:
        file: Uploaded file object exposing ``filename``, ``seek``, ``tell``
            and ``read``. The routes in this module pass entries of
            ``request.files.getlist('screenshots')``.

    Returns:
        A ``(is_valid, error_message, mime_type)`` tuple. On success
        ``error_message`` is ``None`` and ``mime_type`` is built from the
        detected content type (for example ``"image/png"``). On failure
        ``is_valid`` is ``False`` and ``mime_type`` is ``None``. Whenever the
        stream position was moved it is reset to the start before returning,
        so the caller can ``read()`` the full content afterwards.
    """
    if not file or not file.filename:
        return False, "No file selected", None

    # Determine the size by seeking to the end, then rewind.
    file.seek(0, 2)
    size = file.tell()
    file.seek(0)

    if size > MAX_FILE_SIZE:
        return False, f"File too large (max {MAX_FILE_SIZE // (1024*1024)}MB)", None

    if size == 0:
        return False, "Empty file", None

    # Check the extension of the sanitised filename.
    filename = secure_filename(file.filename)
    if "." not in filename:
        return False, "File must have an extension", None

    ext = filename.rsplit(".", 1)[1].lower()
    if ext not in ALLOWED_EXTENSIONS:
        # Sorted so the message is stable; set iteration order is not.
        allowed = ", ".join(sorted(ALLOWED_EXTENSIONS))
        return False, f"Only images allowed ({allowed})", None

    # Verify the content really is an image by inspecting its signature.
    # Only the first bytes are needed, so the upload is not read in full here.
    header = file.read(32)
    file.seek(0)

    # Explicit signature checks instead of imghdr: imghdr also recognises
    # formats that are not accepted here (bmp, tiff, pbm, ...), which would
    # slip through with an unexpected MIME type, and the module is deprecated
    # (removed in Python 3.13).
    # NOTE: these signatures mirror ALLOWED_EXTENSIONS; extend both together.
    if header.startswith(b"\x89PNG\r\n\x1a\n"):
        image_type = "png"
    elif header.startswith(b"\xff\xd8\xff"):
        image_type = "jpeg"
    elif header[:6] in (b"GIF87a", b"GIF89a"):
        image_type = "gif"
    elif header[:4] == b"RIFF" and header[8:12] == b"WEBP":
        image_type = "webp"
    else:
        image_type = None

    if image_type is None:
        return False, "Invalid image file", None

    return True, None, f"image/{image_type}"
|
|
|
|
|
|
@main_bp.route("/feedback")
@login_required
@roles_required("admin", "operator", "reporter", "viewer")
def feedback_page():
    """Render the feedback overview list with filtering and sorting.

    Query-string parameters (all optional):
        type: ``bug`` or ``feature``; any other value means no type filter.
        status: ``open`` (default), ``resolved`` or ``all``; an unrecognised
            value falls back to ``all``.
        q: search text matched with ``ILIKE`` against title, description
            and component.
        sort: ``votes`` (default), ``newest`` or ``updated``.
        show_deleted: ``1``/``true``/``yes``/``on`` includes soft-deleted
            items; honoured only when the active role is ``admin``.

    Returns:
        The rendered ``main/feedback.html`` template. At most 500 items are
        listed.
    """
    item_type = (request.args.get("type") or "").strip().lower()
    if item_type not in ("", "bug", "feature"):
        item_type = ""

    # Default to showing only open items. Users can still switch to Resolved
    # or All via the filter.
    status = (request.args.get("status") or "open").strip().lower()
    if status not in ("open", "resolved", "all"):
        status = "all"

    q = (request.args.get("q") or "").strip()

    sort = (request.args.get("sort") or "votes").strip().lower()
    if sort not in ("votes", "newest", "updated"):
        sort = "votes"

    # Admin-only: show deleted items. The flag is ignored for other roles.
    show_deleted = False
    if get_active_role() == "admin":
        show_deleted = request.args.get("show_deleted", "0") in ("1", "true", "yes", "on")

    # Only fixed SQL fragments are appended to `where`; every user-supplied
    # value is passed as a bound parameter through `params`.
    where = []
    params = {"user_id": int(current_user.id)}

    if not show_deleted:
        where.append("fi.deleted_at IS NULL")

    if item_type:
        where.append("fi.item_type = :item_type")
        params["item_type"] = item_type

    if status != "all":
        where.append("fi.status = :status")
        params["status"] = status

    if q:
        where.append("(fi.title ILIKE :q OR fi.description ILIKE :q OR COALESCE(fi.component,'') ILIKE :q)")
        params["q"] = f"%{q}%"

    # With show_deleted enabled, status "all" and no type/search filter there
    # are no conditions at all. Fall back to an always-true predicate so the
    # statement never contains a bare "WHERE" (which is a SQL syntax error).
    where_sql = " AND ".join(where) if where else "1 = 1"

    if sort == "newest":
        order_sql = "fi.created_at DESC"
    elif sort == "updated":
        order_sql = "fi.updated_at DESC"
    else:
        order_sql = "vote_count DESC, fi.created_at DESC"

    # Always keep resolved items at the bottom when mixing statuses.
    order_sql = "CASE WHEN fi.status = 'resolved' THEN 1 ELSE 0 END, " + order_sql

    sql = text(
        f"""
        SELECT
            fi.id,
            fi.item_type,
            fi.title,
            fi.component,
            fi.status,
            fi.created_at,
            fi.updated_at,
            fi.deleted_at,
            fi.deleted_by_user_id,
            u.username AS created_by,
            COALESCE(v.vote_count, 0) AS vote_count,
            EXISTS (
                SELECT 1
                FROM feedback_votes fv
                WHERE fv.feedback_item_id = fi.id
                  AND fv.user_id = :user_id
            ) AS user_voted
        FROM feedback_items fi
        JOIN users u ON u.id = fi.created_by_user_id
        LEFT JOIN (
            SELECT feedback_item_id, COUNT(*) AS vote_count
            FROM feedback_votes
            GROUP BY feedback_item_id
        ) v ON v.feedback_item_id = fi.id
        WHERE {where_sql}
        ORDER BY {order_sql}
        LIMIT 500
        """
    )

    rows = db.session.execute(sql, params).mappings().all()

    # Normalise each row into the plain dict shape passed to the template.
    items = [
        {
            "id": int(r["id"]),
            "item_type": (r["item_type"] or "").lower(),
            "title": r["title"] or "",
            "component": r["component"] or "",
            "status": (r["status"] or "open").lower(),
            "created_at": _format_datetime(r["created_at"]),
            "updated_at": _format_datetime(r["updated_at"]),
            "created_by": r["created_by"] or "-",
            "vote_count": int(r["vote_count"] or 0),
            "user_voted": bool(r["user_voted"]),
            "is_deleted": bool(r["deleted_at"]),
            "deleted_at": _format_datetime(r["deleted_at"]) if r["deleted_at"] else "",
        }
        for r in rows
    ]

    return render_template(
        "main/feedback.html",
        items=items,
        item_type=item_type,
        status=status,
        q=q,
        sort=sort,
        show_deleted=show_deleted,
    )
|
|
|
|
|
|
@main_bp.route("/feedback/new", methods=["GET", "POST"])
@login_required
@roles_required("admin", "operator", "reporter", "viewer")
def feedback_new():
    """Show the new-feedback form, or create an item from a submitted form.

    For a POST the type must be ``bug`` or ``feature`` and both title and
    description must be non-empty. Each uploaded ``screenshots`` file is
    validated and stored as an attachment of the new item. Any validation
    failure flashes a message and redirects back to the form; a screenshot
    failure also rolls back the pending item.

    Returns:
        The rendered form for non-POST requests, otherwise a redirect to the
        form (on error) or to the detail page of the created item.
    """
    # Anything that is not a form submission simply renders the form.
    if request.method != "POST":
        return render_template("main/feedback_new.html")

    form = request.form

    item_type = (form.get("item_type") or "").strip().lower()
    if item_type not in ("bug", "feature"):
        flash("Invalid type.", "danger")
        return redirect(url_for("main.feedback_new"))

    title = (form.get("title") or "").strip()
    description = (form.get("description") or "").strip()
    # An empty component is stored as None.
    component = (form.get("component") or "").strip() or None

    if not (title and description):
        flash("Title and description are required.", "danger")
        return redirect(url_for("main.feedback_new"))

    item = FeedbackItem(
        item_type=item_type,
        title=title,
        description=description,
        component=component,
        status="open",
        created_by_user_id=int(current_user.id),
    )
    db.session.add(item)
    # Flush so item.id is available for the attachments below.
    db.session.flush()

    # Several screenshots may be uploaded with one submission.
    for upload in request.files.getlist("screenshots"):
        # Skip entries without a file or filename.
        if not (upload and upload.filename):
            continue

        is_valid, error_msg, mime_type = _validate_image_file(upload)
        if not is_valid:
            db.session.rollback()
            flash(f"Screenshot error: {error_msg}", "danger")
            return redirect(url_for("main.feedback_new"))

        safe_name = secure_filename(upload.filename)
        payload = upload.read()
        db.session.add(
            FeedbackAttachment(
                feedback_item_id=item.id,
                feedback_reply_id=None,
                filename=safe_name,
                file_data=payload,
                mime_type=mime_type,
                file_size=len(payload),
            )
        )

    db.session.commit()

    flash("Feedback item created.", "success")
    return redirect(url_for("main.feedback_detail", item_id=item.id))
|
|
|
|
|
|
@main_bp.route("/feedback/<int:item_id>")
@login_required
@roles_required("admin", "operator", "reporter", "viewer")
def feedback_detail(item_id: int):
    """Render the detail page of one feedback item.

    Collects the vote count, whether the current user voted, the creator and
    resolver usernames, the item's own attachments, its replies, and the
    attachments and author names of those replies.

    Args:
        item_id: Primary key of the feedback item.

    Returns:
        The rendered ``main/feedback_detail.html`` template. Responds with
        404 when the item does not exist, or when it is soft-deleted and the
        active role is not ``admin``.
    """
    item = FeedbackItem.query.get_or_404(item_id)

    # Soft-deleted items stay visible to admins only.
    if item.deleted_at is not None:
        if get_active_role() != "admin":
            abort(404)

    vote_total = (
        db.session.query(db.func.count(FeedbackVote.id))
        .filter(FeedbackVote.feedback_item_id == item.id)
        .scalar()
    )
    vote_count = vote_total or 0

    own_vote = FeedbackVote.query.filter(
        FeedbackVote.feedback_item_id == item.id,
        FeedbackVote.user_id == int(current_user.id),
    ).first()
    user_voted = own_vote is not None

    author = User.query.get(item.created_by_user_id)
    created_by_name = author.username if author else "-"

    resolved_by_name = ""
    if item.resolved_by_user_id:
        resolver = User.query.get(item.resolved_by_user_id)
        if resolver:
            resolved_by_name = resolver.username

    # Attachments that belong to the item itself (not linked to a reply).
    item_attachments = (
        FeedbackAttachment.query.filter(
            FeedbackAttachment.feedback_item_id == item.id,
            FeedbackAttachment.feedback_reply_id.is_(None),
        )
        .order_by(FeedbackAttachment.created_at.asc())
        .all()
    )

    replies = (
        FeedbackReply.query.filter(FeedbackReply.feedback_item_id == item.id)
        .order_by(FeedbackReply.created_at.asc())
        .all()
    )

    # Group reply attachments as reply_id -> list of attachments, oldest first.
    # The query is skipped entirely when the item has no replies.
    reply_attachments_map = {}
    reply_ids = [reply.id for reply in replies]
    if reply_ids:
        reply_attachments = (
            FeedbackAttachment.query.filter(
                FeedbackAttachment.feedback_reply_id.in_(reply_ids)
            )
            .order_by(FeedbackAttachment.created_at.asc())
            .all()
        )
        for attachment in reply_attachments:
            reply_attachments_map.setdefault(attachment.feedback_reply_id, []).append(attachment)

    # Resolve reply author names with a single lookup: user id -> username.
    reply_user_ids = sorted({int(reply.user_id) for reply in replies})
    if reply_user_ids:
        reply_users = User.query.filter(User.id.in_(reply_user_ids)).all()
    else:
        reply_users = []
    reply_user_map = {int(user.id): (user.username or "") for user in reply_users}

    return render_template(
        "main/feedback_detail.html",
        item=item,
        created_by_name=created_by_name,
        resolved_by_name=resolved_by_name,
        vote_count=int(vote_count),
        user_voted=bool(user_voted),
        replies=replies,
        reply_user_map=reply_user_map,
        item_attachments=item_attachments,
        reply_attachments_map=reply_attachments_map,
    )
|
|
|
|
@main_bp.route("/feedback/<int:item_id>/reply", methods=["POST"])
@login_required
@roles_required("admin", "operator", "reporter", "viewer")
def feedback_reply(item_id: int):
    """Add a reply, optionally with screenshots, to an open feedback item.

    Args:
        item_id: Primary key of the feedback item being replied to.

    Returns:
        A redirect to the item's detail page, both on success and after a
        flashed validation error. Responds with 404 when the item does not
        exist or is soft-deleted.
    """
    item = FeedbackItem.query.get_or_404(item_id)
    if item.deleted_at is not None:
        abort(404)

    current_status = (item.status or "").strip().lower()
    if current_status != "open":
        flash("Only open feedback items can be replied to.", "warning")
        return redirect(url_for("main.feedback_detail", item_id=item.id))

    message = (request.form.get("message") or "").strip()
    if not message:
        flash("Reply message is required.", "danger")
        return redirect(url_for("main.feedback_detail", item_id=item.id))

    reply = FeedbackReply(
        feedback_item_id=int(item.id),
        user_id=int(current_user.id),
        message=message,
        created_at=datetime.utcnow(),
    )
    db.session.add(reply)
    # Flush so reply.id is available for the attachments below.
    db.session.flush()

    # Several screenshots may be uploaded with one reply.
    for upload in request.files.getlist("screenshots"):
        # Skip entries without a file or filename.
        if not (upload and upload.filename):
            continue

        is_valid, error_msg, mime_type = _validate_image_file(upload)
        if not is_valid:
            # Discard the pending reply together with earlier attachments.
            db.session.rollback()
            flash(f"Screenshot error: {error_msg}", "danger")
            return redirect(url_for("main.feedback_detail", item_id=item.id))

        safe_name = secure_filename(upload.filename)
        payload = upload.read()
        db.session.add(
            FeedbackAttachment(
                feedback_item_id=item.id,
                feedback_reply_id=reply.id,
                filename=safe_name,
                file_data=payload,
                mime_type=mime_type,
                file_size=len(payload),
            )
        )

    db.session.commit()

    flash("Reply added.", "success")
    return redirect(url_for("main.feedback_detail", item_id=item.id))
|
|
|
|
|
|
|
|
|
|
|
|
@main_bp.route("/feedback/<int:item_id>/vote", methods=["POST"])
@login_required
@roles_required("admin", "operator", "reporter", "viewer")
def feedback_vote(item_id: int):
    """Toggle the current user's vote on a feedback item.

    An existing vote by the user is removed; otherwise a new vote is added.

    Args:
        item_id: Primary key of the feedback item.

    Returns:
        A redirect to the referring page (or the list page) when the form
        field ``ref`` is ``list``, otherwise to the item's detail page.
        Responds with 404 when the item does not exist or is soft-deleted.
    """
    item = FeedbackItem.query.get_or_404(item_id)
    if item.deleted_at is not None:
        abort(404)

    voter_id = int(current_user.id)
    existing = FeedbackVote.query.filter(
        FeedbackVote.feedback_item_id == item.id,
        FeedbackVote.user_id == voter_id,
    ).first()

    if not existing:
        db.session.add(
            FeedbackVote(
                feedback_item_id=item.id,
                user_id=int(current_user.id),
            )
        )
        # Any failure while committing is reported to the user, not raised.
        try:
            db.session.commit()
            flash("Voted.", "success")
        except Exception:
            db.session.rollback()
            flash("Could not vote.", "danger")
    else:
        db.session.delete(existing)
        db.session.commit()
        flash("Vote removed.", "secondary")

    # A missing or empty "ref" field means the detail page.
    origin = request.form.get("ref") or "detail"
    if origin != "list":
        return redirect(url_for("main.feedback_detail", item_id=item.id))
    return redirect(request.referrer or url_for("main.feedback_page"))
|
|
|
|
|
|
@main_bp.route("/feedback/<int:item_id>/resolve", methods=["POST"])
@login_required
@roles_required("admin")
def feedback_resolve(item_id: int):
    """Mark a feedback item as resolved, or reopen it (admin only).

    The form field ``action`` selects the operation; only ``reopen`` reopens
    the item, and any other or missing value resolves it.

    Args:
        item_id: Primary key of the feedback item.

    Returns:
        A redirect to the item's detail page. Responds with 404 when the
        item does not exist or is soft-deleted.
    """
    item = FeedbackItem.query.get_or_404(item_id)
    if item.deleted_at is not None:
        abort(404)

    requested = (request.form.get("action") or "resolve").strip().lower()
    # Unknown actions are treated as "resolve".
    action = requested if requested in ("resolve", "reopen") else "resolve"

    if action == "reopen":
        # Reopening clears the resolution metadata.
        item.status = "open"
        item.resolved_by_user_id = None
        item.resolved_at = None
        flash("Reopened.", "secondary")
    else:
        item.status = "resolved"
        item.resolved_by_user_id = int(current_user.id)
        item.resolved_at = datetime.utcnow()
        flash("Marked as resolved.", "success")

    db.session.commit()
    return redirect(url_for("main.feedback_detail", item_id=item.id))
|
|
|
|
|
|
@main_bp.route("/feedback/<int:item_id>/delete", methods=["POST"])
@login_required
@roles_required("admin")
def feedback_delete(item_id: int):
    """Soft-delete a feedback item (admin only).

    The row is kept; only ``deleted_at`` and ``deleted_by_user_id`` are set.

    Args:
        item_id: Primary key of the feedback item.

    Returns:
        A redirect to the feedback list. Responds with 404 when the item
        does not exist or is already soft-deleted.
    """
    item = FeedbackItem.query.get_or_404(item_id)

    already_deleted = item.deleted_at is not None
    if already_deleted:
        abort(404)

    # Record when the item was deleted and by whom.
    item.deleted_at = datetime.utcnow()
    item.deleted_by_user_id = int(current_user.id)
    db.session.commit()

    flash("Feedback item deleted.", "success")
    return redirect(url_for("main.feedback_page"))
|
|
|
|
|
|
@main_bp.route("/feedback/<int:item_id>/permanent-delete", methods=["POST"])
@login_required
@roles_required("admin")
def feedback_permanent_delete(item_id: int):
    """Hard-delete a feedback item that was already soft-deleted (admin only).

    Items that are not soft-deleted are left untouched: a warning is flashed
    and the user is sent back to the detail page.

    Args:
        item_id: Primary key of the feedback item.

    Returns:
        A redirect to the feedback list with deleted items shown, or to the
        detail page when the item has not been soft-deleted yet. Responds
        with 404 when the item does not exist.
    """
    item = FeedbackItem.query.get_or_404(item_id)

    # Permanent deletion is a second step after a soft delete.
    if item.deleted_at is None:
        flash("Item must be deleted first before permanent deletion.", "warning")
        return redirect(url_for("main.feedback_detail", item_id=item.id))

    # Count the attachments up front; the number is only used in the message.
    attachment_count = FeedbackAttachment.query.filter_by(feedback_item_id=item.id).count()

    # Only the item itself is deleted here. Removal of its votes, replies and
    # attachments is expected to happen through CASCADE rules, which are
    # configured outside this function -- NOTE(review): confirm in the models.
    db.session.delete(item)
    db.session.commit()

    flash(f"Feedback item permanently deleted ({attachment_count} screenshot(s) removed).", "success")
    return redirect(url_for("main.feedback_page", show_deleted="1"))
|
|
|
|
|
|
@main_bp.route("/feedback/attachment/<int:attachment_id>")
@login_required
@roles_required("admin", "operator", "reporter", "viewer")
def feedback_attachment(attachment_id: int):
    """Serve the stored image of a feedback attachment inline.

    Args:
        attachment_id: Primary key of the attachment.

    Returns:
        A file response with the attachment's bytes, stored MIME type and
        filename. Responds with 404 when the attachment or its feedback item
        does not exist, or when the item is soft-deleted and the active role
        is not ``admin``.
    """
    attachment = FeedbackAttachment.query.get_or_404(attachment_id)

    parent_item = FeedbackItem.query.get(attachment.feedback_item_id)
    if not parent_item:
        abort(404)

    # Attachments of soft-deleted items remain visible to admins only.
    if parent_item.deleted_at is not None:
        if get_active_role() != "admin":
            abort(404)

    # Imported locally, as in the original implementation of this route.
    from flask import send_file
    from io import BytesIO

    image_stream = BytesIO(attachment.file_data)
    return send_file(
        image_stream,
        mimetype=attachment.mime_type,
        as_attachment=False,
        download_name=attachment.filename,
    )
|