import math
import re
from copy import deepcopy
from typing import Any
from typing import Tuple
import pint
import pytest
from pydantic import ValidationError
from ..models import IcatDatasetParameters
from ..models._base.custom_types.metadata.quantity import REGISTRY
from . import compare
from .assert_raises import raises_validationerror
from .validation_error import build_validation_error_dict
from .validation_error import dimensionality_error
from .validation_error import extra_forbidden_error
from .validation_error import invalid_type_error
from .validation_error import min_length_error
from .validation_error import missing_error
from .validation_error import pattern_mismatch_error
[docs]
def test_entry(input_dict, python_dict, json_dict, nexus_dict, icat_dict):
    """Round-trip the five dictionaries exactly as they are passed in."""
    _assert_round_trip(
        input_dict=input_dict,
        python_dict=python_dict,
        json_dict=json_dict,
        nexus_dict=nexus_dict,
        icat_dict=icat_dict,
    )
[docs]
def test_value_with_units(input_dict, python_dict, json_dict, nexus_dict, icat_dict):
    """Round-trip ``PTYCHO.beamSize`` given as a ``(magnitude, unit)`` pair.

    The input is 0.001 mm; the expected Python value is a 1 um quantity and
    the expected JSON, NeXus and ICAT values carry the magnitude 1.0.
    """
    expected_magnitude = 1.0

    input_dict["PTYCHO"] = {"beamSize": (0.001, "mm")}
    python_dict["PTYCHO"] = {"beamSize": REGISTRY.Quantity(1, "um")}
    json_dict["PTYCHO"] = {"beamSize": expected_magnitude}

    nexus_ptycho = {"@NX_class": "NXsubentry"}
    nexus_ptycho["beamSize"] = expected_magnitude
    nexus_ptycho["beamSize@units"] = "µm"
    nexus_dict["PTYCHO"] = nexus_ptycho

    # The ICAT representation is expected to be the stringified magnitude.
    icat_dict["PTYCHO_beamSize"] = "1.0"

    _assert_round_trip(input_dict, python_dict, json_dict, nexus_dict, icat_dict)
[docs]
def test_wrong_value(input_dict):
    """Expect the ``extra_forbidden_error`` for an unknown key under ``PTYCHO``."""
    unknown_value = 42
    input_dict["PTYCHO"] = {"non_existing": unknown_value}

    expected_errors = extra_forbidden_error(
        input_value=unknown_value, field="non_existing", model_name="PTYCHO"
    )
    with raises_validationerror(expected_errors):
        IcatDatasetParameters(**input_dict)
[docs]
def test_json_schema():
    """Compare the JSON schema generated for ``IcatPtycho.beamSize`` with a literal."""
    # Schema of the ``[magnitude, unit_string]`` alternative.
    quantity_pair_schema = {
        "description": "Physical quantity represented as a [magnitude, "
        "unit_string] pair. The value will be converted to "
        "'µm' during validation.",
        "items": [
            {"description": "Magnitude of the quantity", "type": "number"},
            {
                "description": "Unit symbol (e.g., 'mm', 'µm', 'kg')",
                "type": "string",
            },
        ],
        "maxItems": 2,
        "minItems": 2,
        "type": "array",
    }
    expected = {
        "anyOf": [quantity_pair_schema, {"type": "null"}],
        "default": None,
        "description": "Beam size on the sample in microns",
        "migrate": False,
        "migration_metadata": [],
        "record": "final",
        "include_nexus_url": False,
        "rename_to": None,
        "title": "Beamsize",
    }

    full_schema = IcatDatasetParameters.model_json_schema()
    ptycho_properties = full_schema["$defs"]["IcatPtycho"]["properties"]
    assert ptycho_properties["beamSize"] == expected
