Skip to content

Add banking signal adapter #80

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 6 commits into
base: develop
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -19,4 +19,4 @@ test:
ENV=test poetry run python3 -m pytest -v --cov=huma_signals --color=yes --cov-report term-missing --ignore=tests/adapters/request_network

run-local:
ENV=development poetry run python3 -m uvicorn huma_signals.api.main:app --reload
ENV=development poetry run python3 -m uvicorn huma_signals.api.main:app --port 8001 --reload
2 changes: 1 addition & 1 deletion docs/decentralized_signal_portfolio.md
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ class WalletEthTransactionsSignals(Model):
total_transactions: int
total_sent: int
total_received: int
wallet_teneur_in_days: int
wallet_tenure_in_days: int
total_income_90days: float
total_transactions_90days: int
```
Expand Down
Empty file.
189 changes: 189 additions & 0 deletions huma_signals/adapters/banking/adapter.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,189 @@
import collections
import decimal
from typing import Any, ClassVar, DefaultDict, List

import pydantic
import tenacity
from plaid.model import credit_bank_income_summary, transaction

from huma_signals import constants, models
from huma_signals.adapters import models as adapter_models
from huma_signals.adapters.banking import plaid_client
from huma_signals.commons import datetime_utils, pydantic_utils
from huma_signals.settings import settings


class MonthlyIncome(models.HumaBaseModel):
# Format: YYYY-MM
month: str
amount: decimal.Decimal

_validate_amount = pydantic.validator("amount", allow_reuse=True, pre=True)(
pydantic_utils.validate_amount
)


class MonthlyExpense(models.HumaBaseModel):
month: str
amount: decimal.Decimal

_validate_amount = pydantic.validator("amount", allow_reuse=True, pre=True)(
pydantic_utils.validate_amount
)


class IncomeSignal(models.HumaBaseModel):
monthly_income: List[MonthlyIncome]
total_income: decimal.Decimal
average_monthly_income: decimal.Decimal
monthly_expense: List[MonthlyExpense]
total_expense: decimal.Decimal
average_monthly_expense: decimal.Decimal

_validate_total_income = pydantic.validator(
"total_income", allow_reuse=True, pre=True
)(pydantic_utils.validate_amount)
_validate_average_monthly_income = pydantic.validator(
"average_monthly_income", allow_reuse=True, pre=True
)(pydantic_utils.validate_amount)


class BankingSignal(models.HumaBaseModel):
"""
Signals emitted by the banking adapter.
"""

monthly_income: List[MonthlyIncome]
total_income: decimal.Decimal
average_monthly_income: decimal.Decimal
monthly_expense: List[MonthlyExpense]
total_expense: decimal.Decimal
average_monthly_expense: decimal.Decimal
current_account_balance: decimal.Decimal


class BankingAdapter(adapter_models.SignalAdapterBase):
name: ClassVar[str] = "banking"
required_inputs: ClassVar[List[str]] = [
"plaid_public_token",
"user_token",
]
signals: ClassVar[List[str]] = list(BankingSignal.__fields__.keys())

def __init__(
self,
plaid_env: str = settings.plaid_env,
plaid_client_id: str = settings.plaid_client_id,
plaid_secret: str = settings.plaid_secret,
) -> None:
self.plaid_client = plaid_client.PlaidClient(
plaid_env=plaid_env,
plaid_client_id=plaid_client_id,
plaid_secret=plaid_secret,
)

async def fetch( # pylint: disable=arguments-differ
self,
plaid_public_token: str,
user_token: str, # pylint: disable=unused-argument
*args: Any,
**kwargs: Any,
) -> BankingSignal:
plaid_access_token = await self.plaid_client.exchange_access_token(
public_token=plaid_public_token,
)
async for attempt in tenacity.AsyncRetrying(
stop=tenacity.stop_after_attempt(3)
):
with attempt:
# Sometimes Plaid throws `PRODUCT_NOT_READY` error because the access token had
# just been created and the requested products haven't been initialized yet. Retry
# when this happens.
transactions = await self.plaid_client.fetch_transactions(
plaid_access_token=plaid_access_token, lookback_days=365
)

bank_income = self._aggregate_income_and_expense_from_transactions(transactions)
# bank_income = await self.plaid_client.fetch_bank_income(user_token=user_token)
current_balance = await self.plaid_client.fetch_bank_account_available_balance(
plaid_access_token=plaid_access_token
)
# return BankingSignal(
# income=self._aggregate_income_signal(bank_income=bank_income),
# current_account_balance=1000,
# )
return BankingSignal(
monthly_income=bank_income.monthly_income,
total_income=bank_income.total_income,
average_monthly_income=bank_income.average_monthly_income,
monthly_expense=bank_income.monthly_expense,
total_expense=bank_income.total_expense,
average_monthly_expense=bank_income.average_monthly_expense,
current_account_balance=current_balance,
)

@classmethod
def user_input_types(cls) -> List[constants.UserInputType]:
return ["plaid-bank-link"]

@classmethod
def _aggregate_income_signal(
cls, bank_income: credit_bank_income_summary.CreditBankIncomeSummary
) -> IncomeSignal:
monthly_income = [
MonthlyIncome(
month=datetime_utils.date_to_month_str(history.start_date),
amount=history.total_amounts.mount,
)
for history in bank_income.historical_summary
]
total_income = bank_income.total_amounts.amount
average_monthly_income = total_income / len(monthly_income)
return IncomeSignal(
monthly_income=monthly_income,
total_income=total_income,
average_monthly_income=average_monthly_income,
)

@classmethod
def _aggregate_income_and_expense_from_transactions(
cls,
transactions: List[transaction.Transaction],
) -> IncomeSignal:
income_by_month: DefaultDict[str, decimal.Decimal] = collections.defaultdict(
decimal.Decimal
)
expense_by_month: DefaultDict[str, decimal.Decimal] = collections.defaultdict(
decimal.Decimal
)
for tx in transactions:
# if tx.category_id == 21009000:
tx_month = tx.date.strftime("%Y-%m")
if tx.amount <= 0:
# Negative amounts represent money coming into the account, so negate them.
income_by_month[tx_month] -= decimal.Decimal(tx.amount)
else:
expense_by_month[tx_month] += decimal.Decimal(tx.amount)

monthly_income = [
MonthlyIncome(month=k, amount=v) for k, v in income_by_month.items()
]
total_income = sum(income_by_month.values())
average_monthly_income = (
total_income / len(monthly_income) if len(monthly_income) != 0 else 0
)
monthly_expense = [
MonthlyExpense(month=k, amount=v) for k, v in expense_by_month.items()
]
total_expense = sum(expense_by_month.values())
average_monthly_expense = (
total_income / len(monthly_expense) if len(monthly_expense) != 0 else 0
)
return IncomeSignal(
monthly_income=monthly_income,
total_income=total_income,
average_monthly_income=average_monthly_income,
monthly_expense=monthly_expense,
total_expense=total_expense,
average_monthly_expense=average_monthly_expense,
)
175 changes: 175 additions & 0 deletions huma_signals/adapters/banking/plaid_client.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,175 @@
# pylint: disable=line-too-long
import datetime
import decimal
from typing import List, Optional, Tuple

import plaid
from plaid.api import plaid_api
from plaid.model import (
accounts_balance_get_request,
accounts_balance_get_request_options,
country_code,
credit_bank_income_get_request,
credit_bank_income_summary,
income_verification_source_type,
item_public_token_exchange_request,
link_token_create_request,
link_token_create_request_user,
products,
sandbox_public_token_create_request,
sandbox_public_token_create_request_income_verification_bank_income,
sandbox_public_token_create_request_options,
sandbox_public_token_create_request_options_income_verification,
transaction,
transactions_get_request,
user_create_request,
)

from huma_signals.commons import async_utils, datetime_utils, string_utils

PLAID_ENVS = {
"production": plaid.Environment.Production,
"development": plaid.Environment.Development,
"sandbox": plaid.Environment.Sandbox,
}
_INCOME_DAYS_REQUESTED = 365


class PlaidClient:
def __init__(self, plaid_env: str, plaid_client_id: str, plaid_secret: str) -> None:
configuration = plaid.Configuration(
host=PLAID_ENVS[plaid_env],
api_key={
"clientId": plaid_client_id,
"secret": plaid_secret,
},
)
self.client = plaid_api.PlaidApi(plaid.ApiClient(configuration))

async def create_user_token(self, wallet_address: str) -> str:
# Obfuscate the wallet address by hashing it.
request = user_create_request.UserCreateRequest(
client_user_id=string_utils.sha256_hash_hex(wallet_address)
)
response = await async_utils.sync_to_async(self.client.user_create, request)
return response.user_token

async def create_link_token(self, wallet_address: str) -> Tuple[str, str]:
# user_token = await self.create_user_token(wallet_address=wallet_address)
request = link_token_create_request.LinkTokenCreateRequest(
# `balance` product is automatically included.
products=[products.Products("transactions")],
client_name="Huma Financials",
country_codes=[country_code.CountryCode("US")],
# redirect_uri="https://app.huma.finance/",
language="en",
webhook="http://localhost:8001",
link_customization_name="default",
# income_verification=link_token_create_request_income_verification.LinkTokenCreateRequestIncomeVerification(
# income_source_types=[
# income_verification_source_type.IncomeVerificationSourceType(
# "BANK"
# )
# ],
# bank_income=link_token_create_request_income_verification_bank_income.LinkTokenCreateRequestIncomeVerificationBankIncome( # noqa: E501
# days_requested=_INCOME_DAYS_REQUESTED,
# ),
# ),
user=link_token_create_request_user.LinkTokenCreateRequestUser(
client_user_id=string_utils.sha256_hash_hex(wallet_address)
),
# user_token=user_token,
)
response = await async_utils.sync_to_async(
self.client.link_token_create, request
)
return response.link_token, ""

async def exchange_access_token(self, public_token: str) -> str:
request = item_public_token_exchange_request.ItemPublicTokenExchangeRequest(
public_token=public_token
)
response = await async_utils.sync_to_async(
self.client.item_public_token_exchange, request
)
return response.access_token

async def fetch_transactions(
self, plaid_access_token: str, lookback_days: int
) -> List[transaction.Transaction]:
end_date = datetime_utils.tz_aware_utc_now().date()
start_date = end_date - datetime.timedelta(days=lookback_days)
request = transactions_get_request.TransactionsGetRequest(
access_token=plaid_access_token,
start_date=start_date,
end_date=end_date,
)
response = await async_utils.sync_to_async(
self.client.transactions_get, request
)
transactions = response.transactions
while len(transactions) < response.total_transactions:
request = transactions_get_request.TransactionsGetRequest(
access_token=plaid_access_token,
start_date=start_date,
end_date=end_date,
)
response = self.client.transactions_get(request)
transactions.extend(response.transactions)

return transactions

async def fetch_bank_income(
self, user_token: str
) -> credit_bank_income_summary.CreditBankIncomeSummary:
request = credit_bank_income_get_request.CreditBankIncomeGetRequest(
user_token=user_token,
)
response = self.client.credit_bank_income_get(request)
return response.bank_income.bank_income_summary

async def fetch_bank_account_available_balance(
self,
plaid_access_token: str,
) -> Optional[decimal.Decimal]:
# This field is only required and used for Capital One:
# https://plaid.com/docs/api/products/balance/#accounts-balance-get-request-options-min-last-updated-datetime.
# Plaid gets updated balance once a day, so use 24 hours + 1 hour of leeway.
min_last_updated_datetime = (
datetime_utils.tz_aware_utc_now() - datetime.timedelta(hours=25)
)
request = accounts_balance_get_request.AccountsBalanceGetRequest(
access_token=plaid_access_token,
options=accounts_balance_get_request_options.AccountsBalanceGetRequestOptions(
min_last_updated_datetime=min_last_updated_datetime,
),
)
response = await async_utils.sync_to_async(
self.client.accounts_balance_get, request
)
return response.accounts[0].balances.available

async def create_sandbox_public_token(
self, institution_id: str, user_token: str
) -> str:
request = sandbox_public_token_create_request.SandboxPublicTokenCreateRequest(
institution_id=institution_id,
initial_products=[products.Products("INCOME_VERIFICATION")],
options=sandbox_public_token_create_request_options.SandboxPublicTokenCreateRequestOptions(
income_verification=sandbox_public_token_create_request_options_income_verification.SandboxPublicTokenCreateRequestOptionsIncomeVerification( # noqa: E501
income_source_types=[
income_verification_source_type.IncomeVerificationSourceType(
"BANK"
)
],
bank_income=sandbox_public_token_create_request_income_verification_bank_income.SandboxPublicTokenCreateRequestIncomeVerificationBankIncome( # noqa: E501
days_requested=_INCOME_DAYS_REQUESTED,
),
),
),
user_token=user_token,
)
response = await async_utils.sync_to_async(
self.client.sandbox_public_token_create, request
)
return response.public_token
Loading