Skip to content

Commit

Permalink
reimplement temp directory creation so that each process gets its own…
Browse files Browse the repository at this point in the history
… dir that's automatically cleaned up when the process completes
  • Loading branch information
sophiamaedler committed Apr 28, 2024
1 parent e774fec commit 967c2e4
Show file tree
Hide file tree
Showing 7 changed files with 50 additions and 62 deletions.
6 changes: 3 additions & 3 deletions example_data/example_5/config_example5.yml
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ Cytosol_Cellpose_TimecourseSegmentation:
output_masks: 2
shard_size: 4000000 # Average number of pixel per tile. 10.000 * 10.000 pixel are recommended. Can be adapted to memory and computation needs.
chunk_size: 50 # chunk size for chunked HDF5 storage. is needed for correct caching and high performance reading. should be left at 50.
cache: "."
cache: "/Users/sophia/Documents/GitHub/SPARCSpy/development/cache"
lower_quantile_normalization: 0.001
upper_quantile_normalization: 0.999
median_filter_size: 4 # Size in pixels
Expand All @@ -23,7 +23,7 @@ Multithreaded_Cytosol_Cellpose_TimecourseSegmentation:
shard_size: 4000000 # Average number of pixel per tile. 10.000 * 10.000 pixel are recommended. Can be adapted to memory and computation needs.
chunk_size: 50 # chunk size for chunked HDF5 storage. is needed for correct caching and high performance reading. should be left at 50.
threads: 5 # number of shards / tiles segmented at the same size. should be adapted to the maximum amount allowed by memory.
cache: "."
cache: "/Users/sophia/Documents/GitHub/SPARCSpy/development/cache"
lower_quantile_normalization: 0.001
upper_quantile_normalization: 0.999
median_filter_size: 4 # Size in pixels
Expand All @@ -37,7 +37,7 @@ TimecourseHDF5CellExtraction:
compression: True
threads: 80 # threads used in multithreading
image_size: 128 # image size in pixel
cache: "."
cache: "/Users/sophia/Documents/GitHub/SPARCSpy/development/cache"
hdf5_rdcc_nbytes: 5242880000 # 5gb 1024 * 1024 * 5000
hdf5_rdcc_w0: 1
hdf5_rdcc_nslots: 50000
36 changes: 35 additions & 1 deletion src/sparcscore/pipeline/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
import os
import warnings
import shutil
import tempfile


