Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[QST] Reset GPU to release memory resources #5889

Open
m946107011 opened this issue May 14, 2024 · 2 comments
Open

[QST] Reset GPU to release memory resources #5889

m946107011 opened this issue May 14, 2024 · 2 comments
Labels
? - Needs Triage Need team to review and classify question Further information is requested

Comments

@m946107011
Copy link

m946107011 commented May 14, 2024

Hello,

I'm currently using cuML and cuDF to train multiple models on an A100-40g GPU with 96GB of RAM (Rapids24.04, CUDA11.4).
Despite attempts to manage GPU memory usage by deleting variables using del, calling gc.collect(), and using RMM methods, I consistently encounter out-of-memory (OOM) errors after training several models.

Could someone provide advice on how to reset the GPU after training each model to prevent OOM errors?

Thank you very much.

Ps:This is my first time submitting a question, so I apologize for the messy layout.
import cudf import cuml from sklearn import model_selection import cuda as cp from cuml import datasets from cuml.dask.common import utils as dask_utils from dask.distributed import Client, wait from dask_cuda import LocalCUDACluster from cuml import RandomForestClassifier as cuRF import dask_cudf from tqdm import tqdm from scipy import stats from sklearn import metrics import pickle import os import random import shutil import time import gc import warnings import numpy as np import pandas as pd from cuml import datasets import rmm

``
for file in tqdm(files):
cluster = LocalCUDACluster()
c = Client(cluster)
workers = c.has_what().keys()
dbuf = rmm.DeviceBuffer(size=1024440)
start=time.time()
file_path = f"{folder_name}/{file}"
data = dask_cudf.read_csv(file_path,chunksize="1GB")
data=data.dropna(subset=[file])
task = data.columns[0]
locals()['Task_'+str(task)]=data
locals()['Task_'+str(task)]= locals()['Task_'+str(task)].dropna(subset=[task])
locals()['Task_'+str(task)+'x']=locals()['Task'+str(task)].drop(columns=[task])
locals()['Task_'+str(task)+'y']=locals()['Task'+str(task)][task]
locals()['Task_'+str(task)+'x'] = locals()['Task'+str(task)+'x'].astype(np.float32).compute().to_numpy()
locals()['Task
'+str(task)+'y'] = locals()['Task'+str(task)+'_y'].astype(np.int32).compute().to_numpy()

  locals()['Task_'+str(task)+'_xtrain'], locals()['Task_'+str(task)+'_xtest'], locals()['Task_'+str(task)+'_ytrain'], locals()['Task_'+str(task)+'_ytest'] = model_selection.train_test_split( locals()['Task_'+str(task)+'_x'], locals()['Task_'+str(task)+'_y'],
                                                      test_size=0.2,random_state=seed , stratify=locals()['Task_'+str(task)+'_y'])
  locals()['Task_'+str(task)+'_xtrain'],  locals()['Task_'+str(task)+'_xvalid'], locals()['Task_'+str(task)+'_ytrain'],  locals()['Task_'+str(task)+'_yvalid']=model_selection.train_test_split(locals()['Task_'+str(task)+'_xtrain'], locals()['Task_'+str(task)+'_ytrain'], 
                                                       test_size=0.125,random_state=seed , stratify= locals()['Task_'+str(task)+'_ytrain'])

  locals()['Task_'+str(task)+'_xtrain'] = locals()['Task_'+str(task)+'_xtrain'].astype(np.float32)
  locals()['Task_'+str(task)+'_xvalid'] = locals()['Task_'+str(task)+'_xvalid'].astype(np.float32)
  locals()['Task_'+str(task)+'_xtest'] = locals()['Task_'+str(task)+'_xtest'].astype(np.float32)
  
  locals()['Task_'+str(task)+'_ytrain'] = locals()['Task_'+str(task)+'_ytrain'].astype(np.int32)
  locals()['Task_'+str(task)+'_yvalid'] = locals()['Task_'+str(task)+'_yvalid'].astype(np.int32)
  locals()['Task_'+str(task)+'_ytest'] = locals()['Task_'+str(task)+'_ytest'].astype(np.int32)    
  
  x_train=locals()['Task_'+str(task)+'_xtrain']
  x_valid=locals()['Task_'+str(task)+'_xvalid']
  x_test=locals()['Task_'+str(task)+'_xtest']
  y_train=locals()['Task_'+str(task)+'_ytrain'] 
  y_valid=locals()['Task_'+str(task)+'_yvalid']
  y_test=locals()['Task_'+str(task)+'_ytest']

  model_parameter=cuRF(n_estimators=500, n_streams=1,max_features='log2',random_state=seed)

  model=model_parameter.fit(x_train, y_train)
  
  pickle.dump(model, open('./Layer_' + str(layer_count) + '/model_' + str(task)+'.pkl', "wb"))

  y_train_pred=model.predict(x_train)
  y_valid_pred=model.predict(x_valid)
  y_test_pred=model.predict(x_test)
  
  y_prob_train=model.predict_proba(x_train)[:, 1]
  y_prob_valid=model.predict_proba(x_valid)[:, 1]
  y_prob_test=model.predict_proba(x_test)[:, 1]

  del model 
  del x_train
  del x_valid 
  del y_train
  del y_valid
  del x_test
  del y_test
  del model_parameter
  del dbuf    
  rmm.reinitialize()
  gc.collect()     

`

@m946107011 m946107011 added ? - Needs Triage Need team to review and classify question Further information is requested labels May 14, 2024
@m946107011 m946107011 changed the title [QST] Reset GPU to release memory [QST] Reset GPU to release memory resources May 14, 2024
@taureandyernv
Copy link
Contributor

taureandyernv commented May 18, 2024

hey @m946107011 , great question and thanks for providing some code as best you could. While we usually may want some data (even fake data is fine), I will take a shot, and assume your for loop captures the entirety of your code.

  1. Move the local cuda cluster creation outside of the for loop. You only need it once. You can run multiple models without having to redo the cluster. Assuming you really do want to redo your cluster for each iteration, you did not shut down the cluster in each iteration. To do that, client.restart() may work for you. If you want to burn it all, between iterations, you can also use client.shutdown() at the end of your run before rebuilding it on your next run.
  2. When we were doing some large benchmarking, we actually put gc.collect() before we ran our model, not at the end. Right before start=time.time() would be appropriate. This should clean up everything before you start timing and hurts nothing if there is nothing to clean up. Where you call the deletion of your dataframes is appropriate, as i'm assuming it happens at the end of your for loop.

Outside of this, while it sounds like you're using a single A100 40GB, if you are using a multi GPU set up, below is some code that you can play with that will utilize UCX, which is a faster interconnect than the default tcp. It might help with performance. Tuning most likely will be required for best performance.

  # limit work-stealing as much as possible
  dask.config.set({'distributed.scheduler.work-stealing': False})
  dask.config.set({'distributed.scheduler.bandwidth': 1})

  cluster = LocalCUDACluster(rmm_pool_size=1024440,
                             device_memory_limit=1024440*.85, # 85% of pool size.  you may also want to change this.
                             protocol="ucx",
                             enable_tcp_over_ucx=True,
                             enable_nvlink=True)

Also, in terms of formatting, I know its your first question, you may want to edit it a bit to make it one solid block. I don't have edit access. You can use Preview to see what it will look like before you post.

@m946107011
Copy link
Author

hey @m946107011 , great question and thanks for providing some code as best you could. While we usually may want some data (even fake data is fine), I will take a shot, and assume your for loop captures the entirety of your code.

  1. Move the local cuda cluster creation outside of the for loop. You only need it once. You can run multiple models without having to redo the cluster. Assuming you really do want to redo your cluster for each iteration, you did not shut down the cluster in each iteration. To do that, client.restart() may work for you. If you want to burn it all, between iterations, you can also use client.shutdown() at the end of your run before rebuilding it on your next run.
  2. When we were doing some large benchmarking, we actually put gc.collect() before we ran our model, not at the end. Right before start=time.time() would be appropriate. This should clean up everything before you start timing and hurts nothing if there is nothing to clean up. Where you call the deletion of your dataframes is appropriate, as i'm assuming it happens at the end of your for loop.

Outside of this, while it sounds like you're using a single A100 40GB, if you are using a multi GPU set up, below is some code that you can play with that will utilize UCX, which is a faster interconnect than the default tcp. It might help with performance. Tuning most likely will be required for best performance.

  # limit work-stealing as much as possible
  dask.config.set({'distributed.scheduler.work-stealing': False})
  dask.config.set({'distributed.scheduler.bandwidth': 1})

  cluster = LocalCUDACluster(rmm_pool_size=1024440,
                             device_memory_limit=1024440*.85, # 85% of pool size.  you may also want to change this.
                             protocol="ucx",
                             enable_tcp_over_ucx=True,
                             enable_nvlink=True)

Also, in terms of formatting, I know its your first question, you may want to edit it a bit to make it one solid block. I don't have edit access. You can use Preview to see what it will look like before you post.

Hi:
I appreciate your advice!
I've encountered an issue when trying to utilize NVLink with the provided code.
I'm using A600 x6 with NVLink connections, and my CUDA Version is 11.2.
Do you have any suggestions or insights on how to address this? Thank you!


KeyError Traceback (most recent call last)
File ~/anaconda3/envs/rapids/lib/python3.9/site-packages/distributed/core.py:664, in Server.start(self)
663 try:
--> 664 await wait_for(self.start_unsafe(), timeout=timeout)
665 except asyncio.TimeoutError as exc:

File ~/anaconda3/envs/rapids/lib/python3.9/site-packages/distributed/utils.py:1940, in wait_for(fut, timeout)
1939 async def wait_for(fut: Awaitable[T], timeout: float) -> T:
-> 1940 return await asyncio.wait_for(fut, timeout)

File ~/anaconda3/envs/rapids/lib/python3.9/asyncio/tasks.py:442, in wait_for(fut, timeout, loop)
441 if timeout is None:
--> 442 return await fut
444 if timeout <= 0:

File ~/anaconda3/envs/rapids/lib/python3.9/site-packages/distributed/scheduler.py:4039, in Scheduler.start_unsafe(self)
4038 for addr in self._start_address:
-> 4039 await self.listen(
4040 addr,
4041 allow_offload=False,
4042 handshake_overrides={"pickle-protocol": 4, "compression": None},
4043 **self.security.get_listen_args("scheduler"),
4044 )
4045 self.ip = get_address_host(self.listen_address)

File ~/anaconda3/envs/rapids/lib/python3.9/site-packages/distributed/core.py:859, in Server.listen(self, port_or_addr, allow_offload, **kwargs)
858 assert isinstance(addr, str)
--> 859 listener = await listen(
860 addr,
861 self.handle_comm,
862 deserialize=self.deserialize,
863 allow_offload=allow_offload,
864 **kwargs,
865 )
866 self.listeners.append(listener)

File ~/anaconda3/envs/rapids/lib/python3.9/site-packages/distributed/comm/core.py:256, in Listener.await.._()
255 async def _():
--> 256 await self.start()
257 return self

File ~/anaconda3/envs/rapids/lib/python3.9/site-packages/distributed/comm/ucx.py:527, in UCXListener.start(self)
525 await self.comm_handler(ucx)
--> 527 init_once()
528 self.ucp_server = ucp.create_listener(serve_forever, port=self._input_port)

File ~/anaconda3/envs/rapids/lib/python3.9/site-packages/distributed/comm/ucx.py:158, in init_once()
153 with patch.dict(os.environ, ucx_environment):
154 # We carefully ensure that ucx_environment only contains things
155 # that don't override ucx_config or existing slots in the
156 # environment, so the user's external environment can safely
157 # override things here.
--> 158 ucp.init(options=ucx_config, env_takes_precedence=True)
160 pool_size_str = dask.config.get("distributed.rmm.pool-size")

File ~/anaconda3/envs/rapids/lib/python3.9/site-packages/ucp/core.py:938, in init(options, env_takes_precedence, blocking_progress_mode)
935 logger.debug(
936 f"Ignoring environment {env_k}={env_v}; using option {k}={v}"
937 )
--> 938 _ctx = ApplicationContext(options, blocking_progress_mode=blocking_progress_mode)

File ~/anaconda3/envs/rapids/lib/python3.9/site-packages/ucp/core.py:223, in ApplicationContext.init(self, config_dict, blocking_progress_mode)
222 # For now, a application context only has one worker
--> 223 self.context = ucx_api.UCXContext(config_dict)
224 self.worker = ucx_api.UCXWorker(self.context)

File ucp/_libs/ucx_context.pyx:78, in ucp._libs.ucx_api.UCXContext.init()

KeyError: 'TLS'

The above exception was the direct cause of the following exception:

RuntimeError Traceback (most recent call last)
File ~/anaconda3/envs/rapids/lib/python3.9/site-packages/distributed/deploy/spec.py:325, in SpecCluster._start(self)
324 self.scheduler = cls(**self.scheduler_spec.get("options", {}))
--> 325 self.scheduler = await self.scheduler
326 self.scheduler_comm = rpc(
327 getattr(self.scheduler, "external_address", None)
328 or self.scheduler.address,
329 connection_args=self.security.get_connection_args("client"),
330 )

File ~/anaconda3/envs/rapids/lib/python3.9/site-packages/distributed/core.py:672, in Server.start(self)
671 await _close_on_failure(exc)
--> 672 raise RuntimeError(f"{type(self).name} failed to start.") from exc
673 if self.status == Status.init:

RuntimeError: Scheduler failed to start.

The above exception was the direct cause of the following exception:

RuntimeError Traceback (most recent call last)
Cell In[1], line 647
645 print('Type:'+str(problem_mode))
646 print('Evaluation Metrics:'+str(main_perform))
--> 647 MTForestNet_Multiprocess(folder_name,seed,problem_mode,main_perform,save_file_name,fast)
648 end=time.time()
649 print('Time: ',(end - start)/60,'minutes')

Cell In[1], line 56, in MTForestNet_Multiprocess(folder_name, seed, problem_mode, main_perform, save_file_name, fast)
53 dask.config.set({'distributed.scheduler.work-stealing': False})
54 dask.config.set({'distributed.scheduler.bandwidth': 1})
---> 56 cluster = LocalCUDACluster(rmm_pool_size=102444,
57 device_memory_limit=102444*.75, # 85% of pool size. you may also want to change this.
58 protocol="ucx",
59 enable_tcp_over_ucx=True,
60 enable_nvlink=True)
62 c = Client(cluster)
63 workers = c.has_what().keys()

File ~/anaconda3/envs/rapids/lib/python3.9/site-packages/dask_cuda/local_cuda_cluster.py:352, in LocalCUDACluster.init(self, CUDA_VISIBLE_DEVICES, n_workers, threads_per_worker, memory_limit, device_memory_limit, data, local_directory, shared_filesystem, protocol, enable_tcp_over_ucx, enable_infiniband, enable_nvlink, enable_rdmacm, rmm_pool_size, rmm_maximum_pool_size, rmm_managed_memory, rmm_async, rmm_release_threshold, rmm_log_directory, rmm_track_allocations, jit_unspill, log_spilling, worker_class, pre_import, **kwargs)
348 worker_class = partial(Nanny, worker_class=worker_class)
350 self.pre_import = pre_import
--> 352 super().init(
353 n_workers=0,
354 threads_per_worker=threads_per_worker,
355 memory_limit=self.memory_limit,
356 processes=True,
357 data=data,
358 local_directory=local_directory,
359 protocol=protocol,
360 worker_class=worker_class,
361 config={
362 "distributed.comm.ucx": get_ucx_config(
363 enable_tcp_over_ucx=enable_tcp_over_ucx,
364 enable_nvlink=enable_nvlink,
365 enable_infiniband=enable_infiniband,
366 enable_rdmacm=enable_rdmacm,
367 )
368 },
369 **kwargs,
370 )
372 self.new_spec["options"]["preload"] = self.new_spec["options"].get(
373 "preload", []
374 ) + ["dask_cuda.initialize"]
375 self.new_spec["options"]["preload_argv"] = self.new_spec["options"].get(
376 "preload_argv", []
377 ) + ["--create-cuda-context", "--protocol", protocol]

File ~/anaconda3/envs/rapids/lib/python3.9/site-packages/distributed/deploy/local.py:253, in LocalCluster.init(self, name, n_workers, threads_per_worker, processes, loop, start, host, ip, scheduler_port, silence_logs, dashboard_address, worker_dashboard_address, diagnostics_port, services, worker_services, service_kwargs, asynchronous, security, protocol, blocked_handlers, interface, worker_class, scheduler_kwargs, scheduler_sync_interval, **worker_kwargs)
250 worker = {"cls": worker_class, "options": worker_kwargs}
251 workers = {i: worker for i in range(n_workers)}
--> 253 super().init(
254 name=name,
255 scheduler=scheduler,
256 workers=workers,
257 worker=worker,
258 loop=loop,
259 asynchronous=asynchronous,
260 silence_logs=silence_logs,
261 security=security,
262 scheduler_sync_interval=scheduler_sync_interval,
263 )

File ~/anaconda3/envs/rapids/lib/python3.9/site-packages/distributed/deploy/spec.py:284, in SpecCluster.init(self, workers, scheduler, worker, asynchronous, loop, security, silence_logs, name, shutdown_on_close, scheduler_sync_interval)
282 if not self.called_from_running_loop:
283 self._loop_runner.start()
--> 284 self.sync(self._start)
285 try:
286 self.sync(self._correct_state)

File ~/anaconda3/envs/rapids/lib/python3.9/site-packages/distributed/utils.py:358, in SyncMethodMixin.sync(self, func, asynchronous, callback_timeout, *args, **kwargs)
356 return future
357 else:
--> 358 return sync(
359 self.loop, func, *args, callback_timeout=callback_timeout, **kwargs
360 )

File ~/anaconda3/envs/rapids/lib/python3.9/site-packages/distributed/utils.py:434, in sync(loop, func, callback_timeout, *args, **kwargs)
431 wait(10)
433 if error is not None:
--> 434 raise error
435 else:
436 return result

File ~/anaconda3/envs/rapids/lib/python3.9/site-packages/distributed/utils.py:408, in sync..f()
406 awaitable = wait_for(awaitable, timeout)
407 future = asyncio.ensure_future(awaitable)
--> 408 result = yield future
409 except Exception as exception:
410 error = exception

File ~/anaconda3/envs/rapids/lib/python3.9/site-packages/tornado/gen.py:767, in Runner.run(self)
765 try:
766 try:
--> 767 value = future.result()
768 except Exception as e:
769 # Save the exception for later. It's important that
770 # gen.throw() not be called inside this try/except block
771 # because that makes sys.exc_info behave unexpectedly.
772 exc: Optional[Exception] = e

File ~/anaconda3/envs/rapids/lib/python3.9/site-packages/distributed/deploy/spec.py:335, in SpecCluster._start(self)
333 self.status = Status.failed
334 await self._close()
--> 335 raise RuntimeError(f"Cluster failed to start: {e}") from e

RuntimeError: Cluster failed to start: Scheduler failed to start.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
? - Needs Triage Need team to review and classify question Further information is requested
Projects
None yet
Development

No branches or pull requests

2 participants