File size: 16,338 Bytes
9b3d9a0 95bbd6d 4bcc686 78f9096 4bcc686 6524787 78f9096 ef144b9 4bcc686 78f9096 6524787 ef144b9 78f9096 ef144b9 78f9096 4bcc686 e521f8a 6524787 7a5eb22 e521f8a 960fabc 7a5eb22 e521f8a 95bbd6d ef144b9 4bcc686 ef144b9 95bbd6d 36baf26 95bbd6d 7a5eb22 71845c1 7a5eb22 4bcc686 7a5eb22 95bbd6d 4bcc686 95bbd6d 4bcc686 95bbd6d 4bcc686 7a5eb22 36baf26 95bbd6d 36baf26 7a5eb22 95bbd6d 4bcc686 95bbd6d 7a5eb22 95bbd6d 95cc7e5 4bcc686 e521f8a 4bcc686 e521f8a 4bcc686 e521f8a 4bcc686 7a5eb22 4bcc686 95bbd6d 4bcc686 e521f8a 95bbd6d ef144b9 4bcc686 ef144b9 4bcc686 ef144b9 56c4a12 ef144b9 2b1b1b6 ef144b9 4bcc686 2b1b1b6 4bcc686 56c4a12 4bcc686 a2fac34 4bcc686 a2fac34 4bcc686 ef144b9 4bcc686 ef144b9 c921174 ef144b9 56c4a12 ef144b9 4bcc686 a2fac34 c921174 a2fac34 c921174 a2fac34 36baf26 4bcc686 ef144b9 36baf26 ef144b9 36baf26 a2fac34 4bcc686 56c4a12 a2fac34 56c4a12 a2fac34 56c4a12 a2fac34 56c4a12 ef144b9 36baf26 ef144b9 56c4a12 2b1b1b6 56c4a12 2b1b1b6 56c4a12 a2fac34 c921174 a2fac34 56c4a12 ef144b9 2b1b1b6 ef144b9 a2fac34 56c4a12 ef144b9 4bcc686 ef144b9 4bcc686 ef144b9 4bcc686 ef144b9 4bcc686 ef144b9 56c4a12 ef144b9 95bbd6d ef144b9 95bbd6d 7a5eb22 95bbd6d 36baf26 ef144b9 95bbd6d 7a5eb22 a2fac34 f32805f a2fac34 c921174 a2fac34 95bbd6d c921174 95bbd6d 4bcc686 2b1b1b6 95bbd6d 56c4a12 a2fac34 56c4a12 95bbd6d 7a5eb22 95bbd6d c921174 56c4a12 95bbd6d 95cc7e5 95bbd6d |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 375 376 377 378 379 380 381 382 383 384 385 386 387 388 389 390 391 392 393 394 395 396 397 398 399 400 401 402 403 404 405 406 407 408 409 410 411 412 413 414 415 416 417 |
import os
import sys
import traceback
from pathlib import Path
from typing import List, Tuple, Any
import duckdb
import pandas as pd
import numpy as np
import matplotlib
matplotlib.use("Agg") # headless for Spaces
import matplotlib.pyplot as plt
import gradio as gr
# =========================
# Basic configuration
# =========================
# Used as the gr.Blocks title and as the page heading.
APP_TITLE = "ALCO Liquidity & Interest-Rate Risk Dashboard"

# Fully-qualified relation names.
TABLE_FQN = "my_db.main.masterdataset_v"  # source table
VIEW_FQN = "my_db.main.positions_v"  # normalized view created by this app

# Product codes bucketed as 'Assets'; matched against lower(product) in the view.
PRODUCT_ASSETS = [
    "loan",
    "overdraft",
    "advances",
    "bills",
    "bill",
    "tbond",
    "t-bond",
    "tbill",
    "t-bill",
    "repo_asset",
    "assets",
]