[docs]
@pytest.mark.parametrize(
    "input_value, expected",
    [
        pytest.param(
            ["Alice", "Bob", "John"], ["Alice", "Bob", "John"], id="multi_items"
        ),
        pytest.param(["Alice"], ["Alice"], id="single_item"),
        pytest.param([], [], id="empty_list"),
        pytest.param("", [], id="empty_string"),
        pytest.param("Alice", ["Alice"], id="single_string"),
        pytest.param("Alice, Bob, John", ["Alice", "Bob", "John"], id="multi_string"),
        pytest.param(
            " Alice , Bob , John ",
            ["Alice", "Bob", "John"],
            id="multi_string_spaces",
        ),
    ],
)
def test_comma_separated_list(
    input_value, expected, input_dict, python_dict, json_dict, nexus_dict, icat_dict
):
    """Validate and round-trip ``external_references.datacollector.endnote``.

    The field is fed either a list or a comma-separated string; the model is
    expected to expose ``expected`` and the ICAT form to be its comma join.
    """

    def datacollector(endnote):
        # Build a fresh nested mapping for each representation.
        return {"datacollector": {"endnote": endnote}}

    input_dict["external_references"] = datacollector(input_value)
    python_dict["external_references"] = datacollector(expected)
    json_dict["external_references"] = datacollector(expected)
    nexus_dict["external_references"] = {
        "@NX_class": "NXnote",
        "datacollector": {"@NX_class": "NXcite", "endnote": expected},
    }
    icat_dict["ExternalReferencesDatacollector_endnote"] = ",".join(expected)

    validated = IcatDatasetParameters(**input_dict)
    assert validated.external_references.datacollector.endnote == expected

    _assert_round_trip(input_dict, python_dict, json_dict, nexus_dict, icat_dict)
[docs]
def test_comma_separated_list_invalid_type(input_dict):
    """Expect ``invalid_type_error`` when ``endnote`` is given a ``bytearray``."""
    location = ("external_references", "datacollector", "endnote")
    input_dict["external_references"] = {
        "datacollector": {"endnote": bytearray(b"invalid")}
    }

    expected_errors = invalid_type_error(bytearray(b"invalid"), location)
    with raises_validationerror(expected_errors):
        IcatDatasetParameters(**input_dict)
[docs]
def test_comma_separated_list_invalid_separator(input_dict):
    """Expect a validation error when a list element itself contains a comma."""
    location = ("external_references", "datacollector", "endnote")
    input_dict["external_references"] = {
        "datacollector": {"endnote": ["Alice", "Bob,John"]}
    }

    separator_error = ValueError(
        "List element contains separator: list=Bob,John separator=,"
    )
    expected_errors = build_validation_error_dict(
        ["Alice", "Bob,John"], separator_error, location
    )
    with raises_validationerror(expected_errors):
        IcatDatasetParameters(**input_dict)
[docs]
@pytest.mark.parametrize(
    "input_value, expected",
    [
        pytest.param(["name1"], ["name1"], id="single_item"),
        pytest.param([], [], id="empty_list"),
        pytest.param("", [], id="empty_string"),
        pytest.param("name1", ["name1"], id="single_string"),
        pytest.param(
            "name1 name2 name3", ["name1", "name2", "name3"], id="multi_string"
        ),
        pytest.param(
            " name1 name2 name3 ",
            ["name1", "name2", "name3"],
            id="multi_string_spaces",
        ),
        pytest.param(["1", "2.0"], ["1", "2.0"], id="multi_item"),
    ],
)
def test_space_separated_list(
    input_value, expected, input_dict, python_dict, json_dict, nexus_dict, icat_dict
):
    """Validate and round-trip ``instrument.variables.name``.

    The field is fed either a list or a space-separated string; the model is
    expected to expose ``expected`` and the ICAT form to be its space join.
    """

    def variables(names):
        # Build a fresh nested mapping for each representation.
        return {"variables": {"name": names}}

    input_dict["instrument"] = variables(input_value)
    python_dict["instrument"] = variables(expected)
    json_dict["instrument"] = variables(expected)
    nexus_dict["instrument"] = {
        "@NX_class": "NXinstrument",
        "variables": {"@NX_class": "NXcollection", "name": expected},
    }
    icat_dict["InstrumentVariables_name"] = " ".join(expected)

    validated = IcatDatasetParameters(**input_dict)
    assert validated.instrument.variables.name == expected

    _assert_round_trip(input_dict, python_dict, json_dict, nexus_dict, icat_dict)
