| """ | |
| Collect Complete 24-Month ENTSO-E Dataset | |
| ========================================== | |
| Collects all ENTSO-E data for FBMC forecasting: | |
| - Generation by PSR type (8 types × 12 zones) | |
| - Demand (12 zones) | |
| - Day-ahead prices (12 zones) | |
| - Hydro reservoir storage (7 zones) | |
| - Pumped storage generation (7 zones) | |
| - Load forecasts (12 zones) | |
| - Asset-specific transmission outages (200 CNECs) | |
| - Generation outages by technology (5 types × 7 priority zones) | |
| Period: October 2023 - September 2025 (24 months) | |
| Estimated time: 4-6 hours with rate limiting (27 req/min) | |
| """ | |
import sys
from pathlib import Path
from datetime import datetime

import polars as pl

# Add src to path
sys.path.append(str(Path(__file__).parent.parent))

from src.data_collection.collect_entsoe import (
    EntsoECollector,
    BIDDING_ZONES,
    PUMPED_STORAGE_ZONES,
    HYDRO_RESERVOIR_ZONES,
)
| print("="*80) | |
| print("COMPLETE 24-MONTH ENTSO-E DATA COLLECTION") | |
| print("="*80) | |
| print() | |
| print("Period: October 2023 - September 2025") | |
| print("Target features: ~246-351 ENTSO-E features (including generation outages)") | |
| print() | |
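# Feature-count arithmetic behind the banner estimate:
#   96 generation + 12 demand + 12 prices + 7 hydro + 7 pumped + 12 load forecast
#   = 146 continuous features, plus roughly 100-205 outage-derived features
#   from sections 7-8, giving the ~246-351 range printed above.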
# Initialize collector (OPTIMIZED: 55 req/min = 92% of the 60 req/min limit, yearly chunks)
collector = EntsoECollector(requests_per_minute=55)

# Start the wall-clock timer here so the summary's total_time is defined
# even when cached sections below are skipped
start_time = datetime.now()
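# The rate limit presumably maps to a minimum inter-request delay inside the
# collector, along these lines (sketch only; the actual logic lives in
# src/data_collection/collect_entsoe.py):
#   min_interval = 60.0 / 55                      # ~1.09 s between requests
#   wait = min_interval - (time.monotonic() - last_request_ts)
#   if wait > 0:
#       time.sleep(wait)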
# Output directory
output_dir = Path(__file__).parent.parent / 'data' / 'raw'
output_dir.mkdir(parents=True, exist_ok=True)

# Collection parameters
START_DATE = '2023-10-01'
END_DATE = '2025-09-30'

# Key PSR types for generation (8 most important)
KEY_PSR_TYPES = {
    'B14': 'Nuclear',
    'B04': 'Fossil Gas',
    'B05': 'Fossil Hard coal',
    'B06': 'Fossil Oil',
    'B19': 'Wind Onshore',
    'B16': 'Solar',
    'B11': 'Hydro Run-of-river',
    'B12': 'Hydro Water Reservoir'
}

results = {}
# ============================================================================
# 1. Generation by PSR Type (8 types × 12 zones = 96 features)
# ============================================================================
print("-"*80)
print("[1/8] GENERATION BY PSR TYPE")
print("-"*80)
print()

# Check if generation data already exists
gen_path = output_dir / "entsoe_generation_by_psr_24month.parquet"
if gen_path.exists():
    print(f"[SKIP] Generation data already exists at {gen_path}")
    print(f" File size: {gen_path.stat().st_size / (1024**2):.1f} MB")
    results['generation'] = gen_path
else:
    print("Collecting 8 PSR types for 12 FBMC zones...")
    print(f"PSR types: {', '.join(KEY_PSR_TYPES.values())}")
    print()
    generation_data = []
    total_queries = len(BIDDING_ZONES) * len(KEY_PSR_TYPES)
    completed = 0
    for zone in BIDDING_ZONES.keys():
        for psr_code, psr_name in KEY_PSR_TYPES.items():
            completed += 1
            print(f"[{completed}/{total_queries}] {zone} - {psr_name}...")
            try:
                df = collector.collect_generation_by_psr_type(
                    zone=zone,
                    psr_type=psr_code,
                    start_date=START_DATE,
                    end_date=END_DATE
                )
                if not df.is_empty():
                    generation_data.append(df)
                    print(f" [OK] {len(df):,} records")
                else:
                    print(" - No data")
            except Exception as e:
                print(f" [ERROR] {e}")
    if generation_data:
        generation_df = pl.concat(generation_data)
        generation_df.write_parquet(gen_path)
        results['generation'] = gen_path
        print()
        print(f"[SUCCESS] Generation: {len(generation_df):,} records -> {gen_path}")
        print(f" File size: {gen_path.stat().st_size / (1024**2):.1f} MB")
print()
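# Note: pl.concat defaults to how="vertical", which requires identical schemas
# across the per-zone frames; if the collector ever returns differing dtypes,
# concatenating with how="vertical_relaxed" (which supercasts mismatched
# columns) is a possible fallback.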
# ============================================================================
# 2. Demand / Load (12 zones = 12 features)
# ============================================================================
print("-"*80)
print("[2/8] DEMAND / LOAD")
print("-"*80)
print()

# Check if demand data already exists
load_path = output_dir / "entsoe_demand_24month.parquet"
if load_path.exists():
    print(f"[SKIP] Demand data already exists at {load_path}")
    print(f" File size: {load_path.stat().st_size / (1024**2):.1f} MB")
    results['demand'] = load_path
else:
    load_data = []
    for i, zone in enumerate(BIDDING_ZONES.keys(), 1):
        print(f"[{i}/{len(BIDDING_ZONES)}] {zone} demand...")
        try:
            df = collector.collect_load(
                zone=zone,
                start_date=START_DATE,
                end_date=END_DATE
            )
            if not df.is_empty():
                load_data.append(df)
                print(f" [OK] {len(df):,} records")
            else:
                print(" - No data")
        except Exception as e:
            print(f" [ERROR] {e}")
    if load_data:
        load_df = pl.concat(load_data)
        load_df.write_parquet(load_path)
        results['demand'] = load_path
        print()
        print(f"[SUCCESS] Demand: {len(load_df):,} records -> {load_path}")
        print(f" File size: {load_path.stat().st_size / (1024**2):.1f} MB")
print()
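# Sections 3-6 below repeat the per-zone loop used above. A generic helper along
# these lines could factor out the pattern; this is an illustrative sketch only
# (hypothetical name, not wired into the script, which keeps the inline loops
# for explicit step-by-step logging):
def _collect_per_zone(zones, collect_fn, label):
    """Run collect_fn for each zone and return the list of non-empty frames."""
    frames = []
    for i, zone in enumerate(zones, 1):
        print(f"[{i}/{len(zones)}] {zone} {label}...")
        try:
            df = collect_fn(zone=zone, start_date=START_DATE, end_date=END_DATE)
            if not df.is_empty():
                frames.append(df)
                print(f" [OK] {len(df):,} records")
            else:
                print(" - No data")
        except Exception as e:
            print(f" [ERROR] {e}")
    return frames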
# ============================================================================
# 3. Day-Ahead Prices (12 zones = 12 features)
# ============================================================================
print("-"*80)
print("[3/8] DAY-AHEAD PRICES")
print("-"*80)
print()

prices_data = []
for i, zone in enumerate(BIDDING_ZONES.keys(), 1):
    print(f"[{i}/{len(BIDDING_ZONES)}] {zone} prices...")
    try:
        df = collector.collect_day_ahead_prices(
            zone=zone,
            start_date=START_DATE,
            end_date=END_DATE
        )
        if not df.is_empty():
            prices_data.append(df)
            print(f" [OK] {len(df):,} records")
        else:
            print(" - No data")
    except Exception as e:
        print(f" [ERROR] {e}")
if prices_data:
    prices_df = pl.concat(prices_data)
    prices_path = output_dir / "entsoe_prices_24month.parquet"
    prices_df.write_parquet(prices_path)
    results['prices'] = prices_path
    print()
    print(f"[SUCCESS] Prices: {len(prices_df):,} records -> {prices_path}")
    print(f" File size: {prices_path.stat().st_size / (1024**2):.1f} MB")
print()
# ============================================================================
# 4. Hydro Reservoir Storage (7 zones = 7 features)
# ============================================================================
print("-"*80)
print("[4/8] HYDRO RESERVOIR STORAGE")
print("-"*80)
print()

print(f"Collecting for {len(HYDRO_RESERVOIR_ZONES)} zones with significant hydro capacity...")
print()

hydro_data = []
for i, zone in enumerate(HYDRO_RESERVOIR_ZONES, 1):
    print(f"[{i}/{len(HYDRO_RESERVOIR_ZONES)}] {zone} hydro storage...")
    try:
        df = collector.collect_hydro_reservoir_storage(
            zone=zone,
            start_date=START_DATE,
            end_date=END_DATE
        )
        if not df.is_empty():
            hydro_data.append(df)
            print(f" [OK] {len(df):,} records (weekly)")
        else:
            print(" - No data")
    except Exception as e:
        print(f" [ERROR] {e}")
if hydro_data:
    hydro_df = pl.concat(hydro_data)
    hydro_path = output_dir / "entsoe_hydro_storage_24month.parquet"
    hydro_df.write_parquet(hydro_path)
    results['hydro_storage'] = hydro_path
    print()
    print(f"[SUCCESS] Hydro Storage: {len(hydro_df):,} records (weekly) -> {hydro_path}")
    print(f" File size: {hydro_path.stat().st_size / (1024**2):.1f} MB")
    print(" Note: Will be interpolated to hourly in processing step")
print()
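# The weekly-to-hourly interpolation flagged above could look roughly like this
# in polars (sketch only; column names are hypothetical and the real step lives
# in process_entsoe_features.py):
#   hourly = (hydro_df.sort('timestamp')
#             .upsample(time_column='timestamp', every='1h', group_by='zone')
#             .with_columns(pl.col('storage_mwh').interpolate()))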
# ============================================================================
# 5. Pumped Storage Generation (7 zones = 7 features)
# ============================================================================
print("-"*80)
print("[5/8] PUMPED STORAGE GENERATION")
print("-"*80)
print()

print(f"Collecting for {len(PUMPED_STORAGE_ZONES)} zones...")
print("Note: Consumption data not available from ENTSO-E API (Phase 1 finding)")
print()

pumped_data = []
for i, zone in enumerate(PUMPED_STORAGE_ZONES, 1):
    print(f"[{i}/{len(PUMPED_STORAGE_ZONES)}] {zone} pumped storage...")
    try:
        df = collector.collect_pumped_storage_generation(
            zone=zone,
            start_date=START_DATE,
            end_date=END_DATE
        )
        if not df.is_empty():
            pumped_data.append(df)
            print(f" [OK] {len(df):,} records")
        else:
            print(" - No data")
    except Exception as e:
        print(f" [ERROR] {e}")
if pumped_data:
    pumped_df = pl.concat(pumped_data)
    pumped_path = output_dir / "entsoe_pumped_storage_24month.parquet"
    pumped_df.write_parquet(pumped_path)
    results['pumped_storage'] = pumped_path
    print()
    print(f"[SUCCESS] Pumped Storage: {len(pumped_df):,} records -> {pumped_path}")
    print(f" File size: {pumped_path.stat().st_size / (1024**2):.1f} MB")
print()
# ============================================================================
# 6. Load Forecasts (12 zones = 12 features)
# ============================================================================
print("-"*80)
print("[6/8] LOAD FORECASTS")
print("-"*80)
print()

forecast_data = []
for i, zone in enumerate(BIDDING_ZONES.keys(), 1):
    print(f"[{i}/{len(BIDDING_ZONES)}] {zone} load forecast...")
    try:
        df = collector.collect_load_forecast(
            zone=zone,
            start_date=START_DATE,
            end_date=END_DATE
        )
        if not df.is_empty():
            forecast_data.append(df)
            print(f" [OK] {len(df):,} records")
        else:
            print(" - No data")
    except Exception as e:
        print(f" [ERROR] {e}")
if forecast_data:
    forecast_df = pl.concat(forecast_data)
    forecast_path = output_dir / "entsoe_load_forecast_24month.parquet"
    forecast_df.write_parquet(forecast_path)
    results['load_forecast'] = forecast_path
    print()
    print(f"[SUCCESS] Load Forecast: {len(forecast_df):,} records -> {forecast_path}")
    print(f" File size: {forecast_path.stat().st_size / (1024**2):.1f} MB")
print()
# ============================================================================
# 7. Asset-Specific Transmission Outages (200 CNECs = 80-165 features expected)
# ============================================================================
print("-"*80)
print("[7/8] ASSET-SPECIFIC TRANSMISSION OUTAGES")
print("-"*80)
print()

print("Loading 200 CNEC EIC codes...")
try:
    cnec_file = Path(__file__).parent.parent / 'data' / 'processed' / 'critical_cnecs_all.csv'
    cnec_df = pl.read_csv(cnec_file)
    cnec_eics = cnec_df.select('cnec_eic').to_series().to_list()
    print(f"[OK] Loaded {len(cnec_eics)} CNEC EICs")
    print()
    print("Collecting asset-specific transmission outages...")
    print("Using Phase 1 validated XML parsing method")
    print("Querying all 22 FBMC borders...")
    print()
    outages_df = collector.collect_transmission_outages_asset_specific(
        cnec_eics=cnec_eics,
        start_date=START_DATE,
        end_date=END_DATE
    )
    if not outages_df.is_empty():
        outages_path = output_dir / "entsoe_transmission_outages_24month.parquet"
        outages_df.write_parquet(outages_path)
        results['transmission_outages'] = outages_path
        unique_cnecs = outages_df.select('asset_eic').n_unique()
        coverage_pct = unique_cnecs / len(cnec_eics) * 100
        print()
        print(f"[SUCCESS] Transmission Outages: {len(outages_df):,} records -> {outages_path}")
        print(f" File size: {outages_path.stat().st_size / (1024**2):.1f} MB")
        print(f" Unique CNECs matched: {unique_cnecs} / {len(cnec_eics)} ({coverage_pct:.1f}%)")
        # Show border summary
        border_summary = outages_df.group_by('border').agg(
            pl.len().alias('outage_count')
        ).sort('outage_count', descending=True)
        print()
        print(" Outages by border (top 10):")
        for row in border_summary.head(10).iter_rows(named=True):
            print(f" {row['border']}: {row['outage_count']:,} outages")
    else:
        print()
        print(" Warning: No CNEC-matched outages found")
except Exception as e:
    print(f"[ERROR] collecting transmission outages: {e}")
print()
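# Downstream, each CNEC's outage windows are flattened to an hourly 0/1 feature.
# Roughly (sketch with hypothetical column names; the real encoding lives in
# process_entsoe_features.py):
#   flags = hours.join(outages_df, how='cross').with_columns(
#       ((pl.col('hour') >= pl.col('outage_start')) &
#        (pl.col('hour') < pl.col('outage_end'))).cast(pl.Int8).alias('cnec_out'))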
# ============================================================================
# 8. Generation Outages by Technology (5 types × 7 priority zones = 20-30 features)
# ============================================================================
print("-"*80)
print("[8/8] GENERATION OUTAGES BY TECHNOLOGY")
print("-"*80)
print()

print("Collecting generation unit outages for priority zones with nuclear/fossil capacity...")
print()

# Priority zones with significant nuclear or fossil generation
PRIORITY_ZONES = ['FR', 'BE', 'CZ', 'HU', 'RO', 'SI', 'SK']

# Technology types (PSR) prioritized by impact on cross-border flows
OUTAGE_PSR_TYPES = {
    'B14': 'Nuclear',       # Highest priority - large capacity, planned months ahead
    'B04': 'Fossil_Gas',    # Flexible generation affecting flow patterns
    'B05': 'Fossil_Hard_coal',
    'B02': 'Fossil_Brown_coal_Lignite',
    'B06': 'Fossil_Oil'
}
gen_outages_data = []
total_combos = len(PRIORITY_ZONES) * len(OUTAGE_PSR_TYPES)
combo_count = 0
for zone in PRIORITY_ZONES:
    for psr_code, psr_name in OUTAGE_PSR_TYPES.items():
        combo_count += 1
        print(f"[{combo_count}/{total_combos}] {zone} - {psr_name}...")
        try:
            df = collector.collect_generation_outages(
                zone=zone,
                psr_type=psr_code,
                start_date=START_DATE,
                end_date=END_DATE
            )
            if not df.is_empty():
                gen_outages_data.append(df)
                total_capacity = df.select('capacity_mw').sum().item()
                print(f" [OK] {len(df):,} outages ({total_capacity:,.0f} MW affected)")
            else:
                print(" - No outages")
        except Exception as e:
            print(f" [ERROR] {e}")
if gen_outages_data:
    gen_outages_df = pl.concat(gen_outages_data)
    gen_outages_path = output_dir / "entsoe_generation_outages_24month.parquet"
    gen_outages_df.write_parquet(gen_outages_path)
    results['generation_outages'] = gen_outages_path
    unique_combos = gen_outages_df.select(
        (pl.col('zone') + "_" + pl.col('psr_name')).alias('zone_tech')
    ).n_unique()
    print()
    print(f"[SUCCESS] Generation Outages: {len(gen_outages_df):,} records -> {gen_outages_path}")
    print(f" File size: {gen_outages_path.stat().st_size / (1024**2):.1f} MB")
    print(f" Unique zone-technology combinations: {unique_combos}")
    print(f" Features expected: {unique_combos * 2} (binary + MW for each)")
    # Show technology summary
    tech_summary = gen_outages_df.group_by('psr_name').agg([
        pl.len().alias('outage_count'),
        pl.col('capacity_mw').sum().alias('total_capacity_mw')
    ]).sort('total_capacity_mw', descending=True)
    print()
    print(" Outages by technology:")
    for row in tech_summary.iter_rows(named=True):
        print(f" {row['psr_name']}: {row['outage_count']:,} outages, {row['total_capacity_mw']:,.0f} MW")
else:
    print()
    print(" Warning: No generation outages found")
    print(" This may be normal if no outages occurred in the 24-month period")
print()
# ============================================================================
# SUMMARY
# ============================================================================
end_time = datetime.now()
total_time = end_time - start_time

print("="*80)
print("24-MONTH ENTSO-E COLLECTION COMPLETE")
print("="*80)
print()
print(f"Total time: {total_time}")
print(f"Files created: {len(results)}")
print()

total_size = 0
for data_type, path in results.items():
    file_size = path.stat().st_size / (1024**2)
    total_size += file_size
    print(f" {data_type}: {file_size:.1f} MB")
print()
print(f"Total data size: {total_size:.1f} MB")
print()
print("Output directory: data/raw/")
print()
print("Next steps:")
print(" 1. Run process_entsoe_features.py to:")
print("    - Encode transmission outages to hourly binary")
print("    - Encode generation outages to hourly (binary + MW)")
print("    - Interpolate hydro weekly storage to hourly")
print(" 2. Merge all ENTSO-E features into single matrix")
print(" 3. Combine with JAO features (726) -> ~972-1,077 total features")
print()
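# The merge in step 2 presumably pivots the long per-zone frames into one wide
# hourly matrix. A minimal polars sketch of the idea (column names hypothetical):
#   wide = long_df.pivot(on='feature_name', index='timestamp', values='value')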
| print("="*80) | |