diff --git a/jira/client.py b/jira/client.py index 0c456a3f9..10f449036 100644 --- a/jira/client.py +++ b/jira/client.py @@ -18,20 +18,19 @@ import os import re import sys +import tempfile import time import urllib import warnings from collections import OrderedDict -from collections.abc import Iterable -from functools import lru_cache, wraps +from collections.abc import Iterable, Iterator +from functools import cache, wraps from io import BufferedReader from numbers import Number from typing import ( Any, Callable, Generic, - Iterator, - List, Literal, SupportsIndex, TypeVar, @@ -42,7 +41,6 @@ import requests from packaging.version import parse as parse_version -from PIL import Image from requests import Response from requests.auth import AuthBase from requests.structures import CaseInsensitiveDict @@ -76,6 +74,7 @@ IssueTypeScheme, NotificationScheme, PermissionScheme, + PinnedComment, Priority, PriorityScheme, Project, @@ -208,7 +207,7 @@ def wrapper(*args: Any, **kwargs: Any) -> Any: def _field_worker( - fields: dict[str, Any] = None, **fieldargs: Any + fields: dict[str, Any] | None = None, **fieldargs: Any ) -> dict[str, dict[str, Any]] | dict[str, dict[str, str]]: if fields is not None: return {"fields": fields} @@ -221,7 +220,7 @@ def _field_worker( class ResultList(list, Generic[ResourceType]): def __init__( self, - iterable: Iterable = None, + iterable: Iterable | None = None, _startAt: int = 0, _maxResults: int = 0, _total: int | None = None, @@ -315,7 +314,10 @@ class JiraCookieAuth(AuthBase): """ def __init__( - self, session: ResilientSession, session_api_url: str, auth: tuple[str, str] + self, + session: ResilientSession, + session_api_url: str, + auth: tuple[str, str], ): """Cookie Based Authentication. @@ -464,23 +466,23 @@ class JIRA: def __init__( self, - server: str = None, - options: dict[str, str | bool | Any] = None, + server: str | None = None, + options: dict[str, str | bool | Any] | None = None, basic_auth: tuple[str, str] | None = None, token_auth: str | None = None, - oauth: dict[str, Any] = None, - jwt: dict[str, Any] = None, + oauth: dict[str, Any] | None = None, + jwt: dict[str, Any] | None = None, kerberos=False, - kerberos_options: dict[str, Any] = None, + kerberos_options: dict[str, Any] | None = None, validate=False, get_server_info: bool = True, async_: bool = False, async_workers: int = 5, logging: bool = True, max_retries: int = 3, - proxies: Any = None, + proxies: Any | None = None, timeout: None | float | tuple[float, float] | tuple[float, None] | None = None, - auth: tuple[str, str] = None, + auth: tuple[str, str] | None = None, default_batch_sizes: dict[type[Resource], int | None] | None = None, ): """Construct a Jira client instance. @@ -559,7 +561,6 @@ def __init__( """ # force a copy of the tuple to be used in __del__() because # sys.version_info could have already been deleted in __del__() - self.sys_version_info = tuple(sys.version_info) if options is None: options = {} @@ -745,7 +746,8 @@ def close(self): # because other references are also in the process to be torn down, # see warning section in https://docs.python.org/2/reference/datamodel.html#object.__del__ pass - self._session = None + # TODO: https://github.com/pycontribs/jira/issues/1881 + self._session = None # type: ignore[arg-type,assignment] def _check_for_html_error(self, content: str): # Jira has the bad habit of returning errors in pages with 200 and embedding the @@ -771,7 +773,7 @@ def _fetch_pages( request_path: str, startAt: int = 0, maxResults: int = 50, - params: dict[str, Any] = None, + params: dict[str, Any] | None = None, base: str = JIRA_BASE_URL, use_post: bool = False, ) -> ResultList[ResourceType]: @@ -888,7 +890,10 @@ def json_params() -> dict[str, Any]: page_params["maxResults"] = page_size resource = self._get_json( - request_path, params=page_params, base=base, use_post=use_post + request_path, + params=page_params, + base=base, + use_post=use_post, ) if resource: next_items_page = self._get_items_from_page( @@ -901,12 +906,20 @@ def json_params() -> dict[str, Any]: break return ResultList( - items, start_at_from_response, max_results_from_response, total, is_last + items, + start_at_from_response, + max_results_from_response, + total, + is_last, ) else: # TODO: unreachable # it seems that search_users can return a list() containing a single user! return ResultList( - [item_type(self._options, self._session, resource)], 0, 1, 1, True + [item_type(self._options, self._session, resource)], + 0, + 1, + 1, + True, ) def _get_items_from_page( @@ -991,7 +1004,7 @@ def async_do(self, size: int = 10): # non-resource def application_properties( - self, key: str = None + self, key: str | None = None ) -> dict[str, str] | list[dict[str, str]]: """Return the mutable server application properties. @@ -1065,7 +1078,7 @@ def add_attachment( self, issue: str | int, attachment: str | BufferedReader, - filename: str = None, + filename: str | None = None, ) -> Attachment: """Attach an attachment to an issue and returns a Resource for it. @@ -1089,8 +1102,7 @@ def add_attachment( attachment_io = attachment if isinstance(attachment, BufferedReader) and attachment.mode != "rb": self.log.warning( - "%s was not opened in 'rb' mode, attaching file may fail." - % attachment.name + f"{attachment.name} was not opened in 'rb' mode, attaching file may fail." ) fname = filename @@ -1139,7 +1151,7 @@ def prepare( if not js or not isinstance(js, Iterable): raise JIRAError(f"Unable to parse JSON: {js}. Failed to add attachment?") jira_attachment = Attachment( - self._options, self._session, js[0] if isinstance(js, List) else js + self._options, self._session, js[0] if isinstance(js, list) else js ) if jira_attachment.size == 0: raise JIRAError( @@ -1424,7 +1436,11 @@ def dashboard_item_property( return dashboard_item_property def set_dashboard_item_property( - self, dashboard_id: str, item_id: str, property_key: str, value: dict[str, Any] + self, + dashboard_id: str, + item_id: str, + property_key: str, + value: dict[str, Any], ) -> DashboardItemProperty: """Set a dashboard item property. @@ -1570,10 +1586,10 @@ def favourite_filters(self) -> list[Filter]: def create_filter( self, - name: str = None, - description: str = None, - jql: str = None, - favourite: bool = None, + name: str | None = None, + description: str | None = None, + jql: str | None = None, + favourite: bool | None = None, ) -> Filter: """Create a new filter and return a filter Resource for it. @@ -1604,10 +1620,10 @@ def create_filter( def update_filter( self, filter_id, - name: str = None, - description: str = None, - jql: str = None, - favourite: bool = None, + name: str | None = None, + description: str | None = None, + jql: str | None = None, + favourite: bool | None = None, ): """Update a filter and return a filter Resource for it. @@ -1629,7 +1645,9 @@ def update_filter( url = self._get_url(f"filter/{filter_id}") r = self._session.put( - url, headers={"content-type": "application/json"}, data=json.dumps(data) + url, + headers={"content-type": "application/json"}, + data=json.dumps(data), ) raw_filter_json = json.loads(r.text) @@ -1637,7 +1655,7 @@ def update_filter( # Groups - def group(self, id: str, expand: Any = None) -> Group: + def group(self, id: str, expand: Any | None = None) -> Group: """Get a group Resource from the server. Args: @@ -1689,29 +1707,54 @@ def group_members(self, group: str) -> OrderedDict: Args: group (str): Name of the group. """ + users = {} + if self._version < (6, 0, 0): raise NotImplementedError( "Group members is not implemented in Jira before version 6.0, upgrade the instance, if possible." ) - params = {"groupname": group, "expand": "users"} - r = self._get_json("group", params=params) - size = r["users"]["size"] - end_index = r["users"]["end-index"] - - while end_index < size - 1: - params = { - "groupname": group, - "expand": f"users[{end_index + 1}:{end_index + 50}]", - } - r2 = self._get_json("group", params=params) - for user in r2["users"]["items"]: - r["users"]["items"].append(user) - end_index = r2["users"]["end-index"] + elif self._version < (10, 0, 0): + params = {"groupname": group, "expand": "users"} + r = self._get_json("group", params=params) size = r["users"]["size"] + end_index = r["users"]["end-index"] + + while end_index < size - 1: + params = { + "groupname": group, + "expand": f"users[{end_index + 1}:{end_index + 50}]", + } + r2 = self._get_json("group", params=params) + for user in r2["users"]["items"]: + r["users"]["items"].append(user) + end_index = r2["users"]["end-index"] + size = r["users"]["size"] + + users = r["users"]["items"] + + else: + params = {"groupname": group} + group_member_api_endpoint = "group/member" + r = self._get_json(group_member_api_endpoint, params=params) + end_index = r["maxResults"] + isLast = r["isLast"] + + while isLast is False: + params = { + "groupname": group, + "startAt": f"{end_index}", + } + r2 = self._get_json(group_member_api_endpoint, params=params) + isLast = r2["isLast"] + for user in r2["values"]: + r["values"].append(user) + end_index += r2["maxResults"] + + users = r["values"] result = {} - for user in r["users"]["items"]: + for user in users: # 'id' is likely available only in older JIRA Server, # it's not available on newer JIRA Server. # 'name' is not available in JIRA Cloud. @@ -1860,7 +1903,10 @@ def create_issue( raw_issue_json = json_loads(r) if "key" not in raw_issue_json: raise JIRAError( - status_code=r.status_code, response=r, url=url, text=json.dumps(data) + status_code=r.status_code, + response=r, + url=url, + text=json.dumps(data), ) if prefetch: return self.issue(raw_issue_json["key"]) @@ -2010,7 +2056,10 @@ def service_desk(self, id: str) -> ServiceDesk: @no_type_check # FIXME: This function does not do what it wants to with fieldargs def create_customer_request( - self, fields: dict[str, Any] = None, prefetch: bool = True, **fieldargs + self, + fields: dict[str, Any] | None = None, + prefetch: bool = True, + **fieldargs, ) -> Issue: """Create a new customer request and return an issue Resource for it. @@ -2438,7 +2487,10 @@ def add_remote_link( data: dict[str, Any] = {} if isinstance(destination, Issue) and destination.raw: - data["object"] = {"title": str(destination), "url": destination.permalink()} + data["object"] = { + "title": str(destination), + "url": destination.permalink(), + } for x in applicationlinks: if x["application"]["displayUrl"] == destination._options["server"]: data["globalId"] = "appId={}&issueId={}".format( @@ -2809,14 +2861,8 @@ def add_worklog( started (Optional[datetime.datetime]): Moment when the work is logged, if not specified will default to now user (Optional[str]): the user ID or name to use for this worklog visibility (Optional[Dict[str,Any]]): Details about any restrictions in the visibility of the worklog. - Optional when creating or updating a worklog. :: - ```js - { - "type": "group", # "group" or "role" - "value": "", - "identifier": "" # OPTIONAL - } - ``` + Example of visibility options when creating or updating a worklog. + ``{ "type": "group", "value": "", "identifier": ""}`` Returns: Worklog @@ -3262,7 +3308,7 @@ def create_temp_project_avatar( filename: str, size: int, avatar_img: bytes, - contentType: str = None, + contentType: str | None = None, auto_confirm: bool = False, ): """Register an image file as a project avatar. @@ -3277,7 +3323,7 @@ def create_temp_project_avatar( This method returns a dict of properties that can be used to crop a subarea of a larger image for use. This dict should be saved and passed to :py:meth:`confirm_project_avatar` to finish the avatar creation process. - If you want to cut out the middleman and confirm the avatar with Jira's default cropping, + If you want to confirm the avatar with Jira's default cropping, pass the 'auto_confirm' argument with a truthy value and :py:meth:`confirm_project_avatar` will be called for you before this method returns. Args: @@ -3485,6 +3531,36 @@ def resolution(self, id: str) -> Resolution: # Search + @overload + def search_issues( + self, + jql_str: str, + startAt: int = 0, + maxResults: int = 50, + validate_query: bool = True, + fields: str | list[str] | None = "*all", + expand: str | None = None, + properties: str | None = None, + *, + json_result: Literal[False] = False, + use_post: bool = False, + ) -> ResultList[Issue]: ... + + @overload + def search_issues( + self, + jql_str: str, + startAt: int = 0, + maxResults: int = 50, + validate_query: bool = True, + fields: str | list[str] | None = "*all", + expand: str | None = None, + properties: str | None = None, + *, + json_result: Literal[True], + use_post: bool = False, + ) -> dict[str, Any]: ... + def search_issues( self, jql_str: str, @@ -3494,6 +3570,7 @@ def search_issues( fields: str | list[str] | None = "*all", expand: str | None = None, properties: str | None = None, + *, json_result: bool = False, use_post: bool = False, ) -> dict[str, Any] | ResultList[Issue]: @@ -3808,7 +3885,7 @@ def create_temp_user_avatar( filename: str, size: int, avatar_img: bytes, - contentType: Any = None, + contentType: Any | None = None, auto_confirm: bool = False, ): """Register an image file as a user avatar. @@ -3823,7 +3900,7 @@ def create_temp_user_avatar( This method returns a dict of properties that can be used to crop a subarea of a larger image for use. This dict should be saved and passed to :py:meth:`confirm_user_avatar` to finish the avatar creation process. - If you want to cut out the middleman and confirm the avatar with Jira's default cropping, pass the ``auto_confirm`` argument with a truthy value and + If you want to confirm the avatar with Jira's default cropping, pass the ``auto_confirm`` argument with a truthy value and :py:meth:`confirm_user_avatar` will be called for you before this method returns. Args: @@ -3981,8 +4058,8 @@ def search_users( def search_allowed_users_for_issue( self, user: str, - issueKey: str = None, - projectKey: str = None, + issueKey: str | None = None, + projectKey: str | None = None, startAt: int = 0, maxResults: int = 50, ) -> ResultList: @@ -4014,9 +4091,9 @@ def create_version( self, name: str, project: str, - description: str = None, - releaseDate: Any = None, - startDate: Any = None, + description: str | None = None, + releaseDate: Any | None = None, + startDate: Any | None = None, archived: bool = False, released: bool = False, ) -> Version: @@ -4054,7 +4131,9 @@ def create_version( version = Version(self._options, self._session, raw=json_loads(r)) return version - def move_version(self, id: str, after: str = None, position: str = None) -> Version: + def move_version( + self, id: str, after: str | None = None, position: str | None = None + ) -> Version: """Move a version within a project's ordered version list and return a new version Resource for it. One, but not both, of ``after`` and ``position`` must be specified. @@ -4079,7 +4158,7 @@ def move_version(self, id: str, after: str = None, position: str = None) -> Vers version = Version(self._options, self._session, raw=json_loads(r)) return version - def version(self, id: str, expand: Any = None) -> Version: + def version(self, id: str, expand: Any | None = None) -> Version: """Get a version Resource. Args: @@ -4177,7 +4256,11 @@ def _create_oauth_session(self, oauth: dict[str, Any]): FALLBACK_SHA = DEFAULT_SHA _logging.debug("Fallback SHA 'SIGNATURE_RSA_SHA1' could not be imported.") - for sha_type in (oauth.get("signature_method"), DEFAULT_SHA, FALLBACK_SHA): + for sha_type in ( + oauth.get("signature_method"), + DEFAULT_SHA, + FALLBACK_SHA, + ): if sha_type is None: continue oauth_instance = OAuth1( @@ -4203,7 +4286,7 @@ def _create_oauth_session(self, oauth: dict[str, Any]): def _create_kerberos_session( self, - kerberos_options: dict[str, Any] = None, + kerberos_options: dict[str, Any] | None = None, ): if kerberos_options is None: kerberos_options = {} @@ -4216,8 +4299,9 @@ def _create_kerberos_session( mutual_authentication = DISABLED else: raise ValueError( - "Unknown value for mutual_authentication: %s" - % kerberos_options["mutual_authentication"] + "Unknown value for mutual_authentication: {}".format( + kerberos_options["mutual_authentication"] + ) ) self._session.auth = HTTPKerberosAuth( @@ -4229,7 +4313,7 @@ def _add_client_cert_to_session(self): If configured through the constructor. - https://docs.python-requests.org/en/master/user/advanced/#client-side-certificates + https://docs.python-requests.org/en/latest/user/advanced/#client-side-certificates - str: a single file (containing the private key and the certificate) - Tuple[str,str] a tuple of both files’ paths """ @@ -4241,7 +4325,7 @@ def _add_ssl_cert_verif_strategy_to_session(self): If configured through the constructor. - https://docs.python-requests.org/en/master/user/advanced/#ssl-cert-verification + https://docs.python-requests.org/en/latest/user/advanced/#ssl-cert-verification - str: Path to a `CA_BUNDLE` file or directory with certificates of trusted CAs. - bool: True/False """ @@ -4249,7 +4333,7 @@ def _add_ssl_cert_verif_strategy_to_session(self): self._session.verify = ssl_cert @staticmethod - def _timestamp(dt: datetime.timedelta = None): + def _timestamp(dt: datetime.timedelta | None = None): t = datetime.datetime.utcnow() if dt is not None: t += dt @@ -4297,7 +4381,11 @@ def _get_internal_url(self, path: str, base: str = JIRA_BASE_URL) -> str: """ options = self._options.copy() options.update( - {"path": path, "rest_api_version": "latest", "rest_path": "internal"} + { + "path": path, + "rest_api_version": "latest", + "rest_path": "internal", + } ) return base.format(**options) @@ -4336,7 +4424,7 @@ def _get_latest_url(self, path: str, base: str = JIRA_BASE_URL) -> str: def _get_json( self, path: str, - params: dict[str, Any] = None, + params: dict[str, Any] | None = None, base: str = JIRA_BASE_URL, use_post: bool = False, ): @@ -4425,7 +4513,10 @@ def _get_mime_type(self, buff: bytes) -> str | None: if self._magic is not None: return self._magic.id_buffer(buff) try: - return mimetypes.guess_type("f." + Image.open(buff).format)[0] + with tempfile.TemporaryFile() as f: + f.write(buff) + return mimetypes.guess_type(f.name)[0] + return mimetypes.guess_type(f.name)[0] except (OSError, TypeError): self.log.warning( "Couldn't detect content type of avatar image" @@ -4451,7 +4542,7 @@ def rename_user(self, old_user: str, new_user: str): self._session.put(url, params=params, data=json.dumps(payload)) else: raise NotImplementedError( - "Support for renaming users in Jira " "< 6.0.0 has been removed." + "Support for renaming users in Jira < 6.0.0 has been removed." ) def delete_user(self, username: str) -> bool: @@ -4518,9 +4609,9 @@ def deactivate_user(self, username: str) -> str | int: raise JIRAError(f"Error Deactivating {username}: {e}") else: url = self.server_url + "/secure/admin/user/EditUser.jspa" - self._options["headers"][ - "Content-Type" - ] = "application/x-www-form-urlencoded; charset=UTF-8" + self._options["headers"]["Content-Type"] = ( + "application/x-www-form-urlencoded; charset=UTF-8" + ) user = self.user(username) userInfo = { "inline": "true", @@ -4580,7 +4671,10 @@ def reindex(self, force: bool = False, background: bool = True) -> bool: r = self._session.post( url, headers=self._options["headers"], - params={"indexingStrategy": indexingStrategy, "reindex": "Re-Index"}, + params={ + "indexingStrategy": indexingStrategy, + "reindex": "Re-Index", + }, ) if r.text.find("All issues are being re-indexed") != -1: return True @@ -4657,6 +4751,8 @@ def backup_complete(self) -> bool | None: self.log.warning("This functionality is not available in Server version") return None status = self.backup_progress() + if not status: + raise RuntimeError("Failed to retrieve backup progress.") perc_search = re.search(r"\s([0-9]*)\s", status["alternativePercentage"]) perc_complete = int( perc_search.group(1) # type: ignore # ignore that re.search can return None @@ -4664,12 +4760,15 @@ def backup_complete(self) -> bool | None: file_size = int(status["size"]) return perc_complete >= 100 and file_size > 0 - def backup_download(self, filename: str = None): + def backup_download(self, filename: str | None = None): """Download backup file from WebDAV (cloud only).""" if not self._is_cloud: self.log.warning("This functionality is not available in Server version") return None - remote_file = self.backup_progress()["fileName"] + progress = self.backup_progress() + if not progress: + raise RuntimeError("Unable to retrieve backup progress.") + remote_file = progress["fileName"] local_file = filename or remote_file url = self.server_url + "/webdav/backupmanager/" + remote_file try: @@ -4768,7 +4867,7 @@ def _gain_sudo_session(self, options, destination): data=payload, ) - @lru_cache(maxsize=None) + @cache def templates(self) -> dict: url = self.server_url + "/rest/project-templates/latest/templates" @@ -4783,7 +4882,7 @@ def templates(self) -> dict: # pprint(templates.keys()) return templates - @lru_cache(maxsize=None) + @cache def permissionschemes(self): url = self._get_url("permissionscheme") @@ -4792,7 +4891,7 @@ def permissionschemes(self): return data["permissionSchemes"] - @lru_cache(maxsize=None) + @cache def issue_type_schemes(self) -> list[IssueTypeScheme]: """Get all issue type schemes defined (Admin required). @@ -4806,7 +4905,7 @@ def issue_type_schemes(self) -> list[IssueTypeScheme]: return data["schemes"] - @lru_cache(maxsize=None) + @cache def issuesecurityschemes(self): url = self._get_url("issuesecurityschemes") @@ -4815,7 +4914,7 @@ def issuesecurityschemes(self): return data["issueSecuritySchemes"] - @lru_cache(maxsize=None) + @cache def projectcategories(self): url = self._get_url("projectCategory") @@ -4824,7 +4923,7 @@ def projectcategories(self): return data - @lru_cache(maxsize=None) + @cache def avatars(self, entity="project"): url = self._get_url(f"avatar/{entity}/system") @@ -4833,7 +4932,7 @@ def avatars(self, entity="project"): return data["system"] - @lru_cache(maxsize=None) + @cache def notificationschemes(self): # TODO(ssbarnea): implement pagination support url = self._get_url("notificationscheme") @@ -4842,7 +4941,7 @@ def notificationschemes(self): data: dict[str, Any] = json_loads(r) return data["values"] - @lru_cache(maxsize=None) + @cache def screens(self): # TODO(ssbarnea): implement pagination support url = self._get_url("screens") @@ -4851,7 +4950,7 @@ def screens(self): data: dict[str, Any] = json_loads(r) return data["values"] - @lru_cache(maxsize=None) + @cache def workflowscheme(self): # TODO(ssbarnea): implement pagination support url = self._get_url("workflowschemes") @@ -4860,7 +4959,7 @@ def workflowscheme(self): data = json_loads(r) return data # ['values'] - @lru_cache(maxsize=None) + @cache def workflows(self): # TODO(ssbarnea): implement pagination support url = self._get_url("workflow") @@ -4904,16 +5003,16 @@ def get_issue_type_scheme_associations(self, id: str) -> list[Project]: def create_project( self, key: str, - name: str = None, - assignee: str = None, + name: str | None = None, + assignee: str | None = None, ptype: str = "software", - template_name: str = None, - avatarId: int = None, - issueSecurityScheme: int = None, - permissionScheme: int = None, - projectCategory: int = None, + template_name: str | None = None, + avatarId: int | None = None, + issueSecurityScheme: int | None = None, + permissionScheme: int | None = None, + projectCategory: int | None = None, notificationScheme: int = 10000, - categoryId: int = None, + categoryId: int | None = None, url: str = "", ): """Create a project with the specified parameters. @@ -4956,6 +5055,8 @@ def create_project( break if permissionScheme is None and ps_list: permissionScheme = ps_list[0]["id"] + if permissionScheme is None: + raise RuntimeError("Unable to identify valid permissionScheme") if issueSecurityScheme is None: ps_list = self.issuesecurityschemes() @@ -4965,6 +5066,8 @@ def create_project( break if issueSecurityScheme is None and ps_list: issueSecurityScheme = ps_list[0]["id"] + if issueSecurityScheme is None: + raise RuntimeError("Unable to identify valid issueSecurityScheme") # If categoryId provided instead of projectCategory, attribute the categoryId value # to the projectCategory variable @@ -5080,8 +5183,8 @@ def add_user( username: str, email: str, directoryId: int = 1, - password: str = None, - fullname: str = None, + password: str | None = None, + fullname: str | None = None, notify: bool = False, active: bool = True, ignore_existing: bool = False, @@ -5217,8 +5320,8 @@ def boards( self, startAt: int = 0, maxResults: int = 50, - type: str = None, - name: str = None, + type: str | None = None, + name: str | None = None, projectKeyOrID=None, ) -> ResultList[Board]: """Get a list of board resources. @@ -5258,7 +5361,7 @@ def sprints( extended: bool | None = None, startAt: int = 0, maxResults: int = 50, - state: str = None, + state: str | None = None, ) -> ResultList[Sprint]: """Get a list of sprint Resources. @@ -5291,7 +5394,7 @@ def sprints( ) def sprints_by_name( - self, id: str | int, extended: bool = False, state: str = None + self, id: str | int, extended: bool = False, state: str | None = None ) -> dict[str, dict[str, Any]]: """Get a dictionary of sprint Resources where the name of the sprint is the key. @@ -5425,7 +5528,7 @@ def create_board( self, name: str, filter_id: str, - project_ids: str = None, + project_ids: str | None = None, preset: str = "scrum", location_type: Literal["user", "project"] = "user", location_id: str | None = None, @@ -5529,7 +5632,10 @@ def add_issues_to_sprint(self, sprint_id: int, issue_keys: list[str]) -> Respons return self._session.post(url, data=json.dumps(payload)) def add_issues_to_epic( - self, epic_id: str, issue_keys: str | list[str], ignore_epics: bool = None + self, + epic_id: str, + issue_keys: str | list[str], + ignore_epics: bool | None = None, ) -> Response: """Add the issues in ``issue_keys`` to the ``epic_id``. @@ -5625,3 +5731,36 @@ def move_to_backlog(self, issue_keys: list[str]) -> Response: url = self._get_url("backlog/issue", base=self.AGILE_BASE_URL) payload = {"issues": issue_keys} # TODO: should be list of issues return self._session.post(url, data=json.dumps(payload)) + + @translate_resource_args + def pinned_comments(self, issue: int | str) -> list[PinnedComment]: + """Get a list of pinned comment Resources of the issue provided. + + Args: + issue (Union[int, str]): the issue ID or key to get the comments from + + Returns: + List[PinnedComment] + """ + r_json = self._get_json(f"issue/{issue}/pinned-comments", params={}) + + pinned_comments = [ + PinnedComment(self._options, self._session, raw_comment_json) + for raw_comment_json in r_json + ] + return pinned_comments + + @translate_resource_args + def pin_comment(self, issue: int | str, comment: int | str, pin: bool) -> Response: + """Pin/Unpin a comment on the issue. + + Args: + issue (Union[int, str]): the issue ID or key to get the comments from + comment (Union[int, str]): the comment ID + pin (bool): Pin (True) or Unpin (False) + + Returns: + Response + """ + url = self._get_url("issue/" + str(issue) + "/comment/" + str(comment) + "/pin") + return self._session.put(url, data=str(pin).lower())