-
Notifications
You must be signed in to change notification settings - Fork 709
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat: Organizations Api uptake for twilio-python #815
base: main
Are you sure you want to change the base?
Changes from 23 commits
b4c5734
3e246e4
8395487
bc5c16b
a66f9e9
b5a6490
fac26ee
15e15c0
7b07ba7
af11fd2
661785d
6a8c2d8
1ba2f9b
98708f0
d78d5d5
7bdf1b5
bc77770
0211f23
27dec32
2959689
b973065
ceebd46
35b5015
76fecab
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,22 @@ | ||
from twilio.auth_strategy.auth_type import AuthType | ||
from enum import Enum | ||
from abc import abstractmethod | ||
|
||
|
||
class AuthStrategy(object): | ||
def __init__(self, auth_type: AuthType): | ||
self._auth_type = auth_type | ||
|
||
@property | ||
def auth_type(self) -> AuthType: | ||
return self._auth_type | ||
|
||
@abstractmethod | ||
def get_auth_string(self) -> str: | ||
"""Return the authentication string.""" | ||
pass | ||
|
||
@abstractmethod | ||
def requires_authentication(self) -> bool: | ||
"""Return True if authentication is required, else False.""" | ||
pass |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,11 @@ | ||
from enum import Enum | ||
|
||
class AuthType(Enum): | ||
ORGS_TOKEN = 'orgs_stoken' | ||
NO_AUTH = 'noauth' | ||
BASIC = 'basic' | ||
API_KEY = 'api_key' | ||
CLIENT_CREDENTIALS = 'client_credentials' | ||
|
||
def __str__(self): | ||
return self.value |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,11 @@ | ||
from auth_type import AuthType | ||
|
||
class NoAuthStrategy(AuthStrategy): | ||
def __init__(self): | ||
super().__init__(AuthType.NO_AUTH) | ||
|
||
def get_auth_string(self) -> str: | ||
return "" | ||
|
||
def requires_authentication(self) -> bool: | ||
return False |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,49 @@ | ||
import jwt | ||
import threading | ||
import logging | ||
from datetime import datetime, timedelta | ||
|
||
from twilio.auth_strategy.auth_type import AuthType | ||
from twilio.auth_strategy.auth_strategy import AuthStrategy | ||
from twilio.http.token_manager import TokenManager | ||
|
||
|
||
class TokenAuthStrategy(AuthStrategy): | ||
def __init__(self, token_manager: TokenManager): | ||
super().__init__(AuthType.ORGS_TOKEN) | ||
self.token_manager = token_manager | ||
self.token = None | ||
self.lock = threading.Lock() | ||
logging.basicConfig(level=logging.INFO) | ||
self.logger = logging.getLogger(__name__) | ||
|
||
def get_auth_string(self) -> str: | ||
self.fetch_token() | ||
return f"Bearer {self.token}" | ||
|
||
def requires_authentication(self) -> bool: | ||
return True | ||
|
||
def fetch_token(self): | ||
self.logger.info("New token fetched for accessing organization API") | ||
if self.token is None or self.token == "" or self.is_token_expired(self.token): | ||
with self.lock: | ||
if self.token is None or self.token == "" or self.is_token_expired(self.token): | ||
self.token = self.token_manager.fetch_access_token() | ||
|
||
def is_token_expired(self, token): | ||
try: | ||
decoded = jwt.decode(token, options={"verify_signature": False}) | ||
exp = decoded.get('exp') | ||
|
||
if exp is None: | ||
return True # No expiration time present, consider it expired | ||
Check failure Code scanning / SonarCloud JWT should be signed and verified
<!--SONAR_ISSUE_KEY:AZJhAlYCyje6SmAfcspM-->Don't use a JWT token without verifying its signature. <p>See more on <a href="https://sonarcloud.io/project/issues?id=twilio_twilio-python&issues=AZJhAlYCyje6SmAfcspM&open=AZJhAlYCyje6SmAfcspM&pullRequest=815">SonarCloud</a></p>
|
||
|
||
# Check if the expiration time has passed | ||
return datetime.fromtimestamp(exp) < datetime.utcnow() | ||
|
||
except jwt.DecodeError: | ||
return True # Token is invalid | ||
except Exception as e: | ||
print(f"An error occurred: {e}") | ||
return True |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -4,10 +4,11 @@ | |
from urllib.parse import urlparse, urlunparse | ||
|
||
from twilio import __version__ | ||
from twilio.base.exceptions import TwilioException | ||
from twilio.http import HttpClient | ||
from twilio.http.http_client import TwilioHttpClient | ||
from twilio.http.response import Response | ||
from twilio.auth_strategy.auth_type import AuthType | ||
from twilio.credential.credential_provider import CredentialProvider | ||
|
||
|
||
class ClientBase(object): | ||
|
@@ -23,6 +24,7 @@ def __init__( | |
environment: Optional[MutableMapping[str, str]] = None, | ||
edge: Optional[str] = None, | ||
user_agent_extensions: Optional[List[str]] = None, | ||
credential_provider: Optional[CredentialProvider] = None, | ||
): | ||
""" | ||
Initializes the Twilio Client | ||
|
@@ -35,7 +37,9 @@ def __init__( | |
:param environment: Environment to look for auth details, defaults to os.environ | ||
:param edge: Twilio Edge to make requests to, defaults to None | ||
:param user_agent_extensions: Additions to the user agent string | ||
:param credential_provider: credential provider for authentication method that needs to be used | ||
""" | ||
|
||
environment = environment or os.environ | ||
|
||
self.username = username or environment.get("TWILIO_ACCOUNT_SID") | ||
|
@@ -48,9 +52,8 @@ def __init__( | |
""" :type : str """ | ||
self.user_agent_extensions = user_agent_extensions or [] | ||
""" :type : list[str] """ | ||
|
||
AsabuHere marked this conversation as resolved.
Show resolved
Hide resolved
|
||
if not self.username or not self.password: | ||
raise TwilioException("Credentials are required to create a TwilioClient") | ||
self.credential_provider = credential_provider or None | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Check this with Kridai, if existing customers use TwilioException - this is a breaking change There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Check if the exception being thrown as 401 is getting wrapped in TwilioException and being sent to customer There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. For the below cases
Is this a breaking change? |
||
""" :type : CredentialProvider """ | ||
|
||
self.account_sid = account_sid or self.username | ||
""" :type : str """ | ||
|
@@ -85,8 +88,20 @@ def request( | |
|
||
:returns: Response from the Twilio API | ||
""" | ||
auth = self.get_auth(auth) | ||
|
||
headers = self.get_headers(method, headers) | ||
|
||
##If credential provider is provided by user, get the associated auth strategy | ||
##Using the auth strategy, fetch the auth string and set it to authorization header | ||
if self.credential_provider: | ||
auth_strategy = self.credential_provider.to_auth_strategy() | ||
AsabuHere marked this conversation as resolved.
Show resolved
Hide resolved
|
||
headers["Authorization"] = auth_strategy.get_auth_string() | ||
elif self.username is not None and self.password is not None: | ||
auth = self.get_auth(auth) | ||
AsabuHere marked this conversation as resolved.
Show resolved
Hide resolved
|
||
else: | ||
auth = None | ||
|
||
|
||
uri = self.get_hostname(uri) | ||
|
||
return self.http_client.request( | ||
|
@@ -132,8 +147,20 @@ async def request_async( | |
"http_client must be asynchronous to support async API requests" | ||
) | ||
|
||
auth = self.get_auth(auth) | ||
|
||
headers = self.get_headers(method, headers) | ||
|
||
##If credential provider is provided by user, get the associated auth strategy | ||
##Using the auth strategy, fetch the auth string and set it to authorization header | ||
|
||
if self.credential_provider: | ||
auth_strategy = self.credential_provider.to_auth_strategy() | ||
headers["Authorization"] = auth_strategy.get_auth_string() | ||
elif self.username is not None and self.password is not None: | ||
auth = self.get_auth(auth) | ||
else: | ||
auth = None | ||
|
||
uri = self.get_hostname(uri) | ||
AsabuHere marked this conversation as resolved.
Show resolved
Hide resolved
|
||
|
||
return await self.http_client.request( | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,12 @@ | ||
from twilio.auth_strategy.auth_type import AuthType | ||
|
||
class CredentialProvider: | ||
def __init__(self, auth_type: AuthType): | ||
self._auth_type = auth_type | ||
|
||
@property | ||
def auth_type(self) -> AuthType: | ||
return self._auth_type | ||
|
||
def to_auth_strategy(self): | ||
raise NotImplementedError("Subclasses must implement this method") |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,28 @@ | ||
|
||
|
||
from twilio.http.orgs_token_manager import OrgTokenManager | ||
from twilio.base.exceptions import TwilioException | ||
from twilio.credential.credential_provider import CredentialProvider | ||
from twilio.auth_strategy.auth_type import AuthType | ||
from twilio.auth_strategy.token_auth_strategy import TokenAuthStrategy | ||
|
||
|
||
class OrgsCredentialProvider(CredentialProvider): | ||
def __init__(self, client_id: str, client_secret: str, token_manager=None): | ||
super().__init__(AuthType.CLIENT_CREDENTIALS) | ||
|
||
if client_id is None or client_secret is None: | ||
raise TwilioException("Client id and Client secret are mandatory") | ||
|
||
self.grant_type = "client_credentials" | ||
self.client_id = client_id | ||
self.client_secret = client_secret | ||
self.token_manager = token_manager | ||
self.auth_strategy = None | ||
|
||
def to_auth_strategy(self): | ||
if self.token_manager is None: | ||
self.token_manager = OrgTokenManager(self.grant_type, self.client_id, self.client_secret) | ||
if self.auth_strategy is None: | ||
self.auth_strategy = TokenAuthStrategy(self.token_manager) | ||
return self.auth_strategy |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,43 @@ | ||
from twilio.base.version import Version | ||
from twilio.http.token_manager import TokenManager | ||
from twilio.rest.preview_iam.v1.token import TokenList | ||
from twilio.rest import Client | ||
|
||
|
||
class OrgTokenManager(TokenManager): | ||
""" | ||
Orgs Token Manager | ||
""" | ||
|
||
def __init__( | ||
self, | ||
grant_type: str, | ||
client_id: str, | ||
client_secret: str, | ||
code: str = None, | ||
redirect_uri: str = None, | ||
audience: str = None, | ||
refreshToken: str = None, | ||
scope: str = None, | ||
): | ||
self.grant_type = grant_type | ||
self.client_id = client_id | ||
self.client_secret = client_secret | ||
self.code = code | ||
self.redirect_uri = redirect_uri | ||
self.audience = audience | ||
self.refreshToken = refreshToken | ||
self.scope = scope | ||
self.client = Client() | ||
|
||
def fetch_access_token(self): | ||
token_instance = self.client.preview_iam.v1.token.create( | ||
grant_type=self.grant_type, | ||
client_id=self.client_id, | ||
client_secret=self.client_secret, | ||
code=self.code, | ||
redirect_uri=self.redirect_uri, | ||
audience=self.audience, | ||
scope=self.scope, | ||
) | ||
return token_instance.access_token |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,7 @@ | ||
from twilio.base.version import Version | ||
|
||
|
||
class TokenManager: | ||
|
||
def fetch_access_token(self, version: Version): | ||
pass |
Check failure
Code scanning / SonarCloud
JWT should be signed and verified High