• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

requests / requests-oauthlib / 8621812797

09 Apr 2024 08:22PM UTC coverage: 90.097%. Remained the same
8621812797

Pull #538

github

web-flow
Merge 59f3d6a0e into 45eca1a69
Pull Request #538: Added secrets to actions to run test example

464 of 515 relevant lines covered (90.1%)

5.41 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

92.73
/requests_oauthlib/oauth1_session.py
1
from urllib.parse import urlparse
6✔
2

3
import logging
6✔
4

5
from oauthlib.common import add_params_to_uri
6✔
6
from oauthlib.common import urldecode as _urldecode
6✔
7
from oauthlib.oauth1 import SIGNATURE_HMAC, SIGNATURE_RSA, SIGNATURE_TYPE_AUTH_HEADER
6✔
8
import requests
6✔
9

10
from . import OAuth1
6✔
11

12

13
log = logging.getLogger(__name__)
6✔
14

15

16
def urldecode(body):
    """Decode *body* as a urlencoded query, falling back to JSON.

    The body is first handed to oauthlib's ``urldecode``. If that call raises
    any exception, the body is parsed with ``json.loads`` instead and that
    result is returned.

    :param body: The text to decode.
    :returns: Whatever the successful decoder produced. Callers in this
              module wrap the result in ``dict(...)``.
    """
    import json

    try:
        decoded = _urldecode(body)
    except Exception:
        # Deliberately broad: any failure of the form decoder means "try JSON".
        decoded = json.loads(body)
    return decoded
24

25

26
class TokenRequestDenied(ValueError):
    """Error carrying the response of a token request that was refused.

    :param message: Human readable description of the failure.
    :param response: The response object the failure was detected on; it is
                     stored unchanged on ``self.response``.
    """

    def __init__(self, message, response):
        super().__init__(message)
        self.response = response

    @property
    def status_code(self):
        """Status code of the stored response (backwards-compatible alias)."""
        return self.response.status_code
35

36

37
class TokenMissing(ValueError):
    """Error raised when token data lacks the ``oauth_token`` entry.

    :param message: Human readable description of the failure.
    :param response: The data that was inspected; stored unchanged on
                     ``self.response``.
    """

    def __init__(self, message, response):
        super().__init__(message)
        self.response = response
41

42

43
class VerifierMissing(ValueError):
    """Error raised when an access token is requested without a verifier."""
45

46

47
class OAuth1Session(requests.Session):
6✔
48
    """Request signing and convenience methods for the oauth dance.
49

50
    What is the difference between OAuth1Session and OAuth1?
51

52
    OAuth1Session actually uses OAuth1 internally and its purpose is to assist
53
    in the OAuth workflow through convenience methods to prepare authorization
54
    URLs and parse the various token and redirection responses. It also provides
55
    rudimentary validation of responses.
56

57
    An example of the OAuth workflow using a basic CLI app and Twitter.
58

59
    >>> # Credentials obtained during the registration.
60
    >>> client_key = 'client key'
61
    >>> client_secret = 'secret'
62
    >>> callback_uri = 'https://127.0.0.1/callback'
63
    >>>
64
    >>> # Endpoints found in the OAuth provider API documentation
65
    >>> request_token_url = 'https://api.twitter.com/oauth/request_token'
66
    >>> authorization_url = 'https://api.twitter.com/oauth/authorize'
67
    >>> access_token_url = 'https://api.twitter.com/oauth/access_token'
68
    >>>
69
    >>> oauth_session = OAuth1Session(client_key,client_secret=client_secret, callback_uri=callback_uri)
70
    >>>
71
    >>> # First step, fetch the request token.
72
    >>> oauth_session.fetch_request_token(request_token_url)
73
    {
74
        'oauth_token': 'kjerht2309u',
75
        'oauth_token_secret': 'lsdajfh923874',
76
    }
77
    >>>
78
    >>> # Second step. Follow this link and authorize
79
    >>> oauth_session.authorization_url(authorization_url)
80
    'https://api.twitter.com/oauth/authorize?oauth_token=sdf0o9823sjdfsdf&oauth_callback=https%3A%2F%2F127.0.0.1%2Fcallback'
81
    >>>
82
    >>> # Third step. Fetch the access token
83
    >>> redirect_response = input('Paste the full redirect URL here.')
84
    >>> oauth_session.parse_authorization_response(redirect_response)
85
    {
86
        'oauth_token': 'kjerht2309u',
87
        'oauth_token_secret': 'lsdajfh923874',
88
        'oauth_verifier': 'w34o8967345',
89
    }
90
    >>> oauth_session.fetch_access_token(access_token_url)
91
    {
92
        'oauth_token': 'sdf0o9823sjdfsdf',
93
        'oauth_token_secret': '2kjshdfp92i34asdasd',
94
    }
95
    >>> # Done. You can now make OAuth requests.
96
    >>> status_url = 'http://api.twitter.com/1/statuses/update.json'
97
    >>> new_status = {'status':  'hello world!'}
98
    >>> oauth_session.post(status_url, data=new_status)
99
    <Response [200]>
100
    """