# Product codes bucketed as 'SoF' (sources of funds); matched the same way.
PRODUCT_SOF = [
    "fd",
    "term_deposit",
    "td",
    "savings",
    "current",
    "call",
    "repo_liab",
]
# =========================
# Helpers
# =========================
def connect_md() -> duckdb.DuckDBPyConnection:
    """Open a MotherDuck connection using the MOTHERDUCK_TOKEN environment variable.

    Returns:
        The connection returned by ``duckdb.connect`` for the ``md:`` endpoint.

    Raises:
        RuntimeError: If MOTHERDUCK_TOKEN is unset, empty, or only whitespace.
    """
    # Strip so a secret pasted with stray spaces/newlines is either usable or
    # rejected here, instead of being sent to the server as a malformed token.
    token = os.environ.get("MOTHERDUCK_TOKEN", "").strip()
    if not token:
        raise RuntimeError("MOTHERDUCK_TOKEN is not set. Add it in Space → Settings → Secrets.")
    # The token is embedded in the connection string; never log this value.
    return duckdb.connect(f"md:?motherduck_token={token}")
def discover_columns(conn: duckdb.DuckDBPyConnection, table_fqn: str) -> List[str]:
    """Return the column names of ``table_fqn`` in lower case.

    ``DESCRIBE`` is tried first. If anything in that path raises, the names
    are read from ``information_schema.columns`` instead, splitting
    ``table_fqn`` on dots into catalog, schema and table.

    Args:
        conn: Open connection used to run the metadata queries.
        table_fqn: Three-part ``catalog.schema.table`` name.

    Returns:
        Lower-cased column names in the order the metadata query yields them.
    """
    try:
        described = conn.execute(f"DESCRIBE {table_fqn};").fetchdf()
        # Use the "column_name" column when present, else whatever comes first.
        if "column_name" in described.columns:
            name_col = "column_name"
        else:
            name_col = described.columns[0]
        raw_names = described[name_col].tolist()
        return [str(raw).lower() for raw in raw_names]
    except Exception:
        fallback_sql = f"""
SELECT lower(column_name) AS col
FROM information_schema.columns
WHERE table_catalog = split_part('{table_fqn}', '.', 1)
AND table_schema = split_part('{table_fqn}', '.', 2)
AND table_name = split_part('{table_fqn}', '.', 3)
"""
        schema_df = conn.execute(fallback_sql).fetchdf()
        return schema_df["col"].tolist()
def build_view_sql(existing_cols: List[str]) -> str:
    """Build the ``CREATE OR REPLACE VIEW`` statement for the normalized view.

    Each wanted column is selected as-is when its lower-cased name appears in
    ``existing_cols``; otherwise a typed NULL placeholder is projected under
    the same name. A derived ``bucket`` column classifies ``lower(product)``
    as 'SoF', 'Assets' or 'Unknown' using PRODUCT_SOF and PRODUCT_ASSETS.

    Args:
        existing_cols: Column names present in the source table. The lookup
            lower-cases each wanted name, so these are expected in lower case.

    Returns:
        The SQL text that (re)creates VIEW_FQN on top of TABLE_FQN.
    """
    # Wanted column -> SQL type of the NULL placeholder used when it is absent.
    placeholder_types = {
        "as_of_date": "VARCHAR",
        "product": "VARCHAR",
        "months": "DOUBLE",
        "segments": "VARCHAR",
        "currency": "VARCHAR",
        "Portfolio_value": "DOUBLE",
        "Interest_rate": "DOUBLE",
        "days_to_maturity": "DOUBLE",
    }
    projections = [
        name if name.lower() in existing_cols else f"CAST(NULL AS {sql_type}) AS {name}"
        for name, sql_type in placeholder_types.items()
    ]

    sof_list = ", ".join(f"'{code}'" for code in PRODUCT_SOF)
    asset_list = ", ".join(f"'{code}'" for code in PRODUCT_ASSETS)
    # SoF is tested first, so a code present in both lists is bucketed as SoF.
    bucket_case = (
        "CASE "
        + f"WHEN lower(product) IN ({sof_list}) THEN 'SoF' "
        + f"WHEN lower(product) IN ({asset_list}) THEN 'Assets' "
        + "ELSE 'Unknown' END AS bucket"
    )

    select_sql = ",\n ".join([*projections, bucket_case])
    return f"""
CREATE OR REPLACE VIEW {VIEW_FQN} AS
SELECT
{select_sql}
FROM {TABLE_FQN};
"""
def ensure_view(conn: duckdb.DuckDBPyConnection, cols: List[str]) -> None:
    """Validate the source columns, then create or replace the normalized view.

    Args:
        conn: Open connection on which the view DDL is executed.
        cols: Lower-cased column names of the source table.

    Raises:
        RuntimeError: If ``product``, ``portfolio_value`` or
            ``days_to_maturity`` is missing from ``cols``.
    """
    required = {"product", "portfolio_value", "days_to_maturity"}
    missing = required - set(cols)
    if missing:
        raise RuntimeError(
            f"Source table {TABLE_FQN} must contain columns {sorted(required)}; found {sorted(cols)}"
        )
    view_ddl = build_view_sql(cols)
    conn.execute(view_ddl)
