"""
Collect Complete 24-Month ENTSO-E Dataset
==========================================
Collects all ENTSO-E data for FBMC forecasting:
- Generation by PSR type (8 types × 12 zones)
- Demand (12 zones)
- Day-ahead prices (12 zones)
- Hydro reservoir storage (7 zones)
- Pumped storage generation (7 zones)
- Load forecasts (12 zones)
- Asset-specific transmission outages (200 CNECs)
- Generation outages by technology (5 types × 7 priority zones)
Period: October 2023 - September 2025 (24 months)
Estimated time: 4-6 hours with rate limiting (27 req/min)
"""
import sys
from pathlib import Path
import polars as pl
from datetime import datetime
# Add src to path
sys.path.append(str(Path(__file__).parent.parent))
from src.data_collection.collect_entsoe import EntsoECollector, BIDDING_ZONES, PUMPED_STORAGE_ZONES, HYDRO_RESERVOIR_ZONES
print("="*80)
print("COMPLETE 24-MONTH ENTSO-E DATA COLLECTION")
print("="*80)
print()
print("Period: October 2023 - September 2025")
print("Target features: ~246-351 ENTSO-E features (including generation outages)")
print()
# Initialize collector (OPTIMIZED: 55 req/min = 92% of 60 limit, yearly chunks)
collector = EntsoECollector(requests_per_minute=55)
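# At 55 requests/minute that is one request every ~1.09 s (60 s / 55 req),
# assuming the collector spaces calls evenly under the 60 req/min API limit.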
# Output directory
output_dir = Path(__file__).parent.parent / 'data' / 'raw'
output_dir.mkdir(parents=True, exist_ok=True)
# Collection parameters
START_DATE = '2023-10-01'
END_DATE = '2025-09-30'
# Key PSR types for generation (8 most important)
KEY_PSR_TYPES = {
    'B14': 'Nuclear',
    'B04': 'Fossil Gas',
    'B05': 'Fossil Hard coal',
    'B06': 'Fossil Oil',
    'B19': 'Wind Onshore',
    'B16': 'Solar',
    'B11': 'Hydro Run-of-river',
    'B12': 'Hydro Water Reservoir'
}
results = {}
# Start the wall clock here (not inside a section) so the final summary works
# even when a section is skipped because its output file already exists.
start_time = datetime.now()
# ============================================================================
# 1. Generation by PSR Type (8 types × 12 zones = 96 features)
# ============================================================================
print("-"*80)
print("[1/8] GENERATION BY PSR TYPE")
print("-"*80)
print()
# Check if generation data already exists
gen_path = output_dir / "entsoe_generation_by_psr_24month.parquet"
if gen_path.exists():
    print(f"[SKIP] Generation data already exists at {gen_path}")
    print(f" File size: {gen_path.stat().st_size / (1024**2):.1f} MB")
    results['generation'] = gen_path
else:
    print("Collecting 8 PSR types for 12 FBMC zones...")
    print(f"PSR types: {', '.join(KEY_PSR_TYPES.values())}")
    print()
    generation_data = []
    total_queries = len(BIDDING_ZONES) * len(KEY_PSR_TYPES)
    completed = 0
    for zone in BIDDING_ZONES.keys():
        for psr_code, psr_name in KEY_PSR_TYPES.items():
            completed += 1
            print(f"[{completed}/{total_queries}] {zone} - {psr_name}...")
            try:
                df = collector.collect_generation_by_psr_type(
                    zone=zone,
                    psr_type=psr_code,
                    start_date=START_DATE,
                    end_date=END_DATE
                )
                if not df.is_empty():
                    generation_data.append(df)
                    print(f" [OK] {len(df):,} records")
                else:
                    print(" - No data")
            except Exception as e:
                print(f" [ERROR] {e}")
    if generation_data:
        generation_df = pl.concat(generation_data)
        generation_df.write_parquet(gen_path)
        results['generation'] = gen_path
        print()
        print(f"[SUCCESS] Generation: {len(generation_df):,} records -> {gen_path}")
        print(f" File size: {gen_path.stat().st_size / (1024**2):.1f} MB")
print()
# ============================================================================
# 2. Demand / Load (12 zones = 12 features)
# ============================================================================
print("-"*80)
print("[2/8] DEMAND / LOAD")
print("-"*80)
print()
# Check if demand data already exists
load_path = output_dir / "entsoe_demand_24month.parquet"
if load_path.exists():
    print(f"[SKIP] Demand data already exists at {load_path}")
    print(f" File size: {load_path.stat().st_size / (1024**2):.1f} MB")
    results['demand'] = load_path
else:
    load_data = []
    for i, zone in enumerate(BIDDING_ZONES.keys(), 1):
        print(f"[{i}/{len(BIDDING_ZONES)}] {zone} demand...")
        try:
            df = collector.collect_load(
                zone=zone,
                start_date=START_DATE,
                end_date=END_DATE
            )
            if not df.is_empty():
                load_data.append(df)
                print(f" [OK] {len(df):,} records")
            else:
                print(" - No data")
        except Exception as e:
            print(f" [ERROR] {e}")
    if load_data:
        load_df = pl.concat(load_data)
        load_df.write_parquet(load_path)
        results['demand'] = load_path
        print()
        print(f"[SUCCESS] Demand: {len(load_df):,} records -> {load_path}")
        print(f" File size: {load_path.stat().st_size / (1024**2):.1f} MB")
print()
# ============================================================================
# 3. Day-Ahead Prices (12 zones = 12 features)
# ============================================================================
print("-"*80)
print("[3/8] DAY-AHEAD PRICES")
print("-"*80)
print()
prices_data = []
for i, zone in enumerate(BIDDING_ZONES.keys(), 1):
    print(f"[{i}/{len(BIDDING_ZONES)}] {zone} prices...")
    try:
        df = collector.collect_day_ahead_prices(
            zone=zone,
            start_date=START_DATE,
            end_date=END_DATE
        )
        if not df.is_empty():
            prices_data.append(df)
            print(f" [OK] {len(df):,} records")
        else:
            print(" - No data")
    except Exception as e:
        print(f" [ERROR] {e}")
if prices_data:
    prices_df = pl.concat(prices_data)
    prices_path = output_dir / "entsoe_prices_24month.parquet"
    prices_df.write_parquet(prices_path)
    results['prices'] = prices_path
    print()
    print(f"[SUCCESS] Prices: {len(prices_df):,} records -> {prices_path}")
    print(f" File size: {prices_path.stat().st_size / (1024**2):.1f} MB")
print()
# ============================================================================
# 4. Hydro Reservoir Storage (7 zones = 7 features)
# ============================================================================
print("-"*80)
print("[4/8] HYDRO RESERVOIR STORAGE")
print("-"*80)
print()
print(f"Collecting for {len(HYDRO_RESERVOIR_ZONES)} zones with significant hydro capacity...")
print()
hydro_data = []
for i, zone in enumerate(HYDRO_RESERVOIR_ZONES, 1):
    print(f"[{i}/{len(HYDRO_RESERVOIR_ZONES)}] {zone} hydro storage...")
    try:
        df = collector.collect_hydro_reservoir_storage(
            zone=zone,
            start_date=START_DATE,
            end_date=END_DATE
        )
        if not df.is_empty():
            hydro_data.append(df)
            print(f" [OK] {len(df):,} records (weekly)")
        else:
            print(" - No data")
    except Exception as e:
        print(f" [ERROR] {e}")
if hydro_data:
    hydro_df = pl.concat(hydro_data)
    hydro_path = output_dir / "entsoe_hydro_storage_24month.parquet"
    hydro_df.write_parquet(hydro_path)
    results['hydro_storage'] = hydro_path
    print()
    print(f"[SUCCESS] Hydro Storage: {len(hydro_df):,} records (weekly) -> {hydro_path}")
    print(f" File size: {hydro_path.stat().st_size / (1024**2):.1f} MB")
    print(" Note: Will be interpolated to hourly in processing step")
print()
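# The weekly-to-hourly interpolation happens downstream in
# process_entsoe_features.py (see "Next steps" in the summary). The sketch
# below is illustrative only and is never called here; the column names
# ('timestamp', 'zone', 'storage_mwh') are assumptions, not the real schema.
def _interpolate_weekly_to_hourly_sketch(df: pl.DataFrame) -> pl.DataFrame:
    """Upsample weekly reservoir readings to an hourly grid, per zone."""
    return (
        df.sort('zone', 'timestamp')
        .upsample(time_column='timestamp', every='1h', group_by='zone')
        .with_columns(pl.col('storage_mwh').interpolate())
    )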
# ============================================================================
# 5. Pumped Storage Generation (7 zones = 7 features)
# ============================================================================
print("-"*80)
print("[5/8] PUMPED STORAGE GENERATION")
print("-"*80)
print()
print(f"Collecting for {len(PUMPED_STORAGE_ZONES)} zones...")
print("Note: Consumption data not available from ENTSO-E API (Phase 1 finding)")
print()
pumped_data = []
for i, zone in enumerate(PUMPED_STORAGE_ZONES, 1):
    print(f"[{i}/{len(PUMPED_STORAGE_ZONES)}] {zone} pumped storage...")
    try:
        df = collector.collect_pumped_storage_generation(
            zone=zone,
            start_date=START_DATE,
            end_date=END_DATE
        )
        if not df.is_empty():
            pumped_data.append(df)
            print(f" [OK] {len(df):,} records")
        else:
            print(" - No data")
    except Exception as e:
        print(f" [ERROR] {e}")
if pumped_data:
    pumped_df = pl.concat(pumped_data)
    pumped_path = output_dir / "entsoe_pumped_storage_24month.parquet"
    pumped_df.write_parquet(pumped_path)
    results['pumped_storage'] = pumped_path
    print()
    print(f"[SUCCESS] Pumped Storage: {len(pumped_df):,} records -> {pumped_path}")
    print(f" File size: {pumped_path.stat().st_size / (1024**2):.1f} MB")
print()
# ============================================================================
# 6. Load Forecasts (12 zones = 12 features)
# ============================================================================
print("-"*80)
print("[6/8] LOAD FORECASTS")
print("-"*80)
print()
forecast_data = []
for i, zone in enumerate(BIDDING_ZONES.keys(), 1):
    print(f"[{i}/{len(BIDDING_ZONES)}] {zone} load forecast...")
    try:
        df = collector.collect_load_forecast(
            zone=zone,
            start_date=START_DATE,
            end_date=END_DATE
        )
        if not df.is_empty():
            forecast_data.append(df)
            print(f" [OK] {len(df):,} records")
        else:
            print(" - No data")
    except Exception as e:
        print(f" [ERROR] {e}")
if forecast_data:
    forecast_df = pl.concat(forecast_data)
    forecast_path = output_dir / "entsoe_load_forecast_24month.parquet"
    forecast_df.write_parquet(forecast_path)
    results['load_forecast'] = forecast_path
    print()
    print(f"[SUCCESS] Load Forecast: {len(forecast_df):,} records -> {forecast_path}")
    print(f" File size: {forecast_path.stat().st_size / (1024**2):.1f} MB")
print()
# ============================================================================
# 7. Asset-Specific Transmission Outages (200 CNECs = 80-165 features expected)
# ============================================================================
print("-"*80)
print("[7/8] ASSET-SPECIFIC TRANSMISSION OUTAGES")
print("-"*80)
print()
print("Loading 200 CNEC EIC codes...")
try:
    cnec_file = Path(__file__).parent.parent / 'data' / 'processed' / 'critical_cnecs_all.csv'
    cnec_df = pl.read_csv(cnec_file)
    cnec_eics = cnec_df.select('cnec_eic').to_series().to_list()
    print(f"[OK] Loaded {len(cnec_eics)} CNEC EICs")
    print()
    print("Collecting asset-specific transmission outages...")
    print("Using Phase 1 validated XML parsing method")
    print("Querying all 22 FBMC borders...")
    print()
    outages_df = collector.collect_transmission_outages_asset_specific(
        cnec_eics=cnec_eics,
        start_date=START_DATE,
        end_date=END_DATE
    )
    if not outages_df.is_empty():
        outages_path = output_dir / "entsoe_transmission_outages_24month.parquet"
        outages_df.write_parquet(outages_path)
        results['transmission_outages'] = outages_path
        unique_cnecs = outages_df.select('asset_eic').n_unique()
        coverage_pct = unique_cnecs / len(cnec_eics) * 100
        print()
        print(f"[SUCCESS] Transmission Outages: {len(outages_df):,} records -> {outages_path}")
        print(f" File size: {outages_path.stat().st_size / (1024**2):.1f} MB")
        print(f" Unique CNECs matched: {unique_cnecs} / {len(cnec_eics)} ({coverage_pct:.1f}%)")
        # Show border summary
        border_summary = outages_df.group_by('border').agg(
            pl.len().alias('outage_count')
        ).sort('outage_count', descending=True)
        print()
        print(" Outages by border (top 10):")
        for row in border_summary.head(10).iter_rows(named=True):
            print(f" {row['border']}: {row['outage_count']:,} outages")
    else:
        print()
        print(" Warning: No CNEC-matched outages found")
except Exception as e:
    print(f"[ERROR] collecting transmission outages: {e}")
print()
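# The hourly binary encoding of these outage intervals also happens downstream
# (see "Next steps" in the summary). A minimal sketch of that step, never
# called here; the columns 'outage_start' and 'outage_end' are assumed names,
# not the real schema ('asset_eic' is the column used above):
def _encode_outages_hourly_sketch(outages: pl.DataFrame) -> pl.DataFrame:
    """Flag each (hour, asset) pair with 1 while any outage interval covers it."""
    hours = pl.datetime_range(
        datetime(2023, 10, 1), datetime(2025, 9, 30, 23),
        interval='1h', eager=True
    ).alias('timestamp').to_frame()
    return (
        hours.join(outages, how='cross')
        .with_columns(
            ((pl.col('timestamp') >= pl.col('outage_start'))
             & (pl.col('timestamp') < pl.col('outage_end')))
            .cast(pl.Int8).alias('is_out')
        )
        .group_by('timestamp', 'asset_eic')
        .agg(pl.col('is_out').max())
    )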
# ============================================================================
# 8. Generation Outages by Technology (5 types × 7 priority zones = 20-30 features)
# ============================================================================
print("-"*80)
print("[8/8] GENERATION OUTAGES BY TECHNOLOGY")
print("-"*80)
print()
print("Collecting generation unit outages for priority zones with nuclear/fossil capacity...")
print()
# Priority zones with significant nuclear or fossil generation
NUCLEAR_ZONES = ['FR', 'BE', 'CZ', 'HU', 'RO', 'SI', 'SK']
# Technology types (PSR) prioritized by impact on cross-border flows
OUTAGE_PSR_TYPES = {
    'B14': 'Nuclear',      # Highest priority - large capacity, planned months ahead
    'B04': 'Fossil_Gas',   # Flexible generation affecting flow patterns
    'B05': 'Fossil_Hard_coal',
    'B02': 'Fossil_Brown_coal_Lignite',
    'B06': 'Fossil_Oil'
}
gen_outages_data = []
total_combos = len(NUCLEAR_ZONES) * len(OUTAGE_PSR_TYPES)
combo_count = 0
for zone in NUCLEAR_ZONES:
    for psr_code, psr_name in OUTAGE_PSR_TYPES.items():
        combo_count += 1
        print(f"[{combo_count}/{total_combos}] {zone} - {psr_name}...")
        try:
            df = collector.collect_generation_outages(
                zone=zone,
                psr_type=psr_code,
                start_date=START_DATE,
                end_date=END_DATE
            )
            if not df.is_empty():
                gen_outages_data.append(df)
                total_capacity = df.select('capacity_mw').sum().item()
                print(f" [OK] {len(df):,} outages ({total_capacity:,.0f} MW affected)")
            else:
                print(" - No outages")
        except Exception as e:
            print(f" [ERROR] {e}")
if gen_outages_data:
    gen_outages_df = pl.concat(gen_outages_data)
    gen_outages_path = output_dir / "entsoe_generation_outages_24month.parquet"
    gen_outages_df.write_parquet(gen_outages_path)
    results['generation_outages'] = gen_outages_path
    unique_combos = gen_outages_df.select(
        (pl.col('zone') + "_" + pl.col('psr_name')).alias('zone_tech')
    ).n_unique()
    print()
    print(f"[SUCCESS] Generation Outages: {len(gen_outages_df):,} records -> {gen_outages_path}")
    print(f" File size: {gen_outages_path.stat().st_size / (1024**2):.1f} MB")
    print(f" Unique zone-technology combinations: {unique_combos}")
    print(f" Features expected: {unique_combos * 2} (binary + MW for each)")
    # Show technology summary
    tech_summary = gen_outages_df.group_by('psr_name').agg([
        pl.len().alias('outage_count'),
        pl.col('capacity_mw').sum().alias('total_capacity_mw')
    ]).sort('total_capacity_mw', descending=True)
    print()
    print(" Outages by technology:")
    for row in tech_summary.iter_rows(named=True):
        print(f" {row['psr_name']}: {row['outage_count']:,} outages, {row['total_capacity_mw']:,.0f} MW")
else:
    print()
    print(" Warning: No generation outages found")
    print(" This may be normal if no outages occurred in 24-month period")
print()
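# The MW counterpart of the binary outage features ("binary + MW" above) is
# presumably built the same way as _encode_outages_hourly_sketch, except that
# per hour and zone-technology pair the capacity_mw of active outages is
# summed rather than flagged.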
# ============================================================================
# SUMMARY
# ============================================================================
end_time = datetime.now()
total_time = end_time - start_time
print("="*80)
print("24-MONTH ENTSO-E COLLECTION COMPLETE")
print("="*80)
print()
print(f"Total time: {total_time}")
print(f"Files created: {len(results)}")
print()
total_size = 0
for data_type, path in results.items():
    file_size = path.stat().st_size / (1024**2)
    total_size += file_size
    print(f" {data_type}: {file_size:.1f} MB")
print()
print(f"Total data size: {total_size:.1f} MB")
print()
print("Output directory: data/raw/")
print()
print("Next steps:")
print(" 1. Run process_entsoe_features.py to:")
print(" - Encode transmission outages to hourly binary")
print(" - Encode generation outages to hourly (binary + MW)")
print(" - Interpolate hydro weekly storage to hourly")
print(" 2. Merge all ENTSO-E features into single matrix")
print(" 3. Combine with JAO features (726) -> ~972-1,077 total features")
print()
print("="*80)