class Logable(object):
Expand Down Expand Up @@ -102,7 +103,7 @@ def __init__(
self.directory = directory
self.project_location = project_location
self.config = config

self.create_temp_dir()

def __call__(
self, *args, debug=None, intermediate_output=None, overwrite=None, **kwargs
Expand Down Expand Up @@ -140,6 +141,9 @@ def __call__(
return x
else:
warnings.warn("no process method defined")

#after call is completed empty out temporary directories
self.clear_temp_dir()

def __call_empty__(
self, *args, debug=None, intermediate_output=None, overwrite=None, **kwargs
Expand Down Expand Up @@ -176,6 +180,9 @@ def __call_empty__(
else:
warnings.warn("no return_empty_mask method defined")

#also clear empty temp directory here
self.clear_temp_dir()

def register_parameter(self, key, value):
"""
Registers a new parameter by updating the configuration dictionary if the key didn't exist.
Expand Down Expand Up @@ -210,3 +217,30 @@ def get_directory(self):
str: Directory path.
"""
return self.directory

def create_temp_dir(self):
    """
    Create a temporary directory in the cache directory specified in the config
    for saving all intermediate results.

    The directory is created via ``tempfile.TemporaryDirectory`` with a prefix
    encoding the class name, so each process/step gets its own unique directory
    that is removed automatically (see ``clear_temp_dir``). If "cache" is not
    specified in the config for the method, no directory is created.
    """
    # TEMP_DIR_NAME is the global variable name used within alphabase.io.tempmmap
    # which is required to initialize a memory mapped temp array using this code
    global TEMP_DIR_NAME

    if "cache" in self.config.keys():
        self._tmp_dir_path = os.path.join(self.config["cache"], f"{self.__class__.__name__}_")
        self._tmp_dir = tempfile.TemporaryDirectory(prefix=self._tmp_dir_path)
        # report the directory that was actually created (self._tmp_dir.name),
        # not just the prefix (self._tmp_dir_path)
        self.log(f"Initialized temporary directory for saving all temp results at {self._tmp_dir.name}")
        print(f"Initialized temporary directory for saving all temp results at {self._tmp_dir.name} for {self.__class__.__name__}")
        TEMP_DIR_NAME = self._tmp_dir.name
    else:
        self.log("No cache directory specified in config. Skipping temporary directory creation")

def clear_temp_dir(self):
    """
    Delete the temporary directory created by ``create_temp_dir``.

    Safe to call when no temporary directory exists (e.g. "cache" was not
    configured): the call is then a no-op apart from a log message.
    """
    if hasattr(self, "_tmp_dir"):
        path = self._tmp_dir.name
        # A TemporaryDirectory object is not path-like, so passing it to
        # shutil.rmtree would raise TypeError; use its own cleanup() instead.
        self._tmp_dir.cleanup()
        self.log(f"Cleaned up temporary directory at {path}")

        del self._tmp_dir, self._tmp_dir_path
    else:
        self.log("Temporary directory not found, skipping cleanup")
19 changes: 3 additions & 16 deletions src/sparcscore/pipeline/extraction.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import timeit

import matplotlib.pyplot as plt
from alphabase.io import tempmmap

import _pickle as cPickle
#to perform garbage collection
Expand Down Expand Up @@ -226,10 +227,6 @@ def _initialize_tempmmap_array(self, index_len = 2):
self.n_channels_output,
self.config["image_size"],
self.config["image_size"])

#import tempmmap module and reset temp folder location
from alphabase.io import tempmmap
TEMP_DIR_NAME = tempmmap.redefine_temp_location(self.config["cache"])

#generate container for single_cell_data
_tmp_single_cell_data = tempmmap.array(shape = self.single_cell_data_shape, dtype = np.float16)
Expand All @@ -240,8 +237,6 @@ def _initialize_tempmmap_array(self, index_len = 2):
else:
#use a regulary numpy array instead of a tempmmap array to be able to save strings as well as ints
_tmp_single_cell_index = np.empty(self.single_cell_index_shape, dtype = "<U64") #need to use U64 here otherwise information potentially becomes truncated

self.TEMP_DIR_NAME = TEMP_DIR_NAME

def _transfer_tempmmap_to_hdf5(self):
global _tmp_single_cell_data, _tmp_single_cell_index
Expand Down Expand Up @@ -295,12 +290,8 @@ def _transfer_tempmmap_to_hdf5(self):
#this is required to process large datasets to not run into memory issues
for ix, i in enumerate(keep_index):
single_cell_data[ix] = _tmp_single_cell_data[i]

#delete tempobjects (to cleanup directory)
self.log(f"Tempmmap Folder location {self.TEMP_DIR_NAME} will now be removed.")
shutil.rmtree(self.TEMP_DIR_NAME, ignore_errors=True)

del self.TEMP_DIR_NAME, _tmp_single_cell_data, _tmp_single_cell_index
del _tmp_single_cell_data, _tmp_single_cell_index

def _get_label_info(self, arg):
index, save_index, cell_id = arg
Expand Down Expand Up @@ -805,12 +796,8 @@ def _transfer_tempmmap_to_hdf5(self):

hf.create_dataset('single_cell_index', data = index, dtype="uint64")
del index

#delete tempobjects (to cleanup directory)
self.log(f"Tempmmap Folder location {self.TEMP_DIR_NAME} will now be removed.")
shutil.rmtree(self.TEMP_DIR_NAME, ignore_errors=True)

del _tmp_single_cell_data, _tmp_single_cell_index, keep_index, self.TEMP_DIR_NAME
del _tmp_single_cell_data, _tmp_single_cell_index, keep_index
gc.collect()

def _save_cell_info(self, index, cell_id, image_index, label_info, stack):
Expand Down
13 changes: 3 additions & 10 deletions src/sparcscore/pipeline/filter_segmentation.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
#to perform garbage collection
import gc
import sys
from alphabase.io import tempmmap

class SegmentationFilter(ProcessingStep):
"""SegmentationFilter helper class used for creating workflows to filter generated segmentation masks before extraction.
Expand All @@ -38,12 +39,6 @@ def read_input_masks(self, input_path):

with h5py.File(input_path, "r") as hf:
hdf_input = hf.get("labels")

#use a memory mapped numpy array to save the input image to better utilize memory consumption
from alphabase.io import tempmmap
TEMP_DIR_NAME = tempmmap.redefine_temp_location(self.config["cache"])
self.TEMP_DIR_NAME = TEMP_DIR_NAME #save for later to be able to remove cached folders

input_masks = tempmmap.array(shape = hdf_input.shape, dtype = np.uint16)
input_masks = hdf_input[:2,:, :]

Expand Down Expand Up @@ -87,10 +82,8 @@ def call_as_tile(self):
with h5py.File(self.input_path, "r") as hf:
hdf_input = hf.get("labels")

#use a memory mapped numpy array to save the input image to better utilize memory consumption
from alphabase.io import tempmmap
TEMP_DIR_NAME = tempmmap.redefine_temp_location(self.config["cache"])

### use a memory mapped numpy array to save the input image to better utilize memory consumption

#calculate shape of required datacontainer
c, _, _ = hdf_input.shape
x1 = self.window[0].start
Expand Down
15 changes: 0 additions & 15 deletions src/sparcscore/pipeline/segmentation.py
Original file line number Diff line number Diff line change
Expand Up @@ -160,10 +160,6 @@ def call_as_shard(self):
with h5py.File(self.input_path, "r") as hf:
hdf_input = hf.get("channels")

#use a memory mapped numpy array to save the input image to better utilize memory consumption
from alphabase.io import tempmmap
TEMP_DIR_NAME = tempmmap.redefine_temp_location(self.config["cache"])

#calculate shape of required datacontainer
c, _, _ = hdf_input.shape
x1 = self.window[0].start
Expand Down Expand Up @@ -197,7 +193,6 @@ def call_as_shard(self):
#cleanup generated temp dir and variables
del input_image
gc.collect()
shutil.rmtree(TEMP_DIR_NAME) #remove create temp directory to cleanup directory

#write out window location
self.log(f"Writing out window location to file at {self.directory}/window.csv")
Expand Down Expand Up @@ -1090,8 +1085,6 @@ def call_as_shard(self):
except Exception:
self.log(traceback.format_exc())
self.log(f"Segmentation on index {index} completed.")

return None

def save_segmentation(
self,
Expand Down Expand Up @@ -1122,9 +1115,6 @@ def save_segmentation(
del _tmp_seg

def _initialize_tempmmap_array(self):
#reset tempmmap dir
TEMP_DIR_NAME = tempmmap.redefine_temp_location(self.config["cache"])
self.TEMP_DIR_NAME = TEMP_DIR_NAME

# create an empty HDF5 file prepared for using as a memory mapped temp array to save segmentation results to
# this required when trying to segment so many images that the results can no longer fit into memory
Expand Down Expand Up @@ -1171,11 +1161,6 @@ def _transfer_tempmmap_to_hdf5(self):
dtype=dt,
)

# delete tempobjects (to cleanup directory)
self.log(f"Tempmmap Folder location {self.TEMP_DIR_NAME} will now be removed.")
shutil.rmtree(self.TEMP_DIR_NAME, ignore_errors=True)

del _tmp_seg, self.TEMP_DIR_NAME
gc.collect()

def save_image(self, array, save_name="", cmap="magma", **kwargs):
Expand Down
8 changes: 5 additions & 3 deletions src/sparcscore/pipeline/selection.py
Original file line number Diff line number Diff line change
Expand Up @@ -118,13 +118,17 @@ class will automaticly provide the most recent segmentation forward together wit
"""

self.log("Selection process started")

## TO Do
#check if classes and seglookup table already exist as pickle file
# if not create them
#else load them and proceed with selection

# load segmentation from hdf5
hf = h5py.File(hdf_location, 'r')
hdf_labels = hf.get('labels')

#create memory mapped temporary array for saving the segmentation
TEMP_DIR_NAME = tempmmap.redefine_temp_location(self.config["cache"])
c, x, y = hdf_labels.shape
segmentation = tempmmap.array(shape = (x, y), dtype = hdf_labels.dtype)
segmentation = hdf_labels[self.config['segmentation_channel'],:,:]
Expand Down Expand Up @@ -152,7 +156,5 @@ class will automaticly provide the most recent segmentation forward together wit
shape_collection.save(savepath)

del segmentation
self.log(f"Tempmmap Folder location {TEMP_DIR_NAME} will now be removed.")
shutil.rmtree(TEMP_DIR_NAME, ignore_errors=True)

self.log(f"Saved output at {savepath}")
15 changes: 1 addition & 14 deletions src/sparcscore/pipeline/workflows.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@

# for cellpose segmentation
from cellpose import models
from alphabase.io import tempmmap

class BaseSegmentation(Segmentation):
def __init__(self, *args, **kwargs):
Expand Down Expand Up @@ -1050,9 +1051,6 @@ def cellpose_segmentation(self, input_image):
torch.cuda.empty_cache()

def process(self, input_image):
from alphabase.io import tempmmap
TEMP_DIR_NAME = tempmmap.redefine_temp_location(self.config["cache"])

# initialize location to save masks to
self.maps = {
"normalized": tempmmap.array(shape = input_image.shape, dtype = float),
Expand Down Expand Up @@ -1169,10 +1167,6 @@ def process(self, input_image):

downsampled_image_size = (2, _size[1]+pad_x[1], _size[2]+pad_y[1])

#initialize memory mapped numpy arrays to save results into
from alphabase.io import tempmmap
TEMP_DIR_NAME = tempmmap.redefine_temp_location(self.config["cache"])

# initialize location to save masks to
self.maps = {
"normalized": tempmmap.array(shape = input_image.shape, dtype = float),
Expand Down Expand Up @@ -1311,9 +1305,6 @@ def cellpose_segmentation(self, input_image):

def process(self, input_image):

from alphabase.io import tempmmap
TEMP_DIR_NAME = tempmmap.redefine_temp_location(self.config["cache"])

# initialize location to save masks to
self.maps = {
"normalized": tempmmap.array(shape = input_image.shape, dtype = float),
Expand Down Expand Up @@ -1423,10 +1414,6 @@ def process(self, input_image):

downsampled_image_size = (2, _size[1]+pad_x[1], _size[2]+pad_y[1])

#initialize memory mapped numpy arrays to save results into
from alphabase.io import tempmmap
TEMP_DIR_NAME = tempmmap.redefine_temp_location(self.config["cache"])

# initialize location to save masks to
self.maps = {
"normalized": tempmmap.array(shape = input_image.shape, dtype = float),
Expand Down

0 comments on commit 967c2e4

Please sign in to comment.