"""Flask application that walks a participant through a short image quiz.

Flow of pages: splash (enter username) -> instructions -> then, for every
question drawn for the participant: prep -> prompt -> image -> respond.
Questions are loaded from ``QUESTIONS_FILE`` and handed out without
replacement; the ids still available are tracked in ``AVAILABLE_FILE`` and
every access to that file is serialized with a file lock on ``LOCK_FILE``.
"""

import csv
import json
import logging
import os
import random
import secrets
import threading
import time

import pandas as pd
import shortuuid
from filelock import FileLock
from flask import Flask, g, redirect, render_template, request, session, url_for
from huggingface_hub import HfApi, hf_hub_download, login  # For Hugging Face integration

# Flask application setup
app = Flask(__name__)

# SECRET_KEY is required for sessions.
# Use a persistent env var in prod; fallback only for local/dev
app.config["SECRET_KEY"] = os.environ.get("FLASK_SECRET_KEY") or secrets.token_hex(32)
app.config.update(
    SESSION_COOKIE_SAMESITE="None",  # allow cross-site
    SESSION_COOKIE_SECURE=True,  # required for "None"
)

logging.basicConfig(
    level=logging.DEBUG,  # Set to DEBUG for more granular logs
    format='%(asctime)s - %(levelname)s - %(message)s',
    handlers=[
        logging.StreamHandler(),
    ],
)
logger = logging.getLogger(__name__)

HF_TOKEN = os.environ.get("HF_TOKEN")
if HF_TOKEN:
    try:
        login(token=HF_TOKEN)
        logger.info("Logged into Hugging Face successfully.")
    except Exception as e:
        # Best-effort: the app keeps running even when the login fails.
        logger.exception("Failed to log into Hugging Face: %s", e)
else:
    logger.warning("HF_TOKEN not found in environment variables. Session data will not be uploaded.")

# Initialize Hugging Face API
hf_api = HfApi()
HF_REPO_ID = "pooyanrg/BlindTest"  # Update as needed
HF_REPO_PATH = "responses"

QUESTIONS_FILE = "./data/dataset.csv"
AVAILABLE_FILE = "/tmp/available.json"
LOCK_FILE = "/tmp/available.lock"

# Tasks whose question text is replaced by a fixed rows/columns prompt.
_GRID_TASKS = ('Counting Grid - Blank Grids', 'Counting Grid - Word Grids')
_GRID_QUESTION = "Count the number of rows and columns."

# Per-task hint rendered next to the answer field on the respond page.
_FORM_HINTS = {
    'Circled Letter': "Enter a letter",
    'Touching Circles': "Enter Y/N",
    'Counting Grid - Blank Grids': "Enter two numbers seperated with a comma, for example: 5,6",
    'Counting Grid - Word Grids': "Enter two numbers seperated with a comma, for example: 5,6",
}
_DEFAULT_FORM_HINT = "Enter a number"

# Load questions into memory
with open(QUESTIONS_FILE, newline="", encoding="utf-8") as f:
    reader = csv.DictReader(f)
    all_questions = list(reader)

# Track which questions are available. Guarded by the same lock as every
# other access so a process starting up cannot overwrite a file that another
# process is reading or updating.
with FileLock(LOCK_FILE):
    if not os.path.exists(AVAILABLE_FILE):
        with open(AVAILABLE_FILE, "w") as f:
            json.dump(list(range(len(all_questions))), f)


def _has_available_questions():
    """Return True when at least one question id is left in ``AVAILABLE_FILE``.

    Unlike ``get_questions`` this only reads the pool; nothing is consumed.
    """
    with FileLock(LOCK_FILE):
        with open(AVAILABLE_FILE, "r") as f:
            return bool(json.load(f))


def get_questions(num=10):
    """Draw up to ``num`` random questions and remove them from the pool.

    Args:
        num: Maximum number of questions to draw. Fewer are returned when
            the pool holds fewer; a value <= 0 draws nothing.

    Returns:
        A list of question dicts (the CSV row plus an ``"id"`` key holding
        the row index), or an empty list when the pool is exhausted.
    """
    with FileLock(LOCK_FILE):
        with open(AVAILABLE_FILE, "r") as f:
            available = json.load(f)

        if not available:
            return []  # No questions left

        sample_size = max(0, min(num, len(available)))
        selected_ids = random.sample(available, sample_size)

        # Set membership keeps the filter linear in the pool size.
        selected = set(selected_ids)
        remaining = [qid for qid in available if qid not in selected]

        with open(AVAILABLE_FILE, "w") as f:
            json.dump(remaining, f)

    return [all_questions[qid] | {"id": qid} for qid in selected_ids]


def _display_question(question):
    """Return the text shown to the participant for one question dict.

    Grid tasks always use a fixed prompt. Otherwise the raw question is cut
    after its first '.' when it mentions "count", or after its first '?'
    in every other case. When that terminator is missing the raw text is
    returned unchanged instead of raising.
    """
    if question.get('task', '') in _GRID_TASKS:
        return _GRID_QUESTION

    raw = question.get('question') or ''
    terminator = '.' if "count" in raw.lower() else '?'
    end = raw.find(terminator)
    return raw if end == -1 else raw[:end + 1]