101

102
    def __init__(
        self,
        client_key,
        client_secret=None,
        resource_owner_key=None,
        resource_owner_secret=None,
        callback_uri=None,
        signature_method=SIGNATURE_HMAC,
        signature_type=SIGNATURE_TYPE_AUTH_HEADER,
        rsa_key=None,
        verifier=None,
        client_class=None,
        force_include_body=False,
        **kwargs
    ):
        """Create a session whose requests are signed with OAuth 1.

        All arguments are forwarded to `OAuth1`; the resulting object is kept
        on ``self._client`` and installed as the session's ``auth``.

        :param client_key: A client specific identifier.
        :param client_secret: A client specific secret used to create HMAC and
                              plaintext signatures.
        :param resource_owner_key: A resource owner key, also referred to as
                                   request token or access token depending on
                                   when in the workflow it is used.
        :param resource_owner_secret: A resource owner secret obtained with
                                      either a request or access token. Often
                                      referred to as token secret.
        :param callback_uri: The URL the user is redirected back to after
                             authorization.
        :param signature_method: Signature methods determine how the OAuth
                                 signature is created. The three options are
                                 oauthlib.oauth1.SIGNATURE_HMAC (default),
                                 oauthlib.oauth1.SIGNATURE_RSA and
                                 oauthlib.oauth1.SIGNATURE_PLAINTEXT.
        :param signature_type: Signature type decides where the OAuth
                               parameters are added. Either in the
                               Authorization header (default) or to the URL
                               query parameters or the request body. Defined as
                               oauthlib.oauth1.SIGNATURE_TYPE_AUTH_HEADER,
                               oauthlib.oauth1.SIGNATURE_TYPE_QUERY and
                               oauthlib.oauth1.SIGNATURE_TYPE_BODY
                               respectively.
        :param rsa_key: The private RSA key as a string. Can only be used with
                        signature_method=oauthlib.oauth1.SIGNATURE_RSA.
        :param verifier: A verifier string to prove authorization was granted.
        :param client_class: A subclass of `oauthlib.oauth1.Client` to use with
                             `requests_oauthlib.OAuth1` instead of the default.
        :param force_include_body: Always include the request body in the
                                   signature creation.
        :param **kwargs: Additional keyword arguments passed to `OAuth1`.
        """
        super().__init__()
        signer = OAuth1(
            client_key,
            client_secret=client_secret,
            resource_owner_key=resource_owner_key,
            resource_owner_secret=resource_owner_secret,
            callback_uri=callback_uri,
            signature_method=signature_method,
            signature_type=signature_type,
            rsa_key=rsa_key,
            verifier=verifier,
            client_class=client_class,
            force_include_body=force_include_body,
            **kwargs
        )
        # One and the same object is kept for internal bookkeeping and used
        # as the auth hook of the session.
        self._client = signer
        self.auth = signer
168

169
    @property
    def token(self):
        """Credentials currently held by the client, as a dict.

        The dict may contain ``oauth_token``, ``oauth_token_secret`` and
        ``oauth_verifier``; an entry is only present when the corresponding
        client attribute is truthy.
        """
        client = self._client.client
        candidates = (
            ("oauth_token", client.resource_owner_key),
            ("oauth_token_secret", client.resource_owner_secret),
            ("oauth_verifier", client.verifier),
        )
        return {name: value for name, value in candidates if value}

    @token.setter
    def token(self, value):
        """Copy the credentials found in *value* onto the client."""
        self._populate_attributes(value)
188

189
    @property
    def authorized(self):
        """Whether this session holds the credentials needed to sign requests.

        When True, OAuth-protected requests can reasonably be expected to
        succeed. When False, the user still has to go through the OAuth
        authentication dance.

        With the RSA signature method only the resource owner key is
        required; every other method additionally needs the client secret and
        the resource owner secret.
        """
        client = self._client.client
        if client.signature_method == SIGNATURE_RSA:
            return bool(client.resource_owner_key)

        required = ("client_secret", "resource_owner_key", "resource_owner_secret")
        # The generator keeps the evaluation lazy: attributes after the first
        # falsy one are never read.
        return all(bool(getattr(client, name)) for name in required)
208

