4
4
from typing import Optional
5
5
6
6
from cryptojwt .jwk .jwk import key_from_jwk_dict
7
- from cryptojwt .jws .jws import JWS
8
7
from cryptojwt .jws .jws import factory
8
+ from cryptojwt .jws .jws import JWS
9
9
from cryptojwt .key_bundle import key_by_alg
10
10
11
+ from idpyoidc .client .client_auth import BearerHeader
12
+ from idpyoidc .client .client_auth import find_token_info
11
13
from idpyoidc .client .service_context import ServiceContext
14
+ from idpyoidc .message import Message
12
15
from idpyoidc .message import SINGLE_OPTIONAL_STRING
13
16
from idpyoidc .message import SINGLE_REQUIRED_INT
14
17
from idpyoidc .message import SINGLE_REQUIRED_JSON
15
18
from idpyoidc .message import SINGLE_REQUIRED_STRING
16
- from idpyoidc .message import Message
17
19
from idpyoidc .metadata import get_signing_algs
18
20
from idpyoidc .time_util import utc_time_sans_frac
19
21
@@ -91,13 +93,13 @@ def verify_header(self, dpop_header) -> Optional["DPoPProof"]:
91
93
92
94
93
95
def dpop_header (
94
- service_context : ServiceContext ,
95
- service_endpoint : str ,
96
- http_method : str ,
97
- headers : Optional [dict ] = None ,
98
- token : Optional [str ] = "" ,
99
- nonce : Optional [str ] = "" ,
100
- ** kwargs
96
+ service_context : ServiceContext ,
97
+ service_endpoint : str ,
98
+ http_method : str ,
99
+ headers : Optional [dict ] = None ,
100
+ token : Optional [str ] = "" ,
101
+ nonce : Optional [str ] = "" ,
102
+ ** kwargs
101
103
) -> dict :
102
104
"""
103
105
@@ -159,7 +161,7 @@ def dpop_header(
159
161
return headers
160
162
161
163
162
- def add_support (services , dpop_signing_alg_values_supported ):
164
+ def add_support (services , dpop_signing_alg_values_supported , with_dpop_header = None ):
163
165
"""
164
166
Add the necessary pieces to make pushed authorization happen.
165
167
@@ -185,3 +187,53 @@ def add_support(services, dpop_signing_alg_values_supported):
185
187
_userinfo_service = services .get ("userinfo" )
186
188
if _userinfo_service :
187
189
_userinfo_service .construct_extra_headers .append (dpop_header )
190
+ # To be backward compatible
191
+ if with_dpop_header is None :
192
+ with_dpop_header = ["userinfo" ]
193
+
194
+ # Add dpop HTTP header to these
195
+ for _srv in with_dpop_header :
196
+ if _srv == "accesstoken" :
197
+ continue
198
+ _service = services .get (_srv )
199
+ if _service :
200
+ _service .construct_extra_headers .append (dpop_header )
201
+
202
+
203
+ class DPoPClientAuth (BearerHeader ):
204
+ tag = "dpop_client_auth"
205
+
206
+ def construct (self , request = None , service = None , http_args = None , ** kwargs ):
207
+ """
208
+ Constructing the Authorization header. The value of
209
+ the Authorization header is "Bearer <access_token>".
210
+
211
+ :param request: Request class instance
212
+ :param service: The service this authentication method applies to.
213
+ :param http_args: HTTP header arguments
214
+ :param kwargs: extra keyword arguments
215
+ :return:
216
+ """
217
+
218
+ _token_type = "access_token"
219
+
220
+ _token_info = find_token_info (request , _token_type , service , ** kwargs )
221
+
222
+ if not _token_info :
223
+ raise KeyError ("No bearer token available" )
224
+
225
+ # The authorization value starts with the token_type
226
+ # if _token_info["token_type"].to_lower() != "bearer":
227
+ _bearer = f"DPoP { _token_info [_token_type ]} "
228
+
229
+ # Add 'Authorization' to the headers
230
+ if http_args is None :
231
+ http_args = {"headers" : {}}
232
+ http_args ["headers" ]["Authorization" ] = _bearer
233
+ else :
234
+ try :
235
+ http_args ["headers" ]["Authorization" ] = _bearer
236
+ except KeyError :
237
+ http_args ["headers" ] = {"Authorization" : _bearer }
238
+
239
+ return http_args
0 commit comments