def _quiz_state():
    """Read the per-request quiz state from the query string and session.

    Returns:
        ``(username, current_index, questions)`` when ``current_index`` is an
        integer that addresses one of the session's questions, else ``None``
        (missing/non-numeric index, out-of-range index, or no questions in
        the session).
    """
    username = request.args.get('username')
    questions = session.get('questions', [])
    try:
        current_index = int(request.args.get('current_index'))
    except (TypeError, ValueError):
        logger.warning("Missing or invalid current_index in request.")
        return None
    if not 0 <= current_index < len(questions):
        logger.warning("current_index %s does not match the session's questions.", current_index)
        return None
    return username, current_index, questions


def _save_responses(username, answers):
    """Write the finished session (username, elapsed time, answers) to /tmp.

    Returns:
        The path of the JSON file that was written.
    """
    # Without a recorded start time the elapsed time degrades to ~0.
    start_time = session.get('start_time', time.time())
    elapsed_time = time.time() - start_time
    response_all = {'username': username, 'time': elapsed_time, 'responses': answers}

    temp_file_path = os.path.join("/tmp", f"{shortuuid.uuid()}.json")
    with open(temp_file_path, 'w') as f:
        f.write(json.dumps(response_all, indent=4))
    # NOTE(review): the file is only written locally; nothing here uploads it
    # via hf_api / HF_REPO_ID / HF_REPO_PATH -- confirm whether that is intended.
    return temp_file_path


@app.route("/", methods=['GET', 'POST'])
def splash():
    """Show the splash page; on POST, validate the username and move on."""
    if request.method == 'POST':
        username = request.form.get('username')
        if not username:
            logger.warning("Username not provided by the user.")
            return render_template('splash.html', error="Please enter a username.")
        return redirect(url_for('instructions', username=username))

    # GET request - show intro page
    logger.info("Splash page rendered.")
    return render_template('splash.html')


@app.route('/instructions', methods=['GET', 'POST'])
def instructions():
    """Show the instructions; on POST, draw the questions and start the quiz.

    Questions are drawn only on POST so that merely viewing the page does not
    consume questions from the shared pool.
    """
    username = request.args.get('username')

    if request.method == 'POST':
        # User clicked the "Begin Quiz" button
        questions = get_questions(num=2)
        if not questions:
            return render_template('thanks.html')

        session['questions'] = questions
        session['answers'] = []
        session['start_time'] = time.time()
        session['question_ids'] = [question['image_id'] for question in questions]
        return redirect(url_for('prep', username=username, current_index=0))

    if not _has_available_questions():
        return render_template('thanks.html')

    # If GET, render the final instructions page
    return render_template('instructions.html', username=username)


@app.route('/prep', methods=['GET', 'POST'])
def prep():
    """Show the "get ready" page for the current question."""
    state = _quiz_state()
    if state is None:
        return redirect(url_for('splash'))
    username, current_index, questions = state

    if request.method == 'POST':
        # User clicked "Start" button
        # Redirect to the actual question display
        return redirect(url_for('prompt', username=username, current_index=current_index))

    return render_template('prep.html',
                           question_number=current_index + 1,
                           total=len(questions))


@app.route('/prompt', methods=['GET', 'POST'])
def prompt():
    """Show the question text before the image is revealed."""
    state = _quiz_state()
    if state is None:
        return redirect(url_for('splash'))
    username, current_index, questions = state

    if request.method == 'POST':
        # User clicked "Start" button
        # Redirect to the image display
        return redirect(url_for('image', username=username, current_index=current_index))

    return render_template('prompt.html',
                           question_text=_display_question(questions[current_index]))


@app.route('/image', methods=['GET', 'POST'])
def image():
    """Show the image belonging to the current question."""
    state = _quiz_state()
    if state is None:
        return redirect(url_for('splash'))
    username, current_index, questions = state

    if request.method == 'POST':
        # Move on to the "respond" route to collect the answer
        return redirect(url_for('respond', username=username, current_index=current_index))

    # NOTE(review): the fallback already ends in ".png", so it renders as
    # "default_image.png.jpg" -- confirm that this is the intended file name.
    image_id = str(questions[current_index].get('image_id', 'default_image.png'))

    # If GET, display the current image (the template handles the countdown)
    return render_template('image.html', image_name=image_id + '.jpg')


@app.route('/respond', methods=['GET', 'POST'])
def respond():
    """Show the answer form; on POST, record the answer and advance.

    After the last question the collected answers are written to disk and the
    thanks page is rendered.
    """
    state = _quiz_state()
    if state is None:
        return redirect(url_for('splash'))
    username, current_index, questions = state
    current = questions[current_index]

    if request.method == 'POST':
        # Submit the answer
        answers = session.get('answers', [])
        answers.append({"image_id": current['image_id'],
                        "answer": request.form.get('response')})
        session['answers'] = answers

        next_index = current_index + 1
        if next_index < len(questions):
            return redirect(url_for('prep', username=username, current_index=next_index))

        # Last question answered: store the elapsed time and all responses
        _save_responses(username, answers)
        return render_template('thanks.html')

    form = _FORM_HINTS.get(current.get('task', ''), _DEFAULT_FORM_HINT)
    return render_template('respond.html',
                           question_text=_display_question(current),
                           instruction=form)


if __name__ == "__main__":
    # Run Flask app
    app.run(host="0.0.0.0", port=7860, debug=False)