import bz2
import io
import os
import re
import shlex
import warnings
import zlib
from collections import OrderedDict
from typing import IO, Any, AnyStr, Iterable, Tuple
import nrrd
from nrrd.parsers import *
from nrrd.types import IndexOrder, NRRDFieldMap, NRRDFieldType, NRRDHeader
# Older versions of Python had issues when uncompressed data was larger than 4GB (2^32). This should be fixed in latest
# version of Python 2.7 and all versions of Python 3. The fix for this issue is to read the data in smaller chunks.
# Chunk size is set to be large at 1GB to improve performance. If issues arise decompressing larger files, try to reduce
# this value
_READ_CHUNKSIZE: int = 2 ** 32
_NRRD_REQUIRED_FIELDS = ['dimension', 'type', 'encoding', 'sizes']
ALLOW_DUPLICATE_FIELD: bool = False
"""Allow duplicate header fields when reading NRRD files
When there are duplicated fields in a NRRD file header, pynrrd throws an error by default. Setting this field as
:obj:`True` will instead show a warning.
Example:
Reading a NRRD file with duplicated header field 'space' with field set to :obj:`False`.
>>> filedata, fileheader = nrrd.read('filename_duplicatedheader.nrrd')
nrrd.errors.NRRDError: Duplicate header field: 'space'
Set the field as :obj:`True` to receive a warning instead.
>>> nrrd.reader.ALLOW_DUPLICATE_FIELD = True
>>> filedata, fileheader = nrrd.read('filename_duplicatedheader.nrrd')
UserWarning: Duplicate header field: 'space' warnings.warn(dup_message)
Note:
Duplicated fields are prohibited by the NRRD file specification.
"""
_TYPEMAP_NRRD2NUMPY = {
'signed char': 'i1',
'int8': 'i1',
'int8_t': 'i1',
'uchar': 'u1',
'unsigned char': 'u1',
'uint8': 'u1',
'uint8_t': 'u1',
'short': 'i2',
'short int': 'i2',
'signed short': 'i2',
'signed short int': 'i2',
'int16': 'i2',
'int16_t': 'i2',
'ushort': 'u2',
'unsigned short': 'u2',
'unsigned short int': 'u2',
'uint16': 'u2',
'uint16_t': 'u2',
'int': 'i4',
'signed int': 'i4',
'int32': 'i4',
'int32_t': 'i4',
'uint': 'u4',
'unsigned int': 'u4',
'uint32': 'u4',
'uint32_t': 'u4',
'longlong': 'i8',
'long long': 'i8',
'long long int': 'i8',
'signed long long': 'i8',
'signed long long int': 'i8',
'int64': 'i8',
'int64_t': 'i8',
'ulonglong': 'u8',
'unsigned long long': 'u8',
'unsigned long long int': 'u8',
'uint64': 'u8',
'uint64_t': 'u8',
'float': 'f4',
'double': 'f8',
'block': 'V'
}
def _get_field_type(field: str, custom_field_map: Optional[NRRDFieldMap]) -> NRRDFieldType:
if field in ['dimension', 'lineskip', 'line skip', 'byteskip', 'byte skip', 'space dimension']:
return 'int'
elif field in ['min', 'max', 'oldmin', 'old min', 'oldmax', 'old max']:
return 'double'
elif field in ['endian', 'encoding', 'content', 'sample units', 'datafile', 'data file', 'space', 'type']:
return 'string'
elif field in ['sizes']:
return 'int list'
elif field in ['spacings', 'thicknesses', 'axismins', 'axis mins', 'axismaxs', 'axis maxs']:
return 'double list'
elif field in ['kinds', 'centerings']:
return 'string list'
elif field in ['labels', 'units', 'space units']:
return 'quoted string list'
# No int vector fields yet
# elif field in []:
# return 'int vector'
elif field in ['space origin']:
return 'double vector'
elif field in ['measurement frame']:
return 'double matrix'
elif field in ['space directions']:
return nrrd.SPACE_DIRECTIONS_TYPE
else:
if custom_field_map and field in custom_field_map:
return custom_field_map[field]
# Default the type to string if unknown type
return 'string'
def _parse_field_value(value: str, field_type: NRRDFieldType) -> Any:
if field_type == 'int':
return int(value)
elif field_type == 'double':
return float(value)
elif field_type == 'string':
return str(value)
elif field_type == 'int list':
return parse_number_list(value, dtype=int)
elif field_type == 'double list':
return parse_number_list(value, dtype=float)
elif field_type == 'string list':
return [str(x) for x in value.split()]
elif field_type == 'quoted string list':
return shlex.split(value)
elif field_type == 'int vector':
return parse_vector(value, dtype=int)
elif field_type == 'double vector':
return parse_vector(value, dtype=float)
elif field_type == 'int matrix':
return parse_matrix(value, dtype=int)
elif field_type == 'double matrix':
# For matrices of double type, parse as an optional matrix to allow for rows of the matrix to be none
# This is only valid for double matrices because the matrix is represented with NaN in the entire row
# for none rows. NaN is only valid for floating point numbers
return parse_optional_matrix(value)
elif field_type == 'int vector list':
return parse_optional_vector_list(value, dtype=int)
elif field_type == 'double vector list':
return parse_optional_vector_list(value, dtype=float)
else:
raise NRRDError(f'Invalid field type given: {field_type}')
def _determine_datatype(header: NRRDHeader) -> np.dtype:
"""Determine the numpy dtype of the data."""
# Convert the NRRD type string identifier into a NumPy string identifier using a map
np_typestring = _TYPEMAP_NRRD2NUMPY[header['type']]
# This is only added if the datatype has more than one byte and is not using ASCII encoding
# Note: Endian is not required for ASCII encoding
if np.dtype(np_typestring).itemsize > 1 and header['encoding'] not in ['ASCII', 'ascii', 'text', 'txt']:
if 'endian' not in header:
raise NRRDError('Header is missing required field: endian')
elif header['endian'] == 'big':
np_typestring = '>' + np_typestring
elif header['endian'] == 'little':
np_typestring = '<' + np_typestring
else:
raise NRRDError(f'Invalid endian value in header: {header["endian"]}')
return np.dtype(np_typestring)
def _validate_magic_line(line: str) -> int:
"""For NRRD files, the first four characters are always "NRRD", and
remaining characters give information about the file format version
>>> _validate_magic_line('NRRD0005')
8
>>> _validate_magic_line('NRRD0006')
Traceback (most recent call last):
...
NrrdError: NRRD file version too new for this library.
>>> _validate_magic_line('NRRD')
Traceback (most recent call last):
...
NrrdError: Invalid NRRD magic line: NRRD
"""
if not line.startswith('NRRD'):
raise NRRDError('Invalid NRRD magic line. Is this an NRRD file?')
try:
version = int(line[4:])
if version > 5:
raise NRRDError(f'Unsupported NRRD file version (version: {version}). This library only supports v5 '
'and below.')
except ValueError:
raise NRRDError(f'Invalid NRRD magic line: {line}')
return len(line)
[docs]def read_data(header: NRRDHeader, fh: Optional[IO] = None, filename: Optional[str] = None,
index_order: IndexOrder = 'F') -> npt.NDArray:
"""Read data from file into :class:`numpy.ndarray`
The two parameters :obj:`fh` and :obj:`filename` are optional depending on the parameters but it never hurts to
specify both. The file handle (:obj:`fh`) is necessary if the header is attached with the NRRD data. However, if
the NRRD data is detached from the header, then the :obj:`filename` parameter is required to obtain the absolute
path to the data file.
See :ref:`background/how-to-use:reading nrrd files` for more information on reading NRRD files.
Parameters
----------
header : :class:`dict` (:class:`str`, :obj:`Object`)
Parsed fields/values obtained from :meth:`read_header` function
fh : file-object, optional
File object pointing to first byte of data. Only necessary if data is attached to header.
filename : :class:`str`, optional
Filename of the header file. Only necessary if data is detached from the header. This is used to get the
absolute data path.
index_order : {'C', 'F'}, optional
Specifies the index order of the resulting data array. Either 'C' (C-order) where the dimensions are ordered
from slowest-varying to fastest-varying (e.g. (z, y, x)), or 'F' (Fortran-order) where the dimensions are
ordered from fastest-varying to slowest-varying (e.g. (x, y, z)).
Returns
-------
data : :class:`numpy.ndarray`
Data read from NRRD file
See Also
--------
:meth:`read`, :meth:`read_header`
"""
if index_order not in ['F', 'C']:
raise NRRDError('Invalid index order')
# Check that the required fields are in the header
for field in _NRRD_REQUIRED_FIELDS:
if field not in header:
raise NRRDError(f'Header is missing required field: {field}')
if header['dimension'] != len(header['sizes']):
raise NRRDError(f'Number of elements in sizes does not match dimension. Dimension: {header["dimension"]}, '
f'len(sizes): {len(header["sizes"])}')
# Determine the data type from the header
dtype = _determine_datatype(header)
# Determine the byte skip, line skip and the data file
# These all can be written with or without the space according to the NRRD spec, so we check them both
line_skip = header.get('lineskip', header.get('line skip', 0))
byte_skip = header.get('byteskip', header.get('byte skip', 0))
data_filename = header.get('datafile', header.get('data file', None))
# If the data file is separate from the header file, then open the data file to read from that instead
if data_filename is not None:
# If the pathname is relative, then append the current directory from the filename
if not os.path.isabs(data_filename):
if filename is None:
raise NRRDError('Filename parameter must be specified when a relative data file path is given')
data_filename = os.path.join(os.path.dirname(filename), data_filename)
# Override the fh parameter with the data filename
# Note that this is opened without a "with" block, thus it must be closed manually in all circumstances
fh = open(data_filename, 'rb')
# Get the total number of data points by multiplying the size of each dimension together
total_data_points = header['sizes'].prod(dtype=np.int64)
# Skip the number of lines requested when line_skip >= 0
# Irrespective of the NRRD file having attached/detached header
# Lines are skipped before getting to the beginning of the data
if line_skip >= 0:
for _ in range(line_skip):
fh.readline()
else:
# Must close the file because if the file was opened above from detached filename, there is no "with" block to
# close it for us
if data_filename is not None:
fh.close()
raise NRRDError('Invalid lineskip, allowed values are greater than or equal to 0')
# Skip the requested number of bytes or seek backward, and then parse the data using NumPy
if byte_skip < -1:
# Must close the file because if the file was opened above from detached filename, there is no "with" block to
# close it for us
if data_filename is not None:
fh.close()
raise NRRDError('Invalid byteskip, allowed values are greater than or equal to -1')
elif byte_skip >= 0:
fh.seek(byte_skip, os.SEEK_CUR)
elif byte_skip == -1 and header['encoding'] not in ['gzip', 'gz', 'bzip2', 'bz2']:
fh.seek(-dtype.itemsize * total_data_points, os.SEEK_END)
else:
# The only case left should be: byte_skip == -1 and header['encoding'] == 'gzip'
byte_skip = -dtype.itemsize * total_data_points
# If a compression encoding is used, then byte skip AFTER decompressing
if header['encoding'] == 'raw':
if isinstance(fh, io.BytesIO):
raw_data = bytearray(fh.read(total_data_points * dtype.itemsize))
data = np.frombuffer(raw_data, dtype)
else:
data = np.fromfile(fh, dtype)
elif header['encoding'] in ['ASCII', 'ascii', 'text', 'txt']:
if isinstance(fh, io.BytesIO):
data = np.fromstring(fh.read(), dtype, sep=' ')
else:
data = np.fromfile(fh, dtype, sep=' ')
else:
# Handle compressed data now
# Construct the decompression object based on encoding
if header['encoding'] in ['gzip', 'gz']:
decompobj = zlib.decompressobj(zlib.MAX_WBITS | 16)
elif header['encoding'] in ['bzip2', 'bz2']:
decompobj = bz2.BZ2Decompressor()
else:
# Must close the file because if the file was opened above from detached filename, there is no "with" block
# to close it for us
if data_filename is not None:
fh.close()
raise NRRDError(f'Unsupported encoding: {header["encoding"]}')
# Loop through the file and read a chunk at a time (see _READ_CHUNKSIZE why it is read in chunks)
decompressed_data = bytearray()
# Read all the remaining data from the file
# Obtain the length of the compressed data since we will be using it repeatedly, more efficient
compressed_data = fh.read()
compressed_data_len = len(compressed_data)
start_index = 0
# Loop through data and decompress it chunk by chunk
while start_index < compressed_data_len:
# Calculate the end index = start index plus chunk size
# Set to the string length to read the remaining chunk at the end
end_index = min(start_index + _READ_CHUNKSIZE, compressed_data_len)
# Decompress and append data
decompressed_data += decompobj.decompress(compressed_data[start_index:end_index])
# Update start index
start_index = end_index
# Delete the compressed data since we do not need it anymore
# This could potentially be using a lot of memory
del compressed_data
# Byte skip is applied AFTER the decompression. Skip first x bytes of the decompressed data and parse it using
# NumPy
data = np.frombuffer(decompressed_data[byte_skip:], dtype)
# Close the file if we opened it
if data_filename is not None:
fh.close()
if total_data_points != data.size:
raise NRRDError(f'Size of the data does not equal the product of all the dimensions: '
f'{total_data_points}-{data.size}={total_data_points - data.size}')
# In the NRRD header, the fields are specified in Fortran order, i.e, the first index is the one that changes
# fastest and last index changes slowest. This needs to be taken into consideration since numpy uses C-order
# indexing.
# The array shape from NRRD (x,y,z) needs to be reversed as numpy expects (z,y,x).
data = np.reshape(data, tuple(header['sizes'][::-1]))
# Transpose data to enable Fortran indexing if requested.
if index_order == 'F':
data = data.T
return data
[docs]def read(filename: str, custom_field_map: Optional[NRRDFieldMap] = None, index_order: IndexOrder = 'F') \
-> Tuple[npt.NDArray, NRRDHeader]:
"""Read a NRRD file and return the header and data
See :ref:`background/how-to-use:reading nrrd files` for more information on reading NRRD files.
.. note::
Users should be aware that the `index_order` argument needs to be consistent between `nrrd.read` and
`nrrd.write`. I.e., reading an array with `index_order='F'` will result in a transposed version of the
original data and hence the writer needs to be aware of this.
Parameters
----------
filename : :class:`str`
Filename of the NRRD file
custom_field_map : :class:`dict` (:class:`str`, :class:`str`), optional
Dictionary used for parsing custom field types where the key is the custom field name and the value is a
string identifying datatype for the custom field.
index_order : {'C', 'F'}, optional
Specifies the index order of the resulting data array. Either 'C' (C-order) where the dimensions are ordered
from slowest-varying to fastest-varying (e.g. (z, y, x)), or 'F' (Fortran-order) where the dimensions are
ordered from fastest-varying to slowest-varying (e.g. (x, y, z)).
Returns
-------
data : :class:`numpy.ndarray`
Data read from NRRD file
header : :class:`dict` (:class:`str`, :obj:`Object`)
Dictionary containing the header fields and their corresponding parsed value
See Also
--------
:meth:`write`, :meth:`read_header`, :meth:`read_data`
"""
with open(filename, 'rb') as fh:
header = read_header(fh, custom_field_map)
data = read_data(header, fh, filename, index_order)
return data, header