209
    def authorization_url(self, url, request_token=None, **kwargs):
        """Build the URL the user must visit to authorize the request token.

        This is the second step in the OAuth 1 workflow. The user should be
        redirected to the returned URL, grant access, and then be redirected
        back to you.

        The ``oauth_token`` query parameter is set to *request_token* or, when
        that is falsy, to the resource owner key currently held by the client.
        No other parameters are added by this method apart from *kwargs*.

        :param url: The authorization endpoint URL.
        :param request_token: The previously obtained request token.
        :param kwargs: Optional parameters to append to the URL.
        :returns: The authorization URL with new parameters embedded.

        >>> request_token_url = 'https://api.twitter.com/oauth/request_token'
        >>> authorization_url = 'https://api.twitter.com/oauth/authorize'
        >>> oauth_session = OAuth1Session('client-key', client_secret='secret')
        >>> oauth_session.fetch_request_token(request_token_url)
        {
            'oauth_token': 'sdf0o9823sjdfsdf',
            'oauth_token_secret': '2kjshdfp92i34asdasd',
        }
        >>> oauth_session.authorization_url(authorization_url)
        'https://api.twitter.com/oauth/authorize?oauth_token=sdf0o9823sjdfsdf'
        """
        chosen_token = request_token or self._client.client.resource_owner_key
        kwargs["oauth_token"] = chosen_token
        log.debug("Adding parameters %s to url %s", kwargs, url)
        return add_params_to_uri(url, kwargs.items())
254

255
    def fetch_request_token(self, url, realm=None, **request_kwargs):
        """Obtain a request token from *url*.

        This is the first step in the OAuth 1 workflow. A signed POST request
        is sent to *url* and the token is parsed from the response, ready to
        be used to construct an authorization URL.

        :param url: The request token endpoint URL.
        :param realm: A list of realms to request access to; the entries are
                      joined with single spaces.
        :param request_kwargs: Optional arguments passed to ``post``
            function in ``requests.Session``
        :returns: The response in dict format.

        After a successful fetch the client's ``callback_uri`` and ``realm``
        are cleared, since they must not take part in the signatures of
        subsequent requests.

        >>> request_token_url = 'https://api.twitter.com/oauth/request_token'
        >>> oauth_session = OAuth1Session('client-key', client_secret='secret')
        >>> oauth_session.fetch_request_token(request_token_url)
        {
            'oauth_token': 'sdf0o9823sjdfsdf',
            'oauth_token_secret': '2kjshdfp92i34asdasd',
        }
        """
        if realm:
            self._client.client.realm = " ".join(realm)
        else:
            self._client.client.realm = None

        request_token = self._fetch_token(url, **request_kwargs)

        log.debug("Resetting callback_uri and realm (not needed in next phase).")
        self._client.client.callback_uri = None
        self._client.client.realm = None
        return request_token
287

288
    def fetch_access_token(self, url, verifier=None, **request_kwargs):
        """Obtain an access token from *url*.

        This is the final step in the OAuth 1 workflow. An access token is
        obtained using all previously obtained credentials, including the
        verifier from the authorization step.

        :param url: The access token endpoint URL.
        :param verifier: Optional verifier; when truthy it replaces the one
                         currently set on the client.
        :param request_kwargs: Optional arguments passed on to the token
                               request.
        :returns: The response in dict format.
        :raises VerifierMissing: If no verifier is set on the client.

        After a successful fetch the verifier is cleared, since it must not
        take part in the signatures of subsequent requests.

        >>> access_token_url = 'https://api.twitter.com/oauth/access_token'
        >>> redirect_response = 'https://127.0.0.1/callback?oauth_token=kjerht2309uf&oauth_token_secret=lsdajfh923874&oauth_verifier=w34o8967345'
        >>> oauth_session = OAuth1Session('client-key', client_secret='secret')
        >>> oauth_session.parse_authorization_response(redirect_response)
        {
            'oauth_token': 'kjerht2309uf',
            'oauth_token_secret': 'lsdajfh923874',
            'oauth_verifier': 'w34o8967345',
        }
        >>> oauth_session.fetch_access_token(access_token_url)
        {
            'oauth_token': 'sdf0o9823sjdfsdf',
            'oauth_token_secret': '2kjshdfp92i34asdasd',
        }
        """
        if verifier:
            self._client.client.verifier = verifier

        current_verifier = getattr(self._client.client, "verifier", None)
        if not current_verifier:
            raise VerifierMissing("No client verifier has been set.")

        access_token = self._fetch_token(url, **request_kwargs)

        log.debug("Resetting verifier attribute, should not be used anymore.")
        self._client.client.verifier = None
        return access_token
322