[docs]
@pytest.mark.parametrize(
    "input_value, expected",
    [
        pytest.param([1.0], [1.0], id="single_item"),
        pytest.param([1.1, 2.2, 3.3], [1.1, 2.2, 3.3], id="multi_items"),
        pytest.param(1, [1.0], id="number"),
        pytest.param([], [], id="empty_list"),
        pytest.param("1.0 2.0 3.0 ", [1.0, 2.0, 3.0], id="multi_string"),
    ],
)
def test_space_separated_list_float(
    input_value, expected, input_dict, python_dict, json_dict, nexus_dict, icat_dict
):
    """Validate and round-trip ``instrument.beam.incident_wavelength_weights``.

    Inputs are a list, a bare number or a space-separated string; the model is
    expected to expose the list of floats in ``expected``.
    """

    def beam(weights):
        # Build a fresh nested mapping for each representation.
        return {"beam": {"incident_wavelength_weights": weights}}

    input_dict["instrument"] = beam(input_value)
    python_dict["instrument"] = beam(expected)
    json_dict["instrument"] = beam(expected)
    nexus_dict["instrument"] = {
        "@NX_class": "NXinstrument",
        "beam": {"@NX_class": "NXbeam", "incident_wavelength_weights": expected},
    }
    icat_dict["InstrumentBeam_incident_wavelength_weights"] = " ".join(
        str(weight) for weight in expected
    )

    validated = IcatDatasetParameters(**input_dict)
    assert validated.instrument.beam.incident_wavelength_weights == expected

    _assert_round_trip(input_dict, python_dict, json_dict, nexus_dict, icat_dict)
[docs]
def test_space_separated_list_invalid_type(input_dict):
    """Expect ``invalid_type_error`` when ``variables.name`` is a ``bytearray``."""
    location = ("instrument", "variables", "name")
    input_dict["instrument"] = {"variables": {"name": bytearray(b"invalid")}}

    expected_errors = invalid_type_error(bytearray(b"invalid"), location)
    with raises_validationerror(expected_errors):
        IcatDatasetParameters(**input_dict)
[docs]
def test_space_separated_list_invalid_separator(input_dict):
    """Expect a validation error when a list element itself contains a space."""
    location = ("instrument", "variables", "name")
    input_dict["instrument"] = {"variables": {"name": ["name1 name2", "name3"]}}

    # The expected message ends with the separator itself (a single space).
    separator_error = ValueError(
        "List element contains separator: list=name1 name2 separator= "
    )
    expected_errors = build_validation_error_dict(
        ["name1 name2", "name3"], separator_error, location
    )
    with raises_validationerror(expected_errors):
        IcatDatasetParameters(**input_dict)
[docs]
@pytest.mark.parametrize(
    "input_value, expected",
    [
        pytest.param(
            [["Smith", "Alice", "0000-0000-0000-0000"], ["Doe", "John"]],
            [["Smith", "Alice", "0000-0000-0000-0000"], ["Doe", "John"]],
            id="multi_items",
        ),
        pytest.param([], [], id="empty_list"),
        pytest.param("", [], id="empty_string"),
        pytest.param(
            "Smith, Alice, 0000-0000-0000-0000",
            [["Smith", "Alice", "0000-0000-0000-0000"]],
            id="single_string",
        ),
        pytest.param(
            "Smith, Alice, 0000-0000-0000-0000; Doe, John",
            [["Smith", "Alice", "0000-0000-0000-0000"], ["Doe", "John"]],
            id="multi_string",
        ),
        pytest.param(
            " Smith, Alice, 0000-0000-0000-0000 ; Doe, John ",
            [["Smith", "Alice", "0000-0000-0000-0000"], ["Doe", "John"]],
            id="multi_string_spaces",
        ),
    ],
)
def test_doi_users_list(
    input_value, expected, input_dict, python_dict, json_dict, nexus_dict, icat_dict
):
    """Validate and round-trip ``doi_users``.

    Inputs are a list of lists or a string using ``;`` between users and ``,``
    inside a user; the ICAT form is expected to be the matching nested join.
    """
    input_dict["doi_users"] = input_value
    for target in (python_dict, json_dict, nexus_dict):
        target["doi_users"] = expected
    icat_dict["DOI_users"] = ";".join(",".join(user) for user in expected)

    validated = IcatDatasetParameters(**input_dict)
    assert validated.doi_users == expected

    _assert_round_trip(input_dict, python_dict, json_dict, nexus_dict, icat_dict)