def safe_num(x) -> float:
    """Coerce ``x`` to a float, mapping missing or unparsable values to 0.0.

    Args:
        x: Any scalar-like value (number, numeric string, NumPy scalar, None).

    Returns:
        ``float(x)`` when the conversion succeeds and is not NaN; otherwise 0.0.
    """
    try:
        value = float(x)
    except (TypeError, ValueError, OverflowError):
        # None, pd.NA, non-numeric strings, oversized ints, multi-element arrays.
        return 0.0
    # Test NaN after conversion so non-`float` NaNs (np.float32, "nan",
    # Decimal("NaN")) are caught as well, not only built-in float NaN.
    return 0.0 if np.isnan(value) else value
def zeros_like_index(index) -> pd.Series:
    """Return a Series of integer zeros with one entry per label in ``index``."""
    zeros = [0] * len(index)
    return pd.Series(data=zeros, index=index)
def plot_ladder(df: pd.DataFrame):
    """Draw the maturity ladder as a bar chart of Assets against SoF.

    Args:
        df: Long-format ladder with ``time_bucket``, ``bucket`` and
            ``Amount (LKR Mn)`` columns. ``None`` or an empty frame is allowed.

    Returns:
        A matplotlib figure: the chart itself, a "No data" placeholder for
        empty input, or a text-only figure carrying the error message if
        anything raises while plotting.
    """
    # NOTE(review): figures created here are never closed in this function;
    # confirm the caller (Gradio) releases them, otherwise they accumulate.
    try:
        if df is None or df.empty:
            fig, ax = plt.subplots(figsize=(7, 3))
            ax.text(0.5, 0.5, "No data", ha="center", va="center")
            ax.axis("off")
            return fig

        pivot = df.pivot(index="time_bucket", columns="bucket", values="Amount (LKR Mn)")
        order = ["T+1", "T+2..7", "T+8..30", "T+31+"]
        # Reindex first and fill afterwards: filling before the reindex left
        # NaN rows for every time bucket that is absent from the data.
        pivot = pivot.reindex(order).fillna(0)

        fig, ax = plt.subplots(figsize=(7, 4))
        assets = pivot["Assets"] if "Assets" in pivot.columns else zeros_like_index(pivot.index)
        sof = pivot["SoF"] if "SoF" in pivot.columns else zeros_like_index(pivot.index)
        ax.bar(pivot.index, assets, label="Assets")
        # SoF is negated so its bars hang below the zero line.
        ax.bar(pivot.index, -sof, label="SoF")
        ax.axhline(0, color="gray", lw=1)
        ax.set_ylabel("LKR (Mn)")
        ax.set_title("Maturity Ladder (Assets vs SoF)")
        ax.legend()
        fig.tight_layout()
        return fig
    except Exception as e:
        # Best effort: surface the failure inside the plot area instead of raising.
        fig, ax = plt.subplots(figsize=(7, 3))
        ax.text(0.01, 0.8, "Chart Error:", fontsize=12, ha="left")
        ax.text(0.01, 0.5, str(e), fontsize=10, ha="left", wrap=True)
        ax.axis("off")
        return fig
# =========================
# Query fragments
# =========================
# NOTE: run_dashboard retargets these three queries by plain text substitution
# (the "FROM <view>" clause and every "Portfolio_value" occurrence), so keep
# both spellings exactly as written when editing the SQL.

