No topics yet. Start the conversation.
Summary
User-supplied summary for the plot
Electricity generation (GW) for Spain last 7 days.
Description
The description below is supplied as free text by the user
import sys
import numpy as np
import requests, pandas as pd
from lxml import etree
from novem import Plot
import inspect
import os
import dotenv
dotenv.load_dotenv()
TOKEN = os.getenv("ENTSO_TOKEN")
REGIONS = {
"Norway": {
"documentType": "A75",
"domains": [
"10YNO-1--------2", # NO1
"10YNO-2--------T", # NO2
"10YNO-3--------J", # NO3
"10YNO-4--------9", # NO4
"10Y1001A1001A48H", # NO5
],
},
"Spain": {
"documentType": "A75",
"domains": [
"10YES-REE------0", # ES mainland
],
},
"Sweden": {
"documentType": "A75",
"domains": [
"10Y1001A1001A44P", # SE1
"10Y1001A1001A45N", # SE2
"10Y1001A1001A46L", # SE3
"10Y1001A1001A47J", # SE4
],
},
"Germany": {
"documentType": "A75",
"domains": [
"10Y1001A1001A82H", # Germany
],
}
}
PSR_TO_NAME = {
'B01': 'Biomass',
'B02': 'Fossil Brown coal/Lignite',
'B03': 'Fossil Coal-derived gas',
'B04': 'Fossil Gas',
'B05': 'Fossil Hard coal',
'B06': 'Fossil Oil',
'B07': 'Fossil Oil shale',
'B08': 'Fossil Peat',
'B09': 'Geothermal',
'B10': 'Hydro Pumped Storage',
'B11': 'Hydro Run-of-river and poundage',
'B12': 'Hydro Water Reservoir',
'B13': 'Marine',
'B14': 'Nuclear',
'B15': 'Other renewable',
'B16': 'Solar',
'B17': 'Waste',
'B18': 'Wind Offshore',
'B19': 'Wind Onshore',
'B20': 'Other',
'B25': 'Energy storage'
}
AGGREGATION = {
'Biomass': 'Organic',
'Energy storage': 'Battery',
'Fossil Brown coal/Lignite': 'Fossil',
'Fossil Coal-derived gas': 'Fossil',
'Fossil Gas': 'Fossil',
'Fossil Hard coal': 'Fossil',
'Fossil Oil': 'Fossil',
'Fossil Oil shale': 'Fossil',
'Fossil Peat': 'Fossil',
'Geothermal': 'Other',
'Hydro Pumped Storage': 'Hydro',
'Hydro Run-of-river and poundage': 'Hydro',
'Hydro Water Reservoir': 'Hydro',
'Marine': 'Other',
'Nuclear': 'Nuclear',
'Other': 'Other',
'Other renewable': 'Other',
'Solar': 'Solar',
'Waste': 'Organic',
'Wind Offshore': 'Wind',
'Wind Onshore': 'Wind'
}
T_END = pd.Timestamp.utcnow().floor('60min')
T_START = T_END - pd.Timedelta('7d')
T_INTERVAL = f"{T_START:%Y-%m-%dT%H:%MZ}/{T_END:%Y-%m-%dT%H:%MZ}"
PARAMETERS_BASE = {
"securityToken": TOKEN,
"documentType": "A65",
"processType": "A16",
"TimeInterval": T_INTERVAL,
}
def fetch_data(eics, document_type):
dfs = []
for eic in eics:
p = PARAMETERS_BASE.copy()
if document_type == "A16":
p["area"] = eic
else:
p["in_Domain"] = eic
p["documentType"] = document_type
if document_type == "A65":
p["outBiddingZone_Domain"] = eic
if document_type == "A16":
p["processType"] = "A01"
print(f"Fetching data for {eic}...")
r = requests.get("https://web-api.tp.entsoe.eu/api", params=p)
try:
r.raise_for_status()
except requests.HTTPError as e:
print("\n❌ HTTP error for:", p)
print("Response:", r.text)
raise
dfs.append(parse_gl(r.content))
df = pd.concat(dfs).groupby(["ts_utc","psrType"], as_index=False)["MW"].sum()
return df
# Minimal XML → tidy dataframe (timestamp, psrType, MW)
# Emits explicit zeros for positions absent within a Period — ENTSO-E
# omits positions when a source is not producing (e.g. Solar at night).
def parse_gl(xml_bytes):
ns = {"gl":"urn:iec62325.351:tc57wg16:451-6:generationloaddocument:3:0"}
root = etree.fromstring(xml_bytes)
rows = []
for ts in root.findall(".//gl:TimeSeries", ns):
psr = ts.findtext(".//gl:MktPSRType/gl:psrType", namespaces=ns)
period = ts.find(".//gl:Period", ns)
start = pd.to_datetime(period.findtext("./gl:timeInterval/gl:start", namespaces=ns))
end = pd.to_datetime(period.findtext("./gl:timeInterval/gl:end", namespaces=ns))
res = pd.Timedelta(period.findtext("./gl:resolution", namespaces=ns).replace("PT","").lower())
n_expected = int((end - start) / res)
# Build lookup of reported positions
reported = {}
for pt in period.findall("./gl:Point", ns):
pos = int(pt.findtext("./gl:position", namespaces=ns))
reported[pos] = float(pt.findtext("./gl:quantity", namespaces=ns))
if not reported:
continue
last_reported = max(reported)
# Emit positions up to the last reported one; gaps between
# reported points get 0 (source not producing, e.g. Solar at
# night). Positions after the last report are omitted — the
# TSO hasn't submitted that data yet, not necessarily zero.
for pos in range(1, last_reported + 1):
ts_utc = start + (pos - 1) * res
rows.append((ts_utc, psr, reported.get(pos, 0.0)))
return pd.DataFrame(rows, columns=["ts_utc","psrType","MW"])
def clean_data(df):
df['ts_utc'] = pd.to_datetime(df['ts_utc'])
all_ts = df['ts_utc'].sort_values().unique()
full_index = pd.MultiIndex.from_product(
[all_ts, df['psrType'].unique()],
names=['ts_utc', 'psrType']
)
df = (
df.set_index(['ts_utc', 'psrType'])
.reindex(full_index, fill_value=0)
.reset_index()
)
# Map names
df['Production Type'] = df['psrType'].map(PSR_TO_NAME)
df['prod_type_agg'] = df['Production Type'].map(AGGREGATION)
# Rename and convert
df = df.rename(columns={'ts_utc': 'start', 'MW': 'Generation (MW)'})
df['Generation (MW)'] = pd.to_numeric(df['Generation (MW)'], errors='coerce')
df['Generation (GW)'] = df['Generation (MW)'] / 1000 # Convert to GW
df.index.name = 'Timestamp'
# Drop unused columns early
df = df.drop(columns=['psrType', 'Production Type'])
# Aggregate
df = df.groupby(['start', 'prod_type_agg'])['Generation (GW)'].sum()
# ---- KEY PART: make a complete grid ----
# All timestamps
all_times = df.index.get_level_values('start').unique()
# All categories
all_cats = df.index.get_level_values('prod_type_agg').unique()
# Full multi-index
full_index = pd.MultiIndex.from_product(
[all_times, all_cats],
names=["start", "prod_type_agg"]
)
# Reindex and fill missing values with 0
df = df.reindex(full_index, fill_value=0).reset_index()
# --- Gap repair: fill zero-runs where the source is clearly still
# producing (substantial values on both sides). The both-sides
# check (>0.5 GW) is the safety mechanism: sources that naturally
# go to zero (e.g. Solar) ramp down before the gap, so the
# "before" value is already low and the fill won't trigger. ---
repaired = []
for cat, grp in df.groupby('prod_type_agg'):
grp = grp.sort_values('start').copy()
vals = grp['Generation (GW)'].values
near_zero = vals < 0.05
# Label consecutive runs of near-zero values
nz_series = pd.Series(near_zero)
labels = (nz_series != nz_series.shift()).cumsum().values
for run_id in np.unique(labels[near_zero]):
run_mask = labels == run_id
# Check that values on both sides of the gap are substantial
idxs = np.where(run_mask)[0]
before = vals[idxs[0] - 1] if idxs[0] > 0 else 0
after = vals[idxs[-1] + 1] if idxs[-1] < len(vals) - 1 else None
is_trailing = after is None
if after is None:
after = 0
# Interior gaps: both sides must be substantial.
# Trailing gaps (end of series): fill if before is substantial
# and gap is short (≤12h / 48 periods) — data lag, not shutdown.
if (before > 0.5 and after > 0.5) or \
(is_trailing and before > 0.5 and run_mask.sum() <= 48):
grp.iloc[idxs, grp.columns.get_loc('Generation (GW)')] = pd.NA
grp['Generation (GW)'] = grp['Generation (GW)'].ffill()
grp['Generation (GW)'] = grp['Generation (GW)'].bfill()
repaired.append(grp)
df = pd.concat(repaired, ignore_index=True)
return df
def annotate_categories(df, min_pct=2):
"""Merge small categories into Other and add share % to labels."""
total_by_cat = df.groupby('prod_type_agg')['Generation (GW)'].sum()
grand_total = total_by_cat.sum()
pct = (total_by_cat / grand_total * 100)
# Merge categories below threshold into Other
small = pct[pct < min_pct].index
df.loc[df['prod_type_agg'].isin(small), 'prod_type_agg'] = 'Other'
df = df.groupby(['start', 'prod_type_agg'], as_index=False)['Generation (GW)'].sum()
# Recalculate percentages after merge
total_by_cat = df.groupby('prod_type_agg')['Generation (GW)'].sum()
grand_total = total_by_cat.sum()
pct = (total_by_cat / grand_total * 100).round(1)
# Rename categories to include percentage
label_map = {cat: f"{cat} ({pct[cat]:.0f}%)" if pct[cat] >= 1 else f"{cat} (<1%)"
for cat in pct.index}
df['prod_type_agg'] = df['prod_type_agg'].map(label_map)
return df
def novem_plot(df_hourly, region):
plot_id = f'entso-e-generation-{region.lower()}'
plt = Plot(plot_id)
plt.type = 'custom'
plt.data = df_hourly
plt.name = f'Electricity generation (GW) for {region} last 7 days. '
plt.title = f'Generation (GW) {region}'
plt.caption = 'Source: ENTSO-E, calculations by novem.'
plt.shared = 'public'
with open('./custom/custom-entso-e.js', 'r') as f:
plt.api_write('/config/custom/custom.js', f.read())
plt.description = '```python' + ' ' + inspect.getsource(sys.modules[__name__]) + '```' # Include this python script in Description
plt.summary = plt.name
# %%
for region in REGIONS.keys():
print(f"Fetching {region} data...")
eics = REGIONS[region]['domains']
document_type = REGIONS[region]['documentType']
df = fetch_data(eics,document_type)
df = clean_data(df)
df = annotate_categories(df)
novem_plot(df, region)
# %%