[docs]
def test_doi_users_list_invalid_type(input_dict):
    """Expect a ``ValueError`` chained from a ``TypeError`` for a ``bytearray``."""
    input_dict["doi_users"] = bytearray(b"invalid")

    # The expected error is a ValueError with the same message as its
    # TypeError cause.
    type_error = TypeError("Invalid type: <class 'bytearray'>")
    value_error = ValueError(str(type_error))
    value_error.__cause__ = type_error

    expected_errors = build_validation_error_dict(
        bytearray(b"invalid"), value_error, ("doi_users",)
    )
    with raises_validationerror(expected_errors):
        IcatDatasetParameters(**input_dict)
[docs]
def test_doi_users_list_invalid_separator(input_dict):
    """Expect a validation error when a ``doi_users`` element contains ``;``."""

    def wrapped_users():
        # NOTE(review): the value is a 1-tuple wrapping the list of users
        # (trailing comma). The expected error message below matches that
        # shape - confirm the wrapper is intentional and not a stray comma.
        return ([["Smith", "Alice", "0000-0000-0000-0000"], ["Doe;Smith", "John"]],)

    input_dict["doi_users"] = wrapped_users()

    separator_error = ValueError(
        "List element contains separator: list=['Doe;Smith' separator=;"
    )
    expected_errors = build_validation_error_dict(
        wrapped_users(), separator_error, ("doi_users",)
    )
    with raises_validationerror(expected_errors):
        IcatDatasetParameters(**input_dict)
[docs]
@pytest.mark.parametrize(
    "input_value, expected",
    [
        pytest.param([(1, 2), (3, 4)], [(1, 2), (3, 4)], id="multi_items"),
        pytest.param([], [], id="empty_list"),
        pytest.param("", [], id="empty_string"),
        pytest.param("1,2", [(1, 2)], id="single_string"),
        pytest.param("1,2 3,4", [(1, 2), (3, 4)], id="multi_string"),
        pytest.param(" 1,2 3,4 ", [(1, 2), (3, 4)], id="multi_string_spaces"),
    ],
)
def test_roi_list(
    input_value, expected, input_dict, python_dict, json_dict, nexus_dict, icat_dict
):
    """Validate and round-trip ``instrument.detector01.rois.value``.

    Inputs are a list of tuples or a string with spaces between ROIs and
    commas inside a ROI; the JSON form is expected to use lists, not tuples.
    """

    def detector(rois):
        # Build a fresh nested mapping for each representation.
        return {"detector01": {"rois": {"value": rois}}}

    input_dict["instrument"] = detector(input_value)
    python_dict["instrument"] = detector(expected)
    json_dict["instrument"] = detector([list(roi) for roi in expected])
    nexus_dict["instrument"] = {
        "@NX_class": "NXinstrument",
        "detector01": {
            "@NX_class": "NXdetector",
            "rois": {"@NX_class": "NXcollection", "value": expected},
        },
    }
    icat_dict["InstrumentDetector01Rois_value"] = " ".join(
        ",".join(str(bound) for bound in roi) for roi in expected
    )

    validated = IcatDatasetParameters(**input_dict)
    assert validated.instrument.detector01.rois.value == expected

    _assert_round_trip(input_dict, python_dict, json_dict, nexus_dict, icat_dict)
[docs]
def test_roi_list_invalid_type(input_dict):
    """Expect ``invalid_type_error`` when ``rois.value`` is a ``bytearray``."""
    location = ("instrument", "detector01", "rois", "value")
    input_dict["instrument"] = {
        "detector01": {"rois": {"value": bytearray(b"invalid")}}
    }

    expected_errors = invalid_type_error(bytearray(b"invalid"), location)
    with raises_validationerror(expected_errors):
        IcatDatasetParameters(**input_dict)