# T+1 KPIs: total Assets and total SoF with days_to_maturity <= 1, plus their
# difference (assets minus SoF). Each sum falls back to 0 when nothing matches.
KPI_SQL = f"""
SELECT
COALESCE(SUM(CASE WHEN bucket='Assets' AND days_to_maturity<=1 THEN Portfolio_value END),0) AS assets_t1,
COALESCE(SUM(CASE WHEN bucket='SoF' AND days_to_maturity<=1 THEN Portfolio_value END),0) AS sof_t1,
COALESCE(SUM(CASE WHEN bucket='Assets' AND days_to_maturity<=1 THEN Portfolio_value END),0)
- COALESCE(SUM(CASE WHEN bucket='SoF' AND days_to_maturity<=1 THEN Portfolio_value END),0) AS net_gap_t1
FROM {VIEW_FQN};
"""
# Maturity ladder: amounts in millions, grouped by time bucket and by bucket.
# Rows whose days_to_maturity is NULL match none of the WHEN branches and so
# land in the ELSE bucket 'T+31+'.
LADDER_SQL = f"""
SELECT
CASE
WHEN days_to_maturity <= 1 THEN 'T+1'
WHEN days_to_maturity BETWEEN 2 AND 7 THEN 'T+2..7'
WHEN days_to_maturity BETWEEN 8 AND 30 THEN 'T+8..30'
ELSE 'T+31+'
END AS time_bucket,
bucket,
SUM(Portfolio_value) / 1000000.0 AS "Amount (LKR Mn)"
FROM {VIEW_FQN}
GROUP BY 1,2
ORDER BY 1,2;
"""
# T+1 gap drivers: per product and bucket, amounts (in millions) with
# days_to_maturity <= 1, largest first.
GAP_DRIVERS_SQL = f"""
SELECT
product,
bucket,
SUM(Portfolio_value) / 1000000.0 AS "Amount (LKR Mn)"
FROM {VIEW_FQN}
WHERE days_to_maturity <= 1
GROUP BY 1, 2
ORDER BY 3 DESC;
"""
def irr_sql(cols: List[str]) -> str:
    """Build the per-bucket interest-rate-risk query (portfolio value and BPV).

    Time to maturity in years is ``days_to_maturity/365.0``; when ``months``
    is among ``cols`` it serves as a fallback (``months/12.0``) for rows whose
    ``days_to_maturity`` is NULL. The yield is ``Interest_rate/100.0`` when
    ``interest_rate`` is among ``cols``, otherwise the constant ``0.0``.

    Args:
        cols: Lower-cased column names of the source table.

    Returns:
        SQL text selecting, per bucket, the portfolio value in millions and
        the BPV (DV01) from VIEW_FQN.
    """
    term_parts = ["CASE WHEN days_to_maturity IS NOT NULL THEN days_to_maturity/365.0"]
    if "months" in cols:
        term_parts.append(" WHEN months IS NOT NULL THEN months/12.0")
    term_parts.append(" ELSE NULL END")
    t_expr = "".join(term_parts)

    if "interest_rate" in cols:
        y_expr = "(Interest_rate/100.0)"
    else:
        y_expr = "0.0"

    return f"""
WITH irr_calcs AS (
SELECT
bucket,
Portfolio_value AS pv,
-- Modified Duration = Macaulay Duration / (1 + yield)
-- We approximate Macaulay Duration with time-to-maturity in years (t_expr)
({t_expr}) / (1 + {y_expr}) AS mod_dur
FROM {VIEW_FQN}
)
SELECT
bucket,
SUM(pv) / 1000000.0 AS "Portfolio Value (LKR Mn)",
-- BPV (DV01) = SUM(Portfolio Value * Modified Duration * 0.0001)
SUM(pv * mod_dur * 0.0001) AS "BPV (DV01)"
FROM irr_calcs
GROUP BY bucket;
"""
# =========================
# Dashboard callback
# =========================
def _to_stressed_sql(sql: str, stressed_view_fqn: str) -> str:
    """Retarget a base query at the stressed view and its ``stressed_pv`` column."""
    return sql.replace(f"FROM {VIEW_FQN}", f"FROM {stressed_view_fqn}").replace(
        "Portfolio_value", "stressed_pv"
    )


