"""
fatsecret
---------
Simple python wrapper of the Fatsecret API
"""
import base64
import datetime
import hashlib
import hmac
import time
import urllib
import uuid
from typing import Any, List, Literal, Optional, Tuple, Union
import requests
from bs4 import BeautifulSoup
from rauth.service import OAuth1Service
from .errors import (
ApplicationError,
AuthenticationError,
GeneralError,
ParameterError,
PremierRequiredError,
ScopeRequiredError,
)
[docs]
class Fatsecret:
# ========================= CORE =========================
"""Core FatSecret API client logic (auth, request handling, utilities)."""
REQUEST_TOKEN_URL = "https://authentication.fatsecret.com/oauth/request_token"
AUTHORIZE_URL = "https://authentication.fatsecret.com/oauth/authorize"
ACCESS_TOKEN_URL = "https://authentication.fatsecret.com/oauth/access_token"
BASE_URL = "https://platform.fatsecret.com/rest/server.api"
OAUTH2_TOKEN_URL = "https://oauth.fatsecret.com/connect/token"
def __init__(
self,
consumer_key: str,
consumer_secret: str,
session_token: Optional[Tuple[str, str]] = None,
auth: Literal["oauth1", "oauth2"] = "oauth1",
scopes: Optional[List[str]] = None,
):
"""Initialize the FatSecret API session.
Args:
consumer_key: API consumer/client key (register at https://platform.fatsecret.com/api)
consumer_secret: API consumer/client secret
session_token: Optional (token, secret) tuple for an existing OAuth1 session
auth: "oauth1" (default, three-legged, supports user-scoped methods)
or "oauth2" (client-credentials, required for native/Premier methods)
scopes: OAuth2 scopes to request. Only used when auth="oauth2".
Valid scopes: basic, premier, barcode, localization, nlp,
image-recognition, feedback. None requests all scopes the
client has access to.
"""
self.consumer_key = consumer_key
self.consumer_secret = consumer_secret
self.auth_mode: Literal["oauth1", "oauth2"] = auth
self.scopes = scopes
self.request_token = None
self.request_token_secret = None
self.access_token = None
self.access_token_secret = None
self._oauth2_token: Optional[str] = None
self._oauth2_token_expires_at: float = 0.0
if auth == "oauth1":
self.oauth = OAuth1Service(
name="fatsecret",
consumer_key=consumer_key,
consumer_secret=consumer_secret,
request_token_url=self.REQUEST_TOKEN_URL,
authorize_url=self.AUTHORIZE_URL,
access_token_url=self.ACCESS_TOKEN_URL,
base_url=self.BASE_URL,
)
if session_token:
self.access_token = session_token[0]
self.access_token_secret = session_token[1]
self.session = self.oauth.get_session(token=session_token)
else:
self.session = self.oauth.get_session()
elif auth == "oauth2":
self.oauth = None
self.session = requests.Session()
else:
raise ValueError(f"auth must be 'oauth1' or 'oauth2', got {auth!r}")
# v2.0 resource-namespaced surface. Each resource exposes the
# OAS-tag's endpoints via short names (e.g. fs.foods.search_v5).
from .resources import (
ClassificationResource,
DiaryResource,
ExercisesResource,
FeedbackResource,
FoodsResource,
MealsResource,
NativeResource,
ProfileFoodsResource,
ProfileResource,
RecipesResource,
WeightResource,
)
self.foods = FoodsResource(self)
self.classification = ClassificationResource(self)
self.recipes = RecipesResource(self)
self.profile_foods = ProfileFoodsResource(self)
self.meals = MealsResource(self)
self.diary = DiaryResource(self)
self.exercises = ExercisesResource(self)
self.weight = WeightResource(self)
self.profile = ProfileResource(self)
self.native = NativeResource(self)
self.feedback = FeedbackResource(self)
def _get_oauth2_token(self) -> str:
"""Fetch (and cache) an OAuth2 bearer token via client_credentials.
Tokens are cached in-memory and refreshed shortly before expiry.
"""
now = time.time()
if self._oauth2_token and now < self._oauth2_token_expires_at - 30:
return self._oauth2_token
data: dict = {"grant_type": "client_credentials"}
if self.scopes:
data["scope"] = " ".join(self.scopes)
resp = requests.post(
self.OAUTH2_TOKEN_URL,
data=data,
auth=(self.consumer_key, self.consumer_secret),
timeout=30,
)
resp.raise_for_status()
payload = resp.json()
self._oauth2_token = payload["access_token"]
self._oauth2_token_expires_at = now + int(payload.get("expires_in", 86400))
return self._oauth2_token
def _call(
self,
params: dict,
*,
url: Optional[str] = None,
method: str = "GET",
json_body: Optional[dict] = None,
) -> Any:
"""Unified request entrypoint used by all _vN methods.
Routes through OAuth1 or OAuth2 based on `auth_mode`. Always parses
the JSON response, runs `_check_errors`, and returns the decoded body.
Response unwrapping is the caller's responsibility via `_unwrap`.
"""
params = dict(params)
params.setdefault("format", "json")
target = url or self.api_url
if self.auth_mode == "oauth2":
headers = {"Authorization": f"Bearer {self._get_oauth2_token()}"}
resp = self.session.request(
method, target, params=params, json=json_body, headers=headers, timeout=30
)
else:
resp = self.session.request(
method, target, params=params, json=json_body, timeout=30
)
payload = resp.json()
self._check_errors(payload)
return payload
@staticmethod
def _check_errors(payload: Any) -> None:
"""Raise a typed exception if `payload` is an error envelope. No-op otherwise.
Maps upstream error codes to the closest typed exception subclass.
See https://platform.fatsecret.com/docs/guides/error-codes for the
canonical list.
"""
if not isinstance(payload, dict):
return
err = payload.get("error")
if not err:
return
code = err.get("code")
message = err.get("message", "")
if code == 2:
raise AuthenticationError(2, "This api call requires an authenticated session")
if code in (3, 4, 5, 6, 7, 8, 9):
raise AuthenticationError(code, message)
if code in (1, 10, 11, 12, 13, 14, 20, 21, 22, 23, 24):
raise GeneralError(code, message)
if 101 <= code <= 109:
raise ParameterError(code, message)
if code == 207:
# Premier scope required
raise PremierRequiredError(code, message)
if code in (208, 211):
# Other scope-related rejections (image-recognition, nlp, barcode, etc.)
raise ScopeRequiredError(code, message)
if 201 <= code <= 211:
raise ApplicationError(code, message)
raise ApplicationError(code, message)
@staticmethod
def _unwrap(payload: Any, *path: str, list_key: Optional[str] = None) -> Any:
"""Walk `path` keys into `payload`. If `list_key` is set, coerce the
terminal value into a list (FatSecret returns a single dict when there's
one item, an array when there are many; `_unwrap` normalizes that to
`list[dict]` always).
Examples:
_unwrap({"foods": {"food": [...]}}, "foods", list_key="food") -> list
_unwrap({"foods_search": {"results": {"food": {...}}}},
"foods_search", "results", list_key="food") -> [{...}]
_unwrap({"month": {"day": [...]}}, "month", list_key="day") -> list
"""
cur: Any = payload
for key in path:
if cur is None:
return [] if list_key else None
if not isinstance(cur, dict):
return [] if list_key else None
cur = cur.get(key)
if list_key is None:
return cur
if cur is None:
return []
if isinstance(cur, dict) and list_key in cur:
inner = cur[list_key]
else:
inner = cur
if inner is None:
return []
return inner if isinstance(inner, list) else [inner]
@property
def api_url(self) -> str:
if self.auth_mode == "oauth1" and self.oauth is not None:
return self.oauth.base_url
return self.BASE_URL
[docs]
def get_authorize_url(self, callback_url: str = "oob") -> str:
"""
New implementation using manual OAuth 1.0 flow to /oauth/request_token on the new endpoint.
:param callback_url: An absolute URL to redirect the User to when they have completed authentication
:type callback_url: str
"""
print("Generating request token...")
oauth_consumer_key = self.consumer_key
oauth_consumer_secret = self.consumer_secret
oauth_signature_method = "HMAC-SHA1"
oauth_timestamp = str(int(time.time()))
oauth_nonce = str(uuid.uuid4().hex)
oauth_version = "1.0"
oauth_callback = callback_url
# Collect parameters for base string
params = {
"oauth_consumer_key": oauth_consumer_key,
"oauth_signature_method": oauth_signature_method,
"oauth_timestamp": oauth_timestamp,
"oauth_nonce": oauth_nonce,
"oauth_version": oauth_version,
"oauth_callback": oauth_callback,
}
base_params = "&".join(
[
"{}={}".format(
urllib.parse.quote(k, safe=""), urllib.parse.quote(v, safe="")
)
for k, v in sorted(params.items())
]
)
method = "POST"
base_url = self.oauth.request_token_url
signature_base_string = "&".join(
[
method,
urllib.parse.quote(base_url, safe=""),
urllib.parse.quote(base_params, safe=""),
]
)
signing_key = f"{urllib.parse.quote(oauth_consumer_secret, safe='')}&"
hashed = hmac.new(
signing_key.encode("utf-8"),
signature_base_string.encode("utf-8"),
hashlib.sha1,
)
oauth_signature = base64.b64encode(hashed.digest()).decode()
params["oauth_signature"] = oauth_signature
headers = {
"Content-Type": "application/x-www-form-urlencoded",
}
full_request_url = f"{base_url}?{urllib.parse.urlencode(params)}"
print("Full request URL ends with:", full_request_url[-4:])
# POST request to fetch request_token
response = requests.post(base_url, data=params, headers=headers)
response.raise_for_status()
result = dict(urllib.parse.parse_qsl(response.text))
self.request_token = result["oauth_token"]
self.request_token_secret = result["oauth_token_secret"]
print(f"Request token ends with: {self.request_token[-4:]}")
print(f"secret ends with: {self.request_token_secret[-4:]}")
return f"{self.oauth.authorize_url}?oauth_token={self.request_token}"
[docs]
def authenticate(self, verifier: Union[str, int]) -> Tuple[str, str]:
"""Exchange the verifier (PIN or callback code) for permanent access tokens.
Args:
verifier: PIN displayed to user or returned via callback.
Returns:
(access_token, access_secret)
"""
session_token = self.oauth.get_access_token(
self.request_token,
self.request_token_secret,
params={"oauth_verifier": verifier},
)
self.access_token = session_token[0]
self.access_token_secret = session_token[1]
self.session = self.oauth.get_session(session_token)
# Return session token for app specific caching
return session_token
[docs]
def close(self) -> None:
"""Close the current HTTP session."""
self.session.close()
[docs]
@staticmethod
def unix_time(dt: datetime.datetime) -> int:
"""Convert a datetime to number of days since the Epoch (FatSecret style)."""
epoch = datetime.datetime.fromtimestamp(0, datetime.timezone.utc).replace(
tzinfo=None
)
delta = dt - epoch
return delta.days
[docs]
@staticmethod
def unix_time_v2(dt: Union[datetime.datetime, datetime.date, int, float]) -> int:
"""Convert datetime/date/timestamp into number of days since 1970-01-01."""
epoch = datetime.datetime(1970, 1, 1)
if isinstance(dt, datetime.datetime):
delta = dt - epoch
return delta.days
elif isinstance(dt, datetime.date):
delta = datetime.datetime(dt.year, dt.month, dt.day) - epoch
return delta.days
elif isinstance(dt, (int, float)):
# treat as unix timestamp, use timezone-aware UTC
dt_utc = datetime.datetime.fromtimestamp(
dt, tz=datetime.timezone.utc
).replace(tzinfo=None)
delta = dt_utc - epoch
return delta.days
else:
raise TypeError("dt must be datetime, date, int, or float")
[docs]
@staticmethod
def valid_response(response: requests.Response):
"""Validate a JSON API response and extract its data or raise an error."""
if response.json():
for key in response.json():
# Error Code Handling
if key == "error":
code = response.json()[key]["code"]
message = response.json()[key]["message"]
if code == 2:
raise AuthenticationError(
2, "This api call requires an authenticated session"
)
elif code in [1, 10, 11, 12, 20, 21]:
raise GeneralError(code, message)
elif 3 <= code <= 9:
raise AuthenticationError(code, message)
elif 101 <= code <= 108:
raise ParameterError(code, message)
elif 201 <= code <= 207:
raise ApplicationError(code, message)
# All other response options
elif key == "success":
return True
elif key == "foods":
return response.json()[key]["food"]
elif key == "suggestions":
return response.json()[key]
elif key == "recipes":
return response.json()[key]["recipe"]
elif key == "saved_meals":
return response.json()[key]["saved_meal"]
elif key == "saved_meal_items":
return response.json()[key]["saved_meal_item"]
elif key == "exercise_types":
return response.json()[key]["exercise"]
elif key == "food_entries":
if response.json()[key] is None:
return []
entries = response.json()[key]["food_entry"]
if isinstance(entries, dict):
return [entries]
elif isinstance(entries, list):
return entries
elif key == "month":
return response.json()[key]["day"]
elif key == "profile":
if "auth_token" in response.json()[key]:
return (
response.json()[key]["auth_token"],
response.json()[key]["auth_secret"],
)
else:
return response.json()[key]
elif key in (
"food",
"recipe",
"recipe_types",
"saved_meal_id",
"saved_meal_item_id",
"food_entry_id",
):
return response.json()[key]
# ========================= AUTH =========================
[docs]
def fatsecret_authenticate(
username: str, password: str, consumer_key: str, consumer_secret: str
):
"""Authenticate a user programmatically using credentials and return an authorized Fatsecret instance.
Note:
This uses HTML form emulation against FatSecret's login flow and may break if the website changes.
It is provided for convenience and developer testing, not production OAuth flows.
"""
try:
session = requests.Session()
fatsecret_client = Fatsecret(consumer_key, consumer_secret)
authorize_url = fatsecret_client.get_authorize_url().replace(
"authorize", "authorize.aspx"
)
# Fetch viewstate and generator dynamically
login_page_response = session.get(url=authorize_url)
login_page_soup = BeautifulSoup(login_page_response.text, "lxml")
viewstate_value = login_page_soup.find("input", {"name": "__VIEWSTATE"})[
"value"
]
viewstate_generator_value = login_page_soup.find(
"input", {"name": "__VIEWSTATEGENERATOR"}
)["value"]
payload = {
"__VIEWSTATE": viewstate_value,
"__VIEWSTATEGENERATOR": viewstate_generator_value,
"Name": username,
"Password": password,
"Login.x": 0,
"Login.y": 0,
}
pin_response = session.post(url=authorize_url, data=payload)
pin_soup = BeautifulSoup(pin_response.content, "lxml")
verifier_tag = pin_soup.find("b")
if not verifier_tag:
raise RuntimeError(
"Failed to find PIN in response. Login may have failed."
)
verifier_pin = verifier_tag.text.strip()
print(f"Obtained verifier PIN. {len(verifier_pin) = }")
fatsecret_client.authenticate(verifier_pin)
print("Authentication successful.")
return fatsecret_client
except Exception as error:
message = f"Failed to authenticate:\n{error}"
print(message)
return None
# ========================= HELPERS =========================
# Shared helpers used by the namespaced resource implementations.
@staticmethod
def _set_optional(params: dict, items: list) -> None:
"""Helper: add (key, value) pairs to params only when value is not None."""
for k, v in items:
if v is not None:
params[k] = v
@staticmethod
def _mutator_success(payload: Any) -> Union[bool, Any]:
"""Helper: collapse a `{"success": 1}` payload into True; pass-through otherwise."""
if isinstance(payload, dict) and "success" in payload:
return payload["success"] == 1 or payload["success"] == "1"
return payload