|
|
import os |
|
|
import logging |
|
|
import warnings |
|
|
from datetime import datetime |
|
|
from urllib.parse import urlparse |
|
|
|
|
|
import pytz |
|
|
from dotenv import load_dotenv |
|
|
from simple_salesforce import Salesforce, SalesforceMalformedRequest |
|
|
import gradio as gr |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
# Hide third-party UserWarnings so the console log stays readable.
warnings.filterwarnings("ignore", category=UserWarning)

logging.basicConfig(level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s")

logger = logging.getLogger(__name__)

# Load variables from a local .env file into the environment (no-op if absent).
load_dotenv()

# Salesforce credentials — all read from the environment, never hard-coded.
SF_USERNAME = os.getenv("SF_USERNAME")

SF_PASSWORD = os.getenv("SF_PASSWORD")

SF_SECURITY_TOKEN = os.getenv("SF_SECURITY_TOKEN")

# Raw domain value; normalized by normalize_sf_domain() before use.
# Defaults to the production 'login' endpoint.
SF_DOMAIN_RAW = os.getenv("SF_DOMAIN", "login")

# Optional HTTP(S) proxy support. PROXIES stays None when neither variable is
# set, so the Salesforce client makes a direct connection.
PROXY_HTTP = os.getenv("HTTP_PROXY")

PROXY_HTTPS = os.getenv("HTTPS_PROXY")

PROXIES = {"http": PROXY_HTTP, "https": PROXY_HTTPS} if (PROXY_HTTP or PROXY_HTTPS) else None
|
|
|
|
|
def normalize_sf_domain(raw: str) -> str:
    """
    Normalize env SF_DOMAIN into a value acceptable by simple_salesforce.Salesforce(domain=...).

    - Accepts 'login' or 'test' verbatim.
    - Accepts full URLs or bare hosts and extracts the host component, dropping
      any scheme, path, trailing slash, or port.
    - Maps a lightning host ('*.lightning.force.com') to its my.salesforce.com form.
    - Fixes the common 'saleforce.com' typo.
    - If the host ends with '.my.salesforce.com', returns ONLY the custom
      subdomain before it, because simple_salesforce appends '.salesforce.com'
      internally.
    - Instance hosts (na*/ap*/eu*/ca*/um*/cs* '.salesforce.com') collapse to 'login'.
    """
    if not raw:
        return "login"
    raw = raw.strip()
    if raw in ("login", "test"):
        return raw

    parsed = urlparse(raw)
    # urlparse only fills netloc when a scheme is present; bare hosts land in path.
    host = (parsed.netloc or parsed.path).strip().lower()

    # Defensive cleanup for malformed values urlparse did not split: leftover
    # scheme text first, then anything after the host (path, trailing slash, port).
    for scheme in ("https://", "http://"):
        if host.startswith(scheme):
            host = host[len(scheme):]
    host = host.split("/", 1)[0].split(":", 1)[0]

    host = host.replace("saleforce.com", "salesforce.com")
    host = host.replace(".lightning.force.com", ".my.salesforce.com")

    if host.endswith(".my.salesforce.com"):
        # simple_salesforce wants just the custom subdomain here.
        return host[: -len(".my.salesforce.com")]
    if host.endswith(".salesforce.com"):
        prefix = host[: -len(".salesforce.com")]
        # Pod/instance prefixes are not custom domains; use the standard login.
        if any(prefix.startswith(pfx) for pfx in ("na", "ap", "eu", "ca", "um", "cs")):
            return "login"
        return prefix
    return host
|
|
|
|
|
# Normalize once at import; simple_salesforce expects 'login', 'test', or a
# custom subdomain (it appends '.salesforce.com' itself).
SF_DOMAIN = normalize_sf_domain(SF_DOMAIN_RAW)

logger.info(f"Using Salesforce domain: {SF_DOMAIN}")
|
|
|
|
|
|
|
|
|
|
|
|
|
|
sf = None  # global Salesforce session; stays None until connect_sf() succeeds


def connect_sf():
    """Open (or re-open) the global Salesforce session from env credentials.

    Returns a (success, message) tuple. On failure the global `sf` is reset
    to None so callers can detect the disconnected state.
    """
    global sf
    try:
        sf = Salesforce(
            username=SF_USERNAME,
            password=SF_PASSWORD,
            security_token=SF_SECURITY_TOKEN,
            domain=SF_DOMAIN,
            proxies=PROXIES,
        )
    except Exception as exc:
        logger.error(f"Failed to connect to Salesforce: {exc}")
        sf = None
        return False, f"Failed to connect to Salesforce: {exc}"
    logger.info("Connected to Salesforce successfully.")
    return True, "Connected to Salesforce successfully."
|
|
|
|
|
# Eagerly connect at import time; a failure is logged and leaves `sf` as None.
connect_sf()
|
|
|
|
|
|
|
|
|
|
|
|
|
|
# Fallback acceptable ranges per parameter as (min, max); None leaves that side
# of the range unbounded. Overlaid at startup by QualityThreshold__c records
# when a Salesforce connection is available (see fetch_thresholds_from_sf).
# Units follow the UI labels: Turbidity in NTU, DissolvedOxygen in mg/L,
# Conductivity in µS/cm, Temperature in °C.
DEFAULT_THRESHOLDS = {
    "pH": (6.0, 9.0),
    "Turbidity": (0.0, 5.0),
    "DissolvedOxygen": (5.0, None),  # only a lower bound matters
    "Conductivity": (0.0, 1000.0),
    "Temperature": (None, None),  # present but unbounded -> never flagged
}
|
|
|
|
|
def fetch_thresholds_from_sf():
    """Fetch per-parameter (min, max) thresholds from QualityThreshold__c.

    Starts from DEFAULT_THRESHOLDS and overlays whatever the org defines;
    when disconnected or on any query error the defaults are returned unchanged.
    """
    if not sf:
        return DEFAULT_THRESHOLDS.copy()
    try:
        result = sf.query("SELECT Id, Parameter__c, MinValue__c, MaxValue__c FROM QualityThreshold__c")
        merged = DEFAULT_THRESHOLDS.copy()
        for record in result.get("records", []):
            name = record.get("Parameter__c")
            # Rows without a parameter name are ignored rather than keyed on None.
            if name:
                merged[name] = (record.get("MinValue__c"), record.get("MaxValue__c"))
        logger.info("Loaded thresholds from Salesforce.")
        return merged
    except Exception as e:
        logger.warning(f"Could not load thresholds from Salesforce, using defaults. Error: {e}")
        return DEFAULT_THRESHOLDS.copy()
|
|
|
|
|
# Snapshot thresholds once at startup; re-run fetch_thresholds_from_sf() to refresh.
THRESHOLDS = fetch_thresholds_from_sf()
|
|
|
|
|
|
|
|
|
|
|
|
|
|
LABELS = ["Excellent", "Good", "Moderate", "Poor", "Very Poor"]


def decide_quality(pH, turbidity, temperature, dissolved_oxygen, conductivity):
    """Score each parameter and map the additive total to a label in LABELS.

    Each of pH, turbidity, dissolved oxygen, and conductivity contributes
    +2 in its ideal band, +1 in its acceptable band, and -1 otherwise;
    temperature adds a +1 bonus in the 20-28 range.
    """

    def _band(value, ideal, acceptable):
        # +2 ideal, +1 acceptable, -1 out of range.
        if ideal(value):
            return 2
        if acceptable(value):
            return 1
        return -1

    score = _band(pH, lambda v: 6.5 <= v <= 8.5, lambda v: 6.0 <= v <= 9.0)
    score += _band(turbidity, lambda v: v <= 1, lambda v: v <= 5)
    score += _band(dissolved_oxygen, lambda v: v >= 7, lambda v: v >= 5)
    score += _band(conductivity, lambda v: v <= 500, lambda v: v <= 1000)
    if 20 <= temperature <= 28:
        score += 1

    # Walk the cutoffs from best to worst; the first one met wins.
    for cutoff, label in ((6, LABELS[0]), (3, LABELS[1]), (1, LABELS[2]), (-1, LABELS[3])):
        if score >= cutoff:
            return label
    return LABELS[4]
|
|
|
|
|
def check_anomalies(values, thresholds):
    """Compare each reading in `values` against its (min, max) threshold pair.

    Returns a list of human-readable violation strings (empty when clean).
    """
    reasons = []

    def _flag(name, reading):
        low, high = thresholds.get(name, (None, None))
        if low is not None and reading < low:
            reasons.append(f"{name} below min ({reading} < {low})")
        if high is not None and reading > high:
            reasons.append(f"{name} above max ({reading} > {high})")

    for key in ("pH", "Turbidity", "DissolvedOxygen", "Conductivity"):
        _flag(key, values[key])
    # Temperature is only checked when a threshold entry exists for it.
    if "Temperature" in thresholds:
        _flag("Temperature", values["Temperature"])
    return reasons
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def to_salesforce_datetime(dt_utc: datetime) -> str:
    """Return the strict ISO8601 UTC string Salesforce expects, e.g. 2025-08-22T14:30:12Z.

    Accepts any timezone-aware datetime and converts it to UTC first.
    Uses the stdlib `datetime.timezone.utc` — pytz adds nothing for a plain
    UTC conversion, so this function no longer needs it.
    """
    from datetime import timezone  # stdlib; keeps this helper pytz-free
    return dt_utc.astimezone(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ")
|
|
|
|
|
def _extract_salesforce_error(e) -> str: |
|
|
"""Human-friendly error text from simple_salesforce exceptions.""" |
|
|
try: |
|
|
details = getattr(e, "content", None) |
|
|
if isinstance(details, list) and details: |
|
|
msgs = [] |
|
|
for item in details: |
|
|
m = item.get("message") |
|
|
flds = item.get("fields") |
|
|
code = item.get("errorCode") |
|
|
if flds: |
|
|
msgs.append(f"{code}: {m} (fields: {', '.join(flds)})") |
|
|
else: |
|
|
msgs.append(f"{code}: {m}") |
|
|
return " | ".join(msgs) |
|
|
return str(e) |
|
|
except Exception: |
|
|
return str(e) |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def create_reading_in_sf(values, sensor_id, latitude, longitude, timestamp_utc):
    """
    Create a WaterQualityReading__c record and return its Id, or None on failure.

    Two-step write: insert the base fields first, then update the geolocation
    compound field if latitude/longitude were provided. The geolocation update
    is best-effort — if it fails, the reading still exists and its Id is returned.
    NOTE: Name is Auto Number per this org's setup, so we do NOT send Name.
    """
    # No live session -> nothing to save.
    if not sf:
        return None

    base_data = {
        "pH__c": values["pH"],
        "Turbidity__c": values["Turbidity"],
        "Temperature__c": values["Temperature"],
        "DissolvedOxygen__c": values["DissolvedOxygen"],
        "Conductivity__c": values["Conductivity"],
        "SensorId__c": sensor_id,
        # Salesforce datetime fields need the strict ISO8601 UTC format.
        "Timestamp__c": to_salesforce_datetime(timestamp_utc),
    }

    try:
        res = sf.WaterQualityReading__c.create(base_data)
        rec_id = res.get("id")
        if rec_id and (latitude is not None and longitude is not None):
            try:
                # Geolocation compound field: components are written individually.
                sf.WaterQualityReading__c.update(
                    rec_id,
                    {"Location__Latitude__s": latitude, "Location__Longitude__s": longitude},
                )
            except Exception as e2:
                # Keep the reading; just note that coordinates were not set.
                logger.warning(f"Created reading but failed to set geolocation: {e2}")
        return rec_id
    except SalesforceMalformedRequest as e:
        # Field-level errors carry structured details worth surfacing in the log.
        msg = _extract_salesforce_error(e)
        logger.error(f"Salesforce malformed request (Reading): {msg}")
    except Exception as e:
        logger.error(f"Failed to create WaterQualityReading__c: {e}")
    # Any insert failure ends here: logged above, caller receives None.
    return None
|
|
|
|
|
def create_analysis_in_sf(reading_id, label, anomaly_reasons):
    """Insert a WaterQualityAnalysis__c linked to an existing reading.

    Returns the new record Id, or None when disconnected, when no reading
    was saved, or when the insert fails (failures are logged).
    """
    if not sf or not reading_id:
        return None
    payload = {
        "WaterQualityReading__c": reading_id,
        "QualityLevel__c": label,
        "AnomalyDetected__c": bool(anomaly_reasons),
        "AnomalyDetails__c": "; ".join(anomaly_reasons) if anomaly_reasons else "None",
    }
    try:
        created = sf.WaterQualityAnalysis__c.create(payload)
    except SalesforceMalformedRequest as e:
        logger.error(f"Salesforce malformed request (Analysis): {_extract_salesforce_error(e)}")
    except Exception as e:
        logger.error(f"Failed to create WaterQualityAnalysis__c: {e}")
    else:
        return created.get("id")
    return None
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def predict_and_save(pH, turbidity, temperature, dissolved_oxygen, conductivity,
                     sensor_id, latitude, longitude):
    """Gradio click handler: score the reading, check thresholds, save to Salesforce.

    Returns a pair matching the two output widgets:
      - a multi-line human-readable summary string
      - a debug dict with the label, anomaly flag/reasons, and any record Ids
    """
    # Coerce every sensor input to float up front; a bad value aborts before
    # any Salesforce writes happen.
    try:
        values = {
            "pH": float(pH),
            "Turbidity": float(turbidity),
            "Temperature": float(temperature),
            "DissolvedOxygen": float(dissolved_oxygen),
            "Conductivity": float(conductivity),
        }
    except Exception:
        return "Invalid numeric inputs.", None

    # Geolocation is optional; None passes through untouched.
    lat = float(latitude) if latitude is not None else None
    lon = float(longitude) if longitude is not None else None
    now_utc = datetime.now(pytz.UTC)

    label = decide_quality(values["pH"], values["Turbidity"], values["Temperature"],
                           values["DissolvedOxygen"], values["Conductivity"])

    reasons = check_anomalies(values, THRESHOLDS)
    anomaly_status = "Anomaly Detected" if reasons else "No Anomaly"

    # Persistence is best-effort: a None reading_id short-circuits the analysis
    # insert (create_analysis_in_sf returns None without writing), and the UI
    # text below reports exactly what was saved.
    reading_id = create_reading_in_sf(values, sensor_id, lat, lon, now_utc)
    analysis_id = create_analysis_in_sf(reading_id, label, reasons)

    lines = [
        f"Quality Level: {label}",
        f"Anomaly: {anomaly_status}",
    ]
    if reasons:
        lines.append("Reasons: " + "; ".join(reasons))
    if reading_id:
        lines.append(f"Saved: WaterQualityReading__c Id = {reading_id}")
    else:
        lines.append("⚠️ Could not save WaterQualityReading__c. Check server logs for the exact Salesforce error.")
    if analysis_id:
        lines.append(f"Saved: WaterQualityAnalysis__c Id = {analysis_id}")
    elif reading_id:
        lines.append("⚠️ Reading saved but Analysis insert failed. See logs.")

    return "\n".join(lines), {
        "quality_level": label,
        "anomaly": bool(reasons),
        "reasons": reasons,
        "reading_id": reading_id,
        "analysis_id": analysis_id,
    }
|
|
|
|
|
# ---------- Gradio UI ----------
with gr.Blocks(title="💧 AI Water Quality Checker") as iface:
    gr.Markdown("# 💧 AI Water Quality Checker with Salesforce Integration")

    # Sensor measurement inputs; defaults are plausible clean-water values.
    with gr.Row():
        pH = gr.Number(label="pH", value=7.2)
        turbidity = gr.Number(label="Turbidity (NTU)", value=1.5)
        temperature = gr.Number(label="Temperature (°C)", value=25.0)
        dissolved_oxygen = gr.Number(label="Dissolved Oxygen (mg/L)", value=8.0)
        conductivity = gr.Number(label="Conductivity (µS/cm)", value=350.0)

    # Sensor identity and optional geolocation.
    with gr.Row():
        sensor_id = gr.Textbox(label="Sensor ID", value="SENSOR001")
        latitude = gr.Number(label="Latitude", value=17.3850)
        longitude = gr.Number(label="Longitude", value=78.4867)

    btn = gr.Button("Analyze & Save to Salesforce")
    out_text = gr.Textbox(label="Result", lines=8)
    out_json = gr.JSON(label="Debug / IDs")

    # Wire the button to the classify-and-persist pipeline defined above.
    btn.click(
        fn=predict_and_save,
        inputs=[pH, turbidity, temperature, dissolved_oxygen, conductivity, sensor_id, latitude, longitude],
        outputs=[out_text, out_json],
    )


if __name__ == "__main__":
    iface.launch()
|
|
|