Custom Nodes¶
NeuroDAGs lets you define new computation nodes without modifying or forking the package. Custom nodes are registered at runtime via a Python module pointed to by new_definitions in pipeline.yml.
The built-in nodes target EEG/MEG/ECG via MNE-Python and xarray, but the node system is domain-agnostic. If your data comes in files — audio, fMRI volumes, tabular CSVs, images, genomics — you can use NeuroDAGs orchestration with entirely custom nodes and loaders. The caching, dependency ordering, HPC templates, and dataframe assembly work identically regardless of what the nodes do.
Key Ideas¶
A node is a Python function.
It returns a
NodeResult.A
NodeResultcontainsartifacts— a dict mapping file extension toArtifact(item, writer).
Minimal Example¶
# custom_nodes.py
from neurodags.nodes import register_node
from neurodags.definitions import Artifact, NodeResult
@register_node
def my_node(data) -> NodeResult:
result = compute(data)
return NodeResult(
artifacts={
".nc": Artifact(
item=result,
writer=lambda path: result.to_netcdf(path),
),
},
)
Multiple Artifacts¶
A node can produce more than one artifact:
@register_node
def my_node_with_report(data) -> NodeResult:
result = compute(data)
report = make_html_report(result)
return NodeResult(
artifacts={
".nc": Artifact(
item=result,
writer=lambda path: result.to_netcdf(path),
),
".report.html": Artifact(
item=report,
writer=lambda path: report.save(path),
),
},
)
Custom Node Name¶
By default the node is registered under the function name. Override it:
@register_node(name="my_custom_name", override=True)
def _internal_function_name(data) -> NodeResult:
...
override=True allows re-registering a name (e.g. in tests or hot-reload scenarios).
Artifact Types¶
The item field holds the in-memory object; writer is a callable that receives a full file path and saves it. Common patterns:
# xarray DataArray → NetCDF (recommended for numerical results)
".nc": Artifact(item=da, writer=lambda path: da.to_netcdf(path))
# MNE object → FIF
".fif": Artifact(item=raw, writer=lambda path: raw.save(path, overwrite=True))
# Pandas DataFrame → CSV
".csv": Artifact(item=df, writer=lambda path: df.to_csv(path))
# Any object → pickle
".pkl": Artifact(item=obj, writer=lambda path: pickle.dump(obj, open(path, "wb")))
xarray Convention¶
For numerical derivatives, prefer returning an xr.DataArray or xr.Dataset saved as .nc. This enables:
Dimension-aware caching
Automatic dataframe assembly via
build_derivative_dataframeVisualization in the built-in Dash-Plotly explorer
Language-agnostic downstream analysis
Registering in pipeline.yml¶
new_definitions: custom_nodes.py
# or multiple files:
new_definitions:
- custom_nodes/nodes_a.py
- /abs/path/to/nodes_b.py
Paths relative to the pipeline YAML. Each module is imported once before any derivative executes.
Using Custom Nodes in Derivatives¶
DerivativeDefinitions:
MyResult:
nodes:
- id: 0
derivative: SourceFile
- id: 1
node: my_node # matches the registered name
args:
data: id.0
threshold: 0.5
Built-in Node Registry API¶
from neurodags.nodes import register_node, get_node, list_nodes, iter_nodes
# List all registered nodes
print(list_nodes())
# Retrieve a node by name
fn = get_node("basic_preprocessing")
# Iterate all (name, function) pairs
for name, fn in iter_nodes():
print(name, fn)
Using NeuroDAGs Beyond EEG/MEG¶
Any per-file analysis pipeline works. You need:
A glob pattern that discovers your input files (
file_patternindatasets.yml)Nodes that load and process your data and return a
NodeResult
The SourceFile pseudo-derivative gives each node the raw file path as a string — your loader node reads it however it likes.
Example: CSV time-series files → per-file statistics → assembled dataframe.
# custom_nodes.py
import pandas as pd
import xarray as xr
from neurodags.nodes import register_node
from neurodags.definitions import Artifact, NodeResult
@register_node
def load_csv_timeseries(file_path: str) -> NodeResult:
df = pd.read_csv(file_path, index_col="time")
da = xr.DataArray(df.values, dims=("time", "channel"),
coords={"time": df.index, "channel": df.columns})
return NodeResult(
artifacts={".nc": Artifact(item=da, writer=lambda p: da.to_netcdf(p))}
)
@register_node
def channel_statistics(data) -> NodeResult:
da = data.artifacts[".nc"].item
stats = da.mean("time").to_dataset(name="mean")
stats["std"] = da.std("time")
return NodeResult(
artifacts={".nc": Artifact(item=stats, writer=lambda p: stats.to_netcdf(p))}
)
# datasets.yml
my_study:
name: MyStudy
file_pattern: data/**/*.csv
derivatives_path: derivatives/
# pipeline.yml
new_definitions: custom_nodes.py
DerivativeList:
- Timeseries
- Stats
DerivativeDefinitions:
Timeseries:
nodes:
- id: 0
derivative: SourceFile
- id: 1
node: load_csv_timeseries
args:
file_path: id.0
Stats:
for_dataframe: true
nodes:
- id: 0
derivative: Timeseries.nc
- id: 1
node: channel_statistics
args:
data: id.0
Everything else — caching, dependency ordering, joblib parallelism, SLURM templates, dataframe assembly — works identically to an EEG pipeline.