def _format_amount_columns(df: pd.DataFrame, columns: List[str]) -> pd.DataFrame:
    """Return a copy of ``df`` with the given numeric columns rendered like '1,234.56'."""
    formatted = df.copy()
    for column in columns:
        formatted[column] = formatted[column].map('{:,.2f}'.format)
    return formatted


def run_dashboard(scenario: str, runoff_pct: float, rate_shock_bps_input: float) -> Tuple[str, str, str, str, str, Any, pd.DataFrame, pd.DataFrame, str, pd.DataFrame]:
    """Query MotherDuck and assemble every output shown on the dashboard.

    Args:
        scenario: Selected scenario label ("Baseline",
            "Liquidity Stress: High Deposit Runoff" or "IRR Stress: Rate Shock").
        runoff_pct: Deposit runoff percentage; only applied for the liquidity
            stress scenario and only when greater than zero.
        rate_shock_bps_input: Rate shock in basis points; only used for the
            IRR stress scenario and only when non-zero.

    Returns:
        status, as_of, a1_text, a2_text, a3_text, figure, ladder_df, irr_df,
        explain_text, drivers_df

        On any failure the same ten-slot tuple is returned with the error and
        traceback in ``status`` and placeholder values elsewhere; nothing is
        raised to the caller.
    """
    conn = None
    try:
        conn = connect_md()

        # 1) Discover columns & build view.
        # This must run before the stressed view is defined: the stressed view
        # selects from VIEW_FQN, which may not exist yet (or may be stale).
        cols = discover_columns(conn, TABLE_FQN)
        ensure_view(conn, cols)

        # --- Scenario Application ---
        # Create a temporary view with scenario adjustments.
        # Subsequent queries will use this stressed view.
        stressed_view_fqn = "positions_v_stressed"
        runoff_factor = 1.0
        rate_shock_bps = 0.0
        if scenario == "Liquidity Stress: High Deposit Runoff" and runoff_pct > 0:
            # float() guarantees that only a number is interpolated into the SQL.
            runoff_factor = (100.0 - float(runoff_pct)) / 100.0
        elif scenario == "IRR Stress: Rate Shock" and rate_shock_bps_input != 0:
            rate_shock_bps = rate_shock_bps_input

        scenario_sql = f"""
CREATE OR REPLACE TEMP VIEW {stressed_view_fqn} AS
SELECT *,
CASE WHEN lower(product) IN ('savings', 'fd', 'td', 'term_deposit') THEN Portfolio_value * {runoff_factor} ELSE Portfolio_value END AS stressed_pv
FROM {VIEW_FQN};
"""
        conn.execute(scenario_sql)

        # 2) As-of (optional)
        as_of = "N/A"
        if "as_of_date" in cols:
            tmp = conn.execute(f"SELECT max(as_of_date) AS d FROM {VIEW_FQN}").fetchdf()
            if not tmp.empty and not pd.isna(tmp["d"].iloc[0]):
                as_of = str(tmp["d"].iloc[0])[:10]

        # 3) KPIs (read from the stressed view / stressed value column)
        kpi = conn.execute(_to_stressed_sql(KPI_SQL, stressed_view_fqn)).fetchdf()
        assets_t1 = safe_num(kpi["assets_t1"].iloc[0]) if not kpi.empty else 0.0
        sof_t1 = safe_num(kpi["sof_t1"].iloc[0]) if not kpi.empty else 0.0
        net_gap = safe_num(kpi["net_gap_t1"].iloc[0]) if not kpi.empty else 0.0

        # 4) Ladder, IRR, and Gap Drivers
        ladder = conn.execute(_to_stressed_sql(LADDER_SQL, stressed_view_fqn)).fetchdf()
        irr = conn.execute(_to_stressed_sql(irr_sql(cols), stressed_view_fqn)).fetchdf()
        drivers = conn.execute(_to_stressed_sql(GAP_DRIVERS_SQL, stressed_view_fqn)).fetchdf()

        # Display copies are formatted as strings; the numeric frames are kept
        # for the chart and the explanation text below.
        if "Amount (LKR Mn)" in ladder.columns:
            ladder_display = _format_amount_columns(ladder, ["Amount (LKR Mn)"])
        else:
            ladder_display = pd.DataFrame()

        if not irr.empty:
            irr_display = _format_amount_columns(
                irr, ["Portfolio Value (LKR Mn)", "BPV (DV01)"]
            )
        else:
            irr_display = irr.copy()

        if "Amount (LKR Mn)" in drivers.columns:
            drivers_display = _format_amount_columns(drivers, ["Amount (LKR Mn)"])
        else:
            drivers_display = pd.DataFrame()

        # 5) Chart
        fig = plot_ladder(ladder)

        # 6) Explanations
        assets_t1_mn_str = f"{(assets_t1 / 1_000_000):,.2f}"
        sof_t1_mn_str = f"{(sof_t1 / 1_000_000):,.2f}"
        net_gap_mn_str = f"{(net_gap / 1_000_000):,.2f}"
        gap_sign_str = "positive" if net_gap >= 0 else "negative"

        a1_text = f"The amount of Assets maturing tomorrow (T+1) is **LKR {assets_t1_mn_str} Mn**."
        a2_text = f"The amount of Sources of Funds (SoF) maturing tomorrow (T+1) is **LKR {sof_t1_mn_str} Mn**."
        a3_text = f"The resulting Net Liquidity Gap for tomorrow (T+1) is **LKR {net_gap_mn_str} Mn**."

        # Build "Why" text. The drivers query orders by amount descending, so
        # the first row of each filtered frame is the largest contributor.
        sof_drivers = drivers[drivers["bucket"] == "SoF"]
        asset_drivers = drivers[drivers["bucket"] == "Assets"]
        top_sof_prod = sof_drivers.iloc[0] if not sof_drivers.empty else None
        top_asset_prod = asset_drivers.iloc[0] if not asset_drivers.empty else None

        explain_text = f"### Why is the T+1 Gap {gap_sign_str}?\n\n"
        if top_sof_prod is not None:
            explain_text += f"* **Largest Liability Maturity:** The largest outflow comes from `{top_sof_prod['product']}`, with **LKR {top_sof_prod['Amount (LKR Mn)']:,.2f} Mn** maturing.\n"
        else:
            explain_text += "* **Largest Liability Maturity:** No significant liabilities are maturing tomorrow.\n"
        if top_asset_prod is not None:
            explain_text += f"* **Largest Asset Inflow:** The largest inflow comes from `{top_asset_prod['product']}`, with **LKR {top_asset_prod['Amount (LKR Mn)']:,.2f} Mn** maturing.\n"
        else:
            explain_text += "* **Largest Asset Inflow:** No significant assets are maturing to provide inflows tomorrow.\n"

        # Note: The data source does not contain features for seasonal analysis (e.g., day_of_week, is_month_end).
        explain_text += "* **Seasonal Pattern:** Analysis not possible without relevant time-series features in the source data."

        # Add scenario explanation for IRR stress
        if scenario == "IRR Stress: Rate Shock" and rate_shock_bps != 0 and not irr.empty:
            # NOTE(review): this adds BPV across every bucket (Assets, SoF and
            # Unknown alike) and applies no sign flip for a rate rise; confirm
            # this is the intended EVE convention before relying on the figure.
            net_bpv = irr["BPV (DV01)"].sum()
            eve_impact = net_bpv * rate_shock_bps
            eve_impact_mn = eve_impact / 1_000_000
            # The '+' format flag prints the sign for both directions, so a
            # negative shock reads "-200 bps" instead of "+-200 bps".
            explain_text += f"\n\n### IRR Stress Scenario Impact\n* A **{rate_shock_bps:+.0f} bps** rate shock is projected to change the portfolio's Economic Value by **LKR {eve_impact_mn:,.2f} Mn**."

        status = f"✅ OK (as of {pd.Timestamp.now().strftime('%Y-%m-%d %H:%M:%S')})"
        return (
            status,
            as_of,
            a1_text,
            a2_text,
            a3_text,
            fig,
            ladder_display,
            irr_display,
            explain_text,
            drivers_display,
        )
    except Exception as e:
        # UI boundary: report the failure in the status box instead of raising.
        tb = traceback.format_exc()
        empty_df = pd.DataFrame()
        fig = plot_ladder(empty_df)
        return (
            f"❌ Error: {e}\n\n{tb}",
            "N/A",
            "0",
            "0",
            "0",
            fig,
            empty_df,
            empty_df,
            "Analysis could not be performed.",
            empty_df,
        )
    finally:
        # Every refresh opens a new connection; release it on all paths.
        if conn is not None:
            conn.close()