[docs]
@pytest.mark.parametrize(
    "input_value, expected",
    [
        pytest.param(
            ([10, 20, 30], "keV"),
            REGISTRY.Quantity([10.0, 20.0, 30.0], "keV"),
            id="same_unit",
        ),
        pytest.param(
            ([0.1, 0.2], "MeV"),
            REGISTRY.Quantity([100.0, 200.0], "keV"),
            id="unit_conversion",
        ),
        pytest.param(([], "MeV"), REGISTRY.Quantity([], "keV"), id="empty"),
        pytest.param(10, REGISTRY.Quantity([10.0], "keV"), id="scalar"),
        pytest.param("10", REGISTRY.Quantity([10.0], "keV"), id="scalar_string"),
        pytest.param(
            ["10", "20"], REGISTRY.Quantity([10.0, 20.0], "keV"), id="strings"
        ),
    ],
)
def test_incident_energy_array(
    input_value,
    expected: pint.Quantity,
    input_dict,
    python_dict,
    json_dict,
    nexus_dict,
    icat_dict,
):
    """Validate and round-trip ``instrument.beam.incident_energy``.

    Inputs are ``(values, unit)`` pairs, scalars or strings; the model is
    expected to hold a quantity whose magnitudes and units match ``expected``.
    """
    magnitudes = expected.magnitude

    input_dict["instrument"] = {"beam": {"incident_energy": input_value}}
    python_dict["instrument"] = {"beam": {"incident_energy": expected}}
    json_dict["instrument"] = {"beam": {"incident_energy": magnitudes.tolist()}}
    nexus_dict["instrument"] = {
        "@NX_class": "NXinstrument",
        "beam": {
            "@NX_class": "NXbeam",
            "incident_energy": magnitudes,
            "incident_energy@units": "keV",
        },
    }
    icat_dict["InstrumentBeam_incident_energy"] = " ".join(
        str(value) for value in magnitudes
    )

    validated = IcatDatasetParameters(**input_dict)
    energy = validated.instrument.beam.incident_energy
    assert energy is not None
    # Compare as plain lists so the check yields a single boolean.
    assert energy.magnitude.tolist() == expected.magnitude.tolist()
    assert energy.units == expected.units

    _assert_round_trip(input_dict, python_dict, json_dict, nexus_dict, icat_dict)
def _assert_round_trip(
    input_dict: dict,
    python_dict: dict,
    json_dict: dict,
    nexus_dict: dict,
    icat_dict: dict,
):
    """Assert that a model converts to and from each representation.

    A model is built from ``input_dict``. Its Python dump, JSON dump, NeXus
    dict and ICAT dict are compared with the corresponding expected
    dictionaries, and a model rebuilt from each expected dictionary is
    compared with the original model. Finally the five arguments are compared
    with deep copies taken on entry to detect in-place modification.
    """
    given = (input_dict, python_dict, json_dict, nexus_dict, icat_dict)
    snapshots = [deepcopy(mapping) for mapping in given]

    model = IcatDatasetParameters(**input_dict)

    # Serialization: model -> each representation.
    python_dump = model.model_dump(mode="python", exclude_unset=True, by_alias=True)
    compare.assert_equal_serialize_models(python_dump, python_dict)

    json_dump = model.model_dump(mode="json", exclude_unset=True, by_alias=True)
    compare.assert_equal_serialize_models(json_dump, json_dict)

    nexus_dump = model.to_nexus_dict()
    compare.assert_equal_serialize_models(nexus_dump, nexus_dict)

    icat_dump = model.to_icat_dict()
    compare.assert_equal_serialize_models(icat_dump, icat_dict)

    # Validation: each representation -> model.
    from_python = IcatDatasetParameters(**python_dict)
    compare.assert_equal_pydantic_model(from_python, model)

    from_json = IcatDatasetParameters(**json_dict)
    compare.assert_equal_pydantic_model(from_json, model)

    from_nexus = IcatDatasetParameters.from_nexus_dict(nexus_dict)
    compare.assert_equal_pydantic_model(from_nexus, model)

    from_icat = IcatDatasetParameters.from_icat_dict(icat_dict)
    compare.assert_equal_pydantic_model(from_icat, model)

    # None of the arguments may have been modified in-place.
    for snapshot, current in zip(snapshots, given):
        compare.assert_equal_serialize_models(snapshot, current)
