Source code for fatsecret.fatsecret

"""
fatsecret
---------

Simple python wrapper of the Fatsecret API

"""

import datetime
import time
from importlib.metadata import PackageNotFoundError, version as _pkg_version
from typing import Any, List, Literal, Optional, Tuple, Union

from urllib.parse import parse_qs

import requests
import tenacity
from bs4 import BeautifulSoup
from oauthlib.oauth1 import SIGNATURE_TYPE_QUERY
from requests_oauthlib import OAuth1Session

from ._retry import default_policy
from .errors import (
    ApplicationError,
    AuthenticationError,
    GeneralError,
    ParameterError,
    PremierRequiredError,
    ScopeRequiredError,
)


def _user_agent() -> str:
    try:
        v = _pkg_version("fatsecret")
    except PackageNotFoundError:
        v = "unknown"
    return f"pyfatsecret-chocotonic/{v} python-requests/{requests.__version__}"


class Fatsecret:

    # ========================= CORE =========================

    """Core FatSecret API client logic (auth, request handling, utilities)."""

    REQUEST_TOKEN_URL = "https://authentication.fatsecret.com/oauth/request_token"
    AUTHORIZE_URL = "https://authentication.fatsecret.com/oauth/authorize"
    ACCESS_TOKEN_URL = "https://authentication.fatsecret.com/oauth/access_token"
    BASE_URL = "https://platform.fatsecret.com/rest/server.api"
    OAUTH2_TOKEN_URL = "https://oauth.fatsecret.com/connect/token"

    def __init__(
        self,
        consumer_key: str,
        consumer_secret: str,
        session_token: Optional[Tuple[str, str]] = None,
        auth: Literal["oauth1", "oauth2"] = "oauth1",
        scopes: Optional[List[str]] = None,
        retries: Union[bool, tenacity.Retrying] = True,
    ):
        """Initialize the FatSecret API session.

        Args:
            consumer_key: API consumer/client key (register at https://platform.fatsecret.com/api)
            consumer_secret: API consumer/client secret
            session_token: Optional (token, secret) tuple for an existing OAuth1 session
            auth: "oauth1" (default, three-legged, supports user-scoped methods)
                  or "oauth2" (client-credentials, required for native/Premier methods)
            scopes: OAuth2 scopes to request. Only used when auth="oauth2".
                    Valid scopes: basic, premier, barcode, localization, nlp,
                    image-recognition, feedback. None requests all scopes the
                    client has access to.
            retries: Retry policy for transient GET failures. ``True`` (default)
                installs the SDK's exponential-backoff + full-jitter policy
                (3 attempts, 30s total cap, honors numeric ``Retry-After``).
                ``False`` disables retries entirely. Pass a configured
                ``tenacity.Retrying`` instance for full custom control.
                Mutating requests (POST/PUT/DELETE) are never retried.
        """
        self.consumer_key = consumer_key
        self.consumer_secret = consumer_secret
        self.auth_mode: Literal["oauth1", "oauth2"] = auth
        self.scopes = scopes
        self._retries: Optional[tenacity.Retrying] = (
            None
            if retries is False
            else retries
            if isinstance(retries, tenacity.Retrying)
            else default_policy()
        )

        self.request_token = None
        self.request_token_secret = None
        self.access_token = None
        self.access_token_secret = None

        self._oauth2_token: Optional[str] = None
        self._oauth2_token_expires_at: float = 0.0

        if auth == "oauth1":
            if session_token:
                self.access_token = session_token[0]
                self.access_token_secret = session_token[1]
                self.session = OAuth1Session(
                    consumer_key,
                    client_secret=consumer_secret,
                    resource_owner_key=session_token[0],
                    resource_owner_secret=session_token[1],
                    signature_type=SIGNATURE_TYPE_QUERY,
                )
            else:
                self.session = OAuth1Session(
                    consumer_key,
                    client_secret=consumer_secret,
                    callback_uri="oob",
                    signature_type=SIGNATURE_TYPE_QUERY,
                )
        elif auth == "oauth2":
            self.session = requests.Session()
        else:
            raise ValueError(f"auth must be 'oauth1' or 'oauth2', got {auth!r}")

        # Identify ourselves so traces / server logs can distinguish SDK
        # traffic from other outbound HTTP in the caller's app.
        self.session.headers["User-Agent"] = _user_agent()

        # v2.0 resource-namespaced surface. Each resource exposes the
        # OAS-tag's endpoints via short names (e.g. fs.foods.search_v5).
        from .resources import (
            ClassificationResource,
            DiaryResource,
            ExercisesResource,
            FeedbackResource,
            FoodsResource,
            MealsResource,
            NativeResource,
            ProfileFoodsResource,
            ProfileResource,
            RecipesResource,
            WeightResource,
        )

        self.foods = FoodsResource(self)
        self.classification = ClassificationResource(self)
        self.recipes = RecipesResource(self)
        self.profile_foods = ProfileFoodsResource(self)
        self.meals = MealsResource(self)
        self.diary = DiaryResource(self)
        self.exercises = ExercisesResource(self)
        self.weight = WeightResource(self)
        self.profile = ProfileResource(self)
        self.native = NativeResource(self)
        self.feedback = FeedbackResource(self)

    def _get_oauth2_token(self) -> str:
        """Fetch (and cache) an OAuth2 bearer token via client_credentials.

        Tokens are cached in-memory and refreshed shortly before expiry.
        """
        now = time.time()
        if self._oauth2_token and now < self._oauth2_token_expires_at - 30:
            return self._oauth2_token

        data: dict = {"grant_type": "client_credentials"}
        if self.scopes:
            data["scope"] = " ".join(self.scopes)
        resp = requests.post(
            self.OAUTH2_TOKEN_URL,
            data=data,
            auth=(self.consumer_key, self.consumer_secret),
            timeout=30,
        )
        resp.raise_for_status()
        payload = resp.json()
        self._oauth2_token = payload["access_token"]
        self._oauth2_token_expires_at = now + int(payload.get("expires_in", 86400))
        return self._oauth2_token

    def _call(
        self,
        params: dict,
        *,
        url: Optional[str] = None,
        method: str = "GET",
        json_body: Optional[dict] = None,
    ) -> Any:
        """Unified request entrypoint used by all _vN methods.

        Routes through OAuth1 or OAuth2 based on `auth_mode`. Always parses
        the JSON response, runs `_check_errors`, and returns the decoded body.
        Response unwrapping is the caller's responsibility via `_unwrap`.
        """
        params = dict(params)
        params.setdefault("format", "json")
        target = url or self.api_url

        if self.auth_mode == "oauth2":
            headers = {"Authorization": f"Bearer {self._get_oauth2_token()}"}
        else:
            headers = None

        def _do() -> requests.Response:
            r = self.session.request(
                method,
                target,
                params=params,
                json=json_body,
                headers=headers,
                timeout=30,
            )
            # Surface transport-level HTTP failures (5xx, 429, etc.) as
            # `HTTPError` so the retry policy can classify them. FatSecret's
            # own error envelopes are served with HTTP 200 and handled below
            # by `_check_errors`, so this does not regress that path. A
            # non-transient 4xx (e.g. 401) propagates straight out — the
            # retry classifier only matches the transient set.
            r.raise_for_status()
            return r

        if self._retries is not None and method == "GET":
            resp = self._retries(_do)
        else:
            resp = _do()

        payload = resp.json()
        self._check_errors(payload)
        return payload

    @staticmethod
    def _check_errors(payload: Any) -> None:
        """Raise a typed exception if `payload` is an error envelope. No-op otherwise.

        Maps upstream error codes to the closest typed exception subclass.
        See https://platform.fatsecret.com/docs/guides/error-codes for the
        canonical list.
        """
        if not isinstance(payload, dict):
            return
        err = payload.get("error")
        if not err:
            return
        code = err.get("code")
        message = err.get("message", "")

        if code == 2:
            raise AuthenticationError(2, "This api call requires an authenticated session")
        if code in (3, 4, 5, 6, 7, 8, 9):
            raise AuthenticationError(code, message)
        if code in (1, 10, 11, 12, 13, 14, 20, 21, 22, 23, 24):
            raise GeneralError(code, message)
        if 101 <= code <= 109:
            raise ParameterError(code, message)
        if code == 207:
            # Premier scope required
            raise PremierRequiredError(code, message)
        if code in (208, 211):
            # Other scope-related rejections (image-recognition, nlp, barcode, etc.)
            raise ScopeRequiredError(code, message)
        if 201 <= code <= 211:
            raise ApplicationError(code, message)
        raise ApplicationError(code, message)

    @staticmethod
    def _unwrap(payload: Any, *path: str, list_key: Optional[str] = None) -> Any:
        """Walk `path` keys into `payload`. If `list_key` is set, coerce the
        terminal value into a list (FatSecret returns a single dict when there's
        one item, an array when there are many; `_unwrap` normalizes that to
        `list[dict]` always).

        Examples:
            _unwrap({"foods": {"food": [...]}}, "foods", list_key="food") -> list
            _unwrap({"foods_search": {"results": {"food": {...}}}},
                    "foods_search", "results", list_key="food") -> [{...}]
            _unwrap({"month": {"day": [...]}}, "month", list_key="day") -> list
        """
        cur: Any = payload
        for key in path:
            if cur is None:
                return [] if list_key else None
            if not isinstance(cur, dict):
                return [] if list_key else None
            cur = cur.get(key)
        if list_key is None:
            return cur
        if cur is None:
            return []
        if isinstance(cur, dict) and list_key in cur:
            inner = cur[list_key]
        else:
            inner = cur
        if inner is None:
            return []
        return inner if isinstance(inner, list) else [inner]

    @property
    def api_url(self) -> str:
        return self.BASE_URL