# =========================
# Build Gradio UI
# =========================
# NOTE(review): the source this block was recovered from had lost all leading
# indentation, so the nesting of widgets inside Row/Column/Accordion below is a
# reconstruction - confirm the layout against the running app. Some labels also
# appear to contain mis-encoded characters (e.g. "β", "π"); they are left as-is.
with gr.Blocks(title=APP_TITLE) as demo:
    gr.Markdown(f"# {APP_TITLE}\n_Source:_ `{TABLE_FQN}` β `{VIEW_FQN}`")
    # Multi-line status box: on failure it receives the error plus traceback.
    status = gr.Textbox(label="Status", interactive=False, lines=8)
    with gr.Row():
        refresh_btn = gr.Button("π Refresh", variant="primary")
        theme_btn = gr.Button("π Toggle Theme")
    # Client-side only (no Python fn): toggles the 'dark' class on <html>.
    theme_btn.click(
        None,
        None,
        js="() => { document.querySelector('html').classList.toggle('dark'); }"
    )
    # Choices must match the scenario strings compared inside run_dashboard.
    scenario_dd = gr.Dropdown(
        label="Select Stress Scenario",
        choices=["Baseline", "Liquidity Stress: High Deposit Runoff", "IRR Stress: Rate Shock"],
        value="Baseline"
    )
    with gr.Accordion("Stress Scenario Parameters", open=False):
        runoff_slider = gr.Slider(
            label="Deposit Runoff (%)",
            minimum=0, maximum=100, step=1, value=20,
            info="For Liquidity Stress: Percentage of key deposits that run off."
        )
        shock_slider = gr.Slider(
            label="Rate Shock (bps)",
            minimum=-500, maximum=500, step=25, value=200,
            info="For IRR Stress: Parallel shift in the yield curve in basis points."
        )
    with gr.Row():
        as_of = gr.Textbox(label="As of date", interactive=False)
    # Placeholder sentences; replaced by run_dashboard's a1/a2/a3 texts.
    a1 = gr.Markdown("The amount of Assets maturing tomorrow (T+1) is...")
    a2 = gr.Markdown("The amount of Sources of Funds (SoF) maturing tomorrow (T+1) is...")
    a3 = gr.Markdown("The resulting Net Liquidity Gap for tomorrow (T+1) is...")
    with gr.Row():
        with gr.Column(scale=2):
            chart = gr.Plot(label="Maturity Ladder")
            ladder_df = gr.Dataframe(label="Ladder Detail")
            irr_df = gr.Dataframe(
                label="Interest-Rate Risk (BPV/DV01)",
                headers=["Bucket", "Portfolio Value (LKR Mn)", "BPV (DV01)"]
            )
        with gr.Column(scale=1):
            explain_text = gr.Markdown("Analysis of the T+1 gap will appear here...")
            drivers_df = gr.Dataframe(
                label="T+1 Gap Drivers (Top Products)",
                headers=["Product", "Bucket", "Amount (LKR Mn)"],
            )
    # The outputs list must stay in the same order as run_dashboard's
    # ten-element return tuple.
    refresh_btn.click(
        fn=run_dashboard,
        inputs=[scenario_dd, runoff_slider, shock_slider],
        outputs=[status, as_of, a1, a2, a3, chart, ladder_df, irr_df, explain_text, drivers_df],
    )
# Launch the app only when this file is executed as a script.
if __name__ == "__main__":
    demo.launch()
|