[docs]
@pytest.mark.parametrize(
    "old_field, new_field, old_value, expected_value, expected_msg",
    [
        pytest.param(
            ("HOLO", "pixelSize"),
            ("HOLO", "pixel_size"),
            5.0,
            (5000.0, "nm"),
            re.escape(
                "HOLO.pixelSize (HOLO_pixelSize) is deprecated. "
                "Use HOLO.pixel_size (HOLO_pixel_size) instead."
            ),
            id="pixelSize",
        ),
        pytest.param(
            ("HOLO", "sampleDetectorDistances"),
            ("HOLO", "sample_detector_distances"),
            "100.0",
            (100.0, "mm"),
            re.escape(
                "HOLO.sampleDetectorDistances (HOLO_holoSampleDetectorDistances) is deprecated. "
                "Use HOLO.sample_detector_distances (HOLO_sample_detector_distances) instead."
            ),
            id="sampleDetectorDistances",
        ),
        pytest.param(
            ("HOLO", "sourceSampleDistances"),
            ("HOLO", "source_sample_distances"),
            "200.0",
            (200.0, "mm"),
            re.escape(
                "HOLO.sourceSampleDistances (HOLO_holoSourceSampleDistances) is deprecated. "
                "Use HOLO.source_sample_distances (HOLO_source_sample_distances) instead."
            ),
            id="sourceSampleDistances",
        ),
        pytest.param(
            ("sample", "paleo", "scientific_domain"),
            ("sample", "subject"),
            "protein crystals",
            "protein crystals",
            re.escape(
                "sample.paleo.scientific_domain (SamplePaleo_scientific_domain) is deprecated. "
                "Use sample.subject (Sample_subject) instead."
            ),
            id="sample.paleo.scientific_domain->sample.subject",
        ),
    ],
)
def test_rename_deprecated_fields(
    old_field: Tuple[str, ...],
    new_field: Tuple[str, ...],
    old_value: Any,
    expected_value: Any,
    expected_msg: str,
    ignore_pydantic_deprecation_warnings,
    input_dict,
):
    """Check that a value given under a deprecated field lands in the new one.

    ``old_value`` is stored at the ``old_field`` path of ``input_dict``. After
    validation the ``new_field`` path must hold ``expected_value``, while
    reading the ``old_field`` path must emit a ``DeprecationWarning`` matching
    ``expected_msg`` and yield ``None``.
    """
    # Store the deprecated value at its nested location, creating
    # intermediate mappings where they are missing.
    container = input_dict
    for key in old_field[:-1]:
        container = container.setdefault(key, {})
    container[old_field[-1]] = old_value

    model = IcatDatasetParameters(**input_dict)

    def resolve(location):
        # Follow the attribute path starting at the validated model.
        node = model
        for attribute in location:
            node = getattr(node, attribute)
        return node

    new_field_value = resolve(new_field)
    with pytest.warns(DeprecationWarning, match=expected_msg):
        old_field_value = resolve(old_field)

    assert old_field_value is None
    assert new_field_value is not None

    if not isinstance(new_field_value, pint.Quantity):
        assert new_field_value == expected_value
        return

    # Quantities are described by a (magnitude, unit string) pair.
    assert new_field_value.magnitude == pytest.approx(expected_value[0])
    assert str(new_field_value.units) == expected_value[1]
[docs]
def test_icat_fields_non_renamed():
    """Check ``icat_fields_non_renamed`` against ``icat_fields``.

    Every value it returns has to appear among the values of ``icat_fields``
    and must have a falsy ``is_renamed``.
    """
    every_field = IcatDatasetParameters.icat_fields()
    kept_fields = IcatDatasetParameters.icat_fields_non_renamed()

    # First pass: membership in the complete field mapping.
    for field_info in kept_fields.values():
        assert field_info in every_field.values()

    # Second pass: none of them is flagged as renamed.
    for field_info in kept_fields.values():
        assert not field_info.is_renamed
