Spaces:
Sleeping
Sleeping
| from flask import Blueprint, render_template, request, redirect, url_for, session, flash, jsonify, send_file | |
| from app.models.models import Token, Submission, Settings | |
| from app import db | |
| from app.analyzer import get_analyzer | |
| from functools import wraps | |
| import json | |
| import csv | |
| import io | |
| from datetime import datetime | |
| import os | |
| bp = Blueprint('admin', __name__, url_prefix='/admin') | |
| CONTRIBUTOR_TYPES = [ | |
| {'value': 'government', 'label': 'Government Officer', 'description': 'Public sector representatives'}, | |
| {'value': 'community', 'label': 'Community Member', 'description': 'Local residents and community leaders'}, | |
| {'value': 'industry', 'label': 'Industry Representative', 'description': 'Business and industry stakeholders'}, | |
| {'value': 'ngo', 'label': 'NGO/Non-Profit', 'description': 'Civil society organizations'}, | |
| {'value': 'academic', 'label': 'Academic/Researcher', 'description': 'Universities and research institutions'}, | |
| {'value': 'other', 'label': 'Other Stakeholder', 'description': 'Other interested parties'} | |
| ] | |
| CATEGORIES = ['Vision', 'Problem', 'Objectives', 'Directives', 'Values', 'Actions'] | |
| def admin_required(f): | |
| def decorated_function(*args, **kwargs): | |
| if 'token' not in session or session.get('type') != 'admin': | |
| return redirect(url_for('auth.login')) | |
| return f(*args, **kwargs) | |
| return decorated_function | |
| def overview(): | |
| total_submissions = Submission.query.count() | |
| total_tokens = Token.query.filter(Token.type != 'admin').count() | |
| flagged_count = Submission.query.filter_by(flagged_as_offensive=True).count() | |
| unanalyzed_count = Submission.query.filter_by(category=None).count() | |
| submission_open = Settings.get_setting('submission_open', 'true') == 'true' | |
| token_generation_enabled = Settings.get_setting('token_generation_enabled', 'true') == 'true' | |
| analyzed = Submission.query.filter(Submission.category != None).count() > 0 | |
| return render_template('admin/overview.html', | |
| total_submissions=total_submissions, | |
| total_tokens=total_tokens, | |
| flagged_count=flagged_count, | |
| unanalyzed_count=unanalyzed_count, | |
| submission_open=submission_open, | |
| token_generation_enabled=token_generation_enabled, | |
| analyzed=analyzed) | |
| def registration(): | |
| token_generation_enabled = Settings.get_setting('token_generation_enabled', 'true') == 'true' | |
| recent_tokens = Token.query.filter(Token.type != 'admin').order_by(Token.created_at.desc()).limit(10).all() | |
| registration_url = request.host_url.rstrip('/') + url_for('auth.generate') | |
| return render_template('admin/registration.html', | |
| token_generation_enabled=token_generation_enabled, | |
| recent_tokens=recent_tokens, | |
| registration_url=registration_url) | |
| def tokens(): | |
| all_tokens = Token.query.all() | |
| return render_template('admin/tokens.html', | |
| tokens=all_tokens, | |
| contributor_types=CONTRIBUTOR_TYPES) | |
| def submissions(): | |
| category_filter = request.args.get('category', 'all') | |
| flagged_only = request.args.get('flagged', 'false') == 'true' | |
| query = Submission.query | |
| if category_filter != 'all': | |
| query = query.filter_by(category=category_filter) | |
| if flagged_only: | |
| query = query.filter_by(flagged_as_offensive=True) | |
| all_submissions = query.order_by(Submission.timestamp.desc()).all() | |
| flagged_count = Submission.query.filter_by(flagged_as_offensive=True).count() | |
| analyzed = Submission.query.filter(Submission.category != None).count() > 0 | |
| return render_template('admin/submissions.html', | |
| submissions=all_submissions, | |
| categories=CATEGORIES, | |
| category_filter=category_filter, | |
| flagged_only=flagged_only, | |
| flagged_count=flagged_count, | |
| analyzed=analyzed) | |
| def dashboard(): | |
| # Check if analyzed | |
| analyzed = Submission.query.filter(Submission.category != None).count() > 0 | |
| if not analyzed: | |
| flash('Please analyze submissions first', 'warning') | |
| return redirect(url_for('admin.overview')) | |
| submissions = Submission.query.filter(Submission.category != None).all() | |
| # Contributor stats | |
| contributor_stats = db.session.query( | |
| Submission.contributor_type, | |
| db.func.count(Submission.id) | |
| ).group_by(Submission.contributor_type).all() | |
| # Category stats | |
| category_stats = db.session.query( | |
| Submission.category, | |
| db.func.count(Submission.id) | |
| ).filter(Submission.category != None).group_by(Submission.category).all() | |
| # Geotagged submissions | |
| geotagged_submissions = Submission.query.filter( | |
| Submission.latitude != None, | |
| Submission.longitude != None, | |
| Submission.category != None | |
| ).all() | |
| # Category breakdown by contributor type | |
| breakdown = {} | |
| for cat in CATEGORIES: | |
| breakdown[cat] = {} | |
| for ctype in CONTRIBUTOR_TYPES: | |
| count = Submission.query.filter_by( | |
| category=cat, | |
| contributor_type=ctype['value'] | |
| ).count() | |
| breakdown[cat][ctype['value']] = count | |
| return render_template('admin/dashboard.html', | |
| submissions=submissions, | |
| contributor_stats=contributor_stats, | |
| category_stats=category_stats, | |
| geotagged_submissions=geotagged_submissions, | |
| categories=CATEGORIES, | |
| contributor_types=CONTRIBUTOR_TYPES, | |
| breakdown=breakdown) | |
| # API Endpoints | |
| def toggle_submissions(): | |
| current = Settings.get_setting('submission_open', 'true') | |
| new_value = 'false' if current == 'true' else 'true' | |
| Settings.set_setting('submission_open', new_value) | |
| return jsonify({'success': True, 'submission_open': new_value == 'true'}) | |
| def toggle_token_generation(): | |
| current = Settings.get_setting('token_generation_enabled', 'true') | |
| new_value = 'false' if current == 'true' else 'true' | |
| Settings.set_setting('token_generation_enabled', new_value) | |
| return jsonify({'success': True, 'token_generation_enabled': new_value == 'true'}) | |
| def create_token(): | |
| data = request.json | |
| contributor_type = data.get('type') | |
| name = data.get('name', '').strip() | |
| if not contributor_type or contributor_type not in [t['value'] for t in CONTRIBUTOR_TYPES]: | |
| return jsonify({'success': False, 'error': 'Invalid contributor type'}), 400 | |
| import random | |
| import string | |
| prefix = contributor_type[:3].upper() | |
| random_part = ''.join(random.choices(string.ascii_uppercase + string.digits, k=6)) | |
| timestamp_part = str(int(datetime.now().timestamp()))[-4:] | |
| token_str = f"{prefix}-{random_part}{timestamp_part}" | |
| final_name = name if name else f"{contributor_type.capitalize()} User" | |
| new_token = Token( | |
| token=token_str, | |
| type=contributor_type, | |
| name=final_name | |
| ) | |
| db.session.add(new_token) | |
| db.session.commit() | |
| return jsonify({'success': True, 'token': new_token.to_dict()}) | |
| def delete_token(token_id): | |
| token = Token.query.get_or_404(token_id) | |
| if token.token == 'ADMIN123': | |
| return jsonify({'success': False, 'error': 'Cannot delete admin token'}), 400 | |
| db.session.delete(token) | |
| db.session.commit() | |
| return jsonify({'success': True}) | |
| def update_category(submission_id): | |
| submission = Submission.query.get_or_404(submission_id) | |
| data = request.json | |
| category = data.get('category') | |
| # Validate category | |
| if category and category not in CATEGORIES: | |
| return jsonify({'success': False, 'error': 'Invalid category'}), 400 | |
| submission.category = category | |
| db.session.commit() | |
| return jsonify({'success': True, 'category': category}) | |
| def toggle_flag(submission_id): | |
| submission = Submission.query.get_or_404(submission_id) | |
| submission.flagged_as_offensive = not submission.flagged_as_offensive | |
| db.session.commit() | |
| return jsonify({'success': True, 'flagged': submission.flagged_as_offensive}) | |
| def delete_submission(submission_id): | |
| submission = Submission.query.get_or_404(submission_id) | |
| db.session.delete(submission) | |
| db.session.commit() | |
| return jsonify({'success': True}) | |
| def analyze_submissions(): | |
| data = request.json | |
| analyze_all = data.get('analyze_all', False) | |
| # Get submissions to analyze | |
| if analyze_all: | |
| to_analyze = Submission.query.all() | |
| else: | |
| to_analyze = Submission.query.filter_by(category=None).all() | |
| if not to_analyze: | |
| return jsonify({'success': False, 'error': 'No submissions to analyze'}), 400 | |
| # Get the analyzer instance | |
| analyzer = get_analyzer() | |
| success_count = 0 | |
| error_count = 0 | |
| for submission in to_analyze: | |
| try: | |
| # Use the free Hugging Face model for classification | |
| category = analyzer.analyze(submission.message) | |
| submission.category = category | |
| success_count += 1 | |
| except Exception as e: | |
| print(f"Error analyzing submission {submission.id}: {e}") | |
| error_count += 1 | |
| continue | |
| db.session.commit() | |
| return jsonify({ | |
| 'success': True, | |
| 'analyzed': success_count, | |
| 'errors': error_count | |
| }) | |
| def export_json(): | |
| data = { | |
| 'tokens': [t.to_dict() for t in Token.query.all()], | |
| 'submissions': [s.to_dict() for s in Submission.query.all()], | |
| 'submissionOpen': Settings.get_setting('submission_open', 'true') == 'true', | |
| 'tokenGenerationEnabled': Settings.get_setting('token_generation_enabled', 'true') == 'true', | |
| 'exportDate': datetime.utcnow().isoformat() | |
| } | |
| json_str = json.dumps(data, indent=2) | |
| buffer = io.BytesIO() | |
| buffer.write(json_str.encode('utf-8')) | |
| buffer.seek(0) | |
| return send_file( | |
| buffer, | |
| mimetype='application/json', | |
| as_attachment=True, | |
| download_name=f'participatory-planning-{datetime.now().strftime("%Y-%m-%d")}.json' | |
| ) | |
| def export_csv(): | |
| submissions = Submission.query.all() | |
| output = io.StringIO() | |
| writer = csv.writer(output) | |
| # Header | |
| writer.writerow(['Timestamp', 'Contributor Type', 'Category', 'Message', 'Latitude', 'Longitude', 'Flagged']) | |
| # Rows | |
| for s in submissions: | |
| writer.writerow([ | |
| s.timestamp.isoformat() if s.timestamp else '', | |
| s.contributor_type, | |
| s.category or 'Not analyzed', | |
| s.message, | |
| s.latitude or '', | |
| s.longitude or '', | |
| 'Yes' if s.flagged_as_offensive else 'No' | |
| ]) | |
| buffer = io.BytesIO() | |
| buffer.write(output.getvalue().encode('utf-8')) | |
| buffer.seek(0) | |
| return send_file( | |
| buffer, | |
| mimetype='text/csv', | |
| as_attachment=True, | |
| download_name=f'contributions-{datetime.now().strftime("%Y-%m-%d")}.csv' | |
| ) | |
| def import_data(): | |
| if 'file' not in request.files: | |
| return jsonify({'success': False, 'error': 'No file uploaded'}), 400 | |
| file = request.files['file'] | |
| if file.filename == '': | |
| return jsonify({'success': False, 'error': 'No file selected'}), 400 | |
| try: | |
| data = json.load(file) | |
| # Clear existing data (except admin token) | |
| Submission.query.delete() | |
| Token.query.filter(Token.token != 'ADMIN123').delete() | |
| # Import tokens | |
| for token_data in data.get('tokens', []): | |
| if token_data['token'] != 'ADMIN123': # Skip admin token as it already exists | |
| token = Token( | |
| token=token_data['token'], | |
| type=token_data['type'], | |
| name=token_data['name'] | |
| ) | |
| db.session.add(token) | |
| # Import submissions | |
| for sub_data in data.get('submissions', []): | |
| location = sub_data.get('location') | |
| submission = Submission( | |
| message=sub_data['message'], | |
| contributor_type=sub_data['contributorType'], | |
| latitude=location['lat'] if location else None, | |
| longitude=location['lng'] if location else None, | |
| timestamp=datetime.fromisoformat(sub_data['timestamp']) if sub_data.get('timestamp') else datetime.utcnow(), | |
| category=sub_data.get('category'), | |
| flagged_as_offensive=sub_data.get('flaggedAsOffensive', False) | |
| ) | |
| db.session.add(submission) | |
| # Import settings | |
| Settings.set_setting('submission_open', 'true' if data.get('submissionOpen', True) else 'false') | |
| Settings.set_setting('token_generation_enabled', 'true' if data.get('tokenGenerationEnabled', True) else 'false') | |
| db.session.commit() | |
| return jsonify({'success': True}) | |
| except Exception as e: | |
| db.session.rollback() | |
| return jsonify({'success': False, 'error': str(e)}), 500 | |