-
-
Notifications
You must be signed in to change notification settings - Fork 509
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat: add boolean permission support #3451
Changes from 30 commits
ee8d93c
9a8ce6c
ea4040b
ecedc67
f442f06
7f02182
8b08575
6300052
796dc99
9c9eedc
c682525
9694236
c38d868
d455926
af0c1d4
340f4a0
f00a5a6
7823b38
70e695e
ed67c14
7064f74
62ec845
cf53546
496cb36
6226b39
d8c9cce
425d739
4a2805a
e1c2b68
d338665
49f9d81
9ed6a54
29fe77f
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,23 @@ | ||
Release type: patch | ||
|
||
Adds the ability to use the `&` and `|` operators on permissions to form boolean logic. For example, if you want | ||
a field to be accessible with either the `IsAdmin` or `IsOwner` permission you | ||
could define the field as follows: | ||
|
||
```python | ||
import strawberry | ||
from strawberry.permission import PermissionExtension, BasePermission | ||
|
||
|
||
@strawberry.type | ||
class Query: | ||
@strawberry.field( | ||
extensions=[ | ||
PermissionExtension( | ||
permissions=[(IsAdmin() | IsOwner())], fail_silently=True | ||
) | ||
] | ||
) | ||
def name(self) -> str: | ||
return "ABC" | ||
``` |
Original file line number | Diff line number | Diff line change | ||||||||||||||||||||||||
---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
|
@@ -14,6 +14,7 @@ | |||||||||||||||||||||||||
Type, | ||||||||||||||||||||||||||
Union, | ||||||||||||||||||||||||||
) | ||||||||||||||||||||||||||
from typing_extensions import deprecated | ||||||||||||||||||||||||||
|
||||||||||||||||||||||||||
from strawberry.exceptions import StrawberryGraphQLError | ||||||||||||||||||||||||||
from strawberry.exceptions.permission_fail_silently_requires_optional import ( | ||||||||||||||||||||||||||
|
@@ -51,17 +52,35 @@ | |||||||||||||||||||||||||
@abc.abstractmethod | ||||||||||||||||||||||||||
def has_permission( | ||||||||||||||||||||||||||
self, source: Any, info: Info, **kwargs: Any | ||||||||||||||||||||||||||
) -> Union[bool, Awaitable[bool]]: | ||||||||||||||||||||||||||
) -> Union[bool, Awaitable[bool], (False, dict), Awaitable[(False, dict)]]: | ||||||||||||||||||||||||||
""" | ||||||||||||||||||||||||||
This method is a required override in the permission class. It checks if the user has the necessary permissions to access a specific field. | ||||||||||||||||||||||||||
|
||||||||||||||||||||||||||
The method should return a boolean value: | ||||||||||||||||||||||||||
- True: The user has the necessary permissions. | ||||||||||||||||||||||||||
- False: The user does not have the necessary permissions. In this case, the `on_unauthorized` method will be invoked. | ||||||||||||||||||||||||||
|
||||||||||||||||||||||||||
Avoid raising exceptions in this method. Instead, use the `on_unauthorized` method to handle errors and customize the error response. | ||||||||||||||||||||||||||
|
||||||||||||||||||||||||||
If there's a need to pass additional information to the `on_unauthorized` method, return a tuple. The first element should be False, and the second element should be a dictionary containing the additional information. | ||||||||||||||||||||||||||
|
||||||||||||||||||||||||||
Args: | ||||||||||||||||||||||||||
source (Any): The source field that the permission check is being performed on. | ||||||||||||||||||||||||||
info (Info): The GraphQL resolve info associated with the field. | ||||||||||||||||||||||||||
**kwargs (Any): Additional arguments that are typically passed to the field resolver. | ||||||||||||||||||||||||||
|
||||||||||||||||||||||||||
Returns: | ||||||||||||||||||||||||||
bool or tuple: Returns True if the user has the necessary permissions. Returns False or a tuple (False, additional_info) if the user does not have the necessary permissions. In the latter case, the `on_unauthorized` method will be invoked. | ||||||||||||||||||||||||||
""" | ||||||||||||||||||||||||||
raise NotImplementedError( | ||||||||||||||||||||||||||
"Permission classes should override has_permission method" | ||||||||||||||||||||||||||
) | ||||||||||||||||||||||||||
|
||||||||||||||||||||||||||
def on_unauthorized(self) -> None: | ||||||||||||||||||||||||||
def on_unauthorized(self, **kwargs: dict) -> None: | ||||||||||||||||||||||||||
""" | ||||||||||||||||||||||||||
Default error raising for permissions. | ||||||||||||||||||||||||||
This can be overridden to customize the behavior. | ||||||||||||||||||||||||||
""" | ||||||||||||||||||||||||||
|
||||||||||||||||||||||||||
# Instantiate error class | ||||||||||||||||||||||||||
error = self.error_class(self.message or "") | ||||||||||||||||||||||||||
|
||||||||||||||||||||||||||
|
@@ -74,6 +93,9 @@ | |||||||||||||||||||||||||
raise error | ||||||||||||||||||||||||||
|
||||||||||||||||||||||||||
@property | ||||||||||||||||||||||||||
@deprecated( | ||||||||||||||||||||||||||
"@schema_directive is deprecated and will be disabled by default on 31.12.2024 with future removal planned. Use the new @permissions directive instead." | ||||||||||||||||||||||||||
) | ||||||||||||||||||||||||||
def schema_directive(self) -> object: | ||||||||||||||||||||||||||
if not self._schema_directive: | ||||||||||||||||||||||||||
|
||||||||||||||||||||||||||
|
@@ -89,6 +111,85 @@ | |||||||||||||||||||||||||
|
||||||||||||||||||||||||||
return self._schema_directive | ||||||||||||||||||||||||||
|
||||||||||||||||||||||||||
@cached_property | ||||||||||||||||||||||||||
def is_async(self) -> bool: | ||||||||||||||||||||||||||
return iscoroutinefunction(self.has_permission) | ||||||||||||||||||||||||||
|
||||||||||||||||||||||||||
def __and__(self, other: BasePermission): | ||||||||||||||||||||||||||
return AndPermission([self, other]) | ||||||||||||||||||||||||||
|
||||||||||||||||||||||||||
def __or__(self, other: BasePermission): | ||||||||||||||||||||||||||
return OrPermission([self, other]) | ||||||||||||||||||||||||||
|
||||||||||||||||||||||||||
|
||||||||||||||||||||||||||
class CompositePermission(BasePermission, abc.ABC): | ||||||||||||||||||||||||||
def __init__(self, child_permissions: List[BasePermission]): | ||||||||||||||||||||||||||
self.child_permissions = child_permissions | ||||||||||||||||||||||||||
|
||||||||||||||||||||||||||
def on_unauthorized(self, **kwargs: dict) -> Any: | ||||||||||||||||||||||||||
failed_permissions = kwargs.get("failed_permissions", []) | ||||||||||||||||||||||||||
for permission in failed_permissions: | ||||||||||||||||||||||||||
permission.on_unauthorized() | ||||||||||||||||||||||||||
|
||||||||||||||||||||||||||
@cached_property | ||||||||||||||||||||||||||
def is_async(self) -> bool: | ||||||||||||||||||||||||||
return any(x.is_async for x in self.child_permissions) | ||||||||||||||||||||||||||
|
||||||||||||||||||||||||||
|
||||||||||||||||||||||||||
class AndPermission(CompositePermission): | ||||||||||||||||||||||||||
def has_permission( | ||||||||||||||||||||||||||
self, source: Any, info: Info, **kwargs: dict | ||||||||||||||||||||||||||
) -> Union[bool, Awaitable[bool], (False, dict), Awaitable[(False, dict)]]: | ||||||||||||||||||||||||||
if self.is_async: | ||||||||||||||||||||||||||
return self._has_permission_async(source, info, **kwargs) | ||||||||||||||||||||||||||
|
||||||||||||||||||||||||||
for permission in self.child_permissions: | ||||||||||||||||||||||||||
if not permission.has_permission(source, info, **kwargs): | ||||||||||||||||||||||||||
return False, {"failed_permissions": [permission]} | ||||||||||||||||||||||||||
return True | ||||||||||||||||||||||||||
Comment on lines
+157
to
+160
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. suggestion (code-quality): Use the built-in function
Suggested change
|
||||||||||||||||||||||||||
|
||||||||||||||||||||||||||
async def _has_permission_async( | ||||||||||||||||||||||||||
self, source: Any, info: Info, **kwargs: dict | ||||||||||||||||||||||||||
) -> Union[bool, (False, dict)]: | ||||||||||||||||||||||||||
for permission in self.child_permissions: | ||||||||||||||||||||||||||
if not await await_maybe(permission.has_permission(source, info, **kwargs)): | ||||||||||||||||||||||||||
return False, {"failed_permissions": [permission]} | ||||||||||||||||||||||||||
return True | ||||||||||||||||||||||||||
|
||||||||||||||||||||||||||
def __and__(self, other: BasePermission): | ||||||||||||||||||||||||||
self.child_permissions.append(other) | ||||||||||||||||||||||||||
erikwrede marked this conversation as resolved.
Show resolved
Hide resolved
|
||||||||||||||||||||||||||
return self | ||||||||||||||||||||||||||
|
||||||||||||||||||||||||||
|
||||||||||||||||||||||||||
class OrPermission(CompositePermission): | ||||||||||||||||||||||||||
def has_permission( | ||||||||||||||||||||||||||
self, source: Any, info: Info, **kwargs: dict | ||||||||||||||||||||||||||
) -> Union[bool, Awaitable[bool], (False, dict), Awaitable[(False, dict)]]: | ||||||||||||||||||||||||||
if self.is_async: | ||||||||||||||||||||||||||
return self._has_permission_async(source, info, **kwargs) | ||||||||||||||||||||||||||
failed_permissions = [] | ||||||||||||||||||||||||||
for permission in self.child_permissions: | ||||||||||||||||||||||||||
if permission.has_permission(source, info, **kwargs): | ||||||||||||||||||||||||||
return True | ||||||||||||||||||||||||||
failed_permissions.append(permission) | ||||||||||||||||||||||||||
|
||||||||||||||||||||||||||
return False, {"failed_permissions": failed_permissions} | ||||||||||||||||||||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Probably get rid of dictionaries here, maybe make a dataclass instead 🤔 There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The idea here was to remain as flexible as possible, allowing users to pass context in the future as well. With a dataclass we would be again very static about handling additional permission error context. Maybe a TypedDict can help improve our defaults for this particular case? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Hm, I'm not sure how context would be used here, is it in the release noted (that I didn't read)? 😅 There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. It's internal for now, but it allows us to pass information to There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. So basically the context contains "this permission failed out of the list of and / or permissions" |
||||||||||||||||||||||||||
|
||||||||||||||||||||||||||
async def _has_permission_async( | ||||||||||||||||||||||||||
self, source: Any, info: Info, **kwargs: dict | ||||||||||||||||||||||||||
) -> Union[bool, (False, dict)]: | ||||||||||||||||||||||||||
failed_permissions = [] | ||||||||||||||||||||||||||
for permission in self.child_permissions: | ||||||||||||||||||||||||||
if await await_maybe(permission.has_permission(source, info, **kwargs)): | ||||||||||||||||||||||||||
return True | ||||||||||||||||||||||||||
failed_permissions.append(permission) | ||||||||||||||||||||||||||
|
||||||||||||||||||||||||||
return False, {"failed_permissions": failed_permissions} | ||||||||||||||||||||||||||
|
||||||||||||||||||||||||||
def __or__(self, other: BasePermission): | ||||||||||||||||||||||||||
self.child_permissions.append(other) | ||||||||||||||||||||||||||
return self | ||||||||||||||||||||||||||
|
||||||||||||||||||||||||||
|
||||||||||||||||||||||||||
class PermissionExtension(FieldExtension): | ||||||||||||||||||||||||||
""" | ||||||||||||||||||||||||||
|
@@ -100,8 +201,8 @@ | |||||||||||||||||||||||||
|
||||||||||||||||||||||||||
NOTE: | ||||||||||||||||||||||||||
Currently, this is automatically added to the field, when using | ||||||||||||||||||||||||||
field.permission_classes | ||||||||||||||||||||||||||
This is deprecated behavior, please manually add the extension to field.extensions | ||||||||||||||||||||||||||
field.permission_classes. You are free to use whichever method you prefer. | ||||||||||||||||||||||||||
Use PermissionExtension if you want additional customization. | ||||||||||||||||||||||||||
""" | ||||||||||||||||||||||||||
|
||||||||||||||||||||||||||
def __init__( | ||||||||||||||||||||||||||
|
@@ -117,12 +218,16 @@ | |||||||||||||||||||||||||
|
||||||||||||||||||||||||||
def apply(self, field: StrawberryField) -> None: | ||||||||||||||||||||||||||
""" | ||||||||||||||||||||||||||
Applies all of the permission directives to the schema | ||||||||||||||||||||||||||
Applies all the permission directives to the schema | ||||||||||||||||||||||||||
and sets up silent permissions | ||||||||||||||||||||||||||
""" | ||||||||||||||||||||||||||
if self.use_directives: | ||||||||||||||||||||||||||
field.directives.extend( | ||||||||||||||||||||||||||
p.schema_directive for p in self.permissions if p.schema_directive | ||||||||||||||||||||||||||
[ | ||||||||||||||||||||||||||
p.schema_directive | ||||||||||||||||||||||||||
for p in self.permissions | ||||||||||||||||||||||||||
if not isinstance(p, CompositePermission) | ||||||||||||||||||||||||||
] | ||||||||||||||||||||||||||
) | ||||||||||||||||||||||||||
# We can only fail silently if the field is optional or a list | ||||||||||||||||||||||||||
if self.fail_silently: | ||||||||||||||||||||||||||
|
@@ -132,13 +237,16 @@ | |||||||||||||||||||||||||
elif isinstance(field.type, StrawberryList): | ||||||||||||||||||||||||||
self.return_empty_list = True | ||||||||||||||||||||||||||
else: | ||||||||||||||||||||||||||
errror = PermissionFailSilentlyRequiresOptionalError(field) | ||||||||||||||||||||||||||
raise errror | ||||||||||||||||||||||||||
error = PermissionFailSilentlyRequiresOptionalError(field) | ||||||||||||||||||||||||||
raise error | ||||||||||||||||||||||||||
erikwrede marked this conversation as resolved.
Show resolved
Hide resolved
|
||||||||||||||||||||||||||
|
||||||||||||||||||||||||||
def _on_unauthorized(self, permission: BasePermission) -> Any: | ||||||||||||||||||||||||||
def _on_unauthorized(self, permission: BasePermission, **kwargs: dict) -> Any: | ||||||||||||||||||||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This is probably a mistake - with kwargs you annotate value type (e.g. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. If there are no particular operations performed on kwargs values you could annotate them as |
||||||||||||||||||||||||||
if self.fail_silently: | ||||||||||||||||||||||||||
return [] if self.return_empty_list else None | ||||||||||||||||||||||||||
return permission.on_unauthorized() | ||||||||||||||||||||||||||
|
||||||||||||||||||||||||||
if kwargs in (None, {}): | ||||||||||||||||||||||||||
return permission.on_unauthorized() | ||||||||||||||||||||||||||
return permission.on_unauthorized(**kwargs) | ||||||||||||||||||||||||||
|
||||||||||||||||||||||||||
def resolve( | ||||||||||||||||||||||||||
self, | ||||||||||||||||||||||||||
|
@@ -151,9 +259,19 @@ | |||||||||||||||||||||||||
Checks if the permission should be accepted and | ||||||||||||||||||||||||||
raises an exception if not | ||||||||||||||||||||||||||
""" | ||||||||||||||||||||||||||
|
||||||||||||||||||||||||||
for permission in self.permissions: | ||||||||||||||||||||||||||
if not permission.has_permission(source, info, **kwargs): | ||||||||||||||||||||||||||
return self._on_unauthorized(permission) | ||||||||||||||||||||||||||
permission_response = permission.has_permission(source, info, **kwargs) | ||||||||||||||||||||||||||
|
||||||||||||||||||||||||||
context = {} | ||||||||||||||||||||||||||
if isinstance(permission_response, tuple): | ||||||||||||||||||||||||||
has_permission, context = permission_response | ||||||||||||||||||||||||||
else: | ||||||||||||||||||||||||||
has_permission = permission_response | ||||||||||||||||||||||||||
|
||||||||||||||||||||||||||
if not has_permission: | ||||||||||||||||||||||||||
return self._on_unauthorized(permission, **context) | ||||||||||||||||||||||||||
|
||||||||||||||||||||||||||
return next_(source, info, **kwargs) | ||||||||||||||||||||||||||
|
||||||||||||||||||||||||||
async def resolve_async( | ||||||||||||||||||||||||||
|
@@ -164,12 +282,18 @@ | |||||||||||||||||||||||||
**kwargs: Dict[str, Any], | ||||||||||||||||||||||||||
) -> Any: | ||||||||||||||||||||||||||
for permission in self.permissions: | ||||||||||||||||||||||||||
has_permission = await await_maybe( | ||||||||||||||||||||||||||
permission_response = await await_maybe( | ||||||||||||||||||||||||||
permission.has_permission(source, info, **kwargs) | ||||||||||||||||||||||||||
) | ||||||||||||||||||||||||||
|
||||||||||||||||||||||||||
context = {} | ||||||||||||||||||||||||||
if isinstance(permission_response, tuple): | ||||||||||||||||||||||||||
has_permission, context = permission_response | ||||||||||||||||||||||||||
else: | ||||||||||||||||||||||||||
has_permission = permission_response | ||||||||||||||||||||||||||
|
||||||||||||||||||||||||||
if not has_permission: | ||||||||||||||||||||||||||
return self._on_unauthorized(permission) | ||||||||||||||||||||||||||
return self._on_unauthorized(permission, **context) | ||||||||||||||||||||||||||
next = next_(source, info, **kwargs) | ||||||||||||||||||||||||||
if inspect.isasyncgen(next): | ||||||||||||||||||||||||||
return next | ||||||||||||||||||||||||||
|
@@ -179,9 +303,4 @@ | |||||||||||||||||||||||||
def supports_sync(self) -> bool: | ||||||||||||||||||||||||||
"""The Permission extension always supports async checking using await_maybe, | ||||||||||||||||||||||||||
but only supports sync checking if there are no async permissions""" | ||||||||||||||||||||||||||
async_permissions = [ | ||||||||||||||||||||||||||
True | ||||||||||||||||||||||||||
for permission in self.permissions | ||||||||||||||||||||||||||
if iscoroutinefunction(permission.has_permission) | ||||||||||||||||||||||||||
] | ||||||||||||||||||||||||||
return len(async_permissions) == 0 | ||||||||||||||||||||||||||
return all(not permission.is_async for permission in self.permissions) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
suggestion (code_refinement): Consider simplifying the return type for better maintainability.
The method now has multiple return types, which could complicate the handling of its output. Simplifying to a consistent return type, possibly encapsulated within a class or structure, might improve maintainability and readability.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
By the way, this looks like a mistake
(a, b)
is not a valid type 🤔 Perhaps you needTuple[Literal[False], dict]
?