[docs]
@pytest.mark.parametrize(
    "input_value, output_value",
    [
        # literal
        pytest.param(True, True, id="true"),
        pytest.param(False, False, id="false"),
        # JSON booleans
        pytest.param("true", True, id="'true'"),
        pytest.param("false", False, id="'false'"),
        # numeric forms
        pytest.param(1, True, id="1"),
        pytest.param(0, False, id="0"),
        pytest.param("1", True, id="'1'"),
        pytest.param("0", False, id="'0'"),
        # textual synonyms
        pytest.param("yes", True, id="'yes'"),
        pytest.param("no", False, id="'no'"),
        # case variations
        pytest.param("True", True, id="'True'"),
        pytest.param("False", False, id="'False'"),
        pytest.param("TRUE", True, id="'TRUE'"),
        pytest.param("FALSE", False, id="'FALSE'"),
        pytest.param("YES", True, id="'YES'"),
        pytest.param("NO", False, id="'NO'"),
    ],
)
def test_booleans(input_value, output_value, input_dict):
    """Check parsing of ``complete`` and its ICAT string form.

    The validated attribute must be exactly ``output_value`` and the ICAT
    dictionary must hold ``"true"`` or ``"false"`` accordingly.
    """
    input_dict["complete"] = input_value
    model = IcatDatasetParameters(**input_dict)

    # Identity check: the attribute must be a real bool, not just truthy.
    assert model.complete is output_value

    if output_value:
        expected_text = "true"
    else:
        expected_text = "false"
    assert model.to_icat_dict()["complete"] == expected_text
[docs]
@pytest.mark.parametrize(
    "input_value, expected",
    [
        pytest.param(["Keyword1"], ["Keyword1"], id="single_item"),
        pytest.param([], [], id="empty_list"),
        pytest.param("", [], id="empty_string"),
        pytest.param("Keyword1", ["Keyword1"], id="single_string"),
        pytest.param(
            "Keyword1;Keyword2;Keyword3",
            ["Keyword1", "Keyword2", "Keyword3"],
            id="multi_string",
        ),
    ],
)
def test_semicolon_separated_list(
    input_value, expected, input_dict, python_dict, json_dict, nexus_dict, icat_dict
):
    """Validate and round-trip ``doi_keywords``.

    The field is fed either a list or a semicolon-separated string; the model
    is expected to expose ``expected`` and the ICAT form to be its ``;`` join.
    """
    input_dict["doi_keywords"] = input_value
    for target in (python_dict, json_dict, nexus_dict):
        target["doi_keywords"] = expected
    icat_dict["DOI_keywords"] = ";".join(expected)

    validated = IcatDatasetParameters(**input_dict)
    assert validated.doi_keywords == expected

    _assert_round_trip(input_dict, python_dict, json_dict, nexus_dict, icat_dict)
[docs]
def test_semicolon_separated_list_invalid_type(input_dict):
    """Expect ``invalid_type_error`` when ``doi_keywords`` is a ``bytearray``."""
    input_dict["doi_keywords"] = bytearray(b"invalid")

    expected_errors = invalid_type_error(bytearray(b"invalid"), ("doi_keywords",))
    with raises_validationerror(expected_errors):
        IcatDatasetParameters(**input_dict)
[docs]
def test_semicolon_separated_list_invalid_separator(input_dict):
    """Expect a validation error when a keyword itself contains a semicolon."""
    input_dict["doi_keywords"] = ["Keyword1;Keyword2", "Keyword3"]

    separator_error = ValueError(
        "List element contains separator: list=Keyword1;Keyword2 separator=;"
    )
    expected_errors = build_validation_error_dict(
        ["Keyword1;Keyword2", "Keyword3"], separator_error, ("doi_keywords",)
    )
    with raises_validationerror(expected_errors):
        IcatDatasetParameters(**input_dict)
[docs]
@pytest.mark.parametrize(
    "field_name, min_length",
    [
        ("doi_title", 40),
        ("doi_abstract", 400),
    ],
)
def test_min_length(field_name, min_length, input_dict):
    """Check the minimum string length of ``field_name`` on both sides.

    A value one character short must raise ``min_length_error``; a value of
    exactly ``min_length`` characters must be accepted unchanged.
    """
    too_short = "a" * (min_length - 1)
    input_dict[field_name] = too_short
    expected_errors = min_length_error(too_short, min_length, (field_name,))
    with raises_validationerror(expected_errors):
        IcatDatasetParameters(**input_dict)

    long_enough = "a" * min_length
    input_dict[field_name] = long_enough
    model = IcatDatasetParameters(**input_dict)
    assert getattr(model, field_name) == long_enough