323
    def parse_authorization_response(self, url):
        """Read the OAuth parameters out of the post-authorization redirect URL.

        The query part of *url* is decoded into a dict and the credentials it
        contains are stored on the client.

        :param url: The full URL that resulted from the user being redirected
                    back from the OAuth provider to you, the client.
        :returns: A dict of parameters extracted from the URL.
        :raises TokenMissing: If the query carries no ``oauth_token``.

        >>> redirect_response = 'https://127.0.0.1/callback?oauth_token=kjerht2309uf&oauth_token_secret=lsdajfh923874&oauth_verifier=w34o8967345'
        >>> oauth_session = OAuth1Session('client-key', client_secret='secret')
        >>> oauth_session.parse_authorization_response(redirect_response)
        {
            'oauth_token': 'kjerht2309uf',
            'oauth_token_secret': 'lsdajfh923874',
            'oauth_verifier': 'w34o8967345',
        }
        """
        log.debug("Parsing token from query part of url %s", url)
        query = urlparse(url).query
        parsed = dict(urldecode(query))

        log.debug("Updating internal client token attribute.")
        # The ``token`` setter also routes through ``_populate_attributes``,
        # so the attributes are applied twice; both calls are kept as they
        # were.
        self._populate_attributes(parsed)
        self.token = parsed
        return parsed
345

346
    def _populate_attributes(self, token):
        """Copy the credentials in *token* onto the underlying client.

        ``oauth_token`` is mandatory and becomes the resource owner key.
        ``oauth_token_secret`` and ``oauth_verifier`` are optional and only
        overwrite the client's values when present.

        :param token: Mapping of OAuth parameter names to values.
        :raises TokenMissing: If *token* has no ``oauth_token`` key; nothing
                              is modified in that case.
        """
        if "oauth_token" not in token:
            raise TokenMissing(
                "Response does not contain a token: {resp}".format(resp=token), token
            )

        self._client.client.resource_owner_key = token["oauth_token"]

        optional_fields = (
            ("oauth_token_secret", "resource_owner_secret"),
            ("oauth_verifier", "verifier"),
        )
        for key, attribute in optional_fields:
            if key in token:
                setattr(self._client.client, attribute, token[key])
357

358
    def _fetch_token(self, url, **request_kwargs):
        """POST to *url* and load the token found in the response.

        :param url: The token endpoint URL.
        :param request_kwargs: Optional arguments passed on to ``self.post``.
        :returns: The decoded token as a dict.
        :raises TokenRequestDenied: If the response status code is 400 or
                                    higher.
        :raises ValueError: If the response body cannot be decoded into a
                            dict. The original decoding error is attached as
                            ``__cause__``.
        :raises TokenMissing: If the decoded body has no ``oauth_token``.
        """
        log.debug("Fetching token from %s using client %s", url, self._client.client)
        r = self.post(url, **request_kwargs)

        if r.status_code >= 400:
            error = "Token request failed with code %s, response was '%s'."
            raise TokenRequestDenied(error % (r.status_code, r.text), r)

        log.debug('Decoding token from response "%s"', r.text)
        try:
            token = dict(urldecode(r.text.strip()))
        except (ValueError, TypeError) as e:
            # TypeError covers JSON bodies that are not mappings (e.g. ``null``
            # or a bare number), which ``dict()`` rejects; they are reported
            # the same way as any other undecodable body.
            error = (
                "Unable to decode token from token response. "
                "This is commonly caused by an unsuccessful request where"
                " a non urlencoded error message is returned. "
                "The decoding error was %s"
                "" % (e,)
            )
            # Chain explicitly so the root cause stays visible in tracebacks.
            raise ValueError(error) from e

        log.debug("Obtained token %s", token)
        log.debug("Updating internal client attributes from token data.")
        # The ``token`` setter also routes through ``_populate_attributes``;
        # both calls are kept so subclasses hooking either one still run.
        self._populate_attributes(token)
        self.token = token
        return token
384

385
    def rebuild_auth(self, prepared_request, response):
        """Re-sign a request that is being redirected.

        A nonce may not be reused as per the OAuth spec, so an existing
        ``Authorization`` header is always dropped and the request is signed
        again with the session's auth. Requests without that header are left
        untouched.

        NOTE(review): presumably overrides ``requests.Session.rebuild_auth``
        -- confirm against the requests documentation.

        :param prepared_request: The request about to be sent after a
                                 redirect.
        :param response: Unused by this implementation.
        """
        headers = prepared_request.headers
        if "Authorization" not in headers:
            return

        # Drop the stale signature before computing a fresh one.
        headers.pop("Authorization", True)
        prepared_request.prepare_auth(self.auth)
        return
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc