-
Notifications
You must be signed in to change notification settings - Fork 3
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Npath with DFS and multiprocessing #39
base: develop
Are you sure you want to change the base?
Changes from all commits
115ba8b
c112c9c
c73061e
5612e8c
98dd68c
861a0d6
a30a7c9
2e1c310
5c778b0
4c95c77
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -4,23 +4,26 @@ | |
|
||
import copy | ||
import os.path | ||
|
||
import re | ||
import readline | ||
import subprocess | ||
import tempfile | ||
import time | ||
from collections import defaultdict | ||
from enum import Enum | ||
from functools import partial | ||
from multiprocessing import Manager, Pool | ||
from multiprocessing import Manager, Pool, Lock # pylint: disable=unused-import | ||
from os import listdir | ||
from pathlib import Path | ||
from typing import Iterable, Optional, Union, cast | ||
from queue import Queue | ||
from typing import Iterable, Optional, Union, cast, Tuple, Callable | ||
|
||
import numpy # type: ignore | ||
from rich.console import Console | ||
from rich.table import Table | ||
|
||
from core.command_data import AnyDict, Data, ObjTypes, PathComplexityRes | ||
from core.command_data import AnyDict, Data, MetricRes, ObjTypes, PathComplexityRes | ||
from core.env import KnownExtensions | ||
from core.error_messages import (EXTENSION, MISSING_FILENAME, MISSING_NAME, MISSING_TYPE_AND_NAME, | ||
NO_FILE_EXT, ReplErrors) | ||
|
@@ -84,7 +87,7 @@ def get_graph_generator(self, file_extension: str) -> converter.ConverterAbstrac | |
return self.graph_generators[file_extension] | ||
|
||
|
||
def worker_main(shared_dict: dict[str, ControlFlowGraph], file: str) -> None: | ||
def multiprocess_import(shared_dict: dict[str, ControlFlowGraph], file: str) -> None: | ||
"""Handle the multiprocessing of import.""" | ||
graph = ControlFlowGraph.from_file(file) | ||
if isinstance(graph, dict): | ||
|
@@ -94,23 +97,40 @@ def worker_main(shared_dict: dict[str, ControlFlowGraph], file: str) -> None: | |
shared_dict[filepath] = graph | ||
|
||
|
||
def worker_main_two(metrics_generator: metric.MetricAbstract, | ||
shared_dict: dict[tuple[str, str], Union[int, PathComplexityRes]], | ||
graph: ControlFlowGraph) -> None: | ||
"""Handle the multiprocessing of convert.""" | ||
try: | ||
with Timeout(10, "Took too long!"): | ||
result = metrics_generator.evaluate(graph) | ||
def multiprocess_metrics(metrics_generators: dict[str, metric.MetricAbstract], | ||
shared_dict: dict[tuple[str, str], Union[int, PathComplexityRes]], | ||
queue: Queue[Tuple[ControlFlowGraph, str]], | ||
lock: Callable[[], None], | ||
process_count: int) -> None: | ||
"""Handle the multiprocessing of metrics.""" | ||
print(f"Starting thread {process_count}") | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Review comment: pass the logger into this function and use it instead of calling print(). |
||
while True: | ||
with lock: # type: ignore | ||
if not queue.empty(): | ||
graph, generator_name = queue.get() | ||
else: | ||
break | ||
metrics_generator = metrics_generators[generator_name] | ||
timeout = 1200 if metrics_generator.name() == "Path Complexity" else 180 | ||
try: | ||
if metrics_generator.name() == "Lines of Code" and \ | ||
graph.metadata.language is not KnownExtensions.Python: | ||
continue | ||
|
||
with Timeout(timeout, "Took too long!"): | ||
result = metrics_generator.evaluate(graph) | ||
|
||
if graph.name is None: | ||
raise ValueError("No Graph name.") | ||
if graph.name is None: | ||
raise ValueError("No Graph name.") | ||
|
||
shared_dict[(graph.name, metrics_generator.name())] = result | ||
except IndexError as err: | ||
print(graph) | ||
print(err) | ||
except TimeoutError as err: | ||
print(err, graph.name, metrics_generator.name()) | ||
shared_dict[(graph.name, metrics_generator.name())] = result | ||
except IndexError as err: | ||
print(graph) | ||
print(err) | ||
except TimeoutError as err: | ||
print(err, graph.name, metrics_generator.name()) | ||
shared_dict[(graph.name, metrics_generator.name())] = ("NA", "Timeout") | ||
print(f"Thread {process_count} is done.") | ||
|
||
|
||
class REPLOptions(): | ||
|
@@ -455,7 +475,8 @@ def do_convert(self, args: str) -> None: # pylint: disable=too-many-branches | |
if graph == {}: | ||
self.logger.v_msg("Converted without errors, but no graphs created.") | ||
else: | ||
self.logger.v_msg(f"Created graph objects {' '.join(list(graph.keys()))}") | ||
self.logger.v_msg(f"Created graph objects {Colors.MAGENTA.value}" | ||
f"{' '.join(list(graph.keys()))}{Colors.ENDC.value}") | ||
self.data.graphs.update(graph) | ||
elif isinstance(graph, ControlFlowGraph): | ||
self.logger.v_msg(f"Created graph {graph.name}") | ||
|
@@ -494,17 +515,20 @@ def do_import(self, flags: Options, *args_list: str) -> None: | |
manager = Manager() | ||
shared_dict: dict[str, ControlFlowGraph] = manager.dict() | ||
pool = Pool(8) | ||
pool.map(partial(worker_main, shared_dict), all_files) | ||
pool.map(partial(multiprocess_import, shared_dict), all_files) | ||
self.logger.v_msg(f"Created graph objects " | ||
f"{Colors.MAGENTA.value}{' '.join(shared_dict.keys())}{Colors.ENDC.value}") | ||
self.data.graphs.update(shared_dict) | ||
else: | ||
graphs = [] | ||
for file in all_files: | ||
filepath, _ = os.path.splitext(file) | ||
graph = ControlFlowGraph.from_file(file) | ||
self.logger.v_msg(str(graph)) | ||
if isinstance(graph, dict): | ||
self.data.graphs.update(graph) | ||
else: | ||
self.data.graphs[filepath] = graph | ||
graphs.append(graph) | ||
self.data.graphs[filepath] = graph | ||
names = [graph.name for graph in graphs] | ||
self.logger.v_msg(f"Created graph objects " | ||
f"{Colors.MAGENTA.value}{' '.join(names)}{Colors.ENDC.value}") | ||
|
||
def do_list(self, flags: Options, list_typename: str) -> None: | ||
""" | ||
|
@@ -538,14 +562,35 @@ def do_list(self, flags: Options, list_typename: str) -> None: | |
else: | ||
self.logger.v_msg(f"Type {list_type} not recognized") | ||
|
||
def do_metrics_multithreaded(self, graphs: list[ControlFlowGraph]) -> None: | ||
def do_metrics_multithreaded(self, cfgs: list[ControlFlowGraph]) -> None: | ||
"""Compute all of the metrics for some set of graphs using parallelization.""" | ||
pool = Pool(8) | ||
pool_size = 8 | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Review comment: define pool_size in the constructor (for now) and use it in do_import as well, instead of the hard-coded Pool(8). |
||
pool = Pool(pool_size) | ||
manager = Manager() | ||
graph_queue = manager.Queue() | ||
lock = manager.Lock() # pylint: disable=no-member | ||
cfgs = sorted(cfgs, key=lambda cfg: len(cfg.graph.vertices()), reverse=True) | ||
results: defaultdict[str, list[tuple[str, MetricRes]]] = defaultdict(list) | ||
shared_dict: dict[tuple[str, str], Union[int, PathComplexityRes]] = manager.dict() | ||
for metrics_generator in self.controller.metrics_generators: | ||
pool.map(partial(worker_main_two, metrics_generator, shared_dict), graphs) | ||
self.logger.v_msg(str(shared_dict)) | ||
# Queue up all of the cfgs / metrics to execute | ||
for metrics_generator in self.controller.metrics_generators[::-1]: | ||
for cfg in cfgs: | ||
graph_queue.put((cfg, metrics_generator.name())) | ||
|
||
generator_dict = {generator.name(): generator for generator in self.controller.metrics_generators} | ||
|
||
func_to_execute = partial( | ||
multiprocess_metrics, | ||
generator_dict, | ||
shared_dict, | ||
graph_queue, | ||
lock) | ||
args = list(range(pool_size)) | ||
|
||
result = pool.map(func_to_execute, args, chunksize=1) # pylint: disable=unused-variable | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Review comment: I don't think we need the `result` variable; call pool.map without assigning its return value. |
||
for (name, metric_generator), res in shared_dict.items(): | ||
results[name].append((metric_generator, res)) | ||
self.data.metrics.update(results) | ||
|
||
def get_metrics_list(self, name: str) -> list[str]: | ||
"""Get the list of metric names from command argument.""" | ||
|
@@ -579,58 +624,64 @@ def do_metrics(self, flags: Options, name: str) -> None: | |
# pylint: disable=R1702 | ||
# pylint: disable=R0912 | ||
graphs = [self.data.graphs[name] for name in self.get_metrics_list(name)] | ||
for graph in graphs: | ||
self.logger.v_msg(f"Computing metrics for {graph.name}") | ||
results = [] | ||
if self.rich: | ||
table = Table(title=f"Metrics for {graph.name}") | ||
table.add_column("Metric", style="cyan") | ||
table.add_column("Result", style="magenta", no_wrap=False) | ||
table.add_column("Time Elapsed", style="green") | ||
for metric_generator in self.controller.metrics_generators: | ||
# Lines of Code is currently only supported in Python. | ||
if metric_generator.name() == "Lines of Code" and \ | ||
graph.metadata.language is not KnownExtensions.Python: | ||
continue | ||
start_time = time.time() | ||
|
||
try: | ||
with Timeout(6000, "Took too long!"): | ||
result = metric_generator.evaluate(graph) | ||
runtime = time.time() - start_time | ||
if result is not None: | ||
results.append((metric_generator.name(), result)) | ||
time_out = f"{runtime:.5f} seconds" | ||
if metric_generator.name() == "Path Complexity": | ||
result_ = cast(tuple[Union[float, str], Union[float, str]], | ||
result) | ||
path_out = f"(APC: {result_[0]}, Path Complexity: {result_[1]})" | ||
|
||
if self.rich: | ||
table.add_row(metric_generator.name(), path_out, time_out) | ||
if self.multi_threaded: | ||
start_time = time.time() | ||
self.do_metrics_multithreaded(graphs) | ||
elapsed = time.time() - start_time | ||
print(f"TIME ELAPSED: {elapsed}") | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Review comment: report the elapsed time through the logger instead of print(). |
||
else: | ||
for graph in graphs: | ||
self.logger.v_msg(f"Computing metrics for {graph.name}") | ||
results = [] | ||
if self.rich: | ||
table = Table(title=f"Metrics for {graph.name}") | ||
table.add_column("Metric", style="cyan") | ||
table.add_column("Result", style="magenta", no_wrap=False) | ||
table.add_column("Time Elapsed", style="green") | ||
for metric_generator in self.controller.metrics_generators: | ||
# Lines of Code is currently only supported in Python. | ||
if metric_generator.name() == "Lines of Code" and \ | ||
graph.metadata.language is not KnownExtensions.Python: | ||
continue | ||
start_time = time.time() | ||
|
||
try: | ||
with Timeout(6000, "Took too long!"): | ||
result = metric_generator.evaluate(graph) | ||
runtime = time.time() - start_time | ||
if result is not None: | ||
results.append((metric_generator.name(), result)) | ||
time_out = f"{runtime:.5f} seconds" | ||
if metric_generator.name() == "Path Complexity": | ||
result_ = cast(tuple[Union[float, str], Union[float, str]], | ||
result) | ||
path_out = f"(APC: {result_[0]}, Path Complexity: {result_[1]})" | ||
|
||
if self.rich: | ||
table.add_row(metric_generator.name(), path_out, time_out) | ||
else: | ||
self.logger.v_msg(f"Got {path_out}, {time_out}") | ||
else: | ||
self.logger.v_msg(f"Got {path_out}, {time_out}") | ||
if self.rich: | ||
table.add_row(metric_generator.name(), str(result), time_out) | ||
else: | ||
self.logger.v_msg(f" Got {result}, took {runtime:.3e} seconds") | ||
else: | ||
if self.rich: | ||
table.add_row(metric_generator.name(), str(result), time_out) | ||
else: | ||
self.logger.v_msg(f" Got {result}, took {runtime:.3e} seconds") | ||
else: | ||
self.logger.v_msg("Got None") | ||
except TimeoutError: | ||
self.logger.e_msg("Timeout!") | ||
except IndexError as err: | ||
self.logger.e_msg("Index Error") | ||
self.logger.e_msg(str(err)) | ||
except numpy.linalg.LinAlgError as err: | ||
self.logger.e_msg("Lin Alg Error") | ||
self.logger.e_msg(str(err)) | ||
if self.rich: | ||
console = Console() | ||
console.print(table) | ||
|
||
if graph.name is not None: | ||
self.data.metrics[graph.name] = results | ||
self.logger.v_msg("Got None") | ||
except TimeoutError: | ||
self.logger.e_msg("Timeout!") | ||
except IndexError as err: | ||
self.logger.e_msg("Index Error") | ||
self.logger.e_msg(str(err)) | ||
except numpy.linalg.LinAlgError as err: | ||
self.logger.e_msg("Lin Alg Error") | ||
self.logger.e_msg(str(err)) | ||
if self.rich: | ||
console = Console() | ||
console.print(table) | ||
|
||
if graph.name is not None: | ||
self.data.metrics[graph.name] = results | ||
|
||
def log_name(self, name: str) -> bool: | ||
"""Log all objects of a given name.""" | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -77,6 +77,7 @@ def export_metrics(self, name: str, new_name: str) -> None: | |
"cyclo": [], "npath": []}) | ||
for m_name in self.metrics: | ||
metric_value = self.metrics[m_name] | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Review comment: use operator.itemgetter(0) as the sort key instead of the lambda. |
||
metric_value = sorted(metric_value, key=lambda val: val[0]) | ||
new_row = {"graph_name": m_name, "apc": metric_value[2][1], | ||
"cyclo": metric_value[0][1], "npath": metric_value[1][1]} | ||
data = data.append(new_row, ignore_index=True) | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Review comment: remove the `# pylint: disable=unused-import` suppression on the multiprocessing import.