Skip to content
Open
1 change: 1 addition & 0 deletions doc/changes/dev/13945.other.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Refactor the EGI MFF reader to use ``mffpy`` for metadata parsing, improving robustness and maintainability. This includes a shim for PNS sensor parsing to bypass an upstream bug in ``mffpy`` and resolves channel naming inconsistencies, by `Pragnya Khandelwal`_.
32 changes: 32 additions & 0 deletions mne/fixes.py
Original file line number Diff line number Diff line change
Expand Up @@ -910,3 +910,35 @@ def read_from_file_or_buffer(
dtype = np.dtype(dtype)
buffer = file.read(dtype.itemsize * count)
return np.frombuffer(buffer, dtype=dtype, count=count)


def _get_mffpy_pns_sensors(pns_obj):
"""Safely extract PNS sensors from mffpy PNSSet object.

TODO VERSION: Remove once a mffpy release includes BEL-Public/mffpy#145.
mffpy <= 0.11.0 raises KeyError: 'conversion' when parsing pnsSet.xml files
that contain a <conversion> tag. The upstream fix was merged in
BEL-Public/mffpy#145 (Jun 12, 2026) but is not yet in a released version.
"""
try:
# Try native mffpy parsing (will work once bug is patched upstream)
sensors = pns_obj.sensors
if isinstance(sensors, dict):
return list(sensors.values())
return list(sensors)
except Exception:
# Fallback: recursively iterate through the raw XML tree to bypass namespaces
sensors = []
for sensor_el in pns_obj.root.iter():
if sensor_el.tag.endswith("sensor"):
sensor_dict = {}
for prop in sensor_el:
# Safely strip out XML namespaces
# (e.g., '{http://...}name' -> 'name')
tag = prop.tag.split("}")[-1] if "}" in prop.tag else prop.tag
if tag == "name":
sensor_dict["name"] = prop.text or ""
elif tag == "unit":
sensor_dict["unit"] = prop.text or ""
sensors.append(sensor_dict)
return sensors
129 changes: 60 additions & 69 deletions mne/io/egi/egimff.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,12 @@
from ...annotations import Annotations
from ...channels.montage import make_dig_montage
from ...evoked import EvokedArray
from ...fixes import _get_mffpy_pns_sensors
from ...utils import _check_fname, _check_option, _soft_import, logger, verbose, warn
from ..base import BaseRaw
from .events import _combine_triggers, _read_events, _triage_include_exclude
from .general import (
_block_r,
_extract,
_get_blocks,
_get_ep_info,
_get_gains,
Expand All @@ -37,30 +37,32 @@

def _read_mff_header(filepath):
"""Read mff header."""
_soft_import("defusedxml", "reading EGI MFF data")
from defusedxml.minidom import parse
_soft_import("mffpy", "reading EGI MFF data")
from mffpy.xml_files import XML

all_files = _get_signalfname(filepath)
eeg_file = all_files["EEG"]["signal"]
eeg_info_file = all_files["EEG"]["info"]

info_filepath = op.join(filepath, "info.xml") # add with filepath
tags = ["mffVersion", "recordTime"]
version_and_date = _extract(tags, filepath=info_filepath)
version = ""
if len(version_and_date["mffVersion"]):
version = version_and_date["mffVersion"][0]
# 1. Parse info.xml natively via absolute path
info_filepath = op.join(filepath, "info.xml")
info_obj = XML.from_file(info_filepath)

mff_vers_elem = info_obj.find("mffVersion")
if mff_vers_elem is None:
mff_vers_elem = info_obj.find("version")
version = str(mff_vers_elem.text) if mff_vers_elem is not None else ""

rt_elem = info_obj.find("recordTime")
record_time = str(rt_elem.text) if rt_elem is not None else ""

fname = op.join(filepath, eeg_file)
signal_blocks = _get_blocks(fname)
epochs = _get_ep_info(filepath)
summaryinfo = dict(eeg_fname=eeg_file, info_fname=eeg_info_file)
summaryinfo.update(signal_blocks)

# sanity check and update relevant values
record_time = version_and_date["recordTime"][0]
# e.g.,
# 2018-07-30T10:47:01.021673-04:00
# 2017-09-20T09:55:44.072000000+01:00
g = re.match(
r"\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}.(\d{6}(?:\d{3})?)[+-]\d{2}:\d{2}", # noqa: E501
record_time,
Expand All @@ -77,14 +79,6 @@ def _read_mff_header(filepath):
raise RuntimeError(f"Could not parse epoch time {e}")
epochs[key][ei] = e // div
epochs[key] = np.array(epochs[key], np.uint64)
# I guess they refer to times in milliseconds?
# What we really need to do here is:
# epochs[key] *= signal_blocks['sfreq']
# epochs[key] //= 1000
# But that multiplication risks an overflow, so let's only multiply
# by what we need to (e.g., a sample rate of 500 means we can multiply
# by 1 and divide by 2 rather than multiplying by 500 and dividing by
# 1000)
numerator = int(signal_blocks["sfreq"])
denominator = 1000
this_gcd = math.gcd(numerator, denominator)
Expand All @@ -93,8 +87,6 @@ def _read_mff_header(filepath):
with np.errstate(over="raise"):
epochs[key] *= numerator
epochs[key] //= denominator
# Should be safe to cast to int now, which makes things later not
# upbroadcast to float
epochs[key] = epochs[key].astype(np.int64)
n_samps_block = signal_blocks["samples_block"].sum()
n_samps_epochs = (epochs["last_samps"] - epochs["first_samps"]).sum()
Expand All @@ -109,8 +101,7 @@ def _read_mff_header(filepath):
f"{list(epochs['first_samps'])}\n{list(epochs['last_samps'])}"
)
summaryinfo.update(epochs)
# index which samples in raw are actually readable from disk (i.e., not
# in a skip)

disk_samps = np.full(epochs["last_samps"][-1], -1)
offset = 0
for first, last in zip(epochs["first_samps"], epochs["last_samps"]):
Expand All @@ -119,34 +110,31 @@ def _read_mff_header(filepath):
offset += n_this
summaryinfo["disk_samps"] = disk_samps

# Add the sensor info.
sensor_layout_file = op.join(filepath, "sensorLayout.xml")
sensor_layout_obj = parse(sensor_layout_file)
# 2. Parse sensorLayout.xml natively via absolute path
sensor_layout_filepath = op.join(filepath, "sensorLayout.xml")
sensor_layout_obj = XML.from_file(sensor_layout_filepath)

summaryinfo["device"] = sensor_layout_obj.getElementsByTagName("name")[
0
].firstChild.data
sensors = sensor_layout_obj.getElementsByTagName("sensor")
summaryinfo["device"] = getattr(sensor_layout_obj, "name", "Unknown")
chan_type = list()
chan_unit = list()
n_chans = 0
numbers = list() # used for identification
for sensor in sensors:
sensortype = int(sensor.getElementsByTagName("type")[0].firstChild.data)

for sensor_dict in sensor_layout_obj.sensors.values():
sensortype = int(sensor_dict.get("type", -1))
if sensortype in [0, 1]:
sn = sensor.getElementsByTagName("number")[0].firstChild.data
sn = sn.encode()
sn = str(sensor_dict.get("number", "")).encode()
numbers.append(sn)
chan_type.append("eeg")
chan_unit.append("uV")
n_chans = n_chans + 1
n_chans += 1

if n_chans != summaryinfo["n_channels"]:
raise RuntimeError(
f"Number of defined channels ({n_chans}) did not match the "
f"expected channels ({summaryinfo['n_channels']})."
)

# Check presence of PNS data
pns_names = []
if "PNS" in all_files:
pns_fpath = op.join(filepath, all_files["PNS"]["signal"])
Expand All @@ -162,19 +150,19 @@ def _read_mff_header(filepath):
f"{list(pns_samples)}\nvs\n{list(signal_samples)}"
)

pns_file = op.join(filepath, "pnsSet.xml")
pns_obj = parse(pns_file)
sensors = pns_obj.getElementsByTagName("sensor")
# 3. Parse pnsSet.xml using the fallback shim in fixes.py
pns_set_filepath = op.join(filepath, "pnsSet.xml")
pns_obj = XML.from_file(pns_set_filepath)

pns_types = []
pns_units = []

# Route through the shim to bypass the mffpy KeyError
sensors = _get_mffpy_pns_sensors(pns_obj)

for sensor in sensors:
# sensor number:
# sensor.getElementsByTagName('number')[0].firstChild.data
name = sensor.getElementsByTagName("name")[0].firstChild.data
unit_elem = sensor.getElementsByTagName("unit")[0].firstChild
unit = ""
if unit_elem is not None:
unit = unit_elem.data
name = sensor.get("name", "")
unit = sensor.get("unit", "")

if name == "ECG":
ch_type = "ecg"
Expand All @@ -192,10 +180,11 @@ def _read_mff_header(filepath):
pns_fname=all_files["PNS"]["signal"],
pns_sample_blocks=pns_blocks,
)

