Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
229 changes: 229 additions & 0 deletions tests/test_trajectory.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import os
import sys
import tempfile
from collections.abc import Callable, Generator
from pathlib import Path

Expand Down Expand Up @@ -834,3 +835,231 @@ def test_write_ase_trajectory_importerror(
with pytest.raises(ImportError, match="ASE is required to convert to ASE trajectory"):
traj.write_ase_trajectory(tmp_path / "dummy.traj")
traj.close()


def test_optimize_append_to_trajectory(
si_double_sim_state: SimState, lj_model: LennardJonesModel
) -> None:
"""Test appending to an existing trajectory when running ts.optimize."""

# Create a temporary trajectory file
with tempfile.TemporaryDirectory() as temp_dir:
traj_files = [f"{temp_dir}/optimize_trajectory_{idx}.h5" for idx in range(2)]

# Initialize model and state
trajectory_reporter = ts.TrajectoryReporter(
traj_files,
state_frequency=1,
)

# First optimization run
opt_state = ts.optimize(
system=si_double_sim_state,
model=lj_model,
max_steps=5,
optimizer=ts.Optimizer.fire,
trajectory_reporter=trajectory_reporter,
steps_between_swaps=100,
)

for traj in trajectory_reporter.trajectories:
with TorchSimTrajectory(traj.filename, mode="r") as traj:
# Check that the trajectory file has 5 frames
np.testing.assert_allclose(traj.get_steps("positions"), range(1, 6))

trajectory_reporter_2 = ts.TrajectoryReporter(
traj_files, state_frequency=1, trajectory_kwargs=dict(mode="a")
)
_ = ts.optimize(
system=opt_state,
model=lj_model,
max_steps=7,
optimizer=ts.Optimizer.fire,
trajectory_reporter=trajectory_reporter_2,
steps_between_swaps=100,
)
for traj in trajectory_reporter_2.trajectories:
with TorchSimTrajectory(traj.filename, mode="r") as traj:
# Check that the trajectory file now has 7 frames
np.testing.assert_allclose(traj.get_steps("positions"), range(1, 8))


def test_integrate_append_to_trajectory(
si_double_sim_state: SimState, lj_model: LennardJonesModel
) -> None:
"""Test appending to an existing trajectory when running ts.integrate."""

# Create a temporary trajectory file
with tempfile.TemporaryDirectory() as temp_dir:
traj_files = [f"{temp_dir}/integrate_trajectory_{idx}.h5" for idx in range(2)]

# Initialize model and state
trajectory_reporter = ts.TrajectoryReporter(
traj_files,
state_frequency=1,
)

# First integration run
int_state = ts.integrate(
system=si_double_sim_state,
model=lj_model,
timestep=0.001,
n_steps=5,
temperature=300.0,
integrator=ts.Integrator.nvt_langevin,
trajectory_reporter=trajectory_reporter,
)

for traj in trajectory_reporter.trajectories:
with TorchSimTrajectory(traj.filename, mode="r") as traj:
# Check that the trajectory file has 5 frames
np.testing.assert_allclose(traj.get_steps("positions"), range(1, 6))

trajectory_reporter_2 = ts.TrajectoryReporter(
traj_files, state_frequency=1, trajectory_kwargs=dict(mode="a")
)
# Nothing to do here, as we already have step 5 in the trajectory
# and n_steps is 5.
state_2 = ts.integrate(
system=int_state,
model=lj_model,
timestep=0.001,
temperature=300.0,
n_steps=5,
integrator=ts.Integrator.nvt_langevin,
trajectory_reporter=trajectory_reporter_2,
)
torch.testing.assert_close(state_2.positions, int_state.positions)
# run two (7 - 5) more steps of integration.
_ = ts.integrate(
system=int_state,
model=lj_model,
timestep=0.001,
temperature=300.0,
n_steps=7,
integrator=ts.Integrator.nvt_langevin,
trajectory_reporter=trajectory_reporter_2,
)
for traj in trajectory_reporter_2.trajectories:
with TorchSimTrajectory(traj.filename, mode="r") as traj:
# Check that the trajectory file now has 7 frames
np.testing.assert_allclose(traj.get_steps("positions"), range(1, 8))


def test_truncate_trajectory(
si_double_sim_state: SimState, lj_model: LennardJonesModel
) -> None:
"""
Test trajectory.truncate_to_step().
"""

# Create a temporary trajectory file
with tempfile.TemporaryDirectory() as temp_dir:
traj_files = [f"{temp_dir}/truncate_trajectory_{idx}.h5" for idx in range(2)]

# Initialize model and state
trajectory_reporter = ts.TrajectoryReporter(
traj_files,
state_frequency=1,
prop_calculators={1: {"velocities": lambda state: state.velocities}},
)

# First integration run for 5 steps.
_ = ts.integrate(
system=si_double_sim_state,
model=lj_model,
timestep=0.001,
n_steps=5,
temperature=300.0,
integrator=ts.Integrator.nvt_langevin,
trajectory_reporter=trajectory_reporter,
)

# Manually remove last two frames from second trajectory to create unevenness
with TorchSimTrajectory(traj_files[1], mode="a") as traj:
traj.truncate_to_step(3)
# Verify that it has 3 frames now.
for array_name in traj.array_registry:
target_length = 3
target_steps = [1, 2, 3]
# Special cases: global arrays
if array_name in ["atomic_numbers", "masses"]:
target_length = 1
target_steps = [0]
if array_name == "pbc":
target_length = 3
target_steps = [0]
assert len(traj.get_array(array_name)) == target_length
np.testing.assert_allclose(traj.get_steps(array_name), target_steps)
with pytest.raises(
ValueError,
match=(
"Cannot truncate to a step greater than the last step. "
"self.last_step=3 < step=10"
),
):
traj.truncate_to_step(10)
with pytest.raises(
ValueError, match="Step must be larger than 0. Got step=0"
):
traj.truncate_to_step(0)


def test_integrate_uneven_trajectory_append(
si_double_sim_state: SimState, lj_model: LennardJonesModel
) -> None:
"""
Test appending to an existing trajectory with uneven frames running ts.integrate.
Expected behavior: ts.integrate should first truncate all trajectories to the shortest
length, and then append new frames to all trajectories.
"""

# Create a temporary trajectory file
with tempfile.TemporaryDirectory() as temp_dir:
traj_files = [
f"{temp_dir}/uneven_integrate_trajectory_{idx}.h5" for idx in range(2)
]

# Initialize model and state
trajectory_reporter = ts.TrajectoryReporter(
traj_files,
state_frequency=1,
prop_calculators={1: {"velocities": lambda state: state.velocities}},
)

# First integration run for 5 steps.
_ = ts.integrate(
system=si_double_sim_state,
model=lj_model,
timestep=0.001,
n_steps=5,
temperature=300.0,
integrator=ts.Integrator.nvt_langevin,
trajectory_reporter=trajectory_reporter,
)

# Manually remove last two frames from second trajectory to create unevenness
with TorchSimTrajectory(traj_files[1], mode="a") as traj:
traj.truncate_to_step(3)

trajectory_reporter_2 = ts.TrajectoryReporter(
traj_files, state_frequency=1, trajectory_kwargs=dict(mode="a")
)
# Continue integration for one step, which means we first
# truncate the first trajectory to 3 steps to match the second one,
# and then append 4th step to both.
_ = ts.integrate(
system=si_double_sim_state,
model=lj_model,
timestep=0.001,
temperature=300.0,
n_steps=4,
integrator=ts.Integrator.nvt_langevin,
trajectory_reporter=trajectory_reporter_2,
)

for traj in trajectory_reporter_2.trajectories:
# both trajectories should have 4 frames now.
with TorchSimTrajectory(traj.filename, mode="r") as traj:
expected_steps = [1, 2, 3, 4]
np.testing.assert_allclose(traj.get_steps("positions"), expected_steps)
Loading