Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[SDK/CLI] Use new trace link instead of original run portal link #3193

Merged
merged 13 commits into from
May 11, 2024
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ Download swagger.json from [here](https://int.api.azureml-test.ms/flow/swagger/v
- 2024.3.14 - [Add enable_multi_container](https://github.com/microsoft/promptflow/pull/2313)
- 2024.4.7 - [Update SDK restclient](https://github.com/microsoft/promptflow/pull/2670)
- 2024.5.9 - [Support init Cosmos DB with setup API](https://github.com/microsoft/promptflow/pull/3167)
- 2024.5.10 - [Use new trace link instead of original run portal link](https://github.com/microsoft/promptflow/pull/3193)

## Troubleshooting

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19451,6 +19451,8 @@ class FlowRunInfo(msrest.serialization.Model):
:vartype session_id: str
:ivar studio_portal_endpoint:
:vartype studio_portal_endpoint: str
:ivar studio_portal_trace_endpoint:
:vartype studio_portal_trace_endpoint: str
"""

_attribute_map = {
Expand All @@ -19476,6 +19478,7 @@ class FlowRunInfo(msrest.serialization.Model):
'flow_snapshot_id': {'key': 'flowSnapshotId', 'type': 'str'},
'session_id': {'key': 'sessionId', 'type': 'str'},
'studio_portal_endpoint': {'key': 'studioPortalEndpoint', 'type': 'str'},
'studio_portal_trace_endpoint': {'key': 'studioPortalTraceEndpoint', 'type': 'str'},
}

def __init__(
Expand Down Expand Up @@ -19528,6 +19531,8 @@ def __init__(
:paramtype session_id: str
:keyword studio_portal_endpoint:
:paramtype studio_portal_endpoint: str
:keyword studio_portal_trace_endpoint:
:paramtype studio_portal_trace_endpoint: str
"""
super(FlowRunInfo, self).__init__(**kwargs)
self.flow_graph = kwargs.get('flow_graph', None)
Expand All @@ -19552,6 +19557,7 @@ def __init__(
self.flow_snapshot_id = kwargs.get('flow_snapshot_id', None)
self.session_id = kwargs.get('session_id', None)
self.studio_portal_endpoint = kwargs.get('studio_portal_endpoint', None)
self.studio_portal_trace_endpoint = kwargs.get('studio_portal_trace_endpoint', None)


class FlowRunResult(msrest.serialization.Model):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21962,6 +21962,8 @@ class FlowRunInfo(msrest.serialization.Model):
:vartype session_id: str
:ivar studio_portal_endpoint:
:vartype studio_portal_endpoint: str
:ivar studio_portal_trace_endpoint:
:vartype studio_portal_trace_endpoint: str
"""

_attribute_map = {
Expand All @@ -21987,6 +21989,7 @@ class FlowRunInfo(msrest.serialization.Model):
'flow_snapshot_id': {'key': 'flowSnapshotId', 'type': 'str'},
'session_id': {'key': 'sessionId', 'type': 'str'},
'studio_portal_endpoint': {'key': 'studioPortalEndpoint', 'type': 'str'},
'studio_portal_trace_endpoint': {'key': 'studioPortalTraceEndpoint', 'type': 'str'},
}

def __init__(
Expand Down Expand Up @@ -22014,6 +22017,7 @@ def __init__(
flow_snapshot_id: Optional[str] = None,
session_id: Optional[str] = None,
studio_portal_endpoint: Optional[str] = None,
studio_portal_trace_endpoint: Optional[str] = None,
**kwargs
):
"""
Expand Down Expand Up @@ -22062,6 +22066,8 @@ def __init__(
:paramtype session_id: str
:keyword studio_portal_endpoint:
:paramtype studio_portal_endpoint: str
:keyword studio_portal_trace_endpoint:
:paramtype studio_portal_trace_endpoint: str
"""
super(FlowRunInfo, self).__init__(**kwargs)
self.flow_graph = flow_graph
Expand All @@ -22086,6 +22092,7 @@ def __init__(
self.flow_snapshot_id = flow_snapshot_id
self.session_id = session_id
self.studio_portal_endpoint = studio_portal_endpoint
self.studio_portal_trace_endpoint = studio_portal_trace_endpoint


class FlowRunResult(msrest.serialization.Model):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17879,6 +17879,10 @@
"studioPortalEndpoint": {
"type": "string",
"nullable": true
},
"studioPortalTraceEndpoint": {
"type": "string",
"nullable": true
}
},
"additionalProperties": false
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -142,8 +142,8 @@ def _get_run_portal_url(self, run_id: str):
except Exception as e:
logger.warning(f"Failed to get run portal url from pfs for run {run_id!r}: {str(e)}")

if run_info and hasattr(run_info, "studio_portal_endpoint"):
portal_url = run_info.studio_portal_endpoint
if run_info and hasattr(run_info, "studio_portal_trace_endpoint"):
zhengfeiwang marked this conversation as resolved.
Show resolved Hide resolved
portal_url = run_info.studio_portal_trace_endpoint

return portal_url

Expand Down Expand Up @@ -936,7 +936,7 @@ def download(
logger.info(f"Successfully downloaded run {run!r} to {result_path!r}.")
return result_path

def _upload(self, run: Union[str, Run]):
def _upload(self, run: Union[str, Run]) -> str:
from promptflow._sdk._pf_client import PFClient
from promptflow.azure.operations._async_run_uploader import AsyncRunUploader

Expand Down Expand Up @@ -973,16 +973,19 @@ def _upload(self, run: Union[str, Run]):
logger.debug(f"Successfully uploaded run details of {run!r} to cloud.")

# registry the run in the cloud
self._registry_existing_bulk_run(run=run)
self._register_existing_bulk_run(run=run)

# post process after run upload, it can only be done after the run history record is created
async_run_allowing_running_loop(run_uploader.post_process)

portal_url = self._get_run_portal_url(run_id=run.name)
# print portal url when executing in jupyter notebook
if in_jupyter_notebook():
print(f"Portal url: {self._get_run_portal_url(run_id=run.name)}")
print(f"Portal url: {portal_url}")

return portal_url

def _registry_existing_bulk_run(self, run: Run):
def _register_existing_bulk_run(self, run: Run):
"""Register the run in the cloud"""
rest_obj = run._to_rest_object()
self._service_caller.create_existing_bulk_run(
Expand Down
3 changes: 3 additions & 0 deletions src/promptflow-devkit/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,9 @@

## v1.11.0 (Upcoming)

### Features Added
- Upload local run details to the cloud when the trace destination is configured to cloud.

### Improvements
- Interactive browser credential is excluded by default when using Azure AI connections; users can set `PF_NO_INTERACTIVE_LOGIN=False` to enable it.
- Visualize flex flow run(s) switches to trace UI page.
Expand Down
5 changes: 5 additions & 0 deletions src/promptflow-devkit/promptflow/_sdk/_configuration.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
DEFAULT_ENCODING,
FLOW_DIRECTORY_MACRO_IN_CONFIG,
HOME_PROMPT_FLOW_DIR,
REMOTE_URI_PREFIX,
SERVICE_CONFIG_FILE,
)
from promptflow._sdk._utilities.general_utils import call_from_extension, gen_uuid_by_compute_info, read_write_by_user
Expand Down Expand Up @@ -215,6 +216,10 @@ def get_trace_destination(self, path: Optional[Path] = None) -> Optional[str]:
logger.debug("trace destination does not need to be resolved, directly return...")
return value

def _is_cloud_trace_destination(self, path: Optional[Path] = None) -> bool:
    """Return True when the configured trace destination points to the cloud.

    :param path: Optional flow directory used when resolving the trace
        destination from a nearby workspace config file.
    :return: True if a trace destination is configured and it is a remote
        (``REMOTE_URI_PREFIX``-prefixed, i.e. ``azureml:``) URI; False otherwise.
    """
    trace_destination = self.get_trace_destination(path=path)
    # `x and y` evaluates to the left operand (None or "") when it is falsy,
    # which would leak a non-bool to callers despite the `-> bool` annotation;
    # coerce explicitly so the return type always matches the signature.
    return bool(trace_destination and trace_destination.startswith(REMOTE_URI_PREFIX))

def _resolve_trace_destination(self, path: Optional[Path] = None) -> str:
return "azureml:/" + self._get_workspace_from_config(path=path)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
from typing import Union

from promptflow._constants import SystemMetricKeys
from promptflow._sdk._constants import REMOTE_URI_PREFIX, ContextAttributeKey, FlowRunProperties
from promptflow._sdk._constants import ContextAttributeKey, FlowRunProperties
from promptflow._sdk.entities._flows import Flow, Prompty
from promptflow._sdk.entities._run import Run
from promptflow._sdk.operations._local_storage_operations import LocalStorageOperations
Expand Down Expand Up @@ -229,10 +229,9 @@ def _submit_bulk_run(
)

# upload run to cloud if the trace destination is set to cloud
trace_destination = self._config.get_trace_destination(path=run._get_flow_dir().resolve())
if trace_destination and trace_destination.startswith(REMOTE_URI_PREFIX):
logger.debug(f"Trace destination set to {trace_destination!r}, uploading run to cloud...")
self._upload_run_to_cloud(run=run, config=self._config)
if self._config._is_cloud_trace_destination(path=run._get_flow_dir().resolve()):
portal_url = self._upload_run_to_cloud(run=run, config=self._config)
self.run_operations.update(name=run.name, portal_url=portal_url)

def _resolve_input_dirs(self, run: Run):
result = {"data": run.data if run.data else None}
Expand Down Expand Up @@ -264,7 +263,7 @@ def _validate_column_mapping(cls, column_mapping: dict):
)

@classmethod
def _upload_run_to_cloud(cls, run: Run, config=None):
def _upload_run_to_cloud(cls, run: Run, config=None) -> str:
error_msg_prefix = f"Failed to upload run {run.name!r} to cloud."
try:
from promptflow._sdk._tracing import _get_ws_triad_from_pf_config
Expand All @@ -276,7 +275,7 @@ def _upload_run_to_cloud(cls, run: Run, config=None):
resource_group=ws_triad.resource_group_name,
workspace_name=ws_triad.workspace_name,
)
pf.runs._upload(run=run)
return pf.runs._upload(run=run)
except ImportError as e:
error_message = (
f'{error_msg_prefix}. "promptflow[azure]" is required for local to cloud tracing experience, '
Expand Down
7 changes: 6 additions & 1 deletion src/promptflow-devkit/promptflow/_sdk/_orm/run_info.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,10 +49,11 @@ class RunInfo(Base):
end_time = Column(TEXT) # ISO8601("YYYY-MM-DD HH:MM:SS.SSS"), string
data = Column(TEXT) # local path of original run data, string
run_source = Column(TEXT) # run source, string
portal_url = Column(TEXT) # portal url when trace destination is set to cloud, string

__table_args__ = (Index(RUN_INFO_CREATED_ON_INDEX_NAME, "created_on"),)
# schema version, increase the version number when you change the schema
__pf_schema_version__ = "3"
__pf_schema_version__ = "4"

@sqlite_retry
def dump(self) -> None:
Expand Down Expand Up @@ -96,6 +97,7 @@ def update(
start_time: Optional[Union[str, datetime.datetime]] = None,
end_time: Optional[Union[str, datetime.datetime]] = None,
system_metrics: Optional[Dict[str, int]] = None,
portal_url: Optional[str] = None,
) -> None:
update_dict = {}
if status is not None:
Expand All @@ -116,6 +118,9 @@ def update(
if end_time is not None:
self.end_time = end_time if isinstance(end_time, str) else end_time.isoformat()
update_dict["end_time"] = self.end_time
if portal_url is not None:
self.portal_url = portal_url
update_dict["portal_url"] = self.portal_url
with mgmt_db_session() as session:
# if not update system metrics, we can directly update the row;
# otherwise, we need to get properties first, update the dict and finally update the row
Expand Down
17 changes: 15 additions & 2 deletions src/promptflow-devkit/promptflow/_sdk/_tracing.py
Original file line number Diff line number Diff line change
Expand Up @@ -310,7 +310,10 @@ def _print_tracing_url_from_azure_portal(
return

url = url.format(query=query)
print(f"You can view the traces in cloud from Azure portal: {url}")
print(
f"You can view the traces in the cloud from the Azure portal. "
f"Please note that the page may remain empty for a while until the traces are successfully uploaded:\n{url}."
)


def _inject_res_attrs_to_environ(
Expand Down Expand Up @@ -371,7 +374,9 @@ def start_trace_with_devkit(collection: str, **kwargs: typing.Any) -> None:
_logger.debug("kwargs: %s", kwargs)
attrs = kwargs.get("attributes", None)
run = kwargs.get("run", None)
flow_path = None
if isinstance(run, Run):
flow_path = run._get_flow_dir().resolve()
run_config = run._config
run = run.name
else:
Expand Down Expand Up @@ -440,7 +445,15 @@ def start_trace_with_devkit(collection: str, **kwargs: typing.Any) -> None:
return
# print tracing url(s) when run is specified
_print_tracing_url_from_local(pfs_port=pfs_port, collection=collection, exp=exp, run=run)
if ws_triad is not None and is_azure_ext_installed:

if run is not None and run_config._is_cloud_trace_destination(path=flow_path):
trace_destination = run_config.get_trace_destination(path=flow_path)
print(
zhengfeiwang marked this conversation as resolved.
Show resolved Hide resolved
f"You can view the traces in azure portal since trace destination is set to: {trace_destination}. "
f"The link will be printed once the run is finished."
)
elif ws_triad is not None and is_azure_ext_installed:
# if run does not exist, print collection trace link
_print_tracing_url_from_azure_portal(ws_triad=ws_triad, collection=collection, exp=exp, run=run)
0mza987 marked this conversation as resolved.
Show resolved Hide resolved


Expand Down
4 changes: 4 additions & 0 deletions src/promptflow-devkit/promptflow/_sdk/entities/_run.py
Original file line number Diff line number Diff line change
Expand Up @@ -335,6 +335,7 @@ def _from_orm_object(cls, obj: ORMRun) -> "Run":
command=properties_json.get(FlowRunProperties.COMMAND, None),
outputs=properties_json.get(FlowRunProperties.OUTPUTS, None),
column_mapping=properties_json.get(FlowRunProperties.COLUMN_MAPPING, None),
portal_url=obj.portal_url,
)

@classmethod
Expand Down Expand Up @@ -428,6 +429,7 @@ def _to_orm_object(self) -> ORMRun:
properties=json.dumps(self.properties, default=asdict),
data=Path(self.data).resolve().absolute().as_posix() if self.data else None,
run_source=self._run_source,
portal_url=self._portal_url,
)

def _dump(self) -> None:
Expand Down Expand Up @@ -474,6 +476,8 @@ def _to_dict(
if exclude_debug_info:
exception_dict.pop("debugInfo", None)
result["error"] = exception_dict
if self._portal_url:
result[RunDataKeys.PORTAL_URL] = self._portal_url
elif self._run_source == RunInfoSources.INDEX_SERVICE:
result["creation_context"] = self._creation_context
result["flow_name"] = self._experiment_name
Expand Down