summaryinfo.update(
pns_names=pns_names,
version=version,
date=version_and_date["recordTime"][0],
date=record_time,
chan_type=chan_type,
chan_unit=chan_unit,
numbers=numbers,
Expand Down Expand Up @@ -275,54 +264,56 @@ def _get_eeg_calibration_info(filepath, egi_info):

def _read_locs(filepath, egi_info, channel_naming):
"""Read channel locations."""
_soft_import("defusedxml", "reading EGI MFF data")
from defusedxml.minidom import parse
_soft_import("mffpy", "reading EGI MFF data")
from mffpy.xml_files import XML

fname = op.join(filepath, "coordinates.xml")
if not op.exists(fname):
warn("File coordinates.xml not found, not setting channel locations")
ch_names = [channel_naming % (i + 1) for i in range(egi_info["n_channels"])]
return ch_names, None

# 4. Parse coordinates.xml natively via absolute path
coordinates = XML.from_file(fname)

dig_ident_map = {
"Left periauricular point": "lpa",
"Right periauricular point": "rpa",
"Nasion": "nasion",
}
numbers = np.array(egi_info["numbers"])
coordinates = parse(fname)
sensors = coordinates.getElementsByTagName("sensor")

ch_pos = OrderedDict()
hsp = list()
nlr = dict()
ch_names = list()

for sensor in sensors:
name_element = sensor.getElementsByTagName("name")[0].firstChild
num_element = sensor.getElementsByTagName("number")[0].firstChild
name = (
channel_naming % int(num_element.data)
if name_element is None
else name_element.data
)
nr = num_element.data.encode()
coords = [
float(sensor.getElementsByTagName(coord)[0].firstChild.data)
for coord in "xyz"
]
for sensor in coordinates.sensors.values():
name = sensor.get("name")
# Fix for MFF files that explicitly write <name>None</name>
if name == "None":
name = None

num_str = str(sensor.get("number", ""))

if not name:
name = channel_naming % int(num_str)

nr = num_str.encode()
coords = [float(sensor.get(coord, 0.0)) for coord in "xyz"]
loc = np.array(coords) / 100 # cm -> m

# create dig entry
if name in dig_ident_map:
nlr[dig_ident_map[name]] = loc
else:
# id_ is the index of the channel in egi_info['numbers']
id_ = np.flatnonzero(numbers == nr)
# if it's not in egi_info['numbers'], it's a headshape point
if len(id_) == 0:
hsp.append(loc)
# not HSP, must be a data or reference channel
else:
ch_names.append(name)
ch_pos[name] = loc

mon = make_dig_montage(ch_pos=ch_pos, hsp=hsp, **nlr)
return ch_names, mon

Expand Down
Loading