Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
260 changes: 139 additions & 121 deletions src/murfey/client/contexts/fib.py
Original file line number Diff line number Diff line change
Expand Up @@ -182,18 +182,19 @@ def _parse_boolean(text: str):
)


def _is_manual_autotem(file_path: Path):
def _get_project_name(file_path: Path):
"""
Checks if the file being analysed belongs to a manual AutoTEM project.
Manual AutoTEM projects are stored in project folders that start with 'AutoTEM_'.
Get the project name from the file path. This is used in manual AutoTEM
workflows to identify the folder containing the images and site metadata
to register.
"""
try:
autotem_idx = file_path.parts.index("autotem")
project_dir = file_path.parents[-(autotem_idx + 2)]
return project_dir.stem.startswith("AutoTEM_")
return project_dir.stem
except Exception:
logger.error(
f"Error checking if file {file_path} belongs to a manual AutoTEM project:",
f"Error extracting project name from file path {file_path}:",
exc_info=True,
)
return None
Expand Down Expand Up @@ -245,6 +246,8 @@ def __init__(
super().__init__("FIBContext", acquisition_software, token)
self._basepath = basepath
self._machine_config = machine_config
self._project_data: dict[str, Path] = {}
self._target_projects: list[str] = []
self._site_info: dict[int, LamellaSiteInfo] = {}
self._drift_correction_images: dict[int, FIBImage] = {}

Expand All @@ -263,98 +266,52 @@ def post_transfer(
# AutoTEM
# -----------------------------------------------------------------------------
if self._acquisition_software == "autotem":
# Apply different logic depending of whether it's auto/manual AutoTEM
if (is_manual_autotem := _is_manual_autotem(transferred_file)) is None:
# Skip processing file if the check fails
# Extract current project name from file path
project_name = _get_project_name(transferred_file)
if project_name is None:
# Early exit if the check fails
return None
elif is_manual_autotem:
# Logic for handling manual AutotTEM will evenutlaly go here
return None
else:
# Extract metadata from fully automated AutoTEM projects
if transferred_file.name == "ProjectData.dat":
logger.info(f"Found metadata file {transferred_file} for parsing")

# Parse the metadata file
all_site_info_new = self._parse_autotem_metadata(transferred_file)
for site_num, site_info_new in all_site_info_new.items():
# Post the data to the backend if it's been changed
if (
data := site_info_new.model_dump(exclude_none=True)
) != self._site_info.get(
site_num, LamellaSiteInfo()
).model_dump(exclude_none=True):
capture_post(
base_url=str(environment.url.geturl()),
router_name="workflow_fib.router",
function_name="register_fib_milling_progress",
token=self._token,
instrument_name=environment.instrument_name,
data=data,
# Endpoint kwargs
session_id=environment.murfey_session,
)

# Update existing dict
self._site_info[site_num] = site_info_new
logger.info(f"Updating metadata for site {site_num}")

# Post drift correction GIF request if it hasn't already been done
fib_image = self._drift_correction_images.get(site_num, None)
if fib_image is not None and not fib_image.is_submitted:
# Construct the output file name if it doesn't already exist
if (output_file := fib_image.output_file) is None:
source = _get_source(transferred_file, environment)
if source is None:
logger.warning(
f"No source found for file {transferred_file}"
)
continue
destination_file = _file_transferred_to(
environment=environment,
source=source,
file_path=transferred_file,
rsync_basepath=Path(
self._machine_config.get("rsync_basepath", "")
),
)
if destination_file is None:
logger.warning(
f"Could not find destination file path for {transferred_file.name!r}"
)
continue
output_dir = self._determine_output_dir(
site_num, destination_file, environment
)
if output_dir is None:
logger.warning(
f"Could not determine output directory for lamella {site_num}"
)
continue
output_file = (
output_dir
/ "drift_correction"
/ f"lamella_{site_num}.gif"
)
with lock:
self._drift_correction_images[
site_num
].output_file = output_file
# Reload the new object
fib_image = self._drift_correction_images[site_num]

if self._make_gif(
environment=environment,
lamella_number=site_num,
images=sorted(fib_image.images),
output_file=output_file,
):
with lock:
self._drift_correction_images[
site_num
].is_submitted = True
# Store incoming ProjectData.dat files in memory
if (
transferred_file.name == "ProjectData.dat"
and self._project_data.get(project_name) is None
):
self._project_data[project_name] = transferred_file

# Identify if the current file's project is to be registered
if project_name not in self._target_projects:
if not any(
pattern in str(transferred_file)
for pattern in (
"/DCImages/",
"/LamellaEvaluationImages/",
"/Sites/Lamella",
)
):
# Early exit if the file is not from a relevant project
return None
# Mark project folder for analysis
self._target_projects.append(project_name)
logger.info(
f"AutoTEM project {project_name!r} identified for registration"
)

# Analyse file and trigger processing only from target projects
if project_name in self._target_projects:
# Perform first-time metadata extraction using stored "ProjectData.dat" file
if self._project_data.get(project_name) and not self._site_info:
project_data = self._project_data[project_name]
logger.info(
f"Performing initial metadata extraction from {project_data}"
)
self._handle_autotem_metadata(project_data, environment)
# Extract metadata directly from "ProjectData.dat" on subsequent runs
if transferred_file.name == "ProjectData.dat" and self._site_info:
logger.info(f"Found metadata file {transferred_file} for parsing")
self._handle_autotem_metadata(transferred_file, environment)
return None
# Compile and register drift correction images
elif (
"DCImages" in transferred_file.parts
and transferred_file.suffix == ".png"
Expand Down Expand Up @@ -554,14 +511,59 @@ def _determine_output_dir(
)
return None

def _handle_autotem_metadata(
self, file: Path, environment: MurfeyInstanceEnvironment
):
"""
Helper function to extract the AutoTEM metadata, update the stored FIB lamella
site info, and trigger relevant processing.
"""

# Extract all site info
all_site_info_new = self._parse_autotem_metadata(file)

# Parse the metadata file
for site_num, site_info_new in all_site_info_new.items():
# Post the data to the backend if it's been changed
if (
data := site_info_new.model_dump(exclude_none=True)
) != self._site_info.get(site_num, LamellaSiteInfo()).model_dump(
exclude_none=True
):
capture_post(
base_url=str(environment.url.geturl()),
router_name="workflow_fib.router",
function_name="register_fib_milling_progress",
token=self._token,
instrument_name=environment.instrument_name,
data=data,
# Endpoint kwargs
session_id=environment.murfey_session,
)

# Update existing dict
self._site_info[site_num] = site_info_new
logger.info(f"Updating metadata for site {site_num}")

# Post drift correction GIF request if it hasn't already been done
fib_image = self._drift_correction_images.get(site_num, None)
if fib_image is not None and not fib_image.is_submitted:
self._make_drift_correction_gif(
fib_image.images[-1],
environment,
is_destination_file=True,
)
return None

def _make_drift_correction_gif(
self,
file: Path,
environment: MurfeyInstanceEnvironment,
is_destination_file: bool = False,
):
"""
Helper function to create GIFs using the drift correction images seen by the
FIBContext class. The function uses the metadata returned
FIBContext class. The function uses the metadata extracted from the
"""
parts = file.parts
try:
Expand All @@ -572,46 +574,62 @@ def _make_drift_correction_gif(
f"Could not extract metadata from file {file}", exc_info=True
)
return None
source = _get_source(file, environment)
if source is None:
logger.warning(f"No source found for file {file}")
return
destination_file = _file_transferred_to(
environment=environment,
source=source,
file_path=file,
rsync_basepath=Path(self._machine_config.get("rsync_basepath", "")),
)
if destination_file is None:
logger.warning(f"Could not find destination file path for {file.name!r}")
return

# If the file provided is client-side, construct the destination file path
if not is_destination_file:
source = _get_source(file, environment)
if source is None:
logger.warning(f"No source found for file {file}")
return
destination_file = _file_transferred_to(
environment=environment,
source=source,
file_path=file,
rsync_basepath=Path(self._machine_config.get("rsync_basepath", "")),
)
if destination_file is None:
logger.warning(
f"Could not find destination file path for {file.name!r}"
)
return
else:
destination_file = file

# Create FIBImage instance for this lamella site, or update existing one
if not self._drift_correction_images.get(lamella_number):
with lock:
self._drift_correction_images[lamella_number] = FIBImage(
images=[destination_file]
)
else:
# Only update list if the file is not already in it
elif (
destination_file not in self._drift_correction_images[lamella_number].images
):
with lock:
self._drift_correction_images[lamella_number].images.append(
destination_file
)
self._drift_correction_images[lamella_number].is_submitted = False
if (
output_dir := self._determine_output_dir(
lamella_number,
destination_file,
environment,
)
) is None:
logger.warning(
f"Could not determine output directory for lamella {lamella_number}"

# If output GIF file path has not already been determined, construct it
output_file = self._drift_correction_images[lamella_number].output_file
if output_file is None:
if (
output_dir := self._determine_output_dir(
lamella_number,
destination_file,
environment,
)
) is None:
logger.warning(
f"Could not determine output directory for lamella {lamella_number}"
)
return None
output_file = (
output_dir / "drift_correction" / f"lamella_{lamella_number}.gif"
)
return None
output_file = output_dir / "drift_correction" / f"lamella_{lamella_number}.gif"
with lock:
self._drift_correction_images[lamella_number].output_file = output_file
with lock:
self._drift_correction_images[lamella_number].output_file = output_file

# Submit job to backend to construct a GIF
if self._make_gif(
Expand Down
Loading