Could be cool to have the spot power-price overlaid on the right hand axis in this chart. Especially when you see the kind of moves we saw on 25th of march 26 wrt to fuelmix
Martin Myrseth feels like it would be cool to both add screenshots to comments as well as the ability to link to a chart point in time. As time moves the abvoe comment isn't gonna make as much sense
Indeed. I know Bjørnar Snoksrud has started looking into point-in-time rendering of plots. I've also been thinking that image attachments would be nice.
Summary
User supplied summary for the plot
Electricity generation (GW) for Germany last 7 days.
Description
The below description is supplied in free-text by the user
import sys
import numpy as np
import requests, pandas as pd
from lxml import etree
from novem import Plot
import inspect
import os
import dotenv
dotenv.load_dotenv()
TOKEN = os.getenv("ENTSO_TOKEN")
REGIONS = {
"Norway": {
"documentType": "A75",
"domains": [
"10YNO-1--------2", # NO1
"10YNO-2--------T", # NO2
"10YNO-3--------J", # NO3
"10YNO-4--------9", # NO4
"10Y1001A1001A48H", # NO5
],
},
"Spain": {
"documentType": "A75",
"domains": [
"10YES-REE------0", # ES mainland
],
},
"Sweden": {
"documentType": "A75",
"domains": [
"10Y1001A1001A44P", # SE1
"10Y1001A1001A45N", # SE2
"10Y1001A1001A46L", # SE3
"10Y1001A1001A47J", # SE4
],
},
"Germany": {
"documentType": "A75",
"domains": [
"10Y1001A1001A82H", # Germany
],
}
}
PSR_TO_NAME = {
'B01': 'Biomass',
'B02': 'Fossil Brown coal/Lignite',
'B03': 'Fossil Coal-derived gas',
'B04': 'Fossil Gas',
'B05': 'Fossil Hard coal',
'B06': 'Fossil Oil',
'B07': 'Fossil Oil shale',
'B08': 'Fossil Peat',
'B09': 'Geothermal',
'B10': 'Hydro Pumped Storage',
'B11': 'Hydro Run-of-river and poundage',
'B12': 'Hydro Water Reservoir',
'B13': 'Marine',
'B14': 'Nuclear',
'B15': 'Other renewable',
'B16': 'Solar',
'B17': 'Waste',
'B18': 'Wind Offshore',
'B19': 'Wind Onshore',
'B20': 'Other',
'B25': 'Energy storage'
}
AGGREGATION = {
'Biomass': 'Organic',
'Energy storage': 'Battery',
'Fossil Brown coal/Lignite': 'Fossil',
'Fossil Coal-derived gas': 'Fossil',
'Fossil Gas': 'Fossil',
'Fossil Hard coal': 'Fossil',
'Fossil Oil': 'Fossil',
'Fossil Oil shale': 'Fossil',
'Fossil Peat': 'Fossil',
'Geothermal': 'Other',
'Hydro Pumped Storage': 'Hydro',
'Hydro Run-of-river and poundage': 'Hydro',
'Hydro Water Reservoir': 'Hydro',
'Marine': 'Other',
'Nuclear': 'Nuclear',
'Other': 'Other',
'Other renewable': 'Other',
'Solar': 'Solar',
'Waste': 'Organic',
'Wind Offshore': 'Wind',
'Wind Onshore': 'Wind'
}
T_END = pd.Timestamp.utcnow().floor('60min')
T_START = T_END - pd.Timedelta('7d')
T_INTERVAL = f"{T_START:%Y-%m-%dT%H:%MZ}/{T_END:%Y-%m-%dT%H:%MZ}"
PARAMETERS_BASE = {
"securityToken": TOKEN,
"documentType": "A65",
"processType": "A16",
"TimeInterval": T_INTERVAL,
}
def fetch_data(eics, document_type):
dfs = []
for eic in eics:
p = PARAMETERS_BASE.copy()
if document_type == "A16":
p["area"] = eic
else:
p["in_Domain"] = eic
p["documentType"] = document_type
if document_type == "A65":
p["outBiddingZone_Domain"] = eic
if document_type == "A16":
p["processType"] = "A01"
print(f"Fetching data for {eic}...")
r = requests.get("https://web-api.tp.entsoe.eu/api", params=p)
try:
r.raise_for_status()
except requests.HTTPError as e:
print("\n❌ HTTP error for:", p)
print("Response:", r.text)
raise
dfs.append(parse_gl(r.content))
df = pd.concat(dfs).groupby(["ts_utc","psrType"], as_index=False)["MW"].sum()
return df
# Minimal XML → tidy dataframe (timestamp, psrType, MW)
# Emits explicit zeros for positions absent within a Period — ENTSO-E
# omits positions when a source is not producing (e.g. Solar at night).
def parse_gl(xml_bytes):
ns = {"gl":"urn:iec62325.351:tc57wg16:451-6:generationloaddocument:3:0"}
root = etree.fromstring(xml_bytes)
rows = []
for ts in root.findall(".//gl:TimeSeries", ns):
psr = ts.findtext(".//gl:MktPSRType/gl:psrType", namespaces=ns)
period = ts.find(".//gl:Period", ns)
start = pd.to_datetime(period.findtext("./gl:timeInterval/gl:start", namespaces=ns))
end = pd.to_datetime(period.findtext("./gl:timeInterval/gl:end", namespaces=ns))
res = pd.Timedelta(period.findtext("./gl:resolution", namespaces=ns).replace("PT","").lower())
n_expected = int((end - start) / res)
# Build lookup of reported positions
reported = {}
for pt in period.findall("./gl:Point", ns):
pos = int(pt.findtext("./gl:position", namespaces=ns))
reported[pos] = float(pt.findtext("./gl:quantity", namespaces=ns))
if not reported:
continue
last_reported = max(reported)
# Emit positions up to the last reported one; gaps between
# reported points get 0 (source not producing, e.g. Solar at
# night). Positions after the last report are omitted — the
# TSO hasn't submitted that data yet, not necessarily zero.
for pos in range(1, last_reported + 1):
ts_utc = start + (pos - 1) * res
rows.append((ts_utc, psr, reported.get(pos, 0.0)))
return pd.DataFrame(rows, columns=["ts_utc","psrType","MW"])
def clean_data(df):
df['ts_utc'] = pd.to_datetime(df['ts_utc'])
all_ts = df['ts_utc'].sort_values().unique()
full_index = pd.MultiIndex.from_product(
[all_ts, df['psrType'].unique()],
names=['ts_utc', 'psrType']
)
df = (
df.set_index(['ts_utc', 'psrType'])
.reindex(full_index, fill_value=0)
.reset_index()
)
# Map names
df['Production Type'] = df['psrType'].map(PSR_TO_NAME)
df['prod_type_agg'] = df['Production Type'].map(AGGREGATION)
# Rename and convert
df = df.rename(columns={'ts_utc': 'start', 'MW': 'Generation (MW)'})
df['Generation (MW)'] = pd.to_numeric(df['Generation (MW)'], errors='coerce')
df['Generation (GW)'] = df['Generation (MW)'] / 1000 # Convert to GW
df.index.name = 'Timestamp'
# Drop unused columns early
df = df.drop(columns=['psrType', 'Production Type'])
# Aggregate
df = df.groupby(['start', 'prod_type_agg'])['Generation (GW)'].sum()
# ---- KEY PART: make a complete grid ----
# All timestamps
all_times = df.index.get_level_values('start').unique()
# All categories
all_cats = df.index.get_level_values('prod_type_agg').unique()
# Full multi-index
full_index = pd.MultiIndex.from_product(
[all_times, all_cats],
names=["start", "prod_type_agg"]
)
# Reindex and fill missing values with 0
df = df.reindex(full_index, fill_value=0).reset_index()
# --- Gap repair: fill zero-runs where the source is clearly still
# producing (substantial values on both sides). The both-sides
# check (>0.5 GW) is the safety mechanism: sources that naturally
# go to zero (e.g. Solar) ramp down before the gap, so the
# "before" value is already low and the fill won't trigger. ---
repaired = []
for cat, grp in df.groupby('prod_type_agg'):
grp = grp.sort_values('start').copy()
vals = grp['Generation (GW)'].values
near_zero = vals < 0.05
# Label consecutive runs of near-zero values
nz_series = pd.Series(near_zero)
labels = (nz_series != nz_series.shift()).cumsum().values
for run_id in np.unique(labels[near_zero]):
run_mask = labels == run_id
# Check that values on both sides of the gap are substantial
idxs = np.where(run_mask)[0]
before = vals[idxs[0] - 1] if idxs[0] > 0 else 0
after = vals[idxs[-1] + 1] if idxs[-1] < len(vals) - 1 else None
is_trailing = after is None
if after is None:
after = 0
# Interior gaps: both sides must be substantial.
# Trailing gaps (end of series): fill if before is substantial
# and gap is short (≤12h / 48 periods) — data lag, not shutdown.
if (before > 0.5 and after > 0.5) or \
(is_trailing and before > 0.5 and run_mask.sum() <= 48):
grp.iloc[idxs, grp.columns.get_loc('Generation (GW)')] = pd.NA
grp['Generation (GW)'] = grp['Generation (GW)'].ffill()
grp['Generation (GW)'] = grp['Generation (GW)'].bfill()
repaired.append(grp)
df = pd.concat(repaired, ignore_index=True)
return df
def annotate_categories(df, min_pct=2):
"""Merge small categories into Other and add share % to labels."""
total_by_cat = df.groupby('prod_type_agg')['Generation (GW)'].sum()
grand_total = total_by_cat.sum()
pct = (total_by_cat / grand_total * 100)
# Merge categories below threshold into Other
small = pct[pct < min_pct].index
df.loc[df['prod_type_agg'].isin(small), 'prod_type_agg'] = 'Other'
df = df.groupby(['start', 'prod_type_agg'], as_index=False)['Generation (GW)'].sum()
# Recalculate percentages after merge
total_by_cat = df.groupby('prod_type_agg')['Generation (GW)'].sum()
grand_total = total_by_cat.sum()
pct = (total_by_cat / grand_total * 100).round(1)
# Rename categories to include percentage
label_map = {cat: f"{cat} ({pct[cat]:.0f}%)" if pct[cat] >= 1 else f"{cat} (<1%)"
for cat in pct.index}
df['prod_type_agg'] = df['prod_type_agg'].map(label_map)
return df
def novem_plot(df_hourly, region):
plot_id = f'entso-e-generation-{region.lower()}'
plt = Plot(plot_id)
plt.type = 'custom'
plt.data = df_hourly
plt.name = f'Electricity generation (GW) for {region} last 7 days. '
plt.title = f'Generation (GW) {region}'
plt.caption = 'Source: ENTSO-E, calculations by novem.'
plt.shared = 'public'
with open('./custom/custom-entso-e.js', 'r') as f:
plt.api_write('/config/custom/custom.js', f.read())
plt.description = '```python' + ' ' + inspect.getsource(sys.modules[__name__]) + '```' # Include this python script in Description
plt.summary = plt.name
# %%
for region in REGIONS.keys():
print(f"Fetching {region} data...")
eics = REGIONS[region]['domains']
document_type = REGIONS[region]['documentType']
df = fetch_data(eics,document_type)
df = clean_data(df)
df = annotate_categories(df)
novem_plot(df, region)
# %%