[docs]
def test_doi_pattern(input_dict):
    """Check pattern validation of ``doi_citation_publication_doi``.

    ``"invalid-doi"`` must raise ``pattern_mismatch_error`` for the pattern
    below; ``"10.1234/abc123"`` must be accepted unchanged.
    """
    field = "doi_citation_publication_doi"
    pattern = r"^10\.\d{4,9}/[\w()\-\./:;]+$"

    rejected = "invalid-doi"
    input_dict[field] = rejected
    expected_errors = pattern_mismatch_error(rejected, pattern, (field,))
    with raises_validationerror(expected_errors):
        IcatDatasetParameters(**input_dict)

    accepted = "10.1234/abc123"
    input_dict[field] = accepted
    model = IcatDatasetParameters(**input_dict)
    assert model.doi_citation_publication_doi == accepted
[docs]
def test_icat_field_names_with_validation_error(input_dict):
    """Check the ICAT field names reported for a failed validation.

    The input replaces ``sample`` with a mapping that has no ``name`` and
    gives ``sample.patient.weight`` and ``instrument.beam.distance`` units of
    the wrong dimensionality. The names extracted from the resulting
    validation error are compared with the expected list, in order.
    """
    input_dict["instrument"] = {
        "beam": {"incident_energy": (7.1, "keV"), "distance": (5, "s")}
    }
    input_dict["sample"] = {"patient": {"weight": (70, "m"), "size": (197, "cm")}}

    missing_name = missing_error(
        {"patient": {"size": (197, "cm"), "weight": (70, "m")}}, ("sample", "name")
    )
    wrong_weight = dimensionality_error(
        (70, "m"),
        "meter",
        "kilogram",
        "[length]",
        "[mass]",
        pydantic_location=("sample", "patient", "weight"),
    )
    wrong_distance = dimensionality_error(
        (5, "s"),
        "second",
        "meter",
        "[time]",
        "[length]",
        pydantic_location=("instrument", "beam", "distance"),
    )
    errors = missing_name + wrong_weight + wrong_distance

    with raises_validationerror(errors) as exc_info:
        IcatDatasetParameters(**input_dict)

    reported = IcatDatasetParameters.icat_field_names_with_validation_error(
        exc_info.value
    )
    assert reported == [
        "Sample_name",
        "SamplePatient_weight",
        "InstrumentBeam_distance",
    ]
[docs]
def test_motor_position_list(input_dict, python_dict):
    """Check parsing of a space-separated motor position string.

    ``instrument.primary_slit.vertical_gap`` is given the string
    ``"1 2.5 ERR None 3e3"``. The parsed values are expected to be floats with
    NaN in place of ``"ERR"`` and ``"None"``, and the ICAT string is expected
    to be ``"1.0 2.5 nan nan 3000.0"``.
    """
    motor_values = ["1", "2.5", "ERR", "None", "3e3"]
    expected_floats = [1.0, 2.5, float("nan"), float("nan"), 3000.0]
    expected_icat = "1.0 2.5 nan nan 3000.0"

    input_dict["instrument"] = {
        "primary_slit": {"vertical_gap": " ".join(motor_values)}
    }
    python_dict["instrument"] = {"primary_slit": {"vertical_gap": expected_floats}}

    model = IcatDatasetParameters(**input_dict)
    actual_floats = list(model.instrument.primary_slit.vertical_gap)

    # zip() stops at the shorter sequence, so a parsed list of the wrong
    # length would otherwise slip through the element-wise comparison.
    assert len(actual_floats) == len(expected_floats)

    for actual, expected in zip(actual_floats, expected_floats):
        # NaN never compares equal to itself, hence the explicit isnan check.
        if isinstance(expected, float) and math.isnan(expected):
            assert math.isnan(actual)
        else:
            assert actual == expected

    icat_str = model.to_icat_dict()["InstrumentSlitPrimary_vertical_gap"]
    assert icat_str == expected_icat
[docs]
def test_motor_position_invalid_str(input_dict):
    """Expect a ``ValueError`` naming the motor position ``Unknown``."""
    raw_positions = " ".join(["1", "Unknown"])
    input_dict["instrument"] = {"primary_slit": {"vertical_gap": raw_positions}}

    expected_message = "Invalid motor position: Unknown"
    with pytest.raises(ValueError, match=expected_message):
        IcatDatasetParameters(**input_dict)