jayaprakashgedela's picture
Update app.py
1cceeed verified
import os
import logging
import warnings
from datetime import datetime
from urllib.parse import urlparse
import pytz
from dotenv import load_dotenv
from simple_salesforce import Salesforce, SalesforceMalformedRequest
import gradio as gr
# -----------------------------
# Setup
# -----------------------------
warnings.filterwarnings("ignore", category=UserWarning)
logging.basicConfig(level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s")
logger = logging.getLogger(__name__)
load_dotenv()
SF_USERNAME = os.getenv("SF_USERNAME")
SF_PASSWORD = os.getenv("SF_PASSWORD")
SF_SECURITY_TOKEN = os.getenv("SF_SECURITY_TOKEN")
SF_DOMAIN_RAW = os.getenv("SF_DOMAIN", "login")
PROXY_HTTP = os.getenv("HTTP_PROXY")
PROXY_HTTPS = os.getenv("HTTPS_PROXY")
PROXIES = {"http": PROXY_HTTP, "https": PROXY_HTTPS} if (PROXY_HTTP or PROXY_HTTPS) else None
def normalize_sf_domain(raw: str) -> str:
"""
Normalize env SF_DOMAIN into a value acceptable by simple_salesforce.Salesforce(domain=...).
- Accepts 'login' or 'test'
- Accepts full URLs or hosts and extracts the correct domain component
- Maps lightning host to my.salesforce.com
- Fixes 'saleforce.com' typo
- If a host ends with '.my.salesforce.com', returns ONLY the custom subdomain before it,
because simple_salesforce will append '.salesforce.com' internally.
"""
if not raw:
return "login"
raw = raw.strip()
if raw in ("login", "test"):
return raw
parsed = urlparse(raw)
host = (parsed.netloc or parsed.path).strip().lower()
host = host.replace("saleforce.com", "salesforce.com")
host = host.replace(".lightning.force.com", ".my.salesforce.com")
host = host.replace("https://", "").replace("http://", "")
if host.endswith(".my.salesforce.com"):
return host[: -len(".my.salesforce.com")]
if host.endswith(".my.salesforce.com/"):
return host[: -len(".my.salesforce.com/")]
if host.endswith(".salesforce.com"):
prefix = host[: -len(".salesforce.com")]
if any(prefix.startswith(pfx) for pfx in ("na", "ap", "eu", "ca", "um", "cs")):
return "login"
return prefix
return host
SF_DOMAIN = normalize_sf_domain(SF_DOMAIN_RAW)
logger.info(f"Using Salesforce domain: {SF_DOMAIN}")
# -----------------------------
# Connect (on import)
# -----------------------------
sf = None
def connect_sf():
global sf
try:
sf = Salesforce(
username=SF_USERNAME,
password=SF_PASSWORD,
security_token=SF_SECURITY_TOKEN,
domain=SF_DOMAIN,
proxies=PROXIES,
)
logger.info("Connected to Salesforce successfully.")
return True, "Connected to Salesforce successfully."
except Exception as e:
logger.error(f"Failed to connect to Salesforce: {e}")
sf = None
return False, f"Failed to connect to Salesforce: {e}"
connect_sf()
# -----------------------------
# Thresholds
# -----------------------------
DEFAULT_THRESHOLDS = {
"pH": (6.0, 9.0),
"Turbidity": (0.0, 5.0),
"DissolvedOxygen": (5.0, None),
"Conductivity": (0.0, 1000.0),
"Temperature": (None, None),
}
def fetch_thresholds_from_sf():
if not sf:
return DEFAULT_THRESHOLDS.copy()
try:
res = sf.query("SELECT Id, Parameter__c, MinValue__c, MaxValue__c FROM QualityThreshold__c")
thresholds = DEFAULT_THRESHOLDS.copy()
for row in res.get("records", []):
param = row.get("Parameter__c")
mn = row.get("MinValue__c")
mx = row.get("MaxValue__c")
if param:
thresholds[param] = (mn, mx)
logger.info("Loaded thresholds from Salesforce.")
return thresholds
except Exception as e:
logger.warning(f"Could not load thresholds from Salesforce, using defaults. Error: {e}")
return DEFAULT_THRESHOLDS.copy()
THRESHOLDS = fetch_thresholds_from_sf()
# -----------------------------
# Quality & anomaly logic
# -----------------------------
LABELS = ["Excellent", "Good", "Moderate", "Poor", "Very Poor"]
def decide_quality(pH, turbidity, temperature, dissolved_oxygen, conductivity):
score = 0
if 6.5 <= pH <= 8.5: score += 2
elif 6.0 <= pH <= 9.0: score += 1
else: score -= 1
if turbidity <= 1: score += 2
elif turbidity <= 5: score += 1
else: score -= 1
if dissolved_oxygen >= 7: score += 2
elif dissolved_oxygen >= 5: score += 1
else: score -= 1
if conductivity <= 500: score += 2
elif conductivity <= 1000: score += 1
else: score -= 1
if 20 <= temperature <= 28: score += 1
if score >= 6: return LABELS[0]
if score >= 3: return LABELS[1]
if score >= 1: return LABELS[2]
if score >= -1: return LABELS[3]
return LABELS[4]
def check_anomalies(values, thresholds):
reasons = []
def _check(name, value):
mn, mx = thresholds.get(name, (None, None))
if mn is not None and value < mn:
reasons.append(f"{name} below min ({value} < {mn})")
if mx is not None and value > mx:
reasons.append(f"{name} above max ({value} > {mx})")
_check("pH", values["pH"])
_check("Turbidity", values["Turbidity"])
_check("DissolvedOxygen", values["DissolvedOxygen"])
_check("Conductivity", values["Conductivity"])
if "Temperature" in thresholds:
_check("Temperature", values["Temperature"])
return reasons
# -----------------------------
# Helpers
# -----------------------------
def to_salesforce_datetime(dt_utc: datetime) -> str:
"""Return strict ISO8601 UTC format Salesforce expects, e.g. 2025-08-22T14:30:12Z"""
return dt_utc.astimezone(pytz.UTC).strftime("%Y-%m-%dT%H:%M:%SZ")
def _extract_salesforce_error(e) -> str:
"""Human-friendly error text from simple_salesforce exceptions."""
try:
details = getattr(e, "content", None)
if isinstance(details, list) and details:
msgs = []
for item in details:
m = item.get("message")
flds = item.get("fields")
code = item.get("errorCode")
if flds:
msgs.append(f"{code}: {m} (fields: {', '.join(flds)})")
else:
msgs.append(f"{code}: {m}")
return " | ".join(msgs)
return str(e)
except Exception:
return str(e)
# -----------------------------
# Persistence
# -----------------------------
def create_reading_in_sf(values, sensor_id, latitude, longitude, timestamp_utc):
"""
Create WaterQualityReading__c and return Id, or None.
Two-step: insert base fields, then update geolocation if provided.
NOTE: Name is Auto Number per your org setup, so we do NOT send Name.
"""
if not sf:
return None
base_data = {
"pH__c": values["pH"],
"Turbidity__c": values["Turbidity"],
"Temperature__c": values["Temperature"],
"DissolvedOxygen__c": values["DissolvedOxygen"],
"Conductivity__c": values["Conductivity"],
"SensorId__c": sensor_id,
"Timestamp__c": to_salesforce_datetime(timestamp_utc),
}
try:
res = sf.WaterQualityReading__c.create(base_data)
rec_id = res.get("id")
if rec_id and (latitude is not None and longitude is not None):
try:
sf.WaterQualityReading__c.update(
rec_id,
{"Location__Latitude__s": latitude, "Location__Longitude__s": longitude},
)
except Exception as e2:
logger.warning(f"Created reading but failed to set geolocation: {e2}")
return rec_id
except SalesforceMalformedRequest as e:
msg = _extract_salesforce_error(e)
logger.error(f"Salesforce malformed request (Reading): {msg}")
except Exception as e:
logger.error(f"Failed to create WaterQualityReading__c: {e}")
return None
def create_analysis_in_sf(reading_id, label, anomaly_reasons):
if not sf or not reading_id:
return None
details = "; ".join(anomaly_reasons) if anomaly_reasons else "None"
data = {
"WaterQualityReading__c": reading_id,
"QualityLevel__c": label,
"AnomalyDetected__c": bool(anomaly_reasons),
"AnomalyDetails__c": details,
}
try:
res = sf.WaterQualityAnalysis__c.create(data)
return res.get("id")
except SalesforceMalformedRequest as e:
msg = _extract_salesforce_error(e)
logger.error(f"Salesforce malformed request (Analysis): {msg}")
except Exception as e:
logger.error(f"Failed to create WaterQualityAnalysis__c: {e}")
return None
# -----------------------------
# Gradio app
# -----------------------------
def predict_and_save(pH, turbidity, temperature, dissolved_oxygen, conductivity,
sensor_id, latitude, longitude):
# Validate numeric inputs
try:
values = {
"pH": float(pH),
"Turbidity": float(turbidity),
"Temperature": float(temperature),
"DissolvedOxygen": float(dissolved_oxygen),
"Conductivity": float(conductivity),
}
except Exception:
return "Invalid numeric inputs.", None
lat = float(latitude) if latitude is not None else None
lon = float(longitude) if longitude is not None else None
now_utc = datetime.now(pytz.UTC)
label = decide_quality(values["pH"], values["Turbidity"], values["Temperature"],
values["DissolvedOxygen"], values["Conductivity"])
reasons = check_anomalies(values, THRESHOLDS)
anomaly_status = "Anomaly Detected" if reasons else "No Anomaly"
reading_id = create_reading_in_sf(values, sensor_id, lat, lon, now_utc)
analysis_id = create_analysis_in_sf(reading_id, label, reasons)
lines = [
f"Quality Level: {label}",
f"Anomaly: {anomaly_status}",
]
if reasons:
lines.append("Reasons: " + "; ".join(reasons))
if reading_id:
lines.append(f"Saved: WaterQualityReading__c Id = {reading_id}")
else:
lines.append("⚠️ Could not save WaterQualityReading__c. Check server logs for the exact Salesforce error.")
if analysis_id:
lines.append(f"Saved: WaterQualityAnalysis__c Id = {analysis_id}")
elif reading_id:
lines.append("⚠️ Reading saved but Analysis insert failed. See logs.")
return "\n".join(lines), {
"quality_level": label,
"anomaly": bool(reasons),
"reasons": reasons,
"reading_id": reading_id,
"analysis_id": analysis_id,
}
with gr.Blocks(title="💧 AI Water Quality Checker") as iface:
gr.Markdown("# 💧 AI Water Quality Checker with Salesforce Integration")
with gr.Row():
pH = gr.Number(label="pH", value=7.2)
turbidity = gr.Number(label="Turbidity (NTU)", value=1.5)
temperature = gr.Number(label="Temperature (°C)", value=25.0)
dissolved_oxygen = gr.Number(label="Dissolved Oxygen (mg/L)", value=8.0)
conductivity = gr.Number(label="Conductivity (µS/cm)", value=350.0)
with gr.Row():
sensor_id = gr.Textbox(label="Sensor ID", value="SENSOR001")
latitude = gr.Number(label="Latitude", value=17.3850)
longitude = gr.Number(label="Longitude", value=78.4867)
btn = gr.Button("Analyze & Save to Salesforce")
out_text = gr.Textbox(label="Result", lines=8)
out_json = gr.JSON(label="Debug / IDs")
btn.click(
fn=predict_and_save,
inputs=[pH, turbidity, temperature, dissolved_oxygen, conductivity, sensor_id, latitude, longitude],
outputs=[out_text, out_json],
)
if __name__ == "__main__":
iface.launch()