[docs] def get_authorize_url(self, callback_url: str = "oob") -> str: """Fetch an OAuth1 request token and return the user-facing authorize URL. Args: callback_url: Absolute URL to redirect the user to after they authorize, or ``"oob"`` (out-of-band) to receive a verifier PIN. Returns: The authorize URL with the freshly minted ``oauth_token`` appended. """ # If the caller asked for a non-default callback, rebuild the session # so the right callback_uri is folded into the request-token signature. if callback_url != "oob": self.session = OAuth1Session( self.consumer_key, client_secret=self.consumer_secret, callback_uri=callback_url, signature_type=SIGNATURE_TYPE_QUERY, ) self.session.headers["User-Agent"] = _user_agent() # FatSecret's request-token endpoint only accepts GET. requests-oauthlib's # fetch_request_token() forces POST, so issue the signed GET ourselves. resp = self.session.get(self.REQUEST_TOKEN_URL) resp.raise_for_status() token = {k: v[0] for k, v in parse_qs(resp.text).items()} if "oauth_token" not in token: raise RuntimeError( f"Token request failed: {resp.status_code} {resp.text!r}" ) self.request_token = token["oauth_token"] self.request_token_secret = token["oauth_token_secret"] # Promote request token onto the session so authorization_url can find it. self.session._client.client.resource_owner_key = self.request_token self.session._client.client.resource_owner_secret = self.request_token_secret return self.session.authorization_url(self.AUTHORIZE_URL)
[docs] def authenticate(self, verifier: Union[str, int]) -> Tuple[str, str]: """Exchange the verifier (PIN or callback code) for permanent access tokens. Args: verifier: PIN displayed to user or returned via callback. Returns: (access_token, access_secret) """ # Re-instantiate session with request token + verifier so the GET # below is signed correctly. FatSecret's access-token endpoint also # only accepts GET, so we issue it directly rather than going through # fetch_access_token() (which would force POST). self.session = OAuth1Session( self.consumer_key, client_secret=self.consumer_secret, resource_owner_key=self.request_token, resource_owner_secret=self.request_token_secret, verifier=str(verifier), signature_type=SIGNATURE_TYPE_QUERY, ) self.session.headers["User-Agent"] = _user_agent() resp = self.session.get(self.ACCESS_TOKEN_URL) resp.raise_for_status() token = {k: v[0] for k, v in parse_qs(resp.text).items()} if "oauth_token" not in token: raise RuntimeError( f"Access-token exchange failed: {resp.status_code} {resp.text!r}" ) self.access_token = token["oauth_token"] self.access_token_secret = token["oauth_token_secret"] # Replace with a long-lived authed session for subsequent API calls. self.session = OAuth1Session( self.consumer_key, client_secret=self.consumer_secret, resource_owner_key=self.access_token, resource_owner_secret=self.access_token_secret, signature_type=SIGNATURE_TYPE_QUERY, ) self.session.headers["User-Agent"] = _user_agent() return (self.access_token, self.access_token_secret)
[docs] def close(self) -> None: """Close the current HTTP session.""" self.session.close()
[docs] @staticmethod def unix_time(dt: datetime.datetime) -> int: """Convert a datetime to number of days since the Epoch (FatSecret style).""" epoch = datetime.datetime.fromtimestamp(0, datetime.timezone.utc).replace( tzinfo=None ) delta = dt - epoch return delta.days
@staticmethod def unix_time_v2(dt: Union[datetime.datetime, datetime.date, int, float]) -> int: """Convert datetime/date/timestamp into number of days since 1970-01-01.""" epoch = datetime.datetime(1970, 1, 1) if isinstance(dt, datetime.datetime): delta = dt - epoch return delta.days elif isinstance(dt, datetime.date): delta = datetime.datetime(dt.year, dt.month, dt.day) - epoch return delta.days elif isinstance(dt, (int, float)): # treat as unix timestamp, use timezone-aware UTC dt_utc = datetime.datetime.fromtimestamp( dt, tz=datetime.timezone.utc ).replace(tzinfo=None) delta = dt_utc - epoch return delta.days else: raise TypeError("dt must be datetime, date, int, or float") @staticmethod def from_unix_time(days: int) -> datetime.datetime: """Convert FatSecret-style days-since-epoch back to a naive ``datetime``. Inverse of :meth:`unix_time` / :meth:`unix_time_v2`. Returns midnight UTC of the day corresponding to ``days`` (1970-01-01 + ``days``). :param days: Number of days since 1970-01-01 (as returned by ``unix_time``). :return: Naive ``datetime.datetime`` at midnight of that day. """ return datetime.datetime(1970, 1, 1) + datetime.timedelta(days=days)
[docs] @staticmethod def valid_response(response: requests.Response): """Validate a JSON API response and extract its data or raise an error.""" if response.json(): for key in response.json(): # Error Code Handling if key == "error": code = response.json()[key]["code"] message = response.json()[key]["message"] if code == 2: raise AuthenticationError( 2, "This api call requires an authenticated session" ) elif code in [1, 10, 11, 12, 20, 21]: raise GeneralError(code, message) elif 3 <= code <= 9: raise AuthenticationError(code, message) elif 101 <= code <= 108: raise ParameterError(code, message) elif 201 <= code <= 207: raise ApplicationError(code, message) # All other response options elif key == "success": return True elif key == "foods": return response.json()[key]["food"] elif key == "suggestions": return response.json()[key] elif key == "recipes": return response.json()[key]["recipe"] elif key == "saved_meals": return response.json()[key]["saved_meal"] elif key == "saved_meal_items": return response.json()[key]["saved_meal_item"] elif key == "exercise_types": return response.json()[key]["exercise"] elif key == "food_entries": if response.json()[key] is None: return [] entries = response.json()[key]["food_entry"] if isinstance(entries, dict): return [entries] elif isinstance(entries, list): return entries elif key == "month": return response.json()[key]["day"] elif key == "profile": if "auth_token" in response.json()[key]: return ( response.json()[key]["auth_token"], response.json()[key]["auth_secret"], ) else: return response.json()[key] elif key in ( "food", "recipe", "recipe_types", "saved_meal_id", "saved_meal_item_id", "food_entry_id", ): return response.json()[key]
# ========================= AUTH =========================
[docs] def fatsecret_authenticate( username: str, password: str, consumer_key: str, consumer_secret: str ): """Authenticate a user programmatically using credentials and return an authorized Fatsecret instance. Note: This uses HTML form emulation against FatSecret's login flow and may break if the website changes. It is provided for convenience and developer testing, not production OAuth flows. """ try: session = requests.Session() fatsecret_client = Fatsecret(consumer_key, consumer_secret) authorize_url = fatsecret_client.get_authorize_url().replace( "authorize", "authorize.aspx" ) # Fetch viewstate and generator dynamically login_page_response = session.get(url=authorize_url) login_page_soup = BeautifulSoup(login_page_response.text, "lxml") viewstate_value = login_page_soup.find("input", {"name": "__VIEWSTATE"})[ "value" ] viewstate_generator_value = login_page_soup.find( "input", {"name": "__VIEWSTATEGENERATOR"} )["value"] payload = { "__VIEWSTATE": viewstate_value, "__VIEWSTATEGENERATOR": viewstate_generator_value, "Name": username, "Password": password, "Login.x": 0, "Login.y": 0, } pin_response = session.post(url=authorize_url, data=payload) pin_soup = BeautifulSoup(pin_response.content, "lxml") verifier_tag = pin_soup.find("b") if not verifier_tag: raise RuntimeError( "Failed to find PIN in response. Login may have failed." ) verifier_pin = verifier_tag.text.strip() print(f"Obtained verifier PIN. {len(verifier_pin) = }") fatsecret_client.authenticate(verifier_pin) print("Authentication successful.") return fatsecret_client except Exception as error: message = f"Failed to authenticate:\n{error}" print(message) return None
# ========================= HELPERS ========================= # Shared helpers used by the namespaced resource implementations. @staticmethod def _set_optional(params: dict, items: list) -> None: """Helper: add (key, value) pairs to params only when value is not None.""" for k, v in items: if v is not None: params[k] = v @staticmethod def _mutator_success(payload: Any) -> Union[bool, Any]: """Helper: collapse a `{"success": 1}` payload into True; pass-through otherwise.""" if isinstance(payload, dict) and "success" in payload: return payload["success"] == 1 or payload["success"] == "1" return payload